hyperstack_server/
runtime.rs1use crate::bus::BusManager;
2use crate::cache::EntityCache;
3use crate::config::ServerConfig;
4use crate::health::HealthMonitor;
5use crate::http_health::HttpHealthServer;
6use crate::materialized_view::MaterializedViewRegistry;
7use crate::mutation_batch::MutationBatch;
8use crate::projector::Projector;
9use crate::view::ViewIndex;
10use crate::websocket::WebSocketServer;
11use crate::Spec;
12use anyhow::Result;
13use std::sync::Arc;
14use std::time::Duration;
15use tokio::sync::mpsc;
16use tracing::{error, info, info_span, Instrument};
17
18#[cfg(feature = "otel")]
19use crate::metrics::Metrics;
20
21async fn shutdown_signal() {
23 let ctrl_c = async {
24 tokio::signal::ctrl_c()
25 .await
26 .expect("Failed to install Ctrl+C handler");
27 };
28
29 #[cfg(unix)]
30 let terminate = async {
31 tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
32 .expect("Failed to install SIGTERM handler")
33 .recv()
34 .await;
35 };
36
37 #[cfg(not(unix))]
38 let terminate = std::future::pending::<()>();
39
40 tokio::select! {
41 _ = ctrl_c => {
42 info!("Received SIGINT (Ctrl+C), initiating shutdown");
43 }
44 _ = terminate => {
45 info!("Received SIGTERM, initiating graceful shutdown");
46 }
47 }
48}
49
50pub struct Runtime {
51 config: ServerConfig,
52 view_index: Arc<ViewIndex>,
53 spec: Option<Spec>,
54 materialized_views: Option<MaterializedViewRegistry>,
55 #[cfg(feature = "otel")]
56 metrics: Option<Arc<Metrics>>,
57}
58
59impl Runtime {
60 #[cfg(feature = "otel")]
61 pub fn new(config: ServerConfig, view_index: ViewIndex, metrics: Option<Arc<Metrics>>) -> Self {
62 Self {
63 config,
64 view_index: Arc::new(view_index),
65 spec: None,
66 materialized_views: None,
67 metrics,
68 }
69 }
70
71 #[cfg(not(feature = "otel"))]
72 pub fn new(config: ServerConfig, view_index: ViewIndex) -> Self {
73 Self {
74 config,
75 view_index: Arc::new(view_index),
76 spec: None,
77 materialized_views: None,
78 }
79 }
80
81 pub fn with_spec(mut self, spec: Spec) -> Self {
82 self.spec = Some(spec);
83 self
84 }
85
86 pub fn with_materialized_views(mut self, registry: MaterializedViewRegistry) -> Self {
87 self.materialized_views = Some(registry);
88 self
89 }
90
91 pub async fn run(self) -> Result<()> {
92 info!("Starting HyperStack runtime");
93
94 let (mutations_tx, mutations_rx) = mpsc::channel::<MutationBatch>(1024);
95
96 let bus_manager = BusManager::new();
97 let entity_cache = EntityCache::new();
98
99 let health_monitor = if let Some(health_config) = &self.config.health {
100 let monitor = HealthMonitor::new(health_config.clone());
101 let _health_task = monitor.start().await;
102 info!("Health monitoring enabled");
103 Some(monitor)
104 } else {
105 None
106 };
107
108 #[cfg(feature = "otel")]
109 let projector = Projector::new(
110 self.view_index.clone(),
111 bus_manager.clone(),
112 entity_cache.clone(),
113 mutations_rx,
114 self.metrics.clone(),
115 );
116 #[cfg(not(feature = "otel"))]
117 let projector = Projector::new(
118 self.view_index.clone(),
119 bus_manager.clone(),
120 entity_cache.clone(),
121 mutations_rx,
122 );
123
124 let projector_handle = tokio::spawn(
125 async move {
126 projector.run().await;
127 }
128 .instrument(info_span!("projector")),
129 );
130
131 let ws_handle = if let Some(ws_config) = &self.config.websocket {
132 #[cfg(feature = "otel")]
133 let ws_server = WebSocketServer::new(
134 ws_config.bind_address,
135 bus_manager.clone(),
136 entity_cache.clone(),
137 self.view_index.clone(),
138 self.metrics.clone(),
139 );
140 #[cfg(not(feature = "otel"))]
141 let ws_server = WebSocketServer::new(
142 ws_config.bind_address,
143 bus_manager.clone(),
144 entity_cache.clone(),
145 self.view_index.clone(),
146 );
147
148 let bind_addr = ws_config.bind_address;
149 Some(tokio::spawn(
150 async move {
151 if let Err(e) = ws_server.start().await {
152 error!("WebSocket server error: {}", e);
153 }
154 }
155 .instrument(info_span!("ws.server", %bind_addr)),
156 ))
157 } else {
158 None
159 };
160
161 let parser_handle = if let Some(spec) = self.spec {
162 if let Some(parser_setup) = spec.parser_setup {
163 info!(
164 "Starting Vixen parser runtime for program: {}",
165 spec.program_id
166 );
167 let tx = mutations_tx.clone();
168 let health = health_monitor.clone();
169 let reconnection_config = self.config.reconnection.clone().unwrap_or_default();
170 let program_id = spec.program_id.clone();
171 Some(tokio::spawn(
172 async move {
173 if let Err(e) = parser_setup(tx, health, reconnection_config).await {
174 error!("Vixen parser runtime error: {}", e);
175 }
176 }
177 .instrument(info_span!("vixen.parser", %program_id)),
178 ))
179 } else {
180 info!("Spec provided but no parser_setup configured - skipping Vixen runtime");
181 None
182 }
183 } else {
184 info!("No spec provided - running in websocket-only mode");
185 None
186 };
187
188 let http_health_handle = if let Some(http_health_config) = &self.config.http_health {
189 let mut http_server = HttpHealthServer::new(http_health_config.bind_address);
190 if let Some(monitor) = health_monitor.clone() {
191 http_server = http_server.with_health_monitor(monitor);
192 }
193
194 let bind_addr = http_health_config.bind_address;
195 Some(tokio::spawn(
196 async move {
197 if let Err(e) = http_server.start().await {
198 error!("HTTP health server error: {}", e);
199 }
200 }
201 .instrument(info_span!("http.health", %bind_addr)),
202 ))
203 } else {
204 None
205 };
206
207 let bus_cleanup_handle = {
208 let bus = bus_manager.clone();
209 tokio::spawn(
210 async move {
211 let mut interval = tokio::time::interval(Duration::from_secs(60));
212 loop {
213 interval.tick().await;
214 let state_cleaned = bus.cleanup_stale_state_buses().await;
215 let list_cleaned = bus.cleanup_stale_list_buses().await;
216 if state_cleaned > 0 || list_cleaned > 0 {
217 let (state_count, list_count) = bus.bus_counts().await;
218 info!(
219 "Bus cleanup: removed {} state, {} list buses. Current: {} state, {} list",
220 state_cleaned, list_cleaned, state_count, list_count
221 );
222 }
223 }
224 }
225 .instrument(info_span!("bus.cleanup")),
226 )
227 };
228
229 let stats_handle = {
230 let bus = bus_manager.clone();
231 let cache = entity_cache.clone();
232 tokio::spawn(
233 async move {
234 let mut interval = tokio::time::interval(Duration::from_secs(30));
235 loop {
236 interval.tick().await;
237 let (_state_buses, _list_buses) = bus.bus_counts().await;
238 let _cache_stats = cache.stats().await;
239 }
240 }
241 .instrument(info_span!("stats.reporter")),
242 )
243 };
244
245 info!("HyperStack runtime is running. Press Ctrl+C to stop.");
246
247 tokio::select! {
249 _ = async {
250 if let Some(handle) = ws_handle {
251 handle.await
252 } else {
253 std::future::pending().await
254 }
255 } => {
256 info!("WebSocket server task completed");
257 }
258 _ = projector_handle => {
259 info!("Projector task completed");
260 }
261 _ = async {
262 if let Some(handle) = parser_handle {
263 handle.await
264 } else {
265 std::future::pending().await
266 }
267 } => {
268 info!("Parser runtime task completed");
269 }
270 _ = async {
271 if let Some(handle) = http_health_handle {
272 handle.await
273 } else {
274 std::future::pending().await
275 }
276 } => {
277 info!("HTTP health server task completed");
278 }
279 _ = bus_cleanup_handle => {
280 info!("Bus cleanup task completed");
281 }
282 _ = stats_handle => {
283 info!("Stats reporter task completed");
284 }
285 _ = shutdown_signal() => {}
286 }
287
288 info!("Shutting down HyperStack runtime");
289 Ok(())
290 }
291}