Skip to main content

ringkernel_core/
actor.rs

1//! GPU Actor Lifecycle Model
2//!
3//! Implements the full actor model lifecycle on GPU hardware:
4//! - **Create**: Activate a dormant actor slot from the pool
5//! - **Destroy**: Deactivate an actor, return its slot to the pool
6//! - **Restart**: Destroy + reinitialize state + Create
7//! - **Supervise**: Parent-child tree with failure detection and restart policies
8//!
9//! # Architecture
10//!
11//! On a GPU, thread blocks are fixed at kernel launch time — you can't spawn
12//! new blocks dynamically. The solution is a **pool-based** design:
13//!
14//! ```text
15//! ┌─── Persistent Kernel (N blocks pre-allocated at launch) ──────────┐
16//! │                                                                    │
17//! │  Block 0: SUPERVISOR                                               │
18//! │  ├─ Actor registry (who is active, who is dormant)                 │
19//! │  ├─ Free list (available actor slots)                              │
20//! │  ├─ Supervision tree (parent-child relationships)                  │
21//! │  └─ Heartbeat monitor (detect actor failures)                      │
22//! │                                                                    │
23//! │  Block 1: ACTIVE actor "sensor-reader"                             │
24//! │  ├─ Processing messages from H2K queue                             │
25//! │  ├─ Created child: Block 3                                         │
26//! │  └─ Heartbeat: last_seen = 1.2ms ago                               │
27//! │                                                                    │
28//! │  Block 2: DORMANT (in free pool)                                   │
29//! │  └─ Zero cost when idle — just checks is_active flag               │
30//! │                                                                    │
31//! │  Block 3: ACTIVE actor "data-processor" (child of Block 1)         │
32//! │  ├─ Processing K2K messages from Block 1                           │
33//! │  └─ Heartbeat: last_seen = 0.5ms ago                               │
34//! │                                                                    │
35//! │  Block 4-N: DORMANT (in free pool)                                 │
36//! │                                                                    │
37//! │  "Create actor" = supervisor activates dormant block               │
38//! │  "Kill actor"   = supervisor deactivates, returns to free pool     │
39//! │  "Restart"      = destroy + reinit state + create                  │
40//! └────────────────────────────────────────────────────────────────────┘
41//! ```
42//!
43//! # Comparison with Erlang/Akka
44//!
45//! | Erlang/Akka | RingKernel GPU |
46//! |-------------|----------------|
47//! | `spawn(Fun)` | `supervisor.create_actor(config)` → activates dormant block |
48//! | `Pid ! Message` | K2K channel or H2K queue injection |
49//! | `exit(Pid, Reason)` | `supervisor.destroy_actor(id)` → deactivates block |
50//! | Supervisor tree | Parent-child tree in mapped memory |
51//! | `one_for_one` restart | Heartbeat timeout → destroy + create |
52//! | Process isolation | Separate control block + queue per block |
53
54use std::fmt;
55use std::time::Duration;
56
/// Handle identifying one GPU actor inside a persistent kernel.
///
/// The wrapped value is the index of the thread block that hosts the actor
/// in the persistent kernel grid.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct ActorId(pub u32);

impl fmt::Display for ActorId {
    /// Renders the ID as `actor:<index>`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let ActorId(index) = *self;
        write!(f, "actor:{}", index)
    }
}
68
/// Lifecycle state of a GPU actor slot.
///
/// The discriminants are fixed (`repr(u32)`) because the state is stored as a
/// raw `u32` in [`SupervisionEntry`] and decoded with [`ActorState::from_u32`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u32)]
pub enum ActorState {
    /// The slot is unused and can be allocated.
    Dormant = 0,
    /// The actor is setting up its state and is not yet active.
    Initializing = 1,
    /// The actor is running and processing messages.
    Active = 2,
    /// The actor is emptying its queue ahead of shutdown.
    Draining = 3,
    /// The actor has stopped and is waiting for the supervisor to reclaim it.
    Terminated = 4,
    /// The actor hit a fatal error.
    Failed = 5,
}

impl ActorState {
    /// Decodes a raw `u32` (for example one read from mapped memory).
    ///
    /// Returns `None` when `v` is not one of the six known discriminants.
    pub fn from_u32(v: u32) -> Option<Self> {
        let state = match v {
            0 => Self::Dormant,
            1 => Self::Initializing,
            2 => Self::Active,
            3 => Self::Draining,
            4 => Self::Terminated,
            5 => Self::Failed,
            _ => return None,
        };
        Some(state)
    }

    /// Returns `true` only for `Initializing` and `Active`.
    pub fn is_alive(self) -> bool {
        match self {
            Self::Initializing | Self::Active => true,
            Self::Dormant | Self::Draining | Self::Terminated | Self::Failed => false,
        }
    }
}
106
/// How the supervisor reacts when a supervised actor fails.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RestartPolicy {
    /// The actor is never restarted; once failed, it stays failed.
    Permanent,
    /// Only the failed actor is restarted, at most `max_restarts` times within `window`.
    OneForOne {
        /// Upper bound on restart attempts inside the time window.
        max_restarts: u32,
        /// Length of the window over which restarts are counted.
        window: Duration,
    },
    /// The failed actor and all of its siblings (children of the same parent) are restarted.
    OneForAll {
        /// Upper bound on restart attempts inside the time window.
        max_restarts: u32,
        /// Length of the window over which restarts are counted.
        window: Duration,
    },
    /// The failed actor and every actor started after it are restarted.
    RestForOne {
        /// Upper bound on restart attempts inside the time window.
        max_restarts: u32,
        /// Length of the window over which restarts are counted.
        window: Duration,
    },
}

impl Default for RestartPolicy {
    /// Defaults to `OneForOne` with 3 restarts per 60-second window.
    fn default() -> Self {
        let window = Duration::from_secs(60);
        RestartPolicy::OneForOne {
            max_restarts: 3,
            window,
        }
    }
}
143
144/// Configuration for creating a new GPU actor.
145#[derive(Debug, Clone)]
146pub struct ActorConfig {
147    /// Human-readable name for the actor.
148    pub name: String,
149    /// Message queue capacity for this actor.
150    pub queue_capacity: u32,
151    /// Restart policy if the actor fails.
152    pub restart_policy: RestartPolicy,
153    /// Heartbeat interval — supervisor checks liveness at this rate.
154    pub heartbeat_interval: Duration,
155    /// Heartbeat timeout — actor is considered failed if no heartbeat for this long.
156    pub heartbeat_timeout: Duration,
157    /// Initial state data to copy into the actor's shared memory.
158    pub initial_state: Option<Vec<u8>>,
159}
160
161impl Default for ActorConfig {
162    fn default() -> Self {
163        Self {
164            name: String::new(),
165            queue_capacity: 1024,
166            restart_policy: RestartPolicy::default(),
167            heartbeat_interval: Duration::from_millis(100),
168            heartbeat_timeout: Duration::from_millis(500),
169            initial_state: None,
170        }
171    }
172}
173
174impl ActorConfig {
175    /// Create a config with a name.
176    pub fn named(name: impl Into<String>) -> Self {
177        Self {
178            name: name.into(),
179            ..Default::default()
180        }
181    }
182
183    /// Set the restart policy.
184    pub fn with_restart_policy(mut self, policy: RestartPolicy) -> Self {
185        self.restart_policy = policy;
186        self
187    }
188
189    /// Set heartbeat parameters.
190    pub fn with_heartbeat(mut self, interval: Duration, timeout: Duration) -> Self {
191        self.heartbeat_interval = interval;
192        self.heartbeat_timeout = timeout;
193        self
194    }
195
196    /// Set the queue capacity.
197    pub fn with_queue_capacity(mut self, capacity: u32) -> Self {
198        self.queue_capacity = capacity;
199        self
200    }
201}
202
203/// Entry in the supervision tree.
204///
205/// Stored in mapped memory so both CPU and GPU can access it.
206/// Fixed 64-byte size for GPU cache efficiency.
207#[repr(C, align(64))]
208#[derive(Debug, Clone, Copy)]
209pub struct SupervisionEntry {
210    /// Actor ID (thread block index).
211    pub actor_id: u32,
212    /// Current state (see ActorState).
213    pub state: u32,
214    /// Parent actor ID (0 = root / no parent).
215    pub parent_id: u32,
216    /// Restart count within current window.
217    pub restart_count: u32,
218    /// Last heartbeat timestamp (nanoseconds since kernel start).
219    pub last_heartbeat_ns: u64,
220    /// Restart window start timestamp.
221    pub restart_window_start_ns: u64,
222    /// Maximum restarts allowed in window.
223    pub max_restarts: u32,
224    /// Restart window duration in nanoseconds.
225    pub restart_window_ns: u64,
226    /// Padding to 64 bytes.
227    pub _pad: [u8; 8],
228}
229
230impl SupervisionEntry {
231    /// Create a new dormant entry.
232    pub fn dormant(actor_id: u32) -> Self {
233        Self {
234            actor_id,
235            state: ActorState::Dormant as u32,
236            parent_id: 0,
237            restart_count: 0,
238            last_heartbeat_ns: 0,
239            restart_window_start_ns: 0,
240            max_restarts: 3,
241            restart_window_ns: 60_000_000_000, // 60 seconds
242            _pad: [0; 8],
243        }
244    }
245
246    /// Check if this entry is available for allocation.
247    pub fn is_available(&self) -> bool {
248        self.state == ActorState::Dormant as u32
249    }
250
251    /// Get the actor state.
252    pub fn actor_state(&self) -> ActorState {
253        ActorState::from_u32(self.state).unwrap_or(ActorState::Failed)
254    }
255}
256
257/// The supervisor manages the actor pool lifecycle.
258///
259/// This is the host-side component. The GPU-side supervisor runs as
260/// block 0 in the persistent kernel and mirrors this state.
261pub struct ActorSupervisor {
262    /// Supervision tree entries (one per thread block in the grid).
263    entries: Vec<SupervisionEntry>,
264    /// Free list (indices of dormant actors).
265    free_list: Vec<u32>,
266    /// Total actor slots (= grid size of persistent kernel).
267    capacity: u32,
268    /// Number of currently active actors.
269    active_count: u32,
270}
271
272impl ActorSupervisor {
273    /// Create a supervisor for a persistent kernel with `grid_size` blocks.
274    ///
275    /// Block 0 is reserved for the supervisor itself.
276    /// Blocks 1..grid_size are available as actor slots.
277    pub fn new(grid_size: u32) -> Self {
278        let mut entries = Vec::with_capacity(grid_size as usize);
279        let mut free_list = Vec::with_capacity(grid_size as usize);
280
281        for i in 0..grid_size {
282            entries.push(SupervisionEntry::dormant(i));
283            if i > 0 {
284                // Block 0 = supervisor, blocks 1+ = actor pool
285                free_list.push(i);
286            }
287        }
288
289        Self {
290            entries,
291            free_list,
292            capacity: grid_size - 1, // exclude supervisor block
293            active_count: 0,
294        }
295    }
296
297    /// Create a new actor, returning its ID.
298    ///
299    /// Allocates a dormant slot from the free pool, initializes it with
300    /// the given config, and sets its state to Initializing.
301    pub fn create_actor(
302        &mut self,
303        config: &ActorConfig,
304        parent_id: Option<ActorId>,
305    ) -> Result<ActorId, ActorError> {
306        let slot = self.free_list.pop().ok_or(ActorError::PoolExhausted {
307            capacity: self.capacity,
308            active: self.active_count,
309        })?;
310
311        let entry = &mut self.entries[slot as usize];
312        entry.state = ActorState::Initializing as u32;
313        entry.parent_id = parent_id.map(|p| p.0).unwrap_or(0);
314        entry.restart_count = 0;
315        entry.last_heartbeat_ns = 0;
316
317        match config.restart_policy {
318            RestartPolicy::OneForOne {
319                max_restarts,
320                window,
321            }
322            | RestartPolicy::OneForAll {
323                max_restarts,
324                window,
325            }
326            | RestartPolicy::RestForOne {
327                max_restarts,
328                window,
329            } => {
330                entry.max_restarts = max_restarts;
331                entry.restart_window_ns = window.as_nanos() as u64;
332            }
333            RestartPolicy::Permanent => {
334                entry.max_restarts = 0;
335                entry.restart_window_ns = 0;
336            }
337        }
338
339        self.active_count += 1;
340
341        Ok(ActorId(slot))
342    }
343
344    /// Activate an actor (transition from Initializing to Active).
345    pub fn activate_actor(&mut self, id: ActorId) -> Result<(), ActorError> {
346        let entry = self
347            .entries
348            .get_mut(id.0 as usize)
349            .ok_or(ActorError::InvalidId(id))?;
350
351        if entry.state != ActorState::Initializing as u32 {
352            return Err(ActorError::InvalidStateTransition {
353                actor: id,
354                from: entry.actor_state(),
355                to: ActorState::Active,
356            });
357        }
358
359        entry.state = ActorState::Active as u32;
360        Ok(())
361    }
362
363    /// Destroy an actor, returning its slot to the free pool.
364    pub fn destroy_actor(&mut self, id: ActorId) -> Result<(), ActorError> {
365        let entry = self
366            .entries
367            .get_mut(id.0 as usize)
368            .ok_or(ActorError::InvalidId(id))?;
369
370        if entry.state == ActorState::Dormant as u32 {
371            return Err(ActorError::InvalidStateTransition {
372                actor: id,
373                from: ActorState::Dormant,
374                to: ActorState::Terminated,
375            });
376        }
377
378        entry.state = ActorState::Dormant as u32;
379        entry.parent_id = 0;
380        entry.restart_count = 0;
381        entry.last_heartbeat_ns = 0;
382
383        self.free_list.push(id.0);
384        self.active_count = self.active_count.saturating_sub(1);
385
386        Ok(())
387    }
388
389    /// Restart an actor (destroy + re-create with same config).
390    ///
391    /// Returns the new ActorId (may be the same slot if available).
392    pub fn restart_actor(
393        &mut self,
394        id: ActorId,
395        config: &ActorConfig,
396    ) -> Result<ActorId, ActorError> {
397        let parent_id = {
398            let entry = self
399                .entries
400                .get(id.0 as usize)
401                .ok_or(ActorError::InvalidId(id))?;
402
403            // Check restart budget
404            if entry.restart_count >= entry.max_restarts && entry.max_restarts > 0 {
405                return Err(ActorError::MaxRestartsExceeded {
406                    actor: id,
407                    restarts: entry.restart_count,
408                    max: entry.max_restarts,
409                });
410            }
411
412            if entry.parent_id > 0 {
413                Some(ActorId(entry.parent_id))
414            } else {
415                None
416            }
417        };
418
419        // Increment restart count before destroy (to preserve it)
420        let restart_count = self.entries[id.0 as usize].restart_count;
421
422        self.destroy_actor(id)?;
423        let new_id = self.create_actor(config, parent_id)?;
424
425        // Restore restart count (incremented)
426        self.entries[new_id.0 as usize].restart_count = restart_count + 1;
427
428        Ok(new_id)
429    }
430
431    /// Record a heartbeat from an actor.
432    pub fn heartbeat(&mut self, id: ActorId, timestamp_ns: u64) {
433        if let Some(entry) = self.entries.get_mut(id.0 as usize) {
434            entry.last_heartbeat_ns = timestamp_ns;
435        }
436    }
437
438    /// Check for actors that have missed their heartbeat deadline.
439    ///
440    /// Returns a list of actor IDs that should be restarted.
441    pub fn check_heartbeats(&self, now_ns: u64, timeout_ns: u64) -> Vec<ActorId> {
442        self.entries
443            .iter()
444            .filter(|e| {
445                e.actor_state().is_alive()
446                    && e.last_heartbeat_ns > 0
447                    && (now_ns - e.last_heartbeat_ns) > timeout_ns
448            })
449            .map(|e| ActorId(e.actor_id))
450            .collect()
451    }
452
453    /// Get children of a given actor.
454    pub fn children_of(&self, parent: ActorId) -> Vec<ActorId> {
455        self.entries
456            .iter()
457            .filter(|e| e.parent_id == parent.0 && e.actor_state().is_alive())
458            .map(|e| ActorId(e.actor_id))
459            .collect()
460    }
461
462    /// Get the supervision entry for an actor.
463    pub fn get(&self, id: ActorId) -> Option<&SupervisionEntry> {
464        self.entries.get(id.0 as usize)
465    }
466
467    /// Number of currently active actors.
468    pub fn active_count(&self) -> u32 {
469        self.active_count
470    }
471
472    /// Number of available (dormant) actor slots.
473    pub fn available_count(&self) -> u32 {
474        self.free_list.len() as u32
475    }
476
477    /// Total capacity (excluding supervisor block).
478    pub fn capacity(&self) -> u32 {
479        self.capacity
480    }
481
482    /// Get all supervision entries as a slice (for copying to mapped memory).
483    pub fn entries(&self) -> &[SupervisionEntry] {
484        &self.entries
485    }
486
487    // === FR-001: Cascading Termination & Escalation ===
488
489    /// Cascading kill: destroy an actor and all its descendants.
490    ///
491    /// Walks the supervision tree depth-first, destroying children
492    /// before parents. Returns the list of destroyed actor IDs.
493    pub fn kill_tree(&mut self, root: ActorId) -> Vec<ActorId> {
494        let mut destroyed = Vec::new();
495        self.kill_tree_recursive(root, &mut destroyed);
496        destroyed
497    }
498
499    fn kill_tree_recursive(&mut self, id: ActorId, destroyed: &mut Vec<ActorId>) {
500        // First, recursively kill all children
501        let children = self.children_of(id);
502        for child in children {
503            self.kill_tree_recursive(child, destroyed);
504        }
505
506        // Then destroy this actor
507        if self.destroy_actor(id).is_ok() {
508            destroyed.push(id);
509        }
510    }
511
512    /// Handle a failed actor according to its parent's restart policy.
513    ///
514    /// Returns a list of actions taken (for logging/auditing).
515    pub fn handle_failure(
516        &mut self,
517        failed_id: ActorId,
518        config: &ActorConfig,
519    ) -> Vec<SupervisionAction> {
520        let mut actions = Vec::new();
521
522        let (parent_id, policy) = {
523            let entry = match self.get(failed_id) {
524                Some(e) => e,
525                None => return actions,
526            };
527            let parent = if entry.parent_id > 0 {
528                Some(ActorId(entry.parent_id))
529            } else {
530                None
531            };
532            (parent, config.restart_policy)
533        };
534
535        // Mark as failed
536        if let Some(entry) = self.entries.get_mut(failed_id.0 as usize) {
537            entry.state = ActorState::Failed as u32;
538        }
539        actions.push(SupervisionAction::MarkedFailed(failed_id));
540
541        match policy {
542            RestartPolicy::Permanent => {
543                // No restart — escalate to parent
544                actions.push(SupervisionAction::Escalated {
545                    failed: failed_id,
546                    escalated_to: parent_id,
547                });
548            }
549
550            RestartPolicy::OneForOne { .. } => {
551                // Restart only the failed actor
552                match self.restart_actor(failed_id, config) {
553                    Ok(new_id) => {
554                        actions.push(SupervisionAction::Restarted {
555                            old_id: failed_id,
556                            new_id,
557                        });
558                    }
559                    Err(ActorError::MaxRestartsExceeded { .. }) => {
560                        // Budget exhausted — escalate to parent
561                        actions.push(SupervisionAction::Escalated {
562                            failed: failed_id,
563                            escalated_to: parent_id,
564                        });
565                    }
566                    Err(_) => {
567                        actions.push(SupervisionAction::Escalated {
568                            failed: failed_id,
569                            escalated_to: parent_id,
570                        });
571                    }
572                }
573            }
574
575            RestartPolicy::OneForAll { .. } => {
576                // Restart the failed actor AND all its siblings
577                if let Some(parent) = parent_id {
578                    let siblings = self.children_of(parent);
579                    for sibling in siblings {
580                        let _ = self.destroy_actor(sibling);
581                        actions.push(SupervisionAction::DestroyedSibling(sibling));
582                    }
583                    // Re-create all (including the failed one)
584                    // Note: caller is responsible for re-creating with proper configs
585                    actions.push(SupervisionAction::AllSiblingsDestroyed { parent });
586                }
587            }
588
589            RestartPolicy::RestForOne { .. } => {
590                // Restart the failed actor and all actors started after it
591                if let Some(parent) = parent_id {
592                    let siblings = self.children_of(parent);
593                    let mut found = false;
594                    for sibling in siblings {
595                        if sibling == failed_id {
596                            found = true;
597                        }
598                        if found {
599                            let _ = self.destroy_actor(sibling);
600                            actions.push(SupervisionAction::DestroyedSibling(sibling));
601                        }
602                    }
603                }
604            }
605        }
606
607        actions
608    }
609
610    /// Get the depth of the supervision tree from root to the given actor.
611    pub fn depth(&self, id: ActorId) -> u32 {
612        let mut depth = 0;
613        let mut current = id;
614        while let Some(entry) = self.get(current) {
615            if entry.parent_id == 0 {
616                break;
617            }
618            current = ActorId(entry.parent_id);
619            depth += 1;
620            if depth > 100 {
621                break; // Safety: prevent infinite loop on corrupted tree
622            }
623        }
624        depth
625    }
626
627    /// Produce a textual visualization of the supervision tree.
628    pub fn tree_view(&self) -> String {
629        let mut out = String::new();
630        out.push_str("Supervision Tree:\n");
631
632        // Find root actors (no parent)
633        let roots: Vec<ActorId> = self
634            .entries
635            .iter()
636            .filter(|e| e.parent_id == 0 && e.actor_state().is_alive())
637            .map(|e| ActorId(e.actor_id))
638            .collect();
639
640        for root in roots {
641            self.tree_view_recursive(root, &mut out, 0);
642        }
643
644        out
645    }
646
647    fn tree_view_recursive(&self, id: ActorId, out: &mut String, indent: usize) {
648        if let Some(entry) = self.get(id) {
649            let prefix = "  ".repeat(indent);
650            let state = match entry.actor_state() {
651                ActorState::Active => "ACTIVE",
652                ActorState::Dormant => "DORMANT",
653                ActorState::Initializing => "INIT",
654                ActorState::Failed => "FAILED",
655                ActorState::Terminated => "TERM",
656                ActorState::Draining => "DRAIN",
657            };
658            out.push_str(&format!(
659                "{}[{}] actor:{} state={} restarts={} msgs={}\n",
660                prefix,
661                if indent == 0 { "R" } else { "C" },
662                id.0,
663                state,
664                entry.restart_count,
665                entry.last_heartbeat_ns
666            ));
667
668            let children = self.children_of(id);
669            for child in children {
670                self.tree_view_recursive(child, out, indent + 1);
671            }
672        }
673    }
674}
675
676/// Action taken by the supervisor in response to a failure.
677#[derive(Debug, Clone)]
678pub enum SupervisionAction {
679    /// Actor marked as failed.
680    MarkedFailed(ActorId),
681    /// Actor was restarted.
682    Restarted {
683        /// The original actor ID before restart.
684        old_id: ActorId,
685        /// The new actor ID after restart.
686        new_id: ActorId,
687    },
688    /// A sibling was destroyed (OneForAll/RestForOne).
689    DestroyedSibling(ActorId),
690    /// All siblings destroyed, parent needs to re-create them.
691    AllSiblingsDestroyed {
692        /// Parent actor that owns the destroyed siblings.
693        parent: ActorId,
694    },
695    /// Failure escalated to parent supervisor.
696    Escalated {
697        /// Actor that failed.
698        failed: ActorId,
699        /// Parent supervisor the failure was escalated to.
700        escalated_to: Option<ActorId>,
701    },
702}
703
704/// Errors from actor lifecycle operations.
705#[derive(Debug, Clone)]
706pub enum ActorError {
707    /// No available actor slots in the pool.
708    PoolExhausted {
709        /// Total pool capacity.
710        capacity: u32,
711        /// Number of currently active actors.
712        active: u32,
713    },
714    /// Invalid actor ID.
715    InvalidId(ActorId),
716    /// Invalid state transition.
717    InvalidStateTransition {
718        /// Actor that attempted the invalid transition.
719        actor: ActorId,
720        /// Current state of the actor.
721        from: ActorState,
722        /// Attempted target state.
723        to: ActorState,
724    },
725    /// Actor has exceeded its maximum restart budget.
726    MaxRestartsExceeded {
727        /// Actor that exceeded its restart budget.
728        actor: ActorId,
729        /// Number of restarts attempted.
730        restarts: u32,
731        /// Maximum allowed restarts.
732        max: u32,
733    },
734}
735
736impl fmt::Display for ActorError {
737    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
738        match self {
739            Self::PoolExhausted { capacity, active } => write!(
740                f,
741                "Actor pool exhausted: {}/{} slots active",
742                active, capacity
743            ),
744            Self::InvalidId(id) => write!(f, "Invalid actor ID: {}", id),
745            Self::InvalidStateTransition { actor, from, to } => write!(
746                f,
747                "Invalid state transition for {}: {:?} → {:?}",
748                actor, from, to
749            ),
750            Self::MaxRestartsExceeded {
751                actor,
752                restarts,
753                max,
754            } => write!(
755                f,
756                "Actor {} exceeded max restarts: {}/{}",
757                actor, restarts, max
758            ),
759        }
760    }
761}
762
763impl std::error::Error for ActorError {}
764
765#[cfg(test)]
766mod tests {
767    use super::*;
768
769    #[test]
770    fn test_actor_lifecycle_create_destroy() {
771        let mut supervisor = ActorSupervisor::new(8); // 7 actor slots + 1 supervisor
772
773        assert_eq!(supervisor.capacity(), 7);
774        assert_eq!(supervisor.active_count(), 0);
775        assert_eq!(supervisor.available_count(), 7);
776
777        // Create an actor
778        let config = ActorConfig::named("test-actor");
779        let id = supervisor.create_actor(&config, None).unwrap();
780        assert_eq!(supervisor.active_count(), 1);
781        assert_eq!(supervisor.available_count(), 6);
782
783        // Activate it
784        supervisor.activate_actor(id).unwrap();
785        let entry = supervisor.get(id).unwrap();
786        assert_eq!(entry.actor_state(), ActorState::Active);
787
788        // Destroy it
789        supervisor.destroy_actor(id).unwrap();
790        assert_eq!(supervisor.active_count(), 0);
791        assert_eq!(supervisor.available_count(), 7);
792    }
793
794    #[test]
795    fn test_actor_parent_child() {
796        let mut supervisor = ActorSupervisor::new(8);
797
798        let parent_config = ActorConfig::named("parent");
799        let parent = supervisor.create_actor(&parent_config, None).unwrap();
800        supervisor.activate_actor(parent).unwrap();
801
802        // Create children
803        let child_config = ActorConfig::named("child");
804        let child1 = supervisor
805            .create_actor(&child_config, Some(parent))
806            .unwrap();
807        let child2 = supervisor
808            .create_actor(&child_config, Some(parent))
809            .unwrap();
810        supervisor.activate_actor(child1).unwrap();
811        supervisor.activate_actor(child2).unwrap();
812
813        let children = supervisor.children_of(parent);
814        assert_eq!(children.len(), 2);
815        assert!(children.contains(&child1));
816        assert!(children.contains(&child2));
817    }
818
819    #[test]
820    fn test_actor_restart() {
821        let mut supervisor = ActorSupervisor::new(8);
822        let config =
823            ActorConfig::named("restartable").with_restart_policy(RestartPolicy::OneForOne {
824                max_restarts: 3,
825                window: Duration::from_secs(60),
826            });
827
828        let id = supervisor.create_actor(&config, None).unwrap();
829        supervisor.activate_actor(id).unwrap();
830
831        // Restart 3 times (should succeed)
832        for i in 0..3 {
833            let new_id = supervisor.restart_actor(id, &config).unwrap();
834            supervisor.activate_actor(new_id).unwrap();
835            let entry = supervisor.get(new_id).unwrap();
836            assert_eq!(entry.restart_count, i + 1);
837        }
838    }
839
840    #[test]
841    fn test_actor_max_restarts_exceeded() {
842        let mut supervisor = ActorSupervisor::new(8);
843        let config = ActorConfig::named("fragile").with_restart_policy(RestartPolicy::OneForOne {
844            max_restarts: 1,
845            window: Duration::from_secs(60),
846        });
847
848        let id = supervisor.create_actor(&config, None).unwrap();
849        supervisor.activate_actor(id).unwrap();
850
851        // First restart: OK
852        let new_id = supervisor.restart_actor(id, &config).unwrap();
853        supervisor.activate_actor(new_id).unwrap();
854
855        // Second restart: should fail
856        let result = supervisor.restart_actor(new_id, &config);
857        assert!(matches!(
858            result,
859            Err(ActorError::MaxRestartsExceeded { .. })
860        ));
861    }
862
863    #[test]
864    fn test_pool_exhaustion() {
865        let mut supervisor = ActorSupervisor::new(4); // 3 actor slots
866        let config = ActorConfig::named("actor");
867
868        // Fill the pool
869        for _ in 0..3 {
870            let id = supervisor.create_actor(&config, None).unwrap();
871            supervisor.activate_actor(id).unwrap();
872        }
873
874        // Next create should fail
875        let result = supervisor.create_actor(&config, None);
876        assert!(matches!(result, Err(ActorError::PoolExhausted { .. })));
877    }
878
879    #[test]
880    fn test_heartbeat_failure_detection() {
881        let mut supervisor = ActorSupervisor::new(8);
882        let config = ActorConfig::named("monitored");
883
884        let id1 = supervisor.create_actor(&config, None).unwrap();
885        let id2 = supervisor.create_actor(&config, None).unwrap();
886        supervisor.activate_actor(id1).unwrap();
887        supervisor.activate_actor(id2).unwrap();
888
889        // Actor 1 sends heartbeat, actor 2 doesn't
890        supervisor.heartbeat(id1, 1_000_000); // 1ms
891        supervisor.heartbeat(id2, 1_000_000); // 1ms
892
893        // Check at 600ms — both should be fine (timeout is 500ms but we'll set a custom one)
894        let timeout_ns = 500_000_000; // 500ms
895
896        // Advance time: actor 1 sends again at 400ms, actor 2 is silent
897        supervisor.heartbeat(id1, 400_000_000);
898
899        // Check at 600ms — actor 2 should be timed out
900        let failed = supervisor.check_heartbeats(600_000_000, timeout_ns);
901        assert_eq!(failed.len(), 1);
902        assert_eq!(failed[0], id2);
903    }
904
905    #[test]
906    fn test_supervision_entry_size() {
907        assert_eq!(
908            std::mem::size_of::<SupervisionEntry>(),
909            64,
910            "SupervisionEntry must be exactly 64 bytes for GPU cache efficiency"
911        );
912    }
913
914    #[test]
915    fn test_actor_state_roundtrip() {
916        for state in [
917            ActorState::Dormant,
918            ActorState::Initializing,
919            ActorState::Active,
920            ActorState::Draining,
921            ActorState::Terminated,
922            ActorState::Failed,
923        ] {
924            let raw = state as u32;
925            let recovered = ActorState::from_u32(raw).unwrap();
926            assert_eq!(recovered, state);
927        }
928    }
929
930    // === FR-001: Cascading termination & escalation tests ===
931
932    #[test]
933    fn test_cascading_kill_tree() {
934        let mut sup = ActorSupervisor::new(16);
935        let config = ActorConfig::named("node");
936
937        // Build a tree: root → child1, child2; child1 → grandchild1
938        let root = sup.create_actor(&config, None).unwrap();
939        sup.activate_actor(root).unwrap();
940
941        let child1 = sup.create_actor(&config, Some(root)).unwrap();
942        sup.activate_actor(child1).unwrap();
943
944        let child2 = sup.create_actor(&config, Some(root)).unwrap();
945        sup.activate_actor(child2).unwrap();
946
947        let grandchild1 = sup.create_actor(&config, Some(child1)).unwrap();
948        sup.activate_actor(grandchild1).unwrap();
949
950        assert_eq!(sup.active_count(), 4);
951
952        // Kill the root — should cascade to all descendants
953        let destroyed = sup.kill_tree(root);
954        assert_eq!(destroyed.len(), 4);
955        assert_eq!(sup.active_count(), 0);
956        assert_eq!(sup.available_count(), 15); // All back in pool
957    }
958
959    #[test]
960    fn test_handle_failure_one_for_one() {
961        let mut sup = ActorSupervisor::new(8);
962        let config = ActorConfig::named("worker").with_restart_policy(RestartPolicy::OneForOne {
963            max_restarts: 2,
964            window: Duration::from_secs(60),
965        });
966
967        let parent = sup
968            .create_actor(&ActorConfig::named("parent"), None)
969            .unwrap();
970        sup.activate_actor(parent).unwrap();
971
972        let child = sup.create_actor(&config, Some(parent)).unwrap();
973        sup.activate_actor(child).unwrap();
974
975        // Simulate failure
976        let actions = sup.handle_failure(child, &config);
977        assert!(actions
978            .iter()
979            .any(|a| matches!(a, SupervisionAction::Restarted { .. })));
980    }
981
982    #[test]
983    fn test_handle_failure_escalation() {
984        let mut sup = ActorSupervisor::new(8);
985        let config = ActorConfig::named("fragile").with_restart_policy(RestartPolicy::Permanent);
986
987        let parent = sup
988            .create_actor(&ActorConfig::named("parent"), None)
989            .unwrap();
990        sup.activate_actor(parent).unwrap();
991
992        let child = sup.create_actor(&config, Some(parent)).unwrap();
993        sup.activate_actor(child).unwrap();
994
995        // Permanent policy → failure escalates to parent
996        let actions = sup.handle_failure(child, &config);
997        assert!(actions
998            .iter()
999            .any(|a| matches!(a, SupervisionAction::Escalated { .. })));
1000    }
1001
1002    #[test]
1003    fn test_tree_depth() {
1004        let mut sup = ActorSupervisor::new(16);
1005        let config = ActorConfig::named("node");
1006
1007        let root = sup.create_actor(&config, None).unwrap();
1008        sup.activate_actor(root).unwrap();
1009        assert_eq!(sup.depth(root), 0);
1010
1011        let child = sup.create_actor(&config, Some(root)).unwrap();
1012        sup.activate_actor(child).unwrap();
1013        assert_eq!(sup.depth(child), 1);
1014
1015        let grandchild = sup.create_actor(&config, Some(child)).unwrap();
1016        sup.activate_actor(grandchild).unwrap();
1017        assert_eq!(sup.depth(grandchild), 2);
1018    }
1019
1020    #[test]
1021    fn test_tree_view() {
1022        let mut sup = ActorSupervisor::new(8);
1023        let config = ActorConfig::named("actor");
1024
1025        let root = sup.create_actor(&config, None).unwrap();
1026        sup.activate_actor(root).unwrap();
1027
1028        let child1 = sup.create_actor(&config, Some(root)).unwrap();
1029        sup.activate_actor(child1).unwrap();
1030
1031        let child2 = sup.create_actor(&config, Some(root)).unwrap();
1032        sup.activate_actor(child2).unwrap();
1033
1034        let view = sup.tree_view();
1035        assert!(view.contains("ACTIVE"));
1036        assert!(view.contains("actor:"));
1037    }
1038}