// hyperstack_server/runtime.rs

use std::sync::Arc;
use std::time::Duration;

use anyhow::Result;
use tokio::sync::mpsc;
use tracing::{error, info, info_span, Instrument};

use crate::bus::BusManager;
use crate::cache::EntityCache;
use crate::config::ServerConfig;
use crate::health::HealthMonitor;
use crate::http_health::HttpHealthServer;
use crate::materialized_view::MaterializedViewRegistry;
use crate::mutation_batch::MutationBatch;
use crate::projector::Projector;
use crate::view::ViewIndex;
use crate::websocket::WebSocketServer;
use crate::Spec;

#[cfg(feature = "otel")]
use crate::metrics::Metrics;
20
21/// Wait for shutdown signal (SIGINT on all platforms, SIGTERM on Unix)
22async fn shutdown_signal() {
23    let ctrl_c = async {
24        tokio::signal::ctrl_c()
25            .await
26            .expect("Failed to install Ctrl+C handler");
27    };
28
29    #[cfg(unix)]
30    let terminate = async {
31        tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
32            .expect("Failed to install SIGTERM handler")
33            .recv()
34            .await;
35    };
36
37    #[cfg(not(unix))]
38    let terminate = std::future::pending::<()>();
39
40    tokio::select! {
41        _ = ctrl_c => {
42            info!("Received SIGINT (Ctrl+C), initiating shutdown");
43        }
44        _ = terminate => {
45            info!("Received SIGTERM, initiating graceful shutdown");
46        }
47    }
48}
49
50pub struct Runtime {
51    config: ServerConfig,
52    view_index: Arc<ViewIndex>,
53    spec: Option<Spec>,
54    materialized_views: Option<MaterializedViewRegistry>,
55    #[cfg(feature = "otel")]
56    metrics: Option<Arc<Metrics>>,
57}
58
59impl Runtime {
60    #[cfg(feature = "otel")]
61    pub fn new(config: ServerConfig, view_index: ViewIndex, metrics: Option<Arc<Metrics>>) -> Self {
62        Self {
63            config,
64            view_index: Arc::new(view_index),
65            spec: None,
66            materialized_views: None,
67            metrics,
68        }
69    }
70
71    #[cfg(not(feature = "otel"))]
72    pub fn new(config: ServerConfig, view_index: ViewIndex) -> Self {
73        Self {
74            config,
75            view_index: Arc::new(view_index),
76            spec: None,
77            materialized_views: None,
78        }
79    }
80
81    pub fn with_spec(mut self, spec: Spec) -> Self {
82        self.spec = Some(spec);
83        self
84    }
85
86    pub fn with_materialized_views(mut self, registry: MaterializedViewRegistry) -> Self {
87        self.materialized_views = Some(registry);
88        self
89    }
90
91    pub async fn run(self) -> Result<()> {
92        info!("Starting HyperStack runtime");
93
94        let (mutations_tx, mutations_rx) = mpsc::channel::<MutationBatch>(1024);
95
96        let bus_manager = BusManager::new();
97        let entity_cache = EntityCache::new();
98
99        let health_monitor = if let Some(health_config) = &self.config.health {
100            let monitor = HealthMonitor::new(health_config.clone());
101            let _health_task = monitor.start().await;
102            info!("Health monitoring enabled");
103            Some(monitor)
104        } else {
105            None
106        };
107
108        #[cfg(feature = "otel")]
109        let projector = Projector::new(
110            self.view_index.clone(),
111            bus_manager.clone(),
112            entity_cache.clone(),
113            mutations_rx,
114            self.metrics.clone(),
115        );
116        #[cfg(not(feature = "otel"))]
117        let projector = Projector::new(
118            self.view_index.clone(),
119            bus_manager.clone(),
120            entity_cache.clone(),
121            mutations_rx,
122        );
123
124        let projector_handle = tokio::spawn(
125            async move {
126                projector.run().await;
127            }
128            .instrument(info_span!("projector")),
129        );
130
131        let ws_handle = if let Some(ws_config) = &self.config.websocket {
132            #[cfg(feature = "otel")]
133            let ws_server = WebSocketServer::new(
134                ws_config.bind_address,
135                bus_manager.clone(),
136                entity_cache.clone(),
137                self.view_index.clone(),
138                self.metrics.clone(),
139            );
140            #[cfg(not(feature = "otel"))]
141            let ws_server = WebSocketServer::new(
142                ws_config.bind_address,
143                bus_manager.clone(),
144                entity_cache.clone(),
145                self.view_index.clone(),
146            );
147
148            let bind_addr = ws_config.bind_address;
149            Some(tokio::spawn(
150                async move {
151                    if let Err(e) = ws_server.start().await {
152                        error!("WebSocket server error: {}", e);
153                    }
154                }
155                .instrument(info_span!("ws.server", %bind_addr)),
156            ))
157        } else {
158            None
159        };
160
161        let parser_handle = if let Some(spec) = self.spec {
162            if let Some(parser_setup) = spec.parser_setup {
163                let program_id = spec
164                    .program_ids
165                    .first()
166                    .cloned()
167                    .unwrap_or_else(|| "unknown".to_string());
168                info!(
169                    "Starting Vixen parser runtime for program: {}",
170                    program_id
171                );
172                let tx = mutations_tx.clone();
173                let health = health_monitor.clone();
174                let reconnection_config = self.config.reconnection.clone().unwrap_or_default();
175                Some(tokio::spawn(
176                    async move {
177                        if let Err(e) = parser_setup(tx, health, reconnection_config).await {
178                            error!("Vixen parser runtime error: {}", e);
179                        }
180                    }
181                    .instrument(info_span!("vixen.parser", %program_id)),
182                ))
183            } else {
184                info!("Spec provided but no parser_setup configured - skipping Vixen runtime");
185                None
186            }
187        } else {
188            info!("No spec provided - running in websocket-only mode");
189            None
190        };
191
192        // Run the HTTP health server on a dedicated OS thread with its own single-threaded
193        // tokio runtime. This isolates it from the main runtime so that liveness probes
194        // always respond even when the event processing pipeline saturates worker threads
195        // (e.g. due to std::sync::Mutex contention on VmContext under high throughput).
196        let _http_health_handle = if let Some(http_health_config) = &self.config.http_health {
197            let mut http_server = HttpHealthServer::new(http_health_config.bind_address);
198            if let Some(monitor) = health_monitor.clone() {
199                http_server = http_server.with_health_monitor(monitor);
200            }
201
202            let bind_addr = http_health_config.bind_address;
203            let join_handle = std::thread::Builder::new()
204                .name("health-server".into())
205                .spawn(move || {
206                    let rt = tokio::runtime::Builder::new_current_thread()
207                        .enable_all()
208                        .build()
209                        .expect("Failed to create health server runtime");
210                    rt.block_on(async move {
211                        let _span = info_span!("http.health", %bind_addr).entered();
212                        if let Err(e) = http_server.start().await {
213                            error!("HTTP health server error: {}", e);
214                        }
215                    });
216                })
217                .expect("Failed to spawn health server thread");
218            info!("HTTP health server running on dedicated thread at {}", bind_addr);
219            Some(join_handle)
220        } else {
221            None
222        };
223
224        let bus_cleanup_handle = {
225            let bus = bus_manager.clone();
226            tokio::spawn(
227                async move {
228                    let mut interval = tokio::time::interval(Duration::from_secs(60));
229                    loop {
230                        interval.tick().await;
231                        let state_cleaned = bus.cleanup_stale_state_buses().await;
232                        let list_cleaned = bus.cleanup_stale_list_buses().await;
233                        if state_cleaned > 0 || list_cleaned > 0 {
234                            let (state_count, list_count) = bus.bus_counts().await;
235                            info!(
236                                "Bus cleanup: removed {} state, {} list buses. Current: {} state, {} list",
237                                state_cleaned, list_cleaned, state_count, list_count
238                            );
239                        }
240                    }
241                }
242                .instrument(info_span!("bus.cleanup")),
243            )
244        };
245
246        let stats_handle = {
247            let bus = bus_manager.clone();
248            let cache = entity_cache.clone();
249            tokio::spawn(
250                async move {
251                    let mut interval = tokio::time::interval(Duration::from_secs(30));
252                    loop {
253                        interval.tick().await;
254                        let (_state_buses, _list_buses) = bus.bus_counts().await;
255                        let _cache_stats = cache.stats().await;
256                    }
257                }
258                .instrument(info_span!("stats.reporter")),
259            )
260        };
261
262        info!("HyperStack runtime is running. Press Ctrl+C to stop.");
263
264        // Wait for any task to complete (or handle shutdown signals)
265        tokio::select! {
266            _ = async {
267                if let Some(handle) = ws_handle {
268                    handle.await
269                } else {
270                    std::future::pending().await
271                }
272            } => {
273                info!("WebSocket server task completed");
274            }
275            _ = projector_handle => {
276                info!("Projector task completed");
277            }
278            _ = async {
279                if let Some(handle) = parser_handle {
280                    handle.await
281                } else {
282                    std::future::pending().await
283                }
284            } => {
285                info!("Parser runtime task completed");
286            }
287            _ = bus_cleanup_handle => {
288                info!("Bus cleanup task completed");
289            }
290            _ = stats_handle => {
291                info!("Stats reporter task completed");
292            }
293            _ = shutdown_signal() => {}
294        }
295
296        info!("Shutting down HyperStack runtime");
297        Ok(())
298    }
299}