use crate::{
archive::{immutable::Config, Error, Identifier},
freezer::{self, Checkpoint, Cursor, Freezer},
metadata::{self, Metadata},
ordinal::{self, Ordinal},
Context,
};
use commonware_codec::{CodecShared, EncodeSize, FixedSize, Read, ReadExt, Write};
use commonware_runtime::{Buf, BufMut, BufferPooler};
use commonware_utils::{bitmap::BitMap, sequence::prefixed_u64::U64, Array};
use futures::join;
use prometheus_client::metrics::counter::Counter;
use std::collections::BTreeMap;
use tracing::debug;
/// Key prefix for the single metadata entry (value `0`) that holds the freezer checkpoint.
const FREEZER_PREFIX: u8 = 0;
/// Key prefix for metadata entries that hold per-section ordinal records; the key's
/// value is the section number.
const ORDINAL_PREFIX: u8 = 1;
/// A value stored in the archive's metadata store.
///
/// Which variant is expected under a given key is determined by the key's prefix
/// (`FREEZER_PREFIX` or `ORDINAL_PREFIX`).
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
enum Record {
/// The checkpoint returned by the freezer's `sync`, handed back to the freezer on `init`.
Freezer(Checkpoint),
/// The tracked indices of one ordinal section: `Some(bits)` has one bit per item in the
/// section, set when the item is put; it is replaced by `None` once every bit is set.
Ordinal(Option<BitMap>),
}
impl Record {
    /// Borrows the freezer checkpoint held by this record.
    ///
    /// # Panics
    ///
    /// Panics with "incorrect record" if this is not a [`Record::Freezer`].
    fn freezer(&self) -> &Checkpoint {
        let Self::Freezer(checkpoint) = self else {
            panic!("incorrect record");
        };
        checkpoint
    }

    /// Borrows the section bitmap (or `None`) held by this record.
    ///
    /// # Panics
    ///
    /// Panics with "incorrect record" if this is not a [`Record::Ordinal`].
    fn ordinal(&self) -> &Option<BitMap> {
        let Self::Ordinal(indices) = self else {
            panic!("incorrect record");
        };
        indices
    }
}
impl Write for Record {
    /// Encodes the record as a one-byte variant tag (`0` = freezer, `1` = ordinal)
    /// followed by the encoding of the variant's payload.
    fn write(&self, buf: &mut impl BufMut) {
        // Emit the discriminant first so `read_cfg` knows which payload follows.
        let tag: u8 = match self {
            Self::Freezer(_) => 0,
            Self::Ordinal(_) => 1,
        };
        buf.put_u8(tag);

        match self {
            Self::Freezer(checkpoint) => checkpoint.write(buf),
            Self::Ordinal(indices) => indices.write(buf),
        }
    }
}
impl Read for Record {
type Cfg = ();
fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
let tag = u8::read(buf)?;
match tag {
0 => Ok(Self::Freezer(Checkpoint::read(buf)?)),
1 => Ok(Self::Ordinal(Option::<BitMap>::read_cfg(
buf,
&(usize::MAX as u64),
)?)),
_ => Err(commonware_codec::Error::InvalidEnum(tag)),
}
}
}
impl EncodeSize for Record {
    /// Returns the encoded length: one tag byte plus the size of the variant's payload.
    fn encode_size(&self) -> usize {
        let payload = match self {
            Self::Freezer(_) => Checkpoint::SIZE,
            Self::Ordinal(indices) => indices.encode_size(),
        };
        1 + payload
    }
}
/// An archive that stores values in a [`Freezer`] (addressable by key) and maps indices
/// to freezer cursors through an [`Ordinal`], with a [`Metadata`] store holding the
/// freezer checkpoint and the per-section ordinal records.
pub struct Archive<E: BufferPooler + Context, K: Array, V: CodecShared> {
/// Number of items per section; used to map an index to its section and to its
/// bit position within that section.
items_per_section: u64,
/// Holds the freezer checkpoint and one ordinal record per section.
metadata: Metadata<E, U64, Record>,
/// Stores key/value pairs; a `put` yields the cursor recorded in `ordinal`.
freezer: Freezer<E, K, V>,
/// Maps an index to the freezer cursor of its value.
ordinal: Ordinal<E, Cursor>,
/// Metric: number of `get` calls.
gets: Counter,
/// Metric: number of `has` calls.
has: Counter,
/// Metric: number of `sync` calls.
syncs: Counter,
}
impl<E: BufferPooler + Context, K: Array, V: CodecShared> Archive<E, K, V> {
pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
let metadata = Metadata::<E, U64, Record>::init(
context.with_label("metadata"),
metadata::Config {
partition: cfg.metadata_partition,
codec_config: (),
},
)
.await?;
let freezer_key = U64::new(FREEZER_PREFIX, 0);
let checkpoint = metadata.get(&freezer_key).map(|freezer| *freezer.freezer());
let freezer = Freezer::init_with_checkpoint(
context.with_label("freezer"),
freezer::Config {
key_partition: cfg.freezer_key_partition,
key_write_buffer: cfg.freezer_key_write_buffer,
key_page_cache: cfg.freezer_key_page_cache,
value_partition: cfg.freezer_value_partition,
value_compression: cfg.freezer_value_compression,
value_write_buffer: cfg.freezer_value_write_buffer,
value_target_size: cfg.freezer_value_target_size,
table_partition: cfg.freezer_table_partition,
table_initial_size: cfg.freezer_table_initial_size,
table_resize_frequency: cfg.freezer_table_resize_frequency,
table_resize_chunk_size: cfg.freezer_table_resize_chunk_size,
table_replay_buffer: cfg.replay_buffer,
codec_config: cfg.codec_config,
},
checkpoint,
)
.await?;
let sections = metadata
.keys()
.filter(|k| k.prefix() == ORDINAL_PREFIX)
.collect::<Vec<_>>();
let mut section_bits = BTreeMap::new();
for section in sections {
let bits = metadata.get(section).unwrap().ordinal();
let section = section.value();
section_bits.insert(section, bits);
}
let ordinal = Ordinal::init_with_bits(
context.with_label("ordinal"),
ordinal::Config {
partition: cfg.ordinal_partition,
items_per_blob: cfg.items_per_section,
write_buffer: cfg.ordinal_write_buffer,
replay_buffer: cfg.replay_buffer,
},
Some(section_bits),
)
.await?;
let gets = Counter::default();
let has = Counter::default();
let syncs = Counter::default();
context.register("gets", "Number of gets performed", gets.clone());
context.register("has", "Number of has performed", has.clone());
context.register("syncs", "Number of syncs called", syncs.clone());
Ok(Self {
items_per_section: cfg.items_per_section.get(),
metadata,
freezer,
ordinal,
gets,
has,
syncs,
})
}
async fn get_index(&self, index: u64) -> Result<Option<V>, Error> {
let Some(cursor) = self.ordinal.get(index).await? else {
return Ok(None);
};
let result = self
.freezer
.get(freezer::Identifier::Cursor(cursor))
.await?;
Ok(result)
}
async fn get_key(&self, key: &K) -> Result<Option<V>, Error> {
let result = self.freezer.get(freezer::Identifier::Key(key)).await?;
Ok(result)
}
fn initialize_section(&mut self, section: u64) {
let bits = BitMap::zeroes(self.items_per_section);
let key = U64::new(ORDINAL_PREFIX, section);
self.metadata.put(key, Record::Ordinal(Some(bits)));
debug!(section, "initialized section");
}
}
impl<E: BufferPooler + Context, K: Array, V: CodecShared> crate::archive::Archive
    for Archive<E, K, V>
{
    type Key = K;
    type Value = V;

    /// Stores `data` under both `index` and `key`.
    ///
    /// Does nothing if the ordinal already contains `index`. Otherwise the index is
    /// first marked in its section's metadata bitmap (the bitmap is replaced by `None`
    /// once every bit of the section is set), then the value is written to the freezer
    /// and the resulting cursor is written to the ordinal.
    async fn put(&mut self, index: u64, key: K, data: V) -> Result<(), Error> {
        // Already stored: nothing to do.
        if self.ordinal.has(index) {
            return Ok(());
        }

        // Locate the item's section and its position inside that section.
        let section = index / self.items_per_section;
        let offset = index % self.items_per_section;

        // Make sure the section has a metadata record.
        let ordinal_key = U64::new(ORDINAL_PREFIX, section);
        if self.metadata.get(&ordinal_key).is_none() {
            self.initialize_section(section);
        }
        let record = self.metadata.get_mut(&ordinal_key).unwrap();

        // Mark the item and check whether the section is now complete.
        let full = match record {
            Record::Ordinal(Some(bits)) => {
                bits.set(offset, true);
                bits.count_ones() == self.items_per_section
            }
            Record::Ordinal(None) | Record::Freezer(_) => false,
        };
        if full {
            *record = Record::Ordinal(None);
        }

        // Write the value, then record where it landed.
        let cursor = self.freezer.put(key, data).await?;
        self.ordinal.put(index, cursor).await?;
        Ok(())
    }

    /// Looks up a value by index or by key, incrementing the `gets` counter.
    async fn get(&self, identifier: Identifier<'_, K>) -> Result<Option<V>, Error> {
        self.gets.inc();
        match identifier {
            Identifier::Key(key) => self.get_key(key).await,
            Identifier::Index(index) => self.get_index(index).await,
        }
    }

    /// Reports whether an item exists, incrementing the `has` counter.
    ///
    /// An index is checked against the ordinal; a key is checked by fetching its value
    /// from the freezer.
    async fn has(&self, identifier: Identifier<'_, K>) -> Result<bool, Error> {
        self.has.inc();
        match identifier {
            Identifier::Key(key) => Ok(self.get_key(key).await?.is_some()),
            Identifier::Index(index) => Ok(self.ordinal.has(index)),
        }
    }

    /// Syncs the freezer and ordinal together, then stores the freezer's checkpoint in
    /// metadata and syncs metadata. Increments the `syncs` counter.
    async fn sync(&mut self) -> Result<(), Error> {
        self.syncs.inc();

        // Metadata is only updated after both underlying stores have synced.
        let (freezer_outcome, ordinal_outcome) = join!(self.freezer.sync(), self.ordinal.sync());
        let checkpoint = freezer_outcome?;
        ordinal_outcome?;

        self.metadata
            .put(U64::new(FREEZER_PREFIX, 0), Record::Freezer(checkpoint));
        self.metadata.sync().await?;
        Ok(())
    }

    /// Delegates to the ordinal's `next_gap`.
    fn next_gap(&self, index: u64) -> (Option<u64>, Option<u64>) {
        self.ordinal.next_gap(index)
    }

    /// Delegates to the ordinal's `missing_items`.
    fn missing_items(&self, index: u64, max: usize) -> Vec<u64> {
        self.ordinal.missing_items(index, max)
    }

    /// Delegates to the ordinal's `ranges`.
    fn ranges(&self) -> impl Iterator<Item = (u64, u64)> {
        self.ordinal.ranges()
    }

    /// Delegates to the ordinal's `ranges_from`.
    fn ranges_from(&self, from: u64) -> impl Iterator<Item = (u64, u64)> {
        self.ordinal.ranges_from(from)
    }

    /// Delegates to the ordinal's `first_index`.
    fn first_index(&self) -> Option<u64> {
        self.ordinal.first_index()
    }

    /// Delegates to the ordinal's `last_index`.
    fn last_index(&self) -> Option<u64> {
        self.ordinal.last_index()
    }

    /// Destroys the ordinal, the freezer, and the metadata store, in that order,
    /// stopping at the first error.
    async fn destroy(self) -> Result<(), Error> {
        let Self {
            ordinal,
            freezer,
            metadata,
            ..
        } = self;
        ordinal.destroy().await?;
        freezer.destroy().await?;
        metadata.destroy().await?;
        Ok(())
    }
}
// Codec conformance tests for `Record`; compiled only for test builds with the
// `arbitrary` feature enabled (which also derives `Arbitrary` for `Record`).
#[cfg(all(test, feature = "arbitrary"))]
mod conformance {
use super::*;
use commonware_codec::conformance::CodecConformance;
commonware_conformance::conformance_tests! {
CodecConformance<Record>
}
}