volans_swarm/
handler.rs

1mod dummy;
2mod either;
3mod map;
4mod multi;
5mod pending;
6mod select;
7
8pub use dummy::DummyHandler;
9pub use map::{MapAction, MapEvent};
10pub use pending::PendingConnectionHandler;
11pub use select::ConnectionHandlerSelect;
12
13use std::{
14    fmt,
15    task::{Context, Poll},
16    time::Duration,
17};
18
19use ::either::Either;
20
21use crate::{InboundUpgradeSend, OutboundUpgradeSend};
22
23pub trait ConnectionHandler: Send + 'static {
24    type Action: fmt::Debug + Send + 'static;
25    type Event: fmt::Debug + Send + 'static;
26
27    fn handle_action(&mut self, action: Self::Action);
28
29    fn connection_keep_alive(&self) -> bool {
30        false
31    }
32
33    fn poll_close(&mut self, _: &mut Context<'_>) -> Poll<Option<Self::Event>> {
34        Poll::Ready(None)
35    }
36
37    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<ConnectionHandlerEvent<Self::Event>>;
38
39    fn select<H>(self, other: H) -> ConnectionHandlerSelect<Self, H>
40    where
41        H: ConnectionHandler,
42        Self: Sized,
43    {
44        ConnectionHandlerSelect::select(self, other)
45    }
46
47    fn map_event<O, F>(self, map: F) -> MapEvent<Self, F>
48    where
49        Self: Sized,
50        O: fmt::Debug + Send + 'static,
51        F: FnMut(Self::Event) -> O + Send + 'static,
52    {
53        MapEvent::new(self, map)
54    }
55
56    fn map_action<O, F>(self, map: F) -> MapAction<Self, O, F>
57    where
58        Self: Sized,
59        O: fmt::Debug + Send + 'static,
60        F: FnMut(O) -> Option<Self::Action> + Send + 'static,
61    {
62        MapAction::new(self, map)
63    }
64}
65
66pub trait InboundStreamHandler: ConnectionHandler {
67    type InboundUpgrade: InboundUpgradeSend;
68    type InboundUserData: Send + 'static;
69
70    fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundUpgrade, Self::InboundUserData>;
71
72    fn on_fully_negotiated(
73        &mut self,
74        user_data: Self::InboundUserData,
75        protocol: <Self::InboundUpgrade as InboundUpgradeSend>::Output,
76    );
77
78    fn on_upgrade_error(
79        &mut self,
80        user_data: Self::InboundUserData,
81        error: <Self::InboundUpgrade as InboundUpgradeSend>::Error,
82    );
83}
84
85pub trait OutboundStreamHandler: ConnectionHandler {
86    type OutboundUpgrade: OutboundUpgradeSend;
87    type OutboundUserData: Send + 'static;
88
89    fn on_fully_negotiated(
90        &mut self,
91        user_data: Self::OutboundUserData,
92        protocol: <Self::OutboundUpgrade as OutboundUpgradeSend>::Output,
93    );
94
95    fn on_upgrade_error(
96        &mut self,
97        user_data: Self::OutboundUserData,
98        error: StreamUpgradeError<<Self::OutboundUpgrade as OutboundUpgradeSend>::Error>,
99    );
100
101    fn poll_outbound_request(
102        &mut self,
103        cx: &mut Context<'_>,
104    ) -> Poll<SubstreamProtocol<Self::OutboundUpgrade, Self::OutboundUserData>>;
105}
106
/// An upgrade bundled with a timeout and a piece of caller-chosen user data.
// NOTE(review): what the timeout is applied to is decided by the consumer of
// this type and is not visible in this file.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct SubstreamProtocol<TUpgr, TData> {
    upgrade: TUpgr,
    timeout: Duration,
    user_data: TData,
}

impl<TUpgr, TData> SubstreamProtocol<TUpgr, TData> {
    /// Timeout assigned by [`new`](Self::new) until overridden with
    /// [`with_timeout`](Self::with_timeout).
    const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5);

    /// Creates a protocol from `upgrade` and `data` with a 5-second timeout.
    pub fn new(upgrade: TUpgr, data: TData) -> Self {
        Self {
            upgrade,
            timeout: Self::DEFAULT_TIMEOUT,
            user_data: data,
        }
    }

    /// Borrows the upgrade.
    pub fn upgrade(&self) -> &TUpgr {
        &self.upgrade
    }

    /// Borrows the configured timeout.
    pub fn timeout(&self) -> &Duration {
        &self.timeout
    }

    /// Borrows the user data.
    ///
    /// Counterpart of [`upgrade`](Self::upgrade) and [`timeout`](Self::timeout);
    /// lets callers inspect the data without consuming the value through
    /// [`into_inner`](Self::into_inner).
    pub fn user_data(&self) -> &TData {
        &self.user_data
    }

    /// Replaces the timeout, leaving upgrade and user data untouched.
    pub fn with_timeout(mut self, timeout: Duration) -> Self {
        self.timeout = timeout;
        self
    }

    /// Splits the value into `(upgrade, user_data, timeout)`.
    ///
    /// Note the tuple order: the timeout comes last, unlike the field order.
    pub fn into_inner(self) -> (TUpgr, TData, Duration) {
        (self.upgrade, self.user_data, self.timeout)
    }

    /// Transforms the upgrade with `f`, keeping user data and timeout.
    pub fn map_upgrade<U, F>(self, f: F) -> SubstreamProtocol<U, TData>
    where
        F: FnOnce(TUpgr) -> U,
    {
        SubstreamProtocol {
            upgrade: f(self.upgrade),
            user_data: self.user_data,
            timeout: self.timeout,
        }
    }

    /// Transforms the user data with `f`, keeping upgrade and timeout.
    pub fn map_user_data<U, F>(self, f: F) -> SubstreamProtocol<TUpgr, U>
    where
        F: FnOnce(TData) -> U,
    {
        SubstreamProtocol {
            upgrade: self.upgrade,
            user_data: f(self.user_data),
            timeout: self.timeout,
        }
    }
}
162
/// Ways in which a stream upgrade can fail.
///
/// `TUpgrErr` is the error type of the upgrade itself; only the
/// [`Apply`](Self::Apply) variant carries it.
#[derive(Debug)]
pub enum StreamUpgradeError<TUpgrErr> {
    /// The upgrade timed out.
    Timeout,
    /// The upgrade itself reported an error of type `TUpgrErr`.
    Apply(TUpgrErr),
    /// Negotiation did not succeed.
    NegotiationFailed,
    /// An I/O error was encountered.
    Io(std::io::Error),
}

impl<TUpgrErr> StreamUpgradeError<TUpgrErr> {
    /// Converts the upgrade-specific error with `f`.
    ///
    /// Only [`Apply`](Self::Apply) holds a `TUpgrErr`, so `f` runs at most
    /// once; every other variant is carried over as-is.
    pub fn map_upgrade_err<F, E>(self, f: F) -> StreamUpgradeError<E>
    where
        F: FnOnce(TUpgrErr) -> E,
    {
        match self {
            // The one variant that actually needs converting.
            Self::Apply(inner) => StreamUpgradeError::Apply(f(inner)),
            // Payload-independent variants only change their type parameter.
            Self::Io(io_err) => StreamUpgradeError::Io(io_err),
            Self::NegotiationFailed => StreamUpgradeError::NegotiationFailed,
            Self::Timeout => StreamUpgradeError::Timeout,
        }
    }
}
184
185impl<TErr1, TErr2> StreamUpgradeError<Either<TErr1, TErr2>> {
186    pub fn transpose_left(self) -> StreamUpgradeError<TErr1> {
187        match self {
188            StreamUpgradeError::Timeout => StreamUpgradeError::Timeout,
189            StreamUpgradeError::Apply(e) => {
190                StreamUpgradeError::Apply(e.left().expect("StreamUpgradeError Left error expected"))
191            }
192            StreamUpgradeError::NegotiationFailed => StreamUpgradeError::NegotiationFailed,
193            StreamUpgradeError::Io(e) => StreamUpgradeError::Io(e),
194        }
195    }
196
197    fn transpose_right(self) -> StreamUpgradeError<TErr2> {
198        match self {
199            StreamUpgradeError::Timeout => StreamUpgradeError::Timeout,
200            StreamUpgradeError::Apply(e) => StreamUpgradeError::Apply(
201                e.right().expect("StreamUpgradeError Right error expected"),
202            ),
203            StreamUpgradeError::NegotiationFailed => StreamUpgradeError::NegotiationFailed,
204            StreamUpgradeError::Io(e) => StreamUpgradeError::Io(e),
205        }
206    }
207}
208
/// What a handler reports when polled.
///
/// Marked `#[non_exhaustive]`, so code outside this crate must include a
/// wildcard arm when matching.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum ConnectionHandlerEvent<TCustom> {
    /// Carries a handler-defined payload.
    Notify(TCustom),
    /// Carries no payload.
    // NOTE(review): presumably asks the owner to close the connection — the
    // consumer of this variant is not visible in this file.
    CloseConnection,
}

impl<TCustom> ConnectionHandlerEvent<TCustom> {
    /// Converts the payload of [`Notify`](Self::Notify) with `f`.
    ///
    /// [`CloseConnection`](Self::CloseConnection) has no payload, so `f` is
    /// not invoked for it.
    pub fn map_event<O, F>(self, f: F) -> ConnectionHandlerEvent<O>
    where
        F: FnOnce(TCustom) -> O,
    {
        match self {
            Self::CloseConnection => ConnectionHandlerEvent::CloseConnection,
            Self::Notify(payload) => {
                let converted = f(payload);
                ConnectionHandlerEvent::Notify(converted)
            }
        }
    }
}