hyperion_framework/containerisation/
hyperion_container.rs1use std::fmt::Debug;
25use std::sync::Arc as StdArc;
26use std::sync::atomic::{AtomicUsize, Ordering};
27
28use serde::{Serialize, de::DeserializeOwned};
30use tokio::sync::{Notify, mpsc};
31use tokio::task;
32use tokio::time::{Duration, sleep};
33
34use crate::containerisation::client_broker::ClientBroker;
36use crate::containerisation::container_state::ContainerState;
37use crate::containerisation::traits::{HyperionContainerDirectiveMessage, Run};
38use crate::messages::client_broker_message::ClientBrokerMessage;
39use crate::messages::container_directive::ContainerDirective;
40use crate::utilities::tx_sender::add_to_tx_with_retry;
41
42#[allow(dead_code)]
45pub struct HyperionContainer<A, T> {
46 component_archetype: A, component_handle: task::JoinHandle<()>,
48 container_state: StdArc<AtomicUsize>,
49 container_state_notify: StdArc<Notify>,
50 client_broker: ClientBroker<T>,
51 component_in_tx: mpsc::Sender<T>,
52 component_out_rx: mpsc::Receiver<ClientBrokerMessage<T>>,
53 main_rx: mpsc::Receiver<T>,
54 server_rx: mpsc::Receiver<T>,
55}
56
57impl<A, T> HyperionContainer<A, T>
59where
60 A: Run<Message = T> + Send + 'static + Sync + Clone + Debug,
61 T: HyperionContainerDirectiveMessage
62 + Debug
63 + Send
64 + 'static
65 + DeserializeOwned
66 + Sync
67 + Clone
68 + Serialize,
69{
70 pub fn create(
71 component_archetype: A,
72 container_state: StdArc<AtomicUsize>,
73 container_state_notify: StdArc<Notify>,
74 client_broker: ClientBroker<T>,
75 main_rx: mpsc::Receiver<T>,
76 server_rx: mpsc::Receiver<T>,
77 ) -> Self {
78 log::info!("Starting Hyperion Container...");
79 let (component_in_tx, component_in_rx) = mpsc::channel::<T>(32);
80 let (component_out_tx, component_out_rx) = mpsc::channel::<ClientBrokerMessage<T>>(32);
81 let component_handle: task::JoinHandle<()> = HyperionContainer::start_component(
82 component_archetype.clone(),
83 component_in_rx,
84 component_out_tx,
85 );
86 Self {
87 component_archetype,
88 component_handle,
89 container_state,
90 container_state_notify,
91 client_broker,
92 component_in_tx,
93 component_out_rx,
94 main_rx,
95 server_rx,
96 }
97 }
98
99 fn start_component(
100 component_archetype: A,
101 component_in_rx: mpsc::Receiver<T>,
102 component_out_tx: mpsc::Sender<ClientBrokerMessage<T>>,
103 ) -> task::JoinHandle<()> {
104 let component = component_archetype.clone();
105 let component_task: task::JoinHandle<()> = {
106 task::spawn(async move {
107 component.run(component_in_rx, component_out_tx).await;
108 })
109 };
110 component_task
111 }
112
113 pub async fn run(&mut self) {
114 log::info!("Hyperion Container is running!");
116 loop {
117 let state = self.container_state.load(Ordering::SeqCst);
119 if state == ContainerState::ShuttingDown as usize
120 || state == ContainerState::DeadComponent as usize
121 {
122 log::info!("Container is shutting down...");
123 self.container_state
124 .store(ContainerState::ShuttingDown as usize, Ordering::SeqCst);
125 self.container_state_notify.notify_waiters();
126
127 sleep(Duration::from_secs(3)).await;
129 self.container_state
130 .store(ContainerState::Closed as usize, Ordering::SeqCst);
131 self.container_state_notify.notify_waiters();
132 break;
133 }
134
135 if self.component_handle.is_finished() {
137 log::warn!("Component task has finished unexpectedly.");
138 self.container_state
139 .store(ContainerState::DeadComponent as usize, Ordering::SeqCst);
140 self.container_state_notify.notify_waiters();
141 }
142
143 tokio::select! {
145 Some(message) = self.main_rx.recv() => { log::trace!("Container received message form console: {message:?}");
147 self.process_incoming_message(message).await;
148 }
149 Some(message) = self.server_rx.recv() => { log::trace!("Container received message form server: {message:?}");
151 self.process_incoming_message(message).await;
152 }
153 Some(message) = self.component_out_rx.recv() => { log::trace!("Container received message form Component: {message:?}");
155 self.client_broker.handle_message(message).await;
156 }
157 }
158 }
159 }
160
161 async fn process_incoming_message(&mut self, message: T)
162 where
163 T: HyperionContainerDirectiveMessage + Debug + Clone + Send + 'static,
164 {
165 if let Some(container_directive) = message.get_container_directive_message() {
166 match container_directive {
167 ContainerDirective::Shutdown => {
168 log::info!("Container received shutdown directive");
169 self.container_state
171 .store(ContainerState::ShuttingDown as usize, Ordering::SeqCst);
172 self.container_state_notify.notify_waiters();
173 }
174 ContainerDirective::SystemShutdown => {
175 log::info!("Container received system shutdown directive");
176 self.client_broker.forward_shutdown(message.clone()).await;
178 self.container_state
180 .store(ContainerState::ShuttingDown as usize, Ordering::SeqCst);
181 self.container_state_notify.notify_waiters();
182 self.client_broker.shutdown().await;
184 }
185 _ => {
186 log::warn!("Container received unmapped directive: {container_directive:?}");
187 }
188 }
189 } else {
190 log::trace!("Forwarding non-framework message: {message:?}");
192 add_to_tx_with_retry(
193 &self.component_in_tx,
194 &message,
195 "Container main loop",
196 "Component main loop",
197 )
198 .await;
199 }
200 }
201}