ntex_io/buf.rs
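//! Layered read/write buffer storage.
//!
//! `Stack` keeps one `Buffer` per io layer. Each `Buffer` holds an optional
//! read buffer (field `.0`) and an optional write buffer (field `.1`).
//! Index 0 is the layer closest to the application side (the read
//! destination and the write source), while the last index is the layer
//! closest to the io stream (the read source and the write destination).
//! Read data therefore flows from the last level towards index 0, and write
//! data flows from index 0 towards the last level.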

use std::{cell::Cell, fmt};

use ntex_bytes::{BytesVec, PoolRef};
use ntex_util::future::Either;

use crate::IoRef;

#[derive(Default)]
pub(crate) struct Buffer(Cell<Option<BytesVec>>, Cell<Option<BytesVec>>);

impl fmt::Debug for Buffer {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let b0 = self.0.take();
        let b1 = self.1.take();
        let res = f
            .debug_struct("Buffer")
            .field("0", &b0)
            .field("1", &b1)
            .finish();
        self.0.set(b0);
        self.1.set(b1);
        res
    }
}

#[derive(Debug)]
pub struct Stack {
    len: usize,
    buffers: Either<[Buffer; 3], Vec<Buffer>>,
}

impl Stack {
    pub(crate) fn new() -> Self {
        Self {
            len: 1,
            buffers: Either::Left(Default::default()),
        }
    }
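    /// Add a new layer at index 0 (the application side).
    ///
    /// Existing buffers are shifted one position towards the io stream; the
    /// inline `[Buffer; 3]` storage is promoted to a `Vec` once a fourth
    /// layer is added.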
    pub(crate) fn add_layer(&mut self) {
        match &mut self.buffers {
            Either::Left(b) => {
                // move to vec
                if self.len == 3 {
                    let mut vec = vec![Buffer(Cell::new(None), Cell::new(None))];
                    for item in b.iter_mut().take(self.len) {
                        vec.push(Buffer(
                            Cell::new(item.0.take()),
                            Cell::new(item.1.take()),
                        ));
                    }
                    self.len += 1;
                    self.buffers = Either::Right(vec);
                } else {
                    let mut idx = self.len;
                    while idx > 0 {
                        let item = Buffer(
                            Cell::new(b[idx - 1].0.take()),
                            Cell::new(b[idx - 1].1.take()),
                        );
                        b[idx] = item;
                        idx -= 1;
                    }
                    b[0] = Buffer(Cell::new(None), Cell::new(None));
                    self.len += 1;
                }
            }
            Either::Right(vec) => {
                self.len += 1;
                vec.insert(0, Buffer(Cell::new(None), Cell::new(None)));
            }
        }
    }
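    /// Run `f` with the buffer pair for layer `idx`: `curr` is the layer's
    /// own `Buffer` and `next` is the adjacent layer towards the io stream.
    /// The last layer has no next level, so its single `Buffer` is
    /// temporarily split into a `(curr, next)` pair and re-assembled after
    /// `f` returns.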
    fn get_buffers<F, R>(&self, idx: usize, f: F) -> R
    where
        F: FnOnce(&Buffer, &Buffer) -> R,
    {
        let buffers = match self.buffers {
            Either::Left(ref b) => &b[..],
            Either::Right(ref b) => &b[..],
        };

        let next = idx + 1;
        if self.len > next {
            f(&buffers[idx], &buffers[next])
        } else {
            let curr = Buffer(Cell::new(buffers[idx].0.take()), Cell::new(None));
            let next = Buffer(Cell::new(None), Cell::new(buffers[idx].1.take()));

            let result = f(&curr, &next);
            buffers[idx].0.set(curr.0.take());
            buffers[idx].1.set(next.1.take());
            result
        }
    }

    fn get_first_level(&self) -> &Buffer {
        match &self.buffers {
            Either::Left(b) => &b[0],
            Either::Right(b) => &b[0],
        }
    }

    fn get_last_level(&self) -> &Buffer {
        match &self.buffers {
            Either::Left(b) => &b[self.len - 1],
            Either::Right(b) => &b[self.len - 1],
        }
    }
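    /// Run `f` with a `ReadBuf` view of layer `idx`; `nbytes` is the number
    /// of newly added bytes to report to that layer.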
    pub(crate) fn read_buf<F, R>(&self, io: &IoRef, idx: usize, nbytes: usize, f: F) -> R
    where
        F: FnOnce(&ReadBuf<'_>) -> R,
    {
        self.get_buffers(idx, |curr, next| {
            let buf = ReadBuf {
                io,
                nbytes,
                curr,
                next,
                need_write: Cell::new(false),
            };
            f(&buf)
        })
    }

    pub(crate) fn write_buf<F, R>(&self, io: &IoRef, idx: usize, f: F) -> R
    where
        F: FnOnce(&WriteBuf<'_>) -> R,
    {
        self.get_buffers(idx, |curr, next| {
            let buf = WriteBuf {
                io,
                curr,
                next,
                need_write: Cell::new(false),
            };
            f(&buf)
        })
    }
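    /// Take the read source buffer, i.e. the last level's read buffer that
    /// receives data from the io stream.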
    pub(crate) fn get_read_source(&self) -> Option<BytesVec> {
        self.get_last_level().0.take()
    }

    pub(crate) fn set_read_source(&self, io: &IoRef, buf: BytesVec) {
        if buf.is_empty() {
            io.memory_pool().release_read_buf(buf);
        } else {
            self.get_last_level().0.set(Some(buf));
        }
    }

    pub(crate) fn with_read_source<F, R>(&self, io: &IoRef, f: F) -> R
    where
        F: FnOnce(&mut BytesVec) -> R,
    {
        let item = self.get_last_level();
        let mut rb = item.0.take();
        if rb.is_none() {
            rb = Some(io.memory_pool().get_read_buf());
        }

        let result = f(rb.as_mut().unwrap());
        if let Some(b) = rb {
            if b.is_empty() {
                io.memory_pool().release_read_buf(b);
            } else {
                item.0.set(Some(b));
            }
        }
        result
    }
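    /// Run `f` with the read destination buffer (the first level's read
    /// buffer, consumed by the application side). If the slot is
    /// re-populated while `f` runs, a nested read io operation is detected
    /// and the connection is force-closed.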
    pub(crate) fn with_read_destination<F, R>(&self, io: &IoRef, f: F) -> R
    where
        F: FnOnce(&mut BytesVec) -> R,
    {
        let item = self.get_first_level();
        let mut rb = item.0.take();
        if rb.is_none() {
            rb = Some(io.memory_pool().get_read_buf());
        }

        let result = f(rb.as_mut().unwrap());

        // check nested updates
        if item.0.take().is_some() {
            log::error!("Nested read io operation is detected");
            io.force_close();
        }

        if let Some(b) = rb {
            if b.is_empty() {
                io.memory_pool().release_read_buf(b);
            } else {
                item.0.set(Some(b));
            }
        }
        result
    }

    pub(crate) fn with_write_source<F, R>(&self, io: &IoRef, f: F) -> R
    where
        F: FnOnce(&mut BytesVec) -> R,
    {
        let item = self.get_first_level();
        let mut wb = item.1.take();
        if wb.is_none() {
            wb = Some(io.memory_pool().get_write_buf());
        }

        let result = f(wb.as_mut().unwrap());
        if let Some(b) = wb {
            if b.is_empty() {
                io.memory_pool().release_write_buf(b);
            } else {
                item.1.set(Some(b));
            }
        }
        result
    }

    pub(crate) fn get_write_destination(&self) -> Option<BytesVec> {
        self.get_last_level().1.take()
    }
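    /// Install `buf` as the write destination buffer. If a destination
    /// buffer is already present it is kept, and `buf` is returned to the
    /// caller unchanged.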
    pub(crate) fn set_write_destination(&self, buf: BytesVec) -> Option<BytesVec> {
        let b = self.get_last_level().1.take();
        if b.is_some() {
            self.get_last_level().1.set(b);
            Some(buf)
        } else {
            self.get_last_level().1.set(Some(buf));
            None
        }
    }

    pub(crate) fn with_write_destination<F, R>(&self, io: &IoRef, f: F) -> R
    where
        F: FnOnce(Option<&mut BytesVec>) -> R,
    {
        let item = self.get_last_level();
        let mut wb = item.1.take();

        let result = f(wb.as_mut());

        // check nested updates
        if item.1.take().is_some() {
            log::error!("Nested write io operation is detected");
            io.force_close();
        }

        if let Some(b) = wb {
            if b.is_empty() {
                io.memory_pool().release_write_buf(b);
            } else {
                item.1.set(Some(b));
            }
        }
        result
    }

    pub(crate) fn read_destination_size(&self) -> usize {
        let item = self.get_first_level();
        let rb = item.0.take();
        let size = rb.as_ref().map(|b| b.len()).unwrap_or(0);
        item.0.set(rb);
        size
    }

    pub(crate) fn write_destination_size(&self) -> usize {
        let item = self.get_last_level();
        let wb = item.1.take();
        let size = wb.as_ref().map(|b| b.len()).unwrap_or(0);
        item.1.set(wb);
        size
    }

    pub(crate) fn release(&self, pool: PoolRef) {
        let items = match &self.buffers {
            Either::Left(b) => &b[..],
            Either::Right(b) => &b[..],
        };

        for item in items {
            if let Some(buf) = item.0.take() {
                pool.release_read_buf(buf);
            }
            if let Some(buf) = item.1.take() {
                pool.release_write_buf(buf);
            }
        }
    }

    pub(crate) fn set_memory_pool(&self, pool: PoolRef) {
        let items = match &self.buffers {
            Either::Left(b) => &b[..],
            Either::Right(b) => &b[..],
        };
        for item in items {
            if let Some(mut b) = item.0.take() {
                pool.move_vec_in(&mut b);
                item.0.set(Some(b));
            }
            if let Some(mut b) = item.1.take() {
                pool.move_vec_in(&mut b);
                item.1.set(Some(b));
            }
        }
    }
}
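/// Read-side view of a single buffer layer.
///
/// `with_src`/`take_src` operate on the buffer of the adjacent layer towards
/// the io stream, while `with_dst`/`take_dst` operate on this layer's own
/// buffer. A layer typically drains its source and appends processed bytes
/// to its destination; a minimal pass-through sketch using only the methods
/// below (illustrative, not an API provided by this module):
///
/// ```ignore
/// fn copy_through(buf: &ReadBuf<'_>) {
///     // move everything produced by the layer below into this layer's
///     // destination buffer unchanged
///     if let Some(src) = buf.take_src() {
///         buf.set_dst(Some(src));
///     }
/// }
/// ```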
#[derive(Debug)]
pub struct ReadBuf<'a> {
    pub(crate) io: &'a IoRef,
    pub(crate) curr: &'a Buffer,
    pub(crate) next: &'a Buffer,
    pub(crate) nbytes: usize,
    pub(crate) need_write: Cell<bool>,
}

impl ReadBuf<'_> {
    #[inline]
    /// Get io tag
    pub fn tag(&self) -> &'static str {
        self.io.tag()
    }

    #[inline]
    /// Get number of newly added bytes
    pub fn nbytes(&self) -> usize {
        self.nbytes
    }

    #[inline]
    /// Initiate graceful io stream shutdown
    pub fn want_shutdown(&self) {
        self.io.want_shutdown()
    }

    #[inline]
    /// Make sure buffer has enough free space
    pub fn resize_buf(&self, buf: &mut BytesVec) {
        self.io.memory_pool().resize_read_buf(buf);
    }

    #[inline]
    /// Get reference to source read buffer
    pub fn with_src<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&mut Option<BytesVec>) -> R,
    {
        let mut item = self.next.0.take();
        let result = f(&mut item);

        if let Some(b) = item {
            if b.is_empty() {
                self.io.memory_pool().release_read_buf(b);
            } else {
                self.next.0.set(Some(b));
            }
        }
        result
    }

    #[inline]
    /// Get reference to destination read buffer
    pub fn with_dst<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&mut BytesVec) -> R,
    {
        let mut item = self.curr.0.take();
        if item.is_none() {
            item = Some(self.io.memory_pool().get_read_buf());
        }
        let result = f(item.as_mut().unwrap());
        if let Some(b) = item {
            if b.is_empty() {
                self.io.memory_pool().release_read_buf(b);
            } else {
                self.curr.0.set(Some(b));
            }
        }
        result
    }

    #[inline]
    /// Take source read buffer
    pub fn take_src(&self) -> Option<BytesVec> {
        self.next.0.take().and_then(|b| {
            if b.is_empty() {
                self.io.memory_pool().release_read_buf(b);
                None
            } else {
                Some(b)
            }
        })
    }

    #[inline]
    /// Set source read buffer
    pub fn set_src(&self, src: Option<BytesVec>) {
        if let Some(src) = src {
            if src.is_empty() {
                self.io.memory_pool().release_read_buf(src);
            } else if let Some(mut buf) = self.next.0.take() {
                buf.extend_from_slice(&src);
                self.next.0.set(Some(buf));
                self.io.memory_pool().release_read_buf(src);
            } else {
                self.next.0.set(Some(src));
            }
        }
    }

    #[inline]
    /// Take destination read buffer
    pub fn take_dst(&self) -> BytesVec {
        self.curr
            .0
            .take()
            .unwrap_or_else(|| self.io.memory_pool().get_read_buf())
    }

    #[inline]
    /// Set destination read buffer
    pub fn set_dst(&self, dst: Option<BytesVec>) {
        if let Some(dst) = dst {
            if dst.is_empty() {
                self.io.memory_pool().release_read_buf(dst);
            } else if let Some(mut buf) = self.curr.0.take() {
                buf.extend_from_slice(&dst);
                self.curr.0.set(Some(buf));
                self.io.memory_pool().release_read_buf(dst);
            } else {
                self.curr.0.set(Some(dst));
            }
        }
    }
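    /// Run `f` with a write-side view of the same layer pair; changes made
    /// to the `need_write` flag through the `WriteBuf` are propagated back
    /// to this `ReadBuf`.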
    #[inline]
    pub fn with_write_buf<'b, F, R>(&'b self, f: F) -> R
    where
        F: FnOnce(&WriteBuf<'b>) -> R,
    {
        let buf = WriteBuf {
            io: self.io,
            curr: self.curr,
            next: self.next,
            need_write: Cell::new(self.need_write.get()),
        };
        let result = f(&buf);
        self.need_write.set(buf.need_write.get());
        result
    }
}
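/// Write-side view of a single buffer layer.
///
/// `with_src`/`take_src` operate on this layer's own write buffer, while
/// `with_dst`/`take_dst` operate on the buffer of the adjacent layer towards
/// the io stream. The `need_write` flag records whether new bytes were
/// appended to the destination buffer.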
#[derive(Debug)]
pub struct WriteBuf<'a> {
    pub(crate) io: &'a IoRef,
    pub(crate) curr: &'a Buffer,
    pub(crate) next: &'a Buffer,
    pub(crate) need_write: Cell<bool>,
}

impl WriteBuf<'_> {
    #[inline]
    /// Get io tag
    pub fn tag(&self) -> &'static str {
        self.io.tag()
    }

    #[inline]
    /// Initiate graceful io stream shutdown
    pub fn want_shutdown(&self) {
        self.io.want_shutdown()
    }

    #[inline]
    /// Make sure buffer has enough free space
    pub fn resize_buf(&self, buf: &mut BytesVec) {
        self.io.memory_pool().resize_write_buf(buf);
    }

    #[inline]
    /// Get reference to source write buffer
    pub fn with_src<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&mut Option<BytesVec>) -> R,
    {
        let mut item = self.curr.1.take();
        let result = f(&mut item);
        if let Some(b) = item {
            if b.is_empty() {
                self.io.memory_pool().release_write_buf(b);
            } else {
                self.curr.1.set(Some(b));
            }
        }
        result
    }

    #[inline]
    /// Get reference to destination write buffer
    pub fn with_dst<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&mut BytesVec) -> R,
    {
        let mut item = self.next.1.take();
        if item.is_none() {
            item = Some(self.io.memory_pool().get_write_buf());
        }
        let buf = item.as_mut().unwrap();
        let total = buf.len();
        let result = f(buf);

        if buf.is_empty() {
            self.io.memory_pool().release_write_buf(item.unwrap());
        } else {
            self.need_write
                .set(self.need_write.get() | (total != buf.len()));
            self.next.1.set(item);
        }
        result
    }

    #[inline]
    /// Take source write buffer
    pub fn take_src(&self) -> Option<BytesVec> {
        self.curr.1.take().and_then(|b| {
            if b.is_empty() {
                self.io.memory_pool().release_write_buf(b);
                None
            } else {
                Some(b)
            }
        })
    }

    #[inline]
    /// Set source write buffer
    pub fn set_src(&self, src: Option<BytesVec>) {
        if let Some(src) = src {
            if src.is_empty() {
                self.io.memory_pool().release_write_buf(src);
            } else if let Some(mut buf) = self.curr.1.take() {
                buf.extend_from_slice(&src);
                self.curr.1.set(Some(buf));
                self.io.memory_pool().release_write_buf(src);
            } else {
                self.curr.1.set(Some(src));
            }
        }
    }

    #[inline]
    /// Take destination write buffer
    pub fn take_dst(&self) -> BytesVec {
        self.next
            .1
            .take()
            .unwrap_or_else(|| self.io.memory_pool().get_write_buf())
    }

    #[inline]
    /// Set destination write buffer
    pub fn set_dst(&self, dst: Option<BytesVec>) {
        if let Some(dst) = dst {
            if dst.is_empty() {
                self.io.memory_pool().release_write_buf(dst);
            } else {
                self.need_write.set(true);

                if let Some(mut buf) = self.next.1.take() {
                    buf.extend_from_slice(&dst);
                    self.next.1.set(Some(buf));
                    self.io.memory_pool().release_write_buf(dst);
                } else {
                    self.next.1.set(Some(dst));
                }
            }
        }
    }
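    /// Run `f` with a read-side view of the same layer pair (`nbytes` is
    /// reported as 0); changes to the `need_write` flag are propagated back.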
    #[inline]
    pub fn with_read_buf<'b, F, R>(&'b self, f: F) -> R
    where
        F: FnOnce(&ReadBuf<'b>) -> R,
    {
        let buf = ReadBuf {
            io: self.io,
            curr: self.curr,
            next: self.next,
            nbytes: 0,
            need_write: Cell::new(self.need_write.get()),
        };
        let result = f(&buf);
        self.need_write.set(buf.need_write.get());
        result
    }
}