use dactor::{ClusterDiscovery, DiscoveryError};
use k8s_openapi::api::core::v1::Pod;
use kube::{api::ListParams, Api, Client};
use std::fmt;
use std::net::ToSocketAddrs;
/// Failures that can be reported by the Kubernetes-based discovery code in
/// this module.
#[derive(Debug)]
pub enum K8sDiscoveryError {
    /// An error returned by the `kube` client.
    KubeError(kube::Error),
    /// A pod has no IP address assigned; the payload is the pod's name.
    NoPodIp(String),
    /// An underlying I/O error.
    Io(std::io::Error),
}
impl fmt::Display for K8sDiscoveryError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
K8sDiscoveryError::KubeError(e) => write!(f, "Kubernetes API error: {e}"),
K8sDiscoveryError::NoPodIp(name) => write!(f, "Pod '{name}' has no IP assigned"),
K8sDiscoveryError::Io(e) => write!(f, "I/O error: {e}"),
}
}
}
impl std::error::Error for K8sDiscoveryError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
K8sDiscoveryError::KubeError(e) => Some(e),
K8sDiscoveryError::Io(e) => Some(e),
K8sDiscoveryError::NoPodIp(_) => None,
}
}
}
impl From<kube::Error> for K8sDiscoveryError {
fn from(e: kube::Error) -> Self {
K8sDiscoveryError::KubeError(e)
}
}
impl From<std::io::Error> for K8sDiscoveryError {
fn from(e: std::io::Error) -> Self {
K8sDiscoveryError::Io(e)
}
}
/// Settings that control which pods are listed and which port is advertised
/// for each discovered peer.
#[derive(Clone, Debug)]
pub struct K8sDiscoveryConfig {
    /// Kubernetes namespace whose pods are listed.
    pub namespace: String,
    /// Label selector handed to the pod list request (e.g. `app=my-service`).
    pub label_selector: String,
    /// Port used for a peer when no named container port is resolved.
    pub port: u16,
    /// Optional container port name; when set, a container port with this
    /// name takes precedence over `port`.
    pub port_name: Option<String>,
}
/// Baseline configuration used by the builder before any overrides.
impl Default for K8sDiscoveryConfig {
    /// Uses the namespace reported by `current_namespace()` when available,
    /// otherwise `"default"`; an empty label selector; port 9000; and no
    /// named port.
    fn default() -> Self {
        let namespace = match current_namespace() {
            Some(detected) => detected,
            None => String::from("default"),
        };
        Self {
            namespace,
            label_selector: String::new(),
            port: 9000,
            port_name: None,
        }
    }
}
/// Peer discovery that lists pods through the Kubernetes API and turns each
/// running pod with an IP into a `host:port` address.
pub struct KubernetesDiscovery {
    config: K8sDiscoveryConfig,
}

impl KubernetesDiscovery {
    /// Starts a builder pre-populated with `K8sDiscoveryConfig::default()`.
    pub fn builder() -> K8sDiscoveryBuilder {
        K8sDiscoveryBuilder {
            config: K8sDiscoveryConfig::default(),
        }
    }

    /// Returns the configuration this discovery instance was built with.
    pub fn config(&self) -> &K8sDiscoveryConfig {
        &self.config
    }

    /// Lists pods in the configured namespace that match the configured label
    /// selector and returns one socket address string per usable pod.
    ///
    /// Pods whose phase is not `Running` are skipped (logged at debug level),
    /// and running pods without an IP are skipped with a warning. The port is
    /// the pod's container port named `port_name` when one is configured and
    /// found, otherwise the configured `port`.
    ///
    /// IPv4 peers are rendered as `ip:port`; IPv6 peers are rendered as
    /// `[ip]:port` so the result can be parsed back unambiguously.
    ///
    /// # Errors
    ///
    /// Returns [`K8sDiscoveryError::KubeError`] when the client cannot be
    /// created or the pod list request fails.
    pub async fn discover_async(&self) -> Result<Vec<String>, K8sDiscoveryError> {
        let client = Client::try_default().await?;
        let pods: Api<Pod> = Api::namespaced(client, &self.config.namespace);
        let lp = ListParams::default().labels(&self.config.label_selector);
        let pod_list = pods.list(&lp).await?;

        let mut addresses = Vec::new();
        for pod in pod_list.items {
            let pod_name = pod.metadata.name.as_deref().unwrap_or("<unknown>");
            let status = pod.status.as_ref();

            let phase = status.and_then(|s| s.phase.as_deref());
            if phase != Some("Running") {
                tracing::debug!(pod = pod_name, ?phase, "skipping non-running pod");
                continue;
            }

            match status.and_then(|s| s.pod_ip.as_deref()) {
                Some(ip) => {
                    let port = self.resolve_port(&pod).unwrap_or(self.config.port);
                    addresses.push(Self::format_address(ip, port));
                }
                None => {
                    tracing::warn!(pod = pod_name, "pod has no IP assigned");
                }
            }
        }
        Ok(addresses)
    }

    /// Renders `ip` and `port` as a socket address string.
    ///
    /// A plain `{ip}:{port}` is ambiguous for IPv6 (`fd00::1:9000`), so a
    /// parseable IP is formatted through `SocketAddr`, which brackets IPv6
    /// addresses. Text that does not parse as an IP is passed through in the
    /// original `ip:port` form.
    fn format_address(ip: &str, port: u16) -> String {
        match ip.parse::<std::net::IpAddr>() {
            Ok(addr) => std::net::SocketAddr::new(addr, port).to_string(),
            Err(_) => format!("{ip}:{port}"),
        }
    }

    /// Looks up the container port whose name equals the configured
    /// `port_name`.
    ///
    /// Returns `None` when no port name is configured, the pod has no spec,
    /// no container declares a port with that name, or the first matching
    /// port number does not fit in a `u16`.
    fn resolve_port(&self, pod: &Pod) -> Option<u16> {
        let port_name = self.config.port_name.as_deref()?;
        let spec = pod.spec.as_ref()?;
        spec.containers
            .iter()
            .filter_map(|container| container.ports.as_ref())
            .flatten()
            .find(|p| p.name.as_deref() == Some(port_name))
            .and_then(|p| u16::try_from(p.container_port).ok())
    }
}
/// Adapts [`KubernetesDiscovery::discover_async`] to the `dactor`
/// `ClusterDiscovery` trait.
#[async_trait::async_trait]
impl ClusterDiscovery for KubernetesDiscovery {
    /// Runs pod discovery and wraps each address in a `DiscoveredPeer`.
    ///
    /// Any discovery failure is converted to a `DiscoveryError` carrying the
    /// error's display text.
    async fn discover(&self) -> Result<Vec<dactor::DiscoveredPeer>, DiscoveryError> {
        let addresses = match self.discover_async().await {
            Ok(found) => found,
            Err(err) => return Err(DiscoveryError::new(err.to_string())),
        };
        let peers = addresses
            .into_iter()
            .map(dactor::DiscoveredPeer::from_address)
            .collect();
        Ok(peers)
    }
}
pub struct K8sDiscoveryBuilder {
config: K8sDiscoveryConfig,
}
impl K8sDiscoveryBuilder {
pub fn namespace(mut self, ns: &str) -> Self {
self.config.namespace = ns.to_string();
self
}
pub fn label_selector(mut self, selector: &str) -> Self {
self.config.label_selector = selector.to_string();
self
}
pub fn port(mut self, port: u16) -> Self {
self.config.port = port;
self
}
pub fn port_name(mut self, name: &str) -> Self {
self.config.port_name = Some(name.to_string());
self
}
pub fn build(self) -> KubernetesDiscovery {
KubernetesDiscovery {
config: self.config,
}
}
}
/// Peer discovery that targets the DNS name of a Kubernetes service.
pub struct HeadlessServiceDiscovery {
    /// Name of the service.
    service_name: String,
    /// Namespace the service lives in.
    namespace: String,
    /// Port associated with the service's peers.
    port: u16,
    /// Cluster DNS suffix; `cluster.local` unless overridden.
    cluster_domain: String,
}

impl HeadlessServiceDiscovery {
    /// Creates a discovery instance for `service_name` in `namespace` on
    /// `port`, using the `cluster.local` cluster domain.
    pub fn new(service_name: &str, namespace: &str, port: u16) -> Self {
        let cluster_domain = String::from("cluster.local");
        Self {
            service_name: service_name.to_owned(),
            namespace: namespace.to_owned(),
            port,
            cluster_domain,
        }
    }

    /// Replaces the cluster domain used when building the DNS name.
    pub fn with_cluster_domain(self, domain: &str) -> Self {
        Self {
            cluster_domain: domain.to_owned(),
            ..self
        }
    }

    /// Returns the service's DNS name in the form
    /// `<service>.<namespace>.svc.<cluster-domain>`.
    pub fn dns_name(&self) -> String {
        let labels = [
            self.service_name.as_str(),
            self.namespace.as_str(),
            "svc",
            self.cluster_domain.as_str(),
        ];
        labels.join(".")
    }
}
/// DNS-based implementation of the `dactor` `ClusterDiscovery` trait.
#[async_trait::async_trait]
impl ClusterDiscovery for HeadlessServiceDiscovery {
    /// Resolves `<dns_name>:<port>` and returns one peer per resolved socket
    /// address.
    ///
    /// The lookup uses the blocking `ToSocketAddrs` resolver, so it is run
    /// via `tokio::task::spawn_blocking` rather than directly in the async
    /// body. Both a failed lookup task and a failed resolution are reported
    /// as a `DiscoveryError` with a descriptive message.
    async fn discover(&self) -> Result<Vec<dactor::DiscoveredPeer>, DiscoveryError> {
        let port = self.port;
        let dns = self.dns_name();
        let lookup_task =
            tokio::task::spawn_blocking(move || format!("{dns}:{port}").to_socket_addrs());

        // Outer result: did the blocking task itself complete?
        let resolution = match lookup_task.await {
            Ok(outcome) => outcome,
            Err(e) => {
                return Err(DiscoveryError::new(format!("DNS lookup task failed: {e}")));
            }
        };

        // Inner result: did the name actually resolve?
        let socket_addrs = match resolution {
            Ok(resolved) => resolved,
            Err(e) => {
                return Err(DiscoveryError::new(format!(
                    "DNS resolution failed for {}: {e}", self.dns_name()
                )));
            }
        };

        let mut peers = Vec::new();
        for socket_addr in socket_addrs {
            peers.push(dactor::DiscoveredPeer::from_address(socket_addr.to_string()));
        }
        Ok(peers)
    }
}
/// Reads the namespace from the service-account namespace file at
/// `/var/run/secrets/kubernetes.io/serviceaccount/namespace`.
///
/// Returns `None` when the file cannot be read, and also when its trimmed
/// content is empty: a blank namespace is never usable, and returning
/// `Some("")` would stop callers from falling back to their own default.
pub fn current_namespace() -> Option<String> {
    let raw =
        std::fs::read_to_string("/var/run/secrets/kubernetes.io/serviceaccount/namespace").ok()?;
    let namespace = raw.trim();
    if namespace.is_empty() {
        None
    } else {
        Some(namespace.to_string())
    }
}
/// Returns this pod's IP address as taken from the environment.
///
/// `DACTOR_POD_IP` is consulted first, then `POD_IP`. A variable that is
/// unset, not valid Unicode, or blank after trimming is treated as absent, so
/// an empty `DACTOR_POD_IP` no longer hides a usable `POD_IP`. The returned
/// value has surrounding whitespace removed.
pub fn pod_ip() -> Option<String> {
    ["DACTOR_POD_IP", "POD_IP"].iter().find_map(|key| {
        let value = std::env::var(key).ok()?;
        let trimmed = value.trim();
        if trimmed.is_empty() {
            None
        } else {
            Some(trimmed.to_string())
        }
    })
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Mutex, MutexGuard};

    /// Guards the process environment: the test harness runs tests on
    /// multiple threads, and the `pod_ip_*` tests all set and remove the same
    /// variables, so without serialization they can observe each other's
    /// writes.
    static ENV_LOCK: Mutex<()> = Mutex::new(());

    /// Acquires the environment lock, ignoring poisoning so that one failed
    /// test does not cascade into the others.
    fn lock_env() -> MutexGuard<'static, ()> {
        ENV_LOCK
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    /// Every builder setter is reflected in the resulting configuration.
    #[test]
    fn builder_creates_valid_config() {
        let discovery = KubernetesDiscovery::builder()
            .namespace("production")
            .label_selector("app=my-service")
            .port(8080)
            .port_name("dactor")
            .build();
        assert_eq!(discovery.config().namespace, "production");
        assert_eq!(discovery.config().label_selector, "app=my-service");
        assert_eq!(discovery.config().port, 8080);
        assert_eq!(discovery.config().port_name.as_deref(), Some("dactor"));
    }

    /// Fields that are not set keep their defaults (assumes the test is not
    /// running inside a pod, so the namespace falls back to "default").
    #[test]
    fn builder_default_values() {
        let discovery = KubernetesDiscovery::builder()
            .label_selector("app=test")
            .build();
        assert_eq!(discovery.config().namespace, "default");
        assert_eq!(discovery.config().port, 9000);
        assert!(discovery.config().port_name.is_none());
    }

    #[test]
    fn headless_service_dns_formatting() {
        let discovery = HeadlessServiceDiscovery::new("my-service", "production", 9000);
        assert_eq!(
            discovery.dns_name(),
            "my-service.production.svc.cluster.local"
        );
    }

    #[test]
    fn headless_service_dns_default_namespace() {
        let discovery = HeadlessServiceDiscovery::new("dactor-cluster", "default", 9000);
        assert_eq!(
            discovery.dns_name(),
            "dactor-cluster.default.svc.cluster.local"
        );
    }

    /// Assumes the test is not running inside a pod.
    #[test]
    fn current_namespace_returns_none_outside_k8s() {
        assert!(current_namespace().is_none());
    }

    #[test]
    fn pod_ip_returns_none_when_env_not_set() {
        let _env = lock_env();
        std::env::remove_var("DACTOR_POD_IP");
        std::env::remove_var("POD_IP");
        assert!(pod_ip().is_none());
    }

    #[test]
    fn pod_ip_reads_dactor_pod_ip_first() {
        let _env = lock_env();
        std::env::set_var("DACTOR_POD_IP", "10.0.0.1");
        std::env::set_var("POD_IP", "10.0.0.2");
        assert_eq!(pod_ip(), Some("10.0.0.1".to_string()));
        std::env::remove_var("DACTOR_POD_IP");
        std::env::remove_var("POD_IP");
    }

    #[test]
    fn pod_ip_falls_back_to_pod_ip_env() {
        let _env = lock_env();
        std::env::remove_var("DACTOR_POD_IP");
        std::env::set_var("POD_IP", "10.0.0.99");
        assert_eq!(pod_ip(), Some("10.0.0.99".to_string()));
        std::env::remove_var("POD_IP");
    }

    /// Assumes the test is not running inside a pod.
    #[test]
    fn config_default_outside_k8s() {
        let cfg = K8sDiscoveryConfig::default();
        assert_eq!(cfg.namespace, "default");
        assert_eq!(cfg.port, 9000);
        assert!(cfg.label_selector.is_empty());
        assert!(cfg.port_name.is_none());
    }
}