1use std::{cell::Cell, fmt};
2
3use ntex_bytes::{BytesVec, PoolRef};
4use ntex_util::future::Either;
5
6use crate::IoRef;
7
8#[derive(Default)]
9pub(crate) struct Buffer {
10 read: Cell<Option<BytesVec>>,
11 write: Cell<Option<BytesVec>>,
12}
13
14impl fmt::Debug for Buffer {
15 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
16 let b0 = self.read.take();
17 let b1 = self.write.take();
18 let res = f
19 .debug_struct("Buffer")
20 .field("read", &b0)
21 .field("write", &b1)
22 .finish();
23 self.read.set(b0);
24 self.write.set(b1);
25 res
26 }
27}
28
29const INLINE_SIZE: usize = 3;
30
31#[derive(Debug)]
32pub(crate) struct Stack {
33 len: usize,
34 buffers: Either<[Buffer; INLINE_SIZE], Vec<Buffer>>,
35}
36
37impl Stack {
38 pub(crate) fn new() -> Self {
39 Self {
40 len: 1,
41 buffers: Either::Left(Default::default()),
42 }
43 }
44
45 pub(crate) fn add_layer(&mut self) {
46 match &mut self.buffers {
47 Either::Left(b) => {
48 if self.len == INLINE_SIZE {
50 let mut vec = vec![Buffer {
51 read: Cell::new(None),
52 write: Cell::new(None),
53 }];
54 for item in b.iter_mut().take(self.len) {
55 vec.push(Buffer {
56 read: Cell::new(item.read.take()),
57 write: Cell::new(item.write.take()),
58 });
59 }
60 self.len += 1;
61 self.buffers = Either::Right(vec);
62 } else {
63 let mut idx = self.len;
64 while idx > 0 {
65 let item = Buffer {
66 read: Cell::new(b[idx - 1].read.take()),
67 write: Cell::new(b[idx - 1].write.take()),
68 };
69 b[idx] = item;
70 idx -= 1;
71 }
72 b[0] = Buffer {
73 read: Cell::new(None),
74 write: Cell::new(None),
75 };
76 self.len += 1;
77 }
78 }
79 Either::Right(vec) => {
80 self.len += 1;
81 vec.insert(
82 0,
83 Buffer {
84 read: Cell::new(None),
85 write: Cell::new(None),
86 },
87 );
88 }
89 }
90 }
91
92 fn get_buffers<F, R>(&self, idx: usize, f: F) -> R
93 where
94 F: FnOnce(&Buffer, &Buffer) -> R,
95 {
96 let buffers = match self.buffers {
97 Either::Left(ref b) => &b[..],
98 Either::Right(ref b) => &b[..],
99 };
100
101 let next = idx + 1;
102 if self.len > next {
103 f(&buffers[idx], &buffers[next])
104 } else {
105 f(&buffers[idx], &Buffer::default())
106 }
107 }
108
109 fn get_first_level(&self) -> &Buffer {
110 match &self.buffers {
111 Either::Left(b) => &b[0],
112 Either::Right(b) => &b[0],
113 }
114 }
115
116 fn get_last_level(&self) -> &Buffer {
117 match &self.buffers {
118 Either::Left(b) => &b[self.len - 1],
119 Either::Right(b) => &b[self.len - 1],
120 }
121 }
122
123 pub(crate) fn get_read_source(&self) -> Option<BytesVec> {
124 self.get_last_level().read.take()
125 }
126
127 pub(crate) fn set_read_source(&self, io: &IoRef, buf: BytesVec) {
128 if buf.is_empty() {
129 io.memory_pool().release_read_buf(buf);
130 } else {
131 self.get_last_level().read.set(Some(buf));
132 }
133 }
134
135 pub(crate) fn with_read_destination<F, R>(&self, io: &IoRef, f: F) -> R
136 where
137 F: FnOnce(&mut BytesVec) -> R,
138 {
139 let item = self.get_first_level();
140 let mut rb = item
141 .read
142 .take()
143 .unwrap_or_else(|| io.memory_pool().get_read_buf());
144
145 let result = f(&mut rb);
146
147 if item.read.take().is_some() {
149 log::error!("Nested read io operation is detected");
150 io.force_close();
151 }
152
153 if rb.is_empty() {
154 io.memory_pool().release_read_buf(rb);
155 } else {
156 item.read.set(Some(rb));
157 }
158 result
159 }
160
161 pub(crate) fn get_write_destination(&self) -> Option<BytesVec> {
162 self.get_last_level().write.take()
163 }
164
165 pub(crate) fn set_write_destination(&self, buf: BytesVec) -> Option<BytesVec> {
166 let b = self.get_last_level().write.take();
167 if b.is_some() {
168 self.get_last_level().write.set(b);
169 Some(buf)
170 } else {
171 self.get_last_level().write.set(Some(buf));
172 None
173 }
174 }
175
176 pub(crate) fn with_write_source<F, R>(&self, io: &IoRef, f: F) -> R
177 where
178 F: FnOnce(&mut BytesVec) -> R,
179 {
180 let item = self.get_first_level();
181 let mut wb = item
182 .write
183 .take()
184 .unwrap_or_else(|| io.memory_pool().get_write_buf());
185
186 let result = f(&mut wb);
187 if wb.is_empty() {
188 io.memory_pool().release_write_buf(wb);
189 } else {
190 item.write.set(Some(wb));
191 }
192 result
193 }
194
195 pub(crate) fn with_write_destination<F, R>(&self, io: &IoRef, f: F) -> R
196 where
197 F: FnOnce(Option<&mut BytesVec>) -> R,
198 {
199 let item = self.get_last_level();
200 let mut wb = item.write.take();
201
202 let result = f(wb.as_mut());
203
204 if item.write.take().is_some() {
206 log::error!("Nested write io operation is detected");
207 io.force_close();
208 }
209
210 if let Some(b) = wb {
211 if b.is_empty() {
212 io.memory_pool().release_write_buf(b);
213 } else {
214 item.write.set(Some(b));
215 }
216 }
217 result
218 }
219
220 pub(crate) fn read_destination_size(&self) -> usize {
221 let item = self.get_first_level();
222 let rb = item.read.take();
223 let size = rb.as_ref().map(|b| b.len()).unwrap_or(0);
224 item.read.set(rb);
225 size
226 }
227
228 pub(crate) fn write_destination_size(&self) -> usize {
229 let item = self.get_last_level();
230 let wb = item.write.take();
231 let size = wb.as_ref().map(|b| b.len()).unwrap_or(0);
232 item.write.set(wb);
233 size
234 }
235
236 pub(crate) fn release(&self, pool: PoolRef) {
237 let items = match &self.buffers {
238 Either::Left(b) => &b[..],
239 Either::Right(b) => &b[..],
240 };
241
242 for item in items {
243 if let Some(buf) = item.read.take() {
244 pool.release_read_buf(buf);
245 }
246 if let Some(buf) = item.write.take() {
247 pool.release_write_buf(buf);
248 }
249 }
250 }
251
252 pub(crate) fn set_memory_pool(&self, pool: PoolRef) {
253 let items = match &self.buffers {
254 Either::Left(b) => &b[..],
255 Either::Right(b) => &b[..],
256 };
257 for item in items {
258 if let Some(mut b) = item.read.take() {
259 pool.move_vec_in(&mut b);
260 item.read.set(Some(b));
261 }
262 if let Some(mut b) = item.write.take() {
263 pool.move_vec_in(&mut b);
264 item.write.set(Some(b));
265 }
266 }
267 }
268}
269
270#[derive(Copy, Clone, Debug)]
271pub struct FilterCtx<'a> {
272 pub(crate) io: &'a IoRef,
273 pub(crate) stack: &'a Stack,
274 pub(crate) idx: usize,
275}
276
277impl<'a> FilterCtx<'a> {
278 pub(crate) fn new(io: &'a IoRef, stack: &'a Stack) -> Self {
279 Self { io, stack, idx: 0 }
280 }
281
282 pub fn next(&self) -> Self {
283 Self {
284 io: self.io,
285 stack: self.stack,
286 idx: self.idx + 1,
287 }
288 }
289
290 pub fn read_buf<F, R>(&self, nbytes: usize, f: F) -> R
291 where
292 F: FnOnce(&ReadBuf<'_>) -> R,
293 {
294 self.stack.get_buffers(self.idx, |curr, next| {
295 let buf = ReadBuf {
296 nbytes,
297 curr,
298 next,
299 io: self.io,
300 need_write: Cell::new(false),
301 };
302 f(&buf)
303 })
304 }
305
306 pub fn write_buf<F, R>(&self, f: F) -> R
307 where
308 F: FnOnce(&WriteBuf<'_>) -> R,
309 {
310 self.stack.get_buffers(self.idx, |curr, next| {
311 let buf = WriteBuf {
312 curr,
313 next,
314 io: self.io,
315 need_write: Cell::new(false),
316 };
317 f(&buf)
318 })
319 }
320}
321
322#[derive(Debug)]
323pub struct ReadBuf<'a> {
324 pub(crate) io: &'a IoRef,
325 pub(crate) curr: &'a Buffer,
326 pub(crate) next: &'a Buffer,
327 pub(crate) nbytes: usize,
328 pub(crate) need_write: Cell<bool>,
329}
330
331impl ReadBuf<'_> {
332 #[inline]
333 pub fn tag(&self) -> &'static str {
335 self.io.tag()
336 }
337
338 #[inline]
339 pub fn nbytes(&self) -> usize {
341 self.nbytes
342 }
343
344 #[inline]
345 pub fn want_shutdown(&self) {
347 self.io.want_shutdown()
348 }
349
350 #[inline]
351 pub fn resize_buf(&self, buf: &mut BytesVec) {
353 self.io.memory_pool().resize_read_buf(buf);
354 }
355
356 #[inline]
357 pub fn with_src<F, R>(&self, f: F) -> R
359 where
360 F: FnOnce(&mut Option<BytesVec>) -> R,
361 {
362 let mut buf = self.next.read.take();
363 let result = f(&mut buf);
364
365 if let Some(b) = buf {
366 if b.is_empty() {
367 self.io.memory_pool().release_read_buf(b);
368 } else {
369 self.next.read.set(Some(b));
370 }
371 }
372 result
373 }
374
375 #[inline]
376 pub fn with_dst<F, R>(&self, f: F) -> R
378 where
379 F: FnOnce(&mut BytesVec) -> R,
380 {
381 let mut rb = self
382 .curr
383 .read
384 .take()
385 .unwrap_or_else(|| self.io.memory_pool().get_read_buf());
386
387 let result = f(&mut rb);
388 if rb.is_empty() {
389 self.io.memory_pool().release_read_buf(rb);
390 } else {
391 self.curr.read.set(Some(rb));
392 }
393 result
394 }
395
396 #[inline]
397 pub fn take_src(&self) -> Option<BytesVec> {
399 self.next.read.take().and_then(|b| {
400 if b.is_empty() {
401 self.io.memory_pool().release_read_buf(b);
402 None
403 } else {
404 Some(b)
405 }
406 })
407 }
408
409 #[inline]
410 pub fn set_src(&self, src: Option<BytesVec>) {
412 if let Some(src) = src {
413 if src.is_empty() {
414 self.io.memory_pool().release_read_buf(src);
415 } else if let Some(mut buf) = self.next.read.take() {
416 buf.extend_from_slice(&src);
417 self.next.read.set(Some(buf));
418 self.io.memory_pool().release_read_buf(src);
419 } else {
420 self.next.read.set(Some(src));
421 }
422 }
423 }
424
425 #[inline]
426 pub fn take_dst(&self) -> BytesVec {
428 self.curr
429 .read
430 .take()
431 .unwrap_or_else(|| self.io.memory_pool().get_read_buf())
432 }
433
434 #[inline]
435 pub fn set_dst(&self, dst: Option<BytesVec>) {
437 if let Some(dst) = dst {
438 if dst.is_empty() {
439 self.io.memory_pool().release_read_buf(dst);
440 } else if let Some(mut buf) = self.curr.read.take() {
441 buf.extend_from_slice(&dst);
442 self.curr.read.set(Some(buf));
443 self.io.memory_pool().release_read_buf(dst);
444 } else {
445 self.curr.read.set(Some(dst));
446 }
447 }
448 }
449
450 #[inline]
451 pub fn with_write_buf<'b, F, R>(&'b self, f: F) -> R
452 where
453 F: FnOnce(&WriteBuf<'b>) -> R,
454 {
455 let mut buf = WriteBuf {
456 io: self.io,
457 curr: self.curr,
458 next: self.next,
459 need_write: Cell::new(self.need_write.get()),
460 };
461 let result = f(&mut buf);
462 self.need_write.set(buf.need_write.get());
463 result
464 }
465}
466
467#[derive(Debug)]
468pub struct WriteBuf<'a> {
469 pub(crate) io: &'a IoRef,
470 pub(crate) curr: &'a Buffer,
471 pub(crate) next: &'a Buffer,
472 pub(crate) need_write: Cell<bool>,
473}
474
475impl WriteBuf<'_> {
476 #[inline]
477 pub fn tag(&self) -> &'static str {
479 self.io.tag()
480 }
481
482 #[inline]
483 pub fn want_shutdown(&self) {
485 self.io.want_shutdown()
486 }
487
488 #[inline]
489 pub fn resize_buf(&self, buf: &mut BytesVec) {
491 self.io.memory_pool().resize_write_buf(buf);
492 }
493
494 #[inline]
495 pub fn with_src<F, R>(&self, f: F) -> R
497 where
498 F: FnOnce(&mut Option<BytesVec>) -> R,
499 {
500 let mut wb = self.curr.write.take();
501 let result = f(&mut wb);
502 if let Some(b) = wb {
503 if b.is_empty() {
504 self.io.memory_pool().release_write_buf(b);
505 } else {
506 self.curr.write.set(Some(b));
507 }
508 }
509 result
510 }
511
512 #[inline]
513 pub fn with_dst<F, R>(&self, f: F) -> R
515 where
516 F: FnOnce(&mut BytesVec) -> R,
517 {
518 let mut wb = self
519 .next
520 .write
521 .take()
522 .unwrap_or_else(|| self.io.memory_pool().get_write_buf());
523
524 let total = wb.len();
525 let result = f(&mut wb);
526
527 if wb.is_empty() {
528 self.io.memory_pool().release_write_buf(wb);
529 } else {
530 self.need_write
531 .set(self.need_write.get() | (total != wb.len()));
532 self.next.write.set(Some(wb));
533 }
534 result
535 }
536
537 #[inline]
538 pub fn take_src(&self) -> Option<BytesVec> {
540 self.curr.write.take().and_then(|b| {
541 if b.is_empty() {
542 self.io.memory_pool().release_write_buf(b);
543 None
544 } else {
545 Some(b)
546 }
547 })
548 }
549
550 #[inline]
551 pub fn set_src(&self, src: Option<BytesVec>) {
553 if let Some(src) = src {
554 if src.is_empty() {
555 self.io.memory_pool().release_write_buf(src);
556 } else if let Some(mut buf) = self.curr.write.take() {
557 buf.extend_from_slice(&src);
558 self.curr.write.set(Some(buf));
559 self.io.memory_pool().release_write_buf(src);
560 } else {
561 self.curr.write.set(Some(src));
562 }
563 }
564 }
565
566 #[inline]
567 pub fn take_dst(&self) -> BytesVec {
569 self.next
570 .write
571 .take()
572 .unwrap_or_else(|| self.io.memory_pool().get_write_buf())
573 }
574
575 #[inline]
576 pub fn set_dst(&self, dst: Option<BytesVec>) {
578 if let Some(dst) = dst {
579 if dst.is_empty() {
580 self.io.memory_pool().release_write_buf(dst);
581 } else {
582 self.need_write.set(true);
583
584 if let Some(mut buf) = self.next.write.take() {
585 buf.extend_from_slice(&dst);
586 self.next.write.set(Some(buf));
587 self.io.memory_pool().release_write_buf(dst);
588 } else {
589 self.next.write.set(Some(dst));
590 }
591 }
592 }
593 }
594
595 #[inline]
596 pub fn with_read_buf<'b, F, R>(&'b self, f: F) -> R
597 where
598 F: FnOnce(&ReadBuf<'b>) -> R,
599 {
600 let mut buf = ReadBuf {
601 io: self.io,
602 curr: self.curr,
603 next: self.next,
604 nbytes: 0,
605 need_write: Cell::new(self.need_write.get()),
606 };
607 let result = f(&mut buf);
608 self.need_write.set(buf.need_write.get());
609 result
610 }
611}