dusk_consensus/
msg_handler.rs

1// This Source Code Form is subject to the terms of the Mozilla Public
2// License, v. 2.0. If a copy of the MPL was not distributed with this
3// file, You can obtain one at http://mozilla.org/MPL/2.0/.
4//
5// Copyright (c) DUSK NETWORK. All rights reserved.
6
7use async_trait::async_trait;
8use node_data::bls::PublicKeyBytes;
9use node_data::message::{Message, Payload, Status};
10use node_data::StepName;
11use tracing::{debug, warn};
12
13use crate::commons::RoundUpdate;
14use crate::errors::ConsensusError;
15use crate::iteration_ctx::RoundCommittees;
16use crate::ratification::handler::RatificationHandler;
17use crate::user::committee::Committee;
18use crate::{proposal, validation};
19
20/// Indicates whether an output value is available for current step execution
21/// (Step is Ready) or needs to collect data (Step is Pending)
22#[allow(clippy::large_enum_variant)]
23pub enum StepOutcome {
24    Pending,
25    Ready(Message),
26}
27
28/// MsgHandler must be implemented by any step that needs to handle an external
29/// message within event_loop life-cycle.
30#[async_trait]
31pub trait MsgHandler {
32    /// is_valid checks a new message is valid in the first place.
33    ///
34    /// Only if the message has correct round and step and is signed by a
35    /// committee member then we delegate it to Phase::verify.
36    fn is_valid(
37        &self,
38        msg: &Message,
39        ru: &RoundUpdate,
40        current_iteration: u8,
41        step: StepName,
42        committee: &Committee,
43        round_committees: &RoundCommittees,
44    ) -> Result<(), ConsensusError> {
45        let signer = msg.get_signer();
46
47        debug!(
48            event = "validating msg",
49            signer = signer.as_ref().map(|s| s.to_bs58()),
50            src_addr = ?msg.metadata.as_ref().map(|m| m.src_addr),
51            topic = ?msg.topic(),
52            step = msg.get_step(),
53            ray_id = msg.ray_id(),
54        );
55
56        // We don't verify the tip here, otherwise future round messages will be
57        // discarded and not put into the queue
58        let msg_tip = msg.header.prev_block_hash;
59        match msg.compare(ru.round, current_iteration, step) {
60            Status::Past => {
61                Self::verify_message(msg, ru, round_committees, Status::Past)?;
62                Err(ConsensusError::PastEvent)
63            }
64            Status::Present => {
65                if msg_tip != ru.hash() {
66                    return Err(ConsensusError::InvalidPrevBlockHash(msg_tip));
67                }
68
69                let signer = signer.ok_or(ConsensusError::InvalidMsgType)?;
70                // Ensure the message originates from a committee member.
71                if !committee.is_member(&signer) {
72                    return Err(ConsensusError::NotCommitteeMember);
73                }
74
75                // Delegate message final verification to the phase instance.
76                // It is the phase that knows what message type to expect and if
77                // it is valid or not.
78                self.verify(msg, round_committees)
79            }
80            Status::Future => {
81                Self::verify_message(
82                    msg,
83                    ru,
84                    round_committees,
85                    Status::Future,
86                )?;
87                Err(ConsensusError::FutureEvent)
88            }
89        }
90    }
91
92    /// Verify step message for the current round with different iteration
93    fn verify_message(
94        msg: &Message,
95        ru: &RoundUpdate,
96        round_committees: &RoundCommittees,
97        status: Status,
98    ) -> Result<(), ConsensusError> {
99        // Pre-verify messages for the current round with different iteration
100        if msg.header.round == ru.round {
101            let msg_tip = msg.header.prev_block_hash;
102            if msg_tip != ru.hash() {
103                return Err(ConsensusError::InvalidPrevBlockHash(msg_tip));
104            }
105
106            let step = msg.get_step();
107            if let Some(committee) = round_committees.get_committee(step) {
108                // Ensure msg is signed by a committee member.
109                // We skip ValidationQuorum, since it has no signer
110                if !matches!(msg.payload, Payload::ValidationQuorum(_)) {
111                    let signer = msg.get_signer().expect("signer to exist");
112
113                    if !committee.is_member(&signer) {
114                        return Err(ConsensusError::NotCommitteeMember);
115                    }
116                }
117
118                match &msg.payload {
119                    node_data::message::Payload::Ratification(_)
120                    | node_data::message::Payload::ValidationQuorum(_) => {
121                        RatificationHandler::verify_stateless(
122                            msg,
123                            round_committees,
124                        )?;
125                    }
126                    node_data::message::Payload::Validation(_) => {
127                        validation::handler::verify_stateless(
128                            msg,
129                            round_committees,
130                        )?;
131                    }
132                    node_data::message::Payload::Candidate(c) => {
133                        proposal::handler::verify_stateless(
134                            c,
135                            round_committees,
136                        )?;
137                    }
138                    _ => {
139                        warn!(
140                            "{status:?} message not repropagated {:?}",
141                            msg.topic()
142                        );
143                        Err(ConsensusError::InvalidMsgType)?;
144                    }
145                }
146            } else {
147                warn!("{status:?} committee for step {step} not generated; skipping pre-verification for {:?} message", msg.topic());
148            }
149        }
150        Ok(())
151    }
152
153    /// verify allows each Phase to fully verify the message payload.
154    fn verify(
155        &self,
156        msg: &Message,
157        round_committees: &RoundCommittees,
158    ) -> Result<(), ConsensusError>;
159
160    /// collect allows each Phase to process a verified inbound message.
161    async fn collect(
162        &mut self,
163        msg: Message,
164        ru: &RoundUpdate,
165        committee: &Committee,
166        generator: Option<PublicKeyBytes>,
167        round_committees: &RoundCommittees,
168    ) -> Result<StepOutcome, ConsensusError>;
169
170    /// collect allows each Phase to process a verified message from a former
171    /// iteration
172    async fn collect_from_past(
173        &mut self,
174        msg: Message,
175        committee: &Committee,
176        generator: Option<PublicKeyBytes>,
177    ) -> Result<StepOutcome, ConsensusError>;
178
179    /// handle_timeout allows each Phase to handle a timeout event.
180    /// Returned Message here is sent to outboud queue.
181    fn handle_timeout(
182        &self,
183        ru: &RoundUpdate,
184        curr_iteration: u8,
185    ) -> Option<Message>;
186}