Skip to main content

commonware_storage/archive/prunable/
storage.rs

1use super::{Config, Translator};
2use crate::{
3    archive::{Error, Identifier},
4    index::{unordered::Index, Unordered},
5    journal::segmented::oversized::{
6        Config as OversizedConfig, Oversized, Record as OversizedRecord,
7    },
8    rmap::RMap,
9};
10use commonware_codec::{CodecShared, FixedSize, Read, ReadExt, Write};
11use commonware_runtime::{telemetry::metrics::status::GaugeExt, Buf, BufMut, Metrics, Storage};
12use commonware_utils::Array;
13use futures::{future::try_join_all, pin_mut, StreamExt};
14use prometheus_client::metrics::{counter::Counter, gauge::Gauge};
15use std::collections::{BTreeMap, BTreeSet};
16use tracing::debug;
17
/// Index entry for the archive.
///
/// Each [Record] is stored in the (fixed-size) index journal and points at the
/// location of its (variable-size) value in the value journal, so values can be
/// fetched on demand without replaying them.
#[derive(Debug, Clone, PartialEq)]
struct Record<K: Array> {
    /// The index for this entry.
    index: u64,
    /// The key for this entry.
    key: K,
    /// Byte offset in value journal (same section).
    value_offset: u64,
    /// Size of value data in the value journal.
    value_size: u32,
}
30
31impl<K: Array> Record<K> {
32    /// Create a new [Record].
33    const fn new(index: u64, key: K, value_offset: u64, value_size: u32) -> Self {
34        Self {
35            index,
36            key,
37            value_offset,
38            value_size,
39        }
40    }
41}
42
impl<K: Array> Write for Record<K> {
    fn write(&self, buf: &mut impl BufMut) {
        // Field order must match `Read::read_cfg` and the layout assumed by
        // the `FixedSize` impl: index, key, value_offset, value_size.
        self.index.write(buf);
        self.key.write(buf);
        self.value_offset.write(buf);
        self.value_size.write(buf);
    }
}
51
52impl<K: Array> Read for Record<K> {
53    type Cfg = ();
54
55    fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
56        let index = u64::read(buf)?;
57        let key = K::read(buf)?;
58        let value_offset = u64::read(buf)?;
59        let value_size = u32::read(buf)?;
60        Ok(Self {
61            index,
62            key,
63            value_offset,
64            value_size,
65        })
66    }
67}
68
/// [Record] has a fixed encoded size, which enables positional access within
/// the index journal.
impl<K: Array> FixedSize for Record<K> {
    // index + key + value_offset + value_size
    const SIZE: usize = u64::SIZE + K::SIZE + u64::SIZE + u32::SIZE;
}
73
74impl<K: Array> OversizedRecord for Record<K> {
75    fn value_location(&self) -> (u64, u32) {
76        (self.value_offset, self.value_size)
77    }
78
79    fn with_location(mut self, offset: u64, size: u32) -> Self {
80        self.value_offset = offset;
81        self.value_size = size;
82        self
83    }
84}
85
#[cfg(feature = "arbitrary")]
impl<K: Array> arbitrary::Arbitrary<'_> for Record<K>
where
    K: for<'a> arbitrary::Arbitrary<'a>,
{
    /// Generate a random [Record] from unstructured bytes (used by the
    /// conformance tests below when the `arbitrary` feature is enabled).
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        Ok(Self {
            index: u64::arbitrary(u)?,
            key: K::arbitrary(u)?,
            value_offset: u64::arbitrary(u)?,
            value_size: u32::arbitrary(u)?,
        })
    }
}
100
/// Implementation of `Archive` storage.
pub struct Archive<T: Translator, E: Storage + Metrics, K: Array, V: CodecShared> {
    /// Number of items per journal section; used to floor an index to the
    /// start of its section (see `section`).
    items_per_section: u64,

    /// Combined index + value storage with crash recovery.
    oversized: Oversized<E, Record<K>, V>,

    /// Sections with writes that have not yet been synced. Flushed by `sync`;
    /// dropped without syncing by `prune`.
    pending: BTreeSet<u64>,

    /// Oldest allowed section to read from. Updated when `prune` is called.
    oldest_allowed: Option<u64>,

    /// Maps translated key representation to its corresponding index.
    keys: Index<T, u64>,

    /// Maps index to position in index journal.
    indices: BTreeMap<u64, u64>,

    /// Interval tracking for gap detection.
    intervals: RMap,

    // Metrics
    items_tracked: Gauge,       // current number of indices tracked
    indices_pruned: Counter,    // total indices removed by pruning
    unnecessary_reads: Counter, // index reads whose key did not match (translated-key collisions)
    gets: Counter,              // total get operations (by index or key)
    has: Counter,               // total has operations
    syncs: Counter,             // total sections synced (incremented per pending section)
}
130
impl<T: Translator, E: Storage + Metrics, K: Array, V: CodecShared> Archive<T, E, K, V> {
    /// Calculate the section for a given index.
    ///
    /// Floors `index` to the first index of the section that contains it
    /// (a multiple of `items_per_section`).
    const fn section(&self, index: u64) -> u64 {
        (index / self.items_per_section) * self.items_per_section
    }

    /// Initialize a new `Archive` instance.
    ///
    /// The in-memory index for `Archive` is populated during this call
    /// by replaying only the index journal (no values are read).
    pub async fn init(context: E, cfg: Config<T, V::Cfg>) -> Result<Self, Error> {
        // Initialize oversized journal
        let oversized_cfg = OversizedConfig {
            index_partition: cfg.key_partition,
            value_partition: cfg.value_partition,
            index_page_cache: cfg.key_page_cache,
            index_write_buffer: cfg.key_write_buffer,
            value_write_buffer: cfg.value_write_buffer,
            compression: cfg.compression,
            codec_config: cfg.codec_config,
        };
        let oversized: Oversized<E, Record<K>, V> =
            Oversized::init(context.with_label("oversized"), oversized_cfg).await?;

        // Initialize keys and replay index journal (no values read!)
        let mut indices = BTreeMap::new();
        let mut keys = Index::new(context.with_label("index"), cfg.translator.clone());
        let mut intervals = RMap::new();
        {
            debug!("initializing archive from index journal");
            let stream = oversized.replay(0, 0, cfg.replay_buffer).await?;
            pin_mut!(stream);
            while let Some(result) = stream.next().await {
                let (_section, position, entry) = result?;

                // Store index location (position in index journal)
                indices.insert(entry.index, position);

                // Store index in keys
                keys.insert(&entry.key, entry.index);

                // Store index in intervals
                intervals.insert(entry.index);
            }
            debug!("archive initialized");
        }

        // Initialize metrics
        let items_tracked = Gauge::default();
        let indices_pruned = Counter::default();
        let unnecessary_reads = Counter::default();
        let gets = Counter::default();
        let has = Counter::default();
        let syncs = Counter::default();
        context.register(
            "items_tracked",
            "Number of items tracked",
            items_tracked.clone(),
        );
        context.register(
            "indices_pruned",
            "Number of indices pruned",
            indices_pruned.clone(),
        );
        context.register(
            "unnecessary_reads",
            "Number of unnecessary reads performed during key lookups",
            unnecessary_reads.clone(),
        );
        context.register("gets", "Number of gets performed", gets.clone());
        context.register("has", "Number of has performed", has.clone());
        context.register("syncs", "Number of syncs called", syncs.clone());
        // Seed the gauge with the count recovered from replay.
        let _ = items_tracked.try_set(indices.len());

        // Return populated archive
        Ok(Self {
            items_per_section: cfg.items_per_section.get(),
            oversized,
            pending: BTreeSet::new(),
            oldest_allowed: None,
            indices,
            intervals,
            keys,
            items_tracked,
            indices_pruned,
            unnecessary_reads,
            gets,
            has,
            syncs,
        })
    }

    /// Fetch the value stored at `index`, if present.
    ///
    /// Two reads are performed: the fixed-size index entry (to learn the value
    /// location), then the value itself.
    async fn get_index(&self, index: u64) -> Result<Option<V>, Error> {
        // Update metrics
        self.gets.inc();

        // Get index location (pruned indices have been removed from the map,
        // so a miss covers both "never stored" and "pruned")
        let position = match self.indices.get(&index) {
            Some(pos) => *pos,
            None => return Ok(None),
        };

        // Fetch index entry to get value location
        let section = self.section(index);
        let entry = self.oversized.get(section, position).await?;
        let (value_offset, value_size) = entry.value_location();

        // Fetch value directly from blob storage (bypasses page cache)
        let value = self
            .oversized
            .get_value(section, value_offset, value_size)
            .await?;
        Ok(Some(value))
    }

    /// Fetch the value stored under `key`, if present.
    ///
    /// The translated-key index may yield multiple candidate indices
    /// (collisions); each candidate's index entry is read to verify the full
    /// key before its value is fetched.
    async fn get_key(&self, key: &K) -> Result<Option<V>, Error> {
        // Update metrics
        self.gets.inc();

        // Fetch index
        let iter = self.keys.get(key);
        let min_allowed = self.oldest_allowed.unwrap_or(0);
        for index in iter {
            // Continue if index is no longer allowed due to pruning.
            if *index < min_allowed {
                continue;
            }

            // Get index location (a live key pointing at a missing index
            // indicates corruption)
            let position = *self.indices.get(index).ok_or(Error::RecordCorrupted)?;

            // Fetch index entry from index journal to verify key
            let section = self.section(*index);
            let entry = self.oversized.get(section, position).await?;

            // Verify key matches
            if entry.key.as_ref() == key.as_ref() {
                // Fetch value directly from blob storage (bypasses page cache)
                let (value_offset, value_size) = entry.value_location();
                let value = self
                    .oversized
                    .get_value(section, value_offset, value_size)
                    .await?;
                return Ok(Some(value));
            }
            // Candidate was a translated-key collision, not a real match.
            self.unnecessary_reads.inc();
        }

        Ok(None)
    }

    /// Whether a value is stored at `index` (no journal read required).
    fn has_index(&self, index: u64) -> bool {
        // Check if index exists
        self.indices.contains_key(&index)
    }

    /// Prune `Archive` to the provided `min` (masked by the configured
    /// section mask).
    ///
    /// If this is called with a min lower than the last pruned, nothing
    /// will happen.
    pub async fn prune(&mut self, min: u64) -> Result<(), Error> {
        // Update `min` to reflect section mask
        let min = self.section(min);

        // Check if min is less than last pruned
        if let Some(oldest_allowed) = self.oldest_allowed {
            if min <= oldest_allowed {
                // We don't return an error in this case because the caller
                // shouldn't be burdened with converting `min` to some section.
                return Ok(());
            }
        }
        debug!(min, "pruning archive");

        // Prune oversized journal (handles both index and values)
        self.oversized.prune(min).await?;

        // Remove pending writes (no need to call `sync` as we are pruning)
        loop {
            let next = match self.pending.iter().next() {
                Some(section) if *section < min => *section,
                _ => break,
            };
            self.pending.remove(&next);
        }

        // Remove all indices that are less than min
        loop {
            let next = match self.indices.first_key_value() {
                Some((index, _)) if *index < min => *index,
                _ => break,
            };
            self.indices.remove(&next).unwrap();
            self.indices_pruned.inc();
        }

        // Remove all keys from interval tree less than min
        if min > 0 {
            self.intervals.remove(0, min - 1);
        }

        // Update last pruned (to prevent reads from pruned sections)
        self.oldest_allowed = Some(min);
        let _ = self.items_tracked.try_set(self.indices.len());
        Ok(())
    }
}
339
impl<T: Translator, E: Storage + Metrics, K: Array, V: CodecShared> crate::archive::Archive
    for Archive<T, E, K, V>
{
    type Key = K;
    type Value = V;

    /// Store `data` under (`index`, `key`).
    ///
    /// Writing to an `index` that already exists is a silent no-op (the key is
    /// NOT compared against the stored entry). Writing below the pruning
    /// horizon returns [Error::AlreadyPrunedTo].
    async fn put(&mut self, index: u64, key: K, data: V) -> Result<(), Error> {
        // Check last pruned
        let oldest_allowed = self.oldest_allowed.unwrap_or(0);
        if index < oldest_allowed {
            return Err(Error::AlreadyPrunedTo(oldest_allowed));
        }

        // Check for existing index
        if self.indices.contains_key(&index) {
            return Ok(());
        }

        // Write value and index entry atomically (value blob first, then index).
        // The (0, 0) location is a placeholder; the oversized journal is
        // expected to fill in the real value location on append (see
        // `OversizedRecord::with_location`).
        let section = self.section(index);
        let entry = Record::new(index, key.clone(), 0, 0);
        let (position, _, _) = self.oversized.append(section, entry, &data).await?;

        // Store index location
        self.indices.insert(index, position);

        // Store interval
        self.intervals.insert(index);

        // Insert and prune any useless keys
        self.keys
            .insert_and_prune(&key, index, |v| *v < oldest_allowed);

        // Add section to pending
        self.pending.insert(section);

        // Update metrics
        let _ = self.items_tracked.try_set(self.indices.len());
        Ok(())
    }

    /// Fetch a value by index or by key.
    async fn get(&self, identifier: Identifier<'_, K>) -> Result<Option<V>, Error> {
        match identifier {
            Identifier::Index(index) => self.get_index(index).await,
            Identifier::Key(key) => self.get_key(key).await,
        }
    }

    /// Check whether a value exists for the given identifier.
    ///
    /// An index check is a pure in-memory lookup; a key check reads (and
    /// discards) the value via `get_key`.
    async fn has(&self, identifier: Identifier<'_, K>) -> Result<bool, Error> {
        self.has.inc();
        match identifier {
            Identifier::Index(index) => Ok(self.has_index(index)),
            Identifier::Key(key) => self.get_key(key).await.map(|result| result.is_some()),
        }
    }

    /// Flush all pending sections to durable storage.
    async fn sync(&mut self) -> Result<(), Error> {
        // Collect pending sections and update metrics (the counter is
        // incremented once per section synced, not once per call)
        let pending: Vec<u64> = self.pending.iter().copied().collect();
        self.syncs.inc_by(pending.len() as u64);

        // Sync oversized journal (handles both index and values); sections are
        // synced concurrently, failing fast on the first error
        let syncs: Vec<_> = pending.iter().map(|s| self.oversized.sync(*s)).collect();
        try_join_all(syncs).await?;

        self.pending.clear();
        Ok(())
    }

    /// Locate the gap surrounding `index` in the stored intervals.
    fn next_gap(&self, index: u64) -> (Option<u64>, Option<u64>) {
        self.intervals.next_gap(index)
    }

    /// List up to `max` missing indices at or after `index`.
    fn missing_items(&self, index: u64, max: usize) -> Vec<u64> {
        self.intervals.missing_items(index, max)
    }

    /// Iterate over the contiguous `(start, end)` ranges of stored indices.
    fn ranges(&self) -> impl Iterator<Item = (u64, u64)> {
        self.intervals.iter().map(|(&s, &e)| (s, e))
    }

    /// Smallest stored index, if any.
    fn first_index(&self) -> Option<u64> {
        self.intervals.first_index()
    }

    /// Largest stored index, if any.
    fn last_index(&self) -> Option<u64> {
        self.intervals.last_index()
    }

    /// Destroy the archive, removing all underlying storage.
    async fn destroy(self) -> Result<(), Error> {
        Ok(self.oversized.destroy().await?)
    }
}
433
/// Codec round-trip conformance tests for [Record] (only built with the
/// `arbitrary` feature, which provides random [Record] generation).
#[cfg(all(test, feature = "arbitrary"))]
mod conformance {
    use super::*;
    use commonware_codec::conformance::CodecConformance;
    use commonware_utils::sequence::U64;

    commonware_conformance::conformance_tests! {
        CodecConformance<Record<U64>>
    }
}