1use axum::Router;
17use std::collections::HashSet;
18use std::sync::Arc;
19use tokio_util::sync::CancellationToken;
20use uuid::Uuid;
21
22use crate::backends::OopSpawnConfig;
23use crate::client_hub::ClientHub;
24use crate::config::ConfigProvider;
25use crate::context::ModuleContextBuilder;
26use crate::registry::{
27 ApiGatewayCap, GrpcHubCap, ModuleEntry, ModuleRegistry, RegistryError, RestApiCap, RunnableCap,
28 SystemCap,
29};
30use crate::runtime::{GrpcInstallerStore, ModuleManager, OopSpawnOptions, SystemContext};
31
32#[cfg(feature = "db")]
33use crate::registry::DatabaseCap;
34
/// Database configuration handed to [`HostRuntime::new`].
#[derive(Clone)]
pub enum DbOptions {
    /// No database manager is available; module contexts are built without one.
    None,
    /// A shared database manager. Only compiled when the `db` feature is enabled.
    #[cfg(feature = "db")]
    Manager(Arc<modkit_db::DbManager>),
}
44
/// Name of the environment variable used to pass the gRPC hub's bound endpoint
/// to spawned out-of-process modules (set only when an endpoint is known).
pub const MODKIT_DIRECTORY_ENDPOINT_ENV: &str = "MODKIT_DIRECTORY_ENDPOINT";

/// Name of the environment variable carrying a spawned module's rendered JSON
/// configuration.
pub const MODKIT_MODULE_CONFIG_ENV: &str = "MODKIT_MODULE_CONFIG";
50
/// Drives the lifecycle phases (pre-init, DB, init, post-init, REST, gRPC,
/// start, OoP spawn, stop) for the modules held in a [`ModuleRegistry`].
pub struct HostRuntime {
    /// Registered modules together with their capabilities.
    registry: ModuleRegistry,
    /// Produces the per-module context handed to module hooks.
    ctx_builder: ModuleContextBuilder,
    /// Instance identifier forwarded to the context builder and to `SystemContext`.
    instance_id: Uuid,
    /// Shared with system modules through `SystemContext`.
    module_manager: Arc<ModuleManager>,
    /// Populated by the gRPC phase; shared with system modules through `SystemContext`.
    grpc_installers: Arc<GrpcInstallerStore>,
    // Stored but not read by the methods visible in this section of the file
    // (a clone is handed to the context builder in `new`), hence the allowance.
    #[allow(dead_code)]
    client_hub: Arc<ClientHub>,
    /// Awaited to detect shutdown; clones are passed to module `start`/`stop`.
    cancel: CancellationToken,
    // Only read by `db_migration_target`, which is compiled with the `db`
    // feature, so the field is unused in builds without it.
    #[allow(dead_code)]
    db_options: DbOptions,
    /// Out-of-process modules to spawn; `None` or an empty list skips that phase.
    oop_options: Option<OopSpawnOptions>,
}
68
69impl HostRuntime {
70 pub fn new(
74 registry: ModuleRegistry,
75 modules_cfg: Arc<dyn ConfigProvider>,
76 db_options: DbOptions,
77 client_hub: Arc<ClientHub>,
78 cancel: CancellationToken,
79 instance_id: Uuid,
80 oop_options: Option<OopSpawnOptions>,
81 ) -> Self {
82 let module_manager = Arc::new(ModuleManager::new());
84 let grpc_installers = Arc::new(GrpcInstallerStore::new());
85
86 let db_manager = match &db_options {
88 #[cfg(feature = "db")]
89 DbOptions::Manager(mgr) => Some(mgr.clone()),
90 DbOptions::None => None,
91 };
92
93 let ctx_builder = ModuleContextBuilder::new(
94 instance_id,
95 modules_cfg,
96 client_hub.clone(),
97 cancel.clone(),
98 db_manager,
99 );
100
101 Self {
102 registry,
103 ctx_builder,
104 instance_id,
105 module_manager,
106 grpc_installers,
107 client_hub,
108 cancel,
109 db_options,
110 oop_options,
111 }
112 }
113
114 pub fn run_pre_init_phase(&self) -> Result<(), RegistryError> {
121 tracing::info!("Phase: pre_init");
122
123 let sys_ctx = SystemContext::new(
124 self.instance_id,
125 Arc::clone(&self.module_manager),
126 Arc::clone(&self.grpc_installers),
127 );
128
129 for entry in self.registry.modules() {
130 if let Some(sys_mod) = entry.caps.query::<SystemCap>() {
131 tracing::debug!(module = entry.name, "Running system pre_init");
132 sys_mod
133 .pre_init(&sys_ctx)
134 .map_err(|e| RegistryError::PreInit {
135 module: entry.name,
136 source: e,
137 })?;
138 }
139 }
140
141 Ok(())
142 }
143
144 async fn module_context(
146 &self,
147 module_name: &'static str,
148 ) -> Result<crate::context::ModuleCtx, RegistryError> {
149 self.ctx_builder
150 .for_module(module_name)
151 .await
152 .map_err(|e| RegistryError::DbMigrate {
153 module: module_name,
154 source: e,
155 })
156 }
157
158 #[cfg(feature = "db")]
160 async fn db_migration_target(
161 &self,
162 module_name: &'static str,
163 ctx: &crate::context::ModuleCtx,
164 db_module: Option<Arc<dyn crate::contracts::DatabaseCapability>>,
165 ) -> Result<Option<(modkit_db::Db, Arc<dyn crate::contracts::DatabaseCapability>)>, RegistryError>
166 {
167 let Some(dbm) = db_module else {
168 return Ok(None);
169 };
170
171 let db = match &self.db_options {
175 DbOptions::None => None,
176 #[cfg(feature = "db")]
177 DbOptions::Manager(mgr) => {
178 mgr.get(module_name)
179 .await
180 .map_err(|e| RegistryError::DbMigrate {
181 module: module_name,
182 source: e.into(),
183 })?
184 }
185 };
186
187 let _ = ctx; Ok(db.map(|db| (db, dbm)))
189 }
190
191 #[cfg(feature = "db")]
196 async fn migrate_module(
197 module_name: &'static str,
198 db: &modkit_db::Db,
199 db_module: Arc<dyn crate::contracts::DatabaseCapability>,
200 ) -> Result<(), RegistryError> {
201 let migrations = db_module.migrations();
203
204 if migrations.is_empty() {
205 tracing::debug!(module = module_name, "No migrations to run");
206 return Ok(());
207 }
208
209 tracing::debug!(
210 module = module_name,
211 count = migrations.len(),
212 "Running DB migrations"
213 );
214
215 let result =
217 modkit_db::migration_runner::run_migrations_for_module(db, module_name, migrations)
218 .await
219 .map_err(|e| RegistryError::DbMigrate {
220 module: module_name,
221 source: anyhow::Error::new(e),
222 })?;
223
224 tracing::info!(
225 module = module_name,
226 applied = result.applied,
227 skipped = result.skipped,
228 "DB migrations completed"
229 );
230
231 Ok(())
232 }
233
234 #[cfg(feature = "db")]
243 async fn run_db_phase(&self) -> Result<(), RegistryError> {
244 tracing::info!("Phase: db (before init)");
245
246 for entry in self.registry.modules_by_system_priority() {
247 let ctx = self.module_context(entry.name).await?;
248 let db_module = entry.caps.query::<DatabaseCap>();
249
250 match self
251 .db_migration_target(entry.name, &ctx, db_module.clone())
252 .await?
253 {
254 Some((db, dbm)) => {
255 Self::migrate_module(entry.name, &db, dbm).await?;
256 }
257 None if db_module.is_some() => {
258 tracing::debug!(
259 module = entry.name,
260 "Module has DbModule trait but no DB handle (no config)"
261 );
262 }
263 None => {}
264 }
265 }
266
267 Ok(())
268 }
269
270 async fn run_init_phase(&self) -> Result<(), RegistryError> {
274 tracing::info!("Phase: init");
275
276 for entry in self.registry.modules_by_system_priority() {
277 let ctx =
278 self.ctx_builder
279 .for_module(entry.name)
280 .await
281 .map_err(|e| RegistryError::Init {
282 module: entry.name,
283 source: e,
284 })?;
285 entry
286 .core
287 .init(&ctx)
288 .await
289 .map_err(|e| RegistryError::Init {
290 module: entry.name,
291 source: e,
292 })?;
293 }
294
295 Ok(())
296 }
297
298 async fn run_post_init_phase(&self) -> Result<(), RegistryError> {
305 tracing::info!("Phase: post_init");
306
307 let sys_ctx = SystemContext::new(
308 self.instance_id,
309 Arc::clone(&self.module_manager),
310 Arc::clone(&self.grpc_installers),
311 );
312
313 for entry in self.registry.modules_by_system_priority() {
314 if let Some(sys_mod) = entry.caps.query::<SystemCap>() {
315 sys_mod
316 .post_init(&sys_ctx)
317 .await
318 .map_err(|e| RegistryError::PostInit {
319 module: entry.name,
320 source: e,
321 })?;
322 }
323 }
324
325 Ok(())
326 }
327
328 async fn run_rest_phase(&self) -> Result<Router, RegistryError> {
335 tracing::info!("Phase: rest (sync)");
336
337 let mut router = Router::new();
338
339 let host_count = self
341 .registry
342 .modules()
343 .iter()
344 .filter(|e| e.caps.has::<ApiGatewayCap>())
345 .count();
346
347 match host_count {
348 0 => {
349 return if self
350 .registry
351 .modules()
352 .iter()
353 .any(|e| e.caps.has::<RestApiCap>())
354 {
355 Err(RegistryError::RestRequiresHost)
356 } else {
357 Ok(router)
358 };
359 }
360 1 => { }
361 _ => return Err(RegistryError::MultipleRestHosts),
362 }
363
364 let host_idx = self
366 .registry
367 .modules()
368 .iter()
369 .position(|e| e.caps.has::<ApiGatewayCap>())
370 .ok_or(RegistryError::RestHostNotFoundAfterValidation)?;
371 let host_entry = &self.registry.modules()[host_idx];
372 let Some(host) = host_entry.caps.query::<ApiGatewayCap>() else {
373 return Err(RegistryError::RestHostMissingFromEntry);
374 };
375 let host_ctx = self
376 .ctx_builder
377 .for_module(host_entry.name)
378 .await
379 .map_err(|e| RegistryError::RestPrepare {
380 module: host_entry.name,
381 source: e,
382 })?;
383
384 let registry: &dyn crate::contracts::OpenApiRegistry = host.as_registry();
386
387 router =
389 host.rest_prepare(&host_ctx, router)
390 .map_err(|source| RegistryError::RestPrepare {
391 module: host_entry.name,
392 source,
393 })?;
394
395 for e in self.registry.modules() {
397 if let Some(rest) = e.caps.query::<RestApiCap>() {
398 let ctx = self.ctx_builder.for_module(e.name).await.map_err(|err| {
399 RegistryError::RestRegister {
400 module: e.name,
401 source: err,
402 }
403 })?;
404 router = rest
405 .register_rest(&ctx, router, registry)
406 .map_err(|source| RegistryError::RestRegister {
407 module: e.name,
408 source,
409 })?;
410 }
411 }
412
413 router = host.rest_finalize(&host_ctx, router).map_err(|source| {
415 RegistryError::RestFinalize {
416 module: host_entry.name,
417 source,
418 }
419 })?;
420
421 Ok(router)
422 }
423
424 async fn run_grpc_phase(&self) -> Result<(), RegistryError> {
428 tracing::info!("Phase: grpc (registration)");
429
430 if self.registry.grpc_hub.is_none() && self.registry.grpc_services.is_empty() {
432 return Ok(());
433 }
434
435 if self.registry.grpc_hub.is_none() && !self.registry.grpc_services.is_empty() {
437 return Err(RegistryError::GrpcRequiresHub);
438 }
439
440 if let Some(hub_name) = &self.registry.grpc_hub {
442 let mut modules_data = Vec::new();
443 let mut seen = HashSet::new();
444
445 for (module_name, service_module) in &self.registry.grpc_services {
447 let ctx = self
448 .ctx_builder
449 .for_module(module_name)
450 .await
451 .map_err(|err| RegistryError::GrpcRegister {
452 module: module_name.clone(),
453 source: err,
454 })?;
455
456 let installers =
457 service_module
458 .get_grpc_services(&ctx)
459 .await
460 .map_err(|source| RegistryError::GrpcRegister {
461 module: module_name.clone(),
462 source,
463 })?;
464
465 for reg in &installers {
466 if !seen.insert(reg.service_name) {
467 return Err(RegistryError::GrpcRegister {
468 module: module_name.clone(),
469 source: anyhow::anyhow!(
470 "Duplicate gRPC service name: {}",
471 reg.service_name
472 ),
473 });
474 }
475 }
476
477 modules_data.push(crate::runtime::ModuleInstallers {
478 module_name: module_name.clone(),
479 installers,
480 });
481 }
482
483 self.grpc_installers
484 .set(crate::runtime::GrpcInstallerData {
485 modules: modules_data,
486 })
487 .map_err(|source| RegistryError::GrpcRegister {
488 module: hub_name.clone(),
489 source,
490 })?;
491 }
492
493 Ok(())
494 }
495
496 async fn run_start_phase(&self) -> Result<(), RegistryError> {
500 tracing::info!("Phase: start");
501
502 for e in self.registry.modules_by_system_priority() {
503 if let Some(s) = e.caps.query::<RunnableCap>() {
504 tracing::debug!(
505 module = e.name,
506 is_system = e.caps.has::<SystemCap>(),
507 "Starting stateful module"
508 );
509 s.start(self.cancel.clone())
510 .await
511 .map_err(|source| RegistryError::Start {
512 module: e.name,
513 source,
514 })?;
515 tracing::info!(module = e.name, "Started module");
516 }
517 }
518
519 Ok(())
520 }
521
522 async fn stop_one_module(entry: &ModuleEntry, cancel: CancellationToken) {
524 if let Some(s) = entry.caps.query::<RunnableCap>() {
525 match s.stop(cancel).await {
526 Err(err) => {
527 tracing::warn!(module = entry.name, error = %err, "Failed to stop module");
528 }
529 _ => {
530 tracing::info!(module = entry.name, "Stopped module");
531 }
532 }
533 }
534 }
535
536 async fn run_stop_phase(&self) -> Result<(), RegistryError> {
542 tracing::info!("Phase: stop");
543
544 for e in self.registry.modules().iter().rev() {
545 Self::stop_one_module(e, self.cancel.clone()).await;
546 }
547
548 Ok(())
549 }
550
551 async fn run_oop_spawn_phase(&self) -> Result<(), RegistryError> {
556 let oop_opts = match &self.oop_options {
557 Some(opts) if !opts.modules.is_empty() => opts,
558 _ => return Ok(()),
559 };
560
561 tracing::info!("Phase: oop_spawn");
562
563 let directory_endpoint = self.wait_for_grpc_hub_endpoint().await;
565
566 for module_cfg in &oop_opts.modules {
567 let mut env = module_cfg.env.clone();
570 env.insert(
571 MODKIT_MODULE_CONFIG_ENV.to_owned(),
572 module_cfg.rendered_config_json.clone(),
573 );
574 if let Some(ref endpoint) = directory_endpoint {
575 env.insert(MODKIT_DIRECTORY_ENDPOINT_ENV.to_owned(), endpoint.clone());
576 }
577
578 let args = module_cfg.args.clone();
580
581 let spawn_config = OopSpawnConfig {
582 module_name: module_cfg.module_name.clone(),
583 binary: module_cfg.binary.clone(),
584 args,
585 env,
586 working_directory: module_cfg.working_directory.clone(),
587 };
588
589 oop_opts
590 .backend
591 .spawn(spawn_config)
592 .await
593 .map_err(|e| RegistryError::OopSpawn {
594 module: module_cfg.module_name.clone(),
595 source: e,
596 })?;
597
598 tracing::info!(
599 module = %module_cfg.module_name,
600 directory_endpoint = ?directory_endpoint,
601 "Spawned OoP module via backend"
602 );
603 }
604
605 Ok(())
606 }
607
608 async fn wait_for_grpc_hub_endpoint(&self) -> Option<String> {
613 const POLL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(10);
614 const MAX_WAIT: std::time::Duration = std::time::Duration::from_secs(5);
615
616 let grpc_hub = self
618 .registry
619 .modules()
620 .iter()
621 .find_map(|e| e.caps.query::<GrpcHubCap>());
622
623 let Some(hub) = grpc_hub else {
624 return None; };
626
627 let start = std::time::Instant::now();
628
629 loop {
630 if let Some(endpoint) = hub.bound_endpoint() {
631 tracing::debug!(
632 endpoint = %endpoint,
633 elapsed_ms = start.elapsed().as_millis(),
634 "gRPC hub endpoint available"
635 );
636 return Some(endpoint);
637 }
638
639 if start.elapsed() > MAX_WAIT {
640 tracing::warn!("Timed out waiting for gRPC hub to bind");
641 return None;
642 }
643
644 tokio::time::sleep(POLL_INTERVAL).await;
645 }
646 }
647
648 pub async fn run_module_phases(self) -> anyhow::Result<()> {
655 self.run_pre_init_phase()?;
657
658 #[cfg(feature = "db")]
660 {
661 self.run_db_phase().await?;
662 }
663 #[cfg(not(feature = "db"))]
664 {
665 }
667
668 self.run_init_phase().await?;
670
671 self.run_post_init_phase().await?;
673
674 let _router = self.run_rest_phase().await?;
676
677 self.run_grpc_phase().await?;
679
680 self.run_start_phase().await?;
682
683 self.run_oop_spawn_phase().await?;
685
686 self.cancel.cancelled().await;
688
689 self.run_stop_phase().await?;
691
692 Ok(())
693 }
694}
695
696#[cfg(test)]
697#[cfg_attr(coverage_nightly, coverage(off))]
698mod tests {
699 use super::*;
700 use crate::context::ModuleCtx;
701 use crate::contracts::{Module, RunnableCapability, SystemCapability};
702 use crate::registry::RegistryBuilder;
703 use std::sync::Arc;
704 use std::sync::atomic::{AtomicUsize, Ordering};
705 use tokio::sync::Mutex;
706
    // Minimal `Module` whose `init` always succeeds. It is not referenced by
    // the tests visible in this module, hence the `dead_code` allowance.
    #[derive(Default)]
    #[allow(dead_code)]
    struct DummyCore;
    #[async_trait::async_trait]
    impl Module for DummyCore {
        async fn init(&self, _ctx: &ModuleCtx) -> anyhow::Result<()> {
            Ok(())
        }
    }
716
717 struct StopOrderTracker {
718 my_order: usize,
719 stop_order: Arc<AtomicUsize>,
720 }
721
722 impl StopOrderTracker {
723 fn new(counter: &Arc<AtomicUsize>, stop_order: Arc<AtomicUsize>) -> Self {
724 let my_order = counter.fetch_add(1, Ordering::SeqCst);
725 Self {
726 my_order,
727 stop_order,
728 }
729 }
730 }
731
732 #[async_trait::async_trait]
733 impl Module for StopOrderTracker {
734 async fn init(&self, _ctx: &ModuleCtx) -> anyhow::Result<()> {
735 Ok(())
736 }
737 }
738
739 #[async_trait::async_trait]
740 impl RunnableCapability for StopOrderTracker {
741 async fn start(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
742 Ok(())
743 }
744 async fn stop(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
745 let order = self.stop_order.fetch_add(1, Ordering::SeqCst);
746 tracing::info!(
747 my_order = self.my_order,
748 stop_order = order,
749 "Module stopped"
750 );
751 Ok(())
752 }
753 }
754
755 #[tokio::test]
756 async fn test_stop_phase_reverse_order() {
757 let counter = Arc::new(AtomicUsize::new(0));
758 let stop_order = Arc::new(AtomicUsize::new(0));
759
760 let module_a = Arc::new(StopOrderTracker::new(&counter, stop_order.clone()));
761 let module_b = Arc::new(StopOrderTracker::new(&counter, stop_order.clone()));
762 let module_c = Arc::new(StopOrderTracker::new(&counter, stop_order.clone()));
763
764 let mut builder = RegistryBuilder::default();
765 builder.register_core_with_meta("a", &[], module_a.clone() as Arc<dyn Module>);
766 builder.register_core_with_meta("b", &["a"], module_b.clone() as Arc<dyn Module>);
767 builder.register_core_with_meta("c", &["b"], module_c.clone() as Arc<dyn Module>);
768
769 builder.register_stateful_with_meta("a", module_a.clone() as Arc<dyn RunnableCapability>);
770 builder.register_stateful_with_meta("b", module_b.clone() as Arc<dyn RunnableCapability>);
771 builder.register_stateful_with_meta("c", module_c.clone() as Arc<dyn RunnableCapability>);
772
773 let registry = builder.build_topo_sorted().unwrap();
774
775 let module_names: Vec<_> = registry.modules().iter().map(|m| m.name).collect();
777 assert_eq!(module_names, vec!["a", "b", "c"]);
778
779 let client_hub = Arc::new(ClientHub::new());
780 let cancel = CancellationToken::new();
781 let config_provider: Arc<dyn ConfigProvider> = Arc::new(EmptyConfigProvider);
782
783 let runtime = HostRuntime::new(
784 registry,
785 config_provider,
786 DbOptions::None,
787 client_hub,
788 cancel.clone(),
789 Uuid::new_v4(),
790 None,
791 );
792
793 runtime.run_stop_phase().await.unwrap();
795
796 assert_eq!(stop_order.load(Ordering::SeqCst), 3);
800 }
801
802 #[tokio::test]
803 async fn test_stop_phase_continues_on_error() {
804 struct FailingModule {
805 should_fail: bool,
806 stopped: Arc<AtomicUsize>,
807 }
808
809 #[async_trait::async_trait]
810 impl Module for FailingModule {
811 async fn init(&self, _ctx: &ModuleCtx) -> anyhow::Result<()> {
812 Ok(())
813 }
814 }
815
816 #[async_trait::async_trait]
817 impl RunnableCapability for FailingModule {
818 async fn start(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
819 Ok(())
820 }
821 async fn stop(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
822 self.stopped.fetch_add(1, Ordering::SeqCst);
823 if self.should_fail {
824 anyhow::bail!("Intentional failure")
825 }
826 Ok(())
827 }
828 }
829
830 let stopped = Arc::new(AtomicUsize::new(0));
831 let module_a = Arc::new(FailingModule {
832 should_fail: false,
833 stopped: stopped.clone(),
834 });
835 let module_b = Arc::new(FailingModule {
836 should_fail: true,
837 stopped: stopped.clone(),
838 });
839 let module_c = Arc::new(FailingModule {
840 should_fail: false,
841 stopped: stopped.clone(),
842 });
843
844 let mut builder = RegistryBuilder::default();
845 builder.register_core_with_meta("a", &[], module_a.clone() as Arc<dyn Module>);
846 builder.register_core_with_meta("b", &["a"], module_b.clone() as Arc<dyn Module>);
847 builder.register_core_with_meta("c", &["b"], module_c.clone() as Arc<dyn Module>);
848
849 builder.register_stateful_with_meta("a", module_a.clone() as Arc<dyn RunnableCapability>);
850 builder.register_stateful_with_meta("b", module_b.clone() as Arc<dyn RunnableCapability>);
851 builder.register_stateful_with_meta("c", module_c.clone() as Arc<dyn RunnableCapability>);
852
853 let registry = builder.build_topo_sorted().unwrap();
854
855 let client_hub = Arc::new(ClientHub::new());
856 let cancel = CancellationToken::new();
857 let config_provider: Arc<dyn ConfigProvider> = Arc::new(EmptyConfigProvider);
858
859 let runtime = HostRuntime::new(
860 registry,
861 config_provider,
862 DbOptions::None,
863 client_hub,
864 cancel.clone(),
865 Uuid::new_v4(),
866 None,
867 );
868
869 runtime.run_stop_phase().await.unwrap();
871
872 assert_eq!(stopped.load(Ordering::SeqCst), 3);
874 }
875
    /// `ConfigProvider` for tests that reports no configuration for any module.
    struct EmptyConfigProvider;
    impl ConfigProvider for EmptyConfigProvider {
        fn get_module_config(&self, _module_name: &str) -> Option<&serde_json::Value> {
            None
        }
    }
882
883 #[tokio::test]
884 async fn test_post_init_runs_after_all_init_and_system_first() {
885 #[derive(Clone)]
886 struct TrackHooks {
887 name: &'static str,
888 events: Arc<Mutex<Vec<String>>>,
889 }
890
891 #[async_trait::async_trait]
892 impl Module for TrackHooks {
893 async fn init(&self, _ctx: &ModuleCtx) -> anyhow::Result<()> {
894 self.events.lock().await.push(format!("init:{}", self.name));
895 Ok(())
896 }
897 }
898
899 #[async_trait::async_trait]
900 impl SystemCapability for TrackHooks {
901 fn pre_init(&self, _sys: &crate::runtime::SystemContext) -> anyhow::Result<()> {
902 Ok(())
903 }
904
905 async fn post_init(&self, _sys: &crate::runtime::SystemContext) -> anyhow::Result<()> {
906 self.events
907 .lock()
908 .await
909 .push(format!("post_init:{}", self.name));
910 Ok(())
911 }
912 }
913
914 let events = Arc::new(Mutex::new(Vec::<String>::new()));
915 let sys_a = Arc::new(TrackHooks {
916 name: "sys_a",
917 events: events.clone(),
918 });
919 let user_b = Arc::new(TrackHooks {
920 name: "user_b",
921 events: events.clone(),
922 });
923 let user_c = Arc::new(TrackHooks {
924 name: "user_c",
925 events: events.clone(),
926 });
927
928 let mut builder = RegistryBuilder::default();
929 builder.register_core_with_meta("sys_a", &[], sys_a.clone() as Arc<dyn Module>);
930 builder.register_core_with_meta("user_b", &["sys_a"], user_b.clone() as Arc<dyn Module>);
931 builder.register_core_with_meta("user_c", &["user_b"], user_c.clone() as Arc<dyn Module>);
932 builder.register_system_with_meta("sys_a", sys_a.clone() as Arc<dyn SystemCapability>);
933
934 let registry = builder.build_topo_sorted().unwrap();
935
936 let client_hub = Arc::new(ClientHub::new());
937 let cancel = CancellationToken::new();
938 let config_provider: Arc<dyn ConfigProvider> = Arc::new(EmptyConfigProvider);
939
940 let runtime = HostRuntime::new(
941 registry,
942 config_provider,
943 DbOptions::None,
944 client_hub,
945 cancel,
946 Uuid::new_v4(),
947 None,
948 );
949
950 runtime.run_init_phase().await.unwrap();
952 runtime.run_post_init_phase().await.unwrap();
953
954 let events = events.lock().await.clone();
955 let first_post_init = events
956 .iter()
957 .position(|e| e.starts_with("post_init:"))
958 .expect("expected post_init events");
959 assert!(
960 events[..first_post_init]
961 .iter()
962 .all(|e| e.starts_with("init:")),
963 "expected all init events before post_init, got: {events:?}"
964 );
965
966 assert_eq!(
968 events,
969 vec![
970 "init:sys_a",
971 "init:user_b",
972 "init:user_c",
973 "post_init:sys_a",
974 ]
975 );
976 }
977}