use std::path::{Path, PathBuf};
use std::process::Stdio;
use tokio::process::{Child, Command};
use tokio::time::{timeout, Duration};
use tracing::{debug, info, warn};
use crate::utils::retry::{retry_async_with_backoff, RetryConfig};
use crate::module::ipc::ModuleIpcClient;
#[cfg(unix)]
use crate::module::ipc::protocol::{MessageType, RequestMessage, RequestPayload};
use crate::module::sandbox::{FileSystemSandbox, NetworkSandbox, ProcessSandbox, SandboxConfig};
use crate::module::traits::{ModuleContext, ModuleError};
pub struct ModuleProcessSpawner {
pub modules_dir: PathBuf,
pub data_dir: PathBuf,
pub socket_dir: PathBuf,
process_sandbox: Option<ProcessSandbox>,
filesystem_sandbox: Option<FileSystemSandbox>,
network_sandbox: NetworkSandbox,
resource_limits_config: Option<crate::config::ModuleResourceLimitsConfig>,
}
impl ModuleProcessSpawner {
pub fn new<P: AsRef<Path>>(modules_dir: P, data_dir: P, socket_dir: P) -> Self {
Self::with_config(modules_dir, data_dir, socket_dir, None)
}
pub fn with_config<P: AsRef<Path>>(
modules_dir: P,
data_dir: P,
socket_dir: P,
resource_limits_config: Option<&crate::config::ModuleResourceLimitsConfig>,
) -> Self {
let data_dir_path = data_dir.as_ref().to_path_buf();
let limits_config = resource_limits_config
.cloned()
.unwrap_or_else(crate::config::ModuleResourceLimitsConfig::default);
let sandbox_config = SandboxConfig::with_resource_limits(&data_dir_path, &limits_config);
let process_sandbox = Some(ProcessSandbox::new(sandbox_config));
let filesystem_sandbox = Some(FileSystemSandbox::new(&data_dir_path));
let network_sandbox = NetworkSandbox::new();
Self {
modules_dir: modules_dir.as_ref().to_path_buf(),
data_dir: data_dir_path,
socket_dir: socket_dir.as_ref().to_path_buf(),
process_sandbox,
filesystem_sandbox,
network_sandbox,
resource_limits_config: Some(limits_config),
}
}
pub async fn spawn(
&self,
module_name: &str,
binary_path: &Path,
context: ModuleContext,
) -> Result<ModuleProcess, ModuleError> {
info!("Spawning module process: {}", module_name);
if !binary_path.exists() {
return Err(ModuleError::ModuleNotFound(format!(
"Module binary not found: {binary_path:?}"
)));
}
let module_data_dir = self.data_dir.join(module_name);
std::fs::create_dir_all(&module_data_dir).map_err(|e| {
ModuleError::InitializationError(format!("Failed to create module data directory: {e}"))
})?;
if let Some(ref fs_sandbox) = self.filesystem_sandbox {
fs_sandbox.validate_path(&module_data_dir)?;
}
let socket_path = PathBuf::from(&context.socket_path);
let mut command = Command::new(binary_path);
command
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.env("MODULE_NAME", module_name)
.env("MODULE_ID", &context.module_id)
.env("SOCKET_PATH", &socket_path)
.env("DATA_DIR", &module_data_dir);
for (key, value) in &context.config {
command.env(
format!("MODULE_CONFIG_{}", key.to_uppercase().as_str()),
value,
);
}
debug!(
"Spawning process: {:?} with args: {:?}",
binary_path, command
);
let child = command.spawn().map_err(|e| {
ModuleError::InitializationError(format!("Failed to spawn module process: {e}"))
})?;
if let Some(ref sandbox) = self.process_sandbox {
let pid = child.id();
if let Err(e) = sandbox.apply_limits(pid) {
warn!(
"Failed to apply resource limits to module {}: {}",
module_name, e
);
}
}
let startup_wait = self
.resource_limits_config
.as_ref()
.map(|c| c.module_startup_wait_millis)
.unwrap_or(100);
tokio::time::sleep(Duration::from_millis(startup_wait)).await;
let socket_timeout = self
.resource_limits_config
.as_ref()
.map(|c| c.module_socket_timeout_seconds)
.unwrap_or(5);
let socket_ready = timeout(
Duration::from_secs(socket_timeout),
self.wait_for_socket(&socket_path),
)
.await;
match socket_ready {
Ok(Ok(_)) => {
info!("Module {} socket ready", module_name);
}
Ok(Err(e)) => {
return Err(ModuleError::InitializationError(format!(
"Failed to wait for module socket: {e}"
)));
}
Err(_) => {
return Err(ModuleError::Timeout);
}
}
let mut ipc_client = ModuleIpcClient::connect(&socket_path)
.await
.map_err(|e| ModuleError::IpcError(format!("Failed to connect to node IPC: {e}")))?;
let version = context
.config
.get("version")
.cloned()
.unwrap_or_else(|| "0.1.0".to_string());
let handshake = RequestMessage {
correlation_id: 1,
request_type: MessageType::Handshake,
payload: RequestPayload::Handshake {
module_id: context.module_id.clone(),
module_name: module_name.to_string(),
version,
},
};
let response = ipc_client
.request(handshake)
.await
.map_err(|e| ModuleError::IpcError(format!("Failed to send handshake: {e}")))?;
if !response.success {
return Err(ModuleError::IpcError(
response
.error
.unwrap_or_else(|| "Handshake failed".to_string()),
));
}
let client = Some(ipc_client);
Ok(ModuleProcess {
module_name: module_name.to_string(),
process: child,
socket_path,
client,
})
}
async fn wait_for_socket(&self, socket_path: &Path) -> Result<(), ModuleError> {
let check_interval = self
.resource_limits_config
.as_ref()
.map(|c| c.module_socket_check_interval_millis)
.unwrap_or(100);
let max_attempts = self
.resource_limits_config
.as_ref()
.map(|c| c.module_socket_max_attempts)
.unwrap_or(50);
let path = socket_path.to_path_buf();
let config = RetryConfig::new(
max_attempts.try_into().unwrap_or(u32::MAX),
Duration::from_millis(check_interval),
);
retry_async_with_backoff(&config, || {
let p = path.clone();
async move {
#[cfg(unix)]
if p.exists() {
return Ok(());
}
#[cfg(windows)]
if ModuleIpcClient::connect(&p).await.is_ok() {
return Ok(());
}
Err(SocketNotReady)
}
})
.await
.map_err(|_| {
ModuleError::InitializationError(
"Module socket/pipe did not appear within timeout".to_string(),
)
})
}
}
struct SocketNotReady;
impl std::fmt::Display for SocketNotReady {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "socket not ready")
}
}
pub struct ModuleProcess {
pub module_name: String,
pub process: Child,
pub socket_path: PathBuf,
client: Option<ModuleIpcClient>,
}
impl ModuleProcess {
pub fn id(&self) -> Option<u32> {
self.process.id()
}
pub fn is_running(&mut self) -> bool {
!matches!(self.process.try_wait(), Ok(Some(_)))
}
pub async fn wait(&mut self) -> Result<Option<std::process::ExitStatus>, ModuleError> {
self.process
.wait()
.await
.map_err(|e| ModuleError::op_err("Failed to wait for process", e))
.map(Some)
}
pub async fn kill(&mut self) -> Result<(), ModuleError> {
debug!("Killing module process: {}", self.module_name);
if let Err(e) = self.process.kill().await {
warn!("Failed to kill module process {}: {}", self.module_name, e);
}
let _ = self.process.wait().await;
if self.socket_path.exists() {
if let Err(e) = std::fs::remove_file(&self.socket_path) {
warn!("Failed to remove socket file {:?}: {}", self.socket_path, e);
}
}
Ok(())
}
pub fn client_mut(&mut self) -> Option<&mut ModuleIpcClient> {
self.client.as_mut()
}
pub fn take_client(&mut self) -> Option<ModuleIpcClient> {
self.client.take()
}
}