1use axum::Router;
17use std::collections::HashSet;
18use std::sync::Arc;
19
20use tokio_util::sync::CancellationToken;
21use uuid::Uuid;
22
23use crate::backends::OopSpawnConfig;
24use crate::client_hub::ClientHub;
25use crate::config::ConfigProvider;
26use crate::context::ModuleContextBuilder;
27use crate::registry::{
28 ApiGatewayCap, GrpcHubCap, ModuleEntry, ModuleRegistry, RegistryError, RestApiCap, RunnableCap,
29 SystemCap,
30};
31use crate::runtime::{GrpcInstallerStore, ModuleManager, OopSpawnOptions, SystemContext};
32
33#[cfg(feature = "db")]
34use crate::registry::DatabaseCap;
35
36#[derive(Clone)]
38pub enum DbOptions {
39 None,
41 #[cfg(feature = "db")]
43 Manager(Arc<modkit_db::DbManager>),
44}
45
46#[derive(Debug, Clone, Copy, PartialEq, Eq)]
48pub enum RunMode {
49 Full,
51 MigrateOnly,
53}
54
55pub const MODKIT_DIRECTORY_ENDPOINT_ENV: &str = "MODKIT_DIRECTORY_ENDPOINT";
57
58pub const MODKIT_MODULE_CONFIG_ENV: &str = "MODKIT_MODULE_CONFIG";
60
61pub struct HostRuntime {
65 registry: ModuleRegistry,
66 ctx_builder: ModuleContextBuilder,
67 instance_id: Uuid,
68 module_manager: Arc<ModuleManager>,
69 grpc_installers: Arc<GrpcInstallerStore>,
70 #[allow(dead_code)]
71 client_hub: Arc<ClientHub>,
72 cancel: CancellationToken,
73 #[allow(dead_code)]
74 db_options: DbOptions,
75 oop_options: Option<OopSpawnOptions>,
77}
78
79impl HostRuntime {
80 pub fn new(
84 registry: ModuleRegistry,
85 modules_cfg: Arc<dyn ConfigProvider>,
86 db_options: DbOptions,
87 client_hub: Arc<ClientHub>,
88 cancel: CancellationToken,
89 instance_id: Uuid,
90 oop_options: Option<OopSpawnOptions>,
91 ) -> Self {
92 let module_manager = Arc::new(ModuleManager::new());
94 let grpc_installers = Arc::new(GrpcInstallerStore::new());
95
96 let db_manager = match &db_options {
98 #[cfg(feature = "db")]
99 DbOptions::Manager(mgr) => Some(mgr.clone()),
100 DbOptions::None => None,
101 };
102
103 let ctx_builder = ModuleContextBuilder::new(
104 instance_id,
105 modules_cfg,
106 client_hub.clone(),
107 cancel.clone(),
108 db_manager,
109 );
110
111 Self {
112 registry,
113 ctx_builder,
114 instance_id,
115 module_manager,
116 grpc_installers,
117 client_hub,
118 cancel,
119 db_options,
120 oop_options,
121 }
122 }
123
124 pub fn run_pre_init_phase(&self) -> Result<(), RegistryError> {
131 tracing::info!("Phase: pre_init");
132
133 let sys_ctx = SystemContext::new(
134 self.instance_id,
135 Arc::clone(&self.module_manager),
136 Arc::clone(&self.grpc_installers),
137 );
138
139 for entry in self.registry.modules() {
140 if self.cancel.is_cancelled() {
142 tracing::warn!("Pre-init phase cancelled by signal");
143 return Err(RegistryError::Cancelled);
144 }
145
146 if let Some(sys_mod) = entry.caps.query::<SystemCap>() {
147 tracing::debug!(module = entry.name, "Running system pre_init");
148 sys_mod
149 .pre_init(&sys_ctx)
150 .map_err(|e| RegistryError::PreInit {
151 module: entry.name,
152 source: e,
153 })?;
154 }
155 }
156
157 Ok(())
158 }
159
160 async fn module_context(
162 &self,
163 module_name: &'static str,
164 ) -> Result<crate::context::ModuleCtx, RegistryError> {
165 self.ctx_builder
166 .for_module(module_name)
167 .await
168 .map_err(|e| RegistryError::DbMigrate {
169 module: module_name,
170 source: e,
171 })
172 }
173
174 #[cfg(feature = "db")]
176 async fn db_migration_target(
177 &self,
178 module_name: &'static str,
179 ctx: &crate::context::ModuleCtx,
180 db_module: Option<Arc<dyn crate::contracts::DatabaseCapability>>,
181 ) -> Result<Option<(modkit_db::Db, Arc<dyn crate::contracts::DatabaseCapability>)>, RegistryError>
182 {
183 let Some(dbm) = db_module else {
184 return Ok(None);
185 };
186
187 let db = match &self.db_options {
191 DbOptions::None => None,
192 #[cfg(feature = "db")]
193 DbOptions::Manager(mgr) => {
194 mgr.get(module_name)
195 .await
196 .map_err(|e| RegistryError::DbMigrate {
197 module: module_name,
198 source: e.into(),
199 })?
200 }
201 };
202
203 _ = ctx; Ok(db.map(|db| (db, dbm)))
205 }
206
207 #[cfg(feature = "db")]
212 async fn migrate_module(
213 module_name: &'static str,
214 db: &modkit_db::Db,
215 db_module: Arc<dyn crate::contracts::DatabaseCapability>,
216 ) -> Result<(), RegistryError> {
217 let migrations = db_module.migrations();
219
220 if migrations.is_empty() {
221 tracing::debug!(module = module_name, "No migrations to run");
222 return Ok(());
223 }
224
225 tracing::debug!(
226 module = module_name,
227 count = migrations.len(),
228 "Running DB migrations"
229 );
230
231 let result =
233 modkit_db::migration_runner::run_migrations_for_module(db, module_name, migrations)
234 .await
235 .map_err(|e| RegistryError::DbMigrate {
236 module: module_name,
237 source: anyhow::Error::new(e),
238 })?;
239
240 tracing::info!(
241 module = module_name,
242 applied = result.applied,
243 skipped = result.skipped,
244 "DB migrations completed"
245 );
246
247 Ok(())
248 }
249
250 #[cfg(feature = "db")]
259 async fn run_db_phase(&self) -> Result<(), RegistryError> {
260 tracing::info!("Phase: db (before init)");
261
262 for entry in self.registry.modules_by_system_priority() {
263 if self.cancel.is_cancelled() {
265 tracing::warn!("DB migration phase cancelled by signal");
266 return Err(RegistryError::Cancelled);
267 }
268
269 let ctx = self.module_context(entry.name).await?;
270 let db_module = entry.caps.query::<DatabaseCap>();
271
272 match self
273 .db_migration_target(entry.name, &ctx, db_module.clone())
274 .await?
275 {
276 Some((db, dbm)) => {
277 Self::migrate_module(entry.name, &db, dbm).await?;
278 }
279 None if db_module.is_some() => {
280 tracing::debug!(
281 module = entry.name,
282 "Module has DbModule trait but no DB handle (no config)"
283 );
284 }
285 None => {}
286 }
287 }
288
289 Ok(())
290 }
291
292 async fn run_init_phase(&self) -> Result<(), RegistryError> {
296 tracing::info!("Phase: init");
297
298 for entry in self.registry.modules_by_system_priority() {
299 let ctx =
300 self.ctx_builder
301 .for_module(entry.name)
302 .await
303 .map_err(|e| RegistryError::Init {
304 module: entry.name,
305 source: e,
306 })?;
307 entry
308 .core
309 .init(&ctx)
310 .await
311 .map_err(|e| RegistryError::Init {
312 module: entry.name,
313 source: e,
314 })?;
315 }
316
317 Ok(())
318 }
319
320 async fn run_post_init_phase(&self) -> Result<(), RegistryError> {
327 tracing::info!("Phase: post_init");
328
329 let sys_ctx = SystemContext::new(
330 self.instance_id,
331 Arc::clone(&self.module_manager),
332 Arc::clone(&self.grpc_installers),
333 );
334
335 for entry in self.registry.modules_by_system_priority() {
336 if let Some(sys_mod) = entry.caps.query::<SystemCap>() {
337 sys_mod
338 .post_init(&sys_ctx)
339 .await
340 .map_err(|e| RegistryError::PostInit {
341 module: entry.name,
342 source: e,
343 })?;
344 }
345 }
346
347 Ok(())
348 }
349
350 async fn run_rest_phase(&self) -> Result<Router, RegistryError> {
357 tracing::info!("Phase: rest (sync)");
358
359 let mut router = Router::new();
360
361 let host_count = self
363 .registry
364 .modules()
365 .iter()
366 .filter(|e| e.caps.has::<ApiGatewayCap>())
367 .count();
368
369 match host_count {
370 0 => {
371 return if self
372 .registry
373 .modules()
374 .iter()
375 .any(|e| e.caps.has::<RestApiCap>())
376 {
377 Err(RegistryError::RestRequiresHost)
378 } else {
379 Ok(router)
380 };
381 }
382 1 => { }
383 _ => return Err(RegistryError::MultipleRestHosts),
384 }
385
386 let host_idx = self
388 .registry
389 .modules()
390 .iter()
391 .position(|e| e.caps.has::<ApiGatewayCap>())
392 .ok_or(RegistryError::RestHostNotFoundAfterValidation)?;
393 let host_entry = &self.registry.modules()[host_idx];
394 let Some(host) = host_entry.caps.query::<ApiGatewayCap>() else {
395 return Err(RegistryError::RestHostMissingFromEntry);
396 };
397 let host_ctx = self
398 .ctx_builder
399 .for_module(host_entry.name)
400 .await
401 .map_err(|e| RegistryError::RestPrepare {
402 module: host_entry.name,
403 source: e,
404 })?;
405
406 let registry: &dyn crate::contracts::OpenApiRegistry = host.as_registry();
408
409 router =
411 host.rest_prepare(&host_ctx, router)
412 .map_err(|source| RegistryError::RestPrepare {
413 module: host_entry.name,
414 source,
415 })?;
416
417 for e in self.registry.modules() {
419 if let Some(rest) = e.caps.query::<RestApiCap>() {
420 let ctx = self.ctx_builder.for_module(e.name).await.map_err(|err| {
421 RegistryError::RestRegister {
422 module: e.name,
423 source: err,
424 }
425 })?;
426 router = rest
427 .register_rest(&ctx, router, registry)
428 .map_err(|source| RegistryError::RestRegister {
429 module: e.name,
430 source,
431 })?;
432 }
433 }
434
435 router = host.rest_finalize(&host_ctx, router).map_err(|source| {
437 RegistryError::RestFinalize {
438 module: host_entry.name,
439 source,
440 }
441 })?;
442
443 Ok(router)
444 }
445
446 async fn run_grpc_phase(&self) -> Result<(), RegistryError> {
450 tracing::info!("Phase: grpc (registration)");
451
452 if self.registry.grpc_hub.is_none() && self.registry.grpc_services.is_empty() {
454 return Ok(());
455 }
456
457 if self.registry.grpc_hub.is_none() && !self.registry.grpc_services.is_empty() {
459 return Err(RegistryError::GrpcRequiresHub);
460 }
461
462 if let Some(hub_name) = &self.registry.grpc_hub {
464 let mut modules_data = Vec::new();
465 let mut seen = HashSet::new();
466
467 for (module_name, service_module) in &self.registry.grpc_services {
469 let ctx = self
470 .ctx_builder
471 .for_module(module_name)
472 .await
473 .map_err(|err| RegistryError::GrpcRegister {
474 module: module_name.clone(),
475 source: err,
476 })?;
477
478 let installers =
479 service_module
480 .get_grpc_services(&ctx)
481 .await
482 .map_err(|source| RegistryError::GrpcRegister {
483 module: module_name.clone(),
484 source,
485 })?;
486
487 for reg in &installers {
488 if !seen.insert(reg.service_name) {
489 return Err(RegistryError::GrpcRegister {
490 module: module_name.clone(),
491 source: anyhow::anyhow!(
492 "Duplicate gRPC service name: {}",
493 reg.service_name
494 ),
495 });
496 }
497 }
498
499 modules_data.push(crate::runtime::ModuleInstallers {
500 module_name: module_name.clone(),
501 installers,
502 });
503 }
504
505 self.grpc_installers
506 .set(crate::runtime::GrpcInstallerData {
507 modules: modules_data,
508 })
509 .map_err(|source| RegistryError::GrpcRegister {
510 module: hub_name.clone(),
511 source,
512 })?;
513 }
514
515 Ok(())
516 }
517
518 async fn run_start_phase(&self) -> Result<(), RegistryError> {
522 tracing::info!("Phase: start");
523
524 for e in self.registry.modules_by_system_priority() {
525 if let Some(s) = e.caps.query::<RunnableCap>() {
526 tracing::debug!(
527 module = e.name,
528 is_system = e.caps.has::<SystemCap>(),
529 "Starting stateful module"
530 );
531 s.start(self.cancel.clone())
532 .await
533 .map_err(|source| RegistryError::Start {
534 module: e.name,
535 source,
536 })?;
537 tracing::info!(module = e.name, "Started module");
538 }
539 }
540
541 Ok(())
542 }
543
544 async fn stop_one_module(entry: &ModuleEntry, cancel: CancellationToken) {
546 if let Some(s) = entry.caps.query::<RunnableCap>() {
547 match s.stop(cancel).await {
548 Err(err) => {
549 tracing::warn!(module = entry.name, error = %err, "Failed to stop module");
550 }
551 _ => {
552 tracing::info!(module = entry.name, "Stopped module");
553 }
554 }
555 }
556 }
557
558 async fn run_stop_phase(&self) -> Result<(), RegistryError> {
564 tracing::info!("Phase: stop");
565
566 for e in self.registry.modules().iter().rev() {
567 Self::stop_one_module(e, self.cancel.clone()).await;
568 }
569
570 Ok(())
571 }
572
573 async fn run_oop_spawn_phase(&self) -> Result<(), RegistryError> {
578 let oop_opts = match &self.oop_options {
579 Some(opts) if !opts.modules.is_empty() => opts,
580 _ => return Ok(()),
581 };
582
583 tracing::info!("Phase: oop_spawn");
584
585 let directory_endpoint = self.wait_for_grpc_hub_endpoint().await;
587
588 for module_cfg in &oop_opts.modules {
589 let mut env = module_cfg.env.clone();
592 env.insert(
593 MODKIT_MODULE_CONFIG_ENV.to_owned(),
594 module_cfg.rendered_config_json.clone(),
595 );
596 if let Some(ref endpoint) = directory_endpoint {
597 env.insert(MODKIT_DIRECTORY_ENDPOINT_ENV.to_owned(), endpoint.clone());
598 }
599
600 let args = module_cfg.args.clone();
602
603 let spawn_config = OopSpawnConfig {
604 module_name: module_cfg.module_name.clone(),
605 binary: module_cfg.binary.clone(),
606 args,
607 env,
608 working_directory: module_cfg.working_directory.clone(),
609 };
610
611 oop_opts
612 .backend
613 .spawn(spawn_config)
614 .await
615 .map_err(|e| RegistryError::OopSpawn {
616 module: module_cfg.module_name.clone(),
617 source: e,
618 })?;
619
620 tracing::info!(
621 module = %module_cfg.module_name,
622 directory_endpoint = ?directory_endpoint,
623 "Spawned OoP module via backend"
624 );
625 }
626
627 Ok(())
628 }
629
630 async fn wait_for_grpc_hub_endpoint(&self) -> Option<String> {
635 const POLL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(10);
636 const MAX_WAIT: std::time::Duration = std::time::Duration::from_secs(5);
637
638 let grpc_hub = self
640 .registry
641 .modules()
642 .iter()
643 .find_map(|e| e.caps.query::<GrpcHubCap>());
644
645 let Some(hub) = grpc_hub else {
646 return None; };
648
649 let start = std::time::Instant::now();
650
651 loop {
652 if let Some(endpoint) = hub.bound_endpoint() {
653 tracing::debug!(
654 endpoint = %endpoint,
655 elapsed_ms = start.elapsed().as_millis(),
656 "gRPC hub endpoint available"
657 );
658 return Some(endpoint);
659 }
660
661 if start.elapsed() > MAX_WAIT {
662 tracing::warn!("Timed out waiting for gRPC hub to bind");
663 return None;
664 }
665
666 tokio::time::sleep(POLL_INTERVAL).await;
667 }
668 }
669
670 pub async fn run_module_phases(self) -> anyhow::Result<()> {
679 self.run_phases_internal(RunMode::Full).await
680 }
681
682 pub async fn run_migration_phases(self) -> anyhow::Result<()> {
692 self.run_phases_internal(RunMode::MigrateOnly).await
693 }
694
695 async fn run_phases_internal(self, mode: RunMode) -> anyhow::Result<()> {
718 match mode {
720 RunMode::Full => {
721 tracing::info!("Running full lifecycle (all phases)");
722 }
723 RunMode::MigrateOnly => {
724 tracing::info!("Running in migration mode (pre-init + db phases only)");
725 }
726 }
727
728 self.run_pre_init_phase()?;
730
731 #[cfg(feature = "db")]
733 {
734 self.run_db_phase().await?;
735 }
736 #[cfg(not(feature = "db"))]
737 {
738 }
740
741 if mode == RunMode::MigrateOnly {
743 tracing::info!("Migration phases completed successfully");
744 return Ok(());
745 }
746
747 self.run_init_phase().await?;
749
750 self.run_post_init_phase().await?;
752
753 let _router = self.run_rest_phase().await?;
755
756 self.run_grpc_phase().await?;
758
759 self.run_start_phase().await?;
761
762 self.run_oop_spawn_phase().await?;
764
765 self.cancel.cancelled().await;
767
768 self.run_stop_phase().await?;
770
771 Ok(())
772 }
773}
774
775#[cfg(test)]
776#[cfg_attr(coverage_nightly, coverage(off))]
777mod tests {
778 use super::*;
779 use crate::context::ModuleCtx;
780 use crate::contracts::{Module, RunnableCapability, SystemCapability};
781 use crate::registry::RegistryBuilder;
782 use std::sync::Arc;
783 use std::sync::atomic::{AtomicUsize, Ordering};
784 use tokio::sync::Mutex;
785
786 #[derive(Default)]
787 #[allow(dead_code)]
788 struct DummyCore;
789 #[async_trait::async_trait]
790 impl Module for DummyCore {
791 async fn init(&self, _ctx: &ModuleCtx) -> anyhow::Result<()> {
792 Ok(())
793 }
794 }
795
796 struct StopOrderTracker {
797 my_order: usize,
798 stop_order: Arc<AtomicUsize>,
799 }
800
801 impl StopOrderTracker {
802 fn new(counter: &Arc<AtomicUsize>, stop_order: Arc<AtomicUsize>) -> Self {
803 let my_order = counter.fetch_add(1, Ordering::SeqCst);
804 Self {
805 my_order,
806 stop_order,
807 }
808 }
809 }
810
811 #[async_trait::async_trait]
812 impl Module for StopOrderTracker {
813 async fn init(&self, _ctx: &ModuleCtx) -> anyhow::Result<()> {
814 Ok(())
815 }
816 }
817
818 #[async_trait::async_trait]
819 impl RunnableCapability for StopOrderTracker {
820 async fn start(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
821 Ok(())
822 }
823 async fn stop(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
824 let order = self.stop_order.fetch_add(1, Ordering::SeqCst);
825 tracing::info!(
826 my_order = self.my_order,
827 stop_order = order,
828 "Module stopped"
829 );
830 Ok(())
831 }
832 }
833
834 #[tokio::test]
835 async fn test_stop_phase_reverse_order() {
836 let counter = Arc::new(AtomicUsize::new(0));
837 let stop_order = Arc::new(AtomicUsize::new(0));
838
839 let module_a = Arc::new(StopOrderTracker::new(&counter, stop_order.clone()));
840 let module_b = Arc::new(StopOrderTracker::new(&counter, stop_order.clone()));
841 let module_c = Arc::new(StopOrderTracker::new(&counter, stop_order.clone()));
842
843 let mut builder = RegistryBuilder::default();
844 builder.register_core_with_meta("a", &[], module_a.clone() as Arc<dyn Module>);
845 builder.register_core_with_meta("b", &["a"], module_b.clone() as Arc<dyn Module>);
846 builder.register_core_with_meta("c", &["b"], module_c.clone() as Arc<dyn Module>);
847
848 builder.register_stateful_with_meta("a", module_a.clone() as Arc<dyn RunnableCapability>);
849 builder.register_stateful_with_meta("b", module_b.clone() as Arc<dyn RunnableCapability>);
850 builder.register_stateful_with_meta("c", module_c.clone() as Arc<dyn RunnableCapability>);
851
852 let registry = builder.build_topo_sorted().unwrap();
853
854 let module_names: Vec<_> = registry.modules().iter().map(|m| m.name).collect();
856 assert_eq!(module_names, vec!["a", "b", "c"]);
857
858 let client_hub = Arc::new(ClientHub::new());
859 let cancel = CancellationToken::new();
860 let config_provider: Arc<dyn ConfigProvider> = Arc::new(EmptyConfigProvider);
861
862 let runtime = HostRuntime::new(
863 registry,
864 config_provider,
865 DbOptions::None,
866 client_hub,
867 cancel.clone(),
868 Uuid::new_v4(),
869 None,
870 );
871
872 runtime.run_stop_phase().await.unwrap();
874
875 assert_eq!(stop_order.load(Ordering::SeqCst), 3);
879 }
880
881 #[tokio::test]
882 async fn test_stop_phase_continues_on_error() {
883 struct FailingModule {
884 should_fail: bool,
885 stopped: Arc<AtomicUsize>,
886 }
887
888 #[async_trait::async_trait]
889 impl Module for FailingModule {
890 async fn init(&self, _ctx: &ModuleCtx) -> anyhow::Result<()> {
891 Ok(())
892 }
893 }
894
895 #[async_trait::async_trait]
896 impl RunnableCapability for FailingModule {
897 async fn start(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
898 Ok(())
899 }
900 async fn stop(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
901 self.stopped.fetch_add(1, Ordering::SeqCst);
902 if self.should_fail {
903 anyhow::bail!("Intentional failure")
904 }
905 Ok(())
906 }
907 }
908
909 let stopped = Arc::new(AtomicUsize::new(0));
910 let module_a = Arc::new(FailingModule {
911 should_fail: false,
912 stopped: stopped.clone(),
913 });
914 let module_b = Arc::new(FailingModule {
915 should_fail: true,
916 stopped: stopped.clone(),
917 });
918 let module_c = Arc::new(FailingModule {
919 should_fail: false,
920 stopped: stopped.clone(),
921 });
922
923 let mut builder = RegistryBuilder::default();
924 builder.register_core_with_meta("a", &[], module_a.clone() as Arc<dyn Module>);
925 builder.register_core_with_meta("b", &["a"], module_b.clone() as Arc<dyn Module>);
926 builder.register_core_with_meta("c", &["b"], module_c.clone() as Arc<dyn Module>);
927
928 builder.register_stateful_with_meta("a", module_a.clone() as Arc<dyn RunnableCapability>);
929 builder.register_stateful_with_meta("b", module_b.clone() as Arc<dyn RunnableCapability>);
930 builder.register_stateful_with_meta("c", module_c.clone() as Arc<dyn RunnableCapability>);
931
932 let registry = builder.build_topo_sorted().unwrap();
933
934 let client_hub = Arc::new(ClientHub::new());
935 let cancel = CancellationToken::new();
936 let config_provider: Arc<dyn ConfigProvider> = Arc::new(EmptyConfigProvider);
937
938 let runtime = HostRuntime::new(
939 registry,
940 config_provider,
941 DbOptions::None,
942 client_hub,
943 cancel.clone(),
944 Uuid::new_v4(),
945 None,
946 );
947
948 runtime.run_stop_phase().await.unwrap();
950
951 assert_eq!(stopped.load(Ordering::SeqCst), 3);
953 }
954
955 struct EmptyConfigProvider;
956 impl ConfigProvider for EmptyConfigProvider {
957 fn get_module_config(&self, _module_name: &str) -> Option<&serde_json::Value> {
958 None
959 }
960 }
961
962 #[tokio::test]
963 async fn test_post_init_runs_after_all_init_and_system_first() {
964 #[derive(Clone)]
965 struct TrackHooks {
966 name: &'static str,
967 events: Arc<Mutex<Vec<String>>>,
968 }
969
970 #[async_trait::async_trait]
971 impl Module for TrackHooks {
972 async fn init(&self, _ctx: &ModuleCtx) -> anyhow::Result<()> {
973 self.events.lock().await.push(format!("init:{}", self.name));
974 Ok(())
975 }
976 }
977
978 #[async_trait::async_trait]
979 impl SystemCapability for TrackHooks {
980 fn pre_init(&self, _sys: &crate::runtime::SystemContext) -> anyhow::Result<()> {
981 Ok(())
982 }
983
984 async fn post_init(&self, _sys: &crate::runtime::SystemContext) -> anyhow::Result<()> {
985 self.events
986 .lock()
987 .await
988 .push(format!("post_init:{}", self.name));
989 Ok(())
990 }
991 }
992
993 let events = Arc::new(Mutex::new(Vec::<String>::new()));
994 let sys_a = Arc::new(TrackHooks {
995 name: "sys_a",
996 events: events.clone(),
997 });
998 let user_b = Arc::new(TrackHooks {
999 name: "user_b",
1000 events: events.clone(),
1001 });
1002 let user_c = Arc::new(TrackHooks {
1003 name: "user_c",
1004 events: events.clone(),
1005 });
1006
1007 let mut builder = RegistryBuilder::default();
1008 builder.register_core_with_meta("sys_a", &[], sys_a.clone() as Arc<dyn Module>);
1009 builder.register_core_with_meta("user_b", &["sys_a"], user_b.clone() as Arc<dyn Module>);
1010 builder.register_core_with_meta("user_c", &["user_b"], user_c.clone() as Arc<dyn Module>);
1011 builder.register_system_with_meta("sys_a", sys_a.clone() as Arc<dyn SystemCapability>);
1012
1013 let registry = builder.build_topo_sorted().unwrap();
1014
1015 let client_hub = Arc::new(ClientHub::new());
1016 let cancel = CancellationToken::new();
1017 let config_provider: Arc<dyn ConfigProvider> = Arc::new(EmptyConfigProvider);
1018
1019 let runtime = HostRuntime::new(
1020 registry,
1021 config_provider,
1022 DbOptions::None,
1023 client_hub,
1024 cancel,
1025 Uuid::new_v4(),
1026 None,
1027 );
1028
1029 runtime.run_init_phase().await.unwrap();
1031 runtime.run_post_init_phase().await.unwrap();
1032
1033 let events = events.lock().await.clone();
1034 let first_post_init = events
1035 .iter()
1036 .position(|e| e.starts_with("post_init:"))
1037 .expect("expected post_init events");
1038 assert!(
1039 events[..first_post_init]
1040 .iter()
1041 .all(|e| e.starts_with("init:")),
1042 "expected all init events before post_init, got: {events:?}"
1043 );
1044
1045 assert_eq!(
1047 events,
1048 vec![
1049 "init:sys_a",
1050 "init:user_b",
1051 "init:user_c",
1052 "post_init:sys_a",
1053 ]
1054 );
1055 }
1056}