1use crate::{
2 handle_task_termination,
3 member::Task::{CoordRequest, ParentsRequest, RequestNewest, UnitBroadcast},
4 network,
5 runway::{
6 self, NetworkIO, NewestUnitResponse, Request, Response, RunwayIO, RunwayNotificationIn,
7 RunwayNotificationOut,
8 },
9 task_queue::TaskQueue,
10 units::{UncheckedSignedUnit, UnitCoord},
11 BackupReader, BackupWriter, Config, Data, DataProvider, FinalizationHandler, Hasher,
12 MultiKeychain, Network, NodeIndex, Receiver, Recipient, Round, Sender, Signature, SpawnHandle,
13 Terminator, UncheckedSigned,
14};
15use aleph_bft_types::NodeMap;
16use codec::{Decode, Encode};
17use futures::{channel::mpsc, pin_mut, FutureExt, StreamExt};
18use futures_timer::Delay;
19use itertools::Itertools;
20use log::{debug, error, info, trace, warn};
21use network::NetworkData;
22use rand::{prelude::SliceRandom, Rng};
23use std::{
24 collections::HashSet,
25 convert::TryInto,
26 fmt::{self, Debug},
27 marker::PhantomData,
28 time::Duration,
29};
30
31#[derive(Clone, Eq, PartialEq, Debug, Decode, Encode)]
33pub(crate) enum UnitMessage<H: Hasher, D: Data, S: Signature> {
34 NewUnit(UncheckedSignedUnit<H, D, S>),
36 RequestCoord(NodeIndex, UnitCoord),
38 ResponseCoord(UncheckedSignedUnit<H, D, S>),
40 RequestParents(NodeIndex, H::Hash),
42 ResponseParents(H::Hash, Vec<UncheckedSignedUnit<H, D, S>>),
44 RequestNewest(NodeIndex, u64),
46 ResponseNewest(UncheckedSigned<NewestUnitResponse<H, D, S>, S>),
48}
49
50impl<H: Hasher, D: Data, S: Signature> UnitMessage<H, D, S> {
51 pub(crate) fn included_data(&self) -> Vec<D> {
52 match self {
53 Self::NewUnit(uu) => uu.as_signable().included_data(),
54 Self::RequestCoord(_, _) => Vec::new(),
55 Self::ResponseCoord(uu) => uu.as_signable().included_data(),
56 Self::RequestParents(_, _) => Vec::new(),
57 Self::ResponseParents(_, units) => units
58 .iter()
59 .flat_map(|uu| uu.as_signable().included_data())
60 .collect(),
61 UnitMessage::RequestNewest(_, _) => Vec::new(),
62 UnitMessage::ResponseNewest(response) => response.as_signable().included_data(),
63 }
64 }
65}
66
67#[derive(Eq, PartialEq, Debug)]
68enum Task<H: Hasher, D: Data, S: Signature> {
69 CoordRequest(UnitCoord),
71 ParentsRequest(H::Hash),
73 UnitBroadcast(UncheckedSignedUnit<H, D, S>),
75 RequestNewest(u64),
77}
78
79#[derive(Eq, PartialEq, Debug)]
80struct RepeatableTask<H: Hasher, D: Data, S: Signature> {
81 task: Task<H, D, S>,
82 counter: usize,
83}
84
85impl<H: Hasher, D: Data, S: Signature> fmt::Display for RepeatableTask<H, D, S> {
86 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
87 write!(
88 f,
89 "RepeatableTask({:?}, counter {})",
90 self.task, self.counter
91 )
92 }
93}
94
95impl<H: Hasher, D: Data, S: Signature> RepeatableTask<H, D, S> {
96 fn new(task: Task<H, D, S>) -> Self {
97 Self { task, counter: 0 }
98 }
99}
100
101enum TaskDetails<H: Hasher, D: Data, S: Signature> {
102 Cancel,
103 Perform {
104 message: UnitMessage<H, D, S>,
105 recipients: Vec<Recipient>,
106 reschedule: Duration,
107 },
108}
109
110#[derive(Clone)]
111pub struct LocalIO<
112 D: Data,
113 DP: DataProvider<D>,
114 FH: FinalizationHandler<D>,
115 US: BackupWriter,
116 UL: BackupReader,
117> {
118 data_provider: DP,
119 finalization_handler: FH,
120 unit_saver: US,
121 unit_loader: UL,
122 _phantom: PhantomData<D>,
123}
124
125impl<
126 D: Data,
127 DP: DataProvider<D>,
128 FH: FinalizationHandler<D>,
129 US: BackupWriter,
130 UL: BackupReader,
131 > LocalIO<D, DP, FH, US, UL>
132{
133 pub fn new(
134 data_provider: DP,
135 finalization_handler: FH,
136 unit_saver: US,
137 unit_loader: UL,
138 ) -> LocalIO<D, DP, FH, US, UL> {
139 LocalIO {
140 data_provider,
141 finalization_handler,
142 unit_saver,
143 unit_loader,
144 _phantom: PhantomData,
145 }
146 }
147}
148
149struct MemberStatus<'a, H: Hasher, D: Data, S: Signature> {
150 task_queue: &'a TaskQueue<RepeatableTask<H, D, S>>,
151 not_resolved_parents: &'a HashSet<H::Hash>,
152 not_resolved_coords: &'a HashSet<UnitCoord>,
153}
154
155impl<'a, H: Hasher, D: Data, S: Signature> MemberStatus<'a, H, D, S> {
156 fn new(
157 task_queue: &'a TaskQueue<RepeatableTask<H, D, S>>,
158 not_resolved_parents: &'a HashSet<H::Hash>,
159 not_resolved_coords: &'a HashSet<UnitCoord>,
160 ) -> Self {
161 Self {
162 task_queue,
163 not_resolved_parents,
164 not_resolved_coords,
165 }
166 }
167}
168
169impl<'a, H: Hasher, D: Data, S: Signature> fmt::Display for MemberStatus<'a, H, D, S>
170where
171 H: Hasher,
172{
173 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
174 let mut count_coord_request: usize = 0;
175 let mut count_parents_request: usize = 0;
176 let mut count_request_newest: usize = 0;
177 let mut count_rebroadcast: usize = 0;
178 for task in self.task_queue.iter().map(|st| &st.task) {
179 match task {
180 CoordRequest(_) => count_coord_request += 1,
181 ParentsRequest(_) => count_parents_request += 1,
182 RequestNewest(_) => count_request_newest += 1,
183 UnitBroadcast(_) => count_rebroadcast += 1,
184 }
185 }
186 let long_time_pending_tasks: Vec<_> = self
187 .task_queue
188 .iter()
189 .filter(|st| st.counter >= 5)
190 .collect();
191 write!(f, "Member status report: ")?;
192 write!(f, "task queue content: ")?;
193 write!(
194 f,
195 "CoordRequest - {}, ParentsRequest - {}, UnitBroadcast - {}, RequestNewest - {}",
196 count_coord_request, count_parents_request, count_rebroadcast, count_request_newest,
197 )?;
198 if !self.not_resolved_coords.is_empty() {
199 write!(
200 f,
201 "; not_resolved_coords.len() - {}",
202 self.not_resolved_coords.len()
203 )?;
204 }
205 if !self.not_resolved_parents.is_empty() {
206 write!(
207 f,
208 "; not_resolved_parents.len() - {}",
209 self.not_resolved_parents.len()
210 )?;
211 }
212
213 static ITEMS_PRINT_LIMIT: usize = 10;
214
215 if !long_time_pending_tasks.is_empty() {
216 write!(f, "; pending tasks with counter >= 5 -")?;
217 write!(f, " {}", {
218 long_time_pending_tasks
219 .iter()
220 .take(ITEMS_PRINT_LIMIT)
221 .join(", ")
222 })?;
223
224 if let Some(remaining) = long_time_pending_tasks.len().checked_sub(ITEMS_PRINT_LIMIT) {
225 write!(f, " and {remaining} more")?
226 }
227 }
228 write!(f, ".")?;
229 Ok(())
230 }
231}
232
233struct Member<H, D, S>
234where
235 H: Hasher,
236 D: Data,
237 S: Signature,
238{
239 config: Config,
240 task_queue: TaskQueue<RepeatableTask<H, D, S>>,
241 not_resolved_parents: HashSet<H::Hash>,
242 not_resolved_coords: HashSet<UnitCoord>,
243 newest_unit_resolved: bool,
244 peers: Vec<Recipient>,
245 unit_messages_for_network: Sender<(UnitMessage<H, D, S>, Recipient)>,
246 unit_messages_from_network: Receiver<UnitMessage<H, D, S>>,
247 notifications_for_runway: Sender<RunwayNotificationIn<H, D, S>>,
248 notifications_from_runway: Receiver<RunwayNotificationOut<H, D, S>>,
249 resolved_requests: Receiver<Request<H>>,
250 exiting: bool,
251 top_units: NodeMap<Round>,
252}
253
254impl<H, D, S> Member<H, D, S>
255where
256 H: Hasher,
257 D: Data,
258 S: Signature,
259{
260 fn new(
261 config: Config,
262 unit_messages_for_network: Sender<(UnitMessage<H, D, S>, Recipient)>,
263 unit_messages_from_network: Receiver<UnitMessage<H, D, S>>,
264 notifications_for_runway: Sender<RunwayNotificationIn<H, D, S>>,
265 notifications_from_runway: Receiver<RunwayNotificationOut<H, D, S>>,
266 resolved_requests: Receiver<Request<H>>,
267 ) -> Self {
268 let n_members = config.n_members();
269 let peers = (0..n_members.0)
270 .map(NodeIndex)
271 .filter(|x| *x != config.node_ix())
272 .map(Recipient::Node)
273 .collect();
274
275 Self {
276 config,
277 task_queue: TaskQueue::new(),
278 not_resolved_parents: HashSet::new(),
279 not_resolved_coords: HashSet::new(),
280 newest_unit_resolved: false,
281 peers,
282 unit_messages_for_network,
283 unit_messages_from_network,
284 notifications_for_runway,
285 notifications_from_runway,
286 resolved_requests,
287 exiting: false,
288 top_units: NodeMap::with_size(n_members),
289 }
290 }
291
292 fn on_create(&mut self, u: UncheckedSignedUnit<H, D, S>) {
293 self.send_unit_message(UnitMessage::NewUnit(u), Recipient::Everyone);
294 }
295
296 fn on_unit_discovered(&mut self, new_unit: UncheckedSignedUnit<H, D, S>) {
297 let unit_creator = new_unit.as_signable().creator();
298 let unit_round = new_unit.as_signable().round();
299 if self
300 .top_units
301 .get(unit_creator)
302 .map(|round| round < &unit_round)
303 .unwrap_or(true)
304 {
305 self.top_units.insert(unit_creator, unit_round);
306 let task = RepeatableTask::new(UnitBroadcast(new_unit));
307 let delay = self.delay(&task.task, task.counter);
308 self.task_queue.schedule_in(task, delay)
309 }
310 }
311
312 fn on_request_coord(&mut self, coord: UnitCoord) {
313 trace!(target: "AlephBFT-member", "{:?} Dealing with missing coord notification {:?}.", self.index(), coord);
314 if !self.not_resolved_coords.insert(coord) {
315 return;
316 }
317
318 self.task_queue
319 .schedule_now(RepeatableTask::new(CoordRequest(coord)));
320 self.trigger_tasks();
321 }
322
323 fn on_request_parents(&mut self, u_hash: H::Hash) {
324 if !self.not_resolved_parents.insert(u_hash) {
325 return;
326 }
327
328 self.task_queue
329 .schedule_now(RepeatableTask::new(ParentsRequest(u_hash)));
330 self.trigger_tasks();
331 }
332
333 fn on_request_newest(&mut self, salt: u64) {
334 self.task_queue
335 .schedule_now(RepeatableTask::new(RequestNewest(salt)));
336 self.trigger_tasks();
337 }
338
339 fn trigger_tasks(&mut self) {
340 while let Some(mut task) = self.task_queue.pop_due_task() {
341 match self.task_details(&task.task, task.counter) {
342 TaskDetails::Cancel => (),
343 TaskDetails::Perform {
344 message,
345 recipients,
346 reschedule,
347 } => {
348 for recipient in recipients.into_iter() {
349 self.send_unit_message(message.clone(), recipient);
350 }
351
352 task.counter += 1;
353 self.task_queue.schedule_in(task, reschedule)
354 }
355 }
356 }
357 }
358
359 fn random_peers(&self, n: usize) -> Vec<Recipient> {
360 self.peers
361 .choose_multiple(&mut rand::thread_rng(), n)
362 .cloned()
363 .collect()
364 }
365
366 fn index(&self) -> NodeIndex {
367 self.config.node_ix()
368 }
369
370 fn send_unit_message(&mut self, message: UnitMessage<H, D, S>, recipient: Recipient) {
371 if self
372 .unit_messages_for_network
373 .unbounded_send((message, recipient))
374 .is_err()
375 {
376 warn!(target: "AlephBFT-member", "{:?} Channel to network should be open", self.index());
377 self.exiting = true;
378 }
379 }
380
381 fn task_details(&mut self, task: &Task<H, D, S>, counter: usize) -> TaskDetails<H, D, S> {
386 match self.still_valid(task) {
387 false => TaskDetails::Cancel,
388 true => TaskDetails::Perform {
389 message: self.message(task),
390 recipients: self.recipients(task, counter),
391 reschedule: self.delay(task, counter),
392 },
393 }
394 }
395
396 fn message(&self, task: &Task<H, D, S>) -> UnitMessage<H, D, S> {
397 match task {
398 CoordRequest(coord) => UnitMessage::RequestCoord(self.index(), *coord),
399 ParentsRequest(hash) => UnitMessage::RequestParents(self.index(), *hash),
400 UnitBroadcast(unit) => UnitMessage::NewUnit(unit.clone()),
401 RequestNewest(salt) => UnitMessage::RequestNewest(self.index(), *salt),
402 }
403 }
404
405 fn recipients(&self, task: &Task<H, D, S>, counter: usize) -> Vec<Recipient> {
406 match task {
407 CoordRequest(_) => {
408 self.random_peers((self.config.delay_config().coord_request_recipients)(
409 counter,
410 ))
411 }
412 ParentsRequest(_) => {
413 self.random_peers((self.config.delay_config().parent_request_recipients)(
414 counter,
415 ))
416 }
417 UnitBroadcast(_) => vec![Recipient::Everyone],
418 RequestNewest(_) => vec![Recipient::Everyone],
419 }
420 }
421
422 fn still_valid(&self, task: &Task<H, D, S>) -> bool {
423 match task {
424 CoordRequest(coord) => self.not_resolved_coords.contains(coord),
425 ParentsRequest(hash) => self.not_resolved_parents.contains(hash),
426 RequestNewest(_) => !self.newest_unit_resolved,
427 UnitBroadcast(unit) => {
428 Some(&unit.as_signable().round())
429 == self.top_units.get(unit.as_signable().creator())
430 }
431 }
432 }
433
434 fn delay(&self, task: &Task<H, D, S>, counter: usize) -> Duration {
442 match task {
443 UnitBroadcast(_) => {
444 let low = self.config.delay_config().unit_rebroadcast_interval_min;
445 let high = self.config.delay_config().unit_rebroadcast_interval_max;
446 let millis = rand::thread_rng().gen_range(low.as_millis()..high.as_millis());
447 Duration::from_millis(millis as u64)
448 }
449 CoordRequest(_) => (self.config.delay_config().coord_request_delay)(counter),
450 ParentsRequest(_) => (self.config.delay_config().parent_request_delay)(counter),
451 RequestNewest(_) => (self.config.delay_config().newest_request_delay)(counter),
452 }
453 }
454
455 fn on_unit_message_from_units(&mut self, message: RunwayNotificationOut<H, D, S>) {
456 match message {
457 RunwayNotificationOut::NewSelfUnit(u) => self.on_create(u),
458 RunwayNotificationOut::NewAnyUnit(u) => self.on_unit_discovered(u),
459 RunwayNotificationOut::Request(request) => match request {
460 Request::Coord(coord) => self.on_request_coord(coord),
461 Request::Parents(u_hash) => self.on_request_parents(u_hash),
462 Request::NewestUnit(salt) => self.on_request_newest(salt),
463 },
464 RunwayNotificationOut::Response(response, recipient) => match response {
465 Response::Coord(u) => {
466 let message = UnitMessage::ResponseCoord(u);
467 self.send_unit_message(message, Recipient::Node(recipient))
468 }
469 Response::Parents(u_hash, parents) => {
470 let message = UnitMessage::ResponseParents(u_hash, parents);
471 self.send_unit_message(message, Recipient::Node(recipient))
472 }
473 Response::NewestUnit(response) => {
474 let requester = response.as_signable().requester();
475 let message = UnitMessage::ResponseNewest(response);
476 self.send_unit_message(message, Recipient::Node(requester))
477 }
478 },
479 }
480 }
481
482 fn status_report(&self) {
483 let status = MemberStatus::new(
484 &self.task_queue,
485 &self.not_resolved_parents,
486 &self.not_resolved_coords,
487 );
488 info!(target: "AlephBFT-member", "{}", status);
489 }
490
491 async fn run(mut self, mut terminator: Terminator) {
492 let ticker_delay = self.config.delay_config().tick_interval;
493 let mut ticker = Delay::new(ticker_delay).fuse();
494 let status_ticker_delay = Duration::from_secs(10);
495 let mut status_ticker = Delay::new(status_ticker_delay).fuse();
496
497 loop {
498 futures::select! {
499 event = self.notifications_from_runway.next() => match event {
500 Some(message) => {
501 self.on_unit_message_from_units(message);
502 },
503 None => {
504 error!(target: "AlephBFT-member", "{:?} Unit message stream from Runway closed.", self.index());
505 break;
506 },
507 },
508
509 event = self.resolved_requests.next() => match event {
510 Some(request) => match request {
511 Request::Coord(coord) => {
512 self.not_resolved_coords.remove(&coord);
513 },
514 Request::Parents(u_hash) => {
515 self.not_resolved_parents.remove(&u_hash);
516 },
517 Request::NewestUnit(_) => {
518 self.newest_unit_resolved = true;
519 }
520 },
521 None => {
522 error!(target: "AlephBFT-member", "{:?} Resolved-requests stream from Runway closed.", self.index());
523 break;
524 }
525 },
526
527 event = self.unit_messages_from_network.next() => match event {
528 Some(message) => match message.try_into() {
529 Ok(notification) => {
530 self.send_notification_to_runway(notification)
531 },
532 Err(_) => error!(target: "AlephBFT-member", "{:?} Unable to convert a UnitMessage into an instance of RunwayNotificationIn.", self.index()),
533 },
534 None => {
535 error!(target: "AlephBFT-member", "{:?} Unit message stream from network closed.", self.index());
536 break;
537 },
538 },
539
540 _ = &mut ticker => {
541 self.trigger_tasks();
542 ticker = Delay::new(ticker_delay).fuse();
543 },
544
545 _ = &mut status_ticker => {
546 self.status_report();
547 status_ticker = Delay::new(status_ticker_delay).fuse();
548 },
549
550 _ = terminator.get_exit().fuse() => {
551 debug!(target: "AlephBFT-member", "{:?} received exit signal", self.index());
552 self.exiting = true;
553 },
554 }
555 if self.exiting {
556 debug!(target: "AlephBFT-member", "{:?} Member decided to exit.", self.index());
557 terminator.terminate_sync().await;
558 break;
559 }
560 }
561
562 debug!(target: "AlephBFT-member", "{:?} Member stopped.", self.index());
563 }
564
565 fn send_notification_to_runway(&mut self, notification: RunwayNotificationIn<H, D, S>) {
566 if self
567 .notifications_for_runway
568 .unbounded_send(notification)
569 .is_err()
570 {
571 warn!(target: "AlephBFT-member", "{:?} Sender to runway with RunwayNotificationIn messages should be open", self.index());
572 self.exiting = true;
573 }
574 }
575}
576
577pub async fn run_session<
583 H: Hasher,
584 D: Data,
585 DP: DataProvider<D>,
586 FH: FinalizationHandler<D>,
587 US: BackupWriter + Send + Sync + 'static,
588 UL: BackupReader + Send + Sync + 'static,
589 N: Network<NetworkData<H, D, MK::Signature, MK::PartialMultisignature>> + 'static,
590 SH: SpawnHandle,
591 MK: MultiKeychain,
592>(
593 config: Config,
594 local_io: LocalIO<D, DP, FH, US, UL>,
595 network: N,
596 keychain: MK,
597 spawn_handle: SH,
598 mut terminator: Terminator,
599) {
600 let index = config.node_ix();
601 info!(target: "AlephBFT-member", "{:?} Starting a new session.", index);
602 debug!(target: "AlephBFT-member", "{:?} Spawning party for a session.", index);
603
604 let (alert_messages_for_alerter, alert_messages_from_network) = mpsc::unbounded();
605 let (alert_messages_for_network, alert_messages_from_alerter) = mpsc::unbounded();
606 let (unit_messages_for_units, unit_messages_from_network) = mpsc::unbounded();
607 let (unit_messages_for_network, unit_messages_from_units) = mpsc::unbounded();
608 let (runway_messages_for_runway, runway_messages_from_network) = mpsc::unbounded();
609 let (runway_messages_for_network, runway_messages_from_runway) = mpsc::unbounded();
610 let (resolved_requests_tx, resolved_requests_rx) = mpsc::unbounded();
611
612 debug!(target: "AlephBFT-member", "{:?} Spawning network.", index);
613 let network_terminator = terminator.add_offspring_connection("AlephBFT-network");
614
615 let network_handle = spawn_handle
616 .spawn_essential("member/network", async move {
617 network::run(
618 network,
619 unit_messages_from_units,
620 unit_messages_for_units,
621 alert_messages_from_alerter,
622 alert_messages_for_alerter,
623 network_terminator,
624 )
625 .await
626 })
627 .fuse();
628 pin_mut!(network_handle);
629 debug!(target: "AlephBFT-member", "{:?} Network spawned.", index);
630
631 debug!(target: "AlephBFT-member", "{:?} Initializing Runway.", index);
632 let runway_terminator = terminator.add_offspring_connection("AlephBFT-runway");
633 let network_io = NetworkIO {
634 alert_messages_for_network,
635 alert_messages_from_network,
636 unit_messages_from_network: runway_messages_from_network,
637 unit_messages_for_network: runway_messages_for_network,
638 resolved_requests: resolved_requests_tx,
639 };
640 let runway_io = RunwayIO::new(
641 local_io.data_provider,
642 local_io.finalization_handler,
643 local_io.unit_saver,
644 local_io.unit_loader,
645 );
646 let spawn_copy = spawn_handle.clone();
647 let config_copy = config.clone();
648 let runway_handle = spawn_handle
649 .spawn_essential("member/runway", async move {
650 runway::run(
651 config_copy,
652 runway_io,
653 &keychain,
654 spawn_copy,
655 network_io,
656 runway_terminator,
657 )
658 .await
659 })
660 .fuse();
661 pin_mut!(runway_handle);
662 debug!(target: "AlephBFT-member", "{:?} Runway spawned.", index);
663
664 debug!(target: "AlephBFT-member", "{:?} Initializing Member.", index);
665 let member = Member::new(
666 config,
667 unit_messages_for_network,
668 unit_messages_from_network,
669 runway_messages_for_runway,
670 runway_messages_from_runway,
671 resolved_requests_rx,
672 );
673 let member_terminator = terminator.add_offspring_connection("AlephBFT-member");
674 let member_handle = spawn_handle
675 .spawn_essential("member", async move {
676 member.run(member_terminator).await;
677 })
678 .fuse();
679 pin_mut!(member_handle);
680 debug!(target: "AlephBFT-member", "{:?} Member initialized.", index);
681
682 futures::select! {
683 _ = network_handle => {
684 error!(target: "AlephBFT-member", "{:?} Network-hub terminated early.", index);
685 },
686
687 _ = runway_handle => {
688 error!(target: "AlephBFT-member", "{:?} Runway terminated early.", index);
689 },
690
691 _ = member_handle => {
692 error!(target: "AlephBFT-member", "{:?} Member terminated early.", index);
693 },
694
695 _ = terminator.get_exit().fuse() => {
696 debug!(target: "AlephBFT-member", "{:?} exit channel was called.", index);
697 },
698 }
699
700 debug!(target: "AlephBFT-member", "{:?} Run ending.", index);
701
702 terminator.terminate_sync().await;
703
704 handle_task_termination(network_handle, "AlephBFT-member", "Network", index).await;
705 handle_task_termination(runway_handle, "AlephBFT-member", "Runway", index).await;
706 handle_task_termination(member_handle, "AlephBFT-member", "Member", index).await;
707
708 info!(target: "AlephBFT-member", "{:?} Session ended.", index);
709}
710
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        testing::{gen_config, gen_delay_config},
        DelayConfig,
    };
    use aleph_bft_mock::{Hasher64, Signature};
    use aleph_bft_types::NodeCount;
    use futures::channel::mpsc::unbounded;
    use itertools::Itertools;
    use std::sync::Arc;

    /// Builds a member whose channels have their other ends dropped; this is enough for
    /// exercising the pure helpers (`delay`, `recipients`).
    fn mock_member(
        node_ix: NodeIndex,
        node_count: NodeCount,
        delay_config: DelayConfig,
    ) -> Member<Hasher64, u32, Signature> {
        let (to_network, _) = unbounded();
        let (_, from_network) = unbounded();
        let (to_runway, _) = unbounded();
        let (_, from_runway) = unbounded();
        let (_, resolved) = unbounded();

        Member::new(
            gen_config(node_ix, node_count, delay_config),
            to_network,
            from_network,
            to_runway,
            from_runway,
            resolved,
        )
    }

    /// Checks that `recipients` has the expected length, contains no duplicates and does
    /// not include the node itself.
    fn assert_distinct_peers(recipients: &[Recipient], own_ix: NodeIndex, expected_len: usize) {
        assert_eq!(recipients.len(), expected_len);
        let deduplicated: Vec<_> = recipients.iter().cloned().unique().collect();
        assert_eq!(deduplicated.as_slice(), recipients);
        assert!(!recipients.contains(&Recipient::Node(own_ix)));
    }

    #[test]
    fn delay_for_coord_request() {
        let mut delay_config = gen_delay_config();
        delay_config.coord_request_delay = Arc::new(|t| Duration::from_millis(123 + t as u64));
        let member = mock_member(NodeIndex(7), NodeCount(20), delay_config);

        let task = CoordRequest(UnitCoord::new(1, NodeIndex(3)));

        assert_eq!(member.delay(&task, 10), Duration::from_millis(133));
    }

    #[test]
    fn delay_for_parent_request() {
        let mut delay_config = gen_delay_config();
        delay_config.parent_request_delay = Arc::new(|t| Duration::from_millis(123 + t as u64));
        let member = mock_member(NodeIndex(7), NodeCount(20), delay_config);

        let task = ParentsRequest(Hasher64::hash(&[0x0]));

        assert_eq!(member.delay(&task, 10), Duration::from_millis(133));
    }

    #[test]
    fn delay_for_newest_request() {
        let mut delay_config = gen_delay_config();
        delay_config.newest_request_delay = Arc::new(|t| Duration::from_millis(123 + t as u64));
        let member = mock_member(NodeIndex(7), NodeCount(20), delay_config);

        let task = RequestNewest(12345);

        assert_eq!(member.delay(&task, 10), Duration::from_millis(133));
    }

    #[test]
    fn recipients_for_coord_request() {
        let node_ix = NodeIndex(7);
        let mut delay_config = gen_delay_config();
        delay_config.coord_request_recipients = Arc::new(|t| 10 - t);
        let member = mock_member(node_ix, NodeCount(20), delay_config);

        let recipients = member.recipients(&CoordRequest(UnitCoord::new(1, NodeIndex(3))), 3);

        assert_distinct_peers(&recipients, node_ix, 7);
    }

    #[test]
    fn recipients_for_parent_request() {
        let node_ix = NodeIndex(7);
        let mut delay_config = gen_delay_config();
        delay_config.parent_request_recipients = Arc::new(|t| 10 - t);
        let member = mock_member(node_ix, NodeCount(20), delay_config);

        let recipients = member.recipients(&ParentsRequest(Hasher64::hash(&[0x0])), 3);

        assert_distinct_peers(&recipients, node_ix, 7);
    }

    #[test]
    fn at_most_n_members_recipients_for_coord_request() {
        let mut delay_config = gen_delay_config();
        delay_config.coord_request_recipients = Arc::new(move |_| 30);
        let member = mock_member(NodeIndex(7), NodeCount(20), delay_config);

        let recipients = member.recipients(&CoordRequest(UnitCoord::new(1, NodeIndex(3))), 10);

        // More recipients were asked for than there are peers: everyone but us is chosen.
        assert_eq!(recipients.len(), member.config.n_members().0 - 1);
    }

    #[test]
    fn no_recipients_for_coord_request_in_one_node_setup() {
        let mut delay_config = gen_delay_config();
        delay_config.coord_request_recipients = Arc::new(move |_| 30);
        let member = mock_member(NodeIndex(0), NodeCount(1), delay_config);

        let recipients = member.recipients(&CoordRequest(UnitCoord::new(1, NodeIndex(3))), 10);

        assert!(recipients.is_empty());
    }
}