casper_node/
protocol.rs

1//! A network message type used for communication between nodes
2
3use std::{
4    fmt::{self, Display, Formatter},
5    sync::Arc,
6};
7
8use derive_more::From;
9use fmt::Debug;
10use futures::{future::BoxFuture, FutureExt};
11use hex_fmt::HexFmt;
12use serde::{Deserialize, Serialize};
13use strum::EnumDiscriminants;
14
15use casper_types::{BlockV2, FinalitySignatureV2, Transaction};
16
17use crate::{
18    components::{
19        consensus,
20        fetcher::{FetchItem, FetchResponse, Tag},
21        gossiper,
22        network::{EstimatorWeights, FromIncoming, GossipedAddress, MessageKind, Payload},
23    },
24    effect::{
25        incoming::{
26            ConsensusDemand, ConsensusMessageIncoming, FinalitySignatureIncoming, GossiperIncoming,
27            NetRequest, NetRequestIncoming, NetResponse, NetResponseIncoming, TrieDemand,
28            TrieRequest, TrieRequestIncoming, TrieResponse, TrieResponseIncoming,
29        },
30        AutoClosingResponder, EffectBuilder,
31    },
32    types::NodeId,
33};
34
35/// Reactor message.
36#[derive(Clone, From, Serialize, Deserialize, EnumDiscriminants)]
37#[strum_discriminants(derive(strum::EnumIter))]
38pub(crate) enum Message {
39    /// Consensus component message.
40    #[from]
41    Consensus(consensus::ConsensusMessage),
42    /// Consensus component demand.
43    #[from]
44    ConsensusRequest(consensus::ConsensusRequestMessage),
45    /// Block gossiper component message.
46    #[from]
47    BlockGossiper(gossiper::Message<BlockV2>),
48    /// Deploy gossiper component message.
49    #[from]
50    TransactionGossiper(gossiper::Message<Transaction>),
51    #[from]
52    FinalitySignatureGossiper(gossiper::Message<FinalitySignatureV2>),
53    /// Address gossiper component message.
54    #[from]
55    AddressGossiper(gossiper::Message<GossipedAddress>),
56    /// Request to get an item from a peer.
57    GetRequest {
58        /// The type tag of the requested item.
59        tag: Tag,
60        /// The serialized ID of the requested item.
61        serialized_id: Vec<u8>,
62    },
63    /// Response to a `GetRequest`.
64    GetResponse {
65        /// The type tag of the contained item.
66        tag: Tag,
67        /// The serialized item.
68        serialized_item: Arc<[u8]>,
69    },
70    /// Finality signature.
71    #[from]
72    FinalitySignature(Box<FinalitySignatureV2>),
73}
74
75impl Payload for Message {
76    #[inline]
77    fn message_kind(&self) -> MessageKind {
78        match self {
79            Message::Consensus(_) => MessageKind::Consensus,
80            Message::ConsensusRequest(_) => MessageKind::Consensus,
81            Message::BlockGossiper(_) => MessageKind::BlockGossip,
82            Message::TransactionGossiper(_) => MessageKind::TransactionGossip,
83            Message::AddressGossiper(_) => MessageKind::AddressGossip,
84            Message::GetRequest { tag, .. } | Message::GetResponse { tag, .. } => match tag {
85                Tag::Transaction | Tag::LegacyDeploy => MessageKind::TransactionTransfer,
86                Tag::Block => MessageKind::BlockTransfer,
87                Tag::BlockHeader => MessageKind::BlockTransfer,
88                Tag::TrieOrChunk => MessageKind::TrieTransfer,
89                Tag::FinalitySignature => MessageKind::Other,
90                Tag::SyncLeap => MessageKind::BlockTransfer,
91                Tag::ApprovalsHashes => MessageKind::BlockTransfer,
92                Tag::BlockExecutionResults => MessageKind::BlockTransfer,
93            },
94            Message::FinalitySignature(_) => MessageKind::Consensus,
95            Message::FinalitySignatureGossiper(_) => MessageKind::FinalitySignatureGossip,
96        }
97    }
98
99    fn is_low_priority(&self) -> bool {
100        // We only deprioritize requested trie nodes, as they are the most commonly requested item
101        // during fast sync.
102        match self {
103            Message::Consensus(_) => false,
104            Message::ConsensusRequest(_) => false,
105            Message::TransactionGossiper(_) => false,
106            Message::BlockGossiper(_) => false,
107            Message::FinalitySignatureGossiper(_) => false,
108            Message::AddressGossiper(_) => false,
109            Message::GetRequest { tag, .. } if *tag == Tag::TrieOrChunk => true,
110            Message::GetRequest { .. } => false,
111            Message::GetResponse { .. } => false,
112            Message::FinalitySignature(_) => false,
113        }
114    }
115
116    #[inline]
117    fn incoming_resource_estimate(&self, weights: &EstimatorWeights) -> u32 {
118        match self {
119            Message::Consensus(_) => weights.consensus,
120            Message::ConsensusRequest(_) => weights.consensus,
121            Message::BlockGossiper(_) => weights.block_gossip,
122            Message::TransactionGossiper(_) => weights.transaction_gossip,
123            Message::FinalitySignatureGossiper(_) => weights.finality_signature_gossip,
124            Message::AddressGossiper(_) => weights.address_gossip,
125            Message::GetRequest { tag, .. } => match tag {
126                Tag::Transaction => weights.transaction_requests,
127                Tag::LegacyDeploy => weights.legacy_deploy_requests,
128                Tag::Block => weights.block_requests,
129                Tag::BlockHeader => weights.block_header_requests,
130                Tag::TrieOrChunk => weights.trie_requests,
131                Tag::FinalitySignature => weights.finality_signature_requests,
132                Tag::SyncLeap => weights.sync_leap_requests,
133                Tag::ApprovalsHashes => weights.approvals_hashes_requests,
134                Tag::BlockExecutionResults => weights.execution_results_requests,
135            },
136            Message::GetResponse { tag, .. } => match tag {
137                Tag::Transaction => weights.transaction_responses,
138                Tag::LegacyDeploy => weights.legacy_deploy_responses,
139                Tag::Block => weights.block_responses,
140                Tag::BlockHeader => weights.block_header_responses,
141                Tag::TrieOrChunk => weights.trie_responses,
142                Tag::FinalitySignature => weights.finality_signature_responses,
143                Tag::SyncLeap => weights.sync_leap_responses,
144                Tag::ApprovalsHashes => weights.approvals_hashes_responses,
145                Tag::BlockExecutionResults => weights.execution_results_responses,
146            },
147            Message::FinalitySignature(_) => weights.finality_signature_broadcasts,
148        }
149    }
150
151    fn is_unsafe_for_syncing_peers(&self) -> bool {
152        match self {
153            Message::Consensus(_) => false,
154            Message::ConsensusRequest(_) => false,
155            Message::BlockGossiper(_) => false,
156            Message::TransactionGossiper(_) => false,
157            Message::FinalitySignatureGossiper(_) => false,
158            Message::AddressGossiper(_) => false,
159            // Trie requests can deadlock between syncing nodes.
160            Message::GetRequest { tag, .. } if *tag == Tag::TrieOrChunk => true,
161            Message::GetRequest { .. } => false,
162            Message::GetResponse { .. } => false,
163            Message::FinalitySignature(_) => false,
164        }
165    }
166}
167
168impl Message {
169    pub(crate) fn new_get_request<T: FetchItem>(id: &T::Id) -> Result<Self, bincode::Error> {
170        Ok(Message::GetRequest {
171            tag: T::TAG,
172            serialized_id: bincode::serialize(id)?,
173        })
174    }
175
176    pub(crate) fn new_get_response<T: FetchItem>(
177        item: &FetchResponse<T, T::Id>,
178    ) -> Result<Self, bincode::Error> {
179        Ok(Message::GetResponse {
180            tag: T::TAG,
181            serialized_item: item.to_serialized()?.into(),
182        })
183    }
184
185    /// Creates a new get response from already serialized data.
186    pub(crate) fn new_get_response_from_serialized(tag: Tag, serialized_item: Arc<[u8]>) -> Self {
187        Message::GetResponse {
188            tag,
189            serialized_item,
190        }
191    }
192}
193
194impl Debug for Message {
195    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
196        match self {
197            Message::Consensus(c) => f.debug_tuple("Consensus").field(&c).finish(),
198            Message::ConsensusRequest(c) => f.debug_tuple("ConsensusRequest").field(&c).finish(),
199            Message::BlockGossiper(dg) => f.debug_tuple("BlockGossiper").field(&dg).finish(),
200            Message::TransactionGossiper(dg) => f.debug_tuple("DeployGossiper").field(&dg).finish(),
201            Message::FinalitySignatureGossiper(sig) => f
202                .debug_tuple("FinalitySignatureGossiper")
203                .field(&sig)
204                .finish(),
205            Message::AddressGossiper(ga) => f.debug_tuple("AddressGossiper").field(&ga).finish(),
206            Message::GetRequest { tag, serialized_id } => f
207                .debug_struct("GetRequest")
208                .field("tag", tag)
209                .field("serialized_id", &HexFmt(serialized_id))
210                .finish(),
211            Message::GetResponse {
212                tag,
213                serialized_item,
214            } => f
215                .debug_struct("GetResponse")
216                .field("tag", tag)
217                .field(
218                    "serialized_item",
219                    &format!("{} bytes", serialized_item.len()),
220                )
221                .finish(),
222            Message::FinalitySignature(fs) => {
223                f.debug_tuple("FinalitySignature").field(&fs).finish()
224            }
225        }
226    }
227}
228mod specimen_support {
229    use crate::utils::specimen::{
230        largest_get_request, largest_get_response, largest_variant, Cache, LargestSpecimen,
231        SizeEstimator,
232    };
233
234    use super::{Message, MessageDiscriminants};
235
236    impl LargestSpecimen for Message {
237        fn largest_specimen<E: SizeEstimator>(estimator: &E, cache: &mut Cache) -> Self {
238            largest_variant::<Self, MessageDiscriminants, _, _>(
239                estimator,
240                |variant| match variant {
241                    MessageDiscriminants::Consensus => {
242                        Message::Consensus(LargestSpecimen::largest_specimen(estimator, cache))
243                    }
244                    MessageDiscriminants::ConsensusRequest => Message::ConsensusRequest(
245                        LargestSpecimen::largest_specimen(estimator, cache),
246                    ),
247                    MessageDiscriminants::BlockGossiper => {
248                        Message::BlockGossiper(LargestSpecimen::largest_specimen(estimator, cache))
249                    }
250                    MessageDiscriminants::TransactionGossiper => Message::TransactionGossiper(
251                        LargestSpecimen::largest_specimen(estimator, cache),
252                    ),
253                    MessageDiscriminants::FinalitySignatureGossiper => {
254                        Message::FinalitySignatureGossiper(LargestSpecimen::largest_specimen(
255                            estimator, cache,
256                        ))
257                    }
258                    MessageDiscriminants::AddressGossiper => Message::AddressGossiper(
259                        LargestSpecimen::largest_specimen(estimator, cache),
260                    ),
261                    MessageDiscriminants::GetRequest => largest_get_request(estimator, cache),
262                    MessageDiscriminants::GetResponse => largest_get_response(estimator, cache),
263                    MessageDiscriminants::FinalitySignature => Message::FinalitySignature(
264                        LargestSpecimen::largest_specimen(estimator, cache),
265                    ),
266                },
267            )
268        }
269    }
270}
271
272impl Display for Message {
273    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
274        match self {
275            Message::Consensus(consensus) => write!(f, "Consensus::{}", consensus),
276            Message::ConsensusRequest(consensus) => write!(f, "ConsensusRequest({})", consensus),
277            Message::BlockGossiper(deploy) => write!(f, "BlockGossiper::{}", deploy),
278            Message::TransactionGossiper(txn) => write!(f, "TransactionGossiper::{}", txn),
279            Message::FinalitySignatureGossiper(sig) => {
280                write!(f, "FinalitySignatureGossiper::{}", sig)
281            }
282            Message::AddressGossiper(gossiped_address) => {
283                write!(f, "AddressGossiper::({})", gossiped_address)
284            }
285            Message::GetRequest { tag, serialized_id } => {
286                write!(f, "GetRequest({}-{:10})", tag, HexFmt(serialized_id))
287            }
288            Message::GetResponse {
289                tag,
290                serialized_item,
291            } => write!(f, "GetResponse({}-{:10})", tag, HexFmt(serialized_item)),
292            Message::FinalitySignature(fs) => {
293                write!(f, "FinalitySignature::({})", fs)
294            }
295        }
296    }
297}
298
299impl<REv> FromIncoming<Message> for REv
300where
301    REv: From<ConsensusMessageIncoming>
302        + From<ConsensusDemand>
303        + From<GossiperIncoming<BlockV2>>
304        + From<GossiperIncoming<Transaction>>
305        + From<GossiperIncoming<FinalitySignatureV2>>
306        + From<GossiperIncoming<GossipedAddress>>
307        + From<NetRequestIncoming>
308        + From<NetResponseIncoming>
309        + From<TrieRequestIncoming>
310        + From<TrieDemand>
311        + From<TrieResponseIncoming>
312        + From<FinalitySignatureIncoming>,
313{
314    fn from_incoming(sender: NodeId, payload: Message) -> Self {
315        match payload {
316            Message::Consensus(message) => ConsensusMessageIncoming {
317                sender,
318                message: Box::new(message),
319            }
320            .into(),
321            Message::ConsensusRequest(_message) => {
322                // TODO: Remove this once from_incoming and try_demand_from_incoming are unified.
323                unreachable!("called from_incoming with a consensus request")
324            }
325            Message::BlockGossiper(message) => GossiperIncoming {
326                sender,
327                message: Box::new(message),
328            }
329            .into(),
330            Message::TransactionGossiper(message) => GossiperIncoming {
331                sender,
332                message: Box::new(message),
333            }
334            .into(),
335            Message::FinalitySignatureGossiper(message) => GossiperIncoming {
336                sender,
337                message: Box::new(message),
338            }
339            .into(),
340            Message::AddressGossiper(message) => GossiperIncoming {
341                sender,
342                message: Box::new(message),
343            }
344            .into(),
345            Message::GetRequest { tag, serialized_id } => match tag {
346                Tag::Transaction => NetRequestIncoming {
347                    sender,
348                    message: Box::new(NetRequest::Transaction(serialized_id)),
349                }
350                .into(),
351                Tag::LegacyDeploy => NetRequestIncoming {
352                    sender,
353                    message: Box::new(NetRequest::LegacyDeploy(serialized_id)),
354                }
355                .into(),
356                Tag::Block => NetRequestIncoming {
357                    sender,
358                    message: Box::new(NetRequest::Block(serialized_id)),
359                }
360                .into(),
361                Tag::BlockHeader => NetRequestIncoming {
362                    sender,
363                    message: Box::new(NetRequest::BlockHeader(serialized_id)),
364                }
365                .into(),
366                Tag::TrieOrChunk => TrieRequestIncoming {
367                    sender,
368                    message: Box::new(TrieRequest(serialized_id)),
369                }
370                .into(),
371                Tag::FinalitySignature => NetRequestIncoming {
372                    sender,
373                    message: Box::new(NetRequest::FinalitySignature(serialized_id)),
374                }
375                .into(),
376                Tag::SyncLeap => NetRequestIncoming {
377                    sender,
378                    message: Box::new(NetRequest::SyncLeap(serialized_id)),
379                }
380                .into(),
381                Tag::ApprovalsHashes => NetRequestIncoming {
382                    sender,
383                    message: Box::new(NetRequest::ApprovalsHashes(serialized_id)),
384                }
385                .into(),
386                Tag::BlockExecutionResults => NetRequestIncoming {
387                    sender,
388                    message: Box::new(NetRequest::BlockExecutionResults(serialized_id)),
389                }
390                .into(),
391            },
392            Message::GetResponse {
393                tag,
394                serialized_item,
395            } => match tag {
396                Tag::Transaction => NetResponseIncoming {
397                    sender,
398                    message: Box::new(NetResponse::Transaction(serialized_item)),
399                }
400                .into(),
401                Tag::LegacyDeploy => NetResponseIncoming {
402                    sender,
403                    message: Box::new(NetResponse::LegacyDeploy(serialized_item)),
404                }
405                .into(),
406                Tag::Block => NetResponseIncoming {
407                    sender,
408                    message: Box::new(NetResponse::Block(serialized_item)),
409                }
410                .into(),
411                Tag::BlockHeader => NetResponseIncoming {
412                    sender,
413                    message: Box::new(NetResponse::BlockHeader(serialized_item)),
414                }
415                .into(),
416                Tag::TrieOrChunk => TrieResponseIncoming {
417                    sender,
418                    message: Box::new(TrieResponse(serialized_item.to_vec())),
419                }
420                .into(),
421                Tag::FinalitySignature => NetResponseIncoming {
422                    sender,
423                    message: Box::new(NetResponse::FinalitySignature(serialized_item)),
424                }
425                .into(),
426                Tag::SyncLeap => NetResponseIncoming {
427                    sender,
428                    message: Box::new(NetResponse::SyncLeap(serialized_item)),
429                }
430                .into(),
431                Tag::ApprovalsHashes => NetResponseIncoming {
432                    sender,
433                    message: Box::new(NetResponse::ApprovalsHashes(serialized_item)),
434                }
435                .into(),
436                Tag::BlockExecutionResults => NetResponseIncoming {
437                    sender,
438                    message: Box::new(NetResponse::BlockExecutionResults(serialized_item)),
439                }
440                .into(),
441            },
442            Message::FinalitySignature(message) => {
443                FinalitySignatureIncoming { sender, message }.into()
444            }
445        }
446    }
447
448    fn try_demand_from_incoming(
449        effect_builder: EffectBuilder<REv>,
450        sender: NodeId,
451        payload: Message,
452    ) -> Result<(Self, BoxFuture<'static, Option<Message>>), Message>
453    where
454        Self: Sized + Send,
455    {
456        match payload {
457            Message::GetRequest {
458                tag: Tag::TrieOrChunk,
459                serialized_id,
460            } => {
461                let (ev, fut) = effect_builder.create_request_parts(move |responder| TrieDemand {
462                    sender,
463                    request_msg: Box::new(TrieRequest(serialized_id)),
464                    auto_closing_responder: AutoClosingResponder::from_opt_responder(responder),
465                });
466
467                Ok((ev, fut.boxed()))
468            }
469            Message::ConsensusRequest(request_msg) => {
470                let (ev, fut) =
471                    effect_builder.create_request_parts(move |responder| ConsensusDemand {
472                        sender,
473                        request_msg: Box::new(request_msg),
474                        auto_closing_responder: AutoClosingResponder::from_opt_responder(responder),
475                    });
476
477                Ok((ev, fut.boxed()))
478            }
479            _ => Err(payload),
480        }
481    }
482}