1use anyhow::{Context, Result};
13use petgraph::graph::NodeIndex;
14use std::time::Instant;
15use tokio::sync::mpsc;
16
17use super::checkpoint::Checkpoint;
18use super::conditions::{evaluate_condition, parse_condition};
19use super::context::{Context as PipelineContext, ContextSnapshot};
20use super::events::PipelineEvent;
21use super::graph::{PipelineGraph, PipelineNode};
22use super::handlers::HandlerRegistry;
23use super::interviewer::Interviewer;
24use super::outcome::{Outcome, StageStatus};
25use super::retry::RetryPolicy;
26use super::run_directory::RunDirectory;
27
28pub struct PipelineRunner {
30 pub handler_registry: HandlerRegistry,
31 pub interviewer: Box<dyn Interviewer>,
32 pub event_tx: Option<mpsc::Sender<PipelineEvent>>,
33}
34
35impl PipelineRunner {
36 pub fn new(handler_registry: HandlerRegistry, interviewer: Box<dyn Interviewer>) -> Self {
38 Self {
39 handler_registry,
40 interviewer,
41 event_tx: None,
42 }
43 }
44
45 pub fn with_events(mut self, tx: mpsc::Sender<PipelineEvent>) -> Self {
47 self.event_tx = Some(tx);
48 self
49 }
50
51 pub async fn run(
53 &self,
54 graph: &PipelineGraph,
55 context: &PipelineContext,
56 run_dir: &RunDirectory,
57 checkpoint: Option<Checkpoint>,
58 ) -> Result<StageStatus> {
59 let run_start = Instant::now();
60 let run_id = run_dir
61 .root()
62 .file_name()
63 .map(|f| f.to_string_lossy().to_string())
64 .unwrap_or_default();
65
66 self.emit(PipelineEvent::pipeline_started(&graph.name, &run_id))
67 .await;
68
69 let (mut current_idx, mut cp) = if let Some(cp) = checkpoint {
71 let idx = *graph
73 .node_index
74 .get(&cp.current_node)
75 .context("Checkpoint node not found in graph")?;
76 (idx, cp)
77 } else {
78 let snap = ContextSnapshot::from(context.snapshot().await);
79 let cp = Checkpoint::new(&graph.graph[graph.start_node].id, snap);
80 (graph.start_node, cp)
81 };
82
83 let mut nodes_executed = 0;
84
85 loop {
86 let node = &graph.graph[current_idx];
87 let node_id = node.id.clone();
88 let node_start = Instant::now();
89
90 self.emit(PipelineEvent::node_started(&node_id, &node.handler_type))
91 .await;
92
93 let outcome = self.execute_node(node, context, graph, run_dir).await?;
95
96 let duration_ms = node_start.elapsed().as_millis() as u64;
97 self.emit(PipelineEvent::node_completed(
98 &node_id,
99 outcome.status.clone(),
100 duration_ms,
101 ))
102 .await;
103
104 nodes_executed += 1;
105
106 if !outcome.context_updates.is_empty() {
108 context.apply_updates(&outcome.context_updates).await;
109 }
110
111 cp.current_node = node_id.clone();
113 cp.mark_completed(&node_id, outcome.status.clone());
114 cp.context = ContextSnapshot::from(context.snapshot().await);
115 cp.save(&run_dir.checkpoint_path())?;
116
117 self.emit(PipelineEvent::CheckpointSaved {
118 node_id: node_id.clone(),
119 })
120 .await;
121
122 if node.handler_type == "exit" {
124 let goal_satisfied = self.check_goal_gates(graph, &outcome, context).await;
126
127 self.emit(PipelineEvent::GoalGateCheck {
128 node_id: node_id.clone(),
129 satisfied: goal_satisfied,
130 })
131 .await;
132
133 if goal_satisfied {
134 let total_ms = run_start.elapsed().as_millis() as u64;
135 self.emit(PipelineEvent::PipelineCompleted {
136 status: StageStatus::Success,
137 total_duration_ms: total_ms,
138 nodes_executed,
139 })
140 .await;
141 return Ok(StageStatus::Success);
142 } else if let Some(ref retry_target) = node.retry_target {
143 if let Some(&target_idx) = graph.node_index.get(retry_target) {
145 current_idx = target_idx;
146 continue;
147 }
148 }
149 let total_ms = run_start.elapsed().as_millis() as u64;
151 self.emit(PipelineEvent::PipelineCompleted {
152 status: outcome.status.clone(),
153 total_duration_ms: total_ms,
154 nodes_executed,
155 })
156 .await;
157 return Ok(outcome.status);
158 }
159
160 if !outcome.status.is_success() && outcome.status != StageStatus::Skipped {
162 let policy = RetryPolicy::from_max_retries(node.max_retries);
163 let attempt = cp.increment_retry(&node_id);
164
165 if policy.should_retry(attempt) {
166 let delay = policy.delay_for_attempt(attempt);
167 self.emit(PipelineEvent::RetryScheduled {
168 node_id: node_id.clone(),
169 attempt,
170 max_retries: node.max_retries,
171 delay_ms: delay.as_millis() as u64,
172 })
173 .await;
174
175 tokio::time::sleep(delay).await;
176 continue;
178 }
179
180 if let Some(ref target) = node.retry_target {
182 if let Some(&target_idx) = graph.node_index.get(target) {
183 current_idx = target_idx;
184 continue;
185 }
186 }
187 if let Some(ref fallback) = node.fallback_retry_target {
188 if let Some(&target_idx) = graph.node_index.get(fallback) {
189 current_idx = target_idx;
190 continue;
191 }
192 }
193
194 let total_ms = run_start.elapsed().as_millis() as u64;
196 self.emit(PipelineEvent::PipelineCompleted {
197 status: StageStatus::Failure,
198 total_duration_ms: total_ms,
199 nodes_executed,
200 })
201 .await;
202 return Ok(StageStatus::Failure);
203 }
204
205 let next_idx = self
207 .select_next_edge(graph, current_idx, &outcome, context)
208 .await?;
209
210 match next_idx {
211 Some(idx) => current_idx = idx,
212 None => {
213 let total_ms = run_start.elapsed().as_millis() as u64;
215 self.emit(PipelineEvent::PipelineCompleted {
216 status: outcome.status,
217 total_duration_ms: total_ms,
218 nodes_executed,
219 })
220 .await;
221 return Ok(StageStatus::Success);
222 }
223 }
224 }
225 }
226
227 async fn execute_node(
229 &self,
230 node: &PipelineNode,
231 context: &PipelineContext,
232 graph: &PipelineGraph,
233 run_dir: &RunDirectory,
234 ) -> Result<Outcome> {
235 let handler = self.handler_registry.get(&node.handler_type);
236 handler
237 .execute(node, context, graph, run_dir)
238 .await
239 .context(format!(
240 "Handler '{}' failed for node '{}'",
241 node.handler_type, node.id
242 ))
243 }
244
245 async fn select_next_edge(
247 &self,
248 graph: &PipelineGraph,
249 current: NodeIndex,
250 outcome: &Outcome,
251 context: &PipelineContext,
252 ) -> Result<Option<NodeIndex>> {
253 let edges = graph.outgoing_edges(current);
254 if edges.is_empty() {
255 return Ok(None);
256 }
257
258 let current_id = &graph.graph[current].id;
259 let ctx_snapshot = context.snapshot().await;
260
261 let mut condition_matches: Vec<(NodeIndex, &str)> = Vec::new();
263 for (target, edge) in &edges {
264 if !edge.condition.is_empty() {
265 let cond = parse_condition(&edge.condition);
266 if evaluate_condition(&cond, outcome, &ctx_snapshot) {
267 condition_matches.push((*target, &edge.label));
268 }
269 }
270 }
271 if condition_matches.len() == 1 {
272 let (target, label) = condition_matches[0];
273 self.emit(PipelineEvent::edge_selected(
274 current_id,
275 &graph.graph[target].id,
276 label,
277 1,
278 ))
279 .await;
280 return Ok(Some(target));
281 }
282
283 if let Some(ref preferred) = outcome.preferred_label {
285 let normalized = normalize_label(preferred);
286 for (target, edge) in &edges {
287 if normalize_label(&edge.label) == normalized {
288 self.emit(PipelineEvent::edge_selected(
289 current_id,
290 &graph.graph[*target].id,
291 &edge.label,
292 2,
293 ))
294 .await;
295 return Ok(Some(*target));
296 }
297 }
298 }
299
300 for suggested in &outcome.suggested_next {
302 if let Some(&target_idx) = graph.node_index.get(suggested) {
303 for (target, edge) in &edges {
305 if *target == target_idx {
306 self.emit(PipelineEvent::edge_selected(
307 current_id,
308 suggested,
309 &edge.label,
310 3,
311 ))
312 .await;
313 return Ok(Some(target_idx));
314 }
315 }
316 }
317 }
318
319 let unconditional: Vec<_> = edges
321 .iter()
322 .filter(|(_, e)| e.condition.is_empty())
323 .collect();
324
325 if !unconditional.is_empty() {
326 let max_weight = unconditional.iter().map(|(_, e)| e.weight).max().unwrap();
327 let heaviest: Vec<_> = unconditional
328 .iter()
329 .filter(|(_, e)| e.weight == max_weight)
330 .collect();
331
332 if heaviest.len() == 1 {
333 let (target, edge) = heaviest[0];
334 self.emit(PipelineEvent::edge_selected(
335 current_id,
336 &graph.graph[*target].id,
337 &edge.label,
338 4,
339 ))
340 .await;
341 return Ok(Some(*target));
342 }
343
344 let mut candidates: Vec<_> = heaviest
346 .iter()
347 .map(|(target, edge)| {
348 let t = *target;
349 (t, graph.graph[t].id.clone(), edge.label.clone())
350 })
351 .collect();
352 candidates.sort_by(|a, b| a.1.cmp(&b.1));
353
354 let (target, ref id, ref label) = candidates[0];
355 self.emit(PipelineEvent::edge_selected(current_id, id, label, 5))
356 .await;
357 return Ok(Some(target));
358 }
359
360 Ok(None)
362 }
363
364 async fn check_goal_gates(
366 &self,
367 _graph: &PipelineGraph,
368 _outcome: &Outcome,
369 context: &PipelineContext,
370 ) -> bool {
371 if let Some(val) = context.get("goal_satisfied").await {
374 if let Some(b) = val.as_bool() {
375 return b;
376 }
377 if let Some(s) = val.as_str() {
378 return s == "true";
379 }
380 }
381 true
383 }
384
385 async fn emit(&self, event: PipelineEvent) {
386 if let Some(ref tx) = self.event_tx {
387 let _ = tx.send(event).await;
388 }
389 }
390}
391
/// Normalizes an edge label for comparison.
///
/// The label is trimmed and lowercased. If the result starts with `[` and
/// contains a `]`, everything up to and including the first `]` is removed
/// (an accelerator prefix such as `[A] `) and the remainder is trimmed again.
/// A leading `[` without a closing `]` is left untouched.
fn normalize_label(label: &str) -> String {
    let lowered = label.trim().to_lowercase();
    // `strip_prefix` + `split_once` avoid manual byte-index slicing.
    match lowered
        .strip_prefix('[')
        .and_then(|rest| rest.split_once(']'))
    {
        Some((_, tail)) => tail.trim().to_string(),
        None => lowered,
    }
}
403
#[cfg(test)]
mod tests {
    use super::*;

    /// Labels are trimmed and lowercased, and a leading `[X]` prefix is removed.
    #[test]
    fn test_normalize_label() {
        assert_eq!(normalize_label("Success"), "success");
        assert_eq!(normalize_label(" Approve "), "approve");
        assert_eq!(normalize_label("[A] Approve"), "approve");
        assert_eq!(normalize_label("[K] Keep Going"), "keep going");
    }

    /// Boundary inputs: unterminated bracket, prefix with nothing after it,
    /// bracket that is not at the start, and the empty string.
    #[test]
    fn test_normalize_label_edge_cases() {
        assert_eq!(normalize_label("[draft"), "[draft");
        assert_eq!(normalize_label("[X]"), "");
        assert_eq!(normalize_label("a [b] c"), "a [b] c");
        assert_eq!(normalize_label(""), "");
    }
}