use futures::{channel::oneshot, prelude::Future, FutureExt};
use codec::{Decode, Encode, Error as DecodingError};
use network::ProtocolName;
use pezsc_network as network;
use pezsc_network_types::PeerId;
use pezkuwi_primitives::AuthorityDiscoveryId;
use super::{v1, v2, IsRequest, Protocol};
/// All outgoing requests, one variant per request type.
///
/// Each variant wraps the [`OutgoingRequest`] carrying the corresponding payload type.
#[derive(Debug)]
pub enum Requests {
/// Chunk fetching request: a `v2` payload type with a `v1` fallback request type.
ChunkFetching(OutgoingRequest<v2::ChunkFetchingRequest, v1::ChunkFetchingRequest>),
/// Collation fetching request, `v1` payload type.
CollationFetchingV1(OutgoingRequest<v1::CollationFetchingRequest>),
/// PoV fetching request, `v1` payload type.
PoVFetchingV1(OutgoingRequest<v1::PoVFetchingRequest>),
/// Available data fetching request, `v1` payload type.
AvailableDataFetchingV1(OutgoingRequest<v1::AvailableDataFetchingRequest>),
/// Dispute request, `v1` payload type.
DisputeSendingV1(OutgoingRequest<v1::DisputeRequest>),
/// Attested candidate request, `v2` payload type.
AttestedCandidateV2(OutgoingRequest<v2::AttestedCandidateRequest>),
/// Collation fetching request, `v2` payload type.
CollationFetchingV2(OutgoingRequest<v2::CollationFetchingRequest>),
}
impl Requests {
    /// Converts the typed request into its byte-encoded form.
    ///
    /// Delegates to [`OutgoingRequest::encode_request`] of whichever request this variant
    /// wraps and returns the [`Protocol`] it reports together with the request whose payload
    /// is now a `Vec<u8>`. The request is consumed in the process.
    pub fn encode_request(self) -> (Protocol, OutgoingRequest<Vec<u8>>) {
        // Every arm is textually the same call, but the wrapped request has a different
        // concrete type per variant, so the arms cannot be merged into one or-pattern.
        match self {
            Self::ChunkFetching(request) => request.encode_request(),
            Self::CollationFetchingV1(request) => request.encode_request(),
            Self::PoVFetchingV1(request) => request.encode_request(),
            Self::AvailableDataFetchingV1(request) => request.encode_request(),
            Self::DisputeSendingV1(request) => request.encode_request(),
            Self::AttestedCandidateV2(request) => request.encode_request(),
            Self::CollationFetchingV2(request) => request.encode_request(),
        }
    }
}
/// Sending half of the oneshot channel on which the outcome of a request is delivered.
///
/// On success the channel carries the raw response bytes together with a [`ProtocolName`];
/// on failure it carries a [`network::RequestFailure`].
// NOTE(review): the `ProtocolName` is presumably the protocol the response was received on —
// confirm against the network layer.
pub type ResponseSender = oneshot::Sender<Result<(Vec<u8>, ProtocolName), network::RequestFailure>>;
/// Errors that can occur while waiting for and decoding the response to an outgoing request.
///
/// Each variant is convertible from its source error via `From` (see the `#[from]`
/// attributes), which allows propagation with `?`.
#[derive(Debug, thiserror::Error)]
pub enum RequestError {
/// The response bytes could not be decoded into the expected response type.
#[error("Response could not be decoded: {0}")]
InvalidResponse(#[from] DecodingError),
/// The request failed with a [`network::RequestFailure`].
#[error("{0}")]
NetworkError(#[from] network::RequestFailure),
/// The response channel was canceled before a result was delivered.
#[error("Response channel got canceled")]
Canceled(#[from] oneshot::Canceled),
}
impl RequestError {
    /// Tells whether this error is classified as a timeout condition.
    ///
    /// Returns `true` for exactly three cases: the response channel was canceled, the request
    /// failed with `RequestFailure::Obsolete`, or the network reported
    /// `OutboundFailure::Timeout`. Every other error yields `false`.
    pub fn is_timed_out(&self) -> bool {
        matches!(
            self,
            Self::Canceled(_)
                | Self::NetworkError(network::RequestFailure::Obsolete)
                | Self::NetworkError(network::RequestFailure::Network(
                    network::OutboundFailure::Timeout,
                ))
        )
    }
}
/// A request to be sent to a [`Recipient`], bundled with the channel used to report its outcome.
///
/// `Req` is the payload type. `FallbackReq` is the type of the optional fallback request and
/// defaults to `Req` when not specified.
#[derive(Debug)]
pub struct OutgoingRequest<Req, FallbackReq = Req> {
/// Intended recipient of the request.
pub peer: Recipient,
/// The request payload.
pub payload: Req,
/// Optional fallback request, paired with the [`Protocol`] it belongs to.
// NOTE(review): presumably sent by the network layer when the primary request's protocol is
// not supported by the recipient — confirm against the consumer of this struct.
pub fallback_request: Option<(FallbackReq, Protocol)>,
/// Sender on which the response (or the failure) for this request is delivered.
pub pending_response: ResponseSender,
}
/// Addressee of an outgoing request, identified in one of two ways.
#[derive(Debug, Eq, Hash, PartialEq, Clone)]
pub enum Recipient {
/// Recipient identified by its [`PeerId`].
Peer(PeerId),
/// Recipient identified by its [`AuthorityDiscoveryId`].
Authority(AuthorityDiscoveryId),
}
/// Result of an outgoing request: the response `Res` on success, a [`RequestError`] otherwise.
pub type OutgoingResult<Res> = Result<Res, RequestError>;
impl<Req, FallbackReq> OutgoingRequest<Req, FallbackReq>
where
Req: IsRequest + Encode,
Req::Response: Decode,
FallbackReq: IsRequest + Encode,
FallbackReq::Response: Decode,
{
pub fn new(
peer: Recipient,
payload: Req,
) -> (Self, impl Future<Output = OutgoingResult<Req::Response>>) {
let (tx, rx) = oneshot::channel();
let r = Self { peer, payload, pending_response: tx, fallback_request: None };
(r, receive_response::<Req>(rx.map(|r| r.map(|r| r.map(|(resp, _)| resp)))))
}
pub fn new_with_fallback(
peer: Recipient,
payload: Req,
fallback_request: FallbackReq,
) -> (Self, impl Future<Output = OutgoingResult<(Vec<u8>, ProtocolName)>>) {
let (tx, rx) = oneshot::channel();
let r = Self {
peer,
payload,
pending_response: tx,
fallback_request: Some((fallback_request, FallbackReq::PROTOCOL)),
};
(r, async { Ok(rx.await??) })
}
pub fn encode_request(self) -> (Protocol, OutgoingRequest<Vec<u8>>) {
let OutgoingRequest { peer, payload, pending_response, fallback_request } = self;
let encoded = OutgoingRequest {
peer,
payload: payload.encode(),
fallback_request: fallback_request.map(|(r, p)| (r.encode(), p)),
pending_response,
};
(Req::PROTOCOL, encoded)
}
}
/// Awaits the raw response of a request and decodes it into `Req::Response`.
///
/// # Errors
///
/// - [`RequestError::Canceled`] if `rec` resolves to a canceled channel,
/// - [`RequestError::NetworkError`] if the request itself failed,
/// - [`RequestError::InvalidResponse`] if the received bytes do not decode.
async fn receive_response<Req>(
    rec: impl Future<Output = Result<Result<Vec<u8>, network::RequestFailure>, oneshot::Canceled>>,
) -> OutgoingResult<Req::Response>
where
    Req: IsRequest,
    Req::Response: Decode,
{
    // First `?` handles cancellation of the channel, the second the request failure.
    let bytes = rec.await??;
    let response = <Req::Response as Decode>::decode(&mut bytes.as_slice())?;
    Ok(response)
}