ergot_base/interface_manager/utils/std.rs
1use std::sync::Arc;
2
3use bbq2::{
4 queue::BBQueue,
5 traits::{coordination::cas::AtomicCoord, notifier::maitake::MaiNotSpsc, storage::BoxedSlice},
6};
7
8#[derive(Debug, PartialEq)]
9pub enum ReceiverError {
10 SocketClosed,
11}
12
13/// A type alias for the kind of queue used on std devices.
14pub type StdQueue = Arc<BBQueue<BoxedSlice, AtomicCoord, MaiNotSpsc>>;
15
16/// Create a new StdQueue with the given buffer size
17pub fn new_std_queue(buffer: usize) -> StdQueue {
18 Arc::new(BBQueue::new_with_storage(BoxedSlice::new(buffer)))
19}
20
21pub(crate) mod acc {
22 //! Basically postcard's cobs accumulator, but without the deser part
23
24 pub struct CobsAccumulator {
25 buf: Box<[u8]>,
26 idx: usize,
27 }
28
29 /// The result of feeding the accumulator.
30 pub enum FeedResult<'input, 'buf> {
31 /// Consumed all data, still pending.
32 Consumed,
33
34 /// Buffer was filled. Contains remaining section of input, if any.
35 OverFull(&'input [u8]),
36
37 /// Reached end of chunk, but deserialization failed. Contains remaining section of input, if.
38 /// any
39 DeserError(&'input [u8]),
40
41 Success {
42 /// Decoded data.
43 data: &'buf [u8],
44
45 /// Remaining data left in the buffer after deserializing.
46 remaining: &'input [u8],
47 },
48 }
49
50 impl CobsAccumulator {
51 /// Create a new accumulator.
52 pub fn new(sz: usize) -> Self {
53 CobsAccumulator {
54 buf: vec![0u8; sz].into_boxed_slice(),
55 idx: 0,
56 }
57 }
58
59 /// Appends data to the internal buffer and attempts to deserialize the accumulated data into
60 /// `T`.
61 ///
62 /// This differs from feed, as it allows the `T` to reference data within the internal buffer, but
63 /// mutably borrows the accumulator for the lifetime of the deserialization.
64 /// If `T` does not require the reference, the borrow of `self` ends at the end of the function.
65 pub fn feed_raw<'me, 'input>(
66 &'me mut self,
67 input: &'input [u8],
68 ) -> FeedResult<'input, 'me> {
69 if input.is_empty() {
70 return FeedResult::Consumed;
71 }
72
73 let zero_pos = input.iter().position(|&i| i == 0);
74 let max_len = self.buf.len();
75
76 if let Some(n) = zero_pos {
77 // Yes! We have an end of message here.
78 // Add one to include the zero in the "take" portion
79 // of the buffer, rather than in "release".
80 let (take, release) = input.split_at(n + 1);
81
82 // TODO(AJM): We could special case when idx == 0 to avoid copying
83 // into the dest buffer if there's a whole packet in the input
84
85 // Does it fit?
86 if (self.idx + take.len()) <= max_len {
87 // Aw yiss - add to array
88 self.extend_unchecked(take);
89
90 let retval = match cobs::decode_in_place(&mut self.buf[..self.idx]) {
91 Ok(ct) => FeedResult::Success {
92 data: &self.buf[..ct],
93 remaining: release,
94 },
95 Err(_) => FeedResult::DeserError(release),
96 };
97 self.idx = 0;
98 retval
99 } else {
100 self.idx = 0;
101 FeedResult::OverFull(release)
102 }
103 } else {
104 // Does it fit?
105 if (self.idx + input.len()) > max_len {
106 // nope
107 let new_start = max_len - self.idx;
108 self.idx = 0;
109 FeedResult::OverFull(&input[new_start..])
110 } else {
111 // yup!
112 self.extend_unchecked(input);
113 FeedResult::Consumed
114 }
115 }
116 }
117
118 /// Extend the internal buffer with the given input.
119 ///
120 /// # Panics
121 ///
122 /// Will panic if the input does not fit in the internal buffer.
123 fn extend_unchecked(&mut self, input: &[u8]) {
124 let new_end = self.idx + input.len();
125 self.buf[self.idx..new_end].copy_from_slice(input);
126 self.idx = new_end;
127 }
128 }
129}