annatomic 0.4.0

The Annatomic annotation editor is intended to be used for the [RIDGES corpus](https://www.linguistik.hu-berlin.de/en/institut-en/professuren-en/korpuslinguistik/research/ridges-projekt). It is based on [graphANNIS](https://github.com/korpling/graphANNIS), and thus its internal data model is in principle suitable for a wide range of annotation concepts.
Documentation
use std::{collections::BTreeMap, sync::Arc, time::Instant};

use annatto::workflow::StatusMessage;
use egui::{RichText, mutex::RwLock};
use log::debug;
use uuid::Uuid;

use super::AnnatomicApp;

/// Handle to a single job managed by the job executor.
///
/// Cloning a `Job` is cheap: the status message lives behind an `Arc`, so
/// every clone observes and updates the same message. The executor keeps one
/// clone for displaying the progress while the worker closure receives
/// another one to report it.
#[derive(Clone, Default)]
pub(crate) struct Job {
    /// Title of the job as passed by the caller when the job was added.
    title: String,
    /// Most recent status message, or `None` if none has been set yet.
    msg: Arc<RwLock<Option<String>>>,
}

impl Job {
    /// Replaces the status message of this job with `message`.
    ///
    /// All clones of this job share the message, so the new text is visible
    /// to every holder of the job.
    pub(crate) fn update_message<S>(&self, message: S)
    where
        S: Into<String>,
    {
        // Convert before taking the lock so that the lock is only held for
        // the actual assignment.
        let message = message.into();
        let mut lock = self.msg.write();
        lock.replace(message);
    }

    /// Translates an Annatto workflow status into a human readable text and
    /// stores it as the status message of this job.
    pub(crate) fn update_from_annatto_status(&self, status: StatusMessage) {
        // Build the text first; the message lock is only acquired once the
        // final string exists.
        let message = match status {
            StatusMessage::StepsCreated(step_ids) => {
                format!("Created {} conversion steps", step_ids.len())
            }
            StatusMessage::Info(msg) => msg,
            StatusMessage::Warning(msg) => format!("[WARNING] {msg}"),
            StatusMessage::Progress {
                id,
                total_work,
                finished_work,
            } => match total_work {
                // A total of zero would make the division yield NaN or
                // infinity, so a percentage is only reported for a positive
                // total.
                Some(total_work) if total_work > 0 => {
                    let percentage = (finished_work as f32 / total_work as f32) * 100.0;
                    format!("Finished {percentage:.1}% of job {id}")
                }
                _ => format!("Finished {finished_work} task in step {id}"),
            },
            StatusMessage::StepDone { id } => format!("Finished step {id}"),
        };
        self.update_message(message);
    }
}

/// Boxed one-shot callback that applies the outcome of a successfully
/// finished job to the application state.
type FnStateUpdate = Box<dyn FnOnce(&mut AnnatomicApp) + Send + Sync>;

/// Distinguishes how a running job is presented by `JobExecutor::show`.
#[derive(Clone)]
enum JobType {
    /// Job rendered in the central panel; while at least one of these is
    /// running, `JobExecutor::show` returns `true`.
    Foreground(Job),
    /// Job listed in the bottom panel together with its status message.
    Background(Job),
}

/// Keeps track of jobs that are executed outside of the UI code.
///
/// All maps are behind an `Arc`, so clones of the executor share the same
/// job bookkeeping.
#[derive(Default, Clone)]
pub(crate) struct JobExecutor {
    /// Jobs that have been added and whose worker has not completed yet.
    running: Arc<RwLock<BTreeMap<Uuid, JobType>>>,
    /// Title and pending state update of jobs whose worker returned `Ok`.
    /// Entries are consumed by `show`.
    finished: Arc<RwLock<BTreeMap<Uuid, (String, FnStateUpdate)>>>,
    /// Title and error of jobs whose worker returned `Err`. Entries are
    /// consumed by `show`.
    failed: Arc<RwLock<BTreeMap<Uuid, (String, anyhow::Error)>>>,
}

impl JobExecutor {
    /// Add a job during no UI interaction should be possible. The job is run in a
    /// different background thread so we can inform the use about the progress and
    /// the app does not freeze. But the user should not be able to make any
    /// meaningful changes.
    pub(crate) fn add_foreground_job<F, U, R>(&self, title: &str, worker: F, state_updater: U)
    where
        F: FnOnce(Job) -> anyhow::Result<R> + Send + 'static,
        U: FnOnce(R, &mut AnnatomicApp) + Send + Sync + 'static,
        R: Send + Sync + 'static,
    {
        let id = Uuid::new_v4();
        debug!("Adding foreground job \"{title}\"");
        let running_jobs = self.running.clone();
        let failed_jobs = self.failed.clone();
        let finished_jobs = self.finished.clone();

        let single_job = Job {
            title: title.to_string(),
            ..Job::default()
        };
        {
            let mut lock = running_jobs.write();
            let single_job = JobType::Foreground(single_job.clone());
            lock.insert(id, single_job);
            debug!(
                "Number of currently running foreground jobs: {}",
                lock.len()
            );
        }
        rayon::spawn_fifo(move || {
            let title = single_job.title.clone();
            debug!("Spawning foreground job \"{title}\"");
            let start_time = Instant::now();
            let result = worker(single_job);
            let execution_time = Instant::now().saturating_duration_since(start_time);
            debug!("Finished foreground job \"{title}\" ({execution_time:?})");
            match result {
                Ok(result) => {
                    let mut finished_jobs = finished_jobs.write();
                    finished_jobs
                        .insert(id, (title, Box::new(move |app| state_updater(result, app))));
                }
                Err(err) => {
                    let mut failed_jobs = failed_jobs.write();
                    failed_jobs.insert(id, (title, err));
                }
            }
            let mut jobs = running_jobs.write();
            jobs.remove(&id);
        });
    }

    pub(crate) fn add_background_job<F, U, R>(&self, title: &str, worker: F, state_updater: U)
    where
        F: FnOnce(Job) -> anyhow::Result<R> + Send + 'static,
        U: FnOnce(R, &mut AnnatomicApp) + Send + Sync + 'static,
        R: Send + Sync + 'static,
    {
        let id = Uuid::new_v4();
        debug!("Adding background job \"{title}\"");
        let running_jobs = self.running.clone();
        let finished_jobs = self.finished.clone();
        let failed_jobs = self.failed.clone();

        let single_job = Job {
            title: title.to_string(),
            ..Job::default()
        };
        {
            let mut lock = running_jobs.write();
            let single_job = JobType::Background(single_job.clone());
            lock.insert(id, single_job);
            debug!(
                "Number of currently background running jobs: {}",
                lock.len()
            );
        }
        let title = title.to_string();
        rayon::spawn_fifo(move || {
            debug!("Spawning background job \"{title}\"");
            let start_time = Instant::now();
            let result = worker(single_job);
            let execution_time = Instant::now().saturating_duration_since(start_time);
            debug!("Finished background job \"{title}\" ({execution_time:?})");
            match result {
                Ok(result) => {
                    let mut finished_jobs = finished_jobs.write();
                    finished_jobs
                        .insert(id, (title, Box::new(move |app| state_updater(result, app))));
                }
                Err(err) => {
                    let mut failed_jobs = failed_jobs.write();
                    failed_jobs.insert(id, (title, err));
                }
            }
            let mut jobs = running_jobs.write();
            jobs.remove(&id);
        });
    }

    /// Processes the outcome of completed jobs and renders the running ones.
    ///
    /// Errors of failed jobs are handed to the notifier of `app` and the state
    /// updates of finished jobs are applied to `app`. Running background jobs
    /// are listed in a bottom panel, running foreground jobs in the central
    /// panel.
    ///
    /// Returns `true` if there is at least one running foreground job.
    pub(super) fn show(&self, ctx: &egui::Context, app: &mut AnnatomicApp) -> bool {
        {
            let mut failed = self.failed.write();
            while let Some((_, (title, error))) = failed.pop_first() {
                let error = error.context(format!("Error in job \"{title}\""));
                app.notifier.report_error(error);
            }
        }

        // Move the callbacks out of the map and release the lock before any
        // of them is executed: a state update might itself need a lock on the
        // jobs.
        let pending_updates: Vec<_> = {
            let mut finished = self.finished.write();
            std::mem::take(&mut *finished)
                .into_values()
                .map(|(_title, update)| update)
                .collect()
        };
        for update in pending_updates {
            update(app);
        }

        let running = self.running.read();

        // Split the running jobs by their type in a single pass.
        let mut background = Vec::new();
        let mut foreground = Vec::new();
        for entry in running.values() {
            match entry {
                JobType::Background(job) => background.push(job),
                JobType::Foreground(job) => foreground.push(job),
            }
        }

        if !background.is_empty() {
            egui::TopBottomPanel::bottom("background_jobs").show(ctx, |ui| {
                for job in &background {
                    let status = job.msg.read();
                    ui.horizontal(|ui| {
                        ui.spinner();
                        ui.label(&job.title);
                        if let Some(text) = status.as_ref() {
                            ui.label(RichText::new(text).italics());
                        }
                    });
                }
            });
        }

        let blocks_ui = !foreground.is_empty();
        if blocks_ui {
            egui::CentralPanel::default().show(ctx, |ui| {
                for job in &foreground {
                    ui.horizontal(|ui| {
                        ui.spinner();
                        ui.heading(&job.title);
                    });

                    let status = job.msg.read();
                    let text = match status.as_ref() {
                        Some(text) => text.clone(),
                        None => String::from("Please wait for the job to finish"),
                    };
                    ui.label(text);
                }
            });
        }

        blocks_ui
    }

    /// Returns `true` if a job with exactly the given `title` is known to the
    /// executor, i.e. it is either running or its outcome (success or
    /// failure) is still stored in the executor.
    pub(crate) fn has_job_with_title(&self, title: &str) -> bool {
        // Acquire all locks up front so the three checks see one state.
        let running = self.running.read();
        let finished = self.finished.read();
        let failed = self.failed.read();

        let is_running = running.values().any(|entry| {
            let (JobType::Background(job) | JobType::Foreground(job)) = entry;
            job.title.as_str() == title
        });
        let is_finished = finished
            .values()
            .any(|(job_title, _)| job_title.as_str() == title);
        let is_failed = failed
            .values()
            .any(|(job_title, _)| job_title.as_str() == title);

        is_running || is_finished || is_failed
    }

    /// Returns `true` if the executor still holds any job. Besides the
    /// running jobs this also covers failed and finished jobs whose outcome
    /// is still stored.
    #[cfg(test)]
    pub(crate) fn has_active_jobs(&self) -> bool {
        let running = self.running.read();
        let finished = self.finished.read();
        let failed = self.failed.read();

        let all_idle = running.is_empty() && finished.is_empty() && failed.is_empty();
        !all_idle
    }
}