use super::manager::{Config as ManagerConfig, Manager, WriteFactory};
use crate::journal::Error;
use commonware_codec::{Codec, CodecShared, FixedSize};
use commonware_cryptography::{crc32, Crc32};
use commonware_runtime::{BufMut, BufferPooler, Error as RError, Metrics, Storage};
use std::{io::Cursor, num::NonZeroUsize};
use zstd::{bulk::compress, decode_all};
/// Configuration for a [`Glob`].
#[derive(Clone, Debug)]
pub struct Config<C> {
    /// Name of the storage partition handed to the underlying section manager.
    pub partition: String,

    /// zstd compression level applied to every appended value, or `None` to store the raw
    /// encoding.
    ///
    /// Reads decide whether to decompress based on this setting alone (nothing in an entry
    /// records it), so a glob must be reopened with the same value it was written with.
    pub compression: Option<u8>,

    /// Configuration passed to the value codec when decoding entries.
    pub codec_config: C,

    /// Capacity given to the [`WriteFactory`] that builds the section writers.
    pub write_buffer: NonZeroUsize,
}
/// Stores variable-sized values in numbered sections.
///
/// Every value is written as an opaque entry (zstd-compressed when configured) followed by a
/// big-endian CRC32 of that entry. Callers keep the `(offset, size)` pair returned on append
/// and pass it back to read the value.
pub struct Glob<E, V>
where
    E: BufferPooler + Storage + Metrics,
    V: Codec,
{
    /// Tracks the per-section blobs and hands out their writers.
    manager: Manager<E, WriteFactory>,
    /// zstd level for new entries; `None` stores entries uncompressed.
    compression: Option<u8>,
    /// Codec configuration used when decoding entries.
    codec_config: V::Cfg,
}
impl<E: BufferPooler + Storage + Metrics, V: CodecShared> Glob<E, V> {
pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
let manager_cfg = ManagerConfig {
partition: cfg.partition,
factory: WriteFactory {
capacity: cfg.write_buffer,
pool: context.storage_buffer_pool().clone(),
},
};
let manager = Manager::init(context, manager_cfg).await?;
Ok(Self {
manager,
compression: cfg.compression,
codec_config: cfg.codec_config,
})
}
pub async fn append(&mut self, section: u64, value: &V) -> Result<(u64, u32), Error> {
let buf = if let Some(level) = self.compression {
let encoded = value.encode();
let mut compressed =
compress(&encoded, level as i32).map_err(|_| Error::CompressionFailed)?;
let checksum = Crc32::checksum(&compressed);
compressed.put_u32(checksum);
compressed
} else {
let entry_size = value.encode_size() + crc32::Digest::SIZE;
let mut buf = Vec::with_capacity(entry_size);
value.write(&mut buf);
let checksum = Crc32::checksum(&buf);
buf.put_u32(checksum);
buf
};
let entry_size = u32::try_from(buf.len()).map_err(|_| Error::ValueTooLarge)?;
let writer = self.manager.get_or_create(section).await?;
let offset = writer.size().await;
writer.write_at(offset, buf).await.map_err(Error::Runtime)?;
Ok((offset, entry_size))
}
pub async fn get(&self, section: u64, offset: u64, size: u32) -> Result<V, Error> {
let writer = self
.manager
.get(section)?
.ok_or(Error::SectionOutOfRange(section))?;
let buf = writer.read_at(offset, size as usize).await?.coalesce();
if buf.len() < crc32::Digest::SIZE {
return Err(Error::Runtime(RError::BlobInsufficientLength));
}
let data_len = buf.len() - crc32::Digest::SIZE;
let compressed_data = &buf.as_ref()[..data_len];
let stored_checksum = u32::from_be_bytes(
buf.as_ref()[data_len..]
.try_into()
.expect("checksum is 4 bytes"),
);
let checksum = Crc32::checksum(compressed_data);
if checksum != stored_checksum {
return Err(Error::ChecksumMismatch(stored_checksum, checksum));
}
let value = if self.compression.is_some() {
let decompressed =
decode_all(Cursor::new(compressed_data)).map_err(|_| Error::DecompressionFailed)?;
V::decode_cfg(decompressed.as_ref(), &self.codec_config).map_err(Error::Codec)?
} else {
V::decode_cfg(compressed_data, &self.codec_config).map_err(Error::Codec)?
};
Ok(value)
}
pub async fn sync(&self, section: u64) -> Result<(), Error> {
self.manager.sync(section).await
}
pub async fn sync_all(&self) -> Result<(), Error> {
self.manager.sync_all().await
}
pub async fn size(&self, section: u64) -> Result<u64, Error> {
self.manager.size(section).await
}
pub async fn rewind(&mut self, section: u64, size: u64) -> Result<(), Error> {
self.manager.rewind(section, size).await
}
pub async fn rewind_section(&mut self, section: u64, size: u64) -> Result<(), Error> {
self.manager.rewind_section(section, size).await
}
pub async fn prune(&mut self, min: u64) -> Result<bool, Error> {
self.manager.prune(min).await
}
pub fn oldest_section(&self) -> Option<u64> {
self.manager.oldest_section()
}
pub fn newest_section(&self) -> Option<u64> {
self.manager.newest_section()
}
pub fn sections(&self) -> impl Iterator<Item = u64> + '_ {
self.manager.sections()
}
pub async fn remove_section(&mut self, section: u64) -> Result<bool, Error> {
self.manager.remove_section(section).await
}
pub async fn close(&mut self) -> Result<(), Error> {
self.sync_all().await
}
pub async fn destroy(self) -> Result<(), Error> {
self.manager.destroy().await
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use commonware_macros::test_traced;
    use commonware_runtime::{deterministic, Metrics, Runner};
    use commonware_utils::NZUsize;

    /// Uncompressed configuration shared by most tests.
    fn test_cfg() -> Config<()> {
        Config {
            partition: "test-partition".into(),
            compression: None,
            codec_config: (),
            write_buffer: NZUsize!(1024),
        }
    }

    #[test_traced]
    fn test_glob_append_and_get() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut glob: Glob<_, i32> = Glob::init(context.clone(), test_cfg())
                .await
                .expect("Failed to init glob");
            let value: i32 = 42;
            let (offset, size) = glob.append(1, &value).await.expect("Failed to append");
            assert_eq!(offset, 0);
            let retrieved = glob.get(1, offset, size).await.expect("Failed to get");
            assert_eq!(retrieved, value);
            glob.sync(1).await.expect("Failed to sync");
            let retrieved = glob.get(1, offset, size).await.expect("Failed to get");
            assert_eq!(retrieved, value);
            glob.destroy().await.expect("Failed to destroy");
        });
    }

    #[test_traced]
    fn test_glob_multiple_values() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut glob: Glob<_, i32> = Glob::init(context.clone(), test_cfg())
                .await
                .expect("Failed to init glob");
            let values: Vec<i32> = vec![1, 2, 3, 4, 5];
            let mut locations = Vec::new();
            for value in &values {
                let (offset, size) = glob.append(1, value).await.expect("Failed to append");
                locations.push((offset, size));
            }
            for (i, (offset, size)) in locations.iter().enumerate() {
                let retrieved = glob.get(1, *offset, *size).await.expect("Failed to get");
                assert_eq!(retrieved, values[i]);
            }
            glob.destroy().await.expect("Failed to destroy");
        });
    }

    #[test_traced]
    fn test_glob_with_compression() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test-partition".into(),
                compression: Some(3),
                codec_config: (),
                write_buffer: NZUsize!(1024),
            };
            let mut glob: Glob<_, [u8; 100]> = Glob::init(context.clone(), cfg)
                .await
                .expect("Failed to init glob");
            let value: [u8; 100] = [0u8; 100];
            let (offset, size) = glob.append(1, &value).await.expect("Failed to append");
            // A run of zeros must compress below its raw encoding plus the trailing checksum.
            assert!((size as usize) < 100 + crc32::Digest::SIZE);
            let retrieved = glob.get(1, offset, size).await.expect("Failed to get");
            assert_eq!(retrieved, value);
            glob.destroy().await.expect("Failed to destroy");
        });
    }

    #[test_traced]
    fn test_glob_prune() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut glob: Glob<_, i32> = Glob::init(context.clone(), test_cfg())
                .await
                .expect("Failed to init glob");
            for section in 1..=5 {
                glob.append(section, &(section as i32))
                    .await
                    .expect("Failed to append");
                glob.sync(section).await.expect("Failed to sync");
            }
            glob.prune(3).await.expect("Failed to prune");
            assert!(glob.get(1, 0, 8).await.is_err());
            assert!(glob.get(2, 0, 8).await.is_err());
            assert!(glob.manager.blobs.contains_key(&3));
            assert!(glob.manager.blobs.contains_key(&4));
            assert!(glob.manager.blobs.contains_key(&5));
            glob.destroy().await.expect("Failed to destroy");
        });
    }

    #[test_traced]
    fn test_glob_checksum_mismatch() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut glob: Glob<_, i32> = Glob::init(context.clone(), test_cfg())
                .await
                .expect("Failed to init glob");
            let value: i32 = 42;
            let (offset, size) = glob.append(1, &value).await.expect("Failed to append");
            glob.sync(1).await.expect("Failed to sync");
            // Overwrite the encoded value (not the checksum) so verification must fail.
            let writer = glob.manager.blobs.get(&1).unwrap();
            writer
                .write_at(offset, vec![0xFF, 0xFF, 0xFF, 0xFF])
                .await
                .expect("Failed to corrupt");
            writer.sync().await.expect("Failed to sync");
            let result = glob.get(1, offset, size).await;
            assert!(matches!(result, Err(Error::ChecksumMismatch(_, _))));
            glob.destroy().await.expect("Failed to destroy");
        });
    }

    #[test_traced]
    fn test_glob_rewind() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut glob: Glob<_, i32> = Glob::init(context.clone(), test_cfg())
                .await
                .expect("Failed to init glob");
            let values: Vec<i32> = vec![1, 2, 3, 4, 5];
            let mut locations = Vec::new();
            for value in &values {
                let (offset, size) = glob.append(1, value).await.expect("Failed to append");
                locations.push((offset, size));
            }
            glob.sync(1).await.expect("Failed to sync");
            // Keep exactly the first three entries.
            let (third_offset, third_size) = locations[2];
            let rewind_size = third_offset + u64::from(third_size);
            glob.rewind_section(1, rewind_size)
                .await
                .expect("Failed to rewind");
            for (i, (offset, size)) in locations.iter().take(3).enumerate() {
                let retrieved = glob.get(1, *offset, *size).await.expect("Failed to get");
                assert_eq!(retrieved, values[i]);
            }
            let (fourth_offset, fourth_size) = locations[3];
            let result = glob.get(1, fourth_offset, fourth_size).await;
            assert!(result.is_err());
            glob.destroy().await.expect("Failed to destroy");
        });
    }

    #[test_traced]
    fn test_glob_persistence() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg();
            let mut glob: Glob<_, i32> = Glob::init(context.with_label("first"), cfg.clone())
                .await
                .expect("Failed to init glob");
            let value: i32 = 42;
            let (offset, size) = glob.append(1, &value).await.expect("Failed to append");
            glob.sync(1).await.expect("Failed to sync");
            drop(glob);
            let glob: Glob<_, i32> = Glob::init(context.with_label("second"), cfg)
                .await
                .expect("Failed to reinit glob");
            let retrieved = glob.get(1, offset, size).await.expect("Failed to get");
            assert_eq!(retrieved, value);
            glob.destroy().await.expect("Failed to destroy");
        });
    }

    #[test_traced]
    fn test_glob_get_invalid_size() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut glob: Glob<_, i32> = Glob::init(context.clone(), test_cfg())
                .await
                .expect("Failed to init glob");
            let (offset, _size) = glob.append(1, &42).await.expect("Failed to append");
            glob.sync(1).await.expect("Failed to sync");
            assert!(glob.get(1, offset, 0).await.is_err());
            // Any size smaller than the checksum cannot even hold the trailer.
            for size in 1..4u32 {
                let result = glob.get(1, offset, size).await;
                assert!(matches!(
                    result,
                    Err(Error::Runtime(RError::BlobInsufficientLength))
                ));
            }
            glob.destroy().await.expect("Failed to destroy");
        });
    }

    #[test_traced]
    fn test_glob_get_wrong_size() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut glob: Glob<_, i32> = Glob::init(context.clone(), test_cfg())
                .await
                .expect("Failed to init glob");
            let (offset, correct_size) = glob.append(1, &42).await.expect("Failed to append");
            glob.sync(1).await.expect("Failed to sync");
            // A short read shifts the checksum window, so verification must fail.
            let result = glob.get(1, offset, correct_size - 1).await;
            assert!(matches!(result, Err(Error::ChecksumMismatch(_, _))));
            glob.destroy().await.expect("Failed to destroy");
        });
    }
}