use std::fmt;
use std::time::Duration;
/// Identifier of an actor: a thin wrapper around the raw `u32` value.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct ActorId(pub u32);

impl fmt::Display for ActorId {
    /// Renders the id as `actor:<n>`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let ActorId(raw) = *self;
        f.write_fmt(format_args!("actor:{}", raw))
    }
}
/// Lifecycle state of an actor, with a fixed `u32` representation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u32)]
pub enum ActorState {
    Dormant = 0,
    Initializing = 1,
    Active = 2,
    Draining = 3,
    Terminated = 4,
    Failed = 5,
}

impl ActorState {
    /// Decodes a raw discriminant; returns `None` for any value outside `0..=5`.
    pub fn from_u32(v: u32) -> Option<Self> {
        const ALL_STATES: [ActorState; 6] = [
            ActorState::Dormant,
            ActorState::Initializing,
            ActorState::Active,
            ActorState::Draining,
            ActorState::Terminated,
            ActorState::Failed,
        ];
        ALL_STATES
            .iter()
            .copied()
            .find(|state| *state as u32 == v)
    }

    /// Returns `true` only for `Initializing` and `Active`.
    pub fn is_alive(self) -> bool {
        match self {
            ActorState::Initializing | ActorState::Active => true,
            ActorState::Dormant
            | ActorState::Draining
            | ActorState::Terminated
            | ActorState::Failed => false,
        }
    }
}
/// Restart strategy attached to an actor configuration.
///
/// Every variant except `Permanent` carries a restart budget (`max_restarts`)
/// and the `window` it is associated with.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RestartPolicy {
    Permanent,
    OneForOne { max_restarts: u32, window: Duration },
    OneForAll { max_restarts: u32, window: Duration },
    RestForOne { max_restarts: u32, window: Duration },
}

impl Default for RestartPolicy {
    /// Defaults to `OneForOne` with 3 restarts and a 60-second window.
    fn default() -> Self {
        let max_restarts = 3;
        let window = Duration::new(60, 0);
        RestartPolicy::OneForOne {
            max_restarts,
            window,
        }
    }
}
/// Configuration supplied when creating or restarting an actor.
#[derive(Debug, Clone)]
pub struct ActorConfig {
    /// Actor name; empty by default.
    pub name: String,
    /// Queue capacity; 1024 by default.
    pub queue_capacity: u32,
    /// Restart strategy; `RestartPolicy::default()` by default.
    pub restart_policy: RestartPolicy,
    /// Heartbeat interval; 100 ms by default.
    pub heartbeat_interval: Duration,
    /// Heartbeat timeout; 500 ms by default.
    pub heartbeat_timeout: Duration,
    /// Optional initial state bytes; `None` by default.
    pub initial_state: Option<Vec<u8>>,
}

impl Default for ActorConfig {
    fn default() -> Self {
        let heartbeat_interval = Duration::from_millis(100);
        let heartbeat_timeout = Duration::from_millis(500);
        ActorConfig {
            name: String::new(),
            queue_capacity: 1024,
            restart_policy: RestartPolicy::default(),
            heartbeat_interval,
            heartbeat_timeout,
            initial_state: None,
        }
    }
}

impl ActorConfig {
    /// Default configuration with the given name.
    pub fn named(name: impl Into<String>) -> Self {
        let name = name.into();
        let mut config = Self::default();
        config.name = name;
        config
    }

    /// Returns the configuration with its restart policy replaced.
    pub fn with_restart_policy(self, policy: RestartPolicy) -> Self {
        Self {
            restart_policy: policy,
            ..self
        }
    }

    /// Returns the configuration with both heartbeat durations replaced.
    pub fn with_heartbeat(self, interval: Duration, timeout: Duration) -> Self {
        Self {
            heartbeat_interval: interval,
            heartbeat_timeout: timeout,
            ..self
        }
    }

    /// Returns the configuration with its queue capacity replaced.
    pub fn with_queue_capacity(self, capacity: u32) -> Self {
        Self {
            queue_capacity: capacity,
            ..self
        }
    }
}
/// One supervision slot, laid out `repr(C)` with 64-byte alignment.
///
/// `state` holds an `ActorState` discriminant as a raw `u32`; `parent_id == 0`
/// is used by the supervisor to mean "no parent".
#[repr(C, align(64))]
#[derive(Debug, Clone, Copy)]
pub struct SupervisionEntry {
    pub actor_id: u32,
    pub state: u32,
    pub parent_id: u32,
    pub restart_count: u32,
    pub last_heartbeat_ns: u64,
    pub restart_window_start_ns: u64,
    pub max_restarts: u32,
    pub restart_window_ns: u64,
    pub _pad: [u8; 8],
}

impl SupervisionEntry {
    /// Creates an unused (Dormant) slot for `actor_id` with a budget of
    /// 3 restarts and a 60-second window.
    pub fn dormant(actor_id: u32) -> Self {
        const DEFAULT_MAX_RESTARTS: u32 = 3;
        const DEFAULT_WINDOW_NS: u64 = 60_000_000_000;
        SupervisionEntry {
            actor_id,
            state: ActorState::Dormant as u32,
            parent_id: 0,
            restart_count: 0,
            last_heartbeat_ns: 0,
            restart_window_start_ns: 0,
            max_restarts: DEFAULT_MAX_RESTARTS,
            restart_window_ns: DEFAULT_WINDOW_NS,
            _pad: [0u8; 8],
        }
    }

    /// `true` when the raw state is the Dormant discriminant.
    pub fn is_available(&self) -> bool {
        let dormant = ActorState::Dormant as u32;
        dormant == self.state
    }

    /// Decodes the raw state; an unrecognised value is reported as `Failed`.
    pub fn actor_state(&self) -> ActorState {
        match ActorState::from_u32(self.state) {
            Some(state) => state,
            None => ActorState::Failed,
        }
    }
}
/// Slot-based actor supervisor: a fixed table of `SupervisionEntry` slots plus a
/// free list of slot indices that can be handed out as actor ids.
pub struct ActorSupervisor {
// One entry per slot; an actor's id is the index of its slot.
entries: Vec<SupervisionEntry>,
// Slot indices currently available; used as a stack (push on destroy, pop on create).
free_list: Vec<u32>,
// Number of usable slots (slot 0 is never handed out).
capacity: u32,
// Incremented on create, decremented (saturating) on destroy.
active_count: u32,
}
impl ActorSupervisor {
pub fn new(grid_size: u32) -> Self {
let mut entries = Vec::with_capacity(grid_size as usize);
let mut free_list = Vec::with_capacity(grid_size as usize);
for i in 0..grid_size {
entries.push(SupervisionEntry::dormant(i));
if i > 0 {
free_list.push(i);
}
}
Self {
entries,
free_list,
capacity: grid_size - 1, active_count: 0,
}
}
/// Claims a free slot and initialises it as a new actor in the `Initializing` state.
///
/// A `parent_id` of `None` is stored as 0. The restart budget is copied from
/// `config.restart_policy`; `Permanent` stores 0 for both the budget and the window.
/// Windows longer than `u64::MAX` nanoseconds are clamped to `u64::MAX` rather
/// than silently truncated.
///
/// # Errors
/// Returns `ActorError::PoolExhausted` when no free slot remains.
pub fn create_actor(
    &mut self,
    config: &ActorConfig,
    parent_id: Option<ActorId>,
) -> Result<ActorId, ActorError> {
    let slot = self.free_list.pop().ok_or(ActorError::PoolExhausted {
        capacity: self.capacity,
        active: self.active_count,
    })?;

    let (max_restarts, restart_window_ns) = match config.restart_policy {
        RestartPolicy::OneForOne {
            max_restarts,
            window,
        }
        | RestartPolicy::OneForAll {
            max_restarts,
            window,
        }
        | RestartPolicy::RestForOne {
            max_restarts,
            window,
        } => {
            // `as_nanos()` is u128; clamp before narrowing so the cast cannot truncate.
            let nanos = window.as_nanos().min(u64::MAX as u128) as u64;
            (max_restarts, nanos)
        }
        RestartPolicy::Permanent => (0, 0),
    };

    let entry = &mut self.entries[slot as usize];
    entry.state = ActorState::Initializing as u32;
    entry.parent_id = parent_id.map(|p| p.0).unwrap_or(0);
    entry.restart_count = 0;
    entry.last_heartbeat_ns = 0;
    entry.max_restarts = max_restarts;
    entry.restart_window_ns = restart_window_ns;

    self.active_count += 1;
    Ok(ActorId(slot))
}
/// Moves an actor from `Initializing` to `Active`.
///
/// # Errors
/// `ActorError::InvalidId` if `id` is outside the slot table;
/// `ActorError::InvalidStateTransition` if the actor is not `Initializing`.
pub fn activate_actor(&mut self, id: ActorId) -> Result<(), ActorError> {
    let slot = match self.entries.get_mut(id.0 as usize) {
        Some(slot) => slot,
        None => return Err(ActorError::InvalidId(id)),
    };
    match slot.actor_state() {
        ActorState::Initializing => {
            slot.state = ActorState::Active as u32;
            Ok(())
        }
        current => Err(ActorError::InvalidStateTransition {
            actor: id,
            from: current,
            to: ActorState::Active,
        }),
    }
}
pub fn destroy_actor(&mut self, id: ActorId) -> Result<(), ActorError> {
let entry = self
.entries
.get_mut(id.0 as usize)
.ok_or(ActorError::InvalidId(id))?;
if entry.state == ActorState::Dormant as u32 {
return Err(ActorError::InvalidStateTransition {
actor: id,
from: ActorState::Dormant,
to: ActorState::Terminated,
});
}
entry.state = ActorState::Dormant as u32;
entry.parent_id = 0;
entry.restart_count = 0;
entry.last_heartbeat_ns = 0;
self.free_list.push(id.0);
self.active_count = self.active_count.saturating_sub(1);
Ok(())
}
/// Destroys the actor and creates a replacement with the same parent, carrying
/// the restart count forward incremented by one.
///
/// A `max_restarts` of 0 disables the budget check.
///
/// # Errors
/// `ActorError::InvalidId` if `id` is outside the slot table;
/// `ActorError::MaxRestartsExceeded` if the budget is already used up;
/// any error from `destroy_actor` or `create_actor`.
pub fn restart_actor(
    &mut self,
    id: ActorId,
    config: &ActorConfig,
) -> Result<ActorId, ActorError> {
    let (parent, previous_restarts) = match self.entries.get(id.0 as usize) {
        None => return Err(ActorError::InvalidId(id)),
        Some(slot) => {
            let limit = slot.max_restarts;
            if limit > 0 && slot.restart_count >= limit {
                return Err(ActorError::MaxRestartsExceeded {
                    actor: id,
                    restarts: slot.restart_count,
                    max: limit,
                });
            }
            let parent = match slot.parent_id {
                0 => None,
                raw => Some(ActorId(raw)),
            };
            (parent, slot.restart_count)
        }
    };

    self.destroy_actor(id)?;
    let replacement = self.create_actor(config, parent)?;
    self.entries[replacement.0 as usize].restart_count = previous_restarts + 1;
    Ok(replacement)
}
/// Records `timestamp_ns` as the actor's latest heartbeat; ids outside the slot
/// table are ignored.
pub fn heartbeat(&mut self, id: ActorId, timestamp_ns: u64) {
    let index = id.0 as usize;
    if index < self.entries.len() {
        self.entries[index].last_heartbeat_ns = timestamp_ns;
    }
}
/// Returns the alive actors whose last heartbeat is more than `timeout_ns` older
/// than `now_ns`.
///
/// Actors that have never sent a heartbeat (`last_heartbeat_ns == 0`) are skipped.
/// A heartbeat stamped later than `now_ns` counts as zero elapsed time instead of
/// underflowing the subtraction.
pub fn check_heartbeats(&self, now_ns: u64, timeout_ns: u64) -> Vec<ActorId> {
    self.entries
        .iter()
        .filter(|e| {
            e.actor_state().is_alive()
                && e.last_heartbeat_ns > 0
                && now_ns.saturating_sub(e.last_heartbeat_ns) > timeout_ns
        })
        .map(|e| ActorId(e.actor_id))
        .collect()
}
pub fn children_of(&self, parent: ActorId) -> Vec<ActorId> {
self.entries
.iter()
.filter(|e| e.parent_id == parent.0 && e.actor_state().is_alive())
.map(|e| ActorId(e.actor_id))
.collect()
}
pub fn get(&self, id: ActorId) -> Option<&SupervisionEntry> {
self.entries.get(id.0 as usize)
}
pub fn active_count(&self) -> u32 {
self.active_count
}
/// Number of slots currently on the free list.
pub fn available_count(&self) -> u32 {
    let free_slots = self.free_list.len();
    free_slots as u32
}
pub fn capacity(&self) -> u32 {
self.capacity
}
/// Read-only view of every slot, including Dormant ones.
pub fn entries(&self) -> &[SupervisionEntry] {
    self.entries.as_slice()
}
/// Destroys `root` and every alive descendant, children before parents, and
/// returns the ids that were actually destroyed in that order.
///
/// Each slot is visited at most once. Parent links can form a cycle when a
/// destroyed parent's slot is reused for a child of its former child; without
/// the visited set such a cycle would recurse until the stack overflows.
pub fn kill_tree(&mut self, root: ActorId) -> Vec<ActorId> {
    let mut destroyed = Vec::new();
    let mut visited = vec![false; self.entries.len()];
    self.kill_tree_recursive(root, &mut destroyed, &mut visited);
    destroyed
}

/// Post-order walk used by `kill_tree`; `visited` is indexed by slot.
fn kill_tree_recursive(
    &mut self,
    id: ActorId,
    destroyed: &mut Vec<ActorId>,
    visited: &mut [bool],
) {
    // Ids outside the table have no visited flag; they can only occur as the
    // initial root because child ids always come from real slots.
    if let Some(seen) = visited.get_mut(id.0 as usize) {
        if *seen {
            return;
        }
        *seen = true;
    }
    for child in self.children_of(id) {
        self.kill_tree_recursive(child, destroyed, visited);
    }
    if self.destroy_actor(id).is_ok() {
        destroyed.push(id);
    }
}
/// Marks `failed_id` as `Failed` and applies `config.restart_policy`, returning
/// the actions taken in order.
///
/// * `Permanent` — escalates to the parent (if any) without restarting.
/// * `OneForOne` — restarts the actor; any restart error escalates instead.
/// * `OneForAll` — destroys the alive siblings under the same parent.
/// * `RestForOne` — destroys the failed actor and the alive siblings that follow
///   it in slot order.
///
/// Returns an empty list, and changes nothing, when `failed_id` is outside the
/// slot table or refers to a Dormant slot. A Dormant slot is still on the free
/// list; marking it `Failed` would let a later destroy push it a second time.
pub fn handle_failure(
    &mut self,
    failed_id: ActorId,
    config: &ActorConfig,
) -> Vec<SupervisionAction> {
    let mut actions = Vec::new();

    let parent_id = {
        let entry = match self.entries.get_mut(failed_id.0 as usize) {
            Some(e) => e,
            None => return actions,
        };
        if entry.state == ActorState::Dormant as u32 {
            return actions;
        }
        entry.state = ActorState::Failed as u32;
        if entry.parent_id > 0 {
            Some(ActorId(entry.parent_id))
        } else {
            None
        }
    };
    actions.push(SupervisionAction::MarkedFailed(failed_id));

    match config.restart_policy {
        RestartPolicy::Permanent => {
            actions.push(SupervisionAction::Escalated {
                failed: failed_id,
                escalated_to: parent_id,
            });
        }
        RestartPolicy::OneForOne { .. } => match self.restart_actor(failed_id, config) {
            Ok(new_id) => {
                actions.push(SupervisionAction::Restarted {
                    old_id: failed_id,
                    new_id,
                });
            }
            // Budget exhausted or any other restart error: hand it to the parent.
            Err(_) => {
                actions.push(SupervisionAction::Escalated {
                    failed: failed_id,
                    escalated_to: parent_id,
                });
            }
        },
        RestartPolicy::OneForAll { .. } => {
            if let Some(parent) = parent_id {
                // NOTE(review): `children_of` only lists alive actors, so the failed
                // actor itself is not destroyed here and stays in the Failed state —
                // confirm the caller is expected to clean it up.
                let siblings = self.children_of(parent);
                for sibling in siblings {
                    let _ = self.destroy_actor(sibling);
                    actions.push(SupervisionAction::DestroyedSibling(sibling));
                }
                actions.push(SupervisionAction::AllSiblingsDestroyed { parent });
            }
        }
        RestartPolicy::RestForOne { .. } => {
            if let Some(parent) = parent_id {
                // The failed actor was just marked Failed, so `children_of` would
                // omit it and the "from the failed actor onward" scan would never
                // start. Include it explicitly.
                let doomed: Vec<ActorId> = self
                    .entries
                    .iter()
                    .filter(|e| {
                        e.parent_id == parent.0
                            && (e.actor_id == failed_id.0 || e.actor_state().is_alive())
                    })
                    .map(|e| ActorId(e.actor_id))
                    .skip_while(|id| *id != failed_id)
                    .collect();
                for sibling in doomed {
                    let _ = self.destroy_actor(sibling);
                    actions.push(SupervisionAction::DestroyedSibling(sibling));
                }
            }
        }
    }
    actions
}
pub fn depth(&self, id: ActorId) -> u32 {
let mut depth = 0;
let mut current = id;
while let Some(entry) = self.get(current) {
if entry.parent_id == 0 {
break;
}
current = ActorId(entry.parent_id);
depth += 1;
if depth > 100 {
break; }
}
depth
}
pub fn tree_view(&self) -> String {
let mut out = String::new();
out.push_str("Supervision Tree:\n");
let roots: Vec<ActorId> = self
.entries
.iter()
.filter(|e| e.parent_id == 0 && e.actor_state().is_alive())
.map(|e| ActorId(e.actor_id))
.collect();
for root in roots {
self.tree_view_recursive(root, &mut out, 0);
}
out
}
/// Appends one line for `id` to `out`, then recurses into its alive children
/// with `indent + 1`. Does nothing when `id` is outside the slot table.
///
/// Entries at indent 0 are tagged `[R]`, all others `[C]`. The last field is the
/// entry's `last_heartbeat_ns`; it was previously printed under the misleading
/// label `msgs=`.
fn tree_view_recursive(&self, id: ActorId, out: &mut String, indent: usize) {
    let entry = match self.get(id) {
        Some(entry) => entry,
        None => return,
    };
    let prefix = " ".repeat(indent);
    let state = match entry.actor_state() {
        ActorState::Active => "ACTIVE",
        ActorState::Dormant => "DORMANT",
        ActorState::Initializing => "INIT",
        ActorState::Failed => "FAILED",
        ActorState::Terminated => "TERM",
        ActorState::Draining => "DRAIN",
    };
    out.push_str(&format!(
        "{}[{}] actor:{} state={} restarts={} last_heartbeat_ns={}\n",
        prefix,
        if indent == 0 { "R" } else { "C" },
        id.0,
        state,
        entry.restart_count,
        entry.last_heartbeat_ns
    ));
    for child in self.children_of(id) {
        self.tree_view_recursive(child, out, indent + 1);
    }
}
}
/// One step taken by `ActorSupervisor::handle_failure`, reported in the order
/// the steps were performed.
#[derive(Debug, Clone)]
pub enum SupervisionAction {
// The actor's state was set to Failed.
MarkedFailed(ActorId),
// The actor was destroyed and re-created; `new_id` is the replacement's id.
Restarted {
old_id: ActorId,
new_id: ActorId,
},
// A sibling of the failed actor was destroyed.
DestroyedSibling(ActorId),
// Emitted after the OneForAll sibling sweep under `parent`.
AllSiblingsDestroyed {
parent: ActorId,
},
// The failure was not handled locally; `escalated_to` is the parent, if any.
Escalated {
failed: ActorId,
escalated_to: Option<ActorId>,
},
}
/// Errors returned by `ActorSupervisor` operations.
#[derive(Debug, Clone)]
pub enum ActorError {
    /// No free slot was available.
    PoolExhausted { capacity: u32, active: u32 },
    /// The id is outside the slot table.
    InvalidId(ActorId),
    /// The requested transition is not allowed from the actor's current state.
    InvalidStateTransition {
        actor: ActorId,
        from: ActorState,
        to: ActorState,
    },
    /// The actor has already used its restart budget.
    MaxRestartsExceeded {
        actor: ActorId,
        restarts: u32,
        max: u32,
    },
}

impl fmt::Display for ActorError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match *self {
            ActorError::PoolExhausted { capacity, active } => f.write_fmt(format_args!(
                "Actor pool exhausted: {}/{} slots active",
                active, capacity
            )),
            ActorError::InvalidId(id) => f.write_fmt(format_args!("Invalid actor ID: {}", id)),
            ActorError::InvalidStateTransition { actor, from, to } => f.write_fmt(format_args!(
                "Invalid state transition for {}: {:?} → {:?}",
                actor, from, to
            )),
            ActorError::MaxRestartsExceeded {
                actor,
                restarts,
                max,
            } => f.write_fmt(format_args!(
                "Actor {} exceeded max restarts: {}/{}",
                actor, restarts, max
            )),
        }
    }
}

impl std::error::Error for ActorError {}
#[cfg(test)]
mod tests {
use super::*;
/// Create → activate → destroy, checking the counters at every step.
#[test]
fn test_actor_lifecycle_create_destroy() {
    // Slot 0 is never handed out, so 8 slots give 7 usable ones.
    let mut sup = ActorSupervisor::new(8);
    assert_eq!(sup.capacity(), 7);
    assert_eq!(sup.active_count(), 0);
    assert_eq!(sup.available_count(), 7);

    let actor = sup
        .create_actor(&ActorConfig::named("test-actor"), None)
        .unwrap();
    assert_eq!(sup.active_count(), 1);
    assert_eq!(sup.available_count(), 6);

    sup.activate_actor(actor).unwrap();
    assert_eq!(sup.get(actor).unwrap().actor_state(), ActorState::Active);

    sup.destroy_actor(actor).unwrap();
    assert_eq!(sup.active_count(), 0);
    assert_eq!(sup.available_count(), 7);
}
/// Two activated children are both reported by `children_of`.
#[test]
fn test_actor_parent_child() {
    let mut sup = ActorSupervisor::new(8);
    let root = sup
        .create_actor(&ActorConfig::named("parent"), None)
        .unwrap();
    sup.activate_actor(root).unwrap();

    let child_cfg = ActorConfig::named("child");
    let mut kids = Vec::new();
    for _ in 0..2 {
        kids.push(sup.create_actor(&child_cfg, Some(root)).unwrap());
    }
    for kid in &kids {
        sup.activate_actor(*kid).unwrap();
    }

    let listed = sup.children_of(root);
    assert_eq!(listed.len(), 2);
    for kid in &kids {
        assert!(listed.contains(kid));
    }
}
/// Three consecutive restarts each bump the restart count by one.
///
/// Every restart is applied to the id returned by the previous one, so the test
/// does not depend on the supervisor handing the same slot back.
#[test]
fn test_actor_restart() {
    let mut supervisor = ActorSupervisor::new(8);
    let config =
        ActorConfig::named("restartable").with_restart_policy(RestartPolicy::OneForOne {
            max_restarts: 3,
            window: Duration::from_secs(60),
        });
    let mut current = supervisor.create_actor(&config, None).unwrap();
    supervisor.activate_actor(current).unwrap();
    for i in 0..3 {
        current = supervisor.restart_actor(current, &config).unwrap();
        supervisor.activate_actor(current).unwrap();
        let entry = supervisor.get(current).unwrap();
        assert_eq!(entry.restart_count, i + 1);
    }
}
/// With a budget of one restart, the second restart attempt is rejected.
#[test]
fn test_actor_max_restarts_exceeded() {
    let mut sup = ActorSupervisor::new(8);
    let policy = RestartPolicy::OneForOne {
        max_restarts: 1,
        window: Duration::from_secs(60),
    };
    let cfg = ActorConfig::named("fragile").with_restart_policy(policy);

    let first = sup.create_actor(&cfg, None).unwrap();
    sup.activate_actor(first).unwrap();

    let second = sup.restart_actor(first, &cfg).unwrap();
    sup.activate_actor(second).unwrap();

    match sup.restart_actor(second, &cfg) {
        Err(ActorError::MaxRestartsExceeded { .. }) => {}
        other => panic!("expected MaxRestartsExceeded, got {:?}", other),
    }
}
/// A 4-slot supervisor has 3 usable slots; the fourth create must fail.
#[test]
fn test_pool_exhaustion() {
    let mut sup = ActorSupervisor::new(4);
    let cfg = ActorConfig::named("actor");
    for _ in 0..3 {
        let actor = sup.create_actor(&cfg, None).unwrap();
        sup.activate_actor(actor).unwrap();
    }
    let overflow = sup.create_actor(&cfg, None);
    assert!(matches!(overflow, Err(ActorError::PoolExhausted { .. })));
}
/// Only the actor whose heartbeat is older than the timeout is reported.
#[test]
fn test_heartbeat_failure_detection() {
    let mut sup = ActorSupervisor::new(8);
    let cfg = ActorConfig::named("monitored");
    let fresh = sup.create_actor(&cfg, None).unwrap();
    let stale = sup.create_actor(&cfg, None).unwrap();
    sup.activate_actor(fresh).unwrap();
    sup.activate_actor(stale).unwrap();

    // Both report at t = 1 ms; only `fresh` reports again at t = 400 ms.
    sup.heartbeat(fresh, 1_000_000);
    sup.heartbeat(stale, 1_000_000);
    sup.heartbeat(fresh, 400_000_000);

    let timeout_ns = 500_000_000;
    let timed_out = sup.check_heartbeats(600_000_000, timeout_ns);
    assert_eq!(timed_out.len(), 1);
    assert_eq!(timed_out[0], stale);
}
/// Pins the in-memory size of `SupervisionEntry` at 64 bytes.
#[test]
fn test_supervision_entry_size() {
    let entry_size = std::mem::size_of::<SupervisionEntry>();
    assert_eq!(
        entry_size, 64,
        "SupervisionEntry must be exactly 64 bytes for GPU cache efficiency"
    );
}
/// Every state survives a `u32` round trip through `from_u32`.
#[test]
fn test_actor_state_roundtrip() {
    let every_state = [
        ActorState::Dormant,
        ActorState::Initializing,
        ActorState::Active,
        ActorState::Draining,
        ActorState::Terminated,
        ActorState::Failed,
    ];
    for state in every_state.iter().copied() {
        let decoded = ActorState::from_u32(state as u32);
        assert_eq!(decoded, Some(state));
    }
}
/// Killing the root removes the root, both children and the grandchild.
#[test]
fn test_cascading_kill_tree() {
    let mut supervisor = ActorSupervisor::new(16);
    let cfg = ActorConfig::named("node");

    let top = supervisor.create_actor(&cfg, None).unwrap();
    supervisor.activate_actor(top).unwrap();
    let left = supervisor.create_actor(&cfg, Some(top)).unwrap();
    supervisor.activate_actor(left).unwrap();
    let right = supervisor.create_actor(&cfg, Some(top)).unwrap();
    supervisor.activate_actor(right).unwrap();
    let leaf = supervisor.create_actor(&cfg, Some(left)).unwrap();
    supervisor.activate_actor(leaf).unwrap();
    assert_eq!(supervisor.active_count(), 4);

    let removed = supervisor.kill_tree(top);
    assert_eq!(removed.len(), 4);
    assert_eq!(supervisor.active_count(), 0);
    // 16 slots minus the one that is never handed out.
    assert_eq!(supervisor.available_count(), 15);
}
/// Under OneForOne a failed child is restarted.
#[test]
fn test_handle_failure_one_for_one() {
    let mut supervisor = ActorSupervisor::new(8);
    let policy = RestartPolicy::OneForOne {
        max_restarts: 2,
        window: Duration::from_secs(60),
    };
    let worker_cfg = ActorConfig::named("worker").with_restart_policy(policy);

    let boss = supervisor
        .create_actor(&ActorConfig::named("parent"), None)
        .unwrap();
    supervisor.activate_actor(boss).unwrap();
    let worker = supervisor.create_actor(&worker_cfg, Some(boss)).unwrap();
    supervisor.activate_actor(worker).unwrap();

    let outcome = supervisor.handle_failure(worker, &worker_cfg);
    let restarted = outcome
        .iter()
        .any(|step| matches!(step, SupervisionAction::Restarted { .. }));
    assert!(restarted);
}
/// Under Permanent a failed child is escalated rather than restarted.
#[test]
fn test_handle_failure_escalation() {
    let mut supervisor = ActorSupervisor::new(8);
    let fragile_cfg = ActorConfig::named("fragile").with_restart_policy(RestartPolicy::Permanent);

    let boss = supervisor
        .create_actor(&ActorConfig::named("parent"), None)
        .unwrap();
    supervisor.activate_actor(boss).unwrap();
    let fragile = supervisor.create_actor(&fragile_cfg, Some(boss)).unwrap();
    supervisor.activate_actor(fragile).unwrap();

    let outcome = supervisor.handle_failure(fragile, &fragile_cfg);
    let escalated = outcome
        .iter()
        .any(|step| matches!(step, SupervisionAction::Escalated { .. }));
    assert!(escalated);
}
/// Depth grows by one per generation: root 0, child 1, grandchild 2.
#[test]
fn test_tree_depth() {
    let mut supervisor = ActorSupervisor::new(16);
    let cfg = ActorConfig::named("node");

    let mut parent: Option<ActorId> = None;
    for expected_depth in 0..3 {
        let node = supervisor.create_actor(&cfg, parent).unwrap();
        supervisor.activate_actor(node).unwrap();
        assert_eq!(supervisor.depth(node), expected_depth);
        parent = Some(node);
    }
}
/// The rendered tree mentions the actors and their ACTIVE state.
#[test]
fn test_tree_view() {
    let mut supervisor = ActorSupervisor::new(8);
    let cfg = ActorConfig::named("actor");

    let top = supervisor.create_actor(&cfg, None).unwrap();
    supervisor.activate_actor(top).unwrap();
    for _ in 0..2 {
        let kid = supervisor.create_actor(&cfg, Some(top)).unwrap();
        supervisor.activate_actor(kid).unwrap();
    }

    let rendered = supervisor.tree_view();
    assert!(rendered.contains("ACTIVE"));
    assert!(rendered.contains("actor:"));
}
}