1pub mod bus;
34pub mod cache;
35pub mod config;
36pub mod health;
37pub mod http_health;
38#[cfg(feature = "otel")]
39pub mod metrics;
40pub mod projector;
41pub mod runtime;
42pub mod view;
43pub mod websocket;
44
45pub use bus::{BusManager, BusMessage};
46pub use cache::{EntityCache, EntityCacheConfig};
47pub use config::{
48 HealthConfig, HttpHealthConfig, ServerConfig, WebSocketConfig, YellowstoneConfig,
49};
50pub use health::{HealthMonitor, StreamStatus};
51pub use http_health::HttpHealthServer;
52#[cfg(feature = "otel")]
53pub use metrics::Metrics;
54pub use projector::Projector;
55pub use runtime::Runtime;
56pub use view::{Delivery, Filters, Projection, ViewIndex, ViewSpec};
57pub use websocket::{ClientInfo, ClientManager, Frame, Mode, Subscription, WebSocketServer};
58
59use anyhow::Result;
60use std::net::SocketAddr;
61use std::sync::Arc;
62
63pub type ParserSetupFn = Arc<
66 dyn Fn(
67 tokio::sync::mpsc::Sender<smallvec::SmallVec<[hyperstack_interpreter::Mutation; 6]>>,
68 Option<HealthMonitor>,
69 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send>>
70 + Send
71 + Sync,
72>;
73
74pub struct Spec {
77 pub bytecode: hyperstack_interpreter::compiler::MultiEntityBytecode,
78 pub program_id: String,
79 pub parser_setup: Option<ParserSetupFn>,
80}
81
82impl Spec {
83 pub fn new(
84 bytecode: hyperstack_interpreter::compiler::MultiEntityBytecode,
85 program_id: impl Into<String>,
86 ) -> Self {
87 Self {
88 bytecode,
89 program_id: program_id.into(),
90 parser_setup: None,
91 }
92 }
93
94 pub fn with_parser_setup(mut self, setup_fn: ParserSetupFn) -> Self {
96 self.parser_setup = Some(setup_fn);
97 self
98 }
99}
100
/// Entry point type for configuring a server.
///
/// Holds no state of its own; use [`Server::builder`] to obtain a
/// [`ServerBuilder`].
pub struct Server;
103
104impl Server {
105 pub fn builder() -> ServerBuilder {
107 ServerBuilder::new()
108 }
109}
110
111pub struct ServerBuilder {
113 spec: Option<Spec>,
114 views: Option<ViewIndex>,
115 config: ServerConfig,
116 #[cfg(feature = "otel")]
117 metrics: Option<Arc<Metrics>>,
118}
119
120impl ServerBuilder {
121 fn new() -> Self {
122 Self {
123 spec: None,
124 views: None,
125 config: ServerConfig::new(),
126 #[cfg(feature = "otel")]
127 metrics: None,
128 }
129 }
130
131 pub fn spec(mut self, spec: Spec) -> Self {
133 self.spec = Some(spec);
134 self
135 }
136
137 pub fn views(mut self, views: ViewIndex) -> Self {
139 self.views = Some(views);
140 self
141 }
142
143 #[cfg(feature = "otel")]
145 pub fn metrics(mut self, metrics: Metrics) -> Self {
146 self.metrics = Some(Arc::new(metrics));
147 self
148 }
149
150 pub fn websocket(mut self) -> Self {
152 self.config.websocket = Some(WebSocketConfig::default());
153 self
154 }
155
156 pub fn websocket_config(mut self, config: WebSocketConfig) -> Self {
158 self.config.websocket = Some(config);
159 self
160 }
161
162 pub fn bind(mut self, addr: impl Into<SocketAddr>) -> Self {
164 if let Some(ws_config) = &mut self.config.websocket {
165 ws_config.bind_address = addr.into();
166 } else {
167 self.config.websocket = Some(WebSocketConfig::new(addr.into()));
168 }
169 self
170 }
171
172 pub fn yellowstone(mut self, config: YellowstoneConfig) -> Self {
174 self.config.yellowstone = Some(config);
175 self
176 }
177
178 pub fn health_monitoring(mut self) -> Self {
180 self.config.health = Some(HealthConfig::default());
181 self
182 }
183
184 pub fn health_config(mut self, config: HealthConfig) -> Self {
186 self.config.health = Some(config);
187 self
188 }
189
190 pub fn http_health(mut self) -> Self {
192 self.config.http_health = Some(HttpHealthConfig::default());
193 self
194 }
195
196 pub fn http_health_config(mut self, config: HttpHealthConfig) -> Self {
198 self.config.http_health = Some(config);
199 self
200 }
201
202 pub fn health_bind(mut self, addr: impl Into<SocketAddr>) -> Self {
204 if let Some(http_config) = &mut self.config.http_health {
205 http_config.bind_address = addr.into();
206 } else {
207 self.config.http_health = Some(HttpHealthConfig::new(addr.into()));
208 }
209 self
210 }
211
212 pub async fn start(self) -> Result<()> {
214 let view_index = self.views.unwrap_or_else(|| {
216 let mut index = ViewIndex::new();
217
218 if let Some(ref spec) = self.spec {
219 for entity_name in spec.bytecode.entities.keys() {
220 index.add_spec(ViewSpec {
221 id: format!("{}/list", entity_name),
222 export: entity_name.clone(),
223 mode: Mode::List,
224 projection: Projection::all(),
225 filters: Filters::all(),
226 delivery: Delivery::default(),
227 });
228
229 index.add_spec(ViewSpec {
230 id: format!("{}/state", entity_name),
231 export: entity_name.clone(),
232 mode: Mode::State,
233 projection: Projection::all(),
234 filters: Filters::all(),
235 delivery: Delivery::default(),
236 });
237
238 index.add_spec(ViewSpec {
239 id: format!("{}/append", entity_name),
240 export: entity_name.clone(),
241 mode: Mode::Append,
242 projection: Projection::all(),
243 filters: Filters::all(),
244 delivery: Delivery::default(),
245 });
246
247 tracing::info!("Registered views for entity: {}", entity_name);
248 }
249 }
250
251 index
252 });
253
254 #[cfg(feature = "otel")]
256 let mut runtime = Runtime::new(self.config, view_index, self.metrics);
257 #[cfg(not(feature = "otel"))]
258 let mut runtime = Runtime::new(self.config, view_index);
259
260 if let Some(spec) = self.spec {
262 runtime = runtime.with_spec(spec);
263 }
264
265 runtime.run().await
267 }
268
269 pub fn build(self) -> Result<Runtime> {
271 let view_index = self.views.unwrap_or_else(|| {
273 let mut index = ViewIndex::new();
274
275 if let Some(ref spec) = self.spec {
276 for entity_name in spec.bytecode.entities.keys() {
277 index.add_spec(ViewSpec {
278 id: format!("{}/list", entity_name),
279 export: entity_name.clone(),
280 mode: Mode::List,
281 projection: Projection::all(),
282 filters: Filters::all(),
283 delivery: Delivery::default(),
284 });
285
286 index.add_spec(ViewSpec {
287 id: format!("{}/state", entity_name),
288 export: entity_name.clone(),
289 mode: Mode::State,
290 projection: Projection::all(),
291 filters: Filters::all(),
292 delivery: Delivery::default(),
293 });
294
295 index.add_spec(ViewSpec {
296 id: format!("{}/append", entity_name),
297 export: entity_name.clone(),
298 mode: Mode::Append,
299 projection: Projection::all(),
300 filters: Filters::all(),
301 delivery: Delivery::default(),
302 });
303 }
304 }
305
306 index
307 });
308
309 #[cfg(feature = "otel")]
310 let mut runtime = Runtime::new(self.config, view_index, self.metrics);
311 #[cfg(not(feature = "otel"))]
312 let mut runtime = Runtime::new(self.config, view_index);
313
314 if let Some(spec) = self.spec {
315 runtime = runtime.with_spec(spec);
316 }
317 Ok(runtime)
318 }
319}
320
#[cfg(test)]
mod tests {
    use super::*;

    /// The builder chain should accept a websocket config followed by a bind
    /// address without panicking.
    #[test]
    fn test_builder_pattern() {
        let addr: SocketAddr = "[::]:8877".parse().unwrap();
        let _builder = Server::builder().websocket().bind(addr);
    }

    /// `Spec::new` should store the program id it is given.
    #[test]
    fn test_spec_creation() {
        let spec = Spec::new(
            hyperstack_interpreter::compiler::MultiEntityBytecode::new().build(),
            "test_program",
        );
        assert_eq!(spec.program_id, "test_program");
    }
}