use crate::stateful::{
actor::{
core::mailbox::Message,
processor::{FinalizeStatus, Processor},
},
Application,
};
use commonware_actor::mailbox as actor_mailbox;
use commonware_consensus::{
marshal::{
ancestry::BlockProvider,
core::{Mailbox as MarshalMailbox, Variant},
},
types::Height,
Heightable,
};
use commonware_cryptography::certificate::Scheme;
use commonware_macros::select_loop;
use commonware_runtime::{Clock, ContextCell, Metrics, Spawner};
use commonware_utils::{channel::fallible::OneshotExt, Acknowledgement};
use rand::Rng;
use tracing::debug;
pub(super) struct Processing<E, A, S, V, R>
where
E: Rng + Spawner + Metrics + Clock,
A: Application<E>,
S: Scheme,
V: Variant<ApplicationBlock = A::Block>,
{
pub(super) context: ContextCell<E>,
pub(super) mailbox: actor_mailbox::Receiver<Message<E, A>>,
pub(super) input_provider: A::InputProvider,
pub(super) marshal: MarshalMailbox<S, V>,
#[expect(
dead_code,
reason = "processing keeps resolver handles alive for peer state sync"
)]
pub(super) resolvers: R,
pub(super) processor: Processor<E, A>,
pub(super) skip_finalized_until: Option<Height>,
}
impl<E, A, S, V, R> Processing<E, A, S, V, R>
where
E: Rng + Spawner + Metrics + Clock,
A: Application<E>,
S: Scheme,
V: Variant<ApplicationBlock = A::Block>,
MarshalMailbox<S, V>: BlockProvider<Block = A::Block>,
{
pub async fn start(mut self) {
select_loop! {
self.context,
on_stopped => {
debug!("processor received shutdown signal");
},
Some(message) = self.mailbox.recv() else {
debug!("mailbox closed, shutting down processor");
break;
} => match message {
Message::Propose {
context,
ancestry,
response,
} => {
self.processor
.propose(
self.context.as_present(),
self.marshal.clone(),
context,
ancestry,
&mut self.input_provider,
response,
)
.await;
}
Message::Verify {
context,
ancestry,
response,
} => {
self.processor
.verify(
self.context.as_present(),
self.marshal.clone(),
context,
ancestry,
response,
)
.await;
}
Message::Finalized {
block,
acknowledgement,
} => {
if skip_finalized_block(&mut self.skip_finalized_until, block.height()) {
acknowledgement.acknowledge();
continue;
}
if let FinalizeStatus::Persisted { height } =
self.processor.finalize(&self.context, block).await
{
debug!(height = height.get(), "persisted finalized database batch");
}
acknowledgement.acknowledge();
}
Message::SubscribeDatabases { response } => {
response.send_lossy(self.processor.databases().clone());
}
},
}
}
}
fn skip_finalized_block(skip_until: &mut Option<Height>, height: Height) -> bool {
let Some(target) = *skip_until else {
return false;
};
if height > target {
*skip_until = None;
return false;
}
if height == target {
*skip_until = None;
}
true
}
#[cfg(test)]
mod tests {
use super::skip_finalized_block;
use commonware_consensus::types::Height;
#[test]
fn skip_finalized_block_skips_through_target_height() {
let mut skip_until = Some(Height::new(3));
assert!(skip_finalized_block(&mut skip_until, Height::new(1)));
assert_eq!(skip_until, Some(Height::new(3)));
assert!(skip_finalized_block(&mut skip_until, Height::new(3)));
assert_eq!(skip_until, None);
assert!(!skip_finalized_block(&mut skip_until, Height::new(4)));
}
#[test]
fn skip_finalized_block_clears_stale_target() {
let mut skip_until = Some(Height::new(3));
assert!(!skip_finalized_block(&mut skip_until, Height::new(4)));
assert_eq!(skip_until, None);
}
}