hyperstack_server/
runtime.rs1use crate::bus::BusManager;
2use crate::config::ServerConfig;
3use crate::health::HealthMonitor;
4use crate::http_health::HttpHealthServer;
5use crate::projector::Projector;
6use crate::view::ViewIndex;
7use crate::websocket::WebSocketServer;
8use crate::Spec;
9use anyhow::Result;
10use std::sync::Arc;
11use tokio::sync::mpsc;
12use tracing::{error, info};
13
14#[cfg(feature = "otel")]
15use crate::metrics::Metrics;
16
17async fn shutdown_signal() {
19 let ctrl_c = async {
20 tokio::signal::ctrl_c()
21 .await
22 .expect("Failed to install Ctrl+C handler");
23 };
24
25 #[cfg(unix)]
26 let terminate = async {
27 tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
28 .expect("Failed to install SIGTERM handler")
29 .recv()
30 .await;
31 };
32
33 #[cfg(not(unix))]
34 let terminate = std::future::pending::<()>();
35
36 tokio::select! {
37 _ = ctrl_c => {
38 info!("Received SIGINT (Ctrl+C), initiating shutdown");
39 }
40 _ = terminate => {
41 info!("Received SIGTERM, initiating graceful shutdown");
42 }
43 }
44}
45
46pub struct Runtime {
48 config: ServerConfig,
49 view_index: Arc<ViewIndex>,
50 spec: Option<Spec>,
51 #[cfg(feature = "otel")]
52 metrics: Option<Arc<Metrics>>,
53}
54
55impl Runtime {
56 #[cfg(feature = "otel")]
57 pub fn new(config: ServerConfig, view_index: ViewIndex, metrics: Option<Arc<Metrics>>) -> Self {
58 Self {
59 config,
60 view_index: Arc::new(view_index),
61 spec: None,
62 metrics,
63 }
64 }
65
66 #[cfg(not(feature = "otel"))]
67 pub fn new(config: ServerConfig, view_index: ViewIndex) -> Self {
68 Self {
69 config,
70 view_index: Arc::new(view_index),
71 spec: None,
72 }
73 }
74
75 pub fn with_spec(mut self, spec: Spec) -> Self {
76 self.spec = Some(spec);
77 self
78 }
79
80 pub async fn run(self) -> Result<()> {
81 info!("Starting HyperStack runtime");
82
83 let (mutations_tx, mutations_rx) =
85 mpsc::channel::<smallvec::SmallVec<[hyperstack_interpreter::Mutation; 6]>>(1024);
86
87 let bus_manager = BusManager::new();
88
89 let health_monitor = if let Some(health_config) = &self.config.health {
91 let monitor = HealthMonitor::new(health_config.clone());
92 let _health_task = monitor.start().await;
93 info!("Health monitoring enabled");
94 Some(monitor)
95 } else {
96 None
97 };
98
99 #[cfg(feature = "otel")]
101 let projector = Projector::new(
102 self.view_index.clone(),
103 bus_manager.clone(),
104 mutations_rx,
105 self.metrics.clone(),
106 );
107 #[cfg(not(feature = "otel"))]
108 let projector = Projector::new(self.view_index.clone(), bus_manager.clone(), mutations_rx);
109
110 let projector_handle = tokio::spawn(async move {
111 projector.run().await;
112 });
113
114 let ws_handle = if let Some(ws_config) = &self.config.websocket {
116 #[cfg(feature = "otel")]
117 let ws_server = WebSocketServer::new(
118 ws_config.bind_address,
119 bus_manager.clone(),
120 self.view_index.clone(),
121 self.metrics.clone(),
122 );
123 #[cfg(not(feature = "otel"))]
124 let ws_server = WebSocketServer::new(
125 ws_config.bind_address,
126 bus_manager.clone(),
127 self.view_index.clone(),
128 );
129
130 Some(tokio::spawn(async move {
131 if let Err(e) = ws_server.start().await {
132 error!("WebSocket server error: {}", e);
133 }
134 }))
135 } else {
136 None
137 };
138
139 let parser_handle = if let Some(spec) = self.spec {
141 if let Some(parser_setup) = spec.parser_setup {
142 info!(
143 "Starting Vixen parser runtime for program: {}",
144 spec.program_id
145 );
146 let tx = mutations_tx.clone();
147 let health = health_monitor.clone();
148 Some(tokio::spawn(async move {
149 if let Err(e) = parser_setup(tx, health).await {
150 error!("Vixen parser runtime error: {}", e);
151 }
152 }))
153 } else {
154 info!("Spec provided but no parser_setup configured - skipping Vixen runtime");
155 None
156 }
157 } else {
158 info!("No spec provided - running in websocket-only mode");
159 None
160 };
161
162 let http_health_handle = if let Some(http_health_config) = &self.config.http_health {
164 let mut http_server = HttpHealthServer::new(http_health_config.bind_address);
165 if let Some(monitor) = health_monitor.clone() {
166 http_server = http_server.with_health_monitor(monitor);
167 }
168
169 Some(tokio::spawn(async move {
170 if let Err(e) = http_server.start().await {
171 error!("HTTP health server error: {}", e);
172 }
173 }))
174 } else {
175 None
176 };
177
178 info!("HyperStack runtime is running. Press Ctrl+C to stop.");
179
180 tokio::select! {
182 _ = async {
183 if let Some(handle) = ws_handle {
184 handle.await
185 } else {
186 std::future::pending().await
187 }
188 } => {
189 info!("WebSocket server task completed");
190 }
191 _ = projector_handle => {
192 info!("Projector task completed");
193 }
194 _ = async {
195 if let Some(handle) = parser_handle {
196 handle.await
197 } else {
198 std::future::pending().await
199 }
200 } => {
201 info!("Parser runtime task completed");
202 }
203 _ = async {
204 if let Some(handle) = http_health_handle {
205 handle.await
206 } else {
207 std::future::pending().await
208 }
209 } => {
210 info!("HTTP health server task completed");
211 }
212 _ = shutdown_signal() => {}
213 }
214
215 info!("Shutting down HyperStack runtime");
216 Ok(())
217 }
218}