commonware_storage/ordinal/
storage.rs

1use super::{Config, Error};
2use crate::{kv, rmap::RMap, Persistable};
3use bytes::{Buf, BufMut};
4use commonware_codec::{
5    CodecFixed, CodecFixedShared, Encode, FixedSize, Read, ReadExt, Write as CodecWrite,
6};
7use commonware_cryptography::{crc32, Crc32};
8use commonware_runtime::{
9    buffer::{Read as ReadBuffer, Write},
10    Blob, Clock, Error as RError, Metrics, Storage,
11};
12use commonware_utils::{bitmap::BitMap, hex};
13use futures::future::try_join_all;
14use prometheus_client::metrics::counter::Counter;
15use std::{
16    collections::{btree_map::Entry, BTreeMap, BTreeSet},
17    marker::PhantomData,
18};
19use tracing::{debug, warn};
20
/// Value stored in the index file.
///
/// Each fixed-size slot in an index blob holds the encoded value followed by a
/// CRC32 checksum of that encoding, so torn, unwritten, or corrupted slots can
/// be detected when the index is replayed on startup.
#[derive(Debug, Clone)]
struct Record<V: CodecFixed<Cfg = ()>> {
    // Fixed-size value payload.
    value: V,
    // CRC32 over the value's canonical encoding (see `Record::new`).
    crc: u32,
}
27
28impl<V: CodecFixed<Cfg = ()>> Record<V> {
29    fn new(value: V) -> Self {
30        let crc = Crc32::checksum(&value.encode());
31        Self { value, crc }
32    }
33
34    fn is_valid(&self) -> bool {
35        self.crc == Crc32::checksum(&self.value.encode())
36    }
37}
38
// A record's on-disk footprint: the value's fixed encoding plus a CRC32 digest.
impl<V: CodecFixed<Cfg = ()>> FixedSize for Record<V> {
    const SIZE: usize = V::SIZE + crc32::Digest::SIZE;
}
42
impl<V: CodecFixed<Cfg = ()>> CodecWrite for Record<V> {
    // Wire format: value bytes first, then the CRC. Must mirror the order used
    // by the `Read` impl for this type.
    fn write(&self, buf: &mut impl BufMut) {
        self.value.write(buf);
        self.crc.write(buf);
    }
}
49
50impl<V: CodecFixed<Cfg = ()>> Read for Record<V> {
51    type Cfg = ();
52
53    fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
54        let value = V::read(buf)?;
55        let crc = u32::read(buf)?;
56
57        Ok(Self { value, crc })
58    }
59}
60
#[cfg(feature = "arbitrary")]
impl<V: CodecFixed<Cfg = ()>> arbitrary::Arbitrary<'_> for Record<V>
where
    V: for<'a> arbitrary::Arbitrary<'a>,
{
    // Generate an arbitrary value and derive a matching (always-valid) CRC via
    // `Record::new`, so fuzzed records round-trip through the codec.
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        let value = V::arbitrary(u)?;
        Ok(Self::new(value))
    }
}
71
/// Implementation of [Ordinal].
///
/// Stores fixed-size records at dense `u64` indices across a series of
/// section blobs (one blob per `items_per_blob` indices) and tracks which
/// indices are populated with an interval map.
pub struct Ordinal<E: Storage + Metrics + Clock, V: CodecFixed<Cfg = ()>> {
    // Configuration and context
    context: E,
    config: Config,

    // Index blobs for storing key records, keyed by section number
    // (section = index / items_per_blob).
    blobs: BTreeMap<u64, Write<E::Blob>>,

    // RMap for interval tracking: which indices currently hold valid records
    intervals: RMap,

    // Pending index entries to be synced, grouped by section; cleared by `sync`
    pending: BTreeSet<u64>,

    // Metrics
    puts: Counter,
    gets: Counter,
    has: Counter,
    syncs: Counter,
    pruned: Counter,

    // Marks the record value type without storing one.
    _phantom: PhantomData<V>,
}
96
impl<E: Storage + Metrics + Clock, V: CodecFixed<Cfg = ()>> Ordinal<E, V> {
    /// Initialize a new [Ordinal] instance.
    ///
    /// Equivalent to [Self::init_with_bits] with no bitmaps: every valid record
    /// found on disk is considered available.
    pub async fn init(context: E, config: Config) -> Result<Self, Error> {
        Self::init_with_bits(context, config, None).await
    }

    /// Initialize a new [Ordinal] instance with a collection of [BitMap]s (indicating which
    /// records should be considered available).
    ///
    /// If a section is not provided in the [BTreeMap], all records in that section are considered
    /// unavailable. If a [BitMap] is provided for a section, all records in that section are
    /// considered available if and only if the [BitMap] is set for the record. If a section is provided
    /// but no [BitMap] is populated, all records in that section are considered available.
    ///
    /// On startup this replays every retained blob to rebuild the interval map,
    /// truncating any trailing partial record (e.g. from a crash mid-write).
    // TODO(#1227): Hide this complexity from the caller.
    pub async fn init_with_bits(
        context: E,
        config: Config,
        bits: Option<BTreeMap<u64, &Option<BitMap>>>,
    ) -> Result<Self, Error> {
        // Scan for all blobs in the partition (a missing partition is treated
        // as an empty store rather than an error)
        let mut blobs = BTreeMap::new();
        let stored_blobs = match context.scan(&config.partition).await {
            Ok(blobs) => blobs,
            Err(commonware_runtime::Error::PartitionMissing(_)) => Vec::new(),
            Err(err) => return Err(Error::Runtime(err)),
        };

        // Open all blobs and check for partial records
        for name in stored_blobs {
            let (blob, mut len) = context.open(&config.partition, &name).await?;
            // Blob names are big-endian section numbers; anything else is invalid.
            let index = match name.try_into() {
                Ok(index) => u64::from_be_bytes(index),
                Err(nm) => Err(Error::InvalidBlobName(hex(&nm)))?,
            };

            // Check if blob size is aligned to record size; if not, a trailing
            // partial record is discarded by truncating to the last boundary.
            let record_size = Record::<V>::SIZE as u64;
            if len % record_size != 0 {
                warn!(
                    blob = index,
                    invalid_size = len,
                    record_size,
                    "blob size is not a multiple of record size, truncating"
                );
                len -= len % record_size;
                blob.resize(len).await?;
                blob.sync().await?;
            }

            debug!(blob = index, len, "found index blob");
            let wrapped_blob = Write::new(blob, len, config.write_buffer);
            blobs.insert(index, wrapped_blob);
        }

        // Initialize intervals by scanning existing records
        debug!(
            blobs = blobs.len(),
            "rebuilding intervals from existing index"
        );
        let start = context.current();
        let mut items = 0;
        let mut intervals = RMap::new();
        for (section, blob) in &blobs {
            // Skip if bits are provided and the section is not in the bits
            // (such sections are treated as entirely unavailable)
            if let Some(bits) = &bits {
                if !bits.contains_key(section) {
                    warn!(section, "skipping section without bits");
                    continue;
                }
            }

            // Initialize read buffer over a clone of the blob handle
            let size = blob.size().await;
            let mut replay_blob = ReadBuffer::new(blob.clone(), size, config.replay_buffer);

            // Iterate over all records in the blob
            let mut offset = 0;
            let items_per_blob = config.items_per_blob.get();
            while offset < size {
                // Calculate index for this record (each section covers
                // `items_per_blob` consecutive indices)
                let index = section * items_per_blob + (offset / Record::<V>::SIZE as u64);

                // If bits are provided, skip if not set
                let mut must_exist = false;
                if let Some(bits) = &bits {
                    // If bits are provided, check if the record exists
                    // (unwrap is safe: the contains_key check above skipped
                    // sections without bits)
                    let bits = bits.get(section).unwrap();
                    if let Some(bits) = bits {
                        let bit_index = offset as usize / Record::<V>::SIZE;
                        if !bits.get(bit_index as u64) {
                            offset += Record::<V>::SIZE as u64;
                            continue;
                        }
                    }

                    // If bit section exists but it is empty, we must have all records
                    must_exist = true;
                }

                // Attempt to read record at offset
                replay_blob.seek_to(offset)?;
                let mut record_buf = vec![0u8; Record::<V>::SIZE];
                replay_blob
                    .read_exact(&mut record_buf, Record::<V>::SIZE)
                    .await?;
                offset += Record::<V>::SIZE as u64;

                // If record is valid (decodes and CRC matches), add to intervals
                if let Ok(record) = Record::<V>::read(&mut record_buf.as_slice()) {
                    if record.is_valid() {
                        items += 1;
                        intervals.insert(index);
                        continue;
                    }
                };

                // If record is invalid, it may either be empty or corrupted. We only care
                // which is which if the provided bits indicate that the record must exist.
                if must_exist {
                    return Err(Error::MissingRecord(index));
                }
            }
        }
        debug!(
            items,
            elapsed = ?context.current().duration_since(start).unwrap_or_default(),
            "rebuilt intervals"
        );

        // Initialize metrics
        let puts = Counter::default();
        let gets = Counter::default();
        let has = Counter::default();
        let syncs = Counter::default();
        let pruned = Counter::default();
        context.register("puts", "Number of put calls", puts.clone());
        context.register("gets", "Number of get calls", gets.clone());
        context.register("has", "Number of has calls", has.clone());
        context.register("syncs", "Number of sync calls", syncs.clone());
        context.register("pruned", "Number of pruned blobs", pruned.clone());

        Ok(Self {
            context,
            config,
            blobs,
            intervals,
            pending: BTreeSet::new(),
            puts,
            gets,
            has,
            syncs,
            pruned,
            _phantom: PhantomData,
        })
    }

    /// Add a value at the specified index (pending until sync).
    ///
    /// The record is written to the section blob's write buffer and the index
    /// is added to the interval map immediately, but durability is only
    /// guaranteed after a subsequent [Self::sync].
    pub async fn put(&mut self, index: u64, value: V) -> Result<(), Error> {
        self.puts.inc();

        // Check if blob exists; lazily create the section blob on first write
        let items_per_blob = self.config.items_per_blob.get();
        let section = index / items_per_blob;
        if let Entry::Vacant(entry) = self.blobs.entry(section) {
            let (blob, len) = self
                .context
                .open(&self.config.partition, &section.to_be_bytes())
                .await?;
            entry.insert(Write::new(blob, len, self.config.write_buffer));
            debug!(section, "created blob");
        }

        // Write the value (with its CRC) to the blob at the slot offset
        let blob = self.blobs.get(&section).unwrap();
        let offset = (index % items_per_blob) * Record::<V>::SIZE as u64;
        let record = Record::new(value);
        blob.write_at(record.encode_mut(), offset).await?;
        // Mark the section dirty so the next sync flushes it
        self.pending.insert(section);

        // Add to intervals
        self.intervals.insert(index);

        Ok(())
    }

    /// Get the value for a given index.
    ///
    /// Returns `Ok(None)` if the index was never populated, and an error if
    /// the on-disk record fails its CRC check.
    pub async fn get(&self, index: u64) -> Result<Option<V>, Error> {
        self.gets.inc();

        // If get isn't in an interval, it doesn't exist and we don't need to access disk
        if self.intervals.get(&index).is_none() {
            return Ok(None);
        }

        // Read from disk (unwrap is safe: an interval hit implies the section
        // blob exists)
        let items_per_blob = self.config.items_per_blob.get();
        let section = index / items_per_blob;
        let blob = self.blobs.get(&section).unwrap();
        let offset = (index % items_per_blob) * Record::<V>::SIZE as u64;
        let read_buf = vec![0u8; Record::<V>::SIZE];
        let read_buf = blob.read_at(read_buf, offset).await?;
        let record = Record::<V>::read(&mut read_buf.as_ref())?;

        // If record is valid, return it
        if record.is_valid() {
            Ok(Some(record.value))
        } else {
            Err(Error::InvalidRecord(index))
        }
    }

    /// Check if an index exists.
    ///
    /// Answered purely from the in-memory interval map; no disk access.
    pub fn has(&self, index: u64) -> bool {
        self.has.inc();

        self.intervals.get(&index).is_some()
    }

    /// Get the next gap information for backfill operations.
    pub fn next_gap(&self, index: u64) -> (Option<u64>, Option<u64>) {
        self.intervals.next_gap(index)
    }

    /// Get an iterator over all ranges in the [Ordinal].
    pub fn ranges(&self) -> impl Iterator<Item = (u64, u64)> + '_ {
        self.intervals.iter().map(|(&s, &e)| (s, e))
    }

    /// Retrieve the first index in the [Ordinal].
    pub fn first_index(&self) -> Option<u64> {
        self.intervals.first_index()
    }

    /// Retrieve the last index in the [Ordinal].
    pub fn last_index(&self) -> Option<u64> {
        self.intervals.last_index()
    }

    /// Returns up to `max` missing items starting from `start`.
    ///
    /// This method iterates through gaps between existing ranges, collecting missing indices
    /// until either `max` items are found or there are no more gaps to fill.
    pub fn missing_items(&self, start: u64, max: usize) -> Vec<u64> {
        self.intervals.missing_items(start, max)
    }

    /// Prune indices older than `min` by removing entire blobs.
    ///
    /// Pruning is done at blob boundaries to avoid partial deletions. A blob is pruned only if
    /// all possible indices in that blob are less than `min`.
    pub async fn prune(&mut self, min: u64) -> Result<(), Error> {
        // Collect sections to remove (those entirely below min's section)
        let items_per_blob = self.config.items_per_blob.get();
        let min_section = min / items_per_blob;
        let sections_to_remove: Vec<u64> = self
            .blobs
            .keys()
            .filter(|&&section| section < min_section)
            .copied()
            .collect();

        // Remove the collected sections
        for section in sections_to_remove {
            if let Some(blob) = self.blobs.remove(&section) {
                // Release the write handle before removing the backing file
                drop(blob);
                self.context
                    .remove(&self.config.partition, Some(&section.to_be_bytes()))
                    .await?;

                // Remove the corresponding index range from intervals
                let start_index = section * items_per_blob;
                let end_index = (section + 1) * items_per_blob - 1;
                self.intervals.remove(start_index, end_index);
                debug!(section, start_index, end_index, "pruned blob");
            }

            // Update metrics
            self.pruned.inc();
        }

        // Clean pending entries that fall into pruned sections.
        self.pending.retain(|&section| section >= min_section);

        Ok(())
    }

    /// Write all pending entries and sync all modified [Blob]s.
    pub async fn sync(&mut self) -> Result<(), Error> {
        self.syncs.inc();

        // Sync all modified blobs concurrently (unwrap is safe: `pending` only
        // contains sections inserted by `put`, which creates the blob first)
        let mut futures = Vec::with_capacity(self.pending.len());
        for &section in &self.pending {
            futures.push(self.blobs.get(&section).unwrap().sync());
        }
        try_join_all(futures).await?;

        // Clear pending sections
        self.pending.clear();

        Ok(())
    }

    /// Destroy [Ordinal] and remove all data.
    ///
    /// Removes every section blob and then the partition itself; a missing
    /// partition is not an error.
    pub async fn destroy(self) -> Result<(), Error> {
        for (i, blob) in self.blobs.into_iter() {
            // Release the write handle before removing the backing file
            drop(blob);
            self.context
                .remove(&self.config.partition, Some(&i.to_be_bytes()))
                .await?;
            debug!(section = i, "destroyed blob");
        }
        match self.context.remove(&self.config.partition, None).await {
            Ok(()) => {}
            Err(RError::PartitionMissing(_)) => {
                // Partition already removed or never existed.
            }
            Err(err) => return Err(Error::Runtime(err)),
        }
        Ok(())
    }
}
419
420impl<E: Storage + Metrics + Clock, V: CodecFixedShared> kv::Gettable for Ordinal<E, V> {
421    type Key = u64;
422    type Value = V;
423    type Error = Error;
424
425    async fn get(&self, key: &Self::Key) -> Result<Option<Self::Value>, Self::Error> {
426        self.get(*key).await
427    }
428}
429
430impl<E: Storage + Metrics + Clock, V: CodecFixedShared> kv::Updatable for Ordinal<E, V> {
431    async fn update(&mut self, key: Self::Key, value: Self::Value) -> Result<(), Self::Error> {
432        self.put(key, value).await
433    }
434}
435
436impl<E: Storage + Metrics + Clock, V: CodecFixedShared> Persistable for Ordinal<E, V> {
437    type Error = Error;
438
439    async fn commit(&mut self) -> Result<(), Self::Error> {
440        self.sync().await
441    }
442
443    async fn sync(&mut self) -> Result<(), Self::Error> {
444        self.sync().await
445    }
446
447    async fn destroy(self) -> Result<(), Self::Error> {
448        self.destroy().await
449    }
450}
451
// Codec round-trip conformance tests for [Record]; gated on the `arbitrary`
// feature since generating test records requires the `Arbitrary` impl above.
#[cfg(all(test, feature = "arbitrary"))]
mod conformance {
    use super::*;
    use commonware_codec::conformance::CodecConformance;

    commonware_conformance::conformance_tests! {
        CodecConformance<Record<u32>>
    }
}
461
#[cfg(test)]
mod tests {
    use super::*;
    use crate::kv::tests::{assert_gettable, assert_send, assert_updatable};
    use commonware_runtime::deterministic::Context;

    // [Ordinal] specialization used for the compile-time checks below.
    type TestOrdinal = Ordinal<Context, u64>;

    // Compile-time check that the kv trait futures satisfy the bounds asserted
    // by the kv test helpers (presumably Send — see crate::kv::tests).
    #[allow(dead_code)]
    fn assert_ordinal_futures_are_send(ordinal: &mut TestOrdinal, key: u64) {
        assert_gettable(ordinal, &key);
        assert_updatable(ordinal, key, 0u64);
    }

    // Compile-time check that the destroy future is Send.
    #[allow(dead_code)]
    fn assert_ordinal_destroy_is_send(ordinal: TestOrdinal) {
        assert_send(ordinal.destroy());
    }
}