1use crate::{
2 alerts::{Alert, ForkingNotification},
3 collection::Salt,
4 consensus::LOG_TARGET,
5 dag::{Dag, DagResult, DagStatus, DagUnit, Request as ReconstructionRequest},
6 dissemination::{Addressed, DisseminationMessage, Responder, TaskManager, TaskManagerStatus},
7 extension::Ordering,
8 units::{UncheckedSignedUnit, Unit, UnitStore, UnitStoreStatus, Validator},
9 Data, DelayConfig, Hasher, MultiKeychain, NodeIndex, UnitFinalizationHandler,
10};
11use log::{debug, trace};
12use std::{
13 cmp::max,
14 fmt::{Display, Formatter, Result as FmtResult},
15 time::Duration,
16};
17
18pub struct Consensus<UFH, MK>
20where
21 UFH: UnitFinalizationHandler,
22 MK: MultiKeychain,
23{
24 store: UnitStore<DagUnit<UFH::Hasher, UFH::Data, MK>>,
25 dag: Dag<UFH::Hasher, UFH::Data, MK>,
26 responder: Responder<UFH::Hasher, UFH::Data, MK>,
27 ordering: Ordering<MK, UFH>,
28 task_manager: TaskManager<UFH::Hasher>,
29}
30
31pub struct Status<H: Hasher> {
33 task_manager_status: TaskManagerStatus<H>,
34 dag_status: DagStatus,
35 store_status: UnitStoreStatus,
36}
37
38impl<H: Hasher> Status<H> {
39 fn short_report(&self) -> String {
40 let rounds_behind = max(self.dag_status.top_round(), self.store_status.top_round())
41 - self.store_status.top_round();
42 match rounds_behind {
43 (0..=2) => "healthy".to_string(),
44 (3..) => format!("behind by {rounds_behind} rounds"),
45 }
46 }
47}
48
49impl<H: Hasher> Display for Status<H> {
50 fn fmt(&self, f: &mut Formatter) -> FmtResult {
51 write!(f, "{}", self.short_report())?;
52 write!(f, ";reconstructed DAG: {}", self.store_status)?;
53 write!(f, ";additional information: {}", self.dag_status)?;
54 write!(f, ";task manager: {}", self.task_manager_status)?;
55 Ok(())
56 }
57}
58
59type AddressedDisseminationMessage<H, D, MK> = Addressed<DisseminationMessage<H, D, MK>>;
60
61pub struct ConsensusResult<H: Hasher, D: Data, MK: MultiKeychain> {
64 pub units: Vec<DagUnit<H, D, MK>>,
66 pub alerts: Vec<Alert<H, D, MK::Signature>>,
68 pub messages: Vec<AddressedDisseminationMessage<H, D, MK::Signature>>,
70}
71
72impl<H: Hasher, D: Data, MK: MultiKeychain> ConsensusResult<H, D, MK> {
73 fn noop() -> Self {
74 ConsensusResult {
75 units: Vec::new(),
76 alerts: Vec::new(),
77 messages: Vec::new(),
78 }
79 }
80}
81
82impl<UFH, MK> Consensus<UFH, MK>
83where
84 UFH: UnitFinalizationHandler,
85 MK: MultiKeychain,
86{
87 pub fn new(
89 keychain: MK,
90 validator: Validator<MK>,
91 finalization_handler: UFH,
92 delay_config: DelayConfig,
93 ) -> Self {
94 let n_members = keychain.node_count();
95 let index = keychain.index();
96 Consensus {
97 store: UnitStore::new(n_members),
98 dag: Dag::new(validator),
99 responder: Responder::new(keychain),
100 ordering: Ordering::new(finalization_handler),
101 task_manager: TaskManager::new(index, n_members, delay_config),
102 }
103 }
104
105 fn handle_dag_result(
106 &mut self,
107 result: DagResult<UFH::Hasher, UFH::Data, MK>,
108 ) -> ConsensusResult<UFH::Hasher, UFH::Data, MK> {
109 let DagResult {
110 units,
111 alerts,
112 requests,
113 } = result;
114 for request in requests {
115 self.task_manager.add_request(request);
116 }
117 let messages = self.trigger_tasks();
118 ConsensusResult {
119 units,
120 alerts,
121 messages,
122 }
123 }
124
125 pub fn process_incoming_unit(
127 &mut self,
128 unit: UncheckedSignedUnit<UFH::Hasher, UFH::Data, MK::Signature>,
129 ) -> ConsensusResult<UFH::Hasher, UFH::Data, MK> {
130 let result = self.dag.add_unit(unit, &self.store);
131 self.handle_dag_result(result)
132 }
133
134 pub fn process_request(
136 &mut self,
137 request: ReconstructionRequest<UFH::Hasher>,
138 node_id: NodeIndex,
139 ) -> Option<AddressedDisseminationMessage<UFH::Hasher, UFH::Data, MK::Signature>> {
140 match self.responder.handle_request(request, &self.store) {
141 Ok(response) => Some(Addressed::addressed_to(response.into(), node_id)),
142 Err(err) => {
143 debug!(target: LOG_TARGET, "Not answering request from node {:?}: {}.", node_id, err);
144 None
145 }
146 }
147 }
148
149 pub fn process_parents(
151 &mut self,
152 u_hash: <UFH::Hasher as Hasher>::Hash,
153 parents: Vec<UncheckedSignedUnit<UFH::Hasher, UFH::Data, MK::Signature>>,
154 ) -> ConsensusResult<UFH::Hasher, UFH::Data, MK> {
155 if self.store.unit(&u_hash).is_some() {
156 trace!(target: LOG_TARGET, "We got parents response but already imported the unit.");
157 return ConsensusResult::noop();
158 }
159 let result = self.dag.add_parents(u_hash, parents, &self.store);
160 self.handle_dag_result(result)
161 }
162
163 pub fn process_newest_unit_request(
165 &mut self,
166 salt: Salt,
167 node_id: NodeIndex,
168 ) -> AddressedDisseminationMessage<UFH::Hasher, UFH::Data, MK::Signature> {
169 Addressed::addressed_to(
170 self.responder
171 .handle_newest_unit_request(node_id, salt, &self.store)
172 .into(),
173 node_id,
174 )
175 }
176
177 pub fn process_forking_notification(
179 &mut self,
180 notification: ForkingNotification<UFH::Hasher, UFH::Data, MK::Signature>,
181 ) -> ConsensusResult<UFH::Hasher, UFH::Data, MK> {
182 let result = self
183 .dag
184 .process_forking_notification(notification, &self.store);
185 self.handle_dag_result(result)
186 }
187
188 pub fn on_unit_backup_saved(
190 &mut self,
191 unit: DagUnit<UFH::Hasher, UFH::Data, MK>,
192 ) -> Option<AddressedDisseminationMessage<UFH::Hasher, UFH::Data, MK::Signature>> {
193 let unit_hash = unit.hash();
194 self.store.insert(unit.clone());
195 self.dag.finished_processing(&unit_hash);
196 self.ordering.add_unit(unit.clone());
197 self.task_manager.add_unit(&unit)
198 }
199
200 pub fn next_tick(&self) -> Duration {
202 self.task_manager.next_tick()
203 }
204
205 pub fn trigger_tasks(
207 &mut self,
208 ) -> Vec<AddressedDisseminationMessage<UFH::Hasher, UFH::Data, MK::Signature>> {
209 self.task_manager
210 .trigger_tasks(&self.store, self.dag.processing_units())
211 }
212
213 pub fn status(&self) -> Status<UFH::Hasher> {
215 Status {
216 dag_status: self.dag.status(),
217 store_status: self.store.status(),
218 task_manager_status: self.task_manager.status(),
219 }
220 }
221}