// modkit/runtime/host_runtime.rs

//! Host Runtime - orchestrates the full `ModKit` lifecycle
//!
//! This module contains the `HostRuntime` type that owns and coordinates
//! the execution of all lifecycle phases.
//!
//! High-level phase order:
//! - `pre_init` (system modules only)
//! - DB migrations (modules with DB capability)
//! - `init` (all modules)
//! - `post_init` (system modules only; runs after *all* `init` complete)
//! - REST wiring (modules with REST capability; requires a single REST host)
//! - gRPC registration (modules with gRPC capability; requires a single gRPC hub)
//! - start/stop (stateful modules)
//! - `OoP` spawn / wait / stop (host-only orchestration)
16use axum::Router;
17use std::collections::HashSet;
18use std::sync::Arc;
19
20use tokio_util::sync::CancellationToken;
21use uuid::Uuid;
22
23use crate::backends::OopSpawnConfig;
24use crate::client_hub::ClientHub;
25use crate::config::ConfigProvider;
26use crate::context::ModuleContextBuilder;
27use crate::registry::{
28    ApiGatewayCap, GrpcHubCap, ModuleEntry, ModuleRegistry, RegistryError, RestApiCap, RunnableCap,
29    SystemCap,
30};
31use crate::runtime::{GrpcInstallerStore, ModuleManager, OopSpawnOptions, SystemContext};
32
33#[cfg(feature = "db")]
34use crate::registry::DatabaseCap;
35
/// Strategy the runtime uses to supply database access to modules.
#[derive(Clone)]
pub enum DbOptions {
    /// Disable database integration entirely: `ModuleCtx::db()` yields `None`
    /// and `db_required()` returns an error.
    None,
    /// Delegate connection handling to a `DbManager` with Figment-based configuration.
    #[cfg(feature = "db")]
    Manager(Arc<modkit_db::DbManager>),
}
45
/// Selects which lifecycle phases the runtime executes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RunMode {
    /// Execute every phase and block until a shutdown signal arrives
    /// (the normal application mode).
    Full,
    /// Execute only pre-init and DB migration phases, then return
    /// (useful for cloud deployment migration jobs).
    MigrateOnly,
}
54
/// Environment variable name for passing directory endpoint to `OoP` modules.
///
/// Only set during the `OoP` spawn phase when the gRPC hub endpoint is available.
pub const MODKIT_DIRECTORY_ENDPOINT_ENV: &str = "MODKIT_DIRECTORY_ENDPOINT";

/// Environment variable name for passing rendered module config to `OoP` modules.
///
/// Always set for spawned `OoP` processes; carries the module's rendered JSON config.
pub const MODKIT_MODULE_CONFIG_ENV: &str = "MODKIT_MODULE_CONFIG";
60
/// `HostRuntime` owns the lifecycle orchestration for `ModKit`.
///
/// It encapsulates all runtime state and drives modules through the full lifecycle (see module docs).
pub struct HostRuntime {
    /// All registered modules, in registration (topological) order.
    registry: ModuleRegistry,
    /// Builds a per-module `ModuleCtx` (config, client hub, cancel token, DB handle).
    ctx_builder: ModuleContextBuilder,
    /// Unique id of this runtime instance.
    instance_id: Uuid,
    /// Runtime-owned module manager handed to system modules via `SystemContext`.
    module_manager: Arc<ModuleManager>,
    /// Store where collected gRPC service installers are handed off to the hub.
    grpc_installers: Arc<GrpcInstallerStore>,
    /// Retained for ownership; the context builder holds its own clone, so the
    /// field itself is unused (hence the dead_code allowance).
    #[allow(dead_code)]
    client_hub: Arc<ClientHub>,
    /// Cooperative shutdown token observed by all phases.
    cancel: CancellationToken,
    /// Used by the DB migration phase when the `db` feature is enabled;
    /// dead code in builds without it.
    #[allow(dead_code)]
    db_options: DbOptions,
    /// `OoP` module spawn configuration and backend
    oop_options: Option<OopSpawnOptions>,
}
78
79impl HostRuntime {
    /// Create a new `HostRuntime` instance.
    ///
    /// This prepares all runtime components but does not start any lifecycle phases.
    ///
    /// # Arguments
    /// * `registry` - the set of modules to orchestrate
    /// * `modules_cfg` - provider of per-module configuration
    /// * `db_options` - database integration strategy (see [`DbOptions`])
    /// * `client_hub` - shared client hub passed into module contexts
    /// * `cancel` - cooperative shutdown token observed by all phases
    /// * `instance_id` - unique id of this runtime instance
    /// * `oop_options` - optional out-of-process module spawn configuration
    pub fn new(
        registry: ModuleRegistry,
        modules_cfg: Arc<dyn ConfigProvider>,
        db_options: DbOptions,
        client_hub: Arc<ClientHub>,
        cancel: CancellationToken,
        instance_id: Uuid,
        oop_options: Option<OopSpawnOptions>,
    ) -> Self {
        // Create runtime-owned components for system modules
        let module_manager = Arc::new(ModuleManager::new());
        let grpc_installers = Arc::new(GrpcInstallerStore::new());

        // Build the context builder that will resolve per-module DbHandles.
        // Without the `db` feature, `DbOptions` has only the `None` variant,
        // so no manager is ever passed through.
        let db_manager = match &db_options {
            #[cfg(feature = "db")]
            DbOptions::Manager(mgr) => Some(mgr.clone()),
            DbOptions::None => None,
        };

        let ctx_builder = ModuleContextBuilder::new(
            instance_id,
            modules_cfg,
            client_hub.clone(),
            cancel.clone(),
            db_manager,
        );

        Self {
            registry,
            ctx_builder,
            instance_id,
            module_manager,
            grpc_installers,
            client_hub,
            cancel,
            db_options,
            oop_options,
        }
    }
123
124    /// `PRE_INIT` phase: wire runtime internals into system modules.
125    ///
126    /// This phase runs before init and only for modules with the "system" capability.
127    ///
128    /// # Errors
129    /// Returns `RegistryError` if system wiring fails.
130    pub fn run_pre_init_phase(&self) -> Result<(), RegistryError> {
131        tracing::info!("Phase: pre_init");
132
133        let sys_ctx = SystemContext::new(
134            self.instance_id,
135            Arc::clone(&self.module_manager),
136            Arc::clone(&self.grpc_installers),
137        );
138
139        for entry in self.registry.modules() {
140            // Check for cancellation before processing each module
141            if self.cancel.is_cancelled() {
142                tracing::warn!("Pre-init phase cancelled by signal");
143                return Err(RegistryError::Cancelled);
144            }
145
146            if let Some(sys_mod) = entry.caps.query::<SystemCap>() {
147                tracing::debug!(module = entry.name, "Running system pre_init");
148                sys_mod
149                    .pre_init(&sys_ctx)
150                    .map_err(|e| RegistryError::PreInit {
151                        module: entry.name,
152                        source: e,
153                    })?;
154            }
155        }
156
157        Ok(())
158    }
159
    /// Helper: resolve context for a module with error mapping.
    ///
    /// Errors are mapped to `RegistryError::DbMigrate` because this helper is
    /// used by the DB migration phase; other phases build their contexts
    /// inline with their own error variants.
    async fn module_context(
        &self,
        module_name: &'static str,
    ) -> Result<crate::context::ModuleCtx, RegistryError> {
        self.ctx_builder
            .for_module(module_name)
            .await
            .map_err(|e| RegistryError::DbMigrate {
                module: module_name,
                source: e,
            })
    }
173
174    /// Helper: extract DB handle and module if both exist.
175    #[cfg(feature = "db")]
176    async fn db_migration_target(
177        &self,
178        module_name: &'static str,
179        ctx: &crate::context::ModuleCtx,
180        db_module: Option<Arc<dyn crate::contracts::DatabaseCapability>>,
181    ) -> Result<Option<(modkit_db::Db, Arc<dyn crate::contracts::DatabaseCapability>)>, RegistryError>
182    {
183        let Some(dbm) = db_module else {
184            return Ok(None);
185        };
186
187        // Important: DB migrations require access to the underlying `Db`, not just `DBProvider`.
188        // `ModuleCtx` intentionally exposes only `DBProvider` for better DX and to reduce mistakes.
189        // So the runtime resolves the `Db` directly from its `DbManager`.
190        let db = match &self.db_options {
191            DbOptions::None => None,
192            #[cfg(feature = "db")]
193            DbOptions::Manager(mgr) => {
194                mgr.get(module_name)
195                    .await
196                    .map_err(|e| RegistryError::DbMigrate {
197                        module: module_name,
198                        source: e.into(),
199                    })?
200            }
201        };
202
203        _ = ctx; // ctx is kept for parity/error context; DB is resolved from manager above.
204        Ok(db.map(|db| (db, dbm)))
205    }
206
207    /// Helper: run migrations for a single module using the new migration runner.
208    ///
209    /// This collects migrations from the module and executes them via the
210    /// runtime's privileged connection. Modules never see the raw connection.
211    #[cfg(feature = "db")]
212    async fn migrate_module(
213        module_name: &'static str,
214        db: &modkit_db::Db,
215        db_module: Arc<dyn crate::contracts::DatabaseCapability>,
216    ) -> Result<(), RegistryError> {
217        // Collect migrations from the module
218        let migrations = db_module.migrations();
219
220        if migrations.is_empty() {
221            tracing::debug!(module = module_name, "No migrations to run");
222            return Ok(());
223        }
224
225        tracing::debug!(
226            module = module_name,
227            count = migrations.len(),
228            "Running DB migrations"
229        );
230
231        // Execute migrations using the migration runner
232        let result =
233            modkit_db::migration_runner::run_migrations_for_module(db, module_name, migrations)
234                .await
235                .map_err(|e| RegistryError::DbMigrate {
236                    module: module_name,
237                    source: anyhow::Error::new(e),
238                })?;
239
240        tracing::info!(
241            module = module_name,
242            applied = result.applied,
243            skipped = result.skipped,
244            "DB migrations completed"
245        );
246
247        Ok(())
248    }
249
250    /// DB MIGRATION phase: run migrations for all modules with DB capability.
251    ///
252    /// Runs before init, with system modules processed first.
253    ///
254    /// Modules provide migrations via `DatabaseCapability::migrations()`.
255    /// The runtime executes them with a privileged connection that modules
256    /// never receive directly. Each module gets a separate migration history
257    /// table, preventing cross-module interference.
258    #[cfg(feature = "db")]
259    async fn run_db_phase(&self) -> Result<(), RegistryError> {
260        tracing::info!("Phase: db (before init)");
261
262        for entry in self.registry.modules_by_system_priority() {
263            // Check for cancellation before processing each module
264            if self.cancel.is_cancelled() {
265                tracing::warn!("DB migration phase cancelled by signal");
266                return Err(RegistryError::Cancelled);
267            }
268
269            let ctx = self.module_context(entry.name).await?;
270            let db_module = entry.caps.query::<DatabaseCap>();
271
272            match self
273                .db_migration_target(entry.name, &ctx, db_module.clone())
274                .await?
275            {
276                Some((db, dbm)) => {
277                    Self::migrate_module(entry.name, &db, dbm).await?;
278                }
279                None if db_module.is_some() => {
280                    tracing::debug!(
281                        module = entry.name,
282                        "Module has DbModule trait but no DB handle (no config)"
283                    );
284                }
285                None => {}
286            }
287        }
288
289        Ok(())
290    }
291
292    /// INIT phase: initialize all modules in topological order.
293    ///
294    /// System modules initialize first, followed by user modules.
295    async fn run_init_phase(&self) -> Result<(), RegistryError> {
296        tracing::info!("Phase: init");
297
298        for entry in self.registry.modules_by_system_priority() {
299            let ctx =
300                self.ctx_builder
301                    .for_module(entry.name)
302                    .await
303                    .map_err(|e| RegistryError::Init {
304                        module: entry.name,
305                        source: e,
306                    })?;
307            tracing::info!(module = entry.name, "Initializing a module...");
308            entry
309                .core
310                .init(&ctx)
311                .await
312                .map_err(|e| RegistryError::Init {
313                    module: entry.name,
314                    source: e,
315                })?;
316            tracing::info!(module = entry.name, "Initialized a module.");
317        }
318
319        Ok(())
320    }
321
322    /// `POST_INIT` phase: optional hook after ALL modules completed `init()`.
323    ///
324    /// This provides a global barrier between initialization-time registration
325    /// and subsequent phases that may rely on a fully-populated runtime registry.
326    ///
327    /// System modules run first, followed by user modules, preserving topo order.
328    async fn run_post_init_phase(&self) -> Result<(), RegistryError> {
329        tracing::info!("Phase: post_init");
330
331        let sys_ctx = SystemContext::new(
332            self.instance_id,
333            Arc::clone(&self.module_manager),
334            Arc::clone(&self.grpc_installers),
335        );
336
337        for entry in self.registry.modules_by_system_priority() {
338            if let Some(sys_mod) = entry.caps.query::<SystemCap>() {
339                sys_mod
340                    .post_init(&sys_ctx)
341                    .await
342                    .map_err(|e| RegistryError::PostInit {
343                        module: entry.name,
344                        source: e,
345                    })?;
346            }
347        }
348
349        Ok(())
350    }
351
    /// REST phase: compose the router against the REST host.
    ///
    /// This is a synchronous phase that builds the final Router by:
    /// 1. Preparing the host module
    /// 2. Registering all REST providers
    /// 3. Finalizing with `OpenAPI` endpoints
    ///
    /// Exactly one module with the API-gateway capability must exist when any
    /// REST provider is registered; zero hosts with zero providers yields an
    /// empty router, and multiple hosts are rejected.
    async fn run_rest_phase(&self) -> Result<Router, RegistryError> {
        tracing::info!("Phase: rest (sync)");

        let mut router = Router::new();

        // Find host(s) and whether any rest modules exist
        let host_count = self
            .registry
            .modules()
            .iter()
            .filter(|e| e.caps.has::<ApiGatewayCap>())
            .count();

        match host_count {
            0 => {
                // No host: fine if nobody needs REST, otherwise a wiring error.
                return if self
                    .registry
                    .modules()
                    .iter()
                    .any(|e| e.caps.has::<RestApiCap>())
                {
                    Err(RegistryError::RestRequiresHost)
                } else {
                    Ok(router)
                };
            }
            1 => { /* proceed */ }
            _ => return Err(RegistryError::MultipleRestHosts),
        }

        // Resolve the single host entry and its module context.
        // (The not-found/missing error paths below are defensive: host_count
        // was already validated to be exactly 1 above.)
        let host_idx = self
            .registry
            .modules()
            .iter()
            .position(|e| e.caps.has::<ApiGatewayCap>())
            .ok_or(RegistryError::RestHostNotFoundAfterValidation)?;
        let host_entry = &self.registry.modules()[host_idx];
        let Some(host) = host_entry.caps.query::<ApiGatewayCap>() else {
            return Err(RegistryError::RestHostMissingFromEntry);
        };
        let host_ctx = self
            .ctx_builder
            .for_module(host_entry.name)
            .await
            .map_err(|e| RegistryError::RestPrepare {
                module: host_entry.name,
                source: e,
            })?;

        // use host as the registry
        let registry: &dyn crate::contracts::OpenApiRegistry = host.as_registry();

        // 1) Host prepare: base Router / global middlewares / basic OAS meta
        router =
            host.rest_prepare(&host_ctx, router)
                .map_err(|source| RegistryError::RestPrepare {
                    module: host_entry.name,
                    source,
                })?;

        // 2) Register all REST providers (in the current discovery order)
        for e in self.registry.modules() {
            if let Some(rest) = e.caps.query::<RestApiCap>() {
                let ctx = self.ctx_builder.for_module(e.name).await.map_err(|err| {
                    RegistryError::RestRegister {
                        module: e.name,
                        source: err,
                    }
                })?;

                router = rest
                    .register_rest(&ctx, router, registry)
                    .map_err(|source| RegistryError::RestRegister {
                        module: e.name,
                        source,
                    })?;
            }
        }

        // 3) Host finalize: attach /openapi.json and /docs, persist Router if needed (no server start)
        router = host.rest_finalize(&host_ctx, router).map_err(|source| {
            RegistryError::RestFinalize {
                module: host_entry.name,
                source,
            }
        })?;

        Ok(router)
    }
448
449    /// gRPC registration phase: collect services from all grpc modules.
450    ///
451    /// Services are stored in the installer store for the `grpc-hub` to consume during start.
452    async fn run_grpc_phase(&self) -> Result<(), RegistryError> {
453        tracing::info!("Phase: grpc (registration)");
454
455        // If no grpc_hub and no grpc_services, skip the phase
456        if self.registry.grpc_hub.is_none() && self.registry.grpc_services.is_empty() {
457            return Ok(());
458        }
459
460        // If there are grpc_services but no hub, that's an error
461        if self.registry.grpc_hub.is_none() && !self.registry.grpc_services.is_empty() {
462            return Err(RegistryError::GrpcRequiresHub);
463        }
464
465        // If there's a hub, collect all services grouped by module and hand them off to the installer store
466        if let Some(hub_name) = &self.registry.grpc_hub {
467            let mut modules_data = Vec::new();
468            let mut seen = HashSet::new();
469
470            // Collect services from all grpc modules
471            for (module_name, service_module) in &self.registry.grpc_services {
472                let ctx = self
473                    .ctx_builder
474                    .for_module(module_name)
475                    .await
476                    .map_err(|err| RegistryError::GrpcRegister {
477                        module: module_name.clone(),
478                        source: err,
479                    })?;
480
481                let installers =
482                    service_module
483                        .get_grpc_services(&ctx)
484                        .await
485                        .map_err(|source| RegistryError::GrpcRegister {
486                            module: module_name.clone(),
487                            source,
488                        })?;
489
490                for reg in &installers {
491                    if !seen.insert(reg.service_name) {
492                        return Err(RegistryError::GrpcRegister {
493                            module: module_name.clone(),
494                            source: anyhow::anyhow!(
495                                "Duplicate gRPC service name: {}",
496                                reg.service_name
497                            ),
498                        });
499                    }
500                }
501
502                modules_data.push(crate::runtime::ModuleInstallers {
503                    module_name: module_name.clone(),
504                    installers,
505                });
506            }
507
508            self.grpc_installers
509                .set(crate::runtime::GrpcInstallerData {
510                    modules: modules_data,
511                })
512                .map_err(|source| RegistryError::GrpcRegister {
513                    module: hub_name.clone(),
514                    source,
515                })?;
516        }
517
518        Ok(())
519    }
520
521    /// START phase: start all stateful modules.
522    ///
523    /// System modules start first, followed by user modules.
524    async fn run_start_phase(&self) -> Result<(), RegistryError> {
525        tracing::info!("Phase: start");
526
527        for e in self.registry.modules_by_system_priority() {
528            if let Some(s) = e.caps.query::<RunnableCap>() {
529                tracing::debug!(
530                    module = e.name,
531                    is_system = e.caps.has::<SystemCap>(),
532                    "Starting stateful module"
533                );
534                s.start(self.cancel.clone())
535                    .await
536                    .map_err(|source| RegistryError::Start {
537                        module: e.name,
538                        source,
539                    })?;
540                tracing::info!(module = e.name, "Started module");
541            }
542        }
543
544        Ok(())
545    }
546
547    /// Stop a single module, logging errors but continuing execution.
548    async fn stop_one_module(entry: &ModuleEntry, cancel: CancellationToken) {
549        if let Some(s) = entry.caps.query::<RunnableCap>() {
550            match s.stop(cancel).await {
551                Err(err) => {
552                    tracing::warn!(module = entry.name, error = %err, "Failed to stop module");
553                }
554                _ => {
555                    tracing::info!(module = entry.name, "Stopped module");
556                }
557            }
558        }
559    }
560
561    /// STOP phase: stop all stateful modules in reverse order.
562    ///
563    /// Errors are logged but do not fail the shutdown process.
564    /// Note: `OoP` modules are stopped automatically by the backend when the
565    /// cancellation token is triggered.
566    async fn run_stop_phase(&self) -> Result<(), RegistryError> {
567        tracing::info!("Phase: stop");
568
569        for e in self.registry.modules().iter().rev() {
570            Self::stop_one_module(e, self.cancel.clone()).await;
571        }
572
573        Ok(())
574    }
575
576    /// `OoP` SPAWN phase: spawn out-of-process modules after start phase.
577    ///
578    /// This phase runs after `grpc-hub` is already listening, so we can pass
579    /// the real directory endpoint to `OoP` modules.
580    async fn run_oop_spawn_phase(&self) -> Result<(), RegistryError> {
581        let oop_opts = match &self.oop_options {
582            Some(opts) if !opts.modules.is_empty() => opts,
583            _ => return Ok(()),
584        };
585
586        tracing::info!("Phase: oop_spawn");
587
588        // Wait for grpc_hub to publish its endpoint (it runs async in start phase)
589        let directory_endpoint = self.wait_for_grpc_hub_endpoint().await;
590
591        for module_cfg in &oop_opts.modules {
592            // Build environment with directory endpoint and rendered config
593            // Note: User controls --config via execution.args in master config
594            let mut env = module_cfg.env.clone();
595            env.insert(
596                MODKIT_MODULE_CONFIG_ENV.to_owned(),
597                module_cfg.rendered_config_json.clone(),
598            );
599            if let Some(ref endpoint) = directory_endpoint {
600                env.insert(MODKIT_DIRECTORY_ENDPOINT_ENV.to_owned(), endpoint.clone());
601            }
602
603            // Use args from execution config as-is (user controls --config via args)
604            let args = module_cfg.args.clone();
605
606            let spawn_config = OopSpawnConfig {
607                module_name: module_cfg.module_name.clone(),
608                binary: module_cfg.binary.clone(),
609                args,
610                env,
611                working_directory: module_cfg.working_directory.clone(),
612            };
613
614            oop_opts
615                .backend
616                .spawn(spawn_config)
617                .await
618                .map_err(|e| RegistryError::OopSpawn {
619                    module: module_cfg.module_name.clone(),
620                    source: e,
621                })?;
622
623            tracing::info!(
624                module = %module_cfg.module_name,
625                directory_endpoint = ?directory_endpoint,
626                "Spawned OoP module via backend"
627            );
628        }
629
630        Ok(())
631    }
632
633    /// Wait for `grpc-hub` to publish its bound endpoint.
634    ///
635    /// Polls the `GrpcHubModule::bound_endpoint()` with a short interval until available or timeout.
636    /// Returns None if no `grpc-hub` is running or if it times out.
637    async fn wait_for_grpc_hub_endpoint(&self) -> Option<String> {
638        const POLL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(10);
639        const MAX_WAIT: std::time::Duration = std::time::Duration::from_secs(5);
640
641        // Find grpc_hub in registry
642        let grpc_hub = self
643            .registry
644            .modules()
645            .iter()
646            .find_map(|e| e.caps.query::<GrpcHubCap>());
647
648        let Some(hub) = grpc_hub else {
649            return None; // No grpc_hub registered
650        };
651
652        let start = std::time::Instant::now();
653
654        loop {
655            if let Some(endpoint) = hub.bound_endpoint() {
656                tracing::debug!(
657                    endpoint = %endpoint,
658                    elapsed_ms = start.elapsed().as_millis(),
659                    "gRPC hub endpoint available"
660                );
661                return Some(endpoint);
662            }
663
664            if start.elapsed() > MAX_WAIT {
665                tracing::warn!("Timed out waiting for gRPC hub to bind");
666                return None;
667            }
668
669            tokio::time::sleep(POLL_INTERVAL).await;
670        }
671    }
672
    /// Run the full module lifecycle (all phases).
    ///
    /// This is the standard entry point for normal application execution.
    /// It runs all phases from pre-init through shutdown.
    /// Consumes `self`: the runtime cannot be reused after the lifecycle ends.
    ///
    /// # Errors
    ///
    /// Returns an error if any module phase fails during execution.
    pub async fn run_module_phases(self) -> anyhow::Result<()> {
        self.run_phases_internal(RunMode::Full).await
    }
684
    /// Run only the migration phases (pre-init + DB migration).
    ///
    /// This is designed for cloud deployment workflows where database migrations
    /// need to run as a separate step before starting the application.
    /// The process exits after migrations complete.
    /// Consumes `self`: the runtime cannot be reused after migrations finish.
    ///
    /// # Errors
    ///
    /// Returns an error if pre-init or migration phases fail.
    pub async fn run_migration_phases(self) -> anyhow::Result<()> {
        self.run_phases_internal(RunMode::MigrateOnly).await
    }
697
    /// Internal implementation that runs module phases based on the mode.
    ///
    /// This private method contains the actual phase execution logic and is called
    /// by both `run_module_phases()` and `run_migration_phases()`.
    ///
    /// # Modes
    ///
    /// - `RunMode::Full`: Executes all phases and waits for shutdown signal
    /// - `RunMode::MigrateOnly`: Executes only pre-init and DB migration phases, then exits
    ///
    /// # Phases (Full Mode)
    ///
    /// 1. Pre-init (system modules only)
    /// 2. DB migration (all modules with database capability)
    /// 3. Init (all modules)
    /// 4. Post-init (system modules only)
    /// 5. REST (modules with REST capability)
    /// 6. gRPC (modules with gRPC capability)
    /// 7. Start (runnable modules)
    /// 8. `OoP` spawn (out-of-process modules)
    /// 9. Wait for cancellation
    /// 10. Stop (runnable modules in reverse order)
    async fn run_phases_internal(self, mode: RunMode) -> anyhow::Result<()> {
        // Log execution mode
        match mode {
            RunMode::Full => {
                tracing::info!("Running full lifecycle (all phases)");
            }
            RunMode::MigrateOnly => {
                tracing::info!("Running in migration mode (pre-init + db phases only)");
            }
        }

        // 1. Pre-init phase (before init, only for system modules)
        self.run_pre_init_phase()?;

        // 2. DB migration phase (system modules first)
        #[cfg(feature = "db")]
        {
            self.run_db_phase().await?;
        }
        #[cfg(not(feature = "db"))]
        {
            // No DB integration in this build.
        }

        // Exit early if running in migration-only mode
        if mode == RunMode::MigrateOnly {
            tracing::info!("Migration phases completed successfully");
            return Ok(());
        }

        // 3. Init phase (system modules first)
        self.run_init_phase().await?;

        // 4. Post-init phase (barrier after ALL init; system modules only)
        self.run_post_init_phase().await?;

        // 5. REST phase (synchronous router composition)
        // The composed router is intentionally dropped here: the REST host is
        // expected to persist it during rest_finalize (see that phase).
        let _router = self.run_rest_phase().await?;

        // 6. gRPC registration phase
        self.run_grpc_phase().await?;

        // 7. Start phase
        self.run_start_phase().await?;

        // 8. OoP spawn phase (after grpc_hub is running)
        self.run_oop_spawn_phase().await?;

        // 9. Wait for cancellation
        self.cancel.cancelled().await;

        // 10. Stop phase with hard timeout.
        //     Blocking syscalls (e.g. libc getaddrinfo in tokio spawn_blocking)
        //     can saturate all tokio worker threads, preventing tokio timers
        //     from firing. Use an OS thread so the watchdog works even when
        //     the tokio runtime is fully blocked.
        let stop_timeout = std::time::Duration::from_secs(15);
        let disarm = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
        let disarm_clone = std::sync::Arc::clone(&disarm);
        std::thread::spawn(move || {
            std::thread::sleep(stop_timeout);
            // If the stop phase finished in time, `disarm` was set below and
            // the watchdog simply exits; otherwise force-terminate the process.
            if !disarm_clone.load(std::sync::atomic::Ordering::Relaxed) {
                tracing::warn!(
                    timeout_secs = stop_timeout.as_secs(),
                    "shutdown: stop phase timed out, force exiting"
                );
                std::process::exit(1);
            }
        });

        self.run_stop_phase().await?;
        disarm.store(true, std::sync::atomic::Ordering::Relaxed);

        Ok(())
    }
795}
796
797#[cfg(test)]
798#[cfg_attr(coverage_nightly, coverage(off))]
799mod tests {
800    use super::*;
801    use crate::context::ModuleCtx;
802    use crate::contracts::{Module, RunnableCapability, SystemCapability};
803    use crate::registry::RegistryBuilder;
804    use std::sync::Arc;
805    use std::sync::atomic::{AtomicUsize, Ordering};
806    use tokio::sync::Mutex;
807
    /// Minimal `Module` implementation used as a no-op core in tests.
    #[derive(Default)]
    #[allow(dead_code)]
    struct DummyCore;
    #[async_trait::async_trait]
    impl Module for DummyCore {
        // init is a no-op: tests using DummyCore only need a registrable core.
        async fn init(&self, _ctx: &ModuleCtx) -> anyhow::Result<()> {
            Ok(())
        }
    }
817
    /// Test module that records both its creation order and the order in
    /// which it was stopped, so tests can assert shutdown ordering.
    struct StopOrderTracker {
        // Position assigned at construction time (0-based creation order).
        my_order: usize,
        // Shared counter incremented on each stop; its value at stop time
        // records this module's position in the stop sequence.
        stop_order: Arc<AtomicUsize>,
    }

    impl StopOrderTracker {
        /// Create a tracker, atomically claiming the next creation-order slot.
        fn new(counter: &Arc<AtomicUsize>, stop_order: Arc<AtomicUsize>) -> Self {
            let my_order = counter.fetch_add(1, Ordering::SeqCst);
            Self {
                my_order,
                stop_order,
            }
        }
    }
832
    #[async_trait::async_trait]
    impl Module for StopOrderTracker {
        // No-op init; the tracker only cares about start/stop ordering.
        async fn init(&self, _ctx: &ModuleCtx) -> anyhow::Result<()> {
            Ok(())
        }
    }
839
840    #[async_trait::async_trait]
841    impl RunnableCapability for StopOrderTracker {
842        async fn start(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
843            Ok(())
844        }
845        async fn stop(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
846            let order = self.stop_order.fetch_add(1, Ordering::SeqCst);
847            tracing::info!(
848                my_order = self.my_order,
849                stop_order = order,
850                "Module stopped"
851            );
852            Ok(())
853        }
854    }
855
856    #[tokio::test]
857    async fn test_stop_phase_reverse_order() {
858        let counter = Arc::new(AtomicUsize::new(0));
859        let stop_order = Arc::new(AtomicUsize::new(0));
860
861        let module_a = Arc::new(StopOrderTracker::new(&counter, stop_order.clone()));
862        let module_b = Arc::new(StopOrderTracker::new(&counter, stop_order.clone()));
863        let module_c = Arc::new(StopOrderTracker::new(&counter, stop_order.clone()));
864
865        let mut builder = RegistryBuilder::default();
866        builder.register_core_with_meta("a", &[], module_a.clone() as Arc<dyn Module>);
867        builder.register_core_with_meta("b", &["a"], module_b.clone() as Arc<dyn Module>);
868        builder.register_core_with_meta("c", &["b"], module_c.clone() as Arc<dyn Module>);
869
870        builder.register_stateful_with_meta("a", module_a.clone() as Arc<dyn RunnableCapability>);
871        builder.register_stateful_with_meta("b", module_b.clone() as Arc<dyn RunnableCapability>);
872        builder.register_stateful_with_meta("c", module_c.clone() as Arc<dyn RunnableCapability>);
873
874        let registry = builder.build_topo_sorted().unwrap();
875
876        // Verify module order is a -> b -> c
877        let module_names: Vec<_> = registry.modules().iter().map(|m| m.name).collect();
878        assert_eq!(module_names, vec!["a", "b", "c"]);
879
880        let client_hub = Arc::new(ClientHub::new());
881        let cancel = CancellationToken::new();
882        let config_provider: Arc<dyn ConfigProvider> = Arc::new(EmptyConfigProvider);
883
884        let runtime = HostRuntime::new(
885            registry,
886            config_provider,
887            DbOptions::None,
888            client_hub,
889            cancel.clone(),
890            Uuid::new_v4(),
891            None,
892        );
893
894        // Run stop phase
895        runtime.run_stop_phase().await.unwrap();
896
897        // Verify modules stopped in reverse order: c (stop_order=0), b (stop_order=1), a (stop_order=2)
898        // Module order is: a=0, b=1, c=2
899        // Stop order should be: c=0, b=1, a=2
900        assert_eq!(stop_order.load(Ordering::SeqCst), 3);
901    }
902
903    #[tokio::test]
904    async fn test_stop_phase_continues_on_error() {
905        struct FailingModule {
906            should_fail: bool,
907            stopped: Arc<AtomicUsize>,
908        }
909
910        #[async_trait::async_trait]
911        impl Module for FailingModule {
912            async fn init(&self, _ctx: &ModuleCtx) -> anyhow::Result<()> {
913                Ok(())
914            }
915        }
916
917        #[async_trait::async_trait]
918        impl RunnableCapability for FailingModule {
919            async fn start(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
920                Ok(())
921            }
922            async fn stop(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
923                self.stopped.fetch_add(1, Ordering::SeqCst);
924                if self.should_fail {
925                    anyhow::bail!("Intentional failure")
926                }
927                Ok(())
928            }
929        }
930
931        let stopped = Arc::new(AtomicUsize::new(0));
932        let module_a = Arc::new(FailingModule {
933            should_fail: false,
934            stopped: stopped.clone(),
935        });
936        let module_b = Arc::new(FailingModule {
937            should_fail: true,
938            stopped: stopped.clone(),
939        });
940        let module_c = Arc::new(FailingModule {
941            should_fail: false,
942            stopped: stopped.clone(),
943        });
944
945        let mut builder = RegistryBuilder::default();
946        builder.register_core_with_meta("a", &[], module_a.clone() as Arc<dyn Module>);
947        builder.register_core_with_meta("b", &["a"], module_b.clone() as Arc<dyn Module>);
948        builder.register_core_with_meta("c", &["b"], module_c.clone() as Arc<dyn Module>);
949
950        builder.register_stateful_with_meta("a", module_a.clone() as Arc<dyn RunnableCapability>);
951        builder.register_stateful_with_meta("b", module_b.clone() as Arc<dyn RunnableCapability>);
952        builder.register_stateful_with_meta("c", module_c.clone() as Arc<dyn RunnableCapability>);
953
954        let registry = builder.build_topo_sorted().unwrap();
955
956        let client_hub = Arc::new(ClientHub::new());
957        let cancel = CancellationToken::new();
958        let config_provider: Arc<dyn ConfigProvider> = Arc::new(EmptyConfigProvider);
959
960        let runtime = HostRuntime::new(
961            registry,
962            config_provider,
963            DbOptions::None,
964            client_hub,
965            cancel.clone(),
966            Uuid::new_v4(),
967            None,
968        );
969
970        // Run stop phase - should not fail even though module_b fails
971        runtime.run_stop_phase().await.unwrap();
972
973        // All modules should have attempted to stop
974        assert_eq!(stopped.load(Ordering::SeqCst), 3);
975    }
976
    // Test stub: a `ConfigProvider` that supplies no configuration for any
    // module, so modules under test run with their defaults.
    struct EmptyConfigProvider;
    impl ConfigProvider for EmptyConfigProvider {
        // Always `None`, regardless of the module name.
        fn get_module_config(&self, _module_name: &str) -> Option<&serde_json::Value> {
            None
        }
    }
983
984    #[tokio::test]
985    async fn test_post_init_runs_after_all_init_and_system_first() {
986        #[derive(Clone)]
987        struct TrackHooks {
988            name: &'static str,
989            events: Arc<Mutex<Vec<String>>>,
990        }
991
992        #[async_trait::async_trait]
993        impl Module for TrackHooks {
994            async fn init(&self, _ctx: &ModuleCtx) -> anyhow::Result<()> {
995                self.events.lock().await.push(format!("init:{}", self.name));
996                Ok(())
997            }
998        }
999
1000        #[async_trait::async_trait]
1001        impl SystemCapability for TrackHooks {
1002            fn pre_init(&self, _sys: &crate::runtime::SystemContext) -> anyhow::Result<()> {
1003                Ok(())
1004            }
1005
1006            async fn post_init(&self, _sys: &crate::runtime::SystemContext) -> anyhow::Result<()> {
1007                self.events
1008                    .lock()
1009                    .await
1010                    .push(format!("post_init:{}", self.name));
1011                Ok(())
1012            }
1013        }
1014
1015        let events = Arc::new(Mutex::new(Vec::<String>::new()));
1016        let sys_a = Arc::new(TrackHooks {
1017            name: "sys_a",
1018            events: events.clone(),
1019        });
1020        let user_b = Arc::new(TrackHooks {
1021            name: "user_b",
1022            events: events.clone(),
1023        });
1024        let user_c = Arc::new(TrackHooks {
1025            name: "user_c",
1026            events: events.clone(),
1027        });
1028
1029        let mut builder = RegistryBuilder::default();
1030        builder.register_core_with_meta("sys_a", &[], sys_a.clone() as Arc<dyn Module>);
1031        builder.register_core_with_meta("user_b", &["sys_a"], user_b.clone() as Arc<dyn Module>);
1032        builder.register_core_with_meta("user_c", &["user_b"], user_c.clone() as Arc<dyn Module>);
1033        builder.register_system_with_meta("sys_a", sys_a.clone() as Arc<dyn SystemCapability>);
1034
1035        let registry = builder.build_topo_sorted().unwrap();
1036
1037        let client_hub = Arc::new(ClientHub::new());
1038        let cancel = CancellationToken::new();
1039        let config_provider: Arc<dyn ConfigProvider> = Arc::new(EmptyConfigProvider);
1040
1041        let runtime = HostRuntime::new(
1042            registry,
1043            config_provider,
1044            DbOptions::None,
1045            client_hub,
1046            cancel,
1047            Uuid::new_v4(),
1048            None,
1049        );
1050
1051        // Run init phase for all modules, then post_init as a separate barrier phase.
1052        runtime.run_init_phase().await.unwrap();
1053        runtime.run_post_init_phase().await.unwrap();
1054
1055        let events = events.lock().await.clone();
1056        let first_post_init = events
1057            .iter()
1058            .position(|e| e.starts_with("post_init:"))
1059            .expect("expected post_init events");
1060        assert!(
1061            events[..first_post_init]
1062                .iter()
1063                .all(|e| e.starts_with("init:")),
1064            "expected all init events before post_init, got: {events:?}"
1065        );
1066
1067        // system-first order within each phase
1068        assert_eq!(
1069            events,
1070            vec![
1071                "init:sys_a",
1072                "init:user_b",
1073                "init:user_c",
1074                "post_init:sys_a",
1075            ]
1076        );
1077    }
1078}