hyperstack_server/
runtime.rs1use crate::bus::BusManager;
2use crate::cache::EntityCache;
3use crate::config::ServerConfig;
4use crate::health::HealthMonitor;
5use crate::http_health::HttpHealthServer;
6use crate::mutation_batch::MutationBatch;
7use crate::projector::Projector;
8use crate::view::ViewIndex;
9use crate::websocket::WebSocketServer;
10use crate::Spec;
11use anyhow::Result;
12use std::sync::Arc;
13use std::time::Duration;
14use tokio::sync::mpsc;
15use tracing::{error, info, info_span, Instrument};
16
17#[cfg(feature = "otel")]
18use crate::metrics::Metrics;
19
20async fn shutdown_signal() {
22 let ctrl_c = async {
23 tokio::signal::ctrl_c()
24 .await
25 .expect("Failed to install Ctrl+C handler");
26 };
27
28 #[cfg(unix)]
29 let terminate = async {
30 tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
31 .expect("Failed to install SIGTERM handler")
32 .recv()
33 .await;
34 };
35
36 #[cfg(not(unix))]
37 let terminate = std::future::pending::<()>();
38
39 tokio::select! {
40 _ = ctrl_c => {
41 info!("Received SIGINT (Ctrl+C), initiating shutdown");
42 }
43 _ = terminate => {
44 info!("Received SIGTERM, initiating graceful shutdown");
45 }
46 }
47}
48
49pub struct Runtime {
51 config: ServerConfig,
52 view_index: Arc<ViewIndex>,
53 spec: Option<Spec>,
54 #[cfg(feature = "otel")]
55 metrics: Option<Arc<Metrics>>,
56}
57
58impl Runtime {
59 #[cfg(feature = "otel")]
60 pub fn new(config: ServerConfig, view_index: ViewIndex, metrics: Option<Arc<Metrics>>) -> Self {
61 Self {
62 config,
63 view_index: Arc::new(view_index),
64 spec: None,
65 metrics,
66 }
67 }
68
69 #[cfg(not(feature = "otel"))]
70 pub fn new(config: ServerConfig, view_index: ViewIndex) -> Self {
71 Self {
72 config,
73 view_index: Arc::new(view_index),
74 spec: None,
75 }
76 }
77
78 pub fn with_spec(mut self, spec: Spec) -> Self {
79 self.spec = Some(spec);
80 self
81 }
82
83 pub async fn run(self) -> Result<()> {
84 info!("Starting HyperStack runtime");
85
86 let (mutations_tx, mutations_rx) = mpsc::channel::<MutationBatch>(1024);
87
88 let bus_manager = BusManager::new();
89 let entity_cache = EntityCache::new();
90
91 let health_monitor = if let Some(health_config) = &self.config.health {
92 let monitor = HealthMonitor::new(health_config.clone());
93 let _health_task = monitor.start().await;
94 info!("Health monitoring enabled");
95 Some(monitor)
96 } else {
97 None
98 };
99
100 #[cfg(feature = "otel")]
101 let projector = Projector::new(
102 self.view_index.clone(),
103 bus_manager.clone(),
104 entity_cache.clone(),
105 mutations_rx,
106 self.metrics.clone(),
107 );
108 #[cfg(not(feature = "otel"))]
109 let projector = Projector::new(
110 self.view_index.clone(),
111 bus_manager.clone(),
112 entity_cache.clone(),
113 mutations_rx,
114 );
115
116 let projector_handle = tokio::spawn(
117 async move {
118 projector.run().await;
119 }
120 .instrument(info_span!("projector")),
121 );
122
123 let ws_handle = if let Some(ws_config) = &self.config.websocket {
124 #[cfg(feature = "otel")]
125 let ws_server = WebSocketServer::new(
126 ws_config.bind_address,
127 bus_manager.clone(),
128 entity_cache.clone(),
129 self.view_index.clone(),
130 self.metrics.clone(),
131 );
132 #[cfg(not(feature = "otel"))]
133 let ws_server = WebSocketServer::new(
134 ws_config.bind_address,
135 bus_manager.clone(),
136 entity_cache.clone(),
137 self.view_index.clone(),
138 );
139
140 let bind_addr = ws_config.bind_address;
141 Some(tokio::spawn(
142 async move {
143 if let Err(e) = ws_server.start().await {
144 error!("WebSocket server error: {}", e);
145 }
146 }
147 .instrument(info_span!("ws.server", %bind_addr)),
148 ))
149 } else {
150 None
151 };
152
153 let parser_handle = if let Some(spec) = self.spec {
154 if let Some(parser_setup) = spec.parser_setup {
155 info!(
156 "Starting Vixen parser runtime for program: {}",
157 spec.program_id
158 );
159 let tx = mutations_tx.clone();
160 let health = health_monitor.clone();
161 let reconnection_config = self.config.reconnection.clone().unwrap_or_default();
162 let program_id = spec.program_id.clone();
163 Some(tokio::spawn(
164 async move {
165 if let Err(e) = parser_setup(tx, health, reconnection_config).await {
166 error!("Vixen parser runtime error: {}", e);
167 }
168 }
169 .instrument(info_span!("vixen.parser", %program_id)),
170 ))
171 } else {
172 info!("Spec provided but no parser_setup configured - skipping Vixen runtime");
173 None
174 }
175 } else {
176 info!("No spec provided - running in websocket-only mode");
177 None
178 };
179
180 let http_health_handle = if let Some(http_health_config) = &self.config.http_health {
181 let mut http_server = HttpHealthServer::new(http_health_config.bind_address);
182 if let Some(monitor) = health_monitor.clone() {
183 http_server = http_server.with_health_monitor(monitor);
184 }
185
186 let bind_addr = http_health_config.bind_address;
187 Some(tokio::spawn(
188 async move {
189 if let Err(e) = http_server.start().await {
190 error!("HTTP health server error: {}", e);
191 }
192 }
193 .instrument(info_span!("http.health", %bind_addr)),
194 ))
195 } else {
196 None
197 };
198
199 let bus_cleanup_handle = {
200 let bus = bus_manager.clone();
201 tokio::spawn(
202 async move {
203 let mut interval = tokio::time::interval(Duration::from_secs(60));
204 loop {
205 interval.tick().await;
206 let state_cleaned = bus.cleanup_stale_state_buses().await;
207 let list_cleaned = bus.cleanup_stale_list_buses().await;
208 if state_cleaned > 0 || list_cleaned > 0 {
209 let (state_count, list_count) = bus.bus_counts().await;
210 info!(
211 "Bus cleanup: removed {} state, {} list buses. Current: {} state, {} list",
212 state_cleaned, list_cleaned, state_count, list_count
213 );
214 }
215 }
216 }
217 .instrument(info_span!("bus.cleanup")),
218 )
219 };
220
221 let stats_handle = {
222 let bus = bus_manager.clone();
223 let cache = entity_cache.clone();
224 tokio::spawn(
225 async move {
226 let mut interval = tokio::time::interval(Duration::from_secs(30));
227 loop {
228 interval.tick().await;
229 let (_state_buses, _list_buses) = bus.bus_counts().await;
230 let _cache_stats = cache.stats().await;
231 }
232 }
233 .instrument(info_span!("stats.reporter")),
234 )
235 };
236
237 info!("HyperStack runtime is running. Press Ctrl+C to stop.");
238
239 tokio::select! {
241 _ = async {
242 if let Some(handle) = ws_handle {
243 handle.await
244 } else {
245 std::future::pending().await
246 }
247 } => {
248 info!("WebSocket server task completed");
249 }
250 _ = projector_handle => {
251 info!("Projector task completed");
252 }
253 _ = async {
254 if let Some(handle) = parser_handle {
255 handle.await
256 } else {
257 std::future::pending().await
258 }
259 } => {
260 info!("Parser runtime task completed");
261 }
262 _ = async {
263 if let Some(handle) = http_health_handle {
264 handle.await
265 } else {
266 std::future::pending().await
267 }
268 } => {
269 info!("HTTP health server task completed");
270 }
271 _ = bus_cleanup_handle => {
272 info!("Bus cleanup task completed");
273 }
274 _ = stats_handle => {
275 info!("Stats reporter task completed");
276 }
277 _ = shutdown_signal() => {}
278 }
279
280 info!("Shutting down HyperStack runtime");
281 Ok(())
282 }
283}