// modkit/runtime/host_runtime.rs
1//! Host Runtime - orchestrates the full `ModKit` lifecycle
2//!
3//! This module contains the `HostRuntime` type that owns and coordinates
4//! the execution of all lifecycle phases.
5//!
6//! High-level phase order:
7//! - `pre_init` (system modules only)
8//! - DB migrations (modules with DB capability)
9//! - `init` (all modules)
10//! - `post_init` (system modules only; runs after *all* `init` complete)
11//! - REST wiring (modules with REST capability; requires a single REST host)
12//! - gRPC registration (modules with gRPC capability; requires a single gRPC hub)
13//! - start/stop (stateful modules)
14//! - `OoP` spawn / wait / stop (host-only orchestration)
15
16use axum::Router;
17use std::collections::HashSet;
18use std::sync::Arc;
19
20use tokio_util::sync::CancellationToken;
21use uuid::Uuid;
22
23use crate::backends::OopSpawnConfig;
24use crate::client_hub::ClientHub;
25use crate::config::ConfigProvider;
26use crate::context::ModuleContextBuilder;
27use crate::registry::{
28    ApiGatewayCap, GrpcHubCap, ModuleEntry, ModuleRegistry, RegistryError, RestApiCap, RunnableCap,
29    SystemCap,
30};
31use crate::runtime::{GrpcInstallerStore, ModuleManager, OopSpawnOptions, SystemContext};
32
33#[cfg(feature = "db")]
34use crate::registry::DatabaseCap;
35
/// How the runtime should provide DBs to modules.
///
/// Selected once at construction time and consulted during context building
/// (`HostRuntime::new`) and DB-handle resolution for migrations.
#[derive(Clone)]
pub enum DbOptions {
    /// No database integration. `ModuleCtx::db()` will be `None`, `db_required()` will error.
    None,
    /// Use a `DbManager` to handle database connections with Figment-based configuration.
    /// Only available when the crate is built with the `db` feature.
    #[cfg(feature = "db")]
    Manager(Arc<modkit_db::DbManager>),
}
45
/// Runtime execution mode that determines which phases to run.
///
/// See `run_phases_internal` for the exact phase list executed in each mode.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RunMode {
    /// Run all phases and wait for shutdown signal (normal application mode).
    Full,
    /// Run only pre-init and DB migration phases, then exit (for cloud deployments).
    MigrateOnly,
}
54
/// Environment variable name for passing directory endpoint to `OoP` modules.
///
/// Set by `run_oop_spawn_phase` when the `grpc-hub` has published a bound
/// endpoint; omitted when no hub is available or binding timed out.
pub const MODKIT_DIRECTORY_ENDPOINT_ENV: &str = "MODKIT_DIRECTORY_ENDPOINT";

/// Environment variable name for passing rendered module config to `OoP` modules.
///
/// Always set by `run_oop_spawn_phase` to the module's rendered JSON config.
pub const MODKIT_MODULE_CONFIG_ENV: &str = "MODKIT_MODULE_CONFIG";
60
/// `HostRuntime` owns the lifecycle orchestration for `ModKit`.
///
/// It encapsulates all runtime state and drives modules through the full lifecycle (see module docs).
pub struct HostRuntime {
    /// All registered modules and their capability sets.
    registry: ModuleRegistry,
    /// Builds per-module `ModuleCtx` (config, client hub, cancel token, DB handle).
    ctx_builder: ModuleContextBuilder,
    /// Unique identifier for this host instance; shared with every context.
    instance_id: Uuid,
    /// Runtime-owned manager handed to system modules via `SystemContext`.
    module_manager: Arc<ModuleManager>,
    /// Store where the gRPC phase deposits installers for the hub to consume.
    grpc_installers: Arc<GrpcInstallerStore>,
    // Held for ownership/lifetime; modules reach it through their contexts.
    #[allow(dead_code)]
    client_hub: Arc<ClientHub>,
    /// Cooperative shutdown token checked between phase steps.
    cancel: CancellationToken,
    // Consulted only in `db`-feature builds (migration-time handle resolution).
    #[allow(dead_code)]
    db_options: DbOptions,
    /// `OoP` module spawn configuration and backend
    oop_options: Option<OopSpawnOptions>,
}
78
79impl HostRuntime {
    /// Create a new `HostRuntime` instance.
    ///
    /// This prepares all runtime components but does not start any lifecycle phases.
    ///
    /// # Arguments
    /// * `registry` — the fully-built module registry
    /// * `modules_cfg` — provider of per-module configuration
    /// * `db_options` — database integration mode (see [`DbOptions`])
    /// * `client_hub` — shared inter-module client hub
    /// * `cancel` — shutdown token propagated to all module contexts
    /// * `instance_id` — unique identifier for this host instance
    /// * `oop_options` — optional out-of-process spawn configuration
    pub fn new(
        registry: ModuleRegistry,
        modules_cfg: Arc<dyn ConfigProvider>,
        db_options: DbOptions,
        client_hub: Arc<ClientHub>,
        cancel: CancellationToken,
        instance_id: Uuid,
        oop_options: Option<OopSpawnOptions>,
    ) -> Self {
        // Create runtime-owned components for system modules
        let module_manager = Arc::new(ModuleManager::new());
        let grpc_installers = Arc::new(GrpcInstallerStore::new());

        // Build the context builder that will resolve per-module DbHandles.
        // The `Manager` arm only exists in `db`-feature builds, matching the
        // cfg gating on the enum variant itself.
        let db_manager = match &db_options {
            #[cfg(feature = "db")]
            DbOptions::Manager(mgr) => Some(mgr.clone()),
            DbOptions::None => None,
        };

        let ctx_builder = ModuleContextBuilder::new(
            instance_id,
            modules_cfg,
            client_hub.clone(),
            cancel.clone(),
            db_manager,
        );

        Self {
            registry,
            ctx_builder,
            instance_id,
            module_manager,
            grpc_installers,
            client_hub,
            cancel,
            db_options,
            oop_options,
        }
    }
123
124    /// `PRE_INIT` phase: wire runtime internals into system modules.
125    ///
126    /// This phase runs before init and only for modules with the "system" capability.
127    ///
128    /// # Errors
129    /// Returns `RegistryError` if system wiring fails.
130    pub fn run_pre_init_phase(&self) -> Result<(), RegistryError> {
131        tracing::info!("Phase: pre_init");
132
133        let sys_ctx = SystemContext::new(
134            self.instance_id,
135            Arc::clone(&self.module_manager),
136            Arc::clone(&self.grpc_installers),
137        );
138
139        for entry in self.registry.modules() {
140            // Check for cancellation before processing each module
141            if self.cancel.is_cancelled() {
142                tracing::warn!("Pre-init phase cancelled by signal");
143                return Err(RegistryError::Cancelled);
144            }
145
146            if let Some(sys_mod) = entry.caps.query::<SystemCap>() {
147                tracing::debug!(module = entry.name, "Running system pre_init");
148                sys_mod
149                    .pre_init(&sys_ctx)
150                    .map_err(|e| RegistryError::PreInit {
151                        module: entry.name,
152                        source: e,
153                    })?;
154            }
155        }
156
157        Ok(())
158    }
159
160    /// Helper: resolve context for a module with error mapping.
161    async fn module_context(
162        &self,
163        module_name: &'static str,
164    ) -> Result<crate::context::ModuleCtx, RegistryError> {
165        self.ctx_builder
166            .for_module(module_name)
167            .await
168            .map_err(|e| RegistryError::DbMigrate {
169                module: module_name,
170                source: e,
171            })
172    }
173
    /// Helper: extract DB handle and module if both exist.
    ///
    /// Returns `Ok(None)` when the module has no `DatabaseCapability`, or when
    /// the manager yields no handle for it (e.g. no database configured).
    ///
    /// # Errors
    /// Returns `RegistryError::DbMigrate` if resolving the handle from the
    /// `DbManager` fails.
    #[cfg(feature = "db")]
    async fn db_migration_target(
        &self,
        module_name: &'static str,
        ctx: &crate::context::ModuleCtx,
        db_module: Option<Arc<dyn crate::contracts::DatabaseCapability>>,
    ) -> Result<Option<(modkit_db::Db, Arc<dyn crate::contracts::DatabaseCapability>)>, RegistryError>
    {
        // No DB capability -> nothing to migrate.
        let Some(dbm) = db_module else {
            return Ok(None);
        };

        // Important: DB migrations require access to the underlying `Db`, not just `DBProvider`.
        // `ModuleCtx` intentionally exposes only `DBProvider` for better DX and to reduce mistakes.
        // So the runtime resolves the `Db` directly from its `DbManager`.
        let db = match &self.db_options {
            DbOptions::None => None,
            // The cfg mirrors the gating on the `Manager` variant declaration.
            #[cfg(feature = "db")]
            DbOptions::Manager(mgr) => {
                mgr.get(module_name)
                    .await
                    .map_err(|e| RegistryError::DbMigrate {
                        module: module_name,
                        source: e.into(),
                    })?
            }
        };

        _ = ctx; // ctx is kept for parity/error context; DB is resolved from manager above.
        Ok(db.map(|db| (db, dbm)))
    }
206
207    /// Helper: run migrations for a single module using the new migration runner.
208    ///
209    /// This collects migrations from the module and executes them via the
210    /// runtime's privileged connection. Modules never see the raw connection.
211    #[cfg(feature = "db")]
212    async fn migrate_module(
213        module_name: &'static str,
214        db: &modkit_db::Db,
215        db_module: Arc<dyn crate::contracts::DatabaseCapability>,
216    ) -> Result<(), RegistryError> {
217        // Collect migrations from the module
218        let migrations = db_module.migrations();
219
220        if migrations.is_empty() {
221            tracing::debug!(module = module_name, "No migrations to run");
222            return Ok(());
223        }
224
225        tracing::debug!(
226            module = module_name,
227            count = migrations.len(),
228            "Running DB migrations"
229        );
230
231        // Execute migrations using the migration runner
232        let result =
233            modkit_db::migration_runner::run_migrations_for_module(db, module_name, migrations)
234                .await
235                .map_err(|e| RegistryError::DbMigrate {
236                    module: module_name,
237                    source: anyhow::Error::new(e),
238                })?;
239
240        tracing::info!(
241            module = module_name,
242            applied = result.applied,
243            skipped = result.skipped,
244            "DB migrations completed"
245        );
246
247        Ok(())
248    }
249
    /// DB MIGRATION phase: run migrations for all modules with DB capability.
    ///
    /// Runs before init, with system modules processed first.
    ///
    /// Modules provide migrations via `DatabaseCapability::migrations()`.
    /// The runtime executes them with a privileged connection that modules
    /// never receive directly. Each module gets a separate migration history
    /// table, preventing cross-module interference.
    ///
    /// # Errors
    /// Returns `RegistryError::Cancelled` if shutdown is signalled mid-phase,
    /// or `RegistryError::DbMigrate` if context resolution or a migration fails.
    #[cfg(feature = "db")]
    async fn run_db_phase(&self) -> Result<(), RegistryError> {
        tracing::info!("Phase: db (before init)");

        for entry in self.registry.modules_by_system_priority() {
            // Check for cancellation before processing each module
            if self.cancel.is_cancelled() {
                tracing::warn!("DB migration phase cancelled by signal");
                return Err(RegistryError::Cancelled);
            }

            let ctx = self.module_context(entry.name).await?;
            let db_module = entry.caps.query::<DatabaseCap>();

            match self
                .db_migration_target(entry.name, &ctx, db_module.clone())
                .await?
            {
                Some((db, dbm)) => {
                    Self::migrate_module(entry.name, &db, dbm).await?;
                }
                // Module declares the DB capability but no handle was resolved
                // (typically: no database configured for it) — skip quietly.
                None if db_module.is_some() => {
                    tracing::debug!(
                        module = entry.name,
                        "Module has DbModule trait but no DB handle (no config)"
                    );
                }
                // No DB capability at all — nothing to do.
                None => {}
            }
        }

        Ok(())
    }
291
292    /// INIT phase: initialize all modules in topological order.
293    ///
294    /// System modules initialize first, followed by user modules.
295    async fn run_init_phase(&self) -> Result<(), RegistryError> {
296        tracing::info!("Phase: init");
297
298        for entry in self.registry.modules_by_system_priority() {
299            let ctx =
300                self.ctx_builder
301                    .for_module(entry.name)
302                    .await
303                    .map_err(|e| RegistryError::Init {
304                        module: entry.name,
305                        source: e,
306                    })?;
307            tracing::info!(module = entry.name, "Initializing a module...");
308            entry
309                .core
310                .init(&ctx)
311                .await
312                .map_err(|e| RegistryError::Init {
313                    module: entry.name,
314                    source: e,
315                })?;
316            tracing::info!(module = entry.name, "Initialized a module.");
317        }
318
319        Ok(())
320    }
321
322    /// `POST_INIT` phase: optional hook after ALL modules completed `init()`.
323    ///
324    /// This provides a global barrier between initialization-time registration
325    /// and subsequent phases that may rely on a fully-populated runtime registry.
326    ///
327    /// System modules run first, followed by user modules, preserving topo order.
328    async fn run_post_init_phase(&self) -> Result<(), RegistryError> {
329        tracing::info!("Phase: post_init");
330
331        let sys_ctx = SystemContext::new(
332            self.instance_id,
333            Arc::clone(&self.module_manager),
334            Arc::clone(&self.grpc_installers),
335        );
336
337        for entry in self.registry.modules_by_system_priority() {
338            if let Some(sys_mod) = entry.caps.query::<SystemCap>() {
339                sys_mod
340                    .post_init(&sys_ctx)
341                    .await
342                    .map_err(|e| RegistryError::PostInit {
343                        module: entry.name,
344                        source: e,
345                    })?;
346            }
347        }
348
349        Ok(())
350    }
351
352    /// REST phase: compose the router against the REST host.
353    ///
354    /// This is a synchronous phase that builds the final Router by:
355    /// 1. Preparing the host module
356    /// 2. Registering all REST providers
357    /// 3. Finalizing with `OpenAPI` endpoints
358    async fn run_rest_phase(&self) -> Result<Router, RegistryError> {
359        tracing::info!("Phase: rest (sync)");
360
361        let mut router = Router::new();
362
363        // Find host(s) and whether any rest modules exist
364        let host_count = self
365            .registry
366            .modules()
367            .iter()
368            .filter(|e| e.caps.has::<ApiGatewayCap>())
369            .count();
370
371        match host_count {
372            0 => {
373                return if self
374                    .registry
375                    .modules()
376                    .iter()
377                    .any(|e| e.caps.has::<RestApiCap>())
378                {
379                    Err(RegistryError::RestRequiresHost)
380                } else {
381                    Ok(router)
382                };
383            }
384            1 => { /* proceed */ }
385            _ => return Err(RegistryError::MultipleRestHosts),
386        }
387
388        // Resolve the single host entry and its module context
389        let host_idx = self
390            .registry
391            .modules()
392            .iter()
393            .position(|e| e.caps.has::<ApiGatewayCap>())
394            .ok_or(RegistryError::RestHostNotFoundAfterValidation)?;
395        let host_entry = &self.registry.modules()[host_idx];
396        let Some(host) = host_entry.caps.query::<ApiGatewayCap>() else {
397            return Err(RegistryError::RestHostMissingFromEntry);
398        };
399        let host_ctx = self
400            .ctx_builder
401            .for_module(host_entry.name)
402            .await
403            .map_err(|e| RegistryError::RestPrepare {
404                module: host_entry.name,
405                source: e,
406            })?;
407
408        // use host as the registry
409        let registry: &dyn crate::contracts::OpenApiRegistry = host.as_registry();
410
411        // 1) Host prepare: base Router / global middlewares / basic OAS meta
412        router =
413            host.rest_prepare(&host_ctx, router)
414                .map_err(|source| RegistryError::RestPrepare {
415                    module: host_entry.name,
416                    source,
417                })?;
418
419        // 2) Register all REST providers (in the current discovery order)
420        for e in self.registry.modules() {
421            if let Some(rest) = e.caps.query::<RestApiCap>() {
422                let ctx = self.ctx_builder.for_module(e.name).await.map_err(|err| {
423                    RegistryError::RestRegister {
424                        module: e.name,
425                        source: err,
426                    }
427                })?;
428
429                router = rest
430                    .register_rest(&ctx, router, registry)
431                    .map_err(|source| RegistryError::RestRegister {
432                        module: e.name,
433                        source,
434                    })?;
435            }
436        }
437
438        // 3) Host finalize: attach /openapi.json and /docs, persist Router if needed (no server start)
439        router = host.rest_finalize(&host_ctx, router).map_err(|source| {
440            RegistryError::RestFinalize {
441                module: host_entry.name,
442                source,
443            }
444        })?;
445
446        Ok(router)
447    }
448
449    /// gRPC registration phase: collect services from all grpc modules.
450    ///
451    /// Services are stored in the installer store for the `grpc-hub` to consume during start.
452    async fn run_grpc_phase(&self) -> Result<(), RegistryError> {
453        tracing::info!("Phase: grpc (registration)");
454
455        // If no grpc_hub and no grpc_services, skip the phase
456        if self.registry.grpc_hub.is_none() && self.registry.grpc_services.is_empty() {
457            return Ok(());
458        }
459
460        // If there are grpc_services but no hub, that's an error
461        if self.registry.grpc_hub.is_none() && !self.registry.grpc_services.is_empty() {
462            return Err(RegistryError::GrpcRequiresHub);
463        }
464
465        // If there's a hub, collect all services grouped by module and hand them off to the installer store
466        if let Some(hub_name) = &self.registry.grpc_hub {
467            let mut modules_data = Vec::new();
468            let mut seen = HashSet::new();
469
470            // Collect services from all grpc modules
471            for (module_name, service_module) in &self.registry.grpc_services {
472                let ctx = self
473                    .ctx_builder
474                    .for_module(module_name)
475                    .await
476                    .map_err(|err| RegistryError::GrpcRegister {
477                        module: module_name.clone(),
478                        source: err,
479                    })?;
480
481                let installers =
482                    service_module
483                        .get_grpc_services(&ctx)
484                        .await
485                        .map_err(|source| RegistryError::GrpcRegister {
486                            module: module_name.clone(),
487                            source,
488                        })?;
489
490                for reg in &installers {
491                    if !seen.insert(reg.service_name) {
492                        return Err(RegistryError::GrpcRegister {
493                            module: module_name.clone(),
494                            source: anyhow::anyhow!(
495                                "Duplicate gRPC service name: {}",
496                                reg.service_name
497                            ),
498                        });
499                    }
500                }
501
502                modules_data.push(crate::runtime::ModuleInstallers {
503                    module_name: module_name.clone(),
504                    installers,
505                });
506            }
507
508            self.grpc_installers
509                .set(crate::runtime::GrpcInstallerData {
510                    modules: modules_data,
511                })
512                .map_err(|source| RegistryError::GrpcRegister {
513                    module: hub_name.clone(),
514                    source,
515                })?;
516        }
517
518        Ok(())
519    }
520
521    /// START phase: start all stateful modules.
522    ///
523    /// System modules start first, followed by user modules.
524    async fn run_start_phase(&self) -> Result<(), RegistryError> {
525        tracing::info!("Phase: start");
526
527        for e in self.registry.modules_by_system_priority() {
528            if let Some(s) = e.caps.query::<RunnableCap>() {
529                tracing::debug!(
530                    module = e.name,
531                    is_system = e.caps.has::<SystemCap>(),
532                    "Starting stateful module"
533                );
534                s.start(self.cancel.clone())
535                    .await
536                    .map_err(|source| RegistryError::Start {
537                        module: e.name,
538                        source,
539                    })?;
540                tracing::info!(module = e.name, "Started module");
541            }
542        }
543
544        Ok(())
545    }
546
547    /// Stop a single module, logging errors but continuing execution.
548    async fn stop_one_module(entry: &ModuleEntry, cancel: CancellationToken) {
549        if let Some(s) = entry.caps.query::<RunnableCap>() {
550            match s.stop(cancel).await {
551                Err(err) => {
552                    tracing::warn!(module = entry.name, error = %err, "Failed to stop module");
553                }
554                _ => {
555                    tracing::info!(module = entry.name, "Stopped module");
556                }
557            }
558        }
559    }
560
561    /// STOP phase: stop all stateful modules in reverse order.
562    ///
563    /// Errors are logged but do not fail the shutdown process.
564    /// Note: `OoP` modules are stopped automatically by the backend when the
565    /// cancellation token is triggered.
566    async fn run_stop_phase(&self) -> Result<(), RegistryError> {
567        tracing::info!("Phase: stop");
568
569        for e in self.registry.modules().iter().rev() {
570            Self::stop_one_module(e, self.cancel.clone()).await;
571        }
572
573        Ok(())
574    }
575
    /// `OoP` SPAWN phase: spawn out-of-process modules after start phase.
    ///
    /// This phase runs after `grpc-hub` is already listening, so we can pass
    /// the real directory endpoint to `OoP` modules. It is a no-op when no
    /// `OoP` modules are configured.
    ///
    /// # Errors
    /// Returns `RegistryError::OopSpawn` if the backend fails to spawn a module.
    async fn run_oop_spawn_phase(&self) -> Result<(), RegistryError> {
        // Nothing to do without spawn options or with an empty module list.
        let oop_opts = match &self.oop_options {
            Some(opts) if !opts.modules.is_empty() => opts,
            _ => return Ok(()),
        };

        tracing::info!("Phase: oop_spawn");

        // Wait for grpc_hub to publish its endpoint (it runs async in start phase).
        // May be None when no hub exists or binding timed out; spawn proceeds anyway.
        let directory_endpoint = self.wait_for_grpc_hub_endpoint().await;

        for module_cfg in &oop_opts.modules {
            // Build environment with directory endpoint and rendered config
            // Note: User controls --config via execution.args in master config
            let mut env = module_cfg.env.clone();
            env.insert(
                MODKIT_MODULE_CONFIG_ENV.to_owned(),
                module_cfg.rendered_config_json.clone(),
            );
            // Only exported when the hub's endpoint is actually known.
            if let Some(ref endpoint) = directory_endpoint {
                env.insert(MODKIT_DIRECTORY_ENDPOINT_ENV.to_owned(), endpoint.clone());
            }

            // Use args from execution config as-is (user controls --config via args)
            let args = module_cfg.args.clone();

            let spawn_config = OopSpawnConfig {
                module_name: module_cfg.module_name.clone(),
                binary: module_cfg.binary.clone(),
                args,
                env,
                working_directory: module_cfg.working_directory.clone(),
            };

            // Spawning stops the whole phase on first failure.
            oop_opts
                .backend
                .spawn(spawn_config)
                .await
                .map_err(|e| RegistryError::OopSpawn {
                    module: module_cfg.module_name.clone(),
                    source: e,
                })?;

            tracing::info!(
                module = %module_cfg.module_name,
                directory_endpoint = ?directory_endpoint,
                "Spawned OoP module via backend"
            );
        }

        Ok(())
    }
632
633    /// Wait for `grpc-hub` to publish its bound endpoint.
634    ///
635    /// Polls the `GrpcHubModule::bound_endpoint()` with a short interval until available or timeout.
636    /// Returns None if no `grpc-hub` is running or if it times out.
637    async fn wait_for_grpc_hub_endpoint(&self) -> Option<String> {
638        const POLL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(10);
639        const MAX_WAIT: std::time::Duration = std::time::Duration::from_secs(5);
640
641        // Find grpc_hub in registry
642        let grpc_hub = self
643            .registry
644            .modules()
645            .iter()
646            .find_map(|e| e.caps.query::<GrpcHubCap>());
647
648        let Some(hub) = grpc_hub else {
649            return None; // No grpc_hub registered
650        };
651
652        let start = std::time::Instant::now();
653
654        loop {
655            if let Some(endpoint) = hub.bound_endpoint() {
656                tracing::debug!(
657                    endpoint = %endpoint,
658                    elapsed_ms = start.elapsed().as_millis(),
659                    "gRPC hub endpoint available"
660                );
661                return Some(endpoint);
662            }
663
664            if start.elapsed() > MAX_WAIT {
665                tracing::warn!("Timed out waiting for gRPC hub to bind");
666                return None;
667            }
668
669            tokio::time::sleep(POLL_INTERVAL).await;
670        }
671    }
672
    /// Run the full module lifecycle (all phases).
    ///
    /// This is the standard entry point for normal application execution.
    /// It runs all phases from pre-init through shutdown by delegating to
    /// `run_phases_internal` in [`RunMode::Full`].
    ///
    /// # Errors
    ///
    /// Returns an error if any module phase fails during execution.
    pub async fn run_module_phases(self) -> anyhow::Result<()> {
        self.run_phases_internal(RunMode::Full).await
    }
684
    /// Run only the migration phases (pre-init + DB migration).
    ///
    /// This is designed for cloud deployment workflows where database migrations
    /// need to run as a separate step before starting the application.
    /// The process exits after migrations complete. Delegates to
    /// `run_phases_internal` in [`RunMode::MigrateOnly`].
    ///
    /// # Errors
    ///
    /// Returns an error if pre-init or migration phases fail.
    pub async fn run_migration_phases(self) -> anyhow::Result<()> {
        self.run_phases_internal(RunMode::MigrateOnly).await
    }
697
    /// Internal implementation that runs module phases based on the mode.
    ///
    /// This private method contains the actual phase execution logic and is called
    /// by both `run_module_phases()` and `run_migration_phases()`.
    ///
    /// # Modes
    ///
    /// - `RunMode::Full`: Executes all phases and waits for shutdown signal
    /// - `RunMode::MigrateOnly`: Executes only pre-init and DB migration phases, then exits
    ///
    /// # Phases (Full Mode)
    ///
    /// 1. Pre-init (system modules only)
    /// 2. DB migration (all modules with database capability)
    /// 3. Init (all modules)
    /// 4. Post-init (system modules only)
    /// 5. REST (modules with REST capability)
    /// 6. gRPC (modules with gRPC capability)
    /// 7. Start (runnable modules)
    /// 8. `OoP` spawn (out-of-process modules)
    /// 9. Wait for cancellation
    /// 10. Stop (runnable modules in reverse order)
    async fn run_phases_internal(self, mode: RunMode) -> anyhow::Result<()> {
        // Log execution mode
        match mode {
            RunMode::Full => {
                tracing::info!("Running full lifecycle (all phases)");
            }
            RunMode::MigrateOnly => {
                tracing::info!("Running in migration mode (pre-init + db phases only)");
            }
        }

        // 1. Pre-init phase (before init, only for system modules)
        self.run_pre_init_phase()?;

        // 2. DB migration phase (system modules first)
        #[cfg(feature = "db")]
        {
            self.run_db_phase().await?;
        }
        #[cfg(not(feature = "db"))]
        {
            // No DB integration in this build.
        }

        // Exit early if running in migration-only mode
        if mode == RunMode::MigrateOnly {
            tracing::info!("Migration phases completed successfully");
            return Ok(());
        }

        // 3. Init phase (system modules first)
        self.run_init_phase().await?;

        // 4. Post-init phase (barrier after ALL init; system modules only)
        self.run_post_init_phase().await?;

        // 5. REST phase (synchronous router composition). The router is
        // discarded here; the host persists it during rest_finalize if needed.
        let _router = self.run_rest_phase().await?;

        // 6. gRPC registration phase
        self.run_grpc_phase().await?;

        // 7. Start phase
        self.run_start_phase().await?;

        // 8. OoP spawn phase (after grpc_hub is running)
        self.run_oop_spawn_phase().await?;

        // 9. Wait for cancellation
        self.cancel.cancelled().await;

        // 10. Stop phase
        self.run_stop_phase().await?;

        Ok(())
    }
776}
777
778#[cfg(test)]
779#[cfg_attr(coverage_nightly, coverage(off))]
780mod tests {
781    use super::*;
782    use crate::context::ModuleCtx;
783    use crate::contracts::{Module, RunnableCapability, SystemCapability};
784    use crate::registry::RegistryBuilder;
785    use std::sync::Arc;
786    use std::sync::atomic::{AtomicUsize, Ordering};
787    use tokio::sync::Mutex;
788
789    #[derive(Default)]
790    #[allow(dead_code)]
791    struct DummyCore;
792    #[async_trait::async_trait]
793    impl Module for DummyCore {
794        async fn init(&self, _ctx: &ModuleCtx) -> anyhow::Result<()> {
795            Ok(())
796        }
797    }
798
799    struct StopOrderTracker {
800        my_order: usize,
801        stop_order: Arc<AtomicUsize>,
802    }
803
804    impl StopOrderTracker {
805        fn new(counter: &Arc<AtomicUsize>, stop_order: Arc<AtomicUsize>) -> Self {
806            let my_order = counter.fetch_add(1, Ordering::SeqCst);
807            Self {
808                my_order,
809                stop_order,
810            }
811        }
812    }
813
814    #[async_trait::async_trait]
815    impl Module for StopOrderTracker {
816        async fn init(&self, _ctx: &ModuleCtx) -> anyhow::Result<()> {
817            Ok(())
818        }
819    }
820
821    #[async_trait::async_trait]
822    impl RunnableCapability for StopOrderTracker {
823        async fn start(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
824            Ok(())
825        }
826        async fn stop(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
827            let order = self.stop_order.fetch_add(1, Ordering::SeqCst);
828            tracing::info!(
829                my_order = self.my_order,
830                stop_order = order,
831                "Module stopped"
832            );
833            Ok(())
834        }
835    }
836
837    #[tokio::test]
838    async fn test_stop_phase_reverse_order() {
839        let counter = Arc::new(AtomicUsize::new(0));
840        let stop_order = Arc::new(AtomicUsize::new(0));
841
842        let module_a = Arc::new(StopOrderTracker::new(&counter, stop_order.clone()));
843        let module_b = Arc::new(StopOrderTracker::new(&counter, stop_order.clone()));
844        let module_c = Arc::new(StopOrderTracker::new(&counter, stop_order.clone()));
845
846        let mut builder = RegistryBuilder::default();
847        builder.register_core_with_meta("a", &[], module_a.clone() as Arc<dyn Module>);
848        builder.register_core_with_meta("b", &["a"], module_b.clone() as Arc<dyn Module>);
849        builder.register_core_with_meta("c", &["b"], module_c.clone() as Arc<dyn Module>);
850
851        builder.register_stateful_with_meta("a", module_a.clone() as Arc<dyn RunnableCapability>);
852        builder.register_stateful_with_meta("b", module_b.clone() as Arc<dyn RunnableCapability>);
853        builder.register_stateful_with_meta("c", module_c.clone() as Arc<dyn RunnableCapability>);
854
855        let registry = builder.build_topo_sorted().unwrap();
856
857        // Verify module order is a -> b -> c
858        let module_names: Vec<_> = registry.modules().iter().map(|m| m.name).collect();
859        assert_eq!(module_names, vec!["a", "b", "c"]);
860
861        let client_hub = Arc::new(ClientHub::new());
862        let cancel = CancellationToken::new();
863        let config_provider: Arc<dyn ConfigProvider> = Arc::new(EmptyConfigProvider);
864
865        let runtime = HostRuntime::new(
866            registry,
867            config_provider,
868            DbOptions::None,
869            client_hub,
870            cancel.clone(),
871            Uuid::new_v4(),
872            None,
873        );
874
875        // Run stop phase
876        runtime.run_stop_phase().await.unwrap();
877
878        // Verify modules stopped in reverse order: c (stop_order=0), b (stop_order=1), a (stop_order=2)
879        // Module order is: a=0, b=1, c=2
880        // Stop order should be: c=0, b=1, a=2
881        assert_eq!(stop_order.load(Ordering::SeqCst), 3);
882    }
883
884    #[tokio::test]
885    async fn test_stop_phase_continues_on_error() {
886        struct FailingModule {
887            should_fail: bool,
888            stopped: Arc<AtomicUsize>,
889        }
890
891        #[async_trait::async_trait]
892        impl Module for FailingModule {
893            async fn init(&self, _ctx: &ModuleCtx) -> anyhow::Result<()> {
894                Ok(())
895            }
896        }
897
898        #[async_trait::async_trait]
899        impl RunnableCapability for FailingModule {
900            async fn start(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
901                Ok(())
902            }
903            async fn stop(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
904                self.stopped.fetch_add(1, Ordering::SeqCst);
905                if self.should_fail {
906                    anyhow::bail!("Intentional failure")
907                }
908                Ok(())
909            }
910        }
911
912        let stopped = Arc::new(AtomicUsize::new(0));
913        let module_a = Arc::new(FailingModule {
914            should_fail: false,
915            stopped: stopped.clone(),
916        });
917        let module_b = Arc::new(FailingModule {
918            should_fail: true,
919            stopped: stopped.clone(),
920        });
921        let module_c = Arc::new(FailingModule {
922            should_fail: false,
923            stopped: stopped.clone(),
924        });
925
926        let mut builder = RegistryBuilder::default();
927        builder.register_core_with_meta("a", &[], module_a.clone() as Arc<dyn Module>);
928        builder.register_core_with_meta("b", &["a"], module_b.clone() as Arc<dyn Module>);
929        builder.register_core_with_meta("c", &["b"], module_c.clone() as Arc<dyn Module>);
930
931        builder.register_stateful_with_meta("a", module_a.clone() as Arc<dyn RunnableCapability>);
932        builder.register_stateful_with_meta("b", module_b.clone() as Arc<dyn RunnableCapability>);
933        builder.register_stateful_with_meta("c", module_c.clone() as Arc<dyn RunnableCapability>);
934
935        let registry = builder.build_topo_sorted().unwrap();
936
937        let client_hub = Arc::new(ClientHub::new());
938        let cancel = CancellationToken::new();
939        let config_provider: Arc<dyn ConfigProvider> = Arc::new(EmptyConfigProvider);
940
941        let runtime = HostRuntime::new(
942            registry,
943            config_provider,
944            DbOptions::None,
945            client_hub,
946            cancel.clone(),
947            Uuid::new_v4(),
948            None,
949        );
950
951        // Run stop phase - should not fail even though module_b fails
952        runtime.run_stop_phase().await.unwrap();
953
954        // All modules should have attempted to stop
955        assert_eq!(stopped.load(Ordering::SeqCst), 3);
956    }
957
    /// `ConfigProvider` stub that reports no configuration for any module.
    struct EmptyConfigProvider;
    impl ConfigProvider for EmptyConfigProvider {
        /// Always returns `None`, regardless of the requested module name.
        fn get_module_config(&self, _module_name: &str) -> Option<&serde_json::Value> {
            None
        }
    }
964
    #[tokio::test]
    async fn test_post_init_runs_after_all_init_and_system_first() {
        // Module that appends an event string for each lifecycle hook it
        // runs, so the relative ordering of `init` and `post_init` can be
        // asserted against the shared event log.
        #[derive(Clone)]
        struct TrackHooks {
            name: &'static str,
            events: Arc<Mutex<Vec<String>>>,
        }

        #[async_trait::async_trait]
        impl Module for TrackHooks {
            async fn init(&self, _ctx: &ModuleCtx) -> anyhow::Result<()> {
                self.events.lock().await.push(format!("init:{}", self.name));
                Ok(())
            }
        }

        #[async_trait::async_trait]
        impl SystemCapability for TrackHooks {
            // No-op; only the post_init barrier ordering is under test here.
            fn pre_init(&self, _sys: &crate::runtime::SystemContext) -> anyhow::Result<()> {
                Ok(())
            }

            async fn post_init(&self, _sys: &crate::runtime::SystemContext) -> anyhow::Result<()> {
                self.events
                    .lock()
                    .await
                    .push(format!("post_init:{}", self.name));
                Ok(())
            }
        }

        let events = Arc::new(Mutex::new(Vec::<String>::new()));
        let sys_a = Arc::new(TrackHooks {
            name: "sys_a",
            events: events.clone(),
        });
        let user_b = Arc::new(TrackHooks {
            name: "user_b",
            events: events.clone(),
        });
        let user_c = Arc::new(TrackHooks {
            name: "user_c",
            events: events.clone(),
        });

        // Dependency chain sys_a -> user_b -> user_c; only sys_a is
        // additionally registered with the system capability, so only it
        // should contribute a post_init event.
        let mut builder = RegistryBuilder::default();
        builder.register_core_with_meta("sys_a", &[], sys_a.clone() as Arc<dyn Module>);
        builder.register_core_with_meta("user_b", &["sys_a"], user_b.clone() as Arc<dyn Module>);
        builder.register_core_with_meta("user_c", &["user_b"], user_c.clone() as Arc<dyn Module>);
        builder.register_system_with_meta("sys_a", sys_a.clone() as Arc<dyn SystemCapability>);

        let registry = builder.build_topo_sorted().unwrap();

        let client_hub = Arc::new(ClientHub::new());
        let cancel = CancellationToken::new();
        let config_provider: Arc<dyn ConfigProvider> = Arc::new(EmptyConfigProvider);

        let runtime = HostRuntime::new(
            registry,
            config_provider,
            DbOptions::None,
            client_hub,
            cancel,
            Uuid::new_v4(),
            None,
        );

        // Run init phase for all modules, then post_init as a separate barrier phase.
        runtime.run_init_phase().await.unwrap();
        runtime.run_post_init_phase().await.unwrap();

        // Barrier property: every init event must precede the first
        // post_init event (this check gives a clearer failure message than
        // the exact-sequence assertion below).
        let events = events.lock().await.clone();
        let first_post_init = events
            .iter()
            .position(|e| e.starts_with("post_init:"))
            .expect("expected post_init events");
        assert!(
            events[..first_post_init]
                .iter()
                .all(|e| e.starts_with("init:")),
            "expected all init events before post_init, got: {events:?}"
        );

        // system-first order within each phase
        assert_eq!(
            events,
            vec![
                "init:sys_a",
                "init:user_b",
                "init:user_c",
                "post_init:sys_a",
            ]
        );
    }
1059}