1use crate::config::BPConConfig;
8use crate::error::FollowEventError::FailedToSendMessage;
9use crate::error::LaunchBallotError::{
10 EventChannelClosed, FailedToSendEvent, LeaderElectionError, MessageChannelClosed,
11};
12use crate::error::UpdateStateError::ValueVerificationFailed;
13use crate::error::{
14 BallotNumberMismatch, DeserializationError, FollowEventError, LaunchBallotError,
15 LeaderMismatch, PartyStatusMismatch, SerializationError, UpdateStateError, ValueMismatch,
16};
17use crate::leader::LeaderElector;
18use crate::message::{
19 Message1aContent, Message1bContent, Message2aContent, Message2avContent, Message2bContent,
20 MessagePacket, MessageRoundState, ProtocolMessage,
21};
22use crate::value::{Value, ValueSelector};
23use log::{debug, warn};
24use std::cmp::PartialEq;
25use std::collections::hash_map::Entry::Vacant;
26use std::collections::{HashMap, HashSet};
27use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
28use tokio::time::{sleep, sleep_until};
29
30#[derive(PartialEq, Eq, Debug, Copy, Clone)]
35pub enum PartyStatus {
36 None,
37 Launched,
38 Passed1a,
39 Passed1b,
40 Passed2a,
41 Passed2av,
42 Passed2b,
43 Finished,
44 Failed,
45}
46
47impl std::fmt::Display for PartyStatus {
48 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
49 write!(f, "PartyStatus: {:?}", self)
50 }
51}
52
53#[derive(PartialEq, Eq, Debug, Copy, Clone)]
57pub enum PartyEvent {
58 Launch1a,
59 Launch1b,
60 Launch2a,
61 Launch2av,
62 Launch2b,
63 Finalize,
64}
65
66impl std::fmt::Display for PartyEvent {
67 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
68 write!(f, "PartyEvent: {:?}", self)
69 }
70}
71
72pub struct Party<V: Value, VS: ValueSelector<V>> {
86 pub id: u64,
88
89 msg_in_receiver: UnboundedReceiver<MessagePacket>,
91 msg_out_sender: UnboundedSender<MessagePacket>,
93
94 event_receiver: UnboundedReceiver<PartyEvent>,
96 event_sender: UnboundedSender<PartyEvent>,
98
99 pub(crate) cfg: BPConConfig,
101
102 value_selector: VS,
104
105 elector: Box<dyn LeaderElector<V, VS>>,
107
108 status: PartyStatus,
110
111 pub(crate) ballot: u64,
113
114 leader: u64,
116
117 last_ballot_voted: Option<u64>,
119
120 last_value_voted: Option<V>,
122
123 rate_limiter: HashSet<(ProtocolMessage, u64)>,
125
126 parties_voted_before: HashMap<u64, Option<V>>,
129 messages_1b_weight: u128,
131
132 value_2a: Option<V>,
134
135 messages_2av_state: MessageRoundState,
137
138 messages_2b_state: MessageRoundState,
140}
141
142impl<V: Value, VS: ValueSelector<V>> Party<V, VS> {
143 pub fn new(
160 id: u64,
161 cfg: BPConConfig,
162 value_selector: VS,
163 elector: Box<dyn LeaderElector<V, VS>>,
164 ) -> (
165 Self,
166 UnboundedReceiver<MessagePacket>,
167 UnboundedSender<MessagePacket>,
168 ) {
169 let (event_sender, event_receiver) = unbounded_channel();
170 let (msg_in_sender, msg_in_receiver) = unbounded_channel();
171 let (msg_out_sender, msg_out_receiver) = unbounded_channel();
172
173 (
174 Self {
175 id,
176 msg_in_receiver,
177 msg_out_sender,
178 event_receiver,
179 event_sender,
180 cfg,
181 value_selector,
182 elector,
183 status: PartyStatus::None,
184 ballot: 0,
185 leader: 0,
186 last_ballot_voted: None,
187 last_value_voted: None,
188 rate_limiter: HashSet::new(),
189 parties_voted_before: HashMap::new(),
190 messages_1b_weight: 0,
191 value_2a: None,
192 messages_2av_state: MessageRoundState::new(),
193 messages_2b_state: MessageRoundState::new(),
194 },
195 msg_out_receiver,
196 msg_in_sender,
197 )
198 }
199
200 pub fn ballot(&self) -> u64 {
202 self.ballot
203 }
204
205 pub fn is_launched(&self) -> bool {
210 !(self.is_stopped() || self.status == PartyStatus::None)
211 }
212
213 pub fn is_stopped(&self) -> bool {
218 self.status == PartyStatus::Finished || self.status == PartyStatus::Failed
219 }
220
221 pub fn get_value_selected(&self) -> Option<V> {
227 if self.status == PartyStatus::Finished {
229 return self.value_2a.clone();
230 }
231
232 None
233 }
234
235 fn get_value(&self) -> V {
243 self.value_selector.select(&self.parties_voted_before)
244 }
245
246 pub async fn launch_ballot(&mut self) -> Result<V, LaunchBallotError> {
257 self.prepare_next_ballot()?;
258
259 sleep_until(self.cfg.launch_at).await;
260
261 let launch1a_timer = sleep(self.cfg.launch1a_timeout);
262 let launch1b_timer = sleep(self.cfg.launch1b_timeout);
263 let launch2a_timer = sleep(self.cfg.launch2a_timeout);
264 let launch2av_timer = sleep(self.cfg.launch2av_timeout);
265 let launch2b_timer = sleep(self.cfg.launch2b_timeout);
266 let finalize_timer = sleep(self.cfg.finalize_timeout);
267
268 tokio::pin!(
269 launch1a_timer,
270 launch1b_timer,
271 launch2a_timer,
272 launch2av_timer,
273 launch2b_timer,
274 finalize_timer
275 );
276
277 let mut launch1a_fired = false;
278 let mut launch1b_fired = false;
279 let mut launch2a_fired = false;
280 let mut launch2av_fired = false;
281 let mut launch2b_fired = false;
282 let mut finalize_fired = false;
283
284 while self.status != PartyStatus::Finished {
285 tokio::select! {
286 _ = &mut launch1a_timer, if !launch1a_fired => {
287 self.event_sender.send(PartyEvent::Launch1a).map_err(|err| {
288 self.status = PartyStatus::Failed;
289 FailedToSendEvent(PartyEvent::Launch1a, err.to_string())
290 })?;
291 launch1a_fired = true;
292 },
293 _ = &mut launch1b_timer, if !launch1b_fired => {
294 self.event_sender.send(PartyEvent::Launch1b).map_err(|err| {
295 self.status = PartyStatus::Failed;
296 FailedToSendEvent(PartyEvent::Launch1b, err.to_string())
297 })?;
298 launch1b_fired = true;
299 },
300 _ = &mut launch2a_timer, if !launch2a_fired => {
301 self.event_sender.send(PartyEvent::Launch2a).map_err(|err| {
302 self.status = PartyStatus::Failed;
303 FailedToSendEvent(PartyEvent::Launch2a, err.to_string())
304 })?;
305 launch2a_fired = true;
306 },
307 _ = &mut launch2av_timer, if !launch2av_fired => {
308 self.event_sender.send(PartyEvent::Launch2av).map_err(|err| {
309 self.status = PartyStatus::Failed;
310 FailedToSendEvent(PartyEvent::Launch2av, err.to_string())
311 })?;
312 launch2av_fired = true;
313 },
314 _ = &mut launch2b_timer, if !launch2b_fired => {
315 self.event_sender.send(PartyEvent::Launch2b).map_err(|err| {
316 self.status = PartyStatus::Failed;
317 FailedToSendEvent(PartyEvent::Launch2b, err.to_string())
318 })?;
319 launch2b_fired = true;
320 },
321 _ = &mut finalize_timer, if !finalize_fired => {
322 self.event_sender.send(PartyEvent::Finalize).map_err(|err| {
323 self.status = PartyStatus::Failed;
324 FailedToSendEvent(PartyEvent::Finalize, err.to_string())
325 })?;
326 finalize_fired = true;
327 },
328 msg = self.msg_in_receiver.recv() => {
329 sleep(self.cfg.grace_period).await;
330 if let Some(msg) = msg {
331 let meta = (msg.routing.msg_type, msg.routing.sender);
332 debug!("Party {} received {} from party {}", self.id, meta.0, meta.1);
333
334 if self.id == meta.1{
335 warn!("Received own message {}, intended to be broadcasted.", meta.0);
336 continue
337 }
338 if self.rate_limiter.contains(&meta){
339 warn!("Party {} hit rate limit in party {} for message {}", meta.1, self.id, meta.0);
340 continue
341 }
342
343 if let Err(err) = self.update_state(&msg) {
344 warn!("Failed to update state for party {} with {}, got error: {err}", self.id, meta.0)
349 }
350 self.rate_limiter.insert(meta);
351
352 }else if self.msg_in_receiver.is_closed(){
353 self.status = PartyStatus::Failed;
354 return Err(MessageChannelClosed)
355 }
356 },
357 event = self.event_receiver.recv() => {
358 sleep(self.cfg.grace_period).await;
359 if let Some(event) = event {
360 if let Err(err) = self.follow_event(event) {
361 self.status = PartyStatus::Failed;
362 return Err(LaunchBallotError::FollowEventError(event, err));
363 }
364 }else if self.event_receiver.is_closed(){
365 self.status = PartyStatus::Failed;
366 return Err(EventChannelClosed)
367 }
368 },
369 }
370 }
371
372 Ok(self.get_value_selected().unwrap())
373 }
374
375 fn prepare_next_ballot(&mut self) -> Result<(), LaunchBallotError> {
383 self.reset_state();
384 self.ballot += 1;
385 self.status = PartyStatus::Launched;
386 self.leader = self
387 .elector
388 .elect_leader(self)
389 .map_err(|err| LeaderElectionError(err.to_string()))?;
390
391 Ok(())
392 }
393
394 fn reset_state(&mut self) {
398 self.rate_limiter = HashSet::new();
399 self.parties_voted_before = HashMap::new();
400 self.messages_1b_weight = 0;
401 self.value_2a = None;
402 self.messages_2av_state.reset();
403 self.messages_2b_state.reset();
404
405 while self.event_receiver.try_recv().is_ok() {}
407 while self.msg_in_receiver.try_recv().is_ok() {}
408 }
409
410 fn update_state(&mut self, msg: &MessagePacket) -> Result<(), UpdateStateError<V>> {
422 let routing = msg.routing;
423
424 match routing.msg_type {
425 ProtocolMessage::Msg1a => {
426 if self.status != PartyStatus::Launched {
427 return Err(PartyStatusMismatch {
428 party_status: self.status,
429 needed_status: PartyStatus::Launched,
430 }
431 .into());
432 }
433
434 let msg = Message1aContent::unpack(msg)?;
435
436 if msg.ballot != self.ballot {
437 return Err(BallotNumberMismatch {
438 party_ballot_number: self.ballot,
439 message_ballot_number: msg.ballot,
440 }
441 .into());
442 }
443
444 if routing.sender != self.leader {
445 return Err(LeaderMismatch {
446 party_leader: self.leader,
447 message_sender: routing.sender,
448 }
449 .into());
450 }
451
452 self.status = PartyStatus::Passed1a;
453 }
454 ProtocolMessage::Msg1b => {
455 if self.status != PartyStatus::Passed1a {
456 return Err(PartyStatusMismatch {
457 party_status: self.status,
458 needed_status: PartyStatus::Passed1a,
459 }
460 .into());
461 }
462
463 let msg = Message1bContent::unpack(msg)?;
464
465 if msg.ballot != self.ballot {
466 return Err(BallotNumberMismatch {
467 party_ballot_number: self.ballot,
468 message_ballot_number: msg.ballot,
469 }
470 .into());
471 }
472
473 if let Some(last_ballot_voted) = msg.last_ballot_voted {
474 if last_ballot_voted >= self.ballot {
475 return Err(BallotNumberMismatch {
476 party_ballot_number: self.ballot,
477 message_ballot_number: msg.ballot,
478 }
479 .into());
480 }
481 }
482
483 if let Vacant(e) = self.parties_voted_before.entry(routing.sender) {
484 let value: Option<V> = match msg.last_value_voted {
485 Some(ref data) => Some(
486 bincode::deserialize(data)
487 .map_err(|err| DeserializationError::Value(err.to_string()))?,
488 ),
489 None => None,
490 };
491
492 e.insert(value);
493
494 self.messages_1b_weight +=
495 self.cfg.party_weights[routing.sender as usize] as u128;
496
497 let self_weight = self.cfg.party_weights[self.id as usize] as u128;
498 if self.messages_1b_weight >= self.cfg.threshold.saturating_sub(self_weight) {
499 self.status = PartyStatus::Passed1b;
500 }
501 }
502 }
503 ProtocolMessage::Msg2a => {
504 if self.status != PartyStatus::Passed1b {
505 return Err(PartyStatusMismatch {
506 party_status: self.status,
507 needed_status: PartyStatus::Passed1b,
508 }
509 .into());
510 }
511
512 let msg = Message2aContent::unpack(msg)?;
513
514 if msg.ballot != self.ballot {
515 return Err(BallotNumberMismatch {
516 party_ballot_number: self.ballot,
517 message_ballot_number: msg.ballot,
518 }
519 .into());
520 }
521
522 if routing.sender != self.leader {
523 return Err(LeaderMismatch {
524 party_leader: self.leader,
525 message_sender: routing.sender,
526 }
527 .into());
528 }
529
530 let value_received = bincode::deserialize(&msg.value[..])
531 .map_err(|err| DeserializationError::Value(err.to_string()))?;
532
533 if self
534 .value_selector
535 .verify(&value_received, &self.parties_voted_before)
536 {
537 self.status = PartyStatus::Passed2a;
538 self.value_2a = Some(value_received);
539 } else {
540 return Err(ValueVerificationFailed);
541 }
542 }
543 ProtocolMessage::Msg2av => {
544 if self.status != PartyStatus::Passed2a {
545 return Err(PartyStatusMismatch {
546 party_status: self.status,
547 needed_status: PartyStatus::Passed2a,
548 }
549 .into());
550 }
551
552 let msg = Message2avContent::unpack(msg)?;
553
554 if msg.ballot != self.ballot {
555 return Err(BallotNumberMismatch {
556 party_ballot_number: self.ballot,
557 message_ballot_number: msg.ballot,
558 }
559 .into());
560 }
561 let value_received: V = bincode::deserialize(&msg.received_value[..])
562 .map_err(|err| DeserializationError::Value(err.to_string()))?;
563
564 if value_received != self.value_2a.clone().unwrap() {
565 return Err(ValueMismatch {
566 party_value: self.value_2a.clone().unwrap(),
567 message_value: value_received.clone(),
568 }
569 .into());
570 }
571
572 if !self.messages_2av_state.contains_sender(&routing.sender) {
573 self.messages_2av_state.add_sender(
574 routing.sender,
575 self.cfg.party_weights[routing.sender as usize] as u128,
576 );
577
578 let self_weight = self.cfg.party_weights[self.id as usize] as u128;
579 if self.messages_2av_state.get_weight()
580 >= self.cfg.threshold.saturating_sub(self_weight)
581 {
582 self.status = PartyStatus::Passed2av;
583 }
584 }
585 }
586 ProtocolMessage::Msg2b => {
587 if self.status != PartyStatus::Passed2av {
588 return Err(PartyStatusMismatch {
589 party_status: self.status,
590 needed_status: PartyStatus::Passed2av,
591 }
592 .into());
593 }
594
595 let msg = Message2bContent::unpack(msg)?;
596
597 if msg.ballot != self.ballot {
598 return Err(BallotNumberMismatch {
599 party_ballot_number: self.ballot,
600 message_ballot_number: msg.ballot,
601 }
602 .into());
603 }
604
605 if self.messages_2av_state.contains_sender(&routing.sender)
606 && !self.messages_2b_state.contains_sender(&routing.sender)
607 {
608 self.messages_2b_state.add_sender(
609 routing.sender,
610 self.cfg.party_weights[routing.sender as usize] as u128,
611 );
612
613 let self_weight = self.cfg.party_weights[self.id as usize] as u128;
614 if self.messages_2b_state.get_weight()
615 >= self.cfg.threshold.saturating_sub(self_weight)
616 {
617 self.status = PartyStatus::Passed2b;
618 }
619 }
620 }
621 }
622 Ok(())
623 }
624
625 fn follow_event(&mut self, event: PartyEvent) -> Result<(), FollowEventError> {
637 match event {
638 PartyEvent::Launch1a => {
639 if self.status != PartyStatus::Launched {
640 return Err(PartyStatusMismatch {
641 party_status: self.status,
642 needed_status: PartyStatus::Launched,
643 }
644 .into());
645 }
646
647 if self.leader == self.id {
648 let content = &Message1aContent {
649 ballot: self.ballot,
650 };
651 let msg = content.pack(self.id)?;
652
653 self.msg_out_sender
654 .send(msg)
655 .map_err(|err| FailedToSendMessage(err.to_string()))?;
656 self.status = PartyStatus::Passed1a;
657 }
658 }
659 PartyEvent::Launch1b => {
660 if self.status != PartyStatus::Passed1a {
661 return Err(PartyStatusMismatch {
662 party_status: self.status,
663 needed_status: PartyStatus::Passed1a,
664 }
665 .into());
666 }
667
668 let last_value_voted = self
669 .last_value_voted
670 .clone()
671 .map(|inner_data| {
672 bincode::serialize(&inner_data)
673 .map_err(|err| SerializationError::Value(err.to_string()))
674 })
675 .transpose()?;
676
677 let content = &Message1bContent {
678 ballot: self.ballot,
679 last_ballot_voted: self.last_ballot_voted,
680 last_value_voted,
681 };
682 let msg = content.pack(self.id)?;
683
684 self.msg_out_sender
685 .send(msg)
686 .map_err(|err| FailedToSendMessage(err.to_string()))?;
687 }
688 PartyEvent::Launch2a => {
689 if self.status != PartyStatus::Passed1b {
690 return Err(PartyStatusMismatch {
691 party_status: self.status,
692 needed_status: PartyStatus::Passed1b,
693 }
694 .into());
695 }
696 if self.leader == self.id {
697 let value = bincode::serialize(&self.get_value())
698 .map_err(|err| SerializationError::Value(err.to_string()))?;
699
700 let content = &Message2aContent {
701 ballot: self.ballot,
702 value,
703 };
704 let msg = content.pack(self.id)?;
705
706 self.msg_out_sender
707 .send(msg)
708 .map_err(|err| FailedToSendMessage(err.to_string()))?;
709
710 self.value_2a = Some(self.get_value());
711 self.status = PartyStatus::Passed2a;
712 }
713 }
714 PartyEvent::Launch2av => {
715 if self.status != PartyStatus::Passed2a {
716 return Err(PartyStatusMismatch {
717 party_status: self.status,
718 needed_status: PartyStatus::Passed2a,
719 }
720 .into());
721 }
722
723 let received_value = bincode::serialize(&self.value_2a.clone().unwrap())
724 .map_err(|err| SerializationError::Value(err.to_string()))?;
725
726 let content = &Message2avContent {
727 ballot: self.ballot,
728 received_value,
729 };
730 let msg = content.pack(self.id)?;
731
732 self.msg_out_sender
733 .send(msg)
734 .map_err(|err| FailedToSendMessage(err.to_string()))?;
735 }
736 PartyEvent::Launch2b => {
737 if self.status != PartyStatus::Passed2av {
738 return Err(PartyStatusMismatch {
739 party_status: self.status,
740 needed_status: PartyStatus::Passed2av,
741 }
742 .into());
743 }
744
745 let content = &Message2bContent {
746 ballot: self.ballot,
747 };
748 let msg = content.pack(self.id)?;
749
750 self.msg_out_sender
751 .send(msg)
752 .map_err(|err| FailedToSendMessage(err.to_string()))?;
753 }
754 PartyEvent::Finalize => {
755 if self.status != PartyStatus::Passed2b {
756 return Err(PartyStatusMismatch {
757 party_status: self.status,
758 needed_status: PartyStatus::Passed2av,
759 }
760 .into());
761 }
762
763 self.status = PartyStatus::Finished;
764 }
765 }
766 Ok(())
767 }
768}
769
770#[cfg(test)]
771mod tests {
772 use super::*;
773
774 use crate::leader::DefaultLeaderElector;
775 use crate::test_mocks::{MockParty, MockValue};
776 use tokio::time;
777
778 #[test]
779 fn test_update_state_msg1a() {
780 let mut party = MockParty {
783 status: PartyStatus::Launched,
784 id: 0,
785 leader: 1,
786 ..Default::default()
787 };
788 let content = Message1aContent {
789 ballot: party.ballot,
790 };
791 let msg = content.pack(party.leader).unwrap();
792
793 party.update_state(&msg).unwrap();
794
795 assert_eq!(party.status, PartyStatus::Passed1a);
796 }
797
798 #[test]
799 fn test_update_state_msg1b() {
800 let mut party = MockParty {
801 status: PartyStatus::Passed1a,
802 ..Default::default()
803 };
804
805 let content = Message1bContent {
806 ballot: party.ballot,
807 last_ballot_voted: None,
808 last_value_voted: bincode::serialize(&MockValue::default()).ok(),
809 };
810
811 let msg = content.pack(1).unwrap();
813 party.update_state(&msg).unwrap();
814
815 let msg = content.pack(2).unwrap();
817 party.update_state(&msg).unwrap();
818
819 assert_eq!(party.status, PartyStatus::Passed1b);
822 }
823
824 #[test]
825 fn test_update_state_msg2a() {
826 let mut party = MockParty {
827 status: PartyStatus::Passed1b,
828 ..Default::default()
829 };
830
831 let content = Message2aContent {
832 ballot: party.ballot,
833 value: bincode::serialize(&MockValue::default()).unwrap(),
834 };
835 let msg = content.pack(party.leader).unwrap();
836
837 party.update_state(&msg).unwrap();
838
839 assert_eq!(party.status, PartyStatus::Passed2a);
840 }
841
842 #[test]
843 fn test_update_state_msg2av() {
844 let value_to_verify = MockValue::default();
845 let mut party = MockParty {
846 status: PartyStatus::Passed2a,
847 value_2a: Some(value_to_verify.clone()),
848 ..Default::default()
849 };
850
851 let content = Message2avContent {
852 ballot: party.ballot,
853 received_value: bincode::serialize(&value_to_verify).unwrap(),
854 };
855
856 let msg = content.pack(1).unwrap();
858 party.update_state(&msg).unwrap();
859
860 let msg = content.pack(2).unwrap();
862 party.update_state(&msg).unwrap();
863
864 assert_eq!(party.status, PartyStatus::Passed2av);
867 }
868
869 #[test]
870 fn test_update_state_msg2b() {
871 let mut party = MockParty {
872 status: PartyStatus::Passed2av,
873 ..Default::default()
874 };
875
876 party.messages_2av_state.add_sender(1, 1);
878 party.messages_2av_state.add_sender(2, 1);
879
880 let content = Message2bContent {
881 ballot: party.ballot,
882 };
883
884 let msg = content.pack(1).unwrap();
886 party.update_state(&msg).unwrap();
887
888 let msg = content.pack(2).unwrap();
890 party.update_state(&msg).unwrap();
891
892 assert_eq!(party.status, PartyStatus::Passed2b);
895 }
896
897 #[test]
898 fn test_follow_event_launch1a() {
899 let (mut party, _receiver_from, _) = MockParty::new(
902 Default::default(),
903 Default::default(),
904 Default::default(),
905 Box::new(DefaultLeaderElector::default()),
906 );
907
908 party.status = PartyStatus::Launched;
909 party.leader = party.id;
910
911 party.follow_event(PartyEvent::Launch1a).unwrap();
912
913 assert_eq!(party.status, PartyStatus::Passed1a);
917 }
918
919 #[tokio::test]
920 async fn test_launch_ballot_events() {
921 time::pause();
923
924 let cfg = BPConConfig::default();
925
926 let (mut party, _receiver_from, _sender_into) = MockParty::new(
929 Default::default(),
930 cfg.clone(),
931 Default::default(),
932 Box::new(DefaultLeaderElector::default()),
933 );
934
935 let (event_sender, mut event_receiver) = unbounded_channel();
936 let _event_sender = party.event_sender;
939 party.event_sender = event_sender;
940
941 _ = tokio::spawn(async move {
943 _ = party.launch_ballot().await;
944 });
945
946 time::advance(cfg.launch1a_timeout).await;
949 assert_eq!(event_receiver.recv().await.unwrap(), PartyEvent::Launch1a);
950
951 time::advance(cfg.launch1b_timeout - cfg.launch1a_timeout).await;
952 assert_eq!(event_receiver.recv().await.unwrap(), PartyEvent::Launch1b);
953
954 time::advance(cfg.launch2a_timeout - cfg.launch1b_timeout).await;
955 assert_eq!(event_receiver.recv().await.unwrap(), PartyEvent::Launch2a);
956
957 time::advance(cfg.launch2av_timeout - cfg.launch2a_timeout).await;
958 assert_eq!(event_receiver.recv().await.unwrap(), PartyEvent::Launch2av);
959
960 time::advance(cfg.launch2b_timeout - cfg.launch2av_timeout).await;
961 assert_eq!(event_receiver.recv().await.unwrap(), PartyEvent::Launch2b);
962
963 time::advance(cfg.finalize_timeout - cfg.launch2b_timeout).await;
964 assert_eq!(event_receiver.recv().await.unwrap(), PartyEvent::Finalize);
965 }
966}