use std::future::{Future, IntoFuture};
use std::pin::Pin;
use std::sync::Mutex;
use std::time::{Duration, SystemTime};
use tauri::plugin::{Builder, TauriPlugin};
use tauri::{AppHandle, Emitter, Manager, Runtime as TauriRuntime};
use tokio::task::JoinHandle;
use crate::command::Command;
use crate::event::{Event, Stream};
use crate::interpreter::Interpreter;
use crate::job::{Job, JobId};
use crate::outcome::{FailureReason, Outcome};
use crate::progress::Progress;
use crate::runtime::{Error, EventStream, JobHandle, Runtime};
/// Boxed callback invoked with the app handle and each forwarded [`Event`].
///
/// Only `Send` is required (not `Sync`): observers are moved into the
/// forwarding task and called from there.
type EventObserver<R> = Box<dyn Fn(&AppHandle<R>, &Event) + Send + 'static>;
/// Boxed callback that maps a phase name to an optional `(start, end)` pair
/// used to drive synthetic "creep" progress while that phase is active.
///
/// Returning `None`, or a pair where `end` is not greater than `start`,
/// disables creep for that phase.
type CreepFn = Box<dyn Fn(&str) -> Option<(f32, f32)> + Send + Sync + 'static>;
/// Synthetic progress that eases from a start value towards (but never
/// reaches) an end value, one step per [`CreepState::tick`].
struct CreepState {
    /// Upper limit the creep approaches; set short of the requested end.
    cap: f32,
    /// Current synthetic progress value.
    cur: f32,
    /// Set once `cur` is within `EPSILON` of `cap`; further ticks yield nothing.
    done: bool,
}

impl CreepState {
    /// Share of the `start..end` span the creep is allowed to cover.
    const CAP_SHARE: f32 = 0.95;
    /// Share of the remaining distance to `cap` covered by one tick.
    const STEP_SHARE: f32 = 0.02;
    /// Distance to `cap` at which the creep is considered finished.
    const EPSILON: f32 = 0.0001;

    /// Value expressed as a rounded whole percent, used to detect visible changes.
    fn percent(value: f32) -> i32 {
        (value * 100.0).round() as i32
    }

    /// Creates a creep that begins at `start` and is capped at 95% of the way
    /// from `start` to `end`.
    fn new(start: f32, end: f32) -> Self {
        let cap = start + Self::CAP_SHARE * (end - start);
        Self {
            cap,
            cur: start,
            done: false,
        }
    }

    /// Advances the creep by one step.
    ///
    /// Returns `Some(value)` only when the rounded whole-percent value changed
    /// compared with the previous step; otherwise returns `None`. Once the
    /// value is within `EPSILON` of the cap, `done` is set and every later
    /// call returns `None`.
    fn tick(&mut self) -> Option<f32> {
        if self.done {
            return None;
        }

        let before = self.cur;
        if before + Self::EPSILON >= self.cap {
            self.done = true;
            return None;
        }

        // Move a fixed share of the remaining distance, never past the cap.
        let after = (before + (self.cap - before) * Self::STEP_SHARE).min(self.cap);
        self.cur = after;

        if Self::percent(after) != Self::percent(before) {
            Some(after)
        } else {
            None
        }
    }
}
/// Builds the plugin around a freshly constructed [`Runtime`].
///
/// Equivalent to `init_with(Runtime::new())`.
pub fn init<R: TauriRuntime>() -> TauriPlugin<R> {
    let runtime = Runtime::new();
    init_with(runtime)
}
/// Builds the `"execra"` plugin around a caller-supplied [`Runtime`].
///
/// The runtime is parked in a `Mutex<Option<_>>` so the setup hook can move it
/// out and register it as managed state on the app.
///
/// # Panics
///
/// The setup hook panics if the mutex is poisoned or if the runtime has
/// already been taken (i.e. the hook ran more than once).
pub fn init_with<R: TauriRuntime>(rt: Runtime) -> TauriPlugin<R> {
    let pending = Mutex::new(Some(rt));
    Builder::new("execra")
        .setup(move |app, _api| {
            // Scope the guard so the lock is released before `manage` runs.
            let runtime = {
                let mut guard = pending.lock().unwrap();
                guard.take().expect("execra plugin setup called twice")
            };
            app.manage(runtime);
            Ok(())
        })
        .build()
}
/// Extension trait that exposes the plugin's runtime from Tauri manager types.
pub trait ExecraExt<R: TauriRuntime> {
/// Returns a [`RuntimeRef`] bundling the managed `Runtime` with an app handle.
fn execra(&self) -> RuntimeRef<R>;
}
/// Blanket implementation for every Tauri [`Manager`].
impl<R: TauriRuntime, M: Manager<R>> ExecraExt<R> for M {
    /// Clones the managed [`Runtime`] and the app handle into a [`RuntimeRef`].
    ///
    /// NOTE(review): this relies on `Manager::state`, which presumably panics
    /// when no `Runtime` has been managed (plugin not registered) — confirm
    /// against the tauri documentation.
    fn execra(&self) -> RuntimeRef<R> {
        let managed = self.state::<Runtime>();
        let rt = managed.inner().clone();
        let app = self.app_handle().clone();
        RuntimeRef { rt, app }
    }
}
/// Handle pairing the plugin's [`Runtime`] with the [`AppHandle`] it was
/// obtained from. Cloning clones both parts.
pub struct RuntimeRef<R: TauriRuntime> {
    /// The job runtime shared through Tauri managed state.
    rt: Runtime,
    /// App handle passed on to task builders and their observers.
    app: AppHandle<R>,
}

// Implemented by hand instead of `#[derive(Clone)]`: the derive adds an
// `R: Clone` bound on the type parameter, which would make the handle
// cloneable only for runtime types that are themselves `Clone`. Both fields
// are already cloned elsewhere in this file with nothing more than
// `R: TauriRuntime`, so no such bound is needed.
impl<R: TauriRuntime> Clone for RuntimeRef<R> {
    fn clone(&self) -> Self {
        Self {
            rt: self.rt.clone(),
            app: self.app.clone(),
        }
    }
}
impl<R: TauriRuntime> RuntimeRef<R> {
    /// Starts a [`TaskBuilder`] for `cmd` with no channel, observers, tags or
    /// creep function configured.
    pub fn task(&self, cmd: Command) -> TaskBuilder<R> {
        let Self { rt, app } = self;
        TaskBuilder {
            app: app.clone(),
            rt: rt.clone(),
            cmd,
            channel: None,
            observers: Vec::new(),
            tags: Vec::new(),
            creep: None,
        }
    }

    /// Forwards to `Runtime::cancel` for the given job id.
    ///
    /// # Errors
    ///
    /// Returns whatever error the underlying runtime reports.
    pub fn cancel(&self, id: JobId) -> Result<(), Error> {
        self.rt.cancel(id)
    }

    /// Forwards to `Runtime::recent` with `n`.
    ///
    /// NOTE(review): presumably yields at most `n` of the latest jobs —
    /// confirm against `Runtime::recent`.
    pub fn recent(&self, n: usize) -> Vec<Job> {
        self.rt.recent(n)
    }

    /// Forwards to `Runtime::running` and returns the job ids it reports.
    pub fn running(&self) -> Vec<JobId> {
        self.rt.running()
    }

    /// Forwards to `Runtime::subscribe` and returns the resulting stream.
    pub fn subscribe(&self) -> EventStream {
        self.rt.subscribe()
    }

    /// Forwards to `Runtime::subscribe_job` for the given job id.
    pub fn subscribe_job(&self, id: JobId) -> EventStream {
        self.rt.subscribe_job(id)
    }

    /// Borrows the wrapped [`Runtime`].
    pub fn runtime(&self) -> &Runtime {
        &self.rt
    }
}
/// Builder that configures how a [`Command`] is spawned on the runtime and how
/// its events are forwarded to observers and/or an app event channel.
pub struct TaskBuilder<R: TauriRuntime> {
// App handle handed to observers and used to emit events.
app: AppHandle<R>,
// Runtime the command is spawned on.
rt: Runtime,
// Command to run; label, tags and interpreter are applied to it.
cmd: Command,
// Optional app event name every forwarded event is emitted on.
channel: Option<String>,
// Callbacks invoked for every forwarded event, in registration order.
observers: Vec<EventObserver<R>>,
// Tags collected via `tag`, applied to the command when it is finalized.
tags: Vec<String>,
// Optional phase-name -> (start, end) mapping enabling synthetic progress.
creep: Option<CreepFn>,
}
impl<R: TauriRuntime> TaskBuilder<R> {
pub fn channel(mut self, name: impl Into<String>) -> Self {
self.channel = Some(name.into());
self
}
pub fn observe<F>(mut self, f: F) -> Self
where
F: Fn(&AppHandle<R>, &Event) + Send + 'static,
{
self.observers.push(Box::new(f));
self
}
pub fn on_created<F>(self, f: F) -> Self
where
F: Fn(&AppHandle<R>, JobId) + Send + 'static,
{
self.observe(move |app, event| {
if let Event::JobCreated { job, .. } = event {
f(app, *job);
}
})
}
pub fn on_output<F>(self, f: F) -> Self
where
F: Fn(&AppHandle<R>, Stream, &str) + Send + 'static,
{
self.observe(move |app, event| {
if let Event::OutputAppended { stream, line, .. } = event {
f(app, *stream, line);
}
})
}
pub fn on_interpreter_error<F>(self, f: F) -> Self
where
F: Fn(&AppHandle<R>, &str, &str, Option<&str>) + Send + 'static,
{
self.observe(move |app, event| {
if let Event::InterpreterError {
interpreter,
error,
line,
..
} = event
{
f(app, interpreter, error, line.as_deref());
}
})
}
pub fn on_finalized<F>(self, f: F) -> Self
where
F: Fn(&AppHandle<R>, &Outcome) + Send + 'static,
{
self.observe(move |app, event| {
if let Event::Finalized { outcome, .. } = event {
f(app, outcome);
}
})
}
pub fn label(mut self, text: impl Into<String>) -> Self {
self.cmd = self.cmd.label(text);
self
}
pub fn tag(mut self, tag: impl Into<String>) -> Self {
self.tags.push(tag.into());
self
}
pub fn interpreter<I: Interpreter + Send + 'static>(mut self, i: I) -> Self {
self.cmd = self.cmd.interpreter(i);
self
}
pub fn creep<F>(mut self, f: F) -> Self
where
F: Fn(&str) -> Option<(f32, f32)> + Send + Sync + 'static,
{
self.creep = Some(Box::new(f));
self
}
#[allow(clippy::type_complexity)]
fn finalize_cmd(
self,
) -> (
AppHandle<R>,
Runtime,
Command,
Option<String>,
Vec<EventObserver<R>>,
Option<CreepFn>,
) {
let cmd = if self.tags.is_empty() {
self.cmd
} else {
self.cmd.tags(self.tags)
};
(
self.app,
self.rt,
cmd,
self.channel,
self.observers,
self.creep,
)
}
fn spawn_with_forwarding(self) -> Result<(JobHandle, Option<JoinHandle<()>>), Error> {
let (app, rt, cmd, channel, observers, creep) = self.finalize_cmd();
let mut handle = rt.spawn(cmd)?;
let forwarder = if channel.is_some() || !observers.is_empty() || creep.is_some() {
Some(forward_events(
app,
handle.id(),
handle.subscribe(),
channel,
observers,
creep,
))
} else {
None
};
Ok((handle, forwarder))
}
pub fn spawn(self) -> Result<JobHandle, Error> {
let (handle, _forwarder) = self.spawn_with_forwarding()?;
Ok(handle)
}
pub fn spawn_tracked(self) -> Result<JobId, Error> {
let handle = self.spawn()?;
Ok(handle.id())
}
}
/// Lets a [`TaskBuilder`] be `.await`ed directly: the command is spawned, run
/// to completion, and its [`Outcome`] returned.
impl<R: TauriRuntime> IntoFuture for TaskBuilder<R> {
    type Output = Outcome;
    type IntoFuture = Pin<Box<dyn Future<Output = Outcome> + Send>>;

    /// Spawns the task and resolves to its outcome.
    ///
    /// A spawn error is not propagated as an error; it is reported as
    /// `Outcome::Failed` with `FailureReason::SpawnFailed`. When an event
    /// forwarder was started, it is awaited after the job so that forwarding
    /// has finished before the outcome is returned.
    fn into_future(self) -> Self::IntoFuture {
        Box::pin(async move {
            let (handle, forwarder) = match self.spawn_with_forwarding() {
                Ok(parts) => parts,
                Err(e) => {
                    let reason = FailureReason::SpawnFailed {
                        error: e.to_string(),
                    };
                    return Outcome::Failed {
                        reason,
                        summary: None,
                        findings: vec![],
                    };
                }
            };

            let outcome = handle.await;
            if let Some(task) = forwarder {
                // A join error from the forwarder is deliberately ignored.
                let _ = task.await;
            }
            outcome
        })
    }
}
/// Spawns a background task that relays a job's events to `observers` and, if
/// set, emits them on the app event `channel`.
///
/// The task ends after an `Event::Finalized` has been delivered or when
/// `stream` is exhausted.
///
/// When `creep` is provided, the task additionally synthesizes
/// `Event::ProgressUpdated` events: each `Event::PhaseEntered` asks `creep`
/// for a `(start, end)` pair, and while a valid pair (`end > start`) is
/// active a 500 ms ticker advances a [`CreepState`] and delivers a progress
/// event whenever the rounded percentage changes. Creep stops on
/// `PhaseExited`, `Cancelled`, `Finalized`, or once the creep is done.
///
/// NOTE(review): uses `tokio::spawn`, so this presumably must be called from
/// within a Tokio runtime context — confirm for all call sites.
fn forward_events<R: TauriRuntime>(
    app: AppHandle<R>,
    job_id: JobId,
    mut stream: EventStream,
    channel: Option<String>,
    observers: Vec<EventObserver<R>>,
    creep: Option<CreepFn>,
) -> JoinHandle<()> {
    /// Period between synthetic creep progress steps.
    const CREEP_TICK: Duration = Duration::from_millis(500);

    tokio::spawn(async move {
        /// Hands `event` to every observer, then emits it on `channel` if set.
        fn deliver<R: TauriRuntime>(
            app: &AppHandle<R>,
            observers: &[EventObserver<R>],
            channel: Option<&str>,
            event: &Event,
        ) {
            for observer in observers {
                observer(app, event);
            }
            if let Some(channel) = channel {
                // Best effort: an emit failure must not stop forwarding.
                let _ = app.emit(channel, event);
            }
        }

        // Without a creep function this is a plain relay loop.
        let Some(creep) = creep else {
            while let Some(event) = stream.next().await {
                let stop = matches!(event, Event::Finalized { .. });
                deliver(&app, &observers, channel.as_deref(), &event);
                if stop {
                    break;
                }
            }
            return;
        };

        let mut state: Option<CreepState> = None;
        let mut ticker = tokio::time::interval(CREEP_TICK);
        // The ticker is not polled while no creep is active (its branch is
        // disabled) and may be starved by the biased event branch. With the
        // default `Burst` behavior every missed tick would then fire
        // back-to-back, making the creep jump instead of advancing smoothly.
        ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
        // The first tick of an interval completes immediately; consume it.
        ticker.tick().await;

        loop {
            tokio::select! {
                // Real events always take priority over synthetic ticks.
                biased;
                next = stream.next() => {
                    let Some(event) = next else { break };
                    let stop = matches!(event, Event::Finalized { .. });
                    match &event {
                        Event::PhaseEntered { name, .. } => {
                            state = creep(name)
                                .filter(|(s, e)| e > s)
                                .map(|(s, e)| CreepState::new(s, e));
                            if state.is_some() {
                                // Start the cadence at phase entry so the first
                                // creep step comes one full period later rather
                                // than immediately (overdue deadline from idle time).
                                ticker.reset();
                            }
                        }
                        Event::PhaseExited { .. }
                        | Event::Finalized { .. }
                        | Event::Cancelled { .. } => state = None,
                        _ => {}
                    }
                    deliver(&app, &observers, channel.as_deref(), &event);
                    if stop {
                        break;
                    }
                }
                _ = ticker.tick(), if state.is_some() => {
                    // The precondition guarantees an active creep.
                    let Some(cs) = state.as_mut() else { continue };
                    match cs.tick() {
                        Some(frac) => deliver(
                            &app,
                            &observers,
                            channel.as_deref(),
                            &Event::ProgressUpdated {
                                job: job_id,
                                progress: Progress::fraction(frac),
                                at: SystemTime::now(),
                            },
                        ),
                        // Creep reached its cap: stop ticking until the next phase.
                        None if cs.done => state = None,
                        None => {}
                    }
                }
            }
        }
    })
}