#![feature(const_fn)]
#![allow(unused_parens)]
use std::cmp::Ordering;
use std::collections::HashMap;
use std::io::SeekFrom;
use std::mem::size_of;
use std::path::Path;
use std::path::PathBuf;
use async_bincode::AsyncBincodeReader;
use bincode::deserialize;
use bincode::Options;
use futures::stream::Stream;
use futures::stream::StreamExt;
use futures::stream::TryStreamExt;
use serde::de::DeserializeOwned;
use serde::Deserialize;
use serde::Serialize;
use tokio::fs::File;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
use tokio::io::BufWriter;
mod error;
pub use error::Error;
/// One record of the on-disk index: a key and the byte offset of its
/// length-prefixed value in the data section.
///
/// Field order is significant: it fixes the serialized layout.
#[derive(Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct IndexEntry {
    key: u64,
    position: u64,
}

impl IndexEntry {
    fn new(key: u64, position: u64) -> Self {
        Self { key, position }
    }
}

impl Ord for IndexEntry {
    /// Orders primarily by `key`, breaking ties by `position`.
    ///
    /// Comparing both fields keeps `Ord` consistent with the derived
    /// `PartialEq`/`Eq` (`a.cmp(&b) == Ordering::Equal` exactly when `a == b`),
    /// as the `Ord` contract requires. A slice sorted this way is still sorted
    /// by key.
    fn cmp(&self, other: &Self) -> Ordering {
        self.key
            .cmp(&other.key)
            .then_with(|| self.position.cmp(&other.position))
    }
}

impl PartialOrd for IndexEntry {
    /// Delegates to [`Ord::cmp`] so the two orderings can never disagree.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
/// Fixed-size record stored at offset 0 of the store file.
///
/// Field order is significant: it fixes the serialized layout.
#[derive(Debug, Deserialize, Serialize)]
pub struct Header {
    /// Number of key/value pairs held in the file.
    pair_count: u64,
    /// Byte offset at which the key-sorted index begins.
    index_start_pos: u64,
}
/// Marker for value types the store can persist: anything serde can both
/// serialize and deserialize into an owned value.
pub trait Storable: Serialize + DeserializeOwned {}

/// Every serde round-trippable type is automatically `Storable`.
impl<V: Serialize + DeserializeOwned> Storable for V {}
/// A key/value store backed by a single file.
///
/// The file holds a header, then length-prefixed values, then an index of
/// `(key, position)` entries sorted by key; `write` rewrites all of it.
pub struct Store<T: Storable> {
    /// Location of the backing file; `stream_values` reopens it by this path.
    path: PathBuf,
    /// Handle used for seeks, reads and the header write.
    file: File,
    /// In-memory copy of the file header.
    header: Header,
    /// Cache of key -> value offset; cleared whenever the store is rewritten.
    index_cache: HashMap<u64, u64>,
    /// Ties the value type `T` to the store; never holds a value.
    _nothing: Option<T>,
}
impl<T: Storable> Store<T> {
pub async fn new(path: impl AsRef<Path>) -> Result<Self, Error> {
let fd = tokio::fs::OpenOptions::new()
.read(true)
.write(true)
.create(true)
.append(false)
.open(path.as_ref())
.await?;
Ok(Self{
path: path.as_ref().to_path_buf(),
file: fd,
header: Header{
pair_count: 0,
index_start_pos: Self::header_size()
},
index_cache: HashMap::new(),
_nothing: None
})
}
const fn header_size() -> u64 {
size_of::<Header>() as u64
}
const fn index_entry_size() -> u64 {
size_of::<IndexEntry>() as u64
}
fn index_start_position(&self) -> u64 {
self.header.index_start_pos
}
fn data_start_position(&self) -> u64 {
Self::header_size()
}
async fn get_header(&mut self) -> Result<Header, Error> {
let mut buf = [0u8; Self::header_size() as usize];
self.file.seek(SeekFrom::Start(0)).await?;
self.file.read(&mut buf).await?;
Ok(deserialize(&buf)?)
}
async fn read_index_entry(&mut self) -> Result<IndexEntry, Error> {
let mut buf = [0u8; Self::index_entry_size() as usize];
self.file.read(&mut buf).await?;
Ok(deserialize(&buf)?)
}
async fn get_index_entry(&mut self, i: u64) -> Result<IndexEntry, Error> {
self.file.seek(SeekFrom::Start(self.index_start_position() + (i * Self::index_entry_size()))).await?;
let entry = self.read_index_entry().await?;
Ok(entry)
}
async fn find_index_entry(&mut self, key: u64) -> Result<Option<IndexEntry>, Error> {
let mut size = self.header.pair_count / 2;
let mut index = self.header.pair_count / 2;
let mut entry;
let mut tried_last = false;
while(size > 0) {
if(size % 2 == 1) {
size += 1;
}
size /= 2;
if(size == 0 && !tried_last) {
size += 1;
tried_last = true;
}
entry = self.get_index_entry(index).await?;
if(key == entry.key) {
return Ok(Some(entry));
} else if(key < entry.key) {
index -= size;
} else {
index += size;
}
}
Ok(None)
}
async fn read_value(&mut self) -> Result::<T, Error> {
let reader = AsyncBincodeReader::from(self.file.try_clone().await?);
match reader.take(1).err_into().next().await {
Some(v) => v,
None => Err(Error::MissingValue)
}
}
pub async fn stream_values(&self) -> Result<impl Stream<Item=Result<T, Error>>, Error> {
let mut file = tokio::fs::OpenOptions::new()
.read(true)
.write(false)
.create(false)
.append(false)
.open(&self.path)
.await?;
file.seek(SeekFrom::Start(self.data_start_position())).await?;
let stream = AsyncBincodeReader::from(file)
.take(self.header.pair_count as usize)
.map(|r| match r {
Ok(v) => Ok(v),
Err(e) => Err(e.into())
});
Ok(stream)
}
pub async fn get(&mut self, key: u64) -> Result<T, Error> {
let position = match self.index_cache.get(&key) {
Some(v) => *v,
None => {
self.header = self.get_header().await?;
let index = match self.find_index_entry(key).await? {
Some(v) => v,
None => return Err(Error::NotFound(key))
};
index.position
}
};
self.file.seek(SeekFrom::Start(position)).await?;
self.read_value().await
}
pub async fn write(&mut self, mut pairs: impl Stream<Item=(u64, T)> + Unpin) -> Result<(), Error> {
self.index_cache.clear();
let mut buf = vec![0u8; 1024];
let size_config = bincode::DefaultOptions::new()
.with_fixint_encoding()
.with_big_endian();
let value_config = bincode::DefaultOptions::new()
.with_fixint_encoding()
.with_native_endian();
let mut writer = BufWriter::new(self.file.try_clone().await?);
let mut pos = self.data_start_position();
let mut index = Vec::new();
let mut size = 0u32;
let size_size = size_config.serialized_size(&size)? as u32;
self.file.seek(SeekFrom::Start(pos)).await?;
while let Some((key, value)) = pairs.next().await {
size = value_config.serialized_size(&value)? as u32;
size_config.serialize_into(&mut buf[..], &size)?;
if(size as usize > buf.len()) {
buf.resize((size + size_size) as usize, 0);
}
value_config.serialize_into(&mut buf[size_size as usize..], &value)?;
index.push(IndexEntry::new(key, pos as u64));
pos += writer.write(&buf[0..(size + size_size) as usize]).await? as u64;
}
self.header.index_start_pos = pos;
self.header.pair_count = index.len() as u64;
index.sort_unstable();
for entry in index.into_iter() {
value_config.serialize_into(&mut buf[..], &entry)?;
writer.write(&buf[0..Self::index_entry_size() as usize]).await?;
}
value_config.serialize_into(&mut buf[..], &self.header)?;
writer.flush().await?;
self.file.seek(SeekFrom::Start(0)).await?;
self.file.write(&buf[0..Self::header_size() as usize]).await?;
self.file.flush().await?;
Ok(())
}
}
#[cfg(test)]
mod tests {
    use futures::stream;
    use rand::prelude::SliceRandom;
    use crate::*;

    /// Number of shuffled lookup passes each random-access test makes.
    const ROUNDS: usize = 15;

    #[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
    struct FixedWidth {
        a: u8,
        b: i16,
        c: u32,
        d: u64,
    }

    impl FixedWidth {
        fn new(i: u8) -> Self {
            Self { a: i, b: i as i16 * 2, c: i as u32 * 4, d: i as u64 * 8 }
        }

        fn new2(i: u8) -> Self {
            Self { a: 255 - i, b: -(i as i16) * 2, c: i as u32 * 11, d: i as u64 * 17 }
        }
    }

    /// Pairs every value with its position in `values`, which serves as its key.
    fn keyed<V: Unpin>(values: Vec<V>) -> impl stream::Stream<Item = (u64, V)> + Unpin {
        stream::iter(values.into_iter().enumerate().map(|(i, v)| (i as u64, v)))
    }

    /// Writes `values` keyed by position into a fresh temp-file store, then reads
    /// every key back in a random order for `ROUNDS` passes, checking each value.
    async fn assert_random_gets<V>(values: Vec<V>)
    where
        V: Storable + Clone + std::fmt::Debug + PartialEq + Unpin,
    {
        // Borrow the path so the temp file lives (and is cleaned up) with the test.
        let path = tempfile::NamedTempFile::new().unwrap().into_temp_path();
        let mut store: Store<V> = Store::new(&path).await.unwrap();
        store.write(keyed(values.clone())).await.unwrap();
        let mut rng = rand::thread_rng();
        let mut keys: Vec<u64> = (0..values.len() as u64).collect();
        for _ in 0..ROUNDS {
            keys.shuffle(&mut rng);
            for &key in &keys {
                assert_eq!(store.get(key).await.unwrap(), values[key as usize]);
            }
        }
    }

    /// Both `FixedWidth` constructors over the full `u8` test range.
    fn fixed_width_values() -> Vec<FixedWidth> {
        let mut values: Vec<FixedWidth> = (0..255_u8).map(FixedWidth::new).collect();
        values.extend((0..255_u8).map(FixedWidth::new2));
        values
    }

    #[tokio::test]
    async fn fixedwidth() {
        assert_random_gets(fixed_width_values()).await;
    }

    #[tokio::test]
    async fn strings() {
        #[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
        struct Struct {
            number: u8,
            name: String,
            maybe_name: Option<String>,
        }

        impl Struct {
            fn new(i: u8) -> Self {
                Self {
                    number: i,
                    name: english_numbers::convert_all_fmt(i as i64),
                    maybe_name: match i % 2 {
                        0 => Some(english_numbers::convert_long(i as i64, english_numbers::Formatting::none())),
                        _ => None,
                    },
                }
            }
        }

        assert_random_gets((0..255_u8).map(Struct::new).collect()).await;
    }

    #[tokio::test]
    async fn stream() {
        let values = fixed_width_values();
        // `stream_values` reopens the file by name, so the path must outlive it.
        let path = tempfile::NamedTempFile::new().unwrap().into_temp_path();
        let mut store: Store<FixedWidth> = Store::new(&path).await.unwrap();
        store.write(keyed(values.clone())).await.unwrap();
        {
            // Scoped so the stream is dropped before `store` is borrowed mutably.
            let mut streamed = store.stream_values().await.unwrap().enumerate();
            let mut count = 0;
            while let Some((i, result)) = streamed.next().await {
                assert_eq!(result.unwrap(), values[i]);
                count += 1;
            }
            // Guards against a stream that silently yields too few items.
            assert_eq!(count, values.len());
        }
        assert_eq!(store.get(0).await.unwrap(), values[0]);
        assert_eq!(store.get(1).await.unwrap(), values[1]);
    }

    #[tokio::test]
    async fn fw_prime_5() {
        assert_random_gets((0..5).map(FixedWidth::new).collect()).await;
    }

    #[tokio::test]
    async fn fw_prime_13() {
        assert_random_gets((0..13).map(FixedWidth::new).collect()).await;
    }

    #[tokio::test]
    async fn fw_prime_41() {
        assert_random_gets((0..41).map(FixedWidth::new).collect()).await;
    }

    #[tokio::test]
    async fn fw_prime_47() {
        assert_random_gets((0..47).map(FixedWidth::new).collect()).await;
    }

    #[tokio::test]
    async fn fw_prime_53() {
        assert_random_gets((0..53).map(FixedWidth::new).collect()).await;
    }
}