1use std::{cell::Cell, fmt, io, task::Poll};
2
3use ntex_bytes::{BytePageSize, BytePages, BytesMut};
4
5use crate::{IoConfig, IoRef};
6
7pub(crate) struct Stack {
8 buffers: Vec<Buffer>,
9}
10
11#[derive(Default)]
12struct Buffer {
13 read: Cell<Option<BytesMut>>,
14 write: Cell<Option<BytePages>>,
15}
16
17impl fmt::Debug for Stack {
18 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
19 f.debug_struct("Stack")
20 .field("len", &self.buffers.len())
21 .finish()
22 }
23}
24
25impl Stack {
26 pub(crate) fn new(size: BytePageSize) -> Self {
27 Self {
28 buffers: vec![
29 Buffer {
30 read: Cell::new(None),
31 write: Cell::new(Some(BytePages::new(size))),
32 },
33 Buffer {
34 read: Cell::new(None),
35 write: Cell::new(Some(BytePages::new(size))),
36 },
37 ],
38 }
39 }
40
41 pub(crate) fn set_page_size(&self, size: BytePageSize) {
42 for b in &self.buffers {
43 b.with_write(|b| b.set_page_size(size));
44 }
45 }
46
47 pub(crate) fn add_layer(&mut self, page_size: BytePageSize) {
48 self.buffers.insert(
49 0,
50 Buffer {
51 read: Cell::new(None),
52 write: Cell::new(Some(BytePages::new(page_size))),
53 },
54 );
55 }
56
57 fn with_first<F, R>(&self, f: F) -> R
58 where
59 F: FnOnce(&Buffer) -> R,
60 {
61 f(&self.buffers[0])
62 }
63
64 fn with_last<F, R>(&self, f: F) -> R
65 where
66 F: FnOnce(&Buffer) -> R,
67 {
68 f(&self.buffers[self.buffers.len() - 2])
69 }
70
71 pub(crate) fn with_read_dst<F, R>(&self, io: &IoRef, f: F) -> R
72 where
73 F: FnOnce(&mut BytesMut) -> R,
74 {
75 self.with_first(|buf| buf.with_read(io, f))
76 }
77
78 pub(crate) fn write_buf_size(&self) -> usize {
79 if self.buffers.len() == 2 {
81 self.buffers[0].write_len()
82 } else {
83 self.buffers[0].write_len() + self.buffers[self.buffers.len() - 2].write_len()
84 }
85 }
86
87 pub(crate) fn with_write_src<F, R>(&self, f: F) -> R
88 where
89 F: FnOnce(&mut BytePages) -> R,
90 {
91 self.buffers[0].with_write(f)
92 }
93
94 pub(crate) fn with_write_dst<F, R>(&self, f: F) -> R
95 where
96 F: FnOnce(&mut BytePages) -> R,
97 {
98 self.buffers[self.buffers.len() - 2].with_write(f)
99 }
100
101 pub(crate) fn read_dst_size(&self) -> usize {
102 self.buffers[0].read_len()
103 }
104
105 pub(crate) fn with_filter<F, R>(&self, io: &IoRef, f: F) -> R
106 where
107 F: FnOnce(&mut FilterCtx<'_>) -> R,
108 {
109 let mut ctx = FilterCtx {
110 io,
111 idx: 0,
112 nbytes: 0,
113 stack: self,
114 st: FilterUpdates {
115 wants_write: false,
116 notify: false,
117 },
118 };
119 f(&mut ctx)
120 }
121
122 pub(crate) fn get_read_buf(&self) -> Option<BytesMut> {
123 self.with_last(|buffer| buffer.read.take())
124 }
125
126 pub(crate) fn set_read_buf(&self, buf: BytesMut, cfg: &IoConfig) {
127 self.with_last(move |buffer| {
128 if let Some(mut first_buf) = buffer.read.take() {
129 first_buf.extend_from_slice(&buf);
130 cfg.read_buf().release(buf);
131 buffer.read.set(Some(first_buf));
132 } else if !buf.is_empty() {
133 buffer.read.set(Some(buf));
134 } else {
135 cfg.read_buf().release(buf);
136 }
137 });
138 }
139
140 pub(crate) fn process_read_buf(
141 &self,
142 io: &IoRef,
143 nbytes: usize,
144 ) -> io::Result<FilterUpdates> {
145 let mut ctx = FilterCtx {
146 io,
147 nbytes,
148 idx: 0,
149 stack: self,
150 st: FilterUpdates {
151 wants_write: false,
152 notify: false,
153 },
154 };
155 let result = io.filter().process_read_buf(&mut ctx);
156 result.map(|()| ctx.st)
157 }
158
159 pub(crate) fn process_write_buf(&self, io: &IoRef) -> io::Result<()> {
160 if self.buffers[0].is_write_empty() {
161 Ok(())
162 } else {
163 let mut ctx = FilterCtx {
164 io,
165 idx: 0,
166 nbytes: 0,
167 stack: self,
168 st: FilterUpdates {
169 wants_write: true,
170 notify: false,
171 },
172 };
173 io.filter().process_write_buf(&mut ctx)
174 }
175 }
176
177 pub(crate) fn process_write_buf_force(&self, io: &IoRef) -> io::Result<()> {
178 let mut ctx = FilterCtx {
179 io,
180 idx: 0,
181 nbytes: 0,
182 stack: self,
183 st: FilterUpdates {
184 wants_write: true,
185 notify: false,
186 },
187 };
188 io.filter().process_write_buf(&mut ctx)
189 }
190
191 pub(crate) fn process_shutdown(&self, io: &IoRef) -> io::Result<Poll<()>> {
192 self.process_write_buf(io)?;
193 self.with_filter(io, |ctx| io.filter().shutdown(ctx))
194 }
195}
196
197impl Buffer {
198 fn is_write_empty(&self) -> bool {
199 self.with_write(|b| b.is_empty())
200 }
201
202 fn read_len(&self) -> usize {
203 if let Some(rb) = self.read.take() {
204 let l = rb.len();
205 self.read.set(Some(rb));
206 l
207 } else {
208 0
209 }
210 }
211
212 fn write_len(&self) -> usize {
213 self.with_write(|b| b.len())
214 }
215
216 fn with_read<F, R>(&self, io: &IoRef, f: F) -> R
217 where
218 F: FnOnce(&mut BytesMut) -> R,
219 {
220 let mut rb = self
221 .read
222 .take()
223 .unwrap_or_else(|| io.cfg().read_buf().get());
224 let result = f(&mut rb);
225
226 if self.read.take().is_some() {
228 log::error!("Nested read io operation is detected");
229 io.terminate();
230 }
231
232 if rb.is_empty() {
233 io.cfg().read_buf().release(rb);
234 } else {
235 self.read.set(Some(rb));
236 }
237 result
238 }
239
240 fn with_write<F, R>(&self, f: F) -> R
241 where
242 F: FnOnce(&mut BytePages) -> R,
243 {
244 let mut wb = self.write.take().unwrap();
245 let result = f(&mut wb);
246 self.write.set(Some(wb));
247 result
248 }
249}
250
/// Flags accumulated in a [`FilterCtx`] while filters run.
#[derive(Debug, Clone, Copy)]
pub(crate) struct FilterUpdates {
    /// Set when a filter added data to a downstream write buffer, or
    /// pre-set by the write-processing entry points.
    pub(crate) wants_write: bool,
    /// Set by `FilterCtx::notify`.
    pub(crate) notify: bool,
}
256
257#[derive(Debug)]
258pub struct FilterCtx<'a> {
259 io: &'a IoRef,
260 idx: usize,
261 nbytes: usize,
262 stack: &'a Stack,
263 st: FilterUpdates,
264}
265
266impl FilterCtx<'_> {
267 #[inline]
268 pub fn io(&self) -> &IoRef {
270 self.io
271 }
272
273 #[inline]
274 pub fn tag(&self) -> &'static str {
276 self.io.tag()
277 }
278
279 #[inline]
280 pub fn new_read_bytes(&self) -> usize {
282 self.nbytes
283 }
284
285 #[inline]
286 pub fn notify(&mut self) {
288 self.st.notify = true;
289 }
290
291 #[inline]
292 pub fn with_next<F, R>(&mut self, f: F) -> R
294 where
295 F: FnOnce(&mut Self) -> R,
296 {
297 self.idx += 1;
298 let res = f(self);
299 self.idx -= 1;
300 res
301 }
302
303 #[inline]
304 pub fn with_buffer<F, R>(&mut self, f: F) -> R
306 where
307 F: FnOnce(&mut FilterBuf<'_>) -> R,
308 {
309 let mut buf = FilterBuf {
310 io: self.io,
311 curr: &self.stack.buffers[self.idx],
312 next: &self.stack.buffers[self.idx + 1],
313 wants_write: Cell::new(self.st.wants_write),
314 };
315 let result = f(&mut buf);
316 if buf.wants_write.get() {
317 self.st.wants_write = true;
318 }
319 result
320 }
321
322 #[inline]
323 pub fn read_dst_size(&self) -> usize {
325 self.stack.buffers[0].read_len()
326 }
327
328 #[inline]
329 pub fn write_dst_size(&mut self) -> usize {
331 self.stack.buffers[self.stack.buffers.len() - 2].write_len()
332 }
333
334 pub(crate) fn clear_write_buf(&mut self) {
335 self.stack.buffers[self.idx].with_write(BytePages::clear);
336 }
337}
338
339#[derive(Debug)]
340pub struct FilterBuf<'a> {
341 io: &'a IoRef,
342 curr: &'a Buffer,
343 next: &'a Buffer,
344 wants_write: Cell<bool>,
345}
346
347impl FilterBuf<'_> {
348 #[inline]
349 pub fn io(&self) -> &IoRef {
351 self.io
352 }
353
354 #[inline]
355 pub fn tag(&self) -> &'static str {
357 self.io.tag()
358 }
359
360 pub fn with_read_src<F, R>(&self, f: F) -> R
362 where
363 F: FnOnce(&mut Option<BytesMut>) -> R,
364 {
365 let mut read_src = self.next.read.take();
366 let result = f(&mut read_src);
367
368 if let Some(b) = read_src {
369 if b.is_empty() {
370 self.io.cfg().read_buf().release(b);
371 } else {
372 self.next.read.set(Some(b));
373 }
374 }
375 result
376 }
377
378 pub fn with_read_buffers<F, R>(&self, f: F) -> R
380 where
381 F: FnOnce(&mut Option<BytesMut>, &mut BytesMut) -> R,
382 {
383 let mut read_src = self.next.read.take();
384 let mut read_dst = self
385 .curr
386 .read
387 .take()
388 .unwrap_or_else(|| self.io.cfg().read_buf().get());
389
390 let result = f(&mut read_src, &mut read_dst);
391
392 if let Some(b) = read_src {
393 if b.is_empty() {
394 self.io.cfg().read_buf().release(b);
395 } else {
396 self.next.read.set(Some(b));
397 }
398 }
399 if read_dst.is_empty() {
400 self.io.cfg().read_buf().release(read_dst);
401 } else {
402 self.curr.read.set(Some(read_dst));
403 }
404
405 result
406 }
407
408 #[inline]
409 pub fn with_write_buffers<F, R>(&self, f: F) -> R
411 where
412 F: FnOnce(&mut BytePages, &mut BytePages) -> R,
413 {
414 let mut write_curr = self.curr.write.take().unwrap();
415 let mut write_next = self.next.write.take().unwrap();
416 let write_len = if self.wants_write.get() {
417 0
418 } else {
419 write_next.len()
420 };
421
422 let result = f(&mut write_curr, &mut write_next);
423
424 if !self.wants_write.get() && write_next.len() > write_len {
425 self.wants_write.set(true);
426 }
427
428 self.curr.write.set(Some(write_curr));
429 self.next.write.set(Some(write_next));
430 result
431 }
432}
433
434impl fmt::Debug for Buffer {
435 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
436 let read = self.read.take();
437 let write = self.write.take();
438
439 let result = f
440 .debug_struct("Buffer")
441 .field("read", &read)
442 .field("write", &write)
443 .finish();
444 self.read.set(read);
445 self.write.set(write);
446 result
447 }
448}