atomr_remote/
system_daemon.rs1use std::collections::HashMap;
12use std::sync::Arc;
13
14use atomr_core::actor::{ActorPath, ActorSystem, RemoteSystemMsg, UntypedActorRef};
15use parking_lot::RwLock;
16
17use crate::endpoint_manager::EndpointManager;
18use crate::serialization::{SerializeError, SerializerRegistry};
19
20pub type LocalDispatch = Arc<dyn Fn(&ActorPath, &str, Box<dyn std::any::Any + Send>) + Send + Sync>;
22
23#[derive(Clone)]
24pub struct RemoteSystemDaemon {
25 inner: Arc<RemoteSystemDaemonInner>,
26}
27
28struct RemoteSystemDaemonInner {
29 system: ActorSystem,
30 registry: SerializerRegistry,
31 endpoint_manager: EndpointManager,
32 local_uid: u64,
33 routes: RwLock<HashMap<String, LocalDispatch>>,
34 remote_watchers: RwLock<HashMap<String, Vec<UntypedActorRef>>>,
36}
37
38impl RemoteSystemDaemon {
39 pub fn new(
40 system: ActorSystem,
41 registry: SerializerRegistry,
42 endpoint_manager: EndpointManager,
43 local_uid: u64,
44 ) -> Arc<Self> {
45 Arc::new(Self {
46 inner: Arc::new(RemoteSystemDaemonInner {
47 system,
48 registry,
49 endpoint_manager,
50 local_uid,
51 routes: RwLock::new(HashMap::new()),
52 remote_watchers: RwLock::new(HashMap::new()),
53 }),
54 })
55 }
56
57 pub fn registry(&self) -> &SerializerRegistry {
58 &self.inner.registry
59 }
60
61 pub fn system(&self) -> &ActorSystem {
62 &self.inner.system
63 }
64
65 pub fn register(&self, path: ActorPath, dispatch: LocalDispatch) {
67 self.inner.routes.write().insert(path.to_string_without_address(), dispatch);
68 }
69
70 pub fn unregister(&self, path: &ActorPath) {
71 self.inner.routes.write().remove(&path.to_string_without_address());
72 }
73
74 pub fn clear(&self) {
75 self.inner.routes.write().clear();
76 }
77
78 pub fn dispatch_user(
79 &self,
80 path: &ActorPath,
81 manifest: &str,
82 serializer_id: u32,
83 bytes: &[u8],
84 ) -> Result<(), SerializeError> {
85 let routes = self.inner.routes.read();
86 let key = path.to_string_without_address();
87 let Some(dispatch) = routes.get(&key).cloned() else {
88 tracing::debug!(path = %path, "no remote route registered");
89 return Ok(());
90 };
91 drop(routes);
92 let (value, _codec) = self.inner.registry.decode_dyn(manifest, serializer_id, bytes)?;
93 dispatch(path, manifest, value);
94 Ok(())
95 }
96
97 pub fn dispatch_system(&self, path: &ActorPath, msg: RemoteSystemMsg) {
98 match msg {
99 RemoteSystemMsg::Stop => {
100 if let Some(untyped) = self.inner.system.actor_selection(&path.to_string()) {
101 untyped.stop();
102 }
103 }
104 RemoteSystemMsg::Watch { watcher } => {
105 let proxy = crate::remote_watcher::RemoteWatcherProxy::new(
106 watcher.clone(),
107 self.inner.endpoint_manager.clone(),
108 self.inner.registry.clone(),
109 self.inner.local_uid,
110 );
111 self.inner
112 .remote_watchers
113 .write()
114 .entry(path.to_string_without_address())
115 .or_default()
116 .push(UntypedActorRef::from_remote(Arc::new(proxy)));
117 }
118 RemoteSystemMsg::Unwatch { watcher } => {
119 let mut g = self.inner.remote_watchers.write();
120 if let Some(list) = g.get_mut(&path.to_string_without_address()) {
121 list.retain(|w| w.path() != &watcher);
122 }
123 }
124 RemoteSystemMsg::Terminated { actor: _ } => {
125 }
128 }
129 }
130
131 pub fn notify_terminated(&self, path: &ActorPath) {
133 let mut g = self.inner.remote_watchers.write();
134 let key = path.to_string_without_address();
135 let Some(watchers) = g.remove(&key) else { return };
136 drop(g);
137 for w in watchers {
138 w.notify_watchers(path.clone());
139 }
140 }
141}
142
143pub struct RemoteDeployer {
146 pub endpoint_manager: EndpointManager,
147}
148
149impl RemoteDeployer {
150 pub fn new(endpoint_manager: EndpointManager) -> Self {
151 Self { endpoint_manager }
152 }
153
154 pub async fn deploy(
155 &self,
156 target_address: atomr_core::actor::Address,
157 path: ActorPath,
158 manifest: String,
159 bytes: Vec<u8>,
160 ) -> Result<ActorPath, crate::transport::TransportError> {
161 let env = crate::envelope::RemoteEnvelope::user(
162 format!("{}/remote/__deploy__", target_address),
163 None,
164 0,
165 0,
166 0,
167 crate::serialization::BINCODE_SERIALIZER_ID,
168 manifest,
169 bytes,
170 );
171 let handle = self.endpoint_manager.endpoint_for(&target_address).await?;
172 handle.send(env);
173 Ok(path)
174 }
175}