use std::collections::HashMap;
use std::thread;
use std::time::{Duration, Instant};

use crate::{
    Action, AnchorDef, AutomataError, Desktop, OnFailure, Plan, RecoveryHandler, ResumeStrategy,
    RetryPolicy, ShadowDom, Step, output::Output, step::OnSuccess,
};
10
/// Pause between two consecutive evaluations of a condition while polling.
const POLL_INTERVAL: Duration = Duration::from_millis(100);
12
/// Tells the step loop what to do after a step finished without an error.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum StepOutcome {
    /// Proceed with the next step of the plan.
    Continue,
    /// Stop running the remaining steps of the current plan.
    ReturnPhase,
}
18
19#[derive(Debug)]
23pub struct WorkflowState {
24 pub params: HashMap<String, String>,
27 pub locals: HashMap<String, String>,
30 pub output: Output,
33 pub action_snapshot: bool,
36}
37
38impl WorkflowState {
39 pub fn new(action_snapshot: bool) -> Self {
40 Self {
41 locals: HashMap::new(),
42 output: Output::new(),
43 params: HashMap::new(),
44 action_snapshot,
45 }
46 }
47}
48
49pub struct Executor<D: Desktop> {
54 pub dom: ShadowDom<D>,
55 pub desktop: D,
56 pub global_handlers: Vec<RecoveryHandler>,
57}
58
59impl<D: Desktop> Executor<D> {
60 pub fn new(desktop: D) -> Self {
61 Self {
62 dom: ShadowDom::new(),
63 desktop,
64 global_handlers: vec![],
65 }
66 }
67
68 pub fn mount(&mut self, anchors: Vec<AnchorDef>) -> Result<(), AutomataError> {
70 self.dom.mount(anchors, &self.desktop)
71 }
72
73 pub fn unmount(&mut self, names: &[&str]) {
75 self.dom.unmount(names, &self.desktop);
76 }
77
78 pub fn cleanup_depth(&mut self, depth: usize) {
80 self.dom.cleanup_depth(depth, &self.desktop);
81 }
82
83 pub fn run(&mut self, plan: &Plan<'_>, state: &mut WorkflowState) -> Result<(), AutomataError> {
88 self.log_info(&format!("plan: {}", plan.name));
89 let total = plan.steps.len();
90 let mut recovery_count: u32 = 0;
91 let result = (|| {
92 for (i, step) in plan.steps.iter().enumerate() {
93 let outcome = self.run_step(
94 step,
95 &plan.recovery_handlers,
96 plan.max_recoveries,
97 &mut recovery_count,
98 i + 1,
99 total,
100 plan.default_timeout,
101 &plan.default_retry,
102 state,
103 )?;
104 if outcome == StepOutcome::ReturnPhase {
105 break;
106 }
107 }
108 Ok(())
109 })();
110 if !plan.unmount.is_empty() {
111 let names: Vec<&str> = plan.unmount.iter().map(String::as_str).collect();
112 self.unmount(&names);
113 }
114 result
115 }
116
117 fn run_step(
120 &mut self,
121 step: &Step,
122 local_handlers: &[RecoveryHandler],
123 max_recoveries: u32,
124 recovery_count: &mut u32,
125 step_num: usize,
126 total: usize,
127 default_timeout: Duration,
128 default_retry: &RetryPolicy,
129 state: &mut WorkflowState,
130 ) -> Result<StepOutcome, AutomataError> {
131 let prefix = format!("step {step_num}/{total}");
132 let label = format!("{prefix} '{}'", step.intent);
133 self.log_info(&label);
134
135 let timeout = step.timeout.unwrap_or(default_timeout);
136 let retry = match &step.retry {
137 RetryPolicy::None => default_retry,
138 policy => policy,
139 };
140
141 if let Some(pre) = &step.precondition {
142 let pre_desc = pre.describe();
143 log::debug!("precondition: {pre_desc}");
144 if !self.eval(pre, state)? {
145 log::debug!("{prefix}: precondition not satisfied, skipping");
146 return Ok(StepOutcome::Continue);
147 }
148 }
149
150 let action = step.action.apply_output(&state.locals, &state.output);
151 let expect = step.expect.apply_output(&state.locals, &state.output);
152
153 let cond_desc = expect.describe();
154 let action_desc = action.describe();
155
156 let mut attempts: u32 = 0;
157 let mut last_action_error: Option<String>;
158 loop {
159 last_action_error = None; log::debug!("action: {action_desc}");
161 let action_result = self.exec(&action, state);
162 match &action_result {
163 Ok(()) => log::debug!("action → Ok"),
164 Err(e) => {
165 let msg = e.to_string();
166 match &step.on_failure {
169 OnFailure::Continue => {
170 log::debug!("{label}: action → Err: {msg}");
171 }
172 OnFailure::Abort => {
173 self.log_warn(&format!("{label}: action → Err: {msg}"));
174 }
175 }
176 last_action_error = Some(msg);
177 }
178 }
179 if state.action_snapshot {
182 if let Some(scope) = expect.scope_name() {
183 self.dom.sync(scope, &self.desktop);
184 }
185 }
186
187 let deadline = Instant::now() + timeout;
188 let mut last_poll: Option<bool> = None;
189 loop {
190 let satisfied = self.eval(&expect, state)?;
191 if last_poll != Some(satisfied) {
192 log::debug!("poll: {cond_desc} → {satisfied}");
193 last_poll = Some(satisfied);
194 }
195 if satisfied {
196 if let (Some(_), OnFailure::Abort) = (&last_action_error, &step.on_failure) {
200 break;
201 }
202 self.log_info(&format!("{prefix}: ok"));
203 return Ok(match step.on_success {
204 OnSuccess::Continue => StepOutcome::Continue,
205 OnSuccess::ReturnPhase => {
206 log::debug!("{prefix}: on_success=return_phase, stopping phase");
207 StepOutcome::ReturnPhase
208 }
209 });
210 }
211 if Instant::now() >= deadline {
212 break;
213 }
214 thread::sleep(POLL_INTERVAL);
215 }
216
217 let timeout_msg = format!(
218 "{label}: timed out (attempt {}), checking recovery",
219 attempts + 1
220 );
221 self.log_warn(&timeout_msg);
222
223 let all: Vec<(String, crate::Condition, Vec<Action>, ResumeStrategy)> = local_handlers
224 .iter()
225 .chain(self.global_handlers.iter())
226 .map(|h| {
227 (
228 h.name.clone(),
229 h.trigger.clone(),
230 h.actions.clone(),
231 h.resume,
232 )
233 })
234 .collect();
235
236 let mut fired: Option<(String, Vec<Action>, ResumeStrategy)> = None;
237 for (name, trigger, actions, resume) in all {
238 if self.eval(&trigger, state)? {
239 fired = Some((name, actions, resume));
240 break;
241 }
242 }
243
244 match fired {
245 Some((name, actions, resume)) if *recovery_count < max_recoveries => {
246 *recovery_count += 1;
247 self.log_info(&format!(
248 "{label}: recovery handler '{name}' fired ({recovery_count}/{max_recoveries})"
249 ));
250 for action in &actions {
251 let rdesc = action.describe();
252 log::debug!("recovery action: {rdesc}");
253 if let Err(e) = self.exec(action, state) {
254 log::debug!("{label}: recovery action → Err: {e}");
255 } else {
256 log::debug!("recovery action → Ok");
257 }
258 }
259 match resume {
260 ResumeStrategy::RetryStep => {
261 attempts += 1;
262 continue;
263 }
264 ResumeStrategy::SkipStep => {
265 self.log_info(&format!("{label}: skipped by recovery"));
266 return Ok(StepOutcome::Continue);
267 }
268 ResumeStrategy::Fail => {
269 let msg = format!("{label}: recovery handler '{name}' instructed Fail");
270 log::debug!("{msg}");
271 return Err(AutomataError::Internal(msg));
272 }
273 }
274 }
275 Some((name, _, _)) => {
276 let msg = format!(
277 "{label}: recovery handler '{name}' would fire but max_recoveries ({max_recoveries}) reached"
278 );
279 self.log_warn(&msg);
280 return self
281 .apply_on_failure_policy(step, &label, &expect, timeout, msg, state);
282 }
283 None => match retry {
284 RetryPolicy::Fixed { count, delay } if attempts < *count => {
285 attempts += 1;
286 thread::sleep(*delay);
287 continue;
288 }
289 _ => {
290 let msg = match &last_action_error {
291 Some(e) => format!(
292 "{label}: timed out after {} attempt(s)\n action error: {e}\n expect: {cond_desc}",
293 attempts + 1
294 ),
295 None => format!(
296 "{label}: timed out after {} attempt(s)\n expect: {cond_desc}",
297 attempts + 1
298 ),
299 };
300 log::debug!("{msg}");
301 return self
302 .apply_on_failure_policy(step, &label, &expect, timeout, msg, state);
303 }
304 },
305 }
306 }
307 }
308
309 fn apply_on_failure_policy(
313 &mut self,
314 step: &Step,
315 label: &str,
316 expect: &crate::Condition,
317 timeout: Duration,
318 failure_msg: String,
319 state: &mut WorkflowState,
320 ) -> Result<StepOutcome, AutomataError> {
321 if let Some(fallback) = &step.fallback {
322 self.log_info(&format!("{label}: trying fallback action"));
323 if let Err(e) = self.exec(fallback, state) {
324 log::debug!("{label}: fallback action → Err: {e}");
325 }
326 let deadline = Instant::now() + timeout;
328 loop {
329 if self.eval(expect, state)? {
330 self.log_info(&format!("{label}: fallback succeeded"));
331 return Ok(match step.on_success {
332 OnSuccess::Continue => StepOutcome::Continue,
333 OnSuccess::ReturnPhase => {
334 log::debug!("{label}: on_success=return_phase, stopping phase");
335 StepOutcome::ReturnPhase
336 }
337 });
338 }
339 if Instant::now() >= deadline {
340 break;
341 }
342 thread::sleep(POLL_INTERVAL);
343 }
344 self.log_warn(&format!("{label}: fallback did not satisfy expect"));
345 }
346 match &step.on_failure {
347 OnFailure::Abort => Err(AutomataError::Internal(failure_msg)),
348 OnFailure::Continue => {
349 self.log_warn(&format!("{label}: on_failure=continue, proceeding"));
350 Ok(StepOutcome::Continue)
351 }
352 }
353 }
354
355 fn exec(&mut self, action: &Action, state: &mut WorkflowState) -> Result<(), AutomataError> {
356 action.execute(
357 &mut self.dom,
358 &self.desktop,
359 &mut state.output,
360 &mut state.locals,
361 &state.params,
362 )
363 }
364
365 fn eval(
366 &mut self,
367 cond: &crate::Condition,
368 state: &WorkflowState,
369 ) -> Result<bool, AutomataError> {
370 cond.evaluate(
371 &mut self.dom,
372 &self.desktop,
373 &state.locals,
374 &state.params,
375 &state.output,
376 )
377 }
378
379 pub fn eval_condition(
382 &mut self,
383 cond: &crate::Condition,
384 locals: &std::collections::HashMap<String, String>,
385 params: &std::collections::HashMap<String, String>,
386 output: &crate::output::Output,
387 ) -> Result<bool, AutomataError> {
388 cond.evaluate(&mut self.dom, &self.desktop, locals, params, output)
389 }
390
391 fn log_info(&self, msg: &str) {
392 log::info!("{msg}");
393 }
394
395 fn log_warn(&self, msg: &str) {
396 log::warn!("{msg}");
397 }
398}