use super::{Config, Translator};
use crate::{
    archive::{Error, Identifier},
    index::{unordered::Index, Unordered},
    journal::segmented::variable::{Config as JConfig, Journal},
    rmap::RMap,
};
use bytes::{Buf, BufMut};
use commonware_codec::{varint::UInt, Codec, EncodeSize, Read, ReadExt, Write};
use commonware_runtime::{telemetry::metrics::status::GaugeExt, Metrics, Storage};
use commonware_utils::Array;
use futures::{future::try_join_all, pin_mut, StreamExt};
use prometheus_client::metrics::{counter::Counter, gauge::Gauge};
use std::collections::{BTreeMap, BTreeSet};
use tracing::debug;

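/// A single archived item: the `index` it occupies, the `key` it can be
/// looked up by, and its encoded `value`.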
struct Record<K: Array, V: Codec> {
    index: u64,
    key: K,
    value: V,
}

impl<K: Array, V: Codec> Record<K, V> {
    const fn new(index: u64, key: K, value: V) -> Self {
        Self { index, key, value }
    }
}

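// Implement the `commonware_codec` traits manually so a `Record` can be
// appended to (and replayed from) the variable-length journal.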
impl<K: Array, V: Codec> Write for Record<K, V> {
    fn write(&self, buf: &mut impl BufMut) {
        UInt(self.index).write(buf);
        self.key.write(buf);
        self.value.write(buf);
    }
}

impl<K: Array, V: Codec> Read for Record<K, V> {
    type Cfg = V::Cfg;

    fn read_cfg(buf: &mut impl Buf, cfg: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
        let index = UInt::read(buf)?.into();
        let key = K::read(buf)?;
        let value = V::read_cfg(buf, cfg)?;
        Ok(Self { index, key, value })
    }
}

impl<K: Array, V: Codec> EncodeSize for Record<K, V> {
    fn encode_size(&self) -> usize {
        UInt(self.index).encode_size() + K::SIZE + self.value.encode_size()
    }
}

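// Generate random records when the `arbitrary` feature is enabled (used by the
// codec conformance tests below).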
#[cfg(feature = "arbitrary")]
impl<K: Array, V: Codec> arbitrary::Arbitrary<'_> for Record<K, V>
where
    K: for<'a> arbitrary::Arbitrary<'a>,
    V: for<'a> arbitrary::Arbitrary<'a>,
{
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        Ok(Self::new(
            u.arbitrary::<u64>()?,
            u.arbitrary::<K>()?,
            u.arbitrary::<V>()?,
        ))
    }
}

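/// An archive that stores items in a segmented journal and tracks them both by
/// `index` and by (translated) `key`.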
pub struct Archive<T: Translator, E: Storage + Metrics, K: Array, V: Codec> {
    // Number of items stored per journal section
    items_per_section: u64,
    // Underlying journal of records
    journal: Journal<E, Record<K, V>>,
    // Sections with appends that have not yet been synced
    pending: BTreeSet<u64>,

    // Oldest index that may still be stored (always a section boundary);
    // lower indices have been pruned
    oldest_allowed: Option<u64>,

    // Translated key -> candidate indices
    keys: Index<T, u64>,
    // Index -> offset of its record within its section
    indices: BTreeMap<u64, u32>,
    // Contiguous ranges of stored indices
    intervals: RMap,

    // Metrics
    items_tracked: Gauge,
    indices_pruned: Counter,
    unnecessary_reads: Counter,
    gets: Counter,
    has: Counter,
    syncs: Counter,
}

impl<T: Translator, E: Storage + Metrics, K: Array, V: Codec> Archive<T, E, K, V> {
    const fn section(&self, index: u64) -> u64 {
        // Round down to the first index of the section containing `index`
        (index / self.items_per_section) * self.items_per_section
    }

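    /// Initialize the archive, replaying the journal to rebuild the in-memory
    /// key, index, and interval state.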
    pub async fn init(context: E, cfg: Config<T, V::Cfg>) -> Result<Self, Error> {
        // Initialize the underlying journal
        let journal = Journal::<E, Record<K, V>>::init(
            context.with_label("journal"),
            JConfig {
                partition: cfg.partition,
                compression: cfg.compression,
                codec_config: cfg.codec_config,
                buffer_pool: cfg.buffer_pool,
                write_buffer: cfg.write_buffer,
            },
        )
        .await?;

        // Replay the journal to rebuild the in-memory state
        let mut indices = BTreeMap::new();
        let mut keys = Index::new(context.with_label("index"), cfg.translator.clone());
        let mut intervals = RMap::new();
        {
            debug!("initializing archive");
            let stream = journal.replay(0, 0, cfg.replay_buffer).await?;
            pin_mut!(stream);
            while let Some(result) = stream.next().await {
                let (_, offset, _, data) = result?;

                // Store index -> offset
                indices.insert(data.index, offset);

                // Store key -> index
                keys.insert(&data.key, data.index);

                // Track the stored index
                intervals.insert(data.index);
            }
            debug!("archive initialized");
        }

        // Create and register metrics
        let items_tracked = Gauge::default();
        let indices_pruned = Counter::default();
        let unnecessary_reads = Counter::default();
        let gets = Counter::default();
        let has = Counter::default();
        let syncs = Counter::default();
        context.register(
            "items_tracked",
            "Number of items tracked",
            items_tracked.clone(),
        );
        context.register(
            "indices_pruned",
            "Number of indices pruned",
            indices_pruned.clone(),
        );
        context.register(
            "unnecessary_reads",
            "Number of unnecessary reads performed during key lookups",
            unnecessary_reads.clone(),
        );
        context.register("gets", "Number of gets performed", gets.clone());
        context.register("has", "Number of has performed", has.clone());
        context.register("syncs", "Number of syncs called", syncs.clone());
        let _ = items_tracked.try_set(indices.len());

        // Return the initialized archive
        Ok(Self {
            items_per_section: cfg.items_per_section.get(),
            journal,
            pending: BTreeSet::new(),
            oldest_allowed: None,
            indices,
            intervals,
            keys,
            items_tracked,
            indices_pruned,
            unnecessary_reads,
            gets,
            has,
            syncs,
        })
    }

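    /// Retrieve an item by its `index`.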
    async fn get_index(&self, index: u64) -> Result<Option<V>, Error> {
        self.gets.inc();

        let offset = match self.indices.get(&index) {
            Some(offset) => *offset,
            None => return Ok(None),
        };

        let section = self.section(index);
        let record = self.journal.get(section, offset).await?;
        Ok(Some(record.value))
    }

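    /// Retrieve an item by its `key`.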
    async fn get_key(&self, key: &K) -> Result<Option<V>, Error> {
        self.gets.inc();

        // Iterate over all indices whose translated key matches
        let iter = self.keys.get(key);
        let min_allowed = self.oldest_allowed.unwrap_or(0);
        for index in iter {
            // Skip indices that have already been pruned
            if *index < min_allowed {
                continue;
            }

            // Fetch the record for this candidate index
            let offset = *self.indices.get(index).ok_or(Error::RecordCorrupted)?;
            let section = self.section(*index);
            let record = self.journal.get(section, offset).await?;

            // Translated keys may collide, so confirm the stored key matches exactly
            if record.key.as_ref() == key.as_ref() {
                return Ok(Some(record.value));
            }
            self.unnecessary_reads.inc();
        }

        Ok(None)
    }

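    /// Check whether an item exists at `index`.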
    fn has_index(&self, index: u64) -> bool {
        self.indices.contains_key(&index)
    }

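    /// Prune all items below the start of the section containing `min`,
    /// dropping their journal sections, indices, and intervals.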
    pub async fn prune(&mut self, min: u64) -> Result<(), Error> {
        // Round down to the nearest section boundary
        let min = self.section(min);

        // Skip if we have already pruned at least this far
        if let Some(oldest_allowed) = self.oldest_allowed {
            if min <= oldest_allowed {
                return Ok(());
            }
        }
        debug!(min, "pruning archive");

        // Prune the underlying journal
        self.journal.prune(min).await.map_err(Error::Journal)?;

        // Drop pending syncs for pruned sections
        loop {
            let next = match self.pending.iter().next() {
                Some(section) if *section < min => *section,
                _ => break,
            };
            self.pending.remove(&next);
        }

        // Drop pruned indices
        loop {
            let next = match self.indices.first_key_value() {
                Some((index, _)) if *index < min => *index,
                _ => break,
            };
            self.indices.remove(&next).unwrap();
            self.indices_pruned.inc();
        }

        // Remove pruned indices from the interval map
        if min > 0 {
            self.intervals.remove(0, min - 1);
        }

        // Update the pruning watermark and metrics
        self.oldest_allowed = Some(min);
        let _ = self.items_tracked.try_set(self.indices.len());
        Ok(())
    }
}

impl<T: Translator, E: Storage + Metrics, K: Array, V: Codec> crate::archive::Archive
    for Archive<T, E, K, V>
{
    type Key = K;
    type Value = V;

    async fn put(&mut self, index: u64, key: K, data: V) -> Result<(), Error> {
        // Reject writes below the pruning watermark
        let oldest_allowed = self.oldest_allowed.unwrap_or(0);
        if index < oldest_allowed {
            return Err(Error::AlreadyPrunedTo(oldest_allowed));
        }

        // Ignore duplicate writes to the same index
        if self.indices.contains_key(&index) {
            return Ok(());
        }

        // Append the record to its section of the journal
        let record = Record::new(index, key.clone(), data);
        let section = self.section(index);
        let (offset, _) = self.journal.append(section, record).await?;

        // Store index -> offset
        self.indices.insert(index, offset);

        // Track the stored index
        self.intervals.insert(index);

        // Store key -> index, dropping any pruned entries for the same translated key
        self.keys
            .insert_and_prune(&key, index, |v| *v < oldest_allowed);

        // Mark the section as needing a sync
        self.pending.insert(section);

        let _ = self.items_tracked.try_set(self.indices.len());
        Ok(())
    }

    async fn get(&self, identifier: Identifier<'_, K>) -> Result<Option<V>, Error> {
        match identifier {
            Identifier::Index(index) => self.get_index(index).await,
            Identifier::Key(key) => self.get_key(key).await,
        }
    }

    async fn has(&self, identifier: Identifier<'_, K>) -> Result<bool, Error> {
        self.has.inc();
        match identifier {
            Identifier::Index(index) => Ok(self.has_index(index)),
            Identifier::Key(key) => self.get_key(key).await.map(|result| result.is_some()),
        }
    }

    async fn sync(&mut self) -> Result<(), Error> {
        let mut syncs = Vec::with_capacity(self.pending.len());
        for section in self.pending.iter() {
            syncs.push(self.journal.sync(*section));
            self.syncs.inc();
        }
        try_join_all(syncs).await?;
        self.pending.clear();
        Ok(())
    }

    fn next_gap(&self, index: u64) -> (Option<u64>, Option<u64>) {
        self.intervals.next_gap(index)
    }

    fn missing_items(&self, index: u64, max: usize) -> Vec<u64> {
        self.intervals.missing_items(index, max)
    }

    fn ranges(&self) -> impl Iterator<Item = (u64, u64)> {
        self.intervals.iter().map(|(&s, &e)| (s, e))
    }

    fn first_index(&self) -> Option<u64> {
        self.intervals.first_index()
    }

    fn last_index(&self) -> Option<u64> {
        self.intervals.last_index()
    }

    async fn close(self) -> Result<(), Error> {
        self.journal.close().await.map_err(Error::Journal)
    }

    async fn destroy(self) -> Result<(), Error> {
        self.journal.destroy().await.map_err(Error::Journal)
    }
}

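// Codec conformance tests for `Record`, exercising the `Arbitrary` impl above.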
#[cfg(all(test, feature = "arbitrary"))]
mod conformance {
    use super::*;
    use commonware_codec::conformance::CodecConformance;
    use commonware_utils::sequence::U64;

    commonware_conformance::conformance_tests! {
        CodecConformance<Record<U64, Vec<u8>>>
    }
}