1use std::{cell::Cell, fmt};
2
3use ntex_bytes::{BytesVec, PoolRef};
4use ntex_util::future::Either;
5
6use crate::IoRef;
7
8#[derive(Default)]
9pub(crate) struct Buffer {
10 read: Cell<Option<BytesVec>>,
11 write: Cell<Option<BytesVec>>,
12}
13
14impl fmt::Debug for Buffer {
15 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
16 let b0 = self.read.take();
17 let b1 = self.write.take();
18 let res = f
19 .debug_struct("Buffer")
20 .field("read", &b0)
21 .field("write", &b1)
22 .finish();
23 self.read.set(b0);
24 self.write.set(b1);
25 res
26 }
27}
28
/// Length of the inline array variant of `Stack::buffers`; `Stack::add_layer`
/// switches to a `Vec` once this many levels are in use.
const INLINE_SIZE: usize = 3;
30
31#[derive(Debug)]
32pub(crate) struct Stack {
33 len: usize,
34 buffers: Either<[Buffer; INLINE_SIZE], Vec<Buffer>>,
35}
36
37impl Stack {
38 pub(crate) fn new() -> Self {
39 Self {
40 len: 1,
41 buffers: Either::Left(Default::default()),
42 }
43 }
44
45 pub(crate) fn add_layer(&mut self) {
46 match &mut self.buffers {
47 Either::Left(b) => {
48 if self.len == INLINE_SIZE {
50 let mut vec = vec![Buffer {
51 read: Cell::new(None),
52 write: Cell::new(None),
53 }];
54 for item in b.iter_mut().take(self.len) {
55 vec.push(Buffer {
56 read: Cell::new(item.read.take()),
57 write: Cell::new(item.write.take()),
58 });
59 }
60 self.len += 1;
61 self.buffers = Either::Right(vec);
62 } else {
63 let mut idx = self.len;
64 while idx > 0 {
65 let item = Buffer {
66 read: Cell::new(b[idx - 1].read.take()),
67 write: Cell::new(b[idx - 1].write.take()),
68 };
69 b[idx] = item;
70 idx -= 1;
71 }
72 b[0] = Buffer {
73 read: Cell::new(None),
74 write: Cell::new(None),
75 };
76 self.len += 1;
77 }
78 }
79 Either::Right(vec) => {
80 self.len += 1;
81 vec.insert(
82 0,
83 Buffer {
84 read: Cell::new(None),
85 write: Cell::new(None),
86 },
87 );
88 }
89 }
90 }
91
92 fn get_buffers<F, R>(&self, idx: usize, f: F) -> R
93 where
94 F: FnOnce(&Buffer, &Buffer) -> R,
95 {
96 let buffers = match self.buffers {
97 Either::Left(ref b) => &b[..],
98 Either::Right(ref b) => &b[..],
99 };
100
101 let next = idx + 1;
102 if self.len > next {
103 f(&buffers[idx], &buffers[next])
104 } else {
105 f(&buffers[idx], &Buffer::default())
106 }
107 }
108
109 fn get_first_level(&self) -> &Buffer {
110 match &self.buffers {
111 Either::Left(b) => &b[0],
112 Either::Right(b) => &b[0],
113 }
114 }
115
116 fn get_last_level(&self) -> &Buffer {
117 match &self.buffers {
118 Either::Left(b) => &b[self.len - 1],
119 Either::Right(b) => &b[self.len - 1],
120 }
121 }
122
123 pub(crate) fn get_read_source(&self) -> Option<BytesVec> {
124 self.get_last_level().read.take()
125 }
126
127 pub(crate) fn set_read_source(&self, io: &IoRef, buf: BytesVec) {
128 if buf.is_empty() {
129 io.memory_pool().release_read_buf(buf);
130 } else {
131 self.get_last_level().read.set(Some(buf));
132 }
133 }
134
135 pub(crate) fn with_read_destination<F, R>(&self, io: &IoRef, f: F) -> R
136 where
137 F: FnOnce(&mut BytesVec) -> R,
138 {
139 let item = self.get_first_level();
140 let mut rb = item
141 .read
142 .take()
143 .unwrap_or_else(|| io.memory_pool().get_read_buf());
144
145 let result = f(&mut rb);
146
147 if item.read.take().is_some() {
149 log::error!("Nested read io operation is detected");
150 io.force_close();
151 }
152
153 if rb.is_empty() {
154 io.memory_pool().release_read_buf(rb);
155 } else {
156 item.read.set(Some(rb));
157 }
158 result
159 }
160
161 pub(crate) fn get_write_destination(&self) -> Option<BytesVec> {
162 self.get_last_level().write.take()
163 }
164
165 pub(crate) fn set_write_destination(&self, buf: BytesVec) -> Option<BytesVec> {
166 let b = self.get_last_level().write.take();
167 if b.is_some() {
168 self.get_last_level().write.set(b);
169 Some(buf)
170 } else {
171 self.get_last_level().write.set(Some(buf));
172 None
173 }
174 }
175
176 pub(crate) fn with_write_source<F, R>(&self, io: &IoRef, f: F) -> R
177 where
178 F: FnOnce(&mut BytesVec) -> R,
179 {
180 let item = self.get_first_level();
181 let mut wb = item
182 .write
183 .take()
184 .unwrap_or_else(|| io.memory_pool().get_write_buf());
185
186 let result = f(&mut wb);
187 if wb.is_empty() {
188 io.memory_pool().release_write_buf(wb);
189 } else {
190 item.write.set(Some(wb));
191 }
192 result
193 }
194
195 pub(crate) fn with_write_destination<F, R>(&self, io: &IoRef, f: F) -> R
196 where
197 F: FnOnce(Option<&mut BytesVec>) -> R,
198 {
199 let item = self.get_last_level();
200 let mut wb = item.write.take();
201
202 let result = f(wb.as_mut());
203
204 if item.write.take().is_some() {
206 log::error!("Nested write io operation is detected");
207 io.force_close();
208 }
209
210 if let Some(b) = wb {
211 if b.is_empty() {
212 io.memory_pool().release_write_buf(b);
213 } else {
214 item.write.set(Some(b));
215 }
216 }
217 result
218 }
219
220 pub(crate) fn read_destination_size(&self) -> usize {
221 let item = self.get_first_level();
222 let rb = item.read.take();
223 let size = rb.as_ref().map(|b| b.len()).unwrap_or(0);
224 item.read.set(rb);
225 size
226 }
227
228 pub(crate) fn write_destination_size(&self) -> usize {
229 let item = self.get_last_level();
230 let wb = item.write.take();
231 let size = wb.as_ref().map(|b| b.len()).unwrap_or(0);
232 item.write.set(wb);
233 size
234 }
235
236 pub(crate) fn release(&self, pool: PoolRef) {
237 let items = match &self.buffers {
238 Either::Left(b) => &b[..],
239 Either::Right(b) => &b[..],
240 };
241
242 for item in items {
243 if let Some(buf) = item.read.take() {
244 pool.release_read_buf(buf);
245 }
246 if let Some(buf) = item.write.take() {
247 pool.release_write_buf(buf);
248 }
249 }
250 }
251
252 pub(crate) fn set_memory_pool(&self, pool: PoolRef) {
253 let items = match &self.buffers {
254 Either::Left(b) => &b[..],
255 Either::Right(b) => &b[..],
256 };
257 for item in items {
258 if let Some(mut b) = item.read.take() {
259 pool.move_vec_in(&mut b);
260 item.read.set(Some(b));
261 }
262 if let Some(mut b) = item.write.take() {
263 pool.move_vec_in(&mut b);
264 item.write.set(Some(b));
265 }
266 }
267 }
268}
269
270#[derive(Copy, Clone, Debug)]
271pub struct FilterCtx<'a> {
272 pub(crate) io: &'a IoRef,
273 pub(crate) stack: &'a Stack,
274 pub(crate) idx: usize,
275}
276
277impl<'a> FilterCtx<'a> {
278 pub(crate) fn new(io: &'a IoRef, stack: &'a Stack) -> Self {
279 Self { io, stack, idx: 0 }
280 }
281
282 #[inline]
283 pub fn tag(&self) -> &'static str {
284 self.io.tag()
285 }
286
287 #[inline]
288 pub fn next(&self) -> Self {
289 Self {
290 io: self.io,
291 stack: self.stack,
292 idx: self.idx + 1,
293 }
294 }
295
296 #[inline]
297 pub fn read_buf<F, R>(&self, nbytes: usize, f: F) -> R
298 where
299 F: FnOnce(&ReadBuf<'_>) -> R,
300 {
301 self.stack.get_buffers(self.idx, |curr, next| {
302 let buf = ReadBuf {
303 nbytes,
304 curr,
305 next,
306 io: self.io,
307 need_write: Cell::new(false),
308 };
309 f(&buf)
310 })
311 }
312
313 #[inline]
314 pub fn write_buf<F, R>(&self, f: F) -> R
315 where
316 F: FnOnce(&WriteBuf<'_>) -> R,
317 {
318 self.stack.get_buffers(self.idx, |curr, next| {
319 let buf = WriteBuf {
320 curr,
321 next,
322 io: self.io,
323 need_write: Cell::new(false),
324 };
325 f(&buf)
326 })
327 }
328}
329
330#[derive(Debug)]
331pub struct ReadBuf<'a> {
332 pub(crate) io: &'a IoRef,
333 pub(crate) curr: &'a Buffer,
334 pub(crate) next: &'a Buffer,
335 pub(crate) nbytes: usize,
336 pub(crate) need_write: Cell<bool>,
337}
338
339impl ReadBuf<'_> {
340 #[inline]
341 pub fn tag(&self) -> &'static str {
343 self.io.tag()
344 }
345
346 #[inline]
347 pub fn nbytes(&self) -> usize {
349 self.nbytes
350 }
351
352 #[inline]
353 pub fn want_shutdown(&self) {
355 self.io.want_shutdown()
356 }
357
358 #[inline]
359 pub fn resize_buf(&self, buf: &mut BytesVec) {
361 self.io.memory_pool().resize_read_buf(buf);
362 }
363
364 #[inline]
365 pub fn with_src<F, R>(&self, f: F) -> R
367 where
368 F: FnOnce(&mut Option<BytesVec>) -> R,
369 {
370 let mut buf = self.next.read.take();
371 let result = f(&mut buf);
372
373 if let Some(b) = buf {
374 if b.is_empty() {
375 self.io.memory_pool().release_read_buf(b);
376 } else {
377 self.next.read.set(Some(b));
378 }
379 }
380 result
381 }
382
383 #[inline]
384 pub fn with_dst<F, R>(&self, f: F) -> R
386 where
387 F: FnOnce(&mut BytesVec) -> R,
388 {
389 let mut rb = self
390 .curr
391 .read
392 .take()
393 .unwrap_or_else(|| self.io.memory_pool().get_read_buf());
394
395 let result = f(&mut rb);
396 if rb.is_empty() {
397 self.io.memory_pool().release_read_buf(rb);
398 } else {
399 self.curr.read.set(Some(rb));
400 }
401 result
402 }
403
404 #[inline]
405 pub fn take_src(&self) -> Option<BytesVec> {
407 self.next.read.take().and_then(|b| {
408 if b.is_empty() {
409 self.io.memory_pool().release_read_buf(b);
410 None
411 } else {
412 Some(b)
413 }
414 })
415 }
416
417 #[inline]
418 pub fn set_src(&self, src: Option<BytesVec>) {
420 if let Some(src) = src {
421 if src.is_empty() {
422 self.io.memory_pool().release_read_buf(src);
423 } else if let Some(mut buf) = self.next.read.take() {
424 buf.extend_from_slice(&src);
425 self.next.read.set(Some(buf));
426 self.io.memory_pool().release_read_buf(src);
427 } else {
428 self.next.read.set(Some(src));
429 }
430 }
431 }
432
433 #[inline]
434 pub fn take_dst(&self) -> BytesVec {
436 self.curr
437 .read
438 .take()
439 .unwrap_or_else(|| self.io.memory_pool().get_read_buf())
440 }
441
442 #[inline]
443 pub fn set_dst(&self, dst: Option<BytesVec>) {
445 if let Some(dst) = dst {
446 if dst.is_empty() {
447 self.io.memory_pool().release_read_buf(dst);
448 } else if let Some(mut buf) = self.curr.read.take() {
449 buf.extend_from_slice(&dst);
450 self.curr.read.set(Some(buf));
451 self.io.memory_pool().release_read_buf(dst);
452 } else {
453 self.curr.read.set(Some(dst));
454 }
455 }
456 }
457
458 #[inline]
459 pub fn with_write_buf<'b, F, R>(&'b self, f: F) -> R
460 where
461 F: FnOnce(&WriteBuf<'b>) -> R,
462 {
463 let mut buf = WriteBuf {
464 io: self.io,
465 curr: self.curr,
466 next: self.next,
467 need_write: Cell::new(self.need_write.get()),
468 };
469 let result = f(&mut buf);
470 self.need_write.set(buf.need_write.get());
471 result
472 }
473}
474
475#[derive(Debug)]
476pub struct WriteBuf<'a> {
477 pub(crate) io: &'a IoRef,
478 pub(crate) curr: &'a Buffer,
479 pub(crate) next: &'a Buffer,
480 pub(crate) need_write: Cell<bool>,
481}
482
483impl WriteBuf<'_> {
484 #[inline]
485 pub fn tag(&self) -> &'static str {
487 self.io.tag()
488 }
489
490 #[inline]
491 pub fn want_shutdown(&self) {
493 self.io.want_shutdown()
494 }
495
496 #[inline]
497 pub fn resize_buf(&self, buf: &mut BytesVec) {
499 self.io.memory_pool().resize_write_buf(buf);
500 }
501
502 #[inline]
503 pub fn with_src<F, R>(&self, f: F) -> R
505 where
506 F: FnOnce(&mut Option<BytesVec>) -> R,
507 {
508 let mut wb = self.curr.write.take();
509 let result = f(&mut wb);
510 if let Some(b) = wb {
511 if b.is_empty() {
512 self.io.memory_pool().release_write_buf(b);
513 } else {
514 self.curr.write.set(Some(b));
515 }
516 }
517 result
518 }
519
520 #[inline]
521 pub fn with_dst<F, R>(&self, f: F) -> R
523 where
524 F: FnOnce(&mut BytesVec) -> R,
525 {
526 let mut wb = self
527 .next
528 .write
529 .take()
530 .unwrap_or_else(|| self.io.memory_pool().get_write_buf());
531
532 let total = wb.len();
533 let result = f(&mut wb);
534
535 if wb.is_empty() {
536 self.io.memory_pool().release_write_buf(wb);
537 } else {
538 self.need_write
539 .set(self.need_write.get() | (total != wb.len()));
540 self.next.write.set(Some(wb));
541 }
542 result
543 }
544
545 #[inline]
546 pub fn take_src(&self) -> Option<BytesVec> {
548 self.curr.write.take().and_then(|b| {
549 if b.is_empty() {
550 self.io.memory_pool().release_write_buf(b);
551 None
552 } else {
553 Some(b)
554 }
555 })
556 }
557
558 #[inline]
559 pub fn set_src(&self, src: Option<BytesVec>) {
561 if let Some(src) = src {
562 if src.is_empty() {
563 self.io.memory_pool().release_write_buf(src);
564 } else if let Some(mut buf) = self.curr.write.take() {
565 buf.extend_from_slice(&src);
566 self.curr.write.set(Some(buf));
567 self.io.memory_pool().release_write_buf(src);
568 } else {
569 self.curr.write.set(Some(src));
570 }
571 }
572 }
573
574 #[inline]
575 pub fn take_dst(&self) -> BytesVec {
577 self.next
578 .write
579 .take()
580 .unwrap_or_else(|| self.io.memory_pool().get_write_buf())
581 }
582
583 #[inline]
584 pub fn set_dst(&self, dst: Option<BytesVec>) {
586 if let Some(dst) = dst {
587 if dst.is_empty() {
588 self.io.memory_pool().release_write_buf(dst);
589 } else {
590 self.need_write.set(true);
591
592 if let Some(mut buf) = self.next.write.take() {
593 buf.extend_from_slice(&dst);
594 self.next.write.set(Some(buf));
595 self.io.memory_pool().release_write_buf(dst);
596 } else {
597 self.next.write.set(Some(dst));
598 }
599 }
600 }
601 }
602
603 #[inline]
604 pub fn with_read_buf<'b, F, R>(&'b self, f: F) -> R
605 where
606 F: FnOnce(&ReadBuf<'b>) -> R,
607 {
608 let mut buf = ReadBuf {
609 io: self.io,
610 curr: self.curr,
611 next: self.next,
612 nbytes: 0,
613 need_write: Cell::new(self.need_write.get()),
614 };
615 let result = f(&mut buf);
616 self.need_write.set(buf.need_write.get());
617 result
618 }
619}