use crate::annis::db::aql;
use crate::annis::db::aql::conjunction::Conjunction;
use crate::annis::db::aql::disjunction::Disjunction;
use crate::annis::db::aql::operators;
use crate::annis::db::aql::operators::RangeSpec;
use crate::annis::db::exec::nodesearch::NodeSearchSpec;
use crate::annis::db::plan::ExecutionPlan;
use crate::annis::db::relannis;
use crate::annis::db::sort_matches::CollationType;
use crate::annis::db::token_helper;
use crate::annis::db::token_helper::TokenHelper;
use crate::annis::errors::*;
use crate::annis::types::CountExtra;
use crate::annis::types::{
CorpusConfiguration, FrequencyTable, FrequencyTableRow, QueryAttributeDescription,
};
use crate::annis::util::quicksort;
use crate::annis::{db, util::TimeoutCheck};
use crate::{
graph::Match,
malloc_size_of::{MallocSizeOf, MallocSizeOfOps},
AnnotationGraph,
};
use fmt::Display;
use fs2::FileExt;
use graphannis_core::annostorage::symboltable::SymbolTable;
use graphannis_core::annostorage::{match_group_resolve_symbol_ids, match_group_with_symbol_ids};
use graphannis_core::{
annostorage::{MatchGroup, ValueSearch},
graph::{
storage::GraphStatistic, update::GraphUpdate, ANNIS_NS, NODE_NAME, NODE_NAME_KEY, NODE_TYPE,
},
types::{AnnoKey, Annotation, Component, NodeID},
util::memory_estimation,
};
use itertools::Itertools;
use linked_hash_map::LinkedHashMap;
use percent_encoding::{percent_decode_str, utf8_percent_encode, AsciiSet, CONTROLS};
use rand::Rng;
use smartstring::alias::String as SmartString;
use std::collections::HashSet;
use std::fmt;
use std::fs::File;
use std::fs::OpenOptions;
use std::io::prelude::*;
use std::mem::size_of;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::{Arc, Condvar, Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
use std::thread;
use std::{borrow::Cow, time::Duration};
use transient_btree_index::{BtreeConfig, BtreeIndex};
use rustc_hash::FxHashMap;
use rand::seq::SliceRandom;
use std::{
ffi::CString,
io::{BufReader, Write},
};
use aql::model::AnnotationComponentType;
use db::AnnotationStorage;
use self::subgraph::new_subgraph_iterator;
use super::sort_matches::SortCache;
mod subgraph;
#[cfg(test)]
mod tests;
/// A corpus entry in the in-memory cache.
///
/// A corpus can be registered in the cache before its annotation graph has
/// actually been loaded from disk (see `get_entry`), so an entry can exist
/// without a graph.
enum CacheEntry {
    /// The annotation graph is in memory.
    Loaded(AnnotationGraph),
    /// The corpus is known, but its graph has not been loaded from disk yet.
    NotLoaded,
}
/// Describes how much of a corpus is currently loaded into main memory.
#[derive(Debug, Ord, Eq, PartialOrd, PartialEq)]
pub enum LoadStatus {
    /// Corpus is not loaded into main memory at all.
    NotLoaded,
    /// Only some components are loaded; the argument is the estimated heap
    /// size in bytes of the loaded parts.
    PartiallyLoaded(usize),
    /// All components are loaded; the argument is the estimated heap size in
    /// bytes.
    FullyLoaded(usize),
}
/// Information about a single graph storage (component) of a corpus.
pub struct GraphStorageInfo {
    /// The component this graph storage belongs to.
    pub component: Component<AnnotationComponentType>,
    /// Whether (and how much of) this component is loaded into main memory.
    pub load_status: LoadStatus,
    /// Number of annotations stored in this component.
    pub number_of_annotations: usize,
    /// Serialization ID of the graph storage implementation (empty when the
    /// component is not loaded).
    pub implementation: String,
    /// Optional statistics collected for this graph storage.
    pub statistics: Option<GraphStatistic>,
}
impl fmt::Display for GraphStorageInfo {
    /// Renders a human-readable, multi-line summary of the component, its
    /// statistics, implementation, load status and memory consumption.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Fixed typo in the output: "annnotations" -> "annotations".
        writeln!(
            f,
            "Component {}: {} annotations",
            self.component, self.number_of_annotations
        )?;
        if let Some(stats) = &self.statistics {
            writeln!(f, "Stats: {}", stats)?;
        }
        writeln!(f, "Implementation: {}", self.implementation)?;
        match self.load_status {
            LoadStatus::NotLoaded => writeln!(f, "Not Loaded")?,
            // NOTE: `{:?}` on a string literal prints surrounding quotes;
            // kept as-is to preserve the existing output format.
            LoadStatus::PartiallyLoaded(memory_size) => {
                writeln!(f, "Status: {:?}", "partially loaded")?;
                writeln!(
                    f,
                    "Memory: {:.2} MB",
                    memory_size as f64 / f64::from(1024 * 1024)
                )?;
            }
            LoadStatus::FullyLoaded(memory_size) => {
                writeln!(f, "Status: {:?}", "fully loaded")?;
                writeln!(
                    f,
                    "Memory: {:.2} MB",
                    memory_size as f64 / f64::from(1024 * 1024)
                )?;
            }
        };
        Ok(())
    }
}
/// Information about a corpus in the storage: its name, load status, memory
/// usage, per-component details and configuration.
pub struct CorpusInfo {
    /// Name of the corpus.
    pub name: String,
    /// Whether (and how much of) the corpus is loaded into main memory.
    pub load_status: LoadStatus,
    /// Estimated heap size in bytes of the node annotation storage, when the
    /// corpus is loaded.
    pub node_annos_load_size: Option<usize>,
    /// Information about each graph storage (component) of the corpus.
    pub graphstorages: Vec<GraphStorageInfo>,
    /// The corpus configuration (read from `corpus-config.toml`).
    pub config: CorpusConfiguration,
}
impl fmt::Display for CorpusInfo {
    /// Renders a human-readable, multi-line report: overall load status and
    /// memory usage, followed by one separated section per graph storage.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Shared conversion of a byte count to the displayed MB value.
        let as_mb = |bytes: usize| bytes as f64 / f64::from(1024 * 1024);
        match self.load_status {
            LoadStatus::NotLoaded => writeln!(f, "Not Loaded")?,
            LoadStatus::PartiallyLoaded(memory_size) => {
                writeln!(f, "Status: {:?}", "partially loaded")?;
                writeln!(f, "Total memory: {:.2} MB", as_mb(memory_size))?;
            }
            LoadStatus::FullyLoaded(memory_size) => {
                writeln!(f, "Status: {:?}", "fully loaded")?;
                writeln!(f, "Total memory: {:.2} MB", as_mb(memory_size))?;
            }
        };
        if let Some(memory_size) = self.node_annos_load_size {
            writeln!(f, "Node Annotations: {:.2} MB", as_mb(memory_size))?;
        }
        if !self.graphstorages.is_empty() {
            writeln!(f, "------------")?;
            for storage_info in &self.graphstorages {
                write!(f, "{}", storage_info)?;
                writeln!(f, "------------")?;
            }
        }
        Ok(())
    }
}
/// The ordering of matches when executing a `find` query.
#[derive(Debug, PartialEq, Clone, Copy, Serialize, Deserialize)]
#[repr(C)]
pub enum ResultOrder {
    /// Results are sorted in their canonical order.
    Normal,
    /// Results are sorted in the reverse of the canonical order.
    Inverted,
    /// Results are returned in a random order.
    Randomized,
    /// Results are returned in whatever order execution produces them; no
    /// sorting is applied.
    NotSorted,
}
impl Default for ResultOrder {
fn default() -> Self {
ResultOrder::Normal
}
}
/// Result of preparing a query for execution: the parsed disjunction and the
/// cache entry of the corpus the query will run on.
struct PreparationResult {
    // The parsed query as a disjunction of conjunctions.
    query: Disjunction,
    // Cache entry of the (loaded) corpus.
    db_entry: Arc<RwLock<CacheEntry>>,
}
/// One column of a frequency-table definition: which annotation of which
/// query node to count.
///
/// Can be parsed from strings of the form `<node_ref>:<qualified anno name>`
/// (see the `FromStr` implementation below).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FrequencyDefEntry {
    /// Optional namespace of the annotation.
    #[serde(default)]
    pub ns: Option<String>,
    /// Name of the annotation.
    pub name: String,
    /// Reference to a node of the query.
    pub node_ref: String,
}
impl FromStr for FrequencyDefEntry {
    type Err = GraphAnnisError;

    /// Parses a frequency definition of the form `<node_ref>:<anno_qname>`,
    /// where the annotation name may itself be namespace-qualified.
    ///
    /// Returns `InvalidFrequencyDefinition` when the separating `:` between
    /// node reference and annotation name is missing.
    fn from_str(s: &str) -> std::result::Result<FrequencyDefEntry, Self::Err> {
        // `split_once` splits on the *first* ':' only, so a qualified
        // annotation name like "tiger:pos" stays intact.
        let (node_ref, anno_qname) = s
            .split_once(':')
            .ok_or(GraphAnnisError::InvalidFrequencyDefinition)?;
        let (ns, name) = graphannis_core::util::split_qname(anno_qname);
        Ok(FrequencyDefEntry {
            ns: ns.map(String::from),
            name: String::from(name),
            node_ref: String::from(node_ref),
        })
    }
}
/// The query languages supported when parsing queries.
#[repr(C)]
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
pub enum QueryLanguage {
    /// Standard AQL.
    AQL,
    /// AQL parsed with backward-compatibility quirks for ANNIS version 3.
    AQLQuirksV3,
}
impl Default for QueryLanguage {
fn default() -> Self {
QueryLanguage::AQL
}
}
/// Supported formats for importing corpora.
#[repr(C)]
#[derive(Clone, Copy)]
pub enum ImportFormat {
    /// The relANNIS directory format (marked by a `corpus.annis`/`corpus.tab`
    /// file).
    RelANNIS,
    /// A single GraphML file.
    GraphML,
}
/// Supported formats for exporting corpora.
#[repr(C)]
#[derive(Clone, Copy)]
pub enum ExportFormat {
    /// A single GraphML file (only valid for a single corpus).
    GraphML,
    /// A ZIP file containing one GraphML file (plus linked files) per corpus.
    GraphMLZip,
    /// A directory with one GraphML file per corpus.
    GraphMLDirectory,
}
/// Strategies for deciding how much memory the corpus cache may use before
/// corpora are evicted.
#[derive(Debug, Deserialize, Clone)]
pub enum CacheStrategy {
    /// Use a fixed maximum size (in megabytes).
    FixedMaxMemory(usize),
    /// Use a percentage of the currently free memory as the limit.
    PercentOfFreeMemory(f64),
}
impl Display for CacheStrategy {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
CacheStrategy::FixedMaxMemory(megabytes) => write!(f, "{} MB", megabytes),
CacheStrategy::PercentOfFreeMemory(percent) => write!(f, "{}%", percent),
}
}
}
impl Default for CacheStrategy {
fn default() -> Self {
CacheStrategy::PercentOfFreeMemory(25.0)
}
}
/// Characters that are percent-encoded when a corpus name is mapped to a
/// directory name on disk: all control characters plus characters that are
/// problematic in file-system paths.
///
/// The previous definition added `b'"'` twice; the duplicate was removed
/// (adding a byte to an `AsciiSet` is idempotent, so behavior is unchanged).
pub const PATH_ENCODE_SET: &AsciiSet = &CONTROLS
    .add(b' ')
    .add(b'"')
    .add(b'#')
    .add(b'<')
    .add(b'>')
    .add(b'`')
    .add(b'?')
    .add(b'{')
    .add(b'}')
    .add(b'%')
    .add(b'/')
    .add(b':')
    .add(b'|')
    .add(b'*')
    .add(b'\\');
/// Characters that are percent-encoded when node names are used as file
/// paths (e.g. for linked files).
pub const NODE_NAME_ENCODE_SET: &AsciiSet = &CONTROLS
    .add(b':')
    .add(b'/')
    .add(b' ')
    .add(b'%')
    .add(b'\\')
    .add(b'<')
    .add(b'>')
    .add(b'"')
    .add(b'|')
    .add(b'?')
    .add(b'*');
/// Characters percent-encoded for URIs when operating in quirks mode
/// (presumably mirroring the legacy Salt URI encoding, judging by the name —
/// verify against the relANNIS compatibility handling).
const QUIRKS_SALT_URI_ENCODE_SET: &AsciiSet = &CONTROLS
    .add(b' ')
    .add(b'"')
    .add(b'<')
    .add(b'>')
    .add(b'`')
    .add(b'?')
    .add(b'{')
    .add(b'}')
    .add(b'%')
    .add(b'/');
/// Name of the lock file created inside the database directory.
const DB_LOCK_FILE_NAME: &str = "db.lock";
/// Definition of a search query to be executed on one or more corpora.
#[derive(Debug, Clone)]
pub struct SearchQuery<'a, S: AsRef<str>> {
    /// Names of the corpora the query is executed on.
    pub corpus_names: &'a [S],
    /// The query string itself.
    pub query: &'a str,
    /// The language the query string is written in.
    pub query_language: QueryLanguage,
    /// Optional timeout after which query execution is aborted.
    pub timeout: Option<Duration>,
}
/// A thread-safe API for managing corpora that are stored below a common
/// database directory on the file system.
pub struct CorpusStorage {
    // Directory that contains one sub-directory per corpus.
    db_dir: PathBuf,
    // Lock file for the database directory (kept open while this storage
    // exists).
    lock_file: File,
    // Policy deciding how much memory the corpus cache may use.
    cache_strategy: CacheStrategy,
    // Insertion-ordered map from corpus name to its cache entry; re-inserting
    // an entry moves it to the back (used for eviction order).
    corpus_cache: RwLock<LinkedHashMap<String, Arc<RwLock<CacheEntry>>>>,
    // Configuration applied to all executed queries.
    query_config: aql::Config,
    // Counter and condition variable tracking running background WAL-sync
    // threads (see `apply_update`).
    active_background_workers: Arc<(Mutex<usize>, Condvar)>,
}
/// Sets the process-wide collation locale (`LC_COLLATE`) from the
/// environment: passing an empty string to `setlocale` selects the locale
/// configured via environment variables.
fn init_locale() {
    // SAFETY: `locale` is a valid NUL-terminated C string that outlives the
    // `setlocale` call. `setlocale` itself is not thread-safe; it is only
    // invoked here during `CorpusStorage` construction.
    unsafe {
        let locale = CString::new("").unwrap_or_default();
        libc::setlocale(libc::LC_COLLATE, locale.as_ptr());
    }
}
/// Creates an empty vector whose capacity is rounded up so that the backing
/// allocation fills whole memory pages.
///
/// `expected_len` is the number of elements the caller expects to store; the
/// returned vector always has at least that capacity.
fn new_vector_with_memory_aligned_capacity<T>(expected_len: usize) -> Vec<T> {
    let item_size = std::mem::size_of::<T>();
    if item_size == 0 {
        // Zero-sized types occupy no memory; returning early also avoids a
        // division by zero below.
        return Vec::with_capacity(expected_len);
    }
    let page_size = page_size::get();
    let expected_memory_size = item_size * expected_len;
    // Round up to the next multiple of the page size. The previous formula
    // unconditionally added padding, wasting a full extra page whenever the
    // expected size was already page-aligned.
    let aligned_memory_size = ((expected_memory_size + page_size - 1) / page_size) * page_size;
    Vec::with_capacity(aligned_memory_size / item_size)
}
/// Boxed iterator over the match groups produced by a `find`-style query.
type FindIterator<'a> = Box<dyn Iterator<Item = Result<MatchGroup>> + 'a>;
impl CorpusStorage {
/// Creates a new corpus storage for the given database directory using the
/// given cache strategy.
///
/// - `db_dir` — directory that holds (or will hold) one sub-directory per
///   corpus.
/// - `cache_strategy` — policy for how much memory the corpus cache may use.
/// - `use_parallel_joins` — enable parallel join execution for queries.
///
/// A lock file is created in the database directory; construction fails if
/// that is not possible.
pub fn with_cache_strategy(
    db_dir: &Path,
    cache_strategy: CacheStrategy,
    use_parallel_joins: bool,
) -> Result<CorpusStorage> {
    // Match sorting depends on the system collation locale.
    init_locale();
    let query_config = aql::Config { use_parallel_joins };
    // A mutex-guarded counter (instead of an atomic) is intentional: it is
    // paired with the condition variable to wait on background workers.
    #[allow(clippy::mutex_atomic)]
    let active_background_workers = Arc::new((Mutex::new(0), Condvar::new()));
    let cs = CorpusStorage {
        db_dir: PathBuf::from(db_dir),
        lock_file: create_lockfile_for_directory(db_dir)?,
        cache_strategy,
        corpus_cache: RwLock::new(LinkedHashMap::new()),
        query_config,
        active_background_workers,
    };
    Ok(cs)
}
pub fn with_auto_cache_size(db_dir: &Path, use_parallel_joins: bool) -> Result<CorpusStorage> {
init_locale();
let query_config = aql::Config { use_parallel_joins };
let cache_strategy: CacheStrategy = CacheStrategy::PercentOfFreeMemory(25.0);
#[allow(clippy::mutex_atomic)]
let active_background_workers = Arc::new((Mutex::new(0), Condvar::new()));
let cs = CorpusStorage {
db_dir: PathBuf::from(db_dir),
lock_file: create_lockfile_for_directory(db_dir)?,
cache_strategy,
corpus_cache: RwLock::new(LinkedHashMap::new()),
query_config,
active_background_workers,
};
Ok(cs)
}
/// Lists all corpora found on disk together with their current load status
/// and configuration.
pub fn list(&self) -> Result<Vec<CorpusInfo>> {
    let mut mem_ops =
        MallocSizeOfOps::new(memory_estimation::platform::usable_size, None, None);
    // An unreadable database directory is treated as "no corpora" rather
    // than as an error; collecting a `Result` stops at the first failing
    // corpus, like the original loop with `?` did.
    self.list_from_disk()
        .unwrap_or_default()
        .into_iter()
        .map(|name| self.create_corpus_info(&name, &mut mem_ops))
        .collect()
}
/// Returns the names of all corpora stored on disk.
///
/// Corpus names are the percent-decoded names of the sub-directories of the
/// database directory; non-directory entries are ignored.
fn list_from_disk(&self) -> Result<Vec<String>> {
    // The directory path is reused in all error variants below.
    let db_dir_display = self.db_dir.to_string_lossy().to_string();
    let dir_entries =
        self.db_dir
            .read_dir()
            .map_err(|e| CorpusStorageError::ListingDirectories {
                source: e,
                path: db_dir_display.clone(),
            })?;
    let mut corpora: Vec<String> = Vec::new();
    for dir_entry in dir_entries {
        let dir_entry = dir_entry.map_err(|e| CorpusStorageError::DirectoryEntry {
            source: e,
            path: db_dir_display.clone(),
        })?;
        let entry_type = dir_entry
            .file_type()
            .map_err(|e| CorpusStorageError::FileTypeDetection {
                source: e,
                path: db_dir_display.clone(),
            })?;
        if entry_type.is_dir() {
            // Directory names are percent-encoded corpus names.
            let raw_name = dir_entry.file_name();
            let encoded_name = raw_name.to_string_lossy();
            corpora.push(
                percent_decode_str(&encoded_name)
                    .decode_utf8_lossy()
                    .to_string(),
            );
        }
    }
    Ok(corpora)
}
/// Reads the `corpus-config.toml` file of the given corpus.
///
/// Returns `Ok(None)` when the corpus has no configuration file.
fn get_corpus_config(&self, corpus_name: &str) -> Result<Option<CorpusConfiguration>> {
    let config_path = self
        .corpus_directory_on_disk(corpus_name)
        .join("corpus-config.toml");
    // A missing configuration file is not an error.
    if !config_path.is_file() {
        return Ok(None);
    }
    let raw_config = std::fs::read_to_string(config_path)?;
    Ok(Some(toml::from_str(&raw_config)?))
}
/// Builds a `CorpusInfo` snapshot (load status, per-component information,
/// memory usage and configuration) for the given corpus.
///
/// `mem_ops` is used to estimate the heap size of loaded structures.
fn create_corpus_info(
    &self,
    corpus_name: &str,
    mem_ops: &mut MallocSizeOfOps,
) -> Result<CorpusInfo> {
    let cache_entry = self.get_entry(corpus_name)?;
    let lock = cache_entry.read()?;
    // The corpus configuration lives in a TOML file next to the corpus data
    // and is available even when the corpus itself is not loaded.
    let config: CorpusConfiguration = self
        .get_corpus_config(corpus_name)
        .map_err(|e| CorpusStorageError::LoadingCorpusConfig {
            corpus: corpus_name.to_string(),
            source: Box::new(e),
        })?
        .unwrap_or_default();
    let corpus_info: CorpusInfo = match &*lock {
        CacheEntry::Loaded(ref db) => {
            let heap_size = db.size_of(mem_ops);
            // Start optimistic; downgraded to `PartiallyLoaded` as soon as a
            // component without a loaded graph storage is found below.
            let mut load_status = LoadStatus::FullyLoaded(heap_size);
            let node_annos_load_size = Some(db.get_node_annos().size_of(mem_ops));
            let mut graphstorages = Vec::new();
            for c in db.get_all_components(None, None) {
                if let Some(gs) = db.get_graphstorage_as_ref(&c) {
                    graphstorages.push(GraphStorageInfo {
                        component: c.clone(),
                        load_status: LoadStatus::FullyLoaded(gs.size_of(mem_ops)),
                        number_of_annotations: gs.get_anno_storage().number_of_annotations()?,
                        implementation: gs.serialization_id().clone(),
                        statistics: gs.get_statistics().cloned(),
                    });
                } else {
                    load_status = LoadStatus::PartiallyLoaded(heap_size);
                    graphstorages.push(GraphStorageInfo {
                        component: c.clone(),
                        load_status: LoadStatus::NotLoaded,
                        number_of_annotations: 0,
                        implementation: "".to_owned(),
                        statistics: None,
                    })
                }
            }
            CorpusInfo {
                name: corpus_name.to_owned(),
                load_status,
                graphstorages,
                node_annos_load_size,
                config,
            }
        }
        &CacheEntry::NotLoaded => CorpusInfo {
            name: corpus_name.to_owned(),
            load_status: LoadStatus::NotLoaded,
            graphstorages: vec![],
            node_annos_load_size: None,
            config,
        },
    };
    Ok(corpus_info)
}
/// Returns information (load status, memory usage, configuration) about a
/// single corpus.
pub fn info(&self, corpus_name: &str) -> Result<CorpusInfo> {
    let mut size_ops =
        MallocSizeOfOps::new(memory_estimation::platform::usable_size, None, None);
    self.create_corpus_info(corpus_name, &mut size_ops)
}
/// Returns the cache entry for the given corpus, creating an (unloaded)
/// placeholder entry if none exists yet.
///
/// Takes an optimistic read lock first and only upgrades to the write lock
/// when the entry has to be inserted.
fn get_entry(&self, corpus_name: &str) -> Result<Arc<RwLock<CacheEntry>>> {
    let corpus_name = corpus_name.to_string();
    {
        // Fast path: entry already exists, a shared read lock is enough.
        let cache_lock = self.corpus_cache.read()?;
        let cache = &*cache_lock;
        if let Some(e) = cache.get(&corpus_name) {
            return Ok(e.clone());
        }
    }
    // Slow path: another thread may have inserted the entry between
    // releasing the read lock and acquiring the write lock, so use the
    // entry API instead of a plain insert.
    let mut cache_lock = self.corpus_cache.write()?;
    let cache = &mut *cache_lock;
    let entry = cache
        .entry(corpus_name)
        .or_insert_with(|| Arc::new(RwLock::new(CacheEntry::NotLoaded)));
    Ok(entry.clone())
}
/// Loads the corpus from disk (or creates an empty one) and inserts it into
/// the cache. The caller must already hold the cache write lock.
///
/// - `create_if_missing` — create a new corpus on disk when the corpus
///   directory does not exist; otherwise a missing directory yields
///   `NoSuchCorpus`.
fn load_entry_with_lock(
    &self,
    cache_lock: &mut RwLockWriteGuard<LinkedHashMap<String, Arc<RwLock<CacheEntry>>>>,
    corpus_name: &str,
    create_if_missing: bool,
) -> Result<Arc<RwLock<CacheEntry>>> {
    let cache = &mut *cache_lock;
    let db_path = self.corpus_directory_on_disk(corpus_name);
    let create_corpus = if db_path.is_dir() {
        false
    } else if create_if_missing {
        true
    } else {
        return Err(GraphAnnisError::NoSuchCorpus(corpus_name.to_string()));
    };
    // Make room in the cache *before* loading the corpus into memory.
    check_cache_size_and_remove_with_cache(cache, &self.cache_strategy, vec![], false)?;
    let db = if create_corpus {
        // Create a new empty corpus and persist it immediately.
        let mut db = AnnotationGraph::with_default_graphstorages(false)?;
        db.persist_to(&db_path)
            .map_err(|e| CorpusStorageError::CreateCorpus {
                corpus: corpus_name.to_string(),
                source: e,
            })?;
        db
    } else {
        let mut db = AnnotationGraph::new(false)?;
        db.load_from(&db_path, false)?;
        db
    };
    let entry = Arc::new(RwLock::new(CacheEntry::Loaded(db)));
    // Remove and re-insert so the entry moves to the back of the
    // insertion-ordered cache map (most recently loaded position).
    cache.remove(corpus_name);
    cache.insert(String::from(corpus_name), entry.clone());
    info!("Loaded corpus {}", corpus_name,);
    // Enforce the cache limit again, but never evict the corpus that was
    // just loaded.
    check_cache_size_and_remove_with_cache(
        cache,
        &self.cache_strategy,
        vec![corpus_name],
        true,
    )?;
    Ok(entry)
}
/// Returns the cache entry for the corpus, making sure its annotation graph
/// is loaded.
///
/// When the corpus is not in memory yet it is loaded from disk; with
/// `create_if_missing` set, a missing corpus is created instead of reported
/// as an error.
fn get_loaded_entry(
    &self,
    corpus_name: &str,
    create_if_missing: bool,
) -> Result<Arc<RwLock<CacheEntry>>> {
    let cache_entry = self.get_entry(corpus_name)?;
    // Check the load state under a short-lived read lock (the guard is a
    // temporary and is dropped at the end of this statement).
    let already_loaded = matches!(&*cache_entry.read()?, CacheEntry::Loaded(_));
    if already_loaded {
        return Ok(cache_entry);
    }
    // Not loaded yet: lock the whole cache and perform the actual load.
    let mut cache_lock = self.corpus_cache.write()?;
    self.load_entry_with_lock(&mut cache_lock, corpus_name, create_if_missing)
}
/// Returns the loaded cache entry for the corpus and additionally ensures
/// that all of the given components are loaded into memory.
fn get_loaded_entry_with_components(
    &self,
    corpus_name: &str,
    components: Vec<Component<AnnotationComponentType>>,
) -> Result<Arc<RwLock<CacheEntry>>> {
    let db_entry = self.get_loaded_entry(corpus_name, false)?;
    // Determine the not-yet-loaded components under a read lock.
    let missing_components: HashSet<_> = {
        let lock = db_entry.read()?;
        let db = get_read_or_error(&lock)?;
        components
            .into_iter()
            .filter(|c| !db.is_loaded(c))
            .collect()
    };
    // Only take the write lock when something actually needs loading.
    if !missing_components.is_empty() {
        let mut lock = db_entry.write()?;
        let db = get_write_or_error(&mut lock)?;
        for component in missing_components {
            db.ensure_loaded(&component)?;
        }
    }
    Ok(db_entry)
}
/// Returns the cache entry for the corpus with *all* of its components
/// loaded into memory.
fn get_fully_loaded_entry(&self, corpus_name: &str) -> Result<Arc<RwLock<CacheEntry>>> {
    let db_entry = self.get_loaded_entry(corpus_name, false)?;
    // Collect every component that is not in memory yet (read lock only).
    let missing_components: HashSet<_> = {
        let lock = db_entry.read()?;
        let db = get_read_or_error(&lock)?;
        db.get_all_components(None, None)
            .into_iter()
            .filter(|c| !db.is_loaded(c))
            .collect()
    };
    // Only take the write lock when something actually needs loading.
    if !missing_components.is_empty() {
        let mut lock = db_entry.write()?;
        let db = get_write_or_error(&mut lock)?;
        for component in missing_components {
            db.ensure_loaded(&component)?;
        }
    }
    Ok(db_entry)
}
/// Imports all corpora contained in a ZIP file.
///
/// The archive is extracted to a temporary directory first. Directories
/// containing a `corpus.annis`/`corpus.tab` file are imported as relANNIS
/// corpora; files with a `.graphml` extension are imported as GraphML.
///
/// - `disk_based` — use disk-based storage for the imported corpora.
/// - `overwrite_existing` — replace corpora that already exist.
/// - `progress_callback` — invoked with status messages during the import.
///
/// Returns the names of all imported corpora.
pub fn import_all_from_zip<R, F>(
    &self,
    zip_file: R,
    disk_based: bool,
    overwrite_existing: bool,
    progress_callback: F,
) -> Result<Vec<String>>
where
    R: Read + Seek,
    F: Fn(&str),
{
    let tmp_dir = tempfile::tempdir()?;
    debug!(
        "Using temporary directory {} to extract ZIP file content.",
        tmp_dir.path().to_string_lossy()
    );
    let mut archive = zip::ZipArchive::new(zip_file)?;
    let mut relannis_files = Vec::new();
    let mut graphannis_files = Vec::new();
    for i in 0..archive.len() {
        let mut file = archive.by_index(i)?;
        // `enclosed_name` yields a path that is guaranteed not to escape the
        // extraction directory (per the zip crate documentation); entries
        // without a safe name are skipped.
        if let Some(file_path) = file.enclosed_name() {
            let output_path = tmp_dir.path().join(file_path);
            if let Some(file_name) = output_path.file_name() {
                // A corpus.annis/corpus.tab file marks the root directory of
                // a relANNIS corpus.
                if file_name == "corpus.annis" || file_name == "corpus.tab" {
                    if let Some(relannis_root) = output_path.parent() {
                        relannis_files.push(relannis_root.to_owned())
                    }
                } else if let Some(ext) = output_path.extension() {
                    if ext.to_string_lossy().to_ascii_lowercase() == "graphml" {
                        graphannis_files.push(output_path.clone());
                    }
                }
            }
            debug!("copying ZIP file content {}", file_path.to_string_lossy(),);
            if file.is_dir() {
                std::fs::create_dir_all(output_path)?;
            } else if let Some(parent) = output_path.parent() {
                std::fs::create_dir_all(parent)?;
                let mut output_file = std::fs::File::create(&output_path)?;
                std::io::copy(&mut file, &mut output_file)?;
            }
        }
    }
    // Import everything that was found, collecting the resulting names.
    let mut corpus_names = Vec::new();
    for p in relannis_files {
        info!("importing relANNIS corpus from {}", p.to_string_lossy());
        let name = self.import_from_fs(
            &p,
            ImportFormat::RelANNIS,
            None,
            disk_based,
            overwrite_existing,
            &progress_callback,
        )?;
        corpus_names.push(name);
    }
    for p in graphannis_files {
        info!("importing corpus from {}", p.to_string_lossy());
        let name = self.import_from_fs(
            &p,
            ImportFormat::GraphML,
            None,
            disk_based,
            overwrite_existing,
            &progress_callback,
        )?;
        corpus_names.push(name);
    }
    debug!(
        "deleting temporary directory {}",
        tmp_dir.path().to_string_lossy()
    );
    std::fs::remove_dir_all(tmp_dir.path())?;
    Ok(corpus_names)
}
/// Imports a corpus from the file system into this corpus storage.
///
/// - `path` — file (GraphML) or directory (relANNIS) to import.
/// - `format` — whether the input is relANNIS or GraphML.
/// - `corpus_name` — optional name override; defaults to a name derived from
///   the input (relANNIS corpus name or the GraphML file stem).
/// - `disk_based` — use disk-based storage for the new corpus.
/// - `overwrite_existing` — replace an existing corpus with the same name;
///   otherwise importing over an existing corpus is an error.
/// - `progress_callback` — invoked with status messages during the import.
///
/// Returns the name under which the corpus was stored.
pub fn import_from_fs<F>(
    &self,
    path: &Path,
    format: ImportFormat,
    corpus_name: Option<String>,
    disk_based: bool,
    overwrite_existing: bool,
    progress_callback: F,
) -> Result<String>
where
    F: Fn(&str),
{
    let (orig_name, mut graph, config) = match format {
        ImportFormat::RelANNIS => relannis::load(path, disk_based, |status| {
            progress_callback(status);
            // Importing can be memory-intensive: opportunistically enforce
            // the cache limit while the import is running.
            if let Err(e) = self.check_cache_size_and_remove(vec![], false) {
                error!("Could not check cache size: {}", e);
            };
        })?,
        ImportFormat::GraphML => {
            // Default corpus name is the file name without its extension.
            let orig_corpus_name = if let Some(file_name) = path.file_stem() {
                file_name.to_string_lossy().to_string()
            } else {
                "UnknownCorpus".to_string()
            };
            let input_file = File::open(path)?;
            let (g, config_str) = graphannis_core::graph::serialization::graphml::import(
                input_file,
                disk_based,
                |status| {
                    progress_callback(status);
                    if let Err(e) = self.check_cache_size_and_remove(vec![], false) {
                        error!("Could not check cache size: {}", e);
                    };
                },
            )?;
            // The GraphML file can embed the corpus configuration as TOML.
            let config = if let Some(config_str) = config_str {
                toml::from_str(&config_str)?
            } else {
                CorpusConfiguration::default()
            };
            (orig_corpus_name.into(), g, config)
        }
    };
    // Loading all components is best-effort: failures are logged but do not
    // abort the import.
    let r = graph.ensure_loaded_all();
    if let Err(e) = r {
        error!(
            "Some error occurred when attempting to load components from disk: {:?}",
            e
        );
    }
    let corpus_name = corpus_name.unwrap_or_else(|| orig_name.into());
    let db_path = self.corpus_directory_on_disk(&corpus_name);
    // Hold the cache write lock for the rest of the import so the new corpus
    // is registered atomically.
    let mut cache_lock = self.corpus_cache.write()?;
    let cache = &mut *cache_lock;
    check_cache_size_and_remove_with_cache(cache, &self.cache_strategy, vec![], false)?;
    if cache.contains_key(&corpus_name) {
        if overwrite_existing {
            // Drop the old cache entry and its files on disk.
            let old_entry = cache.remove(&corpus_name);
            if old_entry.is_some() {
                if let Err(e) = std::fs::remove_dir_all(db_path.clone()) {
                    error!("Error when removing existing files {}", e);
                }
            }
        } else {
            return Err(GraphAnnisError::CorpusExists(corpus_name.to_string()));
        }
    }
    if let Err(e) = std::fs::create_dir_all(&db_path) {
        error!(
            "Can't create directory {}: {:?}",
            db_path.to_string_lossy(),
            e
        );
    }
    info!("copying linked files for corpus {}", corpus_name);
    // Files referenced by annis::file annotations are copied into the corpus
    // directory and the annotations are rewritten to relative paths.
    let current_dir = PathBuf::from(".");
    let files_dir = db_path.join("files");
    std::fs::create_dir_all(&files_dir)?;
    self.copy_linked_files_and_update_references(
        path.parent().unwrap_or(&current_dir),
        &files_dir,
        &mut graph,
    )?;
    info!("saving corpus {} to disk", corpus_name);
    let save_result = graph.save_to(&db_path);
    if let Err(e) = save_result {
        error!(
            "Can't save corpus to {}: {:?}",
            db_path.to_string_lossy(),
            e
        );
    }
    let corpus_config_path = db_path.join("corpus-config.toml");
    info!(
        "saving corpus configuration file for corpus {} to {}",
        corpus_name,
        &corpus_config_path.to_string_lossy()
    );
    std::fs::write(corpus_config_path, toml::to_string(&config)?)?;
    // Register the imported corpus as loaded and enforce the cache limit,
    // keeping the corpus that was just imported.
    cache.insert(
        corpus_name.clone(),
        Arc::new(RwLock::new(CacheEntry::Loaded(graph))),
    );
    check_cache_size_and_remove_with_cache(
        cache,
        &self.cache_strategy,
        vec![&corpus_name],
        true,
    )?;
    Ok(corpus_name)
}
/// Copies all files linked from the graph (nodes of type "file" with an
/// `annis::file` annotation) into `new_base_path` and rewrites each node's
/// `file` annotation to a path relative to the new base.
///
/// Paths in the graph are resolved relative to `old_base_path`; entries
/// whose source file does not exist are silently skipped.
fn copy_linked_files_and_update_references(
    &self,
    old_base_path: &Path,
    new_base_path: &Path,
    graph: &mut AnnotationGraph,
) -> Result<()> {
    let linked_file_key = AnnoKey {
        ns: ANNIS_NS.into(),
        name: "file".into(),
    };
    let node_annos: &mut dyn AnnotationStorage<NodeID> = graph.get_node_annos_mut();
    // Collect the node IDs first, because the annotation storage is mutated
    // inside the loop below.
    let file_nodes: Result<Vec<_>> = node_annos
        .exact_anno_search(Some(ANNIS_NS), NODE_TYPE, ValueSearch::Some("file"))
        .map_ok(|m| m.node)
        .map(|n| n.map_err(GraphAnnisError::from))
        .collect();
    for node in file_nodes? {
        if let Some(original_path) = node_annos.get_value_for_item(&node, &linked_file_key)? {
            let original_path = old_base_path
                .canonicalize()?
                .join(&PathBuf::from(original_path.as_ref()));
            if original_path.is_file() {
                // The node name doubles as the relative target path.
                if let Some(node_name) = node_annos.get_value_for_item(&node, &NODE_NAME_KEY)? {
                    let new_path = new_base_path.join(node_name.as_ref());
                    if let Some(parent) = new_path.parent() {
                        std::fs::create_dir_all(parent)?;
                    }
                    std::fs::copy(&original_path, &new_path)?;
                    // Store the path relative to `new_base_path` so the
                    // corpus directory stays relocatable.
                    let relative_path = new_path.strip_prefix(&new_base_path)?;
                    node_annos.insert(
                        node,
                        Annotation {
                            key: linked_file_key.clone(),
                            val: relative_path.to_string_lossy().into(),
                        },
                    )?;
                }
            }
        }
    }
    Ok(())
}
/// Returns an iterator over all files linked from the corpus graph, as
/// `(node name, absolute path)` pairs.
///
/// Returns `Ok(None)` when the corpus has no `files` directory on disk.
/// Relative `annis::file` annotation values are resolved against that
/// directory.
fn get_linked_files<'a>(
    &'a self,
    corpus_name: &'a str,
    graph: &'a AnnotationGraph,
) -> Result<Option<impl Iterator<Item = Result<(String, PathBuf)>> + 'a>> {
    let linked_file_key = AnnoKey {
        ns: ANNIS_NS.into(),
        name: "file".into(),
    };
    let base_path = self.corpus_directory_on_disk(corpus_name).join("files");
    if base_path.is_dir() {
        let base_path = base_path.canonicalize()?;
        let node_annos: &dyn AnnotationStorage<NodeID> = graph.get_node_annos();
        // For every node of type "file": look up its name, then its linked
        // file path, propagating errors through the iterator instead of
        // failing eagerly.
        let it = node_annos
            .exact_anno_search(Some(ANNIS_NS), NODE_TYPE, ValueSearch::Some("file"))
            .map(move |m| match m {
                Ok(m) => node_annos
                    .get_value_for_item(&m.node, &NODE_NAME_KEY)
                    .map(|node_name| (m, node_name)),
                Err(e) => Err(e),
            })
            .map(move |result| match result {
                Ok((m, node_name)) => node_annos
                    .get_value_for_item(&m.node, &linked_file_key)
                    .map(|file_path_value| (node_name, file_path_value)),
                Err(e) => Err(e),
            })
            .filter_map_ok(move |(node_name, file_path_value)| {
                // Skip nodes that lack either a name or a file annotation.
                if let (Some(node_name), Some(file_path_value)) = (node_name, file_path_value) {
                    return Some((
                        node_name.to_string(),
                        base_path.join(file_path_value.to_string()),
                    ));
                }
                None
            })
            .map(|item| item.map_err(GraphAnnisError::from));
        Ok(Some(it))
    } else {
        Ok(None)
    }
}
/// Copies every linked file of the corpus below `new_base_path`, using the
/// node name as the relative target path.
///
/// Corpora without a `files` directory, and entries whose source file does
/// not exist, are skipped silently.
fn copy_linked_files_to_disk(
    &self,
    corpus_name: &str,
    new_base_path: &Path,
    graph: &AnnotationGraph,
) -> Result<()> {
    if let Some(linked_files) = self.get_linked_files(corpus_name, graph)? {
        for linked_file in linked_files {
            let (node_name, source_path) = linked_file?;
            if source_path.is_file() {
                let target_path = new_base_path.join(&node_name);
                // Create intermediate directories for nested node names.
                if let Some(parent_dir) = target_path.parent() {
                    std::fs::create_dir_all(parent_dir)?;
                }
                std::fs::copy(&source_path, &target_path)?;
            }
        }
    }
    Ok(())
}
/// Exports a single corpus as a GraphML file at `path`; linked files are
/// copied into the same directory as the GraphML file.
fn export_corpus_graphml(&self, corpus_name: &str, path: &Path) -> Result<()> {
    let output_file = File::create(path)?;
    let entry = self.get_loaded_entry(corpus_name, false)?;
    // All components must be in memory before the graph can be serialized.
    {
        let mut lock = entry.write()?;
        let graph: &mut AnnotationGraph = get_write_or_error(&mut lock)?;
        graph.ensure_loaded_all()?;
    }
    let lock = entry.read()?;
    let graph: &AnnotationGraph = get_read_or_error(&lock)?;
    // Embed the corpus configuration (if any) as pretty-printed TOML.
    let config_as_str = self
        .get_corpus_config(corpus_name)?
        .map(|config| toml::to_string_pretty(&config))
        .transpose()?;
    graphannis_core::graph::serialization::graphml::export(
        graph,
        config_as_str.as_deref(),
        output_file,
        |status| {
            info!("{}", status);
        },
    )?;
    // Linked files go next to the GraphML file.
    if let Some(parent_dir) = path.parent() {
        self.copy_linked_files_to_disk(corpus_name, parent_dir, graph)?;
    }
    Ok(())
}
/// Writes one corpus into an (already open) ZIP writer.
///
/// The GraphML representation is stored as `<corpus>.graphml` — below a
/// per-corpus sub-directory when `use_corpus_subdirectory` is set — followed
/// by all linked files of the corpus.
pub fn export_to_zip<W, F>(
    &self,
    corpus_name: &str,
    use_corpus_subdirectory: bool,
    mut zip: &mut zip::ZipWriter<W>,
    progress_callback: F,
) -> Result<()>
where
    W: Write + Seek,
    F: Fn(&str),
{
    let options =
        zip::write::FileOptions::default().compression_method(zip::CompressionMethod::Deflated);
    let mut base_path = PathBuf::default();
    if use_corpus_subdirectory {
        base_path.push(corpus_name);
    }
    let path_in_zip = base_path.join(format!("{}.graphml", corpus_name));
    zip.start_file(path_in_zip.to_string_lossy(), options)?;
    let entry = self.get_loaded_entry(corpus_name, false)?;
    // All components must be in memory before the graph can be serialized.
    {
        let mut lock = entry.write()?;
        let graph: &mut AnnotationGraph = get_write_or_error(&mut lock)?;
        graph.ensure_loaded_all()?;
    }
    let lock = entry.read()?;
    let graph: &AnnotationGraph = get_read_or_error(&lock)?;
    // Embed the corpus configuration (if any) as pretty-printed TOML.
    let config_as_str = if let Some(config) = self.get_corpus_config(corpus_name)? {
        Some(toml::to_string_pretty(&config)?)
    } else {
        None
    };
    let config_as_str: Option<&str> = config_as_str.as_deref();
    graphannis_core::graph::serialization::graphml::export(
        graph,
        config_as_str,
        &mut zip,
        progress_callback,
    )?;
    // Append the linked files of the corpus to the ZIP as well.
    if let Some(it_files) = self.get_linked_files(corpus_name.as_ref(), graph)? {
        for file in it_files {
            let (node_name, original_path) = file?;
            let node_name: String = node_name;
            zip.start_file(base_path.join(&node_name).to_string_lossy(), options)?;
            let file_to_copy = File::open(original_path)?;
            let mut reader = BufReader::new(file_to_copy);
            std::io::copy(&mut reader, zip)?;
        }
    }
    Ok(())
}
/// Exports one or more corpora to the file system in the selected format.
///
/// - `GraphML`: exactly one corpus is written to `path` as a single GraphML
///   file; more than one corpus is an error.
/// - `GraphMLDirectory`: each corpus is written as a GraphML file below
///   `path`; with multiple corpora one sub-directory per corpus is used.
/// - `GraphMLZip`: all corpora are bundled into a single ZIP file at `path`.
///
/// Removed a dead, empty `else { };` branch from the directory case.
pub fn export_to_fs<S: AsRef<str>>(
    &self,
    corpora: &[S],
    path: &Path,
    format: ExportFormat,
) -> Result<()> {
    match format {
        ExportFormat::GraphML => {
            if corpora.len() == 1 {
                self.export_corpus_graphml(corpora[0].as_ref(), path)?;
            } else {
                return Err(CorpusStorageError::MultipleCorporaForSingleCorpusFormat(
                    corpora.len(),
                )
                .into());
            }
        }
        ExportFormat::GraphMLDirectory => {
            // Per-corpus sub-directories are only needed when the output
            // would otherwise mix several corpora in one directory.
            let use_corpus_subdirectory = corpora.len() > 1;
            for corpus_name in corpora {
                let mut path = PathBuf::from(path);
                if use_corpus_subdirectory {
                    path.push(corpus_name.as_ref());
                }
                std::fs::create_dir_all(&path)?;
                path.push(format!("{}.graphml", corpus_name.as_ref()));
                self.export_corpus_graphml(corpus_name.as_ref(), &path)?;
            }
        }
        ExportFormat::GraphMLZip => {
            let output_file = File::create(path)?;
            let mut zip = zip::ZipWriter::new(output_file);
            let use_corpus_subdirectory = corpora.len() > 1;
            for corpus_name in corpora {
                let corpus_name: &str = corpus_name.as_ref();
                self.export_to_zip(corpus_name, use_corpus_subdirectory, &mut zip, |status| {
                    info!("{}", status);
                })?;
            }
            zip.finish()?;
        }
    }
    Ok(())
}
/// Deletes a corpus from the cache and removes its files from disk.
///
/// Returns `Ok(true)` when the corpus was known (present in the cache) and
/// deleted, `Ok(false)` when no corpus with this name was known.
pub fn delete(&self, corpus_name: &str) -> Result<bool> {
    let db_path = self.corpus_directory_on_disk(corpus_name);
    let mut cache_lock = self.corpus_cache.write()?;
    let cache = &mut *cache_lock;
    if let Some(db_entry) = cache.remove(corpus_name) {
        // Acquire exclusive access to the corpus before deleting its files
        // so no other thread is still reading from it.
        let mut _lock = db_entry.write()?;
        // `is_dir()` already implies existence; the former extra `exists()`
        // check was redundant.
        if db_path.is_dir() {
            std::fs::remove_dir_all(db_path).map_err(|e| {
                CorpusStorageError::RemoveFileForCorpus {
                    corpus: corpus_name.to_string(),
                    source: e,
                }
            })?
        }
        Ok(true)
    } else {
        Ok(false)
    }
}
/// Applies a sequence of updates to the corpus, creating the corpus when it
/// does not exist yet.
///
/// The update itself is applied synchronously; afterwards a background
/// thread is spawned that syncs the write-ahead-log updates to disk. The
/// `active_background_workers` counter/condvar pair tracks that thread.
pub fn apply_update(&self, corpus_name: &str, update: &mut GraphUpdate) -> Result<()> {
    let db_entry = self.get_loaded_entry(corpus_name, true)?;
    {
        let mut lock = db_entry.write()?;
        let db: &mut AnnotationGraph = get_write_or_error(&mut lock)?;
        db.apply_update(update, |_| {})?;
    }
    let active_background_workers = self.active_background_workers.clone();
    {
        // Register the background worker *before* spawning the thread.
        let &(ref lock, ref _cvar) = &*active_background_workers;
        let mut nr_active_background_workers = lock.lock()?;
        *nr_active_background_workers += 1;
    }
    thread::spawn(move || {
        trace!("Starting background thread to sync WAL updates");
        let lock = db_entry.read().unwrap();
        if let Ok(db) = get_read_or_error(&lock) {
            let db: &AnnotationGraph = db;
            if let Err(e) = db.background_sync_wal_updates() {
                error!("Can't sync changes in background thread: {:?}", e);
            } else {
                trace!("Finished background thread to sync WAL updates");
            }
        }
        // De-register the worker and wake up anyone waiting on the condvar.
        let &(ref lock, ref cvar) = &*active_background_workers;
        let mut nr_active_background_workers = lock.lock().unwrap();
        *nr_active_background_workers -= 1;
        cvar.notify_all();
    });
    Ok(())
}
/// Parses the query and makes sure all graph components needed to execute
/// it are loaded into memory.
///
/// `additional_components_callback` can request extra components (beyond the
/// ones the query itself needs) to be loaded as well.
fn prepare_query<F>(
    &self,
    corpus_name: &str,
    query: &str,
    query_language: QueryLanguage,
    additional_components_callback: F,
) -> Result<PreparationResult>
where
    F: FnOnce(&AnnotationGraph) -> Vec<Component<AnnotationComponentType>>,
{
    let db_entry = self.get_loaded_entry(corpus_name, false)?;
    // Parse the query and determine the missing components under a read
    // lock.
    let (q, missing_components) = {
        let lock = db_entry.read()?;
        let db = get_read_or_error(&lock)?;
        let q = match query_language {
            QueryLanguage::AQL => aql::parse(query, false)?,
            QueryLanguage::AQLQuirksV3 => aql::parse(query, true)?,
        };
        let necessary_components = q.necessary_components(db);
        let mut missing: HashSet<_> = necessary_components.iter().cloned().collect();
        let additional_components = additional_components_callback(db);
        missing.extend(additional_components.into_iter());
        // Components that already have a graph storage do not need loading.
        for c in &necessary_components {
            if db.get_graphstorage(c).is_some() {
                missing.remove(c);
            }
        }
        let missing: Vec<_> = missing.into_iter().collect();
        (q, missing)
    };
    if !missing_components.is_empty() {
        {
            // Load the missing components under the write lock.
            let mut lock = db_entry.write()?;
            let db = get_write_or_error(&mut lock)?;
            for c in missing_components {
                db.ensure_loaded(&c)?;
            }
        }
        // Loading may have grown the cache beyond its limit; enforce it but
        // keep this corpus.
        self.check_cache_size_and_remove(vec![corpus_name], true)?;
    };
    Ok(PreparationResult { query: q, db_entry })
}
/// Ensures that all components of the corpus are loaded into main memory,
/// then enforces the configured cache size limit.
pub fn preload(&self, corpus_name: &str) -> Result<()> {
    {
        let entry = self.get_loaded_entry(corpus_name, false)?;
        let mut lock = entry.write()?;
        get_write_or_error(&mut lock)?.ensure_loaded_all()?;
    }
    // Loading everything may exceed the cache budget: evict other corpora,
    // but keep the one that was just preloaded.
    self.check_cache_size_and_remove(vec![corpus_name], true)?;
    Ok(())
}
/// Removes the corpus from the in-memory cache. Its files on disk stay
/// untouched, so it can be loaded again later.
pub fn unload(&self, corpus_name: &str) -> Result<()> {
    self.corpus_cache.write()?.remove(corpus_name);
    Ok(())
}
#[doc(hidden)]
pub fn reoptimize_implementation(&self, corpus_name: &str, disk_based: bool) -> Result<()> {
    // Re-select the optimal graph storage implementations for the corpus,
    // optionally preferring disk-based storage.
    let entry = self.get_loaded_entry(corpus_name, false)?;
    let mut lock = entry.write()?;
    let graph: &mut AnnotationGraph = get_write_or_error(&mut lock)?;
    graph.optimize_impl(disk_based)?;
    Ok(())
}
/// Parses and plans the query against every given corpus to check that it
/// is valid. Returns `Ok(true)` when planning succeeded for all corpora.
pub fn validate_query<S: AsRef<str>>(
    &self,
    corpus_names: &[S],
    query: &str,
    query_language: QueryLanguage,
) -> Result<bool> {
    let timeout = TimeoutCheck::new(None);
    for corpus in corpus_names {
        let prep: PreparationResult =
            self.prepare_query(corpus.as_ref(), query, query_language, |_| vec![])?;
        let lock = prep.db_entry.read()?;
        let db = get_read_or_error(&lock)?;
        // Creating the execution plan is enough to surface semantic errors;
        // the plan itself is discarded.
        ExecutionPlan::from_disjunction(&prep.query, db, &self.query_config, timeout)?;
    }
    Ok(true)
}
/// Generates the textual execution plan for the query on each of the given
/// corpora and returns all plans joined by newlines (one `<corpus>:` section
/// per corpus).
pub fn plan<S: AsRef<str>>(
    &self,
    corpus_names: &[S],
    query: &str,
    query_language: QueryLanguage,
) -> Result<String> {
    let timeout = TimeoutCheck::new(None);
    // Stop at the first corpus that fails, like the original loop with `?`.
    let all_plans: Result<Vec<String>> = corpus_names
        .iter()
        .map(|cn| {
            let prep = self.prepare_query(cn.as_ref(), query, query_language, |_| vec![])?;
            let lock = prep.db_entry.read()?;
            let db = get_read_or_error(&lock)?;
            let plan =
                ExecutionPlan::from_disjunction(&prep.query, db, &self.query_config, timeout)?;
            Ok(format!("{}:\n{}", cn.as_ref(), plan))
        })
        .collect();
    Ok(all_plans?.join("\n"))
}
/// Counts the total number of matches for the query over all of its corpora.
///
/// The query timeout is re-checked every 1000 matches while the result
/// iterator is consumed, and once more after each corpus.
pub fn count<S: AsRef<str>>(&self, query: SearchQuery<S>) -> Result<u64> {
    let timeout = TimeoutCheck::new(query.timeout);
    let mut total_count: u64 = 0;
    for corpus in query.corpus_names {
        let prep = self.prepare_query(
            corpus.as_ref(),
            query.query,
            query.query_language,
            |_| vec![],
        )?;
        let lock = prep.db_entry.read()?;
        let db = get_read_or_error(&lock)?;
        let plan =
            ExecutionPlan::from_disjunction(&prep.query, db, &self.query_config, timeout)?;
        // Only the number of matches matters, not their content.
        for _ in plan {
            total_count += 1;
            if total_count % 1_000 == 0 {
                timeout.check()?;
            }
        }
        timeout.check()?;
    }
    Ok(total_count)
}
/// Counts the number of matches and the number of distinct documents that
/// contain at least one match, over all corpora of the query.
pub fn count_extra<S: AsRef<str>>(&self, query: SearchQuery<S>) -> Result<CountExtra> {
    let timeout = TimeoutCheck::new(query.timeout);
    let mut match_count: u64 = 0;
    let mut document_count: u64 = 0;
    for cn in query.corpus_names {
        let prep =
            self.prepare_query(cn.as_ref(), query.query, query.query_language, |_| vec![])?;
        let lock = prep.db_entry.read()?;
        let db: &AnnotationGraph = get_read_or_error(&lock)?;
        let plan =
            ExecutionPlan::from_disjunction(&prep.query, db, &self.query_config, timeout)?;
        let mut known_documents: HashSet<SmartString> = HashSet::new();
        for m in plan {
            let m = m?;
            if !m.is_empty() {
                // The first matched node determines the containing document.
                let m: &Match = &m[0];
                if let Some(node_name) = db
                    .get_node_annos()
                    .get_value_for_item(&m.node, &NODE_NAME_KEY)?
                {
                    // The document path is the node name without the
                    // fragment part after the last '#'.
                    let doc_path = if let Some((before, _)) = node_name.rsplit_once('#') {
                        before
                    } else {
                        &node_name
                    };
                    known_documents.insert(doc_path.into());
                }
            }
            match_count += 1;
            // Periodically enforce the query timeout.
            if match_count % 1_000 == 0 {
                timeout.check()?;
            }
        }
        document_count += known_documents.len() as u64;
        timeout.check()?;
    }
    Ok(CountExtra {
        match_count,
        document_count,
    })
}
// Builds the iterator over match groups for an already prepared query,
// applying the requested result order. Returns the iterator and, when the
// results had to be materialized for sorting, the exact number of results.
fn create_find_iterator_for_query<'b>(
&'b self,
db: &'b AnnotationGraph,
query: &'b Disjunction,
offset: usize,
limit: Option<usize>,
order: ResultOrder,
quirks_mode: bool,
timeout: TimeoutCheck,
) -> Result<(FindIterator<'b>, Option<usize>)> {
// Parallel joins can change the natural output order, so disable them when
// the caller explicitly asked for unsorted (plan-order) results.
let mut query_config = self.query_config.clone();
if order == ResultOrder::NotSorted {
query_config.use_parallel_joins = false;
}
let plan = ExecutionPlan::from_disjunction(query, db, &query_config, timeout)?;
// In quirks mode the collation depends on the "relannis-version" annotation
// of the imported corpus (see collation selection below).
let mut relannis_version_33 = false;
if quirks_mode {
let mut relannis_version_it = db.get_node_annos().exact_anno_search(
Some(ANNIS_NS),
"relannis-version",
ValueSearch::Any,
);
if let Some(m) = relannis_version_it.next() {
let m = m?;
if let Some(v) = db
.get_node_annos()
.get_value_for_item(&m.node, &m.anno_key)?
{
if v == "3.3" {
relannis_version_33 = true;
}
}
}
}
let mut expected_size: Option<usize> = None;
// Fast path: if no sorting is needed (or the plan already delivers results
// in text order), stream the matches directly from the execution plan.
let base_it: FindIterator = if order == ResultOrder::NotSorted
|| (order == ResultOrder::Normal && plan.is_sorted_by_text() && !quirks_mode)
{
Box::from(plan)
} else {
// Slow path: materialize all results into a temporary B-tree index so
// they can be (partially) sorted before iteration.
let estimated_result_size = plan.estimated_output_size();
let btree_config = BtreeConfig::default()
.fixed_key_size(size_of::<usize>())
.max_value_size(512);
// Annotation keys are interned so each stored match only needs the
// node ID plus a small symbol ID.
let mut anno_key_symbols: SymbolTable<AnnoKey> = SymbolTable::new();
let mut tmp_results: BtreeIndex<usize, Vec<(NodeID, usize)>> =
BtreeIndex::with_capacity(btree_config, estimated_result_size)?;
if order == ResultOrder::Randomized {
// Assign a unique random key to every match group; iterating the
// B-tree in key order then yields a random permutation.
let mut rng = rand::thread_rng();
for mgroup in plan {
let mgroup = mgroup?;
let mut idx: usize = rng.gen();
while tmp_results.contains_key(&idx)? {
idx = rng.gen();
}
let m = match_group_with_symbol_ids(&mgroup, &mut anno_key_symbols)?;
tmp_results.insert(idx, m)?;
}
} else {
// Insert in plan order first, then sort a prefix of the index.
for (idx, mgroup) in plan.enumerate() {
let mgroup = mgroup?;
let m = match_group_with_symbol_ids(&mgroup, &mut anno_key_symbols)?;
tmp_results.insert(idx, m)?;
}
let token_helper = TokenHelper::new(db).ok();
let component_order = Component::new(
AnnotationComponentType::Ordering,
ANNIS_NS.into(),
"".into(),
);
// In quirks mode, corpora whose relannis-version is not 3.3 use the
// locale-dependent collation; everything else uses the default one.
let collation = if quirks_mode && !relannis_version_33 {
CollationType::Locale
} else {
CollationType::Default
};
let mut cache = SortCache::default();
let gs_order = db.get_graphstorage_as_ref(&component_order);
// Compares two symbol-encoded match groups by text position;
// `Inverted` simply reverses the comparison result.
let order_func = |m1: &Vec<(NodeID, usize)>,
m2: &Vec<(NodeID, usize)>|
-> Result<std::cmp::Ordering> {
let m1 = match_group_resolve_symbol_ids(m1, &anno_key_symbols)?;
let m2 = match_group_resolve_symbol_ids(m2, &anno_key_symbols)?;
if order == ResultOrder::Inverted {
let result = db::sort_matches::compare_matchgroup_by_text_pos(
&m1,
&m2,
db.get_node_annos(),
token_helper.as_ref(),
gs_order,
collation,
quirks_mode,
&mut cache,
)?
.reverse();
Ok(result)
} else {
let result = db::sort_matches::compare_matchgroup_by_text_pos(
&m1,
&m2,
db.get_node_annos(),
token_helper.as_ref(),
gs_order,
collation,
quirks_mode,
&mut cache,
)?;
Ok(result)
}
};
// Only the entries that can actually be delivered (offset + limit)
// need to end up in their final sorted position.
let sort_size = if let Some(limit) = limit {
offset + limit
} else {
tmp_results.len()
};
quicksort::sort_first_n_items(&mut tmp_results, sort_size, order_func)?;
}
expected_size = Some(tmp_results.len());
// Resolve the interned annotation keys back while streaming the results.
let iterator = tmp_results.into_iter()?.map(move |unresolved_match_group| {
match unresolved_match_group {
Ok((_idx, unresolved_match_group)) => {
let result = match_group_resolve_symbol_ids(
&unresolved_match_group,
&anno_key_symbols,
)?;
Ok(result)
}
Err(e) => Err(e.into()),
}
});
Box::from(iterator)
};
Ok((base_it, expected_size))
}
// Executes a find query on a single corpus and renders each match group as a
// textual match description. Returns the rendered matches together with the
// number of results actually skipped to honor the requested offset.
fn find_in_single_corpus<S: AsRef<str>>(
&self,
query: &SearchQuery<S>,
corpus_name: &str,
offset: usize,
limit: Option<usize>,
order: ResultOrder,
timeout: TimeoutCheck,
) -> Result<(Vec<String>, usize)> {
// Sorting by text position needs the ordering component and the token
// helper components, so request them when a sorted order is asked for.
let prep = self.prepare_query(corpus_name, query.query, query.query_language, |db| {
let mut additional_components = vec![Component::new(
AnnotationComponentType::Ordering,
ANNIS_NS.into(),
"".into(),
)];
if order == ResultOrder::Normal || order == ResultOrder::Inverted {
for c in token_helper::necessary_components(db) {
additional_components.push(c);
}
}
additional_components
})?;
let lock = prep.db_entry.read()?;
let db = get_read_or_error(&lock)?;
let quirks_mode = match query.query_language {
QueryLanguage::AQL => false,
QueryLanguage::AQLQuirksV3 => true,
};
let (mut base_it, expected_size) = self.create_find_iterator_for_query(
db,
&prep.query,
offset,
limit,
order,
quirks_mode,
timeout,
)?;
// Pre-allocate based on the known result count or, failing that, the limit.
let mut results: Vec<String> = if let Some(expected_size) = expected_size {
new_vector_with_memory_aligned_capacity(expected_size)
} else if let Some(limit) = limit {
new_vector_with_memory_aligned_capacity(limit)
} else {
Vec::new()
};
// Skip up to `offset` matches; `skipped` records how many actually existed.
let mut skipped = 0;
while skipped < offset && base_it.next().is_some() {
skipped += 1;
if skipped % 1_000 == 0 {
timeout.check()?;
}
}
let base_it: Box<dyn Iterator<Item = Result<MatchGroup>>> = if let Some(limit) = limit {
Box::new(base_it.take(limit))
} else {
Box::new(base_it)
};
for (match_nr, m) in base_it.enumerate() {
let m = m?;
let mut match_desc = String::new();
for (i, singlematch) in m.iter().enumerate() {
// Nodes that the query marks as excluded from the output are skipped.
let include_in_output = prep
.query
.get_variable_by_pos(i)
.map_or(true, |var| prep.query.is_included_in_output(&var));
if include_in_output {
if i > 0 {
match_desc.push(' ');
}
// Prefix the node name with "ns::name::" of the matched annotation,
// unless it is the generic annis::node_type match.
let singlematch_anno_key = &singlematch.anno_key;
if singlematch_anno_key.ns != ANNIS_NS || singlematch_anno_key.name != NODE_TYPE
{
if !singlematch_anno_key.ns.is_empty() {
let encoded_anno_ns: Cow<str> =
utf8_percent_encode(&singlematch_anno_key.ns, NODE_NAME_ENCODE_SET)
.into();
match_desc.push_str(&encoded_anno_ns);
match_desc.push_str("::");
}
let encoded_anno_name: Cow<str> =
utf8_percent_encode(&singlematch_anno_key.name, NODE_NAME_ENCODE_SET)
.into();
match_desc.push_str(&encoded_anno_name);
match_desc.push_str("::");
}
if let Some(node_name) = db
.get_node_annos()
.get_value_for_item(&singlematch.node, &NODE_NAME_KEY)?
{
if quirks_mode {
// Quirks mode re-encodes every path fragment with the legacy
// character set so output matches older ANNIS versions.
let re_encoded_name = node_name
.split('/')
.map(|n| {
let decoded_name =
percent_encoding::percent_decode_str(n).decode_utf8_lossy();
let re_encoded_name: Cow<str> = utf8_percent_encode(
&decoded_name,
QUIRKS_SALT_URI_ENCODE_SET,
)
.into();
re_encoded_name.to_string()
})
.join("/");
match_desc.push_str(&re_encoded_name);
} else {
match_desc.push_str(&node_name);
}
}
}
}
results.push(match_desc);
if match_nr % 1_000 == 0 {
timeout.check()?;
}
}
Ok((results, skipped))
}
/// Finds matches of the query, honoring `offset`, `limit` and the requested
/// result order across all corpora of the query.
pub fn find<S: AsRef<str>>(
    &self,
    query: SearchQuery<S>,
    offset: usize,
    limit: Option<usize>,
    order: ResultOrder,
) -> Result<Vec<String>> {
    let timeout = TimeoutCheck::new(query.timeout);
    let mut corpus_names: Vec<SmartString> = query
        .corpus_names
        .iter()
        .map(|c| c.as_ref().into())
        .collect();
    if corpus_names.is_empty() {
        return Ok(Vec::new());
    }
    if corpus_names.len() == 1 {
        // Single corpus: no merging necessary.
        let (matches, _skipped) = self.find_in_single_corpus(
            &query,
            corpus_names[0].as_str(),
            offset,
            limit,
            order,
            timeout,
        )?;
        return Ok(matches);
    }
    // Several corpora: determine the order in which they are queried.
    if order == ResultOrder::Randomized {
        let mut rng = rand::thread_rng();
        corpus_names.shuffle(&mut rng);
    } else if order == ResultOrder::Inverted {
        corpus_names.sort();
        corpus_names.reverse();
    } else {
        corpus_names.sort();
    }
    let mut remaining_offset = offset;
    let mut remaining_limit = limit;
    let mut merged = Vec::new();
    for corpus in corpus_names {
        let (corpus_matches, skipped) = self.find_in_single_corpus(
            &query,
            corpus.as_ref(),
            remaining_offset,
            remaining_limit,
            order,
            timeout,
        )?;
        let found = corpus_matches.len();
        merged.extend(corpus_matches);
        // Shrink the limit by what this corpus contributed and stop as soon
        // as it is exhausted.
        if let Some(current_limit) = remaining_limit {
            if current_limit <= found {
                break;
            }
            remaining_limit = Some(current_limit - found);
        }
        // Whatever part of the offset this corpus did not consume carries
        // over to the next one.
        remaining_offset = remaining_offset.saturating_sub(skipped);
        timeout.check()?;
    }
    Ok(merged)
}
/// Extracts the subgraph spanning the given nodes plus their left/right
/// context (optionally measured in a segmentation layer).
pub fn subgraph(
    &self,
    corpus_name: &str,
    node_ids: Vec<String>,
    ctx_left: usize,
    ctx_right: usize,
    segmentation: Option<String>,
) -> Result<AnnotationGraph> {
    // The whole graph must be loaded to resolve arbitrary context.
    let db_entry = self.get_fully_loaded_entry(corpus_name)?;
    let entry_guard = db_entry.read()?;
    let graph = get_read_or_error(&entry_guard)?;
    let matches = new_subgraph_iterator(graph, node_ids, ctx_left, ctx_right, segmentation)?;
    // Node index 0 of each match is the node the context is built around.
    let result = subgraph::create_subgraph_for_iterator(matches, &[0], graph, None)?;
    Ok(result)
}
/// Extracts the subgraph induced by all matches of the given query,
/// optionally restricted to one component type.
pub fn subgraph_for_query(
    &self,
    corpus_name: &str,
    query: &str,
    query_language: QueryLanguage,
    component_type_filter: Option<AnnotationComponentType>,
) -> Result<AnnotationGraph> {
    let prep = self.prepare_query(corpus_name, query, query_language, |g| {
        g.get_all_components(component_type_filter.clone(), None)
    })?;
    // Include one match index per node of the largest alternative.
    let max_alt_size = prep
        .query
        .alternatives
        .iter()
        .map(|alt| alt.num_of_nodes())
        .max()
        .unwrap_or(0);
    let match_idx: Vec<usize> = (0..max_alt_size).collect();
    extract_subgraph_by_query(
        &prep.db_entry,
        &prep.query,
        &match_idx,
        &self.query_config,
        component_type_filter,
        TimeoutCheck::new(None),
    )
}
pub fn subcorpus_graph(
&self,
corpus_name: &str,
corpus_ids: Vec<String>,
) -> Result<AnnotationGraph> {
let db_entry = self.get_fully_loaded_entry(corpus_name)?;
let mut query = Disjunction {
alternatives: vec![],
};
for source_corpus_id in corpus_ids {
let source_corpus_id: &str = source_corpus_id
.strip_prefix("salt:/")
.unwrap_or(&source_corpus_id);
{
let mut q = Conjunction::new();
let corpus_idx = q.add_node(
NodeSearchSpec::ExactValue {
ns: Some(ANNIS_NS.to_string()),
name: NODE_NAME.to_string(),
val: Some(source_corpus_id.to_string()),
is_meta: false,
},
None,
);
let any_node_idx = q.add_node(NodeSearchSpec::AnyNode, None);
q.add_operator(
Arc::new(operators::PartOfSubCorpusSpec {
dist: RangeSpec::Unbound,
}),
&any_node_idx,
&corpus_idx,
true,
)?;
query.alternatives.push(q);
}
{
let mut q = Conjunction::new();
let corpus_idx = q.add_node(
NodeSearchSpec::ExactValue {
ns: Some(ANNIS_NS.to_string()),
name: NODE_NAME.to_string(),
val: Some(source_corpus_id.to_string()),
is_meta: false,
},
None,
);
let any_node_idx = q.add_node(
NodeSearchSpec::ExactValue {
ns: Some(ANNIS_NS.to_string()),
name: NODE_TYPE.to_string(),
val: Some("datasource".to_string()),
is_meta: false,
},
None,
);
q.add_operator(
Arc::new(operators::PartOfSubCorpusSpec {
dist: RangeSpec::Unbound,
}),
&any_node_idx,
&corpus_idx,
true,
)?;
query.alternatives.push(q);
}
}
extract_subgraph_by_query(
&db_entry,
&query,
&[1],
&self.query_config,
None,
TimeoutCheck::new(None),
)
}
/// Extracts the corpus structure graph (all nodes of type "corpus" and their
/// PartOf relations) for the given corpus.
pub fn corpus_graph(&self, corpus_name: &str) -> Result<AnnotationGraph> {
    // First load a minimal version of the corpus to discover which PartOf
    // components exist ...
    let db_entry = self.get_loaded_entry(corpus_name, false)?;
    let subcorpus_components = {
        let entry_guard = db_entry.read()?;
        let graph = get_read_or_error(&entry_guard)?;
        graph.get_all_components(Some(AnnotationComponentType::PartOf), None)
    };
    // ... then make sure all of them are actually loaded.
    let db_entry = self.get_loaded_entry_with_components(corpus_name, subcorpus_components)?;
    let mut query = Conjunction::new();
    query.add_node(
        NodeSearchSpec::ExactValue {
            ns: Some(ANNIS_NS.into()),
            name: NODE_TYPE.into(),
            val: Some("corpus".into()),
            is_meta: false,
        },
        None,
    );
    extract_subgraph_by_query(
        &db_entry,
        &query.into_disjunction(),
        &[0],
        &self.query_config,
        Some(AnnotationComponentType::PartOf),
        TimeoutCheck::new(None),
    )
}
/// Executes a frequency query: counts how often each tuple of annotation
/// values (as described by `definition`) occurs in the query matches.
/// Rows are returned most frequent first.
pub fn frequency<S: AsRef<str>>(
    &self,
    query: SearchQuery<S>,
    definition: Vec<FrequencyDefEntry>,
) -> Result<FrequencyTable<String>> {
    let timeout = TimeoutCheck::new(query.timeout);
    // Maps each observed value tuple to its number of occurrences.
    let mut tuple_frequency: FxHashMap<Vec<String>, usize> = FxHashMap::default();
    for corpus in query.corpus_names {
        let prep =
            self.prepare_query(corpus.as_ref(), query.query, query.query_language, |_| vec![])?;
        let entry = prep.db_entry.read()?;
        let graph: &AnnotationGraph = get_read_or_error(&entry)?;
        // Resolve each definition entry to a match position and the set of
        // annotation keys to extract at that position.
        let mut annokeys: Vec<(usize, Vec<AnnoKey>)> = Vec::default();
        for def in definition.iter() {
            if let Some(node_ref) = prep.query.get_variable_pos(&def.node_ref) {
                match &def.ns {
                    Some(ns) => annokeys.push((
                        node_ref,
                        vec![AnnoKey {
                            ns: ns.clone().into(),
                            name: def.name.clone().into(),
                        }],
                    )),
                    // No namespace given: all qualified names with this name.
                    None => {
                        annokeys.push((node_ref, graph.get_node_annos().get_qnames(&def.name)?))
                    }
                }
            }
        }
        let plan =
            ExecutionPlan::from_disjunction(&prep.query, graph, &self.query_config, timeout)?;
        for group in plan {
            let group = group?;
            let mut tuple: Vec<String> = Vec::with_capacity(annokeys.len());
            for (node_ref, anno_keys) in &annokeys {
                let mut tuple_val: String = String::default();
                if let Some(m) = group.get(*node_ref) {
                    for k in anno_keys.iter() {
                        if let Some(val) = graph.get_node_annos().get_value_for_item(&m.node, k)?
                        {
                            tuple_val = val.to_string();
                        }
                    }
                }
                tuple.push(tuple_val);
            }
            let tuple_count = tuple_frequency.entry(tuple).or_insert(0);
            *tuple_count += 1;
            if *tuple_count % 1_000 == 0 {
                timeout.check()?;
            }
        }
    }
    // Convert the aggregated counts into rows, most frequent first.
    let mut result: FrequencyTable<String> = FrequencyTable::default();
    for (values, count) in tuple_frequency {
        result.push(FrequencyTableRow { values, count });
    }
    result.sort_by(|a, b| b.count.cmp(&a.count));
    Ok(result)
}
/// Parses the query (without executing it) and describes every node of every
/// alternative, annotating each description with its alternative index.
pub fn node_descriptions(
    &self,
    query: &str,
    query_language: QueryLanguage,
) -> Result<Vec<QueryAttributeDescription>> {
    let quirks_mode = match query_language {
        QueryLanguage::AQL => false,
        QueryLanguage::AQLQuirksV3 => true,
    };
    let parsed: Disjunction = aql::parse(query, quirks_mode)?;
    let mut result = Vec::new();
    for (alternative_nr, alternative) in parsed.alternatives.iter().enumerate() {
        for mut desc in alternative.get_node_descriptions() {
            // Remember which alternative the node belongs to.
            desc.alternative = alternative_nr;
            result.push(desc);
        }
    }
    Ok(result)
}
/// Lists the components of the corpus, optionally filtered by type and name.
/// A corpus that cannot be loaded simply reports no components.
pub fn list_components(
    &self,
    corpus_name: &str,
    ctype: Option<AnnotationComponentType>,
    name: Option<&str>,
) -> Result<Vec<Component<AnnotationComponentType>>> {
    if let Ok(db_entry) = self.get_loaded_entry(corpus_name, false) {
        let entry_guard = db_entry.read()?;
        if let Ok(graph) = get_read_or_error(&entry_guard) {
            return Ok(graph.get_all_components(ctype, name));
        }
    }
    Ok(vec![])
}
/// Lists the node annotations of the corpus. With `list_values` every value
/// (or, with `only_most_frequent_values`, only the first value returned by
/// the storage) is included; otherwise each key is reported once with an
/// empty value. A corpus that cannot be loaded yields an empty list.
pub fn list_node_annotations(
    &self,
    corpus_name: &str,
    list_values: bool,
    only_most_frequent_values: bool,
) -> Result<Vec<Annotation>> {
    let mut result: Vec<Annotation> = Vec::new();
    if let Ok(db_entry) = self.get_loaded_entry(corpus_name, false) {
        let entry_guard = db_entry.read()?;
        if let Ok(graph) = get_read_or_error(&entry_guard) {
            let node_annos: &dyn AnnotationStorage<NodeID> = graph.get_node_annos();
            for key in node_annos.annotation_keys()? {
                if !list_values {
                    // Keys only: one entry per key, empty value.
                    result.push(Annotation {
                        key: key.clone(),
                        val: SmartString::default(),
                    });
                } else if only_most_frequent_values {
                    if let Some(val) = node_annos.get_all_values(&key, true)?.into_iter().next()
                    {
                        result.push(Annotation {
                            key: key.clone(),
                            val: val.into(),
                        });
                    }
                } else {
                    for val in node_annos.get_all_values(&key, false)? {
                        result.push(Annotation {
                            key: key.clone(),
                            val: val.into(),
                        });
                    }
                }
            }
        }
    }
    Ok(result)
}
/// Lists the edge annotations of one component of the corpus. Value listing
/// behaves like in `list_node_annotations`. A corpus or component that cannot
/// be loaded yields an empty list.
pub fn list_edge_annotations(
    &self,
    corpus_name: &str,
    component: &Component<AnnotationComponentType>,
    list_values: bool,
    only_most_frequent_values: bool,
) -> Result<Vec<Annotation>> {
    let mut result: Vec<Annotation> = Vec::new();
    // Make sure the requested component is loaded before inspecting it.
    if let Ok(db_entry) =
        self.get_loaded_entry_with_components(corpus_name, vec![component.clone()])
    {
        let entry_guard = db_entry.read()?;
        if let Ok(graph) = get_read_or_error(&entry_guard) {
            if let Some(gs) = graph.get_graphstorage(component) {
                let edge_annos = gs.get_anno_storage();
                for key in edge_annos.annotation_keys()? {
                    if !list_values {
                        // Keys only: one entry per key, empty value.
                        result.push(Annotation {
                            key: key.clone(),
                            val: SmartString::new(),
                        });
                    } else if only_most_frequent_values {
                        if let Some(val) =
                            edge_annos.get_all_values(&key, true)?.into_iter().next()
                        {
                            result.push(Annotation {
                                key: key.clone(),
                                val: val.into(),
                            });
                        }
                    } else {
                        for val in edge_annos.get_all_values(&key, false)? {
                            result.push(Annotation {
                                key: key.clone(),
                                val: val.into(),
                            });
                        }
                    }
                }
            }
        }
    }
    Ok(result)
}
/// Evicts corpora from the in-memory cache until it fits the configured
/// cache strategy, never evicting the corpora listed in `keep`.
fn check_cache_size_and_remove(
    &self,
    keep: Vec<&str>,
    report_cache_status: bool,
) -> Result<()> {
    // Hold the write lock for the whole eviction pass.
    let mut cache_lock = self.corpus_cache.write()?;
    // The guard deref-coerces to the underlying map.
    check_cache_size_and_remove_with_cache(
        &mut cache_lock,
        &self.cache_strategy,
        keep,
        report_cache_status,
    )?;
    Ok(())
}
/// Returns the on-disk directory for the given corpus.
///
/// Corpus names can contain characters that are not valid in file names, so
/// the name is percent-encoded before being joined with the database
/// directory. Joining via `Path::join` (instead of converting `db_dir` with
/// `to_string_lossy`) keeps database directories with non-UTF-8 paths intact.
fn corpus_directory_on_disk(&self, corpus_name: &str) -> PathBuf {
    let escaped_corpus_name: Cow<str> =
        utf8_percent_encode(corpus_name, PATH_ENCODE_SET).into();
    self.db_dir.join(&*escaped_corpus_name)
}
}
impl Drop for CorpusStorage {
fn drop(&mut self) {
let &(ref lock, ref cvar) = &*self.active_background_workers;
let mut nr_active_background_workers = lock.lock().unwrap();
while *nr_active_background_workers > 0 {
trace!(
"Waiting for background thread to finish ({} worker(s) left)...",
*nr_active_background_workers
);
nr_active_background_workers = cvar.wait(nr_active_background_workers).unwrap();
}
if let Err(e) = self.lock_file.unlock() {
warn!("Could not unlock CorpusStorage lock file: {:?}", e);
} else {
trace!("Unlocked CorpusStorage lock file");
let lock_file_path = self.db_dir.join(DB_LOCK_FILE_NAME);
if lock_file_path.exists() && lock_file_path.is_file() {
if let Err(e) = std::fs::remove_file(lock_file_path) {
warn!("Could not remove CorpusStorage lock file: {:?}", e);
}
}
}
}
}
/// Returns the loaded graph behind a read-locked cache entry, or an error if
/// the entry is not in the loaded state.
fn get_read_or_error<'a>(lock: &'a RwLockReadGuard<CacheEntry>) -> Result<&'a AnnotationGraph> {
    match &**lock {
        CacheEntry::Loaded(db) => Ok(db),
        _ => Err(GraphAnnisError::LoadingGraphFailed {
            name: "".to_string(),
        }),
    }
}
/// Returns the loaded graph behind a write-locked cache entry mutably, or an
/// error if the entry is not in the loaded state.
fn get_write_or_error<'a>(
    lock: &'a mut RwLockWriteGuard<CacheEntry>,
) -> Result<&'a mut AnnotationGraph> {
    match &mut **lock {
        CacheEntry::Loaded(db) => Ok(db),
        _ => Err(CorpusStorageError::CorpusCacheEntryNotLoaded.into()),
    }
}
/// Estimates the heap size of every fully loaded corpus in the cache.
fn get_cache_sizes(
    cache: &LinkedHashMap<String, Arc<RwLock<CacheEntry>>>,
) -> Result<LinkedHashMap<String, usize>> {
    // Use the platform allocator to estimate actual heap usage.
    let mut mem_ops =
        MallocSizeOfOps::new(memory_estimation::platform::usable_size, None, None);
    let mut db_sizes: LinkedHashMap<String, usize> = LinkedHashMap::new();
    for (corpus, db_entry) in cache.iter() {
        let entry_guard = db_entry.read()?;
        // Entries that are not fully loaded have no measurable graph and are
        // skipped.
        if let CacheEntry::Loaded(graph) = &*entry_guard {
            db_sizes.insert(corpus.clone(), graph.size_of_cached(&mut mem_ops)?);
        }
    }
    Ok(db_sizes)
}
/// Computes the cache budget in bytes for the given strategy.
///
/// For `PercentOfFreeMemory` the budget is a percentage of the memory that
/// would be available if the cache were emptied (free system memory plus the
/// memory currently used by the cache). Returns 0 if the system memory cannot
/// be queried, which forces eviction of everything not explicitly kept.
fn get_max_cache_size(cache_strategy: &CacheStrategy, used_cache_size: usize) -> usize {
    match cache_strategy {
        // Configured in megabytes, converted to bytes.
        CacheStrategy::FixedMaxMemory(max_size) => *max_size * 1_000_000,
        CacheStrategy::PercentOfFreeMemory(max_percent) => {
            if let Ok(mem) = sys_info::mem_info() {
                // mem_info() reports KiB; convert to bytes.
                // (The two statements were previously mangled onto one line.)
                let free_system_mem: usize = mem.avail as usize * 1024;
                let available_memory: usize = free_system_mem + used_cache_size;
                ((available_memory as f64) * (max_percent / 100.0)) as usize
            } else {
                0
            }
        }
    }
}
/// Evicts corpora from the given cache map until the summed size fits the
/// budget derived from the cache strategy. Corpora listed in `keep` are never
/// evicted. Optionally logs the final cache status at info level.
fn check_cache_size_and_remove_with_cache(
    cache: &mut LinkedHashMap<String, Arc<RwLock<CacheEntry>>>,
    cache_strategy: &CacheStrategy,
    keep: Vec<&str>,
    report_cache_status: bool,
) -> Result<()> {
    let keep: HashSet<&str> = keep.into_iter().collect();
    // Snapshot the per-corpus sizes once; eviction decisions are based on it.
    let db_sizes = get_cache_sizes(cache)?;
    let mut size_sum: usize = db_sizes.iter().map(|(_, s)| s).sum();
    let max_cache_size: usize = get_max_cache_size(cache_strategy, size_sum);
    debug!(
        "Current cache size is {:.2} MB / max {:.2} MB",
        (size_sum as f64) / 1_000_000.0,
        (max_cache_size as f64) / 1_000_000.0
    );
    // Evict in map iteration order until the budget is met.
    for (corpus_name, corpus_size) in db_sizes.iter() {
        if size_sum <= max_cache_size {
            break;
        }
        if keep.contains(corpus_name.as_str()) {
            continue;
        }
        cache.remove(corpus_name);
        size_sum -= corpus_size;
        debug!(
            "Removing corpus {} from cache. {}",
            corpus_name,
            get_corpus_cache_info_as_string(cache, max_cache_size)?,
        );
    }
    if report_cache_status {
        info!(
            "{}",
            get_corpus_cache_info_as_string(cache, max_cache_size)?
        );
    }
    Ok(())
}
/// Renders a human-readable summary of the cache contents and their sizes.
///
/// This function only reads the cache, so it borrows it immutably (the
/// previous `&mut` was unnecessary; `&mut` arguments at existing call sites
/// reborrow to `&` automatically).
fn get_corpus_cache_info_as_string(
    cache: &LinkedHashMap<String, Arc<RwLock<CacheEntry>>>,
    max_cache_size: usize,
) -> Result<String> {
    let cache_sizes = get_cache_sizes(cache)?;
    if cache_sizes.is_empty() {
        return Ok("Corpus cache is currently empty".to_string());
    }
    // One "<name> (<size> MB)" fragment per loaded corpus.
    let corpus_memory_as_string: Vec<String> = cache_sizes
        .iter()
        .map(|(corpus_name, corpus_size)| {
            format!(
                "{} ({:.2} MB)",
                corpus_name,
                (*corpus_size as f64) / 1_000_000.0
            )
        })
        .collect();
    let size_sum: usize = cache_sizes.iter().map(|(_, s)| s).sum();
    Ok(format!(
        "Total cache size is {:.2} MB / {:.2} MB and loaded corpora are: {}.",
        (size_sum as f64) / 1_000_000.0,
        (max_cache_size as f64) / 1_000_000.0,
        corpus_memory_as_string.join(", ")
    ))
}
/// Executes the query on the cached corpus and builds the subgraph induced by
/// the match nodes at the given `match_idx` positions, optionally restricted
/// to one component type.
fn extract_subgraph_by_query(
    db_entry: &Arc<RwLock<CacheEntry>>,
    query: &Disjunction,
    match_idx: &[usize],
    query_config: &aql::Config,
    component_type_filter: Option<AnnotationComponentType>,
    timeout: TimeoutCheck,
) -> Result<AnnotationGraph> {
    // Instant is monotonic, unlike SystemTime, so it cannot go backwards
    // while we measure the extraction time.
    let t_before = std::time::Instant::now();
    // Propagate a poisoned lock as an error instead of panicking
    // (consistent with every other `db_entry.read()?` in this file).
    let lock = db_entry.read()?;
    let orig_db = get_read_or_error(&lock)?;
    let plan = ExecutionPlan::from_disjunction(query, orig_db, query_config, timeout)?;
    debug!("executing subgraph query\n{}", plan);
    let result =
        subgraph::create_subgraph_for_iterator(plan, match_idx, orig_db, component_type_filter)?;
    debug!("Extracted subgraph in {} ms", t_before.elapsed().as_millis());
    Ok(result)
}
fn create_lockfile_for_directory(db_dir: &Path) -> Result<File> {
std::fs::create_dir_all(&db_dir).map_err(|e| CorpusStorageError::LockCorpusDirectory {
path: db_dir.to_string_lossy().to_string(),
source: e,
})?;
let lock_file_path = db_dir.join(DB_LOCK_FILE_NAME);
let lock_file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.open(lock_file_path.as_path())
.map_err(|e| CorpusStorageError::LockCorpusDirectory {
path: db_dir.to_string_lossy().to_string(),
source: e,
})?;
lock_file
.try_lock_exclusive()
.map_err(|e| CorpusStorageError::LockCorpusDirectory {
path: db_dir.to_string_lossy().to_string(),
source: e,
})?;
Ok(lock_file)
}