1use anyhow::{Context, Result};
13use petgraph::graph::NodeIndex;
14use std::time::Instant;
15use tokio::sync::mpsc;
16
17use super::checkpoint::Checkpoint;
18use super::conditions::{evaluate_condition, parse_condition};
19use super::context::{Context as PipelineContext, ContextSnapshot};
20use super::events::PipelineEvent;
21use super::graph::{PipelineGraph, PipelineNode};
22use super::handlers::HandlerRegistry;
23use super::interviewer::Interviewer;
24use super::outcome::{Outcome, StageStatus};
25use super::retry::RetryPolicy;
26use super::run_directory::RunDirectory;
27
28pub struct PipelineRunner {
30 pub handler_registry: HandlerRegistry,
31 pub interviewer: Box<dyn Interviewer>,
32 pub event_tx: Option<mpsc::Sender<PipelineEvent>>,
33}
34
35impl PipelineRunner {
36 pub fn new(
38 handler_registry: HandlerRegistry,
39 interviewer: Box<dyn Interviewer>,
40 ) -> Self {
41 Self {
42 handler_registry,
43 interviewer,
44 event_tx: None,
45 }
46 }
47
48 pub fn with_events(mut self, tx: mpsc::Sender<PipelineEvent>) -> Self {
50 self.event_tx = Some(tx);
51 self
52 }
53
54 pub async fn run(
56 &self,
57 graph: &PipelineGraph,
58 context: &PipelineContext,
59 run_dir: &RunDirectory,
60 checkpoint: Option<Checkpoint>,
61 ) -> Result<StageStatus> {
62 let run_start = Instant::now();
63 let run_id = run_dir
64 .root()
65 .file_name()
66 .map(|f| f.to_string_lossy().to_string())
67 .unwrap_or_default();
68
69 self.emit(PipelineEvent::pipeline_started(&graph.name, &run_id))
70 .await;
71
72 let (mut current_idx, mut cp) = if let Some(cp) = checkpoint {
74 let idx = *graph
76 .node_index
77 .get(&cp.current_node)
78 .context("Checkpoint node not found in graph")?;
79 (idx, cp)
80 } else {
81 let snap = ContextSnapshot::from(context.snapshot().await);
82 let cp = Checkpoint::new(
83 &graph.graph[graph.start_node].id,
84 snap,
85 );
86 (graph.start_node, cp)
87 };
88
89 let mut nodes_executed = 0;
90
91 loop {
92 let node = &graph.graph[current_idx];
93 let node_id = node.id.clone();
94 let node_start = Instant::now();
95
96 self.emit(PipelineEvent::node_started(&node_id, &node.handler_type))
97 .await;
98
99 let outcome = self.execute_node(node, context, graph, run_dir).await?;
101
102 let duration_ms = node_start.elapsed().as_millis() as u64;
103 self.emit(PipelineEvent::node_completed(
104 &node_id,
105 outcome.status.clone(),
106 duration_ms,
107 ))
108 .await;
109
110 nodes_executed += 1;
111
112 if !outcome.context_updates.is_empty() {
114 context.apply_updates(&outcome.context_updates).await;
115 }
116
117 cp.current_node = node_id.clone();
119 cp.mark_completed(&node_id, outcome.status.clone());
120 cp.context = ContextSnapshot::from(context.snapshot().await);
121 cp.save(&run_dir.checkpoint_path())?;
122
123 self.emit(PipelineEvent::CheckpointSaved {
124 node_id: node_id.clone(),
125 })
126 .await;
127
128 if node.handler_type == "exit" {
130 let goal_satisfied = self.check_goal_gates(graph, &outcome, context).await;
132
133 self.emit(PipelineEvent::GoalGateCheck {
134 node_id: node_id.clone(),
135 satisfied: goal_satisfied,
136 })
137 .await;
138
139 if goal_satisfied {
140 let total_ms = run_start.elapsed().as_millis() as u64;
141 self.emit(PipelineEvent::PipelineCompleted {
142 status: StageStatus::Success,
143 total_duration_ms: total_ms,
144 nodes_executed,
145 })
146 .await;
147 return Ok(StageStatus::Success);
148 } else if let Some(ref retry_target) = node.retry_target {
149 if let Some(&target_idx) = graph.node_index.get(retry_target) {
151 current_idx = target_idx;
152 continue;
153 }
154 }
155 let total_ms = run_start.elapsed().as_millis() as u64;
157 self.emit(PipelineEvent::PipelineCompleted {
158 status: outcome.status.clone(),
159 total_duration_ms: total_ms,
160 nodes_executed,
161 })
162 .await;
163 return Ok(outcome.status);
164 }
165
166 if !outcome.status.is_success() && outcome.status != StageStatus::Skipped {
168 let policy = RetryPolicy::from_max_retries(node.max_retries);
169 let attempt = cp.increment_retry(&node_id);
170
171 if policy.should_retry(attempt) {
172 let delay = policy.delay_for_attempt(attempt);
173 self.emit(PipelineEvent::RetryScheduled {
174 node_id: node_id.clone(),
175 attempt,
176 max_retries: node.max_retries,
177 delay_ms: delay.as_millis() as u64,
178 })
179 .await;
180
181 tokio::time::sleep(delay).await;
182 continue;
184 }
185
186 if let Some(ref target) = node.retry_target {
188 if let Some(&target_idx) = graph.node_index.get(target) {
189 current_idx = target_idx;
190 continue;
191 }
192 }
193 if let Some(ref fallback) = node.fallback_retry_target {
194 if let Some(&target_idx) = graph.node_index.get(fallback) {
195 current_idx = target_idx;
196 continue;
197 }
198 }
199
200 let total_ms = run_start.elapsed().as_millis() as u64;
202 self.emit(PipelineEvent::PipelineCompleted {
203 status: StageStatus::Failure,
204 total_duration_ms: total_ms,
205 nodes_executed,
206 })
207 .await;
208 return Ok(StageStatus::Failure);
209 }
210
211 let next_idx = self
213 .select_next_edge(graph, current_idx, &outcome, context)
214 .await?;
215
216 match next_idx {
217 Some(idx) => current_idx = idx,
218 None => {
219 let total_ms = run_start.elapsed().as_millis() as u64;
221 self.emit(PipelineEvent::PipelineCompleted {
222 status: outcome.status,
223 total_duration_ms: total_ms,
224 nodes_executed,
225 })
226 .await;
227 return Ok(StageStatus::Success);
228 }
229 }
230 }
231 }
232
233 async fn execute_node(
235 &self,
236 node: &PipelineNode,
237 context: &PipelineContext,
238 graph: &PipelineGraph,
239 run_dir: &RunDirectory,
240 ) -> Result<Outcome> {
241 let handler = self.handler_registry.get(&node.handler_type);
242 handler
243 .execute(node, context, graph, run_dir)
244 .await
245 .context(format!("Handler '{}' failed for node '{}'", node.handler_type, node.id))
246 }
247
248 async fn select_next_edge(
250 &self,
251 graph: &PipelineGraph,
252 current: NodeIndex,
253 outcome: &Outcome,
254 context: &PipelineContext,
255 ) -> Result<Option<NodeIndex>> {
256 let edges = graph.outgoing_edges(current);
257 if edges.is_empty() {
258 return Ok(None);
259 }
260
261 let current_id = &graph.graph[current].id;
262 let ctx_snapshot = context.snapshot().await;
263
264 let mut condition_matches: Vec<(NodeIndex, &str)> = Vec::new();
266 for (target, edge) in &edges {
267 if !edge.condition.is_empty() {
268 let cond = parse_condition(&edge.condition);
269 if evaluate_condition(&cond, outcome, &ctx_snapshot) {
270 condition_matches.push((*target, &edge.label));
271 }
272 }
273 }
274 if condition_matches.len() == 1 {
275 let (target, label) = condition_matches[0];
276 self.emit(PipelineEvent::edge_selected(
277 current_id,
278 &graph.graph[target].id,
279 label,
280 1,
281 ))
282 .await;
283 return Ok(Some(target));
284 }
285
286 if let Some(ref preferred) = outcome.preferred_label {
288 let normalized = normalize_label(preferred);
289 for (target, edge) in &edges {
290 if normalize_label(&edge.label) == normalized {
291 self.emit(PipelineEvent::edge_selected(
292 current_id,
293 &graph.graph[*target].id,
294 &edge.label,
295 2,
296 ))
297 .await;
298 return Ok(Some(*target));
299 }
300 }
301 }
302
303 for suggested in &outcome.suggested_next {
305 if let Some(&target_idx) = graph.node_index.get(suggested) {
306 for (target, edge) in &edges {
308 if *target == target_idx {
309 self.emit(PipelineEvent::edge_selected(
310 current_id,
311 suggested,
312 &edge.label,
313 3,
314 ))
315 .await;
316 return Ok(Some(target_idx));
317 }
318 }
319 }
320 }
321
322 let unconditional: Vec<_> = edges
324 .iter()
325 .filter(|(_, e)| e.condition.is_empty())
326 .collect();
327
328 if !unconditional.is_empty() {
329 let max_weight = unconditional.iter().map(|(_, e)| e.weight).max().unwrap();
330 let heaviest: Vec<_> = unconditional
331 .iter()
332 .filter(|(_, e)| e.weight == max_weight)
333 .collect();
334
335 if heaviest.len() == 1 {
336 let (target, edge) = heaviest[0];
337 self.emit(PipelineEvent::edge_selected(
338 current_id,
339 &graph.graph[*target].id,
340 &edge.label,
341 4,
342 ))
343 .await;
344 return Ok(Some(*target));
345 }
346
347 let mut candidates: Vec<_> = heaviest
349 .iter()
350 .map(|(target, edge)| {
351 let t = *target;
352 (t, graph.graph[t].id.clone(), edge.label.clone())
353 })
354 .collect();
355 candidates.sort_by(|a, b| a.1.cmp(&b.1));
356
357 let (target, ref id, ref label) = candidates[0];
358 self.emit(PipelineEvent::edge_selected(current_id, id, label, 5))
359 .await;
360 return Ok(Some(target));
361 }
362
363 Ok(None)
365 }
366
367 async fn check_goal_gates(
369 &self,
370 _graph: &PipelineGraph,
371 _outcome: &Outcome,
372 context: &PipelineContext,
373 ) -> bool {
374 if let Some(val) = context.get("goal_satisfied").await {
377 if let Some(b) = val.as_bool() {
378 return b;
379 }
380 if let Some(s) = val.as_str() {
381 return s == "true";
382 }
383 }
384 true
386 }
387
388 async fn emit(&self, event: PipelineEvent) {
389 if let Some(ref tx) = self.event_tx {
390 let _ = tx.send(event).await;
391 }
392 }
393}
394
/// Normalizes an edge label for comparison.
///
/// The label is trimmed and lower-cased. If the result starts with `[` and
/// contains a `]`, everything up to and including the first `]` is dropped and
/// the remainder is trimmed again, so `"[A] Approve"` becomes `"approve"`.
fn normalize_label(label: &str) -> String {
    let lowered = label.trim().to_lowercase();

    // Text following a leading "[...]" prefix, when such a prefix is present.
    let after_prefix = lowered
        .strip_prefix('[')
        .and_then(|rest| rest.find(']').map(|close| rest[close + 1..].trim()));

    match after_prefix {
        Some(tail) => tail.to_string(),
        None => lowered,
    }
}
406
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks case folding, whitespace trimming and removal of a leading
    /// bracketed prefix.
    #[test]
    fn test_normalize_label() {
        let cases = [
            ("Success", "success"),
            (" Approve ", "approve"),
            ("[A] Approve", "approve"),
            ("[K] Keep Going", "keep going"),
        ];

        for (raw, expected) in cases.iter() {
            assert_eq!(normalize_label(raw), *expected);
        }
    }
}