Skip to main content

commonware_storage/archive/immutable/
storage.rs

1use crate::{
2    archive::{immutable::Config, Error, Identifier},
3    freezer::{self, Checkpoint, Cursor, Freezer},
4    metadata::{self, Metadata},
5    ordinal::{self, Ordinal},
6};
7use commonware_codec::{CodecShared, EncodeSize, FixedSize, Read, ReadExt, Write};
8use commonware_runtime::{Buf, BufMut, Clock, Metrics, Storage};
9use commonware_utils::{bitmap::BitMap, sequence::prefixed_u64::U64, Array};
10use futures::join;
11use prometheus_client::metrics::counter::Counter;
12use std::collections::BTreeMap;
13use tracing::debug;
14
15/// Prefix for [Freezer] records.
16const FREEZER_PREFIX: u8 = 0;
17
18/// Prefix for [Ordinal] records.
19const ORDINAL_PREFIX: u8 = 1;
20
21/// Item stored in [Metadata] to ensure [Freezer] and [Ordinal] remain consistent.
22#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
23enum Record {
24    Freezer(Checkpoint),
25    Ordinal(Option<BitMap>),
26}
27
28impl Record {
29    /// Get the [Freezer] [Checkpoint] from the [Record].
30    fn freezer(&self) -> &Checkpoint {
31        match self {
32            Self::Freezer(checkpoint) => checkpoint,
33            _ => panic!("incorrect record"),
34        }
35    }
36
37    /// Get the [Ordinal] [BitMap] from the [Record].
38    fn ordinal(&self) -> &Option<BitMap> {
39        match self {
40            Self::Ordinal(indices) => indices,
41            _ => panic!("incorrect record"),
42        }
43    }
44}
45
46impl Write for Record {
47    fn write(&self, buf: &mut impl BufMut) {
48        match self {
49            Self::Freezer(checkpoint) => {
50                buf.put_u8(0);
51                checkpoint.write(buf);
52            }
53            Self::Ordinal(indices) => {
54                buf.put_u8(1);
55                indices.write(buf);
56            }
57        }
58    }
59}
60
61impl Read for Record {
62    type Cfg = ();
63    fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
64        let tag = u8::read(buf)?;
65        match tag {
66            0 => Ok(Self::Freezer(Checkpoint::read(buf)?)),
67            1 => Ok(Self::Ordinal(Option::<BitMap>::read_cfg(
68                buf,
69                &(usize::MAX as u64),
70            )?)),
71            _ => Err(commonware_codec::Error::InvalidEnum(tag)),
72        }
73    }
74}
75
76impl EncodeSize for Record {
77    fn encode_size(&self) -> usize {
78        1 + match self {
79            Self::Freezer(_) => Checkpoint::SIZE,
80            Self::Ordinal(indices) => indices.encode_size(),
81        }
82    }
83}
84
85/// An immutable key-value store for ordered data with a minimal memory footprint.
86pub struct Archive<E: Storage + Metrics + Clock, K: Array, V: CodecShared> {
87    /// Number of items per section.
88    items_per_section: u64,
89
90    /// Metadata for the archive.
91    metadata: Metadata<E, U64, Record>,
92
93    /// Freezer for the archive.
94    freezer: Freezer<E, K, V>,
95
96    /// Ordinal for the archive.
97    ordinal: Ordinal<E, Cursor>,
98
99    // Metrics
100    gets: Counter,
101    has: Counter,
102    syncs: Counter,
103}
104
105impl<E: Storage + Metrics + Clock, K: Array, V: CodecShared> Archive<E, K, V> {
106    /// Initialize a new [Archive] with the given [Config].
107    pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
108        // Initialize metadata
109        let metadata = Metadata::<E, U64, Record>::init(
110            context.with_label("metadata"),
111            metadata::Config {
112                partition: cfg.metadata_partition,
113                codec_config: (),
114            },
115        )
116        .await?;
117
118        // Get checkpoint
119        let freezer_key = U64::new(FREEZER_PREFIX, 0);
120        let checkpoint = metadata.get(&freezer_key).map(|freezer| *freezer.freezer());
121
122        // Initialize table
123        //
124        // TODO (#1227): Use sharded metadata to provide consistency
125        let freezer = Freezer::init_with_checkpoint(
126            context.with_label("freezer"),
127            freezer::Config {
128                key_partition: cfg.freezer_key_partition,
129                key_write_buffer: cfg.freezer_key_write_buffer,
130                key_page_cache: cfg.freezer_key_page_cache,
131                value_partition: cfg.freezer_value_partition,
132                value_compression: cfg.freezer_value_compression,
133                value_write_buffer: cfg.freezer_value_write_buffer,
134                value_target_size: cfg.freezer_value_target_size,
135                table_partition: cfg.freezer_table_partition,
136                table_initial_size: cfg.freezer_table_initial_size,
137                table_resize_frequency: cfg.freezer_table_resize_frequency,
138                table_resize_chunk_size: cfg.freezer_table_resize_chunk_size,
139                table_replay_buffer: cfg.replay_buffer,
140                codec_config: cfg.codec_config,
141            },
142            checkpoint,
143        )
144        .await?;
145
146        // Collect sections
147        let sections = metadata
148            .keys()
149            .filter(|k| k.prefix() == ORDINAL_PREFIX)
150            .collect::<Vec<_>>();
151        let mut section_bits = BTreeMap::new();
152        for section in sections {
153            // Get record
154            let bits = metadata.get(section).unwrap().ordinal();
155
156            // Get section
157            let section = section.value();
158            section_bits.insert(section, bits);
159        }
160
161        // Initialize ordinal
162        //
163        // TODO (#1227): Use sharded metadata to provide consistency
164        let ordinal = Ordinal::init_with_bits(
165            context.with_label("ordinal"),
166            ordinal::Config {
167                partition: cfg.ordinal_partition,
168                items_per_blob: cfg.items_per_section,
169                write_buffer: cfg.ordinal_write_buffer,
170                replay_buffer: cfg.replay_buffer,
171            },
172            Some(section_bits),
173        )
174        .await?;
175
176        // Initialize metrics
177        let gets = Counter::default();
178        let has = Counter::default();
179        let syncs = Counter::default();
180        context.register("gets", "Number of gets performed", gets.clone());
181        context.register("has", "Number of has performed", has.clone());
182        context.register("syncs", "Number of syncs called", syncs.clone());
183
184        Ok(Self {
185            items_per_section: cfg.items_per_section.get(),
186            metadata,
187            freezer,
188            ordinal,
189            gets,
190            has,
191            syncs,
192        })
193    }
194
195    /// Get the value for the given index.
196    async fn get_index(&self, index: u64) -> Result<Option<V>, Error> {
197        // Get ordinal
198        let Some(cursor) = self.ordinal.get(index).await? else {
199            return Ok(None);
200        };
201
202        // Get journal entry
203        let result = self
204            .freezer
205            .get(freezer::Identifier::Cursor(cursor))
206            .await?;
207
208        // Get value
209        Ok(result)
210    }
211
212    /// Get the value for the given key.
213    async fn get_key(&self, key: &K) -> Result<Option<V>, Error> {
214        // Get table entry
215        let result = self.freezer.get(freezer::Identifier::Key(key)).await?;
216
217        // Get value
218        Ok(result)
219    }
220
221    /// Initialize the section.
222    async fn initialize_section(&mut self, section: u64) {
223        // Create active bit vector
224        let bits = BitMap::zeroes(self.items_per_section);
225
226        // Store record
227        let key = U64::new(ORDINAL_PREFIX, section);
228        self.metadata.put(key, Record::Ordinal(Some(bits)));
229        debug!(section, "initialized section");
230    }
231}
232
233impl<E: Storage + Metrics + Clock, K: Array, V: CodecShared> crate::archive::Archive
234    for Archive<E, K, V>
235{
236    type Key = K;
237    type Value = V;
238
239    async fn put(&mut self, index: u64, key: K, data: V) -> Result<(), Error> {
240        // Ignore duplicates
241        if self.ordinal.has(index) {
242            return Ok(());
243        }
244
245        // Initialize section if it doesn't exist
246        let section = index / self.items_per_section;
247        let ordinal_key = U64::new(ORDINAL_PREFIX, section);
248        if self.metadata.get(&ordinal_key).is_none() {
249            self.initialize_section(section).await;
250        }
251        let record = self.metadata.get_mut(&ordinal_key).unwrap();
252
253        // Update active bits
254        let done = if let Record::Ordinal(Some(bits)) = record {
255            bits.set(index % self.items_per_section, true);
256            bits.count_ones() == self.items_per_section
257        } else {
258            false
259        };
260        if done {
261            *record = Record::Ordinal(None);
262        }
263
264        // Put in table
265        let cursor = self.freezer.put(key, data).await?;
266
267        // Put section and offset in ordinal
268        self.ordinal.put(index, cursor).await?;
269
270        Ok(())
271    }
272
273    async fn get(&self, identifier: Identifier<'_, K>) -> Result<Option<V>, Error> {
274        self.gets.inc();
275
276        match identifier {
277            Identifier::Index(index) => self.get_index(index).await,
278            Identifier::Key(key) => self.get_key(key).await,
279        }
280    }
281
282    async fn has(&self, identifier: Identifier<'_, K>) -> Result<bool, Error> {
283        self.has.inc();
284
285        match identifier {
286            Identifier::Index(index) => Ok(self.ordinal.has(index)),
287            Identifier::Key(key) => self.get_key(key).await.map(|result| result.is_some()),
288        }
289    }
290
291    async fn sync(&mut self) -> Result<(), Error> {
292        self.syncs.inc();
293
294        // Sync journal and ordinal
295        let (freezer_result, ordinal_result) = join!(self.freezer.sync(), self.ordinal.sync());
296        let checkpoint = freezer_result?;
297        ordinal_result?;
298
299        // Update checkpoint
300        let freezer_key = U64::new(FREEZER_PREFIX, 0);
301        self.metadata.put(freezer_key, Record::Freezer(checkpoint));
302
303        // Sync metadata once underlying are synced
304        self.metadata.sync().await?;
305
306        Ok(())
307    }
308
309    fn next_gap(&self, index: u64) -> (Option<u64>, Option<u64>) {
310        self.ordinal.next_gap(index)
311    }
312
313    fn missing_items(&self, index: u64, max: usize) -> Vec<u64> {
314        self.ordinal.missing_items(index, max)
315    }
316
317    fn ranges(&self) -> impl Iterator<Item = (u64, u64)> {
318        self.ordinal.ranges()
319    }
320
321    fn first_index(&self) -> Option<u64> {
322        self.ordinal.first_index()
323    }
324
325    fn last_index(&self) -> Option<u64> {
326        self.ordinal.last_index()
327    }
328
329    async fn destroy(self) -> Result<(), Error> {
330        // Destroy ordinal
331        self.ordinal.destroy().await?;
332
333        // Destroy freezer
334        self.freezer.destroy().await?;
335
336        // Destroy metadata
337        self.metadata.destroy().await?;
338
339        Ok(())
340    }
341}
342
/// Codec conformance checks for [Record].
///
/// Compiled only for test builds with the `arbitrary` feature enabled, which is the
/// same feature that gates [Record]'s `Arbitrary` derive.
#[cfg(all(test, feature = "arbitrary"))]
mod conformance {
    use super::*;
    use commonware_codec::conformance::CodecConformance;

    // Generates the conformance tests for `Record`'s codec implementation.
    commonware_conformance::conformance_tests! {
        CodecConformance<Record>
    }
}
351}