1use super::{Config, Translator};
2use crate::{
3 archive::{Error, Identifier},
4 index::Index,
5 journal::variable::{Config as JConfig, Journal},
6 rmap::RMap,
7};
8use bytes::{Buf, BufMut};
9use commonware_codec::{varint::UInt, Codec, EncodeSize, Read, ReadExt, Write};
10use commonware_runtime::{Metrics, Storage};
11use commonware_utils::Array;
12use futures::{future::try_join_all, pin_mut, StreamExt};
13use prometheus_client::metrics::{counter::Counter, gauge::Gauge};
14use std::collections::{BTreeMap, BTreeSet};
15use tracing::debug;
16
/// Location of an encoded [Record] within a journal section, as returned by
/// `Journal::append`.
struct Location {
    /// Offset of the record within its section (units defined by the journal;
    /// passed back verbatim to `Journal::get_exact`).
    offset: u32,
    /// Length of the encoded record (passed back verbatim to `Journal::get_exact`).
    len: u32,
}
22
/// An item persisted in the archive: a value addressable both by a `u64`
/// index and by a fixed-size key.
struct Record<K: Array, V: Codec> {
    /// Index of the item within the archive.
    index: u64,
    /// Key under which the item can also be looked up.
    key: K,
    /// The stored value.
    value: V,
}
29
30impl<K: Array, V: Codec> Record<K, V> {
31 fn new(index: u64, key: K, value: V) -> Self {
33 Self { index, key, value }
34 }
35}
36
37impl<K: Array, V: Codec> Write for Record<K, V> {
38 fn write(&self, buf: &mut impl BufMut) {
39 UInt(self.index).write(buf);
40 self.key.write(buf);
41 self.value.write(buf);
42 }
43}
44
45impl<K: Array, V: Codec> Read for Record<K, V> {
46 type Cfg = V::Cfg;
47
48 fn read_cfg(buf: &mut impl Buf, cfg: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
49 let index = UInt::read(buf)?.into();
50 let key = K::read(buf)?;
51 let value = V::read_cfg(buf, cfg)?;
52 Ok(Self { index, key, value })
53 }
54}
55
56impl<K: Array, V: Codec> EncodeSize for Record<K, V> {
57 fn encode_size(&self) -> usize {
58 UInt(self.index).encode_size() + K::SIZE + self.value.encode_size()
59 }
60}
61
/// An archive of `(index, key, value)` records backed by a section-based
/// variable-length journal, with in-memory maps for index and key lookups.
pub struct Archive<T: Translator, E: Storage + Metrics, K: Array, V: Codec> {
    /// Number of items per journal section; used to map an index to its section.
    items_per_section: u64,
    /// Underlying journal that persists encoded records.
    journal: Journal<E, Record<K, V>>,
    /// Sections with appended-but-unsynced data (flushed by `sync`).
    pending: BTreeSet<u64>,

    /// Section-aligned prune floor; `None` until `prune` is first called.
    /// Indices below this value have been discarded.
    oldest_allowed: Option<u64>,

    /// Translated-key index mapping keys to candidate item indices
    /// (translation may collide, so candidates must be verified on read).
    keys: Index<T, u64>,
    /// Maps each item index to the record's location within its section.
    indices: BTreeMap<u64, Location>,
    /// Interval set of populated indices, used to answer `next_gap`.
    intervals: RMap,

    // Metrics (registered in `init`).
    items_tracked: Gauge,
    indices_pruned: Counter,
    unnecessary_reads: Counter,
    gets: Counter,
    has: Counter,
    syncs: Counter,
}
85
impl<T: Translator, E: Storage + Metrics, K: Array, V: Codec> Archive<T, E, K, V> {
    /// Returns the section that `index` belongs to: `index` rounded down to a
    /// multiple of `items_per_section`.
    fn section(&self, index: u64) -> u64 {
        (index / self.items_per_section) * self.items_per_section
    }

    /// Opens the underlying journal and rebuilds all in-memory state
    /// (`indices`, `keys`, `intervals`) by replaying every persisted record.
    ///
    /// # Errors
    ///
    /// Returns an error if the journal cannot be initialized or a replayed
    /// record cannot be read.
    pub async fn init(context: E, cfg: Config<T, V::Cfg>) -> Result<Self, Error> {
        // Open the variable-length journal used to persist records.
        let journal = Journal::<E, Record<K, V>>::init(
            context.with_label("journal"),
            JConfig {
                partition: cfg.partition,
                compression: cfg.compression,
                codec_config: cfg.codec_config,
                buffer_pool: cfg.buffer_pool,
                write_buffer: cfg.write_buffer,
            },
        )
        .await?;

        // Rebuild in-memory state by replaying the journal from the start.
        let mut indices = BTreeMap::new();
        let mut keys = Index::init(context.with_label("index"), cfg.translator.clone());
        let mut intervals = RMap::new();
        {
            debug!("initializing archive");
            // NOTE(review): assumes `(0, 0)` means "replay from the first
            // section at offset 0" — confirm against the journal's replay API.
            let stream = journal.replay(0, 0, cfg.replay_buffer).await?;
            pin_mut!(stream);
            while let Some(result) = stream.next().await {
                let (_, offset, len, data) = result?;

                // Remember where the record lives within its section.
                indices.insert(data.index, Location { offset, len });

                // Map the (translated) key to its index for later lookups.
                keys.insert(&data.key, data.index);

                // Mark the index as populated for gap queries.
                intervals.insert(data.index);
            }
            debug!(keys = keys.keys(), "archive initialized");
        }

        // Create and register metrics, then seed the item gauge with the
        // number of records recovered from the journal.
        let items_tracked = Gauge::default();
        let indices_pruned = Counter::default();
        let unnecessary_reads = Counter::default();
        let gets = Counter::default();
        let has = Counter::default();
        let syncs = Counter::default();
        context.register(
            "items_tracked",
            "Number of items tracked",
            items_tracked.clone(),
        );
        context.register(
            "indices_pruned",
            "Number of indices pruned",
            indices_pruned.clone(),
        );
        context.register(
            "unnecessary_reads",
            "Number of unnecessary reads performed during key lookups",
            unnecessary_reads.clone(),
        );
        context.register("gets", "Number of gets performed", gets.clone());
        context.register("has", "Number of has performed", has.clone());
        context.register("syncs", "Number of syncs called", syncs.clone());
        items_tracked.set(indices.len() as i64);

        Ok(Self {
            items_per_section: cfg.items_per_section.get(),
            journal,
            pending: BTreeSet::new(),
            oldest_allowed: None,
            indices,
            intervals,
            keys,
            items_tracked,
            indices_pruned,
            unnecessary_reads,
            gets,
            has,
            syncs,
        })
    }

    /// Fetches the value stored at `index`, or `None` if the index is not
    /// tracked (never written, or pruned).
    async fn get_index(&self, index: u64) -> Result<Option<V>, Error> {
        self.gets.inc();

        // Look up where the record lives; absence means "not stored".
        let location = match self.indices.get(&index) {
            Some(offset) => offset,
            None => return Ok(None),
        };

        // Read the record directly from its section in the journal.
        let section = self.section(index);
        let record = self
            .journal
            .get_exact(section, location.offset, location.len)
            .await?;
        Ok(Some(record.value))
    }

    /// Fetches the value stored under `key`, or `None` if no live record
    /// matches.
    ///
    /// Because the key index stores translated (possibly colliding) keys, each
    /// candidate is read from the journal and its stored key compared against
    /// `key`; mismatches are counted in `unnecessary_reads`.
    async fn get_key(&self, key: &K) -> Result<Option<V>, Error> {
        self.gets.inc();

        let iter = self.keys.get(key);
        let min_allowed = self.oldest_allowed.unwrap_or(0);
        for index in iter {
            // Skip candidates below the prune floor.
            if *index < min_allowed {
                continue;
            }

            // A live candidate must have a location; a missing entry
            // indicates corruption.
            let location = self.indices.get(index).ok_or(Error::RecordCorrupted)?;
            let section = self.section(*index);
            let record = self
                .journal
                .get_exact(section, location.offset, location.len)
                .await?;

            // Verify the stored key matches (translation may collide).
            if record.key.as_ref() == key.as_ref() {
                return Ok(Some(record.value));
            }
            self.unnecessary_reads.inc();
        }

        Ok(None)
    }

    /// Returns whether `index` is currently tracked.
    fn has_index(&self, index: u64) -> bool {
        self.indices.contains_key(&index)
    }

    /// Prunes all items below the section containing `min` (i.e. `min`
    /// rounded down to a section boundary). A no-op if the archive has
    /// already been pruned at least that far.
    pub async fn prune(&mut self, min: u64) -> Result<(), Error> {
        // Align the prune point to a section boundary.
        let min = self.section(min);

        // Nothing to do if we already pruned to (or past) this point.
        if let Some(oldest_allowed) = self.oldest_allowed {
            if min <= oldest_allowed {
                return Ok(());
            }
        }
        debug!(min, "pruning archive");

        // Prune the on-disk journal first.
        self.journal.prune(min).await.map_err(Error::Journal)?;

        // Drop pending (unsynced) sections that were pruned away.
        loop {
            let next = match self.pending.iter().next() {
                Some(section) if *section < min => *section,
                _ => break,
            };
            self.pending.remove(&next);
        }

        // Remove pruned entries from the index map, counting each removal.
        loop {
            let next = match self.indices.first_key_value() {
                Some((index, _)) if *index < min => *index,
                _ => break,
            };
            self.indices.remove(&next).unwrap();
            self.indices_pruned.inc();
        }

        // Clear the pruned range from the interval map.
        // NOTE(review): assumes `RMap::remove` takes an inclusive range — confirm.
        if min > 0 {
            self.intervals.remove(0, min - 1);
        }

        // Record the new prune floor and refresh the item gauge.
        self.oldest_allowed = Some(min);
        self.items_tracked.set(self.indices.len() as i64);
        Ok(())
    }
}
287
impl<T: Translator, E: Storage + Metrics, K: Array, V: Codec> crate::archive::Archive
    for Archive<T, E, K, V>
{
    type Key = K;
    type Value = V;

    /// Stores `data` under both `index` and `key`.
    ///
    /// Fails with [Error::AlreadyPrunedTo] if `index` is below the prune
    /// floor; silently succeeds (without writing) if `index` already exists.
    /// The write is buffered — it is not durable until `sync` is called.
    async fn put(&mut self, index: u64, key: K, data: V) -> Result<(), Error> {
        // Reject writes below the prune floor.
        let oldest_allowed = self.oldest_allowed.unwrap_or(0);
        if index < oldest_allowed {
            return Err(Error::AlreadyPrunedTo(oldest_allowed));
        }

        // Idempotent: an already-populated index is left untouched.
        if self.indices.contains_key(&index) {
            return Ok(());
        }

        // Append the record to the journal section for this index.
        let record = Record::new(index, key.clone(), data);
        let section = self.section(index);
        let (offset, len) = self.journal.append(section, record).await?;

        // Track the record's location for index lookups.
        self.indices.insert(index, Location { offset, len });

        // Mark the index as populated for gap queries.
        self.intervals.insert(index);

        // Index the key, dropping any stale candidates below the floor.
        self.keys
            .insert_and_prune(&key, index, |v| *v < oldest_allowed);

        // Remember that this section has unsynced data.
        self.pending.insert(section);

        self.items_tracked.set(self.indices.len() as i64);
        Ok(())
    }

    /// Fetches a value by index or by key.
    async fn get(&self, identifier: Identifier<'_, K>) -> Result<Option<V>, Error> {
        match identifier {
            Identifier::Index(index) => self.get_index(index).await,
            Identifier::Key(key) => self.get_key(key).await,
        }
    }

    /// Returns whether a record exists for the given identifier.
    ///
    /// Key lookups read candidate records from the journal and therefore cost
    /// as much as a `get`.
    async fn has(&self, identifier: Identifier<'_, K>) -> Result<bool, Error> {
        self.has.inc();
        match identifier {
            Identifier::Index(index) => Ok(self.has_index(index)),
            Identifier::Key(key) => self.get_key(key).await.map(|result| result.is_some()),
        }
    }

    /// Flushes every section with pending writes, concurrently, then clears
    /// the pending set.
    async fn sync(&mut self) -> Result<(), Error> {
        let mut syncs = Vec::with_capacity(self.pending.len());
        for section in self.pending.iter() {
            syncs.push(self.journal.sync(*section));
            // Counted per flushed section, not per call to this method.
            self.syncs.inc();
        }
        try_join_all(syncs).await?;
        self.pending.clear();
        Ok(())
    }

    /// Delegates to the interval map; see `RMap::next_gap` for the exact
    /// semantics of the returned bounds.
    fn next_gap(&self, index: u64) -> (Option<u64>, Option<u64>) {
        self.intervals.next_gap(index)
    }

    /// Closes the underlying journal, consuming the archive.
    async fn close(self) -> Result<(), Error> {
        self.journal.close().await.map_err(Error::Journal)
    }

    /// Destroys the underlying journal and all persisted data.
    async fn destroy(self) -> Result<(), Error> {
        self.journal.destroy().await.map_err(Error::Journal)
    }
}