use crate::{
ActError, Error, Message, Result, Workflow,
data::{self, MessageStatus},
scheduler::{self, Node, Runtime, StatementBatch, TaskLifeCycle, TaskState},
store::{Store, query::*},
utils::{self, Id},
};
use std::{collections::HashMap, sync::Arc};
use tracing::debug;
impl Store {
pub fn load(&self, cap: usize, rt: &Arc<Runtime>) -> Result<Vec<Arc<scheduler::Process>>> {
debug!("load cap={}", cap);
let mut ret = Vec::new();
if cap > 0 {
let query = Query::new()
.push(
Cond::or()
.push(Expr::eq("state", TaskState::None.to_string()))
.push(Expr::eq("state", TaskState::Ready.to_string()))
.push(Expr::eq("state", TaskState::Running.to_string()))
.push(Expr::eq("state", TaskState::Pending.to_string())),
)
.set_limit(cap);
let procs = self.procs().query(&query)?;
for p in procs.rows {
let model = Workflow::from_json(&p.model)?;
let env_local: serde_json::Value =
serde_json::from_str(&p.env).map_err(|err| ActError::Store(err.to_string()))?;
let state = p.state.clone();
let proc = scheduler::Process::new_with_timestamp(&p.id, p.timestamp, rt);
proc.load(&model)?;
proc.set_pure_state(state.into());
proc.set_start_time(p.start_time);
proc.set_end_time(p.end_time);
proc.set_env(&env_local.into());
if let Some(err) = p.err {
let err: Error = serde_json::from_str(&err)
.map_err(|err| ActError::Store(err.to_string()))?;
proc.set_pure_err(&err)
}
self.load_tasks(&proc, rt)?;
ret.push(proc);
}
}
Ok(ret)
}
pub fn load_proc(
&self,
pid: &str,
rt: &Arc<Runtime>,
) -> Result<Option<Arc<scheduler::Process>>> {
debug!("load process pid={}", pid);
match self.procs().find(pid) {
Ok(p) => {
let model = Workflow::from_json(&p.model)?;
let proc = scheduler::Process::new(pid, rt);
let env_local: serde_json::Value =
serde_json::from_str(&p.env).map_err(|err| ActError::Store(err.to_string()))?;
proc.load(&model)?;
proc.set_pure_state(p.state.into());
proc.set_start_time(p.start_time);
proc.set_env(&env_local.into());
self.load_tasks(&proc, rt)?;
if let Some(err) = p.err {
let err: Error = serde_json::from_str(&err)
.map_err(|err| ActError::Store(err.to_string()))?;
proc.set_pure_err(&err)
}
Ok(Some(proc))
}
Err(_) => Ok(None),
}
}
pub fn remove_proc(&self, pid: &str) -> Result<bool> {
debug!("remove_proc pid={}", pid);
let q = Query::new().push(Cond::and().push(Expr::eq("pid", pid.to_string())));
let tasks = self.tasks().query(&q)?;
for task in tasks.rows {
self.tasks().delete(&task.id)?;
}
self.procs().delete(pid)?;
Ok(true)
}
pub fn set_message(&self, id: &str, status: MessageStatus) -> Result<()> {
if let Ok(mut message) = self.messages().find(id) {
message.status = status;
message.update_time = utils::time::time_millis();
self.messages().update(&message)?;
}
Ok(())
}
pub fn set_message_with(&self, pid: &str, tid: &str, status: MessageStatus) -> Result<bool> {
debug!("set_message_with pid={pid} tid={tid} status={status:?}");
let q = Query::new().push(
Cond::and()
.push(Expr::eq("pid", pid.to_string()))
.push(Expr::eq("tid", tid.to_string())),
);
let collection = self.messages();
if let Ok(messages) = collection.query(&q) {
for m in messages.rows.iter() {
let mut m = m.clone();
m.status = status;
m.update_time = utils::time::time_millis();
collection.update(&m)?;
}
}
Ok(true)
}
pub fn with_no_response_messages<F: Fn(&Message)>(
&self,
timeout_millis: i64,
max_message_retry_times: i32,
f: F,
) -> Result<()> {
let q = Query::new().set_limit(300).push(
Cond::and()
.push(Expr::eq("status", MessageStatus::Created))
.push(Expr::lt(
"update_time",
utils::time::time_millis() - timeout_millis,
)),
);
let collection = self.messages();
if let Ok(messages) = collection.query(&q) {
for m in messages.rows.iter() {
let mut message = m.clone();
message.update_time = utils::time::time_millis();
if message.retry_times < max_message_retry_times {
message.retry_times += 1;
let _ = collection.update(&message);
f(&message.into());
} else {
message.status = MessageStatus::Error;
let _ = collection.update(&message);
}
}
}
Ok(())
}
pub fn resend_error_messages(&self) -> Result<()> {
let collection = self.messages();
let q = Query::new().push(Cond::and().push(Expr::eq("status", MessageStatus::Error)));
if let Ok(messages) = collection.query(&q) {
for m in messages.rows.iter() {
let mut message = m.clone();
message.status = MessageStatus::Created;
message.retry_times = 0;
message.update_time = utils::time::time_millis();
collection.update(&message)?;
}
}
Ok(())
}
pub fn clear_error_messages(&self, pid: Option<String>) -> Result<()> {
let collection = self.messages();
let mut cond = Cond::and().push(Expr::eq("status", MessageStatus::Error));
if let Some(pid) = &pid {
cond = cond.push(Expr::eq("pid", pid));
}
let q = Query::new().push(cond);
if let Ok(messages) = collection.query(&q) {
for m in messages.rows.iter() {
collection.delete(&m.id)?;
}
}
Ok(())
}
pub fn upsert_task(&self, task: &Arc<scheduler::Task>) -> Result<()> {
debug!("upsert_task: {task:?}");
let collection = self.tasks();
let data: data::Task = task.into_data()?;
let id = Id::new(&task.pid, &task.id);
match collection.find(&id.id()) {
Ok(_) => {
collection.update(&data)?;
}
Err(_) => {
collection.create(&data)?;
}
}
Ok(())
}
pub fn upsert_proc(&self, proc: &Arc<scheduler::Process>) -> Result<()> {
debug!("upsert process: {}", proc.id());
let collection = self.procs();
let data: data::Proc = proc.into_data()?;
match collection.find(proc.id()) {
Ok(_) => {
collection.update(&data)?;
}
Err(_) => {
collection.create(&data)?;
}
}
Ok(())
}
fn load_tasks(&self, proc: &Arc<scheduler::Process>, rt: &Arc<Runtime>) -> Result<()> {
debug!("load_tasks pid={}", proc.id());
let collection = self.tasks();
let tree = &proc.tree();
let query = Query::new().push(Cond::and().push(Expr::eq("pid", proc.id())));
let tasks = collection.query(&query)?;
for t in tasks.rows {
let state: TaskState = t.state.into();
let node = Node::from_str(&t.node_data, tree);
let mut task = scheduler::Task::new(proc, &t.tid, node, rt);
task.set_pure_state(state.clone());
task.set_start_time(t.start_time);
task.set_end_time(t.end_time);
task.timestamp = t.timestamp;
task.set_prev(t.prev);
let data =
serde_json::from_str(&t.data).map_err(|err| ActError::Store(err.to_string()))?;
task.set_data(&data);
let hooks: HashMap<TaskLifeCycle, Vec<StatementBatch>> =
serde_json::from_str(&t.hooks).map_err(|err| ActError::Store(err.to_string()))?;
task.set_hooks(&hooks);
if let Some(err) = t.err {
let err: Error =
serde_json::from_str(&err).map_err(|err| ActError::Store(err.to_string()))?;
task.set_pure_err(&err)
}
proc.push_task(Arc::new(task));
}
Ok(())
}
}