1pub mod bus;
34pub mod cache;
35pub mod compression;
36pub mod config;
37pub mod health;
38pub mod http_health;
39#[cfg(feature = "otel")]
40pub mod metrics;
41pub mod mutation_batch;
42pub mod projector;
43pub mod runtime;
44pub mod telemetry;
45pub mod view;
46pub mod websocket;
47
48pub use bus::{BusManager, BusMessage};
49pub use cache::{EntityCache, EntityCacheConfig};
50pub use config::{
51 HealthConfig, HttpHealthConfig, ReconnectionConfig, ServerConfig, WebSocketConfig,
52 YellowstoneConfig,
53};
54pub use health::{HealthMonitor, SlotTracker, StreamStatus};
55pub use http_health::HttpHealthServer;
56#[cfg(feature = "otel")]
57pub use metrics::Metrics;
58pub use mutation_batch::MutationBatch;
59pub use projector::Projector;
60pub use runtime::Runtime;
61pub use telemetry::{init as init_telemetry, TelemetryConfig};
62#[cfg(feature = "otel")]
63pub use telemetry::{init_with_otel, TelemetryGuard};
64pub use view::{Delivery, Filters, Projection, ViewIndex, ViewSpec};
65pub use websocket::{ClientInfo, ClientManager, Frame, Mode, Subscription, WebSocketServer};
66
67use anyhow::Result;
68use std::net::SocketAddr;
69use std::sync::Arc;
70
71pub type ParserSetupFn = Arc<
73 dyn Fn(
74 tokio::sync::mpsc::Sender<MutationBatch>,
75 Option<HealthMonitor>,
76 ReconnectionConfig,
77 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send>>
78 + Send
79 + Sync,
80>;
81
82pub struct Spec {
85 pub bytecode: hyperstack_interpreter::compiler::MultiEntityBytecode,
86 pub program_id: String,
87 pub parser_setup: Option<ParserSetupFn>,
88}
89
90impl Spec {
91 pub fn new(
92 bytecode: hyperstack_interpreter::compiler::MultiEntityBytecode,
93 program_id: impl Into<String>,
94 ) -> Self {
95 Self {
96 bytecode,
97 program_id: program_id.into(),
98 parser_setup: None,
99 }
100 }
101
102 pub fn with_parser_setup(mut self, setup_fn: ParserSetupFn) -> Self {
104 self.parser_setup = Some(setup_fn);
105 self
106 }
107}
108
109pub struct Server;
111
112impl Server {
113 pub fn builder() -> ServerBuilder {
115 ServerBuilder::new()
116 }
117}
118
119pub struct ServerBuilder {
121 spec: Option<Spec>,
122 views: Option<ViewIndex>,
123 config: ServerConfig,
124 #[cfg(feature = "otel")]
125 metrics: Option<Arc<Metrics>>,
126}
127
128impl ServerBuilder {
129 fn new() -> Self {
130 Self {
131 spec: None,
132 views: None,
133 config: ServerConfig::new(),
134 #[cfg(feature = "otel")]
135 metrics: None,
136 }
137 }
138
139 pub fn spec(mut self, spec: Spec) -> Self {
141 self.spec = Some(spec);
142 self
143 }
144
145 pub fn views(mut self, views: ViewIndex) -> Self {
147 self.views = Some(views);
148 self
149 }
150
151 #[cfg(feature = "otel")]
153 pub fn metrics(mut self, metrics: Metrics) -> Self {
154 self.metrics = Some(Arc::new(metrics));
155 self
156 }
157
158 pub fn websocket(mut self) -> Self {
160 self.config.websocket = Some(WebSocketConfig::default());
161 self
162 }
163
164 pub fn websocket_config(mut self, config: WebSocketConfig) -> Self {
166 self.config.websocket = Some(config);
167 self
168 }
169
170 pub fn bind(mut self, addr: impl Into<SocketAddr>) -> Self {
172 if let Some(ws_config) = &mut self.config.websocket {
173 ws_config.bind_address = addr.into();
174 } else {
175 self.config.websocket = Some(WebSocketConfig::new(addr.into()));
176 }
177 self
178 }
179
180 pub fn yellowstone(mut self, config: YellowstoneConfig) -> Self {
182 self.config.yellowstone = Some(config);
183 self
184 }
185
186 pub fn health_monitoring(mut self) -> Self {
188 self.config.health = Some(HealthConfig::default());
189 self
190 }
191
192 pub fn health_config(mut self, config: HealthConfig) -> Self {
194 self.config.health = Some(config);
195 self
196 }
197
198 pub fn reconnection(mut self) -> Self {
200 self.config.reconnection = Some(ReconnectionConfig::default());
201 self
202 }
203
204 pub fn reconnection_config(mut self, config: ReconnectionConfig) -> Self {
206 self.config.reconnection = Some(config);
207 self
208 }
209
210 pub fn http_health(mut self) -> Self {
212 self.config.http_health = Some(HttpHealthConfig::default());
213 self
214 }
215
216 pub fn http_health_config(mut self, config: HttpHealthConfig) -> Self {
218 self.config.http_health = Some(config);
219 self
220 }
221
222 pub fn health_bind(mut self, addr: impl Into<SocketAddr>) -> Self {
224 if let Some(http_config) = &mut self.config.http_health {
225 http_config.bind_address = addr.into();
226 } else {
227 self.config.http_health = Some(HttpHealthConfig::new(addr.into()));
228 }
229 self
230 }
231
232 pub async fn start(self) -> Result<()> {
234 let view_index = self.views.unwrap_or_else(|| {
236 let mut index = ViewIndex::new();
237
238 if let Some(ref spec) = self.spec {
239 for entity_name in spec.bytecode.entities.keys() {
240 index.add_spec(ViewSpec {
241 id: format!("{}/list", entity_name),
242 export: entity_name.clone(),
243 mode: Mode::List,
244 projection: Projection::all(),
245 filters: Filters::all(),
246 delivery: Delivery::default(),
247 });
248
249 index.add_spec(ViewSpec {
250 id: format!("{}/state", entity_name),
251 export: entity_name.clone(),
252 mode: Mode::State,
253 projection: Projection::all(),
254 filters: Filters::all(),
255 delivery: Delivery::default(),
256 });
257
258 index.add_spec(ViewSpec {
259 id: format!("{}/append", entity_name),
260 export: entity_name.clone(),
261 mode: Mode::Append,
262 projection: Projection::all(),
263 filters: Filters::all(),
264 delivery: Delivery::default(),
265 });
266 }
267 }
268
269 index
270 });
271
272 #[cfg(feature = "otel")]
274 let mut runtime = Runtime::new(self.config, view_index, self.metrics);
275 #[cfg(not(feature = "otel"))]
276 let mut runtime = Runtime::new(self.config, view_index);
277
278 if let Some(spec) = self.spec {
280 runtime = runtime.with_spec(spec);
281 }
282
283 runtime.run().await
285 }
286
287 pub fn build(self) -> Result<Runtime> {
289 let view_index = self.views.unwrap_or_else(|| {
291 let mut index = ViewIndex::new();
292
293 if let Some(ref spec) = self.spec {
294 for entity_name in spec.bytecode.entities.keys() {
295 index.add_spec(ViewSpec {
296 id: format!("{}/list", entity_name),
297 export: entity_name.clone(),
298 mode: Mode::List,
299 projection: Projection::all(),
300 filters: Filters::all(),
301 delivery: Delivery::default(),
302 });
303
304 index.add_spec(ViewSpec {
305 id: format!("{}/state", entity_name),
306 export: entity_name.clone(),
307 mode: Mode::State,
308 projection: Projection::all(),
309 filters: Filters::all(),
310 delivery: Delivery::default(),
311 });
312
313 index.add_spec(ViewSpec {
314 id: format!("{}/append", entity_name),
315 export: entity_name.clone(),
316 mode: Mode::Append,
317 projection: Projection::all(),
318 filters: Filters::all(),
319 delivery: Delivery::default(),
320 });
321 }
322 }
323
324 index
325 });
326
327 #[cfg(feature = "otel")]
328 let mut runtime = Runtime::new(self.config, view_index, self.metrics);
329 #[cfg(not(feature = "otel"))]
330 let mut runtime = Runtime::new(self.config, view_index);
331
332 if let Some(spec) = self.spec {
333 runtime = runtime.with_spec(spec);
334 }
335 Ok(runtime)
336 }
337}
338
339#[cfg(test)]
340mod tests {
341 use super::*;
342
343 #[test]
344 fn test_builder_pattern() {
345 let _builder = Server::builder()
346 .websocket()
347 .bind("[::]:8877".parse::<SocketAddr>().unwrap());
348 }
349
350 #[test]
351 fn test_spec_creation() {
352 let bytecode = hyperstack_interpreter::compiler::MultiEntityBytecode::new().build();
353 let spec = Spec::new(bytecode, "test_program");
354 assert_eq!(spec.program_id, "test_program");
355 }
356}