pipebuf/buf.rs
1use super::{PBufRd, PBufWr};
2
3#[cfg(all(not(feature = "std"), feature = "alloc"))]
4extern crate alloc;
5#[cfg(all(not(feature = "std"), feature = "alloc"))]
6use {alloc::vec, alloc::vec::Vec};
7
8#[cfg(feature = "std")]
9use std::io::{ErrorKind, Read, Write};
10
11/// Efficient byte-pipe buffer
12///
13/// This is the interface that is intended for use by the glue code.
14/// Use [`PipeBuf::wr`] to get a [`PBufWr`] reference to write to the
15/// buffer, and [`PipeBuf::rd`] get a [`PBufRd`] reference to read
16/// from the buffer. These are the references that should be passed
17/// to component code. See this crate's top-level documentation for
18/// further discussion of how this works.
19pub struct PipeBuf<T: 'static = u8> {
20 #[cfg(any(feature = "alloc", feature = "std"))]
21 pub(crate) data: Vec<T>,
22 #[cfg(not(any(feature = "alloc", feature = "std")))]
23 pub(crate) data: &'static mut [T],
24 pub(crate) rd: usize,
25 pub(crate) wr: usize,
26 pub(crate) state: PBufState,
27 #[cfg(any(feature = "alloc", feature = "std"))]
28 pub(crate) fixed_capacity: bool,
29}
30
31impl<T: Copy + Default + 'static> PipeBuf<T> {
32 /// Create a new empty pipe buffer
33 #[cfg(any(feature = "std", feature = "alloc"))]
34 #[cfg_attr(docsrs, doc(cfg(feature = "std")))]
35 #[cfg_attr(docsrs, doc(cfg(feature = "alloc")))]
36 #[inline]
37 pub fn new() -> Self {
38 Self {
39 data: Vec::new(),
40 rd: 0,
41 wr: 0,
42 state: PBufState::Open,
43 fixed_capacity: false,
44 }
45 }
46
47 /// Create a new pipe buffer with the given initial capacity
48 #[cfg(any(feature = "std", feature = "alloc"))]
49 #[cfg_attr(docsrs, doc(cfg(feature = "std")))]
50 #[cfg_attr(docsrs, doc(cfg(feature = "alloc")))]
51 #[inline]
52 pub fn with_capacity(cap: usize) -> Self {
53 Self {
54 data: vec![T::default(); cap],
55 rd: 0,
56 wr: 0,
57 state: PBufState::Open,
58 fixed_capacity: false,
59 }
60 }
61
62 /// Create a new pipe buffer with the given fixed capacity. The
63 /// buffer will never be reallocated. If a [`PBufWr::space`] call
64 /// requests more space than is available, then the call will
65 /// panic.
66 #[cfg(any(feature = "std", feature = "alloc"))]
67 #[cfg_attr(docsrs, doc(cfg(feature = "std")))]
68 #[cfg_attr(docsrs, doc(cfg(feature = "alloc")))]
69 #[inline]
70 pub fn with_fixed_capacity(cap: usize) -> Self {
71 Self {
72 data: vec![T::default(); cap],
73 rd: 0,
74 wr: 0,
75 state: PBufState::Open,
76 fixed_capacity: true,
77 }
78 }
79
80 /// Create a new pipe buffer backed by the given static memory.
81 /// This is useful for `no_std` without an allocator. This is a
82 /// safe call, but requires use of `unsafe` in caller code because
83 /// the caller must guarantee that no other code is using this
84 /// static memory.
85 ///
86 /// ```
87 ///# use pipebuf::PipeBuf;
88 ///# use core::ptr::addr_of_mut;
89 /// static mut BUF: [u8; 1024] = [0; 1024];
90 /// let _ = PipeBuf::new_static(unsafe { &mut *addr_of_mut!(BUF) });
91 /// ```
92 #[cfg(feature = "static")]
93 #[cfg_attr(docsrs, doc(cfg(feature = "static")))]
94 #[inline]
95 pub fn new_static(buffer: &'static mut [T]) -> Self {
96 Self {
97 data: buffer,
98 rd: 0,
99 wr: 0,
100 state: PBufState::Open,
101 }
102 }
103
104 /// Reset the buffer to its initial state, i.e. in the `Open`
105 /// state and empty. The buffer backing memory is not zeroed, so
106 /// malicious code may observe old data in the slice returned by
107 /// [`PBufWr::space`]. If sensitive data would be exposed in this
108 /// case, use [`PipeBuf::reset_and_zero`] instead.
109 #[inline]
110 pub fn reset(&mut self) {
111 self.rd = 0;
112 self.wr = 0;
113 self.state = PBufState::Open;
114 }
115
116 /// Zero the buffer, and reset it to its initial state. If a
117 /// `PipeBuf` is going to be kept in a pool and reused, it may be
118 /// best to zero it after use so that no sensitive data can leak
119 /// between different parts of the codebase.
120 #[inline]
121 pub fn reset_and_zero(&mut self) {
122 self.data[..].fill(T::default());
123 self.rd = 0;
124 self.wr = 0;
125 self.state = PBufState::Open;
126 }
127
128 /// Get a consumer reference to the buffer
129 #[inline(always)]
130 pub fn rd(&mut self) -> PBufRd<'_, T> {
131 PBufRd { pb: self }
132 }
133
134 /// Get a producer reference to the buffer
135 #[inline(always)]
136 pub fn wr(&mut self) -> PBufWr<'_, T> {
137 PBufWr { pb: self }
138 }
139
140 /// Obtain a tripwire value to detect buffer changes. See the
141 /// [`PBufTrip`] type for further explanation.
142 #[inline]
143 pub fn tripwire(&self) -> PBufTrip {
144 // The priority here is that a tripwire value can be generated
145 // efficiently without adding any overhead to all the
146 // different operation methods.
147 //
148 // All consumer operations must decrease the value, and all
149 // producer operations must increase the value (in a
150 // wrapping-integer sense). Otherwise there is a risk that
151 // consuming or producing a few bytes along with another
152 // change may result in the same value, meaning that the
153 // change would be missed.
154 PBufTrip((self.wr - self.rd).wrapping_add(self.state as usize))
155 }
156
157 /// Test whether there has been a change to the buffer since the
158 /// tripwire value provided was obtained. See [`PBufTrip`].
159 #[inline]
160 pub fn is_tripped(&self, trip: PBufTrip) -> bool {
161 self.tripwire() != trip
162 }
163
164 /// Get the current EOF/push state of the buffer
165 #[inline(always)]
166 pub fn state(&self) -> PBufState {
167 self.state
168 }
169
170 /// Test whether the "push" state is set on the buffer without
171 /// changing the state.
172 #[inline(always)]
173 pub fn is_push(&self) -> bool {
174 self.state == PBufState::Push
175 }
176
177 /// Change the "push" state. It may be necessary for the glue
178 /// code to override the "push" status being set by a producer if
179 /// the producer is flushing its output too frequently for optimal
180 /// operation of a downstream component.
181 #[inline]
182 pub fn set_push(&mut self, push: bool) {
183 if matches!(self.state, PBufState::Open | PBufState::Push) {
184 self.state = if push {
185 PBufState::Push
186 } else {
187 PBufState::Open
188 };
189 }
190 }
191
192 /// Test whether an EOF has been indicated and consumed, and for
193 /// the case of a `Closed` EOF also that the buffer is empty.
194 /// This means that processing on this [`PipeBuf`] is complete
195 #[inline]
196 pub fn is_done(&self) -> bool {
197 match self.state {
198 PBufState::Aborted => true,
199 PBufState::Closed => self.rd == self.wr,
200 _ => false,
201 }
202 }
203}
204
#[cfg(feature = "std")]
#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
impl Read for PipeBuf<u8> {
    /// Copy as much buffered data as fits into `data` and consume it
    /// from the pipe-buffer.  Possible outcomes:
    ///
    /// - `Ok(len)`: Some data was read
    /// - `Ok(0)`: Successful end-of-file was reached
    /// - `Err(e)` with `e.kind() == ErrorKind::WouldBlock`: No data available right now
    /// - `Err(e)` with `e.kind() == ErrorKind::ConnectionAborted`: Aborted end-of-file was reached
    fn read(&mut self, data: &mut [u8]) -> std::io::Result<usize> {
        let mut consumer = self.rd();

        // Nothing buffered: the outcome depends only on whether there
        // is an EOF to consume, and of which kind
        if consumer.is_empty() {
            if !consumer.consume_eof() {
                return Err(ErrorKind::WouldBlock.into());
            }
            if consumer.is_aborted() {
                return Err(ErrorKind::ConnectionAborted.into());
            }
            return Ok(0);
        }

        // Transfer whichever is smaller: what is buffered, or what
        // the caller has room for
        let available = consumer.data();
        let count = data.len().min(available.len());
        data[..count].copy_from_slice(&available[..count]);
        consumer.consume(count);
        Ok(count)
    }
}
234
#[cfg(feature = "std")]
#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
impl Write for PipeBuf<u8> {
    /// Append all of `data` to the pipe-buffer.  No error is ever
    /// returned.  With a variable-capacity buffer the write always
    /// succeeds.  With a fixed-capacity buffer the call may panic if
    /// more data is written than there is space available.
    fn write(&mut self, data: &[u8]) -> std::io::Result<usize> {
        let count = data.len();
        let mut producer = self.wr();
        // Reserve exactly `count` elements, fill them, then commit
        producer.space(count).copy_from_slice(data);
        producer.commit(count);
        Ok(count)
    }

    /// Flushing marks the [`PipeBuf`] with the "push" state
    fn flush(&mut self) -> std::io::Result<()> {
        self.wr().push();
        Ok(())
    }
}
257
#[cfg(any(feature = "std", feature = "alloc"))]
#[cfg_attr(docsrs, doc(cfg(any(feature = "std", feature = "alloc"))))]
impl<T: Copy + Default + 'static> Default for PipeBuf<T> {
    /// Equivalent to [`PipeBuf::new`]: an empty buffer in the `Open`
    /// state that is not fixed-capacity.
    fn default() -> Self {
        Self::new()
    }
}
266
/// End-of-file and "push" state of the buffer
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PBufState {
    // The discriminants are chosen deliberately and must not be
    // renumbered:
    //
    // - Every producer operation moves to a larger value and every
    //   consumer operation moves to a smaller one.  `tripwire` adds
    //   this value into its result and depends on that direction.
    //
    // - The common `is_*` tests can be optimised down to a single
    //   comparison.
    //
    /// No end-of-file so far; more data may still arrive.
    Open = 0,
    /// No end-of-file so far; more data may still arrive.  The
    /// producer has suggested that the current data be flushed.
    Push = 1,
    /// A successful end-of-file has been reported by the producer.
    /// Whatever remains in the buffer is the final data of the
    /// stream.  The consumer has not processed the EOF yet.
    Closing = 3,
    /// A successful end-of-file was reported by the producer and has
    /// been processed by the consumer.
    Closed = 2,
    /// The producer has reported end-of-file because of some error
    /// condition.  The stream data might be inconsistent or
    /// incomplete (e.g. a partial record, protocol not terminated
    /// normally, etc).  The consumer has not processed the EOF yet.
    Aborting = 5,
    /// An abort end-of-file was reported by the producer and has been
    /// processed by the consumer.
    Aborted = 4,
}
300
/// Tripwire value used to detect changes
///
/// This value is obtained using [`PipeBuf::tripwire`],
/// [`PBufRd::tripwire`] or [`PBufWr::tripwire`], which all calculate
/// the same value.  See also the [`tripwire!`] macro which allows
/// tuples of tripwire values to be created and compared.
///
/// The value will change in these cases:
///
/// - Data is written to the pipe
/// - Data is read from the pipe
/// - A "push" state is set or consumed
/// - An EOF state is set or consumed
///
/// This value can be compared before and after some operation to
/// detect whether a change has occurred.  However that operation must
/// be purely a consumer operation or purely a producer operation.  If
/// data is both produced and consumed, then the tripwire value may
/// return to the same value and the change wouldn't be detected.
///
/// These scenarios are supported:
///
/// - In a consumer, avoiding re-parsing an input buffer when there
/// have been no changes made by the producer.  Save a `PBufTrip`
/// value before returning, and when called the next time, compare it
/// to the current value.
///
/// - In the glue code, detect whether a component call has caused
/// changes to a buffer.
///
/// - In consumer code, check whether some sub-part of the consumer
/// processing has done something.
///
/// - In producer code, check whether some sub-part of the producer
/// processing has done something.
///
/// `Debug` is implemented so that a structure which stores a saved
/// tripwire value can itself derive `Debug`.  The number shown is an
/// opaque snapshot and is only meaningful when compared for equality
/// with another tripwire value taken from the same buffer.
///
/// [`tripwire!`]: macro.tripwire.html
#[derive(Debug, Eq, PartialEq, Copy, Clone)]
pub struct PBufTrip(usize);
340
#[cfg(test)]
mod test {
    // Kept inside the crate so that the raw integer wrapped by
    // `PBufTrip` can be inspected: the test checks the direction in
    // which the value moves, which is not possible from outside the
    // crate
    #[cfg(any(feature = "std", feature = "alloc"))]
    #[test]
    fn tripwire() {
        use super::{PBufTrip, PipeBuf};

        // Require that the tripwire value has gone up since `prev`
        // was taken, then remember the new value
        fn assert_inc(p: &PipeBuf<u8>, prev: &mut PBufTrip) {
            let now = p.tripwire();
            assert!(
                prev.0 < now.0,
                "Expecting increase: {} -> {}",
                prev.0,
                now.0
            );
            *prev = now;
        }

        // Require that the tripwire value has gone down since `prev`
        // was taken, then remember the new value
        fn assert_dec(p: &PipeBuf<u8>, prev: &mut PBufTrip) {
            let now = p.tripwire();
            assert!(
                prev.0 > now.0,
                "Expecting decrease: {} -> {}",
                prev.0,
                now.0
            );
            *prev = now;
        }

        // Scenario 1: data in and out, then push, then a normal close
        let mut p: PipeBuf<u8> = PipeBuf::new();
        let mut t = p.tripwire();
        p.wr().append(b"X");
        assert_inc(&p, &mut t);
        p.rd().consume(1);
        assert_dec(&p, &mut t);
        p.wr().push();
        assert_inc(&p, &mut t);
        assert!(p.rd().consume_push());
        assert_dec(&p, &mut t);
        p.wr().close();
        assert_inc(&p, &mut t);
        assert!(p.rd().consume_eof());
        assert_dec(&p, &mut t);

        // Scenario 2: push while data is still buffered, then abort
        p = PipeBuf::default(); // Same as ::new()
        t = p.tripwire();
        p.wr().append(b"X");
        assert_inc(&p, &mut t);
        p.wr().push();
        assert_inc(&p, &mut t);
        assert!(p.rd().consume_push());
        assert_dec(&p, &mut t);
        p.rd().consume(1);
        assert_dec(&p, &mut t);
        p.wr().abort();
        assert_inc(&p, &mut t);
        assert!(p.rd().consume_eof());
        assert_dec(&p, &mut t);
    }
}