indexkv 0.8.0

A performance-focused, persistent, async, key-value store
Documentation
use core::marker::PhantomData;
use core::pin::Pin;
use core::task::{Context, Poll};
use std::cell::RefCell;

use bitcode::Decode;
use byteorder::{ByteOrder, NativeEndian};
use bytes::buf::Buf;
use bytes::BytesMut;
use futures::ready;
use futures::stream::Stream;
use tokio::fs::File;
use tokio::io::{AsyncRead, ReadBuf};

use crate::{Error, Key};

// This is basically copy-and-pasted from async_bincode::AsyncBincodeReader; I reimplemented here to utilize bitcode.
/// Provides a stream of values for [Store::stream_values](crate::Store::stream_values)
pub struct ValueStream<K: Key, T: for<'a> Decode<'a>> {
	reader: File,
	buffer: BytesMut,
	decoder: RefCell<bitcode::Buffer>,
	_key: PhantomData<K>,
	_val: PhantomData<T>,
}

impl<K: Key, T: for<'a> Decode<'a>> ValueStream<K, T> {
	pub(crate) fn new(reader: File) -> Self {
		Self {
			reader,
			buffer: BytesMut::with_capacity(8192),
			decoder: RefCell::new(bitcode::Buffer::new()),
			_key: PhantomData,
			_val: PhantomData,
		}
	}
}

impl<K: Key, T: for<'a> Decode<'a>> Unpin for ValueStream<K, T> {}

impl<K: Key, T: for<'a> Decode<'a>> Stream for ValueStream<K, T> {
	type Item = Result<T, Error<K>>;

	fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
		if let FillResult::Eof = ready!(self.as_mut().fill(ctx, 4))? {
			return Poll::Ready(None);
		}

		let size = NativeEndian::read_u32(&self.buffer[..4]) as usize;
		ready!(self.as_mut().fill(ctx, size + 4)?);
		self.buffer.advance(4);

		let value = self.decoder.borrow_mut().decode(&self.buffer[..size])?;
		self.buffer.advance(size);
		Poll::Ready(Some(Ok(value)))
	}
}

#[derive(Debug)]
enum FillResult {
	Filled,
	Eof,
}

impl<K: Key, T: for<'a> Decode<'a>> ValueStream<K, T> {
	fn fill(mut self: Pin<&mut Self>, ctx: &mut Context, target_size: usize) -> Poll<Result<FillResult, std::io::Error>> {
		let existing_len = self.buffer.len();
		if (existing_len >= target_size) {
			return Poll::Ready(Ok(FillResult::Filled));
		}

		let missing = target_size - existing_len;
		if (self.buffer.capacity() < target_size) {
			self.buffer.reserve(missing);
		}

		let mut rest = self.buffer.split_off(existing_len);
		let mut buf = ReadBuf::uninit(rest.spare_capacity_mut());
		let result = Pin::new(&mut self.reader).poll_read(ctx, &mut buf);
		let filled = buf.filled().len();

		let result = match result {
			Poll::Pending => Poll::Pending,
			Poll::Ready(Ok(_)) if filled == 0 => return Poll::Ready(Ok(FillResult::Eof)),
			Poll::Ready(Ok(_)) if filled < missing => Poll::Pending,
			Poll::Ready(Ok(_)) => Poll::Ready(Ok(FillResult::Filled)),
			Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
		};
		self.buffer.unsplit(rest);
		// SAFETY:  The way poll_read() and ReadBuf work, we're only extending
		// this to the buffer size that we've actually filled by reading.
		unsafe {
			self.buffer.set_len(existing_len + filled);
		}

		result
	}
}