ntex_io/
lib.rs

//! Utilities for abstracting io streams
2#![deny(rust_2018_idioms, unreachable_pub, missing_debug_implementations)]
3#![allow(async_fn_in_trait)]
4
5use std::io::{Error as IoError, Result as IoResult};
6use std::{any::Any, any::TypeId, fmt, task::Context, task::Poll};
7
8pub mod testing;
9pub mod types;
10
11mod buf;
12mod dispatcher;
13mod filter;
14mod flags;
15mod framed;
16mod io;
17mod ioref;
18mod macros;
19mod seal;
20mod tasks;
21mod timer;
22mod utils;
23
24use ntex_bytes::BytesVec;
25use ntex_codec::{Decoder, Encoder};
26
27pub use self::buf::{FilterCtx, ReadBuf, WriteBuf};
28pub use self::dispatcher::{Dispatcher, DispatcherConfig};
29pub use self::filter::{Base, Filter, FilterReadStatus, Layer};
30pub use self::framed::Framed;
31pub use self::io::{Io, IoRef, OnDisconnect};
32pub use self::seal::{IoBoxed, Sealed};
33pub use self::tasks::{IoContext, ReadContext, WriteContext, WriteContextBuf};
34pub use self::timer::TimerHandle;
35pub use self::utils::{seal, Decoded};
36
37#[doc(hidden)]
38pub use self::flags::Flags;
39
40#[doc(hidden)]
41pub trait AsyncRead {
42    async fn read(&mut self, buf: BytesVec) -> (BytesVec, IoResult<usize>);
43}
44
45#[doc(hidden)]
46pub trait AsyncWrite {
47    async fn write(&mut self, buf: &mut WriteContextBuf) -> IoResult<()>;
48
49    async fn flush(&mut self) -> IoResult<()>;
50
51    async fn shutdown(&mut self) -> IoResult<()>;
52}
53
/// Readiness state reported by a filter.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum Readiness {
    /// The io task may go ahead with io operations.
    Ready,
    /// A graceful io shutdown should be initiated.
    Shutdown,
    /// The connection must be terminated immediately.
    Terminate,
}

impl Readiness {
    /// Merge two `Readiness` poll results into a single one.
    ///
    /// The first value drives the outcome:
    ///
    /// * `Pending` yields `Pending`, whatever `val2` is;
    /// * `Ready(Ready)` yields `val2` unchanged;
    /// * `Ready(Terminate)` yields `Ready(Terminate)`;
    /// * `Ready(Shutdown)` yields `Ready(Terminate)` when `val2` is
    ///   `Ready(Terminate)` and `Ready(Shutdown)` for every other `val2`
    ///   (including `Pending`).
    pub fn merge(val1: Poll<Readiness>, val2: Poll<Readiness>) -> Poll<Readiness> {
        match (val1, val2) {
            (Poll::Pending, _) => Poll::Pending,
            (Poll::Ready(Self::Ready), second) => second,
            (Poll::Ready(Self::Terminate), _)
            | (Poll::Ready(Self::Shutdown), Poll::Ready(Self::Terminate)) => {
                Poll::Ready(Self::Terminate)
            }
            (Poll::Ready(Self::Shutdown), _) => Poll::Ready(Self::Shutdown),
        }
    }
}
82
83#[allow(unused_variables)]
84pub trait FilterLayer: fmt::Debug + 'static {
85    #[inline]
86    /// Check readiness for read operations
87    fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<Readiness> {
88        Poll::Ready(Readiness::Ready)
89    }
90
91    #[inline]
92    /// Check readiness for write operations
93    fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<Readiness> {
94        Poll::Ready(Readiness::Ready)
95    }
96
97    /// Process read buffer
98    ///
99    /// Inner filter must process buffer before current.
100    /// Returns number of new bytes.
101    fn process_read_buf(&self, buf: &ReadBuf<'_>) -> IoResult<usize>;
102
103    /// Process write buffer
104    fn process_write_buf(&self, buf: &WriteBuf<'_>) -> IoResult<()>;
105
106    #[inline]
107    /// Query internal filter data
108    fn query(&self, id: TypeId) -> Option<Box<dyn Any>> {
109        None
110    }
111
112    #[inline]
113    /// Gracefully shutdown filter
114    fn shutdown(&self, buf: &WriteBuf<'_>) -> IoResult<Poll<()>> {
115        Ok(Poll::Ready(()))
116    }
117}
118
119pub trait IoStream {
120    fn start(self, _: ReadContext, _: WriteContext) -> Option<Box<dyn Handle>>;
121}
122
/// Handle optionally returned by [`IoStream::start`].
pub trait Handle {
    /// Looks up a value by its `TypeId`.
    ///
    /// Returns the value boxed as `dyn Any`, or `None` when nothing is
    /// available for `id`.
    fn query(&self, id: TypeId) -> Option<Box<dyn Any>>;
}
126
/// Status of a read task.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum IoTaskStatus {
    /// More io operations should be performed.
    Io,
    /// The io task should be paused.
    Pause,
}
135
/// Io status update.
#[derive(Debug)]
pub enum IoStatusUpdate {
    /// Keep-alive timeout occurred.
    KeepAlive,
    /// Write backpressure is enabled.
    WriteBackpressure,
    /// Io stream handling must stop.
    Stop,
    /// Peer is disconnected; carries the io error when there is one.
    PeerGone(Option<IoError>),
}
148
149/// Recv error
150#[derive(Debug)]
151pub enum RecvError<U: Decoder> {
152    /// Keep-alive timeout occured
153    KeepAlive,
154    /// Write backpressure is enabled
155    WriteBackpressure,
156    /// Stop io stream handling
157    Stop,
158    /// Unrecoverable frame decoding errors
159    Decoder(U::Error),
160    /// Peer is disconnected
161    PeerGone(Option<IoError>),
162}
163
164/// Dispatcher item
165pub enum DispatchItem<U: Encoder + Decoder> {
166    Item(<U as Decoder>::Item),
167    /// Write back-pressure enabled
168    WBackPressureEnabled,
169    /// Write back-pressure disabled
170    WBackPressureDisabled,
171    /// Keep alive timeout
172    KeepAliveTimeout,
173    /// Frame read timeout
174    ReadTimeout,
175    /// Decoder parse error
176    DecoderError(<U as Decoder>::Error),
177    /// Encoder parse error
178    EncoderError(<U as Encoder>::Error),
179    /// Socket is disconnected
180    Disconnect(Option<IoError>),
181}
182
183impl<U> fmt::Debug for DispatchItem<U>
184where
185    U: Encoder + Decoder,
186    <U as Decoder>::Item: fmt::Debug,
187{
188    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
189        match *self {
190            DispatchItem::Item(ref item) => {
191                write!(fmt, "DispatchItem::Item({item:?})")
192            }
193            DispatchItem::WBackPressureEnabled => {
194                write!(fmt, "DispatchItem::WBackPressureEnabled")
195            }
196            DispatchItem::WBackPressureDisabled => {
197                write!(fmt, "DispatchItem::WBackPressureDisabled")
198            }
199            DispatchItem::KeepAliveTimeout => {
200                write!(fmt, "DispatchItem::KeepAliveTimeout")
201            }
202            DispatchItem::ReadTimeout => {
203                write!(fmt, "DispatchItem::ReadTimeout")
204            }
205            DispatchItem::EncoderError(ref e) => {
206                write!(fmt, "DispatchItem::EncoderError({e:?})")
207            }
208            DispatchItem::DecoderError(ref e) => {
209                write!(fmt, "DispatchItem::DecoderError({e:?})")
210            }
211            DispatchItem::Disconnect(ref e) => {
212                write!(fmt, "DispatchItem::Disconnect({e:?})")
213            }
214        }
215    }
216}
217
#[cfg(test)]
mod tests {
    use super::*;
    use ntex_codec::BytesCodec;
    use std::io;

    /// The `Debug` output of each value must contain the expected marker.
    #[test]
    fn test_fmt() {
        type T = DispatchItem<BytesCodec>;

        // (value, substring its `Debug` rendering has to contain)
        let dispatch_cases = [
            (
                T::EncoderError(io::Error::other("err")),
                "DispatchItem::Encoder",
            ),
            (
                T::DecoderError(io::Error::other("err")),
                "DispatchItem::Decoder",
            ),
            (
                T::Disconnect(Some(io::Error::other("err"))),
                "DispatchItem::Disconnect",
            ),
            (
                T::WBackPressureEnabled,
                "DispatchItem::WBackPressureEnabled",
            ),
            (
                T::WBackPressureDisabled,
                "DispatchItem::WBackPressureDisabled",
            ),
            (T::KeepAliveTimeout, "DispatchItem::KeepAliveTimeout"),
            (T::ReadTimeout, "DispatchItem::ReadTimeout"),
        ];
        for (item, marker) in dispatch_cases {
            let rendered = format!("{item:?}");
            assert!(rendered.contains(marker));
        }

        // The derived `Debug` impls print the bare variant name.
        let status = format!("{:?}", IoStatusUpdate::KeepAlive);
        assert!(status.contains("KeepAlive"));
        let recv = format!("{:?}", RecvError::<BytesCodec>::KeepAlive);
        assert!(recv.contains("KeepAlive"));
    }
}