// commonware_storage/archive/prunable/storage.rs
1use super::{Config, Translator};
2use crate::{
3    archive::{Error, Identifier},
4    index::{unordered::Index, Unordered},
5    journal::segmented::oversized::{
6        Config as OversizedConfig, Oversized, Record as OversizedRecord,
7    },
8    rmap::RMap,
9};
10use commonware_codec::{CodecShared, FixedSize, Read, ReadExt, Write};
11use commonware_runtime::{
12    telemetry::metrics::status::GaugeExt, Buf, BufMut, BufferPooler, Metrics, Storage,
13};
14use commonware_utils::Array;
15use futures::{future::try_join_all, pin_mut, StreamExt};
16use prometheus_client::metrics::{counter::Counter, gauge::Gauge};
17use std::collections::{btree_map, BTreeMap, BTreeSet};
18use tracing::debug;
19
/// Index entry for the archive.
///
/// A [Record] is stored in the index journal and carries the location of its
/// value in the value journal (same section), so values can be fetched
/// lazily without replaying them.
#[derive(Debug, Clone, PartialEq)]
struct Record<K: Array> {
    /// The index for this entry.
    index: u64,
    /// The key for this entry.
    key: K,
    /// Byte offset of the value in the value journal (same section).
    value_offset: u64,
    /// Size of the value data in the value journal, in bytes.
    value_size: u32,
}
32
33impl<K: Array> Record<K> {
34    /// Create a new [Record].
35    const fn new(index: u64, key: K, value_offset: u64, value_size: u32) -> Self {
36        Self {
37            index,
38            key,
39            value_offset,
40            value_size,
41        }
42    }
43}
44
45impl<K: Array> Write for Record<K> {
46    fn write(&self, buf: &mut impl BufMut) {
47        self.index.write(buf);
48        self.key.write(buf);
49        self.value_offset.write(buf);
50        self.value_size.write(buf);
51    }
52}
53
54impl<K: Array> Read for Record<K> {
55    type Cfg = ();
56
57    fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
58        let index = u64::read(buf)?;
59        let key = K::read(buf)?;
60        let value_offset = u64::read(buf)?;
61        let value_size = u32::read(buf)?;
62        Ok(Self {
63            index,
64            key,
65            value_offset,
66            value_size,
67        })
68    }
69}
70
impl<K: Array> FixedSize for Record<K> {
    // Encoded size is the sum of the fixed field encodings, in wire order:
    // index (u64) + key (K) + value_offset (u64) + value_size (u32).
    const SIZE: usize = u64::SIZE + K::SIZE + u64::SIZE + u32::SIZE;
}
75
76impl<K: Array> OversizedRecord for Record<K> {
77    fn value_location(&self) -> (u64, u32) {
78        (self.value_offset, self.value_size)
79    }
80
81    fn with_location(mut self, offset: u64, size: u32) -> Self {
82        self.value_offset = offset;
83        self.value_size = size;
84        self
85    }
86}
87
#[cfg(feature = "arbitrary")]
impl<K: Array> arbitrary::Arbitrary<'_> for Record<K>
where
    K: for<'a> arbitrary::Arbitrary<'a>,
{
    /// Generate a random [Record], drawing fields in declaration order.
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        let index = u64::arbitrary(u)?;
        let key = K::arbitrary(u)?;
        let value_offset = u64::arbitrary(u)?;
        let value_size = u32::arbitrary(u)?;
        Ok(Self::new(index, key, value_offset, value_size))
    }
}
102
/// Implementation of `Archive` storage.
pub struct Archive<T: Translator, E: BufferPooler + Storage + Metrics, K: Array, V: CodecShared> {
    /// Number of indices grouped into each journal section (sections are
    /// aligned multiples of this value).
    items_per_section: u64,

    /// Combined index + value storage with crash recovery.
    oversized: Oversized<E, Record<K>, V>,

    /// Sections with writes that have not yet been synced.
    pending: BTreeSet<u64>,

    /// Oldest allowed section to read from. Updated when `prune` is called.
    oldest_allowed: Option<u64>,

    /// Maps translated key representation to its corresponding index.
    keys: Index<T, u64>,

    /// Maps index to its first position in the index journal.
    indices: BTreeMap<u64, u64>,

    /// Additional positions for indices that have more than one entry.
    /// Only populated when used via [crate::archive::MultiArchive::put_multi].
    extra_indices: BTreeMap<u64, Vec<u64>>,

    /// Interval tracking for gap detection.
    intervals: RMap,

    // Metrics
    /// Number of items (distinct indices) currently tracked.
    items_tracked: Gauge,
    /// Number of indices removed by pruning.
    indices_pruned: Counter,
    /// Number of index-journal reads that did not match the requested key.
    unnecessary_reads: Counter,
    /// Number of gets performed.
    gets: Counter,
    /// Number of has checks performed.
    has: Counter,
    /// Number of section syncs performed.
    syncs: Counter,
}
136
impl<T: Translator, E: BufferPooler + Storage + Metrics, K: Array, V: CodecShared>
    Archive<T, E, K, V>
{
    /// Calculate the section for a given index.
    ///
    /// Rounds `index` down to the start of its section (an aligned multiple
    /// of `items_per_section`).
    const fn section(&self, index: u64) -> u64 {
        (index / self.items_per_section) * self.items_per_section
    }

    /// Iterate over all positions for a given index (first + extras).
    ///
    /// The first position (if any) comes from `indices`; additional positions
    /// recorded via `put_multi` come from `extra_indices`.
    fn iter_positions(&self, index: u64) -> impl Iterator<Item = u64> + '_ {
        self.indices.get(&index).into_iter().copied().chain(
            self.extra_indices
                .get(&index)
                .into_iter()
                .flat_map(|v| v.iter().copied()),
        )
    }

    /// Initialize a new `Archive` instance.
    ///
    /// The in-memory index for `Archive` is populated during this call
    /// by replaying only the index journal (no values are read).
    pub async fn init(context: E, cfg: Config<T, V::Cfg>) -> Result<Self, Error> {
        // Initialize oversized journal (combined index + value storage)
        let oversized_cfg = OversizedConfig {
            index_partition: cfg.key_partition,
            value_partition: cfg.value_partition,
            index_page_cache: cfg.key_page_cache,
            index_write_buffer: cfg.key_write_buffer,
            value_write_buffer: cfg.value_write_buffer,
            compression: cfg.compression,
            codec_config: cfg.codec_config,
        };
        let oversized: Oversized<E, Record<K>, V> =
            Oversized::init(context.with_label("oversized"), oversized_cfg).await?;

        // Initialize keys and replay index journal (no values read!)
        let mut indices: BTreeMap<u64, u64> = BTreeMap::new();
        let mut extra_indices: BTreeMap<u64, Vec<u64>> = BTreeMap::new();
        let mut keys = Index::new(context.with_label("index"), cfg.translator.clone());
        let mut intervals = RMap::new();
        {
            debug!("initializing archive from index journal");
            let stream = oversized.replay(0, 0, cfg.replay_buffer).await?;
            pin_mut!(stream);
            while let Some(result) = stream.next().await {
                let (_section, position, entry) = result?;

                // Store index location (position in index journal); the first
                // occurrence goes to `indices`, any later ones to `extra_indices`.
                match indices.entry(entry.index) {
                    btree_map::Entry::Vacant(e) => {
                        e.insert(position);
                    }
                    btree_map::Entry::Occupied(_) => {
                        extra_indices.entry(entry.index).or_default().push(position);
                    }
                }

                // Store index in keys (translated-key -> index lookup)
                keys.insert(&entry.key, entry.index);

                // Store index in intervals (for gap detection)
                intervals.insert(entry.index);
            }
            debug!("archive initialized");
        }

        // Initialize metrics
        let items_tracked = Gauge::default();
        let indices_pruned = Counter::default();
        let unnecessary_reads = Counter::default();
        let gets = Counter::default();
        let has = Counter::default();
        let syncs = Counter::default();
        context.register(
            "items_tracked",
            "Number of items tracked",
            items_tracked.clone(),
        );
        context.register(
            "indices_pruned",
            "Number of indices pruned",
            indices_pruned.clone(),
        );
        context.register(
            "unnecessary_reads",
            "Number of unnecessary reads performed during key lookups",
            unnecessary_reads.clone(),
        );
        context.register("gets", "Number of gets performed", gets.clone());
        context.register("has", "Number of has performed", has.clone());
        context.register("syncs", "Number of syncs called", syncs.clone());
        // Seed the gauge with the replayed item count (failure is non-fatal).
        let _ = items_tracked.try_set(indices.len());

        // Return populated archive
        Ok(Self {
            items_per_section: cfg.items_per_section.get(),
            oversized,
            pending: BTreeSet::new(),
            oldest_allowed: None,
            indices,
            extra_indices,
            intervals,
            keys,
            items_tracked,
            indices_pruned,
            unnecessary_reads,
            gets,
            has,
            syncs,
        })
    }

    /// Fetch the value stored at `index` (first entry only), or `None` if the
    /// index is not tracked.
    async fn get_index(&self, index: u64) -> Result<Option<V>, Error> {
        // Update metrics
        self.gets.inc();

        // Get first position at this index
        let position = match self.indices.get(&index) {
            Some(&position) => position,
            None => return Ok(None),
        };

        // Fetch index entry to get value location
        let section = self.section(index);
        let entry = self.oversized.get(section, position).await?;
        let (value_offset, value_size) = entry.value_location();

        // Fetch value directly from blob storage (bypasses page cache)
        let value = self
            .oversized
            .get_value(section, value_offset, value_size)
            .await?;
        Ok(Some(value))
    }

    /// Fetch the value stored under `key`, or `None` if no live entry matches.
    ///
    /// The translated-key index may yield multiple candidate indices (and
    /// multiple positions per index); each candidate's full key is verified
    /// against `key` before its value is returned.
    async fn get_key(&self, key: &K) -> Result<Option<V>, Error> {
        // Update metrics
        self.gets.inc();

        // Fetch index
        let iter = self.keys.get(key);
        let min_allowed = self.oldest_allowed.unwrap_or(0);
        for index in iter {
            // Continue if index is no longer allowed due to pruning.
            if *index < min_allowed {
                continue;
            }

            // A live key entry must have a corresponding index entry; if not,
            // the in-memory state is inconsistent with storage.
            if !self.indices.contains_key(index) {
                return Err(Error::RecordCorrupted);
            }
            let section = self.section(*index);

            for position in self.iter_positions(*index) {
                // Fetch index entry from index journal to verify key
                let entry = self.oversized.get(section, position).await?;

                // Verify key matches (translated keys may collide)
                if entry.key.as_ref() == key.as_ref() {
                    // Fetch value directly from blob storage (bypasses page cache)
                    let (value_offset, value_size) = entry.value_location();
                    let value = self
                        .oversized
                        .get_value(section, value_offset, value_size)
                        .await?;
                    return Ok(Some(value));
                }
                // Candidate didn't match the full key: record the wasted read.
                self.unnecessary_reads.inc();
            }
        }

        Ok(None)
    }

    /// Whether any entry is tracked at `index`.
    fn has_index(&self, index: u64) -> bool {
        // Check if index exists
        self.indices.contains_key(&index)
    }

    /// Shared implementation behind `put` and `put_multi`.
    ///
    /// When `skip_if_index_exists` is true, an already-occupied index is a
    /// no-op (single-item semantics); otherwise an extra entry is appended.
    async fn put_internal(
        &mut self,
        index: u64,
        key: K,
        data: V,
        skip_if_index_exists: bool,
    ) -> Result<(), Error> {
        // Check last pruned
        let oldest_allowed = self.oldest_allowed.unwrap_or(0);
        if index < oldest_allowed {
            return Err(Error::AlreadyPrunedTo(oldest_allowed));
        }

        // Check for existing index when enforcing single-item semantics.
        if skip_if_index_exists && self.indices.contains_key(&index) {
            return Ok(());
        }

        // Write value and index entry atomically (blob first, then index).
        // The record is created with a (0, 0) placeholder location; `append`
        // fills in the real value location before persisting the entry.
        let section = self.section(index);
        let entry = Record::new(index, key.clone(), 0, 0);
        let (position, _, _) = self.oversized.append(section, entry, &data).await?;

        // Store index location (first occurrence vs. extras)
        match self.indices.entry(index) {
            btree_map::Entry::Vacant(e) => {
                e.insert(position);
            }
            btree_map::Entry::Occupied(_) => {
                self.extra_indices.entry(index).or_default().push(position);
            }
        }

        // Store interval
        self.intervals.insert(index);

        // Insert and prune any useless keys
        self.keys
            .insert_and_prune(&key, index, |v| *v < oldest_allowed);

        // Add section to pending (synced on the next `sync` call)
        self.pending.insert(section);

        // Update metrics
        let _ = self.items_tracked.try_set(self.indices.len());
        Ok(())
    }

    /// Prune `Archive` to the provided `min` (masked by the configured
    /// section mask).
    ///
    /// If this is called with a min lower than the last pruned, nothing
    /// will happen.
    pub async fn prune(&mut self, min: u64) -> Result<(), Error> {
        // Update `min` to reflect section mask
        let min = self.section(min);

        // Check if min is less than last pruned
        if let Some(oldest_allowed) = self.oldest_allowed {
            if min <= oldest_allowed {
                // We don't return an error in this case because the caller
                // shouldn't be burdened with converting `min` to some section.
                return Ok(());
            }
        }
        debug!(min, "pruning archive");

        // Prune oversized journal (handles both index and values)
        self.oversized.prune(min).await?;

        // Remove pending writes (no need to call `sync` as we are pruning)
        loop {
            let next = match self.pending.iter().next() {
                Some(section) if *section < min => *section,
                _ => break,
            };
            self.pending.remove(&next);
        }

        // Remove all indices that are less than min
        loop {
            let next = match self.indices.first_key_value() {
                Some((index, _)) if *index < min => *index,
                _ => break,
            };
            self.indices.remove(&next).unwrap();
            self.extra_indices.remove(&next);
            self.indices_pruned.inc();
        }

        // Remove all keys from interval tree less than min
        if min > 0 {
            self.intervals.remove(0, min - 1);
        }

        // Update last pruned (to prevent reads from pruned sections)
        self.oldest_allowed = Some(min);
        let _ = self.items_tracked.try_set(self.indices.len());
        Ok(())
    }
}
419
420impl<T: Translator, E: BufferPooler + Storage + Metrics, K: Array, V: CodecShared>
421    crate::archive::Archive for Archive<T, E, K, V>
422{
423    type Key = K;
424    type Value = V;
425
426    async fn put(&mut self, index: u64, key: K, data: V) -> Result<(), Error> {
427        self.put_internal(index, key, data, true).await
428    }
429
430    async fn get(&self, identifier: Identifier<'_, K>) -> Result<Option<V>, Error> {
431        match identifier {
432            Identifier::Index(index) => self.get_index(index).await,
433            Identifier::Key(key) => self.get_key(key).await,
434        }
435    }
436
437    async fn has(&self, identifier: Identifier<'_, K>) -> Result<bool, Error> {
438        self.has.inc();
439        match identifier {
440            Identifier::Index(index) => Ok(self.has_index(index)),
441            Identifier::Key(key) => self.get_key(key).await.map(|result| result.is_some()),
442        }
443    }
444
445    async fn sync(&mut self) -> Result<(), Error> {
446        // Collect pending sections and update metrics
447        let pending: Vec<u64> = self.pending.iter().copied().collect();
448        self.syncs.inc_by(pending.len() as u64);
449
450        // Sync oversized journal (handles both index and values)
451        let syncs: Vec<_> = pending.iter().map(|s| self.oversized.sync(*s)).collect();
452        try_join_all(syncs).await?;
453
454        self.pending.clear();
455        Ok(())
456    }
457
458    fn next_gap(&self, index: u64) -> (Option<u64>, Option<u64>) {
459        self.intervals.next_gap(index)
460    }
461
462    fn missing_items(&self, index: u64, max: usize) -> Vec<u64> {
463        self.intervals.missing_items(index, max)
464    }
465
466    fn ranges(&self) -> impl Iterator<Item = (u64, u64)> {
467        self.intervals.iter().map(|(&s, &e)| (s, e))
468    }
469
470    fn ranges_from(&self, from: u64) -> impl Iterator<Item = (u64, u64)> {
471        self.intervals.iter_from(from).map(|(&s, &e)| (s, e))
472    }
473
474    fn first_index(&self) -> Option<u64> {
475        self.intervals.first_index()
476    }
477
478    fn last_index(&self) -> Option<u64> {
479        self.intervals.last_index()
480    }
481
482    async fn destroy(self) -> Result<(), Error> {
483        Ok(self.oversized.destroy().await?)
484    }
485}
486
487impl<T: Translator, E: BufferPooler + Storage + Metrics, K: Array, V: CodecShared>
488    crate::archive::MultiArchive for Archive<T, E, K, V>
489{
490    async fn get_all(&self, index: u64) -> Result<Option<Vec<V>>, Error> {
491        // Update metrics
492        self.gets.inc();
493
494        // Check if the index exists.
495        if !self.indices.contains_key(&index) {
496            return Ok(None);
497        }
498
499        // Get all positions at this index
500        let section = self.section(index);
501        let extra_count = self.extra_indices.get(&index).map_or(0, Vec::len);
502
503        let mut values = Vec::with_capacity(1 + extra_count);
504        for position in self.iter_positions(index) {
505            // Fetch index entry from index journal to verify key
506            let entry = self.oversized.get(section, position).await?;
507
508            // Fetch value directly from blob storage (bypasses page cache)
509            let (value_offset, value_size) = entry.value_location();
510            let value = self
511                .oversized
512                .get_value(section, value_offset, value_size)
513                .await?;
514            values.push(value);
515        }
516        Ok(Some(values))
517    }
518
519    async fn put_multi(&mut self, index: u64, key: K, data: V) -> Result<(), Error> {
520        self.put_internal(index, key, data, false).await
521    }
522}
523
#[cfg(all(test, feature = "arbitrary"))]
mod conformance {
    use super::*;
    use commonware_codec::conformance::CodecConformance;
    use commonware_utils::sequence::U64;

    // Verify that `Record` round-trips through its codec for arbitrary inputs.
    commonware_conformance::conformance_tests! {
        CodecConformance<Record<U64>>
    }
}