1use std::{
4 fmt::{self, Display, Formatter},
5 sync::Arc,
6};
7
8use derive_more::From;
9use fmt::Debug;
10use futures::{future::BoxFuture, FutureExt};
11use hex_fmt::HexFmt;
12use serde::{Deserialize, Serialize};
13use strum::EnumDiscriminants;
14
15use casper_types::{BlockV2, FinalitySignatureV2, Transaction};
16
17use crate::{
18 components::{
19 consensus,
20 fetcher::{FetchItem, FetchResponse, Tag},
21 gossiper,
22 network::{EstimatorWeights, FromIncoming, GossipedAddress, MessageKind, Payload},
23 },
24 effect::{
25 incoming::{
26 ConsensusDemand, ConsensusMessageIncoming, FinalitySignatureIncoming, GossiperIncoming,
27 NetRequest, NetRequestIncoming, NetResponse, NetResponseIncoming, TrieDemand,
28 TrieRequest, TrieRequestIncoming, TrieResponse, TrieResponseIncoming,
29 },
30 AutoClosingResponder, EffectBuilder,
31 },
32 types::NodeId,
33};
34
35#[derive(Clone, From, Serialize, Deserialize, EnumDiscriminants)]
37#[strum_discriminants(derive(strum::EnumIter))]
38pub(crate) enum Message {
39 #[from]
41 Consensus(consensus::ConsensusMessage),
42 #[from]
44 ConsensusRequest(consensus::ConsensusRequestMessage),
45 #[from]
47 BlockGossiper(gossiper::Message<BlockV2>),
48 #[from]
50 TransactionGossiper(gossiper::Message<Transaction>),
51 #[from]
52 FinalitySignatureGossiper(gossiper::Message<FinalitySignatureV2>),
53 #[from]
55 AddressGossiper(gossiper::Message<GossipedAddress>),
56 GetRequest {
58 tag: Tag,
60 serialized_id: Vec<u8>,
62 },
63 GetResponse {
65 tag: Tag,
67 serialized_item: Arc<[u8]>,
69 },
70 #[from]
72 FinalitySignature(Box<FinalitySignatureV2>),
73}
74
75impl Payload for Message {
76 #[inline]
77 fn message_kind(&self) -> MessageKind {
78 match self {
79 Message::Consensus(_) => MessageKind::Consensus,
80 Message::ConsensusRequest(_) => MessageKind::Consensus,
81 Message::BlockGossiper(_) => MessageKind::BlockGossip,
82 Message::TransactionGossiper(_) => MessageKind::TransactionGossip,
83 Message::AddressGossiper(_) => MessageKind::AddressGossip,
84 Message::GetRequest { tag, .. } | Message::GetResponse { tag, .. } => match tag {
85 Tag::Transaction | Tag::LegacyDeploy => MessageKind::TransactionTransfer,
86 Tag::Block => MessageKind::BlockTransfer,
87 Tag::BlockHeader => MessageKind::BlockTransfer,
88 Tag::TrieOrChunk => MessageKind::TrieTransfer,
89 Tag::FinalitySignature => MessageKind::Other,
90 Tag::SyncLeap => MessageKind::BlockTransfer,
91 Tag::ApprovalsHashes => MessageKind::BlockTransfer,
92 Tag::BlockExecutionResults => MessageKind::BlockTransfer,
93 },
94 Message::FinalitySignature(_) => MessageKind::Consensus,
95 Message::FinalitySignatureGossiper(_) => MessageKind::FinalitySignatureGossip,
96 }
97 }
98
99 fn is_low_priority(&self) -> bool {
100 match self {
103 Message::Consensus(_) => false,
104 Message::ConsensusRequest(_) => false,
105 Message::TransactionGossiper(_) => false,
106 Message::BlockGossiper(_) => false,
107 Message::FinalitySignatureGossiper(_) => false,
108 Message::AddressGossiper(_) => false,
109 Message::GetRequest { tag, .. } if *tag == Tag::TrieOrChunk => true,
110 Message::GetRequest { .. } => false,
111 Message::GetResponse { .. } => false,
112 Message::FinalitySignature(_) => false,
113 }
114 }
115
116 #[inline]
117 fn incoming_resource_estimate(&self, weights: &EstimatorWeights) -> u32 {
118 match self {
119 Message::Consensus(_) => weights.consensus,
120 Message::ConsensusRequest(_) => weights.consensus,
121 Message::BlockGossiper(_) => weights.block_gossip,
122 Message::TransactionGossiper(_) => weights.transaction_gossip,
123 Message::FinalitySignatureGossiper(_) => weights.finality_signature_gossip,
124 Message::AddressGossiper(_) => weights.address_gossip,
125 Message::GetRequest { tag, .. } => match tag {
126 Tag::Transaction => weights.transaction_requests,
127 Tag::LegacyDeploy => weights.legacy_deploy_requests,
128 Tag::Block => weights.block_requests,
129 Tag::BlockHeader => weights.block_header_requests,
130 Tag::TrieOrChunk => weights.trie_requests,
131 Tag::FinalitySignature => weights.finality_signature_requests,
132 Tag::SyncLeap => weights.sync_leap_requests,
133 Tag::ApprovalsHashes => weights.approvals_hashes_requests,
134 Tag::BlockExecutionResults => weights.execution_results_requests,
135 },
136 Message::GetResponse { tag, .. } => match tag {
137 Tag::Transaction => weights.transaction_responses,
138 Tag::LegacyDeploy => weights.legacy_deploy_responses,
139 Tag::Block => weights.block_responses,
140 Tag::BlockHeader => weights.block_header_responses,
141 Tag::TrieOrChunk => weights.trie_responses,
142 Tag::FinalitySignature => weights.finality_signature_responses,
143 Tag::SyncLeap => weights.sync_leap_responses,
144 Tag::ApprovalsHashes => weights.approvals_hashes_responses,
145 Tag::BlockExecutionResults => weights.execution_results_responses,
146 },
147 Message::FinalitySignature(_) => weights.finality_signature_broadcasts,
148 }
149 }
150
151 fn is_unsafe_for_syncing_peers(&self) -> bool {
152 match self {
153 Message::Consensus(_) => false,
154 Message::ConsensusRequest(_) => false,
155 Message::BlockGossiper(_) => false,
156 Message::TransactionGossiper(_) => false,
157 Message::FinalitySignatureGossiper(_) => false,
158 Message::AddressGossiper(_) => false,
159 Message::GetRequest { tag, .. } if *tag == Tag::TrieOrChunk => true,
161 Message::GetRequest { .. } => false,
162 Message::GetResponse { .. } => false,
163 Message::FinalitySignature(_) => false,
164 }
165 }
166}
167
168impl Message {
169 pub(crate) fn new_get_request<T: FetchItem>(id: &T::Id) -> Result<Self, bincode::Error> {
170 Ok(Message::GetRequest {
171 tag: T::TAG,
172 serialized_id: bincode::serialize(id)?,
173 })
174 }
175
176 pub(crate) fn new_get_response<T: FetchItem>(
177 item: &FetchResponse<T, T::Id>,
178 ) -> Result<Self, bincode::Error> {
179 Ok(Message::GetResponse {
180 tag: T::TAG,
181 serialized_item: item.to_serialized()?.into(),
182 })
183 }
184
185 pub(crate) fn new_get_response_from_serialized(tag: Tag, serialized_item: Arc<[u8]>) -> Self {
187 Message::GetResponse {
188 tag,
189 serialized_item,
190 }
191 }
192}
193
194impl Debug for Message {
195 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
196 match self {
197 Message::Consensus(c) => f.debug_tuple("Consensus").field(&c).finish(),
198 Message::ConsensusRequest(c) => f.debug_tuple("ConsensusRequest").field(&c).finish(),
199 Message::BlockGossiper(dg) => f.debug_tuple("BlockGossiper").field(&dg).finish(),
200 Message::TransactionGossiper(dg) => f.debug_tuple("DeployGossiper").field(&dg).finish(),
201 Message::FinalitySignatureGossiper(sig) => f
202 .debug_tuple("FinalitySignatureGossiper")
203 .field(&sig)
204 .finish(),
205 Message::AddressGossiper(ga) => f.debug_tuple("AddressGossiper").field(&ga).finish(),
206 Message::GetRequest { tag, serialized_id } => f
207 .debug_struct("GetRequest")
208 .field("tag", tag)
209 .field("serialized_id", &HexFmt(serialized_id))
210 .finish(),
211 Message::GetResponse {
212 tag,
213 serialized_item,
214 } => f
215 .debug_struct("GetResponse")
216 .field("tag", tag)
217 .field(
218 "serialized_item",
219 &format!("{} bytes", serialized_item.len()),
220 )
221 .finish(),
222 Message::FinalitySignature(fs) => {
223 f.debug_tuple("FinalitySignature").field(&fs).finish()
224 }
225 }
226 }
227}
228mod specimen_support {
229 use crate::utils::specimen::{
230 largest_get_request, largest_get_response, largest_variant, Cache, LargestSpecimen,
231 SizeEstimator,
232 };
233
234 use super::{Message, MessageDiscriminants};
235
236 impl LargestSpecimen for Message {
237 fn largest_specimen<E: SizeEstimator>(estimator: &E, cache: &mut Cache) -> Self {
238 largest_variant::<Self, MessageDiscriminants, _, _>(
239 estimator,
240 |variant| match variant {
241 MessageDiscriminants::Consensus => {
242 Message::Consensus(LargestSpecimen::largest_specimen(estimator, cache))
243 }
244 MessageDiscriminants::ConsensusRequest => Message::ConsensusRequest(
245 LargestSpecimen::largest_specimen(estimator, cache),
246 ),
247 MessageDiscriminants::BlockGossiper => {
248 Message::BlockGossiper(LargestSpecimen::largest_specimen(estimator, cache))
249 }
250 MessageDiscriminants::TransactionGossiper => Message::TransactionGossiper(
251 LargestSpecimen::largest_specimen(estimator, cache),
252 ),
253 MessageDiscriminants::FinalitySignatureGossiper => {
254 Message::FinalitySignatureGossiper(LargestSpecimen::largest_specimen(
255 estimator, cache,
256 ))
257 }
258 MessageDiscriminants::AddressGossiper => Message::AddressGossiper(
259 LargestSpecimen::largest_specimen(estimator, cache),
260 ),
261 MessageDiscriminants::GetRequest => largest_get_request(estimator, cache),
262 MessageDiscriminants::GetResponse => largest_get_response(estimator, cache),
263 MessageDiscriminants::FinalitySignature => Message::FinalitySignature(
264 LargestSpecimen::largest_specimen(estimator, cache),
265 ),
266 },
267 )
268 }
269 }
270}
271
272impl Display for Message {
273 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
274 match self {
275 Message::Consensus(consensus) => write!(f, "Consensus::{}", consensus),
276 Message::ConsensusRequest(consensus) => write!(f, "ConsensusRequest({})", consensus),
277 Message::BlockGossiper(deploy) => write!(f, "BlockGossiper::{}", deploy),
278 Message::TransactionGossiper(txn) => write!(f, "TransactionGossiper::{}", txn),
279 Message::FinalitySignatureGossiper(sig) => {
280 write!(f, "FinalitySignatureGossiper::{}", sig)
281 }
282 Message::AddressGossiper(gossiped_address) => {
283 write!(f, "AddressGossiper::({})", gossiped_address)
284 }
285 Message::GetRequest { tag, serialized_id } => {
286 write!(f, "GetRequest({}-{:10})", tag, HexFmt(serialized_id))
287 }
288 Message::GetResponse {
289 tag,
290 serialized_item,
291 } => write!(f, "GetResponse({}-{:10})", tag, HexFmt(serialized_item)),
292 Message::FinalitySignature(fs) => {
293 write!(f, "FinalitySignature::({})", fs)
294 }
295 }
296 }
297}
298
299impl<REv> FromIncoming<Message> for REv
300where
301 REv: From<ConsensusMessageIncoming>
302 + From<ConsensusDemand>
303 + From<GossiperIncoming<BlockV2>>
304 + From<GossiperIncoming<Transaction>>
305 + From<GossiperIncoming<FinalitySignatureV2>>
306 + From<GossiperIncoming<GossipedAddress>>
307 + From<NetRequestIncoming>
308 + From<NetResponseIncoming>
309 + From<TrieRequestIncoming>
310 + From<TrieDemand>
311 + From<TrieResponseIncoming>
312 + From<FinalitySignatureIncoming>,
313{
314 fn from_incoming(sender: NodeId, payload: Message) -> Self {
315 match payload {
316 Message::Consensus(message) => ConsensusMessageIncoming {
317 sender,
318 message: Box::new(message),
319 }
320 .into(),
321 Message::ConsensusRequest(_message) => {
322 unreachable!("called from_incoming with a consensus request")
324 }
325 Message::BlockGossiper(message) => GossiperIncoming {
326 sender,
327 message: Box::new(message),
328 }
329 .into(),
330 Message::TransactionGossiper(message) => GossiperIncoming {
331 sender,
332 message: Box::new(message),
333 }
334 .into(),
335 Message::FinalitySignatureGossiper(message) => GossiperIncoming {
336 sender,
337 message: Box::new(message),
338 }
339 .into(),
340 Message::AddressGossiper(message) => GossiperIncoming {
341 sender,
342 message: Box::new(message),
343 }
344 .into(),
345 Message::GetRequest { tag, serialized_id } => match tag {
346 Tag::Transaction => NetRequestIncoming {
347 sender,
348 message: Box::new(NetRequest::Transaction(serialized_id)),
349 }
350 .into(),
351 Tag::LegacyDeploy => NetRequestIncoming {
352 sender,
353 message: Box::new(NetRequest::LegacyDeploy(serialized_id)),
354 }
355 .into(),
356 Tag::Block => NetRequestIncoming {
357 sender,
358 message: Box::new(NetRequest::Block(serialized_id)),
359 }
360 .into(),
361 Tag::BlockHeader => NetRequestIncoming {
362 sender,
363 message: Box::new(NetRequest::BlockHeader(serialized_id)),
364 }
365 .into(),
366 Tag::TrieOrChunk => TrieRequestIncoming {
367 sender,
368 message: Box::new(TrieRequest(serialized_id)),
369 }
370 .into(),
371 Tag::FinalitySignature => NetRequestIncoming {
372 sender,
373 message: Box::new(NetRequest::FinalitySignature(serialized_id)),
374 }
375 .into(),
376 Tag::SyncLeap => NetRequestIncoming {
377 sender,
378 message: Box::new(NetRequest::SyncLeap(serialized_id)),
379 }
380 .into(),
381 Tag::ApprovalsHashes => NetRequestIncoming {
382 sender,
383 message: Box::new(NetRequest::ApprovalsHashes(serialized_id)),
384 }
385 .into(),
386 Tag::BlockExecutionResults => NetRequestIncoming {
387 sender,
388 message: Box::new(NetRequest::BlockExecutionResults(serialized_id)),
389 }
390 .into(),
391 },
392 Message::GetResponse {
393 tag,
394 serialized_item,
395 } => match tag {
396 Tag::Transaction => NetResponseIncoming {
397 sender,
398 message: Box::new(NetResponse::Transaction(serialized_item)),
399 }
400 .into(),
401 Tag::LegacyDeploy => NetResponseIncoming {
402 sender,
403 message: Box::new(NetResponse::LegacyDeploy(serialized_item)),
404 }
405 .into(),
406 Tag::Block => NetResponseIncoming {
407 sender,
408 message: Box::new(NetResponse::Block(serialized_item)),
409 }
410 .into(),
411 Tag::BlockHeader => NetResponseIncoming {
412 sender,
413 message: Box::new(NetResponse::BlockHeader(serialized_item)),
414 }
415 .into(),
416 Tag::TrieOrChunk => TrieResponseIncoming {
417 sender,
418 message: Box::new(TrieResponse(serialized_item.to_vec())),
419 }
420 .into(),
421 Tag::FinalitySignature => NetResponseIncoming {
422 sender,
423 message: Box::new(NetResponse::FinalitySignature(serialized_item)),
424 }
425 .into(),
426 Tag::SyncLeap => NetResponseIncoming {
427 sender,
428 message: Box::new(NetResponse::SyncLeap(serialized_item)),
429 }
430 .into(),
431 Tag::ApprovalsHashes => NetResponseIncoming {
432 sender,
433 message: Box::new(NetResponse::ApprovalsHashes(serialized_item)),
434 }
435 .into(),
436 Tag::BlockExecutionResults => NetResponseIncoming {
437 sender,
438 message: Box::new(NetResponse::BlockExecutionResults(serialized_item)),
439 }
440 .into(),
441 },
442 Message::FinalitySignature(message) => {
443 FinalitySignatureIncoming { sender, message }.into()
444 }
445 }
446 }
447
448 fn try_demand_from_incoming(
449 effect_builder: EffectBuilder<REv>,
450 sender: NodeId,
451 payload: Message,
452 ) -> Result<(Self, BoxFuture<'static, Option<Message>>), Message>
453 where
454 Self: Sized + Send,
455 {
456 match payload {
457 Message::GetRequest {
458 tag: Tag::TrieOrChunk,
459 serialized_id,
460 } => {
461 let (ev, fut) = effect_builder.create_request_parts(move |responder| TrieDemand {
462 sender,
463 request_msg: Box::new(TrieRequest(serialized_id)),
464 auto_closing_responder: AutoClosingResponder::from_opt_responder(responder),
465 });
466
467 Ok((ev, fut.boxed()))
468 }
469 Message::ConsensusRequest(request_msg) => {
470 let (ev, fut) =
471 effect_builder.create_request_parts(move |responder| ConsensusDemand {
472 sender,
473 request_msg: Box::new(request_msg),
474 auto_closing_responder: AutoClosingResponder::from_opt_responder(responder),
475 });
476
477 Ok((ev, fut.boxed()))
478 }
479 _ => Err(payload),
480 }
481 }
482}