modkit/runtime/host_runtime.rs

//! Host Runtime - orchestrates the full `ModKit` lifecycle
//!
//! This module contains the `HostRuntime` type that owns and coordinates
//! the execution of all lifecycle phases.
//!
//! High-level phase order:
//! - `pre_init` (system modules only)
//! - DB migrations (modules with DB capability)
//! - `init` (all modules)
//! - `post_init` (system modules only; runs after *all* `init` complete)
//! - REST wiring (modules with REST capability; requires a single REST host)
//! - gRPC registration (modules with gRPC capability; requires a single gRPC hub)
//! - start/stop (stateful modules)
//! - `OoP` spawn / wait / stop (host-only orchestration)
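//!
//! # Example (sketch)
//!
//! A minimal sketch of driving the full lifecycle, assuming the registry,
//! config provider, and client hub have already been built (see
//! `HostRuntime::new` below for the exact parameter list):
//!
//! ```ignore
//! let runtime = HostRuntime::new(
//!     registry,
//!     config_provider,
//!     DbOptions::None,
//!     client_hub,
//!     CancellationToken::new(),
//!     Uuid::new_v4(),
//!     None, // no OoP modules
//! );
//! runtime.run_module_phases().await?;
//! ```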

use axum::Router;
use std::collections::HashSet;
use std::sync::Arc;

use tokio_util::sync::CancellationToken;
use uuid::Uuid;

use crate::backends::OopSpawnConfig;
use crate::client_hub::ClientHub;
use crate::config::ConfigProvider;
use crate::context::ModuleContextBuilder;
use crate::registry::{
    ApiGatewayCap, GrpcHubCap, ModuleEntry, ModuleRegistry, RegistryError, RestApiCap, RunnableCap,
    SystemCap,
};
use crate::runtime::{GrpcInstallerStore, ModuleManager, OopSpawnOptions, SystemContext};

#[cfg(feature = "db")]
use crate::registry::DatabaseCap;

/// How the runtime should provide DBs to modules.
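///
/// # Example (sketch)
///
/// A sketch of both variants; how a `modkit_db::DbManager` is constructed is
/// elided here:
///
/// ```ignore
/// // No database integration:
/// let db_options = DbOptions::None;
///
/// // With a shared manager (requires the `db` feature):
/// let db_options = DbOptions::Manager(db_manager);
/// ```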
#[derive(Clone)]
pub enum DbOptions {
    /// No database integration. `ModuleCtx::db()` will return `None`, and `db_required()` will error.
    None,
    /// Use a `DbManager` to handle database connections with Figment-based configuration.
    #[cfg(feature = "db")]
    Manager(Arc<modkit_db::DbManager>),
}

/// Runtime execution mode that determines which phases to run.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RunMode {
    /// Run all phases and wait for shutdown signal (normal application mode).
    Full,
    /// Run only pre-init and DB migration phases, then exit (for cloud deployments).
    MigrateOnly,
}

/// Environment variable name for passing directory endpoint to `OoP` modules.
pub const MODKIT_DIRECTORY_ENDPOINT_ENV: &str = "MODKIT_DIRECTORY_ENDPOINT";

/// Environment variable name for passing rendered module config to `OoP` modules.
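///
/// # Example (sketch)
///
/// How a spawned out-of-process module might read these variables at startup:
///
/// ```ignore
/// // The host always injects the rendered config; the directory endpoint is
/// // only present when a gRPC hub is running (see `run_oop_spawn_phase`).
/// let config_json = std::env::var(MODKIT_MODULE_CONFIG_ENV)?;
/// let directory = std::env::var(MODKIT_DIRECTORY_ENDPOINT_ENV).ok();
/// ```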
pub const MODKIT_MODULE_CONFIG_ENV: &str = "MODKIT_MODULE_CONFIG";

/// Default shutdown deadline for graceful module stop (35 seconds).
///
/// This is intentionally 5 seconds longer than `WithLifecycle::stop_timeout` (30s default)
/// to ensure deterministic behavior: the lifecycle's internal timeout fires first,
/// and the runtime deadline acts as a hard backstop.
pub const DEFAULT_SHUTDOWN_DEADLINE: std::time::Duration = std::time::Duration::from_secs(35);

/// `HostRuntime` owns the lifecycle orchestration for `ModKit`.
///
/// It encapsulates all runtime state and drives modules through the full lifecycle (see module docs).
pub struct HostRuntime {
    registry: ModuleRegistry,
    ctx_builder: ModuleContextBuilder,
    instance_id: Uuid,
    module_manager: Arc<ModuleManager>,
    grpc_installers: Arc<GrpcInstallerStore>,
    #[allow(dead_code)]
    client_hub: Arc<ClientHub>,
    cancel: CancellationToken,
    #[allow(dead_code)]
    db_options: DbOptions,
    /// `OoP` module spawn configuration and backend
    oop_options: Option<OopSpawnOptions>,
    /// Maximum time allowed for graceful shutdown before hard-stop signal is sent.
    shutdown_deadline: std::time::Duration,
}

impl HostRuntime {
    /// Create a new `HostRuntime` instance.
    ///
    /// This prepares all runtime components but does not start any lifecycle phases.
    pub fn new(
        registry: ModuleRegistry,
        modules_cfg: Arc<dyn ConfigProvider>,
        db_options: DbOptions,
        client_hub: Arc<ClientHub>,
        cancel: CancellationToken,
        instance_id: Uuid,
        oop_options: Option<OopSpawnOptions>,
    ) -> Self {
        // Create runtime-owned components for system modules
        let module_manager = Arc::new(ModuleManager::new());
        let grpc_installers = Arc::new(GrpcInstallerStore::new());

        // Build the context builder that will resolve per-module DbHandles
        let db_manager = match &db_options {
            #[cfg(feature = "db")]
            DbOptions::Manager(mgr) => Some(mgr.clone()),
            DbOptions::None => None,
        };

        let ctx_builder = ModuleContextBuilder::new(
            instance_id,
            modules_cfg,
            client_hub.clone(),
            cancel.clone(),
            db_manager,
        );

        Self {
            registry,
            ctx_builder,
            instance_id,
            module_manager,
            grpc_installers,
            client_hub,
            cancel,
            db_options,
            oop_options,
            shutdown_deadline: DEFAULT_SHUTDOWN_DEADLINE,
        }
    }

    /// Set a custom shutdown deadline for graceful module stop.
    ///
    /// This is the maximum time the runtime will wait for each module to stop gracefully
    /// before sending the hard-stop signal (cancelling the deadline token).
    ///
    /// # Relationship with `WithLifecycle::stop_timeout`
    ///
    /// When using `WithLifecycle`, its `stop_timeout` (default 30s) races against this
    /// `shutdown_deadline` (default 35s). To ensure deterministic behavior:
    ///
    /// - `WithLifecycle::stop_timeout` should be **less than** `shutdown_deadline`
    /// - This allows the lifecycle's internal timeout to trigger first for graceful cleanup
    /// - The runtime's `deadline_token` then acts as a hard backstop
    ///
    /// Example: `stop_timeout = 25s`, `shutdown_deadline = 30s`
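    ///
    /// # Example (sketch)
    ///
    /// Tightening the per-module deadline on an already-built runtime:
    ///
    /// ```ignore
    /// // Give each module up to 30 seconds to stop gracefully.
    /// let runtime = runtime.with_shutdown_deadline(std::time::Duration::from_secs(30));
    /// ```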
    #[must_use]
    pub fn with_shutdown_deadline(mut self, deadline: std::time::Duration) -> Self {
        self.shutdown_deadline = deadline;
        self
    }

    /// `PRE_INIT` phase: wire runtime internals into system modules.
    ///
    /// This phase runs before init and only for modules with the "system" capability.
    ///
    /// # Errors
    /// Returns `RegistryError` if system wiring fails.
    pub fn run_pre_init_phase(&self) -> Result<(), RegistryError> {
        tracing::info!("Phase: pre_init");

        let sys_ctx = SystemContext::new(
            self.instance_id,
            Arc::clone(&self.module_manager),
            Arc::clone(&self.grpc_installers),
        );

        for entry in self.registry.modules() {
            // Check for cancellation before processing each module
            if self.cancel.is_cancelled() {
                tracing::warn!("Pre-init phase cancelled by signal");
                return Err(RegistryError::Cancelled);
            }

            if let Some(sys_mod) = entry.caps.query::<SystemCap>() {
                tracing::debug!(module = entry.name, "Running system pre_init");
                sys_mod
                    .pre_init(&sys_ctx)
                    .map_err(|e| RegistryError::PreInit {
                        module: entry.name,
                        source: e,
                    })?;
            }
        }

        Ok(())
    }

    /// Helper: resolve context for a module with error mapping.
    #[cfg(feature = "db")]
    async fn module_context(
        &self,
        module_name: &'static str,
    ) -> Result<crate::context::ModuleCtx, RegistryError> {
        self.ctx_builder
            .for_module(module_name)
            .await
            .map_err(|e| RegistryError::DbMigrate {
                module: module_name,
                source: e,
            })
    }

    /// Helper: extract DB handle and module if both exist.
    #[cfg(feature = "db")]
    async fn db_migration_target(
        &self,
        module_name: &'static str,
        ctx: &crate::context::ModuleCtx,
        db_module: Option<Arc<dyn crate::contracts::DatabaseCapability>>,
    ) -> Result<Option<(modkit_db::Db, Arc<dyn crate::contracts::DatabaseCapability>)>, RegistryError>
    {
        let Some(dbm) = db_module else {
            return Ok(None);
        };

        // Important: DB migrations require access to the underlying `Db`, not just `DBProvider`.
        // `ModuleCtx` intentionally exposes only `DBProvider` for better DX and to reduce mistakes.
        // So the runtime resolves the `Db` directly from its `DbManager`.
        let db = match &self.db_options {
            DbOptions::None => None,
            #[cfg(feature = "db")]
            DbOptions::Manager(mgr) => {
                mgr.get(module_name)
                    .await
                    .map_err(|e| RegistryError::DbMigrate {
                        module: module_name,
                        source: e.into(),
                    })?
            }
        };

        _ = ctx; // ctx is kept for parity/error context; DB is resolved from manager above.
        Ok(db.map(|db| (db, dbm)))
    }

    /// Helper: run migrations for a single module using the new migration runner.
    ///
    /// This collects migrations from the module and executes them via the
    /// runtime's privileged connection. Modules never see the raw connection.
    #[cfg(feature = "db")]
    async fn migrate_module(
        module_name: &'static str,
        db: &modkit_db::Db,
        db_module: Arc<dyn crate::contracts::DatabaseCapability>,
    ) -> Result<(), RegistryError> {
        // Collect migrations from the module
        let migrations = db_module.migrations();

        if migrations.is_empty() {
            tracing::debug!(module = module_name, "No migrations to run");
            return Ok(());
        }

        tracing::debug!(
            module = module_name,
            count = migrations.len(),
            "Running DB migrations"
        );

        // Execute migrations using the migration runner
        let result =
            modkit_db::migration_runner::run_migrations_for_module(db, module_name, migrations)
                .await
                .map_err(|e| RegistryError::DbMigrate {
                    module: module_name,
                    source: anyhow::Error::new(e),
                })?;

        tracing::info!(
            module = module_name,
            applied = result.applied,
            skipped = result.skipped,
            "DB migrations completed"
        );

        Ok(())
    }

    /// DB MIGRATION phase: run migrations for all modules with DB capability.
    ///
    /// Runs before init, with system modules processed first.
    ///
    /// Modules provide migrations via `DatabaseCapability::migrations()`.
    /// The runtime executes them with a privileged connection that modules
    /// never receive directly. Each module gets a separate migration history
    /// table, preventing cross-module interference.
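    ///
    /// # Module-side example (sketch)
    ///
    /// A hypothetical module exposing its migrations; the exact item type and
    /// trait surface are defined by `crate::contracts::DatabaseCapability`, so
    /// treat the signature below as an assumption:
    ///
    /// ```ignore
    /// impl DatabaseCapability for MyModule {
    ///     fn migrations(&self) -> Vec<Migration> {
    ///         vec![/* module-owned migration steps */]
    ///     }
    /// }
    /// ```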
    #[cfg(feature = "db")]
    async fn run_db_phase(&self) -> Result<(), RegistryError> {
        tracing::info!("Phase: db (before init)");

        for entry in self.registry.modules_by_system_priority() {
            // Check for cancellation before processing each module
            if self.cancel.is_cancelled() {
                tracing::warn!("DB migration phase cancelled by signal");
                return Err(RegistryError::Cancelled);
            }

            let ctx = self.module_context(entry.name).await?;
            let db_module = entry.caps.query::<DatabaseCap>();

            match self
                .db_migration_target(entry.name, &ctx, db_module.clone())
                .await?
            {
                Some((db, dbm)) => {
                    Self::migrate_module(entry.name, &db, dbm).await?;
                }
                None if db_module.is_some() => {
                    tracing::debug!(
                        module = entry.name,
                        "Module has DB capability but no DB handle (no config)"
                    );
                }
                None => {}
            }
        }

        Ok(())
    }

    /// INIT phase: initialize all modules in topological order.
    ///
    /// System modules initialize first, followed by user modules.
    async fn run_init_phase(&self) -> Result<(), RegistryError> {
        tracing::info!("Phase: init");

        for entry in self.registry.modules_by_system_priority() {
            let ctx =
                self.ctx_builder
                    .for_module(entry.name)
                    .await
                    .map_err(|e| RegistryError::Init {
                        module: entry.name,
                        source: e,
                    })?;
            tracing::info!(module = entry.name, "Initializing module...");
            entry
                .core
                .init(&ctx)
                .await
                .map_err(|e| RegistryError::Init {
                    module: entry.name,
                    source: e,
                })?;
            tracing::info!(module = entry.name, "Module initialized");
        }

        Ok(())
    }

    /// `POST_INIT` phase: optional hook after ALL modules have completed `init()`.
    ///
    /// This provides a global barrier between initialization-time registration
    /// and subsequent phases that may rely on a fully-populated runtime registry.
    ///
    /// System modules run first, followed by user modules, preserving topological order.
    async fn run_post_init_phase(&self) -> Result<(), RegistryError> {
        tracing::info!("Phase: post_init");

        let sys_ctx = SystemContext::new(
            self.instance_id,
            Arc::clone(&self.module_manager),
            Arc::clone(&self.grpc_installers),
        );

        for entry in self.registry.modules_by_system_priority() {
            if let Some(sys_mod) = entry.caps.query::<SystemCap>() {
                sys_mod
                    .post_init(&sys_ctx)
                    .await
                    .map_err(|e| RegistryError::PostInit {
                        module: entry.name,
                        source: e,
                    })?;
            }
        }

        Ok(())
    }

    /// REST phase: compose the router against the REST host.
    ///
    /// This phase builds the final `Router` (no server is started) by:
    /// 1. Preparing the host module
    /// 2. Registering all REST providers
    /// 3. Finalizing with `OpenAPI` endpoints
    async fn run_rest_phase(&self) -> Result<Router, RegistryError> {
        tracing::info!("Phase: rest (sync)");

        let mut router = Router::new();

        // Count REST hosts; a single host is required if any REST modules exist
        let host_count = self
            .registry
            .modules()
            .iter()
            .filter(|e| e.caps.has::<ApiGatewayCap>())
            .count();

        match host_count {
            0 => {
                return if self
                    .registry
                    .modules()
                    .iter()
                    .any(|e| e.caps.has::<RestApiCap>())
                {
                    Err(RegistryError::RestRequiresHost)
                } else {
                    Ok(router)
                };
            }
            1 => { /* proceed */ }
            _ => return Err(RegistryError::MultipleRestHosts),
        }

        // Resolve the single host entry and its module context
        let host_idx = self
            .registry
            .modules()
            .iter()
            .position(|e| e.caps.has::<ApiGatewayCap>())
            .ok_or(RegistryError::RestHostNotFoundAfterValidation)?;
        let host_entry = &self.registry.modules()[host_idx];
        let Some(host) = host_entry.caps.query::<ApiGatewayCap>() else {
            return Err(RegistryError::RestHostMissingFromEntry);
        };
        let host_ctx = self
            .ctx_builder
            .for_module(host_entry.name)
            .await
            .map_err(|e| RegistryError::RestPrepare {
                module: host_entry.name,
                source: e,
            })?;

        // Use the host as the OpenAPI registry
        let registry: &dyn crate::contracts::OpenApiRegistry = host.as_registry();

        // 1) Host prepare: base Router / global middlewares / basic OAS meta
        router =
            host.rest_prepare(&host_ctx, router)
                .map_err(|source| RegistryError::RestPrepare {
                    module: host_entry.name,
                    source,
                })?;

        // 2) Register all REST providers (in the current discovery order)
        for e in self.registry.modules() {
            if let Some(rest) = e.caps.query::<RestApiCap>() {
                let ctx = self.ctx_builder.for_module(e.name).await.map_err(|err| {
                    RegistryError::RestRegister {
                        module: e.name,
                        source: err,
                    }
                })?;

                router = rest
                    .register_rest(&ctx, router, registry)
                    .map_err(|source| RegistryError::RestRegister {
                        module: e.name,
                        source,
                    })?;
            }
        }

        // 3) Host finalize: attach /openapi.json and /docs, persist Router if needed (no server start)
        router = host.rest_finalize(&host_ctx, router).map_err(|source| {
            RegistryError::RestFinalize {
                module: host_entry.name,
                source,
            }
        })?;

        Ok(router)
    }

    /// gRPC registration phase: collect services from all grpc modules.
    ///
    /// Services are stored in the installer store for the `grpc-hub` to consume during start.
    async fn run_grpc_phase(&self) -> Result<(), RegistryError> {
        tracing::info!("Phase: grpc (registration)");

        // If no grpc_hub and no grpc_services, skip the phase
        if self.registry.grpc_hub.is_none() && self.registry.grpc_services.is_empty() {
            return Ok(());
        }

        // If there are grpc_services but no hub, that's an error
        if self.registry.grpc_hub.is_none() && !self.registry.grpc_services.is_empty() {
            return Err(RegistryError::GrpcRequiresHub);
        }

        // If there's a hub, collect all services grouped by module and hand them off to the installer store
        if let Some(hub_name) = &self.registry.grpc_hub {
            let mut modules_data = Vec::new();
            let mut seen = HashSet::new();

            // Collect services from all grpc modules
            for (module_name, service_module) in &self.registry.grpc_services {
                let ctx = self
                    .ctx_builder
                    .for_module(module_name)
                    .await
                    .map_err(|err| RegistryError::GrpcRegister {
                        module: module_name.clone(),
                        source: err,
                    })?;

                let installers =
                    service_module
                        .get_grpc_services(&ctx)
                        .await
                        .map_err(|source| RegistryError::GrpcRegister {
                            module: module_name.clone(),
                            source,
                        })?;

                for reg in &installers {
                    if !seen.insert(reg.service_name) {
                        return Err(RegistryError::GrpcRegister {
                            module: module_name.clone(),
                            source: anyhow::anyhow!(
                                "Duplicate gRPC service name: {}",
                                reg.service_name
                            ),
                        });
                    }
                }

                modules_data.push(crate::runtime::ModuleInstallers {
                    module_name: module_name.clone(),
                    installers,
                });
            }

            self.grpc_installers
                .set(crate::runtime::GrpcInstallerData {
                    modules: modules_data,
                })
                .map_err(|source| RegistryError::GrpcRegister {
                    module: hub_name.clone(),
                    source,
                })?;
        }

        Ok(())
    }

    /// START phase: start all stateful modules.
    ///
    /// System modules start first, followed by user modules.
    async fn run_start_phase(&self) -> Result<(), RegistryError> {
        tracing::info!("Phase: start");

        for e in self.registry.modules_by_system_priority() {
            if let Some(s) = e.caps.query::<RunnableCap>() {
                tracing::debug!(
                    module = e.name,
                    is_system = e.caps.has::<SystemCap>(),
                    "Starting stateful module"
                );
                s.start(self.cancel.clone())
                    .await
                    .map_err(|source| RegistryError::Start {
                        module: e.name,
                        source,
                    })?;
                tracing::info!(module = e.name, "Started module");
            }
        }

        Ok(())
    }

    /// Stop a single module, logging errors but continuing execution.
    async fn stop_one_module(entry: &ModuleEntry, cancel: CancellationToken) {
        if let Some(s) = entry.caps.query::<RunnableCap>() {
            match s.stop(cancel).await {
                Ok(()) => {
                    tracing::info!(module = entry.name, "Stopped module");
                }
                Err(err) => {
                    tracing::warn!(module = entry.name, error = %err, "Failed to stop module");
                }
            }
        }
    }

    /// STOP phase: stop all stateful modules in reverse order.
    ///
    /// # Two-Phase Shutdown Contract
    ///
    /// This phase implements a proper two-phase shutdown for **each module**:
    ///
    /// 1. **Graceful stop request**: Each module's `stop(deadline_token)` is called with a
    ///    *fresh* cancellation token (not the already-cancelled root token). Modules should
    ///    interpret this as "please stop gracefully".
    ///
    /// 2. **Hard-stop deadline**: After `shutdown_deadline` expires **for that module**,
    ///    its `deadline_token` is cancelled. Modules should interpret this as "abort immediately".
    ///
    /// Each module gets its own independent deadline — if module A takes 25s to stop,
    /// module B still gets the full `shutdown_deadline` for its graceful shutdown.
    ///
    /// This allows modules to implement real graceful shutdown (see the sketch below):
    /// - Request cooperative shutdown of child tasks
    /// - Wait for them to finish gracefully
    /// - If `deadline_token` fires, switch to hard-abort mode
    ///
    /// Errors are logged but do not fail the shutdown process.
    /// Note: `OoP` modules are stopped automatically by the backend when the
    /// cancellation token is triggered.
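    ///
    /// # Module-side example (sketch)
    ///
    /// The shape of a `stop` implementation that honors this contract, mirroring
    /// the test modules at the bottom of this file (`drain_tasks` and
    /// `abort_tasks` are hypothetical helpers):
    ///
    /// ```ignore
    /// async fn stop(&self, deadline_token: CancellationToken) -> anyhow::Result<()> {
    ///     tokio::select! {
    ///         // Graceful path: wait for in-flight work to drain.
    ///         () = self.drain_tasks() => {}
    ///         // Hard-stop path: the runtime's per-module deadline fired.
    ///         () = deadline_token.cancelled() => self.abort_tasks(),
    ///     }
    ///     Ok(())
    /// }
    /// ```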
    async fn run_stop_phase(&self) -> Result<(), RegistryError> {
        tracing::info!("Phase: stop");

        let deadline = self.shutdown_deadline;

        // Stop all modules in reverse order, each with its own independent deadline
        for e in self.registry.modules().iter().rev() {
            let module_name = e.name;

            // Create a fresh deadline token for THIS module
            // Each module gets the full shutdown_deadline independently
            let deadline_token = CancellationToken::new();
            let deadline_token_for_timeout = deadline_token.clone();

            // Spawn a task to cancel this module's deadline token after shutdown_deadline
            let deadline_task = tokio::spawn(async move {
                tokio::time::sleep(deadline).await;
                tracing::warn!(
                    module = module_name,
                    deadline_secs = deadline.as_secs(),
                    "Module shutdown deadline reached, sending hard-stop signal"
                );
                deadline_token_for_timeout.cancel();
            });

            // Stop this module with its own deadline token
            // The module can observe the token transition from uncancelled→cancelled
            Self::stop_one_module(e, deadline_token).await;

            // Cancel the deadline task and await it to ensure full cleanup
            deadline_task.abort();
            #[allow(clippy::let_underscore_must_use)]
            let _ = deadline_task.await;
        }

        Ok(())
    }

    /// `OoP` SPAWN phase: spawn out-of-process modules after start phase.
    ///
    /// This phase runs after `grpc-hub` is already listening, so we can pass
    /// the real directory endpoint to `OoP` modules.
    async fn run_oop_spawn_phase(&self) -> Result<(), RegistryError> {
        let oop_opts = match &self.oop_options {
            Some(opts) if !opts.modules.is_empty() => opts,
            _ => return Ok(()),
        };

        tracing::info!("Phase: oop_spawn");

        // Wait for grpc_hub to publish its endpoint (it runs async in start phase)
        let directory_endpoint = self.wait_for_grpc_hub_endpoint().await;

        for module_cfg in &oop_opts.modules {
            // Build environment with directory endpoint and rendered config
            // Note: User controls --config via execution.args in master config
            let mut env = module_cfg.env.clone();
            env.insert(
                MODKIT_MODULE_CONFIG_ENV.to_owned(),
                module_cfg.rendered_config_json.clone(),
            );
            if let Some(ref endpoint) = directory_endpoint {
                env.insert(MODKIT_DIRECTORY_ENDPOINT_ENV.to_owned(), endpoint.clone());
            }

            // Use args from execution config as-is (user controls --config via args)
            let args = module_cfg.args.clone();

            let spawn_config = OopSpawnConfig {
                module_name: module_cfg.module_name.clone(),
                binary: module_cfg.binary.clone(),
                args,
                env,
                working_directory: module_cfg.working_directory.clone(),
            };

            oop_opts
                .backend
                .spawn(spawn_config)
                .await
                .map_err(|e| RegistryError::OopSpawn {
                    module: module_cfg.module_name.clone(),
                    source: e,
                })?;

            tracing::info!(
                module = %module_cfg.module_name,
                directory_endpoint = ?directory_endpoint,
                "Spawned OoP module via backend"
            );
        }

        Ok(())
    }

    /// Wait for `grpc-hub` to publish its bound endpoint.
    ///
    /// Polls `GrpcHubModule::bound_endpoint()` at a short interval until the endpoint is
    /// available or a timeout expires.
    /// Returns `None` if no `grpc-hub` is running or if the wait times out.
    async fn wait_for_grpc_hub_endpoint(&self) -> Option<String> {
        const POLL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(10);
        const MAX_WAIT: std::time::Duration = std::time::Duration::from_secs(5);

        // Find grpc_hub in registry
        let grpc_hub = self
            .registry
            .modules()
            .iter()
            .find_map(|e| e.caps.query::<GrpcHubCap>());

        let Some(hub) = grpc_hub else {
            return None; // No grpc_hub registered
        };

        let start = std::time::Instant::now();

        loop {
            if let Some(endpoint) = hub.bound_endpoint() {
                tracing::debug!(
                    endpoint = %endpoint,
                    elapsed_ms = start.elapsed().as_millis(),
                    "gRPC hub endpoint available"
                );
                return Some(endpoint);
            }

            if start.elapsed() > MAX_WAIT {
                tracing::warn!("Timed out waiting for gRPC hub to bind");
                return None;
            }

            tokio::time::sleep(POLL_INTERVAL).await;
        }
    }

    /// Run the full module lifecycle (all phases).
    ///
    /// This is the standard entry point for normal application execution.
    /// It runs all phases from pre-init through shutdown.
    ///
    /// # Errors
    ///
    /// Returns an error if any module phase fails during execution.
    pub async fn run_module_phases(self) -> anyhow::Result<()> {
        self.run_phases_internal(RunMode::Full).await
    }

    /// Run only the migration phases (pre-init + DB migration).
    ///
    /// This is designed for cloud deployment workflows where database migrations
    /// need to run as a separate step before starting the application.
    /// The process exits after migrations complete.
    ///
    /// # Errors
    ///
    /// Returns an error if pre-init or migration phases fail.
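    ///
    /// Choosing between the two entry points (a sketch; `migrate_only` is a
    /// hypothetical CLI flag, not part of this crate):
    ///
    /// ```ignore
    /// if migrate_only {
    ///     runtime.run_migration_phases().await?;
    /// } else {
    ///     runtime.run_module_phases().await?;
    /// }
    /// ```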
    pub async fn run_migration_phases(self) -> anyhow::Result<()> {
        self.run_phases_internal(RunMode::MigrateOnly).await
    }

    /// Internal implementation that runs module phases based on the mode.
    ///
    /// This private method contains the actual phase execution logic and is called
    /// by both `run_module_phases()` and `run_migration_phases()`.
    ///
    /// # Modes
    ///
    /// - `RunMode::Full`: Executes all phases and waits for shutdown signal
    /// - `RunMode::MigrateOnly`: Executes only pre-init and DB migration phases, then exits
    ///
    /// # Phases (Full Mode)
    ///
    /// 1. Pre-init (system modules only)
    /// 2. DB migration (all modules with database capability)
    /// 3. Init (all modules)
    /// 4. Post-init (system modules only)
    /// 5. REST (modules with REST capability)
    /// 6. gRPC (modules with gRPC capability)
    /// 7. Start (runnable modules)
    /// 8. `OoP` spawn (out-of-process modules)
    /// 9. Wait for cancellation
    /// 10. Stop (runnable modules in reverse order)
    async fn run_phases_internal(self, mode: RunMode) -> anyhow::Result<()> {
        // Log execution mode
        match mode {
            RunMode::Full => {
                tracing::info!("Running full lifecycle (all phases)");
            }
            RunMode::MigrateOnly => {
                tracing::info!("Running in migration mode (pre-init + db phases only)");
            }
        }

        // 1. Pre-init phase (before init, only for system modules)
        self.run_pre_init_phase()?;

        // 2. DB migration phase (system modules first)
        #[cfg(feature = "db")]
        {
            self.run_db_phase().await?;
        }
        #[cfg(not(feature = "db"))]
        {
            // No DB integration in this build.
        }

        // Exit early if running in migration-only mode
        if mode == RunMode::MigrateOnly {
            tracing::info!("Migration phases completed successfully");
            return Ok(());
        }

        // 3. Init phase (system modules first)
        self.run_init_phase().await?;

        // 4. Post-init phase (barrier after ALL init; system modules only)
        self.run_post_init_phase().await?;

        // 5. REST phase (synchronous router composition)
        let _router = self.run_rest_phase().await?;

        // 6. gRPC registration phase
        self.run_grpc_phase().await?;

        // 7. Start phase
        self.run_start_phase().await?;

        // 8. OoP spawn phase (after grpc_hub is running)
        self.run_oop_spawn_phase().await?;

        // 9. Wait for cancellation
        self.cancel.cancelled().await;

        // 10. Stop phase with hard timeout.
        //     Blocking syscalls (e.g. libc getaddrinfo in tokio spawn_blocking)
        //     can saturate all tokio worker threads, preventing tokio timers
        //     from firing. Use an OS thread so the watchdog works even when
        //     the tokio runtime is fully blocked.
        let stop_timeout = std::time::Duration::from_secs(15);
        let disarm = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
        let disarm_clone = std::sync::Arc::clone(&disarm);
        std::thread::spawn(move || {
            std::thread::sleep(stop_timeout);
            if !disarm_clone.load(std::sync::atomic::Ordering::Relaxed) {
                tracing::warn!(
                    timeout_secs = stop_timeout.as_secs(),
                    "shutdown: stop phase timed out, force exiting"
                );
                std::process::exit(1);
            }
        });

        self.run_stop_phase().await?;
        disarm.store(true, std::sync::atomic::Ordering::Relaxed);

        Ok(())
    }
}

#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
    use super::*;
    use crate::context::ModuleCtx;
    use crate::contracts::{Module, RunnableCapability, SystemCapability};
    use crate::registry::RegistryBuilder;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use tokio::sync::Mutex;

    #[derive(Default)]
    #[allow(dead_code)]
    struct DummyCore;
    #[async_trait::async_trait]
    impl Module for DummyCore {
        async fn init(&self, _ctx: &ModuleCtx) -> anyhow::Result<()> {
            Ok(())
        }
    }

    struct StopOrderTracker {
        my_order: usize,
        stop_order: Arc<AtomicUsize>,
    }

    impl StopOrderTracker {
        fn new(counter: &Arc<AtomicUsize>, stop_order: Arc<AtomicUsize>) -> Self {
            let my_order = counter.fetch_add(1, Ordering::SeqCst);
            Self {
                my_order,
                stop_order,
            }
        }
    }

    #[async_trait::async_trait]
    impl Module for StopOrderTracker {
        async fn init(&self, _ctx: &ModuleCtx) -> anyhow::Result<()> {
            Ok(())
        }
    }

    #[async_trait::async_trait]
    impl RunnableCapability for StopOrderTracker {
        async fn start(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
            Ok(())
        }
        async fn stop(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
            let order = self.stop_order.fetch_add(1, Ordering::SeqCst);
            tracing::info!(
                my_order = self.my_order,
                stop_order = order,
                "Module stopped"
            );
            Ok(())
        }
    }

    #[tokio::test]
    async fn test_stop_phase_reverse_order() {
        let counter = Arc::new(AtomicUsize::new(0));
        let stop_order = Arc::new(AtomicUsize::new(0));

        let module_a = Arc::new(StopOrderTracker::new(&counter, stop_order.clone()));
        let module_b = Arc::new(StopOrderTracker::new(&counter, stop_order.clone()));
        let module_c = Arc::new(StopOrderTracker::new(&counter, stop_order.clone()));

        let mut builder = RegistryBuilder::default();
        builder.register_core_with_meta("a", &[], module_a.clone() as Arc<dyn Module>);
        builder.register_core_with_meta("b", &["a"], module_b.clone() as Arc<dyn Module>);
        builder.register_core_with_meta("c", &["b"], module_c.clone() as Arc<dyn Module>);

        builder.register_stateful_with_meta("a", module_a.clone() as Arc<dyn RunnableCapability>);
        builder.register_stateful_with_meta("b", module_b.clone() as Arc<dyn RunnableCapability>);
        builder.register_stateful_with_meta("c", module_c.clone() as Arc<dyn RunnableCapability>);

        let registry = builder.build_topo_sorted().unwrap();

        // Verify module order is a -> b -> c
        let module_names: Vec<_> = registry.modules().iter().map(|m| m.name).collect();
        assert_eq!(module_names, vec!["a", "b", "c"]);

        let client_hub = Arc::new(ClientHub::new());
        let cancel = CancellationToken::new();
        let config_provider: Arc<dyn ConfigProvider> = Arc::new(EmptyConfigProvider);

        let runtime = HostRuntime::new(
            registry,
            config_provider,
            DbOptions::None,
            client_hub,
            cancel.clone(),
            Uuid::new_v4(),
            None,
        );

        // Run stop phase
        runtime.run_stop_phase().await.unwrap();

        // Registration order is a=0, b=1, c=2, so reverse-order stop is c, b, a.
        // Each tracker logs its own stop order above; the shared counter only lets
        // us assert here that all three modules were stopped.
        assert_eq!(stop_order.load(Ordering::SeqCst), 3);
    }

    #[tokio::test]
    async fn test_stop_phase_continues_on_error() {
        struct FailingModule {
            should_fail: bool,
            stopped: Arc<AtomicUsize>,
        }

        #[async_trait::async_trait]
        impl Module for FailingModule {
            async fn init(&self, _ctx: &ModuleCtx) -> anyhow::Result<()> {
                Ok(())
            }
        }

        #[async_trait::async_trait]
        impl RunnableCapability for FailingModule {
            async fn start(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
                Ok(())
            }
            async fn stop(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
                self.stopped.fetch_add(1, Ordering::SeqCst);
                if self.should_fail {
                    anyhow::bail!("Intentional failure")
                }
                Ok(())
            }
        }

        let stopped = Arc::new(AtomicUsize::new(0));
        let module_a = Arc::new(FailingModule {
            should_fail: false,
            stopped: stopped.clone(),
        });
        let module_b = Arc::new(FailingModule {
            should_fail: true,
            stopped: stopped.clone(),
        });
        let module_c = Arc::new(FailingModule {
            should_fail: false,
            stopped: stopped.clone(),
        });

        let mut builder = RegistryBuilder::default();
        builder.register_core_with_meta("a", &[], module_a.clone() as Arc<dyn Module>);
        builder.register_core_with_meta("b", &["a"], module_b.clone() as Arc<dyn Module>);
        builder.register_core_with_meta("c", &["b"], module_c.clone() as Arc<dyn Module>);

        builder.register_stateful_with_meta("a", module_a.clone() as Arc<dyn RunnableCapability>);
        builder.register_stateful_with_meta("b", module_b.clone() as Arc<dyn RunnableCapability>);
        builder.register_stateful_with_meta("c", module_c.clone() as Arc<dyn RunnableCapability>);

        let registry = builder.build_topo_sorted().unwrap();

        let client_hub = Arc::new(ClientHub::new());
        let cancel = CancellationToken::new();
        let config_provider: Arc<dyn ConfigProvider> = Arc::new(EmptyConfigProvider);

        let runtime = HostRuntime::new(
            registry,
            config_provider,
            DbOptions::None,
            client_hub,
            cancel.clone(),
            Uuid::new_v4(),
            None,
        );

        // Run stop phase - should not fail even though module_b fails
        runtime.run_stop_phase().await.unwrap();

        // All modules should have attempted to stop
        assert_eq!(stopped.load(Ordering::SeqCst), 3);
    }

    struct EmptyConfigProvider;
    impl ConfigProvider for EmptyConfigProvider {
        fn get_module_config(&self, _module_name: &str) -> Option<&serde_json::Value> {
            None
        }
    }

    #[tokio::test]
    async fn test_post_init_runs_after_all_init_and_system_first() {
        #[derive(Clone)]
        struct TrackHooks {
            name: &'static str,
            events: Arc<Mutex<Vec<String>>>,
        }

        #[async_trait::async_trait]
        impl Module for TrackHooks {
            async fn init(&self, _ctx: &ModuleCtx) -> anyhow::Result<()> {
                self.events.lock().await.push(format!("init:{}", self.name));
                Ok(())
            }
        }

        #[async_trait::async_trait]
        impl SystemCapability for TrackHooks {
            fn pre_init(&self, _sys: &crate::runtime::SystemContext) -> anyhow::Result<()> {
                Ok(())
            }

            async fn post_init(&self, _sys: &crate::runtime::SystemContext) -> anyhow::Result<()> {
                self.events
                    .lock()
                    .await
                    .push(format!("post_init:{}", self.name));
                Ok(())
            }
        }

        let events = Arc::new(Mutex::new(Vec::<String>::new()));
        let sys_a = Arc::new(TrackHooks {
            name: "sys_a",
            events: events.clone(),
        });
        let user_b = Arc::new(TrackHooks {
            name: "user_b",
            events: events.clone(),
        });
        let user_c = Arc::new(TrackHooks {
            name: "user_c",
            events: events.clone(),
        });

        let mut builder = RegistryBuilder::default();
        builder.register_core_with_meta("sys_a", &[], sys_a.clone() as Arc<dyn Module>);
        builder.register_core_with_meta("user_b", &["sys_a"], user_b.clone() as Arc<dyn Module>);
        builder.register_core_with_meta("user_c", &["user_b"], user_c.clone() as Arc<dyn Module>);
        builder.register_system_with_meta("sys_a", sys_a.clone() as Arc<dyn SystemCapability>);

        let registry = builder.build_topo_sorted().unwrap();

        let client_hub = Arc::new(ClientHub::new());
        let cancel = CancellationToken::new();
        let config_provider: Arc<dyn ConfigProvider> = Arc::new(EmptyConfigProvider);

        let runtime = HostRuntime::new(
            registry,
            config_provider,
            DbOptions::None,
            client_hub,
            cancel,
            Uuid::new_v4(),
            None,
        );

        // Run init phase for all modules, then post_init as a separate barrier phase.
        runtime.run_init_phase().await.unwrap();
        runtime.run_post_init_phase().await.unwrap();

        let events = events.lock().await.clone();
        let first_post_init = events
            .iter()
            .position(|e| e.starts_with("post_init:"))
            .expect("expected post_init events");
        assert!(
            events[..first_post_init]
                .iter()
                .all(|e| e.starts_with("init:")),
            "expected all init events before post_init, got: {events:?}"
        );

        // system-first order within each phase
        assert_eq!(
            events,
            vec![
                "init:sys_a",
                "init:user_b",
                "init:user_c",
                "post_init:sys_a",
            ]
        );
    }

    #[tokio::test]
    async fn test_stop_phase_provides_fresh_deadline_token() {
        use std::sync::atomic::AtomicBool;

        struct TokenCheckModule {
            stop_was_called: AtomicBool,
            token_was_cancelled_on_entry: AtomicBool,
        }

        #[async_trait::async_trait]
        impl Module for TokenCheckModule {
            async fn init(&self, _ctx: &ModuleCtx) -> anyhow::Result<()> {
                Ok(())
            }
        }

        #[async_trait::async_trait]
        impl RunnableCapability for TokenCheckModule {
            async fn start(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
                Ok(())
            }
            async fn stop(&self, deadline_token: CancellationToken) -> anyhow::Result<()> {
                // Record that stop() was called
                self.stop_was_called.store(true, Ordering::SeqCst);
                // Record whether the token was already cancelled when stop() was called
                self.token_was_cancelled_on_entry
                    .store(deadline_token.is_cancelled(), Ordering::SeqCst);
                Ok(())
            }
        }

        let module = Arc::new(TokenCheckModule {
            stop_was_called: AtomicBool::new(false),
            // Default to true to detect if stop() was never called
            token_was_cancelled_on_entry: AtomicBool::new(true),
        });

        let mut builder = RegistryBuilder::default();
        builder.register_core_with_meta("test", &[], module.clone() as Arc<dyn Module>);
        builder.register_stateful_with_meta("test", module.clone() as Arc<dyn RunnableCapability>);

        let registry = builder.build_topo_sorted().unwrap();
        let client_hub = Arc::new(ClientHub::new());
        let cancel = CancellationToken::new();
        let config_provider: Arc<dyn ConfigProvider> = Arc::new(EmptyConfigProvider);

        let runtime = HostRuntime::new(
            registry,
            config_provider,
            DbOptions::None,
            client_hub,
            cancel.clone(),
            Uuid::new_v4(),
            None,
        );

        // Run stop phase - the deadline token should NOT be cancelled
        runtime.run_stop_phase().await.unwrap();

        // First, verify stop() was actually called (guards against silent registration failures)
        assert!(
            module.stop_was_called.load(Ordering::SeqCst),
            "stop() was never called - module may not have been registered correctly"
        );

        // The token should NOT have been cancelled when stop() was called
        // This is the key fix: modules get a fresh token, not the already-cancelled root token
        assert!(
            !module.token_was_cancelled_on_entry.load(Ordering::SeqCst),
            "deadline_token should NOT be cancelled when stop() is called - this enables graceful shutdown"
        );
    }

    #[tokio::test]
    async fn test_stop_phase_graceful_shutdown_completes_before_deadline() {
        use std::sync::atomic::AtomicBool;
        use std::time::Duration;

        struct GracefulModule {
            graceful_completed: AtomicBool,
            deadline_fired: AtomicBool,
        }

        #[async_trait::async_trait]
        impl Module for GracefulModule {
            async fn init(&self, _ctx: &ModuleCtx) -> anyhow::Result<()> {
                Ok(())
            }
        }

        #[async_trait::async_trait]
        impl RunnableCapability for GracefulModule {
            async fn start(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
                Ok(())
            }
            async fn stop(&self, deadline_token: CancellationToken) -> anyhow::Result<()> {
                // Simulate graceful shutdown that completes quickly (10ms)
                tokio::select! {
                    () = tokio::time::sleep(Duration::from_millis(10)) => {
                        self.graceful_completed.store(true, Ordering::SeqCst);
                    }
                    () = deadline_token.cancelled() => {
                        self.deadline_fired.store(true, Ordering::SeqCst);
                    }
                }
                Ok(())
            }
        }

        let module = Arc::new(GracefulModule {
            graceful_completed: AtomicBool::new(false),
            deadline_fired: AtomicBool::new(false),
        });

        let mut builder = RegistryBuilder::default();
        builder.register_core_with_meta("test", &[], module.clone() as Arc<dyn Module>);
        builder.register_stateful_with_meta("test", module.clone() as Arc<dyn RunnableCapability>);

        let registry = builder.build_topo_sorted().unwrap();
        let client_hub = Arc::new(ClientHub::new());
        let cancel = CancellationToken::new();
        let config_provider: Arc<dyn ConfigProvider> = Arc::new(EmptyConfigProvider);

        // Use a long deadline (5s) - module should complete gracefully before this
        let runtime = HostRuntime::new(
            registry,
            config_provider,
            DbOptions::None,
            client_hub,
            cancel.clone(),
            Uuid::new_v4(),
            None,
        )
        .with_shutdown_deadline(Duration::from_secs(5));

        runtime.run_stop_phase().await.unwrap();

        // Graceful shutdown should have completed
        assert!(
            module.graceful_completed.load(Ordering::SeqCst),
            "graceful shutdown should complete"
        );
        // Deadline should NOT have fired (module finished before deadline)
        assert!(
            !module.deadline_fired.load(Ordering::SeqCst),
            "deadline should not fire when graceful shutdown completes quickly"
        );
    }

    #[tokio::test]
    async fn test_stop_phase_deadline_fires_for_slow_module() {
        use std::sync::atomic::AtomicBool;
        use std::time::Duration;

        struct SlowModule {
            graceful_completed: AtomicBool,
            deadline_fired: AtomicBool,
        }

        #[async_trait::async_trait]
        impl Module for SlowModule {
            async fn init(&self, _ctx: &ModuleCtx) -> anyhow::Result<()> {
                Ok(())
            }
        }

        #[async_trait::async_trait]
        impl RunnableCapability for SlowModule {
            async fn start(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
                Ok(())
            }
            async fn stop(&self, deadline_token: CancellationToken) -> anyhow::Result<()> {
                // Simulate slow graceful shutdown (would take 10s, but deadline is 100ms)
                tokio::select! {
                    () = tokio::time::sleep(Duration::from_secs(10)) => {
                        self.graceful_completed.store(true, Ordering::SeqCst);
                    }
                    () = deadline_token.cancelled() => {
                        self.deadline_fired.store(true, Ordering::SeqCst);
                    }
                }
                Ok(())
            }
        }

        let module = Arc::new(SlowModule {
            graceful_completed: AtomicBool::new(false),
            deadline_fired: AtomicBool::new(false),
        });

        let mut builder = RegistryBuilder::default();
        builder.register_core_with_meta("test", &[], module.clone() as Arc<dyn Module>);
        builder.register_stateful_with_meta("test", module.clone() as Arc<dyn RunnableCapability>);

        let registry = builder.build_topo_sorted().unwrap();
        let client_hub = Arc::new(ClientHub::new());
        let cancel = CancellationToken::new();
        let config_provider: Arc<dyn ConfigProvider> = Arc::new(EmptyConfigProvider);

        // Use a short deadline (100ms) - module should be interrupted by deadline
        let runtime = HostRuntime::new(
            registry,
            config_provider,
            DbOptions::None,
            client_hub,
            cancel.clone(),
            Uuid::new_v4(),
            None,
        )
        .with_shutdown_deadline(Duration::from_millis(100));

        runtime.run_stop_phase().await.unwrap();

        // Graceful shutdown should NOT have completed (deadline fired first)
        assert!(
            !module.graceful_completed.load(Ordering::SeqCst),
            "graceful shutdown should not complete when deadline fires first"
        );
        // Deadline should have fired
        assert!(
            module.deadline_fired.load(Ordering::SeqCst),
            "deadline should fire for slow modules"
        );
    }
}