// scud/attractor/runner.rs
//! Core pipeline execution engine.
//!
//! Implements the lifecycle: PARSE → VALIDATE → INITIALIZE → EXECUTE → FINALIZE.
//! The core loop follows the spec Section 3.2 exactly:
//! 1. Execute node handler → get Outcome
//! 2. Apply context_updates from Outcome
//! 3. Save checkpoint
//! 4. Select next edge (5-step algorithm)
//! 5. At terminal nodes: check goal gates → retry_target if unsatisfied
//! 6. Retry logic with exponential backoff + jitter

use std::time::Instant;

use anyhow::{Context, Result};
use petgraph::graph::NodeIndex;
use tokio::sync::mpsc;

use super::checkpoint::Checkpoint;
use super::conditions::{evaluate_condition, parse_condition};
use super::context::{Context as PipelineContext, ContextSnapshot};
use super::events::PipelineEvent;
use super::graph::{PipelineGraph, PipelineNode};
use super::handlers::HandlerRegistry;
use super::interviewer::Interviewer;
use super::outcome::{Outcome, StageStatus};
use super::retry::RetryPolicy;
use super::run_directory::RunDirectory;
28/// The pipeline execution engine.
29pub struct PipelineRunner {
30    pub handler_registry: HandlerRegistry,
31    pub interviewer: Box<dyn Interviewer>,
32    pub event_tx: Option<mpsc::Sender<PipelineEvent>>,
33}
34
35impl PipelineRunner {
36    /// Create a new PipelineRunner.
37    pub fn new(handler_registry: HandlerRegistry, interviewer: Box<dyn Interviewer>) -> Self {
38        Self {
39            handler_registry,
40            interviewer,
41            event_tx: None,
42        }
43    }
44
45    /// Set the event channel for observability.
46    pub fn with_events(mut self, tx: mpsc::Sender<PipelineEvent>) -> Self {
47        self.event_tx = Some(tx);
48        self
49    }
50
51    /// Execute a pipeline graph from start to finish.
52    pub async fn run(
53        &self,
54        graph: &PipelineGraph,
55        context: &PipelineContext,
56        run_dir: &RunDirectory,
57        checkpoint: Option<Checkpoint>,
58    ) -> Result<StageStatus> {
59        let run_start = Instant::now();
60        let run_id = run_dir
61            .root()
62            .file_name()
63            .map(|f| f.to_string_lossy().to_string())
64            .unwrap_or_default();
65
66        self.emit(PipelineEvent::pipeline_started(&graph.name, &run_id))
67            .await;
68
69        // Determine starting node
70        let (mut current_idx, mut cp) = if let Some(cp) = checkpoint {
71            // Resume from checkpoint
72            let idx = *graph
73                .node_index
74                .get(&cp.current_node)
75                .context("Checkpoint node not found in graph")?;
76            (idx, cp)
77        } else {
78            let snap = ContextSnapshot::from(context.snapshot().await);
79            let cp = Checkpoint::new(&graph.graph[graph.start_node].id, snap);
80            (graph.start_node, cp)
81        };
82
83        let mut nodes_executed = 0;
84
85        loop {
86            let node = &graph.graph[current_idx];
87            let node_id = node.id.clone();
88            let node_start = Instant::now();
89
90            self.emit(PipelineEvent::node_started(&node_id, &node.handler_type))
91                .await;
92
93            // Execute handler
94            let outcome = self.execute_node(node, context, graph, run_dir).await?;
95
96            let duration_ms = node_start.elapsed().as_millis() as u64;
97            self.emit(PipelineEvent::node_completed(
98                &node_id,
99                outcome.status.clone(),
100                duration_ms,
101            ))
102            .await;
103
104            nodes_executed += 1;
105
106            // Apply context updates
107            if !outcome.context_updates.is_empty() {
108                context.apply_updates(&outcome.context_updates).await;
109            }
110
111            // Update checkpoint
112            cp.current_node = node_id.clone();
113            cp.mark_completed(&node_id, outcome.status.clone());
114            cp.context = ContextSnapshot::from(context.snapshot().await);
115            cp.save(&run_dir.checkpoint_path())?;
116
117            self.emit(PipelineEvent::CheckpointSaved {
118                node_id: node_id.clone(),
119            })
120            .await;
121
122            // Check if this is a terminal node (exit)
123            if node.handler_type == "exit" {
124                // Goal gate check
125                let goal_satisfied = self.check_goal_gates(graph, &outcome, context).await;
126
127                self.emit(PipelineEvent::GoalGateCheck {
128                    node_id: node_id.clone(),
129                    satisfied: goal_satisfied,
130                })
131                .await;
132
133                if goal_satisfied {
134                    let total_ms = run_start.elapsed().as_millis() as u64;
135                    self.emit(PipelineEvent::PipelineCompleted {
136                        status: StageStatus::Success,
137                        total_duration_ms: total_ms,
138                        nodes_executed,
139                    })
140                    .await;
141                    return Ok(StageStatus::Success);
142                } else if let Some(ref retry_target) = node.retry_target {
143                    // Route to retry target
144                    if let Some(&target_idx) = graph.node_index.get(retry_target) {
145                        current_idx = target_idx;
146                        continue;
147                    }
148                }
149                // No retry target or not found — finish anyway
150                let total_ms = run_start.elapsed().as_millis() as u64;
151                self.emit(PipelineEvent::PipelineCompleted {
152                    status: outcome.status.clone(),
153                    total_duration_ms: total_ms,
154                    nodes_executed,
155                })
156                .await;
157                return Ok(outcome.status);
158            }
159
160            // Handle failure with retry logic
161            if !outcome.status.is_success() && outcome.status != StageStatus::Skipped {
162                let policy = RetryPolicy::from_max_retries(node.max_retries);
163                let attempt = cp.increment_retry(&node_id);
164
165                if policy.should_retry(attempt) {
166                    let delay = policy.delay_for_attempt(attempt);
167                    self.emit(PipelineEvent::RetryScheduled {
168                        node_id: node_id.clone(),
169                        attempt,
170                        max_retries: node.max_retries,
171                        delay_ms: delay.as_millis() as u64,
172                    })
173                    .await;
174
175                    tokio::time::sleep(delay).await;
176                    // Stay on the same node for retry
177                    continue;
178                }
179
180                // Exhausted retries — check for retry_target
181                if let Some(ref target) = node.retry_target {
182                    if let Some(&target_idx) = graph.node_index.get(target) {
183                        current_idx = target_idx;
184                        continue;
185                    }
186                }
187                if let Some(ref fallback) = node.fallback_retry_target {
188                    if let Some(&target_idx) = graph.node_index.get(fallback) {
189                        current_idx = target_idx;
190                        continue;
191                    }
192                }
193
194                // No retry options — fail the pipeline
195                let total_ms = run_start.elapsed().as_millis() as u64;
196                self.emit(PipelineEvent::PipelineCompleted {
197                    status: StageStatus::Failure,
198                    total_duration_ms: total_ms,
199                    nodes_executed,
200                })
201                .await;
202                return Ok(StageStatus::Failure);
203            }
204
205            // Select next edge (5-step algorithm)
206            let next_idx = self
207                .select_next_edge(graph, current_idx, &outcome, context)
208                .await?;
209
210            match next_idx {
211                Some(idx) => current_idx = idx,
212                None => {
213                    // No outgoing edges — treat as pipeline completion
214                    let total_ms = run_start.elapsed().as_millis() as u64;
215                    self.emit(PipelineEvent::PipelineCompleted {
216                        status: outcome.status,
217                        total_duration_ms: total_ms,
218                        nodes_executed,
219                    })
220                    .await;
221                    return Ok(StageStatus::Success);
222                }
223            }
224        }
225    }
226
227    /// Execute a single node using the handler registry.
228    async fn execute_node(
229        &self,
230        node: &PipelineNode,
231        context: &PipelineContext,
232        graph: &PipelineGraph,
233        run_dir: &RunDirectory,
234    ) -> Result<Outcome> {
235        let handler = self.handler_registry.get(&node.handler_type);
236        handler
237            .execute(node, context, graph, run_dir)
238            .await
239            .context(format!(
240                "Handler '{}' failed for node '{}'",
241                node.handler_type, node.id
242            ))
243    }
244
245    /// 5-step edge selection algorithm (spec Section 3.3).
246    async fn select_next_edge(
247        &self,
248        graph: &PipelineGraph,
249        current: NodeIndex,
250        outcome: &Outcome,
251        context: &PipelineContext,
252    ) -> Result<Option<NodeIndex>> {
253        let edges = graph.outgoing_edges(current);
254        if edges.is_empty() {
255            return Ok(None);
256        }
257
258        let current_id = &graph.graph[current].id;
259        let ctx_snapshot = context.snapshot().await;
260
261        // Step 1: Condition-matching edges
262        let mut condition_matches: Vec<(NodeIndex, &str)> = Vec::new();
263        for (target, edge) in &edges {
264            if !edge.condition.is_empty() {
265                let cond = parse_condition(&edge.condition);
266                if evaluate_condition(&cond, outcome, &ctx_snapshot) {
267                    condition_matches.push((*target, &edge.label));
268                }
269            }
270        }
271        if condition_matches.len() == 1 {
272            let (target, label) = condition_matches[0];
273            self.emit(PipelineEvent::edge_selected(
274                current_id,
275                &graph.graph[target].id,
276                label,
277                1,
278            ))
279            .await;
280            return Ok(Some(target));
281        }
282
283        // Step 2: Preferred label match
284        if let Some(ref preferred) = outcome.preferred_label {
285            let normalized = normalize_label(preferred);
286            for (target, edge) in &edges {
287                if normalize_label(&edge.label) == normalized {
288                    self.emit(PipelineEvent::edge_selected(
289                        current_id,
290                        &graph.graph[*target].id,
291                        &edge.label,
292                        2,
293                    ))
294                    .await;
295                    return Ok(Some(*target));
296                }
297            }
298        }
299
300        // Step 3: Suggested next IDs from outcome
301        for suggested in &outcome.suggested_next {
302            if let Some(&target_idx) = graph.node_index.get(suggested) {
303                // Verify edge exists
304                for (target, edge) in &edges {
305                    if *target == target_idx {
306                        self.emit(PipelineEvent::edge_selected(
307                            current_id,
308                            suggested,
309                            &edge.label,
310                            3,
311                        ))
312                        .await;
313                        return Ok(Some(target_idx));
314                    }
315                }
316            }
317        }
318
319        // Step 4: Highest weight among unconditional edges
320        let unconditional: Vec<_> = edges
321            .iter()
322            .filter(|(_, e)| e.condition.is_empty())
323            .collect();
324
325        if !unconditional.is_empty() {
326            let max_weight = unconditional.iter().map(|(_, e)| e.weight).max().unwrap();
327            let heaviest: Vec<_> = unconditional
328                .iter()
329                .filter(|(_, e)| e.weight == max_weight)
330                .collect();
331
332            if heaviest.len() == 1 {
333                let (target, edge) = heaviest[0];
334                self.emit(PipelineEvent::edge_selected(
335                    current_id,
336                    &graph.graph[*target].id,
337                    &edge.label,
338                    4,
339                ))
340                .await;
341                return Ok(Some(*target));
342            }
343
344            // Step 5: Lexical tiebreak
345            let mut candidates: Vec<_> = heaviest
346                .iter()
347                .map(|(target, edge)| {
348                    let t = *target;
349                    (t, graph.graph[t].id.clone(), edge.label.clone())
350                })
351                .collect();
352            candidates.sort_by(|a, b| a.1.cmp(&b.1));
353
354            let (target, ref id, ref label) = candidates[0];
355            self.emit(PipelineEvent::edge_selected(current_id, id, label, 5))
356                .await;
357            return Ok(Some(target));
358        }
359
360        // Fallback: if all edges have conditions and none matched, no edge selected
361        Ok(None)
362    }
363
364    /// Check goal gates on terminal nodes.
365    async fn check_goal_gates(
366        &self,
367        _graph: &PipelineGraph,
368        _outcome: &Outcome,
369        context: &PipelineContext,
370    ) -> bool {
371        // Check if any node with goal_gate=true has not been completed
372        // For MVP, check if context has a "goal_satisfied" key
373        if let Some(val) = context.get("goal_satisfied").await {
374            if let Some(b) = val.as_bool() {
375                return b;
376            }
377            if let Some(s) = val.as_str() {
378                return s == "true";
379            }
380        }
381        // Default: satisfied (no explicit gate)
382        true
383    }
384
385    async fn emit(&self, event: PipelineEvent) {
386        if let Some(ref tx) = self.event_tx {
387            let _ = tx.send(event).await;
388        }
389    }
390}
391
/// Normalize a label for comparison: trim whitespace, lowercase, and
/// strip a leading `[K]` keyboard-accelerator marker if present.
///
/// A label with an unclosed `[` is returned trimmed/lowercased as-is.
fn normalize_label(label: &str) -> String {
    let s = label.trim().to_lowercase();
    // Strip "[X] " accelerator markers; `pos + 1` is safe because ']'
    // is a single-byte char.
    if s.starts_with('[') {
        if let Some(pos) = s.find(']') {
            return s[pos + 1..].trim().to_string();
        }
    }
    s
}
403
#[cfg(test)]
mod tests {
    use super::*;

    /// Covers plain, whitespace-padded, and accelerator-prefixed labels.
    #[test]
    fn test_normalize_label() {
        assert_eq!(normalize_label("Success"), "success");
        assert_eq!(normalize_label("  Approve  "), "approve");
        assert_eq!(normalize_label("[A] Approve"), "approve");
        assert_eq!(normalize_label("[K] Keep Going"), "keep going");
    }
}
415}