aleph_bft/consensus/
handler.rs

1use crate::{
2    alerts::{Alert, ForkingNotification},
3    collection::Salt,
4    consensus::LOG_TARGET,
5    dag::{Dag, DagResult, DagStatus, DagUnit, Request as ReconstructionRequest},
6    dissemination::{Addressed, DisseminationMessage, Responder, TaskManager, TaskManagerStatus},
7    extension::Ordering,
8    units::{UncheckedSignedUnit, Unit, UnitStore, UnitStoreStatus, Validator},
9    Data, DelayConfig, Hasher, MultiKeychain, NodeIndex, UnitFinalizationHandler,
10};
11use log::{debug, trace};
12use std::{
13    cmp::max,
14    fmt::{Display, Formatter, Result as FmtResult},
15    time::Duration,
16};
17
18/// The main logic of the consensus, minus all the asynchronous components.
19pub struct Consensus<UFH, MK>
20where
21    UFH: UnitFinalizationHandler,
22    MK: MultiKeychain,
23{
24    store: UnitStore<DagUnit<UFH::Hasher, UFH::Data, MK>>,
25    dag: Dag<UFH::Hasher, UFH::Data, MK>,
26    responder: Responder<UFH::Hasher, UFH::Data, MK>,
27    ordering: Ordering<MK, UFH>,
28    task_manager: TaskManager<UFH::Hasher>,
29}
30
31/// The status of the consensus, for logging purposes.
32pub struct Status<H: Hasher> {
33    task_manager_status: TaskManagerStatus<H>,
34    dag_status: DagStatus,
35    store_status: UnitStoreStatus,
36}
37
38impl<H: Hasher> Status<H> {
39    fn short_report(&self) -> String {
40        let rounds_behind = max(self.dag_status.top_round(), self.store_status.top_round())
41            - self.store_status.top_round();
42        match rounds_behind {
43            (0..=2) => "healthy".to_string(),
44            (3..) => format!("behind by {rounds_behind} rounds"),
45        }
46    }
47}
48
49impl<H: Hasher> Display for Status<H> {
50    fn fmt(&self, f: &mut Formatter) -> FmtResult {
51        write!(f, "{}", self.short_report())?;
52        write!(f, ";reconstructed DAG: {}", self.store_status)?;
53        write!(f, ";additional information: {}", self.dag_status)?;
54        write!(f, ";task manager: {}", self.task_manager_status)?;
55        Ok(())
56    }
57}
58
59type AddressedDisseminationMessage<H, D, MK> = Addressed<DisseminationMessage<H, D, MK>>;
60
61/// The result of some operation within the consensus, requiring either other components should get
62/// informed about it, or messages should be sent to the network.
63pub struct ConsensusResult<H: Hasher, D: Data, MK: MultiKeychain> {
64    /// Units that should be sent for backup saving.
65    pub units: Vec<DagUnit<H, D, MK>>,
66    /// Alerts that should be sent to the alerting component.
67    pub alerts: Vec<Alert<H, D, MK::Signature>>,
68    /// Messages that should be sent to other committee members.
69    pub messages: Vec<AddressedDisseminationMessage<H, D, MK::Signature>>,
70}
71
72impl<H: Hasher, D: Data, MK: MultiKeychain> ConsensusResult<H, D, MK> {
73    fn noop() -> Self {
74        ConsensusResult {
75            units: Vec::new(),
76            alerts: Vec::new(),
77            messages: Vec::new(),
78        }
79    }
80}
81
82impl<UFH, MK> Consensus<UFH, MK>
83where
84    UFH: UnitFinalizationHandler,
85    MK: MultiKeychain,
86{
87    /// Create a new Consensus.
88    pub fn new(
89        keychain: MK,
90        validator: Validator<MK>,
91        finalization_handler: UFH,
92        delay_config: DelayConfig,
93    ) -> Self {
94        let n_members = keychain.node_count();
95        let index = keychain.index();
96        Consensus {
97            store: UnitStore::new(n_members),
98            dag: Dag::new(validator),
99            responder: Responder::new(keychain),
100            ordering: Ordering::new(finalization_handler),
101            task_manager: TaskManager::new(index, n_members, delay_config),
102        }
103    }
104
105    fn handle_dag_result(
106        &mut self,
107        result: DagResult<UFH::Hasher, UFH::Data, MK>,
108    ) -> ConsensusResult<UFH::Hasher, UFH::Data, MK> {
109        let DagResult {
110            units,
111            alerts,
112            requests,
113        } = result;
114        for request in requests {
115            self.task_manager.add_request(request);
116        }
117        let messages = self.trigger_tasks();
118        ConsensusResult {
119            units,
120            alerts,
121            messages,
122        }
123    }
124
125    /// Process a unit received (usually) from the network.
126    pub fn process_incoming_unit(
127        &mut self,
128        unit: UncheckedSignedUnit<UFH::Hasher, UFH::Data, MK::Signature>,
129    ) -> ConsensusResult<UFH::Hasher, UFH::Data, MK> {
130        let result = self.dag.add_unit(unit, &self.store);
131        self.handle_dag_result(result)
132    }
133
134    /// Process a request received from the network.
135    pub fn process_request(
136        &mut self,
137        request: ReconstructionRequest<UFH::Hasher>,
138        node_id: NodeIndex,
139    ) -> Option<AddressedDisseminationMessage<UFH::Hasher, UFH::Data, MK::Signature>> {
140        match self.responder.handle_request(request, &self.store) {
141            Ok(response) => Some(Addressed::addressed_to(response.into(), node_id)),
142            Err(err) => {
143                debug!(target: LOG_TARGET, "Not answering request from node {:?}: {}.", node_id, err);
144                None
145            }
146        }
147    }
148
149    /// Process a parents response.
150    pub fn process_parents(
151        &mut self,
152        u_hash: <UFH::Hasher as Hasher>::Hash,
153        parents: Vec<UncheckedSignedUnit<UFH::Hasher, UFH::Data, MK::Signature>>,
154    ) -> ConsensusResult<UFH::Hasher, UFH::Data, MK> {
155        if self.store.unit(&u_hash).is_some() {
156            trace!(target: LOG_TARGET, "We got parents response but already imported the unit.");
157            return ConsensusResult::noop();
158        }
159        let result = self.dag.add_parents(u_hash, parents, &self.store);
160        self.handle_dag_result(result)
161    }
162
163    /// Process a newest unit request.
164    pub fn process_newest_unit_request(
165        &mut self,
166        salt: Salt,
167        node_id: NodeIndex,
168    ) -> AddressedDisseminationMessage<UFH::Hasher, UFH::Data, MK::Signature> {
169        Addressed::addressed_to(
170            self.responder
171                .handle_newest_unit_request(node_id, salt, &self.store)
172                .into(),
173            node_id,
174        )
175    }
176
177    /// Process a forking notification.
178    pub fn process_forking_notification(
179        &mut self,
180        notification: ForkingNotification<UFH::Hasher, UFH::Data, MK::Signature>,
181    ) -> ConsensusResult<UFH::Hasher, UFH::Data, MK> {
182        let result = self
183            .dag
184            .process_forking_notification(notification, &self.store);
185        self.handle_dag_result(result)
186    }
187
188    /// What to do once a unit has been securely backed up on disk.
189    pub fn on_unit_backup_saved(
190        &mut self,
191        unit: DagUnit<UFH::Hasher, UFH::Data, MK>,
192    ) -> Option<AddressedDisseminationMessage<UFH::Hasher, UFH::Data, MK::Signature>> {
193        let unit_hash = unit.hash();
194        self.store.insert(unit.clone());
195        self.dag.finished_processing(&unit_hash);
196        self.ordering.add_unit(unit.clone());
197        self.task_manager.add_unit(&unit)
198    }
199
200    /// When should `trigger_tasks` be called next.
201    pub fn next_tick(&self) -> Duration {
202        self.task_manager.next_tick()
203    }
204
205    /// Trigger all the ready tasks and get all the messages that should be sent now.
206    pub fn trigger_tasks(
207        &mut self,
208    ) -> Vec<AddressedDisseminationMessage<UFH::Hasher, UFH::Data, MK::Signature>> {
209        self.task_manager
210            .trigger_tasks(&self.store, self.dag.processing_units())
211    }
212
213    /// The status of the consensus handler, for logging purposes.
214    pub fn status(&self) -> Status<UFH::Hasher> {
215        Status {
216            dag_status: self.dag.status(),
217            store_status: self.store.status(),
218            task_manager_status: self.task_manager.status(),
219        }
220    }
221}