use std::{collections::BTreeMap, sync::Arc, time::Instant};
use annatto::workflow::StatusMessage;
use egui::{RichText, mutex::RwLock};
use log::debug;
use uuid::Uuid;
use super::AnnatomicApp;
#[derive(Clone, Default)]
pub(crate) struct Job {
title: String,
msg: Arc<RwLock<Option<String>>>,
}
impl Job {
pub(crate) fn update_message<S>(&self, message: S)
where
S: Into<String>,
{
let mut lock = self.msg.write();
lock.replace(message.into());
}
pub(crate) fn update_from_annatto_status(&self, status: StatusMessage) {
let mut lock = self.msg.write();
let message = match status {
StatusMessage::StepsCreated(step_ids) => {
format!("Created {} conversion steps", step_ids.len())
}
StatusMessage::Info(msg) => msg,
StatusMessage::Warning(msg) => format!("[WARNING] {msg}"),
StatusMessage::Progress {
id,
total_work,
finished_work,
} => {
if let Some(total_work) = total_work {
let percentage = (finished_work as f32 / total_work as f32) * 100.0;
format!("Finished {percentage:.1}% of job {id}")
} else {
format!("Finished {finished_work} task in step {id}")
}
}
StatusMessage::StepDone { id } => format!("Finished step {id}"),
};
lock.replace(message);
}
}
type FnStateUpdate = Box<dyn FnOnce(&mut AnnatomicApp) + Send + Sync>;
#[derive(Clone)]
enum JobType {
Foreground(Job),
Background(Job),
}
#[derive(Default, Clone)]
pub(crate) struct JobExecutor {
running: Arc<RwLock<BTreeMap<Uuid, JobType>>>,
finished: Arc<RwLock<BTreeMap<Uuid, (String, FnStateUpdate)>>>,
failed: Arc<RwLock<BTreeMap<Uuid, (String, anyhow::Error)>>>,
}
impl JobExecutor {
pub(crate) fn add_foreground_job<F, U, R>(&self, title: &str, worker: F, state_updater: U)
where
F: FnOnce(Job) -> anyhow::Result<R> + Send + 'static,
U: FnOnce(R, &mut AnnatomicApp) + Send + Sync + 'static,
R: Send + Sync + 'static,
{
let id = Uuid::new_v4();
debug!("Adding foreground job \"{title}\"");
let running_jobs = self.running.clone();
let failed_jobs = self.failed.clone();
let finished_jobs = self.finished.clone();
let single_job = Job {
title: title.to_string(),
..Job::default()
};
{
let mut lock = running_jobs.write();
let single_job = JobType::Foreground(single_job.clone());
lock.insert(id, single_job);
debug!(
"Number of currently running foreground jobs: {}",
lock.len()
);
}
rayon::spawn_fifo(move || {
let title = single_job.title.clone();
debug!("Spawning foreground job \"{title}\"");
let start_time = Instant::now();
let result = worker(single_job);
let execution_time = Instant::now().saturating_duration_since(start_time);
debug!("Finished foreground job \"{title}\" ({execution_time:?})");
match result {
Ok(result) => {
let mut finished_jobs = finished_jobs.write();
finished_jobs
.insert(id, (title, Box::new(move |app| state_updater(result, app))));
}
Err(err) => {
let mut failed_jobs = failed_jobs.write();
failed_jobs.insert(id, (title, err));
}
}
let mut jobs = running_jobs.write();
jobs.remove(&id);
});
}
pub(crate) fn add_background_job<F, U, R>(&self, title: &str, worker: F, state_updater: U)
where
F: FnOnce(Job) -> anyhow::Result<R> + Send + 'static,
U: FnOnce(R, &mut AnnatomicApp) + Send + Sync + 'static,
R: Send + Sync + 'static,
{
let id = Uuid::new_v4();
debug!("Adding background job \"{title}\"");
let running_jobs = self.running.clone();
let finished_jobs = self.finished.clone();
let failed_jobs = self.failed.clone();
let single_job = Job {
title: title.to_string(),
..Job::default()
};
{
let mut lock = running_jobs.write();
let single_job = JobType::Background(single_job.clone());
lock.insert(id, single_job);
debug!(
"Number of currently background running jobs: {}",
lock.len()
);
}
let title = title.to_string();
rayon::spawn_fifo(move || {
debug!("Spawning background job \"{title}\"");
let start_time = Instant::now();
let result = worker(single_job);
let execution_time = Instant::now().saturating_duration_since(start_time);
debug!("Finished background job \"{title}\" ({execution_time:?})");
match result {
Ok(result) => {
let mut finished_jobs = finished_jobs.write();
finished_jobs
.insert(id, (title, Box::new(move |app| state_updater(result, app))));
}
Err(err) => {
let mut failed_jobs = failed_jobs.write();
failed_jobs.insert(id, (title, err));
}
}
let mut jobs = running_jobs.write();
jobs.remove(&id);
});
}
pub(super) fn show(&self, ctx: &egui::Context, app: &mut AnnatomicApp) -> bool {
{
let mut failed_jobs = self.failed.write();
while let Some((_id, (title, err))) = failed_jobs.pop_first() {
app.notifier
.report_error(err.context(format!("Error in job \"{title}\"")));
}
}
let mut finished_jobs_state_update_fn = Vec::new();
{
let mut finished_jobs = self.finished.write();
while let Some(j) = finished_jobs.pop_first() {
finished_jobs_state_update_fn.push(j.1.1);
}
}
for state_update_fn in finished_jobs_state_update_fn.into_iter() {
state_update_fn(app);
}
let running_jobs = self.running.read();
let background_jobs: Vec<_> = running_jobs
.iter()
.filter_map(|(_id, job)| match job {
JobType::Background(job) => Some(job),
_ => None,
})
.collect();
let foreground_jobs: Vec<_> = running_jobs
.iter()
.filter_map(|(_id, job)| match job {
JobType::Foreground(job) => Some(job),
_ => None,
})
.collect();
let has_background_jobs = !background_jobs.is_empty();
let has_foreground_jobs = !foreground_jobs.is_empty();
if has_background_jobs {
egui::TopBottomPanel::bottom("background_jobs").show(ctx, |ui| {
for job in background_jobs.iter() {
let msg = job.msg.read();
ui.horizontal(|ui| {
ui.spinner();
ui.label(&job.title);
if let Some(msg) = msg.as_ref() {
ui.label(RichText::new(msg).italics());
}
});
}
});
}
if has_foreground_jobs {
egui::CentralPanel::default().show(ctx, |ui| {
for job in foreground_jobs.iter() {
ui.horizontal(|ui| {
ui.spinner();
ui.heading(&job.title);
});
let msg = job.msg.read();
ui.label(
msg.clone()
.unwrap_or_else(|| "Please wait for the job to finish".into()),
);
}
});
}
has_foreground_jobs
}
pub(crate) fn has_job_with_title(&self, title: &str) -> bool {
let running_jobs = self.running.read();
let finished_jobs = self.finished.read();
let failed_jobs = self.failed.read();
let has_running = running_jobs
.values()
.map(|jt| match jt {
JobType::Background(job) | JobType::Foreground(job) => job.title.as_str(),
})
.any(|job_title| job_title == title);
let has_finished = finished_jobs
.values()
.map(|j| j.0.as_str())
.any(|job_title| job_title == title);
let has_failed = failed_jobs
.values()
.map(|j| j.0.as_str())
.any(|job_title| job_title == title);
has_running || has_finished || has_failed
}
#[cfg(test)]
pub(crate) fn has_active_jobs(&self) -> bool {
let running_jobs = self.running.read();
let finished_jobs = self.finished.read();
let failed_jobs = self.failed.read();
!running_jobs.is_empty() || !finished_jobs.is_empty() || !failed_jobs.is_empty()
}
}