1pub mod bus;
34pub mod config;
35pub mod health;
36pub mod http_health;
37#[cfg(feature = "otel")]
38pub mod metrics;
39pub mod projector;
40pub mod runtime;
41pub mod view;
42pub mod websocket;
43
44pub use bus::{BusManager, BusMessage};
45pub use config::{
46 HealthConfig, HttpHealthConfig, ServerConfig, WebSocketConfig, YellowstoneConfig,
47};
48pub use health::{HealthMonitor, StreamStatus};
49pub use http_health::HttpHealthServer;
50#[cfg(feature = "otel")]
51pub use metrics::Metrics;
52pub use projector::Projector;
53pub use runtime::Runtime;
54pub use view::{Delivery, Filters, Projection, ViewIndex, ViewSpec};
55pub use websocket::{ClientInfo, ClientManager, Frame, Mode, Subscription, WebSocketServer};
56
57use anyhow::Result;
58use std::net::SocketAddr;
59use std::sync::Arc;
60
/// Shared, thread-safe callback stored in [`Spec::parser_setup`].
///
/// The callback receives:
/// * a Tokio `mpsc::Sender` carrying batches of
///   `hyperstack_interpreter::Mutation` (a `SmallVec` with inline room for 6), and
/// * an optional [`HealthMonitor`].
///
/// It returns a boxed, pinned, `Send` future resolving to `Result<()>`.
///
/// NOTE(review): presumably the runtime invokes this to start the parser that
/// feeds mutations into the channel — the call site is not in this file; confirm.
pub type ParserSetupFn = Arc<
    dyn Fn(
            tokio::sync::mpsc::Sender<smallvec::SmallVec<[hyperstack_interpreter::Mutation; 6]>>,
            Option<HealthMonitor>,
        ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send>>
        + Send
        + Sync,
>;
71
/// Description of what the server should run: compiled bytecode, the program
/// it belongs to, and an optional parser setup hook.
pub struct Spec {
    /// Compiled multi-entity bytecode. The keys of its `entities` map are used
    /// by [`ServerBuilder`] to derive default views when none are supplied.
    pub bytecode: hyperstack_interpreter::compiler::MultiEntityBytecode,
    /// Program identifier as a string.
    /// NOTE(review): presumably the on-chain program address — confirm format.
    pub program_id: String,
    /// Optional parser setup callback; `None` until
    /// [`Spec::with_parser_setup`] is called.
    pub parser_setup: Option<ParserSetupFn>,
}
79
80impl Spec {
81 pub fn new(
82 bytecode: hyperstack_interpreter::compiler::MultiEntityBytecode,
83 program_id: impl Into<String>,
84 ) -> Self {
85 Self {
86 bytecode,
87 program_id: program_id.into(),
88 parser_setup: None,
89 }
90 }
91
92 pub fn with_parser_setup(mut self, setup_fn: ParserSetupFn) -> Self {
94 self.parser_setup = Some(setup_fn);
95 self
96 }
97}
98
/// Namespace type for the server API; it carries no state and only exposes
/// [`Server::builder`].
pub struct Server;

impl Server {
    /// Returns a new [`ServerBuilder`] obtained from `ServerBuilder::new()`.
    pub fn builder() -> ServerBuilder {
        ServerBuilder::new()
    }
}
108
/// Fluent builder that collects a [`Spec`], views, and configuration before
/// producing (or directly running) a [`Runtime`].
pub struct ServerBuilder {
    /// Spec to attach to the runtime, if any.
    spec: Option<Spec>,
    /// Explicit view index; when `None`, views are derived from the spec's
    /// entities at build time.
    views: Option<ViewIndex>,
    /// Aggregated server configuration handed to `Runtime::new`.
    config: ServerConfig,
    /// Optional shared metrics handle (only with the `otel` feature).
    #[cfg(feature = "otel")]
    metrics: Option<Arc<Metrics>>,
}
117
118impl ServerBuilder {
119 fn new() -> Self {
120 Self {
121 spec: None,
122 views: None,
123 config: ServerConfig::new(),
124 #[cfg(feature = "otel")]
125 metrics: None,
126 }
127 }
128
129 pub fn spec(mut self, spec: Spec) -> Self {
131 self.spec = Some(spec);
132 self
133 }
134
135 pub fn views(mut self, views: ViewIndex) -> Self {
137 self.views = Some(views);
138 self
139 }
140
141 #[cfg(feature = "otel")]
143 pub fn metrics(mut self, metrics: Metrics) -> Self {
144 self.metrics = Some(Arc::new(metrics));
145 self
146 }
147
148 pub fn websocket(mut self) -> Self {
150 self.config.websocket = Some(WebSocketConfig::default());
151 self
152 }
153
154 pub fn websocket_config(mut self, config: WebSocketConfig) -> Self {
156 self.config.websocket = Some(config);
157 self
158 }
159
160 pub fn bind(mut self, addr: impl Into<SocketAddr>) -> Self {
162 if let Some(ws_config) = &mut self.config.websocket {
163 ws_config.bind_address = addr.into();
164 } else {
165 self.config.websocket = Some(WebSocketConfig::new(addr.into()));
166 }
167 self
168 }
169
170 pub fn yellowstone(mut self, config: YellowstoneConfig) -> Self {
172 self.config.yellowstone = Some(config);
173 self
174 }
175
176 pub fn health_monitoring(mut self) -> Self {
178 self.config.health = Some(HealthConfig::default());
179 self
180 }
181
182 pub fn health_config(mut self, config: HealthConfig) -> Self {
184 self.config.health = Some(config);
185 self
186 }
187
188 pub fn http_health(mut self) -> Self {
190 self.config.http_health = Some(HttpHealthConfig::default());
191 self
192 }
193
194 pub fn http_health_config(mut self, config: HttpHealthConfig) -> Self {
196 self.config.http_health = Some(config);
197 self
198 }
199
200 pub fn health_bind(mut self, addr: impl Into<SocketAddr>) -> Self {
202 if let Some(http_config) = &mut self.config.http_health {
203 http_config.bind_address = addr.into();
204 } else {
205 self.config.http_health = Some(HttpHealthConfig::new(addr.into()));
206 }
207 self
208 }
209
210 pub async fn start(self) -> Result<()> {
212 let view_index = self.views.unwrap_or_else(|| {
214 let mut index = ViewIndex::new();
215
216 if let Some(ref spec) = self.spec {
218 for entity_name in spec.bytecode.entities.keys() {
219 index.add_spec(ViewSpec {
221 id: format!("{}/kv", entity_name),
222 export: entity_name.clone(),
223 mode: Mode::Kv,
224 projection: Projection::all(),
225 filters: Filters::all(),
226 delivery: Delivery::default(),
227 });
228
229 index.add_spec(ViewSpec {
231 id: format!("{}/list", entity_name),
232 export: entity_name.clone(),
233 mode: Mode::List,
234 projection: Projection::all(),
235 filters: Filters::all(),
236 delivery: Delivery::default(),
237 });
238
239 index.add_spec(ViewSpec {
241 id: format!("{}/state", entity_name),
242 export: entity_name.clone(),
243 mode: Mode::State,
244 projection: Projection::all(),
245 filters: Filters::all(),
246 delivery: Delivery::default(),
247 });
248
249 index.add_spec(ViewSpec {
251 id: format!("{}/append", entity_name),
252 export: entity_name.clone(),
253 mode: Mode::Append,
254 projection: Projection::all(),
255 filters: Filters::all(),
256 delivery: Delivery::default(),
257 });
258
259 tracing::info!("Registered views for entity: {}", entity_name);
260 }
261 }
262
263 index
264 });
265
266 #[cfg(feature = "otel")]
268 let mut runtime = Runtime::new(self.config, view_index, self.metrics);
269 #[cfg(not(feature = "otel"))]
270 let mut runtime = Runtime::new(self.config, view_index);
271
272 if let Some(spec) = self.spec {
274 runtime = runtime.with_spec(spec);
275 }
276
277 runtime.run().await
279 }
280
281 pub fn build(self) -> Result<Runtime> {
283 let view_index = self.views.unwrap_or_else(|| {
285 let mut index = ViewIndex::new();
286
287 if let Some(ref spec) = self.spec {
289 for entity_name in spec.bytecode.entities.keys() {
290 index.add_spec(ViewSpec {
292 id: format!("{}/kv", entity_name),
293 export: entity_name.clone(),
294 mode: Mode::Kv,
295 projection: Projection::all(),
296 filters: Filters::all(),
297 delivery: Delivery::default(),
298 });
299
300 index.add_spec(ViewSpec {
302 id: format!("{}/list", entity_name),
303 export: entity_name.clone(),
304 mode: Mode::List,
305 projection: Projection::all(),
306 filters: Filters::all(),
307 delivery: Delivery::default(),
308 });
309
310 index.add_spec(ViewSpec {
312 id: format!("{}/state", entity_name),
313 export: entity_name.clone(),
314 mode: Mode::State,
315 projection: Projection::all(),
316 filters: Filters::all(),
317 delivery: Delivery::default(),
318 });
319
320 index.add_spec(ViewSpec {
322 id: format!("{}/append", entity_name),
323 export: entity_name.clone(),
324 mode: Mode::Append,
325 projection: Projection::all(),
326 filters: Filters::all(),
327 delivery: Delivery::default(),
328 });
329 }
330 }
331
332 index
333 });
334
335 #[cfg(feature = "otel")]
336 let mut runtime = Runtime::new(self.config, view_index, self.metrics);
337 #[cfg(not(feature = "otel"))]
338 let mut runtime = Runtime::new(self.config, view_index);
339
340 if let Some(spec) = self.spec {
341 runtime = runtime.with_spec(spec);
342 }
343 Ok(runtime)
344 }
345}
346
#[cfg(test)]
mod tests {
    use super::*;

    /// Smoke test: the builder chain type-checks and runs without panicking.
    #[test]
    fn test_builder_pattern() {
        let listen_addr: SocketAddr = "[::]:8877".parse().unwrap();
        let _builder = Server::builder().websocket().bind(listen_addr);
    }

    /// `Spec::new` stores the program id it was given.
    #[test]
    fn test_spec_creation() {
        let spec = Spec::new(
            hyperstack_interpreter::compiler::MultiEntityBytecode::new().build(),
            "test_program",
        );
        assert_eq!(spec.program_id, "test_program");
    }
}