Skip to main content

hyperstack_server/
lib.rs

1//! # hyperstack-server
2//!
3//! WebSocket server and projection handlers for HyperStack streaming pipelines.
4//!
5//! This crate provides a builder API for creating HyperStack servers that:
6//!
7//! - Process Solana blockchain data via Yellowstone gRPC
8//! - Transform data using the HyperStack VM
9//! - Stream entity updates over WebSockets to connected clients
10//! - Support multiple streaming modes (State, List, Append)
11//!
12//! ## Quick Start
13//!
14//! ```rust,ignore
15//! use hyperstack_server::{Server, Spec};
16//!
17//! #[tokio::main]
18//! async fn main() -> anyhow::Result<()> {
19//!     Server::builder()
20//!         .spec(my_spec())
21//!         .websocket()
22//!         .bind("[::]:8877".parse()?)
23//!         .health_monitoring()
24//!         .start()
25//!         .await
26//! }
27//! ```
28//!
29//! ## Feature Flags
30//!
31//! - `otel` - OpenTelemetry integration for metrics and distributed tracing
32
33pub mod bus;
34pub mod cache;
35pub mod compression;
36pub mod config;
37pub mod health;
38pub mod http_health;
39pub mod materialized_view;
40#[cfg(feature = "otel")]
41pub mod metrics;
42pub mod mutation_batch;
43pub mod projector;
44pub mod runtime;
45pub mod sorted_cache;
46pub mod telemetry;
47pub mod view;
48pub mod websocket;
49
50pub use bus::{BusManager, BusMessage};
51pub use cache::{EntityCache, EntityCacheConfig};
52pub use config::{
53    HealthConfig, HttpHealthConfig, ReconnectionConfig, ServerConfig, WebSocketConfig,
54    YellowstoneConfig,
55};
56pub use health::{HealthMonitor, SlotTracker, StreamStatus};
57pub use http_health::HttpHealthServer;
58pub use hyperstack_auth::{AsyncVerifier, KeyLoader, Limits, TokenVerifier, VerifyingKey};
59pub use materialized_view::{MaterializedView, MaterializedViewRegistry, ViewEffect};
60#[cfg(feature = "otel")]
61pub use metrics::Metrics;
62pub use mutation_batch::{EventContext, MutationBatch, SlotContext};
63pub use projector::Projector;
64pub use runtime::Runtime;
65pub use telemetry::{init as init_telemetry, TelemetryConfig};
66#[cfg(feature = "otel")]
67pub use telemetry::{init_with_otel, TelemetryGuard};
68pub use view::{Delivery, Filters, Projection, ViewIndex, ViewSpec};
69pub use websocket::{
70    AllowAllAuthPlugin, AuthContext, AuthDecision, AuthDeny, AuthErrorDetails, ChannelUsageEmitter,
71    ClientInfo, ClientManager, ConnectionAuthRequest, ErrorResponse, Frame, HttpUsageEmitter, Mode,
72    RateLimitConfig, RateLimitResult, RateLimiterConfig, RefreshAuthRequest, RefreshAuthResponse,
73    RetryPolicy, SignedSessionAuthPlugin, SocketIssueMessage, StaticTokenAuthPlugin, Subscription,
74    WebSocketAuthPlugin, WebSocketRateLimiter, WebSocketServer, WebSocketUsageBatch,
75    WebSocketUsageEmitter, WebSocketUsageEnvelope, WebSocketUsageEvent,
76};
77
78use anyhow::Result;
79use hyperstack_interpreter::ast::ViewDef;
80use std::net::SocketAddr;
81use std::sync::Arc;
82
83/// Type alias for a parser setup function.
84pub type ParserSetupFn = Arc<
85    dyn Fn(
86            tokio::sync::mpsc::Sender<MutationBatch>,
87            Option<HealthMonitor>,
88            ReconnectionConfig,
89        ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send>>
90        + Send
91        + Sync,
92>;
93
94/// Specification for a HyperStack server
95/// Contains bytecode, parsers, and program information
96pub struct Spec {
97    pub bytecode: hyperstack_interpreter::compiler::MultiEntityBytecode,
98    pub program_ids: Vec<String>,
99    pub parser_setup: Option<ParserSetupFn>,
100    pub views: Vec<ViewDef>,
101}
102
103impl Spec {
104    pub fn new(
105        bytecode: hyperstack_interpreter::compiler::MultiEntityBytecode,
106        program_id: impl Into<String>,
107    ) -> Self {
108        Self {
109            bytecode,
110            program_ids: vec![program_id.into()],
111            parser_setup: None,
112            views: Vec::new(),
113        }
114    }
115
116    pub fn with_parser_setup(mut self, setup_fn: ParserSetupFn) -> Self {
117        self.parser_setup = Some(setup_fn);
118        self
119    }
120
121    pub fn with_views(mut self, views: Vec<ViewDef>) -> Self {
122        self.views = views;
123        self
124    }
125}
126
127/// Main server interface with fluent builder API
128pub struct Server;
129
130impl Server {
131    /// Create a new server builder
132    pub fn builder() -> ServerBuilder {
133        ServerBuilder::new()
134    }
135}
136
137/// Builder for configuring and creating a HyperStack server
138pub struct ServerBuilder {
139    spec: Option<Spec>,
140    views: Option<ViewIndex>,
141    materialized_views: Option<MaterializedViewRegistry>,
142    config: ServerConfig,
143    websocket_auth_plugin: Option<Arc<dyn WebSocketAuthPlugin>>,
144    websocket_usage_emitter: Option<Arc<dyn WebSocketUsageEmitter>>,
145    websocket_max_clients: Option<usize>,
146    websocket_rate_limit_config: Option<crate::websocket::client_manager::RateLimitConfig>,
147    #[cfg(feature = "otel")]
148    metrics: Option<Arc<Metrics>>,
149}
150
151impl ServerBuilder {
152    fn new() -> Self {
153        Self {
154            spec: None,
155            views: None,
156            materialized_views: None,
157            config: ServerConfig::new(),
158            websocket_auth_plugin: None,
159            websocket_usage_emitter: None,
160            websocket_max_clients: None,
161            websocket_rate_limit_config: None,
162            #[cfg(feature = "otel")]
163            metrics: None,
164        }
165    }
166
167    /// Set the specification (bytecode, parsers, program_ids)
168    pub fn spec(mut self, spec: Spec) -> Self {
169        self.spec = Some(spec);
170        self
171    }
172
173    /// Set custom view index
174    pub fn views(mut self, views: ViewIndex) -> Self {
175        self.views = Some(views);
176        self
177    }
178
179    /// Enable metrics collection (requires 'otel' feature)
180    #[cfg(feature = "otel")]
181    pub fn metrics(mut self, metrics: Metrics) -> Self {
182        self.metrics = Some(Arc::new(metrics));
183        self
184    }
185
186    /// Enable WebSocket server with default configuration
187    pub fn websocket(mut self) -> Self {
188        self.config.websocket = Some(WebSocketConfig::default());
189        self
190    }
191
192    /// Configure WebSocket server
193    pub fn websocket_config(mut self, config: WebSocketConfig) -> Self {
194        self.config.websocket = Some(config);
195        self
196    }
197
198    /// Set a WebSocket auth plugin used to authorize inbound connections.
199    pub fn websocket_auth_plugin(mut self, plugin: Arc<dyn WebSocketAuthPlugin>) -> Self {
200        self.websocket_auth_plugin = Some(plugin);
201        self
202    }
203
204    /// Set an async usage emitter for billing-grade websocket usage events.
205    pub fn websocket_usage_emitter(mut self, emitter: Arc<dyn WebSocketUsageEmitter>) -> Self {
206        self.websocket_usage_emitter = Some(emitter);
207        self
208    }
209
210    /// Set the maximum number of concurrent WebSocket clients.
211    pub fn websocket_max_clients(mut self, max_clients: usize) -> Self {
212        self.websocket_max_clients = Some(max_clients);
213        self
214    }
215
216    /// Configure rate limiting for WebSocket connections.
217    ///
218    /// This sets global rate limits such as maximum connections per IP,
219    /// timeouts, and rate windows. Per-subject limits are controlled
220    /// via AuthContext.Limits from the authentication token.
221    pub fn websocket_rate_limit_config(
222        mut self,
223        config: crate::websocket::client_manager::RateLimitConfig,
224    ) -> Self {
225        self.websocket_rate_limit_config = Some(config);
226        self
227    }
228
229    /// Set the bind address for WebSocket server
230    pub fn bind(mut self, addr: impl Into<SocketAddr>) -> Self {
231        if let Some(ws_config) = &mut self.config.websocket {
232            ws_config.bind_address = addr.into();
233        } else {
234            self.config.websocket = Some(WebSocketConfig::new(addr.into()));
235        }
236        self
237    }
238
239    /// Configure Yellowstone gRPC connection
240    pub fn yellowstone(mut self, config: YellowstoneConfig) -> Self {
241        self.config.yellowstone = Some(config);
242        self
243    }
244
245    /// Enable health monitoring with default configuration
246    pub fn health_monitoring(mut self) -> Self {
247        self.config.health = Some(HealthConfig::default());
248        self
249    }
250
251    /// Configure health monitoring
252    pub fn health_config(mut self, config: HealthConfig) -> Self {
253        self.config.health = Some(config);
254        self
255    }
256
257    /// Enable reconnection with default configuration
258    pub fn reconnection(mut self) -> Self {
259        self.config.reconnection = Some(ReconnectionConfig::default());
260        self
261    }
262
263    /// Configure reconnection behavior
264    pub fn reconnection_config(mut self, config: ReconnectionConfig) -> Self {
265        self.config.reconnection = Some(config);
266        self
267    }
268
269    /// Enable HTTP health server with default configuration (port 8081)
270    pub fn http_health(mut self) -> Self {
271        self.config.http_health = Some(HttpHealthConfig::default());
272        self
273    }
274
275    /// Configure HTTP health server
276    pub fn http_health_config(mut self, config: HttpHealthConfig) -> Self {
277        self.config.http_health = Some(config);
278        self
279    }
280
281    /// Set the bind address for HTTP health server
282    pub fn health_bind(mut self, addr: impl Into<SocketAddr>) -> Self {
283        if let Some(http_config) = &mut self.config.http_health {
284            http_config.bind_address = addr.into();
285        } else {
286            self.config.http_health = Some(HttpHealthConfig::new(addr.into()));
287        }
288        self
289    }
290
291    pub async fn start(self) -> Result<()> {
292        let (view_index, materialized_registry) =
293            Self::build_view_index_and_registry(self.views, self.materialized_views, &self.spec);
294
295        #[cfg(feature = "otel")]
296        let mut runtime = Runtime::new(self.config, view_index, self.metrics);
297        #[cfg(not(feature = "otel"))]
298        let mut runtime = Runtime::new(self.config, view_index);
299
300        if let Some(plugin) = self.websocket_auth_plugin {
301            runtime = runtime.with_websocket_auth_plugin(plugin);
302        }
303
304        if let Some(emitter) = self.websocket_usage_emitter {
305            runtime = runtime.with_websocket_usage_emitter(emitter);
306        }
307
308        if let Some(max_clients) = self.websocket_max_clients {
309            runtime = runtime.with_websocket_max_clients(max_clients);
310        }
311
312        if let Some(rate_limit_config) = self.websocket_rate_limit_config {
313            runtime = runtime.with_websocket_rate_limit_config(rate_limit_config);
314        }
315
316        if let Some(registry) = materialized_registry {
317            runtime = runtime.with_materialized_views(registry);
318        }
319
320        if let Some(spec) = self.spec {
321            runtime = runtime.with_spec(spec);
322        }
323
324        runtime.run().await
325    }
326
327    fn build_view_index_and_registry(
328        views: Option<ViewIndex>,
329        materialized_views: Option<MaterializedViewRegistry>,
330        spec: &Option<Spec>,
331    ) -> (ViewIndex, Option<MaterializedViewRegistry>) {
332        let mut index = views.unwrap_or_default();
333        let mut registry = materialized_views;
334
335        if let Some(ref spec) = spec {
336            for entity_name in spec.bytecode.entities.keys() {
337                index.add_spec(ViewSpec {
338                    id: format!("{}/list", entity_name),
339                    export: entity_name.clone(),
340                    mode: Mode::List,
341                    projection: Projection::all(),
342                    filters: Filters::all(),
343                    delivery: Delivery::default(),
344                    pipeline: None,
345                    source_view: None,
346                });
347
348                index.add_spec(ViewSpec {
349                    id: format!("{}/state", entity_name),
350                    export: entity_name.clone(),
351                    mode: Mode::State,
352                    projection: Projection::all(),
353                    filters: Filters::all(),
354                    delivery: Delivery::default(),
355                    pipeline: None,
356                    source_view: None,
357                });
358
359                index.add_spec(ViewSpec {
360                    id: format!("{}/append", entity_name),
361                    export: entity_name.clone(),
362                    mode: Mode::Append,
363                    projection: Projection::all(),
364                    filters: Filters::all(),
365                    delivery: Delivery::default(),
366                    pipeline: None,
367                    source_view: None,
368                });
369            }
370
371            if !spec.views.is_empty() {
372                let reg = registry.get_or_insert_with(MaterializedViewRegistry::new);
373
374                for view_def in &spec.views {
375                    let export = match &view_def.source {
376                        hyperstack_interpreter::ast::ViewSource::Entity { name } => name.clone(),
377                        hyperstack_interpreter::ast::ViewSource::View { id } => {
378                            id.split('/').next().unwrap_or(id).to_string()
379                        }
380                    };
381
382                    let view_spec = ViewSpec::from_view_def(view_def, &export);
383                    let pipeline = view_spec.pipeline.clone().unwrap_or_default();
384                    let source_id = view_spec.source_view.clone().unwrap_or_default();
385                    tracing::debug!(
386                        view_id = %view_def.id,
387                        source = %source_id,
388                        "Registering derived view"
389                    );
390
391                    index.add_spec(view_spec);
392
393                    let materialized =
394                        MaterializedView::new(view_def.id.clone(), source_id, pipeline);
395                    reg.register(materialized);
396                }
397            }
398        }
399
400        (index, registry)
401    }
402
403    pub fn build(self) -> Result<Runtime> {
404        let (view_index, materialized_registry) =
405            Self::build_view_index_and_registry(self.views, self.materialized_views, &self.spec);
406
407        #[cfg(feature = "otel")]
408        let mut runtime = Runtime::new(self.config, view_index, self.metrics);
409        #[cfg(not(feature = "otel"))]
410        let mut runtime = Runtime::new(self.config, view_index);
411
412        if let Some(plugin) = self.websocket_auth_plugin {
413            runtime = runtime.with_websocket_auth_plugin(plugin);
414        }
415
416        if let Some(max_clients) = self.websocket_max_clients {
417            runtime = runtime.with_websocket_max_clients(max_clients);
418        }
419
420        if let Some(registry) = materialized_registry {
421            runtime = runtime.with_materialized_views(registry);
422        }
423
424        if let Some(spec) = self.spec {
425            runtime = runtime.with_spec(spec);
426        }
427        Ok(runtime)
428    }
429}
430
431#[cfg(test)]
432mod tests {
433    use super::*;
434
435    #[test]
436    fn test_builder_pattern() {
437        let _builder = Server::builder()
438            .websocket()
439            .bind("[::]:8877".parse::<SocketAddr>().unwrap());
440    }
441
442    #[test]
443    fn test_spec_creation() {
444        let bytecode = hyperstack_interpreter::compiler::MultiEntityBytecode::new().build();
445        let spec = Spec::new(bytecode, "test_program");
446        assert_eq!(
447            spec.program_ids.first().map(String::as_str),
448            Some("test_program")
449        );
450    }
451}