1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
//! TODO

#![warn(rust_2018_idioms, unreachable_pub, missing_docs)]

#[macro_use]
extern crate static_assertions;
#[macro_use]
extern crate elfo_utils;

use std::{
    fmt::{self, Display},
    hash::Hash,
};

use elfo_core::{
    messages::UpdateConfig,
    msg,
    routers::{MapRouter, Outcome},
    ActorGroup, Blueprint, Context, Topology,
};

use crate::{
    config::Config,
    protocol::{DataConnectionFailed, GroupInfo, HandleConnection},
};

mod codec;
mod config;
mod discovery;
mod frame;
mod node_map;
mod protocol;
mod rtt;
mod socket;
mod worker;

#[derive(PartialEq, Eq, Hash, Clone)]
enum ActorKey {
    Discovery,
    Worker { local: GroupInfo, remote: GroupInfo },
}

impl Display for ActorKey {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ActorKey::Discovery => f.write_str("discovery"),
            ActorKey::Worker { local, remote } => {
                write!(
                    f,
                    "{}:{}:{}",
                    local.group_name, remote.node_no, remote.group_name
                )
            }
        }
    }
}

type NetworkContext = Context<Config, ActorKey>;

/// TODO
pub fn new(topology: &Topology) -> Blueprint {
    let topology = topology.clone();

    ActorGroup::new()
        .config::<Config>()
        .stop_order(100)
        .router(MapRouter::new(|envelope| {
            msg!(match envelope {
                // TODO: send to all connections.
                UpdateConfig => Outcome::Unicast(ActorKey::Discovery),
                msg @ HandleConnection => Outcome::Unicast(ActorKey::Worker {
                    local: msg.local.clone(),
                    remote: msg.remote.clone(),
                }),
                DataConnectionFailed => Outcome::Unicast(ActorKey::Discovery),
                _ => Outcome::Default,
            })
        }))
        .exec(move |ctx: Context<Config, ActorKey>| {
            let topology = topology.clone();
            async move {
                match ctx.key().clone() {
                    ActorKey::Discovery => discovery::Discovery::new(ctx, topology).main().await,
                    ActorKey::Worker { local, remote } => {
                        worker::Worker::new(ctx, local, remote, topology)
                            .main()
                            .await
                    }
                }
            }
        })
}