indexkv 0.7.3

A performance-focused, persistent, async, key-value store
Documentation
#![allow(unused_parens)]
#![allow(clippy::tabs_in_doc_comments)]
#![warn(clippy::future_not_send)]

//! # indexkv
//!
//! Provides [Store], an on-disk storage bucket.  Conceptually, this is not dissimilar to a `map<u64, T>`
//! that provides fast (by persistent storage standards) key lookups, low memory complexity, and
//! fast sequential writes at the expense of random writes; random writes are unsupported.  See
//! [Store::write] for details on writes.
//!
//! While reading concurrently is safe, reading while a write is ongoing is a footgun.
//!
//! #### Examples
//!
//! Automatically keying values with [Iterator::enumerate]:
//! ```
//! use indexkv::Store;
//!
//! #[tokio::main]
//! async fn main() {
//!		let path = tempfile::NamedTempFile::new().unwrap().into_temp_path();
//!		let mut store: Store<u8, String> = Store::new(path).await.unwrap();
//!
//!		let stream = futures::stream::iter(
//!			["zero", "one", "two", "three", "four", "five"]
//!				.into_iter()
//!				.enumerate()
//!				.map(|(i, v)| (i as u8, v.to_string()))
//!		);
//!		store.write_infallible(stream, 0).await.unwrap();
//!
//!		let result = store.get(2).await.unwrap();
//!		assert_eq!(result, "two");
//!		let result = store.get_many(&[0, 4]).await.unwrap();
//!		assert_eq!(result.len(), 2);
//!		assert_eq!(result.get(&0), Some(&"zero".to_string()));
//!		assert_eq!(result.get(&2), None);
//!		assert_eq!(result.get(&4), Some(&"four".to_string()));
//! }
//! ```
//!
//! Manually keying values with an external "ID":
//! ```
//! use indexkv::{Error, Store};
//!
//! #[tokio::main]
//! async fn main() {
//!		let path = tempfile::NamedTempFile::new().unwrap().into_temp_path();
//!		let mut store: Store<u16, String> = Store::new(path).await.unwrap();
//!
//!		let stream = futures::stream::iter([
//!			(200, "two hundred".to_string()),
//!			(1337, "thirteen hundred thirty-seven".to_string()),
//!			(75, "seventy-five".to_string()),
//!			(20, "twenty".to_string())
//!		]);
//!		store.write_infallible(stream, 4).await.unwrap();
//!
//!		let result = store.get(2).await;
//!		assert!(matches!(result, Err(Error::NotFound(2))));
//!		let result = store.get(75).await.unwrap();
//!		assert_eq!(result, "seventy-five".to_string());
//! }
//! ```

use std::cmp::Ordering;
use std::collections::HashMap;
use std::hash::Hash;
use std::io::SeekFrom;
use std::mem::size_of;
use std::path::Path;
use std::path::PathBuf;

use bincode::deserialize;
use bincode::Options;

use tokio_byteorder::AsyncReadBytesExt;
use tokio_byteorder::NativeEndian;

use futures::stream::Stream;
use futures::stream::StreamExt;
use futures::stream::TryStream;
use futures::stream::TryStreamExt;

use serde::de::DeserializeOwned;
use serde::Deserialize;
use serde::Deserializer;
use serde::Serialize;

use tokio::fs::File;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncSeek;
use tokio::io::AsyncSeekExt;
use tokio::io::AsyncWrite;
use tokio::io::AsyncWriteExt;
use tokio::io::BufStream;
use tokio::io::BufWriter;

use tracing::instrument;

mod error;
pub use error::Error;
pub use error::StreamError;
pub mod stream;

pub trait Key: Copy + Eq + Hash + Ord + std::fmt::Debug + std::fmt::Display + for<'de> Deserialize<'de> + Serialize + Send + Sync {}
impl<T> Key for T where T: Copy + Eq + Hash + Ord + std::fmt::Debug + std::fmt::Display + for<'de> Deserialize<'de> + Serialize + Send + Sync {}

const CURRENT_FORMAT_VERSION: u16 = 2;

fn codec() -> impl bincode::Options + Copy /* {{{ */ {
	bincode::DefaultOptions::new()
		.with_limit(u32::max_value() as u64)
		.allow_trailing_bytes()
		.with_fixint_encoding()
		.with_native_endian()
} // }}}

#[derive(Debug, Eq, PartialEq, Serialize)]
pub struct IndexEntry<K: Key> {
	key: K,
	/// absolute position in the file
	position: u64
}

impl<K: Key> IndexEntry<K> {
	fn new(key: K, position: u64) -> Self /* {{{ */ {
		Self{
			key,
			position
		}
	} // }}}
}

impl<K: Key> Ord for IndexEntry<K> /* {{{ */ {
	#[inline]
	fn cmp(&self, other: &Self) -> Ordering {
		self.key.cmp(&other.key)
	}
} // }}}

impl<K: Key> PartialOrd for IndexEntry<K> /* {{{ */ {
	#[inline]
	fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
		Some(self.cmp(other))
	}
} // }}}

impl<'de, K: Key> Deserialize<'de> for IndexEntry<K> /* {{{ */ {
	#[inline]
	fn deserialize<D: Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
		let (key, position) = <(K, u64)>::deserialize(deserializer)?;
		Ok(Self{
			key, position
		})
	}
} // }}}

#[derive(Debug, Deserialize, Serialize)]
struct Header {
	format_version: u16,
	completed_writing: bool,
	pair_count: u64,
	/// absolute position in the file
	index_start_pos: u64
}

pub struct Store<K, T> /* {{{ */
where
	K: Key,
	T: Serialize + DeserializeOwned
{
	path: PathBuf,
	file: BufStream<File>,
	header: Header,
	index_cache: HashMap<K, u64, fnv::FnvBuildHasher>,
	_nothing: std::marker::PhantomData<T>
} // }}}

impl<K, T> Store<K, T>
where
	K: Key,
	T: Serialize + DeserializeOwned + std::fmt::Debug + Send + Sync,
{
	const HEADER_SIZE: usize = size_of::<Header>();
	const INDEX_ENTRY_SIZE: usize = size_of::<IndexEntry<K>>();

	/// Allocates a new Store containing no values.  Opens a file descriptor to `path`.
	pub async fn new(path: impl AsRef<Path> + Send) -> Result<Self, Error<K>> /* {{{ */ {
		let path = path.as_ref().to_path_buf();
		let fd = tokio::fs::OpenOptions::new()
			.read(true)
			.write(true)
			.create(true)
			.append(false)
			.open(&path)
			.await?;
		let mut store = Self{
			path,
			file: BufStream::new(fd),
			header: Header{
				format_version: 0,
				completed_writing: false,
				pair_count: 0,
				index_start_pos: Self::HEADER_SIZE as u64
			},
			index_cache: HashMap::with_hasher(fnv::FnvBuildHasher::default()),
			_nothing: std::marker::PhantomData
		};
		if let Some(header) = store.read_header().await? {
			store.header = header;
		}
		Ok(store)
	} // }}}

	#[inline]
	fn index_start_position(&self) -> u64 /* {{{ */ {
		self.header.index_start_pos
	} // }}}

	const fn data_start_position(&self) -> u64 /* {{{ */ {
		Self::HEADER_SIZE as u64
	} // }}}

	#[inline]
	async fn read_header(&mut self) -> Result<Option<Header>, Error<K>> /* {{{ */ {
		let mut buf = [0u8; size_of::<Header>()];
		self.file.seek(SeekFrom::Start(0)).await?;
		let read_count = self.file.read(&mut buf).await?;
		if(read_count != buf.len()) {
			return Ok(None);
		}
		Ok(Some(deserialize(&buf)?))
	} // }}}

	#[inline]
	async fn read_index_entry(&mut self) -> Result<IndexEntry<K>, Error<K>> /* {{{ */ {
		// This will fail if size_of::<K>() > 128
		let mut buf = [0u8; 192];
		let count = self.file.read(&mut buf).await?;
		Ok(deserialize(&buf[..count])?)
	} // }}}

	#[inline]
	#[cfg_attr(feature = "full-instrumentation", instrument(err, level = "trace", skip(self), fields(indexkv.path = %self.path.display())))]
	async fn get_index_entry(&mut self, i: u64) -> Result<IndexEntry<K>, Error<K>> /* {{{ */ {
		let pos = self.index_start_position() + (i * Self::INDEX_ENTRY_SIZE as u64);
		self.file.seek(SeekFrom::Start(pos)).await?;
		let entry = self.read_index_entry().await?;
		Ok(entry)
	} // }}}

	#[cfg_attr(feature = "full-instrumentation", instrument(err, level = "debug", skip(self), fields(indexkv.path = %self.path.display())))]
	async fn find_index_entry(&mut self, key: K) -> Result<Option<IndexEntry<K>>, Error<K>> /* {{{ */ {
		let mut size = self.header.pair_count;
		let mut left = 0;
		let mut right = size;
		while left < right {
			let mid = left + size / 2;
			let entry = self.get_index_entry(mid).await?;
			match key.cmp(&entry.key) {
				Ordering::Equal => return Ok(Some(entry)),
				Ordering::Greater => left = mid + 1,
				Ordering::Less => right = mid
			};
			size = right - left;
		}
		Ok(None)
	} // }}}

	#[cfg_attr(feature = "full-instrumentation", instrument(err, level = "debug", skip(self), fields(indexkv.path = %self.path.display())))]
	async fn find_index_entries(&mut self, keys: &[K]) -> Result<Vec<(K, IndexEntry<K>)>, Error<K>> {
		let mut chunks = Vec::with_capacity(keys.len());
		chunks.push((keys, 0, self.header.pair_count - 1));
		let mut chunks_new = Vec::with_capacity(keys.len());
		let mut results = Vec::with_capacity(keys.len());
		while(!chunks.is_empty()) {
			chunks_new.clear();
			for (keys, left, right) in chunks.iter() {
				let left = *left;
				let right = *right;
				let mid = left + ((right - left) / 2);
				let entry = self.get_index_entry(mid).await?;
				let pivot = match keys.binary_search(&entry.key) {
					Ok(i) => {
						results.push((keys[i], entry));
						if(keys.len() == 1) {
							continue;
						}
						i
					},
					Err(i) => i
				};
				if(left == right) {
					continue;
				}
				match pivot {
					0 => chunks_new.push((&keys[..], mid + 1, right)),
					i if i == keys.len() => chunks_new.push((&keys[..], left, mid)),
					i => {
						chunks_new.push((&keys[..=i], left, mid));
						if(mid < right) {
							chunks_new.push((&keys[i..], mid + 1, right));
						}
					}
				};
			}
			std::mem::swap(&mut chunks, &mut chunks_new);
		}
		Ok(results)
	}

	#[inline]
	async fn read_value(&mut self) -> Result<Option<T>, Error<K>> /* {{{ */ {
		let size = match AsyncReadBytesExt::read_u32::<NativeEndian>(&mut self.file).await {
			Ok(v) => v as usize,
			Err(e) => match e.kind() {
				std::io::ErrorKind::UnexpectedEof => return Ok(None),
				_ => return Err(e.into())
			}
		};
		let mut buf = vec![0u8; size];
		match self.file.read_exact(&mut buf).await {
			Ok(read_count) => if(read_count < size) {
				return Ok(None);
			},
			Err(e) => match e.kind() {
				std::io::ErrorKind::UnexpectedEof => return Ok(None),
				_ => return Err(e.into())
			}
		};
		Ok(Some(deserialize(&buf)?))
	} // }}}

	#[inline]
	async fn read_value_at(&mut self, position: u64) -> Result::<T, Error<K>> /* {{{ */ {
		self.file.seek(SeekFrom::Start(position)).await?;
		match self.read_value().await? {
			Some(v) => Ok(v),
			None => Err(Error::MissingValue)
		}
	} // }}}

	/// Reads the header and checks to see whether or not the store is valid; at this time, this
	/// means making sure the format is fully up-to-date and writing completed.
	#[inline]
	pub async fn is_valid(&mut self) -> Result<bool, Error<K>> /* {{{ */ {
		Ok(
			self.header.format_version == CURRENT_FORMAT_VERSION &&
			self.header.completed_writing
		)
	} // }}}

	/// Provides a [sequential stream](stream::ValueStream) of all values in the Store.  In
	/// relational database parlance, this would be a tablescan.
	// TODO:  Right now, nothing stops you from starting a stream then writing
	//    to the Store.  This will, naturally, ruin everything.
	#[instrument(err, level = "debug", skip(self), fields(indexkv.path = %self.path.display()))]
	#[inline]
	pub async fn stream_values(&self) -> Result<impl Stream<Item=Result<T, Error<K>>>, Error<K>> /* {{{ */ {
		let start_pos = self.data_start_position();
		let pair_count = self.header.pair_count as usize;
		let mut file = tokio::fs::OpenOptions::new()
			.read(true)
			.write(false)
			.create(false)
			.append(false)
			.open(&self.path)
			.await?;
		file.seek(SeekFrom::Start(start_pos)).await?;
		let stream = stream::ValueStream::new(file).take(pair_count);
		Ok(stream)
	} // }}}

	/// Looks up a single value by key.  Will attempt to cache data positions in the backing file
	/// to minimize disk seeking.  Returns [Error::NotFound] when `key` does not exist in the
	/// Store.
	#[cfg_attr(feature = "full-instrumentation", instrument(err, level = "debug", skip(self), fields(indexkv.path = %self.path.display())))]
	#[inline]
	pub async fn get(&mut self, key: K) -> Result<T, Error<K>> /* {{{ */ {
		let position = self.index_cache.get(&key);
		let position = match position {
			Some(v) => *v,
			None => {
				let index = match self.find_index_entry(key).await? {
					Some(v) => v,
					None => return Err(Error::NotFound(key))
				};
				self.index_cache.insert(key, index.position);
				index.position
			}
		};
		self.read_value_at(position).await
	} // }}}

	/// Looks up multiple values by key.  Will find (uncached) keys in sorted order to minimize
	/// disk seeking back and forth.  Does _not_ return an error when `key` does not exist; calling
	/// code should check for `Some` vs `None` in the resulting `HashMap` for fallibility.
	#[instrument(err, level = "debug", skip(self), fields(indexkv.path = %self.path.display()))]
	pub async fn get_many(&mut self, keys_in: &[K]) -> Result<HashMap<K, T, fnv::FnvBuildHasher>, Error<K>> /* {{{ */ {
		let mut results = HashMap::with_capacity_and_hasher(keys_in.len(), fnv::FnvBuildHasher::default());
		let mut keys = Vec::with_capacity(keys_in.len());
		for key in keys_in {
			if let Some(&position) = self.index_cache.get(key) {
				let value = self.read_value_at(position).await?;
				results.insert(*key, value);
			} else {
				keys.push(*key);
			}
		}

		keys.sort_unstable();
		if(keys.len() == 1) {
			let key = keys[0];
			if let Some(index) = self.find_index_entry(key).await? {
				self.index_cache.insert(key, index.position);
				results.insert(key, self.read_value_at(index.position).await?);
			}
		} else {
			let found = self.find_index_entries(&keys).await?;
			for (key, index) in found.into_iter() {
				self.index_cache.insert(key, index.position);
				results.insert(key, self.read_value_at(index.position).await?);
			}
		}
		Ok(results)
	} // }}}

	// Order of operations:
	//   * Write down the header, with completed_writing = false
	//   * Seek to data_start_position
	//   * For each entry in pairs:
	//     * Compute the serialized size of this entry
	//     * Serialize the size and the entry itself to the buffer
	//     * Save the (key, position_in_file) pair as an IndexEntry
	//     * Write the buffer to the file
	//   * Save current file position (as index_start_pos) and number of entries to the header
	//   * Write the sorted index down to the file
	//   * Seek to the beginning of the file and write down the header, with completed_writing = true
	#[instrument(err, level = "info", skip(self, input_stream, writer, index_size_hint), fields(indexkv.path = %self.path.display()))]
	async fn write_to<S, W>(&mut self, mut input_stream: S, writer: &mut W, index_size_hint: usize) -> Result<(), StreamError<S::Error, K>> /* {{{ */
	where
		S: TryStream<Ok=(K, T)> + Unpin + Send,
		W: AsyncWrite + AsyncSeek + Unpin + Send,
		S::Error: std::error::Error + Send
	{
		let mut buf = vec![0u8; 1024];
		let encoder = codec();

		self.header.format_version = CURRENT_FORMAT_VERSION;
		self.header.completed_writing = false;
		encoder.serialize_into(&mut buf[0..], &self.header)?;
		writer.seek(SeekFrom::Start(0)).await?;
		writer.write_all(&buf[0..Self::HEADER_SIZE]).await?;
		writer.flush().await?;

		let mut pos = self.data_start_position();
		let mut index = Vec::with_capacity(index_size_hint);

		let mut size = 0u32;
		let size_of_size = encoder.serialized_size(&size)? as u32;
		writer.seek(SeekFrom::Start(pos)).await?;
		while let Some((key, value)) = input_stream.try_next().await.map_err(StreamError::from_external)? {
			// We're writing down the (size, value) tuple so that we can deserialize values later
			//    without having to do any sort of read-to-end scheme.  We'll know exactly how many
			//    bytes to allocate in our read buffer.
			size = encoder.serialized_size(&value)? as u32;
			encoder.serialize_into(&mut buf[0..], &size)?;
			if((size + size_of_size) as usize > buf.len()) {
				buf.resize((size + size_of_size) as usize, 0);
			}
			encoder.serialize_into(&mut buf[size_of_size as usize..], &value)?;
			index.push(IndexEntry::new(key, pos));
			pos += writer.write(&buf[0..(size + size_of_size) as usize]).await? as u64;
		}

		self.header.index_start_pos = pos;
		self.header.pair_count = index.len() as u64;
		index.sort_unstable();

		for entry in index.into_iter() {
			encoder.serialize_into(&mut buf[0..], &entry)?;
			writer.write_all(&buf[0..Self::INDEX_ENTRY_SIZE]).await?;
		}

		self.header.completed_writing = true;
		encoder.serialize_into(&mut buf[0..], &self.header)?;
		writer.seek(SeekFrom::Start(0)).await?;
		writer.write_all(&buf[0..Self::HEADER_SIZE]).await?;

		writer.flush().await?;
		Ok(())
	} // }}}

	/// Consumes a TryStream of (key, value) pairs.  Sequentially writes all the values to disk,
	/// noting their positions, then goes back and sequentially writes down the sorted index.
	///
	/// Making effective use of this error handling mechanism is complex; an example is included
	/// below.
	///
	/// The Error type is usually inferrable.  A recommended pattern is to have it be your crate's
	/// Error type, or [anyhow::Error](https://docs.rs/anyhow/1/anyhow/struct.Error.html).
	///
	/// `index_size_hint` is used to preallocate the in-memory index that is eventually written to
	/// disk, which is strictly a performance optimization.  If it's unknown, passing a 0 is fine.
	///
	/// Example:
	/// ```
	/// use std::io;
	/// use futures::TryStreamExt;
	/// use indexkv::{Store, StreamError};
	///
	/// #[derive(Debug, thiserror::Error)]
	/// enum MyError {
	///		#[error("I/O error")]
	///		IO(#[from] io::Error),
	///		#[error("cache error")]
	///		Cache(#[from] indexkv::Error<u64>)
	/// }
	///
	/// impl From<StreamError<MyError, u64>> for MyError {
	///		fn from(other: StreamError<MyError, u64>) -> Self {
	///			match other {
	///				StreamError::Caller(e) => e,
	///				StreamError::Internal(e) => MyError::from(e)
	///			}
	///		}
	/// }
	///
	/// async fn write() -> Result<(), MyError> {
	///		let path = tempfile::NamedTempFile::new().unwrap().into_temp_path();
	///		let mut store: Store<u64, String> = Store::new(path).await.unwrap();
	///
	///		let stream = futures::stream::iter(
	///			vec![Ok("zero"), Ok("one"), Err(io::Error::from(io::ErrorKind::BrokenPipe)), Ok("two")]
	///				.into_iter()
	///				.enumerate()
	///				.map(|(i, r)| match r {
	///					Ok(v) => Ok((i as u64, v.to_string())),
	///					Err(e) => Err(e)
	///				})
	///		).err_into::<MyError>();
	///		store.write(stream, 0).await?;
	///		Ok(())
	/// }
	///
	/// #[tokio::main]
	/// async fn main() {
	///		let result = write().await;
	///		assert!(matches!(
	///			result,
	///			Err(MyError::IO(_))
	///		));
	///	}
	/// ```
	#[inline]
	pub async fn write<S>(&mut self, input_stream: S, index_size_hint: usize) -> Result<(), StreamError<S::Error, K>> /* {{{ */
	where
		S: TryStream<Ok=(K, T)> + Unpin + Send,
		S::Error: std::error::Error + Send
	{
		self.index_cache.clear();
		let mut writer = BufWriter::new(self.file.get_ref().try_clone().await?);
		self.write_to(input_stream, &mut writer, index_size_hint).await
	} // }}}

	/// For cases where the incoming data stream is infallible, this allows you to avoid that
	/// portion of error handling.
	#[inline]
	pub async fn write_infallible<S>(&mut self, input_stream: S, index_size_hint: usize) -> Result<(), Error<K>> /* {{{ */
	where
		S: Stream<Item=(K, T)> + Unpin + Send
	{
		self.write(input_stream.map(Result::<_, Error<K>>::Ok), index_size_hint).await.map_err(|e| match e {
			StreamError::Internal(e) => e,
			_ => unreachable!()
		})
	} // }}}
}

#[cfg(test)]
mod tests {
	use futures::stream;
	use rand::prelude::SliceRandom;
	use crate::*;

	// FixedWidth {{{
	#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
	struct FixedWidth {
		a: u8,
		b: i16,
		c: u32,
		d: u64
	}
	impl FixedWidth {
		fn new(i: u8) -> Self {
			Self{a: i, b: i as i16 * 2, c: i as u32 * 4, d: i as u64 * 8}
		}
		fn new2(i: u8) -> Self {
			Self{a: 255 - i, b: -(i as i16) * 2, c: i as u32 * 11, d: i as u64 * 17}
		}
	}
	// }}}

	#[tokio::test]
	async fn constant_u8_1() /* {{{ */ {
		let values = vec![42u8];
		let path = tempfile::NamedTempFile::new().unwrap().into_temp_path();
		let mut store: Store<u8, u8> = Store::new(path).await.unwrap();
		store.write_infallible(stream::iter(values.clone().into_iter().enumerate().map(|(i, v)| { println!("{} -> {}", i, v); (i as u8, v) })), 0).await.unwrap();

		let result = store.get(0).await.unwrap();
		assert_eq!(result, 42u8);
	} // }}}

	#[tokio::test]
	async fn constant_u8_2() /* {{{ */ {
		let values = vec![11u8, 32u8];
		let path = tempfile::NamedTempFile::new().unwrap().into_temp_path();
		let mut store: Store<i16, u8> = Store::new(path).await.unwrap();
		store.write_infallible(stream::iter(values.clone().into_iter().enumerate().map(|(i, v)| { println!("{} -> {}", i, v); (i as i16, v) })), 0).await.unwrap();

		let result = store.get(0).await.unwrap();
		assert_eq!(result, 11u8);
		let result = store.get(1).await.unwrap();
		assert_eq!(result, 32u8);
	} // }}}

	#[tokio::test]
	async fn constant_u8_3() /* {{{ */ {
		let values = vec![47u8, 53u8, 128u8];
		let path = tempfile::NamedTempFile::new().unwrap().into_temp_path();
		let mut store: Store<u32, u8> = Store::new(path).await.unwrap();
		store.write_infallible(stream::iter(values.clone().into_iter().enumerate().map(|(i, v)| { println!("{} -> {}", i, v); (i as u32, v) })), 0).await.unwrap();

		let result = store.get(0).await.unwrap();
		assert_eq!(result, 47u8);
		let result = store.get(1).await.unwrap();
		assert_eq!(result, 53u8);
		let result = store.get(2).await.unwrap();
		assert_eq!(result, 128u8);
	} // }}}

	#[tokio::test]
	async fn constant_u16_1() /* {{{ */ {
		let values = vec![42u16];
		let path = tempfile::NamedTempFile::new().unwrap().into_temp_path();
		let mut store: Store<i128, u16> = Store::new(path).await.unwrap();
		store.write_infallible(stream::iter(values.clone().into_iter().enumerate().map(|(i, v)| { println!("{} -> {}", i, v); (i as i128, v) })), 0).await.unwrap();

		let result = store.get(0).await.unwrap();
		assert_eq!(result, 42u16);
	} // }}}

	#[tokio::test]
	async fn constant_u16_2() /* {{{ */ {
		let values = vec![11u16, 32u16];
		let path = tempfile::NamedTempFile::new().unwrap().into_temp_path();
		let mut store: Store<i32, u16> = Store::new(path).await.unwrap();
		store.write_infallible(stream::iter(values.clone().into_iter().enumerate().map(|(i, v)| { println!("{} -> {}", i, v); (i as i32, v) })), 0).await.unwrap();

		let result = store.get(0).await.unwrap();
		assert_eq!(result, 11u16);
		let result = store.get(1).await.unwrap();
		assert_eq!(result, 32u16);
	} // }}}

	#[tokio::test]
	async fn fixedwidth() /* {{{ */ {
		let mut values = (0..255_u8).map(|i| FixedWidth::new(i)).collect::<Vec<_>>();
		values.extend((0..255_u8).map(|i| FixedWidth::new2(i)));

		let path = tempfile::NamedTempFile::new().unwrap().into_temp_path();
		let mut store: Store<u16, FixedWidth> = Store::new(path).await.unwrap();
		store.write_infallible(stream::iter(values.clone().into_iter().enumerate().map(|(i, v)| (i as u16, v))), 512).await.unwrap();

		let mut rng = rand::thread_rng();
		let mut indices = (0..values.len() as u16).collect::<Vec<u16>>();
		for _ in 1..16 {
			indices.shuffle(&mut rng);
			for i in indices.iter() {
				let result = store.get(*i).await.unwrap();
				assert_eq!(result, values[*i as usize]);
			}
		}
	} // }}}

	#[tokio::test]
	async fn strings() /* {{{ */ {
		// setup {{{
		#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
		struct Struct {
			number: u8,
			name: String,
			maybe_name: Option<String>
		}
		impl Struct {
			fn new(i: u8) -> Self {
				Self{
					number: i,
					name: english_numbers::convert_all_fmt(i as i64),
					maybe_name: match i % 2 {
						0 => Some(english_numbers::convert_long(i as i64, english_numbers::Formatting::none())),
						_ => None
					}
				}
			}
		}
		let values = (0..255_u8).map(|i| Struct::new(i)).collect::<Vec<_>>();
		// }}}

		let path = tempfile::NamedTempFile::new().unwrap().into_temp_path();
		let mut store: Store<i64, Struct> = Store::new(path).await.unwrap();
		store.write_infallible(stream::iter(values.clone().into_iter().enumerate().map(|(i, v)| (i as i64, v))), 256).await.unwrap();

		let mut rng = rand::thread_rng();
		let mut indices = (0..values.len() as i64).collect::<Vec<i64>>();
		for _ in 1..16 {
			indices.shuffle(&mut rng);
			for i in indices.iter() {
				let result = store.get(*i).await.unwrap();
				assert_eq!(result, values[*i as usize]);
			}
		}
	} // }}}

	#[tokio::test]
	async fn strings_reopen() /* {{{ */ {
		// setup {{{
		#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
		struct Struct {
			number: u8,
			name: String,
			maybe_name: Option<String>
		}
		impl Struct {
			fn new(i: u8) -> Self {
				Self{
					number: i,
					name: english_numbers::convert_all_fmt(i as i64),
					maybe_name: match i % 2 {
						0 => Some(english_numbers::convert_long(i as i64, english_numbers::Formatting::none())),
						_ => None
					}
				}
			}
		}
		let values = (0..255_u8).map(|i| Struct::new(i)).collect::<Vec<_>>();
		// }}}

		let path = tempfile::NamedTempFile::new().unwrap().into_temp_path();
		for _ in 1..4 {
			let mut store: Store<u128, Struct> = Store::new(&path).await.unwrap();
			store.write_infallible(stream::iter(values.clone().into_iter().enumerate().map(|(i, v)| (i as u128, v))), 256).await.unwrap();

			let mut rng = rand::thread_rng();
			let mut indices = (0..values.len() as u128).collect::<Vec<u128>>();
			for _ in 1..16 {
				indices.shuffle(&mut rng);
				for i in indices.iter() {
					let result = store.get(*i).await.unwrap();
					assert_eq!(result, values[*i as usize]);
				}
			}
		}
	} // }}}

	#[tokio::test]
	async fn smallstream() /* {{{ */ {
		let values = (0..5_u8).map(|i| FixedWidth::new(i)).collect::<Vec<_>>();

		let path = tempfile::NamedTempFile::new().unwrap().keep().unwrap().1;
		let mut store: Store<i8, FixedWidth> = Store::new(&path).await.unwrap();
		store.write_infallible(stream::iter(values.clone().into_iter().enumerate().map(|(i, v)| (i as i8, v))), 0).await.unwrap();

		let mut stream = store.stream_values().await.unwrap().enumerate();
		while let Some((i, result)) = stream.next().await {
			let value = result.unwrap();
			assert_eq!(value, values[i]);
		}

		assert_eq!(store.get(0).await.unwrap(), values[0]);
		assert_eq!(store.get(1).await.unwrap(), values[1]);

		tokio::fs::remove_file(path).await.unwrap();
	} // }}}

	#[tokio::test]
	async fn stream() /* {{{ */ {
		let mut values = (0..255_u8).map(|i| FixedWidth::new(i)).collect::<Vec<_>>();
		values.extend((0..255_u8).map(|i| FixedWidth::new2(i)));

		let path = tempfile::NamedTempFile::new().unwrap().keep().unwrap().1;
		let mut store: Store<u64, FixedWidth> = Store::new(&path).await.unwrap();
		store.write_infallible(stream::iter(values.clone().into_iter().enumerate().map(|(i, v)| (i as u64, v))), 512).await.unwrap();

		let mut stream = store.stream_values().await.unwrap().enumerate();
		while let Some((i, result)) = stream.next().await {
			let value = result.unwrap();
			assert_eq!(value, values[i]);
		}

		assert_eq!(store.get(0).await.unwrap(), values[0]);
		assert_eq!(store.get(1).await.unwrap(), values[1]);

		tokio::fs::remove_file(path).await.unwrap();
	} // }}}

	#[tokio::test]
	async fn stream_reopen() /* {{{ */ {
		let mut values = (0..255_u8).map(|i| FixedWidth::new(i)).collect::<Vec<_>>();
		values.extend((0..255_u8).map(|i| FixedWidth::new2(i)));

		let path = tempfile::NamedTempFile::new().unwrap().keep().unwrap().1;
		let mut store: Store<u64, FixedWidth> = Store::new(&path).await.unwrap();
		store.write_infallible(stream::iter(values.clone().into_iter().enumerate().map(|(i, v)| (i as u64, v))), 512).await.unwrap();
		drop(store);
		for iteration in 1..4 {
			println!(">>> 1-{}", iteration);
			let mut store: Store<u64, FixedWidth> = Store::new(&path).await.unwrap();

			let mut stream = store.stream_values().await.unwrap().enumerate();
			while let Some((i, result)) = stream.next().await {
				let value = result.unwrap();
				assert_eq!(value, values[i]);
			}

			assert_eq!(store.get(0).await.unwrap(), values[0]);
			assert_eq!(store.get(1).await.unwrap(), values[1]);
		}
		for iteration in 1..4 {
			println!(">>> 2-{}", iteration);
			let mut store: Store<u64, FixedWidth> = Store::new(&path).await.unwrap();

			let mut stream = store.stream_values().await.unwrap().enumerate();
			while let Some((i, result)) = stream.next().await {
				let value = result.unwrap();
				assert_eq!(value, values[i]);
			}

			assert_eq!(store.get(0).await.unwrap(), values[0]);
			assert_eq!(store.get(1).await.unwrap(), values[1]);
		}
		tokio::fs::remove_file(path).await.unwrap();
	} // }}}

	#[tokio::test]
	async fn fw_constant() /* {{{ */ {
		let values = (0..5).map(|i| FixedWidth::new(i)).collect::<Vec<_>>();

		let path = tempfile::NamedTempFile::new().unwrap().into_temp_path();
		let mut store: Store<u64, FixedWidth> = Store::new(path).await.unwrap();
		store.write_infallible(stream::iter(values.clone().into_iter().enumerate().map(|(i, v)| (i as u64, v))), 0).await.unwrap();

		let indices = (0..values.len() as u64).collect::<Vec<u64>>();
		for i in indices.iter() {
			let result = store.get(*i).await.unwrap();
			assert_eq!(result, values[*i as usize]);
		}
	} // }}}

	#[tokio::test]
	async fn fw_prime_5() /* {{{ */ {
		let values = (0..5).map(|i| FixedWidth::new(i)).collect::<Vec<_>>();

		let path = tempfile::NamedTempFile::new().unwrap().into_temp_path();
		let mut store: Store<u64, FixedWidth> = Store::new(path).await.unwrap();
		store.write_infallible(stream::iter(values.clone().into_iter().enumerate().map(|(i, v)| (i as u64, v))), 0).await.unwrap();

		let mut rng = rand::thread_rng();
		let mut indices = (0..values.len() as u64).collect::<Vec<u64>>();
		for _ in 1..16 {
			indices.shuffle(&mut rng);
			for i in indices.iter() {
				let result = store.get(*i).await.unwrap();
				assert_eq!(result, values[*i as usize]);
			}
		}
	} // }}}

	#[tokio::test]
	async fn fw_prime_13() /* {{{ */ {
		let values = (0..13).map(|i| FixedWidth::new(i)).collect::<Vec<_>>();

		let path = tempfile::NamedTempFile::new().unwrap().into_temp_path();
		let mut store: Store<u64, FixedWidth> = Store::new(path).await.unwrap();
		store.write_infallible(stream::iter(values.clone().into_iter().enumerate().map(|(i, v)| (i as u64, v))), 13).await.unwrap();

		let mut rng = rand::thread_rng();
		let mut indices = (0..values.len() as u64).collect::<Vec<u64>>();
		for _ in 1..16 {
			indices.shuffle(&mut rng);
			for i in indices.iter() {
				let result = store.get(*i).await.unwrap();
				assert_eq!(result, values[*i as usize]);
			}
		}
	} // }}}

	#[tokio::test]
	async fn fw_prime_41() /* {{{ */ {
		let values = (0..41).map(|i| FixedWidth::new(i)).collect::<Vec<_>>();

		let path = tempfile::NamedTempFile::new().unwrap().into_temp_path();
		let mut store: Store<u64, FixedWidth> = Store::new(path).await.unwrap();
		store.write_infallible(stream::iter(values.clone().into_iter().enumerate().map(|(i, v)| (i as u64, v))), 0).await.unwrap();

		let mut rng = rand::thread_rng();
		let mut indices = (0..values.len() as u64).collect::<Vec<u64>>();
		for _ in 1..16 {
			indices.shuffle(&mut rng);
			for i in indices.iter() {
				let result = store.get(*i).await.unwrap();
				assert_eq!(result, values[*i as usize]);
			}
		}
	} // }}}

	#[tokio::test]
	async fn fw_prime_47() /* {{{ */ {
		let values = (0..47).map(|i| FixedWidth::new(i)).collect::<Vec<_>>();

		let path = tempfile::NamedTempFile::new().unwrap().into_temp_path();
		let mut store: Store<u64, FixedWidth> = Store::new(path).await.unwrap();
		store.write_infallible(stream::iter(values.clone().into_iter().enumerate().map(|(i, v)| (i as u64, v))), 47).await.unwrap();

		let mut rng = rand::thread_rng();
		let mut indices = (0..values.len() as u64).collect::<Vec<u64>>();
		for _ in 1..16 {
			indices.shuffle(&mut rng);
			for i in indices.iter() {
				let result = store.get(*i).await.unwrap();
				assert_eq!(result, values[*i as usize]);
			}
		}
	} // }}}

	#[tokio::test]
	async fn fw_prime_53() /* {{{ */ {
		let values = (0..53).map(|i| FixedWidth::new(i)).collect::<Vec<_>>();

		let path = tempfile::NamedTempFile::new().unwrap().into_temp_path();
		let mut store: Store<u64, FixedWidth> = Store::new(path).await.unwrap();
		store.write_infallible(stream::iter(values.clone().into_iter().enumerate().map(|(i, v)| (i as u64, v))), 0).await.unwrap();

		let mut rng = rand::thread_rng();
		let mut indices = (0..values.len() as u64).collect::<Vec<u64>>();
		for _ in 1..16 {
			indices.shuffle(&mut rng);
			for i in indices.iter() {
				let result = store.get(*i).await.unwrap();
				assert_eq!(result, values[*i as usize]);
			}
		}
	} // }}}

	#[tokio::test]
	async fn fw_prime_53_reopen() /* {{{ */ {
		let values = (0..53).map(|i| FixedWidth::new(i)).collect::<Vec<_>>();

		let path = tempfile::NamedTempFile::new().unwrap().into_temp_path();
		for _ in 1..2 {
			let mut store: Store<u64, FixedWidth> = Store::new(&path).await.unwrap();
			store.write_infallible(stream::iter(values.clone().into_iter().enumerate().map(|(i, v)| (i as u64, v))), 53).await.unwrap();

			let mut rng = rand::thread_rng();
			let mut indices = (0..values.len() as u64).collect::<Vec<u64>>();
			for _ in 1..16 {
				indices.shuffle(&mut rng);
				for i in indices.iter() {
					let result = store.get(*i).await.unwrap();
					assert_eq!(result, values[*i as usize]);
				}
			}
		}
	} // }}}

	#[tokio::test]
	async fn get_many() /* {{{ */ {
		let mut values = (0..=255_u8).map(|i| FixedWidth::new(i)).collect::<Vec<_>>();
		values.extend((0..=255_u8).map(|i| FixedWidth::new2(i)));

		let path = tempfile::NamedTempFile::new().unwrap().into_temp_path();
		let mut store: Store<u64, FixedWidth> = Store::new(path).await.unwrap();
		store.write_infallible(stream::iter(values.clone().into_iter().enumerate().map(|(i, v)| (i as u64, v))), 512).await.unwrap();

		let mut rng = rand::thread_rng();
		let indices = (0..values.len() as u64).collect::<Vec<u64>>();
		let counts = [1, 2, 3, 5, 7, 8, 11, 13, 16, 17, 19, 23, 29, 31, 32, 37];
		// Repeating four times to give us coverage of cached cases
		for _ in 0..4 {
			for count in &counts {
				let is = indices.choose_multiple(&mut rng, *count).map(|i| *i).collect::<Vec<u64>>();
				let results = store.get_many(&is).await.unwrap();
				assert_eq!(results.len(), is.len());
				for (key, value) in results.into_iter() {
					assert_eq!(value, values[key as usize]);
				}
			}
		}
	} // }}}
}