hyperstack_server/
lib.rs

1//! # hyperstack-server
2//!
3//! WebSocket server and projection handlers for HyperStack streaming pipelines.
4//!
5//! This crate provides a builder API for creating HyperStack servers that:
6//!
7//! - Process Solana blockchain data via Yellowstone gRPC
8//! - Transform data using the HyperStack VM
9//! - Stream entity updates over WebSockets to connected clients
10//! - Support multiple streaming modes (State, KV, List, Append)
11//!
12//! ## Quick Start
13//!
14//! ```rust,ignore
15//! use hyperstack_server::{Server, Spec};
16//!
17//! #[tokio::main]
18//! async fn main() -> anyhow::Result<()> {
19//!     Server::builder()
20//!         .spec(my_spec())
21//!         .websocket()
22//!         .bind("[::]:8877".parse()?)
23//!         .health_monitoring()
24//!         .start()
25//!         .await
26//! }
27//! ```
28//!
29//! ## Feature Flags
30//!
31//! - `otel` - OpenTelemetry integration for metrics and distributed tracing
32
33pub mod bus;
34pub mod config;
35pub mod health;
36pub mod http_health;
37#[cfg(feature = "otel")]
38pub mod metrics;
39pub mod projector;
40pub mod runtime;
41pub mod view;
42pub mod websocket;
43
44pub use bus::{BusManager, BusMessage};
45pub use config::{
46    HealthConfig, HttpHealthConfig, ServerConfig, WebSocketConfig, YellowstoneConfig,
47};
48pub use health::{HealthMonitor, StreamStatus};
49pub use http_health::HttpHealthServer;
50#[cfg(feature = "otel")]
51pub use metrics::Metrics;
52pub use projector::Projector;
53pub use runtime::Runtime;
54pub use view::{Delivery, Filters, Projection, ViewIndex, ViewSpec};
55pub use websocket::{ClientInfo, ClientManager, Frame, Mode, Subscription, WebSocketServer};
56
57use anyhow::Result;
58use std::net::SocketAddr;
59use std::sync::Arc;
60
61/// Type alias for a parser setup function
62/// This function receives a mutations sender and optional health monitor, then sets up the Vixen runtime
63pub type ParserSetupFn = Arc<
64    dyn Fn(
65            tokio::sync::mpsc::Sender<smallvec::SmallVec<[hyperstack_interpreter::Mutation; 6]>>,
66            Option<HealthMonitor>,
67        ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send>>
68        + Send
69        + Sync,
70>;
71
72/// Specification for a HyperStack server
73/// Contains bytecode, parsers, and program information
74pub struct Spec {
75    pub bytecode: hyperstack_interpreter::compiler::MultiEntityBytecode,
76    pub program_id: String,
77    pub parser_setup: Option<ParserSetupFn>,
78}
79
80impl Spec {
81    pub fn new(
82        bytecode: hyperstack_interpreter::compiler::MultiEntityBytecode,
83        program_id: impl Into<String>,
84    ) -> Self {
85        Self {
86            bytecode,
87            program_id: program_id.into(),
88            parser_setup: None,
89        }
90    }
91
92    /// Add a parser setup function that will configure Vixen parsers
93    pub fn with_parser_setup(mut self, setup_fn: ParserSetupFn) -> Self {
94        self.parser_setup = Some(setup_fn);
95        self
96    }
97}
98
99/// Main server interface with fluent builder API
100pub struct Server;
101
102impl Server {
103    /// Create a new server builder
104    pub fn builder() -> ServerBuilder {
105        ServerBuilder::new()
106    }
107}
108
109/// Builder for configuring and creating a HyperStack server
110pub struct ServerBuilder {
111    spec: Option<Spec>,
112    views: Option<ViewIndex>,
113    config: ServerConfig,
114    #[cfg(feature = "otel")]
115    metrics: Option<Arc<Metrics>>,
116}
117
118impl ServerBuilder {
119    fn new() -> Self {
120        Self {
121            spec: None,
122            views: None,
123            config: ServerConfig::new(),
124            #[cfg(feature = "otel")]
125            metrics: None,
126        }
127    }
128
129    /// Set the specification (bytecode, parsers, program_id)
130    pub fn spec(mut self, spec: Spec) -> Self {
131        self.spec = Some(spec);
132        self
133    }
134
135    /// Set custom view index
136    pub fn views(mut self, views: ViewIndex) -> Self {
137        self.views = Some(views);
138        self
139    }
140
141    /// Enable metrics collection (requires 'otel' feature)
142    #[cfg(feature = "otel")]
143    pub fn metrics(mut self, metrics: Metrics) -> Self {
144        self.metrics = Some(Arc::new(metrics));
145        self
146    }
147
148    /// Enable WebSocket server with default configuration
149    pub fn websocket(mut self) -> Self {
150        self.config.websocket = Some(WebSocketConfig::default());
151        self
152    }
153
154    /// Configure WebSocket server
155    pub fn websocket_config(mut self, config: WebSocketConfig) -> Self {
156        self.config.websocket = Some(config);
157        self
158    }
159
160    /// Set the bind address for WebSocket server
161    pub fn bind(mut self, addr: impl Into<SocketAddr>) -> Self {
162        if let Some(ws_config) = &mut self.config.websocket {
163            ws_config.bind_address = addr.into();
164        } else {
165            self.config.websocket = Some(WebSocketConfig::new(addr.into()));
166        }
167        self
168    }
169
170    /// Configure Yellowstone gRPC connection
171    pub fn yellowstone(mut self, config: YellowstoneConfig) -> Self {
172        self.config.yellowstone = Some(config);
173        self
174    }
175
176    /// Enable health monitoring with default configuration
177    pub fn health_monitoring(mut self) -> Self {
178        self.config.health = Some(HealthConfig::default());
179        self
180    }
181
182    /// Configure health monitoring
183    pub fn health_config(mut self, config: HealthConfig) -> Self {
184        self.config.health = Some(config);
185        self
186    }
187
188    /// Enable HTTP health server with default configuration (port 8081)
189    pub fn http_health(mut self) -> Self {
190        self.config.http_health = Some(HttpHealthConfig::default());
191        self
192    }
193
194    /// Configure HTTP health server
195    pub fn http_health_config(mut self, config: HttpHealthConfig) -> Self {
196        self.config.http_health = Some(config);
197        self
198    }
199
200    /// Set the bind address for HTTP health server
201    pub fn health_bind(mut self, addr: impl Into<SocketAddr>) -> Self {
202        if let Some(http_config) = &mut self.config.http_health {
203            http_config.bind_address = addr.into();
204        } else {
205            self.config.http_health = Some(HttpHealthConfig::new(addr.into()));
206        }
207        self
208    }
209
210    /// Start the server (consumes the builder)
211    pub async fn start(self) -> Result<()> {
212        // Use provided views or create default views from spec
213        let view_index = self.views.unwrap_or_else(|| {
214            let mut index = ViewIndex::new();
215
216            // Auto-register default views for each entity in the spec
217            if let Some(ref spec) = self.spec {
218                for entity_name in spec.bytecode.entities.keys() {
219                    // Register kv view (key-value lookups)
220                    index.add_spec(ViewSpec {
221                        id: format!("{}/kv", entity_name),
222                        export: entity_name.clone(),
223                        mode: Mode::Kv,
224                        projection: Projection::all(),
225                        filters: Filters::all(),
226                        delivery: Delivery::default(),
227                    });
228
229                    // Register list view (all entities)
230                    index.add_spec(ViewSpec {
231                        id: format!("{}/list", entity_name),
232                        export: entity_name.clone(),
233                        mode: Mode::List,
234                        projection: Projection::all(),
235                        filters: Filters::all(),
236                        delivery: Delivery::default(),
237                    });
238
239                    // Register state view (single shared state)
240                    index.add_spec(ViewSpec {
241                        id: format!("{}/state", entity_name),
242                        export: entity_name.clone(),
243                        mode: Mode::State,
244                        projection: Projection::all(),
245                        filters: Filters::all(),
246                        delivery: Delivery::default(),
247                    });
248
249                    // Register append view (append-only log)
250                    index.add_spec(ViewSpec {
251                        id: format!("{}/append", entity_name),
252                        export: entity_name.clone(),
253                        mode: Mode::Append,
254                        projection: Projection::all(),
255                        filters: Filters::all(),
256                        delivery: Delivery::default(),
257                    });
258
259                    tracing::info!("Registered views for entity: {}", entity_name);
260                }
261            }
262
263            index
264        });
265
266        // Create runtime
267        #[cfg(feature = "otel")]
268        let mut runtime = Runtime::new(self.config, view_index, self.metrics);
269        #[cfg(not(feature = "otel"))]
270        let mut runtime = Runtime::new(self.config, view_index);
271
272        // Add spec if provided
273        if let Some(spec) = self.spec {
274            runtime = runtime.with_spec(spec);
275        }
276
277        // Run the server
278        runtime.run().await
279    }
280
281    /// Build the runtime without starting it
282    pub fn build(self) -> Result<Runtime> {
283        // Use provided views or create default views from spec
284        let view_index = self.views.unwrap_or_else(|| {
285            let mut index = ViewIndex::new();
286
287            // Auto-register default views for each entity in the spec
288            if let Some(ref spec) = self.spec {
289                for entity_name in spec.bytecode.entities.keys() {
290                    // Register kv view
291                    index.add_spec(ViewSpec {
292                        id: format!("{}/kv", entity_name),
293                        export: entity_name.clone(),
294                        mode: Mode::Kv,
295                        projection: Projection::all(),
296                        filters: Filters::all(),
297                        delivery: Delivery::default(),
298                    });
299
300                    // Register list view
301                    index.add_spec(ViewSpec {
302                        id: format!("{}/list", entity_name),
303                        export: entity_name.clone(),
304                        mode: Mode::List,
305                        projection: Projection::all(),
306                        filters: Filters::all(),
307                        delivery: Delivery::default(),
308                    });
309
310                    // Register state view
311                    index.add_spec(ViewSpec {
312                        id: format!("{}/state", entity_name),
313                        export: entity_name.clone(),
314                        mode: Mode::State,
315                        projection: Projection::all(),
316                        filters: Filters::all(),
317                        delivery: Delivery::default(),
318                    });
319
320                    // Register append view
321                    index.add_spec(ViewSpec {
322                        id: format!("{}/append", entity_name),
323                        export: entity_name.clone(),
324                        mode: Mode::Append,
325                        projection: Projection::all(),
326                        filters: Filters::all(),
327                        delivery: Delivery::default(),
328                    });
329                }
330            }
331
332            index
333        });
334
335        #[cfg(feature = "otel")]
336        let mut runtime = Runtime::new(self.config, view_index, self.metrics);
337        #[cfg(not(feature = "otel"))]
338        let mut runtime = Runtime::new(self.config, view_index);
339
340        if let Some(spec) = self.spec {
341            runtime = runtime.with_spec(spec);
342        }
343        Ok(runtime)
344    }
345}
346
347#[cfg(test)]
348mod tests {
349    use super::*;
350
351    #[test]
352    fn test_builder_pattern() {
353        let _builder = Server::builder()
354            .websocket()
355            .bind("[::]:8877".parse::<SocketAddr>().unwrap());
356    }
357
358    #[test]
359    fn test_spec_creation() {
360        let bytecode = hyperstack_interpreter::compiler::MultiEntityBytecode::new().build();
361        let spec = Spec::new(bytecode, "test_program");
362        assert_eq!(spec.program_id, "test_program");
363    }
364}