use std::process::Stdio;
use std::time::Duration;
use anyhow::{Context, Result};
use futures::StreamExt;
use kanade_shared::subject;
use kanade_shared::wire::{Command, RunAs, Shell};
use rand::Rng;
use tokio::io::AsyncReadExt;
use tokio::process::Command as ProcessCommand;
use tracing::{info, warn};
pub(crate) const POWERSHELL_UTF8_PRELUDE: &str = "[Console]::OutputEncoding = New-Object System.Text.UTF8Encoding $false; \
$OutputEncoding = [Console]::OutputEncoding; ";
pub(crate) fn with_powershell_utf8_prelude(user_script: &str) -> String {
format!("{POWERSHELL_UTF8_PRELUDE}{user_script}")
}
pub enum ExecOutcome {
Completed {
exit_code: i32,
stdout: String,
stderr: String,
},
Killed {
stdout: String,
stderr: String,
},
Timeout {
stdout: String,
stderr: String,
},
}
pub async fn run_command_with_kill(
client: &async_nats::Client,
cmd: &Command,
) -> Result<ExecOutcome> {
if let Some(j) = cmd.jitter_secs.filter(|&s| s > 0) {
let secs = rand::rng().random_range(0..j);
info!(
jitter_secs = j,
sleep_secs = secs,
"applying jitter before exec"
);
tokio::time::sleep(Duration::from_secs(secs)).await;
}
if !matches!(cmd.run_as, RunAs::System) {
return run_in_user_session_dispatch(client, cmd).await;
}
let ps_script;
let (program, args): (&str, Vec<&str>) = match cmd.shell {
Shell::Powershell => {
ps_script = with_powershell_utf8_prelude(&cmd.script);
(
"powershell",
vec!["-NoProfile", "-NonInteractive", "-Command", &ps_script],
)
}
Shell::Cmd => ("cmd", vec!["/C", &cmd.script]),
};
let mut builder = ProcessCommand::new(program);
builder
.args(&args)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.kill_on_drop(true);
if let Some(dir) = cmd.cwd.as_deref().filter(|s| !s.is_empty()) {
#[cfg(target_os = "windows")]
{
match crate::cwd_expand::open_self_token()
.and_then(|tok| crate::cwd_expand::expand(dir, tok.handle()))
{
Ok(expanded) => {
builder.current_dir(expanded);
}
Err(e) => {
warn!(error = %e, raw_cwd = %dir, "cwd expansion failed; using raw value");
builder.current_dir(dir);
}
}
}
#[cfg(not(target_os = "windows"))]
{
builder.current_dir(dir);
}
}
let mut child = builder
.spawn()
.with_context(|| format!("spawn {program}"))?;
let stdout_handle = child.stdout.take();
let stderr_handle = child.stderr.take();
let stdout_task = tokio::spawn(async move {
let mut buf = Vec::new();
let mut err: Option<anyhow::Error> = None;
if let Some(mut s) = stdout_handle
&& let Err(e) = s.read_to_end(&mut buf).await
{
err = Some(anyhow::Error::new(e));
}
(String::from_utf8_lossy(&buf).into_owned(), err)
});
let stderr_task = tokio::spawn(async move {
let mut buf = Vec::new();
let mut err: Option<anyhow::Error> = None;
if let Some(mut s) = stderr_handle
&& let Err(e) = s.read_to_end(&mut buf).await
{
err = Some(anyhow::Error::new(e));
}
(String::from_utf8_lossy(&buf).into_owned(), err)
});
let timeout_dur = Duration::from_secs(cmd.timeout_secs.max(1));
let inner = match &cmd.exec_id {
Some(eid) => {
let kill_subject = subject::kill(eid);
let mut kill_sub = client
.subscribe(kill_subject.clone())
.await
.with_context(|| format!("subscribe {kill_subject}"))?;
client.flush().await.ok();
info!(exec_id = %eid, subject = %kill_subject, "kill listener armed");
tokio::select! {
status = child.wait() => {
info!(exec_id = %eid, "child exited (wait arm fired)");
let s = status?;
OutcomeInner::Completed(s.code().unwrap_or(-1))
}
msg = kill_sub.next() => {
info!(exec_id = %eid, has_msg = msg.is_some(), "kill arm fired");
if let Err(e) = child.kill().await {
warn!(error = %e, "child.kill failed (process may already be dead)");
}
OutcomeInner::Killed
}
_ = tokio::time::sleep(timeout_dur) => {
info!(exec_id = %eid, "timeout arm fired");
if let Err(e) = child.kill().await {
warn!(error = %e, "child.kill on timeout failed");
}
OutcomeInner::Timeout
}
}
}
None => {
tokio::select! {
status = child.wait() => {
let s = status?;
OutcomeInner::Completed(s.code().unwrap_or(-1))
}
_ = tokio::time::sleep(timeout_dur) => {
if let Err(e) = child.kill().await {
warn!(error = %e, "child.kill on timeout failed");
}
OutcomeInner::Timeout
}
}
}
};
let (stdout, stdout_err) = stdout_task
.await
.map_err(|e| anyhow::anyhow!("stdout task join: {e}"))?;
if let Some(e) = stdout_err {
warn!(error = %e, "stdout capture failed (kept partial)");
}
let (mut stderr, stderr_err) = stderr_task
.await
.map_err(|e| anyhow::anyhow!("stderr task join: {e}"))?;
if let Some(e) = stderr_err {
warn!(error = %e, "stderr capture failed (kept partial)");
stderr.push_str(&format!("\n[agent: stderr capture failed: {e}]\n"));
}
Ok(match inner {
OutcomeInner::Completed(code) => ExecOutcome::Completed {
exit_code: code,
stdout,
stderr,
},
OutcomeInner::Killed => ExecOutcome::Killed { stdout, stderr },
OutcomeInner::Timeout => ExecOutcome::Timeout { stdout, stderr },
})
}
enum OutcomeInner {
Completed(i32),
Killed,
Timeout,
}
#[allow(clippy::needless_return)]
async fn run_in_user_session_dispatch(
client: &async_nats::Client,
cmd: &Command,
) -> Result<ExecOutcome> {
#[cfg(not(target_os = "windows"))]
{
let _ = client;
warn!(
run_as = ?cmd.run_as,
"run_as: user / system_gui is Windows-only — falling back to inherited identity",
);
return Ok(ExecOutcome::Completed {
exit_code: 0,
stdout: String::new(),
stderr: format!(
"run_as: {:?} is Windows-only; non-Windows agents skip the script.\n",
cmd.run_as
),
});
}
#[cfg(target_os = "windows")]
{
let (kill_tx, kill_rx) = tokio::sync::oneshot::channel::<()>();
let bridge = if let Some(eid) = cmd.exec_id.clone() {
let nats = client.clone();
let subject = subject::kill(&eid);
Some(tokio::spawn(async move {
match nats.subscribe(subject.clone()).await {
Ok(mut sub) => {
nats.flush().await.ok();
info!(exec_id = %eid, subject = %subject, "kill listener armed (user-session path)");
if sub.next().await.is_some() {
info!(exec_id = %eid, "kill received → forwarding to user-session waiter");
let _ = kill_tx.send(());
}
}
Err(e) => {
warn!(error = %e, %subject, "subscribe kill failed (user-session path)")
}
}
}))
} else {
None
};
let timeout = Duration::from_secs(cmd.timeout_secs.max(1));
let outcome =
crate::process_as_user::run_command_in_user_session(cmd, cmd.run_as, timeout, kill_rx)
.await;
if let Some(b) = bridge {
b.abort();
}
outcome
}
}
#[cfg(test)]
mod tests {
use tokio::io::AsyncReadExt;
async fn capture_lossy<R: tokio::io::AsyncRead + Unpin>(mut r: R) -> String {
let mut buf = Vec::new();
r.read_to_end(&mut buf).await.unwrap();
String::from_utf8_lossy(&buf).into_owned()
}
#[tokio::test]
async fn cp932_japanese_bytes_are_kept_lossy_not_dropped() {
let raw: Vec<u8> = vec![
b'{', b'"', b'k', b'"', b':', b'"', 0x82, 0xbf, 0x82, 0xc2, b'"', b'}',
];
let captured = capture_lossy(tokio::io::BufReader::new(&raw[..])).await;
assert!(captured.starts_with("{\"k\":\""), "ASCII frame preserved");
assert!(captured.ends_with("\"}"), "ASCII frame preserved");
assert!(captured.contains('\u{FFFD}'), "invalid runs marked");
}
#[tokio::test]
async fn pure_utf8_payload_round_trips() {
let raw = "こんにちは {\"ok\": true}".as_bytes().to_vec();
let captured = capture_lossy(tokio::io::BufReader::new(&raw[..])).await;
assert_eq!(captured, "こんにちは {\"ok\": true}");
}
#[tokio::test]
async fn empty_stream_yields_empty_string() {
let raw: Vec<u8> = Vec::new();
let captured = capture_lossy(tokio::io::BufReader::new(&raw[..])).await;
assert_eq!(captured, "");
}
#[test]
fn powershell_prelude_forces_utf8_output() {
let user_script = "Write-Output 'hello'";
let combined = super::with_powershell_utf8_prelude(user_script);
assert!(combined.contains("[Console]::OutputEncoding"));
assert!(combined.contains("UTF8Encoding"));
assert!(combined.ends_with(user_script));
}
#[test]
fn powershell_prelude_constant_shape() {
assert!(super::POWERSHELL_UTF8_PRELUDE.ends_with("; "));
assert!(super::POWERSHELL_UTF8_PRELUDE.contains("[Console]::OutputEncoding"));
assert!(super::POWERSHELL_UTF8_PRELUDE.contains("$OutputEncoding"));
}
}