Skip to main content

this/server/
builder.rs

1//! ServerBuilder for fluent API to build HTTP servers
2
3use super::entity_registry::EntityRegistry;
4use super::exposure::RestExposure;
5use super::host::ServerHost;
6use crate::config::LinksConfig;
7use crate::core::events::EventBus;
8use crate::core::module::Module;
9use crate::core::service::LinkService;
10use crate::core::{EntityCreator, EntityFetcher};
11use crate::events::SinkFactory;
12use crate::events::sinks::SinkRegistry;
13use crate::events::sinks::device_tokens::DeviceTokenStore;
14use crate::events::sinks::in_app::NotificationStore;
15use crate::events::sinks::preferences::NotificationPreferencesStore;
16use anyhow::Result;
17use axum::Router;
18use std::collections::HashMap;
19use std::sync::Arc;
20use tokio::net::TcpListener;
21
22/// Builder for creating HTTP servers with auto-registered routes
23///
24/// # Example
25///
26/// ```ignore
27/// let app = ServerBuilder::new()
28///     .with_link_service(InMemoryLinkService::new())
29///     .register_module(MyModule)
30///     .build()?;
31/// ```
32pub struct ServerBuilder {
33    link_service: Option<Arc<dyn LinkService>>,
34    entity_registry: EntityRegistry,
35    configs: Vec<LinksConfig>,
36    modules: Vec<Arc<dyn Module>>,
37    custom_routes: Vec<Router>,
38    event_bus: Option<EventBus>,
39
40    // Manual overrides for event system stores
41    sink_registry: Option<SinkRegistry>,
42    notification_store: Option<Arc<NotificationStore>>,
43    device_token_store: Option<Arc<DeviceTokenStore>>,
44    preferences_store: Option<Arc<NotificationPreferencesStore>>,
45}
46
47impl ServerBuilder {
48    /// Create a new ServerBuilder
49    pub fn new() -> Self {
50        Self {
51            link_service: None,
52            entity_registry: EntityRegistry::new(),
53            configs: Vec::new(),
54            modules: Vec::new(),
55            custom_routes: Vec::new(),
56            event_bus: None,
57            sink_registry: None,
58            notification_store: None,
59            device_token_store: None,
60            preferences_store: None,
61        }
62    }
63
64    /// Set the link service (required)
65    pub fn with_link_service(mut self, service: impl LinkService + 'static) -> Self {
66        self.link_service = Some(Arc::new(service));
67        self
68    }
69
70    /// Add custom routes to the server
71    ///
72    /// Use this to add routes that don't fit the CRUD pattern, such as:
73    /// - Authentication endpoints (/login, /logout)
74    /// - OAuth flows (/oauth/token, /oauth/callback)
75    /// - Webhooks (/webhooks/stripe)
76    /// - Custom business logic endpoints
77    ///
78    /// # Example
79    ///
80    /// ```ignore
81    /// use axum::{Router, routing::{post, get}, Json};
82    /// use serde_json::json;
83    ///
84    /// let auth_routes = Router::new()
85    ///     .route("/login", post(login_handler))
86    ///     .route("/logout", post(logout_handler))
87    ///     .route("/oauth/token", post(oauth_token_handler));
88    ///
89    /// ServerBuilder::new()
90    ///     .with_link_service(service)
91    ///     .with_custom_routes(auth_routes)
92    ///     .register_module(module)?
93    ///     .build()?;
94    /// ```
95    pub fn with_custom_routes(mut self, routes: Router) -> Self {
96        self.custom_routes.push(routes);
97        self
98    }
99
100    /// Enable the event bus for real-time notifications
101    ///
102    /// When enabled, REST/GraphQL handlers will publish events for mutations,
103    /// and real-time exposures (WebSocket, SSE) can subscribe to receive them.
104    ///
105    /// # Arguments
106    ///
107    /// * `capacity` - Buffer size for the broadcast channel (recommended: 1024)
108    ///
109    /// # Example
110    ///
111    /// ```ignore
112    /// ServerBuilder::new()
113    ///     .with_link_service(service)
114    ///     .with_event_bus(1024)
115    ///     .register_module(module)?
116    ///     .build_host()?;
117    /// ```
118    pub fn with_event_bus(mut self, capacity: usize) -> Self {
119        self.event_bus = Some(EventBus::new(capacity));
120        self
121    }
122
123    /// Provide a pre-built sink registry (overrides auto-wiring from config)
124    ///
125    /// Use this when you need full control over which sinks are registered.
126    /// If not provided and `sinks` is present in config, sinks are auto-wired.
127    pub fn with_sink_registry(mut self, registry: SinkRegistry) -> Self {
128        self.sink_registry = Some(registry);
129        self
130    }
131
132    /// Provide a pre-built notification store
133    ///
134    /// If not provided, one is auto-created when InApp sinks are configured.
135    pub fn with_notification_store(mut self, store: Arc<NotificationStore>) -> Self {
136        self.notification_store = Some(store);
137        self
138    }
139
140    /// Provide a pre-built device token store
141    ///
142    /// If not provided, one is auto-created when sinks are configured.
143    pub fn with_device_token_store(mut self, store: Arc<DeviceTokenStore>) -> Self {
144        self.device_token_store = Some(store);
145        self
146    }
147
148    /// Provide a pre-built notification preferences store
149    ///
150    /// If not provided, one is auto-created when sinks are configured.
151    pub fn with_preferences_store(mut self, store: Arc<NotificationPreferencesStore>) -> Self {
152        self.preferences_store = Some(store);
153        self
154    }
155
156    /// Register a module
157    ///
158    /// This will:
159    /// 1. Load the module's configuration
160    /// 2. Register all entities from the module
161    /// 3. Store the module for entity fetching
162    pub fn register_module(mut self, module: impl Module + 'static) -> Result<Self> {
163        let module = Arc::new(module);
164
165        // Load the module's configuration
166        let config = module.links_config()?;
167        self.configs.push(config);
168
169        // Register entities from the module
170        module.register_entities(&mut self.entity_registry);
171
172        // Store module for fetchers
173        self.modules.push(module);
174
175        Ok(self)
176    }
177
178    /// Build the transport-agnostic host
179    ///
180    /// This generates a `ServerHost` that can be used with any exposure type
181    /// (REST, GraphQL, gRPC, etc.).
182    ///
183    /// # Returns
184    ///
185    /// Returns a `ServerHost` containing all framework state.
186    pub fn build_host(mut self) -> Result<ServerHost> {
187        // Merge all configs
188        let merged_config = self.merge_configs()?;
189
190        // Extract link service
191        let link_service = self
192            .link_service
193            .take()
194            .ok_or_else(|| anyhow::anyhow!("LinkService is required. Call .with_link_service()"))?;
195
196        // Build entity fetchers map from all modules
197        let mut fetchers_map: HashMap<String, Arc<dyn EntityFetcher>> = HashMap::new();
198        for module in &self.modules {
199            for entity_type in module.entity_types() {
200                if let Some(fetcher) = module.get_entity_fetcher(entity_type) {
201                    fetchers_map.insert(entity_type.to_string(), fetcher);
202                }
203            }
204        }
205
206        // Build entity creators map from all modules
207        let mut creators_map: HashMap<String, Arc<dyn EntityCreator>> = HashMap::new();
208        for module in &self.modules {
209            for entity_type in module.entity_types() {
210                if let Some(creator) = module.get_entity_creator(entity_type) {
211                    creators_map.insert(entity_type.to_string(), creator);
212                }
213            }
214        }
215
216        // Build the host
217        let mut host = ServerHost::from_builder_components(
218            link_service,
219            merged_config,
220            self.entity_registry,
221            fetchers_map,
222            creators_map,
223        )?;
224
225        // Attach event bus if configured
226        if let Some(event_bus) = self.event_bus.take() {
227            host = host.with_event_bus(event_bus);
228        }
229
230        // Auto-wire event pipeline from config (sinks section)
231        let has_sinks = host.config.sinks.as_ref().is_some_and(|s| !s.is_empty());
232
233        if has_sinks || self.sink_registry.is_some() {
234            // Build or use provided stores
235            let notification_store = self
236                .notification_store
237                .take()
238                .unwrap_or_else(|| Arc::new(NotificationStore::new()));
239            let preferences_store = self
240                .preferences_store
241                .take()
242                .unwrap_or_else(|| Arc::new(NotificationPreferencesStore::new()));
243            let device_token_store = self
244                .device_token_store
245                .take()
246                .unwrap_or_else(|| Arc::new(DeviceTokenStore::new()));
247
248            // Build or use provided sink registry
249            let sink_registry = if let Some(registry) = self.sink_registry.take() {
250                registry
251            } else if let Some(ref sink_configs) = host.config.sinks {
252                let factory = SinkFactory::with_stores(
253                    notification_store.clone(),
254                    preferences_store.clone(),
255                    device_token_store.clone(),
256                );
257                factory.build_registry(sink_configs)
258            } else {
259                SinkRegistry::new()
260            };
261
262            // Attach everything to the host
263            host = host
264                .with_notification_store(notification_store)
265                .with_preferences_store(preferences_store)
266                .with_device_token_store(device_token_store)
267                .with_sink_registry(sink_registry);
268
269            tracing::info!("event pipeline auto-wired from config");
270        }
271
272        // Auto-wire event log if events section is present
273        if host.config.events.is_some() {
274            let event_log = Arc::new(crate::events::InMemoryEventLog::new());
275            host = host.with_event_log(event_log);
276            tracing::info!("event log auto-wired (InMemoryEventLog)");
277        }
278
279        Ok(host)
280    }
281
282    /// Build the final REST router
283    ///
284    /// This generates:
285    /// - CRUD routes for all registered entities
286    /// - Link routes (bidirectional)
287    /// - Introspection routes
288    ///
289    /// Note: This is a convenience method that builds the host and immediately
290    /// exposes it via REST. For other exposure types, use `build_host_arc()`.
291    pub fn build(mut self) -> Result<Router> {
292        let custom_routes = std::mem::take(&mut self.custom_routes);
293        let host = Arc::new(self.build_host()?);
294        RestExposure::build_router(host, custom_routes)
295    }
296
297    /// Merge all configurations from registered modules
298    fn merge_configs(&self) -> Result<LinksConfig> {
299        Ok(LinksConfig::merge(self.configs.clone()))
300    }
301
302    /// Build a combined REST + gRPC router
303    ///
304    /// This is a convenience method that builds both REST and gRPC routers
305    /// from the registered modules and merges them safely into a single router.
306    ///
307    /// Internally, it uses [`GrpcExposure::build_router_no_fallback`](super::GrpcExposure::build_router_no_fallback) for the
308    /// gRPC side (no fallback) and [`RestExposure::build_router`] for REST
309    /// (with its nested link path fallback), then merges them via
310    /// [`combine_rest_and_grpc`](super::router::combine_rest_and_grpc).
311    ///
312    /// # Example
313    ///
314    /// ```ignore
315    /// let app = ServerBuilder::new()
316    ///     .with_link_service(InMemoryLinkService::new())
317    ///     .register_module(MyModule)?
318    ///     .build_with_grpc()?;
319    ///
320    /// let listener = TcpListener::bind("127.0.0.1:3000").await?;
321    /// axum::serve(listener, app).await?;
322    /// ```
323    #[cfg(feature = "grpc")]
324    pub fn build_with_grpc(mut self) -> Result<Router> {
325        use super::exposure::grpc::GrpcExposure;
326        use super::router::combine_rest_and_grpc;
327
328        let custom_routes = std::mem::take(&mut self.custom_routes);
329        let host = Arc::new(self.build_host()?);
330
331        let rest_router = RestExposure::build_router(host.clone(), custom_routes)?;
332        let grpc_router = GrpcExposure::build_router_no_fallback(host)?;
333
334        Ok(combine_rest_and_grpc(rest_router, grpc_router))
335    }
336
337    /// Serve the application with graceful shutdown
338    ///
339    /// This will:
340    /// - Bind to the provided address
341    /// - Start serving requests
342    /// - Handle SIGTERM and SIGINT (Ctrl+C) for graceful shutdown
343    ///
344    /// # Example
345    ///
346    /// ```ignore
347    /// ServerBuilder::new()
348    ///     .with_link_service(service)
349    ///     .register_module(module)?
350    ///     .serve("127.0.0.1:3000").await?;
351    /// ```
352    pub async fn serve(self, addr: &str) -> Result<()> {
353        let app = self.build()?;
354        let listener = TcpListener::bind(addr).await?;
355
356        tracing::info!("Server listening on {}", addr);
357
358        axum::serve(listener, app)
359            .with_graceful_shutdown(shutdown_signal())
360            .await?;
361
362        tracing::info!("Server shutdown complete");
363        Ok(())
364    }
365
366    /// Serve the application with REST + gRPC and graceful shutdown
367    ///
368    /// This is the gRPC equivalent of [`serve`](Self::serve). It builds a combined
369    /// REST+gRPC router and serves it on the given address.
370    ///
371    /// # Example
372    ///
373    /// ```ignore
374    /// ServerBuilder::new()
375    ///     .with_link_service(service)
376    ///     .register_module(module)?
377    ///     .serve_with_grpc("127.0.0.1:3000").await?;
378    /// ```
379    #[cfg(feature = "grpc")]
380    pub async fn serve_with_grpc(self, addr: &str) -> Result<()> {
381        let app = self.build_with_grpc()?;
382        let listener = TcpListener::bind(addr).await?;
383
384        tracing::info!("Server listening on {} (REST + gRPC)", addr);
385
386        axum::serve(listener, app)
387            .with_graceful_shutdown(shutdown_signal())
388            .await?;
389
390        tracing::info!("Server shutdown complete");
391        Ok(())
392    }
393}
394
395impl Default for ServerBuilder {
396    fn default() -> Self {
397        Self::new()
398    }
399}
400
401/// Wait for shutdown signal (SIGTERM or Ctrl+C)
402async fn shutdown_signal() {
403    use tokio::signal;
404
405    let ctrl_c = async {
406        signal::ctrl_c()
407            .await
408            .expect("failed to install Ctrl+C handler");
409    };
410
411    #[cfg(unix)]
412    let terminate = async {
413        signal::unix::signal(signal::unix::SignalKind::terminate())
414            .expect("failed to install SIGTERM handler")
415            .recv()
416            .await;
417    };
418
419    #[cfg(not(unix))]
420    let terminate = std::future::pending::<()>();
421
422    tokio::select! {
423        _ = ctrl_c => {
424            tracing::info!("Received Ctrl+C signal, initiating graceful shutdown...");
425        },
426        _ = terminate => {
427            tracing::info!("Received SIGTERM signal, initiating graceful shutdown...");
428        },
429    }
430}
431
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{EntityAuthConfig, EntityConfig, LinksConfig};
    use crate::core::LinkDefinition;
    use crate::core::module::Module;
    use crate::server::entity_registry::EntityRegistry;
    use crate::storage::InMemoryLinkService;
    use std::sync::Arc;

    // ── Shared fixtures ──────────────────────────────────────────────────

    /// Entity config with the given names and default auth.
    fn entity(singular: &str, plural: &str) -> EntityConfig {
        EntityConfig {
            singular: singular.to_string(),
            plural: plural.to_string(),
            auth: EntityAuthConfig::default(),
        }
    }

    /// Links config carrying only entities and links (no rules, events, sinks).
    fn plain_config(entities: Vec<EntityConfig>, links: Vec<LinkDefinition>) -> LinksConfig {
        LinksConfig {
            entities,
            links,
            validation_rules: None,
            events: None,
            sinks: None,
        }
    }

    /// Builder that already has an in-memory link service installed.
    fn builder_with_service() -> ServerBuilder {
        ServerBuilder::new().with_link_service(InMemoryLinkService::new())
    }

    /// Assert that nothing has been configured on `builder` yet.
    fn assert_pristine(builder: &ServerBuilder) {
        assert!(builder.link_service.is_none());
        assert!(builder.configs.is_empty());
        assert!(builder.modules.is_empty());
        assert!(builder.custom_routes.is_empty());
        assert!(builder.event_bus.is_none());
    }

    // ── Mock modules ─────────────────────────────────────────────────────

    /// Minimal module for builder tests: fixed name, entity types and config,
    /// no entity descriptors, no fetchers, no creators.
    struct StubModule {
        name: &'static str,
        entity_types: Vec<&'static str>,
        config: LinksConfig,
    }

    impl StubModule {
        /// Module exposing a single `order` entity and no links.
        fn single_entity() -> Self {
            Self {
                name: "stub",
                entity_types: vec!["order"],
                config: plain_config(vec![entity("order", "orders")], vec![]),
            }
        }

        /// Module exposing `user` and `car` joined by an `owner` link.
        fn with_link() -> Self {
            let ownership = LinkDefinition {
                link_type: "owner".to_string(),
                source_type: "user".to_string(),
                target_type: "car".to_string(),
                forward_route_name: "cars-owned".to_string(),
                reverse_route_name: "users-owners".to_string(),
                description: Some("User owns a car".to_string()),
                required_fields: None,
                auth: None,
            };

            Self {
                name: "linked_stub",
                entity_types: vec!["user", "car"],
                config: plain_config(
                    vec![entity("user", "users"), entity("car", "cars")],
                    vec![ownership],
                ),
            }
        }
    }

    impl Module for StubModule {
        fn name(&self) -> &str {
            self.name
        }

        fn entity_types(&self) -> Vec<&str> {
            self.entity_types.to_vec()
        }

        fn links_config(&self) -> anyhow::Result<LinksConfig> {
            Ok(self.config.clone())
        }

        fn register_entities(&self, _registry: &mut EntityRegistry) {
            // The stub contributes no entity descriptors.
        }

        fn get_entity_fetcher(
            &self,
            _entity_type: &str,
        ) -> Option<Arc<dyn crate::core::EntityFetcher>> {
            None
        }

        fn get_entity_creator(
            &self,
            _entity_type: &str,
        ) -> Option<Arc<dyn crate::core::EntityCreator>> {
            None
        }
    }

    /// Module whose `links_config()` always fails.
    struct FailingModule;

    impl Module for FailingModule {
        fn name(&self) -> &str {
            "failing"
        }

        fn entity_types(&self) -> Vec<&str> {
            Vec::new()
        }

        fn links_config(&self) -> anyhow::Result<LinksConfig> {
            Err(anyhow::anyhow!("config load failed"))
        }

        fn register_entities(&self, _registry: &mut EntityRegistry) {}

        fn get_entity_fetcher(
            &self,
            _entity_type: &str,
        ) -> Option<Arc<dyn crate::core::EntityFetcher>> {
            None
        }

        fn get_entity_creator(
            &self,
            _entity_type: &str,
        ) -> Option<Arc<dyn crate::core::EntityCreator>> {
            None
        }
    }

    /// Module whose config declares an `events` section and one InApp sink.
    struct StubModuleWithSinks;

    impl Module for StubModuleWithSinks {
        fn name(&self) -> &str {
            "with_sinks"
        }

        fn entity_types(&self) -> Vec<&str> {
            vec!["user"]
        }

        fn links_config(&self) -> anyhow::Result<LinksConfig> {
            use crate::config::events::EventsConfig;
            use crate::config::sinks::{SinkConfig, SinkType};

            let in_app_sink = SinkConfig {
                name: "in-app-notif".to_string(),
                sink_type: SinkType::InApp,
                config: Default::default(),
            };

            Ok(LinksConfig {
                entities: vec![entity("user", "users")],
                links: vec![],
                validation_rules: None,
                events: Some(EventsConfig::default()),
                sinks: Some(vec![in_app_sink]),
            })
        }

        fn register_entities(&self, _registry: &mut EntityRegistry) {}

        fn get_entity_fetcher(
            &self,
            _entity_type: &str,
        ) -> Option<Arc<dyn crate::core::EntityFetcher>> {
            None
        }

        fn get_entity_creator(
            &self,
            _entity_type: &str,
        ) -> Option<Arc<dyn crate::core::EntityCreator>> {
            None
        }
    }

    // ── Constructor tests ────────────────────────────────────────────────

    #[test]
    fn test_new_creates_empty_builder() {
        assert_pristine(&ServerBuilder::new());
    }

    #[test]
    fn test_default_is_same_as_new() {
        assert_pristine(&ServerBuilder::default());
    }

    // ── with_link_service ────────────────────────────────────────────────

    #[test]
    fn test_with_link_service_sets_service() {
        let configured = builder_with_service();
        assert!(configured.link_service.is_some());
    }

    // ── with_event_bus ───────────────────────────────────────────────────

    #[test]
    fn test_with_event_bus_sets_bus() {
        let configured = ServerBuilder::new().with_event_bus(1024);
        assert!(configured.event_bus.is_some());
    }

    // ── with_custom_routes ───────────────────────────────────────────────

    #[test]
    fn test_with_custom_routes_appends_router() {
        let configured = ServerBuilder::new()
            .with_custom_routes(Router::new())
            .with_custom_routes(Router::new());
        assert_eq!(configured.custom_routes.len(), 2);
    }

    // ── register_module ──────────────────────────────────────────────────

    #[test]
    fn test_register_module_stores_config_and_module() {
        let configured = ServerBuilder::new()
            .register_module(StubModule::single_entity())
            .expect("register_module should succeed");
        assert_eq!(configured.configs.len(), 1);
        assert_eq!(configured.modules.len(), 1);
    }

    #[test]
    fn test_register_multiple_modules() {
        let configured = ServerBuilder::new()
            .register_module(StubModule::single_entity())
            .expect("first module should register")
            .register_module(StubModule::with_link())
            .expect("second module should register");
        assert_eq!(configured.configs.len(), 2);
        assert_eq!(configured.modules.len(), 2);
    }

    #[test]
    fn test_register_module_failing_config_returns_error() {
        let outcome = ServerBuilder::new().register_module(FailingModule);
        assert!(outcome.is_err());

        let message = outcome.err().expect("should be Err").to_string();
        assert!(
            message.contains("config load failed"),
            "error should contain cause: {}",
            message
        );
    }

    // ── build_host ───────────────────────────────────────────────────────

    #[test]
    fn test_build_host_without_link_service_fails() {
        let outcome = ServerBuilder::new()
            .register_module(StubModule::single_entity())
            .expect("register should succeed")
            .build_host();
        assert!(outcome.is_err());

        let message = outcome.err().expect("should be Err").to_string();
        assert!(
            message.contains("LinkService is required"),
            "error should mention LinkService: {}",
            message
        );
    }

    #[test]
    fn test_build_host_single_module() {
        let host = builder_with_service()
            .register_module(StubModule::single_entity())
            .expect("register should succeed")
            .build_host()
            .expect("build_host should succeed");

        assert_eq!(host.config.entities.len(), 1);
        assert_eq!(host.config.entities[0].singular, "order");
        assert!(host.event_bus.is_none());
    }

    #[test]
    fn test_build_host_multi_module_merges_configs() {
        let host = builder_with_service()
            .register_module(StubModule::single_entity())
            .expect("register first should succeed")
            .register_module(StubModule::with_link())
            .expect("register second should succeed")
            .build_host()
            .expect("build_host should succeed");

        // The merged config must hold the entities of both modules.
        let has_entity = |name: &str| host.config.entities.iter().any(|e| e.singular == name);
        assert!(has_entity("order"), "should contain order");
        assert!(has_entity("user"), "should contain user");
        assert!(has_entity("car"), "should contain car");
    }

    #[test]
    fn test_build_host_with_event_bus_attaches_bus() {
        let host = builder_with_service()
            .with_event_bus(16)
            .register_module(StubModule::single_entity())
            .expect("register should succeed")
            .build_host()
            .expect("build_host should succeed");

        assert!(host.event_bus().is_some());
    }

    #[test]
    fn test_build_host_no_modules_empty_config() {
        let host = builder_with_service()
            .build_host()
            .expect("build_host with no modules should succeed");

        assert!(host.config.entities.is_empty());
        assert!(host.config.links.is_empty());
    }

    // ── build (REST router) ──────────────────────────────────────────────

    #[test]
    fn test_build_produces_router() {
        // The Router cannot be inspected deeply; building without a panic or
        // an error is the whole assertion.
        let _router = builder_with_service()
            .register_module(StubModule::single_entity())
            .expect("register should succeed")
            .build()
            .expect("build should produce a Router");
    }

    #[test]
    fn test_build_without_link_service_fails() {
        let outcome = ServerBuilder::new()
            .register_module(StubModule::single_entity())
            .expect("register should succeed")
            .build();
        assert!(outcome.is_err());
    }

    #[test]
    fn test_build_with_custom_routes() {
        use axum::routing::get;

        let extra = Router::new().route("/custom", get(|| async { "ok" }));
        let _router = builder_with_service()
            .with_custom_routes(extra)
            .register_module(StubModule::single_entity())
            .expect("register should succeed")
            .build()
            .expect("build should succeed with custom routes");
    }

    // ── Fluent chaining ──────────────────────────────────────────────────

    #[test]
    fn test_fluent_chaining_full_pipeline() {
        let outcome = builder_with_service()
            .with_event_bus(256)
            .with_custom_routes(Router::new())
            .register_module(StubModule::with_link())
            .expect("register should succeed")
            .build();
        assert!(outcome.is_ok(), "full fluent pipeline should succeed");
    }

    // ── Auto-wiring tests ────────────────────────────────────────────────

    #[test]
    fn test_auto_wire_sinks_from_config() {
        let host = builder_with_service()
            .with_event_bus(16)
            .register_module(StubModuleWithSinks)
            .expect("register should succeed")
            .build_host()
            .expect("build_host should succeed");

        // The InApp sink from the config is registered.
        assert!(host.sink_registry().is_some());
        let sinks = host.sink_registry().unwrap();
        assert!(sinks.get("in-app-notif").is_some());
        assert_eq!(sinks.len(), 1);

        // All three stores were created.
        assert!(host.notification_store().is_some());
        assert!(host.device_token_store().is_some());
        assert!(host.preferences_store().is_some());

        // The `events` section switches on the event log.
        assert!(host.event_log().is_some());
    }

    #[test]
    fn test_no_auto_wire_without_sinks_config() {
        let host = builder_with_service()
            .register_module(StubModule::single_entity())
            .expect("register should succeed")
            .build_host()
            .expect("build_host should succeed");

        // No sinks in config → nothing is wired.
        assert!(host.sink_registry().is_none());
        assert!(host.notification_store().is_none());
        assert!(host.device_token_store().is_none());
        assert!(host.preferences_store().is_none());
        assert!(host.event_log().is_none());
    }

    #[test]
    fn test_manual_sink_registry_overrides_auto_wire() {
        let host = builder_with_service()
            .with_sink_registry(SinkRegistry::new())
            .register_module(StubModuleWithSinks)
            .expect("register should succeed")
            .build_host()
            .expect("build_host should succeed");

        // The manual (empty) registry is used as-is, so the InApp sink named
        // in the config is not created.
        let sinks = host.sink_registry().unwrap();
        assert!(sinks.is_empty());
        assert!(sinks.get("in-app-notif").is_none());
    }

    #[test]
    fn test_manual_stores_used_in_auto_wire() {
        let supplied = Arc::new(crate::events::sinks::in_app::NotificationStore::new());
        let witness = Arc::clone(&supplied);

        let host = builder_with_service()
            .with_notification_store(supplied)
            .register_module(StubModuleWithSinks)
            .expect("register should succeed")
            .build_host()
            .expect("build_host should succeed");

        // The host must hold the very same store instance that was supplied.
        assert!(Arc::ptr_eq(host.notification_store().unwrap(), &witness));
    }

    #[test]
    fn test_retro_compatible_no_sinks_no_events() {
        // A config without sinks or events behaves as before: only the bus.
        let host = builder_with_service()
            .with_event_bus(16)
            .register_module(StubModule::single_entity())
            .expect("register should succeed")
            .build_host()
            .expect("build_host should succeed");

        assert!(host.event_bus().is_some());
        assert!(host.sink_registry().is_none());
        assert!(host.event_log().is_none());
        assert_eq!(host.config.entities.len(), 1);
    }
}
933}