use crate::stateful::{
db::{Anchor, DatabaseSet},
Application, Proposed,
};
use commonware_consensus::{
marshal::{
ancestry::BlockProvider,
core::{Mailbox as MarshalMailbox, Variant as MarshalVariant},
Identifier,
},
types::{Height, Round},
Block, CertifiableBlock, Heightable, Roundable,
};
use commonware_cryptography::{certificate::Scheme, Digestible};
use commonware_macros::select;
use commonware_runtime::{telemetry::metrics::GaugeExt, Clock, Metrics, Spawner};
use commonware_utils::channel::{fallible::OneshotExt, oneshot};
use futures::{stream, Stream, StreamExt};
use rand::Rng;
use std::{
collections::{BTreeMap, HashSet, VecDeque},
future::Future,
};
use tracing::{debug, warn};
mod metrics;
pub(crate) use metrics::Metrics as ProcessorMetrics;
type PendingDigest<A, E> = <<A as Application<E>>::Block as Digestible>::Digest;
type PendingBatches<A, E> = <<A as Application<E>>::Databases as DatabaseSet<E>>::Merkleized;
type PendingMap<A, E> = BTreeMap<PendingDigest<A, E>, PendingEntry<A, E>>;
struct PendingEntry<A, E>
where
E: Rng + Spawner + Metrics + Clock,
A: Application<E>,
{
round: Round,
parent: PendingDigest<A, E>,
merkleized: PendingBatches<A, E>,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(super) enum PrepareBatchesError {
Invalid,
Incomplete,
Cancelled,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(super) enum FinalizeStatus {
Duplicate,
Persisted { height: Height },
}
pub(super) struct Processor<E, A>
where
E: Rng + Spawner + Metrics + Clock,
A: Application<E>,
{
app: A,
databases: A::Databases,
pending: PendingMap<A, E>,
last_processed: Anchor<PendingDigest<A, E>>,
metrics: ProcessorMetrics,
}
impl<E, A> Processor<E, A>
where
E: Rng + Spawner + Metrics + Clock,
A: Application<E>,
{
pub(super) const fn new(
app: A,
databases: A::Databases,
last_processed: Anchor<PendingDigest<A, E>>,
metrics: ProcessorMetrics,
) -> Self {
Self {
app,
databases,
pending: BTreeMap::new(),
last_processed,
metrics,
}
}
pub(super) const fn databases(&self) -> &A::Databases {
&self.databases
}
pub(super) async fn propose<S, V>(
&mut self,
context: &E,
marshal: MarshalMailbox<S, V>,
(runtime_context, consensus_context): (E, A::Context),
ancestry: impl Stream<Item = A::Block> + Send + 'static,
input_provider: &mut A::InputProvider,
mut response: oneshot::Sender<Option<A::Block>>,
) where
S: Scheme,
V: MarshalVariant<ApplicationBlock = A::Block>,
MarshalMailbox<S, V>: BlockProvider<Block = A::Block>,
{
let timer = self.metrics.propose_duration.timer(context);
let mut ancestry = Box::pin(ancestry);
let parent = match next_or_cancel(&mut response, &mut ancestry).await {
Some(Some(parent)) => parent,
Some(None) => {
response.send_lossy(None);
return;
}
None => {
debug!("proposal request cancelled before initial ancestry arrived");
return;
}
};
let parent_digest = parent.digest();
let ancestry = stream::once(std::future::ready(parent.clone())).chain(ancestry);
let round = consensus_context.round();
let batches = match self
.prepare_batches(context, marshal, parent, &mut response)
.await
{
Ok(batches) => batches,
Err(PrepareBatchesError::Invalid) => {
response.send_lossy(None);
return;
}
Err(PrepareBatchesError::Incomplete) => {
debug!(
?parent_digest,
"proposal request waiting on incomplete ancestry during prepare_batches"
);
response.closed().await;
return;
}
Err(PrepareBatchesError::Cancelled) => {
debug!(
?parent_digest,
"proposal request cancelled during prepare_batches"
);
return;
}
};
let proposed = match await_or_cancel(
&mut response,
self.app.propose(
(runtime_context, consensus_context),
ancestry,
batches,
input_provider,
),
)
.await
{
Some(result) => result,
None => {
debug!(?parent_digest, "proposal request cancelled during propose");
return;
}
};
let Some(Proposed { block, merkleized }) = proposed else {
response.send_lossy(None);
return;
};
assert!(
A::Databases::matches_sync_targets(&merkleized, &A::sync_targets(&block)),
"proposed state must match block commitments",
);
self.cache_pending(block.digest(), parent_digest, round, merkleized);
let _ = self.metrics.pending_blocks.try_set(self.pending.len());
timer.observe(context);
response.send_lossy(Some(block));
}
pub(super) async fn verify<S, V>(
&mut self,
context: &E,
marshal: MarshalMailbox<S, V>,
(runtime_context, consensus_context): (E, A::Context),
ancestry: impl Stream<Item = A::Block> + Send + 'static,
mut response: oneshot::Sender<bool>,
) where
S: Scheme,
V: MarshalVariant<ApplicationBlock = A::Block>,
MarshalMailbox<S, V>: BlockProvider<Block = A::Block>,
{
let timer = self.metrics.verify_duration.timer(context);
let mut ancestry = Box::pin(ancestry);
let block = match next_or_cancel(&mut response, &mut ancestry).await {
Some(Some(block)) => block,
Some(None) => {
debug!("verification request waiting on incomplete block ancestry");
response.closed().await;
return;
}
None => {
debug!("verification request cancelled before initial block arrived");
return;
}
};
let block_digest = block.digest();
if self.pending.contains_key(&block_digest) {
timer.observe(context);
response.send_lossy(true);
return;
}
match is_already_processed(self.last_processed, marshal.clone(), &block, &mut response)
.await
{
Ok(true) => {
timer.observe(context);
response.send_lossy(true);
return;
}
Ok(false) => {
if block.height() <= self.last_processed.height {
response.send_lossy(false);
return;
}
}
Err(PrepareBatchesError::Cancelled) => {
debug!(
?block_digest,
"verification request cancelled during processed-block check"
);
return;
}
Err(PrepareBatchesError::Incomplete) => {
debug!(
?block_digest,
"verification request waiting on incomplete processed-block ancestry"
);
response.closed().await;
return;
}
Err(PrepareBatchesError::Invalid) => {
unreachable!("processed-block check cannot return Invalid")
}
}
let round = consensus_context.round();
let parent = match next_or_cancel(&mut response, &mut ancestry).await {
Some(Some(parent)) => parent,
Some(None) => {
debug!(
?block_digest,
"verification request waiting on incomplete parent ancestry"
);
response.closed().await;
return;
}
None => {
debug!(
?block_digest,
"verification request cancelled before parent ancestry arrived"
);
return;
}
};
let parent_digest = parent.digest();
let batches = match self
.prepare_batches(context, marshal, parent.clone(), &mut response)
.await
{
Ok(batches) => batches,
Err(PrepareBatchesError::Invalid) => {
warn!(
?parent_digest,
?block_digest,
pending_keys = self.pending.len(),
last_processed = ?self.last_processed.digest,
"verification rejected: prepare_batches returned Invalid"
);
response.send_lossy(false);
return;
}
Err(PrepareBatchesError::Incomplete) => {
debug!(
?parent_digest,
?block_digest,
"verification request waiting on incomplete ancestry during prepare_batches"
);
response.closed().await;
return;
}
Err(PrepareBatchesError::Cancelled) => {
debug!(
?parent_digest,
"verification request cancelled during prepare_batches"
);
return;
}
};
let ancestry = stream::iter([block.clone(), parent]).chain(ancestry);
let verified = match await_or_cancel(
&mut response,
self.app
.verify((runtime_context, consensus_context), ancestry, batches),
)
.await
{
Some(result) => result,
None => {
debug!(
?parent_digest,
"verification request cancelled during verify"
);
return;
}
};
let Some(merkleized) = verified else {
warn!(
?parent_digest,
?block_digest,
"verification rejected: app.verify returned None"
);
response.send_lossy(false);
return;
};
if !A::Databases::matches_sync_targets(&merkleized, &A::sync_targets(&block)) {
warn!(
?parent_digest,
?block_digest,
"verification rejected: verified state must match block commitments"
);
response.send_lossy(false);
return;
}
self.cache_pending(block_digest, parent_digest, round, merkleized);
let _ = self.metrics.pending_blocks.try_set(self.pending.len());
timer.observe(context);
response.send_lossy(true);
}
pub(super) async fn prepare_batches<S, V, Response>(
&mut self,
context: &E,
marshal: MarshalMailbox<S, V>,
parent: A::Block,
response: &mut oneshot::Sender<Response>,
) -> Result<<A::Databases as DatabaseSet<E>>::Unmerkleized, PrepareBatchesError>
where
S: Scheme,
V: MarshalVariant<ApplicationBlock = A::Block>,
MarshalMailbox<S, V>: BlockProvider<Block = A::Block>,
{
let parent_digest = parent.digest();
if self.last_processed.digest != parent_digest && !self.pending.contains_key(&parent_digest)
{
self.rebuild_pending(context, marshal, parent, response)
.await?;
}
await_or_cancel(response, self.fork_batches(&parent_digest))
.await
.unwrap_or(Err(PrepareBatchesError::Cancelled))
}
pub(super) async fn fork_batches(
&mut self,
parent: &<A::Block as Digestible>::Digest,
) -> Result<<A::Databases as DatabaseSet<E>>::Unmerkleized, PrepareBatchesError> {
if let Some(entry) = self.pending.get(parent) {
return Ok(<A::Databases as DatabaseSet<E>>::fork_batches(
&entry.merkleized,
));
}
if &self.last_processed.digest == parent {
return Ok(self.databases.new_batches().await);
}
Err(PrepareBatchesError::Invalid)
}
pub(super) async fn rebuild_pending<P, Response>(
&mut self,
context: &E,
provider: P,
target: A::Block,
response: &mut oneshot::Sender<Response>,
) -> Result<(), PrepareBatchesError>
where
P: BlockProvider<Block = A::Block> + Clone,
{
let timer = self.metrics.rebuild_pending_duration.timer(context);
let target_digest = target.digest();
let mut replay_path = Vec::new();
let mut cursor = target;
while cursor.digest() != self.last_processed.digest
&& !self.pending.contains_key(&cursor.digest())
{
let Some(parent) =
await_or_cancel(response, provider.clone().subscribe_parent(&cursor)).await
else {
return Err(PrepareBatchesError::Cancelled);
};
let Some(parent) = parent else {
debug!(
?target_digest,
cursor = ?cursor.digest(),
"ancestor subscription ended before delivery"
);
return Err(PrepareBatchesError::Incomplete);
};
let cursor_height = cursor.height();
if parent.digest() != cursor.parent() || parent.height().next() != cursor_height {
warn!(
?target_digest,
cursor = ?cursor.digest(),
parent = ?parent.digest(),
cursor_height = cursor_height.get(),
parent_height = parent.height().get(),
expected_parent = ?cursor.parent(),
"rebuild_pending received non-contiguous ancestry"
);
return Err(PrepareBatchesError::Invalid);
}
if cursor_height <= self.last_processed.height {
warn!(
?target_digest,
cursor = ?cursor.digest(),
current_height = cursor_height.get(),
last_processed_height = self.last_processed.height.get(),
last_processed = ?self.last_processed.digest,
"rebuild_pending reached stale ancestry below processed height"
);
return Err(PrepareBatchesError::Invalid);
}
if cursor_height.previous().is_none() {
warn!(
?target_digest,
cursor = ?cursor.digest(),
reached_height = %cursor_height,
last_processed = ?self.last_processed.digest,
pending_keys = self.pending.len(),
"rebuild reached ancestry boundary without known anchor"
);
return Err(PrepareBatchesError::Invalid);
}
replay_path.push(cursor);
cursor = parent;
}
let depth = replay_path.len();
for block in replay_path.into_iter().rev() {
let (digest, parent_digest) = (block.digest(), block.parent());
let consensus_context = block.context();
let round = consensus_context.round();
let Some(batches) = await_or_cancel(response, self.fork_batches(&parent_digest)).await
else {
return Err(PrepareBatchesError::Cancelled);
};
let batches = batches.expect("rebuild replay parent must be available");
let Some(merkleized) = await_or_cancel(
response,
self.app.apply(
(context.child("rebuild_pending_apply"), consensus_context),
&block,
batches,
),
)
.await
else {
return Err(PrepareBatchesError::Cancelled);
};
if !A::Databases::matches_sync_targets(&merkleized, &A::sync_targets(&block)) {
warn!(
?target_digest,
block = ?digest,
"rebuild replay state root must match block commitments"
);
return Err(PrepareBatchesError::Invalid);
}
self.cache_pending(digest, parent_digest, round, merkleized);
}
let _ = self.metrics.pending_blocks.try_set(self.pending.len());
let _ = self.metrics.rebuild_pending_depth.try_set(depth);
timer.observe(context);
Ok(())
}
pub(super) async fn finalize(&mut self, context: &E, block: A::Block) -> FinalizeStatus {
let (height, digest) = (block.height(), block.digest());
if height < self.last_processed.height {
panic!(
"received finalized block below processed height: finalized={} processed={}",
height.get(),
self.last_processed.height.get(),
);
}
if height == self.last_processed.height {
assert_eq!(
digest, self.last_processed.digest,
"received conflicting finalized block at processed height",
);
return FinalizeStatus::Duplicate;
}
let timer = self.metrics.finalize_duration.timer(context);
let block_context = block.context();
let round = block_context.round();
let batch = match self.pending.remove(&digest) {
Some(entry) => entry.merkleized,
None => {
let batches = self.databases.new_batches().await;
let batch = self
.app
.apply(
(context.child("finalize_replay"), block_context),
&block,
batches,
)
.await;
assert!(
A::Databases::matches_sync_targets(&batch, &A::sync_targets(&block)),
"finalize replay state root must match block commitments",
);
batch
}
};
self.databases.finalize(batch).await;
self.app
.finalized(
(context.child("finalized"), block.context()),
&block,
&self.databases,
)
.await;
self.prune_pending_after_finalize(&digest, round);
self.last_processed = Anchor {
height,
round,
digest,
};
timer.observe(context);
FinalizeStatus::Persisted { height }
}
fn prune_pending_after_finalize(
&mut self,
finalized_digest: &<A::Block as Digestible>::Digest,
finalized_round: Round,
) {
let mut children_by_parent = BTreeMap::new();
for (candidate_digest, entry) in &self.pending {
children_by_parent
.entry(entry.parent)
.or_insert_with(Vec::new)
.push(*candidate_digest);
}
let mut compatible = HashSet::new();
compatible.insert(*finalized_digest);
let mut to_visit = VecDeque::new();
to_visit.push_back(*finalized_digest);
while let Some(parent) = to_visit.pop_front() {
let Some(children) = children_by_parent.get(&parent) else {
continue;
};
for &child in children {
if compatible.insert(child) {
to_visit.push_back(child);
}
}
}
let before = self.pending.len();
self.pending.retain(|candidate_digest, entry| {
entry.round > finalized_round && compatible.contains(candidate_digest)
});
let pruned = before - self.pending.len();
self.metrics.pruned_forks.inc_by(pruned as u64);
let _ = self.metrics.pending_blocks.try_set(self.pending.len());
}
fn cache_pending(
&mut self,
digest: PendingDigest<A, E>,
parent: PendingDigest<A, E>,
round: Round,
merkleized: PendingBatches<A, E>,
) {
if let Some(existing) = self.pending.get(&digest) {
debug_assert_eq!(existing.parent, parent, "pending parent changed for digest");
debug_assert_eq!(existing.round, round, "pending round changed for digest");
return;
}
self.pending.insert(
digest,
PendingEntry {
round,
parent,
merkleized,
},
);
}
}
async fn is_already_processed<S, V, Response>(
last_processed: Anchor<<V::ApplicationBlock as Digestible>::Digest>,
marshal: MarshalMailbox<S, V>,
block: &V::ApplicationBlock,
response: &mut oneshot::Sender<Response>,
) -> Result<bool, PrepareBatchesError>
where
S: Scheme,
V: MarshalVariant,
V::ApplicationBlock: Block + Clone,
{
let target_height = block.height();
if target_height > last_processed.height {
return Ok(false);
}
if target_height == last_processed.height {
return Ok(block.digest() == last_processed.digest);
}
let Some(canonical) = await_or_cancel(
response,
marshal.get_block(Identifier::Height(target_height)),
)
.await
else {
return Err(PrepareBatchesError::Cancelled);
};
let Some(canonical) = canonical else {
warn!(
target_height = target_height.get(),
processed_height = last_processed.height.get(),
"failed to fetch canonical processed block for stale-block check"
);
return Err(PrepareBatchesError::Incomplete);
};
Ok(canonical.digest() == block.digest())
}
pub(super) async fn next_or_cancel<R, T, S>(
response: &mut oneshot::Sender<R>,
stream: &mut S,
) -> Option<Option<T>>
where
S: Stream<Item = T> + Unpin,
{
await_or_cancel(response, stream.next()).await
}
pub(super) async fn await_or_cancel<R, T, F>(
response: &mut oneshot::Sender<R>,
future: F,
) -> Option<T>
where
F: Future<Output = T>,
{
select! {
_ = response.closed() => None,
output = future => Some(output),
}
}
#[cfg(test)]
mod tests {
use super::{await_or_cancel, next_or_cancel, FinalizeStatus, PrepareBatchesError, Processor};
use crate::stateful::{
actor::processor::ProcessorMetrics,
db::{Anchor, DatabaseSet, Merkleized as _, Unmerkleized as _},
Application, Proposed,
};
use commonware_codec::{Encode, EncodeSize, Error as CodecError, Read, ReadExt as _, Write};
use commonware_consensus::{
marshal::ancestry::BlockProvider,
simplex::{mocks::scheme::Scheme as MockScheme, types::Context as ConsensusContext},
types::{Epoch, Height, Round, View},
Block as ConsensusBlock, CertifiableBlock, Heightable, Roundable,
};
use commonware_cryptography::{
ed25519, sha256::Digest, Digest as _, Digestible, Hasher, Sha256, Signer as _,
};
use commonware_parallel::Sequential;
use commonware_runtime::{
buffer::paged::CacheRef, deterministic, ContextCell, Runner as _, Supervisor as _,
};
use commonware_storage::{
journal::contiguous::fixed::Config as FixedLogConfig,
mmr::{self, full::Config as MmrJournalConfig, Location},
qmdb::{any, sync::Target},
translator::TwoCap,
};
use commonware_utils::{
channel::oneshot,
non_empty_range,
range::NonEmptyRange,
sync::{AsyncRwLock, Mutex},
NZUsize, NZU16, NZU64,
};
use futures::{Stream, StreamExt};
use std::{
collections::{BTreeMap, VecDeque},
future::Future,
num::NonZeroUsize,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
};
type TestContext = ConsensusContext<Digest, ed25519::PublicKey>;
const PAGE_SIZE: std::num::NonZeroU16 = NZU16!(1024);
const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(8);
const IO_BUFFER_SIZE: NonZeroUsize = NZUsize!(2048);
type Qmdb<E> =
any::unordered::fixed::Db<mmr::Family, E, Digest, Digest, Sha256, TwoCap, Sequential>;
type DbSet<E> = Arc<AsyncRwLock<Qmdb<E>>>;
#[derive(Clone, Debug, PartialEq, Eq)]
struct Block {
context: TestContext,
parent: Digest,
height: Height,
state_root: Digest,
range: NonEmptyRange<Location>,
}
impl Write for Block {
fn write(&self, buf: &mut impl commonware_runtime::BufMut) {
self.context.write(buf);
self.parent.write(buf);
self.height.write(buf);
self.state_root.write(buf);
self.range.write(buf);
}
}
impl EncodeSize for Block {
fn encode_size(&self) -> usize {
self.context.encode_size()
+ self.parent.encode_size()
+ self.height.encode_size()
+ self.state_root.encode_size()
+ self.range.encode_size()
}
}
impl Read for Block {
type Cfg = ();
fn read_cfg(
buf: &mut impl commonware_runtime::Buf,
_: &Self::Cfg,
) -> Result<Self, CodecError> {
Ok(Self {
context: TestContext::read(buf)?,
parent: Digest::read(buf)?,
height: Height::read(buf)?,
state_root: Digest::read(buf)?,
range: commonware_utils::range::NonEmptyRange::read(buf)?,
})
}
}
impl Digestible for Block {
type Digest = Digest;
fn digest(&self) -> Digest {
Sha256::hash(&self.encode())
}
}
impl Heightable for Block {
fn height(&self) -> Height {
self.height
}
}
impl ConsensusBlock for Block {
fn parent(&self) -> Digest {
self.parent
}
}
impl CertifiableBlock for Block {
type Context = TestContext;
fn context(&self) -> Self::Context {
self.context.clone()
}
}
impl Block {
fn genesis() -> Self {
Self {
context: consensus_context(Digest::EMPTY, View::zero()),
parent: Digest::EMPTY,
height: Height::zero(),
state_root: Digest::EMPTY,
range: non_empty_range!(Location::new(0), Location::new(1)),
}
}
}
fn consensus_context(parent: Digest, view: View) -> TestContext {
TestContext {
round: Round::new(Epoch::zero(), view),
leader: ed25519::PrivateKey::from_seed(0).public_key(),
parent: (
if view.is_zero() {
View::zero()
} else {
View::new(view.get() - 1)
},
parent,
),
}
}
fn u64_to_digest(value: u64) -> Digest {
let mut bytes = [0u8; 32];
bytes[..8].copy_from_slice(&value.to_be_bytes());
Digest::from(bytes)
}
fn digest_to_u64(value: &Digest) -> u64 {
let bytes: &[u8] = value.as_ref();
u64::from_be_bytes(
bytes[..8]
.try_into()
.expect("digest prefix should be 8 bytes"),
)
}
fn height_key(height: Height) -> Digest {
Sha256::hash(&height.get().to_be_bytes())
}
fn counter_key() -> Digest {
Sha256::hash(b"processor_harness_counter")
}
#[derive(Clone)]
struct FinalizedObserver {
db_config: any::FixedConfig<TwoCap, Sequential>,
reopened_counters: Arc<Mutex<Vec<u64>>>,
}
#[derive(Clone)]
struct ExecutionApp {
genesis: Block,
finalized_observer: Option<FinalizedObserver>,
}
impl ExecutionApp {
fn new() -> Self {
Self {
genesis: Block::genesis(),
finalized_observer: None,
}
}
fn with_finalized_observer(
db_config: any::FixedConfig<TwoCap, Sequential>,
) -> (Self, Arc<Mutex<Vec<u64>>>) {
let reopened_counters = Arc::new(Mutex::new(Vec::new()));
let observer = FinalizedObserver {
db_config,
reopened_counters: reopened_counters.clone(),
};
(
Self {
genesis: Block::genesis(),
finalized_observer: Some(observer),
},
reopened_counters,
)
}
async fn execute(
height: Height,
view: View,
mut batches: <DbSet<deterministic::Context> as DatabaseSet<deterministic::Context>>::Unmerkleized,
) -> <DbSet<deterministic::Context> as DatabaseSet<deterministic::Context>>::Merkleized
{
let current_counter = batches
.get(&counter_key())
.await
.expect("counter read should succeed")
.map_or(0, |digest| digest_to_u64(&digest));
batches = batches.write(counter_key(), Some(u64_to_digest(current_counter + 1)));
batches = batches.write(height_key(height), Some(u64_to_digest(view.get())));
batches.merkleize().await.expect("merkleize should succeed")
}
}
impl Application<deterministic::Context> for ExecutionApp {
type SigningScheme = MockScheme<ed25519::PublicKey>;
type Context = TestContext;
type Block = Block;
type Databases = DbSet<deterministic::Context>;
type InputProvider = ();
async fn genesis(&mut self) -> Self::Block {
self.genesis.clone()
}
async fn propose(
&mut self,
context: (deterministic::Context, Self::Context),
ancestry: impl Stream<Item = Self::Block> + Send,
batches: <Self::Databases as DatabaseSet<deterministic::Context>>::Unmerkleized,
_input: &mut Self::InputProvider,
) -> Option<Proposed<Self, deterministic::Context>> {
let mut ancestry = Box::pin(ancestry);
let parent = ancestry.next().await?;
let context = context.1.clone();
let view = context.round.view();
let height = parent.height().next();
let merkleized = Self::execute(height, view, batches).await;
let block = Block {
context,
parent: parent.digest(),
height,
state_root: merkleized.root(),
range: non_empty_range!(
merkleized.bounds().inactivity_floor,
Location::new(merkleized.bounds().total_size)
),
};
Some(Proposed { block, merkleized })
}
async fn verify(
&mut self,
_context: (deterministic::Context, Self::Context),
ancestry: impl Stream<Item = Self::Block> + Send,
batches: <Self::Databases as DatabaseSet<deterministic::Context>>::Unmerkleized,
) -> Option<<Self::Databases as DatabaseSet<deterministic::Context>>::Merkleized> {
let mut ancestry = Box::pin(ancestry);
let block = ancestry.next().await?;
let merkleized =
Self::execute(block.height(), block.context.round.view(), batches).await;
if merkleized.root() != block.state_root {
return None;
}
Some(merkleized)
}
async fn apply(
&mut self,
_context: (deterministic::Context, Self::Context),
block: &Self::Block,
batches: <Self::Databases as DatabaseSet<deterministic::Context>>::Unmerkleized,
) -> <Self::Databases as DatabaseSet<deterministic::Context>>::Merkleized {
Self::execute(block.height(), block.context.round.view(), batches).await
}
fn sync_targets(
block: &Self::Block,
) -> <Self::Databases as DatabaseSet<deterministic::Context>>::SyncTargets {
Target::new(block.state_root, block.range.clone())
}
async fn finalized(
&mut self,
context: (deterministic::Context, Self::Context),
_block: &Self::Block,
_databases: &Self::Databases,
) {
let Some(observer) = &self.finalized_observer else {
return;
};
let reopened = Qmdb::init(
context.0.child("finalized_observer_reopen"),
observer.db_config.clone(),
)
.await
.expect("database reopen inside finalized hook should succeed");
let counter = reopened
.get(&counter_key())
.await
.expect("reopened counter read should succeed")
.map(|value| digest_to_u64(&value))
.unwrap_or(0);
observer.reopened_counters.lock().push(counter);
}
}
#[derive(Clone, Default)]
struct MapProvider {
blocks: Arc<Mutex<BTreeMap<Digest, Block>>>,
fetches: Arc<AtomicUsize>,
}
impl MapProvider {
fn insert(&self, block: Block) {
self.blocks.lock().insert(block.digest(), block);
}
fn fetch_by_digest(&self, digest: Digest) -> Option<Block> {
self.fetches.fetch_add(1, Ordering::SeqCst);
self.blocks.lock().get(&digest).cloned()
}
fn fetches(&self) -> usize {
self.fetches.load(Ordering::SeqCst)
}
}
impl BlockProvider for MapProvider {
type Block = Block;
fn subscribe_parent(
&self,
block: &Self::Block,
) -> impl Future<Output = Option<Self::Block>> + Send + 'static {
let provider = self.clone();
let parent = block.parent();
async move { provider.fetch_by_digest(parent) }
}
}
#[derive(Clone, Default)]
struct ScriptedParentProvider {
responses: Arc<Mutex<BTreeMap<Digest, VecDeque<Option<Block>>>>>,
fetches: Arc<AtomicUsize>,
}
impl ScriptedParentProvider {
fn push(&self, child: &Block, responses: impl IntoIterator<Item = Option<Block>>) {
self.responses
.lock()
.insert(child.digest(), responses.into_iter().collect());
}
fn fetches(&self) -> usize {
self.fetches.load(Ordering::SeqCst)
}
}
impl BlockProvider for ScriptedParentProvider {
type Block = Block;
fn subscribe_parent(
&self,
block: &Self::Block,
) -> impl Future<Output = Option<Self::Block>> + Send + 'static {
let provider = self.clone();
let child = block.digest();
async move {
provider.fetches.fetch_add(1, Ordering::SeqCst);
provider
.responses
.lock()
.get_mut(&child)
.and_then(VecDeque::pop_front)
.flatten()
}
}
}
struct Harness {
context_cell: ContextCell<deterministic::Context>,
processor: Processor<deterministic::Context, ExecutionApp>,
provider: MapProvider,
db_config: any::FixedConfig<TwoCap, Sequential>,
finalized_reopened_counters: Option<Arc<Mutex<Vec<u64>>>>,
}
impl Harness {
async fn new(context: deterministic::Context) -> Self {
let provider = MapProvider::default();
let config = qmdb_config(&next_partition_prefix(), &context);
Self::with_app(context, provider, config.clone(), ExecutionApp::new(), None).await
}
async fn new_with_finalized_observer(context: deterministic::Context) -> Self {
let provider = MapProvider::default();
let config = qmdb_config(&next_partition_prefix(), &context);
let (app, finalized_reopened_counters) =
ExecutionApp::with_finalized_observer(config.clone());
Self::with_app(
context,
provider,
config,
app,
Some(finalized_reopened_counters),
)
.await
}
async fn with_app(
context: deterministic::Context,
provider: MapProvider,
config: any::FixedConfig<TwoCap, Sequential>,
app: ExecutionApp,
finalized_reopened_counters: Option<Arc<Mutex<Vec<u64>>>>,
) -> Self {
let databases = <DbSet<deterministic::Context> as DatabaseSet<
deterministic::Context,
>>::init(context.child("db_set"), config.clone())
.await;
let metrics = ProcessorMetrics::new(context.child("processor_metrics"));
Self {
context_cell: ContextCell::new(context),
processor: Processor::new(
app,
databases,
Anchor {
height: Height::zero(),
round: Block::genesis().context().round,
digest: Block::genesis().digest(),
},
metrics,
),
provider,
db_config: config,
finalized_reopened_counters,
}
}
async fn stage_pending_child(&mut self, parent: &Block, view: View) -> Block {
let context = consensus_context(parent.digest(), view);
let height = Height::new(parent.height().get() + 1);
let batches = self
.processor
.fork_batches(&parent.digest())
.await
.expect("parent should be available");
let merkleized = ExecutionApp::execute(height, view, batches).await;
let block = Block {
context,
parent: parent.digest(),
height,
state_root: merkleized.root(),
range: non_empty_range!(
merkleized.bounds().inactivity_floor,
Location::new(merkleized.bounds().total_size)
),
};
let round = Round::new(Epoch::zero(), view);
self.processor
.cache_pending(block.digest(), parent.digest(), round, merkleized);
self.provider.insert(block.clone());
block
}
async fn rebuild_pending(
&mut self,
target: Digest,
response: &mut oneshot::Sender<bool>,
) -> Result<(), PrepareBatchesError> {
let mut replay_path = Vec::new();
let mut cursor = target;
while cursor != self.processor.last_processed.digest
&& !self.processor.pending.contains_key(&cursor)
{
let Some(block) =
await_or_cancel(response, async { self.provider.fetch_by_digest(cursor) })
.await
else {
return Err(PrepareBatchesError::Cancelled);
};
let Some(block) = block else {
continue;
};
if block.height() <= self.processor.last_processed.height {
return Err(PrepareBatchesError::Invalid);
}
if block.height().previous().is_none() {
return Err(PrepareBatchesError::Invalid);
}
cursor = block.parent();
replay_path.push(block);
}
for block in replay_path.into_iter().rev() {
let (digest, parent_digest) = (block.digest(), block.parent());
let consensus_context = block.context();
let round = consensus_context.round();
let batches = self
.processor
.fork_batches(&parent_digest)
.await
.expect("rebuild replay parent must be available");
let merkleized = self
.processor
.app
.apply(
(
self.context_cell
.as_present()
.child("rebuild_pending_apply"),
consensus_context,
),
&block,
batches,
)
.await;
if !DbSet::<deterministic::Context>::matches_sync_targets(
&merkleized,
&ExecutionApp::sync_targets(&block),
) {
return Err(PrepareBatchesError::Invalid);
}
self.processor
.cache_pending(digest, parent_digest, round, merkleized);
}
Ok(())
}
fn is_canonical_processed(&self, block: &Block) -> bool {
let target_height = block.height();
if target_height > self.processor.last_processed.height {
return false;
}
if target_height == self.processor.last_processed.height {
return block.digest() == self.processor.last_processed.digest;
}
let mut cursor = self.processor.last_processed.digest;
while let Some(canonical) = self.provider.fetch_by_digest(cursor) {
let canonical_height = canonical.height();
if canonical_height == target_height {
return canonical.digest() == block.digest();
}
if canonical_height < target_height {
return false;
}
cursor = canonical.parent();
}
false
}
async fn finalize(&mut self, block: Block) -> FinalizeStatus {
self.processor
.finalize(self.context_cell.as_present(), block)
.await
}
async fn height_value(&self, height: Height) -> Option<u64> {
let db = self.processor.databases.read().await;
db.get(&height_key(height))
.await
.expect("database read should succeed")
.map(|value| digest_to_u64(&value))
}
async fn counter_value(&self) -> Option<u64> {
let db = self.processor.databases.read().await;
db.get(&counter_key())
.await
.expect("database read should succeed")
.map(|value| digest_to_u64(&value))
}
async fn reopen_height_value(
&self,
context: deterministic::Context,
height: Height,
) -> Option<u64> {
let reopened: Qmdb<deterministic::Context> =
Qmdb::init(context.child("reopen_db"), self.db_config.clone())
.await
.expect("database reopen should succeed");
reopened
.get(&height_key(height))
.await
.expect("reopened db read should succeed")
.map(|value| digest_to_u64(&value))
}
fn finalized_reopened_counters(&self) -> Vec<u64> {
self.finalized_reopened_counters
.as_ref()
.expect("finalized observer should be configured")
.lock()
.clone()
}
}
fn next_partition_prefix() -> String {
static NEXT_ID: AtomicUsize = AtomicUsize::new(0);
let id = NEXT_ID.fetch_add(1, Ordering::SeqCst);
format!("processor_harness_{id}")
}
fn qmdb_config(
prefix: &str,
context: &deterministic::Context,
) -> any::FixedConfig<TwoCap, Sequential> {
let page_cache = CacheRef::from_pooler(context, PAGE_SIZE, PAGE_CACHE_SIZE);
any::FixedConfig {
merkle_config: MmrJournalConfig {
journal_partition: format!("{prefix}_mmr_journal"),
metadata_partition: format!("{prefix}_mmr_metadata"),
items_per_blob: NZU64!(11),
write_buffer: IO_BUFFER_SIZE,
strategy: Sequential,
page_cache: page_cache.clone(),
},
journal_config: FixedLogConfig {
partition: format!("{prefix}_log_journal"),
items_per_blob: NZU64!(7),
page_cache,
write_buffer: IO_BUFFER_SIZE,
},
translator: TwoCap,
}
}
#[test]
fn execution_finalization_prunes_losing_fork() {
deterministic::Runner::default().start(|context| async move {
let mut harness = Harness::new(context).await;
let genesis = Block::genesis();
let block1 = harness.stage_pending_child(&genesis, View::new(1)).await;
let winner = harness.stage_pending_child(&block1, View::new(3)).await;
let loser = harness.stage_pending_child(&block1, View::new(2)).await;
assert!(harness.processor.pending.contains_key(&winner.digest()));
assert!(harness.processor.pending.contains_key(&loser.digest()));
let status = harness.finalize(winner.clone()).await;
assert_eq!(
status,
FinalizeStatus::Persisted {
height: Height::new(2)
},
"finalization should persist winner state",
);
assert!(
!harness.processor.pending.contains_key(&loser.digest()),
"losing fork at finalized round should be pruned",
);
assert_eq!(harness.processor.last_processed.digest, winner.digest());
assert_eq!(harness.height_value(Height::new(2)).await, Some(3));
});
}
#[test]
fn execution_finalization_prunes_losing_fork_descendants() {
deterministic::Runner::default().start(|context| async move {
let mut harness = Harness::new(context).await;
let genesis = Block::genesis();
let block1 = harness.stage_pending_child(&genesis, View::new(1)).await;
let loser = harness.stage_pending_child(&block1, View::new(2)).await;
let winner = harness.stage_pending_child(&block1, View::new(3)).await;
let loser_child = harness.stage_pending_child(&loser, View::new(4)).await;
assert!(harness.processor.pending.contains_key(&winner.digest()));
assert!(harness.processor.pending.contains_key(&loser.digest()));
assert!(harness
.processor
.pending
.contains_key(&loser_child.digest()));
let status = harness.finalize(winner.clone()).await;
assert_eq!(
status,
FinalizeStatus::Persisted {
height: Height::new(2)
},
"finalization should persist winner state",
);
assert!(
!harness.processor.pending.contains_key(&loser.digest()),
"losing fork at finalized round should be pruned",
);
assert!(
!harness
.processor
.pending
.contains_key(&loser_child.digest()),
"descendants of the losing fork should also be pruned",
);
});
}
#[test]
fn execution_rebuild_pending_restores_missing_chain() {
deterministic::Runner::default().start(|context| async move {
let mut harness = Harness::new(context).await;
let genesis = Block::genesis();
let block1 = harness.stage_pending_child(&genesis, View::new(1)).await;
let status = harness.finalize(block1.clone()).await;
assert_eq!(
status,
FinalizeStatus::Persisted {
height: Height::new(1)
}
);
let block2 = harness.stage_pending_child(&block1, View::new(2)).await;
let block3 = harness.stage_pending_child(&block2, View::new(3)).await;
harness.processor.pending.clear();
harness.provider.insert(block2.clone());
harness.provider.insert(block3.clone());
let (mut response, _rx) = oneshot::channel::<bool>();
let result = harness
.rebuild_pending(block3.digest(), &mut response)
.await;
assert_eq!(result, Ok(()), "rebuild should succeed");
assert!(
harness.processor.pending.contains_key(&block2.digest()),
"first missing descendant should be reconstructed",
);
assert!(
harness.processor.pending.contains_key(&block3.digest()),
"target block should be reconstructed",
);
});
}
#[test]
fn execution_rebuild_pending_rejects_stale_ancestor_quickly() {
deterministic::Runner::default().start(|context| async move {
let mut harness = Harness::new(context).await;
let genesis = Block::genesis();
let mut chain = Vec::new();
let mut parent = genesis;
for view in 1..=5 {
let block = harness.stage_pending_child(&parent, View::new(view)).await;
let status = harness.finalize(block.clone()).await;
assert_eq!(
status,
FinalizeStatus::Persisted {
height: Height::new(view),
}
);
parent = block.clone();
chain.push(block);
}
harness.processor.pending.clear();
let stale_parent = chain[1].digest(); let fetches_before = harness.provider.fetches();
let (mut response, _rx) = oneshot::channel::<bool>();
let result = harness.rebuild_pending(stale_parent, &mut response).await;
assert_eq!(
result,
Err(PrepareBatchesError::Invalid),
"stale ancestry should be rejected",
);
let fetches_after = harness.provider.fetches();
assert_eq!(
fetches_after.saturating_sub(fetches_before),
1,
"stale ancestry should be rejected after a single fetch",
);
});
}
#[test]
fn execution_rebuild_pending_rejects_sync_target_mismatch_before_caching() {
deterministic::Runner::default().start(|context| async move {
let mut harness = Harness::new(context).await;
let genesis = Block::genesis();
let block1 = harness.stage_pending_child(&genesis, View::new(1)).await;
let status = harness.finalize(block1.clone()).await;
assert_eq!(
status,
FinalizeStatus::Persisted {
height: Height::new(1)
}
);
let mut block2 = harness.stage_pending_child(&block1, View::new(2)).await;
harness.processor.pending.clear();
block2.range = non_empty_range!(Location::new(1), Location::new(2));
harness.provider.insert(block2.clone());
let (mut response, _rx) = oneshot::channel::<bool>();
let result = harness
.rebuild_pending(block2.digest(), &mut response)
.await;
assert_eq!(
result,
Err(PrepareBatchesError::Invalid),
"rebuild should reject a replayed batch whose sync target does not match the block",
);
assert!(
!harness.processor.pending.contains_key(&block2.digest()),
"rejected replay must not be inserted into the pending cache",
);
});
}
#[test]
fn execution_rebuild_pending_rejects_height_gap_to_processed_anchor() {
deterministic::Runner::default().start(|context| async move {
let mut harness = Harness::new(context.child("harness")).await;
let genesis = Block::genesis();
let block1 = harness.stage_pending_child(&genesis, View::new(1)).await;
let status = harness.finalize(block1.clone()).await;
assert_eq!(
status,
FinalizeStatus::Persisted {
height: Height::new(1)
}
);
let gap_height = Height::new(3);
let gap_view = View::new(3);
let batches = harness
.processor
.fork_batches(&block1.digest())
.await
.expect("processed anchor should be available");
let merkleized = ExecutionApp::execute(gap_height, gap_view, batches).await;
let gap_block = Block {
context: consensus_context(block1.digest(), gap_view),
parent: block1.digest(),
height: gap_height,
state_root: merkleized.root(),
range: non_empty_range!(
merkleized.bounds().inactivity_floor,
Location::new(merkleized.bounds().total_size)
),
};
let provider = ScriptedParentProvider::default();
provider.push(&gap_block, [Some(block1)]);
let (mut response, _rx) = oneshot::channel::<bool>();
let result = harness
.processor
.rebuild_pending(
harness.context_cell.as_present(),
provider,
gap_block.clone(),
&mut response,
)
.await;
assert_eq!(
result,
Err(PrepareBatchesError::Invalid),
"rebuild must reject non-contiguous ancestry above the processed anchor",
);
assert!(
!harness.processor.pending.contains_key(&gap_block.digest()),
"height-gap block must not be cached as pending",
);
});
}
#[test]
fn execution_verify_rejects_conflicting_stale_block() {
deterministic::Runner::default().start(|context| async move {
let mut harness = Harness::new(context).await;
let genesis = Block::genesis();
let canonical = harness.stage_pending_child(&genesis, View::new(1)).await;
let conflicting = harness.stage_pending_child(&genesis, View::new(2)).await;
let status = harness.finalize(canonical).await;
assert_eq!(
status,
FinalizeStatus::Persisted {
height: Height::new(1),
}
);
assert!(
!harness.is_canonical_processed(&conflicting),
"conflicting stale block must not be accepted as already processed",
);
});
}
#[test]
#[should_panic(expected = "received conflicting finalized block at processed height")]
fn execution_finalize_panics_on_conflicting_duplicate_height() {
deterministic::Runner::default().start(|context| async move {
let mut harness = Harness::new(context).await;
let genesis = Block::genesis();
let canonical = harness.stage_pending_child(&genesis, View::new(1)).await;
let conflicting = harness.stage_pending_child(&genesis, View::new(2)).await;
let status = harness.finalize(canonical).await;
assert_eq!(
status,
FinalizeStatus::Persisted {
height: Height::new(1),
}
);
let _ = harness.finalize(conflicting).await;
});
}
#[test]
fn execution_finalization_persists_state_to_db() {
deterministic::Runner::default().start(|context| async move {
let mut harness = Harness::new(context.child("harness")).await;
let genesis = Block::genesis();
let block1 = harness.stage_pending_child(&genesis, View::new(1)).await;
let status = harness.finalize(block1).await;
assert_eq!(
status,
FinalizeStatus::Persisted {
height: Height::new(1)
}
);
assert_eq!(harness.counter_value().await, Some(1));
assert_eq!(
harness
.reopen_height_value(context.child("reopen"), Height::new(1))
.await,
Some(1),
"height state should survive reopen after finalization",
);
});
}
#[test]
#[should_panic(expected = "finalize replay state root must match block commitments")]
fn execution_finalize_replay_rejects_state_root_mismatch() {
deterministic::Runner::default().start(|context| async move {
let mut harness = Harness::new(context).await;
let genesis = Block::genesis();
let mut block1 = harness.stage_pending_child(&genesis, View::new(1)).await;
block1.state_root = u64_to_digest(999);
harness.processor.pending.clear();
let _ = harness.finalize(block1.clone()).await;
});
}
#[test]
fn execution_finalized_hook_runs_after_durable_finalize() {
deterministic::Runner::default().start(|context| async move {
let mut harness = Harness::new_with_finalized_observer(context).await;
let genesis = Block::genesis();
let block1 = harness.stage_pending_child(&genesis, View::new(1)).await;
let status = harness.finalize(block1).await;
assert_eq!(
status,
FinalizeStatus::Persisted {
height: Height::new(1)
}
);
assert_eq!(
harness.finalized_reopened_counters(),
vec![1],
"finalized hook should observe the durably committed state",
);
});
}
#[test]
fn initial_ancestry_read_cancels_when_response_dropped() {
deterministic::Runner::default().start(|_context| async move {
let (mut response, receiver) = oneshot::channel::<bool>();
let mut ancestry = Box::pin(futures::stream::pending::<Block>());
drop(receiver);
assert_eq!(next_or_cancel(&mut response, &mut ancestry).await, None);
});
}
#[test]
fn execution_rebuild_pending_returns_incomplete_when_parent_subscription_ends() {
deterministic::Runner::default().start(|context| async move {
let mut harness = Harness::new(context.child("harness")).await;
let genesis = Block::genesis();
let block1 = harness.stage_pending_child(&genesis, View::new(1)).await;
let status = harness.finalize(block1.clone()).await;
assert_eq!(
status,
FinalizeStatus::Persisted {
height: Height::new(1)
}
);
let block2 = harness.stage_pending_child(&block1, View::new(2)).await;
harness.processor.pending.clear();
let provider = ScriptedParentProvider::default();
provider.push(&block2, [None]);
let (mut response, _rx) = oneshot::channel::<bool>();
let result = harness
.processor
.rebuild_pending(
harness.context_cell.as_present(),
provider,
block2,
&mut response,
)
.await;
assert_eq!(result, Err(PrepareBatchesError::Incomplete));
});
}
#[test]
fn execution_rebuild_pending_does_not_retry_closed_provider_forever() {
deterministic::Runner::default().start(|context| async move {
let mut harness = Harness::new(context.child("harness")).await;
let genesis = Block::genesis();
let block1 = harness.stage_pending_child(&genesis, View::new(1)).await;
let status = harness.finalize(block1.clone()).await;
assert_eq!(
status,
FinalizeStatus::Persisted {
height: Height::new(1)
}
);
let block2 = harness.stage_pending_child(&block1, View::new(2)).await;
harness.processor.pending.clear();
let provider = ScriptedParentProvider::default();
provider.push(&block2, [None, Some(block1.clone())]);
let (mut response, _rx) = oneshot::channel::<bool>();
let result = harness
.processor
.rebuild_pending(
harness.context_cell.as_present(),
provider.clone(),
block2,
&mut response,
)
.await;
assert_eq!(result, Err(PrepareBatchesError::Incomplete));
assert_eq!(
provider.fetches(),
1,
"closed ancestry should not be retried"
);
});
}
}