mm1_node/runtime/
rt.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use mm1_address::address::Address;
5use mm1_common::types::AnyError;
6use mm1_core::tracing::TraceId;
7use mm1_runnable::local::{self, BoxedRunnable};
8use tokio::runtime::{Handle, Runtime};
9use tokio::sync::mpsc;
10use tracing::{error, instrument};
11
12use crate::actor_key::ActorKey;
13use crate::config::{EffectiveActorConfig, Mm1NodeConfig, Valid};
14use crate::init::InitActorArgs;
15use crate::runtime::container::{Container, ContainerArgs, ContainerError};
16use crate::runtime::context;
17use crate::runtime::rt_api::{RequestAddressError, RtApi};
18
19#[derive(Debug)]
20pub struct Rt {
21    #[allow(unused)]
22    config:           Valid<Mm1NodeConfig>,
23    rt_default:       Runtime,
24    rt_named:         HashMap<String, Runtime>,
25    tx_actor_failure: mpsc::UnboundedSender<(Address, AnyError)>,
26}
27
28#[derive(Debug, thiserror::Error)]
29pub enum RtCreateError {
30    #[error("runtime config error: {}", _0)]
31    RuntimeConfigError(crate::config::ValidationError<Mm1NodeConfig>),
32    #[error("runtime init error: {}", _0)]
33    RuntimeInitError(#[source] std::io::Error),
34}
35
36#[derive(Debug, thiserror::Error)]
37pub enum RtRunError {
38    #[error("request address error: {}", _0)]
39    RequestAddressError(
40        #[allow(private_interfaces)]
41        #[source]
42        RequestAddressError,
43    ),
44    #[error("container error: {}", _0)]
45    #[allow(private_interfaces)]
46    ContainerError(#[source] ContainerError),
47}
48
49impl Rt {
50    pub fn create(config: Mm1NodeConfig) -> Result<Self, RtCreateError> {
51        let config = config
52            .validate()
53            .map_err(RtCreateError::RuntimeConfigError)?;
54        let (rt_default, rt_named) = config
55            .build_runtimes()
56            .map_err(RtCreateError::RuntimeInitError)?;
57
58        let (tx_actor_failure, _rx_actor_failure) = mpsc::unbounded_channel();
59        Ok(Self {
60            config,
61            rt_default,
62            rt_named,
63            tx_actor_failure,
64        })
65    }
66
67    pub fn with_actor_failure_sink(
68        self,
69        tx_actor_failure: mpsc::UnboundedSender<(Address, AnyError)>,
70    ) -> Self {
71        Self {
72            tx_actor_failure,
73            ..self
74        }
75    }
76
77    pub fn run(&self, main_actor: BoxedRunnable<context::ActorContext>) -> Result<(), RtRunError> {
78        let config = self.config.clone();
79        let rt_default = self.rt_default.handle().to_owned();
80        let rt_named = self
81            .rt_named
82            .iter()
83            .map(|(k, v)| (k.to_owned(), v.handle().to_owned()))
84            .collect::<HashMap<_, _>>();
85
86        let rt_handle = rt_default.clone();
87
88        let init_actor_args = InitActorArgs {
89            local_subnet_auto: config.local_subnet_address_auto(),
90            local_subnets_bind: config.local_subnet_addresses_bind().collect(),
91            #[cfg(feature = "multinode")]
92            multinode_inbound: config.multinode_inbound().collect(),
93            #[cfg(feature = "multinode")]
94            multinode_outbound: config.multinode_outbound().collect(),
95        };
96
97        rt_handle.block_on(run_inner(
98            config.clone(),
99            ActorKey::root(),
100            crate::init::init_actor_config(),
101            rt_default,
102            rt_named,
103            local::boxed_from_fn((crate::init::run, (main_actor, init_actor_args))),
104            self.tx_actor_failure.clone(),
105        ))
106    }
107}
108
109#[instrument(skip_all, fields(func = main_actor.func_name()))]
110async fn run_inner(
111    config: Valid<Mm1NodeConfig>,
112    actor_key: ActorKey,
113    actor_config: impl EffectiveActorConfig,
114    rt_default: Handle,
115    rt_named: HashMap<String, Handle>,
116    main_actor: BoxedRunnable<context::ActorContext>,
117    tx_actor_failure: mpsc::UnboundedSender<(Address, AnyError)>,
118) -> Result<(), RtRunError> {
119    let rt_api = RtApi::create(config.local_subnet_address_auto(), rt_default, rt_named);
120
121    let subnet_lease = rt_api
122        .request_address(actor_config.netmask())
123        .await
124        .map_err(RtRunError::RequestAddressError)?;
125
126    let args = ContainerArgs {
127        ack_to: None,
128        link_to: Default::default(),
129        actor_key,
130        trace_id: TraceId::random(),
131        subnet_lease,
132        rt_api: rt_api.clone(),
133        rt_config: Arc::new(config),
134    };
135    let exit_reason = Container::create(args, main_actor, tx_actor_failure)
136        .map_err(RtRunError::ContainerError)?
137        .run()
138        .await
139        .map_err(RtRunError::ContainerError)?;
140
141    if let Err(failure) = exit_reason {
142        error!(
143            "main-actor failure: {}",
144            failure
145                .chain()
146                .map(|reason| reason.to_string())
147                .collect::<Vec<_>>()
148                .join(" <- ")
149        );
150    }
151
152    Ok(())
153}