Skip to main content

mysticeti_core/
commit_observer.rs

1//! Commit observer: handle committed leaders and produce CommittedSubDag for execution.
2
3use crate::consensus::{CommittedSubDag, LeaderStatus, Linearizer};
4use crate::data::Data;
5use crate::types::StatementBlock;
6
7/// Observes commit decisions and turns them into ordered subdags (e.g. for execution or WAL).
8pub trait CommitObserver: Send + Sync {
9    /// Handle a batch of committed leader statuses from the committer.
10    fn handle_commit(&self, committed: Vec<LeaderStatus>);
11}
12
13/// Simple implementation: linearize committed blocks and send the subdag on a channel.
14pub struct SimpleCommitObserver {
15    linearizer: Linearizer,
16    tx: tokio::sync::mpsc::UnboundedSender<CommittedSubDag>,
17}
18
19impl SimpleCommitObserver {
20    pub fn new(tx: tokio::sync::mpsc::UnboundedSender<CommittedSubDag>) -> Self {
21        Self {
22            linearizer: Linearizer::new(),
23            tx,
24        }
25    }
26
27    pub fn linearizer(&self) -> &Linearizer {
28        &self.linearizer
29    }
30}
31
32impl CommitObserver for SimpleCommitObserver {
33    fn handle_commit(&self, committed: Vec<LeaderStatus>) {
34        let blocks: Vec<Data<StatementBlock>> = committed
35            .into_iter()
36            .filter_map(|s| {
37                if let LeaderStatus::Commit(b) = s {
38                    Some(b)
39                } else {
40                    None
41                }
42            })
43            .collect();
44        if blocks.is_empty() {
45            return;
46        }
47        let subdag = self.linearizer.linearize(blocks);
48        let _ = self.tx.send(subdag);
49    }
50}