1mod dummy;
2mod either;
3mod map;
4mod multi;
5mod pending;
6mod select;
7
8pub use dummy::DummyHandler;
9pub use map::{MapAction, MapEvent};
10pub use pending::PendingConnectionHandler;
11pub use select::ConnectionHandlerSelect;
12
13use std::{
14 fmt,
15 task::{Context, Poll},
16 time::Duration,
17};
18
19use ::either::Either;
20
21use crate::{InboundUpgradeSend, OutboundUpgradeSend};
22
23pub trait ConnectionHandler: Send + 'static {
24 type Action: fmt::Debug + Send + 'static;
25 type Event: fmt::Debug + Send + 'static;
26
27 fn handle_action(&mut self, action: Self::Action);
28
29 fn connection_keep_alive(&self) -> bool {
30 false
31 }
32
33 fn poll_close(&mut self, _: &mut Context<'_>) -> Poll<Option<Self::Event>> {
34 Poll::Ready(None)
35 }
36
37 fn poll(&mut self, cx: &mut Context<'_>) -> Poll<ConnectionHandlerEvent<Self::Event>>;
38
39 fn select<H>(self, other: H) -> ConnectionHandlerSelect<Self, H>
40 where
41 H: ConnectionHandler,
42 Self: Sized,
43 {
44 ConnectionHandlerSelect::select(self, other)
45 }
46
47 fn map_event<O, F>(self, map: F) -> MapEvent<Self, F>
48 where
49 Self: Sized,
50 O: fmt::Debug + Send + 'static,
51 F: FnMut(Self::Event) -> O + Send + 'static,
52 {
53 MapEvent::new(self, map)
54 }
55
56 fn map_action<O, F>(self, map: F) -> MapAction<Self, O, F>
57 where
58 Self: Sized,
59 O: fmt::Debug + Send + 'static,
60 F: FnMut(O) -> Option<Self::Action> + Send + 'static,
61 {
62 MapAction::new(self, map)
63 }
64}
65
66pub trait InboundStreamHandler: ConnectionHandler {
67 type InboundUpgrade: InboundUpgradeSend;
68 type InboundUserData: Send + 'static;
69
70 fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundUpgrade, Self::InboundUserData>;
71
72 fn on_fully_negotiated(
73 &mut self,
74 user_data: Self::InboundUserData,
75 protocol: <Self::InboundUpgrade as InboundUpgradeSend>::Output,
76 );
77
78 fn on_upgrade_error(
79 &mut self,
80 user_data: Self::InboundUserData,
81 error: <Self::InboundUpgrade as InboundUpgradeSend>::Error,
82 );
83}
84
85pub trait OutboundStreamHandler: ConnectionHandler {
86 type OutboundUpgrade: OutboundUpgradeSend;
87 type OutboundUserData: Send + 'static;
88
89 fn on_fully_negotiated(
90 &mut self,
91 user_data: Self::OutboundUserData,
92 protocol: <Self::OutboundUpgrade as OutboundUpgradeSend>::Output,
93 );
94
95 fn on_upgrade_error(
96 &mut self,
97 user_data: Self::OutboundUserData,
98 error: StreamUpgradeError<<Self::OutboundUpgrade as OutboundUpgradeSend>::Error>,
99 );
100
101 fn poll_outbound_request(
102 &mut self,
103 cx: &mut Context<'_>,
104 ) -> Poll<SubstreamProtocol<Self::OutboundUpgrade, Self::OutboundUserData>>;
105}
106
107#[derive(Copy, Clone, Debug, PartialEq, Eq)]
108pub struct SubstreamProtocol<TUpgr, TData> {
109 upgrade: TUpgr,
110 timeout: Duration,
111 user_data: TData,
112}
113
114impl<TUpgr, TData> SubstreamProtocol<TUpgr, TData> {
115 pub fn new(upgrade: TUpgr, data: TData) -> Self {
116 Self {
117 upgrade,
118 timeout: Duration::from_secs(5),
119 user_data: data,
120 }
121 }
122
123 pub fn upgrade(&self) -> &TUpgr {
124 &self.upgrade
125 }
126
127 pub fn timeout(&self) -> &Duration {
128 &self.timeout
129 }
130
131 pub fn with_timeout(mut self, timeout: Duration) -> Self {
132 self.timeout = timeout;
133 self
134 }
135
136 pub fn into_inner(self) -> (TUpgr, TData, Duration) {
137 (self.upgrade, self.user_data, self.timeout)
138 }
139
140 pub fn map_upgrade<U, F>(self, f: F) -> SubstreamProtocol<U, TData>
141 where
142 F: FnOnce(TUpgr) -> U,
143 {
144 SubstreamProtocol {
145 upgrade: f(self.upgrade),
146 user_data: self.user_data,
147 timeout: self.timeout,
148 }
149 }
150
151 pub fn map_user_data<U, F>(self, f: F) -> SubstreamProtocol<TUpgr, U>
152 where
153 F: FnOnce(TData) -> U,
154 {
155 SubstreamProtocol {
156 upgrade: self.upgrade,
157 user_data: f(self.user_data),
158 timeout: self.timeout,
159 }
160 }
161}
162
163#[derive(Debug)]
164pub enum StreamUpgradeError<TUpgrErr> {
165 Timeout,
166 Apply(TUpgrErr),
167 NegotiationFailed,
168 Io(std::io::Error),
169}
170
171impl<TUpgrErr> StreamUpgradeError<TUpgrErr> {
172 pub fn map_upgrade_err<F, E>(self, f: F) -> StreamUpgradeError<E>
173 where
174 F: FnOnce(TUpgrErr) -> E,
175 {
176 match self {
177 StreamUpgradeError::Timeout => StreamUpgradeError::Timeout,
178 StreamUpgradeError::Apply(e) => StreamUpgradeError::Apply(f(e)),
179 StreamUpgradeError::NegotiationFailed => StreamUpgradeError::NegotiationFailed,
180 StreamUpgradeError::Io(e) => StreamUpgradeError::Io(e),
181 }
182 }
183}
184
185impl<TErr1, TErr2> StreamUpgradeError<Either<TErr1, TErr2>> {
186 pub fn transpose_left(self) -> StreamUpgradeError<TErr1> {
187 match self {
188 StreamUpgradeError::Timeout => StreamUpgradeError::Timeout,
189 StreamUpgradeError::Apply(e) => {
190 StreamUpgradeError::Apply(e.left().expect("StreamUpgradeError Left error expected"))
191 }
192 StreamUpgradeError::NegotiationFailed => StreamUpgradeError::NegotiationFailed,
193 StreamUpgradeError::Io(e) => StreamUpgradeError::Io(e),
194 }
195 }
196
197 fn transpose_right(self) -> StreamUpgradeError<TErr2> {
198 match self {
199 StreamUpgradeError::Timeout => StreamUpgradeError::Timeout,
200 StreamUpgradeError::Apply(e) => StreamUpgradeError::Apply(
201 e.right().expect("StreamUpgradeError Right error expected"),
202 ),
203 StreamUpgradeError::NegotiationFailed => StreamUpgradeError::NegotiationFailed,
204 StreamUpgradeError::Io(e) => StreamUpgradeError::Io(e),
205 }
206 }
207}
208
209#[derive(Debug, Clone, PartialEq, Eq)]
210#[non_exhaustive]
211pub enum ConnectionHandlerEvent<TCustom> {
212 Notify(TCustom),
213 CloseConnection,
214}
215
216impl<TCustom> ConnectionHandlerEvent<TCustom> {
217 pub fn map_event<O, F>(self, f: F) -> ConnectionHandlerEvent<O>
218 where
219 F: FnOnce(TCustom) -> O,
220 {
221 match self {
222 ConnectionHandlerEvent::Notify(event) => ConnectionHandlerEvent::Notify(f(event)),
223 ConnectionHandlerEvent::CloseConnection => ConnectionHandlerEvent::CloseConnection,
224 }
225 }
226}