json_lines/
accumulator.rs

//! An accumulator used to collect chunked newline-separated data and deserialize it.

// The code in this module is modified from
// [postcard](https://crates.io/crates/postcard).

6use serde::Deserialize;
7
/// A fixed-capacity accumulator that collects chunked, newline-delimited data so that it can be
/// deserialized one line at a time.
///
/// This is often useful when you receive "parts" of a message at a time, for example when
/// draining a serial port buffer that may not contain an entire uninterrupted message.
///
/// `N` is the size, in bytes, of the internal buffer. A complete line, including its terminating
/// newline, has to fit in `N` bytes to be deserialized.
#[derive(Debug)]
#[cfg_attr(feature = "use-defmt", derive(defmt::Format))]
pub struct NewlinesAccumulator<const N: usize> {
    /// Backing storage for the bytes of the line currently being assembled.
    buf: [u8; N],
    /// Number of bytes at the start of `buf` that are currently in use.
    idx: usize,
}
18
/// The outcome of feeding a chunk of input to the accumulator.
///
/// Variants that carry a slice hand back the part of the input that was not consumed, so the
/// caller can feed it again.
#[derive(Debug)]
#[cfg_attr(feature = "use-defmt", derive(defmt::Format))]
pub enum FeedResult<'a, T> {
    /// Consumed all data, still pending.
    Consumed,

    /// Buffer was filled. Contains the remaining section of the input, if any.
    OverFull(&'a [u8]),

    /// Reached the end of a line, but deserialization failed. Contains the remaining section of
    /// the input, if any.
    DeserError(&'a [u8]),

    /// Deserialization complete. Contains the deserialized data and the remaining section of the
    /// input, if any.
    Success {
        /// Deserialized data.
        data: T,

        /// Remaining data left in the input after deserializing.
        remaining: &'a [u8],
    },
}
41
42impl<const N: usize> NewlinesAccumulator<N> {
43    /// Create a new accumulator.
44    pub const fn new() -> Self {
45        Self {
46            buf: [0; N],
47            idx: 0,
48        }
49    }
50
51    /// Appends data to the internal buffer and attempts to deserialize the accumulated data into
52    /// `T`.
53    #[inline]
54    pub fn feed<'a, T>(&mut self, input: &'a [u8]) -> FeedResult<'a, T>
55    where
56        T: for<'de> Deserialize<'de>,
57    {
58        self.feed_ref(input)
59    }
60
61    /// Appends data to the internal buffer and attempts to deserialize the accumulated data into
62    /// `T`.
63    ///
64    /// This differs from feed, as it allows the `T` to reference data within the internal buffer, but
65    /// mutably borrows the accumulator for the lifetime of the deserialization.
66    /// If `T` does not require the reference, the borrow of `self` ends at the end of the function.
67    pub fn feed_ref<'de, 'a, T>(&'de mut self, input: &'a [u8]) -> FeedResult<'a, T>
68    where
69        T: Deserialize<'de>,
70    {
71        if input.is_empty() {
72            return FeedResult::Consumed;
73        }
74
75        let newline_pos = input.iter().position(|&i| i == b'\n');
76
77        if let Some(n) = newline_pos {
78            // Yes! We have an end of message here.
79            // Add one to include the newline in the "take" portion
80            // of the buffer, rather than in "release".
81            let (take, release) = input.split_at(n + 1);
82
83            // Does it fit?
84            if (self.idx + take.len()) <= N {
85                // Aw yiss - add to array
86                self.extend_unchecked(take);
87
88                let json_buf_len = self.idx - 1; // newline is not JSON-encoded
89                let retval = match crate::from_bytes::<T>(&mut self.buf[..json_buf_len]) {
90                    Ok(t) => FeedResult::Success {
91                        data: t,
92                        remaining: release,
93                    },
94                    Err(_) => FeedResult::DeserError(release),
95                };
96                self.idx = 0;
97                retval
98            } else {
99                self.idx = 0;
100                FeedResult::OverFull(release)
101            }
102        } else {
103            // Does it fit?
104            if (self.idx + input.len()) > N {
105                // nope
106                let new_start = N - self.idx;
107                self.idx = 0;
108                FeedResult::OverFull(&input[new_start..])
109            } else {
110                // yup!
111                self.extend_unchecked(input);
112                FeedResult::Consumed
113            }
114        }
115    }
116
117    /// Extend the internal buffer with the given input.
118    ///
119    /// # Panics
120    ///
121    /// Will panic if the input does not fit in the internal buffer.
122    fn extend_unchecked(&mut self, input: &[u8]) {
123        let new_end = self.idx + input.len();
124        self.buf[self.idx..new_end].copy_from_slice(input);
125        self.idx = new_end;
126    }
127}