// modkit/runtime/host_runtime.rs
1//! Host Runtime - orchestrates the full `ModKit` lifecycle
2//!
3//! This module contains the `HostRuntime` type that owns and coordinates
4//! the execution of all lifecycle phases.
5//!
6//! High-level phase order:
7//! - `pre_init` (system modules only)
8//! - DB migrations (modules with DB capability)
9//! - `init` (all modules)
10//! - `post_init` (system modules only; runs after *all* `init` complete)
11//! - REST wiring (modules with REST capability; requires a single REST host)
12//! - gRPC registration (modules with gRPC capability; requires a single gRPC hub)
13//! - start/stop (stateful modules)
14//! - `OoP` spawn / wait / stop (host-only orchestration)
15
16use axum::Router;
17use std::collections::HashSet;
18use std::sync::Arc;
19use tokio_util::sync::CancellationToken;
20use uuid::Uuid;
21
22use crate::backends::OopSpawnConfig;
23use crate::client_hub::ClientHub;
24use crate::config::ConfigProvider;
25use crate::context::ModuleContextBuilder;
26use crate::registry::{
27    ApiGatewayCap, GrpcHubCap, ModuleEntry, ModuleRegistry, RegistryError, RestApiCap, RunnableCap,
28    SystemCap,
29};
30use crate::runtime::{GrpcInstallerStore, ModuleManager, OopSpawnOptions, SystemContext};
31
32#[cfg(feature = "db")]
33use crate::registry::DatabaseCap;
34
/// How the runtime should provide DBs to modules.
///
/// Selected by the host at construction time and consulted during the DB
/// migration phase (and when building per-module contexts).
#[derive(Clone)]
pub enum DbOptions {
    /// No database integration. `ModuleCtx::db()` will be `None`, `db_required()` will error.
    None,
    /// Use a `DbManager` to handle database connections with Figment-based configuration.
    ///
    /// Only available when the crate is built with the `db` feature.
    #[cfg(feature = "db")]
    Manager(Arc<modkit_db::DbManager>),
}
44
/// Runtime execution mode that determines which phases to run.
///
/// Compared with `==` in `run_phases_internal` to decide whether to exit
/// after the migration phases.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RunMode {
    /// Run all phases and wait for shutdown signal (normal application mode).
    Full,
    /// Run only pre-init and DB migration phases, then exit (for cloud deployments).
    MigrateOnly,
}
53
/// Environment variable name for passing directory endpoint to `OoP` modules.
///
/// Set by `run_oop_spawn_phase` when the gRPC hub has published its bound
/// endpoint; presumably read by the spawned module binary at startup — the
/// consuming side is not visible in this file.
pub const MODKIT_DIRECTORY_ENDPOINT_ENV: &str = "MODKIT_DIRECTORY_ENDPOINT";

/// Environment variable name for passing rendered module config to `OoP` modules.
///
/// Always set by `run_oop_spawn_phase` to the module's rendered JSON config.
pub const MODKIT_MODULE_CONFIG_ENV: &str = "MODKIT_MODULE_CONFIG";
59
/// `HostRuntime` owns the lifecycle orchestration for `ModKit`.
///
/// It encapsulates all runtime state and drives modules through the full lifecycle (see module docs).
pub struct HostRuntime {
    /// Topologically-sorted module registry; the source of truth for every phase.
    registry: ModuleRegistry,
    /// Builds per-module contexts (config, DB handle, client hub, cancel token).
    ctx_builder: ModuleContextBuilder,
    /// Unique id of this runtime instance; also fed into `SystemContext`.
    instance_id: Uuid,
    /// Runtime-owned module manager, wired into system modules during `pre_init`.
    module_manager: Arc<ModuleManager>,
    /// Store for gRPC service installers collected during the gRPC phase.
    grpc_installers: Arc<GrpcInstallerStore>,
    // NOTE(review): only used via the clone handed to ctx_builder — presumably
    // retained here for ownership/lifetime; confirm before removing.
    #[allow(dead_code)]
    client_hub: Arc<ClientHub>,
    /// Cooperative shutdown signal, checked between per-module phase steps.
    cancel: CancellationToken,
    // Used by the DB migration phase (only with the `db` feature), hence
    // dead_code in builds without it.
    #[allow(dead_code)]
    db_options: DbOptions,
    /// `OoP` module spawn configuration and backend
    oop_options: Option<OopSpawnOptions>,
}
77
78impl HostRuntime {
79    /// Create a new `HostRuntime` instance.
80    ///
81    /// This prepares all runtime components but does not start any lifecycle phases.
82    pub fn new(
83        registry: ModuleRegistry,
84        modules_cfg: Arc<dyn ConfigProvider>,
85        db_options: DbOptions,
86        client_hub: Arc<ClientHub>,
87        cancel: CancellationToken,
88        instance_id: Uuid,
89        oop_options: Option<OopSpawnOptions>,
90    ) -> Self {
91        // Create runtime-owned components for system modules
92        let module_manager = Arc::new(ModuleManager::new());
93        let grpc_installers = Arc::new(GrpcInstallerStore::new());
94
95        // Build the context builder that will resolve per-module DbHandles
96        let db_manager = match &db_options {
97            #[cfg(feature = "db")]
98            DbOptions::Manager(mgr) => Some(mgr.clone()),
99            DbOptions::None => None,
100        };
101
102        let ctx_builder = ModuleContextBuilder::new(
103            instance_id,
104            modules_cfg,
105            client_hub.clone(),
106            cancel.clone(),
107            db_manager,
108        );
109
110        Self {
111            registry,
112            ctx_builder,
113            instance_id,
114            module_manager,
115            grpc_installers,
116            client_hub,
117            cancel,
118            db_options,
119            oop_options,
120        }
121    }
122
123    /// `PRE_INIT` phase: wire runtime internals into system modules.
124    ///
125    /// This phase runs before init and only for modules with the "system" capability.
126    ///
127    /// # Errors
128    /// Returns `RegistryError` if system wiring fails.
129    pub fn run_pre_init_phase(&self) -> Result<(), RegistryError> {
130        tracing::info!("Phase: pre_init");
131
132        let sys_ctx = SystemContext::new(
133            self.instance_id,
134            Arc::clone(&self.module_manager),
135            Arc::clone(&self.grpc_installers),
136        );
137
138        for entry in self.registry.modules() {
139            // Check for cancellation before processing each module
140            if self.cancel.is_cancelled() {
141                tracing::warn!("Pre-init phase cancelled by signal");
142                return Err(RegistryError::Cancelled);
143            }
144
145            if let Some(sys_mod) = entry.caps.query::<SystemCap>() {
146                tracing::debug!(module = entry.name, "Running system pre_init");
147                sys_mod
148                    .pre_init(&sys_ctx)
149                    .map_err(|e| RegistryError::PreInit {
150                        module: entry.name,
151                        source: e,
152                    })?;
153            }
154        }
155
156        Ok(())
157    }
158
159    /// Helper: resolve context for a module with error mapping.
160    async fn module_context(
161        &self,
162        module_name: &'static str,
163    ) -> Result<crate::context::ModuleCtx, RegistryError> {
164        self.ctx_builder
165            .for_module(module_name)
166            .await
167            .map_err(|e| RegistryError::DbMigrate {
168                module: module_name,
169                source: e,
170            })
171    }
172
    /// Helper: extract DB handle and module if both exist.
    ///
    /// Returns:
    /// - `Ok(None)` when the module has no DB capability, or has the capability
    ///   but no DB is configured for it in the manager.
    /// - `Ok(Some((db, module)))` when migrations can actually run.
    ///
    /// # Errors
    /// `RegistryError::DbMigrate` if resolving the DB from the manager fails.
    #[cfg(feature = "db")]
    async fn db_migration_target(
        &self,
        module_name: &'static str,
        ctx: &crate::context::ModuleCtx,
        db_module: Option<Arc<dyn crate::contracts::DatabaseCapability>>,
    ) -> Result<Option<(modkit_db::Db, Arc<dyn crate::contracts::DatabaseCapability>)>, RegistryError>
    {
        // No DB capability: nothing to migrate.
        let Some(dbm) = db_module else {
            return Ok(None);
        };

        // Important: DB migrations require access to the underlying `Db`, not just `DBProvider`.
        // `ModuleCtx` intentionally exposes only `DBProvider` for better DX and to reduce mistakes.
        // So the runtime resolves the `Db` directly from its `DbManager`.
        let db = match &self.db_options {
            DbOptions::None => None,
            #[cfg(feature = "db")]
            DbOptions::Manager(mgr) => {
                // `mgr.get` returning Ok(None) means no DB is configured for this module.
                mgr.get(module_name)
                    .await
                    .map_err(|e| RegistryError::DbMigrate {
                        module: module_name,
                        source: e.into(),
                    })?
            }
        };

        let _ = ctx; // ctx is kept for parity/error context; DB is resolved from manager above.
        Ok(db.map(|db| (db, dbm)))
    }
205
206    /// Helper: run migrations for a single module using the new migration runner.
207    ///
208    /// This collects migrations from the module and executes them via the
209    /// runtime's privileged connection. Modules never see the raw connection.
210    #[cfg(feature = "db")]
211    async fn migrate_module(
212        module_name: &'static str,
213        db: &modkit_db::Db,
214        db_module: Arc<dyn crate::contracts::DatabaseCapability>,
215    ) -> Result<(), RegistryError> {
216        // Collect migrations from the module
217        let migrations = db_module.migrations();
218
219        if migrations.is_empty() {
220            tracing::debug!(module = module_name, "No migrations to run");
221            return Ok(());
222        }
223
224        tracing::debug!(
225            module = module_name,
226            count = migrations.len(),
227            "Running DB migrations"
228        );
229
230        // Execute migrations using the migration runner
231        let result =
232            modkit_db::migration_runner::run_migrations_for_module(db, module_name, migrations)
233                .await
234                .map_err(|e| RegistryError::DbMigrate {
235                    module: module_name,
236                    source: anyhow::Error::new(e),
237                })?;
238
239        tracing::info!(
240            module = module_name,
241            applied = result.applied,
242            skipped = result.skipped,
243            "DB migrations completed"
244        );
245
246        Ok(())
247    }
248
    /// DB MIGRATION phase: run migrations for all modules with DB capability.
    ///
    /// Runs before init, with system modules processed first.
    ///
    /// Modules provide migrations via `DatabaseCapability::migrations()`.
    /// The runtime executes them with a privileged connection that modules
    /// never receive directly. Each module gets a separate migration history
    /// table, preventing cross-module interference.
    ///
    /// # Errors
    /// `RegistryError::Cancelled` on shutdown signal, or `RegistryError::DbMigrate`
    /// when context resolution or a module's migration run fails.
    #[cfg(feature = "db")]
    async fn run_db_phase(&self) -> Result<(), RegistryError> {
        tracing::info!("Phase: db (before init)");

        for entry in self.registry.modules_by_system_priority() {
            // Check for cancellation before processing each module
            if self.cancel.is_cancelled() {
                tracing::warn!("DB migration phase cancelled by signal");
                return Err(RegistryError::Cancelled);
            }

            let ctx = self.module_context(entry.name).await?;
            let db_module = entry.caps.query::<DatabaseCap>();

            match self
                .db_migration_target(entry.name, &ctx, db_module.clone())
                .await?
            {
                // Both a capability and a configured DB: actually migrate.
                Some((db, dbm)) => {
                    Self::migrate_module(entry.name, &db, dbm).await?;
                }
                // Capability present but no configured DB: skip quietly.
                None if db_module.is_some() => {
                    tracing::debug!(
                        module = entry.name,
                        "Module has DbModule trait but no DB handle (no config)"
                    );
                }
                // No DB capability at all: nothing to do.
                None => {}
            }
        }

        Ok(())
    }
290
291    /// INIT phase: initialize all modules in topological order.
292    ///
293    /// System modules initialize first, followed by user modules.
294    async fn run_init_phase(&self) -> Result<(), RegistryError> {
295        tracing::info!("Phase: init");
296
297        for entry in self.registry.modules_by_system_priority() {
298            let ctx =
299                self.ctx_builder
300                    .for_module(entry.name)
301                    .await
302                    .map_err(|e| RegistryError::Init {
303                        module: entry.name,
304                        source: e,
305                    })?;
306            entry
307                .core
308                .init(&ctx)
309                .await
310                .map_err(|e| RegistryError::Init {
311                    module: entry.name,
312                    source: e,
313                })?;
314        }
315
316        Ok(())
317    }
318
319    /// `POST_INIT` phase: optional hook after ALL modules completed `init()`.
320    ///
321    /// This provides a global barrier between initialization-time registration
322    /// and subsequent phases that may rely on a fully-populated runtime registry.
323    ///
324    /// System modules run first, followed by user modules, preserving topo order.
325    async fn run_post_init_phase(&self) -> Result<(), RegistryError> {
326        tracing::info!("Phase: post_init");
327
328        let sys_ctx = SystemContext::new(
329            self.instance_id,
330            Arc::clone(&self.module_manager),
331            Arc::clone(&self.grpc_installers),
332        );
333
334        for entry in self.registry.modules_by_system_priority() {
335            if let Some(sys_mod) = entry.caps.query::<SystemCap>() {
336                sys_mod
337                    .post_init(&sys_ctx)
338                    .await
339                    .map_err(|e| RegistryError::PostInit {
340                        module: entry.name,
341                        source: e,
342                    })?;
343            }
344        }
345
346        Ok(())
347    }
348
    /// REST phase: compose the router against the REST host.
    ///
    /// This is a synchronous phase that builds the final Router by:
    /// 1. Preparing the host module
    /// 2. Registering all REST providers
    /// 3. Finalizing with `OpenAPI` endpoints
    ///
    /// # Errors
    /// - `RestRequiresHost` when REST providers exist but no host module does.
    /// - `MultipleRestHosts` when more than one host is registered.
    /// - `RestPrepare` / `RestRegister` / `RestFinalize` on host/provider failures.
    async fn run_rest_phase(&self) -> Result<Router, RegistryError> {
        tracing::info!("Phase: rest (sync)");

        let mut router = Router::new();

        // Find host(s) and whether any rest modules exist
        let host_count = self
            .registry
            .modules()
            .iter()
            .filter(|e| e.caps.has::<ApiGatewayCap>())
            .count();

        match host_count {
            0 => {
                // No host: only an error if some module actually exposes REST;
                // otherwise an empty router is a valid result.
                return if self
                    .registry
                    .modules()
                    .iter()
                    .any(|e| e.caps.has::<RestApiCap>())
                {
                    Err(RegistryError::RestRequiresHost)
                } else {
                    Ok(router)
                };
            }
            1 => { /* proceed */ }
            _ => return Err(RegistryError::MultipleRestHosts),
        }

        // Resolve the single host entry and its module context.
        // (Second scan; the count above already validated exactly one exists,
        // so the `ok_or` below is a defensive invariant check.)
        let host_idx = self
            .registry
            .modules()
            .iter()
            .position(|e| e.caps.has::<ApiGatewayCap>())
            .ok_or(RegistryError::RestHostNotFoundAfterValidation)?;
        let host_entry = &self.registry.modules()[host_idx];
        let Some(host) = host_entry.caps.query::<ApiGatewayCap>() else {
            return Err(RegistryError::RestHostMissingFromEntry);
        };
        let host_ctx = self
            .ctx_builder
            .for_module(host_entry.name)
            .await
            .map_err(|e| RegistryError::RestPrepare {
                module: host_entry.name,
                source: e,
            })?;

        // use host as the registry
        let registry: &dyn crate::contracts::OpenApiRegistry = host.as_registry();

        // 1) Host prepare: base Router / global middlewares / basic OAS meta
        router =
            host.rest_prepare(&host_ctx, router)
                .map_err(|source| RegistryError::RestPrepare {
                    module: host_entry.name,
                    source,
                })?;

        // 2) Register all REST providers (in the current discovery order)
        for e in self.registry.modules() {
            if let Some(rest) = e.caps.query::<RestApiCap>() {
                let ctx = self.ctx_builder.for_module(e.name).await.map_err(|err| {
                    RegistryError::RestRegister {
                        module: e.name,
                        source: err,
                    }
                })?;
                router = rest
                    .register_rest(&ctx, router, registry)
                    .map_err(|source| RegistryError::RestRegister {
                        module: e.name,
                        source,
                    })?;
            }
        }

        // 3) Host finalize: attach /openapi.json and /docs, persist Router if needed (no server start)
        router = host.rest_finalize(&host_ctx, router).map_err(|source| {
            RegistryError::RestFinalize {
                module: host_entry.name,
                source,
            }
        })?;

        Ok(router)
    }
444
445    /// gRPC registration phase: collect services from all grpc modules.
446    ///
447    /// Services are stored in the installer store for the `grpc-hub` to consume during start.
448    async fn run_grpc_phase(&self) -> Result<(), RegistryError> {
449        tracing::info!("Phase: grpc (registration)");
450
451        // If no grpc_hub and no grpc_services, skip the phase
452        if self.registry.grpc_hub.is_none() && self.registry.grpc_services.is_empty() {
453            return Ok(());
454        }
455
456        // If there are grpc_services but no hub, that's an error
457        if self.registry.grpc_hub.is_none() && !self.registry.grpc_services.is_empty() {
458            return Err(RegistryError::GrpcRequiresHub);
459        }
460
461        // If there's a hub, collect all services grouped by module and hand them off to the installer store
462        if let Some(hub_name) = &self.registry.grpc_hub {
463            let mut modules_data = Vec::new();
464            let mut seen = HashSet::new();
465
466            // Collect services from all grpc modules
467            for (module_name, service_module) in &self.registry.grpc_services {
468                let ctx = self
469                    .ctx_builder
470                    .for_module(module_name)
471                    .await
472                    .map_err(|err| RegistryError::GrpcRegister {
473                        module: module_name.clone(),
474                        source: err,
475                    })?;
476
477                let installers =
478                    service_module
479                        .get_grpc_services(&ctx)
480                        .await
481                        .map_err(|source| RegistryError::GrpcRegister {
482                            module: module_name.clone(),
483                            source,
484                        })?;
485
486                for reg in &installers {
487                    if !seen.insert(reg.service_name) {
488                        return Err(RegistryError::GrpcRegister {
489                            module: module_name.clone(),
490                            source: anyhow::anyhow!(
491                                "Duplicate gRPC service name: {}",
492                                reg.service_name
493                            ),
494                        });
495                    }
496                }
497
498                modules_data.push(crate::runtime::ModuleInstallers {
499                    module_name: module_name.clone(),
500                    installers,
501                });
502            }
503
504            self.grpc_installers
505                .set(crate::runtime::GrpcInstallerData {
506                    modules: modules_data,
507                })
508                .map_err(|source| RegistryError::GrpcRegister {
509                    module: hub_name.clone(),
510                    source,
511                })?;
512        }
513
514        Ok(())
515    }
516
517    /// START phase: start all stateful modules.
518    ///
519    /// System modules start first, followed by user modules.
520    async fn run_start_phase(&self) -> Result<(), RegistryError> {
521        tracing::info!("Phase: start");
522
523        for e in self.registry.modules_by_system_priority() {
524            if let Some(s) = e.caps.query::<RunnableCap>() {
525                tracing::debug!(
526                    module = e.name,
527                    is_system = e.caps.has::<SystemCap>(),
528                    "Starting stateful module"
529                );
530                s.start(self.cancel.clone())
531                    .await
532                    .map_err(|source| RegistryError::Start {
533                        module: e.name,
534                        source,
535                    })?;
536                tracing::info!(module = e.name, "Started module");
537            }
538        }
539
540        Ok(())
541    }
542
543    /// Stop a single module, logging errors but continuing execution.
544    async fn stop_one_module(entry: &ModuleEntry, cancel: CancellationToken) {
545        if let Some(s) = entry.caps.query::<RunnableCap>() {
546            match s.stop(cancel).await {
547                Err(err) => {
548                    tracing::warn!(module = entry.name, error = %err, "Failed to stop module");
549                }
550                _ => {
551                    tracing::info!(module = entry.name, "Stopped module");
552                }
553            }
554        }
555    }
556
    /// STOP phase: stop all stateful modules in reverse order.
    ///
    /// Errors are logged but do not fail the shutdown process.
    /// Note: `OoP` modules are stopped automatically by the backend when the
    /// cancellation token is triggered.
    ///
    /// NOTE(review): this reverses `modules()` (registry order), while the start
    /// phase iterated `modules_by_system_priority()`; if those two orders ever
    /// differ, stop is not the exact reverse of start — confirm this is intended.
    async fn run_stop_phase(&self) -> Result<(), RegistryError> {
        tracing::info!("Phase: stop");

        for e in self.registry.modules().iter().rev() {
            Self::stop_one_module(e, self.cancel.clone()).await;
        }

        Ok(())
    }
571
    /// `OoP` SPAWN phase: spawn out-of-process modules after start phase.
    ///
    /// This phase runs after `grpc-hub` is already listening, so we can pass
    /// the real directory endpoint to `OoP` modules.
    ///
    /// Each child process receives its rendered config JSON via
    /// `MODKIT_MODULE_CONFIG_ENV` and, when available, the hub endpoint via
    /// `MODKIT_DIRECTORY_ENDPOINT_ENV`. A missing endpoint (timeout / no hub)
    /// does not abort the phase — the variable is simply not set.
    ///
    /// # Errors
    /// `RegistryError::OopSpawn` when the backend fails to spawn a module.
    async fn run_oop_spawn_phase(&self) -> Result<(), RegistryError> {
        // Nothing to do without OoP options or with an empty module list.
        let oop_opts = match &self.oop_options {
            Some(opts) if !opts.modules.is_empty() => opts,
            _ => return Ok(()),
        };

        tracing::info!("Phase: oop_spawn");

        // Wait for grpc_hub to publish its endpoint (it runs async in start phase)
        let directory_endpoint = self.wait_for_grpc_hub_endpoint().await;

        for module_cfg in &oop_opts.modules {
            // Build environment with directory endpoint and rendered config
            // Note: User controls --config via execution.args in master config
            let mut env = module_cfg.env.clone();
            env.insert(
                MODKIT_MODULE_CONFIG_ENV.to_owned(),
                module_cfg.rendered_config_json.clone(),
            );
            if let Some(ref endpoint) = directory_endpoint {
                env.insert(MODKIT_DIRECTORY_ENDPOINT_ENV.to_owned(), endpoint.clone());
            }

            // Use args from execution config as-is (user controls --config via args)
            let args = module_cfg.args.clone();

            let spawn_config = OopSpawnConfig {
                module_name: module_cfg.module_name.clone(),
                binary: module_cfg.binary.clone(),
                args,
                env,
                working_directory: module_cfg.working_directory.clone(),
            };

            // A spawn failure aborts the whole phase (fail-fast).
            oop_opts
                .backend
                .spawn(spawn_config)
                .await
                .map_err(|e| RegistryError::OopSpawn {
                    module: module_cfg.module_name.clone(),
                    source: e,
                })?;

            tracing::info!(
                module = %module_cfg.module_name,
                directory_endpoint = ?directory_endpoint,
                "Spawned OoP module via backend"
            );
        }

        Ok(())
    }
628
629    /// Wait for `grpc-hub` to publish its bound endpoint.
630    ///
631    /// Polls the `GrpcHubModule::bound_endpoint()` with a short interval until available or timeout.
632    /// Returns None if no `grpc-hub` is running or if it times out.
633    async fn wait_for_grpc_hub_endpoint(&self) -> Option<String> {
634        const POLL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(10);
635        const MAX_WAIT: std::time::Duration = std::time::Duration::from_secs(5);
636
637        // Find grpc_hub in registry
638        let grpc_hub = self
639            .registry
640            .modules()
641            .iter()
642            .find_map(|e| e.caps.query::<GrpcHubCap>());
643
644        let Some(hub) = grpc_hub else {
645            return None; // No grpc_hub registered
646        };
647
648        let start = std::time::Instant::now();
649
650        loop {
651            if let Some(endpoint) = hub.bound_endpoint() {
652                tracing::debug!(
653                    endpoint = %endpoint,
654                    elapsed_ms = start.elapsed().as_millis(),
655                    "gRPC hub endpoint available"
656                );
657                return Some(endpoint);
658            }
659
660            if start.elapsed() > MAX_WAIT {
661                tracing::warn!("Timed out waiting for gRPC hub to bind");
662                return None;
663            }
664
665            tokio::time::sleep(POLL_INTERVAL).await;
666        }
667    }
668
    /// Run the full module lifecycle (all phases).
    ///
    /// This is the standard entry point for normal application execution.
    /// It runs all phases from pre-init through shutdown.
    /// Consumes the runtime; returns only after the stop phase completes.
    ///
    /// # Errors
    ///
    /// Returns an error if any module phase fails during execution.
    pub async fn run_module_phases(self) -> anyhow::Result<()> {
        self.run_phases_internal(RunMode::Full).await
    }
680
    /// Run only the migration phases (pre-init + DB migration).
    ///
    /// This is designed for cloud deployment workflows where database migrations
    /// need to run as a separate step before starting the application.
    /// The process exits after migrations complete.
    /// Consumes the runtime; no modules are initialized or started.
    ///
    /// # Errors
    ///
    /// Returns an error if pre-init or migration phases fail.
    pub async fn run_migration_phases(self) -> anyhow::Result<()> {
        self.run_phases_internal(RunMode::MigrateOnly).await
    }
693
    /// Internal implementation that runs module phases based on the mode.
    ///
    /// This private method contains the actual phase execution logic and is called
    /// by both `run_module_phases()` and `run_migration_phases()`.
    ///
    /// # Modes
    ///
    /// - `RunMode::Full`: Executes all phases and waits for shutdown signal
    /// - `RunMode::MigrateOnly`: Executes only pre-init and DB migration phases, then exits
    ///
    /// # Phases (Full Mode)
    ///
    /// 1. Pre-init (system modules only)
    /// 2. DB migration (all modules with database capability)
    /// 3. Init (all modules)
    /// 4. Post-init (system modules only)
    /// 5. REST (modules with REST capability)
    /// 6. gRPC (modules with gRPC capability)
    /// 7. Start (runnable modules)
    /// 8. `OoP` spawn (out-of-process modules)
    /// 9. Wait for cancellation
    /// 10. Stop (runnable modules in reverse order)
    async fn run_phases_internal(self, mode: RunMode) -> anyhow::Result<()> {
        // Log execution mode
        match mode {
            RunMode::Full => {
                tracing::info!("Running full lifecycle (all phases)");
            }
            RunMode::MigrateOnly => {
                tracing::info!("Running in migration mode (pre-init + db phases only)");
            }
        }

        // 1. Pre-init phase (before init, only for system modules)
        self.run_pre_init_phase()?;

        // 2. DB migration phase (system modules first)
        #[cfg(feature = "db")]
        {
            self.run_db_phase().await?;
        }
        #[cfg(not(feature = "db"))]
        {
            // No DB integration in this build.
        }

        // Exit early if running in migration-only mode
        if mode == RunMode::MigrateOnly {
            tracing::info!("Migration phases completed successfully");
            return Ok(());
        }

        // 3. Init phase (system modules first)
        self.run_init_phase().await?;

        // 4. Post-init phase (barrier after ALL init; system modules only)
        self.run_post_init_phase().await?;

        // 5. REST phase (synchronous router composition)
        // NOTE(review): the composed Router is dropped here — presumably the
        // REST host retains its own copy in rest_finalize ("persist Router if
        // needed"); confirm nothing else needs this value.
        let _router = self.run_rest_phase().await?;

        // 6. gRPC registration phase
        self.run_grpc_phase().await?;

        // 7. Start phase
        self.run_start_phase().await?;

        // 8. OoP spawn phase (after grpc_hub is running)
        self.run_oop_spawn_phase().await?;

        // 9. Wait for cancellation
        self.cancel.cancelled().await;

        // 10. Stop phase
        self.run_stop_phase().await?;

        Ok(())
    }
772}
773
774#[cfg(test)]
775#[cfg_attr(coverage_nightly, coverage(off))]
776mod tests {
777    use super::*;
778    use crate::context::ModuleCtx;
779    use crate::contracts::{Module, RunnableCapability, SystemCapability};
780    use crate::registry::RegistryBuilder;
781    use std::sync::Arc;
782    use std::sync::atomic::{AtomicUsize, Ordering};
783    use tokio::sync::Mutex;
784
    /// Minimal no-op `Module` implementation for tests that only need a core.
    #[derive(Default)]
    #[allow(dead_code)]
    struct DummyCore;
    #[async_trait::async_trait]
    impl Module for DummyCore {
        async fn init(&self, _ctx: &ModuleCtx) -> anyhow::Result<()> {
            // Nothing to initialize.
            Ok(())
        }
    }
794
795    struct StopOrderTracker {
796        my_order: usize,
797        stop_order: Arc<AtomicUsize>,
798    }
799
800    impl StopOrderTracker {
801        fn new(counter: &Arc<AtomicUsize>, stop_order: Arc<AtomicUsize>) -> Self {
802            let my_order = counter.fetch_add(1, Ordering::SeqCst);
803            Self {
804                my_order,
805                stop_order,
806            }
807        }
808    }
809
810    #[async_trait::async_trait]
811    impl Module for StopOrderTracker {
812        async fn init(&self, _ctx: &ModuleCtx) -> anyhow::Result<()> {
813            Ok(())
814        }
815    }
816
817    #[async_trait::async_trait]
818    impl RunnableCapability for StopOrderTracker {
819        async fn start(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
820            Ok(())
821        }
822        async fn stop(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
823            let order = self.stop_order.fetch_add(1, Ordering::SeqCst);
824            tracing::info!(
825                my_order = self.my_order,
826                stop_order = order,
827                "Module stopped"
828            );
829            Ok(())
830        }
831    }
832
833    #[tokio::test]
834    async fn test_stop_phase_reverse_order() {
835        let counter = Arc::new(AtomicUsize::new(0));
836        let stop_order = Arc::new(AtomicUsize::new(0));
837
838        let module_a = Arc::new(StopOrderTracker::new(&counter, stop_order.clone()));
839        let module_b = Arc::new(StopOrderTracker::new(&counter, stop_order.clone()));
840        let module_c = Arc::new(StopOrderTracker::new(&counter, stop_order.clone()));
841
842        let mut builder = RegistryBuilder::default();
843        builder.register_core_with_meta("a", &[], module_a.clone() as Arc<dyn Module>);
844        builder.register_core_with_meta("b", &["a"], module_b.clone() as Arc<dyn Module>);
845        builder.register_core_with_meta("c", &["b"], module_c.clone() as Arc<dyn Module>);
846
847        builder.register_stateful_with_meta("a", module_a.clone() as Arc<dyn RunnableCapability>);
848        builder.register_stateful_with_meta("b", module_b.clone() as Arc<dyn RunnableCapability>);
849        builder.register_stateful_with_meta("c", module_c.clone() as Arc<dyn RunnableCapability>);
850
851        let registry = builder.build_topo_sorted().unwrap();
852
853        // Verify module order is a -> b -> c
854        let module_names: Vec<_> = registry.modules().iter().map(|m| m.name).collect();
855        assert_eq!(module_names, vec!["a", "b", "c"]);
856
857        let client_hub = Arc::new(ClientHub::new());
858        let cancel = CancellationToken::new();
859        let config_provider: Arc<dyn ConfigProvider> = Arc::new(EmptyConfigProvider);
860
861        let runtime = HostRuntime::new(
862            registry,
863            config_provider,
864            DbOptions::None,
865            client_hub,
866            cancel.clone(),
867            Uuid::new_v4(),
868            None,
869        );
870
871        // Run stop phase
872        runtime.run_stop_phase().await.unwrap();
873
874        // Verify modules stopped in reverse order: c (stop_order=0), b (stop_order=1), a (stop_order=2)
875        // Module order is: a=0, b=1, c=2
876        // Stop order should be: c=0, b=1, a=2
877        assert_eq!(stop_order.load(Ordering::SeqCst), 3);
878    }
879
880    #[tokio::test]
881    async fn test_stop_phase_continues_on_error() {
882        struct FailingModule {
883            should_fail: bool,
884            stopped: Arc<AtomicUsize>,
885        }
886
887        #[async_trait::async_trait]
888        impl Module for FailingModule {
889            async fn init(&self, _ctx: &ModuleCtx) -> anyhow::Result<()> {
890                Ok(())
891            }
892        }
893
894        #[async_trait::async_trait]
895        impl RunnableCapability for FailingModule {
896            async fn start(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
897                Ok(())
898            }
899            async fn stop(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
900                self.stopped.fetch_add(1, Ordering::SeqCst);
901                if self.should_fail {
902                    anyhow::bail!("Intentional failure")
903                }
904                Ok(())
905            }
906        }
907
908        let stopped = Arc::new(AtomicUsize::new(0));
909        let module_a = Arc::new(FailingModule {
910            should_fail: false,
911            stopped: stopped.clone(),
912        });
913        let module_b = Arc::new(FailingModule {
914            should_fail: true,
915            stopped: stopped.clone(),
916        });
917        let module_c = Arc::new(FailingModule {
918            should_fail: false,
919            stopped: stopped.clone(),
920        });
921
922        let mut builder = RegistryBuilder::default();
923        builder.register_core_with_meta("a", &[], module_a.clone() as Arc<dyn Module>);
924        builder.register_core_with_meta("b", &["a"], module_b.clone() as Arc<dyn Module>);
925        builder.register_core_with_meta("c", &["b"], module_c.clone() as Arc<dyn Module>);
926
927        builder.register_stateful_with_meta("a", module_a.clone() as Arc<dyn RunnableCapability>);
928        builder.register_stateful_with_meta("b", module_b.clone() as Arc<dyn RunnableCapability>);
929        builder.register_stateful_with_meta("c", module_c.clone() as Arc<dyn RunnableCapability>);
930
931        let registry = builder.build_topo_sorted().unwrap();
932
933        let client_hub = Arc::new(ClientHub::new());
934        let cancel = CancellationToken::new();
935        let config_provider: Arc<dyn ConfigProvider> = Arc::new(EmptyConfigProvider);
936
937        let runtime = HostRuntime::new(
938            registry,
939            config_provider,
940            DbOptions::None,
941            client_hub,
942            cancel.clone(),
943            Uuid::new_v4(),
944            None,
945        );
946
947        // Run stop phase - should not fail even though module_b fails
948        runtime.run_stop_phase().await.unwrap();
949
950        // All modules should have attempted to stop
951        assert_eq!(stopped.load(Ordering::SeqCst), 3);
952    }
953
954    struct EmptyConfigProvider;
955    impl ConfigProvider for EmptyConfigProvider {
956        fn get_module_config(&self, _module_name: &str) -> Option<&serde_json::Value> {
957            None
958        }
959    }
960
961    #[tokio::test]
962    async fn test_post_init_runs_after_all_init_and_system_first() {
963        #[derive(Clone)]
964        struct TrackHooks {
965            name: &'static str,
966            events: Arc<Mutex<Vec<String>>>,
967        }
968
969        #[async_trait::async_trait]
970        impl Module for TrackHooks {
971            async fn init(&self, _ctx: &ModuleCtx) -> anyhow::Result<()> {
972                self.events.lock().await.push(format!("init:{}", self.name));
973                Ok(())
974            }
975        }
976
977        #[async_trait::async_trait]
978        impl SystemCapability for TrackHooks {
979            fn pre_init(&self, _sys: &crate::runtime::SystemContext) -> anyhow::Result<()> {
980                Ok(())
981            }
982
983            async fn post_init(&self, _sys: &crate::runtime::SystemContext) -> anyhow::Result<()> {
984                self.events
985                    .lock()
986                    .await
987                    .push(format!("post_init:{}", self.name));
988                Ok(())
989            }
990        }
991
992        let events = Arc::new(Mutex::new(Vec::<String>::new()));
993        let sys_a = Arc::new(TrackHooks {
994            name: "sys_a",
995            events: events.clone(),
996        });
997        let user_b = Arc::new(TrackHooks {
998            name: "user_b",
999            events: events.clone(),
1000        });
1001        let user_c = Arc::new(TrackHooks {
1002            name: "user_c",
1003            events: events.clone(),
1004        });
1005
1006        let mut builder = RegistryBuilder::default();
1007        builder.register_core_with_meta("sys_a", &[], sys_a.clone() as Arc<dyn Module>);
1008        builder.register_core_with_meta("user_b", &["sys_a"], user_b.clone() as Arc<dyn Module>);
1009        builder.register_core_with_meta("user_c", &["user_b"], user_c.clone() as Arc<dyn Module>);
1010        builder.register_system_with_meta("sys_a", sys_a.clone() as Arc<dyn SystemCapability>);
1011
1012        let registry = builder.build_topo_sorted().unwrap();
1013
1014        let client_hub = Arc::new(ClientHub::new());
1015        let cancel = CancellationToken::new();
1016        let config_provider: Arc<dyn ConfigProvider> = Arc::new(EmptyConfigProvider);
1017
1018        let runtime = HostRuntime::new(
1019            registry,
1020            config_provider,
1021            DbOptions::None,
1022            client_hub,
1023            cancel,
1024            Uuid::new_v4(),
1025            None,
1026        );
1027
1028        // Run init phase for all modules, then post_init as a separate barrier phase.
1029        runtime.run_init_phase().await.unwrap();
1030        runtime.run_post_init_phase().await.unwrap();
1031
1032        let events = events.lock().await.clone();
1033        let first_post_init = events
1034            .iter()
1035            .position(|e| e.starts_with("post_init:"))
1036            .expect("expected post_init events");
1037        assert!(
1038            events[..first_post_init]
1039                .iter()
1040                .all(|e| e.starts_with("init:")),
1041            "expected all init events before post_init, got: {events:?}"
1042        );
1043
1044        // system-first order within each phase
1045        assert_eq!(
1046            events,
1047            vec![
1048                "init:sys_a",
1049                "init:user_b",
1050                "init:user_c",
1051                "post_init:sys_a",
1052            ]
1053        );
1054    }
1055}