Skip to main content

atomr_remote/
remote_ref.rs

1//! `RemoteActorRefImpl`.
2//!
3//! Concrete implementation of `atomr_core::actor::RemoteRef` that
4//! serializes outbound messages via the [`SerializerRegistry`] and ships
5//! them through an `EndpointManager`.
6
7use atomr_core::actor::{ActorPath, RemoteRef, RemoteSystemMsg, SerializedMessage};
8
9use crate::endpoint_manager::EndpointManager;
10use crate::envelope::RemoteEnvelope;
11use crate::serialization::{SerializerRegistry, SYSTEM_SERIALIZER_ID};
12
13pub struct RemoteActorRefImpl {
14    pub path: ActorPath,
15    pub endpoint_manager: EndpointManager,
16    pub registry: SerializerRegistry,
17    /// Local `ActorSystem` UID, written into `sender_uid` for replies.
18    pub local_uid: u64,
19}
20
21impl std::fmt::Debug for RemoteActorRefImpl {
22    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
23        f.debug_struct("RemoteActorRefImpl").field("path", &self.path.to_string()).finish()
24    }
25}
26
27impl RemoteActorRefImpl {
28    pub fn new(
29        path: ActorPath,
30        endpoint_manager: EndpointManager,
31        registry: SerializerRegistry,
32        local_uid: u64,
33    ) -> Self {
34        Self { path, endpoint_manager, registry, local_uid }
35    }
36
37    fn target_address(&self) -> atomr_core::actor::Address {
38        self.path.address.clone()
39    }
40}
41
42impl RemoteRef for RemoteActorRefImpl {
43    fn path(&self) -> &ActorPath {
44        &self.path
45    }
46
47    fn tell_serialized(&self, msg: SerializedMessage) {
48        let env = RemoteEnvelope::user(
49            self.path.to_string(),
50            msg.sender.as_ref().map(|p| p.to_string()),
51            self.local_uid,
52            self.path.uid,
53            0, // seq_no assigned by writer
54            msg.serializer_id,
55            msg.manifest,
56            msg.payload,
57        );
58        let mgr = self.endpoint_manager.clone();
59        let target = self.target_address();
60        let metrics = mgr.metrics();
61        let bytes = env.payload.len();
62        tokio::spawn(async move {
63            match mgr.endpoint_for(&target).await {
64                Ok(handle) => {
65                    metrics.record_send(&target, bytes);
66                    handle.send(env);
67                }
68                Err(e) => {
69                    metrics.record_error(&target);
70                    tracing::warn!(target = %target, "remote tell failed: {e}");
71                }
72            }
73        });
74    }
75
76    fn tell_system(&self, msg: RemoteSystemMsg) {
77        // Encode the RemoteSystemMsg via the system serializer and send
78        // it as a Payload PDU with `system = true`.
79        let manifest = std::any::type_name::<RemoteSystemMsg>().to_string();
80        let bytes = match self.registry.encode_typed(&msg) {
81            Ok((_id, _m, b)) => b,
82            Err(e) => {
83                tracing::warn!("system msg encode failed: {e}");
84                return;
85            }
86        };
87        let env = RemoteEnvelope::system_msg(
88            self.path.to_string(),
89            self.local_uid,
90            self.path.uid,
91            0,
92            manifest,
93            bytes,
94        );
95        let mgr = self.endpoint_manager.clone();
96        let target = self.target_address();
97        let _ = SYSTEM_SERIALIZER_ID; // referenced for clarity
98        tokio::spawn(async move {
99            if let Ok(handle) = mgr.endpoint_for(&target).await {
100                handle.send_system(env);
101            }
102        });
103    }
104}