atomr_remote/
remote_ref.rs1use atomr_core::actor::{ActorPath, RemoteRef, RemoteSystemMsg, SerializedMessage};
8
9use crate::endpoint_manager::EndpointManager;
10use crate::envelope::RemoteEnvelope;
11use crate::serialization::{SerializerRegistry, SYSTEM_SERIALIZER_ID};
12
13pub struct RemoteActorRefImpl {
14 pub path: ActorPath,
15 pub endpoint_manager: EndpointManager,
16 pub registry: SerializerRegistry,
17 pub local_uid: u64,
19}
20
21impl std::fmt::Debug for RemoteActorRefImpl {
22 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
23 f.debug_struct("RemoteActorRefImpl").field("path", &self.path.to_string()).finish()
24 }
25}
26
27impl RemoteActorRefImpl {
28 pub fn new(
29 path: ActorPath,
30 endpoint_manager: EndpointManager,
31 registry: SerializerRegistry,
32 local_uid: u64,
33 ) -> Self {
34 Self { path, endpoint_manager, registry, local_uid }
35 }
36
37 fn target_address(&self) -> atomr_core::actor::Address {
38 self.path.address.clone()
39 }
40}
41
42impl RemoteRef for RemoteActorRefImpl {
43 fn path(&self) -> &ActorPath {
44 &self.path
45 }
46
47 fn tell_serialized(&self, msg: SerializedMessage) {
48 let env = RemoteEnvelope::user(
49 self.path.to_string(),
50 msg.sender.as_ref().map(|p| p.to_string()),
51 self.local_uid,
52 self.path.uid,
53 0, msg.serializer_id,
55 msg.manifest,
56 msg.payload,
57 );
58 let mgr = self.endpoint_manager.clone();
59 let target = self.target_address();
60 let metrics = mgr.metrics();
61 let bytes = env.payload.len();
62 tokio::spawn(async move {
63 match mgr.endpoint_for(&target).await {
64 Ok(handle) => {
65 metrics.record_send(&target, bytes);
66 handle.send(env);
67 }
68 Err(e) => {
69 metrics.record_error(&target);
70 tracing::warn!(target = %target, "remote tell failed: {e}");
71 }
72 }
73 });
74 }
75
76 fn tell_system(&self, msg: RemoteSystemMsg) {
77 let manifest = std::any::type_name::<RemoteSystemMsg>().to_string();
80 let bytes = match self.registry.encode_typed(&msg) {
81 Ok((_id, _m, b)) => b,
82 Err(e) => {
83 tracing::warn!("system msg encode failed: {e}");
84 return;
85 }
86 };
87 let env = RemoteEnvelope::system_msg(
88 self.path.to_string(),
89 self.local_uid,
90 self.path.uid,
91 0,
92 manifest,
93 bytes,
94 );
95 let mgr = self.endpoint_manager.clone();
96 let target = self.target_address();
97 let _ = SYSTEM_SERIALIZER_ID; tokio::spawn(async move {
99 if let Ok(handle) = mgr.endpoint_for(&target).await {
100 handle.send_system(env);
101 }
102 });
103 }
104}