1use super::{Config, Translator};
2use crate::{
3 archive::{Error, Identifier},
4 index::{unordered::Index, Unordered},
5 journal::segmented::oversized::{
6 Config as OversizedConfig, Oversized, Record as OversizedRecord,
7 },
8 rmap::RMap,
9};
10use commonware_codec::{CodecShared, FixedSize, Read, ReadExt, Write};
11use commonware_runtime::{telemetry::metrics::status::GaugeExt, Buf, BufMut, Metrics, Storage};
12use commonware_utils::Array;
13use futures::{future::try_join_all, pin_mut, StreamExt};
14use prometheus_client::metrics::{counter::Counter, gauge::Gauge};
15use std::collections::{BTreeMap, BTreeSet};
16use tracing::debug;
17
/// A fixed-size index entry pairing an item's key with the location of its
/// value in the oversized value journal.
#[derive(Debug, Clone, PartialEq)]
struct Record<K: Array> {
    // Index of the item in the archive's logical sequence.
    index: u64,
    // Key associated with the item.
    key: K,
    // Byte offset of the value in the value journal (see `value_location`).
    value_offset: u64,
    // Size of the value in bytes.
    value_size: u32,
}
30
31impl<K: Array> Record<K> {
32 const fn new(index: u64, key: K, value_offset: u64, value_size: u32) -> Self {
34 Self {
35 index,
36 key,
37 value_offset,
38 value_size,
39 }
40 }
41}
42
43impl<K: Array> Write for Record<K> {
44 fn write(&self, buf: &mut impl BufMut) {
45 self.index.write(buf);
46 self.key.write(buf);
47 self.value_offset.write(buf);
48 self.value_size.write(buf);
49 }
50}
51
52impl<K: Array> Read for Record<K> {
53 type Cfg = ();
54
55 fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
56 let index = u64::read(buf)?;
57 let key = K::read(buf)?;
58 let value_offset = u64::read(buf)?;
59 let value_size = u32::read(buf)?;
60 Ok(Self {
61 index,
62 key,
63 value_offset,
64 value_size,
65 })
66 }
67}
68
69impl<K: Array> FixedSize for Record<K> {
70 const SIZE: usize = u64::SIZE + K::SIZE + u64::SIZE + u32::SIZE;
72}
73
74impl<K: Array> OversizedRecord for Record<K> {
75 fn value_location(&self) -> (u64, u32) {
76 (self.value_offset, self.value_size)
77 }
78
79 fn with_location(mut self, offset: u64, size: u32) -> Self {
80 self.value_offset = offset;
81 self.value_size = size;
82 self
83 }
84}
85
#[cfg(feature = "arbitrary")]
impl<K: Array> arbitrary::Arbitrary<'_> for Record<K>
where
    K: for<'a> arbitrary::Arbitrary<'a>,
{
    /// Generates a record with unconstrained random fields.
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        let index = u64::arbitrary(u)?;
        let key = K::arbitrary(u)?;
        let value_offset = u64::arbitrary(u)?;
        let value_size = u32::arbitrary(u)?;
        Ok(Self::new(index, key, value_offset, value_size))
    }
}
100
/// An archive over an [Oversized] segmented journal that indexes items both
/// by `u64` index and by (translated) key.
pub struct Archive<T: Translator, E: Storage + Metrics, K: Array, V: CodecShared> {
    // Number of items grouped into each journal section (see `section`).
    items_per_section: u64,

    // Underlying journal storing fixed-size [Record] entries and their values.
    oversized: Oversized<E, Record<K>, V>,

    // Sections with writes that have not yet been synced (cleared by `sync`).
    pending: BTreeSet<u64>,

    // Lowest section retained after pruning; `None` until `prune` is called.
    oldest_allowed: Option<u64>,

    // Translated-key index mapping each key to its candidate item indices.
    keys: Index<T, u64>,

    // Map from item index to the record's position in its journal section.
    indices: BTreeMap<u64, u64>,

    // Interval set tracking which indices are currently present.
    intervals: RMap,

    // Metrics.
    items_tracked: Gauge,
    indices_pruned: Counter,
    unnecessary_reads: Counter,
    gets: Counter,
    has: Counter,
    syncs: Counter,
}
130
impl<T: Translator, E: Storage + Metrics, K: Array, V: CodecShared> Archive<T, E, K, V> {
    /// Computes the section containing `index`: the greatest multiple of
    /// `items_per_section` that is `<= index`.
    const fn section(&self, index: u64) -> u64 {
        (index / self.items_per_section) * self.items_per_section
    }

    /// Initializes the archive from persisted state.
    ///
    /// Opens the underlying [Oversized] journal, replays every persisted
    /// record to rebuild the in-memory key index, index map, and interval
    /// set, and registers metrics with `context`.
    pub async fn init(context: E, cfg: Config<T, V::Cfg>) -> Result<Self, Error> {
        // Open the oversized journal backing this archive.
        let oversized_cfg = OversizedConfig {
            index_partition: cfg.key_partition,
            value_partition: cfg.value_partition,
            index_page_cache: cfg.key_page_cache,
            index_write_buffer: cfg.key_write_buffer,
            value_write_buffer: cfg.value_write_buffer,
            compression: cfg.compression,
            codec_config: cfg.codec_config,
        };
        let oversized: Oversized<E, Record<K>, V> =
            Oversized::init(context.with_label("oversized"), oversized_cfg).await?;

        // Rebuild in-memory state by replaying every persisted record.
        let mut indices = BTreeMap::new();
        let mut keys = Index::new(context.with_label("index"), cfg.translator.clone());
        let mut intervals = RMap::new();
        {
            debug!("initializing archive from index journal");
            let stream = oversized.replay(0, 0, cfg.replay_buffer).await?;
            pin_mut!(stream);
            while let Some(result) = stream.next().await {
                let (_section, position, entry) = result?;

                // Record where this index's entry lives in the journal.
                indices.insert(entry.index, position);

                // Map the (translated) key to this index.
                keys.insert(&entry.key, entry.index);

                // Mark the index as present.
                intervals.insert(entry.index);
            }
            debug!("archive initialized");
        }

        // Create and register metrics.
        let items_tracked = Gauge::default();
        let indices_pruned = Counter::default();
        let unnecessary_reads = Counter::default();
        let gets = Counter::default();
        let has = Counter::default();
        let syncs = Counter::default();
        context.register(
            "items_tracked",
            "Number of items tracked",
            items_tracked.clone(),
        );
        context.register(
            "indices_pruned",
            "Number of indices pruned",
            indices_pruned.clone(),
        );
        context.register(
            "unnecessary_reads",
            "Number of unnecessary reads performed during key lookups",
            unnecessary_reads.clone(),
        );
        context.register("gets", "Number of gets performed", gets.clone());
        context.register("has", "Number of has performed", has.clone());
        context.register("syncs", "Number of syncs called", syncs.clone());
        let _ = items_tracked.try_set(indices.len());

        Ok(Self {
            items_per_section: cfg.items_per_section.get(),
            oversized,
            pending: BTreeSet::new(),
            oldest_allowed: None,
            indices,
            intervals,
            keys,
            items_tracked,
            indices_pruned,
            unnecessary_reads,
            gets,
            has,
            syncs,
        })
    }

    /// Retrieves the value stored at `index`, if any.
    async fn get_index(&self, index: u64) -> Result<Option<V>, Error> {
        // Update metrics.
        self.gets.inc();

        // Look up the record's journal position for this index.
        let position = match self.indices.get(&index) {
            Some(pos) => *pos,
            None => return Ok(None),
        };

        // Fetch the record, then the value it points at.
        let section = self.section(index);
        let entry = self.oversized.get(section, position).await?;
        let (value_offset, value_size) = entry.value_location();

        let value = self
            .oversized
            .get_value(section, value_offset, value_size)
            .await?;
        Ok(Some(value))
    }

    /// Retrieves the value stored for `key`, if any.
    ///
    /// The key index may map a translated key to multiple candidate indices;
    /// each candidate's record is read until one with a matching raw key is
    /// found.
    async fn get_key(&self, key: &K) -> Result<Option<V>, Error> {
        // Update metrics.
        self.gets.inc();

        // Iterate over all indices whose translated key matches.
        let iter = self.keys.get(key);
        let min_allowed = self.oldest_allowed.unwrap_or(0);
        for index in iter {
            // Skip candidates below the pruning boundary.
            if *index < min_allowed {
                continue;
            }

            // A live candidate in the key index must exist in the index map.
            let position = *self.indices.get(index).ok_or(Error::RecordCorrupted)?;

            let section = self.section(*index);
            let entry = self.oversized.get(section, position).await?;

            // Only return the value if the full (untranslated) key matches.
            if entry.key.as_ref() == key.as_ref() {
                let (value_offset, value_size) = entry.value_location();
                let value = self
                    .oversized
                    .get_value(section, value_offset, value_size)
                    .await?;
                return Ok(Some(value));
            }
            // Translated-key collision: the record was read but didn't match.
            self.unnecessary_reads.inc();
        }

        Ok(None)
    }

    /// Returns whether an item exists at `index` (in-memory lookup only).
    fn has_index(&self, index: u64) -> bool {
        self.indices.contains_key(&index)
    }

    /// Prunes all items in sections below the section containing `min`.
    ///
    /// Pruning happens at section granularity, so items in the same section
    /// as `min` (even those with indices below it) are retained. A no-op if
    /// the archive has already been pruned at least this far.
    pub async fn prune(&mut self, min: u64) -> Result<(), Error> {
        // Round down to the section boundary.
        let min = self.section(min);

        // Skip if we've already pruned to (or beyond) this section.
        if let Some(oldest_allowed) = self.oldest_allowed {
            if min <= oldest_allowed {
                return Ok(());
            }
        }
        debug!(min, "pruning archive");

        // Prune the underlying journal.
        self.oversized.prune(min).await?;

        // Drop pending syncs for sections that no longer exist.
        loop {
            let next = match self.pending.iter().next() {
                Some(section) if *section < min => *section,
                _ => break,
            };
            self.pending.remove(&next);
        }

        // Remove in-memory entries for pruned indices (BTreeMap iterates in
        // ascending order, so we pop from the front until we reach `min`).
        loop {
            let next = match self.indices.first_key_value() {
                Some((index, _)) if *index < min => *index,
                _ => break,
            };
            self.indices.remove(&next).unwrap();
            self.indices_pruned.inc();
        }

        // Remove the pruned range from the interval set.
        if min > 0 {
            self.intervals.remove(0, min - 1);
        }

        // Record the new pruning boundary and update metrics.
        self.oldest_allowed = Some(min);
        let _ = self.items_tracked.try_set(self.indices.len());
        Ok(())
    }
}
339
impl<T: Translator, E: Storage + Metrics, K: Array, V: CodecShared> crate::archive::Archive
    for Archive<T, E, K, V>
{
    type Key = K;
    type Value = V;

    /// Stores `data` at `index` under `key`.
    ///
    /// Returns [Error::AlreadyPrunedTo] if `index` falls below the pruning
    /// boundary. A duplicate `index` is silently ignored (first write wins).
    async fn put(&mut self, index: u64, key: K, data: V) -> Result<(), Error> {
        // Reject writes below the pruning boundary.
        let oldest_allowed = self.oldest_allowed.unwrap_or(0);
        if index < oldest_allowed {
            return Err(Error::AlreadyPrunedTo(oldest_allowed));
        }

        // Ignore duplicate writes to an existing index.
        if self.indices.contains_key(&index) {
            return Ok(());
        }

        // Append the record and value to the journal. The (0, 0) value
        // location is a placeholder — presumably replaced by the journal via
        // [OversizedRecord::with_location]; confirm against `Oversized::append`.
        let section = self.section(index);
        let entry = Record::new(index, key.clone(), 0, 0);
        let (position, _, _) = self.oversized.append(section, entry, &data).await?;

        // Record where the item lives in the journal.
        self.indices.insert(index, position);

        // Mark the index as present.
        self.intervals.insert(index);

        // Index the key, dropping stale (pruned) candidates that share its
        // translation.
        self.keys
            .insert_and_prune(&key, index, |v| *v < oldest_allowed);

        // Mark the section as needing a sync.
        self.pending.insert(section);

        // Update metrics.
        let _ = self.items_tracked.try_set(self.indices.len());
        Ok(())
    }

    /// Retrieves an item by index or by key.
    async fn get(&self, identifier: Identifier<'_, K>) -> Result<Option<V>, Error> {
        match identifier {
            Identifier::Index(index) => self.get_index(index).await,
            Identifier::Key(key) => self.get_key(key).await,
        }
    }

    /// Returns whether an item exists for the given identifier.
    ///
    /// A key lookup must fetch candidate records to confirm a full-key match,
    /// so it is more expensive than an index lookup.
    async fn has(&self, identifier: Identifier<'_, K>) -> Result<bool, Error> {
        self.has.inc();
        match identifier {
            Identifier::Index(index) => Ok(self.has_index(index)),
            Identifier::Key(key) => self.get_key(key).await.map(|result| result.is_some()),
        }
    }

    /// Syncs all sections with pending writes, concurrently.
    async fn sync(&mut self) -> Result<(), Error> {
        let pending: Vec<u64> = self.pending.iter().copied().collect();
        // Counts one sync per pending section.
        self.syncs.inc_by(pending.len() as u64);

        // Sync every pending section concurrently, failing on the first error.
        let syncs: Vec<_> = pending.iter().map(|s| self.oversized.sync(*s)).collect();
        try_join_all(syncs).await?;

        // All sections synced; nothing is pending anymore.
        self.pending.clear();
        Ok(())
    }

    /// Returns the gap boundaries around `index` (delegates to [RMap]).
    fn next_gap(&self, index: u64) -> (Option<u64>, Option<u64>) {
        self.intervals.next_gap(index)
    }

    /// Returns up to `max` missing indices starting from `index`.
    fn missing_items(&self, index: u64, max: usize) -> Vec<u64> {
        self.intervals.missing_items(index, max)
    }

    /// Returns an iterator over the (start, end) ranges of stored indices.
    fn ranges(&self) -> impl Iterator<Item = (u64, u64)> {
        self.intervals.iter().map(|(&s, &e)| (s, e))
    }

    /// Returns the first stored index, if any.
    fn first_index(&self) -> Option<u64> {
        self.intervals.first_index()
    }

    /// Returns the last stored index, if any.
    fn last_index(&self) -> Option<u64> {
        self.intervals.last_index()
    }

    /// Destroys the archive by destroying the underlying journal.
    async fn destroy(self) -> Result<(), Error> {
        Ok(self.oversized.destroy().await?)
    }
}
433
// Codec conformance tests for [Record], generated via the shared harness.
#[cfg(all(test, feature = "arbitrary"))]
mod conformance {
    use super::*;
    use commonware_codec::conformance::CodecConformance;
    use commonware_utils::sequence::U64;

    commonware_conformance::conformance_tests! {
        CodecConformance<Record<U64>>
    }
}