hyperstack_server/
lib.rs

1//! # hyperstack-server
2//!
3//! WebSocket server and projection handlers for HyperStack streaming pipelines.
4//!
5//! This crate provides a builder API for creating HyperStack servers that:
6//!
7//! - Process Solana blockchain data via Yellowstone gRPC
8//! - Transform data using the HyperStack VM
9//! - Stream entity updates over WebSockets to connected clients
10//! - Support multiple streaming modes (State, List, Append)
11//!
12//! ## Quick Start
13//!
14//! ```rust,ignore
15//! use hyperstack_server::{Server, Spec};
16//!
17//! #[tokio::main]
18//! async fn main() -> anyhow::Result<()> {
19//!     Server::builder()
20//!         .spec(my_spec())
21//!         .websocket()
22//!         .bind("[::]:8877".parse()?)
23//!         .health_monitoring()
24//!         .start()
25//!         .await
26//! }
27//! ```
28//!
29//! ## Feature Flags
30//!
31//! - `otel` - OpenTelemetry integration for metrics and distributed tracing
32
33pub mod bus;
34pub mod cache;
35pub mod config;
36pub mod health;
37pub mod http_health;
38#[cfg(feature = "otel")]
39pub mod metrics;
40pub mod mutation_batch;
41pub mod projector;
42pub mod runtime;
43pub mod telemetry;
44pub mod view;
45pub mod websocket;
46
47pub use bus::{BusManager, BusMessage};
48pub use cache::{EntityCache, EntityCacheConfig};
49pub use config::{
50    HealthConfig, HttpHealthConfig, ReconnectionConfig, ServerConfig, WebSocketConfig,
51    YellowstoneConfig,
52};
53pub use health::{HealthMonitor, SlotTracker, StreamStatus};
54pub use http_health::HttpHealthServer;
55#[cfg(feature = "otel")]
56pub use metrics::Metrics;
57pub use mutation_batch::MutationBatch;
58pub use projector::Projector;
59pub use runtime::Runtime;
60pub use telemetry::{init as init_telemetry, TelemetryConfig};
61#[cfg(feature = "otel")]
62pub use telemetry::{init_with_otel, TelemetryGuard};
63pub use view::{Delivery, Filters, Projection, ViewIndex, ViewSpec};
64pub use websocket::{ClientInfo, ClientManager, Frame, Mode, Subscription, WebSocketServer};
65
66use anyhow::Result;
67use std::net::SocketAddr;
68use std::sync::Arc;
69
70/// Type alias for a parser setup function.
71pub type ParserSetupFn = Arc<
72    dyn Fn(
73            tokio::sync::mpsc::Sender<MutationBatch>,
74            Option<HealthMonitor>,
75            ReconnectionConfig,
76        ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send>>
77        + Send
78        + Sync,
79>;
80
81/// Specification for a HyperStack server
82/// Contains bytecode, parsers, and program information
83pub struct Spec {
84    pub bytecode: hyperstack_interpreter::compiler::MultiEntityBytecode,
85    pub program_id: String,
86    pub parser_setup: Option<ParserSetupFn>,
87}
88
89impl Spec {
90    pub fn new(
91        bytecode: hyperstack_interpreter::compiler::MultiEntityBytecode,
92        program_id: impl Into<String>,
93    ) -> Self {
94        Self {
95            bytecode,
96            program_id: program_id.into(),
97            parser_setup: None,
98        }
99    }
100
101    /// Add a parser setup function that will configure Vixen parsers
102    pub fn with_parser_setup(mut self, setup_fn: ParserSetupFn) -> Self {
103        self.parser_setup = Some(setup_fn);
104        self
105    }
106}
107
108/// Main server interface with fluent builder API
109pub struct Server;
110
111impl Server {
112    /// Create a new server builder
113    pub fn builder() -> ServerBuilder {
114        ServerBuilder::new()
115    }
116}
117
118/// Builder for configuring and creating a HyperStack server
119pub struct ServerBuilder {
120    spec: Option<Spec>,
121    views: Option<ViewIndex>,
122    config: ServerConfig,
123    #[cfg(feature = "otel")]
124    metrics: Option<Arc<Metrics>>,
125}
126
127impl ServerBuilder {
128    fn new() -> Self {
129        Self {
130            spec: None,
131            views: None,
132            config: ServerConfig::new(),
133            #[cfg(feature = "otel")]
134            metrics: None,
135        }
136    }
137
138    /// Set the specification (bytecode, parsers, program_id)
139    pub fn spec(mut self, spec: Spec) -> Self {
140        self.spec = Some(spec);
141        self
142    }
143
144    /// Set custom view index
145    pub fn views(mut self, views: ViewIndex) -> Self {
146        self.views = Some(views);
147        self
148    }
149
150    /// Enable metrics collection (requires 'otel' feature)
151    #[cfg(feature = "otel")]
152    pub fn metrics(mut self, metrics: Metrics) -> Self {
153        self.metrics = Some(Arc::new(metrics));
154        self
155    }
156
157    /// Enable WebSocket server with default configuration
158    pub fn websocket(mut self) -> Self {
159        self.config.websocket = Some(WebSocketConfig::default());
160        self
161    }
162
163    /// Configure WebSocket server
164    pub fn websocket_config(mut self, config: WebSocketConfig) -> Self {
165        self.config.websocket = Some(config);
166        self
167    }
168
169    /// Set the bind address for WebSocket server
170    pub fn bind(mut self, addr: impl Into<SocketAddr>) -> Self {
171        if let Some(ws_config) = &mut self.config.websocket {
172            ws_config.bind_address = addr.into();
173        } else {
174            self.config.websocket = Some(WebSocketConfig::new(addr.into()));
175        }
176        self
177    }
178
179    /// Configure Yellowstone gRPC connection
180    pub fn yellowstone(mut self, config: YellowstoneConfig) -> Self {
181        self.config.yellowstone = Some(config);
182        self
183    }
184
185    /// Enable health monitoring with default configuration
186    pub fn health_monitoring(mut self) -> Self {
187        self.config.health = Some(HealthConfig::default());
188        self
189    }
190
191    /// Configure health monitoring
192    pub fn health_config(mut self, config: HealthConfig) -> Self {
193        self.config.health = Some(config);
194        self
195    }
196
197    /// Enable reconnection with default configuration
198    pub fn reconnection(mut self) -> Self {
199        self.config.reconnection = Some(ReconnectionConfig::default());
200        self
201    }
202
203    /// Configure reconnection behavior
204    pub fn reconnection_config(mut self, config: ReconnectionConfig) -> Self {
205        self.config.reconnection = Some(config);
206        self
207    }
208
209    /// Enable HTTP health server with default configuration (port 8081)
210    pub fn http_health(mut self) -> Self {
211        self.config.http_health = Some(HttpHealthConfig::default());
212        self
213    }
214
215    /// Configure HTTP health server
216    pub fn http_health_config(mut self, config: HttpHealthConfig) -> Self {
217        self.config.http_health = Some(config);
218        self
219    }
220
221    /// Set the bind address for HTTP health server
222    pub fn health_bind(mut self, addr: impl Into<SocketAddr>) -> Self {
223        if let Some(http_config) = &mut self.config.http_health {
224            http_config.bind_address = addr.into();
225        } else {
226            self.config.http_health = Some(HttpHealthConfig::new(addr.into()));
227        }
228        self
229    }
230
231    /// Start the server (consumes the builder)
232    pub async fn start(self) -> Result<()> {
233        // Use provided views or create default views from spec
234        let view_index = self.views.unwrap_or_else(|| {
235            let mut index = ViewIndex::new();
236
237            if let Some(ref spec) = self.spec {
238                for entity_name in spec.bytecode.entities.keys() {
239                    index.add_spec(ViewSpec {
240                        id: format!("{}/list", entity_name),
241                        export: entity_name.clone(),
242                        mode: Mode::List,
243                        projection: Projection::all(),
244                        filters: Filters::all(),
245                        delivery: Delivery::default(),
246                    });
247
248                    index.add_spec(ViewSpec {
249                        id: format!("{}/state", entity_name),
250                        export: entity_name.clone(),
251                        mode: Mode::State,
252                        projection: Projection::all(),
253                        filters: Filters::all(),
254                        delivery: Delivery::default(),
255                    });
256
257                    index.add_spec(ViewSpec {
258                        id: format!("{}/append", entity_name),
259                        export: entity_name.clone(),
260                        mode: Mode::Append,
261                        projection: Projection::all(),
262                        filters: Filters::all(),
263                        delivery: Delivery::default(),
264                    });
265                }
266            }
267
268            index
269        });
270
271        // Create runtime
272        #[cfg(feature = "otel")]
273        let mut runtime = Runtime::new(self.config, view_index, self.metrics);
274        #[cfg(not(feature = "otel"))]
275        let mut runtime = Runtime::new(self.config, view_index);
276
277        // Add spec if provided
278        if let Some(spec) = self.spec {
279            runtime = runtime.with_spec(spec);
280        }
281
282        // Run the server
283        runtime.run().await
284    }
285
286    /// Build the runtime without starting it
287    pub fn build(self) -> Result<Runtime> {
288        // Use provided views or create default views from spec
289        let view_index = self.views.unwrap_or_else(|| {
290            let mut index = ViewIndex::new();
291
292            if let Some(ref spec) = self.spec {
293                for entity_name in spec.bytecode.entities.keys() {
294                    index.add_spec(ViewSpec {
295                        id: format!("{}/list", entity_name),
296                        export: entity_name.clone(),
297                        mode: Mode::List,
298                        projection: Projection::all(),
299                        filters: Filters::all(),
300                        delivery: Delivery::default(),
301                    });
302
303                    index.add_spec(ViewSpec {
304                        id: format!("{}/state", entity_name),
305                        export: entity_name.clone(),
306                        mode: Mode::State,
307                        projection: Projection::all(),
308                        filters: Filters::all(),
309                        delivery: Delivery::default(),
310                    });
311
312                    index.add_spec(ViewSpec {
313                        id: format!("{}/append", entity_name),
314                        export: entity_name.clone(),
315                        mode: Mode::Append,
316                        projection: Projection::all(),
317                        filters: Filters::all(),
318                        delivery: Delivery::default(),
319                    });
320                }
321            }
322
323            index
324        });
325
326        #[cfg(feature = "otel")]
327        let mut runtime = Runtime::new(self.config, view_index, self.metrics);
328        #[cfg(not(feature = "otel"))]
329        let mut runtime = Runtime::new(self.config, view_index);
330
331        if let Some(spec) = self.spec {
332            runtime = runtime.with_spec(spec);
333        }
334        Ok(runtime)
335    }
336}
337
338#[cfg(test)]
339mod tests {
340    use super::*;
341
342    #[test]
343    fn test_builder_pattern() {
344        let _builder = Server::builder()
345            .websocket()
346            .bind("[::]:8877".parse::<SocketAddr>().unwrap());
347    }
348
349    #[test]
350    fn test_spec_creation() {
351        let bytecode = hyperstack_interpreter::compiler::MultiEntityBytecode::new().build();
352        let spec = Spec::new(bytecode, "test_program");
353        assert_eq!(spec.program_id, "test_program");
354    }
355}