actix_telepathy/lib.rs
1//! Actix-Telepathy is an extension to [Actix](https://docs.rs/actix) that enables remote messaging and clustering support.
2//!
3//! Telepathy does not change Actix' messaging system but _extends_ the
4//!
5//! - [actix::Actor](https://docs.rs/actix/latest/actix/trait.Actor.html) with the [RemoteActor](./trait.RemoteActor.html) trait and the
6//! - [actix::Message](https://docs.rs/actix/latest/actix/trait.Message.html) with the [RemoteMessage](./trait.RemoteMessage.html) trait.
7//!
8//! Hence, an example actor receiving a remote message is defined as follows.
9//! To connect multiple computers in a cluster, a [Cluster](./struct.Cluster.html) must be generated.
10//!
11//! ```rust
12//! use actix::prelude::*;
13//! use actix_broker::BrokerSubscribe;
14//! use actix_telepathy::prelude::*; // <-- Telepathy extension
15//! use serde::{Serialize, Deserialize};
16//! use std::net::SocketAddr;
17//!
18//! #[derive(RemoteMessage, Serialize, Deserialize)] // <-- Telepathy extension
19//! struct MyMessage {}
20//!
21//! #[derive(RemoteActor)] // <-- Telepathy extension
22//! #[remote_messages(MyMessage)] // <-- Telepathy extension
23//! struct MyActor {
24//! state: usize
25//! }
26//!
27//! impl Actor for MyActor {
28//! type Context = Context<Self>;
29//!
30//! fn started(&mut self, ctx: &mut Self::Context) {
31//! self.register(ctx.address().recipient()); // <-- Telepathy extension
32//! }
33//! }
34//!
35//! impl Handler<MyMessage> for MyActor {
36//! type Result = ();
37//!
38//! fn handle(&mut self, msg: MyMessage, ctx: &mut Self::Context) -> Self::Result {
39//! todo!()
40//! }
41//! }
42//!
43//! #[actix_rt::main]
44//! pub async fn start_cluster(own_addr: SocketAddr, seed_nodes: Vec<SocketAddr>) {
45//! let _addr = MyActor { state: 0 }.start();
46//! let _cluster = Cluster::new(own_addr, seed_nodes);
47//! tokio::time::sleep(std::time::Duration::from_secs(5)).await;
48//! }
49//!
50//! ```
51//!
52//! The previous example will not do anything. However, the cluster will try to connect to the given addresses in `seed_nodes`.
53//! To react to new joining members, a [ClusterListener](./trait.ClusterListener.html) actor should be used:
54//!
55//! ```rust
56//! use actix::prelude::*;
57//! use actix_broker::BrokerSubscribe;
58//! use actix_telepathy::prelude::*; // <-- Telepathy extension
59//! use serde::{Serialize, Deserialize};
60//! use std::net::SocketAddr;
61//!
62//! #[derive(RemoteMessage, Serialize, Deserialize)] // <-- Telepathy extension
63//! struct MyMessage {}
64//!
65//! #[derive(RemoteActor)] // <-- Telepathy extension
66//! #[remote_messages(MyMessage)] // <-- Telepathy extension
67//! struct MyActor {
68//! state: usize
69//! }
70//!
71//! impl Actor for MyActor {
72//! type Context = Context<Self>;
73//!
74//! fn started(&mut self, ctx: &mut Self::Context) {
75//! self.register(ctx.address().recipient()); // <-- Telepathy extension
76//! self.subscribe_system_async::<ClusterLog>(ctx); // <-- Telepathy extension
77//! }
78//! }
79//!
80//! impl Handler<MyMessage> for MyActor {
81//! type Result = ();
82//!
83//! fn handle(&mut self, msg: MyMessage, ctx: &mut Self::Context) -> Self::Result {
84//! todo!()
85//! }
86//! }
87//!
88//! impl Handler<ClusterLog> for MyActor { // <-- Telepathy extension
89//! type Result = ();
90//!
91//! fn handle(&mut self, msg: ClusterLog, ctx: &mut Self::Context) -> Self::Result {
92//! match msg {
93//! ClusterLog::NewMember(_node) => {
94//! println!("New member joined the cluster.")
95//! },
96//! ClusterLog::MemberLeft(_ip_addr) => {
97//! println!("Member left the cluster.")
98//! }
99//! }
100//! }
101//! }
102//! impl ClusterListener for MyActor {} // <-- Telepathy extension
103//!
104//! #[actix_rt::main]
105//! pub async fn start_cluster(own_addr: SocketAddr, seed_nodes: Vec<SocketAddr>) {
106//! let _addr = MyActor { state: 0 }.start();
107//! let _cluster = Cluster::new(own_addr, seed_nodes); // <-- Telepathy extension
108//! tokio::time::sleep(std::time::Duration::from_secs(5)).await;
109//! }
110//!
111//! ```
112//!
113//! Now, we receive a printed message whenever a new member joins the cluster or when a member leaves.
114//! To send messages between remote actors to other members in the cluster, we have to utilize the
115//! [RemoteAddr](./struct.RemoteAddr.html) that the [ClusterListener](./trait.ClusterListener.html) receives.
116//!
117//! ```rust
118//! use actix::prelude::*;
119//! use actix_broker::BrokerSubscribe;
120//! use actix_telepathy::prelude::*; // <-- Telepathy extension
121//! use serde::{Serialize, Deserialize};
122//! use std::net::SocketAddr;
123//!
124//! #[derive(RemoteMessage, Serialize, Deserialize)] // <-- Telepathy extension
125//! struct MyMessage {}
126//!
127//! #[derive(RemoteActor)] // <-- Telepathy extension
128//! #[remote_messages(MyMessage)] // <-- Telepathy extension
129//! struct MyActor {
130//! state: usize
131//! }
132//!
133//! impl Actor for MyActor {
134//! type Context = Context<Self>;
135//!
136//! fn started(&mut self, ctx: &mut Self::Context) {
137//! self.register(ctx.address().recipient()); // <-- Telepathy extension
138//! self.subscribe_system_async::<ClusterLog>(ctx); // <-- Telepathy extension
139//! }
140//! }
141//!
142//! impl Handler<MyMessage> for MyActor {
143//! type Result = ();
144//!
145//! fn handle(&mut self, msg: MyMessage, ctx: &mut Self::Context) -> Self::Result {
146//! println!("RemoteMessage received!")
147//! }
148//! }
149//!
150//! impl Handler<ClusterLog> for MyActor { // <-- Telepathy extension
151//! type Result = ();
152//!
153//! fn handle(&mut self, msg: ClusterLog, ctx: &mut Self::Context) -> Self::Result {
154//! match msg {
155//! ClusterLog::NewMember(node) => {
156//! println!("New member joined the cluster.");
157//! let remote_addr = node.get_remote_addr(Self::ACTOR_ID.to_string());
158//! remote_addr.do_send(MyMessage {})
159//! },
160//! ClusterLog::MemberLeft(_ip_addr) => {
161//! println!("Member left the cluster.")
162//! }
163//! }
164//! }
165//! }
166//! impl ClusterListener for MyActor {} // <-- Telepathy extension
167//!
168//! #[actix_rt::main]
169//! pub async fn start_cluster(own_addr: SocketAddr, seed_nodes: Vec<SocketAddr>) {
170//! let _addr = MyActor { state: 0 }.start();
171//! let _cluster = Cluster::new(own_addr, seed_nodes); // <-- Telepathy extension
172//! tokio::time::sleep(std::time::Duration::from_secs(5)).await;
173//! }
174//!
175//! ```
176//!
177//! Now, every new member receives a `MyMessage` from every [ClusterListener](./trait.ClusterListener.html) in the cluster.
178//!
179//! Before we could use the [RemoteAddr](./struct.RemoteAddr.html), we had to make sure, that it is pointing to the correct [RemoteActor](./trait.RemoteActor.html), which is `MyActor` in that case.
180//! Therefore, we had to call `get_remote_addr` on the [Node](./struct.Node.html). A [RemoteAddr](./struct.RemoteAddr.html) points to a specific actor on a remote machine.
181
182#[cfg(feature = "derive")]
183pub use actix_telepathy_derive::*;
184
185mod cluster;
186mod codec;
187mod network;
188mod remote;
189mod serialization;
190#[cfg(test)]
191pub(crate) mod test_utils;
192mod utils;
193
194pub use crate::cluster::*;
195pub use crate::codec::ClusterMessage;
196pub use crate::network::*;
197pub use crate::remote::*;
198pub use crate::serialization::*;
199pub use crate::utils::*;
200
201pub mod prelude {
202 #[cfg(feature = "derive")]
203 pub use actix_telepathy_derive::*;
204
205 pub use crate::cluster::{Cluster, ClusterListener, ClusterLog, NodeResolving};
206 pub use crate::network::NetworkInterface;
207 pub use crate::remote::{AnyAddr, RemoteActor, RemoteAddr, RemoteMessage, RemoteWrapper};
208 pub use crate::serialization::{
209 CustomSerialization, CustomSerializationError, DefaultSerialization,
210 };
211}