// commonware_storage/ordinal/storage.rs

1use super::{Config, Error};
2use crate::{kv, rmap::RMap, Persistable};
3use commonware_codec::{
4    CodecFixed, CodecFixedShared, Encode, FixedSize, Read, ReadExt, Write as CodecWrite,
5};
6use commonware_cryptography::{crc32, Crc32};
7use commonware_runtime::{
8    buffer::{Read as ReadBuffer, Write},
9    Blob, Buf, BufMut, BufferPooler, Clock, Error as RError, Metrics, Storage,
10};
11use commonware_utils::{bitmap::BitMap, hex, sync::AsyncMutex};
12use futures::future::try_join_all;
13use prometheus_client::metrics::counter::Counter;
14use std::{
15    collections::{btree_map::Entry, BTreeMap, BTreeSet},
16    marker::PhantomData,
17};
18use tracing::{debug, warn};
19
/// Value stored in the index file.
///
/// On disk, each record is the fixed-size encoding of `value` immediately
/// followed by a CRC32 over that encoding, letting replay distinguish written
/// records from empty or corrupt slots.
#[derive(Debug, Clone)]
struct Record<V: CodecFixed<Cfg = ()>> {
    /// The caller-supplied payload.
    value: V,
    /// CRC32 of `value.encode()`; stamped by [`Record::new`].
    crc: u32,
}
26
27impl<V: CodecFixed<Cfg = ()>> Record<V> {
28    fn new(value: V) -> Self {
29        let crc = Crc32::checksum(&value.encode());
30        Self { value, crc }
31    }
32
33    fn is_valid(&self) -> bool {
34        self.crc == Crc32::checksum(&self.value.encode())
35    }
36}
37
impl<V: CodecFixed<Cfg = ()>> FixedSize for Record<V> {
    // On-disk footprint: the fixed-size value encoding plus the 4-byte CRC digest.
    const SIZE: usize = V::SIZE + crc32::Digest::SIZE;
}
41
impl<V: CodecFixed<Cfg = ()>> CodecWrite for Record<V> {
    fn write(&self, buf: &mut impl BufMut) {
        // Field order must mirror `Read::read_cfg`: value first, CRC last.
        self.value.write(buf);
        self.crc.write(buf);
    }
}
48
49impl<V: CodecFixed<Cfg = ()>> Read for Record<V> {
50    type Cfg = ();
51
52    fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
53        let value = V::read(buf)?;
54        let crc = u32::read(buf)?;
55
56        Ok(Self { value, crc })
57    }
58}
59
#[cfg(feature = "arbitrary")]
impl<V: CodecFixed<Cfg = ()>> arbitrary::Arbitrary<'_> for Record<V>
where
    V: for<'a> arbitrary::Arbitrary<'a>,
{
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        // Always construct via `new` so the generated CRC matches the value.
        let value = V::arbitrary(u)?;
        Ok(Self::new(value))
    }
}
70
/// Implementation of [Ordinal].
///
/// Records live in fixed-size slots inside per-section blobs; an in-memory
/// [RMap] tracks which indices are available so reads can short-circuit
/// without touching disk.
pub struct Ordinal<E: BufferPooler + Storage + Metrics + Clock, V: CodecFixed<Cfg = ()>> {
    // Configuration and context
    context: E,
    config: Config,

    // Index blobs for storing key records, keyed by section number
    // (index / items_per_blob); blob names are the big-endian section bytes.
    blobs: BTreeMap<u64, Write<E::Blob>>,

    // RMap for interval tracking
    intervals: RMap,

    // Pending sections to be synced. The async mutex serializes
    // concurrent sync calls so a second sync cannot return before
    // the first has finished flushing.
    pending: AsyncMutex<BTreeSet<u64>>,

    // Metrics
    puts: Counter,
    gets: Counter,
    has: Counter,
    syncs: Counter,
    pruned: Counter,

    // V appears only in method signatures; no V is stored directly.
    _phantom: PhantomData<V>,
}
97
98impl<E: BufferPooler + Storage + Metrics + Clock, V: CodecFixed<Cfg = ()>> Ordinal<E, V> {
    /// Initialize a new [Ordinal] instance.
    ///
    /// Equivalent to [Self::init_with_bits] with no availability bitmaps:
    /// every valid record found on disk is considered available.
    pub async fn init(context: E, config: Config) -> Result<Self, Error> {
        Self::init_with_bits(context, config, None).await
    }
103
    /// Initialize a new [Ordinal] instance with a collection of [BitMap]s (indicating which
    /// records should be considered available).
    ///
    /// If a section is not provided in the [BTreeMap], all records in that section are considered
    /// unavailable. If a [BitMap] is provided for a section, all records in that section are
    /// considered available if and only if the [BitMap] is set for the record. If a section is provided
    /// but no [BitMap] is populated, all records in that section are considered available.
    // TODO(#1227): Hide this complexity from the caller.
    pub async fn init_with_bits(
        context: E,
        config: Config,
        bits: Option<BTreeMap<u64, &Option<BitMap>>>,
    ) -> Result<Self, Error> {
        // Scan for all blobs in the partition
        let mut blobs = BTreeMap::new();
        let stored_blobs = match context.scan(&config.partition).await {
            Ok(blobs) => blobs,
            // A missing partition just means nothing has been written yet.
            Err(commonware_runtime::Error::PartitionMissing(_)) => Vec::new(),
            Err(err) => return Err(Error::Runtime(err)),
        };

        // Open all blobs and check for partial records
        for name in stored_blobs {
            let (blob, mut len) = context.open(&config.partition, &name).await?;
            // Blob names are the big-endian bytes of the section number.
            let index = match name.try_into() {
                Ok(index) => u64::from_be_bytes(index),
                Err(nm) => Err(Error::InvalidBlobName(hex(&nm)))?,
            };

            // Check if blob size is aligned to record size. A trailing partial
            // record (e.g. from a crash mid-write) is discarded by truncation.
            let record_size = Record::<V>::SIZE as u64;
            if len % record_size != 0 {
                warn!(
                    blob = index,
                    invalid_size = len,
                    record_size,
                    "blob size is not a multiple of record size, truncating"
                );
                len -= len % record_size;
                blob.resize(len).await?;
                blob.sync().await?;
            }

            debug!(blob = index, len, "found index blob");
            blobs.insert(index, (blob, len));
        }

        // Initialize intervals by scanning existing records
        debug!(
            blobs = blobs.len(),
            "rebuilding intervals from existing index"
        );
        let start = context.current();
        let mut items = 0;
        let mut intervals = RMap::new();
        for (section, (blob, size)) in &blobs {
            // Skip if bits are provided and the section is not in the bits
            if let Some(bits) = &bits {
                if !bits.contains_key(section) {
                    warn!(section, "skipping section without bits");
                    continue;
                }
            }

            // Initialize read buffer
            let mut replay_blob =
                ReadBuffer::from_pooler(&context, blob.clone(), *size, config.replay_buffer);

            // Iterate over all records in the blob
            let mut offset = 0;
            let items_per_blob = config.items_per_blob.get();
            while offset < *size {
                // Calculate index for this record (sections hold items_per_blob
                // contiguous indices, so the global index is derived from both).
                let index = section * items_per_blob + (offset / Record::<V>::SIZE as u64);

                // If bits are provided, skip if not set
                let mut must_exist = false;
                if let Some(bits) = &bits {
                    // If bits are provided, check if the record exists
                    let bits = bits.get(section).unwrap();
                    if let Some(bits) = bits {
                        let bit_index = offset as usize / Record::<V>::SIZE;
                        if !bits.get(bit_index as u64) {
                            offset += Record::<V>::SIZE as u64;
                            continue;
                        }
                    }

                    // If bit section exists but it is empty, we must have all records
                    must_exist = true;
                }

                // Attempt to read record at offset
                replay_blob.seek_to(offset)?;
                let mut record_buf = replay_blob.read(Record::<V>::SIZE).await?;
                offset += Record::<V>::SIZE as u64;

                // If record is valid (decodes and CRC matches), add to intervals
                if let Ok(record) = Record::<V>::read(&mut record_buf) {
                    if record.is_valid() {
                        items += 1;
                        intervals.insert(index);
                        continue;
                    }
                };

                // If record is invalid, it may either be empty or corrupted. We only care
                // which is which if the provided bits indicate that the record must exist.
                if must_exist {
                    return Err(Error::MissingRecord(index));
                }
            }
        }
        debug!(
            items,
            elapsed = ?context.current().duration_since(start).unwrap_or_default(),
            "rebuilt intervals"
        );

        // Wrap blobs in write buffers
        let blobs = blobs
            .into_iter()
            .map(|(index, (blob, len))| {
                (
                    index,
                    Write::from_pooler(&context, blob, len, config.write_buffer),
                )
            })
            .collect();

        // Initialize metrics
        let puts = Counter::default();
        let gets = Counter::default();
        let has = Counter::default();
        let syncs = Counter::default();
        let pruned = Counter::default();
        context.register("puts", "Number of put calls", puts.clone());
        context.register("gets", "Number of get calls", gets.clone());
        context.register("has", "Number of has calls", has.clone());
        context.register("syncs", "Number of sync calls", syncs.clone());
        context.register("pruned", "Number of pruned blobs", pruned.clone());

        Ok(Self {
            context,
            config,
            blobs,
            intervals,
            pending: AsyncMutex::new(BTreeSet::new()),
            puts,
            gets,
            has,
            syncs,
            pruned,
            _phantom: PhantomData,
        })
    }
260
261    /// Add a value at the specified index (pending until sync).
262    pub async fn put(&mut self, index: u64, value: V) -> Result<(), Error> {
263        self.puts.inc();
264
265        // Check if blob exists
266        let items_per_blob = self.config.items_per_blob.get();
267        let section = index / items_per_blob;
268        if let Entry::Vacant(entry) = self.blobs.entry(section) {
269            let (blob, len) = self
270                .context
271                .open(&self.config.partition, &section.to_be_bytes())
272                .await?;
273            entry.insert(Write::from_pooler(
274                &self.context,
275                blob,
276                len,
277                self.config.write_buffer,
278            ));
279            debug!(section, "created blob");
280        }
281
282        // Write the value to the blob
283        let blob = self.blobs.get(&section).unwrap();
284        let offset = (index % items_per_blob) * Record::<V>::SIZE as u64;
285        let record = Record::new(value);
286        blob.write_at(offset, record.encode_mut()).await?;
287        self.pending.lock().await.insert(section);
288
289        // Add to intervals
290        self.intervals.insert(index);
291
292        Ok(())
293    }
294
295    /// Get the value for a given index.
296    pub async fn get(&self, index: u64) -> Result<Option<V>, Error> {
297        self.gets.inc();
298
299        // If get isn't in an interval, it doesn't exist and we don't need to access disk
300        if self.intervals.get(&index).is_none() {
301            return Ok(None);
302        }
303
304        // Read from disk
305        let items_per_blob = self.config.items_per_blob.get();
306        let section = index / items_per_blob;
307        let blob = self.blobs.get(&section).unwrap();
308        let offset = (index % items_per_blob) * Record::<V>::SIZE as u64;
309        let mut read_buf = blob.read_at(offset, Record::<V>::SIZE).await?;
310        let record = Record::<V>::read(&mut read_buf)?;
311
312        // If record is valid, return it
313        if record.is_valid() {
314            Ok(Some(record.value))
315        } else {
316            Err(Error::InvalidRecord(index))
317        }
318    }
319
    /// Check if an index exists.
    ///
    /// Served entirely from the in-memory interval set; never touches disk.
    pub fn has(&self, index: u64) -> bool {
        self.has.inc();

        self.intervals.get(&index).is_some()
    }
326
    /// Get the next gap information for backfill operations.
    ///
    /// Delegates to [RMap::next_gap]; purely in-memory.
    pub fn next_gap(&self, index: u64) -> (Option<u64>, Option<u64>) {
        self.intervals.next_gap(index)
    }
331
    /// Get an iterator over all ranges in the [Ordinal].
    ///
    /// Yields `(start, end)` pairs as stored by the interval map.
    pub fn ranges(&self) -> impl Iterator<Item = (u64, u64)> + '_ {
        self.intervals.iter().map(|(&s, &e)| (s, e))
    }
336
    /// Retrieve the first index in the [Ordinal].
    ///
    /// Returns `None` when no records are available.
    pub fn first_index(&self) -> Option<u64> {
        self.intervals.first_index()
    }
341
    /// Retrieve the last index in the [Ordinal].
    ///
    /// Returns `None` when no records are available.
    pub fn last_index(&self) -> Option<u64> {
        self.intervals.last_index()
    }
346
    /// Returns up to `max` missing items starting from `start`.
    ///
    /// This method iterates through gaps between existing ranges, collecting missing indices
    /// until either `max` items are found or there are no more gaps to fill.
    pub fn missing_items(&self, start: u64, max: usize) -> Vec<u64> {
        self.intervals.missing_items(start, max)
    }
354
355    /// Prune indices older than `min` by removing entire blobs.
356    ///
357    /// Pruning is done at blob boundaries to avoid partial deletions. A blob is pruned only if
358    /// all possible indices in that blob are less than `min`.
359    pub async fn prune(&mut self, min: u64) -> Result<(), Error> {
360        // Collect sections to remove
361        let items_per_blob = self.config.items_per_blob.get();
362        let min_section = min / items_per_blob;
363        let sections_to_remove: Vec<u64> = self
364            .blobs
365            .keys()
366            .filter(|&&section| section < min_section)
367            .copied()
368            .collect();
369
370        // Remove the collected sections
371        for section in sections_to_remove {
372            if let Some(blob) = self.blobs.remove(&section) {
373                drop(blob);
374                self.context
375                    .remove(&self.config.partition, Some(&section.to_be_bytes()))
376                    .await?;
377
378                // Remove the corresponding index range from intervals
379                let start_index = section * items_per_blob;
380                let end_index = (section + 1) * items_per_blob - 1;
381                self.intervals.remove(start_index, end_index);
382                debug!(section, start_index, end_index, "pruned blob");
383            }
384
385            // Update metrics
386            self.pruned.inc();
387        }
388
389        // Clean pending entries that fall into pruned sections.
390        self.pending
391            .lock()
392            .await
393            .retain(|&section| section >= min_section);
394
395        Ok(())
396    }
397
398    /// Write all pending entries and sync all modified [Blob]s.
399    pub async fn sync(&self) -> Result<(), Error> {
400        self.syncs.inc();
401
402        // Hold the lock across the entire flush so a concurrent sync
403        // cannot return before durability is established.
404        let mut pending = self.pending.lock().await;
405        if pending.is_empty() {
406            return Ok(());
407        }
408
409        let mut futures = Vec::with_capacity(pending.len());
410        for section in pending.iter() {
411            futures.push(self.blobs.get(section).unwrap().sync());
412        }
413        try_join_all(futures).await?;
414
415        // Clear pending sections.
416        pending.clear();
417
418        Ok(())
419    }
420
421    /// Destroy [Ordinal] and remove all data.
422    pub async fn destroy(self) -> Result<(), Error> {
423        for (i, blob) in self.blobs.into_iter() {
424            drop(blob);
425            self.context
426                .remove(&self.config.partition, Some(&i.to_be_bytes()))
427                .await?;
428            debug!(section = i, "destroyed blob");
429        }
430        match self.context.remove(&self.config.partition, None).await {
431            Ok(()) => {}
432            Err(RError::PartitionMissing(_)) => {
433                // Partition already removed or never existed.
434            }
435            Err(err) => return Err(Error::Runtime(err)),
436        }
437        Ok(())
438    }
439}
440
impl<E: BufferPooler + Storage + Metrics + Clock, V: CodecFixedShared> kv::Gettable
    for Ordinal<E, V>
{
    type Key = u64;
    type Value = V;
    type Error = Error;

    // Adapts the by-reference kv interface to the inherent by-value `get`.
    async fn get(&self, key: &Self::Key) -> Result<Option<Self::Value>, Self::Error> {
        self.get(*key).await
    }
}
452
impl<E: BufferPooler + Storage + Metrics + Clock, V: CodecFixedShared> kv::Updatable
    for Ordinal<E, V>
{
    // An update is just a put: the record at `key` is overwritten in place.
    async fn update(&mut self, key: Self::Key, value: Self::Value) -> Result<(), Self::Error> {
        self.put(key, value).await
    }
}
460
impl<E: BufferPooler + Storage + Metrics + Clock, V: CodecFixedShared> Persistable
    for Ordinal<E, V>
{
    type Error = Error;

    // Commit and sync are identical here: [Ordinal] has no separate
    // commit phase, so both just flush pending sections to disk.
    async fn commit(&self) -> Result<(), Self::Error> {
        self.sync().await
    }

    async fn sync(&self) -> Result<(), Self::Error> {
        self.sync().await
    }

    async fn destroy(self) -> Result<(), Self::Error> {
        self.destroy().await
    }
}
478
#[cfg(all(test, feature = "arbitrary"))]
mod conformance {
    use super::*;
    use commonware_codec::conformance::CodecConformance;

    // Encode/decode round-trip conformance for the on-disk record format.
    commonware_conformance::conformance_tests! {
        CodecConformance<Record<u32>>
    }
}
488
489#[cfg(test)]
490mod tests {
491    use super::*;
492    use crate::kv::tests::{assert_gettable, assert_send, assert_updatable};
493    use commonware_runtime::deterministic::Context;
494
    // Concrete instantiation used by the compile-time Send assertions below.
    type TestOrdinal = Ordinal<Context, u64>;
496
    // Compile-time check: the get/update futures are Send (never invoked).
    #[allow(dead_code)]
    fn assert_ordinal_futures_are_send(ordinal: &mut TestOrdinal, key: u64) {
        assert_gettable(ordinal, &key);
        assert_updatable(ordinal, key, 0u64);
    }
502
    // Compile-time check: the destroy future is Send (never invoked).
    #[allow(dead_code)]
    fn assert_ordinal_destroy_is_send(ordinal: TestOrdinal) {
        assert_send(ordinal.destroy());
    }
507}