// commonware_storage/cache/storage.rs

use super::{Config, Error};
use crate::{
    journal::variable::{Config as JConfig, Journal},
    rmap::RMap,
};
use bytes::{Buf, BufMut};
use commonware_codec::{varint::UInt, Codec, EncodeSize, Read, ReadExt, Write};
use commonware_runtime::{Metrics, Storage};
use futures::{future::try_join_all, pin_mut, StreamExt};
use prometheus_client::metrics::{counter::Counter, gauge::Gauge};
use std::collections::{BTreeMap, BTreeSet};
use tracing::debug;
/// Physical position of a single record within its journal section, as
/// returned by `Journal::append` and consumed by `Journal::get_exact`.
struct Location {
    /// Byte offset of the record within the section.
    offset: u32,
    /// Encoded length of the record in bytes.
    len: u32,
}
19
/// A cached value tagged with its index, as persisted in the journal.
///
/// Storing the index alongside the value allows `indices` and `intervals`
/// to be rebuilt by replaying the journal on startup.
struct Record<V: Codec> {
    /// Index of the cached item.
    index: u64,
    /// The cached value.
    value: V,
}
25
26impl<V: Codec> Record<V> {
27 fn new(index: u64, value: V) -> Self {
29 Self { index, value }
30 }
31}
32
impl<V: Codec> Write for Record<V> {
    /// Serializes the record as a varint-encoded index followed by the value.
    fn write(&self, buf: &mut impl BufMut) {
        // Varint keeps small indices compact on disk.
        UInt(self.index).write(buf);
        self.value.write(buf);
    }
}
39
impl<V: Codec> Read for Record<V> {
    /// Decoding a record requires the value codec's configuration.
    type Cfg = V::Cfg;

    /// Mirror of [`Write::write`]: varint index first, then the value.
    fn read_cfg(buf: &mut impl Buf, cfg: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
        let index = UInt::read(buf)?.into();
        let value = V::read_cfg(buf, cfg)?;
        Ok(Self { index, value })
    }
}
49
impl<V: Codec> EncodeSize for Record<V> {
    /// Must match the layout produced by [`Write::write`] exactly.
    fn encode_size(&self) -> usize {
        UInt(self.index).encode_size() + self.value.encode_size()
    }
}
55
/// A prunable cache of indexed items backed by a variable-length [Journal].
pub struct Cache<E: Storage + Metrics, V: Codec> {
    /// Number of items grouped into each journal section.
    items_per_blob: u64,
    /// Durable storage for cached records.
    journal: Journal<E, Record<V>>,
    /// Sections with appended-but-unsynced writes.
    pending: BTreeSet<u64>,

    /// Section boundary below which items have been pruned; `None` until the
    /// first call to `prune`.
    oldest_allowed: Option<u64>,
    /// Maps each cached index to its location in the journal.
    indices: BTreeMap<u64, Location>,
    /// Interval map over cached indices, used for gap/missing-item queries.
    intervals: RMap,

    // Metrics.
    items_tracked: Gauge,
    gets: Counter,
    has: Counter,
    syncs: Counter,
}
72
73impl<E: Storage + Metrics, V: Codec> Cache<E, V> {
74 fn section(&self, index: u64) -> u64 {
76 (index / self.items_per_blob) * self.items_per_blob
77 }
78
    /// Initialize a new [Cache].
    ///
    /// Opens the backing journal, replays every record on disk to rebuild the
    /// in-memory index and interval map, and registers metrics on `context`.
    pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
        // Open the backing journal of variable-length records.
        let journal = Journal::<E, Record<V>>::init(
            context.with_label("journal"),
            JConfig {
                partition: cfg.partition,
                compression: cfg.compression,
                codec_config: cfg.codec_config,
                buffer_pool: cfg.buffer_pool,
                write_buffer: cfg.write_buffer,
            },
        )
        .await?;

        // Rebuild in-memory state by replaying everything currently on disk.
        let mut indices = BTreeMap::new();
        let mut intervals = RMap::new();
        {
            debug!("initializing cache");
            let stream = journal.replay(0, 0, cfg.replay_buffer).await?;
            pin_mut!(stream);
            while let Some(result) = stream.next().await {
                // Propagate any replay error.
                let (_, offset, len, data) = result?;

                // Remember where the item lives so `get` can fetch it later.
                indices.insert(data.index, Location { offset, len });

                // Mark the index as present for gap queries.
                intervals.insert(data.index);
            }
            debug!(items = indices.len(), "cache initialized");
        }

        // Create and register metrics, then seed the item gauge.
        let items_tracked = Gauge::default();
        let gets = Counter::default();
        let has = Counter::default();
        let syncs = Counter::default();
        context.register(
            "items_tracked",
            "Number of items tracked",
            items_tracked.clone(),
        );
        context.register("gets", "Number of gets performed", gets.clone());
        context.register("has", "Number of has performed", has.clone());
        context.register("syncs", "Number of syncs called", syncs.clone());
        items_tracked.set(indices.len() as i64);

        Ok(Self {
            items_per_blob: cfg.items_per_blob.get(),
            journal,
            pending: BTreeSet::new(),
            oldest_allowed: None,
            indices,
            intervals,
            items_tracked,
            gets,
            has,
            syncs,
        })
    }
146
147 pub async fn get(&self, index: u64) -> Result<Option<V>, Error> {
149 self.gets.inc();
151
152 let location = match self.indices.get(&index) {
154 Some(offset) => offset,
155 None => return Ok(None),
156 };
157
158 let section = self.section(index);
160 let record = self
161 .journal
162 .get_exact(section, location.offset, location.len)
163 .await?
164 .ok_or(Error::RecordCorrupted)?;
165 Ok(Some(record.value))
166 }
167
    /// Returns gap information around `index` in the set of cached indices
    /// (delegates to [RMap::next_gap]; see that method for the exact
    /// semantics of the returned bounds).
    pub fn next_gap(&self, index: u64) -> (Option<u64>, Option<u64>) {
        self.intervals.next_gap(index)
    }
172
    /// Returns up to `max` indices that are not yet cached, scanning from
    /// `start` (delegates to [RMap::missing_items]).
    pub fn missing_items(&self, start: u64, max: usize) -> Vec<u64> {
        self.intervals.missing_items(start, max)
    }
177
    /// Returns whether an item is cached at `index`.
    pub fn has(&self, index: u64) -> bool {
        self.has.inc();

        // Membership is answered from the in-memory index; no disk access.
        self.indices.contains_key(&index)
    }
186
    /// Prune the cache to the section containing `min`, removing all items in
    /// earlier sections. Pruning to (or before) an already-pruned boundary is
    /// a no-op.
    pub async fn prune(&mut self, min: u64) -> Result<(), Error> {
        // Round down to the section boundary: pruning operates on whole sections.
        let min = self.section(min);

        // Nothing to do if we've already pruned at least this far.
        if let Some(oldest_allowed) = self.oldest_allowed {
            if min <= oldest_allowed {
                return Ok(());
            }
        }
        debug!(min, "pruning cache");

        // Prune the on-disk journal first; in-memory state is rebuilt from the
        // journal on restart (see `init`), so stale in-memory entries after a
        // crash here are harmless.
        self.journal.prune(min).await.map_err(Error::Journal)?;

        // Drop pending syncs for sections that no longer exist.
        loop {
            let next = match self.pending.iter().next() {
                Some(section) if *section < min => *section,
                _ => break,
            };
            self.pending.remove(&next);
        }

        // Drop index entries below the new pruning boundary (BTreeMap keeps
        // keys ordered, so we can pop from the front until we reach `min`).
        loop {
            let next = match self.indices.first_key_value() {
                Some((index, _)) if *index < min => *index,
                _ => break,
            };
            self.indices.remove(&next).unwrap();
        }

        // Remove the pruned range [0, min - 1] from the interval map; the
        // `min - 1` bound implies the range is inclusive.
        if min > 0 {
            self.intervals.remove(0, min - 1);
        }

        // Record the new boundary and update metrics.
        self.oldest_allowed = Some(min);
        self.items_tracked.set(self.indices.len() as i64);
        Ok(())
    }
237
    /// Store `value` at `index`.
    ///
    /// Returns [Error::AlreadyPrunedTo] if `index` falls below the pruning
    /// boundary. Putting to an index that already exists is a no-op (the
    /// existing record is kept and `value` is dropped).
    pub async fn put(&mut self, index: u64, value: V) -> Result<(), Error> {
        // Reject writes below the pruning boundary.
        let oldest_allowed = self.oldest_allowed.unwrap_or(0);
        if index < oldest_allowed {
            return Err(Error::AlreadyPrunedTo(oldest_allowed));
        }

        // Skip duplicates.
        if self.indices.contains_key(&index) {
            return Ok(());
        }

        // Append the record to the journal section this index maps to.
        let record = Record::new(index, value);
        let section = self.section(index);
        let (offset, len) = self.journal.append(section, record).await?;

        // Remember where the item landed so `get` can read it back.
        self.indices.insert(index, Location { offset, len });

        // Mark the index as present for gap queries.
        self.intervals.insert(index);

        // The section has unflushed data until the next `sync`.
        self.pending.insert(section);

        self.items_tracked.set(self.indices.len() as i64);
        Ok(())
    }
271
272 pub async fn sync(&mut self) -> Result<(), Error> {
274 let mut syncs = Vec::with_capacity(self.pending.len());
275 for section in self.pending.iter() {
276 syncs.push(self.journal.sync(*section));
277 self.syncs.inc();
278 }
279 try_join_all(syncs).await?;
280 self.pending.clear();
281 Ok(())
282 }
283
    /// Close the cache by closing the underlying journal, consuming `self`.
    ///
    /// NOTE(review): pending sections are not explicitly synced here; this
    /// relies on the journal's own close behavior — confirm it flushes
    /// buffered writes.
    pub async fn close(self) -> Result<(), Error> {
        self.journal.close().await.map_err(Error::Journal)
    }
290
    /// Destroy the cache, removing the underlying journal's data from storage.
    pub async fn destroy(self) -> Result<(), Error> {
        self.journal.destroy().await.map_err(Error::Journal)
    }
295}