Skip to main content

ntex_io/
buf.rs

1use std::{cell::Cell, fmt, io, task::Poll};
2
3use ntex_bytes::{BytePageSize, BytePages, BytesMut};
4
5use crate::{IoConfig, IoRef};
6
7pub(crate) struct Stack {
8    buffers: Vec<Buffer>,
9}
10
11#[derive(Default)]
12struct Buffer {
13    read: Cell<Option<BytesMut>>,
14    write: Cell<Option<BytePages>>,
15}
16
17impl fmt::Debug for Stack {
18    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
19        f.debug_struct("Stack")
20            .field("len", &self.buffers.len())
21            .finish()
22    }
23}
24
25impl Stack {
26    pub(crate) fn new(size: BytePageSize) -> Self {
27        Self {
28            buffers: vec![
29                Buffer {
30                    read: Cell::new(None),
31                    write: Cell::new(Some(BytePages::new(size))),
32                },
33                Buffer {
34                    read: Cell::new(None),
35                    write: Cell::new(Some(BytePages::new(size))),
36                },
37            ],
38        }
39    }
40
41    pub(crate) fn set_page_size(&self, size: BytePageSize) {
42        for b in &self.buffers {
43            b.with_write(|b| b.set_page_size(size));
44        }
45    }
46
47    pub(crate) fn add_layer(&mut self, page_size: BytePageSize) {
48        self.buffers.insert(
49            0,
50            Buffer {
51                read: Cell::new(None),
52                write: Cell::new(Some(BytePages::new(page_size))),
53            },
54        );
55    }
56
57    fn with_first<F, R>(&self, f: F) -> R
58    where
59        F: FnOnce(&Buffer) -> R,
60    {
61        f(&self.buffers[0])
62    }
63
64    fn with_last<F, R>(&self, f: F) -> R
65    where
66        F: FnOnce(&Buffer) -> R,
67    {
68        f(&self.buffers[self.buffers.len() - 2])
69    }
70
71    pub(crate) fn with_read_dst<F, R>(&self, io: &IoRef, f: F) -> R
72    where
73        F: FnOnce(&mut BytesMut) -> R,
74    {
75        self.with_first(|buf| buf.with_read(io, f))
76    }
77
78    pub(crate) fn write_buf_size(&self) -> usize {
79        // check size for first level because delayed filter processing
80        if self.buffers.len() == 2 {
81            self.buffers[0].write_len()
82        } else {
83            self.buffers[0].write_len() + self.buffers[self.buffers.len() - 2].write_len()
84        }
85    }
86
87    pub(crate) fn with_write_src<F, R>(&self, f: F) -> R
88    where
89        F: FnOnce(&mut BytePages) -> R,
90    {
91        self.buffers[0].with_write(f)
92    }
93
94    pub(crate) fn with_write_dst<F, R>(&self, f: F) -> R
95    where
96        F: FnOnce(&mut BytePages) -> R,
97    {
98        self.buffers[self.buffers.len() - 2].with_write(f)
99    }
100
101    pub(crate) fn read_dst_size(&self) -> usize {
102        self.buffers[0].read_len()
103    }
104
105    pub(crate) fn with_filter<F, R>(&self, io: &IoRef, f: F) -> R
106    where
107        F: FnOnce(&mut FilterCtx<'_>) -> R,
108    {
109        let mut ctx = FilterCtx {
110            io,
111            idx: 0,
112            nbytes: 0,
113            stack: self,
114            st: FilterUpdates {
115                wants_write: false,
116                notify: false,
117            },
118        };
119        f(&mut ctx)
120    }
121
122    pub(crate) fn get_read_buf(&self) -> Option<BytesMut> {
123        self.with_last(|buffer| buffer.read.take())
124    }
125
126    pub(crate) fn set_read_buf(&self, buf: BytesMut, cfg: &IoConfig) {
127        self.with_last(move |buffer| {
128            if let Some(mut first_buf) = buffer.read.take() {
129                first_buf.extend_from_slice(&buf);
130                cfg.read_buf().release(buf);
131                buffer.read.set(Some(first_buf));
132            } else if !buf.is_empty() {
133                buffer.read.set(Some(buf));
134            } else {
135                cfg.read_buf().release(buf);
136            }
137        });
138    }
139
140    pub(crate) fn process_read_buf(
141        &self,
142        io: &IoRef,
143        nbytes: usize,
144    ) -> io::Result<FilterUpdates> {
145        let mut ctx = FilterCtx {
146            io,
147            nbytes,
148            idx: 0,
149            stack: self,
150            st: FilterUpdates {
151                wants_write: false,
152                notify: false,
153            },
154        };
155        let result = io.filter().process_read_buf(&mut ctx);
156        result.map(|()| ctx.st)
157    }
158
159    pub(crate) fn process_write_buf(&self, io: &IoRef) -> io::Result<()> {
160        if self.buffers[0].is_write_empty() {
161            Ok(())
162        } else {
163            let mut ctx = FilterCtx {
164                io,
165                idx: 0,
166                nbytes: 0,
167                stack: self,
168                st: FilterUpdates {
169                    wants_write: true,
170                    notify: false,
171                },
172            };
173            io.filter().process_write_buf(&mut ctx)
174        }
175    }
176
177    pub(crate) fn process_write_buf_force(&self, io: &IoRef) -> io::Result<()> {
178        let mut ctx = FilterCtx {
179            io,
180            idx: 0,
181            nbytes: 0,
182            stack: self,
183            st: FilterUpdates {
184                wants_write: true,
185                notify: false,
186            },
187        };
188        io.filter().process_write_buf(&mut ctx)
189    }
190
191    pub(crate) fn process_shutdown(&self, io: &IoRef) -> io::Result<Poll<()>> {
192        self.process_write_buf(io)?;
193        self.with_filter(io, |ctx| io.filter().shutdown(ctx))
194    }
195}
196
197impl Buffer {
198    fn is_write_empty(&self) -> bool {
199        self.with_write(|b| b.is_empty())
200    }
201
202    fn read_len(&self) -> usize {
203        if let Some(rb) = self.read.take() {
204            let l = rb.len();
205            self.read.set(Some(rb));
206            l
207        } else {
208            0
209        }
210    }
211
212    fn write_len(&self) -> usize {
213        self.with_write(|b| b.len())
214    }
215
216    fn with_read<F, R>(&self, io: &IoRef, f: F) -> R
217    where
218        F: FnOnce(&mut BytesMut) -> R,
219    {
220        let mut rb = self
221            .read
222            .take()
223            .unwrap_or_else(|| io.cfg().read_buf().get());
224        let result = f(&mut rb);
225
226        // check nested updates
227        if self.read.take().is_some() {
228            log::error!("Nested read io operation is detected");
229            io.terminate();
230        }
231
232        if rb.is_empty() {
233            io.cfg().read_buf().release(rb);
234        } else {
235            self.read.set(Some(rb));
236        }
237        result
238    }
239
240    fn with_write<F, R>(&self, f: F) -> R
241    where
242        F: FnOnce(&mut BytePages) -> R,
243    {
244        let mut wb = self.write.take().unwrap();
245        let result = f(&mut wb);
246        self.write.set(Some(wb));
247        result
248    }
249}
250
/// Flags collected in a filter context while filters are being processed.
#[derive(Debug, Clone, Copy)]
pub(crate) struct FilterUpdates {
    /// Set when write processing is wanted.
    pub(crate) wants_write: bool,
    /// Set when a filter asked for a readiness notification.
    pub(crate) notify: bool,
}
256
257#[derive(Debug)]
258pub struct FilterCtx<'a> {
259    io: &'a IoRef,
260    idx: usize,
261    nbytes: usize,
262    stack: &'a Stack,
263    st: FilterUpdates,
264}
265
266impl FilterCtx<'_> {
267    #[inline]
268    /// Gets a reference to the I/O object.
269    pub fn io(&self) -> &IoRef {
270        self.io
271    }
272
273    #[inline]
274    /// Gets the I/O tag.
275    pub fn tag(&self) -> &'static str {
276        self.io.tag()
277    }
278
279    #[inline]
280    /// Gets new bytes count for read buffer.
281    pub fn new_read_bytes(&self) -> usize {
282        self.nbytes
283    }
284
285    #[inline]
286    /// Notifies about readiness changes.
287    pub fn notify(&mut self) {
288        self.st.notify = true;
289    }
290
291    #[inline]
292    /// Returns the filter context for the next filter in the chain.
293    pub fn with_next<F, R>(&mut self, f: F) -> R
294    where
295        F: FnOnce(&mut Self) -> R,
296    {
297        self.idx += 1;
298        let res = f(self);
299        self.idx -= 1;
300        res
301    }
302
303    #[inline]
304    /// Returns the filter buffer.
305    pub fn with_buffer<F, R>(&mut self, f: F) -> R
306    where
307        F: FnOnce(&mut FilterBuf<'_>) -> R,
308    {
309        let mut buf = FilterBuf {
310            io: self.io,
311            curr: &self.stack.buffers[self.idx],
312            next: &self.stack.buffers[self.idx + 1],
313            wants_write: Cell::new(self.st.wants_write),
314        };
315        let result = f(&mut buf);
316        if buf.wants_write.get() {
317            self.st.wants_write = true;
318        }
319        result
320    }
321
322    #[inline]
323    /// Returns the size of the last read buffer in the chain.
324    pub fn read_dst_size(&self) -> usize {
325        self.stack.buffers[0].read_len()
326    }
327
328    #[inline]
329    /// Returns the size of the last write buffer in the chain.
330    pub fn write_dst_size(&mut self) -> usize {
331        self.stack.buffers[self.stack.buffers.len() - 2].write_len()
332    }
333
334    pub(crate) fn clear_write_buf(&mut self) {
335        self.stack.buffers[self.idx].with_write(BytePages::clear);
336    }
337}
338
339#[derive(Debug)]
340pub struct FilterBuf<'a> {
341    io: &'a IoRef,
342    curr: &'a Buffer,
343    next: &'a Buffer,
344    wants_write: Cell<bool>,
345}
346
347impl FilterBuf<'_> {
348    #[inline]
349    /// Gets a reference to the I/O object.
350    pub fn io(&self) -> &IoRef {
351        self.io
352    }
353
354    #[inline]
355    /// Gets the I/O tag.
356    pub fn tag(&self) -> &'static str {
357        self.io.tag()
358    }
359
360    /// Returns references to the source read buffer.
361    pub fn with_read_src<F, R>(&self, f: F) -> R
362    where
363        F: FnOnce(&mut Option<BytesMut>) -> R,
364    {
365        let mut read_src = self.next.read.take();
366        let result = f(&mut read_src);
367
368        if let Some(b) = read_src {
369            if b.is_empty() {
370                self.io.cfg().read_buf().release(b);
371            } else {
372                self.next.read.set(Some(b));
373            }
374        }
375        result
376    }
377
378    /// Returns references to the source and destination read buffers.
379    pub fn with_read_buffers<F, R>(&self, f: F) -> R
380    where
381        F: FnOnce(&mut Option<BytesMut>, &mut BytesMut) -> R,
382    {
383        let mut read_src = self.next.read.take();
384        let mut read_dst = self
385            .curr
386            .read
387            .take()
388            .unwrap_or_else(|| self.io.cfg().read_buf().get());
389
390        let result = f(&mut read_src, &mut read_dst);
391
392        if let Some(b) = read_src {
393            if b.is_empty() {
394                self.io.cfg().read_buf().release(b);
395            } else {
396                self.next.read.set(Some(b));
397            }
398        }
399        if read_dst.is_empty() {
400            self.io.cfg().read_buf().release(read_dst);
401        } else {
402            self.curr.read.set(Some(read_dst));
403        }
404
405        result
406    }
407
408    #[inline]
409    /// Returns references to the source and destination write buffers.
410    pub fn with_write_buffers<F, R>(&self, f: F) -> R
411    where
412        F: FnOnce(&mut BytePages, &mut BytePages) -> R,
413    {
414        let mut write_curr = self.curr.write.take().unwrap();
415        let mut write_next = self.next.write.take().unwrap();
416        let write_len = if self.wants_write.get() {
417            0
418        } else {
419            write_next.len()
420        };
421
422        let result = f(&mut write_curr, &mut write_next);
423
424        if !self.wants_write.get() && write_next.len() > write_len {
425            self.wants_write.set(true);
426        }
427
428        self.curr.write.set(Some(write_curr));
429        self.next.write.set(Some(write_next));
430        result
431    }
432}
433
434impl fmt::Debug for Buffer {
435    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
436        let read = self.read.take();
437        let write = self.write.take();
438
439        let result = f
440            .debug_struct("Buffer")
441            .field("read", &read)
442            .field("write", &write)
443            .finish();
444        self.read.set(read);
445        self.write.set(write);
446        result
447    }
448}