
// modkit/runtime/host_runtime.rs

//! Host Runtime - orchestrates the full `ModKit` lifecycle
//!
//! This module contains the `HostRuntime` type that owns and coordinates
//! the execution of all lifecycle phases.
//!
//! High-level phase order:
//! - `pre_init` (system modules only)
//! - DB migrations (modules with DB capability)
//! - `init` (all modules)
//! - `post_init` (system modules only; runs after *all* `init` complete)
//! - REST wiring (modules with REST capability; requires a single REST host)
//! - gRPC registration (modules with gRPC capability; requires a single gRPC hub)
//! - start/stop (stateful modules)
//! - `OoP` spawn / wait / stop (host-only orchestration)
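//!
//! A minimal usage sketch (hedged: assumes a `ModuleRegistry` and a
//! `ConfigProvider` that the host application has built elsewhere):
//!
//! ```rust,ignore
//! let runtime = HostRuntime::new(
//!     registry,                    // ModuleRegistry, topo-sorted
//!     config_provider,             // Arc<dyn ConfigProvider>
//!     DbOptions::None,             // or DbOptions::Manager(..) with the `db` feature
//!     Arc::new(ClientHub::new()),
//!     CancellationToken::new(),
//!     Uuid::new_v4(),
//!     None,                        // no out-of-process modules
//! );
//! // Runs pre_init → db → init → post_init → rest → grpc → start → oop_spawn,
//! // then waits for cancellation and stops modules in reverse order.
//! runtime.run_module_phases().await?;
//! ```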

use axum::Router;
use std::collections::HashSet;
use std::sync::Arc;

use tokio_util::sync::CancellationToken;
use uuid::Uuid;

use crate::backends::OopSpawnConfig;
use crate::client_hub::ClientHub;
use crate::config::ConfigProvider;
use crate::context::ModuleContextBuilder;
use crate::registry::{
    ApiGatewayCap, GrpcHubCap, ModuleEntry, ModuleRegistry, RegistryError, RestApiCap, RunnableCap,
    SystemCap,
};
use crate::runtime::{GrpcInstallerStore, ModuleManager, OopSpawnOptions, SystemContext};

#[cfg(feature = "db")]
use crate::registry::DatabaseCap;

/// How the runtime should provide DBs to modules.
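///
/// A small sketch of the two variants (how the `modkit_db::DbManager` itself
/// is constructed is an assumption left to the host application):
///
/// ```rust,ignore
/// // No database integration at all:
/// let db_options = DbOptions::None;
///
/// // With the `db` feature: share one manager across all modules.
/// // let db_options = DbOptions::Manager(Arc::new(db_manager));
/// ```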
#[derive(Clone)]
pub enum DbOptions {
    /// No database integration. `ModuleCtx::db()` will be `None`, and `db_required()` will error.
    None,
    /// Use a `DbManager` to handle database connections with Figment-based configuration.
    #[cfg(feature = "db")]
    Manager(Arc<modkit_db::DbManager>),
}

/// Runtime execution mode that determines which phases to run.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RunMode {
    /// Run all phases and wait for the shutdown signal (normal application mode).
    Full,
    /// Run only the pre-init and DB migration phases, then exit (for cloud deployments).
    MigrateOnly,
}

/// Environment variable name for passing the directory endpoint to `OoP` modules.
pub const MODKIT_DIRECTORY_ENDPOINT_ENV: &str = "MODKIT_DIRECTORY_ENDPOINT";

/// Environment variable name for passing the rendered module config to `OoP` modules.
pub const MODKIT_MODULE_CONFIG_ENV: &str = "MODKIT_MODULE_CONFIG";
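
// An out-of-process module picks these up from its environment at startup.
// A hedged sketch (how the OoP binary handles missing variables is up to it):
//
//     let directory = std::env::var(MODKIT_DIRECTORY_ENDPOINT_ENV).ok();
//     let config_json = std::env::var(MODKIT_MODULE_CONFIG_ENV).ok();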

/// Default shutdown deadline for graceful module stop (35 seconds).
///
/// This is intentionally 5 seconds longer than `WithLifecycle::stop_timeout` (30s default)
/// to ensure deterministic behavior: the lifecycle's internal timeout fires first,
/// and the runtime deadline acts as a hard backstop.
pub const DEFAULT_SHUTDOWN_DEADLINE: std::time::Duration = std::time::Duration::from_secs(35);

/// `HostRuntime` owns the lifecycle orchestration for `ModKit`.
///
/// It encapsulates all runtime state and drives modules through the full lifecycle (see module docs).
pub struct HostRuntime {
    registry: ModuleRegistry,
    ctx_builder: ModuleContextBuilder,
    instance_id: Uuid,
    module_manager: Arc<ModuleManager>,
    grpc_installers: Arc<GrpcInstallerStore>,
    #[allow(dead_code)]
    client_hub: Arc<ClientHub>,
    cancel: CancellationToken,
    #[allow(dead_code)]
    db_options: DbOptions,
    /// `OoP` module spawn configuration and backend
    oop_options: Option<OopSpawnOptions>,
    /// Maximum time allowed for graceful shutdown before the hard-stop signal is sent.
    shutdown_deadline: std::time::Duration,
}

impl HostRuntime {
    /// Create a new `HostRuntime` instance.
    ///
    /// This prepares all runtime components but does not start any lifecycle phases.
    pub fn new(
        registry: ModuleRegistry,
        modules_cfg: Arc<dyn ConfigProvider>,
        db_options: DbOptions,
        client_hub: Arc<ClientHub>,
        cancel: CancellationToken,
        instance_id: Uuid,
        oop_options: Option<OopSpawnOptions>,
    ) -> Self {
        // Create runtime-owned components for system modules
        let module_manager = Arc::new(ModuleManager::new());
        let grpc_installers = Arc::new(GrpcInstallerStore::new());

        // Build the context builder that will resolve per-module DbHandles
        let db_manager = match &db_options {
            #[cfg(feature = "db")]
            DbOptions::Manager(mgr) => Some(mgr.clone()),
            DbOptions::None => None,
        };

        let ctx_builder = ModuleContextBuilder::new(
            instance_id,
            modules_cfg,
            client_hub.clone(),
            cancel.clone(),
            db_manager,
        );

        Self {
            registry,
            ctx_builder,
            instance_id,
            module_manager,
            grpc_installers,
            client_hub,
            cancel,
            db_options,
            oop_options,
            shutdown_deadline: DEFAULT_SHUTDOWN_DEADLINE,
        }
    }

    /// Set a custom shutdown deadline for graceful module stop.
    ///
    /// This is the maximum time the runtime will wait for each module to stop gracefully
    /// before sending the hard-stop signal (cancelling the deadline token).
    ///
    /// # Relationship with `WithLifecycle::stop_timeout`
    ///
    /// When using `WithLifecycle`, its `stop_timeout` (default 30s) races against this
    /// `shutdown_deadline` (default 35s). To ensure deterministic behavior:
    ///
    /// - `WithLifecycle::stop_timeout` should be **less than** `shutdown_deadline`
    /// - This allows the lifecycle's internal timeout to trigger first for graceful cleanup
    /// - The runtime's `deadline_token` then acts as a hard backstop
    ///
    /// Example: `stop_timeout = 25s`, `shutdown_deadline = 30s`
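    ///
    /// A hedged sketch of setting the runtime half of such a pair (the
    /// `WithLifecycle` side is configured elsewhere):
    ///
    /// ```rust,ignore
    /// let runtime = HostRuntime::new(/* ... */)
    ///     .with_shutdown_deadline(std::time::Duration::from_secs(30));
    /// ```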
    #[must_use]
    pub fn with_shutdown_deadline(mut self, deadline: std::time::Duration) -> Self {
        self.shutdown_deadline = deadline;
        self
    }

    /// `PRE_INIT` phase: wire runtime internals into system modules.
    ///
    /// This phase runs before init and only for modules with the "system" capability.
    ///
    /// # Errors
    /// Returns `RegistryError` if system wiring fails.
    pub fn run_pre_init_phase(&self) -> Result<(), RegistryError> {
        tracing::info!("Phase: pre_init");

        let sys_ctx = SystemContext::new(
            self.instance_id,
            Arc::clone(&self.module_manager),
            Arc::clone(&self.grpc_installers),
        );

        for entry in self.registry.modules() {
            // Check for cancellation before processing each module
            if self.cancel.is_cancelled() {
                tracing::warn!("Pre-init phase cancelled by signal");
                return Err(RegistryError::Cancelled);
            }

            if let Some(sys_mod) = entry.caps.query::<SystemCap>() {
                tracing::debug!(module = entry.name, "Running system pre_init");
                sys_mod
                    .pre_init(&sys_ctx)
                    .map_err(|e| RegistryError::PreInit {
                        module: entry.name,
                        source: e,
                    })?;
            }
        }

        Ok(())
    }

    /// Helper: resolve context for a module with error mapping.
    async fn module_context(
        &self,
        module_name: &'static str,
    ) -> Result<crate::context::ModuleCtx, RegistryError> {
        self.ctx_builder
            .for_module(module_name)
            .await
            .map_err(|e| RegistryError::DbMigrate {
                module: module_name,
                source: e,
            })
    }

    /// Helper: extract DB handle and module if both exist.
    #[cfg(feature = "db")]
    async fn db_migration_target(
        &self,
        module_name: &'static str,
        ctx: &crate::context::ModuleCtx,
        db_module: Option<Arc<dyn crate::contracts::DatabaseCapability>>,
    ) -> Result<Option<(modkit_db::Db, Arc<dyn crate::contracts::DatabaseCapability>)>, RegistryError>
    {
        let Some(dbm) = db_module else {
            return Ok(None);
        };

        // Important: DB migrations require access to the underlying `Db`, not just `DBProvider`.
        // `ModuleCtx` intentionally exposes only `DBProvider` for better DX and to reduce mistakes.
        // So the runtime resolves the `Db` directly from its `DbManager`.
        let db = match &self.db_options {
            DbOptions::None => None,
            #[cfg(feature = "db")]
            DbOptions::Manager(mgr) => {
                mgr.get(module_name)
                    .await
                    .map_err(|e| RegistryError::DbMigrate {
                        module: module_name,
                        source: e.into(),
                    })?
            }
        };

        _ = ctx; // ctx is kept for parity/error context; DB is resolved from manager above.
        Ok(db.map(|db| (db, dbm)))
    }

    /// Helper: run migrations for a single module using the new migration runner.
    ///
    /// This collects migrations from the module and executes them via the
    /// runtime's privileged connection. Modules never see the raw connection.
    #[cfg(feature = "db")]
    async fn migrate_module(
        module_name: &'static str,
        db: &modkit_db::Db,
        db_module: Arc<dyn crate::contracts::DatabaseCapability>,
    ) -> Result<(), RegistryError> {
        // Collect migrations from the module
        let migrations = db_module.migrations();

        if migrations.is_empty() {
            tracing::debug!(module = module_name, "No migrations to run");
            return Ok(());
        }

        tracing::debug!(
            module = module_name,
            count = migrations.len(),
            "Running DB migrations"
        );

        // Execute migrations using the migration runner
        let result =
            modkit_db::migration_runner::run_migrations_for_module(db, module_name, migrations)
                .await
                .map_err(|e| RegistryError::DbMigrate {
                    module: module_name,
                    source: anyhow::Error::new(e),
                })?;

        tracing::info!(
            module = module_name,
            applied = result.applied,
            skipped = result.skipped,
            "DB migrations completed"
        );

        Ok(())
    }

    /// DB MIGRATION phase: run migrations for all modules with DB capability.
    ///
    /// Runs before init, with system modules processed first.
    ///
    /// Modules provide migrations via `DatabaseCapability::migrations()`.
    /// The runtime executes them with a privileged connection that modules
    /// never receive directly. Each module gets a separate migration history
    /// table, preventing cross-module interference.
    #[cfg(feature = "db")]
    async fn run_db_phase(&self) -> Result<(), RegistryError> {
        tracing::info!("Phase: db (before init)");

        for entry in self.registry.modules_by_system_priority() {
            // Check for cancellation before processing each module
            if self.cancel.is_cancelled() {
                tracing::warn!("DB migration phase cancelled by signal");
                return Err(RegistryError::Cancelled);
            }

            let ctx = self.module_context(entry.name).await?;
            let db_module = entry.caps.query::<DatabaseCap>();

            match self
                .db_migration_target(entry.name, &ctx, db_module.clone())
                .await?
            {
                Some((db, dbm)) => {
                    Self::migrate_module(entry.name, &db, dbm).await?;
                }
                None if db_module.is_some() => {
                    tracing::debug!(
                        module = entry.name,
                        "Module has DbModule trait but no DB handle (no config)"
                    );
                }
                None => {}
            }
        }

        Ok(())
    }

    /// INIT phase: initialize all modules in topological order.
    ///
    /// System modules initialize first, followed by user modules.
    async fn run_init_phase(&self) -> Result<(), RegistryError> {
        tracing::info!("Phase: init");

        for entry in self.registry.modules_by_system_priority() {
            let ctx =
                self.ctx_builder
                    .for_module(entry.name)
                    .await
                    .map_err(|e| RegistryError::Init {
                        module: entry.name,
                        source: e,
                    })?;
            tracing::info!(module = entry.name, "Initializing module...");
            entry
                .core
                .init(&ctx)
                .await
                .map_err(|e| RegistryError::Init {
                    module: entry.name,
                    source: e,
                })?;
            tracing::info!(module = entry.name, "Module initialized");
        }

        Ok(())
    }

    /// `POST_INIT` phase: optional hook after ALL modules completed `init()`.
    ///
    /// This provides a global barrier between initialization-time registration
    /// and subsequent phases that may rely on a fully-populated runtime registry.
    ///
    /// System modules run first, followed by user modules, preserving topo order.
    async fn run_post_init_phase(&self) -> Result<(), RegistryError> {
        tracing::info!("Phase: post_init");

        let sys_ctx = SystemContext::new(
            self.instance_id,
            Arc::clone(&self.module_manager),
            Arc::clone(&self.grpc_installers),
        );

        for entry in self.registry.modules_by_system_priority() {
            if let Some(sys_mod) = entry.caps.query::<SystemCap>() {
                sys_mod
                    .post_init(&sys_ctx)
                    .await
                    .map_err(|e| RegistryError::PostInit {
                        module: entry.name,
                        source: e,
                    })?;
            }
        }

        Ok(())
    }

    /// REST phase: compose the router against the REST host.
    ///
    /// This phase composes the final `Router` in-line (no server is started) by:
    /// 1. Preparing the host module
    /// 2. Registering all REST providers
    /// 3. Finalizing with `OpenAPI` endpoints
    async fn run_rest_phase(&self) -> Result<Router, RegistryError> {
        tracing::info!("Phase: rest (sync)");

        let mut router = Router::new();

        // Find host(s) and whether any rest modules exist
        let host_count = self
            .registry
            .modules()
            .iter()
            .filter(|e| e.caps.has::<ApiGatewayCap>())
            .count();

        match host_count {
            0 => {
                return if self
                    .registry
                    .modules()
                    .iter()
                    .any(|e| e.caps.has::<RestApiCap>())
                {
                    Err(RegistryError::RestRequiresHost)
                } else {
                    Ok(router)
                };
            }
            1 => { /* proceed */ }
            _ => return Err(RegistryError::MultipleRestHosts),
        }

        // Resolve the single host entry and its module context
        let host_idx = self
            .registry
            .modules()
            .iter()
            .position(|e| e.caps.has::<ApiGatewayCap>())
            .ok_or(RegistryError::RestHostNotFoundAfterValidation)?;
        let host_entry = &self.registry.modules()[host_idx];
        let Some(host) = host_entry.caps.query::<ApiGatewayCap>() else {
            return Err(RegistryError::RestHostMissingFromEntry);
        };
        let host_ctx = self
            .ctx_builder
            .for_module(host_entry.name)
            .await
            .map_err(|e| RegistryError::RestPrepare {
                module: host_entry.name,
                source: e,
            })?;

        // Use the host as the OpenAPI registry
        let registry: &dyn crate::contracts::OpenApiRegistry = host.as_registry();

        // 1) Host prepare: base Router / global middlewares / basic OAS meta
        router =
            host.rest_prepare(&host_ctx, router)
                .map_err(|source| RegistryError::RestPrepare {
                    module: host_entry.name,
                    source,
                })?;

        // 2) Register all REST providers (in the current discovery order)
        for e in self.registry.modules() {
            if let Some(rest) = e.caps.query::<RestApiCap>() {
                let ctx = self.ctx_builder.for_module(e.name).await.map_err(|err| {
                    RegistryError::RestRegister {
                        module: e.name,
                        source: err,
                    }
                })?;

                router = rest
                    .register_rest(&ctx, router, registry)
                    .map_err(|source| RegistryError::RestRegister {
                        module: e.name,
                        source,
                    })?;
            }
        }

        // 3) Host finalize: attach /openapi.json and /docs, persist Router if needed (no server start)
        router = host.rest_finalize(&host_ctx, router).map_err(|source| {
            RegistryError::RestFinalize {
                module: host_entry.name,
                source,
            }
        })?;

        Ok(router)
    }

    /// gRPC registration phase: collect services from all grpc modules.
    ///
    /// Services are stored in the installer store for the `grpc-hub` to consume during start.
    async fn run_grpc_phase(&self) -> Result<(), RegistryError> {
        tracing::info!("Phase: grpc (registration)");

        // If no grpc_hub and no grpc_services, skip the phase
        if self.registry.grpc_hub.is_none() && self.registry.grpc_services.is_empty() {
            return Ok(());
        }

        // If there are grpc_services but no hub, that's an error
        if self.registry.grpc_hub.is_none() && !self.registry.grpc_services.is_empty() {
            return Err(RegistryError::GrpcRequiresHub);
        }

        // If there's a hub, collect all services grouped by module and hand them off to the installer store
        if let Some(hub_name) = &self.registry.grpc_hub {
            let mut modules_data = Vec::new();
            let mut seen = HashSet::new();

            // Collect services from all grpc modules
            for (module_name, service_module) in &self.registry.grpc_services {
                let ctx = self
                    .ctx_builder
                    .for_module(module_name)
                    .await
                    .map_err(|err| RegistryError::GrpcRegister {
                        module: module_name.clone(),
                        source: err,
                    })?;

                let installers =
                    service_module
                        .get_grpc_services(&ctx)
                        .await
                        .map_err(|source| RegistryError::GrpcRegister {
                            module: module_name.clone(),
                            source,
                        })?;

                for reg in &installers {
                    if !seen.insert(reg.service_name) {
                        return Err(RegistryError::GrpcRegister {
                            module: module_name.clone(),
                            source: anyhow::anyhow!(
                                "Duplicate gRPC service name: {}",
                                reg.service_name
                            ),
                        });
                    }
                }

                modules_data.push(crate::runtime::ModuleInstallers {
                    module_name: module_name.clone(),
                    installers,
                });
            }

            self.grpc_installers
                .set(crate::runtime::GrpcInstallerData {
                    modules: modules_data,
                })
                .map_err(|source| RegistryError::GrpcRegister {
                    module: hub_name.clone(),
                    source,
                })?;
        }

        Ok(())
    }

    /// START phase: start all stateful modules.
    ///
    /// System modules start first, followed by user modules.
    async fn run_start_phase(&self) -> Result<(), RegistryError> {
        tracing::info!("Phase: start");

        for e in self.registry.modules_by_system_priority() {
            if let Some(s) = e.caps.query::<RunnableCap>() {
                tracing::debug!(
                    module = e.name,
                    is_system = e.caps.has::<SystemCap>(),
                    "Starting stateful module"
                );
                s.start(self.cancel.clone())
                    .await
                    .map_err(|source| RegistryError::Start {
                        module: e.name,
                        source,
                    })?;
                tracing::info!(module = e.name, "Started module");
            }
        }

        Ok(())
    }

    /// Stop a single module, logging errors but continuing execution.
    async fn stop_one_module(entry: &ModuleEntry, cancel: CancellationToken) {
        if let Some(s) = entry.caps.query::<RunnableCap>() {
            match s.stop(cancel).await {
                Ok(()) => {
                    tracing::info!(module = entry.name, "Stopped module");
                }
                Err(err) => {
                    tracing::warn!(module = entry.name, error = %err, "Failed to stop module");
                }
            }
        }
    }

    /// STOP phase: stop all stateful modules in reverse order.
    ///
    /// # Two-Phase Shutdown Contract
    ///
    /// This phase implements a proper two-phase shutdown for **each module**:
    ///
    /// 1. **Graceful stop request**: Each module's `stop(deadline_token)` is called with a
    ///    *fresh* cancellation token (not the already-cancelled root token). Modules should
    ///    interpret this as "please stop gracefully".
    ///
    /// 2. **Hard-stop deadline**: After `shutdown_deadline` expires **for that module**,
    ///    its `deadline_token` is cancelled. Modules should interpret this as "abort immediately".
    ///
    /// Each module gets its own independent deadline — if module A takes 25s to stop,
    /// module B still gets the full `shutdown_deadline` for its graceful shutdown.
    ///
    /// This allows modules to implement real graceful shutdown:
    /// - Request cooperative shutdown of child tasks
    /// - Wait for them to finish gracefully
    /// - If `deadline_token` fires, switch to hard-abort mode
    ///
    /// Errors are logged but do not fail the shutdown process.
    /// Note: `OoP` modules are stopped automatically by the backend when the
    /// cancellation token is triggered.
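    ///
    /// A module-side sketch of this contract (hedged: `shutdown_children` is a
    /// hypothetical helper on the module, not part of the runtime API):
    ///
    /// ```rust,ignore
    /// async fn stop(&self, deadline_token: CancellationToken) -> anyhow::Result<()> {
    ///     tokio::select! {
    ///         _ = self.shutdown_children() => { /* graceful path finished in time */ }
    ///         () = deadline_token.cancelled() => { /* deadline hit: abort immediately */ }
    ///     }
    ///     Ok(())
    /// }
    /// ```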
    async fn run_stop_phase(&self) -> Result<(), RegistryError> {
        tracing::info!("Phase: stop");

        let deadline = self.shutdown_deadline;

        // Stop all modules in reverse order, each with its own independent deadline
        for e in self.registry.modules().iter().rev() {
            let module_name = e.name;

            // Create a fresh deadline token for THIS module
            // Each module gets the full shutdown_deadline independently
            let deadline_token = CancellationToken::new();
            let deadline_token_for_timeout = deadline_token.clone();

            // Spawn a task to cancel this module's deadline token after shutdown_deadline
            let deadline_task = tokio::spawn(async move {
                tokio::time::sleep(deadline).await;
                tracing::warn!(
                    module = module_name,
                    deadline_secs = deadline.as_secs(),
                    "Module shutdown deadline reached, sending hard-stop signal"
                );
                deadline_token_for_timeout.cancel();
            });

            // Stop this module with its own deadline token
            // The module can observe the token transition from uncancelled→cancelled
            Self::stop_one_module(e, deadline_token).await;

            // Cancel the deadline task and await it to ensure full cleanup
            deadline_task.abort();
            #[allow(clippy::let_underscore_must_use)]
            let _ = deadline_task.await;
        }

        Ok(())
    }

    /// `OoP` SPAWN phase: spawn out-of-process modules after start phase.
    ///
    /// This phase runs after `grpc-hub` is already listening, so we can pass
    /// the real directory endpoint to `OoP` modules.
    async fn run_oop_spawn_phase(&self) -> Result<(), RegistryError> {
        let oop_opts = match &self.oop_options {
            Some(opts) if !opts.modules.is_empty() => opts,
            _ => return Ok(()),
        };

        tracing::info!("Phase: oop_spawn");

        // Wait for grpc_hub to publish its endpoint (it runs async in start phase)
        let directory_endpoint = self.wait_for_grpc_hub_endpoint().await;

        for module_cfg in &oop_opts.modules {
            // Build environment with directory endpoint and rendered config
            // Note: User controls --config via execution.args in master config
            let mut env = module_cfg.env.clone();
            env.insert(
                MODKIT_MODULE_CONFIG_ENV.to_owned(),
                module_cfg.rendered_config_json.clone(),
            );
            if let Some(ref endpoint) = directory_endpoint {
                env.insert(MODKIT_DIRECTORY_ENDPOINT_ENV.to_owned(), endpoint.clone());
            }

            // Use args from execution config as-is (user controls --config via args)
            let args = module_cfg.args.clone();

            let spawn_config = OopSpawnConfig {
                module_name: module_cfg.module_name.clone(),
                binary: module_cfg.binary.clone(),
                args,
                env,
                working_directory: module_cfg.working_directory.clone(),
            };

            oop_opts
                .backend
                .spawn(spawn_config)
                .await
                .map_err(|e| RegistryError::OopSpawn {
                    module: module_cfg.module_name.clone(),
                    source: e,
                })?;

            tracing::info!(
                module = %module_cfg.module_name,
                directory_endpoint = ?directory_endpoint,
                "Spawned OoP module via backend"
            );
        }

        Ok(())
    }

    /// Wait for `grpc-hub` to publish its bound endpoint.
    ///
    /// Polls `GrpcHubModule::bound_endpoint()` at a short interval until the endpoint is available or a timeout is reached.
    /// Returns `None` if no `grpc-hub` is running or if the wait times out.
    async fn wait_for_grpc_hub_endpoint(&self) -> Option<String> {
        const POLL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(10);
        const MAX_WAIT: std::time::Duration = std::time::Duration::from_secs(5);

        // Find grpc_hub in registry
        let grpc_hub = self
            .registry
            .modules()
            .iter()
            .find_map(|e| e.caps.query::<GrpcHubCap>());

        let Some(hub) = grpc_hub else {
            return None; // No grpc_hub registered
        };

        let start = std::time::Instant::now();

        loop {
            if let Some(endpoint) = hub.bound_endpoint() {
                tracing::debug!(
                    endpoint = %endpoint,
                    elapsed_ms = start.elapsed().as_millis(),
                    "gRPC hub endpoint available"
                );
                return Some(endpoint);
            }

            if start.elapsed() > MAX_WAIT {
                tracing::warn!("Timed out waiting for gRPC hub to bind");
                return None;
            }

            tokio::time::sleep(POLL_INTERVAL).await;
        }
    }

    /// Run the full module lifecycle (all phases).
    ///
    /// This is the standard entry point for normal application execution.
    /// It runs all phases from pre-init through shutdown.
    ///
    /// # Errors
    ///
    /// Returns an error if any module phase fails during execution.
    pub async fn run_module_phases(self) -> anyhow::Result<()> {
        self.run_phases_internal(RunMode::Full).await
    }

    /// Run only the migration phases (pre-init + DB migration).
    ///
    /// This is designed for cloud deployment workflows where database migrations
    /// need to run as a separate step before starting the application.
    /// The process exits after migrations complete.
    ///
    /// # Errors
    ///
    /// Returns an error if pre-init or migration phases fail.
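    ///
    /// ```rust,ignore
    /// // Sketch: run as a one-shot migration step (e.g. in a deploy pipeline),
    /// // with `runtime` constructed exactly as for `run_module_phases`.
    /// runtime.run_migration_phases().await?;
    /// ```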
    pub async fn run_migration_phases(self) -> anyhow::Result<()> {
        self.run_phases_internal(RunMode::MigrateOnly).await
    }

    /// Internal implementation that runs module phases based on the mode.
    ///
    /// This private method contains the actual phase execution logic and is called
    /// by both `run_module_phases()` and `run_migration_phases()`.
    ///
    /// # Modes
    ///
    /// - `RunMode::Full`: Executes all phases and waits for shutdown signal
    /// - `RunMode::MigrateOnly`: Executes only pre-init and DB migration phases, then exits
    ///
    /// # Phases (Full Mode)
    ///
    /// 1. Pre-init (system modules only)
    /// 2. DB migration (all modules with database capability)
    /// 3. Init (all modules)
    /// 4. Post-init (system modules only)
    /// 5. REST (modules with REST capability)
    /// 6. gRPC (modules with gRPC capability)
    /// 7. Start (runnable modules)
    /// 8. `OoP` spawn (out-of-process modules)
    /// 9. Wait for cancellation
    /// 10. Stop (runnable modules in reverse order)
    async fn run_phases_internal(self, mode: RunMode) -> anyhow::Result<()> {
        // Log execution mode
        match mode {
            RunMode::Full => {
                tracing::info!("Running full lifecycle (all phases)");
            }
            RunMode::MigrateOnly => {
                tracing::info!("Running in migration mode (pre-init + db phases only)");
            }
        }

        // 1. Pre-init phase (before init, only for system modules)
        self.run_pre_init_phase()?;

        // 2. DB migration phase (system modules first)
        #[cfg(feature = "db")]
        {
            self.run_db_phase().await?;
        }
        #[cfg(not(feature = "db"))]
        {
            // No DB integration in this build.
        }

        // Exit early if running in migration-only mode
        if mode == RunMode::MigrateOnly {
            tracing::info!("Migration phases completed successfully");
            return Ok(());
        }

        // 3. Init phase (system modules first)
        self.run_init_phase().await?;

        // 4. Post-init phase (barrier after ALL init; system modules only)
        self.run_post_init_phase().await?;

        // 5. REST phase (synchronous router composition)
        let _router = self.run_rest_phase().await?;

        // 6. gRPC registration phase
        self.run_grpc_phase().await?;

        // 7. Start phase
        self.run_start_phase().await?;

        // 8. OoP spawn phase (after grpc_hub is running)
        self.run_oop_spawn_phase().await?;

        // 9. Wait for cancellation
        self.cancel.cancelled().await;

        // 10. Stop phase with hard timeout.
        //     Blocking syscalls (e.g. libc getaddrinfo in tokio spawn_blocking)
        //     can saturate all tokio worker threads, preventing tokio timers
        //     from firing. Use an OS thread so the watchdog works even when
        //     the tokio runtime is fully blocked.
        let stop_timeout = std::time::Duration::from_secs(15);
        let disarm = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
        let disarm_clone = std::sync::Arc::clone(&disarm);
        std::thread::spawn(move || {
            std::thread::sleep(stop_timeout);
            if !disarm_clone.load(std::sync::atomic::Ordering::Relaxed) {
                tracing::warn!(
                    timeout_secs = stop_timeout.as_secs(),
                    "shutdown: stop phase timed out, force exiting"
                );
                std::process::exit(1);
            }
        });

        self.run_stop_phase().await?;
        disarm.store(true, std::sync::atomic::Ordering::Relaxed);

        Ok(())
    }
}

#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
    use super::*;
    use crate::context::ModuleCtx;
    use crate::contracts::{Module, RunnableCapability, SystemCapability};
    use crate::registry::RegistryBuilder;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use tokio::sync::Mutex;

    #[derive(Default)]
    #[allow(dead_code)]
    struct DummyCore;
    #[async_trait::async_trait]
    impl Module for DummyCore {
        async fn init(&self, _ctx: &ModuleCtx) -> anyhow::Result<()> {
            Ok(())
        }
    }
    /// Records the order in which modules are stopped so tests can assert it.
    struct StopOrderTracker {
        my_order: usize,
        stop_sequence: Arc<Mutex<Vec<usize>>>,
    }

    impl StopOrderTracker {
        fn new(counter: &Arc<AtomicUsize>, stop_sequence: Arc<Mutex<Vec<usize>>>) -> Self {
            let my_order = counter.fetch_add(1, Ordering::SeqCst);
            Self {
                my_order,
                stop_sequence,
            }
        }
    }

    #[async_trait::async_trait]
    impl Module for StopOrderTracker {
        async fn init(&self, _ctx: &ModuleCtx) -> anyhow::Result<()> {
            Ok(())
        }
    }

    #[async_trait::async_trait]
    impl RunnableCapability for StopOrderTracker {
        async fn start(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
            Ok(())
        }
        async fn stop(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
            self.stop_sequence.lock().await.push(self.my_order);
            tracing::info!(my_order = self.my_order, "Module stopped");
            Ok(())
        }
    }

    #[tokio::test]
    async fn test_stop_phase_reverse_order() {
        let counter = Arc::new(AtomicUsize::new(0));
        let stop_sequence = Arc::new(Mutex::new(Vec::new()));

        let module_a = Arc::new(StopOrderTracker::new(&counter, stop_sequence.clone()));
        let module_b = Arc::new(StopOrderTracker::new(&counter, stop_sequence.clone()));
        let module_c = Arc::new(StopOrderTracker::new(&counter, stop_sequence.clone()));

        let mut builder = RegistryBuilder::default();
        builder.register_core_with_meta("a", &[], module_a.clone() as Arc<dyn Module>);
        builder.register_core_with_meta("b", &["a"], module_b.clone() as Arc<dyn Module>);
        builder.register_core_with_meta("c", &["b"], module_c.clone() as Arc<dyn Module>);

        builder.register_stateful_with_meta("a", module_a.clone() as Arc<dyn RunnableCapability>);
        builder.register_stateful_with_meta("b", module_b.clone() as Arc<dyn RunnableCapability>);
        builder.register_stateful_with_meta("c", module_c.clone() as Arc<dyn RunnableCapability>);

        let registry = builder.build_topo_sorted().unwrap();

        // Verify module order is a -> b -> c
        let module_names: Vec<_> = registry.modules().iter().map(|m| m.name).collect();
        assert_eq!(module_names, vec!["a", "b", "c"]);

        let client_hub = Arc::new(ClientHub::new());
        let cancel = CancellationToken::new();
        let config_provider: Arc<dyn ConfigProvider> = Arc::new(EmptyConfigProvider);

        let runtime = HostRuntime::new(
            registry,
            config_provider,
            DbOptions::None,
            client_hub,
            cancel.clone(),
            Uuid::new_v4(),
            None,
        );

        // Run stop phase
        runtime.run_stop_phase().await.unwrap();

        // Modules were created in order a=0, b=1, c=2, so stopping in reverse
        // registration order must record c, then b, then a.
        assert_eq!(*stop_sequence.lock().await, vec![2, 1, 0]);
    }

    #[tokio::test]
    async fn test_stop_phase_continues_on_error() {
        struct FailingModule {
            should_fail: bool,
            stopped: Arc<AtomicUsize>,
        }

        #[async_trait::async_trait]
        impl Module for FailingModule {
            async fn init(&self, _ctx: &ModuleCtx) -> anyhow::Result<()> {
                Ok(())
            }
        }

        #[async_trait::async_trait]
        impl RunnableCapability for FailingModule {
            async fn start(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
                Ok(())
            }
            async fn stop(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
                self.stopped.fetch_add(1, Ordering::SeqCst);
                if self.should_fail {
                    anyhow::bail!("Intentional failure")
                }
                Ok(())
            }
        }

        let stopped = Arc::new(AtomicUsize::new(0));
        let module_a = Arc::new(FailingModule {
            should_fail: false,
            stopped: stopped.clone(),
        });
        let module_b = Arc::new(FailingModule {
            should_fail: true,
            stopped: stopped.clone(),
        });
        let module_c = Arc::new(FailingModule {
            should_fail: false,
            stopped: stopped.clone(),
        });

        let mut builder = RegistryBuilder::default();
        builder.register_core_with_meta("a", &[], module_a.clone() as Arc<dyn Module>);
        builder.register_core_with_meta("b", &["a"], module_b.clone() as Arc<dyn Module>);
        builder.register_core_with_meta("c", &["b"], module_c.clone() as Arc<dyn Module>);

        builder.register_stateful_with_meta("a", module_a.clone() as Arc<dyn RunnableCapability>);
        builder.register_stateful_with_meta("b", module_b.clone() as Arc<dyn RunnableCapability>);
        builder.register_stateful_with_meta("c", module_c.clone() as Arc<dyn RunnableCapability>);

        let registry = builder.build_topo_sorted().unwrap();

        let client_hub = Arc::new(ClientHub::new());
        let cancel = CancellationToken::new();
        let config_provider: Arc<dyn ConfigProvider> = Arc::new(EmptyConfigProvider);

        let runtime = HostRuntime::new(
            registry,
            config_provider,
            DbOptions::None,
            client_hub,
            cancel.clone(),
            Uuid::new_v4(),
            None,
        );

        // Run stop phase - should not fail even though module_b fails
        runtime.run_stop_phase().await.unwrap();

        // All modules should have attempted to stop
        assert_eq!(stopped.load(Ordering::SeqCst), 3);
    }

    struct EmptyConfigProvider;
    impl ConfigProvider for EmptyConfigProvider {
        fn get_module_config(&self, _module_name: &str) -> Option<&serde_json::Value> {
            None
        }
    }

    #[tokio::test]
    async fn test_post_init_runs_after_all_init_and_system_first() {
        #[derive(Clone)]
        struct TrackHooks {
            name: &'static str,
            events: Arc<Mutex<Vec<String>>>,
        }

        #[async_trait::async_trait]
        impl Module for TrackHooks {
            async fn init(&self, _ctx: &ModuleCtx) -> anyhow::Result<()> {
                self.events.lock().await.push(format!("init:{}", self.name));
                Ok(())
            }
        }

        #[async_trait::async_trait]
        impl SystemCapability for TrackHooks {
            fn pre_init(&self, _sys: &crate::runtime::SystemContext) -> anyhow::Result<()> {
                Ok(())
            }

            async fn post_init(&self, _sys: &crate::runtime::SystemContext) -> anyhow::Result<()> {
                self.events
                    .lock()
                    .await
                    .push(format!("post_init:{}", self.name));
                Ok(())
            }
        }

        let events = Arc::new(Mutex::new(Vec::<String>::new()));
        let sys_a = Arc::new(TrackHooks {
            name: "sys_a",
            events: events.clone(),
        });
        let user_b = Arc::new(TrackHooks {
            name: "user_b",
            events: events.clone(),
        });
        let user_c = Arc::new(TrackHooks {
            name: "user_c",
            events: events.clone(),
        });

        let mut builder = RegistryBuilder::default();
        builder.register_core_with_meta("sys_a", &[], sys_a.clone() as Arc<dyn Module>);
        builder.register_core_with_meta("user_b", &["sys_a"], user_b.clone() as Arc<dyn Module>);
        builder.register_core_with_meta("user_c", &["user_b"], user_c.clone() as Arc<dyn Module>);
        builder.register_system_with_meta("sys_a", sys_a.clone() as Arc<dyn SystemCapability>);

        let registry = builder.build_topo_sorted().unwrap();

        let client_hub = Arc::new(ClientHub::new());
        let cancel = CancellationToken::new();
        let config_provider: Arc<dyn ConfigProvider> = Arc::new(EmptyConfigProvider);

        let runtime = HostRuntime::new(
            registry,
            config_provider,
            DbOptions::None,
            client_hub,
            cancel,
            Uuid::new_v4(),
            None,
        );

        // Run init phase for all modules, then post_init as a separate barrier phase.
        runtime.run_init_phase().await.unwrap();
        runtime.run_post_init_phase().await.unwrap();

        let events = events.lock().await.clone();
        let first_post_init = events
            .iter()
            .position(|e| e.starts_with("post_init:"))
            .expect("expected post_init events");
        assert!(
            events[..first_post_init]
                .iter()
                .all(|e| e.starts_with("init:")),
            "expected all init events before post_init, got: {events:?}"
        );

        // system-first order within each phase
        assert_eq!(
            events,
            vec![
                "init:sys_a",
                "init:user_b",
                "init:user_c",
                "post_init:sys_a",
            ]
        );
    }

    #[tokio::test]
    async fn test_stop_phase_provides_fresh_deadline_token() {
        use std::sync::atomic::AtomicBool;

        struct TokenCheckModule {
            stop_was_called: AtomicBool,
            token_was_cancelled_on_entry: AtomicBool,
        }

        #[async_trait::async_trait]
        impl Module for TokenCheckModule {
            async fn init(&self, _ctx: &ModuleCtx) -> anyhow::Result<()> {
                Ok(())
            }
        }

        #[async_trait::async_trait]
        impl RunnableCapability for TokenCheckModule {
            async fn start(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
                Ok(())
            }
            async fn stop(&self, deadline_token: CancellationToken) -> anyhow::Result<()> {
                // Record that stop() was called
                self.stop_was_called.store(true, Ordering::SeqCst);
                // Record whether the token was already cancelled when stop() was called
                self.token_was_cancelled_on_entry
                    .store(deadline_token.is_cancelled(), Ordering::SeqCst);
                Ok(())
            }
        }

        let module = Arc::new(TokenCheckModule {
            stop_was_called: AtomicBool::new(false),
            // Default to true to detect if stop() was never called
            token_was_cancelled_on_entry: AtomicBool::new(true),
        });

        let mut builder = RegistryBuilder::default();
        builder.register_core_with_meta("test", &[], module.clone() as Arc<dyn Module>);
        builder.register_stateful_with_meta("test", module.clone() as Arc<dyn RunnableCapability>);

        let registry = builder.build_topo_sorted().unwrap();
        let client_hub = Arc::new(ClientHub::new());
        let cancel = CancellationToken::new();
        let config_provider: Arc<dyn ConfigProvider> = Arc::new(EmptyConfigProvider);

        let runtime = HostRuntime::new(
            registry,
            config_provider,
            DbOptions::None,
            client_hub,
            cancel.clone(),
            Uuid::new_v4(),
            None,
        );

        // Run stop phase - the deadline token should NOT be cancelled
        runtime.run_stop_phase().await.unwrap();

        // First, verify stop() was actually called (guards against silent registration failures)
        assert!(
            module.stop_was_called.load(Ordering::SeqCst),
            "stop() was never called - module may not have been registered correctly"
        );

        // The token should NOT have been cancelled when stop() was called
        // This is the key fix: modules get a fresh token, not the already-cancelled root token
        assert!(
            !module.token_was_cancelled_on_entry.load(Ordering::SeqCst),
            "deadline_token should NOT be cancelled when stop() is called - this enables graceful shutdown"
        );
    }

    #[tokio::test]
    async fn test_stop_phase_graceful_shutdown_completes_before_deadline() {
        use std::sync::atomic::AtomicBool;
        use std::time::Duration;

        struct GracefulModule {
            graceful_completed: AtomicBool,
            deadline_fired: AtomicBool,
        }

        #[async_trait::async_trait]
        impl Module for GracefulModule {
            async fn init(&self, _ctx: &ModuleCtx) -> anyhow::Result<()> {
                Ok(())
            }
        }

        #[async_trait::async_trait]
        impl RunnableCapability for GracefulModule {
            async fn start(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
                Ok(())
            }
            async fn stop(&self, deadline_token: CancellationToken) -> anyhow::Result<()> {
                // Simulate graceful shutdown that completes quickly (10ms)
                tokio::select! {
                    () = tokio::time::sleep(Duration::from_millis(10)) => {
                        self.graceful_completed.store(true, Ordering::SeqCst);
                    }
                    () = deadline_token.cancelled() => {
                        self.deadline_fired.store(true, Ordering::SeqCst);
                    }
                }
                Ok(())
            }
        }

        let module = Arc::new(GracefulModule {
            graceful_completed: AtomicBool::new(false),
            deadline_fired: AtomicBool::new(false),
        });

        let mut builder = RegistryBuilder::default();
        builder.register_core_with_meta("test", &[], module.clone() as Arc<dyn Module>);
        builder.register_stateful_with_meta("test", module.clone() as Arc<dyn RunnableCapability>);

        let registry = builder.build_topo_sorted().unwrap();
        let client_hub = Arc::new(ClientHub::new());
        let cancel = CancellationToken::new();
        let config_provider: Arc<dyn ConfigProvider> = Arc::new(EmptyConfigProvider);

        // Use a long deadline (5s) - module should complete gracefully before this
        let runtime = HostRuntime::new(
            registry,
            config_provider,
            DbOptions::None,
            client_hub,
            cancel.clone(),
            Uuid::new_v4(),
            None,
        )
        .with_shutdown_deadline(Duration::from_secs(5));

        runtime.run_stop_phase().await.unwrap();

        // Graceful shutdown should have completed
        assert!(
            module.graceful_completed.load(Ordering::SeqCst),
            "graceful shutdown should complete"
        );
        // Deadline should NOT have fired (module finished before deadline)
        assert!(
            !module.deadline_fired.load(Ordering::SeqCst),
            "deadline should not fire when graceful shutdown completes quickly"
        );
    }

    #[tokio::test]
    async fn test_stop_phase_deadline_fires_for_slow_module() {
        use std::sync::atomic::AtomicBool;
        use std::time::Duration;

        struct SlowModule {
            graceful_completed: AtomicBool,
            deadline_fired: AtomicBool,
        }

        #[async_trait::async_trait]
        impl Module for SlowModule {
            async fn init(&self, _ctx: &ModuleCtx) -> anyhow::Result<()> {
                Ok(())
            }
        }

        #[async_trait::async_trait]
        impl RunnableCapability for SlowModule {
            async fn start(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
                Ok(())
            }
            async fn stop(&self, deadline_token: CancellationToken) -> anyhow::Result<()> {
                // Simulate slow graceful shutdown (would take 10s, but deadline is 100ms)
                tokio::select! {
                    () = tokio::time::sleep(Duration::from_secs(10)) => {
                        self.graceful_completed.store(true, Ordering::SeqCst);
                    }
                    () = deadline_token.cancelled() => {
                        self.deadline_fired.store(true, Ordering::SeqCst);
                    }
                }
                Ok(())
            }
        }

        let module = Arc::new(SlowModule {
            graceful_completed: AtomicBool::new(false),
            deadline_fired: AtomicBool::new(false),
        });

        let mut builder = RegistryBuilder::default();
        builder.register_core_with_meta("test", &[], module.clone() as Arc<dyn Module>);
        builder.register_stateful_with_meta("test", module.clone() as Arc<dyn RunnableCapability>);

        let registry = builder.build_topo_sorted().unwrap();
        let client_hub = Arc::new(ClientHub::new());
        let cancel = CancellationToken::new();
        let config_provider: Arc<dyn ConfigProvider> = Arc::new(EmptyConfigProvider);

        // Use a short deadline (100ms) - module should be interrupted by deadline
        let runtime = HostRuntime::new(
            registry,
            config_provider,
            DbOptions::None,
            client_hub,
            cancel.clone(),
            Uuid::new_v4(),
            None,
        )
        .with_shutdown_deadline(Duration::from_millis(100));

        runtime.run_stop_phase().await.unwrap();

        // Graceful shutdown should NOT have completed (deadline fired first)
        assert!(
            !module.graceful_completed.load(Ordering::SeqCst),
            "graceful shutdown should not complete when deadline fires first"
        );
        // Deadline should have fired
        assert!(
            module.deadline_fired.load(Ordering::SeqCst),
            "deadline should fire for slow modules"
        );
    }
}
1381}