atomr_remote/
system_daemon.rs1use std::collections::HashMap;
10use std::sync::Arc;
11
12use atomr_core::actor::{ActorPath, ActorSystem, RemoteSystemMsg, UntypedActorRef};
13use parking_lot::RwLock;
14
15use crate::endpoint_manager::EndpointManager;
16use crate::serialization::{SerializeError, SerializerRegistry};
17
18pub type LocalDispatch = Arc<dyn Fn(&ActorPath, &str, Box<dyn std::any::Any + Send>) + Send + Sync>;
20
21#[derive(Clone)]
22pub struct RemoteSystemDaemon {
23 inner: Arc<RemoteSystemDaemonInner>,
24}
25
26struct RemoteSystemDaemonInner {
27 system: ActorSystem,
28 registry: SerializerRegistry,
29 endpoint_manager: EndpointManager,
30 local_uid: u64,
31 routes: RwLock<HashMap<String, LocalDispatch>>,
32 remote_watchers: RwLock<HashMap<String, Vec<UntypedActorRef>>>,
34}
35
36impl RemoteSystemDaemon {
37 pub fn new(
38 system: ActorSystem,
39 registry: SerializerRegistry,
40 endpoint_manager: EndpointManager,
41 local_uid: u64,
42 ) -> Arc<Self> {
43 Arc::new(Self {
44 inner: Arc::new(RemoteSystemDaemonInner {
45 system,
46 registry,
47 endpoint_manager,
48 local_uid,
49 routes: RwLock::new(HashMap::new()),
50 remote_watchers: RwLock::new(HashMap::new()),
51 }),
52 })
53 }
54
55 pub fn registry(&self) -> &SerializerRegistry {
56 &self.inner.registry
57 }
58
59 pub fn system(&self) -> &ActorSystem {
60 &self.inner.system
61 }
62
63 pub fn register(&self, path: ActorPath, dispatch: LocalDispatch) {
65 self.inner.routes.write().insert(path.to_string_without_address(), dispatch);
66 }
67
68 pub fn unregister(&self, path: &ActorPath) {
69 self.inner.routes.write().remove(&path.to_string_without_address());
70 }
71
72 pub fn clear(&self) {
73 self.inner.routes.write().clear();
74 }
75
76 pub fn dispatch_user(
77 &self,
78 path: &ActorPath,
79 manifest: &str,
80 serializer_id: u32,
81 bytes: &[u8],
82 ) -> Result<(), SerializeError> {
83 let routes = self.inner.routes.read();
84 let key = path.to_string_without_address();
85 let Some(dispatch) = routes.get(&key).cloned() else {
86 tracing::debug!(path = %path, "no remote route registered");
87 return Ok(());
88 };
89 drop(routes);
90 let (value, _codec) = self.inner.registry.decode_dyn(manifest, serializer_id, bytes)?;
91 dispatch(path, manifest, value);
92 Ok(())
93 }
94
95 pub fn dispatch_system(&self, path: &ActorPath, msg: RemoteSystemMsg) {
96 match msg {
97 RemoteSystemMsg::Stop => {
98 if let Some(untyped) = self.inner.system.actor_selection(&path.to_string()) {
99 untyped.stop();
100 }
101 }
102 RemoteSystemMsg::Watch { watcher } => {
103 let proxy = crate::remote_watcher::RemoteWatcherProxy::new(
104 watcher.clone(),
105 self.inner.endpoint_manager.clone(),
106 self.inner.registry.clone(),
107 self.inner.local_uid,
108 );
109 self.inner
110 .remote_watchers
111 .write()
112 .entry(path.to_string_without_address())
113 .or_default()
114 .push(UntypedActorRef::from_remote(Arc::new(proxy)));
115 }
116 RemoteSystemMsg::Unwatch { watcher } => {
117 let mut g = self.inner.remote_watchers.write();
118 if let Some(list) = g.get_mut(&path.to_string_without_address()) {
119 list.retain(|w| w.path() != &watcher);
120 }
121 }
122 RemoteSystemMsg::Terminated { actor: _ } => {
123 }
126 }
127 }
128
129 pub fn notify_terminated(&self, path: &ActorPath) {
131 let mut g = self.inner.remote_watchers.write();
132 let key = path.to_string_without_address();
133 let Some(watchers) = g.remove(&key) else { return };
134 drop(g);
135 for w in watchers {
136 w.notify_watchers(path.clone());
137 }
138 }
139}
140
141pub struct RemoteDeployer {
144 pub endpoint_manager: EndpointManager,
145}
146
147impl RemoteDeployer {
148 pub fn new(endpoint_manager: EndpointManager) -> Self {
149 Self { endpoint_manager }
150 }
151
152 pub async fn deploy(
153 &self,
154 target_address: atomr_core::actor::Address,
155 path: ActorPath,
156 manifest: String,
157 bytes: Vec<u8>,
158 ) -> Result<ActorPath, crate::transport::TransportError> {
159 let env = crate::envelope::RemoteEnvelope::user(
160 format!("{}/remote/__deploy__", target_address),
161 None,
162 0,
163 0,
164 0,
165 crate::serialization::BINCODE_SERIALIZER_ID,
166 manifest,
167 bytes,
168 );
169 let handle = self.endpoint_manager.endpoint_for(&target_address).await?;
170 handle.send(env);
171 Ok(path)
172 }
173}