1use std::{any, io, task::Context, task::Poll};
2
3use crate::{FilterCtx, FilterLayer, Flags, IoRef, Readiness};
4
5#[derive(Debug)]
6pub struct Base(IoRef);
8
9impl Base {
10 pub(crate) fn new(inner: IoRef) -> Self {
11 Base(inner)
12 }
13}
14
15#[derive(Debug)]
16pub struct Layer<F, L = Base>(pub(crate) F, L);
17
18impl<F: FilterLayer, L: Filter> Layer<F, L> {
19 pub(crate) fn new(f: F, l: L) -> Self {
20 Self(f, l)
21 }
22}
23
24pub(crate) struct NullFilter;
25
26const NULL: NullFilter = NullFilter;
27
28impl NullFilter {
29 pub(super) const fn get() -> &'static dyn Filter {
30 &NULL
31 }
32}
33
/// Outcome of running a filter over newly read data.
///
/// The default value is `nbytes == 0` with `need_write == false`.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct FilterReadStatus {
    /// Byte count reported by the filter that produced this status.
    pub nbytes: usize,
    /// `true` when at least one filter in the chain flagged `need_write` on its buffer.
    // NOTE(review): presumably asks the caller to run the write path afterwards — confirm.
    pub need_write: bool,
}
39
/// Interface of one element of an io filter chain.
///
/// In this file it is implemented by `Base` (works directly on the io state),
/// `Layer` (a `FilterLayer` stacked on an inner `Filter`) and `NullFilter`
/// (always terminated).
pub trait Filter: 'static {
    /// Looks up a value by type id.
    ///
    /// Returns the boxed value when some part of the filter answers the query,
    /// `None` otherwise.
    fn query(&self, id: any::TypeId) -> Option<Box<dyn any::Any>>;

    /// Runs the filter over newly read data and reports the resulting status.
    ///
    /// `nbytes` is the byte count handed to the filter.
    // NOTE(review): presumably the number of bytes just read from the io source — confirm
    // against the caller.
    fn process_read_buf(
        &self,
        ctx: FilterCtx<'_>,
        nbytes: usize,
    ) -> io::Result<FilterReadStatus>;

    /// Runs the filter over the write buffer reachable through `ctx`.
    fn process_write_buf(&self, ctx: FilterCtx<'_>) -> io::Result<()>;

    /// Shuts the filter down.
    ///
    /// `Poll::Pending` means the shutdown has not finished; a `Layer` reports pending
    /// if either its own layer or its inner filter does.
    fn shutdown(&self, ctx: FilterCtx<'_>) -> io::Result<Poll<()>>;

    /// Reports readiness for read operations; implementations may register `cx`'s waker.
    fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<Readiness>;

    /// Reports readiness for write operations; implementations may register `cx`'s waker.
    fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<Readiness>;
}
61
62impl Filter for Base {
63 fn query(&self, id: any::TypeId) -> Option<Box<dyn any::Any>> {
64 if let Some(hnd) = self.0 .0.handle.take() {
65 let res = hnd.query(id);
66 self.0 .0.handle.set(Some(hnd));
67 res
68 } else {
69 None
70 }
71 }
72
73 #[inline]
74 fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<Readiness> {
75 let flags = self.0.flags();
76
77 if flags.intersects(Flags::IO_STOPPING | Flags::IO_STOPPED) {
78 Poll::Ready(Readiness::Terminate)
79 } else {
80 self.0 .0.read_task.register(cx.waker());
81
82 if flags.intersects(Flags::IO_STOPPING_FILTERS) {
83 Poll::Ready(Readiness::Ready)
84 } else if flags.cannot_read() {
85 Poll::Pending
86 } else {
87 Poll::Ready(Readiness::Ready)
88 }
89 }
90 }
91
92 #[inline]
93 fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<Readiness> {
94 let flags = self.0.flags();
95
96 if flags.is_stopped() {
97 Poll::Ready(Readiness::Terminate)
98 } else {
99 self.0 .0.write_task.register(cx.waker());
100
101 if flags.contains(Flags::IO_STOPPING) {
102 Poll::Ready(Readiness::Shutdown)
103 } else if flags.contains(Flags::WR_PAUSED) {
104 Poll::Pending
105 } else {
106 Poll::Ready(Readiness::Ready)
107 }
108 }
109 }
110
111 #[inline]
112 fn process_read_buf(
113 &self,
114 _: FilterCtx<'_>,
115 nbytes: usize,
116 ) -> io::Result<FilterReadStatus> {
117 Ok(FilterReadStatus {
118 nbytes,
119 need_write: false,
120 })
121 }
122
123 #[inline]
124 fn process_write_buf(&self, ctx: FilterCtx<'_>) -> io::Result<()> {
125 ctx.stack.with_write_destination(ctx.io, |buf| {
126 if let Some(buf) = buf {
127 let len = buf.len();
128 let flags = self.0.flags();
129 if len > 0 && flags.contains(Flags::WR_PAUSED) {
130 self.0 .0.remove_flags(Flags::WR_PAUSED);
131 self.0 .0.write_task.wake();
132 }
133 if len >= self.0.memory_pool().write_params_high()
134 && !flags.contains(Flags::BUF_W_BACKPRESSURE)
135 {
136 self.0 .0.insert_flags(Flags::BUF_W_BACKPRESSURE);
137 self.0 .0.dispatch_task.wake();
138 }
139 }
140 });
141 Ok(())
142 }
143
144 #[inline]
145 fn shutdown(&self, _: FilterCtx<'_>) -> io::Result<Poll<()>> {
146 Ok(Poll::Ready(()))
147 }
148}
149
150impl<F, L> Filter for Layer<F, L>
151where
152 F: FilterLayer,
153 L: Filter,
154{
155 #[inline]
156 fn query(&self, id: any::TypeId) -> Option<Box<dyn any::Any>> {
157 self.0.query(id).or_else(|| self.1.query(id))
158 }
159
160 #[inline]
161 fn shutdown(&self, ctx: FilterCtx<'_>) -> io::Result<Poll<()>> {
162 let result1 = ctx.write_buf(|buf| self.0.shutdown(buf))?;
163 self.process_write_buf(ctx)?;
164 let result2 = self.1.shutdown(ctx.next())?;
165
166 if result1.is_pending() || result2.is_pending() {
167 Ok(Poll::Pending)
168 } else {
169 Ok(Poll::Ready(()))
170 }
171 }
172
173 #[inline]
174 fn process_read_buf(
175 &self,
176 ctx: FilterCtx<'_>,
177 nbytes: usize,
178 ) -> io::Result<FilterReadStatus> {
179 let status = self.1.process_read_buf(ctx.next(), nbytes)?;
180 ctx.read_buf(status.nbytes, |buf| {
181 self.0.process_read_buf(buf).map(|nbytes| FilterReadStatus {
182 nbytes,
183 need_write: status.need_write || buf.need_write.get(),
184 })
185 })
186 }
187
188 #[inline]
189 fn process_write_buf(&self, ctx: FilterCtx<'_>) -> io::Result<()> {
190 ctx.write_buf(|buf| self.0.process_write_buf(buf))?;
191 self.1.process_write_buf(ctx.next())
192 }
193
194 #[inline]
195 fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<Readiness> {
196 Readiness::merge(self.0.poll_read_ready(cx), self.1.poll_read_ready(cx))
197 }
198
199 #[inline]
200 fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<Readiness> {
201 Readiness::merge(self.0.poll_write_ready(cx), self.1.poll_write_ready(cx))
202 }
203}
204
205impl Filter for NullFilter {
206 #[inline]
207 fn query(&self, _: any::TypeId) -> Option<Box<dyn any::Any>> {
208 None
209 }
210
211 #[inline]
212 fn poll_read_ready(&self, _: &mut Context<'_>) -> Poll<Readiness> {
213 Poll::Ready(Readiness::Terminate)
214 }
215
216 #[inline]
217 fn poll_write_ready(&self, _: &mut Context<'_>) -> Poll<Readiness> {
218 Poll::Ready(Readiness::Terminate)
219 }
220
221 #[inline]
222 fn process_read_buf(&self, _: FilterCtx<'_>, _: usize) -> io::Result<FilterReadStatus> {
223 Ok(Default::default())
224 }
225
226 #[inline]
227 fn process_write_buf(&self, _: FilterCtx<'_>) -> io::Result<()> {
228 Ok(())
229 }
230
231 #[inline]
232 fn shutdown(&self, _: FilterCtx<'_>) -> io::Result<Poll<()>> {
233 Ok(Poll::Ready(()))
234 }
235}