twitter-v2 0.1.8

Twitter API v2
Documentation
use crate::{Error, Result};
use futures::prelude::*;
use pin_project_lite::pin_project;
use serde::de::DeserializeOwned;
use serde_json::de::{SliceRead, StreamDeserializer};
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};

pin_project! {
#[project = JsonStreamProjection]
pub struct JsonStream<S, T> {
    #[pin]
    stream: S,
    buffer: Vec<u8>,
    item_ty: PhantomData<T>
}
}

impl<S, T> JsonStream<S, T> {
    pub fn new(stream: S) -> Self {
        Self {
            stream,
            buffer: Default::default(),
            item_ty: Default::default(),
        }
    }
}

impl<'a, S, T> JsonStreamProjection<'a, S, T>
where
    T: DeserializeOwned,
{
    pub fn next_item(&mut self) -> Option<Result<T>> {
        let mut deserializer =
            StreamDeserializer::<_, T>::new(SliceRead::new(self.buffer.as_ref()));
        match deserializer.next() {
            Some(Ok(item)) => {
                let read = deserializer.byte_offset();
                self.buffer.rotate_left(read);
                unsafe {
                    self.buffer.set_len(self.buffer.len() - read);
                }
                Some(Ok(item))
            }
            Some(Err(err)) => {
                if err.is_eof() {
                    None
                } else {
                    Some(Err(err.into()))
                }
            }
            None => None,
        }
    }
}

impl<S, B, E, T> Stream for JsonStream<S, T>
where
    S: Stream<Item = Result<B, E>>,
    B: AsRef<[u8]>,
    E: Into<Error>,
    T: DeserializeOwned,
{
    type Item = Result<T>;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();
        if let Some(item) = this.next_item() {
            return Poll::Ready(Some(item));
        }
        while let Poll::Ready(chunk) = this.stream.as_mut().poll_next(cx) {
            match chunk {
                Some(Ok(chunk)) => {
                    this.buffer.extend_from_slice(chunk.as_ref());
                    if let Some(item) = this.next_item() {
                        return Poll::Ready(Some(item));
                    }
                }
                Some(Err(err)) => return Poll::Ready(Some(Err(err.into()))),
                None => return Poll::Ready(None),
            }
        }
        Poll::Pending
    }
}