use std::marker::PhantomData;
use futures::{channel::oneshot, StreamExt};
use codec::{Decode, Encode};
use pezsc_network::{config as netconfig, NetworkBackend};
use pezsc_network_types::PeerId;
use pezsp_runtime::traits::Block;
use super::{IsRequest, ReqProtocolNames};
use crate::UnifiedReputationChange;
mod error;
pub use error::{Error, FatalError, JfyiError, Result};
#[derive(Debug)]
pub struct IncomingRequest<Req> {
pub peer: PeerId,
pub payload: Req,
pub pending_response: OutgoingResponseSender<Req>,
}
impl<Req> IncomingRequest<Req>
where
Req: IsRequest + Decode + Encode,
Req::Response: Encode,
{
pub fn get_config_receiver<B: Block, N: NetworkBackend<B, <B as Block>::Hash>>(
req_protocol_names: &ReqProtocolNames,
) -> (IncomingRequestReceiver<Req>, N::RequestResponseProtocolConfig) {
let (raw, cfg) = Req::PROTOCOL.get_config::<B, N>(req_protocol_names);
(IncomingRequestReceiver { raw, phantom: PhantomData {} }, cfg)
}
pub fn new(
peer: PeerId,
payload: Req,
pending_response: oneshot::Sender<netconfig::OutgoingResponse>,
) -> Self {
Self {
peer,
payload,
pending_response: OutgoingResponseSender { pending_response, phantom: PhantomData {} },
}
}
fn try_from_raw(
raw: pezsc_network::config::IncomingRequest,
reputation_changes: Vec<UnifiedReputationChange>,
) -> std::result::Result<Self, JfyiError> {
let pezsc_network::config::IncomingRequest { payload, peer, pending_response } = raw;
let payload = match Req::decode(&mut payload.as_ref()) {
Ok(payload) => payload,
Err(err) => {
let reputation_changes = reputation_changes.into_iter().map(|r| r.into()).collect();
let response = pezsc_network::config::OutgoingResponse {
result: Err(()),
reputation_changes,
sent_feedback: None,
};
if let Err(_) = pending_response.send(response) {
return Err(JfyiError::DecodingErrorNoReputationChange(peer, err));
}
return Err(JfyiError::DecodingError(peer, err));
},
};
Ok(Self::new(peer, payload, pending_response))
}
pub fn into_raw(self) -> pezsc_network::config::IncomingRequest {
pezsc_network::config::IncomingRequest {
peer: self.peer,
payload: self.payload.encode(),
pending_response: self.pending_response.pending_response,
}
}
pub fn send_response(self, resp: Req::Response) -> std::result::Result<(), Req::Response> {
self.pending_response.send_response(resp)
}
pub fn send_outgoing_response(
self,
resp: OutgoingResponse<<Req as IsRequest>::Response>,
) -> std::result::Result<(), ()> {
self.pending_response.send_outgoing_response(resp)
}
}
#[derive(Debug)]
pub struct OutgoingResponseSender<Req> {
pending_response: oneshot::Sender<netconfig::OutgoingResponse>,
phantom: PhantomData<Req>,
}
impl<Req> OutgoingResponseSender<Req>
where
Req: IsRequest + Decode,
Req::Response: Encode,
{
pub fn send_response(self, resp: Req::Response) -> std::result::Result<(), Req::Response> {
self.pending_response
.send(netconfig::OutgoingResponse {
result: Ok(resp.encode()),
reputation_changes: Vec::new(),
sent_feedback: None,
})
.map_err(|_| resp)
}
pub fn send_outgoing_response(
self,
resp: OutgoingResponse<<Req as IsRequest>::Response>,
) -> std::result::Result<(), ()> {
let OutgoingResponse { result, reputation_changes, sent_feedback } = resp;
let response = netconfig::OutgoingResponse {
result: result.map(|v| v.encode()),
reputation_changes: reputation_changes.into_iter().map(|c| c.into()).collect(),
sent_feedback,
};
self.pending_response.send(response).map_err(|_| ())
}
}
pub struct OutgoingResponse<Response> {
pub result: std::result::Result<Response, ()>,
pub reputation_changes: Vec<UnifiedReputationChange>,
pub sent_feedback: Option<oneshot::Sender<()>>,
}
pub struct IncomingRequestReceiver<Req> {
raw: async_channel::Receiver<netconfig::IncomingRequest>,
phantom: PhantomData<Req>,
}
impl<Req> IncomingRequestReceiver<Req>
where
Req: IsRequest + Decode + Encode,
Req::Response: Encode,
{
pub async fn recv<F>(&mut self, reputation_changes: F) -> Result<IncomingRequest<Req>>
where
F: FnOnce() -> Vec<UnifiedReputationChange>,
{
let req = match self.raw.next().await {
None => return Err(FatalError::RequestChannelExhausted.into()),
Some(raw) => IncomingRequest::<Req>::try_from_raw(raw, reputation_changes())?,
};
Ok(req)
}
}