1use std::fmt;
33
34use crate::{
35 config::HealthStatus,
36 distributed::RequestPlaneMode,
37 metrics::{MetricsHierarchy, MetricsRegistry, prometheus_names},
38 service::ServiceClient,
39 service::ServiceSet,
40};
41
42use super::{DistributedRuntime, Runtime, traits::*, transports::nats::Slug, utils::Duration};
43
44use crate::pipeline::network::{PushWorkHandler, ingress::push_endpoint::PushEndpoint};
45use crate::protocols::EndpointId;
46use async_nats::{
47 rustls::quic,
48 service::{Service, ServiceExt},
49};
50use dashmap::DashMap;
51use derive_builder::Builder;
52use derive_getters::Getters;
53use educe::Educe;
54use serde::{Deserialize, Serialize};
55use std::{collections::HashMap, hash::Hash, sync::Arc};
56use validator::{Validate, ValidationError};
57
58mod client;
59#[allow(clippy::module_inception)]
60mod component;
61mod endpoint;
62mod namespace;
63mod registry;
64pub mod service;
65
66pub use client::Client;
67pub(crate) use client::EndpointDiscoverySource;
68pub(crate) use client::RoutingOccupancyState;
69pub(crate) use client::get_or_create_routing_occupancy_state;
70pub use endpoint::build_transport_type;
71
/// Transport over which an [`Instance`] can be reached.
///
/// Variants serialize in `snake_case` (`"http"`, `"tcp"`), except [`TransportType::Nats`],
/// which is explicitly renamed to `"nats_tcp"`.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Hash)]
#[serde(rename_all = "snake_case")]
pub enum TransportType {
    /// NATS transport. NOTE(review): the payload is presumably a NATS subject/address — confirm.
    #[serde(rename = "nats_tcp")]
    Nats(String),
    /// HTTP transport. NOTE(review): the payload is presumably a URL/address — confirm.
    Http(String),
    /// Raw TCP transport. NOTE(review): the payload is presumably a socket address — confirm.
    Tcp(String),
}
80
/// Device class an [`Instance`] runs on; serialized as `"cpu"` / `"cuda"`.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Hash)]
#[serde(rename_all = "snake_case")]
pub enum DeviceType {
    Cpu,
    Cuda,
}
87
/// State held behind [`Registry`]'s mutex.
#[derive(Default)]
pub struct RegistryInner {
    /// `async_nats` services keyed by a string. NOTE(review): presumably the component's
    /// service name — confirm at the insertion sites.
    pub(crate) services: HashMap<String, Service>,
}
92
/// Cloneable handle to shared [`RegistryInner`] state.
///
/// Clones share the same inner state through an `Arc`; access is serialized by a
/// `tokio::sync::Mutex` (async-aware, so the guard may be held across `.await`).
#[derive(Clone)]
pub struct Registry {
    pub(crate) inner: Arc<tokio::sync::Mutex<RegistryInner>>,
}
97
/// One running instance of an endpoint: its `namespace/component/endpoint` path, a numeric
/// instance id, and the transport it is reachable over.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct Instance {
    /// Component name.
    pub component: String,
    /// Endpoint name.
    pub endpoint: String,
    /// Namespace name. NOTE(review): presumably the full dotted namespace name — confirm with producers.
    pub namespace: String,
    /// Numeric instance id (also returned by `Instance::id`).
    pub instance_id: u64,
    /// How to reach this instance.
    pub transport: TransportType,
    /// Device this instance runs on, if known. Defaults to `None` when the field is absent
    /// during deserialization, and is omitted from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub device_type: Option<DeviceType>,
}
108
109impl Instance {
110 pub fn id(&self) -> u64 {
111 self.instance_id
112 }
113
114 pub fn endpoint_id(&self) -> EndpointId {
115 EndpointId {
116 namespace: self.namespace.clone(),
117 component: self.component.clone(),
118 name: self.endpoint.clone(),
119 }
120 }
121
122 pub fn endpoint_instance_id(&self) -> crate::discovery::EndpointInstanceId {
123 crate::discovery::EndpointInstanceId {
124 namespace: self.namespace.clone(),
125 component: self.component.clone(),
126 endpoint: self.endpoint.clone(),
127 instance_id: self.instance_id,
128 }
129 }
130}
131
132impl fmt::Display for Instance {
133 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
134 write!(
135 f,
136 "{}/{}/{}/{}",
137 self.namespace, self.component, self.endpoint, self.instance_id
138 )
139 }
140}
141
142impl std::cmp::Ord for Instance {
144 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
145 self.to_string().cmp(&other.to_string())
146 }
147}
148
149impl PartialOrd for Instance {
150 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
151 Some(self.cmp(other))
153 }
154}
155
/// A named component inside a [`Namespace`], bound to a [`DistributedRuntime`].
///
/// Constructed through the generated `ComponentBuilder` (owned pattern). The generated build
/// function is private and named `build_internal`; the public entry point is
/// `ComponentBuilder::build`, which additionally registers a NATS service when the runtime's
/// request plane is NATS.
///
/// Equality and hashing consider only the full namespace name and the component name.
#[derive(Educe, Builder, Clone, Validate)]
#[educe(Debug)]
#[builder(pattern = "owned", build_fn(private, name = "build_internal"))]
pub struct Component {
    /// Distributed runtime handle. The builder setter is private (set via
    /// `ComponentBuilder::from_runtime`), and the field is excluded from `Debug` output.
    #[builder(private)]
    #[educe(Debug(ignore))]
    drt: Arc<DistributedRuntime>,

    /// Component name, constrained by `validate_allowed_chars` when `Validate::validate` runs.
    /// NOTE(review): `ComponentBuilder::build` does not call `validate()`, so confirm where
    /// this constraint is actually enforced.
    #[builder(setter(into))]
    #[validate(custom(function = "validate_allowed_chars"))]
    name: String,

    /// Key/value labels; empty unless set on the builder.
    #[builder(default = "Vec::new()")]
    labels: Vec<(String, String)>,

    /// Namespace that owns this component.
    #[builder(setter(into))]
    namespace: Namespace,

    /// Metrics registry for this component; a fresh registry unless set on the builder.
    #[builder(default = "crate::MetricsRegistry::new()")]
    metrics_registry: crate::MetricsRegistry,
}
187
188impl Hash for Component {
189 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
190 self.namespace.name().hash(state);
191 self.name.hash(state);
192 }
193}
194
195impl PartialEq for Component {
196 fn eq(&self, other: &Self) -> bool {
197 self.namespace.name() == other.namespace.name() && self.name == other.name
198 }
199}
200
201impl Eq for Component {}
202
203impl std::fmt::Display for Component {
204 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
205 write!(f, "{}.{}", self.namespace.name(), self.name)
206 }
207}
208
209impl DistributedRuntimeProvider for Component {
210 fn drt(&self) -> &DistributedRuntime {
211 &self.drt
212 }
213}
214
215impl RuntimeProvider for Component {
216 fn rt(&self) -> &Runtime {
217 self.drt.rt()
218 }
219}
220
221impl MetricsHierarchy for Component {
222 fn basename(&self) -> String {
223 self.name.clone()
224 }
225
226 fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy> {
227 let mut parents = vec![];
228
229 parents.extend(self.namespace.parent_hierarchies());
231
232 parents.push(&self.namespace as &dyn MetricsHierarchy);
234
235 parents
236 }
237
238 fn get_metrics_registry(&self) -> &MetricsRegistry {
239 &self.metrics_registry
240 }
241
242 fn connection_id(&self) -> Option<u64> {
243 Some(self.drt.connection_id())
244 }
245}
246
247impl Component {
248 pub fn service_name(&self) -> String {
249 let service_name = format!("{}_{}", self.namespace.name(), self.name);
250 Slug::slugify(&service_name).to_string()
251 }
252
253 pub fn namespace(&self) -> &Namespace {
254 &self.namespace
255 }
256
257 pub fn name(&self) -> &str {
258 &self.name
259 }
260
261 pub fn labels(&self) -> &[(String, String)] {
262 &self.labels
263 }
264
265 pub fn endpoint(&self, endpoint: impl Into<String>) -> Endpoint {
266 let endpoint = Endpoint {
267 component: self.clone(),
268 name: endpoint.into(),
269 labels: Vec::new(),
270 metrics_registry: crate::MetricsRegistry::new(),
271 };
272 self.get_metrics_registry()
274 .add_child_registry(endpoint.get_metrics_registry());
275 endpoint
276 }
277
278 pub async fn list_instances(&self) -> anyhow::Result<Vec<Instance>> {
279 let discovery = self.drt.discovery();
280
281 let discovery_query = crate::discovery::DiscoveryQuery::ComponentEndpoints {
282 namespace: self.namespace.name(),
283 component: self.name.clone(),
284 };
285
286 let discovery_instances = discovery.list(discovery_query).await?;
287
288 let mut instances: Vec<Instance> = discovery_instances
290 .into_iter()
291 .filter_map(|di| match di {
292 crate::discovery::DiscoveryInstance::Endpoint(instance) => Some(instance),
293 _ => None, })
295 .collect();
296
297 instances.sort();
298 Ok(instances)
299 }
300}
301
302impl ComponentBuilder {
303 pub fn from_runtime(drt: Arc<DistributedRuntime>) -> Self {
304 Self::default().drt(drt)
305 }
306
307 pub fn build(self) -> Result<Component, anyhow::Error> {
308 let component = self.build_internal()?;
309 let drt = component.drt();
313 if drt.request_plane().is_nats() {
314 let mut rx = drt.register_nats_service(component.clone());
315 let result = tokio::task::block_in_place(|| rx.blocking_recv());
320 match result {
321 Some(Ok(())) => {
322 tracing::debug!(
323 component = component.service_name(),
324 "NATS service registration completed"
325 );
326 }
327 Some(Err(e)) => {
328 return Err(anyhow::anyhow!(
329 "NATS service registration failed for component '{}': {}",
330 component.service_name(),
331 e
332 ));
333 }
334 None => {
335 return Err(anyhow::anyhow!(
336 "NATS service registration channel closed unexpectedly for component '{}'",
337 component.service_name()
338 ));
339 }
340 }
341 }
342 Ok(component)
343 }
344}
345
/// A named endpoint exposed by a [`Component`].
///
/// Equality and hashing consider only the owning component and the endpoint name.
#[derive(Debug, Clone)]
pub struct Endpoint {
    /// Component that owns this endpoint.
    component: Component,

    /// Endpoint name within the component.
    name: String,

    /// Key/value labels. NOTE(review): `Component::endpoint` initializes this empty and nothing
    /// visible here populates it — confirm the intended use.
    labels: Vec<(String, String)>,

    /// Metrics registry for this endpoint. `Component::endpoint` attaches it as a child of the
    /// component's registry.
    metrics_registry: crate::MetricsRegistry,
}
360
361impl Hash for Endpoint {
362 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
363 self.component.hash(state);
364 self.name.hash(state);
365 }
366}
367
368impl PartialEq for Endpoint {
369 fn eq(&self, other: &Self) -> bool {
370 self.component == other.component && self.name == other.name
371 }
372}
373
374impl Eq for Endpoint {}
375
376impl DistributedRuntimeProvider for Endpoint {
377 fn drt(&self) -> &DistributedRuntime {
378 self.component.drt()
379 }
380}
381
382impl RuntimeProvider for Endpoint {
383 fn rt(&self) -> &Runtime {
384 self.component.rt()
385 }
386}
387
388impl MetricsHierarchy for Endpoint {
389 fn basename(&self) -> String {
390 self.name.clone()
391 }
392
393 fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy> {
394 let mut parents = vec![];
395
396 parents.extend(self.component.parent_hierarchies());
398
399 parents.push(&self.component as &dyn MetricsHierarchy);
401
402 parents
403 }
404
405 fn get_metrics_registry(&self) -> &MetricsRegistry {
406 &self.metrics_registry
407 }
408
409 fn connection_id(&self) -> Option<u64> {
410 Some(self.component.drt().connection_id())
411 }
412}
413
414impl Endpoint {
415 pub fn id(&self) -> EndpointId {
416 EndpointId {
417 namespace: self.component.namespace().name().to_string(),
418 component: self.component.name().to_string(),
419 name: self.name().to_string(),
420 }
421 }
422
423 pub fn name(&self) -> &str {
424 &self.name
425 }
426
427 pub fn component(&self) -> &Component {
428 &self.component
429 }
430
431 pub async fn client(&self) -> anyhow::Result<client::Client> {
432 client::Client::new(self.clone()).await
433 }
434
435 pub fn endpoint_builder(&self) -> endpoint::EndpointConfigBuilder {
436 endpoint::EndpointConfigBuilder::from_endpoint(self.clone())
437 }
438}
439
/// A (possibly nested) namespace bound to a [`DistributedRuntime`].
///
/// The `name` field holds only this level's name; `Namespace::name()` returns the dotted path
/// through all parents.
#[derive(Builder, Clone, Validate)]
#[builder(pattern = "owned")]
pub struct Namespace {
    /// Distributed runtime handle (the builder setter is private).
    #[builder(private)]
    runtime: Arc<DistributedRuntime>,

    /// This level's name, constrained by `validate_allowed_chars` when `Validate::validate`
    /// runs. NOTE(review): the builder calls in this file do not call `validate()`, so confirm
    /// where this constraint is enforced.
    #[validate(custom(function = "validate_allowed_chars"))]
    name: String,

    /// Enclosing namespace, or `None` for a root namespace.
    #[builder(default = "None")]
    parent: Option<Arc<Namespace>>,

    /// Key/value labels; empty unless set on the builder.
    #[builder(default = "Vec::new()")]
    labels: Vec<(String, String)>,

    /// Metrics registry for this namespace; a fresh registry unless set on the builder.
    #[builder(default = "crate::MetricsRegistry::new()")]
    metrics_registry: crate::MetricsRegistry,

    /// Components created via `Namespace::component`, keyed by component name.
    /// All clones of this namespace share it through the `Arc`; a child namespace created by
    /// `Namespace::namespace` gets its own fresh cache.
    #[builder(default = "Arc::new(DashMap::new())")]
    component_cache: Arc<DashMap<String, Component>>,
}
467
468impl DistributedRuntimeProvider for Namespace {
469 fn drt(&self) -> &DistributedRuntime {
470 &self.runtime
471 }
472}
473
474impl std::fmt::Debug for Namespace {
475 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
476 write!(
477 f,
478 "Namespace {{ name: {}; parent: {:?} }}",
479 self.name, self.parent
480 )
481 }
482}
483
484impl RuntimeProvider for Namespace {
485 fn rt(&self) -> &Runtime {
486 self.runtime.rt()
487 }
488}
489
490impl std::fmt::Display for Namespace {
491 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
492 write!(f, "{}", self.name)
493 }
494}
495
496impl Namespace {
497 pub(crate) fn new(runtime: DistributedRuntime, name: String) -> anyhow::Result<Self> {
498 let ns = NamespaceBuilder::default()
499 .runtime(Arc::new(runtime))
500 .name(name)
501 .build()?;
502 ns.drt()
504 .get_metrics_registry()
505 .add_child_registry(ns.get_metrics_registry());
506 Ok(ns)
507 }
508
509 pub fn component(&self, name: impl Into<String>) -> anyhow::Result<Component> {
515 let name = name.into();
516
517 if let Some(cached) = self.component_cache.get(&name) {
520 return Ok(cached.value().clone());
521 }
522
523 let component = ComponentBuilder::from_runtime(self.runtime.clone())
525 .name(&name)
526 .namespace(self.clone())
527 .build()?;
528
529 self.get_metrics_registry()
531 .add_child_registry(component.get_metrics_registry());
532
533 self.component_cache.insert(name, component.clone());
537
538 Ok(component)
539 }
540
541 pub fn namespace(&self, name: impl Into<String>) -> anyhow::Result<Namespace> {
543 let child = NamespaceBuilder::default()
544 .runtime(self.runtime.clone())
545 .name(name.into())
546 .parent(Some(Arc::new(self.clone())))
547 .build()?;
548 self.get_metrics_registry()
550 .add_child_registry(child.get_metrics_registry());
551 Ok(child)
552 }
553
554 pub fn name(&self) -> String {
555 match &self.parent {
556 Some(parent) => format!("{}.{}", parent.name(), self.name),
557 None => self.name.clone(),
558 }
559 }
560}
561
562fn validate_allowed_chars(input: &str) -> Result<(), ValidationError> {
564 let regex = regex::Regex::new(r"^[a-z0-9-_]+$").unwrap();
566
567 if regex.is_match(input) {
568 Ok(())
569 } else {
570 Err(ValidationError::new("invalid_characters"))
571 }
572}