Skip to main content

module_orchestrator/
module.rs

1//! Module definition for `ModuleOrchestrator`
2
3use anyhow::Result;
4use async_trait::async_trait;
5use std::sync::{Arc, OnceLock};
6use tokio::sync::RwLock;
7
8use modkit::DirectoryClient;
9use modkit::context::ModuleCtx;
10use modkit::contracts::{GrpcServiceCapability, RegisterGrpcServiceFn, SystemCapability};
11use modkit::directory::LocalDirectoryClient;
12use modkit::runtime::ModuleManager;
13
14use cf_system_sdks::directory::DIRECTORY_SERVICE_NAME;
15
16use crate::server;
17
/// Configuration for the module orchestrator.
///
/// This is a unit struct: it currently carries no settings. It derives
/// `Default` and `serde::Deserialize`, which `init` relies on when it calls
/// `ctx.config::<ModuleOrchestratorConfig>().unwrap_or_default()`.
#[derive(Clone, Debug, Default, serde::Deserialize)]
pub struct ModuleOrchestratorConfig;
21
/// Module Orchestrator - system module for service discovery
///
/// This module:
/// - Provides `DirectoryClient` to the `ClientHub` for in-process modules
/// - Exposes `DirectoryService` gRPC service via `grpc-hub`
/// - Tracks module instances and provides service resolution
///
/// Lifecycle, as implemented in this file: `pre_init` stores the
/// `ModuleManager`, `init` builds the `DirectoryClient` from it and registers
/// it in the `ClientHub`, and `get_grpc_services` wraps that same client in
/// the gRPC `DirectoryService`.
#[modkit::module(
    name = "module-orchestrator",
    capabilities = [grpc, system],
    client = cf_system_sdks::directory::DirectoryClient
)]
pub struct ModuleOrchestrator {
    /// Module configuration; overwritten in `init` with the value obtained
    /// from the module context (or the default when none is obtained).
    config: RwLock<ModuleOrchestratorConfig>,
    /// Directory client created in `init`; written once and later handed to
    /// the gRPC service builder in `get_grpc_services`.
    directory_api: OnceLock<Arc<dyn DirectoryClient>>,
    /// Module manager taken from the `SystemContext` in `pre_init`; written
    /// once and read by `init`.
    module_manager: OnceLock<Arc<ModuleManager>>,
}
38
39impl Default for ModuleOrchestrator {
40    fn default() -> Self {
41        Self {
42            config: RwLock::new(ModuleOrchestratorConfig),
43            directory_api: OnceLock::new(),
44            module_manager: OnceLock::new(),
45        }
46    }
47}
48
49#[async_trait]
50impl SystemCapability for ModuleOrchestrator {
51    fn pre_init(&self, sys: &modkit::runtime::SystemContext) -> anyhow::Result<()> {
52        self.module_manager
53            .set(Arc::clone(&sys.module_manager))
54            .map_err(|_| anyhow::anyhow!("ModuleManager already set (pre_init called twice?)"))?;
55        Ok(())
56    }
57}
58
59#[async_trait]
60impl modkit::Module for ModuleOrchestrator {
61    async fn init(&self, ctx: &ModuleCtx) -> Result<()> {
62        // Load configuration if present
63        let cfg = ctx.config::<ModuleOrchestratorConfig>().unwrap_or_default();
64        *self.config.write().await = cfg;
65
66        // Use the injected ModuleManager to create the DirectoryClient
67        let manager =
68            self.module_manager.get().cloned().ok_or_else(|| {
69                anyhow::anyhow!("ModuleManager not wired into ModuleOrchestrator")
70            })?;
71
72        let api_impl: Arc<dyn DirectoryClient> = Arc::new(LocalDirectoryClient::new(manager));
73
74        // Register in ClientHub directly
75        ctx.client_hub()
76            .register::<dyn DirectoryClient>(api_impl.clone());
77
78        self.directory_api
79            .set(api_impl)
80            .map_err(|_| anyhow::anyhow!("DirectoryClient already set (init called twice?)"))?;
81
82        tracing::info!("ModuleOrchestrator initialized");
83
84        Ok(())
85    }
86}
87
88/// Export gRPC services to `grpc-hub`
89#[async_trait]
90impl GrpcServiceCapability for ModuleOrchestrator {
91    async fn get_grpc_services(&self, _ctx: &ModuleCtx) -> Result<Vec<RegisterGrpcServiceFn>> {
92        let api = self
93            .directory_api
94            .get()
95            .cloned()
96            .ok_or_else(|| anyhow::anyhow!("DirectoryClient not initialized"))?;
97
98        // Build DirectoryService
99        let directory_svc = server::make_directory_service(api);
100
101        Ok(vec![RegisterGrpcServiceFn {
102            service_name: DIRECTORY_SERVICE_NAME,
103            register: Box::new(move |routes| {
104                routes.add_service(directory_svc.clone());
105            }),
106        }])
107    }
108}