hyperstack_server/runtime.rs

use crate::bus::BusManager;
use crate::cache::EntityCache;
use crate::config::ServerConfig;
use crate::health::HealthMonitor;
use crate::http_health::HttpHealthServer;
use crate::mutation_batch::MutationBatch;
use crate::projector::Projector;
use crate::view::ViewIndex;
use crate::websocket::WebSocketServer;
use crate::Spec;
use anyhow::Result;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use tracing::{error, info, info_span, Instrument};

#[cfg(feature = "otel")]
use crate::metrics::Metrics;

/// Wait for a shutdown signal (Ctrl+C on all platforms, plus SIGTERM on Unix).
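/// On non-Unix targets the SIGTERM branch is `future::pending`, so only
/// Ctrl+C can trigger shutdown there.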
async fn shutdown_signal() {
    let ctrl_c = async {
        tokio::signal::ctrl_c()
            .await
            .expect("Failed to install Ctrl+C handler");
    };

    #[cfg(unix)]
    let terminate = async {
        tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
            .expect("Failed to install SIGTERM handler")
            .recv()
            .await;
    };

    #[cfg(not(unix))]
    let terminate = std::future::pending::<()>();

    tokio::select! {
        _ = ctrl_c => {
            info!("Received SIGINT (Ctrl+C), initiating graceful shutdown");
        }
        _ = terminate => {
            info!("Received SIGTERM, initiating graceful shutdown");
        }
    }
}

/// Runtime orchestrator that manages all server components.
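///
/// # Example
///
/// A minimal startup sketch (not compiled as a doctest). `load_config`,
/// `build_view_index`, and `load_spec` are hypothetical application helpers,
/// not part of this crate:
///
/// ```ignore
/// let config: ServerConfig = load_config()?;
/// let view_index: ViewIndex = build_view_index();
///
/// // Without the `otel` feature; with it, `new` also takes an
/// // Option<Arc<Metrics>> argument.
/// let runtime = Runtime::new(config, view_index);
///
/// // Attaching a Spec enables the Vixen parser runtime; omit `with_spec`
/// // to run in websocket-only mode.
/// runtime.with_spec(load_spec()?).run().await?;
/// ```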
pub struct Runtime {
    config: ServerConfig,
    view_index: Arc<ViewIndex>,
    spec: Option<Spec>,
    #[cfg(feature = "otel")]
    metrics: Option<Arc<Metrics>>,
}

impl Runtime {
    #[cfg(feature = "otel")]
    pub fn new(config: ServerConfig, view_index: ViewIndex, metrics: Option<Arc<Metrics>>) -> Self {
        Self {
            config,
            view_index: Arc::new(view_index),
            spec: None,
            metrics,
        }
    }

    #[cfg(not(feature = "otel"))]
    pub fn new(config: ServerConfig, view_index: ViewIndex) -> Self {
        Self {
            config,
            view_index: Arc::new(view_index),
            spec: None,
        }
    }

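    /// Attach a [`Spec`] so [`run`](Self::run) starts the Vixen parser
    /// runtime for the spec's program; without one, the server runs in
    /// websocket-only mode.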
    pub fn with_spec(mut self, spec: Spec) -> Self {
        self.spec = Some(spec);
        self
    }

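    /// Run all configured components until the first one exits or a shutdown
    /// signal (Ctrl+C, or SIGTERM on Unix) arrives: the projector, the
    /// optional WebSocket / Vixen parser / HTTP health servers, and the
    /// periodic bus-cleanup and stats loops.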
    pub async fn run(self) -> Result<()> {
        info!("Starting HyperStack runtime");

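        // Bounded channel: once 1024 mutation batches are queued, parser
        // sends wait, applying backpressure upstream.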
        let (mutations_tx, mutations_rx) = mpsc::channel::<MutationBatch>(1024);

        let bus_manager = BusManager::new();
        let entity_cache = EntityCache::new();

        let health_monitor = if let Some(health_config) = &self.config.health {
            let monitor = HealthMonitor::new(health_config.clone());
            let _health_task = monitor.start().await;
            info!("Health monitoring enabled");
            Some(monitor)
        } else {
            None
        };

        #[cfg(feature = "otel")]
        let projector = Projector::new(
            self.view_index.clone(),
            bus_manager.clone(),
            entity_cache.clone(),
            mutations_rx,
            self.metrics.clone(),
        );
        #[cfg(not(feature = "otel"))]
        let projector = Projector::new(
            self.view_index.clone(),
            bus_manager.clone(),
            entity_cache.clone(),
            mutations_rx,
        );

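        // The projector owns the sole receiver of the mutation channel; run
        // it on its own task under a dedicated tracing span.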
        let projector_handle = tokio::spawn(
            async move {
                projector.run().await;
            }
            .instrument(info_span!("projector")),
        );

        let ws_handle = if let Some(ws_config) = &self.config.websocket {
            #[cfg(feature = "otel")]
            let ws_server = WebSocketServer::new(
                ws_config.bind_address,
                bus_manager.clone(),
                entity_cache.clone(),
                self.view_index.clone(),
                self.metrics.clone(),
            );
            #[cfg(not(feature = "otel"))]
            let ws_server = WebSocketServer::new(
                ws_config.bind_address,
                bus_manager.clone(),
                entity_cache.clone(),
                self.view_index.clone(),
            );

            let bind_addr = ws_config.bind_address;
            Some(tokio::spawn(
                async move {
                    if let Err(e) = ws_server.start().await {
                        error!("WebSocket server error: {}", e);
                    }
                }
                .instrument(info_span!("ws.server", %bind_addr)),
            ))
        } else {
            None
        };

        let parser_handle = if let Some(spec) = self.spec {
            if let Some(parser_setup) = spec.parser_setup {
                info!(
                    "Starting Vixen parser runtime for program: {}",
                    spec.program_id
                );
                let tx = mutations_tx.clone();
                let health = health_monitor.clone();
                let reconnection_config = self.config.reconnection.clone().unwrap_or_default();
                let program_id = spec.program_id.clone();
                Some(tokio::spawn(
                    async move {
                        if let Err(e) = parser_setup(tx, health, reconnection_config).await {
                            error!("Vixen parser runtime error: {}", e);
                        }
                    }
                    .instrument(info_span!("vixen.parser", %program_id)),
                ))
            } else {
                info!("Spec provided but no parser_setup configured - skipping Vixen runtime");
                None
            }
        } else {
            info!("No spec provided - running in websocket-only mode");
            None
        };

        let http_health_handle = if let Some(http_health_config) = &self.config.http_health {
            let mut http_server = HttpHealthServer::new(http_health_config.bind_address);
            if let Some(monitor) = health_monitor.clone() {
                http_server = http_server.with_health_monitor(monitor);
            }

            let bind_addr = http_health_config.bind_address;
            Some(tokio::spawn(
                async move {
                    if let Err(e) = http_server.start().await {
                        error!("HTTP health server error: {}", e);
                    }
                }
                .instrument(info_span!("http.health", %bind_addr)),
            ))
        } else {
            None
        };
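
        // Every 60s, sweep stale state/list buses; log only when something
        // was actually removed.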
        let bus_cleanup_handle = {
            let bus = bus_manager.clone();
            tokio::spawn(
                async move {
                    let mut interval = tokio::time::interval(Duration::from_secs(60));
                    loop {
                        interval.tick().await;
                        let state_cleaned = bus.cleanup_stale_state_buses().await;
                        let list_cleaned = bus.cleanup_stale_list_buses().await;
                        if state_cleaned > 0 || list_cleaned > 0 {
                            let (state_count, list_count) = bus.bus_counts().await;
                            info!(
                                "Bus cleanup: removed {} state, {} list buses. Current: {} state, {} list",
                                state_cleaned, list_cleaned, state_count, list_count
                            );
                        }
                    }
                }
                .instrument(info_span!("bus.cleanup")),
            )
        };

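        // Snapshot bus and cache stats every 30s; the values are currently
        // collected and discarded (leading underscores silence the warnings).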
        let stats_handle = {
            let bus = bus_manager.clone();
            let cache = entity_cache.clone();
            tokio::spawn(
                async move {
                    let mut interval = tokio::time::interval(Duration::from_secs(30));
                    loop {
                        interval.tick().await;
                        let (_state_buses, _list_buses) = bus.bus_counts().await;
                        let _cache_stats = cache.stats().await;
                    }
                }
                .instrument(info_span!("stats.reporter")),
            )
        };

        info!("HyperStack runtime is running. Press Ctrl+C to stop.");

        // Wait for the first task to exit or a shutdown signal. Optional
        // components are wrapped in pending() futures so an absent handle
        // never wins the select.
        tokio::select! {
            _ = async {
                if let Some(handle) = ws_handle {
                    handle.await
                } else {
                    std::future::pending().await
                }
            } => {
                info!("WebSocket server task completed");
            }
            _ = projector_handle => {
                info!("Projector task completed");
            }
            _ = async {
                if let Some(handle) = parser_handle {
                    handle.await
                } else {
                    std::future::pending().await
                }
            } => {
                info!("Parser runtime task completed");
            }
            _ = async {
                if let Some(handle) = http_health_handle {
                    handle.await
                } else {
                    std::future::pending().await
                }
            } => {
                info!("HTTP health server task completed");
            }
            _ = bus_cleanup_handle => {
                info!("Bus cleanup task completed");
            }
            _ = stats_handle => {
                info!("Stats reporter task completed");
            }
            _ = shutdown_signal() => {}
        }

        info!("Shutting down HyperStack runtime");
        Ok(())
    }
}