ntex_io/
filter.rs

1use std::{any, io, task::Context, task::Poll};
2
3use crate::{FilterCtx, FilterLayer, Flags, IoRef, Readiness};
4
5#[derive(Debug)]
6/// Default `Io` filter
7pub struct Base(IoRef);
8
9impl Base {
10    pub(crate) fn new(inner: IoRef) -> Self {
11        Base(inner)
12    }
13}
14
15#[derive(Debug)]
16pub struct Layer<F, L = Base>(pub(crate) F, L);
17
18impl<F: FilterLayer, L: Filter> Layer<F, L> {
19    pub(crate) fn new(f: F, l: L) -> Self {
20        Self(f, l)
21    }
22}
23
24pub(crate) struct NullFilter;
25
26const NULL: NullFilter = NullFilter;
27
28impl NullFilter {
29    pub(super) const fn get() -> &'static dyn Filter {
30        &NULL
31    }
32}
33
34#[derive(Debug, Copy, Clone, Default, PartialEq, Eq)]
35pub struct FilterReadStatus {
36    pub nbytes: usize,
37    pub need_write: bool,
38}
39
40pub trait Filter: 'static {
41    fn query(&self, id: any::TypeId) -> Option<Box<dyn any::Any>>;
42
43    fn process_read_buf(
44        &self,
45        ctx: FilterCtx<'_>,
46        nbytes: usize,
47    ) -> io::Result<FilterReadStatus>;
48
49    /// Process write buffer
50    fn process_write_buf(&self, ctx: FilterCtx<'_>) -> io::Result<()>;
51
52    /// Gracefully shutdown filter
53    fn shutdown(&self, ctx: FilterCtx<'_>) -> io::Result<Poll<()>>;
54
55    /// Check readiness for read operations
56    fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<Readiness>;
57
58    /// Check readiness for write operations
59    fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<Readiness>;
60}
61
62impl Filter for Base {
63    fn query(&self, id: any::TypeId) -> Option<Box<dyn any::Any>> {
64        if let Some(hnd) = self.0 .0.handle.take() {
65            let res = hnd.query(id);
66            self.0 .0.handle.set(Some(hnd));
67            res
68        } else {
69            None
70        }
71    }
72
73    #[inline]
74    fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<Readiness> {
75        let flags = self.0.flags();
76
77        if flags.intersects(Flags::IO_STOPPING | Flags::IO_STOPPED) {
78            Poll::Ready(Readiness::Terminate)
79        } else {
80            self.0 .0.read_task.register(cx.waker());
81
82            if flags.intersects(Flags::IO_STOPPING_FILTERS) {
83                Poll::Ready(Readiness::Ready)
84            } else if flags.cannot_read() {
85                Poll::Pending
86            } else {
87                Poll::Ready(Readiness::Ready)
88            }
89        }
90    }
91
92    #[inline]
93    fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<Readiness> {
94        let flags = self.0.flags();
95
96        if flags.is_stopped() {
97            Poll::Ready(Readiness::Terminate)
98        } else {
99            self.0 .0.write_task.register(cx.waker());
100
101            if flags.contains(Flags::IO_STOPPING) {
102                Poll::Ready(Readiness::Shutdown)
103            } else if flags.contains(Flags::WR_PAUSED) {
104                Poll::Pending
105            } else {
106                Poll::Ready(Readiness::Ready)
107            }
108        }
109    }
110
111    #[inline]
112    fn process_read_buf(
113        &self,
114        _: FilterCtx<'_>,
115        nbytes: usize,
116    ) -> io::Result<FilterReadStatus> {
117        Ok(FilterReadStatus {
118            nbytes,
119            need_write: false,
120        })
121    }
122
123    #[inline]
124    fn process_write_buf(&self, ctx: FilterCtx<'_>) -> io::Result<()> {
125        ctx.stack.with_write_destination(ctx.io, |buf| {
126            if let Some(buf) = buf {
127                let len = buf.len();
128                let flags = self.0.flags();
129                if len > 0 && flags.contains(Flags::WR_PAUSED) {
130                    self.0 .0.remove_flags(Flags::WR_PAUSED);
131                    self.0 .0.write_task.wake();
132                }
133                if len >= self.0.memory_pool().write_params_high()
134                    && !flags.contains(Flags::BUF_W_BACKPRESSURE)
135                {
136                    self.0 .0.insert_flags(Flags::BUF_W_BACKPRESSURE);
137                    self.0 .0.dispatch_task.wake();
138                }
139            }
140        });
141        Ok(())
142    }
143
144    #[inline]
145    fn shutdown(&self, _: FilterCtx<'_>) -> io::Result<Poll<()>> {
146        Ok(Poll::Ready(()))
147    }
148}
149
150impl<F, L> Filter for Layer<F, L>
151where
152    F: FilterLayer,
153    L: Filter,
154{
155    #[inline]
156    fn query(&self, id: any::TypeId) -> Option<Box<dyn any::Any>> {
157        self.0.query(id).or_else(|| self.1.query(id))
158    }
159
160    #[inline]
161    fn shutdown(&self, ctx: FilterCtx<'_>) -> io::Result<Poll<()>> {
162        let result1 = ctx.write_buf(|buf| self.0.shutdown(buf))?;
163        self.process_write_buf(ctx)?;
164        let result2 = self.1.shutdown(ctx.next())?;
165
166        if result1.is_pending() || result2.is_pending() {
167            Ok(Poll::Pending)
168        } else {
169            Ok(Poll::Ready(()))
170        }
171    }
172
173    #[inline]
174    fn process_read_buf(
175        &self,
176        ctx: FilterCtx<'_>,
177        nbytes: usize,
178    ) -> io::Result<FilterReadStatus> {
179        let status = self.1.process_read_buf(ctx.next(), nbytes)?;
180        ctx.read_buf(status.nbytes, |buf| {
181            self.0.process_read_buf(buf).map(|nbytes| FilterReadStatus {
182                nbytes,
183                need_write: status.need_write || buf.need_write.get(),
184            })
185        })
186    }
187
188    #[inline]
189    fn process_write_buf(&self, ctx: FilterCtx<'_>) -> io::Result<()> {
190        ctx.write_buf(|buf| self.0.process_write_buf(buf))?;
191        self.1.process_write_buf(ctx.next())
192    }
193
194    #[inline]
195    fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<Readiness> {
196        Readiness::merge(self.0.poll_read_ready(cx), self.1.poll_read_ready(cx))
197    }
198
199    #[inline]
200    fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<Readiness> {
201        Readiness::merge(self.0.poll_write_ready(cx), self.1.poll_write_ready(cx))
202    }
203}
204
205impl Filter for NullFilter {
206    #[inline]
207    fn query(&self, _: any::TypeId) -> Option<Box<dyn any::Any>> {
208        None
209    }
210
211    #[inline]
212    fn poll_read_ready(&self, _: &mut Context<'_>) -> Poll<Readiness> {
213        Poll::Ready(Readiness::Terminate)
214    }
215
216    #[inline]
217    fn poll_write_ready(&self, _: &mut Context<'_>) -> Poll<Readiness> {
218        Poll::Ready(Readiness::Terminate)
219    }
220
221    #[inline]
222    fn process_read_buf(&self, _: FilterCtx<'_>, _: usize) -> io::Result<FilterReadStatus> {
223        Ok(Default::default())
224    }
225
226    #[inline]
227    fn process_write_buf(&self, _: FilterCtx<'_>) -> io::Result<()> {
228        Ok(())
229    }
230
231    #[inline]
232    fn shutdown(&self, _: FilterCtx<'_>) -> io::Result<Poll<()>> {
233        Ok(Poll::Ready(()))
234    }
235}