1use axum::Router;
17use std::collections::HashSet;
18use std::sync::Arc;
19use tokio_util::sync::CancellationToken;
20use uuid::Uuid;
21
22use crate::backends::OopSpawnConfig;
23use crate::client_hub::ClientHub;
24use crate::config::ConfigProvider;
25use crate::context::ModuleContextBuilder;
26use crate::registry::{
27 ApiGatewayCap, GrpcHubCap, ModuleEntry, ModuleRegistry, RegistryError, RestApiCap, RunnableCap,
28 SystemCap,
29};
30use crate::runtime::{GrpcInstallerStore, ModuleManager, OopSpawnOptions, SystemContext};
31
32#[cfg(feature = "db")]
33use crate::registry::DatabaseCap;
34
35#[derive(Clone)]
37pub enum DbOptions {
38 None,
40 #[cfg(feature = "db")]
42 Manager(Arc<modkit_db::DbManager>),
43}
44
45#[derive(Debug, Clone, Copy, PartialEq, Eq)]
47pub enum RunMode {
48 Full,
50 MigrateOnly,
52}
53
54pub const MODKIT_DIRECTORY_ENDPOINT_ENV: &str = "MODKIT_DIRECTORY_ENDPOINT";
56
57pub const MODKIT_MODULE_CONFIG_ENV: &str = "MODKIT_MODULE_CONFIG";
59
60pub struct HostRuntime {
64 registry: ModuleRegistry,
65 ctx_builder: ModuleContextBuilder,
66 instance_id: Uuid,
67 module_manager: Arc<ModuleManager>,
68 grpc_installers: Arc<GrpcInstallerStore>,
69 #[allow(dead_code)]
70 client_hub: Arc<ClientHub>,
71 cancel: CancellationToken,
72 #[allow(dead_code)]
73 db_options: DbOptions,
74 oop_options: Option<OopSpawnOptions>,
76}
77
78impl HostRuntime {
79 pub fn new(
83 registry: ModuleRegistry,
84 modules_cfg: Arc<dyn ConfigProvider>,
85 db_options: DbOptions,
86 client_hub: Arc<ClientHub>,
87 cancel: CancellationToken,
88 instance_id: Uuid,
89 oop_options: Option<OopSpawnOptions>,
90 ) -> Self {
91 let module_manager = Arc::new(ModuleManager::new());
93 let grpc_installers = Arc::new(GrpcInstallerStore::new());
94
95 let db_manager = match &db_options {
97 #[cfg(feature = "db")]
98 DbOptions::Manager(mgr) => Some(mgr.clone()),
99 DbOptions::None => None,
100 };
101
102 let ctx_builder = ModuleContextBuilder::new(
103 instance_id,
104 modules_cfg,
105 client_hub.clone(),
106 cancel.clone(),
107 db_manager,
108 );
109
110 Self {
111 registry,
112 ctx_builder,
113 instance_id,
114 module_manager,
115 grpc_installers,
116 client_hub,
117 cancel,
118 db_options,
119 oop_options,
120 }
121 }
122
123 pub fn run_pre_init_phase(&self) -> Result<(), RegistryError> {
130 tracing::info!("Phase: pre_init");
131
132 let sys_ctx = SystemContext::new(
133 self.instance_id,
134 Arc::clone(&self.module_manager),
135 Arc::clone(&self.grpc_installers),
136 );
137
138 for entry in self.registry.modules() {
139 if self.cancel.is_cancelled() {
141 tracing::warn!("Pre-init phase cancelled by signal");
142 return Err(RegistryError::Cancelled);
143 }
144
145 if let Some(sys_mod) = entry.caps.query::<SystemCap>() {
146 tracing::debug!(module = entry.name, "Running system pre_init");
147 sys_mod
148 .pre_init(&sys_ctx)
149 .map_err(|e| RegistryError::PreInit {
150 module: entry.name,
151 source: e,
152 })?;
153 }
154 }
155
156 Ok(())
157 }
158
159 async fn module_context(
161 &self,
162 module_name: &'static str,
163 ) -> Result<crate::context::ModuleCtx, RegistryError> {
164 self.ctx_builder
165 .for_module(module_name)
166 .await
167 .map_err(|e| RegistryError::DbMigrate {
168 module: module_name,
169 source: e,
170 })
171 }
172
173 #[cfg(feature = "db")]
175 async fn db_migration_target(
176 &self,
177 module_name: &'static str,
178 ctx: &crate::context::ModuleCtx,
179 db_module: Option<Arc<dyn crate::contracts::DatabaseCapability>>,
180 ) -> Result<Option<(modkit_db::Db, Arc<dyn crate::contracts::DatabaseCapability>)>, RegistryError>
181 {
182 let Some(dbm) = db_module else {
183 return Ok(None);
184 };
185
186 let db = match &self.db_options {
190 DbOptions::None => None,
191 #[cfg(feature = "db")]
192 DbOptions::Manager(mgr) => {
193 mgr.get(module_name)
194 .await
195 .map_err(|e| RegistryError::DbMigrate {
196 module: module_name,
197 source: e.into(),
198 })?
199 }
200 };
201
202 let _ = ctx; Ok(db.map(|db| (db, dbm)))
204 }
205
206 #[cfg(feature = "db")]
211 async fn migrate_module(
212 module_name: &'static str,
213 db: &modkit_db::Db,
214 db_module: Arc<dyn crate::contracts::DatabaseCapability>,
215 ) -> Result<(), RegistryError> {
216 let migrations = db_module.migrations();
218
219 if migrations.is_empty() {
220 tracing::debug!(module = module_name, "No migrations to run");
221 return Ok(());
222 }
223
224 tracing::debug!(
225 module = module_name,
226 count = migrations.len(),
227 "Running DB migrations"
228 );
229
230 let result =
232 modkit_db::migration_runner::run_migrations_for_module(db, module_name, migrations)
233 .await
234 .map_err(|e| RegistryError::DbMigrate {
235 module: module_name,
236 source: anyhow::Error::new(e),
237 })?;
238
239 tracing::info!(
240 module = module_name,
241 applied = result.applied,
242 skipped = result.skipped,
243 "DB migrations completed"
244 );
245
246 Ok(())
247 }
248
249 #[cfg(feature = "db")]
258 async fn run_db_phase(&self) -> Result<(), RegistryError> {
259 tracing::info!("Phase: db (before init)");
260
261 for entry in self.registry.modules_by_system_priority() {
262 if self.cancel.is_cancelled() {
264 tracing::warn!("DB migration phase cancelled by signal");
265 return Err(RegistryError::Cancelled);
266 }
267
268 let ctx = self.module_context(entry.name).await?;
269 let db_module = entry.caps.query::<DatabaseCap>();
270
271 match self
272 .db_migration_target(entry.name, &ctx, db_module.clone())
273 .await?
274 {
275 Some((db, dbm)) => {
276 Self::migrate_module(entry.name, &db, dbm).await?;
277 }
278 None if db_module.is_some() => {
279 tracing::debug!(
280 module = entry.name,
281 "Module has DbModule trait but no DB handle (no config)"
282 );
283 }
284 None => {}
285 }
286 }
287
288 Ok(())
289 }
290
291 async fn run_init_phase(&self) -> Result<(), RegistryError> {
295 tracing::info!("Phase: init");
296
297 for entry in self.registry.modules_by_system_priority() {
298 let ctx =
299 self.ctx_builder
300 .for_module(entry.name)
301 .await
302 .map_err(|e| RegistryError::Init {
303 module: entry.name,
304 source: e,
305 })?;
306 entry
307 .core
308 .init(&ctx)
309 .await
310 .map_err(|e| RegistryError::Init {
311 module: entry.name,
312 source: e,
313 })?;
314 }
315
316 Ok(())
317 }
318
319 async fn run_post_init_phase(&self) -> Result<(), RegistryError> {
326 tracing::info!("Phase: post_init");
327
328 let sys_ctx = SystemContext::new(
329 self.instance_id,
330 Arc::clone(&self.module_manager),
331 Arc::clone(&self.grpc_installers),
332 );
333
334 for entry in self.registry.modules_by_system_priority() {
335 if let Some(sys_mod) = entry.caps.query::<SystemCap>() {
336 sys_mod
337 .post_init(&sys_ctx)
338 .await
339 .map_err(|e| RegistryError::PostInit {
340 module: entry.name,
341 source: e,
342 })?;
343 }
344 }
345
346 Ok(())
347 }
348
349 async fn run_rest_phase(&self) -> Result<Router, RegistryError> {
356 tracing::info!("Phase: rest (sync)");
357
358 let mut router = Router::new();
359
360 let host_count = self
362 .registry
363 .modules()
364 .iter()
365 .filter(|e| e.caps.has::<ApiGatewayCap>())
366 .count();
367
368 match host_count {
369 0 => {
370 return if self
371 .registry
372 .modules()
373 .iter()
374 .any(|e| e.caps.has::<RestApiCap>())
375 {
376 Err(RegistryError::RestRequiresHost)
377 } else {
378 Ok(router)
379 };
380 }
381 1 => { }
382 _ => return Err(RegistryError::MultipleRestHosts),
383 }
384
385 let host_idx = self
387 .registry
388 .modules()
389 .iter()
390 .position(|e| e.caps.has::<ApiGatewayCap>())
391 .ok_or(RegistryError::RestHostNotFoundAfterValidation)?;
392 let host_entry = &self.registry.modules()[host_idx];
393 let Some(host) = host_entry.caps.query::<ApiGatewayCap>() else {
394 return Err(RegistryError::RestHostMissingFromEntry);
395 };
396 let host_ctx = self
397 .ctx_builder
398 .for_module(host_entry.name)
399 .await
400 .map_err(|e| RegistryError::RestPrepare {
401 module: host_entry.name,
402 source: e,
403 })?;
404
405 let registry: &dyn crate::contracts::OpenApiRegistry = host.as_registry();
407
408 router =
410 host.rest_prepare(&host_ctx, router)
411 .map_err(|source| RegistryError::RestPrepare {
412 module: host_entry.name,
413 source,
414 })?;
415
416 for e in self.registry.modules() {
418 if let Some(rest) = e.caps.query::<RestApiCap>() {
419 let ctx = self.ctx_builder.for_module(e.name).await.map_err(|err| {
420 RegistryError::RestRegister {
421 module: e.name,
422 source: err,
423 }
424 })?;
425 router = rest
426 .register_rest(&ctx, router, registry)
427 .map_err(|source| RegistryError::RestRegister {
428 module: e.name,
429 source,
430 })?;
431 }
432 }
433
434 router = host.rest_finalize(&host_ctx, router).map_err(|source| {
436 RegistryError::RestFinalize {
437 module: host_entry.name,
438 source,
439 }
440 })?;
441
442 Ok(router)
443 }
444
445 async fn run_grpc_phase(&self) -> Result<(), RegistryError> {
449 tracing::info!("Phase: grpc (registration)");
450
451 if self.registry.grpc_hub.is_none() && self.registry.grpc_services.is_empty() {
453 return Ok(());
454 }
455
456 if self.registry.grpc_hub.is_none() && !self.registry.grpc_services.is_empty() {
458 return Err(RegistryError::GrpcRequiresHub);
459 }
460
461 if let Some(hub_name) = &self.registry.grpc_hub {
463 let mut modules_data = Vec::new();
464 let mut seen = HashSet::new();
465
466 for (module_name, service_module) in &self.registry.grpc_services {
468 let ctx = self
469 .ctx_builder
470 .for_module(module_name)
471 .await
472 .map_err(|err| RegistryError::GrpcRegister {
473 module: module_name.clone(),
474 source: err,
475 })?;
476
477 let installers =
478 service_module
479 .get_grpc_services(&ctx)
480 .await
481 .map_err(|source| RegistryError::GrpcRegister {
482 module: module_name.clone(),
483 source,
484 })?;
485
486 for reg in &installers {
487 if !seen.insert(reg.service_name) {
488 return Err(RegistryError::GrpcRegister {
489 module: module_name.clone(),
490 source: anyhow::anyhow!(
491 "Duplicate gRPC service name: {}",
492 reg.service_name
493 ),
494 });
495 }
496 }
497
498 modules_data.push(crate::runtime::ModuleInstallers {
499 module_name: module_name.clone(),
500 installers,
501 });
502 }
503
504 self.grpc_installers
505 .set(crate::runtime::GrpcInstallerData {
506 modules: modules_data,
507 })
508 .map_err(|source| RegistryError::GrpcRegister {
509 module: hub_name.clone(),
510 source,
511 })?;
512 }
513
514 Ok(())
515 }
516
517 async fn run_start_phase(&self) -> Result<(), RegistryError> {
521 tracing::info!("Phase: start");
522
523 for e in self.registry.modules_by_system_priority() {
524 if let Some(s) = e.caps.query::<RunnableCap>() {
525 tracing::debug!(
526 module = e.name,
527 is_system = e.caps.has::<SystemCap>(),
528 "Starting stateful module"
529 );
530 s.start(self.cancel.clone())
531 .await
532 .map_err(|source| RegistryError::Start {
533 module: e.name,
534 source,
535 })?;
536 tracing::info!(module = e.name, "Started module");
537 }
538 }
539
540 Ok(())
541 }
542
543 async fn stop_one_module(entry: &ModuleEntry, cancel: CancellationToken) {
545 if let Some(s) = entry.caps.query::<RunnableCap>() {
546 match s.stop(cancel).await {
547 Err(err) => {
548 tracing::warn!(module = entry.name, error = %err, "Failed to stop module");
549 }
550 _ => {
551 tracing::info!(module = entry.name, "Stopped module");
552 }
553 }
554 }
555 }
556
557 async fn run_stop_phase(&self) -> Result<(), RegistryError> {
563 tracing::info!("Phase: stop");
564
565 for e in self.registry.modules().iter().rev() {
566 Self::stop_one_module(e, self.cancel.clone()).await;
567 }
568
569 Ok(())
570 }
571
572 async fn run_oop_spawn_phase(&self) -> Result<(), RegistryError> {
577 let oop_opts = match &self.oop_options {
578 Some(opts) if !opts.modules.is_empty() => opts,
579 _ => return Ok(()),
580 };
581
582 tracing::info!("Phase: oop_spawn");
583
584 let directory_endpoint = self.wait_for_grpc_hub_endpoint().await;
586
587 for module_cfg in &oop_opts.modules {
588 let mut env = module_cfg.env.clone();
591 env.insert(
592 MODKIT_MODULE_CONFIG_ENV.to_owned(),
593 module_cfg.rendered_config_json.clone(),
594 );
595 if let Some(ref endpoint) = directory_endpoint {
596 env.insert(MODKIT_DIRECTORY_ENDPOINT_ENV.to_owned(), endpoint.clone());
597 }
598
599 let args = module_cfg.args.clone();
601
602 let spawn_config = OopSpawnConfig {
603 module_name: module_cfg.module_name.clone(),
604 binary: module_cfg.binary.clone(),
605 args,
606 env,
607 working_directory: module_cfg.working_directory.clone(),
608 };
609
610 oop_opts
611 .backend
612 .spawn(spawn_config)
613 .await
614 .map_err(|e| RegistryError::OopSpawn {
615 module: module_cfg.module_name.clone(),
616 source: e,
617 })?;
618
619 tracing::info!(
620 module = %module_cfg.module_name,
621 directory_endpoint = ?directory_endpoint,
622 "Spawned OoP module via backend"
623 );
624 }
625
626 Ok(())
627 }
628
629 async fn wait_for_grpc_hub_endpoint(&self) -> Option<String> {
634 const POLL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(10);
635 const MAX_WAIT: std::time::Duration = std::time::Duration::from_secs(5);
636
637 let grpc_hub = self
639 .registry
640 .modules()
641 .iter()
642 .find_map(|e| e.caps.query::<GrpcHubCap>());
643
644 let Some(hub) = grpc_hub else {
645 return None; };
647
648 let start = std::time::Instant::now();
649
650 loop {
651 if let Some(endpoint) = hub.bound_endpoint() {
652 tracing::debug!(
653 endpoint = %endpoint,
654 elapsed_ms = start.elapsed().as_millis(),
655 "gRPC hub endpoint available"
656 );
657 return Some(endpoint);
658 }
659
660 if start.elapsed() > MAX_WAIT {
661 tracing::warn!("Timed out waiting for gRPC hub to bind");
662 return None;
663 }
664
665 tokio::time::sleep(POLL_INTERVAL).await;
666 }
667 }
668
669 pub async fn run_module_phases(self) -> anyhow::Result<()> {
678 self.run_phases_internal(RunMode::Full).await
679 }
680
681 pub async fn run_migration_phases(self) -> anyhow::Result<()> {
691 self.run_phases_internal(RunMode::MigrateOnly).await
692 }
693
694 async fn run_phases_internal(self, mode: RunMode) -> anyhow::Result<()> {
717 match mode {
719 RunMode::Full => {
720 tracing::info!("Running full lifecycle (all phases)");
721 }
722 RunMode::MigrateOnly => {
723 tracing::info!("Running in migration mode (pre-init + db phases only)");
724 }
725 }
726
727 self.run_pre_init_phase()?;
729
730 #[cfg(feature = "db")]
732 {
733 self.run_db_phase().await?;
734 }
735 #[cfg(not(feature = "db"))]
736 {
737 }
739
740 if mode == RunMode::MigrateOnly {
742 tracing::info!("Migration phases completed successfully");
743 return Ok(());
744 }
745
746 self.run_init_phase().await?;
748
749 self.run_post_init_phase().await?;
751
752 let _router = self.run_rest_phase().await?;
754
755 self.run_grpc_phase().await?;
757
758 self.run_start_phase().await?;
760
761 self.run_oop_spawn_phase().await?;
763
764 self.cancel.cancelled().await;
766
767 self.run_stop_phase().await?;
769
770 Ok(())
771 }
772}
773
774#[cfg(test)]
775#[cfg_attr(coverage_nightly, coverage(off))]
776mod tests {
777 use super::*;
778 use crate::context::ModuleCtx;
779 use crate::contracts::{Module, RunnableCapability, SystemCapability};
780 use crate::registry::RegistryBuilder;
781 use std::sync::Arc;
782 use std::sync::atomic::{AtomicUsize, Ordering};
783 use tokio::sync::Mutex;
784
785 #[derive(Default)]
786 #[allow(dead_code)]
787 struct DummyCore;
788 #[async_trait::async_trait]
789 impl Module for DummyCore {
790 async fn init(&self, _ctx: &ModuleCtx) -> anyhow::Result<()> {
791 Ok(())
792 }
793 }
794
795 struct StopOrderTracker {
796 my_order: usize,
797 stop_order: Arc<AtomicUsize>,
798 }
799
800 impl StopOrderTracker {
801 fn new(counter: &Arc<AtomicUsize>, stop_order: Arc<AtomicUsize>) -> Self {
802 let my_order = counter.fetch_add(1, Ordering::SeqCst);
803 Self {
804 my_order,
805 stop_order,
806 }
807 }
808 }
809
810 #[async_trait::async_trait]
811 impl Module for StopOrderTracker {
812 async fn init(&self, _ctx: &ModuleCtx) -> anyhow::Result<()> {
813 Ok(())
814 }
815 }
816
817 #[async_trait::async_trait]
818 impl RunnableCapability for StopOrderTracker {
819 async fn start(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
820 Ok(())
821 }
822 async fn stop(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
823 let order = self.stop_order.fetch_add(1, Ordering::SeqCst);
824 tracing::info!(
825 my_order = self.my_order,
826 stop_order = order,
827 "Module stopped"
828 );
829 Ok(())
830 }
831 }
832
833 #[tokio::test]
834 async fn test_stop_phase_reverse_order() {
835 let counter = Arc::new(AtomicUsize::new(0));
836 let stop_order = Arc::new(AtomicUsize::new(0));
837
838 let module_a = Arc::new(StopOrderTracker::new(&counter, stop_order.clone()));
839 let module_b = Arc::new(StopOrderTracker::new(&counter, stop_order.clone()));
840 let module_c = Arc::new(StopOrderTracker::new(&counter, stop_order.clone()));
841
842 let mut builder = RegistryBuilder::default();
843 builder.register_core_with_meta("a", &[], module_a.clone() as Arc<dyn Module>);
844 builder.register_core_with_meta("b", &["a"], module_b.clone() as Arc<dyn Module>);
845 builder.register_core_with_meta("c", &["b"], module_c.clone() as Arc<dyn Module>);
846
847 builder.register_stateful_with_meta("a", module_a.clone() as Arc<dyn RunnableCapability>);
848 builder.register_stateful_with_meta("b", module_b.clone() as Arc<dyn RunnableCapability>);
849 builder.register_stateful_with_meta("c", module_c.clone() as Arc<dyn RunnableCapability>);
850
851 let registry = builder.build_topo_sorted().unwrap();
852
853 let module_names: Vec<_> = registry.modules().iter().map(|m| m.name).collect();
855 assert_eq!(module_names, vec!["a", "b", "c"]);
856
857 let client_hub = Arc::new(ClientHub::new());
858 let cancel = CancellationToken::new();
859 let config_provider: Arc<dyn ConfigProvider> = Arc::new(EmptyConfigProvider);
860
861 let runtime = HostRuntime::new(
862 registry,
863 config_provider,
864 DbOptions::None,
865 client_hub,
866 cancel.clone(),
867 Uuid::new_v4(),
868 None,
869 );
870
871 runtime.run_stop_phase().await.unwrap();
873
874 assert_eq!(stop_order.load(Ordering::SeqCst), 3);
878 }
879
880 #[tokio::test]
881 async fn test_stop_phase_continues_on_error() {
882 struct FailingModule {
883 should_fail: bool,
884 stopped: Arc<AtomicUsize>,
885 }
886
887 #[async_trait::async_trait]
888 impl Module for FailingModule {
889 async fn init(&self, _ctx: &ModuleCtx) -> anyhow::Result<()> {
890 Ok(())
891 }
892 }
893
894 #[async_trait::async_trait]
895 impl RunnableCapability for FailingModule {
896 async fn start(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
897 Ok(())
898 }
899 async fn stop(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
900 self.stopped.fetch_add(1, Ordering::SeqCst);
901 if self.should_fail {
902 anyhow::bail!("Intentional failure")
903 }
904 Ok(())
905 }
906 }
907
908 let stopped = Arc::new(AtomicUsize::new(0));
909 let module_a = Arc::new(FailingModule {
910 should_fail: false,
911 stopped: stopped.clone(),
912 });
913 let module_b = Arc::new(FailingModule {
914 should_fail: true,
915 stopped: stopped.clone(),
916 });
917 let module_c = Arc::new(FailingModule {
918 should_fail: false,
919 stopped: stopped.clone(),
920 });
921
922 let mut builder = RegistryBuilder::default();
923 builder.register_core_with_meta("a", &[], module_a.clone() as Arc<dyn Module>);
924 builder.register_core_with_meta("b", &["a"], module_b.clone() as Arc<dyn Module>);
925 builder.register_core_with_meta("c", &["b"], module_c.clone() as Arc<dyn Module>);
926
927 builder.register_stateful_with_meta("a", module_a.clone() as Arc<dyn RunnableCapability>);
928 builder.register_stateful_with_meta("b", module_b.clone() as Arc<dyn RunnableCapability>);
929 builder.register_stateful_with_meta("c", module_c.clone() as Arc<dyn RunnableCapability>);
930
931 let registry = builder.build_topo_sorted().unwrap();
932
933 let client_hub = Arc::new(ClientHub::new());
934 let cancel = CancellationToken::new();
935 let config_provider: Arc<dyn ConfigProvider> = Arc::new(EmptyConfigProvider);
936
937 let runtime = HostRuntime::new(
938 registry,
939 config_provider,
940 DbOptions::None,
941 client_hub,
942 cancel.clone(),
943 Uuid::new_v4(),
944 None,
945 );
946
947 runtime.run_stop_phase().await.unwrap();
949
950 assert_eq!(stopped.load(Ordering::SeqCst), 3);
952 }
953
954 struct EmptyConfigProvider;
955 impl ConfigProvider for EmptyConfigProvider {
956 fn get_module_config(&self, _module_name: &str) -> Option<&serde_json::Value> {
957 None
958 }
959 }
960
961 #[tokio::test]
962 async fn test_post_init_runs_after_all_init_and_system_first() {
963 #[derive(Clone)]
964 struct TrackHooks {
965 name: &'static str,
966 events: Arc<Mutex<Vec<String>>>,
967 }
968
969 #[async_trait::async_trait]
970 impl Module for TrackHooks {
971 async fn init(&self, _ctx: &ModuleCtx) -> anyhow::Result<()> {
972 self.events.lock().await.push(format!("init:{}", self.name));
973 Ok(())
974 }
975 }
976
977 #[async_trait::async_trait]
978 impl SystemCapability for TrackHooks {
979 fn pre_init(&self, _sys: &crate::runtime::SystemContext) -> anyhow::Result<()> {
980 Ok(())
981 }
982
983 async fn post_init(&self, _sys: &crate::runtime::SystemContext) -> anyhow::Result<()> {
984 self.events
985 .lock()
986 .await
987 .push(format!("post_init:{}", self.name));
988 Ok(())
989 }
990 }
991
992 let events = Arc::new(Mutex::new(Vec::<String>::new()));
993 let sys_a = Arc::new(TrackHooks {
994 name: "sys_a",
995 events: events.clone(),
996 });
997 let user_b = Arc::new(TrackHooks {
998 name: "user_b",
999 events: events.clone(),
1000 });
1001 let user_c = Arc::new(TrackHooks {
1002 name: "user_c",
1003 events: events.clone(),
1004 });
1005
1006 let mut builder = RegistryBuilder::default();
1007 builder.register_core_with_meta("sys_a", &[], sys_a.clone() as Arc<dyn Module>);
1008 builder.register_core_with_meta("user_b", &["sys_a"], user_b.clone() as Arc<dyn Module>);
1009 builder.register_core_with_meta("user_c", &["user_b"], user_c.clone() as Arc<dyn Module>);
1010 builder.register_system_with_meta("sys_a", sys_a.clone() as Arc<dyn SystemCapability>);
1011
1012 let registry = builder.build_topo_sorted().unwrap();
1013
1014 let client_hub = Arc::new(ClientHub::new());
1015 let cancel = CancellationToken::new();
1016 let config_provider: Arc<dyn ConfigProvider> = Arc::new(EmptyConfigProvider);
1017
1018 let runtime = HostRuntime::new(
1019 registry,
1020 config_provider,
1021 DbOptions::None,
1022 client_hub,
1023 cancel,
1024 Uuid::new_v4(),
1025 None,
1026 );
1027
1028 runtime.run_init_phase().await.unwrap();
1030 runtime.run_post_init_phase().await.unwrap();
1031
1032 let events = events.lock().await.clone();
1033 let first_post_init = events
1034 .iter()
1035 .position(|e| e.starts_with("post_init:"))
1036 .expect("expected post_init events");
1037 assert!(
1038 events[..first_post_init]
1039 .iter()
1040 .all(|e| e.starts_with("init:")),
1041 "expected all init events before post_init, got: {events:?}"
1042 );
1043
1044 assert_eq!(
1046 events,
1047 vec![
1048 "init:sys_a",
1049 "init:user_b",
1050 "init:user_c",
1051 "post_init:sys_a",
1052 ]
1053 );
1054 }
1055}