Skip to main content

muninn/
runner.rs

1use std::sync::Arc;
2use std::time::{Duration, Instant};
3
4use crate::config::{OnErrorPolicy, PipelineConfig, PipelineStepConfig};
5use crate::envelope::MuninnEnvelopeV1;
6use async_trait::async_trait;
7use serde::Serialize;
8use tokio::time::timeout;
9
10mod codec;
11mod execution;
12mod transport;
13
// Capture limits forwarded to `execution::run_external_step` for every external step.

/// Stdout capture budget per step (64 KiB). The tests in this file expect a step whose
/// stdout exceeds this budget to fail with `StepFailureKind::InvalidStdout`.
const MAX_STEP_STDOUT_BYTES: usize = 64 * 1024;
/// Stderr capture budget per step (16 KiB).
// NOTE(review): how over-budget stderr is handled lives in the `execution` module — confirm there.
const MAX_STEP_STDERR_BYTES: usize = 16 * 1024;
/// Marker text handed to the external-step executor alongside the capture budgets.
// NOTE(review): presumably appended to captured output that was cut short — confirm in `execution`.
const TRUNCATION_SUFFIX: &str = "\n[truncated]";
17
/// Runs the steps of a `PipelineConfig` in order, threading a `MuninnEnvelopeV1` through them.
///
/// Cloning is cheap: the optional in-process executor is shared behind an `Arc`.
#[derive(Clone)]
pub struct PipelineRunner {
    /// Forwarded to the external-step executor. Per the tests in this file: when `true`,
    /// stdout that is not a valid envelope fails the step; when `false`, the previous
    /// envelope is kept and the trace entry is marked `ContractBypass`.
    strict_step_contract: bool,
    /// Executor that is offered each step before an external command is run.
    /// `None` means every step is executed externally.
    in_process_step_executor: Option<Arc<dyn InProcessStepExecutor>>,
}
23
/// Hook that lets the host execute selected pipeline steps without spawning a subprocess.
///
/// The runner wraps each call in the step's effective timeout, so an implementation
/// that takes too long is reported as a `StepFailureKind::Timeout` failure.
#[async_trait]
pub trait InProcessStepExecutor: Send + Sync {
    /// Attempts to execute `step` against `input`.
    ///
    /// * `None` — this executor does not handle the step; the runner falls back to
    ///   external execution.
    /// * `Some(Ok(envelope))` — the step succeeded and `envelope` becomes the input of
    ///   the next step.
    /// * `Some(Err(error))` — the step failed; the runner applies the step's
    ///   `on_error` policy.
    async fn try_execute(
        &self,
        step: &PipelineStepConfig,
        input: &MuninnEnvelopeV1,
    ) -> Option<Result<MuninnEnvelopeV1, InProcessStepError>>;
}
32
/// Failure reported by an `InProcessStepExecutor`.
///
/// The runner copies these fields into the step's trace entry and stop reason,
/// always recording the step as not timed out.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct InProcessStepError {
    /// Failure category surfaced in `PipelineStopReason::StepFailed`.
    pub kind: StepFailureKind,
    /// Human-readable description surfaced in `PipelineStopReason::StepFailed`.
    pub message: String,
    /// Diagnostic text recorded as the trace entry's `stderr`.
    pub stderr: String,
    /// Exit status recorded in the trace entry, if the executor has one to report.
    pub exit_status: Option<i32>,
}
40
41impl Default for PipelineRunner {
42    fn default() -> Self {
43        Self {
44            strict_step_contract: true,
45            in_process_step_executor: None,
46        }
47    }
48}
49
50impl PipelineRunner {
51    pub fn new(strict_step_contract: bool) -> Self {
52        Self {
53            strict_step_contract,
54            in_process_step_executor: None,
55        }
56    }
57
58    pub fn with_in_process_step_executor(
59        strict_step_contract: bool,
60        in_process_step_executor: Arc<dyn InProcessStepExecutor>,
61    ) -> Self {
62        Self {
63            strict_step_contract,
64            in_process_step_executor: Some(in_process_step_executor),
65        }
66    }
67
68    pub async fn run(
69        &self,
70        envelope: MuninnEnvelopeV1,
71        config: &PipelineConfig,
72    ) -> PipelineOutcome {
73        let start = Instant::now();
74        let deadline = Duration::from_millis(config.deadline_ms);
75        let mut current_envelope = envelope;
76        let mut trace = Vec::with_capacity(config.steps.len());
77
78        for step in &config.steps {
79            let Some(remaining_budget) = remaining_budget(start, deadline) else {
80                return PipelineOutcome::FallbackRaw {
81                    envelope: current_envelope,
82                    trace,
83                    reason: PipelineStopReason::GlobalDeadlineExceeded {
84                        deadline_ms: config.deadline_ms,
85                        step_id: Some(step.id.clone()),
86                    },
87                };
88            };
89
90            let step_budget = Duration::from_millis(step.timeout_ms);
91            let effective_timeout = remaining_budget.min(step_budget);
92            let started = Instant::now();
93
94            match self
95                .run_step(step, current_envelope, effective_timeout)
96                .await
97            {
98                Ok(success) => {
99                    trace.push(PipelineTraceEntry {
100                        id: step.id.clone(),
101                        duration_ms: elapsed_ms(started.elapsed()),
102                        timed_out: false,
103                        exit_status: Some(success.exit_status),
104                        policy_applied: success.policy_applied,
105                        stderr: success.stderr,
106                    });
107                    current_envelope = success.envelope;
108                }
109                Err(failure) => {
110                    let StepFailure {
111                        envelope,
112                        kind,
113                        timed_out,
114                        exit_status,
115                        stderr,
116                        message,
117                    } = failure;
118                    let hit_global_deadline = timed_out && remaining_budget <= step_budget;
119                    let mut trace_entry = PipelineTraceEntry {
120                        id: step.id.clone(),
121                        duration_ms: elapsed_ms(started.elapsed()),
122                        timed_out,
123                        exit_status,
124                        policy_applied: PipelinePolicyApplied::None,
125                        stderr: stderr.clone(),
126                    };
127                    current_envelope = envelope;
128
129                    if hit_global_deadline {
130                        trace_entry.policy_applied = PipelinePolicyApplied::GlobalDeadlineFallback;
131                        trace.push(trace_entry);
132                        return PipelineOutcome::FallbackRaw {
133                            envelope: current_envelope,
134                            trace,
135                            reason: PipelineStopReason::GlobalDeadlineExceeded {
136                                deadline_ms: config.deadline_ms,
137                                step_id: Some(step.id.clone()),
138                            },
139                        };
140                    }
141
142                    let reason = PipelineStopReason::StepFailed {
143                        step_id: step.id.clone(),
144                        failure: kind,
145                        message,
146                    };
147
148                    match step.on_error {
149                        OnErrorPolicy::Continue => {
150                            trace_entry.policy_applied = PipelinePolicyApplied::Continue;
151                            trace.push(trace_entry);
152                        }
153                        OnErrorPolicy::FallbackRaw => {
154                            trace_entry.policy_applied = PipelinePolicyApplied::FallbackRaw;
155                            trace.push(trace_entry);
156                            return PipelineOutcome::FallbackRaw {
157                                envelope: current_envelope,
158                                trace,
159                                reason,
160                            };
161                        }
162                        OnErrorPolicy::Abort => {
163                            trace_entry.policy_applied = PipelinePolicyApplied::Abort;
164                            trace.push(trace_entry);
165                            return PipelineOutcome::Aborted { trace, reason };
166                        }
167                    }
168                }
169            }
170        }
171
172        PipelineOutcome::Completed {
173            envelope: current_envelope,
174            trace,
175        }
176    }
177
178    async fn run_step(
179        &self,
180        step: &PipelineStepConfig,
181        input_envelope: MuninnEnvelopeV1,
182        timeout_budget: Duration,
183    ) -> Result<StepSuccess, StepFailure> {
184        if let Some(executor) = &self.in_process_step_executor {
185            match self
186                .run_in_process_step(step, &input_envelope, timeout_budget, executor)
187                .await
188            {
189                Some(Ok(success)) => return Ok(success),
190                Some(Err(failure)) => {
191                    return Err(StepFailure {
192                        kind: failure.kind,
193                        envelope: input_envelope,
194                        timed_out: failure.timed_out,
195                        exit_status: failure.exit_status,
196                        stderr: failure.stderr,
197                        message: failure.message,
198                    });
199                }
200                None => {}
201            }
202        }
203
204        execution::run_external_step(
205            step,
206            input_envelope,
207            timeout_budget,
208            self.strict_step_contract,
209            MAX_STEP_STDOUT_BYTES,
210            MAX_STEP_STDERR_BYTES,
211            TRUNCATION_SUFFIX,
212        )
213        .await
214    }
215
216    async fn run_in_process_step(
217        &self,
218        step: &PipelineStepConfig,
219        input_envelope: &MuninnEnvelopeV1,
220        timeout_budget: Duration,
221        executor: &Arc<dyn InProcessStepExecutor>,
222    ) -> Option<Result<StepSuccess, InProcessStepFailure>> {
223        match timeout(timeout_budget, executor.try_execute(step, input_envelope)).await {
224            Ok(Some(Ok(envelope))) => Some(Ok(StepSuccess {
225                envelope,
226                exit_status: 0,
227                stderr: String::new(),
228                policy_applied: PipelinePolicyApplied::None,
229            })),
230            Ok(Some(Err(error))) => Some(Err(InProcessStepFailure {
231                kind: error.kind,
232                timed_out: false,
233                exit_status: error.exit_status,
234                stderr: error.stderr,
235                message: error.message,
236            })),
237            Ok(None) => None,
238            Err(_) => Some(Err(InProcessStepFailure {
239                kind: StepFailureKind::Timeout,
240                timed_out: true,
241                exit_status: None,
242                stderr: String::new(),
243                message: format!(
244                    "step exceeded timeout budget ({}ms)",
245                    timeout_budget.as_millis()
246                ),
247            })),
248        }
249    }
250}
251
/// Final result of `PipelineRunner::run`. Every variant carries the per-step trace
/// collected up to the point where the pipeline stopped.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub enum PipelineOutcome {
    /// The runner reached the end of the step list. Steps that failed under the
    /// `Continue` policy may appear in the trace.
    Completed {
        envelope: MuninnEnvelopeV1,
        trace: Vec<PipelineTraceEntry>,
    },
    /// The pipeline stopped early (global deadline, or a failed step with the
    /// `FallbackRaw` policy) and returns the envelope as it stood at that point.
    FallbackRaw {
        envelope: MuninnEnvelopeV1,
        trace: Vec<PipelineTraceEntry>,
        reason: PipelineStopReason,
    },
    /// A step with the `Abort` policy failed; no envelope is returned.
    Aborted {
        trace: Vec<PipelineTraceEntry>,
        reason: PipelineStopReason,
    },
}
268
/// Why a pipeline ended with `FallbackRaw` or `Aborted` instead of completing.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub enum PipelineStopReason {
    /// The pipeline-wide deadline ran out, either before a step could start or while
    /// a step was running under the remaining global budget.
    GlobalDeadlineExceeded {
        /// The configured `PipelineConfig::deadline_ms`.
        deadline_ms: u64,
        /// Step that was about to start or was running; the runner always sets this
        /// to `Some`.
        step_id: Option<String>,
    },
    /// A step failed and its `on_error` policy stopped the pipeline.
    StepFailed {
        step_id: String,
        failure: StepFailureKind,
        message: String,
    },
}
281
/// Record of one executed step, appended to the trace in execution order.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct PipelineTraceEntry {
    /// The step's configured id.
    pub id: String,
    /// Wall-clock time spent in the step, in whole milliseconds.
    pub duration_ms: u64,
    /// Whether the step failed by timing out; always `false` for successful steps.
    pub timed_out: bool,
    /// Exit status of the step, when one is available (`None` e.g. on timeout).
    pub exit_status: Option<i32>,
    /// Policy the runner applied to this step's result.
    pub policy_applied: PipelinePolicyApplied,
    /// Stderr text reported for the step.
    pub stderr: String,
}
291
/// Policy decision recorded on a trace entry.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
pub enum PipelinePolicyApplied {
    /// No policy was needed (used for ordinary successes).
    None,
    /// Not assigned in this file's non-test code; the tests expect it on a successful
    /// trace entry when a non-strict runner receives stdout that is not a valid envelope.
    ContractBypass,
    /// The step failed and `OnErrorPolicy::Continue` let the pipeline proceed.
    Continue,
    /// The step failed and `OnErrorPolicy::FallbackRaw` stopped the pipeline.
    FallbackRaw,
    /// The step failed and `OnErrorPolicy::Abort` stopped the pipeline.
    Abort,
    /// The step timed out while bounded by the remaining global deadline; this
    /// overrides the step's own `on_error` policy.
    GlobalDeadlineFallback,
}
301
/// Category of a step failure.
///
/// Only `Timeout` is produced directly in this file (for in-process steps); the other
/// variants are presumably set by the `execution` module — confirm there.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
pub enum StepFailureKind {
    SerializeInput,
    SpawnFailed,
    IoFailed,
    /// The step exceeded its effective timeout budget.
    Timeout,
    /// Expected by the tests when a command exits with a non-zero status.
    NonZeroExit,
    /// Expected by the tests when stdout is not valid JSON or exceeds the capture budget.
    InvalidStdout,
    InvalidEnvelope,
}
312
/// Internal result of a step that succeeded (in-process or external).
#[derive(Debug)]
struct StepSuccess {
    /// Envelope handed to the next step.
    envelope: MuninnEnvelopeV1,
    /// Exit status recorded in the trace; in-process steps report `0`.
    exit_status: i32,
    /// Stderr text recorded in the trace; empty for in-process steps.
    stderr: String,
    /// Policy recorded in the trace entry for this success.
    policy_applied: PipelinePolicyApplied,
}
320
/// Internal failure of an in-process step: either the executor's own error or a
/// timeout detected by the runner. It carries no envelope because the executor only
/// borrows its input; `run_step` attaches the input envelope when converting this
/// into a `StepFailure`.
#[derive(Debug)]
struct InProcessStepFailure {
    kind: StepFailureKind,
    /// `true` only when the runner's timeout elapsed.
    timed_out: bool,
    exit_status: Option<i32>,
    stderr: String,
    message: String,
}
329
/// Internal failure of a step, as consumed by `PipelineRunner::run`.
#[derive(Debug)]
struct StepFailure {
    kind: StepFailureKind,
    /// Envelope the pipeline continues or falls back with after the failure; for
    /// in-process steps this is the step's untouched input.
    envelope: MuninnEnvelopeV1,
    /// Whether the failure was a timeout; combined with the budgets in `run` to
    /// decide if the global deadline was hit.
    timed_out: bool,
    exit_status: Option<i32>,
    stderr: String,
    message: String,
}
339
/// Returns how much of `deadline` is still unused, measured from `start`.
///
/// Yields `None` once the elapsed time has reached or passed `deadline`, so a
/// returned duration is always strictly positive.
fn remaining_budget(start: Instant, deadline: Duration) -> Option<Duration> {
    deadline
        .checked_sub(start.elapsed())
        // An exactly exhausted budget counts as "no budget left".
        .filter(|left| !left.is_zero())
}
348
/// Converts `duration` to whole milliseconds, saturating at `u64::MAX` when the
/// millisecond count does not fit in a `u64`.
fn elapsed_ms(duration: Duration) -> u64 {
    u64::try_from(duration.as_millis()).unwrap_or(u64::MAX)
}
356
357#[cfg(test)]
358mod tests {
359    use super::*;
360    use crate::config::{PayloadFormat, PipelineStepConfig, StepIoMode};
361    use async_trait::async_trait;
362    use std::sync::{Arc, Mutex};
363
    /// A single `cat` step succeeds: the envelope comes back unchanged and the trace
    /// holds one clean entry (exit 0, no timeout, no policy applied).
    #[tokio::test]
    async fn completes_when_steps_succeed() {
        let runner = PipelineRunner::default();
        let config = config_with_steps(
            1_000,
            vec![step("echo", "cat", &[], 500, OnErrorPolicy::Abort)],
        );
        let input = sample_envelope();

        let outcome = runner.run(input.clone(), &config).await;

        match outcome {
            PipelineOutcome::Completed { envelope, trace } => {
                assert_eq!(envelope, input);
                assert_eq!(trace.len(), 1);
                assert_eq!(trace[0].id, "echo");
                assert_eq!(trace[0].exit_status, Some(0));
                assert!(!trace[0].timed_out);
                assert_eq!(trace[0].policy_applied, PipelinePolicyApplied::None);
            }
            other => panic!("expected completed outcome, got {other:?}"),
        }
    }
387
    /// A text-filter step (`tr` upper-casing) rewrites `transcript.raw_text` and leaves
    /// `output.final_text` unset when the input envelope has no final text.
    #[tokio::test]
    async fn external_steps_default_to_text_filter_on_transcript_raw_text() {
        let runner = PipelineRunner::default();
        let config = config_with_steps(
            1_000,
            vec![text_step(
                "uppercase",
                "/usr/bin/tr",
                &["[:lower:]", "[:upper:]"],
                500,
                OnErrorPolicy::Abort,
            )],
        );

        let outcome = runner.run(sample_envelope(), &config).await;

        match outcome {
            PipelineOutcome::Completed { envelope, trace } => {
                assert_eq!(trace.len(), 1);
                assert_eq!(trace[0].id, "uppercase");
                assert_eq!(envelope.transcript.raw_text.as_deref(), Some("SHIP TO SF"));
                assert!(envelope.output.final_text.is_none());
            }
            other => panic!("expected completed outcome, got {other:?}"),
        }
    }
414
    /// When the input already has `output.final_text`, a text-filter step rewrites that
    /// field and leaves `transcript.raw_text` untouched.
    #[tokio::test]
    async fn text_filter_prefers_output_final_text_when_present() {
        let runner = PipelineRunner::default();
        let config = config_with_steps(
            1_000,
            vec![text_step(
                "suffix",
                "/bin/sh",
                &["-c", "sed 's/$/!/'"],
                500,
                OnErrorPolicy::Abort,
            )],
        );
        let input = sample_envelope().with_output_final_text("Ship to SF");

        let outcome = runner.run(input, &config).await;

        match outcome {
            PipelineOutcome::Completed { envelope, .. } => {
                assert_eq!(envelope.output.final_text.as_deref(), Some("Ship to SF!"));
                assert_eq!(envelope.transcript.raw_text.as_deref(), Some("ship to sf"));
            }
            other => panic!("expected completed outcome, got {other:?}"),
        }
    }
440
    /// A failing step with `OnErrorPolicy::Continue` is recorded (exit status, stderr,
    /// `Continue` policy) and the following step still runs, so the pipeline completes.
    #[tokio::test]
    async fn continues_on_step_error_when_policy_continue() {
        let runner = PipelineRunner::default();
        let config = config_with_steps(
            3_000,
            vec![
                step(
                    "fails",
                    "/bin/sh",
                    &["-c", "cat >/dev/null; echo fail-continue >&2; exit 7"],
                    1_000,
                    OnErrorPolicy::Continue,
                ),
                step("echo", "cat", &[], 500, OnErrorPolicy::Abort),
            ],
        );

        let outcome = runner.run(sample_envelope(), &config).await;

        match outcome {
            PipelineOutcome::Completed { trace, .. } => {
                assert_eq!(trace.len(), 2);
                assert_eq!(trace[0].id, "fails");
                assert_eq!(trace[0].exit_status, Some(7));
                assert_eq!(trace[0].policy_applied, PipelinePolicyApplied::Continue);
                assert!(trace[0].stderr.contains("fail-continue"));

                assert_eq!(trace[1].id, "echo");
                assert_eq!(trace[1].exit_status, Some(0));
                assert_eq!(trace[1].policy_applied, PipelinePolicyApplied::None);
            }
            other => panic!("expected completed outcome, got {other:?}"),
        }
    }
475
    /// A failing step with `OnErrorPolicy::FallbackRaw` stops the pipeline and returns
    /// the unchanged input envelope together with a `StepFailed` reason.
    #[tokio::test]
    async fn returns_fallback_when_policy_fallback_raw() {
        let runner = PipelineRunner::default();
        let config = config_with_steps(
            3_000,
            vec![step(
                "fails",
                "/bin/sh",
                &["-c", "cat >/dev/null; echo fail-fallback >&2; exit 9"],
                1_000,
                OnErrorPolicy::FallbackRaw,
            )],
        );
        let input = sample_envelope();

        let outcome = runner.run(input.clone(), &config).await;

        match outcome {
            PipelineOutcome::FallbackRaw {
                envelope,
                trace,
                reason,
            } => {
                assert_eq!(envelope, input);
                assert_eq!(trace.len(), 1);
                assert_eq!(trace[0].id, "fails");
                assert_eq!(trace[0].exit_status, Some(9));
                assert_eq!(trace[0].policy_applied, PipelinePolicyApplied::FallbackRaw);
                assert_eq!(
                    reason,
                    PipelineStopReason::StepFailed {
                        step_id: "fails".to_string(),
                        failure: StepFailureKind::NonZeroExit,
                        message: "step exited non-zero with status 9".to_string(),
                    }
                );
            }
            other => panic!("expected fallback outcome, got {other:?}"),
        }
    }
516
    /// A failing step with `OnErrorPolicy::Abort` stops the pipeline with `Aborted`
    /// and a `StepFailed` reason describing the non-zero exit.
    #[tokio::test]
    async fn returns_aborted_when_policy_abort() {
        let runner = PipelineRunner::default();
        let config = config_with_steps(
            3_000,
            vec![step(
                "fails",
                "/bin/sh",
                &["-c", "cat >/dev/null; echo fail-abort >&2; exit 11"],
                1_000,
                OnErrorPolicy::Abort,
            )],
        );

        let outcome = runner.run(sample_envelope(), &config).await;

        match outcome {
            PipelineOutcome::Aborted { trace, reason } => {
                assert_eq!(trace.len(), 1);
                assert_eq!(trace[0].id, "fails");
                assert_eq!(trace[0].exit_status, Some(11));
                assert_eq!(trace[0].policy_applied, PipelinePolicyApplied::Abort);
                assert_eq!(
                    reason,
                    PipelineStopReason::StepFailed {
                        step_id: "fails".to_string(),
                        failure: StepFailureKind::NonZeroExit,
                        message: "step exited non-zero with status 11".to_string(),
                    }
                );
            }
            other => panic!("expected aborted outcome, got {other:?}"),
        }
    }
551
    /// A step that exceeds its own 50ms timeout (well inside the 3s global deadline)
    /// is treated as an ordinary step failure and handled by its `on_error` policy.
    #[tokio::test]
    async fn step_timeout_maps_to_policy() {
        let runner = PipelineRunner::default();
        let config = config_with_steps(
            3_000,
            vec![step(
                "slow",
                "/bin/sh",
                &["-c", "sleep 1; cat"],
                50,
                OnErrorPolicy::Abort,
            )],
        );

        let outcome = runner.run(sample_envelope(), &config).await;

        match outcome {
            PipelineOutcome::Aborted { trace, reason } => {
                assert_eq!(trace.len(), 1);
                assert_eq!(trace[0].id, "slow");
                assert!(trace[0].timed_out);
                assert_eq!(trace[0].exit_status, None);
                assert_eq!(trace[0].policy_applied, PipelinePolicyApplied::Abort);
                assert_eq!(
                    reason,
                    PipelineStopReason::StepFailed {
                        step_id: "slow".to_string(),
                        failure: StepFailureKind::Timeout,
                        message: "step exceeded timeout budget (50ms)".to_string(),
                    }
                );
            }
            other => panic!("expected aborted outcome, got {other:?}"),
        }
    }
587
    /// When the 60ms global deadline is tighter than the step's 1s timeout, a timeout
    /// is attributed to the global deadline: the outcome is `FallbackRaw` even though
    /// the step's own policy is `Abort`.
    #[tokio::test]
    async fn global_deadline_forces_fallback() {
        let runner = PipelineRunner::default();
        let config = config_with_steps(
            60,
            vec![step(
                "slow",
                "/bin/sh",
                &["-c", "sleep 1; cat"],
                1_000,
                OnErrorPolicy::Abort,
            )],
        );
        let input = sample_envelope();

        let outcome = runner.run(input.clone(), &config).await;

        match outcome {
            PipelineOutcome::FallbackRaw {
                envelope,
                trace,
                reason,
            } => {
                assert_eq!(envelope, input);
                assert_eq!(trace.len(), 1);
                assert_eq!(trace[0].id, "slow");
                assert!(trace[0].timed_out);
                assert_eq!(
                    trace[0].policy_applied,
                    PipelinePolicyApplied::GlobalDeadlineFallback
                );
                assert_eq!(
                    reason,
                    PipelineStopReason::GlobalDeadlineExceeded {
                        deadline_ms: 60,
                        step_id: Some("slow".to_string()),
                    }
                );
            }
            other => panic!("expected fallback outcome, got {other:?}"),
        }
    }
630
    /// With the strict contract (the default), a step that exits 0 but prints text
    /// that is not JSON at all fails with `InvalidStdout`.
    #[tokio::test]
    async fn strict_contract_rejects_non_object_stdout() {
        let runner = PipelineRunner::default();
        let config = config_with_steps(
            1_000,
            vec![step(
                "bad-stdout",
                "/bin/sh",
                &["-c", "cat >/dev/null; echo not-json"],
                1_000,
                OnErrorPolicy::Abort,
            )],
        );

        let outcome = runner.run(sample_envelope(), &config).await;

        match outcome {
            PipelineOutcome::Aborted { trace, reason } => {
                assert_eq!(trace.len(), 1);
                assert_eq!(trace[0].id, "bad-stdout");
                assert_eq!(trace[0].exit_status, Some(0));
                assert_eq!(trace[0].policy_applied, PipelinePolicyApplied::Abort);
                match reason {
                    PipelineStopReason::StepFailed {
                        step_id,
                        failure,
                        message,
                    } => {
                        assert_eq!(step_id, "bad-stdout");
                        assert_eq!(failure, StepFailureKind::InvalidStdout);
                        assert!(message.starts_with("step stdout was not valid JSON:"));
                    }
                    other => panic!("unexpected reason: {other:?}"),
                }
            }
            other => panic!("expected aborted outcome, got {other:?}"),
        }
    }
669
    /// With the strict contract disabled, non-JSON stdout does not fail the step: the
    /// previous envelope is kept, the entry is marked `ContractBypass`, and the next
    /// step still runs.
    #[tokio::test]
    async fn non_strict_contract_keeps_previous_envelope_on_bad_stdout() {
        let runner = PipelineRunner::new(false);
        let config = config_with_steps(
            3_000,
            vec![
                step(
                    "bad-stdout",
                    "/bin/sh",
                    &["-c", "cat >/dev/null; echo not-json"],
                    1_000,
                    OnErrorPolicy::Abort,
                ),
                step("echo", "cat", &[], 500, OnErrorPolicy::Abort),
            ],
        );
        let input = sample_envelope();

        let outcome = runner.run(input.clone(), &config).await;

        match outcome {
            PipelineOutcome::Completed { envelope, trace } => {
                assert_eq!(envelope, input);
                assert_eq!(trace.len(), 2);
                assert_eq!(trace[0].exit_status, Some(0));
                assert_eq!(
                    trace[0].policy_applied,
                    PipelinePolicyApplied::ContractBypass
                );
                assert_eq!(trace[1].exit_status, Some(0));
            }
            other => panic!("expected completed outcome, got {other:?}"),
        }
    }
704
    /// With the strict contract disabled, stdout that is valid JSON but not an object
    /// (an array) keeps the previous envelope and is marked `ContractBypass`.
    #[tokio::test]
    async fn non_strict_contract_marks_non_object_json_as_contract_bypass() {
        let runner = PipelineRunner::new(false);
        let config = config_with_steps(
            1_000,
            vec![step(
                "array-json",
                "/bin/sh",
                &["-c", "cat >/dev/null; echo '[1,2,3]'"],
                1_000,
                OnErrorPolicy::Abort,
            )],
        );
        let input = sample_envelope();

        let outcome = runner.run(input.clone(), &config).await;

        match outcome {
            PipelineOutcome::Completed { envelope, trace } => {
                assert_eq!(envelope, input);
                assert_eq!(trace.len(), 1);
                assert_eq!(
                    trace[0].policy_applied,
                    PipelinePolicyApplied::ContractBypass
                );
            }
            other => panic!("expected completed outcome, got {other:?}"),
        }
    }
734
    /// With the strict contract disabled, a JSON object that carries only `schema` and
    /// `utterance_id` is not accepted as an envelope: the previous envelope is kept and
    /// the entry is marked `ContractBypass`.
    #[tokio::test]
    async fn non_strict_contract_marks_invalid_envelope_json_as_contract_bypass() {
        let runner = PipelineRunner::new(false);
        let config = config_with_steps(
            1_000,
            vec![step(
                "bad-envelope",
                "/bin/sh",
                &[
                    "-c",
                    "cat >/dev/null; echo '{\"schema\":\"muninn.envelope.v1\",\"utterance_id\":\"utt\"}'",
                ],
                1_000,
                OnErrorPolicy::Abort,
            )],
        );
        let input = sample_envelope();

        let outcome = runner.run(input.clone(), &config).await;

        match outcome {
            PipelineOutcome::Completed { envelope, trace } => {
                assert_eq!(envelope, input);
                assert_eq!(trace.len(), 1);
                assert_eq!(
                    trace[0].policy_applied,
                    PipelinePolicyApplied::ContractBypass
                );
            }
            other => panic!("expected completed outcome, got {other:?}"),
        }
    }
767
    /// Test double that handles only steps whose `cmd` is `"stt_openai"` and records
    /// the ids of the steps it handled.
    #[derive(Default)]
    struct FakeInProcessExecutor {
        // Guarded by a mutex because `try_execute` only receives `&self`.
        handled_step_ids: Mutex<Vec<String>>,
    }

    impl FakeInProcessExecutor {
        /// Returns a snapshot of the step ids handled so far, in call order.
        fn handled_step_ids(&self) -> Vec<String> {
            self.handled_step_ids
                .lock()
                .expect("handled steps mutex should not be poisoned")
                .clone()
        }
    }

    #[async_trait]
    impl InProcessStepExecutor for FakeInProcessExecutor {
        /// Declines every step except `"stt_openai"`; for that command it records the
        /// step id and returns the input with `transcript.raw_text` replaced.
        async fn try_execute(
            &self,
            step: &PipelineStepConfig,
            input: &MuninnEnvelopeV1,
        ) -> Option<Result<MuninnEnvelopeV1, InProcessStepError>> {
            if step.cmd != "stt_openai" {
                return None;
            }

            self.handled_step_ids
                .lock()
                .expect("handled steps mutex should not be poisoned")
                .push(step.id.clone());

            let mut envelope = input.clone();
            envelope.transcript.raw_text = Some("handled in process".to_string());
            Some(Ok(envelope))
        }
    }
803
    /// Test double that handles `"stt_openai"` steps but takes 50ms to do so, used to
    /// exercise the runner's timeout around in-process execution.
    struct SlowInProcessExecutor;

    #[async_trait]
    impl InProcessStepExecutor for SlowInProcessExecutor {
        /// Declines every step except `"stt_openai"`; for that command it sleeps 50ms
        /// and then returns the input unchanged.
        async fn try_execute(
            &self,
            step: &PipelineStepConfig,
            input: &MuninnEnvelopeV1,
        ) -> Option<Result<MuninnEnvelopeV1, InProcessStepError>> {
            if step.cmd != "stt_openai" {
                return None;
            }

            tokio::time::sleep(Duration::from_millis(50)).await;
            Some(Ok(input.clone()))
        }
    }
821
    /// A step claimed by the in-process executor is run by it (the executor's rewritten
    /// text shows up in the result) and is reported with exit status 0.
    #[tokio::test]
    async fn in_process_executor_handles_builtin_steps_without_spawning() {
        let executor = Arc::new(FakeInProcessExecutor::default());
        let runner = PipelineRunner::with_in_process_step_executor(true, executor.clone());
        let config = config_with_steps(
            1_000,
            vec![step(
                "builtin",
                "stt_openai",
                &[],
                500,
                OnErrorPolicy::Abort,
            )],
        );

        let outcome = runner.run(sample_envelope(), &config).await;

        match outcome {
            PipelineOutcome::Completed { envelope, trace } => {
                assert_eq!(
                    envelope.transcript.raw_text.as_deref(),
                    Some("handled in process")
                );
                assert_eq!(trace.len(), 1);
                assert_eq!(trace[0].exit_status, Some(0));
            }
            other => panic!("expected completed outcome, got {other:?}"),
        }

        assert_eq!(executor.handled_step_ids(), vec!["builtin".to_string()]);
    }
853
    /// A step the in-process executor declines (`cat`) is still executed externally,
    /// and the executor records nothing as handled.
    #[tokio::test]
    async fn in_process_executor_leaves_external_steps_to_subprocess_execution() {
        let executor = Arc::new(FakeInProcessExecutor::default());
        let runner = PipelineRunner::with_in_process_step_executor(true, executor.clone());
        let config = config_with_steps(
            1_000,
            vec![step("echo", "cat", &[], 500, OnErrorPolicy::Abort)],
        );
        let input = sample_envelope();

        let outcome = runner.run(input.clone(), &config).await;

        match outcome {
            PipelineOutcome::Completed { envelope, .. } => {
                assert_eq!(envelope, input);
            }
            other => panic!("expected completed outcome, got {other:?}"),
        }

        assert!(executor.handled_step_ids().is_empty());
    }
875
    /// An in-process step that takes longer (50ms) than its 10ms timeout is reported
    /// as a `Timeout` failure and handled by the step's `on_error` policy.
    #[tokio::test]
    async fn in_process_executor_preserves_timeout_behavior() {
        let runner =
            PipelineRunner::with_in_process_step_executor(true, Arc::new(SlowInProcessExecutor));
        let config = config_with_steps(
            1_000,
            vec![step("builtin", "stt_openai", &[], 10, OnErrorPolicy::Abort)],
        );

        let outcome = runner.run(sample_envelope(), &config).await;

        match outcome {
            PipelineOutcome::Aborted { trace, reason } => {
                assert_eq!(trace.len(), 1);
                assert!(trace[0].timed_out);
                assert_eq!(trace[0].policy_applied, PipelinePolicyApplied::Abort);
                assert_eq!(
                    reason,
                    PipelineStopReason::StepFailed {
                        step_id: "builtin".to_string(),
                        failure: StepFailureKind::Timeout,
                        message: "step exceeded timeout budget (10ms)".to_string(),
                    }
                );
            }
            other => panic!("expected aborted outcome, got {other:?}"),
        }
    }
904
905    #[tokio::test]
906    async fn rejects_step_stdout_that_exceeds_capture_budget() {
907        let runner = PipelineRunner::default();
908        let command = format!(
909            "python3 -c \"import sys; sys.stdout.write('a' * {})\"",
910            MAX_STEP_STDOUT_BYTES + 1
911        );
912        let config = config_with_steps(
913            1_000,
914            vec![step(
915                "big-stdout",
916                "/bin/sh",
917                &["-c", &command],
918                1_000,
919                OnErrorPolicy::Abort,
920            )],
921        );
922
923        let outcome = runner.run(sample_envelope(), &config).await;
924
925        match outcome {
926            PipelineOutcome::Aborted { trace, reason } => {
927                assert_eq!(trace.len(), 1);
928                assert_eq!(trace[0].id, "big-stdout");
929                assert_eq!(trace[0].policy_applied, PipelinePolicyApplied::Abort);
930                match reason {
931                    PipelineStopReason::StepFailed {
932                        step_id,
933                        failure,
934                        message,
935                    } => {
936                        assert_eq!(step_id, "big-stdout");
937                        assert_eq!(failure, StepFailureKind::InvalidStdout);
938                        assert!(message.contains("step stdout exceeded max capture budget"));
939                    }
940                    other => panic!("unexpected reason: {other:?}"),
941                }
942            }
943            other => panic!("expected aborted outcome, got {other:?}"),
944        }
945    }
946
947    #[tokio::test]
948    async fn truncates_large_stderr_without_unbounded_growth() {
949        let runner = PipelineRunner::default();
950        let command = format!(
951            "cat >/dev/null; python3 -c \"import sys; sys.stderr.write('e' * {})\"; exit 7",
952            MAX_STEP_STDERR_BYTES + 1_024
953        );
954        let config = config_with_steps(
955            1_000,
956            vec![step(
957                "big-stderr",
958                "/bin/sh",
959                &["-c", &command],
960                1_000,
961                OnErrorPolicy::Abort,
962            )],
963        );
964
965        let outcome = runner.run(sample_envelope(), &config).await;
966
967        match outcome {
968            PipelineOutcome::Aborted { trace, reason } => {
969                assert_eq!(trace.len(), 1);
970                assert_eq!(trace[0].id, "big-stderr");
971                assert!(trace[0].stderr.ends_with(TRUNCATION_SUFFIX));
972                assert!(trace[0].stderr.len() <= MAX_STEP_STDERR_BYTES + TRUNCATION_SUFFIX.len());
973                match reason {
974                    PipelineStopReason::StepFailed {
975                        step_id, failure, ..
976                    } => {
977                        assert_eq!(step_id, "big-stderr");
978                        assert_eq!(failure, StepFailureKind::NonZeroExit);
979                    }
980                    other => panic!("unexpected reason: {other:?}"),
981                }
982            }
983            other => panic!("expected aborted outcome, got {other:?}"),
984        }
985    }
986
987    fn config_with_steps(deadline_ms: u64, steps: Vec<PipelineStepConfig>) -> PipelineConfig {
988        PipelineConfig {
989            deadline_ms,
990            payload_format: PayloadFormat::JsonObject,
991            steps,
992        }
993    }
994
995    fn step(
996        id: &str,
997        cmd: &str,
998        args: &[&str],
999        timeout_ms: u64,
1000        on_error: OnErrorPolicy,
1001    ) -> PipelineStepConfig {
1002        PipelineStepConfig {
1003            id: id.to_string(),
1004            cmd: cmd.to_string(),
1005            args: args.iter().map(|arg| arg.to_string()).collect(),
1006            io_mode: StepIoMode::EnvelopeJson,
1007            timeout_ms,
1008            on_error,
1009        }
1010    }
1011
1012    fn text_step(
1013        id: &str,
1014        cmd: &str,
1015        args: &[&str],
1016        timeout_ms: u64,
1017        on_error: OnErrorPolicy,
1018    ) -> PipelineStepConfig {
1019        PipelineStepConfig {
1020            id: id.to_string(),
1021            cmd: cmd.to_string(),
1022            args: args.iter().map(|arg| arg.to_string()).collect(),
1023            io_mode: StepIoMode::Auto,
1024            timeout_ms,
1025            on_error,
1026        }
1027    }
1028
1029    fn sample_envelope() -> MuninnEnvelopeV1 {
1030        MuninnEnvelopeV1::new("utt-runner-001", "2026-03-05T17:16:09Z")
1031            .with_transcript_raw_text("ship to sf")
1032            .with_transcript_provider("openai")
1033    }
1034}