use crate::{
fields::*,
object::{AEADReader, BufferedSink, Pool, Stream, Writer},
tree::CommitId,
};
use serde::{de::DeserializeOwned, Serialize};
/// Name of an index field. Stored streams are matched to their load intents by
/// comparing this name against the intent's `name`.
pub(crate) type Field = String;
/// One stored stream of a field, as a `(commit id, field name, stream)` tuple.
pub(crate) type TransactionPointer = (CommitId, Field, Stream);
/// A list of [`TransactionPointer`]s, possibly spanning several fields.
pub(crate) type TransactionList = Vec<TransactionPointer>;
/// Marker trait that bundles `Send + Sync + std::io::Write` under one name.
///
/// It declares no methods of its own; everything callable on a `Transaction`
/// comes from its supertraits.
pub trait Transaction: Send + Sync + std::io::Write {}

/// Any type that is `Send + Sync + std::io::Write` is automatically a `Transaction`.
impl<T: Send + Sync + std::io::Write> Transaction for T {}
/// Source of the store and load intents that the crate-internal `IndexExt`
/// helpers operate on.
///
/// Each `Intent` carries a `name` (used to match stored streams to a field) and
/// a boxed `strategy` that performs the actual work.
pub trait Index: Send + Sync {
/// Returns the intents whose `Store` strategies should be run when committing.
///
/// NOTE(review): presumably one intent per indexed field — confirm against implementors.
///
/// # Errors
/// Implementation-defined; the error is propagated unchanged by `IndexExt::commit`.
fn store_all(&self) -> anyhow::Result<Vec<Intent<Box<dyn Store>>>>;
/// Returns the intents whose `Load` strategies should be run when loading.
///
/// NOTE(review): presumably one intent per indexed field — confirm against implementors.
///
/// # Errors
/// Implementation-defined; the error is propagated unchanged by `IndexExt::load_all_from`.
fn load_all(&self) -> anyhow::Result<Vec<Intent<Box<dyn Load>>>>;
}
/// Sink that accepts a sequence of serializable values, one call at a time.
pub trait FieldWriter: Send {
/// Serializes `obj` and writes it to this sink.
///
/// The method has no return value, so failures cannot be reported to the
/// caller. The blanket implementation for `std::io::Write + Send` types in
/// this file unwraps the serialization result and therefore panics on error.
fn write_next(&mut self, obj: impl Serialize + Send);
}
/// Blanket [`FieldWriter`] implementation for every `std::io::Write + Send` type.
impl<T: std::io::Write + Send> FieldWriter for T {
    /// Serializes `obj` into `self` via `crate::serialize_to_writer`.
    ///
    /// # Panics
    /// Panics if `crate::serialize_to_writer` returns an error.
    fn write_next(&mut self, obj: impl Serialize + Send) {
        let outcome = crate::serialize_to_writer(self, &obj);
        outcome.unwrap();
    }
}
/// Source from which a sequence of deserializable values can be read, one call at a time.
pub trait FieldReader: Send {
/// Reads and deserializes the next value as a `T`.
///
/// # Errors
/// Returns an error when the next value cannot be deserialized into `T`.
fn read_next<T: DeserializeOwned>(&mut self) -> anyhow::Result<T>;
}
impl<'a, R> FieldReader for crate::Deserializer<R>
where
R: rmp_serde::decode::ReadSlice<'a> + Send,
{
fn read_next<T: DeserializeOwned>(&mut self) -> anyhow::Result<T> {
Ok(T::deserialize(self)?)
}
}
/// Every [`Index`] implementor automatically gets the [`IndexExt`] helpers.
impl<T: Index> IndexExt for T {}

/// Crate-internal helpers that run the intents exposed by an [`Index`].
pub(crate) trait IndexExt: Index {
    /// Runs every load intent of this index against its matching transactions.
    ///
    /// For each intent returned by `load_all`, the entries of
    /// `full_transaction_list` whose field name equals the intent's `name` are
    /// cloned into a fresh list and handed to [`IndexExt::load`].
    ///
    /// # Errors
    /// Propagates the error returned by `load_all`.
    fn load_all_from(
        &mut self,
        full_transaction_list: &TransactionList,
        pool: &Pool<AEADReader>,
    ) -> anyhow::Result<()> {
        let mut intents = self.load_all()?;
        for intent in &mut intents {
            let matching: Vec<_> = full_transaction_list
                .iter()
                .filter(|(_, field_name, _)| field_name == &intent.name)
                .cloned()
                .collect();
            self.load(matching, pool, intent);
        }
        Ok(())
    }

    /// Runs every store intent of this index and derives a commit id.
    ///
    /// Each intent from `store_all` is passed to [`IndexExt::store`], and the
    /// resulting `(field name, stream)` pairs form the log. The serialized log
    /// is appended to `hashed_data`, the combined bytes are hashed with
    /// `crypto`, and the hash becomes the returned `CommitId`.
    ///
    /// # Errors
    /// Propagates errors from `store_all` and from serializing the log.
    ///
    /// # Panics
    /// Panics whenever [`IndexExt::store`] panics.
    fn commit<W: Writer + Send + Sync>(
        &self,
        sink: &mut BufferedSink<W>,
        object: &mut dyn Writer,
        mut hashed_data: Vec<u8>,
        crypto: impl crate::crypto::ICryptoOps,
    ) -> anyhow::Result<(CommitId, Vec<(Field, Stream)>)> {
        let mut log = Vec::new();
        for mut intent in self.store_all()? {
            // Take the name first: `store` needs the intent mutably afterwards.
            let field_name = intent.name.clone();
            let stream = self.store(sink, object, &mut intent);
            log.push((field_name, stream));
        }

        let serialized_log = crate::serialize_to_vec(&log)?;
        hashed_data.extend(serialized_log);

        let commit_id = CommitId::from_bytes(crypto.hash(&hashed_data));
        Ok((commit_id, log))
    }

    /// Runs the store strategy of `field`, then clears `index` and returns the
    /// stream that `clear` produced.
    ///
    /// # Panics
    /// Panics if `index.clear()` returns an error.
    fn store<W: Writer + Send + Sync>(
        &self,
        index: &mut BufferedSink<W>,
        object: &mut dyn Writer,
        field: &mut Intent<Box<dyn Store>>,
    ) -> Stream {
        field.strategy.store(index, object);
        let cleared = index.clear();
        cleared.unwrap()
    }

    /// Runs the load strategy of `field` with a clone of `pool` and the given
    /// transaction list.
    fn load(
        &self,
        commits_for_field: TransactionList,
        pool: &Pool<AEADReader>,
        field: &mut Intent<Box<dyn Load>>,
    ) {
        let readers = pool.clone();
        field.strategy.load(readers, commits_for_field);
    }

    /// Runs the query strategy of `field` with a clone of `pool`, the given
    /// transaction list and the predicate `pred`, which maps each key to a
    /// `QueryAction`.
    fn select<K>(
        &self,
        commits_for_field: TransactionList,
        pool: &Pool<AEADReader>,
        mut field: Intent<Box<impl Query<Key = K>>>,
        pred: impl Fn(&K) -> QueryAction,
    ) {
        let readers = pool.clone();
        field.strategy.select(readers, commits_for_field, pred);
    }
}
#[cfg(test)]
pub(crate) mod test {
    use crate::{crypto::{Digest, Scheme}, fields::Strategy, index::*, ChunkPointer};

    /// Generates a `#[test]` named `strategy_<snake-cased $strat>`.
    ///
    /// The generated test default-constructs two `$t` values, applies `$prep`
    /// to the first, round-trips it into the second with `store_then_load`
    /// using `$strat::for_field`, and asserts that `$len` yields the same value
    /// for both.
    #[macro_export]
    macro_rules! len_check_test {
        ( $t:ty, $strat:ty, $prep:expr, $len:expr ) => {
            paste::paste! {
                #[test]
                fn [<strategy_ $strat:snake>]() {
                    let store = $t::default();
                    let load = $t::default();
                    ($prep)(&store);
                    store_then_load($strat::for_field(&store), $strat::for_field(&load));
                    assert_eq!(($len)(load), ($len)(store));
                }
            }
        };
    }

    /// Stores `store` into an in-memory backend, then loads the written stream
    /// back through `load`.
    ///
    /// # Panics
    /// Panics if creating the credentials, deriving a chunk key, or finishing
    /// the sink fails.
    pub(crate) fn store_then_load<T: Send + Sync, S: Strategy<T> + Store + Load>(
        mut store: S,
        mut load: S,
    ) {
        use crate::{backends, crypto, object::AEADWriter};
        use std::sync::Arc;

        let key_source = crypto::UsernamePassword::with_credentials(
            "username".to_owned(),
            "password".to_owned(),
        )
        .unwrap();
        let backend = Arc::new(backends::test::InMemoryBackend::default());

        // Factories producing fresh writers/readers that share the same backend.
        let make_writer = {
            let backend = backend.clone();
            let chunk_key = key_source.chunk_key().unwrap();
            move || AEADWriter::new(backend.clone(), chunk_key.clone())
        };
        let make_reader = {
            let backend = backend.clone();
            let chunk_key = key_source.chunk_key().unwrap();
            move || AEADReader::new(backend.clone(), chunk_key.clone())
        };

        let stored = {
            let mut sink = BufferedSink::new(make_writer());
            // The object writer stays a temporary so it is dropped right after the store call.
            Store::store(&mut store, &mut sink, &mut make_writer());
            sink.finish().unwrap()
        };

        let readers = Pool::with_constructor(0, make_reader);
        let transactions = vec![(CommitId::default(), "field name".into(), stored)];
        Load::load(&mut load, readers, transactions);
    }

    /// A map with one entry keeps its length across a `LocalField` store/load round trip.
    #[test]
    fn can_deserialize_fields() {
        type ChunkMap = Map<Digest, ChunkPointer>;

        let source = ChunkMap::default();
        let restored = ChunkMap::default();
        source.insert(Digest::default(), ChunkPointer::default());

        let storing = LocalField::for_field(&source);
        let loading = LocalField::for_field(&restored);
        store_then_load(storing, loading);

        assert_eq!(restored.len(), source.len());
    }
}