aleph_bft/consensus/
mod.rs

1use crate::{
2    alerts::{Handler as AlertHandler, Service as AlertService, IO as AlertIO},
3    backup::{BackupLoader, BackupSaver},
4    collection::initial_unit_collection,
5    consensus::{
6        handler::Consensus,
7        service::{Service, IO as ConsensusIO},
8    },
9    creation, handle_task_termination,
10    interface::LocalIO,
11    network::{Hub as NetworkHub, NetworkData},
12    units::Validator,
13    Config, DataProvider, MultiKeychain, Network, SpawnHandle, Terminator, UnitFinalizationHandler,
14};
15use futures::{
16    channel::{mpsc, oneshot},
17    future::pending,
18    pin_mut, AsyncRead, AsyncWrite, FutureExt,
19};
20use log::{debug, error, info};
21
22mod handler;
23mod service;
24
25const LOG_TARGET: &str = "AlephBFT-consensus";
26
27/// Starts the consensus algorithm as an async task. It stops establishing consensus for new data items after
28/// reaching the threshold specified in [`Config::max_round`] or upon receiving a stop signal from `exit`.
29/// For a detailed description of the consensus implemented by `run_session` see
30/// [docs for devs](https://cardinal-cryptography.github.io/AlephBFT/index.html)
31/// or the [original paper](https://arxiv.org/abs/1908.05156).
32pub async fn run_session<
33    DP: DataProvider,
34    UFH: UnitFinalizationHandler<Data = DP::Output>,
35    US: AsyncWrite + Send + Sync + 'static,
36    UL: AsyncRead + Send + Sync + 'static,
37    N: Network<NetworkData<UFH::Hasher, DP::Output, MK::Signature, MK::PartialMultisignature>>,
38    SH: SpawnHandle,
39    MK: MultiKeychain,
40>(
41    config: Config,
42    local_io: LocalIO<DP, UFH, US, UL>,
43    network: N,
44    keychain: MK,
45    spawn_handle: SH,
46    mut terminator: Terminator,
47) {
48    info!(target: LOG_TARGET, "Starting a new session.");
49    let index = keychain.index();
50    let session_id = config.session_id();
51    let (data_provider, finalization_handler, unit_saver, unit_loader) = local_io.into_components();
52
53    info!(target: LOG_TARGET, "Loading units from backup.");
54    let (loaded_units, loaded_starting_round) =
55        match BackupLoader::new(unit_loader, index, session_id)
56            .load_backup()
57            .await
58        {
59            Ok(result) => result,
60            Err(e) => {
61                error!(target: LOG_TARGET, "Error loading units from backup: {}.", e);
62                return;
63            }
64        };
65    info!(
66        target: LOG_TARGET,
67        "Loaded {:?} units from backup. Able to continue from round: {:?}.",
68        loaded_units.len(),
69        loaded_starting_round,
70    );
71
72    let (alert_messages_for_alerter, alert_messages_from_network) = mpsc::unbounded();
73    let (alert_messages_for_network, alert_messages_from_alerter) = mpsc::unbounded();
74    let (unit_messages_for_service, unit_messages_from_network) = mpsc::unbounded();
75    let (unit_messages_for_network, unit_messages_from_service) = mpsc::unbounded();
76
77    debug!(target: LOG_TARGET, "Spawning network.");
78    let network_terminator = terminator.add_offspring_connection("network");
79    let network_handle = spawn_handle
80        .spawn_essential("consensus/network", async move {
81            NetworkHub::new(
82                network,
83                unit_messages_from_service,
84                unit_messages_for_service,
85                alert_messages_from_alerter,
86                alert_messages_for_alerter,
87            )
88            .run(network_terminator)
89            .await
90        })
91        .fuse();
92    pin_mut!(network_handle);
93    debug!(target: LOG_TARGET, "Network spawned.");
94
95    let (new_units_for_service, new_units_from_creator) = mpsc::unbounded();
96    let (parents_for_creator, parents_from_service) = mpsc::unbounded();
97    let (starting_round_for_creator, starting_round_from_collection) = oneshot::channel();
98
99    debug!(target: LOG_TARGET, "Spawning creator.");
100    let creator_terminator = terminator.add_offspring_connection("creator");
101    let creator_config = config.clone();
102    let creator_keychain = keychain.clone();
103    let creator_handle = spawn_handle
104        .spawn_essential("consensus/creator", async move {
105            creation::run(
106                creator_config,
107                creation::IO {
108                    outgoing_units: new_units_for_service,
109                    incoming_parents: parents_from_service,
110                    data_provider,
111                },
112                creator_keychain,
113                starting_round_from_collection,
114                creator_terminator,
115            )
116            .await
117        })
118        .shared();
119    let creator_handle_for_panic = creator_handle.clone();
120    let creator_panic_handle = async move {
121        if creator_handle_for_panic.await.is_err() {
122            return;
123        }
124        pending().await
125    }
126    .fuse();
127    pin_mut!(creator_panic_handle);
128    let creator_handle = creator_handle.fuse();
129    debug!(target: LOG_TARGET, "Creator spawned.");
130
131    let (backup_units_for_saver, backup_units_from_service) = mpsc::unbounded();
132    let (backup_units_for_service, backup_units_from_saver) = mpsc::unbounded();
133
134    debug!(target: LOG_TARGET, "Spawning backup saver.");
135    let backup_saver_terminator = terminator.add_offspring_connection("backup-saver");
136    let backup_saver_handle = spawn_handle.spawn_essential("consensus/backup_saver", {
137        let mut backup_saver = BackupSaver::new(
138            backup_units_from_service,
139            backup_units_for_service,
140            unit_saver,
141        );
142        async move {
143            backup_saver.run(backup_saver_terminator).await;
144        }
145    });
146    let mut backup_saver_handle = backup_saver_handle.fuse();
147    debug!(target: LOG_TARGET, "Backup saver spawned.");
148
149    let (alert_notifications_for_units, notifications_from_alerter) = mpsc::unbounded();
150    let (alerts_for_alerter, alerts_from_units) = mpsc::unbounded();
151
152    debug!(target: LOG_TARGET, "Spawning alerter.");
153    let alerter_terminator = terminator.add_offspring_connection("alerter");
154    let alerter_keychain = keychain.clone();
155    let alerter_handler = AlertHandler::new(alerter_keychain.clone(), session_id);
156    let mut alerter_service = AlertService::new(
157        alerter_keychain,
158        AlertIO {
159            messages_for_network: alert_messages_for_network,
160            messages_from_network: alert_messages_from_network,
161            notifications_for_units: alert_notifications_for_units,
162            alerts_from_units,
163        },
164        alerter_handler,
165    );
166    let mut alerter_handle = spawn_handle
167        .spawn_essential("consensus/alerter", async move {
168            alerter_service.run(alerter_terminator).await;
169        })
170        .fuse();
171    debug!(target: LOG_TARGET, "Alerter spawned.");
172
173    let validator = Validator::new(session_id, keychain.clone(), config.max_round());
174    let (responses_for_collection, responses_from_service) = mpsc::unbounded();
175
176    debug!(target: LOG_TARGET, "Spawning initial unit collection.");
177    let starting_round_handle = match initial_unit_collection(
178        &keychain,
179        &validator,
180        unit_messages_for_network.clone(),
181        starting_round_for_creator,
182        loaded_starting_round,
183        responses_from_service,
184        config.delay_config().newest_request_delay.clone(),
185    ) {
186        Ok(handle) => handle.fuse(),
187        Err(_) => return,
188    };
189    pin_mut!(starting_round_handle);
190    debug!(target: LOG_TARGET, "Initial unit collection spawned.");
191
192    debug!(target: LOG_TARGET, "Spawning consensus service.");
193    let consensus = Consensus::new(
194        keychain.clone(),
195        validator.clone(),
196        finalization_handler,
197        config.delay_config().clone(),
198    );
199    let service_handle = spawn_handle
200        .spawn_essential("consensus/service", {
201            let consensus_io = ConsensusIO {
202                backup_units_for_saver,
203                backup_units_from_saver,
204                alerts_for_alerter,
205                notifications_from_alerter,
206                unit_messages_for_network,
207                unit_messages_from_network,
208                responses_for_collection,
209                parents_for_creator,
210                new_units_from_creator,
211            };
212            let service_terminator = terminator.add_offspring_connection("service");
213            let service = Service::new(consensus, consensus_io);
214
215            async move { service.run(loaded_units, service_terminator).await }
216        })
217        .fuse();
218    pin_mut!(service_handle);
219    debug!(target: LOG_TARGET, "Consensus service spawned.");
220
221    loop {
222        futures::select! {
223            _ = starting_round_handle => {
224                debug!(target: LOG_TARGET, "Starting round task terminated.");
225            },
226            _ = network_handle => {
227                error!(target: LOG_TARGET, "Network hub terminated early.");
228                break;
229            },
230            _ = creator_panic_handle => {
231                error!(target: LOG_TARGET, "Creator task terminated early.");
232                break;
233            },
234            _ = backup_saver_handle => {
235                error!(target: LOG_TARGET, "Backup saving task terminated early.");
236                break;
237            },
238            _ = alerter_handle => {
239                error!(target: LOG_TARGET, "Alerter task terminated early.");
240                break;
241            },
242            _ = service_handle => {
243                error!(target: LOG_TARGET, "Consensus service terminated early.");
244                break;
245            },
246            _ = terminator.get_exit().fuse() => {
247                debug!(target: LOG_TARGET, "Exit channel was called.");
248                break;
249            },
250        }
251    }
252
253    debug!(target: LOG_TARGET, "Run ending.");
254
255    terminator.terminate_sync().await;
256
257    handle_task_termination(network_handle, LOG_TARGET, "Network").await;
258    handle_task_termination(creator_handle, LOG_TARGET, "Creator").await;
259    handle_task_termination(backup_saver_handle, LOG_TARGET, "Backup saver").await;
260    handle_task_termination(alerter_handle, LOG_TARGET, "Alerter").await;
261    handle_task_termination(service_handle, LOG_TARGET, "Consensus service").await;
262
263    info!(target: LOG_TARGET, "Session ended.");
264}