commonware_storage/archive/immutable/
storage.rs

1use crate::{
2    archive::{immutable::Config, Error, Identifier},
3    freezer::{self, Checkpoint, Cursor, Freezer},
4    metadata::{self, Metadata},
5    ordinal::{self, Ordinal},
6};
7use bytes::{Buf, BufMut};
8use commonware_codec::{Codec, EncodeSize, FixedSize, Read, ReadExt, Write};
9use commonware_runtime::{Clock, Metrics, Storage};
10use commonware_utils::{sequence::prefixed_u64::U64, Array, BitVec};
11use futures::join;
12use prometheus_client::metrics::counter::Counter;
13use std::collections::BTreeMap;
14use tracing::debug;
15
/// Prefix for [Freezer] records.
///
/// Used as the prefix byte of the [Metadata] key (`U64::new(FREEZER_PREFIX, 0)`) under which
/// the latest [Freezer] [Checkpoint] is stored.
const FREEZER_PREFIX: u8 = 0;

/// Prefix for [Ordinal] records.
///
/// Used as the prefix byte of the [Metadata] keys (`U64::new(ORDINAL_PREFIX, section)`) under
/// which each section's [BitVec] is stored.
const ORDINAL_PREFIX: u8 = 1;
21
/// Item stored in [Metadata] to ensure [Freezer] and [Ordinal] remain consistent.
///
/// Which variant is stored under a key is determined by the key's prefix:
/// `FREEZER_PREFIX` keys hold [Record::Freezer] and `ORDINAL_PREFIX` keys hold
/// [Record::Ordinal].
enum Record {
    /// The [Checkpoint] returned by the most recent [Freezer] sync or close. It is handed
    /// back to the [Freezer] when the archive is initialized.
    Freezer(Checkpoint),
    /// Bits tracking which indices of a section have been put (one bit per item in the
    /// section). Replaced with `None` once every bit in the section has been set.
    Ordinal(Option<BitVec>),
}
27
28impl Record {
29    /// Get the [Freezer] [Checkpoint] from the [Record].
30    fn freezer(&self) -> &Checkpoint {
31        match self {
32            Self::Freezer(checkpoint) => checkpoint,
33            _ => panic!("incorrect record"),
34        }
35    }
36
37    /// Get the [Ordinal] [BitVec] from the [Record].
38    fn ordinal(&self) -> &Option<BitVec> {
39        match self {
40            Self::Ordinal(indices) => indices,
41            _ => panic!("incorrect record"),
42        }
43    }
44}
45
46impl Write for Record {
47    fn write(&self, buf: &mut impl BufMut) {
48        match self {
49            Self::Freezer(checkpoint) => {
50                buf.put_u8(0);
51                checkpoint.write(buf);
52            }
53            Self::Ordinal(indices) => {
54                buf.put_u8(1);
55                indices.write(buf);
56            }
57        }
58    }
59}
60
61impl Read for Record {
62    type Cfg = ();
63    fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
64        let tag = u8::read(buf)?;
65        match tag {
66            0 => Ok(Self::Freezer(Checkpoint::read(buf)?)),
67            1 => Ok(Self::Ordinal(Option::<BitVec>::read_cfg(
68                buf,
69                &(0..=usize::MAX).into(),
70            )?)),
71            _ => Err(commonware_codec::Error::InvalidEnum(tag)),
72        }
73    }
74}
75
76impl EncodeSize for Record {
77    fn encode_size(&self) -> usize {
78        1 + match self {
79            Self::Freezer(_) => Checkpoint::SIZE,
80            Self::Ordinal(indices) => indices.encode_size(),
81        }
82    }
83}
84
/// An immutable key-value store for ordered data with a minimal memory footprint.
///
/// Values are written to a [Freezer] (addressable by key), and the [Cursor] returned by the
/// [Freezer] is written to an [Ordinal] (addressable by index). [Metadata] stores the latest
/// [Freezer] [Checkpoint] and a per-section [BitVec] of the indices that have been put.
pub struct Archive<E: Storage + Metrics + Clock, K: Array, V: Codec> {
    /// Number of items per section.
    ///
    /// An index belongs to section `index / items_per_section` and occupies bit
    /// `index % items_per_section` of that section's [BitVec].
    items_per_section: u64,

    /// Metadata for the archive.
    ///
    /// Holds the [Freezer] [Checkpoint] under the `FREEZER_PREFIX` key and one
    /// [Record::Ordinal] per section under `ORDINAL_PREFIX` keys.
    metadata: Metadata<E, U64, Record>,

    /// Freezer for the archive.
    ///
    /// Stores each key/value pair and returns a [Cursor] for it.
    freezer: Freezer<E, K, V>,

    /// Ordinal for the archive.
    ///
    /// Maps an index to the [Cursor] of its value in the [Freezer].
    ordinal: Ordinal<E, Cursor>,

    // Metrics
    //
    // Each counter is incremented once per call to the trait method of the same purpose
    // (`get`, `has`, `sync`).
    gets: Counter,
    has: Counter,
    syncs: Counter,
}
104
105impl<E: Storage + Metrics + Clock, K: Array, V: Codec> Archive<E, K, V> {
106    /// Initialize a new [Archive] with the given [Config].
107    pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
108        // Initialize metadata
109        let metadata = Metadata::<E, U64, Record>::init(
110            context.with_label("metadata"),
111            metadata::Config {
112                partition: cfg.metadata_partition,
113                codec_config: (),
114            },
115        )
116        .await?;
117
118        // Get checkpoint
119        let freezer_key = U64::new(FREEZER_PREFIX, 0);
120        let checkpoint = metadata.get(&freezer_key).map(|freezer| *freezer.freezer());
121
122        // Initialize table
123        //
124        // TODO (#1227): Use sharded metadata to provide consistency
125        let freezer = Freezer::init_with_checkpoint(
126            context.with_label("freezer"),
127            freezer::Config {
128                journal_partition: cfg.freezer_journal_partition,
129                journal_compression: cfg.freezer_journal_compression,
130                journal_write_buffer: cfg.write_buffer,
131                journal_target_size: cfg.freezer_journal_target_size,
132                journal_buffer_pool: cfg.freezer_journal_buffer_pool,
133                table_partition: cfg.freezer_table_partition,
134                table_initial_size: cfg.freezer_table_initial_size,
135                table_resize_frequency: cfg.freezer_table_resize_frequency,
136                table_resize_chunk_size: cfg.freezer_table_resize_chunk_size,
137                table_replay_buffer: cfg.replay_buffer,
138                codec_config: cfg.codec_config,
139            },
140            checkpoint,
141        )
142        .await?;
143
144        // Collect sections
145        let sections = metadata.keys(Some(&[ORDINAL_PREFIX])).collect::<Vec<_>>();
146        let mut section_bits = BTreeMap::new();
147        for section in sections {
148            // Get record
149            let bits = metadata.get(section).unwrap().ordinal();
150
151            // Get section
152            let section = section.value();
153            section_bits.insert(section, bits);
154        }
155
156        // Initialize ordinal
157        //
158        // TODO (#1227): Use sharded metadata to provide consistency
159        let ordinal = Ordinal::init_with_bits(
160            context.with_label("ordinal"),
161            ordinal::Config {
162                partition: cfg.ordinal_partition,
163                items_per_blob: cfg.items_per_section,
164                write_buffer: cfg.write_buffer,
165                replay_buffer: cfg.replay_buffer,
166            },
167            Some(section_bits),
168        )
169        .await?;
170
171        // Initialize metrics
172        let gets = Counter::default();
173        let has = Counter::default();
174        let syncs = Counter::default();
175        context.register("gets", "Number of gets performed", gets.clone());
176        context.register("has", "Number of has performed", has.clone());
177        context.register("syncs", "Number of syncs called", syncs.clone());
178
179        Ok(Self {
180            items_per_section: cfg.items_per_section.get(),
181            metadata,
182            freezer,
183            ordinal,
184            gets,
185            has,
186            syncs,
187        })
188    }
189
190    /// Get the value for the given index.
191    async fn get_index(&self, index: u64) -> Result<Option<V>, Error> {
192        // Get ordinal
193        let Some(cursor) = self.ordinal.get(index).await? else {
194            return Ok(None);
195        };
196
197        // Get journal entry
198        let result = self
199            .freezer
200            .get(freezer::Identifier::Cursor(cursor))
201            .await?;
202
203        // Get value
204        Ok(result)
205    }
206
207    /// Get the value for the given key.
208    async fn get_key(&self, key: &K) -> Result<Option<V>, Error> {
209        // Get table entry
210        let result = self.freezer.get(freezer::Identifier::Key(key)).await?;
211
212        // Get value
213        Ok(result)
214    }
215
216    /// Initialize the section.
217    async fn initialize_section(&mut self, section: u64) {
218        // Create active bit vector
219        let bits = BitVec::zeroes(self.items_per_section as usize);
220
221        // Store record
222        let key = U64::new(ORDINAL_PREFIX, section);
223        self.metadata.put(key, Record::Ordinal(Some(bits)));
224        debug!(section, "initialized section");
225    }
226}
227
228impl<E: Storage + Metrics + Clock, K: Array, V: Codec> crate::archive::Archive
229    for Archive<E, K, V>
230{
231    type Key = K;
232    type Value = V;
233
234    async fn put(&mut self, index: u64, key: K, data: V) -> Result<(), Error> {
235        // Ignore duplicates
236        if self.ordinal.has(index) {
237            return Ok(());
238        }
239
240        // Initialize section if it doesn't exist
241        let section = index / self.items_per_section;
242        let ordinal_key = U64::new(ORDINAL_PREFIX, section);
243        if self.metadata.get(&ordinal_key).is_none() {
244            self.initialize_section(section).await;
245        }
246        let record = self.metadata.get_mut(&ordinal_key).unwrap();
247
248        // Update active bits
249        let done = if let Record::Ordinal(Some(bits)) = record {
250            bits.set((index % self.items_per_section) as usize);
251            bits.count_ones() == self.items_per_section as usize
252        } else {
253            false
254        };
255        if done {
256            *record = Record::Ordinal(None);
257        }
258
259        // Put in table
260        let cursor = self.freezer.put(key, data).await?;
261
262        // Put section and offset in ordinal
263        self.ordinal.put(index, cursor).await?;
264
265        Ok(())
266    }
267
268    async fn get(&self, identifier: Identifier<'_, K>) -> Result<Option<V>, Error> {
269        self.gets.inc();
270
271        match identifier {
272            Identifier::Index(index) => self.get_index(index).await,
273            Identifier::Key(key) => self.get_key(key).await,
274        }
275    }
276
277    async fn has(&self, identifier: Identifier<'_, K>) -> Result<bool, Error> {
278        self.has.inc();
279
280        match identifier {
281            Identifier::Index(index) => Ok(self.ordinal.has(index)),
282            Identifier::Key(key) => self.get_key(key).await.map(|result| result.is_some()),
283        }
284    }
285
286    async fn sync(&mut self) -> Result<(), Error> {
287        self.syncs.inc();
288
289        // Sync journal and ordinal
290        let (freezer_result, ordinal_result) = join!(self.freezer.sync(), self.ordinal.sync());
291        let checkpoint = freezer_result?;
292        ordinal_result?;
293
294        // Update checkpoint
295        let freezer_key = U64::new(FREEZER_PREFIX, 0);
296        self.metadata.put(freezer_key, Record::Freezer(checkpoint));
297
298        // Sync metadata once underlying are synced
299        self.metadata.sync().await?;
300
301        Ok(())
302    }
303
    /// Report the next gap around `index`, as computed by the [Ordinal].
    ///
    /// This is a direct delegation to `Ordinal::next_gap`; the meaning of the two returned
    /// values is defined there.
    // NOTE(review): presumably (end of the range containing `index`, start of the next
    // range) -- confirm against the `Ordinal::next_gap` documentation.
    fn next_gap(&self, index: u64) -> (Option<u64>, Option<u64>) {
        self.ordinal.next_gap(index)
    }
307
308    async fn close(mut self) -> Result<(), Error> {
309        // Close ordinal
310        self.ordinal.close().await?;
311
312        // Close table
313        let checkpoint = self.freezer.close().await?;
314
315        // Update checkpoint
316        let freezer_key = U64::new(FREEZER_PREFIX, 0);
317        self.metadata.put(freezer_key, Record::Freezer(checkpoint));
318
319        // Close metadata
320        self.metadata.close().await?;
321
322        Ok(())
323    }
324
325    async fn destroy(self) -> Result<(), Error> {
326        // Destroy ordinal
327        self.ordinal.destroy().await?;
328
329        // Destroy freezer
330        self.freezer.destroy().await?;
331
332        // Destroy metadata
333        self.metadata.destroy().await?;
334
335        Ok(())
336    }
337}