// dynamo_runtime/component.rs

// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

//! The [Component] module defines the top-level API for building distributed applications.
//!
//! A distributed application consists of a set of [Component]s, each of which can host one
//! or more [Endpoint]s. Each [Endpoint] is a network-accessible service that other
//! [Component]s in the distributed application can call.
//!
//! A [Component] is made discoverable by registering it with the distributed runtime under
//! a [`Namespace`].
//!
//! A [`Namespace`] is a logical grouping of [Component]s.
//!
//! We might extend namespaces to include grouping behavior, which would define groups of
//! components that are tightly coupled.
//!
//! A [Component] is the core building block of a distributed application. It is a logical
//! unit of work, such as a `Preprocessor` or `SmartRouter`, that has a well-defined role in
//! the distributed application.
//!
//! A [Component] can present to the distributed application one or more configuration files
//! which define how that component was constructed/configured and what capabilities it can
//! provide.
//!
//! Other [Component]s can write to watched locations within a [Component]'s etcd path.
//! This allows the [Component] to take dynamic actions depending on the watch triggers.
//!
//! TODO: Top-level Overview of Endpoints/Functions

32use crate::{
33    config::HealthStatus,
34    discovery::Lease,
35    metrics::{prometheus_names, MetricsRegistry},
36    service::ServiceSet,
37    transports::etcd::EtcdPath,
38};
39
40use super::{
41    error,
42    traits::*,
43    transports::etcd::{COMPONENT_KEYWORD, ENDPOINT_KEYWORD},
44    transports::nats::Slug,
45    utils::Duration,
46    DistributedRuntime, Result, Runtime,
47};
48
49use crate::pipeline::network::{ingress::push_endpoint::PushEndpoint, PushWorkHandler};
50use crate::protocols::Endpoint as EndpointId;
51use crate::service::ComponentNatsServerPrometheusMetrics;
52use async_nats::{
53    rustls::quic,
54    service::{Service, ServiceExt},
55};
56use derive_builder::Builder;
57use derive_getters::Getters;
58use educe::Educe;
59use serde::{Deserialize, Serialize};
60use service::EndpointStatsHandler;
61use std::{collections::HashMap, hash::Hash, sync::Arc};
62use validator::{Validate, ValidationError};
63
64mod client;
65#[allow(clippy::module_inception)]
66mod component;
67mod endpoint;
68mod namespace;
69mod registry;
70pub mod service;
71
72pub use client::{Client, InstanceSource};
73
/// Root etcd key prefix under which every instance registers itself.
///
/// Instance keys live below `instances/{namespace}/{component}/...`; an instance is
/// identified by namespace + component + endpoint + lease id and must be unique.
pub const INSTANCE_ROOT_PATH: &str = "instances";

/// URI-style prefix that is prepended to a namespace's fully qualified name to form
/// that namespace's etcd path.
pub const ETCD_ROOT_PATH: &str = "dynamo://";

81#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
82#[serde(rename_all = "snake_case")]
83pub enum TransportType {
84    NatsTcp(String),
85}
86
87#[derive(Default)]
88pub struct RegistryInner {
89    services: HashMap<String, Service>,
90    stats_handlers: HashMap<String, Arc<std::sync::Mutex<HashMap<String, EndpointStatsHandler>>>>,
91}
92
93#[derive(Clone)]
94pub struct Registry {
95    inner: Arc<tokio::sync::Mutex<RegistryInner>>,
96}
97
98#[derive(Debug, Clone, Serialize, Deserialize)]
99pub struct Instance {
100    pub component: String,
101    pub endpoint: String,
102    pub namespace: String,
103    pub instance_id: i64,
104    pub transport: TransportType,
105}
106
107impl Instance {
108    pub fn id(&self) -> i64 {
109        self.instance_id
110    }
111}
112
113/// A [Component] a discoverable entity in the distributed runtime.
114/// You can host [Endpoint] on a [Component] by first creating
115/// a [Service] then adding one or more [Endpoint] to the [Service].
116///
117/// You can also issue a request to a [Component]'s [Endpoint] by creating a [Client].
118#[derive(Educe, Builder, Clone, Validate)]
119#[educe(Debug)]
120#[builder(pattern = "owned")]
121pub struct Component {
122    #[builder(private)]
123    #[educe(Debug(ignore))]
124    drt: Arc<DistributedRuntime>,
125
126    // todo - restrict the namespace to a-z0-9-_A-Z
127    /// Name of the component
128    #[builder(setter(into))]
129    #[validate(custom(function = "validate_allowed_chars"))]
130    name: String,
131
132    /// Additional labels for metrics
133    #[builder(default = "Vec::new()")]
134    labels: Vec<(String, String)>,
135
136    // todo - restrict the namespace to a-z0-9-_A-Z
137    /// Namespace
138    #[builder(setter(into))]
139    namespace: Namespace,
140
141    // A static component's endpoints cannot be discovered via etcd, they are
142    // fixed at startup time.
143    is_static: bool,
144}
145
146impl Hash for Component {
147    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
148        self.namespace.name().hash(state);
149        self.name.hash(state);
150        self.is_static.hash(state);
151    }
152}
153
154impl PartialEq for Component {
155    fn eq(&self, other: &Self) -> bool {
156        self.namespace.name() == other.namespace.name()
157            && self.name == other.name
158            && self.is_static == other.is_static
159    }
160}
161
162impl Eq for Component {}
163
164impl std::fmt::Display for Component {
165    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
166        write!(f, "{}.{}", self.namespace.name(), self.name)
167    }
168}
169
170impl DistributedRuntimeProvider for Component {
171    fn drt(&self) -> &DistributedRuntime {
172        &self.drt
173    }
174}
175
176impl RuntimeProvider for Component {
177    fn rt(&self) -> &Runtime {
178        self.drt.rt()
179    }
180}
181
182impl MetricsRegistry for Component {
183    fn basename(&self) -> String {
184        self.name.clone()
185    }
186
187    fn parent_hierarchy(&self) -> Vec<String> {
188        [
189            self.namespace.parent_hierarchy(),
190            vec![self.namespace.basename()],
191        ]
192        .concat()
193    }
194}
195
196impl Component {
197    /// The component part of an instance path in etcd.
198    pub fn etcd_root(&self) -> String {
199        let ns = self.namespace.name();
200        let cp = &self.name;
201        format!("{INSTANCE_ROOT_PATH}/{ns}/{cp}")
202    }
203
204    pub fn service_name(&self) -> String {
205        let service_name = format!("{}_{}", self.namespace.name(), self.name);
206        Slug::slugify(&service_name).to_string()
207    }
208
209    pub fn path(&self) -> String {
210        format!("{}/{}", self.namespace.name(), self.name)
211    }
212
213    pub fn etcd_path(&self) -> EtcdPath {
214        EtcdPath::new_component(&self.namespace.name(), &self.name)
215            .expect("Component name and namespace should be valid")
216    }
217
218    pub fn namespace(&self) -> &Namespace {
219        &self.namespace
220    }
221
222    pub fn name(&self) -> String {
223        self.name.clone()
224    }
225
226    pub fn labels(&self) -> &[(String, String)] {
227        &self.labels
228    }
229
230    pub fn endpoint(&self, endpoint: impl Into<String>) -> Endpoint {
231        Endpoint {
232            component: self.clone(),
233            name: endpoint.into(),
234            is_static: self.is_static,
235            labels: Vec::new(),
236        }
237    }
238
239    pub async fn list_instances(&self) -> anyhow::Result<Vec<Instance>> {
240        let Some(etcd_client) = self.drt.etcd_client() else {
241            return Ok(vec![]);
242        };
243        let mut out = vec![];
244        // The extra slash is important to only list exact component matches, not substrings.
245        for kv in etcd_client
246            .kv_get_prefix(format!("{}/", self.etcd_root()))
247            .await?
248        {
249            let val = match serde_json::from_slice::<Instance>(kv.value()) {
250                Ok(val) => val,
251                Err(err) => {
252                    anyhow::bail!(
253                        "Error converting etcd response to Instance: {err}. {}",
254                        kv.value_str()?
255                    );
256                }
257            };
258            out.push(val);
259        }
260        Ok(out)
261    }
262
263    /// Scrape ServiceSet, which contains NATS stats as well as user defined stats
264    /// embedded in data field of ServiceInfo.
265    pub async fn scrape_stats(&self, timeout: Duration) -> Result<ServiceSet> {
266        // Debug: scraping stats for component
267        let service_name = self.service_name();
268        let service_client = self.drt().service_client();
269        service_client
270            .collect_services(&service_name, timeout)
271            .await
272    }
273
274    /// Add Prometheus metrics for this component's NATS service stats.
275    ///
276    /// Starts a background task that periodically requests service statistics from NATS
277    /// and updates the corresponding Prometheus metrics. The scraping interval is set to
278    /// approximately 873ms (MAX_DELAY_MS), which is arbitrary but any value less than a second
279    /// is fair game. This frequent scraping provides real-time service statistics updates.
280    pub fn start_scraping_nats_service_component_metrics(&self) -> Result<()> {
281        const NATS_TIMEOUT_AND_INITIAL_DELAY_MS: std::time::Duration =
282            std::time::Duration::from_millis(300);
283        const MAX_DELAY_MS: std::time::Duration = std::time::Duration::from_millis(873);
284
285        // If there is another component with the same service name, this will fail.
286        let component_metrics = ComponentNatsServerPrometheusMetrics::new(self)?;
287
288        let component_clone = self.clone();
289        let mut hierarchies = self.parent_hierarchy();
290        hierarchies.push(self.hierarchy());
291        debug_assert!(hierarchies
292            .last()
293            .map(|x| x.as_str())
294            .unwrap_or_default()
295            .eq_ignore_ascii_case(&self.service_name())); // it happens that in component, hierarchy and service name are the same
296
297        // Start a background task that scrapes stats every 5 seconds
298        let m = component_metrics.clone();
299        let c = component_clone.clone();
300
301        // Use the DRT's runtime handle to spawn the background task.
302        // We cannot use regular `tokio::spawn` here because:
303        // 1. This method may be called from contexts without an active Tokio runtime
304        //    (e.g., tests that create a DRT in a blocking context)
305        // 2. Tests often create a temporary runtime just to build the DRT, then drop it
306        // 3. `tokio::spawn` requires being called from within a runtime context
307        // By using the DRT's own runtime handle, we ensure the task runs in the
308        // correct runtime that will persist for the lifetime of the component.
309        c.drt().runtime().secondary().spawn(async move {
310            let timeout = NATS_TIMEOUT_AND_INITIAL_DELAY_MS;
311            let mut interval = tokio::time::interval(MAX_DELAY_MS);
312            interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
313
314            loop {
315                match c.scrape_stats(timeout).await {
316                    Ok(service_set) => {
317                        m.update_from_service_set(&service_set);
318                    }
319                    Err(err) => {
320                        tracing::error!(
321                            "Background scrape failed for {}: {}",
322                            c.service_name(),
323                            err
324                        );
325                        m.reset_to_zeros();
326                    }
327                }
328                interval.tick().await;
329            }
330        });
331
332        Ok(())
333    }
334
335    /// TODO
336    ///
337    /// This method will scrape the stats for all available services
338    /// Returns a stream of `ServiceInfo` objects.
339    /// This should be consumed by a `[tokio::time::timeout_at`] because each services
340    /// will only respond once, but there is no way to know when all services have responded.
341    pub async fn stats_stream(&self) -> Result<()> {
342        unimplemented!("collect_stats")
343    }
344
345    pub fn service_builder(&self) -> service::ServiceConfigBuilder {
346        service::ServiceConfigBuilder::from_component(self.clone())
347    }
348}
349
350impl ComponentBuilder {
351    pub fn from_runtime(drt: Arc<DistributedRuntime>) -> Self {
352        Self::default().drt(drt)
353    }
354}
355
356#[derive(Debug, Clone)]
357pub struct Endpoint {
358    component: Component,
359
360    // todo - restrict alphabet
361    /// Endpoint name
362    name: String,
363
364    is_static: bool,
365
366    /// Additional labels for metrics
367    labels: Vec<(String, String)>,
368}
369
370impl Hash for Endpoint {
371    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
372        self.component.hash(state);
373        self.name.hash(state);
374        self.is_static.hash(state);
375    }
376}
377
378impl PartialEq for Endpoint {
379    fn eq(&self, other: &Self) -> bool {
380        self.component == other.component
381            && self.name == other.name
382            && self.is_static == other.is_static
383    }
384}
385
386impl Eq for Endpoint {}
387
388impl DistributedRuntimeProvider for Endpoint {
389    fn drt(&self) -> &DistributedRuntime {
390        self.component.drt()
391    }
392}
393
394impl RuntimeProvider for Endpoint {
395    fn rt(&self) -> &Runtime {
396        self.component.rt()
397    }
398}
399
400impl MetricsRegistry for Endpoint {
401    fn basename(&self) -> String {
402        self.name.clone()
403    }
404
405    fn parent_hierarchy(&self) -> Vec<String> {
406        [
407            self.component.parent_hierarchy(),
408            vec![self.component.basename()],
409        ]
410        .concat()
411    }
412}
413
414impl Endpoint {
415    pub fn id(&self) -> EndpointId {
416        EndpointId {
417            namespace: self.component.namespace().name().to_string(),
418            component: self.component.name().to_string(),
419            name: self.name().to_string(),
420        }
421    }
422
423    pub fn name(&self) -> &str {
424        &self.name
425    }
426
427    pub fn component(&self) -> &Component {
428        &self.component
429    }
430
431    // todo(ryan): deprecate this as we move to Discovery traits and Component Identifiers
432    pub fn path(&self) -> String {
433        format!(
434            "{}/{}/{}",
435            self.component.path(),
436            ENDPOINT_KEYWORD,
437            self.name
438        )
439    }
440
441    /// The endpoint part of an instance path in etcd
442    pub fn etcd_root(&self) -> String {
443        let component_path = self.component.etcd_root();
444        let endpoint_name = &self.name;
445        format!("{component_path}/{endpoint_name}")
446    }
447
448    /// The endpoint as an EtcdPath object
449    pub fn etcd_path(&self) -> EtcdPath {
450        EtcdPath::new_endpoint(
451            &self.component.namespace().name(),
452            &self.component.name(),
453            &self.name,
454        )
455        .expect("Endpoint name and component name should be valid")
456    }
457
458    /// The fully path of an instance in etcd
459    pub fn etcd_path_with_lease_id(&self, lease_id: i64) -> String {
460        let endpoint_root = self.etcd_root();
461        if self.is_static {
462            endpoint_root
463        } else {
464            format!("{endpoint_root}:{lease_id:x}")
465        }
466    }
467
468    /// The endpoint as an EtcdPath object with lease ID
469    pub fn etcd_path_object_with_lease_id(&self, lease_id: i64) -> EtcdPath {
470        if self.is_static {
471            self.etcd_path()
472        } else {
473            EtcdPath::new_endpoint_with_lease(
474                &self.component.namespace().name(),
475                &self.component.name(),
476                &self.name,
477                lease_id,
478            )
479            .expect("Endpoint name and component name should be valid")
480        }
481    }
482
483    pub fn name_with_id(&self, lease_id: i64) -> String {
484        if self.is_static {
485            self.name.clone()
486        } else {
487            format!("{}-{:x}", self.name, lease_id)
488        }
489    }
490
491    pub fn subject(&self) -> String {
492        format!("{}.{}", self.component.service_name(), self.name)
493    }
494
495    /// Subject to an instance of the [Endpoint] with a specific lease id
496    pub fn subject_to(&self, lease_id: i64) -> String {
497        format!(
498            "{}.{}",
499            self.component.service_name(),
500            self.name_with_id(lease_id)
501        )
502    }
503
504    pub async fn client(&self) -> Result<client::Client> {
505        if self.is_static {
506            client::Client::new_static(self.clone()).await
507        } else {
508            client::Client::new_dynamic(self.clone()).await
509        }
510    }
511
512    pub fn endpoint_builder(&self) -> endpoint::EndpointConfigBuilder {
513        endpoint::EndpointConfigBuilder::from_endpoint(self.clone())
514    }
515}
516
517#[derive(Builder, Clone, Validate)]
518#[builder(pattern = "owned")]
519pub struct Namespace {
520    #[builder(private)]
521    runtime: Arc<DistributedRuntime>,
522
523    #[validate(custom(function = "validate_allowed_chars"))]
524    name: String,
525
526    is_static: bool,
527
528    #[builder(default = "None")]
529    parent: Option<Arc<Namespace>>,
530
531    /// Additional labels for metrics
532    #[builder(default = "Vec::new()")]
533    labels: Vec<(String, String)>,
534}
535
536impl DistributedRuntimeProvider for Namespace {
537    fn drt(&self) -> &DistributedRuntime {
538        &self.runtime
539    }
540}
541
542impl std::fmt::Debug for Namespace {
543    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
544        write!(
545            f,
546            "Namespace {{ name: {}; is_static: {}; parent: {:?} }}",
547            self.name, self.is_static, self.parent
548        )
549    }
550}
551
552impl RuntimeProvider for Namespace {
553    fn rt(&self) -> &Runtime {
554        self.runtime.rt()
555    }
556}
557
558impl std::fmt::Display for Namespace {
559    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
560        write!(f, "{}", self.name)
561    }
562}
563
564impl Namespace {
565    pub(crate) fn new(runtime: DistributedRuntime, name: String, is_static: bool) -> Result<Self> {
566        Ok(NamespaceBuilder::default()
567            .runtime(Arc::new(runtime))
568            .name(name)
569            .is_static(is_static)
570            .build()?)
571    }
572
573    /// Create a [`Component`] in the namespace who's endpoints can be discovered with etcd
574    pub fn component(&self, name: impl Into<String>) -> Result<Component> {
575        let component = ComponentBuilder::from_runtime(self.runtime.clone())
576            .name(name)
577            .namespace(self.clone())
578            .is_static(self.is_static)
579            .build()?;
580
581        // Register the metrics callback for this component.
582        // If registration fails, log a warning but do not propagate the error,
583        // as metrics are not mission critical and should not block component creation.
584        if let Err(err) = component.start_scraping_nats_service_component_metrics() {
585            let error_str = err.to_string();
586
587            // Check if this is a duplicate metrics registration (expected in some cases)
588            // or a different error (unexpected)
589            if error_str.contains("Duplicate metrics") {
590                // This is not a critical error because it's possible for multiple Components
591                // with the same service_name to register metrics callbacks.
592                tracing::debug!(
593                    "Duplicate metrics registration for component '{}' (expected when multiple components share the same service_name): {}",
594                    component.service_name(),
595                    error_str
596                );
597            } else {
598                // This is unexpected and should be more visible
599                tracing::warn!(
600                    "Failed to start scraping metrics for component '{}': {}",
601                    component.service_name(),
602                    err
603                );
604            }
605        }
606
607        Ok(component)
608    }
609
610    /// Create a [`Namespace`] in the parent namespace
611    pub fn namespace(&self, name: impl Into<String>) -> Result<Namespace> {
612        Ok(NamespaceBuilder::default()
613            .runtime(self.runtime.clone())
614            .name(name.into())
615            .is_static(self.is_static)
616            .parent(Some(Arc::new(self.clone())))
617            .build()?)
618    }
619
620    pub fn etcd_path(&self) -> String {
621        format!("{}{}", ETCD_ROOT_PATH, self.name())
622    }
623
624    pub fn name(&self) -> String {
625        match &self.parent {
626            Some(parent) => format!("{}.{}", parent.name(), self.name),
627            None => self.name.clone(),
628        }
629    }
630}
631
632// Custom validator function
633fn validate_allowed_chars(input: &str) -> Result<(), ValidationError> {
634    // Define the allowed character set using a regex
635    let regex = regex::Regex::new(r"^[a-z0-9-_]+$").unwrap();
636
637    if regex.is_match(input) {
638        Ok(())
639    } else {
640        Err(ValidationError::new("invalid_characters"))
641    }
642}
643
// TODO - enforce restrictions on the character sets allowed for namespaces,
// components, and endpoints.
//
// `Component` and `Namespace` already derive `Validate` and attach
// `validate_allowed_chars` to their `name` fields, but nothing in this file calls
// `validate()`, and `Endpoint` names are not checked at all.
649
650// #[cfg(test)]
651// mod tests {
652//     use super::*;
653//     use validator::Validate;
654
655//     #[test]
656//     fn test_valid_names() {
657//         // Valid strings
658//         let valid_inputs = vec![
659//             "abc",        // Lowercase letters
660//             "abc123",     // Letters and numbers
661//             "a-b-c",      // Letters with hyphens
662//             "a_b_c",      // Letters with underscores
663//             "a-b_c-123",  // Mixed valid characters
664//             "a",          // Single character
665//             "a_b",        // Short valid pattern
666//             "123456",     // Only numbers
667//             "a---b_c123", // Repeated hyphens/underscores
668//         ];
669
670//         for input in valid_inputs {
671//             let result = validate_allowed_chars(input);
672//             assert!(result.is_ok(), "Expected '{}' to be valid", input);
673//         }
674//     }
675
676//     #[test]
677//     fn test_invalid_names() {
678//         // Invalid strings
679//         let invalid_inputs = vec![
680//             "abc!",     // Invalid character `!`
681//             "abc@",     // Invalid character `@`
682//             "123$",     // Invalid character `$`
683//             "foo.bar",  // Invalid character `.`
684//             "foo/bar",  // Invalid character `/`
685//             "foo\\bar", // Invalid character `\`
686//             "abc#",     // Invalid character `#`
687//             "abc def",  // Spaces are not allowed
688//             "foo,",     // Invalid character `,`
689//             "",         // Empty string
690//         ];
691
692//         for input in invalid_inputs {
693//             let result = validate_allowed_chars(input);
694//             assert!(result.is_err(), "Expected '{}' to be invalid", input);
695//         }
696//     }
697
698//     // #[test]
699//     // fn test_struct_validation_valid() {
700//     //     // Struct with valid data
701//     //     let valid_data = InputData {
702//     //         name: "valid-name_123".to_string(),
703//     //     };
704//     //     assert!(valid_data.validate().is_ok());
705//     // }
706
707//     // #[test]
708//     // fn test_struct_validation_invalid() {
709//     //     // Struct with invalid data
710//     //     let invalid_data = InputData {
711//     //         name: "invalid!name".to_string(),
712//     //     };
713//     //     let result = invalid_data.validate();
714//     //     assert!(result.is_err());
715
716//     //     if let Err(errors) = result {
717//     //         let error_map = errors.field_errors();
718//     //         assert!(error_map.contains_key("name"));
719//     //         let name_errors = &error_map["name"];
720//     //         assert_eq!(name_errors[0].code, "invalid_characters");
721//     //     }
722//     // }
723
724//     #[test]
725//     fn test_edge_cases() {
726//         // Edge cases
727//         let edge_inputs = vec![
728//             ("-", true),   // Single hyphen
729//             ("_", true),   // Single underscore
730//             ("a-", true),  // Letter with hyphen
731//             ("-", false),  // Repeated hyphens
732//             ("-a", false), // Hyphen at the beginning
733//             ("a-", false), // Hyphen at the end
734//         ];
735
736//         for (input, expected_validity) in edge_inputs {
737//             let result = validate_allowed_chars(input);
738//             if expected_validity {
739//                 assert!(result.is_ok(), "Expected '{}' to be valid", input);
740//             } else {
741//                 assert!(result.is_err(), "Expected '{}' to be invalid", input);
742//             }
743//         }
744//     }
745// }