hyperstack_server/
lib.rs

//! # hyperstack-server
//!
//! WebSocket server and projection handlers for HyperStack streaming pipelines.
//!
//! This crate provides a builder API for creating HyperStack servers that:
//!
//! - Process Solana blockchain data via Yellowstone gRPC
//! - Transform data using the HyperStack VM
//! - Stream entity updates over WebSockets to connected clients
//! - Support multiple streaming modes (State, List, Append)
//!
//! ## Quick Start
//!
//! ```rust,ignore
//! use hyperstack_server::{Server, Spec};
//!
//! #[tokio::main]
//! async fn main() -> anyhow::Result<()> {
//!     Server::builder()
//!         .spec(my_spec())
//!         .websocket()
//!         // `bind` accepts `impl Into<SocketAddr>`, so the parse target must be named.
//!         .bind("[::]:8877".parse::<std::net::SocketAddr>()?)
//!         .health_monitoring()
//!         .start()
//!         .await
//! }
//! ```
//!
//! ## Feature Flags
//!
//! - `otel` - OpenTelemetry integration for metrics and distributed tracing
32
33pub mod bus;
34pub mod cache;
35pub mod compression;
36pub mod config;
37pub mod health;
38pub mod http_health;
39pub mod materialized_view;
40#[cfg(feature = "otel")]
41pub mod metrics;
42pub mod mutation_batch;
43pub mod projector;
44pub mod runtime;
45pub mod sorted_cache;
46pub mod telemetry;
47pub mod view;
48pub mod websocket;
49
50pub use bus::{BusManager, BusMessage};
51pub use cache::{EntityCache, EntityCacheConfig};
52pub use config::{
53    HealthConfig, HttpHealthConfig, ReconnectionConfig, ServerConfig, WebSocketConfig,
54    YellowstoneConfig,
55};
56pub use health::{HealthMonitor, SlotTracker, StreamStatus};
57pub use http_health::HttpHealthServer;
58pub use materialized_view::{MaterializedView, MaterializedViewRegistry, ViewEffect};
59#[cfg(feature = "otel")]
60pub use metrics::Metrics;
61pub use mutation_batch::{MutationBatch, SlotContext};
62pub use projector::Projector;
63pub use runtime::Runtime;
64pub use telemetry::{init as init_telemetry, TelemetryConfig};
65#[cfg(feature = "otel")]
66pub use telemetry::{init_with_otel, TelemetryGuard};
67pub use view::{Delivery, Filters, Projection, ViewIndex, ViewSpec};
68pub use websocket::{ClientInfo, ClientManager, Frame, Mode, Subscription, WebSocketServer};
69
70use anyhow::Result;
71use hyperstack_interpreter::ast::ViewDef;
72use std::net::SocketAddr;
73use std::sync::Arc;
74
75/// Type alias for a parser setup function.
76pub type ParserSetupFn = Arc<
77    dyn Fn(
78            tokio::sync::mpsc::Sender<MutationBatch>,
79            Option<HealthMonitor>,
80            ReconnectionConfig,
81        ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send>>
82        + Send
83        + Sync,
84>;
85
86/// Specification for a HyperStack server
87/// Contains bytecode, parsers, and program information
88pub struct Spec {
89    pub bytecode: hyperstack_interpreter::compiler::MultiEntityBytecode,
90    pub program_id: String,
91    pub parser_setup: Option<ParserSetupFn>,
92    pub views: Vec<ViewDef>,
93}
94
95impl Spec {
96    pub fn new(
97        bytecode: hyperstack_interpreter::compiler::MultiEntityBytecode,
98        program_id: impl Into<String>,
99    ) -> Self {
100        Self {
101            bytecode,
102            program_id: program_id.into(),
103            parser_setup: None,
104            views: Vec::new(),
105        }
106    }
107
108    pub fn with_parser_setup(mut self, setup_fn: ParserSetupFn) -> Self {
109        self.parser_setup = Some(setup_fn);
110        self
111    }
112
113    pub fn with_views(mut self, views: Vec<ViewDef>) -> Self {
114        self.views = views;
115        self
116    }
117}
118
119/// Main server interface with fluent builder API
120pub struct Server;
121
122impl Server {
123    /// Create a new server builder
124    pub fn builder() -> ServerBuilder {
125        ServerBuilder::new()
126    }
127}
128
129/// Builder for configuring and creating a HyperStack server
130pub struct ServerBuilder {
131    spec: Option<Spec>,
132    views: Option<ViewIndex>,
133    materialized_views: Option<MaterializedViewRegistry>,
134    config: ServerConfig,
135    #[cfg(feature = "otel")]
136    metrics: Option<Arc<Metrics>>,
137}
138
139impl ServerBuilder {
140    fn new() -> Self {
141        Self {
142            spec: None,
143            views: None,
144            materialized_views: None,
145            config: ServerConfig::new(),
146            #[cfg(feature = "otel")]
147            metrics: None,
148        }
149    }
150
151    /// Set the specification (bytecode, parsers, program_id)
152    pub fn spec(mut self, spec: Spec) -> Self {
153        self.spec = Some(spec);
154        self
155    }
156
157    /// Set custom view index
158    pub fn views(mut self, views: ViewIndex) -> Self {
159        self.views = Some(views);
160        self
161    }
162
163    /// Enable metrics collection (requires 'otel' feature)
164    #[cfg(feature = "otel")]
165    pub fn metrics(mut self, metrics: Metrics) -> Self {
166        self.metrics = Some(Arc::new(metrics));
167        self
168    }
169
170    /// Enable WebSocket server with default configuration
171    pub fn websocket(mut self) -> Self {
172        self.config.websocket = Some(WebSocketConfig::default());
173        self
174    }
175
176    /// Configure WebSocket server
177    pub fn websocket_config(mut self, config: WebSocketConfig) -> Self {
178        self.config.websocket = Some(config);
179        self
180    }
181
182    /// Set the bind address for WebSocket server
183    pub fn bind(mut self, addr: impl Into<SocketAddr>) -> Self {
184        if let Some(ws_config) = &mut self.config.websocket {
185            ws_config.bind_address = addr.into();
186        } else {
187            self.config.websocket = Some(WebSocketConfig::new(addr.into()));
188        }
189        self
190    }
191
192    /// Configure Yellowstone gRPC connection
193    pub fn yellowstone(mut self, config: YellowstoneConfig) -> Self {
194        self.config.yellowstone = Some(config);
195        self
196    }
197
198    /// Enable health monitoring with default configuration
199    pub fn health_monitoring(mut self) -> Self {
200        self.config.health = Some(HealthConfig::default());
201        self
202    }
203
204    /// Configure health monitoring
205    pub fn health_config(mut self, config: HealthConfig) -> Self {
206        self.config.health = Some(config);
207        self
208    }
209
210    /// Enable reconnection with default configuration
211    pub fn reconnection(mut self) -> Self {
212        self.config.reconnection = Some(ReconnectionConfig::default());
213        self
214    }
215
216    /// Configure reconnection behavior
217    pub fn reconnection_config(mut self, config: ReconnectionConfig) -> Self {
218        self.config.reconnection = Some(config);
219        self
220    }
221
222    /// Enable HTTP health server with default configuration (port 8081)
223    pub fn http_health(mut self) -> Self {
224        self.config.http_health = Some(HttpHealthConfig::default());
225        self
226    }
227
228    /// Configure HTTP health server
229    pub fn http_health_config(mut self, config: HttpHealthConfig) -> Self {
230        self.config.http_health = Some(config);
231        self
232    }
233
234    /// Set the bind address for HTTP health server
235    pub fn health_bind(mut self, addr: impl Into<SocketAddr>) -> Self {
236        if let Some(http_config) = &mut self.config.http_health {
237            http_config.bind_address = addr.into();
238        } else {
239            self.config.http_health = Some(HttpHealthConfig::new(addr.into()));
240        }
241        self
242    }
243
244    pub async fn start(self) -> Result<()> {
245        let (view_index, materialized_registry) =
246            Self::build_view_index_and_registry(self.views, self.materialized_views, &self.spec);
247
248        #[cfg(feature = "otel")]
249        let mut runtime = Runtime::new(self.config, view_index, self.metrics);
250        #[cfg(not(feature = "otel"))]
251        let mut runtime = Runtime::new(self.config, view_index);
252
253        if let Some(registry) = materialized_registry {
254            runtime = runtime.with_materialized_views(registry);
255        }
256
257        if let Some(spec) = self.spec {
258            runtime = runtime.with_spec(spec);
259        }
260
261        runtime.run().await
262    }
263
264    fn build_view_index_and_registry(
265        views: Option<ViewIndex>,
266        materialized_views: Option<MaterializedViewRegistry>,
267        spec: &Option<Spec>,
268    ) -> (ViewIndex, Option<MaterializedViewRegistry>) {
269        let mut index = views.unwrap_or_default();
270        let mut registry = materialized_views;
271
272        if let Some(ref spec) = spec {
273            for entity_name in spec.bytecode.entities.keys() {
274                index.add_spec(ViewSpec {
275                    id: format!("{}/list", entity_name),
276                    export: entity_name.clone(),
277                    mode: Mode::List,
278                    projection: Projection::all(),
279                    filters: Filters::all(),
280                    delivery: Delivery::default(),
281                    pipeline: None,
282                    source_view: None,
283                });
284
285                index.add_spec(ViewSpec {
286                    id: format!("{}/state", entity_name),
287                    export: entity_name.clone(),
288                    mode: Mode::State,
289                    projection: Projection::all(),
290                    filters: Filters::all(),
291                    delivery: Delivery::default(),
292                    pipeline: None,
293                    source_view: None,
294                });
295
296                index.add_spec(ViewSpec {
297                    id: format!("{}/append", entity_name),
298                    export: entity_name.clone(),
299                    mode: Mode::Append,
300                    projection: Projection::all(),
301                    filters: Filters::all(),
302                    delivery: Delivery::default(),
303                    pipeline: None,
304                    source_view: None,
305                });
306            }
307
308            if !spec.views.is_empty() {
309                let reg = registry.get_or_insert_with(MaterializedViewRegistry::new);
310
311                for view_def in &spec.views {
312                    let export = match &view_def.source {
313                        hyperstack_interpreter::ast::ViewSource::Entity { name } => name.clone(),
314                        hyperstack_interpreter::ast::ViewSource::View { id } => {
315                            id.split('/').next().unwrap_or(id).to_string()
316                        }
317                    };
318
319                    let view_spec = ViewSpec::from_view_def(view_def, &export);
320                    let pipeline = view_spec.pipeline.clone().unwrap_or_default();
321                    let source_id = view_spec.source_view.clone().unwrap_or_default();
322                    tracing::debug!(
323                        view_id = %view_def.id,
324                        source = %source_id,
325                        "Registering derived view"
326                    );
327
328                    index.add_spec(view_spec);
329
330                    let materialized =
331                        MaterializedView::new(view_def.id.clone(), source_id, pipeline);
332                    reg.register(materialized);
333                }
334            }
335        }
336
337        (index, registry)
338    }
339
340    pub fn build(self) -> Result<Runtime> {
341        let (view_index, materialized_registry) =
342            Self::build_view_index_and_registry(self.views, self.materialized_views, &self.spec);
343
344        #[cfg(feature = "otel")]
345        let mut runtime = Runtime::new(self.config, view_index, self.metrics);
346        #[cfg(not(feature = "otel"))]
347        let mut runtime = Runtime::new(self.config, view_index);
348
349        if let Some(registry) = materialized_registry {
350            runtime = runtime.with_materialized_views(registry);
351        }
352
353        if let Some(spec) = self.spec {
354            runtime = runtime.with_spec(spec);
355        }
356        Ok(runtime)
357    }
358}
359
#[cfg(test)]
mod tests {
    use super::*;

    /// `websocket()` followed by `bind()` must leave a WebSocket config whose
    /// bind address is the one passed to `bind()`.
    #[test]
    fn test_builder_pattern() {
        let addr: SocketAddr = "[::]:8877".parse().unwrap();
        let builder = Server::builder().websocket().bind(addr);

        let ws_config = builder
            .config
            .websocket
            .as_ref()
            .expect("websocket config should be set after websocket()/bind()");
        assert_eq!(ws_config.bind_address, addr);
    }

    /// `Spec::new` stores the program id and starts with no parser setup
    /// callback and no views.
    #[test]
    fn test_spec_creation() {
        let bytecode = hyperstack_interpreter::compiler::MultiEntityBytecode::new().build();
        let spec = Spec::new(bytecode, "test_program");
        assert_eq!(spec.program_id, "test_program");
        assert!(spec.parser_setup.is_none());
        assert!(spec.views.is_empty());
    }
}