acts_next/scheduler/process/task/
act.rs1mod block;
2mod call;
3mod cmd;
4mod irq;
5mod pack;
6
7use super::TaskLifeCycle;
8use crate::{
9 scheduler::Context,
10 utils::{self, consts},
11 Act, ActError, ActFn, ActTask, Result, TaskState, Vars,
12};
13use async_trait::async_trait;
14use std::{cell::RefCell, rc::Rc};
15use tracing::debug;
16
17#[async_trait]
18impl ActTask for Act {
19 fn init(&self, ctx: &Context) -> Result<()> {
20 let task = ctx.task();
21 for s in self.catches.iter() {
22 task.add_hook_catch(TaskLifeCycle::ErrorCatch, s);
23 }
24
25 if !self.timeout.is_empty() {
26 for s in &self.timeout {
27 task.add_hook_timeout(TaskLifeCycle::Timeout, s);
28 }
29 }
30
31 if !self.setup.is_empty() {
33 for act in &self.setup {
34 act.exec(ctx)?;
35 }
36 }
37
38 let func: ActFn = self.into();
39 match func {
40 ActFn::Irq(irq) => irq.init(ctx),
41 ActFn::Call(u) => u.init(ctx),
42 ActFn::Block(b) => b.init(ctx),
43 ActFn::Pack(p) => p.init(ctx),
44 _ => Ok(()),
45 }
46 }
47
48 fn run(&self, ctx: &Context) -> Result<()> {
49 let func: ActFn = self.into();
50 match func {
51 ActFn::Irq(req) => req.run(ctx),
52 ActFn::Call(u) => u.run(ctx),
53 ActFn::Block(b) => b.run(ctx),
54 ActFn::Pack(p) => p.run(ctx),
55 _ => Ok(()),
56 }
57 }
58
59 fn next(&self, ctx: &Context) -> Result<bool> {
60 let func: ActFn = self.into();
61 match func {
62 ActFn::Irq(req) => req.next(ctx),
63 ActFn::Call(u) => u.next(ctx),
64 ActFn::Block(b) => b.next(ctx),
65 ActFn::Pack(p) => p.next(ctx),
66 _ => Ok(false),
67 }
68 }
69
70 fn review(&self, ctx: &Context) -> Result<bool> {
71 let func: ActFn = self.into();
72 match func {
73 ActFn::Irq(req) => req.review(ctx),
74 ActFn::Call(u) => u.review(ctx),
75 ActFn::Block(b) => b.review(ctx),
76 ActFn::Pack(p) => p.review(ctx),
77 _ => Ok(true),
78 }
79 }
80}
81
82impl Act {
83 pub fn exec(&self, ctx: &Context) -> Result<()> {
84 let task = ctx.task();
85 debug!("act.exec task={}", task.id);
86 let act_fn = self.into();
87 match act_fn {
88 ActFn::Set(vars) => {
89 let inputs = utils::fill_inputs(&vars, ctx);
90 task.update_data(&inputs);
91 }
92 ActFn::Expose(vars) => {
93 let outputs = utils::fill_outputs(&vars, ctx);
94 task.set_data_with(move |data| data.set(consts::ACT_OUTPUTS, &outputs));
96 }
97 ActFn::Irq(_) => {
98 let mut req = self.clone();
99 if let Some(v) = ctx.get_var::<u32>(consts::ACT_INDEX) {
100 req.inputs.set(consts::ACT_INDEX, v);
101 }
102
103 if let Some(v) = ctx.get_var::<String>(consts::ACT_VALUE) {
104 req.inputs.set(consts::ACT_VALUE, v);
105 }
106 if req.key.is_empty() {
107 return Err(ActError::Action(format!(
108 "not found 'key' in act({})",
109 self.id
110 )));
111 }
112 ctx.append_act(&req)?;
113 }
114 ActFn::Msg(_) => {
115 let mut msg = self.clone();
116 if let Some(v) = ctx.get_var::<u32>(consts::ACT_INDEX) {
117 msg.inputs.set(consts::ACT_INDEX, v);
118 }
119
120 if let Some(v) = ctx.get_var::<String>(consts::ACT_VALUE) {
121 msg.inputs.set(consts::ACT_VALUE, v);
122 }
123 if task.state().is_none() {
124 task.add_hook_stmts(TaskLifeCycle::Created, &msg);
125 } else {
126 if msg.key.is_empty() {
127 return Err(ActError::Action(format!(
128 "not found 'key' in act({})",
129 self.id
130 )));
131 }
132 ctx.emit_message(&msg)?;
133 }
134 }
135 ActFn::Cmd(cmd) => {
136 if task.state().is_none() {
137 task.add_hook_stmts(TaskLifeCycle::Created, &cmd.into());
138 } else if let Err(err) = cmd.run(ctx) {
139 task.set_state(TaskState::Error);
140 return Err(err);
141 }
142 }
143 ActFn::Block(_) => {
144 ctx.append_act(self)?;
145 }
146 ActFn::Pack(_) => {
147 ctx.append_act(self)?;
148 }
149 ActFn::If(cond) => {
150 let result = ctx.eval(&cond.on)?;
151 if result {
152 for s in &cond.then {
153 s.exec(ctx)?;
154 }
155 } else {
156 for s in &cond.r#else {
157 s.exec(ctx)?;
158 }
159 }
160 }
161 ActFn::Each(each) => {
162 let cans = each.parse(ctx, &each.r#in)?;
163 for (index, value) in cans.iter().enumerate() {
164 ctx.set_var(consts::ACT_INDEX, index);
165 ctx.set_var(consts::ACT_VALUE, value);
166 for s in &each.then {
167 s.exec(ctx)?;
168 }
169 }
170 }
171 ActFn::Chain(chain) => {
172 let cans = chain.parse(ctx, &chain.r#in)?;
173 let stmts = &chain.then;
174 let mut items = cans.iter().enumerate();
175 if let Some((index, value)) = items.next() {
176 let head = Rc::new(RefCell::new(Act::default()));
177
178 head.borrow_mut().id = utils::shortid();
179 head.borrow_mut().act = "block".to_string();
180 head.borrow_mut().then = stmts.clone();
181 head.borrow_mut().inputs = Vars::new()
182 .with(consts::ACT_INDEX, index)
183 .with(consts::ACT_VALUE, value);
184
185 let mut pre = head.clone();
186 for (index, value) in items {
187 let p = Rc::new(RefCell::new(Act::default()));
188 p.borrow_mut().id = utils::shortid();
189 p.borrow_mut().act = "block".to_string();
190 p.borrow_mut().then = stmts.clone();
191 p.borrow_mut().inputs = Vars::new()
192 .with(consts::ACT_INDEX, index)
193 .with(consts::ACT_VALUE, value);
194
195 pre.borrow_mut().next = Some(Box::new((*p).clone().into_inner()));
196 pre = p;
197 }
198
199 let act = head.take();
200 act.exec(ctx)?;
201 }
202 }
203 ActFn::Call(_) => {
204 ctx.append_act(self)?;
205 }
206 ActFn::OnCreated(stmts) => {
207 let task = ctx.task();
208 for s in stmts {
209 task.add_hook_stmts(TaskLifeCycle::Created, &s);
210 }
211 }
212 ActFn::OnCompleted(stmts) => {
213 let task = ctx.task();
214 for s in stmts {
215 task.add_hook_stmts(TaskLifeCycle::Completed, &s);
216 }
217 }
218 ActFn::OnBeforeUpdate(stmts) => {
219 let task = ctx.task();
220 for s in stmts {
221 task.add_hook_stmts(TaskLifeCycle::BeforeUpdate, &s);
222 }
223 }
224 ActFn::OnUpdated(stmts) => {
225 let task = ctx.task();
226 for s in stmts {
227 task.add_hook_stmts(TaskLifeCycle::Updated, &s);
228 }
229 }
230 ActFn::OnStep(stmts) => {
231 let task = ctx.task();
232 for s in stmts {
233 task.add_hook_stmts(TaskLifeCycle::Step, &s);
234 }
235 }
236 ActFn::OnErrorCatch(stmts) => {
237 let task = ctx.task();
238 for s in stmts {
239 task.add_hook_catch(TaskLifeCycle::ErrorCatch, &s);
240 }
241 }
242 ActFn::OnTimeout(stmts) => {
243 let task = ctx.task();
244 for s in stmts {
245 task.add_hook_timeout(TaskLifeCycle::Timeout, &s);
246 }
247 }
248 ActFn::None => {
249 return Err(ActError::Action(format!(
251 "cannot recognize the act({}) as a valid act function",
252 self.id
253 )));
254 }
255 }
256 Ok(())
257 }
258}