dynamo_runtime/
component.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! The [Component] module defines the top-level API for building distributed applications.
5//!
//! A distributed application consists of a set of [Component]s, each of which can host one
//! or more [Endpoint]s. Each [Endpoint] is a network-accessible service
//! that can be accessed by other [Component]s in the distributed application.
9//!
10//! A [Component] is made discoverable by registering it with the distributed runtime under
11//! a [`Namespace`].
12//!
//! A [`Namespace`] is a logical grouping of [Component]s.
14//!
15//! We might extend namespace to include grouping behavior, which would define groups of
16//! components that are tightly coupled.
17//!
18//! A [Component] is the core building block of a distributed application. It is a logical
19//! unit of work such as a `Preprocessor` or `SmartRouter` that has a well-defined role in the
20//! distributed application.
21//!
22//! A [Component] can present to the distributed application one or more configuration files
23//! which define how that component was constructed/configured and what capabilities it can
24//! provide.
25//!
//! Other [Component]s can write to watched locations under a [Component]'s etcd
//! path. This allows the [Component] to take dynamic actions when those watches
//! trigger.
29//!
30//! TODO: Top-level Overview of Endpoints/Functions
31
32use crate::{
33    config::HealthStatus,
34    discovery::Lease,
35    metrics::{MetricsRegistry, prometheus_names},
36    service::ServiceSet,
37    transports::etcd::EtcdPath,
38};
39
40use super::{
41    DistributedRuntime, Result, Runtime, error,
42    traits::*,
43    transports::etcd::{COMPONENT_KEYWORD, ENDPOINT_KEYWORD},
44    transports::nats::Slug,
45    utils::Duration,
46};
47
48use crate::pipeline::network::{PushWorkHandler, ingress::push_endpoint::PushEndpoint};
49use crate::protocols::EndpointId;
50use crate::service::ComponentNatsServerPrometheusMetrics;
51use async_nats::{
52    rustls::quic,
53    service::{Service, ServiceExt},
54};
55use derive_builder::Builder;
56use derive_getters::Getters;
57use educe::Educe;
58use serde::{Deserialize, Serialize};
59use service::EndpointStatsHandler;
60use std::{collections::HashMap, hash::Hash, sync::Arc};
61use validator::{Validate, ValidationError};
62
mod client;
// This file is the `component` module and it declares a child module also named
// `component`, which trips clippy's `module_inception` lint; it is allowed here.
#[allow(clippy::module_inception)]
mod component;
mod endpoint;
mod namespace;
mod registry;
pub mod service;

pub use client::{Client, InstanceSource};

/// The root etcd path where each instance registers itself in etcd.
/// An instance is namespace+component+endpoint+lease_id and must be unique.
///
/// Instance keys are built from this root by [`Component::etcd_root`] and
/// [`Endpoint::etcd_path_with_lease_id`] as
/// `instances/{namespace}/{component}/{endpoint}:{lease_id:x}`. Static endpoints omit
/// the `:{lease_id:x}` suffix.
pub const INSTANCE_ROOT_PATH: &str = "instances";

/// The root etcd path where each namespace is registered in etcd.
///
/// [`Namespace::etcd_path`] prepends this prefix to the namespace's fully qualified name.
pub const ETCD_ROOT_PATH: &str = "dynamo://";
79
/// How an [`Instance`] can be reached.
///
/// Serialized by serde with `snake_case` variant names, so the single variant appears as
/// `nats_tcp` in the JSON stored for each instance.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum TransportType {
    /// NATS transport. NOTE(review): the `String` payload is presumably the NATS subject
    /// the instance listens on — confirm against the registration code.
    NatsTcp(String),
}
85
/// Mutable state shared behind a [`Registry`].
#[derive(Default)]
pub struct RegistryInner {
    // NATS services; presumably keyed by service name — TODO confirm in `registry.rs`.
    services: HashMap<String, Service>,
    // Per-key maps of endpoint stats handlers. Each inner map is guarded by a
    // synchronous `std::sync::Mutex`, unlike the async mutex around `RegistryInner`.
    stats_handlers: HashMap<String, Arc<std::sync::Mutex<HashMap<String, EndpointStatsHandler>>>>,
}

/// Cloneable handle to a shared [`RegistryInner`].
///
/// All clones point at the same state through an `Arc<tokio::sync::Mutex<_>>`.
#[derive(Clone)]
pub struct Registry {
    inner: Arc<tokio::sync::Mutex<RegistryInner>>,
}
96
/// A registered instance of an endpoint.
///
/// This is the JSON value stored under [`INSTANCE_ROOT_PATH`] in etcd. It is what
/// [`Component::list_instances`] deserializes. Field order determines the serialized
/// field order.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Instance {
    /// Component name (without the namespace).
    pub component: String,
    /// Endpoint name.
    pub endpoint: String,
    /// Fully qualified namespace name.
    pub namespace: String,
    /// Instance identifier, returned by [`Instance::id`].
    pub instance_id: i64,
    /// How to reach this instance.
    pub transport: TransportType,
}
105
106impl Instance {
107    pub fn id(&self) -> i64 {
108        self.instance_id
109    }
110}
111
/// A [Component] is a discoverable entity in the distributed runtime.
/// You can host [Endpoint] on a [Component] by first creating
/// a [Service] then adding one or more [Endpoint] to the [Service].
///
/// You can also issue a request to a [Component]'s [Endpoint] by creating a [Client].
///
/// Equality and hashing use only the namespace name, the component name and `is_static`.
/// The runtime handle and labels do not participate.
#[derive(Educe, Builder, Clone, Validate)]
#[educe(Debug)]
#[builder(pattern = "owned")]
pub struct Component {
    /// Handle to the distributed runtime.
    ///
    /// Set through [`ComponentBuilder::from_runtime`] (the setter is private) and omitted
    /// from `Debug` output.
    #[builder(private)]
    #[educe(Debug(ignore))]
    drt: Arc<DistributedRuntime>,

    /// Name of the component.
    ///
    /// The `#[validate]` rule restricts it to `[a-z0-9-_]+`. NOTE(review): that rule only
    /// runs when `.validate()` is called explicitly. The derived builder's `build()`
    /// does not invoke it.
    #[builder(setter(into))]
    #[validate(custom(function = "validate_allowed_chars"))]
    name: String,

    /// Additional labels for metrics
    #[builder(default = "Vec::new()")]
    labels: Vec<(String, String)>,

    /// The namespace containing this component.
    ///
    /// [`Namespace`] carries the same `#[validate]` rule on its own `name`.
    #[builder(setter(into))]
    namespace: Namespace,

    // A static component's endpoints cannot be discovered via etcd, they are
    // fixed at startup time. The flag is copied into every `Endpoint` created by
    // `Component::endpoint`.
    is_static: bool,
}
144
145impl Hash for Component {
146    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
147        self.namespace.name().hash(state);
148        self.name.hash(state);
149        self.is_static.hash(state);
150    }
151}
152
153impl PartialEq for Component {
154    fn eq(&self, other: &Self) -> bool {
155        self.namespace.name() == other.namespace.name()
156            && self.name == other.name
157            && self.is_static == other.is_static
158    }
159}
160
161impl Eq for Component {}
162
163impl std::fmt::Display for Component {
164    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
165        write!(f, "{}.{}", self.namespace.name(), self.name)
166    }
167}
168
169impl DistributedRuntimeProvider for Component {
170    fn drt(&self) -> &DistributedRuntime {
171        &self.drt
172    }
173}
174
175impl RuntimeProvider for Component {
176    fn rt(&self) -> &Runtime {
177        self.drt.rt()
178    }
179}
180
181impl MetricsRegistry for Component {
182    fn basename(&self) -> String {
183        self.name.clone()
184    }
185
186    fn parent_hierarchy(&self) -> Vec<String> {
187        [
188            self.namespace.parent_hierarchy(),
189            vec![self.namespace.basename()],
190        ]
191        .concat()
192    }
193}
194
195impl Component {
196    /// The component part of an instance path in etcd.
197    pub fn etcd_root(&self) -> String {
198        let ns = self.namespace.name();
199        let cp = &self.name;
200        format!("{INSTANCE_ROOT_PATH}/{ns}/{cp}")
201    }
202
203    pub fn service_name(&self) -> String {
204        let service_name = format!("{}_{}", self.namespace.name(), self.name);
205        Slug::slugify(&service_name).to_string()
206    }
207
208    pub fn path(&self) -> String {
209        format!("{}/{}", self.namespace.name(), self.name)
210    }
211
212    pub fn etcd_path(&self) -> EtcdPath {
213        EtcdPath::new_component(&self.namespace.name(), &self.name)
214            .expect("Component name and namespace should be valid")
215    }
216
217    pub fn namespace(&self) -> &Namespace {
218        &self.namespace
219    }
220
221    pub fn name(&self) -> String {
222        self.name.clone()
223    }
224
225    pub fn labels(&self) -> &[(String, String)] {
226        &self.labels
227    }
228
229    pub fn endpoint(&self, endpoint: impl Into<String>) -> Endpoint {
230        Endpoint {
231            component: self.clone(),
232            name: endpoint.into(),
233            is_static: self.is_static,
234            labels: Vec::new(),
235        }
236    }
237
238    pub async fn list_instances(&self) -> anyhow::Result<Vec<Instance>> {
239        let Some(etcd_client) = self.drt.etcd_client() else {
240            return Ok(vec![]);
241        };
242        let mut out = vec![];
243        // The extra slash is important to only list exact component matches, not substrings.
244        for kv in etcd_client
245            .kv_get_prefix(format!("{}/", self.etcd_root()))
246            .await?
247        {
248            let val = match serde_json::from_slice::<Instance>(kv.value()) {
249                Ok(val) => val,
250                Err(err) => {
251                    anyhow::bail!(
252                        "Error converting etcd response to Instance: {err}. {}",
253                        kv.value_str()?
254                    );
255                }
256            };
257            out.push(val);
258        }
259        Ok(out)
260    }
261
262    /// Scrape ServiceSet, which contains NATS stats as well as user defined stats
263    /// embedded in data field of ServiceInfo.
264    pub async fn scrape_stats(&self, timeout: Duration) -> Result<ServiceSet> {
265        // Debug: scraping stats for component
266        let service_name = self.service_name();
267        let service_client = self.drt().service_client();
268        service_client
269            .collect_services(&service_name, timeout)
270            .await
271    }
272
273    /// Add Prometheus metrics for this component's NATS service stats.
274    ///
275    /// Starts a background task that periodically requests service statistics from NATS
276    /// and updates the corresponding Prometheus metrics. The first scrape happens immediately,
277    /// then subsequent scrapes occur at a fixed interval of 9.8 seconds (MAX_WAIT_MS),
278    /// which should be near or smaller than typical Prometheus scraping intervals to ensure
279    /// metrics are fresh when Prometheus collects them.
280    pub fn start_scraping_nats_service_component_metrics(&self) -> Result<()> {
281        const MAX_WAIT_MS: std::time::Duration = std::time::Duration::from_millis(9800); // Should be <= Prometheus scrape interval
282
283        // If there is another component with the same service name, this will fail.
284        let component_metrics = ComponentNatsServerPrometheusMetrics::new(self)?;
285
286        let component_clone = self.clone();
287        let mut hierarchies = self.parent_hierarchy();
288        hierarchies.push(self.hierarchy());
289        debug_assert!(
290            hierarchies
291                .last()
292                .map(|x| x.as_str())
293                .unwrap_or_default()
294                .eq_ignore_ascii_case(&self.service_name())
295        ); // it happens that in component, hierarchy and service name are the same
296
297        // Start a background task that scrapes stats every 5 seconds
298        let m = component_metrics.clone();
299        let c = component_clone.clone();
300
301        // Use the DRT's runtime handle to spawn the background task.
302        // We cannot use regular `tokio::spawn` here because:
303        // 1. This method may be called from contexts without an active Tokio runtime
304        //    (e.g., tests that create a DRT in a blocking context)
305        // 2. Tests often create a temporary runtime just to build the DRT, then drop it
306        // 3. `tokio::spawn` requires being called from within a runtime context
307        // By using the DRT's own runtime handle, we ensure the task runs in the
308        // correct runtime that will persist for the lifetime of the component.
309        c.drt().runtime().secondary().spawn(async move {
310            let timeout = std::time::Duration::from_millis(500);
311            let mut interval = tokio::time::interval(MAX_WAIT_MS);
312            interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
313
314            loop {
315                match c.scrape_stats(timeout).await {
316                    Ok(service_set) => {
317                        m.update_from_service_set(&service_set);
318                    }
319                    Err(err) => {
320                        tracing::error!(
321                            "Background scrape failed for {}: {}",
322                            c.service_name(),
323                            err
324                        );
325                        m.reset_to_zeros();
326                    }
327                }
328
329                interval.tick().await;
330            }
331        });
332
333        Ok(())
334    }
335
336    /// TODO
337    ///
338    /// This method will scrape the stats for all available services
339    /// Returns a stream of `ServiceInfo` objects.
340    /// This should be consumed by a `[tokio::time::timeout_at`] because each services
341    /// will only respond once, but there is no way to know when all services have responded.
342    pub async fn stats_stream(&self) -> Result<()> {
343        unimplemented!("collect_stats")
344    }
345
346    pub fn service_builder(&self) -> service::ServiceConfigBuilder {
347        service::ServiceConfigBuilder::from_component(self.clone())
348    }
349}
350
351impl ComponentBuilder {
352    pub fn from_runtime(drt: Arc<DistributedRuntime>) -> Self {
353        Self::default().drt(drt)
354    }
355}
356
/// A named endpoint hosted on a [`Component`].
///
/// Created with [`Component::endpoint`]. Equality and hashing use the component, the name
/// and `is_static`; labels do not participate.
#[derive(Debug, Clone)]
pub struct Endpoint {
    /// The component that owns this endpoint.
    component: Component,

    // todo - restrict alphabet
    /// Endpoint name
    name: String,

    /// Copied from the owning component. For static endpoints the etcd path, instance
    /// name and NATS subject omit the lease id, and [`Endpoint::client`] builds a static
    /// client.
    is_static: bool,

    /// Additional labels for metrics
    labels: Vec<(String, String)>,
}
370
371impl Hash for Endpoint {
372    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
373        self.component.hash(state);
374        self.name.hash(state);
375        self.is_static.hash(state);
376    }
377}
378
379impl PartialEq for Endpoint {
380    fn eq(&self, other: &Self) -> bool {
381        self.component == other.component
382            && self.name == other.name
383            && self.is_static == other.is_static
384    }
385}
386
387impl Eq for Endpoint {}
388
389impl DistributedRuntimeProvider for Endpoint {
390    fn drt(&self) -> &DistributedRuntime {
391        self.component.drt()
392    }
393}
394
395impl RuntimeProvider for Endpoint {
396    fn rt(&self) -> &Runtime {
397        self.component.rt()
398    }
399}
400
401impl MetricsRegistry for Endpoint {
402    fn basename(&self) -> String {
403        self.name.clone()
404    }
405
406    fn parent_hierarchy(&self) -> Vec<String> {
407        [
408            self.component.parent_hierarchy(),
409            vec![self.component.basename()],
410        ]
411        .concat()
412    }
413}
414
415impl Endpoint {
416    pub fn id(&self) -> EndpointId {
417        EndpointId {
418            namespace: self.component.namespace().name().to_string(),
419            component: self.component.name().to_string(),
420            name: self.name().to_string(),
421        }
422    }
423
424    pub fn name(&self) -> &str {
425        &self.name
426    }
427
428    pub fn component(&self) -> &Component {
429        &self.component
430    }
431
432    // todo(ryan): deprecate this as we move to Discovery traits and Component Identifiers
433    pub fn path(&self) -> String {
434        format!(
435            "{}/{}/{}",
436            self.component.path(),
437            ENDPOINT_KEYWORD,
438            self.name
439        )
440    }
441
442    /// The endpoint part of an instance path in etcd
443    pub fn etcd_root(&self) -> String {
444        let component_path = self.component.etcd_root();
445        let endpoint_name = &self.name;
446        format!("{component_path}/{endpoint_name}")
447    }
448
449    /// The endpoint as an EtcdPath object
450    pub fn etcd_path(&self) -> EtcdPath {
451        EtcdPath::new_endpoint(
452            &self.component.namespace().name(),
453            &self.component.name(),
454            &self.name,
455        )
456        .expect("Endpoint name and component name should be valid")
457    }
458
459    /// The fully path of an instance in etcd
460    pub fn etcd_path_with_lease_id(&self, lease_id: i64) -> String {
461        let endpoint_root = self.etcd_root();
462        if self.is_static {
463            endpoint_root
464        } else {
465            format!("{endpoint_root}:{lease_id:x}")
466        }
467    }
468
469    /// The endpoint as an EtcdPath object with lease ID
470    pub fn etcd_path_object_with_lease_id(&self, lease_id: i64) -> EtcdPath {
471        if self.is_static {
472            self.etcd_path()
473        } else {
474            EtcdPath::new_endpoint_with_lease(
475                &self.component.namespace().name(),
476                &self.component.name(),
477                &self.name,
478                lease_id,
479            )
480            .expect("Endpoint name and component name should be valid")
481        }
482    }
483
484    pub fn name_with_id(&self, lease_id: i64) -> String {
485        if self.is_static {
486            self.name.clone()
487        } else {
488            format!("{}-{:x}", self.name, lease_id)
489        }
490    }
491
492    pub fn subject(&self) -> String {
493        format!("{}.{}", self.component.service_name(), self.name)
494    }
495
496    /// Subject to an instance of the [Endpoint] with a specific lease id
497    pub fn subject_to(&self, lease_id: i64) -> String {
498        format!(
499            "{}.{}",
500            self.component.service_name(),
501            self.name_with_id(lease_id)
502        )
503    }
504
505    pub async fn client(&self) -> Result<client::Client> {
506        if self.is_static {
507            client::Client::new_static(self.clone()).await
508        } else {
509            client::Client::new_dynamic(self.clone()).await
510        }
511    }
512
513    pub fn endpoint_builder(&self) -> endpoint::EndpointConfigBuilder {
514        endpoint::EndpointConfigBuilder::from_endpoint(self.clone())
515    }
516}
517
/// A logical grouping of [`Component`]s, optionally nested under a parent namespace.
#[derive(Builder, Clone, Validate)]
#[builder(pattern = "owned")]
pub struct Namespace {
    /// Handle to the distributed runtime (private builder setter).
    #[builder(private)]
    runtime: Arc<DistributedRuntime>,

    /// This namespace's own segment name; [`Namespace::name`] returns the fully qualified
    /// name. NOTE(review): the `#[validate]` rule runs only on an explicit `.validate()`
    /// call, not from the builder's `build()`.
    #[validate(custom(function = "validate_allowed_chars"))]
    name: String,

    /// Propagated to child namespaces and components created from this namespace.
    is_static: bool,

    /// Enclosing namespace, if nested. [`Namespace::name`] joins ancestor names with `.`.
    #[builder(default = "None")]
    parent: Option<Arc<Namespace>>,

    /// Additional labels for metrics
    #[builder(default = "Vec::new()")]
    labels: Vec<(String, String)>,
}
536
537impl DistributedRuntimeProvider for Namespace {
538    fn drt(&self) -> &DistributedRuntime {
539        &self.runtime
540    }
541}
542
543impl std::fmt::Debug for Namespace {
544    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
545        write!(
546            f,
547            "Namespace {{ name: {}; is_static: {}; parent: {:?} }}",
548            self.name, self.is_static, self.parent
549        )
550    }
551}
552
553impl RuntimeProvider for Namespace {
554    fn rt(&self) -> &Runtime {
555        self.runtime.rt()
556    }
557}
558
559impl std::fmt::Display for Namespace {
560    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
561        write!(f, "{}", self.name)
562    }
563}
564
565impl Namespace {
566    pub(crate) fn new(runtime: DistributedRuntime, name: String, is_static: bool) -> Result<Self> {
567        Ok(NamespaceBuilder::default()
568            .runtime(Arc::new(runtime))
569            .name(name)
570            .is_static(is_static)
571            .build()?)
572    }
573
574    /// Create a [`Component`] in the namespace who's endpoints can be discovered with etcd
575    pub fn component(&self, name: impl Into<String>) -> Result<Component> {
576        Ok(ComponentBuilder::from_runtime(self.runtime.clone())
577            .name(name)
578            .namespace(self.clone())
579            .is_static(self.is_static)
580            .build()?)
581    }
582
583    /// Create a [`Namespace`] in the parent namespace
584    pub fn namespace(&self, name: impl Into<String>) -> Result<Namespace> {
585        Ok(NamespaceBuilder::default()
586            .runtime(self.runtime.clone())
587            .name(name.into())
588            .is_static(self.is_static)
589            .parent(Some(Arc::new(self.clone())))
590            .build()?)
591    }
592
593    pub fn etcd_path(&self) -> String {
594        format!("{}{}", ETCD_ROOT_PATH, self.name())
595    }
596
597    pub fn name(&self) -> String {
598        match &self.parent {
599            Some(parent) => format!("{}.{}", parent.name(), self.name),
600            None => self.name.clone(),
601        }
602    }
603}
604
605// Custom validator function
606fn validate_allowed_chars(input: &str) -> Result<(), ValidationError> {
607    // Define the allowed character set using a regex
608    let regex = regex::Regex::new(r"^[a-z0-9-_]+$").unwrap();
609
610    if regex.is_match(input) {
611        Ok(())
612    } else {
613        Err(ValidationError::new("invalid_characters"))
614    }
615}
616
617// TODO - enable restrictions to the character sets allowed for namespaces,
618// components, and endpoints.
619//
620// Put Validate traits on the struct and use the `validate_allowed_chars` method
621// to validate the fields.
622
623// #[cfg(test)]
624// mod tests {
625//     use super::*;
626//     use validator::Validate;
627
628//     #[test]
629//     fn test_valid_names() {
630//         // Valid strings
631//         let valid_inputs = vec![
632//             "abc",        // Lowercase letters
633//             "abc123",     // Letters and numbers
634//             "a-b-c",      // Letters with hyphens
635//             "a_b_c",      // Letters with underscores
636//             "a-b_c-123",  // Mixed valid characters
637//             "a",          // Single character
638//             "a_b",        // Short valid pattern
639//             "123456",     // Only numbers
640//             "a---b_c123", // Repeated hyphens/underscores
641//         ];
642
643//         for input in valid_inputs {
644//             let result = validate_allowed_chars(input);
645//             assert!(result.is_ok(), "Expected '{}' to be valid", input);
646//         }
647//     }
648
649//     #[test]
650//     fn test_invalid_names() {
651//         // Invalid strings
652//         let invalid_inputs = vec![
653//             "abc!",     // Invalid character `!`
654//             "abc@",     // Invalid character `@`
655//             "123$",     // Invalid character `$`
656//             "foo.bar",  // Invalid character `.`
657//             "foo/bar",  // Invalid character `/`
658//             "foo\\bar", // Invalid character `\`
659//             "abc#",     // Invalid character `#`
660//             "abc def",  // Spaces are not allowed
661//             "foo,",     // Invalid character `,`
662//             "",         // Empty string
663//         ];
664
665//         for input in invalid_inputs {
666//             let result = validate_allowed_chars(input);
667//             assert!(result.is_err(), "Expected '{}' to be invalid", input);
668//         }
669//     }
670
671//     // #[test]
672//     // fn test_struct_validation_valid() {
673//     //     // Struct with valid data
674//     //     let valid_data = InputData {
675//     //         name: "valid-name_123".to_string(),
676//     //     };
677//     //     assert!(valid_data.validate().is_ok());
678//     // }
679
680//     // #[test]
681//     // fn test_struct_validation_invalid() {
682//     //     // Struct with invalid data
683//     //     let invalid_data = InputData {
684//     //         name: "invalid!name".to_string(),
685//     //     };
686//     //     let result = invalid_data.validate();
687//     //     assert!(result.is_err());
688
689//     //     if let Err(errors) = result {
690//     //         let error_map = errors.field_errors();
691//     //         assert!(error_map.contains_key("name"));
692//     //         let name_errors = &error_map["name"];
693//     //         assert_eq!(name_errors[0].code, "invalid_characters");
694//     //     }
695//     // }
696
697//     #[test]
698//     fn test_edge_cases() {
699//         // Edge cases
700//         let edge_inputs = vec![
701//             ("-", true),   // Single hyphen
702//             ("_", true),   // Single underscore
703//             ("a-", true),  // Letter with hyphen
704//             ("-", false),  // Repeated hyphens
705//             ("-a", false), // Hyphen at the beginning
706//             ("a-", false), // Hyphen at the end
707//         ];
708
709//         for (input, expected_validity) in edge_inputs {
710//             let result = validate_allowed_chars(input);
711//             if expected_validity {
712//                 assert!(result.is_ok(), "Expected '{}' to be valid", input);
713//             } else {
714//                 assert!(result.is_err(), "Expected '{}' to be invalid", input);
715//             }
716//         }
717//     }
718// }