1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
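//! An actor group made of a discovery actor and worker actors, where each worker is keyed by a
//! local/remote endpoint pair taken from a `HandleConnection` message.
//!
//! TODO: expand the crate-level documentation.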

#![warn(rust_2018_idioms, unreachable_pub, missing_docs)]

#[macro_use]
extern crate static_assertions;
#[macro_use]
extern crate elfo_utils;

use std::{
    fmt::{self, Display},
    hash::Hash,
};

use elfo_core::{
    messages::UpdateConfig,
    msg,
    node::NodeNo,
    routers::{MapRouter, Outcome},
    ActorGroup, Blueprint, Context, GroupNo, RestartPolicy, Topology,
};

use crate::{config::Config, protocol::HandleConnection};

mod codec;
mod config;
mod discovery;
mod frame;
mod node_map;
mod protocol;
mod rtt;
mod socket;
mod worker;

/// Key that identifies a single actor inside the group built by [`new`].
///
/// The router in [`new`] maps incoming messages to one of these keys, and the
/// `exec` closure there picks the actor implementation based on the variant.
#[derive(PartialEq, Eq, Hash, Clone)]
enum ActorKey {
    /// The discovery actor (`discovery::Discovery`); `UpdateConfig` is routed here.
    Discovery,
    /// A worker actor (`worker::Worker`); one per distinct `local`/`remote` pair
    /// carried by a `HandleConnection` message.
    Worker {
        /// Local endpoint: a group number and a string.
        // NOTE(review): the string is presumably the group's name — confirm.
        local: (GroupNo, String),
        /// Remote endpoint: a node number, a group number and a string.
        // NOTE(review): the string is presumably the remote group's name — confirm.
        remote: (NodeNo, GroupNo, String),
    },
}

impl Display for ActorKey {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // TODO: resolve `group_no` to name.
        match self {
            ActorKey::Discovery => f.write_str("discovery"),
            ActorKey::Worker { local, remote } => {
                write!(f, "{}:{}:{}", local.1, remote.0, remote.2)
            }
        }
    }
}

/// Shorthand for the actor context of this group: `Config` is the config type
/// and `ActorKey` is the key type.
type NetworkContext = Context<Config, ActorKey>;

/// TODO
pub fn new(topology: &Topology) -> Blueprint {
    let topology = topology.clone();

    ActorGroup::new()
        .config::<Config>()
        // The restart policy is overrided by the discovery actor.
        .restart_policy(RestartPolicy::never())
        .router(MapRouter::new(|envelope| {
            msg!(match envelope {
                // TODO: send to all connections.
                UpdateConfig => Outcome::Unicast(ActorKey::Discovery),
                msg @ HandleConnection => Outcome::Unicast(ActorKey::Worker {
                    local: msg.local.clone(),
                    remote: msg.remote.clone(),
                }),
                _ => Outcome::Default,
            })
        }))
        .exec(move |ctx: Context<Config, ActorKey>| {
            let topology = topology.clone();
            async move {
                match ctx.key().clone() {
                    ActorKey::Discovery => discovery::Discovery::new(ctx, topology).main().await,
                    ActorKey::Worker { local, remote } => {
                        worker::Worker::new(ctx, local, remote, topology)
                            .main()
                            .await
                    }
                }
            }
        })
}