use super::internal::*;
use super::layer::*;
use crate::storage::*;
use crate::structure::TypedDictEntry;
use std::collections::{HashMap, HashSet};
use std::io;
use std::pin::Pin;
use std::sync::Arc;
use futures::future::Future;
use rayon::prelude::*;
/// Interface for collecting triple additions and removals and then committing them.
///
/// Triples can be supplied either by value (`ValueTriple`) or by id (`IdTriple`).
/// Implementors are required to be `Send + Sync`.
pub trait LayerBuilder: Send + Sync {
/// Returns the five-word name associated with this builder.
fn name(&self) -> [u32; 5];
/// Returns the parent layer, or `None` if there is none.
fn parent(&self) -> Option<Arc<dyn Layer>>;
/// Registers a triple, given by value, to be added.
fn add_value_triple(&mut self, triple: ValueTriple);
/// Registers a triple, given by id, to be added.
fn add_id_triple(&mut self, triple: IdTriple);
/// Registers a triple, given by value, to be removed.
fn remove_value_triple(&mut self, triple: ValueTriple);
/// Registers a triple, given by id, to be removed.
fn remove_id_triple(&mut self, triple: IdTriple);
/// Consumes the builder and returns a boxed future that resolves to
/// `Ok(())` on success or an `io::Error` on failure.
fn commit(self) -> Pin<Box<dyn Future<Output = io::Result<()>> + Send>>;
/// Same contract as `commit`, but takes the builder inside a `Box`.
// NOTE(review): presumably exists so that a `Box<dyn LayerBuilder>` can be
// committed — confirm against callers.
fn commit_boxed(self: Box<Self>) -> Pin<Box<dyn Future<Output = io::Result<()>> + Send>>;
}
/// A [`LayerBuilder`] that buffers every requested addition and removal in
/// plain vectors until it is committed.
#[derive(Clone)]
pub struct SimpleLayerBuilder<F>
where
    F: 'static + FileLoad + FileStore + Clone,
{
    /// Name handed to the constructor; reported by `name()`.
    name: [u32; 5],
    /// Parent layer, present only when built through `from_parent`.
    parent: Option<Arc<dyn Layer>>,
    /// Base or child layer files, matching how the builder was constructed.
    files: LayerFiles<F>,
    /// Triples passed to `add_value_triple`, in call order.
    additions: Vec<ValueTriple>,
    /// Triples passed to `add_id_triple`, in call order.
    id_additions: Vec<IdTriple>,
    /// Triples passed to `remove_value_triple`, in call order.
    removals: Vec<ValueTriple>,
    /// Triples passed to `remove_id_triple`, in call order.
    id_removals: Vec<IdTriple>,
}
impl<F: 'static + FileLoad + FileStore + Clone> SimpleLayerBuilder<F> {
pub fn new(name: [u32; 5], files: BaseLayerFiles<F>) -> Self {
Self {
name,
parent: None,
files: LayerFiles::Base(files),
additions: Vec::new(),
id_additions: Vec::with_capacity(0),
removals: Vec::new(),
id_removals: Vec::with_capacity(0),
}
}
pub fn from_parent(name: [u32; 5], parent: Arc<dyn Layer>, files: ChildLayerFiles<F>) -> Self {
Self {
name,
parent: Some(parent),
files: LayerFiles::Child(files),
additions: Vec::new(),
id_additions: Vec::new(),
removals: Vec::new(),
id_removals: Vec::new(),
}
}
}
impl<F: 'static + FileLoad + FileStore + Clone> LayerBuilder for SimpleLayerBuilder<F> {
fn name(&self) -> [u32; 5] {
self.name
}
fn parent(&self) -> Option<Arc<dyn Layer>> {
self.parent.clone()
}
fn add_value_triple(&mut self, triple: ValueTriple) {
self.additions.push(triple);
}
fn add_id_triple(&mut self, triple: IdTriple) {
self.id_additions.push(triple);
}
fn remove_value_triple(&mut self, triple: ValueTriple) {
self.removals.push(triple);
}
fn remove_id_triple(&mut self, triple: IdTriple) {
self.id_removals.push(triple);
}
fn commit(self) -> Pin<Box<dyn Future<Output = io::Result<()>> + Send>> {
let SimpleLayerBuilder {
name: _,
parent,
files,
additions,
id_additions,
removals,
id_removals,
} = self;
let (mut additions, mut removals) = rayon::join(
|| {
let mut additions: Vec<_> = match parent.as_ref() {
None => additions
.into_iter()
.map(|triple| triple.to_unresolved())
.collect(),
Some(parent) => additions
.into_par_iter()
.map(move |triple| parent.value_triple_to_partially_resolved(triple))
.collect(),
};
additions.extend(id_additions.into_iter().map(|triple| triple.to_resolved()));
additions.par_sort_unstable();
additions.dedup();
additions
},
|| {
let mut removals: Vec<_> = match parent.as_ref() {
None => removals
.into_iter()
.map(|triple| triple.to_unresolved())
.collect(),
Some(parent) => removals
.into_par_iter()
.map(move |triple| parent.value_triple_to_partially_resolved(triple))
.collect(),
};
removals.extend(id_removals.into_iter().map(|triple| triple.to_resolved()));
removals.par_sort_unstable();
removals.dedup();
removals
},
);
zero_equivalents(&mut additions, &mut removals);
if parent.is_some() {
removals
.par_iter_mut()
.for_each(|triple| triple.make_resolved_or_zero())
}
let (unresolved_nodes, unresolved_predicates, unresolved_values) =
collect_unresolved_strings(&additions);
Box::pin(async {
match parent {
Some(parent) => {
let files = files.into_child();
let mut builder =
ChildLayerFileBuilder::from_files(parent.clone(), &files).await?;
let node_ids = builder.add_nodes(unresolved_nodes.clone());
let predicate_ids = builder.add_predicates(unresolved_predicates.clone());
let value_ids = builder.add_values(unresolved_values.clone());
let mut builder = builder.into_phase2().await?;
let counts = parent.all_counts();
let parent_node_offset = counts.node_count as u64 + counts.value_count as u64;
let parent_predicate_offset = counts.predicate_count as u64;
let mut node_map = HashMap::new();
for (node, id) in unresolved_nodes.into_iter().zip(node_ids) {
node_map.insert(node, id + parent_node_offset);
}
let mut predicate_map = HashMap::new();
for (predicate, id) in unresolved_predicates.into_iter().zip(predicate_ids) {
predicate_map.insert(predicate, id + parent_predicate_offset);
}
let mut value_map = HashMap::new();
for (value, id) in unresolved_values.into_iter().zip(value_ids) {
value_map.insert(value, id + parent_node_offset + node_map.len() as u64);
}
let mut add_triples: Vec<_> = additions
.into_iter()
.map(|t| {
t.resolve_with(&node_map, &predicate_map, &value_map)
.expect("triple should have been resolvable")
})
.collect();
add_triples.par_sort_unstable();
let remove_triples: Vec<_> = removals
.into_iter()
.filter_map(|r| r.as_resolved())
.collect();
builder.add_id_triples(add_triples).await?;
builder.remove_id_triples(remove_triples).await?;
builder.finalize().await
}
None => {
let files = files.into_base();
let mut builder = BaseLayerFileBuilder::from_files(&files).await?;
let node_ids = builder.add_nodes(unresolved_nodes.clone());
let predicate_ids = builder.add_predicates(unresolved_predicates.clone());
let value_ids = builder.add_values(unresolved_values.clone());
let mut builder = builder.into_phase2().await?;
let mut node_map = HashMap::new();
for (node, id) in unresolved_nodes.into_iter().zip(node_ids) {
node_map.insert(node, id);
}
let mut predicate_map = HashMap::new();
for (predicate, id) in unresolved_predicates.into_iter().zip(predicate_ids) {
predicate_map.insert(predicate, id);
}
let mut value_map = HashMap::new();
for (value, id) in unresolved_values.into_iter().zip(value_ids) {
value_map.insert(value, id + node_map.len() as u64);
}
let mut add_triples: Vec<_> = additions
.into_iter()
.map(|t| {
t.resolve_with(&node_map, &predicate_map, &value_map)
.expect("triple should have been resolvable")
})
.collect();
add_triples.par_sort_unstable();
builder.add_id_triples(add_triples).await?;
builder.finalize().await
}
}
})
}
fn commit_boxed(self: Box<Self>) -> Pin<Box<dyn Future<Output = io::Result<()>> + Send>> {
let builder = *self;
builder.commit()
}
}
/// Zeroes out every triple that occurs in both `additions` and `removals`.
///
/// Both slices are walked once in lockstep, so they are expected to be sorted
/// in ascending order. For each matching pair, subject, predicate and object
/// of both the addition and the removal are overwritten with
/// `PossiblyResolved::Resolved(0)`. Triples present on only one side are left
/// untouched.
fn zero_equivalents(
    additions: &mut [PartiallyResolvedTriple],
    removals: &mut [PartiallyResolvedTriple],
) {
    /// Overwrites all three positions of `triple` with the resolved id 0.
    fn zero_out(triple: &mut PartiallyResolvedTriple) {
        triple.subject = PossiblyResolved::Resolved(0);
        triple.predicate = PossiblyResolved::Resolved(0);
        triple.object = PossiblyResolved::Resolved(0);
    }

    let mut pending = removals.iter_mut().peekable();

    for addition in additions.iter_mut() {
        // Drop removals that sort strictly before this addition. Once the
        // removals run out, no later addition can have a partner either.
        loop {
            match pending.peek() {
                None => return,
                Some(removal) if **removal < *addition => {}
                Some(_) => break,
            }
            pending.next();
        }

        let cancels_out = match pending.peek() {
            Some(removal) => **removal == *addition,
            None => false,
        };
        if cancels_out {
            if let Some(removal) = pending.next() {
                zero_out(addition);
                zero_out(removal);
            }
        }
    }
}
fn collect_unresolved_strings(
triples: &[PartiallyResolvedTriple],
) -> (Vec<String>, Vec<String>, Vec<TypedDictEntry>) {
let (unresolved_nodes, (unresolved_predicates, unresolved_values)) = rayon::join(
|| {
let unresolved_nodes_set: HashSet<_> = triples
.par_iter()
.filter_map(|triple| {
let subject = match triple.subject.is_resolved() {
true => None,
false => Some(triple.subject.as_ref().unwrap_unresolved().to_owned()),
};
let object = match triple.object.is_resolved() {
true => None,
false => match triple.object.as_ref().unwrap_unresolved() {
ObjectType::Node(node) => Some(node.to_owned()),
_ => None,
},
};
match (subject, object) {
(Some(subject), Some(object)) => Some(vec![subject, object]),
(Some(subject), _) => Some(vec![subject]),
(_, Some(object)) => Some(vec![object]),
_ => None,
}
})
.flatten()
.collect();
let mut unresolved_nodes: Vec<_> = unresolved_nodes_set.into_iter().collect();
unresolved_nodes.par_sort_unstable();
unresolved_nodes
},
|| {
rayon::join(
|| {
let unresolved_predicates_set: HashSet<_> = triples
.par_iter()
.filter_map(|triple| match triple.predicate.is_resolved() {
true => None,
false => Some(triple.predicate.as_ref().unwrap_unresolved().to_owned()),
})
.collect();
let mut unresolved_predicates: Vec<_> =
unresolved_predicates_set.into_iter().collect();
unresolved_predicates.par_sort_unstable();
unresolved_predicates
},
|| {
let unresolved_values_set: HashSet<_> = triples
.par_iter()
.filter_map(|triple| match triple.object.is_resolved() {
true => None,
false => match triple.object.as_ref().unwrap_unresolved() {
ObjectType::Value(value) => Some(value.to_owned()),
_ => None,
},
})
.collect();
let mut unresolved_values: Vec<_> = unresolved_values_set.into_iter().collect();
unresolved_values.par_sort_unstable();
unresolved_values
},
)
},
);
(unresolved_nodes, unresolved_predicates, unresolved_values)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::layer::internal::InternalLayer;
    use crate::storage::memory::*;
    use crate::structure::TdbDataType;

    /// Builder type used throughout these tests.
    type MemoryBuilder = SimpleLayerBuilder<MemoryBackedStore>;

    fn new_base_files() -> BaseLayerFiles<MemoryBackedStore> {
        base_layer_memory_files()
    }

    fn new_child_files() -> ChildLayerFiles<MemoryBackedStore> {
        child_layer_memory_files()
    }

    /// Commits `builder` and loads the result back as a base layer.
    async fn commit_as_base(
        builder: MemoryBuilder,
        name: [u32; 5],
        files: &BaseLayerFiles<MemoryBackedStore>,
    ) -> Arc<InternalLayer> {
        builder.commit().await.unwrap();
        let layer = BaseLayer::load_from_files(name, files).await.unwrap();
        Arc::new(layer.into())
    }

    /// Commits `builder` and loads the result back as a child of `parent`.
    async fn commit_as_child(
        builder: MemoryBuilder,
        name: [u32; 5],
        parent: Arc<InternalLayer>,
        files: &ChildLayerFiles<MemoryBackedStore>,
    ) -> Arc<InternalLayer> {
        builder.commit().await.unwrap();
        let layer = ChildLayer::load_from_files(name, parent, files)
            .await
            .unwrap();
        Arc::new(layer.into())
    }

    /// Looks up the ids of "cow says moo" in `layer` and packs them into an id triple.
    fn cow_says_moo_ids(layer: &InternalLayer) -> IdTriple {
        let node_id = layer.subject_id("cow").unwrap();
        let predicate_id = layer.predicate_id("says").unwrap();
        let value_id = layer
            .object_value_id(&String::make_entry(&"moo"))
            .unwrap();
        IdTriple::new(node_id, predicate_id, value_id)
    }

    /// Base layer containing the cow, pig and duck "says" triples.
    async fn example_base_layer() -> Arc<InternalLayer> {
        let name = [1, 2, 3, 4, 5];
        let files = new_base_files();

        let mut builder = SimpleLayerBuilder::new(name, files.clone());
        builder.add_value_triple(ValueTriple::new_string_value("cow", "says", "moo"));
        builder.add_value_triple(ValueTriple::new_string_value("pig", "says", "oink"));
        builder.add_value_triple(ValueTriple::new_string_value("duck", "says", "quack"));

        commit_as_base(builder, name, &files).await
    }

    #[tokio::test]
    async fn simple_base_layer_construction() {
        let layer = example_base_layer().await;

        assert!(layer.value_triple_exists(&ValueTriple::new_string_value("cow", "says", "moo")));
        assert!(layer.value_triple_exists(&ValueTriple::new_string_value("pig", "says", "oink")));
        assert!(layer.value_triple_exists(&ValueTriple::new_string_value("duck", "says", "quack")));
    }

    #[tokio::test]
    async fn simple_child_layer_construction() {
        let base_layer = example_base_layer().await;
        let files = new_child_files();
        let name = [0, 0, 0, 0, 0];

        let mut builder = SimpleLayerBuilder::from_parent(name, base_layer.clone(), files.clone());
        builder.add_value_triple(ValueTriple::new_string_value("horse", "says", "neigh"));
        builder.add_value_triple(ValueTriple::new_node("horse", "likes", "cow"));
        builder.remove_value_triple(ValueTriple::new_string_value("duck", "says", "quack"));
        builder.commit().await.unwrap();

        // Deliberately not converted into an `InternalLayer`: the assertions
        // below run against whatever `ChildLayer::load_from_files` returns.
        let child_layer = Arc::new(
            ChildLayer::load_from_files(name, base_layer, &files)
                .await
                .unwrap(),
        );

        assert!(child_layer
            .value_triple_exists(&ValueTriple::new_string_value("horse", "says", "neigh")));
        assert!(child_layer.value_triple_exists(&ValueTriple::new_node("horse", "likes", "cow")));
        assert!(
            child_layer.value_triple_exists(&ValueTriple::new_string_value("cow", "says", "moo"))
        );
        assert!(
            child_layer.value_triple_exists(&ValueTriple::new_string_value("pig", "says", "oink"))
        );
        assert!(!child_layer
            .value_triple_exists(&ValueTriple::new_string_value("duck", "says", "quack")));
    }

    #[tokio::test]
    async fn multi_level_layers() {
        let base_layer = example_base_layer().await;

        let name2 = [0, 0, 0, 0, 0];
        let files2 = new_child_files();
        let mut builder2 =
            SimpleLayerBuilder::from_parent(name2, base_layer.clone(), files2.clone());
        builder2.add_value_triple(ValueTriple::new_string_value("horse", "says", "neigh"));
        builder2.add_value_triple(ValueTriple::new_node("horse", "likes", "cow"));
        builder2.remove_value_triple(ValueTriple::new_string_value("duck", "says", "quack"));
        let layer2 = commit_as_child(builder2, name2, base_layer, &files2).await;

        let name3 = [0, 0, 0, 0, 1];
        let files3 = new_child_files();
        let mut builder3 = SimpleLayerBuilder::from_parent(name3, layer2.clone(), files3.clone());
        builder3.remove_value_triple(ValueTriple::new_node("horse", "likes", "cow"));
        builder3.add_value_triple(ValueTriple::new_node("horse", "likes", "pig"));
        builder3.add_value_triple(ValueTriple::new_string_value("duck", "says", "quack"));
        let layer3 = commit_as_child(builder3, name3, layer2, &files3).await;

        // NOTE(review): same name value as layer 3 — looks like a copy-paste
        // leftover; kept as in the original test.
        let name4 = [0, 0, 0, 0, 1];
        let files4 = new_child_files();
        let mut builder4 = SimpleLayerBuilder::from_parent(name4, layer3.clone(), files4.clone());
        builder4.remove_value_triple(ValueTriple::new_string_value("pig", "says", "oink"));
        builder4.add_value_triple(ValueTriple::new_node("cow", "likes", "horse"));
        let layer4 = commit_as_child(builder4, name4, layer3, &files4).await;

        assert!(layer4.value_triple_exists(&ValueTriple::new_string_value("cow", "says", "moo")));
        assert!(layer4.value_triple_exists(&ValueTriple::new_string_value("duck", "says", "quack")));
        assert!(
            layer4.value_triple_exists(&ValueTriple::new_string_value("horse", "says", "neigh"))
        );
        assert!(layer4.value_triple_exists(&ValueTriple::new_node("horse", "likes", "pig")));
        assert!(layer4.value_triple_exists(&ValueTriple::new_node("cow", "likes", "horse")));
        assert!(!layer4.value_triple_exists(&ValueTriple::new_string_value("pig", "says", "oink")));
        assert!(!layer4.value_triple_exists(&ValueTriple::new_node("horse", "likes", "cow")));
    }

    #[tokio::test]
    async fn remove_and_add_same_triple_on_base_layer_is_noop() {
        let files = new_base_files();
        let name = [0, 0, 0, 0, 0];

        let mut builder = SimpleLayerBuilder::new(name, files.clone());
        builder.remove_value_triple(ValueTriple::new_string_value("crow", "says", "caw"));
        builder.add_value_triple(ValueTriple::new_string_value("crow", "says", "caw"));
        let base_layer = commit_as_base(builder, name, &files).await;

        assert!(
            !base_layer.value_triple_exists(&ValueTriple::new_string_value("crow", "says", "caw"))
        );
    }

    #[tokio::test]
    async fn add_and_remove_same_triple_on_base_layer_is_noop() {
        let files = new_base_files();
        let name = [0, 0, 0, 0, 0];

        let mut builder = SimpleLayerBuilder::new(name, files.clone());
        builder.add_value_triple(ValueTriple::new_string_value("crow", "says", "caw"));
        builder.remove_value_triple(ValueTriple::new_string_value("crow", "says", "caw"));
        let base_layer = commit_as_base(builder, name, &files).await;

        assert!(
            !base_layer.value_triple_exists(&ValueTriple::new_string_value("crow", "says", "caw"))
        );
    }

    #[tokio::test]
    async fn remove_and_add_same_existing_triple_on_child_layer_is_noop() {
        let base_layer = example_base_layer().await;
        let files = new_child_files();
        let name = [0, 0, 0, 0, 0];

        let mut builder = SimpleLayerBuilder::from_parent(name, base_layer.clone(), files.clone());
        builder.remove_value_triple(ValueTriple::new_string_value("cow", "says", "moo"));
        builder.add_value_triple(ValueTriple::new_string_value("cow", "says", "moo"));
        let child_layer = commit_as_child(builder, name, base_layer, &files).await;

        assert!(
            child_layer.value_triple_exists(&ValueTriple::new_string_value("cow", "says", "moo"))
        );
    }

    #[tokio::test]
    async fn add_and_remove_same_existing_triple_on_child_layer_is_noop() {
        let base_layer = example_base_layer().await;
        let files = new_child_files();
        let name = [0, 0, 0, 0, 0];

        let mut builder = SimpleLayerBuilder::from_parent(name, base_layer.clone(), files.clone());
        builder.add_value_triple(ValueTriple::new_string_value("cow", "says", "moo"));
        builder.remove_value_triple(ValueTriple::new_string_value("cow", "says", "moo"));
        let child_layer = commit_as_child(builder, name, base_layer, &files).await;

        assert!(
            child_layer.value_triple_exists(&ValueTriple::new_string_value("cow", "says", "moo"))
        );
    }

    #[tokio::test]
    async fn remove_and_add_same_nonexisting_triple_on_child_layer_is_noop() {
        let base_layer = example_base_layer().await;
        let files = new_child_files();
        let name = [0, 0, 0, 0, 0];

        let mut builder = SimpleLayerBuilder::from_parent(name, base_layer.clone(), files.clone());
        builder.remove_value_triple(ValueTriple::new_string_value("crow", "says", "caw"));
        builder.add_value_triple(ValueTriple::new_string_value("crow", "says", "caw"));
        let child_layer = commit_as_child(builder, name, base_layer, &files).await;

        assert!(
            !child_layer.value_triple_exists(&ValueTriple::new_string_value("crow", "says", "caw"))
        );
    }

    #[tokio::test]
    async fn add_and_remove_same_nonexisting_triple_on_child_layer_is_noop() {
        let base_layer = example_base_layer().await;
        let files = new_child_files();
        let name = [0, 0, 0, 0, 0];

        let mut builder = SimpleLayerBuilder::from_parent(name, base_layer.clone(), files.clone());
        builder.add_value_triple(ValueTriple::new_string_value("crow", "says", "caw"));
        builder.remove_value_triple(ValueTriple::new_string_value("crow", "says", "caw"));
        let child_layer = commit_as_child(builder, name, base_layer, &files).await;

        assert!(
            !child_layer.value_triple_exists(&ValueTriple::new_string_value("crow", "says", "caw"))
        );
    }

    #[tokio::test]
    async fn remove_and_add_same_triple_by_id_and_string_on_child_layer_is_noop() {
        let base_layer = example_base_layer().await;
        let files = new_child_files();
        let name = [0, 0, 0, 0, 0];
        let id_triple = cow_says_moo_ids(&base_layer);

        let mut builder = SimpleLayerBuilder::from_parent(name, base_layer.clone(), files.clone());
        builder.remove_value_triple(ValueTriple::new_string_value("cow", "says", "moo"));
        builder.add_id_triple(id_triple);
        let child_layer = commit_as_child(builder, name, base_layer, &files).await;

        assert!(
            child_layer.value_triple_exists(&ValueTriple::new_string_value("cow", "says", "moo"))
        );
    }

    #[tokio::test]
    async fn remove_and_add_same_triple_by_string_and_id_on_child_layer_is_noop() {
        let base_layer = example_base_layer().await;
        let files = new_child_files();
        let name = [0, 0, 0, 0, 0];
        let id_triple = cow_says_moo_ids(&base_layer);

        let mut builder = SimpleLayerBuilder::from_parent(name, base_layer.clone(), files.clone());
        builder.remove_id_triple(id_triple);
        builder.add_value_triple(ValueTriple::new_string_value("cow", "says", "moo"));
        let child_layer = commit_as_child(builder, name, base_layer, &files).await;

        assert!(
            child_layer.value_triple_exists(&ValueTriple::new_string_value("cow", "says", "moo"))
        );
    }

    #[tokio::test]
    async fn add_and_remove_same_triple_by_id_and_string_on_child_layer_is_noop() {
        let base_layer = example_base_layer().await;
        let files = new_child_files();
        let name = [0, 0, 0, 0, 0];
        let id_triple = cow_says_moo_ids(&base_layer);

        let mut builder = SimpleLayerBuilder::from_parent(name, base_layer.clone(), files.clone());
        builder.add_value_triple(ValueTriple::new_string_value("cow", "says", "moo"));
        builder.remove_id_triple(id_triple);
        let child_layer = commit_as_child(builder, name, base_layer, &files).await;

        assert!(
            child_layer.value_triple_exists(&ValueTriple::new_string_value("cow", "says", "moo"))
        );
    }

    #[tokio::test]
    async fn add_and_remove_same_triple_by_string_and_id_on_child_layer_is_noop() {
        let base_layer = example_base_layer().await;
        let files = new_child_files();
        let name = [0, 0, 0, 0, 0];
        let id_triple = cow_says_moo_ids(&base_layer);

        let mut builder = SimpleLayerBuilder::from_parent(name, base_layer.clone(), files.clone());
        builder.add_id_triple(id_triple);
        builder.remove_value_triple(ValueTriple::new_string_value("cow", "says", "moo"));
        let child_layer = commit_as_child(builder, name, base_layer, &files).await;

        assert!(
            child_layer.value_triple_exists(&ValueTriple::new_string_value("cow", "says", "moo"))
        );
    }
}