1use std::collections::HashMap;
2use std::sync::Arc;
3
4use tokio::runtime::{Handle, Runtime};
5use tracing::{instrument, warn};
6
7use super::config::EffectiveActorConfig;
8use crate::runtime::actor_key::ActorKey;
9use crate::runtime::config::Mm1Config;
10use crate::runtime::container::{Container, ContainerArgs, ContainerError};
11use crate::runtime::context;
12use crate::runtime::rt_api::{RequestAddressError, RtApi};
13use crate::runtime::runnable::BoxedRunnable;
14
15#[derive(Debug)]
16pub struct Rt {
17 #[allow(unused)]
18 config: Mm1Config,
19
20 rt_default: Runtime,
21 rt_named: HashMap<String, Runtime>,
22}
23
24#[derive(Debug, thiserror::Error)]
25pub enum RtCreateError {
26 #[error("runtime config error: {}", _0)]
27 RuntimeConfigError(String),
28 #[error("runtime init error: {}", _0)]
29 RuntimeInitError(#[source] std::io::Error),
30}
31
32#[derive(Debug, thiserror::Error)]
33pub enum RtRunError {
34 #[error("request address error: {}", _0)]
35 RequestAddressError(
36 #[allow(private_interfaces)]
37 #[source]
38 RequestAddressError,
39 ),
40 #[error("container error: {}", _0)]
41 #[allow(private_interfaces)]
42 ContainerError(#[source] ContainerError),
43}
44
45impl Rt {
46 pub fn create(config: Mm1Config) -> Result<Self, RtCreateError> {
47 config
48 .validate()
49 .map_err(RtCreateError::RuntimeConfigError)?;
50 let (rt_default, rt_named) = config
51 .build_runtimes()
52 .map_err(RtCreateError::RuntimeInitError)?;
53
54 Ok(Self {
55 config,
56 rt_default,
57 rt_named,
58 })
59 }
60
61 pub fn run(&self, main_actor: BoxedRunnable<context::ActorContext>) -> Result<(), RtRunError> {
62 let config = self.config.clone();
63 let rt_default = self.rt_default.handle().to_owned();
64 let rt_named = self
65 .rt_named
66 .iter()
67 .map(|(k, v)| (k.to_owned(), v.handle().to_owned()))
68 .collect::<HashMap<_, _>>();
69
70 let main_actor_key = ActorKey::root().child(main_actor.func_name());
71 let main_actor_config = config.actor_config(&main_actor_key);
72 let rt_handle = if let Some(rt_key) = main_actor_config.runtime_key() {
73 rt_named
74 .get(rt_key)
75 .cloned()
76 .expect("the config's validity should have been checked")
77 } else {
78 rt_default.clone()
79 };
80
81 rt_handle.block_on(run_inner(
82 config.clone(),
83 main_actor_key,
84 main_actor_config,
85 rt_default,
86 rt_named,
87 main_actor,
88 ))
89 }
90}
91
92#[instrument(skip_all, fields(func = main_actor.func_name()))]
93async fn run_inner(
94 config: Mm1Config,
95 actor_key: ActorKey,
96 actor_config: impl EffectiveActorConfig,
97 rt_default: Handle,
98 rt_named: HashMap<String, Handle>,
99 main_actor: BoxedRunnable<context::ActorContext>,
100) -> Result<(), RtRunError> {
101 let rt_api = RtApi::create(config.subnet, rt_default, rt_named);
102
103 let subnet_lease = rt_api
104 .request_address(actor_config.netmask())
105 .await
106 .map_err(RtRunError::RequestAddressError)?;
107
108 let args = ContainerArgs {
109 ack_to: None,
110 link_to: Default::default(),
111 actor_key,
112
113 subnet_lease,
114 rt_api: rt_api.clone(),
115 rt_config: Arc::new(config),
116 };
117 let (_subnet_lease, exit_reason) = Container::create(args, main_actor)
118 .map_err(RtRunError::ContainerError)?
119 .run()
120 .await
121 .map_err(RtRunError::ContainerError)?;
122
123 if let Err(failure) = exit_reason {
124 warn!("main-actor failure: {}", failure);
125 }
126
127 Ok(())
128}