use crate::annis::db;
use crate::annis::db::annostorage::AnnoStorage;
use crate::annis::db::aql;
use crate::annis::db::aql::operators;
use crate::annis::db::aql::operators::RangeSpec;
use crate::annis::db::exec::nodesearch::NodeSearchSpec;
use crate::annis::db::graphstorage::GraphStatistic;
use crate::annis::db::plan::ExecutionPlan;
use crate::annis::db::query;
use crate::annis::db::query::conjunction::Conjunction;
use crate::annis::db::query::disjunction::Disjunction;
use crate::annis::db::relannis;
use crate::annis::db::token_helper::TokenHelper;
use crate::annis::db::{AnnotationStorage, Graph, Match, ANNIS_NS, NODE_TYPE};
use crate::annis::errors::ErrorKind;
use crate::annis::errors::*;
use crate::annis::types::AnnoKey;
use crate::annis::types::{
Annotation, Component, ComponentType, CountExtra, Edge, FrequencyTable, NodeID,
QueryAttributeDescription,
};
use crate::annis::util;
use crate::annis::util::memory_estimation;
use crate::annis::util::quicksort;
use crate::malloc_size_of::{MallocSizeOf, MallocSizeOfOps};
use crate::update::GraphUpdate;
use fs2::FileExt;
use linked_hash_map::LinkedHashMap;
use std;
use std::collections::{BTreeSet, HashSet};
use std::fmt;
use std::fs::File;
use std::fs::OpenOptions;
use std::iter::FromIterator;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::{Arc, Condvar, Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
use std::thread;
use rustc_hash::FxHashMap;
use rand;
use rand::seq::SliceRandom;
use sys_info;
/// Entry of the corpus cache: a corpus is either loaded into memory or only
/// known by name.
enum CacheEntry {
/// The corpus graph is in memory (individual components may still be
/// loaded lazily on demand).
Loaded(Graph),
/// Placeholder: the corpus exists but has not been loaded from disk yet.
NotLoaded,
}
/// Describes how much of a corpus (or one of its components) is currently in
/// main memory.
#[derive(Debug, Ord, Eq, PartialOrd, PartialEq)]
pub enum LoadStatus {
/// Nothing is loaded into main memory.
NotLoaded,
/// Some components are loaded; the payload is the estimated heap size in
/// bytes of the loaded parts.
PartiallyLoaded(usize),
/// Everything is loaded; the payload is the estimated heap size in bytes.
FullyLoaded(usize),
}
/// Information about a single graph storage (one component) of a corpus.
pub struct GraphStorageInfo {
/// The component this graph storage belongs to.
pub component: Component,
/// Whether (and how much of) this component is loaded into memory.
pub load_status: LoadStatus,
/// Number of annotations stored in this component's annotation storage.
pub number_of_annotations: usize,
/// Serialization ID of the concrete graph storage implementation.
pub implementation: String,
/// Graph statistics, if they have been calculated for this component.
pub statistics: Option<GraphStatistic>,
}
impl fmt::Display for GraphStorageInfo {
    /// Renders a human-readable, multi-line report for this component:
    /// annotation count, optional statistics, implementation ID, load status
    /// and — when (partially) loaded — the estimated memory usage in MB.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Fixed typo in the user-visible output: "annnotations" -> "annotations".
        writeln!(
            f,
            "Component {}: {} annotations",
            self.component, self.number_of_annotations
        )?;
        if let Some(stats) = &self.statistics {
            writeln!(f, "Stats: {}", stats)?;
        }
        writeln!(f, "Implementation: {}", self.implementation)?;
        match self.load_status {
            LoadStatus::NotLoaded => writeln!(f, "Not Loaded")?,
            LoadStatus::PartiallyLoaded(memory_size) => {
                writeln!(f, "Status: {:?}", "partially loaded")?;
                writeln!(
                    f,
                    "Memory: {:.2} MB",
                    memory_size as f64 / f64::from(1024 * 1024)
                )?;
            }
            LoadStatus::FullyLoaded(memory_size) => {
                writeln!(f, "Status: {:?}", "fully loaded")?;
                writeln!(
                    f,
                    "Memory: {:.2} MB",
                    memory_size as f64 / f64::from(1024 * 1024)
                )?;
            }
        };
        Ok(())
    }
}
/// Summary information about one corpus in the storage.
pub struct CorpusInfo {
/// Name of the corpus.
pub name: String,
/// Overall load status (aggregated over all components).
pub load_status: LoadStatus,
/// Per-component information.
pub graphstorages: Vec<GraphStorageInfo>,
}
impl fmt::Display for CorpusInfo {
    /// Renders the overall load status, total memory (when loaded) and a
    /// dash-separated report for every graph storage.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        if let LoadStatus::NotLoaded = self.load_status {
            writeln!(f, "Not Loaded")?;
        } else {
            // Both loaded variants report a status label and the memory size.
            let (label, memory_size) = match self.load_status {
                LoadStatus::PartiallyLoaded(m) => ("partially loaded", m),
                LoadStatus::FullyLoaded(m) => ("fully loaded", m),
                LoadStatus::NotLoaded => unreachable!(),
            };
            writeln!(f, "Status: {:?}", label)?;
            writeln!(
                f,
                "Total memory: {:.2} MB",
                memory_size as f64 / f64::from(1024 * 1024)
            )?;
        }
        if !self.graphstorages.is_empty() {
            writeln!(f, "------------")?;
            for gs in &self.graphstorages {
                write!(f, "{}", gs)?;
                writeln!(f, "------------")?;
            }
        }
        Ok(())
    }
}
/// Strategies for ordering the results of a `find` query.
#[derive(Debug, PartialEq, Clone, Copy)]
#[repr(C)]
pub enum ResultOrder {
/// Sort by text position of the matches (corpus order).
Normal,
/// Reverse of the text position order.
Inverted,
/// Shuffle the matches randomly.
Randomized,
/// No guaranteed order; allows streaming results without materializing them.
NotSorted,
}
/// Result of `prepare_query`: the parsed query together with the loaded
/// cache entry of the corpus it will be executed on.
struct PreparationResult<'a> {
// Parsed query; borrows from the original query string.
query: Disjunction<'a>,
// Shared handle to the corpus cache entry (kept to hold the corpus alive).
db_entry: Arc<RwLock<CacheEntry>>,
}
/// Definition of one column of a frequency table query.
#[derive(Debug)]
pub struct FrequencyDefEntry {
/// Optional annotation namespace.
pub ns: Option<String>,
/// Annotation name.
pub name: String,
/// The query variable (node reference) this entry refers to.
pub node_ref: String,
}
impl FromStr for FrequencyDefEntry {
    type Err = Error;

    /// Parses a frequency definition of the form `<node-ref>:<anno-name>`,
    /// where the annotation name may itself be namespace-qualified
    /// (e.g. `1:tiger::pos`).
    ///
    /// # Errors
    /// Fails when the `:` separator between the node reference and the
    /// annotation name is missing.
    fn from_str(s: &str) -> std::result::Result<FrequencyDefEntry, Self::Err> {
        // Split only on the first ':' so qualified names stay intact.
        let splitted: Vec<&str> = s.splitn(2, ':').collect();
        if splitted.len() != 2 {
            return Err("Frequency definition must consists of two parts: \
                        the referenced node and the annotation name or \"tok\" separated by \":\""
                .into());
        }
        let node_ref = splitted[0];
        // Split an optionally qualified name into (namespace, name).
        let anno_key = util::split_qname(splitted[1]);
        Ok(FrequencyDefEntry {
            // `map` replaces the redundant `and_then(|ns| Some(...))`.
            ns: anno_key.0.map(String::from),
            name: String::from(anno_key.1),
            node_ref: String::from(node_ref),
        })
    }
}
/// The query languages supported for searching.
#[repr(C)]
#[derive(Clone, Copy)]
pub enum QueryLanguage {
/// Standard AQL (ANNIS Query Language).
AQL,
/// AQL parsed in a compatibility ("quirks") mode; passed as the second
/// argument to `aql::parse`.
AQLQuirksV3,
}
/// External formats that can be imported into the corpus storage.
#[repr(C)]
#[derive(Clone, Copy)]
pub enum ImportFormat {
/// The relANNIS format (imported via `relannis::load`).
RelANNIS,
}
/// Strategy for limiting the size of the in-memory corpus cache.
pub enum CacheStrategy {
/// Fixed upper bound for the cache size.
// NOTE(review): the unit (bytes vs. megabytes) is decided by
// `check_cache_size_and_remove_with_cache`, which is not visible here —
// confirm before documenting a unit.
FixedMaxMemory(usize),
/// Use at most the given percentage of the free system memory
/// (`with_auto_cache_size` uses 25.0).
PercentOfFreeMemory(f64),
}
/// A thread-safe API for managing corpora stored in a common directory on disk.
pub struct CorpusStorage {
// Root directory: one sub-directory per corpus.
db_dir: PathBuf,
// Lock file held for the lifetime of this storage; unlocked in `Drop`.
lock_file: File,
// Decides when loaded corpora are evicted from the cache.
cache_strategy: CacheStrategy,
// Corpus cache. LinkedHashMap preserves insertion order — presumably used
// by the eviction logic to drop older entries first (TODO confirm in
// `check_cache_size_and_remove_with_cache`).
corpus_cache: RwLock<LinkedHashMap<String, Arc<RwLock<CacheEntry>>>>,
// Query execution configuration (e.g. parallel joins).
query_config: query::Config,
// Counter + condvar tracking background WAL-sync threads so `Drop` can
// wait for them to finish.
active_background_workers: Arc<(Mutex<usize>, Condvar)>,
}
impl CorpusStorage {
/// Create a new corpus storage rooted at `db_dir` with an explicit cache
/// strategy.
///
/// - `use_parallel_joins`: enable parallel join execution for queries.
///
/// # Errors
/// Fails when the lock file for the directory cannot be created/acquired.
pub fn with_cache_strategy(
    db_dir: &Path,
    cache_strategy: CacheStrategy,
    use_parallel_joins: bool,
) -> Result<CorpusStorage> {
    #[cfg_attr(feature = "cargo-clippy", allow(clippy))]
    let active_background_workers = Arc::new((Mutex::new(0), Condvar::new()));
    Ok(CorpusStorage {
        db_dir: PathBuf::from(db_dir),
        lock_file: create_lockfile_for_directory(db_dir)?,
        cache_strategy,
        corpus_cache: RwLock::new(LinkedHashMap::new()),
        query_config: query::Config { use_parallel_joins },
        active_background_workers,
    })
}
pub fn with_auto_cache_size(db_dir: &Path, use_parallel_joins: bool) -> Result<CorpusStorage> {
let query_config = query::Config { use_parallel_joins };
let cache_strategy: CacheStrategy = CacheStrategy::PercentOfFreeMemory(25.0);
#[cfg_attr(feature = "cargo-clippy", allow(clippy))]
let active_background_workers = Arc::new((Mutex::new(0), Condvar::new()));
let cs = CorpusStorage {
db_dir: PathBuf::from(db_dir),
lock_file: create_lockfile_for_directory(db_dir)?,
cache_strategy,
corpus_cache: RwLock::new(LinkedHashMap::new()),
query_config,
active_background_workers,
};
Ok(cs)
}
/// List information about all corpora found on disk.
///
/// Corpora whose info cannot be created (e.g. unreadable directories) are
/// silently skipped, matching the best-effort nature of a listing.
pub fn list(&self) -> Result<Vec<CorpusInfo>> {
    let mut mem_ops =
        MallocSizeOfOps::new(memory_estimation::platform::usable_size, None, None);
    let infos: Vec<CorpusInfo> = self
        .list_from_disk()
        .unwrap_or_default()
        .into_iter()
        .filter_map(|name| self.create_corpus_info(&name, &mut mem_ops).ok())
        .collect();
    Ok(infos)
}
/// List the names of all corpora on disk: every sub-directory of the
/// database directory is treated as one corpus.
///
/// # Errors
/// Fails when the database directory or one of its entries cannot be read.
fn list_from_disk(&self) -> Result<Vec<String>> {
    let mut corpora: Vec<String> = Vec::new();
    for c_dir in self.db_dir.read_dir().chain_err(|| {
        format!(
            "Listing directories from {} failed",
            self.db_dir.to_string_lossy()
        )
    })? {
        let c_dir = c_dir.chain_err(|| {
            format!(
                "Could not get directory entry of folder {}",
                self.db_dir.to_string_lossy()
            )
        })?;
        let ftype = c_dir.file_type().chain_err(|| {
            format!(
                "Could not determine file type for {}",
                c_dir.path().to_string_lossy()
            )
        })?;
        if ftype.is_dir() {
            // Push the name directly: the previous `corpus_name.clone()`
            // was a redundant allocation (the binding was never reused).
            corpora.push(c_dir.file_name().to_string_lossy().to_string());
        }
    }
    Ok(corpora)
}
/// Build a `CorpusInfo` for one corpus from its cache entry, without
/// loading the corpus if it is not already in memory.
fn create_corpus_info(
&self,
corpus_name: &str,
mem_ops: &mut MallocSizeOfOps,
) -> Result<CorpusInfo> {
let cache_entry = self.get_entry(corpus_name)?;
let lock = cache_entry.read().unwrap();
let corpus_info: CorpusInfo = match &*lock {
CacheEntry::Loaded(ref db) => {
// Corpus is in memory: measure its heap size and describe each component.
let heap_size = db.size_of(mem_ops);
let mut load_status = LoadStatus::FullyLoaded(heap_size);
let mut graphstorages = Vec::new();
for c in db.get_all_components(None, None) {
if let Some(gs) = db.get_graphstorage_as_ref(&c) {
graphstorages.push(GraphStorageInfo {
component: c.clone(),
load_status: LoadStatus::FullyLoaded(gs.size_of(mem_ops)),
number_of_annotations: gs.get_anno_storage().number_of_annotations(),
implementation: gs.serialization_id().clone(),
statistics: gs.get_statistics().cloned(),
});
} else {
// A single unloaded component downgrades the whole corpus
// to "partially loaded".
load_status = LoadStatus::PartiallyLoaded(heap_size);
graphstorages.push(GraphStorageInfo {
component: c.clone(),
load_status: LoadStatus::NotLoaded,
number_of_annotations: 0,
implementation: "".to_owned(),
statistics: None,
})
}
}
CorpusInfo {
name: corpus_name.to_owned(),
load_status,
graphstorages,
}
}
// Not loaded: report the bare minimum without touching the disk.
&CacheEntry::NotLoaded => CorpusInfo {
name: corpus_name.to_owned(),
load_status: LoadStatus::NotLoaded,
graphstorages: vec![],
},
};
Ok(corpus_info)
}
/// Return the corpus information (load status, components, memory usage)
/// for a single corpus.
pub fn info(&self, corpus_name: &str) -> Result<CorpusInfo> {
    let mut ops = MallocSizeOfOps::new(memory_estimation::platform::usable_size, None, None);
    self.create_corpus_info(corpus_name, &mut ops)
}
/// Get the cache entry for a corpus name, inserting a `NotLoaded`
/// placeholder when the corpus is not in the cache yet.
fn get_entry(&self, corpus_name: &str) -> Result<Arc<RwLock<CacheEntry>>> {
let corpus_name = corpus_name.to_string();
{
// Fast path: a shared read lock suffices when the entry already exists.
let cache_lock = self.corpus_cache.read().unwrap();
let cache = &*cache_lock;
if let Some(e) = cache.get(&corpus_name) {
return Ok(e.clone());
}
}
// Slow path: take the exclusive lock. Another thread may have inserted
// the entry between the two locks, so use the entry API instead of a
// blind insert.
let mut cache_lock = self.corpus_cache.write().unwrap();
let cache = &mut *cache_lock;
let entry = cache
.entry(corpus_name.clone())
.or_insert_with(|| Arc::new(RwLock::new(CacheEntry::NotLoaded)));
Ok(entry.clone())
}
/// Load (or create) the corpus with the given name from disk and put it
/// into the cache. The caller must already hold the write lock on the
/// corpus cache.
///
/// - `create_if_missing`: when no directory for the corpus exists, create
///   a new empty corpus; otherwise return `NoSuchCorpus`.
fn load_entry_with_lock(
&self,
cache_lock: &mut RwLockWriteGuard<LinkedHashMap<String, Arc<RwLock<CacheEntry>>>>,
corpus_name: &str,
create_if_missing: bool,
) -> Result<Arc<RwLock<CacheEntry>>> {
let cache = &mut *cache_lock;
// Each corpus lives in a sub-directory of the database directory.
let db_path: PathBuf = [self.db_dir.to_string_lossy().as_ref(), &corpus_name]
.iter()
.collect();
let create_corpus = if db_path.is_dir() {
false
} else if create_if_missing {
true
} else {
return Err(ErrorKind::NoSuchCorpus(corpus_name.to_string()).into());
};
// Make room in the cache before loading the corpus into memory.
check_cache_size_and_remove_with_cache(cache, &self.cache_strategy, vec![]);
let db = if create_corpus {
// Persist immediately so the corpus directory exists on disk.
let mut db = Graph::with_default_graphstorages()?;
db.persist_to(&db_path)
.chain_err(|| format!("Could not create corpus with name {}", corpus_name))?;
db
} else {
let mut db = Graph::new();
db.load_from(&db_path, false)?;
db
};
let entry = Arc::new(RwLock::new(CacheEntry::Loaded(db)));
// Remove + insert moves the entry to the end of the LinkedHashMap —
// presumably marking it most recently used for eviction (TODO confirm).
cache.remove(corpus_name);
cache.insert(String::from(corpus_name), entry.clone());
// Re-check the limit, but never evict the corpus we just loaded.
check_cache_size_and_remove_with_cache(cache, &self.cache_strategy, vec![corpus_name]);
Ok(entry)
}
/// Get the cache entry for a corpus, loading it from disk (or creating it
/// when `create_if_missing` is set) if it is not in memory yet.
fn get_loaded_entry(
    &self,
    corpus_name: &str,
    create_if_missing: bool,
) -> Result<Arc<RwLock<CacheEntry>>> {
    let cache_entry = self.get_entry(corpus_name)?;
    // Check the load state under a short-lived read lock.
    let is_loaded = {
        let guard = cache_entry.read().unwrap();
        if let CacheEntry::Loaded(_) = &*guard {
            true
        } else {
            false
        }
    };
    if is_loaded {
        return Ok(cache_entry);
    }
    // Not loaded yet: take the cache write lock and load from disk.
    let mut cache_lock = self.corpus_cache.write().unwrap();
    self.load_entry_with_lock(&mut cache_lock, corpus_name, create_if_missing)
}
/// Get the loaded cache entry for a corpus and ensure that all the given
/// components are loaded into memory as well.
fn get_loaded_entry_with_components(
&self,
corpus_name: &str,
components: Vec<Component>,
) -> Result<Arc<RwLock<CacheEntry>>> {
let db_entry = self.get_loaded_entry(corpus_name, false)?;
// Determine which components are missing while holding only a read lock.
let missing_components = {
let lock = db_entry.read().unwrap();
let db = get_read_or_error(&lock)?;
let mut missing: HashSet<Component> = HashSet::new();
for c in components {
if !db.is_loaded(&c) {
missing.insert(c);
}
}
missing
};
// Only take the write lock when something actually needs loading.
if !missing_components.is_empty() {
let mut lock = db_entry.write().unwrap();
let db = get_write_or_error(&mut lock)?;
for c in missing_components {
db.ensure_loaded(&c)?;
}
};
Ok(db_entry)
}
/// Get the cache entry for a corpus with *all* of its components loaded
/// into memory.
fn get_fully_loaded_entry(&self, corpus_name: &str) -> Result<Arc<RwLock<CacheEntry>>> {
    let db_entry = self.get_loaded_entry(corpus_name, false)?;
    // Collect the components not yet in memory under a read lock only.
    let missing_components: HashSet<Component> = {
        let guard = db_entry.read().unwrap();
        let graph = get_read_or_error(&guard)?;
        graph
            .get_all_components(None, None)
            .into_iter()
            .filter(|c| !graph.is_loaded(c))
            .collect()
    };
    // Upgrade to a write lock only when something needs to be loaded.
    if !missing_components.is_empty() {
        let mut guard = db_entry.write().unwrap();
        let graph = get_write_or_error(&mut guard)?;
        for component in missing_components {
            graph.ensure_loaded(&component)?;
        }
    }
    Ok(db_entry)
}
/// Import a corpus from an external file format into this storage.
///
/// - `path`: location of the data to import
/// - `format`: source format (currently only relANNIS)
/// - `corpus_name`: optional name override; defaults to the name found in
///   the imported data
///
/// Returns the name under which the corpus was stored. I/O failures while
/// persisting are logged but do not abort the import.
pub fn import_from_fs(
&self,
path: &Path,
format: ImportFormat,
corpus_name: Option<String>,
) -> Result<String> {
let (orig_name, mut graph) = match format {
ImportFormat::RelANNIS => relannis::load(path, |status| {
info!("{}", status);
// Free cache memory while the import is running.
self.check_cache_size_and_remove(vec![]);
})?,
};
// Bring all components into memory before saving; a failure here is
// logged and the import continues best-effort.
let r = graph.ensure_loaded_all();
if let Err(e) = r {
error!(
"Some error occured when attempting to load components from disk: {:?}",
e
);
}
let corpus_name = corpus_name.unwrap_or(orig_name);
let mut db_path = PathBuf::from(&self.db_dir);
db_path.push(corpus_name.clone());
// Hold the cache write lock for the whole import so no other thread can
// observe a half-written corpus.
let mut cache_lock = self.corpus_cache.write().unwrap();
let cache = &mut *cache_lock;
check_cache_size_and_remove_with_cache(cache, &self.cache_strategy, vec![]);
// Replace an existing corpus with the same name: drop its cache entry
// and delete its files.
let old_entry = cache.remove(&corpus_name);
if old_entry.is_some() {
if let Err(e) = std::fs::remove_dir_all(db_path.clone()) {
error!("Error when removing existing files {}", e);
}
}
if let Err(e) = std::fs::create_dir_all(&db_path) {
error!(
"Can't create directory {}: {:?}",
db_path.to_string_lossy(),
e
);
}
// Persist the imported corpus to its directory.
let save_result = graph.save_to(&db_path);
if let Err(e) = save_result {
error!(
"Can't save corpus to {}: {:?}",
db_path.to_string_lossy(),
e
);
}
cache.insert(
corpus_name.clone(),
Arc::new(RwLock::new(CacheEntry::Loaded(graph))),
);
// Enforce the cache limit, but keep the corpus we just imported.
check_cache_size_and_remove_with_cache(cache, &self.cache_strategy, vec![&corpus_name]);
Ok(corpus_name)
}
/// Delete a corpus: remove it from the cache and delete its files on disk.
///
/// Returns `Ok(true)` when the corpus existed (in the cache) and was
/// removed, `Ok(false)` when there was no such cached corpus.
///
/// # Errors
/// Fails when the corpus directory exists but cannot be removed.
pub fn delete(&self, corpus_name: &str) -> Result<bool> {
    let mut db_path = PathBuf::from(&self.db_dir);
    db_path.push(corpus_name);
    let mut cache_lock = self.corpus_cache.write().unwrap();
    let cache = &mut *cache_lock;
    if let Some(db_entry) = cache.remove(corpus_name) {
        // Acquire the entry's write lock so we wait for any thread still
        // using this corpus before deleting its files.
        let mut _lock = db_entry.write().unwrap();
        // `is_dir()` already implies existence, so the previous extra
        // `exists()` check was redundant; also no need to clone the path.
        if db_path.is_dir() {
            std::fs::remove_dir_all(&db_path)
                .chain_err(|| "Error when removing existing files")?
        }
        Ok(true)
    } else {
        Ok(false)
    }
}
/// Apply a sequence of updates to a corpus, creating the corpus when it
/// does not exist yet. The update itself is applied synchronously; syncing
/// the write-ahead log to disk happens in a detached background thread.
pub fn apply_update(&self, corpus_name: &str, update: &mut GraphUpdate) -> Result<()> {
let db_entry = self
.get_loaded_entry(corpus_name, true)
.chain_err(|| format!("Could not get loaded entry for corpus {}", corpus_name))?;
{
let mut lock = db_entry.write().unwrap();
let db: &mut Graph = get_write_or_error(&mut lock)?;
db.apply_update(update)?;
}
// Register the background worker so `Drop` can wait for it to finish.
let active_background_workers = self.active_background_workers.clone();
{
let &(ref lock, ref _cvar) = &*active_background_workers;
let mut nr_active_background_workers = lock.lock().unwrap();
*nr_active_background_workers += 1;
}
thread::spawn(move || {
trace!("Starting background thread to sync WAL updates");
let lock = db_entry.read().unwrap();
if let Ok(db) = get_read_or_error(&lock) {
let db: &Graph = db;
if let Err(e) = db.background_sync_wal_updates() {
error!("Can't sync changes in background thread: {:?}", e);
} else {
trace!("Finished background thread to sync WAL updates");
}
}
// De-register the worker and wake up a potentially waiting `Drop`.
let &(ref lock, ref cvar) = &*active_background_workers;
let mut nr_active_background_workers = lock.lock().unwrap();
*nr_active_background_workers -= 1;
cvar.notify_all();
});
Ok(())
}
/// Parse a query and make sure every component needed to execute it (plus
/// the given `additional_components`) is loaded into memory.
fn prepare_query<'a>(
&self,
corpus_name: &str,
query: &'a str,
query_language: QueryLanguage,
additional_components: Vec<Component>,
) -> Result<PreparationResult<'a>> {
let db_entry = self.get_loaded_entry(corpus_name, false)?;
// Parse and compute the set of missing components under a read lock.
let (q, missing_components) = {
let lock = db_entry.read().unwrap();
let db = get_read_or_error(&lock)?;
let q = match query_language {
QueryLanguage::AQL => aql::parse(query, false)?,
QueryLanguage::AQLQuirksV3 => aql::parse(query, true)?,
};
let necessary_components = q.necessary_components(db);
let mut missing: HashSet<Component> =
HashSet::from_iter(necessary_components.iter().cloned());
missing.extend(additional_components.into_iter());
// Remove components that already have a graph storage loaded.
// NOTE(review): only `necessary_components` are checked here, so an
// already-loaded `additional_component` stays in `missing`; harmless,
// since `ensure_loaded` below is then a no-op for it — confirm.
for c in &necessary_components {
if db.get_graphstorage(c).is_some() {
missing.remove(c);
}
}
let missing: Vec<Component> = missing.into_iter().collect();
(q, missing)
};
if !missing_components.is_empty() {
// Load the missing components under the write lock.
{
let mut lock = db_entry.write().unwrap();
let db = get_write_or_error(&mut lock)?;
for c in missing_components {
db.ensure_loaded(&c)?;
}
}
// Loading may have grown memory usage: enforce the cache limit, but
// keep the corpus the query runs on.
self.check_cache_size_and_remove(vec![corpus_name]);
};
Ok(PreparationResult { query: q, db_entry })
}
/// Load all components of the given corpus into memory.
pub fn preload(&self, corpus_name: &str) -> Result<()> {
    // Scope the locks so they are released before the cache check below.
    {
        let entry = self.get_loaded_entry(corpus_name, false)?;
        let mut guard = entry.write().unwrap();
        get_write_or_error(&mut guard)?.ensure_loaded_all()?;
    }
    // Enforce the cache limit, keeping the corpus we just preloaded.
    self.check_cache_size_and_remove(vec![corpus_name]);
    Ok(())
}
/// Remove a corpus from the in-memory cache (its files on disk are kept).
pub fn unload(&self, corpus_name: &str) {
    self.corpus_cache.write().unwrap().remove(corpus_name);
}
/// Recalculate the node annotation statistics and the statistics of every
/// component of the given corpus.
pub fn update_statistics(&self, corpus_name: &str) -> Result<()> {
    let entry = self.get_loaded_entry(corpus_name, false)?;
    let mut guard = entry.write().unwrap();
    let graph: &mut Graph = get_write_or_error(&mut guard)?;
    // `Arc::make_mut` clones the annotation storage only if it is shared.
    Arc::make_mut(&mut graph.node_annos).calculate_statistics();
    for component in graph.get_all_components(None, None) {
        graph.calculate_component_statistics(&component)?;
    }
    Ok(())
}
/// Check whether a query can be parsed and planned against the corpus.
///
/// Returns `Ok(true)` on success; parsing or planning problems surface as
/// errors.
pub fn validate_query(
    &self,
    corpus_name: &str,
    query: &str,
    query_language: QueryLanguage,
) -> Result<bool> {
    let preparation = self.prepare_query(corpus_name, query, query_language, vec![])?;
    let entry_guard = preparation.db_entry.read().unwrap();
    let graph = get_read_or_error(&entry_guard)?;
    // Creating the execution plan validates the query against the corpus.
    ExecutionPlan::from_disjunction(&preparation.query, &graph, &self.query_config)?;
    Ok(true)
}
/// Return a textual description of the execution plan for a query.
pub fn plan(
    &self,
    corpus_name: &str,
    query: &str,
    query_language: QueryLanguage,
) -> Result<String> {
    let preparation = self.prepare_query(corpus_name, query, query_language, vec![])?;
    let entry_guard = preparation.db_entry.read().unwrap();
    let graph = get_read_or_error(&entry_guard)?;
    let execution_plan =
        ExecutionPlan::from_disjunction(&preparation.query, &graph, &self.query_config)?;
    // `to_string` uses the plan's Display implementation.
    Ok(execution_plan.to_string())
}
/// Execute a query on the corpus and return the number of matches.
pub fn count(
    &self,
    corpus_name: &str,
    query: &str,
    query_language: QueryLanguage,
) -> Result<u64> {
    let preparation = self.prepare_query(corpus_name, query, query_language, vec![])?;
    // Hold a read lock on the corpus for the whole execution.
    let entry_guard = preparation.db_entry.read().unwrap();
    let graph = get_read_or_error(&entry_guard)?;
    let execution_plan =
        ExecutionPlan::from_disjunction(&preparation.query, &graph, &self.query_config)?;
    Ok(execution_plan.count() as u64)
}
/// Execute a query and return both the number of matches and the number of
/// distinct documents the matches occur in.
pub fn count_extra(
&self,
corpus_name: &str,
query: &str,
query_language: QueryLanguage,
) -> Result<CountExtra> {
let prep = self.prepare_query(corpus_name, query, query_language, vec![])?;
let lock = prep.db_entry.read().unwrap();
let db: &Graph = get_read_or_error(&lock)?;
let plan = ExecutionPlan::from_disjunction(&prep.query, &db, &self.query_config)?;
let mut known_documents = HashSet::new();
let node_name_key_id = db
.node_annos
.get_key_id(&db.get_node_name_key())
.ok_or("No internal ID for node names found")?;
// Fold over all matches: count them and collect the distinct document
// paths derived from the first node of each match.
let result = plan.fold((0, 0), move |acc: (u64, usize), m: Vec<Match>| {
if !m.is_empty() {
let m: &Match = &m[0];
if let Some(node_name) = db
.node_annos
.get_value_for_item_by_id(&m.node, node_name_key_id)
{
let node_name: &str = node_name;
// Document path = node name up to the last '#', or the whole
// name when there is no fragment separator.
let doc_path =
&node_name[0..node_name.rfind('#').unwrap_or_else(|| node_name.len())];
known_documents.insert(doc_path.to_owned());
}
}
(acc.0 + 1, known_documents.len())
});
Ok(CountExtra {
match_count: result.0,
document_count: result.1 as u64,
})
}
/// Execute a query and return the matches as strings of space-separated
/// "[ns::name::]salt:/<node name>" entries, paginated with
/// `offset`/`limit` and sorted according to `order`.
pub fn find(
&self,
corpus_name: &str,
query: &str,
query_language: QueryLanguage,
offset: usize,
limit: usize,
order: ResultOrder,
) -> Result<Vec<String>> {
// The Ordering component is additionally loaded because sorting compares
// matches by their text position.
let order_component = Component {
ctype: ComponentType::Ordering,
layer: String::from("annis"),
name: String::from(""),
};
let prep = self.prepare_query(corpus_name, query, query_language, vec![order_component])?;
let lock = prep.db_entry.read().unwrap();
let db = get_read_or_error(&lock)?;
let mut query_config = self.query_config.clone();
if order == ResultOrder::NotSorted {
// Disable parallel joins when no order is required — presumably so
// results can be streamed in plan order (TODO confirm).
query_config.use_parallel_joins = false;
}
let plan = ExecutionPlan::from_disjunction(&prep.query, &db, &query_config)?;
let mut expected_size: Option<usize> = None;
// Stream directly from the plan when no re-sorting is needed; otherwise
// materialize all matches and sort them first.
let base_it: Box<Iterator<Item = Vec<Match>>> = if order == ResultOrder::NotSorted
|| (order == ResultOrder::Normal && plan.is_sorted_by_text())
{
Box::from(plan)
} else {
let estimated_result_size = plan.estimated_output_size();
let mut tmp_results: Vec<Vec<Match>> = Vec::with_capacity(estimated_result_size);
for mgroup in plan {
tmp_results.push(mgroup);
}
if order == ResultOrder::Randomized {
let mut rng = rand::thread_rng();
tmp_results.shuffle(&mut rng);
} else {
let token_helper = TokenHelper::new(db);
let component_order = Component {
ctype: ComponentType::Ordering,
layer: String::from("annis"),
name: String::from(""),
};
let gs_order = db.get_graphstorage_as_ref(&component_order);
// Compare match groups by text position; `Inverted` just
// reverses the comparison result.
let order_func = |m1: &Vec<Match>, m2: &Vec<Match>| -> std::cmp::Ordering {
if order == ResultOrder::Inverted {
db::sort_matches::compare_matchgroup_by_text_pos(
m1,
m2,
&db.node_annos,
token_helper.as_ref(),
gs_order,
)
.reverse()
} else {
db::sort_matches::compare_matchgroup_by_text_pos(
m1,
m2,
&db.node_annos,
token_helper.as_ref(),
gs_order,
)
}
};
// Only the first `offset + limit` items have to be in order.
if self.query_config.use_parallel_joins {
quicksort::sort_first_n_items_parallel(
&mut tmp_results,
offset + limit,
order_func,
);
} else {
quicksort::sort_first_n_items(&mut tmp_results, offset + limit, order_func);
}
}
expected_size = Some(tmp_results.len());
Box::from(tmp_results.into_iter())
};
let node_name_key_id = db
.node_annos
.get_key_id(&db.get_node_name_key())
.ok_or("No internal ID for node names found")?;
// Pre-size the output when the number of materialized matches is known.
let mut results: Vec<String> = if let Some(expected_size) = expected_size {
Vec::with_capacity(std::cmp::min(expected_size, limit))
} else {
Vec::new()
};
results.extend(base_it.skip(offset).take(limit).map(|m: Vec<Match>| {
let mut match_desc: Vec<String> = Vec::new();
for singlematch in &m {
let mut node_desc = String::new();
// Annotations in the internal "annis" namespace are not prefixed.
if let Some(anno_key) = db.node_annos.get_key_value(singlematch.anno_key) {
if &anno_key.ns != "annis" {
if !anno_key.ns.is_empty() {
node_desc.push_str(&anno_key.ns);
node_desc.push_str("::");
}
node_desc.push_str(&anno_key.name);
node_desc.push_str("::");
}
}
if let Some(name) = db
.node_annos
.get_value_for_item_by_id(&singlematch.node, node_name_key_id)
{
node_desc.push_str("salt:/");
node_desc.push_str(name);
}
match_desc.push(node_desc);
}
let mut result = String::new();
result.push_str(&match_desc.join(" "));
result
}));
Ok(results)
}
/// Return a subgraph around the given nodes: everything overlapping the
/// nodes themselves plus a token context of `ctx_left`/`ctx_right` tokens
/// and all nodes overlapping that context.
pub fn subgraph(
&self,
corpus_name: &str,
node_ids: Vec<String>,
ctx_left: usize,
ctx_right: usize,
) -> Result<Graph> {
let db_entry = self.get_fully_loaded_entry(corpus_name)?;
let mut query = Disjunction {
alternatives: vec![],
};
// Five query alternatives are built per requested node; the union of
// their matches (match index 0 of each) forms the subgraph.
for source_node_id in node_ids {
// Strip an optional "salt:/" URI prefix.
let source_node_id: &str = if source_node_id.starts_with("salt:/") {
&source_node_id[6..]
} else {
&source_node_id
};
// Search specification finding the requested node by its name.
let m = NodeSearchSpec::ExactValue {
ns: Some(db::ANNIS_NS.to_string()),
name: db::NODE_NAME.to_string(),
val: Some(source_node_id.to_string()),
is_meta: false,
};
// 1) Any node that overlaps the requested node itself.
{
let mut q = Conjunction::new();
let node_idx = q.add_node(NodeSearchSpec::AnyNode, None);
let m_idx = q.add_node(m.clone(), None);
q.add_operator(Box::new(operators::OverlapSpec {}), &m_idx, &node_idx, true)?;
query.alternatives.push(q);
}
// 2) Any token at most `ctx_left` positions before the node.
{
let mut q = Conjunction::new();
let tok_idx = q.add_node(NodeSearchSpec::AnyToken, None);
let m_idx = q.add_node(m.clone(), None);
q.add_operator(
Box::new(operators::PrecedenceSpec {
segmentation: None,
dist: RangeSpec::Bound {
min_dist: 0,
max_dist: ctx_left,
},
}),
&tok_idx,
&m_idx,
true,
)?;
query.alternatives.push(q);
}
// 3) Any node overlapping a token of the left context.
{
let mut q = Conjunction::new();
let node_idx = q.add_node(NodeSearchSpec::AnyNode, None);
let tok_idx = q.add_node(NodeSearchSpec::AnyToken, None);
let m_idx = q.add_node(m.clone(), None);
q.add_operator(
Box::new(operators::OverlapSpec {}),
&node_idx,
&tok_idx,
true,
)?;
q.add_operator(
Box::new(operators::PrecedenceSpec {
segmentation: None,
dist: RangeSpec::Bound {
min_dist: 0,
max_dist: ctx_left,
},
}),
&tok_idx,
&m_idx,
true,
)?;
query.alternatives.push(q);
}
// 4) Any token at most `ctx_right` positions after the node.
{
let mut q = Conjunction::new();
let tok_idx = q.add_node(NodeSearchSpec::AnyToken, None);
let m_idx = q.add_node(m.clone(), None);
q.add_operator(
Box::new(operators::PrecedenceSpec {
segmentation: None,
dist: RangeSpec::Bound {
min_dist: 0,
max_dist: ctx_right,
},
}),
&m_idx,
&tok_idx,
true,
)?;
query.alternatives.push(q);
}
// 5) Any node overlapping a token of the right context.
{
let mut q = Conjunction::new();
let node_idx = q.add_node(NodeSearchSpec::AnyNode, None);
let m_idx = q.add_node(m.clone(), None);
let tok_idx = q.add_node(NodeSearchSpec::AnyToken, None);
q.add_operator(
Box::new(operators::PrecedenceSpec {
segmentation: None,
dist: RangeSpec::Bound {
min_dist: 0,
max_dist: ctx_right,
},
}),
&m_idx,
&tok_idx,
true,
)?;
q.add_operator(
Box::new(operators::OverlapSpec {}),
&tok_idx,
&node_idx,
true,
)?;
query.alternatives.push(q);
}
}
// Match index 0 is the first node added in each alternative.
extract_subgraph_by_query(&db_entry, &query, &[0], &self.query_config, None)
}
/// Return the subgraph induced by all matches of the given query,
/// optionally restricted to components of `component_type_filter`.
pub fn subgraph_for_query(
    &self,
    corpus_name: &str,
    query: &str,
    query_language: QueryLanguage,
    component_type_filter: Option<ComponentType>,
) -> Result<Graph> {
    let prep = self.prepare_query(corpus_name, query, query_language, vec![])?;
    // All node positions up to the widest alternative are extracted.
    // Iterator `max()` replaces the previous manual maximum loop.
    let max_alt_size = prep
        .query
        .alternatives
        .iter()
        .map(|alt| alt.num_of_nodes())
        .max()
        .unwrap_or(0);
    let match_idx: Vec<usize> = (0..max_alt_size).collect();
    extract_subgraph_by_query(
        // The previous `prep.db_entry.clone()` behind `&` was a redundant
        // Arc clone that was immediately dropped; borrow directly instead.
        &prep.db_entry,
        &prep.query,
        &match_idx,
        &self.query_config,
        component_type_filter,
    )
}
/// Return a subgraph containing all nodes that are part of one of the
/// given (sub-) corpora.
pub fn subcorpus_graph(&self, corpus_name: &str, corpus_ids: Vec<String>) -> Result<Graph> {
let db_entry = self.get_fully_loaded_entry(corpus_name)?;
let mut query = Disjunction {
alternatives: vec![],
};
// One alternative per corpus ID: any node that is (transitively) part
// of the sub-corpus with that name.
for source_corpus_id in corpus_ids {
// Strip an optional "salt:/" URI prefix.
let source_corpus_id: &str = if source_corpus_id.starts_with("salt:/") {
&source_corpus_id[6..]
} else {
&source_corpus_id
};
let mut q = Conjunction::new();
let corpus_idx = q.add_node(
NodeSearchSpec::ExactValue {
ns: Some(db::ANNIS_NS.to_string()),
name: db::NODE_NAME.to_string(),
val: Some(source_corpus_id.to_string()),
is_meta: false,
},
None,
);
let any_node_idx = q.add_node(NodeSearchSpec::AnyNode, None);
q.add_operator(
Box::new(operators::PartOfSubCorpusSpec {
dist: RangeSpec::Unbound,
}),
&any_node_idx,
&corpus_idx,
true,
)?;
query.alternatives.push(q);
}
// Match index 1 refers to the `any_node_idx` node of each alternative.
extract_subgraph_by_query(&db_entry, &query, &[1], &self.query_config, None)
}
/// Return a graph with only the corpus structure: all nodes of type
/// "corpus" and the PartOfSubcorpus components between them.
pub fn corpus_graph(&self, corpus_name: &str) -> Result<Graph> {
let db_entry = self.get_loaded_entry(corpus_name, false)?;
// Find all PartOfSubcorpus components under a read lock ...
let subcorpus_components = {
let lock = db_entry.read().unwrap();
let db = get_read_or_error(&lock)?;
db.get_all_components(Some(ComponentType::PartOfSubcorpus), None)
};
// ... then make sure they are all loaded.
let db_entry = self.get_loaded_entry_with_components(corpus_name, subcorpus_components)?;
let mut query = Conjunction::new();
query.add_node(
NodeSearchSpec::new_exact(Some(ANNIS_NS), NODE_TYPE, Some("corpus"), false),
None,
);
extract_subgraph_by_query(
&db_entry,
&query.into_disjunction(),
&[0],
&self.query_config,
Some(ComponentType::PartOfSubcorpus),
)
}
/// Execute a query and compute a frequency table over the annotations
/// named in `definition`.
///
/// For each match, the values of the referenced annotations are collected
/// into a tuple (missing values become empty strings); the result counts
/// how often each distinct tuple occurs, sorted by descending count.
pub fn frequency(
    &self,
    corpus_name: &str,
    query: &str,
    query_language: QueryLanguage,
    definition: Vec<FrequencyDefEntry>,
) -> Result<FrequencyTable<String>> {
    let prep = self.prepare_query(corpus_name, query, query_language, vec![])?;
    let lock = prep.db_entry.read().unwrap();
    let db: &Graph = get_read_or_error(&lock)?;
    // Map each definition to the match position of its node variable and
    // the annotation keys to look up there.
    let mut annokeys: Vec<(usize, Vec<AnnoKey>)> = Vec::default();
    for def in definition {
        if let Some(node_ref) = prep.query.get_variable_pos(&def.node_ref) {
            if let Some(ns) = def.ns {
                // Fully qualified name: exactly one key. The previous
                // `ns.clone()`/`def.name.clone()` were redundant — both
                // values are moved exactly once.
                annokeys.push((node_ref, vec![AnnoKey { ns, name: def.name }]));
            } else {
                // Unqualified: every key with this name qualifies.
                annokeys.push((node_ref, db.node_annos.get_qnames(&def.name)));
            }
        }
    }
    let plan = ExecutionPlan::from_disjunction(&prep.query, &db, &self.query_config)?;
    let mut tuple_frequency: FxHashMap<Vec<String>, usize> = FxHashMap::default();
    for mgroup in plan {
        // Collect the annotation values of this match into one tuple.
        let mut tuple: Vec<String> = Vec::with_capacity(annokeys.len());
        for (node_ref, anno_keys) in &annokeys {
            let mut tuple_val: String = String::default();
            if *node_ref < mgroup.len() {
                let m: &Match = &mgroup[*node_ref];
                for k in anno_keys.iter() {
                    if let Some(val) = db.node_annos.get_value_for_item(&m.node, k) {
                        tuple_val = val.to_owned();
                    }
                }
            }
            tuple.push(tuple_val);
        }
        let tuple_count: &mut usize = tuple_frequency.entry(tuple).or_insert(0);
        *tuple_count += 1;
    }
    // Output, most frequent tuples first.
    let mut result: FrequencyTable<String> = FrequencyTable::default();
    for (tuple, count) in tuple_frequency {
        result.push((tuple, count));
    }
    result.sort_by(|a, b| a.1.cmp(&b.1).reverse());
    Ok(result)
}
/// Parse a query (without executing it) and return a description of each
/// node it searches for, annotated with the index of the alternative the
/// node belongs to.
pub fn node_descriptions(
    &self,
    query: &str,
    query_language: QueryLanguage,
) -> Result<Vec<QueryAttributeDescription>> {
    let q: Disjunction = match query_language {
        QueryLanguage::AQL => aql::parse(query, false)?,
        QueryLanguage::AQLQuirksV3 => aql::parse(query, true)?,
    };
    let mut result = Vec::new();
    // `enumerate` replaces the manual alternative counter.
    for (component_nr, alt) in q.alternatives.into_iter().enumerate() {
        for mut n in alt.get_node_descriptions() {
            n.alternative = component_nr;
            result.push(n);
        }
    }
    Ok(result)
}
/// List all components of a corpus, optionally filtered by component type
/// and/or name. Any failure (unknown corpus, load error) yields an empty
/// list instead of an error.
pub fn list_components(
    &self,
    corpus_name: &str,
    ctype: Option<ComponentType>,
    name: Option<&str>,
) -> Vec<Component> {
    let entry = match self.get_loaded_entry(corpus_name, false) {
        Ok(e) => e,
        Err(_) => return Vec::new(),
    };
    let guard = entry.read().unwrap();
    match get_read_or_error(&guard) {
        Ok(db) => db.get_all_components(ctype, name),
        Err(_) => Vec::new(),
    }
}
/// List the node annotations of a corpus.
///
/// - `list_values`: include annotation values; otherwise only the keys are
///   returned (with an empty value string).
/// - `only_most_frequent_values`: when listing values, keep only the first
///   value returned by `get_all_values(&key, true)` per key.
///
/// Failures (unknown corpus, load errors) yield an empty list.
pub fn list_node_annotations(
&self,
corpus_name: &str,
list_values: bool,
only_most_frequent_values: bool,
) -> Vec<Annotation> {
let mut result: Vec<Annotation> = Vec::new();
if let Ok(db_entry) = self.get_loaded_entry(corpus_name, false) {
let lock = db_entry.read().unwrap();
if let Ok(db) = get_read_or_error(&lock) {
let node_annos: &AnnoStorage<NodeID> = &db.node_annos;
for key in node_annos.annotation_keys() {
if list_values {
if only_most_frequent_values {
// Only the first entry of the "most frequent first" listing.
if let Some(val) =
node_annos.get_all_values(&key, true).into_iter().next()
{
result.push(Annotation {
key: key.clone(),
val: val.to_owned(),
});
}
} else {
// All values for this key.
for val in node_annos.get_all_values(&key, false) {
result.push(Annotation {
key: key.clone(),
val: val.to_owned(),
});
}
}
} else {
// Keys only; value left empty.
result.push(Annotation {
key: key.clone(),
val: String::default(),
});
}
}
}
}
result
}
/// List the edge annotations of one component of a corpus.
///
/// - `list_values`: include annotation values; otherwise only the keys are
///   returned (with an empty value string).
/// - `only_most_frequent_values`: when listing values, keep only the first
///   value returned by `get_all_values(&key, true)` per key.
///
/// Failures (unknown corpus, load errors, missing component) yield an
/// empty list.
pub fn list_edge_annotations(
&self,
corpus_name: &str,
component: &Component,
list_values: bool,
only_most_frequent_values: bool,
) -> Vec<Annotation> {
let mut result: Vec<Annotation> = Vec::new();
// Make sure the requested component itself is loaded.
if let Ok(db_entry) =
self.get_loaded_entry_with_components(corpus_name, vec![component.clone()])
{
let lock = db_entry.read().unwrap();
if let Ok(db) = get_read_or_error(&lock) {
if let Some(gs) = db.get_graphstorage(&component) {
let edge_annos: &AnnotationStorage<Edge> = gs.get_anno_storage();
for key in edge_annos.annotation_keys() {
if list_values {
if only_most_frequent_values {
// Only the first entry of the "most frequent first" listing.
if let Some(val) =
edge_annos.get_all_values(&key, true).into_iter().next()
{
result.push(Annotation {
key: key.clone(),
val: val.to_owned(),
});
}
} else {
// All values for this key.
for val in edge_annos.get_all_values(&key, false) {
result.push(Annotation {
key: key.clone(),
val: val.to_owned(),
});
}
}
} else {
// Keys only; value left empty.
result.push(Annotation {
key: key.clone(),
val: String::new(),
});
}
}
}
}
}
result
}
/// Enforce the cache strategy, evicting loaded corpora when necessary
/// while keeping the corpora named in `keep`.
fn check_cache_size_and_remove(&self, keep: Vec<&str>) {
    let mut cache_lock = self.corpus_cache.write().unwrap();
    check_cache_size_and_remove_with_cache(&mut *cache_lock, &self.cache_strategy, keep);
}
}
impl Drop for CorpusStorage {
/// Waits for all background WAL-sync workers (spawned by `apply_update`)
/// to finish, then releases the directory lock file.
fn drop(&mut self) {
// Block until the worker counter reaches zero; each finishing worker
// decrements it and notifies the condition variable.
let &(ref lock, ref cvar) = &*self.active_background_workers;
let mut nr_active_background_workers = lock.lock().unwrap();
while *nr_active_background_workers > 0 {
trace!(
"Waiting for background thread to finish ({} worker(s) left)...",
*nr_active_background_workers
);
nr_active_background_workers = cvar.wait(nr_active_background_workers).unwrap();
}
// Release the file lock; failure is only logged since we are dropping.
if let Err(e) = self.lock_file.unlock() {
warn!("Could not unlock CorpusStorage lock file: {:?}", e);
} else {
trace!("Unlocked CorpusStorage lock file");
}
}
}
#[cfg(test)]
mod tests {
extern crate log;
extern crate simplelog;
extern crate tempfile;
use crate::corpusstorage::QueryLanguage;
use crate::update::{GraphUpdate, UpdateEvent};
use crate::CorpusStorage;
#[test]
fn delete() {
    // Create a corpus via an update, preload it and delete it again.
    // Silently skip the test when no temporary directory can be created,
    // matching the original `if let Ok(...)` guard.
    let tmp = match tempfile::tempdir() {
        Ok(t) => t,
        Err(_) => return,
    };
    let cs = CorpusStorage::with_auto_cache_size(tmp.path(), false).unwrap();
    let mut update = GraphUpdate::new();
    update.add_event(UpdateEvent::AddNode {
        node_name: "test".to_string(),
        node_type: "node".to_string(),
    });
    cs.apply_update("testcorpus", &mut update).unwrap();
    cs.preload("testcorpus").unwrap();
    cs.delete("testcorpus").unwrap();
}
#[test]
fn load_cs_twice() {
    // Open the same storage directory twice in a row; the second instance
    // must be able to acquire the lock after the first one is dropped.
    if let Ok(tmp) = tempfile::tempdir() {
        for _ in 0..2 {
            let cs = CorpusStorage::with_auto_cache_size(tmp.path(), false).unwrap();
            let mut g = GraphUpdate::new();
            g.add_event(UpdateEvent::AddNode {
                node_name: "test".to_string(),
                node_type: "node".to_string(),
            });
            cs.apply_update("testcorpus", &mut g).unwrap();
            // `cs` is dropped here, releasing the lock file.
        }
    }
}
#[test]
fn apply_update_add_and_delete_nodes() {
if let Ok(tmp) = tempfile::tempdir() {
let cs = CorpusStorage::with_auto_cache_size(tmp.path(), false).unwrap();
let mut g = GraphUpdate::new();
g.add_event(UpdateEvent::AddNode {
node_name: "root".to_string(),
node_type: "corpus".to_string(),
});
g.add_event(UpdateEvent::AddNode {
node_name: "root/doc1".to_string(),
node_type: "corpus".to_string(),
});
g.add_event(UpdateEvent::AddNode {
node_name: "root/doc1#MyToken1".to_string(),
node_type: "node".to_string(),
});
g.add_event(UpdateEvent::AddNode {
node_name: "root/doc1#MyToken2".to_string(),
node_type: "node".to_string(),
});
g.add_event(UpdateEvent::AddEdge {
source_node: "root/doc1#MyToken1".to_owned(),
target_node: "root/doc1#MyToken2".to_owned(),
layer: "dep".to_owned(),
component_type: "Pointing".to_owned(),
component_name: "dep".to_owned(),
});
g.add_event(UpdateEvent::AddNode {
node_name: "root/doc2".to_string(),
node_type: "corpus".to_string(),
});
g.add_event(UpdateEvent::AddNode {
node_name: "root/doc2#MyToken".to_string(),
node_type: "node".to_string(),
});
cs.apply_update("root", &mut g).unwrap();
let node_count = cs.count("root", "node", QueryLanguage::AQL).unwrap();
assert_eq!(3, node_count);
let edge_count = cs
.count("root", "node ->dep node", QueryLanguage::AQL)
.unwrap();
assert_eq!(1, edge_count);
let mut g = GraphUpdate::new();
g.add_event(UpdateEvent::DeleteNode {
node_name: "root/doc1#MyToken2".to_string(),
});
cs.apply_update("root", &mut g).unwrap();
let node_count = cs.count("root", "node", QueryLanguage::AQL).unwrap();
assert_eq!(2, node_count);
let edge_count = cs
.count("root", "node ->dep node", QueryLanguage::AQL)
.unwrap();
assert_eq!(0, edge_count);
}
}
}
/// Borrow the loaded graph behind a read-locked cache entry, or fail if the
/// entry exists but the corpus has not been loaded yet.
fn get_read_or_error<'a>(lock: &'a RwLockReadGuard<CacheEntry>) -> Result<&'a Graph> {
    match &**lock {
        CacheEntry::Loaded(db) => Ok(db),
        CacheEntry::NotLoaded => Err(ErrorKind::LoadingDBFailed("".to_string()).into()),
    }
}
/// Mutably borrow the loaded graph behind a write-locked cache entry, or fail
/// if the entry exists but the corpus has not been loaded yet.
fn get_write_or_error<'a>(lock: &'a mut RwLockWriteGuard<CacheEntry>) -> Result<&'a mut Graph> {
    match &mut **lock {
        CacheEntry::Loaded(db) => Ok(db),
        // Fixed error message: it previously read "Could get ..." (missing "not").
        CacheEntry::NotLoaded => Err("Could not get loaded graph storage entry".into()),
    }
}
/// Evict loaded corpora from `cache` (oldest insertion first) until the
/// estimated total memory usage fits the budget given by `cache_strategy`.
/// Corpora listed in `keep` are never evicted.
fn check_cache_size_and_remove_with_cache(
    cache: &mut LinkedHashMap<String, Arc<RwLock<CacheEntry>>>,
    cache_strategy: &CacheStrategy,
    keep: Vec<&str>,
) {
    let mut mem_ops = MallocSizeOfOps::new(memory_estimation::platform::usable_size, None, None);
    let protected: HashSet<&str> = keep.into_iter().collect();

    // Estimate the size of every loaded corpus, keeping the insertion order
    // of the cache map so the oldest entries are considered first.
    let mut total_size: usize = 0;
    let mut loaded_sizes: LinkedHashMap<String, usize> = LinkedHashMap::new();
    for (corpus_name, entry) in cache.iter() {
        let guard = entry.read().unwrap();
        if let CacheEntry::Loaded(ref graph) = &*guard {
            let estimated = graph.size_of_cached(&mut mem_ops);
            total_size += estimated;
            loaded_sizes.insert(corpus_name.clone(), estimated);
        }
    }

    let max_cache_size: usize = match cache_strategy {
        CacheStrategy::FixedMaxMemory(max_size) => *max_size,
        CacheStrategy::PercentOfFreeMemory(max_percent) => match sys_info::mem_info() {
            Ok(mem) => {
                // The percentage is taken from the memory that would be free
                // if every cached corpus was unloaded.
                let free_system_mem: usize = mem.avail as usize * 1024;
                (((free_system_mem + total_size) as f64) * (max_percent / 100.0)) as usize
            }
            // Unknown free memory: fall back to a zero budget so everything
            // that is not protected gets evicted.
            Err(_) => 0,
        },
    };

    debug!(
        "Current cache size is {:.2} MB / max {:.2} MB",
        (total_size as f64) / (1024.0 * 1024.0),
        (max_cache_size as f64) / (1024.0 * 1024.0)
    );

    // Evict until the cache fits the budget, skipping protected corpora.
    for (corpus_name, corpus_size) in loaded_sizes.iter() {
        if total_size <= max_cache_size {
            break;
        }
        if !protected.contains(corpus_name.as_str()) {
            info!("Removing corpus {} from cache", corpus_name);
            cache.remove(corpus_name);
            total_size -= corpus_size;
            debug!(
                "Current cache size is {:.2} MB / max {:.2} MB",
                (total_size as f64) / (1024.0 * 1024.0),
                (max_cache_size as f64) / (1024.0 * 1024.0)
            );
        }
    }
}
/// Run `query` on the cached corpus and build a new [`Graph`] that contains
/// every matched node (restricted to the match positions in `match_idx`)
/// together with the edges connecting them.
///
/// `component_type_filter` limits which edge components are copied; `None`
/// copies edges from all component types.
fn extract_subgraph_by_query(
    db_entry: &Arc<RwLock<CacheEntry>>,
    query: &Disjunction,
    match_idx: &[usize],
    query_config: &query::Config,
    component_type_filter: Option<ComponentType>,
) -> Result<Graph> {
    let lock = db_entry.read().unwrap();
    let orig_db = get_read_or_error(&lock)?;

    let plan = ExecutionPlan::from_disjunction(&query, &orig_db, &query_config).chain_err(|| "")?;
    debug!("executing subgraph query\n{}", plan);

    // First pass: collect all distinct matched nodes and copy their annotations.
    let mut matched_nodes: BTreeSet<Match> = BTreeSet::new();
    let mut result = Graph::new();
    for group in plan {
        trace!("subgraph query found match {:?}", group);
        for &idx in match_idx {
            if idx < group.len() {
                let m: &Match = &group[idx];
                if !matched_nodes.contains(m) {
                    matched_nodes.insert(m.clone());
                    trace!("subgraph query extracted node {:?}", m.node);
                    create_subgraph_node(m.node, &mut result, orig_db);
                }
            }
        }
    }

    // Second pass: copy the edges between the extracted nodes.
    let components = orig_db.get_all_components(component_type_filter, None);
    for m in &matched_nodes {
        create_subgraph_edge(m.node, &mut result, orig_db, &components);
    }
    Ok(result)
}
/// Copy the node `id` into the subgraph `db` by transferring every node
/// annotation it carries in `orig_db`.
fn create_subgraph_node(id: NodeID, db: &mut Graph, orig_db: &Graph) {
    // `node_annos` is shared via `Arc`; get a mutable (copy-on-write) handle.
    let target_annos = Arc::make_mut(&mut db.node_annos);
    for anno in orig_db.node_annos.get_annotations_for_item(&id) {
        target_annos.insert(id, anno);
    }
}
/// Copy all outgoing edges of `source_id` (and their annotations) from
/// `orig_db` into the subgraph `db`, but only for edges whose target node is
/// already part of the subgraph.
///
/// Internal `annis` coverage components with a non-empty name and the special
/// left-/right-token components are skipped because they are derived data.
fn create_subgraph_edge(
    source_id: NodeID,
    db: &mut Graph,
    orig_db: &Graph,
    components: &[Component],
) {
    for c in components {
        if !(c.ctype == ComponentType::Coverage && c.layer == "annis" && !c.name.is_empty())
            && c.ctype != ComponentType::LeftToken
            && c.ctype != ComponentType::RightToken
        {
            if let Some(orig_gs) = orig_db.get_graphstorage(c) {
                for target in orig_gs.get_outgoing_edges(source_id) {
                    // Only connect to nodes that were extracted into the subgraph.
                    if !db.node_annos.get_all_keys_for_item(&target).is_empty() {
                        let e = Edge {
                            source: source_id,
                            target,
                        };
                        // Look up the writable graph storage once per edge instead
                        // of repeating the lookup for every single edge annotation.
                        if let Ok(new_gs) = db.get_or_create_writable(&c) {
                            new_gs.add_edge(e.clone());
                            for a in orig_gs.get_anno_storage().get_annotations_for_item(&e) {
                                new_gs.add_edge_annotation(e.clone(), a);
                            }
                        }
                    }
                }
            }
        }
    }
}
/// Create the database directory (if missing), open its `db.lock` file and
/// acquire an exclusive advisory lock on it.
///
/// Returns the locked [`File`]; the lock is held for as long as the returned
/// handle is kept open. Fails if the directory cannot be created, the lock
/// file cannot be opened, or another process already holds the lock.
fn create_lockfile_for_directory(db_dir: &Path) -> Result<File> {
    std::fs::create_dir_all(db_dir)
        .chain_err(|| format!("Could not create directory {}", db_dir.to_string_lossy()))?;

    let lock_path = db_dir.join("db.lock");
    let lock_file = OpenOptions::new()
        .read(true)
        .write(true)
        .create(true)
        .open(&lock_path)
        .chain_err(|| {
            format!(
                "Could not open or create lockfile {}",
                lock_path.to_string_lossy()
            )
        })?;

    // Non-blocking: error out immediately if another process owns the lock.
    lock_file.try_lock_exclusive().chain_err(|| {
        format!(
            "Could not acquire lock for directory {}",
            db_dir.to_string_lossy()
        )
    })?;
    Ok(lock_file)
}