1use std::{cell::Cell, fmt};
2
3use ntex_bytes::BytesMut;
4use ntex_util::future::Either;
5
6use crate::{IoRef, cfg::BufConfig};
7
8#[derive(Default)]
9pub(crate) struct Buffer {
10 read: Cell<Option<BytesMut>>,
11 write: Cell<Option<BytesMut>>,
12}
13
14impl fmt::Debug for Buffer {
15 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
16 let b0 = self.read.take();
17 let b1 = self.write.take();
18 let res = f
19 .debug_struct("Buffer")
20 .field("read", &b0)
21 .field("write", &b1)
22 .finish();
23 self.read.set(b0);
24 self.write.set(b1);
25 res
26 }
27}
28
29const INLINE_SIZE: usize = 3;
30
31#[derive(Debug)]
32pub(crate) struct Stack {
33 len: usize,
34 buffers: Either<[Buffer; INLINE_SIZE], Vec<Buffer>>,
35}
36
37impl Stack {
38 pub(crate) fn new() -> Self {
39 Self {
40 len: 1,
41 buffers: Either::Left(Default::default()),
42 }
43 }
44
45 pub(crate) fn add_layer(&mut self) {
46 match &mut self.buffers {
47 Either::Left(b) => {
48 if self.len == INLINE_SIZE {
50 let mut vec = vec![Buffer {
51 read: Cell::new(None),
52 write: Cell::new(None),
53 }];
54 for item in b.iter_mut().take(self.len) {
55 vec.push(Buffer {
56 read: Cell::new(item.read.take()),
57 write: Cell::new(item.write.take()),
58 });
59 }
60 self.len += 1;
61 self.buffers = Either::Right(vec);
62 } else {
63 let mut idx = self.len;
64 while idx > 0 {
65 let item = Buffer {
66 read: Cell::new(b[idx - 1].read.take()),
67 write: Cell::new(b[idx - 1].write.take()),
68 };
69 b[idx] = item;
70 idx -= 1;
71 }
72 b[0] = Buffer {
73 read: Cell::new(None),
74 write: Cell::new(None),
75 };
76 self.len += 1;
77 }
78 }
79 Either::Right(vec) => {
80 self.len += 1;
81 vec.insert(
82 0,
83 Buffer {
84 read: Cell::new(None),
85 write: Cell::new(None),
86 },
87 );
88 }
89 }
90 }
91
92 fn get_buffers<F, R>(&self, idx: usize, f: F) -> R
93 where
94 F: FnOnce(&Buffer, &Buffer) -> R,
95 {
96 let buffers = match self.buffers {
97 Either::Left(ref b) => &b[..],
98 Either::Right(ref b) => &b[..],
99 };
100
101 let next = idx + 1;
102 if self.len > next {
103 f(&buffers[idx], &buffers[next])
104 } else {
105 f(&buffers[idx], &Buffer::default())
106 }
107 }
108
109 fn get_first_level(&self) -> &Buffer {
110 match &self.buffers {
111 Either::Left(b) => &b[0],
112 Either::Right(b) => &b[0],
113 }
114 }
115
116 fn get_last_level(&self) -> &Buffer {
117 match &self.buffers {
118 Either::Left(b) => &b[self.len - 1],
119 Either::Right(b) => &b[self.len - 1],
120 }
121 }
122
123 pub(crate) fn get_read_source(&self) -> Option<BytesMut> {
124 self.get_last_level().read.take()
125 }
126
127 pub(crate) fn set_read_source(&self, io: &IoRef, buf: BytesMut) {
128 if buf.is_empty() {
129 io.cfg().read_buf().release(buf);
130 } else {
131 self.get_last_level().read.set(Some(buf));
132 }
133 }
134
135 pub(crate) fn with_read_destination<F, R>(&self, io: &IoRef, f: F) -> R
136 where
137 F: FnOnce(&mut BytesMut) -> R,
138 {
139 let item = self.get_first_level();
140 let mut rb = item
141 .read
142 .take()
143 .unwrap_or_else(|| io.cfg().read_buf().get());
144
145 let result = f(&mut rb);
146
147 if item.read.take().is_some() {
149 log::error!("Nested read io operation is detected");
150 io.force_close();
151 }
152
153 if rb.is_empty() {
154 io.cfg().read_buf().release(rb);
155 } else {
156 item.read.set(Some(rb));
157 }
158 result
159 }
160
161 pub(crate) fn get_write_destination(&self) -> Option<BytesMut> {
162 self.get_last_level().write.take()
163 }
164
165 pub(crate) fn set_write_destination(&self, buf: BytesMut) -> Option<BytesMut> {
166 let b = self.get_last_level().write.take();
167 if b.is_some() {
168 self.get_last_level().write.set(b);
169 Some(buf)
170 } else {
171 self.get_last_level().write.set(Some(buf));
172 None
173 }
174 }
175
176 pub(crate) fn with_write_source<F, R>(&self, io: &IoRef, f: F) -> R
177 where
178 F: FnOnce(&mut BytesMut) -> R,
179 {
180 let item = self.get_first_level();
181 let mut wb = item
182 .write
183 .take()
184 .unwrap_or_else(|| io.cfg().write_buf().get());
185
186 let result = f(&mut wb);
187 if wb.is_empty() {
188 io.cfg().write_buf().release(wb);
189 } else {
190 item.write.set(Some(wb));
191 }
192 result
193 }
194
195 pub(crate) fn with_write_destination<F, R>(&self, io: &IoRef, f: F) -> R
196 where
197 F: FnOnce(Option<&mut BytesMut>) -> R,
198 {
199 let item = self.get_last_level();
200 let mut wb = item.write.take();
201
202 let result = f(wb.as_mut());
203
204 if item.write.take().is_some() {
206 log::error!("Nested write io operation is detected");
207 io.force_close();
208 }
209
210 if let Some(b) = wb {
211 if b.is_empty() {
212 io.cfg().write_buf().release(b);
213 } else {
214 item.write.set(Some(b));
215 }
216 }
217 result
218 }
219
220 pub(crate) fn read_destination_size(&self) -> usize {
221 let item = self.get_first_level();
222 let rb = item.read.take();
223 let size = rb.as_ref().map_or(0, BytesMut::len);
224 item.read.set(rb);
225 size
226 }
227
228 pub(crate) fn write_destination_size(&self) -> usize {
229 let item = self.get_last_level();
230 let wb = item.write.take();
231 let size = wb.as_ref().map_or(0, BytesMut::len);
232 item.write.set(wb);
233 size
234 }
235}
236
237#[derive(Copy, Clone, Debug)]
238pub struct FilterCtx<'a> {
239 pub(crate) io: &'a IoRef,
240 pub(crate) stack: &'a Stack,
241 pub(crate) idx: usize,
242}
243
244impl<'a> FilterCtx<'a> {
245 pub(crate) fn new(io: &'a IoRef, stack: &'a Stack) -> Self {
246 Self { io, stack, idx: 0 }
247 }
248
249 #[inline]
250 pub fn io(&self) -> &IoRef {
252 self.io
253 }
254
255 #[inline]
256 pub fn tag(&self) -> &'static str {
258 self.io.tag()
259 }
260
261 #[inline]
262 #[must_use]
263 pub fn next(&self) -> Self {
265 Self {
266 io: self.io,
267 stack: self.stack,
268 idx: self.idx + 1,
269 }
270 }
271
272 #[inline]
273 pub fn read_buf<F, R>(&self, nbytes: usize, f: F) -> R
275 where
276 F: FnOnce(&ReadBuf<'_>) -> R,
277 {
278 self.stack.get_buffers(self.idx, |curr, next| {
279 let buf = ReadBuf {
280 nbytes,
281 curr,
282 next,
283 io: self.io,
284 need_write: Cell::new(false),
285 };
286 f(&buf)
287 })
288 }
289
290 #[inline]
291 pub fn write_buf<F, R>(&self, f: F) -> R
293 where
294 F: FnOnce(&WriteBuf<'_>) -> R,
295 {
296 self.stack.get_buffers(self.idx, |curr, next| {
297 let buf = WriteBuf {
298 curr,
299 next,
300 io: self.io,
301 need_write: Cell::new(false),
302 };
303 f(&buf)
304 })
305 }
306}
307
308#[derive(Debug)]
309pub struct ReadBuf<'a> {
310 pub(crate) io: &'a IoRef,
311 pub(crate) curr: &'a Buffer,
312 pub(crate) next: &'a Buffer,
313 pub(crate) nbytes: usize,
314 pub(crate) need_write: Cell<bool>,
315}
316
317impl ReadBuf<'_> {
318 #[inline]
319 pub fn tag(&self) -> &'static str {
321 self.io.tag()
322 }
323
324 #[inline]
325 pub fn cfg(&self) -> &BufConfig {
327 self.io.cfg().read_buf()
328 }
329
330 #[inline]
331 pub fn nbytes(&self) -> usize {
333 self.nbytes
334 }
335
336 #[inline]
337 pub fn want_shutdown(&self) {
339 self.io.want_shutdown();
340 }
341
342 #[inline]
343 pub fn resize_buf(&self, buf: &mut BytesMut) {
345 self.io.cfg().read_buf().resize(buf);
346 }
347
348 #[inline]
349 pub fn with_src<F, R>(&self, f: F) -> R
351 where
352 F: FnOnce(&mut Option<BytesMut>) -> R,
353 {
354 let mut buf = self.next.read.take();
355 let result = f(&mut buf);
356
357 if let Some(b) = buf {
358 if b.is_empty() {
359 self.io.cfg().read_buf().release(b);
360 } else {
361 self.next.read.set(Some(b));
362 }
363 }
364 result
365 }
366
367 #[inline]
368 pub fn with_dst<F, R>(&self, f: F) -> R
370 where
371 F: FnOnce(&mut BytesMut) -> R,
372 {
373 let mut rb = self
374 .curr
375 .read
376 .take()
377 .unwrap_or_else(|| self.io.cfg().read_buf().get());
378
379 let result = f(&mut rb);
380 if rb.is_empty() {
381 self.io.cfg().read_buf().release(rb);
382 } else {
383 self.curr.read.set(Some(rb));
384 }
385 result
386 }
387
388 #[inline]
389 pub fn take_src(&self) -> Option<BytesMut> {
391 self.next.read.take().and_then(|b| {
392 if b.is_empty() {
393 self.io.cfg().read_buf().release(b);
394 None
395 } else {
396 Some(b)
397 }
398 })
399 }
400
401 #[inline]
402 pub fn set_src(&self, src: Option<BytesMut>) {
404 if let Some(src) = src {
405 if src.is_empty() {
406 self.io.cfg().read_buf().release(src);
407 } else if let Some(mut buf) = self.next.read.take() {
408 buf.extend_from_slice(&src);
409 self.next.read.set(Some(buf));
410 self.io.cfg().read_buf().release(src);
411 } else {
412 self.next.read.set(Some(src));
413 }
414 }
415 }
416
417 #[inline]
418 pub fn take_dst(&self) -> BytesMut {
420 self.curr
421 .read
422 .take()
423 .unwrap_or_else(|| self.io.cfg().read_buf().get())
424 }
425
426 #[inline]
427 pub fn set_dst(&self, dst: Option<BytesMut>) {
429 if let Some(dst) = dst {
430 if dst.is_empty() {
431 self.io.cfg().read_buf().release(dst);
432 } else if let Some(mut buf) = self.curr.read.take() {
433 buf.extend_from_slice(&dst);
434 self.curr.read.set(Some(buf));
435 self.io.cfg().read_buf().release(dst);
436 } else {
437 self.curr.read.set(Some(dst));
438 }
439 }
440 }
441
442 #[inline]
443 pub fn with_write_buf<'b, F, R>(&'b self, f: F) -> R
444 where
445 F: FnOnce(&WriteBuf<'b>) -> R,
446 {
447 let mut buf = WriteBuf {
448 io: self.io,
449 curr: self.curr,
450 next: self.next,
451 need_write: Cell::new(self.need_write.get()),
452 };
453 let result = f(&mut buf);
454 self.need_write.set(buf.need_write.get());
455 result
456 }
457}
458
459#[derive(Debug)]
460pub struct WriteBuf<'a> {
461 pub(crate) io: &'a IoRef,
462 pub(crate) curr: &'a Buffer,
463 pub(crate) next: &'a Buffer,
464 pub(crate) need_write: Cell<bool>,
465}
466
467impl WriteBuf<'_> {
468 #[inline]
469 pub fn tag(&self) -> &'static str {
471 self.io.tag()
472 }
473
474 #[inline]
475 pub fn cfg(&self) -> &BufConfig {
477 self.io.cfg().write_buf()
478 }
479
480 #[inline]
481 pub fn want_shutdown(&self) {
483 self.io.want_shutdown();
484 }
485
486 #[inline]
487 pub fn resize_buf(&self, buf: &mut BytesMut) {
489 self.io.cfg().write_buf().resize(buf);
490 }
491
492 #[inline]
493 pub fn resize_buf_min(&self, buf: &mut BytesMut, size: usize) {
495 self.io.cfg().write_buf().resize_min(buf, size);
496 }
497
498 #[inline]
499 pub fn with_src<F, R>(&self, f: F) -> R
501 where
502 F: FnOnce(&mut Option<BytesMut>) -> R,
503 {
504 let mut wb = self.curr.write.take();
505 let result = f(&mut wb);
506 if let Some(b) = wb {
507 if b.is_empty() {
508 self.io.cfg().write_buf().release(b);
509 } else {
510 self.curr.write.set(Some(b));
511 }
512 }
513 result
514 }
515
516 #[inline]
517 pub fn with_dst<F, R>(&self, f: F) -> R
519 where
520 F: FnOnce(&mut BytesMut) -> R,
521 {
522 let mut wb = self
523 .next
524 .write
525 .take()
526 .unwrap_or_else(|| self.io.cfg().write_buf().get());
527
528 let total = wb.len();
529 let result = f(&mut wb);
530
531 if wb.is_empty() {
532 self.io.cfg().write_buf().release(wb);
533 } else {
534 self.need_write
535 .set(self.need_write.get() | (total != wb.len()));
536 self.next.write.set(Some(wb));
537 }
538 result
539 }
540
541 #[inline]
542 pub fn take_src(&self) -> Option<BytesMut> {
544 self.curr.write.take().and_then(|b| {
545 if b.is_empty() {
546 self.io.cfg().write_buf().release(b);
547 None
548 } else {
549 Some(b)
550 }
551 })
552 }
553
554 #[inline]
555 pub fn set_src(&self, src: Option<BytesMut>) {
557 if let Some(src) = src {
558 if src.is_empty() {
559 self.io.cfg().write_buf().release(src);
560 } else if let Some(mut buf) = self.curr.write.take() {
561 buf.extend_from_slice(&src);
562 self.curr.write.set(Some(buf));
563 self.io.cfg().write_buf().release(src);
564 } else {
565 self.curr.write.set(Some(src));
566 }
567 }
568 }
569
570 #[inline]
571 pub fn take_dst(&self) -> BytesMut {
573 self.next
574 .write
575 .take()
576 .unwrap_or_else(|| self.io.cfg().write_buf().get())
577 }
578
579 #[inline]
580 pub fn set_dst(&self, dst: Option<BytesMut>) {
582 if let Some(dst) = dst {
583 if dst.is_empty() {
584 self.io.cfg().write_buf().release(dst);
585 } else {
586 self.need_write.set(true);
587
588 if let Some(mut buf) = self.next.write.take() {
589 buf.extend_from_slice(&dst);
590 self.next.write.set(Some(buf));
591 self.io.cfg().write_buf().release(dst);
592 } else {
593 self.next.write.set(Some(dst));
594 }
595 }
596 }
597 }
598
599 #[inline]
600 pub fn with_read_buf<'b, F, R>(&'b self, f: F) -> R
601 where
602 F: FnOnce(&ReadBuf<'b>) -> R,
603 {
604 let mut buf = ReadBuf {
605 io: self.io,
606 curr: self.curr,
607 next: self.next,
608 nbytes: 0,
609 need_write: Cell::new(self.need_write.get()),
610 };
611 let result = f(&mut buf);
612 self.need_write.set(buf.need_write.get());
613 result
614 }
615}