#![allow(unused_parens)]
#![allow(clippy::tabs_in_doc_comments)]
#![warn(clippy::future_not_send)]
use std::cmp::Ordering;
use std::collections::HashMap;
use std::hash::Hash;
use std::io::SeekFrom;
use std::mem::size_of;
use std::path::Path;
use std::path::PathBuf;
use bincode::deserialize;
use bincode::Options;
use tokio_byteorder::AsyncReadBytesExt;
use tokio_byteorder::NativeEndian;
use futures::stream::Stream;
use futures::stream::StreamExt;
use futures::stream::TryStream;
use futures::stream::TryStreamExt;
use serde::de::DeserializeOwned;
use serde::Deserialize;
use serde::Deserializer;
use serde::Serialize;
use tokio::fs::File;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncSeek;
use tokio::io::AsyncSeekExt;
use tokio::io::AsyncWrite;
use tokio::io::AsyncWriteExt;
use tokio::io::BufStream;
use tokio::io::BufWriter;
use tracing::instrument;
mod error;
pub use error::Error;
pub use error::StreamError;
pub mod stream;
/// Marker trait bundling every bound a store key must satisfy.
///
/// Keys are copied freely, hashed for the in-memory position cache, ordered
/// for the sorted on-disk index, and (de)serialized with serde when the index
/// is written and read back. There are no methods; the trait only names the
/// bound set.
pub trait Key:
    Copy
    + Eq
    + Hash
    + Ord
    + std::fmt::Debug
    + std::fmt::Display
    + for<'de> Deserialize<'de>
    + Serialize
    + Send
    + Sync
{
}

/// Blanket implementation: any type meeting the bounds is automatically a [`Key`].
impl<T> Key for T
where
    T: Copy
        + Eq
        + Hash
        + Ord
        + std::fmt::Debug
        + std::fmt::Display
        + for<'de> Deserialize<'de>
        + Serialize
        + Send
        + Sync,
{
}
/// Format version stamped into the header by `write_to`; `is_valid` reports a
/// file as invalid when its header carries any other value.
const CURRENT_FORMAT_VERSION: u16 = 2;
/// Returns the bincode configuration used when this module serializes data:
/// a `u32::MAX` byte limit, trailing bytes tolerated, fixed-width integers and
/// native byte order.
fn codec() -> impl bincode::Options + Copy {
    let byte_limit = u64::from(u32::MAX);
    bincode::DefaultOptions::new()
        .with_limit(byte_limit)
        .allow_trailing_bytes()
        .with_fixint_encoding()
        .with_native_endian()
}
/// One record of the on-disk index: a key plus the file offset of its value.
///
/// `Serialize` is derived, while `Deserialize` is implemented by hand (as a
/// `(K, u64)` tuple) elsewhere in this file.
#[derive(Debug, Eq, PartialEq, Serialize)]
pub struct IndexEntry<K: Key> {
// Key this entry belongs to; the index is sorted on it.
key: K,
// Absolute file offset at which the value's length-prefixed record starts.
position: u64
}
impl<K: Key> IndexEntry<K> {
fn new(key: K, position: u64) -> Self {
Self{
key,
position
}
} }
impl<K: Key> Ord for IndexEntry<K> {
    /// Orders entries by key, using the value position as a tie-break.
    ///
    /// The derived `Eq`/`PartialEq` compare both fields, so the ordering must
    /// also take `position` into account to honour the `Ord` contract
    /// (`cmp == Equal` exactly when `==`). Entries with distinct keys order
    /// exactly as they would by key alone; the tie-break only matters for
    /// duplicate keys, where it makes the (unstable) index sort deterministic.
    #[inline]
    fn cmp(&self, other: &Self) -> Ordering {
        self.key
            .cmp(&other.key)
            .then_with(|| self.position.cmp(&other.position))
    }
}
impl<K: Key> PartialOrd for IndexEntry<K> {
    /// Always `Some`: the ordering is total and delegates to [`Ord::cmp`].
    #[inline]
    fn partial_cmp(&self, rhs: &Self) -> Option<Ordering> {
        Some(Ord::cmp(self, rhs))
    }
}
impl<'de, K: Key> Deserialize<'de> for IndexEntry<K> {
    /// Reads an entry as a `(key, position)` tuple.
    ///
    /// NOTE(review): this relies on the derived `Serialize` output of the
    /// struct being readable as a 2-tuple by the deserializer in use —
    /// confirm if the wire format is ever changed.
    #[inline]
    fn deserialize<D: Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
        <(K, u64) as Deserialize<'de>>::deserialize(deserializer)
            .map(|(key, position)| Self { key, position })
    }
}
/// Bookkeeping record stored at offset 0 of the store file.
///
/// `write_to` writes it twice: once up front with `completed_writing = false`
/// and once at the end with the final counts and `completed_writing = true`.
#[derive(Debug, Deserialize, Serialize)]
struct Header {
// Compared against `CURRENT_FORMAT_VERSION` by `is_valid`.
format_version: u16,
// False while a write is in progress; set to true only after the index has been written.
completed_writing: bool,
// Number of key/value pairs, i.e. the number of index entries.
pair_count: u64,
// Absolute file offset at which the sorted index begins (right after the last value record).
index_start_pos: u64
}
/// Handle to a single-file key/value store: value records followed by a
/// key-sorted index, with a `Header` at the start of the file.
pub struct Store<K, T>
where
K: Key,
T: Serialize + DeserializeOwned
{
// Location of the backing file; `stream_values` reopens it for an independent read handle.
path: PathBuf,
// Buffered handle used for header, index and value reads.
file: BufStream<File>,
// Header as last read from disk or as last written by `write_to`.
header: Header,
// Key -> value position cache, filled by `get`/`get_many` and cleared by `write`.
index_cache: HashMap<K, u64, fnv::FnvBuildHasher>,
// Ties the value type `T` to the store without storing a `T`.
_nothing: std::marker::PhantomData<T>
}
impl<K, T> Store<K, T>
where
K: Key,
T: Serialize + DeserializeOwned + std::fmt::Debug + Send + Sync,
{
/// Number of bytes reserved for the header at the start of the file; value
/// records begin at this offset.
///
/// NOTE(review): this is the in-memory `size_of::<Header>()`, not the length
/// of the bincode encoding — presumably it is at least as large as the encoded
/// header so the encoding always fits; confirm before changing `Header`.
const HEADER_SIZE: usize = size_of::<Header>();
/// Distance in bytes between consecutive index entries on disk: `write_to`
/// emits this many bytes per entry and `get_index_entry` multiplies by it.
///
/// NOTE(review): this is the in-memory `size_of::<IndexEntry<K>>()`, not the
/// encoded length — presumably never smaller than the encoding of an entry;
/// confirm for unusual key types.
const INDEX_ENTRY_SIZE: usize = size_of::<IndexEntry<K>>();
/// Opens (creating it if necessary) the store file at `path`.
///
/// The file is opened for reading and writing and is not truncated. If a
/// complete header can be read from it, that header is adopted; otherwise the
/// store starts with an empty header (version 0, not completed, zero pairs).
///
/// # Errors
/// Returns an error if the file cannot be opened or the header bytes cannot
/// be read or decoded.
pub async fn new(path: impl AsRef<Path> + Send) -> Result<Self, Error<K>> {
    let path = path.as_ref().to_owned();

    let mut open_options = tokio::fs::OpenOptions::new();
    open_options
        .read(true)
        .write(true)
        .create(true)
        .append(false);
    let handle = open_options.open(&path).await?;

    // Placeholder used until (and unless) a header is found on disk.
    let blank_header = Header {
        format_version: 0,
        completed_writing: false,
        pair_count: 0,
        index_start_pos: Self::HEADER_SIZE as u64,
    };

    let mut store = Self {
        path,
        file: BufStream::new(handle),
        header: blank_header,
        index_cache: HashMap::default(),
        _nothing: std::marker::PhantomData,
    };

    if let Some(on_disk) = store.read_header().await? {
        store.header = on_disk;
    }
    Ok(store)
}
/// File offset at which the sorted index begins, as recorded in the header.
#[inline]
fn index_start_position(&self) -> u64 {
    let header = &self.header;
    header.index_start_pos
}
/// File offset of the first value record: immediately after the space
/// reserved for the header.
const fn data_start_position(&self) -> u64 {
    let header_len = Self::HEADER_SIZE;
    header_len as u64
}
#[inline]
async fn read_header(&mut self) -> Result<Option<Header>, Error<K>> {
let mut buf = [0u8; size_of::<Header>()];
self.file.seek(SeekFrom::Start(0)).await?;
let read_count = self.file.read(&mut buf).await?;
if(read_count != buf.len()) {
return Ok(None);
}
Ok(Some(deserialize(&buf)?))
}
#[inline]
async fn read_index_entry(&mut self) -> Result<IndexEntry<K>, Error<K>> {
let mut buf = [0u8; 192];
let count = self.file.read(&mut buf).await?;
Ok(deserialize(&buf[..count])?)
}
/// Fetches the `i`-th entry (zero-based) of the on-disk index.
///
/// Seeks to `index_start + i * INDEX_ENTRY_SIZE` and decodes the entry found
/// there. No bounds check against the pair count is performed here.
#[inline]
#[cfg_attr(feature = "full-instrumentation", instrument(err, level = "trace", skip(self), fields(indexkv.path = %self.path.display())))]
async fn get_index_entry(&mut self, i: u64) -> Result<IndexEntry<K>, Error<K>> {
    let stride = Self::INDEX_ENTRY_SIZE as u64;
    let offset = self.index_start_position() + (i * stride);
    self.file.seek(SeekFrom::Start(offset)).await?;
    self.read_index_entry().await
}
/// Binary-searches the on-disk index for `key`.
///
/// Returns `Ok(Some(entry))` on a hit and `Ok(None)` when the key is absent
/// (including when the store holds no pairs). Each probe reads one index
/// entry from disk.
#[cfg_attr(feature = "full-instrumentation", instrument(err, level = "debug", skip(self), fields(indexkv.path = %self.path.display())))]
async fn find_index_entry(&mut self, key: K) -> Result<Option<IndexEntry<K>>, Error<K>> {
    // Half-open search window [lo, hi).
    let mut lo = 0;
    let mut hi = self.header.pair_count;
    while lo < hi {
        let mid = lo + (hi - lo) / 2;
        let candidate = self.get_index_entry(mid).await?;
        match candidate.key.cmp(&key) {
            Ordering::Equal => return Ok(Some(candidate)),
            // Probe is below the target: continue in the upper half.
            Ordering::Less => lo = mid + 1,
            // Probe is above the target: continue in the lower half.
            Ordering::Greater => hi = mid,
        }
    }
    Ok(None)
}
#[cfg_attr(feature = "full-instrumentation", instrument(err, level = "debug", skip(self), fields(indexkv.path = %self.path.display())))]
async fn find_index_entries(&mut self, keys: &[K]) -> Result<Vec<(K, IndexEntry<K>)>, Error<K>> {
let mut chunks = Vec::with_capacity(keys.len());
chunks.push((keys, 0, self.header.pair_count - 1));
let mut chunks_new = Vec::with_capacity(keys.len());
let mut results = Vec::with_capacity(keys.len());
while(!chunks.is_empty()) {
chunks_new.clear();
for (keys, left, right) in chunks.iter() {
let left = *left;
let right = *right;
let mid = left + ((right - left) / 2);
let entry = self.get_index_entry(mid).await?;
let pivot = match keys.binary_search(&entry.key) {
Ok(i) => {
results.push((keys[i], entry));
if(keys.len() == 1) {
continue;
}
i
},
Err(i) => i
};
if(left == right) {
continue;
}
match pivot {
0 => chunks_new.push((&keys[..], mid + 1, right)),
i if i == keys.len() => chunks_new.push((&keys[..], left, mid)),
i => {
chunks_new.push((&keys[..=i], left, mid));
if(mid < right) {
chunks_new.push((&keys[i..], mid + 1, right));
}
}
};
}
std::mem::swap(&mut chunks, &mut chunks_new);
}
Ok(results)
}
#[inline]
async fn read_value(&mut self) -> Result<Option<T>, Error<K>> {
let size = match AsyncReadBytesExt::read_u32::<NativeEndian>(&mut self.file).await {
Ok(v) => v as usize,
Err(e) => match e.kind() {
std::io::ErrorKind::UnexpectedEof => return Ok(None),
_ => return Err(e.into())
}
};
let mut buf = vec![0u8; size];
match self.file.read_exact(&mut buf).await {
Ok(read_count) => if(read_count < size) {
return Ok(None);
},
Err(e) => match e.kind() {
std::io::ErrorKind::UnexpectedEof => return Ok(None),
_ => return Err(e.into())
}
};
Ok(Some(deserialize(&buf)?))
}
#[inline]
async fn read_value_at(&mut self, position: u64) -> Result::<T, Error<K>> {
self.file.seek(SeekFrom::Start(position)).await?;
match self.read_value().await? {
Some(v) => Ok(v),
None => Err(Error::MissingValue)
}
}
/// Reports whether the loaded header describes a usable store: the format
/// version matches `CURRENT_FORMAT_VERSION` and the last write ran to
/// completion.
///
/// Only the in-memory header is inspected; no I/O is performed, so this
/// currently never returns `Err`.
#[inline]
pub async fn is_valid(&mut self) -> Result<bool, Error<K>> {
    let header = &self.header;
    if header.format_version != CURRENT_FORMAT_VERSION {
        return Ok(false);
    }
    Ok(header.completed_writing)
}
/// Streams the stored values in the order they appear in the file.
///
/// A separate read-only handle is opened on the store's path and positioned
/// at the first value record; the stream is limited to `pair_count` items as
/// recorded in the header.
///
/// # Errors
/// Fails if the file cannot be reopened or the initial seek fails. Errors
/// encountered while reading individual values are yielded by the stream.
#[instrument(err, level = "debug", skip(self), fields(indexkv.path = %self.path.display()))]
#[inline]
pub async fn stream_values(&self) -> Result<impl Stream<Item=Result<T, Error<K>>>, Error<K>> {
    let first_record = self.data_start_position();
    let limit = self.header.pair_count as usize;

    let mut options = tokio::fs::OpenOptions::new();
    options
        .read(true)
        .write(false)
        .create(false)
        .append(false);
    let mut reader = options.open(&self.path).await?;
    reader.seek(SeekFrom::Start(first_record)).await?;

    Ok(stream::ValueStream::new(reader).take(limit))
}
#[cfg_attr(feature = "full-instrumentation", instrument(err, level = "debug", skip(self), fields(indexkv.path = %self.path.display())))]
#[inline]
pub async fn get(&mut self, key: K) -> Result<T, Error<K>> {
let position = self.index_cache.get(&key);
let position = match position {
Some(v) => *v,
None => {
let index = match self.find_index_entry(key).await? {
Some(v) => v,
None => return Err(Error::NotFound(key))
};
self.index_cache.insert(key, index.position);
index.position
}
};
self.read_value_at(position).await
}
#[instrument(err, level = "debug", skip(self), fields(indexkv.path = %self.path.display()))]
pub async fn get_many(&mut self, keys_in: &[K]) -> Result<HashMap<K, T, fnv::FnvBuildHasher>, Error<K>> {
let mut results = HashMap::with_capacity_and_hasher(keys_in.len(), fnv::FnvBuildHasher::default());
let mut keys = Vec::with_capacity(keys_in.len());
for key in keys_in {
if let Some(&position) = self.index_cache.get(key) {
let value = self.read_value_at(position).await?;
results.insert(*key, value);
} else {
keys.push(*key);
}
}
keys.sort_unstable();
if(keys.len() == 1) {
let key = keys[0];
if let Some(index) = self.find_index_entry(key).await? {
self.index_cache.insert(key, index.position);
results.insert(key, self.read_value_at(index.position).await?);
}
} else {
let found = self.find_index_entries(&keys).await?;
for (key, index) in found.into_iter() {
self.index_cache.insert(key, index.position);
results.insert(key, self.read_value_at(index.position).await?);
}
}
Ok(results)
}
#[instrument(err, level = "info", skip(self, input_stream, writer, index_size_hint), fields(indexkv.path = %self.path.display()))]
async fn write_to<S, W>(&mut self, mut input_stream: S, writer: &mut W, index_size_hint: usize) -> Result<(), StreamError<S::Error, K>>
where
S: TryStream<Ok=(K, T)> + Unpin + Send,
W: AsyncWrite + AsyncSeek + Unpin + Send,
S::Error: std::error::Error + Send
{
let mut buf = vec![0u8; 1024];
let encoder = codec();
self.header.format_version = CURRENT_FORMAT_VERSION;
self.header.completed_writing = false;
encoder.serialize_into(&mut buf[0..], &self.header)?;
writer.seek(SeekFrom::Start(0)).await?;
writer.write_all(&buf[0..Self::HEADER_SIZE]).await?;
writer.flush().await?;
let mut pos = self.data_start_position();
let mut index = Vec::with_capacity(index_size_hint);
let mut size = 0u32;
let size_of_size = encoder.serialized_size(&size)? as u32;
writer.seek(SeekFrom::Start(pos)).await?;
while let Some((key, value)) = input_stream.try_next().await.map_err(StreamError::from_external)? {
size = encoder.serialized_size(&value)? as u32;
encoder.serialize_into(&mut buf[0..], &size)?;
if((size + size_of_size) as usize > buf.len()) {
buf.resize((size + size_of_size) as usize, 0);
}
encoder.serialize_into(&mut buf[size_of_size as usize..], &value)?;
index.push(IndexEntry::new(key, pos));
pos += writer.write(&buf[0..(size + size_of_size) as usize]).await? as u64;
}
self.header.index_start_pos = pos;
self.header.pair_count = index.len() as u64;
index.sort_unstable();
for entry in index.into_iter() {
encoder.serialize_into(&mut buf[0..], &entry)?;
writer.write_all(&buf[0..Self::INDEX_ENTRY_SIZE]).await?;
}
self.header.completed_writing = true;
encoder.serialize_into(&mut buf[0..], &self.header)?;
writer.seek(SeekFrom::Start(0)).await?;
writer.write_all(&buf[0..Self::HEADER_SIZE]).await?;
writer.flush().await?;
Ok(())
}
/// Replaces the store's contents with the pairs produced by `input_stream`.
///
/// The position cache is cleared first, then the data is written through a
/// buffered writer wrapped around a clone of the store's file handle.
/// `index_size_hint` is forwarded to `write_to` to pre-size the index.
///
/// # Errors
/// Fails if the handle cannot be cloned, or with whatever `write_to` reports.
#[inline]
pub async fn write<S>(&mut self, input_stream: S, index_size_hint: usize) -> Result<(), StreamError<S::Error, K>>
where
    S: TryStream<Ok=(K, T)> + Unpin + Send,
    S::Error: std::error::Error + Send
{
    // Cached positions refer to the old contents.
    self.index_cache.clear();
    let handle = self.file.get_ref().try_clone().await?;
    let mut buffered = BufWriter::new(handle);
    self.write_to(input_stream, &mut buffered, index_size_hint).await
}
/// Convenience wrapper around `write` for streams that cannot fail.
///
/// Each item is wrapped in `Ok` before being handed to `write`, so only
/// internal (encoding / I/O) errors can come back; those are unwrapped into a
/// plain `Error`.
///
/// # Panics
/// Hits `unreachable!()` if `write` reports anything other than
/// `StreamError::Internal`, which the `Ok`-only input stream should rule out.
#[inline]
pub async fn write_infallible<S>(&mut self, input_stream: S, index_size_hint: usize) -> Result<(), Error<K>>
where
    S: Stream<Item=(K, T)> + Unpin + Send
{
    let always_ok = input_stream.map(Result::<_, Error<K>>::Ok);
    match self.write(always_ok, index_size_hint).await {
        Ok(()) => Ok(()),
        Err(StreamError::Internal(e)) => Err(e),
        Err(_) => unreachable!()
    }
}
}
#[cfg(test)]
mod tests {
use futures::stream;
use rand::prelude::SliceRandom;
use crate::*;
// Test value type made only of fixed-width integer fields of different sizes.
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
struct FixedWidth {
a: u8,
b: i16,
c: u32,
d: u64
}
impl FixedWidth {
    // First deterministic family of test values derived from `i`.
    fn new(i: u8) -> Self {
        Self {
            a: i,
            b: i16::from(i) * 2,
            c: u32::from(i) * 4,
            d: u64::from(i) * 8,
        }
    }

    // Second family, distinct from `new` for the same `i`.
    fn new2(i: u8) -> Self {
        Self {
            a: 255 - i,
            b: -i16::from(i) * 2,
            c: u32::from(i) * 11,
            d: u64::from(i) * 17,
        }
    }
}
// Single u8 value under a u8 key round-trips.
#[tokio::test]
async fn constant_u8_1() {
    let values = vec![42u8];
    let path = tempfile::NamedTempFile::new().unwrap().into_temp_path();
    let mut store: Store<u8, u8> = Store::new(path).await.unwrap();
    let pairs = values.iter().copied().enumerate().map(|(i, v)| {
        println!("{} -> {}", i, v);
        (i as u8, v)
    });
    store.write_infallible(stream::iter(pairs), 0).await.unwrap();
    assert_eq!(store.get(0).await.unwrap(), 42u8);
}
// Two u8 values under i16 keys round-trip.
#[tokio::test]
async fn constant_u8_2() {
    let values = vec![11u8, 32u8];
    let path = tempfile::NamedTempFile::new().unwrap().into_temp_path();
    let mut store: Store<i16, u8> = Store::new(path).await.unwrap();
    let pairs = values.iter().copied().enumerate().map(|(i, v)| {
        println!("{} -> {}", i, v);
        (i as i16, v)
    });
    store.write_infallible(stream::iter(pairs), 0).await.unwrap();
    assert_eq!(store.get(0).await.unwrap(), 11u8);
    assert_eq!(store.get(1).await.unwrap(), 32u8);
}
// Three u8 values under u32 keys round-trip.
#[tokio::test]
async fn constant_u8_3() {
    let values = vec![47u8, 53u8, 128u8];
    let path = tempfile::NamedTempFile::new().unwrap().into_temp_path();
    let mut store: Store<u32, u8> = Store::new(path).await.unwrap();
    let pairs = values.iter().copied().enumerate().map(|(i, v)| {
        println!("{} -> {}", i, v);
        (i as u32, v)
    });
    store.write_infallible(stream::iter(pairs), 0).await.unwrap();
    assert_eq!(store.get(0).await.unwrap(), 47u8);
    assert_eq!(store.get(1).await.unwrap(), 53u8);
    assert_eq!(store.get(2).await.unwrap(), 128u8);
}
// Single u16 value under an i128 key round-trips.
#[tokio::test]
async fn constant_u16_1() {
    let values = vec![42u16];
    let path = tempfile::NamedTempFile::new().unwrap().into_temp_path();
    let mut store: Store<i128, u16> = Store::new(path).await.unwrap();
    let pairs = values.iter().copied().enumerate().map(|(i, v)| {
        println!("{} -> {}", i, v);
        (i as i128, v)
    });
    store.write_infallible(stream::iter(pairs), 0).await.unwrap();
    assert_eq!(store.get(0).await.unwrap(), 42u16);
}
// Two u16 values under i32 keys round-trip.
#[tokio::test]
async fn constant_u16_2() {
    let values = vec![11u16, 32u16];
    let path = tempfile::NamedTempFile::new().unwrap().into_temp_path();
    let mut store: Store<i32, u16> = Store::new(path).await.unwrap();
    let pairs = values.iter().copied().enumerate().map(|(i, v)| {
        println!("{} -> {}", i, v);
        (i as i32, v)
    });
    store.write_infallible(stream::iter(pairs), 0).await.unwrap();
    assert_eq!(store.get(0).await.unwrap(), 11u16);
    assert_eq!(store.get(1).await.unwrap(), 32u16);
}
// 510 fixed-width structs under u16 keys, read back in random order 15 times.
#[tokio::test]
async fn fixedwidth() {
    let mut values: Vec<FixedWidth> = (0..255_u8).map(FixedWidth::new).collect();
    values.extend((0..255_u8).map(FixedWidth::new2));
    let path = tempfile::NamedTempFile::new().unwrap().into_temp_path();
    let mut store: Store<u16, FixedWidth> = Store::new(path).await.unwrap();
    let pairs = values.iter().cloned().enumerate().map(|(i, v)| (i as u16, v));
    store.write_infallible(stream::iter(pairs), 512).await.unwrap();
    let mut rng = rand::thread_rng();
    let mut indices: Vec<u16> = (0..values.len() as u16).collect();
    for _ in 0..15 {
        indices.shuffle(&mut rng);
        for &i in &indices {
            let result = store.get(i).await.unwrap();
            assert_eq!(result, values[i as usize]);
        }
    }
}
// Variable-length values (strings and optional strings) under i64 keys,
// read back in random order 15 times.
#[tokio::test]
async fn strings() {
    #[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
    struct Struct {
        number: u8,
        name: String,
        maybe_name: Option<String>,
    }
    impl Struct {
        fn new(i: u8) -> Self {
            let name = english_numbers::convert_all_fmt(i as i64);
            // Only even numbers get the optional long-form name.
            let maybe_name = if i % 2 == 0 {
                Some(english_numbers::convert_long(i as i64, english_numbers::Formatting::none()))
            } else {
                None
            };
            Self { number: i, name, maybe_name }
        }
    }
    let values: Vec<Struct> = (0..255_u8).map(Struct::new).collect();
    let path = tempfile::NamedTempFile::new().unwrap().into_temp_path();
    let mut store: Store<i64, Struct> = Store::new(path).await.unwrap();
    let pairs = values.iter().cloned().enumerate().map(|(i, v)| (i as i64, v));
    store.write_infallible(stream::iter(pairs), 256).await.unwrap();
    let mut rng = rand::thread_rng();
    let mut indices: Vec<i64> = (0..values.len() as i64).collect();
    for _ in 0..15 {
        indices.shuffle(&mut rng);
        for &i in &indices {
            let result = store.get(i).await.unwrap();
            assert_eq!(result, values[i as usize]);
        }
    }
}
// Same as `strings`, but the store at one path is opened, rewritten and
// verified three times in a row (u128 keys).
#[tokio::test]
async fn strings_reopen() {
    #[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
    struct Struct {
        number: u8,
        name: String,
        maybe_name: Option<String>,
    }
    impl Struct {
        fn new(i: u8) -> Self {
            let name = english_numbers::convert_all_fmt(i as i64);
            // Only even numbers get the optional long-form name.
            let maybe_name = if i % 2 == 0 {
                Some(english_numbers::convert_long(i as i64, english_numbers::Formatting::none()))
            } else {
                None
            };
            Self { number: i, name, maybe_name }
        }
    }
    let values: Vec<Struct> = (0..255_u8).map(Struct::new).collect();
    let path = tempfile::NamedTempFile::new().unwrap().into_temp_path();
    for _ in 0..3 {
        let mut store: Store<u128, Struct> = Store::new(&path).await.unwrap();
        let pairs = values.iter().cloned().enumerate().map(|(i, v)| (i as u128, v));
        store.write_infallible(stream::iter(pairs), 256).await.unwrap();
        let mut rng = rand::thread_rng();
        let mut indices: Vec<u128> = (0..values.len() as u128).collect();
        for _ in 0..15 {
            indices.shuffle(&mut rng);
            for &i in &indices {
                let result = store.get(i).await.unwrap();
                assert_eq!(result, values[i as usize]);
            }
        }
    }
}
// Five values: streamed back in write order, then two keyed lookups.
// The temp file is kept on disk and removed explicitly at the end.
#[tokio::test]
async fn smallstream() {
    let values: Vec<FixedWidth> = (0..5_u8).map(FixedWidth::new).collect();
    let path = tempfile::NamedTempFile::new().unwrap().keep().unwrap().1;
    let mut store: Store<i8, FixedWidth> = Store::new(&path).await.unwrap();
    let pairs = values.iter().cloned().enumerate().map(|(i, v)| (i as i8, v));
    store.write_infallible(stream::iter(pairs), 0).await.unwrap();
    let mut streamed = store.stream_values().await.unwrap().enumerate();
    while let Some((i, item)) = streamed.next().await {
        assert_eq!(item.unwrap(), values[i]);
    }
    assert_eq!(store.get(0).await.unwrap(), values[0]);
    assert_eq!(store.get(1).await.unwrap(), values[1]);
    tokio::fs::remove_file(path).await.unwrap();
}
// 510 values: streamed back in write order, then two keyed lookups.
// The temp file is kept on disk and removed explicitly at the end.
#[tokio::test]
async fn stream() {
    let mut values: Vec<FixedWidth> = (0..255_u8).map(FixedWidth::new).collect();
    values.extend((0..255_u8).map(FixedWidth::new2));
    let path = tempfile::NamedTempFile::new().unwrap().keep().unwrap().1;
    let mut store: Store<u64, FixedWidth> = Store::new(&path).await.unwrap();
    let pairs = values.iter().cloned().enumerate().map(|(i, v)| (i as u64, v));
    store.write_infallible(stream::iter(pairs), 512).await.unwrap();
    let mut streamed = store.stream_values().await.unwrap().enumerate();
    while let Some((i, item)) = streamed.next().await {
        assert_eq!(item.unwrap(), values[i]);
    }
    assert_eq!(store.get(0).await.unwrap(), values[0]);
    assert_eq!(store.get(1).await.unwrap(), values[1]);
    tokio::fs::remove_file(path).await.unwrap();
}
// Writes once, drops the store, then reopens the same file repeatedly (two
// rounds of three) and checks both streaming and keyed access each time.
#[tokio::test]
async fn stream_reopen() {
    let mut values: Vec<FixedWidth> = (0..255_u8).map(FixedWidth::new).collect();
    values.extend((0..255_u8).map(FixedWidth::new2));
    let path = tempfile::NamedTempFile::new().unwrap().keep().unwrap().1;
    {
        let mut writer_store: Store<u64, FixedWidth> = Store::new(&path).await.unwrap();
        let pairs = values.iter().cloned().enumerate().map(|(i, v)| (i as u64, v));
        writer_store.write_infallible(stream::iter(pairs), 512).await.unwrap();
        // `writer_store` is dropped here, before any reopen.
    }
    for iteration in 1..4 {
        println!(">>> 1-{}", iteration);
        let mut reopened: Store<u64, FixedWidth> = Store::new(&path).await.unwrap();
        let mut streamed = reopened.stream_values().await.unwrap().enumerate();
        while let Some((i, item)) = streamed.next().await {
            assert_eq!(item.unwrap(), values[i]);
        }
        assert_eq!(reopened.get(0).await.unwrap(), values[0]);
        assert_eq!(reopened.get(1).await.unwrap(), values[1]);
    }
    for iteration in 1..4 {
        println!(">>> 2-{}", iteration);
        let mut reopened: Store<u64, FixedWidth> = Store::new(&path).await.unwrap();
        let mut streamed = reopened.stream_values().await.unwrap().enumerate();
        while let Some((i, item)) = streamed.next().await {
            assert_eq!(item.unwrap(), values[i]);
        }
        assert_eq!(reopened.get(0).await.unwrap(), values[0]);
        assert_eq!(reopened.get(1).await.unwrap(), values[1]);
    }
    tokio::fs::remove_file(path).await.unwrap();
}
// Five values read back once each, in ascending key order.
#[tokio::test]
async fn fw_constant() {
    let values: Vec<FixedWidth> = (0..5).map(FixedWidth::new).collect();
    let path = tempfile::NamedTempFile::new().unwrap().into_temp_path();
    let mut store: Store<u64, FixedWidth> = Store::new(path).await.unwrap();
    let pairs = values.iter().cloned().enumerate().map(|(i, v)| (i as u64, v));
    store.write_infallible(stream::iter(pairs), 0).await.unwrap();
    let indices: Vec<u64> = (0..values.len() as u64).collect();
    for &i in &indices {
        let result = store.get(i).await.unwrap();
        assert_eq!(result, values[i as usize]);
    }
}
// Index of 5 entries, random-order lookups, 15 rounds.
#[tokio::test]
async fn fw_prime_5() {
    let values: Vec<FixedWidth> = (0..5).map(FixedWidth::new).collect();
    let path = tempfile::NamedTempFile::new().unwrap().into_temp_path();
    let mut store: Store<u64, FixedWidth> = Store::new(path).await.unwrap();
    let pairs = values.iter().cloned().enumerate().map(|(i, v)| (i as u64, v));
    store.write_infallible(stream::iter(pairs), 0).await.unwrap();
    let mut rng = rand::thread_rng();
    let mut indices: Vec<u64> = (0..values.len() as u64).collect();
    for _ in 0..15 {
        indices.shuffle(&mut rng);
        for &i in &indices {
            let result = store.get(i).await.unwrap();
            assert_eq!(result, values[i as usize]);
        }
    }
}
// Index of 13 entries (exact size hint), random-order lookups, 15 rounds.
#[tokio::test]
async fn fw_prime_13() {
    let values: Vec<FixedWidth> = (0..13).map(FixedWidth::new).collect();
    let path = tempfile::NamedTempFile::new().unwrap().into_temp_path();
    let mut store: Store<u64, FixedWidth> = Store::new(path).await.unwrap();
    let pairs = values.iter().cloned().enumerate().map(|(i, v)| (i as u64, v));
    store.write_infallible(stream::iter(pairs), 13).await.unwrap();
    let mut rng = rand::thread_rng();
    let mut indices: Vec<u64> = (0..values.len() as u64).collect();
    for _ in 0..15 {
        indices.shuffle(&mut rng);
        for &i in &indices {
            let result = store.get(i).await.unwrap();
            assert_eq!(result, values[i as usize]);
        }
    }
}
// Index of 41 entries, random-order lookups, 15 rounds.
#[tokio::test]
async fn fw_prime_41() {
    let values: Vec<FixedWidth> = (0..41).map(FixedWidth::new).collect();
    let path = tempfile::NamedTempFile::new().unwrap().into_temp_path();
    let mut store: Store<u64, FixedWidth> = Store::new(path).await.unwrap();
    let pairs = values.iter().cloned().enumerate().map(|(i, v)| (i as u64, v));
    store.write_infallible(stream::iter(pairs), 0).await.unwrap();
    let mut rng = rand::thread_rng();
    let mut indices: Vec<u64> = (0..values.len() as u64).collect();
    for _ in 0..15 {
        indices.shuffle(&mut rng);
        for &i in &indices {
            let result = store.get(i).await.unwrap();
            assert_eq!(result, values[i as usize]);
        }
    }
}
// Index of 47 entries (exact size hint), random-order lookups, 15 rounds.
#[tokio::test]
async fn fw_prime_47() {
    let values: Vec<FixedWidth> = (0..47).map(FixedWidth::new).collect();
    let path = tempfile::NamedTempFile::new().unwrap().into_temp_path();
    let mut store: Store<u64, FixedWidth> = Store::new(path).await.unwrap();
    let pairs = values.iter().cloned().enumerate().map(|(i, v)| (i as u64, v));
    store.write_infallible(stream::iter(pairs), 47).await.unwrap();
    let mut rng = rand::thread_rng();
    let mut indices: Vec<u64> = (0..values.len() as u64).collect();
    for _ in 0..15 {
        indices.shuffle(&mut rng);
        for &i in &indices {
            let result = store.get(i).await.unwrap();
            assert_eq!(result, values[i as usize]);
        }
    }
}
// Index of 53 entries, random-order lookups, 15 rounds.
#[tokio::test]
async fn fw_prime_53() {
    let values: Vec<FixedWidth> = (0..53).map(FixedWidth::new).collect();
    let path = tempfile::NamedTempFile::new().unwrap().into_temp_path();
    let mut store: Store<u64, FixedWidth> = Store::new(path).await.unwrap();
    let pairs = values.iter().cloned().enumerate().map(|(i, v)| (i as u64, v));
    store.write_infallible(stream::iter(pairs), 0).await.unwrap();
    let mut rng = rand::thread_rng();
    let mut indices: Vec<u64> = (0..values.len() as u64).collect();
    for _ in 0..15 {
        indices.shuffle(&mut rng);
        for &i in &indices {
            let result = store.get(i).await.unwrap();
            assert_eq!(result, values[i as usize]);
        }
    }
}
// Index of 53 entries; the store at one path is opened, rewritten and
// verified several times in a row.
//
// The outer loop runs three times (as in `strings_reopen`). The previous
// range `1..2` executed only a single pass, so the file was never actually
// reopened and the "reopen" part of the test was not exercised.
#[tokio::test]
async fn fw_prime_53_reopen() {
    let values = (0..53).map(|i| FixedWidth::new(i)).collect::<Vec<_>>();
    let path = tempfile::NamedTempFile::new().unwrap().into_temp_path();
    for _ in 1..4 {
        let mut store: Store<u64, FixedWidth> = Store::new(&path).await.unwrap();
        store.write_infallible(stream::iter(values.clone().into_iter().enumerate().map(|(i, v)| (i as u64, v))), 53).await.unwrap();
        let mut rng = rand::thread_rng();
        let mut indices = (0..values.len() as u64).collect::<Vec<u64>>();
        for _ in 1..16 {
            indices.shuffle(&mut rng);
            for i in indices.iter() {
                let result = store.get(*i).await.unwrap();
                assert_eq!(result, values[*i as usize]);
            }
        }
    }
}
// Batched lookups: for a range of batch sizes, pick that many distinct keys
// at random and check that every one of them comes back with the right value.
#[tokio::test]
async fn get_many() {
    let mut values: Vec<FixedWidth> = (0..=255_u8).map(FixedWidth::new).collect();
    values.extend((0..=255_u8).map(FixedWidth::new2));
    let path = tempfile::NamedTempFile::new().unwrap().into_temp_path();
    let mut store: Store<u64, FixedWidth> = Store::new(path).await.unwrap();
    let pairs = values.iter().cloned().enumerate().map(|(i, v)| (i as u64, v));
    store.write_infallible(stream::iter(pairs), 512).await.unwrap();
    let mut rng = rand::thread_rng();
    let indices: Vec<u64> = (0..values.len() as u64).collect();
    let counts = [1, 2, 3, 5, 7, 8, 11, 13, 16, 17, 19, 23, 29, 31, 32, 37];
    for _ in 0..4 {
        for &count in &counts {
            let wanted: Vec<u64> = indices.choose_multiple(&mut rng, count).copied().collect();
            let results = store.get_many(&wanted).await.unwrap();
            assert_eq!(results.len(), wanted.len());
            for (key, value) in results {
                assert_eq!(value, values[key as usize]);
            }
        }
    }
}
}