Skip to main content

actix_telepathy/remote/addr/
mod.rs

1use std::hash::{Hash, Hasher};
2use std::net::SocketAddr;
3use std::str::FromStr;
4
5use actix::prelude::*;
6use serde::{Deserialize, Serialize};
7
8use crate::codec::ClusterMessage;
9use crate::remote::{AddrRepresentation, RemoteMessage, RemoteWrapper};
10use crate::{NetworkInterface, WrappedClusterMessage};
11use actix::dev::ToEnvelope;
12
13pub use self::node::Node;
14
15pub mod node;
16pub mod resolver;
17#[cfg(test)]
18mod tests;
19
20/// Similar to actix::prelude::Addr but supports communication to remote actors on other nodes.
21#[derive(Deserialize, Serialize, Debug)]
22pub struct RemoteAddr {
23    pub node: Node,
24    pub(crate) id: AddrRepresentation,
25}
26
27impl RemoteAddr {
28    pub fn new(node: Node, id: AddrRepresentation) -> Self {
29        RemoteAddr { node, id }
30    }
31
32    pub fn new_from_id(socket_addr: SocketAddr, id: &str) -> Self {
33        RemoteAddr {
34            node: Node::new(socket_addr, None),
35            id: AddrRepresentation::from_str(id).unwrap(),
36        }
37    }
38
39    pub fn new_from_key(
40        socket_addr: SocketAddr,
41        network_interface: Addr<NetworkInterface>,
42        id: &str,
43    ) -> Self {
44        RemoteAddr {
45            node: Node::new(socket_addr, Some(network_interface)),
46            id: AddrRepresentation::from_str(id).unwrap(),
47        }
48    }
49
50    pub fn new_connector(
51        socket_addr: SocketAddr,
52        network_interface: Option<Addr<NetworkInterface>>,
53    ) -> Self {
54        RemoteAddr::new(
55            Node::new(socket_addr, network_interface),
56            AddrRepresentation::Connector,
57        )
58    }
59
60    pub fn set_network_interface(&mut self, network_interface: Addr<NetworkInterface>) {
61        self.node.network_interface = Some(network_interface);
62    }
63
64    pub fn change_id(&mut self, id: String) {
65        self.id = AddrRepresentation::Key(id);
66    }
67
68    pub fn do_send<T: RemoteMessage + Serialize>(&self, msg: T) {
69        self.node
70            .network_interface
71            .as_ref()
72            .expect("Network interface must be set!")
73            .do_send(ClusterMessage::Message(RemoteWrapper::new(
74                self.clone(),
75                msg,
76                None,
77            )));
78    }
79
80    pub fn try_send<T: RemoteMessage + Serialize>(
81        &self,
82        _msg: T,
83    ) -> RecipientRequest<ClusterMessage> {
84        unimplemented!("So far, it is not possible to use this method!")
85    }
86
87    pub fn send<T: RemoteMessage + Serialize>(&self, _msg: T) {
88        unimplemented!(
89            "So far, it is not possible to receive responses from remote destinations as futures!"
90        )
91    }
92
93    pub fn wait_send<T: RemoteMessage + Serialize>(
94        &self,
95        msg: T,
96    ) -> Request<NetworkInterface, WrappedClusterMessage> {
97        self.node
98            .network_interface
99            .as_ref()
100            .expect("Network interface must be set!")
101            .send(WrappedClusterMessage(ClusterMessage::Message(
102                RemoteWrapper::new(self.clone(), msg, None),
103            )))
104    }
105}
106
107impl Default for RemoteAddr {
108    fn default() -> Self {
109        RemoteAddr {
110            node: Node::default(),
111            id: AddrRepresentation::Key("Default".to_string()),
112        }
113    }
114}
115
116impl Clone for RemoteAddr {
117    fn clone(&self) -> Self {
118        RemoteAddr::new(self.node.clone(), self.id.clone())
119    }
120}
121
122impl PartialEq for RemoteAddr {
123    fn eq(&self, other: &Self) -> bool {
124        self.node.eq(&other.node) && self.id.eq(&other.id)
125    }
126}
127
128impl Eq for RemoteAddr {}
129
130impl Hash for RemoteAddr {
131    fn hash<H: Hasher>(&self, state: &mut H) {
132        self.node.hash(state);
133        self.id.hash(state);
134    }
135}
136
137/// A helper Enum that can either be an [Addr](https://github.com/actix/actix/blob/master/actix/src/address/mod.rs) or a [RemoteAddr](https://github.com/wenig/actix-telepathy/blob/main/src/remote/addr/mod.rs).
138#[derive(Deserialize, Serialize)]
139pub enum AnyAddr<A: Actor> {
140    #[serde(skip_serializing, skip_deserializing)]
141    Local(Addr<A>),
142    Remote(RemoteAddr),
143}
144
145impl<A: Actor> AnyAddr<A> {
146    pub fn do_send<M>(&self, msg: M)
147    where
148        M: RemoteMessage,
149        M::Result: Send,
150        A: Handler<M>,
151        A::Context: ToEnvelope<A, M>,
152    {
153        match self {
154            AnyAddr::Local(addr) => addr.do_send(msg),
155            AnyAddr::Remote(addr) => addr.do_send(msg),
156        }
157    }
158
159    pub fn change_id(&mut self, id: &str) {
160        if let AnyAddr::Remote(addr) = self {
161            addr.change_id(id.to_string());
162        }
163    }
164}
165
166impl<T: Actor> Clone for AnyAddr<T> {
167    fn clone(&self) -> Self {
168        match self {
169            AnyAddr::Local(addr) => AnyAddr::Local(addr.clone()),
170            AnyAddr::Remote(addr) => AnyAddr::Remote(addr.clone()),
171        }
172    }
173}
174
175impl<T: Actor> PartialEq for AnyAddr<T> {
176    fn eq(&self, other: &Self) -> bool {
177        match self {
178            AnyAddr::Local(addr) => match other {
179                AnyAddr::Local(other_addr) => addr.eq(other_addr),
180                AnyAddr::Remote(_) => false,
181            },
182            AnyAddr::Remote(addr) => match other {
183                AnyAddr::Local(_) => false,
184                AnyAddr::Remote(other_addr) => addr.eq(other_addr),
185            },
186        }
187    }
188}
189
190impl<T: Actor> Eq for AnyAddr<T> {}
191
192impl<T: Actor> Hash for AnyAddr<T> {
193    fn hash<H: Hasher>(&self, state: &mut H) {
194        match self {
195            AnyAddr::Local(addr) => addr.hash(state),
196            AnyAddr::Remote(addr) => addr.hash(state),
197        }
198    }
199}