actix_telepathy/
lib.rs

1//! Actix-Telepathy is an extension to [Actix](https://docs.rs/actix) that enables remote messaging and clustering support.
2//!
3//! Telepathy does not change Actix's messaging system but _extends_ the
4//!
5//! - [actix::Actor](https://docs.rs/actix/latest/actix/trait.Actor.html) with the [RemoteActor](./trait.RemoteActor.html) trait and the
6//! - [actix::Message](https://docs.rs/actix/latest/actix/trait.Message.html) with the [RemoteMessage](./trait.RemoteMessage.html) trait.
7//!
8//! Hence, an example actor that receives a remote message is defined as follows.
9//! To connect multiple computers in a cluster, a [Cluster](./struct.Cluster.html) must be created.
10//!
11//! ```rust
12//! use actix::prelude::*;
13//! use actix_broker::BrokerSubscribe;
14//! use actix_telepathy::prelude::*;  // <-- Telepathy extension
15//! use serde::{Serialize, Deserialize};
16//! use std::net::SocketAddr;
17//!
18//! #[derive(RemoteMessage, Serialize, Deserialize)]  // <-- Telepathy extension
19//! struct MyMessage {}
20//!
21//! #[derive(RemoteActor)]  // <-- Telepathy extension
22//! #[remote_messages(MyMessage)]  // <-- Telepathy extension
23//! struct MyActor {
24//!     state: usize
25//! }
26//!
27//! impl Actor for MyActor {
28//!     type Context = Context<Self>;
29//!
30//!     fn started(&mut self, ctx: &mut Self::Context) {
31//!         self.register(ctx.address().recipient());  // <-- Telepathy extension
32//!     }
33//! }
34//!
35//! impl Handler<MyMessage> for MyActor {
36//!     type Result = ();
37//!
38//!     fn handle(&mut self, msg: MyMessage, ctx: &mut Self::Context) -> Self::Result {
39//!         todo!()
40//!     }
41//! }
42//!
43//! #[actix_rt::main]
44//! pub async fn start_cluster(own_addr: SocketAddr, seed_nodes: Vec<SocketAddr>) {
45//!     let _addr = MyActor { state: 0 }.start();
46//!     let _cluster = Cluster::new(own_addr, seed_nodes);
47//!     tokio::time::sleep(std::time::Duration::from_secs(5)).await;
48//! }
49//!
50//! ```
51//!
52//! The previous example will not do anything. However, the cluster will try to connect to the given addresses in `seed_nodes`.
53//! To react to new joining members, a [ClusterListener](./trait.ClusterListener.html) actor should be used:
54//!
55//! ```rust
56//! use actix::prelude::*;
57//! use actix_broker::BrokerSubscribe;
58//! use actix_telepathy::prelude::*;  // <-- Telepathy extension
59//! use serde::{Serialize, Deserialize};
60//! use std::net::SocketAddr;
61//!
62//! #[derive(RemoteMessage, Serialize, Deserialize)]  // <-- Telepathy extension
63//! struct MyMessage {}
64//!
65//! #[derive(RemoteActor)]  // <-- Telepathy extension
66//! #[remote_messages(MyMessage)]  // <-- Telepathy extension
67//! struct MyActor {
68//!     state: usize
69//! }
70//!
71//! impl Actor for MyActor {
72//!     type Context = Context<Self>;
73//!
74//!     fn started(&mut self, ctx: &mut Self::Context) {
75//!         self.register(ctx.address().recipient());  // <-- Telepathy extension
76//!         self.subscribe_system_async::<ClusterLog>(ctx);  // <-- Telepathy extension
77//!     }
78//! }
79//!
80//! impl Handler<MyMessage> for MyActor {
81//!     type Result = ();
82//!
83//!     fn handle(&mut self, msg: MyMessage, ctx: &mut Self::Context) -> Self::Result {
84//!         todo!()
85//!     }
86//! }
87//!
88//! impl Handler<ClusterLog> for MyActor {  // <-- Telepathy extension
89//!     type Result = ();
90//!
91//!     fn handle(&mut self, msg: ClusterLog, ctx: &mut Self::Context) -> Self::Result {
92//!         match msg {
93//!             ClusterLog::NewMember(_node) => {
94//!                 println!("New member joined the cluster.")
95//!             },
96//!             ClusterLog::MemberLeft(_ip_addr) => {
97//!                 println!("Member left the cluster.")
98//!             }
99//!         }
100//!     }
101//! }
102//! impl ClusterListener for MyActor {}  // <-- Telepathy extension
103//!
104//! #[actix_rt::main]
105//! pub async fn start_cluster(own_addr: SocketAddr, seed_nodes: Vec<SocketAddr>) {
106//!     let _addr = MyActor { state: 0 }.start();
107//!     let _cluster = Cluster::new(own_addr, seed_nodes);  // <-- Telepathy extension
108//!     tokio::time::sleep(std::time::Duration::from_secs(5)).await;
109//! }
110//!
111//! ```
112//!
113//! Now, we receive a printed message whenever a new member joins the cluster or when a member leaves.
114//! To send messages to remote actors on other members of the cluster, we have to obtain a
115//! [RemoteAddr](./struct.RemoteAddr.html) from the [Node](./struct.Node.html) that the [ClusterListener](./trait.ClusterListener.html) receives.
116//!
117//! ```rust
118//! use actix::prelude::*;
119//! use actix_broker::BrokerSubscribe;
120//! use actix_telepathy::prelude::*;  // <-- Telepathy extension
121//! use serde::{Serialize, Deserialize};
122//! use std::net::SocketAddr;
123//!
124//! #[derive(RemoteMessage, Serialize, Deserialize)]  // <-- Telepathy extension
125//! struct MyMessage {}
126//!
127//! #[derive(RemoteActor)]  // <-- Telepathy extension
128//! #[remote_messages(MyMessage)]  // <-- Telepathy extension
129//! struct MyActor {
130//!     state: usize
131//! }
132//!
133//! impl Actor for MyActor {
134//!     type Context = Context<Self>;
135//!
136//!     fn started(&mut self, ctx: &mut Self::Context) {
137//!         self.register(ctx.address().recipient());  // <-- Telepathy extension
138//!         self.subscribe_system_async::<ClusterLog>(ctx);  // <-- Telepathy extension
139//!     }
140//! }
141//!
142//! impl Handler<MyMessage> for MyActor {
143//!     type Result = ();
144//!
145//!     fn handle(&mut self, msg: MyMessage, ctx: &mut Self::Context) -> Self::Result {
146//!         println!("RemoteMessage received!")
147//!     }
148//! }
149//!
150//! impl Handler<ClusterLog> for MyActor {  // <-- Telepathy extension
151//!     type Result = ();
152//!
153//!     fn handle(&mut self, msg: ClusterLog, ctx: &mut Self::Context) -> Self::Result {
154//!         match msg {
155//!             ClusterLog::NewMember(node) => {
156//!                 println!("New member joined the cluster.");
157//!                 let remote_addr = node.get_remote_addr(Self::ACTOR_ID.to_string());
158//!                 remote_addr.do_send(MyMessage {})
159//!             },
160//!             ClusterLog::MemberLeft(_ip_addr) => {
161//!                 println!("Member left the cluster.")
162//!             }
163//!         }
164//!     }
165//! }
166//! impl ClusterListener for MyActor {}  // <-- Telepathy extension
167//!
168//! #[actix_rt::main]
169//! pub async fn start_cluster(own_addr: SocketAddr, seed_nodes: Vec<SocketAddr>) {
170//!     let _addr = MyActor { state: 0 }.start();
171//!     let _cluster = Cluster::new(own_addr, seed_nodes);  // <-- Telepathy extension
172//!     tokio::time::sleep(std::time::Duration::from_secs(5)).await;
173//! }
174//!
175//! ```
176//!
177//! Now, every new member receives a `MyMessage` from every [ClusterListener](./trait.ClusterListener.html) in the cluster.
178//!
179//! Before we can use the [RemoteAddr](./struct.RemoteAddr.html), we have to make sure that it points to the correct [RemoteActor](./trait.RemoteActor.html), which is `MyActor` in this case.
180//! Therefore, we had to call `get_remote_addr` on the [Node](./struct.Node.html). A [RemoteAddr](./struct.RemoteAddr.html) points to a specific actor on a remote machine.
181
// Re-export everything from the companion `actix_telepathy_derive` crate at the crate root,
// but only when the `derive` feature is enabled.
182#[cfg(feature = "derive")]
183pub use actix_telepathy_derive::*;
184
// Internal modules. They are private; their items reach users only through the
// `pub use` re-exports further down.
185mod cluster;
186mod codec;
187mod network;
188mod remote;
189mod serialization;
// `test_utils` is compiled only for test builds and is visible only within this crate.
190#[cfg(test)]
191pub(crate) mod test_utils;
192mod utils;
193
// Flatten the public API: glob re-export the contents of the private modules at the crate root.
// From `codec`, only `ClusterMessage` is re-exported.
194pub use crate::cluster::*;
195pub use crate::codec::ClusterMessage;
196pub use crate::network::*;
197pub use crate::remote::*;
198pub use crate::serialization::*;
199pub use crate::utils::*;
200
/// Curated set of re-exports meant to be glob-imported, as the crate-level examples do:
/// `use actix_telepathy::prelude::*;`.
///
/// It bundles selected cluster, network, remote, and serialization items so that a single
/// `use` line brings them into scope.
201pub mod prelude {
    // Everything from `actix_telepathy_derive` is re-exported only when the `derive`
    // feature is enabled.
202    #[cfg(feature = "derive")]
203    pub use actix_telepathy_derive::*;
204
205    pub use crate::cluster::{Cluster, ClusterListener, ClusterLog, NodeResolving};
206    pub use crate::network::NetworkInterface;
207    pub use crate::remote::{AnyAddr, RemoteActor, RemoteAddr, RemoteMessage, RemoteWrapper};
208    pub use crate::serialization::{
209        CustomSerialization, CustomSerializationError, DefaultSerialization,
210    };
211}