1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
use std::{
    future::{self, Future},
    sync::Arc,
};

use futures::TryFutureExt;

use crate::{
    actor::Actor,
    context::Context,
    demux::Demux,
    errors::{RequestError, StartError},
    message,
    messages::{Ping, UpdateConfig},
    object::{Object, ObjectMeta},
    tls,
    topology::Topology,
    trace_id,
};

type Result<T, E = StartError> = std::result::Result<T, E>;

async fn send_configs_to_entrypoints(ctx: &Context, topology: &Topology) -> Result<()> {
    let futures = topology
        .actor_groups()
        .filter(|group| group.is_entrypoint)
        .map(|group| {
            let config = Default::default();
            ctx.request(UpdateConfig { config })
                .from(group.addr)
                .resolve()
                .or_else(|err| async move {
                    match err {
                        RequestError::Ignored => Ok(Ok(())),
                        _ => Err(StartError::Other(
                            "initial messages cannot be delivered".into(),
                        )),
                    }
                })
        });

    let error_count = futures::future::try_join_all(futures)
        .await?
        .into_iter()
        .filter_map(Result::err)
        .count();

    if error_count == 0 {
        Ok(())
    } else {
        Err(StartError::InvalidConfig)
    }
}

async fn start_entrypoints(ctx: &Context, topology: &Topology) -> Result<()> {
    let futures = topology
        .actor_groups()
        .filter(|group| group.is_entrypoint)
        .map(|group| ctx.request(Ping).from(group.addr).resolve());

    futures::future::try_join_all(futures)
        .await
        .map_err(|_| StartError::Other("entrypoint cannot started".into()))?;

    Ok(())
}

pub async fn start(topology: Topology) {
    try_start(topology).await.expect("cannot start")
}

pub async fn try_start(topology: Topology) -> Result<()> {
    do_start(topology, |_| future::ready(())).await?;

    // TODO: graceful termination based on topology.
    let () = future::pending().await;
    Ok(())
}

#[doc(hidden)]
pub async fn do_start<F: Future>(
    topology: Topology,
    f: impl FnOnce(Context) -> F,
) -> Result<F::Output> {
    message::init();

    let entry = topology.book.vacant_entry();
    let addr = entry.addr();
    entry.insert(Object::new(addr, Actor::new(addr)));

    let dumper = topology.dumper.for_group(false); // TODO: should we dump the starter?

    let meta = ObjectMeta {
        group: "starter".into(),
        key: Some("_".into()), // Just like `Singleton`.
    };
    let initial_trace_id = trace_id::generate();
    tls::scope(Arc::new(meta), initial_trace_id, async move {
        let ctx = Context::new(topology.book.clone(), dumper, Demux::default()).with_addr(addr);
        send_configs_to_entrypoints(&ctx, &topology).await?;
        start_entrypoints(&ctx, &topology).await?;
        Ok(f(ctx).await)
    })
    .await
}