hyperstack_server/runtime.rs

use crate::bus::BusManager;
use crate::cache::EntityCache;
use crate::config::ServerConfig;
use crate::health::HealthMonitor;
use crate::http_health::HttpHealthServer;
use crate::materialized_view::MaterializedViewRegistry;
use crate::mutation_batch::MutationBatch;
use crate::projector::Projector;
use crate::view::ViewIndex;
use crate::websocket::WebSocketServer;
use crate::Spec;
use anyhow::Result;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use tracing::{error, info, info_span, Instrument};

#[cfg(feature = "otel")]
use crate::metrics::Metrics;
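
/// Resolves when a shutdown signal arrives: Ctrl+C (SIGINT) on all platforms,
/// plus SIGTERM on Unix targets.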
async fn shutdown_signal() {
    let ctrl_c = async {
        tokio::signal::ctrl_c()
            .await
            .expect("Failed to install Ctrl+C handler");
    };

    #[cfg(unix)]
    let terminate = async {
        tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
            .expect("Failed to install SIGTERM handler")
            .recv()
            .await;
    };

    #[cfg(not(unix))]
    let terminate = std::future::pending::<()>();

    tokio::select! {
        _ = ctrl_c => {
            info!("Received SIGINT (Ctrl+C), initiating graceful shutdown");
        }
        _ = terminate => {
            info!("Received SIGTERM, initiating graceful shutdown");
        }
    }
}
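
/// The HyperStack server runtime. Holds the server configuration, the shared
/// view index, and optional spec, materialized-view registry, and (on `otel`
/// builds) metrics handle; `run` wires everything together.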
pub struct Runtime {
    config: ServerConfig,
    view_index: Arc<ViewIndex>,
    spec: Option<Spec>,
    materialized_views: Option<MaterializedViewRegistry>,
    #[cfg(feature = "otel")]
    metrics: Option<Arc<Metrics>>,
}

impl Runtime {
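    // `new` has two variants: `otel` builds additionally accept an optional
    // metrics handle that is passed through to the projector and WebSocket
    // server.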
    #[cfg(feature = "otel")]
    pub fn new(config: ServerConfig, view_index: ViewIndex, metrics: Option<Arc<Metrics>>) -> Self {
        Self {
            config,
            view_index: Arc::new(view_index),
            spec: None,
            materialized_views: None,
            metrics,
        }
    }

    #[cfg(not(feature = "otel"))]
    pub fn new(config: ServerConfig, view_index: ViewIndex) -> Self {
        Self {
            config,
            view_index: Arc::new(view_index),
            spec: None,
            materialized_views: None,
        }
    }
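
    /// Attaches a program spec whose `parser_setup` hook is used to start the
    /// Vixen parser runtime in `run`.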
    pub fn with_spec(mut self, spec: Spec) -> Self {
        self.spec = Some(spec);
        self
    }
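
    /// Attaches a materialized view registry.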
    pub fn with_materialized_views(mut self, registry: MaterializedViewRegistry) -> Self {
        self.materialized_views = Some(registry);
        self
    }
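
    /// Runs the server until a long-lived task exits or a shutdown signal is
    /// received. Spawns the projector, the optional WebSocket server and Vixen
    /// parser runtime, the HTTP health server (on a dedicated thread), and
    /// periodic bus-cleanup and stats tasks.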
    pub async fn run(self) -> Result<()> {
        info!("Starting HyperStack runtime");
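
        // Bounded channel feeding mutation batches from the parser runtime to
        // the projector.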
        let (mutations_tx, mutations_rx) = mpsc::channel::<MutationBatch>(1024);

        let bus_manager = BusManager::new();
        let entity_cache = EntityCache::new();

        let health_monitor = if let Some(health_config) = &self.config.health {
            let monitor = HealthMonitor::new(health_config.clone());
            let _health_task = monitor.start().await;
            info!("Health monitoring enabled");
            Some(monitor)
        } else {
            None
        };
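
        // The projector consumes mutation batches and updates the shared view
        // index, bus manager, and entity cache (plus metrics on `otel` builds).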
        #[cfg(feature = "otel")]
        let projector = Projector::new(
            self.view_index.clone(),
            bus_manager.clone(),
            entity_cache.clone(),
            mutations_rx,
            self.metrics.clone(),
        );
        #[cfg(not(feature = "otel"))]
        let projector = Projector::new(
            self.view_index.clone(),
            bus_manager.clone(),
            entity_cache.clone(),
            mutations_rx,
        );

        let projector_handle = tokio::spawn(
            async move {
                projector.run().await;
            }
            .instrument(info_span!("projector")),
        );
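
        // Serve WebSocket subscribers if a websocket listener is configured.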
        let ws_handle = if let Some(ws_config) = &self.config.websocket {
            #[cfg(feature = "otel")]
            let ws_server = WebSocketServer::new(
                ws_config.bind_address,
                bus_manager.clone(),
                entity_cache.clone(),
                self.view_index.clone(),
                self.metrics.clone(),
            );
            #[cfg(not(feature = "otel"))]
            let ws_server = WebSocketServer::new(
                ws_config.bind_address,
                bus_manager.clone(),
                entity_cache.clone(),
                self.view_index.clone(),
            );

            let bind_addr = ws_config.bind_address;
            Some(tokio::spawn(
                async move {
                    if let Err(e) = ws_server.start().await {
                        error!("WebSocket server error: {}", e);
                    }
                }
                .instrument(info_span!("ws.server", %bind_addr)),
            ))
        } else {
            None
        };
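
        // Start the Vixen parser runtime when the spec provides a
        // `parser_setup` hook; otherwise run in websocket-only mode.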
        let parser_handle = if let Some(spec) = self.spec {
            if let Some(parser_setup) = spec.parser_setup {
                info!(
                    "Starting Vixen parser runtime for program: {}",
                    spec.program_id
                );
                let tx = mutations_tx.clone();
                let health = health_monitor.clone();
                let reconnection_config = self.config.reconnection.clone().unwrap_or_default();
                let program_id = spec.program_id.clone();
                Some(tokio::spawn(
                    async move {
                        if let Err(e) = parser_setup(tx, health, reconnection_config).await {
                            error!("Vixen parser runtime error: {}", e);
                        }
                    }
                    .instrument(info_span!("vixen.parser", %program_id)),
                ))
            } else {
                info!("Spec provided but no parser_setup configured - skipping Vixen runtime");
                None
            }
        } else {
            info!("No spec provided - running in websocket-only mode");
            None
        };
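
        // The HTTP health endpoint runs on a dedicated OS thread with its own
        // single-threaded Tokio runtime, isolating it from the main scheduler.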
        let _http_health_handle = if let Some(http_health_config) = &self.config.http_health {
            let mut http_server = HttpHealthServer::new(http_health_config.bind_address);
            if let Some(monitor) = health_monitor.clone() {
                http_server = http_server.with_health_monitor(monitor);
            }

            let bind_addr = http_health_config.bind_address;
            let join_handle = std::thread::Builder::new()
                .name("health-server".into())
                .spawn(move || {
                    let rt = tokio::runtime::Builder::new_current_thread()
                        .enable_all()
                        .build()
                        .expect("Failed to create health server runtime");
                    rt.block_on(async move {
                        let _span = info_span!("http.health", %bind_addr).entered();
                        if let Err(e) = http_server.start().await {
                            error!("HTTP health server error: {}", e);
                        }
                    });
                })
                .expect("Failed to spawn health server thread");
            info!("HTTP health server running on dedicated thread at {}", bind_addr);
            Some(join_handle)
        } else {
            None
        };
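
        // Sweep stale state/list buses once a minute, logging only when
        // something was removed.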
        let bus_cleanup_handle = {
            let bus = bus_manager.clone();
            tokio::spawn(
                async move {
                    let mut interval = tokio::time::interval(Duration::from_secs(60));
                    loop {
                        interval.tick().await;
                        let state_cleaned = bus.cleanup_stale_state_buses().await;
                        let list_cleaned = bus.cleanup_stale_list_buses().await;
                        if state_cleaned > 0 || list_cleaned > 0 {
                            let (state_count, list_count) = bus.bus_counts().await;
                            info!(
                                "Bus cleanup: removed {} state, {} list buses. Current: {} state, {} list",
                                state_cleaned, list_cleaned, state_count, list_count
                            );
                        }
                    }
                }
                .instrument(info_span!("bus.cleanup")),
            )
        };
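
        // Sample bus and cache statistics every 30 seconds; the results are
        // currently discarded.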
        let stats_handle = {
            let bus = bus_manager.clone();
            let cache = entity_cache.clone();
            tokio::spawn(
                async move {
                    let mut interval = tokio::time::interval(Duration::from_secs(30));
                    loop {
                        interval.tick().await;
                        let (_state_buses, _list_buses) = bus.bus_counts().await;
                        let _cache_stats = cache.stats().await;
                    }
                }
                .instrument(info_span!("stats.reporter")),
            )
        };

        info!("HyperStack runtime is running. Press Ctrl+C to stop.");
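
        // Block until any long-lived task finishes or a shutdown signal
        // arrives.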
        tokio::select! {
            _ = async {
                if let Some(handle) = ws_handle {
                    handle.await
                } else {
                    std::future::pending().await
                }
            } => {
                info!("WebSocket server task completed");
            }
            _ = projector_handle => {
                info!("Projector task completed");
            }
            _ = async {
                if let Some(handle) = parser_handle {
                    handle.await
                } else {
                    std::future::pending().await
                }
            } => {
                info!("Parser runtime task completed");
            }
            _ = bus_cleanup_handle => {
                info!("Bus cleanup task completed");
            }
            _ = stats_handle => {
                info!("Stats reporter task completed");
            }
            _ = shutdown_signal() => {}
        }

        info!("Shutting down HyperStack runtime");
        Ok(())
    }
}