mm1_node/runtime/
rt.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use tokio::runtime::{Handle, Runtime};
5use tracing::{instrument, warn};
6
7use super::config::EffectiveActorConfig;
8use crate::runtime::actor_key::ActorKey;
9use crate::runtime::config::Mm1Config;
10use crate::runtime::container::{Container, ContainerArgs, ContainerError};
11use crate::runtime::context;
12use crate::runtime::rt_api::{RequestAddressError, RtApi};
13use crate::runtime::runnable::BoxedRunnable;
14
15#[derive(Debug)]
16pub struct Rt {
17    #[allow(unused)]
18    config: Mm1Config,
19
20    rt_default: Runtime,
21    rt_named:   HashMap<String, Runtime>,
22}
23
24#[derive(Debug, thiserror::Error)]
25pub enum RtCreateError {
26    #[error("runtime config error: {}", _0)]
27    RuntimeConfigError(String),
28    #[error("runtime init error: {}", _0)]
29    RuntimeInitError(#[source] std::io::Error),
30}
31
32#[derive(Debug, thiserror::Error)]
33pub enum RtRunError {
34    #[error("request address error: {}", _0)]
35    RequestAddressError(
36        #[allow(private_interfaces)]
37        #[source]
38        RequestAddressError,
39    ),
40    #[error("container error: {}", _0)]
41    #[allow(private_interfaces)]
42    ContainerError(#[source] ContainerError),
43}
44
45impl Rt {
46    pub fn create(config: Mm1Config) -> Result<Self, RtCreateError> {
47        config
48            .validate()
49            .map_err(RtCreateError::RuntimeConfigError)?;
50        let (rt_default, rt_named) = config
51            .build_runtimes()
52            .map_err(RtCreateError::RuntimeInitError)?;
53
54        Ok(Self {
55            config,
56            rt_default,
57            rt_named,
58        })
59    }
60
61    pub fn run(&self, main_actor: BoxedRunnable<context::ActorContext>) -> Result<(), RtRunError> {
62        let config = self.config.clone();
63        let rt_default = self.rt_default.handle().to_owned();
64        let rt_named = self
65            .rt_named
66            .iter()
67            .map(|(k, v)| (k.to_owned(), v.handle().to_owned()))
68            .collect::<HashMap<_, _>>();
69
70        let main_actor_key = ActorKey::root().child(main_actor.func_name());
71        let main_actor_config = config.actor_config(&main_actor_key);
72        let rt_handle = if let Some(rt_key) = main_actor_config.runtime_key() {
73            rt_named
74                .get(rt_key)
75                .cloned()
76                .expect("the config's validity should have been checked")
77        } else {
78            rt_default.clone()
79        };
80
81        rt_handle.block_on(run_inner(
82            config.clone(),
83            main_actor_key,
84            main_actor_config,
85            rt_default,
86            rt_named,
87            main_actor,
88        ))
89    }
90}
91
92#[instrument(skip_all, fields(func = main_actor.func_name()))]
93async fn run_inner(
94    config: Mm1Config,
95    actor_key: ActorKey,
96    actor_config: impl EffectiveActorConfig,
97    rt_default: Handle,
98    rt_named: HashMap<String, Handle>,
99    main_actor: BoxedRunnable<context::ActorContext>,
100) -> Result<(), RtRunError> {
101    let rt_api = RtApi::create(config.subnet, rt_default, rt_named);
102
103    let subnet_lease = rt_api
104        .request_address(actor_config.netmask())
105        .await
106        .map_err(RtRunError::RequestAddressError)?;
107
108    let args = ContainerArgs {
109        ack_to: None,
110        link_to: Default::default(),
111        actor_key,
112
113        subnet_lease,
114        rt_api: rt_api.clone(),
115        rt_config: Arc::new(config),
116    };
117    let (_subnet_lease, exit_reason) = Container::create(args, main_actor)
118        .map_err(RtRunError::ContainerError)?
119        .run()
120        .await
121        .map_err(RtRunError::ContainerError)?;
122
123    if let Err(failure) = exit_reason {
124        warn!("main-actor failure: {}", failure);
125    }
126
127    Ok(())
128}