Skip to main content

ntex_io/
lib.rs

//! Utilities for abstracting io streams
2#![deny(clippy::pedantic)]
3#![allow(
4    clippy::missing_fields_in_debug,
5    clippy::must_use_candidate,
6    clippy::missing_errors_doc
7)]
8
9use std::io::{Error as IoError, Result as IoResult};
10use std::{any::Any, any::TypeId, fmt, task::Context, task::Poll};
11
12pub mod cfg;
13pub mod testing;
14pub mod types;
15
16mod buf;
17mod filter;
18mod filterptr;
19mod flags;
20mod framed;
21mod io;
22mod ioref;
23mod macros;
24mod seal;
25mod tasks;
26mod timer;
27mod utils;
28
29use ntex_codec::Decoder;
30
31pub use self::buf::{FilterCtx, ReadBuf, WriteBuf};
32pub use self::cfg::IoConfig;
33pub use self::filter::{Base, Filter, FilterReadStatus, Layer};
34pub use self::framed::Framed;
35pub use self::io::{Io, IoRef, OnDisconnect};
36pub use self::seal::{IoBoxed, Sealed};
37pub use self::tasks::IoContext;
38pub use self::timer::TimerHandle;
39pub use self::utils::{Decoded, seal};
40
41#[doc(hidden)]
42pub use self::flags::Flags;
43
/// Filter ready state
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub enum Readiness {
    /// Io task is clear to proceed with io operations
    Ready,
    /// Initiate graceful io shutdown operation
    Shutdown,
    /// Immediately terminate connection
    Terminate,
}

impl Readiness {
    /// Merge two `Readiness` values
    ///
    /// The first value takes precedence, so the merge is *not* commutative:
    ///
    /// * `val1` is `Poll::Pending` - the result is `Poll::Pending`,
    ///   whatever `val2` is.
    /// * `val1` is `Ready` - `val2` is returned unchanged.
    /// * `val1` is `Terminate` - the result is `Terminate`.
    /// * `val1` is `Shutdown` - the result is `Terminate` if `val2` is a
    ///   ready `Terminate`, otherwise `Shutdown` (also when `val2` is
    ///   still pending).
    pub fn merge(val1: Poll<Readiness>, val2: Poll<Readiness>) -> Poll<Readiness> {
        // A pending first value short-circuits the whole merge.
        let Poll::Ready(first) = val1 else {
            return Poll::Pending;
        };

        match first {
            Readiness::Ready => val2,
            Readiness::Terminate => Poll::Ready(Readiness::Terminate),
            Readiness::Shutdown => {
                // Only an explicit terminate request outranks a shutdown.
                let escalate = matches!(val2, Poll::Ready(Readiness::Terminate));
                Poll::Ready(if escalate {
                    Readiness::Terminate
                } else {
                    Readiness::Shutdown
                })
            }
        }
    }
}
72
73#[allow(unused_variables)]
74pub trait FilterLayer: fmt::Debug + 'static {
75    #[inline]
76    /// Check readiness for read operations
77    fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<Readiness> {
78        Poll::Ready(Readiness::Ready)
79    }
80
81    #[inline]
82    /// Check readiness for write operations
83    fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<Readiness> {
84        Poll::Ready(Readiness::Ready)
85    }
86
87    /// Process read buffer
88    ///
89    /// Inner filter must process buffer before current.
90    /// Returns number of new bytes.
91    fn process_read_buf(&self, buf: &ReadBuf<'_>) -> IoResult<usize>;
92
93    /// Process write buffer
94    fn process_write_buf(&self, buf: &WriteBuf<'_>) -> IoResult<()>;
95
96    #[inline]
97    /// Query internal filter data
98    fn query(&self, id: TypeId) -> Option<Box<dyn Any>> {
99        None
100    }
101
102    #[inline]
103    /// Gracefully shutdown filter
104    fn shutdown(&self, buf: &WriteBuf<'_>) -> IoResult<Poll<()>> {
105        Ok(Poll::Ready(()))
106    }
107}
108
109pub trait IoStream {
110    fn start(self, _: IoContext) -> Option<Box<dyn Handle>>;
111}
112
/// Type-erased handle that can be queried for data by type
pub trait Handle {
    /// Look up a value by its `TypeId`
    ///
    /// Returns the value boxed as `dyn Any`, so the caller has to downcast
    /// it. NOTE(review): `None` presumably means the handle has no value of
    /// the requested type - confirm against the implementors.
    fn query(&self, id: TypeId) -> Option<Box<dyn Any>>;
}
116
/// Status for read task
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub enum IoTaskStatus {
    /// More io ops
    Io,
    /// Pause io task
    Pause,
    /// Stop io task
    Stop,
}

impl IoTaskStatus {
    /// Ready for more io ops
    ///
    /// Returns `true` only for `IoTaskStatus::Io`; both `Pause` and `Stop`
    /// yield `false`.
    #[inline]
    pub fn ready(self) -> bool {
        matches!(self, IoTaskStatus::Io)
    }
}
135
/// Io status
#[derive(Debug)]
pub enum IoStatusUpdate {
    /// Keep-alive timeout occurred
    KeepAlive,
    /// Write backpressure is enabled
    WriteBackpressure,
    /// Peer is disconnected; carries the associated io error, if any
    PeerGone(Option<IoError>),
}
146
147/// Recv error
148pub enum RecvError<U: Decoder> {
149    /// Keep-alive timeout occured
150    KeepAlive,
151    /// Write backpressure is enabled
152    WriteBackpressure,
153    /// Unrecoverable frame decoding errors
154    Decoder(U::Error),
155    /// Peer is disconnected
156    PeerGone(Option<IoError>),
157}
158
159impl<U> fmt::Debug for RecvError<U>
160where
161    U: Decoder,
162    <U as Decoder>::Error: fmt::Debug,
163{
164    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
165        match *self {
166            RecvError::KeepAlive => {
167                write!(fmt, "RecvError::KeepAlive")
168            }
169            RecvError::WriteBackpressure => {
170                write!(fmt, "RecvError::WriteBackpressure")
171            }
172            RecvError::Decoder(ref e) => {
173                write!(fmt, "RecvError::Decoder({e:?})")
174            }
175            RecvError::PeerGone(ref e) => {
176                write!(fmt, "RecvError::PeerGone({e:?})")
177            }
178        }
179    }
180}
181
#[cfg(test)]
mod tests {
    use super::*;
    use ntex_codec::BytesCodec;
    use std::io;

    // Checks that the `Debug` output of the status/error enums names the
    // variant being formatted.
    #[test]
    fn test_fmt() {
        assert!(format!("{:?}", IoStatusUpdate::KeepAlive).contains("KeepAlive"));
        assert!(format!("{:?}", RecvError::<BytesCodec>::KeepAlive).contains("KeepAlive"));
        assert!(
            format!("{:?}", RecvError::<BytesCodec>::WriteBackpressure)
                .contains("WriteBackpressure")
        );
        assert!(
            format!(
                "{:?}",
                RecvError::<BytesCodec>::Decoder(io::Error::other("err"))
            )
            .contains("RecvError::Decoder")
        );
        assert!(
            format!(
                "{:?}",
                RecvError::<BytesCodec>::PeerGone(Some(io::Error::other("err")))
            )
            .contains("RecvError::PeerGone")
        );
    }
}