mm1_node/runtime/
rt.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use mm1_address::address::Address;
5use mm1_common::types::AnyError;
6use mm1_runnable::local::{self, BoxedRunnable};
7use tokio::runtime::{Handle, Runtime};
8use tokio::sync::mpsc;
9use tracing::{error, instrument};
10
11use crate::actor_key::ActorKey;
12use crate::config::{EffectiveActorConfig, Mm1NodeConfig, Valid};
13use crate::init::InitActorArgs;
14use crate::runtime::container::{Container, ContainerArgs, ContainerError};
15use crate::runtime::context;
16use crate::runtime::rt_api::{RequestAddressError, RtApi};
17
18#[derive(Debug)]
19pub struct Rt {
20    #[allow(unused)]
21    config:           Valid<Mm1NodeConfig>,
22    rt_default:       Runtime,
23    rt_named:         HashMap<String, Runtime>,
24    tx_actor_failure: mpsc::UnboundedSender<(Address, AnyError)>,
25}
26
27#[derive(Debug, thiserror::Error)]
28pub enum RtCreateError {
29    #[error("runtime config error: {}", _0)]
30    RuntimeConfigError(crate::config::ValidationError<Mm1NodeConfig>),
31    #[error("runtime init error: {}", _0)]
32    RuntimeInitError(#[source] std::io::Error),
33}
34
35#[derive(Debug, thiserror::Error)]
36pub enum RtRunError {
37    #[error("request address error: {}", _0)]
38    RequestAddressError(
39        #[allow(private_interfaces)]
40        #[source]
41        RequestAddressError,
42    ),
43    #[error("container error: {}", _0)]
44    #[allow(private_interfaces)]
45    ContainerError(#[source] ContainerError),
46}
47
48impl Rt {
49    pub fn create(config: Mm1NodeConfig) -> Result<Self, RtCreateError> {
50        let config = config
51            .validate()
52            .map_err(RtCreateError::RuntimeConfigError)?;
53        let (rt_default, rt_named) = config
54            .build_runtimes()
55            .map_err(RtCreateError::RuntimeInitError)?;
56
57        let (tx_actor_failure, _rx_actor_failure) = mpsc::unbounded_channel();
58        Ok(Self {
59            config,
60            rt_default,
61            rt_named,
62            tx_actor_failure,
63        })
64    }
65
66    pub fn with_actor_failure_sink(
67        self,
68        tx_actor_failure: mpsc::UnboundedSender<(Address, AnyError)>,
69    ) -> Self {
70        Self {
71            tx_actor_failure,
72            ..self
73        }
74    }
75
76    pub fn run(&self, main_actor: BoxedRunnable<context::ActorContext>) -> Result<(), RtRunError> {
77        let config = self.config.clone();
78        let rt_default = self.rt_default.handle().to_owned();
79        let rt_named = self
80            .rt_named
81            .iter()
82            .map(|(k, v)| (k.to_owned(), v.handle().to_owned()))
83            .collect::<HashMap<_, _>>();
84
85        let rt_handle = rt_default.clone();
86
87        let init_actor_args = InitActorArgs {
88            local_subnet_auto: config.local_subnet_address_auto(),
89            local_subnets_bind: config.local_subnet_addresses_bind().collect(),
90            #[cfg(feature = "multinode")]
91            multinode_inbound: config.multinode_inbound().collect(),
92            #[cfg(feature = "multinode")]
93            multinode_outbound: config.multinode_outbound().collect(),
94        };
95
96        rt_handle.block_on(run_inner(
97            config.clone(),
98            ActorKey::root(),
99            crate::init::init_actor_config(),
100            rt_default,
101            rt_named,
102            local::boxed_from_fn((crate::init::run, (main_actor, init_actor_args))),
103            self.tx_actor_failure.clone(),
104        ))
105    }
106}
107
108#[instrument(skip_all, fields(func = main_actor.func_name()))]
109async fn run_inner(
110    config: Valid<Mm1NodeConfig>,
111    actor_key: ActorKey,
112    actor_config: impl EffectiveActorConfig,
113    rt_default: Handle,
114    rt_named: HashMap<String, Handle>,
115    main_actor: BoxedRunnable<context::ActorContext>,
116    tx_actor_failure: mpsc::UnboundedSender<(Address, AnyError)>,
117) -> Result<(), RtRunError> {
118    let rt_api = RtApi::create(config.local_subnet_address_auto(), rt_default, rt_named);
119
120    let subnet_lease = rt_api
121        .request_address(actor_config.netmask())
122        .await
123        .map_err(RtRunError::RequestAddressError)?;
124
125    let args = ContainerArgs {
126        ack_to: None,
127        link_to: Default::default(),
128        actor_key,
129
130        subnet_lease,
131        rt_api: rt_api.clone(),
132        rt_config: Arc::new(config),
133    };
134    let exit_reason = Container::create(args, main_actor, tx_actor_failure)
135        .map_err(RtRunError::ContainerError)?
136        .run()
137        .await
138        .map_err(RtRunError::ContainerError)?;
139
140    if let Err(failure) = exit_reason {
141        error!(
142            "main-actor failure: {}",
143            failure
144                .chain()
145                .map(|reason| reason.to_string())
146                .collect::<Vec<_>>()
147                .join(" <- ")
148        );
149    }
150
151    Ok(())
152}