Skip to main content

destream_json/en/
mod.rs

1//! Encode a Rust data structure into JSON data.
2
3use std::collections::VecDeque;
4use std::fmt;
5use std::mem;
6use std::pin::Pin;
7
8use bytes::{BufMut, Bytes, BytesMut};
9use destream::en::{self, IntoStream};
10use futures::future;
11use futures::stream::{Stream, StreamExt};
12use uuid::Uuid;
13
14use crate::constants::*;
15
16mod stream;
17
18/// A [`Stream`] of JSON-encoded data
19pub type JSONStream<'en> = Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send + Unpin + 'en>>;
20
21/// An error encountered while encoding a stream.
22pub struct Error {
23    message: String,
24}
25
26impl std::error::Error for Error {}
27
28impl en::Error for Error {
29    fn custom<I: fmt::Display>(info: I) -> Self {
30        let message = info.to_string();
31        Self { message }
32    }
33}
34
35impl fmt::Debug for Error {
36    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
37        fmt::Display::fmt(self, f)
38    }
39}
40
41impl fmt::Display for Error {
42    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
43        f.write_str(&self.message)
44    }
45}
46
47struct MapEncoder<'en> {
48    pending_key: Option<JSONStream<'en>>,
49    entries: VecDeque<(JSONStream<'en>, JSONStream<'en>)>,
50}
51
52impl<'en> MapEncoder<'en> {
53    #[inline]
54    fn new(size_hint: Option<usize>) -> Self {
55        let entries = if let Some(len) = size_hint {
56            VecDeque::with_capacity(len)
57        } else {
58            VecDeque::new()
59        };
60
61        Self {
62            pending_key: None,
63            entries,
64        }
65    }
66}
67
68impl<'en> en::EncodeMap<'en> for MapEncoder<'en> {
69    type Ok = JSONStream<'en>;
70    type Error = Error;
71
72    #[inline]
73    fn encode_key<T: IntoStream<'en> + 'en>(&mut self, key: T) -> Result<(), Self::Error> {
74        if self.pending_key.is_none() {
75            self.pending_key = Some(key.into_stream(Encoder)?);
76            Ok(())
77        } else {
78            Err(en::Error::custom(
79                "You must call encode_value before calling encode_key again",
80            ))
81        }
82    }
83
84    #[inline]
85    fn encode_value<T: IntoStream<'en> + 'en>(&mut self, value: T) -> Result<(), Self::Error> {
86        if self.pending_key.is_none() {
87            return Err(en::Error::custom(
88                "You must call encode_key before encode_value",
89            ));
90        }
91
92        let value = value.into_stream(Encoder)?;
93
94        let mut key = None;
95        mem::swap(&mut self.pending_key, &mut key);
96
97        self.entries.push_back((key.unwrap(), value));
98        Ok(())
99    }
100
101    fn end(mut self) -> Result<Self::Ok, Self::Error> {
102        if self.pending_key.is_some() {
103            return Err(en::Error::custom(
104                "You must call encode_value after calling encode_key",
105            ));
106        }
107
108        let mut encoded = delimiter(MAP_BEGIN);
109
110        while let Some((key, value)) = self.entries.pop_front() {
111            encoded = Box::pin(encoded.chain(key).chain(delimiter(COLON)).chain(value));
112
113            if !self.entries.is_empty() {
114                encoded = Box::pin(encoded.chain(delimiter(COMMA)));
115            }
116        }
117
118        encoded = Box::pin(encoded.chain(delimiter(MAP_END)));
119        Ok(encoded)
120    }
121}
122
123struct SequenceEncoder<'en> {
124    items: VecDeque<JSONStream<'en>>,
125}
126
127impl<'en> SequenceEncoder<'en> {
128    #[inline]
129    fn new(size_hint: Option<usize>) -> Self {
130        let items = if let Some(len) = size_hint {
131            VecDeque::with_capacity(len)
132        } else {
133            VecDeque::new()
134        };
135
136        Self { items }
137    }
138
139    #[inline]
140    fn push(&mut self, value: JSONStream<'en>) {
141        self.items.push_back(value);
142    }
143
144    fn encode(mut self) -> Result<JSONStream<'en>, Error> {
145        let mut encoded = delimiter(LIST_BEGIN);
146
147        while let Some(item) = self.items.pop_front() {
148            encoded = Box::pin(encoded.chain(item));
149
150            if !self.items.is_empty() {
151                encoded = Box::pin(encoded.chain(delimiter(COMMA)));
152            }
153        }
154
155        encoded = Box::pin(encoded.chain(delimiter(LIST_END)));
156        Ok(encoded)
157    }
158}
159
160impl<'en> en::EncodeSeq<'en> for SequenceEncoder<'en> {
161    type Ok = JSONStream<'en>;
162    type Error = Error;
163
164    #[inline]
165    fn encode_element<T: IntoStream<'en> + 'en>(&mut self, value: T) -> Result<(), Self::Error> {
166        let encoded = value.into_stream(Encoder)?;
167        self.push(encoded);
168        Ok(())
169    }
170
171    fn end(self) -> Result<Self::Ok, Self::Error> {
172        self.encode()
173    }
174}
175
176impl<'en> en::EncodeTuple<'en> for SequenceEncoder<'en> {
177    type Ok = JSONStream<'en>;
178    type Error = Error;
179
180    #[inline]
181    fn encode_element<T: IntoStream<'en> + 'en>(&mut self, value: T) -> Result<(), Self::Error> {
182        let encoded = value.into_stream(Encoder)?;
183        self.push(encoded);
184        Ok(())
185    }
186
187    fn end(self) -> Result<Self::Ok, Self::Error> {
188        self.encode()
189    }
190}
191
192struct Encoder;
193
194impl Encoder {
195    fn encode_array<
196        'en,
197        E: IntoStream<'en> + 'en,
198        T: IntoIterator<Item = E> + Send + Unpin + 'en,
199        S: Stream<Item = T> + Send + Unpin + 'en,
200    >(
201        self,
202        chunks: S,
203    ) -> Result<JSONStream<'en>, Error>
204    where
205        <T as IntoIterator>::IntoIter: Send + Unpin + 'en,
206    {
207        let sequence = chunks.map(futures::stream::iter).flatten();
208
209        en::Encoder::encode_seq_stream(self, sequence)
210    }
211}
212
213impl<'en> en::Encoder<'en> for Encoder {
214    type Ok = JSONStream<'en>;
215    type Error = Error;
216    type EncodeMap = MapEncoder<'en>;
217    type EncodeSeq = SequenceEncoder<'en>;
218    type EncodeTuple = SequenceEncoder<'en>;
219
220    #[inline]
221    fn encode_bool(self, v: bool) -> Result<Self::Ok, Self::Error> {
222        Ok(encode_fmt(v))
223    }
224
225    #[inline]
226    fn encode_bytes<B: Into<Bytes>>(self, v: B) -> Result<Self::Ok, Self::Error> {
227        use base64::engine::general_purpose::STANDARD;
228        use base64::engine::Engine;
229
230        self.encode_str(&STANDARD.encode(v.into()))
231    }
232
233    #[inline]
234    fn encode_i8(self, v: i8) -> Result<Self::Ok, Self::Error> {
235        Ok(encode_fmt(v))
236    }
237
238    #[inline]
239    fn encode_i16(self, v: i16) -> Result<Self::Ok, Self::Error> {
240        Ok(encode_fmt(v))
241    }
242
243    #[inline]
244    fn encode_i32(self, v: i32) -> Result<Self::Ok, Self::Error> {
245        Ok(encode_fmt(v))
246    }
247
248    #[inline]
249    fn encode_i64(self, v: i64) -> Result<Self::Ok, Self::Error> {
250        Ok(encode_fmt(v))
251    }
252
253    #[inline]
254    fn encode_u8(self, v: u8) -> Result<Self::Ok, Self::Error> {
255        Ok(encode_fmt(v))
256    }
257
258    #[inline]
259    fn encode_u16(self, v: u16) -> Result<Self::Ok, Self::Error> {
260        Ok(encode_fmt(v))
261    }
262
263    #[inline]
264    fn encode_u32(self, v: u32) -> Result<Self::Ok, Self::Error> {
265        Ok(encode_fmt(v))
266    }
267
268    #[inline]
269    fn encode_u64(self, v: u64) -> Result<Self::Ok, Self::Error> {
270        Ok(encode_fmt(v))
271    }
272
273    #[inline]
274    fn encode_f32(self, v: f32) -> Result<Self::Ok, Self::Error> {
275        if v.is_nan() || v.is_infinite() {
276            Err(en::Error::custom(format!(
277                "JSON encoding does not support floating-point value {}",
278                v
279            )))
280        } else {
281            Ok(encode_fmt(v))
282        }
283    }
284
285    #[inline]
286    fn encode_f64(self, v: f64) -> Result<Self::Ok, Self::Error> {
287        if v.is_nan() || v.is_infinite() {
288            Err(en::Error::custom(format!(
289                "JSON encoding does not support floating-point value {}",
290                v
291            )))
292        } else {
293            Ok(encode_fmt(v))
294        }
295    }
296
297    fn encode_array_bool<
298        T: IntoIterator<Item = bool> + Send + Unpin + 'en,
299        S: Stream<Item = T> + Send + Unpin + 'en,
300    >(
301        self,
302        chunks: S,
303    ) -> Result<Self::Ok, Self::Error>
304    where
305        <T as IntoIterator>::IntoIter: Send + Unpin + 'en,
306    {
307        self.encode_array(chunks)
308    }
309
310    fn encode_array_i8<
311        T: IntoIterator<Item = i8> + Send + Unpin + 'en,
312        S: Stream<Item = T> + Send + Unpin + 'en,
313    >(
314        self,
315        chunks: S,
316    ) -> Result<Self::Ok, Self::Error>
317    where
318        <T as IntoIterator>::IntoIter: Send + Unpin + 'en,
319    {
320        self.encode_array(chunks)
321    }
322
323    fn encode_array_i16<
324        T: IntoIterator<Item = i16> + Send + Unpin + 'en,
325        S: Stream<Item = T> + Send + Unpin + 'en,
326    >(
327        self,
328        chunks: S,
329    ) -> Result<Self::Ok, Self::Error>
330    where
331        <T as IntoIterator>::IntoIter: Send + Unpin + 'en,
332    {
333        self.encode_array(chunks)
334    }
335
336    fn encode_array_i32<
337        T: IntoIterator<Item = i32> + Send + Unpin + 'en,
338        S: Stream<Item = T> + Send + Unpin + 'en,
339    >(
340        self,
341        chunks: S,
342    ) -> Result<Self::Ok, Self::Error>
343    where
344        <T as IntoIterator>::IntoIter: Send + Unpin + 'en,
345    {
346        self.encode_array(chunks)
347    }
348
349    fn encode_array_i64<
350        T: IntoIterator<Item = i64> + Send + Unpin + 'en,
351        S: Stream<Item = T> + Send + Unpin + 'en,
352    >(
353        self,
354        chunks: S,
355    ) -> Result<Self::Ok, Self::Error>
356    where
357        <T as IntoIterator>::IntoIter: Send + Unpin + 'en,
358    {
359        self.encode_array(chunks)
360    }
361
362    fn encode_array_u8<
363        T: IntoIterator<Item = u8> + Send + Unpin + 'en,
364        S: Stream<Item = T> + Send + Unpin + 'en,
365    >(
366        self,
367        chunks: S,
368    ) -> Result<Self::Ok, Self::Error>
369    where
370        <T as IntoIterator>::IntoIter: Send + Unpin + 'en,
371    {
372        self.encode_array(chunks)
373    }
374
375    fn encode_array_u16<
376        T: IntoIterator<Item = u16> + Send + Unpin + 'en,
377        S: Stream<Item = T> + Send + Unpin + 'en,
378    >(
379        self,
380        chunks: S,
381    ) -> Result<Self::Ok, Self::Error>
382    where
383        <T as IntoIterator>::IntoIter: Send + Unpin + 'en,
384    {
385        self.encode_array(chunks)
386    }
387
388    fn encode_array_u32<
389        T: IntoIterator<Item = u32> + Send + Unpin + 'en,
390        S: Stream<Item = T> + Send + Unpin + 'en,
391    >(
392        self,
393        chunks: S,
394    ) -> Result<Self::Ok, Self::Error>
395    where
396        <T as IntoIterator>::IntoIter: Send + Unpin + 'en,
397    {
398        self.encode_array(chunks)
399    }
400
401    fn encode_array_u64<
402        T: IntoIterator<Item = u64> + Send + Unpin + 'en,
403        S: Stream<Item = T> + Send + Unpin + 'en,
404    >(
405        self,
406        chunks: S,
407    ) -> Result<Self::Ok, Self::Error>
408    where
409        <T as IntoIterator>::IntoIter: Send + Unpin + 'en,
410    {
411        self.encode_array(chunks)
412    }
413
414    fn encode_array_f32<
415        T: IntoIterator<Item = f32> + Send + Unpin + 'en,
416        S: Stream<Item = T> + Send + Unpin + 'en,
417    >(
418        self,
419        chunks: S,
420    ) -> Result<Self::Ok, Self::Error>
421    where
422        <T as IntoIterator>::IntoIter: Send + Unpin + 'en,
423    {
424        self.encode_array(chunks)
425    }
426
427    fn encode_array_f64<
428        T: IntoIterator<Item = f64> + Send + Unpin + 'en,
429        S: Stream<Item = T> + Send + Unpin + 'en,
430    >(
431        self,
432        chunks: S,
433    ) -> Result<Self::Ok, Self::Error>
434    where
435        <T as IntoIterator>::IntoIter: Send + Unpin + 'en,
436    {
437        self.encode_array(chunks)
438    }
439
440    #[inline]
441    fn encode_str(self, v: &str) -> Result<Self::Ok, Self::Error> {
442        let mut chunk = BytesMut::with_capacity(v.as_bytes().len() + 2);
443        chunk.extend_from_slice(QUOTE);
444        chunk.extend(escape(v));
445        chunk.extend_from_slice(QUOTE);
446        Ok(Box::pin(futures::stream::once(future::ready(Ok(
447            chunk.into()
448        )))))
449    }
450
451    #[inline]
452    fn encode_none(self) -> Result<Self::Ok, Self::Error> {
453        Ok(encode_fmt("null"))
454    }
455
456    #[inline]
457    fn encode_some<T: IntoStream<'en> + 'en>(self, value: T) -> Result<Self::Ok, Self::Error> {
458        value.into_stream(self)
459    }
460
461    #[inline]
462    fn encode_unit(self) -> Result<Self::Ok, Self::Error> {
463        Ok(encode_fmt("null"))
464    }
465
466    #[inline]
467    fn encode_map(self, size_hint: Option<usize>) -> Result<Self::EncodeMap, Self::Error> {
468        Ok(MapEncoder::new(size_hint))
469    }
470
471    #[inline]
472    fn encode_map_stream<
473        K: IntoStream<'en> + 'en,
474        V: IntoStream<'en> + 'en,
475        S: Stream<Item = (K, V)> + Send + Unpin + 'en,
476    >(
477        self,
478        map: S,
479    ) -> Result<Self::Ok, Self::Error> {
480        Ok(Box::pin(stream::encode_map(map)))
481    }
482
483    #[inline]
484    fn encode_seq(self, size_hint: Option<usize>) -> Result<Self::EncodeSeq, Self::Error> {
485        Ok(SequenceEncoder::new(size_hint))
486    }
487
488    #[inline]
489    fn encode_seq_stream<T: IntoStream<'en> + 'en, S: Stream<Item = T> + Send + Unpin + 'en>(
490        self,
491        seq: S,
492    ) -> Result<Self::Ok, Self::Error> {
493        Ok(Box::pin(stream::encode_list(seq)))
494    }
495
496    #[inline]
497    fn encode_tuple(self, len: usize) -> Result<Self::EncodeTuple, Self::Error> {
498        Ok(SequenceEncoder::new(Some(len)))
499    }
500
501    #[inline]
502    fn encode_uuid(self, uuid: Uuid) -> Result<Self::Ok, Self::Error> {
503        self.encode_str(&uuid.to_string())
504    }
505
506    #[inline]
507    fn collect_bytes<B: IntoIterator<Item = u8>>(self, bytes: B) -> Result<Self::Ok, Self::Error> {
508        self.encode_bytes(bytes.into_iter().collect::<Vec<u8>>())
509    }
510}
511
512#[inline]
513fn escape<T: fmt::Display>(value: T) -> Bytes {
514    let as_str = value.to_string();
515    let mut encoded = BytesMut::with_capacity(as_str.len());
516    for byte in as_str.as_bytes() {
517        let as_slice = std::slice::from_ref(byte);
518        if as_slice == QUOTE || as_slice == ESCAPE {
519            encoded.extend_from_slice(ESCAPE);
520        }
521
522        encoded.put_u8(*byte);
523    }
524
525    encoded.into()
526}
527
528#[inline]
529fn encode_fmt<'en, T: fmt::Display>(value: T) -> JSONStream<'en> {
530    let encoded = escape(value);
531    Box::pin(futures::stream::once(future::ready(Ok(encoded))))
532}
533
534#[inline]
535fn delimiter<'en>(delimiter: &'static [u8]) -> JSONStream<'en> {
536    let encoded = futures::stream::once(future::ready(Ok(Bytes::from_static(delimiter))));
537    Box::pin(encoded)
538}
539
540/// Given an encodable value, return an encoded stream.
541pub fn encode<'en, T: IntoStream<'en> + 'en>(
542    value: T,
543) -> Result<impl Stream<Item = Result<Bytes, Error>> + Send + Unpin + 'en, Error> {
544    value.into_stream(Encoder)
545}
546
547/// Given a stream of encodable key-value pairs, return a streaming JSON object.
548pub fn encode_map<
549    'en,
550    K: IntoStream<'en> + 'en,
551    V: IntoStream<'en> + 'en,
552    S: Stream<Item = (K, V)> + Send + Unpin + 'en,
553>(
554    seq: S,
555) -> impl Stream<Item = Result<Bytes, Error>> + Send + Unpin + 'en {
556    stream::encode_map(seq)
557}
558
559/// Given a stream of encodable elements, return a streaming JSON list.
560pub fn encode_seq<'en, T: IntoStream<'en> + 'en, S: Stream<Item = T> + Send + Unpin + 'en>(
561    seq: S,
562) -> impl Stream<Item = Result<Bytes, Error>> + Send + Unpin + 'en {
563    stream::encode_list(seq)
564}