1use std::{any, io, task::Context, task::Poll};
2
3use crate::{buf::Stack, FilterLayer, Flags, IoRef, ReadStatus, WriteStatus};
4
5#[derive(Debug)]
6pub struct Base(IoRef);
8
9impl Base {
10 pub(crate) fn new(inner: IoRef) -> Self {
11 Base(inner)
12 }
13}
14
15#[derive(Debug)]
16pub struct Layer<F, L = Base>(pub(crate) F, L);
17
18impl<F: FilterLayer, L: Filter> Layer<F, L> {
19 pub(crate) fn new(f: F, l: L) -> Self {
20 Self(f, l)
21 }
22}
23
24pub(crate) struct NullFilter;
25
26const NULL: NullFilter = NullFilter;
27
28impl NullFilter {
29 pub(super) const fn get() -> &'static dyn Filter {
30 &NULL
31 }
32}
33
/// Result of running a filter's read-side processing.
///
/// The default value is `nbytes == 0` and `need_write == false`.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct FilterReadStatus {
    /// Byte count reported by the filter for this read pass.
    pub nbytes: usize,
    /// Set when a filter signalled that a write is needed after the read.
    pub need_write: bool,
}
39
40pub trait Filter: 'static {
41 fn query(&self, id: any::TypeId) -> Option<Box<dyn any::Any>>;
42
43 fn process_read_buf(
44 &self,
45 io: &IoRef,
46 stack: &Stack,
47 idx: usize,
48 nbytes: usize,
49 ) -> io::Result<FilterReadStatus>;
50
51 fn process_write_buf(&self, io: &IoRef, stack: &Stack, idx: usize) -> io::Result<()>;
53
54 fn shutdown(&self, io: &IoRef, stack: &Stack, idx: usize) -> io::Result<Poll<()>>;
56
57 fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<ReadStatus>;
59
60 fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<WriteStatus>;
62}
63
64impl Filter for Base {
65 fn query(&self, id: any::TypeId) -> Option<Box<dyn any::Any>> {
66 if let Some(hnd) = self.0 .0.handle.take() {
67 let res = hnd.query(id);
68 self.0 .0.handle.set(Some(hnd));
69 res
70 } else {
71 None
72 }
73 }
74
75 #[inline]
76 fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<ReadStatus> {
77 let flags = self.0.flags();
78
79 if flags.intersects(Flags::IO_STOPPING | Flags::IO_STOPPED) {
80 Poll::Ready(ReadStatus::Terminate)
81 } else {
82 self.0 .0.read_task.register(cx.waker());
83
84 if flags.intersects(Flags::IO_STOPPING_FILTERS) {
85 Poll::Ready(ReadStatus::Ready)
86 } else if flags.cannot_read() {
87 Poll::Pending
88 } else {
89 Poll::Ready(ReadStatus::Ready)
90 }
91 }
92 }
93
94 #[inline]
95 fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<WriteStatus> {
96 let flags = self.0.flags();
97
98 if flags.is_stopped() {
99 Poll::Ready(WriteStatus::Terminate)
100 } else {
101 self.0 .0.write_task.register(cx.waker());
102
103 if flags.contains(Flags::IO_STOPPING) {
104 Poll::Ready(WriteStatus::Shutdown)
105 } else if flags.contains(Flags::WR_PAUSED) {
106 Poll::Pending
107 } else {
108 Poll::Ready(WriteStatus::Ready)
109 }
110 }
111 }
112
113 #[inline]
114 fn process_read_buf(
115 &self,
116 _: &IoRef,
117 _: &Stack,
118 _: usize,
119 nbytes: usize,
120 ) -> io::Result<FilterReadStatus> {
121 Ok(FilterReadStatus {
122 nbytes,
123 need_write: false,
124 })
125 }
126
127 #[inline]
128 fn process_write_buf(&self, io: &IoRef, s: &Stack, _: usize) -> io::Result<()> {
129 s.with_write_destination(io, |buf| {
130 if let Some(buf) = buf {
131 let len = buf.len();
132 let flags = self.0.flags();
133 if len > 0 && flags.contains(Flags::WR_PAUSED) {
134 self.0 .0.remove_flags(Flags::WR_PAUSED);
135 self.0 .0.write_task.wake();
136 }
137 if len >= self.0.memory_pool().write_params_high()
138 && !flags.contains(Flags::BUF_W_BACKPRESSURE)
139 {
140 self.0 .0.insert_flags(Flags::BUF_W_BACKPRESSURE);
141 self.0 .0.dispatch_task.wake();
142 }
143 }
144 });
145 Ok(())
146 }
147
148 #[inline]
149 fn shutdown(&self, _: &IoRef, _: &Stack, _: usize) -> io::Result<Poll<()>> {
150 Ok(Poll::Ready(()))
151 }
152}
153
154impl<F, L> Filter for Layer<F, L>
155where
156 F: FilterLayer,
157 L: Filter,
158{
159 #[inline]
160 fn query(&self, id: any::TypeId) -> Option<Box<dyn any::Any>> {
161 self.0.query(id).or_else(|| self.1.query(id))
162 }
163
164 #[inline]
165 fn shutdown(&self, io: &IoRef, stack: &Stack, idx: usize) -> io::Result<Poll<()>> {
166 let result1 = stack.write_buf(io, idx, |buf| self.0.shutdown(buf))?;
167 self.process_write_buf(io, stack, idx)?;
168
169 let result2 = if F::BUFFERS {
170 self.1.shutdown(io, stack, idx + 1)?
171 } else {
172 self.1.shutdown(io, stack, idx)?
173 };
174
175 if result1.is_pending() || result2.is_pending() {
176 Ok(Poll::Pending)
177 } else {
178 Ok(Poll::Ready(()))
179 }
180 }
181
182 #[inline]
183 fn process_read_buf(
184 &self,
185 io: &IoRef,
186 stack: &Stack,
187 idx: usize,
188 nbytes: usize,
189 ) -> io::Result<FilterReadStatus> {
190 let status = if F::BUFFERS {
191 self.1.process_read_buf(io, stack, idx + 1, nbytes)?
192 } else {
193 self.1.process_read_buf(io, stack, idx, nbytes)?
194 };
195 stack.read_buf(io, idx, status.nbytes, |buf| {
196 self.0.process_read_buf(buf).map(|nbytes| FilterReadStatus {
197 nbytes,
198 need_write: status.need_write || buf.need_write.get(),
199 })
200 })
201 }
202
203 #[inline]
204 fn process_write_buf(&self, io: &IoRef, stack: &Stack, idx: usize) -> io::Result<()> {
205 stack.write_buf(io, idx, |buf| self.0.process_write_buf(buf))?;
206
207 if F::BUFFERS {
208 self.1.process_write_buf(io, stack, idx + 1)
209 } else {
210 self.1.process_write_buf(io, stack, idx)
211 }
212 }
213
214 #[inline]
215 fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<ReadStatus> {
216 let res1 = self.0.poll_read_ready(cx);
217 let res2 = self.1.poll_read_ready(cx);
218
219 match res1 {
220 Poll::Pending => Poll::Pending,
221 Poll::Ready(ReadStatus::Ready) => res2,
222 Poll::Ready(ReadStatus::Terminate) => Poll::Ready(ReadStatus::Terminate),
223 }
224 }
225
226 #[inline]
227 fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<WriteStatus> {
228 let res1 = self.0.poll_write_ready(cx);
229 let res2 = self.1.poll_write_ready(cx);
230
231 match res1 {
232 Poll::Pending => Poll::Pending,
233 Poll::Ready(WriteStatus::Ready) => res2,
234 Poll::Ready(WriteStatus::Terminate) => Poll::Ready(WriteStatus::Terminate),
235 Poll::Ready(WriteStatus::Shutdown) => {
236 if res2 == Poll::Ready(WriteStatus::Terminate) {
237 Poll::Ready(WriteStatus::Terminate)
238 } else {
239 Poll::Ready(WriteStatus::Shutdown)
240 }
241 }
242 }
243 }
244}
245
246impl Filter for NullFilter {
247 #[inline]
248 fn query(&self, _: any::TypeId) -> Option<Box<dyn any::Any>> {
249 None
250 }
251
252 #[inline]
253 fn poll_read_ready(&self, _: &mut Context<'_>) -> Poll<ReadStatus> {
254 Poll::Ready(ReadStatus::Terminate)
255 }
256
257 #[inline]
258 fn poll_write_ready(&self, _: &mut Context<'_>) -> Poll<WriteStatus> {
259 Poll::Ready(WriteStatus::Terminate)
260 }
261
262 #[inline]
263 fn process_read_buf(
264 &self,
265 _: &IoRef,
266 _: &Stack,
267 _: usize,
268 _: usize,
269 ) -> io::Result<FilterReadStatus> {
270 Ok(Default::default())
271 }
272
273 #[inline]
274 fn process_write_buf(&self, _: &IoRef, _: &Stack, _: usize) -> io::Result<()> {
275 Ok(())
276 }
277
278 #[inline]
279 fn shutdown(&self, _: &IoRef, _: &Stack, _: usize) -> io::Result<Poll<()>> {
280 Ok(Poll::Ready(()))
281 }
282}