acts_next/scheduler/process/task/
act.rs1use super::TaskLifeCycle;
2use crate::{
3 Act, ActError, ActRunAs, ActTask, Result, TaskState, scheduler::Context, utils::consts,
4};
5
6impl ActTask for Act {
7 fn init(&self, ctx: &Context) -> Result<()> {
8 let task = ctx.task();
9 if let Some(expr) = &self.r#if {
10 let cond = ctx.eval::<bool>(expr)?;
11 if !cond {
12 task.set_state(TaskState::Skipped);
13 return Ok(());
14 }
15 }
16
17 for s in self.catches.iter() {
18 task.add_hook_catch(TaskLifeCycle::ErrorCatch, s);
19 }
20
21 if !self.timeout.is_empty() {
22 for s in &self.timeout {
23 task.add_hook_timeout(TaskLifeCycle::Timeout, s);
24 }
25 }
26
27 if !self.setup.is_empty() {
29 ctx.dispatch_acts(self.setup.clone(), true)?;
30 }
31
32 if self.uses.is_empty() {
33 return Err(crate::ActError::Action(format!(
34 "cannot find 'uses' in act '{}' with key '{}'",
35 task.node.id,
36 task.node.content.key()
37 )));
38 }
39
40 let package = ctx.executor.pack().get(&self.uses)?;
42 let schema: serde_json::Value = serde_json::from_str(&package.schema)?;
43 match package.run_as {
44 ActRunAs::Irq => {
45 jsonschema::validate(&schema, &task.params())?;
46 task.set_state(TaskState::Interrupt);
47 }
48 ActRunAs::Msg => {
49 jsonschema::validate(&schema, &task.params())?;
50 task.set_emit_disabled(true);
51 task.set_state(TaskState::Ready);
52 }
53 ActRunAs::Func => {
54 task.set_emit_disabled(true);
55 task.set_state(TaskState::Ready);
56 }
57 }
58
59 Ok(())
60 }
61
62 fn run(&self, ctx: &Context) -> Result<()> {
63 let task = ctx.task();
64
65 let package = ctx.executor.pack().get(&self.uses)?;
67 if matches!(package.run_as, ActRunAs::Msg) {
68 task.set_emit_disabled(false);
70 }
71
72 if matches!(package.run_as, ActRunAs::Func) {
73 let register = ctx
74 .runtime
75 .package()
76 .get(&self.uses)
77 .ok_or(ActError::Runtime(format!(
78 "cannot find the registed package '{}'",
79 self.uses
80 )))?;
81 let package = (register.create)(ctx.task().params())?;
82 if let Some(vars) = package.execute(ctx)? {
83 task.update_data(&vars);
84 }
85 }
86
87 let children = task.node.children();
88 if !children.is_empty() {
89 for child in &children {
90 ctx.sched_task(child);
91 }
92 }
93
94 Ok(())
95 }
96
97 fn next(&self, ctx: &Context) -> Result<bool> {
98 let task = ctx.task();
99 let state = task.state();
100 let mut is_next: bool = false;
101 if state.is_running() {
102 let tasks = task.children();
103 let mut count = 0;
104
105 for task in tasks.iter() {
106 if task.state().is_none() || task.state().is_running() {
107 is_next = true;
108 } else if task.state().is_pending() && task.is_ready() {
109 task.set_state(TaskState::Running);
111 ctx.runtime.scher().emit_task_event(task)?;
112
113 task.exec(ctx)?;
114 is_next = true;
115 }
116 if task.state().is_completed() {
117 count += 1;
118 }
119 }
120
121 if count == tasks.len() {
122 if task.is_auto_complete() && !task.state().is_completed() {
123 task.set_state(TaskState::Completed);
124 }
125
126 if let Some(next) = &task.node.next().upgrade() {
127 ctx.sched_task(next);
128 return Ok(true);
129 }
130 }
131 } else if (state.is_skip() || state.is_success())
132 && let Some(next) = &task.node.next().upgrade()
133 {
134 ctx.sched_task(next);
135 return Ok(true);
136 }
137 Ok(is_next)
138 }
139
140 fn review(&self, ctx: &Context) -> Result<bool> {
141 let task = ctx.task();
142 let state = task.state();
143 if state.is_running() {
144 let tasks = task.children();
145 let mut count = 0;
146 for t in tasks.iter() {
147 if t.state().is_error() {
148 ctx.emit_error()?;
149 return Ok(false);
150 }
151 if t.state().is_skip() {
152 task.set_state(TaskState::Skipped);
153 return Ok(true);
154 }
155
156 if t.state().is_success() {
157 count += 1;
158 }
159 }
160
161 if count == tasks.len() {
162 if !task.state().is_completed() {
163 task.set_state(TaskState::Completed);
164 }
165
166 if let Some(next) = &task.node.next().upgrade() {
167 ctx.sched_task(next);
168 return Ok(false);
169 }
170 return Ok(true);
171 }
172 }
173
174 Ok(false)
175 }
176}
177
178impl Act {
179 pub fn dispatch(&self, ctx: &Context, is_hook_event: bool) -> Result<()> {
180 let mut act = self.clone();
182 if let Some(v) = ctx.get_var::<u32>(consts::ACT_INDEX) {
183 act.inputs.set(consts::ACT_INDEX, v);
184 }
185
186 if let Some(v) = ctx.get_var::<String>(consts::ACT_VALUE) {
187 act.inputs.set(consts::ACT_VALUE, v);
188 }
189
190 ctx.dispatch_act(self, is_hook_event)?;
191 Ok(())
192 }
193}