1use std::{cell::Cell, fmt};
2
3use ntex_bytes::{BytesVec, PoolRef};
4use ntex_util::future::Either;
5
6use crate::IoRef;
7
/// A pair of optional byte buffers, each held in a `Cell` so that it can be
/// taken out and put back through a shared reference.
///
/// Within this file slot `0` is passed to the pool's read-buffer methods and
/// slot `1` to its write-buffer methods (see `Stack::release`).
#[derive(Default)]
pub(crate) struct Buffer(Cell<Option<BytesVec>>, Cell<Option<BytesVec>>);
10
11impl fmt::Debug for Buffer {
12 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
13 let b0 = self.0.take();
14 let b1 = self.1.take();
15 let res = f
16 .debug_struct("Buffer")
17 .field("0", &b0)
18 .field("1", &b1)
19 .finish();
20 self.0.set(b0);
21 self.1.set(b1);
22 res
23 }
24}
25
/// Stack of per-layer `Buffer` pairs.
///
/// Up to three layers are kept inline in an array; `add_layer` moves them
/// into a `Vec` when a fourth layer is added.
#[derive(Debug)]
pub struct Stack {
    // Number of layers in use; starts at 1 and grows by one per `add_layer`.
    len: usize,
    // Inline (`Left`) or heap (`Right`) storage; `add_layer` always places the
    // new layer at index 0.
    buffers: Either<[Buffer; 3], Vec<Buffer>>,
}
31
32impl Stack {
33 pub(crate) fn new() -> Self {
34 Self {
35 len: 1,
36 buffers: Either::Left(Default::default()),
37 }
38 }
39
40 pub(crate) fn add_layer(&mut self) {
41 match &mut self.buffers {
42 Either::Left(b) => {
43 if self.len == 3 {
45 let mut vec = vec![Buffer(Cell::new(None), Cell::new(None))];
46 for item in b.iter_mut().take(self.len) {
47 vec.push(Buffer(
48 Cell::new(item.0.take()),
49 Cell::new(item.1.take()),
50 ));
51 }
52 self.len += 1;
53 self.buffers = Either::Right(vec);
54 } else {
55 let mut idx = self.len;
56 while idx > 0 {
57 let item = Buffer(
58 Cell::new(b[idx - 1].0.take()),
59 Cell::new(b[idx - 1].1.take()),
60 );
61 b[idx] = item;
62 idx -= 1;
63 }
64 b[0] = Buffer(Cell::new(None), Cell::new(None));
65 self.len += 1;
66 }
67 }
68 Either::Right(vec) => {
69 self.len += 1;
70 vec.insert(0, Buffer(Cell::new(None), Cell::new(None)));
71 }
72 }
73 }
74
75 fn get_buffers<F, R>(&self, idx: usize, f: F) -> R
76 where
77 F: FnOnce(&Buffer, &Buffer) -> R,
78 {
79 let buffers = match self.buffers {
80 Either::Left(ref b) => &b[..],
81 Either::Right(ref b) => &b[..],
82 };
83
84 let next = idx + 1;
85 if self.len > next {
86 f(&buffers[idx], &buffers[next])
87 } else {
88 let curr = Buffer(Cell::new(buffers[idx].0.take()), Cell::new(None));
89 let next = Buffer(Cell::new(None), Cell::new(buffers[idx].1.take()));
90
91 let result = f(&curr, &next);
92 buffers[idx].0.set(curr.0.take());
93 buffers[idx].1.set(next.1.take());
94 result
95 }
96 }
97
98 fn get_first_level(&self) -> &Buffer {
99 match &self.buffers {
100 Either::Left(b) => &b[0],
101 Either::Right(b) => &b[0],
102 }
103 }
104
105 fn get_last_level(&self) -> &Buffer {
106 match &self.buffers {
107 Either::Left(b) => &b[self.len - 1],
108 Either::Right(b) => &b[self.len - 1],
109 }
110 }
111
112 pub(crate) fn read_buf<F, R>(&self, io: &IoRef, idx: usize, nbytes: usize, f: F) -> R
113 where
114 F: FnOnce(&ReadBuf<'_>) -> R,
115 {
116 self.get_buffers(idx, |curr, next| {
117 let buf = ReadBuf {
118 io,
119 nbytes,
120 curr,
121 next,
122 need_write: Cell::new(false),
123 };
124 f(&buf)
125 })
126 }
127
128 pub(crate) fn write_buf<F, R>(&self, io: &IoRef, idx: usize, f: F) -> R
129 where
130 F: FnOnce(&WriteBuf<'_>) -> R,
131 {
132 self.get_buffers(idx, |curr, next| {
133 let buf = WriteBuf {
134 io,
135 curr,
136 next,
137 need_write: Cell::new(false),
138 };
139 f(&buf)
140 })
141 }
142
143 pub(crate) fn get_read_source(&self) -> Option<BytesVec> {
144 self.get_last_level().0.take()
145 }
146
147 pub(crate) fn set_read_source(&self, io: &IoRef, buf: BytesVec) {
148 if buf.is_empty() {
149 io.memory_pool().release_read_buf(buf);
150 } else {
151 self.get_last_level().0.set(Some(buf));
152 }
153 }
154
155 pub(crate) fn with_read_destination<F, R>(&self, io: &IoRef, f: F) -> R
156 where
157 F: FnOnce(&mut BytesVec) -> R,
158 {
159 let item = self.get_first_level();
160 let mut rb = item.0.take();
161 if rb.is_none() {
162 rb = Some(io.memory_pool().get_read_buf());
163 }
164
165 let result = f(rb.as_mut().unwrap());
166
167 if item.0.take().is_some() {
169 log::error!("Nested read io operation is detected");
170 io.force_close();
171 }
172
173 if let Some(b) = rb {
174 if b.is_empty() {
175 io.memory_pool().release_read_buf(b);
176 } else {
177 item.0.set(Some(b));
178 }
179 }
180 result
181 }
182
183 pub(crate) fn with_write_source<F, R>(&self, io: &IoRef, f: F) -> R
184 where
185 F: FnOnce(&mut BytesVec) -> R,
186 {
187 let item = self.get_first_level();
188 let mut wb = item.1.take();
189 if wb.is_none() {
190 wb = Some(io.memory_pool().get_write_buf());
191 }
192
193 let result = f(wb.as_mut().unwrap());
194 if let Some(b) = wb {
195 if b.is_empty() {
196 io.memory_pool().release_write_buf(b);
197 } else {
198 item.1.set(Some(b));
199 }
200 }
201 result
202 }
203
204 pub(crate) fn get_write_destination(&self) -> Option<BytesVec> {
205 self.get_last_level().1.take()
206 }
207
208 pub(crate) fn set_write_destination(&self, buf: BytesVec) -> Option<BytesVec> {
209 let b = self.get_last_level().1.take();
210 if b.is_some() {
211 self.get_last_level().1.set(b);
212 Some(buf)
213 } else {
214 self.get_last_level().1.set(Some(buf));
215 None
216 }
217 }
218
219 pub(crate) fn with_write_destination<F, R>(&self, io: &IoRef, f: F) -> R
220 where
221 F: FnOnce(Option<&mut BytesVec>) -> R,
222 {
223 let item = self.get_last_level();
224 let mut wb = item.1.take();
225
226 let result = f(wb.as_mut());
227
228 if item.1.take().is_some() {
230 log::error!("Nested write io operation is detected");
231 io.force_close();
232 }
233
234 if let Some(b) = wb {
235 if b.is_empty() {
236 io.memory_pool().release_write_buf(b);
237 } else {
238 item.1.set(Some(b));
239 }
240 }
241 result
242 }
243
244 pub(crate) fn read_destination_size(&self) -> usize {
245 let item = self.get_first_level();
246 let rb = item.0.take();
247 let size = rb.as_ref().map(|b| b.len()).unwrap_or(0);
248 item.0.set(rb);
249 size
250 }
251
252 pub(crate) fn write_destination_size(&self) -> usize {
253 let item = self.get_last_level();
254 let wb = item.1.take();
255 let size = wb.as_ref().map(|b| b.len()).unwrap_or(0);
256 item.1.set(wb);
257 size
258 }
259
260 pub(crate) fn release(&self, pool: PoolRef) {
261 let items = match &self.buffers {
262 Either::Left(b) => &b[..],
263 Either::Right(b) => &b[..],
264 };
265
266 for item in items {
267 if let Some(buf) = item.0.take() {
268 pool.release_read_buf(buf);
269 }
270 if let Some(buf) = item.1.take() {
271 pool.release_write_buf(buf);
272 }
273 }
274 }
275
276 pub(crate) fn set_memory_pool(&self, pool: PoolRef) {
277 let items = match &self.buffers {
278 Either::Left(b) => &b[..],
279 Either::Right(b) => &b[..],
280 };
281 for item in items {
282 if let Some(mut b) = item.0.take() {
283 pool.move_vec_in(&mut b);
284 item.0.set(Some(b));
285 }
286 if let Some(mut b) = item.1.take() {
287 pool.move_vec_in(&mut b);
288 item.1.set(Some(b));
289 }
290 }
291 }
292}
293
/// Read-side view over a pair of layers, handed to the closure given to
/// `Stack::read_buf`.
///
/// The methods below treat slot `0` of `next` as the source and slot `0` of
/// `curr` as the destination.
#[derive(Debug)]
pub struct ReadBuf<'a> {
    /// Io object; used for the memory pool, tag and shutdown calls.
    pub(crate) io: &'a IoRef,
    /// Layer whose slot `0` is the read destination.
    pub(crate) curr: &'a Buffer,
    /// Layer whose slot `0` is the read source.
    pub(crate) next: &'a Buffer,
    /// Value supplied by the creator of this view (`Stack::read_buf` passes
    /// its `nbytes` argument; `WriteBuf::with_read_buf` passes `0`).
    pub(crate) nbytes: usize,
    /// Flag exchanged with `WriteBuf` views created by `with_write_buf`.
    pub(crate) need_write: Cell<bool>,
}
302
303impl ReadBuf<'_> {
304 #[inline]
305 pub fn tag(&self) -> &'static str {
307 self.io.tag()
308 }
309
310 #[inline]
311 pub fn nbytes(&self) -> usize {
313 self.nbytes
314 }
315
316 #[inline]
317 pub fn want_shutdown(&self) {
319 self.io.want_shutdown()
320 }
321
322 #[inline]
323 pub fn resize_buf(&self, buf: &mut BytesVec) {
325 self.io.memory_pool().resize_read_buf(buf);
326 }
327
328 #[inline]
329 pub fn with_src<F, R>(&self, f: F) -> R
331 where
332 F: FnOnce(&mut Option<BytesVec>) -> R,
333 {
334 let mut item = self.next.0.take();
335 let result = f(&mut item);
336
337 if let Some(b) = item {
338 if b.is_empty() {
339 self.io.memory_pool().release_read_buf(b);
340 } else {
341 self.next.0.set(Some(b));
342 }
343 }
344 result
345 }
346
347 #[inline]
348 pub fn with_dst<F, R>(&self, f: F) -> R
350 where
351 F: FnOnce(&mut BytesVec) -> R,
352 {
353 let mut item = self.curr.0.take();
354 if item.is_none() {
355 item = Some(self.io.memory_pool().get_read_buf());
356 }
357 let result = f(item.as_mut().unwrap());
358 if let Some(b) = item {
359 if b.is_empty() {
360 self.io.memory_pool().release_read_buf(b);
361 } else {
362 self.curr.0.set(Some(b));
363 }
364 }
365 result
366 }
367
368 #[inline]
369 pub fn take_src(&self) -> Option<BytesVec> {
371 self.next.0.take().and_then(|b| {
372 if b.is_empty() {
373 self.io.memory_pool().release_read_buf(b);
374 None
375 } else {
376 Some(b)
377 }
378 })
379 }
380
381 #[inline]
382 pub fn set_src(&self, src: Option<BytesVec>) {
384 if let Some(src) = src {
385 if src.is_empty() {
386 self.io.memory_pool().release_read_buf(src);
387 } else if let Some(mut buf) = self.next.0.take() {
388 buf.extend_from_slice(&src);
389 self.next.0.set(Some(buf));
390 self.io.memory_pool().release_read_buf(src);
391 } else {
392 self.next.0.set(Some(src));
393 }
394 }
395 }
396
397 #[inline]
398 pub fn take_dst(&self) -> BytesVec {
400 self.curr
401 .0
402 .take()
403 .unwrap_or_else(|| self.io.memory_pool().get_read_buf())
404 }
405
406 #[inline]
407 pub fn set_dst(&self, dst: Option<BytesVec>) {
409 if let Some(dst) = dst {
410 if dst.is_empty() {
411 self.io.memory_pool().release_read_buf(dst);
412 } else if let Some(mut buf) = self.curr.0.take() {
413 buf.extend_from_slice(&dst);
414 self.curr.0.set(Some(buf));
415 self.io.memory_pool().release_read_buf(dst);
416 } else {
417 self.curr.0.set(Some(dst));
418 }
419 }
420 }
421
422 #[inline]
423 pub fn with_write_buf<'b, F, R>(&'b self, f: F) -> R
424 where
425 F: FnOnce(&WriteBuf<'b>) -> R,
426 {
427 let mut buf = WriteBuf {
428 io: self.io,
429 curr: self.curr,
430 next: self.next,
431 need_write: Cell::new(self.need_write.get()),
432 };
433 let result = f(&mut buf);
434 self.need_write.set(buf.need_write.get());
435 result
436 }
437}
438
/// Write-side view over a pair of layers, handed to the closure given to
/// `Stack::write_buf`.
///
/// The methods below treat slot `1` of `curr` as the source and slot `1` of
/// `next` as the destination.
#[derive(Debug)]
pub struct WriteBuf<'a> {
    /// Io object; used for the memory pool, tag and shutdown calls.
    pub(crate) io: &'a IoRef,
    /// Layer whose slot `1` is the write source.
    pub(crate) curr: &'a Buffer,
    /// Layer whose slot `1` is the write destination.
    pub(crate) next: &'a Buffer,
    /// Set by `set_dst` when a non-empty buffer is stored, and by `with_dst`
    /// when the destination length changed and the buffer is kept.
    pub(crate) need_write: Cell<bool>,
}
446
447impl WriteBuf<'_> {
448 #[inline]
449 pub fn tag(&self) -> &'static str {
451 self.io.tag()
452 }
453
454 #[inline]
455 pub fn want_shutdown(&self) {
457 self.io.want_shutdown()
458 }
459
460 #[inline]
461 pub fn resize_buf(&self, buf: &mut BytesVec) {
463 self.io.memory_pool().resize_write_buf(buf);
464 }
465
466 #[inline]
467 pub fn with_src<F, R>(&self, f: F) -> R
469 where
470 F: FnOnce(&mut Option<BytesVec>) -> R,
471 {
472 let mut item = self.curr.1.take();
473 let result = f(&mut item);
474 if let Some(b) = item {
475 if b.is_empty() {
476 self.io.memory_pool().release_write_buf(b);
477 } else {
478 self.curr.1.set(Some(b));
479 }
480 }
481 result
482 }
483
484 #[inline]
485 pub fn with_dst<F, R>(&self, f: F) -> R
487 where
488 F: FnOnce(&mut BytesVec) -> R,
489 {
490 let mut item = self.next.1.take();
491 if item.is_none() {
492 item = Some(self.io.memory_pool().get_write_buf());
493 }
494 let buf = item.as_mut().unwrap();
495 let total = buf.len();
496 let result = f(buf);
497
498 if buf.is_empty() {
499 self.io.memory_pool().release_write_buf(item.unwrap());
500 } else {
501 self.need_write
502 .set(self.need_write.get() | (total != buf.len()));
503 self.next.1.set(item);
504 }
505 result
506 }
507
508 #[inline]
509 pub fn take_src(&self) -> Option<BytesVec> {
511 self.curr.1.take().and_then(|b| {
512 if b.is_empty() {
513 self.io.memory_pool().release_write_buf(b);
514 None
515 } else {
516 Some(b)
517 }
518 })
519 }
520
521 #[inline]
522 pub fn set_src(&self, src: Option<BytesVec>) {
524 if let Some(src) = src {
525 if src.is_empty() {
526 self.io.memory_pool().release_write_buf(src);
527 } else if let Some(mut buf) = self.curr.1.take() {
528 buf.extend_from_slice(&src);
529 self.curr.1.set(Some(buf));
530 self.io.memory_pool().release_write_buf(src);
531 } else {
532 self.curr.1.set(Some(src));
533 }
534 }
535 }
536
537 #[inline]
538 pub fn take_dst(&self) -> BytesVec {
540 self.next
541 .1
542 .take()
543 .unwrap_or_else(|| self.io.memory_pool().get_write_buf())
544 }
545
546 #[inline]
547 pub fn set_dst(&self, dst: Option<BytesVec>) {
549 if let Some(dst) = dst {
550 if dst.is_empty() {
551 self.io.memory_pool().release_write_buf(dst);
552 } else {
553 self.need_write.set(true);
554
555 if let Some(mut buf) = self.next.1.take() {
556 buf.extend_from_slice(&dst);
557 self.next.1.set(Some(buf));
558 self.io.memory_pool().release_write_buf(dst);
559 } else {
560 self.next.1.set(Some(dst));
561 }
562 }
563 }
564 }
565
566 #[inline]
567 pub fn with_read_buf<'b, F, R>(&'b self, f: F) -> R
568 where
569 F: FnOnce(&ReadBuf<'b>) -> R,
570 {
571 let mut buf = ReadBuf {
572 io: self.io,
573 curr: self.curr,
574 next: self.next,
575 nbytes: 0,
576 need_write: Cell::new(self.need_write.get()),
577 };
578 let result = f(&mut buf);
579 self.need_write.set(buf.need_write.get());
580 result
581 }
582}