1use std::collections::HashMap;
2use std::sync::Arc;
3
4use mm1_address::address::Address;
5use mm1_common::types::AnyError;
6use mm1_core::tracing::TraceId;
7use mm1_runnable::local::{self, BoxedRunnable};
8use tokio::runtime::{Handle, Runtime};
9use tokio::sync::mpsc;
10use tracing::{error, instrument};
11
12use crate::actor_key::ActorKey;
13use crate::config::{EffectiveActorConfig, Mm1NodeConfig, Valid};
14use crate::init::InitActorArgs;
15use crate::runtime::container::{Container, ContainerArgs, ContainerError};
16use crate::runtime::context;
17use crate::runtime::rt_api::{RequestAddressError, RtApi};
18
19#[derive(Debug)]
20pub struct Rt {
21 #[allow(unused)]
22 config: Valid<Mm1NodeConfig>,
23 rt_default: Runtime,
24 rt_named: HashMap<String, Runtime>,
25 tx_actor_failure: mpsc::UnboundedSender<(Address, AnyError)>,
26}
27
28#[derive(Debug, thiserror::Error)]
29pub enum RtCreateError {
30 #[error("runtime config error: {}", _0)]
31 RuntimeConfigError(crate::config::ValidationError<Mm1NodeConfig>),
32 #[error("runtime init error: {}", _0)]
33 RuntimeInitError(#[source] std::io::Error),
34}
35
36#[derive(Debug, thiserror::Error)]
37pub enum RtRunError {
38 #[error("request address error: {}", _0)]
39 RequestAddressError(
40 #[allow(private_interfaces)]
41 #[source]
42 RequestAddressError,
43 ),
44 #[error("container error: {}", _0)]
45 #[allow(private_interfaces)]
46 ContainerError(#[source] ContainerError),
47}
48
49impl Rt {
50 pub fn create(config: Mm1NodeConfig) -> Result<Self, RtCreateError> {
51 let config = config
52 .validate()
53 .map_err(RtCreateError::RuntimeConfigError)?;
54 let (rt_default, rt_named) = config
55 .build_runtimes()
56 .map_err(RtCreateError::RuntimeInitError)?;
57
58 let (tx_actor_failure, _rx_actor_failure) = mpsc::unbounded_channel();
59 Ok(Self {
60 config,
61 rt_default,
62 rt_named,
63 tx_actor_failure,
64 })
65 }
66
67 pub fn with_actor_failure_sink(
68 self,
69 tx_actor_failure: mpsc::UnboundedSender<(Address, AnyError)>,
70 ) -> Self {
71 Self {
72 tx_actor_failure,
73 ..self
74 }
75 }
76
77 pub fn run(&self, main_actor: BoxedRunnable<context::ActorContext>) -> Result<(), RtRunError> {
78 let config = self.config.clone();
79 let rt_default = self.rt_default.handle().to_owned();
80 let rt_named = self
81 .rt_named
82 .iter()
83 .map(|(k, v)| (k.to_owned(), v.handle().to_owned()))
84 .collect::<HashMap<_, _>>();
85
86 let rt_handle = rt_default.clone();
87
88 let init_actor_args = InitActorArgs {
89 local_subnet_auto: config.local_subnet_address_auto(),
90 local_subnets_bind: config.local_subnet_addresses_bind().collect(),
91 #[cfg(feature = "multinode")]
92 multinode_inbound: config.multinode_inbound().collect(),
93 #[cfg(feature = "multinode")]
94 multinode_outbound: config.multinode_outbound().collect(),
95 };
96
97 rt_handle.block_on(run_inner(
98 config.clone(),
99 ActorKey::root(),
100 crate::init::init_actor_config(),
101 rt_default,
102 rt_named,
103 local::boxed_from_fn((crate::init::run, (main_actor, init_actor_args))),
104 self.tx_actor_failure.clone(),
105 ))
106 }
107}
108
109#[instrument(skip_all, fields(func = main_actor.func_name()))]
110async fn run_inner(
111 config: Valid<Mm1NodeConfig>,
112 actor_key: ActorKey,
113 actor_config: impl EffectiveActorConfig,
114 rt_default: Handle,
115 rt_named: HashMap<String, Handle>,
116 main_actor: BoxedRunnable<context::ActorContext>,
117 tx_actor_failure: mpsc::UnboundedSender<(Address, AnyError)>,
118) -> Result<(), RtRunError> {
119 let rt_api = RtApi::create(config.local_subnet_address_auto(), rt_default, rt_named);
120
121 let subnet_lease = rt_api
122 .request_address(actor_config.netmask())
123 .await
124 .map_err(RtRunError::RequestAddressError)?;
125
126 let args = ContainerArgs {
127 ack_to: None,
128 link_to: Default::default(),
129 actor_key,
130 trace_id: TraceId::random(),
131 subnet_lease,
132 rt_api: rt_api.clone(),
133 rt_config: Arc::new(config),
134 };
135 let exit_reason = Container::create(args, main_actor, tx_actor_failure)
136 .map_err(RtRunError::ContainerError)?
137 .run()
138 .await
139 .map_err(RtRunError::ContainerError)?;
140
141 if let Err(failure) = exit_reason {
142 error!(
143 "main-actor failure: {}",
144 failure
145 .chain()
146 .map(|reason| reason.to_string())
147 .collect::<Vec<_>>()
148 .join(" <- ")
149 );
150 }
151
152 Ok(())
153}