hyperstack_server/
runtime.rs

1use crate::bus::BusManager;
2use crate::config::ServerConfig;
3use crate::health::HealthMonitor;
4use crate::http_health::HttpHealthServer;
5use crate::projector::Projector;
6use crate::view::ViewIndex;
7use crate::websocket::WebSocketServer;
8use crate::Spec;
9use anyhow::Result;
10use std::sync::Arc;
11use tokio::sync::mpsc;
12use tracing::{error, info};
13
14#[cfg(feature = "otel")]
15use crate::metrics::Metrics;
16
17/// Wait for shutdown signal (SIGINT on all platforms, SIGTERM on Unix)
18async fn shutdown_signal() {
19    let ctrl_c = async {
20        tokio::signal::ctrl_c()
21            .await
22            .expect("Failed to install Ctrl+C handler");
23    };
24
25    #[cfg(unix)]
26    let terminate = async {
27        tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
28            .expect("Failed to install SIGTERM handler")
29            .recv()
30            .await;
31    };
32
33    #[cfg(not(unix))]
34    let terminate = std::future::pending::<()>();
35
36    tokio::select! {
37        _ = ctrl_c => {
38            info!("Received SIGINT (Ctrl+C), initiating shutdown");
39        }
40        _ = terminate => {
41            info!("Received SIGTERM, initiating graceful shutdown");
42        }
43    }
44}
45
46/// Runtime orchestrator that manages all server components
47pub struct Runtime {
48    config: ServerConfig,
49    view_index: Arc<ViewIndex>,
50    spec: Option<Spec>,
51    #[cfg(feature = "otel")]
52    metrics: Option<Arc<Metrics>>,
53}
54
55impl Runtime {
56    #[cfg(feature = "otel")]
57    pub fn new(config: ServerConfig, view_index: ViewIndex, metrics: Option<Arc<Metrics>>) -> Self {
58        Self {
59            config,
60            view_index: Arc::new(view_index),
61            spec: None,
62            metrics,
63        }
64    }
65
66    #[cfg(not(feature = "otel"))]
67    pub fn new(config: ServerConfig, view_index: ViewIndex) -> Self {
68        Self {
69            config,
70            view_index: Arc::new(view_index),
71            spec: None,
72        }
73    }
74
75    pub fn with_spec(mut self, spec: Spec) -> Self {
76        self.spec = Some(spec);
77        self
78    }
79
80    pub async fn run(self) -> Result<()> {
81        info!("Starting HyperStack runtime");
82
83        // Create bounded mutations channel for Transform Library -> Projector communication
84        let (mutations_tx, mutations_rx) =
85            mpsc::channel::<smallvec::SmallVec<[hyperstack_interpreter::Mutation; 6]>>(1024);
86
87        let bus_manager = BusManager::new();
88
89        // Initialize health monitor if configured
90        let health_monitor = if let Some(health_config) = &self.config.health {
91            let monitor = HealthMonitor::new(health_config.clone());
92            let _health_task = monitor.start().await;
93            info!("Health monitoring enabled");
94            Some(monitor)
95        } else {
96            None
97        };
98
99        // Start Projector task
100        #[cfg(feature = "otel")]
101        let projector = Projector::new(
102            self.view_index.clone(),
103            bus_manager.clone(),
104            mutations_rx,
105            self.metrics.clone(),
106        );
107        #[cfg(not(feature = "otel"))]
108        let projector = Projector::new(self.view_index.clone(), bus_manager.clone(), mutations_rx);
109
110        let projector_handle = tokio::spawn(async move {
111            projector.run().await;
112        });
113
114        // Start WebSocket server (if configured)
115        let ws_handle = if let Some(ws_config) = &self.config.websocket {
116            #[cfg(feature = "otel")]
117            let ws_server = WebSocketServer::new(
118                ws_config.bind_address,
119                bus_manager.clone(),
120                self.view_index.clone(),
121                self.metrics.clone(),
122            );
123            #[cfg(not(feature = "otel"))]
124            let ws_server = WebSocketServer::new(
125                ws_config.bind_address,
126                bus_manager.clone(),
127                self.view_index.clone(),
128            );
129
130            Some(tokio::spawn(async move {
131                if let Err(e) = ws_server.start().await {
132                    error!("WebSocket server error: {}", e);
133                }
134            }))
135        } else {
136            None
137        };
138
139        // Start Vixen parser/stream consumer (if spec with parser setup is provided)
140        let parser_handle = if let Some(spec) = self.spec {
141            if let Some(parser_setup) = spec.parser_setup {
142                info!(
143                    "Starting Vixen parser runtime for program: {}",
144                    spec.program_id
145                );
146                let tx = mutations_tx.clone();
147                let health = health_monitor.clone();
148                Some(tokio::spawn(async move {
149                    if let Err(e) = parser_setup(tx, health).await {
150                        error!("Vixen parser runtime error: {}", e);
151                    }
152                }))
153            } else {
154                info!("Spec provided but no parser_setup configured - skipping Vixen runtime");
155                None
156            }
157        } else {
158            info!("No spec provided - running in websocket-only mode");
159            None
160        };
161
162        // Start HTTP health server (if configured)
163        let http_health_handle = if let Some(http_health_config) = &self.config.http_health {
164            let mut http_server = HttpHealthServer::new(http_health_config.bind_address);
165            if let Some(monitor) = health_monitor.clone() {
166                http_server = http_server.with_health_monitor(monitor);
167            }
168
169            Some(tokio::spawn(async move {
170                if let Err(e) = http_server.start().await {
171                    error!("HTTP health server error: {}", e);
172                }
173            }))
174        } else {
175            None
176        };
177
178        info!("HyperStack runtime is running. Press Ctrl+C to stop.");
179
180        // Wait for any task to complete (or handle shutdown signals)
181        tokio::select! {
182            _ = async {
183                if let Some(handle) = ws_handle {
184                    handle.await
185                } else {
186                    std::future::pending().await
187                }
188            } => {
189                info!("WebSocket server task completed");
190            }
191            _ = projector_handle => {
192                info!("Projector task completed");
193            }
194            _ = async {
195                if let Some(handle) = parser_handle {
196                    handle.await
197                } else {
198                    std::future::pending().await
199                }
200            } => {
201                info!("Parser runtime task completed");
202            }
203            _ = async {
204                if let Some(handle) = http_health_handle {
205                    handle.await
206                } else {
207                    std::future::pending().await
208                }
209            } => {
210                info!("HTTP health server task completed");
211            }
212            _ = shutdown_signal() => {}
213        }
214
215        info!("Shutting down HyperStack runtime");
216        Ok(())
217    }
218}