use super::spec::{DiscoverSpec, RigSpec};
use crate::error::Result;
/// Liveness of a managed service as recorded in the rig state.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ServiceStatus {
    /// A PID is recorded and that process is alive.
    Running(u32),
    /// No PID is recorded for the service.
    Stopped,
    /// A PID is recorded but no live process answers to it.
    Stale(u32),
}
/// A process found by scanning `ps` output.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct DiscoveredProcess {
    /// Process ID as reported by `ps`.
    pub pid: u32,
    /// Estimated start time in seconds since the Unix epoch (scan time minus `etime`).
    pub started_at_epoch: u64,
}
/// Spawns the managed service `service_id` declared in `rig` and returns its PID.
///
/// If the rig state already records a live PID for the service, that PID is returned and
/// nothing is spawned. Always fails on non-Unix platforms.
pub fn start(rig: &RigSpec, service_id: &str) -> Result<u32> {
    let pid = platform::start(rig, service_id)?;
    Ok(pid)
}
/// Stops the service `service_id` of `rig` (SIGTERM, then SIGKILL after a grace period).
///
/// A service with no running process is treated as already stopped. Always fails on non-Unix
/// platforms.
pub fn stop(rig: &RigSpec, service_id: &str) -> Result<()> {
    platform::stop(rig, service_id)?;
    Ok(())
}
/// Reports whether the PID recorded for `service_id` in rig `rig_id` is still alive.
pub fn status(rig_id: &str, service_id: &str) -> Result<ServiceStatus> {
    let current = platform::status(rig_id, service_id)?;
    Ok(current)
}
/// Path of the log file that captures the service's stdout and stderr.
///
/// The file or its directory may not exist yet if the service was never started.
pub fn log_path(rig_id: &str, service_id: &str) -> Result<std::path::PathBuf> {
    let path = platform::log_file_path(rig_id, service_id)?;
    Ok(path)
}
/// Finds the most recently started process whose command line contains `pattern`.
///
/// Equivalent to [`discover_newest_for_spec`] with no extra argv selectors.
pub fn discover_newest(pattern: &str) -> Result<Option<DiscoveredProcess>> {
    let no_selectors: &[String] = &[];
    platform::discover_newest(pattern, no_selectors)
}
/// Finds the most recently started process matching `discover`.
///
/// The command line must contain `discover.pattern` and every entry of
/// `discover.argv_contains` as substrings.
pub fn discover_newest_for_spec(discover: &DiscoverSpec) -> Result<Option<DiscoveredProcess>> {
    let selectors = &discover.argv_contains;
    platform::discover_newest(&discover.pattern, selectors)
}
/// PID of the newest process matching `discover`, or `None` if nothing matches.
pub fn discover_external_pid(discover: &DiscoverSpec) -> Result<Option<u32>> {
    discover_newest_for_spec(discover).map(|found| found.map(|process| process.pid))
}
/// Converts a `ps` elapsed-time field (`[[dd-]hh:]mm:ss`) into seconds.
///
/// Returns `None` when the value cannot be parsed.
#[cfg(unix)]
pub fn parse_etime_seconds(s: &str) -> Option<u64> {
    platform::parse_etime_seconds(s)
}
#[cfg(unix)]
mod platform {
use std::fs::{File, OpenOptions};
use std::os::unix::process::CommandExt;
use std::path::PathBuf;
use std::process::{Command, Stdio};
use std::thread;
use std::time::{Duration, Instant};
use super::super::expand::expand_vars;
use super::super::spec::{RigSpec, ServiceKind, ServiceSpec};
use super::super::state::{now_rfc3339, ServiceState};
use super::ServiceStatus;
use crate::error::{Error, Result};
use crate::paths;
pub fn start(rig: &RigSpec, service_id: &str) -> Result<u32> {
let spec = rig.services.get(service_id).ok_or_else(|| {
Error::rig_service_failed(&rig.id, service_id, "service not declared in rig spec")
})?;
if spec.kind == ServiceKind::External {
return Err(Error::rig_service_failed(
&rig.id,
service_id,
"external services are adopted, not spawned — `start` is not supported. Use `stop` to recycle a discovered process.",
));
}
let mut state = super::super::state::RigState::load(&rig.id)?;
if let Some(svc_state) = state.services.get(service_id) {
if let Some(pid) = svc_state.pid {
if pid_alive(pid) {
return Ok(pid);
}
}
}
let (program, args) = build_command(rig, service_id, spec)?;
let cwd = resolve_cwd(rig, spec)?;
let log_path = log_file_for(&rig.id, service_id)?;
let log_file = open_log(&log_path)?;
let err_file = log_file
.try_clone()
.map_err(|e| Error::internal_unexpected(format!("failed to clone log fd: {}", e)))?;
let mut command = Command::new(&program);
command
.args(&args)
.stdin(Stdio::null())
.stdout(Stdio::from(log_file))
.stderr(Stdio::from(err_file));
if let Some(cwd) = cwd {
command.current_dir(cwd);
}
for (k, v) in &spec.env {
command.env(k, expand_vars(rig, v));
}
unsafe {
command.pre_exec(|| {
libc::setsid();
Ok(())
});
}
let child = command.spawn().map_err(|e| {
Error::rig_service_failed(&rig.id, service_id, format!("spawn failed: {}", e))
})?;
let pid = child.id();
std::mem::forget(child);
state.services.insert(
service_id.to_string(),
ServiceState {
pid: Some(pid),
started_at: Some(now_rfc3339()),
status: "running".to_string(),
},
);
state.save(&rig.id)?;
Ok(pid)
}
pub fn stop(rig: &RigSpec, service_id: &str) -> Result<()> {
let spec = rig.services.get(service_id).ok_or_else(|| {
Error::rig_service_failed(&rig.id, service_id, "service not declared in rig spec")
})?;
let mut state = super::super::state::RigState::load(&rig.id)?;
let pid = if spec.kind == ServiceKind::External {
let discover = spec.discover.as_ref().ok_or_else(|| {
Error::rig_service_failed(
&rig.id,
service_id,
"external service requires `discover.pattern`",
)
})?;
let expanded = expand_discover(rig, discover);
match super::discover_external_pid(&expanded)? {
Some(pid) => pid,
None => return Ok(()),
}
} else {
match state.services.get(service_id).and_then(|s| s.pid) {
Some(pid) => pid,
None => return Ok(()),
}
};
if !pid_alive(pid) {
state.services.remove(service_id);
state.save(&rig.id)?;
return Ok(());
}
let managed_process_group = spec.kind != ServiceKind::External;
signal(pid, managed_process_group, libc::SIGTERM);
let deadline = Instant::now() + Duration::from_secs(5);
while Instant::now() < deadline {
if !target_alive(pid, managed_process_group) {
break;
}
thread::sleep(Duration::from_millis(100));
}
if target_alive(pid, managed_process_group) {
signal(pid, managed_process_group, libc::SIGKILL);
thread::sleep(Duration::from_millis(200));
}
if managed_process_group {
reap_child(pid);
}
state.services.remove(service_id);
state.save(&rig.id)?;
Ok(())
}
pub fn status(rig_id: &str, service_id: &str) -> Result<ServiceStatus> {
let state = super::super::state::RigState::load(rig_id)?;
let pid = match state.services.get(service_id).and_then(|s| s.pid) {
Some(pid) => pid,
None => return Ok(ServiceStatus::Stopped),
};
if pid_alive(pid) {
Ok(ServiceStatus::Running(pid))
} else {
Ok(ServiceStatus::Stale(pid))
}
}
fn build_command(
rig: &RigSpec,
service_id: &str,
spec: &ServiceSpec,
) -> Result<(String, Vec<String>)> {
match spec.kind {
ServiceKind::HttpStatic => {
let port = spec.port.ok_or_else(|| {
Error::rig_service_failed(&rig.id, service_id, "http-static requires `port`")
})?;
Ok((
"python3".to_string(),
vec![
"-m".to_string(),
"http.server".to_string(),
port.to_string(),
],
))
}
ServiceKind::Command => {
let cmd = spec.command.as_ref().ok_or_else(|| {
Error::rig_service_failed(
&rig.id,
service_id,
"command kind requires `command`",
)
})?;
let expanded = expand_vars(rig, cmd);
Ok(("sh".to_string(), vec!["-c".to_string(), expanded]))
}
ServiceKind::External => {
Err(Error::rig_service_failed(
&rig.id,
service_id,
"external services cannot be spawned — adoption-only",
))
}
}
}
fn resolve_cwd(rig: &RigSpec, spec: &ServiceSpec) -> Result<Option<PathBuf>> {
match &spec.cwd {
None => Ok(None),
Some(cwd) => {
let expanded = expand_vars(rig, cwd);
let path = shellexpand::tilde(&expanded).into_owned();
Ok(Some(PathBuf::from(path)))
}
}
}
fn expand_discover(
rig: &RigSpec,
discover: &super::super::spec::DiscoverSpec,
) -> super::super::spec::DiscoverSpec {
super::super::spec::DiscoverSpec {
pattern: expand_vars(rig, &discover.pattern),
argv_contains: discover
.argv_contains
.iter()
.map(|selector| expand_vars(rig, selector))
.collect(),
}
}
pub(super) fn log_file_path(rig_id: &str, service_id: &str) -> Result<PathBuf> {
Ok(paths::rig_logs_dir(rig_id)?.join(format!("{}.log", service_id)))
}
fn log_file_for(rig_id: &str, service_id: &str) -> Result<PathBuf> {
let dir = paths::rig_logs_dir(rig_id)?;
std::fs::create_dir_all(&dir).map_err(|e| {
Error::internal_unexpected(format!(
"Failed to create logs dir {}: {}",
dir.display(),
e
))
})?;
log_file_path(rig_id, service_id)
}
fn open_log(path: &PathBuf) -> Result<File> {
OpenOptions::new()
.create(true)
.append(true)
.open(path)
.map_err(|e| {
Error::internal_unexpected(format!("Failed to open log {}: {}", path.display(), e))
})
}
fn pid_alive(pid: u32) -> bool {
if pid == 0 {
return false;
}
unsafe { libc::kill(pid as libc::pid_t, 0) == 0 }
}
fn process_group_alive(pgid: u32) -> bool {
if pgid == 0 {
return false;
}
unsafe { libc::kill(-(pgid as libc::pid_t), 0) == 0 }
}
fn target_alive(pid: u32, process_group: bool) -> bool {
if process_group {
process_group_alive(pid)
} else {
pid_alive(pid)
}
}
fn signal(pid: u32, process_group: bool, sig: libc::c_int) {
let target = if process_group {
-(pid as libc::pid_t)
} else {
pid as libc::pid_t
};
unsafe {
libc::kill(target, sig);
}
}
fn reap_child(pid: u32) {
if pid > i32::MAX as u32 {
return;
}
let mut status = 0;
unsafe {
libc::waitpid(pid as libc::pid_t, &mut status, libc::WNOHANG);
}
}
pub fn discover_newest(
pattern: &str,
argv_contains: &[String],
) -> Result<Option<super::DiscoveredProcess>> {
let output = Command::new("ps")
.args(["-axo", "pid=,etime=,args="])
.output()
.map_err(|e| {
Error::internal_unexpected(format!("ps for process discovery failed: {}", e))
})?;
if !output.status.success() {
return Err(Error::internal_unexpected(format!(
"ps exited {}",
output.status.code().unwrap_or(-1)
)));
}
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0);
let self_pid = std::process::id();
let stdout = String::from_utf8_lossy(&output.stdout);
Ok(discover_newest_from_ps(
pattern,
argv_contains,
now,
self_pid,
&stdout,
))
}
pub(super) fn discover_newest_from_ps(
pattern: &str,
argv_contains: &[String],
now: u64,
self_pid: u32,
ps_stdout: &str,
) -> Option<super::DiscoveredProcess> {
let mut newest: Option<super::DiscoveredProcess> = None;
for line in ps_stdout.lines() {
let trimmed = line.trim_start();
let pid_end = match trimmed.find(char::is_whitespace) {
Some(e) => e,
None => continue,
};
let pid: u32 = match trimmed[..pid_end].parse() {
Ok(p) => p,
Err(_) => continue,
};
let after_pid = trimmed[pid_end..].trim_start();
let etime_end = match after_pid.find(char::is_whitespace) {
Some(e) => e,
None => continue,
};
let etime_str = &after_pid[..etime_end];
let args = after_pid[etime_end..].trim_start();
if !args.contains(pattern) {
continue;
}
if argv_contains
.iter()
.any(|selector| !args.contains(selector))
{
continue;
}
if pid == self_pid {
continue;
}
let elapsed = match parse_etime_seconds(etime_str) {
Some(s) => s,
None => continue,
};
let started_at = now.saturating_sub(elapsed);
let candidate = super::DiscoveredProcess {
pid,
started_at_epoch: started_at,
};
match &newest {
None => newest = Some(candidate),
Some(curr) if candidate.started_at_epoch > curr.started_at_epoch => {
newest = Some(candidate)
}
_ => {}
}
}
newest
}
pub(super) fn parse_etime_seconds(s: &str) -> Option<u64> {
let (days, rest) = match s.split_once('-') {
Some((d, r)) => (d.parse::<u64>().ok()?, r),
None => (0u64, s),
};
let parts: Vec<&str> = rest.split(':').collect();
let (h, m, sec) = match parts.as_slice() {
[m, s] => (0u64, m.parse::<u64>().ok()?, s.parse::<u64>().ok()?),
[h, m, s] => (
h.parse::<u64>().ok()?,
m.parse::<u64>().ok()?,
s.parse::<u64>().ok()?,
),
_ => return None,
};
Some(days * 86_400 + h * 3_600 + m * 60 + sec)
}
}
#[cfg(not(unix))]
mod platform {
    //! Fallback for platforms without Unix process control: every operation reports that rig
    //! services are unsupported.

    use super::super::spec::RigSpec;
    use super::ServiceStatus;
    use crate::error::{Error, Result};

    const UNSUPPORTED: &str = "rig services are not supported on this platform (Unix only)";

    /// Uniform "unsupported platform" failure for a per-service operation.
    fn unsupported<T>(rig_id: &str, service_id: &str) -> Result<T> {
        Err(Error::rig_service_failed(rig_id, service_id, UNSUPPORTED))
    }

    pub fn start(rig: &RigSpec, service_id: &str) -> Result<u32> {
        unsupported(&rig.id, service_id)
    }

    pub fn stop(rig: &RigSpec, service_id: &str) -> Result<()> {
        unsupported(&rig.id, service_id)
    }

    pub fn status(rig_id: &str, service_id: &str) -> Result<ServiceStatus> {
        unsupported(rig_id, service_id)
    }

    pub fn log_file_path(rig_id: &str, service_id: &str) -> Result<std::path::PathBuf> {
        unsupported(rig_id, service_id)
    }

    pub fn discover_newest(
        _pattern: &str,
        _argv_contains: &[String],
    ) -> Result<Option<super::DiscoveredProcess>> {
        Err(Error::internal_unexpected(UNSUPPORTED))
    }
}
#[cfg(test)]
#[cfg(unix)]
#[path = "../../../tests/core/rig/service_test.rs"]
mod service_test;