commonware_storage/archive/prunable/storage.rs

use super::{Config, Translator};
use crate::{
    archive::{Error, Identifier},
    index::{unordered::Index, Unordered},
    journal::segmented::variable::{Config as JConfig, Journal},
    rmap::RMap,
};
use bytes::{Buf, BufMut};
use commonware_codec::{varint::UInt, Codec, EncodeSize, Read, ReadExt, Write};
use commonware_runtime::{telemetry::metrics::status::GaugeExt, Metrics, Storage};
use commonware_utils::Array;
use futures::{future::try_join_all, pin_mut, StreamExt};
use prometheus_client::metrics::{counter::Counter, gauge::Gauge};
use std::collections::{BTreeMap, BTreeSet};
use tracing::debug;

/// Record stored in the `Archive`.
struct Record<K: Array, V: Codec> {
    index: u64,
    key: K,
    value: V,
}

impl<K: Array, V: Codec> Record<K, V> {
    /// Create a new `Record`.
    const fn new(index: u64, key: K, value: V) -> Self {
        Self { index, key, value }
    }
}

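// On-disk encoding of a `Record`: a varint-encoded `index`, followed by the
// fixed-size `key` bytes, followed by the codec-encoded `value`.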
impl<K: Array, V: Codec> Write for Record<K, V> {
    fn write(&self, buf: &mut impl BufMut) {
        UInt(self.index).write(buf);
        self.key.write(buf);
        self.value.write(buf);
    }
}

impl<K: Array, V: Codec> Read for Record<K, V> {
    type Cfg = V::Cfg;

    fn read_cfg(buf: &mut impl Buf, cfg: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
        let index = UInt::read(buf)?.into();
        let key = K::read(buf)?;
        let value = V::read_cfg(buf, cfg)?;
        Ok(Self { index, key, value })
    }
}

impl<K: Array, V: Codec> EncodeSize for Record<K, V> {
    fn encode_size(&self) -> usize {
        UInt(self.index).encode_size() + K::SIZE + self.value.encode_size()
    }
}

#[cfg(feature = "arbitrary")]
impl<K: Array, V: Codec> arbitrary::Arbitrary<'_> for Record<K, V>
where
    K: for<'a> arbitrary::Arbitrary<'a>,
    V: for<'a> arbitrary::Arbitrary<'a>,
{
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        Ok(Self::new(
            u.arbitrary::<u64>()?,
            u.arbitrary::<K>()?,
            u.arbitrary::<V>()?,
        ))
    }
}

/// Implementation of `Archive` storage.
pub struct Archive<T: Translator, E: Storage + Metrics, K: Array, V: Codec> {
    items_per_section: u64,
    journal: Journal<E, Record<K, V>>,
    pending: BTreeSet<u64>,

    // Oldest allowed section to read from. This is updated when `prune` is called.
    oldest_allowed: Option<u64>,

    // To efficiently serve `get` and `has` requests, we map a translated representation of each
    // key to its corresponding index. To avoid iterating over the keys map during pruning, we
    // also map each index to its location in the journal.
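    // Note that translated keys may collide: multiple keys (and therefore multiple indices)
    // can share an entry in `keys`, and `get_key` resolves collisions by reading candidate
    // records from the journal and comparing full keys.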
    keys: Index<T, u64>,
    indices: BTreeMap<u64, u32>,
    intervals: RMap,

    items_tracked: Gauge,
    indices_pruned: Counter,
    unnecessary_reads: Counter,
    gets: Counter,
    has: Counter,
    syncs: Counter,
}

impl<T: Translator, E: Storage + Metrics, K: Array, V: Codec> Archive<T, E, K, V> {
    /// Calculate the section for a given index.
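    ///
    /// The section is the index rounded down to the nearest multiple of
    /// `items_per_section` (e.g., with `items_per_section = 1_000`, index `2_345`
    /// belongs to section `2_000`).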
    const fn section(&self, index: u64) -> u64 {
        (index / self.items_per_section) * self.items_per_section
    }

    /// Initialize a new `Archive` instance.
    ///
    /// The in-memory index for `Archive` is populated during this call
    /// by replaying the journal.
    pub async fn init(context: E, cfg: Config<T, V::Cfg>) -> Result<Self, Error> {
        // Initialize journal
        let journal = Journal::<E, Record<K, V>>::init(
            context.with_label("journal"),
            JConfig {
                partition: cfg.partition,
                compression: cfg.compression,
                codec_config: cfg.codec_config,
                buffer_pool: cfg.buffer_pool,
                write_buffer: cfg.write_buffer,
            },
        )
        .await?;

        // Initialize keys and run corruption check
        let mut indices = BTreeMap::new();
        let mut keys = Index::new(context.with_label("index"), cfg.translator.clone());
        let mut intervals = RMap::new();
        {
            debug!("initializing archive");
            let stream = journal.replay(0, 0, cfg.replay_buffer).await?;
            pin_mut!(stream);
            while let Some(result) = stream.next().await {
                // Extract the record and its offset
                let (_, offset, _, data) = result?;

                // Store index
                indices.insert(data.index, offset);

                // Store index in keys
                keys.insert(&data.key, data.index);

                // Store index in intervals
                intervals.insert(data.index);
            }
            debug!("archive initialized");
        }

        // Initialize metrics
        let items_tracked = Gauge::default();
        let indices_pruned = Counter::default();
        let unnecessary_reads = Counter::default();
        let gets = Counter::default();
        let has = Counter::default();
        let syncs = Counter::default();
        context.register(
            "items_tracked",
            "Number of items tracked",
            items_tracked.clone(),
        );
        context.register(
            "indices_pruned",
            "Number of indices pruned",
            indices_pruned.clone(),
        );
        context.register(
            "unnecessary_reads",
            "Number of unnecessary reads performed during key lookups",
            unnecessary_reads.clone(),
        );
        context.register("gets", "Number of gets performed", gets.clone());
        context.register("has", "Number of has performed", has.clone());
        context.register("syncs", "Number of syncs called", syncs.clone());
        let _ = items_tracked.try_set(indices.len());

        // Return populated archive
        Ok(Self {
            items_per_section: cfg.items_per_section.get(),
            journal,
            pending: BTreeSet::new(),
            oldest_allowed: None,
            indices,
            intervals,
            keys,
            items_tracked,
            indices_pruned,
            unnecessary_reads,
            gets,
            has,
            syncs,
        })
    }

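    /// Retrieve the value stored at `index`, if it exists (and has not been pruned).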
    async fn get_index(&self, index: u64) -> Result<Option<V>, Error> {
        // Update metrics
        self.gets.inc();

        // Get index location
        let offset = match self.indices.get(&index) {
            Some(offset) => *offset,
            None => return Ok(None),
        };

        // Fetch item from disk
        let section = self.section(index);
        let record = self.journal.get(section, offset).await?;
        Ok(Some(record.value))
    }

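    /// Retrieve the value stored under `key`, if it exists.
    ///
    /// Because the index stores translated (possibly colliding) keys, this may read
    /// multiple candidate records from disk before finding an exact match.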
    async fn get_key(&self, key: &K) -> Result<Option<V>, Error> {
        // Update metrics
        self.gets.inc();

        // Fetch index
        let iter = self.keys.get(key);
        let min_allowed = self.oldest_allowed.unwrap_or(0);
        for index in iter {
            // Continue if index is no longer allowed due to pruning.
            if *index < min_allowed {
                continue;
            }

            // Fetch item from disk
            let offset = *self.indices.get(index).ok_or(Error::RecordCorrupted)?;
            let section = self.section(*index);
            let record = self.journal.get(section, offset).await?;

            // Check whether the record's key matches the requested key
            if record.key.as_ref() == key.as_ref() {
                return Ok(Some(record.value));
            }
            self.unnecessary_reads.inc();
        }

        Ok(None)
    }

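    /// Check whether a value exists at `index`.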
    fn has_index(&self, index: u64) -> bool {
        // Check if index exists
        self.indices.contains_key(&index)
    }

    /// Prune the `Archive` to the provided `min`, rounded down to the nearest
    /// section boundary.
    ///
    /// If this is called with a `min` at or below the last pruned section, nothing
    /// will happen.
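    ///
    /// For example, with `items_per_section = 1_000`, calling `prune(1_500)` rounds
    /// `min` down to `1_000` and removes all items with indices below `1_000`.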
    pub async fn prune(&mut self, min: u64) -> Result<(), Error> {
        // Round `min` down to its section boundary
        let min = self.section(min);

        // Check if min is at or below the last pruned section
        if let Some(oldest_allowed) = self.oldest_allowed {
            if min <= oldest_allowed {
                // We don't return an error in this case because the caller
                // shouldn't be burdened with converting `min` to some section.
                return Ok(());
            }
        }
        debug!(min, "pruning archive");

        // Prune journal
        self.journal.prune(min).await.map_err(Error::Journal)?;

        // Remove pending writes (no need to call `sync` as we are pruning)
        loop {
            let next = match self.pending.iter().next() {
                Some(section) if *section < min => *section,
                _ => break,
            };
            self.pending.remove(&next);
        }

        // Remove all indices that are less than min
        loop {
            let next = match self.indices.first_key_value() {
                Some((index, _)) if *index < min => *index,
                _ => break,
            };
            self.indices.remove(&next).unwrap();
            self.indices_pruned.inc();
        }

        // Remove all indices less than min from the interval tree
        if min > 0 {
            self.intervals.remove(0, min - 1);
        }

        // Update last pruned (to prevent reads from pruned sections)
        self.oldest_allowed = Some(min);
        let _ = self.items_tracked.try_set(self.indices.len());
        Ok(())
    }
}

impl<T: Translator, E: Storage + Metrics, K: Array, V: Codec> crate::archive::Archive
    for Archive<T, E, K, V>
{
    type Key = K;
    type Value = V;

    async fn put(&mut self, index: u64, key: K, data: V) -> Result<(), Error> {
        // Check last pruned
        let oldest_allowed = self.oldest_allowed.unwrap_or(0);
        if index < oldest_allowed {
            return Err(Error::AlreadyPrunedTo(oldest_allowed));
        }

        // Check for existing index
        if self.indices.contains_key(&index) {
            return Ok(());
        }

        // Store item in journal
        let record = Record::new(index, key.clone(), data);
        let section = self.section(index);
        let (offset, _) = self.journal.append(section, record).await?;

        // Store index
        self.indices.insert(index, offset);

        // Store interval
        self.intervals.insert(index);

        // Insert and prune any useless keys
        self.keys
            .insert_and_prune(&key, index, |v| *v < oldest_allowed);

        // Add section to pending
        self.pending.insert(section);

        // Update metrics
        let _ = self.items_tracked.try_set(self.indices.len());
        Ok(())
    }

    async fn get(&self, identifier: Identifier<'_, K>) -> Result<Option<V>, Error> {
        match identifier {
            Identifier::Index(index) => self.get_index(index).await,
            Identifier::Key(key) => self.get_key(key).await,
        }
    }

    async fn has(&self, identifier: Identifier<'_, K>) -> Result<bool, Error> {
        self.has.inc();
        match identifier {
            Identifier::Index(index) => Ok(self.has_index(index)),
            Identifier::Key(key) => self.get_key(key).await.map(|result| result.is_some()),
        }
    }

    async fn sync(&mut self) -> Result<(), Error> {
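        // Concurrently sync every section with pending writes; `pending` is only
        // cleared once all syncs have succeeded.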
        let mut syncs = Vec::with_capacity(self.pending.len());
        for section in self.pending.iter() {
            syncs.push(self.journal.sync(*section));
            self.syncs.inc();
        }
        try_join_all(syncs).await?;
        self.pending.clear();
        Ok(())
    }

    fn next_gap(&self, index: u64) -> (Option<u64>, Option<u64>) {
        self.intervals.next_gap(index)
    }

    fn missing_items(&self, index: u64, max: usize) -> Vec<u64> {
        self.intervals.missing_items(index, max)
    }

    fn ranges(&self) -> impl Iterator<Item = (u64, u64)> {
        self.intervals.iter().map(|(&s, &e)| (s, e))
    }

    fn first_index(&self) -> Option<u64> {
        self.intervals.first_index()
    }

    fn last_index(&self) -> Option<u64> {
        self.intervals.last_index()
    }

    async fn close(self) -> Result<(), Error> {
        self.journal.close().await.map_err(Error::Journal)
    }

    async fn destroy(self) -> Result<(), Error> {
        self.journal.destroy().await.map_err(Error::Journal)
    }
}

#[cfg(all(test, feature = "arbitrary"))]
mod conformance {
    use super::*;
    use commonware_codec::conformance::CodecConformance;
    use commonware_utils::sequence::U64;

    commonware_conformance::conformance_tests! {
        CodecConformance<Record<U64, Vec<u8>>>
    }
}