tbon 0.9.0

TinyChain Binary Object Notation is a compact and versatile stream-friendly binary serialization format
Documentation
use std::pin::Pin;
use std::task::{self, Poll};

use bytes::{Bytes, BytesMut};
use destream::en::{self, IntoStream};
use futures::ready;
use futures::stream::{Fuse, FusedStream, Stream, StreamExt};
use futures::task::Context;
use pin_project::pin_project;
use std::collections::VecDeque;

use crate::constants::*;

use super::{ByteStream, Encoder};

#[pin_project]
struct MapEntryStream<'en> {
    #[pin]
    key: Fuse<ByteStream<'en>>,

    #[pin]
    value: Fuse<ByteStream<'en>>,
}

impl<'en> MapEntryStream<'en> {
    fn new<K: IntoStream<'en>, V: IntoStream<'en>>(key: K, value: V) -> Result<Self, super::Error> {
        let key = key.into_stream(Encoder)?;
        let value = value.into_stream(Encoder)?;

        Ok(Self {
            key: key.fuse(),
            value: value.fuse(),
        })
    }

    pub(super) fn from_streams(key: ByteStream<'en>, value: ByteStream<'en>) -> Self {
        Self {
            key: key.fuse(),
            value: value.fuse(),
        }
    }
}

impl<'en> Stream for MapEntryStream<'en> {
    type Item = Result<Bytes, super::Error>;

    fn poll_next(self: Pin<&mut Self>, cxt: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();

        Poll::Ready(loop {
            if !this.key.is_terminated() {
                if let Some(result) = ready!(this.key.as_mut().poll_next(cxt)) {
                    break Some(result);
                }
            } else if !this.value.is_terminated() {
                if let Some(result) = ready!(this.value.as_mut().poll_next(cxt)) {
                    break Some(result);
                }
            } else {
                break None;
            }
        })
    }
}

impl<'en> FusedStream for MapEntryStream<'en> {
    fn is_terminated(&self) -> bool {
        self.key.is_terminated() && self.value.is_terminated()
    }
}

#[pin_project]
struct TBONEncodingStream<
    I: Stream<Item = Result<Bytes, super::Error>>,
    S: Stream<Item = Result<I, super::Error>>,
> {
    #[pin]
    source: Fuse<S>,

    next: Option<Pin<Box<I>>>,

    started: bool,
    finished: bool,

    start: &'static [u8; 1],
    end: &'static [u8; 1],
}

impl<I: Stream<Item = Result<Bytes, super::Error>>, S: Stream<Item = Result<I, super::Error>>>
    Stream for TBONEncodingStream<I, S>
{
    type Item = Result<Bytes, super::Error>;

    fn poll_next(self: Pin<&mut Self>, cxt: &mut task::Context) -> Poll<Option<Self::Item>> {
        let mut this = self.project();

        Poll::Ready(loop {
            match this.next {
                Some(next) => match ready!(next.as_mut().poll_next(cxt)) {
                    Some(result) => break Some(result),
                    None => *this.next = None,
                },
                None => match ready!(this.source.as_mut().poll_next(cxt)) {
                    Some(Ok(next)) => {
                        *this.next = Some(Box::pin(next));

                        if !*this.started {
                            *this.started = true;
                            break Some(Ok(Bytes::from_static(*this.start)));
                        }
                    }
                    Some(Err(cause)) => break Some(Err(en::Error::custom(cause))),
                    None if !*this.started => {
                        *this.started = true;
                        break Some(Ok(Bytes::from_static(*this.start)));
                    }
                    None if !*this.finished => {
                        *this.finished = true;
                        break Some(Ok(Bytes::from_static(*this.end)));
                    }
                    None => break None,
                },
            }
        })
    }
}

impl<I: Stream<Item = Result<Bytes, super::Error>>, S: Stream<Item = Result<I, super::Error>>>
    FusedStream for TBONEncodingStream<I, S>
{
    fn is_terminated(&self) -> bool {
        self.finished
    }
}

pub fn encode_list<'en, I: IntoStream<'en>, S: Stream<Item = I> + Send + Unpin + 'en>(
    seq: S,
) -> impl Stream<Item = Result<Bytes, super::Error>> + 'en {
    let source = seq.map(|item| item.into_stream(Encoder));

    TBONEncodingStream {
        source: source.fuse(),
        next: None,
        started: false,
        finished: false,
        start: LIST_BEGIN,
        end: LIST_END,
    }
}

pub fn encode_map<
    'en,
    K: IntoStream<'en>,
    V: IntoStream<'en>,
    S: Stream<Item = (K, V)> + Send + Unpin + 'en,
>(
    seq: S,
) -> impl Stream<Item = Result<Bytes, super::Error>> + Send + Unpin + 'en {
    let source = seq.map(|(key, value)| MapEntryStream::new(key, value));

    TBONEncodingStream {
        source: source.fuse(),
        next: None,
        started: false,
        finished: false,
        start: MAP_BEGIN,
        end: MAP_END,
    }
}

pub(super) fn encode_list_encoded<'en>(items: VecDeque<ByteStream<'en>>) -> ByteStream<'en> {
    let source = futures::stream::iter(items.into_iter().map(Ok));

    Box::pin(TBONEncodingStream {
        source: source.fuse(),
        next: None,
        started: false,
        finished: false,
        start: LIST_BEGIN,
        end: LIST_END,
    })
}

pub(super) fn encode_map_encoded<'en>(
    entries: VecDeque<(ByteStream<'en>, ByteStream<'en>)>,
) -> ByteStream<'en> {
    let source = futures::stream::iter(
        entries
            .into_iter()
            .map(|(key, value)| Ok(MapEntryStream::from_streams(key, value))),
    );

    Box::pin(TBONEncodingStream {
        source: source.fuse(),
        next: None,
        started: false,
        finished: false,
        start: MAP_BEGIN,
        end: MAP_END,
    })
}

#[pin_project]
pub(crate) struct CoalesceStream<S> {
    #[pin]
    source: S,
    buffer: BytesMut,
    pending: Option<Bytes>,
    target: usize,
    done: bool,
}

impl<S> CoalesceStream<S> {
    pub(crate) fn new(source: S, target: usize) -> Self {
        assert!(target > 0);

        Self {
            source,
            buffer: BytesMut::with_capacity(target),
            pending: None,
            target,
            done: false,
        }
    }
}

impl<S: Stream<Item = Result<Bytes, super::Error>>> Stream for CoalesceStream<S> {
    type Item = Result<Bytes, super::Error>;

    fn poll_next(self: Pin<&mut Self>, cxt: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();

        if *this.done {
            return Poll::Ready(None);
        }

        if this.buffer.len() >= *this.target {
            let out = this.buffer.split().freeze();
            return Poll::Ready(Some(Ok(out)));
        }

        if let Some(pending) = this.pending.take() {
            if this.buffer.is_empty() && pending.len() >= *this.target {
                return Poll::Ready(Some(Ok(pending)));
            } else {
                this.buffer.extend_from_slice(&pending);
                if this.buffer.len() >= *this.target {
                    let out = this.buffer.split().freeze();
                    return Poll::Ready(Some(Ok(out)));
                }
            }
        }

        loop {
            match ready!(this.source.as_mut().poll_next(cxt)) {
                Some(Ok(chunk)) => {
                    if chunk.is_empty() {
                        continue;
                    }

                    if chunk.len() >= *this.target {
                        if this.buffer.is_empty() {
                            break Poll::Ready(Some(Ok(chunk)));
                        } else {
                            *this.pending = Some(chunk);
                            let out = this.buffer.split().freeze();
                            break Poll::Ready(Some(Ok(out)));
                        }
                    } else {
                        this.buffer.extend_from_slice(&chunk);
                        if this.buffer.len() >= *this.target {
                            let out = this.buffer.split().freeze();
                            break Poll::Ready(Some(Ok(out)));
                        }
                    }
                }
                Some(Err(cause)) => {
                    *this.done = true;
                    break Poll::Ready(Some(Err(cause)));
                }
                None => {
                    *this.done = true;
                    if this.buffer.is_empty() {
                        break Poll::Ready(None);
                    } else {
                        let out = this.buffer.split().freeze();
                        break Poll::Ready(Some(Ok(out)));
                    }
                }
            }
        }
    }
}