Skip to main content

dactor/
system_actors.rs

1//! System actors for remote operations.
2//!
3//! When a node needs to perform a remote operation (spawn an actor, watch a
4//! remote actor, cancel a remote operation), it sends a message to a
5//! **system actor** on the target node. System actors are automatically
6//! started by the runtime and handle these requests.
7//!
8//! ## System Actor Types
9//!
10//! - [`SpawnManager`] — handles remote actor spawn requests
11//! - [`WatchManager`] — handles remote watch/unwatch subscriptions
12//! - [`CancelManager`] — handles remote cancellation requests
13//! - [`NodeDirectory`] — maps NodeId → connection metadata
14//!
15//! ## System Message Type Constants
16//!
17//! Well-known [`WireEnvelope::message_type`](crate::remote::WireEnvelope::message_type)
18//! values used by the transport router to dispatch incoming envelopes to the
19//! correct system actor mailbox. See [`system_router`](crate::system_router).
20
21use std::collections::{HashMap, HashSet};
22
23use crate::node::{ActorId, NodeId};
24use crate::remote::SerializationError;
25
26// ---------------------------------------------------------------------------
27// System message type constants (for WireEnvelope.message_type matching)
28// ---------------------------------------------------------------------------
29//
30// ⚠️  WIRE PROTOCOL — DO NOT CHANGE THESE VALUES ⚠️
31//
32// These strings are **wire protocol identifiers** sent between nodes.
33// They are NOT Rust type paths, even though they happen to resemble them.
34// Changing any value will break compatibility with remote nodes running
35// older code.  If you rename or refactor the corresponding Rust struct,
36// the constant value here MUST stay the same.
37//
38// A `wire_protocol_constants_are_stable` unit test enforces this.
39// ---------------------------------------------------------------------------
40
/// Wire protocol identifier for [`SpawnRequest`] messages.
///
/// **Frozen wire protocol value** — do not change, even if the Rust type
/// it names is ever renamed.
pub const SYSTEM_MSG_TYPE_SPAWN: &str = "dactor::system_actors::SpawnRequest";

/// Wire protocol identifier for [`WatchRequest`] messages.
///
/// **Frozen wire protocol value** — do not change, even if the Rust type
/// it names is ever renamed.
pub const SYSTEM_MSG_TYPE_WATCH: &str = "dactor::system_actors::WatchRequest";

/// Wire protocol identifier for [`UnwatchRequest`] messages.
///
/// **Frozen wire protocol value** — do not change, even if the Rust type
/// it names is ever renamed.
pub const SYSTEM_MSG_TYPE_UNWATCH: &str = "dactor::system_actors::UnwatchRequest";

/// Wire protocol identifier for [`CancelRequest`] messages.
///
/// **Frozen wire protocol value** — do not change, even if the Rust type
/// it names is ever renamed.
pub const SYSTEM_MSG_TYPE_CANCEL: &str = "dactor::system_actors::CancelRequest";

/// Wire protocol identifier for peer connect messages routed to [`NodeDirectory`].
///
/// **Frozen wire protocol value** — do not change, even if the Rust type
/// it names is ever renamed.
pub const SYSTEM_MSG_TYPE_CONNECT_PEER: &str = "dactor::system_actors::ConnectPeer";

/// Wire protocol identifier for peer disconnect messages routed to [`NodeDirectory`].
///
/// **Frozen wire protocol value** — do not change, even if the Rust type
/// it names is ever renamed.
pub const SYSTEM_MSG_TYPE_DISCONNECT_PEER: &str = "dactor::system_actors::DisconnectPeer";

/// Returns `true` if `message_type` is one of the well-known system message
/// type identifiers defined above (i.e. the transport router should deliver
/// the envelope to a system actor rather than a user actor).
pub fn is_system_message_type(message_type: &str) -> bool {
    // Keep this table in sync with the constants above; there is exactly
    // one entry per system actor message kind.
    const ALL_SYSTEM_TYPES: [&str; 6] = [
        SYSTEM_MSG_TYPE_SPAWN,
        SYSTEM_MSG_TYPE_WATCH,
        SYSTEM_MSG_TYPE_UNWATCH,
        SYSTEM_MSG_TYPE_CANCEL,
        SYSTEM_MSG_TYPE_CONNECT_PEER,
        SYSTEM_MSG_TYPE_DISCONNECT_PEER,
    ];
    ALL_SYSTEM_TYPES.contains(&message_type)
}
83
84// ---------------------------------------------------------------------------
85// SpawnManager
86// ---------------------------------------------------------------------------
87
/// Message requesting a remote actor spawn.
///
/// Sent to the target node's [`SpawnManager`] and dispatched by the
/// transport router via [`SYSTEM_MSG_TYPE_SPAWN`]. With the `serde`
/// feature enabled it derives `Serialize`/`Deserialize` for wire encoding.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct SpawnRequest {
    /// Fully-qualified Rust type name of the actor (e.g. "myapp::Counter").
    /// Must match a factory registered in the target node's TypeRegistry.
    pub type_name: String,
    /// Serialized actor `Args` (construction arguments). Decoded by the
    /// factory registered for `type_name`.
    pub args_bytes: Vec<u8>,
    /// Name for the spawned actor.
    pub name: String,
    /// Request ID for correlating the response ([`SpawnResponse`] echoes it).
    pub request_id: String,
}
101
/// Response to a spawn request.
///
/// Correlated with the originating [`SpawnRequest`] via `request_id`.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum SpawnResponse {
    /// Actor spawned successfully. Contains the assigned ActorId.
    Success {
        /// Request ID from the original SpawnRequest.
        request_id: String,
        /// ActorId assigned to the newly spawned actor.
        actor_id: ActorId,
    },
    /// Spawn failed (e.g. unknown type name or Args deserialization error).
    Failure {
        /// Request ID from the original SpawnRequest.
        request_id: String,
        /// Error description.
        error: String,
    },
}
121
122/// Manages remote actor spawn requests on a node.
123///
124/// When a remote node wants to spawn an actor on this node, it sends a
125/// [`SpawnRequest`] to the SpawnManager. The SpawnManager looks up the
126/// actor type in the [`TypeRegistry`](crate::type_registry::TypeRegistry),
127/// deserializes the Args, creates the actor, and returns a [`SpawnResponse`].
128pub struct SpawnManager {
129    /// Type registry for looking up actor factories.
130    type_registry: crate::type_registry::TypeRegistry,
131    /// Actors spawned via remote requests.
132    spawned: Vec<ActorId>,
133}
134
135impl SpawnManager {
136    /// Create a new SpawnManager with the given type registry.
137    pub fn new(type_registry: crate::type_registry::TypeRegistry) -> Self {
138        Self {
139            type_registry,
140            spawned: Vec::new(),
141        }
142    }
143
144    /// Process a spawn request.
145    ///
146    /// Looks up the actor type in the registry, deserializes Args from bytes,
147    /// and returns the constructed actor as `Box<dyn Any + Send>`. The caller
148    /// (runtime) is responsible for actually spawning the actor and assigning
149    /// an ActorId.
150    pub fn create_actor(
151        &self,
152        request: &SpawnRequest,
153    ) -> Result<Box<dyn std::any::Any + Send>, SerializationError> {
154        self.type_registry
155            .create_actor(&request.type_name, &request.args_bytes)
156    }
157
158    /// Record that an actor was spawned via remote request.
159    pub fn record_spawn(&mut self, id: ActorId) {
160        self.spawned.push(id);
161    }
162
163    /// List all actors spawned via remote requests.
164    pub fn spawned_actors(&self) -> &[ActorId] {
165        &self.spawned
166    }
167
168    /// Access the type registry.
169    pub fn type_registry(&self) -> &crate::type_registry::TypeRegistry {
170        &self.type_registry
171    }
172
173    /// Access the type registry mutably (for registering new factories).
174    pub fn type_registry_mut(&mut self) -> &mut crate::type_registry::TypeRegistry {
175        &mut self.type_registry
176    }
177}
178
179// ---------------------------------------------------------------------------
180// WatchManager
181// ---------------------------------------------------------------------------
182
/// Request to watch a remote actor for termination.
///
/// Routed to the target node's [`WatchManager`] via
/// [`SYSTEM_MSG_TYPE_WATCH`]; the watcher is later informed through a
/// [`WatchNotification`].
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct WatchRequest {
    /// The actor to watch (local to the receiving node).
    pub target: ActorId,
    /// The watcher (on the requesting node) to notify on termination.
    pub watcher: ActorId,
}
192
/// Request to stop watching a remote actor.
///
/// Counterpart of [`WatchRequest`]; routed via [`SYSTEM_MSG_TYPE_UNWATCH`].
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct UnwatchRequest {
    /// The actor being watched.
    pub target: ActorId,
    /// The watcher to remove.
    pub watcher: ActorId,
}
202
/// Notification that a watched actor has terminated.
///
/// Produced by [`WatchManager::on_terminated`] on the terminated actor's
/// node and delivered to the watcher's node.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct WatchNotification {
    /// The actor that terminated.
    pub terminated: ActorId,
    /// The watcher that should be notified.
    pub watcher: ActorId,
}
212
213/// Manages remote watch/unwatch subscriptions.
214///
215/// Tracks which remote actors are watching which local actors. When a
216/// local actor terminates, the WatchManager sends a [`WatchNotification`]
217/// to the watcher's node.
218pub struct WatchManager {
219    /// target ActorId → set of watcher ActorIds (watchers from remote nodes).
220    watchers: HashMap<ActorId, HashSet<ActorId>>,
221}
222
223impl WatchManager {
224    /// Create a new empty WatchManager.
225    pub fn new() -> Self {
226        Self {
227            watchers: HashMap::new(),
228        }
229    }
230
231    /// Register a watch: `watcher` wants to know when `target` terminates.
232    pub fn watch(&mut self, target: ActorId, watcher: ActorId) {
233        self.watchers.entry(target).or_default().insert(watcher);
234    }
235
236    /// Remove a watch subscription.
237    pub fn unwatch(&mut self, target: &ActorId, watcher: &ActorId) {
238        if let Some(set) = self.watchers.get_mut(target) {
239            set.remove(watcher);
240            if set.is_empty() {
241                self.watchers.remove(target);
242            }
243        }
244    }
245
246    /// Called when a local actor terminates. Returns notifications for all
247    /// remote watchers of this actor.
248    pub fn on_terminated(&mut self, terminated: &ActorId) -> Vec<WatchNotification> {
249        self.watchers
250            .remove(terminated)
251            .unwrap_or_default()
252            .into_iter()
253            .map(|watcher| WatchNotification {
254                terminated: terminated.clone(),
255                watcher,
256            })
257            .collect()
258    }
259
260    /// Get all watchers for a given actor.
261    pub fn watchers_of(&self, target: &ActorId) -> Vec<ActorId> {
262        self.watchers
263            .get(target)
264            .map(|s| s.iter().cloned().collect())
265            .unwrap_or_default()
266    }
267
268    /// Number of watched actors.
269    pub fn watched_count(&self) -> usize {
270        self.watchers.len()
271    }
272}
273
274impl Default for WatchManager {
275    fn default() -> Self {
276        Self::new()
277    }
278}
279
280// ---------------------------------------------------------------------------
281// CancelManager
282// ---------------------------------------------------------------------------
283
/// Request to cancel a remote operation.
///
/// Routed via [`SYSTEM_MSG_TYPE_CANCEL`] to the target node's
/// [`CancelManager`].
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct CancelRequest {
    /// The actor whose operation should be cancelled.
    pub target: ActorId,
    /// The request ID to cancel (for ask/stream/feed correlation).
    /// NOTE(review): the semantics of `None` (cancel everything for
    /// `target`?) are not visible in this module — confirm against the
    /// transport router before relying on it.
    pub request_id: Option<String>,
}
293
/// Acknowledgement of a cancel request.
///
/// Returned by [`CancelManager::cancel`].
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum CancelResponse {
    /// Cancellation was delivered to the target.
    Acknowledged,
    /// The target actor or request was not found.
    NotFound {
        /// Description of what was not found.
        reason: String,
    },
}
306
307/// Manages remote cancellation requests.
308///
309/// Tracks active cancellation tokens and delivers cancel signals to local
310/// actors when requested by remote nodes.
311pub struct CancelManager {
312    /// Active cancellation tokens: request_id → CancellationToken.
313    tokens: HashMap<String, tokio_util::sync::CancellationToken>,
314}
315
316impl CancelManager {
317    /// Create a new empty CancelManager.
318    pub fn new() -> Self {
319        Self {
320            tokens: HashMap::new(),
321        }
322    }
323
324    /// Register a cancellation token for a request.
325    pub fn register(&mut self, request_id: String, token: tokio_util::sync::CancellationToken) {
326        self.tokens.insert(request_id, token);
327    }
328
329    /// Process a cancel request. Cancels the token if found.
330    pub fn cancel(&mut self, request_id: &str) -> CancelResponse {
331        if let Some(token) = self.tokens.remove(request_id) {
332            token.cancel();
333            CancelResponse::Acknowledged
334        } else {
335            CancelResponse::NotFound {
336                reason: format!("no active request with id '{request_id}'"),
337            }
338        }
339    }
340
341    /// Remove a token after the operation completes (cleanup).
342    pub fn remove(&mut self, request_id: &str) {
343        self.tokens.remove(request_id);
344    }
345
346    /// Number of active cancellable operations.
347    pub fn active_count(&self) -> usize {
348        self.tokens.len()
349    }
350}
351
352impl Default for CancelManager {
353    fn default() -> Self {
354        Self::new()
355    }
356}
357
358// ---------------------------------------------------------------------------
359// NodeDirectory
360// ---------------------------------------------------------------------------
361
/// Connection status of a peer node.
///
/// New peers start in [`PeerStatus::Connecting`] (see
/// [`NodeDirectory::add_peer`]); later transitions are applied via
/// [`NodeDirectory::set_status`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum PeerStatus {
    /// Connection is being established.
    Connecting,
    /// Node is connected and reachable.
    Connected,
    /// Node is suspected unreachable (health check failed).
    Unreachable,
    /// Node has been disconnected (gracefully or due to failure).
    Disconnected,
}
375
/// Metadata about a peer node.
///
/// NOTE(review): unlike [`PeerStatus`], this struct carries no serde
/// derive — presumably peer info is node-local state that never goes on
/// the wire; confirm before embedding it in any message type.
#[derive(Debug, Clone)]
pub struct PeerInfo {
    /// The peer's node ID.
    pub node_id: NodeId,
    /// Current connection status.
    pub status: PeerStatus,
    /// Optional endpoint address (e.g. "10.0.0.1:4697").
    pub address: Option<String>,
}
386
387/// Maps [`NodeId`]s to peer connection information.
388///
389/// The NodeDirectory is the runtime's view of the cluster topology. It
390/// tracks which nodes are known, their connection status, and optional
391/// endpoint addresses.
392pub struct NodeDirectory {
393    peers: HashMap<NodeId, PeerInfo>,
394}
395
396impl NodeDirectory {
397    /// Create a new empty directory.
398    pub fn new() -> Self {
399        Self {
400            peers: HashMap::new(),
401        }
402    }
403
404    /// Register a peer node.
405    pub fn add_peer(&mut self, node_id: NodeId, address: Option<String>) {
406        self.peers.insert(
407            node_id.clone(),
408            PeerInfo {
409                node_id,
410                status: PeerStatus::Connecting,
411                address,
412            },
413        );
414    }
415
416    /// Update the status of a peer.
417    pub fn set_status(&mut self, node_id: &NodeId, status: PeerStatus) {
418        if let Some(info) = self.peers.get_mut(node_id) {
419            info.status = status;
420        }
421    }
422
423    /// Remove a peer from the directory.
424    pub fn remove_peer(&mut self, node_id: &NodeId) {
425        self.peers.remove(node_id);
426    }
427
428    /// Look up a peer by node ID.
429    pub fn get_peer(&self, node_id: &NodeId) -> Option<&PeerInfo> {
430        self.peers.get(node_id)
431    }
432
433    /// Get all known peer node IDs.
434    pub fn peer_nodes(&self) -> Vec<NodeId> {
435        self.peers.keys().cloned().collect()
436    }
437
438    /// Get all peers with a specific status.
439    pub fn peers_with_status(&self, status: PeerStatus) -> Vec<&PeerInfo> {
440        self.peers.values().filter(|p| p.status == status).collect()
441    }
442
443    /// Number of known peers.
444    pub fn peer_count(&self) -> usize {
445        self.peers.len()
446    }
447
448    /// Number of connected peers.
449    pub fn connected_count(&self) -> usize {
450        self.peers
451            .values()
452            .filter(|p| p.status == PeerStatus::Connected)
453            .count()
454    }
455
456    /// Check if a node is known and connected.
457    pub fn is_connected(&self, node_id: &NodeId) -> bool {
458        self.peers
459            .get(node_id)
460            .map(|p| p.status == PeerStatus::Connected)
461            .unwrap_or(false)
462    }
463}
464
465impl Default for NodeDirectory {
466    fn default() -> Self {
467        Self::new()
468    }
469}
470
471// ---------------------------------------------------------------------------
472// Handshake types (version compatibility)
473// ---------------------------------------------------------------------------
474
/// Information sent by a node when initiating a version handshake.
///
/// Both nodes exchange a `HandshakeRequest` during connection setup.
/// The receiving node compares the request against its own configuration
/// (see [`validate_handshake`]) and returns a [`HandshakeResponse`].
///
/// NOTE(review): no serde derive here, unlike the wire message types above
/// — presumably the transport encodes handshakes itself; confirm with the
/// adapter layer before reusing this on the general message path.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct HandshakeRequest {
    /// The node's identity.
    pub node_id: NodeId,
    /// The wire protocol version this node speaks.
    pub wire_version: crate::version::WireVersion,
    /// The application version, if configured. Informational only — does
    /// not affect compatibility.
    pub app_version: Option<String>,
    /// The actor framework adapter (e.g. "ractor", "kameo", "coerce").
    pub adapter: String,
}
492
493impl HandshakeRequest {
494    /// Build a [`HandshakeRequest`] from runtime configuration.
495    ///
496    /// Uses [`DACTOR_WIRE_VERSION`](crate::version::DACTOR_WIRE_VERSION) as
497    /// the wire version. This is the canonical way for adapters to construct
498    /// a handshake request — avoids duplicating version/field logic across
499    /// adapter crates.
500    pub fn from_runtime(
501        node_id: NodeId,
502        app_version: Option<String>,
503        adapter: impl Into<String>,
504    ) -> Self {
505        Self {
506            node_id,
507            wire_version: crate::version::WireVersion::parse(
508                crate::version::DACTOR_WIRE_VERSION,
509            )
510            .expect("DACTOR_WIRE_VERSION must be valid"),
511            app_version,
512            adapter: adapter.into(),
513        }
514    }
515}
516
/// The result of a version handshake.
///
/// Returned by the remote node after comparing its own configuration
/// against the incoming [`HandshakeRequest`]. Both variants carry the
/// *responder's* identity; callers should confirm it with
/// [`verify_peer_identity`] after an `Accepted` response.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum HandshakeResponse {
    /// The remote node accepted the handshake. The cluster can proceed.
    Accepted {
        /// The remote node's identity.
        node_id: NodeId,
        /// The remote node's wire protocol version.
        wire_version: crate::version::WireVersion,
        /// The remote node's application version, if configured.
        app_version: Option<String>,
        /// The remote node's adapter name.
        adapter: String,
    },
    /// The remote node rejected the handshake.
    Rejected {
        /// The remote node's identity.
        node_id: NodeId,
        /// The remote node's wire protocol version.
        wire_version: crate::version::WireVersion,
        /// Why the handshake was rejected.
        reason: RejectionReason,
        /// Human-readable detail message.
        detail: String,
    },
}
546
/// Reason a version handshake was rejected.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RejectionReason {
    /// The remote node's wire protocol MAJOR version differs. Nodes cannot
    /// form a cluster (Category 1 — infrastructure-level change).
    IncompatibleProtocol,
    /// The remote node uses a different actor framework adapter.
    /// Mixed-adapter clusters are rejected by default.
    IncompatibleAdapter,
}

impl std::fmt::Display for RejectionReason {
    /// Render the short human-readable rejection label (used inside
    /// handshake detail strings and log lines).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::IncompatibleProtocol => "incompatible wire protocol",
            Self::IncompatibleAdapter => "incompatible adapter",
        };
        f.write_str(label)
    }
}
566
567/// Validate a remote node's [`HandshakeRequest`] against local configuration.
568///
569/// Returns a [`HandshakeResponse`] indicating whether the connection is accepted.
570///
571/// The check order is:
572/// 1. Wire protocol compatibility (same MAJOR version)
573/// 2. Adapter match (must be the same adapter)
574pub fn validate_handshake(
575    local: &HandshakeRequest,
576    remote: &HandshakeRequest,
577) -> HandshakeResponse {
578    // Check wire protocol compatibility first
579    if !local.wire_version.is_compatible(&remote.wire_version) {
580        return HandshakeResponse::Rejected {
581            node_id: local.node_id.clone(),
582            wire_version: local.wire_version,
583            reason: RejectionReason::IncompatibleProtocol,
584            detail: format!(
585                "wire version {remote} is incompatible with local {local} \
586                 (different MAJOR version)",
587                remote = remote.wire_version,
588                local = local.wire_version,
589            ),
590        };
591    }
592
593    // Check adapter match
594    if local.adapter != remote.adapter {
595        return HandshakeResponse::Rejected {
596            node_id: local.node_id.clone(),
597            wire_version: local.wire_version,
598            reason: RejectionReason::IncompatibleAdapter,
599            detail: format!(
600                "adapter \"{remote}\" does not match local \"{local}\"",
601                remote = remote.adapter,
602                local = local.adapter,
603            ),
604        };
605    }
606
607    // All checks passed
608    HandshakeResponse::Accepted {
609        node_id: local.node_id.clone(),
610        wire_version: local.wire_version,
611        app_version: local.app_version.clone(),
612        adapter: local.adapter.clone(),
613    }
614}
615
616/// Verify that an accepted handshake response came from the expected peer.
617///
618/// Adapters should call this after receiving [`HandshakeResponse::Accepted`]
619/// to ensure the responding node's identity matches who they intended to
620/// connect to. Returns `Ok(())` if the identities match, or `Err` with
621/// a descriptive message if they don't.
622///
623/// This prevents connecting to the wrong peer when a misconfigured or
624/// spoofed node responds with a different identity.
625pub fn verify_peer_identity(
626    expected: &NodeId,
627    response: &HandshakeResponse,
628) -> Result<(), String> {
629    match response {
630        HandshakeResponse::Accepted { node_id, .. } => {
631            if node_id != expected {
632                Err(format!(
633                    "peer identity mismatch: expected {expected}, got {node_id}"
634                ))
635            } else {
636                Ok(())
637            }
638        }
639        HandshakeResponse::Rejected { .. } => {
640            // Rejected responses don't need identity verification
641            Ok(())
642        }
643    }
644}
645
646// ---------------------------------------------------------------------------
647// SystemActorConfig
648// ---------------------------------------------------------------------------
649
/// Configuration for system actors spawned by the runtime.
///
/// System actors (SpawnManager, WatchManager, CancelManager, NodeDirectory)
/// are internal actors that handle remote operations. Under high fan-in from
/// many nodes, the default unbounded mailbox may become a throughput bottleneck
/// or memory risk.
///
/// This configuration allows tuning mailbox capacity per actor type and
/// enabling pooling for stateless system actors like SpawnManager.
///
/// Defaults (see the [`Default`] impl): unbounded mailboxes everywhere,
/// pooling disabled.
///
/// # Example
///
/// ```rust
/// use dactor::system_actors::SystemActorConfig;
/// use dactor::mailbox::{MailboxConfig, OverflowStrategy};
///
/// let config = SystemActorConfig::default()
///     .with_spawn_manager_mailbox(
///         MailboxConfig::bounded(10_000, OverflowStrategy::Block)
///     )
///     .with_spawn_manager_pool_size(4)
///     .with_control_plane_mailbox(
///         MailboxConfig::bounded(5_000, OverflowStrategy::Block)
///     );
/// ```
#[derive(Debug, Clone)]
pub struct SystemActorConfig {
    /// Mailbox config for SpawnManager actors. Defaults to unbounded.
    pub spawn_manager_mailbox: crate::mailbox::MailboxConfig,

    /// Pool size for SpawnManager. `None` = single actor (default).
    ///
    /// When set to `Some(n)`, the runtime spawns `n` SpawnManager instances
    /// with round-robin routing. Each instance shares a single `TypeRegistry`
    /// (via `Arc`), so factory registrations are visible to all workers.
    ///
    /// SpawnManager is safe to pool because:
    /// - Actor ID allocation uses a shared `AtomicU64` counter
    /// - The type registry is read-only after initial setup
    /// - Spawn tracking is aggregated at the runtime level
    pub spawn_manager_pool_size: Option<usize>,

    /// Mailbox config for control-plane actors (WatchManager, CancelManager,
    /// NodeDirectory). Defaults to unbounded.
    ///
    /// These actors hold state that must be consistent (watch subscriptions,
    /// cancellation tokens, peer directory), so they are **not poolable**.
    /// Use a larger bounded mailbox with `OverflowStrategy::Block` to apply
    /// backpressure without losing messages.
    ///
    /// **Warning:** Using `OverflowStrategy::DropNewest` or
    /// `OverflowStrategy::RejectWithError` for control-plane actors may cause
    /// correctness issues (missed watch notifications, leaked cancellation
    /// tokens, stale peer state).
    pub control_plane_mailbox: crate::mailbox::MailboxConfig,
}
706
707impl Default for SystemActorConfig {
708    fn default() -> Self {
709        Self {
710            spawn_manager_mailbox: crate::mailbox::MailboxConfig::Unbounded,
711            spawn_manager_pool_size: None,
712            control_plane_mailbox: crate::mailbox::MailboxConfig::Unbounded,
713        }
714    }
715}
716
717impl SystemActorConfig {
718    /// Set the SpawnManager mailbox configuration.
719    pub fn with_spawn_manager_mailbox(mut self, mailbox: crate::mailbox::MailboxConfig) -> Self {
720        self.spawn_manager_mailbox = mailbox;
721        self
722    }
723
724    /// Set the SpawnManager pool size.
725    ///
726    /// `None` disables pooling (single actor). `Some(n)` creates `n` workers.
727    pub fn with_spawn_manager_pool_size(mut self, size: usize) -> Self {
728        self.spawn_manager_pool_size = Some(size);
729        self
730    }
731
732    /// Set the mailbox configuration for control-plane actors
733    /// (WatchManager, CancelManager, NodeDirectory).
734    pub fn with_control_plane_mailbox(mut self, mailbox: crate::mailbox::MailboxConfig) -> Self {
735        self.control_plane_mailbox = mailbox;
736        self
737    }
738}
739
740// ---------------------------------------------------------------------------
741// Tests
742// ---------------------------------------------------------------------------
743
744#[cfg(test)]
745mod tests {
746    use super::*;
747
748    // -- SpawnManager tests --
749
    #[test]
    fn spawn_manager_create_actor() {
        // Register a factory that decodes exactly 8 big-endian bytes into
        // a u64 "actor".
        let mut registry = crate::type_registry::TypeRegistry::new();
        registry.register_factory("test::Worker", |bytes: &[u8]| {
            if bytes.len() != 8 {
                return Err(SerializationError::new("expected 8 bytes"));
            }
            let val = u64::from_be_bytes(bytes.try_into().unwrap());
            Ok(Box::new(val))
        });

        let manager = SpawnManager::new(registry);
        let request = SpawnRequest {
            type_name: "test::Worker".into(),
            args_bytes: 42u64.to_be_bytes().to_vec(),
            name: "worker-1".into(),
            request_id: "req-1".into(),
        };

        // The actor comes back as Box<dyn Any>; downcast to prove the
        // factory received and decoded the args we sent.
        let actor = manager.create_actor(&request).unwrap();
        let val = actor.downcast::<u64>().unwrap();
        assert_eq!(*val, 42);
    }
773
    #[test]
    fn spawn_manager_unknown_type() {
        // A type name with no registered factory must yield an error,
        // not a panic.
        let registry = crate::type_registry::TypeRegistry::new();
        let manager = SpawnManager::new(registry);
        let request = SpawnRequest {
            type_name: "unknown::Type".into(),
            args_bytes: vec![],
            name: "x".into(),
            request_id: "req-2".into(),
        };

        let result = manager.create_actor(&request);
        assert!(result.is_err());
    }
788
    #[test]
    fn spawn_manager_records_spawned() {
        // The spawned list starts empty and record_spawn appends to it.
        let registry = crate::type_registry::TypeRegistry::new();
        let mut manager = SpawnManager::new(registry);
        assert!(manager.spawned_actors().is_empty());

        let id = ActorId {
            node: NodeId("n1".into()),
            local: 1,
        };
        manager.record_spawn(id.clone());
        assert_eq!(manager.spawned_actors().len(), 1);
        assert_eq!(manager.spawned_actors()[0], id);
    }
803
804    // -- WatchManager tests --
805
    #[test]
    fn watch_and_terminate() {
        // A single remote watcher gets exactly one notification when the
        // target terminates, and the subscription is consumed by it.
        let mut wm = WatchManager::new();
        let target = ActorId {
            node: NodeId("n1".into()),
            local: 1,
        };
        let watcher = ActorId {
            node: NodeId("n2".into()),
            local: 10,
        };

        wm.watch(target.clone(), watcher.clone());
        assert_eq!(wm.watched_count(), 1);
        assert_eq!(wm.watchers_of(&target).len(), 1);

        let notifications = wm.on_terminated(&target);
        assert_eq!(notifications.len(), 1);
        assert_eq!(notifications[0].terminated, target);
        assert_eq!(notifications[0].watcher, watcher);
        // on_terminated removed the entry — nothing is watched anymore.
        assert_eq!(wm.watched_count(), 0);
    }
828
    #[test]
    fn watch_multiple_watchers() {
        // Watchers on different nodes each get their own notification for
        // the same target.
        let mut wm = WatchManager::new();
        let target = ActorId {
            node: NodeId("n1".into()),
            local: 1,
        };
        let w1 = ActorId {
            node: NodeId("n2".into()),
            local: 10,
        };
        let w2 = ActorId {
            node: NodeId("n3".into()),
            local: 20,
        };

        wm.watch(target.clone(), w1.clone());
        wm.watch(target.clone(), w2.clone());
        assert_eq!(wm.watchers_of(&target).len(), 2);

        let notifications = wm.on_terminated(&target);
        assert_eq!(notifications.len(), 2);
    }
852
    #[test]
    fn unwatch_removes_subscription() {
        // After unwatch, termination of the target produces no
        // notifications and the entry is pruned.
        let mut wm = WatchManager::new();
        let target = ActorId {
            node: NodeId("n1".into()),
            local: 1,
        };
        let watcher = ActorId {
            node: NodeId("n2".into()),
            local: 10,
        };

        wm.watch(target.clone(), watcher.clone());
        wm.unwatch(&target, &watcher);
        assert_eq!(wm.watched_count(), 0);

        let notifications = wm.on_terminated(&target);
        assert!(notifications.is_empty());
    }
872
873    #[test]
874    fn terminate_unwatched_actor_returns_empty() {
875        let mut wm = WatchManager::new();
876        let target = ActorId {
877            node: NodeId("n1".into()),
878            local: 99,
879        };
880        let notifications = wm.on_terminated(&target);
881        assert!(notifications.is_empty());
882    }
883
884    // -- CancelManager tests --
885
886    #[test]
887    fn cancel_registered_request() {
888        let mut cm = CancelManager::new();
889        let token = tokio_util::sync::CancellationToken::new();
890        let token_check = token.clone();
891
892        cm.register("req-1".into(), token);
893        assert_eq!(cm.active_count(), 1);
894
895        let response = cm.cancel("req-1");
896        assert!(matches!(response, CancelResponse::Acknowledged));
897        assert!(token_check.is_cancelled());
898        assert_eq!(cm.active_count(), 0);
899    }
900
901    #[test]
902    fn cancel_unknown_request_returns_not_found() {
903        let mut cm = CancelManager::new();
904        let response = cm.cancel("nonexistent");
905        assert!(matches!(response, CancelResponse::NotFound { .. }));
906    }
907
908    #[test]
909    fn remove_cleans_up_token() {
910        let mut cm = CancelManager::new();
911        let token = tokio_util::sync::CancellationToken::new();
912        cm.register("req-1".into(), token);
913        assert_eq!(cm.active_count(), 1);
914
915        cm.remove("req-1");
916        assert_eq!(cm.active_count(), 0);
917    }
918
919    // -- NodeDirectory tests --
920
921    #[test]
922    fn add_and_query_peers() {
923        let mut dir = NodeDirectory::new();
924        dir.add_peer(NodeId("n1".into()), Some("10.0.0.1:4697".into()));
925        dir.add_peer(NodeId("n2".into()), None);
926
927        assert_eq!(dir.peer_count(), 2);
928        assert!(!dir.is_connected(&NodeId("n1".into())));
929
930        let info = dir.get_peer(&NodeId("n1".into())).unwrap();
931        assert_eq!(info.status, PeerStatus::Connecting);
932        assert_eq!(info.address.as_deref(), Some("10.0.0.1:4697"));
933    }
934
935    #[test]
936    fn set_status_and_filter() {
937        let mut dir = NodeDirectory::new();
938        dir.add_peer(NodeId("n1".into()), None);
939        dir.add_peer(NodeId("n2".into()), None);
940        dir.add_peer(NodeId("n3".into()), None);
941
942        dir.set_status(&NodeId("n1".into()), PeerStatus::Connected);
943        dir.set_status(&NodeId("n2".into()), PeerStatus::Connected);
944        dir.set_status(&NodeId("n3".into()), PeerStatus::Unreachable);
945
946        assert_eq!(dir.connected_count(), 2);
947        assert!(dir.is_connected(&NodeId("n1".into())));
948        assert!(!dir.is_connected(&NodeId("n3".into())));
949
950        let connected = dir.peers_with_status(PeerStatus::Connected);
951        assert_eq!(connected.len(), 2);
952
953        let unreachable = dir.peers_with_status(PeerStatus::Unreachable);
954        assert_eq!(unreachable.len(), 1);
955    }
956
957    #[test]
958    fn remove_peer() {
959        let mut dir = NodeDirectory::new();
960        dir.add_peer(NodeId("n1".into()), None);
961        assert_eq!(dir.peer_count(), 1);
962
963        dir.remove_peer(&NodeId("n1".into()));
964        assert_eq!(dir.peer_count(), 0);
965        assert!(dir.get_peer(&NodeId("n1".into())).is_none());
966    }
967
968    #[test]
969    fn peer_nodes_list() {
970        let mut dir = NodeDirectory::new();
971        dir.add_peer(NodeId("n1".into()), None);
972        dir.add_peer(NodeId("n2".into()), None);
973
974        let nodes = dir.peer_nodes();
975        assert_eq!(nodes.len(), 2);
976    }
977
978    #[test]
979    fn spawn_response_variants() {
980        let success = SpawnResponse::Success {
981            request_id: "r1".into(),
982            actor_id: ActorId {
983                node: NodeId("n1".into()),
984                local: 42,
985            },
986        };
987        assert!(matches!(success, SpawnResponse::Success { .. }));
988
989        let failure = SpawnResponse::Failure {
990            request_id: "r2".into(),
991            error: "type not found".into(),
992        };
993        assert!(matches!(failure, SpawnResponse::Failure { .. }));
994    }
995
996    #[test]
997    fn watch_notification_fields() {
998        let notif = WatchNotification {
999            terminated: ActorId {
1000                node: NodeId("n1".into()),
1001                local: 1,
1002            },
1003            watcher: ActorId {
1004                node: NodeId("n2".into()),
1005                local: 2,
1006            },
1007        };
1008        assert_eq!(notif.terminated.local, 1);
1009        assert_eq!(notif.watcher.local, 2);
1010    }
1011
1012    #[test]
1013    fn peer_status_transitions() {
1014        let mut dir = NodeDirectory::new();
1015        dir.add_peer(NodeId("n1".into()), None);
1016
1017        // Connecting → Connected → Unreachable → Disconnected
1018        assert_eq!(
1019            dir.get_peer(&NodeId("n1".into())).unwrap().status,
1020            PeerStatus::Connecting
1021        );
1022        dir.set_status(&NodeId("n1".into()), PeerStatus::Connected);
1023        assert_eq!(
1024            dir.get_peer(&NodeId("n1".into())).unwrap().status,
1025            PeerStatus::Connected
1026        );
1027        dir.set_status(&NodeId("n1".into()), PeerStatus::Unreachable);
1028        assert_eq!(
1029            dir.get_peer(&NodeId("n1".into())).unwrap().status,
1030            PeerStatus::Unreachable
1031        );
1032        dir.set_status(&NodeId("n1".into()), PeerStatus::Disconnected);
1033        assert_eq!(
1034            dir.get_peer(&NodeId("n1".into())).unwrap().status,
1035            PeerStatus::Disconnected
1036        );
1037    }
1038
1039    /// Guard against accidental changes to wire protocol constants.
1040    ///
1041    /// These strings are sent over the network between nodes. Changing them
1042    /// breaks cross-node compatibility. If this test fails, you almost
1043    /// certainly need to revert your change to the constant — the wire
1044    /// value must stay the same even if the Rust struct is renamed.
1045    #[test]
1046    fn wire_protocol_constants_are_stable() {
1047        assert_eq!(
1048            SYSTEM_MSG_TYPE_SPAWN,
1049            "dactor::system_actors::SpawnRequest",
1050            "SYSTEM_MSG_TYPE_SPAWN is a wire protocol value — do not change"
1051        );
1052        assert_eq!(
1053            SYSTEM_MSG_TYPE_WATCH,
1054            "dactor::system_actors::WatchRequest",
1055            "SYSTEM_MSG_TYPE_WATCH is a wire protocol value — do not change"
1056        );
1057        assert_eq!(
1058            SYSTEM_MSG_TYPE_UNWATCH,
1059            "dactor::system_actors::UnwatchRequest",
1060            "SYSTEM_MSG_TYPE_UNWATCH is a wire protocol value — do not change"
1061        );
1062        assert_eq!(
1063            SYSTEM_MSG_TYPE_CANCEL,
1064            "dactor::system_actors::CancelRequest",
1065            "SYSTEM_MSG_TYPE_CANCEL is a wire protocol value — do not change"
1066        );
1067        assert_eq!(
1068            SYSTEM_MSG_TYPE_CONNECT_PEER,
1069            "dactor::system_actors::ConnectPeer",
1070            "SYSTEM_MSG_TYPE_CONNECT_PEER is a wire protocol value — do not change"
1071        );
1072        assert_eq!(
1073            SYSTEM_MSG_TYPE_DISCONNECT_PEER,
1074            "dactor::system_actors::DisconnectPeer",
1075            "SYSTEM_MSG_TYPE_DISCONNECT_PEER is a wire protocol value — do not change"
1076        );
1077    }
1078
1079    // -- Handshake validation tests --
1080
1081    fn make_handshake_request(
1082        node_id: &str,
1083        wire: &str,
1084        adapter: &str,
1085        app_version: Option<&str>,
1086    ) -> HandshakeRequest {
1087        HandshakeRequest {
1088            node_id: NodeId(node_id.into()),
1089            wire_version: crate::version::WireVersion::parse(wire).unwrap(),
1090            app_version: app_version.map(|s| s.into()),
1091            adapter: adapter.into(),
1092        }
1093    }
1094
1095    #[test]
1096    fn handshake_same_version_and_adapter_accepted() {
1097        let local = make_handshake_request("node-a", "0.2.0", "ractor", None);
1098        let remote = make_handshake_request("node-b", "0.2.0", "ractor", None);
1099        let resp = validate_handshake(&local, &remote);
1100        assert!(matches!(resp, HandshakeResponse::Accepted { .. }));
1101    }
1102
1103    #[test]
1104    fn handshake_same_major_different_minor_accepted() {
1105        let local = make_handshake_request("node-a", "0.2.0", "ractor", None);
1106        let remote = make_handshake_request("node-b", "0.3.0", "ractor", None);
1107        let resp = validate_handshake(&local, &remote);
1108        assert!(matches!(resp, HandshakeResponse::Accepted { .. }));
1109    }
1110
1111    #[test]
1112    fn handshake_different_major_rejected() {
1113        let local = make_handshake_request("node-a", "0.2.0", "ractor", None);
1114        let remote = make_handshake_request("node-b", "1.0.0", "ractor", None);
1115        let resp = validate_handshake(&local, &remote);
1116        match resp {
1117            HandshakeResponse::Rejected { reason, detail, .. } => {
1118                assert_eq!(reason, RejectionReason::IncompatibleProtocol);
1119                assert!(detail.contains("MAJOR"));
1120            }
1121            _ => panic!("expected Rejected"),
1122        }
1123    }
1124
1125    #[test]
1126    fn handshake_different_adapter_rejected() {
1127        let local = make_handshake_request("node-a", "0.2.0", "ractor", None);
1128        let remote = make_handshake_request("node-b", "0.2.0", "kameo", None);
1129        let resp = validate_handshake(&local, &remote);
1130        match resp {
1131            HandshakeResponse::Rejected { reason, detail, .. } => {
1132                assert_eq!(reason, RejectionReason::IncompatibleAdapter);
1133                assert!(detail.contains("kameo"));
1134                assert!(detail.contains("ractor"));
1135            }
1136            _ => panic!("expected Rejected"),
1137        }
1138    }
1139
1140    #[test]
1141    fn handshake_protocol_checked_before_adapter() {
1142        let local = make_handshake_request("node-a", "0.2.0", "ractor", None);
1143        let remote = make_handshake_request("node-b", "1.0.0", "kameo", None);
1144        let resp = validate_handshake(&local, &remote);
1145        // Protocol mismatch should be reported even though adapter also differs
1146        match resp {
1147            HandshakeResponse::Rejected { reason, .. } => {
1148                assert_eq!(reason, RejectionReason::IncompatibleProtocol);
1149            }
1150            _ => panic!("expected Rejected"),
1151        }
1152    }
1153
1154    #[test]
1155    fn handshake_accepted_carries_local_info() {
1156        let local = make_handshake_request("node-a", "0.2.0", "ractor", Some("1.0.0"));
1157        let remote = make_handshake_request("node-b", "0.2.0", "ractor", Some("2.0.0"));
1158        match validate_handshake(&local, &remote) {
1159            HandshakeResponse::Accepted {
1160                node_id,
1161                wire_version,
1162                app_version,
1163                adapter,
1164            } => {
1165                assert_eq!(node_id, NodeId("node-a".into()));
1166                assert_eq!(wire_version.to_string(), "0.2.0");
1167                assert_eq!(app_version.as_deref(), Some("1.0.0"));
1168                assert_eq!(adapter, "ractor");
1169            }
1170            _ => panic!("expected Accepted"),
1171        }
1172    }
1173
1174    #[test]
1175    fn rejection_reason_display() {
1176        assert_eq!(
1177            RejectionReason::IncompatibleProtocol.to_string(),
1178            "incompatible wire protocol"
1179        );
1180        assert_eq!(
1181            RejectionReason::IncompatibleAdapter.to_string(),
1182            "incompatible adapter"
1183        );
1184    }
1185
1186    // -- verify_peer_identity tests --
1187
1188    #[test]
1189    fn verify_peer_identity_matching() {
1190        let resp = HandshakeResponse::Accepted {
1191            node_id: NodeId("node-2".into()),
1192            wire_version: crate::version::WireVersion::parse("0.2.0").unwrap(),
1193            app_version: None,
1194            adapter: "ractor".into(),
1195        };
1196        assert!(verify_peer_identity(&NodeId("node-2".into()), &resp).is_ok());
1197    }
1198
1199    #[test]
1200    fn verify_peer_identity_mismatch() {
1201        let resp = HandshakeResponse::Accepted {
1202            node_id: NodeId("node-X".into()),
1203            wire_version: crate::version::WireVersion::parse("0.2.0").unwrap(),
1204            app_version: None,
1205            adapter: "ractor".into(),
1206        };
1207        let result = verify_peer_identity(&NodeId("node-2".into()), &resp);
1208        assert!(result.is_err());
1209        assert!(result.unwrap_err().contains("mismatch"));
1210    }
1211
1212    #[test]
1213    fn verify_peer_identity_rejected_is_ok() {
1214        let resp = HandshakeResponse::Rejected {
1215            node_id: NodeId("node-2".into()),
1216            wire_version: crate::version::WireVersion::parse("1.0.0").unwrap(),
1217            reason: RejectionReason::IncompatibleProtocol,
1218            detail: "test".into(),
1219        };
1220        // Rejected responses don't need identity verification
1221        assert!(verify_peer_identity(&NodeId("node-2".into()), &resp).is_ok());
1222    }
1223}