Skip to main content

elfo_network/
lib.rs

1//! Implements the network layer of the distributed elfo system.
2//! [Configuration].
3//!
4//! [Configuration]: config::Config
5
6#![cfg_attr(docsrs, feature(doc_cfg))]
7
8#[macro_use]
9extern crate static_assertions;
10#[macro_use]
11extern crate elfo_utils;
12
13use std::{
14    fmt::{self, Display},
15    hash::Hash,
16};
17
18use elfo_core::{
19    messages::UpdateConfig,
20    msg,
21    routers::{MapRouter, Outcome},
22    ActorGroup, Blueprint, Context, Topology,
23};
24
25use crate::{
26    config::Config,
27    protocol::{AbortConnection, ConnectionFailed, GroupMeta, HandleConnection},
28};
29
30pub mod config;
31
32mod codec;
33mod connman;
34mod discovery;
35mod frame;
36mod node_map;
37mod protocol;
38mod rtt;
39mod socket;
40mod worker;
41
42#[derive(PartialEq, Eq, Hash, Clone)]
43enum ActorKey {
44    Discovery,
45    Worker { local: GroupMeta, remote: GroupMeta },
46}
47
48impl Display for ActorKey {
49    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
50        match self {
51            ActorKey::Discovery => f.write_str("discovery"),
52            ActorKey::Worker { local, remote } => {
53                write!(
54                    f,
55                    "{}:{}:{}",
56                    local.group_name, remote.node_no, remote.group_name
57                )
58            }
59        }
60    }
61}
62
63type NetworkContext = Context<Config, ActorKey>;
64
65/// Creates a blueprint for the internode network layer.
66pub fn new(topology: &Topology) -> Blueprint {
67    let topology = topology.clone();
68
69    ActorGroup::new()
70        .config::<Config>()
71        .stop_order(100)
72        .router(MapRouter::new(|envelope| {
73            msg!(match envelope {
74                // TODO: send to all connections.
75                UpdateConfig => Outcome::Unicast(ActorKey::Discovery),
76                msg @ HandleConnection => Outcome::Unicast(ActorKey::Worker {
77                    local: msg.local.clone(),
78                    remote: msg.remote.clone(),
79                }),
80                msg @ AbortConnection => Outcome::GentleUnicast(ActorKey::Worker {
81                    local: msg.local.clone(),
82                    remote: msg.remote.clone(),
83                }),
84                ConnectionFailed => Outcome::Unicast(ActorKey::Discovery),
85                _ => Outcome::Default,
86            })
87        }))
88        .exec(move |ctx: Context<Config, ActorKey>| {
89            let topology = topology.clone();
90            async move {
91                match ctx.key().clone() {
92                    ActorKey::Discovery => discovery::Discovery::new(ctx, topology).main().await,
93                    ActorKey::Worker { local, remote } => {
94                        worker::Worker::new(ctx, local, remote, topology)
95                            .main()
96                            .await
97                    }
98                }
99            }
100        })
101}