use super::entity_registry::EntityRegistry;
use super::exposure::RestExposure;
use super::host::ServerHost;
use crate::config::LinksConfig;
use crate::core::events::EventBus;
use crate::core::module::Module;
use crate::core::service::LinkService;
use crate::core::{EntityCreator, EntityFetcher};
use crate::events::SinkFactory;
use crate::events::sinks::SinkRegistry;
use crate::events::sinks::device_tokens::DeviceTokenStore;
use crate::events::sinks::in_app::NotificationStore;
use crate::events::sinks::preferences::NotificationPreferencesStore;
use anyhow::Result;
use axum::Router;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::net::TcpListener;
/// Fluent builder that collects a link service, modules, custom routes and
/// optional event components, then assembles them into a `ServerHost` or an
/// axum `Router`.
pub struct ServerBuilder {
/// Link service handed to the host; `build_host` returns an error if it was never set.
link_service: Option<Arc<dyn LinkService>>,
/// Registry passed to each module's `register_entities` during `register_module`.
entity_registry: EntityRegistry,
/// One `LinksConfig` per registered module, merged when the host is built.
configs: Vec<LinksConfig>,
/// Registered modules, queried for entity fetchers and creators at build time.
modules: Vec<Arc<dyn Module>>,
/// Extra routers forwarded to `RestExposure::build_router`.
custom_routes: Vec<Router>,
/// Optional event bus attached to the host when present.
event_bus: Option<EventBus>,
/// Optional caller-supplied sink registry; takes precedence over one built from config.
sink_registry: Option<SinkRegistry>,
/// Optional caller-supplied notification store; a fresh one is created when the
/// event pipeline is wired and this is `None`.
notification_store: Option<Arc<NotificationStore>>,
/// Optional caller-supplied device token store; same fallback as `notification_store`.
device_token_store: Option<Arc<DeviceTokenStore>>,
/// Optional caller-supplied preferences store; same fallback as `notification_store`.
preferences_store: Option<Arc<NotificationPreferencesStore>>,
}
impl ServerBuilder {
    /// Creates a builder with nothing configured: no link service, no modules,
    /// no custom routes and no event components.
    pub fn new() -> Self {
        Self {
            link_service: None,
            entity_registry: EntityRegistry::new(),
            configs: Vec::new(),
            modules: Vec::new(),
            custom_routes: Vec::new(),
            event_bus: None,
            sink_registry: None,
            notification_store: None,
            device_token_store: None,
            preferences_store: None,
        }
    }

    /// Sets the link service used by the host, replacing any previous one.
    ///
    /// A link service is mandatory: [`Self::build_host`] fails without it.
    pub fn with_link_service(mut self, service: impl LinkService + 'static) -> Self {
        self.link_service = Some(Arc::new(service));
        self
    }

    /// Appends a router to the custom routes that are handed to
    /// `RestExposure::build_router` when the final router is built.
    pub fn with_custom_routes(mut self, routes: Router) -> Self {
        self.custom_routes.push(routes);
        self
    }

    /// Attaches an event bus constructed with `EventBus::new(capacity)`.
    pub fn with_event_bus(mut self, capacity: usize) -> Self {
        self.event_bus = Some(EventBus::new(capacity));
        self
    }

    /// Supplies a pre-built sink registry.
    ///
    /// When set, [`Self::build_host`] uses it as-is instead of building a
    /// registry from the sinks declared in the merged configuration.
    pub fn with_sink_registry(mut self, registry: SinkRegistry) -> Self {
        self.sink_registry = Some(registry);
        self
    }

    /// Supplies the notification store to use when the event pipeline is wired,
    /// instead of a freshly created one.
    pub fn with_notification_store(mut self, store: Arc<NotificationStore>) -> Self {
        self.notification_store = Some(store);
        self
    }

    /// Supplies the device token store to use when the event pipeline is wired,
    /// instead of a freshly created one.
    pub fn with_device_token_store(mut self, store: Arc<DeviceTokenStore>) -> Self {
        self.device_token_store = Some(store);
        self
    }

    /// Supplies the notification preferences store to use when the event
    /// pipeline is wired, instead of a freshly created one.
    pub fn with_preferences_store(mut self, store: Arc<NotificationPreferencesStore>) -> Self {
        self.preferences_store = Some(store);
        self
    }

    /// Registers a module: records its links configuration, lets it register
    /// its entities, and keeps the module for fetcher/creator lookup at build time.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `module.links_config()`; in that case the
    /// builder is consumed and nothing is registered.
    pub fn register_module(mut self, module: impl Module + 'static) -> Result<Self> {
        let module = Arc::new(module);
        let config = module.links_config()?;
        self.configs.push(config);
        module.register_entities(&mut self.entity_registry);
        self.modules.push(module);
        Ok(self)
    }

    /// Assembles a `ServerHost` from everything registered on the builder.
    ///
    /// Steps, in order: merge module configs, resolve the link service, collect
    /// entity fetchers/creators from the modules, construct the host, attach the
    /// event bus if one was set, wire the event pipeline (sinks and stores) when
    /// sinks are configured or a registry was supplied, and attach an in-memory
    /// event log when the merged config has an `events` section.
    ///
    /// # Errors
    ///
    /// Fails if no link service was provided, or if
    /// `ServerHost::from_builder_components` returns an error.
    pub fn build_host(self) -> Result<ServerHost> {
        let merged_config = self.merge_configs()?;

        // Take ownership of the individual parts so they can be moved
        // independently; `configs` and `custom_routes` are not needed past here.
        let Self {
            link_service,
            entity_registry,
            modules,
            event_bus,
            sink_registry,
            notification_store,
            device_token_store,
            preferences_store,
            ..
        } = self;

        let link_service = link_service
            .ok_or_else(|| anyhow::anyhow!("LinkService is required. Call .with_link_service()"))?;

        // Single pass over the modules: `entity_types()` is called once per
        // module and both maps are filled together. A later module overrides an
        // earlier one that declares the same entity type.
        let mut fetchers_map: HashMap<String, Arc<dyn EntityFetcher>> = HashMap::new();
        let mut creators_map: HashMap<String, Arc<dyn EntityCreator>> = HashMap::new();
        for module in &modules {
            for entity_type in module.entity_types() {
                if let Some(fetcher) = module.get_entity_fetcher(entity_type) {
                    fetchers_map.insert(entity_type.to_string(), fetcher);
                }
                if let Some(creator) = module.get_entity_creator(entity_type) {
                    creators_map.insert(entity_type.to_string(), creator);
                }
            }
        }

        let mut host = ServerHost::from_builder_components(
            link_service,
            merged_config,
            entity_registry,
            fetchers_map,
            creators_map,
        )?;

        if let Some(event_bus) = event_bus {
            host = host.with_event_bus(event_bus);
        }

        host = Self::wire_event_pipeline(
            host,
            sink_registry,
            notification_store,
            preferences_store,
            device_token_store,
        );

        if host.config.events.is_some() {
            let event_log = Arc::new(crate::events::InMemoryEventLog::new());
            host = host.with_event_log(event_log);
            tracing::info!("event log auto-wired (InMemoryEventLog)");
        }

        Ok(host)
    }

    /// Attaches stores and a sink registry to `host` when the merged config
    /// declares at least one sink or the caller supplied a registry; otherwise
    /// returns `host` untouched.
    fn wire_event_pipeline(
        host: ServerHost,
        sink_registry: Option<SinkRegistry>,
        notification_store: Option<Arc<NotificationStore>>,
        preferences_store: Option<Arc<NotificationPreferencesStore>>,
        device_token_store: Option<Arc<DeviceTokenStore>>,
    ) -> ServerHost {
        let has_sinks = host.config.sinks.as_ref().is_some_and(|s| !s.is_empty());
        if !has_sinks && sink_registry.is_none() {
            return host;
        }

        // Caller-provided stores win; missing ones are created on the spot.
        let notification_store =
            notification_store.unwrap_or_else(|| Arc::new(NotificationStore::new()));
        let preferences_store =
            preferences_store.unwrap_or_else(|| Arc::new(NotificationPreferencesStore::new()));
        let device_token_store =
            device_token_store.unwrap_or_else(|| Arc::new(DeviceTokenStore::new()));

        // A manual registry overrides anything declared in the config.
        let sink_registry = match (sink_registry, host.config.sinks.as_ref()) {
            (Some(registry), _) => registry,
            (None, Some(sink_configs)) => {
                let factory = SinkFactory::with_stores(
                    notification_store.clone(),
                    preferences_store.clone(),
                    device_token_store.clone(),
                );
                factory.build_registry(sink_configs)
            }
            (None, None) => SinkRegistry::new(),
        };

        let host = host
            .with_notification_store(notification_store)
            .with_preferences_store(preferences_store)
            .with_device_token_store(device_token_store)
            .with_sink_registry(sink_registry);
        tracing::info!("event pipeline auto-wired from config");
        host
    }

    /// Builds the host and wraps it in the REST router, including any custom routes.
    ///
    /// # Errors
    ///
    /// Propagates errors from [`Self::build_host`] and `RestExposure::build_router`.
    pub fn build(mut self) -> Result<Router> {
        // Custom routes must be pulled out before `build_host` consumes `self`.
        let custom_routes = std::mem::take(&mut self.custom_routes);
        let host = Arc::new(self.build_host()?);
        RestExposure::build_router(host, custom_routes)
    }

    /// Merges clones of the registered module configs with `LinksConfig::merge`.
    fn merge_configs(&self) -> Result<LinksConfig> {
        Ok(LinksConfig::merge(self.configs.clone()))
    }

    /// Builds the host and returns a router that combines the REST router
    /// (with custom routes) and the gRPC router, both sharing the same host.
    ///
    /// # Errors
    ///
    /// Propagates errors from [`Self::build_host`] and from either router builder.
    #[cfg(feature = "grpc")]
    pub fn build_with_grpc(mut self) -> Result<Router> {
        use super::exposure::grpc::GrpcExposure;
        use super::router::combine_rest_and_grpc;

        let custom_routes = std::mem::take(&mut self.custom_routes);
        let host = Arc::new(self.build_host()?);
        let rest_router = RestExposure::build_router(host.clone(), custom_routes)?;
        let grpc_router = GrpcExposure::build_router_no_fallback(host)?;
        Ok(combine_rest_and_grpc(rest_router, grpc_router))
    }

    /// Builds the REST router, binds a TCP listener on `addr` and serves until
    /// a shutdown signal (Ctrl+C, or SIGTERM on Unix) is received.
    ///
    /// # Errors
    ///
    /// Fails if the router cannot be built, the address cannot be bound, or the
    /// server returns an error.
    pub async fn serve(self, addr: &str) -> Result<()> {
        let app = self.build()?;
        Self::serve_router(app, addr, "").await
    }

    /// Same as [`Self::serve`], but serves the combined REST + gRPC router.
    ///
    /// # Errors
    ///
    /// Fails if the router cannot be built, the address cannot be bound, or the
    /// server returns an error.
    #[cfg(feature = "grpc")]
    pub async fn serve_with_grpc(self, addr: &str) -> Result<()> {
        let app = self.build_with_grpc()?;
        Self::serve_router(app, addr, " (REST + gRPC)").await
    }

    /// Binds `addr` and serves `app` with graceful shutdown; `listening_suffix`
    /// is appended to the "Server listening on" log line.
    async fn serve_router(app: Router, addr: &str, listening_suffix: &str) -> Result<()> {
        let listener = TcpListener::bind(addr).await?;
        tracing::info!("Server listening on {}{}", addr, listening_suffix);
        axum::serve(listener, app)
            .with_graceful_shutdown(shutdown_signal())
            .await?;
        tracing::info!("Server shutdown complete");
        Ok(())
    }
}
impl Default for ServerBuilder {
fn default() -> Self {
Self::new()
}
}
async fn shutdown_signal() {
use tokio::signal;
let ctrl_c = async {
signal::ctrl_c()
.await
.expect("failed to install Ctrl+C handler");
};
#[cfg(unix)]
let terminate = async {
signal::unix::signal(signal::unix::SignalKind::terminate())
.expect("failed to install SIGTERM handler")
.recv()
.await;
};
#[cfg(not(unix))]
let terminate = std::future::pending::<()>();
tokio::select! {
_ = ctrl_c => {
tracing::info!("Received Ctrl+C signal, initiating graceful shutdown...");
},
_ = terminate => {
tracing::info!("Received SIGTERM signal, initiating graceful shutdown...");
},
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{EntityAuthConfig, EntityConfig, LinksConfig};
    use crate::core::LinkDefinition;
    use crate::core::module::Module;
    use crate::server::entity_registry::EntityRegistry;
    use crate::storage::InMemoryLinkService;
    use std::sync::Arc;

    // ---- fixtures -------------------------------------------------------

    /// Entity config with the given names and default auth.
    fn entity(singular: &str, plural: &str) -> EntityConfig {
        EntityConfig {
            singular: singular.to_string(),
            plural: plural.to_string(),
            auth: EntityAuthConfig::default(),
        }
    }

    /// Fresh builder that already carries an in-memory link service.
    fn builder_with_link_service() -> ServerBuilder {
        ServerBuilder::new().with_link_service(InMemoryLinkService::new())
    }

    /// Asserts the builder fields that must be empty right after construction.
    fn assert_pristine(builder: &ServerBuilder) {
        assert!(builder.link_service.is_none());
        assert!(builder.configs.is_empty());
        assert!(builder.modules.is_empty());
        assert!(builder.custom_routes.is_empty());
        assert!(builder.event_bus.is_none());
    }

    /// Module stub with a fixed config and no fetchers or creators.
    struct StubModule {
        name: &'static str,
        entity_types: Vec<&'static str>,
        config: LinksConfig,
    }

    impl StubModule {
        /// One `order` entity, no links.
        fn single_entity() -> Self {
            let config = LinksConfig {
                entities: vec![entity("order", "orders")],
                links: Vec::new(),
                validation_rules: None,
                events: None,
                sinks: None,
            };
            Self {
                name: "stub",
                entity_types: vec!["order"],
                config,
            }
        }

        /// `user` and `car` entities joined by an `owner` link.
        fn with_link() -> Self {
            let owner_link = LinkDefinition {
                link_type: "owner".to_string(),
                source_type: "user".to_string(),
                target_type: "car".to_string(),
                forward_route_name: "cars-owned".to_string(),
                reverse_route_name: "users-owners".to_string(),
                description: Some("User owns a car".to_string()),
                required_fields: None,
                auth: None,
            };
            let config = LinksConfig {
                entities: vec![entity("user", "users"), entity("car", "cars")],
                links: vec![owner_link],
                validation_rules: None,
                events: None,
                sinks: None,
            };
            Self {
                name: "linked_stub",
                entity_types: vec!["user", "car"],
                config,
            }
        }
    }

    impl Module for StubModule {
        fn name(&self) -> &str {
            self.name
        }

        fn entity_types(&self) -> Vec<&str> {
            self.entity_types.iter().copied().collect()
        }

        fn links_config(&self) -> anyhow::Result<LinksConfig> {
            Ok(self.config.clone())
        }

        fn register_entities(&self, _registry: &mut EntityRegistry) {}

        fn get_entity_fetcher(
            &self,
            _entity_type: &str,
        ) -> Option<Arc<dyn crate::core::EntityFetcher>> {
            None
        }

        fn get_entity_creator(
            &self,
            _entity_type: &str,
        ) -> Option<Arc<dyn crate::core::EntityCreator>> {
            None
        }
    }

    /// Module whose config loading always fails.
    struct FailingModule;

    impl Module for FailingModule {
        fn name(&self) -> &str {
            "failing"
        }

        fn entity_types(&self) -> Vec<&str> {
            Vec::new()
        }

        fn links_config(&self) -> anyhow::Result<LinksConfig> {
            Err(anyhow::anyhow!("config load failed"))
        }

        fn register_entities(&self, _registry: &mut EntityRegistry) {}

        fn get_entity_fetcher(
            &self,
            _entity_type: &str,
        ) -> Option<Arc<dyn crate::core::EntityFetcher>> {
            None
        }

        fn get_entity_creator(
            &self,
            _entity_type: &str,
        ) -> Option<Arc<dyn crate::core::EntityCreator>> {
            None
        }
    }

    /// Module that declares an events section and one in-app sink.
    struct StubModuleWithSinks;

    impl Module for StubModuleWithSinks {
        fn name(&self) -> &str {
            "with_sinks"
        }

        fn entity_types(&self) -> Vec<&str> {
            vec!["user"]
        }

        fn links_config(&self) -> anyhow::Result<LinksConfig> {
            use crate::config::events::EventsConfig;
            use crate::config::sinks::{SinkConfig, SinkType};

            let in_app_sink = SinkConfig {
                name: "in-app-notif".to_string(),
                sink_type: SinkType::InApp,
                config: Default::default(),
            };
            Ok(LinksConfig {
                entities: vec![entity("user", "users")],
                links: Vec::new(),
                validation_rules: None,
                events: Some(EventsConfig::default()),
                sinks: Some(vec![in_app_sink]),
            })
        }

        fn register_entities(&self, _registry: &mut EntityRegistry) {}

        fn get_entity_fetcher(
            &self,
            _entity_type: &str,
        ) -> Option<Arc<dyn crate::core::EntityFetcher>> {
            None
        }

        fn get_entity_creator(
            &self,
            _entity_type: &str,
        ) -> Option<Arc<dyn crate::core::EntityCreator>> {
            None
        }
    }

    // ---- construction and setters ---------------------------------------

    #[test]
    fn test_new_creates_empty_builder() {
        assert_pristine(&ServerBuilder::new());
    }

    #[test]
    fn test_default_is_same_as_new() {
        assert_pristine(&ServerBuilder::default());
    }

    #[test]
    fn test_with_link_service_sets_service() {
        let configured = builder_with_link_service();
        assert!(configured.link_service.is_some());
    }

    #[test]
    fn test_with_event_bus_sets_bus() {
        let configured = ServerBuilder::new().with_event_bus(1024);
        assert!(configured.event_bus.is_some());
    }

    #[test]
    fn test_with_custom_routes_appends_router() {
        let configured = ServerBuilder::new()
            .with_custom_routes(Router::new())
            .with_custom_routes(Router::new());
        assert_eq!(configured.custom_routes.len(), 2);
    }

    // ---- module registration --------------------------------------------

    #[test]
    fn test_register_module_stores_config_and_module() {
        let configured = ServerBuilder::new()
            .register_module(StubModule::single_entity())
            .expect("register_module should succeed");
        assert_eq!(configured.configs.len(), 1);
        assert_eq!(configured.modules.len(), 1);
    }

    #[test]
    fn test_register_multiple_modules() {
        let configured = ServerBuilder::new()
            .register_module(StubModule::single_entity())
            .expect("first module should register")
            .register_module(StubModule::with_link())
            .expect("second module should register");
        assert_eq!(configured.configs.len(), 2);
        assert_eq!(configured.modules.len(), 2);
    }

    #[test]
    fn test_register_module_failing_config_returns_error() {
        let outcome = ServerBuilder::new().register_module(FailingModule);
        assert!(outcome.is_err());
        let message = outcome.err().expect("should be Err").to_string();
        assert!(
            message.contains("config load failed"),
            "error should contain cause: {}",
            message
        );
    }

    // ---- build_host ------------------------------------------------------

    #[test]
    fn test_build_host_without_link_service_fails() {
        let outcome = ServerBuilder::new()
            .register_module(StubModule::single_entity())
            .expect("register should succeed")
            .build_host();
        assert!(outcome.is_err());
        let message = outcome.err().expect("should be Err").to_string();
        assert!(
            message.contains("LinkService is required"),
            "error should mention LinkService: {}",
            message
        );
    }

    #[test]
    fn test_build_host_single_module() {
        let host = builder_with_link_service()
            .register_module(StubModule::single_entity())
            .expect("register should succeed")
            .build_host()
            .expect("build_host should succeed");
        assert_eq!(host.config.entities.len(), 1);
        assert_eq!(host.config.entities[0].singular, "order");
        assert!(host.event_bus.is_none());
    }

    #[test]
    fn test_build_host_multi_module_merges_configs() {
        let host = builder_with_link_service()
            .register_module(StubModule::single_entity())
            .expect("register first should succeed")
            .register_module(StubModule::with_link())
            .expect("register second should succeed")
            .build_host()
            .expect("build_host should succeed");
        let has_entity =
            |name: &str| host.config.entities.iter().any(|e| e.singular.as_str() == name);
        assert!(has_entity("order"), "should contain order");
        assert!(has_entity("user"), "should contain user");
        assert!(has_entity("car"), "should contain car");
    }

    #[test]
    fn test_build_host_with_event_bus_attaches_bus() {
        let host = builder_with_link_service()
            .with_event_bus(16)
            .register_module(StubModule::single_entity())
            .expect("register should succeed")
            .build_host()
            .expect("build_host should succeed");
        assert!(host.event_bus().is_some());
    }

    #[test]
    fn test_build_host_no_modules_empty_config() {
        let host = builder_with_link_service()
            .build_host()
            .expect("build_host with no modules should succeed");
        assert!(host.config.entities.is_empty());
        assert!(host.config.links.is_empty());
    }

    // ---- build (router) --------------------------------------------------

    #[test]
    fn test_build_produces_router() {
        let _router: Router = builder_with_link_service()
            .register_module(StubModule::single_entity())
            .expect("register should succeed")
            .build()
            .expect("build should produce a Router");
    }

    #[test]
    fn test_build_without_link_service_fails() {
        let outcome = ServerBuilder::new()
            .register_module(StubModule::single_entity())
            .expect("register should succeed")
            .build();
        assert!(outcome.is_err());
    }

    #[test]
    fn test_build_with_custom_routes() {
        use axum::routing::get;

        let extra_routes = Router::new().route("/custom", get(|| async { "ok" }));
        let _router: Router = builder_with_link_service()
            .with_custom_routes(extra_routes)
            .register_module(StubModule::single_entity())
            .expect("register should succeed")
            .build()
            .expect("build should succeed with custom routes");
    }

    #[test]
    fn test_fluent_chaining_full_pipeline() {
        let outcome = builder_with_link_service()
            .with_event_bus(256)
            .with_custom_routes(Router::new())
            .register_module(StubModule::with_link())
            .expect("register should succeed")
            .build();
        assert!(outcome.is_ok(), "full fluent pipeline should succeed");
    }

    // ---- event pipeline auto-wiring --------------------------------------

    #[test]
    fn test_auto_wire_sinks_from_config() {
        let host = builder_with_link_service()
            .with_event_bus(16)
            .register_module(StubModuleWithSinks)
            .expect("register should succeed")
            .build_host()
            .expect("build_host should succeed");

        assert!(host.sink_registry().is_some());
        let sinks = host.sink_registry().unwrap();
        assert!(sinks.get("in-app-notif").is_some());
        assert_eq!(sinks.len(), 1);

        assert!(host.notification_store().is_some());
        assert!(host.device_token_store().is_some());
        assert!(host.preferences_store().is_some());
        assert!(host.event_log().is_some());
    }

    #[test]
    fn test_no_auto_wire_without_sinks_config() {
        let host = builder_with_link_service()
            .register_module(StubModule::single_entity())
            .expect("register should succeed")
            .build_host()
            .expect("build_host should succeed");

        assert!(host.sink_registry().is_none());
        assert!(host.notification_store().is_none());
        assert!(host.device_token_store().is_none());
        assert!(host.preferences_store().is_none());
        assert!(host.event_log().is_none());
    }

    #[test]
    fn test_manual_sink_registry_overrides_auto_wire() {
        let empty_registry = SinkRegistry::new();
        let host = builder_with_link_service()
            .with_sink_registry(empty_registry)
            .register_module(StubModuleWithSinks)
            .expect("register should succeed")
            .build_host()
            .expect("build_host should succeed");

        let sinks = host.sink_registry().unwrap();
        assert!(sinks.is_empty());
        assert!(sinks.get("in-app-notif").is_none());
    }

    #[test]
    fn test_manual_stores_used_in_auto_wire() {
        let supplied = Arc::new(crate::events::sinks::in_app::NotificationStore::new());
        let expected = supplied.clone();
        let host = builder_with_link_service()
            .with_notification_store(supplied)
            .register_module(StubModuleWithSinks)
            .expect("register should succeed")
            .build_host()
            .expect("build_host should succeed");

        assert!(Arc::ptr_eq(host.notification_store().unwrap(), &expected));
    }

    #[test]
    fn test_retro_compatible_no_sinks_no_events() {
        let host = builder_with_link_service()
            .with_event_bus(16)
            .register_module(StubModule::single_entity())
            .expect("register should succeed")
            .build_host()
            .expect("build_host should succeed");

        assert!(host.event_bus().is_some());
        assert!(host.sink_registry().is_none());
        assert!(host.event_log().is_none());
        assert_eq!(host.config.entities.len(), 1);
    }
}