use std::collections::BTreeMap;
use std::io::Read;
use std::path::{Path, PathBuf};
use std::rc::Rc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::mpsc;
use std::thread;
use std::time::{Duration, SystemTime};
use harn_vm::VmValue;
use time::{format_description::well_known::Rfc3339, OffsetDateTime};
use crate::error::HostlibError;
use crate::process::{self as process_handle, ProcessError, SpawnSpec};
use crate::tools::response::ResponseBuilder;
mod artifacts;
pub(crate) use self::artifacts::{persist_artifacts, planned_artifact_paths, resolve_output_path};
/// Process-wide sequence number folded into every id produced by
/// `next_command_id`, so ids stay distinct even when two commands are
/// created within the same nanosecond timestamp.
static COMMAND_COUNTER: AtomicU64 = AtomicU64::new(1);
/// Default upper bound, in bytes, on each inline stdout/stderr preview
/// (`CaptureConfig::max_inline_bytes`). The full streams are still handed to
/// `persist_artifacts`.
const DEFAULT_MAX_INLINE_BYTES: usize = 50_000;
/// Everything needed to launch one blocking command via `run`.
#[derive(Debug, Clone)]
pub(crate) struct SpawnRequest {
/// Name of the calling builtin; forwarded to the spawn spec and used to
/// attribute spawn errors (`process_error_to_hostlib`).
pub(crate) builtin: &'static str,
/// Program to execute.
pub(crate) program: String,
/// Arguments passed to `program`.
pub(crate) args: Vec<String>,
/// Working directory forwarded to the spawn spec (see `parse_cwd`).
pub(crate) cwd: Option<PathBuf>,
/// Environment variables forwarded to the spawn spec.
pub(crate) env: BTreeMap<String, String>,
/// Environment handling mode forwarded to the spawn spec (see `EnvMode`).
pub(crate) env_mode: EnvMode,
/// Text written to the child's stdin. The spec's `use_stdin` is set only
/// when this is `Some`.
pub(crate) stdin: Option<String>,
/// Passed to `wait_with_timeout`; presumably `None` waits without a limit
/// (TODO confirm in `process::wait_with_timeout`).
pub(crate) timeout: Option<Duration>,
/// Selects which output is inlined into the response.
pub(crate) capture: CaptureConfig,
}
/// Result of a blocking command run, rendered into a response by
/// `build_response`.
#[derive(Debug, Clone)]
pub(crate) struct SpawnOutcome {
/// Unique id produced by `next_command_id`.
pub(crate) command_id: String,
/// Final lifecycle state of the command.
pub(crate) status: CommandStatus,
/// OS process id, when the process handle reported one.
pub(crate) pid: Option<u32>,
/// Process group id, when the process handle reported one.
pub(crate) process_group_id: Option<u32>,
/// RFC 3339 timestamp taken just before spawning.
pub(crate) started_at: String,
/// RFC 3339 timestamp taken after waiting; always `Some` when produced by `run`.
pub(crate) ended_at: Option<String>,
/// Exit code, or `-1` when the process reported none (signal or kill).
pub(crate) exit_code: i32,
/// Name of the terminating signal, if any (e.g. `"SIGKILL"`).
pub(crate) signal: Option<String>,
/// Inline stdout preview, capped at `CaptureConfig::max_inline_bytes`; holds
/// stdout+stderr when `merge_stderr` was requested.
pub(crate) stdout: String,
/// Inline stderr preview; empty when not captured or when merged into `stdout`.
pub(crate) stderr: String,
/// Artifact paths as returned by `persist_artifacts`.
pub(crate) output_path: PathBuf,
pub(crate) stdout_path: PathBuf,
pub(crate) stderr_path: PathBuf,
/// Output statistics as reported by `persist_artifacts`.
/// NOTE(review): confirm whether these describe combined or stdout-only output.
pub(crate) line_count: u64,
pub(crate) byte_count: u64,
pub(crate) output_sha256: String,
/// Wall-clock time measured from just before spawning.
pub(crate) duration: Duration,
/// True when `wait_with_timeout` reported that the timeout elapsed.
pub(crate) timed_out: bool,
}
pub(crate) use crate::process::EnvMode;
/// Controls which command output is inlined into a response (see `inline_output`).
#[derive(Debug, Clone, Copy)]
pub(crate) struct CaptureConfig {
/// Include the stdout preview (or the merged preview when `merge_stderr` is set).
pub(crate) stdout: bool,
/// Include the stderr preview; ignored when `merge_stderr` is set.
pub(crate) stderr: bool,
/// Append stderr to the stdout preview instead of returning it separately.
pub(crate) merge_stderr: bool,
/// Maximum bytes per preview; the cut never leaves a partial UTF-8 character.
pub(crate) max_inline_bytes: usize,
}
impl Default for CaptureConfig {
fn default() -> Self {
Self {
stdout: true,
stderr: true,
merge_stderr: false,
max_inline_bytes: DEFAULT_MAX_INLINE_BYTES,
}
}
}
/// Lifecycle state reported in a response's `status` field.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum CommandStatus {
    /// The process was waited on and reported an exit status before any timeout.
    Completed,
    /// The process was started and has not been waited on yet (`running_response`).
    Running,
    /// The wait reported that the timeout elapsed.
    TimedOut,
    /// The wait returned no exit status without a timeout being reported.
    Killed,
}

impl CommandStatus {
    /// Stable snake_case name used on the wire.
    pub(crate) fn as_str(self) -> &'static str {
        match self {
            Self::Completed => "completed",
            Self::Running => "running",
            Self::TimedOut => "timed_out",
            Self::Killed => "killed",
        }
    }
}
/// Runs `req` to completion (or until `req.timeout` elapses) and collects
/// its output.
///
/// The complete stdout/stderr byte streams are handed to
/// `persist_artifacts`; the returned [`SpawnOutcome`] carries only the inline
/// previews selected by `req.capture` (see `inline_output`).
///
/// # Errors
///
/// Returns a [`HostlibError`] when the process cannot be spawned (mapped by
/// `process_error_to_hostlib`) or when persisting the artifacts fails.
pub(crate) fn run(req: SpawnRequest) -> Result<SpawnOutcome, HostlibError> {
    let started = std::time::Instant::now();
    let started_at = now_rfc3339();
    let command_id = next_command_id();
    let spec = SpawnSpec {
        builtin: req.builtin,
        program: req.program.clone(),
        args: req.args.clone(),
        cwd: req.cwd.clone(),
        env: req.env.clone(),
        env_mode: req.env_mode,
        use_stdin: req.stdin.is_some(),
        configure_process_group: false,
    };
    let mut handle = process_handle::spawn_process(spec)
        .map_err(|e| process_error_to_hostlib(req.builtin, e))?;
    let pid = handle.pid();
    let process_group_id = handle.process_group_id();

    // Drain stdout/stderr on background threads *before* feeding stdin.
    // Writing stdin first can deadlock: a child that fills its output pipe
    // before it has consumed all of its input blocks on that write, while we
    // block in `write_all` waiting for it to read.
    let (out_tx, out_rx) = mpsc::channel::<Vec<u8>>();
    let (err_tx, err_rx) = mpsc::channel::<Vec<u8>>();
    let stdout_thread = handle
        .take_stdout()
        .map(|reader| drain_pipe_in_background(reader, out_tx));
    let stderr_thread = handle
        .take_stderr()
        .map(|reader| drain_pipe_in_background(reader, err_tx));

    if let Some(stdin_data) = req.stdin.as_ref() {
        if let Some(mut stdin) = handle.take_stdin() {
            use std::io::Write;
            // Best effort: a child that exits without reading all of its
            // input makes this fail (e.g. broken pipe), which is not a
            // failure of the command itself. `stdin` is dropped at the end
            // of this block, closing the pipe so the child sees EOF.
            let _ = stdin.write_all(stdin_data.as_bytes());
        }
    }

    // NOTE(review): an error from `wait_with_timeout` itself is folded into
    // `(None, false)` and is therefore reported below as a kill.
    let (status, timed_out): (Option<process_handle::ExitStatus>, bool) =
        handle.wait_with_timeout(req.timeout).unwrap_or_default();
    // Once the readers are joined, everything they sent is already queued,
    // so the non-blocking `try_iter` drains below see the complete output.
    if let Some(t) = stdout_thread {
        let _ = t.join();
    }
    if let Some(t) = stderr_thread {
        let _ = t.join();
    }
    let stdout_bytes: Vec<u8> = out_rx.try_iter().flatten().collect();
    let stderr_bytes: Vec<u8> = err_rx.try_iter().flatten().collect();
    let ended_at = Some(now_rfc3339());
    // Measured together with `ended_at` so the reported duration agrees with
    // it and excludes the time spent persisting artifacts below.
    let duration = started.elapsed();

    let exited = status.is_some();
    let (exit_code, signal) = match status {
        Some(s) => decode_status(s),
        None => (-1, Some("SIGKILL".to_string())),
    };
    let command_status = if timed_out {
        CommandStatus::TimedOut
    } else if exited {
        CommandStatus::Completed
    } else {
        CommandStatus::Killed
    };
    let artifacts = persist_artifacts(&command_id, &stdout_bytes, &stderr_bytes, None)?;
    let (stdout, stderr) = inline_output(&stdout_bytes, &stderr_bytes, req.capture);
    Ok(SpawnOutcome {
        command_id,
        status: command_status,
        pid,
        process_group_id,
        started_at,
        ended_at,
        exit_code,
        signal,
        stdout,
        stderr,
        output_path: artifacts.output_path,
        stdout_path: artifacts.stdout_path,
        stderr_path: artifacts.stderr_path,
        line_count: artifacts.line_count,
        byte_count: artifacts.byte_count,
        output_sha256: artifacts.output_sha256,
        duration,
        timed_out,
    })
}

/// Reads `reader` to EOF on a background thread and sends the collected bytes
/// through `tx`. Read errors are swallowed; bytes read before the error are
/// still delivered (per `Read::read_to_end`).
fn drain_pipe_in_background<R>(mut reader: R, tx: mpsc::Sender<Vec<u8>>) -> thread::JoinHandle<()>
where
    R: Read + Send + 'static,
{
    thread::spawn(move || {
        let mut buf = Vec::new();
        let _ = reader.read_to_end(&mut buf);
        let _ = tx.send(buf);
    })
}
/// Translates a [`ProcessError`] into the [`HostlibError`] reported for `builtin`.
///
/// An invalid argument vector becomes an invalid `argv` parameter. Every
/// sandbox or spawn failure becomes a backend error whose message is prefixed
/// with the stage that failed.
pub(crate) fn process_error_to_hostlib(builtin: &'static str, err: ProcessError) -> HostlibError {
    let message = match err {
        ProcessError::InvalidArgv(message) => {
            return HostlibError::InvalidParameter {
                builtin,
                param: "argv",
                message,
            };
        }
        ProcessError::SandboxSetup(detail) => format!("sandbox setup failed: {detail}"),
        ProcessError::SandboxCwd(detail) => format!("sandbox cwd rejected: {detail}"),
        ProcessError::SandboxSpawn(detail) => format!("sandbox rejected spawn: {detail}"),
        ProcessError::Spawn(detail) => format!("spawn failed: {detail}"),
    };
    HostlibError::Backend { builtin, message }
}
pub(crate) fn build_response(
outcome: SpawnOutcome,
handle_id: Option<String>,
policy_context: Option<BTreeMap<String, VmValue>>,
) -> VmValue {
let mut builder = ResponseBuilder::new()
.str("command_id", outcome.command_id.clone())
.str("status", outcome.status.as_str())
.int("duration_ms", outcome.duration.as_millis() as i64)
.int("exit_code", outcome.exit_code as i64)
.opt_str("signal", outcome.signal)
.bool("timed_out", outcome.timed_out)
.str("stdout", outcome.stdout)
.str("stderr", outcome.stderr)
.str("output_path", outcome.output_path.display().to_string())
.str("stdout_path", outcome.stdout_path.display().to_string())
.str("stderr_path", outcome.stderr_path.display().to_string())
.int("line_count", outcome.line_count as i64)
.int("byte_count", outcome.byte_count as i64)
.str("output_sha256", outcome.output_sha256)
.str("started_at", outcome.started_at)
.str("audit_id", format!("audit_{}", outcome.command_id));
builder = match outcome.ended_at {
Some(ended_at) => builder.str("ended_at", ended_at),
None => builder.nil("ended_at"),
};
builder = match outcome.pid {
Some(pid) => builder.int("pid", pid as i64),
None => builder.nil("pid"),
};
builder = match outcome.process_group_id {
Some(pgid) => builder.int("process_group_id", pgid as i64),
None => builder.nil("process_group_id"),
};
builder = match handle_id {
Some(handle_id) => builder.str("handle_id", handle_id),
None => builder.nil("handle_id"),
};
let mut sandbox = BTreeMap::new();
sandbox.insert(
"kind".to_string(),
VmValue::String(Rc::from(sandbox_kind())),
);
sandbox.insert("enforced".to_string(), VmValue::Bool(sandbox_enforced()));
builder = builder.dict("sandbox", sandbox);
if let Some(policy_context) = policy_context {
builder = builder.dict("policy_context", policy_context);
}
builder.build()
}
/// Builds the response for a command that has been started and is still
/// running.
///
/// Exit-related fields are `nil` or zero, and artifact paths are the planned
/// locations from `planned_artifact_paths`. When no process group id is
/// known, `pid` is reported in its place.
pub(crate) fn running_response(
    command_id: String,
    handle_id: String,
    pid: u32,
    process_group_id: Option<u32>,
    started_at: String,
    command_display: String,
) -> VmValue {
    let planned = planned_artifact_paths(&command_id);
    let sandbox = BTreeMap::from([
        ("kind".to_string(), VmValue::String(Rc::from(sandbox_kind()))),
        ("enforced".to_string(), VmValue::Bool(sandbox_enforced())),
    ]);
    let audit_id = format!("audit_{command_id}");
    let group_id = process_group_id.unwrap_or(pid);

    ResponseBuilder::new()
        .str("command_id", command_id)
        .str("status", CommandStatus::Running.as_str())
        .int("pid", i64::from(pid))
        .int("process_group_id", i64::from(group_id))
        .str("handle_id", handle_id)
        .str("started_at", started_at)
        .nil("ended_at")
        .int("duration_ms", 0)
        .nil("exit_code")
        .nil("signal")
        .bool("timed_out", false)
        .str("stdout", "")
        .str("stderr", "")
        .str("output_path", planned.output_path.display().to_string())
        .str("stdout_path", planned.stdout_path.display().to_string())
        .str("stderr_path", planned.stderr_path.display().to_string())
        .int("line_count", 0)
        .int("byte_count", 0)
        .str("output_sha256", "")
        .dict("sandbox", sandbox)
        .str("audit_id", audit_id)
        .str("command", command_display.clone())
        .str("command_or_op_descriptor", command_display)
        .build()
}
/// Produces an id of the form `cmd_<pid>_<unix-nanos>_<sequence>`.
///
/// The process id, the current time in nanoseconds (0 if the clock reads
/// before the Unix epoch) and the `COMMAND_COUNTER` sequence together keep
/// ids distinct.
pub(crate) fn next_command_id() -> String {
    let sequence = COMMAND_COUNTER.fetch_add(1, Ordering::SeqCst);
    let epoch_nanos = match SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_nanos(),
        Err(_) => 0,
    };
    let process_id = std::process::id();
    format!("cmd_{process_id}_{epoch_nanos}_{sequence}")
}
pub(crate) fn now_rfc3339() -> String {
let now: OffsetDateTime = SystemTime::now().into();
now.format(&Rfc3339)
.unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
}
/// Builds the inline `(stdout, stderr)` previews embedded in a response.
///
/// Each stream is capped at `capture.max_inline_bytes` via `lossy_prefix`,
/// and becomes an empty string when its capture flag is off.
///
/// With `merge_stderr`:
/// - stderr is appended to stdout, separated by a newline when stdout is
///   non-empty and does not already end in one;
/// - the combined text is governed by `capture.stdout`;
/// - the stderr preview is always empty.
pub(crate) fn inline_output(
    stdout: &[u8],
    stderr: &[u8],
    capture: CaptureConfig,
) -> (String, String) {
    let preview = |enabled: bool, bytes: &[u8]| -> String {
        if enabled {
            lossy_prefix(bytes, capture.max_inline_bytes)
        } else {
            String::new()
        }
    };

    if !capture.merge_stderr {
        return (preview(capture.stdout, stdout), preview(capture.stderr, stderr));
    }

    let needs_separator = !stdout.is_empty() && !stdout.ends_with(b"\n") && !stderr.is_empty();
    let mut combined = Vec::with_capacity(stdout.len() + stderr.len() + 1);
    combined.extend_from_slice(stdout);
    if needs_separator {
        combined.push(b'\n');
    }
    combined.extend_from_slice(stderr);
    (preview(capture.stdout, combined.as_slice()), String::new())
}
/// Decodes at most `max_inline_bytes` bytes of `bytes` as UTF-8 text.
///
/// Invalid byte sequences inside the prefix are replaced with U+FFFD, as
/// `String::from_utf8_lossy` does, so the rest of the preview is preserved.
/// An incomplete sequence at the very end is dropped. That case arises when
/// the cap lands inside a multi-byte character, and dropping it means the
/// preview never ends in a replacement character caused by the cut.
fn lossy_prefix(bytes: &[u8], max_inline_bytes: usize) -> String {
    let mut rest = &bytes[..bytes.len().min(max_inline_bytes)];
    let mut text = String::with_capacity(rest.len());
    loop {
        match std::str::from_utf8(rest) {
            Ok(valid) => {
                text.push_str(valid);
                return text;
            }
            Err(error) => {
                let (valid, after) = rest.split_at(error.valid_up_to());
                // `valid` is UTF-8 by construction, so this borrows without copying.
                text.push_str(&String::from_utf8_lossy(valid));
                match error.error_len() {
                    // An invalid sequence in the middle: replace it and keep going.
                    Some(invalid_len) => {
                        text.push(char::REPLACEMENT_CHARACTER);
                        rest = &after[invalid_len..];
                    }
                    // Input ended mid-character: drop the partial bytes.
                    None => return text,
                }
            }
        }
    }
}
/// Name of the sandbox mechanism associated with the target OS, reported
/// under `sandbox.kind` in responses (`"none"` for other platforms).
fn sandbox_kind() -> &'static str {
    match std::env::consts::OS {
        "linux" => "landlock",
        "macos" => "sandbox-exec",
        "windows" => "appcontainer",
        _ => "none",
    }
}
/// Reports whether `current_execution_policy()` returns a policy; surfaced
/// as `sandbox.enforced` in responses.
fn sandbox_enforced() -> bool {
    let active_policy = harn_vm::orchestration::current_execution_policy();
    active_policy.is_some()
}
/// Splits an exit status into `(exit_code, signal_name)`.
///
/// A reported exit code wins. Otherwise the code is `-1` and the signal, if
/// any, is rendered with `format_signal`.
fn decode_status(status: process_handle::ExitStatus) -> (i32, Option<String>) {
    match (status.code, status.signal) {
        (Some(code), _) => (code, None),
        (None, Some(sig)) => (-1, Some(format_signal(sig))),
        (None, None) => (-1, None),
    }
}
/// Renders a POSIX signal number as its conventional `SIG*` name.
///
/// Only numbers that are identical on Linux and macOS/BSD are named. Anything
/// else falls back to `SIG<number>` (e.g. `"SIG10"`), because signals such as
/// `SIGBUS` and `SIGUSR1` use different numbers on different platforms.
fn format_signal(sig: i32) -> String {
    let name = match sig {
        1 => "SIGHUP",
        2 => "SIGINT",
        3 => "SIGQUIT",
        4 => "SIGILL",
        5 => "SIGTRAP",
        6 => "SIGABRT",
        8 => "SIGFPE",
        9 => "SIGKILL",
        11 => "SIGSEGV",
        13 => "SIGPIPE",
        14 => "SIGALRM",
        15 => "SIGTERM",
        24 => "SIGXCPU",
        25 => "SIGXFSZ",
        other => return format!("SIG{other}"),
    };
    name.to_string()
}
/// Validates an optional working-directory argument for `builtin`.
///
/// `None` and the empty string both yield `Ok(None)`, meaning no directory
/// override. Otherwise the path must name an existing directory, and its
/// canonical form is returned.
///
/// # Errors
///
/// `HostlibError::InvalidParameter` (param `cwd`) when the path is not an
/// existing directory or cannot be canonicalized.
pub(crate) fn parse_cwd(
    builtin: &'static str,
    raw: Option<&str>,
) -> Result<Option<PathBuf>, HostlibError> {
    let raw = match raw {
        Some(value) if !value.is_empty() => value,
        _ => return Ok(None),
    };
    let invalid = |message: String| HostlibError::InvalidParameter {
        builtin,
        param: "cwd",
        message,
    };
    let candidate = Path::new(raw);
    if !candidate.is_dir() {
        return Err(invalid(format!("not an existing directory: {raw}")));
    }
    candidate
        .canonicalize()
        .map(Some)
        .map_err(|error| invalid(format!("failed to canonicalize cwd `{raw}`: {error}")))
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A cap that lands inside a multi-byte code point must not leave a
    /// partial character in the inline preview.
    #[test]
    fn inline_output_does_not_split_utf8_codepoint() {
        let cap = b"alpha \xF0\x9F".len();
        let config = CaptureConfig {
            max_inline_bytes: cap,
            ..CaptureConfig::default()
        };
        let (inline_stdout, inline_stderr) =
            inline_output("alpha 🚀 beta".as_bytes(), &[], config);
        assert_eq!(inline_stdout, "alpha ");
        assert!(inline_stderr.is_empty());
    }

    /// `parse_cwd` resolves `..` components to the canonical directory.
    #[test]
    fn parse_cwd_returns_canonical_directory() {
        let root = tempfile::tempdir().unwrap();
        let child = root.path().join("a");
        std::fs::create_dir_all(&child).unwrap();
        let dotted = child.join("..");
        let resolved = parse_cwd("test", dotted.to_str()).unwrap().unwrap();
        assert_eq!(resolved, root.path().canonicalize().unwrap());
    }
}