hyperstack_server/
lib.rs

1//! # hyperstack-server
2//!
3//! WebSocket server and projection handlers for HyperStack streaming pipelines.
4//!
5//! This crate provides a builder API for creating HyperStack servers that:
6//!
7//! - Process Solana blockchain data via Yellowstone gRPC
8//! - Transform data using the HyperStack VM
9//! - Stream entity updates over WebSockets to connected clients
10//! - Support multiple streaming modes (State, List, Append)
11//!
12//! ## Quick Start
13//!
14//! ```rust,ignore
15//! use hyperstack_server::{Server, Spec};
16//!
17//! #[tokio::main]
18//! async fn main() -> anyhow::Result<()> {
19//!     Server::builder()
20//!         .spec(my_spec())
21//!         .websocket()
//!         .bind("[::]:8877".parse::<std::net::SocketAddr>()?)
23//!         .health_monitoring()
24//!         .start()
25//!         .await
26//! }
27//! ```
28//!
29//! ## Feature Flags
30//!
31//! - `otel` - OpenTelemetry integration for metrics and distributed tracing
32
33pub mod bus;
34pub mod cache;
35pub mod config;
36pub mod health;
37pub mod http_health;
38#[cfg(feature = "otel")]
39pub mod metrics;
40pub mod projector;
41pub mod runtime;
42pub mod view;
43pub mod websocket;
44
45pub use bus::{BusManager, BusMessage};
46pub use cache::{EntityCache, EntityCacheConfig};
47pub use config::{
48    HealthConfig, HttpHealthConfig, ServerConfig, WebSocketConfig, YellowstoneConfig,
49};
50pub use health::{HealthMonitor, StreamStatus};
51pub use http_health::HttpHealthServer;
52#[cfg(feature = "otel")]
53pub use metrics::Metrics;
54pub use projector::Projector;
55pub use runtime::Runtime;
56pub use view::{Delivery, Filters, Projection, ViewIndex, ViewSpec};
57pub use websocket::{ClientInfo, ClientManager, Frame, Mode, Subscription, WebSocketServer};
58
59use anyhow::Result;
60use std::net::SocketAddr;
61use std::sync::Arc;
62
63/// Type alias for a parser setup function
64/// This function receives a mutations sender and optional health monitor, then sets up the Vixen runtime
65pub type ParserSetupFn = Arc<
66    dyn Fn(
67            tokio::sync::mpsc::Sender<smallvec::SmallVec<[hyperstack_interpreter::Mutation; 6]>>,
68            Option<HealthMonitor>,
69        ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send>>
70        + Send
71        + Sync,
72>;
73
74/// Specification for a HyperStack server
75/// Contains bytecode, parsers, and program information
76pub struct Spec {
77    pub bytecode: hyperstack_interpreter::compiler::MultiEntityBytecode,
78    pub program_id: String,
79    pub parser_setup: Option<ParserSetupFn>,
80}
81
82impl Spec {
83    pub fn new(
84        bytecode: hyperstack_interpreter::compiler::MultiEntityBytecode,
85        program_id: impl Into<String>,
86    ) -> Self {
87        Self {
88            bytecode,
89            program_id: program_id.into(),
90            parser_setup: None,
91        }
92    }
93
94    /// Add a parser setup function that will configure Vixen parsers
95    pub fn with_parser_setup(mut self, setup_fn: ParserSetupFn) -> Self {
96        self.parser_setup = Some(setup_fn);
97        self
98    }
99}
100
101/// Main server interface with fluent builder API
102pub struct Server;
103
104impl Server {
105    /// Create a new server builder
106    pub fn builder() -> ServerBuilder {
107        ServerBuilder::new()
108    }
109}
110
111/// Builder for configuring and creating a HyperStack server
112pub struct ServerBuilder {
113    spec: Option<Spec>,
114    views: Option<ViewIndex>,
115    config: ServerConfig,
116    #[cfg(feature = "otel")]
117    metrics: Option<Arc<Metrics>>,
118}
119
120impl ServerBuilder {
121    fn new() -> Self {
122        Self {
123            spec: None,
124            views: None,
125            config: ServerConfig::new(),
126            #[cfg(feature = "otel")]
127            metrics: None,
128        }
129    }
130
131    /// Set the specification (bytecode, parsers, program_id)
132    pub fn spec(mut self, spec: Spec) -> Self {
133        self.spec = Some(spec);
134        self
135    }
136
137    /// Set custom view index
138    pub fn views(mut self, views: ViewIndex) -> Self {
139        self.views = Some(views);
140        self
141    }
142
143    /// Enable metrics collection (requires 'otel' feature)
144    #[cfg(feature = "otel")]
145    pub fn metrics(mut self, metrics: Metrics) -> Self {
146        self.metrics = Some(Arc::new(metrics));
147        self
148    }
149
150    /// Enable WebSocket server with default configuration
151    pub fn websocket(mut self) -> Self {
152        self.config.websocket = Some(WebSocketConfig::default());
153        self
154    }
155
156    /// Configure WebSocket server
157    pub fn websocket_config(mut self, config: WebSocketConfig) -> Self {
158        self.config.websocket = Some(config);
159        self
160    }
161
162    /// Set the bind address for WebSocket server
163    pub fn bind(mut self, addr: impl Into<SocketAddr>) -> Self {
164        if let Some(ws_config) = &mut self.config.websocket {
165            ws_config.bind_address = addr.into();
166        } else {
167            self.config.websocket = Some(WebSocketConfig::new(addr.into()));
168        }
169        self
170    }
171
172    /// Configure Yellowstone gRPC connection
173    pub fn yellowstone(mut self, config: YellowstoneConfig) -> Self {
174        self.config.yellowstone = Some(config);
175        self
176    }
177
178    /// Enable health monitoring with default configuration
179    pub fn health_monitoring(mut self) -> Self {
180        self.config.health = Some(HealthConfig::default());
181        self
182    }
183
184    /// Configure health monitoring
185    pub fn health_config(mut self, config: HealthConfig) -> Self {
186        self.config.health = Some(config);
187        self
188    }
189
190    /// Enable HTTP health server with default configuration (port 8081)
191    pub fn http_health(mut self) -> Self {
192        self.config.http_health = Some(HttpHealthConfig::default());
193        self
194    }
195
196    /// Configure HTTP health server
197    pub fn http_health_config(mut self, config: HttpHealthConfig) -> Self {
198        self.config.http_health = Some(config);
199        self
200    }
201
202    /// Set the bind address for HTTP health server
203    pub fn health_bind(mut self, addr: impl Into<SocketAddr>) -> Self {
204        if let Some(http_config) = &mut self.config.http_health {
205            http_config.bind_address = addr.into();
206        } else {
207            self.config.http_health = Some(HttpHealthConfig::new(addr.into()));
208        }
209        self
210    }
211
212    /// Start the server (consumes the builder)
213    pub async fn start(self) -> Result<()> {
214        // Use provided views or create default views from spec
215        let view_index = self.views.unwrap_or_else(|| {
216            let mut index = ViewIndex::new();
217
218            if let Some(ref spec) = self.spec {
219                for entity_name in spec.bytecode.entities.keys() {
220                    index.add_spec(ViewSpec {
221                        id: format!("{}/list", entity_name),
222                        export: entity_name.clone(),
223                        mode: Mode::List,
224                        projection: Projection::all(),
225                        filters: Filters::all(),
226                        delivery: Delivery::default(),
227                    });
228
229                    index.add_spec(ViewSpec {
230                        id: format!("{}/state", entity_name),
231                        export: entity_name.clone(),
232                        mode: Mode::State,
233                        projection: Projection::all(),
234                        filters: Filters::all(),
235                        delivery: Delivery::default(),
236                    });
237
238                    index.add_spec(ViewSpec {
239                        id: format!("{}/append", entity_name),
240                        export: entity_name.clone(),
241                        mode: Mode::Append,
242                        projection: Projection::all(),
243                        filters: Filters::all(),
244                        delivery: Delivery::default(),
245                    });
246
247                    tracing::info!("Registered views for entity: {}", entity_name);
248                }
249            }
250
251            index
252        });
253
254        // Create runtime
255        #[cfg(feature = "otel")]
256        let mut runtime = Runtime::new(self.config, view_index, self.metrics);
257        #[cfg(not(feature = "otel"))]
258        let mut runtime = Runtime::new(self.config, view_index);
259
260        // Add spec if provided
261        if let Some(spec) = self.spec {
262            runtime = runtime.with_spec(spec);
263        }
264
265        // Run the server
266        runtime.run().await
267    }
268
269    /// Build the runtime without starting it
270    pub fn build(self) -> Result<Runtime> {
271        // Use provided views or create default views from spec
272        let view_index = self.views.unwrap_or_else(|| {
273            let mut index = ViewIndex::new();
274
275            if let Some(ref spec) = self.spec {
276                for entity_name in spec.bytecode.entities.keys() {
277                    index.add_spec(ViewSpec {
278                        id: format!("{}/list", entity_name),
279                        export: entity_name.clone(),
280                        mode: Mode::List,
281                        projection: Projection::all(),
282                        filters: Filters::all(),
283                        delivery: Delivery::default(),
284                    });
285
286                    index.add_spec(ViewSpec {
287                        id: format!("{}/state", entity_name),
288                        export: entity_name.clone(),
289                        mode: Mode::State,
290                        projection: Projection::all(),
291                        filters: Filters::all(),
292                        delivery: Delivery::default(),
293                    });
294
295                    index.add_spec(ViewSpec {
296                        id: format!("{}/append", entity_name),
297                        export: entity_name.clone(),
298                        mode: Mode::Append,
299                        projection: Projection::all(),
300                        filters: Filters::all(),
301                        delivery: Delivery::default(),
302                    });
303                }
304            }
305
306            index
307        });
308
309        #[cfg(feature = "otel")]
310        let mut runtime = Runtime::new(self.config, view_index, self.metrics);
311        #[cfg(not(feature = "otel"))]
312        let mut runtime = Runtime::new(self.config, view_index);
313
314        if let Some(spec) = self.spec {
315            runtime = runtime.with_spec(spec);
316        }
317        Ok(runtime)
318    }
319}
320
#[cfg(test)]
mod tests {
    use super::*;

    /// `websocket()` followed by `bind()` must leave the WebSocket server
    /// enabled and bound to the requested address.
    #[test]
    fn test_builder_pattern() {
        let addr: SocketAddr = "[::]:8877"
            .parse()
            .expect("literal is a valid socket address");

        let builder = Server::builder().websocket().bind(addr);

        // The original test only checked that the chain compiles; assert the
        // observable effect of `bind` as well.
        let bound = builder.config.websocket.as_ref().map(|ws| ws.bind_address);
        assert_eq!(bound, Some(addr));
    }

    /// `Spec::new` stores the program id and attaches no parser setup.
    #[test]
    fn test_spec_creation() {
        let bytecode = hyperstack_interpreter::compiler::MultiEntityBytecode::new().build();
        let spec = Spec::new(bytecode, "test_program");
        assert_eq!(spec.program_id, "test_program");
        assert!(spec.parser_setup.is_none());
    }
}