1use dactor::node::{ActorId, NodeId};
14use dactor::system_actors::*;
15use dactor::type_registry::TypeRegistry;
16
17use std::sync::Arc;
18use tokio::sync::oneshot;
19
20pub type SpawnResult = Result<(ActorId, Box<dyn std::any::Any + Send>), SpawnResponse>;
22
23pub type FactoryFn = Box<
25 dyn Fn(&[u8]) -> Result<Box<dyn std::any::Any + Send>, dactor::remote::SerializationError>
26 + Send
27 + Sync,
28>;
29
30pub enum SpawnManagerMsg {
36 HandleRequest {
38 request: SpawnRequest,
39 reply: oneshot::Sender<SpawnResult>,
40 },
41 RegisterFactory {
43 type_name: String,
44 factory: FactoryFn,
45 reply: oneshot::Sender<()>,
46 },
47 GetSpawnedActors {
49 reply: oneshot::Sender<Vec<ActorId>>,
50 },
51}
52
53pub struct SpawnManagerActor;
55
56pub struct SpawnManagerState {
58 manager: SpawnManager,
59 node_id: NodeId,
60 next_local: Arc<std::sync::atomic::AtomicU64>,
63}
64
65impl ractor::Actor for SpawnManagerActor {
66 type Msg = SpawnManagerMsg;
67 type State = SpawnManagerState;
68 type Arguments = (NodeId, TypeRegistry, Arc<std::sync::atomic::AtomicU64>);
69
70 async fn pre_start(
71 &self,
72 _myself: ractor::ActorRef<Self::Msg>,
73 args: Self::Arguments,
74 ) -> Result<Self::State, ractor::ActorProcessingErr> {
75 Ok(SpawnManagerState {
76 manager: SpawnManager::new(args.1),
77 node_id: args.0,
78 next_local: args.2,
79 })
80 }
81
82 async fn handle(
83 &self,
84 _myself: ractor::ActorRef<Self::Msg>,
85 message: Self::Msg,
86 state: &mut Self::State,
87 ) -> Result<(), ractor::ActorProcessingErr> {
88 match message {
89 SpawnManagerMsg::HandleRequest { request, reply } => {
90 let result = match state.manager.create_actor(&request) {
91 Ok(actor) => {
92 let local = state
93 .next_local
94 .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
95 let actor_id = ActorId {
96 node: state.node_id.clone(),
97 local,
98 };
99 state.manager.record_spawn(actor_id.clone());
100 Ok((actor_id, actor))
101 }
102 Err(e) => Err(SpawnResponse::Failure {
103 request_id: request.request_id.clone(),
104 error: e.to_string(),
105 }),
106 };
107 let _ = reply.send(result);
108 }
109 SpawnManagerMsg::RegisterFactory {
110 type_name,
111 factory,
112 reply,
113 } => {
114 state
115 .manager
116 .type_registry_mut()
117 .register_factory(type_name, factory);
118 let _ = reply.send(());
119 }
120 SpawnManagerMsg::GetSpawnedActors { reply } => {
121 let _ = reply.send(state.manager.spawned_actors().to_vec());
122 }
123 }
124 Ok(())
125 }
126}
127
128pub enum WatchManagerMsg {
134 Watch { target: ActorId, watcher: ActorId },
136 Unwatch { target: ActorId, watcher: ActorId },
138 OnTerminated {
140 terminated: ActorId,
141 reply: oneshot::Sender<Vec<WatchNotification>>,
142 },
143 GetWatchedCount {
145 reply: oneshot::Sender<usize>,
146 },
147}
148
149pub struct WatchManagerActor;
151
152impl ractor::Actor for WatchManagerActor {
153 type Msg = WatchManagerMsg;
154 type State = WatchManager;
155 type Arguments = ();
156
157 async fn pre_start(
158 &self,
159 _myself: ractor::ActorRef<Self::Msg>,
160 _args: Self::Arguments,
161 ) -> Result<Self::State, ractor::ActorProcessingErr> {
162 Ok(WatchManager::new())
163 }
164
165 async fn handle(
166 &self,
167 _myself: ractor::ActorRef<Self::Msg>,
168 message: Self::Msg,
169 state: &mut Self::State,
170 ) -> Result<(), ractor::ActorProcessingErr> {
171 match message {
172 WatchManagerMsg::Watch { target, watcher } => {
173 state.watch(target, watcher);
174 }
175 WatchManagerMsg::Unwatch { target, watcher } => {
176 state.unwatch(&target, &watcher);
177 }
178 WatchManagerMsg::OnTerminated { terminated, reply } => {
179 let notifications = state.on_terminated(&terminated);
180 let _ = reply.send(notifications);
181 }
182 WatchManagerMsg::GetWatchedCount { reply } => {
183 let _ = reply.send(state.watched_count());
184 }
185 }
186 Ok(())
187 }
188}
189
190pub enum CancelManagerMsg {
196 Register {
198 request_id: String,
199 token: tokio_util::sync::CancellationToken,
200 },
201 Cancel {
203 request_id: String,
204 reply: oneshot::Sender<CancelResponse>,
205 },
206 Complete { request_id: String },
208 GetActiveCount {
210 reply: oneshot::Sender<usize>,
211 },
212}
213
214pub struct CancelManagerActor;
216
217impl ractor::Actor for CancelManagerActor {
218 type Msg = CancelManagerMsg;
219 type State = CancelManager;
220 type Arguments = ();
221
222 async fn pre_start(
223 &self,
224 _myself: ractor::ActorRef<Self::Msg>,
225 _args: Self::Arguments,
226 ) -> Result<Self::State, ractor::ActorProcessingErr> {
227 Ok(CancelManager::new())
228 }
229
230 async fn handle(
231 &self,
232 _myself: ractor::ActorRef<Self::Msg>,
233 message: Self::Msg,
234 state: &mut Self::State,
235 ) -> Result<(), ractor::ActorProcessingErr> {
236 match message {
237 CancelManagerMsg::Register { request_id, token } => {
238 state.register(request_id, token);
239 }
240 CancelManagerMsg::Cancel { request_id, reply } => {
241 let response = state.cancel(&request_id);
242 let _ = reply.send(response);
243 }
244 CancelManagerMsg::Complete { request_id } => {
245 state.remove(&request_id);
246 }
247 CancelManagerMsg::GetActiveCount { reply } => {
248 let _ = reply.send(state.active_count());
249 }
250 }
251 Ok(())
252 }
253}
254
255pub enum NodeDirectoryMsg {
261 ConnectPeer {
263 peer_id: NodeId,
264 address: Option<String>,
265 },
266 DisconnectPeer { peer_id: NodeId },
268 IsConnected {
270 peer_id: NodeId,
271 reply: oneshot::Sender<bool>,
272 },
273 GetPeerCount {
275 reply: oneshot::Sender<usize>,
276 },
277 GetConnectedCount {
279 reply: oneshot::Sender<usize>,
280 },
281 GetPeerInfo {
283 peer_id: NodeId,
284 reply: oneshot::Sender<Option<PeerInfo>>,
285 },
286}
287
288pub struct NodeDirectoryActor;
290
291impl ractor::Actor for NodeDirectoryActor {
292 type Msg = NodeDirectoryMsg;
293 type State = NodeDirectory;
294 type Arguments = ();
295
296 async fn pre_start(
297 &self,
298 _myself: ractor::ActorRef<Self::Msg>,
299 _args: Self::Arguments,
300 ) -> Result<Self::State, ractor::ActorProcessingErr> {
301 Ok(NodeDirectory::new())
302 }
303
304 async fn handle(
305 &self,
306 _myself: ractor::ActorRef<Self::Msg>,
307 message: Self::Msg,
308 state: &mut Self::State,
309 ) -> Result<(), ractor::ActorProcessingErr> {
310 match message {
311 NodeDirectoryMsg::ConnectPeer { peer_id, address } => {
312 if let Some(existing) = state.get_peer(&peer_id) {
313 let resolved = address.or_else(|| existing.address.clone());
314 state.remove_peer(&peer_id);
315 state.add_peer(peer_id.clone(), resolved);
316 } else {
317 state.add_peer(peer_id.clone(), address);
318 }
319 state.set_status(&peer_id, PeerStatus::Connected);
320 }
321 NodeDirectoryMsg::DisconnectPeer { peer_id } => {
322 state.set_status(&peer_id, PeerStatus::Disconnected);
323 }
324 NodeDirectoryMsg::IsConnected { peer_id, reply } => {
325 let _ = reply.send(state.is_connected(&peer_id));
326 }
327 NodeDirectoryMsg::GetPeerCount { reply } => {
328 let _ = reply.send(state.peer_count());
329 }
330 NodeDirectoryMsg::GetConnectedCount { reply } => {
331 let _ = reply.send(state.connected_count());
332 }
333 NodeDirectoryMsg::GetPeerInfo { peer_id, reply } => {
334 let _ = reply.send(state.get_peer(&peer_id).cloned());
335 }
336 }
337 Ok(())
338 }
339}