1use super::{Config, Translator};
2use crate::{
3 archive::{Error, Identifier},
4 index::{unordered::Index, Unordered},
5 journal::segmented::oversized::{
6 Config as OversizedConfig, Oversized, Record as OversizedRecord,
7 },
8 rmap::RMap,
9};
10use commonware_codec::{CodecShared, FixedSize, Read, ReadExt, Write};
11use commonware_runtime::{
12 telemetry::metrics::status::GaugeExt, Buf, BufMut, BufferPooler, Metrics, Storage,
13};
14use commonware_utils::Array;
15use futures::{future::try_join_all, pin_mut, StreamExt};
16use prometheus_client::metrics::{counter::Counter, gauge::Gauge};
17use std::collections::{btree_map, BTreeMap, BTreeSet};
18use tracing::debug;
19
/// An entry in the index journal: pairs a key with the location of its
/// value in the value journal.
#[derive(Debug, Clone, PartialEq)]
struct Record<K: Array> {
    /// Logical index of the item in the archive.
    index: u64,
    /// Key associated with the item.
    key: K,
    /// Offset of the value in the value journal (interpreted by [Oversized]).
    value_offset: u64,
    /// Size of the value in bytes.
    value_size: u32,
}
32
33impl<K: Array> Record<K> {
34 const fn new(index: u64, key: K, value_offset: u64, value_size: u32) -> Self {
36 Self {
37 index,
38 key,
39 value_offset,
40 value_size,
41 }
42 }
43}
44
45impl<K: Array> Write for Record<K> {
46 fn write(&self, buf: &mut impl BufMut) {
47 self.index.write(buf);
48 self.key.write(buf);
49 self.value_offset.write(buf);
50 self.value_size.write(buf);
51 }
52}
53
54impl<K: Array> Read for Record<K> {
55 type Cfg = ();
56
57 fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
58 let index = u64::read(buf)?;
59 let key = K::read(buf)?;
60 let value_offset = u64::read(buf)?;
61 let value_size = u32::read(buf)?;
62 Ok(Self {
63 index,
64 key,
65 value_offset,
66 value_size,
67 })
68 }
69}
70
impl<K: Array> FixedSize for Record<K> {
    // index (u64) + key + value offset (u64) + value size (u32).
    const SIZE: usize = u64::SIZE + K::SIZE + u64::SIZE + u32::SIZE;
}
75
76impl<K: Array> OversizedRecord for Record<K> {
77 fn value_location(&self) -> (u64, u32) {
78 (self.value_offset, self.value_size)
79 }
80
81 fn with_location(mut self, offset: u64, size: u32) -> Self {
82 self.value_offset = offset;
83 self.value_size = size;
84 self
85 }
86}
87
#[cfg(feature = "arbitrary")]
impl<K: Array> arbitrary::Arbitrary<'_> for Record<K>
where
    K: for<'a> arbitrary::Arbitrary<'a>,
{
    /// Generates a random [Record], drawing each field from `u` in
    /// declaration order (index, key, value offset, value size).
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        let index = u64::arbitrary(u)?;
        let key = K::arbitrary(u)?;
        let value_offset = u64::arbitrary(u)?;
        let value_size = u32::arbitrary(u)?;
        Ok(Self {
            index,
            key,
            value_offset,
            value_size,
        })
    }
}
102
/// An archive backed by an [Oversized] segmented journal: record metadata is
/// kept in an index journal while values are stored out-of-line, with an
/// in-memory map from index/key to journal position.
pub struct Archive<T: Translator, E: BufferPooler + Storage + Metrics, K: Array, V: CodecShared> {
    /// Number of indices grouped into each journal section.
    items_per_section: u64,

    /// Underlying journal holding [Record]s and their values.
    oversized: Oversized<E, Record<K>, V>,

    /// Sections with appends not yet synced to storage.
    pending: BTreeSet<u64>,

    /// Minimum retained index after pruning; `None` until the first prune.
    oldest_allowed: Option<u64>,

    /// Translated-key index mapping keys to candidate indices
    /// (translation may produce collisions — lookups verify the full key).
    keys: Index<T, u64>,

    /// Primary journal position for each stored index.
    indices: BTreeMap<u64, u64>,

    /// Additional journal positions for indices stored more than once
    /// (populated via `put_multi`).
    extra_indices: BTreeMap<u64, Vec<u64>>,

    /// Contiguous ranges of stored indices, used for gap queries.
    intervals: RMap,

    // Metrics.
    items_tracked: Gauge,
    indices_pruned: Counter,
    unnecessary_reads: Counter,
    gets: Counter,
    has: Counter,
    syncs: Counter,
}
136
impl<T: Translator, E: BufferPooler + Storage + Metrics, K: Array, V: CodecShared>
    Archive<T, E, K, V>
{
    /// Returns the first index of the section containing `index` (sections
    /// are aligned multiples of `items_per_section`).
    const fn section(&self, index: u64) -> u64 {
        (index / self.items_per_section) * self.items_per_section
    }

    /// Iterates over every journal position recorded for `index`: the
    /// primary position (if any) followed by any duplicate positions.
    fn iter_positions(&self, index: u64) -> impl Iterator<Item = u64> + '_ {
        self.indices.get(&index).into_iter().copied().chain(
            self.extra_indices
                .get(&index)
                .into_iter()
                .flat_map(|v| v.iter().copied()),
        )
    }

    /// Initializes the archive: opens the underlying [Oversized] journal,
    /// replays it to rebuild the in-memory index/key/interval state, and
    /// registers metrics with `context`.
    pub async fn init(context: E, cfg: Config<T, V::Cfg>) -> Result<Self, Error> {
        // Open the underlying journal.
        let oversized_cfg = OversizedConfig {
            index_partition: cfg.key_partition,
            value_partition: cfg.value_partition,
            index_page_cache: cfg.key_page_cache,
            index_write_buffer: cfg.key_write_buffer,
            value_write_buffer: cfg.value_write_buffer,
            compression: cfg.compression,
            codec_config: cfg.codec_config,
        };
        let oversized: Oversized<E, Record<K>, V> =
            Oversized::init(context.with_label("oversized"), oversized_cfg).await?;

        // Rebuild in-memory state by replaying the index journal.
        let mut indices: BTreeMap<u64, u64> = BTreeMap::new();
        let mut extra_indices: BTreeMap<u64, Vec<u64>> = BTreeMap::new();
        let mut keys = Index::new(context.with_label("index"), cfg.translator.clone());
        let mut intervals = RMap::new();
        {
            debug!("initializing archive from index journal");
            // Replay from the start (section 0, offset 0 — TODO confirm arg
            // semantics against `Oversized::replay`).
            let stream = oversized.replay(0, 0, cfg.replay_buffer).await?;
            pin_mut!(stream);
            while let Some(result) = stream.next().await {
                let (_section, position, entry) = result?;

                // First occurrence becomes the primary position; subsequent
                // occurrences (duplicates via `put_multi`) are tracked separately.
                match indices.entry(entry.index) {
                    btree_map::Entry::Vacant(e) => {
                        e.insert(position);
                    }
                    btree_map::Entry::Occupied(_) => {
                        extra_indices.entry(entry.index).or_default().push(position);
                    }
                }

                keys.insert(&entry.key, entry.index);

                intervals.insert(entry.index);
            }
            debug!("archive initialized");
        }

        // Create and register metrics.
        let items_tracked = Gauge::default();
        let indices_pruned = Counter::default();
        let unnecessary_reads = Counter::default();
        let gets = Counter::default();
        let has = Counter::default();
        let syncs = Counter::default();
        context.register(
            "items_tracked",
            "Number of items tracked",
            items_tracked.clone(),
        );
        context.register(
            "indices_pruned",
            "Number of indices pruned",
            indices_pruned.clone(),
        );
        context.register(
            "unnecessary_reads",
            "Number of unnecessary reads performed during key lookups",
            unnecessary_reads.clone(),
        );
        context.register("gets", "Number of gets performed", gets.clone());
        context.register("has", "Number of has performed", has.clone());
        context.register("syncs", "Number of syncs called", syncs.clone());
        let _ = items_tracked.try_set(indices.len());

        Ok(Self {
            items_per_section: cfg.items_per_section.get(),
            oversized,
            pending: BTreeSet::new(),
            oldest_allowed: None,
            indices,
            extra_indices,
            intervals,
            keys,
            items_tracked,
            indices_pruned,
            unnecessary_reads,
            gets,
            has,
            syncs,
        })
    }

    /// Returns the value stored at `index` (the primary record only, if the
    /// index has duplicates), or `None` if the index is not stored.
    async fn get_index(&self, index: u64) -> Result<Option<V>, Error> {
        self.gets.inc();

        // Resolve the primary journal position for this index.
        let position = match self.indices.get(&index) {
            Some(&position) => position,
            None => return Ok(None),
        };

        // Read the index record, then fetch the value it points to.
        let section = self.section(index);
        let entry = self.oversized.get(section, position).await?;
        let (value_offset, value_size) = entry.value_location();

        let value = self
            .oversized
            .get_value(section, value_offset, value_size)
            .await?;
        Ok(Some(value))
    }

    /// Returns the first stored value whose full key equals `key`, or `None`.
    ///
    /// The translated-key index may yield false-positive candidates, so each
    /// candidate record is read and its key compared byte-for-byte.
    async fn get_key(&self, key: &K) -> Result<Option<V>, Error> {
        self.gets.inc();

        let iter = self.keys.get(key);
        let min_allowed = self.oldest_allowed.unwrap_or(0);
        for index in iter {
            // Skip stale entries below the prune horizon (cleaned lazily).
            if *index < min_allowed {
                continue;
            }

            // A retained candidate must have a record; otherwise state is corrupt.
            if !self.indices.contains_key(index) {
                return Err(Error::RecordCorrupted);
            }
            let section = self.section(*index);

            for position in self.iter_positions(*index) {
                let entry = self.oversized.get(section, position).await?;

                // Verify the full key to rule out translator collisions.
                if entry.key.as_ref() == key.as_ref() {
                    let (value_offset, value_size) = entry.value_location();
                    let value = self
                        .oversized
                        .get_value(section, value_offset, value_size)
                        .await?;
                    return Ok(Some(value));
                }
                // Candidate record did not match: count the wasted read.
                self.unnecessary_reads.inc();
            }
        }

        Ok(None)
    }

    /// Returns whether any record is stored at `index`.
    fn has_index(&self, index: u64) -> bool {
        self.indices.contains_key(&index)
    }

    /// Appends `data` at `index` under `key`.
    ///
    /// If `skip_if_index_exists` is true and the index is already stored,
    /// this is a no-op; otherwise a duplicate record is appended and tracked
    /// in `extra_indices`. Fails if `index` has already been pruned.
    async fn put_internal(
        &mut self,
        index: u64,
        key: K,
        data: V,
        skip_if_index_exists: bool,
    ) -> Result<(), Error> {
        // Reject writes below the prune horizon.
        let oldest_allowed = self.oldest_allowed.unwrap_or(0);
        if index < oldest_allowed {
            return Err(Error::AlreadyPrunedTo(oldest_allowed));
        }

        if skip_if_index_exists && self.indices.contains_key(&index) {
            return Ok(());
        }

        // Append the record; the journal fills in the value location
        // (the placeholder (0, 0) is replaced via `with_location`).
        let section = self.section(index);
        let entry = Record::new(index, key.clone(), 0, 0);
        let (position, _, _) = self.oversized.append(section, entry, &data).await?;

        // Track the new position (primary or duplicate).
        match self.indices.entry(index) {
            btree_map::Entry::Vacant(e) => {
                e.insert(position);
            }
            btree_map::Entry::Occupied(_) => {
                self.extra_indices.entry(index).or_default().push(position);
            }
        }

        self.intervals.insert(index);

        // Record the key and opportunistically drop stale entries.
        self.keys
            .insert_and_prune(&key, index, |v| *v < oldest_allowed);

        // Mark the section dirty so the next sync flushes it.
        self.pending.insert(section);

        let _ = self.items_tracked.try_set(self.indices.len());
        Ok(())
    }

    /// Prunes all data below `min` (floored to a section boundary).
    ///
    /// No-op if the archive is already pruned to (or beyond) that boundary.
    pub async fn prune(&mut self, min: u64) -> Result<(), Error> {
        // Only whole sections can be pruned.
        let min = self.section(min);

        if let Some(oldest_allowed) = self.oldest_allowed {
            // Already pruned at least this far.
            if min <= oldest_allowed {
                return Ok(());
            }
        }
        debug!(min, "pruning archive");

        // Prune the journal first so on-disk state never lags in-memory state.
        self.oversized.prune(min).await?;

        // Drop pending-sync markers for pruned sections.
        loop {
            let next = match self.pending.iter().next() {
                Some(section) if *section < min => *section,
                _ => break,
            };
            self.pending.remove(&next);
        }

        // Drop in-memory records for pruned indices.
        loop {
            let next = match self.indices.first_key_value() {
                Some((index, _)) if *index < min => *index,
                _ => break,
            };
            self.indices.remove(&next).unwrap();
            self.extra_indices.remove(&next);
            self.indices_pruned.inc();
        }

        // Remove pruned indices from the interval map (inclusive range —
        // TODO confirm against `RMap::remove`).
        if min > 0 {
            self.intervals.remove(0, min - 1);
        }

        // Advance the prune horizon and update metrics.
        self.oldest_allowed = Some(min);
        let _ = self.items_tracked.try_set(self.indices.len());
        Ok(())
    }
}
419
impl<T: Translator, E: BufferPooler + Storage + Metrics, K: Array, V: CodecShared>
    crate::archive::Archive for Archive<T, E, K, V>
{
    type Key = K;
    type Value = V;

    /// Stores `data` at `index` under `key`; a no-op if the index already
    /// exists (duplicates are only allowed via `put_multi`).
    async fn put(&mut self, index: u64, key: K, data: V) -> Result<(), Error> {
        self.put_internal(index, key, data, true).await
    }

    /// Retrieves a value by index or by key.
    async fn get(&self, identifier: Identifier<'_, K>) -> Result<Option<V>, Error> {
        match identifier {
            Identifier::Index(index) => self.get_index(index).await,
            Identifier::Key(key) => self.get_key(key).await,
        }
    }

    /// Returns whether a value exists for the given identifier.
    ///
    /// Key lookups must read candidate records to confirm a match, so they
    /// cost as much as a `get`.
    async fn has(&self, identifier: Identifier<'_, K>) -> Result<bool, Error> {
        self.has.inc();
        match identifier {
            Identifier::Index(index) => Ok(self.has_index(index)),
            Identifier::Key(key) => self.get_key(key).await.map(|result| result.is_some()),
        }
    }

    /// Flushes all sections with unsynced appends, concurrently.
    async fn sync(&mut self) -> Result<(), Error> {
        let pending: Vec<u64> = self.pending.iter().copied().collect();
        // One sync is counted per pending section.
        self.syncs.inc_by(pending.len() as u64);

        // Sync all dirty sections concurrently; fail fast on the first error.
        let syncs: Vec<_> = pending.iter().map(|s| self.oversized.sync(*s)).collect();
        try_join_all(syncs).await?;

        // Only clear the dirty set once every sync succeeded.
        self.pending.clear();
        Ok(())
    }

    /// Returns the gap boundaries around `index` in the stored-interval map.
    fn next_gap(&self, index: u64) -> (Option<u64>, Option<u64>) {
        self.intervals.next_gap(index)
    }

    /// Returns up to `max` indices missing at/after `index`.
    fn missing_items(&self, index: u64, max: usize) -> Vec<u64> {
        self.intervals.missing_items(index, max)
    }

    /// Iterates over all contiguous `(start, end)` ranges of stored indices.
    fn ranges(&self) -> impl Iterator<Item = (u64, u64)> {
        self.intervals.iter().map(|(&s, &e)| (s, e))
    }

    /// Iterates over contiguous ranges starting from `from`.
    fn ranges_from(&self, from: u64) -> impl Iterator<Item = (u64, u64)> {
        self.intervals.iter_from(from).map(|(&s, &e)| (s, e))
    }

    /// Returns the smallest stored index, if any.
    fn first_index(&self) -> Option<u64> {
        self.intervals.first_index()
    }

    /// Returns the largest stored index, if any.
    fn last_index(&self) -> Option<u64> {
        self.intervals.last_index()
    }

    /// Destroys the underlying journal, removing all persisted data.
    async fn destroy(self) -> Result<(), Error> {
        Ok(self.oversized.destroy().await?)
    }
}
486
487impl<T: Translator, E: BufferPooler + Storage + Metrics, K: Array, V: CodecShared>
488 crate::archive::MultiArchive for Archive<T, E, K, V>
489{
490 async fn get_all(&self, index: u64) -> Result<Option<Vec<V>>, Error> {
491 self.gets.inc();
493
494 if !self.indices.contains_key(&index) {
496 return Ok(None);
497 }
498
499 let section = self.section(index);
501 let extra_count = self.extra_indices.get(&index).map_or(0, Vec::len);
502
503 let mut values = Vec::with_capacity(1 + extra_count);
504 for position in self.iter_positions(index) {
505 let entry = self.oversized.get(section, position).await?;
507
508 let (value_offset, value_size) = entry.value_location();
510 let value = self
511 .oversized
512 .get_value(section, value_offset, value_size)
513 .await?;
514 values.push(value);
515 }
516 Ok(Some(values))
517 }
518
519 async fn put_multi(&mut self, index: u64, key: K, data: V) -> Result<(), Error> {
520 self.put_internal(index, key, data, false).await
521 }
522}
523
// Codec round-trip conformance tests for [Record] (requires the `arbitrary`
// feature for random record generation).
#[cfg(all(test, feature = "arbitrary"))]
mod conformance {
    use super::*;
    use commonware_codec::conformance::CodecConformance;
    use commonware_utils::sequence::U64;

    commonware_conformance::conformance_tests! {
        CodecConformance<Record<U64>>
    }
}