acts_next/scheduler/process/task/
act.rs

1mod block;
2mod call;
3mod cmd;
4mod irq;
5mod pack;
6
7use super::TaskLifeCycle;
8use crate::{
9    scheduler::Context,
10    utils::{self, consts},
11    Act, ActError, ActFn, ActTask, Result, TaskState, Vars,
12};
13use async_trait::async_trait;
14use std::{cell::RefCell, rc::Rc};
15use tracing::debug;
16
17#[async_trait]
18impl ActTask for Act {
19    fn init(&self, ctx: &Context) -> Result<()> {
20        let task = ctx.task();
21        for s in self.catches.iter() {
22            task.add_hook_catch(TaskLifeCycle::ErrorCatch, s);
23        }
24
25        if !self.timeout.is_empty() {
26            for s in &self.timeout {
27                task.add_hook_timeout(TaskLifeCycle::Timeout, s);
28            }
29        }
30
31        // run setup
32        if !self.setup.is_empty() {
33            for act in &self.setup {
34                act.exec(ctx)?;
35            }
36        }
37
38        let func: ActFn = self.into();
39        match func {
40            ActFn::Irq(irq) => irq.init(ctx),
41            ActFn::Call(u) => u.init(ctx),
42            ActFn::Block(b) => b.init(ctx),
43            ActFn::Pack(p) => p.init(ctx),
44            _ => Ok(()),
45        }
46    }
47
48    fn run(&self, ctx: &Context) -> Result<()> {
49        let func: ActFn = self.into();
50        match func {
51            ActFn::Irq(req) => req.run(ctx),
52            ActFn::Call(u) => u.run(ctx),
53            ActFn::Block(b) => b.run(ctx),
54            ActFn::Pack(p) => p.run(ctx),
55            _ => Ok(()),
56        }
57    }
58
59    fn next(&self, ctx: &Context) -> Result<bool> {
60        let func: ActFn = self.into();
61        match func {
62            ActFn::Irq(req) => req.next(ctx),
63            ActFn::Call(u) => u.next(ctx),
64            ActFn::Block(b) => b.next(ctx),
65            ActFn::Pack(p) => p.next(ctx),
66            _ => Ok(false),
67        }
68    }
69
70    fn review(&self, ctx: &Context) -> Result<bool> {
71        let func: ActFn = self.into();
72        match func {
73            ActFn::Irq(req) => req.review(ctx),
74            ActFn::Call(u) => u.review(ctx),
75            ActFn::Block(b) => b.review(ctx),
76            ActFn::Pack(p) => p.review(ctx),
77            _ => Ok(true),
78        }
79    }
80}
81
82impl Act {
83    pub fn exec(&self, ctx: &Context) -> Result<()> {
84        let task = ctx.task();
85        debug!("act.exec task={}", task.id);
86        let act_fn = self.into();
87        match act_fn {
88            ActFn::Set(vars) => {
89                let inputs = utils::fill_inputs(&vars, ctx);
90                task.update_data(&inputs);
91            }
92            ActFn::Expose(vars) => {
93                let outputs = utils::fill_outputs(&vars, ctx);
94                // expose the vars to outputs
95                task.set_data_with(move |data| data.set(consts::ACT_OUTPUTS, &outputs));
96            }
97            ActFn::Irq(_) => {
98                let mut req = self.clone();
99                if let Some(v) = ctx.get_var::<u32>(consts::ACT_INDEX) {
100                    req.inputs.set(consts::ACT_INDEX, v);
101                }
102
103                if let Some(v) = ctx.get_var::<String>(consts::ACT_VALUE) {
104                    req.inputs.set(consts::ACT_VALUE, v);
105                }
106                if req.key.is_empty() {
107                    return Err(ActError::Action(format!(
108                        "not found 'key' in act({})",
109                        self.id
110                    )));
111                }
112                ctx.append_act(&req)?;
113            }
114            ActFn::Msg(_) => {
115                let mut msg = self.clone();
116                if let Some(v) = ctx.get_var::<u32>(consts::ACT_INDEX) {
117                    msg.inputs.set(consts::ACT_INDEX, v);
118                }
119
120                if let Some(v) = ctx.get_var::<String>(consts::ACT_VALUE) {
121                    msg.inputs.set(consts::ACT_VALUE, v);
122                }
123                if task.state().is_none() {
124                    task.add_hook_stmts(TaskLifeCycle::Created, &msg);
125                } else {
126                    if msg.key.is_empty() {
127                        return Err(ActError::Action(format!(
128                            "not found 'key' in act({})",
129                            self.id
130                        )));
131                    }
132                    ctx.emit_message(&msg)?;
133                }
134            }
135            ActFn::Cmd(cmd) => {
136                if task.state().is_none() {
137                    task.add_hook_stmts(TaskLifeCycle::Created, &cmd.into());
138                } else if let Err(err) = cmd.run(ctx) {
139                    task.set_state(TaskState::Error);
140                    return Err(err);
141                }
142            }
143            ActFn::Block(_) => {
144                ctx.append_act(self)?;
145            }
146            ActFn::Pack(_) => {
147                ctx.append_act(self)?;
148            }
149            ActFn::If(cond) => {
150                let result = ctx.eval(&cond.on)?;
151                if result {
152                    for s in &cond.then {
153                        s.exec(ctx)?;
154                    }
155                } else {
156                    for s in &cond.r#else {
157                        s.exec(ctx)?;
158                    }
159                }
160            }
161            ActFn::Each(each) => {
162                let cans = each.parse(ctx, &each.r#in)?;
163                for (index, value) in cans.iter().enumerate() {
164                    ctx.set_var(consts::ACT_INDEX, index);
165                    ctx.set_var(consts::ACT_VALUE, value);
166                    for s in &each.then {
167                        s.exec(ctx)?;
168                    }
169                }
170            }
171            ActFn::Chain(chain) => {
172                let cans = chain.parse(ctx, &chain.r#in)?;
173                let stmts = &chain.then;
174                let mut items = cans.iter().enumerate();
175                if let Some((index, value)) = items.next() {
176                    let head = Rc::new(RefCell::new(Act::default()));
177
178                    head.borrow_mut().id = utils::shortid();
179                    head.borrow_mut().act = "block".to_string();
180                    head.borrow_mut().then = stmts.clone();
181                    head.borrow_mut().inputs = Vars::new()
182                        .with(consts::ACT_INDEX, index)
183                        .with(consts::ACT_VALUE, value);
184
185                    let mut pre = head.clone();
186                    for (index, value) in items {
187                        let p = Rc::new(RefCell::new(Act::default()));
188                        p.borrow_mut().id = utils::shortid();
189                        p.borrow_mut().act = "block".to_string();
190                        p.borrow_mut().then = stmts.clone();
191                        p.borrow_mut().inputs = Vars::new()
192                            .with(consts::ACT_INDEX, index)
193                            .with(consts::ACT_VALUE, value);
194
195                        pre.borrow_mut().next = Some(Box::new((*p).clone().into_inner()));
196                        pre = p;
197                    }
198
199                    let act = head.take();
200                    act.exec(ctx)?;
201                }
202            }
203            ActFn::Call(_) => {
204                ctx.append_act(self)?;
205            }
206            ActFn::OnCreated(stmts) => {
207                let task = ctx.task();
208                for s in stmts {
209                    task.add_hook_stmts(TaskLifeCycle::Created, &s);
210                }
211            }
212            ActFn::OnCompleted(stmts) => {
213                let task = ctx.task();
214                for s in stmts {
215                    task.add_hook_stmts(TaskLifeCycle::Completed, &s);
216                }
217            }
218            ActFn::OnBeforeUpdate(stmts) => {
219                let task = ctx.task();
220                for s in stmts {
221                    task.add_hook_stmts(TaskLifeCycle::BeforeUpdate, &s);
222                }
223            }
224            ActFn::OnUpdated(stmts) => {
225                let task = ctx.task();
226                for s in stmts {
227                    task.add_hook_stmts(TaskLifeCycle::Updated, &s);
228                }
229            }
230            ActFn::OnStep(stmts) => {
231                let task = ctx.task();
232                for s in stmts {
233                    task.add_hook_stmts(TaskLifeCycle::Step, &s);
234                }
235            }
236            ActFn::OnErrorCatch(stmts) => {
237                let task = ctx.task();
238                for s in stmts {
239                    task.add_hook_catch(TaskLifeCycle::ErrorCatch, &s);
240                }
241            }
242            ActFn::OnTimeout(stmts) => {
243                let task = ctx.task();
244                for s in stmts {
245                    task.add_hook_timeout(TaskLifeCycle::Timeout, &s);
246                }
247            }
248            ActFn::None => {
249                // ignore
250                return Err(ActError::Action(format!(
251                    "cannot recognize the act({}) as a valid act function",
252                    self.id
253                )));
254            }
255        }
256        Ok(())
257    }
258}