Skip to main content

dynamo_runtime/
component.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! The [Component] module defines the top-level API for building distributed applications.
5//!
//! A distributed application consists of a set of [Component]s, each of which can host one
//! or more [Endpoint]s. Each [Endpoint] is a network-accessible service
//! that other [Component]s in the distributed application can call.
9//!
10//! A [Component] is made discoverable by registering it with the distributed runtime under
11//! a [`Namespace`].
12//!
//! A [`Namespace`] is a logical grouping of [Component]s.
14//!
15//! We might extend namespace to include grouping behavior, which would define groups of
16//! components that are tightly coupled.
17//!
18//! A [Component] is the core building block of a distributed application. It is a logical
19//! unit of work such as a `Preprocessor` or `SmartRouter` that has a well-defined role in the
20//! distributed application.
21//!
22//! A [Component] can present to the distributed application one or more configuration files
23//! which define how that component was constructed/configured and what capabilities it can
24//! provide.
25//!
//! Other [Component]s can write to watched locations under a [Component]'s etcd
//! path. This allows that [Component] to take dynamic actions when those watches
//! are triggered.
29//!
30//! TODO: Top-level Overview of Endpoints/Functions
31
32use std::fmt;
33
34use crate::{
35    config::HealthStatus,
36    distributed::RequestPlaneMode,
37    metrics::{MetricsHierarchy, MetricsRegistry, prometheus_names},
38    service::ServiceClient,
39    service::ServiceSet,
40};
41
42use super::{DistributedRuntime, Runtime, traits::*, transports::nats::Slug, utils::Duration};
43
44use crate::pipeline::network::{PushWorkHandler, ingress::push_endpoint::PushEndpoint};
45use crate::protocols::EndpointId;
46use async_nats::{
47    rustls::quic,
48    service::{Service, ServiceExt},
49};
50use dashmap::DashMap;
51use derive_builder::Builder;
52use derive_getters::Getters;
53use educe::Educe;
54use serde::{Deserialize, Serialize};
55use std::{collections::HashMap, hash::Hash, sync::Arc};
56use validator::{Validate, ValidationError};
57
58mod client;
59#[allow(clippy::module_inception)]
60mod component;
61mod endpoint;
62mod namespace;
63mod registry;
64pub mod service;
65
66pub use client::Client;
67pub use endpoint::build_transport_type;
68
/// Transport through which an [`Instance`] can be reached.
///
/// Variant names are serialized in snake_case (`http`, `tcp`), except [`TransportType::Nats`],
/// which is explicitly serialized as `nats_tcp`.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Hash)]
#[serde(rename_all = "snake_case")]
pub enum TransportType {
    /// NATS transport.
    /// NOTE(review): the payload is presumably the NATS subject/address — confirm.
    #[serde(rename = "nats_tcp")]
    Nats(String),
    /// HTTP transport.
    /// NOTE(review): the payload is presumably the endpoint URL/address — confirm.
    Http(String),
    /// TCP transport.
    /// NOTE(review): the payload is presumably a host:port address — confirm.
    Tcp(String),
}
77
/// Mutable state held behind a [`Registry`].
#[derive(Default)]
pub struct RegistryInner {
    /// NATS [`Service`] handles keyed by name.
    /// NOTE(review): keys are presumably `Component::service_name()` values — confirm with callers.
    pub(crate) services: HashMap<String, Service>,
}

/// Cheaply cloneable handle to a shared [`RegistryInner`].
///
/// All clones share the same state through an `Arc`. Access is serialized by an async
/// (`tokio::sync`) mutex.
#[derive(Clone)]
pub struct Registry {
    pub(crate) inner: Arc<tokio::sync::Mutex<RegistryInner>>,
}
87
/// One running instance of an endpoint, as returned by discovery.
///
/// Identified by namespace, component, endpoint name and a numeric instance id, and carrying
/// the transport used to reach it.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct Instance {
    /// Name of the component that hosts the endpoint.
    pub component: String,
    /// Name of the endpoint.
    pub endpoint: String,
    /// Name of the namespace the component lives in.
    pub namespace: String,
    /// Numeric identifier of this instance (also returned by [`Instance::id`]).
    pub instance_id: u64,
    /// How to reach this instance.
    pub transport: TransportType,
}
96
97impl Instance {
98    pub fn id(&self) -> u64 {
99        self.instance_id
100    }
101    pub fn endpoint_id(&self) -> EndpointId {
102        EndpointId {
103            namespace: self.namespace.clone(),
104            component: self.component.clone(),
105            name: self.endpoint.clone(),
106        }
107    }
108}
109
110impl fmt::Display for Instance {
111    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
112        write!(
113            f,
114            "{}/{}/{}/{}",
115            self.namespace, self.component, self.endpoint, self.instance_id
116        )
117    }
118}
119
120/// Sort by string name
121impl std::cmp::Ord for Instance {
122    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
123        self.to_string().cmp(&other.to_string())
124    }
125}
126
127impl PartialOrd for Instance {
128    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
129        // Since Ord is fully implemented, the comparison is always total.
130        Some(self.cmp(other))
131    }
132}
133
/// A [Component] is a discoverable entity in the distributed runtime.
/// You can host [Endpoint] on a [Component] by first creating
/// a [Service] then adding one or more [Endpoint] to the [Service].
///
/// You can also issue a request to a [Component]'s [Endpoint] by creating a [Client].
///
/// NOTE(review): `name` carries a `validate_allowed_chars` rule. That rule only runs when
/// `Validate::validate()` is called, and `ComponentBuilder::build` in this file does not
/// call it — confirm whether callers validate.
#[derive(Educe, Builder, Clone, Validate)]
#[educe(Debug)]
#[builder(pattern = "owned", build_fn(private, name = "build_internal"))]
pub struct Component {
    /// Distributed runtime this component belongs to (omitted from `Debug` output).
    #[builder(private)]
    #[educe(Debug(ignore))]
    drt: Arc<DistributedRuntime>,

    /// Name of the component
    #[builder(setter(into))]
    #[validate(custom(function = "validate_allowed_chars"))]
    name: String,

    /// Additional labels for metrics
    #[builder(default = "Vec::new()")]
    labels: Vec<(String, String)>,

    // todo - restrict the namespace to a-z0-9-_A-Z
    /// Namespace this component belongs to
    #[builder(setter(into))]
    namespace: Namespace,

    /// This hierarchy's own metrics registry
    #[builder(default = "crate::MetricsRegistry::new()")]
    metrics_registry: crate::MetricsRegistry,
}
165
166impl Hash for Component {
167    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
168        self.namespace.name().hash(state);
169        self.name.hash(state);
170    }
171}
172
173impl PartialEq for Component {
174    fn eq(&self, other: &Self) -> bool {
175        self.namespace.name() == other.namespace.name() && self.name == other.name
176    }
177}
178
179impl Eq for Component {}
180
181impl std::fmt::Display for Component {
182    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
183        write!(f, "{}.{}", self.namespace.name(), self.name)
184    }
185}
186
187impl DistributedRuntimeProvider for Component {
188    fn drt(&self) -> &DistributedRuntime {
189        &self.drt
190    }
191}
192
193impl RuntimeProvider for Component {
194    fn rt(&self) -> &Runtime {
195        self.drt.rt()
196    }
197}
198
199impl MetricsHierarchy for Component {
200    fn basename(&self) -> String {
201        self.name.clone()
202    }
203
204    fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy> {
205        let mut parents = vec![];
206
207        // Get all ancestors of namespace (DRT, parent namespaces, etc.)
208        parents.extend(self.namespace.parent_hierarchies());
209
210        // Add namespace itself
211        parents.push(&self.namespace as &dyn MetricsHierarchy);
212
213        parents
214    }
215
216    fn get_metrics_registry(&self) -> &MetricsRegistry {
217        &self.metrics_registry
218    }
219}
220
221impl Component {
222    pub fn service_name(&self) -> String {
223        let service_name = format!("{}_{}", self.namespace.name(), self.name);
224        Slug::slugify(&service_name).to_string()
225    }
226
227    pub fn namespace(&self) -> &Namespace {
228        &self.namespace
229    }
230
231    pub fn name(&self) -> &str {
232        &self.name
233    }
234
235    pub fn labels(&self) -> &[(String, String)] {
236        &self.labels
237    }
238
239    pub fn endpoint(&self, endpoint: impl Into<String>) -> Endpoint {
240        let endpoint = Endpoint {
241            component: self.clone(),
242            name: endpoint.into(),
243            labels: Vec::new(),
244            metrics_registry: crate::MetricsRegistry::new(),
245        };
246        // Attach endpoint registry so scrapes traverse separate registries (avoids collisions).
247        self.get_metrics_registry()
248            .add_child_registry(endpoint.get_metrics_registry());
249        endpoint
250    }
251
252    pub async fn list_instances(&self) -> anyhow::Result<Vec<Instance>> {
253        let discovery = self.drt.discovery();
254
255        let discovery_query = crate::discovery::DiscoveryQuery::ComponentEndpoints {
256            namespace: self.namespace.name(),
257            component: self.name.clone(),
258        };
259
260        let discovery_instances = discovery.list(discovery_query).await?;
261
262        // Extract Instance from DiscoveryInstance::Endpoint wrapper
263        let mut instances: Vec<Instance> = discovery_instances
264            .into_iter()
265            .filter_map(|di| match di {
266                crate::discovery::DiscoveryInstance::Endpoint(instance) => Some(instance),
267                _ => None, // Ignore all other variants (ModelCard, etc.)
268            })
269            .collect();
270
271        instances.sort();
272        Ok(instances)
273    }
274}
275
276impl ComponentBuilder {
277    pub fn from_runtime(drt: Arc<DistributedRuntime>) -> Self {
278        Self::default().drt(drt)
279    }
280
281    pub fn build(self) -> Result<Component, anyhow::Error> {
282        let component = self.build_internal()?;
283        // If this component is using NATS, register the NATS service and wait for completion.
284        // This prevents a race condition where serve_endpoint() tries to look up the service
285        // before it's registered in the component registry.
286        let drt = component.drt();
287        if drt.request_plane().is_nats() {
288            let mut rx = drt.register_nats_service(component.clone());
289            // Wait synchronously for the NATS service registration to complete.
290            // Uses block_in_place() to safely call blocking_recv() from async contexts.
291            // This temporarily moves the current task off the runtime thread to allow
292            // blocking without deadlocking the runtime.
293            let result = tokio::task::block_in_place(|| rx.blocking_recv());
294            match result {
295                Some(Ok(())) => {
296                    tracing::debug!(
297                        component = component.service_name(),
298                        "NATS service registration completed"
299                    );
300                }
301                Some(Err(e)) => {
302                    return Err(anyhow::anyhow!(
303                        "NATS service registration failed for component '{}': {}",
304                        component.service_name(),
305                        e
306                    ));
307                }
308                None => {
309                    return Err(anyhow::anyhow!(
310                        "NATS service registration channel closed unexpectedly for component '{}'",
311                        component.service_name()
312                    ));
313                }
314            }
315        }
316        Ok(component)
317    }
318}
319
/// A named endpoint hosted on a [`Component`], with its own metric labels and metrics registry.
///
/// Equality and hashing use only the hosting component and the endpoint name.
#[derive(Debug, Clone)]
pub struct Endpoint {
    /// Component that hosts this endpoint.
    component: Component,

    // todo - restrict alphabet
    /// Endpoint name (currently not validated against any character set)
    name: String,

    /// Additional labels for metrics
    labels: Vec<(String, String)>,

    /// This hierarchy's own metrics registry
    metrics_registry: crate::MetricsRegistry,
}
334
335impl Hash for Endpoint {
336    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
337        self.component.hash(state);
338        self.name.hash(state);
339    }
340}
341
342impl PartialEq for Endpoint {
343    fn eq(&self, other: &Self) -> bool {
344        self.component == other.component && self.name == other.name
345    }
346}
347
348impl Eq for Endpoint {}
349
350impl DistributedRuntimeProvider for Endpoint {
351    fn drt(&self) -> &DistributedRuntime {
352        self.component.drt()
353    }
354}
355
356impl RuntimeProvider for Endpoint {
357    fn rt(&self) -> &Runtime {
358        self.component.rt()
359    }
360}
361
362impl MetricsHierarchy for Endpoint {
363    fn basename(&self) -> String {
364        self.name.clone()
365    }
366
367    fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy> {
368        let mut parents = vec![];
369
370        // Get all ancestors of component (DRT, Namespace, etc.)
371        parents.extend(self.component.parent_hierarchies());
372
373        // Add component itself
374        parents.push(&self.component as &dyn MetricsHierarchy);
375
376        parents
377    }
378
379    fn get_metrics_registry(&self) -> &MetricsRegistry {
380        &self.metrics_registry
381    }
382}
383
384impl Endpoint {
385    pub fn id(&self) -> EndpointId {
386        EndpointId {
387            namespace: self.component.namespace().name().to_string(),
388            component: self.component.name().to_string(),
389            name: self.name().to_string(),
390        }
391    }
392
393    pub fn name(&self) -> &str {
394        &self.name
395    }
396
397    pub fn component(&self) -> &Component {
398        &self.component
399    }
400
401    pub async fn client(&self) -> anyhow::Result<client::Client> {
402        client::Client::new(self.clone()).await
403    }
404
405    pub fn endpoint_builder(&self) -> endpoint::EndpointConfigBuilder {
406        endpoint::EndpointConfigBuilder::from_endpoint(self.clone())
407    }
408}
409
/// A named grouping of [`Component`]s. Namespaces can be nested under a parent namespace.
///
/// NOTE(review): `name` carries a `validate_allowed_chars` rule. That rule only runs when
/// `Validate::validate()` is called, and the generated builder's `build()` does not call
/// it — confirm whether callers validate.
#[derive(Builder, Clone, Validate)]
#[builder(pattern = "owned")]
pub struct Namespace {
    /// Distributed runtime this namespace belongs to.
    #[builder(private)]
    runtime: Arc<DistributedRuntime>,

    /// This namespace's own name segment, without any parent prefix.
    #[validate(custom(function = "validate_allowed_chars"))]
    name: String,

    /// Enclosing namespace, or `None` for a top-level namespace.
    #[builder(default = "None")]
    parent: Option<Arc<Namespace>>,

    /// Additional labels for metrics
    #[builder(default = "Vec::new()")]
    labels: Vec<(String, String)>,

    /// This hierarchy's own metrics registry
    #[builder(default = "crate::MetricsRegistry::new()")]
    metrics_registry: crate::MetricsRegistry,

    /// Cache for components to avoid duplicate registrations and metrics collisions.
    /// When the same component is requested multiple times, we return the cached instance
    /// to ensure all endpoints share the same Component and MetricsRegistry.
    /// The map sits behind an `Arc`, so every clone of this namespace shares one cache.
    /// DashMap makes concurrent lookups and inserts safe, but on its own it does not stop two
    /// racing callers from both creating the same component.
    #[builder(default = "Arc::new(DashMap::new())")]
    component_cache: Arc<DashMap<String, Component>>,
}
437
438impl DistributedRuntimeProvider for Namespace {
439    fn drt(&self) -> &DistributedRuntime {
440        &self.runtime
441    }
442}
443
444impl std::fmt::Debug for Namespace {
445    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
446        write!(
447            f,
448            "Namespace {{ name: {}; parent: {:?} }}",
449            self.name, self.parent
450        )
451    }
452}
453
454impl RuntimeProvider for Namespace {
455    fn rt(&self) -> &Runtime {
456        self.runtime.rt()
457    }
458}
459
460impl std::fmt::Display for Namespace {
461    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
462        write!(f, "{}", self.name)
463    }
464}
465
466impl Namespace {
467    pub(crate) fn new(runtime: DistributedRuntime, name: String) -> anyhow::Result<Self> {
468        let ns = NamespaceBuilder::default()
469            .runtime(Arc::new(runtime))
470            .name(name)
471            .build()?;
472        // Attach namespace registry so scrapes traverse separate registries (avoids collisions).
473        ns.drt()
474            .get_metrics_registry()
475            .add_child_registry(ns.get_metrics_registry());
476        Ok(ns)
477    }
478
479    /// Create a [`Component`] in the namespace who's endpoints can be discovered with etcd
480    ///
481    /// Components are cached by name to ensure that multiple calls with the same name
482    /// return the same Component instance. This prevents duplicate metrics registrations
483    /// and ensures all endpoints share the same Component's MetricsRegistry.
484    pub fn component(&self, name: impl Into<String>) -> anyhow::Result<Component> {
485        let name = name.into();
486
487        // Fast path: Check if component exists in cache
488        // DashMap provides lock-free reads via internal sharding
489        if let Some(cached) = self.component_cache.get(&name) {
490            return Ok(cached.value().clone());
491        }
492
493        // Slow path: Create new component
494        let component = ComponentBuilder::from_runtime(self.runtime.clone())
495            .name(&name)
496            .namespace(self.clone())
497            .build()?;
498
499        // Attach component registry so scrapes traverse separate registries (avoids collisions).
500        self.get_metrics_registry()
501            .add_child_registry(component.get_metrics_registry());
502
503        // Cache the component for future calls
504        // DashMap handles race conditions internally - if another thread
505        // inserted the same key concurrently, we just use our created component
506        self.component_cache.insert(name, component.clone());
507
508        Ok(component)
509    }
510
511    /// Create a [`Namespace`] in the parent namespace
512    pub fn namespace(&self, name: impl Into<String>) -> anyhow::Result<Namespace> {
513        let child = NamespaceBuilder::default()
514            .runtime(self.runtime.clone())
515            .name(name.into())
516            .parent(Some(Arc::new(self.clone())))
517            .build()?;
518        // Attach child namespace registry so scrapes traverse separate registries (avoids collisions).
519        self.get_metrics_registry()
520            .add_child_registry(child.get_metrics_registry());
521        Ok(child)
522    }
523
524    pub fn name(&self) -> String {
525        match &self.parent {
526            Some(parent) => format!("{}.{}", parent.name(), self.name),
527            None => self.name.clone(),
528        }
529    }
530}
531
532// Custom validator function
533fn validate_allowed_chars(input: &str) -> Result<(), ValidationError> {
534    // Define the allowed character set using a regex
535    let regex = regex::Regex::new(r"^[a-z0-9-_]+$").unwrap();
536
537    if regex.is_match(input) {
538        Ok(())
539    } else {
540        Err(ValidationError::new("invalid_characters"))
541    }
542}