dusk_consensus/
msg_handler.rs1use async_trait::async_trait;
8use node_data::bls::PublicKeyBytes;
9use node_data::message::{Message, Payload, Status};
10use node_data::StepName;
11use tracing::{debug, warn};
12
13use crate::commons::RoundUpdate;
14use crate::errors::ConsensusError;
15use crate::iteration_ctx::RoundCommittees;
16use crate::ratification::handler::RatificationHandler;
17use crate::user::committee::Committee;
18use crate::{proposal, validation};
19
20#[allow(clippy::large_enum_variant)]
23pub enum StepOutcome {
24 Pending,
25 Ready(Message),
26}
27
28#[async_trait]
31pub trait MsgHandler {
32 fn is_valid(
37 &self,
38 msg: &Message,
39 ru: &RoundUpdate,
40 current_iteration: u8,
41 step: StepName,
42 committee: &Committee,
43 round_committees: &RoundCommittees,
44 ) -> Result<(), ConsensusError> {
45 let signer = msg.get_signer();
46
47 debug!(
48 event = "validating msg",
49 signer = signer.as_ref().map(|s| s.to_bs58()),
50 src_addr = ?msg.metadata.as_ref().map(|m| m.src_addr),
51 topic = ?msg.topic(),
52 step = msg.get_step(),
53 ray_id = msg.ray_id(),
54 );
55
56 let msg_tip = msg.header.prev_block_hash;
59 match msg.compare(ru.round, current_iteration, step) {
60 Status::Past => {
61 Self::verify_message(msg, ru, round_committees, Status::Past)?;
62 Err(ConsensusError::PastEvent)
63 }
64 Status::Present => {
65 if msg_tip != ru.hash() {
66 return Err(ConsensusError::InvalidPrevBlockHash(msg_tip));
67 }
68
69 let signer = signer.ok_or(ConsensusError::InvalidMsgType)?;
70 if !committee.is_member(&signer) {
72 return Err(ConsensusError::NotCommitteeMember);
73 }
74
75 self.verify(msg, round_committees)
79 }
80 Status::Future => {
81 Self::verify_message(
82 msg,
83 ru,
84 round_committees,
85 Status::Future,
86 )?;
87 Err(ConsensusError::FutureEvent)
88 }
89 }
90 }
91
92 fn verify_message(
94 msg: &Message,
95 ru: &RoundUpdate,
96 round_committees: &RoundCommittees,
97 status: Status,
98 ) -> Result<(), ConsensusError> {
99 if msg.header.round == ru.round {
101 let msg_tip = msg.header.prev_block_hash;
102 if msg_tip != ru.hash() {
103 return Err(ConsensusError::InvalidPrevBlockHash(msg_tip));
104 }
105
106 let step = msg.get_step();
107 if let Some(committee) = round_committees.get_committee(step) {
108 if !matches!(msg.payload, Payload::ValidationQuorum(_)) {
111 let signer = msg.get_signer().expect("signer to exist");
112
113 if !committee.is_member(&signer) {
114 return Err(ConsensusError::NotCommitteeMember);
115 }
116 }
117
118 match &msg.payload {
119 node_data::message::Payload::Ratification(_)
120 | node_data::message::Payload::ValidationQuorum(_) => {
121 RatificationHandler::verify_stateless(
122 msg,
123 round_committees,
124 )?;
125 }
126 node_data::message::Payload::Validation(_) => {
127 validation::handler::verify_stateless(
128 msg,
129 round_committees,
130 )?;
131 }
132 node_data::message::Payload::Candidate(c) => {
133 proposal::handler::verify_stateless(
134 c,
135 round_committees,
136 )?;
137 }
138 _ => {
139 warn!(
140 "{status:?} message not repropagated {:?}",
141 msg.topic()
142 );
143 Err(ConsensusError::InvalidMsgType)?;
144 }
145 }
146 } else {
147 warn!("{status:?} committee for step {step} not generated; skipping pre-verification for {:?} message", msg.topic());
148 }
149 }
150 Ok(())
151 }
152
153 fn verify(
155 &self,
156 msg: &Message,
157 round_committees: &RoundCommittees,
158 ) -> Result<(), ConsensusError>;
159
160 async fn collect(
162 &mut self,
163 msg: Message,
164 ru: &RoundUpdate,
165 committee: &Committee,
166 generator: Option<PublicKeyBytes>,
167 round_committees: &RoundCommittees,
168 ) -> Result<StepOutcome, ConsensusError>;
169
170 async fn collect_from_past(
173 &mut self,
174 msg: Message,
175 committee: &Committee,
176 generator: Option<PublicKeyBytes>,
177 ) -> Result<StepOutcome, ConsensusError>;
178
179 fn handle_timeout(
182 &self,
183 ru: &RoundUpdate,
184 curr_iteration: u8,
185 ) -> Option<Message>;
186}