// modkit/runtime/host_runtime.rs
1//! Host Runtime - orchestrates the full `ModKit` lifecycle
2//!
3//! This module contains the `HostRuntime` type that owns and coordinates
4//! the execution of all lifecycle phases.
5//!
6//! High-level phase order:
7//! - `pre_init` (system modules only)
8//! - DB migrations (modules with DB capability)
9//! - `init` (all modules)
10//! - `post_init` (system modules only; runs after *all* `init` complete)
11//! - REST wiring (modules with REST capability; requires a single REST host)
12//! - gRPC registration (modules with gRPC capability; requires a single gRPC hub)
13//! - start/stop (stateful modules)
14//! - `OoP` spawn / wait / stop (host-only orchestration)
15
16use axum::Router;
17use std::collections::HashSet;
18use std::sync::Arc;
19use tokio_util::sync::CancellationToken;
20use uuid::Uuid;
21
22use crate::backends::OopSpawnConfig;
23use crate::client_hub::ClientHub;
24use crate::config::ConfigProvider;
25use crate::context::ModuleContextBuilder;
26use crate::registry::{
27    ApiGatewayCap, GrpcHubCap, ModuleEntry, ModuleRegistry, RegistryError, RestApiCap, RunnableCap,
28    SystemCap,
29};
30use crate::runtime::{GrpcInstallerStore, ModuleManager, OopSpawnOptions, SystemContext};
31
32#[cfg(feature = "db")]
33use crate::registry::DatabaseCap;
34
/// How the runtime should provide DBs to modules.
///
/// Chosen once at construction time and passed to `HostRuntime::new`, which
/// threads the optional `DbManager` into the `ModuleContextBuilder`.
#[derive(Clone)]
pub enum DbOptions {
    /// No database integration. `ModuleCtx::db()` will be `None`, `db_required()` will error.
    None,
    /// Use a `DbManager` to handle database connections with Figment-based configuration.
    /// Only present when the crate is built with the `db` feature.
    #[cfg(feature = "db")]
    Manager(Arc<modkit_db::DbManager>),
}
44
/// Environment variable name for passing directory endpoint to `OoP` modules.
///
/// Set by `run_oop_spawn_phase` only when the gRPC hub published an endpoint.
pub const MODKIT_DIRECTORY_ENDPOINT_ENV: &str = "MODKIT_DIRECTORY_ENDPOINT";

/// Environment variable name for passing rendered module config to `OoP` modules.
///
/// Always set for every spawned `OoP` module (value is the module's rendered JSON config).
pub const MODKIT_MODULE_CONFIG_ENV: &str = "MODKIT_MODULE_CONFIG";
50
/// `HostRuntime` owns the lifecycle orchestration for `ModKit`.
///
/// It encapsulates all runtime state and drives modules through the full lifecycle (see module docs).
pub struct HostRuntime {
    /// Registered modules and their capability sets (iterated per phase).
    registry: ModuleRegistry,
    /// Builds per-module `ModuleCtx` values (config, client hub, cancel token, DB handle).
    ctx_builder: ModuleContextBuilder,
    /// Unique id of this host instance; also exposed to system modules via `SystemContext`.
    instance_id: Uuid,
    /// Runtime-owned manager handed to system modules in pre_init/post_init.
    module_manager: Arc<ModuleManager>,
    /// Collected gRPC service installers for the hub to consume during start.
    grpc_installers: Arc<GrpcInstallerStore>,
    // Retained for ownership; contexts receive a clone via the builder.
    #[allow(dead_code)]
    client_hub: Arc<ClientHub>,
    /// Cooperative shutdown signal propagated to start/stop and `wait`.
    cancel: CancellationToken,
    // Read by the DB phase (feature "db") to resolve raw `Db` handles for migrations;
    // dead_code is allowed for non-db builds.
    #[allow(dead_code)]
    db_options: DbOptions,
    /// `OoP` module spawn configuration and backend
    oop_options: Option<OopSpawnOptions>,
}
68
69impl HostRuntime {
70    /// Create a new `HostRuntime` instance.
71    ///
72    /// This prepares all runtime components but does not start any lifecycle phases.
73    pub fn new(
74        registry: ModuleRegistry,
75        modules_cfg: Arc<dyn ConfigProvider>,
76        db_options: DbOptions,
77        client_hub: Arc<ClientHub>,
78        cancel: CancellationToken,
79        instance_id: Uuid,
80        oop_options: Option<OopSpawnOptions>,
81    ) -> Self {
82        // Create runtime-owned components for system modules
83        let module_manager = Arc::new(ModuleManager::new());
84        let grpc_installers = Arc::new(GrpcInstallerStore::new());
85
86        // Build the context builder that will resolve per-module DbHandles
87        let db_manager = match &db_options {
88            #[cfg(feature = "db")]
89            DbOptions::Manager(mgr) => Some(mgr.clone()),
90            DbOptions::None => None,
91        };
92
93        let ctx_builder = ModuleContextBuilder::new(
94            instance_id,
95            modules_cfg,
96            client_hub.clone(),
97            cancel.clone(),
98            db_manager,
99        );
100
101        Self {
102            registry,
103            ctx_builder,
104            instance_id,
105            module_manager,
106            grpc_installers,
107            client_hub,
108            cancel,
109            db_options,
110            oop_options,
111        }
112    }
113
114    /// `PRE_INIT` phase: wire runtime internals into system modules.
115    ///
116    /// This phase runs before init and only for modules with the "system" capability.
117    ///
118    /// # Errors
119    /// Returns `RegistryError` if system wiring fails.
120    pub fn run_pre_init_phase(&self) -> Result<(), RegistryError> {
121        tracing::info!("Phase: pre_init");
122
123        let sys_ctx = SystemContext::new(
124            self.instance_id,
125            Arc::clone(&self.module_manager),
126            Arc::clone(&self.grpc_installers),
127        );
128
129        for entry in self.registry.modules() {
130            if let Some(sys_mod) = entry.caps.query::<SystemCap>() {
131                tracing::debug!(module = entry.name, "Running system pre_init");
132                sys_mod
133                    .pre_init(&sys_ctx)
134                    .map_err(|e| RegistryError::PreInit {
135                        module: entry.name,
136                        source: e,
137                    })?;
138            }
139        }
140
141        Ok(())
142    }
143
144    /// Helper: resolve context for a module with error mapping.
145    async fn module_context(
146        &self,
147        module_name: &'static str,
148    ) -> Result<crate::context::ModuleCtx, RegistryError> {
149        self.ctx_builder
150            .for_module(module_name)
151            .await
152            .map_err(|e| RegistryError::DbMigrate {
153                module: module_name,
154                source: e,
155            })
156    }
157
158    /// Helper: extract DB handle and module if both exist.
159    #[cfg(feature = "db")]
160    async fn db_migration_target(
161        &self,
162        module_name: &'static str,
163        ctx: &crate::context::ModuleCtx,
164        db_module: Option<Arc<dyn crate::contracts::DatabaseCapability>>,
165    ) -> Result<Option<(modkit_db::Db, Arc<dyn crate::contracts::DatabaseCapability>)>, RegistryError>
166    {
167        let Some(dbm) = db_module else {
168            return Ok(None);
169        };
170
171        // Important: DB migrations require access to the underlying `Db`, not just `DBProvider`.
172        // `ModuleCtx` intentionally exposes only `DBProvider` for better DX and to reduce mistakes.
173        // So the runtime resolves the `Db` directly from its `DbManager`.
174        let db = match &self.db_options {
175            DbOptions::None => None,
176            #[cfg(feature = "db")]
177            DbOptions::Manager(mgr) => {
178                mgr.get(module_name)
179                    .await
180                    .map_err(|e| RegistryError::DbMigrate {
181                        module: module_name,
182                        source: e.into(),
183                    })?
184            }
185        };
186
187        let _ = ctx; // ctx is kept for parity/error context; DB is resolved from manager above.
188        Ok(db.map(|db| (db, dbm)))
189    }
190
191    /// Helper: run migrations for a single module using the new migration runner.
192    ///
193    /// This collects migrations from the module and executes them via the
194    /// runtime's privileged connection. Modules never see the raw connection.
195    #[cfg(feature = "db")]
196    async fn migrate_module(
197        module_name: &'static str,
198        db: &modkit_db::Db,
199        db_module: Arc<dyn crate::contracts::DatabaseCapability>,
200    ) -> Result<(), RegistryError> {
201        // Collect migrations from the module
202        let migrations = db_module.migrations();
203
204        if migrations.is_empty() {
205            tracing::debug!(module = module_name, "No migrations to run");
206            return Ok(());
207        }
208
209        tracing::debug!(
210            module = module_name,
211            count = migrations.len(),
212            "Running DB migrations"
213        );
214
215        // Execute migrations using the migration runner
216        let result =
217            modkit_db::migration_runner::run_migrations_for_module(db, module_name, migrations)
218                .await
219                .map_err(|e| RegistryError::DbMigrate {
220                    module: module_name,
221                    source: anyhow::Error::new(e),
222                })?;
223
224        tracing::info!(
225            module = module_name,
226            applied = result.applied,
227            skipped = result.skipped,
228            "DB migrations completed"
229        );
230
231        Ok(())
232    }
233
234    /// DB MIGRATION phase: run migrations for all modules with DB capability.
235    ///
236    /// Runs before init, with system modules processed first.
237    ///
238    /// Modules provide migrations via `DatabaseCapability::migrations()`.
239    /// The runtime executes them with a privileged connection that modules
240    /// never receive directly. Each module gets a separate migration history
241    /// table, preventing cross-module interference.
242    #[cfg(feature = "db")]
243    async fn run_db_phase(&self) -> Result<(), RegistryError> {
244        tracing::info!("Phase: db (before init)");
245
246        for entry in self.registry.modules_by_system_priority() {
247            let ctx = self.module_context(entry.name).await?;
248            let db_module = entry.caps.query::<DatabaseCap>();
249
250            match self
251                .db_migration_target(entry.name, &ctx, db_module.clone())
252                .await?
253            {
254                Some((db, dbm)) => {
255                    Self::migrate_module(entry.name, &db, dbm).await?;
256                }
257                None if db_module.is_some() => {
258                    tracing::debug!(
259                        module = entry.name,
260                        "Module has DbModule trait but no DB handle (no config)"
261                    );
262                }
263                None => {}
264            }
265        }
266
267        Ok(())
268    }
269
270    /// INIT phase: initialize all modules in topological order.
271    ///
272    /// System modules initialize first, followed by user modules.
273    async fn run_init_phase(&self) -> Result<(), RegistryError> {
274        tracing::info!("Phase: init");
275
276        for entry in self.registry.modules_by_system_priority() {
277            let ctx =
278                self.ctx_builder
279                    .for_module(entry.name)
280                    .await
281                    .map_err(|e| RegistryError::Init {
282                        module: entry.name,
283                        source: e,
284                    })?;
285            entry
286                .core
287                .init(&ctx)
288                .await
289                .map_err(|e| RegistryError::Init {
290                    module: entry.name,
291                    source: e,
292                })?;
293        }
294
295        Ok(())
296    }
297
298    /// `POST_INIT` phase: optional hook after ALL modules completed `init()`.
299    ///
300    /// This provides a global barrier between initialization-time registration
301    /// and subsequent phases that may rely on a fully-populated runtime registry.
302    ///
303    /// System modules run first, followed by user modules, preserving topo order.
304    async fn run_post_init_phase(&self) -> Result<(), RegistryError> {
305        tracing::info!("Phase: post_init");
306
307        let sys_ctx = SystemContext::new(
308            self.instance_id,
309            Arc::clone(&self.module_manager),
310            Arc::clone(&self.grpc_installers),
311        );
312
313        for entry in self.registry.modules_by_system_priority() {
314            if let Some(sys_mod) = entry.caps.query::<SystemCap>() {
315                sys_mod
316                    .post_init(&sys_ctx)
317                    .await
318                    .map_err(|e| RegistryError::PostInit {
319                        module: entry.name,
320                        source: e,
321                    })?;
322            }
323        }
324
325        Ok(())
326    }
327
328    /// REST phase: compose the router against the REST host.
329    ///
330    /// This is a synchronous phase that builds the final Router by:
331    /// 1. Preparing the host module
332    /// 2. Registering all REST providers
333    /// 3. Finalizing with `OpenAPI` endpoints
334    async fn run_rest_phase(&self) -> Result<Router, RegistryError> {
335        tracing::info!("Phase: rest (sync)");
336
337        let mut router = Router::new();
338
339        // Find host(s) and whether any rest modules exist
340        let host_count = self
341            .registry
342            .modules()
343            .iter()
344            .filter(|e| e.caps.has::<ApiGatewayCap>())
345            .count();
346
347        match host_count {
348            0 => {
349                return if self
350                    .registry
351                    .modules()
352                    .iter()
353                    .any(|e| e.caps.has::<RestApiCap>())
354                {
355                    Err(RegistryError::RestRequiresHost)
356                } else {
357                    Ok(router)
358                };
359            }
360            1 => { /* proceed */ }
361            _ => return Err(RegistryError::MultipleRestHosts),
362        }
363
364        // Resolve the single host entry and its module context
365        let host_idx = self
366            .registry
367            .modules()
368            .iter()
369            .position(|e| e.caps.has::<ApiGatewayCap>())
370            .ok_or(RegistryError::RestHostNotFoundAfterValidation)?;
371        let host_entry = &self.registry.modules()[host_idx];
372        let Some(host) = host_entry.caps.query::<ApiGatewayCap>() else {
373            return Err(RegistryError::RestHostMissingFromEntry);
374        };
375        let host_ctx = self
376            .ctx_builder
377            .for_module(host_entry.name)
378            .await
379            .map_err(|e| RegistryError::RestPrepare {
380                module: host_entry.name,
381                source: e,
382            })?;
383
384        // use host as the registry
385        let registry: &dyn crate::contracts::OpenApiRegistry = host.as_registry();
386
387        // 1) Host prepare: base Router / global middlewares / basic OAS meta
388        router =
389            host.rest_prepare(&host_ctx, router)
390                .map_err(|source| RegistryError::RestPrepare {
391                    module: host_entry.name,
392                    source,
393                })?;
394
395        // 2) Register all REST providers (in the current discovery order)
396        for e in self.registry.modules() {
397            if let Some(rest) = e.caps.query::<RestApiCap>() {
398                let ctx = self.ctx_builder.for_module(e.name).await.map_err(|err| {
399                    RegistryError::RestRegister {
400                        module: e.name,
401                        source: err,
402                    }
403                })?;
404                router = rest
405                    .register_rest(&ctx, router, registry)
406                    .map_err(|source| RegistryError::RestRegister {
407                        module: e.name,
408                        source,
409                    })?;
410            }
411        }
412
413        // 3) Host finalize: attach /openapi.json and /docs, persist Router if needed (no server start)
414        router = host.rest_finalize(&host_ctx, router).map_err(|source| {
415            RegistryError::RestFinalize {
416                module: host_entry.name,
417                source,
418            }
419        })?;
420
421        Ok(router)
422    }
423
424    /// gRPC registration phase: collect services from all grpc modules.
425    ///
426    /// Services are stored in the installer store for the `grpc_hub` to consume during start.
427    async fn run_grpc_phase(&self) -> Result<(), RegistryError> {
428        tracing::info!("Phase: grpc (registration)");
429
430        // If no grpc_hub and no grpc_services, skip the phase
431        if self.registry.grpc_hub.is_none() && self.registry.grpc_services.is_empty() {
432            return Ok(());
433        }
434
435        // If there are grpc_services but no hub, that's an error
436        if self.registry.grpc_hub.is_none() && !self.registry.grpc_services.is_empty() {
437            return Err(RegistryError::GrpcRequiresHub);
438        }
439
440        // If there's a hub, collect all services grouped by module and hand them off to the installer store
441        if let Some(hub_name) = &self.registry.grpc_hub {
442            let mut modules_data = Vec::new();
443            let mut seen = HashSet::new();
444
445            // Collect services from all grpc modules
446            for (module_name, service_module) in &self.registry.grpc_services {
447                let ctx = self
448                    .ctx_builder
449                    .for_module(module_name)
450                    .await
451                    .map_err(|err| RegistryError::GrpcRegister {
452                        module: module_name.clone(),
453                        source: err,
454                    })?;
455
456                let installers =
457                    service_module
458                        .get_grpc_services(&ctx)
459                        .await
460                        .map_err(|source| RegistryError::GrpcRegister {
461                            module: module_name.clone(),
462                            source,
463                        })?;
464
465                for reg in &installers {
466                    if !seen.insert(reg.service_name) {
467                        return Err(RegistryError::GrpcRegister {
468                            module: module_name.clone(),
469                            source: anyhow::anyhow!(
470                                "Duplicate gRPC service name: {}",
471                                reg.service_name
472                            ),
473                        });
474                    }
475                }
476
477                modules_data.push(crate::runtime::ModuleInstallers {
478                    module_name: module_name.clone(),
479                    installers,
480                });
481            }
482
483            self.grpc_installers
484                .set(crate::runtime::GrpcInstallerData {
485                    modules: modules_data,
486                })
487                .map_err(|source| RegistryError::GrpcRegister {
488                    module: hub_name.clone(),
489                    source,
490                })?;
491        }
492
493        Ok(())
494    }
495
496    /// START phase: start all stateful modules.
497    ///
498    /// System modules start first, followed by user modules.
499    async fn run_start_phase(&self) -> Result<(), RegistryError> {
500        tracing::info!("Phase: start");
501
502        for e in self.registry.modules_by_system_priority() {
503            if let Some(s) = e.caps.query::<RunnableCap>() {
504                tracing::debug!(
505                    module = e.name,
506                    is_system = e.caps.has::<SystemCap>(),
507                    "Starting stateful module"
508                );
509                s.start(self.cancel.clone())
510                    .await
511                    .map_err(|source| RegistryError::Start {
512                        module: e.name,
513                        source,
514                    })?;
515                tracing::info!(module = e.name, "Started module");
516            }
517        }
518
519        Ok(())
520    }
521
522    /// Stop a single module, logging errors but continuing execution.
523    async fn stop_one_module(entry: &ModuleEntry, cancel: CancellationToken) {
524        if let Some(s) = entry.caps.query::<RunnableCap>() {
525            match s.stop(cancel).await {
526                Err(err) => {
527                    tracing::warn!(module = entry.name, error = %err, "Failed to stop module");
528                }
529                _ => {
530                    tracing::info!(module = entry.name, "Stopped module");
531                }
532            }
533        }
534    }
535
536    /// STOP phase: stop all stateful modules in reverse order.
537    ///
538    /// Errors are logged but do not fail the shutdown process.
539    /// Note: `OoP` modules are stopped automatically by the backend when the
540    /// cancellation token is triggered.
541    async fn run_stop_phase(&self) -> Result<(), RegistryError> {
542        tracing::info!("Phase: stop");
543
544        for e in self.registry.modules().iter().rev() {
545            Self::stop_one_module(e, self.cancel.clone()).await;
546        }
547
548        Ok(())
549    }
550
551    /// `OoP` SPAWN phase: spawn out-of-process modules after start phase.
552    ///
553    /// This phase runs after `grpc_hub` is already listening, so we can pass
554    /// the real directory endpoint to `OoP` modules.
555    async fn run_oop_spawn_phase(&self) -> Result<(), RegistryError> {
556        let oop_opts = match &self.oop_options {
557            Some(opts) if !opts.modules.is_empty() => opts,
558            _ => return Ok(()),
559        };
560
561        tracing::info!("Phase: oop_spawn");
562
563        // Wait for grpc_hub to publish its endpoint (it runs async in start phase)
564        let directory_endpoint = self.wait_for_grpc_hub_endpoint().await;
565
566        for module_cfg in &oop_opts.modules {
567            // Build environment with directory endpoint and rendered config
568            // Note: User controls --config via execution.args in master config
569            let mut env = module_cfg.env.clone();
570            env.insert(
571                MODKIT_MODULE_CONFIG_ENV.to_owned(),
572                module_cfg.rendered_config_json.clone(),
573            );
574            if let Some(ref endpoint) = directory_endpoint {
575                env.insert(MODKIT_DIRECTORY_ENDPOINT_ENV.to_owned(), endpoint.clone());
576            }
577
578            // Use args from execution config as-is (user controls --config via args)
579            let args = module_cfg.args.clone();
580
581            let spawn_config = OopSpawnConfig {
582                module_name: module_cfg.module_name.clone(),
583                binary: module_cfg.binary.clone(),
584                args,
585                env,
586                working_directory: module_cfg.working_directory.clone(),
587            };
588
589            oop_opts
590                .backend
591                .spawn(spawn_config)
592                .await
593                .map_err(|e| RegistryError::OopSpawn {
594                    module: module_cfg.module_name.clone(),
595                    source: e,
596                })?;
597
598            tracing::info!(
599                module = %module_cfg.module_name,
600                directory_endpoint = ?directory_endpoint,
601                "Spawned OoP module via backend"
602            );
603        }
604
605        Ok(())
606    }
607
608    /// Wait for `grpc_hub` to publish its bound endpoint.
609    ///
610    /// Polls the `GrpcHubModule::bound_endpoint()` with a short interval until available or timeout.
611    /// Returns None if no `grpc_hub` is running or if it times out.
612    async fn wait_for_grpc_hub_endpoint(&self) -> Option<String> {
613        const POLL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(10);
614        const MAX_WAIT: std::time::Duration = std::time::Duration::from_secs(5);
615
616        // Find grpc_hub in registry
617        let grpc_hub = self
618            .registry
619            .modules()
620            .iter()
621            .find_map(|e| e.caps.query::<GrpcHubCap>());
622
623        let Some(hub) = grpc_hub else {
624            return None; // No grpc_hub registered
625        };
626
627        let start = std::time::Instant::now();
628
629        loop {
630            if let Some(endpoint) = hub.bound_endpoint() {
631                tracing::debug!(
632                    endpoint = %endpoint,
633                    elapsed_ms = start.elapsed().as_millis(),
634                    "gRPC hub endpoint available"
635                );
636                return Some(endpoint);
637            }
638
639            if start.elapsed() > MAX_WAIT {
640                tracing::warn!("Timed out waiting for gRPC hub to bind");
641                return None;
642            }
643
644            tokio::time::sleep(POLL_INTERVAL).await;
645        }
646    }
647
648    /// Run the full lifecycle: `pre_init` → DB → init → `post_init` → REST → gRPC → start → `OoP` spawn → wait → stop.
649    ///
650    /// This is the main entry point for orchestrating the complete module lifecycle.
651    ///
652    /// # Errors
653    /// Returns an error if any lifecycle phase fails.
654    pub async fn run_module_phases(self) -> anyhow::Result<()> {
655        // 1. Pre-init phase (before init, only for system modules)
656        self.run_pre_init_phase()?;
657
658        // 2. DB migration phase (system modules first)
659        #[cfg(feature = "db")]
660        {
661            self.run_db_phase().await?;
662        }
663        #[cfg(not(feature = "db"))]
664        {
665            // No DB integration in this build.
666        }
667
668        // 3. Init phase (system modules first)
669        self.run_init_phase().await?;
670
671        // 4. Post-init phase (barrier after ALL init; system modules only)
672        self.run_post_init_phase().await?;
673
674        // 5. REST phase (synchronous router composition)
675        let _router = self.run_rest_phase().await?;
676
677        // 6. gRPC registration phase
678        self.run_grpc_phase().await?;
679
680        // 7. Start phase
681        self.run_start_phase().await?;
682
683        // 8. OoP spawn phase (after grpc_hub is running)
684        self.run_oop_spawn_phase().await?;
685
686        // 9. Wait for cancellation
687        self.cancel.cancelled().await;
688
689        // 10. Stop phase
690        self.run_stop_phase().await?;
691
692        Ok(())
693    }
694}
695
696#[cfg(test)]
697#[cfg_attr(coverage_nightly, coverage(off))]
698mod tests {
699    use super::*;
700    use crate::context::ModuleCtx;
701    use crate::contracts::{Module, RunnableCapability, SystemCapability};
702    use crate::registry::RegistryBuilder;
703    use std::sync::Arc;
704    use std::sync::atomic::{AtomicUsize, Ordering};
705    use tokio::sync::Mutex;
706
    /// Minimal `Module` whose `init` is a no-op.
    // NOTE(review): not referenced by the tests visible here, hence allow(dead_code).
    #[derive(Default)]
    #[allow(dead_code)]
    struct DummyCore;
    #[async_trait::async_trait]
    impl Module for DummyCore {
        async fn init(&self, _ctx: &ModuleCtx) -> anyhow::Result<()> {
            Ok(())
        }
    }
716
    /// Test helper that records its construction order and bumps a shared
    /// counter every time `stop()` is called.
    struct StopOrderTracker {
        // Position in which this instance was created (0-based).
        my_order: usize,
        // Shared counter incremented on each stop() across all trackers.
        stop_order: Arc<AtomicUsize>,
    }

    impl StopOrderTracker {
        /// Claims the next construction slot from `counter` and keeps a handle
        /// to the shared stop counter.
        fn new(counter: &Arc<AtomicUsize>, stop_order: Arc<AtomicUsize>) -> Self {
            let my_order = counter.fetch_add(1, Ordering::SeqCst);
            Self {
                my_order,
                stop_order,
            }
        }
    }

    #[async_trait::async_trait]
    impl Module for StopOrderTracker {
        // No-op init; this helper only instruments stop().
        async fn init(&self, _ctx: &ModuleCtx) -> anyhow::Result<()> {
            Ok(())
        }
    }

    #[async_trait::async_trait]
    impl RunnableCapability for StopOrderTracker {
        async fn start(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
            Ok(())
        }
        async fn stop(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
            // The observed stop position is only surfaced via the counter and tracing.
            let order = self.stop_order.fetch_add(1, Ordering::SeqCst);
            tracing::info!(
                my_order = self.my_order,
                stop_order = order,
                "Module stopped"
            );
            Ok(())
        }
    }
754
    // Drives run_stop_phase over three stateful modules registered a -> b -> c.
    //
    // NOTE(review): the final assertion only verifies that all three modules
    // were stopped (counter == 3); it does NOT actually check the reverse
    // c -> b -> a ordering described in the comments — per-module stop order is
    // only emitted via tracing. Consider recording observed order per tracker.
    #[tokio::test]
    async fn test_stop_phase_reverse_order() {
        let counter = Arc::new(AtomicUsize::new(0));
        let stop_order = Arc::new(AtomicUsize::new(0));

        let module_a = Arc::new(StopOrderTracker::new(&counter, stop_order.clone()));
        let module_b = Arc::new(StopOrderTracker::new(&counter, stop_order.clone()));
        let module_c = Arc::new(StopOrderTracker::new(&counter, stop_order.clone()));

        // Register a linear dependency chain a <- b <- c.
        let mut builder = RegistryBuilder::default();
        builder.register_core_with_meta("a", &[], module_a.clone() as Arc<dyn Module>);
        builder.register_core_with_meta("b", &["a"], module_b.clone() as Arc<dyn Module>);
        builder.register_core_with_meta("c", &["b"], module_c.clone() as Arc<dyn Module>);

        builder.register_stateful_with_meta("a", module_a.clone() as Arc<dyn RunnableCapability>);
        builder.register_stateful_with_meta("b", module_b.clone() as Arc<dyn RunnableCapability>);
        builder.register_stateful_with_meta("c", module_c.clone() as Arc<dyn RunnableCapability>);

        let registry = builder.build_topo_sorted().unwrap();

        // Verify module order is a -> b -> c
        let module_names: Vec<_> = registry.modules().iter().map(|m| m.name).collect();
        assert_eq!(module_names, vec!["a", "b", "c"]);

        let client_hub = Arc::new(ClientHub::new());
        let cancel = CancellationToken::new();
        let config_provider: Arc<dyn ConfigProvider> = Arc::new(EmptyConfigProvider);

        let runtime = HostRuntime::new(
            registry,
            config_provider,
            DbOptions::None,
            client_hub,
            cancel.clone(),
            Uuid::new_v4(),
            None,
        );

        // Run stop phase
        runtime.run_stop_phase().await.unwrap();

        // Verify modules stopped in reverse order: c (stop_order=0), b (stop_order=1), a (stop_order=2)
        // Module order is: a=0, b=1, c=2
        // Stop order should be: c=0, b=1, a=2
        assert_eq!(stop_order.load(Ordering::SeqCst), 3);
    }
801
    // Verifies that a failing stop() in the middle of the chain does not abort
    // the stop phase: all three modules must still be asked to stop.
    #[tokio::test]
    async fn test_stop_phase_continues_on_error() {
        // Stateful module whose stop() optionally fails after counting the call.
        struct FailingModule {
            should_fail: bool,
            stopped: Arc<AtomicUsize>,
        }

        #[async_trait::async_trait]
        impl Module for FailingModule {
            async fn init(&self, _ctx: &ModuleCtx) -> anyhow::Result<()> {
                Ok(())
            }
        }

        #[async_trait::async_trait]
        impl RunnableCapability for FailingModule {
            async fn start(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
                Ok(())
            }
            async fn stop(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
                // Count the attempt before (possibly) failing.
                self.stopped.fetch_add(1, Ordering::SeqCst);
                if self.should_fail {
                    anyhow::bail!("Intentional failure")
                }
                Ok(())
            }
        }

        let stopped = Arc::new(AtomicUsize::new(0));
        // Only the middle module ("b") fails.
        let module_a = Arc::new(FailingModule {
            should_fail: false,
            stopped: stopped.clone(),
        });
        let module_b = Arc::new(FailingModule {
            should_fail: true,
            stopped: stopped.clone(),
        });
        let module_c = Arc::new(FailingModule {
            should_fail: false,
            stopped: stopped.clone(),
        });

        let mut builder = RegistryBuilder::default();
        builder.register_core_with_meta("a", &[], module_a.clone() as Arc<dyn Module>);
        builder.register_core_with_meta("b", &["a"], module_b.clone() as Arc<dyn Module>);
        builder.register_core_with_meta("c", &["b"], module_c.clone() as Arc<dyn Module>);

        builder.register_stateful_with_meta("a", module_a.clone() as Arc<dyn RunnableCapability>);
        builder.register_stateful_with_meta("b", module_b.clone() as Arc<dyn RunnableCapability>);
        builder.register_stateful_with_meta("c", module_c.clone() as Arc<dyn RunnableCapability>);

        let registry = builder.build_topo_sorted().unwrap();

        let client_hub = Arc::new(ClientHub::new());
        let cancel = CancellationToken::new();
        let config_provider: Arc<dyn ConfigProvider> = Arc::new(EmptyConfigProvider);

        let runtime = HostRuntime::new(
            registry,
            config_provider,
            DbOptions::None,
            client_hub,
            cancel.clone(),
            Uuid::new_v4(),
            None,
        );

        // Run stop phase - should not fail even though module_b fails
        runtime.run_stop_phase().await.unwrap();

        // All modules should have attempted to stop
        assert_eq!(stopped.load(Ordering::SeqCst), 3);
    }
875
876    struct EmptyConfigProvider;
877    impl ConfigProvider for EmptyConfigProvider {
878        fn get_module_config(&self, _module_name: &str) -> Option<&serde_json::Value> {
879            None
880        }
881    }
882
883    #[tokio::test]
884    async fn test_post_init_runs_after_all_init_and_system_first() {
885        #[derive(Clone)]
886        struct TrackHooks {
887            name: &'static str,
888            events: Arc<Mutex<Vec<String>>>,
889        }
890
891        #[async_trait::async_trait]
892        impl Module for TrackHooks {
893            async fn init(&self, _ctx: &ModuleCtx) -> anyhow::Result<()> {
894                self.events.lock().await.push(format!("init:{}", self.name));
895                Ok(())
896            }
897        }
898
899        #[async_trait::async_trait]
900        impl SystemCapability for TrackHooks {
901            fn pre_init(&self, _sys: &crate::runtime::SystemContext) -> anyhow::Result<()> {
902                Ok(())
903            }
904
905            async fn post_init(&self, _sys: &crate::runtime::SystemContext) -> anyhow::Result<()> {
906                self.events
907                    .lock()
908                    .await
909                    .push(format!("post_init:{}", self.name));
910                Ok(())
911            }
912        }
913
914        let events = Arc::new(Mutex::new(Vec::<String>::new()));
915        let sys_a = Arc::new(TrackHooks {
916            name: "sys_a",
917            events: events.clone(),
918        });
919        let user_b = Arc::new(TrackHooks {
920            name: "user_b",
921            events: events.clone(),
922        });
923        let user_c = Arc::new(TrackHooks {
924            name: "user_c",
925            events: events.clone(),
926        });
927
928        let mut builder = RegistryBuilder::default();
929        builder.register_core_with_meta("sys_a", &[], sys_a.clone() as Arc<dyn Module>);
930        builder.register_core_with_meta("user_b", &["sys_a"], user_b.clone() as Arc<dyn Module>);
931        builder.register_core_with_meta("user_c", &["user_b"], user_c.clone() as Arc<dyn Module>);
932        builder.register_system_with_meta("sys_a", sys_a.clone() as Arc<dyn SystemCapability>);
933
934        let registry = builder.build_topo_sorted().unwrap();
935
936        let client_hub = Arc::new(ClientHub::new());
937        let cancel = CancellationToken::new();
938        let config_provider: Arc<dyn ConfigProvider> = Arc::new(EmptyConfigProvider);
939
940        let runtime = HostRuntime::new(
941            registry,
942            config_provider,
943            DbOptions::None,
944            client_hub,
945            cancel,
946            Uuid::new_v4(),
947            None,
948        );
949
950        // Run init phase for all modules, then post_init as a separate barrier phase.
951        runtime.run_init_phase().await.unwrap();
952        runtime.run_post_init_phase().await.unwrap();
953
954        let events = events.lock().await.clone();
955        let first_post_init = events
956            .iter()
957            .position(|e| e.starts_with("post_init:"))
958            .expect("expected post_init events");
959        assert!(
960            events[..first_post_init]
961                .iter()
962                .all(|e| e.starts_with("init:")),
963            "expected all init events before post_init, got: {events:?}"
964        );
965
966        // system-first order within each phase
967        assert_eq!(
968            events,
969            vec![
970                "init:sys_a",
971                "init:user_b",
972                "init:user_c",
973                "post_init:sys_a",
974            ]
975        );
976    }
977}