1use std::{cell::Cell, fmt};
2
3use ntex_bytes::{BytesVec, PoolRef};
4use ntex_util::future::Either;
5
6use crate::IoRef;
7
/// Pair of optional byte buffers, each stored in a `Cell` so it can be taken
/// and put back through a shared reference.
///
/// Field `0` is handled as the read buffer and field `1` as the write buffer:
/// `Stack::release` hands them to `release_read_buf` and `release_write_buf`
/// respectively.
#[derive(Default)]
pub(crate) struct Buffer(Cell<Option<BytesVec>>, Cell<Option<BytesVec>>);
10
11impl fmt::Debug for Buffer {
12 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
13 let b0 = self.0.take();
14 let b1 = self.1.take();
15 let res = f
16 .debug_struct("Buffer")
17 .field("0", &b0)
18 .field("1", &b1)
19 .finish();
20 self.0.set(b0);
21 self.1.set(b1);
22 res
23 }
24}
25
/// Ordered collection of per-layer `Buffer`s.
#[derive(Debug)]
pub struct Stack {
    /// Number of layers currently in use (starts at 1, grows in `add_layer`).
    len: usize,
    /// Layer storage: an inline array while there are at most three layers,
    /// replaced by a `Vec` when a fourth layer is added.
    buffers: Either<[Buffer; 3], Vec<Buffer>>,
}
31
32impl Stack {
33 pub(crate) fn new() -> Self {
34 Self {
35 len: 1,
36 buffers: Either::Left(Default::default()),
37 }
38 }
39
40 pub(crate) fn add_layer(&mut self) {
41 match &mut self.buffers {
42 Either::Left(b) => {
43 if self.len == 3 {
45 let mut vec = vec![Buffer(Cell::new(None), Cell::new(None))];
46 for item in b.iter_mut().take(self.len) {
47 vec.push(Buffer(
48 Cell::new(item.0.take()),
49 Cell::new(item.1.take()),
50 ));
51 }
52 self.len += 1;
53 self.buffers = Either::Right(vec);
54 } else {
55 let mut idx = self.len;
56 while idx > 0 {
57 let item = Buffer(
58 Cell::new(b[idx - 1].0.take()),
59 Cell::new(b[idx - 1].1.take()),
60 );
61 b[idx] = item;
62 idx -= 1;
63 }
64 b[0] = Buffer(Cell::new(None), Cell::new(None));
65 self.len += 1;
66 }
67 }
68 Either::Right(vec) => {
69 self.len += 1;
70 vec.insert(0, Buffer(Cell::new(None), Cell::new(None)));
71 }
72 }
73 }
74
75 fn get_buffers<F, R>(&self, idx: usize, f: F) -> R
76 where
77 F: FnOnce(&Buffer, &Buffer) -> R,
78 {
79 let buffers = match self.buffers {
80 Either::Left(ref b) => &b[..],
81 Either::Right(ref b) => &b[..],
82 };
83
84 let next = idx + 1;
85 if self.len > next {
86 f(&buffers[idx], &buffers[next])
87 } else {
88 let curr = Buffer(Cell::new(buffers[idx].0.take()), Cell::new(None));
89 let next = Buffer(Cell::new(None), Cell::new(buffers[idx].1.take()));
90
91 let result = f(&curr, &next);
92 buffers[idx].0.set(curr.0.take());
93 buffers[idx].1.set(next.1.take());
94 result
95 }
96 }
97
98 fn get_first_level(&self) -> &Buffer {
99 match &self.buffers {
100 Either::Left(b) => &b[0],
101 Either::Right(b) => &b[0],
102 }
103 }
104
105 fn get_last_level(&self) -> &Buffer {
106 match &self.buffers {
107 Either::Left(b) => &b[self.len - 1],
108 Either::Right(b) => &b[self.len - 1],
109 }
110 }
111
112 pub(crate) fn read_buf<F, R>(&self, io: &IoRef, idx: usize, nbytes: usize, f: F) -> R
113 where
114 F: FnOnce(&ReadBuf<'_>) -> R,
115 {
116 self.get_buffers(idx, |curr, next| {
117 let buf = ReadBuf {
118 io,
119 nbytes,
120 curr,
121 next,
122 need_write: Cell::new(false),
123 };
124 f(&buf)
125 })
126 }
127
128 pub(crate) fn write_buf<F, R>(&self, io: &IoRef, idx: usize, f: F) -> R
129 where
130 F: FnOnce(&WriteBuf<'_>) -> R,
131 {
132 self.get_buffers(idx, |curr, next| {
133 let buf = WriteBuf {
134 io,
135 curr,
136 next,
137 need_write: Cell::new(false),
138 };
139 f(&buf)
140 })
141 }
142
143 pub(crate) fn get_read_source(&self) -> Option<BytesVec> {
144 self.get_last_level().0.take()
145 }
146
147 pub(crate) fn set_read_source(&self, io: &IoRef, buf: BytesVec) {
148 if buf.is_empty() {
149 io.memory_pool().release_read_buf(buf);
150 } else {
151 self.get_last_level().0.set(Some(buf));
152 }
153 }
154
155 pub(crate) fn with_read_source<F, R>(&self, io: &IoRef, f: F) -> R
156 where
157 F: FnOnce(&mut BytesVec) -> R,
158 {
159 let item = self.get_last_level();
160 let mut rb = item.0.take();
161 if rb.is_none() {
162 rb = Some(io.memory_pool().get_read_buf());
163 }
164
165 let result = f(rb.as_mut().unwrap());
166 if let Some(b) = rb {
167 if b.is_empty() {
168 io.memory_pool().release_read_buf(b);
169 } else {
170 item.0.set(Some(b));
171 }
172 }
173 result
174 }
175
176 pub(crate) fn with_read_destination<F, R>(&self, io: &IoRef, f: F) -> R
177 where
178 F: FnOnce(&mut BytesVec) -> R,
179 {
180 let item = self.get_first_level();
181 let mut rb = item.0.take();
182 if rb.is_none() {
183 rb = Some(io.memory_pool().get_read_buf());
184 }
185
186 let result = f(rb.as_mut().unwrap());
187
188 if item.0.take().is_some() {
190 log::error!("Nested read io operation is detected");
191 io.force_close();
192 }
193
194 if let Some(b) = rb {
195 if b.is_empty() {
196 io.memory_pool().release_read_buf(b);
197 } else {
198 item.0.set(Some(b));
199 }
200 }
201 result
202 }
203
204 pub(crate) fn with_write_source<F, R>(&self, io: &IoRef, f: F) -> R
205 where
206 F: FnOnce(&mut BytesVec) -> R,
207 {
208 let item = self.get_first_level();
209 let mut wb = item.1.take();
210 if wb.is_none() {
211 wb = Some(io.memory_pool().get_write_buf());
212 }
213
214 let result = f(wb.as_mut().unwrap());
215 if let Some(b) = wb {
216 if b.is_empty() {
217 io.memory_pool().release_write_buf(b);
218 } else {
219 item.1.set(Some(b));
220 }
221 }
222 result
223 }
224
225 pub(crate) fn get_write_destination(&self) -> Option<BytesVec> {
226 self.get_last_level().1.take()
227 }
228
229 pub(crate) fn set_write_destination(&self, buf: BytesVec) -> Option<BytesVec> {
230 let b = self.get_last_level().1.take();
231 if b.is_some() {
232 self.get_last_level().1.set(b);
233 Some(buf)
234 } else {
235 self.get_last_level().1.set(Some(buf));
236 None
237 }
238 }
239
240 pub(crate) fn with_write_destination<F, R>(&self, io: &IoRef, f: F) -> R
241 where
242 F: FnOnce(Option<&mut BytesVec>) -> R,
243 {
244 let item = self.get_last_level();
245 let mut wb = item.1.take();
246
247 let result = f(wb.as_mut());
248
249 if item.1.take().is_some() {
251 log::error!("Nested write io operation is detected");
252 io.force_close();
253 }
254
255 if let Some(b) = wb {
256 if b.is_empty() {
257 io.memory_pool().release_write_buf(b);
258 } else {
259 item.1.set(Some(b));
260 }
261 }
262 result
263 }
264
265 pub(crate) fn read_destination_size(&self) -> usize {
266 let item = self.get_first_level();
267 let rb = item.0.take();
268 let size = rb.as_ref().map(|b| b.len()).unwrap_or(0);
269 item.0.set(rb);
270 size
271 }
272
273 pub(crate) fn write_destination_size(&self) -> usize {
274 let item = self.get_last_level();
275 let wb = item.1.take();
276 let size = wb.as_ref().map(|b| b.len()).unwrap_or(0);
277 item.1.set(wb);
278 size
279 }
280
281 pub(crate) fn release(&self, pool: PoolRef) {
282 let items = match &self.buffers {
283 Either::Left(b) => &b[..],
284 Either::Right(b) => &b[..],
285 };
286
287 for item in items {
288 if let Some(buf) = item.0.take() {
289 pool.release_read_buf(buf);
290 }
291 if let Some(buf) = item.1.take() {
292 pool.release_write_buf(buf);
293 }
294 }
295 }
296
297 pub(crate) fn set_memory_pool(&self, pool: PoolRef) {
298 let items = match &self.buffers {
299 Either::Left(b) => &b[..],
300 Either::Right(b) => &b[..],
301 };
302 for item in items {
303 if let Some(mut b) = item.0.take() {
304 pool.move_vec_in(&mut b);
305 item.0.set(Some(b));
306 }
307 if let Some(mut b) = item.1.take() {
308 pool.move_vec_in(&mut b);
309 item.1.set(Some(b));
310 }
311 }
312 }
313}
314
/// Read-side view over two adjacent layers of a `Stack`.
///
/// The source of read data is the read slot of `next`; the destination is
/// the read slot of `curr`.
#[derive(Debug)]
pub struct ReadBuf<'a> {
    /// Io object used for the memory pool, tag and shutdown requests.
    pub(crate) io: &'a IoRef,
    /// Buffer of the current layer; its read slot is the destination.
    pub(crate) curr: &'a Buffer,
    /// Buffer of the following layer; its read slot is the source.
    pub(crate) next: &'a Buffer,
    /// Byte count supplied by the creator of this view.
    pub(crate) nbytes: usize,
    /// Flag synchronised with the `WriteBuf` created in `with_write_buf`.
    pub(crate) need_write: Cell<bool>,
}
323
324impl ReadBuf<'_> {
325 #[inline]
326 pub fn tag(&self) -> &'static str {
328 self.io.tag()
329 }
330
331 #[inline]
332 pub fn nbytes(&self) -> usize {
334 self.nbytes
335 }
336
337 #[inline]
338 pub fn want_shutdown(&self) {
340 self.io.want_shutdown()
341 }
342
343 #[inline]
344 pub fn resize_buf(&self, buf: &mut BytesVec) {
346 self.io.memory_pool().resize_read_buf(buf);
347 }
348
349 #[inline]
350 pub fn with_src<F, R>(&self, f: F) -> R
352 where
353 F: FnOnce(&mut Option<BytesVec>) -> R,
354 {
355 let mut item = self.next.0.take();
356 let result = f(&mut item);
357
358 if let Some(b) = item {
359 if b.is_empty() {
360 self.io.memory_pool().release_read_buf(b);
361 } else {
362 self.next.0.set(Some(b));
363 }
364 }
365 result
366 }
367
368 #[inline]
369 pub fn with_dst<F, R>(&self, f: F) -> R
371 where
372 F: FnOnce(&mut BytesVec) -> R,
373 {
374 let mut item = self.curr.0.take();
375 if item.is_none() {
376 item = Some(self.io.memory_pool().get_read_buf());
377 }
378 let result = f(item.as_mut().unwrap());
379 if let Some(b) = item {
380 if b.is_empty() {
381 self.io.memory_pool().release_read_buf(b);
382 } else {
383 self.curr.0.set(Some(b));
384 }
385 }
386 result
387 }
388
389 #[inline]
390 pub fn take_src(&self) -> Option<BytesVec> {
392 self.next.0.take().and_then(|b| {
393 if b.is_empty() {
394 self.io.memory_pool().release_read_buf(b);
395 None
396 } else {
397 Some(b)
398 }
399 })
400 }
401
402 #[inline]
403 pub fn set_src(&self, src: Option<BytesVec>) {
405 if let Some(src) = src {
406 if src.is_empty() {
407 self.io.memory_pool().release_read_buf(src);
408 } else if let Some(mut buf) = self.next.0.take() {
409 buf.extend_from_slice(&src);
410 self.next.0.set(Some(buf));
411 self.io.memory_pool().release_read_buf(src);
412 } else {
413 self.next.0.set(Some(src));
414 }
415 }
416 }
417
418 #[inline]
419 pub fn take_dst(&self) -> BytesVec {
421 self.curr
422 .0
423 .take()
424 .unwrap_or_else(|| self.io.memory_pool().get_read_buf())
425 }
426
427 #[inline]
428 pub fn set_dst(&self, dst: Option<BytesVec>) {
430 if let Some(dst) = dst {
431 if dst.is_empty() {
432 self.io.memory_pool().release_read_buf(dst);
433 } else if let Some(mut buf) = self.curr.0.take() {
434 buf.extend_from_slice(&dst);
435 self.curr.0.set(Some(buf));
436 self.io.memory_pool().release_read_buf(dst);
437 } else {
438 self.curr.0.set(Some(dst));
439 }
440 }
441 }
442
443 #[inline]
444 pub fn with_write_buf<'b, F, R>(&'b self, f: F) -> R
445 where
446 F: FnOnce(&WriteBuf<'b>) -> R,
447 {
448 let mut buf = WriteBuf {
449 io: self.io,
450 curr: self.curr,
451 next: self.next,
452 need_write: Cell::new(self.need_write.get()),
453 };
454 let result = f(&mut buf);
455 self.need_write.set(buf.need_write.get());
456 result
457 }
458}
459
/// Write-side view over two adjacent layers of a `Stack`.
///
/// The source of write data is the write slot of `curr`; the destination is
/// the write slot of `next`.
#[derive(Debug)]
pub struct WriteBuf<'a> {
    /// Io object used for the memory pool, tag and shutdown requests.
    pub(crate) io: &'a IoRef,
    /// Buffer of the current layer; its write slot is the source.
    pub(crate) curr: &'a Buffer,
    /// Buffer of the following layer; its write slot is the destination.
    pub(crate) next: &'a Buffer,
    /// Set when the destination buffer changes (see `with_dst` and `set_dst`).
    pub(crate) need_write: Cell<bool>,
}
467
468impl WriteBuf<'_> {
469 #[inline]
470 pub fn tag(&self) -> &'static str {
472 self.io.tag()
473 }
474
475 #[inline]
476 pub fn want_shutdown(&self) {
478 self.io.want_shutdown()
479 }
480
481 #[inline]
482 pub fn resize_buf(&self, buf: &mut BytesVec) {
484 self.io.memory_pool().resize_write_buf(buf);
485 }
486
487 #[inline]
488 pub fn with_src<F, R>(&self, f: F) -> R
490 where
491 F: FnOnce(&mut Option<BytesVec>) -> R,
492 {
493 let mut item = self.curr.1.take();
494 let result = f(&mut item);
495 if let Some(b) = item {
496 if b.is_empty() {
497 self.io.memory_pool().release_write_buf(b);
498 } else {
499 self.curr.1.set(Some(b));
500 }
501 }
502 result
503 }
504
505 #[inline]
506 pub fn with_dst<F, R>(&self, f: F) -> R
508 where
509 F: FnOnce(&mut BytesVec) -> R,
510 {
511 let mut item = self.next.1.take();
512 if item.is_none() {
513 item = Some(self.io.memory_pool().get_write_buf());
514 }
515 let buf = item.as_mut().unwrap();
516 let total = buf.len();
517 let result = f(buf);
518
519 if buf.is_empty() {
520 self.io.memory_pool().release_write_buf(item.unwrap());
521 } else {
522 self.need_write
523 .set(self.need_write.get() | (total != buf.len()));
524 self.next.1.set(item);
525 }
526 result
527 }
528
529 #[inline]
530 pub fn take_src(&self) -> Option<BytesVec> {
532 self.curr.1.take().and_then(|b| {
533 if b.is_empty() {
534 self.io.memory_pool().release_write_buf(b);
535 None
536 } else {
537 Some(b)
538 }
539 })
540 }
541
542 #[inline]
543 pub fn set_src(&self, src: Option<BytesVec>) {
545 if let Some(src) = src {
546 if src.is_empty() {
547 self.io.memory_pool().release_write_buf(src);
548 } else if let Some(mut buf) = self.curr.1.take() {
549 buf.extend_from_slice(&src);
550 self.curr.1.set(Some(buf));
551 self.io.memory_pool().release_write_buf(src);
552 } else {
553 self.curr.1.set(Some(src));
554 }
555 }
556 }
557
558 #[inline]
559 pub fn take_dst(&self) -> BytesVec {
561 self.next
562 .1
563 .take()
564 .unwrap_or_else(|| self.io.memory_pool().get_write_buf())
565 }
566
567 #[inline]
568 pub fn set_dst(&self, dst: Option<BytesVec>) {
570 if let Some(dst) = dst {
571 if dst.is_empty() {
572 self.io.memory_pool().release_write_buf(dst);
573 } else {
574 self.need_write.set(true);
575
576 if let Some(mut buf) = self.next.1.take() {
577 buf.extend_from_slice(&dst);
578 self.next.1.set(Some(buf));
579 self.io.memory_pool().release_write_buf(dst);
580 } else {
581 self.next.1.set(Some(dst));
582 }
583 }
584 }
585 }
586
587 #[inline]
588 pub fn with_read_buf<'b, F, R>(&'b self, f: F) -> R
589 where
590 F: FnOnce(&ReadBuf<'b>) -> R,
591 {
592 let mut buf = ReadBuf {
593 io: self.io,
594 curr: self.curr,
595 next: self.next,
596 nbytes: 0,
597 need_write: Cell::new(self.need_write.get()),
598 };
599 let result = f(&mut buf);
600 self.need_write.set(buf.need_write.get());
601 result
602 }
603}