// dactor_ractor/system_actors.rs
//! Native ractor actor implementations for dactor system actors.
//!
//! Each system actor (SpawnManager, WatchManager, CancelManager, NodeDirectory)
//! is wrapped in a real `ractor::Actor` with its own mailbox. This enables:
//!
//! - **Message-based access** — system actor operations arrive as mailbox
//!   messages, processed single-threaded (no `&mut self` contention).
//! - **Transport integration** — incoming `WireEnvelope` system messages can
//!   be routed directly to the actor's mailbox.
//! - **Backpressure** — mailbox depth limits prevent system actor overload.
//! - **Supervision** — system actors can be supervised and restarted.
13use dactor::node::{ActorId, NodeId};
14use dactor::system_actors::*;
15use dactor::type_registry::TypeRegistry;
16
17use std::sync::Arc;
18use tokio::sync::oneshot;
19
20/// Result type for spawn requests.
21pub type SpawnResult = Result<(ActorId, Box<dyn std::any::Any + Send>), SpawnResponse>;
22
23/// Factory function type for creating actors from serialized bytes.
24pub type FactoryFn = Box<
25    dyn Fn(&[u8]) -> Result<Box<dyn std::any::Any + Send>, dactor::remote::SerializationError>
26        + Send
27        + Sync,
28>;
29
30// ---------------------------------------------------------------------------
31// NA1: SpawnManager actor
32// ---------------------------------------------------------------------------
33
34/// Message enum for the SpawnManager actor.
35pub enum SpawnManagerMsg {
36    /// Process a remote spawn request.
37    HandleRequest {
38        request: SpawnRequest,
39        reply: oneshot::Sender<SpawnResult>,
40    },
41    /// Register a factory for a type name.
42    RegisterFactory {
43        type_name: String,
44        factory: FactoryFn,
45        reply: oneshot::Sender<()>,
46    },
47    /// Query spawned actors list.
48    GetSpawnedActors {
49        reply: oneshot::Sender<Vec<ActorId>>,
50    },
51}
52
53/// Native ractor actor wrapping [`SpawnManager`].
54pub struct SpawnManagerActor;
55
56/// Internal state for SpawnManagerActor.
57pub struct SpawnManagerState {
58    manager: SpawnManager,
59    node_id: NodeId,
60    /// Shared ID counter — must be the same `Arc` used by the runtime
61    /// to prevent local/remote ActorId collisions.
62    next_local: Arc<std::sync::atomic::AtomicU64>,
63}
64
65impl ractor::Actor for SpawnManagerActor {
66    type Msg = SpawnManagerMsg;
67    type State = SpawnManagerState;
68    type Arguments = (NodeId, TypeRegistry, Arc<std::sync::atomic::AtomicU64>);
69
70    async fn pre_start(
71        &self,
72        _myself: ractor::ActorRef<Self::Msg>,
73        args: Self::Arguments,
74    ) -> Result<Self::State, ractor::ActorProcessingErr> {
75        Ok(SpawnManagerState {
76            manager: SpawnManager::new(args.1),
77            node_id: args.0,
78            next_local: args.2,
79        })
80    }
81
82    async fn handle(
83        &self,
84        _myself: ractor::ActorRef<Self::Msg>,
85        message: Self::Msg,
86        state: &mut Self::State,
87    ) -> Result<(), ractor::ActorProcessingErr> {
88        match message {
89            SpawnManagerMsg::HandleRequest { request, reply } => {
90                let result = match state.manager.create_actor(&request) {
91                    Ok(actor) => {
92                        let local = state
93                            .next_local
94                            .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
95                        let actor_id = ActorId {
96                            node: state.node_id.clone(),
97                            local,
98                        };
99                        state.manager.record_spawn(actor_id.clone());
100                        Ok((actor_id, actor))
101                    }
102                    Err(e) => Err(SpawnResponse::Failure {
103                        request_id: request.request_id.clone(),
104                        error: e.to_string(),
105                    }),
106                };
107                let _ = reply.send(result);
108            }
109            SpawnManagerMsg::RegisterFactory {
110                type_name,
111                factory,
112                reply,
113            } => {
114                state
115                    .manager
116                    .type_registry_mut()
117                    .register_factory(type_name, factory);
118                let _ = reply.send(());
119            }
120            SpawnManagerMsg::GetSpawnedActors { reply } => {
121                let _ = reply.send(state.manager.spawned_actors().to_vec());
122            }
123        }
124        Ok(())
125    }
126}
127
128// ---------------------------------------------------------------------------
129// NA2: WatchManager actor
130// ---------------------------------------------------------------------------
131
132/// Message enum for the WatchManager actor.
133pub enum WatchManagerMsg {
134    /// Register a remote watch.
135    Watch { target: ActorId, watcher: ActorId },
136    /// Remove a remote watch.
137    Unwatch { target: ActorId, watcher: ActorId },
138    /// Actor terminated — return notifications for remote watchers.
139    OnTerminated {
140        terminated: ActorId,
141        reply: oneshot::Sender<Vec<WatchNotification>>,
142    },
143    /// Query watched count.
144    GetWatchedCount {
145        reply: oneshot::Sender<usize>,
146    },
147}
148
149/// Native ractor actor wrapping [`WatchManager`].
150pub struct WatchManagerActor;
151
152impl ractor::Actor for WatchManagerActor {
153    type Msg = WatchManagerMsg;
154    type State = WatchManager;
155    type Arguments = ();
156
157    async fn pre_start(
158        &self,
159        _myself: ractor::ActorRef<Self::Msg>,
160        _args: Self::Arguments,
161    ) -> Result<Self::State, ractor::ActorProcessingErr> {
162        Ok(WatchManager::new())
163    }
164
165    async fn handle(
166        &self,
167        _myself: ractor::ActorRef<Self::Msg>,
168        message: Self::Msg,
169        state: &mut Self::State,
170    ) -> Result<(), ractor::ActorProcessingErr> {
171        match message {
172            WatchManagerMsg::Watch { target, watcher } => {
173                state.watch(target, watcher);
174            }
175            WatchManagerMsg::Unwatch { target, watcher } => {
176                state.unwatch(&target, &watcher);
177            }
178            WatchManagerMsg::OnTerminated { terminated, reply } => {
179                let notifications = state.on_terminated(&terminated);
180                let _ = reply.send(notifications);
181            }
182            WatchManagerMsg::GetWatchedCount { reply } => {
183                let _ = reply.send(state.watched_count());
184            }
185        }
186        Ok(())
187    }
188}
189
190// ---------------------------------------------------------------------------
191// NA3: CancelManager actor
192// ---------------------------------------------------------------------------
193
194/// Message enum for the CancelManager actor.
195pub enum CancelManagerMsg {
196    /// Register a cancellation token for a request.
197    Register {
198        request_id: String,
199        token: tokio_util::sync::CancellationToken,
200    },
201    /// Cancel a request by ID.
202    Cancel {
203        request_id: String,
204        reply: oneshot::Sender<CancelResponse>,
205    },
206    /// Clean up after a request completes normally.
207    Complete { request_id: String },
208    /// Query active count.
209    GetActiveCount {
210        reply: oneshot::Sender<usize>,
211    },
212}
213
214/// Native ractor actor wrapping [`CancelManager`].
215pub struct CancelManagerActor;
216
217impl ractor::Actor for CancelManagerActor {
218    type Msg = CancelManagerMsg;
219    type State = CancelManager;
220    type Arguments = ();
221
222    async fn pre_start(
223        &self,
224        _myself: ractor::ActorRef<Self::Msg>,
225        _args: Self::Arguments,
226    ) -> Result<Self::State, ractor::ActorProcessingErr> {
227        Ok(CancelManager::new())
228    }
229
230    async fn handle(
231        &self,
232        _myself: ractor::ActorRef<Self::Msg>,
233        message: Self::Msg,
234        state: &mut Self::State,
235    ) -> Result<(), ractor::ActorProcessingErr> {
236        match message {
237            CancelManagerMsg::Register { request_id, token } => {
238                state.register(request_id, token);
239            }
240            CancelManagerMsg::Cancel { request_id, reply } => {
241                let response = state.cancel(&request_id);
242                let _ = reply.send(response);
243            }
244            CancelManagerMsg::Complete { request_id } => {
245                state.remove(&request_id);
246            }
247            CancelManagerMsg::GetActiveCount { reply } => {
248                let _ = reply.send(state.active_count());
249            }
250        }
251        Ok(())
252    }
253}
254
255// ---------------------------------------------------------------------------
256// NA4: NodeDirectory actor
257// ---------------------------------------------------------------------------
258
259/// Message enum for the NodeDirectory actor.
260pub enum NodeDirectoryMsg {
261    /// Register a peer node as connected.
262    ConnectPeer {
263        peer_id: NodeId,
264        address: Option<String>,
265    },
266    /// Mark a peer as disconnected.
267    DisconnectPeer { peer_id: NodeId },
268    /// Check if a peer is connected.
269    IsConnected {
270        peer_id: NodeId,
271        reply: oneshot::Sender<bool>,
272    },
273    /// Query peer count.
274    GetPeerCount {
275        reply: oneshot::Sender<usize>,
276    },
277    /// Query connected count.
278    GetConnectedCount {
279        reply: oneshot::Sender<usize>,
280    },
281    /// Query peer info (address, status).
282    GetPeerInfo {
283        peer_id: NodeId,
284        reply: oneshot::Sender<Option<PeerInfo>>,
285    },
286}
287
288/// Native ractor actor wrapping [`NodeDirectory`].
289pub struct NodeDirectoryActor;
290
291impl ractor::Actor for NodeDirectoryActor {
292    type Msg = NodeDirectoryMsg;
293    type State = NodeDirectory;
294    type Arguments = ();
295
296    async fn pre_start(
297        &self,
298        _myself: ractor::ActorRef<Self::Msg>,
299        _args: Self::Arguments,
300    ) -> Result<Self::State, ractor::ActorProcessingErr> {
301        Ok(NodeDirectory::new())
302    }
303
304    async fn handle(
305        &self,
306        _myself: ractor::ActorRef<Self::Msg>,
307        message: Self::Msg,
308        state: &mut Self::State,
309    ) -> Result<(), ractor::ActorProcessingErr> {
310        match message {
311            NodeDirectoryMsg::ConnectPeer { peer_id, address } => {
312                if let Some(existing) = state.get_peer(&peer_id) {
313                    let resolved = address.or_else(|| existing.address.clone());
314                    state.remove_peer(&peer_id);
315                    state.add_peer(peer_id.clone(), resolved);
316                } else {
317                    state.add_peer(peer_id.clone(), address);
318                }
319                state.set_status(&peer_id, PeerStatus::Connected);
320            }
321            NodeDirectoryMsg::DisconnectPeer { peer_id } => {
322                state.set_status(&peer_id, PeerStatus::Disconnected);
323            }
324            NodeDirectoryMsg::IsConnected { peer_id, reply } => {
325                let _ = reply.send(state.is_connected(&peer_id));
326            }
327            NodeDirectoryMsg::GetPeerCount { reply } => {
328                let _ = reply.send(state.peer_count());
329            }
330            NodeDirectoryMsg::GetConnectedCount { reply } => {
331                let _ = reply.send(state.connected_count());
332            }
333            NodeDirectoryMsg::GetPeerInfo { peer_id, reply } => {
334                let _ = reply.send(state.get_peer(&peer_id).cloned());
335            }
336        }
337        Ok(())
338    }
339}