module_orchestrator/
module.rs1use anyhow::Result;
4use async_trait::async_trait;
5use std::sync::{Arc, OnceLock};
6use tokio::sync::RwLock;
7
8use modkit::DirectoryClient;
9use modkit::context::ModuleCtx;
10use modkit::contracts::{
11 GrpcServiceCapability, OpenApiRegistry, RegisterGrpcServiceFn, RestApiCapability,
12 SystemCapability,
13};
14use modkit::directory::LocalDirectoryClient;
15use modkit::registry::ModuleRegistry;
16use modkit::runtime::ModuleManager;
17
18use cf_system_sdks::directory::DIRECTORY_SERVICE_NAME;
19
20use crate::domain::service::ModulesService;
21use crate::server;
22
23#[derive(Clone, Debug, Default, serde::Deserialize)]
25#[allow(
26 clippy::empty_structs_with_brackets,
27 reason = "empty struct is required for config deserialization"
28)]
29pub struct ModuleOrchestratorConfig {}
30
31#[modkit::module(
39 name = "module-orchestrator",
40 capabilities = [grpc, system, rest],
41 client = cf_system_sdks::directory::DirectoryClient
42)]
43pub struct ModuleOrchestrator {
44 config: RwLock<ModuleOrchestratorConfig>,
45 directory_api: OnceLock<Arc<dyn DirectoryClient>>,
46 module_manager: OnceLock<Arc<ModuleManager>>,
47 modules_service: OnceLock<Arc<ModulesService>>,
48}
49
50impl Default for ModuleOrchestrator {
51 fn default() -> Self {
52 Self {
53 config: RwLock::new(ModuleOrchestratorConfig {}),
54 directory_api: OnceLock::new(),
55 module_manager: OnceLock::new(),
56 modules_service: OnceLock::new(),
57 }
58 }
59}
60
61#[async_trait]
62impl SystemCapability for ModuleOrchestrator {
63 fn pre_init(&self, sys: &modkit::runtime::SystemContext) -> anyhow::Result<()> {
64 self.module_manager
65 .set(Arc::clone(&sys.module_manager))
66 .map_err(|_| anyhow::anyhow!("ModuleManager already set (pre_init called twice?)"))?;
67 Ok(())
68 }
69}
70
71#[async_trait]
72impl modkit::Module for ModuleOrchestrator {
73 async fn init(&self, ctx: &ModuleCtx) -> Result<()> {
74 let cfg = ctx.config_or_default::<ModuleOrchestratorConfig>()?;
76 *self.config.write().await = cfg;
77
78 let manager =
80 self.module_manager.get().cloned().ok_or_else(|| {
81 anyhow::anyhow!("ModuleManager not wired into ModuleOrchestrator")
82 })?;
83
84 let api_impl: Arc<dyn DirectoryClient> =
85 Arc::new(LocalDirectoryClient::new(manager.clone()));
86
87 ctx.client_hub()
89 .register::<dyn DirectoryClient>(api_impl.clone());
90
91 self.directory_api
92 .set(api_impl)
93 .map_err(|_| anyhow::anyhow!("DirectoryClient already set (init called twice?)"))?;
94
95 let registry = ModuleRegistry::discover_and_build()
97 .map_err(|e| anyhow::anyhow!("Failed to build module registry: {e}"))?;
98 let modules_service = Arc::new(ModulesService::new(®istry, manager));
99 self.modules_service
100 .set(modules_service)
101 .map_err(|_| anyhow::anyhow!("ModulesService already set (init called twice?)"))?;
102
103 tracing::info!("ModuleOrchestrator initialized");
104
105 Ok(())
106 }
107}
108
109impl RestApiCapability for ModuleOrchestrator {
110 fn register_rest(
111 &self,
112 _ctx: &ModuleCtx,
113 router: axum::Router,
114 openapi: &dyn OpenApiRegistry,
115 ) -> Result<axum::Router> {
116 let service = Arc::clone(
117 self.modules_service
118 .get()
119 .ok_or_else(|| anyhow::anyhow!("ModulesService not initialized"))?,
120 );
121
122 let router = crate::api::rest::routes::register_routes(router, openapi, service);
123
124 tracing::info!("ModuleOrchestrator REST routes registered");
125 Ok(router)
126 }
127}
128
129#[async_trait]
131impl GrpcServiceCapability for ModuleOrchestrator {
132 async fn get_grpc_services(&self, _ctx: &ModuleCtx) -> Result<Vec<RegisterGrpcServiceFn>> {
133 let api = self
134 .directory_api
135 .get()
136 .cloned()
137 .ok_or_else(|| anyhow::anyhow!("DirectoryClient not initialized"))?;
138
139 let directory_svc = server::make_directory_service(api);
141
142 Ok(vec![RegisterGrpcServiceFn {
143 service_name: DIRECTORY_SERVICE_NAME,
144 register: Box::new(move |routes| {
145 routes.add_service(directory_svc.clone());
146 }),
147 }])
148 }
149}