1use std::collections::HashMap;
8use std::path::{Path, PathBuf};
9use std::time::Duration;
10
11use tokio::io::{AsyncBufReadExt, BufReader};
12use tokio::process::Command;
13use tokio::sync::mpsc;
14use tokio_util::sync::CancellationToken;
15
16use crate::action::ActionMessage;
17use crate::action::ActionState;
18use crate::action_filter::{ActionFilter, ActionMessageKind, ActionMessageValue};
19use crate::error::SessionError;
20use crate::logging::LogContent;
21use crate::runner::CancelMethod;
22use crate::session_log;
23use crate::session_user::SessionUser;
24use std::sync::Arc;
25
26const STDOUT_GRACE_TIME: Duration = Duration::from_secs(5);
33
34const STDOUT_GRACE_TIME_POST_TERMINATE: Duration = Duration::from_secs(2);
53
54const STDOUT_DRAIN_AFTER_KILL: Duration = Duration::from_secs(1);
56
57pub(crate) const LOG_LINE_MAX_LENGTH: usize = 64 * 1024;
59
60pub(crate) fn truncate_line(line: &str) -> &str {
62 if line.len() > LOG_LINE_MAX_LENGTH {
63 &line[..line.floor_char_boundary(LOG_LINE_MAX_LENGTH)]
64 } else {
65 line
66 }
67}
68
69#[derive(Debug)]
71pub struct SubprocessResult {
72 pub state: ActionState,
73 pub exit_code: Option<i32>,
74 pub stdout: String,
75}
76
77pub struct SubprocessConfig {
79 pub args: Vec<String>,
80 pub env_vars: HashMap<String, Option<String>>,
81 pub working_dir: Option<PathBuf>,
82 pub timeout: Option<Duration>,
83 pub user: Option<Arc<dyn SessionUser>>,
84 pub cancel_method: CancelMethod,
85 pub cancel_request_rx: Option<tokio::sync::watch::Receiver<Option<Duration>>>,
86 pub debug_collect_stdout: bool,
92}
93
94#[cfg(unix)]
99mod platform {
100 use super::*;
101
102 pub fn notify_process_group(pgid: i32) -> Result<(), std::io::Error> {
104 nix::sys::signal::killpg(
105 nix::unistd::Pid::from_raw(pgid),
106 nix::sys::signal::Signal::SIGTERM,
107 )
108 .map_err(std::io::Error::other)
109 }
110
111 pub fn terminate_process_group(pgid: i32) -> Result<(), std::io::Error> {
113 nix::sys::signal::killpg(
114 nix::unistd::Pid::from_raw(pgid),
115 nix::sys::signal::Signal::SIGKILL,
116 )
117 .map_err(std::io::Error::other)
118 }
119
120 pub fn send_terminate(pid: i32) {
122 let _ = terminate_process_group(pid);
123 }
124
125 pub fn send_notify(pid: i32) {
127 let _ = notify_process_group(pid);
128 }
129
130 pub fn spawn_delayed_terminate(pid: i32, delay: Duration) {
132 tokio::spawn(async move {
133 tokio::time::sleep(delay).await;
134 let _ = terminate_process_group(pid);
135 });
136 }
137
138 pub unsafe fn configure_command(
146 cmd: &mut Command,
147 use_setsid: bool,
148 ) -> Option<Box<dyn tokio::io::AsyncRead + Unpin + Send>> {
149 cmd.pre_exec(move || {
150 if nix::libc::dup2(1, 2) == -1 {
152 return Err(std::io::Error::last_os_error());
153 }
154 if use_setsid {
155 nix::libc::setsid();
156 }
157 Ok(())
158 });
159 None
160 }
161}
162
163#[cfg(windows)]
164mod platform {
165 use super::*;
166
167 use windows::Win32::Foundation::{CloseHandle, STILL_ACTIVE};
168 use windows::Win32::System::Threading::{
169 GetExitCodeProcess, OpenProcess, TerminateProcess, CREATE_NEW_PROCESS_GROUP,
170 PROCESS_QUERY_INFORMATION, PROCESS_TERMINATE,
171 };
172
173 fn send_ctrl_break(pid: u32) -> bool {
182 use windows::Win32::System::Console::{
183 AttachConsole, FreeConsole, GenerateConsoleCtrlEvent, CTRL_BREAK_EVENT,
184 };
185
186 if crate::win32::is_session_zero() {
189 log::info!(target: "openjd.sessions", "Running in Session 0, skipping CTRL_BREAK (will fall back to terminate)");
190 return false;
191 }
192
193 unsafe {
194 let _ = FreeConsole();
196 if AttachConsole(pid).is_err() {
198 let _ = AttachConsole(u32::MAX); return false;
201 }
202 let ok = GenerateConsoleCtrlEvent(CTRL_BREAK_EVENT, pid).is_ok();
203 let _ = FreeConsole();
205 let _ = AttachConsole(u32::MAX);
206 ok
207 }
208 }
209
210 fn kill_process(pid: u32) -> bool {
212 unsafe {
213 let handle = OpenProcess(PROCESS_TERMINATE, false, pid);
214 if let Ok(h) = handle {
215 let ok = TerminateProcess(h, 1).is_ok();
216 let _ = CloseHandle(h);
217 ok
218 } else {
219 false
220 }
221 }
222 }
223
224 #[allow(dead_code)]
226 fn is_process_alive(pid: u32) -> bool {
227 unsafe {
228 let handle = OpenProcess(PROCESS_QUERY_INFORMATION, false, pid);
229 if let Ok(h) = handle {
230 let mut code = 0u32;
231 let _ = GetExitCodeProcess(h, &mut code);
232 let _ = CloseHandle(h);
233 code == STILL_ACTIVE.0 as u32
234 } else {
235 false
236 }
237 }
238 }
239
240 fn get_child_pids(parent_pid: u32) -> Vec<u32> {
242 use windows::Win32::System::Diagnostics::ToolHelp::{
243 CreateToolhelp32Snapshot, Process32FirstW, Process32NextW, PROCESSENTRY32W,
244 TH32CS_SNAPPROCESS,
245 };
246 let mut children = Vec::new();
247 unsafe {
248 let snap = CreateToolhelp32Snapshot(TH32CS_SNAPPROCESS, 0);
249 if let Ok(snap) = snap {
250 let mut entry = PROCESSENTRY32W {
251 dwSize: std::mem::size_of::<PROCESSENTRY32W>() as u32,
252 ..Default::default()
253 };
254 if Process32FirstW(snap, &mut entry).is_ok() {
255 loop {
256 if entry.th32ParentProcessID == parent_pid {
257 children.push(entry.th32ProcessID);
258 }
259 if Process32NextW(snap, &mut entry).is_err() {
260 break;
261 }
262 }
263 }
264 let _ = CloseHandle(snap);
265 }
266 }
267 children
268 }
269
270 fn kill_process_tree(root_pid: u32) {
273 let mut to_kill = Vec::new();
274 collect_tree(root_pid, &mut to_kill);
275 for &pid in to_kill.iter().rev() {
277 kill_process(pid);
278 }
279 }
280
281 fn collect_tree(pid: u32, result: &mut Vec<u32>) {
282 result.push(pid);
283 for child in get_child_pids(pid) {
284 collect_tree(child, result);
285 }
286 }
287
288 pub fn send_terminate(pid: i32) {
290 kill_process_tree(pid as u32);
291 }
292
293 pub fn send_notify(pid: i32) {
295 if !send_ctrl_break(pid as u32) {
296 log::warn!(target: "openjd.sessions", "Failed to send CTRL_BREAK to pid {pid}, falling back to terminate");
297 send_terminate(pid);
298 }
299 }
300
301 pub fn spawn_delayed_terminate(pid: i32, delay: Duration) {
303 tokio::spawn(async move {
304 tokio::time::sleep(delay).await;
305 kill_process_tree(pid as u32);
306 });
307 }
308
309 pub unsafe fn configure_command(
317 cmd: &mut Command,
318 _use_setsid: bool,
319 ) -> Option<Box<dyn tokio::io::AsyncRead + Unpin + Send>> {
320 use std::os::windows::io::{FromRawHandle, OwnedHandle};
321 use windows::Win32::Foundation::HANDLE;
322 use windows::Win32::Security::SECURITY_ATTRIBUTES;
323 use windows::Win32::System::Pipes::CreatePipe;
324
325 cmd.creation_flags(CREATE_NEW_PROCESS_GROUP.0);
327
328 let mut read_handle = HANDLE::default();
330 let mut write_handle = HANDLE::default();
331 let sa = SECURITY_ATTRIBUTES {
332 nLength: std::mem::size_of::<SECURITY_ATTRIBUTES>() as u32,
333 bInheritHandle: true.into(),
334 lpSecurityDescriptor: std::ptr::null_mut(),
335 };
336 if CreatePipe(&mut read_handle, &mut write_handle, Some(&sa), 0).is_err() {
337 cmd.stdout(std::process::Stdio::piped());
339 cmd.stderr(std::process::Stdio::piped());
340 return None;
341 }
342
343 let write_owned = OwnedHandle::from_raw_handle(write_handle.0);
346 let write_stdio_stdout = std::process::Stdio::from(write_owned);
347
348 use windows::Win32::Foundation::DuplicateHandle;
350 use windows::Win32::System::Threading::GetCurrentProcess;
351 let mut write_handle_dup = HANDLE::default();
352 let current_process = GetCurrentProcess();
353 if DuplicateHandle(
354 current_process,
355 write_handle,
356 current_process,
357 &mut write_handle_dup,
358 0,
359 true, windows::Win32::Foundation::DUPLICATE_SAME_ACCESS,
361 )
362 .is_err()
363 {
364 cmd.stdout(write_stdio_stdout);
366 cmd.stderr(std::process::Stdio::piped());
367 let read_owned = OwnedHandle::from_raw_handle(read_handle.0);
368 let read_std: std::fs::File = std::fs::File::from(read_owned);
369 let read_tokio = tokio::fs::File::from_std(read_std);
370 return Some(Box::new(read_tokio));
371 }
372 let write_owned_dup = OwnedHandle::from_raw_handle(write_handle_dup.0);
373 let write_stdio_stderr = std::process::Stdio::from(write_owned_dup);
374
375 cmd.stdout(write_stdio_stdout);
376 cmd.stderr(write_stdio_stderr);
377
378 let read_owned = OwnedHandle::from_raw_handle(read_handle.0);
380 let read_std: std::fs::File = std::fs::File::from(read_owned);
381 let read_tokio = tokio::fs::File::from_std(read_std);
382 Some(Box::new(read_tokio))
383 }
384}
385
386use platform::*;
387
388fn write_cancel_info(working_dir: &Path, terminate_delay: Duration) {
391 let notify_end = std::time::SystemTime::now() + terminate_delay;
392 let secs = notify_end
393 .duration_since(std::time::UNIX_EPOCH)
394 .unwrap_or_default()
395 .as_secs();
396 let s = secs % 60;
398 let m = (secs / 60) % 60;
399 let h = (secs / 3600) % 24;
400 let total_days = secs / 86400;
401 let (y, mo, d) = days_to_ymd(total_days);
403 let timestamp = format!("{y:04}-{mo:02}-{d:02}T{h:02}:{m:02}:{s:02}Z");
404 let info = serde_json::json!({ "NotifyEnd": timestamp });
405 let path = working_dir.join("cancel_info.json");
406 let _ = std::fs::write(&path, serde_json::to_string(&info).unwrap_or_default());
407}
408
409fn days_to_ymd(total_days: u64) -> (u64, u64, u64) {
411 let z = total_days + 719468;
413 let era = z / 146097;
414 let doe = z - era * 146097;
415 let yoe = (doe - doe / 1460 + doe / 36524 - doe / 146096) / 365;
416 let y = yoe + era * 400;
417 let doy = doe - (365 * yoe + yoe / 4 - yoe / 100);
418 let mp = (5 * doy + 2) / 153;
419 let d = doy - (153 * mp + 2) / 5 + 1;
420 let m = if mp < 10 { mp + 3 } else { mp - 9 };
421 let y = if m <= 2 { y + 1 } else { y };
422 (y, m, d)
423}
424
425pub(crate) fn format_command_for_log(args: &[String]) -> String {
428 let joined =
429 shlex::try_join(args.iter().map(|s| s.as_str())).unwrap_or_else(|_| args.join(" "));
430 crate::action_filter::redact_openjd_redacted_env_requests(&joined)
431}
432
433pub async fn run_subprocess(
443 config: SubprocessConfig,
444 filter: &mut ActionFilter,
445 session_id: &str,
446 message_tx: mpsc::UnboundedSender<ActionMessage>,
447 cancel_token: CancellationToken,
448) -> Result<SubprocessResult, SessionError> {
449 let args = &config.args;
450 if args.is_empty() {
451 return Err(SessionError::Runtime("No command specified".into()));
452 }
453
454 if config.user.as_deref().is_some_and(|u| !u.is_process_user()) {
456 return Err(SessionError::Runtime(
457 "Cross-user subprocess execution requires the helper binary. \
458 Use run_via_helper instead of run_subprocess for cross-user actions."
459 .into(),
460 ));
461 }
462
463 let mut merged: HashMap<String, String> = std::env::vars().collect();
465 for (k, v) in &config.env_vars {
466 match v {
467 Some(val) => {
468 merged.insert(k.clone(), val.clone());
469 }
470 None => {
471 merged.remove(k);
472 }
473 }
474 }
475
476 session_log!(
478 info,
479 session_id,
480 LogContent::FILE_PATH | LogContent::PROCESS_CONTROL,
481 "Running command {}",
482 format_command_for_log(args)
483 );
484
485 #[cfg(windows)]
488 let win32_process_handle: Option<windows::Win32::Foundation::HANDLE> = None;
489
490 #[allow(unused_mut)]
491 let (mut child, pid, stdout_for_reading): (
492 Option<tokio::process::Child>,
493 i32,
494 Option<Box<dyn tokio::io::AsyncRead + Unpin + Send>>,
495 ) = {
496 let mut cmd = Command::new(&args[0]);
497 cmd.args(&args[1..]);
498 cmd.env_clear();
499 for (k, v) in &merged {
500 cmd.env(k, v);
501 }
502 if let Some(dir) = &config.working_dir {
503 cmd.current_dir(dir);
504 }
505 let merged_reader = unsafe { configure_command(&mut cmd, true) };
506 if merged_reader.is_none() {
507 cmd.stdout(std::process::Stdio::piped());
508 }
509 let mut c = cmd.spawn().map_err(|e| {
510 session_log!(
511 info,
512 session_id,
513 LogContent::EXCEPTION_INFO | LogContent::PROCESS_CONTROL,
514 "Process failed to start: '{}': {}",
515 args[0],
516 e
517 );
518 SessionError::SubprocessStart {
519 command: args[0].clone(),
520 source: e,
521 }
522 })?;
523 let p = c.id().unwrap_or(0) as i32;
524 let stdout = merged_reader.or_else(|| {
525 c.stdout
526 .take()
527 .map(|s| Box::new(s) as Box<dyn tokio::io::AsyncRead + Unpin + Send>)
528 });
529 (Some(c), p, stdout)
530 };
531
532 session_log!(
533 info,
534 session_id,
535 LogContent::PROCESS_CONTROL,
536 "Command started as pid: {}",
537 pid
538 );
539 session_log!(
540 info,
541 session_id,
542 LogContent::BANNER | LogContent::COMMAND_OUTPUT,
543 "Output:"
544 );
545
546 let mut cancel_requested = false;
548 let mut timed_out = false;
549 let mut terminate_sent = false;
561 let mut stdout_collected = String::new();
562 let mut saw_fail = false;
563
564 if let Some(stdout) = stdout_for_reading {
565 let mut reader = BufReader::new(stdout);
566 let mut line_buf = Vec::new();
567
568 let timeout_fut = async {
570 match config.timeout {
571 Some(d) => tokio::time::sleep(d).await,
572 None => std::future::pending().await,
573 }
574 };
575 tokio::pin!(timeout_fut);
576
577 let drain_deadline = tokio::time::sleep(Duration::MAX);
582 tokio::pin!(drain_deadline);
583
584 loop {
585 tokio::select! {
586 biased;
587
588 _ = &mut drain_deadline, if cancel_requested => {
589 session_log!(info, session_id, LogContent::PROCESS_CONTROL,
590 "Stdout drain grace period expired, stopping read loop");
591 break;
592 }
593
594 _ = cancel_token.cancelled(), if !cancel_requested => {
595 cancel_requested = true;
596 drain_deadline.as_mut().reset(tokio::time::Instant::now() + STDOUT_DRAIN_AFTER_KILL);
597 let time_limit = config.cancel_request_rx.as_ref()
598 .and_then(|rx| *rx.borrow());
599
600 match (&config.cancel_method, time_limit) {
601 (_, Some(limit)) if limit.is_zero() => {
602 session_log!(info, session_id, LogContent::PROCESS_CONTROL, "Urgent cancel (time_limit=0), sending SIGKILL to process group {}", pid);
603 send_terminate(pid);
604 terminate_sent = true;
605 }
606 (CancelMethod::Terminate, _) => {
607 session_log!(info, session_id, LogContent::PROCESS_CONTROL, "Sending SIGKILL to process group {}", pid);
608 send_terminate(pid);
609 terminate_sent = true;
610 }
611 (CancelMethod::NotifyThenTerminate { terminate_delay }, _) => {
612 let delay = match time_limit {
613 Some(limit) => limit.min(*terminate_delay),
614 None => *terminate_delay,
615 };
616 if let Some(dir) = &config.working_dir {
617 write_cancel_info(dir, delay);
618 }
619 session_log!(info, session_id, LogContent::PROCESS_CONTROL, "Sending SIGTERM to process group {} (grace period: {:?})", pid, delay);
620 send_notify(pid);
621 spawn_delayed_terminate(pid, delay);
622 }
630 }
631 }
632
633 _ = &mut timeout_fut, if !cancel_requested && !timed_out => {
634 timed_out = true;
635 cancel_requested = true;
636 drain_deadline.as_mut().reset(tokio::time::Instant::now() + STDOUT_DRAIN_AFTER_KILL);
637 session_log!(info, session_id, LogContent::PROCESS_CONTROL, "Action timed out, sending SIGKILL to process group");
638 send_terminate(pid);
639 terminate_sent = true;
640 }
641
642 n = reader.read_until(b'\n', &mut line_buf) => {
643 match n {
644 Ok(0) => break, Ok(_) => {
646 if line_buf.last() == Some(&b'\n') {
648 line_buf.pop();
649 }
650 if line_buf.last() == Some(&b'\r') {
651 line_buf.pop();
652 }
653 let line = String::from_utf8_lossy(&line_buf);
654 let line = truncate_line(&line).to_string();
655 line_buf.clear();
656 let (display, pass_through) = process_line(&line, filter, session_id, &message_tx, &mut saw_fail);
657 if pass_through && filter.min_log_level() <= 20 {
658 session_log!(info, session_id, LogContent::COMMAND_OUTPUT, "{}", display);
659 }
660 if config.debug_collect_stdout {
661 stdout_collected.push_str(&display);
662 stdout_collected.push('\n');
663 }
664 }
665 Err(_) => break,
666 }
667 }
668 }
669 }
670 }
671
672 let exit_status = if let Some(ref mut c) = child {
674 let grace = if terminate_sent {
680 STDOUT_GRACE_TIME_POST_TERMINATE
681 } else {
682 STDOUT_GRACE_TIME
683 };
684 match tokio::time::timeout(grace, c.wait()).await {
685 Ok(Ok(s)) => Some(s),
686 Ok(Err(_)) => {
687 send_terminate(pid);
688 None
689 }
690 Err(_) => {
691 send_terminate(pid);
692 c.wait().await.ok()
693 }
694 }
695 } else {
696 #[cfg(windows)]
698 {
699 win32_process_handle.map(|h| {
700 use std::os::windows::process::ExitStatusExt;
701 use windows::Win32::System::Threading::{GetExitCodeProcess, WaitForSingleObject};
702 unsafe {
703 let _ = WaitForSingleObject(h, 60000);
704 let mut code = 0u32;
705 let _ = GetExitCodeProcess(h, &mut code);
706 let _ = windows::Win32::Foundation::CloseHandle(h);
707 std::process::ExitStatus::from_raw(code)
708 }
709 })
710 }
711 #[cfg(not(windows))]
712 {
713 None
714 }
715 };
716
717 let exit_code = exit_status.and_then(|s| s.code());
718 session_log!(
719 info,
720 session_id,
721 LogContent::PROCESS_CONTROL,
722 "Process exit code: {}",
723 exit_code.map_or("N/A".to_string(), |c| c.to_string())
724 );
725
726 let state = if timed_out {
727 ActionState::Timeout
728 } else if cancel_requested || cancel_token.is_cancelled() {
729 ActionState::Canceled
730 } else if saw_fail {
731 ActionState::Failed
732 } else if exit_status.is_some_and(|s| s.success()) {
733 ActionState::Success
734 } else {
735 ActionState::Failed
736 };
737
738 Ok(SubprocessResult {
739 state,
740 exit_code,
741 stdout: stdout_collected,
742 })
743}
744
745pub(crate) fn process_line(
746 line: &str,
747 filter: &mut ActionFilter,
748 session_id: &str,
749 message_tx: &mpsc::UnboundedSender<ActionMessage>,
750 saw_fail: &mut bool,
751) -> (String, bool) {
752 let (callbacks, pass_through, display) = filter.filter_message(line, session_id);
753 for cb in callbacks {
754 let cancel = cb.cancel;
755 let msg = match cb.kind {
756 ActionMessageKind::Progress => {
757 if let ActionMessageValue::Float(v) = cb.value {
758 Some(ActionMessage::Progress(v))
759 } else {
760 None
761 }
762 }
763 ActionMessageKind::Status => {
764 if let ActionMessageValue::String(s) = cb.value {
765 Some(ActionMessage::Status(s))
766 } else {
767 None
768 }
769 }
770 ActionMessageKind::Fail => {
771 if let ActionMessageValue::String(s) = cb.value {
772 *saw_fail = true;
773 Some(ActionMessage::Fail(s))
774 } else {
775 None
776 }
777 }
778 ActionMessageKind::Env => {
779 if let ActionMessageValue::EnvVar { name, value } = cb.value {
780 Some(ActionMessage::SetEnv { name, value })
781 } else {
782 None
783 }
784 }
785 ActionMessageKind::UnsetEnv => {
786 if let ActionMessageValue::String(name) = cb.value {
787 Some(ActionMessage::UnsetEnv { name })
788 } else {
789 None
790 }
791 }
792 ActionMessageKind::RedactedEnv => {
793 if let ActionMessageValue::EnvVar { name, value } = cb.value {
794 Some(ActionMessage::RedactedEnv { name, value })
795 } else {
796 None
797 }
798 }
799 _ => None,
800 };
801 if let Some(msg) = msg {
802 let _ = message_tx.send(msg);
803 }
804 if cancel {
805 let fail_msg = "Action canceled due to malformed command".to_string();
806 let _ = message_tx.send(ActionMessage::CancelMarkFailed {
807 fail_message: fail_msg,
808 });
809 }
810 }
811 (display, pass_through)
812}
813
814#[cfg(test)]
815mod tests {
816 #[allow(unused_imports)]
817 use super::*;
818
819 #[cfg(unix)]
820 #[tokio::test]
821 async fn test_cancel_ntt_with_zero_time_limit_is_immediate() {
822 use tokio_util::sync::CancellationToken;
823
824 let token = CancellationToken::new();
825 let (_cancel_tx, cancel_rx) = tokio::sync::watch::channel(None);
826 let (msg_tx, _msg_rx) = tokio::sync::mpsc::unbounded_channel();
827
828 let config = SubprocessConfig {
829 args: vec!["sleep".into(), "30".into()],
830 env_vars: HashMap::new(),
831 working_dir: None,
832 timeout: None,
833 user: None,
834 cancel_method: CancelMethod::NotifyThenTerminate {
835 terminate_delay: Duration::from_secs(60),
836 },
837 cancel_request_rx: Some(cancel_rx),
838 debug_collect_stdout: false,
839 };
840
841 let t = token.clone();
842 tokio::spawn(async move {
843 tokio::time::sleep(Duration::from_millis(200)).await;
844 let _ = _cancel_tx.send(Some(Duration::ZERO));
845 t.cancel();
846 });
847
848 let mut filter = crate::action_filter::ActionFilter::new("test", true, false);
849 let start = std::time::Instant::now();
850 let result = run_subprocess(config, &mut filter, "test", msg_tx, token)
851 .await
852 .unwrap();
853 let elapsed = start.elapsed();
854
855 assert_eq!(result.state, ActionState::Canceled);
856 assert!(
857 elapsed < Duration::from_secs(5),
858 "took {:?}, expected < 5s",
859 elapsed
860 );
861 }
862
863 #[cfg(unix)]
864 #[tokio::test]
865 async fn test_cancel_ntt_without_time_limit_uses_default() {
866 use tokio_util::sync::CancellationToken;
867
868 let token = CancellationToken::new();
869 let (_cancel_tx, cancel_rx) = tokio::sync::watch::channel(None);
870 let (msg_tx, _msg_rx) = tokio::sync::mpsc::unbounded_channel();
871
872 let ready_dir = tempfile::tempdir().unwrap();
883 let ready_path = ready_dir.path().join("ready");
884 let py_script = format!(
885 "import signal, time, pathlib; signal.signal(signal.SIGTERM, signal.SIG_IGN); pathlib.Path('{}').write_text('ok'); time.sleep(30)",
886 ready_path.display()
887 );
888 let config = SubprocessConfig {
889 args: vec!["python3".into(), "-c".into(), py_script],
890 env_vars: HashMap::new(),
891 working_dir: None,
892 timeout: None,
893 user: None,
894 cancel_method: CancelMethod::NotifyThenTerminate {
895 terminate_delay: Duration::from_secs(1),
896 },
897 cancel_request_rx: Some(cancel_rx),
898 debug_collect_stdout: false,
899 };
900
901 let t = token.clone();
902 let ready_path_clone = ready_path.clone();
903 tokio::spawn(async move {
904 for _ in 0..100 {
906 if ready_path_clone.exists() {
907 break;
908 }
909 tokio::time::sleep(Duration::from_millis(50)).await;
910 }
911 t.cancel();
912 });
913
914 let mut filter = crate::action_filter::ActionFilter::new("test", true, false);
915 let start = std::time::Instant::now();
916 let result = run_subprocess(config, &mut filter, "test", msg_tx, token)
917 .await
918 .unwrap();
919 let elapsed = start.elapsed();
920
921 assert_eq!(result.state, ActionState::Canceled);
922 assert!(
924 elapsed >= Duration::from_millis(800),
925 "took {:?}, expected >= 800ms",
926 elapsed
927 );
928 assert!(
929 elapsed < Duration::from_secs(10),
930 "took {:?}, expected < 10s",
931 elapsed
932 );
933 }
934
935 #[cfg(unix)]
936 #[tokio::test]
937 async fn test_cancel_terminate_ignores_time_limit() {
938 use tokio_util::sync::CancellationToken;
939
940 let token = CancellationToken::new();
941 let (_cancel_tx, cancel_rx) = tokio::sync::watch::channel(None);
942 let (msg_tx, _msg_rx) = tokio::sync::mpsc::unbounded_channel();
943
944 let config = SubprocessConfig {
945 args: vec!["sleep".into(), "30".into()],
946 env_vars: HashMap::new(),
947 working_dir: None,
948 timeout: None,
949 user: None,
950 cancel_method: CancelMethod::Terminate,
951 cancel_request_rx: Some(cancel_rx),
952 debug_collect_stdout: false,
953 };
954
955 let t = token.clone();
956 tokio::spawn(async move {
957 tokio::time::sleep(Duration::from_millis(200)).await;
958 let _ = _cancel_tx.send(Some(Duration::from_secs(10)));
959 t.cancel();
960 });
961
962 let mut filter = crate::action_filter::ActionFilter::new("test", true, false);
963 let start = std::time::Instant::now();
964 let result = run_subprocess(config, &mut filter, "test", msg_tx, token)
965 .await
966 .unwrap();
967 let elapsed = start.elapsed();
968
969 assert_eq!(result.state, ActionState::Canceled);
970 assert!(
971 elapsed < Duration::from_secs(2),
972 "took {:?}, expected < 2s",
973 elapsed
974 );
975 }
976
977 #[cfg(windows)]
978 #[tokio::test]
979 async fn test_cancel_terminate_on_windows() {
980 use tokio_util::sync::CancellationToken;
981
982 let token = CancellationToken::new();
983 let (_cancel_tx, cancel_rx) = tokio::sync::watch::channel(None);
984 let (msg_tx, _msg_rx) = tokio::sync::mpsc::unbounded_channel();
985
986 let config = SubprocessConfig {
988 args: vec![
989 "powershell".into(),
990 "-Command".into(),
991 "Start-Sleep 30".into(),
992 ],
993 env_vars: HashMap::new(),
994 working_dir: None,
995 timeout: None,
996 user: None,
997 cancel_method: CancelMethod::Terminate,
998 cancel_request_rx: Some(cancel_rx),
999 debug_collect_stdout: false,
1000 };
1001
1002 let t = token.clone();
1003 tokio::spawn(async move {
1004 tokio::time::sleep(Duration::from_millis(500)).await;
1005 t.cancel();
1006 });
1007
1008 let mut filter = crate::action_filter::ActionFilter::new("test", true, false);
1009 let start = std::time::Instant::now();
1010 let result = run_subprocess(config, &mut filter, "test", msg_tx, token)
1011 .await
1012 .unwrap();
1013 let elapsed = start.elapsed();
1014
1015 assert_eq!(result.state, ActionState::Canceled);
1016 assert!(
1017 elapsed < Duration::from_secs(5),
1018 "Cancel took {:?}, expected < 5s — process was not killed promptly",
1019 elapsed
1020 );
1021 }
1022
1023 #[test]
1024 fn test_format_command_for_log_simple() {
1025 let args = vec!["echo".to_string(), "hello".to_string(), "world".to_string()];
1026 let result = format_command_for_log(&args);
1027 assert_eq!(result, "echo hello world");
1028 }
1029
1030 #[test]
1031 fn test_format_command_for_log_with_spaces() {
1032 let args = vec!["echo".to_string(), "hello world".to_string()];
1033 let result = format_command_for_log(&args);
1034 assert!(result.contains("hello world"), "got: {result}");
1036 }
1037
1038 #[test]
1039 fn test_format_command_for_log_redacts_secret() {
1040 let args = vec![
1041 "python".to_string(),
1042 "-c".to_string(),
1043 "print('openjd_redacted_env: PASSWORD=secret123')".to_string(),
1044 ];
1045 let result = format_command_for_log(&args);
1046 assert!(!result.contains("secret123"), "secret leaked in: {result}");
1047 assert!(
1048 result.contains("openjd_redacted_env:"),
1049 "token missing in: {result}"
1050 );
1051 assert!(
1052 result.contains("********"),
1053 "redaction missing in: {result}"
1054 );
1055 }
1056
1057 #[test]
1058 fn test_format_command_for_log_no_redaction_needed() {
1059 let args = vec![
1060 "python".to_string(),
1061 "-c".to_string(),
1062 "print('hello')".to_string(),
1063 ];
1064 let result = format_command_for_log(&args);
1065 assert!(
1066 result.contains("print('hello')") || result.contains("print"),
1067 "got: {result}"
1068 );
1069 assert!(!result.contains("********"));
1070 }
1071
1072 #[test]
1075 fn test_days_to_ymd_epoch() {
1076 assert_eq!(days_to_ymd(0), (1970, 1, 1));
1077 }
1078
1079 #[test]
1080 fn test_days_to_ymd_known_date() {
1081 assert_eq!(days_to_ymd(19782), (2024, 2, 29));
1083 }
1084
1085 #[test]
1086 fn test_days_to_ymd_end_of_year() {
1087 assert_eq!(days_to_ymd(19722), (2023, 12, 31));
1089 }
1090
1091 #[test]
1092 fn test_days_to_ymd_y2k() {
1093 assert_eq!(days_to_ymd(10957), (2000, 1, 1));
1095 }
1096
1097 #[test]
1098 fn test_write_cancel_info_creates_file() {
1099 let dir = tempfile::tempdir().unwrap();
1100 write_cancel_info(dir.path(), Duration::from_secs(30));
1101 let path = dir.path().join("cancel_info.json");
1102 assert!(path.exists());
1103 let content: serde_json::Value =
1104 serde_json::from_str(&std::fs::read_to_string(&path).unwrap()).unwrap();
1105 let ts = content["NotifyEnd"].as_str().unwrap();
1106 assert!(ts.ends_with('Z'), "Expected UTC timestamp, got: {ts}");
1107 assert!(ts.contains('T'), "Expected ISO 8601, got: {ts}");
1108 }
1109
1110 #[test]
1111 fn test_process_line_plain_text() {
1112 let (tx, _rx) = mpsc::unbounded_channel();
1113 let mut filter = ActionFilter::new("test", true, false);
1114 let mut saw_fail = false;
1115 let (display, pass_through) =
1116 process_line("hello world", &mut filter, "test", &tx, &mut saw_fail);
1117 assert!(pass_through);
1118 assert_eq!(display, "hello world");
1119 assert!(!saw_fail);
1120 }
1121
1122 #[test]
1123 fn test_process_line_progress() {
1124 let (tx, mut rx) = mpsc::unbounded_channel();
1125 let mut filter = ActionFilter::new("test", true, false);
1126 let mut saw_fail = false;
1127 let (_display, _pass_through) = process_line(
1128 "openjd_progress: 0.5",
1129 &mut filter,
1130 "test",
1131 &tx,
1132 &mut saw_fail,
1133 );
1134 assert!(!saw_fail);
1135 match rx.try_recv().unwrap() {
1136 ActionMessage::Progress(v) => assert!((v - 0.5).abs() < f64::EPSILON),
1137 other => panic!("Expected Progress, got: {other:?}"),
1138 }
1139 }
1140
1141 #[test]
1142 fn test_process_line_status() {
1143 let (tx, mut rx) = mpsc::unbounded_channel();
1144 let mut filter = ActionFilter::new("test", true, false);
1145 let mut saw_fail = false;
1146 process_line(
1147 "openjd_status: rendering frame 42",
1148 &mut filter,
1149 "test",
1150 &tx,
1151 &mut saw_fail,
1152 );
1153 assert!(!saw_fail);
1154 match rx.try_recv().unwrap() {
1155 ActionMessage::Status(s) => assert_eq!(s, "rendering frame 42"),
1156 other => panic!("Expected Status, got: {other:?}"),
1157 }
1158 }
1159
1160 #[test]
1161 fn test_process_line_fail() {
1162 let (tx, mut rx) = mpsc::unbounded_channel();
1163 let mut filter = ActionFilter::new("test", true, false);
1164 let mut saw_fail = false;
1165 process_line(
1166 "openjd_fail: out of memory",
1167 &mut filter,
1168 "test",
1169 &tx,
1170 &mut saw_fail,
1171 );
1172 assert!(saw_fail);
1173 match rx.try_recv().unwrap() {
1174 ActionMessage::Fail(s) => assert_eq!(s, "out of memory"),
1175 other => panic!("Expected Fail, got: {other:?}"),
1176 }
1177 }
1178
1179 #[test]
1180 fn test_process_line_env() {
1181 let (tx, mut rx) = mpsc::unbounded_channel();
1182 let mut filter = ActionFilter::new("test", true, false);
1183 let mut saw_fail = false;
1184 process_line(
1185 "openjd_env: MY_VAR=my_value",
1186 &mut filter,
1187 "test",
1188 &tx,
1189 &mut saw_fail,
1190 );
1191 match rx.try_recv().unwrap() {
1192 ActionMessage::SetEnv { name, value } => {
1193 assert_eq!(name, "MY_VAR");
1194 assert_eq!(value, "my_value");
1195 }
1196 other => panic!("Expected SetEnv, got: {other:?}"),
1197 }
1198 }
1199
1200 #[test]
1201 fn test_process_line_unset_env() {
1202 let (tx, mut rx) = mpsc::unbounded_channel();
1203 let mut filter = ActionFilter::new("test", true, false);
1204 let mut saw_fail = false;
1205 process_line(
1206 "openjd_unset_env: MY_VAR",
1207 &mut filter,
1208 "test",
1209 &tx,
1210 &mut saw_fail,
1211 );
1212 match rx.try_recv().unwrap() {
1213 ActionMessage::UnsetEnv { name } => assert_eq!(name, "MY_VAR"),
1214 other => panic!("Expected UnsetEnv, got: {other:?}"),
1215 }
1216 }
1217
1218 #[test]
1219 fn test_process_line_redacted_env() {
1220 let (tx, mut rx) = mpsc::unbounded_channel();
1221 let mut filter = ActionFilter::new("test", true, false);
1222 let mut saw_fail = false;
1223 process_line(
1224 "openjd_redacted_env: SECRET=hunter2",
1225 &mut filter,
1226 "test",
1227 &tx,
1228 &mut saw_fail,
1229 );
1230 match rx.try_recv().unwrap() {
1231 ActionMessage::RedactedEnv { name, value } => {
1232 assert_eq!(name, "SECRET");
1233 assert_eq!(value, "hunter2");
1234 }
1235 other => panic!("Expected RedactedEnv, got: {other:?}"),
1236 }
1237 }
1238
1239 #[cfg(unix)]
1242 fn run_simple(args: Vec<String>) -> (SubprocessResult, Vec<ActionMessage>) {
1243 run_with_config(SubprocessConfig {
1244 args,
1245 env_vars: HashMap::new(),
1246 working_dir: None,
1247 timeout: None,
1248 user: None,
1249 cancel_method: CancelMethod::Terminate,
1250 cancel_request_rx: None,
1251 debug_collect_stdout: true,
1252 })
1253 }
1254
1255 #[cfg(unix)]
1256 fn run_with_config(config: SubprocessConfig) -> (SubprocessResult, Vec<ActionMessage>) {
1257 let rt = tokio::runtime::Builder::new_current_thread()
1258 .enable_all()
1259 .build()
1260 .unwrap();
1261 rt.block_on(async {
1262 let (msg_tx, mut msg_rx) = mpsc::unbounded_channel();
1263 let mut filter = ActionFilter::new("test", true, false);
1264 let token = CancellationToken::new();
1265 let result = run_subprocess(config, &mut filter, "test", msg_tx, token)
1266 .await
1267 .unwrap();
1268 let mut msgs = Vec::new();
1269 while let Ok(m) = msg_rx.try_recv() {
1270 msgs.push(m);
1271 }
1272 (result, msgs)
1273 })
1274 }
1275
1276 #[cfg(unix)]
1277 #[test]
1278 fn test_run_subprocess_success() {
1279 let (r, _) = run_simple(vec!["echo".into(), "hello".into()]);
1280 assert_eq!(r.state, ActionState::Success);
1281 assert_eq!(r.exit_code, Some(0));
1282 assert!(r.stdout.contains("hello"), "stdout: {}", r.stdout);
1283 }
1284
1285 #[cfg(unix)]
1286 #[test]
1287 fn test_run_subprocess_failure_exit_code() {
1288 let (r, _) = run_simple(vec!["sh".into(), "-c".into(), "exit 42".into()]);
1289 assert_eq!(r.state, ActionState::Failed);
1290 assert_eq!(r.exit_code, Some(42));
1291 }
1292
1293 #[cfg(unix)]
1294 #[test]
1295 fn test_run_subprocess_command_not_found() {
1296 let rt = tokio::runtime::Builder::new_current_thread()
1297 .enable_all()
1298 .build()
1299 .unwrap();
1300 let err = rt.block_on(async {
1301 let (msg_tx, _) = mpsc::unbounded_channel();
1302 let mut filter = ActionFilter::new("test", true, false);
1303 let token = CancellationToken::new();
1304 let config = SubprocessConfig {
1305 args: vec!["/nonexistent/binary_xyz".into()],
1306 env_vars: HashMap::new(),
1307 working_dir: None,
1308 timeout: None,
1309 user: None,
1310 cancel_method: CancelMethod::Terminate,
1311 cancel_request_rx: None,
1312 debug_collect_stdout: false,
1313 };
1314 run_subprocess(config, &mut filter, "test", msg_tx, token).await
1315 });
1316 assert!(err.is_err());
1317 let msg = err.unwrap_err().to_string();
1318 assert!(msg.contains("/nonexistent/binary_xyz"), "error: {msg}");
1319 }
1320
1321 #[cfg(unix)]
1322 #[test]
1323 fn test_run_subprocess_empty_args() {
1324 let rt = tokio::runtime::Builder::new_current_thread()
1325 .enable_all()
1326 .build()
1327 .unwrap();
1328 let err = rt.block_on(async {
1329 let (msg_tx, _) = mpsc::unbounded_channel();
1330 let mut filter = ActionFilter::new("test", true, false);
1331 let token = CancellationToken::new();
1332 let config = SubprocessConfig {
1333 args: vec![],
1334 env_vars: HashMap::new(),
1335 working_dir: None,
1336 timeout: None,
1337 user: None,
1338 cancel_method: CancelMethod::Terminate,
1339 cancel_request_rx: None,
1340 debug_collect_stdout: false,
1341 };
1342 run_subprocess(config, &mut filter, "test", msg_tx, token).await
1343 });
1344 assert!(err.is_err());
1345 assert!(
1346 err.unwrap_err().to_string().contains("No command"),
1347 "expected empty args error"
1348 );
1349 }
1350
1351 #[cfg(unix)]
1352 #[test]
1353 fn test_run_subprocess_timeout() {
1354 let rt = tokio::runtime::Builder::new_current_thread()
1355 .enable_all()
1356 .build()
1357 .unwrap();
1358 let (r, _) = rt.block_on(async {
1359 let (msg_tx, mut msg_rx) = mpsc::unbounded_channel();
1360 let mut filter = ActionFilter::new("test", true, false);
1361 let token = CancellationToken::new();
1362 let config = SubprocessConfig {
1363 args: vec!["sleep".into(), "30".into()],
1364 env_vars: HashMap::new(),
1365 working_dir: None,
1366 timeout: Some(Duration::from_millis(500)),
1367 user: None,
1368 cancel_method: CancelMethod::Terminate,
1369 cancel_request_rx: None,
1370 debug_collect_stdout: false,
1371 };
1372 let r = run_subprocess(config, &mut filter, "test", msg_tx, token)
1373 .await
1374 .unwrap();
1375 let mut msgs = Vec::new();
1376 while let Ok(m) = msg_rx.try_recv() {
1377 msgs.push(m);
1378 }
1379 (r, msgs)
1380 });
1381 assert_eq!(r.state, ActionState::Timeout);
1382 }
1383
1384 #[cfg(unix)]
1385 #[test]
1386 fn test_run_subprocess_timeout_drains_stdout() {
1387 let rt = tokio::runtime::Builder::new_current_thread()
1388 .enable_all()
1389 .build()
1390 .unwrap();
1391 let (r, _) = rt.block_on(async {
1392 let (msg_tx, mut msg_rx) = mpsc::unbounded_channel();
1393 let mut filter = ActionFilter::new("test", true, false);
1394 let token = CancellationToken::new();
1395 let config = SubprocessConfig {
1396 args: vec![
1397 "sh".into(),
1398 "-c".into(),
1399 "echo before_timeout; sleep 30".into(),
1400 ],
1401 env_vars: HashMap::new(),
1402 working_dir: None,
1403 timeout: Some(Duration::from_millis(500)),
1404 user: None,
1405 cancel_method: CancelMethod::Terminate,
1406 cancel_request_rx: None,
1407 debug_collect_stdout: true,
1408 };
1409 let r = run_subprocess(config, &mut filter, "test", msg_tx, token)
1410 .await
1411 .unwrap();
1412 let mut msgs = Vec::new();
1413 while let Ok(m) = msg_rx.try_recv() {
1414 msgs.push(m);
1415 }
1416 (r, msgs)
1417 });
1418 assert_eq!(r.state, ActionState::Timeout);
1419 assert!(
1420 r.stdout.contains("before_timeout"),
1421 "output before timeout should be captured: {:?}",
1422 r.stdout
1423 );
1424 }
1425
1426 #[cfg(unix)]
1427 #[test]
1428 fn test_run_subprocess_env_vars() {
1429 let mut env = HashMap::new();
1430 env.insert("OPENJD_TEST_VAR".into(), Some("test_value_42".into()));
1431 let (r, _) = run_with_config(SubprocessConfig {
1432 args: vec!["sh".into(), "-c".into(), "echo $OPENJD_TEST_VAR".into()],
1433 env_vars: env,
1434 working_dir: None,
1435 timeout: None,
1436 user: None,
1437 cancel_method: CancelMethod::Terminate,
1438 cancel_request_rx: None,
1439 debug_collect_stdout: true,
1440 });
1441 assert_eq!(r.state, ActionState::Success);
1442 assert!(r.stdout.contains("test_value_42"), "stdout: {}", r.stdout);
1443 }
1444
1445 #[cfg(unix)]
1446 #[test]
1447 fn test_run_subprocess_env_var_unset() {
1448 std::env::set_var("OPENJD_UNSET_TEST", "should_be_gone");
1450 let mut env = HashMap::new();
1451 env.insert("OPENJD_UNSET_TEST".into(), None);
1452 let (r, _) = run_with_config(SubprocessConfig {
1453 args: vec![
1454 "sh".into(),
1455 "-c".into(),
1456 "echo VAL=${OPENJD_UNSET_TEST:-UNSET}".into(),
1457 ],
1458 env_vars: env,
1459 working_dir: None,
1460 timeout: None,
1461 user: None,
1462 cancel_method: CancelMethod::Terminate,
1463 cancel_request_rx: None,
1464 debug_collect_stdout: true,
1465 });
1466 assert_eq!(r.state, ActionState::Success);
1467 assert!(r.stdout.contains("VAL=UNSET"), "stdout: {}", r.stdout);
1468 }
1469
1470 #[cfg(unix)]
1471 #[test]
1472 fn test_run_subprocess_working_dir() {
1473 let dir = tempfile::tempdir().unwrap();
1474 let (r, _) = run_with_config(SubprocessConfig {
1475 args: vec!["pwd".into()],
1476 env_vars: HashMap::new(),
1477 working_dir: Some(dir.path().to_path_buf()),
1478 timeout: None,
1479 user: None,
1480 cancel_method: CancelMethod::Terminate,
1481 cancel_request_rx: None,
1482 debug_collect_stdout: true,
1483 });
1484 assert_eq!(r.state, ActionState::Success);
1485 let expected = dir.path().canonicalize().unwrap();
1487 let actual = PathBuf::from(r.stdout.trim()).canonicalize().unwrap();
1488 assert_eq!(actual, expected);
1489 }
1490
1491 #[cfg(unix)]
1492 #[test]
1493 fn test_run_subprocess_openjd_progress() {
1494 let (r, msgs) = run_simple(vec![
1495 "sh".into(),
1496 "-c".into(),
1497 "echo 'openjd_progress: 0.75'".into(),
1498 ]);
1499 assert_eq!(r.state, ActionState::Success);
1500 assert!(
1501 msgs.iter().any(
1502 |m| matches!(m, ActionMessage::Progress(v) if (*v - 0.75).abs() < f64::EPSILON)
1503 ),
1504 "Expected Progress(0.75), got: {msgs:?}"
1505 );
1506 }
1507
1508 #[cfg(unix)]
1509 #[test]
1510 fn test_run_subprocess_openjd_status() {
1511 let (r, msgs) = run_simple(vec![
1512 "sh".into(),
1513 "-c".into(),
1514 "echo 'openjd_status: rendering'".into(),
1515 ]);
1516 assert_eq!(r.state, ActionState::Success);
1517 assert!(
1518 msgs.iter()
1519 .any(|m| matches!(m, ActionMessage::Status(s) if s == "rendering")),
1520 "Expected Status(rendering), got: {msgs:?}"
1521 );
1522 }
1523
1524 #[cfg(unix)]
1525 #[test]
1526 fn test_run_subprocess_openjd_fail_sets_failed() {
1527 let (r, msgs) = run_simple(vec![
1528 "sh".into(),
1529 "-c".into(),
1530 "echo 'openjd_fail: something broke'".into(),
1531 ]);
1532 assert_eq!(
1533 r.state,
1534 ActionState::Failed,
1535 "openjd_fail should cause Failed state even with exit 0"
1536 );
1537 assert!(
1538 msgs.iter()
1539 .any(|m| matches!(m, ActionMessage::Fail(s) if s == "something broke")),
1540 "Expected Fail message, got: {msgs:?}"
1541 );
1542 }
1543
1544 #[cfg(unix)]
1545 #[test]
1546 fn test_run_subprocess_openjd_env() {
1547 let (r, msgs) = run_simple(vec![
1548 "sh".into(),
1549 "-c".into(),
1550 "echo 'openjd_env: FOO=bar'".into(),
1551 ]);
1552 assert_eq!(r.state, ActionState::Success);
1553 assert!(msgs.iter().any(|m| matches!(m, ActionMessage::SetEnv { name, value } if name == "FOO" && value == "bar")),
1554 "Expected SetEnv, got: {msgs:?}");
1555 }
1556
1557 #[cfg(unix)]
1558 #[test]
1559 fn test_run_subprocess_stderr_merged() {
1560 let (r, _) = run_simple(vec![
1562 "sh".into(),
1563 "-c".into(),
1564 "echo stdout_line; echo stderr_line >&2".into(),
1565 ]);
1566 assert_eq!(r.state, ActionState::Success);
1567 assert!(r.stdout.contains("stdout_line"), "stdout: {}", r.stdout);
1568 assert!(
1569 r.stdout.contains("stderr_line"),
1570 "stderr should be merged into stdout: {}",
1571 r.stdout
1572 );
1573 }
1574
1575 #[cfg(unix)]
1576 #[test]
1577 fn test_run_subprocess_multiline_output() {
1578 let (r, _) = run_simple(vec![
1579 "sh".into(),
1580 "-c".into(),
1581 "echo line1; echo line2; echo line3".into(),
1582 ]);
1583 assert_eq!(r.state, ActionState::Success);
1584 assert!(r.stdout.contains("line1\n"), "stdout: {:?}", r.stdout);
1585 assert!(r.stdout.contains("line2\n"), "stdout: {:?}", r.stdout);
1586 assert!(r.stdout.contains("line3\n"), "stdout: {:?}", r.stdout);
1587 }
1588
1589 #[cfg(unix)]
1590 #[test]
1591 fn test_run_subprocess_debug_collect_stdout_false_by_default() {
1592 let (r, _) = run_simple(vec!["echo".into(), "hello".into()]);
1593 assert!(r.stdout.contains("hello"));
1595
1596 let rt = tokio::runtime::Builder::new_current_thread()
1598 .enable_all()
1599 .build()
1600 .unwrap();
1601 let r = rt.block_on(async {
1602 let (msg_tx, _) = mpsc::unbounded_channel();
1603 let mut filter = ActionFilter::new("test", true, false);
1604 let token = CancellationToken::new();
1605 let config = SubprocessConfig {
1606 args: vec!["echo".into(), "hello".into()],
1607 env_vars: HashMap::new(),
1608 working_dir: None,
1609 timeout: None,
1610 user: None,
1611 cancel_method: CancelMethod::Terminate,
1612 cancel_request_rx: None,
1613 debug_collect_stdout: false,
1614 };
1615 run_subprocess(config, &mut filter, "test", msg_tx, token)
1616 .await
1617 .unwrap()
1618 });
1619 assert_eq!(r.state, ActionState::Success);
1620 assert!(
1621 r.stdout.is_empty(),
1622 "stdout should be empty when debug_collect_stdout is false: {:?}",
1623 r.stdout
1624 );
1625 }
1626
1627 #[test]
1628 fn test_truncate_line_multibyte_boundary() {
1629 let s = "€".repeat(LOG_LINE_MAX_LENGTH); let truncated = truncate_line(&s);
1633 assert!(truncated.len() <= LOG_LINE_MAX_LENGTH);
1634 assert!(truncated.chars().count() > 0);
1636 }
1637
1638 #[test]
1639 fn test_truncate_line_short_line_unchanged() {
1640 let s = "hello";
1641 assert_eq!(truncate_line(s), "hello");
1642 }
1643
1644 #[cfg(unix)]
1645 #[test]
1646 fn test_run_subprocess_invalid_utf8_continues() {
1647 let (r, _) = run_simple(vec![
1650 "sh".into(),
1651 "-c".into(),
1652 r#"echo before; printf '\xff\n'; echo after"#.into(),
1653 ]);
1654 assert_eq!(r.state, ActionState::Success);
1655 assert!(
1656 r.stdout.contains("before"),
1657 "line before invalid UTF-8 should be captured: {:?}",
1658 r.stdout
1659 );
1660 assert!(
1661 r.stdout.contains("after"),
1662 "line after invalid UTF-8 should be captured: {:?}",
1663 r.stdout
1664 );
1665 }
1666
1667 #[cfg(unix)]
1668 #[test]
1669 fn test_run_subprocess_progress_error_in_stdout() {
1670 let (r, _) = run_simple(vec![
1671 "sh".into(),
1672 "-c".into(),
1673 "echo 'openjd_progress: 200.0'".into(),
1674 ]);
1675 assert!(
1676 r.stdout.contains("ERROR"),
1677 "out-of-range progress error should appear in stdout: {:?}",
1678 r.stdout
1679 );
1680 }
1681
1682 #[cfg(unix)]
1691 #[tokio::test]
1692 async fn test_cancel_token_set_but_process_killed_externally() {
1693 use tokio_util::sync::CancellationToken;
1694
1695 let token = CancellationToken::new();
1696 let (msg_tx, _msg_rx) = tokio::sync::mpsc::unbounded_channel();
1697
1698 let config = SubprocessConfig {
1700 args: vec!["sh".into(), "-c".into(), "exit 42".into()],
1701 env_vars: HashMap::new(),
1702 working_dir: None,
1703 timeout: None,
1704 user: None,
1705 cancel_method: CancelMethod::Terminate,
1706 cancel_request_rx: None,
1707 debug_collect_stdout: false,
1708 };
1709
1710 let token_clone = token.clone();
1712 std::thread::spawn(move || {
1713 token_clone.cancel();
1714 });
1715
1716 tokio::time::sleep(Duration::from_millis(1)).await;
1718
1719 let mut filter = crate::action_filter::ActionFilter::new("test", true, false);
1720 let result = run_subprocess(config, &mut filter, "test", msg_tx, token)
1721 .await
1722 .unwrap();
1723
1724 assert_eq!(
1727 result.state,
1728 ActionState::Canceled,
1729 "Non-zero exit with cancelled token should be Canceled, not {:?}",
1730 result.state
1731 );
1732 }
1733}