hyperstack_server/
lib.rs

1//! # hyperstack-server
2//!
3//! WebSocket server and projection handlers for HyperStack streaming pipelines.
4//!
5//! This crate provides a builder API for creating HyperStack servers that:
6//!
7//! - Process Solana blockchain data via Yellowstone gRPC
8//! - Transform data using the HyperStack VM
9//! - Stream entity updates over WebSockets to connected clients
10//! - Support multiple streaming modes (State, List, Append)
11//!
12//! ## Quick Start
13//!
14//! ```rust,ignore
15//! use hyperstack_server::{Server, Spec};
16//!
17//! #[tokio::main]
18//! async fn main() -> anyhow::Result<()> {
19//!     Server::builder()
20//!         .spec(my_spec())
21//!         .websocket()
22//!         .bind("[::]:8877".parse()?)
23//!         .health_monitoring()
24//!         .start()
25//!         .await
26//! }
27//! ```
28//!
29//! ## Feature Flags
30//!
31//! - `otel` - OpenTelemetry integration for metrics and distributed tracing
32
33pub mod bus;
34pub mod cache;
35pub mod config;
36pub mod health;
37pub mod http_health;
38#[cfg(feature = "otel")]
39pub mod metrics;
40pub mod projector;
41pub mod runtime;
42pub mod view;
43pub mod websocket;
44
45pub use bus::{BusManager, BusMessage};
46pub use cache::{EntityCache, EntityCacheConfig};
47pub use config::{
48    HealthConfig, HttpHealthConfig, ReconnectionConfig, ServerConfig, WebSocketConfig,
49    YellowstoneConfig,
50};
51pub use health::{HealthMonitor, SlotTracker, StreamStatus};
52pub use http_health::HttpHealthServer;
53#[cfg(feature = "otel")]
54pub use metrics::Metrics;
55pub use projector::Projector;
56pub use runtime::Runtime;
57pub use view::{Delivery, Filters, Projection, ViewIndex, ViewSpec};
58pub use websocket::{ClientInfo, ClientManager, Frame, Mode, Subscription, WebSocketServer};
59
60use anyhow::Result;
61use std::net::SocketAddr;
62use std::sync::Arc;
63
64/// Type alias for a parser setup function.
65/// Receives mutations sender, optional health monitor, and reconnection config.
66pub type ParserSetupFn = Arc<
67    dyn Fn(
68            tokio::sync::mpsc::Sender<smallvec::SmallVec<[hyperstack_interpreter::Mutation; 6]>>,
69            Option<HealthMonitor>,
70            ReconnectionConfig,
71        ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send>>
72        + Send
73        + Sync,
74>;
75
76/// Specification for a HyperStack server
77/// Contains bytecode, parsers, and program information
78pub struct Spec {
79    pub bytecode: hyperstack_interpreter::compiler::MultiEntityBytecode,
80    pub program_id: String,
81    pub parser_setup: Option<ParserSetupFn>,
82}
83
84impl Spec {
85    pub fn new(
86        bytecode: hyperstack_interpreter::compiler::MultiEntityBytecode,
87        program_id: impl Into<String>,
88    ) -> Self {
89        Self {
90            bytecode,
91            program_id: program_id.into(),
92            parser_setup: None,
93        }
94    }
95
96    /// Add a parser setup function that will configure Vixen parsers
97    pub fn with_parser_setup(mut self, setup_fn: ParserSetupFn) -> Self {
98        self.parser_setup = Some(setup_fn);
99        self
100    }
101}
102
103/// Main server interface with fluent builder API
104pub struct Server;
105
106impl Server {
107    /// Create a new server builder
108    pub fn builder() -> ServerBuilder {
109        ServerBuilder::new()
110    }
111}
112
113/// Builder for configuring and creating a HyperStack server
114pub struct ServerBuilder {
115    spec: Option<Spec>,
116    views: Option<ViewIndex>,
117    config: ServerConfig,
118    #[cfg(feature = "otel")]
119    metrics: Option<Arc<Metrics>>,
120}
121
122impl ServerBuilder {
123    fn new() -> Self {
124        Self {
125            spec: None,
126            views: None,
127            config: ServerConfig::new(),
128            #[cfg(feature = "otel")]
129            metrics: None,
130        }
131    }
132
133    /// Set the specification (bytecode, parsers, program_id)
134    pub fn spec(mut self, spec: Spec) -> Self {
135        self.spec = Some(spec);
136        self
137    }
138
139    /// Set custom view index
140    pub fn views(mut self, views: ViewIndex) -> Self {
141        self.views = Some(views);
142        self
143    }
144
145    /// Enable metrics collection (requires 'otel' feature)
146    #[cfg(feature = "otel")]
147    pub fn metrics(mut self, metrics: Metrics) -> Self {
148        self.metrics = Some(Arc::new(metrics));
149        self
150    }
151
152    /// Enable WebSocket server with default configuration
153    pub fn websocket(mut self) -> Self {
154        self.config.websocket = Some(WebSocketConfig::default());
155        self
156    }
157
158    /// Configure WebSocket server
159    pub fn websocket_config(mut self, config: WebSocketConfig) -> Self {
160        self.config.websocket = Some(config);
161        self
162    }
163
164    /// Set the bind address for WebSocket server
165    pub fn bind(mut self, addr: impl Into<SocketAddr>) -> Self {
166        if let Some(ws_config) = &mut self.config.websocket {
167            ws_config.bind_address = addr.into();
168        } else {
169            self.config.websocket = Some(WebSocketConfig::new(addr.into()));
170        }
171        self
172    }
173
174    /// Configure Yellowstone gRPC connection
175    pub fn yellowstone(mut self, config: YellowstoneConfig) -> Self {
176        self.config.yellowstone = Some(config);
177        self
178    }
179
180    /// Enable health monitoring with default configuration
181    pub fn health_monitoring(mut self) -> Self {
182        self.config.health = Some(HealthConfig::default());
183        self
184    }
185
186    /// Configure health monitoring
187    pub fn health_config(mut self, config: HealthConfig) -> Self {
188        self.config.health = Some(config);
189        self
190    }
191
192    /// Enable reconnection with default configuration
193    pub fn reconnection(mut self) -> Self {
194        self.config.reconnection = Some(ReconnectionConfig::default());
195        self
196    }
197
198    /// Configure reconnection behavior
199    pub fn reconnection_config(mut self, config: ReconnectionConfig) -> Self {
200        self.config.reconnection = Some(config);
201        self
202    }
203
204    /// Enable HTTP health server with default configuration (port 8081)
205    pub fn http_health(mut self) -> Self {
206        self.config.http_health = Some(HttpHealthConfig::default());
207        self
208    }
209
210    /// Configure HTTP health server
211    pub fn http_health_config(mut self, config: HttpHealthConfig) -> Self {
212        self.config.http_health = Some(config);
213        self
214    }
215
216    /// Set the bind address for HTTP health server
217    pub fn health_bind(mut self, addr: impl Into<SocketAddr>) -> Self {
218        if let Some(http_config) = &mut self.config.http_health {
219            http_config.bind_address = addr.into();
220        } else {
221            self.config.http_health = Some(HttpHealthConfig::new(addr.into()));
222        }
223        self
224    }
225
226    /// Start the server (consumes the builder)
227    pub async fn start(self) -> Result<()> {
228        // Use provided views or create default views from spec
229        let view_index = self.views.unwrap_or_else(|| {
230            let mut index = ViewIndex::new();
231
232            if let Some(ref spec) = self.spec {
233                for entity_name in spec.bytecode.entities.keys() {
234                    index.add_spec(ViewSpec {
235                        id: format!("{}/list", entity_name),
236                        export: entity_name.clone(),
237                        mode: Mode::List,
238                        projection: Projection::all(),
239                        filters: Filters::all(),
240                        delivery: Delivery::default(),
241                    });
242
243                    index.add_spec(ViewSpec {
244                        id: format!("{}/state", entity_name),
245                        export: entity_name.clone(),
246                        mode: Mode::State,
247                        projection: Projection::all(),
248                        filters: Filters::all(),
249                        delivery: Delivery::default(),
250                    });
251
252                    index.add_spec(ViewSpec {
253                        id: format!("{}/append", entity_name),
254                        export: entity_name.clone(),
255                        mode: Mode::Append,
256                        projection: Projection::all(),
257                        filters: Filters::all(),
258                        delivery: Delivery::default(),
259                    });
260
261                    tracing::info!("Registered views for entity: {}", entity_name);
262                }
263            }
264
265            index
266        });
267
268        // Create runtime
269        #[cfg(feature = "otel")]
270        let mut runtime = Runtime::new(self.config, view_index, self.metrics);
271        #[cfg(not(feature = "otel"))]
272        let mut runtime = Runtime::new(self.config, view_index);
273
274        // Add spec if provided
275        if let Some(spec) = self.spec {
276            runtime = runtime.with_spec(spec);
277        }
278
279        // Run the server
280        runtime.run().await
281    }
282
283    /// Build the runtime without starting it
284    pub fn build(self) -> Result<Runtime> {
285        // Use provided views or create default views from spec
286        let view_index = self.views.unwrap_or_else(|| {
287            let mut index = ViewIndex::new();
288
289            if let Some(ref spec) = self.spec {
290                for entity_name in spec.bytecode.entities.keys() {
291                    index.add_spec(ViewSpec {
292                        id: format!("{}/list", entity_name),
293                        export: entity_name.clone(),
294                        mode: Mode::List,
295                        projection: Projection::all(),
296                        filters: Filters::all(),
297                        delivery: Delivery::default(),
298                    });
299
300                    index.add_spec(ViewSpec {
301                        id: format!("{}/state", entity_name),
302                        export: entity_name.clone(),
303                        mode: Mode::State,
304                        projection: Projection::all(),
305                        filters: Filters::all(),
306                        delivery: Delivery::default(),
307                    });
308
309                    index.add_spec(ViewSpec {
310                        id: format!("{}/append", entity_name),
311                        export: entity_name.clone(),
312                        mode: Mode::Append,
313                        projection: Projection::all(),
314                        filters: Filters::all(),
315                        delivery: Delivery::default(),
316                    });
317                }
318            }
319
320            index
321        });
322
323        #[cfg(feature = "otel")]
324        let mut runtime = Runtime::new(self.config, view_index, self.metrics);
325        #[cfg(not(feature = "otel"))]
326        let mut runtime = Runtime::new(self.config, view_index);
327
328        if let Some(spec) = self.spec {
329            runtime = runtime.with_spec(spec);
330        }
331        Ok(runtime)
332    }
333}
334
335#[cfg(test)]
336mod tests {
337    use super::*;
338
339    #[test]
340    fn test_builder_pattern() {
341        let _builder = Server::builder()
342            .websocket()
343            .bind("[::]:8877".parse::<SocketAddr>().unwrap());
344    }
345
346    #[test]
347    fn test_spec_creation() {
348        let bytecode = hyperstack_interpreter::compiler::MultiEntityBytecode::new().build();
349        let spec = Spec::new(bytecode, "test_program");
350        assert_eq!(spec.program_id, "test_program");
351    }
352}