1#![cfg_attr(docsrs, feature(doc_cfg))]
7
8#[macro_use]
9extern crate static_assertions;
10#[macro_use]
11extern crate elfo_utils;
12
13use std::{
14 fmt::{self, Display},
15 hash::Hash,
16};
17
18use elfo_core::{
19 messages::UpdateConfig,
20 msg,
21 routers::{MapRouter, Outcome},
22 ActorGroup, Blueprint, Context, Topology,
23};
24
25use crate::{
26 config::Config,
27 protocol::{AbortConnection, ConnectionFailed, GroupMeta, HandleConnection},
28};
29
30pub mod config;
31
32mod codec;
33mod connman;
34mod discovery;
35mod frame;
36mod node_map;
37mod protocol;
38mod rtt;
39mod socket;
40mod worker;
41
42#[derive(PartialEq, Eq, Hash, Clone)]
43enum ActorKey {
44 Discovery,
45 Worker { local: GroupMeta, remote: GroupMeta },
46}
47
48impl Display for ActorKey {
49 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
50 match self {
51 ActorKey::Discovery => f.write_str("discovery"),
52 ActorKey::Worker { local, remote } => {
53 write!(
54 f,
55 "{}:{}:{}",
56 local.group_name, remote.node_no, remote.group_name
57 )
58 }
59 }
60 }
61}
62
/// Actor context of the network group: configured with [`Config`] and keyed
/// by [`ActorKey`].
type NetworkContext = Context<Config, ActorKey>;
64
65pub fn new(topology: &Topology) -> Blueprint {
67 let topology = topology.clone();
68
69 ActorGroup::new()
70 .config::<Config>()
71 .stop_order(100)
72 .router(MapRouter::new(|envelope| {
73 msg!(match envelope {
74 UpdateConfig => Outcome::Unicast(ActorKey::Discovery),
76 msg @ HandleConnection => Outcome::Unicast(ActorKey::Worker {
77 local: msg.local.clone(),
78 remote: msg.remote.clone(),
79 }),
80 msg @ AbortConnection => Outcome::GentleUnicast(ActorKey::Worker {
81 local: msg.local.clone(),
82 remote: msg.remote.clone(),
83 }),
84 ConnectionFailed => Outcome::Unicast(ActorKey::Discovery),
85 _ => Outcome::Default,
86 })
87 }))
88 .exec(move |ctx: Context<Config, ActorKey>| {
89 let topology = topology.clone();
90 async move {
91 match ctx.key().clone() {
92 ActorKey::Discovery => discovery::Discovery::new(ctx, topology).main().await,
93 ActorKey::Worker { local, remote } => {
94 worker::Worker::new(ctx, local, remote, topology)
95 .main()
96 .await
97 }
98 }
99 }
100 })
101}