commonware_storage/cache/storage.rs

use super::{Config, Error};
use crate::{
    journal::variable::{Config as JConfig, Journal},
    rmap::RMap,
};
use bytes::{Buf, BufMut};
use commonware_codec::{varint::UInt, Codec, EncodeSize, Read, ReadExt, Write};
use commonware_runtime::{Metrics, Storage};
use futures::{future::try_join_all, pin_mut, StreamExt};
use prometheus_client::metrics::{counter::Counter, gauge::Gauge};
use std::collections::{BTreeMap, BTreeSet};
use tracing::debug;

/// Location of a record in `Journal`.
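///
/// The `offset` and `len` identify where the record lives within its journal section.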
struct Location {
    offset: u32,
    len: u32,
}

/// Record stored in the `Cache`.
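///
/// The `index` is persisted alongside the value so the in-memory index can be
/// rebuilt by replaying the journal on startup.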
struct Record<V: Codec> {
    index: u64,
    value: V,
}

impl<V: Codec> Record<V> {
    /// Create a new `Record`.
    fn new(index: u64, value: V) -> Self {
        Self { index, value }
    }
}

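// On-disk layout: the index is written as a varint (`UInt`) followed by the
// value's own encoding; `encode_size` mirrors this layout.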
impl<V: Codec> Write for Record<V> {
    fn write(&self, buf: &mut impl BufMut) {
        UInt(self.index).write(buf);
        self.value.write(buf);
    }
}

impl<V: Codec> Read for Record<V> {
    type Cfg = V::Cfg;

    fn read_cfg(buf: &mut impl Buf, cfg: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
        let index = UInt::read(buf)?.into();
        let value = V::read_cfg(buf, cfg)?;
        Ok(Self { index, value })
    }
}

impl<V: Codec> EncodeSize for Record<V> {
    fn encode_size(&self) -> usize {
        UInt(self.index).encode_size() + self.value.encode_size()
    }
}

/// Implementation of `Cache` storage.
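///
/// # Example
///
/// A minimal sketch of the intended flow, assuming a runtime `context`, a
/// populated `Config` value `cfg`, and a `value` of some type `MyValue: Codec`
/// are constructed elsewhere (elided here):
///
/// ```ignore
/// let mut cache: Cache<_, MyValue> = Cache::init(context, cfg).await?;
/// cache.put(42, value).await?;
/// cache.sync().await?; // persist pending writes
/// assert!(cache.has(42));
/// let fetched: Option<MyValue> = cache.get(42).await?;
/// cache.close().await?;
/// ```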
pub struct Cache<E: Storage + Metrics, V: Codec> {
    items_per_blob: u64,
    journal: Journal<E, Record<V>>,
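    // Sections with appends that have not yet been synced.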
    pending: BTreeSet<u64>,

    // Oldest allowed section to read from. This is updated when `prune` is called.
    oldest_allowed: Option<u64>,
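    // Maps a cached index to the location of its record in the journal.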
    indices: BTreeMap<u64, Location>,
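    // Tracks the set of cached indices as ranges (used for gap queries).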
    intervals: RMap,

    items_tracked: Gauge,
    gets: Counter,
    has: Counter,
    syncs: Counter,
}

impl<E: Storage + Metrics, V: Codec> Cache<E, V> {
    /// Calculate the section for a given index.
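    ///
    /// This rounds `index` down to the nearest multiple of `items_per_blob` (e.g.,
    /// with `items_per_blob = 10`, indices 0..=9 map to section 0 and 10..=19 to
    /// section 10).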
    fn section(&self, index: u64) -> u64 {
        (index / self.items_per_blob) * self.items_per_blob
    }

    /// Initialize a new `Cache` instance.
    ///
    /// The in-memory index for `Cache` is populated during this call
    /// by replaying the journal.
    pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
        // Initialize journal
        let journal = Journal::<E, Record<V>>::init(
            context.with_label("journal"),
            JConfig {
                partition: cfg.partition,
                compression: cfg.compression,
                codec_config: cfg.codec_config,
                buffer_pool: cfg.buffer_pool,
                write_buffer: cfg.write_buffer,
            },
        )
        .await?;

        // Initialize index and run corruption check
        let mut indices = BTreeMap::new();
        let mut intervals = RMap::new();
        {
            debug!("initializing cache");
            let stream = journal.replay(0, 0, cfg.replay_buffer).await?;
            pin_mut!(stream);
            while let Some(result) = stream.next().await {
                // Extract index from record
                let (_, offset, len, data) = result?;

                // Store index
                indices.insert(data.index, Location { offset, len });

                // Store index in intervals
                intervals.insert(data.index);
            }
            debug!(items = indices.len(), "cache initialized");
        }

        // Initialize metrics
        let items_tracked = Gauge::default();
        let gets = Counter::default();
        let has = Counter::default();
        let syncs = Counter::default();
        context.register(
            "items_tracked",
            "Number of items tracked",
            items_tracked.clone(),
        );
        context.register("gets", "Number of gets performed", gets.clone());
        context.register("has", "Number of has checks performed", has.clone());
        context.register("syncs", "Number of syncs called", syncs.clone());
        items_tracked.set(indices.len() as i64);

        // Return populated cache
        Ok(Self {
            items_per_blob: cfg.items_per_blob.get(),
            journal,
            pending: BTreeSet::new(),
            oldest_allowed: None,
            indices,
            intervals,
            items_tracked,
            gets,
            has,
            syncs,
        })
    }

    /// Retrieve an item from the [Cache].
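    ///
    /// Returns `Ok(None)` if `index` is not cached. If the index is known but its
    /// record cannot be read back from the journal, [Error::RecordCorrupted] is
    /// returned.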
    pub async fn get(&self, index: u64) -> Result<Option<V>, Error> {
        // Update metrics
        self.gets.inc();

        // Get index location
        let location = match self.indices.get(&index) {
            Some(location) => location,
            None => return Ok(None),
        };

        // Fetch item from disk
        let section = self.section(index);
        let record = self
            .journal
            .get_exact(section, location.offset, location.len)
            .await?
            .ok_or(Error::RecordCorrupted)?;
        Ok(Some(record.value))
    }

    /// Retrieve the next gap in the [Cache].
    pub fn next_gap(&self, index: u64) -> (Option<u64>, Option<u64>) {
        self.intervals.next_gap(index)
    }

    /// Get up to the next `max` missing items after `start`.
    pub fn missing_items(&self, start: u64, max: usize) -> Vec<u64> {
        self.intervals.missing_items(start, max)
    }

    /// Check if an item exists in the [Cache].
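    ///
    /// This only consults the in-memory index; no disk read is performed.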
    pub fn has(&self, index: u64) -> bool {
        // Update metrics
        self.has.inc();

        // Check if index exists
        self.indices.contains_key(&index)
    }

    /// Prune [Cache] to the provided `min`.
    ///
    /// If this is called with a `min` at or below the last pruned section,
    /// nothing will happen.
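    ///
    /// `min` is first rounded down to a section boundary, so pruning always
    /// removes whole sections at a time.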
    pub async fn prune(&mut self, min: u64) -> Result<(), Error> {
        // Update `min` to reflect section mask
        let min = self.section(min);

        // Check if min is at or below the last pruned section
        if let Some(oldest_allowed) = self.oldest_allowed {
            if min <= oldest_allowed {
                // We don't return an error in this case because the caller
                // shouldn't be burdened with converting `min` to some section.
                return Ok(());
            }
        }
        debug!(min, "pruning cache");

        // Prune journal
        self.journal.prune(min).await.map_err(Error::Journal)?;

        // Remove pending writes (no need to call `sync` as we are pruning)
        loop {
            let next = match self.pending.iter().next() {
                Some(section) if *section < min => *section,
                _ => break,
            };
            self.pending.remove(&next);
        }

        // Remove all indices that are less than min
        loop {
            let next = match self.indices.first_key_value() {
                Some((index, _)) if *index < min => *index,
                _ => break,
            };
            self.indices.remove(&next).unwrap();
        }

        // Remove all intervals that are less than min
        if min > 0 {
            self.intervals.remove(0, min - 1);
        }

        // Update last pruned (to prevent reads from
        // pruned sections)
        self.oldest_allowed = Some(min);
        self.items_tracked.set(self.indices.len() as i64);
        Ok(())
    }

    /// Store an item in the [Cache].
    ///
    /// If the index already exists, `put` does nothing and returns.
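    ///
    /// The write is appended to the journal but not synced; call [Cache::sync]
    /// (or [Cache::close]) to make pending writes durable.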
    pub async fn put(&mut self, index: u64, value: V) -> Result<(), Error> {
        // Check last pruned
        let oldest_allowed = self.oldest_allowed.unwrap_or(0);
        if index < oldest_allowed {
            return Err(Error::AlreadyPrunedTo(oldest_allowed));
        }

        // Check for existing index
        if self.indices.contains_key(&index) {
            return Ok(());
        }

        // Store item in journal
        let record = Record::new(index, value);
        let section = self.section(index);
        let (offset, len) = self.journal.append(section, record).await?;

        // Store index
        self.indices.insert(index, Location { offset, len });

        // Add index to intervals
        self.intervals.insert(index);

        // Add section to pending
        self.pending.insert(section);

        // Update metrics
        self.items_tracked.set(self.indices.len() as i64);
        Ok(())
    }

    /// Sync all pending writes.
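    ///
    /// All sections with pending writes are synced concurrently.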
    pub async fn sync(&mut self) -> Result<(), Error> {
        let mut syncs = Vec::with_capacity(self.pending.len());
        for section in self.pending.iter() {
            syncs.push(self.journal.sync(*section));
            self.syncs.inc();
        }
        try_join_all(syncs).await?;
        self.pending.clear();
        Ok(())
    }

    /// Close the [Cache].
    ///
    /// Any pending writes will be synced prior to closing.
    pub async fn close(self) -> Result<(), Error> {
        self.journal.close().await.map_err(Error::Journal)
    }

    /// Remove all persistent data created by this [Cache].
    pub async fn destroy(self) -> Result<(), Error> {
        self.journal.destroy().await.map_err(Error::Journal)
    }
}