dynamo_runtime/component.rs
1// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8// http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! The [Component] module defines the top-level API for building distributed applications.
17//!
18//! A distributed application consists of a set of [Component] that can host one
19//! or more [Endpoint]. Each [Endpoint] is a network-accessible service
20//! that can be accessed by other [Component] in the distributed application.
21//!
22//! A [Component] is made discoverable by registering it with the distributed runtime under
23//! a [`Namespace`].
24//!
//! A [`Namespace`] is a logical grouping of [Component].
26//!
27//! We might extend namespace to include grouping behavior, which would define groups of
28//! components that are tightly coupled.
29//!
30//! A [Component] is the core building block of a distributed application. It is a logical
31//! unit of work such as a `Preprocessor` or `SmartRouter` that has a well-defined role in the
32//! distributed application.
33//!
34//! A [Component] can present to the distributed application one or more configuration files
35//! which define how that component was constructed/configured and what capabilities it can
36//! provide.
37//!
//! Other [Component] can write to watched locations within a [Component]'s etcd
//! path. This allows the [Component] to take dynamic actions depending on the watch
//! triggers.
41//!
42//! TODO: Top-level Overview of Endpoints/Functions
43
44use crate::{discovery::Lease, service::ServiceSet};
45
46use super::{
47 error, traits::*, transports::nats::Slug, utils::Duration, DistributedRuntime, Result, Runtime,
48};
49
50use crate::pipeline::network::{ingress::push_endpoint::PushEndpoint, PushWorkHandler};
51use async_nats::{
52 rustls::quic,
53 service::{Service, ServiceExt},
54};
55use derive_builder::Builder;
56use derive_getters::Getters;
57use educe::Educe;
58use serde::{Deserialize, Serialize};
59use service::EndpointStatsHandler;
60use std::{collections::HashMap, sync::Arc};
61use validator::{Validate, ValidationError};
62
63mod client;
64#[allow(clippy::module_inception)]
65mod component;
66mod endpoint;
67mod namespace;
68mod registry;
69mod service;
70
71pub use client::Client;
72
/// Transport over which an endpoint instance is reachable.
///
/// Serialized with `snake_case` variant names (e.g. `nats_tcp`).
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum TransportType {
    /// NATS over TCP. The payload is presumably the NATS subject of the
    /// instance — TODO confirm against the code that constructs it.
    NatsTcp(String),
}
78
/// Mutable state held behind a [`Registry`] handle.
#[derive(Default)]
pub struct RegistryInner {
    /// NATS [`Service`] handles keyed by a string — presumably the service name; TODO confirm.
    services: HashMap<String, Service>,
    /// Maps of [`EndpointStatsHandler`], each shared behind an `Arc<std::sync::Mutex<..>>`.
    /// Presumably outer key = service name and inner key = endpoint name — TODO confirm.
    stats_handlers: HashMap<String, Arc<std::sync::Mutex<HashMap<String, EndpointStatsHandler>>>>,
}
84
/// Cloneable handle to a shared [`RegistryInner`].
///
/// Clones share the same inner state, which is guarded by a `tokio` async mutex.
#[derive(Clone)]
pub struct Registry {
    /// Shared, lock-protected registry state.
    inner: Arc<tokio::sync::Mutex<RegistryInner>>,
}
89
/// Serializable description of one endpoint instance: which namespace, component
/// and endpoint it belongs to, the lease it is tied to, and how to reach it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ComponentEndpointInfo {
    /// Name of the component hosting the endpoint.
    pub component: String,
    /// Name of the endpoint.
    pub endpoint: String,
    /// Name of the namespace the component lives in.
    pub namespace: String,
    /// Lease id of the instance (presumably the etcd lease — TODO confirm).
    pub lease_id: i64,
    /// Transport used to reach the instance.
    pub transport: TransportType,
}
98
/// A [Component] is a discoverable entity in the distributed runtime.
/// You can host [Endpoint] on a [Component] by first creating
/// a [Service] then adding one or more [Endpoint] to the [Service].
///
/// You can also issue a request to a [Component]'s [Endpoint] by creating a [Client].
///
/// Instances are built through the derived `ComponentBuilder` (owned pattern); the
/// runtime handle is excluded from the `Debug` output.
#[derive(Educe, Builder, Clone)]
#[educe(Debug)]
#[builder(pattern = "owned")]
pub struct Component {
    /// Distributed runtime this component belongs to; its builder setter is private.
    #[builder(private)]
    #[educe(Debug(ignore))]
    drt: DistributedRuntime,

    // todo - restrict the component name to a-z0-9-_A-Z
    /// Name of the component
    #[builder(setter(into))]
    name: String,

    // todo - restrict the namespace to a-z0-9-_A-Z
    /// Namespace
    #[builder(setter(into))]
    namespace: Namespace,
}
122
123impl std::fmt::Display for Component {
124 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
125 write!(f, "{}.{}", self.namespace.name(), self.name)
126 }
127}
128
/// Exposes the [`DistributedRuntime`] stored on the component.
impl DistributedRuntimeProvider for Component {
    /// Borrow the component's own distributed runtime handle.
    fn drt(&self) -> &DistributedRuntime {
        &self.drt
    }
}
134
/// Exposes the [`Runtime`] of the component's distributed runtime.
impl RuntimeProvider for Component {
    /// Borrow the [`Runtime`] by delegating to the stored distributed runtime.
    fn rt(&self) -> &Runtime {
        self.drt.rt()
    }
}
140
141impl Component {
142 pub fn etcd_path(&self) -> String {
143 format!("{}/components/{}", self.namespace.name(), self.name)
144 }
145
146 pub fn service_name(&self) -> String {
147 Slug::from_string(format!("{}|{}", self.namespace.name(), self.name)).to_string()
148 }
149
150 pub fn path(&self) -> String {
151 format!("{}/{}", self.namespace.name(), self.name)
152 }
153
154 pub fn namespace(&self) -> &Namespace {
155 &self.namespace
156 }
157
158 pub fn endpoint(&self, endpoint: impl Into<String>) -> Endpoint {
159 Endpoint {
160 component: self.clone(),
161 name: endpoint.into(),
162 }
163 }
164
165 /// Get keys from etcd on the slug, splitting the endpoints and only returning the
166 /// set of unique endpoints.
167 pub async fn list_endpoints(&self) -> Vec<Endpoint> {
168 unimplemented!("endpoints")
169 }
170
171 pub async fn scrape_stats(&self, duration: Duration) -> Result<ServiceSet> {
172 let service_name = self.service_name();
173 let service_client = self.drt().service_client();
174 service_client
175 .collect_services(&service_name, duration)
176 .await
177 }
178
179 /// TODO
180 ///
181 /// This method will scrape the stats for all available services
182 /// Returns a stream of `ServiceInfo` objects.
183 /// This should be consumed by a `[tokio::time::timeout_at`] because each services
184 /// will only respond once, but there is no way to know when all services have responded.
185 pub async fn stats_stream(&self) -> Result<()> {
186 unimplemented!("collect_stats")
187 }
188
189 pub fn service_builder(&self) -> service::ServiceConfigBuilder {
190 service::ServiceConfigBuilder::from_component(self.clone())
191 }
192}
193
194impl ComponentBuilder {
195 pub fn from_runtime(drt: DistributedRuntime) -> Self {
196 Self::default().drt(drt)
197 }
198}
199
/// A named endpoint hosted on a [`Component`].
///
/// Holds its own copy of the owning component together with the endpoint name.
#[derive(Debug, Clone)]
pub struct Endpoint {
    /// Component that hosts this endpoint.
    component: Component,

    // todo - restrict alphabet
    /// Endpoint name
    name: String,
}
208
/// Exposes the [`DistributedRuntime`] of the endpoint's component.
impl DistributedRuntimeProvider for Endpoint {
    /// Borrow the distributed runtime by delegating to the owning component.
    fn drt(&self) -> &DistributedRuntime {
        self.component.drt()
    }
}
214
/// Exposes the [`Runtime`] of the endpoint's component.
impl RuntimeProvider for Endpoint {
    /// Borrow the runtime by delegating to the owning component.
    fn rt(&self) -> &Runtime {
        self.component.rt()
    }
}
220
221impl Endpoint {
222 pub fn name(&self) -> &str {
223 &self.name
224 }
225
226 pub fn component(&self) -> &Component {
227 &self.component
228 }
229
230 pub fn path(&self) -> String {
231 format!("{}/{}", self.component.path(), self.name)
232 }
233
234 pub fn etcd_path(&self) -> String {
235 format!("{}/{}", self.component.etcd_path(), self.name)
236 }
237
238 pub fn etcd_path_with_id(&self, lease_id: i64) -> String {
239 format!("{}:{:x}", self.etcd_path(), lease_id)
240 }
241
242 pub fn name_with_id(&self, lease_id: i64) -> String {
243 format!("{}-{:x}", self.name, lease_id)
244 }
245
246 pub fn subject(&self) -> String {
247 format!("{}.{}", self.component.service_name(), self.name)
248 }
249
250 /// Subject to an instance of the [Endpoint] with a specific lease id
251 pub fn subject_to(&self, lease_id: i64) -> String {
252 format!(
253 "{}.{}",
254 self.component.service_name(),
255 self.name_with_id(lease_id)
256 )
257 }
258
259 pub async fn client<Req, Resp>(&self) -> Result<client::Client<Req, Resp>>
260 where
261 Req: Serialize + Send + Sync + 'static,
262 Resp: for<'de> Deserialize<'de> + Send + Sync + 'static,
263 {
264 client::Client::new(self.clone()).await
265 }
266
267 pub fn endpoint_builder(&self) -> endpoint::EndpointConfigBuilder {
268 endpoint::EndpointConfigBuilder::from_endpoint(self.clone())
269 }
270}
271
/// A named grouping of [`Component`]s within a distributed runtime.
///
/// Instances are built through the derived `NamespaceBuilder` (owned pattern); the
/// runtime handle is excluded from the `Debug` output.
#[derive(Educe, Builder, Clone, Validate)]
#[educe(Debug)]
#[builder(pattern = "owned")]
pub struct Namespace {
    /// Distributed runtime this namespace belongs to; its builder setter is private.
    #[builder(private)]
    #[educe(Debug(ignore))]
    runtime: DistributedRuntime,

    // NOTE(review): the `validate` attribute below lists no rules, so this struct
    // does not reference `validate_allowed_chars` — confirm whether that is intended.
    /// Name of the namespace.
    #[validate()]
    name: String,
}
283
/// Exposes the [`DistributedRuntime`] stored on the namespace.
impl DistributedRuntimeProvider for Namespace {
    /// Borrow the namespace's own distributed runtime handle.
    fn drt(&self) -> &DistributedRuntime {
        &self.runtime
    }
}
289
/// Exposes the [`Runtime`] of the namespace's distributed runtime.
impl RuntimeProvider for Namespace {
    /// Borrow the [`Runtime`] by delegating to the stored distributed runtime.
    fn rt(&self) -> &Runtime {
        self.runtime.rt()
    }
}
295
296impl std::fmt::Display for Namespace {
297 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
298 write!(f, "{}", self.name)
299 }
300}
301
302impl Namespace {
303 pub(crate) fn new(runtime: DistributedRuntime, name: String) -> Result<Self> {
304 Ok(NamespaceBuilder::default()
305 .runtime(runtime)
306 .name(name)
307 .build()?)
308 }
309
310 /// Create a [`Component`] in the namespace
311 pub fn component(&self, name: impl Into<String>) -> Result<Component> {
312 Ok(ComponentBuilder::from_runtime(self.runtime.clone())
313 .name(name)
314 .namespace(self.clone())
315 .build()?)
316 }
317
318 pub fn name(&self) -> &str {
319 &self.name
320 }
321}
322
323// Custom validator function
324fn validate_allowed_chars(input: &str) -> Result<(), ValidationError> {
325 // Define the allowed character set using a regex
326 let regex = regex::Regex::new(r"^[a-z0-9-_]+$").unwrap();
327
328 if regex.is_match(input) {
329 Ok(())
330 } else {
331 Err(ValidationError::new("invalid_characters"))
332 }
333}
334
335// TODO - enable restrictions to the character sets allowed for namespaces,
336// components, and endpoints.
337//
338// Put Validate traits on the struct and use the `validate_allowed_chars` method
339// to validate the fields.
340
341// #[cfg(test)]
342// mod tests {
343// use super::*;
344// use validator::Validate;
345
346// #[test]
347// fn test_valid_names() {
348// // Valid strings
349// let valid_inputs = vec![
350// "abc", // Lowercase letters
351// "abc123", // Letters and numbers
352// "a-b-c", // Letters with hyphens
353// "a_b_c", // Letters with underscores
354// "a-b_c-123", // Mixed valid characters
355// "a", // Single character
356// "a_b", // Short valid pattern
357// "123456", // Only numbers
358// "a---b_c123", // Repeated hyphens/underscores
359// ];
360
361// for input in valid_inputs {
362// let result = validate_allowed_chars(input);
363// assert!(result.is_ok(), "Expected '{}' to be valid", input);
364// }
365// }
366
367// #[test]
368// fn test_invalid_names() {
369// // Invalid strings
370// let invalid_inputs = vec![
371// "abc!", // Invalid character `!`
372// "abc@", // Invalid character `@`
373// "123$", // Invalid character `$`
374// "foo.bar", // Invalid character `.`
375// "foo/bar", // Invalid character `/`
376// "foo\\bar", // Invalid character `\`
377// "abc#", // Invalid character `#`
378// "abc def", // Spaces are not allowed
379// "foo,", // Invalid character `,`
380// "", // Empty string
381// ];
382
383// for input in invalid_inputs {
384// let result = validate_allowed_chars(input);
385// assert!(result.is_err(), "Expected '{}' to be invalid", input);
386// }
387// }
388
389// // #[test]
390// // fn test_struct_validation_valid() {
391// // // Struct with valid data
392// // let valid_data = InputData {
393// // name: "valid-name_123".to_string(),
394// // };
395// // assert!(valid_data.validate().is_ok());
396// // }
397
398// // #[test]
399// // fn test_struct_validation_invalid() {
400// // // Struct with invalid data
401// // let invalid_data = InputData {
402// // name: "invalid!name".to_string(),
403// // };
404// // let result = invalid_data.validate();
405// // assert!(result.is_err());
406
407// // if let Err(errors) = result {
408// // let error_map = errors.field_errors();
409// // assert!(error_map.contains_key("name"));
410// // let name_errors = &error_map["name"];
411// // assert_eq!(name_errors[0].code, "invalid_characters");
412// // }
413// // }
414
415// #[test]
416// fn test_edge_cases() {
417// // Edge cases
418// let edge_inputs = vec![
419// ("-", true), // Single hyphen
420// ("_", true), // Single underscore
421// ("a-", true), // Letter with hyphen
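//             // NOTE(review): the three `false` cases below contradict the `true` cases above
//             // ("-" and "a-" are listed with both expectations). `validate_allowed_chars`
//             // accepts any non-empty string of `[a-z0-9-_]`, so these would fail if enabled.
//             ("-", false),  // Repeated hyphens
//             ("-a", false), // Hyphen at the beginning
//             ("a-", false), // Hyphen at the end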
425// ];
426
427// for (input, expected_validity) in edge_inputs {
428// let result = validate_allowed_chars(input);
429// if expected_validity {
430// assert!(result.is_ok(), "Expected '{}' to be valid", input);
431// } else {
432// assert!(result.is_err(), "Expected '{}' to be invalid", input);
433// }
434// }
435// }
436// }