use std::marker::PhantomData;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use futures::stream::Stream;
use futures::ready;
use bincode::Options;
use byteorder::ByteOrder;
use byteorder::NativeEndian;
use bytes::buf::Buf;
use bytes::BytesMut;
use serde::de::DeserializeOwned;
use tokio::io::AsyncRead;
use tokio::io::ReadBuf;
use crate::Error;
use crate::Key;
pub struct ValueStream<R: AsyncRead, K: Key, T: DeserializeOwned> {
reader: R,
buffer: BytesMut,
_key: PhantomData<K>,
_val: PhantomData<T>
}
impl<R: AsyncRead, K: Key, T: DeserializeOwned> ValueStream<R, K, T> {
pub(crate) fn new(reader: R) -> Self {
Self{
reader,
buffer: BytesMut::with_capacity(8192),
_key: PhantomData,
_val: PhantomData
}
}
}
impl<R: AsyncRead, K: Key, T: DeserializeOwned> Unpin for ValueStream<R, K, T> where R: Unpin {}
impl<R: AsyncRead + Unpin, K: Key, T: DeserializeOwned> Stream for ValueStream<R, K, T> {
type Item = Result<T, Error<K>>;
fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
if let FillResult::EOF = ready!(self.as_mut().fill(ctx, 5))? {
return Poll::Ready(None);
}
let size = NativeEndian::read_u32(&self.buffer[..4]) as usize;
ready!(self.as_mut().fill(ctx, size + 4)?);
self.buffer.advance(4);
let value = crate::codec().deserialize(&self.buffer[..size])?;
self.buffer.advance(size);
Poll::Ready(Some(Ok(value)))
}
}
#[allow(clippy::upper_case_acronyms)]
enum FillResult {
Filled,
EOF
}
impl<R: AsyncRead + Unpin, K: Key, T: DeserializeOwned> ValueStream<R, K, T> {
fn fill(mut self: Pin<&mut Self>, ctx: &mut Context, target_size: usize) -> Poll<Result<FillResult, std::io::Error>> {
if(self.buffer.len() >= target_size) {
return Poll::Ready(Ok(FillResult::Filled));
}
if(self.buffer.capacity() < target_size) {
let missing = target_size - self.buffer.capacity();
self.buffer.reserve(missing);
}
let existing_len = self.buffer.len();
let mut rest = self.buffer.split_off(existing_len);
unsafe { rest.set_len(rest.capacity()) };
while self.buffer.len() < target_size {
let mut buf = ReadBuf::new(&mut rest);
ready!(Pin::new(&mut self.reader).poll_read(ctx, &mut buf))?;
let filled = buf.filled().len();
if(filled == 0) {
if(self.buffer.is_empty()) {
return Poll::Ready(Ok(FillResult::EOF));
} else {
return Poll::Ready(Err(std::io::Error::from(std::io::ErrorKind::BrokenPipe)));
}
}
let read = rest.split_to(filled);
self.buffer.unsplit(read);
}
Poll::Ready(Ok(FillResult::Filled))
}
}