Skip to main content

dynamo_runtime/
component.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! The [Component] module defines the top-level API for building distributed applications.
5//!
//! A distributed application consists of a set of [Component]s, each of which can host one
//! or more [Endpoint]s. Each [Endpoint] is a network-accessible service
//! that can be accessed by other [Component]s in the distributed application.
9//!
10//! A [Component] is made discoverable by registering it with the distributed runtime under
11//! a [`Namespace`].
12//!
//! A [`Namespace`] is a logical grouping of related [Component]s.
14//!
15//! We might extend namespace to include grouping behavior, which would define groups of
16//! components that are tightly coupled.
17//!
18//! A [Component] is the core building block of a distributed application. It is a logical
19//! unit of work such as a `Preprocessor` or `SmartRouter` that has a well-defined role in the
20//! distributed application.
21//!
22//! A [Component] can present to the distributed application one or more configuration files
23//! which define how that component was constructed/configured and what capabilities it can
24//! provide.
25//!
//! Other [Component]s can write to watched locations under a [Component]'s etcd
//! path. This allows the [Component] to take dynamic actions when those watches
//! trigger.
29//!
30//! TODO: Top-level Overview of Endpoints/Functions
31
32use std::fmt;
33
34use crate::{
35    config::HealthStatus,
36    distributed::RequestPlaneMode,
37    metrics::{MetricsHierarchy, MetricsRegistry, prometheus_names},
38    service::ServiceClient,
39    service::ServiceSet,
40};
41
42use super::{DistributedRuntime, Runtime, traits::*, transports::nats::Slug, utils::Duration};
43
44use crate::pipeline::network::{PushWorkHandler, ingress::push_endpoint::PushEndpoint};
45use crate::protocols::EndpointId;
46use async_nats::{
47    rustls::quic,
48    service::{Service, ServiceExt},
49};
50use dashmap::DashMap;
51use derive_builder::Builder;
52use derive_getters::Getters;
53use educe::Educe;
54use serde::{Deserialize, Serialize};
55use std::{collections::HashMap, hash::Hash, sync::Arc};
56use validator::{Validate, ValidationError};
57
58mod client;
59#[allow(clippy::module_inception)]
60mod component;
61mod endpoint;
62mod namespace;
63mod registry;
64pub mod service;
65
66pub use client::Client;
67pub(crate) use client::EndpointDiscoverySource;
68pub(crate) use client::RoutingOccupancyState;
69pub(crate) use client::get_or_create_routing_occupancy_state;
70pub use endpoint::build_transport_type;
71
72#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Hash)]
73#[serde(rename_all = "snake_case")]
74pub enum TransportType {
75    #[serde(rename = "nats_tcp")]
76    Nats(String),
77    Http(String),
78    Tcp(String),
79}
80
81#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Hash)]
82#[serde(rename_all = "snake_case")]
83pub enum DeviceType {
84    Cpu,
85    Cuda,
86}
87
88#[derive(Default)]
89pub struct RegistryInner {
90    pub(crate) services: HashMap<String, Service>,
91}
92
93#[derive(Clone)]
94pub struct Registry {
95    pub(crate) inner: Arc<tokio::sync::Mutex<RegistryInner>>,
96}
97
98#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
99pub struct Instance {
100    pub component: String,
101    pub endpoint: String,
102    pub namespace: String,
103    pub instance_id: u64,
104    pub transport: TransportType,
105    #[serde(default, skip_serializing_if = "Option::is_none")]
106    pub device_type: Option<DeviceType>,
107}
108
109impl Instance {
110    pub fn id(&self) -> u64 {
111        self.instance_id
112    }
113
114    pub fn endpoint_id(&self) -> EndpointId {
115        EndpointId {
116            namespace: self.namespace.clone(),
117            component: self.component.clone(),
118            name: self.endpoint.clone(),
119        }
120    }
121
122    pub fn endpoint_instance_id(&self) -> crate::discovery::EndpointInstanceId {
123        crate::discovery::EndpointInstanceId {
124            namespace: self.namespace.clone(),
125            component: self.component.clone(),
126            endpoint: self.endpoint.clone(),
127            instance_id: self.instance_id,
128        }
129    }
130}
131
132impl fmt::Display for Instance {
133    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
134        write!(
135            f,
136            "{}/{}/{}/{}",
137            self.namespace, self.component, self.endpoint, self.instance_id
138        )
139    }
140}
141
142/// Sort by string name
143impl std::cmp::Ord for Instance {
144    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
145        self.to_string().cmp(&other.to_string())
146    }
147}
148
149impl PartialOrd for Instance {
150    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
151        // Since Ord is fully implemented, the comparison is always total.
152        Some(self.cmp(other))
153    }
154}
155
156/// A [Component] a discoverable entity in the distributed runtime.
157/// You can host [Endpoint] on a [Component] by first creating
158/// a [Service] then adding one or more [Endpoint] to the [Service].
159///
160/// You can also issue a request to a [Component]'s [Endpoint] by creating a [Client].
161#[derive(Educe, Builder, Clone, Validate)]
162#[educe(Debug)]
163#[builder(pattern = "owned", build_fn(private, name = "build_internal"))]
164pub struct Component {
165    #[builder(private)]
166    #[educe(Debug(ignore))]
167    drt: Arc<DistributedRuntime>,
168
169    /// Name of the component
170    #[builder(setter(into))]
171    #[validate(custom(function = "validate_allowed_chars"))]
172    name: String,
173
174    /// Additional labels for metrics
175    #[builder(default = "Vec::new()")]
176    labels: Vec<(String, String)>,
177
178    // todo - restrict the namespace to a-z0-9-_A-Z
179    /// Namespace
180    #[builder(setter(into))]
181    namespace: Namespace,
182
183    /// This hierarchy's own metrics registry
184    #[builder(default = "crate::MetricsRegistry::new()")]
185    metrics_registry: crate::MetricsRegistry,
186}
187
188impl Hash for Component {
189    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
190        self.namespace.name().hash(state);
191        self.name.hash(state);
192    }
193}
194
195impl PartialEq for Component {
196    fn eq(&self, other: &Self) -> bool {
197        self.namespace.name() == other.namespace.name() && self.name == other.name
198    }
199}
200
201impl Eq for Component {}
202
203impl std::fmt::Display for Component {
204    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
205        write!(f, "{}.{}", self.namespace.name(), self.name)
206    }
207}
208
209impl DistributedRuntimeProvider for Component {
210    fn drt(&self) -> &DistributedRuntime {
211        &self.drt
212    }
213}
214
215impl RuntimeProvider for Component {
216    fn rt(&self) -> &Runtime {
217        self.drt.rt()
218    }
219}
220
221impl MetricsHierarchy for Component {
222    fn basename(&self) -> String {
223        self.name.clone()
224    }
225
226    fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy> {
227        let mut parents = vec![];
228
229        // Get all ancestors of namespace (DRT, parent namespaces, etc.)
230        parents.extend(self.namespace.parent_hierarchies());
231
232        // Add namespace itself
233        parents.push(&self.namespace as &dyn MetricsHierarchy);
234
235        parents
236    }
237
238    fn get_metrics_registry(&self) -> &MetricsRegistry {
239        &self.metrics_registry
240    }
241
242    fn connection_id(&self) -> Option<u64> {
243        Some(self.drt.connection_id())
244    }
245}
246
247impl Component {
248    pub fn service_name(&self) -> String {
249        let service_name = format!("{}_{}", self.namespace.name(), self.name);
250        Slug::slugify(&service_name).to_string()
251    }
252
253    pub fn namespace(&self) -> &Namespace {
254        &self.namespace
255    }
256
257    pub fn name(&self) -> &str {
258        &self.name
259    }
260
261    pub fn labels(&self) -> &[(String, String)] {
262        &self.labels
263    }
264
265    pub fn endpoint(&self, endpoint: impl Into<String>) -> Endpoint {
266        let endpoint = Endpoint {
267            component: self.clone(),
268            name: endpoint.into(),
269            labels: Vec::new(),
270            metrics_registry: crate::MetricsRegistry::new(),
271        };
272        // Attach endpoint registry so scrapes traverse separate registries (avoids collisions).
273        self.get_metrics_registry()
274            .add_child_registry(endpoint.get_metrics_registry());
275        endpoint
276    }
277
278    pub async fn list_instances(&self) -> anyhow::Result<Vec<Instance>> {
279        let discovery = self.drt.discovery();
280
281        let discovery_query = crate::discovery::DiscoveryQuery::ComponentEndpoints {
282            namespace: self.namespace.name(),
283            component: self.name.clone(),
284        };
285
286        let discovery_instances = discovery.list(discovery_query).await?;
287
288        // Extract Instance from DiscoveryInstance::Endpoint wrapper
289        let mut instances: Vec<Instance> = discovery_instances
290            .into_iter()
291            .filter_map(|di| match di {
292                crate::discovery::DiscoveryInstance::Endpoint(instance) => Some(instance),
293                _ => None, // Ignore all other variants (ModelCard, etc.)
294            })
295            .collect();
296
297        instances.sort();
298        Ok(instances)
299    }
300}
301
302impl ComponentBuilder {
303    pub fn from_runtime(drt: Arc<DistributedRuntime>) -> Self {
304        Self::default().drt(drt)
305    }
306
307    pub fn build(self) -> Result<Component, anyhow::Error> {
308        let component = self.build_internal()?;
309        // If this component is using NATS, register the NATS service and wait for completion.
310        // This prevents a race condition where serve_endpoint() tries to look up the service
311        // before it's registered in the component registry.
312        let drt = component.drt();
313        if drt.request_plane().is_nats() {
314            let mut rx = drt.register_nats_service(component.clone());
315            // Wait synchronously for the NATS service registration to complete.
316            // Uses block_in_place() to safely call blocking_recv() from async contexts.
317            // This temporarily moves the current task off the runtime thread to allow
318            // blocking without deadlocking the runtime.
319            let result = tokio::task::block_in_place(|| rx.blocking_recv());
320            match result {
321                Some(Ok(())) => {
322                    tracing::debug!(
323                        component = component.service_name(),
324                        "NATS service registration completed"
325                    );
326                }
327                Some(Err(e)) => {
328                    return Err(anyhow::anyhow!(
329                        "NATS service registration failed for component '{}': {}",
330                        component.service_name(),
331                        e
332                    ));
333                }
334                None => {
335                    return Err(anyhow::anyhow!(
336                        "NATS service registration channel closed unexpectedly for component '{}'",
337                        component.service_name()
338                    ));
339                }
340            }
341        }
342        Ok(component)
343    }
344}
345
346#[derive(Debug, Clone)]
347pub struct Endpoint {
348    component: Component,
349
350    // todo - restrict alphabet
351    /// Endpoint name
352    name: String,
353
354    /// Additional labels for metrics
355    labels: Vec<(String, String)>,
356
357    /// This hierarchy's own metrics registry
358    metrics_registry: crate::MetricsRegistry,
359}
360
361impl Hash for Endpoint {
362    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
363        self.component.hash(state);
364        self.name.hash(state);
365    }
366}
367
368impl PartialEq for Endpoint {
369    fn eq(&self, other: &Self) -> bool {
370        self.component == other.component && self.name == other.name
371    }
372}
373
374impl Eq for Endpoint {}
375
376impl DistributedRuntimeProvider for Endpoint {
377    fn drt(&self) -> &DistributedRuntime {
378        self.component.drt()
379    }
380}
381
382impl RuntimeProvider for Endpoint {
383    fn rt(&self) -> &Runtime {
384        self.component.rt()
385    }
386}
387
388impl MetricsHierarchy for Endpoint {
389    fn basename(&self) -> String {
390        self.name.clone()
391    }
392
393    fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy> {
394        let mut parents = vec![];
395
396        // Get all ancestors of component (DRT, Namespace, etc.)
397        parents.extend(self.component.parent_hierarchies());
398
399        // Add component itself
400        parents.push(&self.component as &dyn MetricsHierarchy);
401
402        parents
403    }
404
405    fn get_metrics_registry(&self) -> &MetricsRegistry {
406        &self.metrics_registry
407    }
408
409    fn connection_id(&self) -> Option<u64> {
410        Some(self.component.drt().connection_id())
411    }
412}
413
414impl Endpoint {
415    pub fn id(&self) -> EndpointId {
416        EndpointId {
417            namespace: self.component.namespace().name().to_string(),
418            component: self.component.name().to_string(),
419            name: self.name().to_string(),
420        }
421    }
422
423    pub fn name(&self) -> &str {
424        &self.name
425    }
426
427    pub fn component(&self) -> &Component {
428        &self.component
429    }
430
431    pub async fn client(&self) -> anyhow::Result<client::Client> {
432        client::Client::new(self.clone()).await
433    }
434
435    pub fn endpoint_builder(&self) -> endpoint::EndpointConfigBuilder {
436        endpoint::EndpointConfigBuilder::from_endpoint(self.clone())
437    }
438}
439
440#[derive(Builder, Clone, Validate)]
441#[builder(pattern = "owned")]
442pub struct Namespace {
443    #[builder(private)]
444    runtime: Arc<DistributedRuntime>,
445
446    #[validate(custom(function = "validate_allowed_chars"))]
447    name: String,
448
449    #[builder(default = "None")]
450    parent: Option<Arc<Namespace>>,
451
452    /// Additional labels for metrics
453    #[builder(default = "Vec::new()")]
454    labels: Vec<(String, String)>,
455
456    /// This hierarchy's own metrics registry
457    #[builder(default = "crate::MetricsRegistry::new()")]
458    metrics_registry: crate::MetricsRegistry,
459
460    /// Cache for components to avoid duplicate registrations and metrics collisions.
461    /// When the same component is requested multiple times, we return the cached instance
462    /// to ensure all endpoints share the same Component and MetricsRegistry.
463    /// Uses DashMap for lock-free reads and automatic handling of concurrent inserts.
464    #[builder(default = "Arc::new(DashMap::new())")]
465    component_cache: Arc<DashMap<String, Component>>,
466}
467
468impl DistributedRuntimeProvider for Namespace {
469    fn drt(&self) -> &DistributedRuntime {
470        &self.runtime
471    }
472}
473
474impl std::fmt::Debug for Namespace {
475    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
476        write!(
477            f,
478            "Namespace {{ name: {}; parent: {:?} }}",
479            self.name, self.parent
480        )
481    }
482}
483
484impl RuntimeProvider for Namespace {
485    fn rt(&self) -> &Runtime {
486        self.runtime.rt()
487    }
488}
489
490impl std::fmt::Display for Namespace {
491    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
492        write!(f, "{}", self.name)
493    }
494}
495
496impl Namespace {
497    pub(crate) fn new(runtime: DistributedRuntime, name: String) -> anyhow::Result<Self> {
498        let ns = NamespaceBuilder::default()
499            .runtime(Arc::new(runtime))
500            .name(name)
501            .build()?;
502        // Attach namespace registry so scrapes traverse separate registries (avoids collisions).
503        ns.drt()
504            .get_metrics_registry()
505            .add_child_registry(ns.get_metrics_registry());
506        Ok(ns)
507    }
508
509    /// Create a [`Component`] in the namespace who's endpoints can be discovered with etcd
510    ///
511    /// Components are cached by name to ensure that multiple calls with the same name
512    /// return the same Component instance. This prevents duplicate metrics registrations
513    /// and ensures all endpoints share the same Component's MetricsRegistry.
514    pub fn component(&self, name: impl Into<String>) -> anyhow::Result<Component> {
515        let name = name.into();
516
517        // Fast path: Check if component exists in cache
518        // DashMap provides lock-free reads via internal sharding
519        if let Some(cached) = self.component_cache.get(&name) {
520            return Ok(cached.value().clone());
521        }
522
523        // Slow path: Create new component
524        let component = ComponentBuilder::from_runtime(self.runtime.clone())
525            .name(&name)
526            .namespace(self.clone())
527            .build()?;
528
529        // Attach component registry so scrapes traverse separate registries (avoids collisions).
530        self.get_metrics_registry()
531            .add_child_registry(component.get_metrics_registry());
532
533        // Cache the component for future calls
534        // DashMap handles race conditions internally - if another thread
535        // inserted the same key concurrently, we just use our created component
536        self.component_cache.insert(name, component.clone());
537
538        Ok(component)
539    }
540
541    /// Create a [`Namespace`] in the parent namespace
542    pub fn namespace(&self, name: impl Into<String>) -> anyhow::Result<Namespace> {
543        let child = NamespaceBuilder::default()
544            .runtime(self.runtime.clone())
545            .name(name.into())
546            .parent(Some(Arc::new(self.clone())))
547            .build()?;
548        // Attach child namespace registry so scrapes traverse separate registries (avoids collisions).
549        self.get_metrics_registry()
550            .add_child_registry(child.get_metrics_registry());
551        Ok(child)
552    }
553
554    pub fn name(&self) -> String {
555        match &self.parent {
556            Some(parent) => format!("{}.{}", parent.name(), self.name),
557            None => self.name.clone(),
558        }
559    }
560}
561
562// Custom validator function
563fn validate_allowed_chars(input: &str) -> Result<(), ValidationError> {
564    // Define the allowed character set using a regex
565    let regex = regex::Regex::new(r"^[a-z0-9-_]+$").unwrap();
566
567    if regex.is_match(input) {
568        Ok(())
569    } else {
570        Err(ValidationError::new("invalid_characters"))
571    }
572}