hyperion_framework/containerisation/
hyperion_container_factory.rs1use std::fmt::Debug;
25use std::fs;
26use std::path::PathBuf;
27use std::str::FromStr;
28use std::sync::{Arc as StdArc, atomic::AtomicUsize};
29
30use log::LevelFilter;
32use serde::{Serialize, de::DeserializeOwned};
33use tokio::sync::{Notify, mpsc};
34use tokio::task;
35use tokio::time::{Duration, sleep};
36
37use crate::containerisation::client_broker::ClientBroker;
39use crate::containerisation::hyperion_container::HyperionContainer;
40use crate::containerisation::traits::{
41 ContainerIdentidy, HyperionContainerDirectiveMessage, Initialisable, LogLevel, Run,
42};
43use crate::logging::logging_service::initialise_logger;
44use crate::network::network_topology::NetworkTopology;
45use crate::network::server::Server;
46use crate::utilities::load_config;
47
48pub async fn create<A, C, T>(
52 config_path_str: &str,
53 network_topology_path_str: &str,
54 container_state: StdArc<AtomicUsize>,
55 container_state_notify: StdArc<Notify>,
56 main_rx: mpsc::Receiver<T>,
57) -> HyperionContainer<T>
58where
59 A: Initialisable<ConfigType = C> + Run<Message = T> + Send + 'static + Sync + Debug,
60 C: Debug + Send + 'static + DeserializeOwned + Sync + LogLevel + ContainerIdentidy,
61 T: HyperionContainerDirectiveMessage
62 + Debug
63 + Send
64 + 'static
65 + DeserializeOwned
66 + Sync
67 + Clone
68 + Serialize,
69{
70 let config_path: PathBuf = fs::canonicalize(config_path_str)
72 .unwrap_or_else(|e| panic!("Could not canonicalize '{config_path_str}': {e}"));
73 let component_config: StdArc<C> = load_config::load_config::<C>(&config_path)
74 .unwrap_or_else(|e| panic!("Failed to load component config from '{config_path:?}': {e}"));
75 let network_topology_path: PathBuf = fs::canonicalize(network_topology_path_str)
76 .unwrap_or_else(|e| panic!("Could not canonicalize '{network_topology_path_str}': {e}"));
77 let network_topology: StdArc<NetworkTopology> =
78 load_config::load_config::<NetworkTopology>(&network_topology_path).unwrap_or_else(|e| {
79 panic!("Failed to load network topology from '{network_topology_path:?}': {e}")
80 });
81
82 let log_level: LevelFilter = LevelFilter::from_str(component_config.log_level())
84 .unwrap_or_else(|e| {
85 println!("Log level was not parsed correctly: {e:?}\nDefaulting to 'Trace' log level.");
87 LevelFilter::Trace
88 });
89 initialise_logger(log_level).unwrap_or_else(|e| panic!("Failed to initialise logger: {e:?}"));
90
91 for (key, value) in component_config.container_identity().iter() {
93 log::debug!("{key}: {value}");
94 }
95
96 log::info!(
98 "Building Hyperion Container for {}...",
99 component_config
100 .container_identity()
101 .get("name")
102 .unwrap_or(&"Unknown".to_string())
103 );
104
105 let component_archetype = A::initialise(
107 container_state.clone(),
108 container_state_notify.clone(),
109 component_config.clone(),
110 );
111
112 let (server_tx, server_rx) = mpsc::channel::<T>(32);
114 let arc_server: StdArc<Server<T>> = Server::new(
115 network_topology.server_address.clone(),
116 server_tx,
117 container_state.clone(),
118 container_state_notify.clone(),
119 );
120 task::spawn(async move {
121 if let Err(e) = Server::run(arc_server).await {
123 log::error!("Server encountered an error: {e:?}");
124 }
125 });
126
127 sleep(Duration::from_secs(2)).await;
129
130 let client_broker: ClientBroker<T> = ClientBroker::init(
132 network_topology,
133 container_state.clone(),
134 container_state_notify.clone(),
135 );
136
137 sleep(Duration::from_secs(2)).await;
139
140 HyperionContainer::<T>::create(
142 component_archetype,
143 container_state,
144 container_state_notify,
145 client_broker,
146 main_rx,
147 server_rx,
148 )
149}