fuel_core_p2p/request_response/
messages.rs1use crate::service::TaskError;
2use fuel_core_types::{
3 blockchain::SealedBlockHeader,
4 fuel_tx::TxId,
5 services::p2p::{
6 NetworkableTransactionPool,
7 Transactions,
8 },
9};
10use libp2p::{
11 PeerId,
12 request_response::OutboundFailure,
13};
14use serde::{
15 Deserialize,
16 Serialize,
17};
18use std::ops::Range;
19use thiserror::Error;
20use tokio::sync::oneshot;
21
/// Request payloads of the request/response protocol.
///
/// Each variant has a same-named counterpart in [`V1ResponseMessage`] and
/// [`V2ResponseMessage`].
#[derive(Serialize, Deserialize, Eq, PartialEq, Debug, Clone)]
pub enum RequestMessage {
    /// Request for sealed block headers over a half-open `u32` range.
    // NOTE(review): the range presumably denotes block heights — confirm.
    SealedHeaders(Range<u32>),
    /// Request for transactions over a half-open `u32` range.
    // NOTE(review): the range presumably denotes block heights — confirm.
    Transactions(Range<u32>),
    /// Request for all transaction ids of the peer's transaction pool;
    /// carries no arguments.
    TxPoolAllTransactionsIds,
    /// Request for the pool transactions identified by the given ids.
    TxPoolFullTransactions(Vec<TxId>),
}
29
/// Error codes carried in the `Err` side of every [`V2ResponseMessage`] payload.
///
/// The `Display` text of each variant comes from its `#[error(...)]` attribute.
// NOTE(review): serde's derive presumably identifies variants by declaration
// order rather than by the explicit discriminant values below; keep the order
// stable and verify against the serde/postcard encoding before reordering.
#[derive(Error, Debug, Clone, Serialize, Deserialize)]
pub enum ResponseMessageErrorCode {
    /// Produced when a [`V1ResponseMessage`] with a `None` payload is converted
    /// into a [`V2ResponseMessage`].
    #[error("Empty response sent by peer using legacy protocol /fuel/req_res/0.0.1")]
    ProtocolV1EmptyResponse = 0,
    /// The requested range is too large.
    #[error("The requested range is too large")]
    RequestedRangeTooLarge = 1,
    /// The request timed out while being processed.
    #[error("Timeout while processing request")]
    Timeout = 2,
    /// The sync processor is out of capacity.
    #[error("Sync processor is out of capacity")]
    SyncProcessorOutOfCapacity = 3,
    /// Fallback for error codes that are not recognized.
    ///
    /// `other` makes an unrecognized code deserialize into this variant, and
    /// `skip_serializing` makes serializing this variant fail; both behaviors
    /// are exercised by the unit tests at the bottom of this file.
    #[error("The peer sent an unknown error code")]
    #[serde(skip_serializing, other)]
    Unknown,
}
45
/// Response payloads in the V1 shape: every variant wraps an `Option`.
///
/// A `None` payload carries no reason for the missing data; when converted into
/// a [`V2ResponseMessage`] it becomes
/// `Err(ResponseMessageErrorCode::ProtocolV1EmptyResponse)`.
// NOTE(review): the error text of `ProtocolV1EmptyResponse` suggests this shape
// belongs to the legacy protocol `/fuel/req_res/0.0.1` — confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum V1ResponseMessage {
    /// Response to `RequestMessage::SealedHeaders`.
    SealedHeaders(Option<Vec<SealedBlockHeader>>),
    /// Response to `RequestMessage::Transactions`.
    Transactions(Option<Vec<Transactions>>),
    /// Response to `RequestMessage::TxPoolAllTransactionsIds`.
    TxPoolAllTransactionsIds(Option<Vec<TxId>>),
    /// Response to `RequestMessage::TxPoolFullTransactions`; each element of the
    /// inner vector is itself optional.
    // NOTE(review): a `None` element presumably means the peer did not have the
    // requested transaction — confirm.
    TxPoolFullTransactions(Option<Vec<Option<NetworkableTransactionPool>>>),
}
53
/// Response payloads in the V2 shape: every variant wraps a `Result` whose
/// error side is a [`ResponseMessageErrorCode`].
///
/// Converting into a [`V1ResponseMessage`] keeps the `Ok` payload and discards
/// the error code.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum V2ResponseMessage {
    /// Response to `RequestMessage::SealedHeaders`.
    SealedHeaders(Result<Vec<SealedBlockHeader>, ResponseMessageErrorCode>),
    /// Response to `RequestMessage::Transactions`.
    Transactions(Result<Vec<Transactions>, ResponseMessageErrorCode>),
    /// Response to `RequestMessage::TxPoolAllTransactionsIds`.
    TxPoolAllTransactionsIds(Result<Vec<TxId>, ResponseMessageErrorCode>),
    /// Response to `RequestMessage::TxPoolFullTransactions`; each element of the
    /// inner vector is itself optional.
    TxPoolFullTransactions(
        Result<Vec<Option<NetworkableTransactionPool>>, ResponseMessageErrorCode>,
    ),
}
63
64impl From<V1ResponseMessage> for V2ResponseMessage {
65 fn from(v1_response: V1ResponseMessage) -> Self {
66 match v1_response {
67 V1ResponseMessage::SealedHeaders(sealed_headers) => {
68 V2ResponseMessage::SealedHeaders(
69 sealed_headers
70 .ok_or(ResponseMessageErrorCode::ProtocolV1EmptyResponse),
71 )
72 }
73 V1ResponseMessage::Transactions(vec) => V2ResponseMessage::Transactions(
74 vec.ok_or(ResponseMessageErrorCode::ProtocolV1EmptyResponse),
75 ),
76 V1ResponseMessage::TxPoolAllTransactionsIds(vec) => {
77 V2ResponseMessage::TxPoolAllTransactionsIds(
78 vec.ok_or(ResponseMessageErrorCode::ProtocolV1EmptyResponse),
79 )
80 }
81 V1ResponseMessage::TxPoolFullTransactions(vec) => {
82 V2ResponseMessage::TxPoolFullTransactions(
83 vec.ok_or(ResponseMessageErrorCode::ProtocolV1EmptyResponse),
84 )
85 }
86 }
87 }
88}
89
90impl From<V2ResponseMessage> for V1ResponseMessage {
91 fn from(response: V2ResponseMessage) -> Self {
92 match response {
93 V2ResponseMessage::SealedHeaders(sealed_headers) => {
94 V1ResponseMessage::SealedHeaders(sealed_headers.ok())
95 }
96 V2ResponseMessage::Transactions(transactions) => {
97 V1ResponseMessage::Transactions(transactions.ok())
98 }
99 V2ResponseMessage::TxPoolAllTransactionsIds(tx_ids) => {
100 V1ResponseMessage::TxPoolAllTransactionsIds(tx_ids.ok())
101 }
102 V2ResponseMessage::TxPoolFullTransactions(tx_pool) => {
103 V1ResponseMessage::TxPoolFullTransactions(tx_pool.ok())
104 }
105 }
106 }
107}
108
/// One-shot channel sender that delivers a `(PeerId, Result<T, ResponseError>)`
/// pair.
// NOTE(review): the `PeerId` presumably identifies the peer the response (or
// failure) is attributed to — confirm against the code that sends on it.
pub type OnResponse<T> = oneshot::Sender<(PeerId, Result<T, ResponseError>)>;
/// One-shot channel sender that delivers either a
/// `(PeerId, Result<T, ResponseError>)` pair or a `TaskError`.
///
/// Compared with [`OnResponse`], the whole pair is wrapped in an outer
/// `Result`, so in the `TaskError` case no `PeerId` is delivered at all.
// NOTE(review): the name suggests this is used when the task itself selects the
// peer and that selection can fail — confirm against the call sites.
pub type OnResponseWithPeerSelection<T> =
    oneshot::Sender<Result<(PeerId, Result<T, ResponseError>), TaskError>>;
114
/// One-shot channel senders, one variant per kind of expected response.
///
/// `SealedHeaders` and `Transactions` use [`OnResponseWithPeerSelection`], whose
/// outer `Result` can carry a `TaskError`; the remaining variants use
/// [`OnResponse`], which always delivers a `PeerId`.
#[derive(Debug)]
pub enum ResponseSender {
    /// Sender for a sealed-headers response.
    SealedHeaders(
        OnResponseWithPeerSelection<
            Result<Vec<SealedBlockHeader>, ResponseMessageErrorCode>,
        >,
    ),
    /// Sender for a transactions response.
    Transactions(
        OnResponseWithPeerSelection<Result<Vec<Transactions>, ResponseMessageErrorCode>>,
    ),
    /// Sender for a transactions response that uses the plain [`OnResponse`]
    /// channel.
    // NOTE(review): the name suggests the request targets a specific,
    // caller-chosen peer — confirm.
    TransactionsFromPeer(OnResponse<Result<Vec<Transactions>, ResponseMessageErrorCode>>),

    /// Sender for a response listing transaction ids.
    TxPoolAllTransactionsIds(OnResponse<Result<Vec<TxId>, ResponseMessageErrorCode>>),
    /// Sender for a response carrying optional pool transactions.
    TxPoolFullTransactions(
        OnResponse<
            Result<Vec<Option<NetworkableTransactionPool>>, ResponseMessageErrorCode>,
        >,
    ),
}
134
/// Errors reported for a request.
// NOTE(review): presumably raised before a request is sent — confirm against
// the code that constructs it.
#[derive(Debug, Error)]
pub enum RequestError {
    /// There is currently no connected peer.
    #[error("Not currently connected to any peers")]
    NoPeersConnected,
}
140
/// Errors delivered through [`OnResponse`] and [`OnResponseWithPeerSelection`]
/// in place of a response payload.
#[derive(Debug, Error)]
pub enum ResponseError {
    /// Wraps libp2p's `request_response::OutboundFailure`; its `Display` output
    /// is appended to the error message.
    #[error("P2P outbound error {0}")]
    P2P(OutboundFailure),
    /// The peer's response message was of an incorrect type.
    #[error("Peer response message was of incorrect type")]
    TypeMismatch,
}
150
/// Errors reported when sending a response fails.
#[derive(Debug, Eq, PartialEq, Error)]
pub enum ResponseSendError {
    /// The response channel does not exist.
    #[error("Response channel does not exist")]
    ResponseChannelDoesNotExist,
    /// Sending the response failed.
    #[error("Failed to send response")]
    SendingResponseFailed,
    /// The response could not be converted to the intermediate format.
    #[error("Failed to convert response to intermediate format")]
    ConversionToIntermediateFailed,
}
161
#[cfg(test)]
// Test names use a double underscore to separate the subject from the scenario.
#[allow(non_snake_case)]
mod tests {
    //! Encoding tests for [`ResponseMessageErrorCode`] using `postcard`.

    use super::ResponseMessageErrorCode;

    /// `Unknown` is marked `skip_serializing`, so encoding it must fail.
    #[test]
    fn response_message_error_code__unknown_error_cannot_be_serialized() {
        let error = ResponseMessageErrorCode::Unknown;
        let serialized = postcard::to_allocvec(&error);
        assert!(serialized.is_err());
    }

    /// A known error code survives an encode/decode round trip.
    #[test]
    fn response_message_error_code__known_error_code_is_deserialized_to_variant() {
        let serialized_error_code =
            postcard::to_stdvec(&ResponseMessageErrorCode::ProtocolV1EmptyResponse)
                .unwrap();
        let response_message_error_code: ResponseMessageErrorCode =
            postcard::from_bytes(&serialized_error_code).unwrap();
        assert!(matches!(
            response_message_error_code,
            ResponseMessageErrorCode::ProtocolV1EmptyResponse
        ));
    }

    /// A code with no matching variant decodes to `Unknown` (serde `other`).
    #[test]
    fn response_message_error_code__unknown_error_code_is_deserialized_to_unknown_variant()
    {
        // A fixed-size array is enough here; no heap allocation is needed.
        let serialized_error_code = [42u8];
        let response_message_error_code: ResponseMessageErrorCode =
            postcard::from_bytes(&serialized_error_code).unwrap();
        assert!(matches!(
            response_message_error_code,
            ResponseMessageErrorCode::Unknown
        ));
    }
}