1use std::fmt;
33
34use crate::{
35 config::HealthStatus,
36 distributed::RequestPlaneMode,
37 metrics::{MetricsHierarchy, MetricsRegistry, prometheus_names},
38 service::ServiceClient,
39 service::ServiceSet,
40};
41
42use super::{DistributedRuntime, Runtime, traits::*, transports::nats::Slug, utils::Duration};
43
44use crate::pipeline::network::{PushWorkHandler, ingress::push_endpoint::PushEndpoint};
45use crate::protocols::EndpointId;
46use async_nats::{
47 rustls::quic,
48 service::{Service, ServiceExt},
49};
50use dashmap::DashMap;
51use derive_builder::Builder;
52use derive_getters::Getters;
53use educe::Educe;
54use serde::{Deserialize, Serialize};
55use std::{collections::HashMap, hash::Hash, sync::Arc};
56use validator::{Validate, ValidationError};
57
58mod client;
59#[allow(clippy::module_inception)]
60mod component;
61mod endpoint;
62mod namespace;
63mod registry;
64pub mod service;
65
66pub use client::Client;
67pub(crate) use client::RoutingOccupancyState;
68pub(crate) use client::get_or_create_routing_occupancy_state;
69pub use endpoint::build_transport_type;
70
/// Transport recorded on an [`Instance`] (see its `transport` field).
///
/// Every variant carries a `String` payload; what that string holds is not
/// visible from this definition — presumably an address or subject, verify
/// against `build_transport_type`.
///
/// Variant names serialize in snake_case, except `Nats`, which is explicitly
/// renamed to `nats_tcp`.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Hash)]
#[serde(rename_all = "snake_case")]
pub enum TransportType {
    /// Serialized under the tag `nats_tcp` rather than the snake_case default.
    #[serde(rename = "nats_tcp")]
    Nats(String),
    /// Serialized under the tag `http`.
    Http(String),
    /// Serialized under the tag `tcp`.
    Tcp(String),
}
79
/// Device kind optionally recorded on an [`Instance`] (see its `device_type`
/// field).
///
/// Variant names serialize in snake_case, i.e. `cpu` and `cuda`.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Hash)]
#[serde(rename_all = "snake_case")]
pub enum DeviceType {
    Cpu,
    Cuda,
}
86
/// State stored behind the mutex of a [`Registry`].
///
/// `Default` yields an empty service map.
#[derive(Default)]
pub struct RegistryInner {
    /// NATS `Service` handles keyed by a string — presumably the service
    /// name; TODO confirm against the code that populates this map.
    pub(crate) services: HashMap<String, Service>,
}
91
/// Shared handle to a [`RegistryInner`].
///
/// Cloning copies the `Arc`, so all clones refer to the same inner state,
/// which is guarded by a Tokio async mutex.
#[derive(Clone)]
pub struct Registry {
    /// The shared, mutex-guarded registry state.
    pub(crate) inner: Arc<tokio::sync::Mutex<RegistryInner>>,
}
96
/// Serializable record describing one instance of an endpoint.
///
/// Derived equality compares every field, including `transport` and
/// `device_type`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct Instance {
    /// Name of the component the instance belongs to.
    pub component: String,
    /// Name of the endpoint the instance serves.
    pub endpoint: String,
    /// Name of the namespace the component lives in.
    pub namespace: String,
    /// Numeric identifier of this instance.
    pub instance_id: u64,
    /// Transport associated with this instance.
    pub transport: TransportType,
    /// Optional device kind. Omitted from serialized output when `None`, and
    /// deserialized as `None` when the key is absent.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub device_type: Option<DeviceType>,
}
107
108impl Instance {
109 pub fn id(&self) -> u64 {
110 self.instance_id
111 }
112 pub fn endpoint_id(&self) -> EndpointId {
113 EndpointId {
114 namespace: self.namespace.clone(),
115 component: self.component.clone(),
116 name: self.endpoint.clone(),
117 }
118 }
119}
120
121impl fmt::Display for Instance {
122 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
123 write!(
124 f,
125 "{}/{}/{}/{}",
126 self.namespace, self.component, self.endpoint, self.instance_id
127 )
128 }
129}
130
131impl std::cmp::Ord for Instance {
133 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
134 self.to_string().cmp(&other.to_string())
135 }
136}
137
138impl PartialOrd for Instance {
139 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
140 Some(self.cmp(other))
142 }
143}
144
/// A named component inside a [`Namespace`].
///
/// The derived builder (`ComponentBuilder`) consumes itself on each setter
/// call (`pattern = "owned"`), and its generated build function is private and
/// named `build_internal`.
///
/// Equality and hashing are implemented by hand and consider only the
/// namespace's full name and the component name.
#[derive(Educe, Builder, Clone, Validate)]
#[educe(Debug)]
#[builder(pattern = "owned", build_fn(private, name = "build_internal"))]
pub struct Component {
    /// Distributed runtime this component is attached to. The builder setter
    /// is private and the field is left out of `Debug` output.
    #[builder(private)]
    #[educe(Debug(ignore))]
    drt: Arc<DistributedRuntime>,

    /// Component name; `validate_allowed_chars` is registered as its custom
    /// validator. The builder setter accepts anything `Into<String>`.
    #[builder(setter(into))]
    #[validate(custom(function = "validate_allowed_chars"))]
    name: String,

    /// Key/value label pairs; defaults to an empty list.
    #[builder(default = "Vec::new()")]
    labels: Vec<(String, String)>,

    /// Namespace this component belongs to. The builder setter accepts
    /// anything `Into<Namespace>`.
    #[builder(setter(into))]
    namespace: Namespace,

    /// Metrics registry of this component; defaults to a fresh registry.
    #[builder(default = "crate::MetricsRegistry::new()")]
    metrics_registry: crate::MetricsRegistry,
}
176
177impl Hash for Component {
178 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
179 self.namespace.name().hash(state);
180 self.name.hash(state);
181 }
182}
183
184impl PartialEq for Component {
185 fn eq(&self, other: &Self) -> bool {
186 self.namespace.name() == other.namespace.name() && self.name == other.name
187 }
188}
189
190impl Eq for Component {}
191
192impl std::fmt::Display for Component {
193 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
194 write!(f, "{}.{}", self.namespace.name(), self.name)
195 }
196}
197
198impl DistributedRuntimeProvider for Component {
199 fn drt(&self) -> &DistributedRuntime {
200 &self.drt
201 }
202}
203
204impl RuntimeProvider for Component {
205 fn rt(&self) -> &Runtime {
206 self.drt.rt()
207 }
208}
209
210impl MetricsHierarchy for Component {
211 fn basename(&self) -> String {
212 self.name.clone()
213 }
214
215 fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy> {
216 let mut parents = vec![];
217
218 parents.extend(self.namespace.parent_hierarchies());
220
221 parents.push(&self.namespace as &dyn MetricsHierarchy);
223
224 parents
225 }
226
227 fn get_metrics_registry(&self) -> &MetricsRegistry {
228 &self.metrics_registry
229 }
230
231 fn connection_id(&self) -> Option<u64> {
232 Some(self.drt.connection_id())
233 }
234}
235
236impl Component {
237 pub fn service_name(&self) -> String {
238 let service_name = format!("{}_{}", self.namespace.name(), self.name);
239 Slug::slugify(&service_name).to_string()
240 }
241
242 pub fn namespace(&self) -> &Namespace {
243 &self.namespace
244 }
245
246 pub fn name(&self) -> &str {
247 &self.name
248 }
249
250 pub fn labels(&self) -> &[(String, String)] {
251 &self.labels
252 }
253
254 pub fn endpoint(&self, endpoint: impl Into<String>) -> Endpoint {
255 let endpoint = Endpoint {
256 component: self.clone(),
257 name: endpoint.into(),
258 labels: Vec::new(),
259 metrics_registry: crate::MetricsRegistry::new(),
260 };
261 self.get_metrics_registry()
263 .add_child_registry(endpoint.get_metrics_registry());
264 endpoint
265 }
266
267 pub async fn list_instances(&self) -> anyhow::Result<Vec<Instance>> {
268 let discovery = self.drt.discovery();
269
270 let discovery_query = crate::discovery::DiscoveryQuery::ComponentEndpoints {
271 namespace: self.namespace.name(),
272 component: self.name.clone(),
273 };
274
275 let discovery_instances = discovery.list(discovery_query).await?;
276
277 let mut instances: Vec<Instance> = discovery_instances
279 .into_iter()
280 .filter_map(|di| match di {
281 crate::discovery::DiscoveryInstance::Endpoint(instance) => Some(instance),
282 _ => None, })
284 .collect();
285
286 instances.sort();
287 Ok(instances)
288 }
289}
290
291impl ComponentBuilder {
292 pub fn from_runtime(drt: Arc<DistributedRuntime>) -> Self {
293 Self::default().drt(drt)
294 }
295
296 pub fn build(self) -> Result<Component, anyhow::Error> {
297 let component = self.build_internal()?;
298 let drt = component.drt();
302 if drt.request_plane().is_nats() {
303 let mut rx = drt.register_nats_service(component.clone());
304 let result = tokio::task::block_in_place(|| rx.blocking_recv());
309 match result {
310 Some(Ok(())) => {
311 tracing::debug!(
312 component = component.service_name(),
313 "NATS service registration completed"
314 );
315 }
316 Some(Err(e)) => {
317 return Err(anyhow::anyhow!(
318 "NATS service registration failed for component '{}': {}",
319 component.service_name(),
320 e
321 ));
322 }
323 None => {
324 return Err(anyhow::anyhow!(
325 "NATS service registration channel closed unexpectedly for component '{}'",
326 component.service_name()
327 ));
328 }
329 }
330 }
331 Ok(component)
332 }
333}
334
/// A named endpoint belonging to a [`Component`].
///
/// Equality and hashing are implemented by hand and consider only the owning
/// component and the endpoint name.
#[derive(Debug, Clone)]
pub struct Endpoint {
    /// Component that owns this endpoint.
    component: Component,

    /// Endpoint name.
    name: String,

    /// Key/value label pairs attached to the endpoint.
    labels: Vec<(String, String)>,

    /// Metrics registry of this endpoint.
    metrics_registry: crate::MetricsRegistry,
}
349
350impl Hash for Endpoint {
351 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
352 self.component.hash(state);
353 self.name.hash(state);
354 }
355}
356
357impl PartialEq for Endpoint {
358 fn eq(&self, other: &Self) -> bool {
359 self.component == other.component && self.name == other.name
360 }
361}
362
363impl Eq for Endpoint {}
364
365impl DistributedRuntimeProvider for Endpoint {
366 fn drt(&self) -> &DistributedRuntime {
367 self.component.drt()
368 }
369}
370
371impl RuntimeProvider for Endpoint {
372 fn rt(&self) -> &Runtime {
373 self.component.rt()
374 }
375}
376
377impl MetricsHierarchy for Endpoint {
378 fn basename(&self) -> String {
379 self.name.clone()
380 }
381
382 fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy> {
383 let mut parents = vec![];
384
385 parents.extend(self.component.parent_hierarchies());
387
388 parents.push(&self.component as &dyn MetricsHierarchy);
390
391 parents
392 }
393
394 fn get_metrics_registry(&self) -> &MetricsRegistry {
395 &self.metrics_registry
396 }
397
398 fn connection_id(&self) -> Option<u64> {
399 Some(self.component.drt().connection_id())
400 }
401}
402
403impl Endpoint {
404 pub fn id(&self) -> EndpointId {
405 EndpointId {
406 namespace: self.component.namespace().name().to_string(),
407 component: self.component.name().to_string(),
408 name: self.name().to_string(),
409 }
410 }
411
412 pub fn name(&self) -> &str {
413 &self.name
414 }
415
416 pub fn component(&self) -> &Component {
417 &self.component
418 }
419
420 pub async fn client(&self) -> anyhow::Result<client::Client> {
421 client::Client::new(self.clone()).await
422 }
423
424 pub fn endpoint_builder(&self) -> endpoint::EndpointConfigBuilder {
425 endpoint::EndpointConfigBuilder::from_endpoint(self.clone())
426 }
427}
428
/// A namespace, optionally nested inside a parent namespace.
///
/// The derived builder (`NamespaceBuilder`) consumes itself on each setter
/// call (`pattern = "owned"`).
#[derive(Builder, Clone, Validate)]
#[builder(pattern = "owned")]
pub struct Namespace {
    /// Distributed runtime this namespace is attached to; the builder setter
    /// is private.
    #[builder(private)]
    runtime: Arc<DistributedRuntime>,

    /// This namespace's own name segment (without any parent prefix);
    /// `validate_allowed_chars` is registered as its custom validator.
    #[validate(custom(function = "validate_allowed_chars"))]
    name: String,

    /// Parent namespace, if any; defaults to `None`.
    #[builder(default = "None")]
    parent: Option<Arc<Namespace>>,

    /// Key/value label pairs; defaults to an empty list.
    #[builder(default = "Vec::new()")]
    labels: Vec<(String, String)>,

    /// Metrics registry of this namespace; defaults to a fresh registry.
    #[builder(default = "crate::MetricsRegistry::new()")]
    metrics_registry: crate::MetricsRegistry,

    /// Components already built through `Namespace::component`, keyed by
    /// component name. The map sits behind an `Arc`, so clones of this
    /// namespace share the same cache; defaults to an empty map.
    #[builder(default = "Arc::new(DashMap::new())")]
    component_cache: Arc<DashMap<String, Component>>,
}
456
457impl DistributedRuntimeProvider for Namespace {
458 fn drt(&self) -> &DistributedRuntime {
459 &self.runtime
460 }
461}
462
463impl std::fmt::Debug for Namespace {
464 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
465 write!(
466 f,
467 "Namespace {{ name: {}; parent: {:?} }}",
468 self.name, self.parent
469 )
470 }
471}
472
473impl RuntimeProvider for Namespace {
474 fn rt(&self) -> &Runtime {
475 self.runtime.rt()
476 }
477}
478
479impl std::fmt::Display for Namespace {
480 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
481 write!(f, "{}", self.name)
482 }
483}
484
485impl Namespace {
486 pub(crate) fn new(runtime: DistributedRuntime, name: String) -> anyhow::Result<Self> {
487 let ns = NamespaceBuilder::default()
488 .runtime(Arc::new(runtime))
489 .name(name)
490 .build()?;
491 ns.drt()
493 .get_metrics_registry()
494 .add_child_registry(ns.get_metrics_registry());
495 Ok(ns)
496 }
497
498 pub fn component(&self, name: impl Into<String>) -> anyhow::Result<Component> {
504 let name = name.into();
505
506 if let Some(cached) = self.component_cache.get(&name) {
509 return Ok(cached.value().clone());
510 }
511
512 let component = ComponentBuilder::from_runtime(self.runtime.clone())
514 .name(&name)
515 .namespace(self.clone())
516 .build()?;
517
518 self.get_metrics_registry()
520 .add_child_registry(component.get_metrics_registry());
521
522 self.component_cache.insert(name, component.clone());
526
527 Ok(component)
528 }
529
530 pub fn namespace(&self, name: impl Into<String>) -> anyhow::Result<Namespace> {
532 let child = NamespaceBuilder::default()
533 .runtime(self.runtime.clone())
534 .name(name.into())
535 .parent(Some(Arc::new(self.clone())))
536 .build()?;
537 self.get_metrics_registry()
539 .add_child_registry(child.get_metrics_registry());
540 Ok(child)
541 }
542
543 pub fn name(&self) -> String {
544 match &self.parent {
545 Some(parent) => format!("{}.{}", parent.name(), self.name),
546 None => self.name.clone(),
547 }
548 }
549}
550
551fn validate_allowed_chars(input: &str) -> Result<(), ValidationError> {
553 let regex = regex::Regex::new(r"^[a-z0-9-_]+$").unwrap();
555
556 if regex.is_match(input) {
557 Ok(())
558 } else {
559 Err(ValidationError::new("invalid_characters"))
560 }
561}