1pub mod bus;
34pub mod config;
35pub mod health;
36pub mod http_health;
37#[cfg(feature = "otel")]
38pub mod metrics;
39pub mod projector;
40pub mod runtime;
41pub mod view;
42pub mod websocket;
43
44pub use bus::{BusManager, BusMessage};
45pub use config::{HealthConfig, HttpHealthConfig, ServerConfig, WebSocketConfig, YellowstoneConfig};
46pub use health::{HealthMonitor, StreamStatus};
47pub use http_health::HttpHealthServer;
48#[cfg(feature = "otel")]
49pub use metrics::Metrics;
50pub use projector::Projector;
51pub use runtime::Runtime;
52pub use view::{Delivery, Filters, Projection, ViewIndex, ViewSpec};
53pub use websocket::{ClientInfo, ClientManager, Frame, Mode, Subscription, WebSocketServer};
54
55use anyhow::Result;
56use std::net::SocketAddr;
57use std::sync::Arc;
58
59pub type ParserSetupFn = Arc<
62 dyn Fn(
63 tokio::sync::mpsc::Sender<smallvec::SmallVec<[hyperstack_interpreter::Mutation; 6]>>,
64 Option<HealthMonitor>,
65 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send>>
66 + Send
67 + Sync
68>;
69
70pub struct Spec {
73 pub bytecode: hyperstack_interpreter::compiler::MultiEntityBytecode,
74 pub program_id: String,
75 pub parser_setup: Option<ParserSetupFn>,
76}
77
78impl Spec {
79 pub fn new(bytecode: hyperstack_interpreter::compiler::MultiEntityBytecode, program_id: impl Into<String>) -> Self {
80 Self {
81 bytecode,
82 program_id: program_id.into(),
83 parser_setup: None,
84 }
85 }
86
87 pub fn with_parser_setup(mut self, setup_fn: ParserSetupFn) -> Self {
89 self.parser_setup = Some(setup_fn);
90 self
91 }
92}
93
94pub struct Server;
96
97impl Server {
98 pub fn builder() -> ServerBuilder {
100 ServerBuilder::new()
101 }
102}
103
104pub struct ServerBuilder {
106 spec: Option<Spec>,
107 views: Option<ViewIndex>,
108 config: ServerConfig,
109 #[cfg(feature = "otel")]
110 metrics: Option<Arc<Metrics>>,
111}
112
113impl ServerBuilder {
114 fn new() -> Self {
115 Self {
116 spec: None,
117 views: None,
118 config: ServerConfig::new(),
119 #[cfg(feature = "otel")]
120 metrics: None,
121 }
122 }
123
124 pub fn spec(mut self, spec: Spec) -> Self {
126 self.spec = Some(spec);
127 self
128 }
129
130 pub fn views(mut self, views: ViewIndex) -> Self {
132 self.views = Some(views);
133 self
134 }
135
136 #[cfg(feature = "otel")]
138 pub fn metrics(mut self, metrics: Metrics) -> Self {
139 self.metrics = Some(Arc::new(metrics));
140 self
141 }
142
143 pub fn websocket(mut self) -> Self {
145 self.config.websocket = Some(WebSocketConfig::default());
146 self
147 }
148
149 pub fn websocket_config(mut self, config: WebSocketConfig) -> Self {
151 self.config.websocket = Some(config);
152 self
153 }
154
155 pub fn bind(mut self, addr: impl Into<SocketAddr>) -> Self {
157 if let Some(ws_config) = &mut self.config.websocket {
158 ws_config.bind_address = addr.into();
159 } else {
160 self.config.websocket = Some(WebSocketConfig::new(addr.into()));
161 }
162 self
163 }
164
165 pub fn yellowstone(mut self, config: YellowstoneConfig) -> Self {
167 self.config.yellowstone = Some(config);
168 self
169 }
170
171 pub fn health_monitoring(mut self) -> Self {
173 self.config.health = Some(HealthConfig::default());
174 self
175 }
176
177 pub fn health_config(mut self, config: HealthConfig) -> Self {
179 self.config.health = Some(config);
180 self
181 }
182
183 pub fn http_health(mut self) -> Self {
185 self.config.http_health = Some(HttpHealthConfig::default());
186 self
187 }
188
189 pub fn http_health_config(mut self, config: HttpHealthConfig) -> Self {
191 self.config.http_health = Some(config);
192 self
193 }
194
195 pub fn health_bind(mut self, addr: impl Into<SocketAddr>) -> Self {
197 if let Some(http_config) = &mut self.config.http_health {
198 http_config.bind_address = addr.into();
199 } else {
200 self.config.http_health = Some(HttpHealthConfig::new(addr.into()));
201 }
202 self
203 }
204
205 pub async fn start(self) -> Result<()> {
207 let view_index = self.views.unwrap_or_else(|| {
209 let mut index = ViewIndex::new();
210
211 if let Some(ref spec) = self.spec {
213 for entity_name in spec.bytecode.entities.keys() {
214 index.add_spec(ViewSpec {
216 id: format!("{}/kv", entity_name),
217 export: entity_name.clone(),
218 mode: Mode::Kv,
219 projection: Projection::all(),
220 filters: Filters::all(),
221 delivery: Delivery::default(),
222 });
223
224 index.add_spec(ViewSpec {
226 id: format!("{}/list", entity_name),
227 export: entity_name.clone(),
228 mode: Mode::List,
229 projection: Projection::all(),
230 filters: Filters::all(),
231 delivery: Delivery::default(),
232 });
233
234 index.add_spec(ViewSpec {
236 id: format!("{}/state", entity_name),
237 export: entity_name.clone(),
238 mode: Mode::State,
239 projection: Projection::all(),
240 filters: Filters::all(),
241 delivery: Delivery::default(),
242 });
243
244 index.add_spec(ViewSpec {
246 id: format!("{}/append", entity_name),
247 export: entity_name.clone(),
248 mode: Mode::Append,
249 projection: Projection::all(),
250 filters: Filters::all(),
251 delivery: Delivery::default(),
252 });
253
254 tracing::info!("Registered views for entity: {}", entity_name);
255 }
256 }
257
258 index
259 });
260
261 #[cfg(feature = "otel")]
263 let mut runtime = Runtime::new(self.config, view_index, self.metrics);
264 #[cfg(not(feature = "otel"))]
265 let mut runtime = Runtime::new(self.config, view_index);
266
267 if let Some(spec) = self.spec {
269 runtime = runtime.with_spec(spec);
270 }
271
272 runtime.run().await
274 }
275
276 pub fn build(self) -> Result<Runtime> {
278 let view_index = self.views.unwrap_or_else(|| {
280 let mut index = ViewIndex::new();
281
282 if let Some(ref spec) = self.spec {
284 for entity_name in spec.bytecode.entities.keys() {
285 index.add_spec(ViewSpec {
287 id: format!("{}/kv", entity_name),
288 export: entity_name.clone(),
289 mode: Mode::Kv,
290 projection: Projection::all(),
291 filters: Filters::all(),
292 delivery: Delivery::default(),
293 });
294
295 index.add_spec(ViewSpec {
297 id: format!("{}/list", entity_name),
298 export: entity_name.clone(),
299 mode: Mode::List,
300 projection: Projection::all(),
301 filters: Filters::all(),
302 delivery: Delivery::default(),
303 });
304
305 index.add_spec(ViewSpec {
307 id: format!("{}/state", entity_name),
308 export: entity_name.clone(),
309 mode: Mode::State,
310 projection: Projection::all(),
311 filters: Filters::all(),
312 delivery: Delivery::default(),
313 });
314
315 index.add_spec(ViewSpec {
317 id: format!("{}/append", entity_name),
318 export: entity_name.clone(),
319 mode: Mode::Append,
320 projection: Projection::all(),
321 filters: Filters::all(),
322 delivery: Delivery::default(),
323 });
324 }
325 }
326
327 index
328 });
329
330 #[cfg(feature = "otel")]
331 let mut runtime = Runtime::new(self.config, view_index, self.metrics);
332 #[cfg(not(feature = "otel"))]
333 let mut runtime = Runtime::new(self.config, view_index);
334
335 if let Some(spec) = self.spec {
336 runtime = runtime.with_spec(spec);
337 }
338 Ok(runtime)
339 }
340}
341
#[cfg(test)]
mod tests {
    use super::*;

    /// Chaining `websocket()` and `bind()` on a fresh builder must compile
    /// and run without panicking.
    #[test]
    fn test_builder_pattern() {
        let addr: SocketAddr = "[::]:8877".parse().unwrap();
        let _builder = Server::builder().websocket().bind(addr);
    }

    /// `Spec::new` stores the program id it was given.
    #[test]
    fn test_spec_creation() {
        let spec = Spec::new(
            hyperstack_interpreter::compiler::MultiEntityBytecode::new().build(),
            "test_program",
        );
        assert_eq!(spec.program_id, "test_program");
    }
}