hyperstack_server/
lib.rs

1//! # hyperstack-server
2//!
3//! WebSocket server and projection handlers for HyperStack streaming pipelines.
4//!
5//! This crate provides a builder API for creating HyperStack servers that:
6//!
7//! - Process Solana blockchain data via Yellowstone gRPC
8//! - Transform data using the HyperStack VM
9//! - Stream entity updates over WebSockets to connected clients
10//! - Support multiple streaming modes (State, KV, List, Append)
11//!
12//! ## Quick Start
13//!
14//! ```rust,ignore
15//! use hyperstack_server::{Server, Spec};
16//!
17//! #[tokio::main]
18//! async fn main() -> anyhow::Result<()> {
19//!     Server::builder()
20//!         .spec(my_spec())
21//!         .websocket()
22//!         .bind("[::]:8877".parse()?)
23//!         .health_monitoring()
24//!         .start()
25//!         .await
26//! }
27//! ```
28//!
29//! ## Feature Flags
30//!
31//! - `otel` - OpenTelemetry integration for metrics and distributed tracing
32
33pub mod bus;
34pub mod config;
35pub mod health;
36pub mod http_health;
37#[cfg(feature = "otel")]
38pub mod metrics;
39pub mod projector;
40pub mod runtime;
41pub mod view;
42pub mod websocket;
43
44pub use bus::{BusManager, BusMessage};
45pub use config::{HealthConfig, HttpHealthConfig, ServerConfig, WebSocketConfig, YellowstoneConfig};
46pub use health::{HealthMonitor, StreamStatus};
47pub use http_health::HttpHealthServer;
48#[cfg(feature = "otel")]
49pub use metrics::Metrics;
50pub use projector::Projector;
51pub use runtime::Runtime;
52pub use view::{Delivery, Filters, Projection, ViewIndex, ViewSpec};
53pub use websocket::{ClientInfo, ClientManager, Frame, Mode, Subscription, WebSocketServer};
54
55use anyhow::Result;
56use std::net::SocketAddr;
57use std::sync::Arc;
58
59/// Type alias for a parser setup function
60/// This function receives a mutations sender and optional health monitor, then sets up the Vixen runtime
61pub type ParserSetupFn = Arc<
62    dyn Fn(
63        tokio::sync::mpsc::Sender<smallvec::SmallVec<[hyperstack_interpreter::Mutation; 6]>>,
64        Option<HealthMonitor>,
65    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send>> 
66        + Send 
67        + Sync
68>;
69
70/// Specification for a HyperStack server
71/// Contains bytecode, parsers, and program information
72pub struct Spec {
73    pub bytecode: hyperstack_interpreter::compiler::MultiEntityBytecode,
74    pub program_id: String,
75    pub parser_setup: Option<ParserSetupFn>,
76}
77
78impl Spec {
79    pub fn new(bytecode: hyperstack_interpreter::compiler::MultiEntityBytecode, program_id: impl Into<String>) -> Self {
80        Self {
81            bytecode,
82            program_id: program_id.into(),
83            parser_setup: None,
84        }
85    }
86
87    /// Add a parser setup function that will configure Vixen parsers
88    pub fn with_parser_setup(mut self, setup_fn: ParserSetupFn) -> Self {
89        self.parser_setup = Some(setup_fn);
90        self
91    }
92}
93
94/// Main server interface with fluent builder API
95pub struct Server;
96
97impl Server {
98    /// Create a new server builder
99    pub fn builder() -> ServerBuilder {
100        ServerBuilder::new()
101    }
102}
103
104/// Builder for configuring and creating a HyperStack server
105pub struct ServerBuilder {
106    spec: Option<Spec>,
107    views: Option<ViewIndex>,
108    config: ServerConfig,
109    #[cfg(feature = "otel")]
110    metrics: Option<Arc<Metrics>>,
111}
112
113impl ServerBuilder {
114    fn new() -> Self {
115        Self {
116            spec: None,
117            views: None,
118            config: ServerConfig::new(),
119            #[cfg(feature = "otel")]
120            metrics: None,
121        }
122    }
123
124    /// Set the specification (bytecode, parsers, program_id)
125    pub fn spec(mut self, spec: Spec) -> Self {
126        self.spec = Some(spec);
127        self
128    }
129
130    /// Set custom view index
131    pub fn views(mut self, views: ViewIndex) -> Self {
132        self.views = Some(views);
133        self
134    }
135
136    /// Enable metrics collection (requires 'otel' feature)
137    #[cfg(feature = "otel")]
138    pub fn metrics(mut self, metrics: Metrics) -> Self {
139        self.metrics = Some(Arc::new(metrics));
140        self
141    }
142
143    /// Enable WebSocket server with default configuration
144    pub fn websocket(mut self) -> Self {
145        self.config.websocket = Some(WebSocketConfig::default());
146        self
147    }
148
149    /// Configure WebSocket server
150    pub fn websocket_config(mut self, config: WebSocketConfig) -> Self {
151        self.config.websocket = Some(config);
152        self
153    }
154
155    /// Set the bind address for WebSocket server
156    pub fn bind(mut self, addr: impl Into<SocketAddr>) -> Self {
157        if let Some(ws_config) = &mut self.config.websocket {
158            ws_config.bind_address = addr.into();
159        } else {
160            self.config.websocket = Some(WebSocketConfig::new(addr.into()));
161        }
162        self
163    }
164
165    /// Configure Yellowstone gRPC connection
166    pub fn yellowstone(mut self, config: YellowstoneConfig) -> Self {
167        self.config.yellowstone = Some(config);
168        self
169    }
170
171    /// Enable health monitoring with default configuration
172    pub fn health_monitoring(mut self) -> Self {
173        self.config.health = Some(HealthConfig::default());
174        self
175    }
176
177    /// Configure health monitoring
178    pub fn health_config(mut self, config: HealthConfig) -> Self {
179        self.config.health = Some(config);
180        self
181    }
182
183    /// Enable HTTP health server with default configuration (port 8081)
184    pub fn http_health(mut self) -> Self {
185        self.config.http_health = Some(HttpHealthConfig::default());
186        self
187    }
188
189    /// Configure HTTP health server
190    pub fn http_health_config(mut self, config: HttpHealthConfig) -> Self {
191        self.config.http_health = Some(config);
192        self
193    }
194
195    /// Set the bind address for HTTP health server
196    pub fn health_bind(mut self, addr: impl Into<SocketAddr>) -> Self {
197        if let Some(http_config) = &mut self.config.http_health {
198            http_config.bind_address = addr.into();
199        } else {
200            self.config.http_health = Some(HttpHealthConfig::new(addr.into()));
201        }
202        self
203    }
204
205    /// Start the server (consumes the builder)
206    pub async fn start(self) -> Result<()> {
207        // Use provided views or create default views from spec
208        let view_index = self.views.unwrap_or_else(|| {
209            let mut index = ViewIndex::new();
210            
211            // Auto-register default views for each entity in the spec
212            if let Some(ref spec) = self.spec {
213                for entity_name in spec.bytecode.entities.keys() {
214                    // Register kv view (key-value lookups)
215                    index.add_spec(ViewSpec {
216                        id: format!("{}/kv", entity_name),
217                        export: entity_name.clone(),
218                        mode: Mode::Kv,
219                        projection: Projection::all(),
220                        filters: Filters::all(),
221                        delivery: Delivery::default(),
222                    });
223                    
224                    // Register list view (all entities)
225                    index.add_spec(ViewSpec {
226                        id: format!("{}/list", entity_name),
227                        export: entity_name.clone(),
228                        mode: Mode::List,
229                        projection: Projection::all(),
230                        filters: Filters::all(),
231                        delivery: Delivery::default(),
232                    });
233                    
234                    // Register state view (single shared state)
235                    index.add_spec(ViewSpec {
236                        id: format!("{}/state", entity_name),
237                        export: entity_name.clone(),
238                        mode: Mode::State,
239                        projection: Projection::all(),
240                        filters: Filters::all(),
241                        delivery: Delivery::default(),
242                    });
243                    
244                    // Register append view (append-only log)
245                    index.add_spec(ViewSpec {
246                        id: format!("{}/append", entity_name),
247                        export: entity_name.clone(),
248                        mode: Mode::Append,
249                        projection: Projection::all(),
250                        filters: Filters::all(),
251                        delivery: Delivery::default(),
252                    });
253                    
254                    tracing::info!("Registered views for entity: {}", entity_name);
255                }
256            }
257            
258            index
259        });
260
261        // Create runtime
262        #[cfg(feature = "otel")]
263        let mut runtime = Runtime::new(self.config, view_index, self.metrics);
264        #[cfg(not(feature = "otel"))]
265        let mut runtime = Runtime::new(self.config, view_index);
266        
267        // Add spec if provided
268        if let Some(spec) = self.spec {
269            runtime = runtime.with_spec(spec);
270        }
271
272        // Run the server
273        runtime.run().await
274    }
275
276    /// Build the runtime without starting it
277    pub fn build(self) -> Result<Runtime> {
278        // Use provided views or create default views from spec
279        let view_index = self.views.unwrap_or_else(|| {
280            let mut index = ViewIndex::new();
281            
282            // Auto-register default views for each entity in the spec
283            if let Some(ref spec) = self.spec {
284                for entity_name in spec.bytecode.entities.keys() {
285                    // Register kv view
286                    index.add_spec(ViewSpec {
287                        id: format!("{}/kv", entity_name),
288                        export: entity_name.clone(),
289                        mode: Mode::Kv,
290                        projection: Projection::all(),
291                        filters: Filters::all(),
292                        delivery: Delivery::default(),
293                    });
294                    
295                    // Register list view
296                    index.add_spec(ViewSpec {
297                        id: format!("{}/list", entity_name),
298                        export: entity_name.clone(),
299                        mode: Mode::List,
300                        projection: Projection::all(),
301                        filters: Filters::all(),
302                        delivery: Delivery::default(),
303                    });
304                    
305                    // Register state view
306                    index.add_spec(ViewSpec {
307                        id: format!("{}/state", entity_name),
308                        export: entity_name.clone(),
309                        mode: Mode::State,
310                        projection: Projection::all(),
311                        filters: Filters::all(),
312                        delivery: Delivery::default(),
313                    });
314                    
315                    // Register append view
316                    index.add_spec(ViewSpec {
317                        id: format!("{}/append", entity_name),
318                        export: entity_name.clone(),
319                        mode: Mode::Append,
320                        projection: Projection::all(),
321                        filters: Filters::all(),
322                        delivery: Delivery::default(),
323                    });
324                }
325            }
326            
327            index
328        });
329        
330        #[cfg(feature = "otel")]
331        let mut runtime = Runtime::new(self.config, view_index, self.metrics);
332        #[cfg(not(feature = "otel"))]
333        let mut runtime = Runtime::new(self.config, view_index);
334        
335        if let Some(spec) = self.spec {
336            runtime = runtime.with_spec(spec);
337        }
338        Ok(runtime)
339    }
340}
341
342#[cfg(test)]
343mod tests {
344    use super::*;
345
346    #[test]
347    fn test_builder_pattern() {
348        let _builder = Server::builder()
349            .websocket()
350            .bind("[::]:8877".parse::<SocketAddr>().unwrap());
351    }
352
353    #[test]
354    fn test_spec_creation() {
355        let bytecode = hyperstack_interpreter::compiler::MultiEntityBytecode::new().build();
356        let spec = Spec::new(bytecode, "test_program");
357        assert_eq!(spec.program_id, "test_program");
358    }
359}