use core::marker::PhantomData;
use core::pin::Pin;
use core::task::{Context, Poll};
use std::cell::RefCell;
use bitcode::Decode;
use byteorder::{ByteOrder, NativeEndian};
use bytes::buf::Buf;
use bytes::BytesMut;
use futures::ready;
use futures::stream::Stream;
use tokio::fs::File;
use tokio::io::{AsyncRead, ReadBuf};
use crate::{Error, Key};
pub struct ValueStream<K: Key, T: for<'a> Decode<'a>> {
reader: File,
buffer: BytesMut,
decoder: RefCell<bitcode::Buffer>,
_key: PhantomData<K>,
_val: PhantomData<T>,
}
impl<K: Key, T: for<'a> Decode<'a>> ValueStream<K, T> {
pub(crate) fn new(reader: File) -> Self {
Self {
reader,
buffer: BytesMut::with_capacity(8192),
decoder: RefCell::new(bitcode::Buffer::new()),
_key: PhantomData,
_val: PhantomData,
}
}
}
impl<K: Key, T: for<'a> Decode<'a>> Unpin for ValueStream<K, T> {}
impl<K: Key, T: for<'a> Decode<'a>> Stream for ValueStream<K, T> {
type Item = Result<T, Error<K>>;
fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
if let FillResult::Eof = ready!(self.as_mut().fill(ctx, 4))? {
return Poll::Ready(None);
}
let size = NativeEndian::read_u32(&self.buffer[..4]) as usize;
ready!(self.as_mut().fill(ctx, size + 4)?);
self.buffer.advance(4);
let value = self.decoder.borrow_mut().decode(&self.buffer[..size])?;
self.buffer.advance(size);
Poll::Ready(Some(Ok(value)))
}
}
#[derive(Debug)]
enum FillResult {
Filled,
Eof,
}
impl<K: Key, T: for<'a> Decode<'a>> ValueStream<K, T> {
fn fill(mut self: Pin<&mut Self>, ctx: &mut Context, target_size: usize) -> Poll<Result<FillResult, std::io::Error>> {
let existing_len = self.buffer.len();
if (existing_len >= target_size) {
return Poll::Ready(Ok(FillResult::Filled));
}
let missing = target_size - existing_len;
if (self.buffer.capacity() < target_size) {
self.buffer.reserve(missing);
}
let mut rest = self.buffer.split_off(existing_len);
let mut buf = ReadBuf::uninit(rest.spare_capacity_mut());
let result = Pin::new(&mut self.reader).poll_read(ctx, &mut buf);
let filled = buf.filled().len();
let result = match result {
Poll::Pending => Poll::Pending,
Poll::Ready(Ok(_)) if filled == 0 => return Poll::Ready(Ok(FillResult::Eof)),
Poll::Ready(Ok(_)) if filled < missing => Poll::Pending,
Poll::Ready(Ok(_)) => Poll::Ready(Ok(FillResult::Filled)),
Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
};
self.buffer.unsplit(rest);
unsafe {
self.buffer.set_len(existing_len + filled);
}
result
}
}