commonware_storage/archive/immutable/
storage.rs

1use crate::{
2    archive::{immutable::Config, Error, Identifier},
3    freezer::{self, Checkpoint, Cursor, Freezer},
4    metadata::{self, Metadata},
5    ordinal::{self, Ordinal},
6};
7use bytes::{Buf, BufMut};
8use commonware_codec::{CodecShared, EncodeSize, FixedSize, Read, ReadExt, Write};
9use commonware_runtime::{Clock, Metrics, Storage};
10use commonware_utils::{bitmap::BitMap, sequence::prefixed_u64::U64, Array};
11use futures::join;
12use prometheus_client::metrics::counter::Counter;
13use std::collections::BTreeMap;
14use tracing::debug;
15
/// Prefix for [Freezer] records.
///
/// Combined with the value `0` to form the [Metadata] key under which the
/// [Freezer] [Checkpoint] is stored.
const FREEZER_PREFIX: u8 = 0;

/// Prefix for [Ordinal] records.
///
/// Combined with a section number to form the [Metadata] key under which that
/// section's [Record::Ordinal] entry is stored.
const ORDINAL_PREFIX: u8 = 1;
21
/// Item stored in [Metadata] to ensure [Freezer] and [Ordinal] remain consistent.
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
enum Record {
    /// The [Checkpoint] produced by a [Freezer] sync.
    Freezer(Checkpoint),
    /// Bitmap of the indices that have been put into one section. Replaced
    /// with `None` once every bit of the section has been set.
    Ordinal(Option<BitMap>),
}
28
29impl Record {
30    /// Get the [Freezer] [Checkpoint] from the [Record].
31    fn freezer(&self) -> &Checkpoint {
32        match self {
33            Self::Freezer(checkpoint) => checkpoint,
34            _ => panic!("incorrect record"),
35        }
36    }
37
38    /// Get the [Ordinal] [BitMap] from the [Record].
39    fn ordinal(&self) -> &Option<BitMap> {
40        match self {
41            Self::Ordinal(indices) => indices,
42            _ => panic!("incorrect record"),
43        }
44    }
45}
46
47impl Write for Record {
48    fn write(&self, buf: &mut impl BufMut) {
49        match self {
50            Self::Freezer(checkpoint) => {
51                buf.put_u8(0);
52                checkpoint.write(buf);
53            }
54            Self::Ordinal(indices) => {
55                buf.put_u8(1);
56                indices.write(buf);
57            }
58        }
59    }
60}
61
62impl Read for Record {
63    type Cfg = ();
64    fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
65        let tag = u8::read(buf)?;
66        match tag {
67            0 => Ok(Self::Freezer(Checkpoint::read(buf)?)),
68            1 => Ok(Self::Ordinal(Option::<BitMap>::read_cfg(
69                buf,
70                &(usize::MAX as u64),
71            )?)),
72            _ => Err(commonware_codec::Error::InvalidEnum(tag)),
73        }
74    }
75}
76
77impl EncodeSize for Record {
78    fn encode_size(&self) -> usize {
79        1 + match self {
80            Self::Freezer(_) => Checkpoint::SIZE,
81            Self::Ordinal(indices) => indices.encode_size(),
82        }
83    }
84}
85
/// An immutable key-value store for ordered data with a minimal memory footprint.
///
/// Values are written to a [Freezer], which returns a [Cursor] for each item;
/// that [Cursor] is stored in an [Ordinal] under the item's index. A
/// [Metadata] store holds the [Freezer] [Checkpoint] and one bitmap per
/// section recording which indices have been put.
pub struct Archive<E: Storage + Metrics + Clock, K: Array, V: CodecShared> {
    /// Number of items per section.
    ///
    /// An index belongs to section `index / items_per_section`.
    items_per_section: u64,

    /// Metadata for the archive.
    metadata: Metadata<E, U64, Record>,

    /// Freezer for the archive.
    freezer: Freezer<E, K, V>,

    /// Ordinal for the archive.
    ordinal: Ordinal<E, Cursor>,

    // Metrics
    /// Incremented on every call to `get`.
    gets: Counter,
    /// Incremented on every call to `has`.
    has: Counter,
    /// Incremented on every call to `sync`.
    syncs: Counter,
}
105
106impl<E: Storage + Metrics + Clock, K: Array, V: CodecShared> Archive<E, K, V> {
107    /// Initialize a new [Archive] with the given [Config].
108    pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
109        // Initialize metadata
110        let metadata = Metadata::<E, U64, Record>::init(
111            context.with_label("metadata"),
112            metadata::Config {
113                partition: cfg.metadata_partition,
114                codec_config: (),
115            },
116        )
117        .await?;
118
119        // Get checkpoint
120        let freezer_key = U64::new(FREEZER_PREFIX, 0);
121        let checkpoint = metadata.get(&freezer_key).map(|freezer| *freezer.freezer());
122
123        // Initialize table
124        //
125        // TODO (#1227): Use sharded metadata to provide consistency
126        let freezer = Freezer::init_with_checkpoint(
127            context.with_label("freezer"),
128            freezer::Config {
129                key_partition: cfg.freezer_key_partition,
130                key_write_buffer: cfg.freezer_key_write_buffer,
131                key_buffer_pool: cfg.freezer_key_buffer_pool,
132                value_partition: cfg.freezer_value_partition,
133                value_compression: cfg.freezer_value_compression,
134                value_write_buffer: cfg.freezer_value_write_buffer,
135                value_target_size: cfg.freezer_value_target_size,
136                table_partition: cfg.freezer_table_partition,
137                table_initial_size: cfg.freezer_table_initial_size,
138                table_resize_frequency: cfg.freezer_table_resize_frequency,
139                table_resize_chunk_size: cfg.freezer_table_resize_chunk_size,
140                table_replay_buffer: cfg.replay_buffer,
141                codec_config: cfg.codec_config,
142            },
143            checkpoint,
144        )
145        .await?;
146
147        // Collect sections
148        let sections = metadata
149            .keys()
150            .filter(|k| k.prefix() == ORDINAL_PREFIX)
151            .collect::<Vec<_>>();
152        let mut section_bits = BTreeMap::new();
153        for section in sections {
154            // Get record
155            let bits = metadata.get(section).unwrap().ordinal();
156
157            // Get section
158            let section = section.value();
159            section_bits.insert(section, bits);
160        }
161
162        // Initialize ordinal
163        //
164        // TODO (#1227): Use sharded metadata to provide consistency
165        let ordinal = Ordinal::init_with_bits(
166            context.with_label("ordinal"),
167            ordinal::Config {
168                partition: cfg.ordinal_partition,
169                items_per_blob: cfg.items_per_section,
170                write_buffer: cfg.ordinal_write_buffer,
171                replay_buffer: cfg.replay_buffer,
172            },
173            Some(section_bits),
174        )
175        .await?;
176
177        // Initialize metrics
178        let gets = Counter::default();
179        let has = Counter::default();
180        let syncs = Counter::default();
181        context.register("gets", "Number of gets performed", gets.clone());
182        context.register("has", "Number of has performed", has.clone());
183        context.register("syncs", "Number of syncs called", syncs.clone());
184
185        Ok(Self {
186            items_per_section: cfg.items_per_section.get(),
187            metadata,
188            freezer,
189            ordinal,
190            gets,
191            has,
192            syncs,
193        })
194    }
195
196    /// Get the value for the given index.
197    async fn get_index(&self, index: u64) -> Result<Option<V>, Error> {
198        // Get ordinal
199        let Some(cursor) = self.ordinal.get(index).await? else {
200            return Ok(None);
201        };
202
203        // Get journal entry
204        let result = self
205            .freezer
206            .get(freezer::Identifier::Cursor(cursor))
207            .await?;
208
209        // Get value
210        Ok(result)
211    }
212
213    /// Get the value for the given key.
214    async fn get_key(&self, key: &K) -> Result<Option<V>, Error> {
215        // Get table entry
216        let result = self.freezer.get(freezer::Identifier::Key(key)).await?;
217
218        // Get value
219        Ok(result)
220    }
221
222    /// Initialize the section.
223    async fn initialize_section(&mut self, section: u64) {
224        // Create active bit vector
225        let bits = BitMap::zeroes(self.items_per_section);
226
227        // Store record
228        let key = U64::new(ORDINAL_PREFIX, section);
229        self.metadata.put(key, Record::Ordinal(Some(bits)));
230        debug!(section, "initialized section");
231    }
232}
233
234impl<E: Storage + Metrics + Clock, K: Array, V: CodecShared> crate::archive::Archive
235    for Archive<E, K, V>
236{
237    type Key = K;
238    type Value = V;
239
240    async fn put(&mut self, index: u64, key: K, data: V) -> Result<(), Error> {
241        // Ignore duplicates
242        if self.ordinal.has(index) {
243            return Ok(());
244        }
245
246        // Initialize section if it doesn't exist
247        let section = index / self.items_per_section;
248        let ordinal_key = U64::new(ORDINAL_PREFIX, section);
249        if self.metadata.get(&ordinal_key).is_none() {
250            self.initialize_section(section).await;
251        }
252        let record = self.metadata.get_mut(&ordinal_key).unwrap();
253
254        // Update active bits
255        let done = if let Record::Ordinal(Some(bits)) = record {
256            bits.set(index % self.items_per_section, true);
257            bits.count_ones() == self.items_per_section
258        } else {
259            false
260        };
261        if done {
262            *record = Record::Ordinal(None);
263        }
264
265        // Put in table
266        let cursor = self.freezer.put(key, data).await?;
267
268        // Put section and offset in ordinal
269        self.ordinal.put(index, cursor).await?;
270
271        Ok(())
272    }
273
274    async fn get(&self, identifier: Identifier<'_, K>) -> Result<Option<V>, Error> {
275        self.gets.inc();
276
277        match identifier {
278            Identifier::Index(index) => self.get_index(index).await,
279            Identifier::Key(key) => self.get_key(key).await,
280        }
281    }
282
283    async fn has(&self, identifier: Identifier<'_, K>) -> Result<bool, Error> {
284        self.has.inc();
285
286        match identifier {
287            Identifier::Index(index) => Ok(self.ordinal.has(index)),
288            Identifier::Key(key) => self.get_key(key).await.map(|result| result.is_some()),
289        }
290    }
291
292    async fn sync(&mut self) -> Result<(), Error> {
293        self.syncs.inc();
294
295        // Sync journal and ordinal
296        let (freezer_result, ordinal_result) = join!(self.freezer.sync(), self.ordinal.sync());
297        let checkpoint = freezer_result?;
298        ordinal_result?;
299
300        // Update checkpoint
301        let freezer_key = U64::new(FREEZER_PREFIX, 0);
302        self.metadata.put(freezer_key, Record::Freezer(checkpoint));
303
304        // Sync metadata once underlying are synced
305        self.metadata.sync().await?;
306
307        Ok(())
308    }
309
    /// Forwards to the underlying [Ordinal]'s `next_gap` for `index`.
    fn next_gap(&self, index: u64) -> (Option<u64>, Option<u64>) {
        self.ordinal.next_gap(index)
    }
313
    /// Forwards to the underlying [Ordinal]'s `missing_items` with the same
    /// `index` and `max`.
    fn missing_items(&self, index: u64, max: usize) -> Vec<u64> {
        self.ordinal.missing_items(index, max)
    }
317
    /// Forwards to the underlying [Ordinal]'s `ranges`.
    fn ranges(&self) -> impl Iterator<Item = (u64, u64)> {
        self.ordinal.ranges()
    }
321
    /// Forwards to the underlying [Ordinal]'s `first_index`.
    fn first_index(&self) -> Option<u64> {
        self.ordinal.first_index()
    }
325
    /// Forwards to the underlying [Ordinal]'s `last_index`.
    fn last_index(&self) -> Option<u64> {
        self.ordinal.last_index()
    }
329
330    async fn destroy(self) -> Result<(), Error> {
331        // Destroy ordinal
332        self.ordinal.destroy().await?;
333
334        // Destroy freezer
335        self.freezer.destroy().await?;
336
337        // Destroy metadata
338        self.metadata.destroy().await?;
339
340        Ok(())
341    }
342}
343
/// Codec conformance tests for [Record].
///
/// Only compiled for test builds with the `arbitrary` feature enabled.
#[cfg(all(test, feature = "arbitrary"))]
mod conformance {
    use super::*;
    use commonware_codec::conformance::CodecConformance;

    commonware_conformance::conformance_tests! {
        CodecConformance<Record>
    }
}