use crate::annis::db;
use crate::annis::db::aql;
use crate::annis::db::aql::operators;
use crate::annis::db::aql::operators::RangeSpec;
use crate::annis::db::exec::nodesearch::NodeSearchSpec;
use crate::annis::db::plan::ExecutionPlan;
use crate::annis::db::query;
use crate::annis::db::query::conjunction::Conjunction;
use crate::annis::db::query::disjunction::Disjunction;
use crate::annis::db::relannis;
use crate::annis::db::sort_matches::CollationType;
use crate::annis::db::token_helper;
use crate::annis::db::token_helper::TokenHelper;
use crate::annis::errors::*;
use crate::annis::types::CountExtra;
use crate::annis::types::{
CorpusConfiguration, FrequencyTable, FrequencyTableRow, QueryAttributeDescription,
};
use crate::annis::util::quicksort;
use crate::{
graph::Match,
malloc_size_of::{MallocSizeOf, MallocSizeOfOps},
AnnotationGraph,
};
use fs2::FileExt;
use graphannis_core::{
annostorage::ValueSearch,
graph::{
storage::GraphStatistic, update::GraphUpdate, ANNIS_NS, NODE_NAME, NODE_NAME_KEY, NODE_TYPE,
},
types::{AnnoKey, Annotation, Component, Edge, NodeID},
util::memory_estimation,
};
use linked_hash_map::LinkedHashMap;
use percent_encoding::{utf8_percent_encode, AsciiSet, CONTROLS};
use std::borrow::Cow;
use std::collections::{BTreeSet, HashSet};
use std::fmt;
use std::fs::File;
use std::fs::OpenOptions;
use std::io::prelude::*;
use std::iter::FromIterator;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::{Arc, Condvar, Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
use std::thread;
use rustc_hash::FxHashMap;
use rand::seq::SliceRandom;
use std::{
ffi::CString,
io::{BufReader, Write},
};
use anyhow::{Context, Error};
use aql::model::AnnotationComponentType;
use db::AnnotationStorage;
#[cfg(test)]
mod tests;
/// State of a corpus inside the in-memory corpus cache.
enum CacheEntry {
    /// The corpus is fully materialized as an [`AnnotationGraph`].
    Loaded(AnnotationGraph),
    /// The corpus is known (cache slot exists) but its graph has not been
    /// loaded from disk yet.
    NotLoaded,
}
/// Load status of a corpus or of a single graph storage component.
#[derive(Debug, Ord, Eq, PartialOrd, PartialEq)]
pub enum LoadStatus {
    /// Nothing is loaded into main memory.
    NotLoaded,
    /// Some components are loaded; the payload is the estimated heap size
    /// in bytes (displayed as MB by the `Display` implementations below).
    PartiallyLoaded(usize),
    /// Everything is loaded; the payload is the estimated heap size in bytes.
    FullyLoaded(usize),
}
/// Information about a single graph storage (one edge component) of a corpus.
pub struct GraphStorageInfo {
    /// The component this graph storage belongs to.
    pub component: Component<AnnotationComponentType>,
    /// Whether (and how much of) the storage is loaded into memory.
    pub load_status: LoadStatus,
    /// Number of edge annotations stored in this component.
    pub number_of_annotations: usize,
    /// Serialization id of the concrete graph storage implementation.
    pub implementation: String,
    /// Optional statistics collected for this component.
    pub statistics: Option<GraphStatistic>,
}
impl fmt::Display for GraphStorageInfo {
    /// Render a multi-line, human-readable summary of this component:
    /// annotation count, optional statistics, implementation id, load
    /// status and (when loaded) the estimated memory consumption in MB.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        writeln!(
            f,
            // Fixed typo in the user-facing output ("annnotations").
            "Component {}: {} annotations",
            self.component, self.number_of_annotations
        )?;
        if let Some(stats) = &self.statistics {
            writeln!(f, "Stats: {}", stats)?;
        }
        writeln!(f, "Implementation: {}", self.implementation)?;
        match self.load_status {
            LoadStatus::NotLoaded => writeln!(f, "Not Loaded")?,
            LoadStatus::PartiallyLoaded(memory_size) => {
                writeln!(f, "Status: {:?}", "partially loaded")?;
                writeln!(
                    f,
                    "Memory: {:.2} MB",
                    memory_size as f64 / f64::from(1024 * 1024)
                )?;
            }
            LoadStatus::FullyLoaded(memory_size) => {
                writeln!(f, "Status: {:?}", "fully loaded")?;
                writeln!(
                    f,
                    "Memory: {:.2} MB",
                    memory_size as f64 / f64::from(1024 * 1024)
                )?;
            }
        };
        Ok(())
    }
}
/// Summary information about one corpus in the storage.
pub struct CorpusInfo {
    /// Name of the corpus.
    pub name: String,
    /// Overall load status of the corpus.
    pub load_status: LoadStatus,
    /// Estimated heap size of the node annotation storage in bytes, if loaded.
    pub node_annos_load_size: Option<usize>,
    /// Per-component information for all graph storages of the corpus.
    pub graphstorages: Vec<GraphStorageInfo>,
    /// The corpus configuration as read from `corpus-config.toml`.
    pub config: CorpusConfiguration,
}
impl fmt::Display for CorpusInfo {
    /// Render a multi-line, human-readable report: load status, total
    /// memory, node-annotation memory and a separated section per graph
    /// storage.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Divisor for converting a byte count into MB.
        const MB: f64 = (1024 * 1024) as f64;
        match self.load_status {
            LoadStatus::NotLoaded => writeln!(f, "Not Loaded")?,
            LoadStatus::PartiallyLoaded(bytes) => {
                writeln!(f, "Status: {:?}", "partially loaded")?;
                writeln!(f, "Total memory: {:.2} MB", bytes as f64 / MB)?;
            }
            LoadStatus::FullyLoaded(bytes) => {
                writeln!(f, "Status: {:?}", "fully loaded")?;
                writeln!(f, "Total memory: {:.2} MB", bytes as f64 / MB)?;
            }
        };
        if let Some(bytes) = self.node_annos_load_size {
            writeln!(f, "Node Annotations: {:.2} MB", bytes as f64 / MB)?;
        }
        if !self.graphstorages.is_empty() {
            writeln!(f, "------------")?;
            for storage_info in &self.graphstorages {
                write!(f, "{}", storage_info)?;
                writeln!(f, "------------")?;
            }
        }
        Ok(())
    }
}
/// Order of the results of a `find` query.
#[derive(Debug, PartialEq, Clone, Copy, Serialize, Deserialize)]
#[repr(C)]
pub enum ResultOrder {
    /// Results are sorted in the default (text) order.
    Normal,
    /// Results are sorted in the inverse of the default order.
    Inverted,
    /// Results are shuffled randomly.
    Randomized,
    /// Results are returned in whatever order the execution plan produces.
    NotSorted,
}
impl Default for ResultOrder {
fn default() -> Self {
ResultOrder::Normal
}
}
/// Result of preparing a query: the parsed disjunction plus the cache
/// entry of the corpus it will run against.
struct PreparationResult<'a> {
    query: Disjunction<'a>,
    db_entry: Arc<RwLock<CacheEntry>>,
}
/// One entry of a frequency-query definition: which annotation of which
/// query node to count.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FrequencyDefEntry {
    /// Optional namespace of the annotation.
    #[serde(default)]
    pub ns: Option<String>,
    /// Name of the annotation (or "tok").
    pub name: String,
    /// Reference to the query node (number or variable name) to count on.
    pub node_ref: String,
}
impl FromStr for FrequencyDefEntry {
    type Err = Error;
    /// Parse a frequency definition of the form `<node-ref>:<anno-name>`,
    /// where the annotation name may carry a namespace qualifier.
    fn from_str(s: &str) -> std::result::Result<FrequencyDefEntry, Self::Err> {
        // Split only at the first ':' so qualified annotation names keep
        // their own ':' separator intact.
        let mut parts = s.splitn(2, ':');
        match (parts.next(), parts.next()) {
            (Some(node_ref), Some(anno)) => {
                let (ns, name) = graphannis_core::util::split_qname(anno);
                Ok(FrequencyDefEntry {
                    ns: ns.map(String::from),
                    name: String::from(name),
                    node_ref: String::from(node_ref),
                })
            }
            _ => bail!(
                "Frequency definition must consists of two parts: \
             the referenced node and the annotation name or \"tok\" separated by \":\""
            ),
        }
    }
}
/// The query language a query string is written in.
#[repr(C)]
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
pub enum QueryLanguage {
    /// Standard AQL.
    AQL,
    /// AQL with compatibility quirks of the ANNIS 3 implementation.
    AQLQuirksV3,
}
impl Default for QueryLanguage {
fn default() -> Self {
QueryLanguage::AQL
}
}
/// Supported formats when importing a corpus.
#[repr(C)]
#[derive(Clone, Copy)]
pub enum ImportFormat {
    /// The legacy relANNIS directory format.
    RelANNIS,
    /// A single GraphML file.
    GraphML,
}
/// Supported formats when exporting a corpus.
#[repr(C)]
#[derive(Clone, Copy)]
pub enum ExportFormat {
    /// A single GraphML file (only one corpus at a time).
    GraphML,
    /// A ZIP archive containing one GraphML file (and linked files) per corpus.
    GraphMLZip,
    /// A directory tree with one GraphML file per corpus.
    GraphMLDirectory,
}
/// Strategy that decides when corpora are evicted from the in-memory cache.
pub enum CacheStrategy {
    /// Keep at most this many bytes of estimated corpus memory in the cache.
    FixedMaxMemory(usize),
    /// Keep at most this percentage of the currently free system memory.
    PercentOfFreeMemory(f64),
}
/// Characters percent-encoded when building Salt URIs.
pub const SALT_URI_ENCODE_SET: &AsciiSet = &CONTROLS.add(b' ').add(b':').add(b'%');
/// Characters percent-encoded when a corpus name is used as a path segment
/// on disk (avoids separators and other characters unsafe in file names).
pub const PATH_SEGMENT_ENCODE_SET: &AsciiSet = &CONTROLS
    .add(b' ')
    .add(b'"')
    .add(b'#')
    .add(b'<')
    .add(b'>')
    .add(b'`')
    .add(b'?')
    .add(b'{')
    .add(b'}')
    .add(b'%')
    .add(b'/');
/// A thread-safe API for managing corpora stored in a common location on
/// the file system, with an in-memory cache of loaded corpora.
pub struct CorpusStorage {
    // Root directory that contains one sub-directory per corpus.
    db_dir: PathBuf,
    // Lock file held for the lifetime of this storage to guard `db_dir`
    // against concurrent use by other processes.
    lock_file: File,
    // Strategy used to decide when cached corpora are evicted.
    cache_strategy: CacheStrategy,
    // LRU-style cache (insertion-ordered map) from corpus name to its entry.
    corpus_cache: RwLock<LinkedHashMap<String, Arc<RwLock<CacheEntry>>>>,
    // Query execution configuration (e.g. parallel joins).
    query_config: query::Config,
    // Counter + condition variable tracking spawned background threads
    // (used to wait for WAL sync workers).
    active_background_workers: Arc<(Mutex<usize>, Condvar)>,
}
/// Initialize the process-wide `LC_COLLATE` locale from the environment,
/// so locale-aware collation follows the system configuration.
fn init_locale() {
    unsafe {
        // SAFETY: `locale` is a valid, NUL-terminated C string that lives
        // until after the `setlocale` call returns. An empty string selects
        // the locale from the environment.
        let locale = CString::new("").unwrap_or_default();
        libc::setlocale(libc::LC_COLLATE, locale.as_ptr());
    }
}
/// Add one alternative to `query` that matches all nodes overlapping a
/// token within `ctx` token positions of the node matched by `m`
/// (to the left when `left` is true, otherwise to the right).
fn add_subgraph_precedence(
    query: &mut Disjunction,
    ctx: usize,
    m: &NodeSearchSpec,
    left: bool,
) -> Result<()> {
    let mut conjunction = Conjunction::new();
    let any_node = conjunction.add_node(NodeSearchSpec::AnyNode, None);
    let any_token = conjunction.add_node(NodeSearchSpec::AnyToken, None);
    let matched = conjunction.add_node(m.clone(), None);
    // The context node must overlap a token.
    conjunction.add_operator(
        Box::new(operators::OverlapSpec { reflexive: true }),
        &any_node,
        &any_token,
        true,
    )?;
    // That token must precede (or follow, depending on `left`) the matched
    // node within the given distance.
    let (first, second) = if left {
        (&any_token, &matched)
    } else {
        (&matched, &any_token)
    };
    conjunction.add_operator(
        Box::new(operators::PrecedenceSpec {
            segmentation: None,
            dist: RangeSpec::Bound {
                min_dist: 0,
                max_dist: ctx,
            },
        }),
        first,
        second,
        true,
    )?;
    query.alternatives.push(conjunction);
    Ok(())
}
/// Like `add_subgraph_precedence`, but context distance is measured on the
/// given `segmentation` layer instead of the token layer.
fn add_subgraph_precedence_with_segmentation(
    query: &mut Disjunction,
    ctx: usize,
    segmentation: &str,
    m: &NodeSearchSpec,
    left: bool,
) -> Result<()> {
    let mut conjunction = Conjunction::new();
    let target = conjunction.add_node(NodeSearchSpec::AnyNode, None);
    let context_seg = conjunction.add_node(NodeSearchSpec::AnyNode, None);
    let matched_seg = conjunction.add_node(NodeSearchSpec::AnyNode, None);
    let matched = conjunction.add_node(m.clone(), None);
    // Tie the matched node to a segmentation node it overlaps.
    conjunction.add_operator(
        Box::new(operators::OverlapSpec { reflexive: true }),
        &matched_seg,
        &matched,
        false,
    )?;
    // Tie the result node to a segmentation node it overlaps.
    conjunction.add_operator(
        Box::new(operators::OverlapSpec { reflexive: true }),
        &target,
        &context_seg,
        false,
    )?;
    // Require precedence on the segmentation layer within the context size.
    let (first, second) = if left {
        (&context_seg, &matched_seg)
    } else {
        (&matched_seg, &context_seg)
    };
    conjunction.add_operator(
        Box::new(operators::PrecedenceSpec {
            segmentation: Some(segmentation.to_string()),
            dist: RangeSpec::Bound {
                min_dist: 0,
                max_dist: ctx,
            },
        }),
        first,
        second,
        false,
    )?;
    query.alternatives.push(conjunction);
    Ok(())
}
/// Boxed iterator over match groups (one `Vec<Match>` per result row).
type FindIterator<'a> = Box<dyn Iterator<Item = Vec<Match>> + 'a>;
impl CorpusStorage {
    /// Create a new corpus storage rooted at `db_dir` with an explicit
    /// cache strategy.
    ///
    /// `use_parallel_joins` enables parallel join execution in queries.
    ///
    /// # Errors
    ///
    /// Fails when the lock file for `db_dir` cannot be created/acquired.
    pub fn with_cache_strategy(
        db_dir: &Path,
        cache_strategy: CacheStrategy,
        use_parallel_joins: bool,
    ) -> Result<CorpusStorage> {
        // Locale must be initialized before any locale-aware collation is used.
        init_locale();
        let query_config = query::Config { use_parallel_joins };
        #[allow(clippy::mutex_atomic)]
        let active_background_workers = Arc::new((Mutex::new(0), Condvar::new()));
        let cs = CorpusStorage {
            db_dir: PathBuf::from(db_dir),
            lock_file: create_lockfile_for_directory(db_dir)?,
            cache_strategy,
            corpus_cache: RwLock::new(LinkedHashMap::new()),
            query_config,
            active_background_workers,
        };
        Ok(cs)
    }
pub fn with_auto_cache_size(db_dir: &Path, use_parallel_joins: bool) -> Result<CorpusStorage> {
init_locale();
let query_config = query::Config { use_parallel_joins };
let cache_strategy: CacheStrategy = CacheStrategy::PercentOfFreeMemory(25.0);
#[allow(clippy::mutex_atomic)]
let active_background_workers = Arc::new((Mutex::new(0), Condvar::new()));
let cs = CorpusStorage {
db_dir: PathBuf::from(db_dir),
lock_file: create_lockfile_for_directory(db_dir)?,
cache_strategy,
corpus_cache: RwLock::new(LinkedHashMap::new()),
query_config,
active_background_workers,
};
Ok(cs)
}
pub fn list(&self) -> Result<Vec<CorpusInfo>> {
let names: Vec<String> = self.list_from_disk().unwrap_or_default();
let mut result: Vec<CorpusInfo> = vec![];
let mut mem_ops =
MallocSizeOfOps::new(memory_estimation::platform::usable_size, None, None);
for n in names {
let corpus_info = self.create_corpus_info(&n, &mut mem_ops)?;
result.push(corpus_info);
}
Ok(result)
}
fn list_from_disk(&self) -> Result<Vec<String>> {
let mut corpora: Vec<String> = Vec::new();
let directories = self.db_dir.read_dir().with_context(|| {
format!(
"Listing directories from {} failed",
self.db_dir.to_string_lossy()
)
})?;
for c_dir in directories {
let c_dir = c_dir.with_context(|| {
format!(
"Could not get directory entry of folder {}",
self.db_dir.to_string_lossy()
)
})?;
let ftype = c_dir.file_type().with_context(|| {
format!(
"Could not determine file type for {}",
c_dir.path().to_string_lossy()
)
})?;
if ftype.is_dir() {
let corpus_name = c_dir.file_name().to_string_lossy().to_string();
corpora.push(corpus_name.clone());
}
}
Ok(corpora)
}
fn get_corpus_config(&self, corpus_name: &str) -> Result<Option<CorpusConfiguration>> {
let corpus_config_path = self.db_dir.join(corpus_name).join("corpus-config.toml");
if corpus_config_path.is_file() {
let file_content = std::fs::read_to_string(corpus_config_path)?;
let config = toml::from_str(&file_content)?;
Ok(Some(config))
} else {
Ok(None)
}
}
    /// Assemble a [`CorpusInfo`] for a single corpus from its cache entry
    /// and its on-disk configuration file.
    ///
    /// `mem_ops` is used to estimate the heap size of loaded graphs.
    fn create_corpus_info(
        &self,
        corpus_name: &str,
        mem_ops: &mut MallocSizeOfOps,
    ) -> Result<CorpusInfo> {
        let cache_entry = self.get_entry(corpus_name)?;
        let lock = cache_entry.read().unwrap();
        // Fall back to the default configuration when no
        // corpus-config.toml exists.
        let config: CorpusConfiguration = self
            .get_corpus_config(corpus_name)
            .with_context(|| {
                format!(
                    "Loading corpus-config.toml for corpus {} failed",
                    corpus_name
                )
            })?
            .unwrap_or_default();
        let corpus_info: CorpusInfo = match &*lock {
            CacheEntry::Loaded(ref db) => {
                // The graph is in memory: collect sizes per component.
                let heap_size = db.size_of(mem_ops);
                let mut load_status = LoadStatus::FullyLoaded(heap_size);
                let node_annos_load_size = Some(db.get_node_annos().size_of(mem_ops));
                let mut graphstorages = Vec::new();
                for c in db.get_all_components(None, None) {
                    if let Some(gs) = db.get_graphstorage_as_ref(&c) {
                        graphstorages.push(GraphStorageInfo {
                            component: c.clone(),
                            load_status: LoadStatus::FullyLoaded(gs.size_of(mem_ops)),
                            number_of_annotations: gs.get_anno_storage().number_of_annotations(),
                            implementation: gs.serialization_id().clone(),
                            statistics: gs.get_statistics().cloned(),
                        });
                    } else {
                        // At least one component is not loaded: downgrade
                        // the overall status to "partially loaded".
                        load_status = LoadStatus::PartiallyLoaded(heap_size);
                        graphstorages.push(GraphStorageInfo {
                            component: c.clone(),
                            load_status: LoadStatus::NotLoaded,
                            number_of_annotations: 0,
                            implementation: "".to_owned(),
                            statistics: None,
                        })
                    }
                }
                CorpusInfo {
                    name: corpus_name.to_owned(),
                    load_status,
                    graphstorages,
                    node_annos_load_size,
                    config,
                }
            }
            &CacheEntry::NotLoaded => CorpusInfo {
                name: corpus_name.to_owned(),
                load_status: LoadStatus::NotLoaded,
                graphstorages: vec![],
                node_annos_load_size: None,
                config,
            },
        };
        Ok(corpus_info)
    }
pub fn info(&self, corpus_name: &str) -> Result<CorpusInfo> {
let mut mem_ops =
MallocSizeOfOps::new(memory_estimation::platform::usable_size, None, None);
self.create_corpus_info(corpus_name, &mut mem_ops)
}
    /// Get (or create) the cache slot for a corpus without loading it.
    ///
    /// Uses a read lock for the fast path and upgrades to a write lock
    /// only when the slot has to be created.
    fn get_entry(&self, corpus_name: &str) -> Result<Arc<RwLock<CacheEntry>>> {
        let corpus_name = corpus_name.to_string();
        {
            // Fast path: the entry already exists, a shared lock suffices.
            let cache_lock = self.corpus_cache.read().unwrap();
            let cache = &*cache_lock;
            if let Some(e) = cache.get(&corpus_name) {
                return Ok(e.clone());
            }
        }
        // Slow path: re-check under the exclusive lock (another thread may
        // have inserted the entry in between) and insert a NotLoaded slot.
        let mut cache_lock = self.corpus_cache.write().unwrap();
        let cache = &mut *cache_lock;
        let entry = cache
            .entry(corpus_name)
            .or_insert_with(|| Arc::new(RwLock::new(CacheEntry::NotLoaded)));
        Ok(entry.clone())
    }
    /// Load a corpus from disk (or create it when `create_if_missing` is
    /// set) while the caller already holds the exclusive cache lock.
    ///
    /// The cache size is checked both before loading (to make room) and
    /// after inserting (excluding the freshly loaded corpus from eviction).
    ///
    /// # Errors
    ///
    /// Returns `GraphAnnisError::NoSuchCorpus` when the corpus directory
    /// does not exist and `create_if_missing` is false.
    fn load_entry_with_lock(
        &self,
        cache_lock: &mut RwLockWriteGuard<LinkedHashMap<String, Arc<RwLock<CacheEntry>>>>,
        corpus_name: &str,
        create_if_missing: bool,
    ) -> Result<Arc<RwLock<CacheEntry>>> {
        let cache = &mut *cache_lock;
        // Corpus names are percent-encoded when used as directory names.
        let escaped_corpus_name: Cow<str> =
            utf8_percent_encode(&corpus_name, PATH_SEGMENT_ENCODE_SET).into();
        let db_path: PathBuf = [self.db_dir.to_string_lossy().as_ref(), &escaped_corpus_name]
            .iter()
            .collect();
        let create_corpus = if db_path.is_dir() {
            false
        } else if create_if_missing {
            true
        } else {
            return Err(GraphAnnisError::NoSuchCorpus(corpus_name.to_string()).into());
        };
        // Make room in the cache before the (potentially large) load.
        check_cache_size_and_remove_with_cache(cache, &self.cache_strategy, vec![]);
        let db = if create_corpus {
            // Create an empty corpus and persist it immediately so the
            // directory exists for later loads.
            let mut db = AnnotationGraph::with_default_graphstorages(false)?;
            db.persist_to(&db_path)
                .with_context(|| format!("Could not create corpus with name {}", corpus_name))?;
            db
        } else {
            let mut db = AnnotationGraph::new(false)?;
            db.load_from(&db_path, false)?;
            db
        };
        let entry = Arc::new(RwLock::new(CacheEntry::Loaded(db)));
        // Re-insert so the corpus moves to the end of the LRU order.
        cache.remove(corpus_name);
        cache.insert(String::from(corpus_name), entry.clone());
        // Check again, but never evict the corpus we just loaded.
        check_cache_size_and_remove_with_cache(cache, &self.cache_strategy, vec![corpus_name]);
        Ok(entry)
    }
fn get_loaded_entry(
&self,
corpus_name: &str,
create_if_missing: bool,
) -> Result<Arc<RwLock<CacheEntry>>> {
let cache_entry = self.get_entry(corpus_name)?;
let loaded = {
let lock = cache_entry.read().unwrap();
match &*lock {
CacheEntry::Loaded(_) => true,
_ => false,
}
};
if loaded {
Ok(cache_entry)
} else {
let mut cache_lock = self.corpus_cache.write().unwrap();
self.load_entry_with_lock(&mut cache_lock, corpus_name, create_if_missing)
}
}
fn get_loaded_entry_with_components(
&self,
corpus_name: &str,
components: Vec<Component<AnnotationComponentType>>,
) -> Result<Arc<RwLock<CacheEntry>>> {
let db_entry = self.get_loaded_entry(corpus_name, false)?;
let missing_components = {
let lock = db_entry.read().unwrap();
let db = get_read_or_error(&lock)?;
let mut missing: HashSet<_> = HashSet::new();
for c in components {
if !db.is_loaded(&c) {
missing.insert(c);
}
}
missing
};
if !missing_components.is_empty() {
let mut lock = db_entry.write().unwrap();
let db = get_write_or_error(&mut lock)?;
for c in missing_components {
db.ensure_loaded(&c)?;
}
};
Ok(db_entry)
}
fn get_fully_loaded_entry(&self, corpus_name: &str) -> Result<Arc<RwLock<CacheEntry>>> {
let db_entry = self.get_loaded_entry(corpus_name, false)?;
let missing_components = {
let lock = db_entry.read().unwrap();
let db = get_read_or_error(&lock)?;
let mut missing: HashSet<_> = HashSet::new();
for c in db.get_all_components(None, None) {
if !db.is_loaded(&c) {
missing.insert(c);
}
}
missing
};
if !missing_components.is_empty() {
let mut lock = db_entry.write().unwrap();
let db = get_write_or_error(&mut lock)?;
for c in missing_components {
db.ensure_loaded(&c)?;
}
};
Ok(db_entry)
}
    /// Import all corpora contained in a ZIP archive.
    ///
    /// The archive is extracted to a temporary directory first. Directories
    /// containing a `corpus.annis`/`corpus.tab` file are imported as
    /// relANNIS corpora; files with a `.graphml` extension are imported as
    /// GraphML. Returns the names of all imported corpora.
    ///
    /// `progress_callback` is invoked with status messages during import.
    pub fn import_all_from_zip<R, F>(
        &self,
        zip_file: R,
        disk_based: bool,
        overwrite_existing: bool,
        progress_callback: F,
    ) -> Result<Vec<String>>
    where
        R: Read + Seek,
        F: Fn(&str),
    {
        let tmp_dir = tempfile::tempdir()?;
        debug!(
            "Using temporary directory {} to extract ZIP file content.",
            tmp_dir.path().to_string_lossy()
        );
        let mut archive = zip::ZipArchive::new(zip_file)?;
        // Collect candidate import locations while extracting.
        let mut relannis_files = Vec::new();
        let mut graphannis_files = Vec::new();
        for i in 0..archive.len() {
            let mut file = archive.by_index(i)?;
            // `sanitized_name` guards against path traversal in entry names.
            let output_path = tmp_dir.path().join(file.sanitized_name());
            if let Some(file_name) = output_path.file_name() {
                if file_name == "corpus.annis" || file_name == "corpus.tab" {
                    // The *parent directory* is the relANNIS corpus root.
                    if let Some(relannis_root) = output_path.parent() {
                        relannis_files.push(relannis_root.to_owned())
                    }
                } else if let Some(ext) = output_path.extension() {
                    if ext.to_string_lossy().to_ascii_lowercase() == "graphml" {
                        graphannis_files.push(output_path.clone());
                    }
                }
            }
            debug!(
                "copying ZIP file content {}",
                file.sanitized_name().to_string_lossy(),
            );
            if file.is_dir() {
                std::fs::create_dir_all(output_path)?;
            } else if let Some(parent) = output_path.parent() {
                std::fs::create_dir_all(parent)?;
                let mut output_file = std::fs::File::create(&output_path)?;
                std::io::copy(&mut file, &mut output_file)?;
            }
        }
        // Import everything that was found, relANNIS corpora first.
        let mut corpus_names = Vec::new();
        for p in relannis_files {
            info!("importing relANNIS corpus from {}", p.to_string_lossy());
            let name = self.import_from_fs(
                &p,
                ImportFormat::RelANNIS,
                None,
                disk_based,
                overwrite_existing,
                &progress_callback,
            )?;
            corpus_names.push(name);
        }
        for p in graphannis_files {
            info!("importing corpus from {}", p.to_string_lossy());
            let name = self.import_from_fs(
                &p,
                ImportFormat::GraphML,
                None,
                disk_based,
                overwrite_existing,
                &progress_callback,
            )?;
            corpus_names.push(name);
        }
        debug!(
            "deleting temporary directory {}",
            tmp_dir.path().to_string_lossy()
        );
        std::fs::remove_dir_all(tmp_dir.path())?;
        Ok(corpus_names)
    }
    /// Import a corpus from the file system into this corpus storage.
    ///
    /// * `path` — relANNIS directory or GraphML file, depending on `format`.
    /// * `corpus_name` — optional name override; defaults to the name found
    ///   in the import source (file stem for GraphML).
    /// * `disk_based` — load annotation storage disk-based instead of
    ///   in-memory.
    /// * `overwrite_existing` — replace an existing corpus of the same
    ///   name; otherwise importing a duplicate is an error.
    ///
    /// Returns the name under which the corpus was imported.
    pub fn import_from_fs<F>(
        &self,
        path: &Path,
        format: ImportFormat,
        corpus_name: Option<String>,
        disk_based: bool,
        overwrite_existing: bool,
        progress_callback: F,
    ) -> Result<String>
    where
        F: Fn(&str),
    {
        let (orig_name, mut graph, config) = match format {
            ImportFormat::RelANNIS => relannis::load(path, disk_based, |status| {
                progress_callback(status);
                // Loading may consume a lot of memory; give the cache a
                // chance to evict other corpora.
                self.check_cache_size_and_remove(vec![]);
            })?,
            ImportFormat::GraphML => {
                // Derive the corpus name from the file stem when possible.
                let orig_corpus_name = if let Some(file_name) = path.file_stem() {
                    file_name.to_string_lossy().to_string()
                } else {
                    "UnknownCorpus".to_string()
                };
                let input_file = File::open(path)?;
                let (g, config_str) = graphannis_core::graph::serialization::graphml::import(
                    input_file,
                    disk_based,
                    |status| {
                        progress_callback(status);
                        self.check_cache_size_and_remove(vec![]);
                    },
                )?;
                // The GraphML file may embed a corpus configuration.
                let config = if let Some(config_str) = config_str {
                    toml::from_str(&config_str)?
                } else {
                    CorpusConfiguration::default()
                };
                (orig_corpus_name, g, config)
            }
        };
        // Best-effort: a failure to fully load components is logged but
        // does not abort the import.
        let r = graph.ensure_loaded_all();
        if let Err(e) = r {
            error!(
                "Some error occurred when attempting to load components from disk: {:?}",
                e
            );
        }
        let corpus_name = corpus_name.unwrap_or(orig_name);
        // Corpus names are percent-encoded when used as directory names.
        let escaped_corpus_name: Cow<str> =
            utf8_percent_encode(&corpus_name, PATH_SEGMENT_ENCODE_SET).into();
        let mut db_path = PathBuf::from(&self.db_dir);
        db_path.push(escaped_corpus_name.to_string());
        let mut cache_lock = self.corpus_cache.write().unwrap();
        let cache = &mut *cache_lock;
        // Make room in the cache before inserting the new corpus.
        check_cache_size_and_remove_with_cache(cache, &self.cache_strategy, vec![]);
        if cache.contains_key(&corpus_name) {
            if overwrite_existing {
                // Drop the cached entry and remove the old files on disk.
                let old_entry = cache.remove(&corpus_name);
                if old_entry.is_some() {
                    if let Err(e) = std::fs::remove_dir_all(db_path.clone()) {
                        error!("Error when removing existing files {}", e);
                    }
                }
            } else {
                return Err(GraphAnnisError::CorpusExists(corpus_name.to_string()).into());
            }
        }
        if let Err(e) = std::fs::create_dir_all(&db_path) {
            error!(
                "Can't create directory {}: {:?}",
                db_path.to_string_lossy(),
                e
            );
        }
        info!("copying linked files for corpus {}", corpus_name);
        let current_dir = PathBuf::from(".");
        let files_dir = db_path.join("files");
        std::fs::create_dir_all(&files_dir)?;
        // Copy files referenced by "file" nodes and rewrite their paths to
        // be relative to the new "files" directory.
        self.copy_linked_files_and_update_references(
            path.parent().unwrap_or(&current_dir),
            &files_dir,
            &mut graph,
        )?;
        info!("saving corpus {} to disk", corpus_name);
        let save_result = graph.save_to(&db_path);
        if let Err(e) = save_result {
            error!(
                "Can't save corpus to {}: {:?}",
                db_path.to_string_lossy(),
                e
            );
        }
        let corpus_config_path = db_path.join("corpus-config.toml");
        info!(
            "saving corpus configuration file for corpus {} to {}",
            corpus_name,
            &corpus_config_path.to_string_lossy()
        );
        std::fs::write(corpus_config_path, toml::to_string(&config)?)?;
        // Register the imported corpus in the cache and re-check the cache
        // size, excluding the fresh corpus from eviction.
        cache.insert(
            corpus_name.clone(),
            Arc::new(RwLock::new(CacheEntry::Loaded(graph))),
        );
        check_cache_size_and_remove_with_cache(cache, &self.cache_strategy, vec![&corpus_name]);
        Ok(corpus_name)
    }
    /// Copy all files referenced by "file" nodes from `old_base_path` into
    /// `new_base_path` and update the `annis::file` annotation of each node
    /// to the new (relative) location.
    fn copy_linked_files_and_update_references(
        &self,
        old_base_path: &Path,
        new_base_path: &Path,
        graph: &mut AnnotationGraph,
    ) -> Result<()> {
        let linked_file_key = AnnoKey {
            ns: ANNIS_NS.to_string(),
            name: "file".to_string(),
        };
        let node_annos: &mut dyn AnnotationStorage<NodeID> = graph.get_node_annos_mut();
        // Collect the node ids first: the loop below mutates `node_annos`.
        let file_nodes: Vec<NodeID> = node_annos
            .exact_anno_search(Some(ANNIS_NS), NODE_TYPE, ValueSearch::Some("file"))
            .map(|m| m.node)
            .collect();
        for node in file_nodes {
            if let Some(original_path) = node_annos.get_value_for_item(&node, &linked_file_key) {
                // Resolve the (possibly relative) stored path against the
                // old base directory.
                let original_path = old_base_path
                    .canonicalize()?
                    .join(&PathBuf::from(original_path.as_ref()));
                // Silently skip references to files that do not exist.
                if original_path.is_file() {
                    if let Some(node_name) = node_annos.get_value_for_item(&node, &NODE_NAME_KEY) {
                        // The node name determines the file layout below
                        // the new base path.
                        let new_path = new_base_path.join(node_name.as_ref());
                        if let Some(parent) = new_path.parent() {
                            std::fs::create_dir_all(parent)?;
                        }
                        std::fs::copy(&original_path, &new_path)?;
                        // Store the new location relative to the base path.
                        let relative_path = new_path.strip_prefix(&new_base_path)?;
                        node_annos.insert(
                            node,
                            Annotation {
                                key: linked_file_key.clone(),
                                val: relative_path.to_string_lossy().to_string(),
                            },
                        )?;
                    }
                }
            }
        }
        Ok(())
    }
fn get_linked_files<'a>(
&'a self,
corpus_name: &'a str,
graph: &'a AnnotationGraph,
) -> Result<impl Iterator<Item = (String, PathBuf)> + 'a> {
let linked_file_key = AnnoKey {
ns: ANNIS_NS.to_string(),
name: "file".to_string(),
};
let base_path = self.db_dir.join(corpus_name).join("files").canonicalize()?;
let node_annos: &dyn AnnotationStorage<NodeID> = graph.get_node_annos();
let it = node_annos
.exact_anno_search(Some(ANNIS_NS), NODE_TYPE, ValueSearch::Some("file"))
.filter_map(move |m| {
if let Some(node_name) = node_annos.get_value_for_item(&m.node, &NODE_NAME_KEY) {
if let Some(file_path_value) =
node_annos.get_value_for_item(&m.node, &linked_file_key)
{
return Some((
node_name.to_string(),
base_path.join(file_path_value.to_string()),
));
}
}
None
});
Ok(it)
}
fn copy_linked_files_to_disk(
&self,
corpus_name: &str,
new_base_path: &Path,
graph: &AnnotationGraph,
) -> Result<()> {
for (node_name, original_path) in self.get_linked_files(corpus_name, graph)? {
let node_name: String = node_name;
if original_path.is_file() {
let new_path = new_base_path.join(&node_name);
if let Some(parent) = new_path.parent() {
std::fs::create_dir_all(parent)?;
}
std::fs::copy(&original_path, &new_path)?;
}
}
Ok(())
}
    /// Export a single corpus as a GraphML file at `path`, copying its
    /// linked files into the same directory.
    fn export_corpus_graphml(&self, corpus_name: &str, path: &Path) -> Result<()> {
        let output_file = File::create(path)?;
        let entry = self.get_loaded_entry(corpus_name, false)?;
        // First fully load the corpus under an exclusive lock, ...
        {
            let mut lock = entry.write().unwrap();
            let graph: &mut AnnotationGraph = get_write_or_error(&mut lock)?;
            graph.ensure_loaded_all()?;
        }
        // ... then export it while only holding a shared lock.
        let lock = entry.read().unwrap();
        let graph: &AnnotationGraph = get_read_or_error(&lock)?;
        // Embed the corpus configuration (if any) into the GraphML output.
        let config_as_str = if let Some(config) = self.get_corpus_config(corpus_name)? {
            Some(toml::to_string_pretty(&config)?)
        } else {
            None
        };
        let config_as_str = config_as_str.as_deref();
        graphannis_core::graph::serialization::graphml::export(
            graph,
            config_as_str,
            output_file,
            |status| {
                info!("{}", status);
            },
        )?;
        // Linked files end up next to the GraphML file.
        if let Some(parent_dir) = path.parent() {
            self.copy_linked_files_to_disk(corpus_name, &parent_dir, &graph)?;
        }
        Ok(())
    }
    /// Export a single corpus (GraphML plus linked files) into an already
    /// opened ZIP writer.
    ///
    /// When `use_corpus_subdirectory` is true, all files are placed below a
    /// sub-directory named after the corpus.
    pub fn export_corpus_zip<W, F>(
        &self,
        corpus_name: &str,
        use_corpus_subdirectory: bool,
        mut zip: &mut zip::ZipWriter<W>,
        progress_callback: F,
    ) -> Result<()>
    where
        W: Write + Seek,
        F: Fn(&str),
    {
        let options =
            zip::write::FileOptions::default().compression_method(zip::CompressionMethod::Deflated);
        let mut base_path = PathBuf::default();
        if use_corpus_subdirectory {
            base_path.push(corpus_name);
        }
        let path_in_zip = base_path.join(format!("{}.graphml", corpus_name));
        zip.start_file_from_path(&path_in_zip, options)?;
        let entry = self.get_loaded_entry(corpus_name, false)?;
        // First fully load the corpus under an exclusive lock, ...
        {
            let mut lock = entry.write().unwrap();
            let graph: &mut AnnotationGraph = get_write_or_error(&mut lock)?;
            graph.ensure_loaded_all()?;
        }
        // ... then export it while only holding a shared lock.
        let lock = entry.read().unwrap();
        let graph: &AnnotationGraph = get_read_or_error(&lock)?;
        // Embed the corpus configuration (if any) into the GraphML output.
        let config_as_str = if let Some(config) = self.get_corpus_config(corpus_name)? {
            Some(toml::to_string_pretty(&config)?)
        } else {
            None
        };
        let config_as_str: Option<&str> = config_as_str.as_deref();
        graphannis_core::graph::serialization::graphml::export(
            graph,
            config_as_str,
            &mut zip,
            progress_callback,
        )?;
        // Append every linked file as an additional ZIP entry.
        for (node_name, original_path) in self.get_linked_files(corpus_name.as_ref(), graph)? {
            let node_name: String = node_name;
            zip.start_file_from_path(&base_path.join(&node_name), options)?;
            let file_to_copy = File::open(original_path)?;
            let mut reader = BufReader::new(file_to_copy);
            std::io::copy(&mut reader, zip)?;
        }
        Ok(())
    }
pub fn export_to_fs<S: AsRef<str>>(
&self,
corpora: &[S],
path: &Path,
format: ExportFormat,
) -> Result<()> {
match format {
ExportFormat::GraphML => {
if corpora.len() == 1 {
self.export_corpus_graphml(corpora[0].as_ref(), path)?;
} else {
return Err(anyhow!(
"This format can only export one corpus but {} have been given as argument",
corpora.len()
));
}
}
ExportFormat::GraphMLDirectory => {
let use_corpus_subdirectory = corpora.len() > 1;
for corpus_name in corpora {
let mut path = PathBuf::from(path);
if use_corpus_subdirectory {
path.push(corpus_name.as_ref());
} else {
};
std::fs::create_dir_all(&path)?;
path.push(format!("{}.graphml", corpus_name.as_ref()));
self.export_corpus_graphml(corpus_name.as_ref(), &path)?;
}
}
ExportFormat::GraphMLZip => {
let output_file = File::create(path)?;
let mut zip = zip::ZipWriter::new(output_file);
let use_corpus_subdirectory = corpora.len() > 1;
for corpus_name in corpora {
let corpus_name: &str = corpus_name.as_ref();
self.export_corpus_zip(
corpus_name,
use_corpus_subdirectory,
&mut zip,
|status| {
info!("{}", status);
},
)?;
}
zip.finish()?;
}
}
Ok(())
}
pub fn delete(&self, corpus_name: &str) -> Result<bool> {
let mut db_path = PathBuf::from(&self.db_dir);
db_path.push(corpus_name);
let mut cache_lock = self.corpus_cache.write().unwrap();
let cache = &mut *cache_lock;
if let Some(db_entry) = cache.remove(corpus_name) {
let mut _lock = db_entry.write().unwrap();
if db_path.is_dir() && db_path.exists() {
std::fs::remove_dir_all(db_path).context("Error when removing existing files")?
}
Ok(true)
} else {
Ok(false)
}
}
    /// Apply a sequence of updates to a corpus, creating the corpus if it
    /// does not exist yet.
    ///
    /// The update itself is applied synchronously; syncing the write-ahead
    /// log to disk happens in a background thread, which is tracked via the
    /// `active_background_workers` counter/condvar pair.
    pub fn apply_update(&self, corpus_name: &str, update: &mut GraphUpdate) -> Result<()> {
        let db_entry = self
            .get_loaded_entry(corpus_name, true)
            .with_context(|| format!("Could not get loaded entry for corpus {}", corpus_name))?;
        {
            let mut lock = db_entry.write().unwrap();
            let db: &mut AnnotationGraph = get_write_or_error(&mut lock)?;
            db.apply_update(update, |_| {})?;
        }
        let active_background_workers = self.active_background_workers.clone();
        {
            // Register the background worker before spawning it so that a
            // shutdown can wait for it.
            let &(ref lock, ref _cvar) = &*active_background_workers;
            let mut nr_active_background_workers = lock.lock().unwrap();
            *nr_active_background_workers += 1;
        }
        thread::spawn(move || {
            trace!("Starting background thread to sync WAL updates");
            let lock = db_entry.read().unwrap();
            if let Ok(db) = get_read_or_error(&lock) {
                let db: &AnnotationGraph = db;
                if let Err(e) = db.background_sync_wal_updates() {
                    error!("Can't sync changes in background thread: {:?}", e);
                } else {
                    trace!("Finished background thread to sync WAL updates");
                }
            }
            // De-register and wake up any thread waiting for all background
            // workers to finish.
            let &(ref lock, ref cvar) = &*active_background_workers;
            let mut nr_active_background_workers = lock.lock().unwrap();
            *nr_active_background_workers -= 1;
            cvar.notify_all();
        });
        Ok(())
    }
    /// Parse a query and make sure all graph components it needs (plus any
    /// extra components requested by `additional_components_callback`) are
    /// loaded for the given corpus.
    ///
    /// Returns the parsed disjunction together with the corpus cache entry
    /// so callers can build an execution plan from it.
    fn prepare_query<'a, F>(
        &self,
        corpus_name: &str,
        query: &'a str,
        query_language: QueryLanguage,
        additional_components_callback: F,
    ) -> Result<PreparationResult<'a>>
    where
        F: FnOnce(&AnnotationGraph) -> Vec<Component<AnnotationComponentType>>,
    {
        let db_entry = self.get_loaded_entry(corpus_name, false)?;
        // Determine the set of components that still need loading while
        // only holding a shared lock.
        let (q, missing_components) = {
            let lock = db_entry.read().unwrap();
            let db = get_read_or_error(&lock)?;
            let q = match query_language {
                QueryLanguage::AQL => aql::parse(query, false)?,
                QueryLanguage::AQLQuirksV3 => aql::parse(query, true)?,
            };
            let necessary_components = q.necessary_components(db);
            let mut missing: HashSet<_> = HashSet::from_iter(necessary_components.iter().cloned());
            let additional_components = additional_components_callback(db);
            missing.extend(additional_components.into_iter());
            // Components that already have a graph storage are not missing.
            for c in &necessary_components {
                if db.get_graphstorage(c).is_some() {
                    missing.remove(c);
                }
            }
            let missing: Vec<_> = missing.into_iter().collect();
            (q, missing)
        };
        if !missing_components.is_empty() {
            // Upgrade to an exclusive lock only when loading is required.
            {
                let mut lock = db_entry.write().unwrap();
                let db = get_write_or_error(&mut lock)?;
                for c in missing_components {
                    db.ensure_loaded(&c)?;
                }
            }
            // Loading may have grown the cache beyond its limit.
            self.check_cache_size_and_remove(vec![corpus_name]);
        };
        Ok(PreparationResult { query: q, db_entry })
    }
pub fn preload(&self, corpus_name: &str) -> Result<()> {
{
let db_entry = self.get_loaded_entry(corpus_name, false)?;
let mut lock = db_entry.write().unwrap();
let db = get_write_or_error(&mut lock)?;
db.ensure_loaded_all()?;
}
self.check_cache_size_and_remove(vec![corpus_name]);
Ok(())
}
pub fn unload(&self, corpus_name: &str) {
let mut cache_lock = self.corpus_cache.write().unwrap();
let cache = &mut *cache_lock;
cache.remove(corpus_name);
}
pub fn update_statistics(&self, corpus_name: &str) -> Result<()> {
let db_entry = self.get_loaded_entry(corpus_name, false)?;
let mut lock = db_entry.write().unwrap();
let db: &mut AnnotationGraph = get_write_or_error(&mut lock)?;
db.get_node_annos_mut().calculate_statistics();
for c in db.get_all_components(None, None) {
db.calculate_component_statistics(&c)?;
}
Ok(())
}
pub fn validate_query<S: AsRef<str>>(
&self,
corpus_names: &[S],
query: &str,
query_language: QueryLanguage,
) -> Result<bool> {
for cn in corpus_names {
let prep: PreparationResult =
self.prepare_query(cn.as_ref(), query, query_language, |_| vec![])?;
let lock = prep.db_entry.read().unwrap();
let db = get_read_or_error(&lock)?;
ExecutionPlan::from_disjunction(&prep.query, &db, &self.query_config)?;
}
Ok(true)
}
pub fn plan<S: AsRef<str>>(
&self,
corpus_names: &[S],
query: &str,
query_language: QueryLanguage,
) -> Result<String> {
let mut all_plans = Vec::with_capacity(corpus_names.len());
for cn in corpus_names {
let prep = self.prepare_query(cn.as_ref(), query, query_language, |_| vec![])?;
let lock = prep.db_entry.read().unwrap();
let db = get_read_or_error(&lock)?;
let plan = ExecutionPlan::from_disjunction(&prep.query, &db, &self.query_config)?;
all_plans.push(format!("{}:\n{}", cn.as_ref(), plan));
}
Ok(all_plans.join("\n"))
}
pub fn count<S: AsRef<str>>(
&self,
corpus_names: &[S],
query: &str,
query_language: QueryLanguage,
) -> Result<u64> {
let mut total_count: u64 = 0;
for cn in corpus_names {
let prep = self.prepare_query(cn.as_ref(), query, query_language, |_| vec![])?;
let lock = prep.db_entry.read().unwrap();
let db = get_read_or_error(&lock)?;
let plan = ExecutionPlan::from_disjunction(&prep.query, &db, &self.query_config)?;
total_count += plan.count() as u64;
}
Ok(total_count)
}
    /// Count both the number of matches and the number of distinct
    /// documents containing a match, over all given corpora.
    pub fn count_extra<S: AsRef<str>>(
        &self,
        corpus_names: &[S],
        query: &str,
        query_language: QueryLanguage,
    ) -> Result<CountExtra> {
        let mut match_count: u64 = 0;
        let mut document_count: u64 = 0;
        for cn in corpus_names {
            let prep = self.prepare_query(cn.as_ref(), query, query_language, |_| vec![])?;
            let lock = prep.db_entry.read().unwrap();
            let db: &AnnotationGraph = get_read_or_error(&lock)?;
            let plan = ExecutionPlan::from_disjunction(&prep.query, &db, &self.query_config)?;
            // Track the distinct document paths seen in the first match
            // node of each result row.
            let mut known_documents = HashSet::new();
            let result = plan.fold((0, 0), move |acc: (u64, usize), m: Vec<Match>| {
                if !m.is_empty() {
                    let m: &Match = &m[0];
                    if let Some(node_name) = db
                        .get_node_annos()
                        .get_value_for_item(&m.node, &NODE_NAME_KEY)
                    {
                        let node_name: &str = &node_name;
                        // The document path is the node name up to the
                        // last '#' (or the whole name if there is none).
                        let doc_path =
                            &node_name[0..node_name.rfind('#').unwrap_or_else(|| node_name.len())];
                        known_documents.insert(doc_path.to_owned());
                    }
                }
                (acc.0 + 1, known_documents.len())
            });
            match_count += result.0;
            document_count += result.1 as u64;
        }
        Ok(CountExtra {
            match_count,
            document_count,
        })
    }
/// Creates an iterator over all matches of an already prepared query on a
/// single corpus, applying the requested result order.
///
/// Returns the iterator and, when the results had to be materialized for
/// sorting or shuffling, the exact number of results (`None` when streamed).
fn create_find_iterator_for_query<'b>(
    &'b self,
    db: &'b AnnotationGraph,
    query: &'b Disjunction,
    offset: usize,
    limit: Option<usize>,
    order: ResultOrder,
    quirks_mode: bool,
) -> Result<(FindIterator<'b>, Option<usize>)> {
    // Parallel joins could change the natural output order, so disable them
    // when the caller explicitly asked for unsorted (plan-order) results.
    let mut query_config = self.query_config.clone();
    if order == ResultOrder::NotSorted {
        query_config.use_parallel_joins = false;
    }
    let plan = ExecutionPlan::from_disjunction(query, &db, &query_config)?;
    // In quirks mode the collation used for sorting depends on the
    // relANNIS version the corpus was imported from (see below).
    let mut relannis_version_33 = false;
    if quirks_mode {
        let mut relannis_version_it = db.get_node_annos().exact_anno_search(
            Some(ANNIS_NS),
            "relannis-version",
            ValueSearch::Any,
        );
        if let Some(m) = relannis_version_it.next() {
            if let Some(v) = db.get_node_annos().get_value_for_item(&m.node, &m.anno_key) {
                if v == "3.3" {
                    relannis_version_33 = true;
                }
            }
        }
    }
    let mut expected_size: Option<usize> = None;
    let base_it: FindIterator = if order == ResultOrder::NotSorted
        || (order == ResultOrder::Normal && plan.is_sorted_by_text() && !quirks_mode)
    {
        // Fast path: the plan already yields matches in a usable order, so
        // they can be streamed without materializing all of them.
        Box::from(plan)
    } else {
        // Slow path: collect all matches first, then shuffle or sort them.
        let estimated_result_size = plan.estimated_output_size();
        let mut tmp_results: Vec<Vec<Match>> = Vec::with_capacity(estimated_result_size);
        for mgroup in plan {
            tmp_results.push(mgroup);
        }
        if order == ResultOrder::Randomized {
            let mut rng = rand::thread_rng();
            tmp_results.shuffle(&mut rng);
        } else {
            let token_helper = TokenHelper::new(db);
            let component_order = Component::new(
                AnnotationComponentType::Ordering,
                ANNIS_NS.to_owned(),
                "".to_owned(),
            );
            // Quirks-mode corpora imported from relANNIS versions other than
            // 3.3 use locale-dependent collation; everything else uses the
            // default collation.
            let collation = if quirks_mode && !relannis_version_33 {
                CollationType::Locale
            } else {
                CollationType::Default
            };
            let gs_order = db.get_graphstorage_as_ref(&component_order);
            // Compare match groups by their position in the text; the
            // inverted order simply reverses this comparison.
            let order_func = |m1: &Vec<Match>, m2: &Vec<Match>| -> std::cmp::Ordering {
                if order == ResultOrder::Inverted {
                    db::sort_matches::compare_matchgroup_by_text_pos(
                        m1,
                        m2,
                        db.get_node_annos(),
                        token_helper.as_ref(),
                        gs_order,
                        collation,
                        quirks_mode,
                    )
                    .reverse()
                } else {
                    db::sort_matches::compare_matchgroup_by_text_pos(
                        m1,
                        m2,
                        db.get_node_annos(),
                        token_helper.as_ref(),
                        gs_order,
                        collation,
                        quirks_mode,
                    )
                }
            };
            // Only the first `offset + limit` items will ever be returned,
            // so a full sort is unnecessary when a limit is given.
            let sort_size = if let Some(limit) = limit {
                offset + limit
            } else {
                tmp_results.len()
            };
            if self.query_config.use_parallel_joins {
                quicksort::sort_first_n_items_parallel(&mut tmp_results, sort_size, order_func);
            } else {
                quicksort::sort_first_n_items(&mut tmp_results, sort_size, order_func);
            }
        }
        expected_size = Some(tmp_results.len());
        Box::from(tmp_results.into_iter())
    };
    Ok((base_it, expected_size))
}
/// Finds matches of the query in a single corpus.
///
/// Returns the serialized match IDs (after applying `offset` and `limit`)
/// and the number of matches that were skipped due to the offset.
fn find_in_single_corpus(
    &self,
    corpus_name: &str,
    query: &str,
    query_language: QueryLanguage,
    offset: usize,
    limit: Option<usize>,
    order: ResultOrder,
) -> Result<(Vec<String>, usize)> {
    // For text-position sorting, the ordering component and the components
    // needed by the token helper must be loaded in addition to the ones the
    // query itself requires.
    let prep = self.prepare_query(corpus_name, query, query_language, |db| {
        let mut additional_components = vec![Component::new(
            AnnotationComponentType::Ordering,
            ANNIS_NS.to_owned(),
            "".to_owned(),
        )];
        if order == ResultOrder::Normal || order == ResultOrder::Inverted {
            for c in token_helper::necessary_components(db) {
                additional_components.push(c);
            }
        }
        additional_components
    })?;
    let lock = prep.db_entry.read().unwrap();
    let db = get_read_or_error(&lock)?;
    let quirks_mode = match query_language {
        QueryLanguage::AQL => false,
        QueryLanguage::AQLQuirksV3 => true,
    };
    let (mut base_it, expected_size) = self.create_find_iterator_for_query(
        db,
        &prep.query,
        offset,
        limit,
        order,
        quirks_mode,
    )?;
    // Pre-allocate the output when the total number of results is known.
    let mut results: Vec<String> =
        if let (Some(expected_size), Some(limit)) = (expected_size, limit) {
            Vec::with_capacity(std::cmp::min(expected_size, limit))
        } else {
            Vec::new()
        };
    // Consume up to `offset` matches, stopping early if the iterator ends.
    let mut skipped = 0;
    while skipped < offset && base_it.next().is_some() {
        skipped += 1;
    }
    let base_it: Box<dyn Iterator<Item = Vec<Match>>> = if let Some(limit) = limit {
        Box::new(base_it.take(limit))
    } else {
        Box::new(base_it)
    };
    // Serialize each match group into a space-separated list of match IDs.
    // Each ID has the form "[ns::]name::node_name", where the annotation
    // parts are percent-encoded and omitted for plain node matches.
    results.extend(base_it.map(|m: Vec<Match>| {
        let mut match_desc: Vec<String> = Vec::new();
        for (i, singlematch) in m.iter().enumerate() {
            // In quirks mode, query nodes not included in the output
            // (per the parsed query) are filtered out of the description.
            let include_in_output = if quirks_mode {
                if let Some(var) = prep.query.get_variable_by_pos(i) {
                    prep.query.is_included_in_output(&var)
                } else {
                    true
                }
            } else {
                true
            };
            if include_in_output {
                let mut node_desc = String::new();
                let singlematch_anno_key = &singlematch.anno_key;
                // The generic "annis::node_type" annotation marks a plain
                // node match; its key is not part of the match ID.
                if singlematch_anno_key.ns != ANNIS_NS || singlematch_anno_key.name != NODE_TYPE
                {
                    if !singlematch_anno_key.ns.is_empty() {
                        let encoded_anno_ns: Cow<str> =
                            utf8_percent_encode(&singlematch_anno_key.ns, SALT_URI_ENCODE_SET)
                                .into();
                        node_desc.push_str(&encoded_anno_ns);
                        node_desc.push_str("::");
                    }
                    let encoded_anno_name: Cow<str> =
                        utf8_percent_encode(&singlematch_anno_key.name, SALT_URI_ENCODE_SET)
                            .into();
                    node_desc.push_str(&encoded_anno_name);
                    node_desc.push_str("::");
                }
                if let Some(name) = db
                    .get_node_annos()
                    .get_value_for_item(&singlematch.node, &NODE_NAME_KEY)
                {
                    node_desc.push_str(&name);
                }
                match_desc.push(node_desc);
            }
        }
        let mut result = String::new();
        result.push_str(&match_desc.join(" "));
        result
    }));
    Ok((results, skipped))
}
/// Finds matches of the query in all given corpora and returns their match
/// IDs, applying `offset`/`limit` pagination across the whole corpus list.
pub fn find<S: AsRef<str>>(
    &self,
    corpus_names: &[S],
    query: &str,
    query_language: QueryLanguage,
    offset: usize,
    limit: Option<usize>,
    order: ResultOrder,
) -> Result<Vec<String>> {
    let mut result = Vec::new();
    // The per-corpus results are simply concatenated, so the iteration
    // order over the corpora must be consistent with the requested result
    // order (shuffled, reverse-sorted or sorted by name).
    let mut corpus_names: Vec<String> = corpus_names
        .iter()
        .map(|c| String::from(c.as_ref()))
        .collect();
    if order == ResultOrder::Randomized {
        let mut rng = rand::thread_rng();
        corpus_names.shuffle(&mut rng);
    } else if order == ResultOrder::Inverted {
        corpus_names.sort();
        corpus_names.reverse();
    } else {
        corpus_names.sort();
    }
    // Offset and limit are consumed corpus by corpus: each corpus skips
    // whatever is left of the offset, and the limit shrinks by the number
    // of results already produced.
    let mut offset = offset;
    let mut limit = limit;
    for cn in corpus_names {
        let (single_result, skipped) = self.find_in_single_corpus(
            cn.as_ref(),
            query,
            query_language,
            offset,
            limit,
            order,
        )?;
        let single_result_length = single_result.len();
        result.extend(single_result.into_iter());
        if let Some(current_limit) = limit {
            if current_limit <= single_result_length {
                // The limit is exhausted: no need to query further corpora.
                break;
            } else {
                limit = Some(current_limit - single_result_length);
            }
        }
        if skipped < offset {
            offset -= skipped;
        } else {
            offset = 0;
        }
    }
    Ok(result)
}
/// Returns the subgraph around the given nodes with the requested left and
/// right context (measured on the given segmentation layer if one is
/// provided, otherwise via token precedence).
pub fn subgraph(
    &self,
    corpus_name: &str,
    node_ids: Vec<String>,
    ctx_left: usize,
    ctx_right: usize,
    segmentation: Option<String>,
) -> Result<AnnotationGraph> {
    let db_entry = self.get_fully_loaded_entry(corpus_name)?;
    // The subgraph is described as a disjunction of several small queries,
    // one group of alternatives per requested node.
    let mut query = Disjunction {
        alternatives: vec![],
    };
    for source_node_id in node_ids {
        // Accept both plain node names and Salt-style URIs ("salt:/…").
        let source_node_id: &str = if source_node_id.starts_with("salt:/") {
            &source_node_id[6..]
        } else {
            &source_node_id
        };
        let m = NodeSearchSpec::ExactValue {
            ns: Some(ANNIS_NS.to_string()),
            name: NODE_NAME.to_string(),
            val: Some(source_node_id.to_string()),
            is_meta: false,
        };
        // Alternative: every node that overlaps the requested node (the
        // node to extract is added first, i.e. at match position 0).
        {
            let mut q = Conjunction::new();
            let node_idx = q.add_node(NodeSearchSpec::AnyNode, None);
            let m_idx = q.add_node(m.clone(), None);
            q.add_operator(
                Box::new(operators::OverlapSpec { reflexive: true }),
                &m_idx,
                &node_idx,
                false,
            )?;
            query.alternatives.push(q);
        }
        // Alternatives for the left and right context, either based on the
        // given segmentation layer or on token precedence.
        if let Some(ref segmentation) = segmentation {
            add_subgraph_precedence_with_segmentation(
                &mut query,
                ctx_left,
                segmentation,
                &m,
                true,
            )?;
            add_subgraph_precedence_with_segmentation(
                &mut query,
                ctx_right,
                segmentation,
                &m,
                false,
            )?;
        } else {
            add_subgraph_precedence(&mut query, ctx_left, &m, true)?;
            add_subgraph_precedence(&mut query, ctx_right, &m, false)?;
        }
        // Alternative: the data source the node directly belongs to
        // (PartOf relation with distance exactly 1); again the node to
        // extract (the data source) is at match position 0.
        {
            let mut q = Conjunction::new();
            let datasource_idx = q.add_node(
                NodeSearchSpec::ExactValue {
                    ns: Some(ANNIS_NS.to_string()),
                    name: NODE_TYPE.to_string(),
                    val: Some("datasource".to_string()),
                    is_meta: false,
                },
                None,
            );
            let m_idx = q.add_node(m.clone(), None);
            q.add_operator(
                Box::new(operators::PartOfSubCorpusSpec {
                    dist: RangeSpec::Bound {
                        min_dist: 1,
                        max_dist: 1,
                    },
                }),
                &m_idx,
                &datasource_idx,
                false,
            )?;
            query.alternatives.push(q);
        }
    }
    // Extract the node at match position 0 of each alternative.
    extract_subgraph_by_query(&db_entry, &query, &[0], &self.query_config, None)
}
/// Returns the subgraph formed by all nodes matched by the given query,
/// optionally restricted to components of a certain type.
pub fn subgraph_for_query(
    &self,
    corpus_name: &str,
    query: &str,
    query_language: QueryLanguage,
    component_type_filter: Option<AnnotationComponentType>,
) -> Result<AnnotationGraph> {
    let prep = self.prepare_query(corpus_name, query, query_language, |g| {
        g.get_all_components(component_type_filter.clone(), None)
    })?;
    // Every match position of the largest alternative can contribute a node
    // to the result.
    let max_alt_size = prep
        .query
        .alternatives
        .iter()
        .map(|alt| alt.num_of_nodes())
        .max()
        .unwrap_or(0);
    let match_idx: Vec<usize> = (0..max_alt_size).collect();
    extract_subgraph_by_query(
        &prep.db_entry,
        &prep.query,
        &match_idx,
        &self.query_config,
        component_type_filter,
    )
}
/// Returns a graph with all nodes that belong to the given sub-corpora or
/// documents (identified by node name or Salt URI), including their data
/// source nodes.
pub fn subcorpus_graph(
    &self,
    corpus_name: &str,
    corpus_ids: Vec<String>,
) -> Result<AnnotationGraph> {
    let db_entry = self.get_fully_loaded_entry(corpus_name)?;
    let mut query = Disjunction {
        alternatives: vec![],
    };
    for source_corpus_id in corpus_ids {
        // Accept both plain corpus node names and Salt-style URIs.
        let source_corpus_id: &str = if source_corpus_id.starts_with("salt:/") {
            &source_corpus_id[6..]
        } else {
            &source_corpus_id
        };
        // Alternative: any node that is (transitively) part of the given
        // sub-corpus. The corpus node is at match position 0, the node to
        // extract at position 1.
        {
            let mut q = Conjunction::new();
            let corpus_idx = q.add_node(
                NodeSearchSpec::ExactValue {
                    ns: Some(ANNIS_NS.to_string()),
                    name: NODE_NAME.to_string(),
                    val: Some(source_corpus_id.to_string()),
                    is_meta: false,
                },
                None,
            );
            let any_node_idx = q.add_node(NodeSearchSpec::AnyNode, None);
            q.add_operator(
                Box::new(operators::PartOfSubCorpusSpec {
                    dist: RangeSpec::Unbound,
                }),
                &any_node_idx,
                &corpus_idx,
                true,
            )?;
            query.alternatives.push(q);
        }
        // Alternative: the data source nodes that are part of the given
        // sub-corpus (same positional layout as above).
        {
            let mut q = Conjunction::new();
            let corpus_idx = q.add_node(
                NodeSearchSpec::ExactValue {
                    ns: Some(ANNIS_NS.to_string()),
                    name: NODE_NAME.to_string(),
                    val: Some(source_corpus_id.to_string()),
                    is_meta: false,
                },
                None,
            );
            let any_node_idx = q.add_node(
                NodeSearchSpec::ExactValue {
                    ns: Some(ANNIS_NS.to_string()),
                    name: NODE_TYPE.to_string(),
                    val: Some("datasource".to_string()),
                    is_meta: false,
                },
                None,
            );
            q.add_operator(
                Box::new(operators::PartOfSubCorpusSpec {
                    dist: RangeSpec::Unbound,
                }),
                &any_node_idx,
                &corpus_idx,
                true,
            )?;
            query.alternatives.push(q);
        }
    }
    // Index 1 selects the second node of each alternative, i.e. the node to
    // extract (the corpus node itself is at index 0).
    extract_subgraph_by_query(&db_entry, &query, &[1], &self.query_config, None)
}
/// Returns the corpus structure graph (all "corpus" nodes and their PartOf
/// relations) for the given corpus.
pub fn corpus_graph(&self, corpus_name: &str) -> Result<AnnotationGraph> {
    // First load the corpus to discover which PartOf components exist, then
    // make sure exactly these components are loaded.
    let entry = self.get_loaded_entry(corpus_name, false)?;
    let subcorpus_components = {
        let lock = entry.read().unwrap();
        let db = get_read_or_error(&lock)?;
        db.get_all_components(Some(AnnotationComponentType::PartOf), None)
    };
    let entry = self.get_loaded_entry_with_components(corpus_name, subcorpus_components)?;
    // Match every node of type "corpus" and extract the PartOf subgraph.
    let mut q = Conjunction::new();
    q.add_node(
        NodeSearchSpec::new_exact(Some(ANNIS_NS), NODE_TYPE, Some("corpus"), false),
        None,
    );
    extract_subgraph_by_query(
        &entry,
        &q.into_disjunction(),
        &[0],
        &self.query_config,
        Some(AnnotationComponentType::PartOf),
    )
}
/// Executes the query on all given corpora and produces a frequency table
/// over the annotation values described by `definition`, sorted by
/// descending count.
pub fn frequency<S: AsRef<str>>(
    &self,
    corpus_names: &[S],
    query: &str,
    query_language: QueryLanguage,
    definition: Vec<FrequencyDefEntry>,
) -> Result<FrequencyTable<String>> {
    // Maps each tuple of annotation values to the number of its occurrences,
    // aggregated over all corpora.
    let mut tuple_frequency: FxHashMap<Vec<String>, usize> = FxHashMap::default();
    for cn in corpus_names {
        let prep = self.prepare_query(cn.as_ref(), query, query_language, |_| vec![])?;
        let lock = prep.db_entry.read().unwrap();
        let db: &AnnotationGraph = get_read_or_error(&lock)?;
        // Resolve each definition entry to the position of its node in the
        // match group plus the annotation keys to look up (all qualified
        // names for the annotation name if no namespace was given).
        let mut annokeys: Vec<(usize, Vec<AnnoKey>)> = Vec::default();
        for def in definition.iter() {
            if let Some(node_ref) = prep.query.get_variable_pos(&def.node_ref) {
                if let Some(ns) = &def.ns {
                    annokeys.push((
                        node_ref,
                        vec![AnnoKey {
                            ns: ns.clone(),
                            name: def.name.clone(),
                        }],
                    ));
                } else {
                    annokeys.push((node_ref, db.get_node_annos().get_qnames(&def.name)));
                }
            }
        }
        let plan = ExecutionPlan::from_disjunction(&prep.query, &db, &self.query_config)?;
        for mgroup in plan {
            // Collect the annotation value for each definition entry; nodes
            // without a matching annotation contribute an empty string.
            let mut tuple: Vec<String> = Vec::with_capacity(annokeys.len());
            for (node_ref, anno_keys) in &annokeys {
                let mut tuple_val: String = String::default();
                if *node_ref < mgroup.len() {
                    let m: &Match = &mgroup[*node_ref];
                    for k in anno_keys.iter() {
                        if let Some(val) = db.get_node_annos().get_value_for_item(&m.node, k) {
                            tuple_val = val.to_string();
                        }
                    }
                }
                tuple.push(tuple_val);
            }
            let tuple_count: &mut usize = tuple_frequency.entry(tuple).or_insert(0);
            *tuple_count += 1;
        }
    }
    // Convert the aggregated counts into a table sorted by descending count.
    let mut result: FrequencyTable<String> = FrequencyTable::default();
    for (tuple, count) in tuple_frequency {
        result.push(FrequencyTableRow {
            values: tuple,
            count,
        });
    }
    result.sort_by(|a, b| a.count.cmp(&b.count).reverse());
    Ok(result)
}
/// Parses the query (without executing it) and returns a description of each
/// of its nodes, annotated with the alternative (disjunction component) the
/// node belongs to.
pub fn node_descriptions(
    &self,
    query: &str,
    query_language: QueryLanguage,
) -> Result<Vec<QueryAttributeDescription>> {
    let quirks = matches!(query_language, QueryLanguage::AQLQuirksV3);
    let q: Disjunction = aql::parse(query, quirks)?;
    let mut result = Vec::new();
    for (component_nr, alt) in q.alternatives.iter().enumerate() {
        // Tag every node description with the alternative it came from.
        result.extend(alt.get_node_descriptions().into_iter().map(|mut n| {
            n.alternative = component_nr;
            n
        }));
    }
    Ok(result)
}
/// Returns all components of the given corpus, optionally filtered by type
/// and/or name. Returns an empty list if the corpus cannot be loaded.
pub fn list_components(
    &self,
    corpus_name: &str,
    ctype: Option<AnnotationComponentType>,
    name: Option<&str>,
) -> Vec<Component<AnnotationComponentType>> {
    self.get_loaded_entry(corpus_name, false)
        .ok()
        .and_then(|db_entry| {
            let lock = db_entry.read().unwrap();
            get_read_or_error(&lock)
                .ok()
                .map(|db| db.get_all_components(ctype, name))
        })
        .unwrap_or_default()
}
/// Lists all node annotation keys of the corpus. When `list_values` is set,
/// one entry per value is returned (or only the most frequent value per key
/// if `only_most_frequent_values` is set); otherwise the values are empty.
/// Returns an empty list if the corpus cannot be loaded.
pub fn list_node_annotations(
    &self,
    corpus_name: &str,
    list_values: bool,
    only_most_frequent_values: bool,
) -> Vec<Annotation> {
    let mut result: Vec<Annotation> = Vec::new();
    let db_entry = match self.get_loaded_entry(corpus_name, false) {
        Ok(entry) => entry,
        Err(_) => return result,
    };
    let lock = db_entry.read().unwrap();
    let db = match get_read_or_error(&lock) {
        Ok(db) => db,
        Err(_) => return result,
    };
    let node_annos: &dyn AnnotationStorage<NodeID> = db.get_node_annos();
    for key in node_annos.annotation_keys() {
        if !list_values {
            // Only report the keys themselves.
            result.push(Annotation {
                key: key.clone(),
                val: String::default(),
            });
        } else if only_most_frequent_values {
            // The first entry of the most-frequent-first listing.
            if let Some(val) = node_annos.get_all_values(&key, true).into_iter().next() {
                result.push(Annotation {
                    key: key.clone(),
                    val: val.to_string(),
                });
            }
        } else {
            // One entry per distinct value.
            for val in node_annos.get_all_values(&key, false) {
                result.push(Annotation {
                    key: key.clone(),
                    val: val.to_string(),
                });
            }
        }
    }
    result
}
/// Lists all edge annotation keys of the given component. When `list_values`
/// is set, one entry per value is returned (or only the most frequent value
/// per key if `only_most_frequent_values` is set); otherwise the values are
/// empty. Returns an empty list if the component cannot be loaded.
pub fn list_edge_annotations(
    &self,
    corpus_name: &str,
    component: &Component<AnnotationComponentType>,
    list_values: bool,
    only_most_frequent_values: bool,
) -> Vec<Annotation> {
    let mut result: Vec<Annotation> = Vec::new();
    let db_entry =
        match self.get_loaded_entry_with_components(corpus_name, vec![component.clone()]) {
            Ok(entry) => entry,
            Err(_) => return result,
        };
    let lock = db_entry.read().unwrap();
    let db = match get_read_or_error(&lock) {
        Ok(db) => db,
        Err(_) => return result,
    };
    if let Some(gs) = db.get_graphstorage(&component) {
        let edge_annos = gs.get_anno_storage();
        for key in edge_annos.annotation_keys() {
            if !list_values {
                // Only report the keys themselves.
                result.push(Annotation {
                    key: key.clone(),
                    val: String::new(),
                });
            } else if only_most_frequent_values {
                // The first entry of the most-frequent-first listing.
                if let Some(val) = edge_annos.get_all_values(&key, true).into_iter().next() {
                    result.push(Annotation {
                        key: key.clone(),
                        val: val.to_string(),
                    });
                }
            } else {
                // One entry per distinct value.
                for val in edge_annos.get_all_values(&key, false) {
                    result.push(Annotation {
                        key: key.clone(),
                        val: val.to_string(),
                    });
                }
            }
        }
    }
    result
}
/// Enforces the configured cache strategy on the corpus cache, evicting
/// loaded corpora as needed but never the ones listed in `keep`.
fn check_cache_size_and_remove(&self, keep: Vec<&str>) {
    let mut cache_lock = self.corpus_cache.write().unwrap();
    check_cache_size_and_remove_with_cache(&mut cache_lock, &self.cache_strategy, keep);
}
}
impl Drop for CorpusStorage {
    fn drop(&mut self) {
        // Block until all background worker threads have finished: the
        // worker count is protected by the mutex and the condition variable
        // is signalled whenever it changes.
        let &(ref lock, ref cvar) = &*self.active_background_workers;
        let mut nr_active_background_workers = lock.lock().unwrap();
        while *nr_active_background_workers > 0 {
            trace!(
                "Waiting for background thread to finish ({} worker(s) left)...",
                *nr_active_background_workers
            );
            nr_active_background_workers = cvar.wait(nr_active_background_workers).unwrap();
        }
        // Release the exclusive file lock on the database directory;
        // failing to unlock is logged but not fatal.
        if let Err(e) = self.lock_file.unlock() {
            warn!("Could not unlock CorpusStorage lock file: {:?}", e);
        } else {
            trace!("Unlocked CorpusStorage lock file");
        }
    }
}
/// Returns a reference to the loaded graph behind the given read guard, or a
/// `LoadingGraphFailed` error if the cache entry is not loaded yet.
fn get_read_or_error<'a>(lock: &'a RwLockReadGuard<CacheEntry>) -> Result<&'a AnnotationGraph> {
    match &**lock {
        CacheEntry::Loaded(db) => Ok(db),
        CacheEntry::NotLoaded => Err(GraphAnnisError::LoadingGraphFailed {
            name: "".to_string(),
        }
        .into()),
    }
}
/// Returns a mutable reference to the loaded graph behind the given write
/// guard.
///
/// # Errors
/// Fails if the cache entry has not been loaded yet.
fn get_write_or_error<'a>(
    lock: &'a mut RwLockWriteGuard<CacheEntry>,
) -> Result<&'a mut AnnotationGraph> {
    if let CacheEntry::Loaded(ref mut db) = &mut **lock {
        Ok(db)
    } else {
        // Fixed typo in the error message ("Could get" -> "Could not get").
        Err(anyhow!("Could not get loaded graph storage entry"))
    }
}
/// Evicts loaded corpora from the cache (in cache insertion order) until the
/// estimated total memory consumption is below the limit derived from the
/// cache strategy. Corpora listed in `keep` are never evicted.
fn check_cache_size_and_remove_with_cache(
    cache: &mut LinkedHashMap<String, Arc<RwLock<CacheEntry>>>,
    cache_strategy: &CacheStrategy,
    keep: Vec<&str>,
) {
    let mut mem_ops = MallocSizeOfOps::new(memory_estimation::platform::usable_size, None, None);
    let keep: HashSet<&str> = keep.into_iter().collect();
    // Estimate the size of every loaded corpus and the current cache total.
    let mut size_sum: usize = 0;
    let mut db_sizes: LinkedHashMap<String, usize> = LinkedHashMap::new();
    for (corpus, db_entry) in cache.iter() {
        let lock = db_entry.read().unwrap();
        if let CacheEntry::Loaded(ref db) = &*lock {
            let s = db.size_of_cached(&mut mem_ops);
            size_sum += s;
            db_sizes.insert(corpus.clone(), s);
        }
    }
    // Determine the allowed maximum: either a fixed byte count or a
    // percentage of the memory that would be available if the whole cache
    // was dropped (free system memory + current cache size).
    let max_cache_size: usize = match cache_strategy {
        CacheStrategy::FixedMaxMemory(max_size) => *max_size,
        CacheStrategy::PercentOfFreeMemory(max_percent) => {
            if let Ok(mem) = sys_info::mem_info() {
                // Convert the reported available memory to bytes.
                let free_system_mem: usize = mem.avail as usize * 1024;
                let available_memory: usize = free_system_mem + size_sum;
                ((available_memory as f64) * (max_percent / 100.0)) as usize
            } else {
                // If free memory cannot be determined, fall back to a limit
                // of 0, i.e. evict everything that is not explicitly kept.
                0
            }
        }
    };
    debug!(
        "Current cache size is {:.2} MB / max {:.2} MB",
        (size_sum as f64) / (1024.0 * 1024.0),
        (max_cache_size as f64) / (1024.0 * 1024.0)
    );
    // Evict corpora until the limit is satisfied; `keep` entries are skipped
    // but still count towards the total.
    for (corpus_name, corpus_size) in db_sizes.iter() {
        if size_sum > max_cache_size {
            if !keep.contains(corpus_name.as_str()) {
                info!("Removing corpus {} from cache", corpus_name);
                cache.remove(corpus_name);
                size_sum -= corpus_size;
                debug!(
                    "Current cache size is {:.2} MB / max {:.2} MB",
                    (size_sum as f64) / (1024.0 * 1024.0),
                    (max_cache_size as f64) / (1024.0 * 1024.0)
                );
            }
        } else {
            break;
        }
    }
}
/// Executes the given query on the corpus behind `db_entry` and builds a new
/// graph from the matched nodes.
///
/// `match_idx` selects which positions of each match group are extracted.
/// Edges are copied in a second pass for all extracted nodes, restricted to
/// components of `component_type_filter` (or all components if `None`).
fn extract_subgraph_by_query(
    db_entry: &Arc<RwLock<CacheEntry>>,
    query: &Disjunction,
    match_idx: &[usize],
    query_config: &query::Config,
    component_type_filter: Option<AnnotationComponentType>,
) -> Result<AnnotationGraph> {
    // Acquire a read-only reference to the original corpus graph.
    let lock = db_entry.read().unwrap();
    let orig_db = get_read_or_error(&lock)?;
    let plan = ExecutionPlan::from_disjunction(&query, &orig_db, &query_config)?;
    debug!("executing subgraph query\n{}", plan);
    // First pass: collect every selected node exactly once and copy its
    // annotations into the result graph.
    let mut match_result: BTreeSet<Match> = BTreeSet::new();
    let mut result = AnnotationGraph::new(false)?;
    for r in plan {
        trace!("subgraph query found match {:?}", r);
        for i in match_idx.iter().cloned() {
            if i < r.len() {
                let m: &Match = &r[i];
                if !match_result.contains(m) {
                    match_result.insert(m.clone());
                    trace!("subgraph query extracted node {:?}", m.node);
                    create_subgraph_node(m.node, &mut result, orig_db)?;
                }
            }
        }
    }
    // Second pass: once all nodes are known, copy the edges between them.
    let components = orig_db.get_all_components(component_type_filter, None);
    for m in &match_result {
        create_subgraph_edge(m.node, &mut result, orig_db, &components)?;
    }
    Ok(result)
}
/// Copies all annotations of the node with the given ID from the original
/// graph into the (partial) result graph.
fn create_subgraph_node(
    id: NodeID,
    db: &mut AnnotationGraph,
    orig_db: &AnnotationGraph,
) -> Result<()> {
    let annos = orig_db.get_node_annos().get_annotations_for_item(&id);
    for anno in annos {
        db.get_node_annos_mut().insert(id, anno)?;
    }
    Ok(())
}
/// Copies all outgoing edges (and their annotations) of the given source
/// node from the original graph into the result graph `db`.
///
/// Coverage components in the "annis" layer with a non-empty name as well as
/// LeftToken/RightToken components are skipped. Edges whose target node is
/// not present in the result graph are ignored.
fn create_subgraph_edge(
    source_id: NodeID,
    db: &mut AnnotationGraph,
    orig_db: &AnnotationGraph,
    components: &[Component<AnnotationComponentType>],
) -> Result<()> {
    for c in components {
        let ctype = c.get_type();
        // Skip the excluded auxiliary components (guard clause instead of
        // the original negated compound condition).
        if (ctype == AnnotationComponentType::Coverage
            && c.layer == "annis"
            && !c.name.is_empty())
            || ctype == AnnotationComponentType::RightToken
            || ctype == AnnotationComponentType::LeftToken
        {
            continue;
        }
        if let Some(orig_gs) = orig_db.get_graphstorage(c) {
            for target in orig_gs.get_outgoing_edges(source_id) {
                // Only copy edges whose target already exists in the
                // (partial) result graph.
                if !db
                    .get_node_annos()
                    .get_all_keys_for_item(&target, None, None)
                    .is_empty()
                {
                    let e = Edge {
                        source: source_id,
                        target,
                    };
                    // Acquire the writable graph storage once per edge and
                    // reuse the edge value, instead of re-acquiring the
                    // storage and rebuilding the Edge for every annotation.
                    if let Ok(new_gs) = db.get_or_create_writable(&c) {
                        new_gs.add_edge(e.clone())?;
                        for a in orig_gs.get_anno_storage().get_annotations_for_item(&e) {
                            new_gs.add_edge_annotation(e.clone(), a)?;
                        }
                    }
                }
            }
        }
    }
    Ok(())
}
/// Creates the database directory if necessary, opens (or creates) the
/// `db.lock` file inside it and acquires an exclusive file lock on it.
/// Returns the locked file handle; the lock is held as long as the handle
/// lives.
fn create_lockfile_for_directory(db_dir: &Path) -> Result<File> {
    std::fs::create_dir_all(db_dir)
        .with_context(|| format!("Could not create directory {}", db_dir.to_string_lossy()))?;
    let lock_file_path = db_dir.join("db.lock");
    // Open read+write and create the lock file if it does not exist yet.
    let lock_file = OpenOptions::new()
        .read(true)
        .write(true)
        .create(true)
        .open(&lock_file_path)
        .with_context(|| {
            format!(
                "Could not open or create lockfile {}",
                lock_file_path.to_string_lossy()
            )
        })?;
    // Fail immediately (instead of blocking) if another process holds the
    // lock.
    lock_file.try_lock_exclusive().with_context(|| {
        format!(
            "Could not acquire lock for directory {}",
            db_dir.to_string_lossy()
        )
    })?;
    Ok(lock_file)
}