Skip to main content

dynamo_runtime/
component.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! The [Component] module defines the top-level API for building distributed applications.
5//!
6//! A distributed application consists of a set of [Component] that can host one
7//! or more [Endpoint]. Each [Endpoint] is a network-accessible service
8//! that can be accessed by other [Component] in the distributed application.
9//!
10//! A [Component] is made discoverable by registering it with the distributed runtime under
11//! a [`Namespace`].
12//!
13//! A [`Namespace`] is a logical grouping of related [Component]s.
14//!
15//! We might extend namespace to include grouping behavior, which would define groups of
16//! components that are tightly coupled.
17//!
18//! A [Component] is the core building block of a distributed application. It is a logical
19//! unit of work such as a `Preprocessor` or `SmartRouter` that has a well-defined role in the
20//! distributed application.
21//!
22//! A [Component] can present to the distributed application one or more configuration files
23//! which define how that component was constructed/configured and what capabilities it can
24//! provide.
25//!
26//! Other [Component]s can write to watched locations under a [Component]'s etcd
27//! path. This allows that [Component] to take dynamic actions when a watch is
28//! triggered.
29//!
30//! TODO: Top-level Overview of Endpoints/Functions
31
32use std::fmt;
33
34use crate::{
35    config::HealthStatus,
36    distributed::RequestPlaneMode,
37    metrics::{MetricsHierarchy, MetricsRegistry, prometheus_names},
38    service::ServiceClient,
39    service::ServiceSet,
40};
41
42use super::{DistributedRuntime, Runtime, traits::*, transports::nats::Slug, utils::Duration};
43
44use crate::pipeline::network::{PushWorkHandler, ingress::push_endpoint::PushEndpoint};
45use crate::protocols::EndpointId;
46use async_nats::{
47    rustls::quic,
48    service::{Service, ServiceExt},
49};
50use dashmap::DashMap;
51use derive_builder::Builder;
52use derive_getters::Getters;
53use educe::Educe;
54use serde::{Deserialize, Serialize};
55use std::{collections::HashMap, hash::Hash, sync::Arc};
56use validator::{Validate, ValidationError};
57
58mod client;
59#[allow(clippy::module_inception)]
60mod component;
61mod endpoint;
62mod namespace;
63mod registry;
64pub mod service;
65
66pub use client::Client;
67pub(crate) use client::RoutingOccupancyState;
68pub(crate) use client::get_or_create_routing_occupancy_state;
69pub use endpoint::build_transport_type;
70
71#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Hash)]
72#[serde(rename_all = "snake_case")]
73pub enum TransportType {
74    #[serde(rename = "nats_tcp")]
75    Nats(String),
76    Http(String),
77    Tcp(String),
78}
79
80#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Hash)]
81#[serde(rename_all = "snake_case")]
82pub enum DeviceType {
83    Cpu,
84    Cuda,
85}
86
87#[derive(Default)]
88pub struct RegistryInner {
89    pub(crate) services: HashMap<String, Service>,
90}
91
92#[derive(Clone)]
93pub struct Registry {
94    pub(crate) inner: Arc<tokio::sync::Mutex<RegistryInner>>,
95}
96
97#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
98pub struct Instance {
99    pub component: String,
100    pub endpoint: String,
101    pub namespace: String,
102    pub instance_id: u64,
103    pub transport: TransportType,
104    #[serde(default, skip_serializing_if = "Option::is_none")]
105    pub device_type: Option<DeviceType>,
106}
107
108impl Instance {
109    pub fn id(&self) -> u64 {
110        self.instance_id
111    }
112    pub fn endpoint_id(&self) -> EndpointId {
113        EndpointId {
114            namespace: self.namespace.clone(),
115            component: self.component.clone(),
116            name: self.endpoint.clone(),
117        }
118    }
119}
120
121impl fmt::Display for Instance {
122    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
123        write!(
124            f,
125            "{}/{}/{}/{}",
126            self.namespace, self.component, self.endpoint, self.instance_id
127        )
128    }
129}
130
131/// Sort by string name
132impl std::cmp::Ord for Instance {
133    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
134        self.to_string().cmp(&other.to_string())
135    }
136}
137
138impl PartialOrd for Instance {
139    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
140        // Since Ord is fully implemented, the comparison is always total.
141        Some(self.cmp(other))
142    }
143}
144
145/// A [Component] a discoverable entity in the distributed runtime.
146/// You can host [Endpoint] on a [Component] by first creating
147/// a [Service] then adding one or more [Endpoint] to the [Service].
148///
149/// You can also issue a request to a [Component]'s [Endpoint] by creating a [Client].
150#[derive(Educe, Builder, Clone, Validate)]
151#[educe(Debug)]
152#[builder(pattern = "owned", build_fn(private, name = "build_internal"))]
153pub struct Component {
154    #[builder(private)]
155    #[educe(Debug(ignore))]
156    drt: Arc<DistributedRuntime>,
157
158    /// Name of the component
159    #[builder(setter(into))]
160    #[validate(custom(function = "validate_allowed_chars"))]
161    name: String,
162
163    /// Additional labels for metrics
164    #[builder(default = "Vec::new()")]
165    labels: Vec<(String, String)>,
166
167    // todo - restrict the namespace to a-z0-9-_A-Z
168    /// Namespace
169    #[builder(setter(into))]
170    namespace: Namespace,
171
172    /// This hierarchy's own metrics registry
173    #[builder(default = "crate::MetricsRegistry::new()")]
174    metrics_registry: crate::MetricsRegistry,
175}
176
177impl Hash for Component {
178    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
179        self.namespace.name().hash(state);
180        self.name.hash(state);
181    }
182}
183
184impl PartialEq for Component {
185    fn eq(&self, other: &Self) -> bool {
186        self.namespace.name() == other.namespace.name() && self.name == other.name
187    }
188}
189
190impl Eq for Component {}
191
192impl std::fmt::Display for Component {
193    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
194        write!(f, "{}.{}", self.namespace.name(), self.name)
195    }
196}
197
198impl DistributedRuntimeProvider for Component {
199    fn drt(&self) -> &DistributedRuntime {
200        &self.drt
201    }
202}
203
204impl RuntimeProvider for Component {
205    fn rt(&self) -> &Runtime {
206        self.drt.rt()
207    }
208}
209
210impl MetricsHierarchy for Component {
211    fn basename(&self) -> String {
212        self.name.clone()
213    }
214
215    fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy> {
216        let mut parents = vec![];
217
218        // Get all ancestors of namespace (DRT, parent namespaces, etc.)
219        parents.extend(self.namespace.parent_hierarchies());
220
221        // Add namespace itself
222        parents.push(&self.namespace as &dyn MetricsHierarchy);
223
224        parents
225    }
226
227    fn get_metrics_registry(&self) -> &MetricsRegistry {
228        &self.metrics_registry
229    }
230
231    fn connection_id(&self) -> Option<u64> {
232        Some(self.drt.connection_id())
233    }
234}
235
236impl Component {
237    pub fn service_name(&self) -> String {
238        let service_name = format!("{}_{}", self.namespace.name(), self.name);
239        Slug::slugify(&service_name).to_string()
240    }
241
242    pub fn namespace(&self) -> &Namespace {
243        &self.namespace
244    }
245
246    pub fn name(&self) -> &str {
247        &self.name
248    }
249
250    pub fn labels(&self) -> &[(String, String)] {
251        &self.labels
252    }
253
254    pub fn endpoint(&self, endpoint: impl Into<String>) -> Endpoint {
255        let endpoint = Endpoint {
256            component: self.clone(),
257            name: endpoint.into(),
258            labels: Vec::new(),
259            metrics_registry: crate::MetricsRegistry::new(),
260        };
261        // Attach endpoint registry so scrapes traverse separate registries (avoids collisions).
262        self.get_metrics_registry()
263            .add_child_registry(endpoint.get_metrics_registry());
264        endpoint
265    }
266
267    pub async fn list_instances(&self) -> anyhow::Result<Vec<Instance>> {
268        let discovery = self.drt.discovery();
269
270        let discovery_query = crate::discovery::DiscoveryQuery::ComponentEndpoints {
271            namespace: self.namespace.name(),
272            component: self.name.clone(),
273        };
274
275        let discovery_instances = discovery.list(discovery_query).await?;
276
277        // Extract Instance from DiscoveryInstance::Endpoint wrapper
278        let mut instances: Vec<Instance> = discovery_instances
279            .into_iter()
280            .filter_map(|di| match di {
281                crate::discovery::DiscoveryInstance::Endpoint(instance) => Some(instance),
282                _ => None, // Ignore all other variants (ModelCard, etc.)
283            })
284            .collect();
285
286        instances.sort();
287        Ok(instances)
288    }
289}
290
291impl ComponentBuilder {
292    pub fn from_runtime(drt: Arc<DistributedRuntime>) -> Self {
293        Self::default().drt(drt)
294    }
295
296    pub fn build(self) -> Result<Component, anyhow::Error> {
297        let component = self.build_internal()?;
298        // If this component is using NATS, register the NATS service and wait for completion.
299        // This prevents a race condition where serve_endpoint() tries to look up the service
300        // before it's registered in the component registry.
301        let drt = component.drt();
302        if drt.request_plane().is_nats() {
303            let mut rx = drt.register_nats_service(component.clone());
304            // Wait synchronously for the NATS service registration to complete.
305            // Uses block_in_place() to safely call blocking_recv() from async contexts.
306            // This temporarily moves the current task off the runtime thread to allow
307            // blocking without deadlocking the runtime.
308            let result = tokio::task::block_in_place(|| rx.blocking_recv());
309            match result {
310                Some(Ok(())) => {
311                    tracing::debug!(
312                        component = component.service_name(),
313                        "NATS service registration completed"
314                    );
315                }
316                Some(Err(e)) => {
317                    return Err(anyhow::anyhow!(
318                        "NATS service registration failed for component '{}': {}",
319                        component.service_name(),
320                        e
321                    ));
322                }
323                None => {
324                    return Err(anyhow::anyhow!(
325                        "NATS service registration channel closed unexpectedly for component '{}'",
326                        component.service_name()
327                    ));
328                }
329            }
330        }
331        Ok(component)
332    }
333}
334
335#[derive(Debug, Clone)]
336pub struct Endpoint {
337    component: Component,
338
339    // todo - restrict alphabet
340    /// Endpoint name
341    name: String,
342
343    /// Additional labels for metrics
344    labels: Vec<(String, String)>,
345
346    /// This hierarchy's own metrics registry
347    metrics_registry: crate::MetricsRegistry,
348}
349
350impl Hash for Endpoint {
351    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
352        self.component.hash(state);
353        self.name.hash(state);
354    }
355}
356
357impl PartialEq for Endpoint {
358    fn eq(&self, other: &Self) -> bool {
359        self.component == other.component && self.name == other.name
360    }
361}
362
363impl Eq for Endpoint {}
364
365impl DistributedRuntimeProvider for Endpoint {
366    fn drt(&self) -> &DistributedRuntime {
367        self.component.drt()
368    }
369}
370
371impl RuntimeProvider for Endpoint {
372    fn rt(&self) -> &Runtime {
373        self.component.rt()
374    }
375}
376
377impl MetricsHierarchy for Endpoint {
378    fn basename(&self) -> String {
379        self.name.clone()
380    }
381
382    fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy> {
383        let mut parents = vec![];
384
385        // Get all ancestors of component (DRT, Namespace, etc.)
386        parents.extend(self.component.parent_hierarchies());
387
388        // Add component itself
389        parents.push(&self.component as &dyn MetricsHierarchy);
390
391        parents
392    }
393
394    fn get_metrics_registry(&self) -> &MetricsRegistry {
395        &self.metrics_registry
396    }
397
398    fn connection_id(&self) -> Option<u64> {
399        Some(self.component.drt().connection_id())
400    }
401}
402
403impl Endpoint {
404    pub fn id(&self) -> EndpointId {
405        EndpointId {
406            namespace: self.component.namespace().name().to_string(),
407            component: self.component.name().to_string(),
408            name: self.name().to_string(),
409        }
410    }
411
412    pub fn name(&self) -> &str {
413        &self.name
414    }
415
416    pub fn component(&self) -> &Component {
417        &self.component
418    }
419
420    pub async fn client(&self) -> anyhow::Result<client::Client> {
421        client::Client::new(self.clone()).await
422    }
423
424    pub fn endpoint_builder(&self) -> endpoint::EndpointConfigBuilder {
425        endpoint::EndpointConfigBuilder::from_endpoint(self.clone())
426    }
427}
428
429#[derive(Builder, Clone, Validate)]
430#[builder(pattern = "owned")]
431pub struct Namespace {
432    #[builder(private)]
433    runtime: Arc<DistributedRuntime>,
434
435    #[validate(custom(function = "validate_allowed_chars"))]
436    name: String,
437
438    #[builder(default = "None")]
439    parent: Option<Arc<Namespace>>,
440
441    /// Additional labels for metrics
442    #[builder(default = "Vec::new()")]
443    labels: Vec<(String, String)>,
444
445    /// This hierarchy's own metrics registry
446    #[builder(default = "crate::MetricsRegistry::new()")]
447    metrics_registry: crate::MetricsRegistry,
448
449    /// Cache for components to avoid duplicate registrations and metrics collisions.
450    /// When the same component is requested multiple times, we return the cached instance
451    /// to ensure all endpoints share the same Component and MetricsRegistry.
452    /// Uses DashMap for lock-free reads and automatic handling of concurrent inserts.
453    #[builder(default = "Arc::new(DashMap::new())")]
454    component_cache: Arc<DashMap<String, Component>>,
455}
456
457impl DistributedRuntimeProvider for Namespace {
458    fn drt(&self) -> &DistributedRuntime {
459        &self.runtime
460    }
461}
462
463impl std::fmt::Debug for Namespace {
464    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
465        write!(
466            f,
467            "Namespace {{ name: {}; parent: {:?} }}",
468            self.name, self.parent
469        )
470    }
471}
472
473impl RuntimeProvider for Namespace {
474    fn rt(&self) -> &Runtime {
475        self.runtime.rt()
476    }
477}
478
479impl std::fmt::Display for Namespace {
480    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
481        write!(f, "{}", self.name)
482    }
483}
484
485impl Namespace {
486    pub(crate) fn new(runtime: DistributedRuntime, name: String) -> anyhow::Result<Self> {
487        let ns = NamespaceBuilder::default()
488            .runtime(Arc::new(runtime))
489            .name(name)
490            .build()?;
491        // Attach namespace registry so scrapes traverse separate registries (avoids collisions).
492        ns.drt()
493            .get_metrics_registry()
494            .add_child_registry(ns.get_metrics_registry());
495        Ok(ns)
496    }
497
498    /// Create a [`Component`] in the namespace who's endpoints can be discovered with etcd
499    ///
500    /// Components are cached by name to ensure that multiple calls with the same name
501    /// return the same Component instance. This prevents duplicate metrics registrations
502    /// and ensures all endpoints share the same Component's MetricsRegistry.
503    pub fn component(&self, name: impl Into<String>) -> anyhow::Result<Component> {
504        let name = name.into();
505
506        // Fast path: Check if component exists in cache
507        // DashMap provides lock-free reads via internal sharding
508        if let Some(cached) = self.component_cache.get(&name) {
509            return Ok(cached.value().clone());
510        }
511
512        // Slow path: Create new component
513        let component = ComponentBuilder::from_runtime(self.runtime.clone())
514            .name(&name)
515            .namespace(self.clone())
516            .build()?;
517
518        // Attach component registry so scrapes traverse separate registries (avoids collisions).
519        self.get_metrics_registry()
520            .add_child_registry(component.get_metrics_registry());
521
522        // Cache the component for future calls
523        // DashMap handles race conditions internally - if another thread
524        // inserted the same key concurrently, we just use our created component
525        self.component_cache.insert(name, component.clone());
526
527        Ok(component)
528    }
529
530    /// Create a [`Namespace`] in the parent namespace
531    pub fn namespace(&self, name: impl Into<String>) -> anyhow::Result<Namespace> {
532        let child = NamespaceBuilder::default()
533            .runtime(self.runtime.clone())
534            .name(name.into())
535            .parent(Some(Arc::new(self.clone())))
536            .build()?;
537        // Attach child namespace registry so scrapes traverse separate registries (avoids collisions).
538        self.get_metrics_registry()
539            .add_child_registry(child.get_metrics_registry());
540        Ok(child)
541    }
542
543    pub fn name(&self) -> String {
544        match &self.parent {
545            Some(parent) => format!("{}.{}", parent.name(), self.name),
546            None => self.name.clone(),
547        }
548    }
549}
550
551// Custom validator function
552fn validate_allowed_chars(input: &str) -> Result<(), ValidationError> {
553    // Define the allowed character set using a regex
554    let regex = regex::Regex::new(r"^[a-z0-9-_]+$").unwrap();
555
556    if regex.is_match(input) {
557        Ok(())
558    } else {
559        Err(ValidationError::new("invalid_characters"))
560    }
561}