pub mod sync;
use futures::prelude::*;
use futures::future;
use std::sync::Arc;
use std::path::PathBuf;
use futures_locks::RwLock;
use crate::storage::{LabelStore, LayerStore, CachedLayerStore};
use crate::storage::memory::{MemoryLabelStore, MemoryLayerStore};
use crate::storage::directory::{DirectoryLabelStore, DirectoryLayerStore};
use crate::layer::{Layer,LayerBuilder,ObjectType,StringTriple,IdTriple,SubjectLookup,ObjectLookup, PredicateLookup};
use std::io;
pub struct StoreLayerBuilder {
builder: RwLock<Option<Box<dyn LayerBuilder>>>,
name: [u32;5],
store: Store
}
impl StoreLayerBuilder {
fn new(store: Store) -> impl Future<Item=Self,Error=io::Error>+Send {
store.layer_store.create_base_layer()
.map(|builder|
Self {
name: builder.name(),
builder: RwLock::new(Some(builder)),
store
})
}
fn wrap(builder: Box<dyn LayerBuilder>, store: Store) -> Self {
StoreLayerBuilder {
name: builder.name(),
builder: RwLock::new(Some(builder)),
store
}
}
fn with_builder<R:Send+Sync,F: FnOnce(&mut Box<dyn LayerBuilder>)->R+Send+Sync>(&self, f: F) -> impl Future<Item=R,Error=io::Error>+Send {
self.builder.write()
.then(|b| {
let mut builder = b.expect("rwlock write should always succeed");
match (*builder).as_mut() {
None => future::err(io::Error::new(io::ErrorKind::InvalidData, "builder has already been committed")),
Some(builder) => future::ok(f(builder))
}
})
}
pub fn name(&self) -> [u32;5] {
self.name
}
pub fn add_string_triple(&self, triple: &StringTriple) -> impl Future<Item=(),Error=io::Error>+Send {
let triple = triple.clone();
self.with_builder(move |b|b.add_string_triple(&triple))
}
pub fn add_id_triple(&self, triple: IdTriple) -> impl Future<Item=bool,Error=io::Error>+Send {
self.with_builder(move |b|b.add_id_triple(triple))
}
pub fn remove_string_triple(&self, triple: &StringTriple) -> impl Future<Item=bool,Error=io::Error>+Send {
let triple = triple.clone();
self.with_builder(move |b|b.remove_string_triple(&triple))
}
pub fn remove_id_triple(&self, triple: IdTriple) -> impl Future<Item=bool,Error=io::Error>+Send {
self.with_builder(move |b|b.remove_id_triple(triple))
}
pub fn commit(&self) -> impl Future<Item=StoreLayer, Error=std::io::Error>+Send {
let store = self.store.clone();
let name = self.name;
self.builder.write()
.then(move |b| {
let mut swap = b.expect("rwlock write should always succeed");
let mut builder = None;
std::mem::swap(&mut builder, &mut swap);
let result: Box<dyn Future<Item=_,Error=_>+Send> =
match builder {
None => Box::new(future::err(io::Error::new(io::ErrorKind::InvalidData, "builder has already been committed"))),
Some(builder) => Box::new(
builder.commit_boxed()
.and_then(move |_| store.layer_store.get_layer(name)
.map(move |layer| StoreLayer::wrap(layer.expect("layer that was just created was not found in store"), store))))
};
result
})
}
}
#[derive(Clone)]
pub struct StoreLayer {
layer: Arc<dyn Layer>,
store: Store
}
impl StoreLayer {
fn wrap(layer: Arc<dyn Layer>, store: Store) -> Self {
StoreLayer {
layer, store
}
}
pub fn open_write(&self) -> impl Future<Item=StoreLayerBuilder,Error=io::Error>+Send {
let store = self.store.clone();
self.store.layer_store.create_child_layer(self.layer.name())
.map(move |layer|StoreLayerBuilder::wrap(layer, store))
}
pub fn parent(&self) -> Option<StoreLayer> {
let parent = self.layer.parent();
parent.map(|p| StoreLayer {
layer: p,
store: self.store.clone()
})
}
}
impl Layer for StoreLayer {
fn name(&self) -> [u32;5] {
self.layer.name()
}
fn parent(&self) -> Option<Arc<dyn Layer>> {
self.layer.parent()
}
fn node_and_value_count(&self) -> usize {
self.layer.node_and_value_count()
}
fn predicate_count(&self) -> usize {
self.layer.predicate_count()
}
fn subject_id(&self, subject: &str) -> Option<u64> {
self.layer.subject_id(subject)
}
fn predicate_id(&self, predicate: &str) -> Option<u64> {
self.layer.predicate_id(predicate)
}
fn object_node_id(&self, object: &str) -> Option<u64> {
self.layer.object_node_id(object)
}
fn object_value_id(&self, object: &str) -> Option<u64> {
self.layer.object_value_id(object)
}
fn id_subject(&self, id: u64) -> Option<String> {
self.layer.id_subject(id)
}
fn id_predicate(&self, id: u64) -> Option<String> {
self.layer.id_predicate(id)
}
fn id_object(&self, id: u64) -> Option<ObjectType> {
self.layer.id_object(id)
}
fn subjects(&self) -> Box<dyn Iterator<Item=Box<dyn SubjectLookup>>> {
self.layer.subjects()
}
fn subject_additions(&self) -> Box<dyn Iterator<Item=Box<dyn SubjectLookup>>> {
self.layer.subject_additions()
}
fn subject_removals(&self) -> Box<dyn Iterator<Item=Box<dyn SubjectLookup>>> {
self.layer.subject_removals()
}
fn lookup_subject(&self, subject: u64) -> Option<Box<dyn SubjectLookup>> {
self.layer.lookup_subject(subject)
}
fn lookup_subject_addition(&self, subject: u64) -> Option<Box<dyn SubjectLookup>> {
self.layer.lookup_subject_addition(subject)
}
fn lookup_subject_removal(&self, subject: u64) -> Option<Box<dyn SubjectLookup>> {
self.layer.lookup_subject_removal(subject)
}
fn objects(&self) -> Box<dyn Iterator<Item=Box<dyn ObjectLookup>>> {
self.layer.objects()
}
fn object_additions(&self) -> Box<dyn Iterator<Item=Box<dyn ObjectLookup>>> {
self.layer.object_additions()
}
fn object_removals(&self) -> Box<dyn Iterator<Item=Box<dyn ObjectLookup>>> {
self.layer.object_removals()
}
fn lookup_object(&self, object: u64) -> Option<Box<dyn ObjectLookup>> {
self.layer.lookup_object(object)
}
fn lookup_object_addition(&self, object: u64) -> Option<Box<dyn ObjectLookup>> {
self.layer.lookup_object_addition(object)
}
fn lookup_object_removal(&self, object: u64) -> Option<Box<dyn ObjectLookup>> {
self.layer.lookup_object_removal(object)
}
fn lookup_predicate(&self, predicate: u64) -> Option<Box<dyn PredicateLookup>> {
self.layer.lookup_predicate(predicate)
}
fn lookup_predicate_addition(&self, predicate: u64) -> Option<Box<dyn PredicateLookup>> {
self.layer.lookup_predicate_addition(predicate)
}
fn lookup_predicate_removal(&self, predicate: u64) -> Option<Box<dyn PredicateLookup>> {
self.layer.lookup_predicate_removal(predicate)
}
fn clone_boxed(&self) -> Box<dyn Layer> {
Box::new(self.clone())
}
}
pub struct NamedGraph {
label: String,
store: Store
}
impl NamedGraph {
fn new(label: String, store: Store) -> Self {
NamedGraph {
label,
store
}
}
pub fn head(&self) -> impl Future<Item=Option<StoreLayer>,Error=io::Error>+Send {
let store = self.store.clone();
store.label_store.get_label(&self.label)
.and_then(move |new_label| {
match new_label {
None => Box::new(future::err(io::Error::new(io::ErrorKind::NotFound, "database not found"))),
Some(new_label) => {
let result: Box<dyn Future<Item=_,Error=_>+Send> =
match new_label.layer {
None => Box::new(future::ok(None)),
Some(layer) => Box::new(store.layer_store.get_layer(layer)
.map(move |layer| layer.map(move |layer|StoreLayer::wrap(layer, store))))
};
result
}
}
})
}
pub fn set_head(&self, layer: &StoreLayer) -> impl Future<Item=bool,Error=io::Error>+Send {
let store = self.store.clone();
let layer_name = layer.name();
let cloned_layer = layer.layer.clone();
store.label_store.get_label(&self.label)
.and_then(move |label| {
let result: Box<dyn Future<Item=_,Error=_>+Send> =
match label {
None => Box::new(future::err(io::Error::new(io::ErrorKind::NotFound, "label not found"))),
Some(label) => Box::new({
let result: Box<dyn Future<Item=_,Error=_>+Send> =
match label.layer {
None => Box::new(future::ok(true)),
Some(layer_name) => Box::new(store.layer_store.get_layer(layer_name)
.map(move |l|l.map(|l|l.is_ancestor_of(&*cloned_layer)).unwrap_or(false)))
};
result
}.and_then(move |b| {
let result: Box<dyn Future<Item=_,Error=_>+Send> =
if b {
Box::new(store.label_store.set_label(&label, layer_name).map(|_|true))
} else {
Box::new(future::ok(false))
};
result
}))
};
result
})
}
}
#[derive(Clone)]
pub struct Store {
label_store: Arc<dyn LabelStore>,
layer_store: Arc<dyn LayerStore>,
}
impl Store {
pub fn new<Labels:'static+LabelStore, Layers:'static+LayerStore>(label_store: Labels, layer_store: Layers) -> Store {
Store {
label_store: Arc::new(label_store),
layer_store: Arc::new(layer_store),
}
}
pub fn create(&self, label: &str) -> impl Future<Item=NamedGraph,Error=std::io::Error>+Send {
let store = self.clone();
self.label_store.create_label(label)
.map(move |label| NamedGraph::new(label.name, store))
}
pub fn open(&self, label: &str) -> impl Future<Item=Option<NamedGraph>,Error=std::io::Error> {
let store = self.clone();
self.label_store.get_label(label)
.map(move |label| label.map(|label|NamedGraph::new(label.name, store)))
}
pub fn create_base_layer(&self) -> impl Future<Item=StoreLayerBuilder,Error=io::Error>+Send {
StoreLayerBuilder::new(self.clone())
}
}
pub fn open_memory_store() -> Store {
Store::new(MemoryLabelStore::new(), CachedLayerStore::new(MemoryLayerStore::new()))
}
pub fn open_directory_store<P:Into<PathBuf>>(path: P) -> Store {
let p = path.into();
Store::new(DirectoryLabelStore::new(p.clone()), CachedLayerStore::new(DirectoryLayerStore::new(p)))
}
#[cfg(test)]
mod tests {
use super::*;
use tokio::runtime::Runtime;
use futures::sync::oneshot;
use tempfile::tempdir;
#[test]
fn create_and_manipulate_memory_database() {
let runtime = Runtime::new().unwrap();
let store = open_memory_store();
let database = oneshot::spawn(store.create("foodb"), &runtime.executor()).wait().unwrap();
let head = oneshot::spawn(database.head(), &runtime.executor()).wait().unwrap();
assert!(head.is_none());
let mut builder = oneshot::spawn(store.create_base_layer(), &runtime.executor()).wait().unwrap();
oneshot::spawn(builder.add_string_triple(&StringTriple::new_value("cow","says","moo")), &runtime.executor()).wait().unwrap();
let layer = oneshot::spawn(builder.commit(), &runtime.executor()).wait().unwrap();
assert!(oneshot::spawn(database.set_head(&layer), &runtime.executor()).wait().unwrap());
builder = oneshot::spawn(layer.open_write(), &runtime.executor()).wait().unwrap();
oneshot::spawn(builder.add_string_triple(&StringTriple::new_value("pig","says","oink")), &runtime.executor()).wait().unwrap();
let layer2 = oneshot::spawn(builder.commit(), &runtime.executor()).wait().unwrap();
assert!(oneshot::spawn(database.set_head(&layer2), &runtime.executor()).wait().unwrap());
let layer2_name = layer2.name();
let layer = oneshot::spawn(database.head(), &runtime.executor()).wait().unwrap().unwrap();
assert_eq!(layer2_name, layer.name());
assert!(layer.string_triple_exists(&StringTriple::new_value("cow","says","moo")));
assert!(layer.string_triple_exists(&StringTriple::new_value("pig","says","oink")));
}
#[test]
fn create_and_manipulate_directory_database() {
let runtime = Runtime::new().unwrap();
let dir = tempdir().unwrap();
let store = open_directory_store(dir.path());
let database = oneshot::spawn(store.create("foodb"), &runtime.executor()).wait().unwrap();
let head = oneshot::spawn(database.head(), &runtime.executor()).wait().unwrap();
assert!(head.is_none());
let mut builder = oneshot::spawn(store.create_base_layer(), &runtime.executor()).wait().unwrap();
oneshot::spawn(builder.add_string_triple(&StringTriple::new_value("cow","says","moo")), &runtime.executor()).wait().unwrap();
let layer = oneshot::spawn(builder.commit(), &runtime.executor()).wait().unwrap();
assert!(oneshot::spawn(database.set_head(&layer), &runtime.executor()).wait().unwrap());
builder = oneshot::spawn(layer.open_write(), &runtime.executor()).wait().unwrap();
oneshot::spawn(builder.add_string_triple(&StringTriple::new_value("pig","says","oink")), &runtime.executor()).wait().unwrap();
let layer2 = oneshot::spawn(builder.commit(), &runtime.executor()).wait().unwrap();
assert!(oneshot::spawn(database.set_head(&layer2), &runtime.executor()).wait().unwrap());
let layer2_name = layer2.name();
let layer = oneshot::spawn(database.head(), &runtime.executor()).wait().unwrap().unwrap();
assert_eq!(layer2_name, layer.name());
assert!(layer.string_triple_exists(&StringTriple::new_value("cow","says","moo")));
assert!(layer.string_triple_exists(&StringTriple::new_value("pig","says","oink")));
}
}