elfo-network 0.2.0-alpha.21

Distributed actors for elfo
Documentation
//! Implements the network layer of the distributed elfo system.
//! See [Configuration] for the available options.
//!
//! [Configuration]: config::Config

#![cfg_attr(docsrs, feature(doc_cfg))]

#[macro_use]
extern crate static_assertions;
#[macro_use]
extern crate elfo_utils;

use std::{
    fmt::{self, Display},
    hash::Hash,
};

use elfo_core::{
    messages::UpdateConfig,
    msg,
    routers::{MapRouter, Outcome},
    ActorGroup, Blueprint, Context, Topology,
};

use crate::{
    config::Config,
    protocol::{AbortConnection, ConnectionFailed, GroupMeta, HandleConnection},
};

pub mod config;

mod codec;
mod connman;
mod discovery;
mod frame;
mod node_map;
mod protocol;
mod rtt;
mod socket;
mod worker;

/// Key of an actor inside the network group.
///
/// The group's router maps incoming messages to one of these keys, and the
/// group's `exec` closure picks which actor implementation to run for a key.
#[derive(Clone, PartialEq, Eq, Hash)]
enum ActorKey {
    /// The discovery actor.
    Discovery,
    /// A worker identified by a pair of group descriptions.
    Worker {
        /// Description of the local group (presumably the one on this node).
        local: GroupMeta,
        /// Description of the remote group (presumably the one on the peer node).
        remote: GroupMeta,
    },
}

impl Display for ActorKey {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ActorKey::Discovery => f.write_str("discovery"),
            ActorKey::Worker { local, remote } => {
                write!(
                    f,
                    "{}:{}:{}",
                    local.group_name, remote.node_no, remote.group_name
                )
            }
        }
    }
}

/// Actor context specialized for this group: `Config` as the config type and `ActorKey` as the key.
type NetworkContext = Context<Config, ActorKey>;

/// Creates a blueprint for the internode network layer.
pub fn new(topology: &Topology) -> Blueprint {
    let topology = topology.clone();

    ActorGroup::new()
        .config::<Config>()
        .stop_order(100)
        .router(MapRouter::new(|envelope| {
            msg!(match envelope {
                // TODO: send to all connections.
                UpdateConfig => Outcome::Unicast(ActorKey::Discovery),
                msg @ HandleConnection => Outcome::Unicast(ActorKey::Worker {
                    local: msg.local.clone(),
                    remote: msg.remote.clone(),
                }),
                msg @ AbortConnection => Outcome::GentleUnicast(ActorKey::Worker {
                    local: msg.local.clone(),
                    remote: msg.remote.clone(),
                }),
                ConnectionFailed => Outcome::Unicast(ActorKey::Discovery),
                _ => Outcome::Default,
            })
        }))
        .exec(move |ctx: Context<Config, ActorKey>| {
            let topology = topology.clone();
            async move {
                match ctx.key().clone() {
                    ActorKey::Discovery => discovery::Discovery::new(ctx, topology).main().await,
                    ActorKey::Worker { local, remote } => {
                        worker::Worker::new(ctx, local, remote, topology)
                            .main()
                            .await
                    }
                }
            }
        })
}