volans_request/
lib.rs

1pub mod codec;
2
3pub mod client;
4pub mod server;
5
6use std::{
7    convert::Infallible,
8    fmt, io,
9    sync::atomic::{AtomicUsize, Ordering},
10    time::Duration,
11};
12
13pub use codec::Codec;
14use futures::{channel::oneshot, future};
15use smallvec::SmallVec;
16use volans_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
17use volans_swarm::Substream;
18
/// Process-wide counter backing [`RequestId::next`].
///
/// This has to be a `static`. A `const` is instantiated afresh at every use
/// site, so a `fetch_add` on it would mutate a temporary and every caller
/// would observe `0`.
static NEXT_REQUEST_ID: AtomicUsize = AtomicUsize::new(0);

/// Identifier of a request, drawn from a single counter shared by the whole
/// process.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct RequestId(usize);

impl RequestId {
    /// Allocates the next identifier by atomically post-incrementing the
    /// shared counter.
    ///
    /// Successive calls return increasing values; the counter wraps around
    /// once `usize::MAX` has been handed out.
    pub(crate) fn next() -> Self {
        RequestId(NEXT_REQUEST_ID.fetch_add(1, Ordering::SeqCst))
    }
}
29
30impl fmt::Display for RequestId {
31    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
32        write!(f, "{}", self.0)
33    }
34}
35
36#[derive(Debug)]
37pub struct Upgrade<P> {
38    pub(crate) protocols: SmallVec<[P; 2]>,
39}
40
41impl<P> Upgrade<P>
42where
43    P: AsRef<str> + Clone,
44{
45    pub fn new(protocols: SmallVec<[P; 2]>) -> Self {
46        Self { protocols }
47    }
48
49    pub fn new_single(protocol: P) -> Self {
50        Self {
51            protocols: SmallVec::from_vec(vec![protocol]),
52        }
53    }
54}
55
56impl<P> UpgradeInfo for Upgrade<P>
57where
58    P: AsRef<str> + Clone,
59{
60    type Info = P;
61    type InfoIter = smallvec::IntoIter<[Self::Info; 2]>;
62
63    fn protocol_info(&self) -> Self::InfoIter {
64        self.protocols.clone().into_iter()
65    }
66}
67
68impl<P> InboundUpgrade<Substream> for Upgrade<P>
69where
70    P: AsRef<str> + Clone,
71{
72    type Output = (Substream, P);
73    type Error = Infallible;
74    type Future = future::Ready<Result<Self::Output, Self::Error>>;
75
76    fn upgrade_inbound(self, io: Substream, protocol: Self::Info) -> Self::Future {
77        future::ready(Ok((io, protocol)))
78    }
79}
80
81impl<P> OutboundUpgrade<Substream> for Upgrade<P>
82where
83    P: AsRef<str> + Clone,
84{
85    type Output = (Substream, P);
86    type Error = Infallible;
87    type Future = future::Ready<Result<Self::Output, Self::Error>>;
88
89    fn upgrade_outbound(self, io: Substream, protocol: Self::Info) -> Self::Future {
90        future::ready(Ok((io, protocol)))
91    }
92}
93
94#[derive(Debug)]
95pub struct Responder<TResponse> {
96    tx: oneshot::Sender<TResponse>,
97}
98
99impl<TResponse> Responder<TResponse> {
100    pub fn send_response(self, response: TResponse) -> Result<(), TResponse> {
101        self.tx.send(response)
102    }
103}
104
/// Settings for the request/response protocol.
#[derive(Debug, Clone)]
pub struct Config {
    /// Timeout applied to a request.
    request_timeout: Duration,
    // TODO: `max_concurrent_streams: usize` is sketched but not enabled yet.
}

impl Default for Config {
    /// Builds the default configuration: a 30 second request timeout.
    fn default() -> Self {
        const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(30);
        // TODO: once enabled, `max_concurrent_streams` was meant to default to 100.
        Self {
            request_timeout: DEFAULT_REQUEST_TIMEOUT,
        }
    }
}
119
120#[derive(Debug, thiserror::Error)]
121pub enum OutboundFailure {
122    #[error("Failed to dial the remote peer")]
123    DialFailure,
124    #[error("Timeout waiting for the response")]
125    Timeout,
126    #[error("Connection closed before response was received")]
127    ConnectionClosed,
128    #[error("Unsupported protocol for request")]
129    UnsupportedProtocols,
130    #[error("I/O error: {0}")]
131    Io(#[from] io::Error),
132}
133
134#[derive(Debug, thiserror::Error)]
135pub enum InboundFailure {
136    #[error("Request timeout")]
137    Timeout,
138    #[error("Connection closed before response was received")]
139    ConnectionClosed,
140    #[error("Unsupported protocol for request")]
141    UnsupportedProtocols,
142    #[error("Response was dropped before it could be sent")]
143    Discard,
144    #[error("I/O error: {0}")]
145    Io(#[from] io::Error),
146}
147
148impl From<InboundFailure> for io::Error {
149    fn from(err: InboundFailure) -> Self {
150        match err {
151            InboundFailure::Timeout => io::Error::new(io::ErrorKind::TimedOut, err),
152            InboundFailure::ConnectionClosed => io::Error::new(io::ErrorKind::UnexpectedEof, err),
153            InboundFailure::UnsupportedProtocols => io::Error::new(io::ErrorKind::Other, err),
154            InboundFailure::Discard => io::Error::new(io::ErrorKind::Other, err),
155            InboundFailure::Io(e) => e,
156        }
157    }
158}
159
160impl From<OutboundFailure> for io::Error {
161    fn from(err: OutboundFailure) -> Self {
162        match err {
163            OutboundFailure::DialFailure => io::Error::new(io::ErrorKind::ConnectionRefused, err),
164            OutboundFailure::Timeout => io::Error::new(io::ErrorKind::TimedOut, err),
165            OutboundFailure::ConnectionClosed => io::Error::new(io::ErrorKind::UnexpectedEof, err),
166            OutboundFailure::UnsupportedProtocols => io::Error::new(io::ErrorKind::Other, err),
167            OutboundFailure::Io(e) => e,
168        }
169    }
170}