fedimint_aleph_bft/
member.rs

1use crate::{
2    handle_task_termination,
3    member::Task::{CoordRequest, ParentsRequest, RequestNewest, UnitBroadcast},
4    network,
5    runway::{
6        self, NetworkIO, NewestUnitResponse, Request, Response, RunwayIO, RunwayNotificationIn,
7        RunwayNotificationOut,
8    },
9    task_queue::TaskQueue,
10    units::{UncheckedSignedUnit, UnitCoord},
11    BackupReader, BackupWriter, Config, Data, DataProvider, FinalizationHandler, Hasher,
12    MultiKeychain, Network, NodeIndex, Receiver, Recipient, Round, Sender, Signature, SpawnHandle,
13    Terminator, UncheckedSigned,
14};
15use aleph_bft_types::NodeMap;
16use codec::{Decode, Encode};
17use futures::{channel::mpsc, pin_mut, FutureExt, StreamExt};
18use futures_timer::Delay;
19use itertools::Itertools;
20use log::{debug, error, info, trace, warn};
21use network::NetworkData;
22use rand::{prelude::SliceRandom, Rng};
23use std::{
24    collections::HashSet,
25    convert::TryInto,
26    fmt::{self, Debug},
27    marker::PhantomData,
28    time::Duration,
29};
30
31/// A message concerning units, either about new units or some requests for them.
32#[derive(Clone, Eq, PartialEq, Debug, Decode, Encode)]
33pub(crate) enum UnitMessage<H: Hasher, D: Data, S: Signature> {
34    /// For disseminating newly created units.
35    NewUnit(UncheckedSignedUnit<H, D, S>),
36    /// Request for a unit by its coord.
37    RequestCoord(NodeIndex, UnitCoord),
38    /// Response to a request by coord.
39    ResponseCoord(UncheckedSignedUnit<H, D, S>),
40    /// Request for the full list of parents of a unit.
41    RequestParents(NodeIndex, H::Hash),
42    /// Response to a request for a full list of parents.
43    ResponseParents(H::Hash, Vec<UncheckedSignedUnit<H, D, S>>),
44    /// Request by a node for the newest unit created by them, together with a u64 salt
45    RequestNewest(NodeIndex, u64),
46    /// Response to RequestNewest: (our index, maybe unit, salt) signed by us
47    ResponseNewest(UncheckedSigned<NewestUnitResponse<H, D, S>, S>),
48}
49
50impl<H: Hasher, D: Data, S: Signature> UnitMessage<H, D, S> {
51    pub(crate) fn included_data(&self) -> Vec<D> {
52        match self {
53            Self::NewUnit(uu) => uu.as_signable().included_data(),
54            Self::RequestCoord(_, _) => Vec::new(),
55            Self::ResponseCoord(uu) => uu.as_signable().included_data(),
56            Self::RequestParents(_, _) => Vec::new(),
57            Self::ResponseParents(_, units) => units
58                .iter()
59                .flat_map(|uu| uu.as_signable().included_data())
60                .collect(),
61            UnitMessage::RequestNewest(_, _) => Vec::new(),
62            UnitMessage::ResponseNewest(response) => response.as_signable().included_data(),
63        }
64    }
65}
66
67#[derive(Eq, PartialEq, Debug)]
68enum Task<H: Hasher, D: Data, S: Signature> {
69    // Request the unit with the given (creator, round) coordinates.
70    CoordRequest(UnitCoord),
71    // Request parents of the unit with the given hash and Recipient.
72    ParentsRequest(H::Hash),
73    // Rebroadcast a given unit periodically (cancelled after a more recent unit by the same creator is received)
74    UnitBroadcast(UncheckedSignedUnit<H, D, S>),
75    // Request the newest unit created by node itself.
76    RequestNewest(u64),
77}
78
79#[derive(Eq, PartialEq, Debug)]
80struct RepeatableTask<H: Hasher, D: Data, S: Signature> {
81    task: Task<H, D, S>,
82    counter: usize,
83}
84
85impl<H: Hasher, D: Data, S: Signature> fmt::Display for RepeatableTask<H, D, S> {
86    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
87        write!(
88            f,
89            "RepeatableTask({:?}, counter {})",
90            self.task, self.counter
91        )
92    }
93}
94
95impl<H: Hasher, D: Data, S: Signature> RepeatableTask<H, D, S> {
96    fn new(task: Task<H, D, S>) -> Self {
97        Self { task, counter: 0 }
98    }
99}
100
101enum TaskDetails<H: Hasher, D: Data, S: Signature> {
102    Cancel,
103    Perform {
104        message: UnitMessage<H, D, S>,
105        recipients: Vec<Recipient>,
106        reschedule: Duration,
107    },
108}
109
110#[derive(Clone)]
111pub struct LocalIO<
112    D: Data,
113    DP: DataProvider<D>,
114    FH: FinalizationHandler<D>,
115    US: BackupWriter,
116    UL: BackupReader,
117> {
118    data_provider: DP,
119    finalization_handler: FH,
120    unit_saver: US,
121    unit_loader: UL,
122    _phantom: PhantomData<D>,
123}
124
125impl<
126        D: Data,
127        DP: DataProvider<D>,
128        FH: FinalizationHandler<D>,
129        US: BackupWriter,
130        UL: BackupReader,
131    > LocalIO<D, DP, FH, US, UL>
132{
133    pub fn new(
134        data_provider: DP,
135        finalization_handler: FH,
136        unit_saver: US,
137        unit_loader: UL,
138    ) -> LocalIO<D, DP, FH, US, UL> {
139        LocalIO {
140            data_provider,
141            finalization_handler,
142            unit_saver,
143            unit_loader,
144            _phantom: PhantomData,
145        }
146    }
147}
148
149struct MemberStatus<'a, H: Hasher, D: Data, S: Signature> {
150    task_queue: &'a TaskQueue<RepeatableTask<H, D, S>>,
151    not_resolved_parents: &'a HashSet<H::Hash>,
152    not_resolved_coords: &'a HashSet<UnitCoord>,
153}
154
155impl<'a, H: Hasher, D: Data, S: Signature> MemberStatus<'a, H, D, S> {
156    fn new(
157        task_queue: &'a TaskQueue<RepeatableTask<H, D, S>>,
158        not_resolved_parents: &'a HashSet<H::Hash>,
159        not_resolved_coords: &'a HashSet<UnitCoord>,
160    ) -> Self {
161        Self {
162            task_queue,
163            not_resolved_parents,
164            not_resolved_coords,
165        }
166    }
167}
168
169impl<'a, H: Hasher, D: Data, S: Signature> fmt::Display for MemberStatus<'a, H, D, S>
170where
171    H: Hasher,
172{
173    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
174        let mut count_coord_request: usize = 0;
175        let mut count_parents_request: usize = 0;
176        let mut count_request_newest: usize = 0;
177        let mut count_rebroadcast: usize = 0;
178        for task in self.task_queue.iter().map(|st| &st.task) {
179            match task {
180                CoordRequest(_) => count_coord_request += 1,
181                ParentsRequest(_) => count_parents_request += 1,
182                RequestNewest(_) => count_request_newest += 1,
183                UnitBroadcast(_) => count_rebroadcast += 1,
184            }
185        }
186        let long_time_pending_tasks: Vec<_> = self
187            .task_queue
188            .iter()
189            .filter(|st| st.counter >= 5)
190            .collect();
191        write!(f, "Member status report: ")?;
192        write!(f, "task queue content: ")?;
193        write!(
194            f,
195            "CoordRequest - {}, ParentsRequest - {}, UnitBroadcast - {}, RequestNewest - {}",
196            count_coord_request, count_parents_request, count_rebroadcast, count_request_newest,
197        )?;
198        if !self.not_resolved_coords.is_empty() {
199            write!(
200                f,
201                "; not_resolved_coords.len() - {}",
202                self.not_resolved_coords.len()
203            )?;
204        }
205        if !self.not_resolved_parents.is_empty() {
206            write!(
207                f,
208                "; not_resolved_parents.len() - {}",
209                self.not_resolved_parents.len()
210            )?;
211        }
212
213        static ITEMS_PRINT_LIMIT: usize = 10;
214
215        if !long_time_pending_tasks.is_empty() {
216            write!(f, "; pending tasks with counter >= 5 -")?;
217            write!(f, " {}", {
218                long_time_pending_tasks
219                    .iter()
220                    .take(ITEMS_PRINT_LIMIT)
221                    .join(", ")
222            })?;
223
224            if let Some(remaining) = long_time_pending_tasks.len().checked_sub(ITEMS_PRINT_LIMIT) {
225                write!(f, " and {remaining} more")?
226            }
227        }
228        write!(f, ".")?;
229        Ok(())
230    }
231}
232
233struct Member<H, D, S>
234where
235    H: Hasher,
236    D: Data,
237    S: Signature,
238{
239    config: Config,
240    task_queue: TaskQueue<RepeatableTask<H, D, S>>,
241    not_resolved_parents: HashSet<H::Hash>,
242    not_resolved_coords: HashSet<UnitCoord>,
243    newest_unit_resolved: bool,
244    peers: Vec<Recipient>,
245    unit_messages_for_network: Sender<(UnitMessage<H, D, S>, Recipient)>,
246    unit_messages_from_network: Receiver<UnitMessage<H, D, S>>,
247    notifications_for_runway: Sender<RunwayNotificationIn<H, D, S>>,
248    notifications_from_runway: Receiver<RunwayNotificationOut<H, D, S>>,
249    resolved_requests: Receiver<Request<H>>,
250    exiting: bool,
251    top_units: NodeMap<Round>,
252}
253
254impl<H, D, S> Member<H, D, S>
255where
256    H: Hasher,
257    D: Data,
258    S: Signature,
259{
260    fn new(
261        config: Config,
262        unit_messages_for_network: Sender<(UnitMessage<H, D, S>, Recipient)>,
263        unit_messages_from_network: Receiver<UnitMessage<H, D, S>>,
264        notifications_for_runway: Sender<RunwayNotificationIn<H, D, S>>,
265        notifications_from_runway: Receiver<RunwayNotificationOut<H, D, S>>,
266        resolved_requests: Receiver<Request<H>>,
267    ) -> Self {
268        let n_members = config.n_members();
269        let peers = (0..n_members.0)
270            .map(NodeIndex)
271            .filter(|x| *x != config.node_ix())
272            .map(Recipient::Node)
273            .collect();
274
275        Self {
276            config,
277            task_queue: TaskQueue::new(),
278            not_resolved_parents: HashSet::new(),
279            not_resolved_coords: HashSet::new(),
280            newest_unit_resolved: false,
281            peers,
282            unit_messages_for_network,
283            unit_messages_from_network,
284            notifications_for_runway,
285            notifications_from_runway,
286            resolved_requests,
287            exiting: false,
288            top_units: NodeMap::with_size(n_members),
289        }
290    }
291
292    fn on_create(&mut self, u: UncheckedSignedUnit<H, D, S>) {
293        self.send_unit_message(UnitMessage::NewUnit(u), Recipient::Everyone);
294    }
295
296    fn on_unit_discovered(&mut self, new_unit: UncheckedSignedUnit<H, D, S>) {
297        let unit_creator = new_unit.as_signable().creator();
298        let unit_round = new_unit.as_signable().round();
299        if self
300            .top_units
301            .get(unit_creator)
302            .map(|round| round < &unit_round)
303            .unwrap_or(true)
304        {
305            self.top_units.insert(unit_creator, unit_round);
306            let task = RepeatableTask::new(UnitBroadcast(new_unit));
307            let delay = self.delay(&task.task, task.counter);
308            self.task_queue.schedule_in(task, delay)
309        }
310    }
311
312    fn on_request_coord(&mut self, coord: UnitCoord) {
313        trace!(target: "AlephBFT-member", "{:?} Dealing with missing coord notification {:?}.", self.index(), coord);
314        if !self.not_resolved_coords.insert(coord) {
315            return;
316        }
317
318        self.task_queue
319            .schedule_now(RepeatableTask::new(CoordRequest(coord)));
320        self.trigger_tasks();
321    }
322
323    fn on_request_parents(&mut self, u_hash: H::Hash) {
324        if !self.not_resolved_parents.insert(u_hash) {
325            return;
326        }
327
328        self.task_queue
329            .schedule_now(RepeatableTask::new(ParentsRequest(u_hash)));
330        self.trigger_tasks();
331    }
332
333    fn on_request_newest(&mut self, salt: u64) {
334        self.task_queue
335            .schedule_now(RepeatableTask::new(RequestNewest(salt)));
336        self.trigger_tasks();
337    }
338
339    fn trigger_tasks(&mut self) {
340        while let Some(mut task) = self.task_queue.pop_due_task() {
341            match self.task_details(&task.task, task.counter) {
342                TaskDetails::Cancel => (),
343                TaskDetails::Perform {
344                    message,
345                    recipients,
346                    reschedule,
347                } => {
348                    for recipient in recipients.into_iter() {
349                        self.send_unit_message(message.clone(), recipient);
350                    }
351
352                    task.counter += 1;
353                    self.task_queue.schedule_in(task, reschedule)
354                }
355            }
356        }
357    }
358
359    fn random_peers(&self, n: usize) -> Vec<Recipient> {
360        self.peers
361            .choose_multiple(&mut rand::thread_rng(), n)
362            .cloned()
363            .collect()
364    }
365
366    fn index(&self) -> NodeIndex {
367        self.config.node_ix()
368    }
369
370    fn send_unit_message(&mut self, message: UnitMessage<H, D, S>, recipient: Recipient) {
371        if self
372            .unit_messages_for_network
373            .unbounded_send((message, recipient))
374            .is_err()
375        {
376            warn!(target: "AlephBFT-member", "{:?} Channel to network should be open", self.index());
377            self.exiting = true;
378        }
379    }
380
381    /// Given a task and the number of times it was performed, returns `Cancel` if the task is no longer active,
382    /// `Delay(Duration)` if the task is active, but cannot be performed right now, and
383    /// `Perform { message, recipient, reschedule }` if the task is to send `message` to `recipient` and it should
384    /// be rescheduled after `reschedule`.
385    fn task_details(&mut self, task: &Task<H, D, S>, counter: usize) -> TaskDetails<H, D, S> {
386        match self.still_valid(task) {
387            false => TaskDetails::Cancel,
388            true => TaskDetails::Perform {
389                message: self.message(task),
390                recipients: self.recipients(task, counter),
391                reschedule: self.delay(task, counter),
392            },
393        }
394    }
395
396    fn message(&self, task: &Task<H, D, S>) -> UnitMessage<H, D, S> {
397        match task {
398            CoordRequest(coord) => UnitMessage::RequestCoord(self.index(), *coord),
399            ParentsRequest(hash) => UnitMessage::RequestParents(self.index(), *hash),
400            UnitBroadcast(unit) => UnitMessage::NewUnit(unit.clone()),
401            RequestNewest(salt) => UnitMessage::RequestNewest(self.index(), *salt),
402        }
403    }
404
405    fn recipients(&self, task: &Task<H, D, S>, counter: usize) -> Vec<Recipient> {
406        match task {
407            CoordRequest(_) => {
408                self.random_peers((self.config.delay_config().coord_request_recipients)(
409                    counter,
410                ))
411            }
412            ParentsRequest(_) => {
413                self.random_peers((self.config.delay_config().parent_request_recipients)(
414                    counter,
415                ))
416            }
417            UnitBroadcast(_) => vec![Recipient::Everyone],
418            RequestNewest(_) => vec![Recipient::Everyone],
419        }
420    }
421
422    fn still_valid(&self, task: &Task<H, D, S>) -> bool {
423        match task {
424            CoordRequest(coord) => self.not_resolved_coords.contains(coord),
425            ParentsRequest(hash) => self.not_resolved_parents.contains(hash),
426            RequestNewest(_) => !self.newest_unit_resolved,
427            UnitBroadcast(unit) => {
428                Some(&unit.as_signable().round())
429                    == self.top_units.get(unit.as_signable().creator())
430            }
431        }
432    }
433
434    /// Most tasks use `requests_interval` (see [crate::DelayConfig]) as their delay.
435    ///
436    /// The first exception is [Task::UnitBroadcast] - this one picks a random delay between
437    /// `unit_rebroadcast_interval_min` and `unit_rebroadcast_interval_max`.
438    ///
439    /// The other exception is [Task::CoordRequest] - this one uses the configurable
440    /// `coord_request_delay` schedule.
441    fn delay(&self, task: &Task<H, D, S>, counter: usize) -> Duration {
442        match task {
443            UnitBroadcast(_) => {
444                let low = self.config.delay_config().unit_rebroadcast_interval_min;
445                let high = self.config.delay_config().unit_rebroadcast_interval_max;
446                let millis = rand::thread_rng().gen_range(low.as_millis()..high.as_millis());
447                Duration::from_millis(millis as u64)
448            }
449            CoordRequest(_) => (self.config.delay_config().coord_request_delay)(counter),
450            ParentsRequest(_) => (self.config.delay_config().parent_request_delay)(counter),
451            RequestNewest(_) => (self.config.delay_config().newest_request_delay)(counter),
452        }
453    }
454
455    fn on_unit_message_from_units(&mut self, message: RunwayNotificationOut<H, D, S>) {
456        match message {
457            RunwayNotificationOut::NewSelfUnit(u) => self.on_create(u),
458            RunwayNotificationOut::NewAnyUnit(u) => self.on_unit_discovered(u),
459            RunwayNotificationOut::Request(request) => match request {
460                Request::Coord(coord) => self.on_request_coord(coord),
461                Request::Parents(u_hash) => self.on_request_parents(u_hash),
462                Request::NewestUnit(salt) => self.on_request_newest(salt),
463            },
464            RunwayNotificationOut::Response(response, recipient) => match response {
465                Response::Coord(u) => {
466                    let message = UnitMessage::ResponseCoord(u);
467                    self.send_unit_message(message, Recipient::Node(recipient))
468                }
469                Response::Parents(u_hash, parents) => {
470                    let message = UnitMessage::ResponseParents(u_hash, parents);
471                    self.send_unit_message(message, Recipient::Node(recipient))
472                }
473                Response::NewestUnit(response) => {
474                    let requester = response.as_signable().requester();
475                    let message = UnitMessage::ResponseNewest(response);
476                    self.send_unit_message(message, Recipient::Node(requester))
477                }
478            },
479        }
480    }
481
482    fn status_report(&self) {
483        let status = MemberStatus::new(
484            &self.task_queue,
485            &self.not_resolved_parents,
486            &self.not_resolved_coords,
487        );
488        info!(target: "AlephBFT-member", "{}", status);
489    }
490
491    async fn run(mut self, mut terminator: Terminator) {
492        let ticker_delay = self.config.delay_config().tick_interval;
493        let mut ticker = Delay::new(ticker_delay).fuse();
494        let status_ticker_delay = Duration::from_secs(10);
495        let mut status_ticker = Delay::new(status_ticker_delay).fuse();
496
497        loop {
498            futures::select! {
499                event = self.notifications_from_runway.next() => match event {
500                    Some(message) => {
501                        self.on_unit_message_from_units(message);
502                    },
503                    None => {
504                        error!(target: "AlephBFT-member", "{:?} Unit message stream from Runway closed.", self.index());
505                        break;
506                    },
507                },
508
509                event = self.resolved_requests.next() => match event {
510                    Some(request) => match request {
511                        Request::Coord(coord) => {
512                            self.not_resolved_coords.remove(&coord);
513                        },
514                        Request::Parents(u_hash) => {
515                            self.not_resolved_parents.remove(&u_hash);
516                        },
517                        Request::NewestUnit(_) => {
518                            self.newest_unit_resolved = true;
519                        }
520                    },
521                    None => {
522                        error!(target: "AlephBFT-member", "{:?} Resolved-requests stream from Runway closed.", self.index());
523                        break;
524                    }
525                },
526
527                event = self.unit_messages_from_network.next() => match event {
528                    Some(message) => match message.try_into() {
529                        Ok(notification) => {
530                            self.send_notification_to_runway(notification)
531                        },
532                        Err(_) => error!(target: "AlephBFT-member", "{:?} Unable to convert a UnitMessage into an instance of RunwayNotificationIn.", self.index()),
533                    },
534                    None => {
535                        error!(target: "AlephBFT-member", "{:?} Unit message stream from network closed.", self.index());
536                        break;
537                    },
538                },
539
540                _ = &mut ticker => {
541                    self.trigger_tasks();
542                    ticker = Delay::new(ticker_delay).fuse();
543                },
544
545                _ = &mut status_ticker => {
546                    self.status_report();
547                    status_ticker = Delay::new(status_ticker_delay).fuse();
548                },
549
550                _ = terminator.get_exit().fuse() => {
551                    debug!(target: "AlephBFT-member", "{:?} received exit signal", self.index());
552                    self.exiting = true;
553                },
554            }
555            if self.exiting {
556                debug!(target: "AlephBFT-member", "{:?} Member decided to exit.", self.index());
557                terminator.terminate_sync().await;
558                break;
559            }
560        }
561
562        debug!(target: "AlephBFT-member", "{:?} Member stopped.", self.index());
563    }
564
565    fn send_notification_to_runway(&mut self, notification: RunwayNotificationIn<H, D, S>) {
566        if self
567            .notifications_for_runway
568            .unbounded_send(notification)
569            .is_err()
570        {
571            warn!(target: "AlephBFT-member", "{:?} Sender to runway with RunwayNotificationIn messages should be open", self.index());
572            self.exiting = true;
573        }
574    }
575}
576
577/// Starts the consensus algorithm as an async task. It stops establishing consensus for new data items after
578/// reaching the threshold specified in [`Config::max_round`] or upon receiving a stop signal from `exit`.
579/// For a detailed description of the consensus implemented by `run_session` see
580/// [docs for devs](https://cardinal-cryptography.github.io/AlephBFT/index.html)
581/// or the [original paper](https://arxiv.org/abs/1908.05156).
582pub async fn run_session<
583    H: Hasher,
584    D: Data,
585    DP: DataProvider<D>,
586    FH: FinalizationHandler<D>,
587    US: BackupWriter + Send + Sync + 'static,
588    UL: BackupReader + Send + Sync + 'static,
589    N: Network<NetworkData<H, D, MK::Signature, MK::PartialMultisignature>> + 'static,
590    SH: SpawnHandle,
591    MK: MultiKeychain,
592>(
593    config: Config,
594    local_io: LocalIO<D, DP, FH, US, UL>,
595    network: N,
596    keychain: MK,
597    spawn_handle: SH,
598    mut terminator: Terminator,
599) {
600    let index = config.node_ix();
601    info!(target: "AlephBFT-member", "{:?} Starting a new session.", index);
602    debug!(target: "AlephBFT-member", "{:?} Spawning party for a session.", index);
603
604    let (alert_messages_for_alerter, alert_messages_from_network) = mpsc::unbounded();
605    let (alert_messages_for_network, alert_messages_from_alerter) = mpsc::unbounded();
606    let (unit_messages_for_units, unit_messages_from_network) = mpsc::unbounded();
607    let (unit_messages_for_network, unit_messages_from_units) = mpsc::unbounded();
608    let (runway_messages_for_runway, runway_messages_from_network) = mpsc::unbounded();
609    let (runway_messages_for_network, runway_messages_from_runway) = mpsc::unbounded();
610    let (resolved_requests_tx, resolved_requests_rx) = mpsc::unbounded();
611
612    debug!(target: "AlephBFT-member", "{:?} Spawning network.", index);
613    let network_terminator = terminator.add_offspring_connection("AlephBFT-network");
614
615    let network_handle = spawn_handle
616        .spawn_essential("member/network", async move {
617            network::run(
618                network,
619                unit_messages_from_units,
620                unit_messages_for_units,
621                alert_messages_from_alerter,
622                alert_messages_for_alerter,
623                network_terminator,
624            )
625            .await
626        })
627        .fuse();
628    pin_mut!(network_handle);
629    debug!(target: "AlephBFT-member", "{:?} Network spawned.", index);
630
631    debug!(target: "AlephBFT-member", "{:?} Initializing Runway.", index);
632    let runway_terminator = terminator.add_offspring_connection("AlephBFT-runway");
633    let network_io = NetworkIO {
634        alert_messages_for_network,
635        alert_messages_from_network,
636        unit_messages_from_network: runway_messages_from_network,
637        unit_messages_for_network: runway_messages_for_network,
638        resolved_requests: resolved_requests_tx,
639    };
640    let runway_io = RunwayIO::new(
641        local_io.data_provider,
642        local_io.finalization_handler,
643        local_io.unit_saver,
644        local_io.unit_loader,
645    );
646    let spawn_copy = spawn_handle.clone();
647    let config_copy = config.clone();
648    let runway_handle = spawn_handle
649        .spawn_essential("member/runway", async move {
650            runway::run(
651                config_copy,
652                runway_io,
653                &keychain,
654                spawn_copy,
655                network_io,
656                runway_terminator,
657            )
658            .await
659        })
660        .fuse();
661    pin_mut!(runway_handle);
662    debug!(target: "AlephBFT-member", "{:?} Runway spawned.", index);
663
664    debug!(target: "AlephBFT-member", "{:?} Initializing Member.", index);
665    let member = Member::new(
666        config,
667        unit_messages_for_network,
668        unit_messages_from_network,
669        runway_messages_for_runway,
670        runway_messages_from_runway,
671        resolved_requests_rx,
672    );
673    let member_terminator = terminator.add_offspring_connection("AlephBFT-member");
674    let member_handle = spawn_handle
675        .spawn_essential("member", async move {
676            member.run(member_terminator).await;
677        })
678        .fuse();
679    pin_mut!(member_handle);
680    debug!(target: "AlephBFT-member", "{:?} Member initialized.", index);
681
682    futures::select! {
683        _ = network_handle => {
684            error!(target: "AlephBFT-member", "{:?} Network-hub terminated early.", index);
685        },
686
687        _ = runway_handle => {
688            error!(target: "AlephBFT-member", "{:?} Runway terminated early.", index);
689        },
690
691        _ = member_handle => {
692            error!(target: "AlephBFT-member", "{:?} Member terminated early.", index);
693        },
694
695        _ = terminator.get_exit().fuse() => {
696            debug!(target: "AlephBFT-member", "{:?} exit channel was called.", index);
697        },
698    }
699
700    debug!(target: "AlephBFT-member", "{:?} Run ending.", index);
701
702    terminator.terminate_sync().await;
703
704    handle_task_termination(network_handle, "AlephBFT-member", "Network", index).await;
705    handle_task_termination(runway_handle, "AlephBFT-member", "Runway", index).await;
706    handle_task_termination(member_handle, "AlephBFT-member", "Member", index).await;
707
708    info!(target: "AlephBFT-member", "{:?} Session ended.", index);
709}
710
711#[cfg(test)]
712mod tests {
713    use super::*;
714    use crate::{
715        testing::{gen_config, gen_delay_config},
716        DelayConfig,
717    };
718    use aleph_bft_mock::{Hasher64, Signature};
719    use aleph_bft_types::NodeCount;
720    use futures::channel::mpsc::unbounded;
721    use itertools::Itertools;
722    use std::sync::Arc;
723
724    fn mock_member(
725        node_ix: NodeIndex,
726        node_count: NodeCount,
727        delay_config: DelayConfig,
728    ) -> Member<Hasher64, u32, Signature> {
729        let config = gen_config(node_ix, node_count, delay_config);
730        let (unit_messages_for_network_sx, _) = unbounded();
731        let (_, unit_messages_from_network_rx) = unbounded();
732        let (notifications_for_runway_sx, _) = unbounded();
733        let (_, notifications_from_runway_rx) = unbounded();
734        let (_, resolved_requests_rx) = unbounded();
735
736        Member::new(
737            config,
738            unit_messages_for_network_sx,
739            unit_messages_from_network_rx,
740            notifications_for_runway_sx,
741            notifications_from_runway_rx,
742            resolved_requests_rx,
743        )
744    }
745
746    #[test]
747    fn delay_for_coord_request() {
748        let mut delay_config = gen_delay_config();
749        delay_config.coord_request_delay = Arc::new(|t| Duration::from_millis(123 + t as u64));
750
751        let member = mock_member(NodeIndex(7), NodeCount(20), delay_config);
752
753        let delay = member.delay(&CoordRequest(UnitCoord::new(1, NodeIndex(3))), 10);
754
755        assert_eq!(delay, Duration::from_millis(133));
756    }
757
758    #[test]
759    fn delay_for_parent_request() {
760        let mut delay_config = gen_delay_config();
761        delay_config.parent_request_delay = Arc::new(|t| Duration::from_millis(123 + t as u64));
762
763        let member = mock_member(NodeIndex(7), NodeCount(20), delay_config);
764
765        let delay = member.delay(&ParentsRequest(Hasher64::hash(&[0x0])), 10);
766
767        assert_eq!(delay, Duration::from_millis(133));
768    }
769
770    #[test]
771    fn delay_for_newest_request() {
772        let mut delay_config = gen_delay_config();
773        delay_config.newest_request_delay = Arc::new(|t| Duration::from_millis(123 + t as u64));
774
775        let member = mock_member(NodeIndex(7), NodeCount(20), delay_config);
776
777        let delay = member.delay(&RequestNewest(12345), 10);
778
779        assert_eq!(delay, Duration::from_millis(133));
780    }
781
782    #[test]
783    fn recipients_for_coord_request() {
784        let node_ix = NodeIndex(7);
785        let mut delay_config = gen_delay_config();
786        delay_config.coord_request_recipients = Arc::new(|t| 10 - t);
787
788        let member = mock_member(node_ix, NodeCount(20), delay_config);
789
790        let request = CoordRequest(UnitCoord::new(1, NodeIndex(3)));
791        let recipients = member.recipients(&request, 3);
792
793        assert_eq!(recipients.len(), 7);
794        assert_eq!(
795            recipients.iter().cloned().unique().collect::<Vec<_>>(),
796            recipients
797        );
798        assert!(!recipients.contains(&Recipient::Node(node_ix)));
799    }
800
801    #[test]
802    fn recipients_for_parent_request() {
803        let node_ix = NodeIndex(7);
804        let mut delay_config = gen_delay_config();
805        delay_config.parent_request_recipients = Arc::new(|t| 10 - t);
806
807        let member = mock_member(node_ix, NodeCount(20), delay_config);
808
809        let request = ParentsRequest(Hasher64::hash(&[0x0]));
810        let recipients = member.recipients(&request, 3);
811
812        assert_eq!(recipients.len(), 7);
813        assert_eq!(
814            recipients.iter().cloned().unique().collect::<Vec<_>>(),
815            recipients
816        );
817        assert!(!recipients.contains(&Recipient::Node(node_ix)));
818    }
819
820    #[test]
821    fn at_most_n_members_recipients_for_coord_request() {
822        let mut delay_config = gen_delay_config();
823        delay_config.coord_request_recipients = Arc::new(move |_| 30);
824
825        let member = mock_member(NodeIndex(7), NodeCount(20), delay_config);
826
827        let request = CoordRequest(UnitCoord::new(1, NodeIndex(3)));
828        let recipients = member.recipients(&request, 10);
829
830        assert_eq!(recipients.len(), member.config.n_members().0 - 1);
831    }
832
833    #[test]
834    fn no_recipients_for_coord_request_in_one_node_setup() {
835        let mut delay_config = gen_delay_config();
836        delay_config.coord_request_recipients = Arc::new(move |_| 30);
837
838        let member = mock_member(NodeIndex(0), NodeCount(1), delay_config);
839
840        let request = CoordRequest(UnitCoord::new(1, NodeIndex(3)));
841        let recipients = member.recipients(&request, 10);
842
843        assert_eq!(recipients, vec![]);
844    }
845}