hyperstack_server/
lib.rs

1//! # hyperstack-server
2//!
3//! WebSocket server and projection handlers for HyperStack streaming pipelines.
4//!
5//! This crate provides a builder API for creating HyperStack servers that:
6//!
7//! - Process Solana blockchain data via Yellowstone gRPC
8//! - Transform data using the HyperStack VM
9//! - Stream entity updates over WebSockets to connected clients
10//! - Support multiple streaming modes (State, List, Append)
11//!
12//! ## Quick Start
13//!
14//! ```rust,ignore
15//! use hyperstack_server::{Server, Spec};
16//!
17//! #[tokio::main]
18//! async fn main() -> anyhow::Result<()> {
19//!     Server::builder()
20//!         .spec(my_spec())
21//!         .websocket()
//!         .bind("[::]:8877".parse::<std::net::SocketAddr>()?)
23//!         .health_monitoring()
24//!         .start()
25//!         .await
26//! }
27//! ```
28//!
29//! ## Feature Flags
30//!
31//! - `otel` - OpenTelemetry integration for metrics and distributed tracing
32
33pub mod bus;
34pub mod cache;
35pub mod compression;
36pub mod config;
37pub mod health;
38pub mod http_health;
39#[cfg(feature = "otel")]
40pub mod metrics;
41pub mod mutation_batch;
42pub mod projector;
43pub mod runtime;
44pub mod telemetry;
45pub mod view;
46pub mod websocket;
47
48pub use bus::{BusManager, BusMessage};
49pub use cache::{EntityCache, EntityCacheConfig};
50pub use config::{
51    HealthConfig, HttpHealthConfig, ReconnectionConfig, ServerConfig, WebSocketConfig,
52    YellowstoneConfig,
53};
54pub use health::{HealthMonitor, SlotTracker, StreamStatus};
55pub use http_health::HttpHealthServer;
56#[cfg(feature = "otel")]
57pub use metrics::Metrics;
58pub use mutation_batch::MutationBatch;
59pub use projector::Projector;
60pub use runtime::Runtime;
61pub use telemetry::{init as init_telemetry, TelemetryConfig};
62#[cfg(feature = "otel")]
63pub use telemetry::{init_with_otel, TelemetryGuard};
64pub use view::{Delivery, Filters, Projection, ViewIndex, ViewSpec};
65pub use websocket::{ClientInfo, ClientManager, Frame, Mode, Subscription, WebSocketServer};
66
67use anyhow::Result;
68use std::net::SocketAddr;
69use std::sync::Arc;
70
/// Type alias for a parser setup function.
///
/// The alias is a shareable (`Arc`), thread-safe (`Send + Sync`) callback.
/// It is invoked with:
///
/// - a Tokio mpsc sender of [`MutationBatch`] values,
/// - an optional [`HealthMonitor`],
/// - a [`ReconnectionConfig`],
///
/// and returns a pinned, boxed, `Send` future that resolves to `Result<()>`.
pub type ParserSetupFn = Arc<
    dyn Fn(
            tokio::sync::mpsc::Sender<MutationBatch>,
            Option<HealthMonitor>,
            ReconnectionConfig,
        ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send>>
        + Send
        + Sync,
>;
81
/// Specification for a HyperStack server.
///
/// Contains bytecode, parsers, and program information.
pub struct Spec {
    /// Compiled multi-entity bytecode. The keys of its `entities` map are the
    /// entity names used when the builder derives default views.
    pub bytecode: hyperstack_interpreter::compiler::MultiEntityBytecode,
    /// Program identifier, stored as an owned string.
    // NOTE(review): presumably the on-chain program address — confirm with callers.
    pub program_id: String,
    /// Optional parser setup callback; `None` until
    /// [`Spec::with_parser_setup`] is called.
    pub parser_setup: Option<ParserSetupFn>,
}
89
90impl Spec {
91    pub fn new(
92        bytecode: hyperstack_interpreter::compiler::MultiEntityBytecode,
93        program_id: impl Into<String>,
94    ) -> Self {
95        Self {
96            bytecode,
97            program_id: program_id.into(),
98            parser_setup: None,
99        }
100    }
101
102    /// Add a parser setup function that will configure Vixen parsers
103    pub fn with_parser_setup(mut self, setup_fn: ParserSetupFn) -> Self {
104        self.parser_setup = Some(setup_fn);
105        self
106    }
107}
108
/// Main server interface with fluent builder API.
///
/// This is a unit type that carries no state of its own; call
/// [`Server::builder`] to obtain a [`ServerBuilder`].
pub struct Server;
111
112impl Server {
113    /// Create a new server builder
114    pub fn builder() -> ServerBuilder {
115        ServerBuilder::new()
116    }
117}
118
/// Builder for configuring and creating a HyperStack server.
///
/// Obtained from [`Server::builder`]; consumed by `start` or `build`.
pub struct ServerBuilder {
    /// Specification to hand to the runtime, if one was provided.
    spec: Option<Spec>,
    /// Caller-supplied view index; when `None`, default views are derived
    /// from the spec's entities at build time.
    views: Option<ViewIndex>,
    /// Accumulated server configuration, filled in by the builder methods.
    config: ServerConfig,
    /// Shared metrics handle (only present with the `otel` feature).
    #[cfg(feature = "otel")]
    metrics: Option<Arc<Metrics>>,
}
127
128impl ServerBuilder {
129    fn new() -> Self {
130        Self {
131            spec: None,
132            views: None,
133            config: ServerConfig::new(),
134            #[cfg(feature = "otel")]
135            metrics: None,
136        }
137    }
138
139    /// Set the specification (bytecode, parsers, program_id)
140    pub fn spec(mut self, spec: Spec) -> Self {
141        self.spec = Some(spec);
142        self
143    }
144
145    /// Set custom view index
146    pub fn views(mut self, views: ViewIndex) -> Self {
147        self.views = Some(views);
148        self
149    }
150
151    /// Enable metrics collection (requires 'otel' feature)
152    #[cfg(feature = "otel")]
153    pub fn metrics(mut self, metrics: Metrics) -> Self {
154        self.metrics = Some(Arc::new(metrics));
155        self
156    }
157
158    /// Enable WebSocket server with default configuration
159    pub fn websocket(mut self) -> Self {
160        self.config.websocket = Some(WebSocketConfig::default());
161        self
162    }
163
164    /// Configure WebSocket server
165    pub fn websocket_config(mut self, config: WebSocketConfig) -> Self {
166        self.config.websocket = Some(config);
167        self
168    }
169
170    /// Set the bind address for WebSocket server
171    pub fn bind(mut self, addr: impl Into<SocketAddr>) -> Self {
172        if let Some(ws_config) = &mut self.config.websocket {
173            ws_config.bind_address = addr.into();
174        } else {
175            self.config.websocket = Some(WebSocketConfig::new(addr.into()));
176        }
177        self
178    }
179
180    /// Configure Yellowstone gRPC connection
181    pub fn yellowstone(mut self, config: YellowstoneConfig) -> Self {
182        self.config.yellowstone = Some(config);
183        self
184    }
185
186    /// Enable health monitoring with default configuration
187    pub fn health_monitoring(mut self) -> Self {
188        self.config.health = Some(HealthConfig::default());
189        self
190    }
191
192    /// Configure health monitoring
193    pub fn health_config(mut self, config: HealthConfig) -> Self {
194        self.config.health = Some(config);
195        self
196    }
197
198    /// Enable reconnection with default configuration
199    pub fn reconnection(mut self) -> Self {
200        self.config.reconnection = Some(ReconnectionConfig::default());
201        self
202    }
203
204    /// Configure reconnection behavior
205    pub fn reconnection_config(mut self, config: ReconnectionConfig) -> Self {
206        self.config.reconnection = Some(config);
207        self
208    }
209
210    /// Enable HTTP health server with default configuration (port 8081)
211    pub fn http_health(mut self) -> Self {
212        self.config.http_health = Some(HttpHealthConfig::default());
213        self
214    }
215
216    /// Configure HTTP health server
217    pub fn http_health_config(mut self, config: HttpHealthConfig) -> Self {
218        self.config.http_health = Some(config);
219        self
220    }
221
222    /// Set the bind address for HTTP health server
223    pub fn health_bind(mut self, addr: impl Into<SocketAddr>) -> Self {
224        if let Some(http_config) = &mut self.config.http_health {
225            http_config.bind_address = addr.into();
226        } else {
227            self.config.http_health = Some(HttpHealthConfig::new(addr.into()));
228        }
229        self
230    }
231
232    /// Start the server (consumes the builder)
233    pub async fn start(self) -> Result<()> {
234        // Use provided views or create default views from spec
235        let view_index = self.views.unwrap_or_else(|| {
236            let mut index = ViewIndex::new();
237
238            if let Some(ref spec) = self.spec {
239                for entity_name in spec.bytecode.entities.keys() {
240                    index.add_spec(ViewSpec {
241                        id: format!("{}/list", entity_name),
242                        export: entity_name.clone(),
243                        mode: Mode::List,
244                        projection: Projection::all(),
245                        filters: Filters::all(),
246                        delivery: Delivery::default(),
247                    });
248
249                    index.add_spec(ViewSpec {
250                        id: format!("{}/state", entity_name),
251                        export: entity_name.clone(),
252                        mode: Mode::State,
253                        projection: Projection::all(),
254                        filters: Filters::all(),
255                        delivery: Delivery::default(),
256                    });
257
258                    index.add_spec(ViewSpec {
259                        id: format!("{}/append", entity_name),
260                        export: entity_name.clone(),
261                        mode: Mode::Append,
262                        projection: Projection::all(),
263                        filters: Filters::all(),
264                        delivery: Delivery::default(),
265                    });
266                }
267            }
268
269            index
270        });
271
272        // Create runtime
273        #[cfg(feature = "otel")]
274        let mut runtime = Runtime::new(self.config, view_index, self.metrics);
275        #[cfg(not(feature = "otel"))]
276        let mut runtime = Runtime::new(self.config, view_index);
277
278        // Add spec if provided
279        if let Some(spec) = self.spec {
280            runtime = runtime.with_spec(spec);
281        }
282
283        // Run the server
284        runtime.run().await
285    }
286
287    /// Build the runtime without starting it
288    pub fn build(self) -> Result<Runtime> {
289        // Use provided views or create default views from spec
290        let view_index = self.views.unwrap_or_else(|| {
291            let mut index = ViewIndex::new();
292
293            if let Some(ref spec) = self.spec {
294                for entity_name in spec.bytecode.entities.keys() {
295                    index.add_spec(ViewSpec {
296                        id: format!("{}/list", entity_name),
297                        export: entity_name.clone(),
298                        mode: Mode::List,
299                        projection: Projection::all(),
300                        filters: Filters::all(),
301                        delivery: Delivery::default(),
302                    });
303
304                    index.add_spec(ViewSpec {
305                        id: format!("{}/state", entity_name),
306                        export: entity_name.clone(),
307                        mode: Mode::State,
308                        projection: Projection::all(),
309                        filters: Filters::all(),
310                        delivery: Delivery::default(),
311                    });
312
313                    index.add_spec(ViewSpec {
314                        id: format!("{}/append", entity_name),
315                        export: entity_name.clone(),
316                        mode: Mode::Append,
317                        projection: Projection::all(),
318                        filters: Filters::all(),
319                        delivery: Delivery::default(),
320                    });
321                }
322            }
323
324            index
325        });
326
327        #[cfg(feature = "otel")]
328        let mut runtime = Runtime::new(self.config, view_index, self.metrics);
329        #[cfg(not(feature = "otel"))]
330        let mut runtime = Runtime::new(self.config, view_index);
331
332        if let Some(spec) = self.spec {
333            runtime = runtime.with_spec(spec);
334        }
335        Ok(runtime)
336    }
337}
338
#[cfg(test)]
mod tests {
    use super::*;

    /// The fluent builder chain compiles and runs without panicking.
    #[test]
    fn test_builder_pattern() {
        let addr: SocketAddr = "[::]:8877".parse().unwrap();
        let _builder = Server::builder().websocket().bind(addr);
    }

    /// `Spec::new` stores the program id it was given.
    #[test]
    fn test_spec_creation() {
        let compiled = hyperstack_interpreter::compiler::MultiEntityBytecode::new().build();
        let created = Spec::new(compiled, "test_program");
        assert_eq!(created.program_id, "test_program");
    }
}