1pub mod bus;
34pub mod cache;
35pub mod config;
36pub mod health;
37pub mod http_health;
38#[cfg(feature = "otel")]
39pub mod metrics;
40pub mod projector;
41pub mod runtime;
42pub mod view;
43pub mod websocket;
44
45pub use bus::{BusManager, BusMessage};
46pub use cache::{EntityCache, EntityCacheConfig};
47pub use config::{
48 HealthConfig, HttpHealthConfig, ReconnectionConfig, ServerConfig, WebSocketConfig,
49 YellowstoneConfig,
50};
51pub use health::{HealthMonitor, SlotTracker, StreamStatus};
52pub use http_health::HttpHealthServer;
53#[cfg(feature = "otel")]
54pub use metrics::Metrics;
55pub use projector::Projector;
56pub use runtime::Runtime;
57pub use view::{Delivery, Filters, Projection, ViewIndex, ViewSpec};
58pub use websocket::{ClientInfo, ClientManager, Frame, Mode, Subscription, WebSocketServer};
59
60use anyhow::Result;
61use std::net::SocketAddr;
62use std::sync::Arc;
63
64pub type ParserSetupFn = Arc<
67 dyn Fn(
68 tokio::sync::mpsc::Sender<smallvec::SmallVec<[hyperstack_interpreter::Mutation; 6]>>,
69 Option<HealthMonitor>,
70 ReconnectionConfig,
71 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send>>
72 + Send
73 + Sync,
74>;
75
76pub struct Spec {
79 pub bytecode: hyperstack_interpreter::compiler::MultiEntityBytecode,
80 pub program_id: String,
81 pub parser_setup: Option<ParserSetupFn>,
82}
83
84impl Spec {
85 pub fn new(
86 bytecode: hyperstack_interpreter::compiler::MultiEntityBytecode,
87 program_id: impl Into<String>,
88 ) -> Self {
89 Self {
90 bytecode,
91 program_id: program_id.into(),
92 parser_setup: None,
93 }
94 }
95
96 pub fn with_parser_setup(mut self, setup_fn: ParserSetupFn) -> Self {
98 self.parser_setup = Some(setup_fn);
99 self
100 }
101}
102
103pub struct Server;
105
106impl Server {
107 pub fn builder() -> ServerBuilder {
109 ServerBuilder::new()
110 }
111}
112
113pub struct ServerBuilder {
115 spec: Option<Spec>,
116 views: Option<ViewIndex>,
117 config: ServerConfig,
118 #[cfg(feature = "otel")]
119 metrics: Option<Arc<Metrics>>,
120}
121
122impl ServerBuilder {
123 fn new() -> Self {
124 Self {
125 spec: None,
126 views: None,
127 config: ServerConfig::new(),
128 #[cfg(feature = "otel")]
129 metrics: None,
130 }
131 }
132
133 pub fn spec(mut self, spec: Spec) -> Self {
135 self.spec = Some(spec);
136 self
137 }
138
139 pub fn views(mut self, views: ViewIndex) -> Self {
141 self.views = Some(views);
142 self
143 }
144
145 #[cfg(feature = "otel")]
147 pub fn metrics(mut self, metrics: Metrics) -> Self {
148 self.metrics = Some(Arc::new(metrics));
149 self
150 }
151
152 pub fn websocket(mut self) -> Self {
154 self.config.websocket = Some(WebSocketConfig::default());
155 self
156 }
157
158 pub fn websocket_config(mut self, config: WebSocketConfig) -> Self {
160 self.config.websocket = Some(config);
161 self
162 }
163
164 pub fn bind(mut self, addr: impl Into<SocketAddr>) -> Self {
166 if let Some(ws_config) = &mut self.config.websocket {
167 ws_config.bind_address = addr.into();
168 } else {
169 self.config.websocket = Some(WebSocketConfig::new(addr.into()));
170 }
171 self
172 }
173
174 pub fn yellowstone(mut self, config: YellowstoneConfig) -> Self {
176 self.config.yellowstone = Some(config);
177 self
178 }
179
180 pub fn health_monitoring(mut self) -> Self {
182 self.config.health = Some(HealthConfig::default());
183 self
184 }
185
186 pub fn health_config(mut self, config: HealthConfig) -> Self {
188 self.config.health = Some(config);
189 self
190 }
191
192 pub fn reconnection(mut self) -> Self {
194 self.config.reconnection = Some(ReconnectionConfig::default());
195 self
196 }
197
198 pub fn reconnection_config(mut self, config: ReconnectionConfig) -> Self {
200 self.config.reconnection = Some(config);
201 self
202 }
203
204 pub fn http_health(mut self) -> Self {
206 self.config.http_health = Some(HttpHealthConfig::default());
207 self
208 }
209
210 pub fn http_health_config(mut self, config: HttpHealthConfig) -> Self {
212 self.config.http_health = Some(config);
213 self
214 }
215
216 pub fn health_bind(mut self, addr: impl Into<SocketAddr>) -> Self {
218 if let Some(http_config) = &mut self.config.http_health {
219 http_config.bind_address = addr.into();
220 } else {
221 self.config.http_health = Some(HttpHealthConfig::new(addr.into()));
222 }
223 self
224 }
225
226 pub async fn start(self) -> Result<()> {
228 let view_index = self.views.unwrap_or_else(|| {
230 let mut index = ViewIndex::new();
231
232 if let Some(ref spec) = self.spec {
233 for entity_name in spec.bytecode.entities.keys() {
234 index.add_spec(ViewSpec {
235 id: format!("{}/list", entity_name),
236 export: entity_name.clone(),
237 mode: Mode::List,
238 projection: Projection::all(),
239 filters: Filters::all(),
240 delivery: Delivery::default(),
241 });
242
243 index.add_spec(ViewSpec {
244 id: format!("{}/state", entity_name),
245 export: entity_name.clone(),
246 mode: Mode::State,
247 projection: Projection::all(),
248 filters: Filters::all(),
249 delivery: Delivery::default(),
250 });
251
252 index.add_spec(ViewSpec {
253 id: format!("{}/append", entity_name),
254 export: entity_name.clone(),
255 mode: Mode::Append,
256 projection: Projection::all(),
257 filters: Filters::all(),
258 delivery: Delivery::default(),
259 });
260
261 tracing::info!("Registered views for entity: {}", entity_name);
262 }
263 }
264
265 index
266 });
267
268 #[cfg(feature = "otel")]
270 let mut runtime = Runtime::new(self.config, view_index, self.metrics);
271 #[cfg(not(feature = "otel"))]
272 let mut runtime = Runtime::new(self.config, view_index);
273
274 if let Some(spec) = self.spec {
276 runtime = runtime.with_spec(spec);
277 }
278
279 runtime.run().await
281 }
282
283 pub fn build(self) -> Result<Runtime> {
285 let view_index = self.views.unwrap_or_else(|| {
287 let mut index = ViewIndex::new();
288
289 if let Some(ref spec) = self.spec {
290 for entity_name in spec.bytecode.entities.keys() {
291 index.add_spec(ViewSpec {
292 id: format!("{}/list", entity_name),
293 export: entity_name.clone(),
294 mode: Mode::List,
295 projection: Projection::all(),
296 filters: Filters::all(),
297 delivery: Delivery::default(),
298 });
299
300 index.add_spec(ViewSpec {
301 id: format!("{}/state", entity_name),
302 export: entity_name.clone(),
303 mode: Mode::State,
304 projection: Projection::all(),
305 filters: Filters::all(),
306 delivery: Delivery::default(),
307 });
308
309 index.add_spec(ViewSpec {
310 id: format!("{}/append", entity_name),
311 export: entity_name.clone(),
312 mode: Mode::Append,
313 projection: Projection::all(),
314 filters: Filters::all(),
315 delivery: Delivery::default(),
316 });
317 }
318 }
319
320 index
321 });
322
323 #[cfg(feature = "otel")]
324 let mut runtime = Runtime::new(self.config, view_index, self.metrics);
325 #[cfg(not(feature = "otel"))]
326 let mut runtime = Runtime::new(self.config, view_index);
327
328 if let Some(spec) = self.spec {
329 runtime = runtime.with_spec(spec);
330 }
331 Ok(runtime)
332 }
333}
334
#[cfg(test)]
mod tests {
    use super::*;

    /// The builder chain `websocket().bind(..)` compiles and runs.
    #[test]
    fn test_builder_pattern() {
        let addr: SocketAddr = "[::]:8877".parse().unwrap();
        let _builder = Server::builder().websocket().bind(addr);
    }

    /// `Spec::new` stores the program id it was given.
    #[test]
    fn test_spec_creation() {
        let spec = Spec::new(
            hyperstack_interpreter::compiler::MultiEntityBytecode::new().build(),
            "test_program",
        );
        assert_eq!(spec.program_id, "test_program");
    }
}