1use std::fmt;
33
34use crate::{
35 config::HealthStatus,
36 distributed::RequestPlaneMode,
37 metrics::{MetricsHierarchy, MetricsRegistry, prometheus_names},
38 service::ServiceClient,
39 service::ServiceSet,
40};
41
42use super::{DistributedRuntime, Runtime, traits::*, transports::nats::Slug, utils::Duration};
43
44use crate::pipeline::network::{PushWorkHandler, ingress::push_endpoint::PushEndpoint};
45use crate::protocols::EndpointId;
46use async_nats::{
47 rustls::quic,
48 service::{Service, ServiceExt},
49};
50use dashmap::DashMap;
51use derive_builder::Builder;
52use derive_getters::Getters;
53use educe::Educe;
54use serde::{Deserialize, Serialize};
55use std::{collections::HashMap, hash::Hash, sync::Arc};
56use validator::{Validate, ValidationError};
57
58mod client;
59#[allow(clippy::module_inception)]
60mod component;
61mod endpoint;
62mod namespace;
63mod registry;
64pub mod service;
65
66pub use client::Client;
67pub use endpoint::build_transport_type;
68
69#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Hash)]
70#[serde(rename_all = "snake_case")]
71pub enum TransportType {
72 #[serde(rename = "nats_tcp")]
73 Nats(String),
74 Http(String),
75 Tcp(String),
76}
77
78#[derive(Default)]
79pub struct RegistryInner {
80 pub(crate) services: HashMap<String, Service>,
81}
82
83#[derive(Clone)]
84pub struct Registry {
85 pub(crate) inner: Arc<tokio::sync::Mutex<RegistryInner>>,
86}
87
88#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
89pub struct Instance {
90 pub component: String,
91 pub endpoint: String,
92 pub namespace: String,
93 pub instance_id: u64,
94 pub transport: TransportType,
95}
96
97impl Instance {
98 pub fn id(&self) -> u64 {
99 self.instance_id
100 }
101 pub fn endpoint_id(&self) -> EndpointId {
102 EndpointId {
103 namespace: self.namespace.clone(),
104 component: self.component.clone(),
105 name: self.endpoint.clone(),
106 }
107 }
108}
109
110impl fmt::Display for Instance {
111 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
112 write!(
113 f,
114 "{}/{}/{}/{}",
115 self.namespace, self.component, self.endpoint, self.instance_id
116 )
117 }
118}
119
120impl std::cmp::Ord for Instance {
122 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
123 self.to_string().cmp(&other.to_string())
124 }
125}
126
127impl PartialOrd for Instance {
128 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
129 Some(self.cmp(other))
131 }
132}
133
134#[derive(Educe, Builder, Clone, Validate)]
140#[educe(Debug)]
141#[builder(pattern = "owned", build_fn(private, name = "build_internal"))]
142pub struct Component {
143 #[builder(private)]
144 #[educe(Debug(ignore))]
145 drt: Arc<DistributedRuntime>,
146
147 #[builder(setter(into))]
149 #[validate(custom(function = "validate_allowed_chars"))]
150 name: String,
151
152 #[builder(default = "Vec::new()")]
154 labels: Vec<(String, String)>,
155
156 #[builder(setter(into))]
159 namespace: Namespace,
160
161 #[builder(default = "crate::MetricsRegistry::new()")]
163 metrics_registry: crate::MetricsRegistry,
164}
165
166impl Hash for Component {
167 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
168 self.namespace.name().hash(state);
169 self.name.hash(state);
170 }
171}
172
173impl PartialEq for Component {
174 fn eq(&self, other: &Self) -> bool {
175 self.namespace.name() == other.namespace.name() && self.name == other.name
176 }
177}
178
179impl Eq for Component {}
180
181impl std::fmt::Display for Component {
182 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
183 write!(f, "{}.{}", self.namespace.name(), self.name)
184 }
185}
186
187impl DistributedRuntimeProvider for Component {
188 fn drt(&self) -> &DistributedRuntime {
189 &self.drt
190 }
191}
192
193impl RuntimeProvider for Component {
194 fn rt(&self) -> &Runtime {
195 self.drt.rt()
196 }
197}
198
199impl MetricsHierarchy for Component {
200 fn basename(&self) -> String {
201 self.name.clone()
202 }
203
204 fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy> {
205 let mut parents = vec![];
206
207 parents.extend(self.namespace.parent_hierarchies());
209
210 parents.push(&self.namespace as &dyn MetricsHierarchy);
212
213 parents
214 }
215
216 fn get_metrics_registry(&self) -> &MetricsRegistry {
217 &self.metrics_registry
218 }
219}
220
221impl Component {
222 pub fn service_name(&self) -> String {
223 let service_name = format!("{}_{}", self.namespace.name(), self.name);
224 Slug::slugify(&service_name).to_string()
225 }
226
227 pub fn namespace(&self) -> &Namespace {
228 &self.namespace
229 }
230
231 pub fn name(&self) -> &str {
232 &self.name
233 }
234
235 pub fn labels(&self) -> &[(String, String)] {
236 &self.labels
237 }
238
239 pub fn endpoint(&self, endpoint: impl Into<String>) -> Endpoint {
240 let endpoint = Endpoint {
241 component: self.clone(),
242 name: endpoint.into(),
243 labels: Vec::new(),
244 metrics_registry: crate::MetricsRegistry::new(),
245 };
246 self.get_metrics_registry()
248 .add_child_registry(endpoint.get_metrics_registry());
249 endpoint
250 }
251
252 pub async fn list_instances(&self) -> anyhow::Result<Vec<Instance>> {
253 let discovery = self.drt.discovery();
254
255 let discovery_query = crate::discovery::DiscoveryQuery::ComponentEndpoints {
256 namespace: self.namespace.name(),
257 component: self.name.clone(),
258 };
259
260 let discovery_instances = discovery.list(discovery_query).await?;
261
262 let mut instances: Vec<Instance> = discovery_instances
264 .into_iter()
265 .filter_map(|di| match di {
266 crate::discovery::DiscoveryInstance::Endpoint(instance) => Some(instance),
267 _ => None, })
269 .collect();
270
271 instances.sort();
272 Ok(instances)
273 }
274}
275
276impl ComponentBuilder {
277 pub fn from_runtime(drt: Arc<DistributedRuntime>) -> Self {
278 Self::default().drt(drt)
279 }
280
281 pub fn build(self) -> Result<Component, anyhow::Error> {
282 let component = self.build_internal()?;
283 let drt = component.drt();
287 if drt.request_plane().is_nats() {
288 let mut rx = drt.register_nats_service(component.clone());
289 let result = tokio::task::block_in_place(|| rx.blocking_recv());
294 match result {
295 Some(Ok(())) => {
296 tracing::debug!(
297 component = component.service_name(),
298 "NATS service registration completed"
299 );
300 }
301 Some(Err(e)) => {
302 return Err(anyhow::anyhow!(
303 "NATS service registration failed for component '{}': {}",
304 component.service_name(),
305 e
306 ));
307 }
308 None => {
309 return Err(anyhow::anyhow!(
310 "NATS service registration channel closed unexpectedly for component '{}'",
311 component.service_name()
312 ));
313 }
314 }
315 }
316 Ok(component)
317 }
318}
319
320#[derive(Debug, Clone)]
321pub struct Endpoint {
322 component: Component,
323
324 name: String,
327
328 labels: Vec<(String, String)>,
330
331 metrics_registry: crate::MetricsRegistry,
333}
334
335impl Hash for Endpoint {
336 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
337 self.component.hash(state);
338 self.name.hash(state);
339 }
340}
341
342impl PartialEq for Endpoint {
343 fn eq(&self, other: &Self) -> bool {
344 self.component == other.component && self.name == other.name
345 }
346}
347
348impl Eq for Endpoint {}
349
350impl DistributedRuntimeProvider for Endpoint {
351 fn drt(&self) -> &DistributedRuntime {
352 self.component.drt()
353 }
354}
355
356impl RuntimeProvider for Endpoint {
357 fn rt(&self) -> &Runtime {
358 self.component.rt()
359 }
360}
361
362impl MetricsHierarchy for Endpoint {
363 fn basename(&self) -> String {
364 self.name.clone()
365 }
366
367 fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy> {
368 let mut parents = vec![];
369
370 parents.extend(self.component.parent_hierarchies());
372
373 parents.push(&self.component as &dyn MetricsHierarchy);
375
376 parents
377 }
378
379 fn get_metrics_registry(&self) -> &MetricsRegistry {
380 &self.metrics_registry
381 }
382}
383
384impl Endpoint {
385 pub fn id(&self) -> EndpointId {
386 EndpointId {
387 namespace: self.component.namespace().name().to_string(),
388 component: self.component.name().to_string(),
389 name: self.name().to_string(),
390 }
391 }
392
393 pub fn name(&self) -> &str {
394 &self.name
395 }
396
397 pub fn component(&self) -> &Component {
398 &self.component
399 }
400
401 pub async fn client(&self) -> anyhow::Result<client::Client> {
402 client::Client::new(self.clone()).await
403 }
404
405 pub fn endpoint_builder(&self) -> endpoint::EndpointConfigBuilder {
406 endpoint::EndpointConfigBuilder::from_endpoint(self.clone())
407 }
408}
409
410#[derive(Builder, Clone, Validate)]
411#[builder(pattern = "owned")]
412pub struct Namespace {
413 #[builder(private)]
414 runtime: Arc<DistributedRuntime>,
415
416 #[validate(custom(function = "validate_allowed_chars"))]
417 name: String,
418
419 #[builder(default = "None")]
420 parent: Option<Arc<Namespace>>,
421
422 #[builder(default = "Vec::new()")]
424 labels: Vec<(String, String)>,
425
426 #[builder(default = "crate::MetricsRegistry::new()")]
428 metrics_registry: crate::MetricsRegistry,
429
430 #[builder(default = "Arc::new(DashMap::new())")]
435 component_cache: Arc<DashMap<String, Component>>,
436}
437
438impl DistributedRuntimeProvider for Namespace {
439 fn drt(&self) -> &DistributedRuntime {
440 &self.runtime
441 }
442}
443
444impl std::fmt::Debug for Namespace {
445 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
446 write!(
447 f,
448 "Namespace {{ name: {}; parent: {:?} }}",
449 self.name, self.parent
450 )
451 }
452}
453
454impl RuntimeProvider for Namespace {
455 fn rt(&self) -> &Runtime {
456 self.runtime.rt()
457 }
458}
459
460impl std::fmt::Display for Namespace {
461 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
462 write!(f, "{}", self.name)
463 }
464}
465
466impl Namespace {
467 pub(crate) fn new(runtime: DistributedRuntime, name: String) -> anyhow::Result<Self> {
468 let ns = NamespaceBuilder::default()
469 .runtime(Arc::new(runtime))
470 .name(name)
471 .build()?;
472 ns.drt()
474 .get_metrics_registry()
475 .add_child_registry(ns.get_metrics_registry());
476 Ok(ns)
477 }
478
479 pub fn component(&self, name: impl Into<String>) -> anyhow::Result<Component> {
485 let name = name.into();
486
487 if let Some(cached) = self.component_cache.get(&name) {
490 return Ok(cached.value().clone());
491 }
492
493 let component = ComponentBuilder::from_runtime(self.runtime.clone())
495 .name(&name)
496 .namespace(self.clone())
497 .build()?;
498
499 self.get_metrics_registry()
501 .add_child_registry(component.get_metrics_registry());
502
503 self.component_cache.insert(name, component.clone());
507
508 Ok(component)
509 }
510
511 pub fn namespace(&self, name: impl Into<String>) -> anyhow::Result<Namespace> {
513 let child = NamespaceBuilder::default()
514 .runtime(self.runtime.clone())
515 .name(name.into())
516 .parent(Some(Arc::new(self.clone())))
517 .build()?;
518 self.get_metrics_registry()
520 .add_child_registry(child.get_metrics_registry());
521 Ok(child)
522 }
523
524 pub fn name(&self) -> String {
525 match &self.parent {
526 Some(parent) => format!("{}.{}", parent.name(), self.name),
527 None => self.name.clone(),
528 }
529 }
530}
531
532fn validate_allowed_chars(input: &str) -> Result<(), ValidationError> {
534 let regex = regex::Regex::new(r"^[a-z0-9-_]+$").unwrap();
536
537 if regex.is_match(input) {
538 Ok(())
539 } else {
540 Err(ValidationError::new("invalid_characters"))
541 }
542}