commonware_storage/ordinal/storage.rs

use super::{Config, Error};
use crate::rmap::RMap;
use bytes::{Buf, BufMut};
use commonware_codec::{CodecFixed, Encode, FixedSize, Read, ReadExt, Write as CodecWrite};
use commonware_runtime::{
    buffer::{Read as ReadBuffer, Write},
    Blob, Clock, Error as RError, Metrics, Storage,
};
use commonware_utils::{bitmap::BitMap, hex};
use futures::future::try_join_all;
use prometheus_client::metrics::counter::Counter;
use std::{
    collections::{btree_map::Entry, BTreeMap, BTreeSet},
    marker::PhantomData,
    mem::take,
};
use tracing::{debug, warn};

/// Value stored in the index file.
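///
/// Each record occupies a fixed `V::SIZE + 4` bytes on disk: the encoded value followed by a
/// CRC32 of that encoding, used to detect unwritten or corrupted slots on replay.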
#[derive(Debug, Clone)]
struct Record<V: CodecFixed<Cfg = ()>> {
    value: V,
    crc: u32,
}

impl<V: CodecFixed<Cfg = ()>> Record<V> {
    fn new(value: V) -> Self {
        let crc = crc32fast::hash(&value.encode());
        Self { value, crc }
    }

    fn is_valid(&self) -> bool {
        self.crc == crc32fast::hash(&self.value.encode())
    }
}

impl<V: CodecFixed<Cfg = ()>> FixedSize for Record<V> {
    const SIZE: usize = V::SIZE + u32::SIZE;
}

impl<V: CodecFixed<Cfg = ()>> CodecWrite for Record<V> {
    fn write(&self, buf: &mut impl BufMut) {
        self.value.write(buf);
        self.crc.write(buf);
    }
}

impl<V: CodecFixed<Cfg = ()>> Read for Record<V> {
    type Cfg = ();

    fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
        let value = V::read(buf)?;
        let crc = u32::read(buf)?;

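        // Note: the CRC is not verified here; callers check `is_valid()` after decoding
        // (see the replay loop in `init_with_bits` and `get`).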
        Ok(Self { value, crc })
    }
}

/// Implementation of [Ordinal].
pub struct Ordinal<E: Storage + Metrics + Clock, V: CodecFixed<Cfg = ()>> {
    // Configuration and context
    context: E,
    config: Config,

    // Index blobs for storing key records
    blobs: BTreeMap<u64, Write<E::Blob>>,

    // RMap for interval tracking
    intervals: RMap,

    // Pending index entries to be synced, grouped by section
    pending: BTreeSet<u64>,

    // Metrics
    puts: Counter,
    gets: Counter,
    has: Counter,
    syncs: Counter,
    pruned: Counter,

    _phantom: PhantomData<V>,
}

impl<E: Storage + Metrics + Clock, V: CodecFixed<Cfg = ()>> Ordinal<E, V> {
    /// Initialize a new [Ordinal] instance.
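    ///
    /// A minimal usage sketch (hedged: construction of the runtime `context` and [Config] is
    /// elided, and `value` stands for any `V`):
    ///
    /// ```rust,ignore
    /// let mut ordinal = Ordinal::init(context, config).await?;
    /// ordinal.put(42, value).await?;    // staged until sync
    /// ordinal.sync().await?;            // flush and fsync the modified blob
    /// assert!(ordinal.has(42));
    /// let got = ordinal.get(42).await?; // Some(value) if the record's CRC checks out
    /// ordinal.close().await?;
    /// ```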
    pub async fn init(context: E, config: Config) -> Result<Self, Error> {
        Self::init_with_bits(context, config, None).await
    }

    /// Initialize a new [Ordinal] instance with a collection of [BitMap]s (indicating which
    /// records should be considered available).
    ///
    /// If a section is absent from the [BTreeMap], all records in that section are considered
    /// unavailable. If a section maps to a populated [BitMap], a record in that section is
    /// considered available if and only if its bit is set. If a section is present but no
    /// [BitMap] is populated, all records in that section are considered available.
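    ///
    /// A hedged sketch of the last case above (`context` and `config` construction elided): all
    /// of section `0` is considered available, while any other section found on disk is skipped.
    ///
    /// ```rust,ignore
    /// let all_available: Option<BitMap> = None;
    /// let mut bits = BTreeMap::new();
    /// bits.insert(0u64, &all_available);
    /// let ordinal = Ordinal::init_with_bits(context, config, Some(bits)).await?;
    /// ```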
    // TODO(#1227): Hide this complexity from the caller.
    pub async fn init_with_bits(
        context: E,
        config: Config,
        bits: Option<BTreeMap<u64, &Option<BitMap>>>,
    ) -> Result<Self, Error> {
        // Scan for all blobs in the partition
        let mut blobs = BTreeMap::new();
        let stored_blobs = match context.scan(&config.partition).await {
            Ok(blobs) => blobs,
            Err(commonware_runtime::Error::PartitionMissing(_)) => Vec::new(),
            Err(err) => return Err(Error::Runtime(err)),
        };

        // Open all blobs and check for partial records
        for name in stored_blobs {
            let (blob, mut len) = context.open(&config.partition, &name).await?;
            let index = match name.try_into() {
                Ok(index) => u64::from_be_bytes(index),
                Err(nm) => Err(Error::InvalidBlobName(hex(&nm)))?,
            };

            // Check if blob size is aligned to record size
            let record_size = Record::<V>::SIZE as u64;
            if len % record_size != 0 {
                warn!(
                    blob = index,
                    invalid_size = len,
                    record_size,
                    "blob size is not a multiple of record size, truncating"
                );
                len -= len % record_size;
                blob.resize(len).await?;
                blob.sync().await?;
            }

            debug!(blob = index, len, "found index blob");
            let wrapped_blob = Write::new(blob, len, config.write_buffer);
            blobs.insert(index, wrapped_blob);
        }

        // Initialize intervals by scanning existing records
        debug!(
            blobs = blobs.len(),
            "rebuilding intervals from existing index"
        );
        let start = context.current();
        let mut items = 0;
        let mut intervals = RMap::new();
        for (section, blob) in &blobs {
            // Skip if bits are provided and the section is not in the bits
            if let Some(bits) = &bits {
                if !bits.contains_key(section) {
                    warn!(section, "skipping section without bits");
                    continue;
                }
            }

            // Initialize read buffer
            let size = blob.size().await;
            let mut replay_blob = ReadBuffer::new(blob.clone(), size, config.replay_buffer);

            // Iterate over all records in the blob
            let mut offset = 0;
            let items_per_blob = config.items_per_blob.get();
            while offset < size {
                // Calculate index for this record
                let index = section * items_per_blob + (offset / Record::<V>::SIZE as u64);

                // If bits are provided, skip if not set
                let mut must_exist = false;
                if let Some(bits) = &bits {
                    // If bits are provided, check if the record exists
                    let bits = bits.get(section).unwrap();
                    if let Some(bits) = bits {
                        let bit_index = offset as usize / Record::<V>::SIZE;
                        if !bits.get(bit_index as u64) {
                            offset += Record::<V>::SIZE as u64;
                            continue;
                        }
                    }

                    // If the section is listed but no bitmap is populated, every record must exist
                    must_exist = true;
                }

                // Attempt to read record at offset
                replay_blob.seek_to(offset)?;
                let mut record_buf = vec![0u8; Record::<V>::SIZE];
                replay_blob
                    .read_exact(&mut record_buf, Record::<V>::SIZE)
                    .await?;
                offset += Record::<V>::SIZE as u64;

                // If record is valid, add to intervals
                if let Ok(record) = Record::<V>::read(&mut record_buf.as_slice()) {
                    if record.is_valid() {
                        items += 1;
                        intervals.insert(index);
                        continue;
                    }
                };

                // If record is invalid, it may either be empty or corrupted. We only care
                // which is which if the provided bits indicate that the record must exist.
                if must_exist {
                    return Err(Error::MissingRecord(index));
                }
            }
        }
        debug!(
            items,
            elapsed = ?context.current().duration_since(start).unwrap_or_default(),
            "rebuilt intervals"
        );

        // Initialize metrics
        let puts = Counter::default();
        let gets = Counter::default();
        let has = Counter::default();
        let syncs = Counter::default();
        let pruned = Counter::default();
        context.register("puts", "Number of put calls", puts.clone());
        context.register("gets", "Number of get calls", gets.clone());
        context.register("has", "Number of has calls", has.clone());
        context.register("syncs", "Number of sync calls", syncs.clone());
        context.register("pruned", "Number of pruned blobs", pruned.clone());

        Ok(Self {
            context,
            config,
            blobs,
            intervals,
            pending: BTreeSet::new(),
            puts,
            gets,
            has,
            syncs,
            pruned,
            _phantom: PhantomData,
        })
    }

    /// Add a value at the specified index (pending until sync).
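    ///
    /// The record is written through the section's write buffer and tracked in the in-memory
    /// intervals immediately; it only becomes durable once [Ordinal::sync] (or [Ordinal::close])
    /// is called.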
    pub async fn put(&mut self, index: u64, value: V) -> Result<(), Error> {
        self.puts.inc();

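        // An index maps to a (section, offset) pair: `index / items_per_blob` selects the blob
        // and `(index % items_per_blob) * Record::<V>::SIZE` is the byte offset of its fixed-size
        // slot. For example (hypothetical numbers), with items_per_blob = 1024 and a 36-byte
        // record, index 2500 lands in section 2, slot 452, byte offset 16272.
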
        // Check if blob exists
        let items_per_blob = self.config.items_per_blob.get();
        let section = index / items_per_blob;
        if let Entry::Vacant(entry) = self.blobs.entry(section) {
            let (blob, len) = self
                .context
                .open(&self.config.partition, &section.to_be_bytes())
                .await?;
            entry.insert(Write::new(blob, len, self.config.write_buffer));
            debug!(section, "created blob");
        }

        // Write the value to the blob
        let blob = self.blobs.get(&section).unwrap();
        let offset = (index % items_per_blob) * Record::<V>::SIZE as u64;
        let record = Record::new(value);
        blob.write_at(record.encode(), offset).await?;
        self.pending.insert(section);

        // Add to intervals
        self.intervals.insert(index);

        Ok(())
    }

    /// Get the value for a given index.
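    ///
    /// Returns `Ok(None)` without touching disk if the index is not tracked in the in-memory
    /// intervals, and [Error::InvalidRecord] if a record is present but fails its CRC check.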
    pub async fn get(&self, index: u64) -> Result<Option<V>, Error> {
        self.gets.inc();

        // If the index isn't in an interval, it doesn't exist and we don't need to access disk
        if self.intervals.get(&index).is_none() {
            return Ok(None);
        }

        // Read from disk
        let items_per_blob = self.config.items_per_blob.get();
        let section = index / items_per_blob;
        let blob = self.blobs.get(&section).unwrap();
        let offset = (index % items_per_blob) * Record::<V>::SIZE as u64;
        let read_buf = vec![0u8; Record::<V>::SIZE];
        let read_buf = blob.read_at(read_buf, offset).await?;
        let record = Record::<V>::read(&mut read_buf.as_ref())?;

        // If record is valid, return it
        if record.is_valid() {
            Ok(Some(record.value))
        } else {
            Err(Error::InvalidRecord(index))
        }
    }

    /// Check if an index exists.
    pub fn has(&self, index: u64) -> bool {
        self.has.inc();

        self.intervals.get(&index).is_some()
    }

    /// Get the next gap information for backfill operations.
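    ///
    /// The returned tuple is forwarded unchanged from [RMap::next_gap].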
    pub fn next_gap(&self, index: u64) -> (Option<u64>, Option<u64>) {
        self.intervals.next_gap(index)
    }

    /// Retrieve the first index in the [Ordinal].
    pub fn first_index(&self) -> Option<u64> {
        self.intervals.first_index()
    }

    /// Retrieve the last index in the [Ordinal].
    pub fn last_index(&self) -> Option<u64> {
        self.intervals.last_index()
    }

    /// Get up to the next `max` missing items after `start`.
    pub fn missing_items(&self, start: u64, max: usize) -> Vec<u64> {
        self.intervals.missing_items(start, max)
    }

    /// Prune indices older than `min` by removing entire blobs.
    ///
    /// Pruning is done at blob boundaries to avoid partial deletions. A blob is pruned only if
    /// all possible indices in that blob are less than `min`.
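    ///
    /// For example (hypothetical numbers), with `items_per_blob = 1024`, `prune(3000)` removes
    /// sections `0` and `1` (indices `0..=2047`); section `2` (indices `2048..=3071`) is kept
    /// because it still contains indices `>= 3000`.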
    pub async fn prune(&mut self, min: u64) -> Result<(), Error> {
        // Collect sections to remove
        let items_per_blob = self.config.items_per_blob.get();
        let min_section = min / items_per_blob;
        let sections_to_remove: Vec<u64> = self
            .blobs
            .keys()
            .filter(|&&section| section < min_section)
            .copied()
            .collect();

        // Remove the collected sections
        for section in sections_to_remove {
            if let Some(blob) = self.blobs.remove(&section) {
                drop(blob);
                self.context
                    .remove(&self.config.partition, Some(&section.to_be_bytes()))
                    .await?;

                // Remove the corresponding index range from intervals
                let start_index = section * items_per_blob;
                let end_index = (section + 1) * items_per_blob - 1;
                self.intervals.remove(start_index, end_index);
                debug!(section, start_index, end_index, "pruned blob");
            }

            // Update metrics
            self.pruned.inc();
        }

        // Clean pending entries that fall into pruned sections.
        self.pending.retain(|&section| section >= min_section);

        Ok(())
    }

    /// Write all pending entries and sync all modified [Blob]s.
    pub async fn sync(&mut self) -> Result<(), Error> {
        self.syncs.inc();

        // Sync all modified blobs
        let mut futures = Vec::with_capacity(self.pending.len());
        for &section in &self.pending {
            futures.push(self.blobs.get(&section).unwrap().sync());
        }
        try_join_all(futures).await?;

        // Clear pending sections
        self.pending.clear();

        Ok(())
    }

    /// Sync all pending entries and [Blob]s, then close the [Ordinal].
    pub async fn close(mut self) -> Result<(), Error> {
        self.sync().await?;
        for (_, blob) in take(&mut self.blobs) {
            blob.sync().await?;
        }
        Ok(())
    }

    /// Destroy [Ordinal] and remove all data.
    pub async fn destroy(self) -> Result<(), Error> {
        for (i, blob) in self.blobs.into_iter() {
            drop(blob);
            self.context
                .remove(&self.config.partition, Some(&i.to_be_bytes()))
                .await?;
            debug!(section = i, "destroyed blob");
        }
        match self.context.remove(&self.config.partition, None).await {
            Ok(()) => {}
            Err(RError::PartitionMissing(_)) => {
                // Partition already removed or never existed.
            }
            Err(err) => return Err(Error::Runtime(err)),
        }
        Ok(())
    }
}