use core::cmp::Ordering;
use core::future::Future;
use core::{fmt, mem};
use std::io::{ErrorKind, SeekFrom};
use std::path::Path;
use bitcode::{Buffer, Decode, Encode};
use futures::future::FutureExt;
use futures::stream::{TryStream, TryStreamExt};
use memmap2::{Advice, Mmap, MmapMut, MmapOptions};
use tokio::fs::File;
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufReader, BufWriter};
use tokio::{io, task};
use tracing::instrument;
use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout, Unalign};
use super::{Error, Key, Store, StreamError, CURRENT_FORMAT_VERSION};
impl<K, T> Store<K, T>
where
K: Key,
T: for<'a> Decode<'a> + Encode + fmt::Debug + Send + Sync,
{
/// Size in bytes of the serialized [`HeaderOnDisk`] that is written at the start of the store file.
pub(crate) const HEADER_SIZE: usize = mem::size_of::<HeaderOnDisk>();
/// Size in bytes of one serialized [`IndexEntryOnDisk`] for this key type.
const INDEX_ENTRY_SIZE: usize = mem::size_of::<IndexEntryOnDisk<K>>();
/// Returns the file offset at which value records begin, i.e. directly after the header.
pub(crate) const fn data_start_position(&self) -> u64 {
Self::HEADER_SIZE as u64
}
/// Binary-searches the memory-mapped index for `key`.
///
/// Returns the matching on-disk entry, or `None` when the key is not present or an
/// index slot inside the search window cannot be read.
#[cfg_attr(feature = "full-instrumentation", instrument(err, level = "debug", skip(self), fields(indexkv.path = %self.path.display())))]
pub(crate) fn find_index_entry(&self, key: K) -> Option<&IndexEntryOnDisk<K>> {
    // Half-open window `[lo, hi)` of entry indices that may still hold `key`.
    let mut lo = 0;
    let mut hi = self.header.pair_count;
    while lo < hi {
        let probe = lo + (hi - lo) / 2;
        let candidate = self.get_index_entry(probe)?;
        match key.cmp(&candidate.key.get()) {
            Ordering::Less => hi = probe,
            Ordering::Greater => lo = probe + 1,
            Ordering::Equal => return Some(candidate),
        }
    }
    None
}
#[cfg_attr(feature = "full-instrumentation", instrument(err, level = "debug", skip(self), fields(indexkv.path = %self.path.display())))]
pub(crate) fn find_index_entries(&self, keys: &[K]) -> Vec<(K, IndexEntry<K>)> {
let mut chunks = Vec::with_capacity(keys.len());
chunks.push((keys, 0, self.header.pair_count - 1));
let mut chunks_new = Vec::with_capacity(keys.len());
let mut results = Vec::with_capacity(keys.len());
while (!chunks.is_empty()) {
chunks_new.clear();
for (keys, left, right) in chunks.iter() {
let left = *left;
let right = *right;
let mid = left + ((right - left) / 2);
let Some(entry) = self.get_index_entry(mid) else {
continue;
};
let pivot = match keys.binary_search(&entry.key.get()) {
Ok(i) => {
results.push((keys[i], entry.into()));
if (keys.len() == 1) {
continue;
}
i
}
Err(i) => i,
};
if (left == right) {
continue;
}
match pivot {
0 => chunks_new.push((&keys[..], mid + 1, right)),
i if i == keys.len() => chunks_new.push((&keys[..], left, mid)),
i => {
chunks_new.push((&keys[..=i], left, mid));
if (mid < right) {
chunks_new.push((&keys[i..], mid + 1, right));
}
}
};
}
std::mem::swap(&mut chunks, &mut chunks_new);
}
results
}
#[inline]
pub(crate) async fn read_value_at(&mut self, position: u64) -> Result<T, Error<K>> {
self.file.reader.seek(SeekFrom::Start(position)).await?;
match self.read_value().await? {
Some(v) => Ok(v),
None => Err(Error::MissingValue),
}
}
/// Rewrites the store file from scratch with every `(key, value)` pair yielded by `input_stream`.
///
/// The file is laid out as: header, then one record per pair (a native-endian `u32` byte
/// length followed by the encoded value), then the index entries sorted by key. The header
/// is first written with `completed_writing = 0` and is only rewritten with
/// `completed_writing = 1` after the data and the index have been written.
///
/// # Errors
/// Errors from the stream are wrapped with `StreamError::from_external`. I/O failures are
/// propagated, including an `InvalidInput` error when a single encoded value is too large
/// for the `u32` length prefix.
#[instrument(err, level = "info", skip(self, input_stream), fields(indexkv.path = %self.path.display()))]
pub(crate) async fn _write<S>(&mut self, mut input_stream: S) -> Result<(), StreamError<S::Error, K>>
where
    S: TryStream<Ok = (K, T)> + Unpin + Send,
    S::Error: std::error::Error + Send,
{
    self.index_cache.clear();
    // Swap placeholder files in and drop the old reader and index mapping before the
    // store file is truncated below.
    let old_fd = self.file.take().await?;
    drop(old_fd);
    let mut buf = Buffer::new();
    let mut writer = BufWriter::new(File::options().create(true).truncate(true).append(false).write(true).open(self.path.as_path()).await?);
    // Index entries are staged in a temporary file so they can be sorted before being
    // appended after the data section.
    let index_file: File = task::spawn_blocking(tempfile::tempfile).await??.into();
    let mut index_writer = BufWriter::new(index_file);
    self.header.format_version = CURRENT_FORMAT_VERSION;
    self.header.completed_writing = 0;
    self.header.pair_count = 0;
    {
        let header: HeaderOnDisk = self.header.into();
        writer.write_all(header.as_bytes()).await?;
    }
    let mut pos = self.data_start_position();
    while let Some((key, value)) = input_stream.try_next().await.map_err(StreamError::from_external)? {
        let slice = buf.encode(&value);
        // The length prefix is a `u32`; refuse values that would not fit instead of
        // silently truncating the length.
        let len_prefix = u32::try_from(slice.len())
            .map_err(|_| std::io::Error::new(ErrorKind::InvalidInput, "encoded value is larger than u32::MAX bytes"))?;
        let len_bytes = len_prefix.to_ne_bytes();
        let index_entry = IndexEntryOnDisk::new(key, pos);
        index_writer.write_all(index_entry.as_bytes()).await?;
        self.header.pair_count += 1;
        // `write_all` is required here: a plain `write` may accept only part of the buffer,
        // which would corrupt the record and desynchronise `pos` from the file contents.
        writer.write_all(&len_bytes).await?;
        writer.write_all(slice).await?;
        pos += (len_bytes.len() + slice.len()) as u64;
    }
    self.header.index_start_pos = pos;
    index_writer.flush().await?;
    let mut index_file = index_writer.into_inner();
    // SAFETY: NOTE(review): this relies on nothing else modifying the temporary file while
    // it is mapped; it was created above via `tempfile::tempfile` and its handle never
    // leaves this function. Confirm no other handle to it can exist.
    let mut index = unsafe { MmapMut::map_mut(&index_file)? };
    index.advise(Advice::WillNeed)?;
    index.advise(Advice::Sequential)?;
    {
        // Reinterpret the mapped bytes as entries and sort them in place by key.
        let index = <[IndexEntryOnDisk<K>]>::mut_from_bytes_with_elems(index.as_mut(), self.header.pair_count as usize)
            .expect("temporary index file must hold exactly `pair_count` entries");
        index.sort_unstable();
    }
    // Flushing the mapping is blocking work, so it runs on the blocking pool.
    task::spawn_blocking(move || {
        index.flush()?;
        drop(index);
        Ok::<_, std::io::Error>(())
    })
    .await??;
    // Append the sorted index after the data section.
    index_file.seek(SeekFrom::Start(0)).await?;
    writer.seek(SeekFrom::End(0)).await?;
    io::copy(&mut index_file, &mut writer).await?;
    self.header.completed_writing = 1;
    {
        // Rewrite the header last so `completed_writing` is only set once everything else is in place.
        let header: HeaderOnDisk = self.header.into();
        writer.seek(SeekFrom::Start(0)).await?;
        writer.write_all(header.as_bytes()).await?;
        writer.flush().await?;
    }
    drop(writer);
    // Reopen the finished file for reading and replace the placeholder handles.
    let (new_fd, _) = StoreFiles::open(&self.path).await?;
    self.file.replace(new_fd);
    Ok(())
}
/// Returns the `i`-th entry of the memory-mapped index.
///
/// Yields `None` when the entry's byte range does not fit in `usize`, lies outside the
/// mapping, or cannot be viewed as an [`IndexEntryOnDisk`].
// NOTE(review): `instrument(err)` is documented for `Result`-returning functions; confirm
// this attribute compiles when the "full-instrumentation" feature is enabled.
#[inline]
#[cfg_attr(feature = "full-instrumentation", instrument(err, level = "trace", skip(self), fields(indexkv.path = %self.path.display())))]
fn get_index_entry(&self, i: u64) -> Option<&IndexEntryOnDisk<K>> {
    // Checked arithmetic: an out-of-range `i` must produce `None`, not an overflow.
    let start = usize::try_from(i).ok()?.checked_mul(Self::INDEX_ENTRY_SIZE)?;
    let end = start.checked_add(Self::INDEX_ENTRY_SIZE)?;
    let bytes = self.file.index.get(start..end)?;
    // The slice is exactly one entry long, so the conversion is expected to succeed;
    // using `.ok()` keeps this free of `unsafe`.
    IndexEntryOnDisk::ref_from_bytes(bytes).ok()
}
#[inline]
async fn read_value(&mut self) -> Result<Option<T>, Error<K>> {
let mut buf = [0u8; 4];
let size = match self.file.reader.read_exact(buf.as_mut_slice()).await {
Ok(_) => u32::from_ne_bytes(buf) as usize,
Err(e) => match e.kind() {
std::io::ErrorKind::UnexpectedEof => return Ok(None),
_ => return Err(e.into()),
},
};
let mut buf = vec![0u8; size];
match self.file.reader.read_exact(&mut buf).await {
Ok(read_count) => {
if (read_count < size) {
return Ok(None);
}
}
Err(e) => match e.kind() {
std::io::ErrorKind::UnexpectedEof => return Ok(None),
_ => return Err(e.into()),
},
};
let value = self.decoder.decode(buf.as_slice())?;
Ok(Some(value))
}
}
/// In-memory form of the store file header.
#[derive(Debug, Clone, Copy, Default)]
pub(crate) struct Header {
    /// Format version stamped into the file when it is written.
    pub(crate) format_version: u16,
    /// Zero while a write is in progress, non-zero once it has finished.
    completed_writing: u8,
    /// Number of key/value pairs stored in the file.
    pub(crate) pair_count: u64,
    /// File offset at which the index section begins.
    index_start_pos: u64,
}
impl Header {
    /// Reports whether the write-completion flag is set (any non-zero value counts).
    #[inline]
    pub(crate) fn completed_writing(&self) -> bool {
        let flag = self.completed_writing;
        flag > 0
    }
}
/// Byte-exact header as it is stored at the start of the store file.
///
/// The struct is `packed` so it contains no padding, and `repr(C)` so the fields are laid
/// out in declaration order. The default Rust representation does not guarantee any field
/// order, which an on-disk format must not depend on.
#[derive(Debug, FromBytes, Immutable, IntoBytes)]
#[repr(C, packed)]
struct HeaderOnDisk {
    /// Format version of the file.
    format_version: u16,
    /// Zero while a write is in progress, non-zero once it has finished.
    completed_writing: u8,
    /// Number of key/value pairs stored in the file.
    pair_count: u64,
    /// File offset at which the index section begins.
    index_start_pos: u64,
}
/// Converts the packed on-disk header into its in-memory form, field by field.
impl From<HeaderOnDisk> for Header {
    #[inline]
    fn from(disk: HeaderOnDisk) -> Self {
        // Copy every field out by value before building the result.
        let format_version = disk.format_version;
        let completed_writing = disk.completed_writing;
        let pair_count = disk.pair_count;
        let index_start_pos = disk.index_start_pos;
        Self { format_version, completed_writing, pair_count, index_start_pos }
    }
}
impl From<Header> for HeaderOnDisk {
#[inline]
fn from(v: Header) -> Self {
Self {
format_version: v.format_version,
completed_writing: v.completed_writing,
pair_count: v.pair_count,
index_start_pos: v.index_start_pos,
}
}
}
/// Decoded, in-memory copy of one index entry.
#[derive(Debug, Clone, Copy)]
pub(crate) struct IndexEntry<K: Key> {
/// Byte offset of the value record within the store file.
pub(crate) position: u64,
/// Key the value record is stored under.
pub(crate) key: K,
}
/// One index entry exactly as it is stored in the index section of the file.
///
/// Both fields are wrapped in [`Unalign`], so they are read and written with `get`/`new`
/// rather than accessed directly.
#[derive(FromBytes, Immutable, IntoBytes, KnownLayout)]
#[repr(C)]
pub(crate) struct IndexEntryOnDisk<K: Key> {
    /// Byte offset of the value record within the store file.
    pub(crate) position: Unalign<u64>,
    /// Key the value record is stored under.
    pub(crate) key: Unalign<K>,
}
impl<K: Key> IndexEntryOnDisk<K> {
    /// Builds an entry that maps `key` to the record starting at `position`.
    #[inline]
    fn new(key: K, position: u64) -> Self {
        let key = Unalign::new(key);
        let position = Unalign::new(position);
        Self { position, key }
    }
}
/// Two entries are equal when both their key and their position are equal.
impl<K: Key> PartialEq for IndexEntryOnDisk<K> {
    #[inline]
    fn eq(&self, other: &Self) -> bool {
        let lhs = (self.key.get(), self.position.get());
        let rhs = (other.key.get(), other.position.get());
        lhs == rhs
    }
}
impl<K: Key> Eq for IndexEntryOnDisk<K> {}
/// Orders entries by key, using the position as a tie-breaker.
///
/// The tie-breaker keeps this ordering consistent with `PartialEq`, which compares both
/// the key and the position: two entries compare as `Equal` exactly when they are `==`.
/// It also makes the sorted order of entries that share a key deterministic.
impl<K: Key> Ord for IndexEntryOnDisk<K> {
    #[inline]
    fn cmp(&self, other: &Self) -> Ordering {
        self.key
            .get()
            .cmp(&other.key.get())
            .then_with(|| self.position.get().cmp(&other.position.get()))
    }
}
/// Partial ordering that always defers to the total ordering defined by `Ord`.
impl<K: Key> PartialOrd for IndexEntryOnDisk<K> {
    #[inline]
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        let ordering = Ord::cmp(self, other);
        Some(ordering)
    }
}
impl<K: Key> From<IndexEntry<K>> for IndexEntryOnDisk<K> {
#[inline]
fn from(v: IndexEntry<K>) -> Self {
Self { position: Unalign::new(v.position), key: Unalign::new(v.key) }
}
}
/// Copies the fields of a borrowed on-disk entry into an owned in-memory entry.
impl<K: Key> From<&IndexEntryOnDisk<K>> for IndexEntry<K> {
    #[inline]
    fn from(raw: &IndexEntryOnDisk<K>) -> Self {
        let position = raw.position.get();
        let key = raw.key.get();
        Self { position, key }
    }
}
pub(crate) struct StoreFiles {
reader: BufReader<File>,
index: Mmap,
}
impl StoreFiles {
async fn null() -> Result<Self, std::io::Error> {
let fd = File::options().read(true).write(false).open("/dev/null").await?;
Ok(Self { index: MmapOptions::new().map_anon()?.make_read_only()?, reader: BufReader::new(fd) })
}
pub(crate) async fn open(path: &Path) -> Result<(Self, Header), std::io::Error> {
File::options().write(true).create(true).truncate(false).open(path).await?;
let mut fd = File::options().read(true).write(false).open(path).await?;
let header = read_header(&mut fd).await?.unwrap_or_default();
let index = unsafe { MmapOptions::new().offset(header.index_start_pos).map(&fd)? };
index.advise(Advice::Random)?;
let this = Self { index, reader: BufReader::new(fd) };
Ok((this, header))
}
fn take(&mut self) -> impl Future<Output = Result<Self, std::io::Error>> + use<'_> {
Self::null().map(move |r| r.map(|replacement| self.replace(replacement)))
}
fn replace(&mut self, mut other: Self) -> Self {
mem::swap(self, &mut other);
other
}
}
/// Reads the header stored at the start of `file`.
///
/// Rewinds `file` to offset 0 and reads exactly one serialized header. Returns `Ok(None)`
/// when the file ends before a full header could be read.
///
/// # Errors
/// Propagates any I/O error other than `UnexpectedEof`.
#[inline]
pub(crate) async fn read_header(file: &mut File) -> Result<Option<Header>, std::io::Error> {
    let mut buf = [0u8; mem::size_of::<HeaderOnDisk>()];
    file.rewind().await?;
    match file.read_exact(buf.as_mut_slice()).await {
        Ok(_) => (),
        Err(e) if e.kind() == ErrorKind::UnexpectedEof => return Ok(None),
        Err(e) => return Err(e),
    };
    // `buf` is exactly `size_of::<HeaderOnDisk>()` bytes, so this conversion is expected to
    // succeed; it is still done safely and mapped to an I/O error instead of using `unsafe`.
    let on_disk = HeaderOnDisk::read_from_bytes(buf.as_slice())
        .map_err(|_| std::io::Error::new(ErrorKind::InvalidData, "store header has an unexpected size"))?;
    Ok(Some(on_disk.into()))
}