Skip to main content

acts_next/scheduler/process/task/
act.rs

1use super::TaskLifeCycle;
2use crate::{
3    Act, ActError, ActRunAs, ActTask, Result, TaskState, scheduler::Context, utils::consts,
4};
5
6impl ActTask for Act {
7    fn init(&self, ctx: &Context) -> Result<()> {
8        let task = ctx.task();
9        if let Some(expr) = &self.r#if {
10            let cond = ctx.eval::<bool>(expr)?;
11            if !cond {
12                task.set_state(TaskState::Skipped);
13                return Ok(());
14            }
15        }
16
17        for s in self.catches.iter() {
18            task.add_hook_catch(TaskLifeCycle::ErrorCatch, s);
19        }
20
21        if !self.timeout.is_empty() {
22            for s in &self.timeout {
23                task.add_hook_timeout(TaskLifeCycle::Timeout, s);
24            }
25        }
26
27        // run setup
28        if !self.setup.is_empty() {
29            ctx.dispatch_acts(self.setup.clone(), true)?;
30        }
31
32        if self.uses.is_empty() {
33            return Err(crate::ActError::Action(format!(
34                "cannot find 'uses' in act '{}' with key '{}'",
35                task.node.id,
36                task.node.content.key()
37            )));
38        }
39
40        // find the package to run
41        let package = ctx.executor.pack().get(&self.uses)?;
42        let schema: serde_json::Value = serde_json::from_str(&package.schema)?;
43        match package.run_as {
44            ActRunAs::Irq => {
45                jsonschema::validate(&schema, &task.params())?;
46                task.set_state(TaskState::Interrupt);
47            }
48            ActRunAs::Msg => {
49                jsonschema::validate(&schema, &task.params())?;
50                task.set_emit_disabled(true);
51                task.set_state(TaskState::Ready);
52            }
53            ActRunAs::Func => {
54                task.set_emit_disabled(true);
55                task.set_state(TaskState::Ready);
56            }
57        }
58
59        Ok(())
60    }
61
62    fn run(&self, ctx: &Context) -> Result<()> {
63        let task = ctx.task();
64
65        // find the package to run
66        let package = ctx.executor.pack().get(&self.uses)?;
67        if matches!(package.run_as, ActRunAs::Msg) {
68            // resume the msg emit state
69            task.set_emit_disabled(false);
70        }
71
72        if matches!(package.run_as, ActRunAs::Func) {
73            let register = ctx
74                .runtime
75                .package()
76                .get(&self.uses)
77                .ok_or(ActError::Runtime(format!(
78                    "cannot find the registed package '{}'",
79                    self.uses
80                )))?;
81            let package = (register.create)(ctx.task().params())?;
82            if let Some(vars) = package.execute(ctx)? {
83                task.update_data(&vars);
84            }
85        }
86
87        let children = task.node.children();
88        if !children.is_empty() {
89            for child in &children {
90                ctx.sched_task(child);
91            }
92        }
93
94        Ok(())
95    }
96
97    fn next(&self, ctx: &Context) -> Result<bool> {
98        let task = ctx.task();
99        let state = task.state();
100        let mut is_next: bool = false;
101        if state.is_running() {
102            let tasks = task.children();
103            let mut count = 0;
104
105            for task in tasks.iter() {
106                if task.state().is_none() || task.state().is_running() {
107                    is_next = true;
108                } else if task.state().is_pending() && task.is_ready() {
109                    // resume task
110                    task.set_state(TaskState::Running);
111                    ctx.runtime.scher().emit_task_event(task)?;
112
113                    task.exec(ctx)?;
114                    is_next = true;
115                }
116                if task.state().is_completed() {
117                    count += 1;
118                }
119            }
120
121            if count == tasks.len() {
122                if task.is_auto_complete() && !task.state().is_completed() {
123                    task.set_state(TaskState::Completed);
124                }
125
126                if let Some(next) = &task.node.next().upgrade() {
127                    ctx.sched_task(next);
128                    return Ok(true);
129                }
130            }
131        } else if (state.is_skip() || state.is_success())
132            && let Some(next) = &task.node.next().upgrade()
133        {
134            ctx.sched_task(next);
135            return Ok(true);
136        }
137        Ok(is_next)
138    }
139
140    fn review(&self, ctx: &Context) -> Result<bool> {
141        let task = ctx.task();
142        let state = task.state();
143        if state.is_running() {
144            let tasks = task.children();
145            let mut count = 0;
146            for t in tasks.iter() {
147                if t.state().is_error() {
148                    ctx.emit_error()?;
149                    return Ok(false);
150                }
151                if t.state().is_skip() {
152                    task.set_state(TaskState::Skipped);
153                    return Ok(true);
154                }
155
156                if t.state().is_success() {
157                    count += 1;
158                }
159            }
160
161            if count == tasks.len() {
162                if !task.state().is_completed() {
163                    task.set_state(TaskState::Completed);
164                }
165
166                if let Some(next) = &task.node.next().upgrade() {
167                    ctx.sched_task(next);
168                    return Ok(false);
169                }
170                return Ok(true);
171            }
172        }
173
174        Ok(false)
175    }
176}
177
178impl Act {
179    pub fn dispatch(&self, ctx: &Context, is_hook_event: bool) -> Result<()> {
180        // let package = ctx.executor.pack().get(&self.uses)?;
181        let mut act = self.clone();
182        if let Some(v) = ctx.get_var::<u32>(consts::ACT_INDEX) {
183            act.inputs.set(consts::ACT_INDEX, v);
184        }
185
186        if let Some(v) = ctx.get_var::<String>(consts::ACT_VALUE) {
187            act.inputs.set(consts::ACT_VALUE, v);
188        }
189
190        ctx.dispatch_act(self, is_hook_event)?;
191        Ok(())
192    }
193}