// mm1_node/runtime/rt.rs

use std::collections::HashMap;
use std::sync::Arc;

use mm1_address::address::Address;
use mm1_common::errors::chain::ExactTypeDisplayChainExt;
use mm1_common::types::AnyError;
use mm1_core::tracing::TraceId;
use mm1_runnable::local::{self, BoxedRunnable};
use tokio::runtime::{Handle, Runtime};
use tokio::sync::mpsc;
use tracing::{error, instrument, trace};

use crate::actor_key::ActorKey;
use crate::config::{EffectiveActorConfig, Mm1NodeConfig, Valid};
use crate::init::InitActorArgs;
use crate::runtime::container::{Container, ContainerArgs, ContainerError};
use crate::runtime::context;
use crate::runtime::rt_api::{RequestAddressError, RtApi};
19
20#[derive(Debug)]
21pub struct Rt {
22    #[allow(unused)]
23    config:           Valid<Mm1NodeConfig>,
24    rt_default:       Runtime,
25    rt_named:         HashMap<String, Runtime>,
26    tx_actor_failure: mpsc::UnboundedSender<(Address, AnyError)>,
27}
28
29#[derive(Debug, thiserror::Error)]
30pub enum RtCreateError {
31    #[error("runtime config error: {}", _0)]
32    RuntimeConfigError(crate::config::ValidationError<Mm1NodeConfig>),
33    #[error("runtime init error: {}", _0)]
34    RuntimeInitError(#[source] std::io::Error),
35}
36
37#[derive(Debug, thiserror::Error)]
38pub enum RtRunError {
39    #[error("request address error: {}", _0)]
40    RequestAddressError(
41        #[allow(private_interfaces)]
42        #[source]
43        RequestAddressError,
44    ),
45    #[error("container error: {}", _0)]
46    #[allow(private_interfaces)]
47    ContainerError(#[source] ContainerError),
48}
49
50impl Rt {
51    pub fn create(config: Mm1NodeConfig) -> Result<Self, RtCreateError> {
52        let config = config
53            .validate()
54            .map_err(RtCreateError::RuntimeConfigError)?;
55        let (rt_default, rt_named) = config
56            .build_runtimes()
57            .map_err(RtCreateError::RuntimeInitError)?;
58
59        let (tx_actor_failure, _rx_actor_failure) = mpsc::unbounded_channel();
60        Ok(Self {
61            config,
62            rt_default,
63            rt_named,
64            tx_actor_failure,
65        })
66    }
67
68    pub fn with_actor_failure_sink(
69        self,
70        tx_actor_failure: mpsc::UnboundedSender<(Address, AnyError)>,
71    ) -> Self {
72        Self {
73            tx_actor_failure,
74            ..self
75        }
76    }
77
78    pub fn run(&self, main_actor: BoxedRunnable<context::ActorContext>) -> Result<(), RtRunError> {
79        let config = self.config.clone();
80        let rt_default = self.rt_default.handle().to_owned();
81        let rt_named = self
82            .rt_named
83            .iter()
84            .map(|(k, v)| (k.to_owned(), v.handle().to_owned()))
85            .collect::<HashMap<_, _>>();
86
87        let rt_handle = rt_default.clone();
88
89        let init_actor_args = InitActorArgs {
90            local_subnet_auto: config.local_subnet_address_auto(),
91            local_subnets_bind: config.local_subnet_addresses_bind().collect(),
92            #[cfg(feature = "multinode")]
93            multinode_inbound: config.multinode_inbound().collect(),
94            #[cfg(feature = "multinode")]
95            multinode_outbound: config.multinode_outbound().collect(),
96        };
97
98        rt_handle.block_on(run_inner(
99            config.clone(),
100            ActorKey::root(),
101            crate::init::init_actor_config(),
102            rt_default,
103            rt_named,
104            local::boxed_from_fn((crate::init::run, (main_actor, init_actor_args))),
105            self.tx_actor_failure.clone(),
106        ))
107    }
108}
109
110#[instrument(skip_all, fields(func = main_actor.func_name()))]
111async fn run_inner(
112    config: Valid<Mm1NodeConfig>,
113    actor_key: ActorKey,
114    actor_config: impl EffectiveActorConfig,
115    rt_default: Handle,
116    rt_named: HashMap<String, Handle>,
117    main_actor: BoxedRunnable<context::ActorContext>,
118    tx_actor_failure: mpsc::UnboundedSender<(Address, AnyError)>,
119) -> Result<(), RtRunError> {
120    let rt_api = RtApi::create(config.local_subnet_address_auto(), rt_default, rt_named);
121
122    let subnet_lease = rt_api
123        .request_address(actor_config.netmask())
124        .await
125        .map_err(RtRunError::RequestAddressError)?;
126
127    let args = ContainerArgs {
128        ack_to: None,
129        link_to: Default::default(),
130        actor_key,
131        trace_id: TraceId::random(),
132        subnet_lease,
133        rt_api: rt_api.clone(),
134        rt_config: Arc::new(config),
135    };
136    trace!("creating and running container...");
137    let exit_reason = Container::create(args, main_actor, tx_actor_failure)
138        .map_err(RtRunError::ContainerError)?
139        .run()
140        .await
141        .map_err(RtRunError::ContainerError)?;
142
143    trace!(reason = ?exit_reason.as_ref().map_err(|e| e.as_display_chain()), "container exited");
144
145    if let Err(failure) = exit_reason {
146        error!(
147            error = %failure.as_display_chain(),
148            "main-actor failure"
149        );
150    }
151
152    Ok(())
153}