1use axum::Router;
17use std::collections::HashSet;
18use std::sync::Arc;
19
20use tokio_util::sync::CancellationToken;
21use uuid::Uuid;
22
23use crate::backends::OopSpawnConfig;
24use crate::client_hub::ClientHub;
25use crate::config::ConfigProvider;
26use crate::context::ModuleContextBuilder;
27use crate::registry::{
28 ApiGatewayCap, GrpcHubCap, ModuleEntry, ModuleRegistry, RegistryError, RestApiCap, RunnableCap,
29 SystemCap,
30};
31use crate::runtime::{GrpcInstallerStore, ModuleManager, OopSpawnOptions, SystemContext};
32
33#[cfg(feature = "db")]
34use crate::registry::DatabaseCap;
35
/// Database configuration handed to [`HostRuntime::new`].
///
/// The runtime turns this into the optional DB manager given to the
/// [`ModuleContextBuilder`]; with the `db` feature the migration phase also
/// uses the manager to look up each module's DB handle.
#[derive(Clone)]
pub enum DbOptions {
    /// No database: the context builder receives no DB manager.
    None,
    /// Shared DB manager used to resolve per-module DB handles
    /// (only available with the `db` feature).
    #[cfg(feature = "db")]
    Manager(Arc<modkit_db::DbManager>),
}
45
/// Selects which lifecycle phases the host runtime executes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RunMode {
    /// Every phase: pre-init, DB (with the `db` feature), init, post-init,
    /// REST, gRPC, start and OoP spawn; then wait for cancellation and stop.
    Full,
    /// Only pre-init and the DB migration phase, then return.
    MigrateOnly,
}
54
/// Environment variable through which a spawned out-of-process module receives
/// the bound gRPC hub (directory) endpoint, when one is available.
pub const MODKIT_DIRECTORY_ENDPOINT_ENV: &str = "MODKIT_DIRECTORY_ENDPOINT";

/// Environment variable through which a spawned out-of-process module receives
/// its rendered JSON configuration.
pub const MODKIT_MODULE_CONFIG_ENV: &str = "MODKIT_MODULE_CONFIG";

/// Default per-module grace period of the stop phase (35 seconds).
///
/// When a module's `stop()` has not returned after this long, the deadline
/// token it was given is cancelled. Override it with
/// `HostRuntime::with_shutdown_deadline`.
pub const DEFAULT_SHUTDOWN_DEADLINE: std::time::Duration =
    std::time::Duration::from_millis(35_000);
67
/// Drives the module lifecycle over a [`ModuleRegistry`]: pre-init, DB
/// migrations, init, post-init, REST and gRPC wiring, start, out-of-process
/// spawning and, after cancellation, stop.
pub struct HostRuntime {
    /// Registered modules and their capabilities.
    registry: ModuleRegistry,
    /// Builds the per-module context passed to lifecycle hooks.
    ctx_builder: ModuleContextBuilder,
    /// Identifier of this host instance, forwarded to contexts.
    instance_id: Uuid,
    /// Shared with system modules through `SystemContext`.
    module_manager: Arc<ModuleManager>,
    /// Receives the gRPC installers collected in the gRPC phase.
    grpc_installers: Arc<GrpcInstallerStore>,
    // Not read by the runtime itself (the context builder gets its own clone);
    // stored so the runtime keeps a reference for its lifetime.
    #[allow(dead_code)]
    client_hub: Arc<ClientHub>,
    /// Root cancellation token: checked to abort startup phases early and
    /// awaited before the stop phase.
    cancel: CancellationToken,
    // Only read by the `db`-feature migration phase, so it is dead code in
    // builds without that feature.
    #[allow(dead_code)]
    db_options: DbOptions,
    /// Out-of-process modules to spawn after the start phase, if any.
    oop_options: Option<OopSpawnOptions>,
    /// Per-module grace period used by the stop phase.
    shutdown_deadline: std::time::Duration,
}
87
88impl HostRuntime {
89 pub fn new(
93 registry: ModuleRegistry,
94 modules_cfg: Arc<dyn ConfigProvider>,
95 db_options: DbOptions,
96 client_hub: Arc<ClientHub>,
97 cancel: CancellationToken,
98 instance_id: Uuid,
99 oop_options: Option<OopSpawnOptions>,
100 ) -> Self {
101 let module_manager = Arc::new(ModuleManager::new());
103 let grpc_installers = Arc::new(GrpcInstallerStore::new());
104
105 let db_manager = match &db_options {
107 #[cfg(feature = "db")]
108 DbOptions::Manager(mgr) => Some(mgr.clone()),
109 DbOptions::None => None,
110 };
111
112 let ctx_builder = ModuleContextBuilder::new(
113 instance_id,
114 modules_cfg,
115 client_hub.clone(),
116 cancel.clone(),
117 db_manager,
118 );
119
120 Self {
121 registry,
122 ctx_builder,
123 instance_id,
124 module_manager,
125 grpc_installers,
126 client_hub,
127 cancel,
128 db_options,
129 oop_options,
130 shutdown_deadline: DEFAULT_SHUTDOWN_DEADLINE,
131 }
132 }
133
134 #[must_use]
150 pub fn with_shutdown_deadline(mut self, deadline: std::time::Duration) -> Self {
151 self.shutdown_deadline = deadline;
152 self
153 }
154
155 pub fn run_pre_init_phase(&self) -> Result<(), RegistryError> {
162 tracing::info!("Phase: pre_init");
163
164 let sys_ctx = SystemContext::new(
165 self.instance_id,
166 Arc::clone(&self.module_manager),
167 Arc::clone(&self.grpc_installers),
168 );
169
170 for entry in self.registry.modules() {
171 if self.cancel.is_cancelled() {
173 tracing::warn!("Pre-init phase cancelled by signal");
174 return Err(RegistryError::Cancelled);
175 }
176
177 if let Some(sys_mod) = entry.caps.query::<SystemCap>() {
178 tracing::debug!(module = entry.name, "Running system pre_init");
179 sys_mod
180 .pre_init(&sys_ctx)
181 .map_err(|e| RegistryError::PreInit {
182 module: entry.name,
183 source: e,
184 })?;
185 }
186 }
187
188 Ok(())
189 }
190
191 async fn module_context(
193 &self,
194 module_name: &'static str,
195 ) -> Result<crate::context::ModuleCtx, RegistryError> {
196 self.ctx_builder
197 .for_module(module_name)
198 .await
199 .map_err(|e| RegistryError::DbMigrate {
200 module: module_name,
201 source: e,
202 })
203 }
204
205 #[cfg(feature = "db")]
207 async fn db_migration_target(
208 &self,
209 module_name: &'static str,
210 ctx: &crate::context::ModuleCtx,
211 db_module: Option<Arc<dyn crate::contracts::DatabaseCapability>>,
212 ) -> Result<Option<(modkit_db::Db, Arc<dyn crate::contracts::DatabaseCapability>)>, RegistryError>
213 {
214 let Some(dbm) = db_module else {
215 return Ok(None);
216 };
217
218 let db = match &self.db_options {
222 DbOptions::None => None,
223 #[cfg(feature = "db")]
224 DbOptions::Manager(mgr) => {
225 mgr.get(module_name)
226 .await
227 .map_err(|e| RegistryError::DbMigrate {
228 module: module_name,
229 source: e.into(),
230 })?
231 }
232 };
233
234 _ = ctx; Ok(db.map(|db| (db, dbm)))
236 }
237
238 #[cfg(feature = "db")]
243 async fn migrate_module(
244 module_name: &'static str,
245 db: &modkit_db::Db,
246 db_module: Arc<dyn crate::contracts::DatabaseCapability>,
247 ) -> Result<(), RegistryError> {
248 let migrations = db_module.migrations();
250
251 if migrations.is_empty() {
252 tracing::debug!(module = module_name, "No migrations to run");
253 return Ok(());
254 }
255
256 tracing::debug!(
257 module = module_name,
258 count = migrations.len(),
259 "Running DB migrations"
260 );
261
262 let result =
264 modkit_db::migration_runner::run_migrations_for_module(db, module_name, migrations)
265 .await
266 .map_err(|e| RegistryError::DbMigrate {
267 module: module_name,
268 source: anyhow::Error::new(e),
269 })?;
270
271 tracing::info!(
272 module = module_name,
273 applied = result.applied,
274 skipped = result.skipped,
275 "DB migrations completed"
276 );
277
278 Ok(())
279 }
280
281 #[cfg(feature = "db")]
290 async fn run_db_phase(&self) -> Result<(), RegistryError> {
291 tracing::info!("Phase: db (before init)");
292
293 for entry in self.registry.modules_by_system_priority() {
294 if self.cancel.is_cancelled() {
296 tracing::warn!("DB migration phase cancelled by signal");
297 return Err(RegistryError::Cancelled);
298 }
299
300 let ctx = self.module_context(entry.name).await?;
301 let db_module = entry.caps.query::<DatabaseCap>();
302
303 match self
304 .db_migration_target(entry.name, &ctx, db_module.clone())
305 .await?
306 {
307 Some((db, dbm)) => {
308 Self::migrate_module(entry.name, &db, dbm).await?;
309 }
310 None if db_module.is_some() => {
311 tracing::debug!(
312 module = entry.name,
313 "Module has DbModule trait but no DB handle (no config)"
314 );
315 }
316 None => {}
317 }
318 }
319
320 Ok(())
321 }
322
323 async fn run_init_phase(&self) -> Result<(), RegistryError> {
327 tracing::info!("Phase: init");
328
329 for entry in self.registry.modules_by_system_priority() {
330 let ctx =
331 self.ctx_builder
332 .for_module(entry.name)
333 .await
334 .map_err(|e| RegistryError::Init {
335 module: entry.name,
336 source: e,
337 })?;
338 tracing::info!(module = entry.name, "Initializing a module...");
339 entry
340 .core
341 .init(&ctx)
342 .await
343 .map_err(|e| RegistryError::Init {
344 module: entry.name,
345 source: e,
346 })?;
347 tracing::info!(module = entry.name, "Initialized a module.");
348 }
349
350 Ok(())
351 }
352
353 async fn run_post_init_phase(&self) -> Result<(), RegistryError> {
360 tracing::info!("Phase: post_init");
361
362 let sys_ctx = SystemContext::new(
363 self.instance_id,
364 Arc::clone(&self.module_manager),
365 Arc::clone(&self.grpc_installers),
366 );
367
368 for entry in self.registry.modules_by_system_priority() {
369 if let Some(sys_mod) = entry.caps.query::<SystemCap>() {
370 sys_mod
371 .post_init(&sys_ctx)
372 .await
373 .map_err(|e| RegistryError::PostInit {
374 module: entry.name,
375 source: e,
376 })?;
377 }
378 }
379
380 Ok(())
381 }
382
    /// Builds the REST router through the single API-gateway host module.
    ///
    /// Steps: validate that there is exactly one host; let the host prepare
    /// the router; let every REST-capable module (in registry order) register
    /// its routes against the host's OpenAPI registry; let the host finalize
    /// the router.
    ///
    /// With no host, an empty router is returned unless some module wants to
    /// register REST routes.
    ///
    /// # Errors
    /// - `RestRequiresHost` when REST modules exist but there is no host.
    /// - `MultipleRestHosts` when more than one host is present.
    /// - `RestPrepare`, `RestRegister` or `RestFinalize` when building a
    ///   context or the corresponding host/module call fails.
    async fn run_rest_phase(&self) -> Result<Router, RegistryError> {
        tracing::info!("Phase: rest (sync)");

        let mut router = Router::new();

        let host_count = self
            .registry
            .modules()
            .iter()
            .filter(|e| e.caps.has::<ApiGatewayCap>())
            .count();

        match host_count {
            0 => {
                // No host: only acceptable if no module wants REST routes.
                return if self
                    .registry
                    .modules()
                    .iter()
                    .any(|e| e.caps.has::<RestApiCap>())
                {
                    Err(RegistryError::RestRequiresHost)
                } else {
                    Ok(router)
                };
            }
            1 => {
                // Exactly one host: continue below.
            }
            _ => return Err(RegistryError::MultipleRestHosts),
        }

        // Both lookups below are guaranteed by the count check above; the
        // error variants only guard against that invariant being broken.
        let host_idx = self
            .registry
            .modules()
            .iter()
            .position(|e| e.caps.has::<ApiGatewayCap>())
            .ok_or(RegistryError::RestHostNotFoundAfterValidation)?;
        let host_entry = &self.registry.modules()[host_idx];
        let Some(host) = host_entry.caps.query::<ApiGatewayCap>() else {
            return Err(RegistryError::RestHostMissingFromEntry);
        };
        let host_ctx = self
            .ctx_builder
            .for_module(host_entry.name)
            .await
            .map_err(|e| RegistryError::RestPrepare {
                module: host_entry.name,
                source: e,
            })?;

        // OpenAPI registry exposed by the host; passed to every REST module.
        let registry: &dyn crate::contracts::OpenApiRegistry = host.as_registry();

        router =
            host.rest_prepare(&host_ctx, router)
                .map_err(|source| RegistryError::RestPrepare {
                    module: host_entry.name,
                    source,
                })?;

        // The router is threaded through each module's registration in turn.
        for e in self.registry.modules() {
            if let Some(rest) = e.caps.query::<RestApiCap>() {
                let ctx = self.ctx_builder.for_module(e.name).await.map_err(|err| {
                    RegistryError::RestRegister {
                        module: e.name,
                        source: err,
                    }
                })?;

                router = rest
                    .register_rest(&ctx, router, registry)
                    .map_err(|source| RegistryError::RestRegister {
                        module: e.name,
                        source,
                    })?;
            }
        }

        router = host.rest_finalize(&host_ctx, router).map_err(|source| {
            RegistryError::RestFinalize {
                module: host_entry.name,
                source,
            }
        })?;

        Ok(router)
    }
479
480 async fn run_grpc_phase(&self) -> Result<(), RegistryError> {
484 tracing::info!("Phase: grpc (registration)");
485
486 if self.registry.grpc_hub.is_none() && self.registry.grpc_services.is_empty() {
488 return Ok(());
489 }
490
491 if self.registry.grpc_hub.is_none() && !self.registry.grpc_services.is_empty() {
493 return Err(RegistryError::GrpcRequiresHub);
494 }
495
496 if let Some(hub_name) = &self.registry.grpc_hub {
498 let mut modules_data = Vec::new();
499 let mut seen = HashSet::new();
500
501 for (module_name, service_module) in &self.registry.grpc_services {
503 let ctx = self
504 .ctx_builder
505 .for_module(module_name)
506 .await
507 .map_err(|err| RegistryError::GrpcRegister {
508 module: module_name.clone(),
509 source: err,
510 })?;
511
512 let installers =
513 service_module
514 .get_grpc_services(&ctx)
515 .await
516 .map_err(|source| RegistryError::GrpcRegister {
517 module: module_name.clone(),
518 source,
519 })?;
520
521 for reg in &installers {
522 if !seen.insert(reg.service_name) {
523 return Err(RegistryError::GrpcRegister {
524 module: module_name.clone(),
525 source: anyhow::anyhow!(
526 "Duplicate gRPC service name: {}",
527 reg.service_name
528 ),
529 });
530 }
531 }
532
533 modules_data.push(crate::runtime::ModuleInstallers {
534 module_name: module_name.clone(),
535 installers,
536 });
537 }
538
539 self.grpc_installers
540 .set(crate::runtime::GrpcInstallerData {
541 modules: modules_data,
542 })
543 .map_err(|source| RegistryError::GrpcRegister {
544 module: hub_name.clone(),
545 source,
546 })?;
547 }
548
549 Ok(())
550 }
551
552 async fn run_start_phase(&self) -> Result<(), RegistryError> {
556 tracing::info!("Phase: start");
557
558 for e in self.registry.modules_by_system_priority() {
559 if let Some(s) = e.caps.query::<RunnableCap>() {
560 tracing::debug!(
561 module = e.name,
562 is_system = e.caps.has::<SystemCap>(),
563 "Starting stateful module"
564 );
565 s.start(self.cancel.clone())
566 .await
567 .map_err(|source| RegistryError::Start {
568 module: e.name,
569 source,
570 })?;
571 tracing::info!(module = e.name, "Started module");
572 }
573 }
574
575 Ok(())
576 }
577
578 async fn stop_one_module(entry: &ModuleEntry, cancel: CancellationToken) {
580 if let Some(s) = entry.caps.query::<RunnableCap>() {
581 match s.stop(cancel).await {
582 Err(err) => {
583 tracing::warn!(module = entry.name, error = %err, "Failed to stop module");
584 }
585 _ => {
586 tracing::info!(module = entry.name, "Stopped module");
587 }
588 }
589 }
590 }
591
    /// Stops modules one at a time in reverse registry order.
    ///
    /// Each module gets a fresh, un-cancelled deadline token. A spawned timer
    /// task cancels that token if `stop()` is still running after
    /// `shutdown_deadline`, signalling a hard stop. The timer is aborted once
    /// `stop()` returns.
    ///
    /// Stop failures are only logged (see `stop_one_module`), so this
    /// currently always returns `Ok(())`.
    async fn run_stop_phase(&self) -> Result<(), RegistryError> {
        tracing::info!("Phase: stop");

        let deadline = self.shutdown_deadline;

        for e in self.registry.modules().iter().rev() {
            // Copied out so the spawned timer task can log it.
            let module_name = e.name;

            // Fresh per module: the module sees an un-cancelled token on entry
            // and can shut down gracefully until the deadline elapses.
            let deadline_token = CancellationToken::new();
            let deadline_token_for_timeout = deadline_token.clone();

            let deadline_task = tokio::spawn(async move {
                tokio::time::sleep(deadline).await;
                tracing::warn!(
                    module = module_name,
                    deadline_secs = deadline.as_secs(),
                    "Module shutdown deadline reached, sending hard-stop signal"
                );
                deadline_token_for_timeout.cancel();
            });

            Self::stop_one_module(e, deadline_token).await;

            // stop() returned: cancel the pending timer so it cannot fire for
            // this module later.
            deadline_task.abort();
            // Awaiting an aborted task yields a JoinError; it is deliberately
            // ignored.
            #[allow(clippy::let_underscore_must_use)]
            let _ = deadline_task.await;
        }

        Ok(())
    }
653
654 async fn run_oop_spawn_phase(&self) -> Result<(), RegistryError> {
659 let oop_opts = match &self.oop_options {
660 Some(opts) if !opts.modules.is_empty() => opts,
661 _ => return Ok(()),
662 };
663
664 tracing::info!("Phase: oop_spawn");
665
666 let directory_endpoint = self.wait_for_grpc_hub_endpoint().await;
668
669 for module_cfg in &oop_opts.modules {
670 let mut env = module_cfg.env.clone();
673 env.insert(
674 MODKIT_MODULE_CONFIG_ENV.to_owned(),
675 module_cfg.rendered_config_json.clone(),
676 );
677 if let Some(ref endpoint) = directory_endpoint {
678 env.insert(MODKIT_DIRECTORY_ENDPOINT_ENV.to_owned(), endpoint.clone());
679 }
680
681 let args = module_cfg.args.clone();
683
684 let spawn_config = OopSpawnConfig {
685 module_name: module_cfg.module_name.clone(),
686 binary: module_cfg.binary.clone(),
687 args,
688 env,
689 working_directory: module_cfg.working_directory.clone(),
690 };
691
692 oop_opts
693 .backend
694 .spawn(spawn_config)
695 .await
696 .map_err(|e| RegistryError::OopSpawn {
697 module: module_cfg.module_name.clone(),
698 source: e,
699 })?;
700
701 tracing::info!(
702 module = %module_cfg.module_name,
703 directory_endpoint = ?directory_endpoint,
704 "Spawned OoP module via backend"
705 );
706 }
707
708 Ok(())
709 }
710
711 async fn wait_for_grpc_hub_endpoint(&self) -> Option<String> {
716 const POLL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(10);
717 const MAX_WAIT: std::time::Duration = std::time::Duration::from_secs(5);
718
719 let grpc_hub = self
721 .registry
722 .modules()
723 .iter()
724 .find_map(|e| e.caps.query::<GrpcHubCap>());
725
726 let Some(hub) = grpc_hub else {
727 return None; };
729
730 let start = std::time::Instant::now();
731
732 loop {
733 if let Some(endpoint) = hub.bound_endpoint() {
734 tracing::debug!(
735 endpoint = %endpoint,
736 elapsed_ms = start.elapsed().as_millis(),
737 "gRPC hub endpoint available"
738 );
739 return Some(endpoint);
740 }
741
742 if start.elapsed() > MAX_WAIT {
743 tracing::warn!("Timed out waiting for gRPC hub to bind");
744 return None;
745 }
746
747 tokio::time::sleep(POLL_INTERVAL).await;
748 }
749 }
750
751 pub async fn run_module_phases(self) -> anyhow::Result<()> {
760 self.run_phases_internal(RunMode::Full).await
761 }
762
763 pub async fn run_migration_phases(self) -> anyhow::Result<()> {
773 self.run_phases_internal(RunMode::MigrateOnly).await
774 }
775
776 async fn run_phases_internal(self, mode: RunMode) -> anyhow::Result<()> {
799 match mode {
801 RunMode::Full => {
802 tracing::info!("Running full lifecycle (all phases)");
803 }
804 RunMode::MigrateOnly => {
805 tracing::info!("Running in migration mode (pre-init + db phases only)");
806 }
807 }
808
809 self.run_pre_init_phase()?;
811
812 #[cfg(feature = "db")]
814 {
815 self.run_db_phase().await?;
816 }
817 #[cfg(not(feature = "db"))]
818 {
819 }
821
822 if mode == RunMode::MigrateOnly {
824 tracing::info!("Migration phases completed successfully");
825 return Ok(());
826 }
827
828 self.run_init_phase().await?;
830
831 self.run_post_init_phase().await?;
833
834 let _router = self.run_rest_phase().await?;
836
837 self.run_grpc_phase().await?;
839
840 self.run_start_phase().await?;
842
843 self.run_oop_spawn_phase().await?;
845
846 self.cancel.cancelled().await;
848
849 let stop_timeout = std::time::Duration::from_secs(15);
855 let disarm = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
856 let disarm_clone = std::sync::Arc::clone(&disarm);
857 std::thread::spawn(move || {
858 std::thread::sleep(stop_timeout);
859 if !disarm_clone.load(std::sync::atomic::Ordering::Relaxed) {
860 tracing::warn!(
861 timeout_secs = stop_timeout.as_secs(),
862 "shutdown: stop phase timed out, force exiting"
863 );
864 std::process::exit(1);
865 }
866 });
867
868 self.run_stop_phase().await?;
869 disarm.store(true, std::sync::atomic::Ordering::Relaxed);
870
871 Ok(())
872 }
873}
874
875#[cfg(test)]
876#[cfg_attr(coverage_nightly, coverage(off))]
877mod tests {
878 use super::*;
879 use crate::context::ModuleCtx;
880 use crate::contracts::{Module, RunnableCapability, SystemCapability};
881 use crate::registry::RegistryBuilder;
882 use std::sync::Arc;
883 use std::sync::atomic::{AtomicUsize, Ordering};
884 use tokio::sync::Mutex;
885
    /// Minimal no-op module. It is not referenced by the tests in this
    /// module, hence the `dead_code` allowance.
    #[derive(Default)]
    #[allow(dead_code)]
    struct DummyCore;
    #[async_trait::async_trait]
    impl Module for DummyCore {
        async fn init(&self, _ctx: &ModuleCtx) -> anyhow::Result<()> {
            Ok(())
        }
    }
895
896 struct StopOrderTracker {
897 my_order: usize,
898 stop_order: Arc<AtomicUsize>,
899 }
900
901 impl StopOrderTracker {
902 fn new(counter: &Arc<AtomicUsize>, stop_order: Arc<AtomicUsize>) -> Self {
903 let my_order = counter.fetch_add(1, Ordering::SeqCst);
904 Self {
905 my_order,
906 stop_order,
907 }
908 }
909 }
910
911 #[async_trait::async_trait]
912 impl Module for StopOrderTracker {
913 async fn init(&self, _ctx: &ModuleCtx) -> anyhow::Result<()> {
914 Ok(())
915 }
916 }
917
918 #[async_trait::async_trait]
919 impl RunnableCapability for StopOrderTracker {
920 async fn start(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
921 Ok(())
922 }
923 async fn stop(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
924 let order = self.stop_order.fetch_add(1, Ordering::SeqCst);
925 tracing::info!(
926 my_order = self.my_order,
927 stop_order = order,
928 "Module stopped"
929 );
930 Ok(())
931 }
932 }
933
934 #[tokio::test]
935 async fn test_stop_phase_reverse_order() {
936 let counter = Arc::new(AtomicUsize::new(0));
937 let stop_order = Arc::new(AtomicUsize::new(0));
938
939 let module_a = Arc::new(StopOrderTracker::new(&counter, stop_order.clone()));
940 let module_b = Arc::new(StopOrderTracker::new(&counter, stop_order.clone()));
941 let module_c = Arc::new(StopOrderTracker::new(&counter, stop_order.clone()));
942
943 let mut builder = RegistryBuilder::default();
944 builder.register_core_with_meta("a", &[], module_a.clone() as Arc<dyn Module>);
945 builder.register_core_with_meta("b", &["a"], module_b.clone() as Arc<dyn Module>);
946 builder.register_core_with_meta("c", &["b"], module_c.clone() as Arc<dyn Module>);
947
948 builder.register_stateful_with_meta("a", module_a.clone() as Arc<dyn RunnableCapability>);
949 builder.register_stateful_with_meta("b", module_b.clone() as Arc<dyn RunnableCapability>);
950 builder.register_stateful_with_meta("c", module_c.clone() as Arc<dyn RunnableCapability>);
951
952 let registry = builder.build_topo_sorted().unwrap();
953
954 let module_names: Vec<_> = registry.modules().iter().map(|m| m.name).collect();
956 assert_eq!(module_names, vec!["a", "b", "c"]);
957
958 let client_hub = Arc::new(ClientHub::new());
959 let cancel = CancellationToken::new();
960 let config_provider: Arc<dyn ConfigProvider> = Arc::new(EmptyConfigProvider);
961
962 let runtime = HostRuntime::new(
963 registry,
964 config_provider,
965 DbOptions::None,
966 client_hub,
967 cancel.clone(),
968 Uuid::new_v4(),
969 None,
970 );
971
972 runtime.run_stop_phase().await.unwrap();
974
975 assert_eq!(stop_order.load(Ordering::SeqCst), 3);
979 }
980
    /// A module whose `stop()` fails must not prevent the others from being
    /// stopped, and the stop phase itself still returns `Ok`.
    #[tokio::test]
    async fn test_stop_phase_continues_on_error() {
        // Runnable module that counts `stop()` calls and optionally fails.
        struct FailingModule {
            should_fail: bool,
            stopped: Arc<AtomicUsize>,
        }

        #[async_trait::async_trait]
        impl Module for FailingModule {
            async fn init(&self, _ctx: &ModuleCtx) -> anyhow::Result<()> {
                Ok(())
            }
        }

        #[async_trait::async_trait]
        impl RunnableCapability for FailingModule {
            async fn start(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
                Ok(())
            }
            async fn stop(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
                // Count before failing so failed stops are still observed.
                self.stopped.fetch_add(1, Ordering::SeqCst);
                if self.should_fail {
                    anyhow::bail!("Intentional failure")
                }
                Ok(())
            }
        }

        let stopped = Arc::new(AtomicUsize::new(0));
        let module_a = Arc::new(FailingModule {
            should_fail: false,
            stopped: stopped.clone(),
        });
        // The middle module of the chain fails, so modules are stopped both
        // before and after the failure.
        let module_b = Arc::new(FailingModule {
            should_fail: true,
            stopped: stopped.clone(),
        });
        let module_c = Arc::new(FailingModule {
            should_fail: false,
            stopped: stopped.clone(),
        });

        let mut builder = RegistryBuilder::default();
        builder.register_core_with_meta("a", &[], module_a.clone() as Arc<dyn Module>);
        builder.register_core_with_meta("b", &["a"], module_b.clone() as Arc<dyn Module>);
        builder.register_core_with_meta("c", &["b"], module_c.clone() as Arc<dyn Module>);

        builder.register_stateful_with_meta("a", module_a.clone() as Arc<dyn RunnableCapability>);
        builder.register_stateful_with_meta("b", module_b.clone() as Arc<dyn RunnableCapability>);
        builder.register_stateful_with_meta("c", module_c.clone() as Arc<dyn RunnableCapability>);

        let registry = builder.build_topo_sorted().unwrap();

        let client_hub = Arc::new(ClientHub::new());
        let cancel = CancellationToken::new();
        let config_provider: Arc<dyn ConfigProvider> = Arc::new(EmptyConfigProvider);

        let runtime = HostRuntime::new(
            registry,
            config_provider,
            DbOptions::None,
            client_hub,
            cancel.clone(),
            Uuid::new_v4(),
            None,
        );

        runtime.run_stop_phase().await.unwrap();

        // All three stop() calls happened despite b's failure.
        assert_eq!(stopped.load(Ordering::SeqCst), 3);
    }
1054
    /// Config provider with no module configuration: every lookup returns `None`.
    struct EmptyConfigProvider;
    impl ConfigProvider for EmptyConfigProvider {
        fn get_module_config(&self, _module_name: &str) -> Option<&serde_json::Value> {
            None
        }
    }
1061
    /// `post_init` hooks run only after every module's `init`, and only for
    /// modules registered with the system capability.
    #[tokio::test]
    async fn test_post_init_runs_after_all_init_and_system_first() {
        // Appends "init:<name>" / "post_init:<name>" to a shared event log.
        #[derive(Clone)]
        struct TrackHooks {
            name: &'static str,
            events: Arc<Mutex<Vec<String>>>,
        }

        #[async_trait::async_trait]
        impl Module for TrackHooks {
            async fn init(&self, _ctx: &ModuleCtx) -> anyhow::Result<()> {
                self.events.lock().await.push(format!("init:{}", self.name));
                Ok(())
            }
        }

        #[async_trait::async_trait]
        impl SystemCapability for TrackHooks {
            fn pre_init(&self, _sys: &crate::runtime::SystemContext) -> anyhow::Result<()> {
                Ok(())
            }

            async fn post_init(&self, _sys: &crate::runtime::SystemContext) -> anyhow::Result<()> {
                self.events
                    .lock()
                    .await
                    .push(format!("post_init:{}", self.name));
                Ok(())
            }
        }

        let events = Arc::new(Mutex::new(Vec::<String>::new()));
        let sys_a = Arc::new(TrackHooks {
            name: "sys_a",
            events: events.clone(),
        });
        let user_b = Arc::new(TrackHooks {
            name: "user_b",
            events: events.clone(),
        });
        let user_c = Arc::new(TrackHooks {
            name: "user_c",
            events: events.clone(),
        });

        let mut builder = RegistryBuilder::default();
        builder.register_core_with_meta("sys_a", &[], sys_a.clone() as Arc<dyn Module>);
        builder.register_core_with_meta("user_b", &["sys_a"], user_b.clone() as Arc<dyn Module>);
        builder.register_core_with_meta("user_c", &["user_b"], user_c.clone() as Arc<dyn Module>);
        // Only sys_a gets the system capability, so only it has post_init run.
        builder.register_system_with_meta("sys_a", sys_a.clone() as Arc<dyn SystemCapability>);

        let registry = builder.build_topo_sorted().unwrap();

        let client_hub = Arc::new(ClientHub::new());
        let cancel = CancellationToken::new();
        let config_provider: Arc<dyn ConfigProvider> = Arc::new(EmptyConfigProvider);

        let runtime = HostRuntime::new(
            registry,
            config_provider,
            DbOptions::None,
            client_hub,
            cancel,
            Uuid::new_v4(),
            None,
        );

        runtime.run_init_phase().await.unwrap();
        runtime.run_post_init_phase().await.unwrap();

        let events = events.lock().await.clone();
        let first_post_init = events
            .iter()
            .position(|e| e.starts_with("post_init:"))
            .expect("expected post_init events");
        assert!(
            events[..first_post_init]
                .iter()
                .all(|e| e.starts_with("init:")),
            "expected all init events before post_init, got: {events:?}"
        );

        // Full expected sequence: all inits in dependency order, then the
        // single system post_init.
        assert_eq!(
            events,
            vec![
                "init:sys_a",
                "init:user_b",
                "init:user_c",
                "post_init:sys_a",
            ]
        );
    }
1156
    /// `stop()` must receive a deadline token that is not yet cancelled, so
    /// the module has a chance to shut down gracefully.
    #[tokio::test]
    async fn test_stop_phase_provides_fresh_deadline_token() {
        use std::sync::atomic::AtomicBool;

        // Records whether stop() ran and the token's state on entry.
        struct TokenCheckModule {
            stop_was_called: AtomicBool,
            token_was_cancelled_on_entry: AtomicBool,
        }

        #[async_trait::async_trait]
        impl Module for TokenCheckModule {
            async fn init(&self, _ctx: &ModuleCtx) -> anyhow::Result<()> {
                Ok(())
            }
        }

        #[async_trait::async_trait]
        impl RunnableCapability for TokenCheckModule {
            async fn start(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
                Ok(())
            }
            async fn stop(&self, deadline_token: CancellationToken) -> anyhow::Result<()> {
                self.stop_was_called.store(true, Ordering::SeqCst);
                self.token_was_cancelled_on_entry
                    .store(deadline_token.is_cancelled(), Ordering::SeqCst);
                Ok(())
            }
        }

        let module = Arc::new(TokenCheckModule {
            stop_was_called: AtomicBool::new(false),
            // Starts `true` so the final assertion only passes if stop()
            // actually overwrote it.
            token_was_cancelled_on_entry: AtomicBool::new(true),
        });

        let mut builder = RegistryBuilder::default();
        builder.register_core_with_meta("test", &[], module.clone() as Arc<dyn Module>);
        builder.register_stateful_with_meta("test", module.clone() as Arc<dyn RunnableCapability>);

        let registry = builder.build_topo_sorted().unwrap();
        let client_hub = Arc::new(ClientHub::new());
        let cancel = CancellationToken::new();
        let config_provider: Arc<dyn ConfigProvider> = Arc::new(EmptyConfigProvider);

        let runtime = HostRuntime::new(
            registry,
            config_provider,
            DbOptions::None,
            client_hub,
            cancel.clone(),
            Uuid::new_v4(),
            None,
        );

        runtime.run_stop_phase().await.unwrap();

        assert!(
            module.stop_was_called.load(Ordering::SeqCst),
            "stop() was never called - module may not have been registered correctly"
        );

        assert!(
            !module.token_was_cancelled_on_entry.load(Ordering::SeqCst),
            "deadline_token should NOT be cancelled when stop() is called - this enables graceful shutdown"
        );
    }
1229
    /// A module whose graceful stop (10 ms) finishes well within the deadline
    /// (5 s) must complete without its deadline token being cancelled.
    #[tokio::test]
    async fn test_stop_phase_graceful_shutdown_completes_before_deadline() {
        use std::sync::atomic::AtomicBool;
        use std::time::Duration;

        // Records which select! branch won inside stop().
        struct GracefulModule {
            graceful_completed: AtomicBool,
            deadline_fired: AtomicBool,
        }

        #[async_trait::async_trait]
        impl Module for GracefulModule {
            async fn init(&self, _ctx: &ModuleCtx) -> anyhow::Result<()> {
                Ok(())
            }
        }

        #[async_trait::async_trait]
        impl RunnableCapability for GracefulModule {
            async fn start(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
                Ok(())
            }
            async fn stop(&self, deadline_token: CancellationToken) -> anyhow::Result<()> {
                // Race a short "graceful" shutdown against the deadline token.
                tokio::select! {
                    () = tokio::time::sleep(Duration::from_millis(10)) => {
                        self.graceful_completed.store(true, Ordering::SeqCst);
                    }
                    () = deadline_token.cancelled() => {
                        self.deadline_fired.store(true, Ordering::SeqCst);
                    }
                }
                Ok(())
            }
        }

        let module = Arc::new(GracefulModule {
            graceful_completed: AtomicBool::new(false),
            deadline_fired: AtomicBool::new(false),
        });

        let mut builder = RegistryBuilder::default();
        builder.register_core_with_meta("test", &[], module.clone() as Arc<dyn Module>);
        builder.register_stateful_with_meta("test", module.clone() as Arc<dyn RunnableCapability>);

        let registry = builder.build_topo_sorted().unwrap();
        let client_hub = Arc::new(ClientHub::new());
        let cancel = CancellationToken::new();
        let config_provider: Arc<dyn ConfigProvider> = Arc::new(EmptyConfigProvider);

        // Generous deadline: the graceful branch should always win.
        let runtime = HostRuntime::new(
            registry,
            config_provider,
            DbOptions::None,
            client_hub,
            cancel.clone(),
            Uuid::new_v4(),
            None,
        )
        .with_shutdown_deadline(Duration::from_secs(5));

        runtime.run_stop_phase().await.unwrap();

        assert!(
            module.graceful_completed.load(Ordering::SeqCst),
            "graceful shutdown should complete"
        );
        assert!(
            !module.deadline_fired.load(Ordering::SeqCst),
            "deadline should not fire when graceful shutdown completes quickly"
        );
    }
1305
    /// A module whose graceful stop (10 s) exceeds the deadline (100 ms) must
    /// observe its deadline token being cancelled before it finishes.
    #[tokio::test]
    async fn test_stop_phase_deadline_fires_for_slow_module() {
        use std::sync::atomic::AtomicBool;
        use std::time::Duration;

        // Records which select! branch won inside stop().
        struct SlowModule {
            graceful_completed: AtomicBool,
            deadline_fired: AtomicBool,
        }

        #[async_trait::async_trait]
        impl Module for SlowModule {
            async fn init(&self, _ctx: &ModuleCtx) -> anyhow::Result<()> {
                Ok(())
            }
        }

        #[async_trait::async_trait]
        impl RunnableCapability for SlowModule {
            async fn start(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
                Ok(())
            }
            async fn stop(&self, deadline_token: CancellationToken) -> anyhow::Result<()> {
                // The sleep is far longer than the deadline, so the token's
                // cancellation should win the race.
                tokio::select! {
                    () = tokio::time::sleep(Duration::from_secs(10)) => {
                        self.graceful_completed.store(true, Ordering::SeqCst);
                    }
                    () = deadline_token.cancelled() => {
                        self.deadline_fired.store(true, Ordering::SeqCst);
                    }
                }
                Ok(())
            }
        }

        let module = Arc::new(SlowModule {
            graceful_completed: AtomicBool::new(false),
            deadline_fired: AtomicBool::new(false),
        });

        let mut builder = RegistryBuilder::default();
        builder.register_core_with_meta("test", &[], module.clone() as Arc<dyn Module>);
        builder.register_stateful_with_meta("test", module.clone() as Arc<dyn RunnableCapability>);

        let registry = builder.build_topo_sorted().unwrap();
        let client_hub = Arc::new(ClientHub::new());
        let cancel = CancellationToken::new();
        let config_provider: Arc<dyn ConfigProvider> = Arc::new(EmptyConfigProvider);

        // Short deadline keeps the test fast.
        let runtime = HostRuntime::new(
            registry,
            config_provider,
            DbOptions::None,
            client_hub,
            cancel.clone(),
            Uuid::new_v4(),
            None,
        )
        .with_shutdown_deadline(Duration::from_millis(100));

        runtime.run_stop_phase().await.unwrap();

        assert!(
            !module.graceful_completed.load(Ordering::SeqCst),
            "graceful shutdown should not complete when deadline fires first"
        );
        assert!(
            module.deadline_fired.load(Ordering::SeqCst),
            "deadline should fire for slow modules"
        );
    }
1381}