dynamo_runtime/
component.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8// http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! The [Component] module defines the top-level API for building distributed applications.
17//!
18//! A distributed application consists of a set of [Component] that can host one
19//! or more [Endpoint]. Each [Endpoint] is a network-accessible service
20//! that can be accessed by other [Component] in the distributed application.
21//!
22//! A [Component] is made discoverable by registering it with the distributed runtime under
23//! a [`Namespace`].
24//!
25//! A [`Namespace`] is a logical grouping of [Component] that are grouped together.
26//!
27//! We might extend namespace to include grouping behavior, which would define groups of
28//! components that are tightly coupled.
29//!
30//! A [Component] is the core building block of a distributed application. It is a logical
31//! unit of work such as a `Preprocessor` or `SmartRouter` that has a well-defined role in the
32//! distributed application.
33//!
34//! A [Component] can present to the distributed application one or more configuration files
35//! which define how that component was constructed/configured and what capabilities it can
36//! provide.
37//!
38//! Other [Component] can write to watching locations within a [Component] etcd
39//! path. This allows the [Component] to take dynamic actions depending on the watch
40//! triggers.
41//!
42//! TODO: Top-level Overview of Endpoints/Functions
43
44use crate::{discovery::Lease, service::ServiceSet};
45
46use super::{
47    error, traits::*, transports::nats::Slug, utils::Duration, DistributedRuntime, Result, Runtime,
48};
49
50use crate::pipeline::network::{ingress::push_endpoint::PushEndpoint, PushWorkHandler};
51use async_nats::{
52    rustls::quic,
53    service::{Service, ServiceExt},
54};
55use derive_builder::Builder;
56use derive_getters::Getters;
57use educe::Educe;
58use serde::{Deserialize, Serialize};
59use service::EndpointStatsHandler;
60use std::{collections::HashMap, sync::Arc};
61use validator::{Validate, ValidationError};
62
63mod client;
64#[allow(clippy::module_inception)]
65mod component;
66mod endpoint;
67mod namespace;
68mod registry;
69pub mod service;
70
71pub use client::{Client, RouterMode};
72
73#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
74#[serde(rename_all = "snake_case")]
75pub enum TransportType {
76    NatsTcp(String),
77}
78
79#[derive(Default)]
80pub struct RegistryInner {
81    services: HashMap<String, Service>,
82    stats_handlers: HashMap<String, Arc<std::sync::Mutex<HashMap<String, EndpointStatsHandler>>>>,
83}
84
85#[derive(Clone)]
86pub struct Registry {
87    inner: Arc<tokio::sync::Mutex<RegistryInner>>,
88}
89
90#[derive(Debug, Clone, Serialize, Deserialize)]
91pub struct ComponentEndpointInfo {
92    pub component: String,
93    pub endpoint: String,
94    pub namespace: String,
95    pub lease_id: i64,
96    pub transport: TransportType,
97}
98
99/// A [Component] a discoverable entity in the distributed runtime.
100/// You can host [Endpoint] on a [Component] by first creating
101/// a [Service] then adding one or more [Endpoint] to the [Service].
102///
103/// You can also issue a request to a [Component]'s [Endpoint] by creating a [Client].
104#[derive(Educe, Builder, Clone)]
105#[educe(Debug)]
106#[builder(pattern = "owned")]
107pub struct Component {
108    #[builder(private)]
109    #[educe(Debug(ignore))]
110    drt: DistributedRuntime,
111
112    // todo - restrict the namespace to a-z0-9-_A-Z
113    /// Name of the component
114    #[builder(setter(into))]
115    name: String,
116
117    // todo - restrict the namespace to a-z0-9-_A-Z
118    /// Namespace
119    #[builder(setter(into))]
120    namespace: Namespace,
121
122    // A static component's endpoints cannot be discovered via etcd, they are
123    // fixed at startup time.
124    is_static: bool,
125}
126
127impl std::fmt::Display for Component {
128    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
129        write!(f, "{}.{}", self.namespace.name(), self.name)
130    }
131}
132
133impl DistributedRuntimeProvider for Component {
134    fn drt(&self) -> &DistributedRuntime {
135        &self.drt
136    }
137}
138
139impl RuntimeProvider for Component {
140    fn rt(&self) -> &Runtime {
141        self.drt.rt()
142    }
143}
144
145impl Component {
146    pub fn etcd_path(&self) -> String {
147        format!("{}/components/{}", self.namespace.name(), self.name)
148    }
149
150    pub fn service_name(&self) -> String {
151        let service_name = format!("{}_{}", self.namespace.name(), self.name);
152        Slug::slugify_unique(&service_name).to_string()
153    }
154
155    pub fn path(&self) -> String {
156        format!("{}/{}", self.namespace.name(), self.name)
157    }
158
159    pub fn namespace(&self) -> &Namespace {
160        &self.namespace
161    }
162
163    pub fn endpoint(&self, endpoint: impl Into<String>) -> Endpoint {
164        Endpoint {
165            component: self.clone(),
166            name: endpoint.into(),
167            is_static: self.is_static,
168        }
169    }
170
171    /// Get keys from etcd on the slug, splitting the endpoints and only returning the
172    /// set of unique endpoints.
173    pub async fn list_endpoints(&self) -> Vec<Endpoint> {
174        unimplemented!("endpoints")
175    }
176
177    pub async fn scrape_stats(&self, timeout: Duration) -> Result<ServiceSet> {
178        let service_name = self.service_name();
179        let service_client = self.drt().service_client();
180        service_client
181            .collect_services(&service_name, timeout)
182            .await
183    }
184
185    /// TODO
186    ///
187    /// This method will scrape the stats for all available services
188    /// Returns a stream of `ServiceInfo` objects.
189    /// This should be consumed by a `[tokio::time::timeout_at`] because each services
190    /// will only respond once, but there is no way to know when all services have responded.
191    pub async fn stats_stream(&self) -> Result<()> {
192        unimplemented!("collect_stats")
193    }
194
195    pub fn service_builder(&self) -> service::ServiceConfigBuilder {
196        service::ServiceConfigBuilder::from_component(self.clone())
197    }
198}
199
200impl ComponentBuilder {
201    pub fn from_runtime(drt: DistributedRuntime) -> Self {
202        Self::default().drt(drt)
203    }
204}
205
206#[derive(Debug, Clone)]
207pub struct Endpoint {
208    component: Component,
209
210    // todo - restrict alphabet
211    /// Endpoint name
212    name: String,
213
214    is_static: bool,
215}
216
217impl DistributedRuntimeProvider for Endpoint {
218    fn drt(&self) -> &DistributedRuntime {
219        self.component.drt()
220    }
221}
222
223impl RuntimeProvider for Endpoint {
224    fn rt(&self) -> &Runtime {
225        self.component.rt()
226    }
227}
228
229impl Endpoint {
230    pub fn name(&self) -> &str {
231        &self.name
232    }
233
234    pub fn component(&self) -> &Component {
235        &self.component
236    }
237
238    pub fn path(&self) -> String {
239        format!("{}/{}", self.component.path(), self.name)
240    }
241
242    pub fn etcd_path(&self) -> String {
243        format!("{}/{}", self.component.etcd_path(), self.name)
244    }
245
246    pub fn etcd_path_with_id(&self, lease_id: i64) -> String {
247        if self.is_static {
248            self.etcd_path()
249        } else {
250            format!("{}:{:x}", self.etcd_path(), lease_id)
251        }
252    }
253
254    pub fn name_with_id(&self, lease_id: i64) -> String {
255        if self.is_static {
256            self.name.clone()
257        } else {
258            format!("{}-{:x}", self.name, lease_id)
259        }
260    }
261
262    pub fn subject(&self) -> String {
263        format!("{}.{}", self.component.service_name(), self.name)
264    }
265
266    /// Subject to an instance of the [Endpoint] with a specific lease id
267    pub fn subject_to(&self, lease_id: i64) -> String {
268        format!(
269            "{}.{}",
270            self.component.service_name(),
271            self.name_with_id(lease_id)
272        )
273    }
274
275    pub async fn client<Req, Resp>(&self) -> Result<client::Client<Req, Resp>>
276    where
277        Req: Serialize + Send + Sync + 'static,
278        Resp: for<'de> Deserialize<'de> + Send + Sync + 'static,
279    {
280        if self.is_static {
281            client::Client::new_static(self.clone()).await
282        } else {
283            client::Client::new_dynamic(self.clone()).await
284        }
285    }
286
287    pub fn endpoint_builder(&self) -> endpoint::EndpointConfigBuilder {
288        endpoint::EndpointConfigBuilder::from_endpoint(self.clone())
289    }
290}
291
292#[derive(Educe, Builder, Clone, Validate)]
293#[educe(Debug)]
294#[builder(pattern = "owned")]
295pub struct Namespace {
296    #[builder(private)]
297    #[educe(Debug(ignore))]
298    runtime: DistributedRuntime,
299
300    #[validate()]
301    name: String,
302
303    is_static: bool,
304}
305
306impl DistributedRuntimeProvider for Namespace {
307    fn drt(&self) -> &DistributedRuntime {
308        &self.runtime
309    }
310}
311
312impl RuntimeProvider for Namespace {
313    fn rt(&self) -> &Runtime {
314        self.runtime.rt()
315    }
316}
317
318impl std::fmt::Display for Namespace {
319    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
320        write!(f, "{}", self.name)
321    }
322}
323
324impl Namespace {
325    pub(crate) fn new(runtime: DistributedRuntime, name: String, is_static: bool) -> Result<Self> {
326        Ok(NamespaceBuilder::default()
327            .runtime(runtime)
328            .name(name)
329            .is_static(is_static)
330            .build()?)
331    }
332
333    /// Create a [`Component`] in the namespace who's endpoints can be discovered with etcd
334    pub fn component(&self, name: impl Into<String>) -> Result<Component> {
335        Ok(ComponentBuilder::from_runtime(self.runtime.clone())
336            .name(name)
337            .namespace(self.clone())
338            .is_static(self.is_static)
339            .build()?)
340    }
341
342    pub fn name(&self) -> &str {
343        &self.name
344    }
345}
346
347// Custom validator function
348fn validate_allowed_chars(input: &str) -> Result<(), ValidationError> {
349    // Define the allowed character set using a regex
350    let regex = regex::Regex::new(r"^[a-z0-9-_]+$").unwrap();
351
352    if regex.is_match(input) {
353        Ok(())
354    } else {
355        Err(ValidationError::new("invalid_characters"))
356    }
357}
358
359// TODO - enable restrictions to the character sets allowed for namespaces,
360// components, and endpoints.
361//
362// Put Validate traits on the struct and use the `validate_allowed_chars` method
363// to validate the fields.
364
365// #[cfg(test)]
366// mod tests {
367//     use super::*;
368//     use validator::Validate;
369
370//     #[test]
371//     fn test_valid_names() {
372//         // Valid strings
373//         let valid_inputs = vec![
374//             "abc",        // Lowercase letters
375//             "abc123",     // Letters and numbers
376//             "a-b-c",      // Letters with hyphens
377//             "a_b_c",      // Letters with underscores
378//             "a-b_c-123",  // Mixed valid characters
379//             "a",          // Single character
380//             "a_b",        // Short valid pattern
381//             "123456",     // Only numbers
382//             "a---b_c123", // Repeated hyphens/underscores
383//         ];
384
385//         for input in valid_inputs {
386//             let result = validate_allowed_chars(input);
387//             assert!(result.is_ok(), "Expected '{}' to be valid", input);
388//         }
389//     }
390
391//     #[test]
392//     fn test_invalid_names() {
393//         // Invalid strings
394//         let invalid_inputs = vec![
395//             "abc!",     // Invalid character `!`
396//             "abc@",     // Invalid character `@`
397//             "123$",     // Invalid character `$`
398//             "foo.bar",  // Invalid character `.`
399//             "foo/bar",  // Invalid character `/`
400//             "foo\\bar", // Invalid character `\`
401//             "abc#",     // Invalid character `#`
402//             "abc def",  // Spaces are not allowed
403//             "foo,",     // Invalid character `,`
404//             "",         // Empty string
405//         ];
406
407//         for input in invalid_inputs {
408//             let result = validate_allowed_chars(input);
409//             assert!(result.is_err(), "Expected '{}' to be invalid", input);
410//         }
411//     }
412
413//     // #[test]
414//     // fn test_struct_validation_valid() {
415//     //     // Struct with valid data
416//     //     let valid_data = InputData {
417//     //         name: "valid-name_123".to_string(),
418//     //     };
419//     //     assert!(valid_data.validate().is_ok());
420//     // }
421
422//     // #[test]
423//     // fn test_struct_validation_invalid() {
424//     //     // Struct with invalid data
425//     //     let invalid_data = InputData {
426//     //         name: "invalid!name".to_string(),
427//     //     };
428//     //     let result = invalid_data.validate();
429//     //     assert!(result.is_err());
430
431//     //     if let Err(errors) = result {
432//     //         let error_map = errors.field_errors();
433//     //         assert!(error_map.contains_key("name"));
434//     //         let name_errors = &error_map["name"];
435//     //         assert_eq!(name_errors[0].code, "invalid_characters");
436//     //     }
437//     // }
438
439//     #[test]
440//     fn test_edge_cases() {
441//         // Edge cases
442//         let edge_inputs = vec![
443//             ("-", true),   // Single hyphen
444//             ("_", true),   // Single underscore
445//             ("a-", true),  // Letter with hyphen
446//             ("-", false),  // Repeated hyphens
447//             ("-a", false), // Hyphen at the beginning
448//             ("a-", false), // Hyphen at the end
449//         ];
450
451//         for (input, expected_validity) in edge_inputs {
452//             let result = validate_allowed_chars(input);
453//             if expected_validity {
454//                 assert!(result.is_ok(), "Expected '{}' to be valid", input);
455//             } else {
456//                 assert!(result.is_err(), "Expected '{}' to be invalid", input);
457//             }
458//         }
459//     }
460// }