1#![forbid(unsafe_code)]
38
39use crate::subscription::{StopSignal, SubId, Subscription};
40use std::collections::hash_map::DefaultHasher;
41use std::hash::{Hash, Hasher};
42use std::io::{BufRead, Read};
43use std::process::{Command, Stdio};
44use std::sync::mpsc;
45use web_time::{Duration, Instant};
46
/// Events reported while a child process is being supervised.
///
/// [`Stdout`](Self::Stdout) and [`Stderr`](Self::Stderr) carry output and may
/// occur any number of times. Every other variant is terminal: the
/// subscription sends exactly one of them as its last message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ProcessEvent {
    /// One line read from the child's standard output, without its line
    /// terminator.
    Stdout(String),
    /// One line read from the child's standard error, without its line
    /// terminator.
    Stderr(String),
    /// The child exited on its own with this exit code (`-1` when the
    /// platform reports no code).
    Exited(i32),
    /// The child was terminated by this signal number (Unix only).
    Signaled(i32),
    /// The subscription killed the child, either because its timeout elapsed
    /// or because a stop was requested.
    Killed,
    /// The child could not be spawned, or waiting on it failed. The payload
    /// is a human-readable description of the failure.
    Error(String),
}
63
64pub struct ProcessSubscription<M: Send + 'static> {
70 program: String,
71 args: Vec<String>,
72 env: Vec<(String, String)>,
73 timeout: Option<Duration>,
74 id: SubId,
75 explicit_id: bool,
76 make_msg: std::sync::Arc<dyn Fn(ProcessEvent) -> M + Send + Sync>,
77}
78
/// Longest time to wait for an output reader thread to finish after the child
/// is gone. A reader still running after this is detached instead of joined.
const PROCESS_READER_JOIN_TIMEOUT: Duration = Duration::from_millis(250);
/// Sleep between successive checks of whether a reader thread has finished.
const PROCESS_READER_JOIN_POLL: Duration = Duration::from_millis(5);
81
82impl<M: Send + 'static> ProcessSubscription<M> {
83 fn computed_id(
84 program: &str,
85 args: &[String],
86 env: &[(String, String)],
87 timeout: Option<Duration>,
88 ) -> SubId {
89 let mut h = DefaultHasher::new();
90 "ProcessSubscription".hash(&mut h);
91 program.hash(&mut h);
92 args.hash(&mut h);
93 env.hash(&mut h);
94 timeout.map(|duration| duration.as_nanos()).hash(&mut h);
95 h.finish()
96 }
97
98 fn refresh_id(&mut self) {
99 if !self.explicit_id {
100 self.id = Self::computed_id(&self.program, &self.args, &self.env, self.timeout);
101 }
102 }
103
104 pub fn new(
109 program: impl Into<String>,
110 make_msg: impl Fn(ProcessEvent) -> M + Send + Sync + 'static,
111 ) -> Self {
112 let program = program.into();
113 let id = Self::computed_id(&program, &[], &[], None);
114 Self {
115 program,
116 args: Vec::new(),
117 env: Vec::new(),
118 timeout: None,
119 id,
120 explicit_id: false,
121 make_msg: std::sync::Arc::new(make_msg),
122 }
123 }
124
125 #[must_use]
127 pub fn arg(mut self, arg: impl Into<String>) -> Self {
128 self.args.push(arg.into());
129 self.refresh_id();
130 self
131 }
132
133 #[must_use]
135 pub fn args(mut self, args: impl IntoIterator<Item = impl Into<String>>) -> Self {
136 for a in args {
137 self = self.arg(a);
138 }
139 self
140 }
141
142 #[must_use]
144 pub fn env(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
145 self.env.push((key.into(), value.into()));
146 self.refresh_id();
147 self
148 }
149
150 #[must_use]
152 pub fn timeout(mut self, duration: Duration) -> Self {
153 self.timeout = Some(duration);
154 self.refresh_id();
155 self
156 }
157
158 #[must_use]
160 pub fn with_id(mut self, id: SubId) -> Self {
161 self.id = id;
162 self.explicit_id = true;
163 self
164 }
165}
166
167impl<M: Send + 'static> Subscription<M> for ProcessSubscription<M> {
168 fn id(&self) -> SubId {
169 self.id
170 }
171
172 fn run(&self, sender: mpsc::Sender<M>, stop: StopSignal) {
173 fn forward_lines<R, M>(
174 reader: std::io::BufReader<R>,
175 sender: mpsc::Sender<M>,
176 make_msg: impl Fn(String) -> M,
177 ) where
178 R: Read,
179 M: Send + 'static,
180 {
181 for line in reader.lines() {
182 match line {
183 Ok(line) => {
184 if sender.send(make_msg(line)).is_err() {
185 break;
186 }
187 }
188 Err(_) => break,
189 }
190 }
191 }
192
193 let spawn_start = web_time::Instant::now();
194 let sub_id = self.id;
195
196 let mut cmd = Command::new(&self.program);
197 cmd.args(&self.args)
198 .stdout(Stdio::piped())
199 .stderr(Stdio::piped())
200 .stdin(Stdio::null());
201
202 for (k, v) in &self.env {
203 cmd.env(k, v);
204 }
205
206 let mut child = match cmd.spawn() {
207 Ok(c) => {
208 tracing::debug!(
209 target: "ftui.process",
210 sub_id,
211 program = %self.program,
212 args = ?self.args,
213 spawn_us = spawn_start.elapsed().as_micros() as u64,
214 "process spawned"
215 );
216 c
217 }
218 Err(e) => {
219 tracing::warn!(
220 target: "ftui.process",
221 sub_id,
222 program = %self.program,
223 error = %e,
224 "process spawn failed"
225 );
226 let _ = sender.send((self.make_msg.as_ref())(ProcessEvent::Error(format!(
227 "Failed to spawn '{}': {}",
228 self.program, e
229 ))));
230 return;
231 }
232 };
233
234 let deadline = self.timeout.map(|t| web_time::Instant::now() + t);
235 let stdout = child.stdout.take();
236 let stderr = child.stderr.take();
237 let make_msg_ref = std::sync::Arc::clone(&self.make_msg);
238 let token = stop.cancellation_token().clone();
240 let poll_interval = Duration::from_millis(50);
241 let stdout_handle = stdout.map(|stdout| {
242 let sender_out = sender.clone();
243 let make_msg_out = std::sync::Arc::clone(&make_msg_ref);
244 std::thread::spawn(move || {
245 forward_lines(std::io::BufReader::new(stdout), sender_out, |line| {
246 (make_msg_out.as_ref())(ProcessEvent::Stdout(line))
247 });
248 })
249 });
250 let stderr_handle = stderr.map(|stderr| {
251 let sender_err = sender.clone();
252 let make_msg_err = std::sync::Arc::clone(&make_msg_ref);
253 std::thread::spawn(move || {
254 forward_lines(std::io::BufReader::new(stderr), sender_err, |line| {
255 (make_msg_err.as_ref())(ProcessEvent::Stderr(line))
256 });
257 })
258 });
259
260 let final_event = loop {
261 match child.try_wait() {
262 Ok(Some(status)) => {
263 let event = process_exit_event(status);
264 if let ProcessEvent::Exited(code) = &event {
265 tracing::debug!(
266 target: "ftui.process",
267 sub_id,
268 exit_code = *code,
269 elapsed_ms = spawn_start.elapsed().as_millis() as u64,
270 "process exited"
271 );
272 } else if let ProcessEvent::Signaled(signal) = &event {
273 tracing::debug!(
274 target: "ftui.process",
275 sub_id,
276 signal = *signal,
277 elapsed_ms = spawn_start.elapsed().as_millis() as u64,
278 "process terminated by signal"
279 );
280 }
281 break event;
282 }
283 Ok(None) => {}
284 Err(e) => {
285 tracing::warn!(
286 target: "ftui.process",
287 sub_id,
288 error = %e,
289 "process wait error"
290 );
291 break ProcessEvent::Error(format!("wait error: {e}"));
292 }
293 }
294
295 if let Some(dl) = deadline
296 && web_time::Instant::now() >= dl
297 {
298 tracing::debug!(
299 target: "ftui.process",
300 sub_id,
301 elapsed_ms = spawn_start.elapsed().as_millis() as u64,
302 reason = "timeout",
303 "killing process"
304 );
305 let _ = child.kill();
306 let _ = child.wait();
307 break ProcessEvent::Killed;
308 }
309
310 if token.wait_timeout(poll_interval) {
311 tracing::debug!(
312 target: "ftui.process",
313 sub_id,
314 elapsed_ms = spawn_start.elapsed().as_millis() as u64,
315 reason = "cancellation",
316 "killing process"
317 );
318 let _ = child.kill();
319 let _ = child.wait();
320 break ProcessEvent::Killed;
321 }
322 };
323
324 if let Some(handle) = stdout_handle {
325 join_reader_thread_bounded(handle, "stdout", sub_id);
326 }
327 if let Some(handle) = stderr_handle {
328 join_reader_thread_bounded(handle, "stderr", sub_id);
329 }
330
331 let _ = sender.send((make_msg_ref.as_ref())(final_event));
332 }
333}
334
335fn process_exit_event(status: std::process::ExitStatus) -> ProcessEvent {
336 #[cfg(unix)]
337 {
338 use std::os::unix::process::ExitStatusExt;
339
340 if let Some(signal) = status.signal() {
341 return ProcessEvent::Signaled(signal);
342 }
343 }
344
345 ProcessEvent::Exited(status.code().unwrap_or(-1))
346}
347
348fn join_reader_thread_bounded(
349 handle: std::thread::JoinHandle<()>,
350 stream: &'static str,
351 sub_id: SubId,
352) {
353 let start = Instant::now();
354 while !handle.is_finished() {
355 if start.elapsed() >= PROCESS_READER_JOIN_TIMEOUT {
356 tracing::warn!(
357 target: "ftui.process",
358 sub_id,
359 stream,
360 timeout_ms = PROCESS_READER_JOIN_TIMEOUT.as_millis() as u64,
361 "process reader thread did not exit within timeout; detaching"
362 );
363 detach_reader_join(handle, stream);
364 return;
365 }
366 std::thread::sleep(PROCESS_READER_JOIN_POLL);
367 }
368 let _ = handle.join();
369}
370
/// Hands a still-running reader thread to a named helper thread that joins
/// it, so the caller can return without waiting.
///
/// This is best effort: if the helper thread cannot be spawned the error is
/// ignored and the handle is simply dropped.
fn detach_reader_join(handle: std::thread::JoinHandle<()>, stream: &'static str) {
    let _ = std::thread::Builder::new()
        .name(format!("ftui-process-{stream}-detached-join"))
        .spawn(move || {
            let _ = handle.join();
        });
}
378
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::mpsc as stdmpsc;
    use std::thread;

    /// Message type used by the tests: a thin wrapper around [`ProcessEvent`].
    #[derive(Debug, Clone, PartialEq)]
    enum TestMsg {
        Proc(ProcessEvent),
    }

    /// Longest a test waits for a short-lived child to finish on its own
    /// before forcing a stop. Only reached when something is wrong, so it can
    /// be generous without slowing down the normal path.
    const EXIT_WAIT_CAP: Duration = Duration::from_secs(5);

    /// Interval at which the runner thread is polled for completion.
    const EXIT_POLL: Duration = Duration::from_millis(5);

    /// Returns `true` for the events that end a subscription.
    fn is_terminal(msg: &TestMsg) -> bool {
        matches!(
            msg,
            TestMsg::Proc(
                ProcessEvent::Exited(_)
                    | ProcessEvent::Signaled(_)
                    | ProcessEvent::Killed
                    | ProcessEvent::Error(_)
            )
        )
    }

    /// Returns `true` if any message is a `Killed` event.
    fn has_killed(msgs: &[TestMsg]) -> bool {
        msgs.iter()
            .any(|m| matches!(m, TestMsg::Proc(ProcessEvent::Killed)))
    }

    /// Runs `sub` on a fresh thread and returns that thread's handle together
    /// with the receiving end of its message channel.
    fn spawn_run(
        sub: ProcessSubscription<TestMsg>,
        signal: StopSignal,
    ) -> (thread::JoinHandle<()>, stdmpsc::Receiver<TestMsg>) {
        let (tx, rx) = stdmpsc::channel();
        let handle = thread::spawn(move || {
            sub.run(tx, signal);
        });
        (handle, rx)
    }

    /// Runs a short-lived process until `run` returns by itself and collects
    /// every message. A stop is forced only if `EXIT_WAIT_CAP` is exceeded,
    /// so a hung child fails the test instead of hanging it.
    fn run_to_exit(sub: ProcessSubscription<TestMsg>) -> Vec<TestMsg> {
        let (signal, trigger) = StopSignal::new();
        let (handle, rx) = spawn_run(sub, signal);

        let start = Instant::now();
        while !handle.is_finished() && start.elapsed() < EXIT_WAIT_CAP {
            thread::sleep(EXIT_POLL);
        }

        trigger.stop();
        handle.join().unwrap();
        rx.try_iter().collect()
    }

    /// Runs a process, requests a stop after `delay`, and collects every
    /// message once `run` has returned.
    fn run_then_stop_after(sub: ProcessSubscription<TestMsg>, delay: Duration) -> Vec<TestMsg> {
        let (signal, trigger) = StopSignal::new();
        let (handle, rx) = spawn_run(sub, signal);

        thread::sleep(delay);
        trigger.stop();
        handle.join().unwrap();
        rx.try_iter().collect()
    }

    /// Runs a process without ever requesting a stop and collects every
    /// message once `run` has returned.
    fn run_unstopped(sub: ProcessSubscription<TestMsg>) -> Vec<TestMsg> {
        // Keep the trigger alive for the whole run.
        let (signal, _trigger) = StopSignal::new();
        let (handle, rx) = spawn_run(sub, signal);

        handle.join().unwrap();
        rx.try_iter().collect()
    }

    #[test]
    fn process_event_variants() {
        let stdout = ProcessEvent::Stdout("hello".into());
        let stderr = ProcessEvent::Stderr("warn".into());
        let exited = ProcessEvent::Exited(0);
        let signaled = ProcessEvent::Signaled(15);
        let killed = ProcessEvent::Killed;
        let error = ProcessEvent::Error("oops".into());

        assert_eq!(stdout, ProcessEvent::Stdout("hello".into()));
        assert_eq!(stderr, ProcessEvent::Stderr("warn".into()));
        assert_eq!(exited, ProcessEvent::Exited(0));
        assert_eq!(signaled, ProcessEvent::Signaled(15));
        assert_eq!(killed, ProcessEvent::Killed);
        assert_eq!(error, ProcessEvent::Error("oops".into()));
    }

    #[test]
    fn subscription_id_is_stable() {
        let s1: ProcessSubscription<TestMsg> =
            ProcessSubscription::new("echo", TestMsg::Proc).arg("hello");
        let s2: ProcessSubscription<TestMsg> =
            ProcessSubscription::new("echo", TestMsg::Proc).arg("hello");
        assert_eq!(s1.id(), s2.id());
    }

    #[test]
    fn different_args_produce_different_ids() {
        let s1: ProcessSubscription<TestMsg> =
            ProcessSubscription::new("echo", TestMsg::Proc).arg("hello");
        let s2: ProcessSubscription<TestMsg> =
            ProcessSubscription::new("echo", TestMsg::Proc).arg("world");
        assert_ne!(s1.id(), s2.id());
    }

    #[test]
    fn different_programs_produce_different_ids() {
        let s1: ProcessSubscription<TestMsg> = ProcessSubscription::new("echo", TestMsg::Proc);
        let s2: ProcessSubscription<TestMsg> = ProcessSubscription::new("cat", TestMsg::Proc);
        assert_ne!(s1.id(), s2.id());
    }

    #[test]
    fn custom_id_overrides_default() {
        let s: ProcessSubscription<TestMsg> =
            ProcessSubscription::new("echo", TestMsg::Proc).with_id(42);
        assert_eq!(s.id(), 42);
    }

    #[test]
    fn env_changes_affect_subscription_id() {
        let s1: ProcessSubscription<TestMsg> =
            ProcessSubscription::new("echo", TestMsg::Proc).env("FTUI_TEST_VAR", "a");
        let s2: ProcessSubscription<TestMsg> =
            ProcessSubscription::new("echo", TestMsg::Proc).env("FTUI_TEST_VAR", "b");
        assert_ne!(s1.id(), s2.id());
    }

    #[test]
    fn timeout_changes_affect_subscription_id() {
        let s1: ProcessSubscription<TestMsg> =
            ProcessSubscription::new("echo", TestMsg::Proc).timeout(Duration::from_millis(10));
        let s2: ProcessSubscription<TestMsg> =
            ProcessSubscription::new("echo", TestMsg::Proc).timeout(Duration::from_millis(20));
        assert_ne!(s1.id(), s2.id());
    }

    #[test]
    fn explicit_id_remains_stable_after_builder_changes() {
        let s: ProcessSubscription<TestMsg> = ProcessSubscription::new("echo", TestMsg::Proc)
            .with_id(42)
            .arg("hello")
            .env("FTUI_TEST_VAR", "value")
            .timeout(Duration::from_millis(10));
        assert_eq!(s.id(), 42);
    }

    #[test]
    fn echo_captures_stdout() {
        let sub = ProcessSubscription::new("echo", TestMsg::Proc).arg("hello world");
        let msgs = run_to_exit(sub);

        let has_stdout = msgs.iter().any(|m| match m {
            TestMsg::Proc(ProcessEvent::Stdout(s)) => s.contains("hello world"),
            _ => false,
        });
        assert!(
            has_stdout,
            "Expected stdout with 'hello world', got: {msgs:?}"
        );

        let has_exit = msgs
            .iter()
            .any(|m| matches!(m, TestMsg::Proc(ProcessEvent::Exited(0))));
        assert!(has_exit, "Expected Exited(0), got: {msgs:?}");
    }

    #[test]
    fn nonexistent_program_sends_error() {
        let sub =
            ProcessSubscription::new("/nonexistent/program/that/should/not/exist", TestMsg::Proc);
        let msgs = run_unstopped(sub);

        let has_error = msgs
            .iter()
            .any(|m| matches!(m, TestMsg::Proc(ProcessEvent::Error(_))));
        assert!(has_error, "Expected Error event, got: {msgs:?}");
    }

    #[test]
    fn stop_signal_kills_long_running_process() {
        let sub = ProcessSubscription::new("sleep", TestMsg::Proc).arg("60");
        let start = web_time::Instant::now();

        let msgs = run_then_stop_after(sub, Duration::from_millis(100));
        assert!(
            start.elapsed() < Duration::from_secs(2),
            "stop should kill a quiet process promptly"
        );

        assert!(has_killed(&msgs), "Expected Killed event, got: {msgs:?}");
    }

    #[test]
    fn timeout_kills_process() {
        let sub = ProcessSubscription::new("sleep", TestMsg::Proc)
            .arg("60")
            .timeout(Duration::from_millis(100));
        let start = web_time::Instant::now();

        let msgs = run_unstopped(sub);
        assert!(
            start.elapsed() < Duration::from_secs(2),
            "timeout should kill a quiet process promptly"
        );

        assert!(
            has_killed(&msgs),
            "Expected Killed on timeout, got: {msgs:?}"
        );
    }

    #[test]
    fn env_vars_are_passed() {
        let sub =
            ProcessSubscription::new("env", TestMsg::Proc).env("FTUI_TEST_VAR", "test_value_42");
        let msgs = run_to_exit(sub);

        let has_var = msgs.iter().any(|m| match m {
            TestMsg::Proc(ProcessEvent::Stdout(s)) => s.contains("FTUI_TEST_VAR=test_value_42"),
            _ => false,
        });
        assert!(has_var, "Expected env var in output, got: {msgs:?}");
    }

    #[test]
    fn multiple_args_via_args_method() {
        let sub = ProcessSubscription::new("echo", TestMsg::Proc).args(["hello", "world"]);
        let msgs = run_to_exit(sub);

        let has_output = msgs.iter().any(|m| match m {
            TestMsg::Proc(ProcessEvent::Stdout(s)) => s.contains("hello world"),
            _ => false,
        });
        assert!(has_output, "Expected combined output, got: {msgs:?}");
    }

    #[test]
    fn stderr_captured() {
        let sub = ProcessSubscription::new("sh", TestMsg::Proc)
            .arg("-c")
            .arg("echo error_msg >&2");
        let msgs = run_to_exit(sub);

        let has_stderr = msgs.iter().any(|m| match m {
            TestMsg::Proc(ProcessEvent::Stderr(s)) => s.contains("error_msg"),
            _ => false,
        });
        assert!(has_stderr, "Expected stderr output, got: {msgs:?}");
    }

    #[test]
    fn exit_code_captured() {
        let sub = ProcessSubscription::new("sh", TestMsg::Proc)
            .arg("-c")
            .arg("exit 42");
        let msgs = run_to_exit(sub);

        let has_exit = msgs
            .iter()
            .any(|m| matches!(m, TestMsg::Proc(ProcessEvent::Exited(42))));
        assert!(has_exit, "Expected Exited(42), got: {msgs:?}");
    }

    #[cfg(unix)]
    #[test]
    fn signal_exit_is_preserved() {
        let sub = ProcessSubscription::new("sh", TestMsg::Proc)
            .arg("-c")
            .arg("kill -TERM $$");
        let msgs = run_to_exit(sub);

        let has_signal = msgs
            .iter()
            .any(|m| matches!(m, TestMsg::Proc(ProcessEvent::Signaled(15))));
        assert!(has_signal, "Expected Signaled(15), got: {msgs:?}");
    }

    /// Contract: stopping goes through the signal's cancellation token and
    /// results in the child being killed.
    #[test]
    fn contract_uses_cancellation_token_for_stop() {
        let sub = ProcessSubscription::new("sleep", TestMsg::Proc).arg("60");
        let (signal, trigger) = StopSignal::new();

        // Grab the token before the signal moves into the runner thread.
        let token = signal.cancellation_token().clone();
        assert!(!token.is_cancelled());

        let (handle, rx) = spawn_run(sub, signal);

        thread::sleep(Duration::from_millis(100));

        trigger.stop();
        assert!(token.is_cancelled());

        handle.join().unwrap();

        let msgs: Vec<TestMsg> = rx.try_iter().collect();
        assert!(
            has_killed(&msgs),
            "process must be killed on cancellation, got: {msgs:?}"
        );
    }

    /// Contract: every run ends with exactly one terminal event, whether the
    /// child exits normally or cannot be spawned at all.
    #[test]
    fn contract_always_emits_terminal_event() {
        {
            let sub = ProcessSubscription::new("true", TestMsg::Proc);
            let msgs = run_to_exit(sub);

            let terminal_events: Vec<_> = msgs.iter().filter(|m| is_terminal(m)).collect();
            assert_eq!(
                terminal_events.len(),
                1,
                "must emit exactly one terminal event, got: {terminal_events:?}"
            );
        }

        {
            let sub = ProcessSubscription::new(
                "/nonexistent/program/that/should/not/exist",
                TestMsg::Proc,
            );
            let msgs = run_unstopped(sub);

            let terminal_events: Vec<_> = msgs.iter().filter(|m| is_terminal(m)).collect();
            assert_eq!(
                terminal_events.len(),
                1,
                "must emit exactly one terminal event on error, got: {terminal_events:?}"
            );
        }
    }

    /// Contract: all output of a normally exiting child is delivered before
    /// its terminal event.
    #[test]
    fn contract_output_precedes_terminal_event() {
        let sub = ProcessSubscription::new("sh", TestMsg::Proc)
            .arg("-c")
            .arg("echo FIRST && echo SECOND >&2 && exit 0");
        let msgs = run_to_exit(sub);

        // Require both kinds of event so the ordering check below cannot
        // pass vacuously.
        let term_pos = msgs
            .iter()
            .position(is_terminal)
            .unwrap_or_else(|| panic!("must emit a terminal event, got: {msgs:?}"));

        let output_positions: Vec<usize> = msgs
            .iter()
            .enumerate()
            .filter_map(|(i, m)| match m {
                TestMsg::Proc(ProcessEvent::Stdout(_) | ProcessEvent::Stderr(_)) => Some(i),
                _ => None,
            })
            .collect();
        assert!(
            !output_positions.is_empty(),
            "must emit output events, got: {msgs:?}"
        );

        for &out_pos in &output_positions {
            assert!(
                out_pos < term_pos,
                "output event at position {out_pos} must precede terminal event at {term_pos}"
            );
        }
    }

    /// Contract: the timeout is part of the derived subscription ID.
    #[test]
    fn contract_id_includes_timeout() {
        let s1: ProcessSubscription<TestMsg> =
            ProcessSubscription::new("echo", TestMsg::Proc).timeout(Duration::from_secs(5));
        let s2: ProcessSubscription<TestMsg> =
            ProcessSubscription::new("echo", TestMsg::Proc).timeout(Duration::from_secs(10));
        let s3: ProcessSubscription<TestMsg> = ProcessSubscription::new("echo", TestMsg::Proc);

        assert_ne!(
            s1.id(),
            s2.id(),
            "different timeouts must produce different IDs"
        );
        assert_ne!(
            s1.id(),
            s3.id(),
            "timeout vs no-timeout must produce different IDs"
        );
    }

    /// Contract: once a stop is requested, `run` returns within 500 ms.
    #[test]
    fn contract_kill_is_prompt() {
        let sub = ProcessSubscription::new("sleep", TestMsg::Proc).arg("60");
        let (signal, trigger) = StopSignal::new();
        let (handle, rx) = spawn_run(sub, signal);

        thread::sleep(Duration::from_millis(100));

        // Measure only the stop-to-return latency, not the warm-up sleep.
        let kill_start = web_time::Instant::now();
        trigger.stop();
        handle.join().unwrap();
        let kill_elapsed = kill_start.elapsed();

        assert!(
            kill_elapsed < Duration::from_millis(500),
            "kill must complete within 500ms of stop signal, took {kill_elapsed:?}"
        );

        let msgs: Vec<TestMsg> = rx.try_iter().collect();
        assert!(has_killed(&msgs), "must emit Killed event");
    }

    #[test]
    fn stop_signal_does_not_block_when_background_descendant_keeps_pipes_open() {
        // The backgrounded `sleep` inherits stdout/stderr and outlives `sh`,
        // so the reader threads never see end of stream.
        let sub = ProcessSubscription::new("sh", TestMsg::Proc)
            .arg("-c")
            .arg("sleep 60 & sleep 60");
        let start = web_time::Instant::now();

        let msgs = run_then_stop_after(sub, Duration::from_millis(100));

        assert!(
            start.elapsed() < Duration::from_secs(2),
            "stop should not block behind inherited stdout/stderr pipes"
        );

        assert!(has_killed(&msgs), "expected Killed event, got: {msgs:?}");
    }

    #[test]
    fn timeout_does_not_block_when_background_descendant_keeps_pipes_open() {
        let sub = ProcessSubscription::new("sh", TestMsg::Proc)
            .arg("-c")
            .arg("sleep 60 & sleep 60")
            .timeout(Duration::from_millis(100));
        let start = web_time::Instant::now();

        let msgs = run_unstopped(sub);

        assert!(
            start.elapsed() < Duration::from_secs(2),
            "timeout should not block behind inherited stdout/stderr pipes"
        );

        assert!(has_killed(&msgs), "expected Killed event, got: {msgs:?}");
    }
}