ractor_cluster 0.6.1

Distributed cluster environment of Ractor actors
Documentation
// Copyright (c) Sean Lawlor
//
// This source code is licensed under both the MIT license found in the
// LICENSE-MIT file in the root directory of this source tree.

//! A [NodeSession] is an individual connection between a specific pair of
//! `node()`s and all of its authentication and communication for that
//! pairing

use std::collections::HashMap;
use std::convert::TryInto;
use std::net::SocketAddr;

use ractor::message::SerializedMessage;
use ractor::pg::GroupChangeMessage;
use ractor::registry::PidLifecycleEvent;
use ractor::rpc::CallResult;
use ractor::{Actor, ActorId, ActorProcessingErr, ActorRef, SpawnErr, SupervisionEvent};
use rand::Rng;
use tokio::time::Duration;

use super::{auth, NodeServer};
use crate::net::session::SessionMessage;
use crate::protocol::auth as auth_protocol;
use crate::protocol::control as control_protocol;
use crate::protocol::node as node_protocol;
use crate::remote_actor::RemoteActor;

/// Inclusive lower bound, in milliseconds, of the randomized delay used when
/// scheduling the next ping (see `NodeSessionState::get_send_delay`).
const MIN_PING_LATENCY_MS: u64 = 1000;
/// Exclusive upper bound, in milliseconds, of the randomized delay used when
/// scheduling the next ping (see `NodeSessionState::get_send_delay`).
const MAX_PING_LATENCY_MS: u64 = 5000;

#[cfg(test)]
mod tests;

/// Authentication handshake progress for a session, tagged by which side of
/// the handshake (client or server) this session is running.
#[derive(Debug)]
enum AuthenticationState {
    /// Handshake state when this session runs the client side
    AsClient(auth::ClientAuthenticationProcess),
    /// Handshake state when this session runs the server side
    AsServer(auth::ServerAuthenticationProcess),
}

impl AuthenticationState {
    /// Returns `true` once the handshake has reached its `Ok` state, for
    /// whichever side (client or server) this session is running.
    fn is_ok(&self) -> bool {
        matches!(
            self,
            Self::AsClient(auth::ClientAuthenticationProcess::Ok)
                | Self::AsServer(auth::ServerAuthenticationProcess::Ok(_))
        )
    }

    /// Returns `true` once the handshake has reached its `Close` state, for
    /// whichever side (client or server) this session is running.
    fn is_close(&self) -> bool {
        matches!(
            self,
            Self::AsClient(auth::ClientAuthenticationProcess::Close)
                | Self::AsServer(auth::ServerAuthenticationProcess::Close)
        )
    }
}

/// Represents a remote connection to a `node()`. The [NodeSession] is the main
/// handler for all inter-node communication and handles
///
/// 1. The state of the authentication handshake
/// 2. Control messages for actor synchronization + group membership changes
/// 3. `RemoteActor`s wishing to send messages to their remote counterparts on the
///    remote system (and receive replies)
///
/// A [NodeSession] can either be a client or server session, depending on the connection sequence.
/// If it was an incoming request to the [NodeServer] then it's a "server" session, as
/// the server spawned this actor. Otherwise it's an outgoing "client" request.
///
/// If the [NodeSession] is a client session, it will start the authentication handshake with
/// an `auth_protocol::NameMessage` announcing this node's name to the remote system for deduplication
/// and starting the rest of the handshake. For full authentication pattern details, see
///
/// 1. `src/protocol/auth.proto`
/// 2. `src/node/auth.rs`
///
/// Lastly, the nodes have an inter-node "ping" operation which occurs to keep the TCP session alive
/// and additionally measure peer latency.
pub struct NodeSession {
    /// Node identifier for this session; it appears in log output and is
    /// passed along when spawning `RemoteActor`s
    node_id: crate::NodeId,
    /// `true` when this session runs the server side of the authentication
    /// handshake, `false` when it runs the client side
    is_server: bool,
    /// Cookie handed to the authentication state machine on every handshake step
    cookie: String,
    /// Handle to the [NodeServer], consulted during the handshake to check
    /// and update session information
    node_server: ActorRef<NodeServer>,
    /// Name information this side presents to the peer during the handshake
    /// (e.g. in the server challenge)
    node_name: auth_protocol::NameMessage,
}

impl NodeSession {
    /// Construct a new [NodeSession] with the supplied
    /// arguments
    pub fn new(
        node_id: crate::NodeId,
        is_server: bool,
        cookie: String,
        node_server: ActorRef<NodeServer>,
        node_name: auth_protocol::NameMessage,
    ) -> Self {
        Self {
            node_id,
            is_server,
            cookie,
            node_server,
            node_name,
        }
    }
}

impl NodeSession {
    /// Advance the authentication handshake with a message received from the peer.
    ///
    /// The message is fed into the client or server authentication state
    /// machine (depending on which side this session runs), any reply the new
    /// state calls for is sent over the TCP session, and the new state is
    /// stored in `state.auth`.
    ///
    /// * If the handshake already completed, the message is ignored.
    /// * If the handshake already failed, the session and its TCP session (if
    ///   any) are asked to stop and the message is not processed any further.
    /// * If the new state is `Close`, this session is asked to stop.
    ///
    /// * `state` - The mutable session state
    /// * `message` - The authentication message received from the peer
    /// * `myself` - Reference to this session actor
    async fn handle_auth(
        &self,
        state: &mut NodeSessionState,
        message: auth_protocol::AuthenticationMessage,
        myself: ActorRef<Self>,
    ) {
        if state.auth.is_ok() {
            // nothing to do, we're already authenticated
            return;
        }
        if state.auth.is_close() {
            log::info!(
                "Node Session {} is shutting down due to authentication failure",
                self.node_id
            );
            // we need to shutdown, the session needs to be terminated
            myself.stop(Some("auth_fail".to_string()));
            if let Some(tcp) = &state.tcp {
                tcp.stop(Some("auth_fail".to_string()));
            }
            // The handshake has already failed: don't feed further messages
            // into the state machine (which would also repeat the log line
            // and the stop request below).
            return;
        }

        match &state.auth {
            AuthenticationState::AsClient(client_auth) => {
                let mut next = client_auth.next(message, &self.cookie);
                match &next {
                    auth::ClientAuthenticationProcess::WaitingForServerChallenge(server_status) => {
                        match server_status.status() {
                            auth_protocol::server_status::Status::Ok => {
                                // this handshake will continue
                            }
                            auth_protocol::server_status::Status::OkSimultaneous => {
                                // this handshake will continue, but there is another handshake underway
                                // that will be shut down (i.e. this was a server connection and we're currently trying
                                // a client connection)
                            }
                            auth_protocol::server_status::Status::NotOk => {
                                // The handshake will not continue, as there's already another client handshake underway
                                // which itself initiated (Simultaneous connect where the other connection's name is > this node
                                // name)
                                next = auth::ClientAuthenticationProcess::Close;
                            }
                            auth_protocol::server_status::Status::NotAllowed => {
                                // unspecified auth reason
                                next = auth::ClientAuthenticationProcess::Close;
                            }
                            auth_protocol::server_status::Status::Alive => {
                                // A connection to the node is already alive, which means either the
                                // node is confused in its connection state or the previous TCP connection is
                                // breaking down. Send ClientStatus
                                // TODO: check the status properly
                                state.tcp_send_auth(auth_protocol::AuthenticationMessage {
                                    msg: Some(
                                        auth_protocol::authentication_message::Msg::ClientStatus(
                                            auth_protocol::ClientStatus { status: true },
                                        ),
                                    ),
                                });
                            }
                        }
                    }
                    auth::ClientAuthenticationProcess::WaitingForServerChallengeAck(
                        server_challenge_value,
                        reply_to_server,
                        our_challenge,
                        _expected_digest,
                    ) => {
                        // record the peer's name, as announced in the server's challenge
                        let peer_name = auth_protocol::NameMessage {
                            name: server_challenge_value.name.clone(),
                            flags: server_challenge_value.flags.clone(),
                        };
                        state.name = Some(peer_name.clone());
                        // tell the node server that we now know this peer's name information.
                        // This must be the PEER's name (not `self.node_name`, which is this
                        // node's own name), otherwise the node server cannot associate this
                        // session with the remote node.
                        let _ = self
                            .node_server
                            .cast(super::NodeServerMessage::UpdateSession {
                                actor_id: myself.get_id(),
                                name: peer_name,
                            });
                        // send the client challenge to the server
                        let reply = auth_protocol::AuthenticationMessage {
                            msg: Some(auth_protocol::authentication_message::Msg::ClientChallenge(
                                auth_protocol::ChallengeReply {
                                    digest: reply_to_server.to_vec(),
                                    challenge: *our_challenge,
                                },
                            )),
                        };
                        state.tcp_send_auth(reply);
                    }
                    _ => {
                        // no message to send
                    }
                }

                if let auth::ClientAuthenticationProcess::Close = &next {
                    log::info!(
                        "Node Session {} is shutting down due to authentication failure",
                        self.node_id
                    );
                    myself.stop(Some("auth_fail".to_string()));
                }
                if let auth::ClientAuthenticationProcess::Ok = &next {
                    log::info!("Node Session {} is authenticated", self.node_id);
                }
                log::debug!("Next client auth state: {:?}", next);
                state.auth = AuthenticationState::AsClient(next);
            }
            AuthenticationState::AsServer(server_auth) => {
                let mut next = server_auth.next(message, &self.cookie);

                match &next {
                    auth::ServerAuthenticationProcess::HavePeerName(peer_name) => {
                        // store the peer node's name in the session state
                        state.name = Some(peer_name.clone());

                        // ask the node server how this peer should be treated; the answer
                        // becomes the status message, followed by the server's challenge
                        let server_status_result = self
                            .node_server
                            .call(
                                |tx| super::NodeServerMessage::CheckSession {
                                    peer_name: peer_name.clone(),
                                    reply: tx,
                                },
                                Some(Duration::from_millis(500)),
                            )
                            .await;
                        match server_status_result {
                            Err(_) | Ok(CallResult::Timeout) | Ok(CallResult::SenderError) => {
                                // no usable answer from the node server, abort the handshake
                                next = auth::ServerAuthenticationProcess::Close;
                            }
                            Ok(CallResult::Success(reply)) => {
                                let server_status: auth_protocol::server_status::Status =
                                    reply.into();
                                // Send the server's status message
                                let status_msg = auth_protocol::AuthenticationMessage {
                                    msg: Some(
                                        auth_protocol::authentication_message::Msg::ServerStatus(
                                            auth_protocol::ServerStatus {
                                                status: server_status.into(),
                                            },
                                        ),
                                    ),
                                };
                                state.tcp_send_auth(status_msg);

                                match server_status {
                                    auth_protocol::server_status::Status::Ok
                                    | auth_protocol::server_status::Status::OkSimultaneous => {
                                        // Good to proceed, start a challenge
                                        next = next.start_challenge(&self.cookie);
                                        if let auth::ServerAuthenticationProcess::WaitingOnClientChallengeReply(
                                            challenge,
                                            _digest,
                                        ) = &next
                                        {
                                            let challenge_msg = auth_protocol::AuthenticationMessage {
                                                msg: Some(
                                                    auth_protocol::authentication_message::Msg::ServerChallenge(
                                                        auth_protocol::Challenge {
                                                            name: self.node_name.name.clone(),
                                                            flags: self.node_name.flags.clone(),
                                                            challenge: *challenge,
                                                        },
                                                    ),
                                                ),
                                            };
                                            state.tcp_send_auth(challenge_msg);
                                        }
                                    }
                                    auth_protocol::server_status::Status::NotOk
                                    | auth_protocol::server_status::Status::NotAllowed => {
                                        next = auth::ServerAuthenticationProcess::Close;
                                    }
                                    auth_protocol::server_status::Status::Alive => {
                                        // we sent the `Alive` status, so we're waiting on the client to confirm their status
                                        // before continuing
                                        next = auth::ServerAuthenticationProcess::WaitingOnClientStatus;
                                    }
                                }
                            }
                        }
                    }
                    auth::ServerAuthenticationProcess::Ok(digest) => {
                        // handshake complete on our side: acknowledge the client's challenge
                        let client_challenge_reply = auth_protocol::AuthenticationMessage {
                            msg: Some(auth_protocol::authentication_message::Msg::ServerAck(
                                auth_protocol::ChallengeAck {
                                    digest: digest.to_vec(),
                                },
                            )),
                        };
                        state.tcp_send_auth(client_challenge_reply);
                    }
                    _ => {
                        // no message to send
                    }
                }

                if let auth::ServerAuthenticationProcess::Close = &next {
                    log::info!(
                        "Node Session {} is shutting down due to authentication failure",
                        self.node_id
                    );
                    myself.stop(Some("auth_fail".to_string()));
                }
                if let auth::ServerAuthenticationProcess::Ok(_) = &next {
                    log::info!("Node Session {} is authenticated", self.node_id);
                }
                log::debug!("Next server auth state: {:?}", next);
                state.auth = AuthenticationState::AsServer(next);
            }
        }
    }

    fn handle_node(
        &self,
        state: &mut NodeSessionState,
        message: node_protocol::NodeMessage,
        myself: ActorRef<Self>,
    ) {
        if !state.auth.is_ok() {
            log::warn!("Inter-node message received on unauthenticated NodeSession");
            return;
        }

        if let Some(msg) = message.msg {
            match msg {
                node_protocol::node_message::Msg::Cast(cast_args) => {
                    if let Some(actor) =
                        ractor::registry::where_is_pid(ActorId::Local(cast_args.to))
                    {
                        let _ = actor.send_serialized(SerializedMessage::Cast {
                            variant: cast_args.variant,
                            data: cast_args.what,
                        });
                    }
                }
                node_protocol::node_message::Msg::Call(call_args) => {
                    let to = call_args.to;
                    let tag = call_args.tag;
                    if let Some(actor) =
                        ractor::registry::where_is_pid(ActorId::Local(call_args.to))
                    {
                        let (tx, rx) = ractor::concurrency::oneshot();

                        // send off the transmission in the serialized format, letting the message's own deserialization handle
                        // the conversion
                        let maybe_timeout = call_args.timeout_ms.map(Duration::from_millis);
                        if let Some(timeout) = maybe_timeout {
                            let _ = actor.send_serialized(SerializedMessage::Call {
                                args: call_args.what,
                                reply: (tx, timeout).into(),
                                variant: call_args.variant,
                            });
                        } else {
                            let _ = actor.send_serialized(SerializedMessage::Call {
                                args: call_args.what,
                                reply: tx.into(),
                                variant: call_args.variant,
                            });
                        }

                        // kick off a background task to reply to the channel request, threading the tag and who to reply to
                        #[allow(clippy::let_underscore_future)]
                        let _ = ractor::concurrency::spawn(async move {
                            if let Some(timeout) = maybe_timeout {
                                if let Ok(Ok(result)) =
                                    ractor::concurrency::timeout(timeout, rx).await
                                {
                                    let reply = node_protocol::node_message::Msg::Reply(
                                        node_protocol::CallReply {
                                            tag,
                                            to,
                                            what: result,
                                        },
                                    );
                                    let _ = ractor::cast!(
                                        myself,
                                        super::NodeSessionMessage::SendMessage(
                                            node_protocol::NodeMessage { msg: Some(reply) }
                                        )
                                    );
                                }
                            } else if let Ok(result) = rx.await {
                                let reply = node_protocol::node_message::Msg::Reply(
                                    node_protocol::CallReply {
                                        tag,
                                        to,
                                        what: result,
                                    },
                                );
                                let _ = ractor::cast!(
                                    myself,
                                    super::NodeSessionMessage::SendMessage(
                                        node_protocol::NodeMessage { msg: Some(reply) }
                                    )
                                );
                            }
                        });
                    }
                }
                node_protocol::node_message::Msg::Reply(call_reply_args) => {
                    if let Some(actor) = state.remote_actors.get(&call_reply_args.to) {
                        let _ = actor.send_serialized(SerializedMessage::CallReply(
                            call_reply_args.tag,
                            call_reply_args.what,
                        ));
                    }
                }
            }
        }
    }

    /// Process a control message received from the peer.
    ///
    /// Control messages received before the session is authenticated are
    /// dropped with a warning.
    ///
    /// * `Spawn` - makes sure a local `RemoteActor` exists for every listed actor
    /// * `Terminate` - stops and forgets the `RemoteActor` of every listed pid
    /// * `Ping` - answered with a `Pong` echoing the ping's timestamp
    /// * `Pong` - logs the round-trip time (when it can be computed) and
    ///   schedules the next ping
    /// * `PgJoin` / `PgLeave` - joins / removes the corresponding
    ///   `RemoteActor`s to / from the named local process group
    ///
    /// * `state` - The mutable session state
    /// * `message` - The control message received from the peer
    /// * `myself` - Reference to this session actor
    async fn handle_control(
        &self,
        state: &mut NodeSessionState,
        message: control_protocol::ControlMessage,
        myself: ActorRef<Self>,
    ) {
        if !state.auth.is_ok() {
            log::warn!("Control message received on unauthenticated NodeSession");
            return;
        }

        if let Some(msg) = message.msg {
            match msg {
                control_protocol::control_message::Msg::Spawn(spawned_actors) => {
                    for net_actor in spawned_actors.actors {
                        if let Err(spawn_err) = self
                            .get_or_spawn_remote_actor(
                                &myself,
                                net_actor.name,
                                net_actor.pid,
                                state,
                            )
                            .await
                        {
                            log::error!("Failed to spawn remote actor with {}", spawn_err);
                        } else {
                            log::debug!("Spawned remote actor");
                        }
                    }
                }
                control_protocol::control_message::Msg::Terminate(termination) => {
                    for pid in termination.ids {
                        if let Some(actor) = state.remote_actors.remove(&pid) {
                            actor.stop(Some("remote".to_string()));
                            log::debug!(
                                "Actor {} on node {} exited, terminating local `RemoteActor` {}",
                                pid,
                                self.node_id,
                                actor.get_id()
                            );
                        }
                    }
                }
                control_protocol::control_message::Msg::Ping(ping) => {
                    // echo the peer's timestamp back so it can measure the round trip
                    state.tcp_send_control(control_protocol::ControlMessage {
                        msg: Some(control_protocol::control_message::Msg::Pong(
                            control_protocol::Pong {
                                timestamp: ping.timestamp,
                            },
                        )),
                    });
                }
                control_protocol::control_message::Msg::Pong(pong) => {
                    // The timestamp arrives over the wire and the elapsed time depends on
                    // the local clock, so neither may be trusted to be well-formed: a bad
                    // value is reported instead of panicking the session actor.
                    let sent_at: Option<std::time::SystemTime> =
                        pong.timestamp.and_then(|ts| ts.try_into().ok());
                    match sent_at.map(|sent| std::time::SystemTime::now().duration_since(sent)) {
                        Some(Ok(elapsed)) => {
                            let delta_ms = elapsed.as_millis();
                            log::debug!("Ping -> Pong took {}ms", delta_ms);
                            if delta_ms > 50 {
                                // placeholder address for log output when one is unknown
                                let default = || {
                                    SocketAddr::new(
                                        std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
                                        0,
                                    )
                                };
                                log::warn!(
                                    "Super long ping detected {} - {} ({}ms)",
                                    state.local_addr.unwrap_or_else(default),
                                    state.peer_addr.unwrap_or_else(default),
                                    delta_ms
                                );
                            }
                        }
                        Some(Err(clock_err)) => {
                            // the Pong's timestamp is later than the current system time
                            log::warn!(
                                "Unable to measure ping latency, Pong timestamp is in the future: {}",
                                clock_err
                            );
                        }
                        None => {
                            log::warn!("Pong received with a missing or invalid timestamp");
                        }
                    }
                    // schedule next ping, even when the latency could not be measured, so
                    // the keep-alive cycle isn't interrupted
                    state.schedule_tcp_ping();
                }
                control_protocol::control_message::Msg::PgJoin(join) => {
                    let mut cells = vec![];
                    for control_protocol::Actor { name, pid } in join.actors {
                        match self
                            .get_or_spawn_remote_actor(&myself, name, pid, state)
                            .await
                        {
                            Ok(actor) => {
                                cells.push(actor.get_cell());
                            }
                            Err(spawn_err) => {
                                log::error!("Failed to spawn remote actor with '{}'", spawn_err);
                            }
                        }
                    }
                    // join the remote actors to the local PG group
                    if !cells.is_empty() {
                        log::debug!(
                            "PG Join group '{}' for {} remote actors",
                            join.group,
                            cells.len()
                        );
                        ractor::pg::join(join.group, cells);
                    }
                }
                control_protocol::control_message::Msg::PgLeave(leave) => {
                    let mut cells = vec![];
                    for control_protocol::Actor { name, pid } in leave.actors {
                        match self
                            .get_or_spawn_remote_actor(&myself, name, pid, state)
                            .await
                        {
                            Ok(actor) => {
                                cells.push(actor.get_cell());
                            }
                            Err(spawn_err) => {
                                log::error!("Failed to spawn remote actor with '{}'", spawn_err);
                            }
                        }
                    }
                    // remove the remote actors from the local PG group
                    if !cells.is_empty() {
                        log::debug!(
                            "PG Leave group '{}' for {} remote actors",
                            leave.group,
                            cells.len()
                        );
                        ractor::pg::leave(leave.group, cells);
                    }
                }
            }
        }
    }

    /// Post-handshake setup, called once the session is authenticated.
    ///
    /// In order, this
    ///
    /// 1. Schedules the first ping
    /// 2. Registers this session as a monitor of the PID registry
    /// 3. Sends the peer a `Spawn` control message listing every registered
    ///    actor which supports remoting (skipped when there are none)
    /// 4. Registers this session as a monitor of all process groups
    /// 5. Sends the peer one `PgJoin` control message per process group
    ///    which has local members supporting remoting
    ///
    /// * `myself` - Reference to this session actor
    /// * `state` - The session state
    fn after_authenticated(&self, myself: ActorRef<Self>, state: &mut NodeSessionState) {
        log::info!(
            "Session authenticated on NodeSession {} - ({:?})",
            self.node_id,
            state.peer_addr
        );

        // begin the ping healthcheck cycle
        state.schedule_tcp_ping();

        // register for PID lifecycle notifications
        ractor::registry::pid_registry::monitor(myself.get_cell());

        // Collect every registered actor which supports remoting, to be spawned on the remote host
        // NOTE(review): this scan reads the pid with `pid()` while the PG scan below uses
        // `get_pid()` — confirm both accessors exist and settle on one.
        let remotable: Vec<_> = ractor::registry::pid_registry::get_all_pids()
            .into_iter()
            .filter_map(|cell| {
                cell.supports_remoting().then(|| control_protocol::Actor {
                    name: cell.get_name(),
                    pid: cell.get_id().pid(),
                })
            })
            .collect();
        if !remotable.is_empty() {
            state.tcp_send_control(control_protocol::ControlMessage {
                msg: Some(control_protocol::control_message::Msg::Spawn(
                    control_protocol::Spawn { actors: remotable },
                )),
            });
        }

        // register for process group change notifications
        ractor::pg::monitor(
            ractor::pg::ALL_GROUPS_NOTIFICATION.to_string(),
            myself.get_cell(),
        );

        // Walk all PG groups and announce their remotable local members to the peer
        for group in ractor::pg::which_groups() {
            let members: Vec<_> = ractor::pg::get_local_members(&group)
                .into_iter()
                .filter_map(|cell| {
                    cell.supports_remoting().then(|| control_protocol::Actor {
                        name: cell.get_name(),
                        pid: cell.get_id().get_pid(),
                    })
                })
                .collect();
            if members.is_empty() {
                continue;
            }
            state.tcp_send_control(control_protocol::ControlMessage {
                msg: Some(control_protocol::control_message::Msg::PgJoin(
                    control_protocol::PgJoin {
                        group,
                        actors: members,
                    },
                )),
            });
        }
        // TODO: subscribe to the named registry and synchronize it? What happens on a name clash? How would this be
        // handled if both sessions had a "node_a" for example? Which resolves, local only?
    }

    /// Look up the `RemoteActor` tracked for `actor_pid`, spawning (and
    /// tracking) a new one linked to this session when none exists yet.
    ///
    /// * `myself` - Reference to this session actor; the new actor is linked to it
    /// * `actor_name` - Optional name to give a newly spawned actor
    /// * `actor_pid` - Pid of the actor on the remote node, used as the lookup key
    /// * `state` - The session state holding the tracked remote actors
    ///
    /// Returns the existing or newly spawned actor, or the [SpawnErr] if
    /// spawning failed (in which case nothing is tracked).
    async fn get_or_spawn_remote_actor(
        &self,
        myself: &ActorRef<Self>,
        actor_name: Option<String>,
        actor_pid: u64,
        state: &mut NodeSessionState,
    ) -> Result<ActorRef<RemoteActor>, SpawnErr> {
        if let Some(existing) = state.remote_actors.get(&actor_pid) {
            return Ok(existing.clone());
        }

        let handler: RemoteActor = myself.into();
        let (spawned, _) = handler
            .spawn_linked(actor_name, actor_pid, self.node_id, myself.get_cell())
            .await?;
        state.remote_actors.insert(actor_pid, spawned.clone());
        Ok(spawned)
    }
}

/// The state of the node session
pub struct NodeSessionState {
    /// Handle to the TCP session actor carrying this session's traffic;
    /// `None` until a TCP stream has been attached
    tcp: Option<ActorRef<crate::net::session::Session>>,
    /// Address of the remote end of the TCP stream, once attached
    peer_addr: Option<SocketAddr>,
    /// Address of the local end of the TCP stream, once attached
    local_addr: Option<SocketAddr>,
    /// The peer's name information, once learned during the handshake
    name: Option<auth_protocol::NameMessage>,
    /// Progress of the authentication handshake
    auth: AuthenticationState,
    /// The `RemoteActor`s spawned by this session, keyed by the actor's pid
    /// on the remote node
    remote_actors: HashMap<u64, ActorRef<RemoteActor>>,
}

impl NodeSessionState {
    fn is_tcp_actor(&self, actor: ActorId) -> bool {
        self.tcp
            .as_ref()
            .map(|t| t.get_id() == actor)
            .unwrap_or(false)
    }

    fn tcp_send_auth(&self, msg: auth_protocol::AuthenticationMessage) {
        if let Some(tcp) = &self.tcp {
            let net_msg = crate::protocol::NetworkMessage {
                message: Some(crate::protocol::meta::network_message::Message::Auth(msg)),
            };
            let _ = ractor::cast!(tcp, SessionMessage::Send(net_msg));
        }
    }

    fn tcp_send_node(&self, msg: node_protocol::NodeMessage) {
        if let Some(tcp) = &self.tcp {
            let net_msg = crate::protocol::NetworkMessage {
                message: Some(crate::protocol::meta::network_message::Message::Node(msg)),
            };
            let _ = ractor::cast!(tcp, SessionMessage::Send(net_msg));
        }
    }

    fn tcp_send_control(&self, msg: control_protocol::ControlMessage) {
        if let Some(tcp) = &self.tcp {
            let net_msg = crate::protocol::NetworkMessage {
                message: Some(crate::protocol::meta::network_message::Message::Control(
                    msg,
                )),
            };
            let _ = ractor::cast!(tcp, SessionMessage::Send(net_msg));
        }
    }

    fn schedule_tcp_ping(&self) {
        if let Some(tcp) = &self.tcp {
            #[allow(clippy::let_underscore_future)]
            let _ = tcp.send_after(Self::get_send_delay(), || {
                let ping = control_protocol::ControlMessage {
                    msg: Some(control_protocol::control_message::Msg::Ping(
                        control_protocol::Ping {
                            timestamp: Some(prost_types::Timestamp::from(
                                std::time::SystemTime::now(),
                            )),
                        },
                    )),
                };
                let net_msg = crate::protocol::NetworkMessage {
                    message: Some(crate::protocol::meta::network_message::Message::Control(
                        ping,
                    )),
                };
                SessionMessage::Send(net_msg)
            });
        }
    }

    fn get_send_delay() -> Duration {
        Duration::from_millis(
            rand::thread_rng().gen_range(MIN_PING_LATENCY_MS..MAX_PING_LATENCY_MS),
        )
    }
}

#[async_trait::async_trait]
impl Actor for NodeSession {
    type Msg = super::NodeSessionMessage;
    type State = NodeSessionState;

    /// Build the initial session state, before any TCP stream is attached.
    ///
    /// The authentication state machine is initialized in the server or the
    /// client role according to `self.is_server`; every other field starts
    /// out empty (`None` / an empty map).
    async fn pre_start(&self, _myself: ActorRef<Self>) -> Result<Self::State, ActorProcessingErr> {
        let initial_auth = if self.is_server {
            AuthenticationState::AsServer(auth::ServerAuthenticationProcess::init())
        } else {
            AuthenticationState::AsClient(auth::ClientAuthenticationProcess::init())
        };

        let initial_state = Self::State {
            tcp: None,
            name: None,
            auth: initial_auth,
            remote_actors: HashMap::new(),
            peer_addr: None,
            local_addr: None,
        };
        Ok(initial_state)
    }

    /// Tear-down hook: unhook this session from the monitoring sessions for
    /// the all-groups process-group notification and the PID registry.
    async fn post_stop(
        &self,
        myself: ActorRef<Self>,
        _state: &mut Self::State,
    ) -> Result<(), ActorProcessingErr> {
        // Unhook from process-group notifications for all groups.
        let all_groups = ractor::pg::ALL_GROUPS_NOTIFICATION.to_string();
        ractor::pg::demonitor(all_groups, myself.get_id());

        // Unhook from the PID registry monitor.
        ractor::registry::pid_registry::demonitor(myself.get_id());

        Ok(())
    }

    /// Main message handler for the node session.
    ///
    /// * `SetTcpStream` (only while no TCP session exists): spawns the linked
    ///   TCP session actor, records the peer/local addresses and, for a
    ///   client-side session, opens the handshake by sending this node's name.
    /// * `MessageReceived` (only once a TCP session exists): dispatches the
    ///   decoded payload to the auth, node or control handler. When an auth
    ///   message moves the session from "not authenticated" to
    ///   "authenticated", `after_authenticated` is invoked.
    /// * `SendMessage` (only once a TCP session exists): forwards the node
    ///   message over TCP.
    /// * `GetAuthenticationState`: replies with whether authentication is ok.
    ///
    /// Any other message, including the guarded ones above when their guard
    /// does not hold, is silently ignored.
    ///
    /// # Errors
    /// Propagates failures from reading the stream's addresses or from
    /// spawning the TCP session actor.
    async fn handle(
        &self,
        myself: ActorRef<Self>,
        message: Self::Msg,
        state: &mut Self::State,
    ) -> Result<(), ActorProcessingErr> {
        use crate::protocol::meta::network_message::Message as Payload;

        match message {
            Self::Msg::SetTcpStream(stream) if state.tcp.is_none() => {
                let remote_addr = stream.peer_addr()?;
                let bound_addr = stream.local_addr()?;

                // Spawn the linked TCP socket handler (message writing + reading).
                let session = crate::net::session::Session::spawn_linked(
                    myself.clone(),
                    stream,
                    remote_addr,
                    bound_addr,
                    myself.get_cell(),
                )
                .await?;

                state.tcp = Some(session);
                state.peer_addr = Some(remote_addr);
                state.local_addr = Some(bound_addr);

                // Client-side connections open the handshake by announcing their name.
                if !self.is_server {
                    let hello = auth_protocol::AuthenticationMessage {
                        msg: Some(auth_protocol::authentication_message::Msg::Name(
                            self.node_name.clone(),
                        )),
                    };
                    state.tcp_send_auth(hello);
                }
            }
            Self::Msg::MessageReceived(envelope) if state.tcp.is_some() => {
                match envelope.message {
                    Some(Payload::Auth(auth_message)) => {
                        let was_authenticated = state.auth.is_ok();
                        self.handle_auth(state, auth_message, myself.clone()).await;
                        // First transition into the authenticated state: start node sync'ing.
                        if !was_authenticated && state.auth.is_ok() {
                            self.after_authenticated(myself, state);
                        }
                    }
                    Some(Payload::Node(node_message)) => {
                        self.handle_node(state, node_message, myself);
                    }
                    Some(Payload::Control(control_message)) => {
                        self.handle_control(state, control_message, myself).await;
                    }
                    None => {
                        // envelope without a payload, nothing to do
                    }
                }
            }
            Self::Msg::SendMessage(node_message) if state.tcp.is_some() => {
                state.tcp_send_node(node_message);
            }
            Self::Msg::GetAuthenticationState(reply) => {
                let authenticated = state.auth.is_ok();
                let _ = reply.send(authenticated);
            }
            _ => {
                // no-op, ignore
            }
        }
        Ok(())
    }

    async fn handle_supervisor_evt(
        &self,
        myself: ActorRef<Self>,
        message: SupervisionEvent,
        state: &mut Self::State,
    ) -> Result<(), ActorProcessingErr> {
        match message {
            SupervisionEvent::ActorStarted(_) => {}
            SupervisionEvent::ActorPanicked(actor, msg) => {
                if state.is_tcp_actor(actor.get_id()) {
                    log::error!(
                        "Node session {:?}'s TCP session panicked with '{}'",
                        state.name,
                        msg
                    );
                    myself.stop(Some("tcp_session_err".to_string()));
                } else if let Some(actor) = state.remote_actors.remove(&actor.get_id().get_pid()) {
                    log::warn!(
                        "Node session {:?} had a remote actor ({}) panic with {}",
                        state.name,
                        actor.get_id(),
                        msg
                    );
                    actor.kill();

                    // NOTE: This is a legitimate panic of the `RemoteActor`, not the actor on the remote machine panicking (which
                    // is handled by the remote actor's supervisor). Therefore we should re-spawn the actor, and if we can't we
                    // should ourself die. Something is seriously wrong...
                    let pid = actor.get_id().get_pid();
                    let name = actor.get_name();
                    let _ = self
                        .get_or_spawn_remote_actor(&myself, name, pid, state)
                        .await?;
                } else {
                    log::error!("NodeSesion {:?} received an unknown child panic superivision message from {} - '{}'",
                        state.name,
                        actor.get_id(),
                        msg
                    );
                }
            }
            SupervisionEvent::ActorTerminated(actor, _, maybe_reason) => {
                if state.is_tcp_actor(actor.get_id()) {
                    log::info!("NodeSession {:?} connection closed", state.name);
                    myself.stop(Some("tcp_session_closed".to_string()));
                    // TODO: resilient connection?
                } else if let Some(actor) = state.remote_actors.remove(&actor.get_id().get_pid()) {
                    log::debug!(
                        "NodeSession {:?} received a child exit with reason '{:?}'",
                        state.name,
                        maybe_reason
                    );
                    actor.stop(Some("remote_exit".to_string()));
                } else {
                    log::warn!("NodeSession {:?} received an unknown child actor exit event from {} - '{:?}'",
                        state.name,
                        actor.get_id(),
                        maybe_reason,
                    );
                }
            }
            // ======== Lifecycle event handlers (PG groups + PID registry) ======== //
            SupervisionEvent::ProcessGroupChanged(change) => match change {
                GroupChangeMessage::Join(group, actors) => {
                    let filtered = actors
                        .into_iter()
                        .filter(|act| act.supports_remoting())
                        .map(|act| control_protocol::Actor {
                            name: act.get_name(),
                            pid: act.get_id().get_pid(),
                        })
                        .collect::<Vec<_>>();
                    if !filtered.is_empty() {
                        let msg = control_protocol::ControlMessage {
                            msg: Some(control_protocol::control_message::Msg::PgJoin(
                                control_protocol::PgJoin {
                                    group,
                                    actors: filtered,
                                },
                            )),
                        };
                        state.tcp_send_control(msg);
                    }
                }
                GroupChangeMessage::Leave(group, actors) => {
                    let filtered = actors
                        .into_iter()
                        .filter(|act| act.supports_remoting())
                        .map(|act| control_protocol::Actor {
                            name: act.get_name(),
                            pid: act.get_id().get_pid(),
                        })
                        .collect::<Vec<_>>();
                    if !filtered.is_empty() {
                        let msg = control_protocol::ControlMessage {
                            msg: Some(control_protocol::control_message::Msg::PgLeave(
                                control_protocol::PgLeave {
                                    group,
                                    actors: filtered,
                                },
                            )),
                        };
                        state.tcp_send_control(msg);
                    }
                }
            },
            SupervisionEvent::PidLifecycleEvent(pid) => match pid {
                PidLifecycleEvent::Spawn(who) => {
                    if who.supports_remoting() {
                        let msg = control_protocol::ControlMessage {
                            msg: Some(control_protocol::control_message::Msg::Spawn(
                                control_protocol::Spawn {
                                    actors: vec![control_protocol::Actor {
                                        pid: who.get_id().get_pid(),
                                        name: who.get_name(),
                                    }],
                                },
                            )),
                        };
                        state.tcp_send_control(msg);
                    }
                }
                PidLifecycleEvent::Terminate(who) => {
                    if who.supports_remoting() {
                        let msg = control_protocol::ControlMessage {
                            msg: Some(control_protocol::control_message::Msg::Terminate(
                                control_protocol::Terminate {
                                    ids: vec![who.get_id().get_pid()],
                                },
                            )),
                        };
                        state.tcp_send_control(msg);
                    }
                }
            },
        }
        Ok(())
    }
}