1use anyhow::Result;
4use async_trait::async_trait;
5use std::sync::Arc;
6use uuid::Uuid;
7
8use crate::runtime::{Endpoint, ModuleInstance, ModuleManager};
9
10pub use cf_system_sdks::directory::{
12 DirectoryClient, RegisterInstanceInfo, ServiceEndpoint, ServiceInstanceInfo,
13};
14
15pub struct LocalDirectoryClient {
20 mgr: Arc<ModuleManager>,
21}
22
23impl LocalDirectoryClient {
24 #[must_use]
25 pub fn new(mgr: Arc<ModuleManager>) -> Self {
26 Self { mgr }
27 }
28}
29
30#[async_trait]
31impl DirectoryClient for LocalDirectoryClient {
32 async fn resolve_grpc_service(&self, service_name: &str) -> Result<ServiceEndpoint> {
33 if let Some((_module, _inst, ep)) = self.mgr.pick_service_round_robin(service_name) {
34 return Ok(ServiceEndpoint::new(ep.uri));
35 }
36
37 anyhow::bail!("Service not found or no healthy instances: {service_name}")
38 }
39
40 async fn list_instances(&self, module: &str) -> Result<Vec<ServiceInstanceInfo>> {
41 let mut result = Vec::new();
42
43 for inst in self.mgr.instances_of(module) {
44 if let Some((_, ep)) = inst.grpc_services.iter().next() {
45 result.push(ServiceInstanceInfo {
46 module: module.to_owned(),
47 instance_id: inst.instance_id.to_string(),
48 endpoint: ServiceEndpoint::new(ep.uri.clone()),
49 version: inst.version.clone(),
50 });
51 }
52 }
53
54 Ok(result)
55 }
56
57 async fn register_instance(&self, info: RegisterInstanceInfo) -> Result<()> {
58 let instance_id = Uuid::parse_str(&info.instance_id)
60 .map_err(|e| anyhow::anyhow!("Invalid instance_id '{}': {}", info.instance_id, e))?;
61
62 let mut instance = ModuleInstance::new(info.module.clone(), instance_id);
64
65 if let Some(version) = info.version {
67 instance = instance.with_version(version);
68 }
69
70 for (service_name, endpoint) in info.grpc_services {
72 instance = instance.with_grpc_service(service_name, Endpoint::from_uri(endpoint.uri));
73 }
74
75 self.mgr.register_instance(Arc::new(instance));
77
78 Ok(())
79 }
80
81 async fn deregister_instance(&self, module: &str, instance_id: &str) -> Result<()> {
82 let instance_id = Uuid::parse_str(instance_id)
83 .map_err(|e| anyhow::anyhow!("Invalid instance_id '{instance_id}': {e}"))?;
84 self.mgr.deregister(module, instance_id);
85 Ok(())
86 }
87
88 async fn send_heartbeat(&self, module: &str, instance_id: &str) -> Result<()> {
89 let instance_id = Uuid::parse_str(instance_id)
90 .map_err(|e| anyhow::anyhow!("Invalid instance_id '{instance_id}': {e}"))?;
91 self.mgr
92 .update_heartbeat(module, instance_id, std::time::Instant::now());
93 Ok(())
94 }
95}
96
97#[cfg(test)]
98#[cfg_attr(coverage_nightly, coverage(off))]
99mod tests {
100 use super::*;
101
102 #[tokio::test]
103 async fn test_resolve_grpc_service_not_found() {
104 let dir = Arc::new(ModuleManager::new());
105 let api = LocalDirectoryClient::new(dir);
106
107 let result = api.resolve_grpc_service("nonexistent.Service").await;
108 assert!(result.is_err());
109 }
110
111 #[tokio::test]
112 async fn test_register_instance_via_api() {
113 let dir = Arc::new(ModuleManager::new());
114 let api = LocalDirectoryClient::new(dir.clone());
115
116 let instance_id = Uuid::new_v4();
117 let register_info = RegisterInstanceInfo {
119 module: "test_module".to_owned(),
120 instance_id: instance_id.to_string(),
121 grpc_services: vec![(
122 "test.Service".to_owned(),
123 ServiceEndpoint::http("127.0.0.1", 8001),
124 )],
125 version: Some("1.0.0".to_owned()),
126 };
127
128 api.register_instance(register_info).await.unwrap();
129
130 let instances = dir.instances_of("test_module");
132 assert_eq!(instances.len(), 1);
133 assert_eq!(instances[0].instance_id, instance_id);
134 assert_eq!(instances[0].version, Some("1.0.0".to_owned()));
135 assert!(instances[0].grpc_services.contains_key("test.Service"));
136 }
137
138 #[tokio::test]
139 async fn test_deregister_instance_via_api() {
140 let dir = Arc::new(ModuleManager::new());
141 let api = LocalDirectoryClient::new(dir.clone());
142
143 let instance_id = Uuid::new_v4();
144 let inst = Arc::new(ModuleInstance::new("test_module", instance_id));
146 dir.register_instance(inst);
147
148 assert_eq!(dir.instances_of("test_module").len(), 1);
150
151 api.deregister_instance("test_module", &instance_id.to_string())
153 .await
154 .unwrap();
155
156 assert_eq!(dir.instances_of("test_module").len(), 0);
158 }
159
160 #[tokio::test]
161 async fn test_send_heartbeat_via_api() {
162 use crate::runtime::InstanceState;
163
164 let dir = Arc::new(ModuleManager::new());
165 let api = LocalDirectoryClient::new(dir.clone());
166
167 let instance_id = Uuid::new_v4();
168 let inst = Arc::new(ModuleInstance::new("test_module", instance_id));
170 dir.register_instance(inst);
171
172 let instances = dir.instances_of("test_module");
174 assert_eq!(instances[0].state(), InstanceState::Registered);
175
176 api.send_heartbeat("test_module", &instance_id.to_string())
178 .await
179 .unwrap();
180
181 let instances = dir.instances_of("test_module");
183 assert_eq!(instances[0].state(), InstanceState::Healthy);
184 }
185}