Skip to main content

atomr_remote/
system.rs

1//! `RemoteSystem` — convenience wrapper that builds and wires up the
2//! whole remoting stack on top of a `atomr_core::ActorSystem`.
3//!
4//! Most users hold one `RemoteSystem` per process. It owns:
5//!
6//! * the underlying `Transport` (default: `TcpTransport`),
7//! * the `AkkaProtocolTransport` handshake/heartbeat layer,
8//! * the `EndpointManager` association state machine,
9//! * the `RemoteSystemDaemon` for inbound dispatch,
10//! * the `RemoteWatcher` for cross-system death watch,
11//! * a `SerializerRegistry` and `AddressUid`.
12//!
13//! Spawn it with [`RemoteSystem::start`], register your message types with
14//! [`RemoteSystem::register_bincode::<MyMsg>()`], and then deliver a remote
15//! actor handle to local code with [`RemoteSystem::actor_selection`].
16
17use std::net::SocketAddr;
18use std::sync::Arc;
19
20use atomr_core::actor::{ActorPath, ActorRef, ActorSystem, Address, SerializedMessage, UntypedActorRef};
21
22use crate::address_uid::AddressUid;
23use crate::endpoint::InboundEnvelope;
24use crate::endpoint_manager::EndpointManager;
25use crate::pdu::DisassociateReason;
26use crate::provider::RemoteActorRefProvider;
27use crate::remote_watcher::RemoteWatcher;
28use crate::serialization::SerializerRegistry;
29use crate::settings::RemoteSettings;
30use crate::system_daemon::{LocalDispatch, RemoteSystemDaemon};
31use crate::transport::{AkkaProtocolTransport, TcpTransport, Transport};
32
33/// Returned by [`RemoteSystem::start`].
34pub struct RemoteSystem {
35    pub system: ActorSystem,
36    pub provider: Arc<RemoteActorRefProvider>,
37    pub daemon: Arc<RemoteSystemDaemon>,
38    pub watcher: Arc<RemoteWatcher>,
39    pub address_uid: AddressUid,
40    pub local_address: Address,
41}
42
43impl RemoteSystem {
44    /// Convenience: build a `TcpTransport` bound to `bind`, install it on
45    /// `system`, and return the wired [`RemoteSystem`].
46    pub async fn start(
47        system: ActorSystem,
48        bind: SocketAddr,
49        settings: RemoteSettings,
50    ) -> Result<Self, crate::transport::TransportError> {
51        let transport: Arc<dyn Transport> = Arc::new(TcpTransport::with_advertised(
52            system.name().to_string(),
53            bind,
54            settings.hostname.clone(),
55            settings.max_frame_size,
56        ));
57        Self::start_with_transport(system, transport, settings).await
58    }
59
60    pub async fn start_with_transport(
61        system: ActorSystem,
62        transport: Arc<dyn Transport>,
63        settings: RemoteSettings,
64    ) -> Result<Self, crate::transport::TransportError> {
65        let address_uid = AddressUid::new();
66        let protocol = AkkaProtocolTransport::new(transport, settings.clone(), address_uid.clone());
67        let endpoint_manager = EndpointManager::new(protocol.clone(), settings.clone());
68        let local_address = endpoint_manager.start().await?;
69
70        let registry = SerializerRegistry::standard();
71        let local_uid = address_uid.get();
72        let daemon =
73            RemoteSystemDaemon::new(system.clone(), registry.clone(), endpoint_manager.clone(), local_uid);
74        let watcher = RemoteWatcher::new(endpoint_manager.clone(), registry.clone(), local_uid);
75
76        // Drain the manager's inbound stream into the daemon dispatcher.
77        let mut inbound = endpoint_manager.take_inbound();
78        let daemon_for_pump = daemon.clone();
79        tokio::spawn(async move {
80            while let Some(env) = inbound.recv().await {
81                handle_inbound(&daemon_for_pump, env);
82            }
83        });
84
85        let provider = RemoteActorRefProvider::new(
86            local_address.clone(),
87            local_uid,
88            endpoint_manager.clone(),
89            registry,
90            daemon.clone(),
91        );
92        provider.install(&system);
93
94        Ok(Self { system, provider, daemon, watcher, address_uid, local_address })
95    }
96
97    pub fn endpoint_manager(&self) -> &EndpointManager {
98        self.provider.endpoint_manager()
99    }
100
101    pub fn registry(&self) -> &SerializerRegistry {
102        self.provider.registry()
103    }
104
105    /// Register the bincode codec for `T`. Required for any user message
106    /// type that crosses the wire.
107    pub fn register_bincode<T>(&self)
108    where
109        T: serde::Serialize + serde::de::DeserializeOwned + Send + 'static,
110    {
111        self.registry().register_bincode::<T>();
112    }
113
114    /// Register the JSON codec for `T`.
115    pub fn register_json<T>(&self)
116    where
117        T: serde::Serialize + serde::de::DeserializeOwned + Send + 'static,
118    {
119        self.registry().register_json::<T>();
120    }
121
122    /// Register a local actor as the destination for inbound remote
123    /// messages addressed to its path. Caller must already have the
124    /// codec for `M` registered.
125    pub fn expose_actor<M>(&self, target: ActorRef<M>)
126    where
127        M: Send + 'static,
128    {
129        let target = target.clone();
130        let path = target.path().clone();
131        let dispatch: LocalDispatch = Arc::new(move |_p, _manifest, value| {
132            // Downcast to M and forward.
133            match value.downcast::<M>() {
134                Ok(m) => target.tell(*m),
135                Err(_) => {
136                    tracing::warn!(target = %target.path(), "remote msg type mismatch");
137                }
138            }
139        });
140        self.daemon.register(path, dispatch);
141    }
142
143    /// Look up a remote actor by full path string, returning a typed
144    /// `ActorRef<M>` with the bincode codec for `M`. Caller is responsible
145    /// for matching `M` to whatever the receiving side declares.
146    pub fn actor_selection<M>(&self, path: &str) -> Option<ActorRef<M>>
147    where
148        M: serde::Serialize + Send + 'static,
149    {
150        let endpoint_manager = self.endpoint_manager().clone();
151        let registry = self.registry().clone();
152        let local_uid = self.address_uid.get();
153        // Reject malformed paths up front so callers don't get a
154        // dangling `ActorRef<M>` whose underlying handle never resolves.
155        let _parsed = parse_actor_path(path)?;
156        let serialize: Arc<dyn Fn(M, Option<ActorPath>) -> SerializedMessage + Send + Sync> =
157            Arc::new(move |msg: M, sender: Option<ActorPath>| {
158                let manifest = std::any::type_name::<M>().to_string();
159                let payload =
160                    bincode::serde::encode_to_vec(&msg, bincode::config::standard()).unwrap_or_default();
161                SerializedMessage {
162                    serializer_id: crate::serialization::BINCODE_SERIALIZER_ID,
163                    manifest,
164                    payload,
165                    sender,
166                }
167            });
168        let _ = (registry, local_uid, endpoint_manager);
169        self.system.actor_selection_with(path, serialize)
170    }
171
172    /// Untyped variant — useful for system-message-only refs (e.g.
173    /// remote watchers).
174    pub fn actor_selection_untyped(&self, path: &str) -> Option<UntypedActorRef> {
175        self.system.actor_selection(path)
176    }
177
178    pub async fn shutdown(&self) {
179        let _ = self.endpoint_manager().shutdown().await;
180        self.daemon.clear();
181        let _ = DisassociateReason::Normal; // referenced for clarity
182    }
183}
184
185fn handle_inbound(daemon: &Arc<RemoteSystemDaemon>, inbound: InboundEnvelope) {
186    let env = inbound.envelope;
187    // Parse recipient_path → ActorPath. The dispatcher only needs the
188    // path-without-address segment to look up the local route.
189    let Some(path) = parse_actor_path(&env.recipient_path) else {
190        tracing::warn!(rec = %env.recipient_path, "could not parse recipient");
191        return;
192    };
193    if env.system {
194        // System-control payload — decode RemoteSystemMsg and dispatch.
195        match daemon.registry().decode_dyn(&env.manifest, env.serializer_id, &env.payload) {
196            Ok((value, _)) => {
197                if let Ok(msg) = value.downcast::<atomr_core::actor::RemoteSystemMsg>() {
198                    daemon.dispatch_system(&path, *msg);
199                }
200            }
201            Err(e) => {
202                tracing::warn!("system payload decode failed: {e}");
203            }
204        }
205    } else {
206        if let Err(e) = daemon.dispatch_user(&path, &env.manifest, env.serializer_id, &env.payload) {
207            tracing::warn!(rec = %env.recipient_path, "user payload dispatch failed: {e}");
208        }
209    }
210}
211
212fn parse_actor_path(s: &str) -> Option<ActorPath> {
213    let scheme_end = s.find("://")?;
214    let after = &s[scheme_end + 3..];
215    let split = after.find('/').unwrap_or(after.len());
216    let (addr_str, path_str) = (&s[..scheme_end + 3 + split], &after[split..]);
217    let address = Address::parse(addr_str)?;
218    let mut path = ActorPath::root(address);
219    for seg in path_str.split('/').filter(|x| !x.is_empty()) {
220        if let Some((name, uid)) = seg.split_once('#') {
221            path = path.child(name).with_uid(uid.parse().ok()?);
222        } else {
223            path = path.child(seg);
224        }
225    }
226    Some(path)
227}