dynamo_runtime/discovery/
mod.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4use anyhow::Result;
5use async_trait::async_trait;
6use futures::Stream;
7use serde::{Deserialize, Serialize};
8use std::pin::Pin;
9use tokio_util::sync::CancellationToken;
10
11mod metadata;
12pub use metadata::{DiscoveryMetadata, MetadataSnapshot};
13
14mod mock;
15pub use mock::{MockDiscovery, SharedMockRegistry};
16mod kv_store;
17pub use kv_store::KVStoreDiscovery;
18
19mod kube;
20pub use kube::{KubeDiscoveryClient, hash_pod_name};
21
22pub mod utils;
23use crate::component::TransportType;
24pub use utils::watch_and_extract_field;
25
/// Key describing which registrations a prefix-based discovery lookup should cover.
///
/// The endpoint variants narrow hierarchically: the whole system, a single
/// namespace, a namespace/component pair, or one fully-qualified endpoint.
/// The `*Models` variants carry the same scoping fields at the same four levels.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum DiscoveryQuery {
    /// Every endpoint in the system.
    AllEndpoints,
    /// Every endpoint registered under `namespace`.
    NamespacedEndpoints {
        namespace: String,
    },
    /// Every endpoint registered under `namespace`/`component`.
    ComponentEndpoints {
        namespace: String,
        component: String,
    },
    /// One specific endpoint, identified by namespace, component and endpoint name.
    Endpoint {
        namespace: String,
        component: String,
        endpoint: String,
    },
    /// Model counterpart of `AllEndpoints`: carries no scoping fields.
    AllModels,
    /// Model counterpart of `NamespacedEndpoints`: scoped by `namespace`.
    NamespacedModels {
        namespace: String,
    },
    /// Model counterpart of `ComponentEndpoints`: scoped by `namespace`/`component`.
    ComponentModels {
        namespace: String,
        component: String,
    },
    /// Model counterpart of `Endpoint`: scoped by namespace, component and endpoint name.
    EndpointModels {
        namespace: String,
        component: String,
        endpoint: String,
    },
}
61
62/// Specification for registering objects in the discovery plane
63/// Represents the input to the register() operation
64#[derive(Debug, Clone, PartialEq, Eq)]
65pub enum DiscoverySpec {
66    /// Endpoint specification for registration
67    Endpoint {
68        namespace: String,
69        component: String,
70        endpoint: String,
71        /// Transport type and routing information
72        transport: TransportType,
73    },
74    Model {
75        namespace: String,
76        component: String,
77        endpoint: String,
78        /// ModelDeploymentCard serialized as JSON
79        /// This allows lib/runtime to remain independent of lib/llm types
80        /// DiscoverySpec.from_model() and DiscoveryInstance.deserialize_model() are ergonomic helpers to create and deserialize the model card.
81        card_json: serde_json::Value,
82    },
83}
84
85impl DiscoverySpec {
86    /// Creates a Model discovery spec from a serializable type
87    /// The card will be serialized to JSON to avoid cross-crate dependencies
88    pub fn from_model<T>(
89        namespace: String,
90        component: String,
91        endpoint: String,
92        card: &T,
93    ) -> Result<Self>
94    where
95        T: Serialize,
96    {
97        let card_json = serde_json::to_value(card)?;
98        Ok(Self::Model {
99            namespace,
100            component,
101            endpoint,
102            card_json,
103        })
104    }
105
106    /// Attaches an instance ID to create a DiscoveryInstance
107    pub fn with_instance_id(self, instance_id: u64) -> DiscoveryInstance {
108        match self {
109            Self::Endpoint {
110                namespace,
111                component,
112                endpoint,
113                transport,
114            } => DiscoveryInstance::Endpoint(crate::component::Instance {
115                namespace,
116                component,
117                endpoint,
118                instance_id,
119                transport,
120            }),
121            Self::Model {
122                namespace,
123                component,
124                endpoint,
125                card_json,
126            } => DiscoveryInstance::Model {
127                namespace,
128                component,
129                endpoint,
130                instance_id,
131                card_json,
132            },
133        }
134    }
135}
136
137/// Registered instances in the discovery plane
138/// Represents objects that have been successfully registered with an instance ID
139#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
140#[serde(tag = "type")]
141pub enum DiscoveryInstance {
142    /// Registered endpoint instance - wraps the component::Instance directly
143    Endpoint(crate::component::Instance),
144    Model {
145        namespace: String,
146        component: String,
147        endpoint: String,
148        instance_id: u64,
149        /// ModelDeploymentCard serialized as JSON
150        /// This allows lib/runtime to remain independent of lib/llm types
151        card_json: serde_json::Value,
152    },
153}
154
155impl DiscoveryInstance {
156    /// Returns the instance ID for this discovery instance
157    pub fn instance_id(&self) -> u64 {
158        match self {
159            Self::Endpoint(inst) => inst.instance_id,
160            Self::Model { instance_id, .. } => *instance_id,
161        }
162    }
163
164    /// Deserializes the model JSON into the specified type T
165    /// Returns an error if this is not a Model instance or if deserialization fails
166    pub fn deserialize_model<T>(&self) -> Result<T>
167    where
168        T: for<'de> Deserialize<'de>,
169    {
170        match self {
171            Self::Model { card_json, .. } => Ok(serde_json::from_value(card_json.clone())?),
172            Self::Endpoint(_) => {
173                anyhow::bail!("Cannot deserialize model from Endpoint instance")
174            }
175        }
176    }
177}
178
179/// Events emitted by the discovery watch stream
180#[derive(Debug, Clone, PartialEq, Eq)]
181pub enum DiscoveryEvent {
182    /// A new instance was added
183    Added(DiscoveryInstance),
184    /// An instance was removed (identified by instance_id)
185    Removed(u64),
186}
187
188/// Stream type for discovery events
189pub type DiscoveryStream = Pin<Box<dyn Stream<Item = Result<DiscoveryEvent>> + Send>>;
190
191/// Discovery trait for service discovery across different backends
192#[async_trait]
193pub trait Discovery: Send + Sync {
194    /// Returns a unique identifier for this worker (e.g lease id if using etcd or generated id for memory store)
195    /// Discovery objects created by this worker will be associated with this id.
196    fn instance_id(&self) -> u64;
197
198    /// Registers an object in the discovery plane with the instance id
199    async fn register(&self, spec: DiscoverySpec) -> Result<DiscoveryInstance>;
200
201    /// Returns a list of currently registered instances for the given discovery query
202    /// This is a one-time snapshot without watching for changes
203    async fn list(&self, query: DiscoveryQuery) -> Result<Vec<DiscoveryInstance>>;
204
205    /// Returns a stream of discovery events (Added/Removed) for the given discovery query
206    /// The optional cancellation token can be used to stop the watch stream
207    async fn list_and_watch(
208        &self,
209        query: DiscoveryQuery,
210        cancel_token: Option<CancellationToken>,
211    ) -> Result<DiscoveryStream>;
212}