// ntex_io/buf.rs
1use std::{cell::Cell, fmt};
2
3use ntex_bytes::BytesMut;
4use ntex_util::future::Either;
5
6use crate::{IoRef, cfg::BufConfig};
7
/// One layer of the filter buffer stack.
///
/// `Cell` provides single-threaded interior mutability: buffers are
/// temporarily `take`n out and `set` back through a shared reference.
#[derive(Default)]
pub(crate) struct Buffer {
    // read-side buffer for this layer (None when no data is buffered)
    read: Cell<Option<BytesMut>>,
    // write-side buffer for this layer (None when no data is buffered)
    write: Cell<Option<BytesMut>>,
}
13
14impl fmt::Debug for Buffer {
15    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
16        let b0 = self.read.take();
17        let b1 = self.write.take();
18        let res = f
19            .debug_struct("Buffer")
20            .field("read", &b0)
21            .field("write", &b1)
22            .finish();
23        self.read.set(b0);
24        self.write.set(b1);
25        res
26    }
27}
28
/// Number of buffer layers stored inline before spilling to a `Vec`.
const INLINE_SIZE: usize = 3;

/// Stack of per-layer buffers, one `Buffer` per filter layer.
///
/// Uses a fixed inline array for up to `INLINE_SIZE` layers and switches
/// to heap storage (`Vec`) once more layers are added.
#[derive(Debug)]
pub(crate) struct Stack {
    // number of active layers; the inline array may contain unused slots
    len: usize,
    buffers: Either<[Buffer; INLINE_SIZE], Vec<Buffer>>,
}
36
impl Stack {
    /// Creates a stack with a single, empty buffer layer stored inline.
    pub(crate) fn new() -> Self {
        Self {
            len: 1,
            buffers: Either::Left(Default::default()),
        }
    }

    /// Inserts a new, empty layer at index 0, shifting existing layers up.
    pub(crate) fn add_layer(&mut self) {
        match &mut self.buffers {
            Either::Left(b) => {
                // move to vec
                if self.len == INLINE_SIZE {
                    // inline storage is full: spill to a Vec with the new
                    // empty layer first, then the existing layers in order
                    let mut vec = vec![Buffer {
                        read: Cell::new(None),
                        write: Cell::new(None),
                    }];
                    for item in b.iter_mut().take(self.len) {
                        vec.push(Buffer {
                            read: Cell::new(item.read.take()),
                            write: Cell::new(item.write.take()),
                        });
                    }
                    self.len += 1;
                    self.buffers = Either::Right(vec);
                } else {
                    // shift layers one slot up, starting from the back so
                    // nothing is overwritten before it is moved
                    let mut idx = self.len;
                    while idx > 0 {
                        let item = Buffer {
                            read: Cell::new(b[idx - 1].read.take()),
                            write: Cell::new(b[idx - 1].write.take()),
                        };
                        b[idx] = item;
                        idx -= 1;
                    }
                    // slot 0 becomes the new, empty layer
                    b[0] = Buffer {
                        read: Cell::new(None),
                        write: Cell::new(None),
                    };
                    self.len += 1;
                }
            }
            Either::Right(vec) => {
                self.len += 1;
                vec.insert(
                    0,
                    Buffer {
                        read: Cell::new(None),
                        write: Cell::new(None),
                    },
                );
            }
        }
    }

    /// Calls `f` with the buffer at `idx` and the one following it.
    ///
    /// When `idx` is the last layer there is no following buffer; a
    /// temporary empty `Buffer` is passed instead, so anything stored
    /// into it is discarded.
    fn get_buffers<F, R>(&self, idx: usize, f: F) -> R
    where
        F: FnOnce(&Buffer, &Buffer) -> R,
    {
        let buffers = match self.buffers {
            Either::Left(ref b) => &b[..],
            Either::Right(ref b) => &b[..],
        };

        let next = idx + 1;
        if self.len > next {
            f(&buffers[idx], &buffers[next])
        } else {
            f(&buffers[idx], &Buffer::default())
        }
    }

    /// Returns the layer at index 0 (the most recently added layer).
    fn get_first_level(&self) -> &Buffer {
        match &self.buffers {
            Either::Left(b) => &b[0],
            Either::Right(b) => &b[0],
        }
    }

    /// Returns the layer at index `len - 1`.
    fn get_last_level(&self) -> &Buffer {
        match &self.buffers {
            Either::Left(b) => &b[self.len - 1],
            Either::Right(b) => &b[self.len - 1],
        }
    }

    /// Takes the read buffer of the last layer, if any.
    pub(crate) fn get_read_source(&self) -> Option<BytesMut> {
        self.get_last_level().read.take()
    }

    /// Stores `buf` as the last layer's read buffer; an empty buffer is
    /// released via the read-buffer config instead of being stored.
    pub(crate) fn set_read_source(&self, io: &IoRef, buf: BytesMut) {
        if buf.is_empty() {
            io.cfg().read_buf().release(buf);
        } else {
            self.get_last_level().read.set(Some(buf));
        }
    }

    /// Runs `f` with the first layer's read buffer, obtaining a fresh one
    /// from `io.cfg().read_buf()` when absent, and stores it back after.
    ///
    /// If the cell was re-populated while `f` ran, a nested read
    /// operation occurred; this is logged and the io is force-closed.
    pub(crate) fn with_read_destination<F, R>(&self, io: &IoRef, f: F) -> R
    where
        F: FnOnce(&mut BytesMut) -> R,
    {
        let item = self.get_first_level();
        let mut rb = item
            .read
            .take()
            .unwrap_or_else(|| io.cfg().read_buf().get());

        let result = f(&mut rb);

        // check nested updates
        if item.read.take().is_some() {
            log::error!("Nested read io operation is detected");
            io.force_close();
        }

        if rb.is_empty() {
            io.cfg().read_buf().release(rb);
        } else {
            item.read.set(Some(rb));
        }
        result
    }

    /// Takes the write buffer of the last layer, if any.
    pub(crate) fn get_write_destination(&self) -> Option<BytesMut> {
        self.get_last_level().write.take()
    }

    /// Stores `buf` as the last layer's write buffer.
    ///
    /// If a write buffer is already present it is kept in place and `buf`
    /// is handed back as `Some(buf)`; otherwise `buf` is stored and
    /// `None` is returned.
    pub(crate) fn set_write_destination(&self, buf: BytesMut) -> Option<BytesMut> {
        let b = self.get_last_level().write.take();
        if b.is_some() {
            self.get_last_level().write.set(b);
            Some(buf)
        } else {
            self.get_last_level().write.set(Some(buf));
            None
        }
    }

    /// Runs `f` with the first layer's write buffer, obtaining a fresh
    /// one from `io.cfg().write_buf()` when absent; an emptied buffer is
    /// released instead of being stored back.
    pub(crate) fn with_write_source<F, R>(&self, io: &IoRef, f: F) -> R
    where
        F: FnOnce(&mut BytesMut) -> R,
    {
        let item = self.get_first_level();
        let mut wb = item
            .write
            .take()
            .unwrap_or_else(|| io.cfg().write_buf().get());

        let result = f(&mut wb);
        if wb.is_empty() {
            io.cfg().write_buf().release(wb);
        } else {
            item.write.set(Some(wb));
        }
        result
    }

    /// Runs `f` with the last layer's write buffer, if present, and
    /// stores it back after. As in `with_read_destination`, re-population
    /// of the cell during `f` is treated as a nested write operation:
    /// it is logged and the io is force-closed.
    pub(crate) fn with_write_destination<F, R>(&self, io: &IoRef, f: F) -> R
    where
        F: FnOnce(Option<&mut BytesMut>) -> R,
    {
        let item = self.get_last_level();
        let mut wb = item.write.take();

        let result = f(wb.as_mut());

        // check nested updates
        if item.write.take().is_some() {
            log::error!("Nested write io operation is detected");
            io.force_close();
        }

        if let Some(b) = wb {
            if b.is_empty() {
                io.cfg().write_buf().release(b);
            } else {
                item.write.set(Some(b));
            }
        }
        result
    }

    /// Number of bytes currently in the first layer's read buffer
    /// (0 when no buffer is present).
    pub(crate) fn read_destination_size(&self) -> usize {
        let item = self.get_first_level();
        let rb = item.read.take();
        let size = rb.as_ref().map(|b| b.len()).unwrap_or(0);
        item.read.set(rb);
        size
    }

    /// Number of bytes currently in the last layer's write buffer
    /// (0 when no buffer is present).
    pub(crate) fn write_destination_size(&self) -> usize {
        let item = self.get_last_level();
        let wb = item.write.take();
        let size = wb.as_ref().map(|b| b.len()).unwrap_or(0);
        item.write.set(wb);
        size
    }
}
236
/// Per-filter view of the io and its buffer stack.
#[derive(Copy, Clone, Debug)]
pub struct FilterCtx<'a> {
    pub(crate) io: &'a IoRef,
    pub(crate) stack: &'a Stack,
    // this filter's layer index within the stack
    pub(crate) idx: usize,
}
243
244impl<'a> FilterCtx<'a> {
245    pub(crate) fn new(io: &'a IoRef, stack: &'a Stack) -> Self {
246        Self { io, stack, idx: 0 }
247    }
248
249    #[inline]
250    /// Get io
251    pub fn io(&self) -> &IoRef {
252        self.io
253    }
254
255    #[inline]
256    /// Get io tag
257    pub fn tag(&self) -> &'static str {
258        self.io.tag()
259    }
260
261    #[inline]
262    /// Get filter ctx for next filter in chain
263    pub fn next(&self) -> Self {
264        Self {
265            io: self.io,
266            stack: self.stack,
267            idx: self.idx + 1,
268        }
269    }
270
271    #[inline]
272    /// Get current read buffer
273    pub fn read_buf<F, R>(&self, nbytes: usize, f: F) -> R
274    where
275        F: FnOnce(&ReadBuf<'_>) -> R,
276    {
277        self.stack.get_buffers(self.idx, |curr, next| {
278            let buf = ReadBuf {
279                nbytes,
280                curr,
281                next,
282                io: self.io,
283                need_write: Cell::new(false),
284            };
285            f(&buf)
286        })
287    }
288
289    #[inline]
290    /// Get current write buffer
291    pub fn write_buf<F, R>(&self, f: F) -> R
292    where
293        F: FnOnce(&WriteBuf<'_>) -> R,
294    {
295        self.stack.get_buffers(self.idx, |curr, next| {
296            let buf = WriteBuf {
297                curr,
298                next,
299                io: self.io,
300                need_write: Cell::new(false),
301            };
302            f(&buf)
303        })
304    }
305}
306
/// Read-side buffer pair handed to a filter.
#[derive(Debug)]
pub struct ReadBuf<'a> {
    pub(crate) io: &'a IoRef,
    // destination buffer for this layer (accessed via `with_dst`/`take_dst`)
    pub(crate) curr: &'a Buffer,
    // source buffer from the following layer (accessed via `with_src`/`take_src`)
    pub(crate) next: &'a Buffer,
    // number of newly added bytes, as reported by `nbytes()`
    pub(crate) nbytes: usize,
    // flag propagated back to the caller by `with_write_buf`
    pub(crate) need_write: Cell<bool>,
}
315
316impl ReadBuf<'_> {
317    #[inline]
318    /// Get io tag
319    pub fn tag(&self) -> &'static str {
320        self.io.tag()
321    }
322
323    #[inline]
324    /// Get buffer params
325    pub fn cfg(&self) -> &BufConfig {
326        self.io.cfg().read_buf()
327    }
328
329    #[inline]
330    /// Get number of newly added bytes
331    pub fn nbytes(&self) -> usize {
332        self.nbytes
333    }
334
335    #[inline]
336    /// Initiate graceful io stream shutdown
337    pub fn want_shutdown(&self) {
338        self.io.want_shutdown()
339    }
340
341    #[inline]
342    /// Make sure buffer has enough free space
343    pub fn resize_buf(&self, buf: &mut BytesMut) {
344        self.io.cfg().read_buf().resize(buf);
345    }
346
347    #[inline]
348    /// Get reference to source read buffer
349    pub fn with_src<F, R>(&self, f: F) -> R
350    where
351        F: FnOnce(&mut Option<BytesMut>) -> R,
352    {
353        let mut buf = self.next.read.take();
354        let result = f(&mut buf);
355
356        if let Some(b) = buf {
357            if b.is_empty() {
358                self.io.cfg().read_buf().release(b);
359            } else {
360                self.next.read.set(Some(b));
361            }
362        }
363        result
364    }
365
366    #[inline]
367    /// Get reference to destination read buffer
368    pub fn with_dst<F, R>(&self, f: F) -> R
369    where
370        F: FnOnce(&mut BytesMut) -> R,
371    {
372        let mut rb = self
373            .curr
374            .read
375            .take()
376            .unwrap_or_else(|| self.io.cfg().read_buf().get());
377
378        let result = f(&mut rb);
379        if rb.is_empty() {
380            self.io.cfg().read_buf().release(rb);
381        } else {
382            self.curr.read.set(Some(rb));
383        }
384        result
385    }
386
387    #[inline]
388    /// Take source read buffer
389    pub fn take_src(&self) -> Option<BytesMut> {
390        self.next.read.take().and_then(|b| {
391            if b.is_empty() {
392                self.io.cfg().read_buf().release(b);
393                None
394            } else {
395                Some(b)
396            }
397        })
398    }
399
400    #[inline]
401    /// Set source read buffer
402    pub fn set_src(&self, src: Option<BytesMut>) {
403        if let Some(src) = src {
404            if src.is_empty() {
405                self.io.cfg().read_buf().release(src);
406            } else if let Some(mut buf) = self.next.read.take() {
407                buf.extend_from_slice(&src);
408                self.next.read.set(Some(buf));
409                self.io.cfg().read_buf().release(src);
410            } else {
411                self.next.read.set(Some(src));
412            }
413        }
414    }
415
416    #[inline]
417    /// Take destination read buffer
418    pub fn take_dst(&self) -> BytesMut {
419        self.curr
420            .read
421            .take()
422            .unwrap_or_else(|| self.io.cfg().read_buf().get())
423    }
424
425    #[inline]
426    /// Set destination read buffer
427    pub fn set_dst(&self, dst: Option<BytesMut>) {
428        if let Some(dst) = dst {
429            if dst.is_empty() {
430                self.io.cfg().read_buf().release(dst);
431            } else if let Some(mut buf) = self.curr.read.take() {
432                buf.extend_from_slice(&dst);
433                self.curr.read.set(Some(buf));
434                self.io.cfg().read_buf().release(dst);
435            } else {
436                self.curr.read.set(Some(dst));
437            }
438        }
439    }
440
441    #[inline]
442    pub fn with_write_buf<'b, F, R>(&'b self, f: F) -> R
443    where
444        F: FnOnce(&WriteBuf<'b>) -> R,
445    {
446        let mut buf = WriteBuf {
447            io: self.io,
448            curr: self.curr,
449            next: self.next,
450            need_write: Cell::new(self.need_write.get()),
451        };
452        let result = f(&mut buf);
453        self.need_write.set(buf.need_write.get());
454        result
455    }
456}
457
/// Write-side buffer pair handed to a filter.
#[derive(Debug)]
pub struct WriteBuf<'a> {
    pub(crate) io: &'a IoRef,
    // source buffer for this layer (accessed via `with_src`/`take_src`)
    pub(crate) curr: &'a Buffer,
    // destination buffer for the following layer (accessed via `with_dst`/`take_dst`)
    pub(crate) next: &'a Buffer,
    // set when destination data was added; read back by the caller
    pub(crate) need_write: Cell<bool>,
}
465
466impl WriteBuf<'_> {
467    #[inline]
468    /// Get io tag
469    pub fn tag(&self) -> &'static str {
470        self.io.tag()
471    }
472
473    #[inline]
474    /// Get buffer params
475    pub fn cfg(&self) -> &BufConfig {
476        self.io.cfg().write_buf()
477    }
478
479    #[inline]
480    /// Initiate graceful io stream shutdown
481    pub fn want_shutdown(&self) {
482        self.io.want_shutdown()
483    }
484
485    #[inline]
486    /// Make sure buffer has enough free space
487    pub fn resize_buf(&self, buf: &mut BytesMut) {
488        self.io.cfg().write_buf().resize(buf);
489    }
490
491    #[inline]
492    /// Make sure buffer has enough free space
493    pub fn resize_buf_min(&self, buf: &mut BytesMut, size: usize) {
494        self.io.cfg().write_buf().resize_min(buf, size);
495    }
496
497    #[inline]
498    /// Get reference to source write buffer
499    pub fn with_src<F, R>(&self, f: F) -> R
500    where
501        F: FnOnce(&mut Option<BytesMut>) -> R,
502    {
503        let mut wb = self.curr.write.take();
504        let result = f(&mut wb);
505        if let Some(b) = wb {
506            if b.is_empty() {
507                self.io.cfg().write_buf().release(b);
508            } else {
509                self.curr.write.set(Some(b));
510            }
511        }
512        result
513    }
514
515    #[inline]
516    /// Get reference to destination write buffer
517    pub fn with_dst<F, R>(&self, f: F) -> R
518    where
519        F: FnOnce(&mut BytesMut) -> R,
520    {
521        let mut wb = self
522            .next
523            .write
524            .take()
525            .unwrap_or_else(|| self.io.cfg().write_buf().get());
526
527        let total = wb.len();
528        let result = f(&mut wb);
529
530        if wb.is_empty() {
531            self.io.cfg().write_buf().release(wb);
532        } else {
533            self.need_write
534                .set(self.need_write.get() | (total != wb.len()));
535            self.next.write.set(Some(wb));
536        }
537        result
538    }
539
540    #[inline]
541    /// Take source write buffer
542    pub fn take_src(&self) -> Option<BytesMut> {
543        self.curr.write.take().and_then(|b| {
544            if b.is_empty() {
545                self.io.cfg().write_buf().release(b);
546                None
547            } else {
548                Some(b)
549            }
550        })
551    }
552
553    #[inline]
554    /// Set source write buffer
555    pub fn set_src(&self, src: Option<BytesMut>) {
556        if let Some(src) = src {
557            if src.is_empty() {
558                self.io.cfg().write_buf().release(src);
559            } else if let Some(mut buf) = self.curr.write.take() {
560                buf.extend_from_slice(&src);
561                self.curr.write.set(Some(buf));
562                self.io.cfg().write_buf().release(src);
563            } else {
564                self.curr.write.set(Some(src));
565            }
566        }
567    }
568
569    #[inline]
570    /// Take destination write buffer
571    pub fn take_dst(&self) -> BytesMut {
572        self.next
573            .write
574            .take()
575            .unwrap_or_else(|| self.io.cfg().write_buf().get())
576    }
577
578    #[inline]
579    /// Set destination write buffer
580    pub fn set_dst(&self, dst: Option<BytesMut>) {
581        if let Some(dst) = dst {
582            if dst.is_empty() {
583                self.io.cfg().write_buf().release(dst);
584            } else {
585                self.need_write.set(true);
586
587                if let Some(mut buf) = self.next.write.take() {
588                    buf.extend_from_slice(&dst);
589                    self.next.write.set(Some(buf));
590                    self.io.cfg().write_buf().release(dst);
591                } else {
592                    self.next.write.set(Some(dst));
593                }
594            }
595        }
596    }
597
598    #[inline]
599    pub fn with_read_buf<'b, F, R>(&'b self, f: F) -> R
600    where
601        F: FnOnce(&ReadBuf<'b>) -> R,
602    {
603        let mut buf = ReadBuf {
604            io: self.io,
605            curr: self.curr,
606            next: self.next,
607            nbytes: 0,
608            need_write: Cell::new(self.need_write.get()),
609        };
610        let result = f(&mut buf);
611        self.need_write.set(buf.need_write.get());
612        result
613    }
614}