Skip to main content

dynamo_runtime/
component.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! The [Component] module defines the top-level API for building distributed applications.
5//!
//! A distributed application consists of a set of [Component]s, each of which can host one
//! or more [Endpoint]s. Each [Endpoint] is a network-accessible service
//! that can be accessed by other [Component]s in the distributed application.
9//!
10//! A [Component] is made discoverable by registering it with the distributed runtime under
11//! a [`Namespace`].
12//!
//! A [`Namespace`] is a logical grouping of [Component]s.
14//!
15//! We might extend namespace to include grouping behavior, which would define groups of
16//! components that are tightly coupled.
17//!
18//! A [Component] is the core building block of a distributed application. It is a logical
19//! unit of work such as a `Preprocessor` or `SmartRouter` that has a well-defined role in the
20//! distributed application.
21//!
22//! A [Component] can present to the distributed application one or more configuration files
23//! which define how that component was constructed/configured and what capabilities it can
24//! provide.
25//!
//! Other [Component]s can write to watched locations within a [Component]'s etcd
//! path. This allows the [Component] to take dynamic actions depending on the watch
//! triggers.
29//!
30//! TODO: Top-level Overview of Endpoints/Functions
31
32use std::fmt;
33
34use crate::{
35    config::HealthStatus,
36    distributed::RequestPlaneMode,
37    metrics::{MetricsHierarchy, MetricsRegistry, prometheus_names},
38    service::ServiceClient,
39    service::ServiceSet,
40};
41
42use super::{DistributedRuntime, Runtime, traits::*, transports::nats::Slug, utils::Duration};
43
44use crate::pipeline::network::{PushWorkHandler, ingress::push_endpoint::PushEndpoint};
45use crate::protocols::EndpointId;
46use async_nats::{
47    rustls::quic,
48    service::{Service, ServiceExt},
49};
50use derive_builder::Builder;
51use derive_getters::Getters;
52use educe::Educe;
53use serde::{Deserialize, Serialize};
54use std::{collections::HashMap, hash::Hash, sync::Arc};
55use validator::{Validate, ValidationError};
56
57mod client;
58#[allow(clippy::module_inception)]
59mod component;
60mod endpoint;
61mod namespace;
62mod registry;
63pub mod service;
64
65pub use client::Client;
66pub use endpoint::build_transport_type;
67
68#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Hash)]
69#[serde(rename_all = "snake_case")]
70pub enum TransportType {
71    #[serde(rename = "nats_tcp")]
72    Nats(String),
73    Http(String),
74    Tcp(String),
75}
76
77#[derive(Default)]
78pub struct RegistryInner {
79    pub(crate) services: HashMap<String, Service>,
80}
81
82#[derive(Clone)]
83pub struct Registry {
84    pub(crate) inner: Arc<tokio::sync::Mutex<RegistryInner>>,
85}
86
87#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
88pub struct Instance {
89    pub component: String,
90    pub endpoint: String,
91    pub namespace: String,
92    pub instance_id: u64,
93    pub transport: TransportType,
94}
95
96impl Instance {
97    pub fn id(&self) -> u64 {
98        self.instance_id
99    }
100    pub fn endpoint_id(&self) -> EndpointId {
101        EndpointId {
102            namespace: self.namespace.clone(),
103            component: self.component.clone(),
104            name: self.endpoint.clone(),
105        }
106    }
107}
108
109impl fmt::Display for Instance {
110    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
111        write!(
112            f,
113            "{}/{}/{}/{}",
114            self.namespace, self.component, self.endpoint, self.instance_id
115        )
116    }
117}
118
119/// Sort by string name
120impl std::cmp::Ord for Instance {
121    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
122        self.to_string().cmp(&other.to_string())
123    }
124}
125
126impl PartialOrd for Instance {
127    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
128        // Since Ord is fully implemented, the comparison is always total.
129        Some(self.cmp(other))
130    }
131}
132
133/// A [Component] a discoverable entity in the distributed runtime.
134/// You can host [Endpoint] on a [Component] by first creating
135/// a [Service] then adding one or more [Endpoint] to the [Service].
136///
137/// You can also issue a request to a [Component]'s [Endpoint] by creating a [Client].
138#[derive(Educe, Builder, Clone, Validate)]
139#[educe(Debug)]
140#[builder(pattern = "owned", build_fn(private, name = "build_internal"))]
141pub struct Component {
142    #[builder(private)]
143    #[educe(Debug(ignore))]
144    drt: Arc<DistributedRuntime>,
145
146    /// Name of the component
147    #[builder(setter(into))]
148    #[validate(custom(function = "validate_allowed_chars"))]
149    name: String,
150
151    /// Additional labels for metrics
152    #[builder(default = "Vec::new()")]
153    labels: Vec<(String, String)>,
154
155    // todo - restrict the namespace to a-z0-9-_A-Z
156    /// Namespace
157    #[builder(setter(into))]
158    namespace: Namespace,
159
160    /// This hierarchy's own metrics registry
161    #[builder(default = "crate::MetricsRegistry::new()")]
162    metrics_registry: crate::MetricsRegistry,
163}
164
165impl Hash for Component {
166    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
167        self.namespace.name().hash(state);
168        self.name.hash(state);
169    }
170}
171
172impl PartialEq for Component {
173    fn eq(&self, other: &Self) -> bool {
174        self.namespace.name() == other.namespace.name() && self.name == other.name
175    }
176}
177
178impl Eq for Component {}
179
180impl std::fmt::Display for Component {
181    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
182        write!(f, "{}.{}", self.namespace.name(), self.name)
183    }
184}
185
186impl DistributedRuntimeProvider for Component {
187    fn drt(&self) -> &DistributedRuntime {
188        &self.drt
189    }
190}
191
192impl RuntimeProvider for Component {
193    fn rt(&self) -> &Runtime {
194        self.drt.rt()
195    }
196}
197
198impl MetricsHierarchy for Component {
199    fn basename(&self) -> String {
200        self.name.clone()
201    }
202
203    fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy> {
204        let mut parents = vec![];
205
206        // Get all ancestors of namespace (DRT, parent namespaces, etc.)
207        parents.extend(self.namespace.parent_hierarchies());
208
209        // Add namespace itself
210        parents.push(&self.namespace as &dyn MetricsHierarchy);
211
212        parents
213    }
214
215    fn get_metrics_registry(&self) -> &MetricsRegistry {
216        &self.metrics_registry
217    }
218}
219
220impl Component {
221    pub fn service_name(&self) -> String {
222        let service_name = format!("{}_{}", self.namespace.name(), self.name);
223        Slug::slugify(&service_name).to_string()
224    }
225
226    pub fn namespace(&self) -> &Namespace {
227        &self.namespace
228    }
229
230    pub fn name(&self) -> &str {
231        &self.name
232    }
233
234    pub fn labels(&self) -> &[(String, String)] {
235        &self.labels
236    }
237
238    pub fn endpoint(&self, endpoint: impl Into<String>) -> Endpoint {
239        let endpoint = Endpoint {
240            component: self.clone(),
241            name: endpoint.into(),
242            labels: Vec::new(),
243            metrics_registry: crate::MetricsRegistry::new(),
244        };
245        // Attach endpoint registry so scrapes traverse separate registries (avoids collisions).
246        self.get_metrics_registry()
247            .add_child_registry(endpoint.get_metrics_registry());
248        endpoint
249    }
250
251    pub async fn list_instances(&self) -> anyhow::Result<Vec<Instance>> {
252        let discovery = self.drt.discovery();
253
254        let discovery_query = crate::discovery::DiscoveryQuery::ComponentEndpoints {
255            namespace: self.namespace.name(),
256            component: self.name.clone(),
257        };
258
259        let discovery_instances = discovery.list(discovery_query).await?;
260
261        // Extract Instance from DiscoveryInstance::Endpoint wrapper
262        let mut instances: Vec<Instance> = discovery_instances
263            .into_iter()
264            .filter_map(|di| match di {
265                crate::discovery::DiscoveryInstance::Endpoint(instance) => Some(instance),
266                _ => None, // Ignore all other variants (ModelCard, etc.)
267            })
268            .collect();
269
270        instances.sort();
271        Ok(instances)
272    }
273}
274
275impl ComponentBuilder {
276    pub fn from_runtime(drt: Arc<DistributedRuntime>) -> Self {
277        Self::default().drt(drt)
278    }
279
280    pub fn build(self) -> Result<Component, anyhow::Error> {
281        let component = self.build_internal()?;
282        // If this component is using NATS, register the NATS service and wait for completion.
283        // This prevents a race condition where serve_endpoint() tries to look up the service
284        // before it's registered in the component registry.
285        let drt = component.drt();
286        if drt.request_plane().is_nats() {
287            let mut rx = drt.register_nats_service(component.clone());
288            // Wait synchronously for the NATS service registration to complete.
289            // Uses block_in_place() to safely call blocking_recv() from async contexts.
290            // This temporarily moves the current task off the runtime thread to allow
291            // blocking without deadlocking the runtime.
292            let result = tokio::task::block_in_place(|| rx.blocking_recv());
293            match result {
294                Some(Ok(())) => {
295                    tracing::debug!(
296                        component = component.service_name(),
297                        "NATS service registration completed"
298                    );
299                }
300                Some(Err(e)) => {
301                    return Err(anyhow::anyhow!(
302                        "NATS service registration failed for component '{}': {}",
303                        component.service_name(),
304                        e
305                    ));
306                }
307                None => {
308                    return Err(anyhow::anyhow!(
309                        "NATS service registration channel closed unexpectedly for component '{}'",
310                        component.service_name()
311                    ));
312                }
313            }
314        }
315        Ok(component)
316    }
317}
318
319#[derive(Debug, Clone)]
320pub struct Endpoint {
321    component: Component,
322
323    // todo - restrict alphabet
324    /// Endpoint name
325    name: String,
326
327    /// Additional labels for metrics
328    labels: Vec<(String, String)>,
329
330    /// This hierarchy's own metrics registry
331    metrics_registry: crate::MetricsRegistry,
332}
333
334impl Hash for Endpoint {
335    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
336        self.component.hash(state);
337        self.name.hash(state);
338    }
339}
340
341impl PartialEq for Endpoint {
342    fn eq(&self, other: &Self) -> bool {
343        self.component == other.component && self.name == other.name
344    }
345}
346
347impl Eq for Endpoint {}
348
349impl DistributedRuntimeProvider for Endpoint {
350    fn drt(&self) -> &DistributedRuntime {
351        self.component.drt()
352    }
353}
354
355impl RuntimeProvider for Endpoint {
356    fn rt(&self) -> &Runtime {
357        self.component.rt()
358    }
359}
360
361impl MetricsHierarchy for Endpoint {
362    fn basename(&self) -> String {
363        self.name.clone()
364    }
365
366    fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy> {
367        let mut parents = vec![];
368
369        // Get all ancestors of component (DRT, Namespace, etc.)
370        parents.extend(self.component.parent_hierarchies());
371
372        // Add component itself
373        parents.push(&self.component as &dyn MetricsHierarchy);
374
375        parents
376    }
377
378    fn get_metrics_registry(&self) -> &MetricsRegistry {
379        &self.metrics_registry
380    }
381}
382
383impl Endpoint {
384    pub fn id(&self) -> EndpointId {
385        EndpointId {
386            namespace: self.component.namespace().name().to_string(),
387            component: self.component.name().to_string(),
388            name: self.name().to_string(),
389        }
390    }
391
392    pub fn name(&self) -> &str {
393        &self.name
394    }
395
396    pub fn component(&self) -> &Component {
397        &self.component
398    }
399
400    pub async fn client(&self) -> anyhow::Result<client::Client> {
401        client::Client::new(self.clone()).await
402    }
403
404    pub fn endpoint_builder(&self) -> endpoint::EndpointConfigBuilder {
405        endpoint::EndpointConfigBuilder::from_endpoint(self.clone())
406    }
407}
408
409#[derive(Builder, Clone, Validate)]
410#[builder(pattern = "owned")]
411pub struct Namespace {
412    #[builder(private)]
413    runtime: Arc<DistributedRuntime>,
414
415    #[validate(custom(function = "validate_allowed_chars"))]
416    name: String,
417
418    #[builder(default = "None")]
419    parent: Option<Arc<Namespace>>,
420
421    /// Additional labels for metrics
422    #[builder(default = "Vec::new()")]
423    labels: Vec<(String, String)>,
424
425    /// This hierarchy's own metrics registry
426    #[builder(default = "crate::MetricsRegistry::new()")]
427    metrics_registry: crate::MetricsRegistry,
428}
429
430impl DistributedRuntimeProvider for Namespace {
431    fn drt(&self) -> &DistributedRuntime {
432        &self.runtime
433    }
434}
435
436impl std::fmt::Debug for Namespace {
437    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
438        write!(
439            f,
440            "Namespace {{ name: {}; parent: {:?} }}",
441            self.name, self.parent
442        )
443    }
444}
445
446impl RuntimeProvider for Namespace {
447    fn rt(&self) -> &Runtime {
448        self.runtime.rt()
449    }
450}
451
452impl std::fmt::Display for Namespace {
453    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
454        write!(f, "{}", self.name)
455    }
456}
457
458impl Namespace {
459    pub(crate) fn new(runtime: DistributedRuntime, name: String) -> anyhow::Result<Self> {
460        let ns = NamespaceBuilder::default()
461            .runtime(Arc::new(runtime))
462            .name(name)
463            .build()?;
464        // Attach namespace registry so scrapes traverse separate registries (avoids collisions).
465        ns.drt()
466            .get_metrics_registry()
467            .add_child_registry(ns.get_metrics_registry());
468        Ok(ns)
469    }
470
471    /// Create a [`Component`] in the namespace who's endpoints can be discovered with etcd
472    pub fn component(&self, name: impl Into<String>) -> anyhow::Result<Component> {
473        let component = ComponentBuilder::from_runtime(self.runtime.clone())
474            .name(name)
475            .namespace(self.clone())
476            .build()?;
477        // Attach component registry so scrapes traverse separate registries (avoids collisions).
478        self.get_metrics_registry()
479            .add_child_registry(component.get_metrics_registry());
480        Ok(component)
481    }
482
483    /// Create a [`Namespace`] in the parent namespace
484    pub fn namespace(&self, name: impl Into<String>) -> anyhow::Result<Namespace> {
485        let child = NamespaceBuilder::default()
486            .runtime(self.runtime.clone())
487            .name(name.into())
488            .parent(Some(Arc::new(self.clone())))
489            .build()?;
490        // Attach child namespace registry so scrapes traverse separate registries (avoids collisions).
491        self.get_metrics_registry()
492            .add_child_registry(child.get_metrics_registry());
493        Ok(child)
494    }
495
496    pub fn name(&self) -> String {
497        match &self.parent {
498            Some(parent) => format!("{}.{}", parent.name(), self.name),
499            None => self.name.clone(),
500        }
501    }
502}
503
504// Custom validator function
505fn validate_allowed_chars(input: &str) -> Result<(), ValidationError> {
506    // Define the allowed character set using a regex
507    let regex = regex::Regex::new(r"^[a-z0-9-_]+$").unwrap();
508
509    if regex.is_match(input) {
510        Ok(())
511    } else {
512        Err(ValidationError::new("invalid_characters"))
513    }
514}