hyperstack_server/
runtime.rs1use crate::bus::BusManager;
2use crate::cache::EntityCache;
3use crate::config::ServerConfig;
4use crate::health::HealthMonitor;
5use crate::http_health::HttpHealthServer;
6use crate::projector::Projector;
7use crate::view::ViewIndex;
8use crate::websocket::WebSocketServer;
9use crate::Spec;
10use anyhow::Result;
11use std::sync::Arc;
12use tokio::sync::mpsc;
13use tracing::{error, info};
14
15#[cfg(feature = "otel")]
16use crate::metrics::Metrics;
17
18async fn shutdown_signal() {
20 let ctrl_c = async {
21 tokio::signal::ctrl_c()
22 .await
23 .expect("Failed to install Ctrl+C handler");
24 };
25
26 #[cfg(unix)]
27 let terminate = async {
28 tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
29 .expect("Failed to install SIGTERM handler")
30 .recv()
31 .await;
32 };
33
34 #[cfg(not(unix))]
35 let terminate = std::future::pending::<()>();
36
37 tokio::select! {
38 _ = ctrl_c => {
39 info!("Received SIGINT (Ctrl+C), initiating shutdown");
40 }
41 _ = terminate => {
42 info!("Received SIGTERM, initiating graceful shutdown");
43 }
44 }
45}
46
47pub struct Runtime {
49 config: ServerConfig,
50 view_index: Arc<ViewIndex>,
51 spec: Option<Spec>,
52 #[cfg(feature = "otel")]
53 metrics: Option<Arc<Metrics>>,
54}
55
56impl Runtime {
57 #[cfg(feature = "otel")]
58 pub fn new(config: ServerConfig, view_index: ViewIndex, metrics: Option<Arc<Metrics>>) -> Self {
59 Self {
60 config,
61 view_index: Arc::new(view_index),
62 spec: None,
63 metrics,
64 }
65 }
66
67 #[cfg(not(feature = "otel"))]
68 pub fn new(config: ServerConfig, view_index: ViewIndex) -> Self {
69 Self {
70 config,
71 view_index: Arc::new(view_index),
72 spec: None,
73 }
74 }
75
76 pub fn with_spec(mut self, spec: Spec) -> Self {
77 self.spec = Some(spec);
78 self
79 }
80
81 pub async fn run(self) -> Result<()> {
82 info!("Starting HyperStack runtime");
83
84 let (mutations_tx, mutations_rx) =
86 mpsc::channel::<smallvec::SmallVec<[hyperstack_interpreter::Mutation; 6]>>(1024);
87
88 let bus_manager = BusManager::new();
89 let entity_cache = EntityCache::new();
90
91 let health_monitor = if let Some(health_config) = &self.config.health {
92 let monitor = HealthMonitor::new(health_config.clone());
93 let _health_task = monitor.start().await;
94 info!("Health monitoring enabled");
95 Some(monitor)
96 } else {
97 None
98 };
99
100 #[cfg(feature = "otel")]
101 let projector = Projector::new(
102 self.view_index.clone(),
103 bus_manager.clone(),
104 entity_cache.clone(),
105 mutations_rx,
106 self.metrics.clone(),
107 );
108 #[cfg(not(feature = "otel"))]
109 let projector = Projector::new(
110 self.view_index.clone(),
111 bus_manager.clone(),
112 entity_cache.clone(),
113 mutations_rx,
114 );
115
116 let projector_handle = tokio::spawn(async move {
117 projector.run().await;
118 });
119
120 let ws_handle = if let Some(ws_config) = &self.config.websocket {
121 #[cfg(feature = "otel")]
122 let ws_server = WebSocketServer::new(
123 ws_config.bind_address,
124 bus_manager.clone(),
125 entity_cache.clone(),
126 self.view_index.clone(),
127 self.metrics.clone(),
128 );
129 #[cfg(not(feature = "otel"))]
130 let ws_server = WebSocketServer::new(
131 ws_config.bind_address,
132 bus_manager.clone(),
133 entity_cache.clone(),
134 self.view_index.clone(),
135 );
136
137 Some(tokio::spawn(async move {
138 if let Err(e) = ws_server.start().await {
139 error!("WebSocket server error: {}", e);
140 }
141 }))
142 } else {
143 None
144 };
145
146 let parser_handle = if let Some(spec) = self.spec {
148 if let Some(parser_setup) = spec.parser_setup {
149 info!(
150 "Starting Vixen parser runtime for program: {}",
151 spec.program_id
152 );
153 let tx = mutations_tx.clone();
154 let health = health_monitor.clone();
155 Some(tokio::spawn(async move {
156 if let Err(e) = parser_setup(tx, health).await {
157 error!("Vixen parser runtime error: {}", e);
158 }
159 }))
160 } else {
161 info!("Spec provided but no parser_setup configured - skipping Vixen runtime");
162 None
163 }
164 } else {
165 info!("No spec provided - running in websocket-only mode");
166 None
167 };
168
169 let http_health_handle = if let Some(http_health_config) = &self.config.http_health {
171 let mut http_server = HttpHealthServer::new(http_health_config.bind_address);
172 if let Some(monitor) = health_monitor.clone() {
173 http_server = http_server.with_health_monitor(monitor);
174 }
175
176 Some(tokio::spawn(async move {
177 if let Err(e) = http_server.start().await {
178 error!("HTTP health server error: {}", e);
179 }
180 }))
181 } else {
182 None
183 };
184
185 info!("HyperStack runtime is running. Press Ctrl+C to stop.");
186
187 tokio::select! {
189 _ = async {
190 if let Some(handle) = ws_handle {
191 handle.await
192 } else {
193 std::future::pending().await
194 }
195 } => {
196 info!("WebSocket server task completed");
197 }
198 _ = projector_handle => {
199 info!("Projector task completed");
200 }
201 _ = async {
202 if let Some(handle) = parser_handle {
203 handle.await
204 } else {
205 std::future::pending().await
206 }
207 } => {
208 info!("Parser runtime task completed");
209 }
210 _ = async {
211 if let Some(handle) = http_health_handle {
212 handle.await
213 } else {
214 std::future::pending().await
215 }
216 } => {
217 info!("HTTP health server task completed");
218 }
219 _ = shutdown_signal() => {}
220 }
221
222 info!("Shutting down HyperStack runtime");
223 Ok(())
224 }
225}