use super::{Config, Error};
use crate::{rmap::RMap, Context, Persistable};
use commonware_codec::{
CodecFixed, CodecFixedShared, Encode, FixedSize, Read, ReadExt, Write as CodecWrite,
};
use commonware_cryptography::{crc32, Crc32};
use commonware_runtime::{
buffer::{Read as ReadBuffer, Write},
Blob, Buf, BufMut, BufferPooler, Error as RError,
};
use commonware_utils::{bitmap::BitMap, hex, sync::AsyncMutex};
use futures::future::try_join_all;
use prometheus_client::metrics::counter::Counter;
use std::{
collections::{btree_map::Entry, BTreeMap, BTreeSet},
marker::PhantomData,
};
use tracing::{debug, warn};
/// A stored value paired with a CRC32 of its canonical encoding, used to
/// detect torn or corrupted on-disk records during replay and reads.
#[derive(Debug, Clone)]
struct Record<V: CodecFixed<Cfg = ()>> {
// The fixed-size payload.
value: V,
// CRC32 over `value.encode()`; verified by `is_valid`.
crc: u32,
}
impl<V: CodecFixed<Cfg = ()>> Record<V> {
fn new(value: V) -> Self {
let crc = Crc32::checksum(&value.encode());
Self { value, crc }
}
fn is_valid(&self) -> bool {
self.crc == Crc32::checksum(&self.value.encode())
}
}
// A record occupies exactly the value's fixed encoded size plus the CRC32 digest.
impl<V: CodecFixed<Cfg = ()>> FixedSize for Record<V> {
const SIZE: usize = V::SIZE + crc32::Digest::SIZE;
}
impl<V: CodecFixed<Cfg = ()>> CodecWrite for Record<V> {
// Serialize as value bytes followed by the CRC word — the exact order
// `read_cfg` expects when decoding.
fn write(&self, buf: &mut impl BufMut) {
self.value.write(buf);
self.crc.write(buf);
}
}
impl<V: CodecFixed<Cfg = ()>> Read for Record<V> {
    type Cfg = ();

    /// Decode a record: the fixed-size value first, then its CRC32 word.
    /// The stored CRC is NOT verified here; callers use `is_valid` for that.
    fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
        // Struct-literal fields evaluate in written order, preserving the
        // value-then-crc read sequence.
        Ok(Self {
            value: V::read(buf)?,
            crc: u32::read(buf)?,
        })
    }
}
#[cfg(feature = "arbitrary")]
impl<V: CodecFixed<Cfg = ()>> arbitrary::Arbitrary<'_> for Record<V>
where
V: for<'a> arbitrary::Arbitrary<'a>,
{
fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
let value = V::arbitrary(u)?;
Ok(Self::new(value))
}
}
/// An index-addressed store of fixed-size values, sharded into per-section
/// blobs of `config.items_per_blob` records, with CRC-validated reads and an
/// in-memory interval map tracking which indices are populated.
pub struct Ordinal<E: BufferPooler + Context, V: CodecFixed<Cfg = ()>> {
context: E,
config: Config,
// Open, write-buffered blob per section (section = index / items_per_blob).
blobs: BTreeMap<u64, Write<E::Blob>>,
// Which indices currently hold valid records.
intervals: RMap,
// Sections with buffered writes not yet synced to disk.
pending: AsyncMutex<BTreeSet<u64>>,
// Operation metrics, registered with the runtime in `init_with_bits`.
puts: Counter,
gets: Counter,
has: Counter,
syncs: Counter,
pruned: Counter,
// Marks the value type without storing one.
_phantom: PhantomData<V>,
}
impl<E: BufferPooler + Context, V: CodecFixed<Cfg = ()>> Ordinal<E, V> {
/// Open (or create) the store, replaying all on-disk records to rebuild the
/// interval map. Equivalent to `init_with_bits` with no bitmap hints.
pub async fn init(context: E, config: Config) -> Result<Self, Error> {
Self::init_with_bits(context, config, None).await
}
/// Open the store, optionally guided by per-section bitmaps describing which
/// records are expected to exist.
///
/// When `bits` is provided: sections absent from the map are skipped entirely;
/// within a section mapped to `Some(bitmap)`, unset bits are skipped without
/// reading and set bits MUST decode to a valid record (otherwise
/// `Error::MissingRecord`); a section mapped to `None` is fully replayed and
/// every slot must be valid. Without `bits`, invalid slots are tolerated and
/// simply treated as empty.
///
/// Blobs whose length is not a multiple of the record size are truncated to
/// the last whole record (recovering from a torn trailing write) before replay.
pub async fn init_with_bits(
context: E,
config: Config,
bits: Option<BTreeMap<u64, &Option<BitMap>>>,
) -> Result<Self, Error> {
// Discover existing section blobs; a missing partition means a fresh store.
let mut blobs = BTreeMap::new();
let stored_blobs = match context.scan(&config.partition).await {
Ok(blobs) => blobs,
Err(commonware_runtime::Error::PartitionMissing(_)) => Vec::new(),
Err(err) => return Err(Error::Runtime(err)),
};
for name in stored_blobs {
let (blob, mut len) = context.open(&config.partition, &name).await?;
// Blob names are big-endian section numbers (the same encoding `put` uses
// when creating them).
let index = match name.try_into() {
Ok(index) => u64::from_be_bytes(index),
Err(nm) => Err(Error::InvalidBlobName(hex(&nm)))?,
};
let record_size = Record::<V>::SIZE as u64;
if len % record_size != 0 {
// A partial trailing record indicates a torn write; drop it on disk.
warn!(
blob = index,
invalid_size = len,
record_size,
"blob size is not a multiple of record size, truncating"
);
len -= len % record_size;
blob.resize(len).await?;
blob.sync().await?;
}
debug!(blob = index, len, "found index blob");
blobs.insert(index, (blob, len));
}
debug!(
blobs = blobs.len(),
"rebuilding intervals from existing index"
);
let start = context.current();
let mut items = 0;
let mut intervals = RMap::new();
// Replay every retained blob, validating records and populating the interval map.
for (section, (blob, size)) in &blobs {
if let Some(bits) = &bits {
if !bits.contains_key(section) {
warn!(section, "skipping section without bits");
continue;
}
}
let mut replay_blob =
ReadBuffer::from_pooler(&context, blob.clone(), *size, config.replay_buffer);
let mut offset = 0;
let items_per_blob = config.items_per_blob.get();
while offset < *size {
// Global index of the record slot at this offset within the section.
let index = section * items_per_blob + (offset / Record::<V>::SIZE as u64);
let mut must_exist = false;
if let Some(bits) = &bits {
let bits = bits.get(section).unwrap();
if let Some(bits) = bits {
let bit_index = offset as usize / Record::<V>::SIZE;
if !bits.get(bit_index as u64) {
// Bit unset: slot is expected to be empty; skip without reading.
offset += Record::<V>::SIZE as u64;
continue;
}
}
// Bit set (or no bitmap for this section): the record must decode cleanly.
must_exist = true;
}
replay_blob.seek_to(offset)?;
let mut record_buf = replay_blob.read(Record::<V>::SIZE).await?;
offset += Record::<V>::SIZE as u64;
if let Ok(record) = Record::<V>::read(&mut record_buf) {
if record.is_valid() {
items += 1;
intervals.insert(index);
continue;
}
};
// Undecodable or CRC-invalid record: fatal only when the caller's bitmap
// promised it exists; otherwise it is treated as an empty slot.
if must_exist {
return Err(Error::MissingRecord(index));
}
}
}
debug!(
items,
elapsed = ?context.current().duration_since(start).unwrap_or_default(),
"rebuilt intervals"
);
// Wrap each raw blob in a write buffer for subsequent `put`s.
let blobs = blobs
.into_iter()
.map(|(index, (blob, len))| {
(
index,
Write::from_pooler(&context, blob, len, config.write_buffer),
)
})
.collect();
// Create and register operation metrics with the runtime's registry.
let puts = Counter::default();
let gets = Counter::default();
let has = Counter::default();
let syncs = Counter::default();
let pruned = Counter::default();
context.register("puts", "Number of put calls", puts.clone());
context.register("gets", "Number of get calls", gets.clone());
context.register("has", "Number of has calls", has.clone());
context.register("syncs", "Number of sync calls", syncs.clone());
context.register("pruned", "Number of pruned blobs", pruned.clone());
Ok(Self {
context,
config,
blobs,
intervals,
pending: AsyncMutex::new(BTreeSet::new()),
puts,
gets,
has,
syncs,
pruned,
_phantom: PhantomData,
})
}
/// Store `value` at `index`, creating the backing blob for its section on demand.
///
/// The write is buffered; call [`Self::sync`] to make it durable. The section
/// is marked pending so `sync` knows which blobs to flush, and the index is
/// added to the in-memory interval set immediately.
pub async fn put(&mut self, index: u64, value: V) -> Result<(), Error> {
    self.puts.inc();
    let items_per_blob = self.config.items_per_blob.get();
    let section = index / items_per_blob;
    // Lazily create the section blob on first write. Blob names are the
    // big-endian section number, matching what `init_with_bits` parses.
    // NOTE: fixed mojibake here — `§ion` was an HTML-entity corruption of
    // `&section`.
    if let Entry::Vacant(entry) = self.blobs.entry(section) {
        let (blob, len) = self
            .context
            .open(&self.config.partition, &section.to_be_bytes())
            .await?;
        entry.insert(Write::from_pooler(
            &self.context,
            blob,
            len,
            self.config.write_buffer,
        ));
        debug!(section, "created blob");
    }
    // Write the CRC-protected record at its slot within the section blob.
    let blob = self.blobs.get(&section).unwrap();
    let offset = (index % items_per_blob) * Record::<V>::SIZE as u64;
    let record = Record::new(value);
    blob.write_at(offset, record.encode_mut()).await?;
    // Track the dirty section and expose the index to readers.
    self.pending.lock().await.insert(section);
    self.intervals.insert(index);
    Ok(())
}
/// Fetch the value at `index`, returning `None` if it was never written (or
/// was pruned).
///
/// # Errors
///
/// Returns [`Error::InvalidRecord`] if the on-disk bytes fail their CRC check.
pub async fn get(&self, index: u64) -> Result<Option<V>, Error> {
    self.gets.inc();
    // The interval map is authoritative for presence; skip disk on a miss.
    if self.intervals.get(&index).is_none() {
        return Ok(None);
    }
    let items_per_blob = self.config.items_per_blob.get();
    let section = index / items_per_blob;
    // Invariant: intervals only contains indices whose section blob exists.
    // NOTE: fixed mojibake here — `§ion` was an HTML-entity corruption of
    // `&section`.
    let blob = self.blobs.get(&section).unwrap();
    let offset = (index % items_per_blob) * Record::<V>::SIZE as u64;
    let mut read_buf = blob.read_at(offset, Record::<V>::SIZE).await?;
    let record = Record::<V>::read(&mut read_buf)?;
    if record.is_valid() {
        Ok(Some(record.value))
    } else {
        Err(Error::InvalidRecord(index))
    }
}
/// Returns whether a record is present at `index` (in-memory check only; the
/// record's bytes are not re-validated). Note the `has` counter field shares
/// this method's name.
pub fn has(&self, index: u64) -> bool {
self.has.inc();
self.intervals.get(&index).is_some()
}
/// Delegates gap discovery to the interval map; see `RMap::next_gap` for the
/// exact semantics of the returned pair.
pub fn next_gap(&self, index: u64) -> (Option<u64>, Option<u64>) {
self.intervals.next_gap(index)
}
/// Iterate over all populated index ranges as owned `(start, end)` pairs,
/// in the order the interval map yields them.
pub fn ranges(&self) -> impl Iterator<Item = (u64, u64)> + '_ {
    self.intervals.iter().map(|(&start, &end)| (start, end))
}
/// Like [`Self::ranges`], but starting from the position `from` as defined by
/// `RMap::iter_from`.
pub fn ranges_from(&self, from: u64) -> impl Iterator<Item = (u64, u64)> + '_ {
    self.intervals.iter_from(from).map(|(&start, &end)| (start, end))
}
/// The smallest populated index, if any (delegates to `RMap::first_index`).
pub fn first_index(&self) -> Option<u64> {
self.intervals.first_index()
}
/// The largest populated index, if any (delegates to `RMap::last_index`).
pub fn last_index(&self) -> Option<u64> {
self.intervals.last_index()
}
/// Collect up to `max` absent indices beginning at `start` (delegates to
/// `RMap::missing_items`; see that method for exact semantics).
pub fn missing_items(&self, start: u64, max: usize) -> Vec<u64> {
self.intervals.missing_items(start, max)
}
pub async fn prune(&mut self, min: u64) -> Result<(), Error> {
let items_per_blob = self.config.items_per_blob.get();
let min_section = min / items_per_blob;
let sections_to_remove: Vec<u64> = self
.blobs
.keys()
.filter(|&§ion| section < min_section)
.copied()
.collect();
for section in sections_to_remove {
if let Some(blob) = self.blobs.remove(§ion) {
drop(blob);
self.context
.remove(&self.config.partition, Some(§ion.to_be_bytes()))
.await?;
let start_index = section * items_per_blob;
let end_index = (section + 1) * items_per_blob - 1;
self.intervals.remove(start_index, end_index);
debug!(section, start_index, end_index, "pruned blob");
}
self.pruned.inc();
}
self.pending
.lock()
.await
.retain(|§ion| section >= min_section);
Ok(())
}
/// Flush every dirty section blob to stable storage, all concurrently.
pub async fn sync(&self) -> Result<(), Error> {
    self.syncs.inc();
    let mut dirty = self.pending.lock().await;
    if dirty.is_empty() {
        return Ok(());
    }
    // One sync future per dirty section, awaited together.
    let flushes: Vec<_> = dirty
        .iter()
        .map(|section| self.blobs.get(section).unwrap().sync())
        .collect();
    try_join_all(flushes).await?;
    // Clear the dirty set only after every flush succeeded.
    dirty.clear();
    Ok(())
}
pub async fn destroy(self) -> Result<(), Error> {
for (i, blob) in self.blobs.into_iter() {
drop(blob);
self.context
.remove(&self.config.partition, Some(&i.to_be_bytes()))
.await?;
debug!(section = i, "destroyed blob");
}
match self.context.remove(&self.config.partition, None).await {
Ok(()) => {}
Err(RError::PartitionMissing(_)) => {
}
Err(err) => return Err(Error::Runtime(err)),
}
Ok(())
}
}
// Hook the store into the generic persistence lifecycle by delegating to the
// inherent methods; `commit` and `sync` are intentionally the same operation
// here since all durable state lives in the section blobs.
impl<E: BufferPooler + Context, V: CodecFixedShared> Persistable for Ordinal<E, V> {
type Error = Error;
async fn commit(&self) -> Result<(), Self::Error> {
self.sync().await
}
async fn sync(&self) -> Result<(), Self::Error> {
self.sync().await
}
async fn destroy(self) -> Result<(), Self::Error> {
self.destroy().await
}
}
#[cfg(all(test, feature = "arbitrary"))]
mod conformance {
use super::*;
use commonware_codec::conformance::CodecConformance;
commonware_conformance::conformance_tests! {
CodecConformance<Record<u32>>
}
}
#[cfg(test)]
mod tests {
use super::*;
use commonware_runtime::deterministic::Context;
type TestOrdinal = Ordinal<Context, u64>;
// Compile-time-only assertions that the store's futures are `Send` (needed to
// spawn them on multi-threaded executors). These functions are never called.
fn is_send<T: Send>(_: T) {}
#[allow(dead_code)]
fn assert_ordinal_futures_are_send(ordinal: &mut TestOrdinal, key: u64) {
is_send(ordinal.get(key));
is_send(ordinal.put(key, 0u64));
}
#[allow(dead_code)]
fn assert_ordinal_destroy_is_send(ordinal: TestOrdinal) {
is_send(ordinal.destroy());
}
}