module_orchestrator/
module.rs1use anyhow::Result;
4use async_trait::async_trait;
5use std::sync::{Arc, OnceLock};
6use tokio::sync::RwLock;
7
8use modkit::DirectoryClient;
9use modkit::context::ModuleCtx;
10use modkit::contracts::{
11 GrpcServiceCapability, OpenApiRegistry, RegisterGrpcServiceFn, RestApiCapability,
12 SystemCapability,
13};
14use modkit::directory::LocalDirectoryClient;
15use modkit::registry::ModuleRegistry;
16use modkit::runtime::ModuleManager;
17
18use cf_system_sdks::directory::DIRECTORY_SERVICE_NAME;
19
20use crate::domain::service::ModulesService;
21use crate::server;
22
23#[derive(Clone, Debug, Default, serde::Deserialize)]
25pub struct ModuleOrchestratorConfig;
26
27#[modkit::module(
35 name = "module-orchestrator",
36 capabilities = [grpc, system, rest],
37 client = cf_system_sdks::directory::DirectoryClient
38)]
39pub struct ModuleOrchestrator {
40 config: RwLock<ModuleOrchestratorConfig>,
41 directory_api: OnceLock<Arc<dyn DirectoryClient>>,
42 module_manager: OnceLock<Arc<ModuleManager>>,
43 modules_service: OnceLock<Arc<ModulesService>>,
44}
45
46impl Default for ModuleOrchestrator {
47 fn default() -> Self {
48 Self {
49 config: RwLock::new(ModuleOrchestratorConfig),
50 directory_api: OnceLock::new(),
51 module_manager: OnceLock::new(),
52 modules_service: OnceLock::new(),
53 }
54 }
55}
56
57#[async_trait]
58impl SystemCapability for ModuleOrchestrator {
59 fn pre_init(&self, sys: &modkit::runtime::SystemContext) -> anyhow::Result<()> {
60 self.module_manager
61 .set(Arc::clone(&sys.module_manager))
62 .map_err(|_| anyhow::anyhow!("ModuleManager already set (pre_init called twice?)"))?;
63 Ok(())
64 }
65}
66
67#[async_trait]
68impl modkit::Module for ModuleOrchestrator {
69 async fn init(&self, ctx: &ModuleCtx) -> Result<()> {
70 let cfg = ctx.config::<ModuleOrchestratorConfig>().unwrap_or_default();
72 *self.config.write().await = cfg;
73
74 let manager =
76 self.module_manager.get().cloned().ok_or_else(|| {
77 anyhow::anyhow!("ModuleManager not wired into ModuleOrchestrator")
78 })?;
79
80 let api_impl: Arc<dyn DirectoryClient> =
81 Arc::new(LocalDirectoryClient::new(manager.clone()));
82
83 ctx.client_hub()
85 .register::<dyn DirectoryClient>(api_impl.clone());
86
87 self.directory_api
88 .set(api_impl)
89 .map_err(|_| anyhow::anyhow!("DirectoryClient already set (init called twice?)"))?;
90
91 let registry = ModuleRegistry::discover_and_build()
93 .map_err(|e| anyhow::anyhow!("Failed to build module registry: {e}"))?;
94 let modules_service = Arc::new(ModulesService::new(®istry, manager));
95 self.modules_service
96 .set(modules_service)
97 .map_err(|_| anyhow::anyhow!("ModulesService already set (init called twice?)"))?;
98
99 tracing::info!("ModuleOrchestrator initialized");
100
101 Ok(())
102 }
103}
104
105impl RestApiCapability for ModuleOrchestrator {
106 fn register_rest(
107 &self,
108 _ctx: &ModuleCtx,
109 router: axum::Router,
110 openapi: &dyn OpenApiRegistry,
111 ) -> Result<axum::Router> {
112 let service = Arc::clone(
113 self.modules_service
114 .get()
115 .ok_or_else(|| anyhow::anyhow!("ModulesService not initialized"))?,
116 );
117
118 let router = crate::api::rest::routes::register_routes(router, openapi, service);
119
120 tracing::info!("ModuleOrchestrator REST routes registered");
121 Ok(router)
122 }
123}
124
125#[async_trait]
127impl GrpcServiceCapability for ModuleOrchestrator {
128 async fn get_grpc_services(&self, _ctx: &ModuleCtx) -> Result<Vec<RegisterGrpcServiceFn>> {
129 let api = self
130 .directory_api
131 .get()
132 .cloned()
133 .ok_or_else(|| anyhow::anyhow!("DirectoryClient not initialized"))?;
134
135 let directory_svc = server::make_directory_service(api);
137
138 Ok(vec![RegisterGrpcServiceFn {
139 service_name: DIRECTORY_SERVICE_NAME,
140 register: Box::new(move |routes| {
141 routes.add_service(directory_svc.clone());
142 }),
143 }])
144 }
145}