pipebuf/
buf.rs

1use super::{PBufRd, PBufWr};
2
3#[cfg(all(not(feature = "std"), feature = "alloc"))]
4extern crate alloc;
5#[cfg(all(not(feature = "std"), feature = "alloc"))]
6use {alloc::vec, alloc::vec::Vec};
7
8#[cfg(feature = "std")]
9use std::io::{ErrorKind, Read, Write};
10
11/// Efficient byte-pipe buffer
12///
13/// This is the interface that is intended for use by the glue code.
14/// Use [`PipeBuf::wr`] to get a [`PBufWr`] reference to write to the
15/// buffer, and [`PipeBuf::rd`] get a [`PBufRd`] reference to read
16/// from the buffer.  These are the references that should be passed
17/// to component code.  See this crate's top-level documentation for
18/// further discussion of how this works.
19pub struct PipeBuf<T: 'static = u8> {
20    #[cfg(any(feature = "alloc", feature = "std"))]
21    pub(crate) data: Vec<T>,
22    #[cfg(not(any(feature = "alloc", feature = "std")))]
23    pub(crate) data: &'static mut [T],
24    pub(crate) rd: usize,
25    pub(crate) wr: usize,
26    pub(crate) state: PBufState,
27    #[cfg(any(feature = "alloc", feature = "std"))]
28    pub(crate) fixed_capacity: bool,
29}
30
31impl<T: Copy + Default + 'static> PipeBuf<T> {
32    /// Create a new empty pipe buffer
33    #[cfg(any(feature = "std", feature = "alloc"))]
34    #[cfg_attr(docsrs, doc(cfg(feature = "std")))]
35    #[cfg_attr(docsrs, doc(cfg(feature = "alloc")))]
36    #[inline]
37    pub fn new() -> Self {
38        Self {
39            data: Vec::new(),
40            rd: 0,
41            wr: 0,
42            state: PBufState::Open,
43            fixed_capacity: false,
44        }
45    }
46
47    /// Create a new pipe buffer with the given initial capacity
48    #[cfg(any(feature = "std", feature = "alloc"))]
49    #[cfg_attr(docsrs, doc(cfg(feature = "std")))]
50    #[cfg_attr(docsrs, doc(cfg(feature = "alloc")))]
51    #[inline]
52    pub fn with_capacity(cap: usize) -> Self {
53        Self {
54            data: vec![T::default(); cap],
55            rd: 0,
56            wr: 0,
57            state: PBufState::Open,
58            fixed_capacity: false,
59        }
60    }
61
62    /// Create a new pipe buffer with the given fixed capacity.  The
63    /// buffer will never be reallocated.  If a [`PBufWr::space`] call
64    /// requests more space than is available, then the call will
65    /// panic.
66    #[cfg(any(feature = "std", feature = "alloc"))]
67    #[cfg_attr(docsrs, doc(cfg(feature = "std")))]
68    #[cfg_attr(docsrs, doc(cfg(feature = "alloc")))]
69    #[inline]
70    pub fn with_fixed_capacity(cap: usize) -> Self {
71        Self {
72            data: vec![T::default(); cap],
73            rd: 0,
74            wr: 0,
75            state: PBufState::Open,
76            fixed_capacity: true,
77        }
78    }
79
80    /// Create a new pipe buffer backed by the given static memory.
81    /// This is useful for `no_std` without an allocator.  This is a
82    /// safe call, but requires use of `unsafe` in caller code because
83    /// the caller must guarantee that no other code is using this
84    /// static memory.
85    ///
86    /// ```
87    ///# use pipebuf::PipeBuf;
88    ///# use core::ptr::addr_of_mut;
89    /// static mut BUF: [u8; 1024] = [0; 1024];
90    /// let _ = PipeBuf::new_static(unsafe { &mut *addr_of_mut!(BUF) });
91    /// ```
92    #[cfg(feature = "static")]
93    #[cfg_attr(docsrs, doc(cfg(feature = "static")))]
94    #[inline]
95    pub fn new_static(buffer: &'static mut [T]) -> Self {
96        Self {
97            data: buffer,
98            rd: 0,
99            wr: 0,
100            state: PBufState::Open,
101        }
102    }
103
104    /// Reset the buffer to its initial state, i.e. in the `Open`
105    /// state and empty.  The buffer backing memory is not zeroed, so
106    /// malicious code may observe old data in the slice returned by
107    /// [`PBufWr::space`].  If sensitive data would be exposed in this
108    /// case, use [`PipeBuf::reset_and_zero`] instead.
109    #[inline]
110    pub fn reset(&mut self) {
111        self.rd = 0;
112        self.wr = 0;
113        self.state = PBufState::Open;
114    }
115
116    /// Zero the buffer, and reset it to its initial state.  If a
117    /// `PipeBuf` is going to be kept in a pool and reused, it may be
118    /// best to zero it after use so that no sensitive data can leak
119    /// between different parts of the codebase.
120    #[inline]
121    pub fn reset_and_zero(&mut self) {
122        self.data[..].fill(T::default());
123        self.rd = 0;
124        self.wr = 0;
125        self.state = PBufState::Open;
126    }
127
128    /// Get a consumer reference to the buffer
129    #[inline(always)]
130    pub fn rd(&mut self) -> PBufRd<'_, T> {
131        PBufRd { pb: self }
132    }
133
134    /// Get a producer reference to the buffer
135    #[inline(always)]
136    pub fn wr(&mut self) -> PBufWr<'_, T> {
137        PBufWr { pb: self }
138    }
139
140    /// Obtain a tripwire value to detect buffer changes.  See the
141    /// [`PBufTrip`] type for further explanation.
142    #[inline]
143    pub fn tripwire(&self) -> PBufTrip {
144        // The priority here is that a tripwire value can be generated
145        // efficiently without adding any overhead to all the
146        // different operation methods.
147        //
148        // All consumer operations must decrease the value, and all
149        // producer operations must increase the value (in a
150        // wrapping-integer sense).  Otherwise there is a risk that
151        // consuming or producing a few bytes along with another
152        // change may result in the same value, meaning that the
153        // change would be missed.
154        PBufTrip((self.wr - self.rd).wrapping_add(self.state as usize))
155    }
156
157    /// Test whether there has been a change to the buffer since the
158    /// tripwire value provided was obtained.  See [`PBufTrip`].
159    #[inline]
160    pub fn is_tripped(&self, trip: PBufTrip) -> bool {
161        self.tripwire() != trip
162    }
163
164    /// Get the current EOF/push state of the buffer
165    #[inline(always)]
166    pub fn state(&self) -> PBufState {
167        self.state
168    }
169
170    /// Test whether the "push" state is set on the buffer without
171    /// changing the state.
172    #[inline(always)]
173    pub fn is_push(&self) -> bool {
174        self.state == PBufState::Push
175    }
176
177    /// Change the "push" state.  It may be necessary for the glue
178    /// code to override the "push" status being set by a producer if
179    /// the producer is flushing its output too frequently for optimal
180    /// operation of a downstream component.
181    #[inline]
182    pub fn set_push(&mut self, push: bool) {
183        if matches!(self.state, PBufState::Open | PBufState::Push) {
184            self.state = if push {
185                PBufState::Push
186            } else {
187                PBufState::Open
188            };
189        }
190    }
191
192    /// Test whether an EOF has been indicated and consumed, and for
193    /// the case of a `Closed` EOF also that the buffer is empty.
194    /// This means that processing on this [`PipeBuf`] is complete
195    #[inline]
196    pub fn is_done(&self) -> bool {
197        match self.state {
198            PBufState::Aborted => true,
199            PBufState::Closed => self.rd == self.wr,
200            _ => false,
201        }
202    }
203}
204
205#[cfg(feature = "std")]
206#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
207impl Read for PipeBuf<u8> {
208    /// Read data from the pipe-buffer, as much as is available.  The
209    /// following returns are possible:
210    ///
211    /// - `Ok(len)`: Some data was read
212    /// - `Ok(0)`: Successful end-of-file was reached
213    /// - `Err(e)` with `e.kind() == ErrorKind::WouldBlock`: No data available right now
214    /// - `Err(e)` with `e.kind() == ErrorKind::ConnectionAborted`: Aborted end-of-file was reached
215    fn read(&mut self, data: &mut [u8]) -> Result<usize, std::io::Error> {
216        let mut rd = self.rd();
217        if !rd.is_empty() {
218            let slice = rd.data();
219            let len = slice.len().min(data.len());
220            data[..len].copy_from_slice(&slice[..len]);
221            rd.consume(len);
222            Ok(len)
223        } else if rd.consume_eof() {
224            if rd.is_aborted() {
225                Err(ErrorKind::ConnectionAborted.into())
226            } else {
227                Ok(0)
228            }
229        } else {
230            Err(ErrorKind::WouldBlock.into())
231        }
232    }
233}
234
235#[cfg(feature = "std")]
236#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
237impl Write for PipeBuf<u8> {
238    /// Write data to the pipe-buffer.  Never returns an error.  For
239    /// variable-capacity, always succeeds.  For fixed-capacity may
240    /// panic in case more data is written than there is space
241    /// available.
242    fn write(&mut self, data: &[u8]) -> Result<usize, std::io::Error> {
243        let mut wr = self.wr();
244        let len = data.len();
245        let slice = wr.space(len);
246        slice.copy_from_slice(data);
247        wr.commit(len);
248        Ok(len)
249    }
250
251    /// Flush sets the "push" state on the [`PipeBuf`]
252    fn flush(&mut self) -> Result<(), std::io::Error> {
253        self.wr().push();
254        Ok(())
255    }
256}
257
258#[cfg(any(feature = "std", feature = "alloc"))]
259#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
260#[cfg_attr(docsrs, doc(cfg(feature = "alloc")))]
261impl<T: Copy + Default + 'static> Default for PipeBuf<T> {
262    fn default() -> Self {
263        Self::new()
264    }
265}
266
267/// End-of-file and "push" state of the buffer
268#[derive(Copy, Clone, Eq, PartialEq, Debug)]
269pub enum PBufState {
270    // Note that the values here are selected so that producer
271    // operations increase the value, and consumer operations decrease
272    // the value.  This is necessary for `tripwire` to function
273    // correctly.
274    //
275    // Also values are selected so that typical `is_*` operations can
276    // optimise down to a single comparison.
277    //
278    /// End-of-file has not been reached yet.  More data may follow.
279    Open = 0,
280    /// End-of-file has not been reached yet.  More data may follow.
281    /// Producer has suggested that current data be flushed.
282    Push = 1,
283    /// The producer has reported a successful end-of-file.  Any data
284    /// left in the buffer is the final data of the stream.  The
285    /// consumer has not yet processed the EOF.
286    Closing = 3,
287    /// Successful end-of-file has been reported by the producer and
288    /// processed by the consumer.
289    Closed = 2,
290    /// The producer has reported end-of-file due to some error
291    /// condition.  The data in the stream might be in an inconsistent
292    /// or incomplete state (e.g. a partial record, protocol not
293    /// terminated normally, etc).  The consumer has not yet processed
294    /// the EOF.
295    Aborting = 5,
296    /// Abort end-of-file has been reported by the producer and
297    /// processed by the consumer.
298    Aborted = 4,
299}
300
301/// Tripwire value used to detect changes
302///
303/// This value is obtained using [`PipeBuf::tripwire`],
304/// [`PBufRd::tripwire`] or [`PBufWr::tripwire`], which all calculate
305/// the same value.  See also the [`tripwire!`] macro which allows
306/// tuples of tripwire values to be created and compared.
307///
308/// The value will change in these cases:
309///
310/// - Data is written to the pipe
311/// - Data is read from the pipe
312/// - A "push" state is set or consumed
313/// - An EOF state is set or consumed
314///
315/// This value can be compared before and after some operation to
316/// detect whether a change has occurred.  However that operation must
317/// be purely a consumer operation or purely a producer operation.  If
318/// data is both produced and consumed, then the tripwire value may
319/// return to the same value and the change wouldn't be detected.
320///
321/// These scenarios are supported:
322///
323/// - In a consumer, avoiding re-parsing an input buffer when there
324/// have been no changes made by the producer.  Save a `PBufTrip`
325/// value before returning, and when called the next time, compare it
326/// to the current value.
327///
328/// - In the glue code, detect whether a component call has caused
329/// changes to a buffer.
330///
331/// - In consumer code, check whether some sub-part of the consumer
332/// processing has done something.
333///
334/// - In producer code, check whether some sub-part of the producer
335/// processing has done something.
336///
337/// [`tripwire!`]: macro.tripwire.html
338#[derive(Eq, PartialEq, Copy, Clone)]
339pub struct PBufTrip(usize);
340
341#[cfg(test)]
342mod test {
343    // This test is here so that it can directly check inc/dec of
344    // tripwire values, which is not possible from outside the crate
345    #[cfg(any(feature = "std", feature = "alloc"))]
346    #[test]
347    fn tripwire() {
348        let mut p;
349        let mut t;
350
351        macro_rules! assert_inc {
352            () => {{
353                let n = p.tripwire();
354                assert!(t.0 < n.0, "Expecting increase: {} -> {}", t.0, n.0);
355                t = n;
356            }};
357        }
358        macro_rules! assert_dec {
359            () => {{
360                let n = p.tripwire();
361                assert!(t.0 > n.0, "Expecting decrease: {} -> {}", t.0, n.0);
362                t = n;
363            }};
364        }
365
366        p = super::PipeBuf::new();
367        t = p.tripwire();
368        p.wr().append(b"X");
369        assert_inc!();
370        p.rd().consume(1);
371        assert_dec!();
372        p.wr().push();
373        assert_inc!();
374        assert!(p.rd().consume_push());
375        assert_dec!();
376        p.wr().close();
377        assert_inc!();
378        assert!(p.rd().consume_eof());
379        assert_dec!();
380        let _ = t;
381
382        p = super::PipeBuf::default(); // Same as ::new()
383        t = p.tripwire();
384        p.wr().append(b"X");
385        assert_inc!();
386        p.wr().push();
387        assert_inc!();
388        assert!(p.rd().consume_push());
389        assert_dec!();
390        p.rd().consume(1);
391        assert_dec!();
392        p.wr().abort();
393        assert_inc!();
394        assert!(p.rd().consume_eof());
395        assert_dec!();
396
397        let _ = t;
398    }
399}