Crate actix_telepathy

Actix-Telepathy is an extension to Actix that enables remote messaging and clustering support.

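Telepathy does not change Actix’ messaging system but extends it: actors additionally derive RemoteActor and messages additionally derive RemoteMessage. In the examples, every addition that Telepathy requires is marked with a `// <-- Telepathy extension` comment.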

Hence, an example actor receiving a remote message is defined as follows. To connect multiple computers in a cluster, a Cluster must be created.

use actix::prelude::*;
use actix_telepathy::prelude::*;  // <-- Telepathy extension
use serde::{Serialize, Deserialize};
use std::net::SocketAddr;

#[derive(RemoteMessage, Serialize, Deserialize)]  // <-- Telepathy extension
struct MyMessage {}

#[derive(RemoteActor)]  // <-- Telepathy extension
#[remote_messages(MyMessage)]  // <-- Telepathy extension
struct MyActor {
    state: usize
}

impl Actor for MyActor {
    type Context = Context<Self>;

    fn started(&mut self, ctx: &mut Self::Context) {
        self.register(ctx.address().recipient());  // <-- Telepathy extension
    }
}

impl Handler<MyMessage> for MyActor {
    type Result = ();

    fn handle(&mut self, msg: MyMessage, ctx: &mut Self::Context) -> Self::Result {
        todo!()
    }
}

#[actix_rt::main]
pub async fn start_cluster(own_addr: SocketAddr, seed_nodes: Vec<SocketAddr>) {
    let _addr = MyActor { state: 0 }.start();
    let _cluster = Cluster::new(own_addr, seed_nodes);
    tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}

The previous example does not do anything visible yet; the cluster merely tries to connect to the addresses given in seed_nodes. To react to newly joining members, a ClusterListener actor should be used:

use actix::prelude::*;
use actix_broker::BrokerSubscribe;
use actix_telepathy::prelude::*;  // <-- Telepathy extension
use serde::{Serialize, Deserialize};
use std::net::SocketAddr;

#[derive(RemoteMessage, Serialize, Deserialize)]  // <-- Telepathy extension
struct MyMessage {}

#[derive(RemoteActor)]  // <-- Telepathy extension
#[remote_messages(MyMessage)]  // <-- Telepathy extension
struct MyActor {
    state: usize
}

impl Actor for MyActor {
    type Context = Context<Self>;

    fn started(&mut self, ctx: &mut Self::Context) {
        self.register(ctx.address().recipient());  // <-- Telepathy extension
        self.subscribe_system_async::<ClusterLog>(ctx);  // <-- Telepathy extension
    }
}

impl Handler<MyMessage> for MyActor {
    type Result = ();

    fn handle(&mut self, msg: MyMessage, ctx: &mut Self::Context) -> Self::Result {
        todo!()
    }
}

impl Handler<ClusterLog> for MyActor {  // <-- Telepathy extension
    type Result = ();

    fn handle(&mut self, msg: ClusterLog, ctx: &mut Self::Context) -> Self::Result {
        match msg {
            ClusterLog::NewMember(_node) => {
                println!("New member joined the cluster.")
            },
            ClusterLog::MemberLeft(_ip_addr) => {
                println!("Member left the cluster.")
            }
        }
    }
}
impl ClusterListener for MyActor {}  // <-- Telepathy extension

#[actix_rt::main]
pub async fn start_cluster(own_addr: SocketAddr, seed_nodes: Vec<SocketAddr>) {
    let _addr = MyActor { state: 0 }.start();
    let _cluster = Cluster::new(own_addr, seed_nodes);  // <-- Telepathy extension
    tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}

Now, a message is printed whenever a member joins or leaves the cluster. To send messages to remote actors on other members of the cluster, we have to use a RemoteAddr, which we obtain from the Node that the ClusterListener receives.

use actix::prelude::*;
use actix_broker::BrokerSubscribe;
use actix_telepathy::prelude::*;  // <-- Telepathy extension
use serde::{Serialize, Deserialize};
use std::net::SocketAddr;

#[derive(RemoteMessage, Serialize, Deserialize)]  // <-- Telepathy extension
struct MyMessage {}

#[derive(RemoteActor)]  // <-- Telepathy extension
#[remote_messages(MyMessage)]  // <-- Telepathy extension
struct MyActor {
    state: usize
}

impl Actor for MyActor {
    type Context = Context<Self>;

    fn started(&mut self, ctx: &mut Self::Context) {
        self.register(ctx.address().recipient());  // <-- Telepathy extension
        self.subscribe_system_async::<ClusterLog>(ctx);  // <-- Telepathy extension
    }
}

impl Handler<MyMessage> for MyActor {
    type Result = ();

    fn handle(&mut self, msg: MyMessage, ctx: &mut Self::Context) -> Self::Result {
        println!("RemoteMessage received!")
    }
}

impl Handler<ClusterLog> for MyActor {  // <-- Telepathy extension
    type Result = ();

    fn handle(&mut self, msg: ClusterLog, ctx: &mut Self::Context) -> Self::Result {
        match msg {
            ClusterLog::NewMember(node) => {
                println!("New member joined the cluster.");
                let remote_addr = node.get_remote_addr(Self::ACTOR_ID.to_string());
                remote_addr.do_send(MyMessage {})
            },
            ClusterLog::MemberLeft(_ip_addr) => {
                println!("Member left the cluster.")
            }
        }
    }
}
impl ClusterListener for MyActor {}  // <-- Telepathy extension

#[actix_rt::main]
pub async fn start_cluster(own_addr: SocketAddr, seed_nodes: Vec<SocketAddr>) {
    let _addr = MyActor { state: 0 }.start();
    let _cluster = Cluster::new(own_addr, seed_nodes);  // <-- Telepathy extension
    tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}

Now, every new member receives a MyMessage from every ClusterListener in the cluster.
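The start_cluster functions above take own_addr and seed_nodes as SocketAddr values. The following is a minimal sketch, using only the standard library, of how such values might be produced from strings (for example from command-line arguments). The helper name parse_cluster_addrs is hypothetical and not part of actix_telepathy.

```rust
use std::net::{AddrParseError, SocketAddr};

/// Hypothetical helper (not part of actix_telepathy): parses this node's own
/// address and the addresses of its seed nodes from strings.
fn parse_cluster_addrs(
    own: &str,
    seeds: &[&str],
) -> Result<(SocketAddr, Vec<SocketAddr>), AddrParseError> {
    // Fails early if the own address is not of the form "ip:port".
    let own_addr = own.parse::<SocketAddr>()?;
    // Collecting into a Result stops at the first seed that does not parse.
    let seed_nodes = seeds
        .iter()
        .map(|s| s.parse::<SocketAddr>())
        .collect::<Result<Vec<_>, _>>()?;
    Ok((own_addr, seed_nodes))
}
```

The resulting pair could then be passed on, for example as start_cluster(own_addr, seed_nodes). A node that has no seed node would simply pass an empty list.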

Before we could use the RemoteAddr, we had to make sure that it points to the correct RemoteActor, which is MyActor in this case. That is why we called get_remote_addr on the Node: a RemoteAddr points to a specific actor on a remote machine.
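To illustrate why a node alone is not enough as a message target, here is a small conceptual model in plain Rust. SketchNode and SketchRemoteAddr are hypothetical stand-ins invented for this sketch; they are not the library's Node and RemoteAddr types and say nothing about how those are implemented.

```rust
use std::net::SocketAddr;

/// Hypothetical stand-in for a cluster member (not the library's `Node`).
#[derive(Debug, Clone, PartialEq)]
struct SketchNode {
    socket_addr: SocketAddr,
}

/// Hypothetical stand-in for a remote address (not the library's
/// `RemoteAddr`): a machine plus the identifier of one actor on it.
#[derive(Debug, Clone, PartialEq)]
struct SketchRemoteAddr {
    socket_addr: SocketAddr,
    actor_id: String,
}

impl SketchNode {
    /// Narrows "some machine" down to "this actor on that machine".
    fn get_remote_addr(&self, actor_id: String) -> SketchRemoteAddr {
        SketchRemoteAddr {
            socket_addr: self.socket_addr,
            actor_id,
        }
    }
}
```

In this model, two addresses created from the same node with different actor identifiers share the machine but are distinct targets, which mirrors why the example passes Self::ACTOR_ID to get_remote_addr.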

Modules

Structs

  • Central Actor for cluster handling
  • Occurs if either the serialization or the deserialization fails for the CustomSerialization trait.
  • The default serialization used for remote messages
  • The Gossip connector variant connects the nodes to each other; each node can have a different seed node. When joining the cluster, a node connects to its seed node and receives the number of nodes that are about to join. The seed node then spreads the joining node’s information to the other nodes via the gossip protocol: it randomly chooses 3 nodes and sends the information to them. These 3 nodes connect to the joining node and then send the information to 3 other nodes, and so on. This variant is recommended if the seed node is not always available. It is not recommended for very large clusters, because the gossip protocol takes longer the larger the cluster is.
  • Similar to actix::prelude::Addr but supports communication to remote actors on other nodes.
  • Wrapper for messages to be sent to a remote actor
  • The SingleSeed connector variant expects all nodes to have the same seed node (except the seed node itself, which has no seed node). A node that joins later is added to the cluster by the seed node. If a node has a different seed node, errors can occur. This variant is recommended for a fast connection setup, but not if the seed node is not always available.
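The two connector descriptions above can be made more concrete with a small standard-library sketch. It is not the library's implementation. gossip_rounds estimates the best-case number of forwarding rounds, assuming that every newly informed node forwards to 3 nodes that do not know the joining node yet; real gossip picks targets randomly, so it needs at least this many rounds. single_seed_config_is_valid checks the SingleSeed expectation that all nodes except the seed share one seed. Both function names are hypothetical.

```rust
use std::net::SocketAddr;

/// Best-case number of gossip rounds until `cluster_size` nodes know about a
/// joining node. Assumption: every newly informed node forwards to `fanout`
/// nodes that have not been informed yet (no duplicate deliveries).
fn gossip_rounds(cluster_size: usize, fanout: usize) -> usize {
    assert!(fanout > 0, "fanout must be positive");
    let mut informed = 1; // the seed node already knows the joining node
    let mut frontier = 1; // nodes that learned the news in the previous round
    let mut rounds = 0;
    while informed < cluster_size {
        let remaining = cluster_size - informed;
        let newly_informed = (frontier * fanout).min(remaining);
        informed += newly_informed;
        frontier = newly_informed;
        rounds += 1;
    }
    rounds
}

/// Checks the SingleSeed expectation for a list of (node, configured seed)
/// pairs: exactly one node has no seed, and all others name it as their seed.
fn single_seed_config_is_valid(nodes: &[(SocketAddr, Option<SocketAddr>)]) -> bool {
    let mut seedless = nodes.iter().filter(|(_, seed)| seed.is_none());
    let seed_addr = match (seedless.next(), seedless.next()) {
        (Some((addr, _)), None) => *addr,
        _ => return false, // zero or several nodes without a seed
    };
    nodes
        .iter()
        .all(|(_, seed)| seed.is_none() || *seed == Some(seed_addr))
}
```

Under these assumptions the number of informed nodes grows as 1, 4, 13, 40, so the round count rises only logarithmically with the cluster size, but it does rise, which is consistent with the note that gossip takes longer in larger clusters.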

Enums

Traits

Derive Macros