dynamo_runtime/
component.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
//! The [Component] module defines the top-level API for building distributed applications.
//!
//! A distributed application consists of a set of [Component]s, each of which can host one
//! or more [Endpoint]s. Each [Endpoint] is a network-accessible service that other
//! [Component]s in the distributed application can access.
//!
//! A [Component] is made discoverable by registering it with the distributed runtime under
//! a [`Namespace`].
//!
//! A [`Namespace`] is a logical grouping of related [Component]s.
//!
//! Namespaces may later be extended with grouping behavior that defines sets of tightly
//! coupled components.
//!
//! A [Component] is the core building block of a distributed application. It is a logical
//! unit of work, such as a `Preprocessor` or `SmartRouter`, that has a well-defined role in
//! the distributed application.
//!
//! A [Component] can present one or more configuration files to the distributed application;
//! these describe how the component was constructed and configured and which capabilities it
//! provides.
//!
//! Other [Component]s can write to watched locations within a [Component]'s etcd path, which
//! lets the [Component] take dynamic actions when those watches trigger.
//!
//! TODO: Top-level overview of Endpoints/Functions
31
32use std::fmt;
33
34use crate::{
35    config::HealthStatus,
36    distributed::RequestPlaneMode,
37    metrics::{MetricsHierarchy, MetricsRegistry, prometheus_names},
38    service::ServiceClient,
39    service::ServiceSet,
40};
41
42use super::{DistributedRuntime, Runtime, traits::*, transports::nats::Slug, utils::Duration};
43
44use crate::pipeline::network::{PushWorkHandler, ingress::push_endpoint::PushEndpoint};
45use crate::protocols::EndpointId;
46use async_nats::{
47    rustls::quic,
48    service::{Service, ServiceExt},
49};
50use derive_builder::Builder;
51use derive_getters::Getters;
52use educe::Educe;
53use serde::{Deserialize, Serialize};
54use std::{collections::HashMap, hash::Hash, sync::Arc};
55use validator::{Validate, ValidationError};
56
57mod client;
58#[allow(clippy::module_inception)]
59mod component;
60mod endpoint;
61mod namespace;
62mod registry;
63pub mod service;
64
65pub use client::Client;
66pub use endpoint::build_transport_type;
67
68#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Hash)]
69#[serde(rename_all = "snake_case")]
70pub enum TransportType {
71    #[serde(rename = "nats_tcp")]
72    Nats(String),
73    Http(String),
74    Tcp(String),
75}
76
77#[derive(Default)]
78pub struct RegistryInner {
79    pub(crate) services: HashMap<String, Service>,
80}
81
82#[derive(Clone)]
83pub struct Registry {
84    pub(crate) inner: Arc<tokio::sync::Mutex<RegistryInner>>,
85}
86
87#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
88pub struct Instance {
89    pub component: String,
90    pub endpoint: String,
91    pub namespace: String,
92    pub instance_id: u64,
93    pub transport: TransportType,
94}
95
96impl Instance {
97    pub fn id(&self) -> u64 {
98        self.instance_id
99    }
100    pub fn endpoint_id(&self) -> EndpointId {
101        EndpointId {
102            namespace: self.namespace.clone(),
103            component: self.component.clone(),
104            name: self.endpoint.clone(),
105        }
106    }
107}
108
109impl fmt::Display for Instance {
110    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
111        write!(
112            f,
113            "{}/{}/{}/{}",
114            self.namespace, self.component, self.endpoint, self.instance_id
115        )
116    }
117}
118
119/// Sort by string name
120impl std::cmp::Ord for Instance {
121    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
122        self.to_string().cmp(&other.to_string())
123    }
124}
125
126impl PartialOrd for Instance {
127    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
128        // Since Ord is fully implemented, the comparison is always total.
129        Some(self.cmp(other))
130    }
131}
132
133/// A [Component] a discoverable entity in the distributed runtime.
134/// You can host [Endpoint] on a [Component] by first creating
135/// a [Service] then adding one or more [Endpoint] to the [Service].
136///
137/// You can also issue a request to a [Component]'s [Endpoint] by creating a [Client].
138#[derive(Educe, Builder, Clone, Validate)]
139#[educe(Debug)]
140#[builder(pattern = "owned", build_fn(private, name = "build_internal"))]
141pub struct Component {
142    #[builder(private)]
143    #[educe(Debug(ignore))]
144    drt: Arc<DistributedRuntime>,
145
146    /// Name of the component
147    #[builder(setter(into))]
148    #[validate(custom(function = "validate_allowed_chars"))]
149    name: String,
150
151    /// Additional labels for metrics
152    #[builder(default = "Vec::new()")]
153    labels: Vec<(String, String)>,
154
155    // todo - restrict the namespace to a-z0-9-_A-Z
156    /// Namespace
157    #[builder(setter(into))]
158    namespace: Namespace,
159
160    /// This hierarchy's own metrics registry
161    #[builder(default = "crate::MetricsRegistry::new()")]
162    metrics_registry: crate::MetricsRegistry,
163}
164
165impl Hash for Component {
166    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
167        self.namespace.name().hash(state);
168        self.name.hash(state);
169    }
170}
171
172impl PartialEq for Component {
173    fn eq(&self, other: &Self) -> bool {
174        self.namespace.name() == other.namespace.name() && self.name == other.name
175    }
176}
177
178impl Eq for Component {}
179
180impl std::fmt::Display for Component {
181    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
182        write!(f, "{}.{}", self.namespace.name(), self.name)
183    }
184}
185
186impl DistributedRuntimeProvider for Component {
187    fn drt(&self) -> &DistributedRuntime {
188        &self.drt
189    }
190}
191
192impl RuntimeProvider for Component {
193    fn rt(&self) -> &Runtime {
194        self.drt.rt()
195    }
196}
197
198impl MetricsHierarchy for Component {
199    fn basename(&self) -> String {
200        self.name.clone()
201    }
202
203    fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy> {
204        let mut parents = vec![];
205
206        // Get all ancestors of namespace (DRT, parent namespaces, etc.)
207        parents.extend(self.namespace.parent_hierarchies());
208
209        // Add namespace itself
210        parents.push(&self.namespace as &dyn MetricsHierarchy);
211
212        parents
213    }
214
215    fn get_metrics_registry(&self) -> &MetricsRegistry {
216        &self.metrics_registry
217    }
218}
219
220impl Component {
221    pub fn service_name(&self) -> String {
222        let service_name = format!("{}_{}", self.namespace.name(), self.name);
223        Slug::slugify(&service_name).to_string()
224    }
225
226    pub fn namespace(&self) -> &Namespace {
227        &self.namespace
228    }
229
230    pub fn name(&self) -> &str {
231        &self.name
232    }
233
234    pub fn labels(&self) -> &[(String, String)] {
235        &self.labels
236    }
237
238    pub fn endpoint(&self, endpoint: impl Into<String>) -> Endpoint {
239        Endpoint {
240            component: self.clone(),
241            name: endpoint.into(),
242            labels: Vec::new(),
243            metrics_registry: crate::MetricsRegistry::new(),
244        }
245    }
246
247    pub async fn list_instances(&self) -> anyhow::Result<Vec<Instance>> {
248        let discovery = self.drt.discovery();
249
250        let discovery_query = crate::discovery::DiscoveryQuery::ComponentEndpoints {
251            namespace: self.namespace.name(),
252            component: self.name.clone(),
253        };
254
255        let discovery_instances = discovery.list(discovery_query).await?;
256
257        // Extract Instance from DiscoveryInstance::Endpoint wrapper
258        let mut instances: Vec<Instance> = discovery_instances
259            .into_iter()
260            .filter_map(|di| match di {
261                crate::discovery::DiscoveryInstance::Endpoint(instance) => Some(instance),
262                _ => None, // Ignore all other variants (ModelCard, etc.)
263            })
264            .collect();
265
266        instances.sort();
267        Ok(instances)
268    }
269}
270
271impl ComponentBuilder {
272    pub fn from_runtime(drt: Arc<DistributedRuntime>) -> Self {
273        Self::default().drt(drt)
274    }
275
276    pub fn build(self) -> Result<Component, anyhow::Error> {
277        let component = self.build_internal()?;
278        // If this component is using NATS, register the NATS service and wait for completion.
279        // This prevents a race condition where serve_endpoint() tries to look up the service
280        // before it's registered in the component registry.
281        let drt = component.drt();
282        if drt.request_plane().is_nats() {
283            let mut rx = drt.register_nats_service(component.clone());
284            // Wait synchronously for the NATS service registration to complete.
285            // Uses block_in_place() to safely call blocking_recv() from async contexts.
286            // This temporarily moves the current task off the runtime thread to allow
287            // blocking without deadlocking the runtime.
288            let result = tokio::task::block_in_place(|| rx.blocking_recv());
289            match result {
290                Some(Ok(())) => {
291                    tracing::debug!(
292                        component = component.service_name(),
293                        "NATS service registration completed"
294                    );
295                }
296                Some(Err(e)) => {
297                    return Err(anyhow::anyhow!(
298                        "NATS service registration failed for component '{}': {}",
299                        component.service_name(),
300                        e
301                    ));
302                }
303                None => {
304                    return Err(anyhow::anyhow!(
305                        "NATS service registration channel closed unexpectedly for component '{}'",
306                        component.service_name()
307                    ));
308                }
309            }
310        }
311        Ok(component)
312    }
313}
314
315#[derive(Debug, Clone)]
316pub struct Endpoint {
317    component: Component,
318
319    // todo - restrict alphabet
320    /// Endpoint name
321    name: String,
322
323    /// Additional labels for metrics
324    labels: Vec<(String, String)>,
325
326    /// This hierarchy's own metrics registry
327    metrics_registry: crate::MetricsRegistry,
328}
329
330impl Hash for Endpoint {
331    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
332        self.component.hash(state);
333        self.name.hash(state);
334    }
335}
336
337impl PartialEq for Endpoint {
338    fn eq(&self, other: &Self) -> bool {
339        self.component == other.component && self.name == other.name
340    }
341}
342
343impl Eq for Endpoint {}
344
345impl DistributedRuntimeProvider for Endpoint {
346    fn drt(&self) -> &DistributedRuntime {
347        self.component.drt()
348    }
349}
350
351impl RuntimeProvider for Endpoint {
352    fn rt(&self) -> &Runtime {
353        self.component.rt()
354    }
355}
356
357impl MetricsHierarchy for Endpoint {
358    fn basename(&self) -> String {
359        self.name.clone()
360    }
361
362    fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy> {
363        let mut parents = vec![];
364
365        // Get all ancestors of component (DRT, Namespace, etc.)
366        parents.extend(self.component.parent_hierarchies());
367
368        // Add component itself
369        parents.push(&self.component as &dyn MetricsHierarchy);
370
371        parents
372    }
373
374    fn get_metrics_registry(&self) -> &MetricsRegistry {
375        &self.metrics_registry
376    }
377}
378
379impl Endpoint {
380    pub fn id(&self) -> EndpointId {
381        EndpointId {
382            namespace: self.component.namespace().name().to_string(),
383            component: self.component.name().to_string(),
384            name: self.name().to_string(),
385        }
386    }
387
388    pub fn name(&self) -> &str {
389        &self.name
390    }
391
392    pub fn component(&self) -> &Component {
393        &self.component
394    }
395
396    pub async fn client(&self) -> anyhow::Result<client::Client> {
397        client::Client::new(self.clone()).await
398    }
399
400    pub fn endpoint_builder(&self) -> endpoint::EndpointConfigBuilder {
401        endpoint::EndpointConfigBuilder::from_endpoint(self.clone())
402    }
403}
404
405#[derive(Builder, Clone, Validate)]
406#[builder(pattern = "owned")]
407pub struct Namespace {
408    #[builder(private)]
409    runtime: Arc<DistributedRuntime>,
410
411    #[validate(custom(function = "validate_allowed_chars"))]
412    name: String,
413
414    #[builder(default = "None")]
415    parent: Option<Arc<Namespace>>,
416
417    /// Additional labels for metrics
418    #[builder(default = "Vec::new()")]
419    labels: Vec<(String, String)>,
420
421    /// This hierarchy's own metrics registry
422    #[builder(default = "crate::MetricsRegistry::new()")]
423    metrics_registry: crate::MetricsRegistry,
424}
425
426impl DistributedRuntimeProvider for Namespace {
427    fn drt(&self) -> &DistributedRuntime {
428        &self.runtime
429    }
430}
431
432impl std::fmt::Debug for Namespace {
433    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
434        write!(
435            f,
436            "Namespace {{ name: {}; parent: {:?} }}",
437            self.name, self.parent
438        )
439    }
440}
441
442impl RuntimeProvider for Namespace {
443    fn rt(&self) -> &Runtime {
444        self.runtime.rt()
445    }
446}
447
448impl std::fmt::Display for Namespace {
449    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
450        write!(f, "{}", self.name)
451    }
452}
453
454impl Namespace {
455    pub(crate) fn new(runtime: DistributedRuntime, name: String) -> anyhow::Result<Self> {
456        Ok(NamespaceBuilder::default()
457            .runtime(Arc::new(runtime))
458            .name(name)
459            .build()?)
460    }
461
462    /// Create a [`Component`] in the namespace who's endpoints can be discovered with etcd
463    pub fn component(&self, name: impl Into<String>) -> anyhow::Result<Component> {
464        ComponentBuilder::from_runtime(self.runtime.clone())
465            .name(name)
466            .namespace(self.clone())
467            .build()
468    }
469
470    /// Create a [`Namespace`] in the parent namespace
471    pub fn namespace(&self, name: impl Into<String>) -> anyhow::Result<Namespace> {
472        Ok(NamespaceBuilder::default()
473            .runtime(self.runtime.clone())
474            .name(name.into())
475            .parent(Some(Arc::new(self.clone())))
476            .build()?)
477    }
478
479    pub fn name(&self) -> String {
480        match &self.parent {
481            Some(parent) => format!("{}.{}", parent.name(), self.name),
482            None => self.name.clone(),
483        }
484    }
485}
486
487// Custom validator function
488fn validate_allowed_chars(input: &str) -> Result<(), ValidationError> {
489    // Define the allowed character set using a regex
490    let regex = regex::Regex::new(r"^[a-z0-9-_]+$").unwrap();
491
492    if regex.is_match(input) {
493        Ok(())
494    } else {
495        Err(ValidationError::new("invalid_characters"))
496    }
497}