use std::collections::HashMap;
use std::path::PathBuf;
use std::process::Stdio;
use process_wrap::tokio::{KillOnDrop, TokioChildWrapper, TokioCommandWrap};
use tokio::io::{AsyncRead, BufReader};
use tokio::sync::oneshot;
use tracing::{debug, info, warn};
use crate::error::{OlError, OL_4301_PROCESS_SPAWN_FAILED, OL_4305_ORPHAN_RECONCILE_FAILED};
use crate::runtime::supervisor::spec::ProcessSpec;
/// Maximum number of bytes kept for one captured output line. `read_capped_line`
/// discards any bytes past this limit and appends a truncation marker.
const MAX_LINE_BYTES: usize = 16 * 1024;
/// Lower-case fragments that mark an environment variable name as secret-like.
/// `is_blocked_env` ASCII-lower-cases the name and blocks it when it contains
/// any of these.
const SECRET_NAME_SUBSTRINGS: &[&str] = &["secret", "token", "password", "apikey", "api_key"];
/// Returns the `runtime` directory under the provider directory, creating it
/// (and any missing parents) first.
///
/// # Errors
///
/// Returns an `OL_4305_ORPHAN_RECONCILE_FAILED` error when the directory
/// cannot be created.
pub fn runtime_dir() -> Result<PathBuf, OlError> {
    let path = crate::config::provider_dir().join("runtime");
    match std::fs::create_dir_all(&path) {
        Ok(()) => Ok(path),
        Err(e) => Err(OlError::new(
            OL_4305_ORPHAN_RECONCILE_FAILED,
            format!("create {}: {e}", path.display()),
        )),
    }
}
/// Returns the path `<runtime dir>/<binding_id>.pid`.
///
/// The runtime directory is created as a side effect of `runtime_dir`; the
/// PID file itself is not touched.
///
/// # Errors
///
/// Propagates the error from `runtime_dir`.
pub fn pid_file_for(binding_id: &str) -> Result<PathBuf, OlError> {
    runtime_dir().map(|dir| dir.join(format!("{binding_id}.pid")))
}
/// Returns the `logs` directory under the provider directory, creating it
/// (and any missing parents) first.
///
/// # Errors
///
/// Returns an `OL_4305_ORPHAN_RECONCILE_FAILED` error when the directory
/// cannot be created.
pub fn logs_dir() -> Result<PathBuf, OlError> {
    let logs = crate::config::provider_dir().join("logs");
    if let Err(e) = std::fs::create_dir_all(&logs) {
        let message = format!("create {}: {e}", logs.display());
        return Err(OlError::new(OL_4305_ORPHAN_RECONCILE_FAILED, message));
    }
    Ok(logs)
}
/// Returns the path `<logs dir>/tool-<binding_id>.jsonl`.
///
/// The logs directory is created as a side effect of `logs_dir`; the log file
/// itself is not touched.
///
/// # Errors
///
/// Propagates the error from `logs_dir`.
pub fn tool_log_path(binding_id: &str) -> Result<PathBuf, OlError> {
    let dir = logs_dir()?;
    let file_name = format!("tool-{binding_id}.jsonl");
    Ok(dir.join(file_name))
}
/// Decides whether an inherited environment variable must be withheld from a
/// spawned child.
///
/// A name is blocked when it starts with the (case-sensitive) `OPENLATCH_`
/// prefix, or when its ASCII-lower-cased form contains any entry of
/// `SECRET_NAME_SUBSTRINGS`.
fn is_blocked_env(name: &str) -> bool {
    let has_reserved_prefix = name.starts_with("OPENLATCH_");
    has_reserved_prefix || {
        // Substring matching is case-insensitive: fold the name once, then scan.
        let folded = name.to_ascii_lowercase();
        SECRET_NAME_SUBSTRINGS
            .iter()
            .any(|needle| folded.contains(needle))
    }
}
/// Builds the environment handed to a spawned child.
///
/// Starts from the current process environment minus every variable rejected
/// by `is_blocked_env`, then layers `spec_env` on top. Entries from `spec_env`
/// win over inherited ones and are not filtered.
///
/// Inherited variables whose name or value is not valid Unicode are skipped:
/// they cannot be represented in the returned `HashMap<String, String>`, and
/// `std::env::vars()` would panic on them.
fn child_env(spec_env: &HashMap<String, String>) -> HashMap<String, String> {
    let mut out: HashMap<String, String> = std::env::vars_os()
        // `into_string` fails on non-Unicode data; drop the entry instead of panicking.
        .filter_map(|(k, v)| Some((k.into_string().ok()?, v.into_string().ok()?)))
        .filter(|(k, _)| !is_blocked_env(k))
        .collect();
    out.extend(spec_env.iter().map(|(k, v)| (k.clone(), v.clone())));
    out
}
/// Handle to a tool process started by `spawn_process`.
pub struct ManagedChild {
/// Binding identifier copied from the `ProcessSpec` used to spawn the child.
pub binding_id: String,
/// Tool slug copied from the `ProcessSpec` used to spawn the child.
pub tool_slug: String,
/// Process id reported by the child wrapper right after spawn, if any.
pub pid: Option<u32>,
/// Path of the per-tool JSONL log that the stdio reader tasks append to.
pub log_path: PathBuf,
// Wrapped child handle; `wait` and `shutdown` operate through it.
wrap: Box<dyn TokioChildWrapper>,
// Path of the PID file written at spawn; `shutdown` removes it once the
// child has been waited on successfully.
pid_file: PathBuf,
// Signalled by the stdout reader task when it finishes.
stdout_done: Option<oneshot::Receiver<()>>,
// Signalled by the stderr reader task when it finishes.
stderr_done: Option<oneshot::Receiver<()>>,
}
/// Debug output lists only the public fields; the child wrapper, PID file
/// path and reader-completion channels are elided as non-exhaustive.
impl std::fmt::Debug for ManagedChild {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            binding_id,
            tool_slug,
            pid,
            log_path,
            ..
        } = self;
        let mut out = f.debug_struct("ManagedChild");
        out.field("binding_id", binding_id);
        out.field("tool_slug", tool_slug);
        out.field("pid", pid);
        out.field("log_path", log_path);
        out.finish_non_exhaustive()
    }
}
impl ManagedChild {
    /// Waits for the child to exit.
    ///
    /// Returns `Ok(true)` only when the exit status carries code `0`; any
    /// other code, or no code at all, yields `Ok(false)`.
    ///
    /// # Errors
    ///
    /// Returns an `OL_4301_PROCESS_SPAWN_FAILED` error when waiting on the
    /// child fails.
    pub async fn wait(&mut self) -> Result<bool, OlError> {
        let outcome = Box::into_pin(self.wrap.wait()).await;
        match outcome {
            Ok(status) => Ok(matches!(status.code(), Some(0))),
            Err(e) => Err(OlError::new(
                OL_4301_PROCESS_SPAWN_FAILED,
                format!("binding `{}`: wait on child failed: {e}", self.binding_id),
            )),
        }
    }

    /// Kills the child and tears down its bookkeeping, consuming the handle.
    ///
    /// Steps, in order:
    /// 1. request a kill through the wrapper (failure is only logged);
    /// 2. wait up to `kill_timeout` for the child to exit;
    /// 3. give each stdio reader task up to one second to signal completion;
    /// 4. remove the PID file, but only if the wait in step 2 succeeded.
    ///
    /// Nothing is returned; every failure is reported through `tracing`.
    pub async fn shutdown(mut self, kill_timeout: std::time::Duration) {
        if let Err(e) = self.wrap.start_kill() {
            warn!(binding_id = %self.binding_id, error = %e, "start_kill failed");
        }

        let exit = tokio::time::timeout(kill_timeout, Box::into_pin(self.wrap.wait())).await;
        let reaped = match exit {
            Err(_) => {
                warn!(
                    binding_id = %self.binding_id,
                    kill_timeout_ms = kill_timeout.as_millis() as u64,
                    "grace period elapsed; KillOnDrop will reap on Drop"
                );
                false
            }
            Ok(Err(e)) => {
                warn!(binding_id = %self.binding_id, error = %e, "wait after start_kill failed");
                false
            }
            Ok(Ok(_status)) => {
                debug!(binding_id = %self.binding_id, "child exited within grace period");
                true
            }
        };

        // Let the readers flush what is left in the pipes: stdout first, then stderr.
        let drain_limit = std::time::Duration::from_secs(1);
        for slot in [&mut self.stdout_done, &mut self.stderr_done] {
            if let Some(finished) = slot.take() {
                let _ = tokio::time::timeout(drain_limit, finished).await;
            }
        }

        // When the child was not confirmed dead the PID file is left in place.
        if reaped {
            let _ = std::fs::remove_file(&self.pid_file);
        }
    }
}
/// Spawns the tool process described by `spec` and starts capturing its output.
///
/// The child runs `spec.command` (program followed by arguments) in `spec.cwd`
/// with stdin closed, stdout and stderr piped, and an environment consisting
/// solely of what `child_env(&spec.env)` returns. The command is wrapped with
/// `KillOnDrop`, plus `ProcessSession` on Unix or `JobObject` on Windows.
///
/// After a successful spawn the PID is written to the binding's PID file
/// (best effort; a write failure is only logged), and one reader task per
/// pipe is started to forward lines to `tracing` and the per-tool JSONL log.
///
/// NOTE(review): the reader tasks are started with `tokio::spawn`, so this
/// presumably has to be called from within a Tokio runtime.
///
/// # Errors
///
/// * `OL_4301_PROCESS_SPAWN_FAILED` when `spec.command` is empty, when the
///   spawn itself fails, or when the child exposes no stdout or stderr pipe.
/// * Any error from `tool_log_path` or `pid_file_for`.
pub fn spawn_process(spec: &ProcessSpec) -> Result<ManagedChild, OlError> {
    // Validate up front: indexing an empty command would panic, and there is
    // no point in creating directories for a process that cannot be started.
    let (program, args) = spec.command.split_first().ok_or_else(|| {
        OlError::new(
            OL_4301_PROCESS_SPAWN_FAILED,
            format!(
                "binding `{}`: command is empty; nothing to spawn",
                spec.binding_id
            ),
        )
        .with_suggestion("Set `command` to the program name followed by its arguments.")
    })?;

    let log_path = tool_log_path(&spec.binding_id)?;
    let pid_file = pid_file_for(&spec.binding_id)?;
    let env = child_env(&spec.env);

    let mut cmd = tokio::process::Command::new(program);
    cmd.args(args)
        .current_dir(&spec.cwd)
        .stdin(Stdio::null())
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        // Start from an empty environment so only the filtered set is visible.
        .env_clear()
        .envs(env);

    let mut wrap = TokioCommandWrap::from(cmd);
    wrap.wrap(KillOnDrop);
    #[cfg(unix)]
    {
        wrap.wrap(process_wrap::tokio::ProcessSession);
    }
    #[cfg(windows)]
    {
        wrap.wrap(process_wrap::tokio::JobObject);
    }

    let mut child = wrap.spawn().map_err(|e| {
        OlError::new(
            OL_4301_PROCESS_SPAWN_FAILED,
            format!(
                "binding `{}`: spawn `{}` failed: {e}",
                spec.binding_id, program
            ),
        )
        .with_suggestion(
            "Check that the program exists, is executable, and that `cwd` resolves correctly.",
        )
    })?;

    // Take both pipes before recording the PID: if either is missing we bail
    // out (dropping `child`), and no stale PID file is left behind.
    let stdout = child.stdout().take().ok_or_else(|| {
        OlError::new(
            OL_4301_PROCESS_SPAWN_FAILED,
            format!(
                "binding `{}`: child has no stdout pipe after spawn",
                spec.binding_id
            ),
        )
    })?;
    let stderr = child.stderr().take().ok_or_else(|| {
        OlError::new(
            OL_4301_PROCESS_SPAWN_FAILED,
            format!(
                "binding `{}`: child has no stderr pipe after spawn",
                spec.binding_id
            ),
        )
    })?;

    let pid = child.id();
    if let Some(pid) = pid {
        if let Err(e) = std::fs::write(&pid_file, pid.to_string()) {
            warn!(
                binding_id = %spec.binding_id,
                error = %e,
                pid_file = %pid_file.display(),
                "PID file write failed (continuing anyway)"
            );
        }
    }

    let (stdout_done_tx, stdout_done_rx) = oneshot::channel();
    let (stderr_done_tx, stderr_done_rx) = oneshot::channel();
    spawn_stdio_reader(
        spec.tool_slug.clone(),
        spec.binding_id.clone(),
        StdioStream::Stdout,
        stdout,
        log_path.clone(),
        stdout_done_tx,
    );
    spawn_stdio_reader(
        spec.tool_slug.clone(),
        spec.binding_id.clone(),
        StdioStream::Stderr,
        stderr,
        log_path.clone(),
        stderr_done_tx,
    );

    info!(
        binding_id = %spec.binding_id,
        tool = %spec.tool_slug,
        pid = ?pid,
        cwd = %spec.cwd.display(),
        "spawned managed tool"
    );

    Ok(ManagedChild {
        binding_id: spec.binding_id.clone(),
        tool_slug: spec.tool_slug.clone(),
        pid,
        log_path,
        wrap: child,
        pid_file,
        stdout_done: Some(stdout_done_rx),
        stderr_done: Some(stderr_done_rx),
    })
}
/// Identifies which of the child's output pipes a captured line came from.
#[derive(Clone, Copy)]
enum StdioStream {
    Stdout,
    Stderr,
}

impl StdioStream {
    /// Lower-case label for the stream, used in log records.
    fn as_str(self) -> &'static str {
        use StdioStream::{Stderr, Stdout};
        match self {
            Stderr => "stderr",
            Stdout => "stdout",
        }
    }
}
/// Starts a Tokio task that drains one of the child's output pipes line by line.
///
/// Each line read through `read_capped_line` is passed to `emit_line`, which
/// forwards it to `tracing` and, when the log file at `log_path` could be
/// opened for appending, to that file. If the file cannot be opened a warning
/// is logged and the task continues with `tracing` only.
///
/// The task ends on end-of-stream or on the first read error, and then
/// signals `done`. The task handle is discarded; `done` is the only way to
/// observe completion.
fn spawn_stdio_reader<R>(
    tool_slug: String,
    binding_id: String,
    stream: StdioStream,
    reader: R,
    log_path: PathBuf,
    done: oneshot::Sender<()>,
) where
    R: AsyncRead + Unpin + Send + 'static,
{
    tokio::spawn(async move {
        let target = format!("tool.{tool_slug}");
        let mut lines = BufReader::new(reader);

        let opened = std::fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(&log_path);
        if let Err(e) = &opened {
            warn!(
                binding_id = %binding_id,
                path = %log_path.display(),
                error = %e,
                "could not open per-tool JSONL log; continuing with tracing only"
            );
        }
        let mut sink = opened.ok();

        loop {
            let line = match read_capped_line(&mut lines).await {
                Ok(Some(line)) => line,
                Ok(None) => break,
                Err(e) => {
                    warn!(
                        binding_id = %binding_id,
                        stream = stream.as_str(),
                        error = %e,
                        "reader I/O error; closing stream"
                    );
                    break;
                }
            };
            emit_line(&binding_id, &target, stream, &line, sink.as_mut());
        }

        // The receiver may already be gone; that is fine.
        let _ = done.send(());
    });
}
/// Reads one `\n`-terminated line from `reader`, keeping at most
/// `MAX_LINE_BYTES` bytes of it.
///
/// Bytes beyond the cap are consumed up to the newline but discarded, and the
/// returned text gets a ` …<truncated>` suffix. The suffix is added only when
/// line content was actually discarded: a line of exactly `MAX_LINE_BYTES`
/// bytes is returned unmarked. A trailing `\r` is stripped, and invalid UTF-8
/// is replaced lossily.
///
/// Returns `Ok(None)` at end of stream when no bytes were read, and
/// `Ok(Some(line))` otherwise, including a final line that lacks a newline.
///
/// # Errors
///
/// Propagates any I/O error from the underlying reader.
async fn read_capped_line<R: AsyncRead + Unpin>(
    reader: &mut BufReader<R>,
) -> std::io::Result<Option<String>> {
    let mut bytes: Vec<u8> = Vec::with_capacity(256);
    let mut byte = [0u8; 1];
    // Number of bytes discarded past the cap, and the most recent one.
    let mut dropped: usize = 0;
    let mut last_dropped: u8 = 0;

    loop {
        if tokio::io::AsyncReadExt::read(reader, &mut byte).await? == 0 {
            // End of stream: report `None` only if there is no partial line.
            if bytes.is_empty() {
                return Ok(None);
            }
            break;
        }
        match byte[0] {
            b'\n' => break,
            b if bytes.len() < MAX_LINE_BYTES => bytes.push(b),
            b => {
                dropped = dropped.saturating_add(1);
                last_dropped = b;
            }
        }
    }

    // A single discarded `\r` is just the CR of a CRLF terminator that fell
    // past the cap, not lost content.
    let truncated = dropped > 1 || (dropped == 1 && last_dropped != b'\r');
    if bytes.last() == Some(&b'\r') {
        bytes.pop();
    }
    let mut s = String::from_utf8_lossy(&bytes).into_owned();
    if truncated {
        s.push_str(" …<truncated>");
    }
    Ok(Some(s))
}
/// Publishes one captured output line.
///
/// The line is emitted through `tracing` under the target
/// `openlatch_provider::tool`: at `info` level for stdout, `warn` for stderr.
/// When `file` is provided, a JSON record with `timestamp`, `binding_id`,
/// `stream` and `line` is appended to it as one line. A failed append is
/// logged and otherwise ignored.
fn emit_line(
    binding_id: &str,
    target: &str,
    stream: StdioStream,
    line: &str,
    file: Option<&mut std::fs::File>,
) {
    match stream {
        StdioStream::Stdout => {
            tracing::info!(target: "openlatch_provider::tool", tool = %target, stream = "stdout", "{line}");
        }
        StdioStream::Stderr => {
            tracing::warn!(target: "openlatch_provider::tool", tool = %target, stream = "stderr", "{line}");
        }
    }
    if let Some(f) = file {
        use std::io::Write;
        // Serialize the whole record, newline included, before touching the
        // file. Formatting straight into an unbuffered `File` emits the record
        // in many small writes; the stdout and stderr reader tasks append to
        // the same log and could then interleave fragments and corrupt it.
        let mut rec = serde_json::json!({
            "timestamp": chrono::Utc::now().to_rfc3339(),
            "binding_id": binding_id,
            "stream": stream.as_str(),
            "line": line,
        })
        .to_string();
        rec.push('\n');
        if let Err(e) = f.write_all(rec.as_bytes()) {
            warn!(binding_id, error = %e, "JSONL log append failed");
        }
    }
}
/// Cleans up after processes that a previous run may have left behind.
///
/// For each id in `known_binding_ids` the matching `<id>.pid` file in the
/// runtime directory is inspected:
/// * missing or unreadable file: skipped;
/// * content that does not parse as a `u32`: file removed with a warning;
/// * otherwise `kill_pid_best_effort` is called on the PID, the outcome is
///   logged, and the file is removed regardless of that outcome.
///
/// NOTE(review): nothing here checks that the PID still belongs to the
/// process that wrote the file; confirm PID reuse is acceptable.
///
/// # Errors
///
/// Only fails when `runtime_dir` fails; per-binding problems are logged.
pub fn reconcile_orphans(known_binding_ids: &[&str]) -> Result<(), OlError> {
    let runtime = runtime_dir()?;
    for binding_id in known_binding_ids {
        let pid_file = runtime.join(format!("{binding_id}.pid"));
        let content = match std::fs::read_to_string(&pid_file) {
            Ok(text) => text,
            Err(_) => continue,
        };
        let trimmed = content.trim();

        let Ok(pid) = trimmed.parse::<u32>() else {
            warn!(
                binding_id = %binding_id,
                pid_file = %pid_file.display(),
                content = trimmed,
                "unreadable PID file; removing"
            );
            let _ = std::fs::remove_file(&pid_file);
            continue;
        };

        match kill_pid_best_effort(pid) {
            Ok(()) => {
                info!(binding_id = %binding_id, pid = pid, "reaped orphan from prior daemon");
            }
            Err(e) => {
                warn!(
                    binding_id = %binding_id,
                    pid = pid,
                    error = %e,
                    "orphan kill failed (process may already be gone)"
                );
            }
        }
        let _ = std::fs::remove_file(&pid_file);
    }
    Ok(())
}
/// Sends `SIGTERM` to the process with the given PID (Unix).
///
/// # Errors
///
/// * `InvalidInput` when `pid` is `0` or does not fit a positive `pid_t`.
///   `kill(2)` gives such values special meaning (`0` targets the caller's
///   own process group, negative values target a group or every process the
///   caller may signal), so they are never passed on.
/// * The OS error when `kill` itself fails.
#[cfg(unix)]
fn kill_pid_best_effort(pid: u32) -> std::io::Result<()> {
    let target = match libc::pid_t::try_from(pid) {
        Ok(p) if p > 0 => p,
        _ => {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                format!("refusing to signal invalid pid {pid}"),
            ));
        }
    };
    // SAFETY: `kill` takes two plain integers and touches no memory we own.
    let res = unsafe { libc::kill(target, libc::SIGTERM) };
    if res == 0 {
        Ok(())
    } else {
        Err(std::io::Error::last_os_error())
    }
}
/// Terminates the process with the given PID with exit code `1` (Windows).
///
/// # Errors
///
/// Returns the OS error when the process cannot be opened with
/// `PROCESS_TERMINATE` access or when `TerminateProcess` fails.
#[cfg(windows)]
fn kill_pid_best_effort(pid: u32) -> std::io::Result<()> {
    use winapi::shared::minwindef::FALSE;
    use winapi::um::handleapi::CloseHandle;
    use winapi::um::processthreadsapi::{OpenProcess, TerminateProcess};
    use winapi::um::winnt::PROCESS_TERMINATE;

    // SAFETY: `OpenProcess` only takes integer arguments.
    let handle = unsafe { OpenProcess(PROCESS_TERMINATE, FALSE, pid) };
    if handle.is_null() {
        return Err(std::io::Error::last_os_error());
    }

    // SAFETY: `handle` is non-null and was just returned by `OpenProcess`.
    let ok = unsafe { TerminateProcess(handle, 1) };
    // Capture the error before `CloseHandle`, which may overwrite the
    // thread's last-error value.
    let outcome = if ok == 0 {
        Err(std::io::Error::last_os_error())
    } else {
        Ok(())
    };

    // SAFETY: `handle` is still open and is closed exactly once, here.
    let _ = unsafe { CloseHandle(handle) };
    outcome
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Anything under the `OPENLATCH_` prefix is withheld from children.
    #[test]
    fn openlatch_prefix_is_blocked() {
        assert!(is_blocked_env("OPENLATCH_TOKEN"));
        assert!(is_blocked_env("OPENLATCH_PROVIDER_POSTHOG_KEY"));
        assert!(is_blocked_env("OPENLATCH_anything"));
    }

    /// Secret-looking fragments are matched regardless of case.
    #[test]
    fn substring_match_blocks_secret_shaped_names() {
        assert!(is_blocked_env("MY_API_TOKEN"));
        assert!(is_blocked_env("AWS_SECRET_ACCESS_KEY"));
        assert!(is_blocked_env("DB_PASSWORD"));
        assert!(is_blocked_env("apikey"));
    }

    /// Ordinary variables are inherited.
    #[test]
    fn benign_env_passes_through() {
        assert!(!is_blocked_env("PATH"));
        assert!(!is_blocked_env("HOME"));
        assert!(!is_blocked_env("AWS_PROFILE"));
        assert!(!is_blocked_env("KUBECONFIG"));
        assert!(!is_blocked_env("LANG"));
    }

    /// Blocked parent variables are dropped, spec-only variables are added,
    /// and a spec variable replaces an inherited one with the same name.
    #[test]
    fn user_declared_env_overrides_inherited() {
        // Matches neither the reserved prefix nor any secret fragment, so it
        // is inherited and can be overridden by the spec.
        const INHERITED: &str = "OL_SPAWN_TEST_INHERITED";

        std::env::set_var("OPENLATCH_TEST_FAKE", "from-parent");
        std::env::set_var(INHERITED, "from-parent");

        let mut user = HashMap::new();
        user.insert("PATH_OVERRIDE".into(), "child-only".into());
        user.insert(INHERITED.into(), "from-spec".into());
        let merged = child_env(&user);

        // Clean up before asserting so a failure does not leak variables
        // into the rest of the test process.
        std::env::remove_var("OPENLATCH_TEST_FAKE");
        std::env::remove_var(INHERITED);

        assert!(!merged.contains_key("OPENLATCH_TEST_FAKE"));
        assert_eq!(merged.get("PATH_OVERRIDE"), Some(&"child-only".to_string()));
        assert_eq!(merged.get(INHERITED), Some(&"from-spec".to_string()));
    }
}