ntex_io/
filter.rs

1use std::{any, io, task::Context, task::Poll};
2
3use crate::{buf::Stack, FilterLayer, Flags, IoRef, Readiness};
4
5#[derive(Debug)]
6/// Default `Io` filter
7pub struct Base(IoRef);
8
9impl Base {
10    pub(crate) fn new(inner: IoRef) -> Self {
11        Base(inner)
12    }
13}
14
15#[derive(Debug)]
16pub struct Layer<F, L = Base>(pub(crate) F, L);
17
18impl<F: FilterLayer, L: Filter> Layer<F, L> {
19    pub(crate) fn new(f: F, l: L) -> Self {
20        Self(f, l)
21    }
22}
23
24pub(crate) struct NullFilter;
25
26const NULL: NullFilter = NullFilter;
27
28impl NullFilter {
29    pub(super) const fn get() -> &'static dyn Filter {
30        &NULL
31    }
32}
33
34impl Readiness {
35    fn merge(res1: Poll<Readiness>, res2: Poll<Readiness>) -> Poll<Readiness> {
36        match res1 {
37            Poll::Pending => Poll::Pending,
38            Poll::Ready(Readiness::Ready) => res2,
39            Poll::Ready(Readiness::Terminate) => Poll::Ready(Readiness::Terminate),
40            Poll::Ready(Readiness::Shutdown) => {
41                if res2 == Poll::Ready(Readiness::Terminate) {
42                    Poll::Ready(Readiness::Terminate)
43                } else {
44                    Poll::Ready(Readiness::Shutdown)
45                }
46            }
47        }
48    }
49}
50
51#[derive(Debug, Copy, Clone, Default, PartialEq, Eq)]
52pub struct FilterReadStatus {
53    pub nbytes: usize,
54    pub need_write: bool,
55}
56
57pub trait Filter: 'static {
58    fn query(&self, id: any::TypeId) -> Option<Box<dyn any::Any>>;
59
60    fn process_read_buf(
61        &self,
62        io: &IoRef,
63        stack: &Stack,
64        idx: usize,
65        nbytes: usize,
66    ) -> io::Result<FilterReadStatus>;
67
68    /// Process write buffer
69    fn process_write_buf(&self, io: &IoRef, stack: &Stack, idx: usize) -> io::Result<()>;
70
71    /// Gracefully shutdown filter
72    fn shutdown(&self, io: &IoRef, stack: &Stack, idx: usize) -> io::Result<Poll<()>>;
73
74    /// Check readiness for read operations
75    fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<Readiness>;
76
77    /// Check readiness for write operations
78    fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<Readiness>;
79}
80
81impl Filter for Base {
82    fn query(&self, id: any::TypeId) -> Option<Box<dyn any::Any>> {
83        if let Some(hnd) = self.0 .0.handle.take() {
84            let res = hnd.query(id);
85            self.0 .0.handle.set(Some(hnd));
86            res
87        } else {
88            None
89        }
90    }
91
92    #[inline]
93    fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<Readiness> {
94        let flags = self.0.flags();
95
96        if flags.intersects(Flags::IO_STOPPING | Flags::IO_STOPPED) {
97            Poll::Ready(Readiness::Terminate)
98        } else {
99            self.0 .0.read_task.register(cx.waker());
100
101            if flags.intersects(Flags::IO_STOPPING_FILTERS) {
102                Poll::Ready(Readiness::Ready)
103            } else if flags.cannot_read() {
104                Poll::Pending
105            } else {
106                Poll::Ready(Readiness::Ready)
107            }
108        }
109    }
110
111    #[inline]
112    fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<Readiness> {
113        let flags = self.0.flags();
114
115        if flags.is_stopped() {
116            Poll::Ready(Readiness::Terminate)
117        } else {
118            self.0 .0.write_task.register(cx.waker());
119
120            if flags.contains(Flags::IO_STOPPING) {
121                Poll::Ready(Readiness::Shutdown)
122            } else if flags.contains(Flags::WR_PAUSED) {
123                Poll::Pending
124            } else {
125                Poll::Ready(Readiness::Ready)
126            }
127        }
128    }
129
130    #[inline]
131    fn process_read_buf(
132        &self,
133        _: &IoRef,
134        _: &Stack,
135        _: usize,
136        nbytes: usize,
137    ) -> io::Result<FilterReadStatus> {
138        Ok(FilterReadStatus {
139            nbytes,
140            need_write: false,
141        })
142    }
143
144    #[inline]
145    fn process_write_buf(&self, io: &IoRef, s: &Stack, _: usize) -> io::Result<()> {
146        s.with_write_destination(io, |buf| {
147            if let Some(buf) = buf {
148                let len = buf.len();
149                let flags = self.0.flags();
150                if len > 0 && flags.contains(Flags::WR_PAUSED) {
151                    self.0 .0.remove_flags(Flags::WR_PAUSED);
152                    self.0 .0.write_task.wake();
153                }
154                if len >= self.0.memory_pool().write_params_high()
155                    && !flags.contains(Flags::BUF_W_BACKPRESSURE)
156                {
157                    self.0 .0.insert_flags(Flags::BUF_W_BACKPRESSURE);
158                    self.0 .0.dispatch_task.wake();
159                }
160            }
161        });
162        Ok(())
163    }
164
165    #[inline]
166    fn shutdown(&self, _: &IoRef, _: &Stack, _: usize) -> io::Result<Poll<()>> {
167        Ok(Poll::Ready(()))
168    }
169}
170
171impl<F, L> Filter for Layer<F, L>
172where
173    F: FilterLayer,
174    L: Filter,
175{
176    #[inline]
177    fn query(&self, id: any::TypeId) -> Option<Box<dyn any::Any>> {
178        self.0.query(id).or_else(|| self.1.query(id))
179    }
180
181    #[inline]
182    fn shutdown(&self, io: &IoRef, stack: &Stack, idx: usize) -> io::Result<Poll<()>> {
183        let result1 = stack.write_buf(io, idx, |buf| self.0.shutdown(buf))?;
184        self.process_write_buf(io, stack, idx)?;
185
186        let result2 = if F::BUFFERS {
187            self.1.shutdown(io, stack, idx + 1)?
188        } else {
189            self.1.shutdown(io, stack, idx)?
190        };
191
192        if result1.is_pending() || result2.is_pending() {
193            Ok(Poll::Pending)
194        } else {
195            Ok(Poll::Ready(()))
196        }
197    }
198
199    #[inline]
200    fn process_read_buf(
201        &self,
202        io: &IoRef,
203        stack: &Stack,
204        idx: usize,
205        nbytes: usize,
206    ) -> io::Result<FilterReadStatus> {
207        let status = if F::BUFFERS {
208            self.1.process_read_buf(io, stack, idx + 1, nbytes)?
209        } else {
210            self.1.process_read_buf(io, stack, idx, nbytes)?
211        };
212        stack.read_buf(io, idx, status.nbytes, |buf| {
213            self.0.process_read_buf(buf).map(|nbytes| FilterReadStatus {
214                nbytes,
215                need_write: status.need_write || buf.need_write.get(),
216            })
217        })
218    }
219
220    #[inline]
221    fn process_write_buf(&self, io: &IoRef, stack: &Stack, idx: usize) -> io::Result<()> {
222        stack.write_buf(io, idx, |buf| self.0.process_write_buf(buf))?;
223
224        if F::BUFFERS {
225            self.1.process_write_buf(io, stack, idx + 1)
226        } else {
227            self.1.process_write_buf(io, stack, idx)
228        }
229    }
230
231    #[inline]
232    fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<Readiness> {
233        Readiness::merge(self.0.poll_read_ready(cx), self.1.poll_read_ready(cx))
234    }
235
236    #[inline]
237    fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<Readiness> {
238        Readiness::merge(self.0.poll_write_ready(cx), self.1.poll_write_ready(cx))
239    }
240}
241
242impl Filter for NullFilter {
243    #[inline]
244    fn query(&self, _: any::TypeId) -> Option<Box<dyn any::Any>> {
245        None
246    }
247
248    #[inline]
249    fn poll_read_ready(&self, _: &mut Context<'_>) -> Poll<Readiness> {
250        Poll::Ready(Readiness::Terminate)
251    }
252
253    #[inline]
254    fn poll_write_ready(&self, _: &mut Context<'_>) -> Poll<Readiness> {
255        Poll::Ready(Readiness::Terminate)
256    }
257
258    #[inline]
259    fn process_read_buf(
260        &self,
261        _: &IoRef,
262        _: &Stack,
263        _: usize,
264        _: usize,
265    ) -> io::Result<FilterReadStatus> {
266        Ok(Default::default())
267    }
268
269    #[inline]
270    fn process_write_buf(&self, _: &IoRef, _: &Stack, _: usize) -> io::Result<()> {
271        Ok(())
272    }
273
274    #[inline]
275    fn shutdown(&self, _: &IoRef, _: &Stack, _: usize) -> io::Result<Poll<()>> {
276        Ok(Poll::Ready(()))
277    }
278}