indexkv 0.7.3

A performance-focused, persistent, async, key-value store
Documentation
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;

use futures::stream::Stream;
use futures::ready;

use bincode::Options;
use byteorder::ByteOrder;
use byteorder::NativeEndian;
use bytes::buf::Buf;
use bytes::BytesMut;
use serde::de::DeserializeOwned;
use tokio::io::AsyncRead;
use tokio::io::ReadBuf;

use crate::Error;
use crate::Key;

// This is basically copy-and-pasted from async_bincode::AsyncBincodeReader; I reimplemented here to utilize fixint encoding.
/// Provides a stream of values for [Store::stream_values](crate::Store::stream_values)
pub struct ValueStream<R: AsyncRead, K: Key, T: DeserializeOwned> {
	reader: R,
	buffer: BytesMut,
	_key: PhantomData<K>,
	_val: PhantomData<T>
}

impl<R: AsyncRead, K: Key, T: DeserializeOwned> ValueStream<R, K, T> {
	pub(crate) fn new(reader: R) -> Self {
		Self{
			reader,
			buffer: BytesMut::with_capacity(8192),
			_key: PhantomData,
			_val: PhantomData
		}
	}
}

impl<R: AsyncRead, K: Key, T: DeserializeOwned> Unpin for ValueStream<R, K, T> where R: Unpin {}

impl<R: AsyncRead + Unpin, K: Key, T: DeserializeOwned> Stream for ValueStream<R, K, T> {
	type Item = Result<T, Error<K>>;

	fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
		if let FillResult::EOF = ready!(self.as_mut().fill(ctx, 5))? {
			return Poll::Ready(None);
		}

		let size = NativeEndian::read_u32(&self.buffer[..4]) as usize;
		ready!(self.as_mut().fill(ctx, size + 4)?);
		self.buffer.advance(4);

		let value = crate::codec().deserialize(&self.buffer[..size])?;
		self.buffer.advance(size);
		Poll::Ready(Some(Ok(value)))
	}
}

#[allow(clippy::upper_case_acronyms)]
enum FillResult {
	Filled,
	EOF
}

impl<R: AsyncRead + Unpin, K: Key, T: DeserializeOwned> ValueStream<R, K, T> {
	fn fill(mut self: Pin<&mut Self>, ctx: &mut Context, target_size: usize) -> Poll<Result<FillResult, std::io::Error>> {
		if(self.buffer.len() >= target_size) {
			return Poll::Ready(Ok(FillResult::Filled));
		}

		if(self.buffer.capacity() < target_size) {
			let missing = target_size - self.buffer.capacity();
			self.buffer.reserve(missing);
		}

		let existing_len = self.buffer.len();
		let mut rest = self.buffer.split_off(existing_len);
		// SAFETY:  We're not extending beyond the reserved (allocated)
		// capacity, and we're never reading uninitialized bytes - only ever
		// writing to them.
		unsafe { rest.set_len(rest.capacity()) };

		while self.buffer.len() < target_size {
			let mut buf = ReadBuf::new(&mut rest);
			ready!(Pin::new(&mut self.reader).poll_read(ctx, &mut buf))?;

			let filled = buf.filled().len();
			if(filled == 0) {
				if(self.buffer.is_empty()) {
					return Poll::Ready(Ok(FillResult::EOF));
				} else {
					return Poll::Ready(Err(std::io::Error::from(std::io::ErrorKind::BrokenPipe)));
				}
			}

			let read = rest.split_to(filled);
			self.buffer.unsplit(read);
		}

		Poll::Ready(Ok(FillResult::Filled))
	}
}