1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
use futures::TryFutureExt;
use crate::{
actor::Actor,
context::Context,
demux::Demux,
errors::{RequestError, StartError},
message,
messages::{Ping, UpdateConfig},
object::Object,
topology::Topology,
};
/// Result alias used throughout this file; the error type defaults to
/// [`StartError`] when it is not spelled out.
type Result<T, E = StartError> = std::result::Result<T, E>;
/// Sends an [`UpdateConfig`] request carrying a default config to every
/// entrypoint group of `topology` and awaits all of the responses.
///
/// A request that ends with [`RequestError::Ignored`] is treated as if the
/// group had accepted the config.
///
/// # Errors
///
/// * [`StartError::Other`] if a request fails with any error other than
///   [`RequestError::Ignored`].
/// * [`StartError::InvalidConfig`] if at least one response is an `Err`.
async fn send_configs_to_entrypoints(ctx: &Context, topology: &Topology) -> Result<()> {
    let requests = topology
        .actor_groups()
        .filter(|group| group.is_entrypoint)
        .map(|group| {
            let update = UpdateConfig {
                config: Default::default(),
            };

            ctx.request(update)
                .from(group.addr)
                .resolve()
                .or_else(|err| async move {
                    // An ignored request counts as an accepted config.
                    if matches!(err, RequestError::Ignored) {
                        Ok(Ok(()))
                    } else {
                        Err(StartError::Other(
                            "initial messages cannot be delivered".into(),
                        ))
                    }
                })
        });

    // `?` propagates delivery failures; the responses themselves are
    // inspected afterwards.
    let responses = futures::future::try_join_all(requests).await?;
    let rejected = responses.into_iter().filter_map(Result::err).count();

    match rejected {
        0 => Ok(()),
        _ => Err(StartError::InvalidConfig),
    }
}
/// Sends a [`Ping`] request to every entrypoint group of `topology` and
/// awaits all of the responses.
///
/// # Errors
///
/// Returns [`StartError::Other`] if any of the requests fails; the
/// underlying request error is discarded.
async fn start_entrypoints(ctx: &Context, topology: &Topology) -> Result<()> {
    let pings = topology
        .actor_groups()
        .filter(|group| group.is_entrypoint)
        .map(|group| ctx.request(Ping).from(group.addr).resolve());

    futures::future::try_join_all(pings)
        .await
        .map_err(|_| StartError::Other("entrypoint cannot be started".into()))?;

    Ok(())
}
/// Starts the system described by `topology`.
///
/// This is the panicking counterpart of [`try_start`].
///
/// # Panics
///
/// Panics with the message `"cannot start"` if [`try_start`] returns an
/// error.
pub async fn start(topology: Topology) {
    let outcome = try_start(topology).await;
    outcome.expect("cannot start")
}
/// Starts the system described by `topology` and then waits indefinitely.
///
/// After a successful startup the returned future stays pending forever, so
/// in practice it only ever completes with an error.
///
/// # Errors
///
/// Returns the error reported by [`do_start`], if any.
pub async fn try_start(topology: Topology) -> Result<()> {
    // The context produced by the startup is not kept around.
    drop(do_start(topology).await?);

    // Never resolves: keeps the caller parked once startup has succeeded.
    futures::future::pending::<()>().await;
    Ok(())
}
/// Performs the startup sequence and returns the context that was used for it.
///
/// Steps, in order:
/// 1. initialise the `message` module;
/// 2. insert a new [`Object`] wrapping a fresh [`Actor`] into the topology's
///    book;
/// 3. build a [`Context`] bound to that object's address;
/// 4. send configs to the entrypoint groups, then ping them.
///
/// # Errors
///
/// Returns the first error produced while sending configs or pinging the
/// entrypoints.
#[doc(hidden)]
pub async fn do_start(topology: Topology) -> Result<Context> {
    message::init();

    // Reserve a slot in the book and fill it with the new object.
    let slot = topology.book.vacant_entry();
    let root_addr = slot.addr();
    let root_object = Object::new(root_addr, Actor::new(root_addr));
    slot.insert(root_object);

    let book = topology.book.clone();
    let ctx = Context::new(book, Demux::default()).with_addr(root_addr);

    // Configs must go out before the entrypoints are pinged.
    send_configs_to_entrypoints(&ctx, &topology).await?;
    start_entrypoints(&ctx, &topology).await?;

    Ok(ctx)
}