acts 0.17.2

a fast, lightweight, extensible workflow engine
Documentation
use crate::{
    Engine, Event, Result,
    event::{Emitter, TaskExtra},
    scheduler::{
        Process, Task,
        queue::{Queue, Signal},
    },
};
use std::sync::{Arc, Mutex};
use tracing::debug;

/// Task scheduler: owns the signal queue that feeds task execution and the
/// emitter used to register and fire process/task event handlers.
///
/// Every field is behind an `Arc`, so cloning a `Scheduler` yields a handle
/// that shares the same queue, emitter, and closed flag.
#[derive(Clone)]
pub struct Scheduler {
    /// Queue of [`Signal`]s; tasks are pushed onto it and consumed by `next`.
    queue: Arc<Queue>,
    /// Event emitter that handler registration and event emission delegate to.
    emitter: Arc<Emitter>,
    /// Set to `true` once `next` has consumed a `Signal::Terminal`.
    closed: Arc<Mutex<bool>>,
}

impl std::fmt::Debug for Scheduler {
    /// Renders the scheduler as its bare type name; no fields are printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Build an empty struct representation and close it right away.
        let mut repr = f.debug_struct("Scheduler");
        repr.finish()
    }
}

impl Scheduler {
    pub fn new() -> Arc<Self> {
        Arc::new(Self {
            queue: Queue::new(),
            emitter: Arc::new(Emitter::new()),
            closed: Arc::new(Mutex::new(false)),
        })
    }

    pub fn init(self: &Arc<Self>, _engine: &Engine) {
        debug!("scheduler::init");
    }

    pub fn push(&self, task: &Arc<Task>) {
        debug!("scheduler::push  task={:?}", task);
        self.queue.send(&Signal::Task(task.clone()));
    }

    pub async fn next(self: &Arc<Self>) -> bool {
        if let Some(signal) = self.queue.next().await {
            debug!("next: {:?}", signal);
            match signal {
                Signal::Task(task) => {
                    let ctx = &task.create_context();
                    task.exec(ctx).unwrap_or_else(|err| {
                        eprintln!("error: {err}");
                        task.set_err(&err.into());
                        let _ = ctx.emit_error();
                    });
                }
                Signal::Terminal => {
                    *self.closed.lock().unwrap() = true;
                    return false;
                }
            }
        }

        true
    }

    pub fn close(&self) {
        debug!("scheduler::close");
        self.queue.terminate();
    }

    pub fn is_closed(&self) -> bool {
        *self.closed.lock().unwrap()
    }

    pub fn on_proc(&self, f: impl Fn(&Event<Arc<Process>>) + Send + Sync + 'static) {
        self.emitter.on_proc(f)
    }

    pub fn on_task(&self, f: impl Fn(&Event<Arc<Task>, TaskExtra>) + Send + Sync + 'static) {
        self.emitter.on_task(f)
    }

    pub fn emit_proc_event(&self, proc: &Arc<Process>) {
        self.emitter.emit_proc_event(proc)
    }

    pub fn emit_task_event(&self, task: &Arc<Task>) -> Result<()> {
        self.emitter.emit_task_event(task)
    }
}