hyperstack_server/
runtime.rs1use crate::bus::BusManager;
2use crate::cache::EntityCache;
3use crate::config::ServerConfig;
4use crate::health::HealthMonitor;
5use crate::http_health::HttpHealthServer;
6use crate::projector::Projector;
7use crate::view::ViewIndex;
8use crate::websocket::WebSocketServer;
9use crate::Spec;
10use anyhow::Result;
11use std::sync::Arc;
12use tokio::sync::mpsc;
13use tracing::{error, info};
14
15#[cfg(feature = "otel")]
16use crate::metrics::Metrics;
17
18async fn shutdown_signal() {
20 let ctrl_c = async {
21 tokio::signal::ctrl_c()
22 .await
23 .expect("Failed to install Ctrl+C handler");
24 };
25
26 #[cfg(unix)]
27 let terminate = async {
28 tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
29 .expect("Failed to install SIGTERM handler")
30 .recv()
31 .await;
32 };
33
34 #[cfg(not(unix))]
35 let terminate = std::future::pending::<()>();
36
37 tokio::select! {
38 _ = ctrl_c => {
39 info!("Received SIGINT (Ctrl+C), initiating shutdown");
40 }
41 _ = terminate => {
42 info!("Received SIGTERM, initiating graceful shutdown");
43 }
44 }
45}
46
/// Top-level HyperStack server runtime.
///
/// Holds the configuration and shared state that [`Runtime::run`] uses to
/// start the projector and the optional WebSocket, parser, and HTTP health
/// tasks.
pub struct Runtime {
    /// Server configuration; `run` reads its `health`, `websocket`,
    /// `reconnection`, and `http_health` settings.
    config: ServerConfig,
    /// View index, shared via `Arc` with the projector and the WebSocket server.
    view_index: Arc<ViewIndex>,
    /// Optional spec; `None` until set through `with_spec`.
    spec: Option<Spec>,
    /// Optional metrics handle; the field only exists when the `otel` feature is enabled.
    #[cfg(feature = "otel")]
    metrics: Option<Arc<Metrics>>,
}
55
56impl Runtime {
57 #[cfg(feature = "otel")]
58 pub fn new(config: ServerConfig, view_index: ViewIndex, metrics: Option<Arc<Metrics>>) -> Self {
59 Self {
60 config,
61 view_index: Arc::new(view_index),
62 spec: None,
63 metrics,
64 }
65 }
66
67 #[cfg(not(feature = "otel"))]
68 pub fn new(config: ServerConfig, view_index: ViewIndex) -> Self {
69 Self {
70 config,
71 view_index: Arc::new(view_index),
72 spec: None,
73 }
74 }
75
76 pub fn with_spec(mut self, spec: Spec) -> Self {
77 self.spec = Some(spec);
78 self
79 }
80
81 pub async fn run(self) -> Result<()> {
82 info!("Starting HyperStack runtime");
83
84 let (mutations_tx, mutations_rx) =
86 mpsc::channel::<smallvec::SmallVec<[hyperstack_interpreter::Mutation; 6]>>(1024);
87
88 let bus_manager = BusManager::new();
89 let entity_cache = EntityCache::new();
90
91 let health_monitor = if let Some(health_config) = &self.config.health {
92 let monitor = HealthMonitor::new(health_config.clone());
93 let _health_task = monitor.start().await;
94 info!("Health monitoring enabled");
95 Some(monitor)
96 } else {
97 None
98 };
99
100 #[cfg(feature = "otel")]
101 let projector = Projector::new(
102 self.view_index.clone(),
103 bus_manager.clone(),
104 entity_cache.clone(),
105 mutations_rx,
106 self.metrics.clone(),
107 );
108 #[cfg(not(feature = "otel"))]
109 let projector = Projector::new(
110 self.view_index.clone(),
111 bus_manager.clone(),
112 entity_cache.clone(),
113 mutations_rx,
114 );
115
116 let projector_handle = tokio::spawn(async move {
117 projector.run().await;
118 });
119
120 let ws_handle = if let Some(ws_config) = &self.config.websocket {
121 #[cfg(feature = "otel")]
122 let ws_server = WebSocketServer::new(
123 ws_config.bind_address,
124 bus_manager.clone(),
125 entity_cache.clone(),
126 self.view_index.clone(),
127 self.metrics.clone(),
128 );
129 #[cfg(not(feature = "otel"))]
130 let ws_server = WebSocketServer::new(
131 ws_config.bind_address,
132 bus_manager.clone(),
133 entity_cache.clone(),
134 self.view_index.clone(),
135 );
136
137 Some(tokio::spawn(async move {
138 if let Err(e) = ws_server.start().await {
139 error!("WebSocket server error: {}", e);
140 }
141 }))
142 } else {
143 None
144 };
145
146 let parser_handle = if let Some(spec) = self.spec {
148 if let Some(parser_setup) = spec.parser_setup {
149 info!(
150 "Starting Vixen parser runtime for program: {}",
151 spec.program_id
152 );
153 let tx = mutations_tx.clone();
154 let health = health_monitor.clone();
155 let reconnection_config = self.config.reconnection.clone().unwrap_or_default();
156 Some(tokio::spawn(async move {
157 if let Err(e) = parser_setup(tx, health, reconnection_config).await {
158 error!("Vixen parser runtime error: {}", e);
159 }
160 }))
161 } else {
162 info!("Spec provided but no parser_setup configured - skipping Vixen runtime");
163 None
164 }
165 } else {
166 info!("No spec provided - running in websocket-only mode");
167 None
168 };
169
170 let http_health_handle = if let Some(http_health_config) = &self.config.http_health {
172 let mut http_server = HttpHealthServer::new(http_health_config.bind_address);
173 if let Some(monitor) = health_monitor.clone() {
174 http_server = http_server.with_health_monitor(monitor);
175 }
176
177 Some(tokio::spawn(async move {
178 if let Err(e) = http_server.start().await {
179 error!("HTTP health server error: {}", e);
180 }
181 }))
182 } else {
183 None
184 };
185
186 info!("HyperStack runtime is running. Press Ctrl+C to stop.");
187
188 tokio::select! {
190 _ = async {
191 if let Some(handle) = ws_handle {
192 handle.await
193 } else {
194 std::future::pending().await
195 }
196 } => {
197 info!("WebSocket server task completed");
198 }
199 _ = projector_handle => {
200 info!("Projector task completed");
201 }
202 _ = async {
203 if let Some(handle) = parser_handle {
204 handle.await
205 } else {
206 std::future::pending().await
207 }
208 } => {
209 info!("Parser runtime task completed");
210 }
211 _ = async {
212 if let Some(handle) = http_health_handle {
213 handle.await
214 } else {
215 std::future::pending().await
216 }
217 } => {
218 info!("HTTP health server task completed");
219 }
220 _ = shutdown_signal() => {}
221 }
222
223 info!("Shutting down HyperStack runtime");
224 Ok(())
225 }
226}