// indexkv 0.7.5
//
// A performance-focused, persistent, async key-value store.
//
// Documentation
use core::cmp::Ordering;
use core::fmt::Debug;
use std::io::SeekFrom;

use bincode::{deserialize, Options};
use futures::stream::{TryStream, TryStreamExt};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Deserializer, Serialize};
use tokio::io::{AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt};
use tokio_byteorder::{AsyncReadBytesExt, NativeEndian};
use tracing::instrument;

use super::{Error, Header, Key, Store, StreamError, CURRENT_FORMAT_VERSION};

impl<K, T> Store<K, T>
where
	K: Key,
	T: Serialize + DeserializeOwned + Debug + Send + Sync,
{
	pub(crate) const HEADER_SIZE: usize = size_of::<Header>();
	const INDEX_ENTRY_SIZE: usize = size_of::<IndexEntry<K>>();

	#[inline]
	pub(crate) fn index_start_position(&self) -> u64 {
		self.header.index_start_pos
	}

	pub(crate) const fn data_start_position(&self) -> u64 {
		Self::HEADER_SIZE as u64
	}

	#[inline]
	pub(crate) async fn read_header(&mut self) -> Result<Option<Header>, Error<K>> {
		let mut buf = [0u8; size_of::<Header>()];
		self.file.seek(SeekFrom::Start(0)).await?;
		let read_count = self.file.read(&mut buf).await?;
		if (read_count != buf.len()) {
			return Ok(None);
		}
		Ok(Some(deserialize(&buf)?))
	}

	#[cfg_attr(feature = "full-instrumentation", instrument(err, level = "debug", skip(self), fields(indexkv.path = %self.path.display())))]
	pub(crate) async fn find_index_entry(&mut self, key: K) -> Result<Option<IndexEntry<K>>, Error<K>> {
		let mut size = self.header.pair_count;
		let mut left = 0;
		let mut right = size;
		while left < right {
			let mid = left + size / 2;
			let entry = self.get_index_entry(mid).await?;
			match key.cmp(&entry.key) {
				Ordering::Equal => return Ok(Some(entry)),
				Ordering::Greater => left = mid + 1,
				Ordering::Less => right = mid,
			};
			size = right - left;
		}
		Ok(None)
	}

	#[cfg_attr(feature = "full-instrumentation", instrument(err, level = "debug", skip(self), fields(indexkv.path = %self.path.display())))]
	pub(crate) async fn find_index_entries(&mut self, keys: &[K]) -> Result<Vec<(K, IndexEntry<K>)>, Error<K>> {
		let mut chunks = Vec::with_capacity(keys.len());
		chunks.push((keys, 0, self.header.pair_count - 1));
		let mut chunks_new = Vec::with_capacity(keys.len());
		let mut results = Vec::with_capacity(keys.len());
		while (!chunks.is_empty()) {
			chunks_new.clear();
			for (keys, left, right) in chunks.iter() {
				let left = *left;
				let right = *right;
				let mid = left + ((right - left) / 2);
				let entry = self.get_index_entry(mid).await?;
				let pivot = match keys.binary_search(&entry.key) {
					Ok(i) => {
						results.push((keys[i], entry));
						if (keys.len() == 1) {
							continue;
						}
						i
					}
					Err(i) => i,
				};
				if (left == right) {
					continue;
				}
				match pivot {
					0 => chunks_new.push((&keys[..], mid + 1, right)),
					i if i == keys.len() => chunks_new.push((&keys[..], left, mid)),
					i => {
						chunks_new.push((&keys[..=i], left, mid));
						if (mid < right) {
							chunks_new.push((&keys[i..], mid + 1, right));
						}
					}
				};
			}
			std::mem::swap(&mut chunks, &mut chunks_new);
		}
		Ok(results)
	}

	#[inline]
	pub(crate) async fn read_value_at(&mut self, position: u64) -> Result<T, Error<K>> {
		self.file.seek(SeekFrom::Start(position)).await?;
		match self.read_value().await? {
			Some(v) => Ok(v),
			None => Err(Error::MissingValue),
		}
	}

	// Order of operations:
	//   * Write down the header, with completed_writing = false
	//   * Seek to data_start_position
	//   * For each entry in pairs:
	//     * Compute the serialized size of this entry
	//     * Serialize the size and the entry itself to the buffer
	//     * Save the (key, position_in_file) pair as an IndexEntry
	//     * Write the buffer to the file
	//   * Save current file position (as index_start_pos) and number of entries to the header
	//   * Write the sorted index down to the file
	//   * Seek to the beginning of the file and write down the header, with completed_writing = true
	#[instrument(err, level = "info", skip(self, input_stream, writer, index_size_hint), fields(indexkv.path = %self.path.display()))]
	pub(crate) async fn write_to<S, W>(&mut self, mut input_stream: S, writer: &mut W, index_size_hint: usize) -> Result<(), StreamError<S::Error, K>>
	where
		S: TryStream<Ok = (K, T)> + Unpin + Send,
		W: AsyncWrite + AsyncSeek + Unpin + Send,
		S::Error: std::error::Error + Send,
	{
		let mut buf = vec![0u8; 1024];
		let encoder = codec();

		self.header.format_version = CURRENT_FORMAT_VERSION;
		self.header.completed_writing = false;
		encoder.serialize_into(&mut buf[0..], &self.header)?;
		writer.seek(SeekFrom::Start(0)).await?;
		writer.write_all(&buf[0..Self::HEADER_SIZE]).await?;
		writer.flush().await?;

		let mut pos = self.data_start_position();
		let mut index = Vec::with_capacity(index_size_hint);

		let mut size = 0u32;
		let size_of_size = encoder.serialized_size(&size)? as u32;
		writer.seek(SeekFrom::Start(pos)).await?;
		while let Some((key, value)) = input_stream.try_next().await.map_err(StreamError::from_external)? {
			// We're writing down the (size, value) tuple so that we can deserialize values later
			//    without having to do any sort of read-to-end scheme.  We'll know exactly how many
			//    bytes to allocate in our read buffer.
			size = encoder.serialized_size(&value)? as u32;
			encoder.serialize_into(&mut buf[0..], &size)?;
			if ((size + size_of_size) as usize > buf.len()) {
				buf.resize((size + size_of_size) as usize, 0);
			}
			encoder.serialize_into(&mut buf[size_of_size as usize..], &value)?;
			index.push(IndexEntry::new(key, pos));
			pos += writer.write(&buf[0..(size + size_of_size) as usize]).await? as u64;
		}

		self.header.index_start_pos = pos;
		self.header.pair_count = index.len() as u64;
		index.sort_unstable();

		for entry in index.into_iter() {
			encoder.serialize_into(&mut buf[0..], &entry)?;
			writer.write_all(&buf[0..Self::INDEX_ENTRY_SIZE]).await?;
		}

		self.header.completed_writing = true;
		encoder.serialize_into(&mut buf[0..], &self.header)?;
		writer.seek(SeekFrom::Start(0)).await?;
		writer.write_all(&buf[0..Self::HEADER_SIZE]).await?;

		writer.flush().await?;
		Ok(())
	}

	#[inline]
	async fn read_index_entry(&mut self) -> Result<IndexEntry<K>, Error<K>> {
		// This will fail if size_of::<K>() > 128
		let mut buf = [0u8; 192];
		let count = self.file.read(&mut buf).await?;
		Ok(deserialize(&buf[..count])?)
	}

	#[inline]
	#[cfg_attr(feature = "full-instrumentation", instrument(err, level = "trace", skip(self), fields(indexkv.path = %self.path.display())))]
	async fn get_index_entry(&mut self, i: u64) -> Result<IndexEntry<K>, Error<K>> {
		let pos = self.index_start_position() + (i * Self::INDEX_ENTRY_SIZE as u64);
		self.file.seek(SeekFrom::Start(pos)).await?;
		let entry = self.read_index_entry().await?;
		Ok(entry)
	}

	#[inline]
	async fn read_value(&mut self) -> Result<Option<T>, Error<K>> {
		let size = match AsyncReadBytesExt::read_u32::<NativeEndian>(&mut self.file).await {
			Ok(v) => v as usize,
			Err(e) => match e.kind() {
				std::io::ErrorKind::UnexpectedEof => return Ok(None),
				_ => return Err(e.into()),
			},
		};
		let mut buf = vec![0u8; size];
		match self.file.read_exact(&mut buf).await {
			Ok(read_count) => {
				if (read_count < size) {
					return Ok(None);
				}
			}
			Err(e) => match e.kind() {
				std::io::ErrorKind::UnexpectedEof => return Ok(None),
				_ => return Err(e.into()),
			},
		};
		Ok(Some(deserialize(&buf)?))
	}
}

/// Builds the bincode configuration for this store's on-disk encoding.
///
/// The configuration uses:
/// - fixed-width integers,
/// - host-native byte order,
/// - tolerance for trailing bytes when decoding,
/// - a `u32::MAX`-byte size limit.
pub(crate) fn codec() -> impl bincode::Options + Copy {
	let size_limit = u64::from(u32::MAX);
	let options = bincode::DefaultOptions::new().with_limit(size_limit);
	let options = options.allow_trailing_bytes();
	let options = options.with_fixint_encoding();
	options.with_native_endian()
}

/// One slot of the on-disk index: a key plus the location of its value record.
///
/// Field order matters. The derived `Serialize` writes `key` and then `position`, and the
/// hand-written `Deserialize` impl for this type reads them back as a `(K, u64)` tuple in that
/// same order.
#[derive(Debug, Eq, PartialEq, Serialize)]
pub(crate) struct IndexEntry<K: Key> {
	/// The key this slot indexes.
	pub(crate) key: K,
	/// Absolute byte offset in the file of this key's `(size, value)` record.
	pub(crate) position: u64,
}

impl<K: Key> IndexEntry<K> {
	fn new(key: K, position: u64) -> Self {
		Self { key, position }
	}
}

impl<K: Key> Ord for IndexEntry<K> {
	/// Orders entries by key, breaking ties by file position.
	///
	/// The tie-break keeps `cmp` consistent with the derived `PartialEq`, which compares both
	/// fields. Two entries are `Equal` exactly when they are `==`, as the `Ord` contract
	/// requires. It also makes the order of duplicate keys after `sort_unstable` deterministic.
	/// Entries with different keys are ordered exactly as before.
	#[inline]
	fn cmp(&self, other: &Self) -> Ordering {
		self.key.cmp(&other.key).then_with(|| self.position.cmp(&other.position))
	}
}

impl<K: Key> PartialOrd for IndexEntry<K> {
	/// Always `Some`: the ordering is total and delegates to [`Ord::cmp`].
	#[inline]
	fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
		let ordering = Ord::cmp(self, other);
		Some(ordering)
	}
}

impl<'de, K: Key> Deserialize<'de> for IndexEntry<K> {
	#[inline]
	fn deserialize<D: Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
		let (key, position) = <(K, u64)>::deserialize(deserializer)?;
		Ok(Self { key, position })
	}
}