1use super::entity_registry::EntityRegistry;
4use super::exposure::RestExposure;
5use super::host::ServerHost;
6use crate::config::LinksConfig;
7use crate::core::events::EventBus;
8use crate::core::module::Module;
9use crate::core::service::LinkService;
10use crate::core::{EntityCreator, EntityFetcher};
11use crate::events::SinkFactory;
12use crate::events::sinks::SinkRegistry;
13use crate::events::sinks::device_tokens::DeviceTokenStore;
14use crate::events::sinks::in_app::NotificationStore;
15use crate::events::sinks::preferences::NotificationPreferencesStore;
16use anyhow::Result;
17use axum::Router;
18use std::collections::HashMap;
19use std::sync::Arc;
20use tokio::net::TcpListener;
21
22pub struct ServerBuilder {
33 link_service: Option<Arc<dyn LinkService>>,
34 entity_registry: EntityRegistry,
35 configs: Vec<LinksConfig>,
36 modules: Vec<Arc<dyn Module>>,
37 custom_routes: Vec<Router>,
38 event_bus: Option<EventBus>,
39
40 sink_registry: Option<SinkRegistry>,
42 notification_store: Option<Arc<NotificationStore>>,
43 device_token_store: Option<Arc<DeviceTokenStore>>,
44 preferences_store: Option<Arc<NotificationPreferencesStore>>,
45}
46
47impl ServerBuilder {
48 pub fn new() -> Self {
50 Self {
51 link_service: None,
52 entity_registry: EntityRegistry::new(),
53 configs: Vec::new(),
54 modules: Vec::new(),
55 custom_routes: Vec::new(),
56 event_bus: None,
57 sink_registry: None,
58 notification_store: None,
59 device_token_store: None,
60 preferences_store: None,
61 }
62 }
63
64 pub fn with_link_service(mut self, service: impl LinkService + 'static) -> Self {
66 self.link_service = Some(Arc::new(service));
67 self
68 }
69
70 pub fn with_custom_routes(mut self, routes: Router) -> Self {
96 self.custom_routes.push(routes);
97 self
98 }
99
100 pub fn with_event_bus(mut self, capacity: usize) -> Self {
119 self.event_bus = Some(EventBus::new(capacity));
120 self
121 }
122
123 pub fn with_sink_registry(mut self, registry: SinkRegistry) -> Self {
128 self.sink_registry = Some(registry);
129 self
130 }
131
132 pub fn with_notification_store(mut self, store: Arc<NotificationStore>) -> Self {
136 self.notification_store = Some(store);
137 self
138 }
139
140 pub fn with_device_token_store(mut self, store: Arc<DeviceTokenStore>) -> Self {
144 self.device_token_store = Some(store);
145 self
146 }
147
148 pub fn with_preferences_store(mut self, store: Arc<NotificationPreferencesStore>) -> Self {
152 self.preferences_store = Some(store);
153 self
154 }
155
156 pub fn register_module(mut self, module: impl Module + 'static) -> Result<Self> {
163 let module = Arc::new(module);
164
165 let config = module.links_config()?;
167 self.configs.push(config);
168
169 module.register_entities(&mut self.entity_registry);
171
172 self.modules.push(module);
174
175 Ok(self)
176 }
177
178 pub fn build_host(mut self) -> Result<ServerHost> {
187 let merged_config = self.merge_configs()?;
189
190 let link_service = self
192 .link_service
193 .take()
194 .ok_or_else(|| anyhow::anyhow!("LinkService is required. Call .with_link_service()"))?;
195
196 let mut fetchers_map: HashMap<String, Arc<dyn EntityFetcher>> = HashMap::new();
198 for module in &self.modules {
199 for entity_type in module.entity_types() {
200 if let Some(fetcher) = module.get_entity_fetcher(entity_type) {
201 fetchers_map.insert(entity_type.to_string(), fetcher);
202 }
203 }
204 }
205
206 let mut creators_map: HashMap<String, Arc<dyn EntityCreator>> = HashMap::new();
208 for module in &self.modules {
209 for entity_type in module.entity_types() {
210 if let Some(creator) = module.get_entity_creator(entity_type) {
211 creators_map.insert(entity_type.to_string(), creator);
212 }
213 }
214 }
215
216 let mut host = ServerHost::from_builder_components(
218 link_service,
219 merged_config,
220 self.entity_registry,
221 fetchers_map,
222 creators_map,
223 )?;
224
225 if let Some(event_bus) = self.event_bus.take() {
227 host = host.with_event_bus(event_bus);
228 }
229
230 let has_sinks = host.config.sinks.as_ref().is_some_and(|s| !s.is_empty());
232
233 if has_sinks || self.sink_registry.is_some() {
234 let notification_store = self
236 .notification_store
237 .take()
238 .unwrap_or_else(|| Arc::new(NotificationStore::new()));
239 let preferences_store = self
240 .preferences_store
241 .take()
242 .unwrap_or_else(|| Arc::new(NotificationPreferencesStore::new()));
243 let device_token_store = self
244 .device_token_store
245 .take()
246 .unwrap_or_else(|| Arc::new(DeviceTokenStore::new()));
247
248 let sink_registry = if let Some(registry) = self.sink_registry.take() {
250 registry
251 } else if let Some(ref sink_configs) = host.config.sinks {
252 let factory = SinkFactory::with_stores(
253 notification_store.clone(),
254 preferences_store.clone(),
255 device_token_store.clone(),
256 );
257 factory.build_registry(sink_configs)
258 } else {
259 SinkRegistry::new()
260 };
261
262 host = host
264 .with_notification_store(notification_store)
265 .with_preferences_store(preferences_store)
266 .with_device_token_store(device_token_store)
267 .with_sink_registry(sink_registry);
268
269 tracing::info!("event pipeline auto-wired from config");
270 }
271
272 if host.config.events.is_some() {
274 let event_log = Arc::new(crate::events::InMemoryEventLog::new());
275 host = host.with_event_log(event_log);
276 tracing::info!("event log auto-wired (InMemoryEventLog)");
277 }
278
279 Ok(host)
280 }
281
282 pub fn build(mut self) -> Result<Router> {
292 let custom_routes = std::mem::take(&mut self.custom_routes);
293 let host = Arc::new(self.build_host()?);
294 RestExposure::build_router(host, custom_routes)
295 }
296
297 fn merge_configs(&self) -> Result<LinksConfig> {
299 Ok(LinksConfig::merge(self.configs.clone()))
300 }
301
302 #[cfg(feature = "grpc")]
324 pub fn build_with_grpc(mut self) -> Result<Router> {
325 use super::exposure::grpc::GrpcExposure;
326 use super::router::combine_rest_and_grpc;
327
328 let custom_routes = std::mem::take(&mut self.custom_routes);
329 let host = Arc::new(self.build_host()?);
330
331 let rest_router = RestExposure::build_router(host.clone(), custom_routes)?;
332 let grpc_router = GrpcExposure::build_router_no_fallback(host)?;
333
334 Ok(combine_rest_and_grpc(rest_router, grpc_router))
335 }
336
337 pub async fn serve(self, addr: &str) -> Result<()> {
353 let app = self.build()?;
354 let listener = TcpListener::bind(addr).await?;
355
356 tracing::info!("Server listening on {}", addr);
357
358 axum::serve(listener, app)
359 .with_graceful_shutdown(shutdown_signal())
360 .await?;
361
362 tracing::info!("Server shutdown complete");
363 Ok(())
364 }
365
366 #[cfg(feature = "grpc")]
380 pub async fn serve_with_grpc(self, addr: &str) -> Result<()> {
381 let app = self.build_with_grpc()?;
382 let listener = TcpListener::bind(addr).await?;
383
384 tracing::info!("Server listening on {} (REST + gRPC)", addr);
385
386 axum::serve(listener, app)
387 .with_graceful_shutdown(shutdown_signal())
388 .await?;
389
390 tracing::info!("Server shutdown complete");
391 Ok(())
392 }
393}
394
395impl Default for ServerBuilder {
396 fn default() -> Self {
397 Self::new()
398 }
399}
400
401async fn shutdown_signal() {
403 use tokio::signal;
404
405 let ctrl_c = async {
406 signal::ctrl_c()
407 .await
408 .expect("failed to install Ctrl+C handler");
409 };
410
411 #[cfg(unix)]
412 let terminate = async {
413 signal::unix::signal(signal::unix::SignalKind::terminate())
414 .expect("failed to install SIGTERM handler")
415 .recv()
416 .await;
417 };
418
419 #[cfg(not(unix))]
420 let terminate = std::future::pending::<()>();
421
422 tokio::select! {
423 _ = ctrl_c => {
424 tracing::info!("Received Ctrl+C signal, initiating graceful shutdown...");
425 },
426 _ = terminate => {
427 tracing::info!("Received SIGTERM signal, initiating graceful shutdown...");
428 },
429 }
430}
431
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{EntityAuthConfig, EntityConfig, LinksConfig};
    use crate::core::LinkDefinition;
    use crate::core::module::Module;
    use crate::server::entity_registry::EntityRegistry;
    use crate::storage::InMemoryLinkService;
    use std::sync::Arc;

    // ---- shared fixtures -------------------------------------------------

    /// Entity config with default auth for the given singular/plural names.
    fn entity(singular: &str, plural: &str) -> EntityConfig {
        EntityConfig {
            singular: singular.to_string(),
            plural: plural.to_string(),
            auth: EntityAuthConfig::default(),
        }
    }

    /// Links config carrying only entities and links (no validation rules,
    /// events, or sinks).
    fn plain_config(entities: Vec<EntityConfig>, links: Vec<LinkDefinition>) -> LinksConfig {
        LinksConfig {
            entities,
            links,
            validation_rules: None,
            events: None,
            sinks: None,
        }
    }

    /// Fresh builder that already has an in-memory link service attached.
    fn builder_with_link_service() -> ServerBuilder {
        ServerBuilder::new().with_link_service(InMemoryLinkService::new())
    }

    /// Asserts the builder fields that a freshly created builder leaves empty.
    fn assert_pristine(builder: &ServerBuilder) {
        assert!(builder.link_service.is_none());
        assert!(builder.configs.is_empty());
        assert!(builder.modules.is_empty());
        assert!(builder.custom_routes.is_empty());
        assert!(builder.event_bus.is_none());
    }

    /// Module stub with a fixed name, entity types, and configuration; it
    /// provides no fetchers or creators.
    struct StubModule {
        name: &'static str,
        entity_types: Vec<&'static str>,
        config: LinksConfig,
    }

    impl StubModule {
        /// Stub exposing a single `order` entity and no links.
        fn single_entity() -> Self {
            Self {
                name: "stub",
                entity_types: vec!["order"],
                config: plain_config(vec![entity("order", "orders")], vec![]),
            }
        }

        /// Stub exposing `user` and `car` entities joined by an `owner` link.
        fn with_link() -> Self {
            let owner_link = LinkDefinition {
                link_type: "owner".into(),
                source_type: "user".into(),
                target_type: "car".into(),
                forward_route_name: "cars-owned".into(),
                reverse_route_name: "users-owners".into(),
                description: Some("User owns a car".into()),
                required_fields: None,
                auth: None,
            };

            Self {
                name: "linked_stub",
                entity_types: vec!["user", "car"],
                config: plain_config(
                    vec![entity("user", "users"), entity("car", "cars")],
                    vec![owner_link],
                ),
            }
        }
    }

    impl Module for StubModule {
        fn name(&self) -> &str {
            self.name
        }

        fn entity_types(&self) -> Vec<&str> {
            self.entity_types.to_vec()
        }

        fn links_config(&self) -> anyhow::Result<LinksConfig> {
            Ok(self.config.clone())
        }

        fn register_entities(&self, _registry: &mut EntityRegistry) {}

        fn get_entity_fetcher(
            &self,
            _entity_type: &str,
        ) -> Option<Arc<dyn crate::core::EntityFetcher>> {
            None
        }

        fn get_entity_creator(
            &self,
            _entity_type: &str,
        ) -> Option<Arc<dyn crate::core::EntityCreator>> {
            None
        }
    }

    /// Module whose configuration always fails to load.
    struct FailingModule;

    impl Module for FailingModule {
        fn name(&self) -> &str {
            "failing"
        }

        fn entity_types(&self) -> Vec<&str> {
            Vec::new()
        }

        fn links_config(&self) -> anyhow::Result<LinksConfig> {
            Err(anyhow::anyhow!("config load failed"))
        }

        fn register_entities(&self, _registry: &mut EntityRegistry) {}

        fn get_entity_fetcher(
            &self,
            _entity_type: &str,
        ) -> Option<Arc<dyn crate::core::EntityFetcher>> {
            None
        }

        fn get_entity_creator(
            &self,
            _entity_type: &str,
        ) -> Option<Arc<dyn crate::core::EntityCreator>> {
            None
        }
    }

    // ---- construction ----------------------------------------------------

    #[test]
    fn test_new_creates_empty_builder() {
        assert_pristine(&ServerBuilder::new());
    }

    #[test]
    fn test_default_is_same_as_new() {
        assert_pristine(&ServerBuilder::default());
    }

    // ---- individual setters ----------------------------------------------

    #[test]
    fn test_with_link_service_sets_service() {
        let configured = builder_with_link_service();
        assert!(configured.link_service.is_some());
    }

    #[test]
    fn test_with_event_bus_sets_bus() {
        let configured = ServerBuilder::new().with_event_bus(1024);
        assert!(configured.event_bus.is_some());
    }

    #[test]
    fn test_with_custom_routes_appends_router() {
        let configured = ServerBuilder::new()
            .with_custom_routes(Router::new())
            .with_custom_routes(Router::new());
        assert_eq!(configured.custom_routes.len(), 2);
    }

    // ---- module registration ---------------------------------------------

    #[test]
    fn test_register_module_stores_config_and_module() {
        let registered = ServerBuilder::new()
            .register_module(StubModule::single_entity())
            .expect("register_module should succeed");
        assert_eq!(registered.configs.len(), 1);
        assert_eq!(registered.modules.len(), 1);
    }

    #[test]
    fn test_register_multiple_modules() {
        let registered = ServerBuilder::new()
            .register_module(StubModule::single_entity())
            .expect("first module should register")
            .register_module(StubModule::with_link())
            .expect("second module should register");
        assert_eq!(registered.configs.len(), 2);
        assert_eq!(registered.modules.len(), 2);
    }

    #[test]
    fn test_register_module_failing_config_returns_error() {
        let outcome = ServerBuilder::new().register_module(FailingModule);
        assert!(outcome.is_err());
        let err_msg = outcome.err().expect("should be Err").to_string();
        assert!(
            err_msg.contains("config load failed"),
            "error should contain cause: {}",
            err_msg
        );
    }

    // ---- build_host ------------------------------------------------------

    #[test]
    fn test_build_host_without_link_service_fails() {
        let outcome = ServerBuilder::new()
            .register_module(StubModule::single_entity())
            .expect("register should succeed")
            .build_host();
        assert!(outcome.is_err());
        let err_msg = outcome.err().expect("should be Err").to_string();
        assert!(
            err_msg.contains("LinkService is required"),
            "error should mention LinkService: {}",
            err_msg
        );
    }

    #[test]
    fn test_build_host_single_module() {
        let host = builder_with_link_service()
            .register_module(StubModule::single_entity())
            .expect("register should succeed")
            .build_host()
            .expect("build_host should succeed");

        assert_eq!(host.config.entities.len(), 1);
        assert_eq!(host.config.entities[0].singular, "order");
        assert!(host.event_bus.is_none());
    }

    #[test]
    fn test_build_host_multi_module_merges_configs() {
        let host = builder_with_link_service()
            .register_module(StubModule::single_entity())
            .expect("register first should succeed")
            .register_module(StubModule::with_link())
            .expect("register second should succeed")
            .build_host()
            .expect("build_host should succeed");

        // Entities from both modules must be present in the merged config.
        let has_entity =
            |wanted: &str| host.config.entities.iter().any(|e| e.singular.as_str() == wanted);
        assert!(has_entity("order"), "should contain order");
        assert!(has_entity("user"), "should contain user");
        assert!(has_entity("car"), "should contain car");
    }

    #[test]
    fn test_build_host_with_event_bus_attaches_bus() {
        let host = builder_with_link_service()
            .with_event_bus(16)
            .register_module(StubModule::single_entity())
            .expect("register should succeed")
            .build_host()
            .expect("build_host should succeed");

        assert!(host.event_bus().is_some());
    }

    #[test]
    fn test_build_host_no_modules_empty_config() {
        let host = builder_with_link_service()
            .build_host()
            .expect("build_host with no modules should succeed");

        assert!(host.config.entities.is_empty());
        assert!(host.config.links.is_empty());
    }

    // ---- build (router) --------------------------------------------------

    #[test]
    fn test_build_produces_router() {
        let _router = builder_with_link_service()
            .register_module(StubModule::single_entity())
            .expect("register should succeed")
            .build()
            .expect("build should produce a Router");
    }

    #[test]
    fn test_build_without_link_service_fails() {
        let outcome = ServerBuilder::new()
            .register_module(StubModule::single_entity())
            .expect("register should succeed")
            .build();
        assert!(outcome.is_err());
    }

    #[test]
    fn test_build_with_custom_routes() {
        use axum::routing::get;

        let custom = Router::new().route("/custom", get(|| async { "ok" }));
        let _router = builder_with_link_service()
            .with_custom_routes(custom)
            .register_module(StubModule::single_entity())
            .expect("register should succeed")
            .build()
            .expect("build should succeed with custom routes");
    }

    // ---- fluent chaining -------------------------------------------------

    #[test]
    fn test_fluent_chaining_full_pipeline() {
        let outcome = builder_with_link_service()
            .with_event_bus(256)
            .with_custom_routes(Router::new())
            .register_module(StubModule::with_link())
            .expect("register should succeed")
            .build();
        assert!(outcome.is_ok(), "full fluent pipeline should succeed");
    }

    // ---- sink / event-log auto-wiring ------------------------------------

    /// Module whose configuration declares an `events` section and a single
    /// in-app sink named `in-app-notif`.
    struct StubModuleWithSinks;

    impl Module for StubModuleWithSinks {
        fn name(&self) -> &str {
            "with_sinks"
        }

        fn entity_types(&self) -> Vec<&str> {
            vec!["user"]
        }

        fn links_config(&self) -> anyhow::Result<LinksConfig> {
            use crate::config::events::EventsConfig;
            use crate::config::sinks::{SinkConfig, SinkType};

            let in_app_sink = SinkConfig {
                name: "in-app-notif".to_string(),
                sink_type: SinkType::InApp,
                config: Default::default(),
            };

            Ok(LinksConfig {
                entities: vec![entity("user", "users")],
                links: vec![],
                validation_rules: None,
                events: Some(EventsConfig::default()),
                sinks: Some(vec![in_app_sink]),
            })
        }

        fn register_entities(&self, _registry: &mut EntityRegistry) {}

        fn get_entity_fetcher(
            &self,
            _entity_type: &str,
        ) -> Option<Arc<dyn crate::core::EntityFetcher>> {
            None
        }

        fn get_entity_creator(
            &self,
            _entity_type: &str,
        ) -> Option<Arc<dyn crate::core::EntityCreator>> {
            None
        }
    }

    #[test]
    fn test_auto_wire_sinks_from_config() {
        let host = builder_with_link_service()
            .with_event_bus(16)
            .register_module(StubModuleWithSinks)
            .expect("register should succeed")
            .build_host()
            .expect("build_host should succeed");

        // The registry is built from config and holds exactly the declared sink.
        assert!(host.sink_registry().is_some());
        let registry = host.sink_registry().unwrap();
        assert!(registry.get("in-app-notif").is_some());
        assert_eq!(registry.len(), 1);

        // All three stores are created on demand.
        assert!(host.notification_store().is_some());
        assert!(host.device_token_store().is_some());
        assert!(host.preferences_store().is_some());

        // The `events` section triggers the event log.
        assert!(host.event_log().is_some());
    }

    #[test]
    fn test_no_auto_wire_without_sinks_config() {
        let host = builder_with_link_service()
            .register_module(StubModule::single_entity())
            .expect("register should succeed")
            .build_host()
            .expect("build_host should succeed");

        assert!(host.sink_registry().is_none());
        assert!(host.notification_store().is_none());
        assert!(host.device_token_store().is_none());
        assert!(host.preferences_store().is_none());
        assert!(host.event_log().is_none());
    }

    #[test]
    fn test_manual_sink_registry_overrides_auto_wire() {
        let manual_registry = SinkRegistry::new();

        let host = builder_with_link_service()
            .with_sink_registry(manual_registry)
            .register_module(StubModuleWithSinks)
            .expect("register should succeed")
            .build_host()
            .expect("build_host should succeed");

        // The empty manual registry is used as-is; the configured sink is absent.
        let registry = host.sink_registry().unwrap();
        assert!(registry.is_empty());
        assert!(registry.get("in-app-notif").is_none());
    }

    #[test]
    fn test_manual_stores_used_in_auto_wire() {
        let custom_store = Arc::new(crate::events::sinks::in_app::NotificationStore::new());
        let expected = Arc::clone(&custom_store);

        let host = builder_with_link_service()
            .with_notification_store(custom_store)
            .register_module(StubModuleWithSinks)
            .expect("register should succeed")
            .build_host()
            .expect("build_host should succeed");

        // The host must hold the very same allocation that was supplied.
        assert!(Arc::ptr_eq(host.notification_store().unwrap(), &expected));
    }

    #[test]
    fn test_retro_compatible_no_sinks_no_events() {
        let host = builder_with_link_service()
            .with_event_bus(16)
            .register_module(StubModule::single_entity())
            .expect("register should succeed")
            .build_host()
            .expect("build_host should succeed");

        assert!(host.event_bus().is_some());
        assert!(host.sink_registry().is_none());
        assert!(host.event_log().is_none());
        assert_eq!(host.config.entities.len(), 1);
    }
}