use crate::{
debug,
env::VirtualMachine,
sch::{
event::{EventAction, EventData, Message},
proc::{
tree,
tree::{Node, Tree},
},
ActState, ActTask, Context, Scheduler, Task, TaskState,
},
utils, ShareLock, Vars, Workflow,
};
use std::{
collections::HashMap,
sync::{Arc, Mutex, RwLock},
};
#[derive(Clone)]
pub struct Proc {
pub(in crate::sch) vm: Arc<VirtualMachine>,
pub(in crate::sch) scher: Arc<Scheduler>,
pub(in crate::sch) tree: Arc<Tree<Task>>,
pid: String,
workflow: Arc<Workflow>,
state: ShareLock<TaskState>,
ctx: ShareLock<Option<Context>>,
messages: ShareLock<HashMap<String, Message>>,
sync: Arc<Mutex<i32>>,
}
impl Proc {
pub fn new(scher: Arc<Scheduler>, workflow: &Workflow, state: &TaskState) -> Self {
let mut pid = workflow.biz_id();
if pid.is_empty() {
pid = utils::longid();
}
let vm = scher.env().vm();
let vars = utils::fill_vars(&vm, &workflow.env);
Proc::new_raw(scher, workflow, &pid, state, &vars)
}
pub fn new_raw(
scher: Arc<Scheduler>,
workflow: &Workflow,
pid: &str,
state: &TaskState,
vars: &Vars,
) -> Self {
let vm = scher.env().vm();
vm.append(vars.clone());
let mut workflow = workflow.clone();
let tr = tree::from(&mut workflow);
Proc {
pid: pid.to_string(),
vm: Arc::new(vm),
scher,
workflow: Arc::new(workflow.clone()),
tree: tr,
state: Arc::new(RwLock::new(state.clone())),
messages: Arc::new(RwLock::new(HashMap::new())),
ctx: Arc::new(RwLock::new(None)),
sync: Arc::new(Mutex::new(0)),
}
}
pub fn state(&self) -> TaskState {
self.state.read().unwrap().clone()
}
pub fn pid(&self) -> String {
self.pid.clone()
}
pub fn workflow(&self) -> Arc<Workflow> {
self.workflow.clone()
}
pub fn task(&self, tid: &str) -> Option<Task> {
match self.tree.node(tid) {
Some(node) => Some(node.data()),
None => None,
}
}
pub fn vm(&self) -> Arc<VirtualMachine> {
self.vm.clone()
}
pub fn context(&self) -> Arc<Context> {
let mut ctx = self.ctx.write().unwrap();
if ctx.is_none() {
*ctx = Some(Context::new(self));
}
let ctx = ctx.clone().unwrap();
Arc::new(ctx)
}
pub fn message(&self, id: &str) -> Option<Message> {
match self.messages.read().unwrap().get(id) {
Some(m) => Some(m.clone()),
None => None,
}
}
pub fn set_state(&self, state: &TaskState) {
*self.state.write().unwrap() = state.clone();
self.workflow.set_state(state);
}
pub async fn run_with_task(&self, tid: &str) {
debug!("sch::proc::run_with_task(tid={})", tid);
let opt = self.tree.node(tid);
if let Some(node) = opt {
let task = node.data();
let ctx = &self.context();
task.prepare(ctx);
task.run(ctx);
task.next(ctx);
}
}
pub fn run_with_message(&self, msg: &Message) {
debug!("sch::proc::run_with_message(id={})", msg.id);
let opt = self.tree.node(&msg.tid);
if let Some(node) = opt {
let task = node.data();
let ctx = &self.context();
ctx.set_message(msg);
let mut count = self.sync.lock().unwrap();
task.prepare(ctx);
task.run(ctx);
task.next(ctx);
*count += 1;
}
}
pub fn start(&self) {
let task = self.clone();
let tr = self.tree.clone();
#[cfg(feature = "debug")]
tr.print();
self.set_state(&TaskState::Running);
let data = EventData {
pid: self.pid.clone(),
state: self.state(),
action: EventAction::Create,
vars: HashMap::new(),
};
self.scher.evt().disp_proc(self, &data);
if let Some(root) = &tr.root {
self.scher.sched_task(&task, &root.id());
}
}
pub fn complete(&self) {
let data = EventData {
pid: self.pid.clone(),
state: self.state(),
action: EventAction::Next,
vars: self.workflow.outputs(),
};
self.scher.evt().disp_proc(self, &data);
}
pub fn next(&self, node: &Arc<Node<Task>>, ctx: &Context) {
debug!("next: {}->", node.id());
let state = node.data().state();
match state {
TaskState::Pending
| TaskState::WaitingEvent
| TaskState::Fail(..)
| TaskState::Abort(..) => {
debug!(" post");
node.data().post(ctx);
}
TaskState::Skip => {
self.next_inner(node, ctx);
}
_ => {
let children = node.children();
if children.len() > 0 {
let next = &children[0];
debug!(" {}", next.id());
next.data().set_state(&TaskState::None);
self.scher.sched_task(&self.clone(), &next.id());
} else {
node.data().post(ctx);
self.next_inner(node, ctx);
}
}
}
}
fn next_inner(&self, node: &Arc<Node<Task>>, ctx: &Context) {
let next = node.next().upgrade();
match next {
Some(next) => {
debug!(" {}", next.id());
next.data().set_state(&TaskState::None);
self.scher.sched_task(&self.clone(), &next.id());
}
None => {
debug!(" next=None");
let mut parent = node.parent();
while let Some(p) = &parent {
p.data().post(ctx);
let n = p.next().upgrade();
match n {
Some(next) => {
next.data().set_state(&TaskState::None);
self.scher.sched_task(&self.clone(), &next.id());
break;
}
None => {
parent = p.parent();
}
}
}
}
}
}
pub(crate) fn make_message(&self, tid: &str, user: &str) -> Message {
let msg = Message::new(&self.pid, tid, user, None);
debug!("sch::proc::make_message(id={}, tid={})", msg.id, tid);
self.messages
.write()
.unwrap()
.insert(tid.to_string(), msg.clone());
msg
}
}