json_lines/
codec.rs

1//! Provides implementations to use [tokio_util::codec]
2
3// The implementations here do not use the no-std capable variants in the other
4// modules in this crate. This should probably be fixed.
5
6use bytes::{buf::Buf, BytesMut};
7use tokio_util::codec::{Decoder, Encoder};
8
9use crate::{Error, Result};
10
/// JSON Lines text format, also called newline-delimited JSON.
///
/// As a `Decoder` it turns each `\n`-terminated line into a `D`; as an
/// `Encoder<S>` it writes each `S` as one line. The codec stores no data of its
/// own: `D` and `S` only select the message types.
pub struct JsonLinesCodec<D, S> {
    // `fn() -> T` markers rather than `PhantomData<T>`: the codec produces and
    // consumes values of these types but never owns one, so its auto traits
    // (`Send`, `Sync`, `Unpin`, ...) and drop check should not depend on `D` or
    // `S`. Variance is unchanged (covariant in both).
    deser: std::marker::PhantomData<fn() -> D>,
    ser: std::marker::PhantomData<fn() -> S>,
}
16
17impl<D, S> Default for JsonLinesCodec<D, S> {
18    fn default() -> Self {
19        Self {
20            deser: std::marker::PhantomData,
21            ser: std::marker::PhantomData,
22        }
23    }
24}
25
26impl<D, S> Decoder for JsonLinesCodec<D, S>
27where
28    D: serde::de::DeserializeOwned,
29{
30    type Item = D;
31    type Error = Error;
32
33    fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>> {
34        if buf.is_empty() {
35            Ok(None)
36        } else {
37            // Could investigate potential optimization in which size of
38            // already-searched buffer is maintained.
39            loop {
40                // we loop until out of bytes or found actual JSON inside newline
41                if let Some(newline_offset) = memchr::memchr(b'\n', &buf[..]) {
42                    // Found a line!
43                    let to_parse = &buf[..newline_offset];
44                    if newline_offset == 0 {
45                        buf.advance(newline_offset + 1);
46                        // try again
47                        continue;
48                    }
49                    debug_assert!(buf[newline_offset] == b'\n');
50                    match serde_json::from_slice(&to_parse[..newline_offset]) {
51                        Ok(msg) => {
52                            buf.advance(newline_offset + 1);
53                            return Ok(Some(msg));
54                        }
55                        Err(_e) => {
56                            buf.advance(newline_offset + 1);
57                            // If decode fails, we should still advance the buffer.
58                            //
59                            // In case of error, we want to advance our place in the buffer so that
60                            // we don't attempt to re-parse this bad data again.
61                            return Err(Error::DeserializeJson);
62                        }
63                    }
64                } else {
65                    // No newline, so stop trying and wait until we have more bytes.
66                    return Ok(None);
67                }
68            }
69        }
70    }
71}
72
73// We encode `T` and not `&T` because we do not want to deal with
74// the lifetime issues (this is used in async contexts.)
75impl<D, S> Encoder<S> for JsonLinesCodec<D, S>
76where
77    S: serde::Serialize,
78{
79    type Error = Error;
80    fn encode(&mut self, msg: S, final_buf: &mut BytesMut) -> Result<()> {
81        let mut v = serde_json::to_vec(&msg).map_err(|_| Error::SerializeJson)?;
82        if memchr::memchr(b'\n', &v).is_some() {
83            return Err(Error::NewlineInData);
84        }
85        v.push(b'\n');
86        final_buf.extend_from_slice(&v);
87        Ok(())
88    }
89}
90
#[cfg(test)]
mod tests {
    use super::*;
    use serde::{Deserialize, Serialize};

    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
    struct MyStruct {
        val1: u8,
        val2: u8,
    }

    /// Builds a codec that decodes and encodes `MyStruct`.
    fn new_codec() -> JsonLinesCodec<MyStruct, MyStruct> {
        JsonLinesCodec::default()
    }

    #[test]
    fn roundtrip() -> Result<()> {
        let msg1 = MyStruct { val1: 12, val2: 34 };
        let msg2 = MyStruct { val1: 56, val2: 78 };
        let mut bytes = BytesMut::new();
        let mut codec = JsonLinesCodec::default();
        codec.encode(msg1.clone(), &mut bytes)?;
        codec.encode(msg2.clone(), &mut bytes)?;
        let found1: Option<MyStruct> = codec.decode(&mut bytes)?;
        let found2: Option<MyStruct> = codec.decode(&mut bytes)?;
        assert_eq!(found1, Some(msg1));
        assert_eq!(found2, Some(msg2));
        Ok(())
    }

    #[test]
    fn encode_writes_one_line() -> Result<()> {
        let mut codec = new_codec();
        let mut bytes = BytesMut::new();
        codec.encode(MyStruct { val1: 1, val2: 2 }, &mut bytes)?;
        assert_eq!(&bytes[..], b"{\"val1\":1,\"val2\":2}\n");
        Ok(())
    }

    #[test]
    fn partial_line_waits_for_more_bytes() -> Result<()> {
        let mut codec = new_codec();
        let mut bytes = BytesMut::from("{\"val1\":1,");
        assert_eq!(codec.decode(&mut bytes)?, None);
        // Nothing is consumed until the line is complete.
        assert_eq!(&bytes[..], b"{\"val1\":1,");
        bytes.extend_from_slice(b"\"val2\":2}\n");
        assert_eq!(
            codec.decode(&mut bytes)?,
            Some(MyStruct { val1: 1, val2: 2 })
        );
        assert!(bytes.is_empty());
        Ok(())
    }

    #[test]
    fn empty_lines_are_skipped() -> Result<()> {
        let mut codec = new_codec();
        let mut bytes = BytesMut::from("\n\n{\"val1\":1,\"val2\":2}\n\n");
        assert_eq!(
            codec.decode(&mut bytes)?,
            Some(MyStruct { val1: 1, val2: 2 })
        );
        assert_eq!(codec.decode(&mut bytes)?, None);
        assert!(bytes.is_empty());
        Ok(())
    }

    #[test]
    fn invalid_line_is_consumed() -> Result<()> {
        let mut codec = new_codec();
        let mut bytes = BytesMut::from("not json\n{\"val1\":1,\"val2\":2}\n");
        // The bad line is reported once and then skipped over.
        assert!(codec.decode(&mut bytes).is_err());
        assert_eq!(
            codec.decode(&mut bytes)?,
            Some(MyStruct { val1: 1, val2: 2 })
        );
        Ok(())
    }
}