forester_rs/runtime/forester/
flow.rs

1use crate::runtime::args::{RtArgs, RtValue};
2use crate::runtime::context::{RNodeState, TreeContext};
3use crate::runtime::rtree::rnode::FlowType;
4use crate::runtime::{RtResult, RuntimeError, TickResult};
5use std::cmp::max;
6use FlowDecision::{Halt, PopNode, Stay};
7
8type HaltingChildCursor = usize;
9
10// current child
11pub const CURSOR: &str = "cursor";
12// the child len
13pub const LEN: &str = "len";
14
15// the current cursor
16// when the process is torn up(the child returns running or in seq is failure etc)
17pub const P_CURSOR: &str = "prev_cursor";
18// reason for the failure
19pub const REASON: &str = "reason";
20// Record of the currently running child, in case a reactive flow node needs to halt it.
21// This typically stores the same information as P_CURSOR, but doesn't affect read_cursor() results.
22pub const RUNNING_CHILD: &str = "running_child";
23
24// the list of children and states, where
25// 0 is ready,
26// 1 is running,
27// 2 is failure,
28// 3 is success
29pub const CHILDREN: &str = "children";
30
31pub fn run_with(tick_args: RtArgs, cursor: i64, len: i64) -> RtArgs {
32    debug!(target:"params", "{}, cur:{cursor}, len:{len}", tick_args);
33    tick_args
34        .with(CURSOR, RtValue::int(cursor))
35        .with(LEN, RtValue::int(len))
36}
37
38// parallel node needs to know the previous state of the children.
39// It acts non reactively
40// therefore if there is a previous state it tries to find a child that either running or ready
41pub fn run_with_par(tick_args: RtArgs, len: i64) -> RtArgs {
42    let prev_states = read_children_state(tick_args.clone());
43    if prev_states.is_empty() {
44        // the first time we create the children array
45        run_with(
46            tick_args.with(
47                CHILDREN,
48                RtValue::Array(vec![RtValue::int(0); len as usize]),
49            ),
50            0,
51            len,
52        )
53    } else {
54        run_with(tick_args.clone(), read_cursor(tick_args).unwrap_or(0), len)
55    }
56}
57
58pub(crate) fn read_len_or_zero(args: RtArgs) -> i64 {
59    args.find(LEN.to_string())
60        .and_then(|v| v.as_int())
61        .unwrap_or(0)
62}
63
64// read and compare the cursor and prev_cursor
65// if both are present, return the max
66// if only one is present, return it
67// if none is present, return 0
68pub(crate) fn read_cursor(tick_args: RtArgs) -> RtResult<i64> {
69    let p_cursor = tick_args.find(CURSOR.to_string()).and_then(RtValue::as_int);
70    let cursor = tick_args
71        .find(P_CURSOR.to_string())
72        .and_then(RtValue::as_int);
73
74    match (cursor, p_cursor) {
75        (Some(lhs), Some(rhs)) => Ok(max(lhs, rhs)),
76        (None, Some(v)) | (Some(v), None) => Ok(v),
77        _ => Ok(0),
78    }
79}
80
81/// Shortest version of TickResult, containing only finished statuses.
82pub enum TickResultFin {
83    Failure(String),
84    Success,
85}
86
87impl TryFrom<RNodeState> for TickResultFin {
88    type Error = RuntimeError;
89
90    fn try_from(value: RNodeState) -> Result<Self, Self::Error> {
91        match value {
92            RNodeState::Success(_) => Ok(TickResultFin::Success),
93            RNodeState::Failure(v) => {
94                let r = v
95                    .find(REASON.to_string())
96                    .and_then(RtValue::as_string)
97                    .unwrap_or_default();
98                Ok(TickResultFin::Failure(r))
99            }
100            _ => Err(RuntimeError::uex("running is unexpected".to_string())),
101        }
102    }
103}
104
105impl Into<TickResult> for TickResultFin {
106    fn into(self) -> TickResult {
107        match self {
108            TickResultFin::Failure(v) => TickResult::Failure(v),
109            TickResultFin::Success => TickResult::Success,
110        }
111    }
112}
113
114// It starts when the child is finished and the flow needs to go farther.
115pub fn finalize(
116    tpe: &FlowType,
117    _args: RtArgs,
118    tick_args: RtArgs,
119    res: TickResultFin,
120    _ctx: &mut TreeContext,
121) -> RtResult<FlowDecision> {
122    match tpe {
123        FlowType::Root => Ok(Stay(RNodeState::from(
124            run_with(tick_args, 0, 1),
125            res.into(),
126        ))),
127        FlowType::Sequence | FlowType::RSequence => {
128            let cursor = read_cursor(tick_args.clone())?;
129            let len = read_len_or_zero(tick_args.clone());
130            let running_child = tick_args
131                .find(RUNNING_CHILD.to_string())
132                .and_then(RtValue::as_int);
133
134            // There's only one scenario where we don't remove RUNNING_CHILD, we'll re-add it if that's the case.
135            let mut args = tick_args.remove(RUNNING_CHILD).remove(P_CURSOR);
136
137            match res {
138                TickResultFin::Failure(v) => {
139                    let args = args.with(REASON, RtValue::str(v));
140
141                    // Failure will interrupt a reactive sequence, check if we need to halt a running child
142                    if let Some(running) = running_child {
143                        if running > cursor {
144                            // This failure result needs to interrupt the running child.
145                            // Note non-reactive sequences will always have running == p_cursor == cursor, so this will be unreachable for them.
146                            return Ok(Halt(
147                                RNodeState::Failure(run_with(args, cursor, len)),
148                                running as usize,
149                            ));
150                        }
151                    }
152
153                    Ok(Stay(RNodeState::Failure(run_with(args, cursor, len))))
154                }
155                TickResultFin::Success => {
156                    if cursor == len - 1 {
157                        Ok(Stay(RNodeState::Success(run_with(args, cursor, len))))
158                    } else {
159                        if let Some(running) = running_child {
160                            if running > cursor {
161                                // We haven't reached the previously running child yet, re-add it.
162                                args = args.with(RUNNING_CHILD, RtValue::int(running))
163                            }
164                        }
165                        Ok(Stay(RNodeState::Running(run_with(args, cursor + 1, len))))
166                    }
167                }
168            }
169        }
170        FlowType::MSequence => {
171            let cursor = read_cursor(tick_args.clone())?;
172            let len = read_len_or_zero(tick_args.clone());
173            let running_child = tick_args
174                .find(RUNNING_CHILD.to_string())
175                .and_then(RtValue::as_int);
176
177            // There's only one scenario where we don't remove RUNNING_CHILD, we'll re-add it if that's the case.
178            let mut args = tick_args.remove(RUNNING_CHILD);
179
180            match res {
181                TickResultFin::Failure(v) => {
182                    let args = run_with(args.with(P_CURSOR, RtValue::int(cursor)), cursor, len)
183                        .with(REASON, RtValue::str(v));
184
185                    Ok(Stay(RNodeState::Failure(args)))
186                }
187                TickResultFin::Success => {
188                    if cursor == len - 1 {
189                        // Remove P_CURSOR so that the next tick will start from the beginning
190                        let args = args.remove(P_CURSOR);
191                        Ok(Stay(RNodeState::Success(run_with(args, cursor, len))))
192                    } else {
193                        if let Some(running) = running_child {
194                            if running > cursor {
195                                // We haven't reached the previously running child yet, re-add it.
196                                args = args.with(RUNNING_CHILD, RtValue::int(running))
197                            }
198                        }
199                        Ok(Stay(RNodeState::Running(run_with(args, cursor + 1, len))))
200                    }
201                }
202            }
203        }
204
205        FlowType::Fallback | FlowType::RFallback => {
206            let cursor = read_cursor(tick_args.clone())?;
207            let len = read_len_or_zero(tick_args.clone());
208            let running_child = tick_args
209                .find(RUNNING_CHILD.to_string())
210                .and_then(RtValue::as_int);
211
212            // There's only one scenario where we don't remove RUNNING_CHILD, we'll re-add it if that's the case.
213            let mut args = tick_args.remove(RUNNING_CHILD).remove(P_CURSOR);
214
215            match res {
216                TickResultFin::Failure(v) => {
217                    if cursor == len - 1 {
218                        let args = args.with(REASON, RtValue::str(v));
219                        Ok(Stay(RNodeState::Failure(run_with(args, cursor, len))))
220                    } else {
221                        if let Some(running) = running_child {
222                            if running > cursor {
223                                // We haven't reached the previously running child yet, re-add it.
224                                args = args.with(RUNNING_CHILD, RtValue::int(running))
225                            }
226                        }
227                        Ok(Stay(RNodeState::Running(run_with(args, cursor + 1, len))))
228                    }
229                }
230                TickResultFin::Success => {
231                    // Success will interrupt a reactive fallback, check if we need to halt a running child
232                    if let Some(running) = running_child {
233                        if running > cursor {
234                            // This success result needs to interrupt the running child.
235                            // Note non-reactive fallbacks will always have running == p_cursor == cursor, so this will be unreachable for them.
236                            return Ok(Halt(
237                                RNodeState::Success(run_with(args, cursor, len)),
238                                running as usize,
239                            ));
240                        }
241                    }
242
243                    // This success result is just like any other fallback success
244                    Ok(Stay(RNodeState::Success(run_with(args, cursor, len))))
245                }
246            }
247        }
248        FlowType::Parallel => {
249            let cursor = read_cursor(tick_args.clone())?;
250            let len = read_len_or_zero(tick_args.clone());
251            let st = match res {
252                TickResultFin::Failure(_) => 2,
253                TickResultFin::Success => 3,
254            };
255            let tick_args = replace_child_state(tick_args, cursor as usize, st);
256            let children = read_children_state(tick_args.clone());
257            // if some child is running or ready, we continue
258            if let Some(idx) = find_next_idx(&children, cursor) {
259                Ok(Stay(RNodeState::Running(
260                    tick_args.with(CURSOR, RtValue::int(idx as i64)),
261                )))
262            } else {
263                if children.contains(&1) || children.contains(&0) {
264                    let next_cursor = find_first_idx(&children, cursor).unwrap_or(0);
265                    let next_state = RNodeState::Running(
266                        run_with(tick_args, next_cursor as i64, len)
267                            // reset the prev cursor otherwise it weill be greater then the current cursor and the prev one will be taken
268                            .with(P_CURSOR, RtValue::int(0i64)),
269                    );
270                    // we know there are some nodes needs to be run but they are behind so we can touch them in the next tick only.
271                    // And we pop up the node to allow the next tick to run the children
272                    // if we stay the running nodes will be touched in the same tick
273                    Ok(PopNode(next_state))
274                } else if children.contains(&2) {
275                    let args = run_with(tick_args, cursor, len)
276                        .with(REASON, RtValue::str("parallel failure".to_string()))
277                        .remove(CHILDREN);
278                    // we stay allowing to remove us on the next iteration of the loop
279                    Ok(Stay(RNodeState::Failure(args)))
280                } else {
281                    // we stay allowing to remove us on the next iteration of the loop
282                    Ok(Stay(RNodeState::Success(
283                        run_with(tick_args, cursor, len).remove(CHILDREN),
284                    )))
285                }
286            }
287        }
288    }
289}
290
291// it starts when the child returns running.
292// This stage handles some peculiarities with the tearing state up and etc
293pub fn monitor(
294    tpe: &FlowType,
295    _args: RtArgs,
296    tick_args: RtArgs,
297    _ctx: &mut TreeContext,
298) -> RtResult<FlowDecision> {
299    match tpe {
300        FlowType::RSequence | FlowType::RFallback => {
301            // RSequence and RFallback don't use P_CURSOR
302            // let's get the cursor manually so P_CURSOR doesn't accidentially poison our result.
303            let cursor = tick_args
304                .find(CURSOR.to_string())
305                .and_then(RtValue::as_int)
306                .unwrap_or(0);
307            let previous_running_child = tick_args
308                .find(RUNNING_CHILD.to_string())
309                .and_then(RtValue::as_int);
310
311            let new_state =
312                RNodeState::Running(tick_args.with(RUNNING_CHILD, RtValue::int(cursor)));
313
314            // Check there isn't another child already being tracked by the RUNNING_CHILD key.
315            // If there is we'll need to halt that before going any further.
316            if let Some(prev_running_child_cursor) = previous_running_child {
317                if prev_running_child_cursor > cursor {
318                    return Ok(Halt(new_state, prev_running_child_cursor as usize));
319                }
320            }
321
322            // There was no other previously running child, continue as normal
323            Ok(PopNode(new_state))
324        }
325        FlowType::Sequence | FlowType::MSequence | FlowType::Fallback => {
326            let cursor = read_cursor(tick_args.clone())?;
327            Ok(PopNode(RNodeState::Running(
328                tick_args
329                    .with(RUNNING_CHILD, RtValue::int(cursor))
330                    .with(P_CURSOR, RtValue::int(cursor)),
331            )))
332        }
333        FlowType::Parallel => {
334            let cursor = read_cursor(tick_args.clone())?;
335            let new_args = replace_child_state(
336                tick_args.with(P_CURSOR, RtValue::int(cursor)),
337                cursor as usize,
338                1,
339            );
340            let children = read_children_state(new_args.clone());
341            if let Some(idx) = find_next_idx(&children, cursor) {
342                Ok(Stay(RNodeState::Running(
343                    new_args.with(CURSOR, RtValue::int(idx as i64)),
344                )))
345            } else {
346                Ok(PopNode(RNodeState::Running(new_args)))
347            }
348        }
349        _ => Ok(PopNode(RNodeState::Running(tick_args))),
350    }
351}
352
353// Handle ticking a flow node with the state "Halting".
354// Returns a tuple of the new state and the cursor position of the child to be halted, if one exists.
355pub fn halt(flow_type: &FlowType, tick_args: RtArgs) -> (RNodeState, Option<usize>) {
356    match flow_type {
357        FlowType::Sequence
358        | FlowType::MSequence
359        | FlowType::RSequence
360        | FlowType::Fallback
361        | FlowType::RFallback => {
362            // Sequence/fallback nodes check if they need to halt a child at the RUNNING_CHILD cursor position.
363            let running_child_cursor = tick_args
364                .find(RUNNING_CHILD.to_string())
365                .and_then(RtValue::as_int)
366                .map(|v| v as usize);
367            let mut args = tick_args.remove(RUNNING_CHILD);
368
369            // MSequence needs to keep its position, but other nodes don't.
370            if flow_type != &FlowType::MSequence {
371                args = args.remove(P_CURSOR);
372            }
373
374            let new_state = RNodeState::Ready(args);
375            (new_state, running_child_cursor)
376        }
377        _ => (RNodeState::Ready(tick_args), None),
378    }
379}
380
381// decision impacts on the case when we decide if we stay on the node
382// and go farther down or we climb up the tree
383// basically it processes the case when we have a running child before and after cursor.
384// in the latter we stay and in the former we climb up and eventually end up ticking the root
385#[derive(Debug, Clone)]
386pub enum FlowDecision {
387    PopNode(RNodeState),
388    Stay(RNodeState),
389    Halt(RNodeState, HaltingChildCursor),
390}
391
392fn replace_child_state(args: RtArgs, idx: usize, v: i64) -> RtArgs {
393    let args = args;
394    let mut elems = read_children_state(args.clone());
395    debug!(target:"params in child", "prev : [{args}], idx:{idx}, new state: {v}");
396    elems[idx] = v;
397    args.with(
398        CHILDREN,
399        RtValue::Array(elems.into_iter().map(RtValue::int).collect()),
400    )
401}
402
403fn read_children_state(args: RtArgs) -> Vec<i64> {
404    args.find(CHILDREN.to_string())
405        .and_then(|v| v.as_vec(|v| v.as_int().unwrap()))
406        .unwrap_or_default()
407}
408
409// find the next idx that is either running or ready
410fn find_next_idx(children: &Vec<i64>, current: i64) -> Option<usize> {
411    find_pos(children, current + 1, children.len() as i64)
412}
413
414// find the next idx that is either running or ready before the current idx
415fn find_first_idx(children: &Vec<i64>, current: i64) -> Option<usize> {
416    find_pos(children, 0, current)
417}
418
419fn find_pos(children: &Vec<i64>, low: i64, high: i64) -> Option<usize> {
420    let mut cursor = low as usize;
421    let mut next_idx = None;
422    while cursor < high as usize {
423        if children[cursor] == 0 || children[cursor] == 1 {
424            next_idx = Some(cursor);
425            break;
426        }
427        cursor = cursor + 1;
428    }
429    next_idx
430}