1use super::{ActTask, Runtime};
2use crate::{
3 Act, ActError, Executor, Message, MessageState, NodeKind, Result, TaskState, Vars,
4 event::{Action, Model},
5 scheduler::{
6 Node, Process, Task,
7 tree::{NodeContent, dyn_build_act},
8 },
9 utils::{self, consts, shortid},
10};
11use serde::{Deserialize, Serialize, de::DeserializeOwned};
12use std::{any::type_name, cell::RefCell, sync::Arc, vec};
13use tracing::debug;
14
15tokio::task_local! {
16 static CONTEXT: Context;
17}
18
19#[derive(Clone)]
20pub struct Context {
21 pub runtime: Arc<Runtime>,
24 pub executor: Arc<Executor>,
25 pub proc: Arc<Process>,
26 task: RefCell<Arc<Task>>,
27 action: RefCell<Option<Action>>,
28 vars: RefCell<Vars>,
29}
30
31impl std::fmt::Debug for Context {
32 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
33 f.debug_struct("Context")
34 .field("pid", &self.proc.id())
35 .field("tid", &self.task().id)
36 .field("action", &self.action())
37 .finish()
38 }
39}
40
41impl Context {
42 fn init_vars(&self, task: &Arc<Task>) {
43 let inputs = task.inputs();
44 debug!("init_vars: {inputs}");
45
46 self.task().set_data_with(|data| {
48 for (ref k, v) in &inputs {
49 data.set(k, v.clone());
50 }
51 });
52 }
53
54 pub fn new(proc: &Arc<Process>, task: &Arc<Task>) -> Self {
55 Context {
56 runtime: task.runtime().clone(),
57 executor: Arc::new(Executor::new(task.runtime())),
58 proc: proc.clone(),
59 action: RefCell::new(None),
60 task: RefCell::new(task.clone()),
61 vars: RefCell::new(Vars::new()),
62 }
63 }
64
65 pub fn scope<T, F: Fn() -> T>(ctx: Context, f: F) -> T {
66 if Context::current().is_ok() {
67 f()
68 } else {
69 CONTEXT.sync_scope(ctx, f)
70 }
71 }
72
73 pub fn with<T, F: Fn(&Context) -> T>(f: F) -> T {
74 CONTEXT.with(|ctx| f(ctx))
75 }
76
77 pub fn current() -> Result<Context> {
78 CONTEXT
79 .try_with(Clone::clone)
80 .map_err(|e| ActError::Runtime(e.to_string()))
81 }
82
83 pub fn set_task(&self, task: &Arc<Task>) {
84 if self.task.borrow().id != task.id {
85 *self.task.borrow_mut() = task.clone();
86 }
87 }
88
89 pub fn task(&self) -> Arc<Task> {
90 self.task.borrow().clone()
91 }
92
93 pub fn prepare(&self) {
94 self.init_vars(&self.task());
95 }
96
97 pub fn set_action(&self, action: &Action) -> Result<()> {
98 *self.action.borrow_mut() = Some(action.clone());
99
100 let mut vars = self.vars.borrow_mut();
102 for (name, v) in &action.options {
103 vars.entry(name.to_string())
104 .and_modify(|i| *i = v.clone())
105 .or_insert(v.clone());
106 }
107
108 Ok(())
109 }
110
111 pub fn vars(&self) -> Vars {
112 self.vars.borrow().clone()
113 }
114
115 pub fn set_env<T>(&self, name: &str, value: T)
116 where
117 T: Serialize + Clone,
118 {
119 self.proc.with_env_mut(|data| {
122 data.set(name, value);
123 });
124 }
125
126 pub fn get_env<T>(&self, name: &str) -> Option<T>
127 where
128 T: for<'de> Deserialize<'de> + Clone,
129 {
130 if let Some(v) = self.proc.with_env(|vars| vars.get(name)) {
132 return Some(v);
133 }
134
135 if let Ok(v) = std::env::var(name) {
137 #[allow(clippy::expect_fun_call)]
138 return Some(T::deserialize(serde_json::json!(v)).expect(&format!(
139 "cannot convert env '{name} to {}",
140 type_name::<T>()
141 )));
142 }
143
144 None
145 }
146
147 pub fn set_var<T>(&self, name: &str, value: T)
148 where
149 T: Serialize + Clone,
150 {
151 self.vars.borrow_mut().set(name, value);
152 }
153
154 pub fn get_var<T>(&self, name: &str) -> Option<T>
155 where
156 T: for<'de> Deserialize<'de> + Clone,
157 {
158 self.vars.borrow().get::<T>(name)
159 }
160
161 pub fn eval<T: DeserializeOwned + Serialize>(&self, expr: &str) -> Result<T> {
162 Context::scope(self.clone(), || self.runtime.env().eval::<T>(expr))
163 }
164
165 #[allow(unused)]
166 pub(in crate::scheduler) fn action(&self) -> Option<Action> {
167 self.action.borrow().clone()
168 }
169
170 pub fn sched_task(&self, node: &Arc<Node>) {
171 debug!("sched_task: {}", node.to_string());
172 let task = self.proc.create_task(node, Some(self.task()));
173 self.runtime.push(&task);
174 }
175
176 pub fn dispatch_act(&self, act: &Act, is_hook_event: bool) -> Result<()> {
177 debug!("dispatch_act: {act:?} {:?}", self.task);
178 let task = self.task();
179 let mut id = act.id.to_string();
196 if id.is_empty() {
197 id = shortid();
198 }
199
200 let node = Arc::new(Node::new(
201 &id,
202 NodeContent::Act(act.clone()),
203 task.node().level + 1,
204 ));
205
206 if !task.state().is_none() {
215 let task = self.proc.create_task(&node, Some(task));
216
217 if is_hook_event {
219 task.set_data_with(|data| data.set(consts::IS_EVENT_PROCESSED, true));
220 }
221 self.runtime.push(&task);
222 }
223
224 Ok(())
225 }
226
227 pub fn build_acts(&self, acts: &[Act], is_sequence: bool) -> Result<()> {
228 let task = self.task();
229
230 let mut prev = task.node().clone();
231 let parent = task.node().clone();
232 let mut acts = acts.to_owned();
233 for (index, act) in acts.iter_mut().enumerate() {
234 dyn_build_act(
235 act,
236 &parent,
237 &mut prev,
238 parent.level + 1,
239 index,
240 is_sequence,
241 )?;
242 }
243
244 Ok(())
245 }
246
247 pub fn dispatch_acts(&self, acts: Vec<Act>, is_sequence: bool) -> Result<()> {
248 let task = self.task();
249 let mut normal_acts = vec![];
250 acts.iter().for_each(|act| {
251 if let Some(on) = act.on.as_ref() {
253 match on {
254 crate::ActEvent::Created => {
255 task.add_hook_stmts(super::TaskLifeCycle::Created, act)
256 }
257 crate::ActEvent::Completed => {
258 task.add_hook_stmts(super::TaskLifeCycle::Completed, act)
259 }
260 crate::ActEvent::BeforeUpdate => {
261 task.add_hook_stmts(super::TaskLifeCycle::BeforeUpdate, act)
262 }
263 crate::ActEvent::Updated => {
264 task.add_hook_stmts(super::TaskLifeCycle::Updated, act)
265 }
266 crate::ActEvent::Step => task.add_hook_stmts(super::TaskLifeCycle::Step, act),
267 }
268 } else {
269 normal_acts.push(act.clone());
270 }
271 });
272 self.build_acts(&normal_acts, is_sequence)?;
273 Ok(())
274 }
275
276 pub fn redo_task(&self, task: &Arc<Task>) -> Result<()> {
278 if let Some(prev) = task.prev()
279 && let Some(prev_task) = self.proc.task(&prev)
280 {
281 let task = self.proc.create_task(task.node(), Some(prev_task));
282 self.runtime.push(&task);
283 }
284
285 Ok(())
286 }
287
288 pub fn back_task(&self, task: &Arc<Task>, paths: &Vec<Arc<Task>>) -> Result<()> {
289 for task in task.siblings().iter() {
290 if task.state().is_completed() {
291 continue;
292 }
293 task.set_state(TaskState::Skipped);
294 self.emit_task(task)?;
295 }
296
297 task.set_state(TaskState::Backed);
298 self.emit_task(task)?;
299
300 if task.is_kind(NodeKind::Act) {
302 let mut parent = task.parent();
303 while let Some(p) = parent {
304 if p.is_kind(NodeKind::Step) || p.is_kind(NodeKind::Act) {
305 p.set_state(TaskState::Backed);
306 self.emit_task(&p)?;
307 break;
308 }
309 parent = p.parent();
310 }
311 }
312
313 for p in paths {
315 if p.state().is_running() {
316 p.set_state(TaskState::Completed);
317 self.emit_task(p)?;
318 } else if p.state().is_pending() {
319 p.set_state(TaskState::Skipped);
320 self.emit_task(p)?;
321 }
322 }
323
324 Ok(())
325 }
326
327 pub fn abort_task(&self, task: &Arc<Task>) -> Result<()> {
328 for task in task.siblings().iter() {
330 if task.state().is_completed() {
331 continue;
332 }
333 task.set_state(TaskState::Skipped);
334 self.emit_task(task)?;
335 }
336
337 task.set_state(TaskState::Aborted);
338 task.set_data(&self.vars());
339 self.emit_task(task)?;
340
341 let ctx = self;
343 let mut parent = task.parent();
344 while let Some(task) = parent {
345 task.set_state(TaskState::Aborted);
346 ctx.set_task(&task);
347 ctx.emit_task(&ctx.task())?;
348
349 for t in task.children() {
350 if t.state().is_pending() {
351 t.set_state(TaskState::Skipped);
352 ctx.emit_task(&t)?;
353 } else if t.state().is_running() {
354 t.set_state(TaskState::Aborted);
355 ctx.emit_task(&t)?;
356 }
357 }
358
359 parent = task.parent();
360 }
361 Ok(())
362 }
363
364 pub fn undo_task(&self, task: &Arc<Task>) -> Result<()> {
367 if task.state().is_completed() {
368 return Err(ActError::Action(format!(
369 "task('{}') is not allowed to cancel",
370 task.id
371 )));
372 }
373
374 let mut children = task.children();
376 while !children.is_empty() {
377 let mut nexts = Vec::new();
378 for t in &children {
379 if t.state().is_completed() {
380 continue;
381 }
382 t.set_state(TaskState::Cancelled);
383 self.emit_task(t)?;
384 nexts.extend_from_slice(&t.children());
385 }
386
387 children = nexts;
388 }
389 task.set_state(TaskState::Completed);
390 self.emit_task(task)?;
391
392 Ok(())
393 }
394
395 pub fn emit_error(&self) -> Result<()> {
396 let task = self.task();
397 if task.state().is_error() {
398 self.emit_task(&task)?;
399
400 if task.state().is_error()
402 && let Some(err) = task.err()
403 && let Some(parent) = task.parent()
404 {
405 parent.set_err(&err);
406 return parent.error(self);
407 }
408 }
409
410 Ok(())
411 }
412
413 pub fn emit_task(&self, task: &Arc<Task>) -> Result<()> {
414 debug!("ctx::emit_task, task={:?}", task);
415
416 if let NodeContent::Workflow(_) = &task.node().content
418 && task.state().is_created()
419 {
420 if self.proc.state().is_none() {
421 self.proc.set_state(TaskState::Running);
422 }
423 self.runtime.scher().emit_proc_event(&self.proc);
424 }
425
426 self.runtime.scher().emit_task_event(task)?;
427
428 if let NodeContent::Workflow(_) = &task.node().content
430 && task.state().is_completed()
431 {
432 self.proc.set_state(task.state());
433 if let Some(err) = task.err() {
434 self.proc.set_err(&err);
435 }
436 self.runtime.scher().emit_proc_event(&self.proc);
437 }
438
439 Ok(())
440 }
441
442 pub fn emit_message(&self, msg: &Act) -> Result<()> {
443 debug!("emit_message: {:?}", msg);
444 let workflow = self.proc.model();
445 let mut inputs = utils::fill_inputs(&msg.inputs, self);
446 inputs.set(consts::ACT_OPTIONS_KEY, msg.options.clone());
448
449 let params = utils::fill_params(&msg.params, self);
451 inputs.set(consts::ACT_PARAMS_KEY, params);
452
453 let task = self.task();
454 if let Some(err) = task.err() {
455 inputs.set(consts::ACT_ERR_MESSAGE, err.message);
456 inputs.set(consts::ACT_ERR_CODE, err.ecode);
457 }
458
459 let state: MessageState = MessageState::Completed;
460 let msg = Message {
461 id: utils::longid(),
462 r#type: "act".to_string(),
463 state,
464 pid: task.pid.clone(),
465 tid: task.id.clone(),
466 key: msg.key.clone(),
467 name: task.node().name(),
468 uses: msg.uses.clone(),
469 model: Model {
470 id: workflow.id.clone(),
471 name: workflow.name.to_string(),
472 tag: workflow.tag.to_string(),
473 },
474
475 tag: msg.tag.to_string(),
476 inputs,
477 ..Default::default()
478 };
479
480 self.runtime.emitter().emit_message(&msg);
481 Ok(())
482 }
483}