// ntex_io/buf.rs

1use std::{cell::Cell, fmt};
2
3use ntex_bytes::{BytesVec, PoolRef};
4use ntex_util::future::Either;
5
6use crate::IoRef;
7
/// Buffer pair for one filter layer.
///
/// `read` holds bytes flowing from the io stream toward the application,
/// `write` holds bytes flowing from the application toward the io stream.
/// `Cell<Option<BytesVec>>` provides interior mutability through shared
/// references: callers `take()` the buffer out, mutate it, then `set()` it
/// back (the pattern used throughout this module).
#[derive(Default)]
pub(crate) struct Buffer {
    read: Cell<Option<BytesVec>>,
    write: Cell<Option<BytesVec>>,
}
13
14impl fmt::Debug for Buffer {
15    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
16        let b0 = self.read.take();
17        let b1 = self.write.take();
18        let res = f
19            .debug_struct("Buffer")
20            .field("read", &b0)
21            .field("write", &b1)
22            .finish();
23        self.read.set(b0);
24        self.write.set(b1);
25        res
26    }
27}
28
/// Filter stacks up to this depth are stored inline in an array;
/// deeper stacks spill into a heap-allocated `Vec` (see `Stack::add_layer`).
const INLINE_SIZE: usize = 3;

/// Ordered buffer storage for all filter layers of an io stream.
///
/// Index 0 is the application-facing layer (see `get_first_level` users),
/// index `len - 1` is the io-stream-facing layer (see `get_last_level`
/// users). Only the first `len` entries are active.
#[derive(Debug)]
pub(crate) struct Stack {
    len: usize,
    buffers: Either<[Buffer; INLINE_SIZE], Vec<Buffer>>,
}
36
impl Stack {
    /// Create a stack with a single active layer, stored inline.
    pub(crate) fn new() -> Self {
        Self {
            len: 1,
            buffers: Either::Left(Default::default()),
        }
    }

    /// Insert a new empty layer at index 0, shifting every existing layer
    /// one position toward the end of the storage.
    pub(crate) fn add_layer(&mut self) {
        match &mut self.buffers {
            Either::Left(b) => {
                // move to vec
                if self.len == INLINE_SIZE {
                    // inline storage is full: spill into a Vec with the new
                    // empty layer in front, moving the existing buffers over
                    let mut vec = vec![Buffer {
                        read: Cell::new(None),
                        write: Cell::new(None),
                    }];
                    for item in b.iter_mut().take(self.len) {
                        vec.push(Buffer {
                            read: Cell::new(item.read.take()),
                            write: Cell::new(item.write.take()),
                        });
                    }
                    self.len += 1;
                    self.buffers = Either::Right(vec);
                } else {
                    // still room inline: shift layers up by one, back to front,
                    // then clear slot 0 for the new layer
                    let mut idx = self.len;
                    while idx > 0 {
                        let item = Buffer {
                            read: Cell::new(b[idx - 1].read.take()),
                            write: Cell::new(b[idx - 1].write.take()),
                        };
                        b[idx] = item;
                        idx -= 1;
                    }
                    b[0] = Buffer {
                        read: Cell::new(None),
                        write: Cell::new(None),
                    };
                    self.len += 1;
                }
            }
            Either::Right(vec) => {
                self.len += 1;
                vec.insert(
                    0,
                    Buffer {
                        read: Cell::new(None),
                        write: Cell::new(None),
                    },
                );
            }
        }
    }

    /// Call `f` with layer `idx` and the following layer. When `idx` is the
    /// last active layer, a temporary empty `Buffer` is passed instead;
    /// anything stored in it is dropped when `f` returns.
    fn get_buffers<F, R>(&self, idx: usize, f: F) -> R
    where
        F: FnOnce(&Buffer, &Buffer) -> R,
    {
        let buffers = match self.buffers {
            Either::Left(ref b) => &b[..],
            Either::Right(ref b) => &b[..],
        };

        let next = idx + 1;
        if self.len > next {
            f(&buffers[idx], &buffers[next])
        } else {
            f(&buffers[idx], &Buffer::default())
        }
    }

    /// Layer 0 — the application-facing end of the stack.
    fn get_first_level(&self) -> &Buffer {
        match &self.buffers {
            Either::Left(b) => &b[0],
            Either::Right(b) => &b[0],
        }
    }

    /// The last active layer — the io-stream-facing end of the stack.
    fn get_last_level(&self) -> &Buffer {
        match &self.buffers {
            Either::Left(b) => &b[self.len - 1],
            Either::Right(b) => &b[self.len - 1],
        }
    }

    /// Take the last layer's read buffer, leaving `None` in its place.
    pub(crate) fn get_read_source(&self) -> Option<BytesVec> {
        self.get_last_level().read.take()
    }

    /// Store `buf` as the last layer's read buffer; an empty buffer is
    /// returned to the memory pool instead of being stored.
    pub(crate) fn set_read_source(&self, io: &IoRef, buf: BytesVec) {
        if buf.is_empty() {
            io.memory_pool().release_read_buf(buf);
        } else {
            self.get_last_level().read.set(Some(buf));
        }
    }

    /// Run `f` with the first layer's read buffer, allocating one from the
    /// memory pool when none is present. An empty buffer is released back
    /// to the pool afterwards, a non-empty one is stored again.
    pub(crate) fn with_read_destination<F, R>(&self, io: &IoRef, f: F) -> R
    where
        F: FnOnce(&mut BytesVec) -> R,
    {
        let item = self.get_first_level();
        let mut rb = item
            .read
            .take()
            .unwrap_or_else(|| io.memory_pool().get_read_buf());

        let result = f(&mut rb);

        // check nested updates
        // (the cell was left empty while `f` ran; if it is occupied now,
        // `f` re-entered io code that stored another buffer here — fatal)
        if item.read.take().is_some() {
            log::error!("Nested read io operation is detected");
            io.force_close();
        }

        if rb.is_empty() {
            io.memory_pool().release_read_buf(rb);
        } else {
            item.read.set(Some(rb));
        }
        result
    }

    /// Take the last layer's write buffer, leaving `None` in its place.
    pub(crate) fn get_write_destination(&self) -> Option<BytesVec> {
        self.get_last_level().write.take()
    }

    /// Store `buf` as the last layer's write buffer. If that slot is already
    /// occupied, the existing buffer is kept and `buf` is handed back to the
    /// caller as `Some(buf)`; otherwise `None` is returned.
    pub(crate) fn set_write_destination(&self, buf: BytesVec) -> Option<BytesVec> {
        let b = self.get_last_level().write.take();
        if b.is_some() {
            self.get_last_level().write.set(b);
            Some(buf)
        } else {
            self.get_last_level().write.set(Some(buf));
            None
        }
    }

    /// Run `f` with the first layer's write buffer, allocating one from the
    /// memory pool when none is present; an empty result is released back
    /// to the pool.
    pub(crate) fn with_write_source<F, R>(&self, io: &IoRef, f: F) -> R
    where
        F: FnOnce(&mut BytesVec) -> R,
    {
        let item = self.get_first_level();
        let mut wb = item
            .write
            .take()
            .unwrap_or_else(|| io.memory_pool().get_write_buf());

        let result = f(&mut wb);
        if wb.is_empty() {
            io.memory_pool().release_write_buf(wb);
        } else {
            item.write.set(Some(wb));
        }
        result
    }

    /// Run `f` with the last layer's write buffer, if any. As with
    /// `with_read_destination`, a buffer appearing in the cell while `f`
    /// ran indicates a nested io operation and force-closes the stream.
    pub(crate) fn with_write_destination<F, R>(&self, io: &IoRef, f: F) -> R
    where
        F: FnOnce(Option<&mut BytesVec>) -> R,
    {
        let item = self.get_last_level();
        let mut wb = item.write.take();

        let result = f(wb.as_mut());

        // check nested updates
        if item.write.take().is_some() {
            log::error!("Nested write io operation is detected");
            io.force_close();
        }

        if let Some(b) = wb {
            if b.is_empty() {
                io.memory_pool().release_write_buf(b);
            } else {
                item.write.set(Some(b));
            }
        }
        result
    }

    /// Number of bytes currently in the first layer's read buffer (0 if none).
    pub(crate) fn read_destination_size(&self) -> usize {
        let item = self.get_first_level();
        let rb = item.read.take();
        let size = rb.as_ref().map(|b| b.len()).unwrap_or(0);
        item.read.set(rb);
        size
    }

    /// Number of bytes currently in the last layer's write buffer (0 if none).
    pub(crate) fn write_destination_size(&self) -> usize {
        let item = self.get_last_level();
        let wb = item.write.take();
        let size = wb.as_ref().map(|b| b.len()).unwrap_or(0);
        item.write.set(wb);
        size
    }

    /// Release every stored buffer (all slots, read and write) back to `pool`.
    pub(crate) fn release(&self, pool: PoolRef) {
        let items = match &self.buffers {
            Either::Left(b) => &b[..],
            Either::Right(b) => &b[..],
        };

        for item in items {
            if let Some(buf) = item.read.take() {
                pool.release_read_buf(buf);
            }
            if let Some(buf) = item.write.take() {
                pool.release_write_buf(buf);
            }
        }
    }

    /// Move the backing storage of every buffer into `pool`, keeping the
    /// buffers in place.
    pub(crate) fn set_memory_pool(&self, pool: PoolRef) {
        let items = match &self.buffers {
            Either::Left(b) => &b[..],
            Either::Right(b) => &b[..],
        };
        for item in items {
            if let Some(mut b) = item.read.take() {
                pool.move_vec_in(&mut b);
                item.read.set(Some(b));
            }
            if let Some(mut b) = item.write.take() {
                pool.move_vec_in(&mut b);
                item.write.set(Some(b));
            }
        }
    }
}
269
/// Lightweight cursor addressing one filter layer (`idx`) of a `Stack`.
///
/// `Copy` by design: `next()` produces a new context for the following
/// layer without consuming this one.
#[derive(Copy, Clone, Debug)]
pub struct FilterCtx<'a> {
    pub(crate) io: &'a IoRef,
    pub(crate) stack: &'a Stack,
    pub(crate) idx: usize,
}
276
277impl<'a> FilterCtx<'a> {
278    pub(crate) fn new(io: &'a IoRef, stack: &'a Stack) -> Self {
279        Self { io, stack, idx: 0 }
280    }
281
282    #[inline]
283    pub fn tag(&self) -> &'static str {
284        self.io.tag()
285    }
286
287    #[inline]
288    pub fn next(&self) -> Self {
289        Self {
290            io: self.io,
291            stack: self.stack,
292            idx: self.idx + 1,
293        }
294    }
295
296    #[inline]
297    pub fn read_buf<F, R>(&self, nbytes: usize, f: F) -> R
298    where
299        F: FnOnce(&ReadBuf<'_>) -> R,
300    {
301        self.stack.get_buffers(self.idx, |curr, next| {
302            let buf = ReadBuf {
303                nbytes,
304                curr,
305                next,
306                io: self.io,
307                need_write: Cell::new(false),
308            };
309            f(&buf)
310        })
311    }
312
313    #[inline]
314    pub fn write_buf<F, R>(&self, f: F) -> R
315    where
316        F: FnOnce(&WriteBuf<'_>) -> R,
317    {
318        self.stack.get_buffers(self.idx, |curr, next| {
319            let buf = WriteBuf {
320                curr,
321                next,
322                io: self.io,
323                need_write: Cell::new(false),
324            };
325            f(&buf)
326        })
327    }
328}
329
/// View over the read buffers of one filter layer.
///
/// `next` is the source buffer (operated on by `take_src`/`set_src`/
/// `with_src`), `curr` the destination buffer (`take_dst`/`set_dst`/
/// `with_dst`). `nbytes` is the count of newly added bytes; `need_write`
/// is shared with `WriteBuf` views created via `with_write_buf`.
#[derive(Debug)]
pub struct ReadBuf<'a> {
    pub(crate) io: &'a IoRef,
    pub(crate) curr: &'a Buffer,
    pub(crate) next: &'a Buffer,
    pub(crate) nbytes: usize,
    pub(crate) need_write: Cell<bool>,
}
338
impl ReadBuf<'_> {
    #[inline]
    /// Get io tag
    pub fn tag(&self) -> &'static str {
        self.io.tag()
    }

    #[inline]
    /// Get number of newly added bytes
    pub fn nbytes(&self) -> usize {
        self.nbytes
    }

    #[inline]
    /// Initiate graceful io stream shutdown
    pub fn want_shutdown(&self) {
        self.io.want_shutdown()
    }

    #[inline]
    /// Make sure buffer has enough free space
    pub fn resize_buf(&self, buf: &mut BytesVec) {
        self.io.memory_pool().resize_read_buf(buf);
    }

    #[inline]
    /// Get reference to source read buffer
    ///
    /// The buffer is taken out of its cell for the duration of `f`; a
    /// non-empty buffer is then put back, an empty one is released to the
    /// memory pool. Leaving `None` in the option drops the buffer.
    pub fn with_src<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&mut Option<BytesVec>) -> R,
    {
        let mut buf = self.next.read.take();
        let result = f(&mut buf);

        if let Some(b) = buf {
            if b.is_empty() {
                self.io.memory_pool().release_read_buf(b);
            } else {
                self.next.read.set(Some(b));
            }
        }
        result
    }

    #[inline]
    /// Get reference to destination read buffer
    ///
    /// Allocates a buffer from the memory pool when none is stored; an
    /// empty buffer is released back to the pool after `f` returns.
    pub fn with_dst<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&mut BytesVec) -> R,
    {
        let mut rb = self
            .curr
            .read
            .take()
            .unwrap_or_else(|| self.io.memory_pool().get_read_buf());

        let result = f(&mut rb);
        if rb.is_empty() {
            self.io.memory_pool().release_read_buf(rb);
        } else {
            self.curr.read.set(Some(rb));
        }
        result
    }

    #[inline]
    /// Take source read buffer
    ///
    /// Returns `None` for a missing or empty buffer (an empty one is
    /// released to the memory pool).
    pub fn take_src(&self) -> Option<BytesVec> {
        self.next.read.take().and_then(|b| {
            if b.is_empty() {
                self.io.memory_pool().release_read_buf(b);
                None
            } else {
                Some(b)
            }
        })
    }

    #[inline]
    /// Set source read buffer
    ///
    /// If a buffer is already stored, `src` is appended to it and `src`'s
    /// storage is released; empty input is released immediately.
    pub fn set_src(&self, src: Option<BytesVec>) {
        if let Some(src) = src {
            if src.is_empty() {
                self.io.memory_pool().release_read_buf(src);
            } else if let Some(mut buf) = self.next.read.take() {
                buf.extend_from_slice(&src);
                self.next.read.set(Some(buf));
                self.io.memory_pool().release_read_buf(src);
            } else {
                self.next.read.set(Some(src));
            }
        }
    }

    #[inline]
    /// Take destination read buffer
    ///
    /// Allocates a fresh buffer from the memory pool when none is stored.
    pub fn take_dst(&self) -> BytesVec {
        self.curr
            .read
            .take()
            .unwrap_or_else(|| self.io.memory_pool().get_read_buf())
    }

    #[inline]
    /// Set destination read buffer
    ///
    /// Same merge semantics as `set_src`, applied to the destination cell.
    pub fn set_dst(&self, dst: Option<BytesVec>) {
        if let Some(dst) = dst {
            if dst.is_empty() {
                self.io.memory_pool().release_read_buf(dst);
            } else if let Some(mut buf) = self.curr.read.take() {
                buf.extend_from_slice(&dst);
                self.curr.read.set(Some(buf));
                self.io.memory_pool().release_read_buf(dst);
            } else {
                self.curr.read.set(Some(dst));
            }
        }
    }

    #[inline]
    /// Run `f` with a `WriteBuf` view over the same layer's buffers,
    /// propagating the `need_write` flag both ways.
    pub fn with_write_buf<'b, F, R>(&'b self, f: F) -> R
    where
        F: FnOnce(&WriteBuf<'b>) -> R,
    {
        let mut buf = WriteBuf {
            io: self.io,
            curr: self.curr,
            next: self.next,
            need_write: Cell::new(self.need_write.get()),
        };
        let result = f(&mut buf);
        // copy back any need-write signal raised inside `f`
        self.need_write.set(buf.need_write.get());
        result
    }
}
474
/// View over the write buffers of one filter layer.
///
/// For writes the direction is reversed relative to `ReadBuf`: `curr` is
/// the source buffer (`take_src`/`set_src`/`with_src`) and `next` the
/// destination buffer (`take_dst`/`set_dst`/`with_dst`). `need_write` is
/// set whenever data is added to the destination buffer.
#[derive(Debug)]
pub struct WriteBuf<'a> {
    pub(crate) io: &'a IoRef,
    pub(crate) curr: &'a Buffer,
    pub(crate) next: &'a Buffer,
    pub(crate) need_write: Cell<bool>,
}
482
impl WriteBuf<'_> {
    #[inline]
    /// Get io tag
    pub fn tag(&self) -> &'static str {
        self.io.tag()
    }

    #[inline]
    /// Initiate graceful io stream shutdown
    pub fn want_shutdown(&self) {
        self.io.want_shutdown()
    }

    #[inline]
    /// Make sure buffer has enough free space
    pub fn resize_buf(&self, buf: &mut BytesVec) {
        self.io.memory_pool().resize_write_buf(buf);
    }

    #[inline]
    /// Get reference to source write buffer
    ///
    /// The buffer is taken out of its cell for the duration of `f`; a
    /// non-empty buffer is then put back, an empty one is released to the
    /// memory pool.
    pub fn with_src<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&mut Option<BytesVec>) -> R,
    {
        let mut wb = self.curr.write.take();
        let result = f(&mut wb);
        if let Some(b) = wb {
            if b.is_empty() {
                self.io.memory_pool().release_write_buf(b);
            } else {
                self.curr.write.set(Some(b));
            }
        }
        result
    }

    #[inline]
    /// Get reference to destination write buffer
    ///
    /// Allocates a buffer from the memory pool when none is stored. If
    /// `f` changed the buffer's length, `need_write` is raised so the
    /// caller knows new data must be flushed.
    pub fn with_dst<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&mut BytesVec) -> R,
    {
        let mut wb = self
            .next
            .write
            .take()
            .unwrap_or_else(|| self.io.memory_pool().get_write_buf());

        // remember the length before `f` to detect added data
        let total = wb.len();
        let result = f(&mut wb);

        if wb.is_empty() {
            self.io.memory_pool().release_write_buf(wb);
        } else {
            self.need_write
                .set(self.need_write.get() | (total != wb.len()));
            self.next.write.set(Some(wb));
        }
        result
    }

    #[inline]
    /// Take source write buffer
    ///
    /// Returns `None` for a missing or empty buffer (an empty one is
    /// released to the memory pool).
    pub fn take_src(&self) -> Option<BytesVec> {
        self.curr.write.take().and_then(|b| {
            if b.is_empty() {
                self.io.memory_pool().release_write_buf(b);
                None
            } else {
                Some(b)
            }
        })
    }

    #[inline]
    /// Set source write buffer
    ///
    /// If a buffer is already stored, `src` is appended to it and `src`'s
    /// storage is released; empty input is released immediately.
    pub fn set_src(&self, src: Option<BytesVec>) {
        if let Some(src) = src {
            if src.is_empty() {
                self.io.memory_pool().release_write_buf(src);
            } else if let Some(mut buf) = self.curr.write.take() {
                buf.extend_from_slice(&src);
                self.curr.write.set(Some(buf));
                self.io.memory_pool().release_write_buf(src);
            } else {
                self.curr.write.set(Some(src));
            }
        }
    }

    #[inline]
    /// Take destination write buffer
    ///
    /// Allocates a fresh buffer from the memory pool when none is stored.
    pub fn take_dst(&self) -> BytesVec {
        self.next
            .write
            .take()
            .unwrap_or_else(|| self.io.memory_pool().get_write_buf())
    }

    #[inline]
    /// Set destination write buffer
    ///
    /// Storing non-empty data raises `need_write`; same merge semantics
    /// as `set_src`, applied to the destination cell.
    pub fn set_dst(&self, dst: Option<BytesVec>) {
        if let Some(dst) = dst {
            if dst.is_empty() {
                self.io.memory_pool().release_write_buf(dst);
            } else {
                self.need_write.set(true);

                if let Some(mut buf) = self.next.write.take() {
                    buf.extend_from_slice(&dst);
                    self.next.write.set(Some(buf));
                    self.io.memory_pool().release_write_buf(dst);
                } else {
                    self.next.write.set(Some(dst));
                }
            }
        }
    }

    #[inline]
    /// Run `f` with a `ReadBuf` view over the same layer's buffers
    /// (`nbytes` reported as 0), propagating `need_write` both ways.
    pub fn with_read_buf<'b, F, R>(&'b self, f: F) -> R
    where
        F: FnOnce(&ReadBuf<'b>) -> R,
    {
        let mut buf = ReadBuf {
            io: self.io,
            curr: self.curr,
            next: self.next,
            nbytes: 0,
            need_write: Cell::new(self.need_write.get()),
        };
        let result = f(&mut buf);
        // copy back any need-write signal raised inside `f`
        self.need_write.set(buf.need_write.get());
        result
    }
}