hyperstack_server/
runtime.rs

1use crate::bus::BusManager;
2use crate::cache::EntityCache;
3use crate::config::ServerConfig;
4use crate::health::HealthMonitor;
5use crate::http_health::HttpHealthServer;
6use crate::projector::Projector;
7use crate::view::ViewIndex;
8use crate::websocket::WebSocketServer;
9use crate::Spec;
10use anyhow::Result;
11use std::sync::Arc;
12use tokio::sync::mpsc;
13use tracing::{error, info};
14
15#[cfg(feature = "otel")]
16use crate::metrics::Metrics;
17
18/// Wait for shutdown signal (SIGINT on all platforms, SIGTERM on Unix)
19async fn shutdown_signal() {
20    let ctrl_c = async {
21        tokio::signal::ctrl_c()
22            .await
23            .expect("Failed to install Ctrl+C handler");
24    };
25
26    #[cfg(unix)]
27    let terminate = async {
28        tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
29            .expect("Failed to install SIGTERM handler")
30            .recv()
31            .await;
32    };
33
34    #[cfg(not(unix))]
35    let terminate = std::future::pending::<()>();
36
37    tokio::select! {
38        _ = ctrl_c => {
39            info!("Received SIGINT (Ctrl+C), initiating shutdown");
40        }
41        _ = terminate => {
42            info!("Received SIGTERM, initiating graceful shutdown");
43        }
44    }
45}
46
47/// Runtime orchestrator that manages all server components
48pub struct Runtime {
49    config: ServerConfig,
50    view_index: Arc<ViewIndex>,
51    spec: Option<Spec>,
52    #[cfg(feature = "otel")]
53    metrics: Option<Arc<Metrics>>,
54}
55
56impl Runtime {
57    #[cfg(feature = "otel")]
58    pub fn new(config: ServerConfig, view_index: ViewIndex, metrics: Option<Arc<Metrics>>) -> Self {
59        Self {
60            config,
61            view_index: Arc::new(view_index),
62            spec: None,
63            metrics,
64        }
65    }
66
67    #[cfg(not(feature = "otel"))]
68    pub fn new(config: ServerConfig, view_index: ViewIndex) -> Self {
69        Self {
70            config,
71            view_index: Arc::new(view_index),
72            spec: None,
73        }
74    }
75
76    pub fn with_spec(mut self, spec: Spec) -> Self {
77        self.spec = Some(spec);
78        self
79    }
80
81    pub async fn run(self) -> Result<()> {
82        info!("Starting HyperStack runtime");
83
84        // Create bounded mutations channel for Transform Library -> Projector communication
85        let (mutations_tx, mutations_rx) =
86            mpsc::channel::<smallvec::SmallVec<[hyperstack_interpreter::Mutation; 6]>>(1024);
87
88        let bus_manager = BusManager::new();
89        let entity_cache = EntityCache::new();
90
91        let health_monitor = if let Some(health_config) = &self.config.health {
92            let monitor = HealthMonitor::new(health_config.clone());
93            let _health_task = monitor.start().await;
94            info!("Health monitoring enabled");
95            Some(monitor)
96        } else {
97            None
98        };
99
100        #[cfg(feature = "otel")]
101        let projector = Projector::new(
102            self.view_index.clone(),
103            bus_manager.clone(),
104            entity_cache.clone(),
105            mutations_rx,
106            self.metrics.clone(),
107        );
108        #[cfg(not(feature = "otel"))]
109        let projector = Projector::new(
110            self.view_index.clone(),
111            bus_manager.clone(),
112            entity_cache.clone(),
113            mutations_rx,
114        );
115
116        let projector_handle = tokio::spawn(async move {
117            projector.run().await;
118        });
119
120        let ws_handle = if let Some(ws_config) = &self.config.websocket {
121            #[cfg(feature = "otel")]
122            let ws_server = WebSocketServer::new(
123                ws_config.bind_address,
124                bus_manager.clone(),
125                entity_cache.clone(),
126                self.view_index.clone(),
127                self.metrics.clone(),
128            );
129            #[cfg(not(feature = "otel"))]
130            let ws_server = WebSocketServer::new(
131                ws_config.bind_address,
132                bus_manager.clone(),
133                entity_cache.clone(),
134                self.view_index.clone(),
135            );
136
137            Some(tokio::spawn(async move {
138                if let Err(e) = ws_server.start().await {
139                    error!("WebSocket server error: {}", e);
140                }
141            }))
142        } else {
143            None
144        };
145
146        // Start Vixen parser/stream consumer (if spec with parser setup is provided)
147        let parser_handle = if let Some(spec) = self.spec {
148            if let Some(parser_setup) = spec.parser_setup {
149                info!(
150                    "Starting Vixen parser runtime for program: {}",
151                    spec.program_id
152                );
153                let tx = mutations_tx.clone();
154                let health = health_monitor.clone();
155                let reconnection_config = self.config.reconnection.clone().unwrap_or_default();
156                Some(tokio::spawn(async move {
157                    if let Err(e) = parser_setup(tx, health, reconnection_config).await {
158                        error!("Vixen parser runtime error: {}", e);
159                    }
160                }))
161            } else {
162                info!("Spec provided but no parser_setup configured - skipping Vixen runtime");
163                None
164            }
165        } else {
166            info!("No spec provided - running in websocket-only mode");
167            None
168        };
169
170        // Start HTTP health server (if configured)
171        let http_health_handle = if let Some(http_health_config) = &self.config.http_health {
172            let mut http_server = HttpHealthServer::new(http_health_config.bind_address);
173            if let Some(monitor) = health_monitor.clone() {
174                http_server = http_server.with_health_monitor(monitor);
175            }
176
177            Some(tokio::spawn(async move {
178                if let Err(e) = http_server.start().await {
179                    error!("HTTP health server error: {}", e);
180                }
181            }))
182        } else {
183            None
184        };
185
186        info!("HyperStack runtime is running. Press Ctrl+C to stop.");
187
188        // Wait for any task to complete (or handle shutdown signals)
189        tokio::select! {
190            _ = async {
191                if let Some(handle) = ws_handle {
192                    handle.await
193                } else {
194                    std::future::pending().await
195                }
196            } => {
197                info!("WebSocket server task completed");
198            }
199            _ = projector_handle => {
200                info!("Projector task completed");
201            }
202            _ = async {
203                if let Some(handle) = parser_handle {
204                    handle.await
205                } else {
206                    std::future::pending().await
207                }
208            } => {
209                info!("Parser runtime task completed");
210            }
211            _ = async {
212                if let Some(handle) = http_health_handle {
213                    handle.await
214                } else {
215                    std::future::pending().await
216                }
217            } => {
218                info!("HTTP health server task completed");
219            }
220            _ = shutdown_signal() => {}
221        }
222
223        info!("Shutting down HyperStack runtime");
224        Ok(())
225    }
226}