acts-next 0.14.4

a fast, tiny, extensiable workflow engine
Documentation
use crate::{
    data::Package,
    sch::Runtime,
    store::{Cond, Expr, StoreAdapter},
    utils::Id,
    MessageInfo, ModelInfo, PackageInfo, ProcInfo, Query, Result, TaskInfo, Workflow,
};
use std::sync::Arc;
use tracing::instrument;

#[derive(Clone)]
pub struct Manager {
    runtime: Arc<Runtime>,
}

impl Manager {
    pub(crate) fn new(rt: &Arc<Runtime>) -> Self {
        Self {
            runtime: rt.clone(),
        }
    }

    #[instrument(skip(self))]
    pub fn publish(&self, pack: &Package) -> Result<bool> {
        let ret = self.runtime.cache().store().publish(pack)?;
        Ok(ret)
    }

    pub fn resend_error_messages(&self) -> Result<()> {
        self.runtime.cache().store().resend_error_messages()?;
        Ok(())
    }

    pub fn clear_error_messages(&self) -> Result<()> {
        self.runtime.cache().store().clear_error_messages()?;
        Ok(())
    }

    #[instrument(skip(self))]
    pub fn packages(&self, limit: usize) -> Result<Vec<PackageInfo>> {
        let query = Query::new().set_limit(limit);
        match self.runtime.cache().store().packages().query(&query) {
            Ok(mut packages) => {
                packages.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
                let mut ret = Vec::new();
                for t in &packages {
                    ret.push(t.into());
                }

                Ok(ret)
            }
            Err(err) => Err(err),
        }
    }

    #[instrument(skip(self))]
    pub fn package(&self, id: &str) -> Result<PackageInfo> {
        let package = &self.runtime.cache().store().packages().find(id)?;
        Ok(package.into())
    }

    #[instrument(skip(self))]
    pub fn deploy(&self, model: &Workflow) -> Result<bool> {
        model.valid()?;
        let ret = self.runtime.cache().store().deploy(model)?;
        Ok(ret)
    }

    #[instrument(skip(self))]
    pub fn models(&self, limit: usize) -> Result<Vec<ModelInfo>> {
        let query = Query::new().set_limit(limit);
        match self.runtime.cache().store().models().query(&query) {
            Ok(models) => {
                let mut ret = Vec::new();
                for m in models {
                    ret.push(m.into());
                }

                Ok(ret)
            }
            Err(err) => Err(err),
        }
    }

    #[instrument(skip(self))]
    pub fn model(&self, id: &str, fmt: &str) -> Result<ModelInfo> {
        match self.runtime.cache().store().models().find(id) {
            Ok(m) => {
                let mut model: ModelInfo = m.into();
                if fmt == "tree" {
                    let workflow = Workflow::from_yml(&model.data)?;
                    model.data = workflow.tree_output();
                }
                Ok(model)
            }
            Err(err) => Err(err),
        }
    }

    #[instrument(skip(self))]
    pub fn rm_model(&self, id: &str) -> Result<bool> {
        self.runtime.cache().store().models().delete(id)
    }

    #[instrument(skip(self))]
    pub fn rm_package(&self, id: &str) -> Result<bool> {
        self.runtime.cache().store().packages().delete(id)
    }

    #[instrument(skip(self))]
    pub fn rm_message(&self, id: &str) -> Result<bool> {
        self.runtime.cache().store().messages().delete(id)
    }

    #[instrument(skip(self))]
    pub fn procs(&self, cap: usize) -> Result<Vec<ProcInfo>> {
        let query = Query::new().set_limit(cap);
        match self.runtime.cache().store().procs().query(&query) {
            Ok(mut procs) => {
                procs.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
                let mut ret = Vec::new();
                for t in &procs {
                    ret.push(t.into());
                }

                Ok(ret)
            }
            Err(err) => Err(err),
        }
    }

    #[instrument(skip(self))]
    pub fn proc(&self, pid: &str) -> Result<ProcInfo> {
        match self.runtime.cache().store().procs().find(pid) {
            Ok(ref proc) => {
                let mut info: ProcInfo = proc.into();

                if let Some(proc) = self.runtime.cache().proc(pid, &self.runtime) {
                    let mut tasks: Vec<TaskInfo> = Vec::new();
                    for task in proc.tasks().iter() {
                        tasks.push(task.into());
                    }

                    tasks.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
                    info.tasks = tasks;
                }

                Ok(info)
            }
            Err(err) => Err(err),
        }
    }

    #[instrument(skip(self))]
    pub fn tasks(&self, pid: &str, count: usize) -> Result<Vec<TaskInfo>> {
        let query = Query::new()
            .push(Cond::and().push(Expr::eq("pid", pid.to_string())))
            .set_limit(10000);
        match self.runtime.cache().store().tasks().query(&query) {
            Ok(mut tasks) => {
                let mut ret = Vec::new();
                tasks.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
                for t in tasks.into_iter().take(count) {
                    ret.push(t.into());
                }

                Ok(ret)
            }
            Err(err) => Err(err),
        }
    }

    #[instrument(skip(self))]
    pub fn acts(&self, pid: &str) -> Result<Vec<TaskInfo>> {
        let query = Query::new().push(
            Cond::and()
                .push(Expr::eq("pid", pid.to_string()))
                .push(Expr::eq("kind", "act")),
        );
        match self.runtime.cache().store().tasks().query(&query) {
            Ok(mut tasks) => {
                let mut ret = Vec::new();
                tasks.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
                for t in tasks {
                    ret.push(t.into());
                }

                Ok(ret)
            }
            Err(err) => Err(err),
        }
    }

    #[instrument(skip(self))]
    pub fn messages(self: &Arc<Self>, pid: &str, count: usize) -> Result<Vec<MessageInfo>> {
        let query = Query::new()
            .push(Cond::and().push(Expr::eq("pid", pid.to_string())))
            .set_limit(10000);
        match self.runtime.cache().store().messages().query(&query) {
            Ok(mut messages) => {
                let mut ret = Vec::new();
                messages.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
                for t in messages.iter().take(count) {
                    ret.push(t.into());
                }

                Ok(ret)
            }
            Err(err) => Err(err),
        }
    }

    #[instrument(skip(self))]
    pub fn task(&self, pid: &str, tid: &str) -> Result<TaskInfo> {
        let id = Id::new(pid, tid);
        match self.runtime.cache().store().tasks().find(&id.id()) {
            Ok(t) => Ok(t.into()),
            Err(err) => Err(err),
        }
    }

    #[instrument(skip(self))]
    pub fn message(self: &Arc<Self>, id: &str) -> Result<MessageInfo> {
        let message = &self.runtime.cache().store().messages().find(id)?;
        Ok(message.into())
    }
}