test_semantic/
party.rs

1//! Definitions central to BPCon participant.
2//!
3//! This module contains the implementation of a `Party` in the BPCon consensus protocol.
4//! The `Party` manages the execution of the ballot, handles incoming messages, and coordinates with other participants
5//! to reach a consensus. It uses various components such as leader election and value selection to perform its duties.
6
7use crate::config::BPConConfig;
8use crate::error::FollowEventError::FailedToSendMessage;
9use crate::error::LaunchBallotError::{
10    EventChannelClosed, FailedToSendEvent, LeaderElectionError, MessageChannelClosed,
11};
12use crate::error::UpdateStateError::ValueVerificationFailed;
13use crate::error::{
14    BallotNumberMismatch, DeserializationError, FollowEventError, LaunchBallotError,
15    LeaderMismatch, PartyStatusMismatch, SerializationError, UpdateStateError, ValueMismatch,
16};
17use crate::leader::LeaderElector;
18use crate::message::{
19    Message1aContent, Message1bContent, Message2aContent, Message2avContent, Message2bContent,
20    MessagePacket, MessageRoundState, ProtocolMessage,
21};
22use crate::value::{Value, ValueSelector};
23use log::{debug, warn};
24use std::cmp::PartialEq;
25use std::collections::hash_map::Entry::Vacant;
26use std::collections::{HashMap, HashSet};
27use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
28use tokio::time::{sleep, sleep_until};
29
30/// Represents the status of a `Party` in the BPCon consensus protocol.
31///
32/// The status indicates the current phase or outcome of the ballot execution for this party.
33/// It transitions through various states as the party progresses through the protocol.
34#[derive(PartialEq, Eq, Debug, Copy, Clone)]
35pub enum PartyStatus {
36    None,
37    Launched,
38    Passed1a,
39    Passed1b,
40    Passed2a,
41    Passed2av,
42    Passed2b,
43    Finished,
44    Failed,
45}
46
47impl std::fmt::Display for PartyStatus {
48    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
49        write!(f, "PartyStatus: {:?}", self)
50    }
51}
52
53/// Represents the events that control the flow of the ballot process in a `Party`.
54///
55/// These events trigger transitions between different phases of the protocol.
56#[derive(PartialEq, Eq, Debug, Copy, Clone)]
57pub enum PartyEvent {
58    Launch1a,
59    Launch1b,
60    Launch2a,
61    Launch2av,
62    Launch2b,
63    Finalize,
64}
65
66impl std::fmt::Display for PartyEvent {
67    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
68        write!(f, "PartyEvent: {:?}", self)
69    }
70}
71
72/// A participant in the BPCon protocol responsible for executing ballots.
73///
74/// A `Party` manages the execution of ballots by communicating with other parties, processing incoming messages,
75/// and following the protocol's steps. It uses an internal state machine to track its progress through the ballot process.
76///
77/// # Communication
78/// - `msg_in_receiver`: Receives incoming messages from other parties.
79/// - `msg_out_sender`: Sends outgoing messages to other parties.
80/// - `event_receiver`: Receives events that drive the ballot process.
81/// - `event_sender`: Sends events to trigger actions in the ballot process.
82///
83/// The `Party` operates within a BPCon configuration and relies on a `ValueSelector` to choose values during the consensus process.
84/// A `LeaderElector` is used to determine the leader for each ballot.
85pub struct Party<V: Value, VS: ValueSelector<V>> {
86    /// The identifier of this party.
87    pub id: u64,
88
89    /// Queue for receiving incoming messages.
90    msg_in_receiver: UnboundedReceiver<MessagePacket>,
91    /// Queue for sending outgoing messages.
92    msg_out_sender: UnboundedSender<MessagePacket>,
93
94    /// Queue for receiving events that control the ballot process.
95    event_receiver: UnboundedReceiver<PartyEvent>,
96    /// Queue for sending events that control the ballot process.
97    event_sender: UnboundedSender<PartyEvent>,
98
99    /// BPCon configuration settings, including timeouts and party weights.
100    pub(crate) cfg: BPConConfig,
101
102    /// Component responsible for selecting values during the consensus process.
103    value_selector: VS,
104
105    /// Component responsible for electing a leader for each ballot.
106    elector: Box<dyn LeaderElector<V, VS>>,
107
108    /// The current status of the ballot execution for this party.
109    status: PartyStatus,
110
111    /// The current ballot number.
112    pub(crate) ballot: u64,
113
114    /// The leader for the current ballot.
115    leader: u64,
116
117    /// The last ballot number where this party submitted a 2b message.
118    last_ballot_voted: Option<u64>,
119
120    /// The last value for which this party submitted a 2b message.
121    last_value_voted: Option<V>,
122
123    /// DDoS prevention mechanism - we allow each party to send one message type per ballot.
124    rate_limiter: HashSet<(ProtocolMessage, u64)>,
125
126    // Local round fields
127    /// The state of 1b round, tracking which parties have voted and their corresponding values.
128    parties_voted_before: HashMap<u64, Option<V>>,
129    /// The cumulative weight of 1b messages received.
130    messages_1b_weight: u128,
131
132    /// The state of 2a round, storing the value proposed by this party.
133    value_2a: Option<V>,
134
135    /// The state of 2av round, tracking which parties have confirmed the 2a value.
136    messages_2av_state: MessageRoundState,
137
138    /// The state of 2b round, tracking which parties have sent 2b messages.
139    messages_2b_state: MessageRoundState,
140}
141
142impl<V: Value, VS: ValueSelector<V>> Party<V, VS> {
143    /// Creates a new `Party` instance.
144    ///
145    /// This constructor sets up the party with the given ID, BPCon configuration, value selector, and leader elector.
146    /// It also initializes communication channels for receiving and sending messages and events.
147    ///
148    /// # Parameters
149    /// - `id`: The unique identifier for this party.
150    /// - `cfg`: The BPCon configuration settings.
151    /// - `value_selector`: The component responsible for selecting values during the consensus process.
152    /// - `elector`: The component responsible for electing the leader for each ballot.
153    ///
154    /// # Returns
155    /// - A tuple containing:
156    ///   - The new `Party` instance.
157    ///   - The `UnboundedReceiver` for outgoing messages.
158    ///   - The `UnboundedSender` for incoming messages.
159    pub fn new(
160        id: u64,
161        cfg: BPConConfig,
162        value_selector: VS,
163        elector: Box<dyn LeaderElector<V, VS>>,
164    ) -> (
165        Self,
166        UnboundedReceiver<MessagePacket>,
167        UnboundedSender<MessagePacket>,
168    ) {
169        let (event_sender, event_receiver) = unbounded_channel();
170        let (msg_in_sender, msg_in_receiver) = unbounded_channel();
171        let (msg_out_sender, msg_out_receiver) = unbounded_channel();
172
173        (
174            Self {
175                id,
176                msg_in_receiver,
177                msg_out_sender,
178                event_receiver,
179                event_sender,
180                cfg,
181                value_selector,
182                elector,
183                status: PartyStatus::None,
184                ballot: 0,
185                leader: 0,
186                last_ballot_voted: None,
187                last_value_voted: None,
188                rate_limiter: HashSet::new(),
189                parties_voted_before: HashMap::new(),
190                messages_1b_weight: 0,
191                value_2a: None,
192                messages_2av_state: MessageRoundState::new(),
193                messages_2b_state: MessageRoundState::new(),
194            },
195            msg_out_receiver,
196            msg_in_sender,
197        )
198    }
199
200    /// Returns the current ballot number.
201    pub fn ballot(&self) -> u64 {
202        self.ballot
203    }
204
205    /// Checks if the ballot process has been launched.
206    ///
207    /// # Returns
208    /// - `true` if the ballot is currently active; `false` otherwise.
209    pub fn is_launched(&self) -> bool {
210        !(self.is_stopped() || self.status == PartyStatus::None)
211    }
212
213    /// Checks if the ballot process has been stopped.
214    ///
215    /// # Returns
216    /// - `true` if the ballot has finished or failed; `false` otherwise.
217    pub fn is_stopped(&self) -> bool {
218        self.status == PartyStatus::Finished || self.status == PartyStatus::Failed
219    }
220
221    /// Retrieves the selected value if the ballot process has finished successfully.
222    ///
223    /// # Returns
224    /// - `Some(V)` if the ballot reached a consensus and the value was selected.
225    /// - `None` if the ballot did not reach consensus or is still ongoing.
226    pub fn get_value_selected(&self) -> Option<V> {
227        // Only `Finished` status means reached BFT agreement
228        if self.status == PartyStatus::Finished {
229            return self.value_2a.clone();
230        }
231
232        None
233    }
234
235    /// Selects a value based on the current state of the party.
236    ///
237    /// This method delegates to the `ValueSelector` to determine the value that should be selected
238    /// based on the votes received in the 1b round.
239    ///
240    /// # Returns
241    /// - The value selected by the `ValueSelector`.
242    fn get_value(&self) -> V {
243        self.value_selector.select(&self.parties_voted_before)
244    }
245
246    /// Launches the ballot process.
247    ///
248    /// This method initiates the ballot process, advancing through the different phases of the protocol
249    /// by sending and receiving events and messages. It handles timeouts for each phase and processes
250    /// incoming messages to update the party's state.
251    ///
252    /// # Returns
253    /// - `Ok(Some(V))`: The selected value if the ballot reaches consensus.
254    /// - `Ok(None)`: If the ballot process is terminated without reaching consensus.
255    /// - `Err(LaunchBallotError)`: If an error occurs during the ballot process.
256    pub async fn launch_ballot(&mut self) -> Result<V, LaunchBallotError> {
257        self.prepare_next_ballot()?;
258
259        sleep_until(self.cfg.launch_at).await;
260
261        let launch1a_timer = sleep(self.cfg.launch1a_timeout);
262        let launch1b_timer = sleep(self.cfg.launch1b_timeout);
263        let launch2a_timer = sleep(self.cfg.launch2a_timeout);
264        let launch2av_timer = sleep(self.cfg.launch2av_timeout);
265        let launch2b_timer = sleep(self.cfg.launch2b_timeout);
266        let finalize_timer = sleep(self.cfg.finalize_timeout);
267
268        tokio::pin!(
269            launch1a_timer,
270            launch1b_timer,
271            launch2a_timer,
272            launch2av_timer,
273            launch2b_timer,
274            finalize_timer
275        );
276
277        let mut launch1a_fired = false;
278        let mut launch1b_fired = false;
279        let mut launch2a_fired = false;
280        let mut launch2av_fired = false;
281        let mut launch2b_fired = false;
282        let mut finalize_fired = false;
283
284        while self.status != PartyStatus::Finished {
285            tokio::select! {
286                _ = &mut launch1a_timer, if !launch1a_fired => {
287                    self.event_sender.send(PartyEvent::Launch1a).map_err(|err| {
288                        self.status = PartyStatus::Failed;
289                        FailedToSendEvent(PartyEvent::Launch1a, err.to_string())
290                    })?;
291                    launch1a_fired = true;
292                },
293                _ = &mut launch1b_timer, if !launch1b_fired => {
294                    self.event_sender.send(PartyEvent::Launch1b).map_err(|err| {
295                        self.status = PartyStatus::Failed;
296                        FailedToSendEvent(PartyEvent::Launch1b, err.to_string())
297                    })?;
298                    launch1b_fired = true;
299                },
300                _ = &mut launch2a_timer, if !launch2a_fired => {
301                    self.event_sender.send(PartyEvent::Launch2a).map_err(|err| {
302                        self.status = PartyStatus::Failed;
303                        FailedToSendEvent(PartyEvent::Launch2a, err.to_string())
304                    })?;
305                    launch2a_fired = true;
306                },
307                _ = &mut launch2av_timer, if !launch2av_fired => {
308                    self.event_sender.send(PartyEvent::Launch2av).map_err(|err| {
309                        self.status = PartyStatus::Failed;
310                         FailedToSendEvent(PartyEvent::Launch2av, err.to_string())
311                    })?;
312                    launch2av_fired = true;
313                },
314                _ = &mut launch2b_timer, if !launch2b_fired => {
315                    self.event_sender.send(PartyEvent::Launch2b).map_err(|err| {
316                        self.status = PartyStatus::Failed;
317                        FailedToSendEvent(PartyEvent::Launch2b, err.to_string())
318                    })?;
319                    launch2b_fired = true;
320                },
321                _ = &mut finalize_timer, if !finalize_fired => {
322                    self.event_sender.send(PartyEvent::Finalize).map_err(|err| {
323                        self.status = PartyStatus::Failed;
324                        FailedToSendEvent(PartyEvent::Finalize, err.to_string())
325                    })?;
326                    finalize_fired = true;
327                },
328                msg = self.msg_in_receiver.recv() => {
329                    sleep(self.cfg.grace_period).await;
330                    if let Some(msg) = msg {
331                        let meta = (msg.routing.msg_type, msg.routing.sender);
332                        debug!("Party {} received {} from party {}", self.id, meta.0, meta.1);
333
334                        if self.id == meta.1{
335                            warn!("Received own message {}, intended to be broadcasted.", meta.0);
336                            continue
337                        }
338                        if self.rate_limiter.contains(&meta){
339                            warn!("Party {} hit rate limit in party {} for message {}", meta.1, self.id, meta.0);
340                            continue
341                        }
342
343                        if let Err(err) = self.update_state(&msg) {
344                            // Shouldn't fail the party, since invalid message
345                            // may be sent by anyone. Furthermore, since in consensus
346                            // we are relying on redundancy of parties, we actually may need
347                            // less messages than from every party to transit to next status.
348                            warn!("Failed to update state for party {} with {}, got error: {err}", self.id, meta.0)
349                        }
350                        self.rate_limiter.insert(meta);
351
352                    }else if self.msg_in_receiver.is_closed(){
353                         self.status = PartyStatus::Failed;
354                         return Err(MessageChannelClosed)
355                    }
356                },
357                event = self.event_receiver.recv() => {
358                    sleep(self.cfg.grace_period).await;
359                    if let Some(event) = event {
360                        if let Err(err) = self.follow_event(event) {
361                            self.status = PartyStatus::Failed;
362                            return Err(LaunchBallotError::FollowEventError(event, err));
363                        }
364                    }else if self.event_receiver.is_closed(){
365                        self.status = PartyStatus::Failed;
366                        return Err(EventChannelClosed)
367                    }
368                },
369            }
370        }
371
372        Ok(self.get_value_selected().unwrap())
373    }
374
375    /// Prepares the party's state for the next ballot.
376    ///
377    /// This method resets the party's state, increments the ballot number, and elects a new leader.
378    ///
379    /// # Returns
380    /// - `Ok(())`: If the preparation is successful.
381    /// - `Err(LaunchBallotError)`: If an error occurs during leader election.
382    fn prepare_next_ballot(&mut self) -> Result<(), LaunchBallotError> {
383        self.reset_state();
384        self.ballot += 1;
385        self.status = PartyStatus::Launched;
386        self.leader = self
387            .elector
388            .elect_leader(self)
389            .map_err(|err| LeaderElectionError(err.to_string()))?;
390
391        Ok(())
392    }
393
394    /// Resets the party's state for a new round of ballot execution.
395    ///
396    /// This method clears the state associated with previous rounds and prepares the party for the next ballot.
397    fn reset_state(&mut self) {
398        self.rate_limiter = HashSet::new();
399        self.parties_voted_before = HashMap::new();
400        self.messages_1b_weight = 0;
401        self.value_2a = None;
402        self.messages_2av_state.reset();
403        self.messages_2b_state.reset();
404
405        // Cleaning channels
406        while self.event_receiver.try_recv().is_ok() {}
407        while self.msg_in_receiver.try_recv().is_ok() {}
408    }
409
410    /// Updates the party's state based on an incoming message.
411    ///
412    /// This method processes a message according to its type and updates the party's internal state accordingly.
413    /// It performs validation checks to ensure that the message is consistent with the current ballot and protocol rules.
414    ///
415    /// # Parameters
416    /// - `msg`: The incoming `MessagePacket` to be processed.
417    ///
418    /// # Returns
419    /// - `Ok(())`: If the state is successfully updated.
420    /// - `Err(UpdateStateError<V>)`: If an error occurs during the update, such as a mismatch in the ballot number or leader.
421    fn update_state(&mut self, msg: &MessagePacket) -> Result<(), UpdateStateError<V>> {
422        let routing = msg.routing;
423
424        match routing.msg_type {
425            ProtocolMessage::Msg1a => {
426                if self.status != PartyStatus::Launched {
427                    return Err(PartyStatusMismatch {
428                        party_status: self.status,
429                        needed_status: PartyStatus::Launched,
430                    }
431                    .into());
432                }
433
434                let msg = Message1aContent::unpack(msg)?;
435
436                if msg.ballot != self.ballot {
437                    return Err(BallotNumberMismatch {
438                        party_ballot_number: self.ballot,
439                        message_ballot_number: msg.ballot,
440                    }
441                    .into());
442                }
443
444                if routing.sender != self.leader {
445                    return Err(LeaderMismatch {
446                        party_leader: self.leader,
447                        message_sender: routing.sender,
448                    }
449                    .into());
450                }
451
452                self.status = PartyStatus::Passed1a;
453            }
454            ProtocolMessage::Msg1b => {
455                if self.status != PartyStatus::Passed1a {
456                    return Err(PartyStatusMismatch {
457                        party_status: self.status,
458                        needed_status: PartyStatus::Passed1a,
459                    }
460                    .into());
461                }
462
463                let msg = Message1bContent::unpack(msg)?;
464
465                if msg.ballot != self.ballot {
466                    return Err(BallotNumberMismatch {
467                        party_ballot_number: self.ballot,
468                        message_ballot_number: msg.ballot,
469                    }
470                    .into());
471                }
472
473                if let Some(last_ballot_voted) = msg.last_ballot_voted {
474                    if last_ballot_voted >= self.ballot {
475                        return Err(BallotNumberMismatch {
476                            party_ballot_number: self.ballot,
477                            message_ballot_number: msg.ballot,
478                        }
479                        .into());
480                    }
481                }
482
483                if let Vacant(e) = self.parties_voted_before.entry(routing.sender) {
484                    let value: Option<V> = match msg.last_value_voted {
485                        Some(ref data) => Some(
486                            bincode::deserialize(data)
487                                .map_err(|err| DeserializationError::Value(err.to_string()))?,
488                        ),
489                        None => None,
490                    };
491
492                    e.insert(value);
493
494                    self.messages_1b_weight +=
495                        self.cfg.party_weights[routing.sender as usize] as u128;
496
497                    let self_weight = self.cfg.party_weights[self.id as usize] as u128;
498                    if self.messages_1b_weight >= self.cfg.threshold.saturating_sub(self_weight) {
499                        self.status = PartyStatus::Passed1b;
500                    }
501                }
502            }
503            ProtocolMessage::Msg2a => {
504                if self.status != PartyStatus::Passed1b {
505                    return Err(PartyStatusMismatch {
506                        party_status: self.status,
507                        needed_status: PartyStatus::Passed1b,
508                    }
509                    .into());
510                }
511
512                let msg = Message2aContent::unpack(msg)?;
513
514                if msg.ballot != self.ballot {
515                    return Err(BallotNumberMismatch {
516                        party_ballot_number: self.ballot,
517                        message_ballot_number: msg.ballot,
518                    }
519                    .into());
520                }
521
522                if routing.sender != self.leader {
523                    return Err(LeaderMismatch {
524                        party_leader: self.leader,
525                        message_sender: routing.sender,
526                    }
527                    .into());
528                }
529
530                let value_received = bincode::deserialize(&msg.value[..])
531                    .map_err(|err| DeserializationError::Value(err.to_string()))?;
532
533                if self
534                    .value_selector
535                    .verify(&value_received, &self.parties_voted_before)
536                {
537                    self.status = PartyStatus::Passed2a;
538                    self.value_2a = Some(value_received);
539                } else {
540                    return Err(ValueVerificationFailed);
541                }
542            }
543            ProtocolMessage::Msg2av => {
544                if self.status != PartyStatus::Passed2a {
545                    return Err(PartyStatusMismatch {
546                        party_status: self.status,
547                        needed_status: PartyStatus::Passed2a,
548                    }
549                    .into());
550                }
551
552                let msg = Message2avContent::unpack(msg)?;
553
554                if msg.ballot != self.ballot {
555                    return Err(BallotNumberMismatch {
556                        party_ballot_number: self.ballot,
557                        message_ballot_number: msg.ballot,
558                    }
559                    .into());
560                }
561                let value_received: V = bincode::deserialize(&msg.received_value[..])
562                    .map_err(|err| DeserializationError::Value(err.to_string()))?;
563
564                if value_received != self.value_2a.clone().unwrap() {
565                    return Err(ValueMismatch {
566                        party_value: self.value_2a.clone().unwrap(),
567                        message_value: value_received.clone(),
568                    }
569                    .into());
570                }
571
572                if !self.messages_2av_state.contains_sender(&routing.sender) {
573                    self.messages_2av_state.add_sender(
574                        routing.sender,
575                        self.cfg.party_weights[routing.sender as usize] as u128,
576                    );
577
578                    let self_weight = self.cfg.party_weights[self.id as usize] as u128;
579                    if self.messages_2av_state.get_weight()
580                        >= self.cfg.threshold.saturating_sub(self_weight)
581                    {
582                        self.status = PartyStatus::Passed2av;
583                    }
584                }
585            }
586            ProtocolMessage::Msg2b => {
587                if self.status != PartyStatus::Passed2av {
588                    return Err(PartyStatusMismatch {
589                        party_status: self.status,
590                        needed_status: PartyStatus::Passed2av,
591                    }
592                    .into());
593                }
594
595                let msg = Message2bContent::unpack(msg)?;
596
597                if msg.ballot != self.ballot {
598                    return Err(BallotNumberMismatch {
599                        party_ballot_number: self.ballot,
600                        message_ballot_number: msg.ballot,
601                    }
602                    .into());
603                }
604
605                if self.messages_2av_state.contains_sender(&routing.sender)
606                    && !self.messages_2b_state.contains_sender(&routing.sender)
607                {
608                    self.messages_2b_state.add_sender(
609                        routing.sender,
610                        self.cfg.party_weights[routing.sender as usize] as u128,
611                    );
612
613                    let self_weight = self.cfg.party_weights[self.id as usize] as u128;
614                    if self.messages_2b_state.get_weight()
615                        >= self.cfg.threshold.saturating_sub(self_weight)
616                    {
617                        self.status = PartyStatus::Passed2b;
618                    }
619                }
620            }
621        }
622        Ok(())
623    }
624
625    /// Executes ballot actions according to the received event.
626    ///
627    /// This method processes an event and triggers the corresponding action in the ballot process,
628    /// such as launching a new phase or finalizing the ballot.
629    ///
630    /// # Parameters
631    /// - `event`: The `PartyEvent` to process.
632    ///
633    /// # Returns
634    /// - `Ok(())`: If the event is successfully processed.
635    /// - `Err(FollowEventError)`: If an error occurs while processing the event.
636    fn follow_event(&mut self, event: PartyEvent) -> Result<(), FollowEventError> {
637        match event {
638            PartyEvent::Launch1a => {
639                if self.status != PartyStatus::Launched {
640                    return Err(PartyStatusMismatch {
641                        party_status: self.status,
642                        needed_status: PartyStatus::Launched,
643                    }
644                    .into());
645                }
646
647                if self.leader == self.id {
648                    let content = &Message1aContent {
649                        ballot: self.ballot,
650                    };
651                    let msg = content.pack(self.id)?;
652
653                    self.msg_out_sender
654                        .send(msg)
655                        .map_err(|err| FailedToSendMessage(err.to_string()))?;
656                    self.status = PartyStatus::Passed1a;
657                }
658            }
659            PartyEvent::Launch1b => {
660                if self.status != PartyStatus::Passed1a {
661                    return Err(PartyStatusMismatch {
662                        party_status: self.status,
663                        needed_status: PartyStatus::Passed1a,
664                    }
665                    .into());
666                }
667
668                let last_value_voted = self
669                    .last_value_voted
670                    .clone()
671                    .map(|inner_data| {
672                        bincode::serialize(&inner_data)
673                            .map_err(|err| SerializationError::Value(err.to_string()))
674                    })
675                    .transpose()?;
676
677                let content = &Message1bContent {
678                    ballot: self.ballot,
679                    last_ballot_voted: self.last_ballot_voted,
680                    last_value_voted,
681                };
682                let msg = content.pack(self.id)?;
683
684                self.msg_out_sender
685                    .send(msg)
686                    .map_err(|err| FailedToSendMessage(err.to_string()))?;
687            }
688            PartyEvent::Launch2a => {
689                if self.status != PartyStatus::Passed1b {
690                    return Err(PartyStatusMismatch {
691                        party_status: self.status,
692                        needed_status: PartyStatus::Passed1b,
693                    }
694                    .into());
695                }
696                if self.leader == self.id {
697                    let value = bincode::serialize(&self.get_value())
698                        .map_err(|err| SerializationError::Value(err.to_string()))?;
699
700                    let content = &Message2aContent {
701                        ballot: self.ballot,
702                        value,
703                    };
704                    let msg = content.pack(self.id)?;
705
706                    self.msg_out_sender
707                        .send(msg)
708                        .map_err(|err| FailedToSendMessage(err.to_string()))?;
709
710                    self.value_2a = Some(self.get_value());
711                    self.status = PartyStatus::Passed2a;
712                }
713            }
714            PartyEvent::Launch2av => {
715                if self.status != PartyStatus::Passed2a {
716                    return Err(PartyStatusMismatch {
717                        party_status: self.status,
718                        needed_status: PartyStatus::Passed2a,
719                    }
720                    .into());
721                }
722
723                let received_value = bincode::serialize(&self.value_2a.clone().unwrap())
724                    .map_err(|err| SerializationError::Value(err.to_string()))?;
725
726                let content = &Message2avContent {
727                    ballot: self.ballot,
728                    received_value,
729                };
730                let msg = content.pack(self.id)?;
731
732                self.msg_out_sender
733                    .send(msg)
734                    .map_err(|err| FailedToSendMessage(err.to_string()))?;
735            }
736            PartyEvent::Launch2b => {
737                if self.status != PartyStatus::Passed2av {
738                    return Err(PartyStatusMismatch {
739                        party_status: self.status,
740                        needed_status: PartyStatus::Passed2av,
741                    }
742                    .into());
743                }
744
745                let content = &Message2bContent {
746                    ballot: self.ballot,
747                };
748                let msg = content.pack(self.id)?;
749
750                self.msg_out_sender
751                    .send(msg)
752                    .map_err(|err| FailedToSendMessage(err.to_string()))?;
753            }
754            PartyEvent::Finalize => {
755                if self.status != PartyStatus::Passed2b {
756                    return Err(PartyStatusMismatch {
757                        party_status: self.status,
758                        needed_status: PartyStatus::Passed2av,
759                    }
760                    .into());
761                }
762
763                self.status = PartyStatus::Finished;
764            }
765        }
766        Ok(())
767    }
768}
769
770#[cfg(test)]
771mod tests {
772    use super::*;
773
774    use crate::leader::DefaultLeaderElector;
775    use crate::test_mocks::{MockParty, MockValue};
776    use tokio::time;
777
778    #[test]
779    fn test_update_state_msg1a() {
780        // We wish to simulate behavior of leader
781        // sending message to another participant.
782        let mut party = MockParty {
783            status: PartyStatus::Launched,
784            id: 0,
785            leader: 1,
786            ..Default::default()
787        };
788        let content = Message1aContent {
789            ballot: party.ballot,
790        };
791        let msg = content.pack(party.leader).unwrap();
792
793        party.update_state(&msg).unwrap();
794
795        assert_eq!(party.status, PartyStatus::Passed1a);
796    }
797
798    #[test]
799    fn test_update_state_msg1b() {
800        let mut party = MockParty {
801            status: PartyStatus::Passed1a,
802            ..Default::default()
803        };
804
805        let content = Message1bContent {
806            ballot: party.ballot,
807            last_ballot_voted: None,
808            last_value_voted: bincode::serialize(&MockValue::default()).ok(),
809        };
810
811        // First, send a 1b message from party 1.
812        let msg = content.pack(1).unwrap();
813        party.update_state(&msg).unwrap();
814
815        // Then, send a 1b message from party 2.
816        let msg = content.pack(2).unwrap();
817        party.update_state(&msg).unwrap();
818
819        // After both messages, the cumulative weight is
820        // 1 (own) + 1 (party 1) + 1 (party 2) = 3, which satisfies the threshold.
821        assert_eq!(party.status, PartyStatus::Passed1b);
822    }
823
824    #[test]
825    fn test_update_state_msg2a() {
826        let mut party = MockParty {
827            status: PartyStatus::Passed1b,
828            ..Default::default()
829        };
830
831        let content = Message2aContent {
832            ballot: party.ballot,
833            value: bincode::serialize(&MockValue::default()).unwrap(),
834        };
835        let msg = content.pack(party.leader).unwrap();
836
837        party.update_state(&msg).unwrap();
838
839        assert_eq!(party.status, PartyStatus::Passed2a);
840    }
841
842    #[test]
843    fn test_update_state_msg2av() {
844        let value_to_verify = MockValue::default();
845        let mut party = MockParty {
846            status: PartyStatus::Passed2a,
847            value_2a: Some(value_to_verify.clone()),
848            ..Default::default()
849        };
850
851        let content = Message2avContent {
852            ballot: party.ballot,
853            received_value: bincode::serialize(&value_to_verify).unwrap(),
854        };
855
856        // First, send a 2av message from party 1.
857        let msg = content.pack(1).unwrap();
858        party.update_state(&msg).unwrap();
859
860        // Then, send a 2av message from party 2.
861        let msg = content.pack(2).unwrap();
862        party.update_state(&msg).unwrap();
863
864        // After both messages, the cumulative weight is
865        // 1 (own) + 1 (party 1) + 1 (party 2) = 3, which satisfies the threshold.
866        assert_eq!(party.status, PartyStatus::Passed2av);
867    }
868
869    #[test]
870    fn test_update_state_msg2b() {
871        let mut party = MockParty {
872            status: PartyStatus::Passed2av,
873            ..Default::default()
874        };
875
876        // Simulate that both party 1 and party 2 have already sent 2av messages.
877        party.messages_2av_state.add_sender(1, 1);
878        party.messages_2av_state.add_sender(2, 1);
879
880        let content = Message2bContent {
881            ballot: party.ballot,
882        };
883
884        // First, send 2b message from party 1.
885        let msg = content.pack(1).unwrap();
886        party.update_state(&msg).unwrap();
887
888        // Then, send 2b message from party 2.
889        let msg = content.pack(2).unwrap();
890        party.update_state(&msg).unwrap();
891
892        // After both messages, the cumulative weight is
893        // 1 (own) + 1 (party 1) + 1 (party 2) = 3, which satisfies the threshold.
894        assert_eq!(party.status, PartyStatus::Passed2b);
895    }
896
897    #[test]
898    fn test_follow_event_launch1a() {
899        // Need to take ownership of msg_out_receiver, so that sender doesn't close,
900        // since otherwise msg_out_receiver will be dropped and party will fail.
901        let (mut party, _receiver_from, _) = MockParty::new(
902            Default::default(),
903            Default::default(),
904            Default::default(),
905            Box::new(DefaultLeaderElector::default()),
906        );
907
908        party.status = PartyStatus::Launched;
909        party.leader = party.id;
910
911        party.follow_event(PartyEvent::Launch1a).unwrap();
912
913        // If the party is the leader and in the Launched state, the event should trigger a message.
914        // And it's status shall update to Passed1a after sending 1a message,
915        // contrary to other participants, whose `Passed1a` updates only after receiving 1a message.
916        assert_eq!(party.status, PartyStatus::Passed1a);
917    }
918
919    #[tokio::test]
920    async fn test_launch_ballot_events() {
921        // Pause the Tokio time so we can manipulate it.
922        time::pause();
923
924        let cfg = BPConConfig::default();
925
926        // Returning both channels so that party won't fail,
927        // because associated channels will close otherwise.
928        let (mut party, _receiver_from, _sender_into) = MockParty::new(
929            Default::default(),
930            cfg.clone(),
931            Default::default(),
932            Box::new(DefaultLeaderElector::default()),
933        );
934
935        let (event_sender, mut event_receiver) = unbounded_channel();
936        // Keeping, so that associated party's event_receiver won't close
937        // and it doesn't fail.
938        let _event_sender = party.event_sender;
939        party.event_sender = event_sender;
940
941        // Spawn the launch_ballot function in a separate task.
942        _ = tokio::spawn(async move {
943            _ = party.launch_ballot().await;
944        });
945
946        // Sequential time advance and event check.
947
948        time::advance(cfg.launch1a_timeout).await;
949        assert_eq!(event_receiver.recv().await.unwrap(), PartyEvent::Launch1a);
950
951        time::advance(cfg.launch1b_timeout - cfg.launch1a_timeout).await;
952        assert_eq!(event_receiver.recv().await.unwrap(), PartyEvent::Launch1b);
953
954        time::advance(cfg.launch2a_timeout - cfg.launch1b_timeout).await;
955        assert_eq!(event_receiver.recv().await.unwrap(), PartyEvent::Launch2a);
956
957        time::advance(cfg.launch2av_timeout - cfg.launch2a_timeout).await;
958        assert_eq!(event_receiver.recv().await.unwrap(), PartyEvent::Launch2av);
959
960        time::advance(cfg.launch2b_timeout - cfg.launch2av_timeout).await;
961        assert_eq!(event_receiver.recv().await.unwrap(), PartyEvent::Launch2b);
962
963        time::advance(cfg.finalize_timeout - cfg.launch2b_timeout).await;
964        assert_eq!(event_receiver.recv().await.unwrap(), PartyEvent::Finalize);
965    }
966}