1#![deny(rust_2018_idioms, unreachable_pub, missing_debug_implementations)]
3#![allow(async_fn_in_trait)]
4
5use std::io::{Error as IoError, Result as IoResult};
6use std::{any::Any, any::TypeId, fmt, task::Context, task::Poll};
7
8pub mod testing;
9pub mod types;
10
11mod buf;
12mod dispatcher;
13mod filter;
14mod flags;
15mod framed;
16mod io;
17mod ioref;
18mod seal;
19mod tasks;
20mod timer;
21mod utils;
22
23use ntex_bytes::BytesVec;
24use ntex_codec::{Decoder, Encoder};
25
26pub use self::buf::{ReadBuf, WriteBuf};
27pub use self::dispatcher::{Dispatcher, DispatcherConfig};
28pub use self::filter::{Base, Filter, Layer};
29pub use self::framed::Framed;
30pub use self::io::{Io, IoRef, OnDisconnect};
31pub use self::seal::{IoBoxed, Sealed};
32pub use self::tasks::{IoContext, ReadContext, WriteContext, WriteContextBuf};
33pub use self::timer::TimerHandle;
34pub use self::utils::{seal, Decoded};
35
36#[doc(hidden)]
37pub use self::flags::Flags;
38
39#[doc(hidden)]
40pub trait AsyncRead {
41 async fn read(&mut self, buf: BytesVec) -> (BytesVec, IoResult<usize>);
42}
43
44#[doc(hidden)]
45pub trait AsyncWrite {
46 async fn write(&mut self, buf: &mut WriteContextBuf) -> IoResult<()>;
47
48 async fn flush(&mut self) -> IoResult<()>;
49
50 async fn shutdown(&mut self) -> IoResult<()>;
51}
52
53#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
55pub enum ReadStatus {
56 Ready,
58 Terminate,
60}
61
62#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
64pub enum WriteStatus {
65 Ready,
67 Shutdown,
69 Terminate,
71}
72
73#[allow(unused_variables)]
74pub trait FilterLayer: fmt::Debug + 'static {
75 const BUFFERS: bool = true;
77
78 #[inline]
79 fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<ReadStatus> {
81 Poll::Ready(ReadStatus::Ready)
82 }
83
84 #[inline]
85 fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<WriteStatus> {
87 Poll::Ready(WriteStatus::Ready)
88 }
89
90 fn process_read_buf(&self, buf: &ReadBuf<'_>) -> IoResult<usize>;
95
96 fn process_write_buf(&self, buf: &WriteBuf<'_>) -> IoResult<()>;
98
99 #[inline]
100 fn query(&self, id: TypeId) -> Option<Box<dyn Any>> {
102 None
103 }
104
105 #[inline]
106 fn shutdown(&self, buf: &WriteBuf<'_>) -> IoResult<Poll<()>> {
108 Ok(Poll::Ready(()))
109 }
110}
111
112pub trait IoStream {
113 fn start(self, _: ReadContext, _: WriteContext) -> Option<Box<dyn Handle>>;
114}
115
116pub trait Handle {
117 fn query(&self, id: TypeId) -> Option<Box<dyn Any>>;
118}
119
120#[derive(Debug)]
122pub enum IoStatusUpdate {
123 KeepAlive,
125 WriteBackpressure,
127 Stop,
129 PeerGone(Option<IoError>),
131}
132
133#[derive(Debug)]
135pub enum RecvError<U: Decoder> {
136 KeepAlive,
138 WriteBackpressure,
140 Stop,
142 Decoder(U::Error),
144 PeerGone(Option<IoError>),
146}
147
148pub enum DispatchItem<U: Encoder + Decoder> {
150 Item(<U as Decoder>::Item),
151 WBackPressureEnabled,
153 WBackPressureDisabled,
155 KeepAliveTimeout,
157 ReadTimeout,
159 DecoderError(<U as Decoder>::Error),
161 EncoderError(<U as Encoder>::Error),
163 Disconnect(Option<IoError>),
165}
166
167impl<U> fmt::Debug for DispatchItem<U>
168where
169 U: Encoder + Decoder,
170 <U as Decoder>::Item: fmt::Debug,
171{
172 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
173 match *self {
174 DispatchItem::Item(ref item) => {
175 write!(fmt, "DispatchItem::Item({:?})", item)
176 }
177 DispatchItem::WBackPressureEnabled => {
178 write!(fmt, "DispatchItem::WBackPressureEnabled")
179 }
180 DispatchItem::WBackPressureDisabled => {
181 write!(fmt, "DispatchItem::WBackPressureDisabled")
182 }
183 DispatchItem::KeepAliveTimeout => {
184 write!(fmt, "DispatchItem::KeepAliveTimeout")
185 }
186 DispatchItem::ReadTimeout => {
187 write!(fmt, "DispatchItem::ReadTimeout")
188 }
189 DispatchItem::EncoderError(ref e) => {
190 write!(fmt, "DispatchItem::EncoderError({:?})", e)
191 }
192 DispatchItem::DecoderError(ref e) => {
193 write!(fmt, "DispatchItem::DecoderError({:?})", e)
194 }
195 DispatchItem::Disconnect(ref e) => {
196 write!(fmt, "DispatchItem::Disconnect({:?})", e)
197 }
198 }
199 }
200}
201
202#[cfg(test)]
203mod tests {
204 use super::*;
205 use ntex_codec::BytesCodec;
206 use std::io;
207
208 #[test]
209 fn test_fmt() {
210 type T = DispatchItem<BytesCodec>;
211
212 let err = T::EncoderError(io::Error::new(io::ErrorKind::Other, "err"));
213 assert!(format!("{:?}", err).contains("DispatchItem::Encoder"));
214 let err = T::DecoderError(io::Error::new(io::ErrorKind::Other, "err"));
215 assert!(format!("{:?}", err).contains("DispatchItem::Decoder"));
216 let err = T::Disconnect(Some(io::Error::new(io::ErrorKind::Other, "err")));
217 assert!(format!("{:?}", err).contains("DispatchItem::Disconnect"));
218
219 assert!(format!("{:?}", T::WBackPressureEnabled)
220 .contains("DispatchItem::WBackPressureEnabled"));
221 assert!(format!("{:?}", T::WBackPressureDisabled)
222 .contains("DispatchItem::WBackPressureDisabled"));
223 assert!(
224 format!("{:?}", T::KeepAliveTimeout).contains("DispatchItem::KeepAliveTimeout")
225 );
226 assert!(format!("{:?}", T::ReadTimeout).contains("DispatchItem::ReadTimeout"));
227
228 assert!(format!("{:?}", IoStatusUpdate::KeepAlive).contains("KeepAlive"));
229 assert!(format!("{:?}", RecvError::<BytesCodec>::KeepAlive).contains("KeepAlive"));
230 }
231}