Skip to main content

ntex_io/
lib.rs

1//! Utilities for abstracting I/O streams
2#![deny(clippy::pedantic)]
3#![allow(
4    clippy::missing_fields_in_debug,
5    clippy::missing_errors_doc,
6    clippy::missing_panics_doc,
7    clippy::must_use_candidate
8)]
9use std::io::{Error as IoError, Result as IoResult};
10use std::{any::Any, any::TypeId, fmt, task::Poll};
11
12pub mod cfg;
13pub mod testing;
14pub mod types;
15
16mod buf;
17mod ctx;
18mod filter;
19mod filterptr;
20mod flags;
21mod framed;
22mod io;
23mod ioref;
24mod macros;
25mod ops;
26mod seal;
27mod utils;
28
29use ntex_codec::Decoder;
30
31pub use self::buf::{FilterBuf, FilterCtx};
32pub use self::cfg::IoConfig;
33pub use self::ctx::IoContext;
34pub use self::filter::{Base, Filter, Layer};
35pub use self::framed::Framed;
36pub use self::io::{Io, IoRef, OnDisconnect};
37pub use self::ops::{Id, TimerHandle};
38pub use self::seal::{IoBoxed, Sealed};
39pub use self::utils::{Decoded, seal};
40
41#[doc(hidden)]
42pub use self::flags::Flags;
43
/// Readiness state reported by a filter.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Readiness {
    /// I/O operations may go ahead.
    Ready,
    /// A graceful I/O shutdown is being initiated.
    Shutdown,
    /// The I/O stream is terminated immediately.
    Terminate,
}

impl Readiness {
    /// Combines two polled readiness states into a single one.
    ///
    /// The first argument takes precedence, so the operation is not symmetric:
    ///
    /// * `val1` pending: the result is pending, whatever `val2` is.
    /// * `val1` is `Ready`: `val2` is returned unchanged.
    /// * `val1` is `Terminate`: the result is `Terminate`.
    /// * `val1` is `Shutdown`: the result is `Terminate` if `val2` is
    ///   `Terminate`, and `Shutdown` otherwise (including when `val2` is
    ///   still pending).
    pub fn merge(val1: Poll<Readiness>, val2: Poll<Readiness>) -> Poll<Readiness> {
        match (val1, val2) {
            (Poll::Pending, _) => Poll::Pending,
            (Poll::Ready(Readiness::Ready), second) => second,
            (Poll::Ready(Readiness::Terminate), _)
            | (Poll::Ready(Readiness::Shutdown), Poll::Ready(Readiness::Terminate)) => {
                Poll::Ready(Readiness::Terminate)
            }
            (Poll::Ready(Readiness::Shutdown), _) => Poll::Ready(Readiness::Shutdown),
        }
    }
}
72
73#[allow(unused_variables)]
74pub trait FilterLayer: fmt::Debug + 'static {
75    /// Accesses internal filter information.
76    fn query(&self, id: TypeId) -> Option<Box<dyn Any>> {
77        None
78    }
79
80    /// Processes incoming read-buffer data.
81    fn process_read_buf(&self, buf: &FilterBuf<'_>) -> IoResult<()>;
82
83    /// Processes outgoing write-buffer data.
84    fn process_write_buf(&self, buf: &FilterBuf<'_>) -> IoResult<()>;
85
86    /// Performs a graceful shutdown of the filter.
87    fn shutdown(&self, buf: &FilterBuf<'_>) -> IoResult<Poll<()>> {
88        Ok(Poll::Ready(()))
89    }
90}
91
92pub trait IoStream {
93    fn start(self, _: IoContext) -> Box<dyn Handle>;
94}
95
96pub trait Handle {
97    fn query(&self, _: TypeId) -> Option<Box<dyn Any>> {
98        None
99    }
100
101    #[inline]
102    /// Initiate io write operation
103    fn write(&self, _: &IoContext) {}
104
105    #[inline]
106    /// Called when readiness changes
107    fn notify(&self, ctx: &IoContext) {
108        ctx.notify();
109    }
110}
111
/// Status that tells an I/O task how to proceed.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum IoTaskStatus {
    /// Keep performing I/O operations.
    Io,
    /// Temporarily pause I/O processing.
    Pause,
    /// Stop the I/O task.
    Stop,
}
122
/// Events describing a change in I/O status.
#[derive(Debug)]
pub enum IoStatusUpdate {
    /// The keep-alive timeout has fired.
    KeepAlive,
    /// Write backpressure is currently in effect.
    WriteBackpressure,
    /// The peer has disconnected; carries an optional I/O error.
    PeerGone(Option<IoError>),
}
133
134/// Errors that can occur while receiving data.
135pub enum RecvError<U: Decoder> {
136    /// A keep-alive timeout occurred.
137    KeepAlive,
138    /// Write backpressure is currently active.
139    WriteBackpressure,
140    /// Failed to decode an incoming frame.
141    Decoder(U::Error),
142    /// The peer has disconnected.
143    PeerGone(Option<IoError>),
144}
145
146impl<U> fmt::Debug for RecvError<U>
147where
148    U: Decoder,
149    <U as Decoder>::Error: fmt::Debug,
150{
151    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
152        match *self {
153            RecvError::KeepAlive => {
154                write!(fmt, "RecvError::KeepAlive")
155            }
156            RecvError::WriteBackpressure => {
157                write!(fmt, "RecvError::WriteBackpressure")
158            }
159            RecvError::Decoder(ref e) => {
160                write!(fmt, "RecvError::Decoder({e:?})")
161            }
162            RecvError::PeerGone(ref e) => {
163                write!(fmt, "RecvError::PeerGone({e:?})")
164            }
165        }
166    }
167}
168
#[cfg(test)]
mod tests {
    use super::*;
    use ntex_codec::BytesCodec;
    use std::io;

    /// Checks that the `Debug` output of the status and error types names
    /// the variant being formatted.
    #[test]
    fn test_fmt() {
        type Recv = RecvError<BytesCodec>;

        let status = format!("{:?}", IoStatusUpdate::KeepAlive);
        assert!(status.contains("KeepAlive"));

        let keep_alive = format!("{:?}", Recv::KeepAlive);
        assert!(keep_alive.contains("KeepAlive"));

        let backpressure = format!("{:?}", Recv::WriteBackpressure);
        assert!(backpressure.contains("WriteBackpressure"));

        let decoder = format!("{:?}", Recv::Decoder(io::Error::other("err")));
        assert!(decoder.contains("RecvError::Decoder"));

        let peer_gone = format!("{:?}", Recv::PeerGone(Some(io::Error::other("err"))));
        assert!(peer_gone.contains("RecvError::PeerGone"));
    }
}