Skip to main content

modkit/runtime/
host_runtime.rs

1//! Host Runtime - orchestrates the full `ModKit` lifecycle
2//!
3//! This module contains the `HostRuntime` type that owns and coordinates
4//! the execution of all lifecycle phases.
5//!
6//! High-level phase order:
7//! - `pre_init` (system modules only)
8//! - DB migrations (modules with DB capability)
9//! - `init` (all modules)
10//! - `post_init` (system modules only; runs after *all* `init` complete)
11//! - REST wiring (modules with REST capability; requires a single REST host)
12//! - gRPC registration (modules with gRPC capability; requires a single gRPC hub)
13//! - start/stop (stateful modules)
14//! - `OoP` spawn / wait / stop (host-only orchestration)
15
16use axum::Router;
17use std::collections::HashSet;
18use std::sync::Arc;
19use tokio_util::sync::CancellationToken;
20use uuid::Uuid;
21
22use crate::backends::OopSpawnConfig;
23use crate::client_hub::ClientHub;
24use crate::config::ConfigProvider;
25use crate::context::ModuleContextBuilder;
26use crate::registry::{
27    ApiGatewayCap, DatabaseCap, GrpcHubCap, ModuleEntry, ModuleRegistry, RegistryError, RestApiCap,
28    RunnableCap, SystemCap,
29};
30use crate::runtime::{GrpcInstallerStore, ModuleManager, OopSpawnOptions, SystemContext};
31
32/// How the runtime should provide DBs to modules.
33#[derive(Clone)]
34pub enum DbOptions {
35    /// No database integration. `ModuleCtx::db()` will be `None`, `db_required()` will error.
36    None,
37    /// Use a `DbManager` to handle database connections with Figment-based configuration.
38    Manager(Arc<modkit_db::DbManager>),
39}
40
/// Name of the environment variable through which the host hands the
/// directory endpoint to spawned `OoP` modules.
pub const MODKIT_DIRECTORY_ENDPOINT_ENV: &str = "MODKIT_DIRECTORY_ENDPOINT";

/// Name of the environment variable through which the host hands the
/// rendered module configuration (JSON) to spawned `OoP` modules.
pub const MODKIT_MODULE_CONFIG_ENV: &str = "MODKIT_MODULE_CONFIG";
46
47/// `HostRuntime` owns the lifecycle orchestration for `ModKit`.
48///
49/// It encapsulates all runtime state and drives modules through the full lifecycle (see module docs).
50pub struct HostRuntime {
51    registry: ModuleRegistry,
52    ctx_builder: ModuleContextBuilder,
53    instance_id: Uuid,
54    module_manager: Arc<ModuleManager>,
55    grpc_installers: Arc<GrpcInstallerStore>,
56    #[allow(dead_code)]
57    client_hub: Arc<ClientHub>,
58    cancel: CancellationToken,
59    #[allow(dead_code)]
60    db_options: DbOptions,
61    /// `OoP` module spawn configuration and backend
62    oop_options: Option<OopSpawnOptions>,
63}
64
65impl HostRuntime {
66    /// Create a new `HostRuntime` instance.
67    ///
68    /// This prepares all runtime components but does not start any lifecycle phases.
69    pub fn new(
70        registry: ModuleRegistry,
71        modules_cfg: Arc<dyn ConfigProvider>,
72        db_options: DbOptions,
73        client_hub: Arc<ClientHub>,
74        cancel: CancellationToken,
75        instance_id: Uuid,
76        oop_options: Option<OopSpawnOptions>,
77    ) -> Self {
78        // Create runtime-owned components for system modules
79        let module_manager = Arc::new(ModuleManager::new());
80        let grpc_installers = Arc::new(GrpcInstallerStore::new());
81
82        // Build the context builder that will resolve per-module DbHandles
83        let db_manager = match &db_options {
84            DbOptions::Manager(mgr) => Some(mgr.clone()),
85            DbOptions::None => None,
86        };
87
88        let ctx_builder = ModuleContextBuilder::new(
89            instance_id,
90            modules_cfg,
91            client_hub.clone(),
92            cancel.clone(),
93            db_manager,
94        );
95
96        Self {
97            registry,
98            ctx_builder,
99            instance_id,
100            module_manager,
101            grpc_installers,
102            client_hub,
103            cancel,
104            db_options,
105            oop_options,
106        }
107    }
108
109    /// `PRE_INIT` phase: wire runtime internals into system modules.
110    ///
111    /// This phase runs before init and only for modules with the "system" capability.
112    ///
113    /// # Errors
114    /// Returns `RegistryError` if system wiring fails.
115    pub fn run_pre_init_phase(&self) -> Result<(), RegistryError> {
116        tracing::info!("Phase: pre_init");
117
118        let sys_ctx = SystemContext::new(
119            self.instance_id,
120            Arc::clone(&self.module_manager),
121            Arc::clone(&self.grpc_installers),
122        );
123
124        for entry in self.registry.modules() {
125            if let Some(sys_mod) = entry.caps.query::<SystemCap>() {
126                tracing::debug!(module = entry.name, "Running system pre_init");
127                sys_mod
128                    .pre_init(&sys_ctx)
129                    .map_err(|e| RegistryError::PreInit {
130                        module: entry.name,
131                        source: e,
132                    })?;
133            }
134        }
135
136        Ok(())
137    }
138
139    /// Helper: resolve context for a module with error mapping.
140    async fn module_context(
141        &self,
142        module_name: &'static str,
143    ) -> Result<crate::context::ModuleCtx, RegistryError> {
144        self.ctx_builder
145            .for_module(module_name)
146            .await
147            .map_err(|e| RegistryError::DbMigrate {
148                module: module_name,
149                source: e,
150            })
151    }
152
153    /// Helper: extract DB handle and module if both exist.
154    fn db_migration_target(
155        ctx: &crate::context::ModuleCtx,
156        db_module: Option<Arc<dyn crate::contracts::DatabaseCapability>>,
157    ) -> Option<(
158        Arc<modkit_db::DbHandle>,
159        Arc<dyn crate::contracts::DatabaseCapability>,
160    )> {
161        match (ctx.db_optional(), db_module) {
162            (Some(db), Some(dbm)) => Some((db, dbm)),
163            _ => None,
164        }
165    }
166
167    /// Helper: run migration for a single module.
168    async fn migrate_module(
169        module_name: &'static str,
170        db: &modkit_db::DbHandle,
171        db_module: Arc<dyn crate::contracts::DatabaseCapability>,
172    ) -> Result<(), RegistryError> {
173        tracing::debug!(module = module_name, "Running DB migration");
174        db_module
175            .migrate(db)
176            .await
177            .map_err(|source| RegistryError::DbMigrate {
178                module: module_name,
179                source,
180            })
181    }
182
183    /// DB MIGRATION phase: run migrations for all modules with DB capability.
184    ///
185    /// Runs before init, with system modules processed first.
186    async fn run_db_phase(&self) -> Result<(), RegistryError> {
187        tracing::info!("Phase: db (before init)");
188
189        for entry in self.registry.modules_by_system_priority() {
190            let ctx = self.module_context(entry.name).await?;
191            let db_module = entry.caps.query::<DatabaseCap>();
192
193            match Self::db_migration_target(&ctx, db_module.clone()) {
194                Some((db, dbm)) => {
195                    Self::migrate_module(entry.name, &db, dbm).await?;
196                }
197                None if db_module.is_some() => {
198                    tracing::debug!(
199                        module = entry.name,
200                        "Module has DbModule trait but no DB handle (no config)"
201                    );
202                }
203                None => {}
204            }
205        }
206
207        Ok(())
208    }
209
210    /// INIT phase: initialize all modules in topological order.
211    ///
212    /// System modules initialize first, followed by user modules.
213    async fn run_init_phase(&self) -> Result<(), RegistryError> {
214        tracing::info!("Phase: init");
215
216        for entry in self.registry.modules_by_system_priority() {
217            let ctx =
218                self.ctx_builder
219                    .for_module(entry.name)
220                    .await
221                    .map_err(|e| RegistryError::Init {
222                        module: entry.name,
223                        source: e,
224                    })?;
225            entry
226                .core
227                .init(&ctx)
228                .await
229                .map_err(|e| RegistryError::Init {
230                    module: entry.name,
231                    source: e,
232                })?;
233        }
234
235        Ok(())
236    }
237
238    /// `POST_INIT` phase: optional hook after ALL modules completed `init()`.
239    ///
240    /// This provides a global barrier between initialization-time registration
241    /// and subsequent phases that may rely on a fully-populated runtime registry.
242    ///
243    /// System modules run first, followed by user modules, preserving topo order.
244    async fn run_post_init_phase(&self) -> Result<(), RegistryError> {
245        tracing::info!("Phase: post_init");
246
247        let sys_ctx = SystemContext::new(
248            self.instance_id,
249            Arc::clone(&self.module_manager),
250            Arc::clone(&self.grpc_installers),
251        );
252
253        for entry in self.registry.modules_by_system_priority() {
254            if let Some(sys_mod) = entry.caps.query::<SystemCap>() {
255                sys_mod
256                    .post_init(&sys_ctx)
257                    .await
258                    .map_err(|e| RegistryError::PostInit {
259                        module: entry.name,
260                        source: e,
261                    })?;
262            }
263        }
264
265        Ok(())
266    }
267
268    /// REST phase: compose the router against the REST host.
269    ///
270    /// This is a synchronous phase that builds the final Router by:
271    /// 1. Preparing the host module
272    /// 2. Registering all REST providers
273    /// 3. Finalizing with `OpenAPI` endpoints
274    async fn run_rest_phase(&self) -> Result<Router, RegistryError> {
275        tracing::info!("Phase: rest (sync)");
276
277        let mut router = Router::new();
278
279        // Find host(s) and whether any rest modules exist
280        let host_count = self
281            .registry
282            .modules()
283            .iter()
284            .filter(|e| e.caps.has::<ApiGatewayCap>())
285            .count();
286
287        match host_count {
288            0 => {
289                return if self
290                    .registry
291                    .modules()
292                    .iter()
293                    .any(|e| e.caps.has::<RestApiCap>())
294                {
295                    Err(RegistryError::RestRequiresHost)
296                } else {
297                    Ok(router)
298                };
299            }
300            1 => { /* proceed */ }
301            _ => return Err(RegistryError::MultipleRestHosts),
302        }
303
304        // Resolve the single host entry and its module context
305        let host_idx = self
306            .registry
307            .modules()
308            .iter()
309            .position(|e| e.caps.has::<ApiGatewayCap>())
310            .ok_or(RegistryError::RestHostNotFoundAfterValidation)?;
311        let host_entry = &self.registry.modules()[host_idx];
312        let Some(host) = host_entry.caps.query::<ApiGatewayCap>() else {
313            return Err(RegistryError::RestHostMissingFromEntry);
314        };
315        let host_ctx = self
316            .ctx_builder
317            .for_module(host_entry.name)
318            .await
319            .map_err(|e| RegistryError::RestPrepare {
320                module: host_entry.name,
321                source: e,
322            })?;
323
324        // use host as the registry
325        let registry: &dyn crate::contracts::OpenApiRegistry = host.as_registry();
326
327        // 1) Host prepare: base Router / global middlewares / basic OAS meta
328        router =
329            host.rest_prepare(&host_ctx, router)
330                .map_err(|source| RegistryError::RestPrepare {
331                    module: host_entry.name,
332                    source,
333                })?;
334
335        // 2) Register all REST providers (in the current discovery order)
336        for e in self.registry.modules() {
337            if let Some(rest) = e.caps.query::<RestApiCap>() {
338                let ctx = self.ctx_builder.for_module(e.name).await.map_err(|err| {
339                    RegistryError::RestRegister {
340                        module: e.name,
341                        source: err,
342                    }
343                })?;
344                router = rest
345                    .register_rest(&ctx, router, registry)
346                    .map_err(|source| RegistryError::RestRegister {
347                        module: e.name,
348                        source,
349                    })?;
350            }
351        }
352
353        // 3) Host finalize: attach /openapi.json and /docs, persist Router if needed (no server start)
354        router = host.rest_finalize(&host_ctx, router).map_err(|source| {
355            RegistryError::RestFinalize {
356                module: host_entry.name,
357                source,
358            }
359        })?;
360
361        Ok(router)
362    }
363
364    /// gRPC registration phase: collect services from all grpc modules.
365    ///
366    /// Services are stored in the installer store for the `grpc_hub` to consume during start.
367    async fn run_grpc_phase(&self) -> Result<(), RegistryError> {
368        tracing::info!("Phase: grpc (registration)");
369
370        // If no grpc_hub and no grpc_services, skip the phase
371        if self.registry.grpc_hub.is_none() && self.registry.grpc_services.is_empty() {
372            return Ok(());
373        }
374
375        // If there are grpc_services but no hub, that's an error
376        if self.registry.grpc_hub.is_none() && !self.registry.grpc_services.is_empty() {
377            return Err(RegistryError::GrpcRequiresHub);
378        }
379
380        // If there's a hub, collect all services grouped by module and hand them off to the installer store
381        if let Some(hub_name) = &self.registry.grpc_hub {
382            let mut modules_data = Vec::new();
383            let mut seen = HashSet::new();
384
385            // Collect services from all grpc modules
386            for (module_name, service_module) in &self.registry.grpc_services {
387                let ctx = self
388                    .ctx_builder
389                    .for_module(module_name)
390                    .await
391                    .map_err(|err| RegistryError::GrpcRegister {
392                        module: module_name.clone(),
393                        source: err,
394                    })?;
395
396                let installers =
397                    service_module
398                        .get_grpc_services(&ctx)
399                        .await
400                        .map_err(|source| RegistryError::GrpcRegister {
401                            module: module_name.clone(),
402                            source,
403                        })?;
404
405                for reg in &installers {
406                    if !seen.insert(reg.service_name) {
407                        return Err(RegistryError::GrpcRegister {
408                            module: module_name.clone(),
409                            source: anyhow::anyhow!(
410                                "Duplicate gRPC service name: {}",
411                                reg.service_name
412                            ),
413                        });
414                    }
415                }
416
417                modules_data.push(crate::runtime::ModuleInstallers {
418                    module_name: module_name.clone(),
419                    installers,
420                });
421            }
422
423            self.grpc_installers
424                .set(crate::runtime::GrpcInstallerData {
425                    modules: modules_data,
426                })
427                .map_err(|source| RegistryError::GrpcRegister {
428                    module: hub_name.clone(),
429                    source,
430                })?;
431        }
432
433        Ok(())
434    }
435
436    /// START phase: start all stateful modules.
437    ///
438    /// System modules start first, followed by user modules.
439    async fn run_start_phase(&self) -> Result<(), RegistryError> {
440        tracing::info!("Phase: start");
441
442        for e in self.registry.modules_by_system_priority() {
443            if let Some(s) = e.caps.query::<RunnableCap>() {
444                tracing::debug!(
445                    module = e.name,
446                    is_system = e.caps.has::<SystemCap>(),
447                    "Starting stateful module"
448                );
449                s.start(self.cancel.clone())
450                    .await
451                    .map_err(|source| RegistryError::Start {
452                        module: e.name,
453                        source,
454                    })?;
455                tracing::info!(module = e.name, "Started module");
456            }
457        }
458
459        Ok(())
460    }
461
462    /// Stop a single module, logging errors but continuing execution.
463    async fn stop_one_module(entry: &ModuleEntry, cancel: CancellationToken) {
464        if let Some(s) = entry.caps.query::<RunnableCap>() {
465            match s.stop(cancel).await {
466                Err(err) => {
467                    tracing::warn!(module = entry.name, error = %err, "Failed to stop module");
468                }
469                _ => {
470                    tracing::info!(module = entry.name, "Stopped module");
471                }
472            }
473        }
474    }
475
476    /// STOP phase: stop all stateful modules in reverse order.
477    ///
478    /// Errors are logged but do not fail the shutdown process.
479    /// Note: `OoP` modules are stopped automatically by the backend when the
480    /// cancellation token is triggered.
481    async fn run_stop_phase(&self) -> Result<(), RegistryError> {
482        tracing::info!("Phase: stop");
483
484        for e in self.registry.modules().iter().rev() {
485            Self::stop_one_module(e, self.cancel.clone()).await;
486        }
487
488        Ok(())
489    }
490
491    /// `OoP` SPAWN phase: spawn out-of-process modules after start phase.
492    ///
493    /// This phase runs after `grpc_hub` is already listening, so we can pass
494    /// the real directory endpoint to `OoP` modules.
495    async fn run_oop_spawn_phase(&self) -> Result<(), RegistryError> {
496        let oop_opts = match &self.oop_options {
497            Some(opts) if !opts.modules.is_empty() => opts,
498            _ => return Ok(()),
499        };
500
501        tracing::info!("Phase: oop_spawn");
502
503        // Wait for grpc_hub to publish its endpoint (it runs async in start phase)
504        let directory_endpoint = self.wait_for_grpc_hub_endpoint().await;
505
506        for module_cfg in &oop_opts.modules {
507            // Build environment with directory endpoint and rendered config
508            // Note: User controls --config via execution.args in master config
509            let mut env = module_cfg.env.clone();
510            env.insert(
511                MODKIT_MODULE_CONFIG_ENV.to_owned(),
512                module_cfg.rendered_config_json.clone(),
513            );
514            if let Some(ref endpoint) = directory_endpoint {
515                env.insert(MODKIT_DIRECTORY_ENDPOINT_ENV.to_owned(), endpoint.clone());
516            }
517
518            // Use args from execution config as-is (user controls --config via args)
519            let args = module_cfg.args.clone();
520
521            let spawn_config = OopSpawnConfig {
522                module_name: module_cfg.module_name.clone(),
523                binary: module_cfg.binary.clone(),
524                args,
525                env,
526                working_directory: module_cfg.working_directory.clone(),
527            };
528
529            oop_opts
530                .backend
531                .spawn(spawn_config)
532                .await
533                .map_err(|e| RegistryError::OopSpawn {
534                    module: module_cfg.module_name.clone(),
535                    source: e,
536                })?;
537
538            tracing::info!(
539                module = %module_cfg.module_name,
540                directory_endpoint = ?directory_endpoint,
541                "Spawned OoP module via backend"
542            );
543        }
544
545        Ok(())
546    }
547
548    /// Wait for `grpc_hub` to publish its bound endpoint.
549    ///
550    /// Polls the `GrpcHubModule::bound_endpoint()` with a short interval until available or timeout.
551    /// Returns None if no `grpc_hub` is running or if it times out.
552    async fn wait_for_grpc_hub_endpoint(&self) -> Option<String> {
553        const POLL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(10);
554        const MAX_WAIT: std::time::Duration = std::time::Duration::from_secs(5);
555
556        // Find grpc_hub in registry
557        let grpc_hub = self
558            .registry
559            .modules()
560            .iter()
561            .find_map(|e| e.caps.query::<GrpcHubCap>());
562
563        let Some(hub) = grpc_hub else {
564            return None; // No grpc_hub registered
565        };
566
567        let start = std::time::Instant::now();
568
569        loop {
570            if let Some(endpoint) = hub.bound_endpoint() {
571                tracing::debug!(
572                    endpoint = %endpoint,
573                    elapsed_ms = start.elapsed().as_millis(),
574                    "gRPC hub endpoint available"
575                );
576                return Some(endpoint);
577            }
578
579            if start.elapsed() > MAX_WAIT {
580                tracing::warn!("Timed out waiting for gRPC hub to bind");
581                return None;
582            }
583
584            tokio::time::sleep(POLL_INTERVAL).await;
585        }
586    }
587
588    /// Run the full lifecycle: `pre_init` → DB → init → `post_init` → REST → gRPC → start → `OoP` spawn → wait → stop.
589    ///
590    /// This is the main entry point for orchestrating the complete module lifecycle.
591    ///
592    /// # Errors
593    /// Returns an error if any lifecycle phase fails.
594    pub async fn run_module_phases(self) -> anyhow::Result<()> {
595        // 1. Pre-init phase (before init, only for system modules)
596        self.run_pre_init_phase()?;
597
598        // 2. DB migration phase (system modules first)
599        self.run_db_phase().await?;
600
601        // 3. Init phase (system modules first)
602        self.run_init_phase().await?;
603
604        // 4. Post-init phase (barrier after ALL init; system modules only)
605        self.run_post_init_phase().await?;
606
607        // 5. REST phase (synchronous router composition)
608        let _router = self.run_rest_phase().await?;
609
610        // 6. gRPC registration phase
611        self.run_grpc_phase().await?;
612
613        // 7. Start phase
614        self.run_start_phase().await?;
615
616        // 8. OoP spawn phase (after grpc_hub is running)
617        self.run_oop_spawn_phase().await?;
618
619        // 9. Wait for cancellation
620        self.cancel.cancelled().await;
621
622        // 10. Stop phase
623        self.run_stop_phase().await?;
624
625        Ok(())
626    }
627}
628
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
    use super::*;
    use crate::context::ModuleCtx;
    use crate::contracts::{Module, RunnableCapability, SystemCapability};
    use crate::registry::RegistryBuilder;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use tokio::sync::Mutex;

    /// Shared log of module creation indices, appended in the order in which
    /// `stop` is called. A std mutex is enough: the lock is never held
    /// across an await point.
    type StopLog = Arc<std::sync::Mutex<Vec<usize>>>;

    /// Config provider that has no configuration for any module.
    struct EmptyConfigProvider;
    impl ConfigProvider for EmptyConfigProvider {
        fn get_module_config(&self, _module_name: &str) -> Option<&serde_json::Value> {
            None
        }
    }

    /// Build a runtime around `registry` with no DB, no `OoP` options, an
    /// empty configuration and fresh hub / cancellation token / instance id.
    fn runtime_for(registry: ModuleRegistry) -> HostRuntime {
        let config_provider: Arc<dyn ConfigProvider> = Arc::new(EmptyConfigProvider);
        HostRuntime::new(
            registry,
            config_provider,
            DbOptions::None,
            Arc::new(ClientHub::new()),
            CancellationToken::new(),
            Uuid::new_v4(),
            None,
        )
    }

    /// Minimal module whose `init` always succeeds.
    // Currently not referenced by any test in this module.
    #[derive(Default)]
    #[allow(dead_code)]
    struct DummyCore;
    #[async_trait::async_trait]
    impl Module for DummyCore {
        async fn init(&self, _ctx: &ModuleCtx) -> anyhow::Result<()> {
            Ok(())
        }
    }

    /// Stateful module that records when it was stopped.
    struct StopOrderTracker {
        /// Creation index of this module (0 for the first one created).
        my_order: usize,
        /// Shared log receiving `my_order` when `stop` runs.
        stop_log: StopLog,
    }

    impl StopOrderTracker {
        /// Take the next creation index from `counter` and attach the shared log.
        fn new(counter: &Arc<AtomicUsize>, stop_log: StopLog) -> Self {
            let my_order = counter.fetch_add(1, Ordering::SeqCst);
            Self { my_order, stop_log }
        }
    }

    #[async_trait::async_trait]
    impl Module for StopOrderTracker {
        async fn init(&self, _ctx: &ModuleCtx) -> anyhow::Result<()> {
            Ok(())
        }
    }

    #[async_trait::async_trait]
    impl RunnableCapability for StopOrderTracker {
        async fn start(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
            Ok(())
        }
        async fn stop(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
            // Record this module and derive its position in the stop sequence.
            let order = {
                let mut log = self.stop_log.lock().expect("stop log mutex poisoned");
                log.push(self.my_order);
                log.len() - 1
            };
            tracing::info!(
                my_order = self.my_order,
                stop_order = order,
                "Module stopped"
            );
            Ok(())
        }
    }

    #[tokio::test]
    async fn test_stop_phase_reverse_order() {
        let counter = Arc::new(AtomicUsize::new(0));
        let stop_log: StopLog = Arc::default();

        let module_a = Arc::new(StopOrderTracker::new(&counter, Arc::clone(&stop_log)));
        let module_b = Arc::new(StopOrderTracker::new(&counter, Arc::clone(&stop_log)));
        let module_c = Arc::new(StopOrderTracker::new(&counter, Arc::clone(&stop_log)));

        let mut builder = RegistryBuilder::default();
        builder.register_core_with_meta("a", &[], module_a.clone() as Arc<dyn Module>);
        builder.register_core_with_meta("b", &["a"], module_b.clone() as Arc<dyn Module>);
        builder.register_core_with_meta("c", &["b"], module_c.clone() as Arc<dyn Module>);

        builder.register_stateful_with_meta("a", module_a.clone() as Arc<dyn RunnableCapability>);
        builder.register_stateful_with_meta("b", module_b.clone() as Arc<dyn RunnableCapability>);
        builder.register_stateful_with_meta("c", module_c.clone() as Arc<dyn RunnableCapability>);

        let registry = builder.build_topo_sorted().unwrap();

        // Verify module order is a -> b -> c
        let module_names: Vec<_> = registry.modules().iter().map(|m| m.name).collect();
        assert_eq!(module_names, vec!["a", "b", "c"]);

        let runtime = runtime_for(registry);

        // Run stop phase
        runtime.run_stop_phase().await.unwrap();

        // Creation indices are a=0, b=1, c=2, so stopping in reverse registry
        // order (c, b, a) must be logged as [2, 1, 0].
        let stopped = stop_log.lock().expect("stop log mutex poisoned").clone();
        assert_eq!(stopped, vec![2, 1, 0]);
    }

    #[tokio::test]
    async fn test_stop_phase_continues_on_error() {
        /// Stateful module that counts stop attempts and optionally fails.
        struct FailingModule {
            should_fail: bool,
            stopped: Arc<AtomicUsize>,
        }

        #[async_trait::async_trait]
        impl Module for FailingModule {
            async fn init(&self, _ctx: &ModuleCtx) -> anyhow::Result<()> {
                Ok(())
            }
        }

        #[async_trait::async_trait]
        impl RunnableCapability for FailingModule {
            async fn start(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
                Ok(())
            }
            async fn stop(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
                self.stopped.fetch_add(1, Ordering::SeqCst);
                if self.should_fail {
                    anyhow::bail!("Intentional failure")
                }
                Ok(())
            }
        }

        let stopped = Arc::new(AtomicUsize::new(0));
        let make_module = |should_fail: bool| {
            Arc::new(FailingModule {
                should_fail,
                stopped: Arc::clone(&stopped),
            })
        };
        let module_a = make_module(false);
        let module_b = make_module(true);
        let module_c = make_module(false);

        let mut builder = RegistryBuilder::default();
        builder.register_core_with_meta("a", &[], module_a.clone() as Arc<dyn Module>);
        builder.register_core_with_meta("b", &["a"], module_b.clone() as Arc<dyn Module>);
        builder.register_core_with_meta("c", &["b"], module_c.clone() as Arc<dyn Module>);

        builder.register_stateful_with_meta("a", module_a.clone() as Arc<dyn RunnableCapability>);
        builder.register_stateful_with_meta("b", module_b.clone() as Arc<dyn RunnableCapability>);
        builder.register_stateful_with_meta("c", module_c.clone() as Arc<dyn RunnableCapability>);

        let runtime = runtime_for(builder.build_topo_sorted().unwrap());

        // Run stop phase - should not fail even though module_b fails
        runtime.run_stop_phase().await.unwrap();

        // All modules should have attempted to stop
        assert_eq!(stopped.load(Ordering::SeqCst), 3);
    }

    #[tokio::test]
    async fn test_post_init_runs_after_all_init_and_system_first() {
        /// Module that logs every `init` / `post_init` call into a shared list.
        #[derive(Clone)]
        struct TrackHooks {
            name: &'static str,
            events: Arc<Mutex<Vec<String>>>,
        }

        #[async_trait::async_trait]
        impl Module for TrackHooks {
            async fn init(&self, _ctx: &ModuleCtx) -> anyhow::Result<()> {
                self.events.lock().await.push(format!("init:{}", self.name));
                Ok(())
            }
        }

        #[async_trait::async_trait]
        impl SystemCapability for TrackHooks {
            fn pre_init(&self, _sys: &crate::runtime::SystemContext) -> anyhow::Result<()> {
                Ok(())
            }

            async fn post_init(&self, _sys: &crate::runtime::SystemContext) -> anyhow::Result<()> {
                self.events
                    .lock()
                    .await
                    .push(format!("post_init:{}", self.name));
                Ok(())
            }
        }

        let events = Arc::new(Mutex::new(Vec::<String>::new()));
        let make_hooks = |name: &'static str| {
            Arc::new(TrackHooks {
                name,
                events: Arc::clone(&events),
            })
        };
        let sys_a = make_hooks("sys_a");
        let user_b = make_hooks("user_b");
        let user_c = make_hooks("user_c");

        // Only `sys_a` gets the system capability; the others are user modules.
        let mut builder = RegistryBuilder::default();
        builder.register_core_with_meta("sys_a", &[], sys_a.clone() as Arc<dyn Module>);
        builder.register_core_with_meta("user_b", &["sys_a"], user_b.clone() as Arc<dyn Module>);
        builder.register_core_with_meta("user_c", &["user_b"], user_c.clone() as Arc<dyn Module>);
        builder.register_system_with_meta("sys_a", sys_a.clone() as Arc<dyn SystemCapability>);

        let runtime = runtime_for(builder.build_topo_sorted().unwrap());

        // Run init phase for all modules, then post_init as a separate barrier phase.
        runtime.run_init_phase().await.unwrap();
        runtime.run_post_init_phase().await.unwrap();

        let events = events.lock().await.clone();
        let first_post_init = events
            .iter()
            .position(|e| e.starts_with("post_init:"))
            .expect("expected post_init events");
        assert!(
            events[..first_post_init]
                .iter()
                .all(|e| e.starts_with("init:")),
            "expected all init events before post_init, got: {events:?}"
        );

        // system-first order within each phase; post_init fires only for the
        // system module
        assert_eq!(
            events,
            vec![
                "init:sys_a",
                "init:user_b",
                "init:user_c",
                "post_init:sys_a",
            ]
        );
    }
}