Skip to main content

ntex_io/
lib.rs

//! Utilities for abstracting io streams
2#![deny(rust_2018_idioms, unreachable_pub, missing_debug_implementations)]
3#![allow(deprecated, async_fn_in_trait)]
4
5use std::io::{Error as IoError, Result as IoResult};
6use std::{any::Any, any::TypeId, fmt, task::Context, task::Poll};
7
8pub mod cfg;
9pub mod testing;
10pub mod types;
11
12mod buf;
13mod dispatcher;
14mod filter;
15mod flags;
16mod framed;
17mod io;
18mod ioref;
19mod macros;
20mod seal;
21mod tasks;
22mod timer;
23mod utils;
24
25use ntex_codec::{Decoder, Encoder};
26
27pub use self::buf::{FilterCtx, ReadBuf, WriteBuf};
28pub use self::cfg::IoConfig;
29pub use self::dispatcher::Dispatcher;
30pub use self::filter::{Base, Filter, FilterReadStatus, Layer};
31pub use self::framed::Framed;
32pub use self::io::{Io, IoRef, OnDisconnect};
33pub use self::seal::{IoBoxed, Sealed};
34pub use self::tasks::IoContext;
35pub use self::timer::TimerHandle;
36pub use self::utils::{Decoded, seal};
37
38#[doc(hidden)]
39pub use self::flags::Flags;
40
/// Readiness state reported by a filter
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub enum Readiness {
    /// Io task may go ahead with io operations
    Ready,
    /// A graceful io shutdown should be initiated
    Shutdown,
    /// The connection must be terminated immediately
    Terminate,
}

impl Readiness {
    /// Combine two readiness values into a single one
    ///
    /// The first value takes precedence:
    ///
    /// * `val1` is `Pending` - the result is `Pending`, whatever `val2` is;
    /// * `val1` is `Ready(Ready)` - the result is `val2`, unchanged;
    /// * `val1` is `Ready(Terminate)` - the result is `Ready(Terminate)`;
    /// * `val1` is `Ready(Shutdown)` - the result is `Ready(Terminate)` if
    ///   `val2` is `Ready(Terminate)`, otherwise `Ready(Shutdown)`.
    pub fn merge(val1: Poll<Readiness>, val2: Poll<Readiness>) -> Poll<Readiness> {
        match (val1, val2) {
            // a pending first value hides whatever the second one reports
            (Poll::Pending, _) => Poll::Pending,
            (Poll::Ready(Self::Ready), second) => second,
            (Poll::Ready(Self::Terminate), _)
            | (Poll::Ready(Self::Shutdown), Poll::Ready(Self::Terminate)) => {
                Poll::Ready(Self::Terminate)
            }
            (Poll::Ready(Self::Shutdown), _) => Poll::Ready(Self::Shutdown),
        }
    }
}
69
70#[allow(unused_variables)]
71pub trait FilterLayer: fmt::Debug + 'static {
72    #[inline]
73    /// Check readiness for read operations
74    fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<Readiness> {
75        Poll::Ready(Readiness::Ready)
76    }
77
78    #[inline]
79    /// Check readiness for write operations
80    fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<Readiness> {
81        Poll::Ready(Readiness::Ready)
82    }
83
84    /// Process read buffer
85    ///
86    /// Inner filter must process buffer before current.
87    /// Returns number of new bytes.
88    fn process_read_buf(&self, buf: &ReadBuf<'_>) -> IoResult<usize>;
89
90    /// Process write buffer
91    fn process_write_buf(&self, buf: &WriteBuf<'_>) -> IoResult<()>;
92
93    #[inline]
94    /// Query internal filter data
95    fn query(&self, id: TypeId) -> Option<Box<dyn Any>> {
96        None
97    }
98
99    #[inline]
100    /// Gracefully shutdown filter
101    fn shutdown(&self, buf: &WriteBuf<'_>) -> IoResult<Poll<()>> {
102        Ok(Poll::Ready(()))
103    }
104}
105
106pub trait IoStream {
107    fn start(self, _: IoContext) -> Option<Box<dyn Handle>>;
108}
109
/// Type-erased query interface
pub trait Handle {
    /// Look up data by type id
    ///
    /// Returns a boxed value for `id`, or `None` when nothing is available.
    // NOTE(review): presumably the boxed value is of the type identified by
    // `id`; confirm against implementors.
    fn query(&self, id: TypeId) -> Option<Box<dyn Any>>;
}
113
/// Status reported for a read task
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub enum IoTaskStatus {
    /// Continue with more io ops
    Io,
    /// Io task should pause
    Pause,
    /// Io task should stop
    Stop,
}

impl IoTaskStatus {
    /// Ready for more io ops
    ///
    /// Returns `true` only for `IoTaskStatus::Io`.
    #[inline]
    pub fn ready(self) -> bool {
        matches!(self, Self::Io)
    }
}
132
/// Io status update
#[derive(Debug)]
pub enum IoStatusUpdate {
    /// Keep-alive timeout occurred
    KeepAlive,
    /// Write backpressure is enabled
    WriteBackpressure,
    /// Peer is disconnected, optionally with the io error that was recorded
    PeerGone(Option<IoError>),
}
143
144/// Recv error
145pub enum RecvError<U: Decoder> {
146    /// Keep-alive timeout occured
147    KeepAlive,
148    /// Write backpressure is enabled
149    WriteBackpressure,
150    /// Unrecoverable frame decoding errors
151    Decoder(U::Error),
152    /// Peer is disconnected
153    PeerGone(Option<IoError>),
154}
155
156impl<U> fmt::Debug for RecvError<U>
157where
158    U: Decoder,
159    <U as Decoder>::Error: fmt::Debug,
160{
161    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
162        match *self {
163            RecvError::KeepAlive => {
164                write!(fmt, "RecvError::KeepAlive")
165            }
166            RecvError::WriteBackpressure => {
167                write!(fmt, "RecvError::WriteBackpressure")
168            }
169            RecvError::Decoder(ref e) => {
170                write!(fmt, "RecvError::Decoder({e:?})")
171            }
172            RecvError::PeerGone(ref e) => {
173                write!(fmt, "RecvError::PeerGone({e:?})")
174            }
175        }
176    }
177}
178
179#[deprecated]
180/// Dispatcher item
181pub enum DispatchItem<U: Encoder + Decoder> {
182    Item(<U as Decoder>::Item),
183    /// Write back-pressure enabled
184    WBackPressureEnabled,
185    /// Write back-pressure disabled
186    WBackPressureDisabled,
187    /// Keep alive timeout
188    KeepAliveTimeout,
189    /// Frame read timeout
190    ReadTimeout,
191    /// Decoder parse error
192    DecoderError(<U as Decoder>::Error),
193    /// Encoder parse error
194    EncoderError(<U as Encoder>::Error),
195    /// Socket is disconnected
196    Disconnect(Option<IoError>),
197}
198
199impl<U> fmt::Debug for DispatchItem<U>
200where
201    U: Encoder + Decoder,
202    <U as Decoder>::Item: fmt::Debug,
203{
204    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
205        match *self {
206            DispatchItem::Item(ref item) => {
207                write!(fmt, "DispatchItem::Item({item:?})")
208            }
209            DispatchItem::WBackPressureEnabled => {
210                write!(fmt, "DispatchItem::WBackPressureEnabled")
211            }
212            DispatchItem::WBackPressureDisabled => {
213                write!(fmt, "DispatchItem::WBackPressureDisabled")
214            }
215            DispatchItem::KeepAliveTimeout => {
216                write!(fmt, "DispatchItem::KeepAliveTimeout")
217            }
218            DispatchItem::ReadTimeout => {
219                write!(fmt, "DispatchItem::ReadTimeout")
220            }
221            DispatchItem::EncoderError(ref e) => {
222                write!(fmt, "DispatchItem::EncoderError({e:?})")
223            }
224            DispatchItem::DecoderError(ref e) => {
225                write!(fmt, "DispatchItem::DecoderError({e:?})")
226            }
227            DispatchItem::Disconnect(ref e) => {
228                write!(fmt, "DispatchItem::Disconnect({e:?})")
229            }
230        }
231    }
232}
233
#[cfg(test)]
mod tests {
    use super::*;
    use ntex_codec::BytesCodec;
    use std::io;

    /// Render a value through its `Debug` implementation.
    fn debug(value: impl std::fmt::Debug) -> String {
        format!("{value:?}")
    }

    /// Fresh io error used as payload for the error-carrying variants.
    fn io_err() -> io::Error {
        io::Error::other("err")
    }

    /// Debug output of the public status/error types names the variant.
    #[test]
    fn test_fmt() {
        type T = DispatchItem<BytesCodec>;
        type R = RecvError<BytesCodec>;

        // dispatcher items carrying a payload
        assert!(debug(T::EncoderError(io_err())).contains("DispatchItem::Encoder"));
        assert!(debug(T::DecoderError(io_err())).contains("DispatchItem::Decoder"));
        assert!(debug(T::Disconnect(Some(io_err()))).contains("DispatchItem::Disconnect"));

        // payload-free dispatcher items
        assert!(debug(T::WBackPressureEnabled).contains("DispatchItem::WBackPressureEnabled"));
        assert!(
            debug(T::WBackPressureDisabled).contains("DispatchItem::WBackPressureDisabled")
        );
        assert!(debug(T::KeepAliveTimeout).contains("DispatchItem::KeepAliveTimeout"));
        assert!(debug(T::ReadTimeout).contains("DispatchItem::ReadTimeout"));

        // io status updates and receive errors
        assert!(debug(IoStatusUpdate::KeepAlive).contains("KeepAlive"));
        assert!(debug(R::KeepAlive).contains("KeepAlive"));
        assert!(debug(R::WriteBackpressure).contains("WriteBackpressure"));
        assert!(debug(R::Decoder(io_err())).contains("RecvError::Decoder"));
        assert!(debug(R::PeerGone(Some(io_err()))).contains("RecvError::PeerGone"));
    }
}