ntex_io/
buf.rs
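
//! Layered read and write buffer storage used by the io filter chain.
//!
//! `Stack` keeps one `Buffer` (a read and a write `BytesVec`) per filter
//! layer: layer `0` holds the read destination and the write source, the
//! last layer holds the read source and the write destination. `FilterCtx`,
//! `ReadBuf` and `WriteBuf` expose the buffers of a single layer together
//! with those of the next layer.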

use std::{cell::Cell, fmt};

use ntex_bytes::{BytesVec, PoolRef};
use ntex_util::future::Either;

use crate::IoRef;

/// Read and write buffers that belong to a single layer of the stack
#[derive(Default)]
pub(crate) struct Buffer {
    read: Cell<Option<BytesVec>>,
    write: Cell<Option<BytesVec>>,
}

impl fmt::Debug for Buffer {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // take the buffers out of the cells for formatting, then put them back
        let b0 = self.read.take();
        let b1 = self.write.take();
        let res = f
            .debug_struct("Buffer")
            .field("read", &b0)
            .field("write", &b1)
            .finish();
        self.read.set(b0);
        self.write.set(b1);
        res
    }
}

// number of layers stored inline before the stack spills into a `Vec`
const INLINE_SIZE: usize = 3;

/// Per-layer buffer storage for an io stream.
///
/// Layer `0` holds the read destination and the write source buffers,
/// the last layer holds the read source and the write destination buffers.
#[derive(Debug)]
pub(crate) struct Stack {
    /// number of active layers
    len: usize,
    /// layers are stored inline until more than `INLINE_SIZE` are needed
    buffers: Either<[Buffer; INLINE_SIZE], Vec<Buffer>>,
}

impl Stack {
    pub(crate) fn new() -> Self {
        Self {
            len: 1,
            buffers: Either::Left(Default::default()),
        }
    }

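    // Layer layout around `add_layer`, e.g. going from two layers to three:
    //
    //   before: [l0, l1]            len = 2
    //   after:  [new, l0, l1]       len = 3
    //
    // The new (empty) layer always takes index 0; existing layers keep
    // their relative order and move toward the io-facing end.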
    pub(crate) fn add_layer(&mut self) {
        match &mut self.buffers {
            Either::Left(b) => {
                if self.len == INLINE_SIZE {
                    // inline storage is full, move to a vec with the new
                    // (empty) layer at index 0
                    let mut vec = vec![Buffer {
                        read: Cell::new(None),
                        write: Cell::new(None),
                    }];
                    for item in b.iter_mut().take(self.len) {
                        vec.push(Buffer {
                            read: Cell::new(item.read.take()),
                            write: Cell::new(item.write.take()),
                        });
                    }
                    self.len += 1;
                    self.buffers = Either::Right(vec);
                } else {
                    // shift existing layers toward the end of the array
                    // and put the new (empty) layer at index 0
                    let mut idx = self.len;
                    while idx > 0 {
                        let item = Buffer {
                            read: Cell::new(b[idx - 1].read.take()),
                            write: Cell::new(b[idx - 1].write.take()),
                        };
                        b[idx] = item;
                        idx -= 1;
                    }
                    b[0] = Buffer {
                        read: Cell::new(None),
                        write: Cell::new(None),
                    };
                    self.len += 1;
                }
            }
            Either::Right(vec) => {
                self.len += 1;
                vec.insert(
                    0,
                    Buffer {
                        read: Cell::new(None),
                        write: Cell::new(None),
                    },
                );
            }
        }
    }

    // Call `f` with the buffers of layer `idx` and of the next layer;
    // the last layer is paired with an empty stand-in buffer.
    fn get_buffers<F, R>(&self, idx: usize, f: F) -> R
    where
        F: FnOnce(&Buffer, &Buffer) -> R,
    {
        let buffers = match self.buffers {
            Either::Left(ref b) => &b[..],
            Either::Right(ref b) => &b[..],
        };

        let next = idx + 1;
        if self.len > next {
            f(&buffers[idx], &buffers[next])
        } else {
            f(&buffers[idx], &Buffer::default())
        }
    }

    fn get_first_level(&self) -> &Buffer {
        match &self.buffers {
            Either::Left(b) => &b[0],
            Either::Right(b) => &b[0],
        }
    }

    fn get_last_level(&self) -> &Buffer {
        match &self.buffers {
            Either::Left(b) => &b[self.len - 1],
            Either::Right(b) => &b[self.len - 1],
        }
    }

    pub(crate) fn get_read_source(&self) -> Option<BytesVec> {
        self.get_last_level().read.take()
    }

    pub(crate) fn set_read_source(&self, io: &IoRef, buf: BytesVec) {
        if buf.is_empty() {
            io.memory_pool().release_read_buf(buf);
        } else {
            self.get_last_level().read.set(Some(buf));
        }
    }

    pub(crate) fn with_read_destination<F, R>(&self, io: &IoRef, f: F) -> R
    where
        F: FnOnce(&mut BytesVec) -> R,
    {
        let item = self.get_first_level();
        let mut rb = item
            .read
            .take()
            .unwrap_or_else(|| io.memory_pool().get_read_buf());

        let result = f(&mut rb);

        // check nested updates
        if item.read.take().is_some() {
            log::error!("Nested read io operation is detected");
            io.force_close();
        }

        if rb.is_empty() {
            io.memory_pool().release_read_buf(rb);
        } else {
            item.read.set(Some(rb));
        }
        result
    }

    pub(crate) fn get_write_destination(&self) -> Option<BytesVec> {
        self.get_last_level().write.take()
    }

    // Set the write destination buffer; if one is already set,
    // the passed buffer is returned back to the caller.
    pub(crate) fn set_write_destination(&self, buf: BytesVec) -> Option<BytesVec> {
        let b = self.get_last_level().write.take();
        if b.is_some() {
            self.get_last_level().write.set(b);
            Some(buf)
        } else {
            self.get_last_level().write.set(Some(buf));
            None
        }
    }

    pub(crate) fn with_write_source<F, R>(&self, io: &IoRef, f: F) -> R
    where
        F: FnOnce(&mut BytesVec) -> R,
    {
        let item = self.get_first_level();
        let mut wb = item
            .write
            .take()
            .unwrap_or_else(|| io.memory_pool().get_write_buf());

        let result = f(&mut wb);
        if wb.is_empty() {
            io.memory_pool().release_write_buf(wb);
        } else {
            item.write.set(Some(wb));
        }
        result
    }

    pub(crate) fn with_write_destination<F, R>(&self, io: &IoRef, f: F) -> R
    where
        F: FnOnce(Option<&mut BytesVec>) -> R,
    {
        let item = self.get_last_level();
        let mut wb = item.write.take();

        let result = f(wb.as_mut());

        // check nested updates
        if item.write.take().is_some() {
            log::error!("Nested write io operation is detected");
            io.force_close();
        }

        if let Some(b) = wb {
            if b.is_empty() {
                io.memory_pool().release_write_buf(b);
            } else {
                item.write.set(Some(b));
            }
        }
        result
    }

    pub(crate) fn read_destination_size(&self) -> usize {
        let item = self.get_first_level();
        let rb = item.read.take();
        let size = rb.as_ref().map(|b| b.len()).unwrap_or(0);
        item.read.set(rb);
        size
    }

    pub(crate) fn write_destination_size(&self) -> usize {
        let item = self.get_last_level();
        let wb = item.write.take();
        let size = wb.as_ref().map(|b| b.len()).unwrap_or(0);
        item.write.set(wb);
        size
    }

    // Release all buffers back to the memory pool.
    pub(crate) fn release(&self, pool: PoolRef) {
        let items = match &self.buffers {
            Either::Left(b) => &b[..],
            Either::Right(b) => &b[..],
        };

        for item in items {
            if let Some(buf) = item.read.take() {
                pool.release_read_buf(buf);
            }
            if let Some(buf) = item.write.take() {
                pool.release_write_buf(buf);
            }
        }
    }

    // Move all buffers into a different memory pool.
    pub(crate) fn set_memory_pool(&self, pool: PoolRef) {
        let items = match &self.buffers {
            Either::Left(b) => &b[..],
            Either::Right(b) => &b[..],
        };
        for item in items {
            if let Some(mut b) = item.read.take() {
                pool.move_vec_in(&mut b);
                item.read.set(Some(b));
            }
            if let Some(mut b) = item.write.take() {
                pool.move_vec_in(&mut b);
                item.write.set(Some(b));
            }
        }
    }
}

/// Gives a filter layer access to its read and write buffers
#[derive(Copy, Clone, Debug)]
pub struct FilterCtx<'a> {
    pub(crate) io: &'a IoRef,
    pub(crate) stack: &'a Stack,
    pub(crate) idx: usize,
}

impl<'a> FilterCtx<'a> {
    pub(crate) fn new(io: &'a IoRef, stack: &'a Stack) -> Self {
        Self { io, stack, idx: 0 }
    }

    /// Get context for the next buffer layer
    pub fn next(&self) -> Self {
        Self {
            io: self.io,
            stack: self.stack,
            idx: self.idx + 1,
        }
    }

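    /// Get access to the read buffers of this layer
    ///
    /// A minimal usage sketch (`ctx` is assumed to be a `FilterCtx` and
    /// `nbytes` the number of newly read bytes; a real filter would decode
    /// or transform the data instead of copying it verbatim):
    ///
    /// ```ignore
    /// ctx.read_buf(nbytes, |buf| {
    ///     if let Some(src) = buf.take_src() {
    ///         buf.with_dst(|dst| dst.extend_from_slice(&src));
    ///     }
    /// });
    /// ```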
    pub fn read_buf<F, R>(&self, nbytes: usize, f: F) -> R
    where
        F: FnOnce(&ReadBuf<'_>) -> R,
    {
        self.stack.get_buffers(self.idx, |curr, next| {
            let buf = ReadBuf {
                nbytes,
                curr,
                next,
                io: self.io,
                need_write: Cell::new(false),
            };
            f(&buf)
        })
    }

    /// Get access to the write buffers of this layer
    pub fn write_buf<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&WriteBuf<'_>) -> R,
    {
        self.stack.get_buffers(self.idx, |curr, next| {
            let buf = WriteBuf {
                curr,
                next,
                io: self.io,
                need_write: Cell::new(false),
            };
            f(&buf)
        })
    }
}

#[derive(Debug)]
pub struct ReadBuf<'a> {
    pub(crate) io: &'a IoRef,
    pub(crate) curr: &'a Buffer,
    pub(crate) next: &'a Buffer,
    pub(crate) nbytes: usize,
    pub(crate) need_write: Cell<bool>,
}

impl ReadBuf<'_> {
    #[inline]
    /// Get io tag
    pub fn tag(&self) -> &'static str {
        self.io.tag()
    }

    #[inline]
    /// Get number of newly added bytes
    pub fn nbytes(&self) -> usize {
        self.nbytes
    }

    #[inline]
    /// Initiate graceful io stream shutdown
    pub fn want_shutdown(&self) {
        self.io.want_shutdown()
    }

    #[inline]
    /// Make sure buffer has enough free space
    pub fn resize_buf(&self, buf: &mut BytesVec) {
        self.io.memory_pool().resize_read_buf(buf);
    }

    #[inline]
    /// Get reference to source read buffer
    pub fn with_src<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&mut Option<BytesVec>) -> R,
    {
        let mut buf = self.next.read.take();
        let result = f(&mut buf);

        if let Some(b) = buf {
            if b.is_empty() {
                self.io.memory_pool().release_read_buf(b);
            } else {
                self.next.read.set(Some(b));
            }
        }
        result
    }

    #[inline]
    /// Get reference to destination read buffer
    pub fn with_dst<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&mut BytesVec) -> R,
    {
        let mut rb = self
            .curr
            .read
            .take()
            .unwrap_or_else(|| self.io.memory_pool().get_read_buf());

        let result = f(&mut rb);
        if rb.is_empty() {
            self.io.memory_pool().release_read_buf(rb);
        } else {
            self.curr.read.set(Some(rb));
        }
        result
    }

    #[inline]
    /// Take source read buffer
    pub fn take_src(&self) -> Option<BytesVec> {
        self.next.read.take().and_then(|b| {
            if b.is_empty() {
                self.io.memory_pool().release_read_buf(b);
                None
            } else {
                Some(b)
            }
        })
    }

    #[inline]
    /// Set source read buffer
    pub fn set_src(&self, src: Option<BytesVec>) {
        if let Some(src) = src {
            if src.is_empty() {
                self.io.memory_pool().release_read_buf(src);
            } else if let Some(mut buf) = self.next.read.take() {
                buf.extend_from_slice(&src);
                self.next.read.set(Some(buf));
                self.io.memory_pool().release_read_buf(src);
            } else {
                self.next.read.set(Some(src));
            }
        }
    }

    #[inline]
    /// Take destination read buffer
    pub fn take_dst(&self) -> BytesVec {
        self.curr
            .read
            .take()
            .unwrap_or_else(|| self.io.memory_pool().get_read_buf())
    }

    #[inline]
    /// Set destination read buffer
    pub fn set_dst(&self, dst: Option<BytesVec>) {
        if let Some(dst) = dst {
            if dst.is_empty() {
                self.io.memory_pool().release_read_buf(dst);
            } else if let Some(mut buf) = self.curr.read.take() {
                buf.extend_from_slice(&dst);
                self.curr.read.set(Some(buf));
                self.io.memory_pool().release_read_buf(dst);
            } else {
                self.curr.read.set(Some(dst));
            }
        }
    }

    #[inline]
    /// Get access to write buffers of the same layer
    pub fn with_write_buf<'b, F, R>(&'b self, f: F) -> R
    where
        F: FnOnce(&WriteBuf<'b>) -> R,
    {
        let mut buf = WriteBuf {
            io: self.io,
            curr: self.curr,
            next: self.next,
            need_write: Cell::new(self.need_write.get()),
        };
        let result = f(&mut buf);
        self.need_write.set(buf.need_write.get());
        result
    }
}

#[derive(Debug)]
pub struct WriteBuf<'a> {
    pub(crate) io: &'a IoRef,
    pub(crate) curr: &'a Buffer,
    pub(crate) next: &'a Buffer,
    pub(crate) need_write: Cell<bool>,
}

impl WriteBuf<'_> {
    #[inline]
    /// Get io tag
    pub fn tag(&self) -> &'static str {
        self.io.tag()
    }

    #[inline]
    /// Initiate graceful io stream shutdown
    pub fn want_shutdown(&self) {
        self.io.want_shutdown()
    }

    #[inline]
    /// Make sure buffer has enough free space
    pub fn resize_buf(&self, buf: &mut BytesVec) {
        self.io.memory_pool().resize_write_buf(buf);
    }

    #[inline]
    /// Get reference to source write buffer
    pub fn with_src<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&mut Option<BytesVec>) -> R,
    {
        let mut wb = self.curr.write.take();
        let result = f(&mut wb);
        if let Some(b) = wb {
            if b.is_empty() {
                self.io.memory_pool().release_write_buf(b);
            } else {
                self.curr.write.set(Some(b));
            }
        }
        result
    }

    #[inline]
    /// Get reference to destination write buffer
    pub fn with_dst<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&mut BytesVec) -> R,
    {
        let mut wb = self
            .next
            .write
            .take()
            .unwrap_or_else(|| self.io.memory_pool().get_write_buf());

        let total = wb.len();
        let result = f(&mut wb);

        if wb.is_empty() {
            self.io.memory_pool().release_write_buf(wb);
        } else {
            self.need_write
                .set(self.need_write.get() | (total != wb.len()));
            self.next.write.set(Some(wb));
        }
        result
    }

    #[inline]
    /// Take source write buffer
    pub fn take_src(&self) -> Option<BytesVec> {
        self.curr.write.take().and_then(|b| {
            if b.is_empty() {
                self.io.memory_pool().release_write_buf(b);
                None
            } else {
                Some(b)
            }
        })
    }

    #[inline]
    /// Set source write buffer
    pub fn set_src(&self, src: Option<BytesVec>) {
        if let Some(src) = src {
            if src.is_empty() {
                self.io.memory_pool().release_write_buf(src);
            } else if let Some(mut buf) = self.curr.write.take() {
                buf.extend_from_slice(&src);
                self.curr.write.set(Some(buf));
                self.io.memory_pool().release_write_buf(src);
            } else {
                self.curr.write.set(Some(src));
            }
        }
    }

    #[inline]
    /// Take destination write buffer
    pub fn take_dst(&self) -> BytesVec {
        self.next
            .write
            .take()
            .unwrap_or_else(|| self.io.memory_pool().get_write_buf())
    }

    #[inline]
    /// Set destination write buffer
    pub fn set_dst(&self, dst: Option<BytesVec>) {
        if let Some(dst) = dst {
            if dst.is_empty() {
                self.io.memory_pool().release_write_buf(dst);
            } else {
                self.need_write.set(true);

                if let Some(mut buf) = self.next.write.take() {
                    buf.extend_from_slice(&dst);
                    self.next.write.set(Some(buf));
                    self.io.memory_pool().release_write_buf(dst);
                } else {
                    self.next.write.set(Some(dst));
                }
            }
        }
    }

    #[inline]
    /// Get access to read buffers of the same layer
    pub fn with_read_buf<'b, F, R>(&'b self, f: F) -> R
    where
        F: FnOnce(&ReadBuf<'b>) -> R,
    {
        let mut buf = ReadBuf {
            io: self.io,
            curr: self.curr,
            next: self.next,
            nbytes: 0,
            need_write: Cell::new(self.need_write.get()),
        };
        let result = f(&mut buf);
        self.need_write.set(buf.need_write.get());
        result
    }
}
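
// A minimal sanity-check sketch for `Stack` layering; it relies only on
// items defined in this module and covers the switch from inline storage
// to a `Vec` in `add_layer`.
#[cfg(test)]
mod stack_tests {
    use super::*;

    #[test]
    fn add_layer_grows_the_stack() {
        let mut stack = Stack::new();
        assert_eq!(stack.len, 1);
        assert!(matches!(stack.buffers, Either::Left(_)));
        assert!(stack.get_read_source().is_none());
        assert!(stack.get_write_destination().is_none());

        // fill the inline storage, then force the switch to a Vec
        for _ in 0..INLINE_SIZE {
            stack.add_layer();
        }
        assert_eq!(stack.len, INLINE_SIZE + 1);
        assert!(matches!(stack.buffers, Either::Right(_)));

        // all layers start out empty
        assert_eq!(stack.read_destination_size(), 0);
        assert_eq!(stack.write_destination_size(), 0);
    }
}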