hyperion_framework/containerisation/
hyperion_container.rs

1// -------------------------------------------------------------------------------------------------
2// Hyperion Framework
3// https://github.com/Bazzz-1/hyperion-framework
4//
5// A lightweight component-based TCP framework for building service-oriented Rust applications with
6// CLI control, async messaging, and lifecycle management.
7//
8// Copyright 2025 Robert Hannah
9//
10// Licensed under the Apache License, Version 2.0 (the "License");
11// you may not use this file except in compliance with the License.
12// You may obtain a copy of the License at
13//
14//     http://www.apache.org/licenses/LICENSE-2.0
15//
16// Unless required by applicable law or agreed to in writing, software
17// distributed under the License is distributed on an "AS IS" BASIS,
18// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19// See the License for the specific language governing permissions and
20// limitations under the License.
21// -------------------------------------------------------------------------------------------------
22
23// Standard
24use std::fmt::Debug;
25use std::sync::Arc as StdArc;
26use std::sync::atomic::{AtomicUsize, Ordering};
27
28// Package
29use serde::{Serialize, de::DeserializeOwned};
30use tokio::sync::{Notify, mpsc};
31use tokio::task;
32use tokio::time::{Duration, sleep};
33
34// Local
35use crate::containerisation::client_broker::ClientBroker;
36use crate::containerisation::container_state::ContainerState;
37use crate::containerisation::traits::{HyperionContainerDirectiveMessage, Run};
38use crate::messages::client_broker_message::ClientBrokerMessage;
39use crate::messages::container_directive::ContainerDirective;
40use crate::utilities::tx_sender::add_to_tx_with_retry;
41
42// A is the HyperionContainer Component template - must implement Initialisable and Run traits
43// T is the primary message type
44#[allow(dead_code)]
45pub struct HyperionContainer<A, T> {
46    component_archetype: A, // Can be used to restart the component task
47    component_handle: task::JoinHandle<()>,
48    container_state: StdArc<AtomicUsize>,
49    container_state_notify: StdArc<Notify>,
50    client_broker: ClientBroker<T>,
51    component_in_tx: mpsc::Sender<T>,
52    component_out_rx: mpsc::Receiver<ClientBrokerMessage<T>>,
53    main_rx: mpsc::Receiver<T>,
54    server_rx: mpsc::Receiver<T>,
55}
56
57// TODO: Implementations need separated out as not all implement all template types
58impl<A, T> HyperionContainer<A, T>
59where
60    A: Run<Message = T> + Send + 'static + Sync + Clone + Debug,
61    T: HyperionContainerDirectiveMessage
62        + Debug
63        + Send
64        + 'static
65        + DeserializeOwned
66        + Sync
67        + Clone
68        + Serialize,
69{
70    pub fn create(
71        component_archetype: A,
72        container_state: StdArc<AtomicUsize>,
73        container_state_notify: StdArc<Notify>,
74        client_broker: ClientBroker<T>,
75        main_rx: mpsc::Receiver<T>,
76        server_rx: mpsc::Receiver<T>,
77    ) -> Self {
78        log::info!("Starting Hyperion Container...");
79        let (component_in_tx, component_in_rx) = mpsc::channel::<T>(32);
80        let (component_out_tx, component_out_rx) = mpsc::channel::<ClientBrokerMessage<T>>(32);
81        let component_handle: task::JoinHandle<()> = HyperionContainer::start_component(
82            component_archetype.clone(),
83            component_in_rx,
84            component_out_tx,
85        );
86        Self {
87            component_archetype,
88            component_handle,
89            container_state,
90            container_state_notify,
91            client_broker,
92            component_in_tx,
93            component_out_rx,
94            main_rx,
95            server_rx,
96        }
97    }
98
99    fn start_component(
100        component_archetype: A,
101        component_in_rx: mpsc::Receiver<T>,
102        component_out_tx: mpsc::Sender<ClientBrokerMessage<T>>,
103    ) -> task::JoinHandle<()> {
104        let component = component_archetype.clone();
105        let component_task: task::JoinHandle<()> = {
106            task::spawn(async move {
107                component.run(component_in_rx, component_out_tx).await;
108            })
109        };
110        component_task
111    }
112
113    pub async fn run(&mut self) {
114        // HyperionContainer main loop
115        log::info!("Hyperion Container is running!");
116        loop {
117            // Check if Container is dying
118            let state = self.container_state.load(Ordering::SeqCst);
119            if state == ContainerState::ShuttingDown as usize
120                || state == ContainerState::DeadComponent as usize
121            {
122                log::info!("Container is shutting down...");
123                self.container_state
124                    .store(ContainerState::ShuttingDown as usize, Ordering::SeqCst);
125                self.container_state_notify.notify_waiters();
126
127                // Allow time for comms to stop etc before stoppping main.rs
128                sleep(Duration::from_secs(3)).await;
129                self.container_state
130                    .store(ContainerState::Closed as usize, Ordering::SeqCst);
131                self.container_state_notify.notify_waiters();
132                break;
133            }
134
135            // Check Component task handle
136            if self.component_handle.is_finished() {
137                log::warn!("Component task has finished unexpectedly.");
138                self.container_state
139                    .store(ContainerState::DeadComponent as usize, Ordering::SeqCst);
140                self.container_state_notify.notify_waiters();
141            }
142
143            // Process incoming and outgoing messages
144            tokio::select! {
145                Some(message) = self.main_rx.recv() => {                // Messages from command line
146                    log::trace!("Container received message form console: {message:?}");
147                    self.process_incoming_message(message).await;
148                }
149                Some(message) = self.server_rx.recv() => {              // Messages from Server
150                    log::trace!("Container received message form server: {message:?}");
151                    self.process_incoming_message(message).await;
152                }
153                Some(message) = self.component_out_rx.recv() => {       // Messages from Component
154                    log::trace!("Container received message form Component: {message:?}");
155                    self.client_broker.handle_message(message).await;
156                }
157            }
158        }
159    }
160
161    async fn process_incoming_message(&mut self, message: T)
162    where
163        T: HyperionContainerDirectiveMessage + Debug + Clone + Send + 'static,
164    {
165        if let Some(container_directive) = message.get_container_directive_message() {
166            match container_directive {
167                ContainerDirective::Shutdown => {
168                    log::info!("Container received shutdown directive");
169                    // Set shutdown state
170                    self.container_state
171                        .store(ContainerState::ShuttingDown as usize, Ordering::SeqCst);
172                    self.container_state_notify.notify_waiters();
173                }
174                ContainerDirective::SystemShutdown => {
175                    log::info!("Container received system shutdown directive");
176                    // Forward shutdown message
177                    self.client_broker.forward_shutdown(message.clone()).await;
178                    // Set shutdown state
179                    self.container_state
180                        .store(ContainerState::ShuttingDown as usize, Ordering::SeqCst);
181                    self.container_state_notify.notify_waiters();
182                    // Wait for clients to finish
183                    self.client_broker.shutdown().await;
184                }
185                _ => {
186                    log::warn!("Container received unmapped directive: {container_directive:?}");
187                }
188            }
189        } else {
190            // Send to component without inspection (ComponentDirective or non-generic message)
191            log::trace!("Forwarding non-framework message: {message:?}");
192            add_to_tx_with_retry(
193                &self.component_in_tx,
194                &message,
195                "Container main loop",
196                "Component main loop",
197            )
198            .await;
199        }
200    }
201}