1use std::{cell::Cell, fmt};
2
3use ntex_bytes::BytesMut;
4use ntex_util::future::Either;
5
6use crate::{IoRef, cfg::BufConfig};
7
/// Read/write buffer pair belonging to a single level of a `Stack`.
///
/// Each slot sits in a `Cell`, so code in this file borrows a buffer by
/// `take()`-ing it out of the slot and hands it back with `set()`. `None`
/// means no buffer is currently stored in the slot.
#[derive(Default)]
pub(crate) struct Buffer {
    /// Read-side buffer of this level.
    read: Cell<Option<BytesMut>>,
    /// Write-side buffer of this level.
    write: Cell<Option<BytesMut>>,
}
13
14impl fmt::Debug for Buffer {
15 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
16 let b0 = self.read.take();
17 let b1 = self.write.take();
18 let res = f
19 .debug_struct("Buffer")
20 .field("read", &b0)
21 .field("write", &b1)
22 .finish();
23 self.read.set(b0);
24 self.write.set(b1);
25 res
26 }
27}
28
/// Number of `Buffer` slots in `Stack`'s inline array; when that many levels
/// are already in use, `Stack::add_layer` moves the buffers into a `Vec`.
const INLINE_SIZE: usize = 3;
30
/// Stack of per-level `Buffer`s.
///
/// Levels are kept in a fixed-size inline array until more than
/// `INLINE_SIZE` of them are needed; after that they live in a `Vec`.
#[derive(Debug)]
pub(crate) struct Stack {
    /// Number of levels currently in use.
    len: usize,
    /// Level storage: `Left` is the inline array, `Right` is the `Vec` used
    /// once the inline capacity has been exceeded.
    buffers: Either<[Buffer; INLINE_SIZE], Vec<Buffer>>,
}
36
37impl Stack {
38 pub(crate) fn new() -> Self {
39 Self {
40 len: 1,
41 buffers: Either::Left(Default::default()),
42 }
43 }
44
45 pub(crate) fn add_layer(&mut self) {
46 match &mut self.buffers {
47 Either::Left(b) => {
48 if self.len == INLINE_SIZE {
50 let mut vec = vec![Buffer {
51 read: Cell::new(None),
52 write: Cell::new(None),
53 }];
54 for item in b.iter_mut().take(self.len) {
55 vec.push(Buffer {
56 read: Cell::new(item.read.take()),
57 write: Cell::new(item.write.take()),
58 });
59 }
60 self.len += 1;
61 self.buffers = Either::Right(vec);
62 } else {
63 let mut idx = self.len;
64 while idx > 0 {
65 let item = Buffer {
66 read: Cell::new(b[idx - 1].read.take()),
67 write: Cell::new(b[idx - 1].write.take()),
68 };
69 b[idx] = item;
70 idx -= 1;
71 }
72 b[0] = Buffer {
73 read: Cell::new(None),
74 write: Cell::new(None),
75 };
76 self.len += 1;
77 }
78 }
79 Either::Right(vec) => {
80 self.len += 1;
81 vec.insert(
82 0,
83 Buffer {
84 read: Cell::new(None),
85 write: Cell::new(None),
86 },
87 );
88 }
89 }
90 }
91
92 fn get_buffers<F, R>(&self, idx: usize, f: F) -> R
93 where
94 F: FnOnce(&Buffer, &Buffer) -> R,
95 {
96 let buffers = match self.buffers {
97 Either::Left(ref b) => &b[..],
98 Either::Right(ref b) => &b[..],
99 };
100
101 let next = idx + 1;
102 if self.len > next {
103 f(&buffers[idx], &buffers[next])
104 } else {
105 f(&buffers[idx], &Buffer::default())
106 }
107 }
108
109 fn get_first_level(&self) -> &Buffer {
110 match &self.buffers {
111 Either::Left(b) => &b[0],
112 Either::Right(b) => &b[0],
113 }
114 }
115
116 fn get_last_level(&self) -> &Buffer {
117 match &self.buffers {
118 Either::Left(b) => &b[self.len - 1],
119 Either::Right(b) => &b[self.len - 1],
120 }
121 }
122
123 pub(crate) fn get_read_source(&self) -> Option<BytesMut> {
124 self.get_last_level().read.take()
125 }
126
127 pub(crate) fn set_read_source(&self, io: &IoRef, buf: BytesMut) {
128 if buf.is_empty() {
129 io.cfg().read_buf().release(buf);
130 } else {
131 self.get_last_level().read.set(Some(buf));
132 }
133 }
134
135 pub(crate) fn with_read_destination<F, R>(&self, io: &IoRef, f: F) -> R
136 where
137 F: FnOnce(&mut BytesMut) -> R,
138 {
139 let item = self.get_first_level();
140 let mut rb = item
141 .read
142 .take()
143 .unwrap_or_else(|| io.cfg().read_buf().get());
144
145 let result = f(&mut rb);
146
147 if item.read.take().is_some() {
149 log::error!("Nested read io operation is detected");
150 io.force_close();
151 }
152
153 if rb.is_empty() {
154 io.cfg().read_buf().release(rb);
155 } else {
156 item.read.set(Some(rb));
157 }
158 result
159 }
160
161 pub(crate) fn get_write_destination(&self) -> Option<BytesMut> {
162 self.get_last_level().write.take()
163 }
164
165 pub(crate) fn set_write_destination(&self, buf: BytesMut) -> Option<BytesMut> {
166 let b = self.get_last_level().write.take();
167 if b.is_some() {
168 self.get_last_level().write.set(b);
169 Some(buf)
170 } else {
171 self.get_last_level().write.set(Some(buf));
172 None
173 }
174 }
175
176 pub(crate) fn with_write_source<F, R>(&self, io: &IoRef, f: F) -> R
177 where
178 F: FnOnce(&mut BytesMut) -> R,
179 {
180 let item = self.get_first_level();
181 let mut wb = item
182 .write
183 .take()
184 .unwrap_or_else(|| io.cfg().write_buf().get());
185
186 let result = f(&mut wb);
187 if wb.is_empty() {
188 io.cfg().write_buf().release(wb);
189 } else {
190 item.write.set(Some(wb));
191 }
192 result
193 }
194
195 pub(crate) fn with_write_destination<F, R>(&self, io: &IoRef, f: F) -> R
196 where
197 F: FnOnce(Option<&mut BytesMut>) -> R,
198 {
199 let item = self.get_last_level();
200 let mut wb = item.write.take();
201
202 let result = f(wb.as_mut());
203
204 if item.write.take().is_some() {
206 log::error!("Nested write io operation is detected");
207 io.force_close();
208 }
209
210 if let Some(b) = wb {
211 if b.is_empty() {
212 io.cfg().write_buf().release(b);
213 } else {
214 item.write.set(Some(b));
215 }
216 }
217 result
218 }
219
220 pub(crate) fn read_destination_size(&self) -> usize {
221 let item = self.get_first_level();
222 let rb = item.read.take();
223 let size = rb.as_ref().map(|b| b.len()).unwrap_or(0);
224 item.read.set(rb);
225 size
226 }
227
228 pub(crate) fn write_destination_size(&self) -> usize {
229 let item = self.get_last_level();
230 let wb = item.write.take();
231 let size = wb.as_ref().map(|b| b.len()).unwrap_or(0);
232 item.write.set(wb);
233 size
234 }
235}
236
/// Context handed to a filter: an io reference and a buffer `Stack`,
/// together with the index of the level the context points at.
#[derive(Copy, Clone, Debug)]
pub struct FilterCtx<'a> {
    /// Io object this context refers to.
    pub(crate) io: &'a IoRef,
    /// Buffer stack the context indexes into.
    pub(crate) stack: &'a Stack,
    /// Index of the current level inside `stack`.
    pub(crate) idx: usize,
}
243
244impl<'a> FilterCtx<'a> {
245 pub(crate) fn new(io: &'a IoRef, stack: &'a Stack) -> Self {
246 Self { io, stack, idx: 0 }
247 }
248
249 #[inline]
250 pub fn io(&self) -> &IoRef {
252 self.io
253 }
254
255 #[inline]
256 pub fn tag(&self) -> &'static str {
258 self.io.tag()
259 }
260
261 #[inline]
262 pub fn next(&self) -> Self {
264 Self {
265 io: self.io,
266 stack: self.stack,
267 idx: self.idx + 1,
268 }
269 }
270
271 #[inline]
272 pub fn read_buf<F, R>(&self, nbytes: usize, f: F) -> R
274 where
275 F: FnOnce(&ReadBuf<'_>) -> R,
276 {
277 self.stack.get_buffers(self.idx, |curr, next| {
278 let buf = ReadBuf {
279 nbytes,
280 curr,
281 next,
282 io: self.io,
283 need_write: Cell::new(false),
284 };
285 f(&buf)
286 })
287 }
288
289 #[inline]
290 pub fn write_buf<F, R>(&self, f: F) -> R
292 where
293 F: FnOnce(&WriteBuf<'_>) -> R,
294 {
295 self.stack.get_buffers(self.idx, |curr, next| {
296 let buf = WriteBuf {
297 curr,
298 next,
299 io: self.io,
300 need_write: Cell::new(false),
301 };
302 f(&buf)
303 })
304 }
305}
306
/// Read-side view over two adjacent levels of the buffer stack.
///
/// The methods of this type treat `next.read` as the source buffer and
/// `curr.read` as the destination buffer.
#[derive(Debug)]
pub struct ReadBuf<'a> {
    /// Io object the buffers belong to.
    pub(crate) io: &'a IoRef,
    /// Level whose read buffer is the destination.
    pub(crate) curr: &'a Buffer,
    /// Level whose read buffer is the source.
    pub(crate) next: &'a Buffer,
    /// Value supplied by the creator of this view; exposed via `nbytes()`.
    pub(crate) nbytes: usize,
    /// Flag shared with the `WriteBuf` created by `with_write_buf`.
    pub(crate) need_write: Cell<bool>,
}
315
316impl ReadBuf<'_> {
317 #[inline]
318 pub fn tag(&self) -> &'static str {
320 self.io.tag()
321 }
322
323 #[inline]
324 pub fn cfg(&self) -> &BufConfig {
326 self.io.cfg().read_buf()
327 }
328
329 #[inline]
330 pub fn nbytes(&self) -> usize {
332 self.nbytes
333 }
334
335 #[inline]
336 pub fn want_shutdown(&self) {
338 self.io.want_shutdown()
339 }
340
341 #[inline]
342 pub fn resize_buf(&self, buf: &mut BytesMut) {
344 self.io.cfg().read_buf().resize(buf);
345 }
346
347 #[inline]
348 pub fn with_src<F, R>(&self, f: F) -> R
350 where
351 F: FnOnce(&mut Option<BytesMut>) -> R,
352 {
353 let mut buf = self.next.read.take();
354 let result = f(&mut buf);
355
356 if let Some(b) = buf {
357 if b.is_empty() {
358 self.io.cfg().read_buf().release(b);
359 } else {
360 self.next.read.set(Some(b));
361 }
362 }
363 result
364 }
365
366 #[inline]
367 pub fn with_dst<F, R>(&self, f: F) -> R
369 where
370 F: FnOnce(&mut BytesMut) -> R,
371 {
372 let mut rb = self
373 .curr
374 .read
375 .take()
376 .unwrap_or_else(|| self.io.cfg().read_buf().get());
377
378 let result = f(&mut rb);
379 if rb.is_empty() {
380 self.io.cfg().read_buf().release(rb);
381 } else {
382 self.curr.read.set(Some(rb));
383 }
384 result
385 }
386
387 #[inline]
388 pub fn take_src(&self) -> Option<BytesMut> {
390 self.next.read.take().and_then(|b| {
391 if b.is_empty() {
392 self.io.cfg().read_buf().release(b);
393 None
394 } else {
395 Some(b)
396 }
397 })
398 }
399
400 #[inline]
401 pub fn set_src(&self, src: Option<BytesMut>) {
403 if let Some(src) = src {
404 if src.is_empty() {
405 self.io.cfg().read_buf().release(src);
406 } else if let Some(mut buf) = self.next.read.take() {
407 buf.extend_from_slice(&src);
408 self.next.read.set(Some(buf));
409 self.io.cfg().read_buf().release(src);
410 } else {
411 self.next.read.set(Some(src));
412 }
413 }
414 }
415
416 #[inline]
417 pub fn take_dst(&self) -> BytesMut {
419 self.curr
420 .read
421 .take()
422 .unwrap_or_else(|| self.io.cfg().read_buf().get())
423 }
424
425 #[inline]
426 pub fn set_dst(&self, dst: Option<BytesMut>) {
428 if let Some(dst) = dst {
429 if dst.is_empty() {
430 self.io.cfg().read_buf().release(dst);
431 } else if let Some(mut buf) = self.curr.read.take() {
432 buf.extend_from_slice(&dst);
433 self.curr.read.set(Some(buf));
434 self.io.cfg().read_buf().release(dst);
435 } else {
436 self.curr.read.set(Some(dst));
437 }
438 }
439 }
440
441 #[inline]
442 pub fn with_write_buf<'b, F, R>(&'b self, f: F) -> R
443 where
444 F: FnOnce(&WriteBuf<'b>) -> R,
445 {
446 let mut buf = WriteBuf {
447 io: self.io,
448 curr: self.curr,
449 next: self.next,
450 need_write: Cell::new(self.need_write.get()),
451 };
452 let result = f(&mut buf);
453 self.need_write.set(buf.need_write.get());
454 result
455 }
456}
457
/// Write-side view over two adjacent levels of the buffer stack.
///
/// The methods of this type treat `curr.write` as the source buffer and
/// `next.write` as the destination buffer.
#[derive(Debug)]
pub struct WriteBuf<'a> {
    /// Io object the buffers belong to.
    pub(crate) io: &'a IoRef,
    /// Level whose write buffer is the source.
    pub(crate) curr: &'a Buffer,
    /// Level whose write buffer is the destination.
    pub(crate) next: &'a Buffer,
    /// Set when `with_dst` observes a length change of a non-empty
    /// destination, or when `set_dst` receives a non-empty buffer.
    pub(crate) need_write: Cell<bool>,
}
465
466impl WriteBuf<'_> {
467 #[inline]
468 pub fn tag(&self) -> &'static str {
470 self.io.tag()
471 }
472
473 #[inline]
474 pub fn cfg(&self) -> &BufConfig {
476 self.io.cfg().write_buf()
477 }
478
479 #[inline]
480 pub fn want_shutdown(&self) {
482 self.io.want_shutdown()
483 }
484
485 #[inline]
486 pub fn resize_buf(&self, buf: &mut BytesMut) {
488 self.io.cfg().write_buf().resize(buf);
489 }
490
491 #[inline]
492 pub fn resize_buf_min(&self, buf: &mut BytesMut, size: usize) {
494 self.io.cfg().write_buf().resize_min(buf, size);
495 }
496
497 #[inline]
498 pub fn with_src<F, R>(&self, f: F) -> R
500 where
501 F: FnOnce(&mut Option<BytesMut>) -> R,
502 {
503 let mut wb = self.curr.write.take();
504 let result = f(&mut wb);
505 if let Some(b) = wb {
506 if b.is_empty() {
507 self.io.cfg().write_buf().release(b);
508 } else {
509 self.curr.write.set(Some(b));
510 }
511 }
512 result
513 }
514
515 #[inline]
516 pub fn with_dst<F, R>(&self, f: F) -> R
518 where
519 F: FnOnce(&mut BytesMut) -> R,
520 {
521 let mut wb = self
522 .next
523 .write
524 .take()
525 .unwrap_or_else(|| self.io.cfg().write_buf().get());
526
527 let total = wb.len();
528 let result = f(&mut wb);
529
530 if wb.is_empty() {
531 self.io.cfg().write_buf().release(wb);
532 } else {
533 self.need_write
534 .set(self.need_write.get() | (total != wb.len()));
535 self.next.write.set(Some(wb));
536 }
537 result
538 }
539
540 #[inline]
541 pub fn take_src(&self) -> Option<BytesMut> {
543 self.curr.write.take().and_then(|b| {
544 if b.is_empty() {
545 self.io.cfg().write_buf().release(b);
546 None
547 } else {
548 Some(b)
549 }
550 })
551 }
552
553 #[inline]
554 pub fn set_src(&self, src: Option<BytesMut>) {
556 if let Some(src) = src {
557 if src.is_empty() {
558 self.io.cfg().write_buf().release(src);
559 } else if let Some(mut buf) = self.curr.write.take() {
560 buf.extend_from_slice(&src);
561 self.curr.write.set(Some(buf));
562 self.io.cfg().write_buf().release(src);
563 } else {
564 self.curr.write.set(Some(src));
565 }
566 }
567 }
568
569 #[inline]
570 pub fn take_dst(&self) -> BytesMut {
572 self.next
573 .write
574 .take()
575 .unwrap_or_else(|| self.io.cfg().write_buf().get())
576 }
577
578 #[inline]
579 pub fn set_dst(&self, dst: Option<BytesMut>) {
581 if let Some(dst) = dst {
582 if dst.is_empty() {
583 self.io.cfg().write_buf().release(dst);
584 } else {
585 self.need_write.set(true);
586
587 if let Some(mut buf) = self.next.write.take() {
588 buf.extend_from_slice(&dst);
589 self.next.write.set(Some(buf));
590 self.io.cfg().write_buf().release(dst);
591 } else {
592 self.next.write.set(Some(dst));
593 }
594 }
595 }
596 }
597
598 #[inline]
599 pub fn with_read_buf<'b, F, R>(&'b self, f: F) -> R
600 where
601 F: FnOnce(&ReadBuf<'b>) -> R,
602 {
603 let mut buf = ReadBuf {
604 io: self.io,
605 curr: self.curr,
606 next: self.next,
607 nbytes: 0,
608 need_write: Cell::new(self.need_write.get()),
609 };
610 let result = f(&mut buf);
611 self.need_write.set(buf.need_write.get());
612 result
613 }
614}