use anyhow::{Context, Result, bail};
use async_trait::async_trait;
use parking_lot::RwLock;
use std::collections::HashMap;
use std::path::Path;
use std::process::Stdio;
use std::sync::Arc;
use std::time::Duration;
use tokio::process::{Child, Command};
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use uuid::Uuid;
use super::log_forwarder::{StreamKind, spawn_stream_forwarder};
use super::{BackendKind, InstanceHandle, ModuleRuntimeBackend, OopModuleConfig};
/// Grace period given to each process during backend-wide shutdown before it is force-killed.
const SHUTDOWN_GRACE_PERIOD: Duration = Duration::from_secs(5);
/// Grace period given to a single process stopped via `stop_instance` before it is force-killed.
const INSTANCE_STOP_GRACE_PERIOD: Duration = Duration::from_secs(2);
/// Upper bound on how long `wait_forwarder` waits for one log forwarder task to finish.
const FORWARDER_DRAIN_TIMEOUT: Duration = Duration::from_millis(100);
/// Sends `SIGTERM` to `child` (Unix only).
///
/// Returns `true` only when the signal was delivered. Returns `false` when the child
/// reports no PID, when the PID does not fit into an `i32` (a warning is logged), or
/// when `kill` itself fails.
#[cfg(unix)]
fn send_terminate_signal(child: &Child) -> bool {
    use nix::sys::signal::{Signal, kill};
    use nix::unistd::Pid;

    // Without a PID there is nothing to signal.
    let Some(raw_pid) = child.id() else {
        return false;
    };

    match i32::try_from(raw_pid) {
        // `kill` errors are reported to the caller simply as "not sent".
        Ok(signed_pid) => kill(Pid::from_raw(signed_pid), Signal::SIGTERM).is_ok(),
        Err(_) => {
            tracing::warn!(
                pid = raw_pid,
                "Failed to convert PID to i32, cannot send SIGTERM (PID exceeds i32::MAX: {})",
                i32::MAX
            );
            false
        }
    }
}
/// Windows variant of the terminate-signal helper.
///
/// No signal is sent here; the function always returns `false`, so callers fall back to
/// their force-kill path.
#[cfg(windows)]
fn send_terminate_signal(_child: &Child) -> bool {
false
}
/// Stops `child`, preferring a graceful exit over a forced kill.
///
/// The sequence is:
/// 1. try to deliver a termination signal via `send_terminate_signal`;
/// 2. wait up to `grace` for the process to exit;
/// 3. if the grace period elapses, force-kill the process with `Child::kill`.
///
/// The function is best-effort: every failure is logged and swallowed, nothing is returned.
///
/// * `child` - the process to stop.
/// * `handle` - only used for the `module` / `instance_id` log fields.
/// * `grace` - how long to wait for a voluntary exit before force-killing.
/// * `context` - prefix of every log message, identifying the calling code path.
async fn stop_child_with_grace(
    child: &mut Child,
    handle: &InstanceHandle,
    grace: Duration,
    context: &str,
) {
    let pid = child.id();
    let sent = send_terminate_signal(child);

    // Log what actually happened: only claim that a signal was sent when it really was,
    // so the log stays trustworthy on platforms or in states where no signal is possible.
    if sent {
        tracing::debug!(
            module = %handle.module,
            instance_id = %handle.instance_id,
            pid = ?pid,
            graceful = true,
            "{context}: sent termination signal"
        );
    } else if pid.is_some() {
        tracing::debug!(
            module = %handle.module,
            instance_id = %handle.instance_id,
            pid = ?pid,
            graceful = false,
            "{context}: graceful termination not available, will force kill after grace period"
        );
    } else {
        // The child reports no PID, so there is no process left to signal; the `wait`
        // below is still performed so the exit status gets logged.
        tracing::debug!(
            module = %handle.module,
            instance_id = %handle.instance_id,
            graceful = false,
            "{context}: process has no PID, no termination signal sent"
        );
    }

    match tokio::time::timeout(grace, child.wait()).await {
        Ok(Ok(status)) => {
            tracing::debug!(
                module = %handle.module,
                instance_id = %handle.instance_id,
                status = ?status,
                "{context}: process exited gracefully"
            );
        }
        Ok(Err(e)) => {
            tracing::warn!(
                module = %handle.module,
                instance_id = %handle.instance_id,
                error = %e,
                "{context}: failed to wait for process"
            );
        }
        Err(_) => {
            // The timeout elapsed while the process was still running.
            tracing::debug!(
                module = %handle.module,
                instance_id = %handle.instance_id,
                "{context}: grace period expired, force killing"
            );
            if let Err(e) = child.kill().await {
                tracing::warn!(
                    module = %handle.module,
                    instance_id = %handle.instance_id,
                    error = %e,
                    "{context}: failed to force kill"
                );
            }
        }
    }
}
async fn wait_forwarder(
handle: Option<JoinHandle<()>>,
module: &str,
instance_id: uuid::Uuid,
stream: &str,
) {
let Some(h) = handle else { return };
match tokio::time::timeout(FORWARDER_DRAIN_TIMEOUT, h).await {
Ok(Ok(())) => {}
Ok(Err(e)) => {
if e.is_panic() {
tracing::warn!(module, %instance_id, stream, error = %e, "log forwarder task panicked");
} else {
tracing::warn!(module, %instance_id, stream, error = %e, "log forwarder task cancelled");
}
}
Err(_) => {
tracing::warn!(
module,
%instance_id,
stream,
timeout_ms = FORWARDER_DRAIN_TIMEOUT.as_millis(),
"log forwarder did not finish within drain timeout",
);
}
}
}
/// State tracked for one spawned child process.
struct LocalInstance {
/// Handle describing the instance; clones of it are handed out to callers.
handle: InstanceHandle,
/// The spawned child process.
child: Child,
/// Task forwarding the child's stdout; `None` if no stdout pipe could be taken at spawn time.
stdout_forwarder: Option<JoinHandle<()>>,
/// Task forwarding the child's stderr; `None` if no stderr pipe could be taken at spawn time.
stderr_forwarder: Option<JoinHandle<()>>,
}
/// Registry of tracked instances, keyed by instance id.
type InstanceMap = HashMap<Uuid, LocalInstance>;
/// Module runtime backend that runs out-of-process modules as local child processes.
pub struct LocalProcessBackend {
/// Instances currently tracked by this backend, shared with the shutdown task.
instances: Arc<RwLock<InstanceMap>>,
/// Token whose cancellation triggers stopping all tracked processes; clones of it are
/// also passed to the log forwarders.
cancel: CancellationToken,
}
impl LocalProcessBackend {
#[must_use]
pub fn new(cancel: CancellationToken) -> Self {
let backend = Self {
instances: Arc::new(RwLock::new(HashMap::new())),
cancel: cancel.clone(),
};
let instances = Arc::clone(&backend.instances);
tokio::spawn(async move {
cancel.cancelled().await;
tracing::info!("LocalProcessBackend: shutdown signal received, stopping all processes");
Self::shutdown_all_instances(instances).await;
});
backend
}
async fn shutdown_all_instances(instances: Arc<RwLock<InstanceMap>>) {
let mut all_instances: Vec<LocalInstance> = {
let mut guard = instances.write();
guard.drain().map(|(_, inst)| inst).collect()
};
if all_instances.is_empty() {
return;
}
tracing::info!(count = all_instances.len(), "Stopping OoP module processes");
for inst in &mut all_instances {
stop_child_with_grace(
&mut inst.child,
&inst.handle,
SHUTDOWN_GRACE_PERIOD,
"shutdown",
)
.await;
}
for inst in all_instances {
wait_forwarder(
inst.stdout_forwarder,
&inst.handle.module,
inst.handle.instance_id,
"stdout",
)
.await;
wait_forwarder(
inst.stderr_forwarder,
&inst.handle.module,
inst.handle.instance_id,
"stderr",
)
.await;
}
tracing::info!("All OoP module processes stopped");
}
}
#[async_trait]
impl ModuleRuntimeBackend for LocalProcessBackend {
async fn spawn_instance(&self, cfg: &OopModuleConfig) -> Result<InstanceHandle> {
if cfg.backend != BackendKind::LocalProcess {
bail!(
"LocalProcessBackend can only spawn LocalProcess instances, got {:?}",
cfg.backend
);
}
let binary = cfg
.binary
.as_ref()
.context("executable_path must be set for LocalProcess backend")?;
let instance_id = Uuid::now_v7();
let mut cmd = Command::new(binary);
cmd.args(&cfg.args);
cmd.envs(&cfg.env);
cmd.stdout(Stdio::piped());
cmd.stderr(Stdio::piped());
if let Some(ref working_dir) = cfg.working_directory {
let path = Path::new(working_dir);
if path.exists() && path.is_dir() {
cmd.current_dir(path);
} else {
tracing::warn!(
module = %cfg.name,
working_dir = %working_dir,
"Working directory does not exist or is not a directory, using current dir"
);
}
}
let mut child = cmd
.spawn()
.with_context(|| format!("failed to spawn process: {}", binary.display()))?;
let pid = child.id();
let module_name = cfg.name.clone();
let cancel = self.cancel.clone();
let stdout_forwarder = child.stdout.take().map(|stdout| {
spawn_stream_forwarder(
stdout,
module_name.clone(),
instance_id,
cancel.clone(),
StreamKind::Stdout,
)
});
let stderr_forwarder = child.stderr.take().map(|stderr| {
spawn_stream_forwarder(
stderr,
module_name.clone(),
instance_id,
cancel.clone(),
StreamKind::Stderr,
)
});
tracing::info!(
module = %cfg.name,
instance_id = %instance_id,
pid = ?pid,
"Spawned OoP module with log forwarding"
);
let handle = InstanceHandle {
module: cfg.name.clone(),
instance_id,
backend: BackendKind::LocalProcess,
pid,
created_at: std::time::Instant::now(),
};
{
let mut instances = self.instances.write();
instances.insert(
instance_id,
LocalInstance {
handle: handle.clone(),
child,
stdout_forwarder,
stderr_forwarder,
},
);
}
Ok(handle)
}
async fn stop_instance(&self, handle: &InstanceHandle) -> Result<()> {
let local = {
let mut instances = self.instances.write();
instances.remove(&handle.instance_id)
};
if let Some(mut local) = local {
stop_child_with_grace(
&mut local.child,
&local.handle,
INSTANCE_STOP_GRACE_PERIOD,
"stop_instance",
)
.await;
} else {
tracing::debug!(
module = %handle.module,
instance_id = %handle.instance_id,
"stop_instance called for unknown instance, ignoring"
);
}
Ok(())
}
async fn list_instances(&self, module: &str) -> Result<Vec<InstanceHandle>> {
let instances = self.instances.read();
let result = instances
.values()
.filter(|inst| inst.handle.module == module)
.map(|inst| inst.handle.clone())
.collect();
Ok(result)
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;
    use std::time::Instant;

    /// Creates a backend bound to a fresh cancellation token.
    fn test_backend() -> LocalProcessBackend {
        LocalProcessBackend::new(CancellationToken::new())
    }

    /// Builds a `LocalProcess` config for `module` that launches the platform's
    /// long-running test binary with the single argument `"10"`.
    fn long_running_config(module: &'static str) -> OopModuleConfig {
        #[cfg(windows)]
        let binary = PathBuf::from("C:\\Windows\\System32\\cmd.exe");
        #[cfg(not(windows))]
        let binary = PathBuf::from("/bin/sleep");

        let mut cfg = OopModuleConfig::new(module, BackendKind::LocalProcess);
        cfg.binary = Some(binary);
        cfg.args = vec!["10".to_owned()];
        cfg
    }

    #[tokio::test]
    async fn test_spawn_instance_requires_binary() {
        let backend = test_backend();
        let cfg = OopModuleConfig::new("test_module", BackendKind::LocalProcess);
        let result = backend.spawn_instance(&cfg).await;
        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("executable_path must be set")
        );
    }

    #[tokio::test]
    async fn test_spawn_instance_requires_correct_backend() {
        let backend = test_backend();
        let mut cfg = OopModuleConfig::new("test_module", BackendKind::K8s);
        cfg.binary = Some(PathBuf::from("/bin/echo"));
        let result = backend.spawn_instance(&cfg).await;
        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("can only spawn LocalProcess")
        );
    }

    #[tokio::test]
    async fn test_spawn_list_stop_lifecycle() {
        let backend = test_backend();
        let cfg = long_running_config("test_module");

        let handle = backend
            .spawn_instance(&cfg)
            .await
            .expect("should spawn instance");
        assert_eq!(handle.module, "test_module");
        assert!(!handle.instance_id.is_nil());
        assert_eq!(handle.backend, BackendKind::LocalProcess);

        let instances = backend
            .list_instances("test_module")
            .await
            .expect("should list instances");
        assert_eq!(instances.len(), 1);
        assert_eq!(instances[0].module, "test_module");
        assert_eq!(instances[0].instance_id, handle.instance_id);

        backend
            .stop_instance(&handle)
            .await
            .expect("should stop instance");

        let instances = backend
            .list_instances("test_module")
            .await
            .expect("should list instances");
        assert_eq!(instances.len(), 0);
    }

    #[tokio::test]
    async fn test_list_instances_filters_by_module() {
        let backend = test_backend();

        let handle_a = backend
            .spawn_instance(&long_running_config("module_a"))
            .await
            .expect("should spawn module_a");
        let handle_b = backend
            .spawn_instance(&long_running_config("module_b"))
            .await
            .expect("should spawn module_b");

        let instances_a = backend
            .list_instances("module_a")
            .await
            .expect("should list module_a");
        assert_eq!(instances_a.len(), 1);
        assert_eq!(instances_a[0].module, "module_a");

        let instances_b = backend
            .list_instances("module_b")
            .await
            .expect("should list module_b");
        assert_eq!(instances_b.len(), 1);
        assert_eq!(instances_b[0].module, "module_b");

        // Surface cleanup failures instead of silently discarding them.
        backend
            .stop_instance(&handle_a)
            .await
            .expect("should stop module_a");
        backend
            .stop_instance(&handle_b)
            .await
            .expect("should stop module_b");
    }

    #[tokio::test]
    async fn test_stop_nonexistent_instance() {
        let backend = test_backend();
        let handle = InstanceHandle {
            module: "test_module".to_owned(),
            instance_id: Uuid::new_v4(),
            backend: BackendKind::LocalProcess,
            pid: None,
            created_at: Instant::now(),
        };
        let result = backend.stop_instance(&handle).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_list_instances_empty() {
        let backend = test_backend();
        let instances = backend
            .list_instances("nonexistent_module")
            .await
            .expect("should list instances");
        assert_eq!(instances.len(), 0);
    }

    /// Tests for the Unix `SIGTERM` helper; the whole module is Unix-only.
    #[cfg(unix)]
    mod send_terminate_signal_tests {
        use super::send_terminate_signal;
        use std::time::Duration;

        #[tokio::test]
        async fn test_send_terminate_signal_to_valid_process() {
            let mut cmd = tokio::process::Command::new("sleep");
            cmd.args(["30"]);
            // Make sure the child does not outlive the test if an assertion below fails.
            cmd.kill_on_drop(true);
            let mut child = cmd.spawn().expect("should spawn test process");

            let result = send_terminate_signal(&child);
            assert!(result, "Should successfully send SIGTERM to valid process");

            tokio::time::timeout(Duration::from_secs(1), child.wait())
                .await
                .expect("process should exit within timeout")
                .expect("wait should succeed");
        }

        #[tokio::test]
        async fn test_send_terminate_signal_to_exited_process() {
            let mut cmd = tokio::process::Command::new("/bin/sh");
            cmd.args(["-c", "exit 0"]);
            let mut child = cmd.spawn().expect("should spawn test process");

            // Generous budget: starting and reaping a shell can take well over 100 ms on a
            // loaded machine, and the timeout only guards against a hang.
            tokio::time::timeout(Duration::from_secs(5), child.wait())
                .await
                .expect("process should exit within timeout")
                .expect("wait should succeed");

            let result = send_terminate_signal(&child);
            assert!(!result, "Should return false for already-exited process");
        }

        #[test]
        fn test_pid_conversion_edge_case_documentation() {
            let max_u32_pid: u32 = u32::MAX;
            let result = i32::try_from(max_u32_pid);
            assert!(result.is_err(), "u32::MAX should not fit in i32");
        }

        #[test]
        fn test_pid_conversion_normal_range() {
            let normal_pid: u32 = 12345;
            let result = i32::try_from(normal_pid);
            assert!(result.is_ok(), "Normal PID should convert to i32");
            assert_eq!(result.unwrap(), 12345);
        }
    }
}