1use std::sync::Arc;
2use std::time::{Duration, Instant};
3
4use crate::config::{OnErrorPolicy, PipelineConfig, PipelineStepConfig};
5use crate::envelope::MuninnEnvelopeV1;
6use async_trait::async_trait;
7use serde::Serialize;
8use tokio::time::timeout;
9
10mod codec;
11mod execution;
12mod transport;
13
/// Capture budget, in bytes, for a step's stdout (64 KiB); handed to the external step executor.
const MAX_STEP_STDOUT_BYTES: usize = 64 * 1024;
/// Capture budget, in bytes, for a step's stderr (16 KiB); handed to the external step executor.
const MAX_STEP_STDERR_BYTES: usize = 16 * 1024;
/// Marker text passed to the external step executor together with the capture budgets.
const TRUNCATION_SUFFIX: &str = "\n[truncated]";
17
18#[derive(Clone)]
19pub struct PipelineRunner {
20 strict_step_contract: bool,
21 in_process_step_executor: Option<Arc<dyn InProcessStepExecutor>>,
22}
23
24#[async_trait]
25pub trait InProcessStepExecutor: Send + Sync {
26 async fn try_execute(
27 &self,
28 step: &PipelineStepConfig,
29 input: &MuninnEnvelopeV1,
30 ) -> Option<Result<MuninnEnvelopeV1, InProcessStepError>>;
31}
32
33#[derive(Debug, Clone, PartialEq, Eq)]
34pub struct InProcessStepError {
35 pub kind: StepFailureKind,
36 pub message: String,
37 pub stderr: String,
38 pub exit_status: Option<i32>,
39}
40
41impl Default for PipelineRunner {
42 fn default() -> Self {
43 Self {
44 strict_step_contract: true,
45 in_process_step_executor: None,
46 }
47 }
48}
49
50impl PipelineRunner {
51 pub fn new(strict_step_contract: bool) -> Self {
52 Self {
53 strict_step_contract,
54 in_process_step_executor: None,
55 }
56 }
57
58 pub fn with_in_process_step_executor(
59 strict_step_contract: bool,
60 in_process_step_executor: Arc<dyn InProcessStepExecutor>,
61 ) -> Self {
62 Self {
63 strict_step_contract,
64 in_process_step_executor: Some(in_process_step_executor),
65 }
66 }
67
68 pub async fn run(
69 &self,
70 envelope: MuninnEnvelopeV1,
71 config: &PipelineConfig,
72 ) -> PipelineOutcome {
73 let start = Instant::now();
74 let deadline = Duration::from_millis(config.deadline_ms);
75 let mut current_envelope = envelope;
76 let mut trace = Vec::with_capacity(config.steps.len());
77
78 for step in &config.steps {
79 let Some(remaining_budget) = remaining_budget(start, deadline) else {
80 return PipelineOutcome::FallbackRaw {
81 envelope: current_envelope,
82 trace,
83 reason: PipelineStopReason::GlobalDeadlineExceeded {
84 deadline_ms: config.deadline_ms,
85 step_id: Some(step.id.clone()),
86 },
87 };
88 };
89
90 let step_budget = Duration::from_millis(step.timeout_ms);
91 let effective_timeout = remaining_budget.min(step_budget);
92 let started = Instant::now();
93
94 match self
95 .run_step(step, current_envelope, effective_timeout)
96 .await
97 {
98 Ok(success) => {
99 trace.push(PipelineTraceEntry {
100 id: step.id.clone(),
101 duration_ms: elapsed_ms(started.elapsed()),
102 timed_out: false,
103 exit_status: Some(success.exit_status),
104 policy_applied: success.policy_applied,
105 stderr: success.stderr,
106 });
107 current_envelope = success.envelope;
108 }
109 Err(failure) => {
110 let StepFailure {
111 envelope,
112 kind,
113 timed_out,
114 exit_status,
115 stderr,
116 message,
117 } = failure;
118 let hit_global_deadline = timed_out && remaining_budget <= step_budget;
119 let mut trace_entry = PipelineTraceEntry {
120 id: step.id.clone(),
121 duration_ms: elapsed_ms(started.elapsed()),
122 timed_out,
123 exit_status,
124 policy_applied: PipelinePolicyApplied::None,
125 stderr: stderr.clone(),
126 };
127 current_envelope = envelope;
128
129 if hit_global_deadline {
130 trace_entry.policy_applied = PipelinePolicyApplied::GlobalDeadlineFallback;
131 trace.push(trace_entry);
132 return PipelineOutcome::FallbackRaw {
133 envelope: current_envelope,
134 trace,
135 reason: PipelineStopReason::GlobalDeadlineExceeded {
136 deadline_ms: config.deadline_ms,
137 step_id: Some(step.id.clone()),
138 },
139 };
140 }
141
142 let reason = PipelineStopReason::StepFailed {
143 step_id: step.id.clone(),
144 failure: kind,
145 message,
146 };
147
148 match step.on_error {
149 OnErrorPolicy::Continue => {
150 trace_entry.policy_applied = PipelinePolicyApplied::Continue;
151 trace.push(trace_entry);
152 }
153 OnErrorPolicy::FallbackRaw => {
154 trace_entry.policy_applied = PipelinePolicyApplied::FallbackRaw;
155 trace.push(trace_entry);
156 return PipelineOutcome::FallbackRaw {
157 envelope: current_envelope,
158 trace,
159 reason,
160 };
161 }
162 OnErrorPolicy::Abort => {
163 trace_entry.policy_applied = PipelinePolicyApplied::Abort;
164 trace.push(trace_entry);
165 return PipelineOutcome::Aborted { trace, reason };
166 }
167 }
168 }
169 }
170 }
171
172 PipelineOutcome::Completed {
173 envelope: current_envelope,
174 trace,
175 }
176 }
177
178 async fn run_step(
179 &self,
180 step: &PipelineStepConfig,
181 input_envelope: MuninnEnvelopeV1,
182 timeout_budget: Duration,
183 ) -> Result<StepSuccess, StepFailure> {
184 if let Some(executor) = &self.in_process_step_executor {
185 match self
186 .run_in_process_step(step, &input_envelope, timeout_budget, executor)
187 .await
188 {
189 Some(Ok(success)) => return Ok(success),
190 Some(Err(failure)) => {
191 return Err(StepFailure {
192 kind: failure.kind,
193 envelope: input_envelope,
194 timed_out: failure.timed_out,
195 exit_status: failure.exit_status,
196 stderr: failure.stderr,
197 message: failure.message,
198 });
199 }
200 None => {}
201 }
202 }
203
204 execution::run_external_step(
205 step,
206 input_envelope,
207 timeout_budget,
208 self.strict_step_contract,
209 MAX_STEP_STDOUT_BYTES,
210 MAX_STEP_STDERR_BYTES,
211 TRUNCATION_SUFFIX,
212 )
213 .await
214 }
215
216 async fn run_in_process_step(
217 &self,
218 step: &PipelineStepConfig,
219 input_envelope: &MuninnEnvelopeV1,
220 timeout_budget: Duration,
221 executor: &Arc<dyn InProcessStepExecutor>,
222 ) -> Option<Result<StepSuccess, InProcessStepFailure>> {
223 match timeout(timeout_budget, executor.try_execute(step, input_envelope)).await {
224 Ok(Some(Ok(envelope))) => Some(Ok(StepSuccess {
225 envelope,
226 exit_status: 0,
227 stderr: String::new(),
228 policy_applied: PipelinePolicyApplied::None,
229 })),
230 Ok(Some(Err(error))) => Some(Err(InProcessStepFailure {
231 kind: error.kind,
232 timed_out: false,
233 exit_status: error.exit_status,
234 stderr: error.stderr,
235 message: error.message,
236 })),
237 Ok(None) => None,
238 Err(_) => Some(Err(InProcessStepFailure {
239 kind: StepFailureKind::Timeout,
240 timed_out: true,
241 exit_status: None,
242 stderr: String::new(),
243 message: format!(
244 "step exceeded timeout budget ({}ms)",
245 timeout_budget.as_millis()
246 ),
247 })),
248 }
249 }
250}
251
252#[derive(Debug, Clone, PartialEq, Serialize)]
253pub enum PipelineOutcome {
254 Completed {
255 envelope: MuninnEnvelopeV1,
256 trace: Vec<PipelineTraceEntry>,
257 },
258 FallbackRaw {
259 envelope: MuninnEnvelopeV1,
260 trace: Vec<PipelineTraceEntry>,
261 reason: PipelineStopReason,
262 },
263 Aborted {
264 trace: Vec<PipelineTraceEntry>,
265 reason: PipelineStopReason,
266 },
267}
268
269#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
270pub enum PipelineStopReason {
271 GlobalDeadlineExceeded {
272 deadline_ms: u64,
273 step_id: Option<String>,
274 },
275 StepFailed {
276 step_id: String,
277 failure: StepFailureKind,
278 message: String,
279 },
280}
281
282#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
283pub struct PipelineTraceEntry {
284 pub id: String,
285 pub duration_ms: u64,
286 pub timed_out: bool,
287 pub exit_status: Option<i32>,
288 pub policy_applied: PipelinePolicyApplied,
289 pub stderr: String,
290}
291
292#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
293pub enum PipelinePolicyApplied {
294 None,
295 ContractBypass,
296 Continue,
297 FallbackRaw,
298 Abort,
299 GlobalDeadlineFallback,
300}
301
302#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
303pub enum StepFailureKind {
304 SerializeInput,
305 SpawnFailed,
306 IoFailed,
307 Timeout,
308 NonZeroExit,
309 InvalidStdout,
310 InvalidEnvelope,
311}
312
313#[derive(Debug)]
314struct StepSuccess {
315 envelope: MuninnEnvelopeV1,
316 exit_status: i32,
317 stderr: String,
318 policy_applied: PipelinePolicyApplied,
319}
320
321#[derive(Debug)]
322struct InProcessStepFailure {
323 kind: StepFailureKind,
324 timed_out: bool,
325 exit_status: Option<i32>,
326 stderr: String,
327 message: String,
328}
329
330#[derive(Debug)]
331struct StepFailure {
332 kind: StepFailureKind,
333 envelope: MuninnEnvelopeV1,
334 timed_out: bool,
335 exit_status: Option<i32>,
336 stderr: String,
337 message: String,
338}
339
/// Returns the time left until `deadline` (measured from `start`) runs out.
///
/// Yields `None` once the elapsed time has reached or passed the deadline, so a returned
/// duration is always strictly positive.
fn remaining_budget(start: Instant, deadline: Duration) -> Option<Duration> {
    deadline
        .checked_sub(start.elapsed())
        .filter(|left| !left.is_zero())
}
348
/// Converts `duration` to whole milliseconds, saturating at `u64::MAX`.
fn elapsed_ms(duration: Duration) -> u64 {
    u64::try_from(duration.as_millis()).unwrap_or(u64::MAX)
}
356
357#[cfg(test)]
358mod tests {
359 use super::*;
360 use crate::config::{PayloadFormat, PipelineStepConfig, StepIoMode};
361 use async_trait::async_trait;
362 use std::sync::{Arc, Mutex};
363
364 #[tokio::test]
365 async fn completes_when_steps_succeed() {
366 let runner = PipelineRunner::default();
367 let config = config_with_steps(
368 1_000,
369 vec![step("echo", "cat", &[], 500, OnErrorPolicy::Abort)],
370 );
371 let input = sample_envelope();
372
373 let outcome = runner.run(input.clone(), &config).await;
374
375 match outcome {
376 PipelineOutcome::Completed { envelope, trace } => {
377 assert_eq!(envelope, input);
378 assert_eq!(trace.len(), 1);
379 assert_eq!(trace[0].id, "echo");
380 assert_eq!(trace[0].exit_status, Some(0));
381 assert!(!trace[0].timed_out);
382 assert_eq!(trace[0].policy_applied, PipelinePolicyApplied::None);
383 }
384 other => panic!("expected completed outcome, got {other:?}"),
385 }
386 }
387
388 #[tokio::test]
389 async fn external_steps_default_to_text_filter_on_transcript_raw_text() {
390 let runner = PipelineRunner::default();
391 let config = config_with_steps(
392 1_000,
393 vec![text_step(
394 "uppercase",
395 "/usr/bin/tr",
396 &["[:lower:]", "[:upper:]"],
397 500,
398 OnErrorPolicy::Abort,
399 )],
400 );
401
402 let outcome = runner.run(sample_envelope(), &config).await;
403
404 match outcome {
405 PipelineOutcome::Completed { envelope, trace } => {
406 assert_eq!(trace.len(), 1);
407 assert_eq!(trace[0].id, "uppercase");
408 assert_eq!(envelope.transcript.raw_text.as_deref(), Some("SHIP TO SF"));
409 assert!(envelope.output.final_text.is_none());
410 }
411 other => panic!("expected completed outcome, got {other:?}"),
412 }
413 }
414
415 #[tokio::test]
416 async fn text_filter_prefers_output_final_text_when_present() {
417 let runner = PipelineRunner::default();
418 let config = config_with_steps(
419 1_000,
420 vec![text_step(
421 "suffix",
422 "/bin/sh",
423 &["-c", "sed 's/$/!/'"],
424 500,
425 OnErrorPolicy::Abort,
426 )],
427 );
428 let input = sample_envelope().with_output_final_text("Ship to SF");
429
430 let outcome = runner.run(input, &config).await;
431
432 match outcome {
433 PipelineOutcome::Completed { envelope, .. } => {
434 assert_eq!(envelope.output.final_text.as_deref(), Some("Ship to SF!"));
435 assert_eq!(envelope.transcript.raw_text.as_deref(), Some("ship to sf"));
436 }
437 other => panic!("expected completed outcome, got {other:?}"),
438 }
439 }
440
441 #[tokio::test]
442 async fn continues_on_step_error_when_policy_continue() {
443 let runner = PipelineRunner::default();
444 let config = config_with_steps(
445 3_000,
446 vec![
447 step(
448 "fails",
449 "/bin/sh",
450 &["-c", "cat >/dev/null; echo fail-continue >&2; exit 7"],
451 1_000,
452 OnErrorPolicy::Continue,
453 ),
454 step("echo", "cat", &[], 500, OnErrorPolicy::Abort),
455 ],
456 );
457
458 let outcome = runner.run(sample_envelope(), &config).await;
459
460 match outcome {
461 PipelineOutcome::Completed { trace, .. } => {
462 assert_eq!(trace.len(), 2);
463 assert_eq!(trace[0].id, "fails");
464 assert_eq!(trace[0].exit_status, Some(7));
465 assert_eq!(trace[0].policy_applied, PipelinePolicyApplied::Continue);
466 assert!(trace[0].stderr.contains("fail-continue"));
467
468 assert_eq!(trace[1].id, "echo");
469 assert_eq!(trace[1].exit_status, Some(0));
470 assert_eq!(trace[1].policy_applied, PipelinePolicyApplied::None);
471 }
472 other => panic!("expected completed outcome, got {other:?}"),
473 }
474 }
475
476 #[tokio::test]
477 async fn returns_fallback_when_policy_fallback_raw() {
478 let runner = PipelineRunner::default();
479 let config = config_with_steps(
480 3_000,
481 vec![step(
482 "fails",
483 "/bin/sh",
484 &["-c", "cat >/dev/null; echo fail-fallback >&2; exit 9"],
485 1_000,
486 OnErrorPolicy::FallbackRaw,
487 )],
488 );
489 let input = sample_envelope();
490
491 let outcome = runner.run(input.clone(), &config).await;
492
493 match outcome {
494 PipelineOutcome::FallbackRaw {
495 envelope,
496 trace,
497 reason,
498 } => {
499 assert_eq!(envelope, input);
500 assert_eq!(trace.len(), 1);
501 assert_eq!(trace[0].id, "fails");
502 assert_eq!(trace[0].exit_status, Some(9));
503 assert_eq!(trace[0].policy_applied, PipelinePolicyApplied::FallbackRaw);
504 assert_eq!(
505 reason,
506 PipelineStopReason::StepFailed {
507 step_id: "fails".to_string(),
508 failure: StepFailureKind::NonZeroExit,
509 message: "step exited non-zero with status 9".to_string(),
510 }
511 );
512 }
513 other => panic!("expected fallback outcome, got {other:?}"),
514 }
515 }
516
517 #[tokio::test]
518 async fn returns_aborted_when_policy_abort() {
519 let runner = PipelineRunner::default();
520 let config = config_with_steps(
521 3_000,
522 vec![step(
523 "fails",
524 "/bin/sh",
525 &["-c", "cat >/dev/null; echo fail-abort >&2; exit 11"],
526 1_000,
527 OnErrorPolicy::Abort,
528 )],
529 );
530
531 let outcome = runner.run(sample_envelope(), &config).await;
532
533 match outcome {
534 PipelineOutcome::Aborted { trace, reason } => {
535 assert_eq!(trace.len(), 1);
536 assert_eq!(trace[0].id, "fails");
537 assert_eq!(trace[0].exit_status, Some(11));
538 assert_eq!(trace[0].policy_applied, PipelinePolicyApplied::Abort);
539 assert_eq!(
540 reason,
541 PipelineStopReason::StepFailed {
542 step_id: "fails".to_string(),
543 failure: StepFailureKind::NonZeroExit,
544 message: "step exited non-zero with status 11".to_string(),
545 }
546 );
547 }
548 other => panic!("expected aborted outcome, got {other:?}"),
549 }
550 }
551
552 #[tokio::test]
553 async fn step_timeout_maps_to_policy() {
554 let runner = PipelineRunner::default();
555 let config = config_with_steps(
556 3_000,
557 vec![step(
558 "slow",
559 "/bin/sh",
560 &["-c", "sleep 1; cat"],
561 50,
562 OnErrorPolicy::Abort,
563 )],
564 );
565
566 let outcome = runner.run(sample_envelope(), &config).await;
567
568 match outcome {
569 PipelineOutcome::Aborted { trace, reason } => {
570 assert_eq!(trace.len(), 1);
571 assert_eq!(trace[0].id, "slow");
572 assert!(trace[0].timed_out);
573 assert_eq!(trace[0].exit_status, None);
574 assert_eq!(trace[0].policy_applied, PipelinePolicyApplied::Abort);
575 assert_eq!(
576 reason,
577 PipelineStopReason::StepFailed {
578 step_id: "slow".to_string(),
579 failure: StepFailureKind::Timeout,
580 message: "step exceeded timeout budget (50ms)".to_string(),
581 }
582 );
583 }
584 other => panic!("expected aborted outcome, got {other:?}"),
585 }
586 }
587
588 #[tokio::test]
589 async fn global_deadline_forces_fallback() {
590 let runner = PipelineRunner::default();
591 let config = config_with_steps(
592 60,
593 vec![step(
594 "slow",
595 "/bin/sh",
596 &["-c", "sleep 1; cat"],
597 1_000,
598 OnErrorPolicy::Abort,
599 )],
600 );
601 let input = sample_envelope();
602
603 let outcome = runner.run(input.clone(), &config).await;
604
605 match outcome {
606 PipelineOutcome::FallbackRaw {
607 envelope,
608 trace,
609 reason,
610 } => {
611 assert_eq!(envelope, input);
612 assert_eq!(trace.len(), 1);
613 assert_eq!(trace[0].id, "slow");
614 assert!(trace[0].timed_out);
615 assert_eq!(
616 trace[0].policy_applied,
617 PipelinePolicyApplied::GlobalDeadlineFallback
618 );
619 assert_eq!(
620 reason,
621 PipelineStopReason::GlobalDeadlineExceeded {
622 deadline_ms: 60,
623 step_id: Some("slow".to_string()),
624 }
625 );
626 }
627 other => panic!("expected fallback outcome, got {other:?}"),
628 }
629 }
630
631 #[tokio::test]
632 async fn strict_contract_rejects_non_object_stdout() {
633 let runner = PipelineRunner::default();
634 let config = config_with_steps(
635 1_000,
636 vec![step(
637 "bad-stdout",
638 "/bin/sh",
639 &["-c", "cat >/dev/null; echo not-json"],
640 1_000,
641 OnErrorPolicy::Abort,
642 )],
643 );
644
645 let outcome = runner.run(sample_envelope(), &config).await;
646
647 match outcome {
648 PipelineOutcome::Aborted { trace, reason } => {
649 assert_eq!(trace.len(), 1);
650 assert_eq!(trace[0].id, "bad-stdout");
651 assert_eq!(trace[0].exit_status, Some(0));
652 assert_eq!(trace[0].policy_applied, PipelinePolicyApplied::Abort);
653 match reason {
654 PipelineStopReason::StepFailed {
655 step_id,
656 failure,
657 message,
658 } => {
659 assert_eq!(step_id, "bad-stdout");
660 assert_eq!(failure, StepFailureKind::InvalidStdout);
661 assert!(message.starts_with("step stdout was not valid JSON:"));
662 }
663 other => panic!("unexpected reason: {other:?}"),
664 }
665 }
666 other => panic!("expected aborted outcome, got {other:?}"),
667 }
668 }
669
670 #[tokio::test]
671 async fn non_strict_contract_keeps_previous_envelope_on_bad_stdout() {
672 let runner = PipelineRunner::new(false);
673 let config = config_with_steps(
674 3_000,
675 vec![
676 step(
677 "bad-stdout",
678 "/bin/sh",
679 &["-c", "cat >/dev/null; echo not-json"],
680 1_000,
681 OnErrorPolicy::Abort,
682 ),
683 step("echo", "cat", &[], 500, OnErrorPolicy::Abort),
684 ],
685 );
686 let input = sample_envelope();
687
688 let outcome = runner.run(input.clone(), &config).await;
689
690 match outcome {
691 PipelineOutcome::Completed { envelope, trace } => {
692 assert_eq!(envelope, input);
693 assert_eq!(trace.len(), 2);
694 assert_eq!(trace[0].exit_status, Some(0));
695 assert_eq!(
696 trace[0].policy_applied,
697 PipelinePolicyApplied::ContractBypass
698 );
699 assert_eq!(trace[1].exit_status, Some(0));
700 }
701 other => panic!("expected completed outcome, got {other:?}"),
702 }
703 }
704
705 #[tokio::test]
706 async fn non_strict_contract_marks_non_object_json_as_contract_bypass() {
707 let runner = PipelineRunner::new(false);
708 let config = config_with_steps(
709 1_000,
710 vec![step(
711 "array-json",
712 "/bin/sh",
713 &["-c", "cat >/dev/null; echo '[1,2,3]'"],
714 1_000,
715 OnErrorPolicy::Abort,
716 )],
717 );
718 let input = sample_envelope();
719
720 let outcome = runner.run(input.clone(), &config).await;
721
722 match outcome {
723 PipelineOutcome::Completed { envelope, trace } => {
724 assert_eq!(envelope, input);
725 assert_eq!(trace.len(), 1);
726 assert_eq!(
727 trace[0].policy_applied,
728 PipelinePolicyApplied::ContractBypass
729 );
730 }
731 other => panic!("expected completed outcome, got {other:?}"),
732 }
733 }
734
735 #[tokio::test]
736 async fn non_strict_contract_marks_invalid_envelope_json_as_contract_bypass() {
737 let runner = PipelineRunner::new(false);
738 let config = config_with_steps(
739 1_000,
740 vec![step(
741 "bad-envelope",
742 "/bin/sh",
743 &[
744 "-c",
745 "cat >/dev/null; echo '{\"schema\":\"muninn.envelope.v1\",\"utterance_id\":\"utt\"}'",
746 ],
747 1_000,
748 OnErrorPolicy::Abort,
749 )],
750 );
751 let input = sample_envelope();
752
753 let outcome = runner.run(input.clone(), &config).await;
754
755 match outcome {
756 PipelineOutcome::Completed { envelope, trace } => {
757 assert_eq!(envelope, input);
758 assert_eq!(trace.len(), 1);
759 assert_eq!(
760 trace[0].policy_applied,
761 PipelinePolicyApplied::ContractBypass
762 );
763 }
764 other => panic!("expected completed outcome, got {other:?}"),
765 }
766 }
767
768 #[derive(Default)]
769 struct FakeInProcessExecutor {
770 handled_step_ids: Mutex<Vec<String>>,
771 }
772
773 impl FakeInProcessExecutor {
774 fn handled_step_ids(&self) -> Vec<String> {
775 self.handled_step_ids
776 .lock()
777 .expect("handled steps mutex should not be poisoned")
778 .clone()
779 }
780 }
781
782 #[async_trait]
783 impl InProcessStepExecutor for FakeInProcessExecutor {
784 async fn try_execute(
785 &self,
786 step: &PipelineStepConfig,
787 input: &MuninnEnvelopeV1,
788 ) -> Option<Result<MuninnEnvelopeV1, InProcessStepError>> {
789 if step.cmd != "stt_openai" {
790 return None;
791 }
792
793 self.handled_step_ids
794 .lock()
795 .expect("handled steps mutex should not be poisoned")
796 .push(step.id.clone());
797
798 let mut envelope = input.clone();
799 envelope.transcript.raw_text = Some("handled in process".to_string());
800 Some(Ok(envelope))
801 }
802 }
803
804 struct SlowInProcessExecutor;
805
806 #[async_trait]
807 impl InProcessStepExecutor for SlowInProcessExecutor {
808 async fn try_execute(
809 &self,
810 step: &PipelineStepConfig,
811 input: &MuninnEnvelopeV1,
812 ) -> Option<Result<MuninnEnvelopeV1, InProcessStepError>> {
813 if step.cmd != "stt_openai" {
814 return None;
815 }
816
817 tokio::time::sleep(Duration::from_millis(50)).await;
818 Some(Ok(input.clone()))
819 }
820 }
821
822 #[tokio::test]
823 async fn in_process_executor_handles_builtin_steps_without_spawning() {
824 let executor = Arc::new(FakeInProcessExecutor::default());
825 let runner = PipelineRunner::with_in_process_step_executor(true, executor.clone());
826 let config = config_with_steps(
827 1_000,
828 vec![step(
829 "builtin",
830 "stt_openai",
831 &[],
832 500,
833 OnErrorPolicy::Abort,
834 )],
835 );
836
837 let outcome = runner.run(sample_envelope(), &config).await;
838
839 match outcome {
840 PipelineOutcome::Completed { envelope, trace } => {
841 assert_eq!(
842 envelope.transcript.raw_text.as_deref(),
843 Some("handled in process")
844 );
845 assert_eq!(trace.len(), 1);
846 assert_eq!(trace[0].exit_status, Some(0));
847 }
848 other => panic!("expected completed outcome, got {other:?}"),
849 }
850
851 assert_eq!(executor.handled_step_ids(), vec!["builtin".to_string()]);
852 }
853
854 #[tokio::test]
855 async fn in_process_executor_leaves_external_steps_to_subprocess_execution() {
856 let executor = Arc::new(FakeInProcessExecutor::default());
857 let runner = PipelineRunner::with_in_process_step_executor(true, executor.clone());
858 let config = config_with_steps(
859 1_000,
860 vec![step("echo", "cat", &[], 500, OnErrorPolicy::Abort)],
861 );
862 let input = sample_envelope();
863
864 let outcome = runner.run(input.clone(), &config).await;
865
866 match outcome {
867 PipelineOutcome::Completed { envelope, .. } => {
868 assert_eq!(envelope, input);
869 }
870 other => panic!("expected completed outcome, got {other:?}"),
871 }
872
873 assert!(executor.handled_step_ids().is_empty());
874 }
875
876 #[tokio::test]
877 async fn in_process_executor_preserves_timeout_behavior() {
878 let runner =
879 PipelineRunner::with_in_process_step_executor(true, Arc::new(SlowInProcessExecutor));
880 let config = config_with_steps(
881 1_000,
882 vec![step("builtin", "stt_openai", &[], 10, OnErrorPolicy::Abort)],
883 );
884
885 let outcome = runner.run(sample_envelope(), &config).await;
886
887 match outcome {
888 PipelineOutcome::Aborted { trace, reason } => {
889 assert_eq!(trace.len(), 1);
890 assert!(trace[0].timed_out);
891 assert_eq!(trace[0].policy_applied, PipelinePolicyApplied::Abort);
892 assert_eq!(
893 reason,
894 PipelineStopReason::StepFailed {
895 step_id: "builtin".to_string(),
896 failure: StepFailureKind::Timeout,
897 message: "step exceeded timeout budget (10ms)".to_string(),
898 }
899 );
900 }
901 other => panic!("expected aborted outcome, got {other:?}"),
902 }
903 }
904
905 #[tokio::test]
906 async fn rejects_step_stdout_that_exceeds_capture_budget() {
907 let runner = PipelineRunner::default();
908 let command = format!(
909 "python3 -c \"import sys; sys.stdout.write('a' * {})\"",
910 MAX_STEP_STDOUT_BYTES + 1
911 );
912 let config = config_with_steps(
913 1_000,
914 vec![step(
915 "big-stdout",
916 "/bin/sh",
917 &["-c", &command],
918 1_000,
919 OnErrorPolicy::Abort,
920 )],
921 );
922
923 let outcome = runner.run(sample_envelope(), &config).await;
924
925 match outcome {
926 PipelineOutcome::Aborted { trace, reason } => {
927 assert_eq!(trace.len(), 1);
928 assert_eq!(trace[0].id, "big-stdout");
929 assert_eq!(trace[0].policy_applied, PipelinePolicyApplied::Abort);
930 match reason {
931 PipelineStopReason::StepFailed {
932 step_id,
933 failure,
934 message,
935 } => {
936 assert_eq!(step_id, "big-stdout");
937 assert_eq!(failure, StepFailureKind::InvalidStdout);
938 assert!(message.contains("step stdout exceeded max capture budget"));
939 }
940 other => panic!("unexpected reason: {other:?}"),
941 }
942 }
943 other => panic!("expected aborted outcome, got {other:?}"),
944 }
945 }
946
947 #[tokio::test]
948 async fn truncates_large_stderr_without_unbounded_growth() {
949 let runner = PipelineRunner::default();
950 let command = format!(
951 "cat >/dev/null; python3 -c \"import sys; sys.stderr.write('e' * {})\"; exit 7",
952 MAX_STEP_STDERR_BYTES + 1_024
953 );
954 let config = config_with_steps(
955 1_000,
956 vec![step(
957 "big-stderr",
958 "/bin/sh",
959 &["-c", &command],
960 1_000,
961 OnErrorPolicy::Abort,
962 )],
963 );
964
965 let outcome = runner.run(sample_envelope(), &config).await;
966
967 match outcome {
968 PipelineOutcome::Aborted { trace, reason } => {
969 assert_eq!(trace.len(), 1);
970 assert_eq!(trace[0].id, "big-stderr");
971 assert!(trace[0].stderr.ends_with(TRUNCATION_SUFFIX));
972 assert!(trace[0].stderr.len() <= MAX_STEP_STDERR_BYTES + TRUNCATION_SUFFIX.len());
973 match reason {
974 PipelineStopReason::StepFailed {
975 step_id, failure, ..
976 } => {
977 assert_eq!(step_id, "big-stderr");
978 assert_eq!(failure, StepFailureKind::NonZeroExit);
979 }
980 other => panic!("unexpected reason: {other:?}"),
981 }
982 }
983 other => panic!("expected aborted outcome, got {other:?}"),
984 }
985 }
986
987 fn config_with_steps(deadline_ms: u64, steps: Vec<PipelineStepConfig>) -> PipelineConfig {
988 PipelineConfig {
989 deadline_ms,
990 payload_format: PayloadFormat::JsonObject,
991 steps,
992 }
993 }
994
995 fn step(
996 id: &str,
997 cmd: &str,
998 args: &[&str],
999 timeout_ms: u64,
1000 on_error: OnErrorPolicy,
1001 ) -> PipelineStepConfig {
1002 PipelineStepConfig {
1003 id: id.to_string(),
1004 cmd: cmd.to_string(),
1005 args: args.iter().map(|arg| arg.to_string()).collect(),
1006 io_mode: StepIoMode::EnvelopeJson,
1007 timeout_ms,
1008 on_error,
1009 }
1010 }
1011
1012 fn text_step(
1013 id: &str,
1014 cmd: &str,
1015 args: &[&str],
1016 timeout_ms: u64,
1017 on_error: OnErrorPolicy,
1018 ) -> PipelineStepConfig {
1019 PipelineStepConfig {
1020 id: id.to_string(),
1021 cmd: cmd.to_string(),
1022 args: args.iter().map(|arg| arg.to_string()).collect(),
1023 io_mode: StepIoMode::Auto,
1024 timeout_ms,
1025 on_error,
1026 }
1027 }
1028
1029 fn sample_envelope() -> MuninnEnvelopeV1 {
1030 MuninnEnvelopeV1::new("utt-runner-001", "2026-03-05T17:16:09Z")
1031 .with_transcript_raw_text("ship to sf")
1032 .with_transcript_provider("openai")
1033 }
1034}