1use anyhow::{Context, Result};
13use petgraph::graph::NodeIndex;
14use std::time::Instant;
15use tokio::sync::mpsc;
16
17use super::checkpoint::Checkpoint;
18use super::conditions::{evaluate_condition, parse_condition};
19use super::context::{Context as PipelineContext, ContextSnapshot};
20use super::events::PipelineEvent;
21use super::graph::{PipelineGraph, PipelineNode};
22use super::handlers::HandlerRegistry;
23use super::interviewer::Interviewer;
24use super::outcome::{Outcome, StageStatus};
25use super::retry::RetryPolicy;
26use super::run_directory::RunDirectory;
27
28pub struct PipelineRunner {
30 pub handler_registry: HandlerRegistry,
31 pub interviewer: Box<dyn Interviewer>,
32 pub event_tx: Option<mpsc::Sender<PipelineEvent>>,
33}
34
35impl PipelineRunner {
36 pub fn new(handler_registry: HandlerRegistry, interviewer: Box<dyn Interviewer>) -> Self {
38 Self {
39 handler_registry,
40 interviewer,
41 event_tx: None,
42 }
43 }
44
45 pub fn with_events(mut self, tx: mpsc::Sender<PipelineEvent>) -> Self {
47 self.event_tx = Some(tx);
48 self
49 }
50
51 pub async fn run(
53 &self,
54 graph: &PipelineGraph,
55 context: &PipelineContext,
56 run_dir: &RunDirectory,
57 checkpoint: Option<Checkpoint>,
58 ) -> Result<StageStatus> {
59 let run_start = Instant::now();
60 let run_id = run_dir
61 .root()
62 .file_name()
63 .map(|f| f.to_string_lossy().to_string())
64 .unwrap_or_default();
65
66 self.emit(PipelineEvent::pipeline_started(&graph.name, &run_id))
67 .await;
68
69 let (mut current_idx, mut cp) = if let Some(cp) = checkpoint {
71 let idx = *graph
73 .node_index
74 .get(&cp.current_node)
75 .context("Checkpoint node not found in graph")?;
76 (idx, cp)
77 } else {
78 let snap = ContextSnapshot::from(context.snapshot().await);
79 let cp = Checkpoint::new(&graph.graph[graph.start_node].id, snap);
80 (graph.start_node, cp)
81 };
82
83 let mut nodes_executed = 0;
84
85 loop {
86 let node = &graph.graph[current_idx];
87 let node_id = node.id.clone();
88 let node_start = Instant::now();
89
90 self.emit(PipelineEvent::node_started(&node_id, &node.handler_type))
91 .await;
92
93 let outcome = self.execute_node(node, context, graph, run_dir).await?;
95
96 let duration_ms = node_start.elapsed().as_millis() as u64;
97 self.emit(PipelineEvent::node_completed(
98 &node_id,
99 outcome.status.clone(),
100 duration_ms,
101 ))
102 .await;
103
104 nodes_executed += 1;
105
106 if !outcome.context_updates.is_empty() {
108 context.apply_updates(&outcome.context_updates).await;
109 }
110
111 cp.current_node = node_id.clone();
113 cp.mark_completed(&node_id, outcome.status.clone());
114 cp.context = ContextSnapshot::from(context.snapshot().await);
115 cp.save(&run_dir.checkpoint_path())?;
116
117 self.emit(PipelineEvent::CheckpointSaved {
118 node_id: node_id.clone(),
119 })
120 .await;
121
122 if node.handler_type == "exit" {
124 let goal_satisfied = self.check_goal_gates(graph, &outcome, context).await;
126
127 self.emit(PipelineEvent::GoalGateCheck {
128 node_id: node_id.clone(),
129 satisfied: goal_satisfied,
130 })
131 .await;
132
133 if goal_satisfied {
134 let total_ms = run_start.elapsed().as_millis() as u64;
135 self.emit(PipelineEvent::PipelineCompleted {
136 status: StageStatus::Success,
137 total_duration_ms: total_ms,
138 nodes_executed,
139 })
140 .await;
141 return Ok(StageStatus::Success);
142 } else if let Some(ref retry_target) = node.retry_target {
143 if let Some(&target_idx) = graph.node_index.get(retry_target) {
145 current_idx = target_idx;
146 continue;
147 }
148 }
149 let total_ms = run_start.elapsed().as_millis() as u64;
151 self.emit(PipelineEvent::PipelineCompleted {
152 status: outcome.status.clone(),
153 total_duration_ms: total_ms,
154 nodes_executed,
155 })
156 .await;
157 return Ok(outcome.status);
158 }
159
160 if !outcome.status.is_success() && outcome.status != StageStatus::Skipped {
162 let policy = RetryPolicy::from_max_retries(node.max_retries);
163 let attempt = cp.increment_retry(&node_id);
164
165 if policy.should_retry(attempt) {
166 let delay = policy.delay_for_attempt(attempt);
167 self.emit(PipelineEvent::RetryScheduled {
168 node_id: node_id.clone(),
169 attempt,
170 max_retries: node.max_retries,
171 delay_ms: delay.as_millis() as u64,
172 })
173 .await;
174
175 tokio::time::sleep(delay).await;
176 continue;
178 }
179
180 if let Some(ref target) = node.retry_target {
182 if let Some(&target_idx) = graph.node_index.get(target) {
183 current_idx = target_idx;
184 continue;
185 }
186 }
187 if let Some(ref fallback) = node.fallback_retry_target {
188 if let Some(&target_idx) = graph.node_index.get(fallback) {
189 current_idx = target_idx;
190 continue;
191 }
192 }
193
194 let total_ms = run_start.elapsed().as_millis() as u64;
196 self.emit(PipelineEvent::PipelineCompleted {
197 status: StageStatus::Failure,
198 total_duration_ms: total_ms,
199 nodes_executed,
200 })
201 .await;
202 return Ok(StageStatus::Failure);
203 }
204
205 let next_idx = self
207 .select_next_edge(graph, current_idx, &outcome, context)
208 .await?;
209
210 match next_idx {
211 Some(idx) => current_idx = idx,
212 None => {
213 let total_ms = run_start.elapsed().as_millis() as u64;
215 self.emit(PipelineEvent::PipelineCompleted {
216 status: outcome.status,
217 total_duration_ms: total_ms,
218 nodes_executed,
219 })
220 .await;
221 return Ok(StageStatus::Success);
222 }
223 }
224 }
225 }
226
227 async fn execute_node(
233 &self,
234 node: &PipelineNode,
235 context: &PipelineContext,
236 graph: &PipelineGraph,
237 run_dir: &RunDirectory,
238 ) -> Result<Outcome> {
239 if let Some(weave_event_attr) = node.extra_attrs.get("weave_event") {
241 let weave_event_json = weave_event_attr.as_str();
242 tracing::info!(
243 node_id = %node.id,
244 weave_event = %weave_event_json,
245 "Weave event gate detected on node"
246 );
247
248 match serde_json::from_str::<serde_json::Value>(&weave_event_json) {
250 Ok(event) => {
251 context
253 .set(
254 format!("{}.weave_event", node.id),
255 event,
256 )
257 .await;
258
259 }
265 Err(e) => {
266 tracing::warn!(
267 node_id = %node.id,
268 error = %e,
269 "Invalid weave_event JSON on node, skipping gate check"
270 );
271 }
272 }
273 }
274
275 let handler = self.handler_registry.get(&node.handler_type);
276 handler
277 .execute(node, context, graph, run_dir)
278 .await
279 .context(format!(
280 "Handler '{}' failed for node '{}'",
281 node.handler_type, node.id
282 ))
283 }
284
285 async fn select_next_edge(
287 &self,
288 graph: &PipelineGraph,
289 current: NodeIndex,
290 outcome: &Outcome,
291 context: &PipelineContext,
292 ) -> Result<Option<NodeIndex>> {
293 let edges = graph.outgoing_edges(current);
294 if edges.is_empty() {
295 return Ok(None);
296 }
297
298 let current_id = &graph.graph[current].id;
299 let ctx_snapshot = context.snapshot().await;
300
301 let mut condition_matches: Vec<(NodeIndex, &str)> = Vec::new();
303 for (target, edge) in &edges {
304 if !edge.condition.is_empty() {
305 let cond = parse_condition(&edge.condition);
306 if evaluate_condition(&cond, outcome, &ctx_snapshot) {
307 condition_matches.push((*target, &edge.label));
308 }
309 }
310 }
311 if condition_matches.len() == 1 {
312 let (target, label) = condition_matches[0];
313 self.emit(PipelineEvent::edge_selected(
314 current_id,
315 &graph.graph[target].id,
316 label,
317 1,
318 ))
319 .await;
320 return Ok(Some(target));
321 }
322
323 if let Some(ref preferred) = outcome.preferred_label {
325 let normalized = normalize_label(preferred);
326 for (target, edge) in &edges {
327 if normalize_label(&edge.label) == normalized {
328 self.emit(PipelineEvent::edge_selected(
329 current_id,
330 &graph.graph[*target].id,
331 &edge.label,
332 2,
333 ))
334 .await;
335 return Ok(Some(*target));
336 }
337 }
338 }
339
340 for suggested in &outcome.suggested_next {
342 if let Some(&target_idx) = graph.node_index.get(suggested) {
343 for (target, edge) in &edges {
345 if *target == target_idx {
346 self.emit(PipelineEvent::edge_selected(
347 current_id,
348 suggested,
349 &edge.label,
350 3,
351 ))
352 .await;
353 return Ok(Some(target_idx));
354 }
355 }
356 }
357 }
358
359 let unconditional: Vec<_> = edges
361 .iter()
362 .filter(|(_, e)| e.condition.is_empty())
363 .collect();
364
365 if !unconditional.is_empty() {
366 let max_weight = unconditional.iter().map(|(_, e)| e.weight).max().unwrap();
367 let heaviest: Vec<_> = unconditional
368 .iter()
369 .filter(|(_, e)| e.weight == max_weight)
370 .collect();
371
372 if heaviest.len() == 1 {
373 let (target, edge) = heaviest[0];
374 self.emit(PipelineEvent::edge_selected(
375 current_id,
376 &graph.graph[*target].id,
377 &edge.label,
378 4,
379 ))
380 .await;
381 return Ok(Some(*target));
382 }
383
384 let mut candidates: Vec<_> = heaviest
386 .iter()
387 .map(|(target, edge)| {
388 let t = *target;
389 (t, graph.graph[t].id.clone(), edge.label.clone())
390 })
391 .collect();
392 candidates.sort_by(|a, b| a.1.cmp(&b.1));
393
394 let (target, ref id, ref label) = candidates[0];
395 self.emit(PipelineEvent::edge_selected(current_id, id, label, 5))
396 .await;
397 return Ok(Some(target));
398 }
399
400 Ok(None)
402 }
403
404 async fn check_goal_gates(
406 &self,
407 _graph: &PipelineGraph,
408 _outcome: &Outcome,
409 context: &PipelineContext,
410 ) -> bool {
411 if let Some(val) = context.get("goal_satisfied").await {
414 if let Some(b) = val.as_bool() {
415 return b;
416 }
417 if let Some(s) = val.as_str() {
418 return s == "true";
419 }
420 }
421 true
423 }
424
425 async fn emit(&self, event: PipelineEvent) {
426 if let Some(ref tx) = self.event_tx {
427 let _ = tx.send(event).await;
428 }
429 }
430}
431
/// Canonicalizes an edge label for comparison.
///
/// The label is trimmed and lowercased. If the result starts with `[` and contains a
/// `]`, everything up to and including the first `]` is dropped and the remainder is
/// trimmed again, so `"[A] Approve"` becomes `"approve"`. A `[` with no closing `]`
/// is left untouched.
fn normalize_label(label: &str) -> String {
    let lowered = label.trim().to_lowercase();

    let without_prefix = lowered
        .strip_prefix('[')
        .and_then(|rest| rest.split_once(']'))
        .map(|(_, tail)| tail.trim().to_string());

    match without_prefix {
        Some(stripped) => stripped,
        None => lowered,
    }
}
443
#[cfg(test)]
mod tests {
    use super::*;

    /// Covers lowercasing, trimming and removal of a leading `[X]` prefix.
    #[test]
    fn test_normalize_label() {
        let cases = [
            ("Success", "success"),
            (" Approve ", "approve"),
            ("[A] Approve", "approve"),
            ("[K] Keep Going", "keep going"),
        ];

        for (input, expected) in cases.iter() {
            assert_eq!(normalize_label(input), *expected);
        }
    }
}