ergot_base/interface_manager/std_utils.rs
1use std::sync::Arc;
2
3use bbq2::{
4 queue::BBQueue,
5 traits::{coordination::cas::AtomicCoord, notifier::maitake::MaiNotSpsc, storage::BoxedSlice},
6};
7
8#[derive(Debug, PartialEq)]
9pub enum ReceiverError {
10 SocketClosed,
11}
12
13pub(crate) type StdQueue = Arc<BBQueue<BoxedSlice, AtomicCoord, MaiNotSpsc>>;
14
15pub(crate) mod acc {
16 //! Basically postcard's cobs accumulator, but without the deser part
17
18 pub struct CobsAccumulator {
19 buf: Box<[u8]>,
20 idx: usize,
21 }
22
23 /// The result of feeding the accumulator.
24 pub enum FeedResult<'input, 'buf> {
25 /// Consumed all data, still pending.
26 Consumed,
27
28 /// Buffer was filled. Contains remaining section of input, if any.
29 OverFull(&'input [u8]),
30
31 /// Reached end of chunk, but deserialization failed. Contains remaining section of input, if.
32 /// any
33 DeserError(&'input [u8]),
34
35 Success {
36 /// Decoded data.
37 data: &'buf [u8],
38
39 /// Remaining data left in the buffer after deserializing.
40 remaining: &'input [u8],
41 },
42 }
43
44 impl CobsAccumulator {
45 /// Create a new accumulator.
46 pub fn new(sz: usize) -> Self {
47 CobsAccumulator {
48 buf: vec![0u8; sz].into_boxed_slice(),
49 idx: 0,
50 }
51 }
52
53 /// Appends data to the internal buffer and attempts to deserialize the accumulated data into
54 /// `T`.
55 ///
56 /// This differs from feed, as it allows the `T` to reference data within the internal buffer, but
57 /// mutably borrows the accumulator for the lifetime of the deserialization.
58 /// If `T` does not require the reference, the borrow of `self` ends at the end of the function.
59 pub fn feed_raw<'me, 'input>(
60 &'me mut self,
61 input: &'input [u8],
62 ) -> FeedResult<'input, 'me> {
63 if input.is_empty() {
64 return FeedResult::Consumed;
65 }
66
67 let zero_pos = input.iter().position(|&i| i == 0);
68 let max_len = self.buf.len();
69
70 if let Some(n) = zero_pos {
71 // Yes! We have an end of message here.
72 // Add one to include the zero in the "take" portion
73 // of the buffer, rather than in "release".
74 let (take, release) = input.split_at(n + 1);
75
76 // TODO(AJM): We could special case when idx == 0 to avoid copying
77 // into the dest buffer if there's a whole packet in the input
78
79 // Does it fit?
80 if (self.idx + take.len()) <= max_len {
81 // Aw yiss - add to array
82 self.extend_unchecked(take);
83
84 let retval = match cobs::decode_in_place(&mut self.buf[..self.idx]) {
85 Ok(ct) => FeedResult::Success {
86 data: &self.buf[..ct],
87 remaining: release,
88 },
89 Err(_) => FeedResult::DeserError(release),
90 };
91 self.idx = 0;
92 retval
93 } else {
94 self.idx = 0;
95 FeedResult::OverFull(release)
96 }
97 } else {
98 // Does it fit?
99 if (self.idx + input.len()) > max_len {
100 // nope
101 let new_start = max_len - self.idx;
102 self.idx = 0;
103 FeedResult::OverFull(&input[new_start..])
104 } else {
105 // yup!
106 self.extend_unchecked(input);
107 FeedResult::Consumed
108 }
109 }
110 }
111
112 /// Extend the internal buffer with the given input.
113 ///
114 /// # Panics
115 ///
116 /// Will panic if the input does not fit in the internal buffer.
117 fn extend_unchecked(&mut self, input: &[u8]) {
118 let new_end = self.idx + input.len();
119 self.buf[self.idx..new_end].copy_from_slice(input);
120 self.idx = new_end;
121 }
122 }
123}