use std::io::Read;
use std::path::Path;
use crate::action::{ActionMessage, ActionState};
use crate::error::SessionError;
use crate::logging::LogContent;
const MAX_RESPONSE_LINE_LENGTH: usize = 128 * 1024;
use crate::session_log;
use crate::session_user::SessionUser;
pub(crate) const AUTH_TOKEN_LEN: usize = 22;
const AUTH_TOKEN_ALPHABET: &[u8; 64] =
b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_";
pub(crate) fn generate_auth_token() -> Result<String, SessionError> {
let mut raw = [0u8; AUTH_TOKEN_LEN];
getrandom::fill(&mut raw).map_err(|e| {
SessionError::HelperCommunication(format!("Failed to generate auth token: {e}"))
})?;
let mut out = String::with_capacity(AUTH_TOKEN_LEN);
for b in raw {
out.push(AUTH_TOKEN_ALPHABET[(b & 0x3F) as usize] as char);
}
Ok(out)
}
fn augment_with_token(
cmd: &serde_json::Value,
auth_token: &str,
) -> Result<serde_json::Value, SessionError> {
let serde_json::Value::Object(map) = cmd else {
return Err(SessionError::HelperCommunication(format!(
"helper command must be a JSON object, got: {cmd}"
)));
};
let mut with_token = map.clone();
with_token.insert(
"token".to_string(),
serde_json::Value::String(auth_token.to_string()),
);
Ok(serde_json::Value::Object(with_token))
}
type NextResponseFuture<'a> = std::pin::Pin<
Box<
dyn std::future::Future<Output = Option<Result<serde_json::Value, SessionError>>>
+ Send
+ 'a,
>,
>;
pub(crate) trait AsyncHelperReader: Send {
fn next_response(&mut self) -> NextResponseFuture<'_>;
}
#[cfg(unix)]
pub(crate) struct UnixAsyncHelperReader {
rx: tokio::sync::mpsc::UnboundedReceiver<Result<serde_json::Value, SessionError>>,
_thread: Option<std::thread::JoinHandle<()>>,
}
#[cfg(unix)]
impl UnixAsyncHelperReader {
pub fn new(stdout: std::process::ChildStdout) -> Result<Self, SessionError> {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let thread = std::thread::spawn(move || {
use std::io::BufRead;
let mut reader = std::io::BufReader::new(stdout);
let mut line = String::new();
loop {
line.clear();
match Read::take(&mut reader, MAX_RESPONSE_LINE_LENGTH as u64).read_line(&mut line)
{
Ok(0) => break,
Ok(n) => {
if n == MAX_RESPONSE_LINE_LENGTH && !line.ends_with('\n') {
let mut discard = Vec::new();
let _ = reader.read_until(b'\n', &mut discard);
let result = Err(SessionError::HelperCommunication(format!(
"response line exceeds {MAX_RESPONSE_LINE_LENGTH} byte limit"
)));
if tx.send(result).is_err() {
break;
}
continue;
}
let result = serde_json::from_str(line.trim_end()).map_err(|e| {
SessionError::HelperCommunication(format!("parse error: {e}"))
});
if tx.send(result).is_err() {
break;
}
}
Err(_) => break,
}
}
});
Ok(Self {
rx,
_thread: Some(thread),
})
}
}
#[cfg(unix)]
impl AsyncHelperReader for UnixAsyncHelperReader {
fn next_response(&mut self) -> NextResponseFuture<'_> {
Box::pin(async { self.rx.recv().await })
}
}
#[cfg(windows)]
pub(crate) struct WindowsAsyncHelperReader {
rx: tokio::sync::mpsc::UnboundedReceiver<Result<serde_json::Value, SessionError>>,
_thread: Option<std::thread::JoinHandle<()>>,
}
#[cfg(windows)]
impl WindowsAsyncHelperReader {
pub fn new(stdout: std::fs::File) -> Self {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let thread = std::thread::spawn(move || {
use std::io::BufRead;
let mut reader = std::io::BufReader::new(stdout);
let mut line = String::new();
loop {
line.clear();
match Read::take(&mut reader, MAX_RESPONSE_LINE_LENGTH as u64).read_line(&mut line)
{
Ok(0) => break,
Ok(n) => {
if n == MAX_RESPONSE_LINE_LENGTH && !line.ends_with('\n') {
let mut discard = Vec::new();
let _ = reader.read_until(b'\n', &mut discard);
let result = Err(SessionError::HelperCommunication(format!(
"response line exceeds {MAX_RESPONSE_LINE_LENGTH} byte limit"
)));
if tx.send(result).is_err() {
break;
}
continue;
}
let result = serde_json::from_str(line.trim_end()).map_err(|e| {
SessionError::HelperCommunication(format!("parse error: {e}"))
});
if tx.send(result).is_err() {
break;
}
}
Err(_) => break,
}
}
});
Self {
rx,
_thread: Some(thread),
}
}
}
#[cfg(windows)]
impl AsyncHelperReader for WindowsAsyncHelperReader {
fn next_response(&mut self) -> NextResponseFuture<'_> {
Box::pin(async { self.rx.recv().await })
}
}
#[cfg(unix)]
pub(crate) struct CrossUserHelper {
child: std::process::Child,
stdin: std::io::BufWriter<std::process::ChildStdin>,
pub(crate) async_reader: UnixAsyncHelperReader,
auth_token: String,
}
#[cfg(windows)]
pub(crate) struct CrossUserHelperWin {
process_handle: windows::Win32::Foundation::HANDLE,
stdin: std::io::BufWriter<std::fs::File>,
pub(crate) async_reader: WindowsAsyncHelperReader,
auth_token: String,
}
#[cfg(windows)]
unsafe impl Send for CrossUserHelperWin {}
#[cfg(unix)]
impl CrossUserHelper {
#[cfg(unix)]
pub fn spawn(
helper_path: &Path,
user: &dyn SessionUser,
) -> Result<(Self, std::fs::File), SessionError> {
let auth_token = generate_auth_token()?;
let mut child = std::process::Command::new("sudo")
.args([
"-u",
user.user(),
"-i",
&helper_path.to_string_lossy(),
"--auth-token",
&auth_token,
])
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::null())
.spawn()
.map_err(|source| SessionError::SubprocessStart {
command: format!(
"sudo -u {} -i {} --auth-token <redacted>",
user.user(),
helper_path.display()
),
source,
})?;
let child_stdin = child.stdin.take().expect("stdin was piped");
use std::os::unix::io::{AsFd, FromRawFd, IntoRawFd};
let dup_fd = nix::unistd::dup(child_stdin.as_fd()).map_err(|e| {
SessionError::HelperCommunication(format!("Failed to dup helper stdin fd: {e}"))
})?;
let cancel_writer = unsafe { FromRawFd::from_raw_fd(dup_fd.into_raw_fd()) };
let stdin = std::io::BufWriter::new(child_stdin);
let async_reader =
UnixAsyncHelperReader::new(child.stdout.take().expect("stdout was piped"))?;
Ok((
Self {
child,
stdin,
async_reader,
auth_token,
},
cancel_writer,
))
}
pub fn auth_token(&self) -> &str {
&self.auth_token
}
pub fn send_command(&mut self, cmd: &serde_json::Value) -> Result<(), SessionError> {
use std::io::Write;
let payload = augment_with_token(cmd, &self.auth_token)?;
serde_json::to_writer(&mut self.stdin, &payload).map_err(|e| {
SessionError::HelperCommunication(format!("Failed to write to helper stdin: {e}"))
})?;
self.stdin.write_all(b"\n").map_err(|e| {
SessionError::HelperCommunication(format!("Failed to write newline to helper: {e}"))
})?;
self.stdin.flush().map_err(|e| {
SessionError::HelperCommunication(format!("Failed to flush helper stdin: {e}"))
})?;
Ok(())
}
pub fn shutdown(&mut self) {
let _ = self.send_command(&serde_json::json!({"shutdown": true}));
let _ = self.child.wait();
}
}
#[cfg(unix)]
impl Drop for CrossUserHelper {
fn drop(&mut self) {
if let Ok(None) = self.child.try_wait() {
log::warn!(target: "openjd.sessions", "CrossUserHelper dropped without shutdown(), killing child process");
let _ = self.child.kill();
let _ = self.child.wait();
}
}
}
#[cfg(windows)]
impl CrossUserHelperWin {
pub fn spawn(
helper_path: &Path,
user: &dyn SessionUser,
) -> Result<(Self, std::fs::File), SessionError> {
use crate::session_user::WindowsSessionUser;
use std::collections::HashMap;
use std::os::windows::io::FromRawHandle;
use windows::Win32::Foundation::{DuplicateHandle, DUPLICATE_SAME_ACCESS};
use windows::Win32::System::Threading::GetCurrentProcess;
let wu = user
.as_any()
.downcast_ref::<WindowsSessionUser>()
.ok_or_else(|| {
SessionError::Runtime("Cross-user on Windows requires WindowsSessionUser".into())
})?;
let auth_token = generate_auth_token()?;
let spawned = crate::win32::spawn_as_user_with_stdin(
&[
helper_path.to_string_lossy().to_string(),
"--auth-token".to_string(),
auth_token.clone(),
],
&HashMap::new(),
None, wu.password(),
wu.user(),
wu.logon_token(),
)
.map_err(|e| SessionError::SubprocessStart {
command: format!(
"spawn_as_user_with_stdin {} --auth-token <redacted>",
helper_path.display()
),
source: std::io::Error::other(e),
})?;
let stdin_write = spawned.stdin_write.ok_or_else(|| {
SessionError::HelperCommunication(
"spawn_as_user_with_stdin did not return stdin pipe".into(),
)
})?;
let stdin_file: std::fs::File = stdin_write.into();
let cancel_writer = unsafe {
use std::os::windows::io::AsRawHandle;
let current_process = GetCurrentProcess();
let src_handle = windows::Win32::Foundation::HANDLE(stdin_file.as_raw_handle());
let mut dup_handle = windows::Win32::Foundation::HANDLE::default();
DuplicateHandle(
current_process,
src_handle,
current_process,
&mut dup_handle,
0,
false,
DUPLICATE_SAME_ACCESS,
)
.map_err(|e| {
SessionError::HelperCommunication(format!(
"DuplicateHandle for cancel_writer failed: {e}"
))
})?;
std::fs::File::from_raw_handle(dup_handle.0 as std::os::windows::io::RawHandle)
};
let stdout_file: std::fs::File = spawned.stdout_read.into();
let stdin = std::io::BufWriter::new(stdin_file);
let async_reader = WindowsAsyncHelperReader::new(stdout_file);
Ok((
Self {
process_handle: spawned.process_handle,
stdin,
async_reader,
auth_token,
},
cancel_writer,
))
}
pub fn auth_token(&self) -> &str {
&self.auth_token
}
pub fn send_command(&mut self, cmd: &serde_json::Value) -> Result<(), SessionError> {
use std::io::Write;
let payload = augment_with_token(cmd, &self.auth_token)?;
serde_json::to_writer(&mut self.stdin, &payload).map_err(|e| {
SessionError::HelperCommunication(format!("Failed to write to helper stdin: {e}"))
})?;
self.stdin.write_all(b"\n").map_err(|e| {
SessionError::HelperCommunication(format!("Failed to write newline to helper: {e}"))
})?;
self.stdin.flush().map_err(|e| {
SessionError::HelperCommunication(format!("Failed to flush helper stdin: {e}"))
})?;
Ok(())
}
pub fn shutdown(&mut self) {
let _ = self.send_command(&serde_json::json!({"shutdown": true}));
unsafe {
let _ =
windows::Win32::System::Threading::WaitForSingleObject(self.process_handle, 5000);
let _ = windows::Win32::Foundation::CloseHandle(self.process_handle);
self.process_handle = windows::Win32::Foundation::INVALID_HANDLE_VALUE;
}
}
}
#[cfg(windows)]
impl Drop for CrossUserHelperWin {
fn drop(&mut self) {
if !self.process_handle.is_invalid() {
log::warn!(target: "openjd.sessions", "CrossUserHelperWin dropped without shutdown(), terminating child process");
unsafe {
let _ = windows::Win32::System::Threading::TerminateProcess(self.process_handle, 1);
let _ = windows::Win32::Foundation::CloseHandle(self.process_handle);
}
}
}
}
pub(crate) trait AsyncHelper: Send {
fn async_reader(&mut self) -> &mut dyn AsyncHelperReader;
fn send_command(&mut self, cmd: &serde_json::Value) -> Result<(), SessionError>;
fn auth_token(&self) -> &str;
}
#[cfg(unix)]
impl AsyncHelper for CrossUserHelper {
fn async_reader(&mut self) -> &mut dyn AsyncHelperReader {
&mut self.async_reader
}
fn send_command(&mut self, cmd: &serde_json::Value) -> Result<(), SessionError> {
CrossUserHelper::send_command(self, cmd)
}
fn auth_token(&self) -> &str {
CrossUserHelper::auth_token(self)
}
}
#[cfg(windows)]
impl AsyncHelper for CrossUserHelperWin {
fn async_reader(&mut self) -> &mut dyn AsyncHelperReader {
&mut self.async_reader
}
fn send_command(&mut self, cmd: &serde_json::Value) -> Result<(), SessionError> {
CrossUserHelperWin::send_command(self, cmd)
}
fn auth_token(&self) -> &str {
CrossUserHelperWin::auth_token(self)
}
}
pub(crate) async fn run_via_helper(
helper: &mut dyn AsyncHelper,
config: &crate::subprocess::SubprocessConfig,
filter: &mut crate::action_filter::ActionFilter,
session_id: &str,
message_tx: tokio::sync::mpsc::UnboundedSender<ActionMessage>,
cancel_writer: Option<&std::fs::File>,
) -> Result<crate::subprocess::SubprocessResult, SessionError> {
let env: serde_json::Map<String, serde_json::Value> = config
.env_vars
.iter()
.filter_map(|(k, v)| {
v.as_ref()
.map(|val| (k.clone(), serde_json::Value::String(val.clone())))
})
.collect();
let cmd = serde_json::json!({
"command": config.args[0],
"args": &config.args[1..],
"env": env,
"cwd": config.working_dir,
});
helper.send_command(&cmd)?;
session_log!(
info,
session_id,
LogContent::FILE_PATH | LogContent::PROCESS_CONTROL,
"Running command {}",
crate::subprocess::format_command_for_log(&config.args)
);
let timeout_fut = match config.timeout {
Some(d) => {
use futures_util::FutureExt;
tokio::time::sleep(d).boxed()
}
None => {
use futures_util::FutureExt;
futures_util::future::pending::<()>().boxed()
}
};
tokio::pin!(timeout_fut);
let mut timed_out = false;
let mut stdout_collected = String::new();
let mut saw_fail = false;
loop {
tokio::select! {
biased;
resp = helper.async_reader().next_response() => {
let resp = match resp {
None => return Err(SessionError::HelperCommunication(
"Helper process closed stdout unexpectedly".into(),
)),
Some(r) => r?,
};
if let Some(pid) = resp.get("pid").and_then(|v| v.as_i64()) {
session_log!(
info,
session_id,
LogContent::PROCESS_CONTROL,
"Command started as pid: {}",
pid
);
continue;
}
if let Some(line) = resp.get("out").and_then(|v| v.as_str()) {
let line = crate::subprocess::truncate_line(line);
let (display, pass_through) = crate::subprocess::process_line(
line,
filter,
session_id,
&message_tx,
&mut saw_fail,
);
if pass_through && filter.min_log_level() <= 20 {
session_log!(info, session_id, LogContent::COMMAND_OUTPUT, "{}", display);
}
if config.debug_collect_stdout {
stdout_collected.push_str(&display);
stdout_collected.push('\n');
}
continue;
}
if let Some(code) = resp.get("exited").and_then(|v| v.as_i64()) {
let exit_code = code as i32;
session_log!(
info,
session_id,
LogContent::PROCESS_CONTROL,
"Process exit code: {}",
exit_code
);
let canceled = config
.cancel_request_rx
.as_ref()
.is_some_and(|rx| rx.has_changed().unwrap_or(false));
let state = if canceled {
ActionState::Canceled
} else if timed_out {
ActionState::Timeout
} else if saw_fail {
ActionState::Failed
} else if exit_code == 0 {
ActionState::Success
} else {
ActionState::Failed
};
return Ok(crate::subprocess::SubprocessResult {
state,
exit_code: Some(exit_code),
stdout: stdout_collected,
});
}
if let Some(msg) = resp.get("error").and_then(|v| v.as_str()) {
return Err(SessionError::SubprocessStart {
command: config.args[0].clone(),
source: std::io::Error::other(msg.to_string()),
});
}
return Err(SessionError::HelperCommunication(format!(
"Unexpected helper response: {}",
resp
)));
}
_ = &mut timeout_fut, if !timed_out => {
timed_out = true;
if let Some(writer) = cancel_writer {
use std::io::Write;
let mut w = writer.try_clone().map_err(|e| {
SessionError::HelperCommunication(format!("Failed to clone cancel_writer: {e}"))
})?;
let token = helper.auth_token();
let cancel_method = &config.cancel_method;
let notify_period = match cancel_method {
crate::runner::CancelMethod::NotifyThenTerminate { terminate_delay } => {
terminate_delay.as_secs()
}
crate::runner::CancelMethod::Terminate => 0,
};
if notify_period == 0 {
let _ = writeln!(w, r#"{{"token":"{token}","cancel":"TERMINATE"}}"#);
} else {
let _ = writeln!(
w,
r#"{{"token":"{token}","cancel":"NOTIFY_THEN_TERMINATE","notifyPeriodInSeconds":{notify_period}}}"#
);
}
let _ = w.flush();
}
}
}
}
}
#[cfg(test)]
mod token_tests {
use super::*;
#[test]
fn token_is_22_chars_from_url_safe_alphabet() {
for _ in 0..100 {
let t = generate_auth_token().unwrap();
assert_eq!(
t.len(),
AUTH_TOKEN_LEN,
"token should be {AUTH_TOKEN_LEN} chars",
);
assert!(t.is_ascii(), "token must be ASCII");
for c in t.bytes() {
assert!(
AUTH_TOKEN_ALPHABET.contains(&c),
"token contains non-alphabet byte: {c:#x}",
);
}
}
}
#[test]
fn generated_tokens_differ() {
let a = generate_auth_token().unwrap();
let b = generate_auth_token().unwrap();
assert_ne!(a, b);
}
#[test]
fn augment_with_token_inserts_token() {
let cmd = serde_json::json!({"command": "echo", "args": ["hi"]});
let out = augment_with_token(&cmd, "T-O-K-E-N").unwrap();
assert_eq!(out.get("token").and_then(|v| v.as_str()), Some("T-O-K-E-N"),);
assert_eq!(out.get("command").and_then(|v| v.as_str()), Some("echo"));
}
#[test]
fn augment_rejects_non_object() {
let err = augment_with_token(&serde_json::json!("shutdown"), "tok").unwrap_err();
assert!(err.to_string().contains("must be a JSON object"));
}
#[test]
fn augment_overrides_caller_supplied_token() {
let cmd = serde_json::json!({"token": "bad", "command": "x"});
let out = augment_with_token(&cmd, "good").unwrap();
assert_eq!(
out.get("token").and_then(|v| v.as_str()),
Some("good"),
"augment_with_token must not let the caller override the token",
);
}
}