Skip to main content

fuel_core_p2p/request_response/
messages.rs

1use crate::service::TaskError;
2use fuel_core_types::{
3    blockchain::SealedBlockHeader,
4    fuel_tx::TxId,
5    services::p2p::{
6        NetworkableTransactionPool,
7        Transactions,
8    },
9};
10use libp2p::{
11    PeerId,
12    request_response::OutboundFailure,
13};
14use serde::{
15    Deserialize,
16    Serialize,
17};
18use std::ops::Range;
19use thiserror::Error;
20use tokio::sync::oneshot;
21
22#[derive(Serialize, Deserialize, Eq, PartialEq, Debug, Clone)]
23pub enum RequestMessage {
24    SealedHeaders(Range<u32>),
25    Transactions(Range<u32>),
26    TxPoolAllTransactionsIds,
27    TxPoolFullTransactions(Vec<TxId>),
28}
29
30#[derive(Error, Debug, Clone, Serialize, Deserialize)]
31pub enum ResponseMessageErrorCode {
32    /// The peer sent an empty response using protocol `/fuel/req_res/0.0.1`
33    #[error("Empty response sent by peer using legacy protocol /fuel/req_res/0.0.1")]
34    ProtocolV1EmptyResponse = 0,
35    #[error("The requested range is too large")]
36    RequestedRangeTooLarge = 1,
37    #[error("Timeout while processing request")]
38    Timeout = 2,
39    #[error("Sync processor is out of capacity")]
40    SyncProcessorOutOfCapacity = 3,
41    #[error("The peer sent an unknown error code")]
42    #[serde(skip_serializing, other)]
43    Unknown,
44}
45
46#[derive(Debug, Clone, Serialize, Deserialize)]
47pub enum V1ResponseMessage {
48    SealedHeaders(Option<Vec<SealedBlockHeader>>),
49    Transactions(Option<Vec<Transactions>>),
50    TxPoolAllTransactionsIds(Option<Vec<TxId>>),
51    TxPoolFullTransactions(Option<Vec<Option<NetworkableTransactionPool>>>),
52}
53
54#[derive(Debug, Clone, Serialize, Deserialize)]
55pub enum V2ResponseMessage {
56    SealedHeaders(Result<Vec<SealedBlockHeader>, ResponseMessageErrorCode>),
57    Transactions(Result<Vec<Transactions>, ResponseMessageErrorCode>),
58    TxPoolAllTransactionsIds(Result<Vec<TxId>, ResponseMessageErrorCode>),
59    TxPoolFullTransactions(
60        Result<Vec<Option<NetworkableTransactionPool>>, ResponseMessageErrorCode>,
61    ),
62}
63
64impl From<V1ResponseMessage> for V2ResponseMessage {
65    fn from(v1_response: V1ResponseMessage) -> Self {
66        match v1_response {
67            V1ResponseMessage::SealedHeaders(sealed_headers) => {
68                V2ResponseMessage::SealedHeaders(
69                    sealed_headers
70                        .ok_or(ResponseMessageErrorCode::ProtocolV1EmptyResponse),
71                )
72            }
73            V1ResponseMessage::Transactions(vec) => V2ResponseMessage::Transactions(
74                vec.ok_or(ResponseMessageErrorCode::ProtocolV1EmptyResponse),
75            ),
76            V1ResponseMessage::TxPoolAllTransactionsIds(vec) => {
77                V2ResponseMessage::TxPoolAllTransactionsIds(
78                    vec.ok_or(ResponseMessageErrorCode::ProtocolV1EmptyResponse),
79                )
80            }
81            V1ResponseMessage::TxPoolFullTransactions(vec) => {
82                V2ResponseMessage::TxPoolFullTransactions(
83                    vec.ok_or(ResponseMessageErrorCode::ProtocolV1EmptyResponse),
84                )
85            }
86        }
87    }
88}
89
90impl From<V2ResponseMessage> for V1ResponseMessage {
91    fn from(response: V2ResponseMessage) -> Self {
92        match response {
93            V2ResponseMessage::SealedHeaders(sealed_headers) => {
94                V1ResponseMessage::SealedHeaders(sealed_headers.ok())
95            }
96            V2ResponseMessage::Transactions(transactions) => {
97                V1ResponseMessage::Transactions(transactions.ok())
98            }
99            V2ResponseMessage::TxPoolAllTransactionsIds(tx_ids) => {
100                V1ResponseMessage::TxPoolAllTransactionsIds(tx_ids.ok())
101            }
102            V2ResponseMessage::TxPoolFullTransactions(tx_pool) => {
103                V1ResponseMessage::TxPoolFullTransactions(tx_pool.ok())
104            }
105        }
106    }
107}
108
109pub type OnResponse<T> = oneshot::Sender<(PeerId, Result<T, ResponseError>)>;
110// This type is more complex because it's used in tasks that need to select a peer to send the request and this
111// can cause errors where the peer is not defined.
112pub type OnResponseWithPeerSelection<T> =
113    oneshot::Sender<Result<(PeerId, Result<T, ResponseError>), TaskError>>;
114
115#[derive(Debug)]
116pub enum ResponseSender {
117    SealedHeaders(
118        OnResponseWithPeerSelection<
119            Result<Vec<SealedBlockHeader>, ResponseMessageErrorCode>,
120        >,
121    ),
122    Transactions(
123        OnResponseWithPeerSelection<Result<Vec<Transactions>, ResponseMessageErrorCode>>,
124    ),
125    TransactionsFromPeer(OnResponse<Result<Vec<Transactions>, ResponseMessageErrorCode>>),
126
127    TxPoolAllTransactionsIds(OnResponse<Result<Vec<TxId>, ResponseMessageErrorCode>>),
128    TxPoolFullTransactions(
129        OnResponse<
130            Result<Vec<Option<NetworkableTransactionPool>>, ResponseMessageErrorCode>,
131        >,
132    ),
133}
134
135#[derive(Debug, Error)]
136pub enum RequestError {
137    #[error("Not currently connected to any peers")]
138    NoPeersConnected,
139}
140
141#[derive(Debug, Error)]
142pub enum ResponseError {
143    /// This is the raw error from `libp2p-request-response`.
144    #[error("P2P outbound error {0}")]
145    P2P(OutboundFailure),
146    /// The peer responded with an invalid response type
147    #[error("Peer response message was of incorrect type")]
148    TypeMismatch,
149}
150
151/// Errors than can occur when attempting to send a response
152#[derive(Debug, Eq, PartialEq, Error)]
153pub enum ResponseSendError {
154    #[error("Response channel does not exist")]
155    ResponseChannelDoesNotExist,
156    #[error("Failed to send response")]
157    SendingResponseFailed,
158    #[error("Failed to convert response to intermediate format")]
159    ConversionToIntermediateFailed,
160}
161
/// Serialization tests for `ResponseMessageErrorCode`, using the postcard
/// encoding.
#[cfg(test)]
// Test names separate the subject from the scenario with a double
// underscore, which the `non_snake_case` lint would otherwise reject.
#[allow(non_snake_case)]
mod tests {
    use super::ResponseMessageErrorCode;

    #[test]
    fn response_message_error_code__unknown_error_cannot_be_serialized() {
        // `Unknown` is marked `skip_serializing`, so encoding it must fail.
        let error = ResponseMessageErrorCode::Unknown;
        let serialized = postcard::to_allocvec(&error);
        assert!(serialized.is_err());
    }

    #[test]
    fn response_message_error_code__known_error_code_is_deserialized_to_variant() {
        // Round-trip a known code and check it comes back as the same variant.
        let serialized_error_code =
            postcard::to_stdvec(&ResponseMessageErrorCode::ProtocolV1EmptyResponse)
                .unwrap();
        let response_message_error_code: ResponseMessageErrorCode =
            postcard::from_bytes(&serialized_error_code).unwrap();
        assert!(matches!(
            response_message_error_code,
            ResponseMessageErrorCode::ProtocolV1EmptyResponse
        ));
    }

    #[test]
    fn response_message_error_code__unknown_error_code_is_deserialized_to_unknown_variant()
     {
        // 42 is not assigned to any variant, so it must fall back to `Unknown`.
        let serialized_error_code = [42u8];
        let response_message_error_code: ResponseMessageErrorCode =
            postcard::from_bytes(&serialized_error_code).unwrap();
        assert!(matches!(
            response_message_error_code,
            ResponseMessageErrorCode::Unknown
        ));
    }
}