// scud/attractor/runner.rs
//! Core pipeline execution engine.
//!
//! Implements the lifecycle: PARSE → VALIDATE → INITIALIZE → EXECUTE → FINALIZE.
//! The core loop follows the spec Section 3.2 exactly:
//! 1. Execute node handler → get Outcome
//! 2. Apply context_updates from Outcome
//! 3. Save checkpoint
//! 4. Select next edge (5-step algorithm)
//! 5. At terminal nodes: check goal gates → retry_target if unsatisfied
//! 6. Retry logic with exponential backoff + jitter
12use anyhow::{Context, Result};
13use petgraph::graph::NodeIndex;
14use std::time::Instant;
15use tokio::sync::mpsc;
16
17use super::checkpoint::Checkpoint;
18use super::conditions::{evaluate_condition, parse_condition};
19use super::context::{Context as PipelineContext, ContextSnapshot};
20use super::events::PipelineEvent;
21use super::graph::{PipelineGraph, PipelineNode};
22use super::handlers::HandlerRegistry;
23use super::interviewer::Interviewer;
24use super::outcome::{Outcome, StageStatus};
25use super::retry::RetryPolicy;
26use super::run_directory::RunDirectory;
27
/// The pipeline execution engine.
///
/// Holds the run-independent collaborators; all per-run state lives in the
/// `Checkpoint` / `PipelineContext` passed to `run`, so a single runner can
/// drive multiple runs.
pub struct PipelineRunner {
    /// Resolves a node's `handler_type` string to an executable handler.
    pub handler_registry: HandlerRegistry,
    /// Interaction point for interview-style nodes. Not referenced directly
    /// in this file — presumably consumed by handlers; confirm at call sites.
    pub interviewer: Box<dyn Interviewer>,
    /// Optional observability channel; when `None`, `emit` is a no-op.
    pub event_tx: Option<mpsc::Sender<PipelineEvent>>,
}
34
35impl PipelineRunner {
36    /// Create a new PipelineRunner.
37    pub fn new(
38        handler_registry: HandlerRegistry,
39        interviewer: Box<dyn Interviewer>,
40    ) -> Self {
41        Self {
42            handler_registry,
43            interviewer,
44            event_tx: None,
45        }
46    }
47
48    /// Set the event channel for observability.
49    pub fn with_events(mut self, tx: mpsc::Sender<PipelineEvent>) -> Self {
50        self.event_tx = Some(tx);
51        self
52    }
53
54    /// Execute a pipeline graph from start to finish.
55    pub async fn run(
56        &self,
57        graph: &PipelineGraph,
58        context: &PipelineContext,
59        run_dir: &RunDirectory,
60        checkpoint: Option<Checkpoint>,
61    ) -> Result<StageStatus> {
62        let run_start = Instant::now();
63        let run_id = run_dir
64            .root()
65            .file_name()
66            .map(|f| f.to_string_lossy().to_string())
67            .unwrap_or_default();
68
69        self.emit(PipelineEvent::pipeline_started(&graph.name, &run_id))
70            .await;
71
72        // Determine starting node
73        let (mut current_idx, mut cp) = if let Some(cp) = checkpoint {
74            // Resume from checkpoint
75            let idx = *graph
76                .node_index
77                .get(&cp.current_node)
78                .context("Checkpoint node not found in graph")?;
79            (idx, cp)
80        } else {
81            let snap = ContextSnapshot::from(context.snapshot().await);
82            let cp = Checkpoint::new(
83                &graph.graph[graph.start_node].id,
84                snap,
85            );
86            (graph.start_node, cp)
87        };
88
89        let mut nodes_executed = 0;
90
91        loop {
92            let node = &graph.graph[current_idx];
93            let node_id = node.id.clone();
94            let node_start = Instant::now();
95
96            self.emit(PipelineEvent::node_started(&node_id, &node.handler_type))
97                .await;
98
99            // Execute handler
100            let outcome = self.execute_node(node, context, graph, run_dir).await?;
101
102            let duration_ms = node_start.elapsed().as_millis() as u64;
103            self.emit(PipelineEvent::node_completed(
104                &node_id,
105                outcome.status.clone(),
106                duration_ms,
107            ))
108            .await;
109
110            nodes_executed += 1;
111
112            // Apply context updates
113            if !outcome.context_updates.is_empty() {
114                context.apply_updates(&outcome.context_updates).await;
115            }
116
117            // Update checkpoint
118            cp.current_node = node_id.clone();
119            cp.mark_completed(&node_id, outcome.status.clone());
120            cp.context = ContextSnapshot::from(context.snapshot().await);
121            cp.save(&run_dir.checkpoint_path())?;
122
123            self.emit(PipelineEvent::CheckpointSaved {
124                node_id: node_id.clone(),
125            })
126            .await;
127
128            // Check if this is a terminal node (exit)
129            if node.handler_type == "exit" {
130                // Goal gate check
131                let goal_satisfied = self.check_goal_gates(graph, &outcome, context).await;
132
133                self.emit(PipelineEvent::GoalGateCheck {
134                    node_id: node_id.clone(),
135                    satisfied: goal_satisfied,
136                })
137                .await;
138
139                if goal_satisfied {
140                    let total_ms = run_start.elapsed().as_millis() as u64;
141                    self.emit(PipelineEvent::PipelineCompleted {
142                        status: StageStatus::Success,
143                        total_duration_ms: total_ms,
144                        nodes_executed,
145                    })
146                    .await;
147                    return Ok(StageStatus::Success);
148                } else if let Some(ref retry_target) = node.retry_target {
149                    // Route to retry target
150                    if let Some(&target_idx) = graph.node_index.get(retry_target) {
151                        current_idx = target_idx;
152                        continue;
153                    }
154                }
155                // No retry target or not found — finish anyway
156                let total_ms = run_start.elapsed().as_millis() as u64;
157                self.emit(PipelineEvent::PipelineCompleted {
158                    status: outcome.status.clone(),
159                    total_duration_ms: total_ms,
160                    nodes_executed,
161                })
162                .await;
163                return Ok(outcome.status);
164            }
165
166            // Handle failure with retry logic
167            if !outcome.status.is_success() && outcome.status != StageStatus::Skipped {
168                let policy = RetryPolicy::from_max_retries(node.max_retries);
169                let attempt = cp.increment_retry(&node_id);
170
171                if policy.should_retry(attempt) {
172                    let delay = policy.delay_for_attempt(attempt);
173                    self.emit(PipelineEvent::RetryScheduled {
174                        node_id: node_id.clone(),
175                        attempt,
176                        max_retries: node.max_retries,
177                        delay_ms: delay.as_millis() as u64,
178                    })
179                    .await;
180
181                    tokio::time::sleep(delay).await;
182                    // Stay on the same node for retry
183                    continue;
184                }
185
186                // Exhausted retries — check for retry_target
187                if let Some(ref target) = node.retry_target {
188                    if let Some(&target_idx) = graph.node_index.get(target) {
189                        current_idx = target_idx;
190                        continue;
191                    }
192                }
193                if let Some(ref fallback) = node.fallback_retry_target {
194                    if let Some(&target_idx) = graph.node_index.get(fallback) {
195                        current_idx = target_idx;
196                        continue;
197                    }
198                }
199
200                // No retry options — fail the pipeline
201                let total_ms = run_start.elapsed().as_millis() as u64;
202                self.emit(PipelineEvent::PipelineCompleted {
203                    status: StageStatus::Failure,
204                    total_duration_ms: total_ms,
205                    nodes_executed,
206                })
207                .await;
208                return Ok(StageStatus::Failure);
209            }
210
211            // Select next edge (5-step algorithm)
212            let next_idx = self
213                .select_next_edge(graph, current_idx, &outcome, context)
214                .await?;
215
216            match next_idx {
217                Some(idx) => current_idx = idx,
218                None => {
219                    // No outgoing edges — treat as pipeline completion
220                    let total_ms = run_start.elapsed().as_millis() as u64;
221                    self.emit(PipelineEvent::PipelineCompleted {
222                        status: outcome.status,
223                        total_duration_ms: total_ms,
224                        nodes_executed,
225                    })
226                    .await;
227                    return Ok(StageStatus::Success);
228                }
229            }
230        }
231    }
232
233    /// Execute a single node using the handler registry.
234    async fn execute_node(
235        &self,
236        node: &PipelineNode,
237        context: &PipelineContext,
238        graph: &PipelineGraph,
239        run_dir: &RunDirectory,
240    ) -> Result<Outcome> {
241        let handler = self.handler_registry.get(&node.handler_type);
242        handler
243            .execute(node, context, graph, run_dir)
244            .await
245            .context(format!("Handler '{}' failed for node '{}'", node.handler_type, node.id))
246    }
247
    /// 5-step edge selection algorithm (spec Section 3.3).
    ///
    /// Precedence order:
    /// 1. The single edge whose condition evaluates true against the
    ///    outcome and a context snapshot.
    /// 2. The first edge whose normalized label matches the outcome's
    ///    `preferred_label`.
    /// 3. The first outcome-suggested node id that has a real outgoing edge
    ///    from the current node.
    /// 4. The unique highest-weight unconditional edge.
    /// 5. Lexical tiebreak by target node id among equally heavy edges.
    ///
    /// Returns `Ok(None)` when the node has no outgoing edges, or when all
    /// edges are conditional and none matched.
    async fn select_next_edge(
        &self,
        graph: &PipelineGraph,
        current: NodeIndex,
        outcome: &Outcome,
        context: &PipelineContext,
    ) -> Result<Option<NodeIndex>> {
        let edges = graph.outgoing_edges(current);
        if edges.is_empty() {
            return Ok(None);
        }

        let current_id = &graph.graph[current].id;
        // One snapshot up front so every condition sees consistent state.
        let ctx_snapshot = context.snapshot().await;

        // Step 1: Condition-matching edges
        let mut condition_matches: Vec<(NodeIndex, &str)> = Vec::new();
        for (target, edge) in &edges {
            if !edge.condition.is_empty() {
                let cond = parse_condition(&edge.condition);
                if evaluate_condition(&cond, outcome, &ctx_snapshot) {
                    condition_matches.push((*target, &edge.label));
                }
            }
        }
        // Only an unambiguous (exactly one) condition match short-circuits.
        // NOTE(review): with two or more matches we silently fall through to
        // step 2 — confirm this ambiguity handling against spec Section 3.3.
        if condition_matches.len() == 1 {
            let (target, label) = condition_matches[0];
            self.emit(PipelineEvent::edge_selected(
                current_id,
                &graph.graph[target].id,
                label,
                1,
            ))
            .await;
            return Ok(Some(target));
        }

        // Step 2: Preferred label match
        if let Some(ref preferred) = outcome.preferred_label {
            let normalized = normalize_label(preferred);
            for (target, edge) in &edges {
                if normalize_label(&edge.label) == normalized {
                    self.emit(PipelineEvent::edge_selected(
                        current_id,
                        &graph.graph[*target].id,
                        &edge.label,
                        2,
                    ))
                    .await;
                    return Ok(Some(*target));
                }
            }
        }

        // Step 3: Suggested next IDs from outcome
        for suggested in &outcome.suggested_next {
            if let Some(&target_idx) = graph.node_index.get(suggested) {
                // A suggestion only counts if an outgoing edge actually
                // reaches it — suggestions cannot invent edges.
                for (target, edge) in &edges {
                    if *target == target_idx {
                        self.emit(PipelineEvent::edge_selected(
                            current_id,
                            suggested,
                            &edge.label,
                            3,
                        ))
                        .await;
                        return Ok(Some(target_idx));
                    }
                }
            }
        }

        // Step 4: Highest weight among unconditional edges
        let unconditional: Vec<_> = edges
            .iter()
            .filter(|(_, e)| e.condition.is_empty())
            .collect();

        if !unconditional.is_empty() {
            // unwrap is safe: the is_empty check above guarantees max() is Some.
            let max_weight = unconditional.iter().map(|(_, e)| e.weight).max().unwrap();
            let heaviest: Vec<_> = unconditional
                .iter()
                .filter(|(_, e)| e.weight == max_weight)
                .collect();

            if heaviest.len() == 1 {
                let (target, edge) = heaviest[0];
                self.emit(PipelineEvent::edge_selected(
                    current_id,
                    &graph.graph[*target].id,
                    &edge.label,
                    4,
                ))
                .await;
                return Ok(Some(*target));
            }

            // Step 5: Lexical tiebreak — deterministic: sorts candidates by
            // target node id and picks the first.
            let mut candidates: Vec<_> = heaviest
                .iter()
                .map(|(target, edge)| {
                    let t = *target;
                    (t, graph.graph[t].id.clone(), edge.label.clone())
                })
                .collect();
            candidates.sort_by(|a, b| a.1.cmp(&b.1));

            let (target, ref id, ref label) = candidates[0];
            self.emit(PipelineEvent::edge_selected(current_id, id, label, 5))
                .await;
            return Ok(Some(target));
        }

        // Fallback: if all edges have conditions and none matched, no edge selected
        Ok(None)
    }
366
367    /// Check goal gates on terminal nodes.
368    async fn check_goal_gates(
369        &self,
370        _graph: &PipelineGraph,
371        _outcome: &Outcome,
372        context: &PipelineContext,
373    ) -> bool {
374        // Check if any node with goal_gate=true has not been completed
375        // For MVP, check if context has a "goal_satisfied" key
376        if let Some(val) = context.get("goal_satisfied").await {
377            if let Some(b) = val.as_bool() {
378                return b;
379            }
380            if let Some(s) = val.as_str() {
381                return s == "true";
382            }
383        }
384        // Default: satisfied (no explicit gate)
385        true
386    }
387
388    async fn emit(&self, event: PipelineEvent) {
389        if let Some(ref tx) = self.event_tx {
390            let _ = tx.send(event).await;
391        }
392    }
393}
394
/// Normalize a label for comparison: lowercase, trim, strip [K] accelerators.
///
/// A leading `[X]` marker (any content up to the first `]`) is removed and
/// the remainder re-trimmed; a `[` without a closing `]` is left intact.
fn normalize_label(label: &str) -> String {
    let lowered = label.trim().to_lowercase();
    match (lowered.starts_with('['), lowered.find(']')) {
        // Well-formed accelerator prefix: drop it and trim what's left.
        (true, Some(pos)) => lowered[pos + 1..].trim().to_string(),
        // No marker, or an unterminated one: return as-is.
        _ => lowered,
    }
}
406
#[cfg(test)]
mod tests {
    use super::*;

    /// Table-driven check of accelerator stripping and normalization.
    #[test]
    fn test_normalize_label() {
        let cases = [
            ("Success", "success"),
            ("  Approve  ", "approve"),
            ("[A] Approve", "approve"),
            ("[K] Keep Going", "keep going"),
        ];
        for (input, expected) in cases {
            assert_eq!(normalize_label(input), expected);
        }
    }
}