Skip to main content

module_orchestrator/
module.rs

1//! Module definition for `ModuleOrchestrator`
2
3use anyhow::Result;
4use async_trait::async_trait;
5use std::sync::{Arc, OnceLock};
6use tokio::sync::RwLock;
7
8use modkit::DirectoryClient;
9use modkit::context::ModuleCtx;
10use modkit::contracts::{
11    GrpcServiceCapability, OpenApiRegistry, RegisterGrpcServiceFn, RestApiCapability,
12    SystemCapability,
13};
14use modkit::directory::LocalDirectoryClient;
15use modkit::registry::ModuleRegistry;
16use modkit::runtime::ModuleManager;
17
18use cf_system_sdks::directory::DIRECTORY_SERVICE_NAME;
19
20use crate::domain::service::ModulesService;
21use crate::server;
22
23/// Configuration for the module orchestrator
24#[derive(Clone, Debug, Default, serde::Deserialize)]
25#[allow(
26    clippy::empty_structs_with_brackets,
27    reason = "empty struct is required for config deserialization"
28)]
29pub struct ModuleOrchestratorConfig {}
30
31/// Module Orchestrator - system module for service discovery
32///
33/// This module:
34/// - Provides `DirectoryClient` to the `ClientHub` for in-process modules
35/// - Exposes `DirectoryService` gRPC service via `grpc-hub`
36/// - Tracks module instances and provides service resolution
37/// - Exposes REST API to list all registered modules
38#[modkit::module(
39    name = "module-orchestrator",
40    capabilities = [grpc, system, rest],
41    client = cf_system_sdks::directory::DirectoryClient
42)]
43pub struct ModuleOrchestrator {
44    config: RwLock<ModuleOrchestratorConfig>,
45    directory_api: OnceLock<Arc<dyn DirectoryClient>>,
46    module_manager: OnceLock<Arc<ModuleManager>>,
47    modules_service: OnceLock<Arc<ModulesService>>,
48}
49
50impl Default for ModuleOrchestrator {
51    fn default() -> Self {
52        Self {
53            config: RwLock::new(ModuleOrchestratorConfig {}),
54            directory_api: OnceLock::new(),
55            module_manager: OnceLock::new(),
56            modules_service: OnceLock::new(),
57        }
58    }
59}
60
61#[async_trait]
62impl SystemCapability for ModuleOrchestrator {
63    fn pre_init(&self, sys: &modkit::runtime::SystemContext) -> anyhow::Result<()> {
64        self.module_manager
65            .set(Arc::clone(&sys.module_manager))
66            .map_err(|_| anyhow::anyhow!("ModuleManager already set (pre_init called twice?)"))?;
67        Ok(())
68    }
69}
70
71#[async_trait]
72impl modkit::Module for ModuleOrchestrator {
73    async fn init(&self, ctx: &ModuleCtx) -> Result<()> {
74        // Load configuration if present
75        let cfg = ctx.config_or_default::<ModuleOrchestratorConfig>()?;
76        *self.config.write().await = cfg;
77
78        // Use the injected ModuleManager to create the DirectoryClient
79        let manager =
80            self.module_manager.get().cloned().ok_or_else(|| {
81                anyhow::anyhow!("ModuleManager not wired into ModuleOrchestrator")
82            })?;
83
84        let api_impl: Arc<dyn DirectoryClient> =
85            Arc::new(LocalDirectoryClient::new(manager.clone()));
86
87        // Register in ClientHub directly
88        ctx.client_hub()
89            .register::<dyn DirectoryClient>(api_impl.clone());
90
91        self.directory_api
92            .set(api_impl)
93            .map_err(|_| anyhow::anyhow!("DirectoryClient already set (init called twice?)"))?;
94
95        // Build compiled-module catalog from inventory and create the ModulesService
96        let registry = ModuleRegistry::discover_and_build()
97            .map_err(|e| anyhow::anyhow!("Failed to build module registry: {e}"))?;
98        let modules_service = Arc::new(ModulesService::new(&registry, manager));
99        self.modules_service
100            .set(modules_service)
101            .map_err(|_| anyhow::anyhow!("ModulesService already set (init called twice?)"))?;
102
103        tracing::info!("ModuleOrchestrator initialized");
104
105        Ok(())
106    }
107}
108
109impl RestApiCapability for ModuleOrchestrator {
110    fn register_rest(
111        &self,
112        _ctx: &ModuleCtx,
113        router: axum::Router,
114        openapi: &dyn OpenApiRegistry,
115    ) -> Result<axum::Router> {
116        let service = Arc::clone(
117            self.modules_service
118                .get()
119                .ok_or_else(|| anyhow::anyhow!("ModulesService not initialized"))?,
120        );
121
122        let router = crate::api::rest::routes::register_routes(router, openapi, service);
123
124        tracing::info!("ModuleOrchestrator REST routes registered");
125        Ok(router)
126    }
127}
128
129/// Export gRPC services to `grpc-hub`
130#[async_trait]
131impl GrpcServiceCapability for ModuleOrchestrator {
132    async fn get_grpc_services(&self, _ctx: &ModuleCtx) -> Result<Vec<RegisterGrpcServiceFn>> {
133        let api = self
134            .directory_api
135            .get()
136            .cloned()
137            .ok_or_else(|| anyhow::anyhow!("DirectoryClient not initialized"))?;
138
139        // Build DirectoryService
140        let directory_svc = server::make_directory_service(api);
141
142        Ok(vec![RegisterGrpcServiceFn {
143            service_name: DIRECTORY_SERVICE_NAME,
144            register: Box::new(move |routes| {
145                routes.add_service(directory_svc.clone());
146            }),
147        }])
148    }
149}