1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
use futures::TryFutureExt;

use crate::{
    actor::Actor,
    context::Context,
    demux::Demux,
    errors::{RequestError, StartError},
    message,
    messages::{Ping, UpdateConfig},
    object::Object,
    topology::Topology,
};

type Result<T, E = StartError> = std::result::Result<T, E>;

async fn send_configs_to_entrypoints(ctx: &Context, topology: &Topology) -> Result<()> {
    let futures = topology
        .actor_groups()
        .filter(|group| group.is_entrypoint)
        .map(|group| {
            let config = Default::default();
            ctx.request(UpdateConfig { config })
                .from(group.addr)
                .resolve()
                .or_else(|err| async move {
                    match err {
                        RequestError::Ignored => Ok(Ok(())),
                        _ => Err(StartError::Other(
                            "initial messages cannot be delivered".into(),
                        )),
                    }
                })
        });

    let error_count = futures::future::try_join_all(futures)
        .await?
        .into_iter()
        .filter_map(Result::err)
        .count();

    if error_count == 0 {
        Ok(())
    } else {
        Err(StartError::InvalidConfig)
    }
}

async fn start_entrypoints(ctx: &Context, topology: &Topology) -> Result<()> {
    let futures = topology
        .actor_groups()
        .filter(|group| group.is_entrypoint)
        .map(|group| ctx.request(Ping).from(group.addr).resolve());

    futures::future::try_join_all(futures)
        .await
        .map_err(|_| StartError::Other("entrypoint cannot started".into()))?;

    Ok(())
}

pub async fn start(topology: Topology) {
    try_start(topology).await.expect("cannot start")
}

pub async fn try_start(topology: Topology) -> Result<()> {
    do_start(topology).await?;

    // TODO: graceful termination based on topology.
    let () = futures::future::pending().await;
    Ok(())
}

#[doc(hidden)]
pub async fn do_start(topology: Topology) -> Result<Context> {
    message::init();

    let entry = topology.book.vacant_entry();
    let addr = entry.addr();
    entry.insert(Object::new(addr, Actor::new(addr)));

    let ctx = Context::new(topology.book.clone(), Demux::default()).with_addr(addr);
    send_configs_to_entrypoints(&ctx, &topology).await?;
    start_entrypoints(&ctx, &topology).await?;

    Ok(ctx)
}