1use std::collections::VecDeque;
4use std::fmt;
5use std::mem;
6use std::pin::Pin;
7
8use bytes::{BufMut, Bytes, BytesMut};
9use destream::{en, IntoStream};
10use futures::future;
11use futures::stream::{Stream, StreamExt};
12use num_traits::ToPrimitive;
13use uuid::Uuid;
14
15use super::constants::*;
16use super::element::{Element, IntoBytes};
17
18mod stream;
19
20pub type ByteStream<'en> = Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send + Unpin + 'en>>;
22
23pub struct Error {
25    message: String,
26}
27
28impl en::Error for Error {
29    fn custom<I: fmt::Display>(info: I) -> Self {
30        Self {
31            message: info.to_string(),
32        }
33    }
34}
35
36impl std::error::Error for Error {}
37
38impl fmt::Debug for Error {
39    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
40        fmt::Display::fmt(self, f)
41    }
42}
43
44impl fmt::Display for Error {
45    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
46        f.write_str(&self.message)
47    }
48}
49
50pub struct MapEncoder<'en> {
52    pending_key: Option<ByteStream<'en>>,
53    entries: VecDeque<(ByteStream<'en>, ByteStream<'en>)>,
54}
55
56impl<'en> MapEncoder<'en> {
57    #[inline]
58    fn new(size_hint: Option<usize>) -> Self {
59        let entries = if let Some(len) = size_hint {
60            VecDeque::with_capacity(len)
61        } else {
62            VecDeque::new()
63        };
64
65        Self {
66            pending_key: None,
67            entries,
68        }
69    }
70}
71
72impl<'en> en::EncodeMap<'en> for MapEncoder<'en> {
73    type Ok = ByteStream<'en>;
74    type Error = Error;
75
76    #[inline]
77    fn encode_key<T: en::IntoStream<'en> + 'en>(&mut self, key: T) -> Result<(), Self::Error> {
78        if self.pending_key.is_none() {
79            self.pending_key = Some(key.into_stream(Encoder)?);
80            Ok(())
81        } else {
82            Err(en::Error::custom(
83                "You must call encode_value before calling encode_key again",
84            ))
85        }
86    }
87
88    #[inline]
89    fn encode_value<T: en::IntoStream<'en> + 'en>(&mut self, value: T) -> Result<(), Self::Error> {
90        if self.pending_key.is_none() {
91            return Err(en::Error::custom(
92                "You must call encode_key before encode_value",
93            ));
94        }
95
96        let value = value.into_stream(Encoder)?;
97
98        let mut key = None;
99        mem::swap(&mut self.pending_key, &mut key);
100
101        self.entries.push_back((key.expect("key"), value));
102        Ok(())
103    }
104
105    fn end(mut self) -> Result<Self::Ok, Self::Error> {
106        if self.pending_key.is_some() {
107            return Err(en::Error::custom(
108                "You must call encode_value after calling encode_key",
109            ));
110        }
111
112        let mut encoded = delimiter(MAP_BEGIN);
113
114        while let Some((key, value)) = self.entries.pop_front() {
115            encoded = Box::pin(encoded.chain(key).chain(value));
116        }
117
118        encoded = Box::pin(encoded.chain(delimiter(MAP_END)));
119        Ok(encoded)
120    }
121}
122
123pub struct SequenceEncoder<'en> {
125    items: VecDeque<ByteStream<'en>>,
126}
127
128impl<'en> SequenceEncoder<'en> {
129    #[inline]
130    fn new(size_hint: Option<usize>) -> Self {
131        let items = if let Some(len) = size_hint {
132            VecDeque::with_capacity(len)
133        } else {
134            VecDeque::new()
135        };
136
137        Self { items }
138    }
139
140    #[inline]
141    fn push(&mut self, value: ByteStream<'en>) {
142        self.items.push_back(value);
143    }
144
145    fn encode(mut self) -> Result<ByteStream<'en>, Error> {
146        let mut encoded = delimiter(LIST_BEGIN);
147
148        while let Some(item) = self.items.pop_front() {
149            encoded = Box::pin(encoded.chain(item));
150        }
151
152        encoded = Box::pin(encoded.chain(delimiter(LIST_END)));
153        Ok(encoded)
154    }
155}
156
157impl<'en> en::EncodeSeq<'en> for SequenceEncoder<'en> {
158    type Ok = ByteStream<'en>;
159    type Error = Error;
160
161    #[inline]
162    fn encode_element<T: en::IntoStream<'en> + 'en>(
163        &mut self,
164        value: T,
165    ) -> Result<(), Self::Error> {
166        let encoded = value.into_stream(Encoder)?;
167        self.push(encoded);
168        Ok(())
169    }
170
171    fn end(self) -> Result<Self::Ok, Self::Error> {
172        self.encode()
173    }
174}
175
176impl<'en> en::EncodeTuple<'en> for SequenceEncoder<'en> {
177    type Ok = ByteStream<'en>;
178    type Error = Error;
179
180    #[inline]
181    fn encode_element<T: en::IntoStream<'en> + 'en>(
182        &mut self,
183        value: T,
184    ) -> Result<(), Self::Error> {
185        let encoded = value.into_stream(Encoder)?;
186        self.push(encoded);
187        Ok(())
188    }
189
190    fn end(self) -> Result<Self::Ok, Self::Error> {
191        self.encode()
192    }
193}
194
195pub struct Encoder;
197
198impl Encoder {
199    #[inline]
200    fn encode_type<'en>(&self, dtype: &Type, value: &[u8]) -> Result<ByteStream<'en>, Error> {
201        let mut chunk = BytesMut::with_capacity(value.len() + 1);
202        chunk.put_u8(dtype.to_u8().expect("type bit"));
203        chunk.extend_from_slice(value);
204
205        Ok(Box::pin(futures::stream::once(future::ready(Ok(
206            chunk.into()
207        )))))
208    }
209
210    #[inline]
211    fn encode_string_type<'en>(
212        &self,
213        start: u8,
214        value: &[u8],
215        end: u8,
216    ) -> Result<ByteStream<'en>, Error> {
217        let mut chunk = BytesMut::with_capacity(value.len() + 2);
218        chunk.put_u8(start);
219        chunk.extend(self.escape(value, &[start, end]));
220        chunk.put_u8(end);
221
222        Ok(Box::pin(futures::stream::once(future::ready(Ok(
223            chunk.into()
224        )))))
225    }
226
227    fn escape(&self, value: &[u8], control: &[u8]) -> Vec<u8> {
228        let mut escaped = Vec::with_capacity(value.len() * 2);
229        for char in value {
230            if control.contains(char) || char == &ESCAPE[0] {
231                escaped.push(ESCAPE[0])
232            }
233
234            escaped.push(*char);
235        }
236
237        escaped
238    }
239}
240
241impl<'en> en::Encoder<'en> for Encoder {
242    type Ok = ByteStream<'en>;
243    type Error = Error;
244    type EncodeMap = MapEncoder<'en>;
245    type EncodeSeq = SequenceEncoder<'en>;
246    type EncodeTuple = SequenceEncoder<'en>;
247
248    #[inline]
249    fn encode_bool(self, v: bool) -> Result<Self::Ok, Self::Error> {
250        let value = if v { TRUE } else { FALSE };
251        self.encode_type(&Type::Bool, value)
252    }
253
254    #[inline]
255    fn encode_bytes<B: Into<Bytes>>(self, bytes: B) -> Result<Self::Ok, Self::Error> {
256        self.collect_bytes(bytes.into())
257    }
258
259    #[inline]
260    fn encode_i8(self, v: i8) -> Result<Self::Ok, Self::Error> {
261        self.encode_type(&Type::I8, &v.to_be_bytes())
262    }
263
264    fn encode_i16(self, v: i16) -> Result<Self::Ok, Self::Error> {
265        self.encode_type(&Type::I16, &v.to_be_bytes())
266    }
267
268    #[inline]
269    fn encode_i32(self, v: i32) -> Result<Self::Ok, Self::Error> {
270        self.encode_type(&Type::I32, &v.to_be_bytes())
271    }
272
273    #[inline]
274    fn encode_i64(self, v: i64) -> Result<Self::Ok, Self::Error> {
275        self.encode_type(&Type::I64, &v.to_be_bytes())
276    }
277
278    #[inline]
279    fn encode_u8(self, v: u8) -> Result<Self::Ok, Self::Error> {
280        self.encode_type(&Type::U8, &v.to_be_bytes())
281    }
282
283    #[inline]
284    fn encode_u16(self, v: u16) -> Result<Self::Ok, Self::Error> {
285        self.encode_type(&Type::U16, &v.to_be_bytes())
286    }
287
288    #[inline]
289    fn encode_u32(self, v: u32) -> Result<Self::Ok, Self::Error> {
290        self.encode_type(&Type::U32, &v.to_be_bytes())
291    }
292
293    #[inline]
294    fn encode_u64(self, v: u64) -> Result<Self::Ok, Self::Error> {
295        self.encode_type(&Type::U64, &v.to_be_bytes())
296    }
297
298    #[inline]
299    fn encode_f32(self, v: f32) -> Result<Self::Ok, Self::Error> {
300        self.encode_type(&Type::F32, &v.to_be_bytes())
301    }
302
303    #[inline]
304    fn encode_f64(self, v: f64) -> Result<Self::Ok, Self::Error> {
305        self.encode_type(&Type::F64, &v.to_be_bytes())
306    }
307
308    fn encode_array_bool<T, S>(self, chunks: S) -> Result<Self::Ok, Self::Error>
309    where
310        T: IntoIterator<Item = bool> + Send + Unpin + 'en,
311        S: Stream<Item = T> + Send + Unpin + 'en,
312        <T as IntoIterator>::IntoIter: Send + Unpin + 'en,
313    {
314        Ok(encode_array(Type::Bool, chunks))
315    }
316
317    fn encode_array_i8<T, S>(self, chunks: S) -> Result<Self::Ok, Self::Error>
318    where
319        T: IntoIterator<Item = i8> + Send + Unpin + 'en,
320        S: Stream<Item = T> + Send + Unpin + 'en,
321        <T as IntoIterator>::IntoIter: Send + Unpin + 'en,
322    {
323        Ok(encode_array(Type::I8, chunks))
324    }
325
326    fn encode_array_i16<T, S>(self, chunks: S) -> Result<Self::Ok, Self::Error>
327    where
328        T: IntoIterator<Item = i16> + Send + Unpin + 'en,
329        S: Stream<Item = T> + Send + Unpin + 'en,
330        <T as IntoIterator>::IntoIter: Send + Unpin + 'en,
331    {
332        Ok(encode_array(Type::I16, chunks))
333    }
334
335    fn encode_array_i32<T, S>(self, chunks: S) -> Result<Self::Ok, Self::Error>
336    where
337        T: IntoIterator<Item = i32> + Send + Unpin + 'en,
338        S: Stream<Item = T> + Send + Unpin + 'en,
339        <T as IntoIterator>::IntoIter: Send + Unpin + 'en,
340    {
341        Ok(encode_array(Type::I32, chunks))
342    }
343
344    fn encode_array_i64<T, S>(self, chunks: S) -> Result<Self::Ok, Self::Error>
345    where
346        T: IntoIterator<Item = i64> + Send + Unpin + 'en,
347        S: Stream<Item = T> + Send + Unpin + 'en,
348        <T as IntoIterator>::IntoIter: Send + Unpin + 'en,
349    {
350        Ok(encode_array(Type::I64, chunks))
351    }
352
353    fn encode_array_u8<T, S>(self, chunks: S) -> Result<Self::Ok, Self::Error>
354    where
355        T: IntoIterator<Item = u8> + Send + Unpin + 'en,
356        S: Stream<Item = T> + Send + Unpin + 'en,
357        <T as IntoIterator>::IntoIter: Send + Unpin + 'en,
358    {
359        Ok(encode_array(Type::U8, chunks))
360    }
361
362    fn encode_array_u16<T, S>(self, chunks: S) -> Result<Self::Ok, Self::Error>
363    where
364        T: IntoIterator<Item = u16> + Send + Unpin + 'en,
365        S: Stream<Item = T> + Send + Unpin + 'en,
366        <T as IntoIterator>::IntoIter: Send + Unpin + 'en,
367    {
368        Ok(encode_array(Type::U16, chunks))
369    }
370
371    fn encode_array_u32<T, S>(self, chunks: S) -> Result<Self::Ok, Self::Error>
372    where
373        T: IntoIterator<Item = u32> + Send + Unpin + 'en,
374        S: Stream<Item = T> + Send + Unpin + 'en,
375        <T as IntoIterator>::IntoIter: Send + Unpin + 'en,
376    {
377        Ok(encode_array(Type::U32, chunks))
378    }
379
380    fn encode_array_u64<T, S>(self, chunks: S) -> Result<Self::Ok, Self::Error>
381    where
382        T: IntoIterator<Item = u64> + Send + Unpin + 'en,
383        S: Stream<Item = T> + Send + Unpin + 'en,
384        <T as IntoIterator>::IntoIter: Send + Unpin + 'en,
385    {
386        Ok(encode_array(Type::U64, chunks))
387    }
388
389    fn encode_array_f32<T, S>(self, chunks: S) -> Result<Self::Ok, Self::Error>
390    where
391        T: IntoIterator<Item = f32> + Send + Unpin + 'en,
392        S: Stream<Item = T> + Send + Unpin + 'en,
393        <T as IntoIterator>::IntoIter: Send + Unpin + 'en,
394    {
395        Ok(encode_array(Type::F32, chunks))
396    }
397
398    fn encode_array_f64<T, S>(self, chunks: S) -> Result<Self::Ok, Self::Error>
399    where
400        T: IntoIterator<Item = f64> + Send + Unpin + 'en,
401        S: Stream<Item = T> + Send + Unpin + 'en,
402        <T as IntoIterator>::IntoIter: Send + Unpin + 'en,
403    {
404        Ok(encode_array(Type::F64, chunks))
405    }
406
407    #[inline]
408    fn encode_str(self, v: &str) -> Result<Self::Ok, Self::Error> {
409        self.encode_string_type(STRING_DELIMIT[0], v.as_bytes(), STRING_DELIMIT[0])
410    }
411
412    #[inline]
413    fn encode_none(self) -> Result<Self::Ok, Self::Error> {
414        Ok(Box::pin(futures::stream::once(future::ready(Ok(
415            Bytes::from(vec![(&Type::None).to_u8().expect("type bit")]),
416        )))))
417    }
418
419    #[inline]
420    fn encode_some<T: en::IntoStream<'en> + 'en>(self, value: T) -> Result<Self::Ok, Self::Error> {
421        value.into_stream(self)
422    }
423
424    #[inline]
425    fn encode_unit(self) -> Result<Self::Ok, Self::Error> {
426        self.encode_none()
427    }
428
429    #[inline]
430    fn encode_uuid(self, uuid: Uuid) -> Result<Self::Ok, Self::Error> {
431        self.collect_bytes(uuid.as_bytes().into_iter().copied())
432    }
433
434    #[inline]
435    fn encode_map(self, size_hint: Option<usize>) -> Result<Self::EncodeMap, Self::Error> {
436        Ok(MapEncoder::new(size_hint))
437    }
438
439    #[inline]
440    fn encode_map_stream<K, V, S>(self, map: S) -> Result<Self::Ok, Self::Error>
441    where
442        K: en::IntoStream<'en> + 'en,
443        V: en::IntoStream<'en> + 'en,
444        S: Stream<Item = (K, V)> + Send + Unpin + 'en,
445    {
446        Ok(Box::pin(stream::encode_map(map)))
447    }
448
449    #[inline]
450    fn encode_seq(self, size_hint: Option<usize>) -> Result<Self::EncodeSeq, Self::Error> {
451        Ok(SequenceEncoder::new(size_hint))
452    }
453
454    #[inline]
455    fn encode_seq_stream<T: en::IntoStream<'en> + 'en, S: Stream<Item = T> + Send + Unpin + 'en>(
456        self,
457        seq: S,
458    ) -> Result<Self::Ok, Self::Error> {
459        Ok(Box::pin(stream::encode_list(seq)))
460    }
461
462    #[inline]
463    fn encode_tuple(self, len: usize) -> Result<Self::EncodeTuple, Self::Error> {
464        Ok(SequenceEncoder::new(Some(len)))
465    }
466
467    #[inline]
468    fn collect_bytes<B: IntoIterator<Item = u8>>(self, bytes: B) -> Result<Self::Ok, Self::Error> {
469        let bytes = bytes.into_iter();
470        let mut array = match bytes.size_hint() {
471            (0, None) | (0, Some(usize::MAX)) => Vec::new(),
472            (_min, Some(max)) => Vec::with_capacity(max + 3),
473            (min, None) => Vec::with_capacity(min),
474        };
475
476        array.extend_from_slice(ARRAY_DELIMIT);
477        array.push(u8::dtype().to_u8().expect("type bit"));
478
479        for byte in bytes {
480            let as_slice = std::slice::from_ref(&byte);
481            if as_slice == ARRAY_DELIMIT || as_slice == ESCAPE {
482                array.extend_from_slice(ESCAPE);
483            }
484
485            array.put_u8(byte);
486        }
487
488        array.extend_from_slice(ARRAY_DELIMIT);
489
490        let array: ByteStream = Box::pin(futures::stream::once(future::ready(Ok(array.into()))));
491        Ok(array)
492    }
493}
494
495#[inline]
496fn delimiter<'en>(delimiter: &'static [u8]) -> ByteStream<'en> {
497    Box::pin(futures::stream::once(future::ready(Ok(
498        Bytes::from_static(delimiter),
499    ))))
500}
501
502pub fn encode<'en, T: IntoStream<'en> + 'en>(
504    value: T,
505) -> Result<impl Stream<Item = Result<Bytes, Error>> + 'en, Error> {
506    value.into_stream(Encoder)
507}
508
509pub fn encode_map<'en, K, V, S>(
511    seq: S,
512) -> impl Stream<Item = Result<Bytes, Error>> + Send + Unpin + 'en
513where
514    K: IntoStream<'en> + 'en,
515    V: IntoStream<'en> + 'en,
516    S: Stream<Item = (K, V)> + Send + Unpin + 'en,
517{
518    stream::encode_map(seq)
519}
520
521pub fn encode_seq<'en, T, S>(
523    seq: S,
524) -> impl Stream<Item = Result<Bytes, Error>> + Send + Unpin + 'en
525where
526    T: IntoStream<'en> + 'en,
527    S: Stream<Item = T> + Send + Unpin + 'en,
528{
529    stream::encode_list(seq)
530}
531
532fn encode_array<'en, const SIZE: usize, E, T, S>(dtype: Type, chunks: S) -> ByteStream<'en>
533where
534    E: IntoBytes<SIZE>,
535    T: IntoIterator<Item = E>,
536    S: Stream<Item = T> + Send + Unpin + 'en,
537{
538    let mut start = BytesMut::with_capacity(2);
539    start.extend_from_slice(ARRAY_DELIMIT);
540    start.put_u8(dtype.to_u8().expect("type bit"));
541
542    let start = futures::stream::once(future::ready(Ok(Bytes::from(start))));
543    let end = delimiter(ARRAY_DELIMIT);
544
545    let contents = chunks.map(|chunk| {
546        let mut encoded = BytesMut::new();
547
548        for b in chunk.into_iter() {
549            for byte in b.into_bytes() {
550                let as_slice = std::slice::from_ref(&byte);
551                if as_slice == ARRAY_DELIMIT || as_slice == ESCAPE {
552                    encoded.extend_from_slice(ESCAPE);
553                }
554
555                encoded.put_u8(byte);
556            }
557        }
558
559        Ok(encoded.into())
560    });
561
562    let encoded: ByteStream = Box::pin(start.chain(contents).chain(end));
563    encoded
564}