use crate::journal::Error;
use commonware_runtime::{
buffer::{
paged::{Append, CacheRef},
Write,
},
telemetry::metrics::status::GaugeExt,
Blob, BufferPool, Error as RError, Metrics, Storage,
};
use commonware_utils::hex;
use futures::future::try_join_all;
use prometheus_client::metrics::{counter::Counter, gauge::Gauge};
use std::{collections::BTreeMap, future::Future, mem::take, num::NonZeroUsize};
use tracing::debug;
/// Minimal async interface a journal section's write buffer must expose:
/// size queries, durable flushes, and truncation.
pub trait SectionBuffer: Clone + Send + Sync {
    /// Current logical size of the buffered blob, in bytes.
    fn size(&self) -> impl Future<Output = u64> + Send;
    /// Flushes all buffered writes to underlying storage.
    fn sync(&self) -> impl Future<Output = Result<(), RError>> + Send;
    /// Resizes the buffered blob to exactly `len` bytes.
    fn resize(&self, len: u64) -> impl Future<Output = Result<(), RError>> + Send;
}
/// [`SectionBuffer`] backed by the paged append buffer.
impl<B: Blob> SectionBuffer for Append<B> {
    async fn size(&self) -> u64 {
        // `Self::` path selects the inherent `Append` method (inherent
        // methods take precedence), avoiding recursion into this trait impl.
        Self::size(self).await
    }
    async fn sync(&self) -> Result<(), RError> {
        // Delegates to the inherent `Append::sync`.
        Self::sync(self).await
    }
    async fn resize(&self, len: u64) -> Result<(), RError> {
        // Delegates to the inherent `Append::resize`.
        Self::resize(self, len).await
    }
}
/// [`SectionBuffer`] backed by the plain write buffer.
impl<B: Blob> SectionBuffer for Write<B> {
    async fn size(&self) -> u64 {
        // `Self::` path selects the inherent `Write` method (inherent
        // methods take precedence), avoiding recursion into this trait impl.
        Self::size(self).await
    }
    async fn sync(&self) -> Result<(), RError> {
        // Delegates to the inherent `Write::sync`.
        Self::sync(self).await
    }
    async fn resize(&self, len: u64) -> Result<(), RError> {
        // Delegates to the inherent `Write::resize`.
        Self::resize(self, len).await
    }
}
/// Produces a [`SectionBuffer`] wrapping a freshly opened blob.
///
/// Abstracts over the concrete buffering strategy (e.g. `Append` vs `Write`)
/// so [`Manager`] can stay agnostic of it.
pub trait BufferFactory<B: Blob>: Clone + Send + Sync {
    /// The buffer type this factory constructs.
    type Buffer: SectionBuffer;
    /// Wraps `blob` (whose current size is `size`) in a new buffer.
    fn create(
        &self,
        blob: B,
        size: u64,
    ) -> impl Future<Output = Result<Self::Buffer, RError>> + Send;
}
/// Factory producing [`Append`] buffers that share a page cache.
#[derive(Clone)]
pub struct AppendFactory {
    // Size of the write buffer handed to each `Append`.
    pub write_buffer: NonZeroUsize,
    // Shared page-cache handle cloned into every created buffer.
    pub page_cache_ref: CacheRef,
}
/// Builds [`Append`] buffers from this factory's write-buffer size and
/// shared page cache.
impl<B: Blob> BufferFactory<B> for AppendFactory {
    type Buffer = Append<B>;
    async fn create(&self, blob: B, size: u64) -> Result<Self::Buffer, RError> {
        // Pull the configuration out of the factory before construction.
        let write_buffer = self.write_buffer.get();
        let page_cache = self.page_cache_ref.clone();
        Append::new(blob, size, write_buffer, page_cache).await
    }
}
/// Factory producing [`Write`] buffers drawn from a shared buffer pool.
#[derive(Clone)]
pub struct WriteFactory {
    // Capacity of each created write buffer.
    pub capacity: NonZeroUsize,
    // Shared pool cloned into every created buffer.
    pub pool: BufferPool,
}
/// Builds [`Write`] buffers from this factory's capacity and shared pool.
impl<B: Blob> BufferFactory<B> for WriteFactory {
    type Buffer = Write<B>;
    async fn create(&self, blob: B, size: u64) -> Result<Self::Buffer, RError> {
        // `Write::new` is infallible; wrap the constructed buffer in `Ok`.
        let buffer = Write::new(blob, size, self.capacity, self.pool.clone());
        Ok(buffer)
    }
}
/// Configuration for [`Manager::init`].
#[derive(Clone)]
pub struct Config<F> {
    // Storage partition holding one blob per section.
    pub partition: String,
    // Factory used to wrap each opened blob in a buffer.
    pub factory: F,
}
/// Manages the set of section blobs in a single storage partition: opening,
/// creating, syncing, pruning, rewinding, and destroying them.
pub struct Manager<E: Storage + Metrics, F: BufferFactory<E::Blob>> {
    // Runtime handle used for storage operations and metric registration.
    context: E,
    // Partition containing one blob per section (named by the section number).
    partition: String,
    // Factory that wraps each opened blob in a buffer.
    factory: F,
    // Open buffers keyed by section number (BTreeMap keeps sections ordered).
    pub(crate) blobs: BTreeMap<u64, F::Buffer>,
    // Watermark: sections below this have been pruned and may not be touched.
    oldest_retained_section: u64,
    // Number of blobs currently tracked.
    tracked: Gauge,
    // Number of sync operations performed.
    synced: Counter,
    // Number of blobs removed by pruning.
    pruned: Counter,
}
impl<E: Storage + Metrics, F: BufferFactory<E::Blob>> Manager<E, F> {
pub async fn init(context: E, cfg: Config<F>) -> Result<Self, Error> {
let mut blobs = BTreeMap::new();
let stored_blobs = match context.scan(&cfg.partition).await {
Ok(blobs) => blobs,
Err(RError::PartitionMissing(_)) => Vec::new(),
Err(err) => return Err(Error::Runtime(err)),
};
for name in stored_blobs {
let (blob, size) = context.open(&cfg.partition, &name).await?;
let hex_name = hex(&name);
let section = match name.try_into() {
Ok(section) => u64::from_be_bytes(section),
Err(_) => return Err(Error::InvalidBlobName(hex_name)),
};
debug!(section, blob = hex_name, size, "loaded section");
let buffer = cfg.factory.create(blob, size).await?;
blobs.insert(section, buffer);
}
let tracked = Gauge::default();
let synced = Counter::default();
let pruned = Counter::default();
context.register("tracked", "Number of blobs", tracked.clone());
context.register("synced", "Number of syncs", synced.clone());
context.register("pruned", "Number of blobs pruned", pruned.clone());
let _ = tracked.try_set(blobs.len());
Ok(Self {
context,
partition: cfg.partition,
factory: cfg.factory,
blobs,
oldest_retained_section: 0,
tracked,
synced,
pruned,
})
}
pub const fn prune_guard(&self, section: u64) -> Result<(), Error> {
if section < self.oldest_retained_section {
Err(Error::AlreadyPrunedToSection(self.oldest_retained_section))
} else {
Ok(())
}
}
pub fn get(&self, section: u64) -> Result<Option<&F::Buffer>, Error> {
self.prune_guard(section)?;
Ok(self.blobs.get(§ion))
}
pub async fn get_or_create(&mut self, section: u64) -> Result<&mut F::Buffer, Error> {
self.prune_guard(section)?;
if !self.blobs.contains_key(§ion) {
let name = section.to_be_bytes();
let (blob, size) = self.context.open(&self.partition, &name).await?;
let buffer = self.factory.create(blob, size).await?;
self.tracked.inc();
self.blobs.insert(section, buffer);
}
Ok(self.blobs.get_mut(§ion).unwrap())
}
pub async fn sync(&self, section: u64) -> Result<(), Error> {
self.prune_guard(section)?;
if let Some(blob) = self.blobs.get(§ion) {
self.synced.inc();
blob.sync().await.map_err(Error::Runtime)?;
}
Ok(())
}
pub async fn sync_all(&self) -> Result<(), Error> {
let futures: Vec<_> = self.blobs.values().map(|blob| blob.sync()).collect();
let results = try_join_all(futures).await.map_err(Error::Runtime)?;
self.synced.inc_by(results.len() as u64);
Ok(())
}
pub async fn prune(&mut self, min: u64) -> Result<bool, Error> {
let mut pruned = false;
while let Some((§ion, _)) = self.blobs.first_key_value() {
if section >= min {
break;
}
let blob = self.blobs.remove(§ion).unwrap();
let size = blob.size().await;
drop(blob);
self.context
.remove(&self.partition, Some(§ion.to_be_bytes()))
.await?;
pruned = true;
debug!(section, size, "pruned blob");
self.tracked.dec();
self.pruned.inc();
}
if pruned {
self.oldest_retained_section = min;
}
Ok(pruned)
}
pub fn oldest_section(&self) -> Option<u64> {
self.blobs.first_key_value().map(|(&s, _)| s)
}
pub fn newest_section(&self) -> Option<u64> {
self.blobs.last_key_value().map(|(&s, _)| s)
}
pub fn is_empty(&self) -> bool {
self.blobs.is_empty()
}
pub fn num_sections(&self) -> usize {
self.blobs.len()
}
pub fn sections_from(&self, start_section: u64) -> impl Iterator<Item = (&u64, &F::Buffer)> {
self.blobs.range(start_section..)
}
pub fn sections(&self) -> impl Iterator<Item = u64> + '_ {
self.blobs.keys().copied()
}
pub async fn remove_section(&mut self, section: u64) -> Result<bool, Error> {
self.prune_guard(section)?;
if let Some(blob) = self.blobs.remove(§ion) {
let size = blob.size().await;
drop(blob);
self.context
.remove(&self.partition, Some(§ion.to_be_bytes()))
.await?;
self.tracked.dec();
debug!(section, size, "removed section");
Ok(true)
} else {
Ok(false)
}
}
pub async fn destroy(self) -> Result<(), Error> {
for (section, blob) in self.blobs.into_iter() {
let size = blob.size().await;
drop(blob);
debug!(section, size, "destroyed blob");
self.context
.remove(&self.partition, Some(§ion.to_be_bytes()))
.await?;
}
match self.context.remove(&self.partition, None).await {
Ok(()) => {}
Err(RError::PartitionMissing(_)) => {}
Err(err) => return Err(Error::Runtime(err)),
}
Ok(())
}
pub async fn clear(&mut self) -> Result<(), Error> {
let blobs = take(&mut self.blobs);
for (section, blob) in blobs {
let size = blob.size().await;
drop(blob);
debug!(section, size, "cleared blob");
self.context
.remove(&self.partition, Some(§ion.to_be_bytes()))
.await?;
}
let _ = self.tracked.try_set(0);
self.oldest_retained_section = 0;
Ok(())
}
pub async fn rewind(&mut self, section: u64, size: u64) -> Result<(), Error> {
self.prune_guard(section)?;
let sections_to_remove: Vec<u64> = self
.blobs
.range((section + 1)..)
.rev()
.map(|(&s, _)| s)
.collect();
for s in sections_to_remove {
let blob = self.blobs.remove(&s).unwrap();
drop(blob);
self.context
.remove(&self.partition, Some(&s.to_be_bytes()))
.await?;
self.tracked.dec();
debug!(section = s, "removed blob during rewind");
}
if let Some(blob) = self.blobs.get(§ion) {
let current_size = blob.size().await;
if size < current_size {
blob.resize(size).await?;
debug!(
section,
old_size = current_size,
new_size = size,
"rewound blob"
);
}
}
Ok(())
}
pub async fn rewind_section(&mut self, section: u64, size: u64) -> Result<(), Error> {
self.prune_guard(section)?;
if let Some(blob) = self.blobs.get(§ion) {
let current = blob.size().await;
if size < current {
blob.resize(size).await?;
debug!(section, from = current, to = size, "rewound section");
}
}
Ok(())
}
pub async fn size(&self, section: u64) -> Result<u64, Error> {
self.prune_guard(section)?;
match self.blobs.get(§ion) {
Some(blob) => Ok(blob.size().await),
None => Ok(0),
}
}
}