dynamo_runtime/
component.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! The [Component] module defines the top-level API for building distributed applications.
5//!
6//! A distributed application consists of a set of [Component] that can host one
7//! or more [Endpoint]. Each [Endpoint] is a network-accessible service
8//! that can be accessed by other [Component] in the distributed application.
9//!
10//! A [Component] is made discoverable by registering it with the distributed runtime under
11//! a [`Namespace`].
12//!
//! A [`Namespace`] is a logical grouping of related [Component]s.
14//!
15//! We might extend namespace to include grouping behavior, which would define groups of
16//! components that are tightly coupled.
17//!
18//! A [Component] is the core building block of a distributed application. It is a logical
19//! unit of work such as a `Preprocessor` or `SmartRouter` that has a well-defined role in the
20//! distributed application.
21//!
22//! A [Component] can present to the distributed application one or more configuration files
23//! which define how that component was constructed/configured and what capabilities it can
24//! provide.
25//!
//! Other [Component]s can write to watched locations within a [Component]'s etcd
//! path. This allows the [Component] to take dynamic actions when those watches
//! trigger.
29//!
30//! TODO: Top-level Overview of Endpoints/Functions
31
32use std::fmt;
33
34use crate::{
35    config::HealthStatus,
36    discovery::Lease,
37    metrics::{MetricsRegistry, prometheus_names},
38    service::ServiceSet,
39    transports::etcd::{ETCD_ROOT_PATH, EtcdPath},
40};
41
42use super::{
43    DistributedRuntime, Result, Runtime, error,
44    traits::*,
45    transports::etcd::{COMPONENT_KEYWORD, ENDPOINT_KEYWORD},
46    transports::nats::Slug,
47    utils::Duration,
48};
49
50use crate::pipeline::network::{PushWorkHandler, ingress::push_endpoint::PushEndpoint};
51use crate::protocols::EndpointId;
52use crate::service::ComponentNatsServerPrometheusMetrics;
53use async_nats::{
54    rustls::quic,
55    service::{Service, ServiceExt},
56};
57use derive_builder::Builder;
58use derive_getters::Getters;
59use educe::Educe;
60use serde::{Deserialize, Serialize};
61use service::EndpointStatsHandler;
62use std::{collections::HashMap, hash::Hash, sync::Arc};
63use validator::{Validate, ValidationError};
64
65mod client;
66#[allow(clippy::module_inception)]
67mod component;
68mod endpoint;
69mod namespace;
70mod registry;
71pub mod service;
72
73pub use client::{Client, InstanceSource};
74
/// Root key-value path under which every instance registers itself.
///
/// An instance is identified by namespace + component + endpoint + lease_id,
/// and that combination must be unique.
pub const INSTANCE_ROOT_PATH: &str = "v1/instances";
78
/// Transport over which an [`Instance`] is reached.
///
/// Variant names are serialized in `snake_case`.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum TransportType {
    /// NATS over TCP. The payload is presumably the NATS subject of the
    /// instance — TODO confirm against the code that constructs it.
    NatsTcp(String),
}
84
/// State held behind the lock of a [`Registry`].
#[derive(Default)]
pub struct RegistryInner {
    // NATS services; presumably keyed by service name (TODO confirm against the
    // code that populates this map).
    services: HashMap<String, Service>,
    // Endpoint stats handlers, grouped in per-key maps that are each shared via
    // `Arc` and guarded by their own `std::sync::Mutex`. Presumably outer key =
    // service name and inner key = endpoint name (TODO confirm).
    stats_handlers: HashMap<String, Arc<std::sync::Mutex<HashMap<String, EndpointStatsHandler>>>>,
}
90
/// Cloneable handle to a shared [`RegistryInner`].
///
/// Clones share the same inner state through an `Arc`; access goes through an
/// async `tokio::sync::Mutex`.
#[derive(Clone)]
pub struct Registry {
    inner: Arc<tokio::sync::Mutex<RegistryInner>>,
}
95
/// A single registered instance of an endpoint.
///
/// Instances are (de)serialized with serde; `Component::list_instances` decodes
/// them from JSON stored under the component's instance root.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct Instance {
    /// Name of the component hosting the endpoint.
    pub component: String,
    /// Name of the endpoint.
    pub endpoint: String,
    /// Name of the namespace the component belongs to.
    pub namespace: String,
    /// Numeric id of this instance; presumably the lease id it registered
    /// with (TODO confirm).
    pub instance_id: i64,
    /// How this instance is reached.
    pub transport: TransportType,
}
104
105impl Instance {
106    pub fn id(&self) -> i64 {
107        self.instance_id
108    }
109    pub fn endpoint_id(&self) -> EndpointId {
110        EndpointId {
111            namespace: self.namespace.clone(),
112            component: self.component.clone(),
113            name: self.endpoint.clone(),
114        }
115    }
116}
117
118impl fmt::Display for Instance {
119    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
120        write!(
121            f,
122            "{}/{}/{}/{}",
123            self.namespace, self.component, self.endpoint, self.instance_id
124        )
125    }
126}
127
128/// Sort by string name
129impl std::cmp::Ord for Instance {
130    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
131        self.to_string().cmp(&other.to_string())
132    }
133}
134
135impl PartialOrd for Instance {
136    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
137        // Since Ord is fully implemented, the comparison is always total.
138        Some(self.cmp(other))
139    }
140}
141
/// A [Component] is a discoverable entity in the distributed runtime.
/// You can host [Endpoint] on a [Component] by first creating
/// a [Service] then adding one or more [Endpoint] to the [Service].
///
/// You can also issue a request to a [Component]'s [Endpoint] by creating a [Client].
#[derive(Educe, Builder, Clone, Validate)]
#[educe(Debug)]
#[builder(pattern = "owned")]
pub struct Component {
    // Shared handle to the distributed runtime. Left out of the `Debug` output
    // and only settable through a private builder setter.
    #[builder(private)]
    #[educe(Debug(ignore))]
    drt: Arc<DistributedRuntime>,

    // todo - restrict the namespace to a-z0-9-_A-Z
    /// Name of the component
    #[builder(setter(into))]
    #[validate(custom(function = "validate_allowed_chars"))]
    name: String,

    /// Additional labels for metrics
    #[builder(default = "Vec::new()")]
    labels: Vec<(String, String)>,

    // todo - restrict the namespace to a-z0-9-_A-Z
    /// Namespace
    #[builder(setter(into))]
    namespace: Namespace,

    // A static component's endpoints cannot be discovered via etcd, they are
    // fixed at startup time.
    is_static: bool,
}
174
175impl Hash for Component {
176    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
177        self.namespace.name().hash(state);
178        self.name.hash(state);
179        self.is_static.hash(state);
180    }
181}
182
183impl PartialEq for Component {
184    fn eq(&self, other: &Self) -> bool {
185        self.namespace.name() == other.namespace.name()
186            && self.name == other.name
187            && self.is_static == other.is_static
188    }
189}
190
191impl Eq for Component {}
192
193impl std::fmt::Display for Component {
194    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
195        write!(f, "{}.{}", self.namespace.name(), self.name)
196    }
197}
198
199impl DistributedRuntimeProvider for Component {
200    fn drt(&self) -> &DistributedRuntime {
201        &self.drt
202    }
203}
204
205impl RuntimeProvider for Component {
206    fn rt(&self) -> &Runtime {
207        self.drt.rt()
208    }
209}
210
211impl MetricsRegistry for Component {
212    fn basename(&self) -> String {
213        self.name.clone()
214    }
215
216    fn parent_hierarchy(&self) -> Vec<String> {
217        [
218            self.namespace.parent_hierarchy(),
219            vec![self.namespace.basename()],
220        ]
221        .concat()
222    }
223}
224
225impl Component {
226    /// The component part of an instance path in key-value store.
227    pub fn instance_root(&self) -> String {
228        let ns = self.namespace.name();
229        let cp = &self.name;
230        format!("{INSTANCE_ROOT_PATH}/{ns}/{cp}")
231    }
232
233    pub fn service_name(&self) -> String {
234        let service_name = format!("{}_{}", self.namespace.name(), self.name);
235        Slug::slugify(&service_name).to_string()
236    }
237
238    pub fn path(&self) -> String {
239        format!("{}/{}", self.namespace.name(), self.name)
240    }
241
242    pub fn etcd_path(&self) -> EtcdPath {
243        EtcdPath::new_component(&self.namespace.name(), &self.name)
244            .expect("Component name and namespace should be valid")
245    }
246
247    pub fn namespace(&self) -> &Namespace {
248        &self.namespace
249    }
250
251    pub fn name(&self) -> &str {
252        &self.name
253    }
254
255    pub fn labels(&self) -> &[(String, String)] {
256        &self.labels
257    }
258
259    pub fn endpoint(&self, endpoint: impl Into<String>) -> Endpoint {
260        Endpoint {
261            component: self.clone(),
262            name: endpoint.into(),
263            is_static: self.is_static,
264            labels: Vec::new(),
265        }
266    }
267
268    pub async fn list_instances(&self) -> anyhow::Result<Vec<Instance>> {
269        let client = self.drt.store();
270        let Some(bucket) = client.get_bucket(&self.instance_root()).await? else {
271            return Ok(vec![]);
272        };
273        let entries = bucket.entries().await?;
274        let mut instances = Vec::with_capacity(entries.len());
275        for (name, bytes) in entries.into_iter() {
276            let val = match serde_json::from_slice::<Instance>(&bytes) {
277                Ok(val) => val,
278                Err(err) => {
279                    anyhow::bail!("Error converting storage response to Instance: {err}. {name}",);
280                }
281            };
282            instances.push(val);
283        }
284        instances.sort();
285        Ok(instances)
286    }
287
288    /// Scrape ServiceSet, which contains NATS stats as well as user defined stats
289    /// embedded in data field of ServiceInfo.
290    pub async fn scrape_stats(&self, timeout: Duration) -> Result<ServiceSet> {
291        // Debug: scraping stats for component
292        let service_name = self.service_name();
293        let service_client = self.drt().service_client();
294        service_client
295            .collect_services(&service_name, timeout)
296            .await
297    }
298
299    /// Add Prometheus metrics for this component's NATS service stats.
300    ///
301    /// Starts a background task that periodically requests service statistics from NATS
302    /// and updates the corresponding Prometheus metrics. The first scrape happens immediately,
303    /// then subsequent scrapes occur at a fixed interval of 9.8 seconds (MAX_WAIT_MS),
304    /// which should be near or smaller than typical Prometheus scraping intervals to ensure
305    /// metrics are fresh when Prometheus collects them.
306    pub fn start_scraping_nats_service_component_metrics(&self) -> Result<()> {
307        const MAX_WAIT_MS: std::time::Duration = std::time::Duration::from_millis(9800); // Should be <= Prometheus scrape interval
308
309        // If there is another component with the same service name, this will fail.
310        let component_metrics = ComponentNatsServerPrometheusMetrics::new(self)?;
311
312        let component_clone = self.clone();
313        let mut hierarchies = self.parent_hierarchy();
314        hierarchies.push(self.hierarchy());
315        debug_assert!(
316            hierarchies
317                .last()
318                .map(|x| x.as_str())
319                .unwrap_or_default()
320                .eq_ignore_ascii_case(&self.service_name())
321        ); // it happens that in component, hierarchy and service name are the same
322
323        // Start a background task that scrapes stats every 5 seconds
324        let m = component_metrics.clone();
325        let c = component_clone.clone();
326
327        // Use the DRT's runtime handle to spawn the background task.
328        // We cannot use regular `tokio::spawn` here because:
329        // 1. This method may be called from contexts without an active Tokio runtime
330        //    (e.g., tests that create a DRT in a blocking context)
331        // 2. Tests often create a temporary runtime just to build the DRT, then drop it
332        // 3. `tokio::spawn` requires being called from within a runtime context
333        // By using the DRT's own runtime handle, we ensure the task runs in the
334        // correct runtime that will persist for the lifetime of the component.
335        c.drt().runtime().secondary().spawn(async move {
336            let timeout = std::time::Duration::from_millis(500);
337            let mut interval = tokio::time::interval(MAX_WAIT_MS);
338            interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
339
340            loop {
341                match c.scrape_stats(timeout).await {
342                    Ok(service_set) => {
343                        m.update_from_service_set(&service_set);
344                    }
345                    Err(err) => {
346                        tracing::error!(
347                            "Background scrape failed for {}: {}",
348                            c.service_name(),
349                            err
350                        );
351                        m.reset_to_zeros();
352                    }
353                }
354
355                interval.tick().await;
356            }
357        });
358
359        Ok(())
360    }
361
362    /// TODO
363    ///
364    /// This method will scrape the stats for all available services
365    /// Returns a stream of `ServiceInfo` objects.
366    /// This should be consumed by a `[tokio::time::timeout_at`] because each services
367    /// will only respond once, but there is no way to know when all services have responded.
368    pub async fn stats_stream(&self) -> Result<()> {
369        unimplemented!("collect_stats")
370    }
371
372    pub fn service_builder(&self) -> service::ServiceConfigBuilder {
373        service::ServiceConfigBuilder::from_component(self.clone())
374    }
375}
376
377impl ComponentBuilder {
378    pub fn from_runtime(drt: Arc<DistributedRuntime>) -> Self {
379        Self::default().drt(drt)
380    }
381}
382
/// A named endpoint hosted on a [`Component`].
///
/// Created through `Component::endpoint`, which copies the component's
/// `is_static` flag and starts with an empty label list.
#[derive(Debug, Clone)]
pub struct Endpoint {
    // The component this endpoint belongs to (held by value, i.e. a clone).
    component: Component,

    // todo - restrict alphabet
    /// Endpoint name
    name: String,

    // When set, lease ids are left out of the endpoint's instance name and
    // etcd path object, and `client()` builds a static client.
    is_static: bool,

    /// Additional labels for metrics
    labels: Vec<(String, String)>,
}
396
397impl Hash for Endpoint {
398    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
399        self.component.hash(state);
400        self.name.hash(state);
401        self.is_static.hash(state);
402    }
403}
404
405impl PartialEq for Endpoint {
406    fn eq(&self, other: &Self) -> bool {
407        self.component == other.component
408            && self.name == other.name
409            && self.is_static == other.is_static
410    }
411}
412
413impl Eq for Endpoint {}
414
415impl DistributedRuntimeProvider for Endpoint {
416    fn drt(&self) -> &DistributedRuntime {
417        self.component.drt()
418    }
419}
420
421impl RuntimeProvider for Endpoint {
422    fn rt(&self) -> &Runtime {
423        self.component.rt()
424    }
425}
426
427impl MetricsRegistry for Endpoint {
428    fn basename(&self) -> String {
429        self.name.clone()
430    }
431
432    fn parent_hierarchy(&self) -> Vec<String> {
433        [
434            self.component.parent_hierarchy(),
435            vec![self.component.basename()],
436        ]
437        .concat()
438    }
439}
440
441impl Endpoint {
442    pub fn id(&self) -> EndpointId {
443        EndpointId {
444            namespace: self.component.namespace().name().to_string(),
445            component: self.component.name().to_string(),
446            name: self.name().to_string(),
447        }
448    }
449
450    pub fn name(&self) -> &str {
451        &self.name
452    }
453
454    pub fn component(&self) -> &Component {
455        &self.component
456    }
457
458    // todo(ryan): deprecate this as we move to Discovery traits and Component Identifiers
459    pub fn path(&self) -> String {
460        format!(
461            "{}/{}/{}",
462            self.component.path(),
463            ENDPOINT_KEYWORD,
464            self.name
465        )
466    }
467
468    /// The endpoint part of an instance path in etcd
469    pub fn etcd_root(&self) -> String {
470        let component_path = self.component.instance_root();
471        let endpoint_name = &self.name;
472        format!("{component_path}/{endpoint_name}")
473    }
474
475    /// The endpoint as an EtcdPath object
476    pub fn etcd_path(&self) -> EtcdPath {
477        EtcdPath::new_endpoint(
478            &self.component.namespace().name(),
479            self.component.name(),
480            &self.name,
481        )
482        .expect("Endpoint name and component name should be valid")
483    }
484
485    /// The fully path of an instance in etcd
486    pub fn etcd_path_with_lease_id(&self, lease_id: i64) -> String {
487        format!("{INSTANCE_ROOT_PATH}/{}", self.unique_path(lease_id))
488    }
489
490    /// Full path of this endpoint with forward slash separators, including lease id
491    pub fn unique_path(&self, lease_id: i64) -> String {
492        let ns = self.component.namespace().name();
493        let cp = self.component.name();
494        let ep = self.name();
495        format!("{ns}/{cp}/{ep}/{lease_id:x}")
496    }
497
498    /// The endpoint as an EtcdPath object with lease ID
499    pub fn etcd_path_object_with_lease_id(&self, lease_id: i64) -> EtcdPath {
500        if self.is_static {
501            self.etcd_path()
502        } else {
503            EtcdPath::new_endpoint_with_lease(
504                &self.component.namespace().name(),
505                self.component.name(),
506                &self.name,
507                lease_id,
508            )
509            .expect("Endpoint name and component name should be valid")
510        }
511    }
512
513    pub fn name_with_id(&self, lease_id: i64) -> String {
514        if self.is_static {
515            self.name.clone()
516        } else {
517            format!("{}-{:x}", self.name, lease_id)
518        }
519    }
520
521    pub fn subject(&self) -> String {
522        format!("{}.{}", self.component.service_name(), self.name)
523    }
524
525    /// Subject to an instance of the [Endpoint] with a specific lease id
526    pub fn subject_to(&self, lease_id: i64) -> String {
527        format!(
528            "{}.{}",
529            self.component.service_name(),
530            self.name_with_id(lease_id)
531        )
532    }
533
534    pub async fn client(&self) -> Result<client::Client> {
535        if self.is_static {
536            client::Client::new_static(self.clone()).await
537        } else {
538            client::Client::new_dynamic(self.clone()).await
539        }
540    }
541
542    pub fn endpoint_builder(&self) -> endpoint::EndpointConfigBuilder {
543        endpoint::EndpointConfigBuilder::from_endpoint(self.clone())
544    }
545}
546
/// A logical grouping of components, optionally nested under a parent
/// namespace.
#[derive(Builder, Clone, Validate)]
#[builder(pattern = "owned")]
pub struct Namespace {
    // Shared handle to the distributed runtime (private builder setter).
    #[builder(private)]
    runtime: Arc<DistributedRuntime>,

    // This namespace's own segment; `Namespace::name()` prefixes it with the
    // parent's name when a parent is set.
    #[validate(custom(function = "validate_allowed_chars"))]
    name: String,

    // Propagated to the components and child namespaces created from this
    // namespace.
    is_static: bool,

    // Enclosing namespace, if any; defaults to `None`.
    #[builder(default = "None")]
    parent: Option<Arc<Namespace>>,

    /// Additional labels for metrics
    #[builder(default = "Vec::new()")]
    labels: Vec<(String, String)>,
}
565
566impl DistributedRuntimeProvider for Namespace {
567    fn drt(&self) -> &DistributedRuntime {
568        &self.runtime
569    }
570}
571
572impl std::fmt::Debug for Namespace {
573    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
574        write!(
575            f,
576            "Namespace {{ name: {}; is_static: {}; parent: {:?} }}",
577            self.name, self.is_static, self.parent
578        )
579    }
580}
581
582impl RuntimeProvider for Namespace {
583    fn rt(&self) -> &Runtime {
584        self.runtime.rt()
585    }
586}
587
588impl std::fmt::Display for Namespace {
589    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
590        write!(f, "{}", self.name)
591    }
592}
593
594impl Namespace {
595    pub(crate) fn new(runtime: DistributedRuntime, name: String, is_static: bool) -> Result<Self> {
596        Ok(NamespaceBuilder::default()
597            .runtime(Arc::new(runtime))
598            .name(name)
599            .is_static(is_static)
600            .build()?)
601    }
602
603    /// Create a [`Component`] in the namespace who's endpoints can be discovered with etcd
604    pub fn component(&self, name: impl Into<String>) -> Result<Component> {
605        Ok(ComponentBuilder::from_runtime(self.runtime.clone())
606            .name(name)
607            .namespace(self.clone())
608            .is_static(self.is_static)
609            .build()?)
610    }
611
612    /// Create a [`Namespace`] in the parent namespace
613    pub fn namespace(&self, name: impl Into<String>) -> Result<Namespace> {
614        Ok(NamespaceBuilder::default()
615            .runtime(self.runtime.clone())
616            .name(name.into())
617            .is_static(self.is_static)
618            .parent(Some(Arc::new(self.clone())))
619            .build()?)
620    }
621
622    pub fn etcd_path(&self) -> String {
623        format!("{ETCD_ROOT_PATH}{}", self.name())
624    }
625
626    pub fn name(&self) -> String {
627        match &self.parent {
628            Some(parent) => format!("{}.{}", parent.name(), self.name),
629            None => self.name.clone(),
630        }
631    }
632}
633
634// Custom validator function
635fn validate_allowed_chars(input: &str) -> Result<(), ValidationError> {
636    // Define the allowed character set using a regex
637    let regex = regex::Regex::new(r"^[a-z0-9-_]+$").unwrap();
638
639    if regex.is_match(input) {
640        Ok(())
641    } else {
642        Err(ValidationError::new("invalid_characters"))
643    }
644}
645
646// TODO - enable restrictions to the character sets allowed for namespaces,
647// components, and endpoints.
648//
649// Put Validate traits on the struct and use the `validate_allowed_chars` method
650// to validate the fields.
651
652// #[cfg(test)]
653// mod tests {
654//     use super::*;
655//     use validator::Validate;
656
657//     #[test]
658//     fn test_valid_names() {
659//         // Valid strings
660//         let valid_inputs = vec![
661//             "abc",        // Lowercase letters
662//             "abc123",     // Letters and numbers
663//             "a-b-c",      // Letters with hyphens
664//             "a_b_c",      // Letters with underscores
665//             "a-b_c-123",  // Mixed valid characters
666//             "a",          // Single character
667//             "a_b",        // Short valid pattern
668//             "123456",     // Only numbers
669//             "a---b_c123", // Repeated hyphens/underscores
670//         ];
671
672//         for input in valid_inputs {
673//             let result = validate_allowed_chars(input);
674//             assert!(result.is_ok(), "Expected '{}' to be valid", input);
675//         }
676//     }
677
678//     #[test]
679//     fn test_invalid_names() {
680//         // Invalid strings
681//         let invalid_inputs = vec![
682//             "abc!",     // Invalid character `!`
683//             "abc@",     // Invalid character `@`
684//             "123$",     // Invalid character `$`
685//             "foo.bar",  // Invalid character `.`
686//             "foo/bar",  // Invalid character `/`
687//             "foo\\bar", // Invalid character `\`
688//             "abc#",     // Invalid character `#`
689//             "abc def",  // Spaces are not allowed
690//             "foo,",     // Invalid character `,`
691//             "",         // Empty string
692//         ];
693
694//         for input in invalid_inputs {
695//             let result = validate_allowed_chars(input);
696//             assert!(result.is_err(), "Expected '{}' to be invalid", input);
697//         }
698//     }
699
700//     // #[test]
701//     // fn test_struct_validation_valid() {
702//     //     // Struct with valid data
703//     //     let valid_data = InputData {
704//     //         name: "valid-name_123".to_string(),
705//     //     };
706//     //     assert!(valid_data.validate().is_ok());
707//     // }
708
709//     // #[test]
710//     // fn test_struct_validation_invalid() {
711//     //     // Struct with invalid data
712//     //     let invalid_data = InputData {
713//     //         name: "invalid!name".to_string(),
714//     //     };
715//     //     let result = invalid_data.validate();
716//     //     assert!(result.is_err());
717
718//     //     if let Err(errors) = result {
719//     //         let error_map = errors.field_errors();
720//     //         assert!(error_map.contains_key("name"));
721//     //         let name_errors = &error_map["name"];
722//     //         assert_eq!(name_errors[0].code, "invalid_characters");
723//     //     }
724//     // }
725
726//     #[test]
727//     fn test_edge_cases() {
728//         // Edge cases
729//         let edge_inputs = vec![
730//             ("-", true),   // Single hyphen
731//             ("_", true),   // Single underscore
732//             ("a-", true),  // Letter with hyphen
733//             ("-", false),  // Repeated hyphens
734//             ("-a", false), // Hyphen at the beginning
735//             ("a-", false), // Hyphen at the end
736//         ];
737
738//         for (input, expected_validity) in edge_inputs {
739//             let result = validate_allowed_chars(input);
740//             if expected_validity {
741//                 assert!(result.is_ok(), "Expected '{}' to be valid", input);
742//             } else {
743//                 assert!(result.is_err(), "Expected '{}' to be invalid", input);
744//             }
745//         }
746//     }
747// }