//! Native coerce actor implementations for dactor system actors.
//!
//! Each system actor (SpawnManager, WatchManager, CancelManager, NodeDirectory)
//! is wrapped in a real coerce `Actor` with its own mailbox. Messages are
//! defined as separate types implementing `coerce::actor::message::Message`,
//! enabling coerce's native notify/send semantics.

use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};

use coerce::actor::context::ActorContext;
use coerce::actor::message::{Handler, Message};
use coerce::actor::Actor;

use dactor::node::{ActorId, NodeId};
use dactor::system_actors::*;
use dactor::type_registry::TypeRegistry;
19// ---------------------------------------------------------------------------
20// CP5: SpawnManager actor
21// ---------------------------------------------------------------------------
22
23/// Reply for spawn requests — wraps the outcome as a plain type so domain
24/// failures (unknown type, bad args) stay as reply data.
25///
26/// The actor box is wrapped in `Mutex<Option<…>>` to satisfy `Sync`, which
27/// coerce requires for all `Message::Result` types. Callers extract the
28/// value via [`SpawnOutcome::take_actor`].
29pub enum SpawnOutcome {
30    /// Actor created successfully.
31    Success {
32        actor_id: ActorId,
33        actor: Mutex<Option<Box<dyn std::any::Any + Send>>>,
34    },
35    /// Spawn failed (unknown type, deserialization error).
36    Failure(SpawnResponse),
37}
38
39impl SpawnOutcome {
40    /// Construct a `Success` variant, wrapping the actor in a `Mutex`.
41    pub fn success(actor_id: ActorId, actor: Box<dyn std::any::Any + Send>) -> Self {
42        Self::Success {
43            actor_id,
44            actor: Mutex::new(Some(actor)),
45        }
46    }
47
48    /// Take the actor box out of a `Success` variant. Returns `None` if
49    /// already taken or if the outcome is `Failure`.
50    pub fn take_actor(&self) -> Option<Box<dyn std::any::Any + Send>> {
51        match self {
52            Self::Success { actor, .. } => actor.lock().unwrap_or_else(|e| e.into_inner()).take(),
53            Self::Failure(_) => None,
54        }
55    }
56}
57
58/// Reply wrapper for CancelResponse.
59pub struct CancelOutcome(pub CancelResponse);
60
61/// Factory function type for creating actors from serialized bytes.
62pub type FactoryFn = Box<
63    dyn Fn(&[u8]) -> Result<Box<dyn std::any::Any + Send>, dactor::remote::SerializationError>
64        + Send
65        + Sync,
66>;
67
68/// Native coerce actor wrapping [`SpawnManager`].
69pub struct SpawnManagerActor {
70    manager: SpawnManager,
71    node_id: NodeId,
72    next_local: Arc<AtomicU64>,
73}
74
75impl SpawnManagerActor {
76    /// Create a new `SpawnManagerActor`.
77    pub fn new(node_id: NodeId, registry: TypeRegistry, next_local: Arc<AtomicU64>) -> Self {
78        Self {
79            manager: SpawnManager::new(registry),
80            node_id,
81            next_local,
82        }
83    }
84}
85
86#[async_trait::async_trait]
87impl Actor for SpawnManagerActor {}
88
89/// Message: process a remote spawn request.
90pub struct HandleSpawnRequest(pub SpawnRequest);
91
92impl Message for HandleSpawnRequest {
93    type Result = SpawnOutcome;
94}
95
96#[async_trait::async_trait]
97impl Handler<HandleSpawnRequest> for SpawnManagerActor {
98    async fn handle(&mut self, msg: HandleSpawnRequest, _ctx: &mut ActorContext) -> SpawnOutcome {
99        match self.manager.create_actor(&msg.0) {
100            Ok(actor) => {
101                let local = self.next_local.fetch_add(1, Ordering::SeqCst);
102                let actor_id = ActorId {
103                    node: self.node_id.clone(),
104                    local,
105                };
106                self.manager.record_spawn(actor_id.clone());
107                SpawnOutcome::success(actor_id, actor)
108            }
109            Err(e) => SpawnOutcome::Failure(SpawnResponse::Failure {
110                request_id: msg.0.request_id.clone(),
111                error: e.to_string(),
112            }),
113        }
114    }
115}
116
117/// Message: register a factory for a type name.
118pub struct RegisterFactory {
119    pub type_name: String,
120    pub factory: FactoryFn,
121}
122
123impl Message for RegisterFactory {
124    type Result = ();
125}
126
127#[async_trait::async_trait]
128impl Handler<RegisterFactory> for SpawnManagerActor {
129    async fn handle(&mut self, msg: RegisterFactory, _ctx: &mut ActorContext) {
130        self.manager
131            .type_registry_mut()
132            .register_factory(msg.type_name, msg.factory);
133    }
134}
135
136/// Message: query spawned actors list.
137pub struct GetSpawnedActors;
138
139impl Message for GetSpawnedActors {
140    type Result = Vec<ActorId>;
141}
142
143#[async_trait::async_trait]
144impl Handler<GetSpawnedActors> for SpawnManagerActor {
145    async fn handle(&mut self, _msg: GetSpawnedActors, _ctx: &mut ActorContext) -> Vec<ActorId> {
146        self.manager.spawned_actors().to_vec()
147    }
148}
149
150// ---------------------------------------------------------------------------
151// CP5: WatchManager actor
152// ---------------------------------------------------------------------------
153
154/// Native coerce actor wrapping [`WatchManager`].
155pub struct WatchManagerActor {
156    manager: WatchManager,
157}
158
159impl WatchManagerActor {
160    /// Create a new `WatchManagerActor`.
161    pub fn new() -> Self {
162        Self {
163            manager: WatchManager::new(),
164        }
165    }
166}
167
168impl Default for WatchManagerActor {
169    fn default() -> Self {
170        Self::new()
171    }
172}
173
174#[async_trait::async_trait]
175impl Actor for WatchManagerActor {}
176
177/// Message: register a remote watch.
178pub struct RemoteWatch {
179    pub target: ActorId,
180    pub watcher: ActorId,
181}
182
183impl Message for RemoteWatch {
184    type Result = ();
185}
186
187#[async_trait::async_trait]
188impl Handler<RemoteWatch> for WatchManagerActor {
189    async fn handle(&mut self, msg: RemoteWatch, _ctx: &mut ActorContext) {
190        self.manager.watch(msg.target, msg.watcher);
191    }
192}
193
194/// Message: remove a remote watch.
195pub struct RemoteUnwatch {
196    pub target: ActorId,
197    pub watcher: ActorId,
198}
199
200impl Message for RemoteUnwatch {
201    type Result = ();
202}
203
204#[async_trait::async_trait]
205impl Handler<RemoteUnwatch> for WatchManagerActor {
206    async fn handle(&mut self, msg: RemoteUnwatch, _ctx: &mut ActorContext) {
207        self.manager.unwatch(&msg.target, &msg.watcher);
208    }
209}
210
211/// Message: actor terminated — return notifications for remote watchers.
212pub struct OnTerminated(pub ActorId);
213
214impl Message for OnTerminated {
215    type Result = Vec<WatchNotification>;
216}
217
218#[async_trait::async_trait]
219impl Handler<OnTerminated> for WatchManagerActor {
220    async fn handle(
221        &mut self,
222        msg: OnTerminated,
223        _ctx: &mut ActorContext,
224    ) -> Vec<WatchNotification> {
225        self.manager.on_terminated(&msg.0)
226    }
227}
228
229/// Message: query watched count.
230pub struct GetWatchedCount;
231
232impl Message for GetWatchedCount {
233    type Result = usize;
234}
235
236#[async_trait::async_trait]
237impl Handler<GetWatchedCount> for WatchManagerActor {
238    async fn handle(&mut self, _msg: GetWatchedCount, _ctx: &mut ActorContext) -> usize {
239        self.manager.watched_count()
240    }
241}
242
243// ---------------------------------------------------------------------------
244// CP5: CancelManager actor
245// ---------------------------------------------------------------------------
246
247/// Native coerce actor wrapping [`CancelManager`].
248pub struct CancelManagerActor {
249    manager: CancelManager,
250}
251
252impl CancelManagerActor {
253    /// Create a new `CancelManagerActor`.
254    pub fn new() -> Self {
255        Self {
256            manager: CancelManager::new(),
257        }
258    }
259}
260
261impl Default for CancelManagerActor {
262    fn default() -> Self {
263        Self::new()
264    }
265}
266
267#[async_trait::async_trait]
268impl Actor for CancelManagerActor {}
269
270/// Message: register a cancellation token for a request.
271pub struct RegisterCancel {
272    pub request_id: String,
273    pub token: tokio_util::sync::CancellationToken,
274}
275
276impl Message for RegisterCancel {
277    type Result = ();
278}
279
280#[async_trait::async_trait]
281impl Handler<RegisterCancel> for CancelManagerActor {
282    async fn handle(&mut self, msg: RegisterCancel, _ctx: &mut ActorContext) {
283        self.manager.register(msg.request_id, msg.token);
284    }
285}
286
287/// Message: cancel a request by ID.
288pub struct CancelById(pub String);
289
290impl Message for CancelById {
291    type Result = CancelOutcome;
292}
293
294#[async_trait::async_trait]
295impl Handler<CancelById> for CancelManagerActor {
296    async fn handle(&mut self, msg: CancelById, _ctx: &mut ActorContext) -> CancelOutcome {
297        CancelOutcome(self.manager.cancel(&msg.0))
298    }
299}
300
301/// Message: clean up after a request completes normally.
302pub struct CompleteRequest(pub String);
303
304impl Message for CompleteRequest {
305    type Result = ();
306}
307
308#[async_trait::async_trait]
309impl Handler<CompleteRequest> for CancelManagerActor {
310    async fn handle(&mut self, msg: CompleteRequest, _ctx: &mut ActorContext) {
311        self.manager.remove(&msg.0);
312    }
313}
314
315/// Message: query active count.
316pub struct GetActiveCount;
317
318impl Message for GetActiveCount {
319    type Result = usize;
320}
321
322#[async_trait::async_trait]
323impl Handler<GetActiveCount> for CancelManagerActor {
324    async fn handle(&mut self, _msg: GetActiveCount, _ctx: &mut ActorContext) -> usize {
325        self.manager.active_count()
326    }
327}
328
329// ---------------------------------------------------------------------------
330// CP5: NodeDirectory actor
331// ---------------------------------------------------------------------------
332
333/// Native coerce actor wrapping [`NodeDirectory`].
334pub struct NodeDirectoryActor {
335    directory: NodeDirectory,
336}
337
338impl NodeDirectoryActor {
339    /// Create a new `NodeDirectoryActor`.
340    pub fn new() -> Self {
341        Self {
342            directory: NodeDirectory::new(),
343        }
344    }
345}
346
347impl Default for NodeDirectoryActor {
348    fn default() -> Self {
349        Self::new()
350    }
351}
352
353#[async_trait::async_trait]
354impl Actor for NodeDirectoryActor {}
355
356/// Message: register a peer node as connected.
357pub struct ConnectPeer {
358    pub peer_id: NodeId,
359    pub address: Option<String>,
360}
361
362impl Message for ConnectPeer {
363    type Result = ();
364}
365
366#[async_trait::async_trait]
367impl Handler<ConnectPeer> for NodeDirectoryActor {
368    async fn handle(&mut self, msg: ConnectPeer, _ctx: &mut ActorContext) {
369        if let Some(existing) = self.directory.get_peer(&msg.peer_id) {
370            let resolved = msg.address.or_else(|| existing.address.clone());
371            self.directory.remove_peer(&msg.peer_id);
372            self.directory.add_peer(msg.peer_id.clone(), resolved);
373        } else {
374            self.directory.add_peer(msg.peer_id.clone(), msg.address);
375        }
376        self.directory
377            .set_status(&msg.peer_id, PeerStatus::Connected);
378    }
379}
380
381/// Message: mark a peer as disconnected.
382pub struct DisconnectPeer(pub NodeId);
383
384impl Message for DisconnectPeer {
385    type Result = ();
386}
387
388#[async_trait::async_trait]
389impl Handler<DisconnectPeer> for NodeDirectoryActor {
390    async fn handle(&mut self, msg: DisconnectPeer, _ctx: &mut ActorContext) {
391        self.directory
392            .set_status(&msg.0, PeerStatus::Disconnected);
393    }
394}
395
396/// Message: check if a peer is connected.
397pub struct IsConnected(pub NodeId);
398
399impl Message for IsConnected {
400    type Result = bool;
401}
402
403#[async_trait::async_trait]
404impl Handler<IsConnected> for NodeDirectoryActor {
405    async fn handle(&mut self, msg: IsConnected, _ctx: &mut ActorContext) -> bool {
406        self.directory.is_connected(&msg.0)
407    }
408}
409
410/// Message: query peer count.
411pub struct GetPeerCount;
412
413impl Message for GetPeerCount {
414    type Result = usize;
415}
416
417#[async_trait::async_trait]
418impl Handler<GetPeerCount> for NodeDirectoryActor {
419    async fn handle(&mut self, _msg: GetPeerCount, _ctx: &mut ActorContext) -> usize {
420        self.directory.peer_count()
421    }
422}
423
424/// Message: query connected count.
425pub struct GetConnectedCount;
426
427impl Message for GetConnectedCount {
428    type Result = usize;
429}
430
431#[async_trait::async_trait]
432impl Handler<GetConnectedCount> for NodeDirectoryActor {
433    async fn handle(&mut self, _msg: GetConnectedCount, _ctx: &mut ActorContext) -> usize {
434        self.directory.connected_count()
435    }
436}
437
438/// Message: query peer info.
439pub struct GetPeerInfo(pub NodeId);
440
441impl Message for GetPeerInfo {
442    type Result = Option<PeerInfo>;
443}
444
445#[async_trait::async_trait]
446impl Handler<GetPeerInfo> for NodeDirectoryActor {
447    async fn handle(&mut self, msg: GetPeerInfo, _ctx: &mut ActorContext) -> Option<PeerInfo> {
448        self.directory.get_peer(&msg.0).cloned()
449    }
450}