use super::{Config, Translator};
use crate::{
archive::{Error, Identifier},
index::{unordered::Index, Unordered},
journal::segmented::oversized::{
Config as OversizedConfig, Oversized, Record as OversizedRecord,
},
rmap::RMap,
};
use commonware_codec::{CodecShared, FixedSize, Read, ReadExt, Write};
use commonware_runtime::{
telemetry::metrics::status::GaugeExt, Buf, BufMut, BufferPooler, Metrics, Storage,
};
use commonware_utils::Array;
use futures::{future::try_join_all, pin_mut, StreamExt};
use prometheus_client::metrics::{counter::Counter, gauge::Gauge};
use std::collections::{btree_map, BTreeMap, BTreeSet};
use tracing::debug;
/// Fixed-size index-journal entry: pairs a lookup `key` with the location of
/// its (variable-size) value in the section's value blob.
#[derive(Debug, Clone, PartialEq)]
struct Record<K: Array> {
    // Logical item index this record belongs to.
    index: u64,
    // Lookup key for the item.
    key: K,
    // Byte offset of the value within the section's value storage.
    value_offset: u64,
    // Size of the value in bytes.
    value_size: u32,
}
impl<K: Array> Record<K> {
const fn new(index: u64, key: K, value_offset: u64, value_size: u32) -> Self {
Self {
index,
key,
value_offset,
value_size,
}
}
}
impl<K: Array> Write for Record<K> {
    /// Serializes the record in field-declaration order. This statement order
    /// IS the wire format and must stay in sync with `Read::read_cfg` and
    /// `FixedSize::SIZE` below.
    fn write(&self, buf: &mut impl BufMut) {
        self.index.write(buf);
        self.key.write(buf);
        self.value_offset.write(buf);
        self.value_size.write(buf);
    }
}
impl<K: Array> Read for Record<K> {
    type Cfg = ();

    /// Decodes a record, consuming fields in the same order `write` emitted
    /// them.
    fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
        // Struct-expression operands are evaluated in written order, so the
        // buffer is consumed exactly as `Write::write` produced it.
        Ok(Self {
            index: u64::read(buf)?,
            key: K::read(buf)?,
            value_offset: u64::read(buf)?,
            value_size: u32::read(buf)?,
        })
    }
}
impl<K: Array> FixedSize for Record<K> {
    // Sum of the encoded field sizes, listed in write order:
    // index + key + value_offset + value_size.
    const SIZE: usize = u64::SIZE + K::SIZE + u64::SIZE + u32::SIZE;
}
impl<K: Array> OversizedRecord for Record<K> {
    /// Returns where this record's value lives: `(offset, size)`.
    fn value_location(&self) -> (u64, u32) {
        (self.value_offset, self.value_size)
    }

    /// Consumes `self` and returns it pointing at a new value location.
    fn with_location(self, offset: u64, size: u32) -> Self {
        Self {
            value_offset: offset,
            value_size: size,
            ..self
        }
    }
}
#[cfg(feature = "arbitrary")]
impl<K: Array> arbitrary::Arbitrary<'_> for Record<K>
where
    K: for<'a> arbitrary::Arbitrary<'a>,
{
    /// Draws each field from `u` in declaration order.
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        let index = u64::arbitrary(u)?;
        let key = K::arbitrary(u)?;
        let value_offset = u64::arbitrary(u)?;
        let value_size = u32::arbitrary(u)?;
        Ok(Self::new(index, key, value_offset, value_size))
    }
}
/// An archive backed by an oversized segmented journal: fixed-size index
/// records in one partition, variable-size values in another, with in-memory
/// maps for index/key lookup.
pub struct Archive<T: Translator, E: BufferPooler + Storage + Metrics, K: Array, V: CodecShared> {
    // Number of logical indices grouped into one journal section.
    items_per_section: u64,
    // Backing journal: fixed-size `Record`s plus out-of-line values.
    oversized: Oversized<E, Record<K>, V>,
    // Sections with unsynced appends (flushed by `sync`).
    pending: BTreeSet<u64>,
    // Lowest index still readable/writable; set by `prune`.
    oldest_allowed: Option<u64>,
    // Translated-key index mapping keys to candidate item indices.
    keys: Index<T, u64>,
    // Primary journal position recorded for each index.
    indices: BTreeMap<u64, u64>,
    // Additional positions for indices written more than once (`put_multi`).
    extra_indices: BTreeMap<u64, Vec<u64>>,
    // Interval map of contiguous index ranges present in the archive.
    intervals: RMap,
    // Metrics (registered in `init`).
    items_tracked: Gauge,
    indices_pruned: Counter,
    unnecessary_reads: Counter,
    gets: Counter,
    has: Counter,
    syncs: Counter,
}
impl<T: Translator, E: BufferPooler + Storage + Metrics, K: Array, V: CodecShared>
    Archive<T, E, K, V>
{
    /// Returns the section containing `index` (sections start at aligned
    /// multiples of `items_per_section`).
    const fn section(&self, index: u64) -> u64 {
        (index / self.items_per_section) * self.items_per_section
    }

    /// Yields every journal position recorded for `index`: the primary
    /// position from `indices` (if any) followed by duplicates from
    /// `extra_indices`.
    fn iter_positions(&self, index: u64) -> impl Iterator<Item = u64> + '_ {
        self.indices.get(&index).into_iter().copied().chain(
            self.extra_indices
                .get(&index)
                .into_iter()
                .flat_map(|v| v.iter().copied()),
        )
    }

    /// Initializes the archive from `cfg`, replaying the index journal to
    /// rebuild the in-memory maps (`indices`, `extra_indices`, `keys`,
    /// `intervals`) and registering metrics on `context`.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying journal fails to initialize or if
    /// replay yields a corrupt entry.
    pub async fn init(context: E, cfg: Config<T, V::Cfg>) -> Result<Self, Error> {
        // Open the oversized journal (fixed-size index records + value blob).
        let oversized_cfg = OversizedConfig {
            index_partition: cfg.key_partition,
            value_partition: cfg.value_partition,
            index_page_cache: cfg.key_page_cache,
            index_write_buffer: cfg.key_write_buffer,
            value_write_buffer: cfg.value_write_buffer,
            compression: cfg.compression,
            codec_config: cfg.codec_config,
        };
        let oversized: Oversized<E, Record<K>, V> =
            Oversized::init(context.with_label("oversized"), oversized_cfg).await?;

        // Rebuild in-memory state by replaying every surviving index record.
        let mut indices: BTreeMap<u64, u64> = BTreeMap::new();
        let mut extra_indices: BTreeMap<u64, Vec<u64>> = BTreeMap::new();
        let mut keys = Index::new(context.with_label("index"), cfg.translator.clone());
        let mut intervals = RMap::new();
        {
            debug!("initializing archive from index journal");
            // NOTE(review): replay(0, 0, ...) presumably streams from the
            // start of the journal -- confirm against `Oversized::replay`.
            let stream = oversized.replay(0, 0, cfg.replay_buffer).await?;
            pin_mut!(stream);
            while let Some(result) = stream.next().await {
                let (_section, position, entry) = result?;
                // First occurrence of an index becomes the primary position;
                // later occurrences are kept as extras (see `put_multi`).
                match indices.entry(entry.index) {
                    btree_map::Entry::Vacant(e) => {
                        e.insert(position);
                    }
                    btree_map::Entry::Occupied(_) => {
                        extra_indices.entry(entry.index).or_default().push(position);
                    }
                }
                keys.insert(&entry.key, entry.index);
                intervals.insert(entry.index);
            }
            debug!("archive initialized");
        }

        // Create and register metrics.
        let items_tracked = Gauge::default();
        let indices_pruned = Counter::default();
        let unnecessary_reads = Counter::default();
        let gets = Counter::default();
        let has = Counter::default();
        let syncs = Counter::default();
        context.register(
            "items_tracked",
            "Number of items tracked",
            items_tracked.clone(),
        );
        context.register(
            "indices_pruned",
            "Number of indices pruned",
            indices_pruned.clone(),
        );
        context.register(
            "unnecessary_reads",
            "Number of unnecessary reads performed during key lookups",
            unnecessary_reads.clone(),
        );
        context.register("gets", "Number of gets performed", gets.clone());
        context.register("has", "Number of has performed", has.clone());
        context.register("syncs", "Number of syncs called", syncs.clone());
        // Gauge tracks distinct indices only (duplicates are not counted).
        let _ = items_tracked.try_set(indices.len());
        Ok(Self {
            items_per_section: cfg.items_per_section.get(),
            oversized,
            pending: BTreeSet::new(),
            oldest_allowed: None,
            indices,
            extra_indices,
            intervals,
            keys,
            items_tracked,
            indices_pruned,
            unnecessary_reads,
            gets,
            has,
            syncs,
        })
    }

    /// Looks up the value stored at `index`. Only the primary record is
    /// consulted; duplicates written via `put_multi` are ignored here (use
    /// `get_all` for those).
    async fn get_index(&self, index: u64) -> Result<Option<V>, Error> {
        self.gets.inc();
        let position = match self.indices.get(&index) {
            Some(&position) => position,
            None => return Ok(None),
        };
        // Fetch the index record, then the value it points to.
        let section = self.section(index);
        let entry = self.oversized.get(section, position).await?;
        let (value_offset, value_size) = entry.value_location();
        let value = self
            .oversized
            .get_value(section, value_offset, value_size)
            .await?;
        Ok(Some(value))
    }

    /// Looks up a value by `key`. The translated-key index may return several
    /// candidate indices (translator collisions), so each candidate's record
    /// is read and its full key compared before returning.
    ///
    /// # Errors
    ///
    /// Returns [`Error::RecordCorrupted`] if the key index references an
    /// unpruned index with no entry in `indices` (in-memory maps disagree).
    async fn get_key(&self, key: &K) -> Result<Option<V>, Error> {
        self.gets.inc();
        let iter = self.keys.get(key);
        let min_allowed = self.oldest_allowed.unwrap_or(0);
        for index in iter {
            // Skip candidates that have already been pruned.
            if *index < min_allowed {
                continue;
            }
            if !self.indices.contains_key(index) {
                return Err(Error::RecordCorrupted);
            }
            let section = self.section(*index);
            for position in self.iter_positions(*index) {
                let entry = self.oversized.get(section, position).await?;
                if entry.key.as_ref() == key.as_ref() {
                    let (value_offset, value_size) = entry.value_location();
                    let value = self
                        .oversized
                        .get_value(section, value_offset, value_size)
                        .await?;
                    return Ok(Some(value));
                }
                // Record belonged to a colliding key; count the wasted read.
                self.unnecessary_reads.inc();
            }
        }
        Ok(None)
    }

    /// Returns whether any value is stored at `index` (pure map lookup).
    fn has_index(&self, index: u64) -> bool {
        self.indices.contains_key(&index)
    }

    /// Appends (`index`, `key`, `data`) to the journal and updates the
    /// in-memory maps.
    ///
    /// When `skip_if_index_exists` is true (plain `put`), writing to an
    /// existing index is a silent no-op; otherwise (`put_multi`) the new
    /// position is appended to `extra_indices`.
    ///
    /// # Errors
    ///
    /// Returns [`Error::AlreadyPrunedTo`] if `index` precedes the pruning
    /// boundary.
    async fn put_internal(
        &mut self,
        index: u64,
        key: K,
        data: V,
        skip_if_index_exists: bool,
    ) -> Result<(), Error> {
        let oldest_allowed = self.oldest_allowed.unwrap_or(0);
        if index < oldest_allowed {
            return Err(Error::AlreadyPrunedTo(oldest_allowed));
        }
        if skip_if_index_exists && self.indices.contains_key(&index) {
            return Ok(());
        }
        // The record is appended with a placeholder (0, 0) value location;
        // `Oversized::append` presumably fills it in via `with_location`.
        let section = self.section(index);
        let entry = Record::new(index, key.clone(), 0, 0);
        let (position, _, _) = self.oversized.append(section, entry, &data).await?;
        match self.indices.entry(index) {
            btree_map::Entry::Vacant(e) => {
                e.insert(position);
            }
            btree_map::Entry::Occupied(_) => {
                self.extra_indices.entry(index).or_default().push(position);
            }
        }
        self.intervals.insert(index);
        // Drop stale (pruned) candidates while inserting the new key.
        self.keys
            .insert_and_prune(&key, index, |v| *v < oldest_allowed);
        // Mark the section dirty until the next `sync`.
        self.pending.insert(section);
        let _ = self.items_tracked.try_set(self.indices.len());
        Ok(())
    }

    /// Prunes all sections strictly before the section containing `min`.
    ///
    /// The boundary is rounded down to a section start, so items sharing
    /// `min`'s section are retained. No-op if already pruned to (or past)
    /// that boundary.
    pub async fn prune(&mut self, min: u64) -> Result<(), Error> {
        // Round down to the section boundary.
        let min = self.section(min);
        if let Some(oldest_allowed) = self.oldest_allowed {
            if min <= oldest_allowed {
                return Ok(());
            }
        }
        debug!(min, "pruning archive");
        // Prune the backing journal first, then drop in-memory state.
        self.oversized.prune(min).await?;
        // Drop pending-sync markers for pruned sections.
        loop {
            let next = match self.pending.iter().next() {
                Some(section) if *section < min => *section,
                _ => break,
            };
            self.pending.remove(&next);
        }
        // Drop index entries (and their duplicates) below the boundary.
        loop {
            let next = match self.indices.first_key_value() {
                Some((index, _)) if *index < min => *index,
                _ => break,
            };
            self.indices.remove(&next).unwrap();
            self.extra_indices.remove(&next);
            self.indices_pruned.inc();
        }
        // Remove pruned indices from the interval map; the `min - 1` bound
        // suggests an inclusive range -- confirm against `RMap::remove`.
        if min > 0 {
            self.intervals.remove(0, min - 1);
        }
        self.oldest_allowed = Some(min);
        let _ = self.items_tracked.try_set(self.indices.len());
        Ok(())
    }
}
impl<T: Translator, E: BufferPooler + Storage + Metrics, K: Array, V: CodecShared>
    crate::archive::Archive for Archive<T, E, K, V>
{
    type Key = K;
    type Value = V;

    /// Stores `data` at `index`; writing to an existing index is a no-op.
    async fn put(&mut self, index: u64, key: K, data: V) -> Result<(), Error> {
        self.put_internal(index, key, data, true).await
    }

    /// Retrieves a value by index or by key.
    async fn get(&self, identifier: Identifier<'_, K>) -> Result<Option<V>, Error> {
        match identifier {
            Identifier::Index(index) => self.get_index(index).await,
            Identifier::Key(key) => self.get_key(key).await,
        }
    }

    /// Returns whether an item exists. An index probe is a pure map lookup;
    /// a key probe performs a full `get_key` (which also increments the
    /// `gets` and possibly `unnecessary_reads` metrics).
    async fn has(&self, identifier: Identifier<'_, K>) -> Result<bool, Error> {
        self.has.inc();
        match identifier {
            Identifier::Index(index) => Ok(self.has_index(index)),
            Identifier::Key(key) => self.get_key(key).await.map(|result| result.is_some()),
        }
    }

    /// Syncs every section with unflushed appends, concurrently. `pending`
    /// is cleared only after all syncs succeed, so a failed sync will be
    /// retried on the next call.
    async fn sync(&mut self) -> Result<(), Error> {
        // Copy the section numbers out so the set borrow ends before the
        // journal is borrowed for the sync futures.
        let pending: Vec<u64> = self.pending.iter().copied().collect();
        // Counts one per section synced, not one per `sync` call.
        self.syncs.inc_by(pending.len() as u64);
        let syncs: Vec<_> = pending.iter().map(|s| self.oversized.sync(*s)).collect();
        try_join_all(syncs).await?;
        self.pending.clear();
        Ok(())
    }

    /// Next gap in the stored index space (delegates to the interval map).
    fn next_gap(&self, index: u64) -> (Option<u64>, Option<u64>) {
        self.intervals.next_gap(index)
    }

    /// Up to `max` missing indices starting from `index`.
    fn missing_items(&self, index: u64, max: usize) -> Vec<u64> {
        self.intervals.missing_items(index, max)
    }

    /// All contiguous stored ranges as `(start, end)` pairs.
    fn ranges(&self) -> impl Iterator<Item = (u64, u64)> {
        self.intervals.iter().map(|(&s, &e)| (s, e))
    }

    /// Contiguous stored ranges beginning at or after `from`.
    fn ranges_from(&self, from: u64) -> impl Iterator<Item = (u64, u64)> {
        self.intervals.iter_from(from).map(|(&s, &e)| (s, e))
    }

    /// Smallest stored index, if any.
    fn first_index(&self) -> Option<u64> {
        self.intervals.first_index()
    }

    /// Largest stored index, if any.
    fn last_index(&self) -> Option<u64> {
        self.intervals.last_index()
    }

    /// Destroys the underlying journal, removing all persisted data.
    async fn destroy(self) -> Result<(), Error> {
        Ok(self.oversized.destroy().await?)
    }
}
impl<T: Translator, E: BufferPooler + Storage + Metrics, K: Array, V: CodecShared>
    crate::archive::MultiArchive for Archive<T, E, K, V>
{
    /// Returns every value stored at `index` (the primary record first, then
    /// duplicates in append order), or `None` if the index is absent.
    async fn get_all(&self, index: u64) -> Result<Option<Vec<V>>, Error> {
        self.gets.inc();
        if !self.indices.contains_key(&index) {
            return Ok(None);
        }
        let section = self.section(index);
        // Preallocate for the primary record plus any duplicates.
        let extra_count = self.extra_indices.get(&index).map_or(0, Vec::len);
        let mut values = Vec::with_capacity(1 + extra_count);
        for position in self.iter_positions(index) {
            let entry = self.oversized.get(section, position).await?;
            let (value_offset, value_size) = entry.value_location();
            let value = self
                .oversized
                .get_value(section, value_offset, value_size)
                .await?;
            values.push(value);
        }
        Ok(Some(values))
    }

    /// Stores `data` at `index`, allowing multiple values per index.
    async fn put_multi(&mut self, index: u64, key: K, data: V) -> Result<(), Error> {
        self.put_internal(index, key, data, false).await
    }
}
// Codec conformance tests for the on-disk `Record` encoding (requires the
// `arbitrary` feature to generate random records).
#[cfg(all(test, feature = "arbitrary"))]
mod conformance {
    use super::*;
    use commonware_codec::conformance::CodecConformance;
    use commonware_utils::sequence::U64;

    commonware_conformance::conformance_tests! {
        CodecConformance<Record<U64>>
    }
}