hyperion_framework/containerisation/
hyperion_container.rs1use std::fmt::Debug;
25use std::sync::Arc as StdArc;
26use std::sync::atomic::{AtomicUsize, Ordering};
27
28use serde::{Serialize, de::DeserializeOwned};
30use tokio::sync::{Notify, mpsc};
31use tokio::task;
32use tokio::time::{Duration, sleep};
33
34use crate::containerisation::client_broker::ClientBroker;
36use crate::containerisation::container_state::ContainerState;
37use crate::containerisation::traits::{HyperionContainerDirectiveMessage, Run};
38use crate::messages::client_broker_message::ClientBrokerMessage;
39use crate::messages::container_directive::ContainerDirective;
40use crate::utilities::tx_sender::add_to_tx_with_retry;
41
42#[allow(dead_code)]
45pub struct HyperionContainer<T> {
46 component_handle: task::JoinHandle<()>,
47 container_state: StdArc<AtomicUsize>,
48 container_state_notify: StdArc<Notify>,
49 client_broker: ClientBroker<T>,
50 component_in_tx: mpsc::Sender<T>,
51 component_out_rx: mpsc::Receiver<ClientBrokerMessage<T>>,
52 main_rx: mpsc::Receiver<T>,
53 server_rx: mpsc::Receiver<T>,
54}
55
56impl<T> HyperionContainer<T>
57where
58 T: HyperionContainerDirectiveMessage
59 + Debug
60 + Send
61 + 'static
62 + DeserializeOwned
63 + Sync
64 + Clone
65 + Serialize,
66{
67 pub fn create<A>(
68 component_archetype: A,
70 container_state: StdArc<AtomicUsize>,
71 container_state_notify: StdArc<Notify>,
72 client_broker: ClientBroker<T>,
73 main_rx: mpsc::Receiver<T>,
74 server_rx: mpsc::Receiver<T>,
75 ) -> Self
76 where
77 A: Run<Message = T> + Send + 'static + Sync + Debug,
78 {
79 log::info!("Starting Hyperion Container...");
80 let (component_in_tx, component_in_rx) = mpsc::channel::<T>(32);
81 let (component_out_tx, component_out_rx) = mpsc::channel::<ClientBrokerMessage<T>>(32);
82 let component_handle: task::JoinHandle<()> = HyperionContainer::start_component(
83 component_archetype,
84 component_in_rx,
85 component_out_tx,
86 );
87 Self {
88 component_handle,
89 container_state,
90 container_state_notify,
91 client_broker,
92 component_in_tx,
93 component_out_rx,
94 main_rx,
95 server_rx,
96 }
97 }
98
99 fn start_component<A>(
100 component_archetype: A,
101 component_in_rx: mpsc::Receiver<T>,
102 component_out_tx: mpsc::Sender<ClientBrokerMessage<T>>,
103 ) -> task::JoinHandle<()>
104 where
105 A: Run<Message = T> + Send + 'static + Sync + Debug,
106 {
107 let component_task: task::JoinHandle<()> = {
108 task::spawn(async move {
109 component_archetype
110 .run(component_in_rx, component_out_tx)
111 .await;
112 })
113 };
114 component_task
115 }
116
117 pub async fn run(&mut self) {
118 log::info!("Hyperion Container is running!");
120 loop {
121 let state = self.container_state.load(Ordering::SeqCst);
123 if state == ContainerState::ShuttingDown as usize
124 || state == ContainerState::DeadComponent as usize
125 {
126 log::info!("Container is shutting down...");
127 self.container_state
128 .store(ContainerState::ShuttingDown as usize, Ordering::SeqCst);
129 self.container_state_notify.notify_waiters();
130
131 sleep(Duration::from_secs(3)).await;
133 self.container_state
134 .store(ContainerState::Closed as usize, Ordering::SeqCst);
135 self.container_state_notify.notify_waiters();
136 break;
137 }
138
139 if self.component_handle.is_finished() {
141 log::warn!("Component task has finished unexpectedly.");
142 self.container_state
143 .store(ContainerState::DeadComponent as usize, Ordering::SeqCst);
144 self.container_state_notify.notify_waiters();
145 }
146
147 tokio::select! {
149 Some(message) = self.main_rx.recv() => { log::trace!("Container received message form console: {message:?}");
151 self.process_incoming_message(message).await;
152 }
153 Some(message) = self.server_rx.recv() => { log::trace!("Container received message form server: {message:?}");
155 self.process_incoming_message(message).await;
156 }
157 Some(message) = self.component_out_rx.recv() => { log::trace!("Container received message form Component: {message:?}");
159 self.client_broker.handle_message(message).await;
160 }
161 }
162 }
163 }
164
165 async fn process_incoming_message(&mut self, message: T)
166 where
167 T: HyperionContainerDirectiveMessage + Debug + Clone + Send + 'static,
168 {
169 if let Some(container_directive) = message.get_container_directive_message() {
170 match container_directive {
171 ContainerDirective::Shutdown => {
172 log::info!("Container received shutdown directive");
173 self.container_state
175 .store(ContainerState::ShuttingDown as usize, Ordering::SeqCst);
176 self.container_state_notify.notify_waiters();
177 }
178 ContainerDirective::SystemShutdown => {
179 log::info!("Container received system shutdown directive");
180 self.client_broker.forward_shutdown(message.clone()).await;
182 self.container_state
184 .store(ContainerState::ShuttingDown as usize, Ordering::SeqCst);
185 self.container_state_notify.notify_waiters();
186 self.client_broker.shutdown().await;
188 }
189 _ => {
190 log::warn!("Container received unmapped directive: {container_directive:?}");
191 }
192 }
193 } else {
194 log::trace!("Forwarding non-framework message: {message:?}");
196 add_to_tx_with_retry(
197 &self.component_in_tx,
198 &message,
199 "Container main loop",
200 "Component main loop",
201 )
202 .await;
203 }
204 }
205}