pub mod bus;
pub mod config;
pub mod health;
pub mod http_health;
#[cfg(feature = "otel")]
pub mod metrics;
pub mod projector;
pub mod runtime;
pub mod view;
pub mod websocket;
pub use bus::{BusManager, BusMessage};
pub use config::{
HealthConfig, HttpHealthConfig, ServerConfig, WebSocketConfig, YellowstoneConfig,
};
pub use health::{HealthMonitor, StreamStatus};
pub use http_health::HttpHealthServer;
#[cfg(feature = "otel")]
pub use metrics::Metrics;
pub use projector::Projector;
pub use runtime::Runtime;
pub use view::{Delivery, Filters, Projection, ViewIndex, ViewSpec};
pub use websocket::{ClientInfo, ClientManager, Frame, Mode, Subscription, WebSocketServer};
use anyhow::Result;
use std::net::SocketAddr;
use std::sync::Arc;
/// Shared, thread-safe callback type stored in [`Spec::parser_setup`].
///
/// The callback receives:
/// * a `tokio::sync::mpsc::Sender` carrying `SmallVec` batches of
///   `hyperstack_interpreter::Mutation` (inline capacity of 6), and
/// * an optional [`HealthMonitor`].
///
/// It returns a boxed, pinned, `Send` future that resolves to `Result<()>`.
/// The alias is wrapped in an `Arc` and bounded by `Send + Sync`.
///
/// NOTE(review): the call site is outside this file; presumably the runtime
/// invokes it to start the parser that feeds mutations — confirm in `runtime`.
pub type ParserSetupFn = Arc<
dyn Fn(
tokio::sync::mpsc::Sender<smallvec::SmallVec<[hyperstack_interpreter::Mutation; 6]>>,
Option<HealthMonitor>,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send>>
+ Send
+ Sync,
>;
/// Bundles compiled multi-entity bytecode with a program identifier and an
/// optional parser setup callback.
pub struct Spec {
/// Compiled bytecode; the keys of its `entities` map are used by
/// `ServerBuilder` to generate default views.
pub bytecode: hyperstack_interpreter::compiler::MultiEntityBytecode,
/// Program identifier string.
/// NOTE(review): presumably the on-chain program id — confirm against callers.
pub program_id: String,
/// Optional parser setup callback; `None` unless set via
/// `Spec::with_parser_setup`.
pub parser_setup: Option<ParserSetupFn>,
}
impl Spec {
pub fn new(
bytecode: hyperstack_interpreter::compiler::MultiEntityBytecode,
program_id: impl Into<String>,
) -> Self {
Self {
bytecode,
program_id: program_id.into(),
parser_setup: None,
}
}
pub fn with_parser_setup(mut self, setup_fn: ParserSetupFn) -> Self {
self.parser_setup = Some(setup_fn);
self
}
}
/// Unit type that serves as the entry point for configuring a server; see
/// `Server::builder`.
pub struct Server;
impl Server {
/// Returns a fresh [`ServerBuilder`] with nothing configured yet.
pub fn builder() -> ServerBuilder {
ServerBuilder::new()
}
}
/// Accumulates server configuration before a `Runtime` is built or started.
pub struct ServerBuilder {
// Spec to attach to the runtime; also the source of default views.
spec: Option<Spec>,
// Explicit view index; when `None`, default views are generated from `spec`.
views: Option<ViewIndex>,
// Aggregated configuration (websocket, yellowstone, health, http_health).
config: ServerConfig,
// Optional metrics handle, only present when the `otel` feature is enabled.
#[cfg(feature = "otel")]
metrics: Option<Arc<Metrics>>,
}
impl ServerBuilder {
fn new() -> Self {
Self {
spec: None,
views: None,
config: ServerConfig::new(),
#[cfg(feature = "otel")]
metrics: None,
}
}
pub fn spec(mut self, spec: Spec) -> Self {
self.spec = Some(spec);
self
}
pub fn views(mut self, views: ViewIndex) -> Self {
self.views = Some(views);
self
}
#[cfg(feature = "otel")]
pub fn metrics(mut self, metrics: Metrics) -> Self {
self.metrics = Some(Arc::new(metrics));
self
}
pub fn websocket(mut self) -> Self {
self.config.websocket = Some(WebSocketConfig::default());
self
}
pub fn websocket_config(mut self, config: WebSocketConfig) -> Self {
self.config.websocket = Some(config);
self
}
pub fn bind(mut self, addr: impl Into<SocketAddr>) -> Self {
if let Some(ws_config) = &mut self.config.websocket {
ws_config.bind_address = addr.into();
} else {
self.config.websocket = Some(WebSocketConfig::new(addr.into()));
}
self
}
pub fn yellowstone(mut self, config: YellowstoneConfig) -> Self {
self.config.yellowstone = Some(config);
self
}
pub fn health_monitoring(mut self) -> Self {
self.config.health = Some(HealthConfig::default());
self
}
pub fn health_config(mut self, config: HealthConfig) -> Self {
self.config.health = Some(config);
self
}
pub fn http_health(mut self) -> Self {
self.config.http_health = Some(HttpHealthConfig::default());
self
}
pub fn http_health_config(mut self, config: HttpHealthConfig) -> Self {
self.config.http_health = Some(config);
self
}
pub fn health_bind(mut self, addr: impl Into<SocketAddr>) -> Self {
if let Some(http_config) = &mut self.config.http_health {
http_config.bind_address = addr.into();
} else {
self.config.http_health = Some(HttpHealthConfig::new(addr.into()));
}
self
}
pub async fn start(self) -> Result<()> {
let view_index = self.views.unwrap_or_else(|| {
let mut index = ViewIndex::new();
if let Some(ref spec) = self.spec {
for entity_name in spec.bytecode.entities.keys() {
index.add_spec(ViewSpec {
id: format!("{}/kv", entity_name),
export: entity_name.clone(),
mode: Mode::Kv,
projection: Projection::all(),
filters: Filters::all(),
delivery: Delivery::default(),
});
index.add_spec(ViewSpec {
id: format!("{}/list", entity_name),
export: entity_name.clone(),
mode: Mode::List,
projection: Projection::all(),
filters: Filters::all(),
delivery: Delivery::default(),
});
index.add_spec(ViewSpec {
id: format!("{}/state", entity_name),
export: entity_name.clone(),
mode: Mode::State,
projection: Projection::all(),
filters: Filters::all(),
delivery: Delivery::default(),
});
index.add_spec(ViewSpec {
id: format!("{}/append", entity_name),
export: entity_name.clone(),
mode: Mode::Append,
projection: Projection::all(),
filters: Filters::all(),
delivery: Delivery::default(),
});
tracing::info!("Registered views for entity: {}", entity_name);
}
}
index
});
#[cfg(feature = "otel")]
let mut runtime = Runtime::new(self.config, view_index, self.metrics);
#[cfg(not(feature = "otel"))]
let mut runtime = Runtime::new(self.config, view_index);
if let Some(spec) = self.spec {
runtime = runtime.with_spec(spec);
}
runtime.run().await
}
pub fn build(self) -> Result<Runtime> {
let view_index = self.views.unwrap_or_else(|| {
let mut index = ViewIndex::new();
if let Some(ref spec) = self.spec {
for entity_name in spec.bytecode.entities.keys() {
index.add_spec(ViewSpec {
id: format!("{}/kv", entity_name),
export: entity_name.clone(),
mode: Mode::Kv,
projection: Projection::all(),
filters: Filters::all(),
delivery: Delivery::default(),
});
index.add_spec(ViewSpec {
id: format!("{}/list", entity_name),
export: entity_name.clone(),
mode: Mode::List,
projection: Projection::all(),
filters: Filters::all(),
delivery: Delivery::default(),
});
index.add_spec(ViewSpec {
id: format!("{}/state", entity_name),
export: entity_name.clone(),
mode: Mode::State,
projection: Projection::all(),
filters: Filters::all(),
delivery: Delivery::default(),
});
index.add_spec(ViewSpec {
id: format!("{}/append", entity_name),
export: entity_name.clone(),
mode: Mode::Append,
projection: Projection::all(),
filters: Filters::all(),
delivery: Delivery::default(),
});
}
}
index
});
#[cfg(feature = "otel")]
let mut runtime = Runtime::new(self.config, view_index, self.metrics);
#[cfg(not(feature = "otel"))]
let mut runtime = Runtime::new(self.config, view_index);
if let Some(spec) = self.spec {
runtime = runtime.with_spec(spec);
}
Ok(runtime)
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Smoke test: chaining `websocket()` and `bind()` on a fresh builder
    /// compiles and does not panic.
    #[test]
    fn test_builder_pattern() {
        let addr: SocketAddr = "[::]:8877".parse().unwrap();
        let _builder = Server::builder().websocket().bind(addr);
    }

    /// `Spec::new` stores the program id it was given.
    #[test]
    fn test_spec_creation() {
        let spec = Spec::new(
            hyperstack_interpreter::compiler::MultiEntityBytecode::new().build(),
            "test_program",
        );
        assert_eq!(spec.program_id, "test_program");
    }
}