Skip to main content

ntex_io/
lib.rs

1//! Utilities for abstructing io streams
2#![deny(rust_2018_idioms, unreachable_pub, missing_debug_implementations)]
3#![allow(deprecated, async_fn_in_trait)]
4
5use std::io::{Error as IoError, Result as IoResult};
6use std::{any::Any, any::TypeId, fmt, task::Context, task::Poll};
7
8pub mod cfg;
9pub mod testing;
10pub mod types;
11
12mod buf;
13mod filter;
14mod flags;
15mod framed;
16mod io;
17mod ioref;
18mod macros;
19mod seal;
20mod tasks;
21mod timer;
22mod utils;
23
24use ntex_codec::Decoder;
25
26pub use self::buf::{FilterCtx, ReadBuf, WriteBuf};
27pub use self::cfg::IoConfig;
28pub use self::filter::{Base, Filter, FilterReadStatus, Layer};
29pub use self::framed::Framed;
30pub use self::io::{Io, IoRef, OnDisconnect};
31pub use self::seal::{IoBoxed, Sealed};
32pub use self::tasks::IoContext;
33pub use self::timer::TimerHandle;
34pub use self::utils::{Decoded, seal};
35
36#[doc(hidden)]
37pub use self::flags::Flags;
38
39/// Filter ready state
40#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
41pub enum Readiness {
42    /// Io task is clear to proceed with io operations
43    Ready,
44    /// Initiate graceful io shutdown operation
45    Shutdown,
46    /// Immediately terminate connection
47    Terminate,
48}
49
50impl Readiness {
51    /// Merge two Readiness values
52    pub fn merge(val1: Poll<Readiness>, val2: Poll<Readiness>) -> Poll<Readiness> {
53        match val1 {
54            Poll::Pending => Poll::Pending,
55            Poll::Ready(Readiness::Ready) => val2,
56            Poll::Ready(Readiness::Terminate) => Poll::Ready(Readiness::Terminate),
57            Poll::Ready(Readiness::Shutdown) => {
58                if val2 == Poll::Ready(Readiness::Terminate) {
59                    Poll::Ready(Readiness::Terminate)
60                } else {
61                    Poll::Ready(Readiness::Shutdown)
62                }
63            }
64        }
65    }
66}
67
68#[allow(unused_variables)]
69pub trait FilterLayer: fmt::Debug + 'static {
70    #[inline]
71    /// Check readiness for read operations
72    fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<Readiness> {
73        Poll::Ready(Readiness::Ready)
74    }
75
76    #[inline]
77    /// Check readiness for write operations
78    fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<Readiness> {
79        Poll::Ready(Readiness::Ready)
80    }
81
82    /// Process read buffer
83    ///
84    /// Inner filter must process buffer before current.
85    /// Returns number of new bytes.
86    fn process_read_buf(&self, buf: &ReadBuf<'_>) -> IoResult<usize>;
87
88    /// Process write buffer
89    fn process_write_buf(&self, buf: &WriteBuf<'_>) -> IoResult<()>;
90
91    #[inline]
92    /// Query internal filter data
93    fn query(&self, id: TypeId) -> Option<Box<dyn Any>> {
94        None
95    }
96
97    #[inline]
98    /// Gracefully shutdown filter
99    fn shutdown(&self, buf: &WriteBuf<'_>) -> IoResult<Poll<()>> {
100        Ok(Poll::Ready(()))
101    }
102}
103
104pub trait IoStream {
105    fn start(self, _: IoContext) -> Option<Box<dyn Handle>>;
106}
107
108pub trait Handle {
109    fn query(&self, id: TypeId) -> Option<Box<dyn Any>>;
110}
111
112/// Status for read task
113#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
114pub enum IoTaskStatus {
115    /// More io ops
116    Io,
117    /// Pause io task
118    Pause,
119    /// Stop io task
120    Stop,
121}
122
123impl IoTaskStatus {
124    #[inline]
125    /// Ready for more io ops
126    pub fn ready(self) -> bool {
127        self == IoTaskStatus::Io
128    }
129}
130
131/// Io status
132#[derive(Debug)]
133pub enum IoStatusUpdate {
134    /// Keep-alive timeout occured
135    KeepAlive,
136    /// Write backpressure is enabled
137    WriteBackpressure,
138    /// Peer is disconnected
139    PeerGone(Option<IoError>),
140}
141
142/// Recv error
143pub enum RecvError<U: Decoder> {
144    /// Keep-alive timeout occured
145    KeepAlive,
146    /// Write backpressure is enabled
147    WriteBackpressure,
148    /// Unrecoverable frame decoding errors
149    Decoder(U::Error),
150    /// Peer is disconnected
151    PeerGone(Option<IoError>),
152}
153
154impl<U> fmt::Debug for RecvError<U>
155where
156    U: Decoder,
157    <U as Decoder>::Error: fmt::Debug,
158{
159    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
160        match *self {
161            RecvError::KeepAlive => {
162                write!(fmt, "RecvError::KeepAlive")
163            }
164            RecvError::WriteBackpressure => {
165                write!(fmt, "RecvError::WriteBackpressure")
166            }
167            RecvError::Decoder(ref e) => {
168                write!(fmt, "RecvError::Decoder({e:?})")
169            }
170            RecvError::PeerGone(ref e) => {
171                write!(fmt, "RecvError::PeerGone({e:?})")
172            }
173        }
174    }
175}
176
177#[cfg(test)]
178mod tests {
179    use super::*;
180    use ntex_codec::BytesCodec;
181    use std::io;
182
183    #[test]
184    fn test_fmt() {
185        assert!(format!("{:?}", IoStatusUpdate::KeepAlive).contains("KeepAlive"));
186        assert!(format!("{:?}", RecvError::<BytesCodec>::KeepAlive).contains("KeepAlive"));
187        assert!(
188            format!("{:?}", RecvError::<BytesCodec>::WriteBackpressure)
189                .contains("WriteBackpressure")
190        );
191        assert!(
192            format!(
193                "{:?}",
194                RecvError::<BytesCodec>::Decoder(io::Error::other("err"))
195            )
196            .contains("RecvError::Decoder")
197        );
198        assert!(
199            format!(
200                "{:?}",
201                RecvError::<BytesCodec>::PeerGone(Some(io::Error::other("err")))
202            )
203            .contains("RecvError::PeerGone")
204        );
205    }
206}