commonware_storage/archive/immutable/
storage.rs1use crate::{
2 archive::{immutable::Config, Error, Identifier},
3 freezer::{self, Checkpoint, Cursor, Freezer},
4 metadata::{self, Metadata},
5 ordinal::{self, Ordinal},
6 Context,
7};
8use commonware_codec::{CodecShared, EncodeSize, FixedSize, Read, ReadExt, Write};
9use commonware_runtime::{
10 telemetry::metrics::{Counter, MetricsExt as _},
11 Buf, BufMut, BufferPooler,
12};
13use commonware_utils::{bitmap::BitMap, sequence::prefixed_u64::U64, Array};
14use futures::join;
15use std::collections::BTreeMap;
16use tracing::debug;
17
/// Metadata key prefix used for the freezer [Checkpoint] record (always stored with value `0`).
const FREEZER_PREFIX: u8 = 0;

/// Metadata key prefix used for per-section ordinal records (the key value is the section).
const ORDINAL_PREFIX: u8 = 1;
23
/// Value stored in the archive's metadata store.
///
/// When encoded, a record starts with a one-byte tag: `0` for [Record::Freezer] and `1` for
/// [Record::Ordinal].
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
enum Record {
    /// Checkpoint produced by syncing the freezer.
    Freezer(Checkpoint),
    /// Bits marking which items of a section have been put. Replaced with `None` once every
    /// bit of the section is set.
    Ordinal(Option<BitMap>),
}
30
31impl Record {
32 fn freezer(&self) -> &Checkpoint {
34 match self {
35 Self::Freezer(checkpoint) => checkpoint,
36 _ => panic!("incorrect record"),
37 }
38 }
39
40 fn ordinal(&self) -> &Option<BitMap> {
42 match self {
43 Self::Ordinal(indices) => indices,
44 _ => panic!("incorrect record"),
45 }
46 }
47}
48
49impl Write for Record {
50 fn write(&self, buf: &mut impl BufMut) {
51 match self {
52 Self::Freezer(checkpoint) => {
53 buf.put_u8(0);
54 checkpoint.write(buf);
55 }
56 Self::Ordinal(indices) => {
57 buf.put_u8(1);
58 indices.write(buf);
59 }
60 }
61 }
62}
63
64impl Read for Record {
65 type Cfg = ();
66 fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
67 let tag = u8::read(buf)?;
68 match tag {
69 0 => Ok(Self::Freezer(Checkpoint::read(buf)?)),
70 1 => Ok(Self::Ordinal(Option::<BitMap>::read_cfg(
71 buf,
72 &(usize::MAX as u64),
73 )?)),
74 _ => Err(commonware_codec::Error::InvalidEnum(tag)),
75 }
76 }
77}
78
79impl EncodeSize for Record {
80 fn encode_size(&self) -> usize {
81 1 + match self {
82 Self::Freezer(_) => Checkpoint::SIZE,
83 Self::Ordinal(indices) => indices.encode_size(),
84 }
85 }
86}
87
/// Archive that stores values in a [Freezer], maps indices to freezer [Cursor]s through an
/// [Ordinal], and keeps the freezer checkpoint and per-section bits in [Metadata].
pub struct Archive<E: BufferPooler + Context, K: Array, V: CodecShared> {
    /// Number of items per section (used to derive an index's section and bit position).
    items_per_section: u64,

    /// Holds the freezer checkpoint and the per-section ordinal bits.
    metadata: Metadata<E, U64, Record>,

    /// Stores the key/value data.
    freezer: Freezer<E, K, V>,

    /// Maps an index to the freezer cursor of its value.
    ordinal: Ordinal<E, Cursor>,

    /// Incremented on every `get` call.
    gets: Counter,
    /// Incremented on every `has` call.
    has: Counter,
    /// Incremented on every `sync` call.
    syncs: Counter,
}
107
108impl<E: BufferPooler + Context, K: Array, V: CodecShared> Archive<E, K, V> {
109 pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
111 let metadata = Metadata::<E, U64, Record>::init(
113 context.child("metadata"),
114 metadata::Config {
115 partition: cfg.metadata_partition,
116 codec_config: (),
117 },
118 )
119 .await?;
120
121 let freezer_key = U64::new(FREEZER_PREFIX, 0);
123 let checkpoint = metadata.get(&freezer_key).map(|freezer| *freezer.freezer());
124
125 let freezer = Freezer::init_with_checkpoint(
129 context.child("freezer"),
130 freezer::Config {
131 key_partition: cfg.freezer_key_partition,
132 key_write_buffer: cfg.freezer_key_write_buffer,
133 key_page_cache: cfg.freezer_key_page_cache,
134 value_partition: cfg.freezer_value_partition,
135 value_compression: cfg.freezer_value_compression,
136 value_write_buffer: cfg.freezer_value_write_buffer,
137 value_target_size: cfg.freezer_value_target_size,
138 table_partition: cfg.freezer_table_partition,
139 table_initial_size: cfg.freezer_table_initial_size,
140 table_resize_frequency: cfg.freezer_table_resize_frequency,
141 table_resize_chunk_size: cfg.freezer_table_resize_chunk_size,
142 table_replay_buffer: cfg.replay_buffer,
143 codec_config: cfg.codec_config,
144 },
145 checkpoint,
146 )
147 .await?;
148
149 let sections = metadata
151 .keys()
152 .filter(|k| k.prefix() == ORDINAL_PREFIX)
153 .collect::<Vec<_>>();
154 let mut section_bits = BTreeMap::new();
155 for section in sections {
156 let bits = metadata.get(section).unwrap().ordinal();
158
159 let section = section.value();
161 section_bits.insert(section, bits);
162 }
163
164 let ordinal = Ordinal::init_with_bits(
168 context.child("ordinal"),
169 ordinal::Config {
170 partition: cfg.ordinal_partition,
171 items_per_blob: cfg.items_per_section,
172 write_buffer: cfg.ordinal_write_buffer,
173 replay_buffer: cfg.replay_buffer,
174 },
175 Some(section_bits),
176 )
177 .await?;
178
179 let gets = context.counter("gets", "Number of gets performed");
181 let has = context.counter("has", "Number of has performed");
182 let syncs = context.counter("syncs", "Number of syncs called");
183
184 Ok(Self {
185 items_per_section: cfg.items_per_section.get(),
186 metadata,
187 freezer,
188 ordinal,
189 gets,
190 has,
191 syncs,
192 })
193 }
194
195 async fn get_index(&self, index: u64) -> Result<Option<V>, Error> {
197 let Some(cursor) = self.ordinal.get(index).await? else {
199 return Ok(None);
200 };
201
202 let result = self
204 .freezer
205 .get(freezer::Identifier::Cursor(cursor))
206 .await?;
207
208 Ok(result)
210 }
211
212 async fn get_key(&self, key: &K) -> Result<Option<V>, Error> {
214 let result = self.freezer.get(freezer::Identifier::Key(key)).await?;
216
217 Ok(result)
219 }
220
221 fn initialize_section(&mut self, section: u64) {
223 let bits = BitMap::zeroes(self.items_per_section);
225
226 let key = U64::new(ORDINAL_PREFIX, section);
228 self.metadata.put(key, Record::Ordinal(Some(bits)));
229 debug!(section, "initialized section");
230 }
231}
232
233impl<E: BufferPooler + Context, K: Array, V: CodecShared> crate::archive::Archive
234 for Archive<E, K, V>
235{
236 type Key = K;
237 type Value = V;
238
239 async fn put(&mut self, index: u64, key: K, data: V) -> Result<(), Error> {
240 if self.ordinal.has(index) {
242 return Ok(());
243 }
244
245 let section = index / self.items_per_section;
247 let ordinal_key = U64::new(ORDINAL_PREFIX, section);
248 if self.metadata.get(&ordinal_key).is_none() {
249 self.initialize_section(section);
250 }
251 let record = self.metadata.get_mut(&ordinal_key).unwrap();
252
253 let done = if let Record::Ordinal(Some(bits)) = record {
255 bits.set(index % self.items_per_section, true);
256 bits.count_ones() == self.items_per_section
257 } else {
258 false
259 };
260 if done {
261 *record = Record::Ordinal(None);
262 }
263
264 let cursor = self.freezer.put(key, data).await?;
266
267 self.ordinal.put(index, cursor).await?;
269
270 Ok(())
271 }
272
273 async fn get(&self, identifier: Identifier<'_, K>) -> Result<Option<V>, Error> {
274 self.gets.inc();
275
276 match identifier {
277 Identifier::Index(index) => self.get_index(index).await,
278 Identifier::Key(key) => self.get_key(key).await,
279 }
280 }
281
282 async fn has(&self, identifier: Identifier<'_, K>) -> Result<bool, Error> {
283 self.has.inc();
284
285 match identifier {
286 Identifier::Index(index) => Ok(self.ordinal.has(index)),
287 Identifier::Key(key) => self.get_key(key).await.map(|result| result.is_some()),
288 }
289 }
290
291 async fn sync(&mut self) -> Result<(), Error> {
292 self.syncs.inc();
293
294 let (freezer_result, ordinal_result) = join!(self.freezer.sync(), self.ordinal.sync());
296 let checkpoint = freezer_result?;
297 ordinal_result?;
298
299 let freezer_key = U64::new(FREEZER_PREFIX, 0);
302 self.metadata
303 .put_sync(freezer_key, Record::Freezer(checkpoint))
304 .await?;
305
306 Ok(())
307 }
308
309 fn next_gap(&self, index: u64) -> (Option<u64>, Option<u64>) {
310 self.ordinal.next_gap(index)
311 }
312
313 fn missing_items(&self, index: u64, max: usize) -> Vec<u64> {
314 self.ordinal.missing_items(index, max)
315 }
316
317 fn ranges(&self) -> impl Iterator<Item = (u64, u64)> {
318 self.ordinal.ranges()
319 }
320
321 fn ranges_from(&self, from: u64) -> impl Iterator<Item = (u64, u64)> {
322 self.ordinal.ranges_from(from)
323 }
324
325 fn first_index(&self) -> Option<u64> {
326 self.ordinal.first_index()
327 }
328
329 fn last_index(&self) -> Option<u64> {
330 self.ordinal.last_index()
331 }
332
333 async fn destroy(self) -> Result<(), Error> {
334 self.ordinal.destroy().await?;
336
337 self.freezer.destroy().await?;
339
340 self.metadata.destroy().await?;
342
343 Ok(())
344 }
345}
346
// Codec conformance tests for `Record`; compiled only for test builds with the `arbitrary`
// feature enabled.
#[cfg(all(test, feature = "arbitrary"))]
mod conformance {
    use super::*;
    use commonware_codec::conformance::CodecConformance;

    commonware_conformance::conformance_tests! {
        CodecConformance<Record>
    }
}