Skip to main content

modkit/
directory.rs

1//! Directory API - contract for service discovery and instance resolution
2
3use anyhow::Result;
4use async_trait::async_trait;
5use std::sync::Arc;
6use uuid::Uuid;
7
8use crate::runtime::{Endpoint, ModuleInstance, ModuleManager};
9
10// Re-export all types from contracts - this is the single source of truth
11pub use cf_system_sdks::directory::{
12    DirectoryClient, RegisterInstanceInfo, ServiceEndpoint, ServiceInstanceInfo,
13};
14
15/// Local implementation of `DirectoryClient` that delegates to `ModuleManager`
16///
17/// This is the in-process implementation used by modules running in the same
18/// process as the module orchestrator.
19pub struct LocalDirectoryClient {
20    mgr: Arc<ModuleManager>,
21}
22
23impl LocalDirectoryClient {
24    #[must_use]
25    pub fn new(mgr: Arc<ModuleManager>) -> Self {
26        Self { mgr }
27    }
28}
29
30#[async_trait]
31impl DirectoryClient for LocalDirectoryClient {
32    async fn resolve_grpc_service(&self, service_name: &str) -> Result<ServiceEndpoint> {
33        if let Some((_module, _inst, ep)) = self.mgr.pick_service_round_robin(service_name) {
34            return Ok(ServiceEndpoint::new(ep.uri));
35        }
36
37        anyhow::bail!("Service not found or no healthy instances: {service_name}")
38    }
39
40    async fn list_instances(&self, module: &str) -> Result<Vec<ServiceInstanceInfo>> {
41        let mut result = Vec::new();
42
43        for inst in self.mgr.instances_of(module) {
44            if let Some((_, ep)) = inst.grpc_services.iter().next() {
45                result.push(ServiceInstanceInfo {
46                    module: module.to_owned(),
47                    instance_id: inst.instance_id.to_string(),
48                    endpoint: ServiceEndpoint::new(ep.uri.clone()),
49                    version: inst.version.clone(),
50                });
51            }
52        }
53
54        Ok(result)
55    }
56
57    async fn register_instance(&self, info: RegisterInstanceInfo) -> Result<()> {
58        // Parse instance_id from string to Uuid
59        let instance_id = Uuid::parse_str(&info.instance_id)
60            .map_err(|e| anyhow::anyhow!("Invalid instance_id '{}': {}", info.instance_id, e))?;
61
62        // Build a ModuleInstance from RegisterInstanceInfo
63        let mut instance = ModuleInstance::new(info.module.clone(), instance_id);
64
65        // Apply version if provided
66        if let Some(version) = info.version {
67            instance = instance.with_version(version);
68        }
69
70        // Add all gRPC services
71        for (service_name, endpoint) in info.grpc_services {
72            instance = instance.with_grpc_service(service_name, Endpoint::from_uri(endpoint.uri));
73        }
74
75        // Register the instance with the manager
76        self.mgr.register_instance(Arc::new(instance));
77
78        Ok(())
79    }
80
81    async fn deregister_instance(&self, module: &str, instance_id: &str) -> Result<()> {
82        let instance_id = Uuid::parse_str(instance_id)
83            .map_err(|e| anyhow::anyhow!("Invalid instance_id '{instance_id}': {e}"))?;
84        self.mgr.deregister(module, instance_id);
85        Ok(())
86    }
87
88    async fn send_heartbeat(&self, module: &str, instance_id: &str) -> Result<()> {
89        let instance_id = Uuid::parse_str(instance_id)
90            .map_err(|e| anyhow::anyhow!("Invalid instance_id '{instance_id}': {e}"))?;
91        self.mgr
92            .update_heartbeat(module, instance_id, std::time::Instant::now());
93        Ok(())
94    }
95}
96
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
    use super::*;

    /// Module name shared by every test that registers an instance.
    const MODULE: &str = "test_module";

    /// Builds a fresh manager together with a directory client wired to it.
    fn fixture() -> (Arc<ModuleManager>, LocalDirectoryClient) {
        let manager = Arc::new(ModuleManager::new());
        let client = LocalDirectoryClient::new(manager.clone());
        (manager, client)
    }

    /// Registers a bare instance directly on the manager and returns its id.
    fn seed_instance(manager: &Arc<ModuleManager>) -> Uuid {
        let id = Uuid::new_v4();
        manager.register_instance(Arc::new(ModuleInstance::new(MODULE, id)));
        id
    }

    #[tokio::test]
    async fn test_resolve_grpc_service_not_found() {
        let (_manager, client) = fixture();

        let outcome = client.resolve_grpc_service("nonexistent.Service").await;

        assert!(outcome.is_err());
    }

    #[tokio::test]
    async fn test_register_instance_via_api() {
        let (manager, client) = fixture();
        let id = Uuid::new_v4();

        // Go through the public API rather than touching the manager directly.
        client
            .register_instance(RegisterInstanceInfo {
                module: MODULE.to_owned(),
                instance_id: id.to_string(),
                grpc_services: vec![(
                    "test.Service".to_owned(),
                    ServiceEndpoint::http("127.0.0.1", 8001),
                )],
                version: Some("1.0.0".to_owned()),
            })
            .await
            .unwrap();

        // The manager must now hold exactly the instance described above.
        let registered = manager.instances_of(MODULE);
        assert_eq!(registered.len(), 1);
        assert_eq!(registered[0].instance_id, id);
        assert_eq!(registered[0].version, Some("1.0.0".to_owned()));
        assert!(registered[0].grpc_services.contains_key("test.Service"));
    }

    #[tokio::test]
    async fn test_deregister_instance_via_api() {
        let (manager, client) = fixture();

        // Seed the manager and confirm the instance is visible.
        let id = seed_instance(&manager);
        assert_eq!(manager.instances_of(MODULE).len(), 1);

        // Remove it through the API.
        client
            .deregister_instance(MODULE, &id.to_string())
            .await
            .unwrap();

        // Nothing should be left for the module.
        assert_eq!(manager.instances_of(MODULE).len(), 0);
    }

    #[tokio::test]
    async fn test_send_heartbeat_via_api() {
        use crate::runtime::InstanceState;

        let (manager, client) = fixture();
        let id = seed_instance(&manager);

        // A freshly registered instance starts out as `Registered`.
        let before = manager.instances_of(MODULE);
        assert_eq!(before[0].state(), InstanceState::Registered);

        // Heartbeat through the API.
        client
            .send_heartbeat(MODULE, &id.to_string())
            .await
            .unwrap();

        // The heartbeat moves the instance to `Healthy`.
        let after = manager.instances_of(MODULE);
        assert_eq!(after[0].state(), InstanceState::Healthy);
    }
}