use crate::annis::db::annostorage::AnnoStorage;
use crate::annis::db::graphstorage::adjacencylist::AdjacencyListStorage;
use crate::annis::db::graphstorage::registry;
use crate::annis::db::graphstorage::union::UnionEdgeContainer;
use crate::annis::db::graphstorage::EdgeContainer;
use crate::annis::db::graphstorage::{GraphStorage, WriteableGraphStorage};
use crate::annis::db::update::{GraphUpdate, UpdateEvent};
use crate::annis::dfs::CycleSafeDFS;
use crate::annis::errors::*;
use crate::annis::types::AnnoKey;
use crate::annis::types::{AnnoKeyID, Annotation, Component, ComponentType, Edge, NodeID};
use crate::malloc_size_of::{MallocSizeOf, MallocSizeOfOps};
use bincode;
use rayon::prelude::*;
use rustc_hash::FxHashSet;
use serde;
use std;
use std::collections::BTreeMap;
use std::io::prelude::*;
use std::iter::FromIterator;
use std::ops::Bound::Included;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::string::ToString;
use std::sync::{Arc, Mutex};
use strum::IntoEnumIterator;
use tempfile;
pub mod annostorage;
pub mod aql;
pub mod corpusstorage;
pub mod exec;
pub mod graphstorage;
mod plan;
pub mod query;
pub mod relannis;
pub mod sort_matches;
pub mod token_helper;
pub mod update;
/// Namespace of the built-in annotation keys (`node_name`, `tok`, `node_type`) and layer of
/// the built-in components created by `Graph::with_default_graphstorages`.
pub const ANNIS_NS: &str = "annis";
/// Name of the annotation (in `ANNIS_NS`) holding a node's name; nodes are looked up by it.
pub const NODE_NAME: &str = "node_name";
/// Name of the annotation (in `ANNIS_NS`) whose presence is checked to decide whether a node
/// is a token.
pub const TOK: &str = "tok";
/// Name of the annotation (in `ANNIS_NS`) holding the node type given when a node is added.
pub const NODE_TYPE: &str = "node_type";
/// A single search result: a node together with the ID of the annotation key that matched
/// (see `Match::extract_annotation`).
///
/// The derived ordering compares by `node` first and then by `anno_key`.
/// NOTE(review): `#[repr(C)]` presumably exists for FFI consumers — confirm before changing
/// the field layout.
#[derive(Debug, Default, Clone, Eq, PartialEq, PartialOrd, Ord, Hash)]
#[repr(C)]
pub struct Match {
    /// ID of the matched node.
    node: NodeID,
    /// ID of the matched annotation key in the node annotation storage.
    anno_key: AnnoKeyID,
}
impl Match {
    /// Returns the ID of the matched node.
    pub fn get_node(&self) -> NodeID {
        self.node
    }

    /// Resolves the matched annotation (key and value) in the node annotations of `graph`.
    ///
    /// Returns `None` if the node has no value for the matched key, or if the key ID is
    /// unknown to the annotation storage.
    pub fn extract_annotation(&self, graph: &Graph) -> Option<Annotation> {
        let val = graph
            .node_annos
            .get_value_for_item_by_id(&self.node, self.anno_key)?
            .to_owned();
        let key = graph.node_annos.get_key_value(self.anno_key)?;
        Some(Annotation { key, val })
    }

    /// Returns `true` if no element of `other` refers to the same node *and* the same
    /// annotation key as `self` (also `true` for an empty slice).
    ///
    /// Accepts any slice; callers passing `&Vec<Match>` still work via deref coercion.
    pub fn different_to_all(&self, other: &[Match]) -> bool {
        other.iter().all(|o| self.different_to(o))
    }

    /// Returns `true` if `other` differs from `self` in the node or in the annotation key.
    pub fn different_to(&self, other: &Match) -> bool {
        self.node != other.node || self.anno_key != other.anno_key
    }
}
/// Builds a match for the *source* node of the edge together with the given annotation key.
///
/// Implemented as `From` so that the standard blanket impl also provides `Into<Match>`.
impl From<(Edge, AnnoKeyID)> for Match {
    fn from((edge, anno_key): (Edge, AnnoKeyID)) -> Match {
        Match {
            node: edge.source,
            anno_key,
        }
    }
}

/// Builds a match from a node ID and an annotation key ID.
///
/// Implemented as `From` so that the standard blanket impl also provides `Into<Match>`.
impl From<(NodeID, AnnoKeyID)> for Match {
    fn from((node, anno_key): (NodeID, AnnoKeyID)) -> Match {
        Match { node, anno_key }
    }
}
/// Constraint on the value of an annotation in a search.
#[derive(Clone)]
pub enum ValueSearch<T> {
    /// No constraint on the value (what `None` converts into).
    Any,
    /// The value must match the given one (what `Some(v)` converts into).
    Some(T),
    /// NOTE(review): presumably "value must not match the given one" — confirm against the
    /// annotation storage implementation.
    NotSome(T),
}

impl<T> From<Option<T>> for ValueSearch<T> {
    /// Maps `None` to `ValueSearch::Any` and `Some(v)` to `ValueSearch::Some(v)`.
    fn from(orig: Option<T>) -> ValueSearch<T> {
        orig.map_or(ValueSearch::Any, ValueSearch::Some)
    }
}
pub trait AnnotationStorage<T> {
fn get_annotations_for_item(&self, item: &T) -> Vec<Annotation>;
fn number_of_annotations(&self) -> usize;
fn number_of_annotations_by_name(&self, ns: Option<String>, name: String) -> usize;
fn exact_anno_search<'a>(
&'a self,
namespace: Option<String>,
name: String,
value: ValueSearch<String>,
) -> Box<Iterator<Item = Match> + 'a>;
fn regex_anno_search<'a>(
&'a self,
namespace: Option<String>,
name: String,
pattern: &str,
negated: bool,
) -> Box<Iterator<Item = Match> + 'a>;
fn guess_max_count(
&self,
ns: Option<String>,
name: String,
lower_val: &str,
upper_val: &str,
) -> usize;
fn guess_max_count_regex(&self, ns: Option<String>, name: String, pattern: &str) -> usize;
fn get_all_values(&self, key: &AnnoKey, most_frequent_first: bool) -> Vec<&str>;
fn annotation_keys(&self) -> Vec<AnnoKey>;
}
/// An annotation graph: node annotations plus a set of edge components.
///
/// The graph may be backed by a directory on disk (`location`); components found there are
/// registered first and only loaded into memory on demand.
pub struct Graph {
    /// Annotations of all nodes. Modified via `Arc::make_mut`, which clones the storage first
    /// if it is shared.
    node_annos: Arc<AnnoStorage<NodeID>>,
    /// Corpus directory containing the `current` (and possibly `backup`) sub-directory, or
    /// `None` for a graph that is not persisted.
    location: Option<PathBuf>,
    /// All registered components. `None` means the component was found on disk but its
    /// storage has not been loaded into memory yet.
    components: BTreeMap<Component, Option<Arc<GraphStorage>>>,
    /// ID of the last change applied by `apply_update_in_memory`.
    current_change_id: u64,
    /// Held while `background_sync_wal_updates` rewrites the on-disk data.
    background_persistance: Arc<Mutex<()>>,
    /// Cached result of `size_of_cached`; cleared by `reset_cached_size`.
    cached_size: Mutex<Option<usize>>,
}
impl MallocSizeOf for Graph {
    /// Estimates the heap size of the graph.
    ///
    /// The estimate is the size of the node annotations plus, for every registered component:
    /// the size of its key, the size of its storage (only if loaded), and a fixed
    /// `size_of::<usize>()`.
    fn size_of(&self, ops: &mut MallocSizeOfOps) -> usize {
        let slot_size = std::mem::size_of::<usize>();
        let mut total = self.node_annos.size_of(ops);
        for component in self.components.keys() {
            total += component.size_of(ops);
            total += match self.get_graphstorage_as_ref(component) {
                Some(storage) => storage.size_of(ops) + slot_size,
                None => slot_size,
            };
        }
        total
    }
}
fn load_component_from_disk(component_path: Option<PathBuf>) -> Result<Arc<GraphStorage>> {
let cpath = r#try!(component_path.ok_or("Can't load component with empty path"));
let impl_path = PathBuf::from(&cpath).join("impl.cfg");
let mut f_impl = std::fs::File::open(impl_path)?;
let mut impl_name = String::new();
f_impl.read_to_string(&mut impl_name)?;
let data_path = PathBuf::from(&cpath).join("component.bin");
let f_data = std::fs::File::open(data_path)?;
let mut buf_reader = std::io::BufReader::new(f_data);
let gs = registry::deserialize(&impl_name, &mut buf_reader)?;
Ok(gs)
}
/// Relative directory of component `c`: `gs/<type>/<layer>/<name>`.
///
/// An empty layer is stored as `default_layer`. Each part is appended with
/// `PathBuf::push`, exactly as `collect` into a `PathBuf` does.
fn component_to_relative_path(c: &Component) -> PathBuf {
    let ctype_dir = c.ctype.to_string();
    let layer_dir: &str = if c.layer.is_empty() {
        "default_layer"
    } else {
        c.layer.as_str()
    };
    ["gs", ctype_dir.as_str(), layer_dir, c.name.as_str()]
        .iter()
        .collect()
}
fn save_bincode<T>(location: &Path, path: &str, object: &T) -> Result<()>
where
T: serde::Serialize,
{
let mut full_path = PathBuf::from(location);
full_path.push(path);
let f = std::fs::File::create(full_path)?;
let mut writer = std::io::BufWriter::new(f);
bincode::serialize_into(&mut writer, object)?;
Ok(())
}
/// Node annotation queries on a graph.
///
/// Every method forwards unchanged to the node annotation storage (`self.node_annos`).
impl AnnotationStorage<NodeID> for Graph {
    fn get_annotations_for_item(&self, item: &NodeID) -> Vec<Annotation> {
        self.node_annos.get_annotations_for_item(item)
    }

    fn number_of_annotations(&self) -> usize {
        self.node_annos.number_of_annotations()
    }

    fn number_of_annotations_by_name(&self, ns: Option<String>, name: String) -> usize {
        self.node_annos.number_of_annotations_by_name(ns, name)
    }

    fn exact_anno_search<'a>(
        &'a self,
        namespace: Option<String>,
        name: String,
        value: ValueSearch<String>,
    ) -> Box<dyn Iterator<Item = Match> + 'a> {
        self.node_annos.exact_anno_search(namespace, name, value)
    }

    fn regex_anno_search<'a>(
        &'a self,
        namespace: Option<String>,
        name: String,
        pattern: &str,
        negated: bool,
    ) -> Box<dyn Iterator<Item = Match> + 'a> {
        self.node_annos
            .regex_anno_search(namespace, name, pattern, negated)
    }

    fn guess_max_count(
        &self,
        ns: Option<String>,
        name: String,
        lower_val: &str,
        upper_val: &str,
    ) -> usize {
        self.node_annos
            .guess_max_count(ns, name, lower_val, upper_val)
    }

    fn guess_max_count_regex(&self, ns: Option<String>, name: String, pattern: &str) -> usize {
        self.node_annos.guess_max_count_regex(ns, name, pattern)
    }

    fn get_all_values(&self, key: &AnnoKey, most_frequent_first: bool) -> Vec<&str> {
        self.node_annos.get_all_values(key, most_frequent_first)
    }

    fn annotation_keys(&self) -> Vec<AnnoKey> {
        self.node_annos.annotation_keys()
    }
}
impl Graph {
/// Creates an empty, non-persisted graph without any components.
fn new() -> Graph {
    let empty_annos = AnnoStorage::<NodeID>::new();
    Graph {
        location: None,
        node_annos: Arc::new(empty_annos),
        components: BTreeMap::new(),
        cached_size: Mutex::new(None),
        background_persistance: Arc::new(Mutex::new(())),
        current_change_id: 0,
    }
}
/// Creates an empty graph with writable, unnamed `annis` components. The components are of
/// type `Coverage`, `Ordering`, `LeftToken`, `RightToken` and `PartOf`, created in that order.
///
/// # Errors
/// Propagates the first error from `get_or_create_writable`.
fn with_default_graphstorages() -> Result<Graph> {
    let mut db = Graph::new();
    let default_types = [
        ComponentType::Coverage,
        ComponentType::Ordering,
        ComponentType::LeftToken,
        ComponentType::RightToken,
        ComponentType::PartOf,
    ];
    for ctype in default_types.iter() {
        db.get_or_create_writable(&Component {
            ctype: ctype.clone(),
            layer: ANNIS_NS.to_owned(),
            name: "".to_owned(),
        })?;
    }
    Ok(db)
}
fn set_location(&mut self, location: &Path) -> Result<()> {
self.location = Some(PathBuf::from(location));
Ok(())
}
fn clear(&mut self) {
self.reset_cached_size();
self.node_annos = Arc::new(AnnoStorage::new());
self.components.clear();
}
/// Loads the graph stored under `location`, replacing the current in-memory content, and
/// makes `location` the graph's location.
///
/// If a `backup` sub-directory exists, it is loaded instead of `current`. The recovered
/// state is then written back to `location/current` and the backup is removed.
///
/// If the loaded directory contains an `update_log.bin`, its changes are re-applied in memory
/// when the log's last consistent change ID is larger than `current_change_id`. Without a log,
/// `current_change_id` is reset to 0.
///
/// `preload` forces all components into memory. They are loaded anyway when a log has to be
/// replayed or a backup was recovered.
///
/// # Errors
/// Propagates I/O, deserialization, update and save errors.
fn load_from(&mut self, location: &Path, preload: bool) -> Result<()> {
    info!("Loading corpus from {}", location.to_string_lossy());
    self.clear();
    let location = PathBuf::from(location);
    self.set_location(location.as_path())?;

    let backup = location.join("backup");
    let backup_was_loaded = backup.is_dir();
    let dir2load = if backup_was_loaded {
        backup.clone()
    } else {
        location.join("current")
    };

    let mut node_annos_tmp: AnnoStorage<NodeID> = AnnoStorage::new();
    node_annos_tmp.load_from_file(&dir2load.join("nodes_v1.bin").to_string_lossy())?;
    self.node_annos = Arc::from(node_annos_tmp);

    let log_path = dir2load.join("update_log.bin");
    let logfile_exists = log_path.is_file();
    self.find_components_from_disk(&dir2load)?;
    // Replaying the log and re-saving a backup both need every component in memory.
    if preload || logfile_exists || backup_was_loaded {
        self.ensure_loaded_all()?;
    }

    if logfile_exists {
        let f_log = std::fs::File::open(log_path)?;
        let mut buf_reader = std::io::BufReader::new(f_log);
        let mut update: GraphUpdate = bincode::deserialize_from(&mut buf_reader)?;
        if update.get_last_consistent_change_id() > self.current_change_id {
            self.apply_update_in_memory(&mut update)?;
        }
    } else {
        self.current_change_id = 0;
    }

    if backup_was_loaded {
        // `save_to` appends "current" itself, so it must receive the corpus root.
        // Passing `location/current` would write to `location/current/current`, and the
        // backup deleted below would never have reached `current`.
        self.save_to(&location)?;
        // Move the backup aside before deleting it. A crash during deletion then cannot
        // leave a partial `backup` directory, which the next load would pick up instead of
        // `current`.
        let tmp_dir = tempfile::Builder::new()
            .prefix("temporary-graphannis-backup")
            .tempdir_in(&location)?;
        // `tempdir_in` creates the (empty) target directory. Renaming a directory onto an
        // existing one fails on some platforms (e.g. Windows), so remove it first.
        std::fs::remove_dir(tmp_dir.path())?;
        std::fs::rename(&backup, tmp_dir.path())?;
        tmp_dir.close()?;
    }
    Ok(())
}
/// Replaces the registered components with all components found below `location/gs`,
/// without loading them (their map value is `None`).
///
/// The directory layout is `gs/<type>/<layer>/[<name>/]`. A component with an empty name
/// is registered if `component.bin` exists directly in the layer directory. A named
/// component is registered if its directory holds both `component.bin` and `impl.cfg`.
///
/// # Errors
/// Propagates errors from reading the directories.
fn find_components_from_disk(&mut self, location: &Path) -> Result<()> {
    self.components.clear();
    for ctype in ComponentType::iter() {
        let ctype_dir = location.join("gs").join(ctype.to_string());
        if !ctype_dir.is_dir() {
            continue;
        }
        for layer_entry in ctype_dir.read_dir()? {
            let layer_entry = layer_entry?;
            let layer_path = layer_entry.path();
            if !layer_path.is_dir() {
                continue;
            }
            let layer_name = layer_entry.file_name().to_string_lossy().to_string();

            let unnamed = Component {
                ctype: ctype.clone(),
                layer: layer_name.clone(),
                name: String::from(""),
            };
            let unnamed_data = location
                .join(component_to_relative_path(&unnamed))
                .join("component.bin");
            if unnamed_data.is_file() {
                self.components.insert(unnamed.clone(), None);
                debug!("Registered component {}", unnamed);
            }

            for name_entry in layer_path.read_dir()? {
                let name_entry = name_entry?;
                let named = Component {
                    ctype: ctype.clone(),
                    layer: layer_name.clone(),
                    name: name_entry.file_name().to_string_lossy().to_string(),
                };
                let component_dir = location.join(component_to_relative_path(&named));
                let has_data = component_dir.join("component.bin").is_file();
                if has_data && component_dir.join("impl.cfg").is_file() {
                    self.components.insert(named.clone(), None);
                    debug!("Registered component {}", named);
                }
            }
        }
    }
    Ok(())
}
/// Writes the node annotations and all *loaded* components into the directory `location`.
///
/// `location` is created if missing. Components whose map value is `None` (registered but
/// not loaded) are skipped.
///
/// # Errors
/// Propagates I/O and serialization errors, including errors when flushing buffered output.
fn internal_save(&self, location: &Path) -> Result<()> {
    let location = PathBuf::from(location);
    std::fs::create_dir_all(&location)?;
    save_bincode(&location, "nodes_v1.bin", self.node_annos.as_ref())?;
    for (c, e) in &self.components {
        if let Some(ref data) = *e {
            let dir = location.join(component_to_relative_path(c));
            std::fs::create_dir_all(&dir)?;
            let data_path = dir.join("component.bin");
            let f_data = std::fs::File::create(&data_path)?;
            let mut writer = std::io::BufWriter::new(f_data);
            let impl_name = registry::serialize(&data, &mut writer)?;
            // Dropping a `BufWriter` discards flush errors; flush explicitly so a failed
            // write is reported instead of leaving a truncated `component.bin`.
            writer.flush()?;
            let cfg_path = dir.join("impl.cfg");
            let mut f_cfg = std::fs::File::create(cfg_path)?;
            f_cfg.write_all(impl_name.as_bytes())?;
        }
    }
    Ok(())
}
/// Loads every component and writes the whole graph to `location/current`.
///
/// The graph's own location is not changed.
fn save_to(&mut self, location: &Path) -> Result<()> {
    // Unloaded components would otherwise be skipped by `internal_save`.
    self.ensure_loaded_all()?;
    let target_dir = location.join("current");
    self.internal_save(&target_dir)
}
/// Makes `location` the graph's location and writes the graph to `location/current`.
///
/// NOTE(review): unlike `save_to`, components are not loaded first. Components that are
/// registered but not in memory are therefore not written — confirm this is intended.
fn persist_to(&mut self, location: &Path) -> Result<()> {
    self.set_location(location)?;
    let target_dir = location.join("current");
    self.internal_save(&target_dir)
}
/// Applies all consistent changes of `u` to the in-memory graph without persisting anything.
///
/// While applying the events, it collects every node whose text coverage may have changed.
/// Afterwards it recomputes the derived components for these nodes: `LeftToken`,
/// `RightToken` and the `inherited-coverage` coverage component, all in the `annis` layer.
/// This only happens if the `annis` ordering component is registered and loaded.
///
/// `current_change_id` is set to each event's ID after the event has been applied.
///
/// Events referring to unknown nodes or to an unparsable component type are ignored.
///
/// # Errors
/// Fails if a component needed for an edge event cannot be made writable, or if the
/// re-indexing fails. Events applied before the failure stay applied.
fn apply_update_in_memory(&mut self, u: &mut GraphUpdate) -> Result<()> {
    self.reset_cached_size();
    // Nodes whose token alignment / inherited coverage must be recalculated at the end.
    let mut invalid_nodes: FxHashSet<NodeID> = FxHashSet::default();
    // Snapshot of the components registered before this update (used by DeleteNode).
    let all_components = self.get_all_components(None, None);
    // Components whose edges contribute to text coverage: unnamed dominance components and
    // all coverage components. Newly created ones are added while applying AddEdge events.
    let mut text_coverage_components = FxHashSet::default();
    text_coverage_components
        .extend(self.get_all_components(Some(ComponentType::Dominance), Some("")));
    text_coverage_components
        .extend(self.get_all_components(Some(ComponentType::Coverage), None));
    for (id, change) in u.consistent_changes() {
        trace!("applying event {:?}", &change);
        match change {
            UpdateEvent::AddNode {
                node_name,
                node_type,
            } => {
                // Nodes are identified by name; adding an existing name is a no-op.
                let existing_node_id = self.get_node_id_from_name(&node_name);
                if existing_node_id.is_none() {
                    // New IDs are allocated as "largest existing item ID + 1" (or 0).
                    let new_node_id: NodeID =
                        if let Some(id) = self.node_annos.get_largest_item() {
                            id + 1
                        } else {
                            0
                        };
                    let new_anno_name = Annotation {
                        key: self.get_node_name_key(),
                        val: node_name,
                    };
                    let new_anno_type = Annotation {
                        key: self.get_node_type_key(),
                        val: node_type,
                    };
                    let node_annos = Arc::make_mut(&mut self.node_annos);
                    node_annos.insert(new_node_id, new_anno_name);
                    node_annos.insert(new_node_id, new_anno_type);
                }
            }
            UpdateEvent::DeleteNode { node_name } => {
                if let Some(existing_node_id) = self.get_node_id_from_name(&node_name) {
                    // Invalidate the ancestors while the node's edges still exist, so the
                    // inverse traversal can reach them.
                    if !invalid_nodes.contains(&existing_node_id) {
                        self.extend_parent_text_coverage_nodes(
                            existing_node_id,
                            &text_coverage_components,
                            &mut invalid_nodes,
                        );
                    }
                    {
                        let node_annos = Arc::make_mut(&mut self.node_annos);
                        for a in node_annos.get_annotations_for_item(&existing_node_id) {
                            node_annos.remove_annotation_for_item(&existing_node_id, &a.key);
                        }
                    }
                    // Components that cannot be made writable are silently skipped here.
                    for c in all_components.iter() {
                        if let Ok(gs) = self.get_or_create_writable(c) {
                            gs.delete_node(&existing_node_id);
                        }
                    }
                }
            }
            UpdateEvent::AddNodeLabel {
                node_name,
                anno_ns,
                anno_name,
                anno_value,
            } => {
                if let Some(existing_node_id) = self.get_node_id_from_name(&node_name) {
                    let anno = Annotation {
                        key: AnnoKey {
                            ns: anno_ns,
                            name: anno_name,
                        },
                        val: anno_value,
                    };
                    Arc::make_mut(&mut self.node_annos).insert(existing_node_id, anno);
                }
            }
            UpdateEvent::DeleteNodeLabel {
                node_name,
                anno_ns,
                anno_name,
            } => {
                if let Some(existing_node_id) = self.get_node_id_from_name(&node_name) {
                    let key = AnnoKey {
                        ns: anno_ns,
                        name: anno_name,
                    };
                    Arc::make_mut(&mut self.node_annos)
                        .remove_annotation_for_item(&existing_node_id, &key);
                }
            }
            UpdateEvent::AddEdge {
                source_node,
                target_node,
                layer,
                component_type,
                component_name,
            } => {
                if let (Some(source), Some(target)) = (
                    self.get_node_id_from_name(&source_node),
                    self.get_node_id_from_name(&target_node),
                ) {
                    if let Ok(ctype) = ComponentType::from_str(&component_type) {
                        let c = Component {
                            ctype,
                            layer,
                            name: component_name,
                        };
                        let gs = self.get_or_create_writable(&c)?;
                        gs.add_edge(Edge { source, target });
                        // Track unnamed dominance/coverage components created by this update.
                        if (c.ctype == ComponentType::Dominance
                            || c.ctype == ComponentType::Coverage)
                            && c.name.is_empty()
                        {
                            text_coverage_components.insert(c.clone());
                        }
                        // Edges of these types can change which tokens a node is aligned with.
                        if c.ctype == ComponentType::Coverage
                            || c.ctype == ComponentType::Dominance
                            || c.ctype == ComponentType::Ordering
                            || c.ctype == ComponentType::LeftToken
                            || c.ctype == ComponentType::RightToken
                        {
                            self.extend_parent_text_coverage_nodes(
                                source,
                                &text_coverage_components,
                                &mut invalid_nodes,
                            );
                        }
                        // An ordering edge also affects the target token's ancestors.
                        if c.ctype == ComponentType::Ordering {
                            self.extend_parent_text_coverage_nodes(
                                target,
                                &text_coverage_components,
                                &mut invalid_nodes,
                            );
                        }
                    }
                }
            }
            UpdateEvent::DeleteEdge {
                source_node,
                target_node,
                layer,
                component_type,
                component_name,
            } => {
                if let (Some(source), Some(target)) = (
                    self.get_node_id_from_name(&source_node),
                    self.get_node_id_from_name(&target_node),
                ) {
                    if let Ok(ctype) = ComponentType::from_str(&component_type) {
                        let c = Component {
                            ctype,
                            layer,
                            name: component_name,
                        };
                        // Invalidate before deleting, so the traversal still sees the edge.
                        if c.ctype == ComponentType::Coverage
                            || c.ctype == ComponentType::Dominance
                            || c.ctype == ComponentType::Ordering
                            || c.ctype == ComponentType::LeftToken
                            || c.ctype == ComponentType::RightToken
                        {
                            self.extend_parent_text_coverage_nodes(
                                source,
                                &text_coverage_components,
                                &mut invalid_nodes,
                            );
                        }
                        if c.ctype == ComponentType::Ordering {
                            self.extend_parent_text_coverage_nodes(
                                target,
                                &text_coverage_components,
                                &mut invalid_nodes,
                            );
                        }
                        let gs = self.get_or_create_writable(&c)?;
                        gs.delete_edge(&Edge { source, target });
                    }
                }
            }
            UpdateEvent::AddEdgeLabel {
                source_node,
                target_node,
                layer,
                component_type,
                component_name,
                anno_ns,
                anno_name,
                anno_value,
            } => {
                if let (Some(source), Some(target)) = (
                    self.get_node_id_from_name(&source_node),
                    self.get_node_id_from_name(&target_node),
                ) {
                    if let Ok(ctype) = ComponentType::from_str(&component_type) {
                        let c = Component {
                            ctype,
                            layer,
                            name: component_name,
                        };
                        let gs = self.get_or_create_writable(&c)?;
                        let e = Edge { source, target };
                        // Only annotate an existing direct edge (distance exactly 1).
                        if gs.is_connected(&source, &target, 1, Included(1)) {
                            let anno = Annotation {
                                key: AnnoKey {
                                    ns: anno_ns,
                                    name: anno_name,
                                },
                                val: anno_value,
                            };
                            gs.add_edge_annotation(e, anno);
                        }
                    }
                }
            }
            UpdateEvent::DeleteEdgeLabel {
                source_node,
                target_node,
                layer,
                component_type,
                component_name,
                anno_ns,
                anno_name,
            } => {
                if let (Some(source), Some(target)) = (
                    self.get_node_id_from_name(&source_node),
                    self.get_node_id_from_name(&target_node),
                ) {
                    if let Ok(ctype) = ComponentType::from_str(&component_type) {
                        let c = Component {
                            ctype,
                            layer,
                            name: component_name,
                        };
                        let gs = self.get_or_create_writable(&c)?;
                        let e = Edge { source, target };
                        if gs.is_connected(&source, &target, 1, Included(1)) {
                            let key = AnnoKey {
                                ns: anno_ns,
                                name: anno_name,
                            };
                            gs.delete_edge_annotation(&e, &key);
                        }
                    }
                }
            }
        }
        self.current_change_id = id;
    }
    // Recompute derived components; skipped if the `annis` ordering component is not loaded.
    if let Some(gs_order) = self.get_graphstorage(&Component {
        ctype: ComponentType::Ordering,
        layer: ANNIS_NS.to_owned(),
        name: "".to_owned(),
    }) {
        self.reindex_inherited_coverage(invalid_nodes, gs_order)?;
    }
    Ok(())
}
fn extend_parent_text_coverage_nodes(
&self,
node: NodeID,
text_coverage_components: &FxHashSet<Component>,
invalid_nodes: &mut FxHashSet<NodeID>,
) {
let containers: Vec<&EdgeContainer> = text_coverage_components
.iter()
.filter_map(|c| self.get_graphstorage_as_ref(c))
.map(|gs| gs.as_edgecontainer())
.collect();
let union = UnionEdgeContainer::new(containers);
let dfs = CycleSafeDFS::new_inverse(&union, node, 0, usize::max_value());
for step in dfs {
invalid_nodes.insert(step.node);
}
}
fn reindex_inherited_coverage(
&mut self,
invalid_nodes: FxHashSet<NodeID>,
gs_order: Arc<GraphStorage>,
) -> Result<()> {
{
let gs_left = self.get_or_create_writable(&Component {
ctype: ComponentType::LeftToken,
name: "".to_owned(),
layer: ANNIS_NS.to_owned(),
})?;
for n in invalid_nodes.iter() {
gs_left.delete_node(n);
}
let gs_right = self.get_or_create_writable(&Component {
ctype: ComponentType::RightToken,
name: "".to_owned(),
layer: ANNIS_NS.to_owned(),
})?;
for n in invalid_nodes.iter() {
gs_right.delete_node(n);
}
let gs_cov = self.get_or_create_writable(&Component {
ctype: ComponentType::Coverage,
name: "inherited-coverage".to_owned(),
layer: ANNIS_NS.to_owned(),
})?;
for n in invalid_nodes.iter() {
gs_cov.delete_node(n);
}
}
let all_cov_components = self.get_all_components(Some(ComponentType::Coverage), None);
let all_dom_gs: Vec<Arc<GraphStorage>> = self
.get_all_components(Some(ComponentType::Dominance), Some(""))
.into_iter()
.filter_map(|c| self.get_graphstorage(&c))
.collect();
{
let all_cov_gs: Vec<Arc<GraphStorage>> = all_cov_components
.iter()
.filter_map(|c| self.get_graphstorage(c))
.collect();
for n in invalid_nodes.iter() {
self.calculate_token_alignment(
*n,
ComponentType::LeftToken,
gs_order.as_ref(),
&all_cov_gs,
&all_dom_gs,
);
self.calculate_token_alignment(
*n,
ComponentType::RightToken,
gs_order.as_ref(),
&all_cov_gs,
&all_dom_gs,
);
}
}
for n in invalid_nodes.iter() {
self.calculate_inherited_coverage_edges(*n, &all_cov_components, &all_dom_gs);
}
Ok(())
}
/// Determines the tokens covered by node `n` and records them as `annis`
/// `inherited-coverage` edges `n -> token`.
///
/// Resolution order:
/// 1. Tokens reached by a direct edge in any of `all_cov_components`.
/// 2. Otherwise, `n` itself if it has an `annis::tok` annotation.
/// 3. Otherwise, the recursively computed tokens of its dominance children. Recursion also
///    writes edges for the children.
///
/// If the inherited-coverage component cannot be made writable, no edges are written. The
/// covered tokens are returned either way.
fn calculate_inherited_coverage_edges(
    &mut self,
    n: NodeID,
    all_cov_components: &Vec<Component>,
    all_dom_gs: &Vec<Arc<GraphStorage>>,
) -> FxHashSet<NodeID> {
    let mut covered_token = FxHashSet::default();
    for component in all_cov_components.iter() {
        if let Some(gs) = self.get_graphstorage_as_ref(component) {
            covered_token.extend(gs.find_connected(n, 1, std::ops::Bound::Included(1)));
        }
    }

    if covered_token.is_empty() {
        let is_token = self
            .node_annos
            .get_value_for_item(&n, &self.get_token_key())
            .is_some();
        if is_token {
            covered_token.insert(n);
        } else {
            for dom_gs in all_dom_gs {
                for child in dom_gs.get_outgoing_edges(n) {
                    let child_tokens =
                        self.calculate_inherited_coverage_edges(child, all_cov_components, all_dom_gs);
                    covered_token.extend(child_tokens);
                }
            }
        }
    }

    let inherited_cov = Component {
        ctype: ComponentType::Coverage,
        name: "inherited-coverage".to_owned(),
        layer: ANNIS_NS.to_owned(),
    };
    if let Ok(gs_cov) = self.get_or_create_writable(&inherited_cov) {
        for &t in covered_token.iter() {
            gs_cov.add_edge(Edge {
                source: n,
                target: t,
            });
        }
    }
    covered_token
}
/// Finds the left-most or right-most token aligned with node `n` and caches it as an edge
/// `n -> token`.
///
/// `ctype` selects the alignment (`RightToken` picks the last candidate, anything else the
/// first). The edge is stored in the unnamed `annis` component of that type.
///
/// Resolution order:
/// 1. `n` itself, if it has an `annis::tok` annotation and no outgoing coverage edge.
/// 2. An already existing edge of `n` in the alignment component.
/// 3. The alignments of all dominance and coverage children of `n`, ordered via `gs_order`.
///
/// Returns `None` if any of these holds:
/// - the alignment component does not exist;
/// - any child has no alignment (the `?` on the recursive call aborts everything);
/// - `n` has no children;
/// - the alignment component cannot be made writable.
fn calculate_token_alignment(
    &mut self,
    n: NodeID,
    ctype: ComponentType,
    gs_order: &GraphStorage,
    all_cov_gs: &Vec<Arc<GraphStorage>>,
    all_dom_gs: &Vec<Arc<GraphStorage>>,
) -> Option<NodeID> {
    let alignment_component = Component {
        ctype: ctype.clone(),
        name: "".to_owned(),
        layer: ANNIS_NS.to_owned(),
    };
    // A `tok`-annotated node without outgoing coverage edges is aligned with itself.
    if self
        .node_annos
        .get_value_for_item(&n, &self.get_token_key())
        .is_some()
    {
        let mut is_token = true;
        for gs_coverage in all_cov_gs.iter() {
            if gs_coverage.get_outgoing_edges(n).next().is_some() {
                is_token = false;
                break;
            }
        }
        if is_token {
            return Some(n);
        }
    }
    // Reuse an edge that already exists in the alignment component; return `None` if that
    // component does not exist at all.
    let existing = self
        .get_graphstorage_as_ref(&alignment_component)?
        .get_outgoing_edges(n)
        .next();
    if let Some(existing) = existing {
        return Some(existing);
    }
    // Collect the alignment of every dominance and coverage child. If any child has no
    // alignment, the `?` makes this whole call return `None`.
    let mut candidates = FxHashSet::default();
    for gs_for_component in all_dom_gs.iter().chain(all_cov_gs.iter()) {
        for target in gs_for_component.get_outgoing_edges(n) {
            let candidate_for_target = self.calculate_token_alignment(
                target,
                ctype.clone(),
                gs_order,
                all_cov_gs,
                all_dom_gs,
            )?;
            candidates.insert(candidate_for_target);
        }
    }
    // Order candidates by position: `a < b` if `b` is reachable from `a` in `gs_order`.
    // NOTE(review): candidates not connected in either direction compare as Equal, so this
    // is not necessarily a total order. The chosen first/last element is then unspecified.
    let mut candidates = Vec::from_iter(candidates.into_iter());
    candidates.sort_unstable_by(move |a, b| {
        if a == b {
            return std::cmp::Ordering::Equal;
        }
        if gs_order.is_connected(&a, &b, 1, std::ops::Bound::Unbounded) {
            return std::cmp::Ordering::Less;
        } else if gs_order.is_connected(&b, &a, 1, std::ops::Bound::Unbounded) {
            return std::cmp::Ordering::Greater;
        }
        return std::cmp::Ordering::Equal;
    });
    let t = if ctype == ComponentType::RightToken {
        candidates.last()
    } else {
        candidates.first()
    };
    if let Some(t) = t {
        // Cache the result so later calls take the "existing edge" shortcut above.
        let gs = self.get_or_create_writable(&alignment_component).ok()?;
        let e = Edge {
            source: n,
            target: *t,
        };
        gs.add_edge(e);
        return Some(*t);
    } else {
        return None;
    }
}
fn apply_update(&mut self, u: &mut GraphUpdate) -> Result<()> {
trace!("applying updates");
if !u.is_consistent() {
u.finish();
}
self.ensure_loaded_all()?;
let result = self.apply_update_in_memory(u);
trace!("memory updates completed");
if let Some(location) = self.location.clone() {
trace!("output location for persisting updates is {:?}", location);
if result.is_ok() {
let current_path = location.join("current");
std::fs::create_dir_all(¤t_path)?;
let log_path = current_path.join("update_log.bin");
trace!("writing WAL update log to {:?}", &log_path);
let f_log = std::fs::File::create(log_path)?;
let mut buf_writer = std::io::BufWriter::new(f_log);
bincode::serialize_into(&mut buf_writer, &u)?;
trace!("finished writing WAL update log");
} else {
trace!("error occured while applying updates: {:?}", &result);
self.load_from(&location, true)?;
return result;
}
}
Ok(())
}
/// Rewrites `location/current` from the in-memory graph, keeping the previous on-disk state
/// as `location/backup` until the new one is complete. Does nothing without a location.
///
/// If `backup` already exists, it is kept and `current` is not moved. `load_from` prefers a
/// `backup` directory, so an interrupted sync falls back to it.
///
/// NOTE(review): `internal_save` skips components that are not loaded. Any such component is
/// lost when `backup` is removed. Callers presumably ensure everything is loaded (as
/// `apply_update` does) — confirm.
///
/// # Errors
/// Propagates I/O errors from renaming, saving and removing directories.
///
/// # Panics
/// Panics if the persistence mutex is poisoned.
fn background_sync_wal_updates(&self) -> Result<()> {
    if let Some(ref location) = self.location {
        // Only one background persistence may run at a time.
        let _lock = self.background_persistance.lock().unwrap();
        let current = location.join("current");
        let backup = location.join("backup");
        // Join the relative name "backup" only once. Joining `location` again would yield
        // `location/location/backup` for a relative location.
        if !backup.exists() {
            std::fs::rename(&current, &backup)?;
        }
        self.internal_save(&current)?;
        std::fs::remove_dir_all(&backup)?;
    }
    Ok(())
}
/// Directory of component `c` below `location/current`, or `None` if the graph has no
/// location.
fn component_path(&self, c: &Component) -> Option<PathBuf> {
    self.location
        .as_ref()
        .map(|loc| loc.join("current").join(component_to_relative_path(c)))
}
fn insert_or_copy_writeable(&mut self, c: &Component) -> Result<()> {
self.reset_cached_size();
let entry = self.components.remove(c);
if entry.is_some() {
let gs_opt = entry.unwrap();
let mut loaded_comp: Arc<GraphStorage> = if gs_opt.is_none() {
load_component_from_disk(self.component_path(c))?
} else {
gs_opt.unwrap()
};
let is_writable = {
Arc::get_mut(&mut loaded_comp)
.ok_or(format!(
"Could not get mutable reference for component {}",
c
))?
.as_writeable()
.is_some()
};
let loaded_comp = if is_writable {
loaded_comp
} else {
let mut gs_copy: AdjacencyListStorage = registry::create_writeable();
gs_copy.copy(&self, loaded_comp.as_ref());
Arc::from(gs_copy)
};
self.components.insert(c.clone(), Some(loaded_comp));
}
Ok(())
}
/// Recalculates the statistics of component `c`, if it is loaded and writable.
///
/// The entry is always put back into the map.
///
/// # Errors
/// Fails if `c` is not registered, or if its storage is shared (another `Arc` is alive).
fn calculate_component_statistics(&mut self, c: &Component) -> Result<()> {
    self.reset_cached_size();
    let mut entry = self
        .components
        .remove(c)
        .ok_or_else(|| format!("Component {} is missing", c))?;
    let result: Result<()> = match entry {
        Some(ref mut gs) => match Arc::get_mut(gs) {
            Some(gs_mut) => {
                if let Some(writeable_gs) = gs_mut.as_writeable() {
                    writeable_gs.calculate_statistics();
                }
                Ok(())
            }
            None => Err(format!("Component {} is currently used", c).into()),
        },
        // Unloaded components are left untouched.
        None => Ok(()),
    };
    self.components.insert(c.clone(), entry);
    result
}
fn get_or_create_writable(&mut self, c: &Component) -> Result<&mut WriteableGraphStorage> {
self.reset_cached_size();
if self.components.contains_key(c) {
self.insert_or_copy_writeable(c)?;
} else {
let w = registry::create_writeable();
self.components.insert(c.clone(), Some(Arc::from(w)));
}
let entry: &mut Arc<GraphStorage> = self
.components
.get_mut(c)
.ok_or(format!(
"Could not get mutable reference for component {}",
c
))?
.as_mut()
.ok_or(format!(
"Could not get mutable reference to optional value for component {}",
c
))?;
let gs_mut_ref: &mut GraphStorage = Arc::get_mut(entry).ok_or(format!(
"Could not get mutable reference for component {}",
c
))?;
Ok(gs_mut_ref.as_writeable().ok_or("Invalid type")?)
}
/// Returns `true` if `c` is registered and its storage is currently held in memory.
fn is_loaded(&self, c: &Component) -> bool {
    self.components
        .get(c)
        .map_or(false, |gs_opt| gs_opt.is_some())
}
fn ensure_loaded_all(&mut self) -> Result<()> {
let mut components_to_load: Vec<Component> = Vec::with_capacity(self.components.len());
for (c, gs) in &self.components {
if gs.is_none() {
components_to_load.push(c.clone());
}
}
self.reset_cached_size();
let loaded_components: Vec<(Component, Result<Arc<GraphStorage>>)> = components_to_load
.into_par_iter()
.map(|c| {
info!("Loading component {} from disk", c);
let cpath = self.component_path(&c);
let loaded_component = load_component_from_disk(cpath);
(c, loaded_component)
})
.collect();
for (c, gs) in loaded_components {
let gs = gs?;
self.components.insert(c, Some(gs));
}
Ok(())
}
/// Loads component `c` from disk if it is registered but not yet in memory.
///
/// Components that are already loaded or not registered are left as they are.
///
/// # Errors
/// Propagates load errors. The component then stays registered (unloaded), instead of being
/// dropped from the map as the previous remove-and-reinsert approach did.
fn ensure_loaded(&mut self, c: &Component) -> Result<()> {
    let needs_loading = match self.components.get(c) {
        Some(None) => true,
        _ => false,
    };
    if needs_loading {
        self.reset_cached_size();
        info!("Loading component {} from disk", c);
        let loaded = load_component_from_disk(self.component_path(c))?;
        self.components.insert(c.clone(), Some(loaded));
    }
    Ok(())
}
/// Replaces the storage of component `c` with the implementation that
/// `registry::get_optimal_impl_heuristic` picks from its statistics, if that implementation
/// differs.
///
/// Does nothing if `c` is not loaded, has no statistics, or the new storage cannot be
/// borrowed mutably for the copy.
fn optimize_impl(&mut self, c: &Component) {
    let gs = match self.get_graphstorage(c) {
        Some(gs) => gs,
        None => return,
    };
    let stats = match gs.get_statistics() {
        Some(stats) => stats,
        None => return,
    };
    let opt_info = registry::get_optimal_impl_heuristic(self, stats);
    if opt_info.id == gs.serialization_id() {
        return;
    }
    let mut new_gs = registry::create_from_info(&opt_info);
    let converted = match Arc::get_mut(&mut new_gs) {
        Some(new_gs_mut) => {
            new_gs_mut.copy(self, gs.as_ref());
            true
        }
        None => false,
    };
    if !converted {
        return;
    }
    self.reset_cached_size();
    info!(
        "Converted component {} to implementation {}",
        c, opt_info.id,
    );
    self.components.insert(c.clone(), Some(new_gs.clone()));
}
fn get_node_id_from_name(&self, node_name: &str) -> Option<NodeID> {
let mut all_nodes_with_anno = self.node_annos.exact_anno_search(
Some(ANNIS_NS.to_owned()),
NODE_NAME.to_owned(),
Some(node_name.to_owned()).into(),
);
if let Some(m) = all_nodes_with_anno.next() {
return Some(m.node);
}
None
}
pub fn get_graphstorage(&self, c: &Component) -> Option<Arc<GraphStorage>> {
let entry: Option<&Option<Arc<GraphStorage>>> = self.components.get(c);
if let Some(gs_opt) = entry {
if let Some(ref impl_type) = *gs_opt {
return Some(impl_type.clone());
}
}
None
}
fn get_graphstorage_as_ref<'a>(&'a self, c: &Component) -> Option<&'a GraphStorage> {
let entry: Option<&Option<Arc<GraphStorage>>> = self.components.get(c);
if let Some(gs_opt) = entry {
if let Some(ref impl_type) = *gs_opt {
return Some(impl_type.as_ref());
}
}
None
}
/// Returns all registered components (loaded or not) matching the optional type and name
/// filters, in map order.
///
/// With a type filter, the sorted map is scanned from the smallest possible key for that
/// type (and name) and stops at the first non-matching entry. NOTE(review): this relies on
/// `Component`'s ordering grouping entries by `ctype`, then `name` — confirm its field order.
pub fn get_all_components(
    &self,
    ctype: Option<ComponentType>,
    name: Option<&str>,
) -> Vec<Component> {
    match (ctype, name) {
        (Some(ctype), Some(name)) => {
            let start = Component {
                ctype: ctype.clone(),
                name: String::from(name),
                layer: String::default(),
            };
            self.components
                .range(start..)
                .map(|(c, _)| c)
                .take_while(|c| c.name == name && c.ctype == ctype)
                .cloned()
                .collect()
        }
        (Some(ctype), None) => {
            let start = Component {
                ctype: ctype.clone(),
                name: String::default(),
                layer: String::default(),
            };
            self.components
                .range(start..)
                .map(|(c, _)| c)
                .take_while(|c| c.ctype == ctype)
                .cloned()
                .collect()
        }
        (None, name) => self
            .components
            .keys()
            .filter(|c| name.map_or(true, |n| n == c.name))
            .cloned()
            .collect(),
    }
}
/// Key of the `annis::tok` annotation, checked to decide whether a node is a token.
fn get_token_key(&self) -> AnnoKey {
    AnnoKey {
        name: String::from(TOK),
        ns: String::from(ANNIS_NS),
    }
}

/// Key of the `annis::node_name` annotation used to look up nodes by name.
fn get_node_name_key(&self) -> AnnoKey {
    AnnoKey {
        name: String::from(NODE_NAME),
        ns: String::from(ANNIS_NS),
    }
}

/// Key of the `annis::node_type` annotation set when a node is added.
pub fn get_node_type_key(&self) -> AnnoKey {
    AnnoKey {
        name: String::from(NODE_TYPE),
        ns: String::from(ANNIS_NS),
    }
}
/// Returns the graph's estimated heap size. The value is computed on first use and then
/// reused until `reset_cached_size` is called.
///
/// # Panics
/// Panics if the cache mutex is poisoned.
pub fn size_of_cached(&self, ops: &mut MallocSizeOfOps) -> usize {
    let mut guard = self.cached_size.lock().unwrap();
    let previous: Option<usize> = *guard;
    match previous {
        Some(size) => size,
        None => {
            // The lock stays held while measuring, so concurrent callers wait for this result.
            let size = self.size_of(ops);
            *guard = Some(size);
            size
        }
    }
}
/// Invalidates the cached size so the next `size_of_cached` call recomputes it.
///
/// # Panics
/// Panics if the cache mutex is poisoned.
fn reset_cached_size(&self) {
    *self.cached_size.lock().unwrap() = None;
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::annis::types::{AnnoKey, Annotation, ComponentType, Edge};

    /// Creating a new writable component must register it and keep the added edge.
    #[test]
    fn create_writeable_gs() {
        let mut db = Graph::new();
        let component = Component {
            ctype: ComponentType::Pointing,
            layer: String::from("test"),
            name: String::from("dep"),
        };
        let anno_key = AnnoKey {
            ns: "test".to_owned(),
            name: "edge_anno".to_owned(),
        };
        let anno_val = "testValue".to_owned();
        {
            let gs: &mut WriteableGraphStorage = db.get_or_create_writable(&component).unwrap();
            gs.add_edge(Edge {
                source: 0,
                target: 1,
            });
            gs.add_edge_annotation(
                Edge {
                    source: 0,
                    target: 1,
                },
                Annotation {
                    key: anno_key,
                    val: anno_val,
                },
            );
            assert!(gs.is_connected(&0, &1, 1, std::ops::Bound::Included(1)));
        }

        // The component is now registered and held in memory with exactly this edge.
        assert!(db.is_loaded(&component));
        let gs = db.get_graphstorage(&component).unwrap();
        let outgoing: Vec<NodeID> = gs.get_outgoing_edges(0).collect();
        assert_eq!(vec![1], outgoing);
    }
}