dynamo_runtime/discovery/
kube.rs1mod daemon;
5mod utils;
6
7pub use utils::hash_pod_name;
8
9use daemon::DiscoveryDaemon;
10use utils::PodInfo;
11
12use crate::CancellationToken;
13use crate::discovery::{
14 Discovery, DiscoveryEvent, DiscoveryInstance, DiscoveryMetadata, DiscoveryQuery, DiscoverySpec,
15 DiscoveryStream, MetadataSnapshot,
16};
17use anyhow::Result;
18use async_trait::async_trait;
19use kube::Client as KubeClient;
20use std::collections::HashSet;
21use std::sync::Arc;
22use tokio::sync::RwLock;
23
24#[derive(Clone)]
26pub struct KubeDiscoveryClient {
27 instance_id: u64,
28 metadata: Arc<RwLock<DiscoveryMetadata>>,
29 metadata_watch: tokio::sync::watch::Receiver<Arc<MetadataSnapshot>>,
30}
31
32impl KubeDiscoveryClient {
33 pub async fn new(
39 metadata: Arc<RwLock<DiscoveryMetadata>>,
40 cancel_token: CancellationToken,
41 ) -> Result<Self> {
42 let pod_info = PodInfo::from_env()?;
43 let instance_id = hash_pod_name(&pod_info.pod_name);
44
45 tracing::info!(
46 "Initializing KubeDiscoveryClient: pod_name={}, instance_id={:x}, namespace={}",
47 pod_info.pod_name,
48 instance_id,
49 pod_info.pod_namespace
50 );
51
52 let kube_client = KubeClient::try_default()
53 .await
54 .map_err(|e| anyhow::anyhow!("Failed to create Kubernetes client: {}", e))?;
55
56 let (watch_tx, watch_rx) = tokio::sync::watch::channel(Arc::new(MetadataSnapshot::empty()));
58
59 let daemon = DiscoveryDaemon::new(kube_client, pod_info, cancel_token)?;
61
62 tokio::spawn(async move {
63 if let Err(e) = daemon.run(watch_tx).await {
64 tracing::error!("Discovery daemon failed: {}", e);
65 }
66 });
67
68 tracing::info!("Discovery daemon started");
69
70 Ok(Self {
71 instance_id,
72 metadata,
73 metadata_watch: watch_rx,
74 })
75 }
76}
77
78#[async_trait]
79impl Discovery for KubeDiscoveryClient {
80 fn instance_id(&self) -> u64 {
81 self.instance_id
82 }
83
84 async fn register(&self, spec: DiscoverySpec) -> Result<DiscoveryInstance> {
85 let instance_id = self.instance_id();
86 let instance = spec.with_instance_id(instance_id);
87
88 tracing::debug!(
89 "Registering instance: {:?} with instance_id={:x}",
90 instance,
91 instance_id
92 );
93
94 let mut metadata = self.metadata.write().await;
96 match &instance {
97 DiscoveryInstance::Endpoint(inst) => {
98 tracing::info!(
99 "Registered endpoint: namespace={}, component={}, endpoint={}, instance_id={:x}",
100 inst.namespace,
101 inst.component,
102 inst.endpoint,
103 instance_id
104 );
105 metadata.register_endpoint(instance.clone())?;
106 }
107 DiscoveryInstance::Model {
108 namespace,
109 component,
110 endpoint,
111 ..
112 } => {
113 tracing::info!(
114 "Registered model card: namespace={}, component={}, endpoint={}, instance_id={:x}",
115 namespace,
116 component,
117 endpoint,
118 instance_id
119 );
120 metadata.register_model_card(instance.clone())?;
121 }
122 }
123
124 Ok(instance)
125 }
126
127 async fn list(&self, query: DiscoveryQuery) -> Result<Vec<DiscoveryInstance>> {
128 tracing::debug!("KubeDiscoveryClient::list called with query={:?}", query);
129
130 let snapshot = self.metadata_watch.borrow().clone();
132
133 tracing::debug!(
134 "List using snapshot seq={} with {} instances",
135 snapshot.sequence,
136 snapshot.instances.len()
137 );
138
139 let instances = snapshot.filter(&query);
141
142 tracing::info!(
143 "KubeDiscoveryClient::list returning {} instances for query={:?}",
144 instances.len(),
145 query
146 );
147
148 Ok(instances)
149 }
150
151 async fn list_and_watch(
152 &self,
153 query: DiscoveryQuery,
154 cancel_token: Option<CancellationToken>,
155 ) -> Result<DiscoveryStream> {
156 use tokio::sync::mpsc;
157
158 tracing::info!(
159 "KubeDiscoveryClient::list_and_watch started for query={:?}",
160 query
161 );
162
163 let mut watch_rx = self.metadata_watch.clone();
165
166 let (event_tx, event_rx) = mpsc::unbounded_channel();
168
169 let stream_id = uuid::Uuid::new_v4();
171
172 tokio::spawn(async move {
174 let mut known_instances = HashSet::<u64>::new();
175
176 tracing::debug!(
177 stream_id = %stream_id,
178 "Watch started for query={:?}",
179 query
180 );
181
182 loop {
183 let watch_result = if let Some(ref token) = cancel_token {
185 tokio::select! {
186 result = watch_rx.changed() => result,
187 _ = token.cancelled() => {
188 tracing::info!(
189 stream_id = %stream_id,
190 "Watch cancelled via cancel token"
191 );
192 break;
193 }
194 }
195 } else {
196 watch_rx.changed().await
197 };
198
199 match watch_result {
200 Ok(()) => {
201 let snapshot = watch_rx.borrow_and_update().clone();
203
204 let current_instances: HashSet<u64> = snapshot
206 .instances
207 .iter()
208 .filter_map(|(&instance_id, metadata)| {
209 let filtered = metadata.filter(&query);
210 if !filtered.is_empty() {
211 Some(instance_id)
212 } else {
213 None
214 }
215 })
216 .collect();
217
218 let added: Vec<u64> = current_instances
220 .difference(&known_instances)
221 .copied()
222 .collect();
223
224 let removed: Vec<u64> = known_instances
225 .difference(¤t_instances)
226 .copied()
227 .collect();
228
229 if !added.is_empty() || !removed.is_empty() {
231 tracing::debug!(
232 stream_id = %stream_id,
233 seq = snapshot.sequence,
234 added = added.len(),
235 removed = removed.len(),
236 total = current_instances.len(),
237 "Watch detected changes"
238 );
239 }
240
241 for instance_id in added {
243 if let Some(metadata) = snapshot.instances.get(&instance_id) {
244 let instances = metadata.filter(&query);
245 for instance in instances {
246 tracing::info!(
247 stream_id = %stream_id,
248 instance_id = format!("{:x}", instance.instance_id()),
249 "Emitting Added event"
250 );
251 if event_tx.send(Ok(DiscoveryEvent::Added(instance))).is_err() {
252 tracing::debug!(
253 stream_id = %stream_id,
254 "Watch receiver dropped"
255 );
256 return;
257 }
258 }
259 }
260 }
261
262 for instance_id in removed {
264 tracing::info!(
265 stream_id = %stream_id,
266 instance_id = format!("{:x}", instance_id),
267 "Emitting Removed event"
268 );
269 if event_tx
270 .send(Ok(DiscoveryEvent::Removed(instance_id)))
271 .is_err()
272 {
273 tracing::debug!(stream_id = %stream_id, "Watch receiver dropped");
274 return;
275 }
276 }
277
278 known_instances = current_instances;
280 }
281 Err(_) => {
282 tracing::info!(
283 stream_id = %stream_id,
284 "Watch channel closed (daemon stopped)"
285 );
286 break;
287 }
288 }
289 }
290 });
291
292 let stream = tokio_stream::wrappers::UnboundedReceiverStream::new(event_rx);
294 Ok(Box::pin(stream))
295 }
296}