// hyperstack_server/runtime.rs
1use crate::bus::BusManager;
2use crate::cache::EntityCache;
3use crate::config::ServerConfig;
4use crate::health::HealthMonitor;
5use crate::http_health::HttpHealthServer;
6use crate::materialized_view::MaterializedViewRegistry;
7use crate::mutation_batch::MutationBatch;
8use crate::projector::Projector;
9use crate::view::ViewIndex;
10use crate::websocket::WebSocketServer;
11use crate::Spec;
12use anyhow::Result;
13use std::sync::Arc;
14use std::time::Duration;
15use tokio::sync::mpsc;
16use tracing::{error, info, info_span, Instrument};
17
18#[cfg(feature = "otel")]
19use crate::metrics::Metrics;
20
21/// Wait for shutdown signal (SIGINT on all platforms, SIGTERM on Unix)
22async fn shutdown_signal() {
23    let ctrl_c = async {
24        tokio::signal::ctrl_c()
25            .await
26            .expect("Failed to install Ctrl+C handler");
27    };
28
29    #[cfg(unix)]
30    let terminate = async {
31        tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
32            .expect("Failed to install SIGTERM handler")
33            .recv()
34            .await;
35    };
36
37    #[cfg(not(unix))]
38    let terminate = std::future::pending::<()>();
39
40    tokio::select! {
41        _ = ctrl_c => {
42            info!("Received SIGINT (Ctrl+C), initiating shutdown");
43        }
44        _ = terminate => {
45            info!("Received SIGTERM, initiating graceful shutdown");
46        }
47    }
48}
49
/// Builder-style server runtime: construct with `new`, configure with the
/// `with_*` methods, then start everything with `run`.
pub struct Runtime {
    /// Server configuration; the `websocket`, `health`, `http_health`, and
    /// `reconnection` sections are all optional and consulted in `run`.
    config: ServerConfig,
    /// View index shared (via `Arc` clones) with the projector and the
    /// WebSocket server.
    view_index: Arc<ViewIndex>,
    /// Optional program spec; when it carries a `parser_setup`, `run`
    /// launches the Vixen parser pipeline.
    spec: Option<Spec>,
    /// Optional materialized-view registry set by `with_materialized_views`.
    /// NOTE(review): stored but not read anywhere in this file — confirm
    /// where it is consumed.
    materialized_views: Option<MaterializedViewRegistry>,
    /// Optional metrics handle threaded into the projector and WebSocket
    /// server (otel builds only).
    #[cfg(feature = "otel")]
    metrics: Option<Arc<Metrics>>,
}
58
impl Runtime {
    /// Create a runtime from a config and a pre-built view index (otel build).
    ///
    /// `metrics` is forwarded to the projector and WebSocket server in `run`;
    /// `None` disables metric recording.
    #[cfg(feature = "otel")]
    pub fn new(config: ServerConfig, view_index: ViewIndex, metrics: Option<Arc<Metrics>>) -> Self {
        Self {
            config,
            view_index: Arc::new(view_index),
            spec: None,
            materialized_views: None,
            metrics,
        }
    }

    /// Create a runtime from a config and a pre-built view index (non-otel build).
    #[cfg(not(feature = "otel"))]
    pub fn new(config: ServerConfig, view_index: ViewIndex) -> Self {
        Self {
            config,
            view_index: Arc::new(view_index),
            spec: None,
            materialized_views: None,
        }
    }

    /// Attach a program spec; enables the Vixen parser pipeline in `run`
    /// (only if the spec also provides a `parser_setup`).
    pub fn with_spec(mut self, spec: Spec) -> Self {
        self.spec = Some(spec);
        self
    }

    /// Attach a materialized-view registry.
    /// NOTE(review): the registry is stored but never read in this file —
    /// confirm it is consumed elsewhere or wire it in.
    pub fn with_materialized_views(mut self, registry: MaterializedViewRegistry) -> Self {
        self.materialized_views = Some(registry);
        self
    }

    /// Start all configured components and block until one of them exits or a
    /// shutdown signal (SIGINT/SIGTERM) is received.
    ///
    /// Components wired up here:
    /// - a projector task consuming `MutationBatch`es from a bounded mpsc channel;
    /// - an optional WebSocket server (`config.websocket`);
    /// - an optional Vixen parser feeding the channel (spec + `parser_setup`);
    /// - an optional HTTP health server on a dedicated OS thread (`config.http_health`);
    /// - periodic bus-cleanup (60s) and stats (30s) background tasks.
    ///
    /// # Errors
    /// Currently always returns `Ok(())`; the `Result` leaves room for
    /// startup failures to propagate in the future.
    pub async fn run(self) -> Result<()> {
        info!("Starting HyperStack runtime");

        // Bounded channel from the parser (producer) to the projector
        // (consumer). Capacity 1024 applies backpressure to the parser.
        let (mutations_tx, mutations_rx) = mpsc::channel::<MutationBatch>(1024);

        let bus_manager = BusManager::new();
        let entity_cache = EntityCache::new();

        // Optional health monitor; its background task handle is dropped,
        // i.e. the task runs detached for the life of the process.
        let health_monitor = if let Some(health_config) = &self.config.health {
            let monitor = HealthMonitor::new(health_config.clone());
            let _health_task = monitor.start().await;
            info!("Health monitoring enabled");
            Some(monitor)
        } else {
            None
        };

        // The projector takes ownership of the receive side of the mutation
        // channel. The two cfg variants differ only in the metrics argument.
        #[cfg(feature = "otel")]
        let projector = Projector::new(
            self.view_index.clone(),
            bus_manager.clone(),
            entity_cache.clone(),
            mutations_rx,
            self.metrics.clone(),
        );
        #[cfg(not(feature = "otel"))]
        let projector = Projector::new(
            self.view_index.clone(),
            bus_manager.clone(),
            entity_cache.clone(),
            mutations_rx,
        );

        let projector_handle = tokio::spawn(
            async move {
                projector.run().await;
            }
            .instrument(info_span!("projector")),
        );

        // Optional WebSocket server; errors are logged, not propagated.
        let ws_handle = if let Some(ws_config) = &self.config.websocket {
            #[cfg(feature = "otel")]
            let ws_server = WebSocketServer::new(
                ws_config.bind_address,
                bus_manager.clone(),
                entity_cache.clone(),
                self.view_index.clone(),
                self.metrics.clone(),
            );
            #[cfg(not(feature = "otel"))]
            let ws_server = WebSocketServer::new(
                ws_config.bind_address,
                bus_manager.clone(),
                entity_cache.clone(),
                self.view_index.clone(),
            );

            let bind_addr = ws_config.bind_address;
            Some(tokio::spawn(
                async move {
                    if let Err(e) = ws_server.start().await {
                        error!("WebSocket server error: {}", e);
                    }
                }
                .instrument(info_span!("ws.server", %bind_addr)),
            ))
        } else {
            None
        };

        // Optional Vixen parser: only started when a spec was provided AND it
        // carries a parser_setup closure. The parser gets a clone of the
        // mutation sender and the (optional) health monitor.
        let parser_handle = if let Some(spec) = self.spec {
            if let Some(parser_setup) = spec.parser_setup {
                info!(
                    "Starting Vixen parser runtime for program: {}",
                    spec.program_id
                );
                let tx = mutations_tx.clone();
                let health = health_monitor.clone();
                let reconnection_config = self.config.reconnection.clone().unwrap_or_default();
                let program_id = spec.program_id.clone();
                Some(tokio::spawn(
                    async move {
                        if let Err(e) = parser_setup(tx, health, reconnection_config).await {
                            error!("Vixen parser runtime error: {}", e);
                        }
                    }
                    .instrument(info_span!("vixen.parser", %program_id)),
                ))
            } else {
                info!("Spec provided but no parser_setup configured - skipping Vixen runtime");
                None
            }
        } else {
            info!("No spec provided - running in websocket-only mode");
            None
        };

        // Run the HTTP health server on a dedicated OS thread with its own single-threaded
        // tokio runtime. This isolates it from the main runtime so that liveness probes
        // always respond even when the event processing pipeline saturates worker threads
        // (e.g. due to std::sync::Mutex contention on VmContext under high throughput).
        let _http_health_handle = if let Some(http_health_config) = &self.config.http_health {
            let mut http_server = HttpHealthServer::new(http_health_config.bind_address);
            if let Some(monitor) = health_monitor.clone() {
                http_server = http_server.with_health_monitor(monitor);
            }

            let bind_addr = http_health_config.bind_address;
            // std::thread handle; intentionally not joined — the thread lives
            // for the remainder of the process.
            let join_handle = std::thread::Builder::new()
                .name("health-server".into())
                .spawn(move || {
                    let rt = tokio::runtime::Builder::new_current_thread()
                        .enable_all()
                        .build()
                        .expect("Failed to create health server runtime");
                    rt.block_on(async move {
                        let _span = info_span!("http.health", %bind_addr).entered();
                        if let Err(e) = http_server.start().await {
                            error!("HTTP health server error: {}", e);
                        }
                    });
                })
                .expect("Failed to spawn health server thread");
            info!("HTTP health server running on dedicated thread at {}", bind_addr);
            Some(join_handle)
        } else {
            None
        };

        // Every 60s, drop stale state/list buses and log only when something
        // was actually removed.
        let bus_cleanup_handle = {
            let bus = bus_manager.clone();
            tokio::spawn(
                async move {
                    let mut interval = tokio::time::interval(Duration::from_secs(60));
                    loop {
                        interval.tick().await;
                        let state_cleaned = bus.cleanup_stale_state_buses().await;
                        let list_cleaned = bus.cleanup_stale_list_buses().await;
                        if state_cleaned > 0 || list_cleaned > 0 {
                            let (state_count, list_count) = bus.bus_counts().await;
                            info!(
                                "Bus cleanup: removed {} state, {} list buses. Current: {} state, {} list",
                                state_cleaned, list_cleaned, state_count, list_count
                            );
                        }
                    }
                }
                .instrument(info_span!("bus.cleanup")),
            )
        };

        // Every 30s, poll bus counts and cache stats.
        // NOTE(review): the results are discarded — presumably a placeholder
        // for metric/trace export; confirm or remove the polling.
        let stats_handle = {
            let bus = bus_manager.clone();
            let cache = entity_cache.clone();
            tokio::spawn(
                async move {
                    let mut interval = tokio::time::interval(Duration::from_secs(30));
                    loop {
                        interval.tick().await;
                        let (_state_buses, _list_buses) = bus.bus_counts().await;
                        let _cache_stats = cache.stats().await;
                    }
                }
                .instrument(info_span!("stats.reporter")),
            )
        };

        info!("HyperStack runtime is running. Press Ctrl+C to stop.");

        // Wait for any task to complete (or handle shutdown signals)
        // Optional handles are adapted with a pending() future so an absent
        // component can never win the select.
        tokio::select! {
            _ = async {
                if let Some(handle) = ws_handle {
                    handle.await
                } else {
                    std::future::pending().await
                }
            } => {
                info!("WebSocket server task completed");
            }
            _ = projector_handle => {
                info!("Projector task completed");
            }
            _ = async {
                if let Some(handle) = parser_handle {
                    handle.await
                } else {
                    std::future::pending().await
                }
            } => {
                info!("Parser runtime task completed");
            }
            _ = bus_cleanup_handle => {
                info!("Bus cleanup task completed");
            }
            _ = stats_handle => {
                info!("Stats reporter task completed");
            }
            _ = shutdown_signal() => {}
        }

        info!("Shutting down HyperStack runtime");
        Ok(())
    }
}