Skip to main content

ntex_io/
lib.rs

//! Utilities for abstracting io streams
2#![deny(clippy::pedantic)]
3#![allow(
4    clippy::missing_fields_in_debug,
5    clippy::must_use_candidate,
6    clippy::return_self_not_must_use,
7    clippy::missing_errors_doc
8)]
9
10use std::io::{Error as IoError, Result as IoResult};
11use std::{any::Any, any::TypeId, fmt, task::Context, task::Poll};
12
13pub mod cfg;
14pub mod testing;
15pub mod types;
16
17mod buf;
18mod filter;
19mod filterptr;
20mod flags;
21mod framed;
22mod io;
23mod ioref;
24mod macros;
25mod seal;
26mod tasks;
27mod timer;
28mod utils;
29
30use ntex_codec::Decoder;
31
32pub use self::buf::{FilterCtx, ReadBuf, WriteBuf};
33pub use self::cfg::IoConfig;
34pub use self::filter::{Base, Filter, FilterReadStatus, Layer};
35pub use self::framed::Framed;
36pub use self::io::{Io, IoRef, OnDisconnect};
37pub use self::seal::{IoBoxed, Sealed};
38pub use self::tasks::IoContext;
39pub use self::timer::TimerHandle;
40pub use self::utils::{Decoded, seal};
41
42#[doc(hidden)]
43pub use self::flags::Flags;
44
/// Readiness state of a filter
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub enum Readiness {
    /// The io task may go ahead with io operations
    Ready,
    /// A graceful io shutdown should be initiated
    Shutdown,
    /// The connection should be terminated immediately
    Terminate,
}

impl Readiness {
    /// Combine two polled readiness values into one.
    ///
    /// The first value drives the outcome:
    ///
    /// * `Poll::Pending` yields `Poll::Pending`, whatever `val2` is.
    /// * `Ready` yields `val2` unchanged.
    /// * `Terminate` yields `Terminate`.
    /// * `Shutdown` yields `Terminate` when `val2` is `Terminate`, and
    ///   `Shutdown` otherwise (including when `val2` is still pending).
    pub fn merge(val1: Poll<Readiness>, val2: Poll<Readiness>) -> Poll<Readiness> {
        match (val1, val2) {
            (Poll::Pending, _) => Poll::Pending,
            (Poll::Ready(Self::Ready), second) => second,
            // Termination wins over a graceful shutdown.
            (Poll::Ready(Self::Terminate), _)
            | (Poll::Ready(Self::Shutdown), Poll::Ready(Self::Terminate)) => {
                Poll::Ready(Self::Terminate)
            }
            (Poll::Ready(Self::Shutdown), _) => Poll::Ready(Self::Shutdown),
        }
    }
}
73
74#[allow(unused_variables)]
75pub trait FilterLayer: fmt::Debug + 'static {
76    #[inline]
77    /// Check readiness for read operations
78    fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<Readiness> {
79        Poll::Ready(Readiness::Ready)
80    }
81
82    #[inline]
83    /// Check readiness for write operations
84    fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<Readiness> {
85        Poll::Ready(Readiness::Ready)
86    }
87
88    /// Process read buffer
89    ///
90    /// Inner filter must process buffer before current.
91    /// Returns number of new bytes.
92    fn process_read_buf(&self, buf: &ReadBuf<'_>) -> IoResult<usize>;
93
94    /// Process write buffer
95    fn process_write_buf(&self, buf: &WriteBuf<'_>) -> IoResult<()>;
96
97    #[inline]
98    /// Query internal filter data
99    fn query(&self, id: TypeId) -> Option<Box<dyn Any>> {
100        None
101    }
102
103    #[inline]
104    /// Gracefully shutdown filter
105    fn shutdown(&self, buf: &WriteBuf<'_>) -> IoResult<Poll<()>> {
106        Ok(Poll::Ready(()))
107    }
108}
109
110pub trait IoStream {
111    fn start(self, _: IoContext) -> Option<Box<dyn Handle>>;
112}
113
/// Type-erased query interface.
pub trait Handle {
    /// Query data by [`TypeId`].
    ///
    /// Returns the value boxed as `dyn Any`, or `None` when the implementor
    /// has nothing to return for `id`.
    fn query(&self, id: TypeId) -> Option<Box<dyn Any>>;
}
117
/// Status of a read task
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub enum IoTaskStatus {
    /// Continue with more io operations
    Io,
    /// The io task should pause
    Pause,
    /// The io task should stop
    Stop,
}

impl IoTaskStatus {
    /// Whether the task is ready for more io operations.
    ///
    /// Returns `true` only for [`IoTaskStatus::Io`].
    #[inline]
    pub fn ready(self) -> bool {
        matches!(self, IoTaskStatus::Io)
    }
}
136
/// Io status update
#[derive(Debug)]
pub enum IoStatusUpdate {
    /// A keep-alive timeout occurred
    KeepAlive,
    /// Write backpressure is enabled
    WriteBackpressure,
    /// The peer is disconnected; carries the io error, if there is one
    PeerGone(Option<IoError>),
}
147
148/// Recv error
149pub enum RecvError<U: Decoder> {
150    /// Keep-alive timeout occured
151    KeepAlive,
152    /// Write backpressure is enabled
153    WriteBackpressure,
154    /// Unrecoverable frame decoding errors
155    Decoder(U::Error),
156    /// Peer is disconnected
157    PeerGone(Option<IoError>),
158}
159
160impl<U> fmt::Debug for RecvError<U>
161where
162    U: Decoder,
163    <U as Decoder>::Error: fmt::Debug,
164{
165    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
166        match *self {
167            RecvError::KeepAlive => {
168                write!(fmt, "RecvError::KeepAlive")
169            }
170            RecvError::WriteBackpressure => {
171                write!(fmt, "RecvError::WriteBackpressure")
172            }
173            RecvError::Decoder(ref e) => {
174                write!(fmt, "RecvError::Decoder({e:?})")
175            }
176            RecvError::PeerGone(ref e) => {
177                write!(fmt, "RecvError::PeerGone({e:?})")
178            }
179        }
180    }
181}
182
#[cfg(test)]
mod tests {
    use super::*;
    use ntex_codec::BytesCodec;
    use std::io;

    /// `RecvError` specialised to the codec used throughout this test.
    type Recv = RecvError<BytesCodec>;

    /// The `Debug` output of the status and error enums names the variant.
    #[test]
    fn test_fmt() {
        let status = format!("{:?}", IoStatusUpdate::KeepAlive);
        assert!(status.contains("KeepAlive"));

        let keep_alive = format!("{:?}", Recv::KeepAlive);
        assert!(keep_alive.contains("KeepAlive"));

        let backpressure = format!("{:?}", Recv::WriteBackpressure);
        assert!(backpressure.contains("WriteBackpressure"));

        let decoder = format!("{:?}", Recv::Decoder(io::Error::other("err")));
        assert!(decoder.contains("RecvError::Decoder"));

        let peer_gone = format!("{:?}", Recv::PeerGone(Some(io::Error::other("err"))));
        assert!(peer_gone.contains("RecvError::PeerGone"));
    }
}