mm1_node/runtime/
rt.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
use std::collections::HashMap;
use std::sync::Arc;

use tokio::runtime::{Handle, Runtime};
use tracing::{instrument, warn};

use super::config::EffectiveActorConfig;
use crate::runtime::actor_key::ActorKey;
use crate::runtime::config::Mm1Config;
use crate::runtime::container::{Container, ContainerArgs, ContainerError};
use crate::runtime::context;
use crate::runtime::rt_api::{RequestAddressError, RtApi};
use crate::runtime::runnable::BoxedRunnable;

/// The node runtime: the validated configuration together with the tokio
/// runtimes built from it.
///
/// Constructed with `Rt::create`; `Rt::run` blocks on the main actor.
#[derive(Debug)]
pub struct Rt {
    /// The configuration this runtime was created from. `run` reads it to
    /// resolve the main actor's effective config and hands a copy to the
    /// actor's container.
    config: Mm1Config,

    /// Runtime used when the main actor's config names no runtime key.
    rt_default: Runtime,
    /// Additional runtimes, looked up by the runtime key found in an actor's
    /// config.
    rt_named:   HashMap<String, Runtime>,
}

/// Reasons why `Rt::create` can fail.
#[derive(Debug, thiserror::Error)]
pub enum RtCreateError {
    /// The configuration was rejected by `Mm1Config::validate`; the payload
    /// is the message that validation returned.
    #[error("runtime config error: {}", _0)]
    RuntimeConfigError(String),
    /// Building the runtimes from the configuration failed with an I/O error.
    #[error("runtime init error: {}", _0)]
    RuntimeInitError(#[source] std::io::Error),
}

/// Reasons why `Rt::run` can fail.
///
/// Note that a failure of the main actor itself is not reported through this
/// type: `run_inner` only logs it and still returns `Ok(())`.
#[derive(Debug, thiserror::Error)]
pub enum RtRunError {
    /// Requesting a subnet address for the main actor failed.
    #[error("request address error: {}", _0)]
    RequestAddressError(
        // NOTE(review): presumably the wrapped error type is less visible than
        // this public enum, hence the lint suppression — confirm.
        #[allow(private_interfaces)]
        #[source]
        RequestAddressError,
    ),
    /// Creating or running the main actor's container failed.
    #[error("container error: {}", _0)]
    // NOTE(review): same visibility assumption as above — confirm.
    #[allow(private_interfaces)]
    ContainerError(#[source] ContainerError),
}

impl Rt {
    pub fn create(config: Mm1Config) -> Result<Self, RtCreateError> {
        config
            .validate()
            .map_err(RtCreateError::RuntimeConfigError)?;
        let (rt_default, rt_named) = config
            .build_runtimes()
            .map_err(RtCreateError::RuntimeInitError)?;

        Ok(Self {
            config,
            rt_default,
            rt_named,
        })
    }

    pub fn run(&self, main_actor: BoxedRunnable<context::ActorContext>) -> Result<(), RtRunError> {
        let config = self.config.clone();
        let rt_default = self.rt_default.handle().to_owned();
        let rt_named = self
            .rt_named
            .iter()
            .map(|(k, v)| (k.to_owned(), v.handle().to_owned()))
            .collect::<HashMap<_, _>>();

        let main_actor_key = ActorKey::root().child(main_actor.func_name());
        let main_actor_config = config.actor_config(&main_actor_key);
        let rt_handle = if let Some(rt_key) = main_actor_config.runtime_key() {
            rt_named
                .get(rt_key)
                .cloned()
                .expect("the config's validity should have been checked")
        } else {
            rt_default.clone()
        };

        rt_handle.block_on(run_inner(
            config.clone(),
            main_actor_key,
            main_actor_config,
            rt_default,
            rt_named,
            main_actor,
        ))
    }
}

/// Sets up the runtime API, leases a subnet for the main actor and drives the
/// actor's container to completion.
///
/// A failed exit of the main actor is only logged as a warning; the function
/// still returns `Ok(())` in that case.
///
/// # Errors
///
/// - [`RtRunError::RequestAddressError`] if the address request fails.
/// - [`RtRunError::ContainerError`] if the container cannot be created or its
///   run fails.
#[instrument(skip_all, fields(func = main_actor.func_name()))]
async fn run_inner(
    config: Mm1Config,
    actor_key: ActorKey,
    actor_config: impl EffectiveActorConfig,
    rt_default: Handle,
    rt_named: HashMap<String, Handle>,
    main_actor: BoxedRunnable<context::ActorContext>,
) -> Result<(), RtRunError> {
    let rt_api = RtApi::create(config.subnet_address, rt_default, rt_named);

    // Lease an address range sized by the actor's configured netmask.
    let netmask = actor_config.netmask();
    let subnet_lease = rt_api
        .request_address(netmask)
        .await
        .map_err(RtRunError::RequestAddressError)?;

    let container_args = ContainerArgs {
        ack_to: None,
        link_to: Default::default(),
        actor_key,

        subnet_lease,
        rt_api: rt_api.clone(),
        rt_config: Arc::new(config),
    };

    // The returned lease is bound (not discarded with `_`) so it stays alive
    // until this function returns.
    let (_subnet_lease, exit_reason) = Container::create(container_args, main_actor)
        .map_err(RtRunError::ContainerError)?
        .run()
        .await
        .map_err(RtRunError::ContainerError)?;

    let Err(failure) = exit_reason else {
        return Ok(());
    };
    warn!("main-actor failure: {}", failure);

    Ok(())
}