1use axum::Router;
17use std::collections::HashSet;
18use std::sync::Arc;
19
20use tokio_util::sync::CancellationToken;
21use uuid::Uuid;
22
23use crate::backends::OopSpawnConfig;
24use crate::client_hub::ClientHub;
25use crate::config::ConfigProvider;
26use crate::context::ModuleContextBuilder;
27use crate::registry::{
28 ApiGatewayCap, GrpcHubCap, ModuleEntry, ModuleRegistry, RegistryError, RestApiCap, RunnableCap,
29 SystemCap,
30};
31use crate::runtime::{GrpcInstallerStore, ModuleManager, OopSpawnOptions, SystemContext};
32
33#[cfg(feature = "db")]
34use crate::registry::DatabaseCap;
35
/// Database wiring handed to [`HostRuntime::new`].
#[derive(Clone)]
pub enum DbOptions {
    /// No database manager is supplied; the module context builder receives `None`.
    None,
    /// Share an existing database manager with the runtime.
    ///
    /// Only compiled when the `db` feature is enabled.
    #[cfg(feature = "db")]
    Manager(Arc<modkit_db::DbManager>),
}
45
/// Selects how much of the lifecycle `HostRuntime::run_phases_internal` executes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RunMode {
    /// Run every phase, wait for cancellation, then run the stop phase.
    Full,
    /// Run only the pre-init phase and (with the `db` feature) the DB phase,
    /// then return.
    MigrateOnly,
}
54
/// Name of the environment variable through which a spawned out-of-process
/// module receives the gRPC hub endpoint (set only when an endpoint is known).
pub const MODKIT_DIRECTORY_ENDPOINT_ENV: &str = "MODKIT_DIRECTORY_ENDPOINT";
57
/// Name of the environment variable through which a spawned out-of-process
/// module receives its rendered JSON configuration.
pub const MODKIT_MODULE_CONFIG_ENV: &str = "MODKIT_MODULE_CONFIG";
60
/// Host-side runtime that owns the module registry and drives the module
/// lifecycle phases (pre-init, db, init, post-init, rest, grpc, start,
/// oop-spawn, stop).
pub struct HostRuntime {
    /// Registered modules and their capabilities.
    registry: ModuleRegistry,
    /// Builds the per-module context passed to module hooks.
    ctx_builder: ModuleContextBuilder,
    /// Instance identifier forwarded to every `SystemContext`.
    instance_id: Uuid,
    /// Shared with system modules through `SystemContext`.
    module_manager: Arc<ModuleManager>,
    /// Populated by the gRPC phase; also shared through `SystemContext`.
    grpc_installers: Arc<GrpcInstallerStore>,
    // Not read after construction: a clone is handed to the context builder.
    #[allow(dead_code)]
    client_hub: Arc<ClientHub>,
    /// Cancellation signal checked by phases and passed to module start/stop.
    cancel: CancellationToken,
    // Only read by code gated on the `db` feature.
    #[allow(dead_code)]
    db_options: DbOptions,
    /// Out-of-process modules to spawn, if any.
    oop_options: Option<OopSpawnOptions>,
}
78
79impl HostRuntime {
80 pub fn new(
84 registry: ModuleRegistry,
85 modules_cfg: Arc<dyn ConfigProvider>,
86 db_options: DbOptions,
87 client_hub: Arc<ClientHub>,
88 cancel: CancellationToken,
89 instance_id: Uuid,
90 oop_options: Option<OopSpawnOptions>,
91 ) -> Self {
92 let module_manager = Arc::new(ModuleManager::new());
94 let grpc_installers = Arc::new(GrpcInstallerStore::new());
95
96 let db_manager = match &db_options {
98 #[cfg(feature = "db")]
99 DbOptions::Manager(mgr) => Some(mgr.clone()),
100 DbOptions::None => None,
101 };
102
103 let ctx_builder = ModuleContextBuilder::new(
104 instance_id,
105 modules_cfg,
106 client_hub.clone(),
107 cancel.clone(),
108 db_manager,
109 );
110
111 Self {
112 registry,
113 ctx_builder,
114 instance_id,
115 module_manager,
116 grpc_installers,
117 client_hub,
118 cancel,
119 db_options,
120 oop_options,
121 }
122 }
123
124 pub fn run_pre_init_phase(&self) -> Result<(), RegistryError> {
131 tracing::info!("Phase: pre_init");
132
133 let sys_ctx = SystemContext::new(
134 self.instance_id,
135 Arc::clone(&self.module_manager),
136 Arc::clone(&self.grpc_installers),
137 );
138
139 for entry in self.registry.modules() {
140 if self.cancel.is_cancelled() {
142 tracing::warn!("Pre-init phase cancelled by signal");
143 return Err(RegistryError::Cancelled);
144 }
145
146 if let Some(sys_mod) = entry.caps.query::<SystemCap>() {
147 tracing::debug!(module = entry.name, "Running system pre_init");
148 sys_mod
149 .pre_init(&sys_ctx)
150 .map_err(|e| RegistryError::PreInit {
151 module: entry.name,
152 source: e,
153 })?;
154 }
155 }
156
157 Ok(())
158 }
159
160 async fn module_context(
162 &self,
163 module_name: &'static str,
164 ) -> Result<crate::context::ModuleCtx, RegistryError> {
165 self.ctx_builder
166 .for_module(module_name)
167 .await
168 .map_err(|e| RegistryError::DbMigrate {
169 module: module_name,
170 source: e,
171 })
172 }
173
174 #[cfg(feature = "db")]
176 async fn db_migration_target(
177 &self,
178 module_name: &'static str,
179 ctx: &crate::context::ModuleCtx,
180 db_module: Option<Arc<dyn crate::contracts::DatabaseCapability>>,
181 ) -> Result<Option<(modkit_db::Db, Arc<dyn crate::contracts::DatabaseCapability>)>, RegistryError>
182 {
183 let Some(dbm) = db_module else {
184 return Ok(None);
185 };
186
187 let db = match &self.db_options {
191 DbOptions::None => None,
192 #[cfg(feature = "db")]
193 DbOptions::Manager(mgr) => {
194 mgr.get(module_name)
195 .await
196 .map_err(|e| RegistryError::DbMigrate {
197 module: module_name,
198 source: e.into(),
199 })?
200 }
201 };
202
203 _ = ctx; Ok(db.map(|db| (db, dbm)))
205 }
206
207 #[cfg(feature = "db")]
212 async fn migrate_module(
213 module_name: &'static str,
214 db: &modkit_db::Db,
215 db_module: Arc<dyn crate::contracts::DatabaseCapability>,
216 ) -> Result<(), RegistryError> {
217 let migrations = db_module.migrations();
219
220 if migrations.is_empty() {
221 tracing::debug!(module = module_name, "No migrations to run");
222 return Ok(());
223 }
224
225 tracing::debug!(
226 module = module_name,
227 count = migrations.len(),
228 "Running DB migrations"
229 );
230
231 let result =
233 modkit_db::migration_runner::run_migrations_for_module(db, module_name, migrations)
234 .await
235 .map_err(|e| RegistryError::DbMigrate {
236 module: module_name,
237 source: anyhow::Error::new(e),
238 })?;
239
240 tracing::info!(
241 module = module_name,
242 applied = result.applied,
243 skipped = result.skipped,
244 "DB migrations completed"
245 );
246
247 Ok(())
248 }
249
250 #[cfg(feature = "db")]
259 async fn run_db_phase(&self) -> Result<(), RegistryError> {
260 tracing::info!("Phase: db (before init)");
261
262 for entry in self.registry.modules_by_system_priority() {
263 if self.cancel.is_cancelled() {
265 tracing::warn!("DB migration phase cancelled by signal");
266 return Err(RegistryError::Cancelled);
267 }
268
269 let ctx = self.module_context(entry.name).await?;
270 let db_module = entry.caps.query::<DatabaseCap>();
271
272 match self
273 .db_migration_target(entry.name, &ctx, db_module.clone())
274 .await?
275 {
276 Some((db, dbm)) => {
277 Self::migrate_module(entry.name, &db, dbm).await?;
278 }
279 None if db_module.is_some() => {
280 tracing::debug!(
281 module = entry.name,
282 "Module has DbModule trait but no DB handle (no config)"
283 );
284 }
285 None => {}
286 }
287 }
288
289 Ok(())
290 }
291
292 async fn run_init_phase(&self) -> Result<(), RegistryError> {
296 tracing::info!("Phase: init");
297
298 for entry in self.registry.modules_by_system_priority() {
299 let ctx =
300 self.ctx_builder
301 .for_module(entry.name)
302 .await
303 .map_err(|e| RegistryError::Init {
304 module: entry.name,
305 source: e,
306 })?;
307 tracing::info!(module = entry.name, "Initializing a module...");
308 entry
309 .core
310 .init(&ctx)
311 .await
312 .map_err(|e| RegistryError::Init {
313 module: entry.name,
314 source: e,
315 })?;
316 tracing::info!(module = entry.name, "Initialized a module.");
317 }
318
319 Ok(())
320 }
321
322 async fn run_post_init_phase(&self) -> Result<(), RegistryError> {
329 tracing::info!("Phase: post_init");
330
331 let sys_ctx = SystemContext::new(
332 self.instance_id,
333 Arc::clone(&self.module_manager),
334 Arc::clone(&self.grpc_installers),
335 );
336
337 for entry in self.registry.modules_by_system_priority() {
338 if let Some(sys_mod) = entry.caps.query::<SystemCap>() {
339 sys_mod
340 .post_init(&sys_ctx)
341 .await
342 .map_err(|e| RegistryError::PostInit {
343 module: entry.name,
344 source: e,
345 })?;
346 }
347 }
348
349 Ok(())
350 }
351
352 async fn run_rest_phase(&self) -> Result<Router, RegistryError> {
359 tracing::info!("Phase: rest (sync)");
360
361 let mut router = Router::new();
362
363 let host_count = self
365 .registry
366 .modules()
367 .iter()
368 .filter(|e| e.caps.has::<ApiGatewayCap>())
369 .count();
370
371 match host_count {
372 0 => {
373 return if self
374 .registry
375 .modules()
376 .iter()
377 .any(|e| e.caps.has::<RestApiCap>())
378 {
379 Err(RegistryError::RestRequiresHost)
380 } else {
381 Ok(router)
382 };
383 }
384 1 => { }
385 _ => return Err(RegistryError::MultipleRestHosts),
386 }
387
388 let host_idx = self
390 .registry
391 .modules()
392 .iter()
393 .position(|e| e.caps.has::<ApiGatewayCap>())
394 .ok_or(RegistryError::RestHostNotFoundAfterValidation)?;
395 let host_entry = &self.registry.modules()[host_idx];
396 let Some(host) = host_entry.caps.query::<ApiGatewayCap>() else {
397 return Err(RegistryError::RestHostMissingFromEntry);
398 };
399 let host_ctx = self
400 .ctx_builder
401 .for_module(host_entry.name)
402 .await
403 .map_err(|e| RegistryError::RestPrepare {
404 module: host_entry.name,
405 source: e,
406 })?;
407
408 let registry: &dyn crate::contracts::OpenApiRegistry = host.as_registry();
410
411 router =
413 host.rest_prepare(&host_ctx, router)
414 .map_err(|source| RegistryError::RestPrepare {
415 module: host_entry.name,
416 source,
417 })?;
418
419 for e in self.registry.modules() {
421 if let Some(rest) = e.caps.query::<RestApiCap>() {
422 let ctx = self.ctx_builder.for_module(e.name).await.map_err(|err| {
423 RegistryError::RestRegister {
424 module: e.name,
425 source: err,
426 }
427 })?;
428
429 router = rest
430 .register_rest(&ctx, router, registry)
431 .map_err(|source| RegistryError::RestRegister {
432 module: e.name,
433 source,
434 })?;
435 }
436 }
437
438 router = host.rest_finalize(&host_ctx, router).map_err(|source| {
440 RegistryError::RestFinalize {
441 module: host_entry.name,
442 source,
443 }
444 })?;
445
446 Ok(router)
447 }
448
449 async fn run_grpc_phase(&self) -> Result<(), RegistryError> {
453 tracing::info!("Phase: grpc (registration)");
454
455 if self.registry.grpc_hub.is_none() && self.registry.grpc_services.is_empty() {
457 return Ok(());
458 }
459
460 if self.registry.grpc_hub.is_none() && !self.registry.grpc_services.is_empty() {
462 return Err(RegistryError::GrpcRequiresHub);
463 }
464
465 if let Some(hub_name) = &self.registry.grpc_hub {
467 let mut modules_data = Vec::new();
468 let mut seen = HashSet::new();
469
470 for (module_name, service_module) in &self.registry.grpc_services {
472 let ctx = self
473 .ctx_builder
474 .for_module(module_name)
475 .await
476 .map_err(|err| RegistryError::GrpcRegister {
477 module: module_name.clone(),
478 source: err,
479 })?;
480
481 let installers =
482 service_module
483 .get_grpc_services(&ctx)
484 .await
485 .map_err(|source| RegistryError::GrpcRegister {
486 module: module_name.clone(),
487 source,
488 })?;
489
490 for reg in &installers {
491 if !seen.insert(reg.service_name) {
492 return Err(RegistryError::GrpcRegister {
493 module: module_name.clone(),
494 source: anyhow::anyhow!(
495 "Duplicate gRPC service name: {}",
496 reg.service_name
497 ),
498 });
499 }
500 }
501
502 modules_data.push(crate::runtime::ModuleInstallers {
503 module_name: module_name.clone(),
504 installers,
505 });
506 }
507
508 self.grpc_installers
509 .set(crate::runtime::GrpcInstallerData {
510 modules: modules_data,
511 })
512 .map_err(|source| RegistryError::GrpcRegister {
513 module: hub_name.clone(),
514 source,
515 })?;
516 }
517
518 Ok(())
519 }
520
521 async fn run_start_phase(&self) -> Result<(), RegistryError> {
525 tracing::info!("Phase: start");
526
527 for e in self.registry.modules_by_system_priority() {
528 if let Some(s) = e.caps.query::<RunnableCap>() {
529 tracing::debug!(
530 module = e.name,
531 is_system = e.caps.has::<SystemCap>(),
532 "Starting stateful module"
533 );
534 s.start(self.cancel.clone())
535 .await
536 .map_err(|source| RegistryError::Start {
537 module: e.name,
538 source,
539 })?;
540 tracing::info!(module = e.name, "Started module");
541 }
542 }
543
544 Ok(())
545 }
546
547 async fn stop_one_module(entry: &ModuleEntry, cancel: CancellationToken) {
549 if let Some(s) = entry.caps.query::<RunnableCap>() {
550 match s.stop(cancel).await {
551 Err(err) => {
552 tracing::warn!(module = entry.name, error = %err, "Failed to stop module");
553 }
554 _ => {
555 tracing::info!(module = entry.name, "Stopped module");
556 }
557 }
558 }
559 }
560
561 async fn run_stop_phase(&self) -> Result<(), RegistryError> {
567 tracing::info!("Phase: stop");
568
569 for e in self.registry.modules().iter().rev() {
570 Self::stop_one_module(e, self.cancel.clone()).await;
571 }
572
573 Ok(())
574 }
575
576 async fn run_oop_spawn_phase(&self) -> Result<(), RegistryError> {
581 let oop_opts = match &self.oop_options {
582 Some(opts) if !opts.modules.is_empty() => opts,
583 _ => return Ok(()),
584 };
585
586 tracing::info!("Phase: oop_spawn");
587
588 let directory_endpoint = self.wait_for_grpc_hub_endpoint().await;
590
591 for module_cfg in &oop_opts.modules {
592 let mut env = module_cfg.env.clone();
595 env.insert(
596 MODKIT_MODULE_CONFIG_ENV.to_owned(),
597 module_cfg.rendered_config_json.clone(),
598 );
599 if let Some(ref endpoint) = directory_endpoint {
600 env.insert(MODKIT_DIRECTORY_ENDPOINT_ENV.to_owned(), endpoint.clone());
601 }
602
603 let args = module_cfg.args.clone();
605
606 let spawn_config = OopSpawnConfig {
607 module_name: module_cfg.module_name.clone(),
608 binary: module_cfg.binary.clone(),
609 args,
610 env,
611 working_directory: module_cfg.working_directory.clone(),
612 };
613
614 oop_opts
615 .backend
616 .spawn(spawn_config)
617 .await
618 .map_err(|e| RegistryError::OopSpawn {
619 module: module_cfg.module_name.clone(),
620 source: e,
621 })?;
622
623 tracing::info!(
624 module = %module_cfg.module_name,
625 directory_endpoint = ?directory_endpoint,
626 "Spawned OoP module via backend"
627 );
628 }
629
630 Ok(())
631 }
632
633 async fn wait_for_grpc_hub_endpoint(&self) -> Option<String> {
638 const POLL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(10);
639 const MAX_WAIT: std::time::Duration = std::time::Duration::from_secs(5);
640
641 let grpc_hub = self
643 .registry
644 .modules()
645 .iter()
646 .find_map(|e| e.caps.query::<GrpcHubCap>());
647
648 let Some(hub) = grpc_hub else {
649 return None; };
651
652 let start = std::time::Instant::now();
653
654 loop {
655 if let Some(endpoint) = hub.bound_endpoint() {
656 tracing::debug!(
657 endpoint = %endpoint,
658 elapsed_ms = start.elapsed().as_millis(),
659 "gRPC hub endpoint available"
660 );
661 return Some(endpoint);
662 }
663
664 if start.elapsed() > MAX_WAIT {
665 tracing::warn!("Timed out waiting for gRPC hub to bind");
666 return None;
667 }
668
669 tokio::time::sleep(POLL_INTERVAL).await;
670 }
671 }
672
673 pub async fn run_module_phases(self) -> anyhow::Result<()> {
682 self.run_phases_internal(RunMode::Full).await
683 }
684
685 pub async fn run_migration_phases(self) -> anyhow::Result<()> {
695 self.run_phases_internal(RunMode::MigrateOnly).await
696 }
697
698 async fn run_phases_internal(self, mode: RunMode) -> anyhow::Result<()> {
721 match mode {
723 RunMode::Full => {
724 tracing::info!("Running full lifecycle (all phases)");
725 }
726 RunMode::MigrateOnly => {
727 tracing::info!("Running in migration mode (pre-init + db phases only)");
728 }
729 }
730
731 self.run_pre_init_phase()?;
733
734 #[cfg(feature = "db")]
736 {
737 self.run_db_phase().await?;
738 }
739 #[cfg(not(feature = "db"))]
740 {
741 }
743
744 if mode == RunMode::MigrateOnly {
746 tracing::info!("Migration phases completed successfully");
747 return Ok(());
748 }
749
750 self.run_init_phase().await?;
752
753 self.run_post_init_phase().await?;
755
756 let _router = self.run_rest_phase().await?;
758
759 self.run_grpc_phase().await?;
761
762 self.run_start_phase().await?;
764
765 self.run_oop_spawn_phase().await?;
767
768 self.cancel.cancelled().await;
770
771 let stop_timeout = std::time::Duration::from_secs(15);
777 let disarm = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
778 let disarm_clone = std::sync::Arc::clone(&disarm);
779 std::thread::spawn(move || {
780 std::thread::sleep(stop_timeout);
781 if !disarm_clone.load(std::sync::atomic::Ordering::Relaxed) {
782 tracing::warn!(
783 timeout_secs = stop_timeout.as_secs(),
784 "shutdown: stop phase timed out, force exiting"
785 );
786 std::process::exit(1);
787 }
788 });
789
790 self.run_stop_phase().await?;
791 disarm.store(true, std::sync::atomic::Ordering::Relaxed);
792
793 Ok(())
794 }
795}
796
797#[cfg(test)]
798#[cfg_attr(coverage_nightly, coverage(off))]
799mod tests {
800 use super::*;
801 use crate::context::ModuleCtx;
802 use crate::contracts::{Module, RunnableCapability, SystemCapability};
803 use crate::registry::RegistryBuilder;
804 use std::sync::Arc;
805 use std::sync::atomic::{AtomicUsize, Ordering};
806 use tokio::sync::Mutex;
807
    /// Minimal module whose `init` always succeeds.
    // Currently not referenced by any test in this module, hence the allow.
    #[derive(Default)]
    #[allow(dead_code)]
    struct DummyCore;
    #[async_trait::async_trait]
    impl Module for DummyCore {
        async fn init(&self, _ctx: &ModuleCtx) -> anyhow::Result<()> {
            Ok(())
        }
    }
817
818 struct StopOrderTracker {
819 my_order: usize,
820 stop_order: Arc<AtomicUsize>,
821 }
822
823 impl StopOrderTracker {
824 fn new(counter: &Arc<AtomicUsize>, stop_order: Arc<AtomicUsize>) -> Self {
825 let my_order = counter.fetch_add(1, Ordering::SeqCst);
826 Self {
827 my_order,
828 stop_order,
829 }
830 }
831 }
832
833 #[async_trait::async_trait]
834 impl Module for StopOrderTracker {
835 async fn init(&self, _ctx: &ModuleCtx) -> anyhow::Result<()> {
836 Ok(())
837 }
838 }
839
840 #[async_trait::async_trait]
841 impl RunnableCapability for StopOrderTracker {
842 async fn start(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
843 Ok(())
844 }
845 async fn stop(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
846 let order = self.stop_order.fetch_add(1, Ordering::SeqCst);
847 tracing::info!(
848 my_order = self.my_order,
849 stop_order = order,
850 "Module stopped"
851 );
852 Ok(())
853 }
854 }
855
856 #[tokio::test]
857 async fn test_stop_phase_reverse_order() {
858 let counter = Arc::new(AtomicUsize::new(0));
859 let stop_order = Arc::new(AtomicUsize::new(0));
860
861 let module_a = Arc::new(StopOrderTracker::new(&counter, stop_order.clone()));
862 let module_b = Arc::new(StopOrderTracker::new(&counter, stop_order.clone()));
863 let module_c = Arc::new(StopOrderTracker::new(&counter, stop_order.clone()));
864
865 let mut builder = RegistryBuilder::default();
866 builder.register_core_with_meta("a", &[], module_a.clone() as Arc<dyn Module>);
867 builder.register_core_with_meta("b", &["a"], module_b.clone() as Arc<dyn Module>);
868 builder.register_core_with_meta("c", &["b"], module_c.clone() as Arc<dyn Module>);
869
870 builder.register_stateful_with_meta("a", module_a.clone() as Arc<dyn RunnableCapability>);
871 builder.register_stateful_with_meta("b", module_b.clone() as Arc<dyn RunnableCapability>);
872 builder.register_stateful_with_meta("c", module_c.clone() as Arc<dyn RunnableCapability>);
873
874 let registry = builder.build_topo_sorted().unwrap();
875
876 let module_names: Vec<_> = registry.modules().iter().map(|m| m.name).collect();
878 assert_eq!(module_names, vec!["a", "b", "c"]);
879
880 let client_hub = Arc::new(ClientHub::new());
881 let cancel = CancellationToken::new();
882 let config_provider: Arc<dyn ConfigProvider> = Arc::new(EmptyConfigProvider);
883
884 let runtime = HostRuntime::new(
885 registry,
886 config_provider,
887 DbOptions::None,
888 client_hub,
889 cancel.clone(),
890 Uuid::new_v4(),
891 None,
892 );
893
894 runtime.run_stop_phase().await.unwrap();
896
897 assert_eq!(stop_order.load(Ordering::SeqCst), 3);
901 }
902
903 #[tokio::test]
904 async fn test_stop_phase_continues_on_error() {
905 struct FailingModule {
906 should_fail: bool,
907 stopped: Arc<AtomicUsize>,
908 }
909
910 #[async_trait::async_trait]
911 impl Module for FailingModule {
912 async fn init(&self, _ctx: &ModuleCtx) -> anyhow::Result<()> {
913 Ok(())
914 }
915 }
916
917 #[async_trait::async_trait]
918 impl RunnableCapability for FailingModule {
919 async fn start(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
920 Ok(())
921 }
922 async fn stop(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
923 self.stopped.fetch_add(1, Ordering::SeqCst);
924 if self.should_fail {
925 anyhow::bail!("Intentional failure")
926 }
927 Ok(())
928 }
929 }
930
931 let stopped = Arc::new(AtomicUsize::new(0));
932 let module_a = Arc::new(FailingModule {
933 should_fail: false,
934 stopped: stopped.clone(),
935 });
936 let module_b = Arc::new(FailingModule {
937 should_fail: true,
938 stopped: stopped.clone(),
939 });
940 let module_c = Arc::new(FailingModule {
941 should_fail: false,
942 stopped: stopped.clone(),
943 });
944
945 let mut builder = RegistryBuilder::default();
946 builder.register_core_with_meta("a", &[], module_a.clone() as Arc<dyn Module>);
947 builder.register_core_with_meta("b", &["a"], module_b.clone() as Arc<dyn Module>);
948 builder.register_core_with_meta("c", &["b"], module_c.clone() as Arc<dyn Module>);
949
950 builder.register_stateful_with_meta("a", module_a.clone() as Arc<dyn RunnableCapability>);
951 builder.register_stateful_with_meta("b", module_b.clone() as Arc<dyn RunnableCapability>);
952 builder.register_stateful_with_meta("c", module_c.clone() as Arc<dyn RunnableCapability>);
953
954 let registry = builder.build_topo_sorted().unwrap();
955
956 let client_hub = Arc::new(ClientHub::new());
957 let cancel = CancellationToken::new();
958 let config_provider: Arc<dyn ConfigProvider> = Arc::new(EmptyConfigProvider);
959
960 let runtime = HostRuntime::new(
961 registry,
962 config_provider,
963 DbOptions::None,
964 client_hub,
965 cancel.clone(),
966 Uuid::new_v4(),
967 None,
968 );
969
970 runtime.run_stop_phase().await.unwrap();
972
973 assert_eq!(stopped.load(Ordering::SeqCst), 3);
975 }
976
    /// Config provider for tests: reports no configuration for any module.
    struct EmptyConfigProvider;
    impl ConfigProvider for EmptyConfigProvider {
        fn get_module_config(&self, _module_name: &str) -> Option<&serde_json::Value> {
            None
        }
    }
983
984 #[tokio::test]
985 async fn test_post_init_runs_after_all_init_and_system_first() {
986 #[derive(Clone)]
987 struct TrackHooks {
988 name: &'static str,
989 events: Arc<Mutex<Vec<String>>>,
990 }
991
992 #[async_trait::async_trait]
993 impl Module for TrackHooks {
994 async fn init(&self, _ctx: &ModuleCtx) -> anyhow::Result<()> {
995 self.events.lock().await.push(format!("init:{}", self.name));
996 Ok(())
997 }
998 }
999
1000 #[async_trait::async_trait]
1001 impl SystemCapability for TrackHooks {
1002 fn pre_init(&self, _sys: &crate::runtime::SystemContext) -> anyhow::Result<()> {
1003 Ok(())
1004 }
1005
1006 async fn post_init(&self, _sys: &crate::runtime::SystemContext) -> anyhow::Result<()> {
1007 self.events
1008 .lock()
1009 .await
1010 .push(format!("post_init:{}", self.name));
1011 Ok(())
1012 }
1013 }
1014
1015 let events = Arc::new(Mutex::new(Vec::<String>::new()));
1016 let sys_a = Arc::new(TrackHooks {
1017 name: "sys_a",
1018 events: events.clone(),
1019 });
1020 let user_b = Arc::new(TrackHooks {
1021 name: "user_b",
1022 events: events.clone(),
1023 });
1024 let user_c = Arc::new(TrackHooks {
1025 name: "user_c",
1026 events: events.clone(),
1027 });
1028
1029 let mut builder = RegistryBuilder::default();
1030 builder.register_core_with_meta("sys_a", &[], sys_a.clone() as Arc<dyn Module>);
1031 builder.register_core_with_meta("user_b", &["sys_a"], user_b.clone() as Arc<dyn Module>);
1032 builder.register_core_with_meta("user_c", &["user_b"], user_c.clone() as Arc<dyn Module>);
1033 builder.register_system_with_meta("sys_a", sys_a.clone() as Arc<dyn SystemCapability>);
1034
1035 let registry = builder.build_topo_sorted().unwrap();
1036
1037 let client_hub = Arc::new(ClientHub::new());
1038 let cancel = CancellationToken::new();
1039 let config_provider: Arc<dyn ConfigProvider> = Arc::new(EmptyConfigProvider);
1040
1041 let runtime = HostRuntime::new(
1042 registry,
1043 config_provider,
1044 DbOptions::None,
1045 client_hub,
1046 cancel,
1047 Uuid::new_v4(),
1048 None,
1049 );
1050
1051 runtime.run_init_phase().await.unwrap();
1053 runtime.run_post_init_phase().await.unwrap();
1054
1055 let events = events.lock().await.clone();
1056 let first_post_init = events
1057 .iter()
1058 .position(|e| e.starts_with("post_init:"))
1059 .expect("expected post_init events");
1060 assert!(
1061 events[..first_post_init]
1062 .iter()
1063 .all(|e| e.starts_with("init:")),
1064 "expected all init events before post_init, got: {events:?}"
1065 );
1066
1067 assert_eq!(
1069 events,
1070 vec![
1071 "init:sys_a",
1072 "init:user_b",
1073 "init:user_c",
1074 "post_init:sys_a",
1075 ]
1076 );
1077 }
1078}