ntex_io/buf.rs

use std::{cell::Cell, fmt};

use ntex_bytes::BytesMut;
use ntex_util::future::Either;

use crate::{IoRef, cfg::BufConfig};

#[derive(Default)]
pub(crate) struct Buffer {
    read: Cell<Option<BytesMut>>,
    write: Cell<Option<BytesMut>>,
}

impl fmt::Debug for Buffer {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let b0 = self.read.take();
        let b1 = self.write.take();
        let res = f
            .debug_struct("Buffer")
            .field("read", &b0)
            .field("write", &b1)
            .finish();
        self.read.set(b0);
        self.write.set(b1);
        res
    }
}

const INLINE_SIZE: usize = 3;

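/// Stack of per-filter buffer levels: index 0 holds the application-facing
/// buffers, the last level holds the buffers exchanged with the io stream.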
#[derive(Debug)]
pub(crate) struct Stack {
    len: usize,
    buffers: Either<[Buffer; INLINE_SIZE], Vec<Buffer>>,
}

impl Stack {
    pub(crate) fn new() -> Self {
        Self {
            len: 1,
            buffers: Either::Left(Default::default()),
        }
    }

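    /// Add a buffer level for a newly added filter layer: an empty level is
    /// inserted at index 0 and the existing levels shift up by one.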
    pub(crate) fn add_layer(&mut self) {
        match &mut self.buffers {
            Either::Left(b) => {
                // move to vec
                if self.len == INLINE_SIZE {
                    let mut vec = vec![Buffer {
                        read: Cell::new(None),
                        write: Cell::new(None),
                    }];
                    for item in b.iter_mut().take(self.len) {
                        vec.push(Buffer {
                            read: Cell::new(item.read.take()),
                            write: Cell::new(item.write.take()),
                        });
                    }
                    self.len += 1;
                    self.buffers = Either::Right(vec);
                } else {
                    let mut idx = self.len;
                    while idx > 0 {
                        let item = Buffer {
                            read: Cell::new(b[idx - 1].read.take()),
                            write: Cell::new(b[idx - 1].write.take()),
                        };
                        b[idx] = item;
                        idx -= 1;
                    }
                    b[0] = Buffer {
                        read: Cell::new(None),
                        write: Cell::new(None),
                    };
                    self.len += 1;
                }
            }
            Either::Right(vec) => {
                self.len += 1;
                vec.insert(
                    0,
                    Buffer {
                        read: Cell::new(None),
                        write: Cell::new(None),
                    },
                );
            }
        }
    }

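    /// Run `f` with the buffer pair for filter level `idx`: its own buffers
    /// and those of the next level (an empty placeholder when `idx` is the
    /// last level).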
    fn get_buffers<F, R>(&self, idx: usize, f: F) -> R
    where
        F: FnOnce(&Buffer, &Buffer) -> R,
    {
        let buffers = match self.buffers {
            Either::Left(ref b) => &b[..],
            Either::Right(ref b) => &b[..],
        };

        let next = idx + 1;
        if self.len > next {
            f(&buffers[idx], &buffers[next])
        } else {
            f(&buffers[idx], &Buffer::default())
        }
    }

    fn get_first_level(&self) -> &Buffer {
        match &self.buffers {
            Either::Left(b) => &b[0],
            Either::Right(b) => &b[0],
        }
    }

    fn get_last_level(&self) -> &Buffer {
        match &self.buffers {
            Either::Left(b) => &b[self.len - 1],
            Either::Right(b) => &b[self.len - 1],
        }
    }

    pub(crate) fn get_read_source(&self) -> Option<BytesMut> {
        self.get_last_level().read.take()
    }

    pub(crate) fn set_read_source(&self, io: &IoRef, buf: BytesMut) {
        if buf.is_empty() {
            io.cfg().read_buf().release(buf);
        } else {
            self.get_last_level().read.set(Some(buf));
        }
    }

    pub(crate) fn with_read_destination<F, R>(&self, io: &IoRef, f: F) -> R
    where
        F: FnOnce(&mut BytesMut) -> R,
    {
        let item = self.get_first_level();
        let mut rb = item
            .read
            .take()
            .unwrap_or_else(|| io.cfg().read_buf().get());

        let result = f(&mut rb);

        // check nested updates
        if item.read.take().is_some() {
            log::error!("Nested read io operation is detected");
            io.force_close();
        }

        if rb.is_empty() {
            io.cfg().read_buf().release(rb);
        } else {
            item.read.set(Some(rb));
        }
        result
    }

    pub(crate) fn get_write_destination(&self) -> Option<BytesMut> {
        self.get_last_level().write.take()
    }

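    /// Store `buf` as the last level's write destination; if that slot is
    /// already occupied the stored buffer is kept and `buf` is handed back.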
    pub(crate) fn set_write_destination(&self, buf: BytesMut) -> Option<BytesMut> {
        let b = self.get_last_level().write.take();
        if b.is_some() {
            self.get_last_level().write.set(b);
            Some(buf)
        } else {
            self.get_last_level().write.set(Some(buf));
            None
        }
    }

    pub(crate) fn with_write_source<F, R>(&self, io: &IoRef, f: F) -> R
    where
        F: FnOnce(&mut BytesMut) -> R,
    {
        let item = self.get_first_level();
        let mut wb = item
            .write
            .take()
            .unwrap_or_else(|| io.cfg().write_buf().get());

        let result = f(&mut wb);
        if wb.is_empty() {
            io.cfg().write_buf().release(wb);
        } else {
            item.write.set(Some(wb));
        }
        result
    }

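    /// Give `f` access to the last level's write buffer (the bytes queued
    /// for the io stream), releasing it afterwards if it ends up empty.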
    pub(crate) fn with_write_destination<F, R>(&self, io: &IoRef, f: F) -> R
    where
        F: FnOnce(Option<&mut BytesMut>) -> R,
    {
        let item = self.get_last_level();
        let mut wb = item.write.take();

        let result = f(wb.as_mut());

        // check nested updates
        if item.write.take().is_some() {
            log::error!("Nested write io operation is detected");
            io.force_close();
        }

        if let Some(b) = wb {
            if b.is_empty() {
                io.cfg().write_buf().release(b);
            } else {
                item.write.set(Some(b));
            }
        }
        result
    }

    pub(crate) fn read_destination_size(&self) -> usize {
        let item = self.get_first_level();
        let rb = item.read.take();
        let size = rb.as_ref().map_or(0, BytesMut::len);
        item.read.set(rb);
        size
    }

    pub(crate) fn write_destination_size(&self) -> usize {
        let item = self.get_last_level();
        let wb = item.write.take();
        let size = wb.as_ref().map_or(0, BytesMut::len);
        item.write.set(wb);
        size
    }
}

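/// Per-filter view over the io object and the shared buffer `Stack`; `idx`
/// selects the buffer level that belongs to the filter.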
#[derive(Copy, Clone, Debug)]
pub struct FilterCtx<'a> {
    pub(crate) io: &'a IoRef,
    pub(crate) stack: &'a Stack,
    pub(crate) idx: usize,
}

impl<'a> FilterCtx<'a> {
    pub(crate) fn new(io: &'a IoRef, stack: &'a Stack) -> Self {
        Self { io, stack, idx: 0 }
    }

    #[inline]
    /// Get io
    pub fn io(&self) -> &IoRef {
        self.io
    }

    #[inline]
    /// Get io tag
    pub fn tag(&self) -> &'static str {
        self.io.tag()
    }

    #[inline]
    #[must_use]
    /// Get filter ctx for next filter in chain
    pub fn next(&self) -> Self {
        Self {
            io: self.io,
            stack: self.stack,
            idx: self.idx + 1,
        }
    }

    #[inline]
    /// Get current read buffer
    pub fn read_buf<F, R>(&self, nbytes: usize, f: F) -> R
    where
        F: FnOnce(&ReadBuf<'_>) -> R,
    {
        self.stack.get_buffers(self.idx, |curr, next| {
            let buf = ReadBuf {
                nbytes,
                curr,
                next,
                io: self.io,
                need_write: Cell::new(false),
            };
            f(&buf)
        })
    }

    #[inline]
    /// Get current write buffer
    pub fn write_buf<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&WriteBuf<'_>) -> R,
    {
        self.stack.get_buffers(self.idx, |curr, next| {
            let buf = WriteBuf {
                curr,
                next,
                io: self.io,
                need_write: Cell::new(false),
            };
            f(&buf)
        })
    }
}

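/// Read-side buffer pair for one filter level: `next` is the source filled by
/// the level below, `curr` is the destination this filter decodes into.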
#[derive(Debug)]
pub struct ReadBuf<'a> {
    pub(crate) io: &'a IoRef,
    pub(crate) curr: &'a Buffer,
    pub(crate) next: &'a Buffer,
    pub(crate) nbytes: usize,
    pub(crate) need_write: Cell<bool>,
}

impl ReadBuf<'_> {
    #[inline]
    /// Get io tag
    pub fn tag(&self) -> &'static str {
        self.io.tag()
    }

    #[inline]
    /// Get buffer params
    pub fn cfg(&self) -> &BufConfig {
        self.io.cfg().read_buf()
    }

    #[inline]
    /// Get number of newly added bytes
    pub fn nbytes(&self) -> usize {
        self.nbytes
    }

    #[inline]
    /// Initiate graceful io stream shutdown
    pub fn want_shutdown(&self) {
        self.io.want_shutdown();
    }

    #[inline]
    /// Make sure buffer has enough free space
    pub fn resize_buf(&self, buf: &mut BytesMut) {
        self.io.cfg().read_buf().resize(buf);
    }

    #[inline]
    /// Get reference to source read buffer
    pub fn with_src<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&mut Option<BytesMut>) -> R,
    {
        let mut buf = self.next.read.take();
        let result = f(&mut buf);

        if let Some(b) = buf {
            if b.is_empty() {
                self.io.cfg().read_buf().release(b);
            } else {
                self.next.read.set(Some(b));
            }
        }
        result
    }

    #[inline]
    /// Get reference to destination read buffer
    pub fn with_dst<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&mut BytesMut) -> R,
    {
        let mut rb = self
            .curr
            .read
            .take()
            .unwrap_or_else(|| self.io.cfg().read_buf().get());

        let result = f(&mut rb);
        if rb.is_empty() {
            self.io.cfg().read_buf().release(rb);
        } else {
            self.curr.read.set(Some(rb));
        }
        result
    }

    #[inline]
    /// Take source read buffer
    pub fn take_src(&self) -> Option<BytesMut> {
        self.next.read.take().and_then(|b| {
            if b.is_empty() {
                self.io.cfg().read_buf().release(b);
                None
            } else {
                Some(b)
            }
        })
    }

    #[inline]
    /// Set source read buffer
    pub fn set_src(&self, src: Option<BytesMut>) {
        if let Some(src) = src {
            if src.is_empty() {
                self.io.cfg().read_buf().release(src);
            } else if let Some(mut buf) = self.next.read.take() {
                buf.extend_from_slice(&src);
                self.next.read.set(Some(buf));
                self.io.cfg().read_buf().release(src);
            } else {
                self.next.read.set(Some(src));
            }
        }
    }

    #[inline]
    /// Take destination read buffer
    pub fn take_dst(&self) -> BytesMut {
        self.curr
            .read
            .take()
            .unwrap_or_else(|| self.io.cfg().read_buf().get())
    }

    #[inline]
    /// Set destination read buffer
    pub fn set_dst(&self, dst: Option<BytesMut>) {
        if let Some(dst) = dst {
            if dst.is_empty() {
                self.io.cfg().read_buf().release(dst);
            } else if let Some(mut buf) = self.curr.read.take() {
                buf.extend_from_slice(&dst);
                self.curr.read.set(Some(buf));
                self.io.cfg().read_buf().release(dst);
            } else {
                self.curr.read.set(Some(dst));
            }
        }
    }

    #[inline]
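    /// Access the write-side buffers of the same level, propagating the
    /// `need_write` flag back into this read buffer.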
    pub fn with_write_buf<'b, F, R>(&'b self, f: F) -> R
    where
        F: FnOnce(&WriteBuf<'b>) -> R,
    {
        let mut buf = WriteBuf {
            io: self.io,
            curr: self.curr,
            next: self.next,
            need_write: Cell::new(self.need_write.get()),
        };
        let result = f(&mut buf);
        self.need_write.set(buf.need_write.get());
        result
    }
}

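/// Write-side buffer pair for one filter level: `curr` is the source filled by
/// the level above, `next` is the destination passed down towards the io stream.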
#[derive(Debug)]
pub struct WriteBuf<'a> {
    pub(crate) io: &'a IoRef,
    pub(crate) curr: &'a Buffer,
    pub(crate) next: &'a Buffer,
    pub(crate) need_write: Cell<bool>,
}

impl WriteBuf<'_> {
    #[inline]
    /// Get io tag
    pub fn tag(&self) -> &'static str {
        self.io.tag()
    }

    #[inline]
    /// Get buffer params
    pub fn cfg(&self) -> &BufConfig {
        self.io.cfg().write_buf()
    }

    #[inline]
    /// Initiate graceful io stream shutdown
    pub fn want_shutdown(&self) {
        self.io.want_shutdown();
    }

    #[inline]
    /// Make sure buffer has enough free space
    pub fn resize_buf(&self, buf: &mut BytesMut) {
        self.io.cfg().write_buf().resize(buf);
    }

    #[inline]
    /// Make sure buffer has enough free space
    pub fn resize_buf_min(&self, buf: &mut BytesMut, size: usize) {
        self.io.cfg().write_buf().resize_min(buf, size);
    }

    #[inline]
    /// Get reference to source write buffer
    pub fn with_src<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&mut Option<BytesMut>) -> R,
    {
        let mut wb = self.curr.write.take();
        let result = f(&mut wb);
        if let Some(b) = wb {
            if b.is_empty() {
                self.io.cfg().write_buf().release(b);
            } else {
                self.curr.write.set(Some(b));
            }
        }
        result
    }

    #[inline]
    /// Get reference to destination write buffer
    pub fn with_dst<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&mut BytesMut) -> R,
    {
        let mut wb = self
            .next
            .write
            .take()
            .unwrap_or_else(|| self.io.cfg().write_buf().get());

        let total = wb.len();
        let result = f(&mut wb);

        if wb.is_empty() {
            self.io.cfg().write_buf().release(wb);
        } else {
            self.need_write
                .set(self.need_write.get() | (total != wb.len()));
            self.next.write.set(Some(wb));
        }
        result
    }

    #[inline]
    /// Take source write buffer
    pub fn take_src(&self) -> Option<BytesMut> {
        self.curr.write.take().and_then(|b| {
            if b.is_empty() {
                self.io.cfg().write_buf().release(b);
                None
            } else {
                Some(b)
            }
        })
    }

    #[inline]
    /// Set source write buffer
    pub fn set_src(&self, src: Option<BytesMut>) {
        if let Some(src) = src {
            if src.is_empty() {
                self.io.cfg().write_buf().release(src);
            } else if let Some(mut buf) = self.curr.write.take() {
                buf.extend_from_slice(&src);
                self.curr.write.set(Some(buf));
                self.io.cfg().write_buf().release(src);
            } else {
                self.curr.write.set(Some(src));
            }
        }
    }

    #[inline]
    /// Take destination write buffer
    pub fn take_dst(&self) -> BytesMut {
        self.next
            .write
            .take()
            .unwrap_or_else(|| self.io.cfg().write_buf().get())
    }

    #[inline]
    /// Set destination write buffer
    pub fn set_dst(&self, dst: Option<BytesMut>) {
        if let Some(dst) = dst {
            if dst.is_empty() {
                self.io.cfg().write_buf().release(dst);
            } else {
                self.need_write.set(true);

                if let Some(mut buf) = self.next.write.take() {
                    buf.extend_from_slice(&dst);
                    self.next.write.set(Some(buf));
                    self.io.cfg().write_buf().release(dst);
                } else {
                    self.next.write.set(Some(dst));
                }
            }
        }
    }

    #[inline]
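    /// Access the read-side buffers of the same level, propagating the
    /// `need_write` flag back into this write buffer.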
    pub fn with_read_buf<'b, F, R>(&'b self, f: F) -> R
    where
        F: FnOnce(&ReadBuf<'b>) -> R,
    {
        let mut buf = ReadBuf {
            io: self.io,
            curr: self.curr,
            next: self.next,
            nbytes: 0,
            need_write: Cell::new(self.need_write.get()),
        };
        let result = f(&mut buf);
        self.need_write.set(buf.need_write.get());
        result
    }
}
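
A minimal sketch of how the `Stack` write-destination slot and layer handling above behave, written as a hypothetical crate-internal test module (the module name and the exact assertions are illustrative, not part of this file):

#[cfg(test)]
mod buf_sketch {
    use super::*;
    use ntex_bytes::BytesMut;

    #[test]
    fn write_destination_slot() {
        // A fresh stack has one level and no buffered write data.
        let stack = Stack::new();
        assert_eq!(stack.write_destination_size(), 0);

        // Storing into an empty slot keeps the buffer in the last level.
        let mut buf = BytesMut::with_capacity(16);
        buf.extend_from_slice(b"hello");
        assert!(stack.set_write_destination(buf).is_none());
        assert_eq!(stack.write_destination_size(), 5);

        // Storing again while the slot is occupied hands the new buffer back.
        let mut other = BytesMut::with_capacity(16);
        other.extend_from_slice(b"world");
        assert!(stack.set_write_destination(other).is_some());

        // The originally stored buffer can still be taken out.
        let taken = stack.get_write_destination().unwrap();
        assert_eq!(&taken[..], &b"hello"[..]);
    }

    #[test]
    fn add_layer_grows_the_stack() {
        // Adding more than INLINE_SIZE levels switches the storage from the
        // inline array to a Vec; new (empty) levels are inserted at index 0.
        let mut stack = Stack::new();
        for _ in 0..4 {
            stack.add_layer();
        }
        assert_eq!(stack.read_destination_size(), 0);
        assert_eq!(stack.write_destination_size(), 0);
    }
}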