
// commonware_storage/ordinal/storage.rs

use super::{Config, Error};
use crate::{kv, rmap::RMap, Persistable};
use commonware_codec::{
    CodecFixed, CodecFixedShared, Encode, FixedSize, Read, ReadExt, Write as CodecWrite,
};
use commonware_cryptography::{crc32, Crc32};
use commonware_runtime::{
    buffer::{Read as ReadBuffer, Write},
    Blob, Buf, BufMut, Clock, Error as RError, IoBufMut, Metrics, Storage,
};
use commonware_utils::{bitmap::BitMap, hex};
use futures::future::try_join_all;
use prometheus_client::metrics::counter::Counter;
use std::{
    collections::{btree_map::Entry, BTreeMap, BTreeSet},
    marker::PhantomData,
};
use tracing::{debug, warn};

/// Value stored in the index file.
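///
/// Pairs the encoded value with a CRC32 checksum so that, on replay, valid records can be
/// distinguished from slots that were never written or were only partially written.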
#[derive(Debug, Clone)]
struct Record<V: CodecFixed<Cfg = ()>> {
    value: V,
    crc: u32,
}

impl<V: CodecFixed<Cfg = ()>> Record<V> {
    fn new(value: V) -> Self {
        let crc = Crc32::checksum(&value.encode());
        Self { value, crc }
    }

    fn is_valid(&self) -> bool {
        self.crc == Crc32::checksum(&self.value.encode())
    }
}

impl<V: CodecFixed<Cfg = ()>> FixedSize for Record<V> {
    const SIZE: usize = V::SIZE + crc32::Digest::SIZE;
}

impl<V: CodecFixed<Cfg = ()>> CodecWrite for Record<V> {
    fn write(&self, buf: &mut impl BufMut) {
        self.value.write(buf);
        self.crc.write(buf);
    }
}

impl<V: CodecFixed<Cfg = ()>> Read for Record<V> {
    type Cfg = ();

    fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
        let value = V::read(buf)?;
        let crc = u32::read(buf)?;

        Ok(Self { value, crc })
    }
}

#[cfg(feature = "arbitrary")]
impl<V: CodecFixed<Cfg = ()>> arbitrary::Arbitrary<'_> for Record<V>
where
    V: for<'a> arbitrary::Arbitrary<'a>,
{
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        let value = V::arbitrary(u)?;
        Ok(Self::new(value))
    }
}

/// Implementation of [Ordinal].
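///
/// # Example
///
/// A minimal sketch of the write path (assumes a runtime `context` and `config` are already
/// constructed; not compiled here):
///
/// ```ignore
/// let mut ordinal = Ordinal::<_, u64>::init(context, config).await?;
/// ordinal.put(42, 1234).await?; // buffered and tracked in intervals
/// ordinal.sync().await?;        // durably persisted
/// assert_eq!(ordinal.get(42).await?, Some(1234));
/// ```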
pub struct Ordinal<E: Storage + Metrics + Clock, V: CodecFixed<Cfg = ()>> {
    // Configuration and context
    context: E,
    config: Config,

    // Index blobs for storing key records
    blobs: BTreeMap<u64, Write<E::Blob>>,

    // RMap for interval tracking
    intervals: RMap,

    // Sections containing pending writes to be synced
    pending: BTreeSet<u64>,

    // Metrics
    puts: Counter,
    gets: Counter,
    has: Counter,
    syncs: Counter,
    pruned: Counter,

    _phantom: PhantomData<V>,
}

impl<E: Storage + Metrics + Clock, V: CodecFixed<Cfg = ()>> Ordinal<E, V> {
    /// Initialize a new [Ordinal] instance.
    pub async fn init(context: E, config: Config) -> Result<Self, Error> {
        Self::init_with_bits(context, config, None).await
    }

    /// Initialize a new [Ordinal] instance with a collection of [BitMap]s indicating which
    /// records should be considered available.
    ///
    /// If a section is absent from the [BTreeMap], all records in that section are considered
    /// unavailable. If a section maps to `Some(BitMap)`, a record in that section is considered
    /// available if and only if its bit is set. If a section maps to `None`, every record in
    /// that section is considered available and must be present on disk.
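    ///
    /// A sketch of the shape of `bits` (hypothetical values; not compiled here):
    ///
    /// ```ignore
    /// let scoped: Option<BitMap> = Some(bitmap); // per-record availability
    /// let full: Option<BitMap> = None;           // every record must be valid
    /// let mut bits = BTreeMap::new();
    /// bits.insert(0u64, &scoped);
    /// bits.insert(1u64, &full);
    /// let ordinal = Ordinal::<_, u64>::init_with_bits(context, config, Some(bits)).await?;
    /// ```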
    // TODO(#1227): Hide this complexity from the caller.
    pub async fn init_with_bits(
        context: E,
        config: Config,
        bits: Option<BTreeMap<u64, &Option<BitMap>>>,
    ) -> Result<Self, Error> {
        // Scan for all blobs in the partition
        let mut blobs = BTreeMap::new();
        let stored_blobs = match context.scan(&config.partition).await {
            Ok(blobs) => blobs,
            Err(commonware_runtime::Error::PartitionMissing(_)) => Vec::new(),
            Err(err) => return Err(Error::Runtime(err)),
        };

        // Open all blobs and check for partial records
        for name in stored_blobs {
            let (blob, mut len) = context.open(&config.partition, &name).await?;
            let index = match name.try_into() {
                Ok(index) => u64::from_be_bytes(index),
                Err(nm) => Err(Error::InvalidBlobName(hex(&nm)))?,
            };

            // Check if blob size is aligned to record size
            let record_size = Record::<V>::SIZE as u64;
            if len % record_size != 0 {
                warn!(
                    blob = index,
                    invalid_size = len,
                    record_size,
                    "blob size is not a multiple of record size, truncating"
                );
                len -= len % record_size;
                blob.resize(len).await?;
                blob.sync().await?;
            }

            debug!(blob = index, len, "found index blob");
            let wrapped_blob = Write::new(blob, len, config.write_buffer);
            blobs.insert(index, wrapped_blob);
        }

        // Initialize intervals by scanning existing records
        debug!(
            blobs = blobs.len(),
            "rebuilding intervals from existing index"
        );
        let start = context.current();
        let mut items = 0;
        let mut intervals = RMap::new();
        for (section, blob) in &blobs {
            // Skip if bits are provided and the section is not in the bits
            if let Some(bits) = &bits {
                if !bits.contains_key(section) {
                    warn!(section, "skipping section without bits");
                    continue;
                }
            }

            // Initialize read buffer
            let size = blob.size().await;
            let mut replay_blob = ReadBuffer::new(blob.clone(), size, config.replay_buffer);

            // Iterate over all records in the blob
            let mut offset = 0;
            let items_per_blob = config.items_per_blob.get();
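            // Records sit at fixed offsets: a record's slot within the blob is
            // offset / Record::<V>::SIZE, and its global index is
            // section * items_per_blob + slot.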
            while offset < size {
                // Calculate index for this record
                let index = section * items_per_blob + (offset / Record::<V>::SIZE as u64);

                // If bits are provided, skip records whose bit is not set
                let mut must_exist = false;
                if let Some(bits) = &bits {
                    let bits = bits.get(section).unwrap();
                    if let Some(bits) = bits {
                        let bit_index = offset as usize / Record::<V>::SIZE;
                        if !bits.get(bit_index as u64) {
                            offset += Record::<V>::SIZE as u64;
                            continue;
                        }
                    }

                    // If we reach this point with bits provided, the record is expected to
                    // exist: either its bit is set or the section has no [BitMap].
                    must_exist = true;
                }

                // Attempt to read record at offset
                replay_blob.seek_to(offset)?;
                let mut record_buf = vec![0u8; Record::<V>::SIZE];
                replay_blob
                    .read_exact(&mut record_buf, Record::<V>::SIZE)
                    .await?;
                offset += Record::<V>::SIZE as u64;

                // If record is valid, add to intervals
                if let Ok(record) = Record::<V>::read(&mut record_buf.as_slice()) {
                    if record.is_valid() {
                        items += 1;
                        intervals.insert(index);
                        continue;
                    }
                }

                // If the record is invalid, it may be either empty or corrupted. We only need
                // to distinguish the two if the provided bits indicate the record must exist.
                if must_exist {
                    return Err(Error::MissingRecord(index));
                }
            }
        }
        debug!(
            items,
            elapsed = ?context.current().duration_since(start).unwrap_or_default(),
            "rebuilt intervals"
        );

        // Initialize metrics
        let puts = Counter::default();
        let gets = Counter::default();
        let has = Counter::default();
        let syncs = Counter::default();
        let pruned = Counter::default();
        context.register("puts", "Number of put calls", puts.clone());
        context.register("gets", "Number of get calls", gets.clone());
        context.register("has", "Number of has calls", has.clone());
        context.register("syncs", "Number of sync calls", syncs.clone());
        context.register("pruned", "Number of pruned blobs", pruned.clone());

        Ok(Self {
            context,
            config,
            blobs,
            intervals,
            pending: BTreeSet::new(),
            puts,
            gets,
            has,
            syncs,
            pruned,
            _phantom: PhantomData,
        })
    }

    /// Add a value at the specified index (pending until sync).
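    ///
    /// The write is applied through the section's buffered blob and the index is added to the
    /// in-memory intervals immediately, so `get` and `has` observe it before `sync` is called.
    /// Durability is only guaranteed once `sync` returns successfully.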
    pub async fn put(&mut self, index: u64, value: V) -> Result<(), Error> {
        self.puts.inc();

        // Check if blob exists
        let items_per_blob = self.config.items_per_blob.get();
        let section = index / items_per_blob;
        if let Entry::Vacant(entry) = self.blobs.entry(section) {
            let (blob, len) = self
                .context
                .open(&self.config.partition, &section.to_be_bytes())
                .await?;
            entry.insert(Write::new(blob, len, self.config.write_buffer));
            debug!(section, "created blob");
        }

        // Write the value to the blob
        let blob = self.blobs.get(&section).unwrap();
        let offset = (index % items_per_blob) * Record::<V>::SIZE as u64;
        let record = Record::new(value);
        blob.write_at(offset, record.encode_mut()).await?;
        self.pending.insert(section);

        // Add to intervals
        self.intervals.insert(index);

        Ok(())
    }

    /// Get the value for a given index.
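    ///
    /// Returns `None` if the index is not tracked in the intervals (never written or already
    /// pruned). Returns [Error::InvalidRecord] if the index is tracked but the stored record
    /// fails its CRC check.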
    pub async fn get(&self, index: u64) -> Result<Option<V>, Error> {
        self.gets.inc();

        // If the index isn't in an interval, it doesn't exist and we don't need to access disk
        if self.intervals.get(&index).is_none() {
            return Ok(None);
        }

        // Read from disk
        let items_per_blob = self.config.items_per_blob.get();
        let section = index / items_per_blob;
        let blob = self.blobs.get(&section).unwrap();
        let offset = (index % items_per_blob) * Record::<V>::SIZE as u64;
        let read_buf = blob
            .read_at(offset, IoBufMut::zeroed(Record::<V>::SIZE))
            .await?
            .coalesce();
        let record = Record::<V>::read(&mut read_buf.as_ref())?;

        // If record is valid, return it
        if record.is_valid() {
            Ok(Some(record.value))
        } else {
            Err(Error::InvalidRecord(index))
        }
    }

    /// Check if an index exists.
    pub fn has(&self, index: u64) -> bool {
        self.has.inc();

        self.intervals.get(&index).is_some()
    }

    /// Get the next gap information for backfill operations.
    pub fn next_gap(&self, index: u64) -> (Option<u64>, Option<u64>) {
        self.intervals.next_gap(index)
    }

    /// Get an iterator over all ranges in the [Ordinal].
    pub fn ranges(&self) -> impl Iterator<Item = (u64, u64)> + '_ {
        self.intervals.iter().map(|(&s, &e)| (s, e))
    }

    /// Retrieve the first index in the [Ordinal].
    pub fn first_index(&self) -> Option<u64> {
        self.intervals.first_index()
    }

    /// Retrieve the last index in the [Ordinal].
    pub fn last_index(&self) -> Option<u64> {
        self.intervals.last_index()
    }

    /// Returns up to `max` missing items starting from `start`.
    ///
    /// This method iterates through gaps between existing ranges, collecting missing indices
    /// until either `max` items are found or there are no more gaps to fill.
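    ///
    /// For example (a hypothetical state), if the tracked ranges are `[0, 5]` and `[10, 12]`,
    /// `missing_items(3, 4)` would return `[6, 7, 8, 9]`.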
    pub fn missing_items(&self, start: u64, max: usize) -> Vec<u64> {
        self.intervals.missing_items(start, max)
    }

    /// Prune indices older than `min` by removing entire blobs.
    ///
    /// Pruning is done at blob boundaries to avoid partial deletions. A blob is pruned only if
    /// all possible indices in that blob are less than `min`.
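    ///
    /// For example, with `items_per_blob = 1000`, `prune(2500)` removes the blobs for
    /// sections 0 and 1 (indices 0..=1999) and leaves section 2 (which contains index 2500)
    /// intact.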
    pub async fn prune(&mut self, min: u64) -> Result<(), Error> {
        // Collect sections to remove
        let items_per_blob = self.config.items_per_blob.get();
        let min_section = min / items_per_blob;
        let sections_to_remove: Vec<u64> = self
            .blobs
            .keys()
            .filter(|&&section| section < min_section)
            .copied()
            .collect();

        // Remove the collected sections
        for section in sections_to_remove {
            if let Some(blob) = self.blobs.remove(&section) {
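                // Drop the buffered handle to release the blob before removing its
                // backing storage.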
                drop(blob);
                self.context
                    .remove(&self.config.partition, Some(&section.to_be_bytes()))
                    .await?;

                // Remove the corresponding index range from intervals
                let start_index = section * items_per_blob;
                let end_index = (section + 1) * items_per_blob - 1;
                self.intervals.remove(start_index, end_index);
                debug!(section, start_index, end_index, "pruned blob");
            }

            // Update metrics
            self.pruned.inc();
        }

        // Clean pending entries that fall into pruned sections.
        self.pending.retain(|&section| section >= min_section);

        Ok(())
    }

    /// Write all pending entries and sync all modified [Blob]s.
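    ///
    /// Syncs for all pending sections are issued concurrently and awaited together; the
    /// pending set is only cleared if every sync succeeds.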
    pub async fn sync(&mut self) -> Result<(), Error> {
        self.syncs.inc();

        // Sync all modified blobs
        let mut futures = Vec::with_capacity(self.pending.len());
        for &section in &self.pending {
            futures.push(self.blobs.get(&section).unwrap().sync());
        }
        try_join_all(futures).await?;

        // Clear pending sections
        self.pending.clear();

        Ok(())
    }

    /// Destroy [Ordinal] and remove all data.
    pub async fn destroy(self) -> Result<(), Error> {
        for (i, blob) in self.blobs.into_iter() {
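            // Drop the buffered handle to release the blob before removing its
            // backing storage.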
            drop(blob);
            self.context
                .remove(&self.config.partition, Some(&i.to_be_bytes()))
                .await?;
            debug!(section = i, "destroyed blob");
        }
        match self.context.remove(&self.config.partition, None).await {
            Ok(()) => {}
            Err(RError::PartitionMissing(_)) => {
                // Partition already removed or never existed.
            }
            Err(err) => return Err(Error::Runtime(err)),
        }
        Ok(())
    }
}

impl<E: Storage + Metrics + Clock, V: CodecFixedShared> kv::Gettable for Ordinal<E, V> {
    type Key = u64;
    type Value = V;
    type Error = Error;

    async fn get(&self, key: &Self::Key) -> Result<Option<Self::Value>, Self::Error> {
        self.get(*key).await
    }
}

impl<E: Storage + Metrics + Clock, V: CodecFixedShared> kv::Updatable for Ordinal<E, V> {
    async fn update(&mut self, key: Self::Key, value: Self::Value) -> Result<(), Self::Error> {
        self.put(key, value).await
    }
}

impl<E: Storage + Metrics + Clock, V: CodecFixedShared> Persistable for Ordinal<E, V> {
    type Error = Error;

    async fn commit(&mut self) -> Result<(), Self::Error> {
        self.sync().await
    }

    async fn sync(&mut self) -> Result<(), Self::Error> {
        self.sync().await
    }

    async fn destroy(self) -> Result<(), Self::Error> {
        self.destroy().await
    }
}

#[cfg(all(test, feature = "arbitrary"))]
mod conformance {
    use super::*;
    use commonware_codec::conformance::CodecConformance;

    commonware_conformance::conformance_tests! {
        CodecConformance<Record<u32>>
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::kv::tests::{assert_gettable, assert_send, assert_updatable};
    use commonware_runtime::deterministic::Context;

    type TestOrdinal = Ordinal<Context, u64>;

    #[allow(dead_code)]
    fn assert_ordinal_futures_are_send(ordinal: &mut TestOrdinal, key: u64) {
        assert_gettable(ordinal, &key);
        assert_updatable(ordinal, key, 0u64);
    }

    #[allow(dead_code)]
    fn assert_ordinal_destroy_is_send(ordinal: TestOrdinal) {
        assert_send(ordinal.destroy());
    }
}