dactor_discover_k8s/
lib.rs1use dactor::{ClusterDiscovery, DiscoveryError};
8use k8s_openapi::api::core::v1::Pod;
9use kube::{api::ListParams, Api, Client};
10use std::fmt;
11use std::net::ToSocketAddrs;
12
/// Errors that Kubernetes-based peer discovery can report.
#[derive(Debug)]
pub enum K8sDiscoveryError {
    /// A Kubernetes API call made through `kube` failed.
    KubeError(kube::Error),
    /// A pod had no IP assigned; the payload is the pod's name.
    NoPodIp(String),
    /// An underlying I/O operation failed.
    Io(std::io::Error),
}
27
28impl fmt::Display for K8sDiscoveryError {
29 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
30 match self {
31 K8sDiscoveryError::KubeError(e) => write!(f, "Kubernetes API error: {e}"),
32 K8sDiscoveryError::NoPodIp(name) => write!(f, "Pod '{name}' has no IP assigned"),
33 K8sDiscoveryError::Io(e) => write!(f, "I/O error: {e}"),
34 }
35 }
36}
37
38impl std::error::Error for K8sDiscoveryError {
39 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
40 match self {
41 K8sDiscoveryError::KubeError(e) => Some(e),
42 K8sDiscoveryError::Io(e) => Some(e),
43 K8sDiscoveryError::NoPodIp(_) => None,
44 }
45 }
46}
47
48impl From<kube::Error> for K8sDiscoveryError {
49 fn from(e: kube::Error) -> Self {
50 K8sDiscoveryError::KubeError(e)
51 }
52}
53
54impl From<std::io::Error> for K8sDiscoveryError {
55 fn from(e: std::io::Error) -> Self {
56 K8sDiscoveryError::Io(e)
57 }
58}
59
/// Settings that decide which pods [`KubernetesDiscovery`] lists and how each
/// peer address is formed.
#[derive(Debug, Clone)]
pub struct K8sDiscoveryConfig {
    /// Namespace whose pods are listed.
    pub namespace: String,
    /// Label selector handed to the pod list request.
    pub label_selector: String,
    /// Port used for a pod when `port_name` is unset or matches no container port.
    pub port: u16,
    /// Optional container port name; when a container port with this name is
    /// found in the pod spec, its number is used instead of `port`.
    pub port_name: Option<String>,
}
76
77impl Default for K8sDiscoveryConfig {
78 fn default() -> Self {
79 Self {
80 namespace: current_namespace().unwrap_or_else(|| "default".to_string()),
81 label_selector: String::new(),
82 port: 9000,
83 port_name: None,
84 }
85 }
86}
87
/// Peer discovery that lists pods through the Kubernetes API.
///
/// Instances are assembled with [`KubernetesDiscovery::builder`].
pub struct KubernetesDiscovery {
    /// Namespace, label selector and port settings used for every lookup.
    config: K8sDiscoveryConfig,
}
100
101impl KubernetesDiscovery {
102 pub fn builder() -> K8sDiscoveryBuilder {
104 K8sDiscoveryBuilder {
105 config: K8sDiscoveryConfig::default(),
106 }
107 }
108
109 pub fn config(&self) -> &K8sDiscoveryConfig {
111 &self.config
112 }
113
114 pub async fn discover_async(&self) -> Result<Vec<String>, K8sDiscoveryError> {
116 let client = Client::try_default().await?;
117 let pods: Api<Pod> = Api::namespaced(client, &self.config.namespace);
118 let lp = ListParams::default().labels(&self.config.label_selector);
119 let pod_list = pods.list(&lp).await?;
120
121 let mut addresses = Vec::new();
122 for pod in pod_list.items {
123 let pod_name = pod
124 .metadata
125 .name
126 .as_deref()
127 .unwrap_or("<unknown>");
128
129 let phase = pod
130 .status
131 .as_ref()
132 .and_then(|s| s.phase.as_deref());
133
134 if phase != Some("Running") {
135 tracing::debug!(pod = pod_name, ?phase, "skipping non-running pod");
136 continue;
137 }
138
139 let ip = pod
140 .status
141 .as_ref()
142 .and_then(|s| s.pod_ip.as_deref());
143
144 match ip {
145 Some(ip) => {
146 let port = self.resolve_port(&pod).unwrap_or(self.config.port);
147 addresses.push(format!("{ip}:{port}"));
148 }
149 None => {
150 tracing::warn!(pod = pod_name, "pod has no IP assigned");
151 }
152 }
153 }
154
155 Ok(addresses)
156 }
157
158 fn resolve_port(&self, pod: &Pod) -> Option<u16> {
160 let port_name = self.config.port_name.as_deref()?;
161 let spec = pod.spec.as_ref()?;
162 for container in &spec.containers {
163 if let Some(ports) = &container.ports {
164 for p in ports {
165 if p.name.as_deref() == Some(port_name) {
166 return u16::try_from(p.container_port).ok();
167 }
168 }
169 }
170 }
171 None
172 }
173}
174
175#[async_trait::async_trait]
176impl ClusterDiscovery for KubernetesDiscovery {
177 async fn discover(&self) -> Result<Vec<dactor::DiscoveredPeer>, DiscoveryError> {
178 self.discover_async()
179 .await
180 .map(|addrs| addrs.into_iter().map(dactor::DiscoveredPeer::from_address).collect())
181 .map_err(|e| DiscoveryError::new(e.to_string()))
182 }
183}
184
/// Builder for [`KubernetesDiscovery`], obtained from
/// [`KubernetesDiscovery::builder`].
pub struct K8sDiscoveryBuilder {
    /// Configuration accumulated so far; handed to the discovery on `build`.
    config: K8sDiscoveryConfig,
}
193
194impl K8sDiscoveryBuilder {
195 pub fn namespace(mut self, ns: &str) -> Self {
197 self.config.namespace = ns.to_string();
198 self
199 }
200
201 pub fn label_selector(mut self, selector: &str) -> Self {
203 self.config.label_selector = selector.to_string();
204 self
205 }
206
207 pub fn port(mut self, port: u16) -> Self {
209 self.config.port = port;
210 self
211 }
212
213 pub fn port_name(mut self, name: &str) -> Self {
215 self.config.port_name = Some(name.to_string());
216 self
217 }
218
219 pub fn build(self) -> KubernetesDiscovery {
221 KubernetesDiscovery {
222 config: self.config,
223 }
224 }
225}
226
/// Peer discovery that resolves the DNS name
/// `<service_name>.<namespace>.svc.<cluster_domain>` and reports every resolved
/// address as a peer.
pub struct HeadlessServiceDiscovery {
    /// First label of the DNS name that is resolved.
    service_name: String,
    /// Second label of the DNS name that is resolved.
    namespace: String,
    /// Port attached to the DNS name for the lookup.
    port: u16,
    /// DNS suffix placed after `.svc.`; `new` sets it to `cluster.local`.
    cluster_domain: String,
}
242
243impl HeadlessServiceDiscovery {
244 pub fn new(service_name: &str, namespace: &str, port: u16) -> Self {
246 Self {
247 service_name: service_name.to_string(),
248 namespace: namespace.to_string(),
249 port,
250 cluster_domain: "cluster.local".to_string(),
251 }
252 }
253
254 pub fn with_cluster_domain(mut self, domain: &str) -> Self {
259 self.cluster_domain = domain.to_string();
260 self
261 }
262
263 pub fn dns_name(&self) -> String {
265 format!(
266 "{}.{}.svc.{}",
267 self.service_name, self.namespace, self.cluster_domain
268 )
269 }
270}
271
272#[async_trait::async_trait]
273impl ClusterDiscovery for HeadlessServiceDiscovery {
274 async fn discover(&self) -> Result<Vec<dactor::DiscoveredPeer>, DiscoveryError> {
275 let dns = self.dns_name();
276 let port = self.port;
277 let addrs = tokio::task::spawn_blocking(move || {
278 let lookup = format!("{dns}:{port}");
279 lookup.to_socket_addrs()
280 })
281 .await
282 .map_err(|e| DiscoveryError::new(format!("DNS lookup task failed: {e}")))?
283 .map_err(|e| DiscoveryError::new(format!(
284 "DNS resolution failed for {}: {e}", self.dns_name()
285 )))?;
286
287 Ok(addrs.map(|a| dactor::DiscoveredPeer::from_address(a.to_string())).collect())
288 }
289}
290
/// Reads the namespace recorded in the service-account namespace file.
///
/// Returns `None` when the file cannot be read, and also when it contains only
/// whitespace, so callers falling back to a default never receive an empty
/// namespace. Surrounding whitespace is trimmed from the returned value.
pub fn current_namespace() -> Option<String> {
    let raw =
        std::fs::read_to_string("/var/run/secrets/kubernetes.io/serviceaccount/namespace").ok()?;
    let namespace = raw.trim();
    // A blank file is treated the same as a missing one.
    (!namespace.is_empty()).then(|| namespace.to_string())
}
303
/// Returns this pod's IP as advertised through the environment.
///
/// `DACTOR_POD_IP` is consulted first and `POD_IP` second. A variable that is
/// unset, not valid Unicode, or blank after trimming is skipped, so an empty
/// `DACTOR_POD_IP` no longer hides a usable `POD_IP`. Returns `None` when
/// neither variable holds a non-blank value.
pub fn pod_ip() -> Option<String> {
    ["DACTOR_POD_IP", "POD_IP"]
        .iter()
        .filter_map(|key| std::env::var(key).ok())
        .map(|value| value.trim().to_string())
        .find(|value| !value.is_empty())
}
310
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Mutex, MutexGuard};

    /// Guards the process-wide environment: the test harness runs tests on
    /// several threads, and the `pod_ip` tests set and remove the same variables.
    static ENV_LOCK: Mutex<()> = Mutex::new(());

    /// Takes the environment lock, recovering it if an earlier test panicked
    /// while holding it so one failure does not cascade into the others.
    fn lock_env() -> MutexGuard<'static, ()> {
        ENV_LOCK
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    /// Namespace the default configuration is expected to pick in whatever
    /// environment the tests run in (inside or outside a pod).
    fn expected_default_namespace() -> String {
        current_namespace().unwrap_or_else(|| "default".to_string())
    }

    #[test]
    fn builder_creates_valid_config() {
        let discovery = KubernetesDiscovery::builder()
            .namespace("production")
            .label_selector("app=my-service")
            .port(8080)
            .port_name("dactor")
            .build();

        assert_eq!(discovery.config().namespace, "production");
        assert_eq!(discovery.config().label_selector, "app=my-service");
        assert_eq!(discovery.config().port, 8080);
        assert_eq!(discovery.config().port_name.as_deref(), Some("dactor"));
    }

    #[test]
    fn builder_default_values() {
        let discovery = KubernetesDiscovery::builder()
            .label_selector("app=test")
            .build();

        assert_eq!(discovery.config().namespace, expected_default_namespace());
        assert_eq!(discovery.config().port, 9000);
        assert!(discovery.config().port_name.is_none());
    }

    #[test]
    fn headless_service_dns_formatting() {
        let discovery = HeadlessServiceDiscovery::new("my-service", "production", 9000);
        assert_eq!(
            discovery.dns_name(),
            "my-service.production.svc.cluster.local"
        );
    }

    #[test]
    fn headless_service_dns_default_namespace() {
        let discovery = HeadlessServiceDiscovery::new("dactor-cluster", "default", 9000);
        assert_eq!(
            discovery.dns_name(),
            "dactor-cluster.default.svc.cluster.local"
        );
    }

    #[test]
    fn current_namespace_returns_none_outside_k8s() {
        // Only meaningful when the service-account file is absent; inside a pod
        // the function legitimately returns a namespace.
        let in_cluster =
            std::path::Path::new("/var/run/secrets/kubernetes.io/serviceaccount/namespace")
                .exists();
        if !in_cluster {
            assert!(current_namespace().is_none());
        }
    }

    #[test]
    fn pod_ip_returns_none_when_env_not_set() {
        let _env = lock_env();
        std::env::remove_var("DACTOR_POD_IP");
        std::env::remove_var("POD_IP");
        assert!(pod_ip().is_none());
    }

    #[test]
    fn pod_ip_reads_dactor_pod_ip_first() {
        let _env = lock_env();
        std::env::set_var("DACTOR_POD_IP", "10.0.0.1");
        std::env::set_var("POD_IP", "10.0.0.2");
        assert_eq!(pod_ip(), Some("10.0.0.1".to_string()));
        std::env::remove_var("DACTOR_POD_IP");
        std::env::remove_var("POD_IP");
    }

    #[test]
    fn pod_ip_falls_back_to_pod_ip_env() {
        let _env = lock_env();
        std::env::remove_var("DACTOR_POD_IP");
        std::env::set_var("POD_IP", "10.0.0.99");
        assert_eq!(pod_ip(), Some("10.0.0.99".to_string()));
        std::env::remove_var("POD_IP");
    }

    #[test]
    fn config_default_outside_k8s() {
        let cfg = K8sDiscoveryConfig::default();
        assert_eq!(cfg.namespace, expected_default_namespace());
        assert_eq!(cfg.port, 9000);
        assert!(cfg.label_selector.is_empty());
        assert!(cfg.port_name.is_none());
    }
}