Skip to main content

ftui_runtime/
process_subscription.rs

1// SPDX-License-Identifier: Apache-2.0
2//! Process subscription for spawning and monitoring external processes.
3//!
4//! [`ProcessSubscription`] wraps [`std::process::Command`] as a first-class
5//! runtime [`Subscription`]. It spawns a child process, captures stdout
6//! line-by-line, and sends messages to the model. When the subscription is
7//! stopped (via [`StopSignal`]), the child process is killed.
8//!
9//! # Migration rationale
10//!
11//! Web Worker APIs and child-process patterns in source frameworks translate
12//! to process-based subscriptions in the terminal context. This provides a
13//! clean target for the migration code emitter.
14//!
15//! # Example
16//!
17//! ```ignore
18//! use ftui_runtime::process_subscription::{ProcessSubscription, ProcessEvent};
19//! use std::time::Duration;
20//!
21//! #[derive(Debug)]
22//! enum Msg {
23//!     ProcessOutput(ProcessEvent),
24//!     // ...
25//! }
26//!
27//! fn subscriptions() -> Vec<Box<dyn Subscription<Msg>>> {
28//!     vec![Box::new(
29//!         ProcessSubscription::new("tail", Msg::ProcessOutput)
30//!             .arg("-f")
31//!             .arg("/var/log/syslog")
32//!             .timeout(Duration::from_secs(60))
33//!     )]
34//! }
35//! ```
36
37#![forbid(unsafe_code)]
38
39use crate::subscription::{StopSignal, SubId, Subscription};
40use std::collections::hash_map::DefaultHasher;
41use std::hash::{Hash, Hasher};
42use std::io::{BufRead, Read};
43use std::process::{Command, Stdio};
44use std::sync::mpsc;
45use web_time::{Duration, Instant};
46
47/// Events emitted by a [`ProcessSubscription`].
48#[derive(Debug, Clone, PartialEq, Eq)]
49pub enum ProcessEvent {
50    /// A line of stdout output from the process.
51    Stdout(String),
52    /// A line of stderr output from the process.
53    Stderr(String),
54    /// The process exited with a status code.
55    Exited(i32),
56    /// The process was terminated by a Unix signal.
57    Signaled(i32),
58    /// The process was killed by the subscription (stop signal or timeout).
59    Killed,
60    /// An error occurred spawning or monitoring the process.
61    Error(String),
62}
63
64/// A subscription that spawns and monitors an external process.
65///
66/// Captures stdout/stderr line-by-line and sends [`ProcessEvent`] messages.
67/// The process is killed when the subscription's [`StopSignal`] fires or
68/// when the optional timeout expires.
69pub struct ProcessSubscription<M: Send + 'static> {
70    program: String,
71    args: Vec<String>,
72    env: Vec<(String, String)>,
73    timeout: Option<Duration>,
74    id: SubId,
75    explicit_id: bool,
76    make_msg: std::sync::Arc<dyn Fn(ProcessEvent) -> M + Send + Sync>,
77}
78
79const PROCESS_READER_JOIN_TIMEOUT: Duration = Duration::from_millis(250);
80const PROCESS_READER_JOIN_POLL: Duration = Duration::from_millis(5);
81
82impl<M: Send + 'static> ProcessSubscription<M> {
83    fn computed_id(
84        program: &str,
85        args: &[String],
86        env: &[(String, String)],
87        timeout: Option<Duration>,
88    ) -> SubId {
89        let mut h = DefaultHasher::new();
90        "ProcessSubscription".hash(&mut h);
91        program.hash(&mut h);
92        args.hash(&mut h);
93        env.hash(&mut h);
94        timeout.map(|duration| duration.as_nanos()).hash(&mut h);
95        h.finish()
96    }
97
98    fn refresh_id(&mut self) {
99        if !self.explicit_id {
100            self.id = Self::computed_id(&self.program, &self.args, &self.env, self.timeout);
101        }
102    }
103
104    /// Create a new process subscription for the given program.
105    ///
106    /// The `make_msg` closure converts [`ProcessEvent`] into your model's
107    /// message type.
108    pub fn new(
109        program: impl Into<String>,
110        make_msg: impl Fn(ProcessEvent) -> M + Send + Sync + 'static,
111    ) -> Self {
112        let program = program.into();
113        let id = Self::computed_id(&program, &[], &[], None);
114        Self {
115            program,
116            args: Vec::new(),
117            env: Vec::new(),
118            timeout: None,
119            id,
120            explicit_id: false,
121            make_msg: std::sync::Arc::new(make_msg),
122        }
123    }
124
125    /// Add a command-line argument.
126    #[must_use]
127    pub fn arg(mut self, arg: impl Into<String>) -> Self {
128        self.args.push(arg.into());
129        self.refresh_id();
130        self
131    }
132
133    /// Add multiple command-line arguments.
134    #[must_use]
135    pub fn args(mut self, args: impl IntoIterator<Item = impl Into<String>>) -> Self {
136        for a in args {
137            self = self.arg(a);
138        }
139        self
140    }
141
142    /// Set an environment variable for the child process.
143    #[must_use]
144    pub fn env(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
145        self.env.push((key.into(), value.into()));
146        self.refresh_id();
147        self
148    }
149
150    /// Set a timeout after which the process is killed.
151    #[must_use]
152    pub fn timeout(mut self, duration: Duration) -> Self {
153        self.timeout = Some(duration);
154        self.refresh_id();
155        self
156    }
157
158    /// Override the subscription ID (for explicit deduplication control).
159    #[must_use]
160    pub fn with_id(mut self, id: SubId) -> Self {
161        self.id = id;
162        self.explicit_id = true;
163        self
164    }
165}
166
167impl<M: Send + 'static> Subscription<M> for ProcessSubscription<M> {
168    fn id(&self) -> SubId {
169        self.id
170    }
171
172    fn run(&self, sender: mpsc::Sender<M>, stop: StopSignal) {
173        fn forward_lines<R, M>(
174            reader: std::io::BufReader<R>,
175            sender: mpsc::Sender<M>,
176            make_msg: impl Fn(String) -> M,
177        ) where
178            R: Read,
179            M: Send + 'static,
180        {
181            for line in reader.lines() {
182                match line {
183                    Ok(line) => {
184                        if sender.send(make_msg(line)).is_err() {
185                            break;
186                        }
187                    }
188                    Err(_) => break,
189                }
190            }
191        }
192
193        let spawn_start = web_time::Instant::now();
194        let sub_id = self.id;
195
196        let mut cmd = Command::new(&self.program);
197        cmd.args(&self.args)
198            .stdout(Stdio::piped())
199            .stderr(Stdio::piped())
200            .stdin(Stdio::null());
201
202        for (k, v) in &self.env {
203            cmd.env(k, v);
204        }
205
206        let mut child = match cmd.spawn() {
207            Ok(c) => {
208                tracing::debug!(
209                    target: "ftui.process",
210                    sub_id,
211                    program = %self.program,
212                    args = ?self.args,
213                    spawn_us = spawn_start.elapsed().as_micros() as u64,
214                    "process spawned"
215                );
216                c
217            }
218            Err(e) => {
219                tracing::warn!(
220                    target: "ftui.process",
221                    sub_id,
222                    program = %self.program,
223                    error = %e,
224                    "process spawn failed"
225                );
226                let _ = sender.send((self.make_msg.as_ref())(ProcessEvent::Error(format!(
227                    "Failed to spawn '{}': {}",
228                    self.program, e
229                ))));
230                return;
231            }
232        };
233
234        let deadline = self.timeout.map(|t| web_time::Instant::now() + t);
235        let stdout = child.stdout.take();
236        let stderr = child.stderr.take();
237        let make_msg_ref = std::sync::Arc::clone(&self.make_msg);
238        // Use the cancellation token for cooperative stop coordination.
239        let token = stop.cancellation_token().clone();
240        let poll_interval = Duration::from_millis(50);
241        let stdout_handle = stdout.map(|stdout| {
242            let sender_out = sender.clone();
243            let make_msg_out = std::sync::Arc::clone(&make_msg_ref);
244            std::thread::spawn(move || {
245                forward_lines(std::io::BufReader::new(stdout), sender_out, |line| {
246                    (make_msg_out.as_ref())(ProcessEvent::Stdout(line))
247                });
248            })
249        });
250        let stderr_handle = stderr.map(|stderr| {
251            let sender_err = sender.clone();
252            let make_msg_err = std::sync::Arc::clone(&make_msg_ref);
253            std::thread::spawn(move || {
254                forward_lines(std::io::BufReader::new(stderr), sender_err, |line| {
255                    (make_msg_err.as_ref())(ProcessEvent::Stderr(line))
256                });
257            })
258        });
259
260        let final_event = loop {
261            match child.try_wait() {
262                Ok(Some(status)) => {
263                    let event = process_exit_event(status);
264                    if let ProcessEvent::Exited(code) = &event {
265                        tracing::debug!(
266                            target: "ftui.process",
267                            sub_id,
268                            exit_code = *code,
269                            elapsed_ms = spawn_start.elapsed().as_millis() as u64,
270                            "process exited"
271                        );
272                    } else if let ProcessEvent::Signaled(signal) = &event {
273                        tracing::debug!(
274                            target: "ftui.process",
275                            sub_id,
276                            signal = *signal,
277                            elapsed_ms = spawn_start.elapsed().as_millis() as u64,
278                            "process terminated by signal"
279                        );
280                    }
281                    break event;
282                }
283                Ok(None) => {}
284                Err(e) => {
285                    tracing::warn!(
286                        target: "ftui.process",
287                        sub_id,
288                        error = %e,
289                        "process wait error"
290                    );
291                    break ProcessEvent::Error(format!("wait error: {e}"));
292                }
293            }
294
295            if let Some(dl) = deadline
296                && web_time::Instant::now() >= dl
297            {
298                tracing::debug!(
299                    target: "ftui.process",
300                    sub_id,
301                    elapsed_ms = spawn_start.elapsed().as_millis() as u64,
302                    reason = "timeout",
303                    "killing process"
304                );
305                let _ = child.kill();
306                let _ = child.wait();
307                break ProcessEvent::Killed;
308            }
309
310            if token.wait_timeout(poll_interval) {
311                tracing::debug!(
312                    target: "ftui.process",
313                    sub_id,
314                    elapsed_ms = spawn_start.elapsed().as_millis() as u64,
315                    reason = "cancellation",
316                    "killing process"
317                );
318                let _ = child.kill();
319                let _ = child.wait();
320                break ProcessEvent::Killed;
321            }
322        };
323
324        if let Some(handle) = stdout_handle {
325            join_reader_thread_bounded(handle, "stdout", sub_id);
326        }
327        if let Some(handle) = stderr_handle {
328            join_reader_thread_bounded(handle, "stderr", sub_id);
329        }
330
331        let _ = sender.send((make_msg_ref.as_ref())(final_event));
332    }
333}
334
335fn process_exit_event(status: std::process::ExitStatus) -> ProcessEvent {
336    #[cfg(unix)]
337    {
338        use std::os::unix::process::ExitStatusExt;
339
340        if let Some(signal) = status.signal() {
341            return ProcessEvent::Signaled(signal);
342        }
343    }
344
345    ProcessEvent::Exited(status.code().unwrap_or(-1))
346}
347
348fn join_reader_thread_bounded(
349    handle: std::thread::JoinHandle<()>,
350    stream: &'static str,
351    sub_id: SubId,
352) {
353    let start = Instant::now();
354    while !handle.is_finished() {
355        if start.elapsed() >= PROCESS_READER_JOIN_TIMEOUT {
356            tracing::warn!(
357                target: "ftui.process",
358                sub_id,
359                stream,
360                timeout_ms = PROCESS_READER_JOIN_TIMEOUT.as_millis() as u64,
361                "process reader thread did not exit within timeout; detaching"
362            );
363            detach_reader_join(handle, stream);
364            return;
365        }
366        std::thread::sleep(PROCESS_READER_JOIN_POLL);
367    }
368    let _ = handle.join();
369}
370
371fn detach_reader_join(handle: std::thread::JoinHandle<()>, stream: &'static str) {
372    let _ = std::thread::Builder::new()
373        .name(format!("ftui-process-{stream}-detached-join"))
374        .spawn(move || {
375            let _ = handle.join();
376        });
377}
378
379#[cfg(test)]
380mod tests {
381    use super::*;
382    use std::sync::mpsc as stdmpsc;
383    use std::thread;
384
385    #[derive(Debug, Clone, PartialEq)]
386    enum TestMsg {
387        Proc(ProcessEvent),
388    }
389
390    #[test]
391    fn process_event_variants() {
392        let stdout = ProcessEvent::Stdout("hello".into());
393        let stderr = ProcessEvent::Stderr("warn".into());
394        let exited = ProcessEvent::Exited(0);
395        let signaled = ProcessEvent::Signaled(15);
396        let killed = ProcessEvent::Killed;
397        let error = ProcessEvent::Error("oops".into());
398
399        assert_eq!(stdout, ProcessEvent::Stdout("hello".into()));
400        assert_eq!(stderr, ProcessEvent::Stderr("warn".into()));
401        assert_eq!(exited, ProcessEvent::Exited(0));
402        assert_eq!(signaled, ProcessEvent::Signaled(15));
403        assert_eq!(killed, ProcessEvent::Killed);
404        assert_eq!(error, ProcessEvent::Error("oops".into()));
405    }
406
407    #[test]
408    fn subscription_id_is_stable() {
409        let s1: ProcessSubscription<TestMsg> =
410            ProcessSubscription::new("echo", TestMsg::Proc).arg("hello");
411        let s2: ProcessSubscription<TestMsg> =
412            ProcessSubscription::new("echo", TestMsg::Proc).arg("hello");
413        assert_eq!(s1.id(), s2.id());
414    }
415
416    #[test]
417    fn different_args_produce_different_ids() {
418        let s1: ProcessSubscription<TestMsg> =
419            ProcessSubscription::new("echo", TestMsg::Proc).arg("hello");
420        let s2: ProcessSubscription<TestMsg> =
421            ProcessSubscription::new("echo", TestMsg::Proc).arg("world");
422        assert_ne!(s1.id(), s2.id());
423    }
424
425    #[test]
426    fn different_programs_produce_different_ids() {
427        let s1: ProcessSubscription<TestMsg> = ProcessSubscription::new("echo", TestMsg::Proc);
428        let s2: ProcessSubscription<TestMsg> = ProcessSubscription::new("cat", TestMsg::Proc);
429        assert_ne!(s1.id(), s2.id());
430    }
431
432    #[test]
433    fn custom_id_overrides_default() {
434        let s: ProcessSubscription<TestMsg> =
435            ProcessSubscription::new("echo", TestMsg::Proc).with_id(42);
436        assert_eq!(s.id(), 42);
437    }
438
439    #[test]
440    fn env_changes_affect_subscription_id() {
441        let s1: ProcessSubscription<TestMsg> =
442            ProcessSubscription::new("echo", TestMsg::Proc).env("FTUI_TEST_VAR", "a");
443        let s2: ProcessSubscription<TestMsg> =
444            ProcessSubscription::new("echo", TestMsg::Proc).env("FTUI_TEST_VAR", "b");
445        assert_ne!(s1.id(), s2.id());
446    }
447
448    #[test]
449    fn timeout_changes_affect_subscription_id() {
450        let s1: ProcessSubscription<TestMsg> =
451            ProcessSubscription::new("echo", TestMsg::Proc).timeout(Duration::from_millis(10));
452        let s2: ProcessSubscription<TestMsg> =
453            ProcessSubscription::new("echo", TestMsg::Proc).timeout(Duration::from_millis(20));
454        assert_ne!(s1.id(), s2.id());
455    }
456
457    #[test]
458    fn explicit_id_remains_stable_after_builder_changes() {
459        let s: ProcessSubscription<TestMsg> = ProcessSubscription::new("echo", TestMsg::Proc)
460            .with_id(42)
461            .arg("hello")
462            .env("FTUI_TEST_VAR", "value")
463            .timeout(Duration::from_millis(10));
464        assert_eq!(s.id(), 42);
465    }
466
467    #[test]
468    fn echo_captures_stdout() {
469        let sub = ProcessSubscription::new("echo", TestMsg::Proc).arg("hello world");
470        let (tx, rx) = stdmpsc::channel();
471        let (signal, trigger) = StopSignal::new();
472
473        let handle = thread::spawn(move || {
474            sub.run(tx, signal);
475        });
476
477        // Wait for process to complete
478        thread::sleep(Duration::from_millis(500));
479        trigger.stop();
480        handle.join().unwrap();
481
482        let msgs: Vec<TestMsg> = rx.try_iter().collect();
483        let has_stdout = msgs.iter().any(|m| match m {
484            TestMsg::Proc(ProcessEvent::Stdout(s)) => s.contains("hello world"),
485            _ => false,
486        });
487        assert!(
488            has_stdout,
489            "Expected stdout with 'hello world', got: {msgs:?}"
490        );
491
492        let has_exit = msgs
493            .iter()
494            .any(|m| matches!(m, TestMsg::Proc(ProcessEvent::Exited(0))));
495        assert!(has_exit, "Expected Exited(0), got: {msgs:?}");
496    }
497
498    #[test]
499    fn nonexistent_program_sends_error() {
500        let sub =
501            ProcessSubscription::new("/nonexistent/program/that/should/not/exist", TestMsg::Proc);
502        let (tx, rx) = stdmpsc::channel();
503        let (signal, _trigger) = StopSignal::new();
504
505        let handle = thread::spawn(move || {
506            sub.run(tx, signal);
507        });
508
509        handle.join().unwrap();
510        let msgs: Vec<TestMsg> = rx.try_iter().collect();
511        let has_error = msgs
512            .iter()
513            .any(|m| matches!(m, TestMsg::Proc(ProcessEvent::Error(_))));
514        assert!(has_error, "Expected Error event, got: {msgs:?}");
515    }
516
517    #[test]
518    fn stop_signal_kills_long_running_process() {
519        let sub = ProcessSubscription::new("sleep", TestMsg::Proc).arg("60");
520        let (tx, rx) = stdmpsc::channel();
521        let (signal, trigger) = StopSignal::new();
522        let start = web_time::Instant::now();
523
524        let handle = thread::spawn(move || {
525            sub.run(tx, signal);
526        });
527
528        // Give it a moment to start, then stop
529        thread::sleep(Duration::from_millis(100));
530        trigger.stop();
531        handle.join().unwrap();
532        assert!(
533            start.elapsed() < Duration::from_secs(2),
534            "stop should kill a quiet process promptly"
535        );
536
537        let msgs: Vec<TestMsg> = rx.try_iter().collect();
538        let has_killed = msgs
539            .iter()
540            .any(|m| matches!(m, TestMsg::Proc(ProcessEvent::Killed)));
541        assert!(has_killed, "Expected Killed event, got: {msgs:?}");
542    }
543
544    #[test]
545    fn timeout_kills_process() {
546        let sub = ProcessSubscription::new("sleep", TestMsg::Proc)
547            .arg("60")
548            .timeout(Duration::from_millis(100));
549        let (tx, rx) = stdmpsc::channel();
550        let (signal, _trigger) = StopSignal::new();
551        let start = web_time::Instant::now();
552
553        let handle = thread::spawn(move || {
554            sub.run(tx, signal);
555        });
556
557        handle.join().unwrap();
558        assert!(
559            start.elapsed() < Duration::from_secs(2),
560            "timeout should kill a quiet process promptly"
561        );
562        let msgs: Vec<TestMsg> = rx.try_iter().collect();
563        let has_killed = msgs
564            .iter()
565            .any(|m| matches!(m, TestMsg::Proc(ProcessEvent::Killed)));
566        assert!(has_killed, "Expected Killed on timeout, got: {msgs:?}");
567    }
568
569    #[test]
570    fn env_vars_are_passed() {
571        let sub =
572            ProcessSubscription::new("env", TestMsg::Proc).env("FTUI_TEST_VAR", "test_value_42");
573        let (tx, rx) = stdmpsc::channel();
574        let (signal, trigger) = StopSignal::new();
575
576        let handle = thread::spawn(move || {
577            sub.run(tx, signal);
578        });
579
580        thread::sleep(Duration::from_millis(500));
581        trigger.stop();
582        handle.join().unwrap();
583
584        let msgs: Vec<TestMsg> = rx.try_iter().collect();
585        let has_var = msgs.iter().any(|m| match m {
586            TestMsg::Proc(ProcessEvent::Stdout(s)) => s.contains("FTUI_TEST_VAR=test_value_42"),
587            _ => false,
588        });
589        assert!(has_var, "Expected env var in output, got: {msgs:?}");
590    }
591
592    #[test]
593    fn multiple_args_via_args_method() {
594        let sub = ProcessSubscription::new("echo", TestMsg::Proc).args(["hello", "world"]);
595        let (tx, rx) = stdmpsc::channel();
596        let (signal, trigger) = StopSignal::new();
597
598        let handle = thread::spawn(move || {
599            sub.run(tx, signal);
600        });
601
602        thread::sleep(Duration::from_millis(500));
603        trigger.stop();
604        handle.join().unwrap();
605
606        let msgs: Vec<TestMsg> = rx.try_iter().collect();
607        let has_output = msgs.iter().any(|m| match m {
608            TestMsg::Proc(ProcessEvent::Stdout(s)) => s.contains("hello world"),
609            _ => false,
610        });
611        assert!(has_output, "Expected combined output, got: {msgs:?}");
612    }
613
614    #[test]
615    fn stderr_captured() {
616        // Use sh -c to write to stderr
617        let sub = ProcessSubscription::new("sh", TestMsg::Proc)
618            .arg("-c")
619            .arg("echo error_msg >&2");
620        let (tx, rx) = stdmpsc::channel();
621        let (signal, trigger) = StopSignal::new();
622
623        let handle = thread::spawn(move || {
624            sub.run(tx, signal);
625        });
626
627        thread::sleep(Duration::from_millis(500));
628        trigger.stop();
629        handle.join().unwrap();
630
631        let msgs: Vec<TestMsg> = rx.try_iter().collect();
632        let has_stderr = msgs.iter().any(|m| match m {
633            TestMsg::Proc(ProcessEvent::Stderr(s)) => s.contains("error_msg"),
634            _ => false,
635        });
636        assert!(has_stderr, "Expected stderr output, got: {msgs:?}");
637    }
638
639    #[test]
640    fn exit_code_captured() {
641        let sub = ProcessSubscription::new("sh", TestMsg::Proc)
642            .arg("-c")
643            .arg("exit 42");
644        let (tx, rx) = stdmpsc::channel();
645        let (signal, trigger) = StopSignal::new();
646
647        let handle = thread::spawn(move || {
648            sub.run(tx, signal);
649        });
650
651        thread::sleep(Duration::from_millis(500));
652        trigger.stop();
653        handle.join().unwrap();
654
655        let msgs: Vec<TestMsg> = rx.try_iter().collect();
656        let has_exit = msgs
657            .iter()
658            .any(|m| matches!(m, TestMsg::Proc(ProcessEvent::Exited(42))));
659        assert!(has_exit, "Expected Exited(42), got: {msgs:?}");
660    }
661
662    #[cfg(unix)]
663    #[test]
664    fn signal_exit_is_preserved() {
665        let sub = ProcessSubscription::new("sh", TestMsg::Proc)
666            .arg("-c")
667            .arg("kill -TERM $$");
668        let (tx, rx) = stdmpsc::channel();
669        let (signal, trigger) = StopSignal::new();
670
671        let handle = thread::spawn(move || {
672            sub.run(tx, signal);
673        });
674
675        thread::sleep(Duration::from_millis(500));
676        trigger.stop();
677        handle.join().unwrap();
678
679        let msgs: Vec<TestMsg> = rx.try_iter().collect();
680        let has_signal = msgs
681            .iter()
682            .any(|m| matches!(m, TestMsg::Proc(ProcessEvent::Signaled(15))));
683        assert!(has_signal, "Expected Signaled(15), got: {msgs:?}");
684    }
685
686    // =========================================================================
687    // PROCESS LIFECYCLE CONTRACT TESTS (bd-3s3yw)
688    //
689    // These tests capture the observable process supervision contract that
690    // the Asupersync migration must preserve.
691    // =========================================================================
692
693    /// CONTRACT: Process subscription uses CancellationToken internally for
694    /// stop coordination (via StopSignal::cancellation_token()).
695    #[test]
696    fn contract_uses_cancellation_token_for_stop() {
697        let sub = ProcessSubscription::new("sleep", TestMsg::Proc).arg("60");
698        let (tx, rx) = stdmpsc::channel();
699        let (signal, trigger) = StopSignal::new();
700
701        // Verify the cancellation token is accessible
702        let token = signal.cancellation_token().clone();
703        assert!(!token.is_cancelled());
704
705        let handle = thread::spawn(move || {
706            sub.run(tx, signal);
707        });
708
709        thread::sleep(Duration::from_millis(100));
710
711        // Stopping via trigger should cancel the token
712        trigger.stop();
713        assert!(token.is_cancelled());
714
715        handle.join().unwrap();
716
717        let msgs: Vec<TestMsg> = rx.try_iter().collect();
718        assert!(
719            msgs.iter()
720                .any(|m| matches!(m, TestMsg::Proc(ProcessEvent::Killed))),
721            "process must be killed on cancellation, got: {msgs:?}"
722        );
723    }
724
725    /// CONTRACT: Final event is always sent, even on error paths.
726    /// The subscription must always emit exactly one terminal event
727    /// (Exited, Signaled, Killed, or Error).
728    #[test]
729    fn contract_always_emits_terminal_event() {
730        // Happy path: process exits normally
731        {
732            let sub = ProcessSubscription::new("true", TestMsg::Proc);
733            let (tx, rx) = stdmpsc::channel();
734            let (signal, trigger) = StopSignal::new();
735
736            let handle = thread::spawn(move || {
737                sub.run(tx, signal);
738            });
739
740            thread::sleep(Duration::from_millis(500));
741            trigger.stop();
742            handle.join().unwrap();
743
744            let msgs: Vec<TestMsg> = rx.try_iter().collect();
745            let terminal_events: Vec<_> = msgs
746                .iter()
747                .filter(|m| {
748                    matches!(
749                        m,
750                        TestMsg::Proc(
751                            ProcessEvent::Exited(_)
752                                | ProcessEvent::Signaled(_)
753                                | ProcessEvent::Killed
754                                | ProcessEvent::Error(_)
755                        )
756                    )
757                })
758                .collect();
759            assert_eq!(
760                terminal_events.len(),
761                1,
762                "must emit exactly one terminal event, got: {terminal_events:?}"
763            );
764        }
765
766        // Error path: nonexistent program
767        {
768            let sub = ProcessSubscription::new(
769                "/nonexistent/program/that/should/not/exist",
770                TestMsg::Proc,
771            );
772            let (tx, rx) = stdmpsc::channel();
773            let (signal, _trigger) = StopSignal::new();
774
775            let handle = thread::spawn(move || {
776                sub.run(tx, signal);
777            });
778
779            handle.join().unwrap();
780
781            let msgs: Vec<TestMsg> = rx.try_iter().collect();
782            let terminal_events: Vec<_> = msgs
783                .iter()
784                .filter(|m| {
785                    matches!(
786                        m,
787                        TestMsg::Proc(
788                            ProcessEvent::Exited(_)
789                                | ProcessEvent::Signaled(_)
790                                | ProcessEvent::Killed
791                                | ProcessEvent::Error(_)
792                        )
793                    )
794                })
795                .collect();
796            assert_eq!(
797                terminal_events.len(),
798                1,
799                "must emit exactly one terminal event on error, got: {terminal_events:?}"
800            );
801        }
802    }
803
804    /// CONTRACT: stdout and stderr lines arrive before the terminal event.
805    /// The output forwarding threads must join before the final event is sent.
806    #[test]
807    fn contract_output_precedes_terminal_event() {
808        let sub = ProcessSubscription::new("sh", TestMsg::Proc)
809            .arg("-c")
810            .arg("echo FIRST && echo SECOND >&2 && exit 0");
811        let (tx, rx) = stdmpsc::channel();
812        let (signal, trigger) = StopSignal::new();
813
814        let handle = thread::spawn(move || {
815            sub.run(tx, signal);
816        });
817
818        thread::sleep(Duration::from_millis(500));
819        trigger.stop();
820        handle.join().unwrap();
821
822        let msgs: Vec<TestMsg> = rx.try_iter().collect();
823
824        // Find the position of the terminal event
825        let terminal_pos = msgs.iter().position(|m| {
826            matches!(
827                m,
828                TestMsg::Proc(
829                    ProcessEvent::Exited(_)
830                        | ProcessEvent::Signaled(_)
831                        | ProcessEvent::Killed
832                        | ProcessEvent::Error(_)
833                )
834            )
835        });
836
837        // Find positions of stdout/stderr events
838        let output_positions: Vec<usize> = msgs
839            .iter()
840            .enumerate()
841            .filter_map(|(i, m)| match m {
842                TestMsg::Proc(ProcessEvent::Stdout(_) | ProcessEvent::Stderr(_)) => Some(i),
843                _ => None,
844            })
845            .collect();
846
847        if let Some(term_pos) = terminal_pos {
848            for &out_pos in &output_positions {
849                assert!(
850                    out_pos < term_pos,
851                    "output event at position {out_pos} must precede terminal event at {term_pos}"
852                );
853            }
854        }
855    }
856
857    /// CONTRACT: ProcessSubscription ID includes timeout in the hash.
858    /// Changing timeout creates a different subscription identity.
859    #[test]
860    fn contract_id_includes_timeout() {
861        let s1: ProcessSubscription<TestMsg> =
862            ProcessSubscription::new("echo", TestMsg::Proc).timeout(Duration::from_secs(5));
863        let s2: ProcessSubscription<TestMsg> =
864            ProcessSubscription::new("echo", TestMsg::Proc).timeout(Duration::from_secs(10));
865        let s3: ProcessSubscription<TestMsg> = ProcessSubscription::new("echo", TestMsg::Proc);
866
867        assert_ne!(
868            s1.id(),
869            s2.id(),
870            "different timeouts must produce different IDs"
871        );
872        assert_ne!(
873            s1.id(),
874            s3.id(),
875            "timeout vs no-timeout must produce different IDs"
876        );
877    }
878
879    /// CONTRACT: Kill is prompt — process is killed within poll_interval (50ms)
880    /// of the stop signal, not blocked waiting for process output.
881    #[test]
882    fn contract_kill_is_prompt() {
883        let sub = ProcessSubscription::new("sleep", TestMsg::Proc).arg("60");
884        let (tx, rx) = stdmpsc::channel();
885        let (signal, trigger) = StopSignal::new();
886
887        let handle = thread::spawn(move || {
888            sub.run(tx, signal);
889        });
890
891        thread::sleep(Duration::from_millis(100));
892
893        let kill_start = web_time::Instant::now();
894        trigger.stop();
895        handle.join().unwrap();
896        let kill_elapsed = kill_start.elapsed();
897
898        assert!(
899            kill_elapsed < Duration::from_millis(500),
900            "kill must complete within 500ms of stop signal, took {kill_elapsed:?}"
901        );
902
903        let msgs: Vec<TestMsg> = rx.try_iter().collect();
904        assert!(
905            msgs.iter()
906                .any(|m| matches!(m, TestMsg::Proc(ProcessEvent::Killed))),
907            "must emit Killed event"
908        );
909    }
910
911    #[test]
912    fn stop_signal_does_not_block_when_background_descendant_keeps_pipes_open() {
913        let sub = ProcessSubscription::new("sh", TestMsg::Proc)
914            .arg("-c")
915            .arg("sleep 60 & sleep 60");
916        let (tx, rx) = stdmpsc::channel();
917        let (signal, trigger) = StopSignal::new();
918        let start = web_time::Instant::now();
919
920        let handle = thread::spawn(move || {
921            sub.run(tx, signal);
922        });
923
924        thread::sleep(Duration::from_millis(100));
925        trigger.stop();
926        handle.join().unwrap();
927
928        assert!(
929            start.elapsed() < Duration::from_secs(2),
930            "stop should not block behind inherited stdout/stderr pipes"
931        );
932
933        let msgs: Vec<TestMsg> = rx.try_iter().collect();
934        assert!(
935            msgs.iter()
936                .any(|m| matches!(m, TestMsg::Proc(ProcessEvent::Killed))),
937            "expected Killed event, got: {msgs:?}"
938        );
939    }
940
941    #[test]
942    fn timeout_does_not_block_when_background_descendant_keeps_pipes_open() {
943        let sub = ProcessSubscription::new("sh", TestMsg::Proc)
944            .arg("-c")
945            .arg("sleep 60 & sleep 60")
946            .timeout(Duration::from_millis(100));
947        let (tx, rx) = stdmpsc::channel();
948        let (signal, _trigger) = StopSignal::new();
949        let start = web_time::Instant::now();
950
951        let handle = thread::spawn(move || {
952            sub.run(tx, signal);
953        });
954
955        handle.join().unwrap();
956
957        assert!(
958            start.elapsed() < Duration::from_secs(2),
959            "timeout should not block behind inherited stdout/stderr pipes"
960        );
961
962        let msgs: Vec<TestMsg> = rx.try_iter().collect();
963        assert!(
964            msgs.iter()
965                .any(|m| matches!(m, TestMsg::Proc(ProcessEvent::Killed))),
966            "expected Killed event, got: {msgs:?}"
967        );
968    }
969}