// commonware_storage/ordinal/
// storage.rs

1use super::{Config, Error};
2use crate::rmap::RMap;
3use bytes::{Buf, BufMut};
4use commonware_codec::{CodecFixed, Encode, FixedSize, Read, ReadExt, Write as CodecWrite};
5use commonware_runtime::{
6    buffer::{Read as ReadBuffer, Write},
7    Blob, Clock, Error as RError, Metrics, Storage,
8};
9use commonware_utils::{bitmap::BitMap, hex};
10use futures::future::try_join_all;
11use prometheus_client::metrics::counter::Counter;
12use std::{
13    collections::{btree_map::Entry, BTreeMap, BTreeSet},
14    marker::PhantomData,
15    mem::take,
16};
17use tracing::{debug, warn};
18
/// Value stored in the index file.
///
/// Pairs a fixed-size value with a CRC32 checksum of its canonical encoding so
/// that torn or corrupted on-disk records can be detected during replay.
#[derive(Debug, Clone)]
struct Record<V: CodecFixed<Cfg = ()>> {
    // The fixed-size value persisted at this index.
    value: V,
    // CRC32 of `value.encode()`, validated via `is_valid`.
    crc: u32,
}
25
26impl<V: CodecFixed<Cfg = ()>> Record<V> {
27    fn new(value: V) -> Self {
28        let crc = crc32fast::hash(&value.encode());
29        Self { value, crc }
30    }
31
32    fn is_valid(&self) -> bool {
33        self.crc == crc32fast::hash(&self.value.encode())
34    }
35}
36
impl<V: CodecFixed<Cfg = ()>> FixedSize for Record<V> {
    // A record on disk is the encoded value followed by its 4-byte CRC.
    const SIZE: usize = V::SIZE + u32::SIZE;
}
40
impl<V: CodecFixed<Cfg = ()>> CodecWrite for Record<V> {
    fn write(&self, buf: &mut impl BufMut) {
        // Encoding order (value, then CRC) must match the Read impl below.
        self.value.write(buf);
        self.crc.write(buf);
    }
}
47
48impl<V: CodecFixed<Cfg = ()>> Read for Record<V> {
49    type Cfg = ();
50
51    fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
52        let value = V::read(buf)?;
53        let crc = u32::read(buf)?;
54
55        Ok(Self { value, crc })
56    }
57}
58
#[cfg(feature = "arbitrary")]
impl<V: CodecFixed<Cfg = ()>> arbitrary::Arbitrary<'_> for Record<V>
where
    V: for<'a> arbitrary::Arbitrary<'a>,
{
    // Generate an arbitrary value and derive its CRC via Record::new so the
    // produced record is always internally consistent.
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        let value = V::arbitrary(u)?;
        Ok(Self::new(value))
    }
}
69
/// Implementation of [Ordinal].
///
/// Maps dense `u64` indices to fixed-size values. Records are persisted in
/// per-section blobs and presence is tracked via an [RMap] of contiguous
/// index intervals, allowing gap/backfill queries without touching disk.
pub struct Ordinal<E: Storage + Metrics + Clock, V: CodecFixed<Cfg = ()>> {
    // Configuration and context
    context: E,
    config: Config,

    // Index blobs for storing key records, keyed by section number
    // (index / items_per_blob).
    blobs: BTreeMap<u64, Write<E::Blob>>,

    // RMap for interval tracking (which indices currently exist)
    intervals: RMap,

    // Sections with unsynced writes; drained by `sync`.
    pending: BTreeSet<u64>,

    // Metrics
    puts: Counter,
    gets: Counter,
    has: Counter,
    syncs: Counter,
    pruned: Counter,

    // V appears only in method signatures/encoding; no V is stored directly.
    _phantom: PhantomData<V>,
}
94
95impl<E: Storage + Metrics + Clock, V: CodecFixed<Cfg = ()>> Ordinal<E, V> {
96    /// Initialize a new [Ordinal] instance.
97    pub async fn init(context: E, config: Config) -> Result<Self, Error> {
98        Self::init_with_bits(context, config, None).await
99    }
100
101    /// Initialize a new [Ordinal] instance with a collection of [BitMap]s (indicating which
102    /// records should be considered available).
103    ///
104    /// If a section is not provided in the [BTreeMap], all records in that section are considered
105    /// unavailable. If a [BitMap] is provided for a section, all records in that section are
106    /// considered available if and only if the [BitMap] is set for the record. If a section is provided
107    /// but no [BitMap] is populated, all records in that section are considered available.
108    // TODO(#1227): Hide this complexity from the caller.
109    pub async fn init_with_bits(
110        context: E,
111        config: Config,
112        bits: Option<BTreeMap<u64, &Option<BitMap>>>,
113    ) -> Result<Self, Error> {
114        // Scan for all blobs in the partition
115        let mut blobs = BTreeMap::new();
116        let stored_blobs = match context.scan(&config.partition).await {
117            Ok(blobs) => blobs,
118            Err(commonware_runtime::Error::PartitionMissing(_)) => Vec::new(),
119            Err(err) => return Err(Error::Runtime(err)),
120        };
121
122        // Open all blobs and check for partial records
123        for name in stored_blobs {
124            let (blob, mut len) = context.open(&config.partition, &name).await?;
125            let index = match name.try_into() {
126                Ok(index) => u64::from_be_bytes(index),
127                Err(nm) => Err(Error::InvalidBlobName(hex(&nm)))?,
128            };
129
130            // Check if blob size is aligned to record size
131            let record_size = Record::<V>::SIZE as u64;
132            if len % record_size != 0 {
133                warn!(
134                    blob = index,
135                    invalid_size = len,
136                    record_size,
137                    "blob size is not a multiple of record size, truncating"
138                );
139                len -= len % record_size;
140                blob.resize(len).await?;
141                blob.sync().await?;
142            }
143
144            debug!(blob = index, len, "found index blob");
145            let wrapped_blob = Write::new(blob, len, config.write_buffer);
146            blobs.insert(index, wrapped_blob);
147        }
148
149        // Initialize intervals by scanning existing records
150        debug!(
151            blobs = blobs.len(),
152            "rebuilding intervals from existing index"
153        );
154        let start = context.current();
155        let mut items = 0;
156        let mut intervals = RMap::new();
157        for (section, blob) in &blobs {
158            // Skip if bits are provided and the section is not in the bits
159            if let Some(bits) = &bits {
160                if !bits.contains_key(section) {
161                    warn!(section, "skipping section without bits");
162                    continue;
163                }
164            }
165
166            // Initialize read buffer
167            let size = blob.size().await;
168            let mut replay_blob = ReadBuffer::new(blob.clone(), size, config.replay_buffer);
169
170            // Iterate over all records in the blob
171            let mut offset = 0;
172            let items_per_blob = config.items_per_blob.get();
173            while offset < size {
174                // Calculate index for this record
175                let index = section * items_per_blob + (offset / Record::<V>::SIZE as u64);
176
177                // If bits are provided, skip if not set
178                let mut must_exist = false;
179                if let Some(bits) = &bits {
180                    // If bits are provided, check if the record exists
181                    let bits = bits.get(section).unwrap();
182                    if let Some(bits) = bits {
183                        let bit_index = offset as usize / Record::<V>::SIZE;
184                        if !bits.get(bit_index as u64) {
185                            offset += Record::<V>::SIZE as u64;
186                            continue;
187                        }
188                    }
189
190                    // If bit section exists but it is empty, we must have all records
191                    must_exist = true;
192                }
193
194                // Attempt to read record at offset
195                replay_blob.seek_to(offset)?;
196                let mut record_buf = vec![0u8; Record::<V>::SIZE];
197                replay_blob
198                    .read_exact(&mut record_buf, Record::<V>::SIZE)
199                    .await?;
200                offset += Record::<V>::SIZE as u64;
201
202                // If record is valid, add to intervals
203                if let Ok(record) = Record::<V>::read(&mut record_buf.as_slice()) {
204                    if record.is_valid() {
205                        items += 1;
206                        intervals.insert(index);
207                        continue;
208                    }
209                };
210
211                // If record is invalid, it may either be empty or corrupted. We only care
212                // which is which if the provided bits indicate that the record must exist.
213                if must_exist {
214                    return Err(Error::MissingRecord(index));
215                }
216            }
217        }
218        debug!(
219            items,
220            elapsed = ?context.current().duration_since(start).unwrap_or_default(),
221            "rebuilt intervals"
222        );
223
224        // Initialize metrics
225        let puts = Counter::default();
226        let gets = Counter::default();
227        let has = Counter::default();
228        let syncs = Counter::default();
229        let pruned = Counter::default();
230        context.register("puts", "Number of put calls", puts.clone());
231        context.register("gets", "Number of get calls", gets.clone());
232        context.register("has", "Number of has calls", has.clone());
233        context.register("syncs", "Number of sync calls", syncs.clone());
234        context.register("pruned", "Number of pruned blobs", pruned.clone());
235
236        Ok(Self {
237            context,
238            config,
239            blobs,
240            intervals,
241            pending: BTreeSet::new(),
242            puts,
243            gets,
244            has,
245            syncs,
246            pruned,
247            _phantom: PhantomData,
248        })
249    }
250
251    /// Add a value at the specified index (pending until sync).
252    pub async fn put(&mut self, index: u64, value: V) -> Result<(), Error> {
253        self.puts.inc();
254
255        // Check if blob exists
256        let items_per_blob = self.config.items_per_blob.get();
257        let section = index / items_per_blob;
258        if let Entry::Vacant(entry) = self.blobs.entry(section) {
259            let (blob, len) = self
260                .context
261                .open(&self.config.partition, &section.to_be_bytes())
262                .await?;
263            entry.insert(Write::new(blob, len, self.config.write_buffer));
264            debug!(section, "created blob");
265        }
266
267        // Write the value to the blob
268        let blob = self.blobs.get(&section).unwrap();
269        let offset = (index % items_per_blob) * Record::<V>::SIZE as u64;
270        let record = Record::new(value);
271        blob.write_at(record.encode(), offset).await?;
272        self.pending.insert(section);
273
274        // Add to intervals
275        self.intervals.insert(index);
276
277        Ok(())
278    }
279
280    /// Get the value for a given index.
281    pub async fn get(&self, index: u64) -> Result<Option<V>, Error> {
282        self.gets.inc();
283
284        // If get isn't in an interval, it doesn't exist and we don't need to access disk
285        if self.intervals.get(&index).is_none() {
286            return Ok(None);
287        }
288
289        // Read from disk
290        let items_per_blob = self.config.items_per_blob.get();
291        let section = index / items_per_blob;
292        let blob = self.blobs.get(&section).unwrap();
293        let offset = (index % items_per_blob) * Record::<V>::SIZE as u64;
294        let read_buf = vec![0u8; Record::<V>::SIZE];
295        let read_buf = blob.read_at(read_buf, offset).await?;
296        let record = Record::<V>::read(&mut read_buf.as_ref())?;
297
298        // If record is valid, return it
299        if record.is_valid() {
300            Ok(Some(record.value))
301        } else {
302            Err(Error::InvalidRecord(index))
303        }
304    }
305
306    /// Check if an index exists.
307    pub fn has(&self, index: u64) -> bool {
308        self.has.inc();
309
310        self.intervals.get(&index).is_some()
311    }
312
313    /// Get the next gap information for backfill operations.
314    pub fn next_gap(&self, index: u64) -> (Option<u64>, Option<u64>) {
315        self.intervals.next_gap(index)
316    }
317
318    /// Get an iterator over all ranges in the [Ordinal].
319    pub fn ranges(&self) -> impl Iterator<Item = (u64, u64)> + '_ {
320        self.intervals.iter().map(|(&s, &e)| (s, e))
321    }
322
323    /// Retrieve the first index in the [Ordinal].
324    pub fn first_index(&self) -> Option<u64> {
325        self.intervals.first_index()
326    }
327
328    /// Retrieve the last index in the [Ordinal].
329    pub fn last_index(&self) -> Option<u64> {
330        self.intervals.last_index()
331    }
332
333    /// Returns up to `max` missing items starting from `start`.
334    ///
335    /// This method iterates through gaps between existing ranges, collecting missing indices
336    /// until either `max` items are found or there are no more gaps to fill.
337    pub fn missing_items(&self, start: u64, max: usize) -> Vec<u64> {
338        self.intervals.missing_items(start, max)
339    }
340
341    /// Prune indices older than `min` by removing entire blobs.
342    ///
343    /// Pruning is done at blob boundaries to avoid partial deletions. A blob is pruned only if
344    /// all possible indices in that blob are less than `min`.
345    pub async fn prune(&mut self, min: u64) -> Result<(), Error> {
346        // Collect sections to remove
347        let items_per_blob = self.config.items_per_blob.get();
348        let min_section = min / items_per_blob;
349        let sections_to_remove: Vec<u64> = self
350            .blobs
351            .keys()
352            .filter(|&&section| section < min_section)
353            .copied()
354            .collect();
355
356        // Remove the collected sections
357        for section in sections_to_remove {
358            if let Some(blob) = self.blobs.remove(&section) {
359                drop(blob);
360                self.context
361                    .remove(&self.config.partition, Some(&section.to_be_bytes()))
362                    .await?;
363
364                // Remove the corresponding index range from intervals
365                let start_index = section * items_per_blob;
366                let end_index = (section + 1) * items_per_blob - 1;
367                self.intervals.remove(start_index, end_index);
368                debug!(section, start_index, end_index, "pruned blob");
369            }
370
371            // Update metrics
372            self.pruned.inc();
373        }
374
375        // Clean pending entries that fall into pruned sections.
376        self.pending.retain(|&section| section >= min_section);
377
378        Ok(())
379    }
380
381    /// Write all pending entries and sync all modified [Blob]s.
382    pub async fn sync(&mut self) -> Result<(), Error> {
383        self.syncs.inc();
384
385        // Sync all modified blobs
386        let mut futures = Vec::with_capacity(self.pending.len());
387        for &section in &self.pending {
388            futures.push(self.blobs.get(&section).unwrap().sync());
389        }
390        try_join_all(futures).await?;
391
392        // Clear pending sections
393        self.pending.clear();
394
395        Ok(())
396    }
397
398    /// Sync all pending entries and [Blob]s.
399    pub async fn close(mut self) -> Result<(), Error> {
400        self.sync().await?;
401        for (_, blob) in take(&mut self.blobs) {
402            blob.sync().await?;
403        }
404        Ok(())
405    }
406
407    /// Destroy [Ordinal] and remove all data.
408    pub async fn destroy(self) -> Result<(), Error> {
409        for (i, blob) in self.blobs.into_iter() {
410            drop(blob);
411            self.context
412                .remove(&self.config.partition, Some(&i.to_be_bytes()))
413                .await?;
414            debug!(section = i, "destroyed blob");
415        }
416        match self.context.remove(&self.config.partition, None).await {
417            Ok(()) => {}
418            Err(RError::PartitionMissing(_)) => {
419                // Partition already removed or never existed.
420            }
421            Err(err) => return Err(Error::Runtime(err)),
422        }
423        Ok(())
424    }
425}
426
impl<E: Storage + Metrics + Clock, V: CodecFixed<Cfg = ()>> crate::store::Store for Ordinal<E, V> {
    type Key = u64;
    type Value = V;
    type Error = Error;

    // Delegates to the inherent `Ordinal::get`.
    async fn get(&self, key: &Self::Key) -> Result<Option<Self::Value>, Self::Error> {
        self.get(*key).await
    }
}
436
impl<E: Storage + Metrics + Clock, V: CodecFixed<Cfg = ()>> crate::store::StoreMut
    for Ordinal<E, V>
{
    // Delegates to the inherent `Ordinal::put` (pending until sync).
    async fn update(&mut self, key: Self::Key, value: Self::Value) -> Result<(), Self::Error> {
        self.put(key, value).await
    }
}
444
impl<E: Storage + Metrics + Clock, V: CodecFixed<Cfg = ()>> crate::store::StorePersistable
    for Ordinal<E, V>
{
    // Commit maps to `sync`: flush pending writes to disk.
    async fn commit(&mut self) -> Result<(), Self::Error> {
        self.sync().await
    }

    // Destroy maps to the inherent `destroy`: remove all blobs and the partition.
    async fn destroy(self) -> Result<(), Self::Error> {
        self.destroy().await
    }
}
456
#[cfg(all(test, feature = "arbitrary"))]
mod conformance {
    use super::*;
    use commonware_codec::conformance::CodecConformance;

    // Verify that Record's Write/Read implementations round-trip for
    // arbitrary u32-valued records.
    commonware_conformance::conformance_tests! {
        CodecConformance<Record<u32>>
    }
}