1#![deny(rust_2018_idioms, unreachable_pub, missing_debug_implementations)]
3#![allow(async_fn_in_trait)]
4
5use std::io::{Error as IoError, Result as IoResult};
6use std::{any::Any, any::TypeId, fmt, task::Context, task::Poll};
7
8pub mod cfg;
9pub mod testing;
10pub mod types;
11
12mod buf;
13mod dispatcher;
14mod filter;
15mod flags;
16mod framed;
17mod io;
18mod ioref;
19mod macros;
20mod seal;
21mod tasks;
22mod timer;
23mod utils;
24
25use ntex_codec::{Decoder, Encoder};
26
27pub use self::buf::{FilterCtx, ReadBuf, WriteBuf};
28pub use self::cfg::IoConfig;
29pub use self::dispatcher::Dispatcher;
30pub use self::filter::{Base, Filter, FilterReadStatus, Layer};
31pub use self::framed::Framed;
32pub use self::io::{Io, IoRef, OnDisconnect};
33pub use self::seal::{IoBoxed, Sealed};
34pub use self::tasks::IoContext;
35pub use self::timer::TimerHandle;
36pub use self::utils::{Decoded, seal};
37
38#[doc(hidden)]
39pub use self::flags::Flags;
40
/// Readiness state carried inside the `Poll<Readiness>` values returned by the
/// readiness hooks of [`FilterLayer`].
///
/// When two states are combined with [`Readiness::merge`], `Terminate` overrides
/// `Shutdown`, and `Shutdown` overrides `Ready`.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub enum Readiness {
    /// Neutral state: merging it with any other poll result yields that other result.
    Ready,
    /// NOTE(review): presumably asks for a graceful shutdown of the io object —
    /// confirm against the io tasks that consume this value.
    Shutdown,
    /// NOTE(review): presumably asks for the connection to be dropped immediately —
    /// confirm against the io tasks that consume this value.
    Terminate,
}

impl Readiness {
    /// Combine two readiness poll results into one.
    ///
    /// The result is decided primarily by `val1`:
    ///
    /// * `Poll::Pending` — the result is `Poll::Pending`, whatever `val2` is.
    /// * `Ready` — the result is `val2`, unchanged.
    /// * `Terminate` — the result is `Terminate`, whatever `val2` is.
    /// * `Shutdown` — the result is `Terminate` if `val2` is ready with
    ///   `Terminate`; otherwise it is `Shutdown`, including when `val2` is
    ///   still pending.
    ///
    /// Note that the operation is not symmetric: a pending `val1` always wins,
    /// while a pending `val2` is ignored once `val1` is `Shutdown` or `Terminate`.
    pub fn merge(val1: Poll<Readiness>, val2: Poll<Readiness>) -> Poll<Readiness> {
        match (val1, val2) {
            (Poll::Pending, _) => Poll::Pending,
            (Poll::Ready(Readiness::Ready), second) => second,
            (Poll::Ready(Readiness::Terminate), _)
            | (Poll::Ready(Readiness::Shutdown), Poll::Ready(Readiness::Terminate)) => {
                Poll::Ready(Readiness::Terminate)
            }
            (Poll::Ready(Readiness::Shutdown), _) => Poll::Ready(Readiness::Shutdown),
        }
    }
}
69
70#[allow(unused_variables)]
71pub trait FilterLayer: fmt::Debug + 'static {
72 #[inline]
73 fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<Readiness> {
75 Poll::Ready(Readiness::Ready)
76 }
77
78 #[inline]
79 fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<Readiness> {
81 Poll::Ready(Readiness::Ready)
82 }
83
84 fn process_read_buf(&self, buf: &ReadBuf<'_>) -> IoResult<usize>;
89
90 fn process_write_buf(&self, buf: &WriteBuf<'_>) -> IoResult<()>;
92
93 #[inline]
94 fn query(&self, id: TypeId) -> Option<Box<dyn Any>> {
96 None
97 }
98
99 #[inline]
100 fn shutdown(&self, buf: &WriteBuf<'_>) -> IoResult<Poll<()>> {
102 Ok(Poll::Ready(()))
103 }
104}
105
106pub trait IoStream {
107 fn start(self, _: IoContext) -> Option<Box<dyn Handle>>;
108}
109
/// Type-erased handle that can be queried for data by type id.
pub trait Handle {
    /// Return the data associated with `id`, boxed as `dyn Any`, or `None` when
    /// the handle has nothing for that type.
    fn query(&self, id: TypeId) -> Option<Box<dyn Any>>;
}
113
/// Status of an io task.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub enum IoTaskStatus {
    /// The only status for which [`IoTaskStatus::ready`] returns `true`.
    Io,
    /// NOTE(review): presumably the task should pause io — confirm with the io tasks.
    Pause,
    /// NOTE(review): presumably the task should stop — confirm with the io tasks.
    Stop,
}

impl IoTaskStatus {
    /// Returns `true` when the status is [`IoTaskStatus::Io`], `false` for
    /// `Pause` and `Stop`.
    #[inline]
    pub fn ready(self) -> bool {
        matches!(self, IoTaskStatus::Io)
    }
}
132
/// Status update reported for an io object.
#[derive(Debug)]
pub enum IoStatusUpdate {
    /// NOTE(review): presumably signals a keep-alive timeout — confirm with the
    /// timer/dispatcher code.
    KeepAlive,
    /// NOTE(review): presumably signals that write back-pressure is in effect —
    /// confirm with the write task.
    WriteBackpressure,
    /// The peer is gone; carries the io error, if there was one.
    PeerGone(Option<IoError>),
}
143
144pub enum RecvError<U: Decoder> {
146 KeepAlive,
148 WriteBackpressure,
150 Decoder(U::Error),
152 PeerGone(Option<IoError>),
154}
155
156impl<U> fmt::Debug for RecvError<U>
157where
158 U: Decoder,
159 <U as Decoder>::Error: fmt::Debug,
160{
161 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
162 match *self {
163 RecvError::KeepAlive => {
164 write!(fmt, "RecvError::KeepAlive")
165 }
166 RecvError::WriteBackpressure => {
167 write!(fmt, "RecvError::WriteBackpressure")
168 }
169 RecvError::Decoder(ref e) => {
170 write!(fmt, "RecvError::Decoder({e:?})")
171 }
172 RecvError::PeerGone(ref e) => {
173 write!(fmt, "RecvError::PeerGone({e:?})")
174 }
175 }
176 }
177}
178
179pub enum DispatchItem<U: Encoder + Decoder> {
181 Item(<U as Decoder>::Item),
182 WBackPressureEnabled,
184 WBackPressureDisabled,
186 KeepAliveTimeout,
188 ReadTimeout,
190 DecoderError(<U as Decoder>::Error),
192 EncoderError(<U as Encoder>::Error),
194 Disconnect(Option<IoError>),
196}
197
198impl<U> fmt::Debug for DispatchItem<U>
199where
200 U: Encoder + Decoder,
201 <U as Decoder>::Item: fmt::Debug,
202{
203 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
204 match *self {
205 DispatchItem::Item(ref item) => {
206 write!(fmt, "DispatchItem::Item({item:?})")
207 }
208 DispatchItem::WBackPressureEnabled => {
209 write!(fmt, "DispatchItem::WBackPressureEnabled")
210 }
211 DispatchItem::WBackPressureDisabled => {
212 write!(fmt, "DispatchItem::WBackPressureDisabled")
213 }
214 DispatchItem::KeepAliveTimeout => {
215 write!(fmt, "DispatchItem::KeepAliveTimeout")
216 }
217 DispatchItem::ReadTimeout => {
218 write!(fmt, "DispatchItem::ReadTimeout")
219 }
220 DispatchItem::EncoderError(ref e) => {
221 write!(fmt, "DispatchItem::EncoderError({e:?})")
222 }
223 DispatchItem::DecoderError(ref e) => {
224 write!(fmt, "DispatchItem::DecoderError({e:?})")
225 }
226 DispatchItem::Disconnect(ref e) => {
227 write!(fmt, "DispatchItem::Disconnect({e:?})")
228 }
229 }
230 }
231}
232
#[cfg(test)]
mod tests {
    use super::*;
    use ntex_codec::BytesCodec;
    use std::io;

    /// Render a value through its `Debug` implementation.
    fn rendered(value: impl std::fmt::Debug) -> String {
        format!("{value:?}")
    }

    /// Checks that the `Debug` output of the public item/error enums names the
    /// expected variant.
    #[test]
    fn test_fmt() {
        type T = DispatchItem<BytesCodec>;

        // Payload-carrying `DispatchItem` variants.
        let with_payload = [
            (T::EncoderError(io::Error::other("err")), "DispatchItem::Encoder"),
            (T::DecoderError(io::Error::other("err")), "DispatchItem::Decoder"),
            (
                T::Disconnect(Some(io::Error::other("err"))),
                "DispatchItem::Disconnect",
            ),
        ];
        for (item, expected) in with_payload {
            assert!(rendered(item).contains(expected));
        }

        // Unit `DispatchItem` variants.
        let unit_variants = [
            (T::WBackPressureEnabled, "DispatchItem::WBackPressureEnabled"),
            (T::WBackPressureDisabled, "DispatchItem::WBackPressureDisabled"),
            (T::KeepAliveTimeout, "DispatchItem::KeepAliveTimeout"),
            (T::ReadTimeout, "DispatchItem::ReadTimeout"),
        ];
        for (item, expected) in unit_variants {
            assert!(rendered(item).contains(expected));
        }

        // Derived `Debug` of `IoStatusUpdate`.
        assert!(rendered(IoStatusUpdate::KeepAlive).contains("KeepAlive"));

        // Manual `Debug` of `RecvError`.
        type E = RecvError<BytesCodec>;
        let recv_errors = [
            (E::KeepAlive, "KeepAlive"),
            (E::WriteBackpressure, "WriteBackpressure"),
            (E::Decoder(io::Error::other("err")), "RecvError::Decoder"),
            (
                E::PeerGone(Some(io::Error::other("err"))),
                "RecvError::PeerGone",
            ),
        ];
        for (err, expected) in recv_errors {
            assert!(rendered(err).contains(expected));
        }
    }
}