1use crate::{
2 alerts::{Handler as AlertHandler, Service as AlertService, IO as AlertIO},
3 backup::{BackupLoader, BackupSaver},
4 collection::initial_unit_collection,
5 consensus::{
6 handler::Consensus,
7 service::{Service, IO as ConsensusIO},
8 },
9 creation, handle_task_termination,
10 interface::LocalIO,
11 network::{Hub as NetworkHub, NetworkData},
12 units::Validator,
13 Config, DataProvider, MultiKeychain, Network, SpawnHandle, Terminator, UnitFinalizationHandler,
14};
15use futures::{
16 channel::{mpsc, oneshot},
17 future::pending,
18 pin_mut, AsyncRead, AsyncWrite, FutureExt,
19};
20use log::{debug, error, info};
21
22mod handler;
23mod service;
24
/// Log target under which `run_session` emits its log messages.
const LOG_TARGET: &str = "AlephBFT-consensus";
26
27pub async fn run_session<
33 DP: DataProvider,
34 UFH: UnitFinalizationHandler<Data = DP::Output>,
35 US: AsyncWrite + Send + Sync + 'static,
36 UL: AsyncRead + Send + Sync + 'static,
37 N: Network<NetworkData<UFH::Hasher, DP::Output, MK::Signature, MK::PartialMultisignature>>,
38 SH: SpawnHandle,
39 MK: MultiKeychain,
40>(
41 config: Config,
42 local_io: LocalIO<DP, UFH, US, UL>,
43 network: N,
44 keychain: MK,
45 spawn_handle: SH,
46 mut terminator: Terminator,
47) {
48 info!(target: LOG_TARGET, "Starting a new session.");
49 let index = keychain.index();
50 let session_id = config.session_id();
51 let (data_provider, finalization_handler, unit_saver, unit_loader) = local_io.into_components();
52
53 info!(target: LOG_TARGET, "Loading units from backup.");
54 let (loaded_units, loaded_starting_round) =
55 match BackupLoader::new(unit_loader, index, session_id)
56 .load_backup()
57 .await
58 {
59 Ok(result) => result,
60 Err(e) => {
61 error!(target: LOG_TARGET, "Error loading units from backup: {}.", e);
62 return;
63 }
64 };
65 info!(
66 target: LOG_TARGET,
67 "Loaded {:?} units from backup. Able to continue from round: {:?}.",
68 loaded_units.len(),
69 loaded_starting_round,
70 );
71
72 let (alert_messages_for_alerter, alert_messages_from_network) = mpsc::unbounded();
73 let (alert_messages_for_network, alert_messages_from_alerter) = mpsc::unbounded();
74 let (unit_messages_for_service, unit_messages_from_network) = mpsc::unbounded();
75 let (unit_messages_for_network, unit_messages_from_service) = mpsc::unbounded();
76
77 debug!(target: LOG_TARGET, "Spawning network.");
78 let network_terminator = terminator.add_offspring_connection("network");
79 let network_handle = spawn_handle
80 .spawn_essential("consensus/network", async move {
81 NetworkHub::new(
82 network,
83 unit_messages_from_service,
84 unit_messages_for_service,
85 alert_messages_from_alerter,
86 alert_messages_for_alerter,
87 )
88 .run(network_terminator)
89 .await
90 })
91 .fuse();
92 pin_mut!(network_handle);
93 debug!(target: LOG_TARGET, "Network spawned.");
94
95 let (new_units_for_service, new_units_from_creator) = mpsc::unbounded();
96 let (parents_for_creator, parents_from_service) = mpsc::unbounded();
97 let (starting_round_for_creator, starting_round_from_collection) = oneshot::channel();
98
99 debug!(target: LOG_TARGET, "Spawning creator.");
100 let creator_terminator = terminator.add_offspring_connection("creator");
101 let creator_config = config.clone();
102 let creator_keychain = keychain.clone();
103 let creator_handle = spawn_handle
104 .spawn_essential("consensus/creator", async move {
105 creation::run(
106 creator_config,
107 creation::IO {
108 outgoing_units: new_units_for_service,
109 incoming_parents: parents_from_service,
110 data_provider,
111 },
112 creator_keychain,
113 starting_round_from_collection,
114 creator_terminator,
115 )
116 .await
117 })
118 .shared();
119 let creator_handle_for_panic = creator_handle.clone();
120 let creator_panic_handle = async move {
121 if creator_handle_for_panic.await.is_err() {
122 return;
123 }
124 pending().await
125 }
126 .fuse();
127 pin_mut!(creator_panic_handle);
128 let creator_handle = creator_handle.fuse();
129 debug!(target: LOG_TARGET, "Creator spawned.");
130
131 let (backup_units_for_saver, backup_units_from_service) = mpsc::unbounded();
132 let (backup_units_for_service, backup_units_from_saver) = mpsc::unbounded();
133
134 debug!(target: LOG_TARGET, "Spawning backup saver.");
135 let backup_saver_terminator = terminator.add_offspring_connection("backup-saver");
136 let backup_saver_handle = spawn_handle.spawn_essential("consensus/backup_saver", {
137 let mut backup_saver = BackupSaver::new(
138 backup_units_from_service,
139 backup_units_for_service,
140 unit_saver,
141 );
142 async move {
143 backup_saver.run(backup_saver_terminator).await;
144 }
145 });
146 let mut backup_saver_handle = backup_saver_handle.fuse();
147 debug!(target: LOG_TARGET, "Backup saver spawned.");
148
149 let (alert_notifications_for_units, notifications_from_alerter) = mpsc::unbounded();
150 let (alerts_for_alerter, alerts_from_units) = mpsc::unbounded();
151
152 debug!(target: LOG_TARGET, "Spawning alerter.");
153 let alerter_terminator = terminator.add_offspring_connection("alerter");
154 let alerter_keychain = keychain.clone();
155 let alerter_handler = AlertHandler::new(alerter_keychain.clone(), session_id);
156 let mut alerter_service = AlertService::new(
157 alerter_keychain,
158 AlertIO {
159 messages_for_network: alert_messages_for_network,
160 messages_from_network: alert_messages_from_network,
161 notifications_for_units: alert_notifications_for_units,
162 alerts_from_units,
163 },
164 alerter_handler,
165 );
166 let mut alerter_handle = spawn_handle
167 .spawn_essential("consensus/alerter", async move {
168 alerter_service.run(alerter_terminator).await;
169 })
170 .fuse();
171 debug!(target: LOG_TARGET, "Alerter spawned.");
172
173 let validator = Validator::new(session_id, keychain.clone(), config.max_round());
174 let (responses_for_collection, responses_from_service) = mpsc::unbounded();
175
176 debug!(target: LOG_TARGET, "Spawning initial unit collection.");
177 let starting_round_handle = match initial_unit_collection(
178 &keychain,
179 &validator,
180 unit_messages_for_network.clone(),
181 starting_round_for_creator,
182 loaded_starting_round,
183 responses_from_service,
184 config.delay_config().newest_request_delay.clone(),
185 ) {
186 Ok(handle) => handle.fuse(),
187 Err(_) => return,
188 };
189 pin_mut!(starting_round_handle);
190 debug!(target: LOG_TARGET, "Initial unit collection spawned.");
191
192 debug!(target: LOG_TARGET, "Spawning consensus service.");
193 let consensus = Consensus::new(
194 keychain.clone(),
195 validator.clone(),
196 finalization_handler,
197 config.delay_config().clone(),
198 );
199 let service_handle = spawn_handle
200 .spawn_essential("consensus/service", {
201 let consensus_io = ConsensusIO {
202 backup_units_for_saver,
203 backup_units_from_saver,
204 alerts_for_alerter,
205 notifications_from_alerter,
206 unit_messages_for_network,
207 unit_messages_from_network,
208 responses_for_collection,
209 parents_for_creator,
210 new_units_from_creator,
211 };
212 let service_terminator = terminator.add_offspring_connection("service");
213 let service = Service::new(consensus, consensus_io);
214
215 async move { service.run(loaded_units, service_terminator).await }
216 })
217 .fuse();
218 pin_mut!(service_handle);
219 debug!(target: LOG_TARGET, "Consensus service spawned.");
220
221 loop {
222 futures::select! {
223 _ = starting_round_handle => {
224 debug!(target: LOG_TARGET, "Starting round task terminated.");
225 },
226 _ = network_handle => {
227 error!(target: LOG_TARGET, "Network hub terminated early.");
228 break;
229 },
230 _ = creator_panic_handle => {
231 error!(target: LOG_TARGET, "Creator task terminated early.");
232 break;
233 },
234 _ = backup_saver_handle => {
235 error!(target: LOG_TARGET, "Backup saving task terminated early.");
236 break;
237 },
238 _ = alerter_handle => {
239 error!(target: LOG_TARGET, "Alerter task terminated early.");
240 break;
241 },
242 _ = service_handle => {
243 error!(target: LOG_TARGET, "Consensus service terminated early.");
244 break;
245 },
246 _ = terminator.get_exit().fuse() => {
247 debug!(target: LOG_TARGET, "Exit channel was called.");
248 break;
249 },
250 }
251 }
252
253 debug!(target: LOG_TARGET, "Run ending.");
254
255 terminator.terminate_sync().await;
256
257 handle_task_termination(network_handle, LOG_TARGET, "Network").await;
258 handle_task_termination(creator_handle, LOG_TARGET, "Creator").await;
259 handle_task_termination(backup_saver_handle, LOG_TARGET, "Backup saver").await;
260 handle_task_termination(alerter_handle, LOG_TARGET, "Alerter").await;
261 handle_task_termination(service_handle, LOG_TARGET, "Consensus service").await;
262
263 info!(target: LOG_TARGET, "Session ended.");
264}