// indexkv 0.8.0
//
// A performance-focused, persistent, async, key-value store.
use core::cmp::Ordering;
use core::future::Future;
use core::{fmt, mem};
use std::io::{ErrorKind, SeekFrom};
use std::path::Path;

use bitcode::{Buffer, Decode, Encode};
use futures::future::FutureExt;
use futures::stream::{TryStream, TryStreamExt};
use memmap2::{Advice, Mmap, MmapMut, MmapOptions};
use tokio::fs::File;
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufReader, BufWriter};
use tokio::{io, task};
use tracing::instrument;
use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout, Unalign};

use super::{Error, Key, Store, StreamError, CURRENT_FORMAT_VERSION};

impl<K, T> Store<K, T>
where
	K: Key,
	T: for<'a> Decode<'a> + Encode + fmt::Debug + Send + Sync,
{
	pub(crate) const HEADER_SIZE: usize = mem::size_of::<HeaderOnDisk>();
	const INDEX_ENTRY_SIZE: usize = mem::size_of::<IndexEntryOnDisk<K>>();

	pub(crate) const fn data_start_position(&self) -> u64 {
		Self::HEADER_SIZE as u64
	}

	#[cfg_attr(feature = "full-instrumentation", instrument(err, level = "debug", skip(self), fields(indexkv.path = %self.path.display())))]
	pub(crate) fn find_index_entry(&self, key: K) -> Option<&IndexEntryOnDisk<K>> {
		let mut size = self.header.pair_count;
		let mut left = 0;
		let mut right = size;
		while left < right {
			let mid = left + size / 2;
			let entry = self.get_index_entry(mid)?;
			match key.cmp(&entry.key.get()) {
				Ordering::Equal => return Some(entry),
				Ordering::Greater => left = mid + 1,
				Ordering::Less => right = mid,
			};
			size = right - left;
		}
		None
	}

	#[cfg_attr(feature = "full-instrumentation", instrument(err, level = "debug", skip(self), fields(indexkv.path = %self.path.display())))]
	pub(crate) fn find_index_entries(&self, keys: &[K]) -> Vec<(K, IndexEntry<K>)> {
		let mut chunks = Vec::with_capacity(keys.len());
		chunks.push((keys, 0, self.header.pair_count - 1));
		let mut chunks_new = Vec::with_capacity(keys.len());
		let mut results = Vec::with_capacity(keys.len());
		while (!chunks.is_empty()) {
			chunks_new.clear();
			for (keys, left, right) in chunks.iter() {
				let left = *left;
				let right = *right;
				let mid = left + ((right - left) / 2);
				let Some(entry) = self.get_index_entry(mid) else {
					continue;
				};
				let pivot = match keys.binary_search(&entry.key.get()) {
					Ok(i) => {
						results.push((keys[i], entry.into()));
						if (keys.len() == 1) {
							continue;
						}
						i
					}
					Err(i) => i,
				};
				if (left == right) {
					continue;
				}
				match pivot {
					0 => chunks_new.push((&keys[..], mid + 1, right)),
					i if i == keys.len() => chunks_new.push((&keys[..], left, mid)),
					i => {
						chunks_new.push((&keys[..=i], left, mid));
						if (mid < right) {
							chunks_new.push((&keys[i..], mid + 1, right));
						}
					}
				};
			}
			std::mem::swap(&mut chunks, &mut chunks_new);
		}
		results
	}

	#[inline]
	pub(crate) async fn read_value_at(&mut self, position: u64) -> Result<T, Error<K>> {
		self.file.reader.seek(SeekFrom::Start(position)).await?;
		match self.read_value().await? {
			Some(v) => Ok(v),
			None => Err(Error::MissingValue),
		}
	}

	// Order of operations:
	//   * Write down the header, with completed_writing = false
	//   * Seek to data_start_position
	//   * For each entry in pairs:
	//     * Compute the serialized size of this entry
	//     * Serialize the size and the entry itself to the buffer
	//     * Save the (key, position_in_file) pair as an IndexEntry
	//     * Write the buffer to the file
	//   * Save current file position (as index_start_pos) and number of entries to the header
	//   * Write the sorted index down to the file
	//   * Seek to the beginning of the file and write down the header, with completed_writing = true
	#[instrument(err, level = "info", skip(self, input_stream), fields(indexkv.path = %self.path.display()))]
	pub(crate) async fn _write<S>(&mut self, mut input_stream: S) -> Result<(), StreamError<S::Error, K>>
	where
		S: TryStream<Ok = (K, T)> + Unpin + Send,
		S::Error: std::error::Error + Send,
	{
		self.index_cache.clear();
		let old_fd = self.file.take().await?;
		drop(old_fd);

		let mut buf = Buffer::new();
		let mut writer = BufWriter::new(File::options().create(true).truncate(true).append(false).write(true).open(self.path.as_path()).await?);
		let index_file: File = task::spawn_blocking(tempfile::tempfile).await??.into();
		let mut index_writer = BufWriter::new(index_file);

		self.header.format_version = CURRENT_FORMAT_VERSION;
		self.header.completed_writing = 0;
		self.header.pair_count = 0;
		{
			let header: HeaderOnDisk = self.header.into();
			writer.write_all(header.as_bytes()).await?;
		}

		let mut pos = self.data_start_position();
		while let Some((key, value)) = input_stream.try_next().await.map_err(StreamError::from_external)? {
			// We're writing down the (size, value) tuple so that we can deserialize values later
			//    without having to do any sort of read-to-end scheme.  We'll know exactly how many
			//    bytes to allocate in our read buffer.

			let slice = buf.encode(&value);

			let index_entry = IndexEntryOnDisk::new(key, pos);
			index_writer.write_all(index_entry.as_bytes()).await?;
			self.header.pair_count += 1;

			pos += writer.write((slice.len() as u32).to_ne_bytes().as_slice()).await? as u64;
			pos += writer.write(slice).await? as u64;
		}
		self.header.index_start_pos = pos;
		index_writer.flush().await?;

		// Doing this sort in a memory-mapped file is only 1-2.5% slower than doing it in a Vec,
		// while not requiring all that RAM to be available to the process
		let mut index_file = index_writer.into_inner();
		// SAFETY:  We ensure that nobody else uses the file while we're working with it.  A
		// separate process mutating it while we have it open would be a problem, but only a data
		// consistency problem, not UB.
		let mut index = unsafe { MmapMut::map_mut(&index_file)? };
		index.advise(Advice::WillNeed)?;
		index.advise(Advice::Sequential)?;
		{
			let index = <[IndexEntryOnDisk<K>]>::mut_from_bytes_with_elems(index.as_mut(), self.header.pair_count as usize).unwrap();
			index.sort_unstable();
		}
		task::spawn_blocking(move || {
			index.flush()?;
			drop(index);
			Ok::<_, std::io::Error>(())
		})
		.await??;

		index_file.seek(SeekFrom::Start(0)).await?;
		writer.seek(SeekFrom::End(0)).await?;
		io::copy(&mut index_file, &mut writer).await?;

		self.header.completed_writing = 1;
		{
			let header: HeaderOnDisk = self.header.into();
			writer.seek(SeekFrom::Start(0)).await?;
			writer.write_all(header.as_bytes()).await?;
			writer.flush().await?;
		}
		drop(writer);

		let (new_fd, _) = StoreFiles::open(&self.path).await?;
		self.file.replace(new_fd);
		Ok(())
	}

	#[inline]
	#[cfg_attr(feature = "full-instrumentation", instrument(err, level = "trace", skip(self), fields(indexkv.path = %self.path.display())))]
	fn get_index_entry(&self, i: u64) -> Option<&IndexEntryOnDisk<K>> {
		let pos = i as usize * Self::INDEX_ENTRY_SIZE;
		// SAFETY:  We guarantee the correct buffer size using the indices
		let entry = unsafe { self.file.index.get(pos..pos + Self::INDEX_ENTRY_SIZE).map(IndexEntryOnDisk::ref_from_bytes)?.unwrap_unchecked() };
		Some(entry)
	}

	#[inline]
	async fn read_value(&mut self) -> Result<Option<T>, Error<K>> {
		let mut buf = [0u8; 4];
		let size = match self.file.reader.read_exact(buf.as_mut_slice()).await {
			Ok(_) => u32::from_ne_bytes(buf) as usize,
			Err(e) => match e.kind() {
				std::io::ErrorKind::UnexpectedEof => return Ok(None),
				_ => return Err(e.into()),
			},
		};

		let mut buf = vec![0u8; size];
		match self.file.reader.read_exact(&mut buf).await {
			Ok(read_count) => {
				if (read_count < size) {
					return Ok(None);
				}
			}
			Err(e) => match e.kind() {
				std::io::ErrorKind::UnexpectedEof => return Ok(None),
				_ => return Err(e.into()),
			},
		};
		let value = self.decoder.decode(buf.as_slice())?;
		Ok(Some(value))
	}
}

/// In-memory copy of the store file's header.
///
/// It is converted to and from the packed `HeaderOnDisk` form whenever it is read from or
/// written to the file.
#[derive(Clone, Copy, Debug, Default)]
pub(crate) struct Header {
	/// Version of the on-disk format the file was written with.
	pub(crate) format_version: u16,
	/// Non-zero once a write has finished and the final header was written back.
	completed_writing: u8,
	/// Number of key/value pairs (and therefore index entries) in the file.
	pub(crate) pair_count: u64,
	/// absolute position in the file where the sorted index begins
	index_start_pos: u64,
}

impl Header {
	/// Whether the header was marked as fully written (its `completed_writing` byte is non-zero).
	#[inline]
	pub(crate) fn completed_writing(&self) -> bool {
		!matches!(self.completed_writing, 0)
	}
}

/// Byte-exact layout of the header written at offset 0 of the store file.
///
/// `packed` removes all padding so the struct can be written and read as raw bytes (fields are
/// stored in native byte order). `C` pins the fields to declaration order. Plain `repr(packed)`
/// leaves the field order unspecified, which is not acceptable for a persistent file format.
#[derive(Debug, FromBytes, Immutable, IntoBytes)]
#[repr(C, packed)]
struct HeaderOnDisk {
	format_version: u16,
	completed_writing: u8,
	pair_count: u64,
	/// absolute position in the file where the sorted index begins
	index_start_pos: u64,
}

impl From<HeaderOnDisk> for Header {
	/// Copies every on-disk header field into the in-memory representation unchanged.
	#[inline]
	fn from(on_disk: HeaderOnDisk) -> Self {
		// Copy each field out of the packed struct by value; no reference to a packed field is taken.
		let format_version = on_disk.format_version;
		let completed_writing = on_disk.completed_writing;
		let pair_count = on_disk.pair_count;
		let index_start_pos = on_disk.index_start_pos;
		Self { format_version, completed_writing, pair_count, index_start_pos }
	}
}

impl From<Header> for HeaderOnDisk {
	#[inline]
	fn from(v: Header) -> Self {
		Self {
			format_version: v.format_version,
			completed_writing: v.completed_writing,
			pair_count: v.pair_count,
			index_start_pos: v.index_start_pos,
		}
	}
}

/// Aligned, decoded copy of an on-disk index entry: a key and where its value record starts.
#[derive(Clone, Copy, Debug)]
pub(crate) struct IndexEntry<K: Key> {
	/// absolute position in the file of the value's `(size, value)` record
	pub(crate) position: u64,
	/// the key this entry indexes
	pub(crate) key: K,
}

/// One index record exactly as stored in the index section of the file: the position of a
/// value record followed by its key.
///
/// `repr(C)` fixes the field order, which is the on-disk layout. The `Unalign` wrappers let
/// entries be referenced in place from the memory-mapped index bytes (see `get_index_entry`).
#[derive(FromBytes, Immutable, IntoBytes, KnownLayout)]
#[repr(C)]
pub(crate) struct IndexEntryOnDisk<K: Key> {
	/// absolute position in the file
	pub(crate) position: Unalign<u64>,
	/// the key this entry indexes; the index is sorted by this field
	pub(crate) key: Unalign<K>,
}

impl<K: Key> IndexEntryOnDisk<K> {
	/// Packs `key` and the absolute `position` of its value record into the on-disk form.
	#[inline]
	fn new(key: K, position: u64) -> Self {
		let position = Unalign::new(position);
		let key = Unalign::new(key);
		Self { position, key }
	}
}

impl<K: Key> PartialEq for IndexEntryOnDisk<K> {
	/// Two entries are equal only when both their keys and their positions match.
	#[inline]
	fn eq(&self, other: &Self) -> bool {
		let (lhs_key, rhs_key) = (self.key.get(), other.key.get());
		lhs_key == rhs_key && self.position.get() == other.position.get()
	}
}

impl<K> Eq for IndexEntryOnDisk<K> where K: Key {}

impl<K: Key> Ord for IndexEntryOnDisk<K> {
	/// Orders entries by key alone; this is the order the index is sorted into.
	// NOTE(review): `cmp` ignores `position` while `PartialEq` compares it, so two entries with
	// the same key but different positions compare `Equal` yet are not `==`. That breaks the
	// `Ord`/`Eq` consistency contract; confirm which semantics callers rely on before aligning them.
	#[inline]
	fn cmp(&self, other: &Self) -> Ordering {
		Ord::cmp(&self.key.get(), &other.key.get())
	}
}

impl<K: Key> PartialOrd for IndexEntryOnDisk<K> {
	/// Always defined: delegates to the total order in `Ord::cmp`.
	#[inline]
	fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
		Some(Ord::cmp(self, other))
	}
}

impl<K: Key> From<IndexEntry<K>> for IndexEntryOnDisk<K> {
	#[inline]
	fn from(v: IndexEntry<K>) -> Self {
		Self { position: Unalign::new(v.position), key: Unalign::new(v.key) }
	}
}

impl<K: Key> From<&IndexEntryOnDisk<K>> for IndexEntry<K> {
	/// Copies the key and position out of an on-disk entry into an aligned value.
	#[inline]
	fn from(on_disk: &IndexEntryOnDisk<K>) -> Self {
		let position = on_disk.position.get();
		let key = on_disk.key.get();
		Self { position, key }
	}
}

/// Read-side handles onto a store file.
pub(crate) struct StoreFiles {
	/// Buffered reader used to read `(size, value)` records.
	reader: BufReader<File>,
	/// Read-only mapping of the file starting at the header's index start offset.
	index: Mmap,
}

impl StoreFiles {
	async fn null() -> Result<Self, std::io::Error> {
		let fd = File::options().read(true).write(false).open("/dev/null").await?;
		Ok(Self { index: MmapOptions::new().map_anon()?.make_read_only()?, reader: BufReader::new(fd) })
	}

	pub(crate) async fn open(path: &Path) -> Result<(Self, Header), std::io::Error> {
		// Touch the file before we try to open it; since we want to open it read-only, we have to
		// do this quick read-write open first
		File::options().write(true).create(true).truncate(false).open(path).await?;
		let mut fd = File::options().read(true).write(false).open(path).await?;
		let header = read_header(&mut fd).await?.unwrap_or_default();

		// SAFETY:  Nobody else in this process is able to write to the file; if another
		// process messes with it, we won't have UB, only data consistency issues.
		let index = unsafe { MmapOptions::new().offset(header.index_start_pos).map(&fd)? };
		index.advise(Advice::Random)?;

		let this = Self { index, reader: BufReader::new(fd) };

		Ok((this, header))
	}

	fn take(&mut self) -> impl Future<Output = Result<Self, std::io::Error>> + use<'_> {
		Self::null().map(move |r| r.map(|replacement| self.replace(replacement)))
	}

	fn replace(&mut self, mut other: Self) -> Self {
		mem::swap(self, &mut other);
		other
	}
}

/// Reads the fixed-size header from the start of `file`.
///
/// Rewinds `file` first; on success the cursor is left just past the header. Returns
/// `Ok(None)` when the file is too short to contain a header (for example a freshly created,
/// empty file), and propagates any other I/O error.
#[inline]
pub(crate) async fn read_header(file: &mut File) -> Result<Option<Header>, std::io::Error> {
	let mut buf = [0u8; mem::size_of::<HeaderOnDisk>()];
	file.rewind().await?;
	match file.read_exact(buf.as_mut_slice()).await {
		Ok(_) => (),
		Err(e) if e.kind() == ErrorKind::UnexpectedEof => return Ok(None),
		Err(e) => return Err(e),
	};
	// `buf` is exactly `size_of::<HeaderOnDisk>()` bytes, so this conversion cannot fail. It uses
	// the safe, checked API (which compiles to the same copy) instead of `unsafe` unwrapping.
	let header = HeaderOnDisk::read_from_bytes(buf.as_slice())
		.map_err(|_| std::io::Error::new(ErrorKind::InvalidData, "header buffer does not match the header size"))?;
	Ok(Some(header.into()))
}