use std::{
collections::{BTreeMap, HashSet, VecDeque},
path::PathBuf,
};
use annatto::workflow::{LoadGraph, SaveGraph};
use anyhow::{Context, Result, anyhow};
use cache::CorpusCache;
use egui_notify::Toast;
use graphannis::{
AnnotationGraph,
graph::{AnnoKey, Annotation},
model::{AnnotationComponent, AnnotationComponentType},
update::{GraphUpdate, UpdateEvent},
};
use graphannis_core::{
annostorage::ValueSearch,
errors::GraphAnnisCoreError,
graph::{ANNIS_NS, NODE_NAME, NODE_NAME_KEY, NODE_TYPE},
};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use crate::{
AnnatomicApp,
app::{MainView, views::Editor},
};
use super::job_executor::JobExecutor;
use super::{APP_ID, Notifier};
use egui::mutex::RwLock;
use std::sync::Arc;
mod cache;
#[cfg(test)]
mod tests;
const MAX_UNDO_HISTORY_SIZE: usize = 25;
/// A corpus known to the project: the name it is listed under and the directory
/// its graph is stored in.
#[derive(Serialize, Deserialize, Clone, PartialEq)]
pub(crate) struct Corpus {
    /// Name of the corpus; used as key in `Project::corpus_locations`.
    pub(crate) name: String,
    /// Directory the corpus graph is persisted to and looked up by in the corpus cache.
    pub(crate) location: PathBuf,
}
impl Corpus {
pub(crate) fn new<S, P>(name: S, location: P) -> Self
where
S: Into<String>,
P: Into<PathBuf>,
{
Self {
name: name.into(),
location: location.into(),
}
}
}
/// A one-shot callback that mutates the application state; used for the
/// `before`/`after` hooks of [`EditorStateUpdates`].
pub(crate) type EditorStateUpdateFn = Box<dyn FnOnce(&mut AnnatomicApp) + Send + Sync>;
/// One undoable step of graph changes.
///
/// `Project::undo` applies `reverse_updates`, and `Project::redo` applies `updates`
/// again.
#[derive(Default)]
pub(crate) struct ChangeSet {
    /// Events that perform the change; applied again by redo.
    pub(crate) updates: Vec<UpdateEvent>,
    /// Optional editor hooks (`before`/`after`). They are not invoked by
    /// `Project::undo`/`Project::redo`.
    pub(crate) editor_state_updates: EditorStateUpdates,
    /// Events applied by undo to revert `updates`.
    pub(crate) reverse_updates: Vec<UpdateEvent>,
}
/// Project state: the known corpora, the current selection and the undo/redo
/// history of the selected corpus.
///
/// Fields marked `#[serde(skip)]` are runtime-only; on deserialization serde fills
/// them with their `Default` value. `notifier` and `jobs` are supplied afterwards
/// via `Project::load_after_init`.
#[derive(Serialize, Deserialize)]
pub(crate) struct Project {
    /// The currently selected corpus, if any.
    pub(crate) selected_corpus: Option<Corpus>,
    /// Name of a corpus scheduled for deletion; reset to `None` by `Project::delete_corpus`.
    pub(crate) scheduled_for_deletion: Option<String>,
    /// Maps corpus names to their storage directories. Directories in the corpus
    /// storage directory that are not listed here are removed by
    /// `Project::cleanup_unused_corpus_files_in_background`.
    pub(crate) corpus_locations: BTreeMap<String, PathBuf>,
    /// Cache of corpus graphs, looked up by corpus location.
    #[serde(skip)]
    pub(super) corpus_cache: CorpusCache,
    /// Used to show toast messages.
    #[serde(skip)]
    notifier: Notifier,
    /// Runs the foreground and background jobs scheduled by the project.
    #[serde(skip)]
    jobs: JobExecutor,
    /// Change sets that can be undone, newest first (at most `MAX_UNDO_HISTORY_SIZE`).
    #[serde(skip)]
    undo_changesets: VecDeque<ChangeSet>,
    /// Undone change sets that can be redone, newest first.
    #[serde(skip)]
    redo_changesets: VecDeque<ChangeSet>,
    /// Whether the selected corpus has in-memory changes that still need to be
    /// written to its location.
    /// NOTE(review): unlike the other runtime fields this flag is not
    /// `#[serde(skip)]` and is therefore saved with the project — confirm intended.
    needs_persisting: bool,
    /// Cached result of `Project::corpus_storage_dir`.
    #[serde(skip)]
    corpus_storage_dir: Option<PathBuf>,
}
/// Optional callbacks, named `before` and `after`, that update editor state.
///
/// They are created with `EditorStateUpdates::set_before`/`set_after`.
/// NOTE(review): when they run relative to the graph change is decided by the
/// caller that consumes them — confirm.
#[derive(Default)]
pub(crate) struct EditorStateUpdates {
    /// Callback stored as the "before" hook.
    pub(crate) before: Option<EditorStateUpdateFn>,
    /// Callback stored as the "after" hook.
    pub(crate) after: Option<EditorStateUpdateFn>,
}
impl From<(Vec<UpdateEvent>, Vec<UpdateEvent>)> for ChangeSet {
fn from(value: (Vec<UpdateEvent>, Vec<UpdateEvent>)) -> Self {
Self {
updates: value.0,
reverse_updates: value.1,
editor_state_updates: EditorStateUpdates::default(),
}
}
}
impl EditorStateUpdates {
pub(crate) fn set_before<F: 'static + FnOnce(&mut T) + Send + Sync, T: 'static + Editor>(
&mut self,
update_fn: F,
) {
self.before = Some(Box::new(|app| {
if let Some(editor) = app.current_typed_editor::<T>() {
update_fn(editor)
}
}));
}
pub(crate) fn set_after<F: 'static + FnOnce(&mut T) + Send + Sync, T: 'static + Editor>(
&mut self,
update_fn: F,
) {
self.after = Some(Box::new(|app| {
if let Some(editor) = app.current_typed_editor::<T>() {
update_fn(editor)
}
}));
}
}
impl Project {
pub(crate) fn new(notifier: Notifier, jobs: JobExecutor) -> Self {
Self {
selected_corpus: None,
corpus_cache: CorpusCache::default(),
scheduled_for_deletion: None,
corpus_locations: BTreeMap::new(),
notifier,
jobs,
undo_changesets: VecDeque::default(),
redo_changesets: VecDeque::default(),
needs_persisting: false,
corpus_storage_dir: None,
}
}
pub(crate) fn corpus_storage_dir(&mut self) -> Result<PathBuf> {
if let Some(cached) = &self.corpus_storage_dir {
Ok(cached.clone())
} else {
let result = eframe::storage_dir(APP_ID)
.context("Unable to get local file storage path")
.map(|p| p.join("corpora"))?;
self.corpus_storage_dir = Some(result.clone());
Ok(result)
}
}
pub(crate) fn delete_corpus(&mut self, corpus_name: String) {
self.scheduled_for_deletion = None;
if let Some(location) = self.corpus_locations.remove(&corpus_name) {
let title = format!(
"Deleting corpus \"{corpus_name}\" from {}",
location.to_string_lossy()
);
self.jobs.add_foreground_job(
&title,
move |_job| {
std::fs::remove_dir_all(location)?;
Ok(())
},
|_result, app| {
app.project.select_corpus(None);
app.reset_editor();
},
);
}
}
pub(super) fn select_corpus(&mut self, selection: Option<String>) {
if let Some(selected_corpus) = &self.selected_corpus
&& Some(&selected_corpus.name) == selection.as_ref()
{
return;
}
let job_title = if let Some(corpus) = &selection {
format!("Selecting corpus {corpus}")
} else {
"Unselecting corpus".to_string()
};
let currently_selected_corpus = self.selected_corpus.clone();
let corpus_cache = self.corpus_cache.clone();
let needs_persisting = self.needs_persisting;
self.jobs.add_foreground_job(
&job_title,
move |_job| {
if needs_persisting
&& let Some(currently_selected_corpus) = currently_selected_corpus
{
let graph = corpus_cache.get(¤tly_selected_corpus.location)?;
let mut graph = graph.write();
graph.persist_to(¤tly_selected_corpus.location)?;
}
Ok(())
},
move |_result, app| {
app.project.needs_persisting = false;
app.project.selected_corpus = None;
app.project.undo_changesets.clear();
app.project.redo_changesets.clear();
if let Some(name) = selection {
if let Some(location) = app.project.corpus_locations.get(&name) {
let new_selection = Corpus::new(name, location);
app.project.selected_corpus = Some(new_selection);
} else {
app.project
.notifier
.add_toast(Toast::error(format!("Missing location for corpus {name}")));
}
}
},
);
}
pub(crate) fn new_empty_corpus(&mut self, name: &str) -> Result<()> {
let id = Uuid::new_v4();
let location = self.corpus_storage_dir()?.join(id.to_string());
let mut graph = AnnotationGraph::with_default_graphstorages(false)?;
let root_node_id = 0;
let type_anno = Annotation {
key: AnnoKey {
ns: ANNIS_NS.into(),
name: NODE_TYPE.into(),
},
val: "corpus".into(),
};
let name_anno = Annotation {
key: AnnoKey {
ns: ANNIS_NS.into(),
name: NODE_NAME.into(),
},
val: name.into(),
};
graph.get_node_annos_mut().insert(root_node_id, type_anno)?;
graph.get_node_annos_mut().insert(root_node_id, name_anno)?;
graph.persist_to(&location)?;
self.needs_persisting = false;
self.corpus_locations.insert(name.to_string(), location);
Ok(())
}
pub(crate) fn persist_changes_on_exit(&mut self) -> Result<()> {
if let Some(selected_corpus) = self.selected_corpus.clone() {
let corpus_cache = self.corpus_cache.clone();
let graph = corpus_cache.get(&selected_corpus.location)?;
let mut graph = graph.write();
graph.persist_to(&selected_corpus.location)?;
self.needs_persisting = false;
}
Ok(())
}
pub(crate) fn import(
&mut self,
import_step: annatto::ImporterStep,
overwrite_corpus_name: Option<String>,
) -> Result<()> {
let id = Uuid::new_v4();
let location = self.corpus_storage_dir()?.join(id.to_string());
let parent_dir = location
.parent()
.context("Corpus storage location has no parent directory")?;
let corpus_directory_name = location
.file_name()
.context("Corpus storage location is empty")?
.to_str()
.context("Invalid characters in corpus storage path")?;
let save_step = SaveGraph::new(parent_dir, None, Some(corpus_directory_name));
let workflow = annatto::workflow::Workflow::default()
.with_importer_steps(vec![import_step])
.with_save(save_step);
self.jobs.add_foreground_job(
"Importing corpus",
move |job| {
job.update_message("Create empty corpus directory");
let mut graph = AnnotationGraph::with_default_graphstorages(false)?;
graph.persist_to(&location)?;
job.update_message("Running conversion");
let (tx, rx) = std::sync::mpsc::channel();
let location_for_job = location.clone();
let job_result =
std::thread::spawn(move || workflow.execute(Some(tx), &location_for_job, true));
for status_message in rx {
job.update_from_annatto_status(status_message);
}
let job_result = job_result
.join()
.map_err(|_err| anyhow!("Could not join background thread."))?;
job_result?;
let mut new_corpus_name = if let Some(n) = &overwrite_corpus_name {
n.clone()
} else {
"imported-corpus".to_string()
};
if overwrite_corpus_name.is_none() {
job.update_message("Retrieving corpus name");
graph.import(&location)?;
let part_of_component = AnnotationComponent::new(
AnnotationComponentType::PartOf,
ANNIS_NS.into(),
"".into(),
);
graph.ensure_loaded(&part_of_component)?;
let gs = graph
.get_graphstorage(&part_of_component)
.context("Missing PartOf component")?;
for m in graph.get_node_annos().exact_anno_search(
Some(ANNIS_NS),
NODE_TYPE,
ValueSearch::Some("corpus"),
) {
let matched_node = m?.node;
if !gs.has_outgoing_edges(matched_node)?
&& let Some(node_name) = graph
.get_node_annos()
.get_value_for_item(&matched_node, &NODE_NAME_KEY)?
{
new_corpus_name = node_name.to_string();
}
}
}
Ok((location, new_corpus_name))
},
|(location, new_corpus_name), app| {
app.project
.corpus_locations
.insert(new_corpus_name, location);
app.main_view = MainView::Start;
},
);
Ok(())
}
pub(crate) fn export_selected(&mut self, export_step: annatto::ExporterStep) -> Result<()> {
let currently_selected_corpus =
self.selected_corpus.clone().context("No corpus selected")?;
let location = currently_selected_corpus.location.clone();
let parent_dir = location
.parent()
.context("Corpus directory has no parent directory")?;
let id = location
.file_name()
.context("Corpus path is empty")?
.to_str()
.context("Invalid characters in corpus path")?;
let load_step = LoadGraph::new(parent_dir, id.to_string(), None);
let workflow = annatto::workflow::Workflow::default()
.with_exporter_steps(vec![export_step])
.with_load(load_step);
let needs_persisting = self.needs_persisting;
let corpus_cache = self.corpus_cache.clone();
self.jobs.add_foreground_job(
"Export corpus",
move |job| {
if needs_persisting {
job.update_message("Persist pending changes");
let graph = corpus_cache.get(¤tly_selected_corpus.location)?;
let mut graph = graph.write();
graph.persist_to(¤tly_selected_corpus.location)?;
}
job.update_message("Running conversion");
let (tx, rx) = std::sync::mpsc::channel();
let location_for_job = location.clone();
let job_result =
std::thread::spawn(move || workflow.execute(Some(tx), &location_for_job, true));
for status_message in rx {
job.update_from_annatto_status(status_message);
}
let job_result = job_result
.join()
.map_err(|_err| anyhow!("Could not join background thread."))?;
job_result?;
Ok(())
},
|_, app| {
app.main_view = MainView::Start;
app.notifier.add_toast(Toast::info("Exported corpus"));
},
);
Ok(())
}
pub(crate) fn can_undo(&self) -> bool {
!self.undo_changesets.is_empty()
}
pub(crate) fn undo(&mut self) {
if let Some(selected_corpus) = &self.selected_corpus
&& let Some(changeset) = self.undo_changesets.pop_front()
{
let corpus_cache = self.corpus_cache.clone();
let lock = corpus_cache.get(&selected_corpus.location);
let mut updates = GraphUpdate::new();
let mut results = Vec::new();
for event in changeset.reverse_updates.iter() {
results.push(updates.add_event(event.clone()));
}
self.jobs.add_foreground_job(
"Undoing changes",
move |j| {
let results: Result<Vec<_>, GraphAnnisCoreError> =
results.into_iter().collect();
results?;
let lock = lock?;
{
let mut graph = lock.write();
j.update_message("Applying updates");
graph.apply_update_keep_statistics(&mut updates, |msg| {
j.update_message(format!("Applying updates: {msg}"));
})?;
}
Ok(())
},
move |_, app| {
app.project.needs_persisting = true;
app.reset_editor();
app.project.redo_changesets.push_front(changeset);
},
);
}
}
pub(crate) fn can_redo(&self) -> bool {
!self.redo_changesets.is_empty()
}
pub(crate) fn redo(&mut self) {
if let Some(selected_corpus) = &self.selected_corpus
&& let Some(changeset) = self.redo_changesets.pop_front()
{
let corpus_cache = self.corpus_cache.clone();
let lock = corpus_cache.get(&selected_corpus.location);
let mut updates = GraphUpdate::new();
let mut results = Vec::new();
for event in changeset.updates.iter() {
results.push(updates.add_event(event.clone()));
}
self.jobs.add_foreground_job(
"Undoing changes",
move |j| {
let results: Result<Vec<_>, GraphAnnisCoreError> =
results.into_iter().collect();
results?;
let lock = lock?;
{
let mut graph = lock.write();
j.update_message("Applying updates");
graph.apply_update_keep_statistics(&mut updates, |msg| {
j.update_message(format!("Applying updates: {msg}"));
})?;
}
Ok(())
},
|_, app| {
app.reset_editor();
app.project.add_checkpoint_internal(changeset, false);
},
);
}
}
pub(super) fn add_checkpoint(&mut self, changeset: ChangeSet) {
self.add_checkpoint_internal(changeset, true);
}
fn add_checkpoint_internal(&mut self, changeset: ChangeSet, is_manual: bool) {
self.needs_persisting = true;
self.undo_changesets.push_front(changeset);
self.undo_changesets.truncate(MAX_UNDO_HISTORY_SIZE);
if is_manual {
self.redo_changesets.clear();
}
}
pub(crate) fn load_after_init(&mut self, notifier: Notifier, jobs: JobExecutor) -> Result<()> {
self.notifier = notifier;
self.jobs = jobs;
self.corpus_storage_dir()?;
Ok(())
}
pub(crate) fn cleanup_unused_corpus_files_in_background(&mut self) -> anyhow::Result<()> {
let corpus_dir = self.corpus_storage_dir()?;
let used_paths: HashSet<_> = self.corpus_locations.values().cloned().collect();
self.jobs.add_background_job(
"Cleaning up unused corpus files",
move |job| {
if corpus_dir.is_dir() {
for entry in std::fs::read_dir(corpus_dir)? {
let entry = entry?;
let path = entry.path();
if path.is_dir() & !used_paths.contains(&path) {
job.update_message(format!("Removing {}", path.to_string_lossy()));
std::fs::remove_dir_all(path)?;
}
}
}
Ok(())
},
|_, _| {},
);
Ok(())
}
pub(crate) fn get_selected_graph(&self) -> Result<Option<Arc<RwLock<AnnotationGraph>>>> {
if let Some(corpus) = &self.selected_corpus {
let graph = self.corpus_cache.get(&corpus.location)?;
Ok(Some(graph))
} else {
Ok(None)
}
}
}