1use anyhow::Result;
5use async_trait::async_trait;
6use futures::Stream;
7use serde::{Deserialize, Serialize};
8use std::pin::Pin;
9use tokio_util::sync::CancellationToken;
10
11mod metadata;
12pub use metadata::{DiscoveryMetadata, MetadataSnapshot};
13
14mod mock;
15pub use mock::{MockDiscovery, SharedMockRegistry};
16mod kv_store;
17pub use kv_store::KVStoreDiscovery;
18
19mod kube;
20pub use kube::{KubeDiscoveryClient, hash_pod_name};
21
22pub mod utils;
23use crate::component::TransportType;
24pub use utils::watch_and_extract_field;
25
/// Filter describing which registered instances a [`Discovery::list`] or
/// [`Discovery::list_and_watch`] call should return.
///
/// The `*Endpoints` variants target endpoint registrations and the `*Models`
/// variants target model-card registrations. Within each family the filter
/// narrows from "everything", to a namespace, to a component, to a single
/// endpoint. NOTE(review): how the string fields are matched against registered
/// instances is decided by each `Discovery` backend — confirm exact-match
/// semantics there.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum DiscoveryQuery {
    /// Endpoint instances, with no namespace/component/endpoint filter.
    AllEndpoints,
    /// Endpoint instances in the given namespace.
    NamespacedEndpoints {
        namespace: String,
    },
    /// Endpoint instances of the given component within `namespace`.
    ComponentEndpoints {
        namespace: String,
        component: String,
    },
    /// Instances of one specific endpoint.
    Endpoint {
        namespace: String,
        component: String,
        endpoint: String,
    },
    /// Model-card instances, with no namespace/component/endpoint filter.
    AllModels,
    /// Model-card instances in the given namespace.
    NamespacedModels {
        namespace: String,
    },
    /// Model-card instances of the given component within `namespace`.
    ComponentModels {
        namespace: String,
        component: String,
    },
    /// Model-card instances attached to one specific endpoint.
    EndpointModels {
        namespace: String,
        component: String,
        endpoint: String,
    },
}
61
/// Description of something to register through [`Discovery::register`], before
/// an instance id has been attached.
///
/// [`DiscoverySpec::with_instance_id`] converts a spec into the matching
/// [`DiscoveryInstance`] variant, carrying every field over unchanged.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DiscoverySpec {
    /// An endpoint together with the transport type it is served on.
    Endpoint {
        namespace: String,
        component: String,
        endpoint: String,
        transport: TransportType,
    },
    /// A model card associated with an endpoint.
    Model {
        namespace: String,
        component: String,
        endpoint: String,
        /// The model card already converted to JSON (built by
        /// [`DiscoverySpec::from_model`] / [`DiscoverySpec::from_model_with_suffix`]).
        card_json: serde_json::Value,
        /// Optional extra identifier. When set, it is carried into
        /// [`ModelCardInstanceId::model_suffix`] and becomes the trailing segment
        /// of that id's path.
        model_suffix: Option<String>,
    },
}
87
88impl DiscoverySpec {
89 pub fn from_model<T>(
92 namespace: String,
93 component: String,
94 endpoint: String,
95 card: &T,
96 ) -> Result<Self>
97 where
98 T: Serialize,
99 {
100 Self::from_model_with_suffix(namespace, component, endpoint, card, None)
101 }
102
103 pub fn from_model_with_suffix<T>(
106 namespace: String,
107 component: String,
108 endpoint: String,
109 card: &T,
110 model_suffix: Option<String>,
111 ) -> Result<Self>
112 where
113 T: Serialize,
114 {
115 let card_json = serde_json::to_value(card)?;
116 Ok(Self::Model {
117 namespace,
118 component,
119 endpoint,
120 card_json,
121 model_suffix,
122 })
123 }
124
125 pub fn with_instance_id(self, instance_id: u64) -> DiscoveryInstance {
127 match self {
128 Self::Endpoint {
129 namespace,
130 component,
131 endpoint,
132 transport,
133 } => DiscoveryInstance::Endpoint(crate::component::Instance {
134 namespace,
135 component,
136 endpoint,
137 instance_id,
138 transport,
139 }),
140 Self::Model {
141 namespace,
142 component,
143 endpoint,
144 card_json,
145 model_suffix,
146 } => DiscoveryInstance::Model {
147 namespace,
148 component,
149 endpoint,
150 instance_id,
151 card_json,
152 model_suffix,
153 },
154 }
155 }
156}
157
/// A registered instance, as returned by [`Discovery::register`] /
/// [`Discovery::list`] and carried by [`DiscoveryEvent::Added`].
///
/// Serialized as an internally tagged object: a `"type"` field holds the
/// variant name (`"Endpoint"` or `"Model"`) alongside the variant's fields.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(tag = "type")]
pub enum DiscoveryInstance {
    /// An endpoint instance.
    Endpoint(crate::component::Instance),
    /// A model card attached to an endpoint instance.
    Model {
        namespace: String,
        component: String,
        endpoint: String,
        instance_id: u64,
        /// The model card as raw JSON; decode it with
        /// [`DiscoveryInstance::deserialize_model`].
        card_json: serde_json::Value,
        // Omitted from the serialized form when `None`, and defaults to `None`
        // when the field is absent on deserialization.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        /// Optional extra identifier distinguishing this model card.
        model_suffix: Option<String>,
    },
}
178
179impl DiscoveryInstance {
180 pub fn instance_id(&self) -> u64 {
182 match self {
183 Self::Endpoint(inst) => inst.instance_id,
184 Self::Model { instance_id, .. } => *instance_id,
185 }
186 }
187
188 pub fn deserialize_model<T>(&self) -> Result<T>
191 where
192 T: for<'de> Deserialize<'de>,
193 {
194 match self {
195 Self::Model { card_json, .. } => Ok(serde_json::from_value(card_json.clone())?),
196 Self::Endpoint(_) => {
197 anyhow::bail!("Cannot deserialize model from Endpoint instance")
198 }
199 }
200 }
201
202 pub fn id(&self) -> DiscoveryInstanceId {
205 match self {
206 Self::Endpoint(inst) => DiscoveryInstanceId::Endpoint(EndpointInstanceId {
207 namespace: inst.namespace.clone(),
208 component: inst.component.clone(),
209 endpoint: inst.endpoint.clone(),
210 instance_id: inst.instance_id,
211 }),
212 Self::Model {
213 namespace,
214 component,
215 endpoint,
216 instance_id,
217 model_suffix,
218 ..
219 } => DiscoveryInstanceId::Model(ModelCardInstanceId {
220 namespace: namespace.clone(),
221 component: component.clone(),
222 endpoint: endpoint.clone(),
223 instance_id: *instance_id,
224 model_suffix: model_suffix.clone(),
225 }),
226 }
227 }
228}
229
/// Identifying fields of an endpoint instance: the namespace, component,
/// endpoint and instance id copied out of a `crate::component::Instance` by
/// [`DiscoveryInstance::id`].
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct EndpointInstanceId {
    pub namespace: String,
    pub component: String,
    pub endpoint: String,
    pub instance_id: u64,
}
238
239impl EndpointInstanceId {
240 pub fn to_path(&self) -> String {
242 format!(
243 "{}/{}/{}/{:x}",
244 self.namespace, self.component, self.endpoint, self.instance_id
245 )
246 }
247
248 pub fn from_path(path: &str) -> Result<Self> {
250 let parts: Vec<&str> = path.split('/').collect();
251 if parts.len() != 4 {
252 anyhow::bail!(
253 "Invalid EndpointInstanceId path: expected 4 parts, got {}",
254 parts.len()
255 );
256 }
257 Ok(Self {
258 namespace: parts[0].to_string(),
259 component: parts[1].to_string(),
260 endpoint: parts[2].to_string(),
261 instance_id: u64::from_str_radix(parts[3], 16)
262 .map_err(|e| anyhow::anyhow!("Invalid instance_id hex: {}", e))?,
263 })
264 }
265}
266
/// Identifying fields of a model-card instance, as produced by
/// [`DiscoveryInstance::id`] for the `Model` variant.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct ModelCardInstanceId {
    pub namespace: String,
    pub component: String,
    pub endpoint: String,
    pub instance_id: u64,
    /// Optional extra identifier. When present, it is the trailing segment of
    /// [`ModelCardInstanceId::to_path`]. NOTE(review): presumably lets several
    /// cards share one endpoint instance — confirm with callers.
    pub model_suffix: Option<String>,
}
278
279impl ModelCardInstanceId {
280 pub fn to_path(&self) -> String {
282 match &self.model_suffix {
283 Some(suffix) => format!(
284 "{}/{}/{}/{:x}/{}",
285 self.namespace, self.component, self.endpoint, self.instance_id, suffix
286 ),
287 None => format!(
288 "{}/{}/{}/{:x}",
289 self.namespace, self.component, self.endpoint, self.instance_id
290 ),
291 }
292 }
293
294 pub fn from_path(path: &str) -> Result<Self> {
296 let parts: Vec<&str> = path.split('/').collect();
297 if parts.len() < 4 || parts.len() > 5 {
298 anyhow::bail!(
299 "Invalid ModelCardInstanceId path: expected 4 or 5 parts, got {}",
300 parts.len()
301 );
302 }
303 Ok(Self {
304 namespace: parts[0].to_string(),
305 component: parts[1].to_string(),
306 endpoint: parts[2].to_string(),
307 instance_id: u64::from_str_radix(parts[3], 16)
308 .map_err(|e| anyhow::anyhow!("Invalid instance_id hex: {}", e))?,
309 model_suffix: parts.get(4).map(|s| s.to_string()),
310 })
311 }
312}
313
/// Payload-free identity of a registered instance, as returned by
/// [`DiscoveryInstance::id`] and carried by [`DiscoveryEvent::Removed`].
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum DiscoveryInstanceId {
    /// Identity of an endpoint instance.
    Endpoint(EndpointInstanceId),
    /// Identity of a model-card instance.
    Model(ModelCardInstanceId),
}
320
321impl DiscoveryInstanceId {
322 pub fn instance_id(&self) -> u64 {
324 match self {
325 Self::Endpoint(eid) => eid.instance_id,
326 Self::Model(mid) => mid.instance_id,
327 }
328 }
329
330 pub fn extract_endpoint_id(&self) -> Result<&EndpointInstanceId> {
332 match self {
333 Self::Endpoint(eid) => Ok(eid),
334 Self::Model(_) => anyhow::bail!("Expected Endpoint variant, got Model"),
335 }
336 }
337
338 pub fn extract_model_id(&self) -> Result<&ModelCardInstanceId> {
340 match self {
341 Self::Model(mid) => Ok(mid),
342 Self::Endpoint(_) => anyhow::bail!("Expected Model variant, got Endpoint"),
343 }
344 }
345}
346
/// A change notification yielded by a [`DiscoveryStream`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DiscoveryEvent {
    /// An instance appeared; carries the full instance, including any model card.
    Added(DiscoveryInstance),
    /// An instance went away; only its identity is carried.
    Removed(DiscoveryInstanceId),
}
355
/// Boxed, pinned, `Send` stream of discovery events, as returned by
/// [`Discovery::list_and_watch`]. Each item is either an event or an error.
pub type DiscoveryStream = Pin<Box<dyn Stream<Item = Result<DiscoveryEvent>> + Send>>;
358
/// Backend-agnostic service-discovery interface.
///
/// Implementations (this module re-exports `MockDiscovery`, `KVStoreDiscovery`
/// and `KubeDiscoveryClient`) register, unregister, list and watch endpoint and
/// model-card instances. The trait requires `Send + Sync`, so a client can be
/// shared across tasks.
#[async_trait]
pub trait Discovery: Send + Sync {
    /// Instance id associated with this discovery client.
    /// NOTE(review): presumably the id attached to specs registered through this
    /// client — confirm in each implementation.
    fn instance_id(&self) -> u64;

    /// Registers `spec` and returns the resulting [`DiscoveryInstance`].
    /// NOTE(review): presumably built with [`DiscoverySpec::with_instance_id`] —
    /// confirm per backend.
    async fn register(&self, spec: DiscoverySpec) -> Result<DiscoveryInstance>;

    /// Removes a previously registered `instance`.
    async fn unregister(&self, instance: DiscoveryInstance) -> Result<()>;

    /// Returns the registered instances matching `query`.
    async fn list(&self, query: DiscoveryQuery) -> Result<Vec<DiscoveryInstance>>;

    /// Returns a [`DiscoveryStream`] of add/remove events for instances matching
    /// `query`.
    ///
    /// NOTE(review): the name suggests that existing instances are reported first
    /// and later changes are then streamed. It also suggests that cancelling
    /// `cancel_token`, when given, ends the stream. Confirm both in each backend.
    async fn list_and_watch(
        &self,
        query: DiscoveryQuery,
        cancel_token: Option<CancellationToken>,
    ) -> Result<DiscoveryStream>;
}