pub mod serialization;
pub mod storage;
pub mod update;
use crate::{
annostorage::{AnnotationStorage, ValueSearch},
errors::Result,
graph::storage::{registry, GraphStorage, WriteableGraphStorage},
};
use crate::{
errors::GraphAnnisCoreError,
types::{AnnoKey, Annotation, Component, ComponentType, Edge, NodeID},
};
use lfu::LFUCache;
use malloc_size_of::{MallocSizeOf, MallocSizeOfOps};
use rayon::prelude::*;
use smartstring::alias::String as SmartString;
use std::collections::BTreeMap;
use std::io::prelude::*;
use std::ops::Bound::Included;
use std::path::{Path, PathBuf};
use std::string::ToString;
use std::{
borrow::Cow,
sync::{Arc, Mutex},
};
use update::{GraphUpdate, UpdateEvent};
pub const ANNIS_NS: &str = "annis";
pub const DEFAULT_NS: &str = "default_ns";
pub const NODE_NAME: &str = "node_name";
pub const NODE_TYPE: &str = "node_type";
pub const DEFAULT_EMPTY_LAYER: &str = "default_layer";
lazy_static! {
pub static ref DEFAULT_ANNO_KEY: Arc<AnnoKey> = Arc::from(AnnoKey::default());
pub static ref NODE_NAME_KEY: Arc<AnnoKey> = Arc::from(AnnoKey {
ns: ANNIS_NS.into(),
name: NODE_NAME.into(),
});
pub static ref NODE_TYPE_KEY: Arc<AnnoKey> = Arc::from(AnnoKey {
ns: ANNIS_NS.into(),
name: NODE_TYPE.into(),
});
}
pub struct Graph<CT: ComponentType> {
node_annos: Box<dyn AnnotationStorage<NodeID>>,
location: Option<PathBuf>,
components: BTreeMap<Component<CT>, Option<Arc<dyn GraphStorage>>>,
current_change_id: u64,
background_persistance: Arc<Mutex<()>>,
cached_size: Mutex<Option<usize>>,
disk_based: bool,
}
impl<CT: ComponentType> MallocSizeOf for Graph<CT> {
fn size_of(&self, ops: &mut MallocSizeOfOps) -> usize {
let mut size = self.node_annos.size_of(ops);
for c in self.components.keys() {
size += c.size_of(ops);
let gs_size = if let Some(gs) = self.get_graphstorage_as_ref(c) {
gs.size_of(ops) + std::mem::size_of::<usize>()
} else {
std::mem::size_of::<usize>()
};
size += gs_size;
}
size
}
}
fn load_component_from_disk(component_path: &Path) -> Result<Arc<dyn GraphStorage>> {
let impl_path = PathBuf::from(component_path).join("impl.cfg");
let mut f_impl = std::fs::File::open(impl_path)?;
let mut impl_name = String::new();
f_impl.read_to_string(&mut impl_name)?;
let gs = registry::deserialize(&impl_name, component_path)?;
Ok(gs)
}
fn component_to_relative_path<CT: ComponentType>(c: &Component<CT>) -> PathBuf {
let mut p = PathBuf::new();
p.push("gs");
p.push(c.get_type().to_string());
p.push(if c.layer.is_empty() {
DEFAULT_EMPTY_LAYER
} else {
&c.layer
});
p.push(c.name.as_str());
p
}
fn component_path<CT: ComponentType>(
location: &Option<PathBuf>,
c: &Component<CT>,
) -> Option<PathBuf> {
match location {
Some(ref loc) => {
let mut p = PathBuf::from(loc);
p.push("current");
p.push(component_to_relative_path(c));
Some(p)
}
None => None,
}
}
impl<CT: ComponentType> Graph<CT> {
pub fn new(disk_based: bool) -> Result<Self> {
let node_annos: Box<dyn AnnotationStorage<NodeID>> = if disk_based {
Box::new(crate::annostorage::ondisk::AnnoStorageImpl::new(None)?)
} else {
Box::new(crate::annostorage::inmemory::AnnoStorageImpl::<NodeID>::new())
};
Ok(Graph {
node_annos,
components: BTreeMap::new(),
location: None,
current_change_id: 0,
background_persistance: Arc::new(Mutex::new(())),
cached_size: Mutex::new(None),
disk_based,
})
}
pub fn with_default_graphstorages(disk_based: bool) -> Result<Self> {
let mut db = Graph::new(disk_based)?;
for c in CT::default_components() {
db.get_or_create_writable(&c)?;
}
Ok(db)
}
fn set_location(&mut self, location: &Path) -> Result<()> {
self.location = Some(PathBuf::from(location));
Ok(())
}
fn clear(&mut self) -> Result<()> {
self.reset_cached_size()?;
self.node_annos = Box::new(crate::annostorage::inmemory::AnnoStorageImpl::new());
self.components.clear();
Ok(())
}
pub fn load_from(&mut self, location: &Path, preload: bool) -> Result<()> {
debug!("Loading corpus from {}", location.to_string_lossy());
self.clear()?;
let location = PathBuf::from(location);
self.set_location(location.as_path())?;
let backup = location.join("backup");
let mut backup_was_loaded = false;
let dir2load = if backup.exists() && backup.is_dir() {
backup_was_loaded = true;
backup.clone()
} else {
location.join("current")
};
let ondisk_subdirectory = dir2load.join(crate::annostorage::ondisk::SUBFOLDER_NAME);
if ondisk_subdirectory.exists() && ondisk_subdirectory.is_dir() {
self.disk_based = true;
let node_annos_tmp =
crate::annostorage::ondisk::AnnoStorageImpl::new(Some(ondisk_subdirectory))?;
self.node_annos = Box::new(node_annos_tmp);
} else {
self.disk_based = false;
let mut node_annos_tmp = crate::annostorage::inmemory::AnnoStorageImpl::new();
node_annos_tmp.load_annotations_from(&dir2load)?;
self.node_annos = Box::new(node_annos_tmp);
}
let log_path = dir2load.join("update_log.bin");
let logfile_exists = log_path.exists() && log_path.is_file();
self.find_components_from_disk(&dir2load)?;
if preload | logfile_exists | backup_was_loaded {
self.ensure_loaded_all()?;
}
if logfile_exists {
let log_reader = std::fs::File::open(&log_path)?;
let mut update = bincode::deserialize_from(log_reader)?;
self.apply_update_in_memory(&mut update, |_| {})?;
} else {
self.current_change_id = 0;
}
if backup_was_loaded {
self.save_to(&location.join("current"))?;
let tmp_dir = tempfile::Builder::new()
.prefix("temporary-graphannis-backup")
.tempdir_in(location)?;
std::fs::remove_dir(tmp_dir.path())?;
std::fs::rename(&backup, tmp_dir.path())?;
tmp_dir.close()?;
}
Ok(())
}
fn find_components_from_disk(&mut self, location: &Path) -> Result<()> {
self.components.clear();
for c in CT::all_component_types().into_iter() {
let cpath = PathBuf::from(location).join("gs").join(c.to_string());
if cpath.is_dir() {
for layer in cpath.read_dir()? {
let layer = layer?;
if layer.path().is_dir() {
let layer_file_name = layer.file_name();
let layer_name_from_file = layer_file_name.to_string_lossy();
let layer_name: SmartString = if layer_name_from_file == DEFAULT_EMPTY_LAYER
{
SmartString::default()
} else {
layer_name_from_file.into()
};
let empty_name_component =
Component::new(c.clone(), layer_name.clone(), SmartString::default());
{
let cfg_file = PathBuf::from(location)
.join(component_to_relative_path(&empty_name_component))
.join("impl.cfg");
if cfg_file.is_file() {
self.components.insert(empty_name_component.clone(), None);
debug!("Registered component {}", empty_name_component);
}
}
for name in layer.path().read_dir()? {
let name = name?;
let named_component = Component::new(
c.clone(),
layer_name.clone(),
name.file_name().to_string_lossy().into(),
);
let cfg_file = PathBuf::from(location)
.join(component_to_relative_path(&named_component))
.join("impl.cfg");
if cfg_file.is_file() {
self.components.insert(named_component.clone(), None);
debug!("Registered component {}", named_component);
}
}
}
}
}
} Ok(())
}
fn internal_save(&self, location: &Path) -> Result<()> {
let location = PathBuf::from(location);
std::fs::create_dir_all(&location)?;
self.node_annos.save_annotations_to(&location)?;
for (c, e) in &self.components {
if let Some(ref data) = *e {
let dir = PathBuf::from(&location).join(component_to_relative_path(c));
std::fs::create_dir_all(&dir)?;
let impl_name = data.serialization_id();
data.save_to(&dir)?;
let cfg_path = PathBuf::from(&dir).join("impl.cfg");
let mut f_cfg = std::fs::File::create(cfg_path)?;
f_cfg.write_all(impl_name.as_bytes())?;
}
}
Ok(())
}
pub fn save_to(&mut self, location: &Path) -> Result<()> {
self.ensure_loaded_all()?;
self.internal_save(&location.join("current"))
}
pub fn persist_to(&mut self, location: &Path) -> Result<()> {
self.set_location(location)?;
self.internal_save(&location.join("current"))
}
fn get_cached_node_id_from_name(
&self,
node_name: Cow<String>,
cache: &mut LFUCache<String, Option<NodeID>>,
) -> Result<Option<NodeID>> {
if let Some(id) = cache.get(node_name.as_ref()) {
Ok(*id)
} else {
let id = self.get_node_id_from_name(&node_name)?;
cache.set(node_name.to_string(), id);
Ok(id)
}
}
#[allow(clippy::cognitive_complexity)]
fn apply_update_in_memory<F>(&mut self, u: &mut GraphUpdate, progress_callback: F) -> Result<()>
where
F: Fn(&str),
{
self.reset_cached_size()?;
let all_components = self.get_all_components(None, None);
let mut update_graph_index = ComponentType::init_update_graph_index(self)?;
let mut node_id_cache = LFUCache::with_capacity(1_000)
.map_err(|err| GraphAnnisCoreError::LfuCache(err.to_string()))?;
let total_nr_updates = u.len()?;
progress_callback(&format!("applying {} atomic updates", total_nr_updates));
for (nr_updates, update_event) in u.iter()?.enumerate() {
let (id, change) = update_event?;
trace!("applying event {:?}", &change);
ComponentType::before_update_event(&change, self, &mut update_graph_index)?;
match &change {
UpdateEvent::AddNode {
node_name,
node_type,
} => {
let existing_node_id = self.get_cached_node_id_from_name(
Cow::Borrowed(node_name),
&mut node_id_cache,
)?;
if existing_node_id.is_none() {
let new_node_id: NodeID =
if let Some(id) = self.node_annos.get_largest_item()? {
id + 1
} else {
0
};
let new_anno_name = Annotation {
key: NODE_NAME_KEY.as_ref().clone(),
val: node_name.into(),
};
let new_anno_type = Annotation {
key: NODE_TYPE_KEY.as_ref().clone(),
val: node_type.into(),
};
self.node_annos.insert(new_node_id, new_anno_name)?;
self.node_annos.insert(new_node_id, new_anno_type)?;
node_id_cache.set(node_name.clone(), Some(new_node_id));
}
}
UpdateEvent::DeleteNode { node_name } => {
if let Some(existing_node_id) = self.get_cached_node_id_from_name(
Cow::Borrowed(node_name),
&mut node_id_cache,
)? {
{
for a in self
.node_annos
.get_annotations_for_item(&existing_node_id)?
{
self.node_annos
.remove_annotation_for_item(&existing_node_id, &a.key)?;
}
}
for c in all_components.iter() {
if let Ok(gs) = self.get_or_create_writable(c) {
gs.delete_node(existing_node_id)?;
}
}
node_id_cache.set(node_name.clone(), None);
}
}
UpdateEvent::AddNodeLabel {
node_name,
anno_ns,
anno_name,
anno_value,
} => {
if let Some(existing_node_id) = self.get_cached_node_id_from_name(
Cow::Borrowed(node_name),
&mut node_id_cache,
)? {
let anno = Annotation {
key: AnnoKey {
ns: anno_ns.into(),
name: anno_name.into(),
},
val: anno_value.into(),
};
self.node_annos.insert(existing_node_id, anno)?;
}
}
UpdateEvent::DeleteNodeLabel {
node_name,
anno_ns,
anno_name,
} => {
if let Some(existing_node_id) = self.get_cached_node_id_from_name(
Cow::Borrowed(node_name),
&mut node_id_cache,
)? {
let key = AnnoKey {
ns: anno_ns.into(),
name: anno_name.into(),
};
self.node_annos
.remove_annotation_for_item(&existing_node_id, &key)?;
}
}
UpdateEvent::AddEdge {
source_node,
target_node,
layer,
component_type,
component_name,
} => {
let source = self.get_cached_node_id_from_name(
Cow::Borrowed(source_node),
&mut node_id_cache,
)?;
let target = self.get_cached_node_id_from_name(
Cow::Borrowed(target_node),
&mut node_id_cache,
)?;
if let (Some(source), Some(target)) = (source, target) {
if let Ok(ctype) = CT::from_str(component_type) {
let c = Component::new(ctype, layer.into(), component_name.into());
let gs = self.get_or_create_writable(&c)?;
gs.add_edge(Edge { source, target })?;
}
}
}
UpdateEvent::DeleteEdge {
source_node,
target_node,
layer,
component_type,
component_name,
} => {
let source = self.get_cached_node_id_from_name(
Cow::Borrowed(source_node),
&mut node_id_cache,
)?;
let target = self.get_cached_node_id_from_name(
Cow::Borrowed(target_node),
&mut node_id_cache,
)?;
if let (Some(source), Some(target)) = (source, target) {
if let Ok(ctype) = CT::from_str(component_type) {
let c = Component::new(ctype, layer.into(), component_name.into());
let gs = self.get_or_create_writable(&c)?;
gs.delete_edge(&Edge { source, target })?;
}
}
}
UpdateEvent::AddEdgeLabel {
source_node,
target_node,
layer,
component_type,
component_name,
anno_ns,
anno_name,
anno_value,
} => {
let source = self.get_cached_node_id_from_name(
Cow::Borrowed(source_node),
&mut node_id_cache,
)?;
let target = self.get_cached_node_id_from_name(
Cow::Borrowed(target_node),
&mut node_id_cache,
)?;
if let (Some(source), Some(target)) = (source, target) {
if let Ok(ctype) = CT::from_str(component_type) {
let c = Component::new(ctype, layer.into(), component_name.into());
let gs = self.get_or_create_writable(&c)?;
let e = Edge { source, target };
if gs.is_connected(source, target, 1, Included(1))? {
let anno = Annotation {
key: AnnoKey {
ns: anno_ns.into(),
name: anno_name.into(),
},
val: anno_value.into(),
};
gs.add_edge_annotation(e, anno)?;
}
}
}
}
UpdateEvent::DeleteEdgeLabel {
source_node,
target_node,
layer,
component_type,
component_name,
anno_ns,
anno_name,
} => {
let source = self.get_cached_node_id_from_name(
Cow::Borrowed(source_node),
&mut node_id_cache,
)?;
let target = self.get_cached_node_id_from_name(
Cow::Borrowed(target_node),
&mut node_id_cache,
)?;
if let (Some(source), Some(target)) = (source, target) {
if let Ok(ctype) = CT::from_str(component_type) {
let c = Component::new(ctype, layer.into(), component_name.into());
let gs = self.get_or_create_writable(&c)?;
let e = Edge { source, target };
if gs.is_connected(source, target, 1, Included(1))? {
let key = AnnoKey {
ns: anno_ns.into(),
name: anno_name.into(),
};
gs.delete_edge_annotation(&e, &key)?;
}
}
}
}
} ComponentType::after_update_event(change, self, &mut update_graph_index)?;
self.current_change_id = id;
if nr_updates > 0 && nr_updates % 100_000 == 0 {
let progress = ((nr_updates as f64) / (total_nr_updates as f64)) * 100.0;
progress_callback(&format!(
"applied {:.2}% of the atomic updates ({}/{})",
progress, nr_updates, total_nr_updates,
));
}
}
progress_callback("calculating node statistics");
self.get_node_annos_mut().calculate_statistics()?;
progress_callback("extending graph with model-specific index");
ComponentType::apply_update_graph_index(update_graph_index, self)?;
Ok(())
}
pub fn apply_update<F>(&mut self, u: &mut GraphUpdate, progress_callback: F) -> Result<()>
where
F: Fn(&str),
{
progress_callback("applying list of atomic updates");
self.ensure_loaded_all()?;
let result = self.apply_update_in_memory(u, &progress_callback);
progress_callback("memory updates completed, persisting updates to disk");
if let Some(location) = self.location.clone() {
trace!("output location for persisting updates is {:?}", location);
if result.is_ok() {
let current_path = location.join("current");
std::fs::create_dir_all(¤t_path)?;
let log_path = location.join("update_log.bin");
let temporary_dir = tempfile::tempdir_in(¤t_path)?;
let mut temporary_disk_file = tempfile::NamedTempFile::new_in(&temporary_dir)?;
debug!("writing WAL update log to {:?}", temporary_disk_file.path());
bincode::serialize_into(temporary_disk_file.as_file(), &u)?;
temporary_disk_file.flush()?;
debug!("moving finished WAL update log to {:?}", &log_path);
temporary_disk_file.persist(&log_path)?;
progress_callback("finished writing WAL update log");
} else {
trace!("error occured while applying updates: {:?}", &result);
self.load_from(&location, true)?;
return result;
}
}
result
}
pub fn background_sync_wal_updates(&self) -> Result<()> {
if let Some(ref location) = self.location {
let _lock = self.background_persistance.lock()?;
self.internal_save_with_backup(location)?;
}
Ok(())
}
fn internal_save_with_backup(&self, location: &Path) -> Result<()> {
let backup_location = location.join("backup");
let current_location = location.join("current");
if !backup_location.exists() {
std::fs::rename(¤t_location, &backup_location)?;
}
self.internal_save(¤t_location)?;
let tmp_dir = tempfile::Builder::new()
.prefix("temporary-graphannis-backup")
.tempdir_in(location)?;
std::fs::remove_dir(tmp_dir.path())?;
std::fs::rename(&backup_location, tmp_dir.path())?;
tmp_dir.close()?;
Ok(())
}
fn ensure_writeable(&mut self, c: &Component<CT>) -> Result<()> {
self.ensure_loaded(c)?;
if let Some(gs_opt) = self.components.get_mut(c) {
let gs = gs_opt
.as_mut()
.ok_or_else(|| GraphAnnisCoreError::ComponentNotLoaded(c.to_string()))?;
let is_writable = {
Arc::get_mut(gs)
.ok_or_else(|| {
GraphAnnisCoreError::NonExclusiveComponentReference(c.to_string())
})?
.as_writeable()
.is_some()
};
if is_writable {
return Ok(());
}
} else {
return Ok(());
}
self.reset_cached_size()?;
let readonly_gs = self
.components
.get(c)
.cloned()
.ok_or_else(|| GraphAnnisCoreError::MissingComponent(c.to_string()))?
.ok_or_else(|| GraphAnnisCoreError::ComponentNotLoaded(c.to_string()))?;
let writable_gs = registry::create_writeable(self, Some(readonly_gs.as_ref()))?;
self.components.insert(c.to_owned(), Some(writable_gs));
Ok(())
}
pub fn calculate_component_statistics(&mut self, c: &Component<CT>) -> Result<()> {
self.reset_cached_size()?;
let mut result: Result<()> = Ok(());
let mut entry = self
.components
.remove(c)
.ok_or_else(|| GraphAnnisCoreError::MissingComponent(c.to_string()))?;
if let Some(ref mut gs) = entry {
if let Some(gs_mut) = Arc::get_mut(gs) {
if let Some(writeable_gs) = gs_mut.as_writeable() {
writeable_gs.calculate_statistics()?;
}
} else {
result = Err(GraphAnnisCoreError::NonExclusiveComponentReference(
c.to_string(),
));
}
}
self.components.insert(c.clone(), entry);
result
}
pub fn get_or_create_writable(
&mut self,
c: &Component<CT>,
) -> Result<&mut dyn WriteableGraphStorage> {
self.reset_cached_size()?;
if self.components.contains_key(c) {
self.ensure_writeable(c)?;
} else {
let w = registry::create_writeable(self, None)?;
self.components.insert(c.clone(), Some(w));
}
let entry: &mut Arc<dyn GraphStorage> = self
.components
.get_mut(c)
.ok_or_else(|| GraphAnnisCoreError::MissingComponent(c.to_string()))?
.as_mut()
.ok_or_else(|| GraphAnnisCoreError::ComponentNotLoaded(c.to_string()))?;
let gs_mut_ref: &mut dyn GraphStorage = Arc::get_mut(entry)
.ok_or_else(|| GraphAnnisCoreError::NonExclusiveComponentReference(c.to_string()))?;
let result = gs_mut_ref
.as_writeable()
.ok_or_else(|| GraphAnnisCoreError::ReadOnlyComponent(c.to_string()))?;
Ok(result)
}
pub fn is_loaded(&self, c: &Component<CT>) -> bool {
let entry: Option<&Option<Arc<dyn GraphStorage>>> = self.components.get(c);
if let Some(gs_opt) = entry {
if gs_opt.is_some() {
return true;
}
}
false
}
pub fn ensure_loaded_all(&mut self) -> Result<()> {
let mut components_to_load: Vec<_> = Vec::with_capacity(self.components.len());
for (c, gs) in &self.components {
if gs.is_none() {
components_to_load.push(c.clone());
}
}
self.reset_cached_size()?;
let loaded_components: Vec<(_, Result<Arc<dyn GraphStorage>>)> = components_to_load
.into_par_iter()
.map(|c| match component_path(&self.location, &c) {
Some(cpath) => {
debug!("loading component {} from {}", c, &cpath.to_string_lossy());
(c, load_component_from_disk(&cpath))
}
None => (c, Err(GraphAnnisCoreError::EmptyComponentPath)),
})
.collect();
for (c, gs) in loaded_components {
let gs = gs?;
self.components.insert(c, Some(gs));
}
Ok(())
}
pub fn ensure_loaded(&mut self, c: &Component<CT>) -> Result<()> {
let mut cache_reset_needed = false;
if let Some(gs_opt) = self.components.get_mut(c) {
if gs_opt.is_none() {
let component_path = component_path(&self.location, c)
.ok_or(GraphAnnisCoreError::EmptyComponentPath)?;
debug!(
"loading component {} from {}",
c,
&component_path.to_string_lossy()
);
let component = load_component_from_disk(&component_path)?;
gs_opt.get_or_insert_with(|| component);
cache_reset_needed = true;
}
}
if cache_reset_needed {
self.reset_cached_size()?;
}
Ok(())
}
pub fn optimize_impl(&mut self, disk_based: bool) -> Result<()> {
self.ensure_loaded_all()?;
if self.disk_based != disk_based {
self.disk_based = disk_based;
let mut new_node_annos: Box<dyn AnnotationStorage<NodeID>> = if disk_based {
Box::new(crate::annostorage::ondisk::AnnoStorageImpl::new(None)?)
} else {
Box::new(crate::annostorage::inmemory::AnnoStorageImpl::<NodeID>::new())
};
info!("copying node annotations");
for m in self
.node_annos
.exact_anno_search(Some(ANNIS_NS), NODE_TYPE, ValueSearch::Any)
{
let m = m?;
for anno in self.node_annos.get_annotations_for_item(&m.node)? {
new_node_annos.insert(m.node, anno)?;
}
}
info!("re-calculating node annotation statistics");
new_node_annos.calculate_statistics()?;
self.node_annos = new_node_annos;
}
for c in self.get_all_components(None, None) {
info!("updating statistics for component {}", &c);
self.calculate_component_statistics(&c)?;
info!("optimizing implementation for component {}", &c);
self.optimize_gs_impl(&c)?;
}
if let Some(location) = &self.location {
info!("saving corpus to disk");
self.internal_save_with_backup(location)?;
}
Ok(())
}
pub fn optimize_gs_impl(&mut self, c: &Component<CT>) -> Result<()> {
if let Some(gs) = self.get_graphstorage(c) {
if let Some(stats) = gs.get_statistics() {
let opt_info = registry::get_optimal_impl_heuristic(self, stats);
if opt_info.id != gs.serialization_id() {
let mut new_gs = registry::create_from_info(&opt_info)?;
let converted = if let Some(new_gs_mut) = Arc::get_mut(&mut new_gs) {
info!(
"converting component {} to implementation {}",
c, opt_info.id,
);
new_gs_mut.copy(self.get_node_annos(), gs.as_ref())?;
true
} else {
false
};
if converted {
self.reset_cached_size()?;
info!(
"finished conversion of component {} to implementation {}",
c, opt_info.id,
);
self.components.insert(c.clone(), Some(new_gs.clone()));
}
}
}
}
Ok(())
}
pub fn get_node_id_from_name(&self, node_name: &str) -> Result<Option<NodeID>> {
let mut all_nodes_with_anno =
self.node_annos
.exact_anno_search(Some(ANNIS_NS), NODE_NAME, Some(node_name).into());
if let Some(m) = all_nodes_with_anno.next() {
let m = m?;
return Ok(Some(m.node));
}
Ok(None)
}
pub fn get_graphstorage(&self, c: &Component<CT>) -> Option<Arc<dyn GraphStorage>> {
let entry: &Arc<dyn GraphStorage> = self.components.get(c)?.as_ref()?;
Some(entry.clone())
}
pub fn get_graphstorage_as_ref<'a>(
&'a self,
c: &Component<CT>,
) -> Option<&'a dyn GraphStorage> {
let entry: &Arc<dyn GraphStorage> = self.components.get(c)?.as_ref()?;
Some(entry.as_ref())
}
pub fn get_node_annos(&self) -> &dyn AnnotationStorage<NodeID> {
self.node_annos.as_ref()
}
pub fn get_node_annos_mut(&mut self) -> &mut dyn AnnotationStorage<NodeID> {
self.node_annos.as_mut()
}
pub fn get_all_components(&self, ctype: Option<CT>, name: Option<&str>) -> Vec<Component<CT>> {
if let (Some(ctype), Some(name)) = (&ctype, name) {
let mut result: Vec<_> = Vec::new();
let ckey = Component::new(ctype.clone(), SmartString::default(), name.into());
for (c, _) in self.components.range(ckey..) {
if c.name != name || &c.get_type() != ctype {
break;
}
result.push(c.clone());
}
result
} else if let Some(ctype) = &ctype {
let mut result: Vec<_> = Vec::new();
let ckey = Component::new(
ctype.clone(),
SmartString::default(),
SmartString::default(),
);
for (c, _) in self.components.range(ckey..) {
if &c.get_type() != ctype {
break;
}
result.push(c.clone());
}
result
} else {
let filtered_components =
self.components
.keys()
.cloned()
.filter(move |c: &Component<CT>| {
if let Some(ctype) = ctype.clone() {
if ctype != c.get_type() {
return false;
}
}
if let Some(name) = name {
if name != c.name {
return false;
}
}
true
});
filtered_components.collect()
}
}
pub fn size_of_cached(&self, ops: &mut MallocSizeOfOps) -> Result<usize> {
let mut lock = self.cached_size.lock()?;
let cached_size: &mut Option<usize> = &mut *lock;
if let Some(cached) = cached_size {
return Ok(*cached);
}
let calculated_size = self.size_of(ops);
*cached_size = Some(calculated_size);
Ok(calculated_size)
}
fn reset_cached_size(&self) -> Result<()> {
let mut lock = self.cached_size.lock()?;
let cached_size: &mut Option<usize> = &mut *lock;
*cached_size = None;
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::types::{AnnoKey, Annotation, DefaultComponentType, Edge};
#[test]
fn create_writeable_gs() {
let mut db = Graph::<DefaultComponentType>::new(false).unwrap();
let anno_key = AnnoKey {
ns: "test".into(),
name: "edge_anno".into(),
};
let anno_val = "testValue".into();
let component = Component::new(DefaultComponentType::Edge, "test".into(), "dep".into());
let gs: &mut dyn WriteableGraphStorage = db.get_or_create_writable(&component).unwrap();
gs.add_edge(Edge {
source: 0,
target: 1,
})
.unwrap();
gs.add_edge_annotation(
Edge {
source: 0,
target: 1,
},
Annotation {
key: anno_key,
val: anno_val,
},
)
.unwrap();
}
}