1use std::{cell::Cell, fmt};
2
3use ntex_bytes::BytesVec;
4use ntex_util::future::Either;
5
6use crate::{IoRef, cfg::BufConfig};
7
8#[derive(Default)]
9pub(crate) struct Buffer {
10 read: Cell<Option<BytesVec>>,
11 write: Cell<Option<BytesVec>>,
12}
13
14impl fmt::Debug for Buffer {
15 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
16 let b0 = self.read.take();
17 let b1 = self.write.take();
18 let res = f
19 .debug_struct("Buffer")
20 .field("read", &b0)
21 .field("write", &b1)
22 .finish();
23 self.read.set(b0);
24 self.write.set(b1);
25 res
26 }
27}
28
/// Number of [`Buffer`] layers a [`Stack`] keeps in its inline array
/// before `Stack::add_layer` moves them into a `Vec`.
const INLINE_SIZE: usize = 3;
30
31#[derive(Debug)]
32pub(crate) struct Stack {
33 len: usize,
34 buffers: Either<[Buffer; INLINE_SIZE], Vec<Buffer>>,
35}
36
37impl Stack {
38 pub(crate) fn new() -> Self {
39 Self {
40 len: 1,
41 buffers: Either::Left(Default::default()),
42 }
43 }
44
45 pub(crate) fn add_layer(&mut self) {
46 match &mut self.buffers {
47 Either::Left(b) => {
48 if self.len == INLINE_SIZE {
50 let mut vec = vec![Buffer {
51 read: Cell::new(None),
52 write: Cell::new(None),
53 }];
54 for item in b.iter_mut().take(self.len) {
55 vec.push(Buffer {
56 read: Cell::new(item.read.take()),
57 write: Cell::new(item.write.take()),
58 });
59 }
60 self.len += 1;
61 self.buffers = Either::Right(vec);
62 } else {
63 let mut idx = self.len;
64 while idx > 0 {
65 let item = Buffer {
66 read: Cell::new(b[idx - 1].read.take()),
67 write: Cell::new(b[idx - 1].write.take()),
68 };
69 b[idx] = item;
70 idx -= 1;
71 }
72 b[0] = Buffer {
73 read: Cell::new(None),
74 write: Cell::new(None),
75 };
76 self.len += 1;
77 }
78 }
79 Either::Right(vec) => {
80 self.len += 1;
81 vec.insert(
82 0,
83 Buffer {
84 read: Cell::new(None),
85 write: Cell::new(None),
86 },
87 );
88 }
89 }
90 }
91
92 fn get_buffers<F, R>(&self, idx: usize, f: F) -> R
93 where
94 F: FnOnce(&Buffer, &Buffer) -> R,
95 {
96 let buffers = match self.buffers {
97 Either::Left(ref b) => &b[..],
98 Either::Right(ref b) => &b[..],
99 };
100
101 let next = idx + 1;
102 if self.len > next {
103 f(&buffers[idx], &buffers[next])
104 } else {
105 f(&buffers[idx], &Buffer::default())
106 }
107 }
108
109 fn get_first_level(&self) -> &Buffer {
110 match &self.buffers {
111 Either::Left(b) => &b[0],
112 Either::Right(b) => &b[0],
113 }
114 }
115
116 fn get_last_level(&self) -> &Buffer {
117 match &self.buffers {
118 Either::Left(b) => &b[self.len - 1],
119 Either::Right(b) => &b[self.len - 1],
120 }
121 }
122
123 pub(crate) fn get_read_source(&self) -> Option<BytesVec> {
124 self.get_last_level().read.take()
125 }
126
127 pub(crate) fn set_read_source(&self, io: &IoRef, buf: BytesVec) {
128 if buf.is_empty() {
129 io.cfg().read_buf().release(buf);
130 } else {
131 self.get_last_level().read.set(Some(buf));
132 }
133 }
134
135 pub(crate) fn with_read_destination<F, R>(&self, io: &IoRef, f: F) -> R
136 where
137 F: FnOnce(&mut BytesVec) -> R,
138 {
139 let item = self.get_first_level();
140 let mut rb = item
141 .read
142 .take()
143 .unwrap_or_else(|| io.cfg().read_buf().get());
144
145 let result = f(&mut rb);
146
147 if item.read.take().is_some() {
149 log::error!("Nested read io operation is detected");
150 io.force_close();
151 }
152
153 if rb.is_empty() {
154 io.cfg().read_buf().release(rb);
155 } else {
156 item.read.set(Some(rb));
157 }
158 result
159 }
160
161 pub(crate) fn get_write_destination(&self) -> Option<BytesVec> {
162 self.get_last_level().write.take()
163 }
164
165 pub(crate) fn set_write_destination(&self, buf: BytesVec) -> Option<BytesVec> {
166 let b = self.get_last_level().write.take();
167 if b.is_some() {
168 self.get_last_level().write.set(b);
169 Some(buf)
170 } else {
171 self.get_last_level().write.set(Some(buf));
172 None
173 }
174 }
175
176 pub(crate) fn with_write_source<F, R>(&self, io: &IoRef, f: F) -> R
177 where
178 F: FnOnce(&mut BytesVec) -> R,
179 {
180 let item = self.get_first_level();
181 let mut wb = item
182 .write
183 .take()
184 .unwrap_or_else(|| io.cfg().write_buf().get());
185
186 let result = f(&mut wb);
187 if wb.is_empty() {
188 io.cfg().write_buf().release(wb);
189 } else {
190 item.write.set(Some(wb));
191 }
192 result
193 }
194
195 pub(crate) fn with_write_destination<F, R>(&self, io: &IoRef, f: F) -> R
196 where
197 F: FnOnce(Option<&mut BytesVec>) -> R,
198 {
199 let item = self.get_last_level();
200 let mut wb = item.write.take();
201
202 let result = f(wb.as_mut());
203
204 if item.write.take().is_some() {
206 log::error!("Nested write io operation is detected");
207 io.force_close();
208 }
209
210 if let Some(b) = wb {
211 if b.is_empty() {
212 io.cfg().write_buf().release(b);
213 } else {
214 item.write.set(Some(b));
215 }
216 }
217 result
218 }
219
220 pub(crate) fn read_destination_size(&self) -> usize {
221 let item = self.get_first_level();
222 let rb = item.read.take();
223 let size = rb.as_ref().map(|b| b.len()).unwrap_or(0);
224 item.read.set(rb);
225 size
226 }
227
228 pub(crate) fn write_destination_size(&self) -> usize {
229 let item = self.get_last_level();
230 let wb = item.write.take();
231 let size = wb.as_ref().map(|b| b.len()).unwrap_or(0);
232 item.write.set(wb);
233 size
234 }
235}
236
237#[derive(Copy, Clone, Debug)]
238pub struct FilterCtx<'a> {
239 pub(crate) io: &'a IoRef,
240 pub(crate) stack: &'a Stack,
241 pub(crate) idx: usize,
242}
243
244impl<'a> FilterCtx<'a> {
245 pub(crate) fn new(io: &'a IoRef, stack: &'a Stack) -> Self {
246 Self { io, stack, idx: 0 }
247 }
248
249 #[inline]
250 pub fn tag(&self) -> &'static str {
251 self.io.tag()
252 }
253
254 #[inline]
255 pub fn next(&self) -> Self {
256 Self {
257 io: self.io,
258 stack: self.stack,
259 idx: self.idx + 1,
260 }
261 }
262
263 #[inline]
264 pub fn read_buf<F, R>(&self, nbytes: usize, f: F) -> R
265 where
266 F: FnOnce(&ReadBuf<'_>) -> R,
267 {
268 self.stack.get_buffers(self.idx, |curr, next| {
269 let buf = ReadBuf {
270 nbytes,
271 curr,
272 next,
273 io: self.io,
274 need_write: Cell::new(false),
275 };
276 f(&buf)
277 })
278 }
279
280 #[inline]
281 pub fn write_buf<F, R>(&self, f: F) -> R
282 where
283 F: FnOnce(&WriteBuf<'_>) -> R,
284 {
285 self.stack.get_buffers(self.idx, |curr, next| {
286 let buf = WriteBuf {
287 curr,
288 next,
289 io: self.io,
290 need_write: Cell::new(false),
291 };
292 f(&buf)
293 })
294 }
295}
296
297#[derive(Debug)]
298pub struct ReadBuf<'a> {
299 pub(crate) io: &'a IoRef,
300 pub(crate) curr: &'a Buffer,
301 pub(crate) next: &'a Buffer,
302 pub(crate) nbytes: usize,
303 pub(crate) need_write: Cell<bool>,
304}
305
306impl ReadBuf<'_> {
307 #[inline]
308 pub fn tag(&self) -> &'static str {
310 self.io.tag()
311 }
312
313 #[inline]
314 pub fn cfg(&self) -> &BufConfig {
316 self.io.cfg().read_buf()
317 }
318
319 #[inline]
320 pub fn nbytes(&self) -> usize {
322 self.nbytes
323 }
324
325 #[inline]
326 pub fn want_shutdown(&self) {
328 self.io.want_shutdown()
329 }
330
331 #[inline]
332 pub fn resize_buf(&self, buf: &mut BytesVec) {
334 self.io.cfg().read_buf().resize(buf);
335 }
336
337 #[inline]
338 pub fn with_src<F, R>(&self, f: F) -> R
340 where
341 F: FnOnce(&mut Option<BytesVec>) -> R,
342 {
343 let mut buf = self.next.read.take();
344 let result = f(&mut buf);
345
346 if let Some(b) = buf {
347 if b.is_empty() {
348 self.io.cfg().read_buf().release(b);
349 } else {
350 self.next.read.set(Some(b));
351 }
352 }
353 result
354 }
355
356 #[inline]
357 pub fn with_dst<F, R>(&self, f: F) -> R
359 where
360 F: FnOnce(&mut BytesVec) -> R,
361 {
362 let mut rb = self
363 .curr
364 .read
365 .take()
366 .unwrap_or_else(|| self.io.cfg().read_buf().get());
367
368 let result = f(&mut rb);
369 if rb.is_empty() {
370 self.io.cfg().read_buf().release(rb);
371 } else {
372 self.curr.read.set(Some(rb));
373 }
374 result
375 }
376
377 #[inline]
378 pub fn take_src(&self) -> Option<BytesVec> {
380 self.next.read.take().and_then(|b| {
381 if b.is_empty() {
382 self.io.cfg().read_buf().release(b);
383 None
384 } else {
385 Some(b)
386 }
387 })
388 }
389
390 #[inline]
391 pub fn set_src(&self, src: Option<BytesVec>) {
393 if let Some(src) = src {
394 if src.is_empty() {
395 self.io.cfg().read_buf().release(src);
396 } else if let Some(mut buf) = self.next.read.take() {
397 buf.extend_from_slice(&src);
398 self.next.read.set(Some(buf));
399 self.io.cfg().read_buf().release(src);
400 } else {
401 self.next.read.set(Some(src));
402 }
403 }
404 }
405
406 #[inline]
407 pub fn take_dst(&self) -> BytesVec {
409 self.curr
410 .read
411 .take()
412 .unwrap_or_else(|| self.io.cfg().read_buf().get())
413 }
414
415 #[inline]
416 pub fn set_dst(&self, dst: Option<BytesVec>) {
418 if let Some(dst) = dst {
419 if dst.is_empty() {
420 self.io.cfg().read_buf().release(dst);
421 } else if let Some(mut buf) = self.curr.read.take() {
422 buf.extend_from_slice(&dst);
423 self.curr.read.set(Some(buf));
424 self.io.cfg().read_buf().release(dst);
425 } else {
426 self.curr.read.set(Some(dst));
427 }
428 }
429 }
430
431 #[inline]
432 pub fn with_write_buf<'b, F, R>(&'b self, f: F) -> R
433 where
434 F: FnOnce(&WriteBuf<'b>) -> R,
435 {
436 let mut buf = WriteBuf {
437 io: self.io,
438 curr: self.curr,
439 next: self.next,
440 need_write: Cell::new(self.need_write.get()),
441 };
442 let result = f(&mut buf);
443 self.need_write.set(buf.need_write.get());
444 result
445 }
446}
447
448#[derive(Debug)]
449pub struct WriteBuf<'a> {
450 pub(crate) io: &'a IoRef,
451 pub(crate) curr: &'a Buffer,
452 pub(crate) next: &'a Buffer,
453 pub(crate) need_write: Cell<bool>,
454}
455
456impl WriteBuf<'_> {
457 #[inline]
458 pub fn tag(&self) -> &'static str {
460 self.io.tag()
461 }
462
463 #[inline]
464 pub fn cfg(&self) -> &BufConfig {
466 self.io.cfg().write_buf()
467 }
468
469 #[inline]
470 pub fn want_shutdown(&self) {
472 self.io.want_shutdown()
473 }
474
475 #[inline]
476 pub fn resize_buf(&self, buf: &mut BytesVec) {
478 self.io.cfg().write_buf().resize(buf);
479 }
480
481 #[inline]
482 pub fn with_src<F, R>(&self, f: F) -> R
484 where
485 F: FnOnce(&mut Option<BytesVec>) -> R,
486 {
487 let mut wb = self.curr.write.take();
488 let result = f(&mut wb);
489 if let Some(b) = wb {
490 if b.is_empty() {
491 self.io.cfg().write_buf().release(b);
492 } else {
493 self.curr.write.set(Some(b));
494 }
495 }
496 result
497 }
498
499 #[inline]
500 pub fn with_dst<F, R>(&self, f: F) -> R
502 where
503 F: FnOnce(&mut BytesVec) -> R,
504 {
505 let mut wb = self
506 .next
507 .write
508 .take()
509 .unwrap_or_else(|| self.io.cfg().write_buf().get());
510
511 let total = wb.len();
512 let result = f(&mut wb);
513
514 if wb.is_empty() {
515 self.io.cfg().write_buf().release(wb);
516 } else {
517 self.need_write
518 .set(self.need_write.get() | (total != wb.len()));
519 self.next.write.set(Some(wb));
520 }
521 result
522 }
523
524 #[inline]
525 pub fn take_src(&self) -> Option<BytesVec> {
527 self.curr.write.take().and_then(|b| {
528 if b.is_empty() {
529 self.io.cfg().write_buf().release(b);
530 None
531 } else {
532 Some(b)
533 }
534 })
535 }
536
537 #[inline]
538 pub fn set_src(&self, src: Option<BytesVec>) {
540 if let Some(src) = src {
541 if src.is_empty() {
542 self.io.cfg().write_buf().release(src);
543 } else if let Some(mut buf) = self.curr.write.take() {
544 buf.extend_from_slice(&src);
545 self.curr.write.set(Some(buf));
546 self.io.cfg().write_buf().release(src);
547 } else {
548 self.curr.write.set(Some(src));
549 }
550 }
551 }
552
553 #[inline]
554 pub fn take_dst(&self) -> BytesVec {
556 self.next
557 .write
558 .take()
559 .unwrap_or_else(|| self.io.cfg().write_buf().get())
560 }
561
562 #[inline]
563 pub fn set_dst(&self, dst: Option<BytesVec>) {
565 if let Some(dst) = dst {
566 if dst.is_empty() {
567 self.io.cfg().write_buf().release(dst);
568 } else {
569 self.need_write.set(true);
570
571 if let Some(mut buf) = self.next.write.take() {
572 buf.extend_from_slice(&dst);
573 self.next.write.set(Some(buf));
574 self.io.cfg().write_buf().release(dst);
575 } else {
576 self.next.write.set(Some(dst));
577 }
578 }
579 }
580 }
581
582 #[inline]
583 pub fn with_read_buf<'b, F, R>(&'b self, f: F) -> R
584 where
585 F: FnOnce(&ReadBuf<'b>) -> R,
586 {
587 let mut buf = ReadBuf {
588 io: self.io,
589 curr: self.curr,
590 next: self.next,
591 nbytes: 0,
592 need_write: Cell::new(self.need_write.get()),
593 };
594 let result = f(&mut buf);
595 self.need_write.set(buf.need_write.get());
596 result
597 }
598}