1pub mod codec;
2
3pub mod client;
4pub mod server;
5
6use std::{
7 convert::Infallible,
8 fmt, io,
9 sync::atomic::{AtomicUsize, Ordering},
10 time::Duration,
11};
12
13pub use codec::Codec;
14use futures::{channel::oneshot, future};
15use smallvec::SmallVec;
16use volans_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
17use volans_swarm::Substream;
18
/// Process-wide counter that backs [`RequestId::next`].
///
/// This has to be a `static`, not a `const`: a `const` item is inlined at every
/// use site, so a `const` atomic would hand each `fetch_add` call a brand-new
/// temporary initialised to `0`, and every generated id would be `0`.
static NEXT_REQUEST_ID: AtomicUsize = AtomicUsize::new(0);

/// Numeric identifier produced by [`RequestId::next`].
///
/// Values are comparable, hashable and cheap to copy, so they can be used
/// directly as map keys.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct RequestId(usize);

impl RequestId {
    /// Returns a fresh identifier.
    ///
    /// The shared counter is incremented atomically, so concurrent callers never
    /// observe the same value (until the `usize` counter wraps around).
    pub(crate) fn next() -> Self {
        RequestId(NEXT_REQUEST_ID.fetch_add(1, Ordering::SeqCst))
    }
}
29
30impl fmt::Display for RequestId {
31 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
32 write!(f, "{}", self.0)
33 }
34}
35
36#[derive(Debug)]
37pub struct Upgrade<P> {
38 pub(crate) protocols: SmallVec<[P; 2]>,
39}
40
41impl<P> Upgrade<P>
42where
43 P: AsRef<str> + Clone,
44{
45 pub fn new(protocols: SmallVec<[P; 2]>) -> Self {
46 Self { protocols }
47 }
48
49 pub fn new_single(protocol: P) -> Self {
50 Self {
51 protocols: SmallVec::from_vec(vec![protocol]),
52 }
53 }
54}
55
56impl<P> UpgradeInfo for Upgrade<P>
57where
58 P: AsRef<str> + Clone,
59{
60 type Info = P;
61 type InfoIter = smallvec::IntoIter<[Self::Info; 2]>;
62
63 fn protocol_info(&self) -> Self::InfoIter {
64 self.protocols.clone().into_iter()
65 }
66}
67
68impl<P> InboundUpgrade<Substream> for Upgrade<P>
69where
70 P: AsRef<str> + Clone,
71{
72 type Output = (Substream, P);
73 type Error = Infallible;
74 type Future = future::Ready<Result<Self::Output, Self::Error>>;
75
76 fn upgrade_inbound(self, io: Substream, protocol: Self::Info) -> Self::Future {
77 future::ready(Ok((io, protocol)))
78 }
79}
80
81impl<P> OutboundUpgrade<Substream> for Upgrade<P>
82where
83 P: AsRef<str> + Clone,
84{
85 type Output = (Substream, P);
86 type Error = Infallible;
87 type Future = future::Ready<Result<Self::Output, Self::Error>>;
88
89 fn upgrade_outbound(self, io: Substream, protocol: Self::Info) -> Self::Future {
90 future::ready(Ok((io, protocol)))
91 }
92}
93
94#[derive(Debug)]
95pub struct Responder<TResponse> {
96 tx: oneshot::Sender<TResponse>,
97}
98
99impl<TResponse> Responder<TResponse> {
100 pub fn send_response(self, response: TResponse) -> Result<(), TResponse> {
101 self.tx.send(response)
102 }
103}
104
/// Tunable settings for this module.
#[derive(Debug, Clone)]
pub struct Config {
    /// Timeout applied to requests; 30 seconds unless overridden.
    // NOTE(review): presumably the maximum time a request may stay pending;
    // confirm against the client/server handlers that read this field.
    request_timeout: Duration,
}

impl Default for Config {
    /// Builds a configuration with a 30-second request timeout.
    fn default() -> Self {
        let request_timeout = Duration::from_secs(30);
        Config { request_timeout }
    }
}
119
/// Reasons an outbound request can fail.
///
/// Every variant can be converted into an [`io::Error`]; an underlying
/// [`io::Error`] converts into [`OutboundFailure::Io`] via `From`.
#[derive(Debug, thiserror::Error)]
pub enum OutboundFailure {
    /// The remote peer could not be dialed.
    #[error("Failed to dial the remote peer")]
    DialFailure,
    /// No response arrived before the timeout elapsed.
    #[error("Timeout waiting for the response")]
    Timeout,
    /// The connection was closed before a response was received.
    #[error("Connection closed before response was received")]
    ConnectionClosed,
    /// The request's protocol is not supported.
    #[error("Unsupported protocol for request")]
    UnsupportedProtocols,
    /// An underlying I/O error occurred.
    #[error("I/O error: {0}")]
    Io(#[from] io::Error),
}
133
134#[derive(Debug, thiserror::Error)]
135pub enum InboundFailure {
136 #[error("Request timeout")]
137 Timeout,
138 #[error("Connection closed before response was received")]
139 ConnectionClosed,
140 #[error("Unsupported protocol for request")]
141 UnsupportedProtocols,
142 #[error("Response was dropped before it could be sent")]
143 Discard,
144 #[error("I/O error: {0}")]
145 Io(#[from] io::Error),
146}
147
148impl From<InboundFailure> for io::Error {
149 fn from(err: InboundFailure) -> Self {
150 match err {
151 InboundFailure::Timeout => io::Error::new(io::ErrorKind::TimedOut, err),
152 InboundFailure::ConnectionClosed => io::Error::new(io::ErrorKind::UnexpectedEof, err),
153 InboundFailure::UnsupportedProtocols => io::Error::new(io::ErrorKind::Other, err),
154 InboundFailure::Discard => io::Error::new(io::ErrorKind::Other, err),
155 InboundFailure::Io(e) => e,
156 }
157 }
158}
159
160impl From<OutboundFailure> for io::Error {
161 fn from(err: OutboundFailure) -> Self {
162 match err {
163 OutboundFailure::DialFailure => io::Error::new(io::ErrorKind::ConnectionRefused, err),
164 OutboundFailure::Timeout => io::Error::new(io::ErrorKind::TimedOut, err),
165 OutboundFailure::ConnectionClosed => io::Error::new(io::ErrorKind::UnexpectedEof, err),
166 OutboundFailure::UnsupportedProtocols => io::Error::new(io::ErrorKind::Other, err),
167 OutboundFailure::Io(e) => e,
168 }
169 }
170}