1use std::{any, io, task::Context, task::Poll};
2
3use crate::{buf::Stack, FilterLayer, Flags, IoRef, Readiness};
4
5#[derive(Debug)]
6pub struct Base(IoRef);
8
9impl Base {
10 pub(crate) fn new(inner: IoRef) -> Self {
11 Base(inner)
12 }
13}
14
15#[derive(Debug)]
16pub struct Layer<F, L = Base>(pub(crate) F, L);
17
18impl<F: FilterLayer, L: Filter> Layer<F, L> {
19 pub(crate) fn new(f: F, l: L) -> Self {
20 Self(f, l)
21 }
22}
23
24pub(crate) struct NullFilter;
25
26const NULL: NullFilter = NullFilter;
27
28impl NullFilter {
29 pub(super) const fn get() -> &'static dyn Filter {
30 &NULL
31 }
32}
33
34impl Readiness {
35 fn merge(res1: Poll<Readiness>, res2: Poll<Readiness>) -> Poll<Readiness> {
36 match res1 {
37 Poll::Pending => Poll::Pending,
38 Poll::Ready(Readiness::Ready) => res2,
39 Poll::Ready(Readiness::Terminate) => Poll::Ready(Readiness::Terminate),
40 Poll::Ready(Readiness::Shutdown) => {
41 if res2 == Poll::Ready(Readiness::Terminate) {
42 Poll::Ready(Readiness::Terminate)
43 } else {
44 Poll::Ready(Readiness::Shutdown)
45 }
46 }
47 }
48 }
49}
50
51#[derive(Debug, Copy, Clone, Default, PartialEq, Eq)]
52pub struct FilterReadStatus {
53 pub nbytes: usize,
54 pub need_write: bool,
55}
56
57pub trait Filter: 'static {
58 fn query(&self, id: any::TypeId) -> Option<Box<dyn any::Any>>;
59
60 fn process_read_buf(
61 &self,
62 io: &IoRef,
63 stack: &Stack,
64 idx: usize,
65 nbytes: usize,
66 ) -> io::Result<FilterReadStatus>;
67
68 fn process_write_buf(&self, io: &IoRef, stack: &Stack, idx: usize) -> io::Result<()>;
70
71 fn shutdown(&self, io: &IoRef, stack: &Stack, idx: usize) -> io::Result<Poll<()>>;
73
74 fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<Readiness>;
76
77 fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<Readiness>;
79}
80
81impl Filter for Base {
82 fn query(&self, id: any::TypeId) -> Option<Box<dyn any::Any>> {
83 if let Some(hnd) = self.0 .0.handle.take() {
84 let res = hnd.query(id);
85 self.0 .0.handle.set(Some(hnd));
86 res
87 } else {
88 None
89 }
90 }
91
92 #[inline]
93 fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<Readiness> {
94 let flags = self.0.flags();
95
96 if flags.intersects(Flags::IO_STOPPING | Flags::IO_STOPPED) {
97 Poll::Ready(Readiness::Terminate)
98 } else {
99 self.0 .0.read_task.register(cx.waker());
100
101 if flags.intersects(Flags::IO_STOPPING_FILTERS) {
102 Poll::Ready(Readiness::Ready)
103 } else if flags.cannot_read() {
104 Poll::Pending
105 } else {
106 Poll::Ready(Readiness::Ready)
107 }
108 }
109 }
110
111 #[inline]
112 fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<Readiness> {
113 let flags = self.0.flags();
114
115 if flags.is_stopped() {
116 Poll::Ready(Readiness::Terminate)
117 } else {
118 self.0 .0.write_task.register(cx.waker());
119
120 if flags.contains(Flags::IO_STOPPING) {
121 Poll::Ready(Readiness::Shutdown)
122 } else if flags.contains(Flags::WR_PAUSED) {
123 Poll::Pending
124 } else {
125 Poll::Ready(Readiness::Ready)
126 }
127 }
128 }
129
130 #[inline]
131 fn process_read_buf(
132 &self,
133 _: &IoRef,
134 _: &Stack,
135 _: usize,
136 nbytes: usize,
137 ) -> io::Result<FilterReadStatus> {
138 Ok(FilterReadStatus {
139 nbytes,
140 need_write: false,
141 })
142 }
143
144 #[inline]
145 fn process_write_buf(&self, io: &IoRef, s: &Stack, _: usize) -> io::Result<()> {
146 s.with_write_destination(io, |buf| {
147 if let Some(buf) = buf {
148 let len = buf.len();
149 let flags = self.0.flags();
150 if len > 0 && flags.contains(Flags::WR_PAUSED) {
151 self.0 .0.remove_flags(Flags::WR_PAUSED);
152 self.0 .0.write_task.wake();
153 }
154 if len >= self.0.memory_pool().write_params_high()
155 && !flags.contains(Flags::BUF_W_BACKPRESSURE)
156 {
157 self.0 .0.insert_flags(Flags::BUF_W_BACKPRESSURE);
158 self.0 .0.dispatch_task.wake();
159 }
160 }
161 });
162 Ok(())
163 }
164
165 #[inline]
166 fn shutdown(&self, _: &IoRef, _: &Stack, _: usize) -> io::Result<Poll<()>> {
167 Ok(Poll::Ready(()))
168 }
169}
170
171impl<F, L> Filter for Layer<F, L>
172where
173 F: FilterLayer,
174 L: Filter,
175{
176 #[inline]
177 fn query(&self, id: any::TypeId) -> Option<Box<dyn any::Any>> {
178 self.0.query(id).or_else(|| self.1.query(id))
179 }
180
181 #[inline]
182 fn shutdown(&self, io: &IoRef, stack: &Stack, idx: usize) -> io::Result<Poll<()>> {
183 let result1 = stack.write_buf(io, idx, |buf| self.0.shutdown(buf))?;
184 self.process_write_buf(io, stack, idx)?;
185
186 let result2 = if F::BUFFERS {
187 self.1.shutdown(io, stack, idx + 1)?
188 } else {
189 self.1.shutdown(io, stack, idx)?
190 };
191
192 if result1.is_pending() || result2.is_pending() {
193 Ok(Poll::Pending)
194 } else {
195 Ok(Poll::Ready(()))
196 }
197 }
198
199 #[inline]
200 fn process_read_buf(
201 &self,
202 io: &IoRef,
203 stack: &Stack,
204 idx: usize,
205 nbytes: usize,
206 ) -> io::Result<FilterReadStatus> {
207 let status = if F::BUFFERS {
208 self.1.process_read_buf(io, stack, idx + 1, nbytes)?
209 } else {
210 self.1.process_read_buf(io, stack, idx, nbytes)?
211 };
212 stack.read_buf(io, idx, status.nbytes, |buf| {
213 self.0.process_read_buf(buf).map(|nbytes| FilterReadStatus {
214 nbytes,
215 need_write: status.need_write || buf.need_write.get(),
216 })
217 })
218 }
219
220 #[inline]
221 fn process_write_buf(&self, io: &IoRef, stack: &Stack, idx: usize) -> io::Result<()> {
222 stack.write_buf(io, idx, |buf| self.0.process_write_buf(buf))?;
223
224 if F::BUFFERS {
225 self.1.process_write_buf(io, stack, idx + 1)
226 } else {
227 self.1.process_write_buf(io, stack, idx)
228 }
229 }
230
231 #[inline]
232 fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<Readiness> {
233 Readiness::merge(self.0.poll_read_ready(cx), self.1.poll_read_ready(cx))
234 }
235
236 #[inline]
237 fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<Readiness> {
238 Readiness::merge(self.0.poll_write_ready(cx), self.1.poll_write_ready(cx))
239 }
240}
241
242impl Filter for NullFilter {
243 #[inline]
244 fn query(&self, _: any::TypeId) -> Option<Box<dyn any::Any>> {
245 None
246 }
247
248 #[inline]
249 fn poll_read_ready(&self, _: &mut Context<'_>) -> Poll<Readiness> {
250 Poll::Ready(Readiness::Terminate)
251 }
252
253 #[inline]
254 fn poll_write_ready(&self, _: &mut Context<'_>) -> Poll<Readiness> {
255 Poll::Ready(Readiness::Terminate)
256 }
257
258 #[inline]
259 fn process_read_buf(
260 &self,
261 _: &IoRef,
262 _: &Stack,
263 _: usize,
264 _: usize,
265 ) -> io::Result<FilterReadStatus> {
266 Ok(Default::default())
267 }
268
269 #[inline]
270 fn process_write_buf(&self, _: &IoRef, _: &Stack, _: usize) -> io::Result<()> {
271 Ok(())
272 }
273
274 #[inline]
275 fn shutdown(&self, _: &IoRef, _: &Stack, _: usize) -> io::Result<Poll<()>> {
276 Ok(Poll::Ready(()))
277 }
278}