Skip to main content

hyperion_framework/containerisation/
hyperion_container.rs

1// -------------------------------------------------------------------------------------------------
2// Hyperion Framework
3// https://github.com/robert-hannah/hyperion-framework
4//
5// A lightweight component-based TCP framework for building service-oriented Rust applications with
6// CLI control, async messaging, and lifecycle management.
7//
8// Copyright 2025 Robert Hannah
9//
10// Licensed under the Apache License, Version 2.0 (the "License");
11// you may not use this file except in compliance with the License.
12// You may obtain a copy of the License at
13//
14//     http://www.apache.org/licenses/LICENSE-2.0
15//
16// Unless required by applicable law or agreed to in writing, software
17// distributed under the License is distributed on an "AS IS" BASIS,
18// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19// See the License for the specific language governing permissions and
20// limitations under the License.
21// -------------------------------------------------------------------------------------------------
22
23// Standard
24use std::fmt::Debug;
25use std::sync::Arc as StdArc;
26use std::sync::atomic::{AtomicUsize, Ordering};
27
28// Package
29use serde::{Serialize, de::DeserializeOwned};
30use tokio::sync::{Notify, mpsc};
31use tokio::task;
32use tokio::time::{Duration, sleep};
33
34// Local
35use crate::containerisation::client_broker::ClientBroker;
36use crate::containerisation::container_state::ContainerState;
37use crate::containerisation::traits::{HyperionContainerDirectiveMessage, Run};
38use crate::messages::client_broker_message::ClientBrokerMessage;
39use crate::messages::container_directive::ContainerDirective;
40use crate::utilities::tx_sender::add_to_tx_with_retry;
41
42// A is the HyperionContainer Component template - must implement Initialisable and Run traits
43// T is the primary message type
44#[allow(dead_code)]
45pub struct HyperionContainer<T> {
46    component_handle: task::JoinHandle<()>,
47    container_state: StdArc<AtomicUsize>,
48    container_state_notify: StdArc<Notify>,
49    client_broker: ClientBroker<T>,
50    component_in_tx: mpsc::Sender<T>,
51    component_out_rx: mpsc::Receiver<ClientBrokerMessage<T>>,
52    main_rx: mpsc::Receiver<T>,
53    server_rx: mpsc::Receiver<T>,
54}
55
56impl<T> HyperionContainer<T>
57where
58    T: HyperionContainerDirectiveMessage
59        + Debug
60        + Send
61        + 'static
62        + DeserializeOwned
63        + Sync
64        + Clone
65        + Serialize,
66{
67    pub fn create<A>(
68        // TODO: Use component_archetype for component restart? Can we store a clean one inside the container without enforcing clone?
69        component_archetype: A,
70        container_state: StdArc<AtomicUsize>,
71        container_state_notify: StdArc<Notify>,
72        client_broker: ClientBroker<T>,
73        main_rx: mpsc::Receiver<T>,
74        server_rx: mpsc::Receiver<T>,
75    ) -> Self
76    where
77        A: Run<Message = T> + Send + 'static + Sync + Debug,
78    {
79        log::info!("Starting Hyperion Container...");
80        let (component_in_tx, component_in_rx) = mpsc::channel::<T>(32);
81        let (component_out_tx, component_out_rx) = mpsc::channel::<ClientBrokerMessage<T>>(32);
82        let component_handle: task::JoinHandle<()> = HyperionContainer::start_component(
83            component_archetype,
84            component_in_rx,
85            component_out_tx,
86        );
87        Self {
88            component_handle,
89            container_state,
90            container_state_notify,
91            client_broker,
92            component_in_tx,
93            component_out_rx,
94            main_rx,
95            server_rx,
96        }
97    }
98
99    fn start_component<A>(
100        component_archetype: A,
101        component_in_rx: mpsc::Receiver<T>,
102        component_out_tx: mpsc::Sender<ClientBrokerMessage<T>>,
103    ) -> task::JoinHandle<()>
104    where
105        A: Run<Message = T> + Send + 'static + Sync + Debug,
106    {
107        let component_task: task::JoinHandle<()> = {
108            task::spawn(async move {
109                component_archetype
110                    .run(component_in_rx, component_out_tx)
111                    .await;
112            })
113        };
114        component_task
115    }
116
117    pub async fn run(&mut self) {
118        // HyperionContainer main loop
119        log::info!("Hyperion Container is running!");
120        loop {
121            // Check if Container is dying
122            let state = self.container_state.load(Ordering::SeqCst);
123            if state == ContainerState::ShuttingDown as usize
124                || state == ContainerState::DeadComponent as usize
125            {
126                log::info!("Container is shutting down...");
127                self.container_state
128                    .store(ContainerState::ShuttingDown as usize, Ordering::SeqCst);
129                self.container_state_notify.notify_waiters();
130
131                // Allow time for comms to stop etc before stoppping main.rs
132                sleep(Duration::from_secs(3)).await;
133                self.container_state
134                    .store(ContainerState::Closed as usize, Ordering::SeqCst);
135                self.container_state_notify.notify_waiters();
136                break;
137            }
138
139            // Check Component task handle
140            if self.component_handle.is_finished() {
141                log::warn!("Component task has finished unexpectedly.");
142                self.container_state
143                    .store(ContainerState::DeadComponent as usize, Ordering::SeqCst);
144                self.container_state_notify.notify_waiters();
145            }
146
147            // Process incoming and outgoing messages
148            tokio::select! {
149                Some(message) = self.main_rx.recv() => {                // Messages from command line
150                    log::trace!("Container received message form console: {message:?}");
151                    self.process_incoming_message(message).await;
152                }
153                Some(message) = self.server_rx.recv() => {              // Messages from Server
154                    log::trace!("Container received message form server: {message:?}");
155                    self.process_incoming_message(message).await;
156                }
157                Some(message) = self.component_out_rx.recv() => {       // Messages from Component
158                    log::trace!("Container received message form Component: {message:?}");
159                    self.client_broker.handle_message(message).await;
160                }
161            }
162        }
163    }
164
165    async fn process_incoming_message(&mut self, message: T)
166    where
167        T: HyperionContainerDirectiveMessage + Debug + Clone + Send + 'static,
168    {
169        if let Some(container_directive) = message.get_container_directive_message() {
170            match container_directive {
171                ContainerDirective::Shutdown => {
172                    log::info!("Container received shutdown directive");
173                    // Set shutdown state
174                    self.container_state
175                        .store(ContainerState::ShuttingDown as usize, Ordering::SeqCst);
176                    self.container_state_notify.notify_waiters();
177                }
178                ContainerDirective::SystemShutdown => {
179                    log::info!("Container received system shutdown directive");
180                    // Forward shutdown message
181                    self.client_broker.forward_shutdown(message.clone()).await;
182                    // Set shutdown state
183                    self.container_state
184                        .store(ContainerState::ShuttingDown as usize, Ordering::SeqCst);
185                    self.container_state_notify.notify_waiters();
186                    // Wait for clients to finish
187                    self.client_broker.shutdown().await;
188                }
189                _ => {
190                    log::warn!("Container received unmapped directive: {container_directive:?}");
191                }
192            }
193        } else {
194            // Send to component without inspection (ComponentDirective or non-generic message)
195            log::trace!("Forwarding non-framework message: {message:?}");
196            add_to_tx_with_retry(
197                &self.component_in_tx,
198                &message,
199                "Container main loop",
200                "Component main loop",
201            )
202            .await;
203        }
204    }
205}