use super::{Config, Error, Identifier};
use crate::{
journal::segmented::oversized::{
Config as OversizedConfig, Oversized, Record as OversizedRecord,
},
Context,
};
use commonware_codec::{CodecShared, Encode, FixedSize, Read, ReadExt, Write as CodecWrite};
use commonware_cryptography::{crc32, Crc32, Hasher};
use commonware_runtime::{buffer, Blob, Buf, BufMut, BufferPooler, IoBuf};
use commonware_utils::{Array, Span};
use futures::future::{try_join, try_join_all};
use prometheus_client::metrics::counter::Counter;
use std::{cmp::Ordering, collections::BTreeSet, num::NonZeroUsize, ops::Deref};
use tracing::debug;
/// Percentage of table slots that must be "resizable" (their `added` counter
/// reached the configured resize frequency) before the table is doubled; used
/// as `table_size * RESIZE_THRESHOLD / 100` when computing the threshold.
const RESIZE_THRESHOLD: u64 = 50;
#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
#[repr(transparent)]
pub struct Cursor([u8; u64::SIZE + u64::SIZE + u32::SIZE]);
impl Cursor {
fn new(section: u64, offset: u64, size: u32) -> Self {
let mut buf = [0u8; u64::SIZE + u64::SIZE + u32::SIZE];
buf[..u64::SIZE].copy_from_slice(§ion.to_be_bytes());
buf[u64::SIZE..u64::SIZE + u64::SIZE].copy_from_slice(&offset.to_be_bytes());
buf[u64::SIZE + u64::SIZE..].copy_from_slice(&size.to_be_bytes());
Self(buf)
}
fn section(&self) -> u64 {
u64::from_be_bytes(self.0[..u64::SIZE].try_into().unwrap())
}
fn offset(&self) -> u64 {
u64::from_be_bytes(self.0[u64::SIZE..u64::SIZE + u64::SIZE].try_into().unwrap())
}
fn size(&self) -> u32 {
u32::from_be_bytes(self.0[u64::SIZE + u64::SIZE..].try_into().unwrap())
}
}
// Codec and marker-trait plumbing so `Cursor` can be used wherever a
// fixed-size `Array`/`Span` identifier is expected.

impl Read for Cursor {
    type Cfg = ();

    /// Decodes a cursor by reading its raw byte array from `buf`.
    fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
        <[u8; u64::SIZE + u64::SIZE + u32::SIZE]>::read(buf).map(Self)
    }
}

impl CodecWrite for Cursor {
    /// Encodes the cursor as its raw byte array.
    fn write(&self, buf: &mut impl BufMut) {
        self.0.write(buf);
    }
}

impl FixedSize for Cursor {
    /// 8 (section) + 8 (offset) + 4 (size) bytes.
    const SIZE: usize = u64::SIZE + u64::SIZE + u32::SIZE;
}

impl Span for Cursor {}

impl Array for Cursor {}
// Byte-slice views of the packed representation.

impl Deref for Cursor {
    type Target = [u8];

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl AsRef<[u8]> for Cursor {
    fn as_ref(&self) -> &[u8] {
        &self.0
    }
}
impl std::fmt::Debug for Cursor {
    /// Renders the unpacked fields. Delegates to `Display`, which uses the
    /// identical `Cursor(section=.., offset=.., size=..)` format — the
    /// original duplicated the format string verbatim in both impls.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        std::fmt::Display::fmt(self, f)
    }
}
impl std::fmt::Display for Cursor {
    /// Human-readable rendering of the unpacked `(section, offset, size)`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "Cursor(section={}, offset={}, size={})",
            self.section(),
            self.offset(),
            self.size()
        )
    }
}
/// Durable state returned by `sync`/`close` and accepted by
/// `init_with_checkpoint` to restore the freezer to a consistent point:
/// any table entries newer than `epoch` are discarded on recovery and the
/// oversized journal is rewound to (`section`, `oversized_size`).
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Copy)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
pub struct Checkpoint {
    epoch: u64,
    section: u64,
    oversized_size: u64,
    table_size: u32,
}

impl Checkpoint {
    /// Checkpoint for a freshly initialized (empty) freezer with the given
    /// table size.
    const fn init(table_size: u32) -> Self {
        Self {
            table_size,
            epoch: 0,
            section: 0,
            oversized_size: 0,
        }
    }
}
impl Read for Checkpoint {
    type Cfg = ();

    /// Decodes the four checkpoint fields in declaration order.
    fn read_cfg(buf: &mut impl Buf, _: &()) -> Result<Self, commonware_codec::Error> {
        // Struct-expression fields are evaluated top-to-bottom, so the reads
        // happen in exactly the same order as sequential `let` bindings would.
        Ok(Self {
            epoch: u64::read(buf)?,
            section: u64::read(buf)?,
            oversized_size: u64::read(buf)?,
            table_size: u32::read(buf)?,
        })
    }
}
impl CodecWrite for Checkpoint {
    /// Encodes fields in declaration order (mirrors `read_cfg`).
    fn write(&self, buf: &mut impl BufMut) {
        self.epoch.write(buf);
        self.section.write(buf);
        self.oversized_size.write(buf);
        self.table_size.write(buf);
    }
}

impl FixedSize for Checkpoint {
    /// 8 (epoch) + 8 (section) + 8 (oversized_size) + 4 (table_size) bytes.
    const SIZE: usize = u64::SIZE + u64::SIZE + u64::SIZE + u32::SIZE;
}
/// Name of the blob (within the table partition) holding the hash table.
const TABLE_BLOB_NAME: &[u8] = b"table";
/// One table slot: points at the head of a bucket's record chain.
/// Each table index stores TWO entries side by side (see `Entry::FULL_SIZE`);
/// the one with the higher epoch is current, so a torn write never destroys
/// the last durable head.
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
struct Entry {
    // Epoch at which this entry was written (0 only in the all-zero "empty" state).
    epoch: u64,
    // Section of the oversized journal containing the head record.
    section: u64,
    // Position of the head record within that section.
    position: u64,
    // Number of records added to this bucket since the last resize reset.
    added: u8,
    // CRC over the other fields; detects torn/corrupt slot writes.
    crc: u32,
}
impl Entry {
    /// Size of one slot pair: two entries stored side by side per table index.
    const FULL_SIZE: usize = Self::SIZE * 2;

    /// CRC over the big-endian encoding of all non-CRC fields.
    fn compute_crc(epoch: u64, section: u64, position: u64, added: u8) -> u32 {
        let mut hasher = Crc32::new();
        hasher.update(&epoch.to_be_bytes());
        // Fix: original read `hasher.update(§ion.to_be_bytes());` — mojibake
        // of `&section` (HTML entity `&sect;` collapsed to `§`), not valid Rust.
        hasher.update(&section.to_be_bytes());
        hasher.update(&position.to_be_bytes());
        hasher.update(&added.to_be_bytes());
        hasher.finalize().as_u32()
    }

    /// Builds an entry with its CRC computed from the given fields.
    fn new(epoch: u64, section: u64, position: u64, added: u8) -> Self {
        Self {
            epoch,
            section,
            position,
            added,
            crc: Self::compute_crc(epoch, section, position, added),
        }
    }

    /// The all-zero entry used to mark an unused/cleared slot.
    const fn new_empty() -> Self {
        Self {
            epoch: 0,
            section: 0,
            position: 0,
            added: 0,
            crc: 0,
        }
    }

    /// True iff every field is zero (the cleared-slot sentinel).
    const fn is_empty(&self) -> bool {
        self.epoch == 0
            && self.section == 0
            && self.position == 0
            && self.added == 0
            && self.crc == 0
    }

    /// True iff the stored CRC matches the fields (entry not torn/corrupt).
    fn is_valid(&self) -> bool {
        Self::compute_crc(self.epoch, self.section, self.position, self.added) == self.crc
    }
}
impl FixedSize for Entry {
    /// 8 (epoch) + 8 (section) + 8 (position) + 1 (added) + CRC digest bytes.
    const SIZE: usize = u64::SIZE + u64::SIZE + u64::SIZE + u8::SIZE + crc32::Digest::SIZE;
}

impl CodecWrite for Entry {
    /// Encodes fields in declaration order (mirrors `read_cfg`).
    fn write(&self, buf: &mut impl BufMut) {
        self.epoch.write(buf);
        self.section.write(buf);
        self.position.write(buf);
        self.added.write(buf);
        self.crc.write(buf);
    }
}
impl Read for Entry {
    type Cfg = ();

    /// Decodes the five entry fields in declaration order.
    fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
        // Struct-expression fields are evaluated top-to-bottom, preserving the
        // original sequential read order.
        Ok(Self {
            epoch: u64::read(buf)?,
            section: u64::read(buf)?,
            position: u64::read(buf)?,
            added: u8::read(buf)?,
            crc: u32::read(buf)?,
        })
    }
}
// Sentinel pair stored in a `Record` when there is no next (older) record in
// the bucket chain; both must match for `Record::next` to return `None`.
const NO_NEXT_SECTION: u64 = u64::MAX;
const NO_NEXT_POSITION: u64 = u64::MAX;
/// A key record in the oversized journal's index: the key, a link to the next
/// (older) record in the same bucket chain, and the location of the value.
#[derive(Debug, Clone, PartialEq)]
struct Record<K: Array> {
    key: K,
    // Chain link; `NO_NEXT_SECTION`/`NO_NEXT_POSITION` mark the chain tail.
    next_section: u64,
    next_position: u64,
    // Location of the value within the section (see `OversizedRecord`).
    value_offset: u64,
    value_size: u32,
}
impl<K: Array> Record<K> {
    /// Builds a record, encoding `next = None` as the sentinel pair.
    fn new(key: K, next: Option<(u64, u64)>, value_offset: u64, value_size: u32) -> Self {
        let (next_section, next_position) = match next {
            Some(pair) => pair,
            None => (NO_NEXT_SECTION, NO_NEXT_POSITION),
        };
        Self {
            key,
            next_section,
            next_position,
            value_offset,
            value_size,
        }
    }

    /// Location of the next (older) record in the chain, or `None` at the tail.
    const fn next(&self) -> Option<(u64, u64)> {
        match (self.next_section, self.next_position) {
            (NO_NEXT_SECTION, NO_NEXT_POSITION) => None,
            (section, position) => Some((section, position)),
        }
    }
}
impl<K: Array> CodecWrite for Record<K> {
    /// Encodes fields in declaration order (mirrors `read_cfg`).
    fn write(&self, buf: &mut impl BufMut) {
        self.key.write(buf);
        self.next_section.write(buf);
        self.next_position.write(buf);
        self.value_offset.write(buf);
        self.value_size.write(buf);
    }
}
impl<K: Array> Read for Record<K> {
    type Cfg = ();

    /// Decodes key, chain link, then value location, in declaration order.
    fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
        // Struct-expression fields are evaluated top-to-bottom, preserving the
        // original sequential read order.
        Ok(Self {
            key: K::read(buf)?,
            next_section: u64::read(buf)?,
            next_position: u64::read(buf)?,
            value_offset: u64::read(buf)?,
            value_size: u32::read(buf)?,
        })
    }
}
impl<K: Array> FixedSize for Record<K> {
    /// Key bytes + chain link (8 + 8) + value location (8 + 4).
    const SIZE: usize = K::SIZE + u64::SIZE + u64::SIZE + u64::SIZE + u32::SIZE;
}

// Lets the oversized journal read and patch the value location in a record.
impl<K: Array> OversizedRecord for Record<K> {
    fn value_location(&self) -> (u64, u32) {
        (self.value_offset, self.value_size)
    }

    fn with_location(mut self, offset: u64, size: u32) -> Self {
        self.value_offset = offset;
        self.value_size = size;
        self
    }
}
// Manual `Arbitrary` impl (the derive cannot be used because of the `K: Array`
// bound); generates each field independently for conformance/fuzz tests.
#[cfg(feature = "arbitrary")]
impl<K: Array> arbitrary::Arbitrary<'_> for Record<K>
where
    K: for<'a> arbitrary::Arbitrary<'a>,
{
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        Ok(Self {
            key: K::arbitrary(u)?,
            next_section: u64::arbitrary(u)?,
            next_position: u64::arbitrary(u)?,
            value_offset: u64::arbitrary(u)?,
            value_size: u32::arbitrary(u)?,
        })
    }
}
/// An append-only key/value store: a fixed-size on-disk hash table of bucket
/// heads (doubled incrementally when enough buckets grow deep) pointing into
/// chains of `Record`s stored in an `Oversized` journal.
pub struct Freezer<E: BufferPooler + Context, K: Array, V: CodecShared> {
    // Runtime context used for blob I/O and metrics registration.
    context: E,
    // Storage partition containing the table blob.
    table_partition: String,
    // Current number of table slots (always a power of two).
    table_size: u32,
    // `table_size * RESIZE_THRESHOLD / 100`; resize when `resizable` reaches it.
    table_resize_threshold: u64,
    // Per-bucket `added` count at which a bucket counts toward `resizable`.
    table_resize_frequency: u8,
    // Number of table indices migrated per `advance_resize` call.
    table_resize_chunk_size: u32,
    // The table blob itself.
    table: E::Blob,
    // Key index + value journal.
    oversized: Oversized<E, Record<K>, V>,
    // Value-bytes threshold at which writes roll over to the next section.
    blob_target_size: u64,
    // Section currently receiving appends.
    current_section: u64,
    // Epoch assigned to table writes since the last sync (monotonic).
    next_epoch: u64,
    // Sections with unsynced appends; flushed on `sync`.
    modified_sections: BTreeSet<u64>,
    // Count of buckets whose `added` has reached `table_resize_frequency`.
    resizable: u32,
    // `Some(next_index)` while an incremental resize is in flight.
    resize_progress: Option<u32>,
    // Metrics.
    puts: Counter,
    gets: Counter,
    unnecessary_reads: Counter,
    unnecessary_writes: Counter,
    resizes: Counter,
}
impl<E: BufferPooler + Context, K: Array, V: CodecShared> Freezer<E, K, V> {
    /// Byte offset of the slot pair for `table_index` within the table blob.
    #[inline]
    const fn table_offset(table_index: u32) -> u64 {
        table_index as u64 * Entry::FULL_SIZE as u64
    }

    /// Decodes the two side-by-side entries of one slot pair.
    fn parse_entries(mut buf: impl Buf) -> Result<(Entry, Entry), Error> {
        let entry1 = Entry::read(&mut buf)?;
        let entry2 = Entry::read(&mut buf)?;
        Ok((entry1, entry2))
    }

    /// Reads and decodes the slot pair at `table_index`.
    async fn read_table(blob: &E::Blob, table_index: u32) -> Result<(Entry, Entry), Error> {
        let offset = Self::table_offset(table_index);
        let read_buf = blob.read_at(offset, Entry::FULL_SIZE).await?;
        Self::parse_entries(read_buf)
    }

    /// Validates one entry during recovery.
    ///
    /// A non-empty entry is cleared (in memory and zeroed on disk) when it
    /// fails its CRC, or when `max_valid_epoch` is given and the entry's epoch
    /// exceeds it (written after the checkpoint being restored). When no
    /// `max_valid_epoch` is given, tracks the maximum observed epoch and its
    /// section instead. Returns whether the blob was modified.
    async fn recover_entry(
        blob: &E::Blob,
        entry: &mut Entry,
        entry_offset: u64,
        max_valid_epoch: Option<u64>,
        max_epoch: &mut u64,
        max_section: &mut u64,
    ) -> Result<bool, Error> {
        if entry.is_empty() {
            return Ok(false);
        }
        if !entry.is_valid()
            || (max_valid_epoch.is_some() && entry.epoch > max_valid_epoch.unwrap())
        {
            debug!(
                valid_epoch = max_valid_epoch,
                entry_epoch = entry.epoch,
                "found invalid table entry"
            );
            *entry = Entry::new_empty();
            let zero_buf = vec![0u8; Entry::SIZE];
            blob.write_at(entry_offset, zero_buf).await?;
            Ok(true)
        } else if max_valid_epoch.is_none() && entry.epoch > *max_epoch {
            *max_epoch = entry.epoch;
            *max_section = entry.section;
            Ok(false)
        } else {
            Ok(false)
        }
    }

    /// Scans the entire table, repairing invalid/too-new entries.
    ///
    /// Returns `(modified, max_epoch, max_section, resizable)` where
    /// `resizable` counts slots whose surviving head has
    /// `added >= table_resize_frequency`. `max_epoch`/`max_section` are only
    /// meaningful when `max_valid_epoch` is `None` (no checkpoint).
    async fn recover_table(
        pooler: &impl BufferPooler,
        blob: &E::Blob,
        table_size: u32,
        table_resize_frequency: u8,
        max_valid_epoch: Option<u64>,
        table_replay_buffer: NonZeroUsize,
    ) -> Result<(bool, u64, u64, u32), Error> {
        let blob_size = Self::table_offset(table_size);
        // Buffered sequential reader over the whole table; repairs are written
        // directly to `blob` (not through the reader).
        let mut reader =
            buffer::Read::from_pooler(pooler, blob.clone(), blob_size, table_replay_buffer);
        let mut modified = false;
        let mut max_epoch = 0u64;
        let mut max_section = 0u64;
        let mut resizable = 0u32;
        for table_index in 0..table_size {
            let offset = Self::table_offset(table_index);
            let entry_buf = reader.read(Entry::FULL_SIZE).await?;
            let (mut entry1, mut entry2) = Self::parse_entries(entry_buf)?;
            let entry1_cleared = Self::recover_entry(
                blob,
                &mut entry1,
                offset,
                max_valid_epoch,
                &mut max_epoch,
                &mut max_section,
            )
            .await?;
            let entry2_cleared = Self::recover_entry(
                blob,
                &mut entry2,
                offset + Entry::SIZE as u64,
                max_valid_epoch,
                &mut max_epoch,
                &mut max_section,
            )
            .await?;
            modified |= entry1_cleared || entry2_cleared;
            if let Some((_, _, added)) = Self::read_latest_entry(&entry1, &entry2) {
                if added >= table_resize_frequency {
                    resizable += 1;
                }
            }
        }
        Ok((modified, max_epoch, max_section, resizable))
    }

    /// Chooses which of the two slots to overwrite when writing at `epoch`:
    /// reuse a slot already holding this epoch, else an empty slot, else the
    /// slot holding the OLDER epoch (preserving the newer as the fallback).
    const fn compute_write_offset(entry1: &Entry, entry2: &Entry, epoch: u64) -> u64 {
        if !entry1.is_empty() && entry1.epoch == epoch {
            return 0;
        }
        if !entry2.is_empty() && entry2.epoch == epoch {
            return Entry::SIZE as u64;
        }
        match (entry1.is_empty(), entry2.is_empty()) {
            (true, _) => 0,
            (_, true) => Entry::SIZE as u64,
            (false, false) => {
                if entry1.epoch < entry2.epoch {
                    0
                } else {
                    Entry::SIZE as u64
                }
            }
        }
    }

    /// Returns `(section, position, added)` of the valid entry with the
    /// highest epoch, or `None` if neither slot holds a valid entry.
    fn read_latest_entry(entry1: &Entry, entry2: &Entry) -> Option<(u64, u64, u8)> {
        match (
            !entry1.is_empty() && entry1.is_valid(),
            !entry2.is_empty() && entry2.is_valid(),
        ) {
            (true, true) => match entry1.epoch.cmp(&entry2.epoch) {
                Ordering::Greater => Some((entry1.section, entry1.position, entry1.added)),
                Ordering::Less => Some((entry2.section, entry2.position, entry2.added)),
                Ordering::Equal => {
                    // Epochs are assigned monotonically per sync and only one
                    // slot is written per epoch, so a tie indicates corruption
                    // that CRC validation failed to catch.
                    unreachable!("two valid entries with the same epoch")
                }
            },
            (true, false) => Some((entry1.section, entry1.position, entry1.added)),
            (false, true) => Some((entry2.section, entry2.position, entry2.added)),
            (false, false) => None,
        }
    }

    /// Writes `update` into the slot pair at `table_index`, choosing the slot
    /// via `compute_write_offset` so the most recent durable head survives a
    /// torn write.
    async fn update_head(
        table: &E::Blob,
        table_index: u32,
        entry1: &Entry,
        entry2: &Entry,
        update: Entry,
    ) -> Result<(), Error> {
        let table_offset = Self::table_offset(table_index);
        let start = Self::compute_write_offset(entry1, entry2, update.epoch);
        table
            .write_at(table_offset + start, update.encode_mut())
            .await
            .map_err(Error::Runtime)
    }

    /// Sizes a fresh table blob to `table_size` zeroed slot pairs and syncs.
    async fn init_table(blob: &E::Blob, table_size: u32) -> Result<(), Error> {
        let table_len = Self::table_offset(table_size);
        blob.resize(table_len).await?;
        blob.sync().await?;
        Ok(())
    }

    /// Initializes the freezer without a checkpoint (recovers state by
    /// scanning the table).
    pub async fn init(context: E, config: Config<V::Cfg>) -> Result<Self, Error> {
        Self::init_with_checkpoint(context, config, None).await
    }

    /// Initializes the freezer, optionally restoring to `checkpoint`.
    ///
    /// With a checkpoint: rewinds the oversized journal and clears any table
    /// entries newer than the checkpoint epoch. Without one: scans the table
    /// to find the maximum epoch/section and repairs invalid entries.
    ///
    /// # Panics
    ///
    /// If `table_initial_size` is not a power of two, or if a checkpoint is
    /// supplied whose fields are inconsistent with the on-disk state (e.g. a
    /// non-zero checkpoint against an empty table, or a non-power-of-two
    /// `table_size`).
    pub async fn init_with_checkpoint(
        context: E,
        config: Config<V::Cfg>,
        checkpoint: Option<Checkpoint>,
    ) -> Result<Self, Error> {
        assert!(
            config.table_initial_size > 0 && config.table_initial_size.is_power_of_two(),
            "table_initial_size must be a power of 2"
        );
        let oversized_cfg = OversizedConfig {
            index_partition: config.key_partition.clone(),
            value_partition: config.value_partition.clone(),
            index_page_cache: config.key_page_cache.clone(),
            index_write_buffer: config.key_write_buffer,
            value_write_buffer: config.value_write_buffer,
            compression: config.value_compression,
            codec_config: config.codec_config,
        };
        let mut oversized: Oversized<E, Record<K>, V> =
            Oversized::init(context.with_label("oversized"), oversized_cfg).await?;
        let (table, table_len) = context
            .open(&config.table_partition, TABLE_BLOB_NAME)
            .await?;
        let (checkpoint, resizable) = match (table_len, checkpoint) {
            // Fresh table, no checkpoint: allocate the initial table.
            (0, None) => {
                Self::init_table(&table, config.table_initial_size).await?;
                (Checkpoint::init(config.table_initial_size), 0)
            }
            // Fresh table with a checkpoint: only the zero checkpoint is
            // consistent with an empty table.
            (0, Some(checkpoint)) => {
                assert_eq!(checkpoint.epoch, 0);
                assert_eq!(checkpoint.section, 0);
                assert_eq!(checkpoint.oversized_size, 0);
                assert_eq!(checkpoint.table_size, 0);
                Self::init_table(&table, config.table_initial_size).await?;
                (Checkpoint::init(config.table_initial_size), 0)
            }
            // Existing table, restore to checkpoint.
            (_, Some(checkpoint)) => {
                assert!(
                    checkpoint.table_size > 0 && checkpoint.table_size.is_power_of_two(),
                    "table_size must be a power of 2"
                );
                // Discard any journal data written after the checkpoint.
                oversized
                    .rewind(checkpoint.section, checkpoint.oversized_size)
                    .await?;
                oversized.sync(checkpoint.section).await?;
                // Undo a partially-applied resize by restoring the table length.
                let expected_table_len = Self::table_offset(checkpoint.table_size);
                let mut modified = if table_len != expected_table_len {
                    table.resize(expected_table_len).await?;
                    true
                } else {
                    false
                };
                // Clear entries newer than the checkpoint epoch.
                let (table_modified, _, _, resizable) = Self::recover_table(
                    &context,
                    &table,
                    checkpoint.table_size,
                    config.table_resize_frequency,
                    Some(checkpoint.epoch),
                    config.table_replay_buffer,
                )
                .await?;
                if table_modified {
                    modified = true;
                }
                if modified {
                    table.sync().await?;
                }
                (checkpoint, resizable)
            }
            // Existing table, no checkpoint: derive state from the table itself.
            (_, None) => {
                let table_size = (table_len / Entry::FULL_SIZE as u64) as u32;
                let (modified, max_epoch, max_section, resizable) = Self::recover_table(
                    &context,
                    &table,
                    table_size,
                    config.table_resize_frequency,
                    None,
                    config.table_replay_buffer,
                )
                .await?;
                if modified {
                    table.sync().await?;
                }
                let oversized_size = oversized.size(max_section).await?;
                (
                    Checkpoint {
                        epoch: max_epoch,
                        section: max_section,
                        oversized_size,
                        table_size,
                    },
                    resizable,
                )
            }
        };
        // Register metrics with the runtime context.
        let puts = Counter::default();
        let gets = Counter::default();
        let unnecessary_reads = Counter::default();
        let unnecessary_writes = Counter::default();
        let resizes = Counter::default();
        context.register("puts", "number of put operations", puts.clone());
        context.register("gets", "number of get operations", gets.clone());
        context.register(
            "unnecessary_reads",
            "number of unnecessary reads performed during key lookups",
            unnecessary_reads.clone(),
        );
        context.register(
            "unnecessary_writes",
            "number of unnecessary writes performed during resize",
            unnecessary_writes.clone(),
        );
        context.register(
            "resizes",
            "number of table resizing operations",
            resizes.clone(),
        );
        Ok(Self {
            context,
            table_partition: config.table_partition,
            table_size: checkpoint.table_size,
            table_resize_threshold: checkpoint.table_size as u64 * RESIZE_THRESHOLD / 100,
            table_resize_frequency: config.table_resize_frequency,
            table_resize_chunk_size: config.table_resize_chunk_size,
            table,
            oversized,
            blob_target_size: config.value_target_size,
            current_section: checkpoint.section,
            // New writes use the epoch after the checkpointed one.
            next_epoch: checkpoint.epoch.checked_add(1).expect("epoch overflow"),
            modified_sections: BTreeSet::new(),
            resizable,
            resize_progress: None,
            puts,
            gets,
            unnecessary_reads,
            unnecessary_writes,
            resizes,
        })
    }

    /// Maps a key to its table index: CRC32 of the key masked to the current
    /// (power-of-two) table size.
    fn table_index(&self, key: &K) -> u32 {
        let hash = Crc32::checksum(key.as_ref());
        hash & (self.table_size - 1)
    }

    /// True when enough buckets have grown deep to warrant doubling the table.
    const fn should_resize(&self) -> bool {
        self.resizable as u64 >= self.table_resize_threshold
    }

    /// Rolls over to the next section once the current one's value bytes reach
    /// the target size.
    async fn update_section(&mut self) -> Result<(), Error> {
        let value_size = self.oversized.value_size(self.current_section).await?;
        if value_size >= self.blob_target_size {
            self.current_section += 1;
            debug!(
                size = value_size,
                section = self.current_section,
                "updated section"
            );
        }
        Ok(())
    }

    /// Appends a key/value pair and links it as the new head of its bucket
    /// chain. Returns a `Cursor` for direct retrieval of the value.
    ///
    /// Durability note: the write is only durable after a subsequent `sync`.
    pub async fn put(&mut self, key: K, value: V) -> Result<Cursor, Error> {
        self.puts.inc();
        self.update_section().await?;
        let table_index = self.table_index(&key);
        let (entry1, entry2) = Self::read_table(&self.table, table_index).await?;
        let head = Self::read_latest_entry(&entry1, &entry2);
        // New record points at the previous head (if any); the value location
        // placeholder (0, 0) is filled in by the oversized journal on append.
        let key_entry = Record::new(
            key,
            head.map(|(section, position, _)| (section, position)),
            0,
            0,
        );
        let (position, value_offset, value_size) = self
            .oversized
            .append(self.current_section, key_entry, &value)
            .await?;
        let mut added = head.map(|(_, _, added)| added).unwrap_or(0);
        added = added.saturating_add(1);
        // Count the bucket as resizable exactly once, when it first reaches
        // the frequency (saturating_add keeps deeper buckets from re-counting).
        if added == self.table_resize_frequency {
            self.resizable += 1;
        }
        self.modified_sections.insert(self.current_section);
        let new_entry = Entry::new(self.next_epoch, self.current_section, position, added);
        Self::update_head(&self.table, table_index, &entry1, &entry2, new_entry).await?;
        // If a resize is in flight and this bucket has already been migrated,
        // mirror the update into the new (upper) half so the doubled table
        // stays consistent.
        if let Some(resize_progress) = self.resize_progress {
            if table_index < resize_progress {
                self.unnecessary_writes.inc();
                // NOTE(review): `resizable` is incremented again for the
                // mirrored slot — presumably because after the resize both
                // slots exist independently; confirm against the
                // resize-accounting decrements in `advance_resize`.
                if added == self.table_resize_frequency {
                    self.resizable += 1;
                }
                let new_table_index = self.table_size + table_index;
                let new_entry = Entry::new(self.next_epoch, self.current_section, position, added);
                // NOTE(review): the slot choice for the mirrored index reuses
                // `entry1`/`entry2` read from the OLD index — this assumes the
                // migrated upper slot's contents still match; verify.
                Self::update_head(&self.table, new_table_index, &entry1, &entry2, new_entry)
                    .await?;
            }
        }
        Ok(Cursor::new(self.current_section, value_offset, value_size))
    }

    /// Fetches a value directly by cursor (no chain walk).
    async fn get_cursor(&self, cursor: Cursor) -> Result<V, Error> {
        let value = self
            .oversized
            .get_value(cursor.section(), cursor.offset(), cursor.size())
            .await?;
        Ok(value)
    }

    /// Looks up the most recent value for `key` by walking its bucket chain
    /// from head (newest) to tail (oldest). Returns `None` if absent.
    async fn get_key(&self, key: &K) -> Result<Option<V>, Error> {
        self.gets.inc();
        let table_index = self.table_index(key);
        let (entry1, entry2) = Self::read_table(&self.table, table_index).await?;
        let Some((mut section, mut position, _)) = Self::read_latest_entry(&entry1, &entry2) else {
            return Ok(None);
        };
        loop {
            let key_entry = self.oversized.get(section, position).await?;
            if key_entry.key.as_ref() == key.as_ref() {
                let value = self
                    .oversized
                    .get_value(section, key_entry.value_offset, key_entry.value_size)
                    .await?;
                return Ok(Some(value));
            }
            // Hash-collision record; keep walking.
            self.unnecessary_reads.inc();
            let Some(next) = key_entry.next() else {
                break;
            };
            section = next.0;
            position = next.1;
        }
        Ok(None)
    }

    /// Retrieves a value either directly by `Cursor` or by `Key` chain walk.
    pub async fn get<'a>(&'a self, identifier: Identifier<'a, K>) -> Result<Option<V>, Error> {
        match identifier {
            Identifier::Cursor(cursor) => self.get_cursor(cursor).await.map(Some),
            Identifier::Key(key) => self.get_key(key).await,
        }
    }

    /// Begins an incremental table doubling: grows the blob and marks
    /// migration progress at index 0. A no-op if doubling would overflow
    /// `u32` (table stays at its maximum size).
    async fn start_resize(&mut self) -> Result<(), Error> {
        self.resizes.inc();
        let old_size = self.table_size;
        let Some(new_size) = old_size.checked_mul(2) else {
            return Ok(());
        };
        self.table.resize(Self::table_offset(new_size)).await?;
        self.resize_progress = Some(0);
        debug!(old = old_size, new = new_size, "table resize started");
        Ok(())
    }

    /// Appends a slot pair to `buf` with `new_entry` placed in whichever slot
    /// `compute_write_offset` selects for its epoch, keeping the other slot's
    /// previous contents.
    fn rewrite_entries(buf: &mut Vec<u8>, entry1: &Entry, entry2: &Entry, new_entry: &Entry) {
        if Self::compute_write_offset(entry1, entry2, new_entry.epoch) == 0 {
            buf.extend_from_slice(&new_entry.encode());
            buf.extend_from_slice(&entry2.encode());
        } else {
            buf.extend_from_slice(&entry1.encode());
            buf.extend_from_slice(&new_entry.encode());
        }
    }

    /// Migrates one chunk of table indices for an in-flight resize: each
    /// surviving head is rewritten at the current epoch with `added` reset to
    /// 0 and copied into both the old and new (upper) halves.
    ///
    /// # Panics
    ///
    /// If called when no resize is in progress (`resize_progress` is `None`).
    async fn advance_resize(&mut self) -> Result<(), Error> {
        let current_index = self.resize_progress.unwrap();
        let old_size = self.table_size;
        let chunk_end = (current_index + self.table_resize_chunk_size).min(old_size);
        let chunk_size = chunk_end - current_index;
        let chunk_bytes = chunk_size as usize * Entry::FULL_SIZE;
        let read_offset = Self::table_offset(current_index);
        let mut read_buf = self.table.read_at(read_offset, chunk_bytes).await?;
        let mut writes = Vec::with_capacity(chunk_bytes);
        for _ in 0..chunk_size {
            let (entry1, entry2) = Self::parse_entries(&mut read_buf)?;
            let head = Self::read_latest_entry(&entry1, &entry2);
            let reset_entry = match head {
                Some((section, position, added)) => {
                    // Resetting `added` to 0 removes this bucket from the
                    // resizable count.
                    if added >= self.table_resize_frequency {
                        self.resizable -= 1;
                    }
                    Entry::new(self.next_epoch, section, position, 0)
                }
                None => Entry::new_empty(),
            };
            Self::rewrite_entries(&mut writes, &entry1, &entry2, &reset_entry);
        }
        // Write the rewritten chunk to both halves concurrently.
        let writes = IoBuf::from(writes);
        let old_write = self.table.write_at(read_offset, writes.clone());
        let new_offset = (old_size as usize * Entry::FULL_SIZE) as u64 + read_offset;
        let new_write = self.table.write_at(new_offset, writes);
        try_join(old_write, new_write).await?;
        if chunk_end >= old_size {
            // Migration complete: adopt the doubled size and new threshold.
            self.table_size = old_size * 2;
            self.table_resize_threshold = self.table_size as u64 * RESIZE_THRESHOLD / 100;
            self.resize_progress = None;
            debug!(
                old = old_size,
                new = self.table_size,
                "table resize completed"
            );
        } else {
            self.resize_progress = Some(chunk_end);
            debug!(current = current_index, chunk_end, "table resize progress");
        }
        Ok(())
    }

    /// Flushes all modified journal sections and the table, advancing one
    /// resize chunk if needed, and returns a `Checkpoint` describing the
    /// now-durable state. Bumps the epoch so later writes are distinguishable
    /// from (and discardable relative to) this checkpoint.
    pub async fn sync(&mut self) -> Result<Checkpoint, Error> {
        // Sync journal sections before the table so table entries never point
        // at unsynced records.
        let syncs: Vec<_> = self
            .modified_sections
            .iter()
            .map(|section| self.oversized.sync(*section))
            .collect();
        try_join_all(syncs).await?;
        self.modified_sections.clear();
        // Resize work piggybacks on sync: start if warranted, then advance
        // one chunk per sync call.
        if self.should_resize() && self.resize_progress.is_none() {
            self.start_resize().await?;
        }
        if self.resize_progress.is_some() {
            self.advance_resize().await?;
        }
        self.table.sync().await?;
        let stored_epoch = self.next_epoch;
        self.next_epoch = self.next_epoch.checked_add(1).expect("epoch overflow");
        let oversized_size = self.oversized.size(self.current_section).await?;
        Ok(Checkpoint {
            epoch: stored_epoch,
            section: self.current_section,
            oversized_size,
            table_size: self.table_size,
        })
    }

    /// Drives any in-flight resize to completion, then syncs and returns the
    /// final checkpoint.
    pub async fn close(mut self) -> Result<Checkpoint, Error> {
        while self.resize_progress.is_some() {
            self.advance_resize().await?;
        }
        let checkpoint = self.sync().await?;
        Ok(checkpoint)
    }

    /// Permanently removes all freezer state: the oversized journal, the
    /// table blob, and the table partition itself.
    pub async fn destroy(self) -> Result<(), Error> {
        self.oversized.destroy().await?;
        // Release the blob handle before removing its backing storage.
        drop(self.table);
        self.context
            .remove(&self.table_partition, Some(TABLE_BLOB_NAME))
            .await?;
        self.context.remove(&self.table_partition, None).await?;
        Ok(())
    }

    /// Test-only: current resize progress (`Some(next_index)` mid-resize).
    #[cfg(test)]
    pub const fn resizing(&self) -> Option<u32> {
        self.resize_progress
    }

    /// Test-only: current count of resizable buckets.
    #[cfg(test)]
    pub const fn resizable(&self) -> u32 {
        self.resizable
    }
}
// Codec round-trip conformance tests for all serialized types in this module.
#[cfg(all(test, feature = "arbitrary"))]
mod conformance {
    use super::*;
    use commonware_codec::conformance::CodecConformance;
    use commonware_utils::sequence::U64;

    commonware_conformance::conformance_tests! {
        CodecConformance<Cursor>,
        CodecConformance<Checkpoint>,
        CodecConformance<Entry>,
        CodecConformance<Record<U64>>
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use commonware_codec::DecodeExt;
    use commonware_macros::test_traced;
    use commonware_runtime::{
        buffer::paged::CacheRef, deterministic, deterministic::Context, Metrics, Runner, Storage,
    };
    use commonware_utils::{
        sequence::{FixedBytes, U64},
        NZUsize, NZU16,
    };

    /// Builds a 64-byte key from a short string (zero-padded).
    fn test_key(key: &str) -> FixedBytes<64> {
        let mut buf = [0u8; 64];
        let key = key.as_bytes();
        assert!(key.len() <= buf.len());
        buf[..key.len()].copy_from_slice(key);
        FixedBytes::decode(buf.as_ref()).unwrap()
    }

    type TestFreezer = Freezer<Context, U64, u64>;

    // Compile-time checks that the public futures are `Send`.
    fn is_send<T: Send>(_: T) {}

    #[allow(dead_code)]
    fn assert_freezer_futures_are_send(freezer: &mut TestFreezer, key: U64) {
        is_send(freezer.get(Identifier::Key(&key)));
        is_send(freezer.put(key, 0u64));
    }

    #[allow(dead_code)]
    fn assert_freezer_destroy_is_send(freezer: TestFreezer) {
        is_send(freezer.destroy());
    }

    /// Regression test: after closing mid-resize state, the doubled table must
    /// contain the expected number of fully-empty slot pairs.
    #[test_traced]
    fn issue_2966_regression() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = super::super::Config {
                key_partition: "test-key-index".into(),
                key_write_buffer: NZUsize!(1024),
                key_page_cache: CacheRef::from_pooler(&context, NZU16!(1024), NZUsize!(10)),
                value_partition: "test-value-journal".into(),
                value_compression: None,
                value_write_buffer: NZUsize!(1024),
                value_target_size: 10 * 1024 * 1024,
                table_partition: "test-table".into(),
                table_initial_size: 4,
                table_resize_frequency: 1,
                table_resize_chunk_size: 4,
                table_replay_buffer: NZUsize!(64 * 1024),
                codec_config: (),
            };
            let mut freezer =
                Freezer::<_, FixedBytes<64>, i32>::init(context.with_label("first"), cfg.clone())
                    .await
                    .unwrap();
            freezer.put(test_key("key0"), 0).await.unwrap();
            freezer.put(test_key("key2"), 1).await.unwrap();
            freezer.close().await.unwrap();
            // Inspect the raw table blob after close.
            let (blob, size) = context.open(&cfg.table_partition, b"table").await.unwrap();
            let table_data = blob.read_at(0, size as usize).await.unwrap().coalesce();
            let num_entries = size as usize / Entry::FULL_SIZE;
            // resize_frequency of 1 forces a resize: table doubled from 4 to 8.
            assert_eq!(num_entries, 8);
            let mut both_empty_count = 0;
            for entry_idx in 0..num_entries {
                let offset = entry_idx * Entry::FULL_SIZE;
                let buf = &table_data.as_ref()[offset..offset + Entry::FULL_SIZE];
                let (slot0, slot1) =
                    Freezer::<Context, FixedBytes<64>, i32>::parse_entries(buf).unwrap();
                if slot0.is_empty() && slot1.is_empty() {
                    both_empty_count += 1;
                }
            }
            assert_eq!(both_empty_count, 4);
        });
    }

    /// Regression test: recovery from a checkpoint must tolerate a slot pair
    /// whose CRCs were both corrupted on disk.
    #[test_traced]
    fn issue_2955_regression() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = super::super::Config {
                key_partition: "test-key-index".into(),
                key_write_buffer: NZUsize!(1024),
                key_page_cache: CacheRef::from_pooler(&context, NZU16!(1024), NZUsize!(10)),
                value_partition: "test-value-journal".into(),
                value_compression: None,
                value_write_buffer: NZUsize!(1024),
                value_target_size: 10 * 1024 * 1024,
                table_partition: "test-table".into(),
                table_initial_size: 4,
                table_resize_frequency: 1,
                table_resize_chunk_size: 4,
                table_replay_buffer: NZUsize!(64 * 1024),
                codec_config: (),
            };
            let checkpoint = {
                let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(
                    context.with_label("first"),
                    cfg.clone(),
                )
                .await
                .unwrap();
                freezer.put(test_key("key0"), 42).await.unwrap();
                freezer.sync().await.unwrap();
                freezer.close().await.unwrap()
            };
            {
                // Corrupt the CRC bytes of both slots of the first pair.
                let (blob, _) = context.open(&cfg.table_partition, b"table").await.unwrap();
                let entry_data = blob.read_at(0, Entry::FULL_SIZE).await.unwrap();
                let mut corrupted = entry_data.coalesce();
                corrupted.as_mut()[Entry::SIZE - 4] ^= 0xFF;
                corrupted.as_mut()[Entry::FULL_SIZE - 4] ^= 0xFF;
                blob.write_at(0, corrupted).await.unwrap();
                blob.sync().await.unwrap();
            }
            // Recovery must clear the corrupt entries rather than fail.
            let freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
                context.with_label("second"),
                cfg.clone(),
                Some(checkpoint),
            )
            .await
            .unwrap();
            drop(freezer);
        });
    }
}