ntex_io/
lib.rs

//! Utilities for abstracting io streams
2#![deny(rust_2018_idioms, unreachable_pub, missing_debug_implementations)]
3#![allow(async_fn_in_trait)]
4
5use std::io::{Error as IoError, Result as IoResult};
6use std::{any::Any, any::TypeId, fmt, task::Context, task::Poll};
7
8pub mod testing;
9pub mod types;
10
11mod buf;
12mod dispatcher;
13mod filter;
14mod flags;
15mod framed;
16mod io;
17mod ioref;
18mod seal;
19mod tasks;
20mod timer;
21mod utils;
22
23use ntex_bytes::BytesVec;
24use ntex_codec::{Decoder, Encoder};
25
26pub use self::buf::{ReadBuf, WriteBuf};
27pub use self::dispatcher::{Dispatcher, DispatcherConfig};
28pub use self::filter::{Base, Filter, Layer};
29pub use self::framed::Framed;
30pub use self::io::{Io, IoRef, OnDisconnect};
31pub use self::seal::{IoBoxed, Sealed};
32pub use self::tasks::{IoContext, ReadContext, WriteContext, WriteContextBuf};
33pub use self::timer::TimerHandle;
34pub use self::utils::{seal, Decoded};
35
36#[doc(hidden)]
37pub use self::flags::Flags;
38
39#[doc(hidden)]
40pub trait AsyncRead {
41    async fn read(&mut self, buf: BytesVec) -> (BytesVec, IoResult<usize>);
42}
43
44#[doc(hidden)]
45pub trait AsyncWrite {
46    async fn write(&mut self, buf: &mut WriteContextBuf) -> IoResult<()>;
47
48    async fn flush(&mut self) -> IoResult<()>;
49
50    async fn shutdown(&mut self) -> IoResult<()>;
51}
52
/// Status reported to the read task
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
pub enum ReadStatus {
    /// The read task may go ahead with the read operation
    Ready,
    /// The read task must terminate
    Terminate,
}
61
/// Status reported to the write task
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
pub enum WriteStatus {
    /// The write task may go ahead with the write operation
    Ready,
    /// Start a graceful io shutdown
    Shutdown,
    /// Terminate the connection immediately
    Terminate,
}
72
73#[allow(unused_variables)]
74pub trait FilterLayer: fmt::Debug + 'static {
75    /// Create buffers for this filter
76    const BUFFERS: bool = true;
77
78    #[inline]
79    /// Check readiness for read operations
80    fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<ReadStatus> {
81        Poll::Ready(ReadStatus::Ready)
82    }
83
84    #[inline]
85    /// Check readiness for write operations
86    fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<WriteStatus> {
87        Poll::Ready(WriteStatus::Ready)
88    }
89
90    /// Process read buffer
91    ///
92    /// Inner filter must process buffer before current.
93    /// Returns number of new bytes.
94    fn process_read_buf(&self, buf: &ReadBuf<'_>) -> IoResult<usize>;
95
96    /// Process write buffer
97    fn process_write_buf(&self, buf: &WriteBuf<'_>) -> IoResult<()>;
98
99    #[inline]
100    /// Query internal filter data
101    fn query(&self, id: TypeId) -> Option<Box<dyn Any>> {
102        None
103    }
104
105    #[inline]
106    /// Gracefully shutdown filter
107    fn shutdown(&self, buf: &WriteBuf<'_>) -> IoResult<Poll<()>> {
108        Ok(Poll::Ready(()))
109    }
110}
111
112pub trait IoStream {
113    fn start(self, _: ReadContext, _: WriteContext) -> Option<Box<dyn Handle>>;
114}
115
/// Handle through which data can be looked up by type id.
pub trait Handle {
    /// Look up data identified by `id`.
    ///
    /// Returns the value boxed as `dyn Any`, or `None` when nothing is
    /// available for that type id.
    fn query(&self, id: TypeId) -> Option<Box<dyn Any>>;
}
119
/// Io status update
#[derive(Debug)]
pub enum IoStatusUpdate {
    /// The keep-alive timeout occurred
    KeepAlive,
    /// Write backpressure has been enabled
    WriteBackpressure,
    /// Io stream handling should stop
    Stop,
    /// The peer is disconnected, optionally with the io error that came with it
    PeerGone(Option<IoError>),
}
132
133/// Recv error
134#[derive(Debug)]
135pub enum RecvError<U: Decoder> {
136    /// Keep-alive timeout occured
137    KeepAlive,
138    /// Write backpressure is enabled
139    WriteBackpressure,
140    /// Stop io stream handling
141    Stop,
142    /// Unrecoverable frame decoding errors
143    Decoder(U::Error),
144    /// Peer is disconnected
145    PeerGone(Option<IoError>),
146}
147
148/// Dispatcher item
149pub enum DispatchItem<U: Encoder + Decoder> {
150    Item(<U as Decoder>::Item),
151    /// Write back-pressure enabled
152    WBackPressureEnabled,
153    /// Write back-pressure disabled
154    WBackPressureDisabled,
155    /// Keep alive timeout
156    KeepAliveTimeout,
157    /// Frame read timeout
158    ReadTimeout,
159    /// Decoder parse error
160    DecoderError(<U as Decoder>::Error),
161    /// Encoder parse error
162    EncoderError(<U as Encoder>::Error),
163    /// Socket is disconnected
164    Disconnect(Option<IoError>),
165}
166
167impl<U> fmt::Debug for DispatchItem<U>
168where
169    U: Encoder + Decoder,
170    <U as Decoder>::Item: fmt::Debug,
171{
172    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
173        match *self {
174            DispatchItem::Item(ref item) => {
175                write!(fmt, "DispatchItem::Item({:?})", item)
176            }
177            DispatchItem::WBackPressureEnabled => {
178                write!(fmt, "DispatchItem::WBackPressureEnabled")
179            }
180            DispatchItem::WBackPressureDisabled => {
181                write!(fmt, "DispatchItem::WBackPressureDisabled")
182            }
183            DispatchItem::KeepAliveTimeout => {
184                write!(fmt, "DispatchItem::KeepAliveTimeout")
185            }
186            DispatchItem::ReadTimeout => {
187                write!(fmt, "DispatchItem::ReadTimeout")
188            }
189            DispatchItem::EncoderError(ref e) => {
190                write!(fmt, "DispatchItem::EncoderError({:?})", e)
191            }
192            DispatchItem::DecoderError(ref e) => {
193                write!(fmt, "DispatchItem::DecoderError({:?})", e)
194            }
195            DispatchItem::Disconnect(ref e) => {
196                write!(fmt, "DispatchItem::Disconnect({:?})", e)
197            }
198        }
199    }
200}
201
#[cfg(test)]
mod tests {
    use super::*;
    use ntex_codec::BytesCodec;
    use std::io;

    /// Checks that the `Debug` output of the public enums names the variant.
    #[test]
    fn test_fmt() {
        type T = DispatchItem<BytesCodec>;

        // Every error-carrying variant is built from the same kind of io error.
        let io_err = || io::Error::new(io::ErrorKind::Other, "err");

        // Each item is paired with the text its rendering must contain.
        let cases = [
            (T::EncoderError(io_err()), "DispatchItem::Encoder"),
            (T::DecoderError(io_err()), "DispatchItem::Decoder"),
            (T::Disconnect(Some(io_err())), "DispatchItem::Disconnect"),
            (T::WBackPressureEnabled, "DispatchItem::WBackPressureEnabled"),
            (T::WBackPressureDisabled, "DispatchItem::WBackPressureDisabled"),
            (T::KeepAliveTimeout, "DispatchItem::KeepAliveTimeout"),
            (T::ReadTimeout, "DispatchItem::ReadTimeout"),
        ];
        for (item, needle) in &cases {
            let rendered = format!("{:?}", item);
            assert!(rendered.contains(*needle));
        }

        let status = format!("{:?}", IoStatusUpdate::KeepAlive);
        assert!(status.contains("KeepAlive"));

        let recv = format!("{:?}", RecvError::<BytesCodec>::KeepAlive);
        assert!(recv.contains("KeepAlive"));
    }
}