ntex_io/
buf.rs

1use std::{cell::Cell, fmt};
2
3use ntex_bytes::{BytesVec, PoolRef};
4use ntex_util::future::Either;
5
6use crate::IoRef;
7
8#[derive(Default)]
9pub(crate) struct Buffer(Cell<Option<BytesVec>>, Cell<Option<BytesVec>>);
10
11impl fmt::Debug for Buffer {
12    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
13        let b0 = self.0.take();
14        let b1 = self.1.take();
15        let res = f
16            .debug_struct("Buffer")
17            .field("0", &b0)
18            .field("1", &b1)
19            .finish();
20        self.0.set(b0);
21        self.1.set(b1);
22        res
23    }
24}
25
26#[derive(Debug)]
27pub struct Stack {
28    len: usize,
29    buffers: Either<[Buffer; 3], Vec<Buffer>>,
30}
31
32impl Stack {
33    pub(crate) fn new() -> Self {
34        Self {
35            len: 1,
36            buffers: Either::Left(Default::default()),
37        }
38    }
39
40    pub(crate) fn add_layer(&mut self) {
41        match &mut self.buffers {
42            Either::Left(b) => {
43                // move to vec
44                if self.len == 3 {
45                    let mut vec = vec![Buffer(Cell::new(None), Cell::new(None))];
46                    for item in b.iter_mut().take(self.len) {
47                        vec.push(Buffer(
48                            Cell::new(item.0.take()),
49                            Cell::new(item.1.take()),
50                        ));
51                    }
52                    self.len += 1;
53                    self.buffers = Either::Right(vec);
54                } else {
55                    let mut idx = self.len;
56                    while idx > 0 {
57                        let item = Buffer(
58                            Cell::new(b[idx - 1].0.take()),
59                            Cell::new(b[idx - 1].1.take()),
60                        );
61                        b[idx] = item;
62                        idx -= 1;
63                    }
64                    b[0] = Buffer(Cell::new(None), Cell::new(None));
65                    self.len += 1;
66                }
67            }
68            Either::Right(vec) => {
69                self.len += 1;
70                vec.insert(0, Buffer(Cell::new(None), Cell::new(None)));
71            }
72        }
73    }
74
75    fn get_buffers<F, R>(&self, idx: usize, f: F) -> R
76    where
77        F: FnOnce(&Buffer, &Buffer) -> R,
78    {
79        let buffers = match self.buffers {
80            Either::Left(ref b) => &b[..],
81            Either::Right(ref b) => &b[..],
82        };
83
84        let next = idx + 1;
85        if self.len > next {
86            f(&buffers[idx], &buffers[next])
87        } else {
88            let curr = Buffer(Cell::new(buffers[idx].0.take()), Cell::new(None));
89            let next = Buffer(Cell::new(None), Cell::new(buffers[idx].1.take()));
90
91            let result = f(&curr, &next);
92            buffers[idx].0.set(curr.0.take());
93            buffers[idx].1.set(next.1.take());
94            result
95        }
96    }
97
98    fn get_first_level(&self) -> &Buffer {
99        match &self.buffers {
100            Either::Left(b) => &b[0],
101            Either::Right(b) => &b[0],
102        }
103    }
104
105    fn get_last_level(&self) -> &Buffer {
106        match &self.buffers {
107            Either::Left(b) => &b[self.len - 1],
108            Either::Right(b) => &b[self.len - 1],
109        }
110    }
111
112    pub(crate) fn read_buf<F, R>(&self, io: &IoRef, idx: usize, nbytes: usize, f: F) -> R
113    where
114        F: FnOnce(&ReadBuf<'_>) -> R,
115    {
116        self.get_buffers(idx, |curr, next| {
117            let buf = ReadBuf {
118                io,
119                nbytes,
120                curr,
121                next,
122                need_write: Cell::new(false),
123            };
124            f(&buf)
125        })
126    }
127
128    pub(crate) fn write_buf<F, R>(&self, io: &IoRef, idx: usize, f: F) -> R
129    where
130        F: FnOnce(&WriteBuf<'_>) -> R,
131    {
132        self.get_buffers(idx, |curr, next| {
133            let buf = WriteBuf {
134                io,
135                curr,
136                next,
137                need_write: Cell::new(false),
138            };
139            f(&buf)
140        })
141    }
142
143    pub(crate) fn get_read_source(&self) -> Option<BytesVec> {
144        self.get_last_level().0.take()
145    }
146
147    pub(crate) fn set_read_source(&self, io: &IoRef, buf: BytesVec) {
148        if buf.is_empty() {
149            io.memory_pool().release_read_buf(buf);
150        } else {
151            self.get_last_level().0.set(Some(buf));
152        }
153    }
154
155    pub(crate) fn with_read_destination<F, R>(&self, io: &IoRef, f: F) -> R
156    where
157        F: FnOnce(&mut BytesVec) -> R,
158    {
159        let item = self.get_first_level();
160        let mut rb = item.0.take();
161        if rb.is_none() {
162            rb = Some(io.memory_pool().get_read_buf());
163        }
164
165        let result = f(rb.as_mut().unwrap());
166
167        // check nested updates
168        if item.0.take().is_some() {
169            log::error!("Nested read io operation is detected");
170            io.force_close();
171        }
172
173        if let Some(b) = rb {
174            if b.is_empty() {
175                io.memory_pool().release_read_buf(b);
176            } else {
177                item.0.set(Some(b));
178            }
179        }
180        result
181    }
182
183    pub(crate) fn with_write_source<F, R>(&self, io: &IoRef, f: F) -> R
184    where
185        F: FnOnce(&mut BytesVec) -> R,
186    {
187        let item = self.get_first_level();
188        let mut wb = item.1.take();
189        if wb.is_none() {
190            wb = Some(io.memory_pool().get_write_buf());
191        }
192
193        let result = f(wb.as_mut().unwrap());
194        if let Some(b) = wb {
195            if b.is_empty() {
196                io.memory_pool().release_write_buf(b);
197            } else {
198                item.1.set(Some(b));
199            }
200        }
201        result
202    }
203
204    pub(crate) fn get_write_destination(&self) -> Option<BytesVec> {
205        self.get_last_level().1.take()
206    }
207
208    pub(crate) fn set_write_destination(&self, buf: BytesVec) -> Option<BytesVec> {
209        let b = self.get_last_level().1.take();
210        if b.is_some() {
211            self.get_last_level().1.set(b);
212            Some(buf)
213        } else {
214            self.get_last_level().1.set(Some(buf));
215            None
216        }
217    }
218
219    pub(crate) fn with_write_destination<F, R>(&self, io: &IoRef, f: F) -> R
220    where
221        F: FnOnce(Option<&mut BytesVec>) -> R,
222    {
223        let item = self.get_last_level();
224        let mut wb = item.1.take();
225
226        let result = f(wb.as_mut());
227
228        // check nested updates
229        if item.1.take().is_some() {
230            log::error!("Nested write io operation is detected");
231            io.force_close();
232        }
233
234        if let Some(b) = wb {
235            if b.is_empty() {
236                io.memory_pool().release_write_buf(b);
237            } else {
238                item.1.set(Some(b));
239            }
240        }
241        result
242    }
243
244    pub(crate) fn read_destination_size(&self) -> usize {
245        let item = self.get_first_level();
246        let rb = item.0.take();
247        let size = rb.as_ref().map(|b| b.len()).unwrap_or(0);
248        item.0.set(rb);
249        size
250    }
251
252    pub(crate) fn write_destination_size(&self) -> usize {
253        let item = self.get_last_level();
254        let wb = item.1.take();
255        let size = wb.as_ref().map(|b| b.len()).unwrap_or(0);
256        item.1.set(wb);
257        size
258    }
259
260    pub(crate) fn release(&self, pool: PoolRef) {
261        let items = match &self.buffers {
262            Either::Left(b) => &b[..],
263            Either::Right(b) => &b[..],
264        };
265
266        for item in items {
267            if let Some(buf) = item.0.take() {
268                pool.release_read_buf(buf);
269            }
270            if let Some(buf) = item.1.take() {
271                pool.release_write_buf(buf);
272            }
273        }
274    }
275
276    pub(crate) fn set_memory_pool(&self, pool: PoolRef) {
277        let items = match &self.buffers {
278            Either::Left(b) => &b[..],
279            Either::Right(b) => &b[..],
280        };
281        for item in items {
282            if let Some(mut b) = item.0.take() {
283                pool.move_vec_in(&mut b);
284                item.0.set(Some(b));
285            }
286            if let Some(mut b) = item.1.take() {
287                pool.move_vec_in(&mut b);
288                item.1.set(Some(b));
289            }
290        }
291    }
292}
293
294#[derive(Debug)]
295pub struct ReadBuf<'a> {
296    pub(crate) io: &'a IoRef,
297    pub(crate) curr: &'a Buffer,
298    pub(crate) next: &'a Buffer,
299    pub(crate) nbytes: usize,
300    pub(crate) need_write: Cell<bool>,
301}
302
303impl ReadBuf<'_> {
304    #[inline]
305    /// Get io tag
306    pub fn tag(&self) -> &'static str {
307        self.io.tag()
308    }
309
310    #[inline]
311    /// Get number of newly added bytes
312    pub fn nbytes(&self) -> usize {
313        self.nbytes
314    }
315
316    #[inline]
317    /// Initiate graceful io stream shutdown
318    pub fn want_shutdown(&self) {
319        self.io.want_shutdown()
320    }
321
322    #[inline]
323    /// Make sure buffer has enough free space
324    pub fn resize_buf(&self, buf: &mut BytesVec) {
325        self.io.memory_pool().resize_read_buf(buf);
326    }
327
328    #[inline]
329    /// Get reference to source read buffer
330    pub fn with_src<F, R>(&self, f: F) -> R
331    where
332        F: FnOnce(&mut Option<BytesVec>) -> R,
333    {
334        let mut item = self.next.0.take();
335        let result = f(&mut item);
336
337        if let Some(b) = item {
338            if b.is_empty() {
339                self.io.memory_pool().release_read_buf(b);
340            } else {
341                self.next.0.set(Some(b));
342            }
343        }
344        result
345    }
346
347    #[inline]
348    /// Get reference to destination read buffer
349    pub fn with_dst<F, R>(&self, f: F) -> R
350    where
351        F: FnOnce(&mut BytesVec) -> R,
352    {
353        let mut item = self.curr.0.take();
354        if item.is_none() {
355            item = Some(self.io.memory_pool().get_read_buf());
356        }
357        let result = f(item.as_mut().unwrap());
358        if let Some(b) = item {
359            if b.is_empty() {
360                self.io.memory_pool().release_read_buf(b);
361            } else {
362                self.curr.0.set(Some(b));
363            }
364        }
365        result
366    }
367
368    #[inline]
369    /// Take source read buffer
370    pub fn take_src(&self) -> Option<BytesVec> {
371        self.next.0.take().and_then(|b| {
372            if b.is_empty() {
373                self.io.memory_pool().release_read_buf(b);
374                None
375            } else {
376                Some(b)
377            }
378        })
379    }
380
381    #[inline]
382    /// Set source read buffer
383    pub fn set_src(&self, src: Option<BytesVec>) {
384        if let Some(src) = src {
385            if src.is_empty() {
386                self.io.memory_pool().release_read_buf(src);
387            } else if let Some(mut buf) = self.next.0.take() {
388                buf.extend_from_slice(&src);
389                self.next.0.set(Some(buf));
390                self.io.memory_pool().release_read_buf(src);
391            } else {
392                self.next.0.set(Some(src));
393            }
394        }
395    }
396
397    #[inline]
398    /// Take destination read buffer
399    pub fn take_dst(&self) -> BytesVec {
400        self.curr
401            .0
402            .take()
403            .unwrap_or_else(|| self.io.memory_pool().get_read_buf())
404    }
405
406    #[inline]
407    /// Set destination read buffer
408    pub fn set_dst(&self, dst: Option<BytesVec>) {
409        if let Some(dst) = dst {
410            if dst.is_empty() {
411                self.io.memory_pool().release_read_buf(dst);
412            } else if let Some(mut buf) = self.curr.0.take() {
413                buf.extend_from_slice(&dst);
414                self.curr.0.set(Some(buf));
415                self.io.memory_pool().release_read_buf(dst);
416            } else {
417                self.curr.0.set(Some(dst));
418            }
419        }
420    }
421
422    #[inline]
423    pub fn with_write_buf<'b, F, R>(&'b self, f: F) -> R
424    where
425        F: FnOnce(&WriteBuf<'b>) -> R,
426    {
427        let mut buf = WriteBuf {
428            io: self.io,
429            curr: self.curr,
430            next: self.next,
431            need_write: Cell::new(self.need_write.get()),
432        };
433        let result = f(&mut buf);
434        self.need_write.set(buf.need_write.get());
435        result
436    }
437}
438
439#[derive(Debug)]
440pub struct WriteBuf<'a> {
441    pub(crate) io: &'a IoRef,
442    pub(crate) curr: &'a Buffer,
443    pub(crate) next: &'a Buffer,
444    pub(crate) need_write: Cell<bool>,
445}
446
447impl WriteBuf<'_> {
448    #[inline]
449    /// Get io tag
450    pub fn tag(&self) -> &'static str {
451        self.io.tag()
452    }
453
454    #[inline]
455    /// Initiate graceful io stream shutdown
456    pub fn want_shutdown(&self) {
457        self.io.want_shutdown()
458    }
459
460    #[inline]
461    /// Make sure buffer has enough free space
462    pub fn resize_buf(&self, buf: &mut BytesVec) {
463        self.io.memory_pool().resize_write_buf(buf);
464    }
465
466    #[inline]
467    /// Get reference to source write buffer
468    pub fn with_src<F, R>(&self, f: F) -> R
469    where
470        F: FnOnce(&mut Option<BytesVec>) -> R,
471    {
472        let mut item = self.curr.1.take();
473        let result = f(&mut item);
474        if let Some(b) = item {
475            if b.is_empty() {
476                self.io.memory_pool().release_write_buf(b);
477            } else {
478                self.curr.1.set(Some(b));
479            }
480        }
481        result
482    }
483
484    #[inline]
485    /// Get reference to destination write buffer
486    pub fn with_dst<F, R>(&self, f: F) -> R
487    where
488        F: FnOnce(&mut BytesVec) -> R,
489    {
490        let mut item = self.next.1.take();
491        if item.is_none() {
492            item = Some(self.io.memory_pool().get_write_buf());
493        }
494        let buf = item.as_mut().unwrap();
495        let total = buf.len();
496        let result = f(buf);
497
498        if buf.is_empty() {
499            self.io.memory_pool().release_write_buf(item.unwrap());
500        } else {
501            self.need_write
502                .set(self.need_write.get() | (total != buf.len()));
503            self.next.1.set(item);
504        }
505        result
506    }
507
508    #[inline]
509    /// Take source write buffer
510    pub fn take_src(&self) -> Option<BytesVec> {
511        self.curr.1.take().and_then(|b| {
512            if b.is_empty() {
513                self.io.memory_pool().release_write_buf(b);
514                None
515            } else {
516                Some(b)
517            }
518        })
519    }
520
521    #[inline]
522    /// Set source write buffer
523    pub fn set_src(&self, src: Option<BytesVec>) {
524        if let Some(src) = src {
525            if src.is_empty() {
526                self.io.memory_pool().release_write_buf(src);
527            } else if let Some(mut buf) = self.curr.1.take() {
528                buf.extend_from_slice(&src);
529                self.curr.1.set(Some(buf));
530                self.io.memory_pool().release_write_buf(src);
531            } else {
532                self.curr.1.set(Some(src));
533            }
534        }
535    }
536
537    #[inline]
538    /// Take destination write buffer
539    pub fn take_dst(&self) -> BytesVec {
540        self.next
541            .1
542            .take()
543            .unwrap_or_else(|| self.io.memory_pool().get_write_buf())
544    }
545
546    #[inline]
547    /// Set destination write buffer
548    pub fn set_dst(&self, dst: Option<BytesVec>) {
549        if let Some(dst) = dst {
550            if dst.is_empty() {
551                self.io.memory_pool().release_write_buf(dst);
552            } else {
553                self.need_write.set(true);
554
555                if let Some(mut buf) = self.next.1.take() {
556                    buf.extend_from_slice(&dst);
557                    self.next.1.set(Some(buf));
558                    self.io.memory_pool().release_write_buf(dst);
559                } else {
560                    self.next.1.set(Some(dst));
561                }
562            }
563        }
564    }
565
566    #[inline]
567    pub fn with_read_buf<'b, F, R>(&'b self, f: F) -> R
568    where
569        F: FnOnce(&ReadBuf<'b>) -> R,
570    {
571        let mut buf = ReadBuf {
572            io: self.io,
573            curr: self.curr,
574            next: self.next,
575            nbytes: 0,
576            need_write: Cell::new(self.need_write.get()),
577        };
578        let result = f(&mut buf);
579        self.need_write.set(buf.need_write.get());
580        result
581    }
582}