Skip to main content

atomr_remote/
system_daemon.rs

//! `RemoteSystemDaemon` + `RemoteDeployer`.
//! Akka.NET counterparts: `Remote/RemoteSystemDaemon.cs`, `Remote/RemoteDeployer.cs`,
//! `Remote/RemoteDeploymentWatcher.cs`.
//!
//! On the receiving side, every inbound envelope addressed to
//! `/remote/<system>@<host>:<port>/...` is dispatched here. The daemon
//! resolves local actor paths under `/user`, decodes the payload via the
//! [`SerializerRegistry`], and hands it to the appropriate user actor's
//! mailbox.
10
11use std::collections::HashMap;
12use std::sync::Arc;
13
14use atomr_core::actor::{ActorPath, ActorSystem, RemoteSystemMsg, UntypedActorRef};
15use parking_lot::RwLock;
16
17use crate::endpoint_manager::EndpointManager;
18use crate::serialization::{SerializeError, SerializerRegistry};
19
20/// Function that dispatches a decoded user-message payload to a local actor.
21pub type LocalDispatch = Arc<dyn Fn(&ActorPath, &str, Box<dyn std::any::Any + Send>) + Send + Sync>;
22
23#[derive(Clone)]
24pub struct RemoteSystemDaemon {
25    inner: Arc<RemoteSystemDaemonInner>,
26}
27
28struct RemoteSystemDaemonInner {
29    system: ActorSystem,
30    registry: SerializerRegistry,
31    endpoint_manager: EndpointManager,
32    local_uid: u64,
33    routes: RwLock<HashMap<String, LocalDispatch>>,
34    /// Path → list of remote watchers that should receive `Terminated`.
35    remote_watchers: RwLock<HashMap<String, Vec<UntypedActorRef>>>,
36}
37
38impl RemoteSystemDaemon {
39    pub fn new(
40        system: ActorSystem,
41        registry: SerializerRegistry,
42        endpoint_manager: EndpointManager,
43        local_uid: u64,
44    ) -> Arc<Self> {
45        Arc::new(Self {
46            inner: Arc::new(RemoteSystemDaemonInner {
47                system,
48                registry,
49                endpoint_manager,
50                local_uid,
51                routes: RwLock::new(HashMap::new()),
52                remote_watchers: RwLock::new(HashMap::new()),
53            }),
54        })
55    }
56
57    pub fn registry(&self) -> &SerializerRegistry {
58        &self.inner.registry
59    }
60
61    pub fn system(&self) -> &ActorSystem {
62        &self.inner.system
63    }
64
65    /// Register a dispatcher for inbound messages addressed to `path`.
66    pub fn register(&self, path: ActorPath, dispatch: LocalDispatch) {
67        self.inner.routes.write().insert(path.to_string_without_address(), dispatch);
68    }
69
70    pub fn unregister(&self, path: &ActorPath) {
71        self.inner.routes.write().remove(&path.to_string_without_address());
72    }
73
74    pub fn clear(&self) {
75        self.inner.routes.write().clear();
76    }
77
78    pub fn dispatch_user(
79        &self,
80        path: &ActorPath,
81        manifest: &str,
82        serializer_id: u32,
83        bytes: &[u8],
84    ) -> Result<(), SerializeError> {
85        let routes = self.inner.routes.read();
86        let key = path.to_string_without_address();
87        let Some(dispatch) = routes.get(&key).cloned() else {
88            tracing::debug!(path = %path, "no remote route registered");
89            return Ok(());
90        };
91        drop(routes);
92        let (value, _codec) = self.inner.registry.decode_dyn(manifest, serializer_id, bytes)?;
93        dispatch(path, manifest, value);
94        Ok(())
95    }
96
97    pub fn dispatch_system(&self, path: &ActorPath, msg: RemoteSystemMsg) {
98        match msg {
99            RemoteSystemMsg::Stop => {
100                if let Some(untyped) = self.inner.system.actor_selection(&path.to_string()) {
101                    untyped.stop();
102                }
103            }
104            RemoteSystemMsg::Watch { watcher } => {
105                let proxy = crate::remote_watcher::RemoteWatcherProxy::new(
106                    watcher.clone(),
107                    self.inner.endpoint_manager.clone(),
108                    self.inner.registry.clone(),
109                    self.inner.local_uid,
110                );
111                self.inner
112                    .remote_watchers
113                    .write()
114                    .entry(path.to_string_without_address())
115                    .or_default()
116                    .push(UntypedActorRef::from_remote(Arc::new(proxy)));
117            }
118            RemoteSystemMsg::Unwatch { watcher } => {
119                let mut g = self.inner.remote_watchers.write();
120                if let Some(list) = g.get_mut(&path.to_string_without_address()) {
121                    list.retain(|w| w.path() != &watcher);
122                }
123            }
124            RemoteSystemMsg::Terminated { actor: _ } => {
125                // Surfaced to the local watching actor by the dispatcher
126                // path that delivered this PDU; nothing extra here.
127            }
128        }
129    }
130
131    /// Notify all remote watchers of `path` that the actor has stopped.
132    pub fn notify_terminated(&self, path: &ActorPath) {
133        let mut g = self.inner.remote_watchers.write();
134        let key = path.to_string_without_address();
135        let Some(watchers) = g.remove(&key) else { return };
136        drop(g);
137        for w in watchers {
138            w.notify_watchers(path.clone());
139        }
140    }
141}
142
143/// `RemoteDeployer` ships a `Props`-equivalent payload (manifest+bytes)
144/// to a remote peer's daemon for remote actor creation.
145pub struct RemoteDeployer {
146    pub endpoint_manager: EndpointManager,
147}
148
149impl RemoteDeployer {
150    pub fn new(endpoint_manager: EndpointManager) -> Self {
151        Self { endpoint_manager }
152    }
153
154    pub async fn deploy(
155        &self,
156        target_address: atomr_core::actor::Address,
157        path: ActorPath,
158        manifest: String,
159        bytes: Vec<u8>,
160    ) -> Result<ActorPath, crate::transport::TransportError> {
161        let env = crate::envelope::RemoteEnvelope::user(
162            format!("{}/remote/__deploy__", target_address),
163            None,
164            0,
165            0,
166            0,
167            crate::serialization::BINCODE_SERIALIZER_ID,
168            manifest,
169            bytes,
170        );
171        let handle = self.endpoint_manager.endpoint_for(&target_address).await?;
172        handle.send(env);
173        Ok(path)
174    }
175}