Skip to main content

ui_automata/
executor.rs

1use std::thread;
2use std::time::{Duration, Instant};
3
4use std::collections::HashMap;
5
6use crate::{
7    Action, AnchorDef, AutomataError, Desktop, OnFailure, Plan, RecoveryHandler, ResumeStrategy,
8    RetryPolicy, ShadowDom, Step, output::Output, step::OnSuccess,
9};
10
11const POLL_INTERVAL: Duration = Duration::from_millis(100);
12
13#[derive(PartialEq, Eq)]
14enum StepOutcome {
15    Continue,
16    ReturnPhase,
17}
18
19/// Workflow-level mutable state: output, locals, and per-run flags.
20/// Separated from `Executor` so it can be created, passed around, and
21/// returned independently (e.g. across subflow invocations).
22#[derive(Debug)]
23pub struct WorkflowState {
24    /// Resolved param values for this workflow invocation. Read-only during execution;
25    /// accessible in `Eval` expressions as `param.key`.
26    pub params: HashMap<String, String>,
27    /// Workflow-local values from `Extract { local: true }`. Not propagated to
28    /// parent workflows. Accessible via `{output.<key>}` within this workflow.
29    pub locals: HashMap<String, String>,
30    /// Accumulated output from `Extract` actions. Read after the workflow completes.
31    /// Propagated to parent workflows via output.merge. Accessible via `{output.<key>}`.
32    pub output: Output,
33    /// When `false`, the post-action DOM snapshot is skipped entirely.
34    /// Set via `defaults.action_snapshot: false` in the workflow YAML.
35    pub action_snapshot: bool,
36}
37
38impl WorkflowState {
39    pub fn new(action_snapshot: bool) -> Self {
40        Self {
41            locals: HashMap::new(),
42            output: Output::new(),
43            params: HashMap::new(),
44            action_snapshot,
45        }
46    }
47}
48
49/// Runs plans against a live UIA desktop.
50///
51/// Owns the `ShadowDom` (element handle cache) and the desktop. Global recovery
52/// handlers fire for every plan; plan-local handlers fire only within their plan.
53pub struct Executor<D: Desktop> {
54    pub dom: ShadowDom<D>,
55    pub desktop: D,
56    pub global_handlers: Vec<RecoveryHandler>,
57}
58
59impl<D: Desktop> Executor<D> {
60    pub fn new(desktop: D) -> Self {
61        Self {
62            dom: ShadowDom::new(),
63            desktop,
64            global_handlers: vec![],
65        }
66    }
67
68    /// Register anchor definitions.
69    pub fn mount(&mut self, anchors: Vec<AnchorDef>) -> Result<(), AutomataError> {
70        self.dom.mount(anchors, &self.desktop)
71    }
72
73    /// Unmount anchors by name.
74    pub fn unmount(&mut self, names: &[&str]) {
75        self.dom.unmount(names, &self.desktop);
76    }
77
78    /// Clean up DOM anchors at `depth`.
79    pub fn cleanup_depth(&mut self, depth: usize) {
80        self.dom.cleanup_depth(depth, &self.desktop);
81    }
82
83    /// Run all steps of a plan in order.
84    ///
85    /// Anchors listed in `plan.unmount` are always removed after the plan
86    /// completes, whether it succeeds or fails (guaranteed cleanup).
87    pub fn run(&mut self, plan: &Plan<'_>, state: &mut WorkflowState) -> Result<(), AutomataError> {
88        self.log_info(&format!("plan: {}", plan.name));
89        let total = plan.steps.len();
90        let mut recovery_count: u32 = 0;
91        let result = (|| {
92            for (i, step) in plan.steps.iter().enumerate() {
93                let outcome = self.run_step(
94                    step,
95                    &plan.recovery_handlers,
96                    plan.max_recoveries,
97                    &mut recovery_count,
98                    i + 1,
99                    total,
100                    plan.default_timeout,
101                    &plan.default_retry,
102                    state,
103                )?;
104                if outcome == StepOutcome::ReturnPhase {
105                    break;
106                }
107            }
108            Ok(())
109        })();
110        if !plan.unmount.is_empty() {
111            let names: Vec<&str> = plan.unmount.iter().map(String::as_str).collect();
112            self.unmount(&names);
113        }
114        result
115    }
116
117    // ── Step execution ────────────────────────────────────────────────────────
118
119    fn run_step(
120        &mut self,
121        step: &Step,
122        local_handlers: &[RecoveryHandler],
123        max_recoveries: u32,
124        recovery_count: &mut u32,
125        step_num: usize,
126        total: usize,
127        default_timeout: Duration,
128        default_retry: &RetryPolicy,
129        state: &mut WorkflowState,
130    ) -> Result<StepOutcome, AutomataError> {
131        let prefix = format!("step {step_num}/{total}");
132        let label = format!("{prefix} '{}'", step.intent);
133        self.log_info(&label);
134
135        let timeout = step.timeout.unwrap_or(default_timeout);
136        let retry = match &step.retry {
137            RetryPolicy::None => default_retry,
138            policy => policy,
139        };
140
141        if let Some(pre) = &step.precondition {
142            let pre_desc = pre.describe();
143            log::debug!("precondition: {pre_desc}");
144            if !self.eval(pre, state)? {
145                log::debug!("{prefix}: precondition not satisfied, skipping");
146                return Ok(StepOutcome::Continue);
147            }
148        }
149
150        let action = step.action.apply_output(&state.locals, &state.output);
151        let expect = step.expect.apply_output(&state.locals, &state.output);
152
153        let cond_desc = expect.describe();
154        let action_desc = action.describe();
155
156        let mut attempts: u32 = 0;
157        let mut last_action_error: Option<String>;
158        loop {
159            last_action_error = None; // reset each attempt; prior attempt's error must not bleed into this one
160            log::debug!("action: {action_desc}");
161            let action_result = self.exec(&action, state);
162            match &action_result {
163                Ok(()) => log::debug!("action → Ok"),
164                Err(e) => {
165                    let msg = e.to_string();
166                    // Downgrade to debug when on_failure=continue — the caller
167                    // expects failure and the step-level outcome handles it.
168                    match &step.on_failure {
169                        OnFailure::Continue => {
170                            log::debug!("{label}: action → Err: {msg}");
171                        }
172                        OnFailure::Abort => {
173                            self.log_warn(&format!("{label}: action → Err: {msg}"));
174                        }
175                    }
176                    last_action_error = Some(msg);
177                }
178            }
179            // Sync once after the action so the trace captures what changed.
180            // Skipped when action_snapshot is false (e.g. complex windows with deep trees).
181            if state.action_snapshot {
182                if let Some(scope) = expect.scope_name() {
183                    self.dom.sync(scope, &self.desktop);
184                }
185            }
186
187            let deadline = Instant::now() + timeout;
188            let mut last_poll: Option<bool> = None;
189            loop {
190                let satisfied = self.eval(&expect, state)?;
191                if last_poll != Some(satisfied) {
192                    log::debug!("poll: {cond_desc} → {satisfied}");
193                    last_poll = Some(satisfied);
194                }
195                if satisfied {
196                    // Action error prevents success — fall through to retry/recovery
197                    // rather than returning immediately, so those mechanisms can fire.
198                    // Exception: on_failure=continue explicitly opts out of this.
199                    if let (Some(_), OnFailure::Abort) = (&last_action_error, &step.on_failure) {
200                        break;
201                    }
202                    self.log_info(&format!("{prefix}: ok"));
203                    return Ok(match step.on_success {
204                        OnSuccess::Continue => StepOutcome::Continue,
205                        OnSuccess::ReturnPhase => {
206                            log::debug!("{prefix}: on_success=return_phase, stopping phase");
207                            StepOutcome::ReturnPhase
208                        }
209                    });
210                }
211                if Instant::now() >= deadline {
212                    break;
213                }
214                thread::sleep(POLL_INTERVAL);
215            }
216
217            let timeout_msg = format!(
218                "{label}: timed out (attempt {}), checking recovery",
219                attempts + 1
220            );
221            self.log_warn(&timeout_msg);
222
223            let all: Vec<(String, crate::Condition, Vec<Action>, ResumeStrategy)> = local_handlers
224                .iter()
225                .chain(self.global_handlers.iter())
226                .map(|h| {
227                    (
228                        h.name.clone(),
229                        h.trigger.clone(),
230                        h.actions.clone(),
231                        h.resume,
232                    )
233                })
234                .collect();
235
236            let mut fired: Option<(String, Vec<Action>, ResumeStrategy)> = None;
237            for (name, trigger, actions, resume) in all {
238                if self.eval(&trigger, state)? {
239                    fired = Some((name, actions, resume));
240                    break;
241                }
242            }
243
244            match fired {
245                Some((name, actions, resume)) if *recovery_count < max_recoveries => {
246                    *recovery_count += 1;
247                    self.log_info(&format!(
248                        "{label}: recovery handler '{name}' fired ({recovery_count}/{max_recoveries})"
249                    ));
250                    for action in &actions {
251                        let rdesc = action.describe();
252                        log::debug!("recovery action: {rdesc}");
253                        if let Err(e) = self.exec(action, state) {
254                            log::debug!("{label}: recovery action → Err: {e}");
255                        } else {
256                            log::debug!("recovery action → Ok");
257                        }
258                    }
259                    match resume {
260                        ResumeStrategy::RetryStep => {
261                            attempts += 1;
262                            continue;
263                        }
264                        ResumeStrategy::SkipStep => {
265                            self.log_info(&format!("{label}: skipped by recovery"));
266                            return Ok(StepOutcome::Continue);
267                        }
268                        ResumeStrategy::Fail => {
269                            let msg = format!("{label}: recovery handler '{name}' instructed Fail");
270                            log::debug!("{msg}");
271                            return Err(AutomataError::Internal(msg));
272                        }
273                    }
274                }
275                Some((name, _, _)) => {
276                    let msg = format!(
277                        "{label}: recovery handler '{name}' would fire but max_recoveries ({max_recoveries}) reached"
278                    );
279                    self.log_warn(&msg);
280                    return self
281                        .apply_on_failure_policy(step, &label, &expect, timeout, msg, state);
282                }
283                None => match retry {
284                    RetryPolicy::Fixed { count, delay } if attempts < *count => {
285                        attempts += 1;
286                        thread::sleep(*delay);
287                        continue;
288                    }
289                    _ => {
290                        let msg = match &last_action_error {
291                            Some(e) => format!(
292                                "{label}: timed out after {} attempt(s)\n  action error: {e}\n  expect: {cond_desc}",
293                                attempts + 1
294                            ),
295                            None => format!(
296                                "{label}: timed out after {} attempt(s)\n  expect: {cond_desc}",
297                                attempts + 1
298                            ),
299                        };
300                        log::debug!("{msg}");
301                        return self
302                            .apply_on_failure_policy(step, &label, &expect, timeout, msg, state);
303                    }
304                },
305            }
306        }
307    }
308
309    // ── Helpers ───────────────────────────────────────────────────────────────
310
311    /// Try `fallback` (if any), re-poll `expect`, then apply `on_failure` policy.
312    fn apply_on_failure_policy(
313        &mut self,
314        step: &Step,
315        label: &str,
316        expect: &crate::Condition,
317        timeout: Duration,
318        failure_msg: String,
319        state: &mut WorkflowState,
320    ) -> Result<StepOutcome, AutomataError> {
321        if let Some(fallback) = &step.fallback {
322            self.log_info(&format!("{label}: trying fallback action"));
323            if let Err(e) = self.exec(fallback, state) {
324                log::debug!("{label}: fallback action → Err: {e}");
325            }
326            // Re-poll expect with a fresh timeout.
327            let deadline = Instant::now() + timeout;
328            loop {
329                if self.eval(expect, state)? {
330                    self.log_info(&format!("{label}: fallback succeeded"));
331                    return Ok(match step.on_success {
332                        OnSuccess::Continue => StepOutcome::Continue,
333                        OnSuccess::ReturnPhase => {
334                            log::debug!("{label}: on_success=return_phase, stopping phase");
335                            StepOutcome::ReturnPhase
336                        }
337                    });
338                }
339                if Instant::now() >= deadline {
340                    break;
341                }
342                thread::sleep(POLL_INTERVAL);
343            }
344            self.log_warn(&format!("{label}: fallback did not satisfy expect"));
345        }
346        match &step.on_failure {
347            OnFailure::Abort => Err(AutomataError::Internal(failure_msg)),
348            OnFailure::Continue => {
349                self.log_warn(&format!("{label}: on_failure=continue, proceeding"));
350                Ok(StepOutcome::Continue)
351            }
352        }
353    }
354
355    fn exec(&mut self, action: &Action, state: &mut WorkflowState) -> Result<(), AutomataError> {
356        action.execute(
357            &mut self.dom,
358            &self.desktop,
359            &mut state.output,
360            &mut state.locals,
361            &state.params,
362        )
363    }
364
365    fn eval(
366        &mut self,
367        cond: &crate::Condition,
368        state: &WorkflowState,
369    ) -> Result<bool, AutomataError> {
370        cond.evaluate(
371            &mut self.dom,
372            &self.desktop,
373            &state.locals,
374            &state.params,
375            &state.output,
376        )
377    }
378
379    /// Evaluate a condition against the current DOM state. Used by `WorkflowFile::run()`
380    /// for phase-level preconditions before mounting anchors.
381    pub fn eval_condition(
382        &mut self,
383        cond: &crate::Condition,
384        locals: &std::collections::HashMap<String, String>,
385        params: &std::collections::HashMap<String, String>,
386        output: &crate::output::Output,
387    ) -> Result<bool, AutomataError> {
388        cond.evaluate(&mut self.dom, &self.desktop, locals, params, output)
389    }
390
391    fn log_info(&self, msg: &str) {
392        log::info!("{msg}");
393    }
394
395    fn log_warn(&self, msg: &str) {
396        log::warn!("{msg}");
397    }
398}