tremor_codec/codec/
json.rs1use crate::prelude::*;
26use simd_json::Buffers;
27use std::marker::PhantomData;
28use tremor_value::utils::sorted_serialize;
29
/// Type-level flag carried by the JSON codec's type parameter.
///
/// Implementors expose a single associated constant that the codec reads to
/// pick its name and its serialization routine.
pub trait Sorting: Copy + Clone + Send + Sync + 'static {
    /// `true` selects the sorted flavour of the codec, `false` the unsorted one.
    const SORTED: bool;
}
35
36#[derive(Clone, Copy, Debug)]
38pub struct Sorted {}
39impl Sorting for Sorted {
40 const SORTED: bool = true;
41}
42
43#[derive(Clone, Copy, Debug)]
45pub struct Unsorted {}
46impl Sorting for Unsorted {
47 const SORTED: bool = false;
48}
49
50pub struct Json<S: Sorting> {
52 _phantom: PhantomData<S>,
53 buffers: Buffers,
54 data_buf: Vec<u8>,
55}
56
57impl<S: Sorting> Clone for Json<S> {
58 fn clone(&self) -> Self {
59 Self::default()
60 }
61}
62
63impl<S: Sorting> Default for Json<S> {
64 fn default() -> Self {
65 Self {
66 _phantom: PhantomData,
67 buffers: Buffers::new(1024),
68 data_buf: Vec::new(),
69 }
70 }
71}
72
73pub(crate) fn from_config(config: Option<&Value>) -> Result<Box<dyn Codec>> {
74 match config.get_str("mode") {
75 Some("sorted") => Ok(Box::<Json<Sorted>>::default()),
76 None | Some("unsorted") => Ok(Box::<Json<Unsorted>>::default()),
77 Some(mode) => Err(format!(
78 "Unknown json codec mode: {mode}, can only be one of `sorted` or `unsorted`",
79 )
80 .into()),
81 }
82}
83#[async_trait::async_trait]
84impl<S: Sorting> Codec for Json<S> {
85 fn name(&self) -> &str {
86 if S::SORTED {
87 "sorted-json"
88 } else {
89 "json"
90 }
91 }
92
93 fn mime_types(&self) -> Vec<&'static str> {
94 vec!["application/json"]
95 }
97
98 async fn decode<'input>(
99 &mut self,
100 data: &'input mut [u8],
101 _ingest_ns: u64,
102 meta: Value<'input>,
103 ) -> Result<Option<(Value<'input>, Value<'input>)>> {
104 tremor_value::parse_to_value_with_buffers(data, &mut self.buffers)
105 .map(|v| Some((v, meta)))
106 .map_err(Error::from)
107 }
108 async fn encode(&mut self, data: &Value, _meta: &Value) -> Result<Vec<u8>> {
109 if S::SORTED {
110 Ok(sorted_serialize(data)?)
111 } else {
112 data.write(&mut self.data_buf)?;
113 let v = self.data_buf.clone();
114 self.data_buf.clear();
115 Ok(v)
116 }
117 }
118
119 fn boxed_clone(&self) -> Box<dyn Codec> {
120 Box::new(self.clone())
121 }
122}
123
#[cfg(test)]
mod test {
    use super::*;
    use tremor_value::literal;

    /// Decoding works both on a codec built with default buffers and on a
    /// clone of that codec.
    #[tokio::test(flavor = "multi_thread")]
    async fn decode() -> Result<()> {
        let mut codec: Json<Unsorted> = Json {
            buffers: Buffers::default(),
            ..Default::default()
        };
        let expected = literal!({ "snot": "badger" });

        let mut data = br#"{ "snot": "badger" }"#.to_vec();
        let output = codec
            .decode(&mut data, 42, Value::object())
            .await?
            .expect("no data");
        assert_eq!(output.0, expected);

        let mut codec = codec.clone();

        let mut data = br#"{ "snot": "badger" }"#.to_vec();
        let output = codec
            .decode(&mut data, 42, Value::object())
            .await?
            .expect("no data");
        assert_eq!(output.0, expected);

        Ok(())
    }

    /// Output of the unsorted encoder can be decoded again.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_json_codec() -> Result<()> {
        let seed = literal!({ "snot": "badger" });

        let mut codec = Json::<Unsorted>::default();

        let mut as_raw = codec.encode(&seed, &Value::const_null()).await?;
        assert!(codec
            .decode(as_raw.as_mut_slice(), 0, Value::object())
            .await?
            .is_some());

        Ok(())
    }

    /// Output of the sorted encoder can be decoded again.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_json_codec_sorted() -> Result<()> {
        let seed = literal!({ "snot": "badger" });

        let mut codec = Json::<Sorted>::default();

        let mut as_raw = codec.encode(&seed, &Value::const_null()).await?;
        assert!(codec
            .decode(as_raw.as_mut_slice(), 0, Value::object())
            .await?
            .is_some());

        Ok(())
    }

    /// Duplicate keys collapse to the last occurrence (unsorted codec).
    #[tokio::test(flavor = "multi_thread")]
    async fn duplicate_keys_unsorted() -> Result<()> {
        let mut input = r#"{"key": 1, "key":2}"#.as_bytes().to_vec();
        let mut codec = Json::<Unsorted>::default();
        let res = codec
            .decode(input.as_mut_slice(), 0, Value::object())
            .await?
            .expect("no data");
        // The later `"key"` entry wins.
        assert_eq!(literal!({"key": 2}), res.0);
        let serialized = codec.encode(&res.0, &Value::const_null()).await?;
        assert_eq!(r#"{"key":2}"#.as_bytes(), serialized.as_slice());

        Ok(())
    }

    /// Duplicate keys collapse to the last occurrence (sorted codec).
    #[tokio::test(flavor = "multi_thread")]
    async fn duplicate_keys_sorted() -> Result<()> {
        let mut input = r#"{"key": 1, "key":2}"#.as_bytes().to_vec();
        let mut codec = Json::<Sorted>::default();
        let res = codec
            .decode(input.as_mut_slice(), 0, Value::object())
            .await?
            .expect("no data");
        // The later `"key"` entry wins.
        assert_eq!(literal!({"key": 2}), res.0);
        let serialized = codec.encode(&res.0, &Value::const_null()).await?;
        assert_eq!(r#"{"key":2}"#.as_bytes(), serialized.as_slice());

        Ok(())
    }

    /// Duplicate keys stay collapsed after the decoded value has been
    /// converted with `into_static`.
    #[tokio::test(flavor = "multi_thread")]
    async fn duplicate_keys_into_static() -> Result<()> {
        let mut input = r#"{"key": 1, "key":2}"#.as_bytes().to_vec();
        let mut codec = Json::<Unsorted>::default();
        let res = codec
            .decode(input.as_mut_slice(), 0, Value::object())
            .await?
            .expect("no data");
        // Previously this test never called `into_static` and merely repeated
        // `duplicate_keys_unsorted`; convert the value so the name holds.
        let owned: Value<'static> = res.0.into_static();
        assert_eq!(literal!({"key": 2}), owned);
        let serialized = codec.encode(&owned, &Value::const_null()).await?;
        assert_eq!(r#"{"key":2}"#.as_bytes(), serialized.as_slice());

        Ok(())
    }
}