1use axum::Router;
17use std::collections::HashSet;
18use std::sync::Arc;
19
20use tokio_util::sync::CancellationToken;
21use uuid::Uuid;
22
23use crate::backends::OopSpawnConfig;
24use crate::client_hub::ClientHub;
25use crate::config::ConfigProvider;
26use crate::context::ModuleContextBuilder;
27use crate::registry::{
28 ApiGatewayCap, GrpcHubCap, ModuleEntry, ModuleRegistry, RegistryError, RestApiCap, RunnableCap,
29 SystemCap,
30};
31use crate::runtime::{GrpcInstallerStore, ModuleManager, OopSpawnOptions, SystemContext};
32
33#[cfg(feature = "db")]
34use crate::registry::DatabaseCap;
35
36#[derive(Clone)]
38pub enum DbOptions {
39 None,
41 #[cfg(feature = "db")]
43 Manager(Arc<modkit_db::DbManager>),
44}
45
46#[derive(Debug, Clone, Copy, PartialEq, Eq)]
48pub enum RunMode {
49 Full,
51 MigrateOnly,
53}
54
55pub const MODKIT_DIRECTORY_ENDPOINT_ENV: &str = "MODKIT_DIRECTORY_ENDPOINT";
57
58pub const MODKIT_MODULE_CONFIG_ENV: &str = "MODKIT_MODULE_CONFIG";
60
61pub struct HostRuntime {
65 registry: ModuleRegistry,
66 ctx_builder: ModuleContextBuilder,
67 instance_id: Uuid,
68 module_manager: Arc<ModuleManager>,
69 grpc_installers: Arc<GrpcInstallerStore>,
70 #[allow(dead_code)]
71 client_hub: Arc<ClientHub>,
72 cancel: CancellationToken,
73 #[allow(dead_code)]
74 db_options: DbOptions,
75 oop_options: Option<OopSpawnOptions>,
77}
78
79impl HostRuntime {
80 pub fn new(
84 registry: ModuleRegistry,
85 modules_cfg: Arc<dyn ConfigProvider>,
86 db_options: DbOptions,
87 client_hub: Arc<ClientHub>,
88 cancel: CancellationToken,
89 instance_id: Uuid,
90 oop_options: Option<OopSpawnOptions>,
91 ) -> Self {
92 let module_manager = Arc::new(ModuleManager::new());
94 let grpc_installers = Arc::new(GrpcInstallerStore::new());
95
96 let db_manager = match &db_options {
98 #[cfg(feature = "db")]
99 DbOptions::Manager(mgr) => Some(mgr.clone()),
100 DbOptions::None => None,
101 };
102
103 let ctx_builder = ModuleContextBuilder::new(
104 instance_id,
105 modules_cfg,
106 client_hub.clone(),
107 cancel.clone(),
108 db_manager,
109 );
110
111 Self {
112 registry,
113 ctx_builder,
114 instance_id,
115 module_manager,
116 grpc_installers,
117 client_hub,
118 cancel,
119 db_options,
120 oop_options,
121 }
122 }
123
124 pub fn run_pre_init_phase(&self) -> Result<(), RegistryError> {
131 tracing::info!("Phase: pre_init");
132
133 let sys_ctx = SystemContext::new(
134 self.instance_id,
135 Arc::clone(&self.module_manager),
136 Arc::clone(&self.grpc_installers),
137 );
138
139 for entry in self.registry.modules() {
140 if self.cancel.is_cancelled() {
142 tracing::warn!("Pre-init phase cancelled by signal");
143 return Err(RegistryError::Cancelled);
144 }
145
146 if let Some(sys_mod) = entry.caps.query::<SystemCap>() {
147 tracing::debug!(module = entry.name, "Running system pre_init");
148 sys_mod
149 .pre_init(&sys_ctx)
150 .map_err(|e| RegistryError::PreInit {
151 module: entry.name,
152 source: e,
153 })?;
154 }
155 }
156
157 Ok(())
158 }
159
160 async fn module_context(
162 &self,
163 module_name: &'static str,
164 ) -> Result<crate::context::ModuleCtx, RegistryError> {
165 self.ctx_builder
166 .for_module(module_name)
167 .await
168 .map_err(|e| RegistryError::DbMigrate {
169 module: module_name,
170 source: e,
171 })
172 }
173
174 #[cfg(feature = "db")]
176 async fn db_migration_target(
177 &self,
178 module_name: &'static str,
179 ctx: &crate::context::ModuleCtx,
180 db_module: Option<Arc<dyn crate::contracts::DatabaseCapability>>,
181 ) -> Result<Option<(modkit_db::Db, Arc<dyn crate::contracts::DatabaseCapability>)>, RegistryError>
182 {
183 let Some(dbm) = db_module else {
184 return Ok(None);
185 };
186
187 let db = match &self.db_options {
191 DbOptions::None => None,
192 #[cfg(feature = "db")]
193 DbOptions::Manager(mgr) => {
194 mgr.get(module_name)
195 .await
196 .map_err(|e| RegistryError::DbMigrate {
197 module: module_name,
198 source: e.into(),
199 })?
200 }
201 };
202
203 _ = ctx; Ok(db.map(|db| (db, dbm)))
205 }
206
207 #[cfg(feature = "db")]
212 async fn migrate_module(
213 module_name: &'static str,
214 db: &modkit_db::Db,
215 db_module: Arc<dyn crate::contracts::DatabaseCapability>,
216 ) -> Result<(), RegistryError> {
217 let migrations = db_module.migrations();
219
220 if migrations.is_empty() {
221 tracing::debug!(module = module_name, "No migrations to run");
222 return Ok(());
223 }
224
225 tracing::debug!(
226 module = module_name,
227 count = migrations.len(),
228 "Running DB migrations"
229 );
230
231 let result =
233 modkit_db::migration_runner::run_migrations_for_module(db, module_name, migrations)
234 .await
235 .map_err(|e| RegistryError::DbMigrate {
236 module: module_name,
237 source: anyhow::Error::new(e),
238 })?;
239
240 tracing::info!(
241 module = module_name,
242 applied = result.applied,
243 skipped = result.skipped,
244 "DB migrations completed"
245 );
246
247 Ok(())
248 }
249
250 #[cfg(feature = "db")]
259 async fn run_db_phase(&self) -> Result<(), RegistryError> {
260 tracing::info!("Phase: db (before init)");
261
262 for entry in self.registry.modules_by_system_priority() {
263 if self.cancel.is_cancelled() {
265 tracing::warn!("DB migration phase cancelled by signal");
266 return Err(RegistryError::Cancelled);
267 }
268
269 let ctx = self.module_context(entry.name).await?;
270 let db_module = entry.caps.query::<DatabaseCap>();
271
272 match self
273 .db_migration_target(entry.name, &ctx, db_module.clone())
274 .await?
275 {
276 Some((db, dbm)) => {
277 Self::migrate_module(entry.name, &db, dbm).await?;
278 }
279 None if db_module.is_some() => {
280 tracing::debug!(
281 module = entry.name,
282 "Module has DbModule trait but no DB handle (no config)"
283 );
284 }
285 None => {}
286 }
287 }
288
289 Ok(())
290 }
291
292 async fn run_init_phase(&self) -> Result<(), RegistryError> {
296 tracing::info!("Phase: init");
297
298 for entry in self.registry.modules_by_system_priority() {
299 let ctx =
300 self.ctx_builder
301 .for_module(entry.name)
302 .await
303 .map_err(|e| RegistryError::Init {
304 module: entry.name,
305 source: e,
306 })?;
307 tracing::info!(module = entry.name, "Initializing a module...");
308 entry
309 .core
310 .init(&ctx)
311 .await
312 .map_err(|e| RegistryError::Init {
313 module: entry.name,
314 source: e,
315 })?;
316 tracing::info!(module = entry.name, "Initialized a module.");
317 }
318
319 Ok(())
320 }
321
322 async fn run_post_init_phase(&self) -> Result<(), RegistryError> {
329 tracing::info!("Phase: post_init");
330
331 let sys_ctx = SystemContext::new(
332 self.instance_id,
333 Arc::clone(&self.module_manager),
334 Arc::clone(&self.grpc_installers),
335 );
336
337 for entry in self.registry.modules_by_system_priority() {
338 if let Some(sys_mod) = entry.caps.query::<SystemCap>() {
339 sys_mod
340 .post_init(&sys_ctx)
341 .await
342 .map_err(|e| RegistryError::PostInit {
343 module: entry.name,
344 source: e,
345 })?;
346 }
347 }
348
349 Ok(())
350 }
351
352 async fn run_rest_phase(&self) -> Result<Router, RegistryError> {
359 tracing::info!("Phase: rest (sync)");
360
361 let mut router = Router::new();
362
363 let host_count = self
365 .registry
366 .modules()
367 .iter()
368 .filter(|e| e.caps.has::<ApiGatewayCap>())
369 .count();
370
371 match host_count {
372 0 => {
373 return if self
374 .registry
375 .modules()
376 .iter()
377 .any(|e| e.caps.has::<RestApiCap>())
378 {
379 Err(RegistryError::RestRequiresHost)
380 } else {
381 Ok(router)
382 };
383 }
384 1 => { }
385 _ => return Err(RegistryError::MultipleRestHosts),
386 }
387
388 let host_idx = self
390 .registry
391 .modules()
392 .iter()
393 .position(|e| e.caps.has::<ApiGatewayCap>())
394 .ok_or(RegistryError::RestHostNotFoundAfterValidation)?;
395 let host_entry = &self.registry.modules()[host_idx];
396 let Some(host) = host_entry.caps.query::<ApiGatewayCap>() else {
397 return Err(RegistryError::RestHostMissingFromEntry);
398 };
399 let host_ctx = self
400 .ctx_builder
401 .for_module(host_entry.name)
402 .await
403 .map_err(|e| RegistryError::RestPrepare {
404 module: host_entry.name,
405 source: e,
406 })?;
407
408 let registry: &dyn crate::contracts::OpenApiRegistry = host.as_registry();
410
411 router =
413 host.rest_prepare(&host_ctx, router)
414 .map_err(|source| RegistryError::RestPrepare {
415 module: host_entry.name,
416 source,
417 })?;
418
419 for e in self.registry.modules() {
421 if let Some(rest) = e.caps.query::<RestApiCap>() {
422 let ctx = self.ctx_builder.for_module(e.name).await.map_err(|err| {
423 RegistryError::RestRegister {
424 module: e.name,
425 source: err,
426 }
427 })?;
428
429 router = rest
430 .register_rest(&ctx, router, registry)
431 .map_err(|source| RegistryError::RestRegister {
432 module: e.name,
433 source,
434 })?;
435 }
436 }
437
438 router = host.rest_finalize(&host_ctx, router).map_err(|source| {
440 RegistryError::RestFinalize {
441 module: host_entry.name,
442 source,
443 }
444 })?;
445
446 Ok(router)
447 }
448
449 async fn run_grpc_phase(&self) -> Result<(), RegistryError> {
453 tracing::info!("Phase: grpc (registration)");
454
455 if self.registry.grpc_hub.is_none() && self.registry.grpc_services.is_empty() {
457 return Ok(());
458 }
459
460 if self.registry.grpc_hub.is_none() && !self.registry.grpc_services.is_empty() {
462 return Err(RegistryError::GrpcRequiresHub);
463 }
464
465 if let Some(hub_name) = &self.registry.grpc_hub {
467 let mut modules_data = Vec::new();
468 let mut seen = HashSet::new();
469
470 for (module_name, service_module) in &self.registry.grpc_services {
472 let ctx = self
473 .ctx_builder
474 .for_module(module_name)
475 .await
476 .map_err(|err| RegistryError::GrpcRegister {
477 module: module_name.clone(),
478 source: err,
479 })?;
480
481 let installers =
482 service_module
483 .get_grpc_services(&ctx)
484 .await
485 .map_err(|source| RegistryError::GrpcRegister {
486 module: module_name.clone(),
487 source,
488 })?;
489
490 for reg in &installers {
491 if !seen.insert(reg.service_name) {
492 return Err(RegistryError::GrpcRegister {
493 module: module_name.clone(),
494 source: anyhow::anyhow!(
495 "Duplicate gRPC service name: {}",
496 reg.service_name
497 ),
498 });
499 }
500 }
501
502 modules_data.push(crate::runtime::ModuleInstallers {
503 module_name: module_name.clone(),
504 installers,
505 });
506 }
507
508 self.grpc_installers
509 .set(crate::runtime::GrpcInstallerData {
510 modules: modules_data,
511 })
512 .map_err(|source| RegistryError::GrpcRegister {
513 module: hub_name.clone(),
514 source,
515 })?;
516 }
517
518 Ok(())
519 }
520
521 async fn run_start_phase(&self) -> Result<(), RegistryError> {
525 tracing::info!("Phase: start");
526
527 for e in self.registry.modules_by_system_priority() {
528 if let Some(s) = e.caps.query::<RunnableCap>() {
529 tracing::debug!(
530 module = e.name,
531 is_system = e.caps.has::<SystemCap>(),
532 "Starting stateful module"
533 );
534 s.start(self.cancel.clone())
535 .await
536 .map_err(|source| RegistryError::Start {
537 module: e.name,
538 source,
539 })?;
540 tracing::info!(module = e.name, "Started module");
541 }
542 }
543
544 Ok(())
545 }
546
547 async fn stop_one_module(entry: &ModuleEntry, cancel: CancellationToken) {
549 if let Some(s) = entry.caps.query::<RunnableCap>() {
550 match s.stop(cancel).await {
551 Err(err) => {
552 tracing::warn!(module = entry.name, error = %err, "Failed to stop module");
553 }
554 _ => {
555 tracing::info!(module = entry.name, "Stopped module");
556 }
557 }
558 }
559 }
560
561 async fn run_stop_phase(&self) -> Result<(), RegistryError> {
567 tracing::info!("Phase: stop");
568
569 for e in self.registry.modules().iter().rev() {
570 Self::stop_one_module(e, self.cancel.clone()).await;
571 }
572
573 Ok(())
574 }
575
576 async fn run_oop_spawn_phase(&self) -> Result<(), RegistryError> {
581 let oop_opts = match &self.oop_options {
582 Some(opts) if !opts.modules.is_empty() => opts,
583 _ => return Ok(()),
584 };
585
586 tracing::info!("Phase: oop_spawn");
587
588 let directory_endpoint = self.wait_for_grpc_hub_endpoint().await;
590
591 for module_cfg in &oop_opts.modules {
592 let mut env = module_cfg.env.clone();
595 env.insert(
596 MODKIT_MODULE_CONFIG_ENV.to_owned(),
597 module_cfg.rendered_config_json.clone(),
598 );
599 if let Some(ref endpoint) = directory_endpoint {
600 env.insert(MODKIT_DIRECTORY_ENDPOINT_ENV.to_owned(), endpoint.clone());
601 }
602
603 let args = module_cfg.args.clone();
605
606 let spawn_config = OopSpawnConfig {
607 module_name: module_cfg.module_name.clone(),
608 binary: module_cfg.binary.clone(),
609 args,
610 env,
611 working_directory: module_cfg.working_directory.clone(),
612 };
613
614 oop_opts
615 .backend
616 .spawn(spawn_config)
617 .await
618 .map_err(|e| RegistryError::OopSpawn {
619 module: module_cfg.module_name.clone(),
620 source: e,
621 })?;
622
623 tracing::info!(
624 module = %module_cfg.module_name,
625 directory_endpoint = ?directory_endpoint,
626 "Spawned OoP module via backend"
627 );
628 }
629
630 Ok(())
631 }
632
633 async fn wait_for_grpc_hub_endpoint(&self) -> Option<String> {
638 const POLL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(10);
639 const MAX_WAIT: std::time::Duration = std::time::Duration::from_secs(5);
640
641 let grpc_hub = self
643 .registry
644 .modules()
645 .iter()
646 .find_map(|e| e.caps.query::<GrpcHubCap>());
647
648 let Some(hub) = grpc_hub else {
649 return None; };
651
652 let start = std::time::Instant::now();
653
654 loop {
655 if let Some(endpoint) = hub.bound_endpoint() {
656 tracing::debug!(
657 endpoint = %endpoint,
658 elapsed_ms = start.elapsed().as_millis(),
659 "gRPC hub endpoint available"
660 );
661 return Some(endpoint);
662 }
663
664 if start.elapsed() > MAX_WAIT {
665 tracing::warn!("Timed out waiting for gRPC hub to bind");
666 return None;
667 }
668
669 tokio::time::sleep(POLL_INTERVAL).await;
670 }
671 }
672
673 pub async fn run_module_phases(self) -> anyhow::Result<()> {
682 self.run_phases_internal(RunMode::Full).await
683 }
684
685 pub async fn run_migration_phases(self) -> anyhow::Result<()> {
695 self.run_phases_internal(RunMode::MigrateOnly).await
696 }
697
698 async fn run_phases_internal(self, mode: RunMode) -> anyhow::Result<()> {
721 match mode {
723 RunMode::Full => {
724 tracing::info!("Running full lifecycle (all phases)");
725 }
726 RunMode::MigrateOnly => {
727 tracing::info!("Running in migration mode (pre-init + db phases only)");
728 }
729 }
730
731 self.run_pre_init_phase()?;
733
734 #[cfg(feature = "db")]
736 {
737 self.run_db_phase().await?;
738 }
739 #[cfg(not(feature = "db"))]
740 {
741 }
743
744 if mode == RunMode::MigrateOnly {
746 tracing::info!("Migration phases completed successfully");
747 return Ok(());
748 }
749
750 self.run_init_phase().await?;
752
753 self.run_post_init_phase().await?;
755
756 let _router = self.run_rest_phase().await?;
758
759 self.run_grpc_phase().await?;
761
762 self.run_start_phase().await?;
764
765 self.run_oop_spawn_phase().await?;
767
768 self.cancel.cancelled().await;
770
771 self.run_stop_phase().await?;
773
774 Ok(())
775 }
776}
777
778#[cfg(test)]
779#[cfg_attr(coverage_nightly, coverage(off))]
780mod tests {
781 use super::*;
782 use crate::context::ModuleCtx;
783 use crate::contracts::{Module, RunnableCapability, SystemCapability};
784 use crate::registry::RegistryBuilder;
785 use std::sync::Arc;
786 use std::sync::atomic::{AtomicUsize, Ordering};
787 use tokio::sync::Mutex;
788
789 #[derive(Default)]
790 #[allow(dead_code)]
791 struct DummyCore;
792 #[async_trait::async_trait]
793 impl Module for DummyCore {
794 async fn init(&self, _ctx: &ModuleCtx) -> anyhow::Result<()> {
795 Ok(())
796 }
797 }
798
799 struct StopOrderTracker {
800 my_order: usize,
801 stop_order: Arc<AtomicUsize>,
802 }
803
804 impl StopOrderTracker {
805 fn new(counter: &Arc<AtomicUsize>, stop_order: Arc<AtomicUsize>) -> Self {
806 let my_order = counter.fetch_add(1, Ordering::SeqCst);
807 Self {
808 my_order,
809 stop_order,
810 }
811 }
812 }
813
814 #[async_trait::async_trait]
815 impl Module for StopOrderTracker {
816 async fn init(&self, _ctx: &ModuleCtx) -> anyhow::Result<()> {
817 Ok(())
818 }
819 }
820
821 #[async_trait::async_trait]
822 impl RunnableCapability for StopOrderTracker {
823 async fn start(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
824 Ok(())
825 }
826 async fn stop(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
827 let order = self.stop_order.fetch_add(1, Ordering::SeqCst);
828 tracing::info!(
829 my_order = self.my_order,
830 stop_order = order,
831 "Module stopped"
832 );
833 Ok(())
834 }
835 }
836
837 #[tokio::test]
838 async fn test_stop_phase_reverse_order() {
839 let counter = Arc::new(AtomicUsize::new(0));
840 let stop_order = Arc::new(AtomicUsize::new(0));
841
842 let module_a = Arc::new(StopOrderTracker::new(&counter, stop_order.clone()));
843 let module_b = Arc::new(StopOrderTracker::new(&counter, stop_order.clone()));
844 let module_c = Arc::new(StopOrderTracker::new(&counter, stop_order.clone()));
845
846 let mut builder = RegistryBuilder::default();
847 builder.register_core_with_meta("a", &[], module_a.clone() as Arc<dyn Module>);
848 builder.register_core_with_meta("b", &["a"], module_b.clone() as Arc<dyn Module>);
849 builder.register_core_with_meta("c", &["b"], module_c.clone() as Arc<dyn Module>);
850
851 builder.register_stateful_with_meta("a", module_a.clone() as Arc<dyn RunnableCapability>);
852 builder.register_stateful_with_meta("b", module_b.clone() as Arc<dyn RunnableCapability>);
853 builder.register_stateful_with_meta("c", module_c.clone() as Arc<dyn RunnableCapability>);
854
855 let registry = builder.build_topo_sorted().unwrap();
856
857 let module_names: Vec<_> = registry.modules().iter().map(|m| m.name).collect();
859 assert_eq!(module_names, vec!["a", "b", "c"]);
860
861 let client_hub = Arc::new(ClientHub::new());
862 let cancel = CancellationToken::new();
863 let config_provider: Arc<dyn ConfigProvider> = Arc::new(EmptyConfigProvider);
864
865 let runtime = HostRuntime::new(
866 registry,
867 config_provider,
868 DbOptions::None,
869 client_hub,
870 cancel.clone(),
871 Uuid::new_v4(),
872 None,
873 );
874
875 runtime.run_stop_phase().await.unwrap();
877
878 assert_eq!(stop_order.load(Ordering::SeqCst), 3);
882 }
883
884 #[tokio::test]
885 async fn test_stop_phase_continues_on_error() {
886 struct FailingModule {
887 should_fail: bool,
888 stopped: Arc<AtomicUsize>,
889 }
890
891 #[async_trait::async_trait]
892 impl Module for FailingModule {
893 async fn init(&self, _ctx: &ModuleCtx) -> anyhow::Result<()> {
894 Ok(())
895 }
896 }
897
898 #[async_trait::async_trait]
899 impl RunnableCapability for FailingModule {
900 async fn start(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
901 Ok(())
902 }
903 async fn stop(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
904 self.stopped.fetch_add(1, Ordering::SeqCst);
905 if self.should_fail {
906 anyhow::bail!("Intentional failure")
907 }
908 Ok(())
909 }
910 }
911
912 let stopped = Arc::new(AtomicUsize::new(0));
913 let module_a = Arc::new(FailingModule {
914 should_fail: false,
915 stopped: stopped.clone(),
916 });
917 let module_b = Arc::new(FailingModule {
918 should_fail: true,
919 stopped: stopped.clone(),
920 });
921 let module_c = Arc::new(FailingModule {
922 should_fail: false,
923 stopped: stopped.clone(),
924 });
925
926 let mut builder = RegistryBuilder::default();
927 builder.register_core_with_meta("a", &[], module_a.clone() as Arc<dyn Module>);
928 builder.register_core_with_meta("b", &["a"], module_b.clone() as Arc<dyn Module>);
929 builder.register_core_with_meta("c", &["b"], module_c.clone() as Arc<dyn Module>);
930
931 builder.register_stateful_with_meta("a", module_a.clone() as Arc<dyn RunnableCapability>);
932 builder.register_stateful_with_meta("b", module_b.clone() as Arc<dyn RunnableCapability>);
933 builder.register_stateful_with_meta("c", module_c.clone() as Arc<dyn RunnableCapability>);
934
935 let registry = builder.build_topo_sorted().unwrap();
936
937 let client_hub = Arc::new(ClientHub::new());
938 let cancel = CancellationToken::new();
939 let config_provider: Arc<dyn ConfigProvider> = Arc::new(EmptyConfigProvider);
940
941 let runtime = HostRuntime::new(
942 registry,
943 config_provider,
944 DbOptions::None,
945 client_hub,
946 cancel.clone(),
947 Uuid::new_v4(),
948 None,
949 );
950
951 runtime.run_stop_phase().await.unwrap();
953
954 assert_eq!(stopped.load(Ordering::SeqCst), 3);
956 }
957
958 struct EmptyConfigProvider;
959 impl ConfigProvider for EmptyConfigProvider {
960 fn get_module_config(&self, _module_name: &str) -> Option<&serde_json::Value> {
961 None
962 }
963 }
964
965 #[tokio::test]
966 async fn test_post_init_runs_after_all_init_and_system_first() {
967 #[derive(Clone)]
968 struct TrackHooks {
969 name: &'static str,
970 events: Arc<Mutex<Vec<String>>>,
971 }
972
973 #[async_trait::async_trait]
974 impl Module for TrackHooks {
975 async fn init(&self, _ctx: &ModuleCtx) -> anyhow::Result<()> {
976 self.events.lock().await.push(format!("init:{}", self.name));
977 Ok(())
978 }
979 }
980
981 #[async_trait::async_trait]
982 impl SystemCapability for TrackHooks {
983 fn pre_init(&self, _sys: &crate::runtime::SystemContext) -> anyhow::Result<()> {
984 Ok(())
985 }
986
987 async fn post_init(&self, _sys: &crate::runtime::SystemContext) -> anyhow::Result<()> {
988 self.events
989 .lock()
990 .await
991 .push(format!("post_init:{}", self.name));
992 Ok(())
993 }
994 }
995
996 let events = Arc::new(Mutex::new(Vec::<String>::new()));
997 let sys_a = Arc::new(TrackHooks {
998 name: "sys_a",
999 events: events.clone(),
1000 });
1001 let user_b = Arc::new(TrackHooks {
1002 name: "user_b",
1003 events: events.clone(),
1004 });
1005 let user_c = Arc::new(TrackHooks {
1006 name: "user_c",
1007 events: events.clone(),
1008 });
1009
1010 let mut builder = RegistryBuilder::default();
1011 builder.register_core_with_meta("sys_a", &[], sys_a.clone() as Arc<dyn Module>);
1012 builder.register_core_with_meta("user_b", &["sys_a"], user_b.clone() as Arc<dyn Module>);
1013 builder.register_core_with_meta("user_c", &["user_b"], user_c.clone() as Arc<dyn Module>);
1014 builder.register_system_with_meta("sys_a", sys_a.clone() as Arc<dyn SystemCapability>);
1015
1016 let registry = builder.build_topo_sorted().unwrap();
1017
1018 let client_hub = Arc::new(ClientHub::new());
1019 let cancel = CancellationToken::new();
1020 let config_provider: Arc<dyn ConfigProvider> = Arc::new(EmptyConfigProvider);
1021
1022 let runtime = HostRuntime::new(
1023 registry,
1024 config_provider,
1025 DbOptions::None,
1026 client_hub,
1027 cancel,
1028 Uuid::new_v4(),
1029 None,
1030 );
1031
1032 runtime.run_init_phase().await.unwrap();
1034 runtime.run_post_init_phase().await.unwrap();
1035
1036 let events = events.lock().await.clone();
1037 let first_post_init = events
1038 .iter()
1039 .position(|e| e.starts_with("post_init:"))
1040 .expect("expected post_init events");
1041 assert!(
1042 events[..first_post_init]
1043 .iter()
1044 .all(|e| e.starts_with("init:")),
1045 "expected all init events before post_init, got: {events:?}"
1046 );
1047
1048 assert_eq!(
1050 events,
1051 vec![
1052 "init:sys_a",
1053 "init:user_b",
1054 "init:user_c",
1055 "post_init:sys_a",
1056 ]
1057 );
1058 }
1059}