1pub mod bus;
34pub mod cache;
35pub mod compression;
36pub mod config;
37pub mod health;
38pub mod http_health;
39pub mod materialized_view;
40#[cfg(feature = "otel")]
41pub mod metrics;
42pub mod mutation_batch;
43pub mod projector;
44pub mod runtime;
45pub mod sorted_cache;
46pub mod telemetry;
47pub mod view;
48pub mod websocket;
49
50pub use bus::{BusManager, BusMessage};
51pub use cache::{EntityCache, EntityCacheConfig};
52pub use config::{
53 HealthConfig, HttpHealthConfig, ReconnectionConfig, ServerConfig, WebSocketConfig,
54 YellowstoneConfig,
55};
56pub use health::{HealthMonitor, SlotTracker, StreamStatus};
57pub use http_health::HttpHealthServer;
58pub use materialized_view::{MaterializedView, MaterializedViewRegistry, ViewEffect};
59#[cfg(feature = "otel")]
60pub use metrics::Metrics;
61pub use mutation_batch::{MutationBatch, SlotContext};
62pub use projector::Projector;
63pub use runtime::Runtime;
64pub use telemetry::{init as init_telemetry, TelemetryConfig};
65#[cfg(feature = "otel")]
66pub use telemetry::{init_with_otel, TelemetryGuard};
67pub use view::{Delivery, Filters, Projection, ViewIndex, ViewSpec};
68pub use websocket::{ClientInfo, ClientManager, Frame, Mode, Subscription, WebSocketServer};
69
70use anyhow::Result;
71use hyperstack_interpreter::ast::ViewDef;
72use std::net::SocketAddr;
73use std::sync::Arc;
74
75pub type ParserSetupFn = Arc<
77 dyn Fn(
78 tokio::sync::mpsc::Sender<MutationBatch>,
79 Option<HealthMonitor>,
80 ReconnectionConfig,
81 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send>>
82 + Send
83 + Sync,
84>;
85
86pub struct Spec {
89 pub bytecode: hyperstack_interpreter::compiler::MultiEntityBytecode,
90 pub program_id: String,
91 pub parser_setup: Option<ParserSetupFn>,
92 pub views: Vec<ViewDef>,
93}
94
95impl Spec {
96 pub fn new(
97 bytecode: hyperstack_interpreter::compiler::MultiEntityBytecode,
98 program_id: impl Into<String>,
99 ) -> Self {
100 Self {
101 bytecode,
102 program_id: program_id.into(),
103 parser_setup: None,
104 views: Vec::new(),
105 }
106 }
107
108 pub fn with_parser_setup(mut self, setup_fn: ParserSetupFn) -> Self {
109 self.parser_setup = Some(setup_fn);
110 self
111 }
112
113 pub fn with_views(mut self, views: Vec<ViewDef>) -> Self {
114 self.views = views;
115 self
116 }
117}
118
119pub struct Server;
121
122impl Server {
123 pub fn builder() -> ServerBuilder {
125 ServerBuilder::new()
126 }
127}
128
129pub struct ServerBuilder {
131 spec: Option<Spec>,
132 views: Option<ViewIndex>,
133 materialized_views: Option<MaterializedViewRegistry>,
134 config: ServerConfig,
135 #[cfg(feature = "otel")]
136 metrics: Option<Arc<Metrics>>,
137}
138
139impl ServerBuilder {
140 fn new() -> Self {
141 Self {
142 spec: None,
143 views: None,
144 materialized_views: None,
145 config: ServerConfig::new(),
146 #[cfg(feature = "otel")]
147 metrics: None,
148 }
149 }
150
151 pub fn spec(mut self, spec: Spec) -> Self {
153 self.spec = Some(spec);
154 self
155 }
156
157 pub fn views(mut self, views: ViewIndex) -> Self {
159 self.views = Some(views);
160 self
161 }
162
163 #[cfg(feature = "otel")]
165 pub fn metrics(mut self, metrics: Metrics) -> Self {
166 self.metrics = Some(Arc::new(metrics));
167 self
168 }
169
170 pub fn websocket(mut self) -> Self {
172 self.config.websocket = Some(WebSocketConfig::default());
173 self
174 }
175
176 pub fn websocket_config(mut self, config: WebSocketConfig) -> Self {
178 self.config.websocket = Some(config);
179 self
180 }
181
182 pub fn bind(mut self, addr: impl Into<SocketAddr>) -> Self {
184 if let Some(ws_config) = &mut self.config.websocket {
185 ws_config.bind_address = addr.into();
186 } else {
187 self.config.websocket = Some(WebSocketConfig::new(addr.into()));
188 }
189 self
190 }
191
192 pub fn yellowstone(mut self, config: YellowstoneConfig) -> Self {
194 self.config.yellowstone = Some(config);
195 self
196 }
197
198 pub fn health_monitoring(mut self) -> Self {
200 self.config.health = Some(HealthConfig::default());
201 self
202 }
203
204 pub fn health_config(mut self, config: HealthConfig) -> Self {
206 self.config.health = Some(config);
207 self
208 }
209
210 pub fn reconnection(mut self) -> Self {
212 self.config.reconnection = Some(ReconnectionConfig::default());
213 self
214 }
215
216 pub fn reconnection_config(mut self, config: ReconnectionConfig) -> Self {
218 self.config.reconnection = Some(config);
219 self
220 }
221
222 pub fn http_health(mut self) -> Self {
224 self.config.http_health = Some(HttpHealthConfig::default());
225 self
226 }
227
228 pub fn http_health_config(mut self, config: HttpHealthConfig) -> Self {
230 self.config.http_health = Some(config);
231 self
232 }
233
234 pub fn health_bind(mut self, addr: impl Into<SocketAddr>) -> Self {
236 if let Some(http_config) = &mut self.config.http_health {
237 http_config.bind_address = addr.into();
238 } else {
239 self.config.http_health = Some(HttpHealthConfig::new(addr.into()));
240 }
241 self
242 }
243
244 pub async fn start(self) -> Result<()> {
245 let (view_index, materialized_registry) =
246 Self::build_view_index_and_registry(self.views, self.materialized_views, &self.spec);
247
248 #[cfg(feature = "otel")]
249 let mut runtime = Runtime::new(self.config, view_index, self.metrics);
250 #[cfg(not(feature = "otel"))]
251 let mut runtime = Runtime::new(self.config, view_index);
252
253 if let Some(registry) = materialized_registry {
254 runtime = runtime.with_materialized_views(registry);
255 }
256
257 if let Some(spec) = self.spec {
258 runtime = runtime.with_spec(spec);
259 }
260
261 runtime.run().await
262 }
263
264 fn build_view_index_and_registry(
265 views: Option<ViewIndex>,
266 materialized_views: Option<MaterializedViewRegistry>,
267 spec: &Option<Spec>,
268 ) -> (ViewIndex, Option<MaterializedViewRegistry>) {
269 let mut index = views.unwrap_or_default();
270 let mut registry = materialized_views;
271
272 if let Some(ref spec) = spec {
273 for entity_name in spec.bytecode.entities.keys() {
274 index.add_spec(ViewSpec {
275 id: format!("{}/list", entity_name),
276 export: entity_name.clone(),
277 mode: Mode::List,
278 projection: Projection::all(),
279 filters: Filters::all(),
280 delivery: Delivery::default(),
281 pipeline: None,
282 source_view: None,
283 });
284
285 index.add_spec(ViewSpec {
286 id: format!("{}/state", entity_name),
287 export: entity_name.clone(),
288 mode: Mode::State,
289 projection: Projection::all(),
290 filters: Filters::all(),
291 delivery: Delivery::default(),
292 pipeline: None,
293 source_view: None,
294 });
295
296 index.add_spec(ViewSpec {
297 id: format!("{}/append", entity_name),
298 export: entity_name.clone(),
299 mode: Mode::Append,
300 projection: Projection::all(),
301 filters: Filters::all(),
302 delivery: Delivery::default(),
303 pipeline: None,
304 source_view: None,
305 });
306 }
307
308 if !spec.views.is_empty() {
309 let reg = registry.get_or_insert_with(MaterializedViewRegistry::new);
310
311 for view_def in &spec.views {
312 let export = match &view_def.source {
313 hyperstack_interpreter::ast::ViewSource::Entity { name } => name.clone(),
314 hyperstack_interpreter::ast::ViewSource::View { id } => {
315 id.split('/').next().unwrap_or(id).to_string()
316 }
317 };
318
319 let view_spec = ViewSpec::from_view_def(view_def, &export);
320 let pipeline = view_spec.pipeline.clone().unwrap_or_default();
321 let source_id = view_spec.source_view.clone().unwrap_or_default();
322 tracing::debug!(
323 view_id = %view_def.id,
324 source = %source_id,
325 "Registering derived view"
326 );
327
328 index.add_spec(view_spec);
329
330 let materialized =
331 MaterializedView::new(view_def.id.clone(), source_id, pipeline);
332 reg.register(materialized);
333 }
334 }
335 }
336
337 (index, registry)
338 }
339
340 pub fn build(self) -> Result<Runtime> {
341 let (view_index, materialized_registry) =
342 Self::build_view_index_and_registry(self.views, self.materialized_views, &self.spec);
343
344 #[cfg(feature = "otel")]
345 let mut runtime = Runtime::new(self.config, view_index, self.metrics);
346 #[cfg(not(feature = "otel"))]
347 let mut runtime = Runtime::new(self.config, view_index);
348
349 if let Some(registry) = materialized_registry {
350 runtime = runtime.with_materialized_views(registry);
351 }
352
353 if let Some(spec) = self.spec {
354 runtime = runtime.with_spec(spec);
355 }
356 Ok(runtime)
357 }
358}
359
360#[cfg(test)]
361mod tests {
362 use super::*;
363
364 #[test]
365 fn test_builder_pattern() {
366 let _builder = Server::builder()
367 .websocket()
368 .bind("[::]:8877".parse::<SocketAddr>().unwrap());
369 }
370
371 #[test]
372 fn test_spec_creation() {
373 let bytecode = hyperstack_interpreter::compiler::MultiEntityBytecode::new().build();
374 let spec = Spec::new(bytecode, "test_program");
375 assert_eq!(spec.program_id, "test_program");
376 }
377}