// modkit/runtime/host_runtime.rs
1//! Host Runtime - orchestrates the full `ModKit` lifecycle
2//!
3//! This module contains the `HostRuntime` type that owns and coordinates
4//! the execution of all lifecycle phases.
5//!
6//! High-level phase order:
7//! - `pre_init` (system modules only)
8//! - DB migrations (modules with DB capability)
9//! - `init` (all modules)
10//! - `post_init` (system modules only; runs after *all* `init` complete)
11//! - REST wiring (modules with REST capability; requires a single REST host)
12//! - gRPC registration (modules with gRPC capability; requires a single gRPC hub)
13//! - start/stop (stateful modules)
14//! - `OoP` spawn / wait / stop (host-only orchestration)
15
16use axum::Router;
17use std::collections::HashSet;
18use std::sync::Arc;
19
20use tokio_util::sync::CancellationToken;
21use uuid::Uuid;
22
23use crate::backends::OopSpawnConfig;
24use crate::client_hub::ClientHub;
25use crate::config::ConfigProvider;
26use crate::context::ModuleContextBuilder;
27use crate::registry::{
28    ApiGatewayCap, GrpcHubCap, ModuleEntry, ModuleRegistry, RegistryError, RestApiCap, RunnableCap,
29    SystemCap,
30};
31use crate::runtime::{GrpcInstallerStore, ModuleManager, OopSpawnOptions, SystemContext};
32
33#[cfg(feature = "db")]
34use crate::registry::DatabaseCap;
35
/// How the runtime should provide DBs to modules.
///
/// Note: the `Manager` variant only exists when the crate is built with the
/// `db` feature; without it, `None` is the only option.
#[derive(Clone)]
pub enum DbOptions {
    /// No database integration. `ModuleCtx::db()` will be `None`, `db_required()` will error.
    None,
    /// Use a `DbManager` to handle database connections with Figment-based configuration.
    #[cfg(feature = "db")]
    Manager(Arc<modkit_db::DbManager>),
}
45
/// Runtime execution mode that determines which phases to run.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RunMode {
    /// Run all phases and wait for shutdown signal (normal application mode).
    Full,
    /// Run only pre-init and DB migration phases, then exit (for cloud
    /// deployments where migrations run as a separate step before app start).
    MigrateOnly,
}
54
/// Environment variable name for passing directory endpoint to `OoP` modules.
///
/// Injected into spawned `OoP` processes only when the gRPC hub endpoint is
/// known (see `run_oop_spawn_phase`).
pub const MODKIT_DIRECTORY_ENDPOINT_ENV: &str = "MODKIT_DIRECTORY_ENDPOINT";

/// Environment variable name for passing rendered module config to `OoP` modules.
///
/// Always injected into spawned `OoP` processes; carries the module's
/// rendered JSON configuration (`rendered_config_json`).
pub const MODKIT_MODULE_CONFIG_ENV: &str = "MODKIT_MODULE_CONFIG";
60
/// `HostRuntime` owns the lifecycle orchestration for `ModKit`.
///
/// It encapsulates all runtime state and drives modules through the full lifecycle (see module docs).
pub struct HostRuntime {
    /// Registry of modules and their capabilities (iterated by every phase).
    registry: ModuleRegistry,
    /// Builds per-module `ModuleCtx` values (config, client hub, cancel token, DB).
    ctx_builder: ModuleContextBuilder,
    /// Unique id of this runtime instance; handed to `SystemContext`s.
    instance_id: Uuid,
    /// Runtime-owned manager wired into system modules during pre/post-init.
    module_manager: Arc<ModuleManager>,
    /// Staging store for gRPC service installers consumed by the grpc-hub.
    grpc_installers: Arc<GrpcInstallerStore>,
    /// Shared client hub; currently only threaded into the context builder.
    #[allow(dead_code)]
    client_hub: Arc<ClientHub>,
    /// Cooperative shutdown signal observed across phases.
    cancel: CancellationToken,
    /// DB integration mode; read by the migration phase when `feature = "db"`.
    #[allow(dead_code)]
    db_options: DbOptions,
    /// `OoP` module spawn configuration and backend
    oop_options: Option<OopSpawnOptions>,
}
78
79impl HostRuntime {
    /// Create a new `HostRuntime` instance.
    ///
    /// This prepares all runtime components but does not start any lifecycle phases.
    ///
    /// # Parameters
    /// - `registry`: registered modules and their capabilities.
    /// - `modules_cfg`: provider of per-module configuration.
    /// - `db_options`: database integration mode (see [`DbOptions`]).
    /// - `client_hub`: shared hub passed into module contexts.
    /// - `cancel`: cooperative shutdown token observed by all phases.
    /// - `instance_id`: unique id of this runtime instance.
    /// - `oop_options`: optional out-of-process spawn configuration.
    pub fn new(
        registry: ModuleRegistry,
        modules_cfg: Arc<dyn ConfigProvider>,
        db_options: DbOptions,
        client_hub: Arc<ClientHub>,
        cancel: CancellationToken,
        instance_id: Uuid,
        oop_options: Option<OopSpawnOptions>,
    ) -> Self {
        // Create runtime-owned components for system modules
        let module_manager = Arc::new(ModuleManager::new());
        let grpc_installers = Arc::new(GrpcInstallerStore::new());

        // Build the context builder that will resolve per-module DbHandles.
        // Without the `db` feature no manager exists, so contexts get `None`.
        let db_manager = match &db_options {
            #[cfg(feature = "db")]
            DbOptions::Manager(mgr) => Some(mgr.clone()),
            DbOptions::None => None,
        };

        let ctx_builder = ModuleContextBuilder::new(
            instance_id,
            modules_cfg,
            client_hub.clone(),
            cancel.clone(),
            db_manager,
        );

        Self {
            registry,
            ctx_builder,
            instance_id,
            module_manager,
            grpc_installers,
            client_hub,
            cancel,
            db_options,
            oop_options,
        }
    }
123
124    /// `PRE_INIT` phase: wire runtime internals into system modules.
125    ///
126    /// This phase runs before init and only for modules with the "system" capability.
127    ///
128    /// # Errors
129    /// Returns `RegistryError` if system wiring fails.
130    pub fn run_pre_init_phase(&self) -> Result<(), RegistryError> {
131        tracing::info!("Phase: pre_init");
132
133        let sys_ctx = SystemContext::new(
134            self.instance_id,
135            Arc::clone(&self.module_manager),
136            Arc::clone(&self.grpc_installers),
137        );
138
139        for entry in self.registry.modules() {
140            // Check for cancellation before processing each module
141            if self.cancel.is_cancelled() {
142                tracing::warn!("Pre-init phase cancelled by signal");
143                return Err(RegistryError::Cancelled);
144            }
145
146            if let Some(sys_mod) = entry.caps.query::<SystemCap>() {
147                tracing::debug!(module = entry.name, "Running system pre_init");
148                sys_mod
149                    .pre_init(&sys_ctx)
150                    .map_err(|e| RegistryError::PreInit {
151                        module: entry.name,
152                        source: e,
153                    })?;
154            }
155        }
156
157        Ok(())
158    }
159
160    /// Helper: resolve context for a module with error mapping.
161    async fn module_context(
162        &self,
163        module_name: &'static str,
164    ) -> Result<crate::context::ModuleCtx, RegistryError> {
165        self.ctx_builder
166            .for_module(module_name)
167            .await
168            .map_err(|e| RegistryError::DbMigrate {
169                module: module_name,
170                source: e,
171            })
172    }
173
    /// Helper: extract DB handle and module if both exist.
    ///
    /// Returns `Ok(None)` when the module lacks the database capability or the
    /// `DbManager` yields no DB for this module; otherwise returns the `Db`
    /// paired with the capability object for migration.
    #[cfg(feature = "db")]
    async fn db_migration_target(
        &self,
        module_name: &'static str,
        ctx: &crate::context::ModuleCtx,
        db_module: Option<Arc<dyn crate::contracts::DatabaseCapability>>,
    ) -> Result<Option<(modkit_db::Db, Arc<dyn crate::contracts::DatabaseCapability>)>, RegistryError>
    {
        // Module has no database capability: nothing to migrate.
        let Some(dbm) = db_module else {
            return Ok(None);
        };

        // Important: DB migrations require access to the underlying `Db`, not just `DBProvider`.
        // `ModuleCtx` intentionally exposes only `DBProvider` for better DX and to reduce mistakes.
        // So the runtime resolves the `Db` directly from its `DbManager`.
        let db = match &self.db_options {
            DbOptions::None => None,
            #[cfg(feature = "db")]
            DbOptions::Manager(mgr) => {
                // Any manager failure surfaces as a migration error for this module.
                mgr.get(module_name)
                    .await
                    .map_err(|e| RegistryError::DbMigrate {
                        module: module_name,
                        source: e.into(),
                    })?
            }
        };

        _ = ctx; // ctx is kept for parity/error context; DB is resolved from manager above.
        Ok(db.map(|db| (db, dbm)))
    }
206
207    /// Helper: run migrations for a single module using the new migration runner.
208    ///
209    /// This collects migrations from the module and executes them via the
210    /// runtime's privileged connection. Modules never see the raw connection.
211    #[cfg(feature = "db")]
212    async fn migrate_module(
213        module_name: &'static str,
214        db: &modkit_db::Db,
215        db_module: Arc<dyn crate::contracts::DatabaseCapability>,
216    ) -> Result<(), RegistryError> {
217        // Collect migrations from the module
218        let migrations = db_module.migrations();
219
220        if migrations.is_empty() {
221            tracing::debug!(module = module_name, "No migrations to run");
222            return Ok(());
223        }
224
225        tracing::debug!(
226            module = module_name,
227            count = migrations.len(),
228            "Running DB migrations"
229        );
230
231        // Execute migrations using the migration runner
232        let result =
233            modkit_db::migration_runner::run_migrations_for_module(db, module_name, migrations)
234                .await
235                .map_err(|e| RegistryError::DbMigrate {
236                    module: module_name,
237                    source: anyhow::Error::new(e),
238                })?;
239
240        tracing::info!(
241            module = module_name,
242            applied = result.applied,
243            skipped = result.skipped,
244            "DB migrations completed"
245        );
246
247        Ok(())
248    }
249
250    /// DB MIGRATION phase: run migrations for all modules with DB capability.
251    ///
252    /// Runs before init, with system modules processed first.
253    ///
254    /// Modules provide migrations via `DatabaseCapability::migrations()`.
255    /// The runtime executes them with a privileged connection that modules
256    /// never receive directly. Each module gets a separate migration history
257    /// table, preventing cross-module interference.
258    #[cfg(feature = "db")]
259    async fn run_db_phase(&self) -> Result<(), RegistryError> {
260        tracing::info!("Phase: db (before init)");
261
262        for entry in self.registry.modules_by_system_priority() {
263            // Check for cancellation before processing each module
264            if self.cancel.is_cancelled() {
265                tracing::warn!("DB migration phase cancelled by signal");
266                return Err(RegistryError::Cancelled);
267            }
268
269            let ctx = self.module_context(entry.name).await?;
270            let db_module = entry.caps.query::<DatabaseCap>();
271
272            match self
273                .db_migration_target(entry.name, &ctx, db_module.clone())
274                .await?
275            {
276                Some((db, dbm)) => {
277                    Self::migrate_module(entry.name, &db, dbm).await?;
278                }
279                None if db_module.is_some() => {
280                    tracing::debug!(
281                        module = entry.name,
282                        "Module has DbModule trait but no DB handle (no config)"
283                    );
284                }
285                None => {}
286            }
287        }
288
289        Ok(())
290    }
291
292    /// INIT phase: initialize all modules in topological order.
293    ///
294    /// System modules initialize first, followed by user modules.
295    async fn run_init_phase(&self) -> Result<(), RegistryError> {
296        tracing::info!("Phase: init");
297
298        for entry in self.registry.modules_by_system_priority() {
299            let ctx =
300                self.ctx_builder
301                    .for_module(entry.name)
302                    .await
303                    .map_err(|e| RegistryError::Init {
304                        module: entry.name,
305                        source: e,
306                    })?;
307            entry
308                .core
309                .init(&ctx)
310                .await
311                .map_err(|e| RegistryError::Init {
312                    module: entry.name,
313                    source: e,
314                })?;
315        }
316
317        Ok(())
318    }
319
320    /// `POST_INIT` phase: optional hook after ALL modules completed `init()`.
321    ///
322    /// This provides a global barrier between initialization-time registration
323    /// and subsequent phases that may rely on a fully-populated runtime registry.
324    ///
325    /// System modules run first, followed by user modules, preserving topo order.
326    async fn run_post_init_phase(&self) -> Result<(), RegistryError> {
327        tracing::info!("Phase: post_init");
328
329        let sys_ctx = SystemContext::new(
330            self.instance_id,
331            Arc::clone(&self.module_manager),
332            Arc::clone(&self.grpc_installers),
333        );
334
335        for entry in self.registry.modules_by_system_priority() {
336            if let Some(sys_mod) = entry.caps.query::<SystemCap>() {
337                sys_mod
338                    .post_init(&sys_ctx)
339                    .await
340                    .map_err(|e| RegistryError::PostInit {
341                        module: entry.name,
342                        source: e,
343                    })?;
344            }
345        }
346
347        Ok(())
348    }
349
350    /// REST phase: compose the router against the REST host.
351    ///
352    /// This is a synchronous phase that builds the final Router by:
353    /// 1. Preparing the host module
354    /// 2. Registering all REST providers
355    /// 3. Finalizing with `OpenAPI` endpoints
356    async fn run_rest_phase(&self) -> Result<Router, RegistryError> {
357        tracing::info!("Phase: rest (sync)");
358
359        let mut router = Router::new();
360
361        // Find host(s) and whether any rest modules exist
362        let host_count = self
363            .registry
364            .modules()
365            .iter()
366            .filter(|e| e.caps.has::<ApiGatewayCap>())
367            .count();
368
369        match host_count {
370            0 => {
371                return if self
372                    .registry
373                    .modules()
374                    .iter()
375                    .any(|e| e.caps.has::<RestApiCap>())
376                {
377                    Err(RegistryError::RestRequiresHost)
378                } else {
379                    Ok(router)
380                };
381            }
382            1 => { /* proceed */ }
383            _ => return Err(RegistryError::MultipleRestHosts),
384        }
385
386        // Resolve the single host entry and its module context
387        let host_idx = self
388            .registry
389            .modules()
390            .iter()
391            .position(|e| e.caps.has::<ApiGatewayCap>())
392            .ok_or(RegistryError::RestHostNotFoundAfterValidation)?;
393        let host_entry = &self.registry.modules()[host_idx];
394        let Some(host) = host_entry.caps.query::<ApiGatewayCap>() else {
395            return Err(RegistryError::RestHostMissingFromEntry);
396        };
397        let host_ctx = self
398            .ctx_builder
399            .for_module(host_entry.name)
400            .await
401            .map_err(|e| RegistryError::RestPrepare {
402                module: host_entry.name,
403                source: e,
404            })?;
405
406        // use host as the registry
407        let registry: &dyn crate::contracts::OpenApiRegistry = host.as_registry();
408
409        // 1) Host prepare: base Router / global middlewares / basic OAS meta
410        router =
411            host.rest_prepare(&host_ctx, router)
412                .map_err(|source| RegistryError::RestPrepare {
413                    module: host_entry.name,
414                    source,
415                })?;
416
417        // 2) Register all REST providers (in the current discovery order)
418        for e in self.registry.modules() {
419            if let Some(rest) = e.caps.query::<RestApiCap>() {
420                let ctx = self.ctx_builder.for_module(e.name).await.map_err(|err| {
421                    RegistryError::RestRegister {
422                        module: e.name,
423                        source: err,
424                    }
425                })?;
426                router = rest
427                    .register_rest(&ctx, router, registry)
428                    .map_err(|source| RegistryError::RestRegister {
429                        module: e.name,
430                        source,
431                    })?;
432            }
433        }
434
435        // 3) Host finalize: attach /openapi.json and /docs, persist Router if needed (no server start)
436        router = host.rest_finalize(&host_ctx, router).map_err(|source| {
437            RegistryError::RestFinalize {
438                module: host_entry.name,
439                source,
440            }
441        })?;
442
443        Ok(router)
444    }
445
446    /// gRPC registration phase: collect services from all grpc modules.
447    ///
448    /// Services are stored in the installer store for the `grpc-hub` to consume during start.
449    async fn run_grpc_phase(&self) -> Result<(), RegistryError> {
450        tracing::info!("Phase: grpc (registration)");
451
452        // If no grpc_hub and no grpc_services, skip the phase
453        if self.registry.grpc_hub.is_none() && self.registry.grpc_services.is_empty() {
454            return Ok(());
455        }
456
457        // If there are grpc_services but no hub, that's an error
458        if self.registry.grpc_hub.is_none() && !self.registry.grpc_services.is_empty() {
459            return Err(RegistryError::GrpcRequiresHub);
460        }
461
462        // If there's a hub, collect all services grouped by module and hand them off to the installer store
463        if let Some(hub_name) = &self.registry.grpc_hub {
464            let mut modules_data = Vec::new();
465            let mut seen = HashSet::new();
466
467            // Collect services from all grpc modules
468            for (module_name, service_module) in &self.registry.grpc_services {
469                let ctx = self
470                    .ctx_builder
471                    .for_module(module_name)
472                    .await
473                    .map_err(|err| RegistryError::GrpcRegister {
474                        module: module_name.clone(),
475                        source: err,
476                    })?;
477
478                let installers =
479                    service_module
480                        .get_grpc_services(&ctx)
481                        .await
482                        .map_err(|source| RegistryError::GrpcRegister {
483                            module: module_name.clone(),
484                            source,
485                        })?;
486
487                for reg in &installers {
488                    if !seen.insert(reg.service_name) {
489                        return Err(RegistryError::GrpcRegister {
490                            module: module_name.clone(),
491                            source: anyhow::anyhow!(
492                                "Duplicate gRPC service name: {}",
493                                reg.service_name
494                            ),
495                        });
496                    }
497                }
498
499                modules_data.push(crate::runtime::ModuleInstallers {
500                    module_name: module_name.clone(),
501                    installers,
502                });
503            }
504
505            self.grpc_installers
506                .set(crate::runtime::GrpcInstallerData {
507                    modules: modules_data,
508                })
509                .map_err(|source| RegistryError::GrpcRegister {
510                    module: hub_name.clone(),
511                    source,
512                })?;
513        }
514
515        Ok(())
516    }
517
518    /// START phase: start all stateful modules.
519    ///
520    /// System modules start first, followed by user modules.
521    async fn run_start_phase(&self) -> Result<(), RegistryError> {
522        tracing::info!("Phase: start");
523
524        for e in self.registry.modules_by_system_priority() {
525            if let Some(s) = e.caps.query::<RunnableCap>() {
526                tracing::debug!(
527                    module = e.name,
528                    is_system = e.caps.has::<SystemCap>(),
529                    "Starting stateful module"
530                );
531                s.start(self.cancel.clone())
532                    .await
533                    .map_err(|source| RegistryError::Start {
534                        module: e.name,
535                        source,
536                    })?;
537                tracing::info!(module = e.name, "Started module");
538            }
539        }
540
541        Ok(())
542    }
543
544    /// Stop a single module, logging errors but continuing execution.
545    async fn stop_one_module(entry: &ModuleEntry, cancel: CancellationToken) {
546        if let Some(s) = entry.caps.query::<RunnableCap>() {
547            match s.stop(cancel).await {
548                Err(err) => {
549                    tracing::warn!(module = entry.name, error = %err, "Failed to stop module");
550                }
551                _ => {
552                    tracing::info!(module = entry.name, "Stopped module");
553                }
554            }
555        }
556    }
557
558    /// STOP phase: stop all stateful modules in reverse order.
559    ///
560    /// Errors are logged but do not fail the shutdown process.
561    /// Note: `OoP` modules are stopped automatically by the backend when the
562    /// cancellation token is triggered.
563    async fn run_stop_phase(&self) -> Result<(), RegistryError> {
564        tracing::info!("Phase: stop");
565
566        for e in self.registry.modules().iter().rev() {
567            Self::stop_one_module(e, self.cancel.clone()).await;
568        }
569
570        Ok(())
571    }
572
573    /// `OoP` SPAWN phase: spawn out-of-process modules after start phase.
574    ///
575    /// This phase runs after `grpc-hub` is already listening, so we can pass
576    /// the real directory endpoint to `OoP` modules.
577    async fn run_oop_spawn_phase(&self) -> Result<(), RegistryError> {
578        let oop_opts = match &self.oop_options {
579            Some(opts) if !opts.modules.is_empty() => opts,
580            _ => return Ok(()),
581        };
582
583        tracing::info!("Phase: oop_spawn");
584
585        // Wait for grpc_hub to publish its endpoint (it runs async in start phase)
586        let directory_endpoint = self.wait_for_grpc_hub_endpoint().await;
587
588        for module_cfg in &oop_opts.modules {
589            // Build environment with directory endpoint and rendered config
590            // Note: User controls --config via execution.args in master config
591            let mut env = module_cfg.env.clone();
592            env.insert(
593                MODKIT_MODULE_CONFIG_ENV.to_owned(),
594                module_cfg.rendered_config_json.clone(),
595            );
596            if let Some(ref endpoint) = directory_endpoint {
597                env.insert(MODKIT_DIRECTORY_ENDPOINT_ENV.to_owned(), endpoint.clone());
598            }
599
600            // Use args from execution config as-is (user controls --config via args)
601            let args = module_cfg.args.clone();
602
603            let spawn_config = OopSpawnConfig {
604                module_name: module_cfg.module_name.clone(),
605                binary: module_cfg.binary.clone(),
606                args,
607                env,
608                working_directory: module_cfg.working_directory.clone(),
609            };
610
611            oop_opts
612                .backend
613                .spawn(spawn_config)
614                .await
615                .map_err(|e| RegistryError::OopSpawn {
616                    module: module_cfg.module_name.clone(),
617                    source: e,
618                })?;
619
620            tracing::info!(
621                module = %module_cfg.module_name,
622                directory_endpoint = ?directory_endpoint,
623                "Spawned OoP module via backend"
624            );
625        }
626
627        Ok(())
628    }
629
630    /// Wait for `grpc-hub` to publish its bound endpoint.
631    ///
632    /// Polls the `GrpcHubModule::bound_endpoint()` with a short interval until available or timeout.
633    /// Returns None if no `grpc-hub` is running or if it times out.
634    async fn wait_for_grpc_hub_endpoint(&self) -> Option<String> {
635        const POLL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(10);
636        const MAX_WAIT: std::time::Duration = std::time::Duration::from_secs(5);
637
638        // Find grpc_hub in registry
639        let grpc_hub = self
640            .registry
641            .modules()
642            .iter()
643            .find_map(|e| e.caps.query::<GrpcHubCap>());
644
645        let Some(hub) = grpc_hub else {
646            return None; // No grpc_hub registered
647        };
648
649        let start = std::time::Instant::now();
650
651        loop {
652            if let Some(endpoint) = hub.bound_endpoint() {
653                tracing::debug!(
654                    endpoint = %endpoint,
655                    elapsed_ms = start.elapsed().as_millis(),
656                    "gRPC hub endpoint available"
657                );
658                return Some(endpoint);
659            }
660
661            if start.elapsed() > MAX_WAIT {
662                tracing::warn!("Timed out waiting for gRPC hub to bind");
663                return None;
664            }
665
666            tokio::time::sleep(POLL_INTERVAL).await;
667        }
668    }
669
    /// Run the full module lifecycle (all phases).
    ///
    /// This is the standard entry point for normal application execution.
    /// It runs all phases from pre-init through shutdown; equivalent to
    /// `run_phases_internal(RunMode::Full)`, which blocks until the
    /// cancellation token fires and then runs the stop phase.
    ///
    /// # Errors
    ///
    /// Returns an error if any module phase fails during execution.
    pub async fn run_module_phases(self) -> anyhow::Result<()> {
        self.run_phases_internal(RunMode::Full).await
    }
681
    /// Run only the migration phases (pre-init + DB migration).
    ///
    /// This is designed for cloud deployment workflows where database migrations
    /// need to run as a separate step before starting the application.
    /// The process exits after migrations complete; equivalent to
    /// `run_phases_internal(RunMode::MigrateOnly)`.
    ///
    /// # Errors
    ///
    /// Returns an error if pre-init or migration phases fail.
    pub async fn run_migration_phases(self) -> anyhow::Result<()> {
        self.run_phases_internal(RunMode::MigrateOnly).await
    }
694
    /// Internal implementation that runs module phases based on the mode.
    ///
    /// This private method contains the actual phase execution logic and is called
    /// by both `run_module_phases()` and `run_migration_phases()`. It consumes
    /// `self`: the runtime cannot be reused after a run.
    ///
    /// # Modes
    ///
    /// - `RunMode::Full`: Executes all phases and waits for shutdown signal
    /// - `RunMode::MigrateOnly`: Executes only pre-init and DB migration phases, then exits
    ///
    /// # Phases (Full Mode)
    ///
    /// 1. Pre-init (system modules only)
    /// 2. DB migration (all modules with database capability)
    /// 3. Init (all modules)
    /// 4. Post-init (system modules only)
    /// 5. REST (modules with REST capability)
    /// 6. gRPC (modules with gRPC capability)
    /// 7. Start (runnable modules)
    /// 8. `OoP` spawn (out-of-process modules)
    /// 9. Wait for cancellation
    /// 10. Stop (runnable modules in reverse order)
    async fn run_phases_internal(self, mode: RunMode) -> anyhow::Result<()> {
        // Log execution mode
        match mode {
            RunMode::Full => {
                tracing::info!("Running full lifecycle (all phases)");
            }
            RunMode::MigrateOnly => {
                tracing::info!("Running in migration mode (pre-init + db phases only)");
            }
        }

        // 1. Pre-init phase (before init, only for system modules)
        self.run_pre_init_phase()?;

        // 2. DB migration phase (system modules first)
        #[cfg(feature = "db")]
        {
            self.run_db_phase().await?;
        }
        #[cfg(not(feature = "db"))]
        {
            // No DB integration in this build.
        }

        // Exit early if running in migration-only mode
        if mode == RunMode::MigrateOnly {
            tracing::info!("Migration phases completed successfully");
            return Ok(());
        }

        // 3. Init phase (system modules first)
        self.run_init_phase().await?;

        // 4. Post-init phase (barrier after ALL init; system modules only)
        self.run_post_init_phase().await?;

        // 5. REST phase (synchronous router composition)
        // NOTE(review): the composed router is dropped here; presumably the
        // REST host retains what it needs during `rest_finalize` — confirm.
        let _router = self.run_rest_phase().await?;

        // 6. gRPC registration phase
        self.run_grpc_phase().await?;

        // 7. Start phase
        self.run_start_phase().await?;

        // 8. OoP spawn phase (after grpc_hub is running)
        self.run_oop_spawn_phase().await?;

        // 9. Wait for cancellation (cooperative shutdown signal)
        self.cancel.cancelled().await;

        // 10. Stop phase
        self.run_stop_phase().await?;

        Ok(())
    }
773}
774
775#[cfg(test)]
776#[cfg_attr(coverage_nightly, coverage(off))]
777mod tests {
778    use super::*;
779    use crate::context::ModuleCtx;
780    use crate::contracts::{Module, RunnableCapability, SystemCapability};
781    use crate::registry::RegistryBuilder;
782    use std::sync::Arc;
783    use std::sync::atomic::{AtomicUsize, Ordering};
784    use tokio::sync::Mutex;
785
    // Minimal no-op `Module` implementation for tests that only need a core.
    #[derive(Default)]
    #[allow(dead_code)]
    struct DummyCore;
    #[async_trait::async_trait]
    impl Module for DummyCore {
        async fn init(&self, _ctx: &ModuleCtx) -> anyhow::Result<()> {
            Ok(())
        }
    }
795
796    struct StopOrderTracker {
797        my_order: usize,
798        stop_order: Arc<AtomicUsize>,
799    }
800
801    impl StopOrderTracker {
802        fn new(counter: &Arc<AtomicUsize>, stop_order: Arc<AtomicUsize>) -> Self {
803            let my_order = counter.fetch_add(1, Ordering::SeqCst);
804            Self {
805                my_order,
806                stop_order,
807            }
808        }
809    }
810
811    #[async_trait::async_trait]
812    impl Module for StopOrderTracker {
813        async fn init(&self, _ctx: &ModuleCtx) -> anyhow::Result<()> {
814            Ok(())
815        }
816    }
817
818    #[async_trait::async_trait]
819    impl RunnableCapability for StopOrderTracker {
820        async fn start(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
821            Ok(())
822        }
823        async fn stop(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
824            let order = self.stop_order.fetch_add(1, Ordering::SeqCst);
825            tracing::info!(
826                my_order = self.my_order,
827                stop_order = order,
828                "Module stopped"
829            );
830            Ok(())
831        }
832    }
833
834    #[tokio::test]
835    async fn test_stop_phase_reverse_order() {
836        let counter = Arc::new(AtomicUsize::new(0));
837        let stop_order = Arc::new(AtomicUsize::new(0));
838
839        let module_a = Arc::new(StopOrderTracker::new(&counter, stop_order.clone()));
840        let module_b = Arc::new(StopOrderTracker::new(&counter, stop_order.clone()));
841        let module_c = Arc::new(StopOrderTracker::new(&counter, stop_order.clone()));
842
843        let mut builder = RegistryBuilder::default();
844        builder.register_core_with_meta("a", &[], module_a.clone() as Arc<dyn Module>);
845        builder.register_core_with_meta("b", &["a"], module_b.clone() as Arc<dyn Module>);
846        builder.register_core_with_meta("c", &["b"], module_c.clone() as Arc<dyn Module>);
847
848        builder.register_stateful_with_meta("a", module_a.clone() as Arc<dyn RunnableCapability>);
849        builder.register_stateful_with_meta("b", module_b.clone() as Arc<dyn RunnableCapability>);
850        builder.register_stateful_with_meta("c", module_c.clone() as Arc<dyn RunnableCapability>);
851
852        let registry = builder.build_topo_sorted().unwrap();
853
854        // Verify module order is a -> b -> c
855        let module_names: Vec<_> = registry.modules().iter().map(|m| m.name).collect();
856        assert_eq!(module_names, vec!["a", "b", "c"]);
857
858        let client_hub = Arc::new(ClientHub::new());
859        let cancel = CancellationToken::new();
860        let config_provider: Arc<dyn ConfigProvider> = Arc::new(EmptyConfigProvider);
861
862        let runtime = HostRuntime::new(
863            registry,
864            config_provider,
865            DbOptions::None,
866            client_hub,
867            cancel.clone(),
868            Uuid::new_v4(),
869            None,
870        );
871
872        // Run stop phase
873        runtime.run_stop_phase().await.unwrap();
874
875        // Verify modules stopped in reverse order: c (stop_order=0), b (stop_order=1), a (stop_order=2)
876        // Module order is: a=0, b=1, c=2
877        // Stop order should be: c=0, b=1, a=2
878        assert_eq!(stop_order.load(Ordering::SeqCst), 3);
879    }
880
881    #[tokio::test]
882    async fn test_stop_phase_continues_on_error() {
883        struct FailingModule {
884            should_fail: bool,
885            stopped: Arc<AtomicUsize>,
886        }
887
888        #[async_trait::async_trait]
889        impl Module for FailingModule {
890            async fn init(&self, _ctx: &ModuleCtx) -> anyhow::Result<()> {
891                Ok(())
892            }
893        }
894
895        #[async_trait::async_trait]
896        impl RunnableCapability for FailingModule {
897            async fn start(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
898                Ok(())
899            }
900            async fn stop(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
901                self.stopped.fetch_add(1, Ordering::SeqCst);
902                if self.should_fail {
903                    anyhow::bail!("Intentional failure")
904                }
905                Ok(())
906            }
907        }
908
909        let stopped = Arc::new(AtomicUsize::new(0));
910        let module_a = Arc::new(FailingModule {
911            should_fail: false,
912            stopped: stopped.clone(),
913        });
914        let module_b = Arc::new(FailingModule {
915            should_fail: true,
916            stopped: stopped.clone(),
917        });
918        let module_c = Arc::new(FailingModule {
919            should_fail: false,
920            stopped: stopped.clone(),
921        });
922
923        let mut builder = RegistryBuilder::default();
924        builder.register_core_with_meta("a", &[], module_a.clone() as Arc<dyn Module>);
925        builder.register_core_with_meta("b", &["a"], module_b.clone() as Arc<dyn Module>);
926        builder.register_core_with_meta("c", &["b"], module_c.clone() as Arc<dyn Module>);
927
928        builder.register_stateful_with_meta("a", module_a.clone() as Arc<dyn RunnableCapability>);
929        builder.register_stateful_with_meta("b", module_b.clone() as Arc<dyn RunnableCapability>);
930        builder.register_stateful_with_meta("c", module_c.clone() as Arc<dyn RunnableCapability>);
931
932        let registry = builder.build_topo_sorted().unwrap();
933
934        let client_hub = Arc::new(ClientHub::new());
935        let cancel = CancellationToken::new();
936        let config_provider: Arc<dyn ConfigProvider> = Arc::new(EmptyConfigProvider);
937
938        let runtime = HostRuntime::new(
939            registry,
940            config_provider,
941            DbOptions::None,
942            client_hub,
943            cancel.clone(),
944            Uuid::new_v4(),
945            None,
946        );
947
948        // Run stop phase - should not fail even though module_b fails
949        runtime.run_stop_phase().await.unwrap();
950
951        // All modules should have attempted to stop
952        assert_eq!(stopped.load(Ordering::SeqCst), 3);
953    }
954
    /// A `ConfigProvider` that has no configuration for any module.
    struct EmptyConfigProvider;
    impl ConfigProvider for EmptyConfigProvider {
        /// Always returns `None`: in these tests no module has config.
        fn get_module_config(&self, _module_name: &str) -> Option<&serde_json::Value> {
            None
        }
    }
961
962    #[tokio::test]
963    async fn test_post_init_runs_after_all_init_and_system_first() {
964        #[derive(Clone)]
965        struct TrackHooks {
966            name: &'static str,
967            events: Arc<Mutex<Vec<String>>>,
968        }
969
970        #[async_trait::async_trait]
971        impl Module for TrackHooks {
972            async fn init(&self, _ctx: &ModuleCtx) -> anyhow::Result<()> {
973                self.events.lock().await.push(format!("init:{}", self.name));
974                Ok(())
975            }
976        }
977
978        #[async_trait::async_trait]
979        impl SystemCapability for TrackHooks {
980            fn pre_init(&self, _sys: &crate::runtime::SystemContext) -> anyhow::Result<()> {
981                Ok(())
982            }
983
984            async fn post_init(&self, _sys: &crate::runtime::SystemContext) -> anyhow::Result<()> {
985                self.events
986                    .lock()
987                    .await
988                    .push(format!("post_init:{}", self.name));
989                Ok(())
990            }
991        }
992
993        let events = Arc::new(Mutex::new(Vec::<String>::new()));
994        let sys_a = Arc::new(TrackHooks {
995            name: "sys_a",
996            events: events.clone(),
997        });
998        let user_b = Arc::new(TrackHooks {
999            name: "user_b",
1000            events: events.clone(),
1001        });
1002        let user_c = Arc::new(TrackHooks {
1003            name: "user_c",
1004            events: events.clone(),
1005        });
1006
1007        let mut builder = RegistryBuilder::default();
1008        builder.register_core_with_meta("sys_a", &[], sys_a.clone() as Arc<dyn Module>);
1009        builder.register_core_with_meta("user_b", &["sys_a"], user_b.clone() as Arc<dyn Module>);
1010        builder.register_core_with_meta("user_c", &["user_b"], user_c.clone() as Arc<dyn Module>);
1011        builder.register_system_with_meta("sys_a", sys_a.clone() as Arc<dyn SystemCapability>);
1012
1013        let registry = builder.build_topo_sorted().unwrap();
1014
1015        let client_hub = Arc::new(ClientHub::new());
1016        let cancel = CancellationToken::new();
1017        let config_provider: Arc<dyn ConfigProvider> = Arc::new(EmptyConfigProvider);
1018
1019        let runtime = HostRuntime::new(
1020            registry,
1021            config_provider,
1022            DbOptions::None,
1023            client_hub,
1024            cancel,
1025            Uuid::new_v4(),
1026            None,
1027        );
1028
1029        // Run init phase for all modules, then post_init as a separate barrier phase.
1030        runtime.run_init_phase().await.unwrap();
1031        runtime.run_post_init_phase().await.unwrap();
1032
1033        let events = events.lock().await.clone();
1034        let first_post_init = events
1035            .iter()
1036            .position(|e| e.starts_with("post_init:"))
1037            .expect("expected post_init events");
1038        assert!(
1039            events[..first_post_init]
1040                .iter()
1041                .all(|e| e.starts_with("init:")),
1042            "expected all init events before post_init, got: {events:?}"
1043        );
1044
1045        // system-first order within each phase
1046        assert_eq!(
1047            events,
1048            vec![
1049                "init:sys_a",
1050                "init:user_b",
1051                "init:user_c",
1052                "post_init:sys_a",
1053            ]
1054        );
1055    }
1056}