1use dactor::node::{ActorId, NodeId};
9use dactor::system_actors::*;
10use dactor::type_registry::TypeRegistry;
11
12use std::sync::atomic::{AtomicU64, Ordering};
13use std::sync::{Arc, Mutex};
14
15use coerce::actor::context::ActorContext;
16use coerce::actor::message::{Handler, Message};
17use coerce::actor::Actor;
18
19pub enum SpawnOutcome {
30 Success {
32 actor_id: ActorId,
33 actor: Mutex<Option<Box<dyn std::any::Any + Send>>>,
34 },
35 Failure(SpawnResponse),
37}
38
39impl SpawnOutcome {
40 pub fn success(actor_id: ActorId, actor: Box<dyn std::any::Any + Send>) -> Self {
42 Self::Success {
43 actor_id,
44 actor: Mutex::new(Some(actor)),
45 }
46 }
47
48 pub fn take_actor(&self) -> Option<Box<dyn std::any::Any + Send>> {
51 match self {
52 Self::Success { actor, .. } => actor.lock().unwrap_or_else(|e| e.into_inner()).take(),
53 Self::Failure(_) => None,
54 }
55 }
56}
57
58pub struct CancelOutcome(pub CancelResponse);
60
61pub type FactoryFn = Box<
63 dyn Fn(&[u8]) -> Result<Box<dyn std::any::Any + Send>, dactor::remote::SerializationError>
64 + Send
65 + Sync,
66>;
67
68pub struct SpawnManagerActor {
70 manager: SpawnManager,
71 node_id: NodeId,
72 next_local: Arc<AtomicU64>,
73}
74
75impl SpawnManagerActor {
76 pub fn new(node_id: NodeId, registry: TypeRegistry, next_local: Arc<AtomicU64>) -> Self {
78 Self {
79 manager: SpawnManager::new(registry),
80 node_id,
81 next_local,
82 }
83 }
84}
85
86#[async_trait::async_trait]
87impl Actor for SpawnManagerActor {}
88
89pub struct HandleSpawnRequest(pub SpawnRequest);
91
92impl Message for HandleSpawnRequest {
93 type Result = SpawnOutcome;
94}
95
96#[async_trait::async_trait]
97impl Handler<HandleSpawnRequest> for SpawnManagerActor {
98 async fn handle(&mut self, msg: HandleSpawnRequest, _ctx: &mut ActorContext) -> SpawnOutcome {
99 match self.manager.create_actor(&msg.0) {
100 Ok(actor) => {
101 let local = self.next_local.fetch_add(1, Ordering::SeqCst);
102 let actor_id = ActorId {
103 node: self.node_id.clone(),
104 local,
105 };
106 self.manager.record_spawn(actor_id.clone());
107 SpawnOutcome::success(actor_id, actor)
108 }
109 Err(e) => SpawnOutcome::Failure(SpawnResponse::Failure {
110 request_id: msg.0.request_id.clone(),
111 error: e.to_string(),
112 }),
113 }
114 }
115}
116
117pub struct RegisterFactory {
119 pub type_name: String,
120 pub factory: FactoryFn,
121}
122
123impl Message for RegisterFactory {
124 type Result = ();
125}
126
127#[async_trait::async_trait]
128impl Handler<RegisterFactory> for SpawnManagerActor {
129 async fn handle(&mut self, msg: RegisterFactory, _ctx: &mut ActorContext) {
130 self.manager
131 .type_registry_mut()
132 .register_factory(msg.type_name, msg.factory);
133 }
134}
135
136pub struct GetSpawnedActors;
138
139impl Message for GetSpawnedActors {
140 type Result = Vec<ActorId>;
141}
142
143#[async_trait::async_trait]
144impl Handler<GetSpawnedActors> for SpawnManagerActor {
145 async fn handle(&mut self, _msg: GetSpawnedActors, _ctx: &mut ActorContext) -> Vec<ActorId> {
146 self.manager.spawned_actors().to_vec()
147 }
148}
149
150pub struct WatchManagerActor {
156 manager: WatchManager,
157}
158
159impl WatchManagerActor {
160 pub fn new() -> Self {
162 Self {
163 manager: WatchManager::new(),
164 }
165 }
166}
167
168impl Default for WatchManagerActor {
169 fn default() -> Self {
170 Self::new()
171 }
172}
173
174#[async_trait::async_trait]
175impl Actor for WatchManagerActor {}
176
177pub struct RemoteWatch {
179 pub target: ActorId,
180 pub watcher: ActorId,
181}
182
183impl Message for RemoteWatch {
184 type Result = ();
185}
186
187#[async_trait::async_trait]
188impl Handler<RemoteWatch> for WatchManagerActor {
189 async fn handle(&mut self, msg: RemoteWatch, _ctx: &mut ActorContext) {
190 self.manager.watch(msg.target, msg.watcher);
191 }
192}
193
194pub struct RemoteUnwatch {
196 pub target: ActorId,
197 pub watcher: ActorId,
198}
199
200impl Message for RemoteUnwatch {
201 type Result = ();
202}
203
204#[async_trait::async_trait]
205impl Handler<RemoteUnwatch> for WatchManagerActor {
206 async fn handle(&mut self, msg: RemoteUnwatch, _ctx: &mut ActorContext) {
207 self.manager.unwatch(&msg.target, &msg.watcher);
208 }
209}
210
211pub struct OnTerminated(pub ActorId);
213
214impl Message for OnTerminated {
215 type Result = Vec<WatchNotification>;
216}
217
218#[async_trait::async_trait]
219impl Handler<OnTerminated> for WatchManagerActor {
220 async fn handle(
221 &mut self,
222 msg: OnTerminated,
223 _ctx: &mut ActorContext,
224 ) -> Vec<WatchNotification> {
225 self.manager.on_terminated(&msg.0)
226 }
227}
228
229pub struct GetWatchedCount;
231
232impl Message for GetWatchedCount {
233 type Result = usize;
234}
235
236#[async_trait::async_trait]
237impl Handler<GetWatchedCount> for WatchManagerActor {
238 async fn handle(&mut self, _msg: GetWatchedCount, _ctx: &mut ActorContext) -> usize {
239 self.manager.watched_count()
240 }
241}
242
243pub struct CancelManagerActor {
249 manager: CancelManager,
250}
251
252impl CancelManagerActor {
253 pub fn new() -> Self {
255 Self {
256 manager: CancelManager::new(),
257 }
258 }
259}
260
261impl Default for CancelManagerActor {
262 fn default() -> Self {
263 Self::new()
264 }
265}
266
267#[async_trait::async_trait]
268impl Actor for CancelManagerActor {}
269
270pub struct RegisterCancel {
272 pub request_id: String,
273 pub token: tokio_util::sync::CancellationToken,
274}
275
276impl Message for RegisterCancel {
277 type Result = ();
278}
279
280#[async_trait::async_trait]
281impl Handler<RegisterCancel> for CancelManagerActor {
282 async fn handle(&mut self, msg: RegisterCancel, _ctx: &mut ActorContext) {
283 self.manager.register(msg.request_id, msg.token);
284 }
285}
286
287pub struct CancelById(pub String);
289
290impl Message for CancelById {
291 type Result = CancelOutcome;
292}
293
294#[async_trait::async_trait]
295impl Handler<CancelById> for CancelManagerActor {
296 async fn handle(&mut self, msg: CancelById, _ctx: &mut ActorContext) -> CancelOutcome {
297 CancelOutcome(self.manager.cancel(&msg.0))
298 }
299}
300
301pub struct CompleteRequest(pub String);
303
304impl Message for CompleteRequest {
305 type Result = ();
306}
307
308#[async_trait::async_trait]
309impl Handler<CompleteRequest> for CancelManagerActor {
310 async fn handle(&mut self, msg: CompleteRequest, _ctx: &mut ActorContext) {
311 self.manager.remove(&msg.0);
312 }
313}
314
315pub struct GetActiveCount;
317
318impl Message for GetActiveCount {
319 type Result = usize;
320}
321
322#[async_trait::async_trait]
323impl Handler<GetActiveCount> for CancelManagerActor {
324 async fn handle(&mut self, _msg: GetActiveCount, _ctx: &mut ActorContext) -> usize {
325 self.manager.active_count()
326 }
327}
328
329pub struct NodeDirectoryActor {
335 directory: NodeDirectory,
336}
337
338impl NodeDirectoryActor {
339 pub fn new() -> Self {
341 Self {
342 directory: NodeDirectory::new(),
343 }
344 }
345}
346
347impl Default for NodeDirectoryActor {
348 fn default() -> Self {
349 Self::new()
350 }
351}
352
353#[async_trait::async_trait]
354impl Actor for NodeDirectoryActor {}
355
356pub struct ConnectPeer {
358 pub peer_id: NodeId,
359 pub address: Option<String>,
360}
361
362impl Message for ConnectPeer {
363 type Result = ();
364}
365
366#[async_trait::async_trait]
367impl Handler<ConnectPeer> for NodeDirectoryActor {
368 async fn handle(&mut self, msg: ConnectPeer, _ctx: &mut ActorContext) {
369 if let Some(existing) = self.directory.get_peer(&msg.peer_id) {
370 let resolved = msg.address.or_else(|| existing.address.clone());
371 self.directory.remove_peer(&msg.peer_id);
372 self.directory.add_peer(msg.peer_id.clone(), resolved);
373 } else {
374 self.directory.add_peer(msg.peer_id.clone(), msg.address);
375 }
376 self.directory
377 .set_status(&msg.peer_id, PeerStatus::Connected);
378 }
379}
380
381pub struct DisconnectPeer(pub NodeId);
383
384impl Message for DisconnectPeer {
385 type Result = ();
386}
387
388#[async_trait::async_trait]
389impl Handler<DisconnectPeer> for NodeDirectoryActor {
390 async fn handle(&mut self, msg: DisconnectPeer, _ctx: &mut ActorContext) {
391 self.directory
392 .set_status(&msg.0, PeerStatus::Disconnected);
393 }
394}
395
396pub struct IsConnected(pub NodeId);
398
399impl Message for IsConnected {
400 type Result = bool;
401}
402
403#[async_trait::async_trait]
404impl Handler<IsConnected> for NodeDirectoryActor {
405 async fn handle(&mut self, msg: IsConnected, _ctx: &mut ActorContext) -> bool {
406 self.directory.is_connected(&msg.0)
407 }
408}
409
410pub struct GetPeerCount;
412
413impl Message for GetPeerCount {
414 type Result = usize;
415}
416
417#[async_trait::async_trait]
418impl Handler<GetPeerCount> for NodeDirectoryActor {
419 async fn handle(&mut self, _msg: GetPeerCount, _ctx: &mut ActorContext) -> usize {
420 self.directory.peer_count()
421 }
422}
423
424pub struct GetConnectedCount;
426
427impl Message for GetConnectedCount {
428 type Result = usize;
429}
430
431#[async_trait::async_trait]
432impl Handler<GetConnectedCount> for NodeDirectoryActor {
433 async fn handle(&mut self, _msg: GetConnectedCount, _ctx: &mut ActorContext) -> usize {
434 self.directory.connected_count()
435 }
436}
437
438pub struct GetPeerInfo(pub NodeId);
440
441impl Message for GetPeerInfo {
442 type Result = Option<PeerInfo>;
443}
444
445#[async_trait::async_trait]
446impl Handler<GetPeerInfo> for NodeDirectoryActor {
447 async fn handle(&mut self, msg: GetPeerInfo, _ctx: &mut ActorContext) -> Option<PeerInfo> {
448 self.directory.get_peer(&msg.0).cloned()
449 }
450}