ergot_base/interface_manager/
std_utils.rs

1use std::sync::Arc;
2
3use bbq2::{
4    queue::BBQueue,
5    traits::{coordination::cas::AtomicCoord, notifier::maitake::MaiNotSpsc, storage::BoxedSlice},
6};
7
8#[derive(Debug, PartialEq)]
9pub enum ReceiverError {
10    SocketClosed,
11}
12
13pub(crate) type StdQueue = Arc<BBQueue<BoxedSlice, AtomicCoord, MaiNotSpsc>>;
14
15pub(crate) mod acc {
16    //! Basically postcard's cobs accumulator, but without the deser part
17
18    pub struct CobsAccumulator {
19        buf: Box<[u8]>,
20        idx: usize,
21    }
22
23    /// The result of feeding the accumulator.
24    pub enum FeedResult<'input, 'buf> {
25        /// Consumed all data, still pending.
26        Consumed,
27
28        /// Buffer was filled. Contains remaining section of input, if any.
29        OverFull(&'input [u8]),
30
31        /// Reached end of chunk, but deserialization failed. Contains remaining section of input, if.
32        /// any
33        DeserError(&'input [u8]),
34
35        Success {
36            /// Decoded data.
37            data: &'buf [u8],
38
39            /// Remaining data left in the buffer after deserializing.
40            remaining: &'input [u8],
41        },
42    }
43
44    impl CobsAccumulator {
45        /// Create a new accumulator.
46        pub fn new(sz: usize) -> Self {
47            CobsAccumulator {
48                buf: vec![0u8; sz].into_boxed_slice(),
49                idx: 0,
50            }
51        }
52
53        /// Appends data to the internal buffer and attempts to deserialize the accumulated data into
54        /// `T`.
55        ///
56        /// This differs from feed, as it allows the `T` to reference data within the internal buffer, but
57        /// mutably borrows the accumulator for the lifetime of the deserialization.
58        /// If `T` does not require the reference, the borrow of `self` ends at the end of the function.
59        pub fn feed_raw<'me, 'input>(
60            &'me mut self,
61            input: &'input [u8],
62        ) -> FeedResult<'input, 'me> {
63            if input.is_empty() {
64                return FeedResult::Consumed;
65            }
66
67            let zero_pos = input.iter().position(|&i| i == 0);
68            let max_len = self.buf.len();
69
70            if let Some(n) = zero_pos {
71                // Yes! We have an end of message here.
72                // Add one to include the zero in the "take" portion
73                // of the buffer, rather than in "release".
74                let (take, release) = input.split_at(n + 1);
75
76                // TODO(AJM): We could special case when idx == 0 to avoid copying
77                // into the dest buffer if there's a whole packet in the input
78
79                // Does it fit?
80                if (self.idx + take.len()) <= max_len {
81                    // Aw yiss - add to array
82                    self.extend_unchecked(take);
83
84                    let retval = match cobs::decode_in_place(&mut self.buf[..self.idx]) {
85                        Ok(ct) => FeedResult::Success {
86                            data: &self.buf[..ct],
87                            remaining: release,
88                        },
89                        Err(_) => FeedResult::DeserError(release),
90                    };
91                    self.idx = 0;
92                    retval
93                } else {
94                    self.idx = 0;
95                    FeedResult::OverFull(release)
96                }
97            } else {
98                // Does it fit?
99                if (self.idx + input.len()) > max_len {
100                    // nope
101                    let new_start = max_len - self.idx;
102                    self.idx = 0;
103                    FeedResult::OverFull(&input[new_start..])
104                } else {
105                    // yup!
106                    self.extend_unchecked(input);
107                    FeedResult::Consumed
108                }
109            }
110        }
111
112        /// Extend the internal buffer with the given input.
113        ///
114        /// # Panics
115        ///
116        /// Will panic if the input does not fit in the internal buffer.
117        fn extend_unchecked(&mut self, input: &[u8]) {
118            let new_end = self.idx + input.len();
119            self.buf[self.idx..new_end].copy_from_slice(input);
120            self.idx = new_end;
121        }
122    }
123}