1#![deny(rust_2018_idioms, unreachable_pub, missing_debug_implementations)]
3#![allow(async_fn_in_trait)]
4
5use std::io::{Error as IoError, Result as IoResult};
6use std::{any::Any, any::TypeId, fmt, task::Context, task::Poll};
7
8pub mod testing;
9pub mod types;
10
11mod buf;
12mod dispatcher;
13mod filter;
14mod flags;
15mod framed;
16mod io;
17mod ioref;
18mod macros;
19mod seal;
20mod tasks;
21mod timer;
22mod utils;
23
24use ntex_bytes::BytesVec;
25use ntex_codec::{Decoder, Encoder};
26
27pub use self::buf::{FilterCtx, ReadBuf, WriteBuf};
28pub use self::dispatcher::{Dispatcher, DispatcherConfig};
29pub use self::filter::{Base, Filter, FilterReadStatus, Layer};
30pub use self::framed::Framed;
31pub use self::io::{Io, IoRef, OnDisconnect};
32pub use self::seal::{IoBoxed, Sealed};
33pub use self::tasks::{IoContext, ReadContext, WriteContext, WriteContextBuf};
34pub use self::timer::TimerHandle;
35pub use self::utils::{seal, Decoded};
36
37#[doc(hidden)]
38pub use self::flags::Flags;
39
/// Asynchronous read operation over some underlying I/O object.
///
/// The trait is excluded from the rendered documentation via `#[doc(hidden)]`.
#[doc(hidden)]
pub trait AsyncRead {
    /// Performs a read using `buf`.
    ///
    /// `buf` is taken by value and handed back to the caller as the first
    /// element of the returned tuple, alongside the result of the operation.
    ///
    /// NOTE(review): the `usize` in the result is presumably the number of
    /// bytes read into `buf` — confirm against implementors.
    async fn read(&mut self, buf: BytesVec) -> (BytesVec, IoResult<usize>);
}
44
/// Asynchronous write operations over some underlying I/O object.
///
/// The trait is excluded from the rendered documentation via `#[doc(hidden)]`.
/// Every method reports failure through `std::io::Result`.
#[doc(hidden)]
pub trait AsyncWrite {
    /// Writes data taken from the supplied write-context buffer.
    ///
    /// NOTE(review): how much of `buf` is consumed per call is not visible
    /// here — confirm against `WriteContextBuf` and implementors.
    async fn write(&mut self, buf: &mut WriteContextBuf) -> IoResult<()>;

    /// Flushes the writer.
    ///
    /// NOTE(review): presumably pushes previously written data to the
    /// underlying object — confirm against implementors.
    async fn flush(&mut self) -> IoResult<()>;

    /// Shuts the writer down.
    ///
    /// NOTE(review): presumably closes the write side of the underlying
    /// object — confirm against implementors.
    async fn shutdown(&mut self) -> IoResult<()>;
}
53
/// Readiness state reported by the `poll_read_ready` / `poll_write_ready`
/// methods of [`FilterLayer`].
///
/// NOTE(review): the per-variant descriptions below are inferred from the
/// variant names and from the precedence used by [`Readiness::merge`] —
/// confirm against the code that consumes these values.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub enum Readiness {
    /// Nothing special requested; presumably I/O may proceed.
    Ready,
    /// Presumably a request for a graceful shutdown.
    Shutdown,
    /// Presumably a request for immediate termination.
    Terminate,
}

impl Readiness {
    /// Combines two readiness polls into a single one.
    ///
    /// The first value drives the outcome:
    ///
    /// * `Pending` — the result is `Pending`, whatever `val2` is.
    /// * `Ready(Ready)` — the result is `val2`, unchanged.
    /// * `Ready(Terminate)` — the result is `Ready(Terminate)`.
    /// * `Ready(Shutdown)` — the result is `Ready(Terminate)` when `val2` is
    ///   `Ready(Terminate)`, and `Ready(Shutdown)` for any other `val2`
    ///   (including `Pending`).
    pub fn merge(val1: Poll<Readiness>, val2: Poll<Readiness>) -> Poll<Readiness> {
        // A pending first value short-circuits the whole merge.
        let Poll::Ready(first) = val1 else {
            return Poll::Pending;
        };

        match first {
            Self::Ready => val2,
            Self::Terminate => Poll::Ready(Self::Terminate),
            // `Shutdown` is only overridden by an explicit `Terminate`.
            Self::Shutdown if matches!(val2, Poll::Ready(Self::Terminate)) => {
                Poll::Ready(Self::Terminate)
            }
            Self::Shutdown => Poll::Ready(Self::Shutdown),
        }
    }
}
82
/// Interface implemented by I/O filter layers.
///
/// Implementors must provide [`FilterLayer::process_read_buf`] and
/// [`FilterLayer::process_write_buf`]; every other method has a default
/// implementation. All methods take `&self`, so an implementor that needs to
/// mutate state has to arrange that itself.
// The default method bodies below ignore their arguments, hence the allow.
#[allow(unused_variables)]
pub trait FilterLayer: fmt::Debug + 'static {
    /// Polls whether the layer is ready for read operations.
    ///
    /// The default implementation always returns
    /// `Poll::Ready(Readiness::Ready)`.
    #[inline]
    fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<Readiness> {
        Poll::Ready(Readiness::Ready)
    }

    /// Polls whether the layer is ready for write operations.
    ///
    /// The default implementation always returns
    /// `Poll::Ready(Readiness::Ready)`.
    #[inline]
    fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<Readiness> {
        Poll::Ready(Readiness::Ready)
    }

    /// Processes the read-side buffer handed to this layer.
    ///
    /// NOTE(review): the returned `usize` is presumably the number of new
    /// bytes produced for the next layer — confirm against `ReadBuf` and
    /// existing implementors.
    fn process_read_buf(&self, buf: &ReadBuf<'_>) -> IoResult<usize>;

    /// Processes the write-side buffer handed to this layer.
    fn process_write_buf(&self, buf: &WriteBuf<'_>) -> IoResult<()>;

    /// Looks up layer-specific data by type id.
    ///
    /// The default implementation returns `None` for every `id`.
    #[inline]
    fn query(&self, id: TypeId) -> Option<Box<dyn Any>> {
        None
    }

    /// Shuts the layer down.
    ///
    /// The default implementation reports immediate completion by returning
    /// `Ok(Poll::Ready(()))`.
    ///
    /// NOTE(review): a `Poll::Pending` result presumably means the shutdown
    /// needs further calls to finish — confirm against the caller.
    #[inline]
    fn shutdown(&self, buf: &WriteBuf<'_>) -> IoResult<Poll<()>> {
        Ok(Poll::Ready(()))
    }
}
118
/// A stream that can be started with a read context and a write context.
pub trait IoStream {
    /// Consumes the stream, receiving a [`ReadContext`] and a [`WriteContext`],
    /// and optionally returns a boxed [`Handle`].
    ///
    /// NOTE(review): presumably this launches the stream's read/write
    /// processing — confirm against implementors.
    fn start(self, _: ReadContext, _: WriteContext) -> Option<Box<dyn Handle>>;
}
122
/// Handle optionally returned by [`IoStream::start`].
pub trait Handle {
    /// Looks up data by type id, returning it as a boxed `Any` value, or
    /// `None` when nothing is available for `id`.
    fn query(&self, id: TypeId) -> Option<Box<dyn Any>>;
}
126
/// Status value for an I/O task.
///
/// NOTE(review): variant meanings below are inferred from their names —
/// confirm against the code that produces and consumes this type.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub enum IoTaskStatus {
    /// Presumably: continue with I/O operations.
    Io,
    /// Presumably: pause I/O handling.
    Pause,
}
135
/// Status update for an I/O object.
///
/// NOTE(review): variant meanings below are inferred from their names —
/// confirm against the code that produces this type.
#[derive(Debug)]
pub enum IoStatusUpdate {
    /// Presumably: a keep-alive timeout occurred.
    KeepAlive,
    /// Presumably: write back-pressure is in effect.
    WriteBackpressure,
    /// Presumably: I/O handling should stop.
    Stop,
    /// Presumably: the peer is gone; carries the I/O error, if there was one.
    PeerGone(Option<IoError>),
}
148
/// Error type parameterised over a decoder `U`.
///
/// Has the same variants as [`IoStatusUpdate`] plus [`RecvError::Decoder`],
/// which carries the decoder's own error type.
///
/// NOTE(review): variant meanings below are inferred from their names —
/// confirm against the code that produces this type.
#[derive(Debug)]
pub enum RecvError<U: Decoder> {
    /// Presumably: a keep-alive timeout occurred.
    KeepAlive,
    /// Presumably: write back-pressure is in effect.
    WriteBackpressure,
    /// Presumably: I/O handling should stop.
    Stop,
    /// Error produced by the decoder `U`.
    Decoder(U::Error),
    /// Presumably: the peer is gone; carries the I/O error, if there was one.
    PeerGone(Option<IoError>),
}
163
/// Item type parameterised over a codec `U` that is both an encoder and a
/// decoder.
///
/// `Debug` is implemented by hand elsewhere in this file rather than derived.
///
/// NOTE(review): descriptions of the payload-free variants are inferred from
/// their names — confirm against the dispatcher implementation.
pub enum DispatchItem<U: Encoder + Decoder> {
    /// An item produced by the decoder.
    Item(<U as Decoder>::Item),
    /// Presumably: write back-pressure has been enabled.
    WBackPressureEnabled,
    /// Presumably: write back-pressure has been disabled.
    WBackPressureDisabled,
    /// Presumably: the keep-alive timeout elapsed.
    KeepAliveTimeout,
    /// Presumably: the read timeout elapsed.
    ReadTimeout,
    /// Error reported by the decoder.
    DecoderError(<U as Decoder>::Error),
    /// Error reported by the encoder.
    EncoderError(<U as Encoder>::Error),
    /// Presumably: the connection is gone; carries the I/O error, if any.
    Disconnect(Option<IoError>),
}
182
183impl<U> fmt::Debug for DispatchItem<U>
184where
185 U: Encoder + Decoder,
186 <U as Decoder>::Item: fmt::Debug,
187{
188 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
189 match *self {
190 DispatchItem::Item(ref item) => {
191 write!(fmt, "DispatchItem::Item({item:?})")
192 }
193 DispatchItem::WBackPressureEnabled => {
194 write!(fmt, "DispatchItem::WBackPressureEnabled")
195 }
196 DispatchItem::WBackPressureDisabled => {
197 write!(fmt, "DispatchItem::WBackPressureDisabled")
198 }
199 DispatchItem::KeepAliveTimeout => {
200 write!(fmt, "DispatchItem::KeepAliveTimeout")
201 }
202 DispatchItem::ReadTimeout => {
203 write!(fmt, "DispatchItem::ReadTimeout")
204 }
205 DispatchItem::EncoderError(ref e) => {
206 write!(fmt, "DispatchItem::EncoderError({e:?})")
207 }
208 DispatchItem::DecoderError(ref e) => {
209 write!(fmt, "DispatchItem::DecoderError({e:?})")
210 }
211 DispatchItem::Disconnect(ref e) => {
212 write!(fmt, "DispatchItem::Disconnect({e:?})")
213 }
214 }
215 }
216}
217
#[cfg(test)]
mod tests {
    use super::*;
    use ntex_codec::BytesCodec;
    use std::io;

    /// Checks that the `Debug` rendering of the error-carrying and
    /// payload-free `DispatchItem` variants names the variant, and that the
    /// `KeepAlive` variants of `IoStatusUpdate` and `RecvError` do as well.
    #[test]
    fn test_fmt() {
        type T = DispatchItem<BytesCodec>;

        // Each entry pairs a value with the substring its `Debug` output must contain.
        let cases = [
            (
                T::EncoderError(io::Error::other("err")),
                "DispatchItem::Encoder",
            ),
            (
                T::DecoderError(io::Error::other("err")),
                "DispatchItem::Decoder",
            ),
            (
                T::Disconnect(Some(io::Error::other("err"))),
                "DispatchItem::Disconnect",
            ),
            (
                T::WBackPressureEnabled,
                "DispatchItem::WBackPressureEnabled",
            ),
            (
                T::WBackPressureDisabled,
                "DispatchItem::WBackPressureDisabled",
            ),
            (T::KeepAliveTimeout, "DispatchItem::KeepAliveTimeout"),
            (T::ReadTimeout, "DispatchItem::ReadTimeout"),
        ];
        for (item, needle) in cases {
            let rendered = format!("{item:?}");
            assert!(rendered.contains(needle));
        }

        let status = format!("{:?}", IoStatusUpdate::KeepAlive);
        assert!(status.contains("KeepAlive"));

        let recv = format!("{:?}", RecvError::<BytesCodec>::KeepAlive);
        assert!(recv.contains("KeepAlive"));
    }
}