// scud/attractor/runner.rs
//! Core pipeline execution engine.
//!
//! Implements the lifecycle: PARSE → VALIDATE → INITIALIZE → EXECUTE → FINALIZE.
//! The core loop follows the spec Section 3.2 exactly:
//! 1. Execute node handler → get Outcome
//! 2. Apply context_updates from Outcome
//! 3. Save checkpoint
//! 4. Select next edge (5-step algorithm)
//! 5. At terminal nodes: check goal gates → retry_target if unsatisfied
//! 6. Retry logic with exponential backoff + jitter

use anyhow::{Context, Result};
use petgraph::graph::NodeIndex;
use std::time::Instant;
use tokio::sync::mpsc;

use super::checkpoint::Checkpoint;
use super::conditions::{evaluate_condition, parse_condition};
use super::context::{Context as PipelineContext, ContextSnapshot};
use super::events::PipelineEvent;
use super::graph::{PipelineGraph, PipelineNode};
use super::handlers::HandlerRegistry;
use super::interviewer::Interviewer;
use super::outcome::{Outcome, StageStatus};
use super::retry::RetryPolicy;
use super::run_directory::RunDirectory;
/// The pipeline execution engine.
///
/// Owns the pieces needed to drive a graph to completion: the handler
/// registry that executes nodes, an interviewer for interactive prompts,
/// and an optional event channel for observability.
pub struct PipelineRunner {
    /// Registry mapping a node's `handler_type` string to its handler.
    pub handler_registry: HandlerRegistry,
    /// Interactive prompt implementation, injected by the caller.
    pub interviewer: Box<dyn Interviewer>,
    /// Observability channel; `None` means events are silently dropped.
    pub event_tx: Option<mpsc::Sender<PipelineEvent>>,
}
34
35impl PipelineRunner {
36    /// Create a new PipelineRunner.
37    pub fn new(handler_registry: HandlerRegistry, interviewer: Box<dyn Interviewer>) -> Self {
38        Self {
39            handler_registry,
40            interviewer,
41            event_tx: None,
42        }
43    }
44
45    /// Set the event channel for observability.
46    pub fn with_events(mut self, tx: mpsc::Sender<PipelineEvent>) -> Self {
47        self.event_tx = Some(tx);
48        self
49    }
50
51    /// Execute a pipeline graph from start to finish.
52    pub async fn run(
53        &self,
54        graph: &PipelineGraph,
55        context: &PipelineContext,
56        run_dir: &RunDirectory,
57        checkpoint: Option<Checkpoint>,
58    ) -> Result<StageStatus> {
59        let run_start = Instant::now();
60        let run_id = run_dir
61            .root()
62            .file_name()
63            .map(|f| f.to_string_lossy().to_string())
64            .unwrap_or_default();
65
66        self.emit(PipelineEvent::pipeline_started(&graph.name, &run_id))
67            .await;
68
69        // Determine starting node
70        let (mut current_idx, mut cp) = if let Some(cp) = checkpoint {
71            // Resume from checkpoint
72            let idx = *graph
73                .node_index
74                .get(&cp.current_node)
75                .context("Checkpoint node not found in graph")?;
76            (idx, cp)
77        } else {
78            let snap = ContextSnapshot::from(context.snapshot().await);
79            let cp = Checkpoint::new(&graph.graph[graph.start_node].id, snap);
80            (graph.start_node, cp)
81        };
82
83        let mut nodes_executed = 0;
84
85        loop {
86            let node = &graph.graph[current_idx];
87            let node_id = node.id.clone();
88            let node_start = Instant::now();
89
90            self.emit(PipelineEvent::node_started(&node_id, &node.handler_type))
91                .await;
92
93            // Execute handler
94            let outcome = self.execute_node(node, context, graph, run_dir).await?;
95
96            let duration_ms = node_start.elapsed().as_millis() as u64;
97            self.emit(PipelineEvent::node_completed(
98                &node_id,
99                outcome.status.clone(),
100                duration_ms,
101            ))
102            .await;
103
104            nodes_executed += 1;
105
106            // Apply context updates
107            if !outcome.context_updates.is_empty() {
108                context.apply_updates(&outcome.context_updates).await;
109            }
110
111            // Update checkpoint
112            cp.current_node = node_id.clone();
113            cp.mark_completed(&node_id, outcome.status.clone());
114            cp.context = ContextSnapshot::from(context.snapshot().await);
115            cp.save(&run_dir.checkpoint_path())?;
116
117            self.emit(PipelineEvent::CheckpointSaved {
118                node_id: node_id.clone(),
119            })
120            .await;
121
122            // Check if this is a terminal node (exit)
123            if node.handler_type == "exit" {
124                // Goal gate check
125                let goal_satisfied = self.check_goal_gates(graph, &outcome, context).await;
126
127                self.emit(PipelineEvent::GoalGateCheck {
128                    node_id: node_id.clone(),
129                    satisfied: goal_satisfied,
130                })
131                .await;
132
133                if goal_satisfied {
134                    let total_ms = run_start.elapsed().as_millis() as u64;
135                    self.emit(PipelineEvent::PipelineCompleted {
136                        status: StageStatus::Success,
137                        total_duration_ms: total_ms,
138                        nodes_executed,
139                    })
140                    .await;
141                    return Ok(StageStatus::Success);
142                } else if let Some(ref retry_target) = node.retry_target {
143                    // Route to retry target
144                    if let Some(&target_idx) = graph.node_index.get(retry_target) {
145                        current_idx = target_idx;
146                        continue;
147                    }
148                }
149                // No retry target or not found — finish anyway
150                let total_ms = run_start.elapsed().as_millis() as u64;
151                self.emit(PipelineEvent::PipelineCompleted {
152                    status: outcome.status.clone(),
153                    total_duration_ms: total_ms,
154                    nodes_executed,
155                })
156                .await;
157                return Ok(outcome.status);
158            }
159
160            // Handle failure with retry logic
161            if !outcome.status.is_success() && outcome.status != StageStatus::Skipped {
162                let policy = RetryPolicy::from_max_retries(node.max_retries);
163                let attempt = cp.increment_retry(&node_id);
164
165                if policy.should_retry(attempt) {
166                    let delay = policy.delay_for_attempt(attempt);
167                    self.emit(PipelineEvent::RetryScheduled {
168                        node_id: node_id.clone(),
169                        attempt,
170                        max_retries: node.max_retries,
171                        delay_ms: delay.as_millis() as u64,
172                    })
173                    .await;
174
175                    tokio::time::sleep(delay).await;
176                    // Stay on the same node for retry
177                    continue;
178                }
179
180                // Exhausted retries — check for retry_target
181                if let Some(ref target) = node.retry_target {
182                    if let Some(&target_idx) = graph.node_index.get(target) {
183                        current_idx = target_idx;
184                        continue;
185                    }
186                }
187                if let Some(ref fallback) = node.fallback_retry_target {
188                    if let Some(&target_idx) = graph.node_index.get(fallback) {
189                        current_idx = target_idx;
190                        continue;
191                    }
192                }
193
194                // No retry options — fail the pipeline
195                let total_ms = run_start.elapsed().as_millis() as u64;
196                self.emit(PipelineEvent::PipelineCompleted {
197                    status: StageStatus::Failure,
198                    total_duration_ms: total_ms,
199                    nodes_executed,
200                })
201                .await;
202                return Ok(StageStatus::Failure);
203            }
204
205            // Select next edge (5-step algorithm)
206            let next_idx = self
207                .select_next_edge(graph, current_idx, &outcome, context)
208                .await?;
209
210            match next_idx {
211                Some(idx) => current_idx = idx,
212                None => {
213                    // No outgoing edges — treat as pipeline completion
214                    let total_ms = run_start.elapsed().as_millis() as u64;
215                    self.emit(PipelineEvent::PipelineCompleted {
216                        status: outcome.status,
217                        total_duration_ms: total_ms,
218                        nodes_executed,
219                    })
220                    .await;
221                    return Ok(StageStatus::Success);
222                }
223            }
224        }
225    }
226
227    /// Execute a single node using the handler registry.
228    ///
229    /// If the node has a `weave_event` attribute, it is checked before execution.
230    /// Currently this logs the event and records it in context; full coordinator
231    /// integration is deferred (see TODO below).
232    async fn execute_node(
233        &self,
234        node: &PipelineNode,
235        context: &PipelineContext,
236        graph: &PipelineGraph,
237        run_dir: &RunDirectory,
238    ) -> Result<Outcome> {
239        // Weave gating: check for weave_event attribute before executing
240        if let Some(weave_event_attr) = node.extra_attrs.get("weave_event") {
241            let weave_event_json = weave_event_attr.as_str();
242            tracing::info!(
243                node_id = %node.id,
244                weave_event = %weave_event_json,
245                "Weave event gate detected on node"
246            );
247
248            // Parse the weave event to validate it is well-formed JSON
249            match serde_json::from_str::<serde_json::Value>(&weave_event_json) {
250                Ok(event) => {
251                    // Store the weave event in context for downstream inspection
252                    context
253                        .set(
254                            format!("{}.weave_event", node.id),
255                            event,
256                        )
257                        .await;
258
259                    // TODO: Wire up full weave coordinator integration:
260                    //   let decision = coordinator.evaluate(&event);
261                    //   if !decision.is_proceed() {
262                    //       return Ok(Outcome::failure("Blocked by weave gate"));
263                    //   }
264                }
265                Err(e) => {
266                    tracing::warn!(
267                        node_id = %node.id,
268                        error = %e,
269                        "Invalid weave_event JSON on node, skipping gate check"
270                    );
271                }
272            }
273        }
274
275        let handler = self.handler_registry.get(&node.handler_type);
276        handler
277            .execute(node, context, graph, run_dir)
278            .await
279            .context(format!(
280                "Handler '{}' failed for node '{}'",
281                node.handler_type, node.id
282            ))
283    }
284
    /// 5-step edge selection algorithm (spec Section 3.3).
    ///
    /// Precedence order:
    /// 1. A *uniquely* matching conditional edge.
    /// 2. An edge whose normalized label equals the outcome's preferred label.
    /// 3. The first of the outcome's `suggested_next` ids that an outgoing
    ///    edge actually reaches.
    /// 4. The single heaviest unconditional edge.
    /// 5. Lexical tiebreak by target node id among equally heavy edges.
    ///
    /// Returns `Ok(None)` when there are no outgoing edges, or when every
    /// edge is conditional and none matched.
    async fn select_next_edge(
        &self,
        graph: &PipelineGraph,
        current: NodeIndex,
        outcome: &Outcome,
        context: &PipelineContext,
    ) -> Result<Option<NodeIndex>> {
        let edges = graph.outgoing_edges(current);
        if edges.is_empty() {
            return Ok(None);
        }

        let current_id = &graph.graph[current].id;
        // One context snapshot is taken up front and reused for every
        // condition evaluation below.
        let ctx_snapshot = context.snapshot().await;

        // Step 1: Condition-matching edges
        let mut condition_matches: Vec<(NodeIndex, &str)> = Vec::new();
        for (target, edge) in &edges {
            if !edge.condition.is_empty() {
                let cond = parse_condition(&edge.condition);
                if evaluate_condition(&cond, outcome, &ctx_snapshot) {
                    condition_matches.push((*target, &edge.label));
                }
            }
        }
        // NOTE(review): only a *unique* match wins; when several conditions
        // match simultaneously they are all ignored and selection falls
        // through to step 2 — confirm this ambiguity handling is per spec.
        if condition_matches.len() == 1 {
            let (target, label) = condition_matches[0];
            self.emit(PipelineEvent::edge_selected(
                current_id,
                &graph.graph[target].id,
                label,
                1,
            ))
            .await;
            return Ok(Some(target));
        }

        // Step 2: Preferred label match
        // Labels are compared after trimming, lowercasing, and stripping
        // any leading "[K]" accelerator marker (see normalize_label).
        if let Some(ref preferred) = outcome.preferred_label {
            let normalized = normalize_label(preferred);
            for (target, edge) in &edges {
                if normalize_label(&edge.label) == normalized {
                    self.emit(PipelineEvent::edge_selected(
                        current_id,
                        &graph.graph[*target].id,
                        &edge.label,
                        2,
                    ))
                    .await;
                    return Ok(Some(*target));
                }
            }
        }

        // Step 3: Suggested next IDs from outcome
        // A suggestion only counts if an actual outgoing edge reaches it.
        for suggested in &outcome.suggested_next {
            if let Some(&target_idx) = graph.node_index.get(suggested) {
                // Verify edge exists
                for (target, edge) in &edges {
                    if *target == target_idx {
                        self.emit(PipelineEvent::edge_selected(
                            current_id,
                            suggested,
                            &edge.label,
                            3,
                        ))
                        .await;
                        return Ok(Some(target_idx));
                    }
                }
            }
        }

        // Step 4: Highest weight among unconditional edges
        let unconditional: Vec<_> = edges
            .iter()
            .filter(|(_, e)| e.condition.is_empty())
            .collect();

        if !unconditional.is_empty() {
            // `unwrap` is safe: the vector was just checked to be non-empty.
            let max_weight = unconditional.iter().map(|(_, e)| e.weight).max().unwrap();
            let heaviest: Vec<_> = unconditional
                .iter()
                .filter(|(_, e)| e.weight == max_weight)
                .collect();

            if heaviest.len() == 1 {
                let (target, edge) = heaviest[0];
                self.emit(PipelineEvent::edge_selected(
                    current_id,
                    &graph.graph[*target].id,
                    &edge.label,
                    4,
                ))
                .await;
                return Ok(Some(*target));
            }

            // Step 5: Lexical tiebreak
            // Several edges share the max weight: pick the one whose target
            // node id sorts first, for a deterministic choice.
            let mut candidates: Vec<_> = heaviest
                .iter()
                .map(|(target, edge)| {
                    let t = *target;
                    (t, graph.graph[t].id.clone(), edge.label.clone())
                })
                .collect();
            candidates.sort_by(|a, b| a.1.cmp(&b.1));

            let (target, ref id, ref label) = candidates[0];
            self.emit(PipelineEvent::edge_selected(current_id, id, label, 5))
                .await;
            return Ok(Some(target));
        }

        // Fallback: if all edges have conditions and none matched, no edge selected
        Ok(None)
    }
403
404    /// Check goal gates on terminal nodes.
405    async fn check_goal_gates(
406        &self,
407        _graph: &PipelineGraph,
408        _outcome: &Outcome,
409        context: &PipelineContext,
410    ) -> bool {
411        // Check if any node with goal_gate=true has not been completed
412        // For MVP, check if context has a "goal_satisfied" key
413        if let Some(val) = context.get("goal_satisfied").await {
414            if let Some(b) = val.as_bool() {
415                return b;
416            }
417            if let Some(s) = val.as_str() {
418                return s == "true";
419            }
420        }
421        // Default: satisfied (no explicit gate)
422        true
423    }
424
425    async fn emit(&self, event: PipelineEvent) {
426        if let Some(ref tx) = self.event_tx {
427            let _ = tx.send(event).await;
428        }
429    }
430}
431
/// Normalize a label for comparison: trim whitespace, lowercase, and drop a
/// leading `[K]`-style accelerator marker when one is present.
fn normalize_label(label: &str) -> String {
    let lowered = label.trim().to_lowercase();
    // "[k] keep going" -> "keep going"; a '[' with no closing ']' is kept.
    match lowered.strip_prefix('[').and_then(|rest| rest.split_once(']')) {
        Some((_, tail)) => tail.trim().to_string(),
        None => lowered,
    }
}
443
#[cfg(test)]
mod tests {
    use super::*;

    /// Accelerator markers, case, and surrounding whitespace must all
    /// normalize away. (Fix: the scraped source was missing the closing
    /// braces of this module; reconstructed as a complete, well-formed
    /// test module with the same assertions.)
    #[test]
    fn test_normalize_label() {
        assert_eq!(normalize_label("Success"), "success");
        assert_eq!(normalize_label("  Approve  "), "approve");
        assert_eq!(normalize_label("[A] Approve"), "approve");
        assert_eq!(normalize_label("[K] Keep Going"), "keep going");
    }
}
455}