Skip to main content

dynamo_runtime/discovery/
mod.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4use anyhow::Result;
5use async_trait::async_trait;
6use futures::Stream;
7use serde::{Deserialize, Serialize};
8use std::pin::Pin;
9use tokio_util::sync::CancellationToken;
10
11mod metadata;
12pub use metadata::{DiscoveryMetadata, MetadataSnapshot};
13
14mod mock;
15pub use mock::{MockDiscovery, SharedMockRegistry};
16mod kv_store;
17pub use kv_store::KVStoreDiscovery;
18
19mod kube;
20pub use kube::{KubeDiscoveryClient, hash_pod_name};
21
22pub mod utils;
23use crate::component::TransportType;
24pub use utils::watch_and_extract_field;
25
26/// Transport kind for event plane - used for configuration and env var selection.
27///
28/// This enum represents the *type* of transport without connection details.
29/// Use `EventTransport` when you need the full transport configuration.
30#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
31#[serde(rename_all = "snake_case")]
32pub enum EventTransportKind {
33    /// NATS Core pub/sub
34    #[default]
35    Nats,
36    /// ZMQ pub/sub
37    Zmq,
38}
39
40impl EventTransportKind {
41    /// Parse from environment variable `DYN_EVENT_PLANE`.
42    /// Returns `Nats` if not set or empty.
43    /// Returns error for invalid values.
44    pub fn from_env() -> Result<Self> {
45        match std::env::var(crate::config::environment_names::event_plane::DYN_EVENT_PLANE)
46            .as_deref()
47        {
48            Ok("nats") | Ok("") | Err(_) => Ok(Self::Nats),
49            Ok("zmq") => Ok(Self::Zmq),
50            Ok(other) => anyhow::bail!(
51                "Invalid DYN_EVENT_PLANE value '{}'. Valid values: 'nats', 'zmq'",
52                other
53            ),
54        }
55    }
56
57    /// Parse from environment variable, defaulting to Nats on error.
58    /// Logs a warning if an invalid value is encountered.
59    pub fn from_env_or_default() -> Self {
60        Self::from_env().unwrap_or_else(|e| {
61            tracing::warn!("{}, defaulting to NATS", e);
62            Self::Nats
63        })
64    }
65
66    /// Get the default codec for this transport kind.
67    /// NATS defaults to JSON, ZMQ defaults to MsgPack.
68    pub fn default_codec(&self) -> EventCodecKind {
69        match self {
70            Self::Nats => EventCodecKind::Json,
71            Self::Zmq => EventCodecKind::Msgpack,
72        }
73    }
74}
75
76/// Codec kind for event plane serialization.
77///
78/// This enum represents the serialization format for event envelopes and payloads.
79#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
80#[serde(rename_all = "snake_case")]
81pub enum EventCodecKind {
82    /// JSON codec - human-readable, good for debugging
83    Json,
84    /// MessagePack codec - compact binary format
85    Msgpack,
86}
87
88impl EventCodecKind {
89    /// Parse from environment variable `DYN_EVENT_PLANE_CODEC`.
90    /// Returns None if not set, allowing transport to select default.
91    /// Returns error for invalid values.
92    pub fn from_env() -> Result<Option<Self>> {
93        match std::env::var(crate::config::environment_names::event_plane::DYN_EVENT_PLANE_CODEC)
94            .as_deref()
95        {
96            Err(_) => Ok(None), // Not set
97            Ok("") => Ok(None), // Empty
98            Ok("json") => Ok(Some(Self::Json)),
99            Ok("msgpack") => Ok(Some(Self::Msgpack)),
100            Ok(other) => anyhow::bail!(
101                "Invalid DYN_EVENT_PLANE_CODEC value '{}'. Valid values: 'json', 'msgpack'",
102                other
103            ),
104        }
105    }
106
107    /// Parse from environment variable with transport-specific default.
108    /// Logs a warning if an invalid value is encountered.
109    pub fn from_env_or_transport_default(transport: EventTransportKind) -> Self {
110        Self::from_env()
111            .unwrap_or_else(|e| {
112                tracing::warn!(
113                    "{}, defaulting to {:?} for {:?}",
114                    e,
115                    transport.default_codec(),
116                    transport
117                );
118                None
119            })
120            .unwrap_or_else(|| transport.default_codec())
121    }
122}
123
124/// Transport configuration for event plane channels.
125///
126/// This enum carries both the transport kind and its connection configuration.
127/// Kept separate from `TransportType` (request plane) to distinguish event semantics.
128#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
129#[serde(tag = "kind", content = "config")]
130pub enum EventTransport {
131    /// NATS Core pub/sub - subject prefix for the channel
132    Nats {
133        /// Subject prefix (e.g., "namespace.dynamo.component.backend")
134        subject_prefix: String,
135    },
136    /// ZMQ pub/sub - endpoint address (direct mode)
137    Zmq {
138        /// ZMQ endpoint (e.g., "tcp://host:port")
139        endpoint: String,
140    },
141    /// ZMQ broker endpoints (broker mode) - for discovery of brokers
142    ZmqBroker {
143        /// XSUB endpoints (publishers connect here)
144        xsub_endpoints: Vec<String>,
145        /// XPUB endpoints (subscribers connect here)
146        xpub_endpoints: Vec<String>,
147    },
148}
149
150impl EventTransport {
151    /// Get the transport kind
152    pub fn kind(&self) -> EventTransportKind {
153        match self {
154            Self::Nats { .. } => EventTransportKind::Nats,
155            Self::Zmq { .. } | Self::ZmqBroker { .. } => EventTransportKind::Zmq,
156        }
157    }
158
159    /// Create a NATS transport with the given subject prefix
160    pub fn nats(subject_prefix: impl Into<String>) -> Self {
161        Self::Nats {
162            subject_prefix: subject_prefix.into(),
163        }
164    }
165
166    /// Create a ZMQ transport with the given endpoint
167    pub fn zmq(endpoint: impl Into<String>) -> Self {
168        Self::Zmq {
169            endpoint: endpoint.into(),
170        }
171    }
172
173    /// Get the subject prefix (NATS) or endpoint (ZMQ)
174    /// For ZmqBroker, returns the first XSUB endpoint
175    pub fn address(&self) -> &str {
176        match self {
177            Self::Nats { subject_prefix } => subject_prefix,
178            Self::Zmq { endpoint } => endpoint,
179            Self::ZmqBroker { xsub_endpoints, .. } => {
180                xsub_endpoints.first().map(|s| s.as_str()).unwrap_or("")
181            }
182        }
183    }
184}
185
186/// Query key for prefix-based discovery queries
187/// Supports hierarchical queries from all endpoints down to specific endpoints
188#[derive(Debug, Clone, PartialEq, Eq, Hash)]
189pub enum DiscoveryQuery {
190    /// Query all endpoints in the system
191    AllEndpoints,
192    /// Query all endpoints in a specific namespace
193    NamespacedEndpoints {
194        namespace: String,
195    },
196    /// Query all endpoints in a namespace/component
197    ComponentEndpoints {
198        namespace: String,
199        component: String,
200    },
201    /// Query a specific endpoint
202    Endpoint {
203        namespace: String,
204        component: String,
205        endpoint: String,
206    },
207    AllModels,
208    NamespacedModels {
209        namespace: String,
210    },
211    ComponentModels {
212        namespace: String,
213        component: String,
214    },
215    EndpointModels {
216        namespace: String,
217        component: String,
218        endpoint: String,
219    },
220    /// Unified event channel query with optional scope filters
221    EventChannels(EventChannelQuery),
222}
223
/// Event-channel query with optional, increasingly specific scope filters.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct EventChannelQuery {
    /// Namespace filter, if any.
    pub namespace: Option<String>,
    /// Component filter, if any (only meaningful together with a namespace).
    pub component: Option<String>,
    /// Topic filter, if any (only meaningful together with a namespace and component).
    pub topic: Option<String>,
}

impl EventChannelQuery {
    /// Matches every event channel (no filter set).
    pub fn all() -> Self {
        Self {
            namespace: None,
            component: None,
            topic: None,
        }
    }

    /// Restricts the query to one namespace.
    pub fn namespace(namespace: impl Into<String>) -> Self {
        Self {
            namespace: Some(namespace.into()),
            ..Self::all()
        }
    }

    /// Restricts the query to one namespace/component.
    pub fn component(namespace: impl Into<String>, component: impl Into<String>) -> Self {
        Self {
            component: Some(component.into()),
            ..Self::namespace(namespace)
        }
    }

    /// Restricts the query to one namespace/component/topic.
    pub fn topic(
        namespace: impl Into<String>,
        component: impl Into<String>,
        topic: impl Into<String>,
    ) -> Self {
        Self {
            topic: Some(topic.into()),
            ..Self::component(namespace, component)
        }
    }

    /// Depth of the most specific filter that is set.
    ///
    /// Returns 0 when no filter is set, 1 for namespace, 2 for component and 3 for topic.
    /// Only the deepest set field is considered; shallower fields are not checked.
    pub fn scope_level(&self) -> u8 {
        match (&self.namespace, &self.component, &self.topic) {
            (_, _, Some(_)) => 3,
            (_, Some(_), None) => 2,
            (Some(_), None, None) => 1,
            (None, None, None) => 0,
        }
    }
}
289
290/// Specification for registering objects in the discovery plane
291/// Represents the input to the register() operation
292#[derive(Debug, Clone, PartialEq, Eq)]
293pub enum DiscoverySpec {
294    /// Endpoint specification for registration
295    Endpoint {
296        namespace: String,
297        component: String,
298        endpoint: String,
299        /// Transport type and routing information
300        transport: TransportType,
301    },
302    Model {
303        namespace: String,
304        component: String,
305        endpoint: String,
306        /// ModelDeploymentCard serialized as JSON
307        /// This allows lib/runtime to remain independent of lib/llm types
308        /// DiscoverySpec.from_model() and DiscoveryInstance.deserialize_model() are ergonomic helpers to create and deserialize the model card.
309        card_json: serde_json::Value,
310        /// Optional suffix appended after instance_id in the key path (e.g., for LoRA adapters)
311        /// Key format: {namespace}/{component}/{endpoint}/{instance_id}[/{model_suffix}]
312        model_suffix: Option<String>,
313    },
314    /// Event plane channel specification
315    /// Used for registering event publishers/subscribers for discovery
316    EventChannel {
317        namespace: String,
318        component: String,
319        /// Topic name for this channel (e.g., "kv-events", "kv-metrics")
320        topic: String,
321        /// Event transport type (NATS subject prefix or ZMQ endpoint)
322        transport: EventTransport,
323    },
324}
325
326impl DiscoverySpec {
327    /// Creates a Model discovery spec from a serializable type
328    /// The card will be serialized to JSON to avoid cross-crate dependencies
329    pub fn from_model<T>(
330        namespace: String,
331        component: String,
332        endpoint: String,
333        card: &T,
334    ) -> Result<Self>
335    where
336        T: Serialize,
337    {
338        Self::from_model_with_suffix(namespace, component, endpoint, card, None)
339    }
340
341    /// Creates a Model discovery spec with an optional suffix (e.g., for LoRA adapters)
342    /// The suffix is appended after the instance_id in the key path
343    pub fn from_model_with_suffix<T>(
344        namespace: String,
345        component: String,
346        endpoint: String,
347        card: &T,
348        model_suffix: Option<String>,
349    ) -> Result<Self>
350    where
351        T: Serialize,
352    {
353        let card_json = serde_json::to_value(card)?;
354        Ok(Self::Model {
355            namespace,
356            component,
357            endpoint,
358            card_json,
359            model_suffix,
360        })
361    }
362
363    /// Attaches an instance ID to create a DiscoveryInstance
364    pub fn with_instance_id(self, instance_id: u64) -> DiscoveryInstance {
365        match self {
366            Self::Endpoint {
367                namespace,
368                component,
369                endpoint,
370                transport,
371            } => DiscoveryInstance::Endpoint(crate::component::Instance {
372                namespace,
373                component,
374                endpoint,
375                instance_id,
376                transport,
377            }),
378            Self::Model {
379                namespace,
380                component,
381                endpoint,
382                card_json,
383                model_suffix,
384            } => DiscoveryInstance::Model {
385                namespace,
386                component,
387                endpoint,
388                instance_id,
389                card_json,
390                model_suffix,
391            },
392            Self::EventChannel {
393                namespace,
394                component,
395                topic,
396                transport,
397            } => DiscoveryInstance::EventChannel {
398                namespace,
399                component,
400                topic,
401                instance_id,
402                transport,
403            },
404        }
405    }
406}
407
408/// Registered instances in the discovery plane
409/// Represents objects that have been successfully registered with an instance ID
410#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
411#[serde(tag = "type")]
412pub enum DiscoveryInstance {
413    /// Registered endpoint instance - wraps the component::Instance directly
414    Endpoint(crate::component::Instance),
415    Model {
416        namespace: String,
417        component: String,
418        endpoint: String,
419        instance_id: u64,
420        /// ModelDeploymentCard serialized as JSON
421        /// This allows lib/runtime to remain independent of lib/llm types
422        card_json: serde_json::Value,
423        /// Optional suffix appended after instance_id in the key path (e.g., for LoRA adapters)
424        #[serde(default, skip_serializing_if = "Option::is_none")]
425        model_suffix: Option<String>,
426    },
427    /// Registered event channel instance for event plane pub/sub
428    EventChannel {
429        namespace: String,
430        component: String,
431        /// Topic name for this channel (e.g., "kv-events", "kv-metrics")
432        topic: String,
433        instance_id: u64,
434        /// Event transport type (NATS subject prefix or ZMQ endpoint)
435        transport: EventTransport,
436    },
437}
438
439impl DiscoveryInstance {
440    /// Returns the instance ID for this discovery instance
441    pub fn instance_id(&self) -> u64 {
442        match self {
443            Self::Endpoint(inst) => inst.instance_id,
444            Self::Model { instance_id, .. } => *instance_id,
445            Self::EventChannel { instance_id, .. } => *instance_id,
446        }
447    }
448
449    /// Deserializes the model JSON into the specified type T
450    /// Returns an error if this is not a Model instance or if deserialization fails
451    pub fn deserialize_model<T>(&self) -> Result<T>
452    where
453        T: for<'de> Deserialize<'de>,
454    {
455        match self {
456            Self::Model { card_json, .. } => Ok(serde_json::from_value(card_json.clone())?),
457            Self::Endpoint(_) => {
458                anyhow::bail!("Cannot deserialize model from Endpoint instance")
459            }
460            Self::EventChannel { .. } => {
461                anyhow::bail!("Cannot deserialize model from EventChannel instance")
462            }
463        }
464    }
465
466    /// Extracts the unique identifier for this discovery instance
467    /// Used for tracking, diffing, and removal events
468    pub fn id(&self) -> DiscoveryInstanceId {
469        match self {
470            Self::Endpoint(inst) => DiscoveryInstanceId::Endpoint(EndpointInstanceId {
471                namespace: inst.namespace.clone(),
472                component: inst.component.clone(),
473                endpoint: inst.endpoint.clone(),
474                instance_id: inst.instance_id,
475            }),
476            Self::Model {
477                namespace,
478                component,
479                endpoint,
480                instance_id,
481                model_suffix,
482                ..
483            } => DiscoveryInstanceId::Model(ModelCardInstanceId {
484                namespace: namespace.clone(),
485                component: component.clone(),
486                endpoint: endpoint.clone(),
487                instance_id: *instance_id,
488                model_suffix: model_suffix.clone(),
489            }),
490            Self::EventChannel {
491                namespace,
492                component,
493                topic,
494                instance_id,
495                ..
496            } => DiscoveryInstanceId::EventChannel(EventChannelInstanceId {
497                namespace: namespace.clone(),
498                component: component.clone(),
499                topic: topic.clone(),
500                instance_id: *instance_id,
501            }),
502        }
503    }
504}
505
506/// Unique identifier for an endpoint instance
507#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
508pub struct EndpointInstanceId {
509    pub namespace: String,
510    pub component: String,
511    pub endpoint: String,
512    pub instance_id: u64,
513}
514
515impl EndpointInstanceId {
516    /// Converts to a path string: `{namespace}/{component}/{endpoint}/{instance_id:x}`
517    pub fn to_path(&self) -> String {
518        format!(
519            "{}/{}/{}/{:x}",
520            self.namespace, self.component, self.endpoint, self.instance_id
521        )
522    }
523
524    /// Parses from a path string: `{namespace}/{component}/{endpoint}/{instance_id:x}`
525    pub fn from_path(path: &str) -> Result<Self> {
526        let parts: Vec<&str> = path.split('/').collect();
527        if parts.len() != 4 {
528            anyhow::bail!(
529                "Invalid EndpointInstanceId path: expected 4 parts, got {}",
530                parts.len()
531            );
532        }
533        Ok(Self {
534            namespace: parts[0].to_string(),
535            component: parts[1].to_string(),
536            endpoint: parts[2].to_string(),
537            instance_id: u64::from_str_radix(parts[3], 16)
538                .map_err(|e| anyhow::anyhow!("Invalid instance_id hex: {}", e))?,
539        })
540    }
541}
542
543/// Unique identifier for a model card instance
544/// The combination of (namespace, component, endpoint, instance_id, model_suffix) uniquely identifies a model card
545#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
546pub struct ModelCardInstanceId {
547    pub namespace: String,
548    pub component: String,
549    pub endpoint: String,
550    pub instance_id: u64,
551    /// None for base models, Some(slug) for LoRA adapters
552    pub model_suffix: Option<String>,
553}
554
555/// Unique identifier for an event channel instance
556#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
557pub struct EventChannelInstanceId {
558    pub namespace: String,
559    pub component: String,
560    /// Topic name for this channel (e.g., "kv-events", "kv-metrics")
561    pub topic: String,
562    pub instance_id: u64,
563}
564
565impl EventChannelInstanceId {
566    /// Converts to a path string: `{namespace}/{component}/{topic}/{instance_id:x}`
567    pub fn to_path(&self) -> String {
568        format!(
569            "{}/{}/{}/{:x}",
570            self.namespace, self.component, self.topic, self.instance_id
571        )
572    }
573
574    /// Parses from a path string: `{namespace}/{component}/{topic}/{instance_id:x}`
575    pub fn from_path(path: &str) -> Result<Self> {
576        let parts: Vec<&str> = path.split('/').collect();
577        if parts.len() != 4 {
578            anyhow::bail!(
579                "Invalid EventChannelInstanceId path: expected 4 parts, got {}",
580                parts.len()
581            );
582        }
583        Ok(Self {
584            namespace: parts[0].to_string(),
585            component: parts[1].to_string(),
586            topic: parts[2].to_string(),
587            instance_id: u64::from_str_radix(parts[3], 16)
588                .map_err(|e| anyhow::anyhow!("Invalid instance_id hex: {}", e))?,
589        })
590    }
591}
592
593impl ModelCardInstanceId {
594    /// Converts to a path string: `{namespace}/{component}/{endpoint}/{instance_id:x}[/{model_suffix}]`
595    pub fn to_path(&self) -> String {
596        match &self.model_suffix {
597            Some(suffix) => format!(
598                "{}/{}/{}/{:x}/{}",
599                self.namespace, self.component, self.endpoint, self.instance_id, suffix
600            ),
601            None => format!(
602                "{}/{}/{}/{:x}",
603                self.namespace, self.component, self.endpoint, self.instance_id
604            ),
605        }
606    }
607
608    /// Parses from a path string: `{namespace}/{component}/{endpoint}/{instance_id:x}[/{model_suffix}]`
609    pub fn from_path(path: &str) -> Result<Self> {
610        let parts: Vec<&str> = path.split('/').collect();
611        if parts.len() < 4 || parts.len() > 5 {
612            anyhow::bail!(
613                "Invalid ModelCardInstanceId path: expected 4 or 5 parts, got {}",
614                parts.len()
615            );
616        }
617        Ok(Self {
618            namespace: parts[0].to_string(),
619            component: parts[1].to_string(),
620            endpoint: parts[2].to_string(),
621            instance_id: u64::from_str_radix(parts[3], 16)
622                .map_err(|e| anyhow::anyhow!("Invalid instance_id hex: {}", e))?,
623            model_suffix: parts.get(4).map(|s| s.to_string()),
624        })
625    }
626}
627
628/// Union of instance identifiers for different discovery object types
629#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
630pub enum DiscoveryInstanceId {
631    Endpoint(EndpointInstanceId),
632    Model(ModelCardInstanceId),
633    EventChannel(EventChannelInstanceId),
634}
635
636impl DiscoveryInstanceId {
637    /// Returns the raw instance_id regardless of variant type
638    pub fn instance_id(&self) -> u64 {
639        match self {
640            Self::Endpoint(eid) => eid.instance_id,
641            Self::Model(mid) => mid.instance_id,
642            Self::EventChannel(ecid) => ecid.instance_id,
643        }
644    }
645
646    /// Extracts the EndpointInstanceId, returning an error if this is a Model or EventChannel variant
647    pub fn extract_endpoint_id(&self) -> Result<&EndpointInstanceId> {
648        match self {
649            Self::Endpoint(eid) => Ok(eid),
650            Self::Model(_) => anyhow::bail!("Expected Endpoint variant, got Model"),
651            Self::EventChannel(_) => anyhow::bail!("Expected Endpoint variant, got EventChannel"),
652        }
653    }
654
655    /// Extracts the ModelCardInstanceId, returning an error if this is an Endpoint or EventChannel variant
656    pub fn extract_model_id(&self) -> Result<&ModelCardInstanceId> {
657        match self {
658            Self::Model(mid) => Ok(mid),
659            Self::Endpoint(_) => anyhow::bail!("Expected Model variant, got Endpoint"),
660            Self::EventChannel(_) => anyhow::bail!("Expected Model variant, got EventChannel"),
661        }
662    }
663
664    /// Extracts the EventChannelInstanceId, returning an error if this is an Endpoint or Model variant
665    pub fn extract_event_channel_id(&self) -> Result<&EventChannelInstanceId> {
666        match self {
667            Self::EventChannel(ecid) => Ok(ecid),
668            Self::Endpoint(_) => anyhow::bail!("Expected EventChannel variant, got Endpoint"),
669            Self::Model(_) => anyhow::bail!("Expected EventChannel variant, got Model"),
670        }
671    }
672}
673
674/// Events emitted by the discovery watch stream
675#[derive(Debug, Clone, PartialEq, Eq)]
676pub enum DiscoveryEvent {
677    /// A new instance was added
678    Added(DiscoveryInstance),
679    /// An instance was removed (identified by its unique ID)
680    Removed(DiscoveryInstanceId),
681}
682
683/// Stream type for discovery events
684pub type DiscoveryStream = Pin<Box<dyn Stream<Item = Result<DiscoveryEvent>> + Send>>;
685
686/// Discovery trait for service discovery across different backends
687#[async_trait]
688pub trait Discovery: Send + Sync {
689    /// Returns a unique identifier for this worker (e.g lease id if using etcd or generated id for memory store)
690    /// Discovery objects created by this worker will be associated with this id.
691    fn instance_id(&self) -> u64;
692
693    /// Registers an object in the discovery plane with the instance id
694    async fn register(&self, spec: DiscoverySpec) -> Result<DiscoveryInstance>;
695
696    /// Unregisters an instance from the discovery plane
697    async fn unregister(&self, instance: DiscoveryInstance) -> Result<()>;
698
699    /// Returns a list of currently registered instances for the given discovery query
700    /// This is a one-time snapshot without watching for changes
701    async fn list(&self, query: DiscoveryQuery) -> Result<Vec<DiscoveryInstance>>;
702
703    /// Returns a stream of discovery events (Added/Removed) for the given discovery query
704    /// The optional cancellation token can be used to stop the watch stream
705    async fn list_and_watch(
706        &self,
707        query: DiscoveryQuery,
708        cancel_token: Option<CancellationToken>,
709    ) -> Result<DiscoveryStream>;
710}