pub mod sync;
use std::path::PathBuf;
use std::sync::{Arc, RwLock};
use crate::layer::{IdTriple, Layer, LayerBuilder, LayerCounts, ObjectType, StringTriple};
use crate::storage::directory::{DirectoryLabelStore, DirectoryLayerStore};
use crate::storage::memory::{MemoryLabelStore, MemoryLayerStore};
use crate::storage::{CachedLayerStore, LabelStore, LayerStore, LockingHashMapLayerCache};
use std::io;
use rayon;
use rayon::prelude::*;
/// A handle to a store, pairing a label store with a layer store.
///
/// Both backends are held behind `Arc`, so cloning a `Store` only clones the
/// two reference-counted pointers.
#[derive(Clone)]
pub struct Store {
/// Backend used to create, look up and update labels.
label_store: Arc<dyn LabelStore>,
/// Backend used to create, load, roll up, export and import layers.
layer_store: Arc<dyn LayerStore>,
}
/// A layer builder tied to the [`Store`] it was created from.
///
/// The wrapped builder sits in an `RwLock<Option<..>>`; committing empties the
/// option, after which further modifications are rejected with an error.
pub struct StoreLayerBuilder {
/// Parent layer as reported by the wrapped builder at construction time.
parent: Option<Arc<dyn Layer>>,
/// The wrapped builder; `None` once it has been committed.
builder: RwLock<Option<Box<dyn LayerBuilder>>>,
/// Layer name, copied from the wrapped builder at construction time.
name: [u32; 5],
/// Store used to load the layer back after a commit.
store: Store,
}
impl StoreLayerBuilder {
    /// Creates a builder for a new base layer in `store`.
    ///
    /// # Errors
    /// Returns whatever error the layer store reports while creating the
    /// base layer.
    async fn new(store: Store) -> io::Result<Self> {
        let builder = store.layer_store.create_base_layer().await?;
        Ok(Self::wrap(builder, store))
    }

    /// Wraps an existing layer builder, caching its parent and name so they
    /// stay available after the builder itself has been consumed by a commit.
    fn wrap(builder: Box<dyn LayerBuilder>, store: Store) -> Self {
        StoreLayerBuilder {
            parent: builder.parent(),
            name: builder.name(),
            builder: RwLock::new(Some(builder)),
            store,
        }
    }

    /// Builds the error returned by any operation attempted after a commit.
    fn already_committed() -> io::Error {
        io::Error::new(
            io::ErrorKind::InvalidData,
            "builder has already been committed",
        )
    }

    /// Runs `f` against the wrapped builder while holding the write lock.
    ///
    /// # Errors
    /// Returns an `InvalidData` error if the builder has already been
    /// committed.
    fn with_builder<R, F: FnOnce(&mut Box<dyn LayerBuilder>) -> R>(
        &self,
        f: F,
    ) -> Result<R, io::Error> {
        let mut guard = self
            .builder
            .write()
            .expect("rwlock write should always succeed");
        match &mut *guard {
            Some(builder) => Ok(f(builder)),
            None => Err(Self::already_committed()),
        }
    }

    /// Returns the name of the layer being built.
    pub fn name(&self) -> [u32; 5] {
        self.name
    }

    /// Returns the parent layer of the layer being built, if it has one.
    pub fn parent(&self) -> Option<Arc<dyn Layer>> {
        self.parent.clone()
    }

    /// Adds a string triple to the layer being built.
    ///
    /// # Errors
    /// Fails if the builder has already been committed.
    pub fn add_string_triple(&self, triple: StringTriple) -> Result<(), io::Error> {
        self.with_builder(move |b| b.add_string_triple(triple))
    }

    /// Adds an id triple to the layer being built.
    ///
    /// # Errors
    /// Fails if the builder has already been committed.
    pub fn add_id_triple(&self, triple: IdTriple) -> Result<(), io::Error> {
        self.with_builder(move |b| b.add_id_triple(triple))
    }

    /// Removes a string triple in the layer being built.
    ///
    /// # Errors
    /// Fails if the builder has already been committed.
    pub fn remove_string_triple(&self, triple: StringTriple) -> Result<(), io::Error> {
        self.with_builder(move |b| b.remove_string_triple(triple))
    }

    /// Removes an id triple in the layer being built.
    ///
    /// # Errors
    /// Fails if the builder has already been committed.
    pub fn remove_id_triple(&self, triple: IdTriple) -> Result<(), io::Error> {
        self.with_builder(move |b| b.remove_id_triple(triple))
    }

    /// Returns `true` once the wrapped builder has been taken by a commit.
    pub fn committed(&self) -> bool {
        self.builder
            .read()
            .expect("rwlock read should always succeed")
            .is_none()
    }

    /// Commits the layer without loading it back from the store.
    ///
    /// # Errors
    /// Returns an `InvalidData` error if the builder has already been
    /// committed, or whatever error the underlying commit reports.
    pub async fn commit_no_load(&self) -> io::Result<()> {
        // The write guard is a temporary of this statement, so the lock is
        // released before the commit future is awaited.
        let builder = self
            .builder
            .write()
            .expect("rwlock write should always succeed")
            .take();
        match builder {
            Some(builder) => builder.commit_boxed().await,
            None => Err(Self::already_committed()),
        }
    }

    /// Commits the layer and loads it back from the store.
    ///
    /// # Errors
    /// Fails if the builder has already been committed, if the commit or the
    /// lookup fails, or with `NotFound` if the freshly committed layer cannot
    /// be retrieved from the store.
    pub async fn commit(&self) -> io::Result<StoreLayer> {
        let name = self.name;
        self.commit_no_load().await?;
        let layer = self.store.layer_store.get_layer(name).await?;
        match layer {
            Some(layer) => Ok(StoreLayer::wrap(layer, self.store.clone())),
            None => Err(io::Error::new(
                io::ErrorKind::NotFound,
                "layer that was just created was not found in store",
            )),
        }
    }

    /// Replays the additions and removals of `delta` onto this builder.
    ///
    /// Additions and removals are processed concurrently. Triples whose ids
    /// `delta` cannot convert to strings are skipped.
    ///
    /// # Errors
    /// Returns the first error reported while adding or removing a triple,
    /// for example when this builder has already been committed.
    pub fn apply_delta(&self, delta: &StoreLayer) -> Result<(), io::Error> {
        let (added, removed) = rayon::join(
            || -> io::Result<()> {
                delta.triple_additions().par_bridge().try_for_each(|t| {
                    match delta.id_triple_to_string(&t) {
                        Some(st) => self.add_string_triple(st),
                        None => Ok(()),
                    }
                })
            },
            || -> io::Result<()> {
                delta.triple_removals().par_bridge().try_for_each(|t| {
                    match delta.id_triple_to_string(&t) {
                        Some(st) => self.remove_string_triple(st),
                        None => Ok(()),
                    }
                })
            },
        );
        added.and(removed)
    }

    /// Records the changes needed to turn this builder's parent into `other`.
    ///
    /// Every parent triple that does not exist in `other` is removed, and
    /// every triple of `other` that does not exist in the parent is added.
    /// Without a parent, all triples of `other` are added. Triples whose ids
    /// cannot be converted to strings are skipped.
    ///
    /// # Errors
    /// Returns the first error reported while adding or removing a triple,
    /// for example when this builder has already been committed.
    pub fn apply_diff(&self, other: &StoreLayer) -> Result<(), io::Error> {
        // Fetched once up front rather than cloning the Arc for every triple.
        let parent = self.parent();
        let (removed, added) = rayon::join(
            || -> io::Result<()> {
                match parent.as_ref() {
                    None => Ok(()),
                    Some(this) => this.triples().par_bridge().try_for_each(|t| {
                        if let Some(st) = this.id_triple_to_string(&t) {
                            if !other.string_triple_exists(&st) {
                                return self.remove_string_triple(st);
                            }
                        }
                        Ok(())
                    }),
                }
            },
            || -> io::Result<()> {
                other.triples().par_bridge().try_for_each(|t| {
                    if let Some(st) = other.id_triple_to_string(&t) {
                        let in_parent = parent
                            .as_ref()
                            .map_or(false, |this| this.string_triple_exists(&st));
                        if !in_parent {
                            return self.add_string_triple(st);
                        }
                    }
                    Ok(())
                })
            },
        );
        removed.and(added)
    }
}
/// A layer together with the [`Store`] it was loaded from.
///
/// Both fields are cheap to clone: the layer is behind an `Arc` and `Store`
/// itself only holds `Arc`s.
#[derive(Clone)]
pub struct StoreLayer {
/// The wrapped layer that all `Layer` queries are forwarded to.
layer: Arc<dyn Layer>,
/// Store used to open child builders and to look up related layers.
store: Store,
}
impl StoreLayer {
    /// Pairs a loaded layer with the store it came from.
    fn wrap(layer: Arc<dyn Layer>, store: Store) -> Self {
        StoreLayer { layer, store }
    }

    /// Opens a builder for a new child layer on top of this layer.
    ///
    /// # Errors
    /// Returns whatever error the layer store reports while creating the
    /// child layer.
    pub async fn open_write(&self) -> io::Result<StoreLayerBuilder> {
        let child = self
            .store
            .layer_store
            .create_child_layer(self.layer.name())
            .await?;
        Ok(StoreLayerBuilder::wrap(child, self.store.clone()))
    }

    /// Loads the parent of this layer, or returns `None` for a layer without
    /// a parent.
    ///
    /// # Errors
    /// Returns `NotFound` if the layer names a parent that the store cannot
    /// find, or whatever error the lookup itself reports.
    pub async fn parent(&self) -> io::Result<Option<StoreLayer>> {
        let parent_name = match self.layer.parent_name() {
            Some(name) => name,
            None => return Ok(None),
        };
        let found = self.store.layer_store.get_layer(parent_name).await?;
        match found {
            Some(layer) => Ok(Some(StoreLayer::wrap(layer, self.store.clone()))),
            None => Err(io::Error::new(
                io::ErrorKind::NotFound,
                "parent layer not found even though it should exist",
            )),
        }
    }

    /// Copies every triple visible in this layer into a new base layer and
    /// commits it.
    ///
    /// # Errors
    /// Returns `InvalidData` if a triple of this layer cannot be converted to
    /// a string triple, and otherwise the first error reported while creating,
    /// filling or committing the new layer.
    pub async fn squash(&self) -> io::Result<StoreLayer> {
        let new_builder = self.store.create_base_layer().await?;
        self.triples().par_bridge().try_for_each(|t| {
            match self.id_triple_to_string(&t) {
                Some(st) => new_builder.add_string_triple(st),
                None => Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    "triple could not be converted to a string triple",
                )),
            }
        })?;
        new_builder.commit().await
    }

    /// Asks the layer store to roll up this layer.
    ///
    /// The layer is looked up again by name and the result of that lookup is
    /// what gets handed to the store's `rollup`.
    ///
    /// # Errors
    /// Returns `NotFound` if the store no longer has a layer with this name,
    /// or whatever error the lookup or the rollup reports.
    pub async fn rollup(&self) -> io::Result<()> {
        let layer_store = &self.store.layer_store;
        let layer = layer_store
            .get_layer(self.name())
            .await?
            .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "layer not found"))?;
        layer_store.rollup(layer).await?;
        Ok(())
    }
}
/// `Layer` implementation for [`StoreLayer`].
///
/// Apart from `clone_boxed`, every method forwards to the wrapped layer with
/// exactly the arguments it received.
impl Layer for StoreLayer {
// Layer identity and dictionary sizes.
fn name(&self) -> [u32; 5] {
self.layer.name()
}
fn parent_name(&self) -> Option<[u32; 5]> {
self.layer.parent_name()
}
fn node_and_value_count(&self) -> usize {
self.layer.node_and_value_count()
}
fn predicate_count(&self) -> usize {
self.layer.predicate_count()
}
// String-to-id lookups.
fn subject_id(&self, subject: &str) -> Option<u64> {
self.layer.subject_id(subject)
}
fn predicate_id(&self, predicate: &str) -> Option<u64> {
self.layer.predicate_id(predicate)
}
fn object_node_id(&self, object: &str) -> Option<u64> {
self.layer.object_node_id(object)
}
fn object_value_id(&self, object: &str) -> Option<u64> {
self.layer.object_value_id(object)
}
// Id-to-string lookups.
fn id_subject(&self, id: u64) -> Option<String> {
self.layer.id_subject(id)
}
fn id_predicate(&self, id: u64) -> Option<String> {
self.layer.id_predicate(id)
}
fn id_object(&self, id: u64) -> Option<ObjectType> {
self.layer.id_object(id)
}
// Existence checks taking a (subject, predicate, object) id triple.
fn triple_exists(&self, subject: u64, predicate: u64, object: u64) -> bool {
self.layer.triple_exists(subject, predicate, object)
}
fn triple_addition_exists(&self, subject: u64, predicate: u64, object: u64) -> bool {
self.layer
.triple_addition_exists(subject, predicate, object)
}
fn triple_removal_exists(&self, subject: u64, predicate: u64, object: u64) -> bool {
self.layer.triple_removal_exists(subject, predicate, object)
}
// Iterators over triples, additions and removals without any id argument.
fn triples(&self) -> Box<dyn Iterator<Item = IdTriple> + Send> {
self.layer.triples()
}
fn triple_additions(&self) -> Box<dyn Iterator<Item = IdTriple> + Send> {
self.layer.triple_additions()
}
fn triple_removals(&self) -> Box<dyn Iterator<Item = IdTriple> + Send> {
self.layer.triple_removals()
}
// Iterators taking a subject id.
fn triples_s(&self, subject: u64) -> Box<dyn Iterator<Item = IdTriple> + Send> {
self.layer.triples_s(subject)
}
fn triple_additions_s(&self, subject: u64) -> Box<dyn Iterator<Item = IdTriple> + Send> {
self.layer.triple_additions_s(subject)
}
fn triple_removals_s(&self, subject: u64) -> Box<dyn Iterator<Item = IdTriple> + Send> {
self.layer.triple_removals_s(subject)
}
// Iterators taking a subject id and a predicate id.
fn triples_sp(
&self,
subject: u64,
predicate: u64,
) -> Box<dyn Iterator<Item = IdTriple> + Send> {
self.layer.triples_sp(subject, predicate)
}
fn triple_additions_sp(
&self,
subject: u64,
predicate: u64,
) -> Box<dyn Iterator<Item = IdTriple> + Send> {
self.layer.triple_additions_sp(subject, predicate)
}
fn triple_removals_sp(
&self,
subject: u64,
predicate: u64,
) -> Box<dyn Iterator<Item = IdTriple> + Send> {
self.layer.triple_removals_sp(subject, predicate)
}
// Iterators taking a predicate id.
fn triples_p(&self, predicate: u64) -> Box<dyn Iterator<Item = IdTriple> + Send> {
self.layer.triples_p(predicate)
}
fn triple_additions_p(&self, predicate: u64) -> Box<dyn Iterator<Item = IdTriple> + Send> {
self.layer.triple_additions_p(predicate)
}
fn triple_removals_p(&self, predicate: u64) -> Box<dyn Iterator<Item = IdTriple> + Send> {
self.layer.triple_removals_p(predicate)
}
// Iterators taking an object id.
fn triples_o(&self, object: u64) -> Box<dyn Iterator<Item = IdTriple> + Send> {
self.layer.triples_o(object)
}
fn triple_additions_o(&self, object: u64) -> Box<dyn Iterator<Item = IdTriple> + Send> {
self.layer.triple_additions_o(object)
}
fn triple_removals_o(&self, object: u64) -> Box<dyn Iterator<Item = IdTriple> + Send> {
self.layer.triple_removals_o(object)
}
// The only non-forwarding method: boxes a clone of this wrapper itself, so
// the boxed value still carries the store handle.
fn clone_boxed(&self) -> Box<dyn Layer> {
Box::new(self.clone())
}
// Triple counts.
fn triple_layer_addition_count(&self) -> usize {
self.layer.triple_layer_addition_count()
}
fn triple_layer_removal_count(&self) -> usize {
self.layer.triple_layer_removal_count()
}
fn triple_addition_count(&self) -> usize {
self.layer.triple_addition_count()
}
fn triple_removal_count(&self) -> usize {
self.layer.triple_removal_count()
}
fn all_counts(&self) -> LayerCounts {
self.layer.all_counts()
}
}
/// A label in a [`Store`], through which a head layer can be read and set.
pub struct NamedGraph {
/// Name of the label this graph refers to in the label store.
label: String,
/// Store in which the label and its layers are looked up.
store: Store,
}
impl NamedGraph {
    /// Builds a graph handle for `label` inside `store`.
    fn new(label: String, store: Store) -> Self {
        NamedGraph { label, store }
    }

    /// Returns the label name of this graph.
    pub fn name(&self) -> &str {
        &self.label
    }

    /// Loads the layer the label currently points at.
    ///
    /// Yields `Ok(None)` when the label exists but has no layer yet.
    ///
    /// # Errors
    /// Returns `NotFound` if the label does not exist, or if it points at a
    /// layer the store cannot find; lookup errors are passed through.
    pub async fn head(&self) -> io::Result<Option<StoreLayer>> {
        let current = self
            .store
            .label_store
            .get_label(&self.label)
            .await?
            .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "database not found"))?;

        let head_name = match current.layer {
            Some(name) => name,
            None => return Ok(None),
        };

        let loaded = self.store.layer_store.get_layer(head_name).await?;
        match loaded {
            Some(layer) => Ok(Some(StoreLayer::wrap(layer, self.store.clone()))),
            None => Err(io::Error::new(
                io::ErrorKind::NotFound,
                "layer not found even though it is pointed at by a label",
            )),
        }
    }

    /// Points the label at `layer` if that is allowed.
    ///
    /// The update is allowed when the label has no layer yet, or when the
    /// store's `layer_is_ancestor_of(new, current)` check succeeds. Returns
    /// whether the label was updated.
    ///
    /// # Errors
    /// Returns `NotFound` if the label does not exist; store errors are
    /// passed through.
    pub async fn set_head(&self, layer: &StoreLayer) -> io::Result<bool> {
        let layer_name = layer.name();
        let label = match self.store.label_store.get_label(&self.label).await? {
            Some(found) => found,
            None => return Err(io::Error::new(io::ErrorKind::NotFound, "label not found")),
        };

        let set_is_ok = if let Some(retrieved_layer_name) = label.layer {
            self.store
                .layer_store
                .layer_is_ancestor_of(layer_name, retrieved_layer_name)
                .await?
        } else {
            true
        };

        if !set_is_ok {
            return Ok(false);
        }
        self.store.label_store.set_label(&label, layer_name).await?;
        Ok(true)
    }

    /// Points the label at `layer` without any ancestry check.
    ///
    /// # Errors
    /// Returns `NotFound` if the label does not exist; store errors are
    /// passed through.
    pub async fn force_set_head(&self, layer: &StoreLayer) -> io::Result<bool> {
        let layer_name = layer.name();
        let label = self
            .store
            .label_store
            .get_label(&self.label)
            .await?
            .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "label not found"))?;
        self.store.label_store.set_label(&label, layer_name).await?;
        Ok(true)
    }
}
impl Store {
    /// Builds a store from a label backend and a layer backend.
    pub fn new<Labels: 'static + LabelStore, Layers: 'static + LayerStore>(
        label_store: Labels,
        layer_store: Layers,
    ) -> Store {
        let label_store: Arc<dyn LabelStore> = Arc::new(label_store);
        let layer_store: Arc<dyn LayerStore> = Arc::new(layer_store);
        Store {
            label_store,
            layer_store,
        }
    }

    /// Creates a new label and returns a graph handle for it.
    ///
    /// # Errors
    /// Passes through any error from the label store.
    pub async fn create(&self, label: &str) -> io::Result<NamedGraph> {
        let created = self.label_store.create_label(label).await?;
        let graph = NamedGraph::new(created.name, self.clone());
        Ok(graph)
    }

    /// Opens an existing label, yielding `None` when it does not exist.
    ///
    /// # Errors
    /// Passes through any error from the label store.
    pub async fn open(&self, label: &str) -> io::Result<Option<NamedGraph>> {
        let graph = match self.label_store.get_label(label).await? {
            Some(found) => Some(NamedGraph::new(found.name, self.clone())),
            None => None,
        };
        Ok(graph)
    }

    /// Loads the layer with the given id, yielding `None` when the store has
    /// no such layer.
    ///
    /// # Errors
    /// Passes through any error from the layer store.
    pub async fn get_layer_from_id(&self, layer: [u32; 5]) -> io::Result<Option<StoreLayer>> {
        let wrapped = match self.layer_store.get_layer(layer).await? {
            Some(found) => Some(StoreLayer::wrap(found, self.clone())),
            None => None,
        };
        Ok(wrapped)
    }

    /// Opens a builder for a new base layer in this store.
    ///
    /// # Errors
    /// Passes through any error from the layer store.
    pub async fn create_base_layer(&self) -> io::Result<StoreLayerBuilder> {
        let handle = self.clone();
        StoreLayerBuilder::new(handle).await
    }

    /// Exports the given layers as a byte pack, as produced by the layer store.
    pub fn export_layers(&self, layer_ids: Box<dyn Iterator<Item = [u32; 5]>>) -> Vec<u8> {
        let backend = &self.layer_store;
        backend.export_layers(layer_ids)
    }

    /// Imports the given layers from a byte pack via the layer store.
    ///
    /// # Errors
    /// Passes through any error from the layer store.
    pub fn import_layers(
        &self,
        pack: &[u8],
        layer_ids: Box<dyn Iterator<Item = [u32; 5]>>,
    ) -> Result<(), io::Error> {
        let backend = &self.layer_store;
        backend.import_layers(pack, layer_ids)
    }
}
/// Opens a store backed by a memory label store and a memory layer store,
/// with the layer store wrapped in a `CachedLayerStore`.
pub fn open_memory_store() -> Store {
    let labels = MemoryLabelStore::new();
    let layers = CachedLayerStore::new(MemoryLayerStore::new(), LockingHashMapLayerCache::new());
    Store::new(labels, layers)
}
/// Opens a store whose label store and layer store are both constructed from
/// the directory `path`, with the layer store wrapped in a `CachedLayerStore`.
pub fn open_directory_store<P: Into<PathBuf>>(path: P) -> Store {
    let root: PathBuf = path.into();
    let labels = DirectoryLabelStore::new(root.clone());
    let layers = CachedLayerStore::new(
        DirectoryLayerStore::new(root),
        LockingHashMapLayerCache::new(),
    );
    Store::new(labels, layers)
}
// Tests for the store API. The async methods are driven synchronously with
// `Runtime::block_on`.
#[cfg(test)]
mod tests {
use super::*;
use tempfile::tempdir;
use tokio::runtime::Runtime;
// Shared scenario for the memory and directory backends: creates the "foodb"
// graph, commits a base layer and then a child layer, moves the head to each
// in turn, and checks that both triples are visible from the final head.
fn create_and_manipulate_database(mut runtime: Runtime, store: Store) {
let database = runtime.block_on(store.create("foodb")).unwrap();
let head = runtime.block_on(database.head()).unwrap();
// A freshly created graph has no head layer yet.
assert!(head.is_none());
let mut builder = runtime.block_on(store.create_base_layer()).unwrap();
builder
.add_string_triple(StringTriple::new_value("cow", "says", "moo"))
.unwrap();
let layer = runtime.block_on(builder.commit()).unwrap();
assert!(runtime.block_on(database.set_head(&layer)).unwrap());
builder = runtime.block_on(layer.open_write()).unwrap();
builder
.add_string_triple(StringTriple::new_value("pig", "says", "oink"))
.unwrap();
let layer2 = runtime.block_on(builder.commit()).unwrap();
assert!(runtime.block_on(database.set_head(&layer2)).unwrap());
let layer2_name = layer2.name();
let layer = runtime.block_on(database.head()).unwrap().unwrap();
assert_eq!(layer2_name, layer.name());
assert!(layer.string_triple_exists(&StringTriple::new_value("cow", "says", "moo")));
assert!(layer.string_triple_exists(&StringTriple::new_value("pig", "says", "oink")));
}
// Runs the shared scenario against the memory store.
#[test]
fn create_and_manipulate_memory_database() {
let runtime = Runtime::new().unwrap();
let store = open_memory_store();
create_and_manipulate_database(runtime, store);
}
// Runs the shared scenario against a directory store in a temporary directory.
#[test]
fn create_and_manipulate_directory_database() {
let runtime = Runtime::new().unwrap();
let dir = tempdir().unwrap();
let store = open_directory_store(dir.path());
create_and_manipulate_database(runtime, store);
}
// A committed layer can be fetched again through `get_layer_from_id`.
#[test]
fn create_layer_and_retrieve_it_by_id() {
let mut runtime = Runtime::new().unwrap();
let store = open_memory_store();
let builder = runtime.block_on(store.create_base_layer()).unwrap();
builder
.add_string_triple(StringTriple::new_value("cow", "says", "moo"))
.unwrap();
let layer = runtime.block_on(builder.commit()).unwrap();
let id = layer.name();
let layer2 = runtime
.block_on(store.get_layer_from_id(id))
.unwrap()
.unwrap();
assert!(layer2.string_triple_exists(&StringTriple::new_value("cow", "says", "moo")));
}
// `committed()` flips from false to true across `commit_no_load`.
#[test]
fn commit_builder_makes_builder_committed() {
let mut runtime = Runtime::new().unwrap();
let store = open_memory_store();
let builder = runtime.block_on(store.create_base_layer()).unwrap();
builder
.add_string_triple(StringTriple::new_value("cow", "says", "moo"))
.unwrap();
assert!(!builder.committed());
runtime.block_on(builder.commit_no_load()).unwrap();
assert!(builder.committed());
}
// `force_set_head` can move the head to a second, separately created base
// layer; afterwards only that layer's triple is visible from the head.
#[test]
fn hard_reset() {
let mut runtime = Runtime::new().unwrap();
let store = open_memory_store();
let database = runtime.block_on(store.create("foodb")).unwrap();
let builder1 = runtime.block_on(store.create_base_layer()).unwrap();
builder1
.add_string_triple(StringTriple::new_value("cow", "says", "moo"))
.unwrap();
let layer1 = runtime.block_on(builder1.commit()).unwrap();
assert!(runtime.block_on(database.set_head(&layer1)).unwrap());
let builder2 = runtime.block_on(store.create_base_layer()).unwrap();
builder2
.add_string_triple(StringTriple::new_value("duck", "says", "quack"))
.unwrap();
let layer2 = runtime.block_on(builder2.commit()).unwrap();
assert!(runtime.block_on(database.force_set_head(&layer2)).unwrap());
let new_layer = runtime.block_on(database.head()).unwrap().unwrap();
assert!(new_layer.string_triple_exists(&StringTriple::new_value("duck", "says", "quack")));
assert!(!new_layer.string_triple_exists(&StringTriple::new_value("cow", "says", "moo")));
}
// Squashing a two-layer stack yields a layer with both triples and no parent.
#[test]
fn create_two_layers_and_squash() {
let mut runtime = Runtime::new().unwrap();
let store = open_memory_store();
let builder = runtime.block_on(store.create_base_layer()).unwrap();
builder
.add_string_triple(StringTriple::new_value("cow", "says", "moo"))
.unwrap();
let layer = runtime.block_on(builder.commit()).unwrap();
let builder2 = runtime.block_on(layer.open_write()).unwrap();
builder2
.add_string_triple(StringTriple::new_value("dog", "says", "woof"))
.unwrap();
let layer2 = runtime.block_on(builder2.commit()).unwrap();
let new = runtime.block_on(layer2.squash()).unwrap();
assert!(new.string_triple_exists(&StringTriple::new_value("cow", "says", "moo")));
assert!(new.string_triple_exists(&StringTriple::new_value("dog", "says", "woof")));
assert!(runtime.block_on(new.parent()).unwrap().is_none());
}
// Builds a cow/dog stack and an unrelated delta stack, then applies the top
// delta layer (adds crow, removes cat) onto a child of the cow/dog stack.
#[test]
fn apply_a_base_delta() {
let mut runtime = Runtime::new().unwrap();
let store = open_memory_store();
let builder = runtime.block_on(store.create_base_layer()).unwrap();
builder
.add_string_triple(StringTriple::new_value("cow", "says", "moo"))
.unwrap();
let layer = runtime.block_on(builder.commit()).unwrap();
let builder2 = runtime.block_on(layer.open_write()).unwrap();
builder2
.add_string_triple(StringTriple::new_value("dog", "says", "woof"))
.unwrap();
let layer2 = runtime.block_on(builder2.commit()).unwrap();
let delta_builder_1 = runtime.block_on(store.create_base_layer()).unwrap();
delta_builder_1
.add_string_triple(StringTriple::new_value("dog", "says", "woof"))
.unwrap();
delta_builder_1
.add_string_triple(StringTriple::new_value("cat", "says", "meow"))
.unwrap();
let delta_1 = runtime.block_on(delta_builder_1.commit()).unwrap();
let delta_builder_2 = runtime.block_on(delta_1.open_write()).unwrap();
delta_builder_2
.add_string_triple(StringTriple::new_value("crow", "says", "caw"))
.unwrap();
delta_builder_2
.remove_string_triple(StringTriple::new_value("cat", "says", "meow"))
.unwrap();
let delta = runtime.block_on(delta_builder_2.commit()).unwrap();
let rebase_builder = runtime.block_on(layer2.open_write()).unwrap();
let _ = rebase_builder.apply_delta(&delta).unwrap();
let rebase_layer = runtime.block_on(rebase_builder.commit()).unwrap();
assert!(rebase_layer.string_triple_exists(&StringTriple::new_value("cow", "says", "moo")));
assert!(rebase_layer.string_triple_exists(&StringTriple::new_value("crow", "says", "caw")));
assert!(rebase_layer.string_triple_exists(&StringTriple::new_value("dog", "says", "woof")));
assert!(!rebase_layer.string_triple_exists(&StringTriple::new_value("cat", "says", "meow")));
}
}