Skip to main content

commonware_storage/ordinal/
storage.rs

1use super::{Config, Error};
2use crate::{rmap::RMap, Context, Persistable};
3use commonware_codec::{
4    CodecFixed, CodecFixedShared, Encode, FixedSize, Read, ReadExt, Write as CodecWrite,
5};
6use commonware_cryptography::{crc32, Crc32};
7use commonware_formatting::hex;
8use commonware_runtime::{
9    buffer::{Read as ReadBuffer, Write},
10    telemetry::metrics::{Counter, MetricsExt as _},
11    Blob, Buf, BufMut, BufferPooler, Error as RError,
12};
13use commonware_utils::{bitmap::BitMap, sync::AsyncMutex};
14use futures::future::try_join_all;
15use std::{
16    collections::{btree_map::Entry, BTreeMap, BTreeSet},
17    marker::PhantomData,
18};
19use tracing::{debug, warn};
20
/// Value stored in the index file, paired with the CRC32 checksum of its encoding.
///
/// Records have a fixed encoded size (see the [FixedSize] impl), which lets [Ordinal] locate
/// the record for any index purely by offset arithmetic within a section's blob.
#[derive(Debug, Clone)]
struct Record<V: CodecFixed<Cfg = ()>> {
    /// The stored value.
    value: V,
    /// CRC32 of `value.encode()`; a mismatch marks the slot as empty or corrupted.
    crc: u32,
}
27
28impl<V: CodecFixed<Cfg = ()>> Record<V> {
29    fn new(value: V) -> Self {
30        let crc = Crc32::checksum(&value.encode());
31        Self { value, crc }
32    }
33
34    fn is_valid(&self) -> bool {
35        self.crc == Crc32::checksum(&self.value.encode())
36    }
37}
38
39impl<V: CodecFixed<Cfg = ()>> FixedSize for Record<V> {
40    const SIZE: usize = V::SIZE + crc32::Digest::SIZE;
41}
42
43impl<V: CodecFixed<Cfg = ()>> CodecWrite for Record<V> {
44    fn write(&self, buf: &mut impl BufMut) {
45        self.value.write(buf);
46        self.crc.write(buf);
47    }
48}
49
50impl<V: CodecFixed<Cfg = ()>> Read for Record<V> {
51    type Cfg = ();
52
53    fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
54        let value = V::read(buf)?;
55        let crc = u32::read(buf)?;
56
57        Ok(Self { value, crc })
58    }
59}
60
#[cfg(feature = "arbitrary")]
impl<V: CodecFixed<Cfg = ()>> arbitrary::Arbitrary<'_> for Record<V>
where
    V: for<'a> arbitrary::Arbitrary<'a>,
{
    /// Generates an arbitrary value and wraps it with a correct checksum.
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        V::arbitrary(u).map(Self::new)
    }
}
71
/// Persistent store of fixed-size values `V` addressed by `u64` index.
///
/// Indices are grouped into sections of `items_per_blob` consecutive indices. Each section is
/// stored in its own blob (named by the section number's big-endian bytes) as an array of
/// checksummed records, so the record for index `i` lives at a fixed offset inside the blob
/// for section `i / items_per_blob`. Writes are pending until [Ordinal::sync] flushes the
/// blobs they touched.
pub struct Ordinal<E: BufferPooler + Context, V: CodecFixed<Cfg = ()>> {
    // Configuration and context
    context: E,
    config: Config,

    // Index blobs for storing key records, keyed by section number and wrapped in write buffers
    blobs: BTreeMap<u64, Write<E::Blob>>,

    // RMap for interval tracking: the indices currently known to hold a record
    // (rebuilt from disk during init, extended by put, trimmed by prune)
    intervals: RMap,

    // Pending sections to be synced (sections written since the last successful sync).
    // The async mutex serializes concurrent sync calls so a second sync cannot return
    // before the first has finished flushing.
    pending: AsyncMutex<BTreeSet<u64>>,

    // Metrics
    puts: Counter,
    gets: Counter,
    has: Counter,
    syncs: Counter,
    pruned: Counter,

    // `V` only appears in method signatures; no value of type `V` is stored directly.
    _phantom: PhantomData<V>,
}
98
99impl<E: BufferPooler + Context, V: CodecFixed<Cfg = ()>> Ordinal<E, V> {
100    /// Initialize a new [Ordinal] instance.
101    pub async fn init(context: E, config: Config) -> Result<Self, Error> {
102        Self::init_with_bits(context, config, None).await
103    }
104
105    /// Initialize a new [Ordinal] instance with a collection of [BitMap]s (indicating which
106    /// records should be considered available).
107    ///
108    /// If a section is not provided in the [BTreeMap], all records in that section are considered
109    /// unavailable. If a [BitMap] is provided for a section, all records in that section are
110    /// considered available if and only if the [BitMap] is set for the record. If a section is provided
111    /// but no [BitMap] is populated, all records in that section are considered available.
112    // TODO(#1227): Hide this complexity from the caller.
113    pub async fn init_with_bits(
114        context: E,
115        config: Config,
116        bits: Option<BTreeMap<u64, &Option<BitMap>>>,
117    ) -> Result<Self, Error> {
118        // Scan for all blobs in the partition
119        let mut blobs = BTreeMap::new();
120        let stored_blobs = match context.scan(&config.partition).await {
121            Ok(blobs) => blobs,
122            Err(commonware_runtime::Error::PartitionMissing(_)) => Vec::new(),
123            Err(err) => return Err(Error::Runtime(err)),
124        };
125
126        // Open all blobs and check for partial records
127        for name in stored_blobs {
128            let (blob, mut len) = context.open(&config.partition, &name).await?;
129            let index = match name.try_into() {
130                Ok(index) => u64::from_be_bytes(index),
131                Err(nm) => Err(Error::InvalidBlobName(hex(&nm)))?,
132            };
133
134            // Check if blob size is aligned to record size
135            let record_size = Record::<V>::SIZE as u64;
136            if len % record_size != 0 {
137                warn!(
138                    blob = index,
139                    invalid_size = len,
140                    record_size,
141                    "blob size is not a multiple of record size, truncating"
142                );
143                len -= len % record_size;
144                blob.resize(len).await?;
145                blob.sync().await?;
146            }
147
148            debug!(blob = index, len, "found index blob");
149            blobs.insert(index, (blob, len));
150        }
151
152        // Initialize intervals by scanning existing records
153        debug!(
154            blobs = blobs.len(),
155            "rebuilding intervals from existing index"
156        );
157        let start = context.current();
158        let mut items = 0;
159        let mut intervals = RMap::new();
160        for (section, (blob, size)) in &blobs {
161            // Skip if bits are provided and the section is not in the bits
162            if let Some(bits) = &bits {
163                if !bits.contains_key(section) {
164                    warn!(section, "skipping section without bits");
165                    continue;
166                }
167            }
168
169            // Initialize read buffer
170            let mut replay_blob =
171                ReadBuffer::from_pooler(&context, blob.clone(), *size, config.replay_buffer);
172
173            // Iterate over all records in the blob
174            let mut offset = 0;
175            let items_per_blob = config.items_per_blob.get();
176            while offset < *size {
177                // Calculate index for this record
178                let index = section * items_per_blob + (offset / Record::<V>::SIZE as u64);
179
180                // If bits are provided, skip if not set
181                let mut must_exist = false;
182                if let Some(bits) = &bits {
183                    // If bits are provided, check if the record exists
184                    let bits = bits.get(section).unwrap();
185                    if let Some(bits) = bits {
186                        let bit_index = offset as usize / Record::<V>::SIZE;
187                        if !bits.get(bit_index as u64) {
188                            offset += Record::<V>::SIZE as u64;
189                            continue;
190                        }
191                    }
192
193                    // If bit section exists but it is empty, we must have all records
194                    must_exist = true;
195                }
196
197                // Attempt to read record at offset
198                replay_blob.seek_to(offset)?;
199                let mut record_buf = replay_blob.read(Record::<V>::SIZE).await?;
200                offset += Record::<V>::SIZE as u64;
201
202                // If record is valid, add to intervals
203                if let Ok(record) = Record::<V>::read(&mut record_buf) {
204                    if record.is_valid() {
205                        items += 1;
206                        intervals.insert(index);
207                        continue;
208                    }
209                };
210
211                // If record is invalid, it may either be empty or corrupted. We only care
212                // which is which if the provided bits indicate that the record must exist.
213                if must_exist {
214                    return Err(Error::MissingRecord(index));
215                }
216            }
217        }
218        debug!(
219            items,
220            elapsed = ?context.current().duration_since(start).unwrap_or_default(),
221            "rebuilt intervals"
222        );
223
224        // Wrap blobs in write buffers
225        let blobs = blobs
226            .into_iter()
227            .map(|(index, (blob, len))| {
228                (
229                    index,
230                    Write::from_pooler(&context, blob, len, config.write_buffer),
231                )
232            })
233            .collect();
234
235        // Initialize metrics
236        let puts = context.counter("puts", "Number of put calls");
237        let gets = context.counter("gets", "Number of get calls");
238        let has = context.counter("has", "Number of has calls");
239        let syncs = context.counter("syncs", "Number of sync calls");
240        let pruned = context.counter("pruned", "Number of pruned blobs");
241
242        Ok(Self {
243            context,
244            config,
245            blobs,
246            intervals,
247            pending: AsyncMutex::new(BTreeSet::new()),
248            puts,
249            gets,
250            has,
251            syncs,
252            pruned,
253            _phantom: PhantomData,
254        })
255    }
256
257    /// Add a value at the specified index (pending until sync).
258    pub async fn put(&mut self, index: u64, value: V) -> Result<(), Error> {
259        self.puts.inc();
260
261        // Check if blob exists
262        let items_per_blob = self.config.items_per_blob.get();
263        let section = index / items_per_blob;
264        if let Entry::Vacant(entry) = self.blobs.entry(section) {
265            let (blob, len) = self
266                .context
267                .open(&self.config.partition, &section.to_be_bytes())
268                .await?;
269            entry.insert(Write::from_pooler(
270                &self.context,
271                blob,
272                len,
273                self.config.write_buffer,
274            ));
275            debug!(section, "created blob");
276        }
277
278        // Write the value to the blob
279        let blob = self.blobs.get(&section).unwrap();
280        let offset = (index % items_per_blob) * Record::<V>::SIZE as u64;
281        let record = Record::new(value);
282        blob.write_at(offset, record.encode_mut()).await?;
283        self.pending.lock().await.insert(section);
284
285        // Add to intervals
286        self.intervals.insert(index);
287
288        Ok(())
289    }
290
291    /// Get the value for a given index.
292    pub async fn get(&self, index: u64) -> Result<Option<V>, Error> {
293        self.gets.inc();
294
295        // If get isn't in an interval, it doesn't exist and we don't need to access disk
296        if self.intervals.get(&index).is_none() {
297            return Ok(None);
298        }
299
300        // Read from disk
301        let items_per_blob = self.config.items_per_blob.get();
302        let section = index / items_per_blob;
303        let blob = self.blobs.get(&section).unwrap();
304        let offset = (index % items_per_blob) * Record::<V>::SIZE as u64;
305        let mut read_buf = blob.read_at(offset, Record::<V>::SIZE).await?;
306        let record = Record::<V>::read(&mut read_buf)?;
307
308        // If record is valid, return it
309        if record.is_valid() {
310            Ok(Some(record.value))
311        } else {
312            Err(Error::InvalidRecord(index))
313        }
314    }
315
316    /// Check if an index exists.
317    pub fn has(&self, index: u64) -> bool {
318        self.has.inc();
319
320        self.intervals.get(&index).is_some()
321    }
322
323    /// Get the next gap information for backfill operations.
324    pub fn next_gap(&self, index: u64) -> (Option<u64>, Option<u64>) {
325        self.intervals.next_gap(index)
326    }
327
328    /// Get an iterator over all ranges in the [Ordinal].
329    pub fn ranges(&self) -> impl Iterator<Item = (u64, u64)> + '_ {
330        self.intervals.iter().map(|(&s, &e)| (s, e))
331    }
332
333    /// Get an iterator over ranges that overlap or follow `from`.
334    pub fn ranges_from(&self, from: u64) -> impl Iterator<Item = (u64, u64)> + '_ {
335        self.intervals.iter_from(from).map(|(&s, &e)| (s, e))
336    }
337
338    /// Retrieve the first index in the [Ordinal].
339    pub fn first_index(&self) -> Option<u64> {
340        self.intervals.first_index()
341    }
342
343    /// Retrieve the last index in the [Ordinal].
344    pub fn last_index(&self) -> Option<u64> {
345        self.intervals.last_index()
346    }
347
348    /// Returns up to `max` missing items starting from `start`.
349    ///
350    /// This method iterates through gaps between existing ranges, collecting missing indices
351    /// until either `max` items are found or there are no more gaps to fill.
352    pub fn missing_items(&self, start: u64, max: usize) -> Vec<u64> {
353        self.intervals.missing_items(start, max)
354    }
355
356    /// Prune indices older than `min` by removing entire blobs.
357    ///
358    /// Pruning is done at blob boundaries to avoid partial deletions. A blob is pruned only if
359    /// all possible indices in that blob are less than `min`.
360    pub async fn prune(&mut self, min: u64) -> Result<(), Error> {
361        // Collect sections to remove
362        let items_per_blob = self.config.items_per_blob.get();
363        let min_section = min / items_per_blob;
364        let sections_to_remove: Vec<u64> = self
365            .blobs
366            .keys()
367            .filter(|&&section| section < min_section)
368            .copied()
369            .collect();
370
371        // Remove the collected sections
372        for section in sections_to_remove {
373            if let Some(blob) = self.blobs.remove(&section) {
374                drop(blob);
375                self.context
376                    .remove(&self.config.partition, Some(&section.to_be_bytes()))
377                    .await?;
378
379                // Remove the corresponding index range from intervals
380                let start_index = section * items_per_blob;
381                let end_index = (section + 1) * items_per_blob - 1;
382                self.intervals.remove(start_index, end_index);
383                debug!(section, start_index, end_index, "pruned blob");
384            }
385
386            // Update metrics
387            self.pruned.inc();
388        }
389
390        // Clean pending entries that fall into pruned sections.
391        self.pending
392            .lock()
393            .await
394            .retain(|&section| section >= min_section);
395
396        Ok(())
397    }
398
399    /// Write all pending entries and sync all modified [Blob]s.
400    pub async fn sync(&self) -> Result<(), Error> {
401        self.syncs.inc();
402
403        // Hold the lock across the entire flush so a concurrent sync
404        // cannot return before durability is established.
405        let mut pending = self.pending.lock().await;
406        if pending.is_empty() {
407            return Ok(());
408        }
409
410        let mut futures = Vec::with_capacity(pending.len());
411        for section in pending.iter() {
412            futures.push(self.blobs.get(section).unwrap().sync());
413        }
414        try_join_all(futures).await?;
415
416        // Clear pending sections.
417        pending.clear();
418
419        Ok(())
420    }
421
422    /// Destroy [Ordinal] and remove all data.
423    pub async fn destroy(self) -> Result<(), Error> {
424        for (i, blob) in self.blobs.into_iter() {
425            drop(blob);
426            self.context
427                .remove(&self.config.partition, Some(&i.to_be_bytes()))
428                .await?;
429            debug!(section = i, "destroyed blob");
430        }
431        match self.context.remove(&self.config.partition, None).await {
432            Ok(()) => {}
433            Err(RError::PartitionMissing(_)) => {
434                // Partition already removed or never existed.
435            }
436            Err(err) => return Err(Error::Runtime(err)),
437        }
438        Ok(())
439    }
440}
441
442impl<E: BufferPooler + Context, V: CodecFixedShared> Persistable for Ordinal<E, V> {
443    type Error = Error;
444
445    async fn commit(&self) -> Result<(), Self::Error> {
446        self.sync().await
447    }
448
449    async fn sync(&self) -> Result<(), Self::Error> {
450        self.sync().await
451    }
452
453    async fn destroy(self) -> Result<(), Self::Error> {
454        self.destroy().await
455    }
456}
457
458#[cfg(all(test, feature = "arbitrary"))]
459mod conformance {
460    use super::*;
461    use commonware_codec::conformance::CodecConformance;
462
463    commonware_conformance::conformance_tests! {
464        CodecConformance<Record<u32>>
465    }
466}
467
468#[cfg(test)]
469mod tests {
470    use super::*;
471    use commonware_runtime::deterministic::Context;
472
473    type TestOrdinal = Ordinal<Context, u64>;
474
475    fn is_send<T: Send>(_: T) {}
476
477    #[allow(dead_code)]
478    fn assert_ordinal_futures_are_send(ordinal: &mut TestOrdinal, key: u64) {
479        is_send(ordinal.get(key));
480        is_send(ordinal.put(key, 0u64));
481    }
482
483    #[allow(dead_code)]
484    fn assert_ordinal_destroy_is_send(ordinal: TestOrdinal) {
485        is_send(ordinal.destroy());
486    }
487}