1use std::net::SocketAddr;
18use std::sync::Arc;
19
20use atomr_core::actor::{ActorPath, ActorRef, ActorSystem, Address, SerializedMessage, UntypedActorRef};
21
22use crate::address_uid::AddressUid;
23use crate::endpoint::InboundEnvelope;
24use crate::endpoint_manager::EndpointManager;
25use crate::pdu::DisassociateReason;
26use crate::provider::RemoteActorRefProvider;
27use crate::remote_watcher::RemoteWatcher;
28use crate::serialization::SerializerRegistry;
29use crate::settings::RemoteSettings;
30use crate::system_daemon::{LocalDispatch, RemoteSystemDaemon};
31use crate::transport::{AkkaProtocolTransport, TcpTransport, Transport};
32
33pub struct RemoteSystem {
35 pub system: ActorSystem,
36 pub provider: Arc<RemoteActorRefProvider>,
37 pub daemon: Arc<RemoteSystemDaemon>,
38 pub watcher: Arc<RemoteWatcher>,
39 pub address_uid: AddressUid,
40 pub local_address: Address,
41}
42
43impl RemoteSystem {
44 pub async fn start(
47 system: ActorSystem,
48 bind: SocketAddr,
49 settings: RemoteSettings,
50 ) -> Result<Self, crate::transport::TransportError> {
51 let transport: Arc<dyn Transport> = Arc::new(TcpTransport::with_advertised(
52 system.name().to_string(),
53 bind,
54 settings.hostname.clone(),
55 settings.max_frame_size,
56 ));
57 Self::start_with_transport(system, transport, settings).await
58 }
59
60 pub async fn start_with_transport(
61 system: ActorSystem,
62 transport: Arc<dyn Transport>,
63 settings: RemoteSettings,
64 ) -> Result<Self, crate::transport::TransportError> {
65 let address_uid = AddressUid::new();
66 let protocol = AkkaProtocolTransport::new(transport, settings.clone(), address_uid.clone());
67 let endpoint_manager = EndpointManager::new(protocol.clone(), settings.clone());
68 let local_address = endpoint_manager.start().await?;
69
70 let registry = SerializerRegistry::standard();
71 let local_uid = address_uid.get();
72 let daemon =
73 RemoteSystemDaemon::new(system.clone(), registry.clone(), endpoint_manager.clone(), local_uid);
74 let watcher = RemoteWatcher::new(endpoint_manager.clone(), registry.clone(), local_uid);
75
76 let mut inbound = endpoint_manager.take_inbound();
78 let daemon_for_pump = daemon.clone();
79 tokio::spawn(async move {
80 while let Some(env) = inbound.recv().await {
81 handle_inbound(&daemon_for_pump, env);
82 }
83 });
84
85 let provider = RemoteActorRefProvider::new(
86 local_address.clone(),
87 local_uid,
88 endpoint_manager.clone(),
89 registry,
90 daemon.clone(),
91 );
92 provider.install(&system);
93
94 Ok(Self { system, provider, daemon, watcher, address_uid, local_address })
95 }
96
97 pub fn endpoint_manager(&self) -> &EndpointManager {
98 self.provider.endpoint_manager()
99 }
100
101 pub fn registry(&self) -> &SerializerRegistry {
102 self.provider.registry()
103 }
104
105 pub fn register_bincode<T>(&self)
108 where
109 T: serde::Serialize + serde::de::DeserializeOwned + Send + 'static,
110 {
111 self.registry().register_bincode::<T>();
112 }
113
114 pub fn register_json<T>(&self)
116 where
117 T: serde::Serialize + serde::de::DeserializeOwned + Send + 'static,
118 {
119 self.registry().register_json::<T>();
120 }
121
122 pub fn expose_actor<M>(&self, target: ActorRef<M>)
126 where
127 M: Send + 'static,
128 {
129 let target = target.clone();
130 let path = target.path().clone();
131 let dispatch: LocalDispatch = Arc::new(move |_p, _manifest, value| {
132 match value.downcast::<M>() {
134 Ok(m) => target.tell(*m),
135 Err(_) => {
136 tracing::warn!(target = %target.path(), "remote msg type mismatch");
137 }
138 }
139 });
140 self.daemon.register(path, dispatch);
141 }
142
143 pub fn actor_selection<M>(&self, path: &str) -> Option<ActorRef<M>>
147 where
148 M: serde::Serialize + Send + 'static,
149 {
150 let endpoint_manager = self.endpoint_manager().clone();
151 let registry = self.registry().clone();
152 let local_uid = self.address_uid.get();
153 let _parsed = parse_actor_path(path)?;
156 let serialize: Arc<dyn Fn(M, Option<ActorPath>) -> SerializedMessage + Send + Sync> =
157 Arc::new(move |msg: M, sender: Option<ActorPath>| {
158 let manifest = std::any::type_name::<M>().to_string();
159 let payload =
160 bincode::serde::encode_to_vec(&msg, bincode::config::standard()).unwrap_or_default();
161 SerializedMessage {
162 serializer_id: crate::serialization::BINCODE_SERIALIZER_ID,
163 manifest,
164 payload,
165 sender,
166 }
167 });
168 let _ = (registry, local_uid, endpoint_manager);
169 self.system.actor_selection_with(path, serialize)
170 }
171
172 pub fn actor_selection_untyped(&self, path: &str) -> Option<UntypedActorRef> {
175 self.system.actor_selection(path)
176 }
177
178 pub async fn shutdown(&self) {
179 let _ = self.endpoint_manager().shutdown().await;
180 self.daemon.clear();
181 let _ = DisassociateReason::Normal; }
183}
184
185fn handle_inbound(daemon: &Arc<RemoteSystemDaemon>, inbound: InboundEnvelope) {
186 let env = inbound.envelope;
187 let Some(path) = parse_actor_path(&env.recipient_path) else {
190 tracing::warn!(rec = %env.recipient_path, "could not parse recipient");
191 return;
192 };
193 if env.system {
194 match daemon.registry().decode_dyn(&env.manifest, env.serializer_id, &env.payload) {
196 Ok((value, _)) => {
197 if let Ok(msg) = value.downcast::<atomr_core::actor::RemoteSystemMsg>() {
198 daemon.dispatch_system(&path, *msg);
199 }
200 }
201 Err(e) => {
202 tracing::warn!("system payload decode failed: {e}");
203 }
204 }
205 } else {
206 if let Err(e) = daemon.dispatch_user(&path, &env.manifest, env.serializer_id, &env.payload) {
207 tracing::warn!(rec = %env.recipient_path, "user payload dispatch failed: {e}");
208 }
209 }
210}
211
212fn parse_actor_path(s: &str) -> Option<ActorPath> {
213 let scheme_end = s.find("://")?;
214 let after = &s[scheme_end + 3..];
215 let split = after.find('/').unwrap_or(after.len());
216 let (addr_str, path_str) = (&s[..scheme_end + 3 + split], &after[split..]);
217 let address = Address::parse(addr_str)?;
218 let mut path = ActorPath::root(address);
219 for seg in path_str.split('/').filter(|x| !x.is_empty()) {
220 if let Some((name, uid)) = seg.split_once('#') {
221 path = path.child(name).with_uid(uid.parse().ok()?);
222 } else {
223 path = path.child(seg);
224 }
225 }
226 Some(path)
227}