// hyperstack_server/runtime.rs
use crate::bus::BusManager;
2use crate::cache::EntityCache;
3use crate::config::ServerConfig;
4use crate::health::HealthMonitor;
5use crate::http_health::HttpHealthServer;
6use crate::materialized_view::MaterializedViewRegistry;
7use crate::mutation_batch::MutationBatch;
8use crate::projector::Projector;
9use crate::view::ViewIndex;
10use crate::websocket::WebSocketServer;
11use crate::Spec;
12use anyhow::Result;
13use std::sync::Arc;
14use std::time::Duration;
15use tokio::sync::mpsc;
16use tracing::{error, info, info_span, Instrument};
17
18#[cfg(feature = "otel")]
19use crate::metrics::Metrics;
20
21async fn shutdown_signal() {
23 let ctrl_c = async {
24 tokio::signal::ctrl_c()
25 .await
26 .expect("Failed to install Ctrl+C handler");
27 };
28
29 #[cfg(unix)]
30 let terminate = async {
31 tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
32 .expect("Failed to install SIGTERM handler")
33 .recv()
34 .await;
35 };
36
37 #[cfg(not(unix))]
38 let terminate = std::future::pending::<()>();
39
40 tokio::select! {
41 _ = ctrl_c => {
42 info!("Received SIGINT (Ctrl+C), initiating shutdown");
43 }
44 _ = terminate => {
45 info!("Received SIGTERM, initiating graceful shutdown");
46 }
47 }
48}
49
/// Top-level server runtime: holds the configuration and the components to
/// wire together (projector, WebSocket server, optional Vixen parser, and
/// background maintenance tasks). Construct with `new(..)`, refine with the
/// `with_*` builder methods, then drive everything with `run()`.
pub struct Runtime {
    /// Full server configuration (websocket, health, http_health,
    /// reconnection sections are read by `run()`).
    config: ServerConfig,
    /// Shared view definitions, handed to the projector and WebSocket server.
    view_index: Arc<ViewIndex>,
    /// Optional program spec; when present and it carries a `parser_setup`,
    /// `run()` starts the Vixen parser runtime.
    spec: Option<Spec>,
    /// Optional materialized-view registry. NOTE(review): stored by the
    /// builder but not read anywhere in this file — confirm its consumer.
    materialized_views: Option<MaterializedViewRegistry>,
    /// Metrics handle, only compiled in with the "otel" feature.
    #[cfg(feature = "otel")]
    metrics: Option<Arc<Metrics>>,
}
58
impl Runtime {
    /// Creates a runtime from the server config and view index.
    /// "otel" variant: additionally carries an optional metrics handle that
    /// is threaded into the projector and WebSocket server.
    #[cfg(feature = "otel")]
    pub fn new(config: ServerConfig, view_index: ViewIndex, metrics: Option<Arc<Metrics>>) -> Self {
        Self {
            config,
            view_index: Arc::new(view_index),
            spec: None,
            materialized_views: None,
            metrics,
        }
    }

    /// Creates a runtime from the server config and view index
    /// (non-"otel" build: no metrics plumbing).
    #[cfg(not(feature = "otel"))]
    pub fn new(config: ServerConfig, view_index: ViewIndex) -> Self {
        Self {
            config,
            view_index: Arc::new(view_index),
            spec: None,
            materialized_views: None,
        }
    }

    /// Builder: attach a program spec. `run()` uses it to start the Vixen
    /// parser runtime when the spec has a `parser_setup`.
    pub fn with_spec(mut self, spec: Spec) -> Self {
        self.spec = Some(spec);
        self
    }

    /// Builder: attach a materialized-view registry.
    /// NOTE(review): the registry is stored but never read in this file.
    pub fn with_materialized_views(mut self, registry: MaterializedViewRegistry) -> Self {
        self.materialized_views = Some(registry);
        self
    }

    /// Starts every configured component, then blocks until one of them
    /// finishes or a shutdown signal (SIGINT/SIGTERM) arrives.
    ///
    /// Components, in start order:
    /// 1. projector (always),
    /// 2. WebSocket server (if `config.websocket` is set),
    /// 3. Vixen parser (if a spec with `parser_setup` was provided),
    /// 4. HTTP health server on a dedicated OS thread (if configured),
    /// 5. periodic bus cleanup and stats tasks.
    ///
    /// Returns `Ok(())` after any of the above triggers shutdown; spawned
    /// tasks are not explicitly joined or cancelled on the way out.
    pub async fn run(self) -> Result<()> {
        info!("Starting HyperStack runtime");

        // Mutation pipeline: the parser produces batches on `mutations_tx`,
        // the projector consumes them from `mutations_rx` (bounded at 1024
        // batches, so a slow projector back-pressures the parser).
        let (mutations_tx, mutations_rx) = mpsc::channel::<MutationBatch>(1024);

        // Shared infrastructure handed (via clones) to every component.
        let bus_manager = BusManager::new();
        let entity_cache = EntityCache::new();

        // Optional health monitor. The task handle returned by `start()` is
        // dropped, so the monitor runs detached for the process lifetime.
        let health_monitor = if let Some(health_config) = &self.config.health {
            let monitor = HealthMonitor::new(health_config.clone());
            let _health_task = monitor.start().await;
            info!("Health monitoring enabled");
            Some(monitor)
        } else {
            None
        };

        // Projector wiring; the two cfg arms differ only in the extra
        // metrics argument compiled in under the "otel" feature.
        #[cfg(feature = "otel")]
        let projector = Projector::new(
            self.view_index.clone(),
            bus_manager.clone(),
            entity_cache.clone(),
            mutations_rx,
            self.metrics.clone(),
        );
        #[cfg(not(feature = "otel"))]
        let projector = Projector::new(
            self.view_index.clone(),
            bus_manager.clone(),
            entity_cache.clone(),
            mutations_rx,
        );

        let projector_handle = tokio::spawn(
            async move {
                projector.run().await;
            }
            .instrument(info_span!("projector")),
        );

        // Optional WebSocket server; errors from `start()` are logged, not
        // propagated — the select below only sees the task completing.
        let ws_handle = if let Some(ws_config) = &self.config.websocket {
            #[cfg(feature = "otel")]
            let ws_server = WebSocketServer::new(
                ws_config.bind_address,
                bus_manager.clone(),
                entity_cache.clone(),
                self.view_index.clone(),
                self.metrics.clone(),
            );
            #[cfg(not(feature = "otel"))]
            let ws_server = WebSocketServer::new(
                ws_config.bind_address,
                bus_manager.clone(),
                entity_cache.clone(),
                self.view_index.clone(),
            );

            let bind_addr = ws_config.bind_address;
            Some(tokio::spawn(
                async move {
                    if let Err(e) = ws_server.start().await {
                        error!("WebSocket server error: {}", e);
                    }
                }
                .instrument(info_span!("ws.server", %bind_addr)),
            ))
        } else {
            None
        };

        // Optional Vixen parser runtime: requires both a spec and a
        // `parser_setup` callback. The first program id is used purely to
        // label the tracing span.
        let parser_handle = if let Some(spec) = self.spec {
            if let Some(parser_setup) = spec.parser_setup {
                let program_id = spec
                    .program_ids
                    .first()
                    .cloned()
                    .unwrap_or_else(|| "unknown".to_string());
                info!(
                    "Starting Vixen parser runtime for program: {}",
                    program_id
                );
                let tx = mutations_tx.clone();
                let health = health_monitor.clone();
                let reconnection_config = self.config.reconnection.clone().unwrap_or_default();
                Some(tokio::spawn(
                    async move {
                        if let Err(e) = parser_setup(tx, health, reconnection_config).await {
                            error!("Vixen parser runtime error: {}", e);
                        }
                    }
                    .instrument(info_span!("vixen.parser", %program_id)),
                ))
            } else {
                info!("Spec provided but no parser_setup configured - skipping Vixen runtime");
                None
            }
        } else {
            info!("No spec provided - running in websocket-only mode");
            None
        };

        // Optional HTTP health server. It runs on a dedicated OS thread with
        // its own single-threaded tokio runtime — presumably so the health
        // endpoint stays responsive even if the main runtime is saturated
        // (TODO confirm intent). The thread handle is kept only to silence
        // the unused warning; it is never joined.
        let _http_health_handle = if let Some(http_health_config) = &self.config.http_health {
            let mut http_server = HttpHealthServer::new(http_health_config.bind_address);
            if let Some(monitor) = health_monitor.clone() {
                http_server = http_server.with_health_monitor(monitor);
            }

            let bind_addr = http_health_config.bind_address;
            let join_handle = std::thread::Builder::new()
                .name("health-server".into())
                .spawn(move || {
                    let rt = tokio::runtime::Builder::new_current_thread()
                        .enable_all()
                        .build()
                        .expect("Failed to create health server runtime");
                    rt.block_on(async move {
                        let _span = info_span!("http.health", %bind_addr).entered();
                        if let Err(e) = http_server.start().await {
                            error!("HTTP health server error: {}", e);
                        }
                    });
                })
                .expect("Failed to spawn health server thread");
            info!("HTTP health server running on dedicated thread at {}", bind_addr);
            Some(join_handle)
        } else {
            None
        };

        // Every 60s, drop stale state/list buses and log only when
        // something was actually removed.
        let bus_cleanup_handle = {
            let bus = bus_manager.clone();
            tokio::spawn(
                async move {
                    let mut interval = tokio::time::interval(Duration::from_secs(60));
                    loop {
                        interval.tick().await;
                        let state_cleaned = bus.cleanup_stale_state_buses().await;
                        let list_cleaned = bus.cleanup_stale_list_buses().await;
                        if state_cleaned > 0 || list_cleaned > 0 {
                            let (state_count, list_count) = bus.bus_counts().await;
                            info!(
                                "Bus cleanup: removed {} state, {} list buses. Current: {} state, {} list",
                                state_cleaned, list_cleaned, state_count, list_count
                            );
                        }
                    }
                }
                .instrument(info_span!("bus.cleanup")),
            )
        };

        // Every 30s, sample bus counts and cache stats. NOTE(review): the
        // results are currently discarded (underscore bindings) — this looks
        // like a placeholder for a metrics/log export; confirm before relying
        // on it.
        let stats_handle = {
            let bus = bus_manager.clone();
            let cache = entity_cache.clone();
            tokio::spawn(
                async move {
                    let mut interval = tokio::time::interval(Duration::from_secs(30));
                    loop {
                        interval.tick().await;
                        let (_state_buses, _list_buses) = bus.bus_counts().await;
                        let _cache_stats = cache.stats().await;
                    }
                }
                .instrument(info_span!("stats.reporter")),
            )
        };

        info!("HyperStack runtime is running. Press Ctrl+C to stop.");

        // Block until the first component finishes (normally they loop
        // forever, so completion usually means failure) or a shutdown signal
        // arrives. Optional components are wrapped so a `None` handle becomes
        // a never-resolving future instead of ending the select early.
        tokio::select! {
            _ = async {
                if let Some(handle) = ws_handle {
                    handle.await
                } else {
                    std::future::pending().await
                }
            } => {
                info!("WebSocket server task completed");
            }
            _ = projector_handle => {
                info!("Projector task completed");
            }
            _ = async {
                if let Some(handle) = parser_handle {
                    handle.await
                } else {
                    std::future::pending().await
                }
            } => {
                info!("Parser runtime task completed");
            }
            _ = bus_cleanup_handle => {
                info!("Bus cleanup task completed");
            }
            _ = stats_handle => {
                info!("Stats reporter task completed");
            }
            _ = shutdown_signal() => {}
        }

        info!("Shutting down HyperStack runtime");
        Ok(())
    }
}