Skip to main content

module_orchestrator/
module.rs

1//! Module definition for `ModuleOrchestrator`
2
3use anyhow::Result;
4use async_trait::async_trait;
5use std::sync::{Arc, OnceLock};
6use tokio::sync::RwLock;
7
8use modkit::DirectoryClient;
9use modkit::context::ModuleCtx;
10use modkit::contracts::{
11    GrpcServiceCapability, OpenApiRegistry, RegisterGrpcServiceFn, RestApiCapability,
12    SystemCapability,
13};
14use modkit::directory::LocalDirectoryClient;
15use modkit::registry::ModuleRegistry;
16use modkit::runtime::ModuleManager;
17
18use cf_system_sdks::directory::DIRECTORY_SERVICE_NAME;
19
20use crate::domain::service::ModulesService;
21use crate::server;
22
23/// Configuration for the module orchestrator
24#[derive(Clone, Debug, Default, serde::Deserialize)]
25pub struct ModuleOrchestratorConfig;
26
27/// Module Orchestrator - system module for service discovery
28///
29/// This module:
30/// - Provides `DirectoryClient` to the `ClientHub` for in-process modules
31/// - Exposes `DirectoryService` gRPC service via `grpc-hub`
32/// - Tracks module instances and provides service resolution
33/// - Exposes REST API to list all registered modules
34#[modkit::module(
35    name = "module-orchestrator",
36    capabilities = [grpc, system, rest],
37    client = cf_system_sdks::directory::DirectoryClient
38)]
39pub struct ModuleOrchestrator {
40    config: RwLock<ModuleOrchestratorConfig>,
41    directory_api: OnceLock<Arc<dyn DirectoryClient>>,
42    module_manager: OnceLock<Arc<ModuleManager>>,
43    modules_service: OnceLock<Arc<ModulesService>>,
44}
45
46impl Default for ModuleOrchestrator {
47    fn default() -> Self {
48        Self {
49            config: RwLock::new(ModuleOrchestratorConfig),
50            directory_api: OnceLock::new(),
51            module_manager: OnceLock::new(),
52            modules_service: OnceLock::new(),
53        }
54    }
55}
56
57#[async_trait]
58impl SystemCapability for ModuleOrchestrator {
59    fn pre_init(&self, sys: &modkit::runtime::SystemContext) -> anyhow::Result<()> {
60        self.module_manager
61            .set(Arc::clone(&sys.module_manager))
62            .map_err(|_| anyhow::anyhow!("ModuleManager already set (pre_init called twice?)"))?;
63        Ok(())
64    }
65}
66
67#[async_trait]
68impl modkit::Module for ModuleOrchestrator {
69    async fn init(&self, ctx: &ModuleCtx) -> Result<()> {
70        // Load configuration if present
71        let cfg = ctx.config::<ModuleOrchestratorConfig>().unwrap_or_default();
72        *self.config.write().await = cfg;
73
74        // Use the injected ModuleManager to create the DirectoryClient
75        let manager =
76            self.module_manager.get().cloned().ok_or_else(|| {
77                anyhow::anyhow!("ModuleManager not wired into ModuleOrchestrator")
78            })?;
79
80        let api_impl: Arc<dyn DirectoryClient> =
81            Arc::new(LocalDirectoryClient::new(manager.clone()));
82
83        // Register in ClientHub directly
84        ctx.client_hub()
85            .register::<dyn DirectoryClient>(api_impl.clone());
86
87        self.directory_api
88            .set(api_impl)
89            .map_err(|_| anyhow::anyhow!("DirectoryClient already set (init called twice?)"))?;
90
91        // Build compiled-module catalog from inventory and create the ModulesService
92        let registry = ModuleRegistry::discover_and_build()
93            .map_err(|e| anyhow::anyhow!("Failed to build module registry: {e}"))?;
94        let modules_service = Arc::new(ModulesService::new(&registry, manager));
95        self.modules_service
96            .set(modules_service)
97            .map_err(|_| anyhow::anyhow!("ModulesService already set (init called twice?)"))?;
98
99        tracing::info!("ModuleOrchestrator initialized");
100
101        Ok(())
102    }
103}
104
105impl RestApiCapability for ModuleOrchestrator {
106    fn register_rest(
107        &self,
108        _ctx: &ModuleCtx,
109        router: axum::Router,
110        openapi: &dyn OpenApiRegistry,
111    ) -> Result<axum::Router> {
112        let service = Arc::clone(
113            self.modules_service
114                .get()
115                .ok_or_else(|| anyhow::anyhow!("ModulesService not initialized"))?,
116        );
117
118        let router = crate::api::rest::routes::register_routes(router, openapi, service);
119
120        tracing::info!("ModuleOrchestrator REST routes registered");
121        Ok(router)
122    }
123}
124
125/// Export gRPC services to `grpc-hub`
126#[async_trait]
127impl GrpcServiceCapability for ModuleOrchestrator {
128    async fn get_grpc_services(&self, _ctx: &ModuleCtx) -> Result<Vec<RegisterGrpcServiceFn>> {
129        let api = self
130            .directory_api
131            .get()
132            .cloned()
133            .ok_or_else(|| anyhow::anyhow!("DirectoryClient not initialized"))?;
134
135        // Build DirectoryService
136        let directory_svc = server::make_directory_service(api);
137
138        Ok(vec![RegisterGrpcServiceFn {
139            service_name: DIRECTORY_SERVICE_NAME,
140            register: Box::new(move |routes| {
141                routes.add_service(directory_svc.clone());
142            }),
143        }])
144    }
145}