// hyperstack_server/runtime.rs

use crate::bus::BusManager;
use crate::cache::EntityCache;
use crate::config::ServerConfig;
use crate::health::HealthMonitor;
use crate::http_health::HttpHealthServer;
use crate::materialized_view::MaterializedViewRegistry;
use crate::mutation_batch::MutationBatch;
use crate::projector::Projector;
use crate::view::ViewIndex;
use crate::websocket::client_manager::RateLimitConfig;
use crate::websocket::WebSocketServer;
use crate::Spec;
use crate::WebSocketAuthPlugin;
use crate::WebSocketUsageEmitter;
use anyhow::Result;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use tracing::{error, info, info_span, Instrument};

#[cfg(feature = "otel")]
use crate::metrics::Metrics;

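/// Resolves when the process receives SIGINT (Ctrl+C) or, on Unix, SIGTERM.
/// On non-Unix platforms only Ctrl+C is handled.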
async fn shutdown_signal() {
    let ctrl_c = async {
        tokio::signal::ctrl_c()
            .await
            .expect("Failed to install Ctrl+C handler");
    };

    #[cfg(unix)]
    let terminate = async {
        tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
            .expect("Failed to install SIGTERM handler")
            .recv()
            .await;
    };

    #[cfg(not(unix))]
    let terminate = std::future::pending::<()>();

    tokio::select! {
        _ = ctrl_c => {
            info!("Received SIGINT (Ctrl+C), initiating graceful shutdown");
        }
        _ = terminate => {
            info!("Received SIGTERM, initiating graceful shutdown");
        }
    }
}

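/// Core server runtime: wires together the projector, the optional WebSocket
/// server, the optional Vixen parser runtime, health monitoring, and periodic
/// housekeeping tasks, then runs until a task exits or a shutdown signal
/// arrives.
///
/// A minimal usage sketch (with the `otel` feature disabled; constructing
/// `ServerConfig`, `ViewIndex`, and `Spec` is covered elsewhere in the crate):
///
/// ```ignore
/// let runtime = Runtime::new(config, view_index)
///     .with_spec(spec)
///     .with_websocket_max_clients(10_000); // illustrative limit
/// runtime.run().await?;
/// ```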
pub struct Runtime {
    config: ServerConfig,
    view_index: Arc<ViewIndex>,
    spec: Option<Spec>,
    materialized_views: Option<MaterializedViewRegistry>,
    websocket_auth_plugin: Option<Arc<dyn WebSocketAuthPlugin>>,
    websocket_usage_emitter: Option<Arc<dyn WebSocketUsageEmitter>>,
    websocket_max_clients: Option<usize>,
    websocket_rate_limit_config: Option<RateLimitConfig>,
    #[cfg(feature = "otel")]
    metrics: Option<Arc<Metrics>>,
}

impl Runtime {
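    /// Creates a runtime from a server configuration and a view index. With
    /// the `otel` feature enabled, optional metrics are threaded through to
    /// the projector and WebSocket server.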
    #[cfg(feature = "otel")]
    pub fn new(config: ServerConfig, view_index: ViewIndex, metrics: Option<Arc<Metrics>>) -> Self {
        Self {
            config,
            view_index: Arc::new(view_index),
            spec: None,
            materialized_views: None,
            websocket_auth_plugin: None,
            websocket_usage_emitter: None,
            websocket_max_clients: None,
            websocket_rate_limit_config: None,
            metrics,
        }
    }

    #[cfg(not(feature = "otel"))]
    pub fn new(config: ServerConfig, view_index: ViewIndex) -> Self {
        Self {
            config,
            view_index: Arc::new(view_index),
            spec: None,
            materialized_views: None,
            websocket_auth_plugin: None,
            websocket_usage_emitter: None,
            websocket_max_clients: None,
            websocket_rate_limit_config: None,
        }
    }

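    // Builder-style setters: each consumes `self` and returns it, so calls
    // can be chained before `run()`.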
    pub fn with_spec(mut self, spec: Spec) -> Self {
        self.spec = Some(spec);
        self
    }

    pub fn with_materialized_views(mut self, registry: MaterializedViewRegistry) -> Self {
        self.materialized_views = Some(registry);
        self
    }

    pub fn with_websocket_auth_plugin(
        mut self,
        websocket_auth_plugin: Arc<dyn WebSocketAuthPlugin>,
    ) -> Self {
        self.websocket_auth_plugin = Some(websocket_auth_plugin);
        self
    }

    pub fn with_websocket_usage_emitter(
        mut self,
        websocket_usage_emitter: Arc<dyn WebSocketUsageEmitter>,
    ) -> Self {
        self.websocket_usage_emitter = Some(websocket_usage_emitter);
        self
    }

    pub fn with_websocket_max_clients(mut self, websocket_max_clients: usize) -> Self {
        self.websocket_max_clients = Some(websocket_max_clients);
        self
    }

    pub fn with_websocket_rate_limit_config(mut self, config: RateLimitConfig) -> Self {
        self.websocket_rate_limit_config = Some(config);
        self
    }

    pub async fn run(self) -> Result<()> {
        info!("Starting HyperStack runtime");

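        // Bounded channel carrying mutation batches from the parser runtime to
        // the projector; senders see backpressure once 1024 batches are queued.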
        let (mutations_tx, mutations_rx) = mpsc::channel::<MutationBatch>(1024);

        let bus_manager = BusManager::new();
        let entity_cache = EntityCache::new();

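        // Health monitoring is optional and enabled purely by configuration.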
        let health_monitor = if let Some(health_config) = &self.config.health {
            let monitor = HealthMonitor::new(health_config.clone());
            let _health_task = monitor.start().await;
            info!("Health monitoring enabled");
            Some(monitor)
        } else {
            None
        };

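        // The projector consumes mutation batches and publishes the resulting
        // updates through the bus manager and entity cache.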
        #[cfg(feature = "otel")]
        let projector = Projector::new(
            self.view_index.clone(),
            bus_manager.clone(),
            entity_cache.clone(),
            mutations_rx,
            self.metrics.clone(),
        );
        #[cfg(not(feature = "otel"))]
        let projector = Projector::new(
            self.view_index.clone(),
            bus_manager.clone(),
            entity_cache.clone(),
            mutations_rx,
        );

        let projector_handle = tokio::spawn(
            async move {
                projector.run().await;
            }
            .instrument(info_span!("projector")),
        );

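        // The WebSocket server is optional; when configured, it is spawned as
        // its own task with whatever auth plugin, usage emitter, client limit,
        // and rate-limit settings were supplied via the builder.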
        let ws_handle = if let Some(ws_config) = &self.config.websocket {
            #[cfg(feature = "otel")]
            let mut ws_server = WebSocketServer::new(
                ws_config.bind_address,
                bus_manager.clone(),
                entity_cache.clone(),
                self.view_index.clone(),
                self.metrics.clone(),
            );
            #[cfg(not(feature = "otel"))]
            let mut ws_server = WebSocketServer::new(
                ws_config.bind_address,
                bus_manager.clone(),
                entity_cache.clone(),
                self.view_index.clone(),
            );

            if let Some(max_clients) = self.websocket_max_clients {
                ws_server = ws_server.with_max_clients(max_clients);
            }

            if let Some(plugin) = self.websocket_auth_plugin.clone() {
                ws_server = ws_server.with_auth_plugin(plugin);
            }

            if let Some(emitter) = self.websocket_usage_emitter.clone() {
                ws_server = ws_server.with_usage_emitter(emitter);
            }

            if let Some(rate_limit_config) = self.websocket_rate_limit_config {
                ws_server = ws_server.with_rate_limit_config(rate_limit_config);
            }

            let bind_addr = ws_config.bind_address;
            Some(tokio::spawn(
                async move {
                    if let Err(e) = ws_server.start().await {
                        error!("WebSocket server error: {}", e);
                    }
                }
                .instrument(info_span!("ws.server", %bind_addr)),
            ))
        } else {
            None
        };

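        // A spec with a parser_setup starts the Vixen parser runtime, which
        // feeds mutation batches into the channel above; without one the
        // server runs in websocket-only mode.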
        let parser_handle = if let Some(spec) = self.spec {
            if let Some(parser_setup) = spec.parser_setup {
                let program_id = spec
                    .program_ids
                    .first()
                    .cloned()
                    .unwrap_or_else(|| "unknown".to_string());
                info!("Starting Vixen parser runtime for program: {}", program_id);
                let tx = mutations_tx.clone();
                let health = health_monitor.clone();
                let reconnection_config = self.config.reconnection.clone().unwrap_or_default();
                Some(tokio::spawn(
                    async move {
                        if let Err(e) = parser_setup(tx, health, reconnection_config).await {
                            error!("Vixen parser runtime error: {}", e);
                        }
                    }
                    .instrument(info_span!("vixen.parser", %program_id)),
                ))
            } else {
                info!("Spec provided but no parser_setup configured - skipping Vixen runtime");
                None
            }
        } else {
            info!("No spec provided - running in websocket-only mode");
            None
        };

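        // The HTTP health server runs on a dedicated OS thread with its own
        // single-threaded Tokio runtime, so health probes are served
        // independently of the main runtime's workload.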
        let _http_health_handle = if let Some(http_health_config) = &self.config.http_health {
            let mut http_server = HttpHealthServer::new(http_health_config.bind_address);
            if let Some(monitor) = health_monitor.clone() {
                http_server = http_server.with_health_monitor(monitor);
            }

            let bind_addr = http_health_config.bind_address;
            let join_handle = std::thread::Builder::new()
                .name("health-server".into())
                .spawn(move || {
                    let rt = tokio::runtime::Builder::new_current_thread()
                        .enable_all()
                        .build()
                        .expect("Failed to create health server runtime");
                    rt.block_on(async move {
                        let _span = info_span!("http.health", %bind_addr).entered();
                        if let Err(e) = http_server.start().await {
                            error!("HTTP health server error: {}", e);
                        }
                    });
                })
                .expect("Failed to spawn health server thread");
            info!(
                "HTTP health server running on dedicated thread at {}",
                bind_addr
            );
            Some(join_handle)
        } else {
            None
        };

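        // Housekeeping: every 60 seconds, drop stale state and list buses and
        // log what was removed.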
        let bus_cleanup_handle = {
            let bus = bus_manager.clone();
            tokio::spawn(
                async move {
                    let mut interval = tokio::time::interval(Duration::from_secs(60));
                    loop {
                        interval.tick().await;
                        let state_cleaned = bus.cleanup_stale_state_buses().await;
                        let list_cleaned = bus.cleanup_stale_list_buses().await;
                        if state_cleaned > 0 || list_cleaned > 0 {
                            let (state_count, list_count) = bus.bus_counts().await;
                            info!(
                                "Bus cleanup: removed {} state, {} list buses. Current: {} state, {} list",
                                state_cleaned, list_cleaned, state_count, list_count
                            );
                        }
                    }
                }
                .instrument(info_span!("bus.cleanup")),
            )
        };

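        // Stats reporter: samples bus counts and cache stats every 30 seconds.
        // The sampled values are currently unused (underscore bindings).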
        let stats_handle = {
            let bus = bus_manager.clone();
            let cache = entity_cache.clone();
            tokio::spawn(
                async move {
                    let mut interval = tokio::time::interval(Duration::from_secs(30));
                    loop {
                        interval.tick().await;
                        let (_state_buses, _list_buses) = bus.bus_counts().await;
                        let _cache_stats = cache.stats().await;
                    }
                }
                .instrument(info_span!("stats.reporter")),
            )
        };

        info!("HyperStack runtime is running. Press Ctrl+C to stop.");

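        // Run until any core task exits or a shutdown signal arrives. Optional
        // tasks that were never spawned are stood in for by pending futures,
        // so their select arms never fire.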
        tokio::select! {
            _ = async {
                if let Some(handle) = ws_handle {
                    handle.await
                } else {
                    std::future::pending().await
                }
            } => {
                info!("WebSocket server task completed");
            }
            _ = projector_handle => {
                info!("Projector task completed");
            }
            _ = async {
                if let Some(handle) = parser_handle {
                    handle.await
                } else {
                    std::future::pending().await
                }
            } => {
                info!("Parser runtime task completed");
            }
            _ = bus_cleanup_handle => {
                info!("Bus cleanup task completed");
            }
            _ = stats_handle => {
                info!("Stats reporter task completed");
            }
            _ = shutdown_signal() => {}
        }

        info!("Shutting down HyperStack runtime");
        Ok(())
    }
}