1pub mod bus;
34pub mod cache;
35pub mod compression;
36pub mod config;
37pub mod health;
38pub mod http_health;
39pub mod materialized_view;
40#[cfg(feature = "otel")]
41pub mod metrics;
42pub mod mutation_batch;
43pub mod projector;
44pub mod runtime;
45pub mod sorted_cache;
46pub mod telemetry;
47pub mod view;
48pub mod websocket;
49
50pub use bus::{BusManager, BusMessage};
51pub use cache::{EntityCache, EntityCacheConfig};
52pub use config::{
53 HealthConfig, HttpHealthConfig, ReconnectionConfig, ServerConfig, WebSocketConfig,
54 YellowstoneConfig,
55};
56pub use health::{HealthMonitor, SlotTracker, StreamStatus};
57pub use http_health::HttpHealthServer;
58pub use hyperstack_auth::{AsyncVerifier, KeyLoader, Limits, TokenVerifier, VerifyingKey};
59pub use materialized_view::{MaterializedView, MaterializedViewRegistry, ViewEffect};
60#[cfg(feature = "otel")]
61pub use metrics::Metrics;
62pub use mutation_batch::{EventContext, MutationBatch, SlotContext};
63pub use projector::Projector;
64pub use runtime::Runtime;
65pub use telemetry::{init as init_telemetry, TelemetryConfig};
66#[cfg(feature = "otel")]
67pub use telemetry::{init_with_otel, TelemetryGuard};
68pub use view::{Delivery, Filters, Projection, ViewIndex, ViewSpec};
69pub use websocket::{
70 AllowAllAuthPlugin, AuthContext, AuthDecision, AuthDeny, AuthErrorDetails, ChannelUsageEmitter,
71 ClientInfo, ClientManager, ConnectionAuthRequest, ErrorResponse, Frame, HttpUsageEmitter, Mode,
72 RateLimitConfig, RateLimitResult, RateLimiterConfig, RefreshAuthRequest, RefreshAuthResponse,
73 RetryPolicy, SignedSessionAuthPlugin, SocketIssueMessage, StaticTokenAuthPlugin, Subscription,
74 WebSocketAuthPlugin, WebSocketRateLimiter, WebSocketServer, WebSocketUsageBatch,
75 WebSocketUsageEmitter, WebSocketUsageEnvelope, WebSocketUsageEvent,
76};
77
78use anyhow::Result;
79use hyperstack_interpreter::ast::ViewDef;
80use std::net::SocketAddr;
81use std::sync::Arc;
82
83pub type ParserSetupFn = Arc<
85 dyn Fn(
86 tokio::sync::mpsc::Sender<MutationBatch>,
87 Option<HealthMonitor>,
88 ReconnectionConfig,
89 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send>>
90 + Send
91 + Sync,
92>;
93
94pub struct Spec {
97 pub bytecode: hyperstack_interpreter::compiler::MultiEntityBytecode,
98 pub program_ids: Vec<String>,
99 pub parser_setup: Option<ParserSetupFn>,
100 pub views: Vec<ViewDef>,
101}
102
103impl Spec {
104 pub fn new(
105 bytecode: hyperstack_interpreter::compiler::MultiEntityBytecode,
106 program_id: impl Into<String>,
107 ) -> Self {
108 Self {
109 bytecode,
110 program_ids: vec![program_id.into()],
111 parser_setup: None,
112 views: Vec::new(),
113 }
114 }
115
116 pub fn with_parser_setup(mut self, setup_fn: ParserSetupFn) -> Self {
117 self.parser_setup = Some(setup_fn);
118 self
119 }
120
121 pub fn with_views(mut self, views: Vec<ViewDef>) -> Self {
122 self.views = views;
123 self
124 }
125}
126
127pub struct Server;
129
130impl Server {
131 pub fn builder() -> ServerBuilder {
133 ServerBuilder::new()
134 }
135}
136
137pub struct ServerBuilder {
139 spec: Option<Spec>,
140 views: Option<ViewIndex>,
141 materialized_views: Option<MaterializedViewRegistry>,
142 config: ServerConfig,
143 websocket_auth_plugin: Option<Arc<dyn WebSocketAuthPlugin>>,
144 websocket_usage_emitter: Option<Arc<dyn WebSocketUsageEmitter>>,
145 websocket_max_clients: Option<usize>,
146 websocket_rate_limit_config: Option<crate::websocket::client_manager::RateLimitConfig>,
147 #[cfg(feature = "otel")]
148 metrics: Option<Arc<Metrics>>,
149}
150
151impl ServerBuilder {
152 fn new() -> Self {
153 Self {
154 spec: None,
155 views: None,
156 materialized_views: None,
157 config: ServerConfig::new(),
158 websocket_auth_plugin: None,
159 websocket_usage_emitter: None,
160 websocket_max_clients: None,
161 websocket_rate_limit_config: None,
162 #[cfg(feature = "otel")]
163 metrics: None,
164 }
165 }
166
167 pub fn spec(mut self, spec: Spec) -> Self {
169 self.spec = Some(spec);
170 self
171 }
172
173 pub fn views(mut self, views: ViewIndex) -> Self {
175 self.views = Some(views);
176 self
177 }
178
179 #[cfg(feature = "otel")]
181 pub fn metrics(mut self, metrics: Metrics) -> Self {
182 self.metrics = Some(Arc::new(metrics));
183 self
184 }
185
186 pub fn websocket(mut self) -> Self {
188 self.config.websocket = Some(WebSocketConfig::default());
189 self
190 }
191
192 pub fn websocket_config(mut self, config: WebSocketConfig) -> Self {
194 self.config.websocket = Some(config);
195 self
196 }
197
198 pub fn websocket_auth_plugin(mut self, plugin: Arc<dyn WebSocketAuthPlugin>) -> Self {
200 self.websocket_auth_plugin = Some(plugin);
201 self
202 }
203
204 pub fn websocket_usage_emitter(mut self, emitter: Arc<dyn WebSocketUsageEmitter>) -> Self {
206 self.websocket_usage_emitter = Some(emitter);
207 self
208 }
209
210 pub fn websocket_max_clients(mut self, max_clients: usize) -> Self {
212 self.websocket_max_clients = Some(max_clients);
213 self
214 }
215
216 pub fn websocket_rate_limit_config(
222 mut self,
223 config: crate::websocket::client_manager::RateLimitConfig,
224 ) -> Self {
225 self.websocket_rate_limit_config = Some(config);
226 self
227 }
228
229 pub fn bind(mut self, addr: impl Into<SocketAddr>) -> Self {
231 if let Some(ws_config) = &mut self.config.websocket {
232 ws_config.bind_address = addr.into();
233 } else {
234 self.config.websocket = Some(WebSocketConfig::new(addr.into()));
235 }
236 self
237 }
238
239 pub fn yellowstone(mut self, config: YellowstoneConfig) -> Self {
241 self.config.yellowstone = Some(config);
242 self
243 }
244
245 pub fn health_monitoring(mut self) -> Self {
247 self.config.health = Some(HealthConfig::default());
248 self
249 }
250
251 pub fn health_config(mut self, config: HealthConfig) -> Self {
253 self.config.health = Some(config);
254 self
255 }
256
257 pub fn reconnection(mut self) -> Self {
259 self.config.reconnection = Some(ReconnectionConfig::default());
260 self
261 }
262
263 pub fn reconnection_config(mut self, config: ReconnectionConfig) -> Self {
265 self.config.reconnection = Some(config);
266 self
267 }
268
269 pub fn http_health(mut self) -> Self {
271 self.config.http_health = Some(HttpHealthConfig::default());
272 self
273 }
274
275 pub fn http_health_config(mut self, config: HttpHealthConfig) -> Self {
277 self.config.http_health = Some(config);
278 self
279 }
280
281 pub fn health_bind(mut self, addr: impl Into<SocketAddr>) -> Self {
283 if let Some(http_config) = &mut self.config.http_health {
284 http_config.bind_address = addr.into();
285 } else {
286 self.config.http_health = Some(HttpHealthConfig::new(addr.into()));
287 }
288 self
289 }
290
291 pub async fn start(self) -> Result<()> {
292 let (view_index, materialized_registry) =
293 Self::build_view_index_and_registry(self.views, self.materialized_views, &self.spec);
294
295 #[cfg(feature = "otel")]
296 let mut runtime = Runtime::new(self.config, view_index, self.metrics);
297 #[cfg(not(feature = "otel"))]
298 let mut runtime = Runtime::new(self.config, view_index);
299
300 if let Some(plugin) = self.websocket_auth_plugin {
301 runtime = runtime.with_websocket_auth_plugin(plugin);
302 }
303
304 if let Some(emitter) = self.websocket_usage_emitter {
305 runtime = runtime.with_websocket_usage_emitter(emitter);
306 }
307
308 if let Some(max_clients) = self.websocket_max_clients {
309 runtime = runtime.with_websocket_max_clients(max_clients);
310 }
311
312 if let Some(rate_limit_config) = self.websocket_rate_limit_config {
313 runtime = runtime.with_websocket_rate_limit_config(rate_limit_config);
314 }
315
316 if let Some(registry) = materialized_registry {
317 runtime = runtime.with_materialized_views(registry);
318 }
319
320 if let Some(spec) = self.spec {
321 runtime = runtime.with_spec(spec);
322 }
323
324 runtime.run().await
325 }
326
327 fn build_view_index_and_registry(
328 views: Option<ViewIndex>,
329 materialized_views: Option<MaterializedViewRegistry>,
330 spec: &Option<Spec>,
331 ) -> (ViewIndex, Option<MaterializedViewRegistry>) {
332 let mut index = views.unwrap_or_default();
333 let mut registry = materialized_views;
334
335 if let Some(ref spec) = spec {
336 for entity_name in spec.bytecode.entities.keys() {
337 index.add_spec(ViewSpec {
338 id: format!("{}/list", entity_name),
339 export: entity_name.clone(),
340 mode: Mode::List,
341 projection: Projection::all(),
342 filters: Filters::all(),
343 delivery: Delivery::default(),
344 pipeline: None,
345 source_view: None,
346 });
347
348 index.add_spec(ViewSpec {
349 id: format!("{}/state", entity_name),
350 export: entity_name.clone(),
351 mode: Mode::State,
352 projection: Projection::all(),
353 filters: Filters::all(),
354 delivery: Delivery::default(),
355 pipeline: None,
356 source_view: None,
357 });
358
359 index.add_spec(ViewSpec {
360 id: format!("{}/append", entity_name),
361 export: entity_name.clone(),
362 mode: Mode::Append,
363 projection: Projection::all(),
364 filters: Filters::all(),
365 delivery: Delivery::default(),
366 pipeline: None,
367 source_view: None,
368 });
369 }
370
371 if !spec.views.is_empty() {
372 let reg = registry.get_or_insert_with(MaterializedViewRegistry::new);
373
374 for view_def in &spec.views {
375 let export = match &view_def.source {
376 hyperstack_interpreter::ast::ViewSource::Entity { name } => name.clone(),
377 hyperstack_interpreter::ast::ViewSource::View { id } => {
378 id.split('/').next().unwrap_or(id).to_string()
379 }
380 };
381
382 let view_spec = ViewSpec::from_view_def(view_def, &export);
383 let pipeline = view_spec.pipeline.clone().unwrap_or_default();
384 let source_id = view_spec.source_view.clone().unwrap_or_default();
385 tracing::debug!(
386 view_id = %view_def.id,
387 source = %source_id,
388 "Registering derived view"
389 );
390
391 index.add_spec(view_spec);
392
393 let materialized =
394 MaterializedView::new(view_def.id.clone(), source_id, pipeline);
395 reg.register(materialized);
396 }
397 }
398 }
399
400 (index, registry)
401 }
402
403 pub fn build(self) -> Result<Runtime> {
404 let (view_index, materialized_registry) =
405 Self::build_view_index_and_registry(self.views, self.materialized_views, &self.spec);
406
407 #[cfg(feature = "otel")]
408 let mut runtime = Runtime::new(self.config, view_index, self.metrics);
409 #[cfg(not(feature = "otel"))]
410 let mut runtime = Runtime::new(self.config, view_index);
411
412 if let Some(plugin) = self.websocket_auth_plugin {
413 runtime = runtime.with_websocket_auth_plugin(plugin);
414 }
415
416 if let Some(max_clients) = self.websocket_max_clients {
417 runtime = runtime.with_websocket_max_clients(max_clients);
418 }
419
420 if let Some(registry) = materialized_registry {
421 runtime = runtime.with_materialized_views(registry);
422 }
423
424 if let Some(spec) = self.spec {
425 runtime = runtime.with_spec(spec);
426 }
427 Ok(runtime)
428 }
429}
430
431#[cfg(test)]
432mod tests {
433 use super::*;
434
435 #[test]
436 fn test_builder_pattern() {
437 let _builder = Server::builder()
438 .websocket()
439 .bind("[::]:8877".parse::<SocketAddr>().unwrap());
440 }
441
442 #[test]
443 fn test_spec_creation() {
444 let bytecode = hyperstack_interpreter::compiler::MultiEntityBytecode::new().build();
445 let spec = Spec::new(bytecode, "test_program");
446 assert_eq!(
447 spec.program_ids.first().map(String::as_str),
448 Some("test_program")
449 );
450 }
451}