use std::{
collections::HashMap,
sync::{Arc, Mutex},
time::{Duration, Instant, SystemTime},
};
use serde::{Deserialize, Serialize};
use crate::{
config::{SpawnLimitsConfig, TerminationPolicy},
error::ProcessManagerError,
};
/// Spawn limits and running counters for one service's tree of spawned processes.
///
/// Built from a `SpawnLimitsConfig` by `SpawnTree::from_config`.
#[derive(Debug, Clone)]
pub struct SpawnTree {
/// Name of the service this tree belongs to.
pub service_name: String,
/// Depth bound checked by `can_spawn`: a requested depth greater than or equal
/// to this value is refused. Defaults to 3 when the config leaves it unset.
pub max_depth: usize,
/// Bound on the number of children recorded under a single parent PID
/// (checked in `DynamicSpawnManager::authorize_spawn`). Defaults to 100.
pub max_children: usize,
/// Bound that `total_descendants` is compared against in `can_spawn`. Defaults to 500.
pub max_descendants: usize,
/// Memory quota in bytes as parsed by `parse_memory_limit`; `None` when the
/// config has no value or the value could not be parsed.
pub memory_quota: Option<u64>,
/// Amount compared against `memory_quota` in `can_spawn`. Starts at 0; nothing
/// in the code visible here updates it.
pub memory_used: u64,
/// Termination policy taken from the config (`TerminationPolicy::Cascade` when unset).
pub termination_policy: TerminationPolicy,
/// Depth of this tree value; starts at 0 and is incremented on the copy
/// returned by `create_child`.
pub current_depth: usize,
/// Counter incremented by `DynamicSpawnManager::record_spawn` for each recorded spawn.
pub total_descendants: usize,
}
impl SpawnTree {
    /// Builds the spawn tree for `service_name` from its configured limits.
    ///
    /// Unset limits fall back to defaults: depth 3, 100 direct children,
    /// 500 descendants, no memory quota, and the `Cascade` termination policy.
    /// A memory limit string that `parse_memory_limit` rejects also results in
    /// no quota.
    pub fn from_config(service_name: String, config: &SpawnLimitsConfig) -> Self {
        let max_depth = config.depth.unwrap_or(3) as usize;
        let max_children = config.children.unwrap_or(100) as usize;
        let max_descendants = config.descendants.unwrap_or(500) as usize;
        let memory_quota = match config.total_memory.as_ref() {
            Some(limit) => parse_memory_limit(limit),
            None => None,
        };
        let termination_policy = config
            .termination_policy
            .clone()
            .unwrap_or(TerminationPolicy::Cascade);

        Self {
            service_name,
            max_depth,
            max_children,
            max_descendants,
            memory_quota,
            memory_used: 0,
            termination_policy,
            current_depth: 0,
            total_descendants: 0,
        }
    }

    /// Checks whether a process may be spawned at `depth` under this tree.
    ///
    /// The checks run in order: depth, descendant count, memory quota. The
    /// first one that fails determines the error message.
    ///
    /// # Errors
    /// Returns `ProcessManagerError::SpawnLimitExceeded` when `depth` is at or
    /// beyond `max_depth`, when `total_descendants` has reached
    /// `max_descendants`, or when a quota is set and `memory_used` has reached it.
    pub fn can_spawn(&self, depth: usize) -> Result<(), ProcessManagerError> {
        let violation = if depth >= self.max_depth {
            Some("Maximum spawn depth reached")
        } else if self.total_descendants >= self.max_descendants {
            Some("Descendant limit exceeded")
        } else if self
            .memory_quota
            .is_some_and(|quota| self.memory_used >= quota)
        {
            Some("Memory quota exceeded")
        } else {
            None
        };

        match violation {
            Some(reason) => Err(ProcessManagerError::SpawnLimitExceeded(reason.into())),
            None => Ok(()),
        }
    }

    /// Returns a copy of this tree with `current_depth` increased by one.
    ///
    /// All limits and counters are copied unchanged; the copy is independent
    /// of `self`.
    pub fn create_child(&self) -> Self {
        Self {
            current_depth: self.current_depth + 1,
            ..self.clone()
        }
    }
}
/// Record describing one tracked child process.
///
/// Optional fields are left out of the serialized form when they are `None`,
/// and `kind` is left out when it is `SpawnedChildKind::Spawned`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SpawnedChild {
/// Name given to the child.
pub name: String,
/// PID of the child; the key under which `DynamicSpawnManager` indexes it.
pub pid: u32,
/// PID of the process this child is attached to in the spawn hierarchy.
pub parent_pid: u32,
/// Command associated with the child (not interpreted in this file).
pub command: String,
/// Start time of the child — presumably when the process was launched; set by the caller.
pub started_at: SystemTime,
/// Optional time-to-live; stored only, not enforced by the code visible here.
#[serde(skip_serializing_if = "Option::is_none")]
pub ttl: Option<Duration>,
/// Depth in the spawn hierarchy; `authorize_spawn` uses a tracked parent's
/// depth plus one for a new child.
pub depth: usize,
/// Latest CPU usage value stored by `update_child_metrics`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub cpu_percent: Option<f32>,
/// Latest resident-set size stored by `update_child_metrics`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub rss_bytes: Option<u64>,
/// Exit details stored by `record_spawn_exit`; `None` until an exit is recorded.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub last_exit: Option<SpawnedExit>,
/// Optional user name — presumably the account the child runs as; TODO confirm.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub user: Option<String>,
/// Kind of child; defaults to `Spawned` when absent from the input.
#[serde(default, skip_serializing_if = "is_spawned_kind")]
pub kind: SpawnedChildKind,
}
/// Category of a tracked child process.
///
/// Serialized in snake case (`"spawned"`, `"peripheral"`). The default,
/// [`SpawnedChildKind::Spawned`], is what `SpawnedChild::kind` becomes when the
/// field is missing from deserialized input.
#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum SpawnedChildKind {
    /// Regular dynamically spawned child; the default kind.
    #[default]
    Spawned,
    /// Child of the other kind. NOTE(review): its exact meaning is not defined
    /// in this file — confirm against the callers that set it.
    Peripheral,
}
/// Serde `skip_serializing_if` predicate: true when `kind` is the default
/// `Spawned` variant, so that value is omitted from serialized output.
fn is_spawned_kind(kind: &SpawnedChildKind) -> bool {
    *kind == SpawnedChildKind::Spawned
}
/// Exit details attached to a `SpawnedChild` by `DynamicSpawnManager::record_spawn_exit`.
///
/// Every field is optional and is omitted from the serialized form when `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SpawnedExit {
/// Exit code of the process, when one is available.
#[serde(skip_serializing_if = "Option::is_none")]
pub exit_code: Option<i32>,
/// Signal number — presumably the signal that terminated the process; TODO confirm.
#[serde(skip_serializing_if = "Option::is_none")]
pub signal: Option<i32>,
/// Time at which the process finished, when known.
#[serde(skip_serializing_if = "Option::is_none")]
pub finished_at: Option<SystemTime>,
}
/// Result of a successful `DynamicSpawnManager::authorize_spawn` call.
#[derive(Debug, Clone)]
pub struct SpawnAuthorization {
/// Depth assigned to the new child: the tracked parent's depth plus one, or 1
/// when the parent is not itself a tracked child.
pub depth: usize,
/// Name of the service whose spawn tree authorized the spawn.
/// `authorize_spawn` always fills this with `Some`.
pub root_service: Option<String>,
}
/// Registry of per-service spawn trees and the children spawned beneath them.
///
/// Each map sits behind its own `Arc<Mutex<..>>`, so a clone of the manager is
/// another handle onto the same shared state rather than a deep copy.
#[derive(Clone)]
pub struct DynamicSpawnManager {
/// Spawn limits and counters, keyed by service name.
spawn_trees: Arc<Mutex<HashMap<String, SpawnTree>>>,
/// Service name registered for each service PID (see `register_service_pid`).
service_pids: Arc<Mutex<HashMap<u32, String>>>,
/// Children recorded under each parent PID, in the order they were recorded.
children_by_parent: Arc<Mutex<HashMap<u32, Vec<SpawnedChild>>>>,
/// The same children indexed by their own PID.
children_by_pid: Arc<Mutex<HashMap<u32, SpawnedChild>>>,
/// Instants at which each parent PID recorded a spawn; input to the rate limit.
spawn_timestamps: Arc<Mutex<HashMap<u32, Vec<Instant>>>>,
}
// Every method below locks one or more of the manager's mutexes with
// `lock().unwrap()` and therefore panics if that mutex has been poisoned.
impl DynamicSpawnManager {
    /// Sliding window over which a parent's recent spawns are counted.
    const SPAWN_RATE_WINDOW: Duration = Duration::from_secs(1);
    /// Number of spawns inside the window at which further spawns are refused.
    const MAX_SPAWNS_PER_WINDOW: usize = 10;

    /// Creates a manager with no registered services and no tracked children.
    pub fn new() -> Self {
        Self {
            spawn_trees: Arc::new(Mutex::new(HashMap::new())),
            service_pids: Arc::new(Mutex::new(HashMap::new())),
            children_by_parent: Arc::new(Mutex::new(HashMap::new())),
            children_by_pid: Arc::new(Mutex::new(HashMap::new())),
            spawn_timestamps: Arc::new(Mutex::new(HashMap::new())),
        }
    }

    /// Registers the spawn tree for `service_name`, built from `config`.
    ///
    /// An existing tree under the same name is replaced, which also resets its
    /// counters. Currently this always returns `Ok(())`.
    pub fn register_service(
        &self,
        service_name: String,
        config: &SpawnLimitsConfig,
    ) -> Result<(), ProcessManagerError> {
        let mut trees = self.spawn_trees.lock().unwrap();
        trees.insert(
            service_name.clone(),
            SpawnTree::from_config(service_name, config),
        );
        Ok(())
    }

    /// Associates `pid` with `service_name` so spawns made by that PID (or by
    /// its tracked descendants) resolve to the service's spawn tree.
    pub fn register_service_pid(&self, service_name: String, pid: u32) {
        let mut service_pids = self.service_pids.lock().unwrap();
        service_pids.insert(pid, service_name);
    }

    /// Decides whether `parent_pid` may spawn another child.
    ///
    /// Checks, in order: the per-parent rate limit, the limits of the spawn
    /// tree the parent resolves to (`SpawnTree::can_spawn`), and the number of
    /// children already recorded directly under `parent_pid`. `_child_name`
    /// is accepted but not used.
    ///
    /// # Errors
    /// * `SpawnLimitExceeded` when the rate limit, a tree limit, or the
    ///   direct-children limit is hit.
    /// * `SpawnAuthorizationFailed` when no spawn tree can be found for the parent.
    pub fn authorize_spawn(
        &self,
        parent_pid: u32,
        _child_name: &str,
    ) -> Result<SpawnAuthorization, ProcessManagerError> {
        self.check_rate_limit(parent_pid)?;

        let trees = self.spawn_trees.lock().unwrap();
        let children = self.children_by_pid.lock().unwrap();

        // A parent that is not itself a tracked child is treated as a root,
        // so its children sit at depth 1.
        let depth = children
            .get(&parent_pid)
            .map_or(1, |parent_info| parent_info.depth + 1);

        let (root_service, tree) = self.find_spawn_tree(parent_pid, &trees, &children)?;
        tree.can_spawn(depth)?;

        let parent_children = self.children_by_parent.lock().unwrap();
        if let Some(siblings) = parent_children.get(&parent_pid)
            && siblings.len() >= tree.max_children
        {
            return Err(ProcessManagerError::SpawnLimitExceeded(
                "Maximum direct children reached".into(),
            ));
        }

        Ok(SpawnAuthorization {
            depth,
            root_service: Some(root_service),
        })
    }

    /// Records that `child` was spawned under `parent_pid`.
    ///
    /// The child is added to both indexes, the owning tree's
    /// `total_descendants` is incremented, and a spawn timestamp is stored for
    /// `parent_pid`. The owning service is `root_hint` when given; otherwise it
    /// is resolved from the parent's ancestry, then from the child's. If no
    /// tree matches that name but exactly one tree is registered, that tree is
    /// counted instead.
    ///
    /// Returns the resolved service name (`None` if it could not be resolved).
    /// Currently never returns `Err`.
    ///
    /// NOTE(review): nothing in this impl decrements `total_descendants` when
    /// children are removed, so the descendant limit acts as a lifetime total —
    /// confirm that this is intended.
    pub fn record_spawn(
        &self,
        parent_pid: u32,
        child: SpawnedChild,
        root_hint: Option<String>,
    ) -> Result<Option<String>, ProcessManagerError> {
        // Each lock is taken and released on its own so no two are ever held
        // together here.
        {
            let mut children_by_parent = self.children_by_parent.lock().unwrap();
            children_by_parent
                .entry(parent_pid)
                .or_default()
                .push(child.clone());
        }
        {
            let mut children_by_pid = self.children_by_pid.lock().unwrap();
            children_by_pid.insert(child.pid, child.clone());
        }

        let mut service_name =
            root_hint.or_else(|| self.resolve_root_service_name(parent_pid));
        if service_name.is_none() {
            service_name = self.resolve_root_service_name(child.pid);
        }

        {
            let mut trees = self.spawn_trees.lock().unwrap();
            if let Some(name) = service_name.as_ref()
                && let Some(tree) = trees.get_mut(name)
            {
                tree.total_descendants += 1;
            } else if trees.len() == 1
                && let Some((_, tree)) = trees.iter_mut().next()
            {
                tree.total_descendants += 1;
            }
        }

        {
            let mut timestamps = self.spawn_timestamps.lock().unwrap();
            let now = Instant::now();
            let recent = timestamps.entry(parent_pid).or_default();
            // Drop entries that have already left the rate-limit window so the
            // list stays bounded even when spawns are recorded without a
            // preceding `authorize_spawn`; the rate-limit check ignores them anyway.
            recent.retain(|t| now.duration_since(*t) < Self::SPAWN_RATE_WINDOW);
            recent.push(now);
        }

        Ok(service_name)
    }

    /// Stores `exit` as the `last_exit` of the tracked child `child_pid`.
    ///
    /// Both the by-PID entry and the first matching entry in the per-parent
    /// lists are updated. Returns a copy of the updated by-PID record, or
    /// `None` when the PID is not tracked (in which case nothing is changed).
    pub fn record_spawn_exit(
        &self,
        child_pid: u32,
        exit: SpawnedExit,
    ) -> Option<SpawnedChild> {
        let mut children_by_pid = self.children_by_pid.lock().unwrap();
        let child = children_by_pid.get_mut(&child_pid)?;
        child.last_exit = Some(exit.clone());
        let updated = child.clone();

        // The by-PID lock stays held while the per-parent copy is updated so
        // both views change together.
        let mut children_by_parent = self.children_by_parent.lock().unwrap();
        if let Some(node) = children_by_parent
            .values_mut()
            .flat_map(|siblings| siblings.iter_mut())
            .find(|sibling| sibling.pid == child_pid)
        {
            node.last_exit = Some(exit);
        }

        Some(updated)
    }

    /// Overwrites the cached CPU and RSS metrics of the tracked child `child_pid`.
    ///
    /// Both the by-PID entry and the first matching entry in the per-parent
    /// lists are updated; an unknown PID is silently ignored.
    pub fn update_child_metrics(
        &self,
        child_pid: u32,
        cpu_percent: Option<f32>,
        rss_bytes: Option<u64>,
    ) {
        {
            let mut children_by_pid = self.children_by_pid.lock().unwrap();
            if let Some(child) = children_by_pid.get_mut(&child_pid) {
                child.cpu_percent = cpu_percent;
                child.rss_bytes = rss_bytes;
            }
        }

        let mut children_by_parent = self.children_by_parent.lock().unwrap();
        if let Some(node) = children_by_parent
            .values_mut()
            .flat_map(|siblings| siblings.iter_mut())
            .find(|sibling| sibling.pid == child_pid)
        {
            node.cpu_percent = cpu_percent;
            node.rss_bytes = rss_bytes;
        }
    }

    /// Returns copies of the children recorded directly under `parent_pid`
    /// (empty when there are none).
    pub fn get_children(&self, parent_pid: u32) -> Vec<SpawnedChild> {
        let children = self.children_by_parent.lock().unwrap();
        children.get(&parent_pid).cloned().unwrap_or_default()
    }

    /// Returns a snapshot of the spawn tree that `pid` resolves to, if any.
    pub fn get_spawn_tree(&self, pid: u32) -> Option<SpawnTree> {
        let trees = self.spawn_trees.lock().unwrap();
        let children = self.children_by_pid.lock().unwrap();
        self.find_spawn_tree(pid, &trees, &children)
            .map(|(_, tree)| tree.clone())
            .ok()
    }

    /// Returns the termination policy of the spawn tree that `pid` resolves to, if any.
    pub fn termination_policy_for(&self, pid: u32) -> Option<TerminationPolicy> {
        let trees = self.spawn_trees.lock().unwrap();
        let children = self.children_by_pid.lock().unwrap();
        self.find_spawn_tree(pid, &trees, &children)
            .map(|(_, tree)| tree.termination_policy.clone())
            .ok()
    }

    /// Stops tracking `root_pid` and everything recorded beneath it.
    ///
    /// Returns the removed by-PID records in depth-first order starting with
    /// `root_pid`; returns an empty vector when `root_pid` is not a tracked
    /// child. The root is also removed from its parent's child list, and the
    /// spawn timestamps of every removed PID are discarded.
    pub fn remove_subtree(&self, root_pid: u32) -> Vec<SpawnedChild> {
        let mut removed = Vec::new();
        let mut pid_guard = self.children_by_pid.lock().unwrap();
        let mut parent_guard = self.children_by_parent.lock().unwrap();

        let Some(ancestor_parent) = pid_guard.get(&root_pid).map(|root| root.parent_pid)
        else {
            return removed;
        };

        // Only PIDs are needed for the walk. Each child list is removed as it
        // is visited, so the walk terminates even if the links form a cycle.
        let mut stack = vec![root_pid];
        while let Some(pid) = stack.pop() {
            if let Some(children) = parent_guard.remove(&pid) {
                // Reversed so the first recorded child is popped first.
                stack.extend(children.iter().rev().map(|child| child.pid));
            }
            if let Some(child) = pid_guard.remove(&pid) {
                removed.push(child);
            }
        }

        if let Some(siblings) = parent_guard.get_mut(&ancestor_parent) {
            siblings.retain(|s| s.pid != root_pid);
            if siblings.is_empty() {
                parent_guard.remove(&ancestor_parent);
            }
        }
        drop(parent_guard);
        drop(pid_guard);

        if !removed.is_empty() {
            let mut timestamps = self.spawn_timestamps.lock().unwrap();
            for child in &removed {
                timestamps.remove(&child.pid);
            }
        }
        removed
    }

    /// Enforces the per-parent spawn rate limit.
    ///
    /// Timestamps older than the window are discarded first; the call fails
    /// with `SpawnLimitExceeded` when the parent still has the maximum number
    /// of spawns inside the window.
    fn check_rate_limit(&self, parent_pid: u32) -> Result<(), ProcessManagerError> {
        let mut timestamps = self.spawn_timestamps.lock().unwrap();
        let now = Instant::now();
        if let Some(recent_spawns) = timestamps.get_mut(&parent_pid) {
            recent_spawns.retain(|t| now.duration_since(*t) < Self::SPAWN_RATE_WINDOW);
            if recent_spawns.len() >= Self::MAX_SPAWNS_PER_WINDOW {
                return Err(ProcessManagerError::SpawnLimitExceeded(
                    "Spawn rate limit exceeded (max 10/sec)".into(),
                ));
            }
        }
        Ok(())
    }

    /// Finds the spawn tree responsible for `pid`.
    ///
    /// Resolution order:
    /// 1. `pid` is itself a registered service PID.
    /// 2. Walk up the tracked parent links from `pid` until an ancestor is a
    ///    registered service PID.
    /// 3. The PID at which the walk stopped is a registered service PID.
    /// 4. Exactly one tree is registered — it is used as a catch-all.
    ///
    /// The caller passes in the already-locked `trees` and `children` maps;
    /// this function additionally locks `service_pids`.
    ///
    /// # Errors
    /// Returns `SpawnAuthorizationFailed` when none of the steps finds a tree.
    fn find_spawn_tree<'a>(
        &self,
        pid: u32,
        trees: &'a HashMap<String, SpawnTree>,
        children: &HashMap<u32, SpawnedChild>,
    ) -> Result<(String, &'a SpawnTree), ProcessManagerError> {
        let service_pids = self.service_pids.lock().unwrap();

        if let Some(service_name) = service_pids.get(&pid)
            && let Some(tree) = trees.get(service_name)
        {
            return Ok((service_name.clone(), tree));
        }

        let mut current_pid = pid;
        // An acyclic parent chain can pass through each tracked child at most
        // once, so more hops than entries means the links form a cycle (for
        // example a stale record after PID reuse). Stop instead of spinning
        // forever while holding the locks.
        let mut remaining_hops = children.len();
        while let Some(child_info) = children.get(&current_pid) {
            if let Some(parent_service) = service_pids.get(&child_info.parent_pid)
                && let Some(tree) = trees.get(parent_service)
            {
                return Ok((parent_service.clone(), tree));
            }
            current_pid = child_info.parent_pid;
            if remaining_hops == 0 {
                break;
            }
            remaining_hops -= 1;
        }

        if let Some(service_name) = service_pids.get(&current_pid)
            && let Some(tree) = trees.get(service_name)
        {
            return Ok((service_name.clone(), tree));
        }

        if trees.len() == 1
            && let Some((name, tree)) = trees.iter().next()
        {
            return Ok((name.clone(), tree));
        }

        Err(ProcessManagerError::SpawnAuthorizationFailed(
            "No spawn tree found for process".into(),
        ))
    }

    /// Stops tracking the single child `child_pid`.
    ///
    /// The child is removed from the by-PID index and from its parent's child
    /// list, and its spawn timestamps are discarded (matching what
    /// `remove_subtree` does for every PID it removes). Children recorded
    /// under `child_pid` are left in place. Returns the removed record, or
    /// `None` when the PID is not tracked.
    pub fn remove_child(&self, child_pid: u32) -> Option<SpawnedChild> {
        // The guard is a temporary, so this lock is released before the next
        // one is taken.
        let child = self.children_by_pid.lock().unwrap().remove(&child_pid)?;

        {
            let mut children_by_parent = self.children_by_parent.lock().unwrap();
            if let Some(siblings) = children_by_parent.get_mut(&child.parent_pid) {
                siblings.retain(|c| c.pid != child_pid);
                if siblings.is_empty() {
                    children_by_parent.remove(&child.parent_pid);
                }
            }
        }

        self.spawn_timestamps.lock().unwrap().remove(&child_pid);

        Some(child)
    }

    /// Walks up the tracked parent links from `pid` and returns the name of
    /// the first PID that is a registered service PID.
    ///
    /// Returns `None` when the chain ends at an untracked PID, or when the
    /// links turn out to form a cycle. Locks are taken one at a time per step.
    fn resolve_root_service_name(&self, mut pid: u32) -> Option<String> {
        let mut hops = 0usize;
        loop {
            {
                let service_pids = self.service_pids.lock().unwrap();
                if let Some(service_name) = service_pids.get(&pid) {
                    return Some(service_name.clone());
                }
            }

            let next_pid = {
                let children_by_pid = self.children_by_pid.lock().unwrap();
                // A chain without cycles is never longer than the number of
                // tracked children; anything longer is a loop in the links.
                if hops > children_by_pid.len() {
                    return None;
                }
                children_by_pid.get(&pid).map(|child| child.parent_pid)
            };
            hops += 1;

            match next_pid {
                Some(parent) => pid = parent,
                None => return None,
            }
        }
    }
}
/// Parses a human-readable memory size into a number of bytes.
///
/// Surrounding whitespace and `_` digit separators are ignored and units are
/// case-insensitive. Accepted forms are a plain integer (`"1024"`) or an
/// integer followed by a `K`, `M`, `G` or `T` unit, optionally written with an
/// IEC `i` marker and/or trailing `B` (`"4K"`, `"2 MB"`, `"512MiB"`). Units are
/// binary multiples: `K` = 2^10, `M` = 2^20, `G` = 2^30, `T` = 2^40.
///
/// Returns `None` for an unknown unit, a non-numeric value, or a result that
/// does not fit in a `u64`.
fn parse_memory_limit(input: &str) -> Option<u64> {
    let normalized = input.trim().replace('_', "");
    let without_bytes = normalized.trim_end_matches(&['B', 'b'][..]);

    let (number_part, factor) = match without_bytes.chars().last() {
        Some(suffix) if suffix.is_ascii_alphabetic() => {
            // Accept the IEC spelling ("Ki", "Mi", ...): the multipliers are
            // already binary, so the marker carries no extra meaning. It only
            // counts when a unit letter precedes it, so a bare "1i" is still
            // rejected by the unit match below.
            let unit_part = without_bytes
                .strip_suffix(&['i', 'I'][..])
                .unwrap_or(without_bytes);
            let unit = unit_part.chars().last()?;
            let factor = match unit.to_ascii_uppercase() {
                'K' => 1u64 << 10,
                'M' => 1u64 << 20,
                'G' => 1u64 << 30,
                'T' => 1u64 << 40,
                _ => return None,
            };
            // `unit` matched an ASCII letter above, so it is one byte long.
            (&unit_part[..unit_part.len() - unit.len_utf8()], factor)
        }
        _ => (without_bytes, 1u64),
    };

    // checked_mul: an oversized value must be rejected, not panic or wrap.
    number_part.trim().parse::<u64>().ok()?.checked_mul(factor)
}
impl Default for DynamicSpawnManager {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use super::*;

    /// Limits shared by every test; only the termination policy varies.
    fn limits_with(policy: TerminationPolicy) -> SpawnLimitsConfig {
        SpawnLimitsConfig {
            children: Some(10),
            depth: Some(6),
            descendants: Some(50),
            total_memory: None,
            termination_policy: Some(policy),
        }
    }

    /// Fresh manager with the service `"svc"` registered under `policy`.
    fn manager_with_service(policy: TerminationPolicy) -> DynamicSpawnManager {
        let manager = DynamicSpawnManager::new();
        manager
            .register_service("svc".to_string(), &limits_with(policy))
            .unwrap();
        manager
    }

    /// Minimal child record with no metrics, exit data, TTL or user.
    fn tracked_child(name: &str, pid: u32, parent_pid: u32, depth: usize) -> SpawnedChild {
        SpawnedChild {
            name: name.to_string(),
            pid,
            parent_pid,
            command: "cmd".to_string(),
            started_at: SystemTime::now(),
            ttl: None,
            depth,
            cpu_percent: None,
            rss_bytes: None,
            last_exit: None,
            user: None,
            kind: SpawnedChildKind::Spawned,
        }
    }

    /// `record_spawn` must finish promptly when run on another thread.
    #[test]
    fn record_spawn_completes_without_deadlock() {
        let manager = manager_with_service(TerminationPolicy::Cascade);
        manager.register_service_pid("svc".to_string(), 1);
        let child = tracked_child("child", 2, 1, 1);

        let (done_tx, done_rx) = std::sync::mpsc::channel();
        let worker_manager = manager.clone();
        std::thread::spawn(move || {
            worker_manager
                .record_spawn(1, child, None)
                .expect("record_spawn should succeed");
            done_tx.send(()).expect("should signal completion");
        });

        assert!(
            done_rx.recv_timeout(Duration::from_secs(1)).is_ok(),
            "record_spawn did not complete in time"
        );
    }

    /// With an unregistered parent PID the explicit root hint is returned.
    #[test]
    fn record_spawn_uses_root_hint_when_parent_untracked() {
        let manager = manager_with_service(TerminationPolicy::Cascade);
        let child = tracked_child("child", 42, 9999, 1);

        let root = manager
            .record_spawn(9999, child, Some("svc".to_string()))
            .expect("record_spawn should succeed");

        assert_eq!(root.as_deref(), Some("svc"));
    }

    /// Exit metadata is visible through the per-parent child list.
    #[test]
    fn record_spawn_exit_tracks_metadata() {
        let manager = manager_with_service(TerminationPolicy::Cascade);
        manager.register_service_pid("svc".to_string(), 1);
        manager
            .record_spawn(1, tracked_child("child", 2, 1, 1), Some("svc".to_string()))
            .expect("record_spawn should succeed");

        let exit = SpawnedExit {
            exit_code: Some(0),
            signal: None,
            finished_at: Some(SystemTime::now()),
        };
        manager.record_spawn_exit(2, exit.clone());

        let children = manager.get_children(1);
        assert_eq!(children.len(), 1);
        let recorded_exit = children[0]
            .last_exit
            .as_ref()
            .expect("exit metadata present");
        assert_eq!(recorded_exit.exit_code, exit.exit_code);
    }

    /// Metrics updates are visible through the per-parent child list.
    #[test]
    fn update_child_metrics_caches_latest_values() {
        let manager = manager_with_service(TerminationPolicy::Cascade);
        manager.register_service_pid("svc".to_string(), 1);
        manager
            .record_spawn(1, tracked_child("child", 2, 1, 1), Some("svc".to_string()))
            .expect("record_spawn should succeed");

        manager.update_child_metrics(2, Some(42.0), Some(1024));

        let children = manager.get_children(1);
        assert_eq!(children[0].cpu_percent, Some(42.0));
        assert_eq!(children[0].rss_bytes, Some(1024));
    }

    /// A child resolves to the termination policy of its service's tree.
    #[test]
    fn termination_policy_for_returns_configured_policy() {
        let manager = manager_with_service(TerminationPolicy::Orphan);
        manager.register_service_pid("svc".to_string(), 1);
        manager
            .record_spawn(1, tracked_child("child", 2, 1, 1), Some("svc".to_string()))
            .expect("record_spawn should succeed");

        let policy = manager
            .termination_policy_for(2)
            .expect("termination policy should be resolvable");

        assert_eq!(policy, TerminationPolicy::Orphan);
    }

    /// Removing a child also removes its grandchild and empties both child lists.
    #[test]
    fn remove_subtree_removes_all_descendants() {
        let manager = manager_with_service(TerminationPolicy::Cascade);
        manager.register_service_pid("svc".to_string(), 1);
        let child = tracked_child("child", 2, 1, 1);
        let grandchild = tracked_child("grandchild", 3, 2, 2);
        manager
            .record_spawn(1, child, Some("svc".to_string()))
            .expect("record_spawn should succeed");
        manager
            .record_spawn(2, grandchild, Some("svc".to_string()))
            .expect("record_spawn should succeed");

        let removed = manager.remove_subtree(2);

        let removed_pids: HashSet<_> = removed.into_iter().map(|c| c.pid).collect();
        assert_eq!(removed_pids, HashSet::from([2, 3]));
        assert!(
            manager.get_children(1).is_empty(),
            "parent should have no children"
        );
        assert!(
            manager.get_children(2).is_empty(),
            "removed child should have no tracked descendants"
        );
    }
}