mm1_node/runtime/
rt.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use mm1_address::address::Address;
5use mm1_common::errors::chain::ExactTypeDisplayChainExt;
6use mm1_common::types::AnyError;
7use mm1_core::tap::{MessageTap, NoopTap};
8use mm1_core::tracing::TraceId;
9use mm1_runnable::local::{self, BoxedRunnable};
10use tokio::runtime::{Handle, Runtime};
11use tokio::sync::mpsc;
12use tracing::{error, instrument, trace};
13
14use crate::actor_key::ActorKey;
15use crate::config::{EffectiveActorConfig, Mm1NodeConfig, Valid};
16use crate::init::InitActorArgs;
17use crate::runtime::container::{Container, ContainerArgs, ContainerError};
18use crate::runtime::context;
19use crate::runtime::rt_api::{RequestAddressError, RtApi};
20
21#[derive(derive_more::Debug)]
22pub struct Rt {
23    #[allow(unused)]
24    config:           Valid<Mm1NodeConfig>,
25    rt_default:       Runtime,
26    rt_named:         HashMap<String, Runtime>,
27    tx_actor_failure: mpsc::UnboundedSender<(Address, AnyError)>,
28    #[debug(skip)]
29    message_taps:     HashMap<String, Arc<dyn MessageTap>>,
30}
31
32#[derive(Debug, thiserror::Error)]
33pub enum RtCreateError {
34    #[error("runtime config error: {}", _0)]
35    RuntimeConfigError(crate::config::ValidationError<Mm1NodeConfig>),
36    #[error("runtime init error: {}", _0)]
37    RuntimeInitError(#[source] std::io::Error),
38}
39
40#[derive(Debug, thiserror::Error)]
41pub enum RtRunError {
42    #[error("request address error: {}", _0)]
43    RequestAddressError(
44        #[allow(private_interfaces)]
45        #[source]
46        RequestAddressError,
47    ),
48    #[error("container error: {}", _0)]
49    #[allow(private_interfaces)]
50    ContainerError(#[source] ContainerError),
51}
52
53impl Rt {
54    pub fn create(config: Mm1NodeConfig) -> Result<Self, RtCreateError> {
55        let config = config
56            .validate()
57            .map_err(RtCreateError::RuntimeConfigError)?;
58        let (rt_default, rt_named) = config
59            .build_runtimes()
60            .map_err(RtCreateError::RuntimeInitError)?;
61
62        let (tx_actor_failure, _rx_actor_failure) = mpsc::unbounded_channel();
63        Ok(Self {
64            config,
65            rt_default,
66            rt_named,
67            tx_actor_failure,
68            message_taps: Default::default(),
69        })
70    }
71
72    pub fn with_actor_failure_sink(
73        self,
74        tx_actor_failure: mpsc::UnboundedSender<(Address, AnyError)>,
75    ) -> Self {
76        Self {
77            tx_actor_failure,
78            ..self
79        }
80    }
81
82    pub fn with_tap(mut self, key: impl Into<String>, tap: Arc<dyn MessageTap>) -> Self {
83        self.message_taps.insert(key.into(), tap);
84        self
85    }
86
87    pub fn run(&self, main_actor: BoxedRunnable<context::ActorContext>) -> Result<(), RtRunError> {
88        let config = self.config.clone();
89        let rt_default = self.rt_default.handle().to_owned();
90        let rt_named = self
91            .rt_named
92            .iter()
93            .map(|(k, v)| (k.to_owned(), v.handle().to_owned()))
94            .collect::<HashMap<_, _>>();
95
96        let rt_handle = rt_default.clone();
97
98        let init_actor_args = InitActorArgs {
99            local_subnet_auto: config.local_subnet_address_auto(),
100            local_subnets_bind: config.local_subnet_addresses_bind().collect(),
101            #[cfg(feature = "multinode")]
102            multinode_inbound: config.multinode_inbound().collect(),
103            #[cfg(feature = "multinode")]
104            multinode_outbound: config.multinode_outbound().collect(),
105        };
106
107        rt_handle.block_on(run_inner(
108            config.clone(),
109            ActorKey::root(),
110            crate::init::init_actor_config(),
111            rt_default,
112            rt_named,
113            local::boxed_from_fn((crate::init::run, (main_actor, init_actor_args))),
114            &self.message_taps,
115            self.tx_actor_failure.clone(),
116        ))
117    }
118}
119
120#[allow(clippy::too_many_arguments)]
121#[instrument(skip_all, fields(func = main_actor.func_name()))]
122async fn run_inner(
123    config: Valid<Mm1NodeConfig>,
124    actor_key: ActorKey,
125    actor_config: impl EffectiveActorConfig,
126    rt_default: Handle,
127    rt_named: HashMap<String, Handle>,
128    main_actor: BoxedRunnable<context::ActorContext>,
129    message_taps: &HashMap<String, Arc<dyn MessageTap>>,
130    tx_actor_failure: mpsc::UnboundedSender<(Address, AnyError)>,
131) -> Result<(), RtRunError> {
132    let rt_api = RtApi::create(
133        config.local_subnet_address_auto(),
134        rt_default,
135        rt_named,
136        Arc::new(NoopTap),
137        message_taps.clone(),
138    );
139
140    let subnet_lease = rt_api
141        .request_address(actor_config.netmask())
142        .await
143        .map_err(RtRunError::RequestAddressError)?;
144    let message_tap_key = actor_config.message_tap_key();
145    let message_tap = rt_api.message_tap(message_tap_key);
146
147    let args = ContainerArgs {
148        ack_to: None,
149        link_to: Default::default(),
150        actor_key,
151        trace_id: TraceId::random(),
152        subnet_lease,
153        rt_api: rt_api.clone(),
154        rt_config: Arc::new(config),
155        message_tap,
156        tx_actor_failure,
157    };
158    trace!("creating and running container...");
159    let exit_reason = Container::create(args, main_actor)
160        .map_err(RtRunError::ContainerError)?
161        .run()
162        .await
163        .map_err(RtRunError::ContainerError)?;
164
165    trace!(reason = ?exit_reason.as_ref().map_err(|e| e.as_display_chain()), "container exited");
166
167    if let Err(failure) = exit_reason {
168        error!(
169            error = %failure.as_display_chain(),
170            "main-actor failure"
171        );
172    }
173
174    Ok(())
175}