1use crate::config::{Address, KubernetesDiscoveryConfig};
2use coerce::remote::system::RemoteActorSystem;
3use k8s_openapi::api::core::v1::Pod;
4use kube::api::ListParams;
5use kube::{Api, Client};
6
7#[macro_use]
8extern crate tracing;
9pub mod config;
10
11pub struct KubernetesDiscovery {
12 system: RemoteActorSystem,
13}
14
15impl KubernetesDiscovery {
16 pub async fn discover(config: KubernetesDiscoveryConfig) -> Option<Vec<String>> {
17 let client = Client::try_default()
18 .await
19 .expect("failed to initialise k8s client");
20 let api = Api::<Pod>::default_namespaced(client);
21
22 let params = ListParams {
23 label_selector: config.pod_selection_label.clone(),
24 ..Default::default()
25 };
26
27 let pods = api.list(¶ms).await;
28 let mut cluster_nodes = vec![];
29
30 if let Ok(pods) = pods {
31 debug!("pods={:#?}", &pods.items);
32
33 let pods = pods.items;
34 for pod in pods {
35 let pod_spec = pod.spec.unwrap();
36 let pod_status = pod.status.unwrap();
37
38 debug!("pod_status={:?}", pod_status);
39
40 let addr = match &config.cluster_node_address {
44 Address::PodIp => {
45 let pod_ip = pod_status.pod_ip;
46 if pod_ip.is_none() {
47 continue;
48 }
49
50 pod_ip.unwrap()
51 }
52
53 Address::Hostname => {
54 let hostname = pod_spec.hostname.unwrap();
55 let subdomain = pod_spec.subdomain.unwrap();
56
57 format!("{hostname}.{subdomain}")
58 }
59 };
60
61 for container in pod_spec.containers {
62 let port = container
63 .ports
64 .unwrap()
65 .into_iter()
66 .find(|p| p.name == config.coerce_remote_port_name);
67 if let Some(port) = port {
68 cluster_nodes.push(format!("{}:{}", addr, port.container_port));
69 }
70 }
71 }
72 }
73
74 debug!("discovered nodes: {:#?}", &cluster_nodes);
75 Some(cluster_nodes)
76 }
77}