// mysticeti_core/commit_observer.rs
use crate::consensus::{CommittedSubDag, LeaderStatus, Linearizer};
use crate::data::Data;
use crate::types::StatementBlock;
6
7pub trait CommitObserver: Send + Sync {
9 fn handle_commit(&self, committed: Vec<LeaderStatus>);
11}
12
13pub struct SimpleCommitObserver {
15 linearizer: Linearizer,
16 tx: tokio::sync::mpsc::UnboundedSender<CommittedSubDag>,
17}
18
19impl SimpleCommitObserver {
20 pub fn new(tx: tokio::sync::mpsc::UnboundedSender<CommittedSubDag>) -> Self {
21 Self {
22 linearizer: Linearizer::new(),
23 tx,
24 }
25 }
26
27 pub fn linearizer(&self) -> &Linearizer {
28 &self.linearizer
29 }
30}
31
32impl CommitObserver for SimpleCommitObserver {
33 fn handle_commit(&self, committed: Vec<LeaderStatus>) {
34 let blocks: Vec<Data<StatementBlock>> = committed
35 .into_iter()
36 .filter_map(|s| {
37 if let LeaderStatus::Commit(b) = s {
38 Some(b)
39 } else {
40 None
41 }
42 })
43 .collect();
44 if blocks.is_empty() {
45 return;
46 }
47 let subdag = self.linearizer.linearize(blocks);
48 let _ = self.tx.send(subdag);
49 }
50}