commonware_storage/archive/immutable/
storage.rs

1use crate::{
2    archive::{immutable::Config, Error, Identifier},
3    freezer::{self, Checkpoint, Cursor, Freezer},
4    metadata::{self, Metadata},
5    ordinal::{self, Ordinal},
6};
7use bytes::{Buf, BufMut};
8use commonware_codec::{Codec, EncodeSize, FixedSize, Read, ReadExt, Write};
9use commonware_runtime::{Clock, Metrics, Storage};
10use commonware_utils::{bitmap::BitMap, sequence::prefixed_u64::U64, Array};
11use futures::join;
12use prometheus_client::metrics::counter::Counter;
13use std::collections::BTreeMap;
14use tracing::debug;
15
/// Prefix for [Freezer] records.
///
/// The [Freezer] [Checkpoint] is stored in [Metadata] under the key built from this prefix and
/// the value `0`.
const FREEZER_PREFIX: u8 = 0;

/// Prefix for [Ordinal] records.
///
/// Each section's [Record::Ordinal] is stored in [Metadata] under the key built from this prefix
/// and the section number.
const ORDINAL_PREFIX: u8 = 1;
21
22/// Item stored in [Metadata] to ensure [Freezer] and [Ordinal] remain consistent.
23#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
24enum Record {
25    Freezer(Checkpoint),
26    Ordinal(Option<BitMap>),
27}
28
29impl Record {
30    /// Get the [Freezer] [Checkpoint] from the [Record].
31    fn freezer(&self) -> &Checkpoint {
32        match self {
33            Self::Freezer(checkpoint) => checkpoint,
34            _ => panic!("incorrect record"),
35        }
36    }
37
38    /// Get the [Ordinal] [BitMap] from the [Record].
39    fn ordinal(&self) -> &Option<BitMap> {
40        match self {
41            Self::Ordinal(indices) => indices,
42            _ => panic!("incorrect record"),
43        }
44    }
45}
46
47impl Write for Record {
48    fn write(&self, buf: &mut impl BufMut) {
49        match self {
50            Self::Freezer(checkpoint) => {
51                buf.put_u8(0);
52                checkpoint.write(buf);
53            }
54            Self::Ordinal(indices) => {
55                buf.put_u8(1);
56                indices.write(buf);
57            }
58        }
59    }
60}
61
62impl Read for Record {
63    type Cfg = ();
64    fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
65        let tag = u8::read(buf)?;
66        match tag {
67            0 => Ok(Self::Freezer(Checkpoint::read(buf)?)),
68            1 => Ok(Self::Ordinal(Option::<BitMap>::read_cfg(
69                buf,
70                &(usize::MAX as u64),
71            )?)),
72            _ => Err(commonware_codec::Error::InvalidEnum(tag)),
73        }
74    }
75}
76
77impl EncodeSize for Record {
78    fn encode_size(&self) -> usize {
79        1 + match self {
80            Self::Freezer(_) => Checkpoint::SIZE,
81            Self::Ordinal(indices) => indices.encode_size(),
82        }
83    }
84}
85
/// An immutable key-value store for ordered data with a minimal memory footprint.
///
/// Values are written to a [Freezer], an [Ordinal] maps each index to the [Cursor] of its value
/// in the [Freezer], and [Metadata] holds the [Record]s used to restore both on initialization.
pub struct Archive<E: Storage + Metrics + Clock, K: Array, V: Codec> {
    /// Number of items per section.
    items_per_section: u64,

    /// Metadata for the archive.
    metadata: Metadata<E, U64, Record>,

    /// Freezer for the archive.
    freezer: Freezer<E, K, V>,

    /// Ordinal for the archive.
    ordinal: Ordinal<E, Cursor>,

    // Metrics
    /// Number of gets performed.
    gets: Counter,
    /// Number of has performed.
    has: Counter,
    /// Number of syncs called.
    syncs: Counter,
}
105
106impl<E: Storage + Metrics + Clock, K: Array, V: Codec> Archive<E, K, V> {
107    /// Initialize a new [Archive] with the given [Config].
108    pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
109        // Initialize metadata
110        let metadata = Metadata::<E, U64, Record>::init(
111            context.with_label("metadata"),
112            metadata::Config {
113                partition: cfg.metadata_partition,
114                codec_config: (),
115            },
116        )
117        .await?;
118
119        // Get checkpoint
120        let freezer_key = U64::new(FREEZER_PREFIX, 0);
121        let checkpoint = metadata.get(&freezer_key).map(|freezer| *freezer.freezer());
122
123        // Initialize table
124        //
125        // TODO (#1227): Use sharded metadata to provide consistency
126        let freezer = Freezer::init_with_checkpoint(
127            context.with_label("freezer"),
128            freezer::Config {
129                journal_partition: cfg.freezer_journal_partition,
130                journal_compression: cfg.freezer_journal_compression,
131                journal_write_buffer: cfg.write_buffer,
132                journal_target_size: cfg.freezer_journal_target_size,
133                journal_buffer_pool: cfg.freezer_journal_buffer_pool,
134                table_partition: cfg.freezer_table_partition,
135                table_initial_size: cfg.freezer_table_initial_size,
136                table_resize_frequency: cfg.freezer_table_resize_frequency,
137                table_resize_chunk_size: cfg.freezer_table_resize_chunk_size,
138                table_replay_buffer: cfg.replay_buffer,
139                codec_config: cfg.codec_config,
140            },
141            checkpoint,
142        )
143        .await?;
144
145        // Collect sections
146        let sections = metadata
147            .keys()
148            .filter(|k| k.prefix() == ORDINAL_PREFIX)
149            .collect::<Vec<_>>();
150        let mut section_bits = BTreeMap::new();
151        for section in sections {
152            // Get record
153            let bits = metadata.get(section).unwrap().ordinal();
154
155            // Get section
156            let section = section.value();
157            section_bits.insert(section, bits);
158        }
159
160        // Initialize ordinal
161        //
162        // TODO (#1227): Use sharded metadata to provide consistency
163        let ordinal = Ordinal::init_with_bits(
164            context.with_label("ordinal"),
165            ordinal::Config {
166                partition: cfg.ordinal_partition,
167                items_per_blob: cfg.items_per_section,
168                write_buffer: cfg.write_buffer,
169                replay_buffer: cfg.replay_buffer,
170            },
171            Some(section_bits),
172        )
173        .await?;
174
175        // Initialize metrics
176        let gets = Counter::default();
177        let has = Counter::default();
178        let syncs = Counter::default();
179        context.register("gets", "Number of gets performed", gets.clone());
180        context.register("has", "Number of has performed", has.clone());
181        context.register("syncs", "Number of syncs called", syncs.clone());
182
183        Ok(Self {
184            items_per_section: cfg.items_per_section.get(),
185            metadata,
186            freezer,
187            ordinal,
188            gets,
189            has,
190            syncs,
191        })
192    }
193
194    /// Get the value for the given index.
195    async fn get_index(&self, index: u64) -> Result<Option<V>, Error> {
196        // Get ordinal
197        let Some(cursor) = self.ordinal.get(index).await? else {
198            return Ok(None);
199        };
200
201        // Get journal entry
202        let result = self
203            .freezer
204            .get(freezer::Identifier::Cursor(cursor))
205            .await?;
206
207        // Get value
208        Ok(result)
209    }
210
211    /// Get the value for the given key.
212    async fn get_key(&self, key: &K) -> Result<Option<V>, Error> {
213        // Get table entry
214        let result = self.freezer.get(freezer::Identifier::Key(key)).await?;
215
216        // Get value
217        Ok(result)
218    }
219
220    /// Initialize the section.
221    async fn initialize_section(&mut self, section: u64) {
222        // Create active bit vector
223        let bits = BitMap::zeroes(self.items_per_section);
224
225        // Store record
226        let key = U64::new(ORDINAL_PREFIX, section);
227        self.metadata.put(key, Record::Ordinal(Some(bits)));
228        debug!(section, "initialized section");
229    }
230}
231
232impl<E: Storage + Metrics + Clock, K: Array, V: Codec> crate::archive::Archive
233    for Archive<E, K, V>
234{
235    type Key = K;
236    type Value = V;
237
238    async fn put(&mut self, index: u64, key: K, data: V) -> Result<(), Error> {
239        // Ignore duplicates
240        if self.ordinal.has(index) {
241            return Ok(());
242        }
243
244        // Initialize section if it doesn't exist
245        let section = index / self.items_per_section;
246        let ordinal_key = U64::new(ORDINAL_PREFIX, section);
247        if self.metadata.get(&ordinal_key).is_none() {
248            self.initialize_section(section).await;
249        }
250        let record = self.metadata.get_mut(&ordinal_key).unwrap();
251
252        // Update active bits
253        let done = if let Record::Ordinal(Some(bits)) = record {
254            bits.set(index % self.items_per_section, true);
255            bits.count_ones() == self.items_per_section
256        } else {
257            false
258        };
259        if done {
260            *record = Record::Ordinal(None);
261        }
262
263        // Put in table
264        let cursor = self.freezer.put(key, data).await?;
265
266        // Put section and offset in ordinal
267        self.ordinal.put(index, cursor).await?;
268
269        Ok(())
270    }
271
272    async fn get(&self, identifier: Identifier<'_, K>) -> Result<Option<V>, Error> {
273        self.gets.inc();
274
275        match identifier {
276            Identifier::Index(index) => self.get_index(index).await,
277            Identifier::Key(key) => self.get_key(key).await,
278        }
279    }
280
281    async fn has(&self, identifier: Identifier<'_, K>) -> Result<bool, Error> {
282        self.has.inc();
283
284        match identifier {
285            Identifier::Index(index) => Ok(self.ordinal.has(index)),
286            Identifier::Key(key) => self.get_key(key).await.map(|result| result.is_some()),
287        }
288    }
289
290    async fn sync(&mut self) -> Result<(), Error> {
291        self.syncs.inc();
292
293        // Sync journal and ordinal
294        let (freezer_result, ordinal_result) = join!(self.freezer.sync(), self.ordinal.sync());
295        let checkpoint = freezer_result?;
296        ordinal_result?;
297
298        // Update checkpoint
299        let freezer_key = U64::new(FREEZER_PREFIX, 0);
300        self.metadata.put(freezer_key, Record::Freezer(checkpoint));
301
302        // Sync metadata once underlying are synced
303        self.metadata.sync().await?;
304
305        Ok(())
306    }
307
    /// Delegates to the [Ordinal], which holds the indices written to this archive.
    fn next_gap(&self, index: u64) -> (Option<u64>, Option<u64>) {
        self.ordinal.next_gap(index)
    }
311
    /// Delegates to the [Ordinal], which holds the indices written to this archive.
    fn missing_items(&self, index: u64, max: usize) -> Vec<u64> {
        self.ordinal.missing_items(index, max)
    }
315
    /// Delegates to the [Ordinal], which holds the indices written to this archive.
    fn ranges(&self) -> impl Iterator<Item = (u64, u64)> {
        self.ordinal.ranges()
    }
319
    /// Delegates to the [Ordinal], which holds the indices written to this archive.
    fn first_index(&self) -> Option<u64> {
        self.ordinal.first_index()
    }
323
    /// Delegates to the [Ordinal], which holds the indices written to this archive.
    fn last_index(&self) -> Option<u64> {
        self.ordinal.last_index()
    }
327
328    async fn close(mut self) -> Result<(), Error> {
329        // Close ordinal
330        self.ordinal.close().await?;
331
332        // Close table
333        let checkpoint = self.freezer.close().await?;
334
335        // Update checkpoint
336        let freezer_key = U64::new(FREEZER_PREFIX, 0);
337        self.metadata.put(freezer_key, Record::Freezer(checkpoint));
338
339        // Close metadata
340        self.metadata.close().await?;
341
342        Ok(())
343    }
344
    /// Destroy the archive by destroying the [Ordinal], the [Freezer], and the [Metadata], in
    /// that order.
    ///
    /// The first error encountered is returned and the remaining stores are not destroyed.
    async fn destroy(self) -> Result<(), Error> {
        // Destroy ordinal
        self.ordinal.destroy().await?;

        // Destroy freezer
        self.freezer.destroy().await?;

        // Destroy metadata
        self.metadata.destroy().await?;

        Ok(())
    }
357}
358
/// Codec conformance tests for [Record].
///
/// Only compiled for test builds with the `arbitrary` feature enabled, since [Record] derives
/// `arbitrary::Arbitrary` only under that feature.
#[cfg(all(test, feature = "arbitrary"))]
mod conformance {
    use super::*;
    use commonware_codec::conformance::CodecConformance;

    // Generates the conformance tests for the [Record] codec.
    commonware_conformance::conformance_tests! {
        CodecConformance<Record>
    }
}