polkadot_node_network_protocol/request_response/
outgoing.rs1use futures::{channel::oneshot, prelude::Future, FutureExt};
18
19use codec::{Decode, Encode, Error as DecodingError};
20use network::ProtocolName;
21
22use sc_network as network;
23use sc_network_types::PeerId;
24
25use polkadot_primitives::AuthorityDiscoveryId;
26
27use super::{v1, v2, IsRequest, Protocol};
28
29#[derive(Debug)]
31pub enum Requests {
32 ChunkFetching(OutgoingRequest<v2::ChunkFetchingRequest, v1::ChunkFetchingRequest>),
34 CollationFetchingV1(OutgoingRequest<v1::CollationFetchingRequest>),
36 PoVFetchingV1(OutgoingRequest<v1::PoVFetchingRequest>),
38 AvailableDataFetchingV1(OutgoingRequest<v1::AvailableDataFetchingRequest>),
40 DisputeSendingV1(OutgoingRequest<v1::DisputeRequest>),
42
43 AttestedCandidateV2(OutgoingRequest<v2::AttestedCandidateRequest>),
45 CollationFetchingV2(OutgoingRequest<v2::CollationFetchingRequest>),
48}
49
50impl Requests {
51 pub fn encode_request(self) -> (Protocol, OutgoingRequest<Vec<u8>>) {
59 match self {
60 Self::ChunkFetching(r) => r.encode_request(),
61 Self::CollationFetchingV1(r) => r.encode_request(),
62 Self::CollationFetchingV2(r) => r.encode_request(),
63 Self::PoVFetchingV1(r) => r.encode_request(),
64 Self::AvailableDataFetchingV1(r) => r.encode_request(),
65 Self::DisputeSendingV1(r) => r.encode_request(),
66 Self::AttestedCandidateV2(r) => r.encode_request(),
67 }
68 }
69}
70
71pub type ResponseSender = oneshot::Sender<Result<(Vec<u8>, ProtocolName), network::RequestFailure>>;
73
74#[derive(Debug, thiserror::Error)]
76pub enum RequestError {
77 #[error("Response could not be decoded: {0}")]
79 InvalidResponse(#[from] DecodingError),
80
81 #[error("{0}")]
83 NetworkError(#[from] network::RequestFailure),
84
85 #[error("Response channel got canceled")]
87 Canceled(#[from] oneshot::Canceled),
88}
89
90impl RequestError {
91 pub fn is_timed_out(&self) -> bool {
93 match self {
94 Self::Canceled(_) |
95 Self::NetworkError(network::RequestFailure::Obsolete) |
96 Self::NetworkError(network::RequestFailure::Network(
97 network::OutboundFailure::Timeout,
98 )) => true,
99 _ => false,
100 }
101 }
102}
103
104#[derive(Debug)]
115pub struct OutgoingRequest<Req, FallbackReq = Req> {
116 pub peer: Recipient,
118 pub payload: Req,
120 pub fallback_request: Option<(FallbackReq, Protocol)>,
122 pub pending_response: ResponseSender,
124}
125
126#[derive(Debug, Eq, Hash, PartialEq, Clone)]
128pub enum Recipient {
129 Peer(PeerId),
131 Authority(AuthorityDiscoveryId),
133}
134
135pub type OutgoingResult<Res> = Result<Res, RequestError>;
137
138impl<Req, FallbackReq> OutgoingRequest<Req, FallbackReq>
139where
140 Req: IsRequest + Encode,
141 Req::Response: Decode,
142 FallbackReq: IsRequest + Encode,
143 FallbackReq::Response: Decode,
144{
145 pub fn new(
150 peer: Recipient,
151 payload: Req,
152 ) -> (Self, impl Future<Output = OutgoingResult<Req::Response>>) {
153 let (tx, rx) = oneshot::channel();
154 let r = Self { peer, payload, pending_response: tx, fallback_request: None };
155 (r, receive_response::<Req>(rx.map(|r| r.map(|r| r.map(|(resp, _)| resp)))))
156 }
157
158 pub fn new_with_fallback(
165 peer: Recipient,
166 payload: Req,
167 fallback_request: FallbackReq,
168 ) -> (Self, impl Future<Output = OutgoingResult<(Vec<u8>, ProtocolName)>>) {
169 let (tx, rx) = oneshot::channel();
170 let r = Self {
171 peer,
172 payload,
173 pending_response: tx,
174 fallback_request: Some((fallback_request, FallbackReq::PROTOCOL)),
175 };
176 (r, async { Ok(rx.await??) })
177 }
178
179 pub fn encode_request(self) -> (Protocol, OutgoingRequest<Vec<u8>>) {
184 let OutgoingRequest { peer, payload, pending_response, fallback_request } = self;
185 let encoded = OutgoingRequest {
186 peer,
187 payload: payload.encode(),
188 fallback_request: fallback_request.map(|(r, p)| (r.encode(), p)),
189 pending_response,
190 };
191 (Req::PROTOCOL, encoded)
192 }
193}
194
195async fn receive_response<Req>(
197 rec: impl Future<Output = Result<Result<Vec<u8>, network::RequestFailure>, oneshot::Canceled>>,
198) -> OutgoingResult<Req::Response>
199where
200 Req: IsRequest,
201 Req::Response: Decode,
202{
203 let raw = rec.await??;
204 Ok(Decode::decode(&mut raw.as_ref())?)
205}