kaspa_p2p_lib/
common.rs

1use crate::{convert::error::ConversionError, core::peer::PeerKey, KaspadMessagePayloadType};
2use kaspa_consensus_core::errors::{block::RuleError, consensus::ConsensusError, pruning::PruningImportError};
3use kaspa_mining_errors::manager::MiningManagerError;
4use std::time::Duration;
5use thiserror::Error;
6
/// Timeout applied to P2P communication when the caller does not supply one (two minutes).
pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(2 * 60);
9
10#[derive(Error, Debug, Clone)]
11pub enum ProtocolError {
12    #[error("timeout expired after {0:?}")]
13    Timeout(Duration),
14
15    #[error("P2P protocol version mismatch - local: {0}, remote: {1}")]
16    VersionMismatch(u32, u32),
17
18    #[error("Network mismatch - local: {0}, remote: {1}")]
19    WrongNetwork(String, String),
20
21    #[error("expected message type/s {0} but got {1:?}")]
22    UnexpectedMessage(&'static str, Option<KaspadMessagePayloadType>),
23
24    #[error("{0}")]
25    ConversionError(#[from] ConversionError),
26
27    #[error("{0}")]
28    RuleError(#[from] RuleError),
29
30    #[error("{0}")]
31    PruningImportError(#[from] PruningImportError),
32
33    #[error("{0}")]
34    ConsensusError(#[from] ConsensusError),
35
36    // TODO: discuss if such an error type makes sense here
37    #[error("{0}")]
38    MiningManagerError(#[from] MiningManagerError),
39
40    #[error("{0}")]
41    IdentityError(#[from] uuid::Error),
42
43    #[error("{0}")]
44    Other(&'static str),
45
46    #[error("{0}")]
47    OtherOwned(String),
48
49    #[error("misbehaving peer: {0}")]
50    MisbehavingPeer(String),
51
52    #[error("peer connection is closed")]
53    ConnectionClosed,
54
55    #[error("incoming route capacity for message type {0:?} has been reached (peer: {1})")]
56    IncomingRouteCapacityReached(KaspadMessagePayloadType, String),
57
58    #[error("outgoing route capacity has been reached (peer: {0})")]
59    OutgoingRouteCapacityReached(String),
60
61    #[error("no flow has been registered for message type {0:?}")]
62    NoRouteForMessageType(KaspadMessagePayloadType),
63
64    #[error("peer {0} already exists")]
65    PeerAlreadyExists(PeerKey),
66
67    #[error("loopback connection - node is connecting to itself")]
68    LoopbackConnection(PeerKey),
69
70    #[error("got reject message: {0}")]
71    Rejected(String),
72
73    #[error("got reject message: {0}")]
74    IgnorableReject(String),
75}
76
77/// String used as a P2P convention to signal connection is rejected because we are connecting to ourselves
78const LOOPBACK_CONNECTION_MESSAGE: &str = "LOOPBACK_CONNECTION";
79
80/// String used as a P2P convention to signal connection is rejected because the peer already exists
81const DUPLICATE_CONNECTION_MESSAGE: &str = "DUPLICATE_CONNECTION";
82
83impl ProtocolError {
84    pub fn is_connection_closed_error(&self) -> bool {
85        matches!(self, Self::ConnectionClosed)
86    }
87
88    pub fn can_send_outgoing_message(&self) -> bool {
89        !matches!(self, Self::ConnectionClosed | Self::OutgoingRouteCapacityReached(_))
90    }
91
92    pub fn to_reject_message(&self) -> String {
93        match self {
94            Self::LoopbackConnection(_) => LOOPBACK_CONNECTION_MESSAGE.to_owned(),
95            Self::PeerAlreadyExists(_) => DUPLICATE_CONNECTION_MESSAGE.to_owned(),
96            err => err.to_string(),
97        }
98    }
99
100    pub fn from_reject_message(reason: String) -> Self {
101        if reason == LOOPBACK_CONNECTION_MESSAGE || reason == DUPLICATE_CONNECTION_MESSAGE {
102            ProtocolError::IgnorableReject(reason)
103        } else if reason.contains("cannot find full block") {
104            let hint = "Hint: If this error persists, it might be due to the other peer having pruned block data after syncing headers and UTXOs. In such a case, you may need to reset the database.";
105            let detailed_reason = format!("{}. {}", reason, hint);
106            ProtocolError::Rejected(detailed_reason)
107        } else {
108            ProtocolError::Rejected(reason)
109        }
110    }
111}
112
/// Builds a `KaspadMessage` whose payload is the given inner message wrapped in the given
/// payload variant.
///
/// The two-argument form sets both `response_id` and `request_id` to `BLANK_ROUTE_ID`; the
/// four-argument form takes them from the caller (response id first, then request id).
/// Usage:
/// ```ignore
/// let msg = make_message!(Payload::Verack, verack_msg);
/// let msg = make_message!(Payload::Verack, verack_msg, response_id, request_id);
/// ```
#[macro_export]
macro_rules! make_message {
    ($variant:path, $inner:expr) => {
        $crate::make_message!($variant, $inner, $crate::BLANK_ROUTE_ID, $crate::BLANK_ROUTE_ID)
    };

    ($variant:path, $inner:expr, $response_id:expr, $request_id:expr) => {{
        $crate::pb::KaspadMessage {
            payload: Some($variant($inner)),
            response_id: $response_id,
            request_id: $request_id,
        }
    }};
}
132
/// Builds a `KaspadMessage` carrying the given payload and `response_id`, leaving `request_id`
/// blank.
///
/// The blank id is taken from `BLANK_ROUTE_ID`, the same constant `make_message!` uses, instead
/// of a literal. NOTE(review): this assumes `BLANK_ROUTE_ID` equals the previous literal `0` —
/// confirm against its definition.
/// Usage:
/// ```ignore
/// let msg = make_response!(Payload::Verack, verack_msg, response_id)
/// ```
#[macro_export]
macro_rules! make_response {
    ($pattern:path, $msg:expr, $response_id:expr) => {{
        $crate::pb::KaspadMessage { payload: Some($pattern($msg)), response_id: $response_id, request_id: $crate::BLANK_ROUTE_ID }
    }};
}
139
/// Builds a `KaspadMessage` carrying the given payload and `request_id`, leaving `response_id`
/// blank.
///
/// The blank id is taken from `BLANK_ROUTE_ID`, the same constant `make_message!` uses, instead
/// of a literal. NOTE(review): this assumes `BLANK_ROUTE_ID` equals the previous literal `0` —
/// confirm against its definition.
/// Usage:
/// ```ignore
/// let msg = make_request!(Payload::Verack, verack_msg, request_id)
/// ```
#[macro_export]
macro_rules! make_request {
    ($pattern:path, $msg:expr, $request_id:expr) => {{
        $crate::pb::KaspadMessage { payload: Some($pattern($msg)), response_id: $crate::BLANK_ROUTE_ID, request_id: $request_id }
    }};
}
146
/// Extracts the inner message of one specific payload variant from an `Option<pb::KaspadMessage>`.
///
/// Evaluates to `Ok(inner)` when the message carries the expected variant, to
/// `Err(ProtocolError::UnexpectedMessage)` when it carries another payload or none at all, and to
/// `Err(ProtocolError::ConnectionClosed)` when the option itself is `None`.
/// Usage:
/// ```ignore
/// let res = unwrap_message!(op, Payload::Verack)
/// ```
#[macro_export]
macro_rules! unwrap_message {
    ($op:expr, $pattern:path) => {{
        match $op {
            None => Err($crate::common::ProtocolError::ConnectionClosed),
            Some(msg) => match msg.payload {
                Some($pattern(inner_msg)) => Ok(inner_msg),
                // Borrow the mismatching payload only to report which type actually arrived.
                ref unexpected => Err($crate::common::ProtocolError::UnexpectedMessage(
                    stringify!($pattern),
                    unexpected.as_ref().map(|payload| payload.into()),
                )),
            },
        }
    }};
}
166
/// Like `unwrap_message!`, but on success also yields the message's `request_id`.
///
/// Evaluates to `Ok((inner, request_id))` when the message carries the expected variant, to
/// `Err(ProtocolError::UnexpectedMessage)` when it carries another payload or none at all, and to
/// `Err(ProtocolError::ConnectionClosed)` when the option itself is `None`.
/// Usage:
/// ```ignore
/// let res = unwrap_message_with_request_id!(op, Payload::Verack)
/// ```
#[macro_export]
macro_rules! unwrap_message_with_request_id {
    ($op:expr, $pattern:path) => {{
        match $op {
            None => Err($crate::common::ProtocolError::ConnectionClosed),
            Some(msg) => match msg.payload {
                Some($pattern(inner_msg)) => Ok((inner_msg, msg.request_id)),
                // Borrow the mismatching payload only to report which type actually arrived.
                ref unexpected => Err($crate::common::ProtocolError::UnexpectedMessage(
                    stringify!($pattern),
                    unexpected.as_ref().map(|payload| payload.into()),
                )),
            },
        }
    }};
}
181
/// Macro to await a channel `Receiver<pb::KaspadMessage>::recv` call with a default/specified timeout and expect a specific payload type.
///
/// Evaluates to the result of `unwrap_message!` when the receive completes in time, and to
/// `Err(ProtocolError::Timeout(duration))` otherwise. The timeout expression is evaluated exactly
/// once. The expansion names `tokio::time::timeout` directly and uses `.await`, so it must be
/// invoked from an async context in which `tokio` resolves.
/// Usage:
/// ```ignore
/// let res = dequeue_with_timeout!(receiver, Payload::Verack) // Uses the default timeout
/// // or:
/// let res = dequeue_with_timeout!(receiver, Payload::Verack, Duration::from_secs(30))
/// ```
#[macro_export]
macro_rules! dequeue_with_timeout {
    ($receiver:expr, $pattern:path) => {
        $crate::dequeue_with_timeout!($receiver, $pattern, $crate::common::DEFAULT_TIMEOUT)
    };
    ($receiver:expr, $pattern:path, $timeout_duration:expr) => {{
        // Bind the caller's expression once: it is needed both for the wait itself and for the
        // error value, and expanding it twice would evaluate it twice on the timeout path.
        let timeout_duration: ::std::time::Duration = $timeout_duration;
        match tokio::time::timeout(timeout_duration, $receiver.recv()).await {
            Ok(op) => $crate::unwrap_message!(op, $pattern),
            Err(_) => Err($crate::common::ProtocolError::Timeout(timeout_duration)),
        }
    }};
}
208
/// Awaits `recv()` on a channel `Receiver<pb::KaspadMessage>` with no timeout and expects a
/// specific payload type.
///
/// The received value is handed to `unwrap_message!`, whose result is the value of this macro.
/// Usage:
/// ```ignore
/// let res = dequeue!(receiver, Payload::Verack)
/// ```
#[macro_export]
macro_rules! dequeue {
    ($receiver:expr, $pattern:path) => {{
        let received = $receiver.recv().await;
        $crate::unwrap_message!(received, $pattern)
    }};
}
220
/// Awaits `recv()` on a channel receiver with no timeout and expects a specific payload type,
/// yielding the message's request id alongside the inner message.
///
/// The received value is handed to `unwrap_message_with_request_id!`, whose result is the value
/// of this macro.
/// Usage:
/// ```ignore
/// let res = dequeue_with_request_id!(receiver, Payload::Verack)
/// ```
#[macro_export]
macro_rules! dequeue_with_request_id {
    ($receiver:expr, $pattern:path) => {{
        let received = $receiver.recv().await;
        $crate::unwrap_message_with_request_id!(received, $pattern)
    }};
}