Skip to main content

atomr_remote/
system_daemon.rs

1//! `RemoteSystemDaemon` + `RemoteDeployer`.
2//!
3//! On the receiving side every inbound envelope addressed at
4//! `/remote/<system>@<host>:<port>/...` is dispatched here. The daemon
5//! resolves local actor paths under `/user`, decodes the payload via the
6//! [`SerializerRegistry`], and hands it to the appropriate user actor's
7//! mailbox.
8
9use std::collections::HashMap;
10use std::sync::Arc;
11
12use atomr_core::actor::{ActorPath, ActorSystem, RemoteSystemMsg, UntypedActorRef};
13use parking_lot::RwLock;
14
15use crate::endpoint_manager::EndpointManager;
16use crate::serialization::{SerializeError, SerializerRegistry};
17
18/// Function that dispatches a decoded user-message payload to a local actor.
19pub type LocalDispatch = Arc<dyn Fn(&ActorPath, &str, Box<dyn std::any::Any + Send>) + Send + Sync>;
20
21#[derive(Clone)]
22pub struct RemoteSystemDaemon {
23    inner: Arc<RemoteSystemDaemonInner>,
24}
25
26struct RemoteSystemDaemonInner {
27    system: ActorSystem,
28    registry: SerializerRegistry,
29    endpoint_manager: EndpointManager,
30    local_uid: u64,
31    routes: RwLock<HashMap<String, LocalDispatch>>,
32    /// Path → list of remote watchers that should receive `Terminated`.
33    remote_watchers: RwLock<HashMap<String, Vec<UntypedActorRef>>>,
34}
35
36impl RemoteSystemDaemon {
37    pub fn new(
38        system: ActorSystem,
39        registry: SerializerRegistry,
40        endpoint_manager: EndpointManager,
41        local_uid: u64,
42    ) -> Arc<Self> {
43        Arc::new(Self {
44            inner: Arc::new(RemoteSystemDaemonInner {
45                system,
46                registry,
47                endpoint_manager,
48                local_uid,
49                routes: RwLock::new(HashMap::new()),
50                remote_watchers: RwLock::new(HashMap::new()),
51            }),
52        })
53    }
54
55    pub fn registry(&self) -> &SerializerRegistry {
56        &self.inner.registry
57    }
58
59    pub fn system(&self) -> &ActorSystem {
60        &self.inner.system
61    }
62
63    /// Register a dispatcher for inbound messages addressed to `path`.
64    pub fn register(&self, path: ActorPath, dispatch: LocalDispatch) {
65        self.inner.routes.write().insert(path.to_string_without_address(), dispatch);
66    }
67
68    pub fn unregister(&self, path: &ActorPath) {
69        self.inner.routes.write().remove(&path.to_string_without_address());
70    }
71
72    pub fn clear(&self) {
73        self.inner.routes.write().clear();
74    }
75
76    pub fn dispatch_user(
77        &self,
78        path: &ActorPath,
79        manifest: &str,
80        serializer_id: u32,
81        bytes: &[u8],
82    ) -> Result<(), SerializeError> {
83        let routes = self.inner.routes.read();
84        let key = path.to_string_without_address();
85        let Some(dispatch) = routes.get(&key).cloned() else {
86            tracing::debug!(path = %path, "no remote route registered");
87            return Ok(());
88        };
89        drop(routes);
90        let (value, _codec) = self.inner.registry.decode_dyn(manifest, serializer_id, bytes)?;
91        dispatch(path, manifest, value);
92        Ok(())
93    }
94
95    pub fn dispatch_system(&self, path: &ActorPath, msg: RemoteSystemMsg) {
96        match msg {
97            RemoteSystemMsg::Stop => {
98                if let Some(untyped) = self.inner.system.actor_selection(&path.to_string()) {
99                    untyped.stop();
100                }
101            }
102            RemoteSystemMsg::Watch { watcher } => {
103                let proxy = crate::remote_watcher::RemoteWatcherProxy::new(
104                    watcher.clone(),
105                    self.inner.endpoint_manager.clone(),
106                    self.inner.registry.clone(),
107                    self.inner.local_uid,
108                );
109                self.inner
110                    .remote_watchers
111                    .write()
112                    .entry(path.to_string_without_address())
113                    .or_default()
114                    .push(UntypedActorRef::from_remote(Arc::new(proxy)));
115            }
116            RemoteSystemMsg::Unwatch { watcher } => {
117                let mut g = self.inner.remote_watchers.write();
118                if let Some(list) = g.get_mut(&path.to_string_without_address()) {
119                    list.retain(|w| w.path() != &watcher);
120                }
121            }
122            RemoteSystemMsg::Terminated { actor: _ } => {
123                // Surfaced to the local watching actor by the dispatcher
124                // path that delivered this PDU; nothing extra here.
125            }
126        }
127    }
128
129    /// Notify all remote watchers of `path` that the actor has stopped.
130    pub fn notify_terminated(&self, path: &ActorPath) {
131        let mut g = self.inner.remote_watchers.write();
132        let key = path.to_string_without_address();
133        let Some(watchers) = g.remove(&key) else { return };
134        drop(g);
135        for w in watchers {
136            w.notify_watchers(path.clone());
137        }
138    }
139}
140
141/// `RemoteDeployer` ships a `Props`-equivalent payload (manifest+bytes)
142/// to a remote peer's daemon for remote actor creation.
143pub struct RemoteDeployer {
144    pub endpoint_manager: EndpointManager,
145}
146
147impl RemoteDeployer {
148    pub fn new(endpoint_manager: EndpointManager) -> Self {
149        Self { endpoint_manager }
150    }
151
152    pub async fn deploy(
153        &self,
154        target_address: atomr_core::actor::Address,
155        path: ActorPath,
156        manifest: String,
157        bytes: Vec<u8>,
158    ) -> Result<ActorPath, crate::transport::TransportError> {
159        let env = crate::envelope::RemoteEnvelope::user(
160            format!("{}/remote/__deploy__", target_address),
161            None,
162            0,
163            0,
164            0,
165            crate::serialization::BINCODE_SERIALIZER_ID,
166            manifest,
167            bytes,
168        );
169        let handle = self.endpoint_manager.endpoint_for(&target_address).await?;
170        handle.send(env);
171        Ok(path)
172    }
173}