module_orchestrator/
module.rs1use anyhow::Result;
4use async_trait::async_trait;
5use std::sync::{Arc, OnceLock};
6use tokio::sync::RwLock;
7
8use modkit::DirectoryClient;
9use modkit::context::ModuleCtx;
10use modkit::contracts::{GrpcServiceCapability, RegisterGrpcServiceFn, SystemCapability};
11use modkit::directory::LocalDirectoryClient;
12use modkit::runtime::ModuleManager;
13
14use cf_system_sdks::directory::DIRECTORY_SERVICE_NAME;
15
16use crate::server;
17
18#[derive(Clone, Debug, Default, serde::Deserialize)]
20pub struct ModuleOrchestratorConfig;
21
22#[modkit::module(
29 name = "module-orchestrator",
30 capabilities = [grpc, system],
31 client = cf_system_sdks::directory::DirectoryClient
32)]
33pub struct ModuleOrchestrator {
34 config: RwLock<ModuleOrchestratorConfig>,
35 directory_api: OnceLock<Arc<dyn DirectoryClient>>,
36 module_manager: OnceLock<Arc<ModuleManager>>,
37}
38
39impl Default for ModuleOrchestrator {
40 fn default() -> Self {
41 Self {
42 config: RwLock::new(ModuleOrchestratorConfig),
43 directory_api: OnceLock::new(),
44 module_manager: OnceLock::new(),
45 }
46 }
47}
48
49#[async_trait]
50impl SystemCapability for ModuleOrchestrator {
51 fn pre_init(&self, sys: &modkit::runtime::SystemContext) -> anyhow::Result<()> {
52 self.module_manager
53 .set(Arc::clone(&sys.module_manager))
54 .map_err(|_| anyhow::anyhow!("ModuleManager already set (pre_init called twice?)"))?;
55 Ok(())
56 }
57}
58
59#[async_trait]
60impl modkit::Module for ModuleOrchestrator {
61 async fn init(&self, ctx: &ModuleCtx) -> Result<()> {
62 let cfg = ctx.config::<ModuleOrchestratorConfig>().unwrap_or_default();
64 *self.config.write().await = cfg;
65
66 let manager =
68 self.module_manager.get().cloned().ok_or_else(|| {
69 anyhow::anyhow!("ModuleManager not wired into ModuleOrchestrator")
70 })?;
71
72 let api_impl: Arc<dyn DirectoryClient> = Arc::new(LocalDirectoryClient::new(manager));
73
74 ctx.client_hub()
76 .register::<dyn DirectoryClient>(api_impl.clone());
77
78 self.directory_api
79 .set(api_impl)
80 .map_err(|_| anyhow::anyhow!("DirectoryClient already set (init called twice?)"))?;
81
82 tracing::info!("ModuleOrchestrator initialized");
83
84 Ok(())
85 }
86}
87
88#[async_trait]
90impl GrpcServiceCapability for ModuleOrchestrator {
91 async fn get_grpc_services(&self, _ctx: &ModuleCtx) -> Result<Vec<RegisterGrpcServiceFn>> {
92 let api = self
93 .directory_api
94 .get()
95 .cloned()
96 .ok_or_else(|| anyhow::anyhow!("DirectoryClient not initialized"))?;
97
98 let directory_svc = server::make_directory_service(api);
100
101 Ok(vec![RegisterGrpcServiceFn {
102 service_name: DIRECTORY_SERVICE_NAME,
103 register: Box::new(move |routes| {
104 routes.add_service(directory_svc.clone());
105 }),
106 }])
107 }
108}