tremor_codec/codec/
json.rs

1// Copyright 2020-2021, The Tremor Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! The `json` codec supports the JavaScript Object Notation format.
16//!
17//! Specification: [JSON](https://json.org).
18//!
//! Deserialization supports minified and fat JSON. Duplicate keys are not preserved; later duplicate keys overwrite previous ones.
20//!
21//! Serialization supports minified JSON only.
22//!
//! The codec can be configured with a mode, either `sorted` or `unsorted`. The default is `unsorted` as it is significantly faster; `sorted` JSON is only needed in testing situations where the key order in maps matters for comparison.
24
25use crate::prelude::*;
26use simd_json::Buffers;
27use std::marker::PhantomData;
28use tremor_value::utils::sorted_serialize;
29
/// Compile-time marker describing whether a JSON codec flavour is sorted.
///
/// Implementors carry no data; they only provide the [`Sorting::SORTED`] flag
/// that the codec consults when reporting its name and when encoding.
pub trait Sorting: Copy + Clone + Send + Sync + 'static {
    /// `true` for the sorted flavour of the codec, `false` otherwise
    const SORTED: bool;
}
35
36/// Sorted
37#[derive(Clone, Copy, Debug)]
38pub struct Sorted {}
39impl Sorting for Sorted {
40    const SORTED: bool = true;
41}
42
43/// Unsorted
44#[derive(Clone, Copy, Debug)]
45pub struct Unsorted {}
46impl Sorting for Unsorted {
47    const SORTED: bool = false;
48}
49
50/// JSON codec
51pub struct Json<S: Sorting> {
52    _phantom: PhantomData<S>,
53    buffers: Buffers,
54    data_buf: Vec<u8>,
55}
56
57impl<S: Sorting> Clone for Json<S> {
58    fn clone(&self) -> Self {
59        Self::default()
60    }
61}
62
63impl<S: Sorting> Default for Json<S> {
64    fn default() -> Self {
65        Self {
66            _phantom: PhantomData,
67            buffers: Buffers::new(1024),
68            data_buf: Vec::new(),
69        }
70    }
71}
72
73pub(crate) fn from_config(config: Option<&Value>) -> Result<Box<dyn Codec>> {
74    match config.get_str("mode") {
75        Some("sorted") => Ok(Box::<Json<Sorted>>::default()),
76        None | Some("unsorted") => Ok(Box::<Json<Unsorted>>::default()),
77        Some(mode) => Err(format!(
78            "Unknown json codec mode: {mode}, can only be one of `sorted` or `unsorted`",
79        )
80        .into()),
81    }
82}
83#[async_trait::async_trait]
84impl<S: Sorting> Codec for Json<S> {
85    fn name(&self) -> &str {
86        if S::SORTED {
87            "sorted-json"
88        } else {
89            "json"
90        }
91    }
92
93    fn mime_types(&self) -> Vec<&'static str> {
94        vec!["application/json"]
95        // TODO: application/json-seq for one json doc per line?
96    }
97
98    async fn decode<'input>(
99        &mut self,
100        data: &'input mut [u8],
101        _ingest_ns: u64,
102        meta: Value<'input>,
103    ) -> Result<Option<(Value<'input>, Value<'input>)>> {
104        tremor_value::parse_to_value_with_buffers(data, &mut self.buffers)
105            .map(|v| Some((v, meta)))
106            .map_err(Error::from)
107    }
108    async fn encode(&mut self, data: &Value, _meta: &Value) -> Result<Vec<u8>> {
109        if S::SORTED {
110            Ok(sorted_serialize(data)?)
111        } else {
112            data.write(&mut self.data_buf)?;
113            let v = self.data_buf.clone();
114            self.data_buf.clear();
115            Ok(v)
116        }
117    }
118
119    fn boxed_clone(&self) -> Box<dyn Codec> {
120        Box::new(self.clone())
121    }
122}
123
#[cfg(test)]
mod test {
    use super::*;
    use tremor_value::literal;

    /// Decoding works with a codec built around `Buffers::default()` and
    /// with a clone of that codec.
    #[tokio::test(flavor = "multi_thread")]
    async fn decode() -> Result<()> {
        let mut codec: Json<Unsorted> = Json {
            buffers: Buffers::default(),
            ..Default::default()
        };
        let expected = literal!({ "snot": "badger" });

        let mut data = br#"{ "snot": "badger" }"#.to_vec();
        let output = codec
            .decode(&mut data, 42, Value::object())
            .await?
            .expect("no data");
        assert_eq!(output.0, expected);

        let mut codec = codec.clone();

        let mut data = br#"{ "snot": "badger" }"#.to_vec();
        let output = codec
            .decode(&mut data, 42, Value::object())
            .await?
            .expect("no data");
        assert_eq!(output.0, expected);

        Ok(())
    }

    /// Encoding and then decoding with the unsorted codec yields the seed value.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_json_codec() -> Result<()> {
        let seed = literal!({ "snot": "badger" });

        let mut codec = Json::<Unsorted>::default();

        let mut as_raw = codec.encode(&seed, &Value::const_null()).await?;
        let (decoded, _meta) = codec
            .decode(as_raw.as_mut_slice(), 0, Value::object())
            .await?
            .expect("no data");
        // Compare the payload itself; checking only for `Some` would pass
        // for any successfully parsed document.
        assert_eq!(decoded, seed);

        Ok(())
    }

    /// Encoding and then decoding with the sorted codec yields the seed value.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_json_codec_sorted() -> Result<()> {
        let seed = literal!({ "snot": "badger" });

        let mut codec = Json::<Sorted>::default();

        let mut as_raw = codec.encode(&seed, &Value::const_null()).await?;
        let (decoded, _meta) = codec
            .decode(as_raw.as_mut_slice(), 0, Value::object())
            .await?
            .expect("no data");
        // Compare the payload itself; checking only for `Some` would pass
        // for any successfully parsed document.
        assert_eq!(decoded, seed);

        Ok(())
    }

    /// Duplicate keys collapse to the last value when decoding with the
    /// unsorted codec, and the result serializes to a single key.
    #[tokio::test(flavor = "multi_thread")]
    async fn duplicate_keys_unsorted() -> Result<()> {
        let mut input = r#"{"key": 1, "key":2}"#.as_bytes().to_vec();
        let mut codec = Json::<Unsorted>::default();
        let res = codec
            .decode(input.as_mut_slice(), 0, Value::object())
            .await?
            .expect("no data");
        assert_eq!(literal!({"key": 2}), res.0); // duplicate keys are deduplicated with last-key-wins strategy

        let serialized = codec.encode(&res.0, &Value::const_null()).await?;
        assert_eq!(r#"{"key":2}"#.as_bytes(), serialized.as_slice());

        Ok(())
    }

    /// Duplicate keys collapse to the last value when decoding with the
    /// sorted codec, and the result serializes to a single key.
    #[tokio::test(flavor = "multi_thread")]
    async fn duplicate_keys_sorted() -> Result<()> {
        let mut input = r#"{"key": 1, "key":2}"#.as_bytes().to_vec();
        let mut codec = Json::<Sorted>::default();
        let res = codec
            .decode(input.as_mut_slice(), 0, Value::object())
            .await?
            .expect("no data");
        assert_eq!(literal!({"key": 2}), res.0); // duplicate keys are deduplicated with last-key-wins strategy
        let serialized = codec.encode(&res.0, &Value::const_null()).await?;
        assert_eq!(r#"{"key":2}"#.as_bytes(), serialized.as_slice());

        Ok(())
    }

    /// Same duplicate-key scenario as `duplicate_keys_unsorted`.
    // NOTE(review): despite its name this test never converts the decoded
    // value into a static one; its body matches `duplicate_keys_unsorted`.
    // Confirm whether a static conversion was meant to be exercised here.
    #[tokio::test(flavor = "multi_thread")]
    async fn duplicate_keys_into_static() -> Result<()> {
        let mut input = r#"{"key": 1, "key":2}"#.as_bytes().to_vec();
        let mut codec = Json::<Unsorted>::default();
        let res = codec
            .decode(input.as_mut_slice(), 0, Value::object())
            .await?
            .expect("no data");
        assert_eq!(literal!({"key": 2}), res.0); // duplicate keys are deduplicated with last-key-wins strategy
        let serialized = codec.encode(&res.0, &Value::const_null()).await?;
        assert_eq!(r#"{"key":2}"#.as_bytes(), serialized.as_slice());

        Ok(())
    }
}