hyperstack_server/runtime.rs

use crate::bus::BusManager;
use crate::cache::EntityCache;
use crate::config::ServerConfig;
use crate::health::HealthMonitor;
use crate::http_health::HttpHealthServer;
use crate::materialized_view::MaterializedViewRegistry;
use crate::mutation_batch::MutationBatch;
use crate::projector::Projector;
use crate::view::ViewIndex;
use crate::websocket::WebSocketServer;
use crate::Spec;
use anyhow::Result;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use tracing::{error, info, info_span, Instrument};

#[cfg(feature = "otel")]
use crate::metrics::Metrics;

/// Wait for shutdown signal (SIGINT on all platforms, SIGTERM on Unix)
async fn shutdown_signal() {
    let ctrl_c = async {
        tokio::signal::ctrl_c()
            .await
            .expect("Failed to install Ctrl+C handler");
    };

    #[cfg(unix)]
    let terminate = async {
        tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
            .expect("Failed to install SIGTERM handler")
            .recv()
            .await;
    };

    #[cfg(not(unix))]
    let terminate = std::future::pending::<()>();

    tokio::select! {
        _ = ctrl_c => {
            info!("Received SIGINT (Ctrl+C), initiating shutdown");
        }
        _ = terminate => {
            info!("Received SIGTERM, initiating graceful shutdown");
        }
    }
}

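/// Top-level HyperStack server runtime.
///
/// Owns the server configuration and shared [`ViewIndex`], plus an optional parser
/// [`Spec`] and [`MaterializedViewRegistry`], and wires up all long-running tasks in
/// [`Runtime::run`].
///
/// # Example (sketch)
///
/// ```ignore
/// // Hypothetical wiring; how `ServerConfig` and `ViewIndex` are built is out of scope
/// // here, and with the `otel` feature `new` also takes an optional metrics handle.
/// let runtime = Runtime::new(config, view_index)
///     .with_spec(spec)
///     .with_materialized_views(registry);
/// runtime.run().await?;
/// ```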
pub struct Runtime {
    config: ServerConfig,
    view_index: Arc<ViewIndex>,
    spec: Option<Spec>,
    materialized_views: Option<MaterializedViewRegistry>,
    #[cfg(feature = "otel")]
    metrics: Option<Arc<Metrics>>,
}

impl Runtime {
    #[cfg(feature = "otel")]
    pub fn new(config: ServerConfig, view_index: ViewIndex, metrics: Option<Arc<Metrics>>) -> Self {
        Self {
            config,
            view_index: Arc::new(view_index),
            spec: None,
            materialized_views: None,
            metrics,
        }
    }

    #[cfg(not(feature = "otel"))]
    pub fn new(config: ServerConfig, view_index: ViewIndex) -> Self {
        Self {
            config,
            view_index: Arc::new(view_index),
            spec: None,
            materialized_views: None,
        }
    }

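    /// Attach a parser [`Spec`]. If the spec provides a `parser_setup`, [`Runtime::run`]
    /// will spawn the Vixen parser runtime for it.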
    pub fn with_spec(mut self, spec: Spec) -> Self {
        self.spec = Some(spec);
        self
    }

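    /// Attach a [`MaterializedViewRegistry`] to the runtime.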
    pub fn with_materialized_views(mut self, registry: MaterializedViewRegistry) -> Self {
        self.materialized_views = Some(registry);
        self
    }

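    /// Start all runtime tasks and block until one of them exits or a shutdown
    /// signal (SIGINT/SIGTERM) is received.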
    pub async fn run(self) -> Result<()> {
        info!("Starting HyperStack runtime");

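        // Channel carrying mutation batches from the parser runtime into the projector.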
        let (mutations_tx, mutations_rx) = mpsc::channel::<MutationBatch>(1024);

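        // Shared bus manager and entity cache, used by both the projector and the
        // WebSocket server.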
        let bus_manager = BusManager::new();
        let entity_cache = EntityCache::new();

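        // Optional health monitoring, started here when enabled in the config.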
        let health_monitor = if let Some(health_config) = &self.config.health {
            let monitor = HealthMonitor::new(health_config.clone());
            let _health_task = monitor.start().await;
            info!("Health monitoring enabled");
            Some(monitor)
        } else {
            None
        };

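        // Projector: consumes mutation batches from the channel, holding clones of the
        // view index, entity cache, and bus manager.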
        #[cfg(feature = "otel")]
        let projector = Projector::new(
            self.view_index.clone(),
            bus_manager.clone(),
            entity_cache.clone(),
            mutations_rx,
            self.metrics.clone(),
        );
        #[cfg(not(feature = "otel"))]
        let projector = Projector::new(
            self.view_index.clone(),
            bus_manager.clone(),
            entity_cache.clone(),
            mutations_rx,
        );

        let projector_handle = tokio::spawn(
            async move {
                projector.run().await;
            }
            .instrument(info_span!("projector")),
        );

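        // Optional WebSocket server, enabled via config, sharing the bus manager,
        // entity cache, and view index with the projector.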
        let ws_handle = if let Some(ws_config) = &self.config.websocket {
            #[cfg(feature = "otel")]
            let ws_server = WebSocketServer::new(
                ws_config.bind_address,
                bus_manager.clone(),
                entity_cache.clone(),
                self.view_index.clone(),
                self.metrics.clone(),
            );
            #[cfg(not(feature = "otel"))]
            let ws_server = WebSocketServer::new(
                ws_config.bind_address,
                bus_manager.clone(),
                entity_cache.clone(),
                self.view_index.clone(),
            );

            let bind_addr = ws_config.bind_address;
            Some(tokio::spawn(
                async move {
                    if let Err(e) = ws_server.start().await {
                        error!("WebSocket server error: {}", e);
                    }
                }
                .instrument(info_span!("ws.server", %bind_addr)),
            ))
        } else {
            None
        };

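        // Optional Vixen parser runtime: when the spec provides a parser_setup, it is
        // spawned with the mutation sender, health monitor, and reconnection config.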
        let parser_handle = if let Some(spec) = self.spec {
            if let Some(parser_setup) = spec.parser_setup {
                info!(
                    "Starting Vixen parser runtime for program: {}",
                    spec.program_id
                );
                let tx = mutations_tx.clone();
                let health = health_monitor.clone();
                let reconnection_config = self.config.reconnection.clone().unwrap_or_default();
                let program_id = spec.program_id.clone();
                Some(tokio::spawn(
                    async move {
                        if let Err(e) = parser_setup(tx, health, reconnection_config).await {
                            error!("Vixen parser runtime error: {}", e);
                        }
                    }
                    .instrument(info_span!("vixen.parser", %program_id)),
                ))
            } else {
                info!("Spec provided but no parser_setup configured - skipping Vixen runtime");
                None
            }
        } else {
            info!("No spec provided - running in websocket-only mode");
            None
        };

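        // Optional HTTP health server, attached to the health monitor when one is running.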
        let http_health_handle = if let Some(http_health_config) = &self.config.http_health {
            let mut http_server = HttpHealthServer::new(http_health_config.bind_address);
            if let Some(monitor) = health_monitor.clone() {
                http_server = http_server.with_health_monitor(monitor);
            }

            let bind_addr = http_health_config.bind_address;
            Some(tokio::spawn(
                async move {
                    if let Err(e) = http_server.start().await {
                        error!("HTTP health server error: {}", e);
                    }
                }
                .instrument(info_span!("http.health", %bind_addr)),
            ))
        } else {
            None
        };

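        // Background task: every 60 seconds, remove stale state/list buses and log
        // the counts when anything was cleaned up.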
        let bus_cleanup_handle = {
            let bus = bus_manager.clone();
            tokio::spawn(
                async move {
                    let mut interval = tokio::time::interval(Duration::from_secs(60));
                    loop {
                        interval.tick().await;
                        let state_cleaned = bus.cleanup_stale_state_buses().await;
                        let list_cleaned = bus.cleanup_stale_list_buses().await;
                        if state_cleaned > 0 || list_cleaned > 0 {
                            let (state_count, list_count) = bus.bus_counts().await;
                            info!(
                                "Bus cleanup: removed {} state, {} list buses. Current: {} state, {} list",
                                state_cleaned, list_cleaned, state_count, list_count
                            );
                        }
                    }
                }
                .instrument(info_span!("bus.cleanup")),
            )
        };

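        // Background task: every 30 seconds, sample bus counts and cache stats
        // (currently gathered but not reported).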
        let stats_handle = {
            let bus = bus_manager.clone();
            let cache = entity_cache.clone();
            tokio::spawn(
                async move {
                    let mut interval = tokio::time::interval(Duration::from_secs(30));
                    loop {
                        interval.tick().await;
                        let (_state_buses, _list_buses) = bus.bus_counts().await;
                        let _cache_stats = cache.stats().await;
                    }
                }
                .instrument(info_span!("stats.reporter")),
            )
        };

        info!("HyperStack runtime is running. Press Ctrl+C to stop.");

        // Wait for any task to complete (or handle shutdown signals)
        tokio::select! {
            _ = async {
                if let Some(handle) = ws_handle {
                    handle.await
                } else {
                    std::future::pending().await
                }
            } => {
                info!("WebSocket server task completed");
            }
            _ = projector_handle => {
                info!("Projector task completed");
            }
            _ = async {
                if let Some(handle) = parser_handle {
                    handle.await
                } else {
                    std::future::pending().await
                }
            } => {
                info!("Parser runtime task completed");
            }
            _ = async {
                if let Some(handle) = http_health_handle {
                    handle.await
                } else {
                    std::future::pending().await
                }
            } => {
                info!("HTTP health server task completed");
            }
            _ = bus_cleanup_handle => {
                info!("Bus cleanup task completed");
            }
            _ = stats_handle => {
                info!("Stats reporter task completed");
            }
            _ = shutdown_signal() => {}
        }

        info!("Shutting down HyperStack runtime");
        Ok(())
    }
}