dynamo_runtime/discovery/
kube.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4mod daemon;
5mod utils;
6
7pub use utils::hash_pod_name;
8
9use daemon::DiscoveryDaemon;
10use utils::PodInfo;
11
12use crate::CancellationToken;
13use crate::discovery::{
14    Discovery, DiscoveryEvent, DiscoveryInstance, DiscoveryMetadata, DiscoveryQuery, DiscoverySpec,
15    DiscoveryStream, MetadataSnapshot,
16};
17use anyhow::Result;
18use async_trait::async_trait;
19use kube::Client as KubeClient;
20use std::collections::HashSet;
21use std::sync::Arc;
22use tokio::sync::RwLock;
23
24/// Kubernetes-based discovery client
25#[derive(Clone)]
26pub struct KubeDiscoveryClient {
27    instance_id: u64,
28    metadata: Arc<RwLock<DiscoveryMetadata>>,
29    metadata_watch: tokio::sync::watch::Receiver<Arc<MetadataSnapshot>>,
30}
31
32impl KubeDiscoveryClient {
33    /// Create a new Kubernetes discovery client
34    ///
35    /// # Arguments
36    /// * `metadata` - Shared metadata store (also used by system server)
37    /// * `cancel_token` - Cancellation token for shutdown
38    pub async fn new(
39        metadata: Arc<RwLock<DiscoveryMetadata>>,
40        cancel_token: CancellationToken,
41    ) -> Result<Self> {
42        let pod_info = PodInfo::from_env()?;
43        let instance_id = hash_pod_name(&pod_info.pod_name);
44
45        tracing::info!(
46            "Initializing KubeDiscoveryClient: pod_name={}, instance_id={:x}, namespace={}",
47            pod_info.pod_name,
48            instance_id,
49            pod_info.pod_namespace
50        );
51
52        let kube_client = KubeClient::try_default()
53            .await
54            .map_err(|e| anyhow::anyhow!("Failed to create Kubernetes client: {}", e))?;
55
56        // Create watch channel with initial empty snapshot
57        let (watch_tx, watch_rx) = tokio::sync::watch::channel(Arc::new(MetadataSnapshot::empty()));
58
59        // Create and spawn daemon
60        let daemon = DiscoveryDaemon::new(kube_client, pod_info, cancel_token)?;
61
62        tokio::spawn(async move {
63            if let Err(e) = daemon.run(watch_tx).await {
64                tracing::error!("Discovery daemon failed: {}", e);
65            }
66        });
67
68        tracing::info!("Discovery daemon started");
69
70        Ok(Self {
71            instance_id,
72            metadata,
73            metadata_watch: watch_rx,
74        })
75    }
76}
77
78#[async_trait]
79impl Discovery for KubeDiscoveryClient {
80    fn instance_id(&self) -> u64 {
81        self.instance_id
82    }
83
84    async fn register(&self, spec: DiscoverySpec) -> Result<DiscoveryInstance> {
85        let instance_id = self.instance_id();
86        let instance = spec.with_instance_id(instance_id);
87
88        tracing::debug!(
89            "Registering instance: {:?} with instance_id={:x}",
90            instance,
91            instance_id
92        );
93
94        // Write to local metadata
95        let mut metadata = self.metadata.write().await;
96        match &instance {
97            DiscoveryInstance::Endpoint(inst) => {
98                tracing::info!(
99                    "Registered endpoint: namespace={}, component={}, endpoint={}, instance_id={:x}",
100                    inst.namespace,
101                    inst.component,
102                    inst.endpoint,
103                    instance_id
104                );
105                metadata.register_endpoint(instance.clone())?;
106            }
107            DiscoveryInstance::Model {
108                namespace,
109                component,
110                endpoint,
111                ..
112            } => {
113                tracing::info!(
114                    "Registered model card: namespace={}, component={}, endpoint={}, instance_id={:x}",
115                    namespace,
116                    component,
117                    endpoint,
118                    instance_id
119                );
120                metadata.register_model_card(instance.clone())?;
121            }
122        }
123
124        Ok(instance)
125    }
126
127    async fn list(&self, query: DiscoveryQuery) -> Result<Vec<DiscoveryInstance>> {
128        tracing::debug!("KubeDiscoveryClient::list called with query={:?}", query);
129
130        // Get current snapshot (may be empty if daemon hasn't fetched yet)
131        let snapshot = self.metadata_watch.borrow().clone();
132
133        tracing::debug!(
134            "List using snapshot seq={} with {} instances",
135            snapshot.sequence,
136            snapshot.instances.len()
137        );
138
139        // Filter snapshot by query
140        let instances = snapshot.filter(&query);
141
142        tracing::info!(
143            "KubeDiscoveryClient::list returning {} instances for query={:?}",
144            instances.len(),
145            query
146        );
147
148        Ok(instances)
149    }
150
151    async fn list_and_watch(
152        &self,
153        query: DiscoveryQuery,
154        cancel_token: Option<CancellationToken>,
155    ) -> Result<DiscoveryStream> {
156        use tokio::sync::mpsc;
157
158        tracing::info!(
159            "KubeDiscoveryClient::list_and_watch started for query={:?}",
160            query
161        );
162
163        // Clone the watch receiver
164        let mut watch_rx = self.metadata_watch.clone();
165
166        // Create output stream
167        let (event_tx, event_rx) = mpsc::unbounded_channel();
168
169        // Generate unique stream identifier for tracing
170        let stream_id = uuid::Uuid::new_v4();
171
172        // Spawn task to process snapshots
173        tokio::spawn(async move {
174            let mut known_instances = HashSet::<u64>::new();
175
176            tracing::debug!(
177                stream_id = %stream_id,
178                "Watch started for query={:?}",
179                query
180            );
181
182            loop {
183                // Wait for next snapshot or cancellation
184                let watch_result = if let Some(ref token) = cancel_token {
185                    tokio::select! {
186                        result = watch_rx.changed() => result,
187                        _ = token.cancelled() => {
188                            tracing::info!(
189                                stream_id = %stream_id,
190                                "Watch cancelled via cancel token"
191                            );
192                            break;
193                        }
194                    }
195                } else {
196                    watch_rx.changed().await
197                };
198
199                match watch_result {
200                    Ok(()) => {
201                        // Get latest snapshot
202                        let snapshot = watch_rx.borrow_and_update().clone();
203
204                        // Filter snapshot by query
205                        let current_instances: HashSet<u64> = snapshot
206                            .instances
207                            .iter()
208                            .filter_map(|(&instance_id, metadata)| {
209                                let filtered = metadata.filter(&query);
210                                if !filtered.is_empty() {
211                                    Some(instance_id)
212                                } else {
213                                    None
214                                }
215                            })
216                            .collect();
217
218                        // Compute diff
219                        let added: Vec<u64> = current_instances
220                            .difference(&known_instances)
221                            .copied()
222                            .collect();
223
224                        let removed: Vec<u64> = known_instances
225                            .difference(&current_instances)
226                            .copied()
227                            .collect();
228
229                        // Only log if there are changes
230                        if !added.is_empty() || !removed.is_empty() {
231                            tracing::debug!(
232                                stream_id = %stream_id,
233                                seq = snapshot.sequence,
234                                added = added.len(),
235                                removed = removed.len(),
236                                total = current_instances.len(),
237                                "Watch detected changes"
238                            );
239                        }
240
241                        // Emit Added events
242                        for instance_id in added {
243                            if let Some(metadata) = snapshot.instances.get(&instance_id) {
244                                let instances = metadata.filter(&query);
245                                for instance in instances {
246                                    tracing::info!(
247                                        stream_id = %stream_id,
248                                        instance_id = format!("{:x}", instance.instance_id()),
249                                        "Emitting Added event"
250                                    );
251                                    if event_tx.send(Ok(DiscoveryEvent::Added(instance))).is_err() {
252                                        tracing::debug!(
253                                            stream_id = %stream_id,
254                                            "Watch receiver dropped"
255                                        );
256                                        return;
257                                    }
258                                }
259                            }
260                        }
261
262                        // Emit Removed events
263                        for instance_id in removed {
264                            tracing::info!(
265                                stream_id = %stream_id,
266                                instance_id = format!("{:x}", instance_id),
267                                "Emitting Removed event"
268                            );
269                            if event_tx
270                                .send(Ok(DiscoveryEvent::Removed(instance_id)))
271                                .is_err()
272                            {
273                                tracing::debug!(stream_id = %stream_id, "Watch receiver dropped");
274                                return;
275                            }
276                        }
277
278                        // Update known set
279                        known_instances = current_instances;
280                    }
281                    Err(_) => {
282                        tracing::info!(
283                            stream_id = %stream_id,
284                            "Watch channel closed (daemon stopped)"
285                        );
286                        break;
287                    }
288                }
289            }
290        });
291
292        // Convert receiver to stream
293        let stream = tokio_stream::wrappers::UnboundedReceiverStream::new(event_rx);
294        Ok(Box::pin(stream))
295    }
296}