Skip to main content

acts_next/scheduler/
context.rs

1use super::{ActTask, Runtime};
2use crate::{
3    Act, ActError, Executor, Message, MessageState, NodeKind, Result, TaskState, Vars,
4    event::{Action, Model},
5    scheduler::{
6        Node, Process, Task,
7        tree::{NodeContent, dyn_build_act},
8    },
9    utils::{self, consts, shortid},
10};
11use serde::{Deserialize, Serialize, de::DeserializeOwned};
12use std::{any::type_name, cell::RefCell, sync::Arc, vec};
13use tracing::debug;
14
// Task-local slot for the active `Context`. It is filled by `Context::scope`
// (through `sync_scope`) and read by `Context::with` / `Context::current`.
tokio::task_local! {
    static CONTEXT: Context;
}
18
/// Bundle of the runtime, executor, process and current task that scheduler
/// code works against.
///
/// Cloning shares the `Arc` handles, while the values held in the `RefCell`s
/// (current task handle, last action, variable map) are cloned into new cells.
#[derive(Clone)]
pub struct Context {
    // pub scher: Arc<Scheduler>,
    // pub env: Arc<Enviroment>,
    /// Runtime taken from the task the context was created for.
    pub runtime: Arc<Runtime>,
    /// Executor created from that runtime in `Context::new`.
    pub executor: Arc<Executor>,
    /// Process the context belongs to.
    pub proc: Arc<Process>,
    /// Task the context currently points at; swapped by `set_task`.
    task: RefCell<Arc<Task>>,
    /// Last action stored by `set_action`, if any.
    action: RefCell<Option<Action>>,
    /// Context-local variables, written by `set_action` and `set_var`.
    vars: RefCell<Vars>,
}
30
31impl std::fmt::Debug for Context {
32    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
33        f.debug_struct("Context")
34            .field("pid", &self.proc.id())
35            .field("tid", &self.task().id)
36            .field("action", &self.action())
37            .finish()
38    }
39}
40
41impl Context {
42    fn init_vars(&self, task: &Arc<Task>) {
43        let inputs = task.inputs();
44        debug!("init_vars: {inputs}");
45
46        // set the inputs to task's data
47        self.task().set_data_with(|data| {
48            for (ref k, v) in &inputs {
49                data.set(k, v.clone());
50            }
51        });
52    }
53
54    pub fn new(proc: &Arc<Process>, task: &Arc<Task>) -> Self {
55        Context {
56            runtime: task.runtime().clone(),
57            executor: Arc::new(Executor::new(task.runtime())),
58            proc: proc.clone(),
59            action: RefCell::new(None),
60            task: RefCell::new(task.clone()),
61            vars: RefCell::new(Vars::new()),
62        }
63    }
64
65    pub fn scope<T, F: Fn() -> T>(ctx: Context, f: F) -> T {
66        if Context::current().is_ok() {
67            f()
68        } else {
69            CONTEXT.sync_scope(ctx, f)
70        }
71    }
72
73    pub fn with<T, F: Fn(&Context) -> T>(f: F) -> T {
74        CONTEXT.with(|ctx| f(ctx))
75    }
76
77    pub fn current() -> Result<Context> {
78        CONTEXT
79            .try_with(Clone::clone)
80            .map_err(|e| ActError::Runtime(e.to_string()))
81    }
82
83    pub fn set_task(&self, task: &Arc<Task>) {
84        if self.task.borrow().id != task.id {
85            *self.task.borrow_mut() = task.clone();
86        }
87    }
88
89    pub fn task(&self) -> Arc<Task> {
90        self.task.borrow().clone()
91    }
92
93    pub fn prepare(&self) {
94        self.init_vars(&self.task());
95    }
96
97    pub fn set_action(&self, action: &Action) -> Result<()> {
98        *self.action.borrow_mut() = Some(action.clone());
99
100        // set the action options to the context
101        let mut vars = self.vars.borrow_mut();
102        for (name, v) in &action.options {
103            vars.entry(name.to_string())
104                .and_modify(|i| *i = v.clone())
105                .or_insert(v.clone());
106        }
107
108        Ok(())
109    }
110
111    pub fn vars(&self) -> Vars {
112        self.vars.borrow().clone()
113    }
114
115    pub fn set_env<T>(&self, name: &str, value: T)
116    where
117        T: Serialize + Clone,
118    {
119        // in context, the global env is not writable
120        // just set the value to local env of the process
121        self.proc.with_env_mut(|data| {
122            data.set(name, value);
123        });
124    }
125
126    pub fn get_env<T>(&self, name: &str) -> Option<T>
127    where
128        T: for<'de> Deserialize<'de> + Clone,
129    {
130        // find the env from proc
131        if let Some(v) = self.proc.with_env(|vars| vars.get(name)) {
132            return Some(v);
133        }
134
135        // get from system env
136        if let Ok(v) = std::env::var(name) {
137            #[allow(clippy::expect_fun_call)]
138            return Some(T::deserialize(serde_json::json!(v)).expect(&format!(
139                "cannot convert env '{name} to {}",
140                type_name::<T>()
141            )));
142        }
143
144        None
145    }
146
147    pub fn set_var<T>(&self, name: &str, value: T)
148    where
149        T: Serialize + Clone,
150    {
151        self.vars.borrow_mut().set(name, value);
152    }
153
154    pub fn get_var<T>(&self, name: &str) -> Option<T>
155    where
156        T: for<'de> Deserialize<'de> + Clone,
157    {
158        self.vars.borrow().get::<T>(name)
159    }
160
161    pub fn eval<T: DeserializeOwned + Serialize>(&self, expr: &str) -> Result<T> {
162        Context::scope(self.clone(), || self.runtime.env().eval::<T>(expr))
163    }
164
165    #[allow(unused)]
166    pub(in crate::scheduler) fn action(&self) -> Option<Action> {
167        self.action.borrow().clone()
168    }
169
170    pub fn sched_task(&self, node: &Arc<Node>) {
171        debug!("sched_task: {}", node.to_string());
172        let task = self.proc.create_task(node, Some(self.task()));
173        self.runtime.push(&task);
174    }
175
176    pub fn dispatch_act(&self, act: &Act, is_hook_event: bool) -> Result<()> {
177        debug!("dispatch_act: {act:?}  {:?}", self.task);
178        let task = self.task();
179        // if task.is_kind(NodeKind::Act) {
180        //     let is_block_backage = task.is_uses(consts::ACT_TYPE_BLOCK);
181
182        //     // not package act or completed package
183        //     if !is_block_backage || task.state().is_completed() {
184        //         // find its parent to append task
185        //         while let Some(parent) = task.parent() {
186        //             if parent.is_kind(NodeKind::Step) || parent.is_uses(consts::ACT_TYPE_BLOCK) {
187        //                 task = parent;
188        //                 break;
189        //             }
190        //             task = parent;
191        //         }
192        //     }
193        // }
194
195        let mut id = act.id.to_string();
196        if id.is_empty() {
197            id = shortid();
198        }
199
200        let node = Arc::new(Node::new(
201            &id,
202            NodeContent::Act(act.clone()),
203            task.node().level + 1,
204        ));
205
206        // let task = self.proc.create_task(&node, Some(task));
207        // if is_hook_event {
208        //     // set the tag on task to avoid the repetitive hook execution
209        //     task.set_data_with(|data| data.set(consts::IS_EVENT_PROCESSED, true));
210        // }
211
212        // self.runtime.push(&task);
213
214        if !task.state().is_none() {
215            let task = self.proc.create_task(&node, Some(task));
216
217            // set the tag on task to avoid the repetitive hook execution
218            if is_hook_event {
219                task.set_data_with(|data| data.set(consts::IS_EVENT_PROCESSED, true));
220            }
221            self.runtime.push(&task);
222        }
223
224        Ok(())
225    }
226
227    pub fn build_acts(&self, acts: &[Act], is_sequence: bool) -> Result<()> {
228        let task = self.task();
229
230        let mut prev = task.node().clone();
231        let parent = task.node().clone();
232        let mut acts = acts.to_owned();
233        for (index, act) in acts.iter_mut().enumerate() {
234            dyn_build_act(
235                act,
236                &parent,
237                &mut prev,
238                parent.level + 1,
239                index,
240                is_sequence,
241            )?;
242        }
243
244        Ok(())
245    }
246
247    pub fn dispatch_acts(&self, acts: Vec<Act>, is_sequence: bool) -> Result<()> {
248        let task = self.task();
249        let mut normal_acts = vec![];
250        acts.iter().for_each(|act| {
251            // if the act is setting the 'on' event, it will be added to the task's hook
252            if let Some(on) = act.on.as_ref() {
253                match on {
254                    crate::ActEvent::Created => {
255                        task.add_hook_stmts(super::TaskLifeCycle::Created, act)
256                    }
257                    crate::ActEvent::Completed => {
258                        task.add_hook_stmts(super::TaskLifeCycle::Completed, act)
259                    }
260                    crate::ActEvent::BeforeUpdate => {
261                        task.add_hook_stmts(super::TaskLifeCycle::BeforeUpdate, act)
262                    }
263                    crate::ActEvent::Updated => {
264                        task.add_hook_stmts(super::TaskLifeCycle::Updated, act)
265                    }
266                    crate::ActEvent::Step => task.add_hook_stmts(super::TaskLifeCycle::Step, act),
267                }
268            } else {
269                normal_acts.push(act.clone());
270            }
271        });
272        self.build_acts(&normal_acts, is_sequence)?;
273        Ok(())
274    }
275
276    /// redo the task and dispatch directly
277    pub fn redo_task(&self, task: &Arc<Task>) -> Result<()> {
278        if let Some(prev) = task.prev()
279            && let Some(prev_task) = self.proc.task(&prev)
280        {
281            let task = self.proc.create_task(task.node(), Some(prev_task));
282            self.runtime.push(&task);
283        }
284
285        Ok(())
286    }
287
288    pub fn back_task(&self, task: &Arc<Task>, paths: &Vec<Arc<Task>>) -> Result<()> {
289        for task in task.siblings().iter() {
290            if task.state().is_completed() {
291                continue;
292            }
293            task.set_state(TaskState::Skipped);
294            self.emit_task(task)?;
295        }
296
297        task.set_state(TaskState::Backed);
298        self.emit_task(task)?;
299
300        // find parent util to the step task and marks it as backed
301        if task.is_kind(NodeKind::Act) {
302            let mut parent = task.parent();
303            while let Some(p) = parent {
304                if p.is_kind(NodeKind::Step) || p.is_kind(NodeKind::Act) {
305                    p.set_state(TaskState::Backed);
306                    self.emit_task(&p)?;
307                    break;
308                }
309                parent = p.parent();
310            }
311        }
312
313        // marks the state in the paths
314        for p in paths {
315            if p.state().is_running() {
316                p.set_state(TaskState::Completed);
317                self.emit_task(p)?;
318            } else if p.state().is_pending() {
319                p.set_state(TaskState::Skipped);
320                self.emit_task(p)?;
321            }
322        }
323
324        Ok(())
325    }
326
327    pub fn abort_task(&self, task: &Arc<Task>) -> Result<()> {
328        // abort all task's acts
329        for task in task.siblings().iter() {
330            if task.state().is_completed() {
331                continue;
332            }
333            task.set_state(TaskState::Skipped);
334            self.emit_task(task)?;
335        }
336
337        task.set_state(TaskState::Aborted);
338        task.set_data(&self.vars());
339        self.emit_task(task)?;
340
341        // abort all running task
342        let ctx = self;
343        let mut parent = task.parent();
344        while let Some(task) = parent {
345            task.set_state(TaskState::Aborted);
346            ctx.set_task(&task);
347            ctx.emit_task(&ctx.task())?;
348
349            for t in task.children() {
350                if t.state().is_pending() {
351                    t.set_state(TaskState::Skipped);
352                    ctx.emit_task(&t)?;
353                } else if t.state().is_running() {
354                    t.set_state(TaskState::Aborted);
355                    ctx.emit_task(&t)?;
356                }
357            }
358
359            parent = task.parent();
360        }
361        Ok(())
362    }
363
364    /// undo task
365    /// the undo task is a step task, set the task as completed and set the children acts as cancelled
366    pub fn undo_task(&self, task: &Arc<Task>) -> Result<()> {
367        if task.state().is_completed() {
368            return Err(ActError::Action(format!(
369                "task('{}') is not allowed to cancel",
370                task.id
371            )));
372        }
373
374        // cancel all of the task's children
375        let mut children = task.children();
376        while !children.is_empty() {
377            let mut nexts = Vec::new();
378            for t in &children {
379                if t.state().is_completed() {
380                    continue;
381                }
382                t.set_state(TaskState::Cancelled);
383                self.emit_task(t)?;
384                nexts.extend_from_slice(&t.children());
385            }
386
387            children = nexts;
388        }
389        task.set_state(TaskState::Completed);
390        self.emit_task(task)?;
391
392        Ok(())
393    }
394
395    pub fn emit_error(&self) -> Result<()> {
396        let task = self.task();
397        if task.state().is_error() {
398            self.emit_task(&task)?;
399
400            // after emitting, re-check the task state
401            if task.state().is_error()
402                && let Some(err) = task.err()
403                && let Some(parent) = task.parent()
404            {
405                parent.set_err(&err);
406                return parent.error(self);
407            }
408        }
409
410        Ok(())
411    }
412
413    pub fn emit_task(&self, task: &Arc<Task>) -> Result<()> {
414        debug!("ctx::emit_task, task={:?}", task);
415
416        // on workflow start
417        if let NodeContent::Workflow(_) = &task.node().content
418            && task.state().is_created()
419        {
420            if self.proc.state().is_none() {
421                self.proc.set_state(TaskState::Running);
422            }
423            self.runtime.scher().emit_proc_event(&self.proc);
424        }
425
426        self.runtime.scher().emit_task_event(task)?;
427
428        // on workflow complete
429        if let NodeContent::Workflow(_) = &task.node().content
430            && task.state().is_completed()
431        {
432            self.proc.set_state(task.state());
433            if let Some(err) = task.err() {
434                self.proc.set_err(&err);
435            }
436            self.runtime.scher().emit_proc_event(&self.proc);
437        }
438
439        Ok(())
440    }
441
442    pub fn emit_message(&self, msg: &Act) -> Result<()> {
443        debug!("emit_message: {:?}", msg);
444        let workflow = self.proc.model();
445        let mut inputs = utils::fill_inputs(&msg.inputs, self);
446        // append act.optins to inputs
447        inputs.set(consts::ACT_OPTIONS_KEY, msg.options.clone());
448
449        // append act.params to inputs
450        let params = utils::fill_params(&msg.params, self);
451        inputs.set(consts::ACT_PARAMS_KEY, params);
452
453        let task = self.task();
454        if let Some(err) = task.err() {
455            inputs.set(consts::ACT_ERR_MESSAGE, err.message);
456            inputs.set(consts::ACT_ERR_CODE, err.ecode);
457        }
458
459        let state: MessageState = MessageState::Completed;
460        let msg = Message {
461            id: utils::longid(),
462            r#type: "act".to_string(),
463            state,
464            pid: task.pid.clone(),
465            tid: task.id.clone(),
466            key: msg.key.clone(),
467            name: task.node().name(),
468            uses: msg.uses.clone(),
469            model: Model {
470                id: workflow.id.clone(),
471                name: workflow.name.to_string(),
472                tag: workflow.tag.to_string(),
473            },
474
475            tag: msg.tag.to_string(),
476            inputs,
477            ..Default::default()
478        };
479
480        self.runtime.emitter().emit_message(&msg);
481        Ok(())
482    }
483}