use crate::{
journal::{
contiguous::{
fixed::{Config as JConfig, Journal},
Many, Reader,
},
Error as JError,
},
merkle::{
batch,
hasher::Hasher,
mem::{Config as MemConfig, Mem},
Error, Family, Location, Position, Proof, Readable,
},
metadata::{Config as MConfig, Metadata},
};
use commonware_codec::DecodeExt;
use commonware_cryptography::Digest;
use commonware_parallel::ThreadPool;
use commonware_runtime::{buffer::paged::CacheRef, Clock, Metrics, Storage as RStorage};
use commonware_utils::{
sequence::prefixed_u64::U64,
sync::{AsyncMutex, RwLock},
};
use std::{
collections::BTreeMap,
num::{NonZeroU64, NonZeroUsize},
sync::Arc,
};
use tracing::{debug, error, warn};
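/// A batch of pending leaves that has not yet been merkleized. Thin wrapper around
/// `batch::UnmerkleizedBatch` exposed through the journaled API.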
pub struct UnmerkleizedBatch<F: Family, D: Digest> {
inner: batch::UnmerkleizedBatch<F, D>,
}
impl<F: Family, D: Digest> UnmerkleizedBatch<F, D> {
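    /// Appends `element` as a new leaf, using `hasher` to compute its digest.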
pub fn add(self, hasher: &impl Hasher<F, Digest = D>, element: &[u8]) -> Self {
Self {
inner: self.inner.add(hasher, element),
}
}
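    /// Appends an already-computed leaf digest to the batch.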
pub fn add_leaf_digest(self, digest: D) -> Self {
Self {
inner: self.inner.add_leaf_digest(digest),
}
}
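    /// Returns the batch's current leaf count.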
pub fn leaves(&self) -> Location<F> {
self.inner.leaves()
}
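    /// Sets the thread pool used to parallelize merkleization, if any.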
#[cfg(feature = "std")]
pub fn with_pool(self, pool: Option<ThreadPool>) -> Self {
Self {
inner: self.inner.with_pool(pool),
}
}
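    /// Merkleizes the batched leaves on top of `base`, producing a batch that can be
    /// applied with `Journaled::apply_batch`.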
pub fn merkleize(
self,
base: &Mem<F, D>,
hasher: &impl Hasher<F, Digest = D>,
) -> Arc<batch::MerkleizedBatch<F, D>> {
self.inner.merkleize(base, hasher)
}
}
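/// In-memory state protected by `Journaled::inner`: the cached structure and the position
/// up to which nodes have been durably pruned.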
pub(crate) struct Inner<F: Family, D: Digest> {
pub(crate) mem: Mem<F, D>,
pub(crate) pruned_to_pos: Position<F>,
}
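/// Configuration for `Journaled::init`: storage partitions for the node journal and
/// metadata, journal blob sizing, write buffering, an optional thread pool for parallel
/// merkleization, and the shared page cache.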
#[derive(Clone)]
pub struct Config {
pub journal_partition: String,
pub metadata_partition: String,
pub items_per_blob: NonZeroU64,
pub write_buffer: NonZeroUsize,
pub thread_pool: Option<ThreadPool>,
pub page_cache: CacheRef,
}
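/// Configuration for `Journaled::init_sync`: the base `Config`, the leaf range being
/// synced, and (optionally) the pinned node digests required at the range's start.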
pub struct SyncConfig<F: Family, D: Digest> {
pub config: Config,
pub range: std::ops::Range<Location<F>>,
pub pinned_nodes: Option<Vec<D>>,
}
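/// A Merkle structure whose nodes are durably persisted in an append-only `Journal`, with
/// the pruning boundary and its pinned nodes mirrored in `Metadata`. Nodes not yet written
/// to the journal are held by an in-memory `Mem` structure.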
pub struct Journaled<F: Family, E: RStorage + Clock + Metrics, D: Digest> {
pub(crate) inner: RwLock<Inner<F, D>>,
pub(crate) journal: Journal<E, D>,
pub(crate) metadata: Metadata<E, U64, Vec<u8>>,
pub(crate) sync_lock: AsyncMutex<()>,
pub(crate) pool: Option<ThreadPool>,
}
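/// Metadata key prefix under which pinned node digests are stored.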
const NODE_PREFIX: u8 = 0;
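/// Metadata key prefix under which the pruning boundary is stored (as a big-endian leaf
/// location).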
pub(crate) const PRUNED_TO_PREFIX: u8 = 1;
impl<F: Family, E: RStorage + Clock + Metrics, D: Digest> Journaled<F, E, D> {
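    /// Returns the total number of nodes in the structure.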
pub fn size(&self) -> Position<F> {
self.inner.read().mem.size()
}
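    /// Returns the number of leaves in the structure.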
pub fn leaves(&self) -> Location<F> {
self.inner.read().mem.leaves()
}
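    /// Returns the digest of the node at `pos`, preferring the copy pinned in metadata and
    /// falling back to the journal. Fails if the node is found in neither.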
async fn get_from_metadata_or_journal(
metadata: &Metadata<E, U64, Vec<u8>>,
journal: &Journal<E, D>,
pos: Position<F>,
) -> Result<D, Error<F>> {
if let Some(bytes) = metadata.get(&U64::new(NODE_PREFIX, *pos)) {
debug!(?pos, "read node from metadata");
let digest = D::decode(bytes.as_ref());
let Ok(digest) = digest else {
error!(
?pos,
err = %digest.expect_err("digest is Err in else branch"),
"could not convert node from metadata bytes to digest"
);
return Err(Error::DataCorrupted(
"could not read digest at requested pos",
));
};
return Ok(digest);
}
debug!(?pos, "reading node from journal");
let node = journal.reader().await.read(*pos).await;
match node {
Ok(node) => Ok(node),
Err(JError::ItemPruned(_)) => {
error!(?pos, "node is missing from metadata and journal");
Err(Error::MissingNode(pos))
}
Err(e) => Err(Error::Journal(e)),
}
}
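    /// Returns the range of retained leaf locations: from the pruning boundary up to (but
    /// excluding) the current leaf count.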
pub fn bounds(&self) -> std::ops::Range<Location<F>> {
let inner = self.inner.read();
Location::try_from(inner.pruned_to_pos).expect("valid pruned_to_pos")..inner.mem.leaves()
}
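    /// Loads the pinned nodes required to prune to `prune_pos` (from metadata or the
    /// journal) and adds them to `mem`.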
async fn add_extra_pinned_nodes(
mem: &mut Mem<F, D>,
metadata: &Metadata<E, U64, Vec<u8>>,
journal: &Journal<E, D>,
prune_pos: Position<F>,
) -> Result<(), Error<F>> {
let prune_loc = Location::try_from(prune_pos).expect("valid prune_pos");
let mut pinned_nodes = BTreeMap::new();
for pos in F::nodes_to_pin(prune_loc) {
let digest = Self::get_from_metadata_or_journal(metadata, journal, pos).await?;
pinned_nodes.insert(pos, digest);
}
mem.add_pinned_nodes(pinned_nodes);
Ok(())
}
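    /// Initializes the structure from persisted state, reconciling the metadata pruning
    /// boundary with the journal and recovering from any partially written append.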
pub async fn init(
context: E,
hasher: &impl Hasher<F, Digest = D>,
cfg: Config,
) -> Result<Self, Error<F>> {
let journal_cfg = JConfig {
partition: cfg.journal_partition,
items_per_blob: cfg.items_per_blob,
page_cache: cfg.page_cache,
write_buffer: cfg.write_buffer,
};
let journal =
Journal::<E, D>::init(context.with_label("merkle_journal"), journal_cfg).await?;
let mut journal_size = Position::<F>::new(journal.size().await);
let metadata_cfg = MConfig {
partition: cfg.metadata_partition,
codec_config: ((0..).into(), ()),
};
let metadata =
Metadata::<_, U64, Vec<u8>>::init(context.with_label("merkle_metadata"), metadata_cfg)
.await?;
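        // An empty journal means a fresh structure: no recovery or pruning reconciliation
        // is needed.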
if journal_size == 0 {
let mem = Mem::init(
MemConfig {
nodes: vec![],
pruning_boundary: Location::new(0),
pinned_nodes: vec![],
},
hasher,
)?;
return Ok(Self {
inner: RwLock::new(Inner {
mem,
pruned_to_pos: Position::new(0),
}),
journal,
metadata,
sync_lock: AsyncMutex::new(()),
pool: cfg.thread_pool,
});
}
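        // Recover the durable pruning boundary (stored as a leaf location) from metadata,
        // defaulting to 0 if it was never written.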
let key: U64 = U64::new(PRUNED_TO_PREFIX, 0);
let metadata_pruned_to = Location::<F>::new(metadata.get(&key).map_or(0, |bytes| {
u64::from_be_bytes(
bytes
.as_slice()
.try_into()
.expect("metadata pruned_to is not 8 bytes"),
)
}));
let metadata_prune_pos = Position::try_from(metadata_pruned_to)?;
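        // Reconcile the metadata boundary with the journal's actual pruning boundary: prune
        // the journal forward if metadata is ahead, or warn and fall back to the journal's
        // boundary if metadata is stale.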
let journal_bounds_start = journal.reader().await.bounds().start;
if *metadata_prune_pos > journal_bounds_start {
journal.prune(*metadata_prune_pos).await?;
if journal.reader().await.bounds().start != journal_bounds_start {
warn!(
journal_bounds_start,
?metadata_prune_pos,
"journal pruned to match metadata"
);
}
} else if *metadata_prune_pos < journal_bounds_start {
warn!(
?metadata_prune_pos,
journal_bounds_start, "metadata stale, using journal pruning boundary"
);
}
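        // The journal's boundary need not land on a valid structure size, so round it up to
        // the smallest leaf-aligned size that covers it. The effective pruning position is
        // the greater of this and the metadata boundary.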
let journal_boundary_pos = Position::<F>::new(journal_bounds_start);
let journal_boundary_floor = F::to_nearest_size(journal_boundary_pos);
let journal_boundary_leaf_aligned_pos = if journal_boundary_floor == journal_boundary_pos {
journal_boundary_floor
} else {
Position::try_from(Location::try_from(journal_boundary_floor)? + 1)?
};
let effective_prune_pos =
std::cmp::max(metadata_prune_pos, journal_boundary_leaf_aligned_pos);
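        // If the journal's size is not a valid structure size, the last batch of appends
        // was interrupted. Capture the orphaned leaf at the last valid size (if readable)
        // and rewind the journal back to that size.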
let last_valid_size = F::to_nearest_size(journal_size);
let mut orphaned_leaf: Option<D> = None;
if last_valid_size != journal_size {
warn!(
?last_valid_size,
"encountered invalid structure, recovering from last valid size"
);
let recovered_item = journal.reader().await.read(*last_valid_size).await;
if let Ok(item) = recovered_item {
orphaned_leaf = Some(item);
}
journal.rewind(*last_valid_size).await?;
journal.sync().await?;
            journal_size = last_valid_size;
}
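        // Rebuild the in-memory structure over the journaled nodes, seeding it with the
        // pinned nodes required at the journal's leaf count plus any extra pinned nodes
        // needed for the effective pruning position.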
let journal_leaves = Location::try_from(journal_size)?;
let mut pinned_nodes = Vec::new();
for pos in F::nodes_to_pin(journal_leaves) {
let digest = Self::get_from_metadata_or_journal(&metadata, &journal, pos).await?;
pinned_nodes.push(digest);
}
let mut mem = Mem::init(
MemConfig {
nodes: vec![],
pruning_boundary: journal_leaves,
pinned_nodes,
},
hasher,
)?;
Self::add_extra_pinned_nodes(&mut mem, &metadata, &journal, effective_prune_pos).await?;
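        // If an orphaned leaf was recovered, replay it through a batch, persist the
        // resulting nodes to the journal, then re-pin the nodes required at the effective
        // pruning position before pruning the in-memory structure.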
if let Some(leaf) = orphaned_leaf {
let pos = mem.size();
warn!(?pos, "recovering orphaned leaf");
let batch = mem
.new_batch()
.add_leaf_digest(leaf)
.merkleize(&mem, hasher);
mem.apply_batch(&batch)?;
assert_eq!(pos, journal_size);
for p in journal.size().await..*mem.size() {
let p = Position::new(p);
let node = *mem.get_node_unchecked(p);
journal.append(&node).await?;
}
journal.sync().await?;
assert_eq!(mem.size(), journal.size().await);
let effective_prune_loc =
Location::try_from(effective_prune_pos).expect("valid effective_prune_pos");
let mut pn = BTreeMap::new();
for p in F::nodes_to_pin(effective_prune_loc) {
let d = mem.get_node_unchecked(p);
pn.insert(p, *d);
}
mem.prune_all();
mem.add_pinned_nodes(pn);
}
Ok(Self {
inner: RwLock::new(Inner {
mem,
pruned_to_pos: effective_prune_pos,
}),
journal,
metadata,
sync_lock: AsyncMutex::new(()),
pool: cfg.thread_pool,
})
}
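    /// Initializes the structure for syncing the given leaf `range`, reusing any journal
    /// contents that overlap it and persisting the caller-supplied pinned nodes (if any)
    /// along with the range's start as the pruning boundary.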
pub async fn init_sync(
context: E,
cfg: SyncConfig<F, D>,
hasher: &impl Hasher<F, Digest = D>,
) -> Result<Self, Error<F>> {
let prune_pos = Position::try_from(cfg.range.start)?;
let end_pos = Position::try_from(cfg.range.end)?;
let journal_cfg = JConfig {
partition: cfg.config.journal_partition.clone(),
items_per_blob: cfg.config.items_per_blob,
write_buffer: cfg.config.write_buffer,
page_cache: cfg.config.page_cache.clone(),
};
let journal: Journal<E, D> =
Journal::init(context.with_label("merkle_journal"), journal_cfg).await?;
let mut journal_size = Position::<F>::new(journal.size().await);
let last_valid_size = F::to_nearest_size(journal_size);
if last_valid_size != journal_size {
warn!(
?last_valid_size,
"init_sync: encountered invalid structure, recovering from last valid size"
);
journal.rewind(*last_valid_size).await?;
journal.sync().await?;
journal_size = last_valid_size;
}
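        // Validate the recovered journal against the requested range; if its contents fall
        // entirely at or before the pruning position, reset it to start there.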
assert!(!cfg.range.is_empty(), "range must not be empty");
if journal_size > *end_pos {
return Err(crate::journal::Error::ItemOutOfRange(*journal_size).into());
}
if journal_size <= *prune_pos && *prune_pos != 0 {
journal.clear_to_size(*prune_pos).await?;
journal_size = Position::new(journal.size().await);
}
let metadata_cfg = MConfig {
partition: cfg.config.metadata_partition,
codec_config: ((0..).into(), ()),
};
let mut metadata =
Metadata::init(context.with_label("merkle_metadata"), metadata_cfg).await?;
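        // Record the range start as the durable pruning boundary and persist any
        // caller-supplied pinned nodes (after validating their count).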
let pruning_boundary_key = U64::new(PRUNED_TO_PREFIX, 0);
metadata.put(
pruning_boundary_key,
cfg.range.start.as_u64().to_be_bytes().into(),
);
let prune_loc = Location::try_from(prune_pos)?;
let journal_leaves = Location::try_from(journal_size)?;
if let Some(pinned_nodes) = cfg.pinned_nodes {
let nodes_to_pin_persisted: Vec<_> = F::nodes_to_pin(prune_loc).collect();
if pinned_nodes.len() != nodes_to_pin_persisted.len() {
return Err(Error::<F>::InvalidPinnedNodes);
}
for (pos, digest) in nodes_to_pin_persisted.into_iter().zip(pinned_nodes.iter()) {
metadata.put(U64::new(NODE_PREFIX, *pos), digest.to_vec());
}
}
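        // Seed the in-memory structure with the pinned nodes required at the journal's
        // current leaf count, plus extra pinned nodes for the pruning position when the
        // journal extends past it.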
let nodes_to_pin_mem = F::nodes_to_pin(journal_leaves);
let mut mem_pinned_nodes = Vec::new();
for pos in nodes_to_pin_mem {
let digest = Self::get_from_metadata_or_journal(&metadata, &journal, pos).await?;
mem_pinned_nodes.push(digest);
}
let mut mem = Mem::init(
MemConfig {
nodes: vec![],
pruning_boundary: Location::try_from(journal_size)?,
pinned_nodes: mem_pinned_nodes,
},
hasher,
)?;
if prune_pos < journal_size {
Self::add_extra_pinned_nodes(&mut mem, &metadata, &journal, prune_pos).await?;
}
metadata.sync().await?;
journal.prune(*prune_pos).await?;
Ok(Self {
inner: RwLock::new(Inner {
mem,
pruned_to_pos: prune_pos,
}),
journal,
metadata,
sync_lock: AsyncMutex::new(()),
pool: cfg.config.thread_pool,
})
}
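    /// Persists the pinned nodes required at `prune_to_pos`, along with the new pruning
    /// boundary, to metadata and syncs it. Returns the pinned nodes written.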
async fn update_metadata(
&mut self,
prune_to_pos: Position<F>,
) -> Result<BTreeMap<Position<F>, D>, Error<F>> {
assert!(prune_to_pos >= self.inner.get_mut().pruned_to_pos);
let prune_loc = Location::try_from(prune_to_pos).expect("valid prune_to_pos");
let mut pinned_nodes = BTreeMap::new();
for pos in F::nodes_to_pin(prune_loc) {
let digest = self.get_node(pos).await?.expect(
"pinned node should exist if prune_to_pos is no less than self.pruned_to_pos",
);
self.metadata
.put(U64::new(NODE_PREFIX, *pos), digest.to_vec());
pinned_nodes.insert(pos, digest);
}
let key: U64 = U64::new(PRUNED_TO_PREFIX, 0);
self.metadata.put(
key,
Location::try_from(prune_to_pos)?
.as_u64()
.to_be_bytes()
.into(),
);
self.metadata.sync().await.map_err(Error::Metadata)?;
Ok(pinned_nodes)
}
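    /// Returns the node at `position`, checking the in-memory structure first and falling
    /// back to the journal. Returns `Ok(None)` if the node has been pruned.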
pub async fn get_node(&self, position: Position<F>) -> Result<Option<D>, Error<F>> {
{
let inner = self.inner.read();
if let Some(node) = inner.mem.get_node(position) {
return Ok(Some(node));
}
}
match self.journal.reader().await.read(*position).await {
Ok(item) => Ok(Some(item)),
Err(JError::ItemPruned(_)) => Ok(None),
Err(e) => Err(Error::Journal(e)),
}
}
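    /// Durably persists all nodes not yet written to the journal, then prunes the in-memory
    /// structure to its current leaf count while retaining the required pinned nodes.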
pub async fn sync(&self) -> Result<(), Error<F>> {
let _sync_guard = self.sync_lock.lock().await;
let journal_size = Position::<F>::new(self.journal.size().await);
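        // Under the read lock, capture the nodes that still need to be journaled and the
        // pinned nodes required at the current pruning boundary.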
let (sync_target_leaves, missing_nodes, pinned_nodes) = {
let inner = self.inner.read();
let size = inner.mem.size();
let sync_target_leaves = inner.mem.leaves();
assert!(
journal_size <= size,
"journal size should never exceed in-memory structure size"
);
if journal_size == size {
return Ok(());
}
let mut missing_nodes = Vec::with_capacity((*size - *journal_size) as usize);
for pos in *journal_size..*size {
let node = *inner.mem.get_node_unchecked(Position::new(pos));
missing_nodes.push(node);
}
let prune_loc = Location::try_from(inner.pruned_to_pos).expect("valid pruned_to_pos");
let mut pinned_nodes = BTreeMap::new();
for pos in F::nodes_to_pin(prune_loc) {
let digest = inner.mem.get_node_unchecked(pos);
pinned_nodes.insert(pos, *digest);
}
(sync_target_leaves, missing_nodes, pinned_nodes)
};
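        // Persist the missing nodes, then prune the in-memory structure to the captured
        // leaf count and restore its pinned nodes.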
self.journal.append_many(Many::Flat(&missing_nodes)).await?;
self.journal.sync().await?;
{
let mut inner = self.inner.write();
inner
.mem
.prune(sync_target_leaves)
.expect("captured leaves is in bounds");
inner.mem.add_pinned_nodes(pinned_nodes);
}
Ok(())
}
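    /// Prunes all nodes preceding the leaf at `loc`: syncs pending nodes, records the new
    /// boundary and its pinned nodes in metadata, then prunes the journal.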
pub async fn prune(&mut self, loc: Location<F>) -> Result<(), Error<F>> {
let pos = Position::try_from(loc)?;
{
let inner = self.inner.get_mut();
if loc > inner.mem.leaves() {
return Err(Error::LeafOutOfBounds(loc));
}
if pos <= inner.pruned_to_pos {
return Ok(());
}
}
self.sync().await?;
let pinned_nodes = self.update_metadata(pos).await?;
self.journal.prune(*pos).await?;
let inner = self.inner.get_mut();
inner.mem.add_pinned_nodes(pinned_nodes);
inner.pruned_to_pos = pos;
Ok(())
}
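    /// Returns the root digest of the structure.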
pub fn root(&self) -> D {
*self.inner.read().mem.root()
}
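    /// Prunes all nodes up to the current leaf count.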
pub async fn prune_all(&mut self) -> Result<(), Error<F>> {
let leaves = self.inner.get_mut().mem.leaves();
if leaves != 0 {
self.prune(leaves).await?;
}
Ok(())
}
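    /// Destroys the underlying journal and metadata partitions.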
pub async fn destroy(self) -> Result<(), Error<F>> {
self.journal.destroy().await?;
self.metadata.destroy().await?;
Ok(())
}
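    /// Test/fuzzing helper: writes at most `write_limit` un-synced nodes to the journal,
    /// simulating a crash partway through `Self::sync`.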
#[cfg(any(test, feature = "fuzzing"))]
pub async fn simulate_partial_sync(&mut self, write_limit: usize) -> Result<(), Error<F>> {
if write_limit == 0 {
return Ok(());
}
let inner = self.inner.get_mut();
let journal_size = Position::<F>::new(self.journal.size().await);
let mut written_count = 0usize;
for i in *journal_size..*inner.mem.size() {
let node = *inner.mem.get_node_unchecked(Position::new(i));
self.journal.append(&node).await?;
written_count += 1;
if written_count >= write_limit {
break;
}
}
self.journal.sync().await?;
Ok(())
}
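    /// Test helper: returns the pinned nodes currently held by the in-memory structure.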
#[cfg(test)]
pub fn get_pinned_nodes(&self) -> BTreeMap<Position<F>, D> {
self.inner.read().mem.pinned_nodes()
}
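    /// Test helper: updates metadata for a prune to `prune_to` without pruning the journal,
    /// simulating a crash between the two steps of `Self::prune`.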
#[cfg(test)]
pub async fn simulate_pruning_failure(mut self, prune_to: Location<F>) -> Result<(), Error<F>> {
let prune_to_pos = Position::try_from(prune_to)?;
assert!(prune_to_pos <= self.inner.get_mut().mem.size());
self.sync().await?;
self.update_metadata(prune_to_pos).await?;
Ok(())
}
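    /// Applies a merkleized batch to the in-memory structure. The new nodes are persisted
    /// on the next call to `Self::sync`.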
pub fn apply_batch(&mut self, batch: &batch::MerkleizedBatch<F, D>) -> Result<(), Error<F>> {
self.inner.get_mut().mem.apply_batch(batch)?;
Ok(())
}
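    /// Creates a merkleized snapshot of the current in-memory structure, configured with
    /// this structure's thread pool (if any).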
pub(crate) fn to_batch(&self) -> Arc<batch::MerkleizedBatch<F, D>> {
let inner = self.inner.read();
let mut batch = batch::MerkleizedBatch::from_mem(&inner.mem);
#[cfg(feature = "std")]
if let Some(pool) = &self.pool {
Arc::get_mut(&mut batch).expect("just created").pool = Some(pool.clone());
}
batch
}
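    /// Runs `f` against the in-memory structure while holding the read lock.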
pub fn with_mem<R>(&self, f: impl FnOnce(&Mem<F, D>) -> R) -> R {
let inner = self.inner.read();
f(&inner.mem)
}
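    /// Starts a new batch of leaves on top of the current structure.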
pub fn new_batch(&self) -> UnmerkleizedBatch<F, D> {
let inner = self.inner.read();
let root = batch::MerkleizedBatch::from_mem(&inner.mem);
drop(inner);
UnmerkleizedBatch {
inner: root.new_batch(),
}
.with_pool(self.pool())
}
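    /// Returns a clone of the thread pool used for parallel merkleization, if any.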
pub fn pool(&self) -> Option<ThreadPool> {
self.pool.clone()
}
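    /// Removes the last `leaves_to_remove` leaves, rewinding the journal and either
    /// truncating or rebuilding the in-memory structure. Fails with `Error::ElementPruned`
    /// if the target falls in the pruned region, or `Error::Empty` if more leaves are
    /// removed than exist and nothing has been pruned.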
pub(crate) async fn rewind(
&mut self,
leaves_to_remove: usize,
hasher: &impl Hasher<F, Digest = D>,
) -> Result<(), Error<F>> {
if leaves_to_remove == 0 {
return Ok(());
}
let current_leaves = *self.leaves();
let destination_leaf = match current_leaves.checked_sub(leaves_to_remove as u64) {
Some(dest) => dest,
None => {
let pruned_to_pos = self.inner.get_mut().pruned_to_pos;
return Err(if pruned_to_pos == 0 {
Error::Empty
} else {
Error::ElementPruned(pruned_to_pos - 1)
});
}
};
let destination_loc = Location::new(destination_leaf);
let new_size = Position::try_from(destination_loc).expect("valid leaf");
let pruned_to_pos = self.inner.get_mut().pruned_to_pos;
if new_size < pruned_to_pos {
return Err(Error::ElementPruned(new_size));
}
let journal_size = Position::<F>::new(self.journal.size().await);
if new_size < journal_size {
self.journal.rewind(*new_size).await?;
self.journal.sync().await?;
}
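        // If the rewind target is still covered by the in-memory structure, truncate it in
        // place; otherwise rebuild it from pinned nodes recovered from metadata or the
        // journal.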
let inner = self.inner.get_mut();
if new_size >= Position::try_from(inner.mem.bounds().start).expect("valid mem bounds start")
{
inner.mem.truncate(new_size, hasher);
} else {
let mut pinned_nodes = Vec::new();
for pos in F::nodes_to_pin(destination_loc) {
pinned_nodes.push(
Self::get_from_metadata_or_journal(&self.metadata, &self.journal, pos).await?,
);
}
inner.mem = Mem::from_components(hasher, vec![], destination_loc, pinned_nodes)?;
Self::add_extra_pinned_nodes(
&mut inner.mem,
&self.metadata,
&self.journal,
inner.pruned_to_pos,
)
.await?;
}
Ok(())
}
}
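// Synchronous, in-memory read access used for building proofs over the current structure.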
impl<F: Family, E: RStorage + Clock + Metrics, D: Digest> Readable for Journaled<F, E, D> {
type Family = F;
type Digest = D;
type Error = Error<F>;
fn size(&self) -> Position<F> {
self.size()
}
fn get_node(&self, pos: Position<F>) -> Option<D> {
self.inner.read().mem.get_node(pos)
}
fn root(&self) -> D {
*self.inner.read().mem.root()
}
fn pruning_boundary(&self) -> Location<F> {
self.inner.read().mem.pruning_boundary()
}
fn proof(
&self,
hasher: &impl Hasher<F, Digest = D>,
loc: Location<F>,
) -> Result<Proof<F, D>, Error<F>> {
if !loc.is_valid_index() {
return Err(Error::LocationOverflow(loc));
}
crate::merkle::proof::build_range_proof(
hasher,
self.leaves(),
loc..loc + 1,
|pos| <Self as Readable>::get_node(self, pos),
Error::ElementPruned,
)
.map_err(|e| match e {
Error::RangeOutOfBounds(_) => Error::LeafOutOfBounds(loc),
_ => e,
})
}
fn range_proof(
&self,
hasher: &impl Hasher<F, Digest = D>,
range: core::ops::Range<Location<F>>,
) -> Result<Proof<F, D>, Error<F>> {
crate::merkle::proof::build_range_proof(
hasher,
self.leaves(),
range,
|pos| <Self as Readable>::get_node(self, pos),
Error::ElementPruned,
)
}
}
impl<F: Family, E: RStorage + Clock + Metrics + Sync, D: Digest> crate::merkle::storage::Storage<F>
for Journaled<F, E, D>
{
type Digest = D;
async fn size(&self) -> Position<F> {
self.size()
}
async fn get_node(&self, position: Position<F>) -> Result<Option<D>, Error<F>> {
Self::get_node(self, position).await
}
}
impl<F: Family, E: RStorage + Clock + Metrics, D: Digest> Journaled<F, E, D> {
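    /// Generates a proof of inclusion for the leaf at `loc` against the historical state of
    /// the structure when it contained `leaves` leaves.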
pub async fn historical_proof(
&self,
hasher: &impl Hasher<F, Digest = D>,
leaves: Location<F>,
loc: Location<F>,
) -> Result<Proof<F, D>, Error<F>> {
if !loc.is_valid_index() {
return Err(Error::LocationOverflow(loc));
}
self.historical_range_proof(hasher, leaves, loc..loc + 1)
.await
}
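    /// Generates a proof of inclusion for the leaves in `range` against the historical
    /// state of the structure when it contained `leaves` leaves.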
pub async fn historical_range_proof(
&self,
hasher: &impl Hasher<F, Digest = D>,
leaves: Location<F>,
range: core::ops::Range<Location<F>>,
) -> Result<Proof<F, D>, Error<F>> {
if leaves > self.leaves() {
return Err(Error::RangeOutOfBounds(leaves));
}
crate::merkle::verification::historical_range_proof(hasher, self, leaves, range).await
}
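    /// Generates a proof of inclusion for the leaf at `loc` against the current state.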
pub async fn proof(
&self,
hasher: &impl Hasher<F, Digest = D>,
loc: Location<F>,
) -> Result<Proof<F, D>, Error<F>> {
if !loc.is_valid_index() {
return Err(Error::LocationOverflow(loc));
}
self.range_proof(hasher, loc..loc + 1).await
}
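    /// Generates a proof of inclusion for the leaves in `range` against the current state.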
pub async fn range_proof(
&self,
hasher: &impl Hasher<F, Digest = D>,
range: core::ops::Range<Location<F>>,
) -> Result<Proof<F, D>, Error<F>> {
self.historical_range_proof(hasher, self.leaves(), range)
.await
}
}
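// Tests exercise both structure families (`mmr` and `mmb`) via generic helpers.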
#[cfg(test)]
mod tests {
use super::*;
use crate::{
journal::contiguous::fixed::{Config as JConfig, Journal},
merkle::{hasher::Standard, mmb, mmr, Location, LocationRangeExt as _, Position, Proof},
metadata::{Config as MConfig, Metadata},
};
use commonware_cryptography::{
sha256::{self, Digest},
Hasher as _, Sha256,
};
use commonware_macros::test_traced;
use commonware_runtime::{buffer::paged::CacheRef, deterministic, BufferPooler, Runner};
use commonware_utils::{sequence::prefixed_u64::U64, NZUsize, NZU16, NZU64};
use std::{
collections::BTreeMap,
num::{NonZeroU16, NonZeroUsize},
};
fn test_digest(v: usize) -> Digest {
Sha256::hash(&v.to_be_bytes())
}
const PAGE_SIZE: NonZeroU16 = NZU16!(111);
const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(5);
fn test_config(pooler: &impl BufferPooler) -> Config {
Config {
journal_partition: "journal-partition".into(),
metadata_partition: "metadata-partition".into(),
items_per_blob: NZU64!(7),
write_buffer: NZUsize!(1024),
thread_pool: None,
page_cache: CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE),
}
}
async fn journaled_empty_inner<F: Family>(context: deterministic::Context) {
let hasher: Standard<Sha256> = Standard::new();
let mut mmr = Journaled::<F, _, Digest>::init(
context.with_label("first"),
&hasher,
test_config(&context),
)
.await
.unwrap();
assert_eq!(mmr.size(), 0);
assert!(mmr.get_node(Position::<F>::new(0)).await.is_err());
let bounds = mmr.bounds();
assert!(bounds.is_empty());
assert!(mmr.prune_all().await.is_ok());
assert_eq!(bounds.start, 0);
assert!(mmr.prune(Location::<F>::new(0)).await.is_ok());
assert!(mmr.sync().await.is_ok());
assert!(matches!(mmr.rewind(1, &hasher).await, Err(Error::Empty)));
let batch = mmr.new_batch().add(&hasher, &test_digest(0));
let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
mmr.apply_batch(&batch).unwrap();
assert_eq!(mmr.size(), 1);
mmr.sync().await.unwrap();
assert!(mmr.get_node(Position::<F>::new(0)).await.is_ok());
assert!(mmr.rewind(1, &hasher).await.is_ok());
assert_eq!(mmr.size(), 0);
mmr.sync().await.unwrap();
let mut mmr = Journaled::<F, _, Digest>::init(
context.with_label("second"),
&hasher,
test_config(&context),
)
.await
.unwrap();
assert_eq!(mmr.size(), 0);
let empty_proof = Proof::<F, Digest>::default();
let hasher: Standard<Sha256> = Standard::new();
let root = mmr.root();
assert!(empty_proof.verify_range_inclusion(
&hasher,
&[] as &[Digest],
Location::<F>::new(0),
&root
));
assert!(empty_proof.verify_multi_inclusion(
&hasher,
&[] as &[(Digest, Location<F>)],
&root
));
let batch = mmr.new_batch().add(&hasher, &test_digest(0));
let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
mmr.apply_batch(&batch).unwrap();
let root = mmr.root();
assert!(!empty_proof.verify_range_inclusion(
&hasher,
&[] as &[Digest],
Location::<F>::new(0),
&root
));
assert!(!empty_proof.verify_multi_inclusion(
&hasher,
&[] as &[(Digest, Location<F>)],
&root
));
mmr.destroy().await.unwrap();
}
#[test_traced]
fn test_journaled_empty_mmr() {
let executor = deterministic::Runner::default();
executor.start(journaled_empty_inner::<mmr::Family>);
}
#[test_traced]
fn test_journaled_empty_mmb() {
let executor = deterministic::Runner::default();
executor.start(journaled_empty_inner::<mmb::Family>);
}
async fn journaled_prune_out_of_bounds_returns_error_inner<F: Family>(
context: deterministic::Context,
) {
let hasher = Standard::<Sha256>::new();
let mut mmr = Journaled::<F, _, Digest>::init(
context.with_label("oob_prune"),
&hasher,
test_config(&context),
)
.await
.unwrap();
let batch = mmr.new_batch().add(&hasher, &test_digest(0));
let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
mmr.apply_batch(&batch).unwrap();
assert!(matches!(
mmr.prune(Location::<F>::new(2)).await,
Err(Error::LeafOutOfBounds(loc)) if loc == Location::<F>::new(2)
));
mmr.destroy().await.unwrap();
}
#[test_traced]
fn test_journaled_prune_out_of_bounds_returns_error_mmr() {
let executor = deterministic::Runner::default();
executor.start(journaled_prune_out_of_bounds_returns_error_inner::<mmr::Family>);
}
#[test_traced]
fn test_journaled_prune_out_of_bounds_returns_error_mmb() {
let executor = deterministic::Runner::default();
executor.start(journaled_prune_out_of_bounds_returns_error_inner::<mmb::Family>);
}
async fn journaled_rewind_error_leaves_valid_state_inner<F: Family>(
context: deterministic::Context,
) {
let hasher: Standard<Sha256> = Standard::new();
let element_pruned_context = context.with_label("element_pruned_case");
let mut mmr = Journaled::<F, _, Digest>::init(
element_pruned_context.clone(),
&hasher,
test_config(&element_pruned_context),
)
.await
.unwrap();
let mut batch = mmr.new_batch();
for i in 0u64..32 {
batch = batch.add(&hasher, &i.to_be_bytes());
}
let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
mmr.apply_batch(&batch).unwrap();
mmr.prune(Location::<F>::new(8)).await.unwrap();
let leaves_before = mmr.leaves();
assert!(matches!(
mmr.rewind(128, &hasher).await,
Err(Error::ElementPruned(_))
));
assert!(mmr.leaves() <= leaves_before);
mmr.destroy().await.unwrap();
let empty_context = context.with_label("empty_case");
let cfg = test_config(&empty_context);
let mut mmr = Journaled::<F, _, Digest>::init(empty_context, &hasher, cfg)
.await
.unwrap();
let mut batch = mmr.new_batch();
for i in 0u64..8 {
batch = batch.add(&hasher, &i.to_be_bytes());
}
let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
mmr.apply_batch(&batch).unwrap();
let leaves_before = mmr.leaves();
assert!(matches!(mmr.rewind(9, &hasher).await, Err(Error::Empty)));
assert_eq!(mmr.leaves(), leaves_before);
mmr.destroy().await.unwrap();
}
#[test_traced]
fn test_journaled_rewind_error_leaves_valid_state_mmr() {
let executor = deterministic::Runner::default();
executor.start(journaled_rewind_error_leaves_valid_state_inner::<mmr::Family>);
}
#[test_traced]
fn test_journaled_rewind_error_leaves_valid_state_mmb() {
let executor = deterministic::Runner::default();
executor.start(journaled_rewind_error_leaves_valid_state_inner::<mmb::Family>);
}
async fn journaled_basic_inner<F: Family>(context: deterministic::Context) {
let hasher: Standard<Sha256> = Standard::new();
let cfg = test_config(&context);
let mut mmr = Journaled::<F, _, Digest>::init(context, &hasher, cfg)
.await
.unwrap();
const LEAF_COUNT: usize = 255;
let mut leaves = Vec::with_capacity(LEAF_COUNT);
for i in 0..LEAF_COUNT {
leaves.push(test_digest(i));
}
let mut batch = mmr.new_batch();
for leaf in &leaves {
batch = batch.add(&hasher, leaf);
}
let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
mmr.apply_batch(&batch).unwrap();
let expected_size = Position::<F>::try_from(Location::<F>::new(LEAF_COUNT as u64)).unwrap();
assert_eq!(mmr.size(), expected_size);
const TEST_ELEMENT: usize = 133;
let test_element_loc: Location<F> = Location::new(TEST_ELEMENT as u64);
let proof = mmr.proof(&hasher, test_element_loc).await.unwrap();
let root = mmr.root();
assert!(proof.verify_element_inclusion(
&hasher,
&leaves[TEST_ELEMENT],
test_element_loc,
&root,
));
mmr.sync().await.unwrap();
let proof2 = mmr.proof(&hasher, test_element_loc).await.unwrap();
assert_eq!(proof, proof2);
let range = Location::<F>::new(TEST_ELEMENT as u64)..Location::<F>::new(LEAF_COUNT as u64);
let proof = mmr.range_proof(&hasher, range.clone()).await.unwrap();
assert!(proof.verify_range_inclusion(
&hasher,
&leaves[range.to_usize_range()],
test_element_loc,
&root
));
mmr.destroy().await.unwrap();
}
#[test_traced]
fn test_journaled_basic_mmr() {
let executor = deterministic::Runner::default();
executor.start(journaled_basic_inner::<mmr::Family>);
}
#[test_traced]
fn test_journaled_basic_mmb() {
let executor = deterministic::Runner::default();
executor.start(journaled_basic_inner::<mmb::Family>);
}
async fn journaled_recovery_inner<F: Family>(context: deterministic::Context) {
use crate::journal::contiguous::fixed::{Config as JConfig, Journal};
let hasher: Standard<Sha256> = Standard::new();
let mut mmr = Journaled::<F, _, Digest>::init(
context.with_label("first"),
&hasher,
test_config(&context),
)
.await
.unwrap();
assert_eq!(mmr.size(), 0);
const LEAF_COUNT: usize = 252;
let mut leaves = Vec::with_capacity(LEAF_COUNT);
for i in 0..LEAF_COUNT {
leaves.push(test_digest(i));
}
let mut batch = mmr.new_batch();
for leaf in &leaves {
batch = batch.add(&hasher, leaf);
}
let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
mmr.apply_batch(&batch).unwrap();
let expected_size = Position::<F>::try_from(Location::<F>::new(LEAF_COUNT as u64)).unwrap();
assert_eq!(mmr.size(), expected_size);
mmr.sync().await.unwrap();
drop(mmr);
{
let journal: Journal<_, Digest> = Journal::init(
context.with_label("corrupt"),
JConfig {
partition: "journal-partition".into(),
items_per_blob: NZU64!(7),
write_buffer: NZUsize!(1024),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
},
)
.await
.unwrap();
assert_eq!(journal.size().await, expected_size);
journal.append(&Sha256::hash(b"orphan")).await.unwrap();
journal.sync().await.unwrap();
assert_eq!(journal.size().await, expected_size + 1);
}
let mmr = Journaled::<F, _, Digest>::init(
context.with_label("second"),
&hasher,
test_config(&context),
)
.await
.unwrap();
let recovered_size =
Position::<F>::try_from(Location::<F>::new(LEAF_COUNT as u64 + 1)).unwrap();
assert_eq!(mmr.size(), recovered_size);
drop(mmr);
let mmr = Journaled::<F, _, Digest>::init(
context.with_label("third"),
&hasher,
test_config(&context),
)
.await
.unwrap();
assert_eq!(mmr.size(), recovered_size);
mmr.destroy().await.unwrap();
}
#[test_traced]
fn test_journaled_recovery_mmr() {
let executor = deterministic::Runner::default();
executor.start(journaled_recovery_inner::<mmr::Family>);
}
#[test_traced]
fn test_journaled_recovery_mmb() {
let executor = deterministic::Runner::default();
executor.start(journaled_recovery_inner::<mmb::Family>);
}
async fn journaled_pruning_inner<F: Family>(context: deterministic::Context) {
let hasher: Standard<Sha256> = Standard::new();
const LEAF_COUNT: usize = 2000;
let cfg_pruned = test_config(&context);
let mut pruned_mmr = Journaled::<F, _, Digest>::init(
context.with_label("pruned"),
&hasher,
cfg_pruned.clone(),
)
.await
.unwrap();
let cfg_unpruned = Config {
journal_partition: "unpruned-journal-partition".into(),
metadata_partition: "unpruned-metadata-partition".into(),
items_per_blob: NZU64!(7),
write_buffer: NZUsize!(1024),
thread_pool: None,
page_cache: cfg_pruned.page_cache.clone(),
};
let mut mmr =
Journaled::<F, _, Digest>::init(context.with_label("unpruned"), &hasher, cfg_unpruned)
.await
.unwrap();
let mut leaves = Vec::with_capacity(LEAF_COUNT);
for i in 0..LEAF_COUNT {
leaves.push(test_digest(i));
}
let mut batch = mmr.new_batch();
for leaf in &leaves {
batch = batch.add(&hasher, leaf);
}
let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
mmr.apply_batch(&batch).unwrap();
let mut batch = pruned_mmr.new_batch();
for leaf in &leaves {
batch = batch.add(&hasher, leaf);
}
let batch = pruned_mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
pruned_mmr.apply_batch(&batch).unwrap();
let expected_size = Position::<F>::try_from(Location::<F>::new(LEAF_COUNT as u64)).unwrap();
assert_eq!(mmr.size(), expected_size);
assert_eq!(pruned_mmr.size(), expected_size);
for i in 0usize..300 {
let prune_loc = Location::<F>::new(std::cmp::min(i as u64 * 10, *pruned_mmr.leaves()));
pruned_mmr.prune(prune_loc).await.unwrap();
assert_eq!(prune_loc, pruned_mmr.bounds().start);
let digest = test_digest(LEAF_COUNT + i);
leaves.push(digest);
let last_leaf = leaves.last().unwrap();
let batch = pruned_mmr.new_batch().add(&hasher, last_leaf);
let batch = pruned_mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
pruned_mmr.apply_batch(&batch).unwrap();
let batch = mmr.new_batch().add(&hasher, last_leaf);
let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
mmr.apply_batch(&batch).unwrap();
assert_eq!(pruned_mmr.root(), mmr.root());
}
pruned_mmr.sync().await.unwrap();
assert_eq!(pruned_mmr.root(), mmr.root());
pruned_mmr.sync().await.unwrap();
drop(pruned_mmr);
let mut pruned_mmr = Journaled::<F, _, Digest>::init(
context.with_label("pruned_reopen"),
&hasher,
cfg_pruned.clone(),
)
.await
.unwrap();
assert_eq!(pruned_mmr.root(), mmr.root());
let size = pruned_mmr.size();
pruned_mmr.prune_all().await.unwrap();
assert_eq!(pruned_mmr.root(), mmr.root());
let bounds = pruned_mmr.bounds();
assert!(bounds.is_empty());
assert_eq!(bounds.start, Location::<F>::try_from(size).unwrap());
let batch = mmr.new_batch().add(&hasher, &test_digest(LEAF_COUNT));
let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
mmr.apply_batch(&batch).unwrap();
let batch = pruned_mmr
.new_batch()
.add(&hasher, &test_digest(LEAF_COUNT));
let batch = pruned_mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
pruned_mmr.apply_batch(&batch).unwrap();
assert!(*pruned_mmr.size() % cfg_pruned.items_per_blob != 0);
pruned_mmr.sync().await.unwrap();
drop(pruned_mmr);
let mut pruned_mmr = Journaled::<F, _, Digest>::init(
context.with_label("pruned_reopen2"),
&hasher,
cfg_pruned.clone(),
)
.await
.unwrap();
assert_eq!(pruned_mmr.root(), mmr.root());
let bounds = pruned_mmr.bounds();
assert!(!bounds.is_empty());
assert_eq!(bounds.start, Location::<F>::try_from(size).unwrap());
assert!(pruned_mmr
.prune(Location::<F>::try_from(size).unwrap() - 1)
.await
.is_ok());
assert_eq!(
pruned_mmr.bounds().start,
Location::<F>::try_from(size).unwrap()
);
while *pruned_mmr.size() % cfg_pruned.items_per_blob != 0 {
let batch = pruned_mmr
.new_batch()
.add(&hasher, &test_digest(LEAF_COUNT));
let batch = pruned_mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
pruned_mmr.apply_batch(&batch).unwrap();
}
pruned_mmr.prune_all().await.unwrap();
assert!(pruned_mmr.bounds().is_empty());
pruned_mmr.destroy().await.unwrap();
mmr.destroy().await.unwrap();
}
#[test_traced]
fn test_journaled_pruning_mmr() {
let executor = deterministic::Runner::default();
executor.start(journaled_pruning_inner::<mmr::Family>);
}
#[test_traced]
fn test_journaled_pruning_mmb() {
let executor = deterministic::Runner::default();
executor.start(journaled_pruning_inner::<mmb::Family>);
}
async fn journaled_recovery_with_pruning_inner<F: Family>(context: deterministic::Context) {
let hasher: Standard<Sha256> = Standard::new();
const LEAF_COUNT: usize = 2000;
let mut leaves = Vec::with_capacity(LEAF_COUNT);
let mut mmr = Journaled::<F, _, Digest>::init(
context.with_label("init"),
&hasher,
test_config(&context),
)
.await
.unwrap();
for i in 0..LEAF_COUNT {
leaves.push(test_digest(i));
}
let mut batch = mmr.new_batch();
for leaf in &leaves {
batch = batch.add(&hasher, leaf);
}
let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
mmr.apply_batch(&batch).unwrap();
let expected_size = Position::<F>::try_from(Location::<F>::new(LEAF_COUNT as u64)).unwrap();
assert_eq!(mmr.size(), expected_size);
mmr.sync().await.unwrap();
drop(mmr);
for i in 0usize..200 {
let label = format!("iter_{i}");
let mut mmr = Journaled::<F, _, Digest>::init(
context.with_label(&label),
&hasher,
test_config(&context),
)
.await
.unwrap();
let start_size = mmr.size();
let start_leaves = *mmr.leaves();
let prune_loc = Location::<F>::new(std::cmp::min(i as u64 * 50, start_leaves));
if i % 5 == 0 {
mmr.simulate_pruning_failure(prune_loc).await.unwrap();
continue;
}
mmr.prune(prune_loc).await.unwrap();
for j in 0..10 {
let digest = test_digest(100 * (i + 1) + j);
leaves.push(digest);
let batch = mmr
.new_batch()
.add(&hasher, leaves.last().unwrap())
.add(&hasher, leaves.last().unwrap());
let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
mmr.apply_batch(&batch).unwrap();
let digest = test_digest(LEAF_COUNT + i);
leaves.push(digest);
let batch = mmr
.new_batch()
.add(&hasher, leaves.last().unwrap())
.add(&hasher, leaves.last().unwrap());
let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
mmr.apply_batch(&batch).unwrap();
}
let end_size = mmr.size();
let total_to_write = (*end_size - *start_size) as usize;
let partial_write_limit = i % total_to_write;
mmr.simulate_partial_sync(partial_write_limit)
.await
.unwrap();
}
let mmr = Journaled::<F, _, Digest>::init(
context.with_label("final"),
&hasher,
test_config(&context),
)
.await
.unwrap();
mmr.destroy().await.unwrap();
}
#[test_traced("WARN")]
fn test_journaled_recovery_with_pruning_mmr() {
let executor = deterministic::Runner::default();
executor.start(journaled_recovery_with_pruning_inner::<mmr::Family>);
}
#[test_traced("WARN")]
fn test_journaled_recovery_with_pruning_mmb() {
let executor = deterministic::Runner::default();
executor.start(journaled_recovery_with_pruning_inner::<mmb::Family>);
}
async fn journaled_historical_proof_basic_inner<F: Family>(context: deterministic::Context) {
let hasher = Standard::<Sha256>::new();
let cfg = test_config(&context);
let mut mmr = Journaled::<F, _, Digest>::init(context, &hasher, cfg)
.await
.unwrap();
let mut elements = Vec::new();
for i in 0..10 {
elements.push(test_digest(i));
}
let mut batch = mmr.new_batch();
for elt in &elements {
batch = batch.add(&hasher, elt);
}
let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
mmr.apply_batch(&batch).unwrap();
let original_leaves = mmr.leaves();
let historical_proof = mmr
.historical_range_proof(
&hasher,
original_leaves,
Location::<F>::new(2)..Location::<F>::new(6),
)
.await
.unwrap();
assert_eq!(historical_proof.leaves, original_leaves);
let root = mmr.root();
assert!(historical_proof.verify_range_inclusion(
&hasher,
&elements[2..6],
Location::<F>::new(2),
&root
));
let regular_proof = mmr
.range_proof(&hasher, Location::<F>::new(2)..Location::<F>::new(6))
.await
.unwrap();
assert_eq!(regular_proof.leaves, historical_proof.leaves);
assert_eq!(regular_proof.digests, historical_proof.digests);
for i in 10..20 {
elements.push(test_digest(i));
}
let mut batch = mmr.new_batch();
for elt in &elements[10..20] {
batch = batch.add(&hasher, elt);
}
let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
mmr.apply_batch(&batch).unwrap();
let new_historical_proof = mmr
.historical_range_proof(
&hasher,
original_leaves,
Location::<F>::new(2)..Location::<F>::new(6),
)
.await
.unwrap();
assert_eq!(new_historical_proof.leaves, historical_proof.leaves);
assert_eq!(new_historical_proof.digests, historical_proof.digests);
mmr.destroy().await.unwrap();
}
#[test_traced]
fn test_journaled_historical_proof_basic_mmr() {
let executor = deterministic::Runner::default();
executor.start(journaled_historical_proof_basic_inner::<mmr::Family>);
}
#[test_traced]
fn test_journaled_historical_proof_basic_mmb() {
let executor = deterministic::Runner::default();
executor.start(journaled_historical_proof_basic_inner::<mmb::Family>);
}
async fn journaled_historical_proof_with_pruning_inner<F: Family>(
context: deterministic::Context,
) {
let hasher = Standard::<Sha256>::new();
let mut mmr = Journaled::<F, _, Digest>::init(
context.with_label("main"),
&hasher,
test_config(&context),
)
.await
.unwrap();
let mut elements = Vec::new();
for i in 0..50 {
elements.push(test_digest(i));
}
let mut batch = mmr.new_batch();
for elt in &elements {
batch = batch.add(&hasher, elt);
}
let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
mmr.apply_batch(&batch).unwrap();
let prune_loc = Location::<F>::new(16);
mmr.prune(prune_loc).await.unwrap();
let mut ref_mmr = Journaled::<F, _, Digest>::init(
context.with_label("ref"),
&hasher,
Config {
journal_partition: "ref-journal-pruned".into(),
metadata_partition: "ref-metadata-pruned".into(),
items_per_blob: NZU64!(7),
write_buffer: NZUsize!(1024),
thread_pool: None,
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
},
)
.await
.unwrap();
let mut batch = ref_mmr.new_batch();
for elt in elements.iter().take(41) {
batch = batch.add(&hasher, elt);
}
let batch = ref_mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
ref_mmr.apply_batch(&batch).unwrap();
let historical_leaves = ref_mmr.leaves();
let historical_root = ref_mmr.root();
let historical_proof = mmr
.historical_range_proof(
&hasher,
historical_leaves,
Location::<F>::new(35)..Location::<F>::new(39),
)
.await
.unwrap();
assert_eq!(historical_proof.leaves, historical_leaves);
assert!(historical_proof.verify_range_inclusion(
&hasher,
&elements[35..39],
Location::<F>::new(35),
&historical_root
));
ref_mmr.destroy().await.unwrap();
mmr.destroy().await.unwrap();
}
#[test_traced]
fn test_journaled_historical_proof_with_pruning_mmr() {
let executor = deterministic::Runner::default();
executor.start(journaled_historical_proof_with_pruning_inner::<mmr::Family>);
}
#[test_traced]
fn test_journaled_historical_proof_with_pruning_mmb() {
let executor = deterministic::Runner::default();
executor.start(journaled_historical_proof_with_pruning_inner::<mmb::Family>);
}
async fn journaled_historical_proof_large_inner<F: Family>(context: deterministic::Context) {
let hasher = Standard::<Sha256>::new();
let mut mmr = Journaled::<F, _, Digest>::init(
context.with_label("server"),
&hasher,
Config {
journal_partition: "server-journal".into(),
metadata_partition: "server-metadata".into(),
items_per_blob: NZU64!(7),
write_buffer: NZUsize!(1024),
thread_pool: None,
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
},
)
.await
.unwrap();
let mut elements = Vec::new();
for i in 0..100 {
elements.push(test_digest(i));
}
let mut batch = mmr.new_batch();
for elt in &elements {
batch = batch.add(&hasher, elt);
}
let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
mmr.apply_batch(&batch).unwrap();
let range = Location::<F>::new(30)..Location::<F>::new(61);
let mut ref_mmr = Journaled::<F, _, Digest>::init(
context.with_label("client"),
&hasher,
Config {
journal_partition: "client-journal".into(),
metadata_partition: "client-metadata".into(),
items_per_blob: NZU64!(7),
write_buffer: NZUsize!(1024),
thread_pool: None,
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
},
)
.await
.unwrap();
let mut batch = ref_mmr.new_batch();
for elt in elements.iter().take(*range.end as usize) {
batch = batch.add(&hasher, elt);
}
let batch = ref_mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
ref_mmr.apply_batch(&batch).unwrap();
let historical_leaves = ref_mmr.leaves();
let expected_root = ref_mmr.root();
let proof = mmr
.historical_range_proof(&hasher, historical_leaves, range.clone())
.await
.unwrap();
assert!(proof.verify_range_inclusion(
&hasher,
&elements[range.to_usize_range()],
range.start,
            &expected_root
        ));
ref_mmr.destroy().await.unwrap();
mmr.destroy().await.unwrap();
}
#[test_traced]
fn test_journaled_historical_proof_large_mmr() {
let executor = deterministic::Runner::default();
executor.start(journaled_historical_proof_large_inner::<mmr::Family>);
}
#[test_traced]
fn test_journaled_historical_proof_large_mmb() {
let executor = deterministic::Runner::default();
executor.start(journaled_historical_proof_large_inner::<mmb::Family>);
}
async fn journaled_historical_proof_singleton_inner<F: Family>(
context: deterministic::Context,
) {
let hasher = Standard::<Sha256>::new();
let cfg = test_config(&context);
let mut mmr = Journaled::<F, _, Digest>::init(context, &hasher, cfg)
.await
.unwrap();
let element = test_digest(0);
let batch = mmr.new_batch().add(&hasher, &element);
let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
mmr.apply_batch(&batch).unwrap();
let single_proof = mmr
.historical_range_proof(
&hasher,
Location::<F>::new(1),
Location::<F>::new(0)..Location::<F>::new(1),
)
.await
.unwrap();
let root = mmr.root();
assert!(single_proof.verify_range_inclusion(
&hasher,
&[element],
Location::<F>::new(0),
&root
));
mmr.destroy().await.unwrap();
}
#[test_traced]
fn test_journaled_historical_proof_singleton_mmr() {
let executor = deterministic::Runner::default();
executor.start(journaled_historical_proof_singleton_inner::<mmr::Family>);
}
#[test_traced]
fn test_journaled_historical_proof_singleton_mmb() {
let executor = deterministic::Runner::default();
executor.start(journaled_historical_proof_singleton_inner::<mmb::Family>);
}
async fn journaled_init_sync_empty_inner<F: Family>(context: deterministic::Context) {
let hasher = Standard::<Sha256>::new();
let sync_cfg = SyncConfig::<F, sha256::Digest> {
config: test_config(&context),
range: Location::<F>::new(0)..Location::<F>::new(52),
pinned_nodes: None,
};
let mut sync_mmr = Journaled::<F, _, Digest>::init_sync(context.clone(), sync_cfg, &hasher)
.await
.unwrap();
assert_eq!(sync_mmr.size(), 0);
let bounds = sync_mmr.bounds();
assert_eq!(bounds.start, 0);
assert!(bounds.is_empty());
let new_element = test_digest(999);
let batch = sync_mmr.new_batch().add(&hasher, &new_element);
let batch = sync_mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
sync_mmr.apply_batch(&batch).unwrap();
let _root = sync_mmr.root();
sync_mmr.destroy().await.unwrap();
}
#[test_traced]
fn test_journaled_init_sync_empty_mmr() {
let executor = deterministic::Runner::default();
executor.start(journaled_init_sync_empty_inner::<mmr::Family>);
}
#[test_traced]
fn test_journaled_init_sync_empty_mmb() {
let executor = deterministic::Runner::default();
executor.start(journaled_init_sync_empty_inner::<mmb::Family>);
}
async fn journaled_init_sync_nonempty_exact_match_inner<F: Family>(
context: deterministic::Context,
) {
let hasher = Standard::<Sha256>::new();
let mut mmr = Journaled::<F, _, Digest>::init(
context.with_label("init"),
&hasher,
test_config(&context),
)
.await
.unwrap();
let mut batch = mmr.new_batch();
for i in 0..50 {
batch = batch.add(&hasher, &test_digest(i));
}
let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
mmr.apply_batch(&batch).unwrap();
mmr.sync().await.unwrap();
let original_size = mmr.size();
let original_leaves = mmr.leaves();
let original_root = mmr.root();
let lower_bound_loc = mmr.bounds().start;
let upper_bound_loc = mmr.leaves();
let lower_bound_pos = Position::<F>::try_from(lower_bound_loc).unwrap();
let upper_bound_pos = mmr.size();
let mut expected_nodes = BTreeMap::new();
for i in *lower_bound_pos..*upper_bound_pos {
expected_nodes.insert(
Position::<F>::new(i),
mmr.get_node(Position::<F>::new(i)).await.unwrap().unwrap(),
);
}
let sync_cfg = SyncConfig::<F, sha256::Digest> {
config: test_config(&context),
range: lower_bound_loc..upper_bound_loc,
pinned_nodes: None,
};
mmr.sync().await.unwrap();
drop(mmr);
let sync_mmr =
Journaled::<F, _, Digest>::init_sync(context.with_label("sync"), sync_cfg, &hasher)
.await
.unwrap();
assert_eq!(sync_mmr.size(), original_size);
assert_eq!(sync_mmr.leaves(), original_leaves);
let bounds = sync_mmr.bounds();
assert_eq!(bounds.start, lower_bound_loc);
assert!(!bounds.is_empty());
assert_eq!(sync_mmr.root(), original_root);
for pos in *lower_bound_pos..*upper_bound_pos {
let pos = Position::<F>::new(pos);
assert_eq!(
sync_mmr.get_node(pos).await.unwrap(),
expected_nodes.get(&pos).cloned()
);
}
sync_mmr.destroy().await.unwrap();
}
#[test_traced]
fn test_journaled_init_sync_nonempty_exact_match_mmr() {
let executor = deterministic::Runner::default();
executor.start(journaled_init_sync_nonempty_exact_match_inner::<mmr::Family>);
}
#[test_traced]
fn test_journaled_init_sync_nonempty_exact_match_mmb() {
let executor = deterministic::Runner::default();
executor.start(journaled_init_sync_nonempty_exact_match_inner::<mmb::Family>);
}
async fn journaled_init_sync_partial_overlap_inner<F: Family>(context: deterministic::Context) {
let hasher = Standard::<Sha256>::new();
let mut mmr = Journaled::<F, _, Digest>::init(
context.with_label("init"),
&hasher,
test_config(&context),
)
.await
.unwrap();
let mut batch = mmr.new_batch();
for i in 0..30 {
batch = batch.add(&hasher, &test_digest(i));
}
let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
mmr.apply_batch(&batch).unwrap();
mmr.sync().await.unwrap();
mmr.prune(Location::<F>::new(6)).await.unwrap();
let original_size = mmr.size();
let original_leaves = mmr.leaves();
let original_root = mmr.root();
let original_pruning_boundary = mmr.bounds().start;
let original_pruning_pos = Position::<F>::try_from(original_pruning_boundary).unwrap();
let lower_bound_loc = original_pruning_boundary;
let upper_bound_loc = original_leaves + 6;
let mut expected_nodes = BTreeMap::new();
for i in *original_pruning_pos..*original_size {
let pos = Position::<F>::new(i);
expected_nodes.insert(pos, mmr.get_node(pos).await.unwrap().unwrap());
}
let sync_cfg = SyncConfig::<F, sha256::Digest> {
config: test_config(&context),
range: lower_bound_loc..upper_bound_loc,
pinned_nodes: None,
};
mmr.sync().await.unwrap();
drop(mmr);
let sync_mmr =
Journaled::<F, _, Digest>::init_sync(context.with_label("sync"), sync_cfg, &hasher)
.await
.unwrap();
assert_eq!(sync_mmr.size(), original_size);
let bounds = sync_mmr.bounds();
assert_eq!(bounds.start, lower_bound_loc);
assert!(!bounds.is_empty());
assert_eq!(sync_mmr.root(), original_root);
for i in *original_pruning_pos..*original_size {
let pos = Position::<F>::new(i);
assert_eq!(
sync_mmr.get_node(pos).await.unwrap(),
expected_nodes.get(&pos).cloned()
);
}
sync_mmr.destroy().await.unwrap();
}
#[test_traced]
fn test_journaled_init_sync_partial_overlap_mmr() {
let executor = deterministic::Runner::default();
executor.start(journaled_init_sync_partial_overlap_inner::<mmr::Family>);
}
#[test_traced]
fn test_journaled_init_sync_partial_overlap_mmb() {
let executor = deterministic::Runner::default();
executor.start(journaled_init_sync_partial_overlap_inner::<mmb::Family>);
}
async fn journaled_init_sync_rejects_extra_pinned_nodes_inner<F: Family>(
context: deterministic::Context,
) {
let hasher = Standard::<Sha256>::new();
let sync_cfg = SyncConfig::<F, sha256::Digest> {
config: test_config(&context),
range: Location::<F>::new(6)..Location::<F>::new(20),
pinned_nodes: Some(vec![test_digest(1), test_digest(2), test_digest(3)]),
};
let result =
Journaled::<F, _, Digest>::init_sync(context.with_label("sync"), sync_cfg, &hasher)
.await;
assert!(matches!(result, Err(Error::InvalidPinnedNodes)));
}
#[test_traced]
fn test_journaled_init_sync_rejects_extra_pinned_nodes_mmr() {
let executor = deterministic::Runner::default();
executor.start(journaled_init_sync_rejects_extra_pinned_nodes_inner::<mmr::Family>);
}
#[test_traced]
fn test_journaled_init_sync_rejects_extra_pinned_nodes_mmb() {
let executor = deterministic::Runner::default();
executor.start(journaled_init_sync_rejects_extra_pinned_nodes_inner::<mmb::Family>);
}
async fn journaled_init_stale_metadata_returns_error_inner<F: Family>(
context: deterministic::Context,
) {
let hasher = Standard::<Sha256>::new();
let mut mmr = Journaled::<F, _, Digest>::init(
context.with_label("init"),
&hasher,
test_config(&context),
)
.await
.unwrap();
let mut batch = mmr.new_batch();
for i in 0..50 {
batch = batch.add(&hasher, &test_digest(i));
}
let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
mmr.apply_batch(&batch).unwrap();
mmr.sync().await.unwrap();
let prune_loc = Location::<F>::new(25);
mmr.prune(prune_loc).await.unwrap();
drop(mmr);
let meta_cfg = MConfig {
partition: test_config(&context).metadata_partition,
codec_config: ((0..).into(), ()),
};
let mut metadata =
Metadata::<_, U64, Vec<u8>>::init(context.with_label("meta_tamper"), meta_cfg)
.await
.unwrap();
metadata.clear();
let key = U64::new(PRUNED_TO_PREFIX, 0);
metadata.put(key, 0u64.to_be_bytes().to_vec());
metadata.sync().await.unwrap();
drop(metadata);
let result = Journaled::<F, _, Digest>::init(
context.with_label("reopened"),
&hasher,
test_config(&context),
)
.await;
match result {
            Err(Error::MissingNode(_)) => {}
            Ok(_) => panic!("expected MissingNode error, got Ok"),
Err(e) => panic!("expected MissingNode error, got {:?}", e),
}
}
#[test_traced("WARN")]
fn test_journaled_init_stale_metadata_returns_error_mmr() {
let executor = deterministic::Runner::default();
executor.start(journaled_init_stale_metadata_returns_error_inner::<mmr::Family>);
}
#[test_traced("WARN")]
fn test_journaled_init_stale_metadata_returns_error_mmb() {
let executor = deterministic::Runner::default();
executor.start(journaled_init_stale_metadata_returns_error_inner::<mmb::Family>);
}
async fn journaled_init_metadata_ahead_inner<F: Family>(context: deterministic::Context) {
let hasher = Standard::<Sha256>::new();
let mut mmr = Journaled::<F, _, Digest>::init(
context.with_label("init"),
&hasher,
test_config(&context),
)
.await
.unwrap();
let mut batch = mmr.new_batch();
for i in 0..50 {
batch = batch.add(&hasher, &test_digest(i));
}
let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
mmr.apply_batch(&batch).unwrap();
mmr.sync().await.unwrap();
let prune_loc = Location::<F>::new(16);
mmr.prune(prune_loc).await.unwrap();
let expected_root = mmr.root();
let expected_size = mmr.size();
drop(mmr);
let mmr = Journaled::<F, _, Digest>::init(
context.with_label("reopened"),
&hasher,
test_config(&context),
)
.await
.unwrap();
assert_eq!(mmr.bounds().start, prune_loc);
assert_eq!(mmr.size(), expected_size);
assert_eq!(mmr.root(), expected_root);
mmr.destroy().await.unwrap();
}
#[test_traced("WARN")]
fn test_journaled_init_metadata_ahead_mmr() {
let executor = deterministic::Runner::default();
executor.start(journaled_init_metadata_ahead_inner::<mmr::Family>);
}
#[test_traced("WARN")]
fn test_journaled_init_metadata_ahead_mmb() {
let executor = deterministic::Runner::default();
executor.start(journaled_init_metadata_ahead_inner::<mmb::Family>);
}
async fn journaled_init_sync_computes_pinned_nodes_before_pruning_inner<F: Family>(
context: deterministic::Context,
) {
let hasher = Standard::<Sha256>::new();
let cfg = Config {
journal_partition: "mmr-journal".into(),
metadata_partition: "mmr-metadata".into(),
items_per_blob: NZU64!(7),
write_buffer: NZUsize!(64),
thread_pool: None,
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
};
let mut mmr =
Journaled::<F, _, Digest>::init(context.with_label("init"), &hasher, cfg.clone())
.await
.unwrap();
let mut batch = mmr.new_batch();
for i in 0..100 {
batch = batch.add(&hasher, &test_digest(i));
}
let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
mmr.apply_batch(&batch).unwrap();
mmr.sync().await.unwrap();
let original_size = mmr.size();
let original_root = mmr.root();
drop(mmr);
let prune_loc = Location::<F>::new(32);
let sync_cfg = SyncConfig::<F, sha256::Digest> {
config: cfg,
range: prune_loc..Location::<F>::new(128),
            pinned_nodes: None,
        };
let sync_mmr =
Journaled::<F, _, Digest>::init_sync(context.with_label("sync"), sync_cfg, &hasher)
.await
.unwrap();
assert_eq!(sync_mmr.size(), original_size);
assert_eq!(sync_mmr.root(), original_root);
assert_eq!(sync_mmr.bounds().start, prune_loc);
sync_mmr.destroy().await.unwrap();
}
#[test_traced]
fn test_journaled_init_sync_computes_pinned_nodes_before_pruning_mmr() {
let executor = deterministic::Runner::default();
executor
.start(journaled_init_sync_computes_pinned_nodes_before_pruning_inner::<mmr::Family>);
}
#[test_traced]
fn test_journaled_init_sync_computes_pinned_nodes_before_pruning_mmb() {
let executor = deterministic::Runner::default();
executor
.start(journaled_init_sync_computes_pinned_nodes_before_pruning_inner::<mmb::Family>);
}
async fn journaled_historical_proof_pruned_elements_inner<F: Family>(
context: deterministic::Context,
) {
let hasher = Standard::<Sha256>::new();
let mut mmr = Journaled::<F, _, Digest>::init(
context.with_label("init"),
&hasher,
test_config(&context),
)
.await
.unwrap();
let mut batch = mmr.new_batch();
for i in 0..64 {
batch = batch.add(&hasher, &test_digest(i));
}
let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
mmr.apply_batch(&batch).unwrap();
let prune_loc = Location::<F>::new(16);
mmr.prune(prune_loc).await.unwrap();
let historical_leaves = mmr.leaves();
let mut pruned_loc = None;
for loc_u64 in 0..*historical_leaves {
let loc = Location::<F>::new(loc_u64);
let result = mmr
.historical_range_proof(&hasher, historical_leaves, loc..loc + 1)
.await;
if matches!(result, Err(Error::ElementPruned(_))) {
pruned_loc = Some(loc);
break;
}
}
let pruned_loc = pruned_loc.expect("expected at least one pruned location");
let mut batch = mmr.new_batch();
for i in 0..8 {
batch = batch.add(&hasher, &test_digest(10_000 + i));
}
let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
mmr.apply_batch(&batch).unwrap();
let requested = mmr.leaves();
let result = mmr
.historical_range_proof(&hasher, requested, pruned_loc..pruned_loc + 1)
.await;
assert!(matches!(result, Err(Error::ElementPruned(_))));
mmr.destroy().await.unwrap();
}
#[test_traced]
fn test_journaled_historical_proof_pruned_elements_mmr() {
let executor = deterministic::Runner::default();
executor.start(journaled_historical_proof_pruned_elements_inner::<mmr::Family>);
}
#[test_traced]
fn test_journaled_historical_proof_pruned_elements_mmb() {
let executor = deterministic::Runner::default();
executor.start(journaled_historical_proof_pruned_elements_inner::<mmb::Family>);
}
async fn journaled_append_while_historical_proof_is_available_inner<F: Family>(
context: deterministic::Context,
) {
let hasher = Standard::<Sha256>::new();
let mut mmr = Journaled::<F, _, Digest>::init(
context.with_label("init"),
&hasher,
test_config(&context),
)
.await
.unwrap();
let mut batch = mmr.new_batch();
for i in 0..20 {
batch = batch.add(&hasher, &test_digest(i));
}
let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
mmr.apply_batch(&batch).unwrap();
let historical_leaves = Location::<F>::new(10);
let range = Location::<F>::new(2)..Location::<F>::new(8);
let batch = mmr
.new_batch()
.add(&hasher, &test_digest(100))
.add(&hasher, &test_digest(101));
let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
mmr.apply_batch(&batch).unwrap();
let proof = mmr
.historical_range_proof(&hasher, historical_leaves, range.clone())
.await
.unwrap();
let expected = mmr
.historical_range_proof(&hasher, historical_leaves, range)
.await
.unwrap();
assert_eq!(proof, expected);
mmr.destroy().await.unwrap();
}
#[test_traced]
fn test_journaled_append_while_historical_proof_is_available_mmr() {
let executor = deterministic::Runner::default();
executor.start(journaled_append_while_historical_proof_is_available_inner::<mmr::Family>);
}
#[test_traced]
fn test_journaled_append_while_historical_proof_is_available_mmb() {
let executor = deterministic::Runner::default();
executor.start(journaled_append_while_historical_proof_is_available_inner::<mmb::Family>);
}
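// Verifies that historical range proofs can be generated after sync() has flushed
// nodes to the journal, and that repeated requests return the same proof.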
async fn journaled_historical_proof_after_sync_reads_from_journal_inner<F: Family>(
context: deterministic::Context,
) {
let hasher = Standard::<Sha256>::new();
let mut mmr = Journaled::<F, _, Digest>::init(
context.with_label("init"),
&hasher,
test_config(&context),
)
.await
.unwrap();
let mut batch = mmr.new_batch();
for i in 0..64 {
batch = batch.add(&hasher, &test_digest(i));
}
let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
mmr.apply_batch(&batch).unwrap();
mmr.sync().await.unwrap();
let historical_leaves = Location::<F>::new(20);
let range = Location::<F>::new(5)..Location::<F>::new(15);
let expected = mmr
.historical_range_proof(&hasher, historical_leaves, range.clone())
.await
.unwrap();
let actual = mmr
.historical_range_proof(&hasher, historical_leaves, range)
.await
.unwrap();
assert_eq!(actual, expected);
mmr.destroy().await.unwrap();
}
#[test_traced]
fn test_journaled_historical_proof_after_sync_reads_from_journal_mmr() {
let executor = deterministic::Runner::default();
executor
.start(journaled_historical_proof_after_sync_reads_from_journal_inner::<mmr::Family>);
}
#[test_traced]
fn test_journaled_historical_proof_after_sync_reads_from_journal_mmb() {
let executor = deterministic::Runner::default();
executor
.start(journaled_historical_proof_after_sync_reads_from_journal_inner::<mmb::Family>);
}
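// Verifies that a historical range proof starting at the prune boundary still
// succeeds after pruning and reports a non-zero leaf count.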
async fn journaled_historical_proof_after_pruning_inner<F: Family>(
context: deterministic::Context,
) {
let hasher = Standard::<Sha256>::new();
let mut mmr = Journaled::<F, _, Digest>::init(
context.with_label("init"),
&hasher,
test_config(&context),
)
.await
.unwrap();
let mut batch = mmr.new_batch();
for i in 0..30 {
batch = batch.add(&hasher, &test_digest(i));
}
let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
mmr.apply_batch(&batch).unwrap();
let prune_loc = Location::<F>::new(10);
mmr.prune(prune_loc).await.unwrap();
let requested = Location::<F>::new(20);
let range = prune_loc..requested;
let proof = mmr
.historical_range_proof(&hasher, requested, range)
.await
.unwrap();
assert!(proof.leaves > Location::<F>::new(0));
mmr.destroy().await.unwrap();
}
#[test_traced]
fn test_journaled_historical_proof_after_pruning_mmr() {
let executor = deterministic::Runner::default();
executor.start(journaled_historical_proof_after_pruning_inner::<mmr::Family>);
}
#[test_traced]
fn test_journaled_historical_proof_after_pruning_mmb() {
let executor = deterministic::Runner::default();
executor.start(journaled_historical_proof_after_pruning_inner::<mmb::Family>);
}
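// Exercises historical proof edge cases: an empty MMR (Empty / RangeOutOfBounds),
// a fully pruned MMR (ElementPruned / RangeOutOfBounds), and an MMR pruned down to
// a single retained leaf.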
async fn journaled_historical_proof_edge_cases_inner<F: Family>(
context: deterministic::Context,
) {
let hasher = Standard::<Sha256>::new();
let mmr = Journaled::<F, _, Digest>::init(
context.with_label("empty"),
&hasher,
test_config(&context),
)
.await
.unwrap();
let empty_end = Location::<F>::new(0);
let empty_result = mmr
.historical_range_proof(&hasher, empty_end, empty_end..empty_end)
.await;
assert!(matches!(empty_result, Err(Error::Empty)));
let oob_result = mmr
.historical_range_proof(&hasher, empty_end + 1, empty_end..empty_end + 1)
.await;
assert!(matches!(
oob_result,
Err(Error::RangeOutOfBounds(loc)) if loc == empty_end + 1
));
mmr.destroy().await.unwrap();
let mut mmr = Journaled::<F, _, Digest>::init(
context.with_label("fully_pruned"),
&hasher,
test_config(&context),
)
.await
.unwrap();
let mut batch = mmr.new_batch();
for i in 0..20 {
batch = batch.add(&hasher, &test_digest(i));
}
let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
mmr.apply_batch(&batch).unwrap();
let end = mmr.leaves();
mmr.prune_all().await.unwrap();
assert!(mmr.bounds().is_empty());
let pruned_result = mmr.historical_range_proof(&hasher, end, end - 1..end).await;
assert!(matches!(pruned_result, Err(Error::ElementPruned(_))));
let oob_result = mmr
.historical_range_proof(&hasher, end + 1, end - 1..end)
.await;
assert!(matches!(
oob_result,
Err(Error::RangeOutOfBounds(loc)) if loc == end + 1
));
mmr.destroy().await.unwrap();
let mut mmr = Journaled::<F, _, Digest>::init(
context.with_label("single_leaf"),
&hasher,
test_config(&context),
)
.await
.unwrap();
let mut batch = mmr.new_batch();
for i in 0..11 {
batch = batch.add(&hasher, &test_digest(i));
}
let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
mmr.apply_batch(&batch).unwrap();
let end = mmr.leaves();
let keep_loc = end - 1;
mmr.prune(keep_loc).await.unwrap();
let ok_result = mmr
.historical_range_proof(&hasher, end, keep_loc..end)
.await;
assert!(ok_result.is_ok());
let pruned_end = keep_loc - 1;
let start_loc = Location::<F>::new(1);
let pruned_result = mmr
.historical_range_proof(&hasher, end, start_loc..pruned_end + 1)
.await;
assert!(matches!(pruned_result, Err(Error::ElementPruned(_))));
let oob_result = mmr
.historical_range_proof(&hasher, end + 1, keep_loc..end)
.await;
assert!(matches!(oob_result, Err(Error::RangeOutOfBounds(_))));
mmr.destroy().await.unwrap();
}
#[test_traced]
fn test_journaled_historical_proof_edge_cases_mmr() {
let executor = deterministic::Runner::default();
executor.start(journaled_historical_proof_edge_cases_inner::<mmr::Family>);
}
#[test_traced]
fn test_journaled_historical_proof_edge_cases_mmb() {
let executor = deterministic::Runner::default();
executor.start(journaled_historical_proof_edge_cases_inner::<mmb::Family>);
}
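// Verifies that requesting a historical proof for a leaf count beyond the current
// size is rejected with RangeOutOfBounds.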
async fn journaled_historical_proof_out_of_bounds_inner<F: Family>(
context: deterministic::Context,
) {
let hasher = Standard::<Sha256>::new();
let mut mmr = Journaled::<F, _, Digest>::init(
context.with_label("oob"),
&hasher,
test_config(&context),
)
.await
.unwrap();
let mut batch = mmr.new_batch();
for i in 0..8 {
batch = batch.add(&hasher, &test_digest(i));
}
let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
mmr.apply_batch(&batch).unwrap();
let requested = mmr.leaves() + 1;
let result = mmr
.historical_range_proof(&hasher, requested, Location::<F>::new(0)..requested)
.await;
assert!(matches!(
result,
Err(Error::RangeOutOfBounds(loc)) if loc == requested
));
mmr.destroy().await.unwrap();
}
#[test_traced]
fn test_journaled_historical_proof_out_of_bounds_mmr() {
let executor = deterministic::Runner::default();
executor.start(journaled_historical_proof_out_of_bounds_inner::<mmr::Family>);
}
#[test_traced]
fn test_journaled_historical_proof_out_of_bounds_mmb() {
let executor = deterministic::Runner::default();
executor.start(journaled_historical_proof_out_of_bounds_inner::<mmb::Family>);
}
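// Exercises range validation for historical proofs: an empty range, a requested
// leaf count past the current size, a range end past the current size or past the
// requested leaf count, and a range end of u64::MAX.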
async fn journaled_historical_proof_range_validation_inner<F: Family>(
context: deterministic::Context,
) {
let hasher = Standard::<Sha256>::new();
let mut mmr = Journaled::<F, _, Digest>::init(
context.with_label("range_validation"),
&hasher,
test_config(&context),
)
.await
.unwrap();
let mut batch = mmr.new_batch();
for i in 0..32 {
batch = batch.add(&hasher, &test_digest(i));
}
let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
mmr.apply_batch(&batch).unwrap();
let valid_range = Location::<F>::new(0)..Location::<F>::new(1);
let requested = Location::<F>::new(5);
let empty_range = requested..requested;
let empty_result = mmr
.historical_range_proof(&hasher, requested, empty_range)
.await;
assert!(matches!(empty_result, Err(Error::Empty)));
let leaves_oob = mmr.leaves() + 1;
let result = mmr
.historical_range_proof(&hasher, leaves_oob, valid_range.clone())
.await;
assert!(matches!(
result,
Err(Error::RangeOutOfBounds(loc)) if loc == leaves_oob
));
let end_oob = mmr.leaves() + 1;
let range_oob = Location::<F>::new(0)..end_oob;
let result = mmr
.historical_range_proof(&hasher, requested, range_oob)
.await;
assert!(matches!(
result,
Err(Error::RangeOutOfBounds(loc)) if loc == end_oob
));
let range_end_gt_requested = requested + 1;
let range_oob_at_requested = Location::<F>::new(0)..range_end_gt_requested;
assert!(range_end_gt_requested <= mmr.leaves());
let result = mmr
.historical_range_proof(&hasher, requested, range_oob_at_requested)
.await;
assert!(matches!(
result,
Err(Error::RangeOutOfBounds(loc)) if loc == range_end_gt_requested
));
let overflow_loc = Location::<F>::new(u64::MAX);
let overflow_range = Location::<F>::new(0)..overflow_loc;
let result = mmr
.historical_range_proof(&hasher, requested, overflow_range)
.await;
assert!(matches!(
result,
Err(Error::RangeOutOfBounds(loc)) if loc == overflow_loc
));
mmr.destroy().await.unwrap();
}
#[test_traced]
fn test_journaled_historical_proof_range_validation_mmr() {
let executor = deterministic::Runner::default();
executor.start(journaled_historical_proof_range_validation_inner::<mmr::Family>);
}
#[test_traced]
fn test_journaled_historical_proof_range_validation_mmb() {
let executor = deterministic::Runner::default();
executor.start(journaled_historical_proof_range_validation_inner::<mmb::Family>);
}
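// Verifies that, after pruning to each possible location in turn, historical proof
// generation reports ElementPruned only for locations within the pruned prefix and
// never fails for retained locations.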
async fn journaled_historical_proof_non_size_prune_excludes_pruned_leaves_inner<F: Family>(
context: deterministic::Context,
) {
let hasher = Standard::<Sha256>::new();
let mut mmr = Journaled::<F, _, Digest>::init(
context.with_label("non_size_prune"),
&hasher,
test_config(&context),
)
.await
.unwrap();
let mut batch = mmr.new_batch();
for i in 0..16 {
batch = batch.add(&hasher, &test_digest(i));
}
let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
mmr.apply_batch(&batch).unwrap();
let end = mmr.leaves();
let mut failures = Vec::new();
for prune_leaf in 1..*end {
let prune_loc = Location::<F>::new(prune_leaf);
mmr.prune(prune_loc).await.unwrap();
for loc_u64 in 0..*end {
let loc = Location::<F>::new(loc_u64);
let range_includes_pruned_leaf = loc < prune_loc;
match mmr.historical_proof(&hasher, end, loc).await {
Ok(_) => {}
Err(Error::ElementPruned(_)) if range_includes_pruned_leaf => {}
Err(Error::ElementPruned(_)) => failures.push(format!(
"prune_loc={prune_loc} loc={loc} returned ElementPruned without a pruned range element"
)),
Err(err) => failures
.push(format!("prune_loc={prune_loc} loc={loc} err={err}")),
}
}
}
assert!(
failures.is_empty(),
"historical proof generation returned unexpected errors: {failures:?}"
);
mmr.destroy().await.unwrap();
}
#[test_traced]
fn test_journaled_historical_proof_non_size_prune_excludes_pruned_leaves_mmr() {
let executor = deterministic::Runner::default();
executor.start(
journaled_historical_proof_non_size_prune_excludes_pruned_leaves_inner::<mmr::Family>,
);
}
#[test_traced]
fn test_journaled_historical_proof_non_size_prune_excludes_pruned_leaves_mmb() {
let executor = deterministic::Runner::default();
executor.start(
journaled_historical_proof_non_size_prune_excludes_pruned_leaves_inner::<mmb::Family>,
);
}
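// Verifies that init_sync recovers when the journal contains more items than the
// last synced MMR size: the orphaned trailing node is discarded and the original
// size and root are restored.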
async fn journaled_init_sync_recovers_from_invalid_journal_size_inner<F: Family>(
context: deterministic::Context,
) {
let hasher = Standard::<Sha256>::new();
let mut mmr = Journaled::<F, _, Digest>::init(
context.with_label("init"),
&hasher,
test_config(&context),
)
.await
.unwrap();
let mut batch = mmr.new_batch();
for i in 0..3 {
batch = batch.add(&hasher, &test_digest(i));
}
let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
mmr.apply_batch(&batch).unwrap();
let valid_size = mmr.size();
let valid_root = mmr.root();
mmr.sync().await.unwrap();
drop(mmr);
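// Re-open the underlying node journal directly and append an orphan node, leaving
// the journal one item longer than the MMR's last synced size.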
{
let journal: Journal<_, Digest> = Journal::init(
context.with_label("corrupt"),
JConfig {
partition: "journal-partition".into(),
items_per_blob: NZU64!(7),
write_buffer: NZUsize!(1024),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
},
)
.await
.unwrap();
assert_eq!(journal.size().await, valid_size);
journal.append(&Sha256::hash(b"orphan")).await.unwrap();
journal.sync().await.unwrap();
assert_eq!(journal.size().await, valid_size + 1);
}
let sync_cfg = SyncConfig::<F, Digest> {
config: test_config(&context),
range: Location::<F>::new(0)..Location::<F>::new(100),
pinned_nodes: None,
};
let sync_mmr =
Journaled::<F, _, Digest>::init_sync(context.with_label("sync"), sync_cfg, &hasher)
.await
.unwrap();
assert_eq!(sync_mmr.size(), valid_size);
assert_eq!(sync_mmr.root(), valid_root);
sync_mmr.destroy().await.unwrap();
}
#[test_traced]
fn test_init_sync_recovers_from_invalid_journal_size_mmr() {
let executor = deterministic::Runner::default();
executor.start(journaled_init_sync_recovers_from_invalid_journal_size_inner::<mmr::Family>);
}
#[test_traced]
fn test_init_sync_recovers_from_invalid_journal_size_mmb() {
let executor = deterministic::Runner::default();
executor.start(journaled_init_sync_recovers_from_invalid_journal_size_inner::<mmb::Family>);
}
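// Verifies that applying a batch merkleized against an MMR state that has since
// been superseded by another applied batch fails with StaleBatch.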
async fn journaled_stale_batch_inner<F: Family>(context: deterministic::Context) {
let hasher: Standard<Sha256> = Standard::new();
let mut mmr = Journaled::<F, _, Digest>::init(
context.clone(),
&hasher,
test_config(&context),
)
.await
.unwrap();
let batch_a = mmr.new_batch().add(&hasher, b"leaf-a");
let batch_a = mmr.with_mem(|mem| batch_a.merkleize(mem, &hasher));
let batch_b = mmr.new_batch().add(&hasher, b"leaf-b");
let batch_b = mmr.with_mem(|mem| batch_b.merkleize(mem, &hasher));
mmr.apply_batch(&batch_a).unwrap();
let result = mmr.apply_batch(&batch_b);
assert!(
matches!(result, Err(Error::StaleBatch { .. })),
"expected StaleBatch, got {result:?}"
);
mmr.destroy().await.unwrap();
}
#[test]
fn test_stale_batch_mmr() {
let executor = deterministic::Runner::default();
executor.start(journaled_stale_batch_inner::<mmr::Family>);
}
#[test]
fn test_stale_batch_mmb() {
let executor = deterministic::Runner::default();
executor.start(journaled_stale_batch_inner::<mmb::Family>);
}
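// Verifies that new_batch() returns the append-only UnmerkleizedBatch wrapper.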
async fn journaled_new_batch_returns_append_only_wrapper_inner<F: Family>(
context: deterministic::Context,
) {
let hasher = Standard::<Sha256>::new();
let mmr = Journaled::<F, _, Digest>::init(context.clone(), &hasher, test_config(&context))
.await
.unwrap();
let _batch: UnmerkleizedBatch<F, Digest> = mmr.new_batch();
mmr.destroy().await.unwrap();
}
#[test_traced]
fn test_new_batch_returns_append_only_wrapper_mmr() {
let executor = deterministic::Runner::default();
executor.start(journaled_new_batch_returns_append_only_wrapper_inner::<mmr::Family>);
}
#[test_traced]
fn test_new_batch_returns_append_only_wrapper_mmb() {
let executor = deterministic::Runner::default();
executor.start(journaled_new_batch_returns_append_only_wrapper_inner::<mmb::Family>);
}
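// Verifies that, after sync(), attempting to update leaf 0 through a new batch
// returns ElementPruned.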
async fn journaled_update_leaf_after_sync_returns_pruned_inner<F: Family>(
context: deterministic::Context,
) {
let hasher = Standard::<Sha256>::new();
let mut mmr =
Journaled::<F, _, Digest>::init(context.clone(), &hasher, test_config(&context))
.await
.unwrap();
let mut batch = mmr.new_batch();
for i in 0..50 {
batch = batch.add(&hasher, &test_digest(i));
}
let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
mmr.apply_batch(&batch).unwrap();
mmr.sync().await.unwrap();
let batch = mmr.to_batch().new_batch();
let result = batch.update_leaf(&hasher, Location::<F>::new(0), b"updated");
assert!(matches!(result, Err(Error::ElementPruned(_))));
mmr.destroy().await.unwrap();
}
#[test_traced]
fn test_update_leaf_after_sync_returns_pruned_mmr() {
let executor = deterministic::Runner::default();
executor.start(journaled_update_leaf_after_sync_returns_pruned_inner::<mmr::Family>);
}
#[test_traced]
fn test_update_leaf_after_sync_returns_pruned_mmb() {
let executor = deterministic::Runner::default();
executor.start(journaled_update_leaf_after_sync_returns_pruned_inner::<mmb::Family>);
}
}