1pub mod bus;
34pub mod cache;
35pub mod config;
36pub mod health;
37pub mod http_health;
38#[cfg(feature = "otel")]
39pub mod metrics;
40pub mod mutation_batch;
41pub mod projector;
42pub mod runtime;
43pub mod telemetry;
44pub mod view;
45pub mod websocket;
46
47pub use bus::{BusManager, BusMessage};
48pub use cache::{EntityCache, EntityCacheConfig};
49pub use config::{
50 HealthConfig, HttpHealthConfig, ReconnectionConfig, ServerConfig, WebSocketConfig,
51 YellowstoneConfig,
52};
53pub use health::{HealthMonitor, SlotTracker, StreamStatus};
54pub use http_health::HttpHealthServer;
55#[cfg(feature = "otel")]
56pub use metrics::Metrics;
57pub use mutation_batch::MutationBatch;
58pub use projector::Projector;
59pub use runtime::Runtime;
60pub use telemetry::{init as init_telemetry, TelemetryConfig};
61#[cfg(feature = "otel")]
62pub use telemetry::{init_with_otel, TelemetryGuard};
63pub use view::{Delivery, Filters, Projection, ViewIndex, ViewSpec};
64pub use websocket::{ClientInfo, ClientManager, Frame, Mode, Subscription, WebSocketServer};
65
66use anyhow::Result;
67use std::net::SocketAddr;
68use std::sync::Arc;
69
70pub type ParserSetupFn = Arc<
72 dyn Fn(
73 tokio::sync::mpsc::Sender<MutationBatch>,
74 Option<HealthMonitor>,
75 ReconnectionConfig,
76 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send>>
77 + Send
78 + Sync,
79>;
80
81pub struct Spec {
84 pub bytecode: hyperstack_interpreter::compiler::MultiEntityBytecode,
85 pub program_id: String,
86 pub parser_setup: Option<ParserSetupFn>,
87}
88
89impl Spec {
90 pub fn new(
91 bytecode: hyperstack_interpreter::compiler::MultiEntityBytecode,
92 program_id: impl Into<String>,
93 ) -> Self {
94 Self {
95 bytecode,
96 program_id: program_id.into(),
97 parser_setup: None,
98 }
99 }
100
101 pub fn with_parser_setup(mut self, setup_fn: ParserSetupFn) -> Self {
103 self.parser_setup = Some(setup_fn);
104 self
105 }
106}
107
108pub struct Server;
110
111impl Server {
112 pub fn builder() -> ServerBuilder {
114 ServerBuilder::new()
115 }
116}
117
118pub struct ServerBuilder {
120 spec: Option<Spec>,
121 views: Option<ViewIndex>,
122 config: ServerConfig,
123 #[cfg(feature = "otel")]
124 metrics: Option<Arc<Metrics>>,
125}
126
127impl ServerBuilder {
128 fn new() -> Self {
129 Self {
130 spec: None,
131 views: None,
132 config: ServerConfig::new(),
133 #[cfg(feature = "otel")]
134 metrics: None,
135 }
136 }
137
138 pub fn spec(mut self, spec: Spec) -> Self {
140 self.spec = Some(spec);
141 self
142 }
143
144 pub fn views(mut self, views: ViewIndex) -> Self {
146 self.views = Some(views);
147 self
148 }
149
150 #[cfg(feature = "otel")]
152 pub fn metrics(mut self, metrics: Metrics) -> Self {
153 self.metrics = Some(Arc::new(metrics));
154 self
155 }
156
157 pub fn websocket(mut self) -> Self {
159 self.config.websocket = Some(WebSocketConfig::default());
160 self
161 }
162
163 pub fn websocket_config(mut self, config: WebSocketConfig) -> Self {
165 self.config.websocket = Some(config);
166 self
167 }
168
169 pub fn bind(mut self, addr: impl Into<SocketAddr>) -> Self {
171 if let Some(ws_config) = &mut self.config.websocket {
172 ws_config.bind_address = addr.into();
173 } else {
174 self.config.websocket = Some(WebSocketConfig::new(addr.into()));
175 }
176 self
177 }
178
179 pub fn yellowstone(mut self, config: YellowstoneConfig) -> Self {
181 self.config.yellowstone = Some(config);
182 self
183 }
184
185 pub fn health_monitoring(mut self) -> Self {
187 self.config.health = Some(HealthConfig::default());
188 self
189 }
190
191 pub fn health_config(mut self, config: HealthConfig) -> Self {
193 self.config.health = Some(config);
194 self
195 }
196
197 pub fn reconnection(mut self) -> Self {
199 self.config.reconnection = Some(ReconnectionConfig::default());
200 self
201 }
202
203 pub fn reconnection_config(mut self, config: ReconnectionConfig) -> Self {
205 self.config.reconnection = Some(config);
206 self
207 }
208
209 pub fn http_health(mut self) -> Self {
211 self.config.http_health = Some(HttpHealthConfig::default());
212 self
213 }
214
215 pub fn http_health_config(mut self, config: HttpHealthConfig) -> Self {
217 self.config.http_health = Some(config);
218 self
219 }
220
221 pub fn health_bind(mut self, addr: impl Into<SocketAddr>) -> Self {
223 if let Some(http_config) = &mut self.config.http_health {
224 http_config.bind_address = addr.into();
225 } else {
226 self.config.http_health = Some(HttpHealthConfig::new(addr.into()));
227 }
228 self
229 }
230
231 pub async fn start(self) -> Result<()> {
233 let view_index = self.views.unwrap_or_else(|| {
235 let mut index = ViewIndex::new();
236
237 if let Some(ref spec) = self.spec {
238 for entity_name in spec.bytecode.entities.keys() {
239 index.add_spec(ViewSpec {
240 id: format!("{}/list", entity_name),
241 export: entity_name.clone(),
242 mode: Mode::List,
243 projection: Projection::all(),
244 filters: Filters::all(),
245 delivery: Delivery::default(),
246 });
247
248 index.add_spec(ViewSpec {
249 id: format!("{}/state", entity_name),
250 export: entity_name.clone(),
251 mode: Mode::State,
252 projection: Projection::all(),
253 filters: Filters::all(),
254 delivery: Delivery::default(),
255 });
256
257 index.add_spec(ViewSpec {
258 id: format!("{}/append", entity_name),
259 export: entity_name.clone(),
260 mode: Mode::Append,
261 projection: Projection::all(),
262 filters: Filters::all(),
263 delivery: Delivery::default(),
264 });
265 }
266 }
267
268 index
269 });
270
271 #[cfg(feature = "otel")]
273 let mut runtime = Runtime::new(self.config, view_index, self.metrics);
274 #[cfg(not(feature = "otel"))]
275 let mut runtime = Runtime::new(self.config, view_index);
276
277 if let Some(spec) = self.spec {
279 runtime = runtime.with_spec(spec);
280 }
281
282 runtime.run().await
284 }
285
286 pub fn build(self) -> Result<Runtime> {
288 let view_index = self.views.unwrap_or_else(|| {
290 let mut index = ViewIndex::new();
291
292 if let Some(ref spec) = self.spec {
293 for entity_name in spec.bytecode.entities.keys() {
294 index.add_spec(ViewSpec {
295 id: format!("{}/list", entity_name),
296 export: entity_name.clone(),
297 mode: Mode::List,
298 projection: Projection::all(),
299 filters: Filters::all(),
300 delivery: Delivery::default(),
301 });
302
303 index.add_spec(ViewSpec {
304 id: format!("{}/state", entity_name),
305 export: entity_name.clone(),
306 mode: Mode::State,
307 projection: Projection::all(),
308 filters: Filters::all(),
309 delivery: Delivery::default(),
310 });
311
312 index.add_spec(ViewSpec {
313 id: format!("{}/append", entity_name),
314 export: entity_name.clone(),
315 mode: Mode::Append,
316 projection: Projection::all(),
317 filters: Filters::all(),
318 delivery: Delivery::default(),
319 });
320 }
321 }
322
323 index
324 });
325
326 #[cfg(feature = "otel")]
327 let mut runtime = Runtime::new(self.config, view_index, self.metrics);
328 #[cfg(not(feature = "otel"))]
329 let mut runtime = Runtime::new(self.config, view_index);
330
331 if let Some(spec) = self.spec {
332 runtime = runtime.with_spec(spec);
333 }
334 Ok(runtime)
335 }
336}
337
#[cfg(test)]
mod tests {
    use super::*;

    /// Chaining `websocket()` and `bind()` must leave a WebSocket config in
    /// place whose bind address is the one passed to `bind`.
    #[test]
    fn test_builder_pattern() {
        let addr: SocketAddr = "[::]:8877".parse().unwrap();
        let builder = Server::builder().websocket().bind(addr);

        let bound = builder
            .config
            .websocket
            .as_ref()
            .map(|ws_config| ws_config.bind_address);
        assert_eq!(bound, Some(addr));
    }

    /// `Spec::new` stores the program id and starts without a parser setup
    /// callback.
    #[test]
    fn test_spec_creation() {
        use hyperstack_interpreter::compiler::MultiEntityBytecode;

        let bytecode = MultiEntityBytecode::new().build();
        let spec = Spec::new(bytecode, "test_program");

        assert_eq!(spec.program_id, "test_program");
        assert!(spec.parser_setup.is_none());
    }
}