#![allow(unused_parens)]
#![allow(clippy::tabs_in_doc_comments)]
#![warn(clippy::future_not_send)]
use core::fmt::{Debug, Display};
use core::hash::Hash;
use std::collections::HashMap;
use std::io::SeekFrom;
use std::path::{Path, PathBuf};
use bitcode::{Decode, Encode};
use futures::stream::{Stream, StreamExt, TryStream};
use tokio::io::AsyncSeekExt;
use tracing::instrument;
use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout};
mod error;
mod internal;
pub mod stream;
pub use error::{Error, StreamError};
use internal::{Header, StoreFiles};
/// Types that can be used as store keys.
///
/// Every type satisfying the bounds below implements `Key` automatically through the
/// blanket impl, so it never has to be implemented by hand. The zerocopy bounds
/// (`FromBytes`, `IntoBytes`, `Immutable`, `KnownLayout`) presumably exist so keys can be
/// stored in and read back from the on-disk index as raw bytes.
pub trait Key
where
	Self: Copy + Eq + Hash + Ord + Debug + Display + FromBytes + Immutable + IntoBytes + KnownLayout + Send + Sync,
{
}

impl<T: Copy + Eq + Hash + Ord + Debug + Display + FromBytes + Immutable + IntoBytes + KnownLayout + Send + Sync> Key for T {}

/// Format version understood by this build; `Store::is_valid` requires the header's
/// `format_version` to be exactly this value.
const CURRENT_FORMAT_VERSION: u16 = 3;
/// A key/value store kept on disk, mapping keys of type `K` to values of type `T`
/// that are encoded with bitcode.
///
/// File positions looked up by key are cached in memory; see
/// `Store::clear_index_cache` to drop that cache.
pub struct Store<K, T>
where
K: Key,
T: for<'a> Decode<'a> + Encode + Debug + Send + Sync,
{
/// Path the store was opened from; `stream_values` opens a fresh handle on it.
path: PathBuf,
/// File handles returned by `StoreFiles::open` when the store was opened.
// NOTE: field order is also drop order; keep `file` where it is unless that is verified harmless.
file: StoreFiles,
/// Store header, initially as returned by `StoreFiles::open`; consulted by `is_valid`
/// and `stream_values`.
header: Header,
/// Cache of key -> value position in the file, filled lazily by `get` / `get_many`.
index_cache: HashMap<K, u64, fnv::FnvBuildHasher>,
/// bitcode buffer created empty in `new`; presumably reused when decoding values (TODO confirm).
decoder: bitcode::Buffer,
/// Ties the store to its value type `T` without holding a `T`.
_nothing: std::marker::PhantomData<T>,
}
impl<K, T> Store<K, T>
where
	K: Key,
	T: for<'a> Decode<'a> + Encode + Debug + Send + Sync,
{
	/// Opens the store whose files live at `path`.
	///
	/// The file handles and header come from `StoreFiles::open`; the cache of index
	/// positions starts out empty.
	///
	/// # Errors
	/// Any error reported by `StoreFiles::open`.
	pub async fn new(path: impl AsRef<Path> + Send) -> Result<Self, Error<K>> {
		let path = path.as_ref().to_path_buf();
		let (file, header) = StoreFiles::open(path.as_ref()).await?;
		Ok(Self {
			file,
			path,
			header,
			index_cache: HashMap::with_hasher(fnv::FnvBuildHasher::default()),
			decoder: bitcode::Buffer::new(),
			_nothing: std::marker::PhantomData,
		})
	}

	/// Reports whether this build can read the store: the header must carry
	/// `CURRENT_FORMAT_VERSION` and report that writing completed.
	///
	/// In its current form this never returns `Err`.
	#[inline]
	pub async fn is_valid(&mut self) -> Result<bool, Error<K>> {
		Ok(self.header.format_version == CURRENT_FORMAT_VERSION && self.header.completed_writing())
	}

	/// Streams every stored value in the order it appears in the data section.
	///
	/// A separate read-only handle is opened on the store's path and positioned at the
	/// start of the data section. The stream ends after `header.pair_count` values.
	///
	/// # Errors
	/// Returns `Err` if opening or seeking the file fails. Items are themselves `Result`s,
	/// so per-value failures surface inside the stream.
	#[instrument(err, level = "debug", skip(self), fields(indexkv.path = %self.path.display()))]
	#[inline]
	pub async fn stream_values(&self) -> Result<impl Stream<Item = Result<T, Error<K>>>, Error<K>> {
		let start_pos = self.data_start_position();
		let pair_count = self.header.pair_count as usize;
		// `File::open` is read-only, non-creating and non-appending.
		let mut file = tokio::fs::File::open(&self.path).await?;
		file.seek(SeekFrom::Start(start_pos)).await?;
		Ok(stream::ValueStream::new(file).take(pair_count))
	}

	/// Returns the value stored under `key`.
	///
	/// The value's file position comes from the index cache when present. Otherwise it is
	/// looked up in the index and cached for later calls.
	///
	/// # Errors
	/// `Error::NotFound(key)` when the index has no entry for `key`, plus any error from
	/// reading the value.
	#[cfg_attr(feature = "full-instrumentation", instrument(err, level = "debug", skip(self), fields(indexkv.path = %self.path.display())))]
	#[inline]
	pub async fn get(&mut self, key: K) -> Result<T, Error<K>> {
		// Copy the cached position out so no borrow of the cache outlives this line.
		let cached = self.index_cache.get(&key).copied();
		let position = match cached {
			Some(position) => position,
			None => {
				let position = match self.find_index_entry(key) {
					Some(entry) => entry.position.get(),
					None => return Err(Error::NotFound(key)),
				};
				self.index_cache.insert(key, position);
				position
			}
		};
		self.read_value_at(position).await
	}

	/// Fetches the values for every key in `keys_in` that exists in the store.
	///
	/// Keys with a cached position are read right away. The remaining keys are sorted,
	/// de-duplicated and resolved against the index: a single-key lookup when only one
	/// is left, otherwise one batched lookup. Their positions are then cached.
	///
	/// Keys missing from the store are simply absent from the returned map. A key that is
	/// repeated in `keys_in` is looked up and read only once.
	///
	/// # Errors
	/// The first error from reading a value aborts the call.
	#[instrument(err, level = "debug", skip(self), fields(indexkv.path = %self.path.display()))]
	pub async fn get_many(&mut self, keys_in: &[K]) -> Result<HashMap<K, T, fnv::FnvBuildHasher>, Error<K>> {
		let mut results: HashMap<K, T, fnv::FnvBuildHasher> = HashMap::with_capacity_and_hasher(keys_in.len(), fnv::FnvBuildHasher::default());
		// Keys whose index position is not cached yet.
		let mut uncached = Vec::with_capacity(keys_in.len());
		for &key in keys_in {
			// A repeated key already answered from the cache needs no second read.
			if results.contains_key(&key) {
				continue;
			}
			match self.index_cache.get(&key).copied() {
				Some(position) => {
					let value = self.read_value_at(position).await?;
					results.insert(key, value);
				}
				None => uncached.push(key),
			}
		}
		// Index lookups receive the keys sorted. Dedup so no key is looked up or read twice.
		uncached.sort_unstable();
		uncached.dedup();
		match uncached.len() {
			// Everything was cached (or nothing was requested): no index lookup needed.
			0 => {}
			1 => {
				let key = uncached[0];
				if let Some(index) = self.find_index_entry(key) {
					let position = index.position.get();
					self.index_cache.insert(key, position);
					results.insert(key, self.read_value_at(position).await?);
				}
			}
			_ => {
				let found = self.find_index_entries(&uncached);
				for (key, index) in found.into_iter() {
					self.index_cache.insert(key, index.position);
					results.insert(key, self.read_value_at(index.position).await?);
				}
			}
		}
		Ok(results)
	}

	/// Writes every `(key, value)` pair yielded by `input_stream` into the store.
	///
	/// # Errors
	/// Returns a `StreamError`. Its `Internal` variant carries store errors; other variants
	/// presumably carry errors yielded by `input_stream` (see `StreamError`).
	#[inline]
	pub async fn write<S>(&mut self, input_stream: S) -> Result<(), StreamError<S::Error, K>>
	where
		S: TryStream<Ok = (K, T)> + Unpin + Send,
		S::Error: std::error::Error + Send,
	{
		self._write(input_stream).await
	}

	/// Like [`Store::write`], for input streams that cannot fail.
	///
	/// # Errors
	/// Any internal store error raised while writing.
	#[inline]
	pub async fn write_infallible<S>(&mut self, input_stream: S) -> Result<(), Error<K>>
	where
		S: Stream<Item = (K, T)> + Unpin + Send,
	{
		self.write(input_stream.map(Result::<_, Error<K>>::Ok)).await.map_err(|e| match e {
			StreamError::Internal(e) => e,
			// The adapted stream only ever yields `Ok`, so no stream-side error can reach here.
			_ => unreachable!(),
		})
	}

	/// Forgets every cached key -> position mapping; later lookups go back to the index.
	pub fn clear_index_cache(&mut self) {
		self.index_cache.clear();
	}
}
#[cfg(test)]
mod tests {
	use futures::stream;
	use rand::prelude::{IndexedRandom, SliceRandom};
	use crate::*;

	/// How many times the random-access tests reshuffle the keys and re-read all of them.
	const SHUFFLED_PASSES: usize = 15;

	/// A value made only of fixed-size integer fields.
	#[derive(Clone, Debug, PartialEq, Decode, Encode)]
	struct FixedWidth {
		a: u8,
		b: i16,
		c: u32,
		d: u64,
	}

	impl FixedWidth {
		fn new(i: u8) -> Self {
			Self { a: i, b: i as i16 * 2, c: i as u32 * 4, d: i as u64 * 8 }
		}

		fn new2(i: u8) -> Self {
			Self { a: 255 - i, b: -(i as i16) * 2, c: i as u32 * 11, d: i as u64 * 17 }
		}
	}

	/// A value with variable-length fields (a string and an optional string).
	#[derive(Clone, Debug, PartialEq, Decode, Encode)]
	struct Named {
		number: u8,
		name: String,
		maybe_name: Option<String>,
	}

	impl Named {
		fn new(i: u8) -> Self {
			Self {
				number: i,
				name: english_numbers::convert_all_fmt(i as i64),
				maybe_name: match i % 2 {
					0 => Some(english_numbers::convert_long(i as i64, english_numbers::Formatting::none())),
					_ => None,
				},
			}
		}
	}

	/// Writes `FixedWidth::new(0..count)` under `u64` keys `0..count`, then reads every key
	/// back in `SHUFFLED_PASSES` random orders. The write/read cycle runs `sessions` times,
	/// reopening the store from the same path each time.
	async fn fw_roundtrip(count: u8, sessions: usize) {
		let values = (0..count).map(FixedWidth::new).collect::<Vec<_>>();
		let path = tempfile::NamedTempFile::new().unwrap().into_temp_path();
		let mut indices = (0..values.len() as u64).collect::<Vec<u64>>();
		for _ in 0..sessions {
			let mut store: Store<u64, FixedWidth> = Store::new(&path).await.unwrap();
			store.write_infallible(stream::iter(values.clone().into_iter().enumerate().map(|(i, v)| (i as u64, v)))).await.unwrap();
			for _ in 0..SHUFFLED_PASSES {
				// Fresh thread-rng handle per pass so no `!Send` value is held across an await.
				indices.shuffle(&mut rand::rng());
				for &i in &indices {
					assert_eq!(store.get(i).await.unwrap(), values[i as usize]);
				}
			}
		}
	}

	#[tokio::test]
	async fn constant_u8_1()
	{
		let values = vec![42u8];
		let path = tempfile::NamedTempFile::new().unwrap().into_temp_path();
		let mut store: Store<u8, u8> = Store::new(path).await.unwrap();
		store
			.write_infallible(stream::iter(values.clone().into_iter().enumerate().map(|(i, v)| {
				println!("{} -> {}", i, v);
				(i as u8, v)
			})))
			.await
			.unwrap();
		let result = store.get(0).await.unwrap();
		assert_eq!(result, 42u8);
	}

	#[tokio::test]
	async fn constant_u8_2()
	{
		let values = vec![11u8, 32u8];
		let path = tempfile::NamedTempFile::new().unwrap().into_temp_path();
		let mut store: Store<i16, u8> = Store::new(path).await.unwrap();
		store
			.write_infallible(stream::iter(values.clone().into_iter().enumerate().map(|(i, v)| {
				println!("{} -> {}", i, v);
				(i as i16, v)
			})))
			.await
			.unwrap();
		let result = store.get(0).await.unwrap();
		assert_eq!(result, 11u8);
		let result = store.get(1).await.unwrap();
		assert_eq!(result, 32u8);
	}

	#[tokio::test]
	async fn constant_u8_3()
	{
		let values = vec![47u8, 53u8, 128u8];
		let path = tempfile::NamedTempFile::new().unwrap().into_temp_path();
		let mut store: Store<u32, u8> = Store::new(path).await.unwrap();
		store
			.write_infallible(stream::iter(values.clone().into_iter().enumerate().map(|(i, v)| {
				println!("{} -> {}", i, v);
				(i as u32, v)
			})))
			.await
			.unwrap();
		let result = store.get(0).await.unwrap();
		assert_eq!(result, 47u8);
		let result = store.get(1).await.unwrap();
		assert_eq!(result, 53u8);
		let result = store.get(2).await.unwrap();
		assert_eq!(result, 128u8);
	}

	#[tokio::test]
	async fn constant_u16_1()
	{
		let values = vec![42u16];
		let path = tempfile::NamedTempFile::new().unwrap().into_temp_path();
		let mut store: Store<i128, u16> = Store::new(path).await.unwrap();
		store
			.write_infallible(stream::iter(values.clone().into_iter().enumerate().map(|(i, v)| {
				println!("{} -> {}", i, v);
				(i as i128, v)
			})))
			.await
			.unwrap();
		let result = store.get(0).await.unwrap();
		assert_eq!(result, 42u16);
	}

	#[tokio::test]
	async fn constant_u16_2()
	{
		let values = vec![11u16, 32u16];
		let path = tempfile::NamedTempFile::new().unwrap().into_temp_path();
		let mut store: Store<i32, u16> = Store::new(path).await.unwrap();
		store
			.write_infallible(stream::iter(values.clone().into_iter().enumerate().map(|(i, v)| {
				println!("{} -> {}", i, v);
				(i as i32, v)
			})))
			.await
			.unwrap();
		let result = store.get(0).await.unwrap();
		assert_eq!(result, 11u16);
		let result = store.get(1).await.unwrap();
		assert_eq!(result, 32u16);
	}

	#[tokio::test]
	async fn fixedwidth()
	{
		let mut values = (0..255_u8).map(FixedWidth::new).collect::<Vec<_>>();
		values.extend((0..255_u8).map(FixedWidth::new2));
		let path = tempfile::NamedTempFile::new().unwrap().into_temp_path();
		let mut store: Store<u16, FixedWidth> = Store::new(path).await.unwrap();
		store.write_infallible(stream::iter(values.clone().into_iter().enumerate().map(|(i, v)| (i as u16, v)))).await.unwrap();
		let mut rng = rand::rng();
		let mut indices = (0..values.len() as u16).collect::<Vec<u16>>();
		for _ in 0..SHUFFLED_PASSES {
			indices.shuffle(&mut rng);
			for &i in &indices {
				assert_eq!(store.get(i).await.unwrap(), values[i as usize]);
			}
		}
	}

	#[tokio::test]
	async fn strings()
	{
		let values = (0..255_u8).map(Named::new).collect::<Vec<_>>();
		let path = tempfile::NamedTempFile::new().unwrap().into_temp_path();
		let mut store: Store<i64, Named> = Store::new(path).await.unwrap();
		store.write_infallible(stream::iter(values.clone().into_iter().enumerate().map(|(i, v)| (i as i64, v)))).await.unwrap();
		let mut rng = rand::rng();
		let mut indices = (0..values.len() as i64).collect::<Vec<i64>>();
		for _ in 0..SHUFFLED_PASSES {
			indices.shuffle(&mut rng);
			for &i in &indices {
				assert_eq!(store.get(i).await.unwrap(), values[i as usize]);
			}
		}
	}

	#[tokio::test]
	async fn strings_reopen()
	{
		let values = (0..255_u8).map(Named::new).collect::<Vec<_>>();
		let path = tempfile::NamedTempFile::new().unwrap().into_temp_path();
		// Three sessions, each reopening the same path and rewriting the data.
		for _ in 0..3 {
			let mut store: Store<u128, Named> = Store::new(&path).await.unwrap();
			store.write_infallible(stream::iter(values.clone().into_iter().enumerate().map(|(i, v)| (i as u128, v)))).await.unwrap();
			let mut rng = rand::rng();
			let mut indices = (0..values.len() as u128).collect::<Vec<u128>>();
			for _ in 0..SHUFFLED_PASSES {
				indices.shuffle(&mut rng);
				for &i in &indices {
					assert_eq!(store.get(i).await.unwrap(), values[i as usize]);
				}
			}
		}
	}

	#[tokio::test]
	async fn smallstream()
	{
		let values = (0..5_u8).map(FixedWidth::new).collect::<Vec<_>>();
		// The temp path must outlive the store because `stream_values` reopens the file by
		// path. `TempPath` also deletes it on drop, even if an assertion panics.
		let path = tempfile::NamedTempFile::new().unwrap().into_temp_path();
		let mut store: Store<i8, FixedWidth> = Store::new(&path).await.unwrap();
		store.write_infallible(stream::iter(values.clone().into_iter().enumerate().map(|(i, v)| (i as i8, v)))).await.unwrap();
		// Collecting checks both order and count: an empty or truncated stream fails.
		let streamed = store.stream_values().await.unwrap().map(Result::unwrap).collect::<Vec<_>>().await;
		assert_eq!(streamed, values);
		assert_eq!(store.get(0).await.unwrap(), values[0]);
		assert_eq!(store.get(1).await.unwrap(), values[1]);
	}

	#[tokio::test]
	async fn stream()
	{
		let mut values = (0..255_u8).map(FixedWidth::new).collect::<Vec<_>>();
		values.extend((0..255_u8).map(FixedWidth::new2));
		let path = tempfile::NamedTempFile::new().unwrap().into_temp_path();
		let mut store: Store<u64, FixedWidth> = Store::new(&path).await.unwrap();
		store.write_infallible(stream::iter(values.clone().into_iter().enumerate().map(|(i, v)| (i as u64, v)))).await.unwrap();
		let streamed = store.stream_values().await.unwrap().map(Result::unwrap).collect::<Vec<_>>().await;
		assert_eq!(streamed, values);
		assert_eq!(store.get(0).await.unwrap(), values[0]);
		assert_eq!(store.get(1).await.unwrap(), values[1]);
	}

	#[tokio::test]
	async fn stream_reopen()
	{
		let mut values = (0..255_u8).map(FixedWidth::new).collect::<Vec<_>>();
		values.extend((0..255_u8).map(FixedWidth::new2));
		let path = tempfile::NamedTempFile::new().unwrap().into_temp_path();
		let mut store: Store<u64, FixedWidth> = Store::new(&path).await.unwrap();
		store.write_infallible(stream::iter(values.clone().into_iter().enumerate().map(|(i, v)| (i as u64, v)))).await.unwrap();
		drop(store);
		// Reopen the already-written store repeatedly; every session must see the same data.
		for _ in 0..6 {
			let mut store: Store<u64, FixedWidth> = Store::new(&path).await.unwrap();
			let streamed = store.stream_values().await.unwrap().map(Result::unwrap).collect::<Vec<_>>().await;
			assert_eq!(streamed, values);
			assert_eq!(store.get(0).await.unwrap(), values[0]);
			assert_eq!(store.get(1).await.unwrap(), values[1]);
		}
	}

	#[tokio::test]
	async fn fw_constant()
	{
		let values = (0..5).map(FixedWidth::new).collect::<Vec<_>>();
		let path = tempfile::NamedTempFile::new().unwrap().into_temp_path();
		let mut store: Store<u64, FixedWidth> = Store::new(path).await.unwrap();
		store.write_infallible(stream::iter(values.clone().into_iter().enumerate().map(|(i, v)| (i as u64, v)))).await.unwrap();
		// Sequential (unshuffled) reads.
		for i in 0..values.len() as u64 {
			assert_eq!(store.get(i).await.unwrap(), values[i as usize]);
		}
	}

	#[tokio::test]
	async fn fw_prime_5()
	{
		fw_roundtrip(5, 1).await;
	}

	#[tokio::test]
	async fn fw_prime_13()
	{
		fw_roundtrip(13, 1).await;
	}

	#[tokio::test]
	async fn fw_prime_41()
	{
		fw_roundtrip(41, 1).await;
	}

	#[tokio::test]
	async fn fw_prime_47()
	{
		fw_roundtrip(47, 1).await;
	}

	#[tokio::test]
	async fn fw_prime_53()
	{
		fw_roundtrip(53, 1).await;
	}

	#[tokio::test]
	async fn fw_prime_53_reopen()
	{
		// Several sessions, so the store is genuinely reopened (the old `1..2` loop ran once).
		fw_roundtrip(53, 3).await;
	}

	#[tokio::test]
	async fn get_many()
	{
		let mut values = (0..=255_u8).map(FixedWidth::new).collect::<Vec<_>>();
		values.extend((0..=255_u8).map(FixedWidth::new2));
		let path = tempfile::NamedTempFile::new().unwrap().into_temp_path();
		let mut store: Store<u64, FixedWidth> = Store::new(path).await.unwrap();
		store.write_infallible(stream::iter(values.clone().into_iter().enumerate().map(|(i, v)| (i as u64, v)))).await.unwrap();
		let mut rng = rand::rng();
		let indices = (0..values.len() as u64).collect::<Vec<u64>>();
		let counts = [1, 2, 3, 5, 7, 8, 11, 13, 16, 17, 19, 23, 29, 31, 32, 37];
		for _ in 0..4 {
			for &count in &counts {
				let is = indices.sample(&mut rng, count).copied().collect::<Vec<u64>>();
				let results = store.get_many(&is).await.unwrap();
				assert_eq!(results.len(), is.len());
				for (key, value) in results.into_iter() {
					assert_eq!(value, values[key as usize]);
				}
			}
		}
	}

	#[tokio::test]
	async fn get_many_cached_and_missing()
	{
		let values = (0..32_u8).map(FixedWidth::new).collect::<Vec<_>>();
		let path = tempfile::NamedTempFile::new().unwrap().into_temp_path();
		let mut store: Store<u64, FixedWidth> = Store::new(&path).await.unwrap();
		store.write_infallible(stream::iter(values.clone().into_iter().enumerate().map(|(i, v)| (i as u64, v)))).await.unwrap();
		// Nothing requested, nothing returned.
		assert!(store.get_many(&[]).await.unwrap().is_empty());
		// A key that is not stored is left out of the map rather than reported as an error...
		assert!(store.get_many(&[1000]).await.unwrap().is_empty());
		// ...while `get` reports it as `NotFound`.
		assert!(matches!(store.get(1000).await, Err(Error::NotFound(1000))));
		// The first round resolves through the index; the second is served entirely from the cache.
		for _ in 0..2 {
			let results = store.get_many(&[3, 7, 11]).await.unwrap();
			assert_eq!(results.len(), 3);
			for (key, value) in results {
				assert_eq!(value, values[key as usize]);
			}
		}
	}
}