1use axum::Router;
17use std::collections::HashSet;
18use std::sync::Arc;
19
20use tokio_util::sync::CancellationToken;
21use uuid::Uuid;
22
23use crate::backends::OopSpawnConfig;
24use crate::client_hub::ClientHub;
25use crate::config::ConfigProvider;
26use crate::context::ModuleContextBuilder;
27use crate::registry::{
28 ApiGatewayCap, GrpcHubCap, ModuleEntry, ModuleRegistry, RegistryError, RestApiCap, RunnableCap,
29 SystemCap,
30};
31use crate::runtime::{GrpcInstallerStore, ModuleManager, OopSpawnOptions, SystemContext};
32
33#[cfg(feature = "db")]
34use crate::registry::DatabaseCap;
35
/// Database configuration handed to `HostRuntime::new`.
///
/// The runtime inspects this value to decide whether a database manager is
/// passed on to the module context builder and whether DB handles can be
/// resolved during the DB migration phase.
#[derive(Clone)]
pub enum DbOptions {
    /// No database manager is available.
    None,
    /// A shared database manager (only compiled with the `db` feature).
    #[cfg(feature = "db")]
    Manager(Arc<modkit_db::DbManager>),
}
45
/// Selects how much of the module lifecycle the runtime executes.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum RunMode {
    /// Execute every lifecycle phase, wait for cancellation, then stop.
    Full,
    /// Execute only the pre-init and DB migration phases, then return.
    MigrateOnly,
}
54
/// Name of the environment variable through which a spawned out-of-process
/// module receives the gRPC hub (directory) endpoint, when one is available.
pub const MODKIT_DIRECTORY_ENDPOINT_ENV: &str = "MODKIT_DIRECTORY_ENDPOINT";
57
/// Name of the environment variable through which a spawned out-of-process
/// module receives its rendered configuration JSON.
pub const MODKIT_MODULE_CONFIG_ENV: &str = "MODKIT_MODULE_CONFIG";
60
/// Default per-module shutdown deadline (35 seconds) used by `HostRuntime`
/// unless overridden via `HostRuntime::with_shutdown_deadline`.
pub const DEFAULT_SHUTDOWN_DEADLINE: std::time::Duration = std::time::Duration::from_secs(35);
67
/// Drives the lifecycle phases (pre-init, DB, init, post-init, REST, gRPC,
/// start, OoP spawn, stop) of the modules held in a `ModuleRegistry`.
pub struct HostRuntime {
    /// Registered modules that every phase iterates over.
    registry: ModuleRegistry,
    /// Produces a per-module context via `for_module`.
    ctx_builder: ModuleContextBuilder,
    /// Instance identifier handed to the context builder and `SystemContext`.
    instance_id: Uuid,
    /// Shared with `SystemContext` in the pre-init and post-init phases.
    module_manager: Arc<ModuleManager>,
    /// Receives the gRPC installers collected during the gRPC phase.
    grpc_installers: Arc<GrpcInstallerStore>,
    /// Client hub; a clone is handed to the context builder in `new`.
    #[allow(dead_code)]
    client_hub: Arc<ClientHub>,
    /// Root cancellation token: checked between modules in early phases,
    /// cloned into `start`, and awaited before the stop phase.
    cancel: CancellationToken,
    /// Database options; only read by code compiled with the `db` feature.
    #[allow(dead_code)]
    db_options: DbOptions,
    /// Out-of-process spawn settings; `None` skips the OoP spawn phase.
    oop_options: Option<OopSpawnOptions>,
    /// Time each module gets in the stop phase before its hard-stop token
    /// is cancelled.
    shutdown_deadline: std::time::Duration,
}
87
88impl HostRuntime {
89 pub fn new(
93 registry: ModuleRegistry,
94 modules_cfg: Arc<dyn ConfigProvider>,
95 db_options: DbOptions,
96 client_hub: Arc<ClientHub>,
97 cancel: CancellationToken,
98 instance_id: Uuid,
99 oop_options: Option<OopSpawnOptions>,
100 ) -> Self {
101 let module_manager = Arc::new(ModuleManager::new());
103 let grpc_installers = Arc::new(GrpcInstallerStore::new());
104
105 let db_manager = match &db_options {
107 #[cfg(feature = "db")]
108 DbOptions::Manager(mgr) => Some(mgr.clone()),
109 DbOptions::None => None,
110 };
111
112 let ctx_builder = ModuleContextBuilder::new(
113 instance_id,
114 modules_cfg,
115 client_hub.clone(),
116 cancel.clone(),
117 db_manager,
118 );
119
120 Self {
121 registry,
122 ctx_builder,
123 instance_id,
124 module_manager,
125 grpc_installers,
126 client_hub,
127 cancel,
128 db_options,
129 oop_options,
130 shutdown_deadline: DEFAULT_SHUTDOWN_DEADLINE,
131 }
132 }
133
134 #[must_use]
150 pub fn with_shutdown_deadline(mut self, deadline: std::time::Duration) -> Self {
151 self.shutdown_deadline = deadline;
152 self
153 }
154
155 pub fn run_pre_init_phase(&self) -> Result<(), RegistryError> {
162 tracing::info!("Phase: pre_init");
163
164 let sys_ctx = SystemContext::new(
165 self.instance_id,
166 Arc::clone(&self.module_manager),
167 Arc::clone(&self.grpc_installers),
168 );
169
170 for entry in self.registry.modules() {
171 if self.cancel.is_cancelled() {
173 tracing::warn!("Pre-init phase cancelled by signal");
174 return Err(RegistryError::Cancelled);
175 }
176
177 if let Some(sys_mod) = entry.caps.query::<SystemCap>() {
178 tracing::debug!(module = entry.name, "Running system pre_init");
179 sys_mod
180 .pre_init(&sys_ctx)
181 .map_err(|e| RegistryError::PreInit {
182 module: entry.name,
183 source: e,
184 })?;
185 }
186 }
187
188 Ok(())
189 }
190
191 #[cfg(feature = "db")]
193 async fn module_context(
194 &self,
195 module_name: &'static str,
196 ) -> Result<crate::context::ModuleCtx, RegistryError> {
197 self.ctx_builder
198 .for_module(module_name)
199 .await
200 .map_err(|e| RegistryError::DbMigrate {
201 module: module_name,
202 source: e,
203 })
204 }
205
206 #[cfg(feature = "db")]
208 async fn db_migration_target(
209 &self,
210 module_name: &'static str,
211 ctx: &crate::context::ModuleCtx,
212 db_module: Option<Arc<dyn crate::contracts::DatabaseCapability>>,
213 ) -> Result<Option<(modkit_db::Db, Arc<dyn crate::contracts::DatabaseCapability>)>, RegistryError>
214 {
215 let Some(dbm) = db_module else {
216 return Ok(None);
217 };
218
219 let db = match &self.db_options {
223 DbOptions::None => None,
224 #[cfg(feature = "db")]
225 DbOptions::Manager(mgr) => {
226 mgr.get(module_name)
227 .await
228 .map_err(|e| RegistryError::DbMigrate {
229 module: module_name,
230 source: e.into(),
231 })?
232 }
233 };
234
235 _ = ctx; Ok(db.map(|db| (db, dbm)))
237 }
238
239 #[cfg(feature = "db")]
244 async fn migrate_module(
245 module_name: &'static str,
246 db: &modkit_db::Db,
247 db_module: Arc<dyn crate::contracts::DatabaseCapability>,
248 ) -> Result<(), RegistryError> {
249 let migrations = db_module.migrations();
251
252 if migrations.is_empty() {
253 tracing::debug!(module = module_name, "No migrations to run");
254 return Ok(());
255 }
256
257 tracing::debug!(
258 module = module_name,
259 count = migrations.len(),
260 "Running DB migrations"
261 );
262
263 let result =
265 modkit_db::migration_runner::run_migrations_for_module(db, module_name, migrations)
266 .await
267 .map_err(|e| RegistryError::DbMigrate {
268 module: module_name,
269 source: anyhow::Error::new(e),
270 })?;
271
272 tracing::info!(
273 module = module_name,
274 applied = result.applied,
275 skipped = result.skipped,
276 "DB migrations completed"
277 );
278
279 Ok(())
280 }
281
282 #[cfg(feature = "db")]
291 async fn run_db_phase(&self) -> Result<(), RegistryError> {
292 tracing::info!("Phase: db (before init)");
293
294 for entry in self.registry.modules_by_system_priority() {
295 if self.cancel.is_cancelled() {
297 tracing::warn!("DB migration phase cancelled by signal");
298 return Err(RegistryError::Cancelled);
299 }
300
301 let ctx = self.module_context(entry.name).await?;
302 let db_module = entry.caps.query::<DatabaseCap>();
303
304 match self
305 .db_migration_target(entry.name, &ctx, db_module.clone())
306 .await?
307 {
308 Some((db, dbm)) => {
309 Self::migrate_module(entry.name, &db, dbm).await?;
310 }
311 None if db_module.is_some() => {
312 tracing::debug!(
313 module = entry.name,
314 "Module has DbModule trait but no DB handle (no config)"
315 );
316 }
317 None => {}
318 }
319 }
320
321 Ok(())
322 }
323
324 async fn run_init_phase(&self) -> Result<(), RegistryError> {
328 tracing::info!("Phase: init");
329
330 for entry in self.registry.modules_by_system_priority() {
331 let ctx =
332 self.ctx_builder
333 .for_module(entry.name)
334 .await
335 .map_err(|e| RegistryError::Init {
336 module: entry.name,
337 source: e,
338 })?;
339 tracing::info!(module = entry.name, "Initializing a module...");
340 entry
341 .core
342 .init(&ctx)
343 .await
344 .map_err(|e| RegistryError::Init {
345 module: entry.name,
346 source: e,
347 })?;
348 tracing::info!(module = entry.name, "Initialized a module.");
349 }
350
351 Ok(())
352 }
353
354 async fn run_post_init_phase(&self) -> Result<(), RegistryError> {
361 tracing::info!("Phase: post_init");
362
363 let sys_ctx = SystemContext::new(
364 self.instance_id,
365 Arc::clone(&self.module_manager),
366 Arc::clone(&self.grpc_installers),
367 );
368
369 for entry in self.registry.modules_by_system_priority() {
370 if let Some(sys_mod) = entry.caps.query::<SystemCap>() {
371 sys_mod
372 .post_init(&sys_ctx)
373 .await
374 .map_err(|e| RegistryError::PostInit {
375 module: entry.name,
376 source: e,
377 })?;
378 }
379 }
380
381 Ok(())
382 }
383
384 async fn run_rest_phase(&self) -> Result<Router, RegistryError> {
391 tracing::info!("Phase: rest (sync)");
392
393 let mut router = Router::new();
394
395 let host_count = self
397 .registry
398 .modules()
399 .iter()
400 .filter(|e| e.caps.has::<ApiGatewayCap>())
401 .count();
402
403 match host_count {
404 0 => {
405 return if self
406 .registry
407 .modules()
408 .iter()
409 .any(|e| e.caps.has::<RestApiCap>())
410 {
411 Err(RegistryError::RestRequiresHost)
412 } else {
413 Ok(router)
414 };
415 }
416 1 => { }
417 _ => return Err(RegistryError::MultipleRestHosts),
418 }
419
420 let host_idx = self
422 .registry
423 .modules()
424 .iter()
425 .position(|e| e.caps.has::<ApiGatewayCap>())
426 .ok_or(RegistryError::RestHostNotFoundAfterValidation)?;
427 let host_entry = &self.registry.modules()[host_idx];
428 let Some(host) = host_entry.caps.query::<ApiGatewayCap>() else {
429 return Err(RegistryError::RestHostMissingFromEntry);
430 };
431 let host_ctx = self
432 .ctx_builder
433 .for_module(host_entry.name)
434 .await
435 .map_err(|e| RegistryError::RestPrepare {
436 module: host_entry.name,
437 source: e,
438 })?;
439
440 let registry: &dyn crate::contracts::OpenApiRegistry = host.as_registry();
442
443 router =
445 host.rest_prepare(&host_ctx, router)
446 .map_err(|source| RegistryError::RestPrepare {
447 module: host_entry.name,
448 source,
449 })?;
450
451 for e in self.registry.modules() {
453 if let Some(rest) = e.caps.query::<RestApiCap>() {
454 let ctx = self.ctx_builder.for_module(e.name).await.map_err(|err| {
455 RegistryError::RestRegister {
456 module: e.name,
457 source: err,
458 }
459 })?;
460
461 router = rest
462 .register_rest(&ctx, router, registry)
463 .map_err(|source| RegistryError::RestRegister {
464 module: e.name,
465 source,
466 })?;
467 }
468 }
469
470 router = host.rest_finalize(&host_ctx, router).map_err(|source| {
472 RegistryError::RestFinalize {
473 module: host_entry.name,
474 source,
475 }
476 })?;
477
478 Ok(router)
479 }
480
481 async fn run_grpc_phase(&self) -> Result<(), RegistryError> {
485 tracing::info!("Phase: grpc (registration)");
486
487 if self.registry.grpc_hub.is_none() && self.registry.grpc_services.is_empty() {
489 return Ok(());
490 }
491
492 if self.registry.grpc_hub.is_none() && !self.registry.grpc_services.is_empty() {
494 return Err(RegistryError::GrpcRequiresHub);
495 }
496
497 if let Some(hub_name) = &self.registry.grpc_hub {
499 let mut modules_data = Vec::new();
500 let mut seen = HashSet::new();
501
502 for (module_name, service_module) in &self.registry.grpc_services {
504 let ctx = self
505 .ctx_builder
506 .for_module(module_name)
507 .await
508 .map_err(|err| RegistryError::GrpcRegister {
509 module: module_name.clone(),
510 source: err,
511 })?;
512
513 let installers =
514 service_module
515 .get_grpc_services(&ctx)
516 .await
517 .map_err(|source| RegistryError::GrpcRegister {
518 module: module_name.clone(),
519 source,
520 })?;
521
522 for reg in &installers {
523 if !seen.insert(reg.service_name) {
524 return Err(RegistryError::GrpcRegister {
525 module: module_name.clone(),
526 source: anyhow::anyhow!(
527 "Duplicate gRPC service name: {}",
528 reg.service_name
529 ),
530 });
531 }
532 }
533
534 modules_data.push(crate::runtime::ModuleInstallers {
535 module_name: module_name.clone(),
536 installers,
537 });
538 }
539
540 self.grpc_installers
541 .set(crate::runtime::GrpcInstallerData {
542 modules: modules_data,
543 })
544 .map_err(|source| RegistryError::GrpcRegister {
545 module: hub_name.clone(),
546 source,
547 })?;
548 }
549
550 Ok(())
551 }
552
553 async fn run_start_phase(&self) -> Result<(), RegistryError> {
557 tracing::info!("Phase: start");
558
559 for e in self.registry.modules_by_system_priority() {
560 if let Some(s) = e.caps.query::<RunnableCap>() {
561 tracing::debug!(
562 module = e.name,
563 is_system = e.caps.has::<SystemCap>(),
564 "Starting stateful module"
565 );
566 s.start(self.cancel.clone())
567 .await
568 .map_err(|source| RegistryError::Start {
569 module: e.name,
570 source,
571 })?;
572 tracing::info!(module = e.name, "Started module");
573 }
574 }
575
576 Ok(())
577 }
578
579 async fn stop_one_module(entry: &ModuleEntry, cancel: CancellationToken) {
581 if let Some(s) = entry.caps.query::<RunnableCap>() {
582 match s.stop(cancel).await {
583 Err(err) => {
584 tracing::warn!(module = entry.name, error = %err, "Failed to stop module");
585 }
586 _ => {
587 tracing::info!(module = entry.name, "Stopped module");
588 }
589 }
590 }
591 }
592
593 async fn run_stop_phase(&self) -> Result<(), RegistryError> {
618 tracing::info!("Phase: stop");
619
620 let deadline = self.shutdown_deadline;
621
622 for e in self.registry.modules().iter().rev() {
624 let module_name = e.name;
625
626 let deadline_token = CancellationToken::new();
629 let deadline_token_for_timeout = deadline_token.clone();
630
631 let deadline_task = tokio::spawn(async move {
633 tokio::time::sleep(deadline).await;
634 tracing::warn!(
635 module = module_name,
636 deadline_secs = deadline.as_secs(),
637 "Module shutdown deadline reached, sending hard-stop signal"
638 );
639 deadline_token_for_timeout.cancel();
640 });
641
642 Self::stop_one_module(e, deadline_token).await;
645
646 deadline_task.abort();
648 #[allow(clippy::let_underscore_must_use)]
649 let _ = deadline_task.await;
650 }
651
652 Ok(())
653 }
654
655 async fn run_oop_spawn_phase(&self) -> Result<(), RegistryError> {
660 let oop_opts = match &self.oop_options {
661 Some(opts) if !opts.modules.is_empty() => opts,
662 _ => return Ok(()),
663 };
664
665 tracing::info!("Phase: oop_spawn");
666
667 let directory_endpoint = self.wait_for_grpc_hub_endpoint().await;
669
670 for module_cfg in &oop_opts.modules {
671 let mut env = module_cfg.env.clone();
674 env.insert(
675 MODKIT_MODULE_CONFIG_ENV.to_owned(),
676 module_cfg.rendered_config_json.clone(),
677 );
678 if let Some(ref endpoint) = directory_endpoint {
679 env.insert(MODKIT_DIRECTORY_ENDPOINT_ENV.to_owned(), endpoint.clone());
680 }
681
682 let args = module_cfg.args.clone();
684
685 let spawn_config = OopSpawnConfig {
686 module_name: module_cfg.module_name.clone(),
687 binary: module_cfg.binary.clone(),
688 args,
689 env,
690 working_directory: module_cfg.working_directory.clone(),
691 };
692
693 oop_opts
694 .backend
695 .spawn(spawn_config)
696 .await
697 .map_err(|e| RegistryError::OopSpawn {
698 module: module_cfg.module_name.clone(),
699 source: e,
700 })?;
701
702 tracing::info!(
703 module = %module_cfg.module_name,
704 directory_endpoint = ?directory_endpoint,
705 "Spawned OoP module via backend"
706 );
707 }
708
709 Ok(())
710 }
711
712 async fn wait_for_grpc_hub_endpoint(&self) -> Option<String> {
717 const POLL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(10);
718 const MAX_WAIT: std::time::Duration = std::time::Duration::from_secs(5);
719
720 let grpc_hub = self
722 .registry
723 .modules()
724 .iter()
725 .find_map(|e| e.caps.query::<GrpcHubCap>());
726
727 let Some(hub) = grpc_hub else {
728 return None; };
730
731 let start = std::time::Instant::now();
732
733 loop {
734 if let Some(endpoint) = hub.bound_endpoint() {
735 tracing::debug!(
736 endpoint = %endpoint,
737 elapsed_ms = start.elapsed().as_millis(),
738 "gRPC hub endpoint available"
739 );
740 return Some(endpoint);
741 }
742
743 if start.elapsed() > MAX_WAIT {
744 tracing::warn!("Timed out waiting for gRPC hub to bind");
745 return None;
746 }
747
748 tokio::time::sleep(POLL_INTERVAL).await;
749 }
750 }
751
    /// Runs the full module lifecycle (`RunMode::Full`), consuming the runtime.
    ///
    /// # Errors
    ///
    /// Propagates the first error reported by any phase.
    pub async fn run_module_phases(self) -> anyhow::Result<()> {
        self.run_phases_internal(RunMode::Full).await
    }
763
    /// Runs only the migration-related phases (`RunMode::MigrateOnly`),
    /// consuming the runtime.
    ///
    /// # Errors
    ///
    /// Propagates the first error reported by any executed phase.
    pub async fn run_migration_phases(self) -> anyhow::Result<()> {
        self.run_phases_internal(RunMode::MigrateOnly).await
    }
776
777 async fn run_phases_internal(self, mode: RunMode) -> anyhow::Result<()> {
800 match mode {
802 RunMode::Full => {
803 tracing::info!("Running full lifecycle (all phases)");
804 }
805 RunMode::MigrateOnly => {
806 tracing::info!("Running in migration mode (pre-init + db phases only)");
807 }
808 }
809
810 self.run_pre_init_phase()?;
812
813 #[cfg(feature = "db")]
815 {
816 self.run_db_phase().await?;
817 }
818 #[cfg(not(feature = "db"))]
819 {
820 }
822
823 if mode == RunMode::MigrateOnly {
825 tracing::info!("Migration phases completed successfully");
826 return Ok(());
827 }
828
829 self.run_init_phase().await?;
831
832 self.run_post_init_phase().await?;
834
835 let _router = self.run_rest_phase().await?;
837
838 self.run_grpc_phase().await?;
840
841 self.run_start_phase().await?;
843
844 self.run_oop_spawn_phase().await?;
846
847 self.cancel.cancelled().await;
849
850 let stop_timeout = std::time::Duration::from_secs(15);
856 let disarm = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
857 let disarm_clone = std::sync::Arc::clone(&disarm);
858 std::thread::spawn(move || {
859 std::thread::sleep(stop_timeout);
860 if !disarm_clone.load(std::sync::atomic::Ordering::Relaxed) {
861 tracing::warn!(
862 timeout_secs = stop_timeout.as_secs(),
863 "shutdown: stop phase timed out, force exiting"
864 );
865 std::process::exit(1);
866 }
867 });
868
869 self.run_stop_phase().await?;
870 disarm.store(true, std::sync::atomic::Ordering::Relaxed);
871
872 Ok(())
873 }
874}
875
876#[cfg(test)]
877#[cfg_attr(coverage_nightly, coverage(off))]
878mod tests {
879 use super::*;
880 use crate::context::ModuleCtx;
881 use crate::contracts::{Module, RunnableCapability, SystemCapability};
882 use crate::registry::RegistryBuilder;
883 use std::sync::Arc;
884 use std::sync::atomic::{AtomicUsize, Ordering};
885 use tokio::sync::Mutex;
886
    /// Minimal module whose `init` always succeeds.
    #[derive(Default)]
    #[allow(dead_code)]
    struct DummyCore;
    #[async_trait::async_trait]
    impl Module for DummyCore {
        async fn init(&self, _ctx: &ModuleCtx) -> anyhow::Result<()> {
            Ok(())
        }
    }
896
897 struct StopOrderTracker {
898 my_order: usize,
899 stop_order: Arc<AtomicUsize>,
900 }
901
902 impl StopOrderTracker {
903 fn new(counter: &Arc<AtomicUsize>, stop_order: Arc<AtomicUsize>) -> Self {
904 let my_order = counter.fetch_add(1, Ordering::SeqCst);
905 Self {
906 my_order,
907 stop_order,
908 }
909 }
910 }
911
912 #[async_trait::async_trait]
913 impl Module for StopOrderTracker {
914 async fn init(&self, _ctx: &ModuleCtx) -> anyhow::Result<()> {
915 Ok(())
916 }
917 }
918
919 #[async_trait::async_trait]
920 impl RunnableCapability for StopOrderTracker {
921 async fn start(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
922 Ok(())
923 }
924 async fn stop(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
925 let order = self.stop_order.fetch_add(1, Ordering::SeqCst);
926 tracing::info!(
927 my_order = self.my_order,
928 stop_order = order,
929 "Module stopped"
930 );
931 Ok(())
932 }
933 }
934
935 #[tokio::test]
936 async fn test_stop_phase_reverse_order() {
937 let counter = Arc::new(AtomicUsize::new(0));
938 let stop_order = Arc::new(AtomicUsize::new(0));
939
940 let module_a = Arc::new(StopOrderTracker::new(&counter, stop_order.clone()));
941 let module_b = Arc::new(StopOrderTracker::new(&counter, stop_order.clone()));
942 let module_c = Arc::new(StopOrderTracker::new(&counter, stop_order.clone()));
943
944 let mut builder = RegistryBuilder::default();
945 builder.register_core_with_meta("a", &[], module_a.clone() as Arc<dyn Module>);
946 builder.register_core_with_meta("b", &["a"], module_b.clone() as Arc<dyn Module>);
947 builder.register_core_with_meta("c", &["b"], module_c.clone() as Arc<dyn Module>);
948
949 builder.register_stateful_with_meta("a", module_a.clone() as Arc<dyn RunnableCapability>);
950 builder.register_stateful_with_meta("b", module_b.clone() as Arc<dyn RunnableCapability>);
951 builder.register_stateful_with_meta("c", module_c.clone() as Arc<dyn RunnableCapability>);
952
953 let registry = builder.build_topo_sorted().unwrap();
954
955 let module_names: Vec<_> = registry.modules().iter().map(|m| m.name).collect();
957 assert_eq!(module_names, vec!["a", "b", "c"]);
958
959 let client_hub = Arc::new(ClientHub::new());
960 let cancel = CancellationToken::new();
961 let config_provider: Arc<dyn ConfigProvider> = Arc::new(EmptyConfigProvider);
962
963 let runtime = HostRuntime::new(
964 registry,
965 config_provider,
966 DbOptions::None,
967 client_hub,
968 cancel.clone(),
969 Uuid::new_v4(),
970 None,
971 );
972
973 runtime.run_stop_phase().await.unwrap();
975
976 assert_eq!(stop_order.load(Ordering::SeqCst), 3);
980 }
981
982 #[tokio::test]
983 async fn test_stop_phase_continues_on_error() {
984 struct FailingModule {
985 should_fail: bool,
986 stopped: Arc<AtomicUsize>,
987 }
988
989 #[async_trait::async_trait]
990 impl Module for FailingModule {
991 async fn init(&self, _ctx: &ModuleCtx) -> anyhow::Result<()> {
992 Ok(())
993 }
994 }
995
996 #[async_trait::async_trait]
997 impl RunnableCapability for FailingModule {
998 async fn start(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
999 Ok(())
1000 }
1001 async fn stop(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
1002 self.stopped.fetch_add(1, Ordering::SeqCst);
1003 if self.should_fail {
1004 anyhow::bail!("Intentional failure")
1005 }
1006 Ok(())
1007 }
1008 }
1009
1010 let stopped = Arc::new(AtomicUsize::new(0));
1011 let module_a = Arc::new(FailingModule {
1012 should_fail: false,
1013 stopped: stopped.clone(),
1014 });
1015 let module_b = Arc::new(FailingModule {
1016 should_fail: true,
1017 stopped: stopped.clone(),
1018 });
1019 let module_c = Arc::new(FailingModule {
1020 should_fail: false,
1021 stopped: stopped.clone(),
1022 });
1023
1024 let mut builder = RegistryBuilder::default();
1025 builder.register_core_with_meta("a", &[], module_a.clone() as Arc<dyn Module>);
1026 builder.register_core_with_meta("b", &["a"], module_b.clone() as Arc<dyn Module>);
1027 builder.register_core_with_meta("c", &["b"], module_c.clone() as Arc<dyn Module>);
1028
1029 builder.register_stateful_with_meta("a", module_a.clone() as Arc<dyn RunnableCapability>);
1030 builder.register_stateful_with_meta("b", module_b.clone() as Arc<dyn RunnableCapability>);
1031 builder.register_stateful_with_meta("c", module_c.clone() as Arc<dyn RunnableCapability>);
1032
1033 let registry = builder.build_topo_sorted().unwrap();
1034
1035 let client_hub = Arc::new(ClientHub::new());
1036 let cancel = CancellationToken::new();
1037 let config_provider: Arc<dyn ConfigProvider> = Arc::new(EmptyConfigProvider);
1038
1039 let runtime = HostRuntime::new(
1040 registry,
1041 config_provider,
1042 DbOptions::None,
1043 client_hub,
1044 cancel.clone(),
1045 Uuid::new_v4(),
1046 None,
1047 );
1048
1049 runtime.run_stop_phase().await.unwrap();
1051
1052 assert_eq!(stopped.load(Ordering::SeqCst), 3);
1054 }
1055
    /// Config provider used by the tests: it has no configuration for any
    /// module.
    struct EmptyConfigProvider;
    impl ConfigProvider for EmptyConfigProvider {
        fn get_module_config(&self, _module_name: &str) -> Option<&serde_json::Value> {
            None
        }
    }
1062
1063 #[tokio::test]
1064 async fn test_post_init_runs_after_all_init_and_system_first() {
1065 #[derive(Clone)]
1066 struct TrackHooks {
1067 name: &'static str,
1068 events: Arc<Mutex<Vec<String>>>,
1069 }
1070
1071 #[async_trait::async_trait]
1072 impl Module for TrackHooks {
1073 async fn init(&self, _ctx: &ModuleCtx) -> anyhow::Result<()> {
1074 self.events.lock().await.push(format!("init:{}", self.name));
1075 Ok(())
1076 }
1077 }
1078
1079 #[async_trait::async_trait]
1080 impl SystemCapability for TrackHooks {
1081 fn pre_init(&self, _sys: &crate::runtime::SystemContext) -> anyhow::Result<()> {
1082 Ok(())
1083 }
1084
1085 async fn post_init(&self, _sys: &crate::runtime::SystemContext) -> anyhow::Result<()> {
1086 self.events
1087 .lock()
1088 .await
1089 .push(format!("post_init:{}", self.name));
1090 Ok(())
1091 }
1092 }
1093
1094 let events = Arc::new(Mutex::new(Vec::<String>::new()));
1095 let sys_a = Arc::new(TrackHooks {
1096 name: "sys_a",
1097 events: events.clone(),
1098 });
1099 let user_b = Arc::new(TrackHooks {
1100 name: "user_b",
1101 events: events.clone(),
1102 });
1103 let user_c = Arc::new(TrackHooks {
1104 name: "user_c",
1105 events: events.clone(),
1106 });
1107
1108 let mut builder = RegistryBuilder::default();
1109 builder.register_core_with_meta("sys_a", &[], sys_a.clone() as Arc<dyn Module>);
1110 builder.register_core_with_meta("user_b", &["sys_a"], user_b.clone() as Arc<dyn Module>);
1111 builder.register_core_with_meta("user_c", &["user_b"], user_c.clone() as Arc<dyn Module>);
1112 builder.register_system_with_meta("sys_a", sys_a.clone() as Arc<dyn SystemCapability>);
1113
1114 let registry = builder.build_topo_sorted().unwrap();
1115
1116 let client_hub = Arc::new(ClientHub::new());
1117 let cancel = CancellationToken::new();
1118 let config_provider: Arc<dyn ConfigProvider> = Arc::new(EmptyConfigProvider);
1119
1120 let runtime = HostRuntime::new(
1121 registry,
1122 config_provider,
1123 DbOptions::None,
1124 client_hub,
1125 cancel,
1126 Uuid::new_v4(),
1127 None,
1128 );
1129
1130 runtime.run_init_phase().await.unwrap();
1132 runtime.run_post_init_phase().await.unwrap();
1133
1134 let events = events.lock().await.clone();
1135 let first_post_init = events
1136 .iter()
1137 .position(|e| e.starts_with("post_init:"))
1138 .expect("expected post_init events");
1139 assert!(
1140 events[..first_post_init]
1141 .iter()
1142 .all(|e| e.starts_with("init:")),
1143 "expected all init events before post_init, got: {events:?}"
1144 );
1145
1146 assert_eq!(
1148 events,
1149 vec![
1150 "init:sys_a",
1151 "init:user_b",
1152 "init:user_c",
1153 "post_init:sys_a",
1154 ]
1155 );
1156 }
1157
1158 #[tokio::test]
1159 async fn test_stop_phase_provides_fresh_deadline_token() {
1160 use std::sync::atomic::AtomicBool;
1161
1162 struct TokenCheckModule {
1163 stop_was_called: AtomicBool,
1164 token_was_cancelled_on_entry: AtomicBool,
1165 }
1166
1167 #[async_trait::async_trait]
1168 impl Module for TokenCheckModule {
1169 async fn init(&self, _ctx: &ModuleCtx) -> anyhow::Result<()> {
1170 Ok(())
1171 }
1172 }
1173
1174 #[async_trait::async_trait]
1175 impl RunnableCapability for TokenCheckModule {
1176 async fn start(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
1177 Ok(())
1178 }
1179 async fn stop(&self, deadline_token: CancellationToken) -> anyhow::Result<()> {
1180 self.stop_was_called.store(true, Ordering::SeqCst);
1182 self.token_was_cancelled_on_entry
1184 .store(deadline_token.is_cancelled(), Ordering::SeqCst);
1185 Ok(())
1186 }
1187 }
1188
1189 let module = Arc::new(TokenCheckModule {
1190 stop_was_called: AtomicBool::new(false),
1191 token_was_cancelled_on_entry: AtomicBool::new(true),
1193 });
1194
1195 let mut builder = RegistryBuilder::default();
1196 builder.register_core_with_meta("test", &[], module.clone() as Arc<dyn Module>);
1197 builder.register_stateful_with_meta("test", module.clone() as Arc<dyn RunnableCapability>);
1198
1199 let registry = builder.build_topo_sorted().unwrap();
1200 let client_hub = Arc::new(ClientHub::new());
1201 let cancel = CancellationToken::new();
1202 let config_provider: Arc<dyn ConfigProvider> = Arc::new(EmptyConfigProvider);
1203
1204 let runtime = HostRuntime::new(
1205 registry,
1206 config_provider,
1207 DbOptions::None,
1208 client_hub,
1209 cancel.clone(),
1210 Uuid::new_v4(),
1211 None,
1212 );
1213
1214 runtime.run_stop_phase().await.unwrap();
1216
1217 assert!(
1219 module.stop_was_called.load(Ordering::SeqCst),
1220 "stop() was never called - module may not have been registered correctly"
1221 );
1222
1223 assert!(
1226 !module.token_was_cancelled_on_entry.load(Ordering::SeqCst),
1227 "deadline_token should NOT be cancelled when stop() is called - this enables graceful shutdown"
1228 );
1229 }
1230
1231 #[tokio::test]
1232 async fn test_stop_phase_graceful_shutdown_completes_before_deadline() {
1233 use std::sync::atomic::AtomicBool;
1234 use std::time::Duration;
1235
1236 struct GracefulModule {
1237 graceful_completed: AtomicBool,
1238 deadline_fired: AtomicBool,
1239 }
1240
1241 #[async_trait::async_trait]
1242 impl Module for GracefulModule {
1243 async fn init(&self, _ctx: &ModuleCtx) -> anyhow::Result<()> {
1244 Ok(())
1245 }
1246 }
1247
1248 #[async_trait::async_trait]
1249 impl RunnableCapability for GracefulModule {
1250 async fn start(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
1251 Ok(())
1252 }
1253 async fn stop(&self, deadline_token: CancellationToken) -> anyhow::Result<()> {
1254 tokio::select! {
1256 () = tokio::time::sleep(Duration::from_millis(10)) => {
1257 self.graceful_completed.store(true, Ordering::SeqCst);
1258 }
1259 () = deadline_token.cancelled() => {
1260 self.deadline_fired.store(true, Ordering::SeqCst);
1261 }
1262 }
1263 Ok(())
1264 }
1265 }
1266
1267 let module = Arc::new(GracefulModule {
1268 graceful_completed: AtomicBool::new(false),
1269 deadline_fired: AtomicBool::new(false),
1270 });
1271
1272 let mut builder = RegistryBuilder::default();
1273 builder.register_core_with_meta("test", &[], module.clone() as Arc<dyn Module>);
1274 builder.register_stateful_with_meta("test", module.clone() as Arc<dyn RunnableCapability>);
1275
1276 let registry = builder.build_topo_sorted().unwrap();
1277 let client_hub = Arc::new(ClientHub::new());
1278 let cancel = CancellationToken::new();
1279 let config_provider: Arc<dyn ConfigProvider> = Arc::new(EmptyConfigProvider);
1280
1281 let runtime = HostRuntime::new(
1283 registry,
1284 config_provider,
1285 DbOptions::None,
1286 client_hub,
1287 cancel.clone(),
1288 Uuid::new_v4(),
1289 None,
1290 )
1291 .with_shutdown_deadline(Duration::from_secs(5));
1292
1293 runtime.run_stop_phase().await.unwrap();
1294
1295 assert!(
1297 module.graceful_completed.load(Ordering::SeqCst),
1298 "graceful shutdown should complete"
1299 );
1300 assert!(
1302 !module.deadline_fired.load(Ordering::SeqCst),
1303 "deadline should not fire when graceful shutdown completes quickly"
1304 );
1305 }
1306
1307 #[tokio::test]
1308 async fn test_stop_phase_deadline_fires_for_slow_module() {
1309 use std::sync::atomic::AtomicBool;
1310 use std::time::Duration;
1311
1312 struct SlowModule {
1313 graceful_completed: AtomicBool,
1314 deadline_fired: AtomicBool,
1315 }
1316
1317 #[async_trait::async_trait]
1318 impl Module for SlowModule {
1319 async fn init(&self, _ctx: &ModuleCtx) -> anyhow::Result<()> {
1320 Ok(())
1321 }
1322 }
1323
1324 #[async_trait::async_trait]
1325 impl RunnableCapability for SlowModule {
1326 async fn start(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
1327 Ok(())
1328 }
1329 async fn stop(&self, deadline_token: CancellationToken) -> anyhow::Result<()> {
1330 tokio::select! {
1332 () = tokio::time::sleep(Duration::from_secs(10)) => {
1333 self.graceful_completed.store(true, Ordering::SeqCst);
1334 }
1335 () = deadline_token.cancelled() => {
1336 self.deadline_fired.store(true, Ordering::SeqCst);
1337 }
1338 }
1339 Ok(())
1340 }
1341 }
1342
1343 let module = Arc::new(SlowModule {
1344 graceful_completed: AtomicBool::new(false),
1345 deadline_fired: AtomicBool::new(false),
1346 });
1347
1348 let mut builder = RegistryBuilder::default();
1349 builder.register_core_with_meta("test", &[], module.clone() as Arc<dyn Module>);
1350 builder.register_stateful_with_meta("test", module.clone() as Arc<dyn RunnableCapability>);
1351
1352 let registry = builder.build_topo_sorted().unwrap();
1353 let client_hub = Arc::new(ClientHub::new());
1354 let cancel = CancellationToken::new();
1355 let config_provider: Arc<dyn ConfigProvider> = Arc::new(EmptyConfigProvider);
1356
1357 let runtime = HostRuntime::new(
1359 registry,
1360 config_provider,
1361 DbOptions::None,
1362 client_hub,
1363 cancel.clone(),
1364 Uuid::new_v4(),
1365 None,
1366 )
1367 .with_shutdown_deadline(Duration::from_millis(100));
1368
1369 runtime.run_stop_phase().await.unwrap();
1370
1371 assert!(
1373 !module.graceful_completed.load(Ordering::SeqCst),
1374 "graceful shutdown should not complete when deadline fires first"
1375 );
1376 assert!(
1378 module.deadline_fired.load(Ordering::SeqCst),
1379 "deadline should fire for slow modules"
1380 );
1381 }
1382}