ntex_io/
buf.rs
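//! Buffer storage for layered io filters.
//!
//! Each level of the stack owns a read slot and a write slot. New levels are
//! added at index 0 by `Stack::add_layer`, while the last level is the one
//! the io driver fills with read data and drains of write data: read bytes
//! travel from the last level toward index 0, and write bytes travel from
//! index 0 toward the last level.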

use std::{cell::Cell, fmt};

use ntex_bytes::BytesVec;
use ntex_util::future::Either;

use crate::{IoRef, cfg::BufConfig};

#[derive(Default)]
/// Read and write buffer slots for a single level of the filter stack
pub(crate) struct Buffer {
    read: Cell<Option<BytesVec>>,
    write: Cell<Option<BytesVec>>,
}

impl fmt::Debug for Buffer {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // temporarily take the buffers out of the cells to inspect them
        let b0 = self.read.take();
        let b1 = self.write.take();
        let res = f
            .debug_struct("Buffer")
            .field("read", &b0)
            .field("write", &b1)
            .finish();
        self.read.set(b0);
        self.write.set(b1);
        res
    }
}

/// Number of buffer levels stored inline before spilling into a `Vec`
const INLINE_SIZE: usize = 3;

#[derive(Debug)]
/// Per-level buffer storage; up to `INLINE_SIZE` levels are kept inline,
/// additional levels spill into a `Vec`
pub(crate) struct Stack {
    len: usize,
    buffers: Either<[Buffer; INLINE_SIZE], Vec<Buffer>>,
}

impl Stack {
    /// Create a stack with a single, empty level
    pub(crate) fn new() -> Self {
        Self {
            len: 1,
            buffers: Either::Left(Default::default()),
        }
    }

    /// Add a new, empty buffer level at index 0, shifting existing levels
    /// toward the end of the stack
    pub(crate) fn add_layer(&mut self) {
        match &mut self.buffers {
            Either::Left(b) => {
                // once inline storage is full, move all levels into a Vec,
                // with a new empty level at index 0
                if self.len == INLINE_SIZE {
                    let mut vec = vec![Buffer {
                        read: Cell::new(None),
                        write: Cell::new(None),
                    }];
                    for item in b.iter_mut().take(self.len) {
                        vec.push(Buffer {
                            read: Cell::new(item.read.take()),
                            write: Cell::new(item.write.take()),
                        });
                    }
                    self.len += 1;
                    self.buffers = Either::Right(vec);
                } else {
                    // shift existing levels one slot toward the end and
                    // clear index 0
                    let mut idx = self.len;
                    while idx > 0 {
                        let item = Buffer {
                            read: Cell::new(b[idx - 1].read.take()),
                            write: Cell::new(b[idx - 1].write.take()),
                        };
                        b[idx] = item;
                        idx -= 1;
                    }
                    b[0] = Buffer {
                        read: Cell::new(None),
                        write: Cell::new(None),
                    };
                    self.len += 1;
                }
            }
            Either::Right(vec) => {
                self.len += 1;
                vec.insert(
                    0,
                    Buffer {
                        read: Cell::new(None),
                        write: Cell::new(None),
                    },
                );
            }
        }
    }

    /// Call `f` with the buffers at `idx` and `idx + 1`; an empty throwaway
    /// buffer stands in for the next level when `idx` is the last one
    fn get_buffers<F, R>(&self, idx: usize, f: F) -> R
    where
        F: FnOnce(&Buffer, &Buffer) -> R,
    {
        let buffers = match self.buffers {
            Either::Left(ref b) => &b[..],
            Either::Right(ref b) => &b[..],
        };

        let next = idx + 1;
        if self.len > next {
            f(&buffers[idx], &buffers[next])
        } else {
            f(&buffers[idx], &Buffer::default())
        }
    }

    /// First level of the stack (destination of read data, source of write data)
    fn get_first_level(&self) -> &Buffer {
        match &self.buffers {
            Either::Left(b) => &b[0],
            Either::Right(b) => &b[0],
        }
    }

    /// Last level of the stack (filled and drained by the io driver)
    fn get_last_level(&self) -> &Buffer {
        match &self.buffers {
            Either::Left(b) => &b[self.len - 1],
            Either::Right(b) => &b[self.len - 1],
        }
    }

    /// Take the last level read buffer
    pub(crate) fn get_read_source(&self) -> Option<BytesVec> {
        self.get_last_level().read.take()
    }

    /// Put back the last level read buffer, releasing it if empty
    pub(crate) fn set_read_source(&self, io: &IoRef, buf: BytesVec) {
        if buf.is_empty() {
            io.cfg().read_buf().release(buf);
        } else {
            self.get_last_level().read.set(Some(buf));
        }
    }

    /// Run `f` on the first level read buffer
    pub(crate) fn with_read_destination<F, R>(&self, io: &IoRef, f: F) -> R
    where
        F: FnOnce(&mut BytesVec) -> R,
    {
        let item = self.get_first_level();
        let mut rb = item
            .read
            .take()
            .unwrap_or_else(|| io.cfg().read_buf().get());

        let result = f(&mut rb);

        // check nested updates
        if item.read.take().is_some() {
            log::error!("Nested read io operation detected");
            io.force_close();
        }

        if rb.is_empty() {
            io.cfg().read_buf().release(rb);
        } else {
            item.read.set(Some(rb));
        }
        result
    }

    /// Take the last level write buffer
    pub(crate) fn get_write_destination(&self) -> Option<BytesVec> {
        self.get_last_level().write.take()
    }

    /// Set the last level write buffer; returns `buf` back if one is already set
    pub(crate) fn set_write_destination(&self, buf: BytesVec) -> Option<BytesVec> {
        let b = self.get_last_level().write.take();
        if b.is_some() {
            self.get_last_level().write.set(b);
            Some(buf)
        } else {
            self.get_last_level().write.set(Some(buf));
            None
        }
    }

    /// Run `f` on the first level write buffer
    pub(crate) fn with_write_source<F, R>(&self, io: &IoRef, f: F) -> R
    where
        F: FnOnce(&mut BytesVec) -> R,
    {
        let item = self.get_first_level();
        let mut wb = item
            .write
            .take()
            .unwrap_or_else(|| io.cfg().write_buf().get());

        let result = f(&mut wb);
        if wb.is_empty() {
            io.cfg().write_buf().release(wb);
        } else {
            item.write.set(Some(wb));
        }
        result
    }

    /// Run `f` on the last level write buffer, if any
    pub(crate) fn with_write_destination<F, R>(&self, io: &IoRef, f: F) -> R
    where
        F: FnOnce(Option<&mut BytesVec>) -> R,
    {
        let item = self.get_last_level();
        let mut wb = item.write.take();

        let result = f(wb.as_mut());

        // check nested updates
        if item.write.take().is_some() {
            log::error!("Nested write io operation detected");
            io.force_close();
        }

        if let Some(b) = wb {
            if b.is_empty() {
                io.cfg().write_buf().release(b);
            } else {
                item.write.set(Some(b));
            }
        }
        result
    }

    /// Number of bytes in the first level read buffer
    pub(crate) fn read_destination_size(&self) -> usize {
        let item = self.get_first_level();
        let rb = item.read.take();
        let size = rb.as_ref().map(|b| b.len()).unwrap_or(0);
        item.read.set(rb);
        size
    }

    /// Number of bytes in the last level write buffer
    pub(crate) fn write_destination_size(&self) -> usize {
        let item = self.get_last_level();
        let wb = item.write.take();
        let size = wb.as_ref().map(|b| b.len()).unwrap_or(0);
        item.write.set(wb);
        size
    }
}
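// A minimal sketch of the `Stack` layering behaviour, exercising only items
// defined in this module (illustrative test names, assuming the crate's usual
// `cargo test` setup).
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn stack_grows_past_inline_storage() {
        let mut stack = Stack::new();
        // a fresh stack has a single level with empty buffer slots
        assert_eq!(stack.read_destination_size(), 0);
        assert_eq!(stack.write_destination_size(), 0);

        // adding layers beyond INLINE_SIZE moves the levels from the inline
        // array into a heap-allocated Vec; empty levels still report size 0
        for _ in 0..INLINE_SIZE {
            stack.add_layer();
        }
        assert_eq!(stack.read_destination_size(), 0);
        assert_eq!(stack.write_destination_size(), 0);
    }
}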

#[derive(Copy, Clone, Debug)]
/// Buffer access context for a single filter level, identified by `idx`
pub struct FilterCtx<'a> {
    pub(crate) io: &'a IoRef,
    pub(crate) stack: &'a Stack,
    pub(crate) idx: usize,
}

impl<'a> FilterCtx<'a> {
    /// Create context for the first level of the stack
    pub(crate) fn new(io: &'a IoRef, stack: &'a Stack) -> Self {
        Self { io, stack, idx: 0 }
    }

    #[inline]
    /// Get io tag
    pub fn tag(&self) -> &'static str {
        self.io.tag()
    }

    #[inline]
    /// Get context for the next level of the stack
    pub fn next(&self) -> Self {
        Self {
            io: self.io,
            stack: self.stack,
            idx: self.idx + 1,
        }
    }

    #[inline]
    /// Access read buffers of the current level
    pub fn read_buf<F, R>(&self, nbytes: usize, f: F) -> R
    where
        F: FnOnce(&ReadBuf<'_>) -> R,
    {
        self.stack.get_buffers(self.idx, |curr, next| {
            let buf = ReadBuf {
                nbytes,
                curr,
                next,
                io: self.io,
                need_write: Cell::new(false),
            };
            f(&buf)
        })
    }

    #[inline]
    /// Access write buffers of the current level
    pub fn write_buf<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&WriteBuf<'_>) -> R,
    {
        self.stack.get_buffers(self.idx, |curr, next| {
            let buf = WriteBuf {
                curr,
                next,
                io: self.io,
                need_write: Cell::new(false),
            };
            f(&buf)
        })
    }
}
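// Illustrative sketch of how a filter level might move freshly read bytes
// from its source buffer into its destination buffer through `FilterCtx`
// (hypothetical filter code, not part of this module):
//
//     ctx.read_buf(nbytes, |buf| {
//         if let Some(src) = buf.take_src() {
//             buf.with_dst(|dst| dst.extend_from_slice(&src));
//         }
//     });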

#[derive(Debug)]
/// Read buffers of a single filter level: `next` holds the source data,
/// `curr` is the destination
pub struct ReadBuf<'a> {
    pub(crate) io: &'a IoRef,
    pub(crate) curr: &'a Buffer,
    pub(crate) next: &'a Buffer,
    pub(crate) nbytes: usize,
    pub(crate) need_write: Cell<bool>,
}

impl ReadBuf<'_> {
    #[inline]
    /// Get io tag
    pub fn tag(&self) -> &'static str {
        self.io.tag()
    }

    #[inline]
    /// Get buffer params
    pub fn cfg(&self) -> &BufConfig {
        self.io.cfg().read_buf()
    }

    #[inline]
    /// Get number of newly added bytes
    pub fn nbytes(&self) -> usize {
        self.nbytes
    }

    #[inline]
    /// Initiate graceful io stream shutdown
    pub fn want_shutdown(&self) {
        self.io.want_shutdown()
    }

    #[inline]
    /// Make sure buffer has enough free space
    pub fn resize_buf(&self, buf: &mut BytesVec) {
        self.io.cfg().read_buf().resize(buf);
    }

    #[inline]
    /// Get reference to source read buffer
    pub fn with_src<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&mut Option<BytesVec>) -> R,
    {
        let mut buf = self.next.read.take();
        let result = f(&mut buf);

        if let Some(b) = buf {
            if b.is_empty() {
                self.io.cfg().read_buf().release(b);
            } else {
                self.next.read.set(Some(b));
            }
        }
        result
    }

    #[inline]
    /// Get reference to destination read buffer
    pub fn with_dst<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&mut BytesVec) -> R,
    {
        let mut rb = self
            .curr
            .read
            .take()
            .unwrap_or_else(|| self.io.cfg().read_buf().get());

        let result = f(&mut rb);
        if rb.is_empty() {
            self.io.cfg().read_buf().release(rb);
        } else {
            self.curr.read.set(Some(rb));
        }
        result
    }

    #[inline]
    /// Take source read buffer
    pub fn take_src(&self) -> Option<BytesVec> {
        self.next.read.take().and_then(|b| {
            if b.is_empty() {
                self.io.cfg().read_buf().release(b);
                None
            } else {
                Some(b)
            }
        })
    }

    #[inline]
    /// Set source read buffer
    pub fn set_src(&self, src: Option<BytesVec>) {
        if let Some(src) = src {
            if src.is_empty() {
                self.io.cfg().read_buf().release(src);
            } else if let Some(mut buf) = self.next.read.take() {
                buf.extend_from_slice(&src);
                self.next.read.set(Some(buf));
                self.io.cfg().read_buf().release(src);
            } else {
                self.next.read.set(Some(src));
            }
        }
    }

    #[inline]
    /// Take destination read buffer
    pub fn take_dst(&self) -> BytesVec {
        self.curr
            .read
            .take()
            .unwrap_or_else(|| self.io.cfg().read_buf().get())
    }

    #[inline]
    /// Set destination read buffer
    pub fn set_dst(&self, dst: Option<BytesVec>) {
        if let Some(dst) = dst {
            if dst.is_empty() {
                self.io.cfg().read_buf().release(dst);
            } else if let Some(mut buf) = self.curr.read.take() {
                buf.extend_from_slice(&dst);
                self.curr.read.set(Some(buf));
                self.io.cfg().read_buf().release(dst);
            } else {
                self.curr.read.set(Some(dst));
            }
        }
    }

    #[inline]
    /// Access write buffers of the same level
    pub fn with_write_buf<'b, F, R>(&'b self, f: F) -> R
    where
        F: FnOnce(&WriteBuf<'b>) -> R,
    {
        let buf = WriteBuf {
            io: self.io,
            curr: self.curr,
            next: self.next,
            need_write: Cell::new(self.need_write.get()),
        };
        let result = f(&buf);
        self.need_write.set(buf.need_write.get());
        result
    }
}

#[derive(Debug)]
/// Write buffers of a single filter level: `curr` holds the source data,
/// `next` is the destination
pub struct WriteBuf<'a> {
    pub(crate) io: &'a IoRef,
    pub(crate) curr: &'a Buffer,
    pub(crate) next: &'a Buffer,
    pub(crate) need_write: Cell<bool>,
}

impl WriteBuf<'_> {
    #[inline]
    /// Get io tag
    pub fn tag(&self) -> &'static str {
        self.io.tag()
    }

    #[inline]
    /// Get buffer params
    pub fn cfg(&self) -> &BufConfig {
        self.io.cfg().write_buf()
    }

    #[inline]
    /// Initiate graceful io stream shutdown
    pub fn want_shutdown(&self) {
        self.io.want_shutdown()
    }

    #[inline]
    /// Make sure buffer has enough free space
    pub fn resize_buf(&self, buf: &mut BytesVec) {
        self.io.cfg().write_buf().resize(buf);
    }

    #[inline]
    /// Get reference to source write buffer
    pub fn with_src<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&mut Option<BytesVec>) -> R,
    {
        let mut wb = self.curr.write.take();
        let result = f(&mut wb);
        if let Some(b) = wb {
            if b.is_empty() {
                self.io.cfg().write_buf().release(b);
            } else {
                self.curr.write.set(Some(b));
            }
        }
        result
    }

    #[inline]
    /// Get reference to destination write buffer
    pub fn with_dst<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&mut BytesVec) -> R,
    {
        let mut wb = self
            .next
            .write
            .take()
            .unwrap_or_else(|| self.io.cfg().write_buf().get());

        let total = wb.len();
        let result = f(&mut wb);

        if wb.is_empty() {
            self.io.cfg().write_buf().release(wb);
        } else {
            self.need_write
                .set(self.need_write.get() | (total != wb.len()));
            self.next.write.set(Some(wb));
        }
        result
    }

    #[inline]
    /// Take source write buffer
    pub fn take_src(&self) -> Option<BytesVec> {
        self.curr.write.take().and_then(|b| {
            if b.is_empty() {
                self.io.cfg().write_buf().release(b);
                None
            } else {
                Some(b)
            }
        })
    }

    #[inline]
    /// Set source write buffer
    pub fn set_src(&self, src: Option<BytesVec>) {
        if let Some(src) = src {
            if src.is_empty() {
                self.io.cfg().write_buf().release(src);
            } else if let Some(mut buf) = self.curr.write.take() {
                buf.extend_from_slice(&src);
                self.curr.write.set(Some(buf));
                self.io.cfg().write_buf().release(src);
            } else {
                self.curr.write.set(Some(src));
            }
        }
    }

    #[inline]
    /// Take destination write buffer
    pub fn take_dst(&self) -> BytesVec {
        self.next
            .write
            .take()
            .unwrap_or_else(|| self.io.cfg().write_buf().get())
    }

    #[inline]
    /// Set destination write buffer
    pub fn set_dst(&self, dst: Option<BytesVec>) {
        if let Some(dst) = dst {
            if dst.is_empty() {
                self.io.cfg().write_buf().release(dst);
            } else {
                self.need_write.set(true);

                if let Some(mut buf) = self.next.write.take() {
                    buf.extend_from_slice(&dst);
                    self.next.write.set(Some(buf));
                    self.io.cfg().write_buf().release(dst);
                } else {
                    self.next.write.set(Some(dst));
                }
            }
        }
    }

    #[inline]
    /// Access read buffers of the same level
    pub fn with_read_buf<'b, F, R>(&'b self, f: F) -> R
    where
        F: FnOnce(&ReadBuf<'b>) -> R,
    {
        let buf = ReadBuf {
            io: self.io,
            curr: self.curr,
            next: self.next,
            nbytes: 0,
            need_write: Cell::new(self.need_write.get()),
        };
        let result = f(&buf);
        self.need_write.set(buf.need_write.get());
        result
    }