ntex_io/
filter.rs

1use std::{any, io, task::Context, task::Poll};
2
3use crate::{buf::Stack, FilterLayer, Flags, IoRef, ReadStatus, WriteStatus};
4
5#[derive(Debug)]
6/// Default `Io` filter
7pub struct Base(IoRef);
8
9impl Base {
10    pub(crate) fn new(inner: IoRef) -> Self {
11        Base(inner)
12    }
13}
14
15#[derive(Debug)]
16pub struct Layer<F, L = Base>(pub(crate) F, L);
17
18impl<F: FilterLayer, L: Filter> Layer<F, L> {
19    pub(crate) fn new(f: F, l: L) -> Self {
20        Self(f, l)
21    }
22}
23
24pub(crate) struct NullFilter;
25
26const NULL: NullFilter = NullFilter;
27
28impl NullFilter {
29    pub(super) const fn get() -> &'static dyn Filter {
30        &NULL
31    }
32}
33
/// Outcome of a `Filter::process_read_buf` call.
///
/// The default value is zero bytes and no write requested.
#[derive(Debug, Copy, Clone, Default, PartialEq, Eq)]
pub struct FilterReadStatus {
    /// Byte count reported by the filter for this read pass.
    pub nbytes: usize,
    /// `true` when some filter in the chain flagged that a write is needed.
    pub need_write: bool,
}
39
40pub trait Filter: 'static {
41    fn query(&self, id: any::TypeId) -> Option<Box<dyn any::Any>>;
42
43    fn process_read_buf(
44        &self,
45        io: &IoRef,
46        stack: &Stack,
47        idx: usize,
48        nbytes: usize,
49    ) -> io::Result<FilterReadStatus>;
50
51    /// Process write buffer
52    fn process_write_buf(&self, io: &IoRef, stack: &Stack, idx: usize) -> io::Result<()>;
53
54    /// Gracefully shutdown filter
55    fn shutdown(&self, io: &IoRef, stack: &Stack, idx: usize) -> io::Result<Poll<()>>;
56
57    /// Check readiness for read operations
58    fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<ReadStatus>;
59
60    /// Check readiness for write operations
61    fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<WriteStatus>;
62}
63
64impl Filter for Base {
65    fn query(&self, id: any::TypeId) -> Option<Box<dyn any::Any>> {
66        if let Some(hnd) = self.0 .0.handle.take() {
67            let res = hnd.query(id);
68            self.0 .0.handle.set(Some(hnd));
69            res
70        } else {
71            None
72        }
73    }
74
75    #[inline]
76    fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<ReadStatus> {
77        let flags = self.0.flags();
78
79        if flags.intersects(Flags::IO_STOPPING | Flags::IO_STOPPED) {
80            Poll::Ready(ReadStatus::Terminate)
81        } else {
82            self.0 .0.read_task.register(cx.waker());
83
84            if flags.intersects(Flags::IO_STOPPING_FILTERS) {
85                Poll::Ready(ReadStatus::Ready)
86            } else if flags.cannot_read() {
87                Poll::Pending
88            } else {
89                Poll::Ready(ReadStatus::Ready)
90            }
91        }
92    }
93
94    #[inline]
95    fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<WriteStatus> {
96        let flags = self.0.flags();
97
98        if flags.is_stopped() {
99            Poll::Ready(WriteStatus::Terminate)
100        } else {
101            self.0 .0.write_task.register(cx.waker());
102
103            if flags.contains(Flags::IO_STOPPING) {
104                Poll::Ready(WriteStatus::Shutdown)
105            } else if flags.contains(Flags::WR_PAUSED) {
106                Poll::Pending
107            } else {
108                Poll::Ready(WriteStatus::Ready)
109            }
110        }
111    }
112
113    #[inline]
114    fn process_read_buf(
115        &self,
116        _: &IoRef,
117        _: &Stack,
118        _: usize,
119        nbytes: usize,
120    ) -> io::Result<FilterReadStatus> {
121        Ok(FilterReadStatus {
122            nbytes,
123            need_write: false,
124        })
125    }
126
127    #[inline]
128    fn process_write_buf(&self, io: &IoRef, s: &Stack, _: usize) -> io::Result<()> {
129        s.with_write_destination(io, |buf| {
130            if let Some(buf) = buf {
131                let len = buf.len();
132                let flags = self.0.flags();
133                if len > 0 && flags.contains(Flags::WR_PAUSED) {
134                    self.0 .0.remove_flags(Flags::WR_PAUSED);
135                    self.0 .0.write_task.wake();
136                }
137                if len >= self.0.memory_pool().write_params_high()
138                    && !flags.contains(Flags::BUF_W_BACKPRESSURE)
139                {
140                    self.0 .0.insert_flags(Flags::BUF_W_BACKPRESSURE);
141                    self.0 .0.dispatch_task.wake();
142                }
143            }
144        });
145        Ok(())
146    }
147
148    #[inline]
149    fn shutdown(&self, _: &IoRef, _: &Stack, _: usize) -> io::Result<Poll<()>> {
150        Ok(Poll::Ready(()))
151    }
152}
153
154impl<F, L> Filter for Layer<F, L>
155where
156    F: FilterLayer,
157    L: Filter,
158{
159    #[inline]
160    fn query(&self, id: any::TypeId) -> Option<Box<dyn any::Any>> {
161        self.0.query(id).or_else(|| self.1.query(id))
162    }
163
164    #[inline]
165    fn shutdown(&self, io: &IoRef, stack: &Stack, idx: usize) -> io::Result<Poll<()>> {
166        let result1 = stack.write_buf(io, idx, |buf| self.0.shutdown(buf))?;
167        self.process_write_buf(io, stack, idx)?;
168
169        let result2 = if F::BUFFERS {
170            self.1.shutdown(io, stack, idx + 1)?
171        } else {
172            self.1.shutdown(io, stack, idx)?
173        };
174
175        if result1.is_pending() || result2.is_pending() {
176            Ok(Poll::Pending)
177        } else {
178            Ok(Poll::Ready(()))
179        }
180    }
181
182    #[inline]
183    fn process_read_buf(
184        &self,
185        io: &IoRef,
186        stack: &Stack,
187        idx: usize,
188        nbytes: usize,
189    ) -> io::Result<FilterReadStatus> {
190        let status = if F::BUFFERS {
191            self.1.process_read_buf(io, stack, idx + 1, nbytes)?
192        } else {
193            self.1.process_read_buf(io, stack, idx, nbytes)?
194        };
195        stack.read_buf(io, idx, status.nbytes, |buf| {
196            self.0.process_read_buf(buf).map(|nbytes| FilterReadStatus {
197                nbytes,
198                need_write: status.need_write || buf.need_write.get(),
199            })
200        })
201    }
202
203    #[inline]
204    fn process_write_buf(&self, io: &IoRef, stack: &Stack, idx: usize) -> io::Result<()> {
205        stack.write_buf(io, idx, |buf| self.0.process_write_buf(buf))?;
206
207        if F::BUFFERS {
208            self.1.process_write_buf(io, stack, idx + 1)
209        } else {
210            self.1.process_write_buf(io, stack, idx)
211        }
212    }
213
214    #[inline]
215    fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<ReadStatus> {
216        let res1 = self.0.poll_read_ready(cx);
217        let res2 = self.1.poll_read_ready(cx);
218
219        match res1 {
220            Poll::Pending => Poll::Pending,
221            Poll::Ready(ReadStatus::Ready) => res2,
222            Poll::Ready(ReadStatus::Terminate) => Poll::Ready(ReadStatus::Terminate),
223        }
224    }
225
226    #[inline]
227    fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<WriteStatus> {
228        let res1 = self.0.poll_write_ready(cx);
229        let res2 = self.1.poll_write_ready(cx);
230
231        match res1 {
232            Poll::Pending => Poll::Pending,
233            Poll::Ready(WriteStatus::Ready) => res2,
234            Poll::Ready(WriteStatus::Terminate) => Poll::Ready(WriteStatus::Terminate),
235            Poll::Ready(WriteStatus::Shutdown) => {
236                if res2 == Poll::Ready(WriteStatus::Terminate) {
237                    Poll::Ready(WriteStatus::Terminate)
238                } else {
239                    Poll::Ready(WriteStatus::Shutdown)
240                }
241            }
242        }
243    }
244}
245
246impl Filter for NullFilter {
247    #[inline]
248    fn query(&self, _: any::TypeId) -> Option<Box<dyn any::Any>> {
249        None
250    }
251
252    #[inline]
253    fn poll_read_ready(&self, _: &mut Context<'_>) -> Poll<ReadStatus> {
254        Poll::Ready(ReadStatus::Terminate)
255    }
256
257    #[inline]
258    fn poll_write_ready(&self, _: &mut Context<'_>) -> Poll<WriteStatus> {
259        Poll::Ready(WriteStatus::Terminate)
260    }
261
262    #[inline]
263    fn process_read_buf(
264        &self,
265        _: &IoRef,
266        _: &Stack,
267        _: usize,
268        _: usize,
269    ) -> io::Result<FilterReadStatus> {
270        Ok(Default::default())
271    }
272
273    #[inline]
274    fn process_write_buf(&self, _: &IoRef, _: &Stack, _: usize) -> io::Result<()> {
275        Ok(())
276    }
277
278    #[inline]
279    fn shutdown(&self, _: &IoRef, _: &Stack, _: usize) -> io::Result<Poll<()>> {
280        Ok(Poll::Ready(()))
281    }
282}