triton_distributed/
component.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8// http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! The [Component] module defines the top-level API for building distributed applications.
17//!
//! A distributed application consists of a set of [components][Component], each of which can
//! host one or more [endpoints][Endpoint]. Each [Endpoint] is a network-accessible service
//! that can be accessed by other [components][Component] in the distributed application.
21//!
22//! A [Component] is made discoverable by registering it with the distributed runtime under
23//! a [`Namespace`].
24//!
//! A [`Namespace`] is a logical grouping of related [components][Component].
26//!
27//! We might extend namespace to include grouping behavior, which would define groups of
28//! components that are tightly coupled.
29//!
30//! A [Component] is the core building block of a distributed application. It is a logical
31//! unit of work such as a `Preprocessor` or `SmartRouter` that has a well-defined role in the
32//! distributed application.
33//!
34//! A [Component] can present to the distributed application one or more configuration files
35//! which define how that component was constructed/configured and what capabilities it can
36//! provide.
37//!
//! Other [components][Component] can write to watched locations within a [Component]'s etcd
//! path. This allows the [Component] to take dynamic actions depending on the watch
//! triggers.
41//!
42//! TODO: Top-level Overview of Endpoints/Functions
43
44use crate::discovery::Lease;
45
46use super::{error, log, transports::nats::Slug, DistributedRuntime, Result};
47
48use crate::pipeline::network::{ingress::push_endpoint::PushEndpoint, PushWorkHandler};
49use async_nats::{
50    rustls::quic,
51    service::{Service, ServiceExt},
52};
53use derive_builder::Builder;
54use derive_getters::Getters;
55use educe::Educe;
56use serde::{Deserialize, Serialize};
57use std::{collections::HashMap, sync::Arc};
58use validator::{Validate, ValidationError};
59
60mod client;
61mod endpoint;
62mod registry;
63mod service;
64
65pub use client::Client;
66
/// Transport used to reach an endpoint (see [`ComponentEndpointInfo::transport`]).
///
/// Variants are (de)serialized with `snake_case` names, e.g. `NatsTcp` <-> `nats_tcp`.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum TransportType {
    /// NATS over TCP.
    // NOTE(review): the payload is presumably the NATS subject/address for the
    // endpoint — confirm against the code that constructs this variant.
    NatsTcp(String),
}
72
/// Cloneable registry of NATS [`Service`]s keyed by `String`.
///
/// The map lives behind an `Arc`, so clones share the same entries, and access
/// goes through an async [`tokio::sync::Mutex`].
#[derive(Clone)]
pub struct Registry {
    // NOTE(review): keys are presumably service names/subjects — confirm in the
    // `registry` submodule that populates this map.
    services: Arc<tokio::sync::Mutex<HashMap<String, Service>>>,
}
77
/// Serializable record identifying an endpoint (namespace, component, and endpoint
/// name) together with a lease ID and the transport used to reach it.
///
/// Field names and declaration order define the serialized form; change them with care.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ComponentEndpointInfo {
    /// Component name.
    pub component: String,
    /// Endpoint name.
    pub endpoint: String,
    /// Namespace name.
    pub namespace: String,
    /// Lease ID associated with this endpoint.
    // NOTE(review): presumably the etcd lease under which the endpoint was
    // registered — TODO confirm.
    pub lease_id: i64,
    /// How to reach the endpoint.
    pub transport: TransportType,
}
86
87/// A [Component] a discoverable entity in the distributed runtime.
88/// You can host [Endpoint][Endpoint] on a [Component] by first creating
89/// a [Service] then adding one or more [Endpoint][Endpoint] to the [Service].
90///
91/// You can also issue a request to a [Component]'s [Endpoint] by creating a [Client].
92#[derive(Educe, Builder, Clone)]
93#[educe(Debug)]
94#[builder(pattern = "owned")]
95pub struct Component {
96    #[builder(private)]
97    #[educe(Debug(ignore))]
98    drt: DistributedRuntime,
99
100    // todo - restrict the namespace to a-z0-9-_A-Z
101    /// Name of the component
102    #[builder(setter(into))]
103    name: String,
104
105    // todo - restrict the namespace to a-z0-9-_A-Z
106    /// Namespace
107    #[builder(setter(into))]
108    namespace: String,
109}
110
111impl Component {
112    pub fn etcd_path(&self) -> String {
113        format!("{}/components/{}", self.namespace, self.name)
114    }
115
116    fn slug(&self) -> Slug {
117        Slug::from_string(self.etcd_path())
118    }
119
120    pub fn endpoint(&self, endpoint: impl Into<String>) -> Endpoint {
121        Endpoint {
122            component: self.clone(),
123            name: endpoint.into(),
124        }
125    }
126
127    /// Get keys from etcd on the slug, splitting the endpoints and only returning the
128    /// set of unique endpoints.
129    pub async fn list_endpoints(&self) -> Vec<Endpoint> {
130        unimplemented!("endpoints")
131    }
132
133    /// This method will scrape the stats for all available services
134    /// Returns a stream of [`ServiceInfo`] objects.
135    /// This should be consumed by a `[tokio::time::timeout_at`] because each services
136    /// will only respond once, but there is no way to know when all services have responded.
137    pub async fn stats_stream(&self) -> Result<()> {
138        unimplemented!("collect_stats")
139    }
140
141    pub fn service_builder(&self) -> service::ServiceConfigBuilder {
142        service::ServiceConfigBuilder::from_component(self.clone())
143    }
144}
145
146impl ComponentBuilder {
147    pub fn from_runtime(drt: DistributedRuntime) -> Self {
148        Self::default().drt(drt)
149    }
150}
151
/// Handle to a named endpoint on a [Component], obtainable via [`Component::endpoint`].
#[derive(Debug, Clone)]
pub struct Endpoint {
    /// Component this endpoint belongs to.
    component: Component,

    // todo - restrict alphabet
    /// Endpoint name
    name: String,
}
160
161impl Endpoint {
162    pub fn name(&self) -> &str {
163        &self.name
164    }
165
166    pub fn etcd_path(&self) -> String {
167        format!("{}/{}", self.component.etcd_path(), self.name)
168    }
169
170    pub fn etcd_path_with_id(&self, lease_id: i64) -> String {
171        format!("{}:{:x}", self.etcd_path(), lease_id)
172    }
173
174    pub fn name_with_id(&self, lease_id: i64) -> String {
175        format!("{}-{:x}", self.name, lease_id)
176    }
177
178    pub fn subject(&self, lease_id: i64) -> String {
179        format!("{}.{}", self.component.slug(), self.name_with_id(lease_id))
180    }
181
182    pub async fn client<Req, Resp>(&self) -> Result<client::Client<Req, Resp>>
183    where
184        Req: Serialize + Send + Sync + 'static,
185        Resp: for<'de> Deserialize<'de> + Send + Sync + 'static,
186    {
187        client::Client::new(self.clone()).await
188    }
189
190    pub fn endpoint_builder(&self) -> endpoint::EndpointConfigBuilder {
191        endpoint::EndpointConfigBuilder::from_endpoint(self.clone())
192    }
193}
194
/// A named, logical grouping of [Component]s.
///
/// Use [`Namespace::component`] to create components inside it.
#[derive(Educe, Builder, Clone, Validate)]
#[educe(Debug)]
#[builder(pattern = "owned")]
pub struct Namespace {
    /// Runtime handed to every component created in this namespace. Its builder
    /// setter is private and it is omitted from `Debug` output.
    #[builder(private)]
    #[educe(Debug(ignore))]
    runtime: DistributedRuntime,

    /// Namespace name.
    // NOTE(review): `#[validate()]` appears to declare no rules, so `validate()`
    // accepts any name; wiring in `validate_allowed_chars` is still a TODO.
    #[validate()]
    name: String,
}
206
207impl Namespace {
208    pub(crate) fn new(runtime: DistributedRuntime, name: String) -> Result<Self> {
209        Ok(NamespaceBuilder::default()
210            .runtime(runtime)
211            .name(name)
212            .build()?)
213    }
214
215    /// Create a [`Component`] in the namespace
216    pub fn component(&self, name: impl Into<String>) -> Result<Component> {
217        Ok(ComponentBuilder::from_runtime(self.runtime.clone())
218            .name(name)
219            .namespace(self.name.clone())
220            .build()?)
221    }
222}
223
224// Custom validator function
225fn validate_allowed_chars(input: &str) -> Result<(), ValidationError> {
226    // Define the allowed character set using a regex
227    let regex = regex::Regex::new(r"^[a-z0-9-_]+$").unwrap();
228
229    if regex.is_match(input) {
230        Ok(())
231    } else {
232        Err(ValidationError::new("invalid_characters"))
233    }
234}
235
// TODO: enforce character-set restrictions on namespace, component, and endpoint
// names.
//
// Derive `Validate` on those structs and annotate their name fields so they are
// checked with `validate_allowed_chars`.
241
242// #[cfg(test)]
243// mod tests {
244//     use super::*;
245//     use validator::Validate;
246
247//     #[test]
248//     fn test_valid_names() {
249//         // Valid strings
250//         let valid_inputs = vec![
251//             "abc",        // Lowercase letters
252//             "abc123",     // Letters and numbers
253//             "a-b-c",      // Letters with hyphens
254//             "a_b_c",      // Letters with underscores
255//             "a-b_c-123",  // Mixed valid characters
256//             "a",          // Single character
257//             "a_b",        // Short valid pattern
258//             "123456",     // Only numbers
259//             "a---b_c123", // Repeated hyphens/underscores
260//         ];
261
262//         for input in valid_inputs {
263//             let result = validate_allowed_chars(input);
264//             assert!(result.is_ok(), "Expected '{}' to be valid", input);
265//         }
266//     }
267
268//     #[test]
269//     fn test_invalid_names() {
270//         // Invalid strings
271//         let invalid_inputs = vec![
272//             "abc!",     // Invalid character `!`
273//             "abc@",     // Invalid character `@`
274//             "123$",     // Invalid character `$`
275//             "foo.bar",  // Invalid character `.`
276//             "foo/bar",  // Invalid character `/`
277//             "foo\\bar", // Invalid character `\`
278//             "abc#",     // Invalid character `#`
279//             "abc def",  // Spaces are not allowed
280//             "foo,",     // Invalid character `,`
281//             "",         // Empty string
282//         ];
283
284//         for input in invalid_inputs {
285//             let result = validate_allowed_chars(input);
286//             assert!(result.is_err(), "Expected '{}' to be invalid", input);
287//         }
288//     }
289
290//     // #[test]
291//     // fn test_struct_validation_valid() {
292//     //     // Struct with valid data
293//     //     let valid_data = InputData {
294//     //         name: "valid-name_123".to_string(),
295//     //     };
296//     //     assert!(valid_data.validate().is_ok());
297//     // }
298
299//     // #[test]
300//     // fn test_struct_validation_invalid() {
301//     //     // Struct with invalid data
302//     //     let invalid_data = InputData {
303//     //         name: "invalid!name".to_string(),
304//     //     };
305//     //     let result = invalid_data.validate();
306//     //     assert!(result.is_err());
307
308//     //     if let Err(errors) = result {
309//     //         let error_map = errors.field_errors();
310//     //         assert!(error_map.contains_key("name"));
311//     //         let name_errors = &error_map["name"];
312//     //         assert_eq!(name_errors[0].code, "invalid_characters");
313//     //     }
314//     // }
315
316//     #[test]
317//     fn test_edge_cases() {
318//         // Edge cases
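//         let edge_inputs = vec![
//             ("-", true),   // Single hyphen
//             ("_", true),   // Single underscore
//             ("a-", true),  // Letter with hyphen
//             ("--", false), // Repeated hyphens
//             ("-a", false), // Hyphen at the beginning
//             ("a-", false), // Hyphen at the end
//             // NOTE(review): these rows contradict each other ("a-" is listed as
//             // both valid and invalid; "-" is valid but "-a" is not), and every
//             // `false` row is accepted by the current `validate_allowed_chars`.
//             // Decide the intended rule before re-enabling this test.
//         ];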
327
328//         for (input, expected_validity) in edge_inputs {
329//             let result = validate_allowed_chars(input);
330//             if expected_validity {
331//                 assert!(result.is_ok(), "Expected '{}' to be valid", input);
332//             } else {
333//                 assert!(result.is_err(), "Expected '{}' to be invalid", input);
334//             }
335//         }
336//     }
337// }