Skip to main content

dactor_discover_k8s/
lib.rs

1//! Kubernetes node discovery for the dactor distributed actor framework.
2//!
3//! Provides two discovery mechanisms:
4//! - [`KubernetesDiscovery`]: Uses the Kubernetes API to list pods by label selector.
5//! - [`HeadlessServiceDiscovery`]: Uses DNS resolution of a headless Kubernetes service.
6
7use dactor::{ClusterDiscovery, DiscoveryError};
8use k8s_openapi::api::core::v1::Pod;
9use kube::{api::ListParams, Api, Client};
10use std::fmt;
11use std::net::ToSocketAddrs;
12
13// ---------------------------------------------------------------------------
14// Error type
15// ---------------------------------------------------------------------------
16
17/// Errors returned by Kubernetes discovery operations.
18#[derive(Debug)]
19pub enum K8sDiscoveryError {
20    /// Error from the Kubernetes API client.
21    KubeError(kube::Error),
22    /// A pod was found but has no IP assigned.
23    NoPodIp(String),
24    /// An I/O error (e.g., DNS resolution failure).
25    Io(std::io::Error),
26}
27
28impl fmt::Display for K8sDiscoveryError {
29    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
30        match self {
31            K8sDiscoveryError::KubeError(e) => write!(f, "Kubernetes API error: {e}"),
32            K8sDiscoveryError::NoPodIp(name) => write!(f, "Pod '{name}' has no IP assigned"),
33            K8sDiscoveryError::Io(e) => write!(f, "I/O error: {e}"),
34        }
35    }
36}
37
38impl std::error::Error for K8sDiscoveryError {
39    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
40        match self {
41            K8sDiscoveryError::KubeError(e) => Some(e),
42            K8sDiscoveryError::Io(e) => Some(e),
43            K8sDiscoveryError::NoPodIp(_) => None,
44        }
45    }
46}
47
48impl From<kube::Error> for K8sDiscoveryError {
49    fn from(e: kube::Error) -> Self {
50        K8sDiscoveryError::KubeError(e)
51    }
52}
53
54impl From<std::io::Error> for K8sDiscoveryError {
55    fn from(e: std::io::Error) -> Self {
56        K8sDiscoveryError::Io(e)
57    }
58}
59
60// ---------------------------------------------------------------------------
61// Configuration
62// ---------------------------------------------------------------------------
63
64/// Configuration for Kubernetes pod discovery.
65#[derive(Debug, Clone)]
66pub struct K8sDiscoveryConfig {
67    /// Kubernetes namespace to query.
68    pub namespace: String,
69    /// Label selector to filter pods (e.g., `"app=my-dactor-service"`).
70    pub label_selector: String,
71    /// Fallback port number when `port_name` is unset or cannot be resolved.
72    pub port: u16,
73    /// Named container port to look up (e.g., `"dactor"`).
74    pub port_name: Option<String>,
75}
76
77impl Default for K8sDiscoveryConfig {
78    fn default() -> Self {
79        Self {
80            namespace: current_namespace().unwrap_or_else(|| "default".to_string()),
81            label_selector: String::new(),
82            port: 9000,
83            port_name: None,
84        }
85    }
86}
87
88// ---------------------------------------------------------------------------
89// KubernetesDiscovery
90// ---------------------------------------------------------------------------
91
92/// Discovers peer pods using the Kubernetes API.
93///
94/// Queries the API server for pods matching a label selector and extracts
95/// their IP addresses.  A tokio runtime must be available (either the
96/// caller is inside one, or one is created externally).
97pub struct KubernetesDiscovery {
98    config: K8sDiscoveryConfig,
99}
100
101impl KubernetesDiscovery {
102    /// Returns a new builder with default configuration.
103    pub fn builder() -> K8sDiscoveryBuilder {
104        K8sDiscoveryBuilder {
105            config: K8sDiscoveryConfig::default(),
106        }
107    }
108
109    /// Returns a reference to the current configuration.
110    pub fn config(&self) -> &K8sDiscoveryConfig {
111        &self.config
112    }
113
114    /// Asynchronously discover peer pod addresses.
115    pub async fn discover_async(&self) -> Result<Vec<String>, K8sDiscoveryError> {
116        let client = Client::try_default().await?;
117        let pods: Api<Pod> = Api::namespaced(client, &self.config.namespace);
118        let lp = ListParams::default().labels(&self.config.label_selector);
119        let pod_list = pods.list(&lp).await?;
120
121        let mut addresses = Vec::new();
122        for pod in pod_list.items {
123            let pod_name = pod
124                .metadata
125                .name
126                .as_deref()
127                .unwrap_or("<unknown>");
128
129            let phase = pod
130                .status
131                .as_ref()
132                .and_then(|s| s.phase.as_deref());
133
134            if phase != Some("Running") {
135                tracing::debug!(pod = pod_name, ?phase, "skipping non-running pod");
136                continue;
137            }
138
139            let ip = pod
140                .status
141                .as_ref()
142                .and_then(|s| s.pod_ip.as_deref());
143
144            match ip {
145                Some(ip) => {
146                    let port = self.resolve_port(&pod).unwrap_or(self.config.port);
147                    addresses.push(format!("{ip}:{port}"));
148                }
149                None => {
150                    tracing::warn!(pod = pod_name, "pod has no IP assigned");
151                }
152            }
153        }
154
155        Ok(addresses)
156    }
157
158    /// Resolve the port for a pod by looking up the configured `port_name`.
159    fn resolve_port(&self, pod: &Pod) -> Option<u16> {
160        let port_name = self.config.port_name.as_deref()?;
161        let spec = pod.spec.as_ref()?;
162        for container in &spec.containers {
163            if let Some(ports) = &container.ports {
164                for p in ports {
165                    if p.name.as_deref() == Some(port_name) {
166                        return u16::try_from(p.container_port).ok();
167                    }
168                }
169            }
170        }
171        None
172    }
173}
174
175#[async_trait::async_trait]
176impl ClusterDiscovery for KubernetesDiscovery {
177    async fn discover(&self) -> Result<Vec<dactor::DiscoveredPeer>, DiscoveryError> {
178        self.discover_async()
179            .await
180            .map(|addrs| addrs.into_iter().map(dactor::DiscoveredPeer::from_address).collect())
181            .map_err(|e| DiscoveryError::new(e.to_string()))
182    }
183}
184
185// ---------------------------------------------------------------------------
186// Builder
187// ---------------------------------------------------------------------------
188
189/// Builder for [`KubernetesDiscovery`].
190pub struct K8sDiscoveryBuilder {
191    config: K8sDiscoveryConfig,
192}
193
194impl K8sDiscoveryBuilder {
195    /// Set the Kubernetes namespace to query.
196    pub fn namespace(mut self, ns: &str) -> Self {
197        self.config.namespace = ns.to_string();
198        self
199    }
200
201    /// Set the pod label selector (e.g., `"app=my-dactor-service"`).
202    pub fn label_selector(mut self, selector: &str) -> Self {
203        self.config.label_selector = selector.to_string();
204        self
205    }
206
207    /// Set the fallback port number (default: `9000`).
208    pub fn port(mut self, port: u16) -> Self {
209        self.config.port = port;
210        self
211    }
212
213    /// Set the named port to resolve from the pod spec (e.g., `"dactor"`).
214    pub fn port_name(mut self, name: &str) -> Self {
215        self.config.port_name = Some(name.to_string());
216        self
217    }
218
219    /// Build the [`KubernetesDiscovery`] instance.
220    pub fn build(self) -> KubernetesDiscovery {
221        KubernetesDiscovery {
222            config: self.config,
223        }
224    }
225}
226
227// ---------------------------------------------------------------------------
228// HeadlessServiceDiscovery
229// ---------------------------------------------------------------------------
230
/// Discovers peer pods via DNS resolution of a Kubernetes headless service.
///
/// A headless service (`clusterIP: None`) causes the Kubernetes DNS to return
/// A/AAAA records for every ready pod endpoint.  This discovery method simply
/// resolves that DNS name and returns the resulting addresses.
#[derive(Debug, Clone)]
pub struct HeadlessServiceDiscovery {
    service_name: String,
    namespace: String,
    port: u16,
    cluster_domain: String,
}

impl HeadlessServiceDiscovery {
    /// Create a new headless-service discovery with default cluster domain (`cluster.local`).
    ///
    /// `port` is attached to every resolved address.
    pub fn new(service_name: &str, namespace: &str, port: u16) -> Self {
        Self {
            service_name: service_name.to_string(),
            namespace: namespace.to_string(),
            port,
            cluster_domain: "cluster.local".to_string(),
        }
    }

    /// Override the cluster DNS domain (default: `cluster.local`).
    ///
    /// Some clusters use a custom domain. Set this to match your
    /// cluster's `--cluster-domain` kubelet configuration.
    #[must_use]
    pub fn with_cluster_domain(mut self, domain: &str) -> Self {
        self.cluster_domain = domain.to_string();
        self
    }

    /// Returns the FQDN used for DNS resolution, in the form
    /// `<service>.<namespace>.svc.<cluster_domain>`.
    pub fn dns_name(&self) -> String {
        format!(
            "{}.{}.svc.{}",
            self.service_name, self.namespace, self.cluster_domain
        )
    }
}
271
272#[async_trait::async_trait]
273impl ClusterDiscovery for HeadlessServiceDiscovery {
274    async fn discover(&self) -> Result<Vec<dactor::DiscoveredPeer>, DiscoveryError> {
275        let dns = self.dns_name();
276        let port = self.port;
277        let addrs = tokio::task::spawn_blocking(move || {
278            let lookup = format!("{dns}:{port}");
279            lookup.to_socket_addrs()
280        })
281        .await
282        .map_err(|e| DiscoveryError::new(format!("DNS lookup task failed: {e}")))?
283        .map_err(|e| DiscoveryError::new(format!(
284            "DNS resolution failed for {}: {e}", self.dns_name()
285        )))?;
286
287        Ok(addrs.map(|a| dactor::DiscoveredPeer::from_address(a.to_string())).collect())
288    }
289}
290
291// ---------------------------------------------------------------------------
292// Environment helpers
293// ---------------------------------------------------------------------------
294
/// Read the current namespace from the pod's mounted service account.
///
/// Returns `None` when not running inside a Kubernetes pod (the file cannot
/// be read). Also returns `None` when the file contains only whitespace.
/// Surrounding whitespace is trimmed from the value.
pub fn current_namespace() -> Option<String> {
    let raw =
        std::fs::read_to_string("/var/run/secrets/kubernetes.io/serviceaccount/namespace").ok()?;
    let namespace = raw.trim();
    // An empty string is not a usable namespace; report it as absent so
    // callers (e.g. `K8sDiscoveryConfig::default`) apply their fallback.
    if namespace.is_empty() {
        None
    } else {
        Some(namespace.to_string())
    }
}
303
/// Read the pod's own IP from the `DACTOR_POD_IP` or `POD_IP` environment variable.
///
/// `DACTOR_POD_IP` takes precedence. A variable is skipped if it is:
/// - unset,
/// - not valid Unicode, or
/// - empty or whitespace-only.
///
/// So an empty `DACTOR_POD_IP` falls through to `POD_IP` instead of
/// shadowing it. Surrounding whitespace is trimmed from the returned value.
pub fn pod_ip() -> Option<String> {
    ["DACTOR_POD_IP", "POD_IP"].iter().find_map(|key| {
        let value = std::env::var(key).ok()?;
        let value = value.trim();
        if value.is_empty() {
            None
        } else {
            Some(value.to_string())
        }
    })
}
310
311// ---------------------------------------------------------------------------
312// Tests
313// ---------------------------------------------------------------------------
314
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Mutex, MutexGuard};

    /// Path of the service-account namespace file read by `current_namespace`.
    const SA_NAMESPACE_PATH: &str = "/var/run/secrets/kubernetes.io/serviceaccount/namespace";

    /// Serializes tests that mutate process-wide environment variables.
    ///
    /// The test harness runs tests on parallel threads by default. Without
    /// this lock, one test's `remove_var` can land between another test's
    /// `set_var` and its assertion, making the `pod_ip_*` tests flaky.
    static ENV_LOCK: Mutex<()> = Mutex::new(());

    /// Acquire [`ENV_LOCK`], recovering from poisoning: the guarded data is
    /// `()`, so a panic in another test leaves nothing inconsistent.
    fn lock_env() -> MutexGuard<'static, ()> {
        ENV_LOCK.lock().unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    /// The namespace `K8sDiscoveryConfig::default()` should pick in this
    /// environment: the service-account namespace inside a pod, and
    /// `"default"` elsewhere.
    fn expected_default_namespace() -> String {
        current_namespace().unwrap_or_else(|| "default".to_string())
    }

    #[test]
    fn builder_creates_valid_config() {
        let discovery = KubernetesDiscovery::builder()
            .namespace("production")
            .label_selector("app=my-service")
            .port(8080)
            .port_name("dactor")
            .build();

        assert_eq!(discovery.config().namespace, "production");
        assert_eq!(discovery.config().label_selector, "app=my-service");
        assert_eq!(discovery.config().port, 8080);
        assert_eq!(discovery.config().port_name.as_deref(), Some("dactor"));
    }

    #[test]
    fn builder_default_values() {
        let discovery = KubernetesDiscovery::builder()
            .label_selector("app=test")
            .build();

        // Outside K8s this is "default"; inside a pod it is the pod's namespace.
        assert_eq!(discovery.config().namespace, expected_default_namespace());
        assert_eq!(discovery.config().label_selector, "app=test");
        assert_eq!(discovery.config().port, 9000);
        assert!(discovery.config().port_name.is_none());
    }

    #[test]
    fn headless_service_dns_formatting() {
        let discovery = HeadlessServiceDiscovery::new("my-service", "production", 9000);
        assert_eq!(
            discovery.dns_name(),
            "my-service.production.svc.cluster.local"
        );
    }

    #[test]
    fn headless_service_dns_default_namespace() {
        let discovery = HeadlessServiceDiscovery::new("dactor-cluster", "default", 9000);
        assert_eq!(
            discovery.dns_name(),
            "dactor-cluster.default.svc.cluster.local"
        );
    }

    #[test]
    fn headless_service_custom_cluster_domain() {
        let discovery = HeadlessServiceDiscovery::new("my-service", "production", 9000)
            .with_cluster_domain("corp.internal");
        assert_eq!(
            discovery.dns_name(),
            "my-service.production.svc.corp.internal"
        );
    }

    #[test]
    fn current_namespace_returns_none_outside_k8s() {
        // Inside a pod the file is mounted and a namespace is legitimately returned.
        if std::path::Path::new(SA_NAMESPACE_PATH).exists() {
            return;
        }
        assert!(current_namespace().is_none());
    }

    #[test]
    fn pod_ip_returns_none_when_env_not_set() {
        let _guard = lock_env();
        std::env::remove_var("DACTOR_POD_IP");
        std::env::remove_var("POD_IP");
        assert!(pod_ip().is_none());
    }

    #[test]
    fn pod_ip_reads_dactor_pod_ip_first() {
        let _guard = lock_env();
        std::env::set_var("DACTOR_POD_IP", "10.0.0.1");
        std::env::set_var("POD_IP", "10.0.0.2");
        assert_eq!(pod_ip(), Some("10.0.0.1".to_string()));
        std::env::remove_var("DACTOR_POD_IP");
        std::env::remove_var("POD_IP");
    }

    #[test]
    fn pod_ip_falls_back_to_pod_ip_env() {
        let _guard = lock_env();
        std::env::remove_var("DACTOR_POD_IP");
        std::env::set_var("POD_IP", "10.0.0.99");
        assert_eq!(pod_ip(), Some("10.0.0.99".to_string()));
        std::env::remove_var("POD_IP");
    }

    #[test]
    fn config_default_outside_k8s() {
        let cfg = K8sDiscoveryConfig::default();
        assert_eq!(cfg.namespace, expected_default_namespace());
        assert_eq!(cfg.port, 9000);
        assert!(cfg.label_selector.is_empty());
        assert!(cfg.port_name.is_none());
    }
}