json_lines/accumulator.rs
1//! An accumulator used to collect chunked newline-separated data and deserialize it.
2
3// The code in this module is modified from
4// [postcard](https://crates.io/crates/postcard).
5
6use serde::Deserialize;
7
/// An accumulator used to collect chunked newline data and deserialize it.
///
/// This is often useful when you receive "parts" of the message at a time, for example when draining
/// a serial port buffer that may not contain an entire uninterrupted message.
///
/// Incoming bytes are copied into a fixed-size internal buffer of `N` bytes, so `N` bounds the
/// length of a single message, including its terminating newline.
#[cfg_attr(feature = "use-defmt", derive(defmt::Format))]
pub struct NewlinesAccumulator<const N: usize> {
    /// Backing storage for the bytes of the message currently being accumulated.
    buf: [u8; N],
    /// Number of bytes of `buf` currently in use; new bytes are written starting at this offset.
    idx: usize,
}
18
/// The result of feeding the accumulator.
///
/// Every variant that carries a byte slice borrows it from the `input` passed to the feed call,
/// not from the accumulator's internal buffer.
#[cfg_attr(feature = "use-defmt", derive(defmt::Format))]
pub enum FeedResult<'a, T> {
    /// Consumed all data, still pending.
    ///
    /// Returned when the input contains no newline and fits in the buffer, or when the input is
    /// empty.
    Consumed,

    /// Buffer was filled. Contains remaining section of input, if any.
    ///
    /// The bytes accumulated so far are discarded before this is returned.
    OverFull(&'a [u8]),

    /// Reached end of chunk, but deserialization failed. Contains remaining section of input, if
    /// any.
    DeserError(&'a [u8]),

    /// Deserialization complete. Contains deserialized data and remaining section of input, if any.
    Success {
        /// Deserialized data.
        data: T,

        /// Portion of the input that follows the newline ending the deserialized message. It has
        /// not been copied into the accumulator.
        remaining: &'a [u8],
    },
}
41
42impl<const N: usize> NewlinesAccumulator<N> {
43 /// Create a new accumulator.
44 pub const fn new() -> Self {
45 Self {
46 buf: [0; N],
47 idx: 0,
48 }
49 }
50
51 /// Appends data to the internal buffer and attempts to deserialize the accumulated data into
52 /// `T`.
53 #[inline]
54 pub fn feed<'a, T>(&mut self, input: &'a [u8]) -> FeedResult<'a, T>
55 where
56 T: for<'de> Deserialize<'de>,
57 {
58 self.feed_ref(input)
59 }
60
61 /// Appends data to the internal buffer and attempts to deserialize the accumulated data into
62 /// `T`.
63 ///
64 /// This differs from feed, as it allows the `T` to reference data within the internal buffer, but
65 /// mutably borrows the accumulator for the lifetime of the deserialization.
66 /// If `T` does not require the reference, the borrow of `self` ends at the end of the function.
67 pub fn feed_ref<'de, 'a, T>(&'de mut self, input: &'a [u8]) -> FeedResult<'a, T>
68 where
69 T: Deserialize<'de>,
70 {
71 if input.is_empty() {
72 return FeedResult::Consumed;
73 }
74
75 let newline_pos = input.iter().position(|&i| i == b'\n');
76
77 if let Some(n) = newline_pos {
78 // Yes! We have an end of message here.
79 // Add one to include the newline in the "take" portion
80 // of the buffer, rather than in "release".
81 let (take, release) = input.split_at(n + 1);
82
83 // Does it fit?
84 if (self.idx + take.len()) <= N {
85 // Aw yiss - add to array
86 self.extend_unchecked(take);
87
88 let json_buf_len = self.idx - 1; // newline is not JSON-encoded
89 let retval = match crate::from_bytes::<T>(&mut self.buf[..json_buf_len]) {
90 Ok(t) => FeedResult::Success {
91 data: t,
92 remaining: release,
93 },
94 Err(_) => FeedResult::DeserError(release),
95 };
96 self.idx = 0;
97 retval
98 } else {
99 self.idx = 0;
100 FeedResult::OverFull(release)
101 }
102 } else {
103 // Does it fit?
104 if (self.idx + input.len()) > N {
105 // nope
106 let new_start = N - self.idx;
107 self.idx = 0;
108 FeedResult::OverFull(&input[new_start..])
109 } else {
110 // yup!
111 self.extend_unchecked(input);
112 FeedResult::Consumed
113 }
114 }
115 }
116
117 /// Extend the internal buffer with the given input.
118 ///
119 /// # Panics
120 ///
121 /// Will panic if the input does not fit in the internal buffer.
122 fn extend_unchecked(&mut self, input: &[u8]) {
123 let new_end = self.idx + input.len();
124 self.buf[self.idx..new_end].copy_from_slice(input);
125 self.idx = new_end;
126 }
127}