1use std::collections::VecDeque;
4use std::fmt;
5use std::mem;
6use std::pin::Pin;
7
8use bytes::{BufMut, Bytes, BytesMut};
9use destream::en::{self, IntoStream};
10use futures::future;
11use futures::stream::{Stream, StreamExt};
12use uuid::Uuid;
13
14use crate::constants::*;
15
16mod stream;
17
18pub type JSONStream<'en> = Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send + Unpin + 'en>>;
20
21pub struct Error {
23 message: String,
24}
25
26impl std::error::Error for Error {}
27
28impl en::Error for Error {
29 fn custom<I: fmt::Display>(info: I) -> Self {
30 let message = info.to_string();
31 Self { message }
32 }
33}
34
35impl fmt::Debug for Error {
36 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
37 fmt::Display::fmt(self, f)
38 }
39}
40
41impl fmt::Display for Error {
42 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
43 f.write_str(&self.message)
44 }
45}
46
47struct MapEncoder<'en> {
48 pending_key: Option<JSONStream<'en>>,
49 entries: VecDeque<(JSONStream<'en>, JSONStream<'en>)>,
50}
51
52impl<'en> MapEncoder<'en> {
53 #[inline]
54 fn new(size_hint: Option<usize>) -> Self {
55 let entries = if let Some(len) = size_hint {
56 VecDeque::with_capacity(len)
57 } else {
58 VecDeque::new()
59 };
60
61 Self {
62 pending_key: None,
63 entries,
64 }
65 }
66}
67
68impl<'en> en::EncodeMap<'en> for MapEncoder<'en> {
69 type Ok = JSONStream<'en>;
70 type Error = Error;
71
72 #[inline]
73 fn encode_key<T: IntoStream<'en> + 'en>(&mut self, key: T) -> Result<(), Self::Error> {
74 if self.pending_key.is_none() {
75 self.pending_key = Some(key.into_stream(Encoder)?);
76 Ok(())
77 } else {
78 Err(en::Error::custom(
79 "You must call encode_value before calling encode_key again",
80 ))
81 }
82 }
83
84 #[inline]
85 fn encode_value<T: IntoStream<'en> + 'en>(&mut self, value: T) -> Result<(), Self::Error> {
86 if self.pending_key.is_none() {
87 return Err(en::Error::custom(
88 "You must call encode_key before encode_value",
89 ));
90 }
91
92 let value = value.into_stream(Encoder)?;
93
94 let mut key = None;
95 mem::swap(&mut self.pending_key, &mut key);
96
97 self.entries.push_back((key.unwrap(), value));
98 Ok(())
99 }
100
101 fn end(mut self) -> Result<Self::Ok, Self::Error> {
102 if self.pending_key.is_some() {
103 return Err(en::Error::custom(
104 "You must call encode_value after calling encode_key",
105 ));
106 }
107
108 let mut encoded = delimiter(MAP_BEGIN);
109
110 while let Some((key, value)) = self.entries.pop_front() {
111 encoded = Box::pin(encoded.chain(key).chain(delimiter(COLON)).chain(value));
112
113 if !self.entries.is_empty() {
114 encoded = Box::pin(encoded.chain(delimiter(COMMA)));
115 }
116 }
117
118 encoded = Box::pin(encoded.chain(delimiter(MAP_END)));
119 Ok(encoded)
120 }
121}
122
123struct SequenceEncoder<'en> {
124 items: VecDeque<JSONStream<'en>>,
125}
126
127impl<'en> SequenceEncoder<'en> {
128 #[inline]
129 fn new(size_hint: Option<usize>) -> Self {
130 let items = if let Some(len) = size_hint {
131 VecDeque::with_capacity(len)
132 } else {
133 VecDeque::new()
134 };
135
136 Self { items }
137 }
138
139 #[inline]
140 fn push(&mut self, value: JSONStream<'en>) {
141 self.items.push_back(value);
142 }
143
144 fn encode(mut self) -> Result<JSONStream<'en>, Error> {
145 let mut encoded = delimiter(LIST_BEGIN);
146
147 while let Some(item) = self.items.pop_front() {
148 encoded = Box::pin(encoded.chain(item));
149
150 if !self.items.is_empty() {
151 encoded = Box::pin(encoded.chain(delimiter(COMMA)));
152 }
153 }
154
155 encoded = Box::pin(encoded.chain(delimiter(LIST_END)));
156 Ok(encoded)
157 }
158}
159
160impl<'en> en::EncodeSeq<'en> for SequenceEncoder<'en> {
161 type Ok = JSONStream<'en>;
162 type Error = Error;
163
164 #[inline]
165 fn encode_element<T: IntoStream<'en> + 'en>(&mut self, value: T) -> Result<(), Self::Error> {
166 let encoded = value.into_stream(Encoder)?;
167 self.push(encoded);
168 Ok(())
169 }
170
171 fn end(self) -> Result<Self::Ok, Self::Error> {
172 self.encode()
173 }
174}
175
176impl<'en> en::EncodeTuple<'en> for SequenceEncoder<'en> {
177 type Ok = JSONStream<'en>;
178 type Error = Error;
179
180 #[inline]
181 fn encode_element<T: IntoStream<'en> + 'en>(&mut self, value: T) -> Result<(), Self::Error> {
182 let encoded = value.into_stream(Encoder)?;
183 self.push(encoded);
184 Ok(())
185 }
186
187 fn end(self) -> Result<Self::Ok, Self::Error> {
188 self.encode()
189 }
190}
191
192struct Encoder;
193
194impl Encoder {
195 fn encode_array<
196 'en,
197 E: IntoStream<'en> + 'en,
198 T: IntoIterator<Item = E> + Send + Unpin + 'en,
199 S: Stream<Item = T> + Send + Unpin + 'en,
200 >(
201 self,
202 chunks: S,
203 ) -> Result<JSONStream<'en>, Error>
204 where
205 <T as IntoIterator>::IntoIter: Send + Unpin + 'en,
206 {
207 let sequence = chunks.map(futures::stream::iter).flatten();
208
209 en::Encoder::encode_seq_stream(self, sequence)
210 }
211}
212
213impl<'en> en::Encoder<'en> for Encoder {
214 type Ok = JSONStream<'en>;
215 type Error = Error;
216 type EncodeMap = MapEncoder<'en>;
217 type EncodeSeq = SequenceEncoder<'en>;
218 type EncodeTuple = SequenceEncoder<'en>;
219
220 #[inline]
221 fn encode_bool(self, v: bool) -> Result<Self::Ok, Self::Error> {
222 Ok(encode_fmt(v))
223 }
224
225 #[inline]
226 fn encode_bytes<B: Into<Bytes>>(self, v: B) -> Result<Self::Ok, Self::Error> {
227 use base64::engine::general_purpose::STANDARD;
228 use base64::engine::Engine;
229
230 self.encode_str(&STANDARD.encode(v.into()))
231 }
232
233 #[inline]
234 fn encode_i8(self, v: i8) -> Result<Self::Ok, Self::Error> {
235 Ok(encode_fmt(v))
236 }
237
238 #[inline]
239 fn encode_i16(self, v: i16) -> Result<Self::Ok, Self::Error> {
240 Ok(encode_fmt(v))
241 }
242
243 #[inline]
244 fn encode_i32(self, v: i32) -> Result<Self::Ok, Self::Error> {
245 Ok(encode_fmt(v))
246 }
247
248 #[inline]
249 fn encode_i64(self, v: i64) -> Result<Self::Ok, Self::Error> {
250 Ok(encode_fmt(v))
251 }
252
253 #[inline]
254 fn encode_u8(self, v: u8) -> Result<Self::Ok, Self::Error> {
255 Ok(encode_fmt(v))
256 }
257
258 #[inline]
259 fn encode_u16(self, v: u16) -> Result<Self::Ok, Self::Error> {
260 Ok(encode_fmt(v))
261 }
262
263 #[inline]
264 fn encode_u32(self, v: u32) -> Result<Self::Ok, Self::Error> {
265 Ok(encode_fmt(v))
266 }
267
268 #[inline]
269 fn encode_u64(self, v: u64) -> Result<Self::Ok, Self::Error> {
270 Ok(encode_fmt(v))
271 }
272
273 #[inline]
274 fn encode_f32(self, v: f32) -> Result<Self::Ok, Self::Error> {
275 if v.is_nan() || v.is_infinite() {
276 Err(en::Error::custom(format!(
277 "JSON encoding does not support floating-point value {}",
278 v
279 )))
280 } else {
281 Ok(encode_fmt(v))
282 }
283 }
284
285 #[inline]
286 fn encode_f64(self, v: f64) -> Result<Self::Ok, Self::Error> {
287 if v.is_nan() || v.is_infinite() {
288 Err(en::Error::custom(format!(
289 "JSON encoding does not support floating-point value {}",
290 v
291 )))
292 } else {
293 Ok(encode_fmt(v))
294 }
295 }
296
297 fn encode_array_bool<
298 T: IntoIterator<Item = bool> + Send + Unpin + 'en,
299 S: Stream<Item = T> + Send + Unpin + 'en,
300 >(
301 self,
302 chunks: S,
303 ) -> Result<Self::Ok, Self::Error>
304 where
305 <T as IntoIterator>::IntoIter: Send + Unpin + 'en,
306 {
307 self.encode_array(chunks)
308 }
309
310 fn encode_array_i8<
311 T: IntoIterator<Item = i8> + Send + Unpin + 'en,
312 S: Stream<Item = T> + Send + Unpin + 'en,
313 >(
314 self,
315 chunks: S,
316 ) -> Result<Self::Ok, Self::Error>
317 where
318 <T as IntoIterator>::IntoIter: Send + Unpin + 'en,
319 {
320 self.encode_array(chunks)
321 }
322
323 fn encode_array_i16<
324 T: IntoIterator<Item = i16> + Send + Unpin + 'en,
325 S: Stream<Item = T> + Send + Unpin + 'en,
326 >(
327 self,
328 chunks: S,
329 ) -> Result<Self::Ok, Self::Error>
330 where
331 <T as IntoIterator>::IntoIter: Send + Unpin + 'en,
332 {
333 self.encode_array(chunks)
334 }
335
336 fn encode_array_i32<
337 T: IntoIterator<Item = i32> + Send + Unpin + 'en,
338 S: Stream<Item = T> + Send + Unpin + 'en,
339 >(
340 self,
341 chunks: S,
342 ) -> Result<Self::Ok, Self::Error>
343 where
344 <T as IntoIterator>::IntoIter: Send + Unpin + 'en,
345 {
346 self.encode_array(chunks)
347 }
348
349 fn encode_array_i64<
350 T: IntoIterator<Item = i64> + Send + Unpin + 'en,
351 S: Stream<Item = T> + Send + Unpin + 'en,
352 >(
353 self,
354 chunks: S,
355 ) -> Result<Self::Ok, Self::Error>
356 where
357 <T as IntoIterator>::IntoIter: Send + Unpin + 'en,
358 {
359 self.encode_array(chunks)
360 }
361
362 fn encode_array_u8<
363 T: IntoIterator<Item = u8> + Send + Unpin + 'en,
364 S: Stream<Item = T> + Send + Unpin + 'en,
365 >(
366 self,
367 chunks: S,
368 ) -> Result<Self::Ok, Self::Error>
369 where
370 <T as IntoIterator>::IntoIter: Send + Unpin + 'en,
371 {
372 self.encode_array(chunks)
373 }
374
375 fn encode_array_u16<
376 T: IntoIterator<Item = u16> + Send + Unpin + 'en,
377 S: Stream<Item = T> + Send + Unpin + 'en,
378 >(
379 self,
380 chunks: S,
381 ) -> Result<Self::Ok, Self::Error>
382 where
383 <T as IntoIterator>::IntoIter: Send + Unpin + 'en,
384 {
385 self.encode_array(chunks)
386 }
387
388 fn encode_array_u32<
389 T: IntoIterator<Item = u32> + Send + Unpin + 'en,
390 S: Stream<Item = T> + Send + Unpin + 'en,
391 >(
392 self,
393 chunks: S,
394 ) -> Result<Self::Ok, Self::Error>
395 where
396 <T as IntoIterator>::IntoIter: Send + Unpin + 'en,
397 {
398 self.encode_array(chunks)
399 }
400
401 fn encode_array_u64<
402 T: IntoIterator<Item = u64> + Send + Unpin + 'en,
403 S: Stream<Item = T> + Send + Unpin + 'en,
404 >(
405 self,
406 chunks: S,
407 ) -> Result<Self::Ok, Self::Error>
408 where
409 <T as IntoIterator>::IntoIter: Send + Unpin + 'en,
410 {
411 self.encode_array(chunks)
412 }
413
414 fn encode_array_f32<
415 T: IntoIterator<Item = f32> + Send + Unpin + 'en,
416 S: Stream<Item = T> + Send + Unpin + 'en,
417 >(
418 self,
419 chunks: S,
420 ) -> Result<Self::Ok, Self::Error>
421 where
422 <T as IntoIterator>::IntoIter: Send + Unpin + 'en,
423 {
424 self.encode_array(chunks)
425 }
426
427 fn encode_array_f64<
428 T: IntoIterator<Item = f64> + Send + Unpin + 'en,
429 S: Stream<Item = T> + Send + Unpin + 'en,
430 >(
431 self,
432 chunks: S,
433 ) -> Result<Self::Ok, Self::Error>
434 where
435 <T as IntoIterator>::IntoIter: Send + Unpin + 'en,
436 {
437 self.encode_array(chunks)
438 }
439
440 #[inline]
441 fn encode_str(self, v: &str) -> Result<Self::Ok, Self::Error> {
442 let mut chunk = BytesMut::with_capacity(v.as_bytes().len() + 2);
443 chunk.extend_from_slice(QUOTE);
444 chunk.extend(escape(v));
445 chunk.extend_from_slice(QUOTE);
446 Ok(Box::pin(futures::stream::once(future::ready(Ok(
447 chunk.into()
448 )))))
449 }
450
451 #[inline]
452 fn encode_none(self) -> Result<Self::Ok, Self::Error> {
453 Ok(encode_fmt("null"))
454 }
455
456 #[inline]
457 fn encode_some<T: IntoStream<'en> + 'en>(self, value: T) -> Result<Self::Ok, Self::Error> {
458 value.into_stream(self)
459 }
460
461 #[inline]
462 fn encode_unit(self) -> Result<Self::Ok, Self::Error> {
463 Ok(encode_fmt("null"))
464 }
465
466 #[inline]
467 fn encode_map(self, size_hint: Option<usize>) -> Result<Self::EncodeMap, Self::Error> {
468 Ok(MapEncoder::new(size_hint))
469 }
470
471 #[inline]
472 fn encode_map_stream<
473 K: IntoStream<'en> + 'en,
474 V: IntoStream<'en> + 'en,
475 S: Stream<Item = (K, V)> + Send + Unpin + 'en,
476 >(
477 self,
478 map: S,
479 ) -> Result<Self::Ok, Self::Error> {
480 Ok(Box::pin(stream::encode_map(map)))
481 }
482
483 #[inline]
484 fn encode_seq(self, size_hint: Option<usize>) -> Result<Self::EncodeSeq, Self::Error> {
485 Ok(SequenceEncoder::new(size_hint))
486 }
487
488 #[inline]
489 fn encode_seq_stream<T: IntoStream<'en> + 'en, S: Stream<Item = T> + Send + Unpin + 'en>(
490 self,
491 seq: S,
492 ) -> Result<Self::Ok, Self::Error> {
493 Ok(Box::pin(stream::encode_list(seq)))
494 }
495
496 #[inline]
497 fn encode_tuple(self, len: usize) -> Result<Self::EncodeTuple, Self::Error> {
498 Ok(SequenceEncoder::new(Some(len)))
499 }
500
501 #[inline]
502 fn encode_uuid(self, uuid: Uuid) -> Result<Self::Ok, Self::Error> {
503 self.encode_str(&uuid.to_string())
504 }
505
506 #[inline]
507 fn collect_bytes<B: IntoIterator<Item = u8>>(self, bytes: B) -> Result<Self::Ok, Self::Error> {
508 self.encode_bytes(bytes.into_iter().collect::<Vec<u8>>())
509 }
510}
511
512#[inline]
513fn escape<T: fmt::Display>(value: T) -> Bytes {
514 let as_str = value.to_string();
515 let mut encoded = BytesMut::with_capacity(as_str.len());
516 for byte in as_str.as_bytes() {
517 let as_slice = std::slice::from_ref(byte);
518 if as_slice == QUOTE || as_slice == ESCAPE {
519 encoded.extend_from_slice(ESCAPE);
520 }
521
522 encoded.put_u8(*byte);
523 }
524
525 encoded.into()
526}
527
528#[inline]
529fn encode_fmt<'en, T: fmt::Display>(value: T) -> JSONStream<'en> {
530 let encoded = escape(value);
531 Box::pin(futures::stream::once(future::ready(Ok(encoded))))
532}
533
534#[inline]
535fn delimiter<'en>(delimiter: &'static [u8]) -> JSONStream<'en> {
536 let encoded = futures::stream::once(future::ready(Ok(Bytes::from_static(delimiter))));
537 Box::pin(encoded)
538}
539
540pub fn encode<'en, T: IntoStream<'en> + 'en>(
542 value: T,
543) -> Result<impl Stream<Item = Result<Bytes, Error>> + Send + Unpin + 'en, Error> {
544 value.into_stream(Encoder)
545}
546
547pub fn encode_map<
549 'en,
550 K: IntoStream<'en> + 'en,
551 V: IntoStream<'en> + 'en,
552 S: Stream<Item = (K, V)> + Send + Unpin + 'en,
553>(
554 seq: S,
555) -> impl Stream<Item = Result<Bytes, Error>> + Send + Unpin + 'en {
556 stream::encode_map(seq)
557}
558
559pub fn encode_seq<'en, T: IntoStream<'en> + 'en, S: Stream<Item = T> + Send + Unpin + 'en>(
561 seq: S,
562) -> impl Stream<Item = Result<Bytes, Error>> + Send + Unpin + 'en {
563 stream::encode_list(seq)
564}