hyperstack_server/runtime.rs

use crate::bus::BusManager;
use crate::cache::EntityCache;
use crate::config::ServerConfig;
use crate::health::HealthMonitor;
use crate::http_health::HttpHealthServer;
use crate::materialized_view::MaterializedViewRegistry;
use crate::mutation_batch::MutationBatch;
use crate::projector::Projector;
use crate::view::ViewIndex;
use crate::websocket::client_manager::RateLimitConfig;
use crate::websocket::WebSocketServer;
use crate::Spec;
use crate::WebSocketAuthPlugin;
use crate::WebSocketUsageEmitter;
use anyhow::Result;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use tracing::{error, info, info_span, Instrument};

#[cfg(feature = "otel")]
use crate::metrics::Metrics;

/// Wait for a shutdown signal (SIGINT on all platforms, SIGTERM on Unix).
async fn shutdown_signal() {
    let ctrl_c = async {
        tokio::signal::ctrl_c()
            .await
            .expect("Failed to install Ctrl+C handler");
    };

    #[cfg(unix)]
    let terminate = async {
        tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
            .expect("Failed to install SIGTERM handler")
            .recv()
            .await;
    };

    #[cfg(not(unix))]
    let terminate = std::future::pending::<()>();

    tokio::select! {
        _ = ctrl_c => {
            info!("Received SIGINT (Ctrl+C), initiating graceful shutdown");
        }
        _ = terminate => {
            info!("Received SIGTERM, initiating graceful shutdown");
        }
    }
}

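/// Builder-style handle for assembling and running the HyperStack server.
///
/// Construct it with [`Runtime::new`], apply any optional `with_*`
/// configuration, then call [`Runtime::run`] to start all components.
///
/// A minimal usage sketch (assuming the `otel` feature is disabled and that
/// `config`, `view_index`, and `spec` are built elsewhere):
///
/// ```ignore
/// let runtime = Runtime::new(config, view_index)
///     .with_spec(spec)
///     .with_websocket_max_clients(10_000);
/// runtime.run().await?;
/// ```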
pub struct Runtime {
    config: ServerConfig,
    view_index: Arc<ViewIndex>,
    spec: Option<Spec>,
    materialized_views: Option<MaterializedViewRegistry>,
    websocket_auth_plugin: Option<Arc<dyn WebSocketAuthPlugin>>,
    websocket_usage_emitter: Option<Arc<dyn WebSocketUsageEmitter>>,
    websocket_max_clients: Option<usize>,
    websocket_rate_limit_config: Option<RateLimitConfig>,
    #[cfg(feature = "otel")]
    metrics: Option<Arc<Metrics>>,
}

impl Runtime {
    #[cfg(feature = "otel")]
    pub fn new(config: ServerConfig, view_index: ViewIndex, metrics: Option<Arc<Metrics>>) -> Self {
        Self {
            config,
            view_index: Arc::new(view_index),
            spec: None,
            materialized_views: None,
            websocket_auth_plugin: None,
            websocket_usage_emitter: None,
            websocket_max_clients: None,
            websocket_rate_limit_config: None,
            metrics,
        }
    }

    #[cfg(not(feature = "otel"))]
    pub fn new(config: ServerConfig, view_index: ViewIndex) -> Self {
        Self {
            config,
            view_index: Arc::new(view_index),
            spec: None,
            materialized_views: None,
            websocket_auth_plugin: None,
            websocket_usage_emitter: None,
            websocket_max_clients: None,
            websocket_rate_limit_config: None,
        }
    }

    pub fn with_spec(mut self, spec: Spec) -> Self {
        self.spec = Some(spec);
        self
    }

    pub fn with_materialized_views(mut self, registry: MaterializedViewRegistry) -> Self {
        self.materialized_views = Some(registry);
        self
    }

    pub fn with_websocket_auth_plugin(
        mut self,
        websocket_auth_plugin: Arc<dyn WebSocketAuthPlugin>,
    ) -> Self {
        self.websocket_auth_plugin = Some(websocket_auth_plugin);
        self
    }

    pub fn with_websocket_usage_emitter(
        mut self,
        websocket_usage_emitter: Arc<dyn WebSocketUsageEmitter>,
    ) -> Self {
        self.websocket_usage_emitter = Some(websocket_usage_emitter);
        self
    }

    pub fn with_websocket_max_clients(mut self, websocket_max_clients: usize) -> Self {
        self.websocket_max_clients = Some(websocket_max_clients);
        self
    }

    /// Configure rate limiting for WebSocket connections.
    ///
    /// This sets global rate limits such as maximum connections per IP,
    /// timeouts, and rate windows. Per-subject limits are controlled
    /// via AuthContext.Limits from the authentication token.
    pub fn with_websocket_rate_limit_config(mut self, config: RateLimitConfig) -> Self {
        self.websocket_rate_limit_config = Some(config);
        self
    }

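    /// Start every configured component and block until one of them exits
    /// or a shutdown signal (SIGINT/SIGTERM) is received.
    ///
    /// In order, this spawns the projector task, the optional WebSocket
    /// server, the optional Vixen parser runtime, the HTTP health server
    /// (on a dedicated OS thread), and the periodic bus-cleanup and stats
    /// tasks.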
    pub async fn run(self) -> Result<()> {
        info!("Starting HyperStack runtime");

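        // Bounded channel from the parser runtime into the projector; the
        // 1024-slot buffer applies backpressure if projection falls behind.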
        let (mutations_tx, mutations_rx) = mpsc::channel::<MutationBatch>(1024);

        let bus_manager = BusManager::new();
        let entity_cache = EntityCache::new();

        let health_monitor = if let Some(health_config) = &self.config.health {
            let monitor = HealthMonitor::new(health_config.clone());
            let _health_task = monitor.start().await;
            info!("Health monitoring enabled");
            Some(monitor)
        } else {
            None
        };

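        // Build the projector; the two variants differ only in whether
        // metrics are threaded through (the `otel` feature).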
        #[cfg(feature = "otel")]
        let projector = Projector::new(
            self.view_index.clone(),
            bus_manager.clone(),
            entity_cache.clone(),
            mutations_rx,
            self.metrics.clone(),
        );
        #[cfg(not(feature = "otel"))]
        let projector = Projector::new(
            self.view_index.clone(),
            bus_manager.clone(),
            entity_cache.clone(),
            mutations_rx,
        );

        let projector_handle = tokio::spawn(
            async move {
                projector.run().await;
            }
            .instrument(info_span!("projector")),
        );

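        // Optionally start the WebSocket server, applying any builder-supplied
        // client limit, auth plugin, usage emitter, and rate-limit config.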
        let ws_handle = if let Some(ws_config) = &self.config.websocket {
            #[cfg(feature = "otel")]
            let mut ws_server = WebSocketServer::new(
                ws_config.bind_address,
                bus_manager.clone(),
                entity_cache.clone(),
                self.view_index.clone(),
                self.metrics.clone(),
            );
            #[cfg(not(feature = "otel"))]
            let mut ws_server = WebSocketServer::new(
                ws_config.bind_address,
                bus_manager.clone(),
                entity_cache.clone(),
                self.view_index.clone(),
            );

            if let Some(max_clients) = self.websocket_max_clients {
                ws_server = ws_server.with_max_clients(max_clients);
            }

            if let Some(plugin) = self.websocket_auth_plugin.clone() {
                ws_server = ws_server.with_auth_plugin(plugin);
            }

            if let Some(emitter) = self.websocket_usage_emitter.clone() {
                ws_server = ws_server.with_usage_emitter(emitter);
            }

            if let Some(rate_limit_config) = self.websocket_rate_limit_config {
                ws_server = ws_server.with_rate_limit_config(rate_limit_config);
            }

            let bind_addr = ws_config.bind_address;
            Some(tokio::spawn(
                async move {
                    if let Err(e) = ws_server.start().await {
                        error!("WebSocket server error: {}", e);
                    }
                }
                .instrument(info_span!("ws.server", %bind_addr)),
            ))
        } else {
            None
        };

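        // Optionally start the Vixen parser runtime, which feeds mutation
        // batches into the projector through `mutations_tx`.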
        let parser_handle = if let Some(spec) = self.spec {
            if let Some(parser_setup) = spec.parser_setup {
                let program_id = spec
                    .program_ids
                    .first()
                    .cloned()
                    .unwrap_or_else(|| "unknown".to_string());
                info!("Starting Vixen parser runtime for program: {}", program_id);
                let tx = mutations_tx.clone();
                let health = health_monitor.clone();
                let reconnection_config = self.config.reconnection.clone().unwrap_or_default();
                Some(tokio::spawn(
                    async move {
                        if let Err(e) = parser_setup(tx, health, reconnection_config).await {
                            error!("Vixen parser runtime error: {}", e);
                        }
                    }
                    .instrument(info_span!("vixen.parser", %program_id)),
                ))
            } else {
                info!("Spec provided but no parser_setup configured - skipping Vixen runtime");
                None
            }
        } else {
            info!("No spec provided - running in websocket-only mode");
            None
        };

        // Run the HTTP health server on a dedicated OS thread with its own single-threaded
        // tokio runtime. This isolates it from the main runtime so that liveness probes
        // always respond even when the event processing pipeline saturates worker threads
        // (e.g. due to std::sync::Mutex contention on VmContext under high throughput).
        let _http_health_handle = if let Some(http_health_config) = &self.config.http_health {
            let mut http_server = HttpHealthServer::new(http_health_config.bind_address);
            if let Some(monitor) = health_monitor.clone() {
                http_server = http_server.with_health_monitor(monitor);
            }

            let bind_addr = http_health_config.bind_address;
            let join_handle = std::thread::Builder::new()
                .name("health-server".into())
                .spawn(move || {
                    let rt = tokio::runtime::Builder::new_current_thread()
                        .enable_all()
                        .build()
                        .expect("Failed to create health server runtime");
                    rt.block_on(async move {
                        let _span = info_span!("http.health", %bind_addr).entered();
                        if let Err(e) = http_server.start().await {
                            error!("HTTP health server error: {}", e);
                        }
                    });
                })
                .expect("Failed to spawn health server thread");
            info!(
                "HTTP health server running on dedicated thread at {}",
                bind_addr
            );
            Some(join_handle)
        } else {
            None
        };

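        // Sweep stale state/list buses once a minute so abandoned
        // subscriptions do not accumulate.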
        let bus_cleanup_handle = {
            let bus = bus_manager.clone();
            tokio::spawn(
                async move {
                    let mut interval = tokio::time::interval(Duration::from_secs(60));
                    loop {
                        interval.tick().await;
                        let state_cleaned = bus.cleanup_stale_state_buses().await;
                        let list_cleaned = bus.cleanup_stale_list_buses().await;
                        if state_cleaned > 0 || list_cleaned > 0 {
                            let (state_count, list_count) = bus.bus_counts().await;
                            info!(
                                "Bus cleanup: removed {} state, {} list buses. Current: {} state, {} list",
                                state_cleaned, list_cleaned, state_count, list_count
                            );
                        }
                    }
                }
                .instrument(info_span!("bus.cleanup")),
            )
        };

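        // Sample bus and cache statistics every 30 seconds. The results are
        // currently discarded; the task exists as a stats-reporting hook.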
        let stats_handle = {
            let bus = bus_manager.clone();
            let cache = entity_cache.clone();
            tokio::spawn(
                async move {
                    let mut interval = tokio::time::interval(Duration::from_secs(30));
                    loop {
                        interval.tick().await;
                        let (_state_buses, _list_buses) = bus.bus_counts().await;
                        let _cache_stats = cache.stats().await;
                    }
                }
                .instrument(info_span!("stats.reporter")),
            )
        };

        info!("HyperStack runtime is running. Press Ctrl+C to stop.");

        // Wait for any task to complete (or handle shutdown signals)
        tokio::select! {
            _ = async {
                if let Some(handle) = ws_handle {
                    handle.await
                } else {
                    std::future::pending().await
                }
            } => {
                info!("WebSocket server task completed");
            }
            _ = projector_handle => {
                info!("Projector task completed");
            }
            _ = async {
                if let Some(handle) = parser_handle {
                    handle.await
                } else {
                    std::future::pending().await
                }
            } => {
                info!("Parser runtime task completed");
            }
            _ = bus_cleanup_handle => {
                info!("Bus cleanup task completed");
            }
            _ = stats_handle => {
                info!("Stats reporter task completed");
            }
            _ = shutdown_signal() => {}
        }

        info!("Shutting down HyperStack runtime");
        Ok(())
    }
}