#![deny(missing_docs)]
use crate::{
crypto::ICryptoOps,
fields::{depth::Depth, Collection, Intent, KeyCachingIterator, Load, Query, QueryAction},
index::{self, Index, IndexExt, TransactionList},
object::{AEADReader, AEADWriter, BlockBuffer, BufferedSink, Pool, PoolRef},
Backend, Key,
};
use anyhow::{Context, Result};
use parking_lot::RwLock;
use serde::{de::DeserializeOwned, Serialize};
use std::{collections::HashMap, ops::Deref, sync::Arc, time::SystemTime};
mod commit;
pub use commit::*;
mod root;
pub(crate) use root::*;
mod sealed_root;
/// Selects whether a commit is recorded when the index produced no changes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CommitMode {
    /// Record a new commit even if every field's changeset is empty.
    Always,
    /// Skip recording a new commit when every field's changeset is empty;
    /// the most recent existing commit is returned instead.
    OnlyOnChange,
}
/// A versioned index persisted through a [`Backend`].
///
/// `I` is the index type, kept behind a read/write lock. `CustomData` is the
/// user-defined payload stored in the metadata of every commit; it defaults
/// to `()`.
pub struct Infinitree<I, CustomData = ()>
where
CustomData: Serialize + DeserializeOwned + Send + Sync + 'static,
{
/// Root index; holds the key, the commit list and the transaction log.
root: RootIndex<CustomData>,
/// The user-facing index, guarded by a read/write lock.
index: RwLock<I>,
/// Backend that objects are read from and written to.
backend: Arc<dyn Backend>,
/// Selects which commits are considered when loading or querying.
commit_filter: CommitFilter,
/// Pool of readers constructed with the chunk key derived from the root key.
reader_pool: Pool<AEADReader>,
}
impl<I, CustomData> Drop for Infinitree<I, CustomData>
where
    CustomData: Serialize + DeserializeOwned + Send + Sync + 'static,
{
    /// Calls `sync()` on the backend when the tree is dropped.
    ///
    /// # Panics
    ///
    /// Panics if the backend reports a sync error, unless the current thread
    /// is already unwinding from another panic. In that case the error is
    /// discarded, because a second panic raised from `drop` would abort the
    /// whole process.
    fn drop(&mut self) {
        let synced = self.backend.sync();
        if !std::thread::panicking() {
            synced.unwrap();
        }
    }
}
impl<I: Index + Default, CustomData> Infinitree<I, CustomData>
where
CustomData: Serialize + DeserializeOwned + Send + Sync,
{
pub fn empty(
backend: Arc<dyn Backend>,
key: impl Into<Key>,
) -> Result<Infinitree<I, CustomData>> {
Self::with_key(backend, I::default(), key)
}
pub fn open(
backend: Arc<dyn Backend>,
key: impl Into<Key>,
) -> Result<Infinitree<I, CustomData>> {
let key = key.into();
let root = sealed_root::open(BlockBuffer::default(), backend.clone(), key)?;
let chunk_key = root.key.chunk_key()?;
let reader_pool = {
let backend = backend.clone();
Pool::with_constructor(0, move || {
AEADReader::new(backend.clone(), chunk_key.clone())
})
};
Ok(Infinitree {
root,
reader_pool,
backend: backend.clone(),
index: I::default().into(),
commit_filter: CommitFilter::default(),
})
}
}
impl<I: Index, CustomData> Infinitree<I, CustomData>
where
    CustomData: Serialize + DeserializeOwned + Send + Sync + Default,
{
    /// Commits pending index changes with `message`, using default values for
    /// the remaining metadata.
    ///
    /// The commit is made with [`CommitMode::OnlyOnChange`]: when no field
    /// produced changes, no new commit is recorded and the latest existing
    /// commit (if any) is returned.
    ///
    /// # Errors
    ///
    /// Propagates any error from [`Infinitree::commit_with_metadata`].
    pub fn commit(
        &mut self,
        message: impl Into<Message>,
    ) -> Result<Option<Arc<Commit<CustomData>>>> {
        let time = SystemTime::now();
        let message: Message = message.into();
        // The new commit points back at the current tip of the commit list.
        let previous = self.root.commit_list.read().last().map(|c| c.id);

        self.commit_with_metadata(
            CommitMetadata {
                time,
                message: message.into(),
                previous,
                ..Default::default()
            },
            CommitMode::OnlyOnChange,
        )
    }
}
impl<I: Index, CustomData> Infinitree<I, CustomData>
where
CustomData: Serialize + DeserializeOwned + Send + Sync,
{
pub fn with_key(
backend: Arc<dyn Backend>,
index: I,
key: impl Into<Key>,
) -> Result<Infinitree<I, CustomData>> {
let key = key.into();
let chunk_key = key.chunk_key()?;
Ok(Infinitree {
backend: backend.clone(),
index: index.into(),
root: RootIndex::uninitialized(key),
commit_filter: CommitFilter::default(),
reader_pool: Pool::with_constructor(0, move || {
AEADReader::new(backend.clone(), chunk_key.clone())
}),
})
}
/// Commits the root index to the backend again via `sealed_root::commit`,
/// without creating a new commit.
///
/// # Errors
///
/// Returns an error if committing the sealed root fails.
pub fn reseal(&mut self) -> Result<()> {
    let backend = self.backend.clone();
    sealed_root::commit(&mut self.root, backend)?;
    Ok(())
}
/// Returns a read guard over the list of commits known to this tree.
///
/// The list stays read-locked for as long as the returned guard is alive.
pub fn commit_list(&self) -> impl Deref<Target = CommitList<CustomData>> + '_ {
    let commits = &self.root.commit_list;
    commits.read()
}
pub fn filter_commits(&mut self, version: CommitFilter) {
self.commit_filter = version;
}
/// Commits pending index changes with `message` and caller-supplied
/// `custom_data`, using the given commit `mode`.
///
/// The commit time is the current system time, and the previous commit is
/// the last entry of the commit list.
///
/// # Errors
///
/// Propagates any error from [`Infinitree::commit_with_metadata`].
pub fn commit_with_custom_data(
    &mut self,
    message: impl Into<Message>,
    mode: CommitMode,
    custom_data: CustomData,
) -> Result<Option<Arc<Commit<CustomData>>>> {
    let time = SystemTime::now();
    let message: Message = message.into();
    let previous = self.root.commit_list.read().last().map(|c| c.id);

    let metadata = CommitMetadata {
        time,
        message: message.into(),
        previous,
        custom_data,
    };
    self.commit_with_metadata(metadata, mode)
}
/// Commits pending index changes using fully caller-provided `metadata`.
///
/// The index is always asked to commit first. With
/// [`CommitMode::OnlyOnChange`], if every entry of the resulting changeset is
/// empty, nothing is recorded and the latest existing commit is returned.
/// Otherwise the new transactions are placed at the front of the transaction
/// log, the commit is appended to the commit list, and the root is committed
/// to the backend.
///
/// # Errors
///
/// Returns an error if key derivation, metadata serialization, the index
/// commit, or committing the sealed root fails.
pub fn commit_with_metadata(
    &mut self,
    metadata: CommitMetadata<CustomData>,
    mode: CommitMode,
) -> Result<Option<Arc<Commit<CustomData>>>> {
    let mut object = self.chunk_writer()?;
    let mut sink = BufferedSink::new(self.chunk_writer()?);

    let (id, changeset) = self.index.write().commit(
        &mut sink,
        &mut object,
        crate::serialize_to_vec(&metadata)?,
        self.root.key.chunk_key()?,
    )?;

    let nothing_to_record = matches!(mode, CommitMode::OnlyOnChange)
        && changeset.iter().all(|(_, stream)| stream.is_empty());
    if nothing_to_record {
        return Ok(self.last_commit());
    }

    {
        let mut log = self.root.transaction_log.write();

        // New transactions go first, followed by the existing history.
        let mut updated = Vec::with_capacity(log.len() + changeset.len());
        updated.extend(changeset.into_iter().map(|(field, oid)| (id, field, oid)));
        updated.append(&mut *log);
        *log = updated;

        let commit = Commit { id, metadata };
        self.root.commit_list.write().push(commit.into());
    }

    sealed_root::commit(&mut self.root, self.backend.clone())?;
    Ok(self.last_commit())
}
/// Builds a new writer over the backend using the chunk key derived from
/// the root key.
fn chunk_writer(&self) -> Result<AEADWriter> {
    let chunk_key = self.root.key.chunk_key()?;
    let writer = AEADWriter::new(self.backend.clone(), chunk_key);
    Ok(writer)
}
/// Leases a reader from the internal reader pool.
fn chunk_reader(&self) -> Result<PoolRef<AEADReader>> {
    let reader = self.reader_pool.lease()?;
    Ok(reader)
}
/// Builds a storage writer over the backend using the storage key derived
/// from the root key.
///
/// # Errors
///
/// Returns an error if the storage key cannot be derived.
pub fn storage_writer(&self) -> Result<AEADWriter> {
    let storage_key = self.root.key.storage_key()?;
    let writer = AEADWriter::for_storage(self.backend.clone(), storage_key);
    Ok(writer)
}
/// Builds a storage reader over the backend using the storage key derived
/// from the root key.
///
/// The reader is wrapped in a [`PoolRef`] that is not attached to any pool.
///
/// # Errors
///
/// Returns an error if the storage key cannot be derived.
pub fn storage_reader(&self) -> Result<PoolRef<AEADReader>> {
    let storage_key = self.root.key.storage_key()?;
    let reader = AEADReader::for_storage(self.backend.clone(), storage_key);
    Ok(PoolRef::without_pool(reader))
}
/// Returns the hasher provided by the chunk key derived from the root key.
///
/// # Errors
///
/// Returns an error if the chunk key cannot be derived.
pub fn hasher(&self) -> Result<crate::Hasher> {
    let chunk_key = self.root.key.chunk_key()?;
    Ok(chunk_key.hasher())
}
}
impl<I: Index, CustomData> Infinitree<I, CustomData>
where
CustomData: Serialize + DeserializeOwned + Send + Sync,
{
/// Returns the last entry of the commit list, or `None` if it is empty.
fn last_commit(&self) -> Option<Arc<Commit<CustomData>>> {
    let commits = self.root.commit_list.read();
    commits.last().map(Arc::clone)
}
/// Loads every index field from the transactions selected by the current
/// commit filter.
///
/// The index is write-locked for the duration of the call.
///
/// # Errors
///
/// Propagates any error reported by the index while loading.
pub fn load_all(&self) -> Result<()> {
    let mut index = self.index.write();
    let transactions = self.filter_generations();
    index.load_all_from(&transactions, &self.reader_pool)
}
/// Loads a single field through its load strategy.
///
/// Only transactions that belong to the field and pass the current commit
/// filter are handed to the strategy.
pub fn load(&self, field: impl Into<Intent<Box<dyn Load>>>) -> Result<()> {
    let mut intent: Intent<Box<dyn Load>> = field.into();
    let transactions = self.field_for_version(&intent.name);
    let pool = self.reader_pool.clone();

    intent.strategy.load(pool, transactions);
    Ok(())
}
/// Runs the field's query strategy, passing it `pred` to decide what to do
/// with each key.
///
/// Only transactions that belong to the field and pass the current commit
/// filter are handed to the strategy.
pub fn query<K>(
    &self,
    mut field: Intent<Box<impl Query<Key = K>>>,
    pred: impl Fn(&K) -> QueryAction,
) -> Result<()> {
    let transactions = self.field_for_version(&field.name);
    let pool = self.reader_pool.clone();

    field.strategy.select(pool, transactions, pred);
    Ok(())
}
/// Returns an iterator over a collection field, with `pred` passed to the
/// iterator to decide what to do with each key.
///
/// Only transactions that belong to the field and pass the current commit
/// filter are considered.
///
/// # Errors
///
/// Returns an error if a reader cannot be leased from the pool, or with the
/// context `"no commits"` if the iterator cannot be constructed.
pub fn iter<'a, K, O, Q>(
    &'a self,
    mut field: Intent<Box<Q>>,
    pred: impl Fn(&K) -> QueryAction + Send + Sync + 'static,
) -> Result<impl Iterator<Item = O> + Send + Sync + '_>
where
    for<'de> Q::Serialized: serde::Deserialize<'de>,
    Q: Collection<Key = K, Item = O> + Send + Sync + 'static,
    K: Eq + std::hash::Hash + Clone + Send + Sync + 'a,
{
    let field_commits = self.field_for_version(&field.name);
    let pool = self.reader_pool.clone();
    let transactions = <Q as Collection>::Depth::resolve(pool, field_commits);
    let reader = self.chunk_reader()?;

    KeyCachingIterator::new(transactions, reader, pred, field.strategy.as_mut())
        .context("no commits")
}
fn filter_generations(&self) -> TransactionList {
let Some(allowed) = self.apply_commit_filter() else {
return Default::default();
};
self.root
.transaction_log
.read()
.iter()
.filter(|(cid, _, _)| allowed.contains(cid))
.cloned()
.collect()
}
/// Resolves the current commit filter into a list of commit ids.
///
/// * `Single(id)` yields just that commit, or `None` if it is not in the
///   commit list.
/// * `All` starts from the last commit in the list, `UpTo(id)` from `id`,
///   and `Range(start, end)` from `end`. From there the `previous` links
///   are followed until a commit has no known predecessor. For `Range`,
///   the walk also stops after `start` has been added.
///
/// The resulting ids are ordered from the starting commit backwards. If
/// the starting commit cannot be found, the list is empty.
fn apply_commit_filter(&self) -> Option<Vec<CommitId>> {
    let clist = self.commit_list();
    // Index the commits by id so `previous` links can be followed directly.
    let commits = clist
        .iter()
        .cloned()
        .map(|c| (c.id, c))
        .collect::<HashMap<_, _>>();

    let mut list = vec![];
    let mut next = match &self.commit_filter {
        CommitFilter::Single(id) => return Some(vec![commits.get(id)?.id]),
        CommitFilter::All => clist.last(),
        CommitFilter::UpTo(id) => commits.get(id),
        CommitFilter::Range(_start, end) => commits.get(end),
    };

    while let Some(current) = next {
        list.push(current.id);

        next = match &self.commit_filter {
            // The range is inclusive: `start` was pushed just above.
            CommitFilter::Range(start, _end) if &current.id == start => return Some(list),
            // `Single` returned before the loop was entered.
            CommitFilter::Single(_) => unreachable!(),
            _ => current
                .metadata
                .previous
                .as_ref()
                .and_then(|id| commits.get(id)),
        };
    }

    Some(list)
}
/// Returns the filtered transactions that belong to `field`, preserving
/// their order.
fn field_for_version(&self, field: &index::Field) -> TransactionList {
    let mut transactions = self.filter_generations();
    transactions.retain(|(_, name, _)| name == field);
    transactions
}
pub fn index(&self) -> impl Deref<Target = I> + '_ {
self.index.read()
}
/// Returns a new shared handle to the backend used by this tree.
pub fn backend(&self) -> Arc<dyn Backend> {
    Arc::clone(&self.backend)
}
/// Returns the number of objects reported by the root index.
pub fn index_object_count(&self) -> usize {
    let objects = self.root.objects();
    objects.len()
}
}
impl<I, CustomData> Infinitree<I, CustomData>
where
    CustomData: Serialize + DeserializeOwned + Send + Sync,
    I: Index + Clone + crate::fields::Store,
{
    /// Builds an intent named `"root"` whose strategy is a `LocalField` over
    /// a clone of the current index.
    ///
    /// The index is read-locked only while it is being cloned.
    pub fn root_intent(&self) -> Intent<Box<crate::fields::strategy::LocalField<I>>> {
        use crate::fields::strategy::{LocalField, Strategy};

        let snapshot: I = self.index.read().deref().clone();
        let strategy = LocalField::for_field(&snapshot);
        Intent::new("root", Box::new(strategy))
    }
}
#[cfg(test)]
mod tests {
use super::Infinitree;
use crate::{
backends::test::InMemoryBackend,
crypto::UsernamePassword,
fields::{QueryAction, VersionedMap},
};
use std::sync::Arc;
/// Builds the fixed credentials shared by every test in this module.
fn key() -> UsernamePassword {
    let username = String::from("username");
    let password = String::from("password");
    UsernamePassword::with_credentials(username, password).unwrap()
}
/// Creates an in-memory backend holding two commits: the first sets
/// `"a"` to `"1"`, the second updates it to `"2"`.
///
/// Each commit is made by a separate tree instance, which is dropped before
/// the next one is opened.
fn test_tree_with_multiple_commits() -> Arc<InMemoryBackend> {
    let backend = InMemoryBackend::shared();

    // First session: start from an empty tree and insert the initial value.
    let mut first =
        Infinitree::<VersionedMap<String, String>>::empty(backend.clone(), key()).unwrap();
    first.load_all().unwrap();
    first.index().insert("a".to_string(), "1".to_string());
    first.commit(None).unwrap().unwrap();
    drop(first);

    // Second session: reopen, check the stored value, then overwrite it.
    let mut second =
        Infinitree::<VersionedMap<String, String>>::open(backend.clone(), key()).unwrap();
    second.load_all().unwrap();
    assert_eq!(second.index().get("a"), Some("1".to_string().into()));
    second
        .index()
        .update_with("a".to_string(), |_| "2".to_string());
    second.commit(None).unwrap().unwrap();
    drop(second);

    backend
}
/// Loading everything with the default filter yields the latest value.
#[test]
fn versioned_commits() {
    let backend = test_tree_with_multiple_commits();
    let tree = Infinitree::<VersionedMap<String, String>>::open(backend, key()).unwrap();

    tree.load_all().unwrap();

    let expected = "2".to_string();
    assert_eq!(tree.index().get("a"), Some(expected.into()));
}
/// Iterating the root intent yields exactly one entry, holding the latest
/// value.
#[test]
fn versioned_commits_iter() {
    let backend = test_tree_with_multiple_commits();
    let tree = Infinitree::<VersionedMap<String, String>>::open(backend, key()).unwrap();

    let intent = tree.root_intent();
    let mut entries = tree.iter(intent, |_| QueryAction::Take).unwrap();

    let first = entries.next();
    assert_eq!(
        first,
        Some(("a".to_string(), Some("2".to_string().into())))
    );
    assert_eq!(entries.next(), None);
}
/// Filtering up to the first commit exposes only the first value.
#[test]
fn commit_filter_resolution_upto() {
    let backend = test_tree_with_multiple_commits();
    let mut tree = Infinitree::<VersionedMap<String, String>>::open(backend, key()).unwrap();

    let first_commit = tree.commit_list().first().unwrap().id;
    tree.filter_commits(super::CommitFilter::UpTo(first_commit));
    tree.load_all().unwrap();

    let expected = "1".to_string();
    assert_eq!(tree.index().get("a"), Some(expected.into()));
}
/// A range spanning the first to the last commit exposes the latest value.
#[test]
fn commit_filter_resolution_range() {
    let backend = test_tree_with_multiple_commits();
    let mut tree = Infinitree::<VersionedMap<String, String>>::open(backend, key()).unwrap();

    let (start, end) = {
        let commits = tree.commit_list();
        (commits.first().unwrap().id, commits.last().unwrap().id)
    };
    tree.filter_commits(super::CommitFilter::Range(start, end));
    tree.load_all().unwrap();

    let expected = "2".to_string();
    assert_eq!(tree.index().get("a"), Some(expected.into()));
}
/// Selecting only the second of three commits exposes the second value.
#[test]
fn commit_filter_resolution_single() {
    let backend = test_tree_with_multiple_commits();

    // Add a third commit on top of the two made by the fixture.
    let mut writer =
        Infinitree::<VersionedMap<String, String>>::open(backend.clone(), key()).unwrap();
    writer.load_all().unwrap();
    assert_eq!(writer.index().get("a"), Some("2".to_string().into()));
    writer
        .index()
        .update_with("a".to_string(), |_| "3".to_string());
    writer.commit(None).unwrap().unwrap();
    drop(writer);

    // Reopen and restrict the view to the second commit only.
    let mut reader = Infinitree::<VersionedMap<String, String>>::open(backend, key()).unwrap();
    let second_commit = reader.commit_list().get(1).unwrap().id;
    reader.filter_commits(super::CommitFilter::Single(second_commit));
    reader.load_all().unwrap();

    assert_eq!(reader.index().get("a"), Some("2".to_string().into()));
}
}