dynamo_runtime/discovery/
mod.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4use anyhow::Result;
5use async_trait::async_trait;
6use futures::Stream;
7use serde::{Deserialize, Serialize};
8use std::pin::Pin;
9use tokio_util::sync::CancellationToken;
10
11mod metadata;
12pub use metadata::{DiscoveryMetadata, MetadataSnapshot};
13
14mod mock;
15pub use mock::{MockDiscovery, SharedMockRegistry};
16mod kv_store;
17pub use kv_store::KVStoreDiscovery;
18
19mod kube;
20pub use kube::{KubeDiscoveryClient, hash_pod_name};
21
22pub mod utils;
23use crate::component::TransportType;
24pub use utils::watch_and_extract_field;
25
/// Key used to select discovery objects by prefix.
///
/// The variants form two parallel ladders, one over endpoints and one over
/// models. Each ladder narrows step by step: everything, one namespace, one
/// namespace/component pair, and finally a single namespace/component/endpoint.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum DiscoveryQuery {
    /// Every endpoint in the system.
    AllEndpoints,
    /// Every endpoint that lives in the given namespace.
    NamespacedEndpoints {
        namespace: String,
    },
    /// Every endpoint under the given namespace/component.
    ComponentEndpoints {
        namespace: String,
        component: String,
    },
    /// One specific endpoint.
    Endpoint {
        namespace: String,
        component: String,
        endpoint: String,
    },
    /// Model counterpart of [`Self::AllEndpoints`]: carries no scoping fields.
    AllModels,
    /// Model counterpart of [`Self::NamespacedEndpoints`]: scoped to one namespace.
    NamespacedModels {
        namespace: String,
    },
    /// Model counterpart of [`Self::ComponentEndpoints`]: scoped to one namespace/component.
    ComponentModels {
        namespace: String,
        component: String,
    },
    /// Model counterpart of [`Self::Endpoint`]: scoped to one namespace/component/endpoint.
    EndpointModels {
        namespace: String,
        component: String,
        endpoint: String,
    },
}
61
62/// Specification for registering objects in the discovery plane
63/// Represents the input to the register() operation
64#[derive(Debug, Clone, PartialEq, Eq)]
65pub enum DiscoverySpec {
66    /// Endpoint specification for registration
67    Endpoint {
68        namespace: String,
69        component: String,
70        endpoint: String,
71        /// Transport type and routing information
72        transport: TransportType,
73    },
74    Model {
75        namespace: String,
76        component: String,
77        endpoint: String,
78        /// ModelDeploymentCard serialized as JSON
79        /// This allows lib/runtime to remain independent of lib/llm types
80        /// DiscoverySpec.from_model() and DiscoveryInstance.deserialize_model() are ergonomic helpers to create and deserialize the model card.
81        card_json: serde_json::Value,
82        /// Optional suffix appended after instance_id in the key path (e.g., for LoRA adapters)
83        /// Key format: {namespace}/{component}/{endpoint}/{instance_id}[/{model_suffix}]
84        model_suffix: Option<String>,
85    },
86}
87
88impl DiscoverySpec {
89    /// Creates a Model discovery spec from a serializable type
90    /// The card will be serialized to JSON to avoid cross-crate dependencies
91    pub fn from_model<T>(
92        namespace: String,
93        component: String,
94        endpoint: String,
95        card: &T,
96    ) -> Result<Self>
97    where
98        T: Serialize,
99    {
100        Self::from_model_with_suffix(namespace, component, endpoint, card, None)
101    }
102
103    /// Creates a Model discovery spec with an optional suffix (e.g., for LoRA adapters)
104    /// The suffix is appended after the instance_id in the key path
105    pub fn from_model_with_suffix<T>(
106        namespace: String,
107        component: String,
108        endpoint: String,
109        card: &T,
110        model_suffix: Option<String>,
111    ) -> Result<Self>
112    where
113        T: Serialize,
114    {
115        let card_json = serde_json::to_value(card)?;
116        Ok(Self::Model {
117            namespace,
118            component,
119            endpoint,
120            card_json,
121            model_suffix,
122        })
123    }
124
125    /// Attaches an instance ID to create a DiscoveryInstance
126    pub fn with_instance_id(self, instance_id: u64) -> DiscoveryInstance {
127        match self {
128            Self::Endpoint {
129                namespace,
130                component,
131                endpoint,
132                transport,
133            } => DiscoveryInstance::Endpoint(crate::component::Instance {
134                namespace,
135                component,
136                endpoint,
137                instance_id,
138                transport,
139            }),
140            Self::Model {
141                namespace,
142                component,
143                endpoint,
144                card_json,
145                model_suffix,
146            } => DiscoveryInstance::Model {
147                namespace,
148                component,
149                endpoint,
150                instance_id,
151                card_json,
152                model_suffix,
153            },
154        }
155    }
156}
157
158/// Registered instances in the discovery plane
159/// Represents objects that have been successfully registered with an instance ID
160#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
161#[serde(tag = "type")]
162pub enum DiscoveryInstance {
163    /// Registered endpoint instance - wraps the component::Instance directly
164    Endpoint(crate::component::Instance),
165    Model {
166        namespace: String,
167        component: String,
168        endpoint: String,
169        instance_id: u64,
170        /// ModelDeploymentCard serialized as JSON
171        /// This allows lib/runtime to remain independent of lib/llm types
172        card_json: serde_json::Value,
173        /// Optional suffix appended after instance_id in the key path (e.g., for LoRA adapters)
174        #[serde(default, skip_serializing_if = "Option::is_none")]
175        model_suffix: Option<String>,
176    },
177}
178
179impl DiscoveryInstance {
180    /// Returns the instance ID for this discovery instance
181    pub fn instance_id(&self) -> u64 {
182        match self {
183            Self::Endpoint(inst) => inst.instance_id,
184            Self::Model { instance_id, .. } => *instance_id,
185        }
186    }
187
188    /// Deserializes the model JSON into the specified type T
189    /// Returns an error if this is not a Model instance or if deserialization fails
190    pub fn deserialize_model<T>(&self) -> Result<T>
191    where
192        T: for<'de> Deserialize<'de>,
193    {
194        match self {
195            Self::Model { card_json, .. } => Ok(serde_json::from_value(card_json.clone())?),
196            Self::Endpoint(_) => {
197                anyhow::bail!("Cannot deserialize model from Endpoint instance")
198            }
199        }
200    }
201
202    /// Extracts the unique identifier for this discovery instance
203    /// Used for tracking, diffing, and removal events
204    pub fn id(&self) -> DiscoveryInstanceId {
205        match self {
206            Self::Endpoint(inst) => DiscoveryInstanceId::Endpoint(EndpointInstanceId {
207                namespace: inst.namespace.clone(),
208                component: inst.component.clone(),
209                endpoint: inst.endpoint.clone(),
210                instance_id: inst.instance_id,
211            }),
212            Self::Model {
213                namespace,
214                component,
215                endpoint,
216                instance_id,
217                model_suffix,
218                ..
219            } => DiscoveryInstanceId::Model(ModelCardInstanceId {
220                namespace: namespace.clone(),
221                component: component.clone(),
222                endpoint: endpoint.clone(),
223                instance_id: *instance_id,
224                model_suffix: model_suffix.clone(),
225            }),
226        }
227    }
228}
229
230/// Unique identifier for an endpoint instance
231#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
232pub struct EndpointInstanceId {
233    pub namespace: String,
234    pub component: String,
235    pub endpoint: String,
236    pub instance_id: u64,
237}
238
239impl EndpointInstanceId {
240    /// Converts to a path string: `{namespace}/{component}/{endpoint}/{instance_id:x}`
241    pub fn to_path(&self) -> String {
242        format!(
243            "{}/{}/{}/{:x}",
244            self.namespace, self.component, self.endpoint, self.instance_id
245        )
246    }
247
248    /// Parses from a path string: `{namespace}/{component}/{endpoint}/{instance_id:x}`
249    pub fn from_path(path: &str) -> Result<Self> {
250        let parts: Vec<&str> = path.split('/').collect();
251        if parts.len() != 4 {
252            anyhow::bail!(
253                "Invalid EndpointInstanceId path: expected 4 parts, got {}",
254                parts.len()
255            );
256        }
257        Ok(Self {
258            namespace: parts[0].to_string(),
259            component: parts[1].to_string(),
260            endpoint: parts[2].to_string(),
261            instance_id: u64::from_str_radix(parts[3], 16)
262                .map_err(|e| anyhow::anyhow!("Invalid instance_id hex: {}", e))?,
263        })
264    }
265}
266
267/// Unique identifier for a model card instance
268/// The combination of (namespace, component, endpoint, instance_id, model_suffix) uniquely identifies a model card
269#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
270pub struct ModelCardInstanceId {
271    pub namespace: String,
272    pub component: String,
273    pub endpoint: String,
274    pub instance_id: u64,
275    /// None for base models, Some(slug) for LoRA adapters
276    pub model_suffix: Option<String>,
277}
278
279impl ModelCardInstanceId {
280    /// Converts to a path string: `{namespace}/{component}/{endpoint}/{instance_id:x}[/{model_suffix}]`
281    pub fn to_path(&self) -> String {
282        match &self.model_suffix {
283            Some(suffix) => format!(
284                "{}/{}/{}/{:x}/{}",
285                self.namespace, self.component, self.endpoint, self.instance_id, suffix
286            ),
287            None => format!(
288                "{}/{}/{}/{:x}",
289                self.namespace, self.component, self.endpoint, self.instance_id
290            ),
291        }
292    }
293
294    /// Parses from a path string: `{namespace}/{component}/{endpoint}/{instance_id:x}[/{model_suffix}]`
295    pub fn from_path(path: &str) -> Result<Self> {
296        let parts: Vec<&str> = path.split('/').collect();
297        if parts.len() < 4 || parts.len() > 5 {
298            anyhow::bail!(
299                "Invalid ModelCardInstanceId path: expected 4 or 5 parts, got {}",
300                parts.len()
301            );
302        }
303        Ok(Self {
304            namespace: parts[0].to_string(),
305            component: parts[1].to_string(),
306            endpoint: parts[2].to_string(),
307            instance_id: u64::from_str_radix(parts[3], 16)
308                .map_err(|e| anyhow::anyhow!("Invalid instance_id hex: {}", e))?,
309            model_suffix: parts.get(4).map(|s| s.to_string()),
310        })
311    }
312}
313
314/// Union of instance identifiers for different discovery object types
315#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
316pub enum DiscoveryInstanceId {
317    Endpoint(EndpointInstanceId),
318    Model(ModelCardInstanceId),
319}
320
321impl DiscoveryInstanceId {
322    /// Returns the raw instance_id regardless of variant type
323    pub fn instance_id(&self) -> u64 {
324        match self {
325            Self::Endpoint(eid) => eid.instance_id,
326            Self::Model(mid) => mid.instance_id,
327        }
328    }
329
330    /// Extracts the EndpointInstanceId, returning an error if this is a Model variant
331    pub fn extract_endpoint_id(&self) -> Result<&EndpointInstanceId> {
332        match self {
333            Self::Endpoint(eid) => Ok(eid),
334            Self::Model(_) => anyhow::bail!("Expected Endpoint variant, got Model"),
335        }
336    }
337
338    /// Extracts the ModelCardInstanceId, returning an error if this is an Endpoint variant
339    pub fn extract_model_id(&self) -> Result<&ModelCardInstanceId> {
340        match self {
341            Self::Model(mid) => Ok(mid),
342            Self::Endpoint(_) => anyhow::bail!("Expected Model variant, got Endpoint"),
343        }
344    }
345}
346
347/// Events emitted by the discovery watch stream
348#[derive(Debug, Clone, PartialEq, Eq)]
349pub enum DiscoveryEvent {
350    /// A new instance was added
351    Added(DiscoveryInstance),
352    /// An instance was removed (identified by its unique ID)
353    Removed(DiscoveryInstanceId),
354}
355
356/// Stream type for discovery events
357pub type DiscoveryStream = Pin<Box<dyn Stream<Item = Result<DiscoveryEvent>> + Send>>;
358
359/// Discovery trait for service discovery across different backends
360#[async_trait]
361pub trait Discovery: Send + Sync {
362    /// Returns a unique identifier for this worker (e.g lease id if using etcd or generated id for memory store)
363    /// Discovery objects created by this worker will be associated with this id.
364    fn instance_id(&self) -> u64;
365
366    /// Registers an object in the discovery plane with the instance id
367    async fn register(&self, spec: DiscoverySpec) -> Result<DiscoveryInstance>;
368
369    /// Unregisters an instance from the discovery plane
370    async fn unregister(&self, instance: DiscoveryInstance) -> Result<()>;
371
372    /// Returns a list of currently registered instances for the given discovery query
373    /// This is a one-time snapshot without watching for changes
374    async fn list(&self, query: DiscoveryQuery) -> Result<Vec<DiscoveryInstance>>;
375
376    /// Returns a stream of discovery events (Added/Removed) for the given discovery query
377    /// The optional cancellation token can be used to stop the watch stream
378    async fn list_and_watch(
379        &self,
380        query: DiscoveryQuery,
381        cancel_token: Option<CancellationToken>,
382    ) -> Result<DiscoveryStream>;
383}