hyperstack_server/
runtime.rs

1use crate::bus::BusManager;
2use crate::cache::EntityCache;
3use crate::config::ServerConfig;
4use crate::health::HealthMonitor;
5use crate::http_health::HttpHealthServer;
6use crate::projector::Projector;
7use crate::view::ViewIndex;
8use crate::websocket::WebSocketServer;
9use crate::Spec;
10use anyhow::Result;
11use std::sync::Arc;
12use tokio::sync::mpsc;
13use tracing::{error, info};
14
15#[cfg(feature = "otel")]
16use crate::metrics::Metrics;
17
18/// Wait for shutdown signal (SIGINT on all platforms, SIGTERM on Unix)
19async fn shutdown_signal() {
20    let ctrl_c = async {
21        tokio::signal::ctrl_c()
22            .await
23            .expect("Failed to install Ctrl+C handler");
24    };
25
26    #[cfg(unix)]
27    let terminate = async {
28        tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
29            .expect("Failed to install SIGTERM handler")
30            .recv()
31            .await;
32    };
33
34    #[cfg(not(unix))]
35    let terminate = std::future::pending::<()>();
36
37    tokio::select! {
38        _ = ctrl_c => {
39            info!("Received SIGINT (Ctrl+C), initiating shutdown");
40        }
41        _ = terminate => {
42            info!("Received SIGTERM, initiating graceful shutdown");
43        }
44    }
45}
46
47/// Runtime orchestrator that manages all server components
48pub struct Runtime {
49    config: ServerConfig,
50    view_index: Arc<ViewIndex>,
51    spec: Option<Spec>,
52    #[cfg(feature = "otel")]
53    metrics: Option<Arc<Metrics>>,
54}
55
56impl Runtime {
57    #[cfg(feature = "otel")]
58    pub fn new(config: ServerConfig, view_index: ViewIndex, metrics: Option<Arc<Metrics>>) -> Self {
59        Self {
60            config,
61            view_index: Arc::new(view_index),
62            spec: None,
63            metrics,
64        }
65    }
66
67    #[cfg(not(feature = "otel"))]
68    pub fn new(config: ServerConfig, view_index: ViewIndex) -> Self {
69        Self {
70            config,
71            view_index: Arc::new(view_index),
72            spec: None,
73        }
74    }
75
76    pub fn with_spec(mut self, spec: Spec) -> Self {
77        self.spec = Some(spec);
78        self
79    }
80
81    pub async fn run(self) -> Result<()> {
82        info!("Starting HyperStack runtime");
83
84        // Create bounded mutations channel for Transform Library -> Projector communication
85        let (mutations_tx, mutations_rx) =
86            mpsc::channel::<smallvec::SmallVec<[hyperstack_interpreter::Mutation; 6]>>(1024);
87
88        let bus_manager = BusManager::new();
89        let entity_cache = EntityCache::new();
90
91        let health_monitor = if let Some(health_config) = &self.config.health {
92            let monitor = HealthMonitor::new(health_config.clone());
93            let _health_task = monitor.start().await;
94            info!("Health monitoring enabled");
95            Some(monitor)
96        } else {
97            None
98        };
99
100        #[cfg(feature = "otel")]
101        let projector = Projector::new(
102            self.view_index.clone(),
103            bus_manager.clone(),
104            entity_cache.clone(),
105            mutations_rx,
106            self.metrics.clone(),
107        );
108        #[cfg(not(feature = "otel"))]
109        let projector = Projector::new(
110            self.view_index.clone(),
111            bus_manager.clone(),
112            entity_cache.clone(),
113            mutations_rx,
114        );
115
116        let projector_handle = tokio::spawn(async move {
117            projector.run().await;
118        });
119
120        let ws_handle = if let Some(ws_config) = &self.config.websocket {
121            #[cfg(feature = "otel")]
122            let ws_server = WebSocketServer::new(
123                ws_config.bind_address,
124                bus_manager.clone(),
125                entity_cache.clone(),
126                self.view_index.clone(),
127                self.metrics.clone(),
128            );
129            #[cfg(not(feature = "otel"))]
130            let ws_server = WebSocketServer::new(
131                ws_config.bind_address,
132                bus_manager.clone(),
133                entity_cache.clone(),
134                self.view_index.clone(),
135            );
136
137            Some(tokio::spawn(async move {
138                if let Err(e) = ws_server.start().await {
139                    error!("WebSocket server error: {}", e);
140                }
141            }))
142        } else {
143            None
144        };
145
146        // Start Vixen parser/stream consumer (if spec with parser setup is provided)
147        let parser_handle = if let Some(spec) = self.spec {
148            if let Some(parser_setup) = spec.parser_setup {
149                info!(
150                    "Starting Vixen parser runtime for program: {}",
151                    spec.program_id
152                );
153                let tx = mutations_tx.clone();
154                let health = health_monitor.clone();
155                Some(tokio::spawn(async move {
156                    if let Err(e) = parser_setup(tx, health).await {
157                        error!("Vixen parser runtime error: {}", e);
158                    }
159                }))
160            } else {
161                info!("Spec provided but no parser_setup configured - skipping Vixen runtime");
162                None
163            }
164        } else {
165            info!("No spec provided - running in websocket-only mode");
166            None
167        };
168
169        // Start HTTP health server (if configured)
170        let http_health_handle = if let Some(http_health_config) = &self.config.http_health {
171            let mut http_server = HttpHealthServer::new(http_health_config.bind_address);
172            if let Some(monitor) = health_monitor.clone() {
173                http_server = http_server.with_health_monitor(monitor);
174            }
175
176            Some(tokio::spawn(async move {
177                if let Err(e) = http_server.start().await {
178                    error!("HTTP health server error: {}", e);
179                }
180            }))
181        } else {
182            None
183        };
184
185        info!("HyperStack runtime is running. Press Ctrl+C to stop.");
186
187        // Wait for any task to complete (or handle shutdown signals)
188        tokio::select! {
189            _ = async {
190                if let Some(handle) = ws_handle {
191                    handle.await
192                } else {
193                    std::future::pending().await
194                }
195            } => {
196                info!("WebSocket server task completed");
197            }
198            _ = projector_handle => {
199                info!("Projector task completed");
200            }
201            _ = async {
202                if let Some(handle) = parser_handle {
203                    handle.await
204                } else {
205                    std::future::pending().await
206                }
207            } => {
208                info!("Parser runtime task completed");
209            }
210            _ = async {
211                if let Some(handle) = http_health_handle {
212                    handle.await
213                } else {
214                    std::future::pending().await
215                }
216            } => {
217                info!("HTTP health server task completed");
218            }
219            _ = shutdown_signal() => {}
220        }
221
222        info!("Shutting down HyperStack runtime");
223        Ok(())
224    }
225}