// commonware_storage/archive/prunable/storage.rs

1use super::{Config, Translator};
2use crate::{
3    archive::{Error, Identifier},
4    index::Index,
5    journal::variable::{Config as JConfig, Journal},
6    rmap::RMap,
7};
8use bytes::{Buf, BufMut};
9use commonware_codec::{varint::UInt, Codec, EncodeSize, Read, ReadExt, Write};
10use commonware_runtime::{Metrics, Storage};
11use commonware_utils::Array;
12use futures::{future::try_join_all, pin_mut, StreamExt};
13use prometheus_client::metrics::{counter::Counter, gauge::Gauge};
14use std::collections::{BTreeMap, BTreeSet};
15use tracing::debug;
16
/// Where an encoded `Record` lives inside its journal section.
struct Location {
    /// Offset of the record within its section, as returned by `Journal::append`.
    offset: u32,
    /// Length of the record, as returned by `Journal::append`.
    len: u32,
}
22
23/// Record stored in the `Archive`.
24struct Record<K: Array, V: Codec> {
25    index: u64,
26    key: K,
27    value: V,
28}
29
30impl<K: Array, V: Codec> Record<K, V> {
31    /// Create a new `Record`.
32    fn new(index: u64, key: K, value: V) -> Self {
33        Self { index, key, value }
34    }
35}
36
37impl<K: Array, V: Codec> Write for Record<K, V> {
38    fn write(&self, buf: &mut impl BufMut) {
39        UInt(self.index).write(buf);
40        self.key.write(buf);
41        self.value.write(buf);
42    }
43}
44
45impl<K: Array, V: Codec> Read for Record<K, V> {
46    type Cfg = V::Cfg;
47
48    fn read_cfg(buf: &mut impl Buf, cfg: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
49        let index = UInt::read(buf)?.into();
50        let key = K::read(buf)?;
51        let value = V::read_cfg(buf, cfg)?;
52        Ok(Self { index, key, value })
53    }
54}
55
56impl<K: Array, V: Codec> EncodeSize for Record<K, V> {
57    fn encode_size(&self) -> usize {
58        UInt(self.index).encode_size() + K::SIZE + self.value.encode_size()
59    }
60}
61
62/// Implementation of `Archive` storage.
63pub struct Archive<T: Translator, E: Storage + Metrics, K: Array, V: Codec> {
64    items_per_section: u64,
65    journal: Journal<E, Record<K, V>>,
66    pending: BTreeSet<u64>,
67
68    // Oldest allowed section to read from. This is updated when `prune` is called.
69    oldest_allowed: Option<u64>,
70
71    // To efficiently serve `get` and `has` requests, we map a translated representation of each key
72    // to its corresponding index. To avoid iterating over this keys map during pruning, we map said
73    // indexes to their locations in the journal.
74    keys: Index<T, u64>,
75    indices: BTreeMap<u64, Location>,
76    intervals: RMap,
77
78    items_tracked: Gauge,
79    indices_pruned: Counter,
80    unnecessary_reads: Counter,
81    gets: Counter,
82    has: Counter,
83    syncs: Counter,
84}
85
86impl<T: Translator, E: Storage + Metrics, K: Array, V: Codec> Archive<T, E, K, V> {
87    /// Calculate the section for a given index.
88    fn section(&self, index: u64) -> u64 {
89        (index / self.items_per_section) * self.items_per_section
90    }
91
92    /// Initialize a new `Archive` instance.
93    ///
94    /// The in-memory index for `Archive` is populated during this call
95    /// by replaying the journal.
96    pub async fn init(context: E, cfg: Config<T, V::Cfg>) -> Result<Self, Error> {
97        // Initialize journal
98        let journal = Journal::<E, Record<K, V>>::init(
99            context.with_label("journal"),
100            JConfig {
101                partition: cfg.partition,
102                compression: cfg.compression,
103                codec_config: cfg.codec_config,
104                buffer_pool: cfg.buffer_pool,
105                write_buffer: cfg.write_buffer,
106            },
107        )
108        .await?;
109
110        // Initialize keys and run corruption check
111        let mut indices = BTreeMap::new();
112        let mut keys = Index::init(context.with_label("index"), cfg.translator.clone());
113        let mut intervals = RMap::new();
114        {
115            debug!("initializing archive");
116            let stream = journal.replay(0, 0, cfg.replay_buffer).await?;
117            pin_mut!(stream);
118            while let Some(result) = stream.next().await {
119                // Extract key from record
120                let (_, offset, len, data) = result?;
121
122                // Store index
123                indices.insert(data.index, Location { offset, len });
124
125                // Store index in keys
126                keys.insert(&data.key, data.index);
127
128                // Store index in intervals
129                intervals.insert(data.index);
130            }
131            debug!(keys = keys.keys(), "archive initialized");
132        }
133
134        // Initialize metrics
135        let items_tracked = Gauge::default();
136        let indices_pruned = Counter::default();
137        let unnecessary_reads = Counter::default();
138        let gets = Counter::default();
139        let has = Counter::default();
140        let syncs = Counter::default();
141        context.register(
142            "items_tracked",
143            "Number of items tracked",
144            items_tracked.clone(),
145        );
146        context.register(
147            "indices_pruned",
148            "Number of indices pruned",
149            indices_pruned.clone(),
150        );
151        context.register(
152            "unnecessary_reads",
153            "Number of unnecessary reads performed during key lookups",
154            unnecessary_reads.clone(),
155        );
156        context.register("gets", "Number of gets performed", gets.clone());
157        context.register("has", "Number of has performed", has.clone());
158        context.register("syncs", "Number of syncs called", syncs.clone());
159        items_tracked.set(indices.len() as i64);
160
161        // Return populated archive
162        Ok(Self {
163            items_per_section: cfg.items_per_section.get(),
164            journal,
165            pending: BTreeSet::new(),
166            oldest_allowed: None,
167            indices,
168            intervals,
169            keys,
170            items_tracked,
171            indices_pruned,
172            unnecessary_reads,
173            gets,
174            has,
175            syncs,
176        })
177    }
178
179    async fn get_index(&self, index: u64) -> Result<Option<V>, Error> {
180        // Update metrics
181        self.gets.inc();
182
183        // Get index location
184        let location = match self.indices.get(&index) {
185            Some(offset) => offset,
186            None => return Ok(None),
187        };
188
189        // Fetch item from disk
190        let section = self.section(index);
191        let record = self
192            .journal
193            .get_exact(section, location.offset, location.len)
194            .await?;
195        Ok(Some(record.value))
196    }
197
198    async fn get_key(&self, key: &K) -> Result<Option<V>, Error> {
199        // Update metrics
200        self.gets.inc();
201
202        // Fetch index
203        let iter = self.keys.get(key);
204        let min_allowed = self.oldest_allowed.unwrap_or(0);
205        for index in iter {
206            // Continue if index is no longer allowed due to pruning.
207            if *index < min_allowed {
208                continue;
209            }
210
211            // Fetch item from disk
212            let location = self.indices.get(index).ok_or(Error::RecordCorrupted)?;
213            let section = self.section(*index);
214            let record = self
215                .journal
216                .get_exact(section, location.offset, location.len)
217                .await?;
218
219            // Get key from item
220            if record.key.as_ref() == key.as_ref() {
221                return Ok(Some(record.value));
222            }
223            self.unnecessary_reads.inc();
224        }
225
226        Ok(None)
227    }
228
229    fn has_index(&self, index: u64) -> bool {
230        // Check if index exists
231        self.indices.contains_key(&index)
232    }
233
234    /// Prune `Archive` to the provided `min` (masked by the configured
235    /// section mask).
236    ///
237    /// If this is called with a min lower than the last pruned, nothing
238    /// will happen.
239    pub async fn prune(&mut self, min: u64) -> Result<(), Error> {
240        // Update `min` to reflect section mask
241        let min = self.section(min);
242
243        // Check if min is less than last pruned
244        if let Some(oldest_allowed) = self.oldest_allowed {
245            if min <= oldest_allowed {
246                // We don't return an error in this case because the caller
247                // shouldn't be burdened with converting `min` to some section.
248                return Ok(());
249            }
250        }
251        debug!(min, "pruning archive");
252
253        // Prune journal
254        self.journal.prune(min).await.map_err(Error::Journal)?;
255
256        // Remove pending writes (no need to call `sync` as we are pruning)
257        loop {
258            let next = match self.pending.iter().next() {
259                Some(section) if *section < min => *section,
260                _ => break,
261            };
262            self.pending.remove(&next);
263        }
264
265        // Remove all indices that are less than min
266        loop {
267            let next = match self.indices.first_key_value() {
268                Some((index, _)) if *index < min => *index,
269                _ => break,
270            };
271            self.indices.remove(&next).unwrap();
272            self.indices_pruned.inc();
273        }
274
275        // Remove all keys from interval tree less than min
276        if min > 0 {
277            self.intervals.remove(0, min - 1);
278        }
279
280        // Update last pruned (to prevent reads from
281        // pruned sections)
282        self.oldest_allowed = Some(min);
283        self.items_tracked.set(self.indices.len() as i64);
284        Ok(())
285    }
286}
287
288impl<T: Translator, E: Storage + Metrics, K: Array, V: Codec> crate::archive::Archive
289    for Archive<T, E, K, V>
290{
291    type Key = K;
292    type Value = V;
293
294    async fn put(&mut self, index: u64, key: K, data: V) -> Result<(), Error> {
295        // Check last pruned
296        let oldest_allowed = self.oldest_allowed.unwrap_or(0);
297        if index < oldest_allowed {
298            return Err(Error::AlreadyPrunedTo(oldest_allowed));
299        }
300
301        // Check for existing index
302        if self.indices.contains_key(&index) {
303            return Ok(());
304        }
305
306        // Store item in journal
307        let record = Record::new(index, key.clone(), data);
308        let section = self.section(index);
309        let (offset, len) = self.journal.append(section, record).await?;
310
311        // Store index
312        self.indices.insert(index, Location { offset, len });
313
314        // Store interval
315        self.intervals.insert(index);
316
317        // Insert and prune any useless keys
318        self.keys
319            .insert_and_prune(&key, index, |v| *v < oldest_allowed);
320
321        // Add section to pending
322        self.pending.insert(section);
323
324        // Update metrics
325        self.items_tracked.set(self.indices.len() as i64);
326        Ok(())
327    }
328
329    async fn get(&self, identifier: Identifier<'_, K>) -> Result<Option<V>, Error> {
330        match identifier {
331            Identifier::Index(index) => self.get_index(index).await,
332            Identifier::Key(key) => self.get_key(key).await,
333        }
334    }
335
336    async fn has(&self, identifier: Identifier<'_, K>) -> Result<bool, Error> {
337        self.has.inc();
338        match identifier {
339            Identifier::Index(index) => Ok(self.has_index(index)),
340            Identifier::Key(key) => self.get_key(key).await.map(|result| result.is_some()),
341        }
342    }
343
344    async fn sync(&mut self) -> Result<(), Error> {
345        let mut syncs = Vec::with_capacity(self.pending.len());
346        for section in self.pending.iter() {
347            syncs.push(self.journal.sync(*section));
348            self.syncs.inc();
349        }
350        try_join_all(syncs).await?;
351        self.pending.clear();
352        Ok(())
353    }
354
355    fn next_gap(&self, index: u64) -> (Option<u64>, Option<u64>) {
356        self.intervals.next_gap(index)
357    }
358
359    async fn close(self) -> Result<(), Error> {
360        self.journal.close().await.map_err(Error::Journal)
361    }
362
363    async fn destroy(self) -> Result<(), Error> {
364        self.journal.destroy().await.map_err(Error::Journal)
365    }
366}