1use std::fmt;
33
34use crate::{
35 config::HealthStatus,
36 distributed::RequestPlaneMode,
37 metrics::{MetricsHierarchy, MetricsRegistry, prometheus_names},
38 service::ServiceClient,
39 service::ServiceSet,
40};
41
42use super::{DistributedRuntime, Runtime, traits::*, transports::nats::Slug, utils::Duration};
43
44use crate::pipeline::network::{PushWorkHandler, ingress::push_endpoint::PushEndpoint};
45use crate::protocols::EndpointId;
46use async_nats::{
47 rustls::quic,
48 service::{Service, ServiceExt},
49};
50use derive_builder::Builder;
51use derive_getters::Getters;
52use educe::Educe;
53use serde::{Deserialize, Serialize};
54use std::{collections::HashMap, hash::Hash, sync::Arc};
55use validator::{Validate, ValidationError};
56
57mod client;
58#[allow(clippy::module_inception)]
59mod component;
60mod endpoint;
61mod namespace;
62mod registry;
63pub mod service;
64
65pub use client::Client;
66pub use endpoint::build_transport_type;
67
68#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Hash)]
69#[serde(rename_all = "snake_case")]
70pub enum TransportType {
71 #[serde(rename = "nats_tcp")]
72 Nats(String),
73 Http(String),
74 Tcp(String),
75}
76
77#[derive(Default)]
78pub struct RegistryInner {
79 pub(crate) services: HashMap<String, Service>,
80}
81
82#[derive(Clone)]
83pub struct Registry {
84 pub(crate) inner: Arc<tokio::sync::Mutex<RegistryInner>>,
85}
86
87#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
88pub struct Instance {
89 pub component: String,
90 pub endpoint: String,
91 pub namespace: String,
92 pub instance_id: u64,
93 pub transport: TransportType,
94}
95
96impl Instance {
97 pub fn id(&self) -> u64 {
98 self.instance_id
99 }
100 pub fn endpoint_id(&self) -> EndpointId {
101 EndpointId {
102 namespace: self.namespace.clone(),
103 component: self.component.clone(),
104 name: self.endpoint.clone(),
105 }
106 }
107}
108
109impl fmt::Display for Instance {
110 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
111 write!(
112 f,
113 "{}/{}/{}/{}",
114 self.namespace, self.component, self.endpoint, self.instance_id
115 )
116 }
117}
118
119impl std::cmp::Ord for Instance {
121 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
122 self.to_string().cmp(&other.to_string())
123 }
124}
125
126impl PartialOrd for Instance {
127 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
128 Some(self.cmp(other))
130 }
131}
132
133#[derive(Educe, Builder, Clone, Validate)]
139#[educe(Debug)]
140#[builder(pattern = "owned", build_fn(private, name = "build_internal"))]
141pub struct Component {
142 #[builder(private)]
143 #[educe(Debug(ignore))]
144 drt: Arc<DistributedRuntime>,
145
146 #[builder(setter(into))]
148 #[validate(custom(function = "validate_allowed_chars"))]
149 name: String,
150
151 #[builder(default = "Vec::new()")]
153 labels: Vec<(String, String)>,
154
155 #[builder(setter(into))]
158 namespace: Namespace,
159
160 #[builder(default = "crate::MetricsRegistry::new()")]
162 metrics_registry: crate::MetricsRegistry,
163}
164
165impl Hash for Component {
166 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
167 self.namespace.name().hash(state);
168 self.name.hash(state);
169 }
170}
171
172impl PartialEq for Component {
173 fn eq(&self, other: &Self) -> bool {
174 self.namespace.name() == other.namespace.name() && self.name == other.name
175 }
176}
177
178impl Eq for Component {}
179
180impl std::fmt::Display for Component {
181 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
182 write!(f, "{}.{}", self.namespace.name(), self.name)
183 }
184}
185
186impl DistributedRuntimeProvider for Component {
187 fn drt(&self) -> &DistributedRuntime {
188 &self.drt
189 }
190}
191
192impl RuntimeProvider for Component {
193 fn rt(&self) -> &Runtime {
194 self.drt.rt()
195 }
196}
197
198impl MetricsHierarchy for Component {
199 fn basename(&self) -> String {
200 self.name.clone()
201 }
202
203 fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy> {
204 let mut parents = vec![];
205
206 parents.extend(self.namespace.parent_hierarchies());
208
209 parents.push(&self.namespace as &dyn MetricsHierarchy);
211
212 parents
213 }
214
215 fn get_metrics_registry(&self) -> &MetricsRegistry {
216 &self.metrics_registry
217 }
218}
219
220impl Component {
221 pub fn service_name(&self) -> String {
222 let service_name = format!("{}_{}", self.namespace.name(), self.name);
223 Slug::slugify(&service_name).to_string()
224 }
225
226 pub fn namespace(&self) -> &Namespace {
227 &self.namespace
228 }
229
230 pub fn name(&self) -> &str {
231 &self.name
232 }
233
234 pub fn labels(&self) -> &[(String, String)] {
235 &self.labels
236 }
237
238 pub fn endpoint(&self, endpoint: impl Into<String>) -> Endpoint {
239 let endpoint = Endpoint {
240 component: self.clone(),
241 name: endpoint.into(),
242 labels: Vec::new(),
243 metrics_registry: crate::MetricsRegistry::new(),
244 };
245 self.get_metrics_registry()
247 .add_child_registry(endpoint.get_metrics_registry());
248 endpoint
249 }
250
251 pub async fn list_instances(&self) -> anyhow::Result<Vec<Instance>> {
252 let discovery = self.drt.discovery();
253
254 let discovery_query = crate::discovery::DiscoveryQuery::ComponentEndpoints {
255 namespace: self.namespace.name(),
256 component: self.name.clone(),
257 };
258
259 let discovery_instances = discovery.list(discovery_query).await?;
260
261 let mut instances: Vec<Instance> = discovery_instances
263 .into_iter()
264 .filter_map(|di| match di {
265 crate::discovery::DiscoveryInstance::Endpoint(instance) => Some(instance),
266 _ => None, })
268 .collect();
269
270 instances.sort();
271 Ok(instances)
272 }
273}
274
275impl ComponentBuilder {
276 pub fn from_runtime(drt: Arc<DistributedRuntime>) -> Self {
277 Self::default().drt(drt)
278 }
279
280 pub fn build(self) -> Result<Component, anyhow::Error> {
281 let component = self.build_internal()?;
282 let drt = component.drt();
286 if drt.request_plane().is_nats() {
287 let mut rx = drt.register_nats_service(component.clone());
288 let result = tokio::task::block_in_place(|| rx.blocking_recv());
293 match result {
294 Some(Ok(())) => {
295 tracing::debug!(
296 component = component.service_name(),
297 "NATS service registration completed"
298 );
299 }
300 Some(Err(e)) => {
301 return Err(anyhow::anyhow!(
302 "NATS service registration failed for component '{}': {}",
303 component.service_name(),
304 e
305 ));
306 }
307 None => {
308 return Err(anyhow::anyhow!(
309 "NATS service registration channel closed unexpectedly for component '{}'",
310 component.service_name()
311 ));
312 }
313 }
314 }
315 Ok(component)
316 }
317}
318
319#[derive(Debug, Clone)]
320pub struct Endpoint {
321 component: Component,
322
323 name: String,
326
327 labels: Vec<(String, String)>,
329
330 metrics_registry: crate::MetricsRegistry,
332}
333
334impl Hash for Endpoint {
335 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
336 self.component.hash(state);
337 self.name.hash(state);
338 }
339}
340
341impl PartialEq for Endpoint {
342 fn eq(&self, other: &Self) -> bool {
343 self.component == other.component && self.name == other.name
344 }
345}
346
347impl Eq for Endpoint {}
348
349impl DistributedRuntimeProvider for Endpoint {
350 fn drt(&self) -> &DistributedRuntime {
351 self.component.drt()
352 }
353}
354
355impl RuntimeProvider for Endpoint {
356 fn rt(&self) -> &Runtime {
357 self.component.rt()
358 }
359}
360
361impl MetricsHierarchy for Endpoint {
362 fn basename(&self) -> String {
363 self.name.clone()
364 }
365
366 fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy> {
367 let mut parents = vec![];
368
369 parents.extend(self.component.parent_hierarchies());
371
372 parents.push(&self.component as &dyn MetricsHierarchy);
374
375 parents
376 }
377
378 fn get_metrics_registry(&self) -> &MetricsRegistry {
379 &self.metrics_registry
380 }
381}
382
383impl Endpoint {
384 pub fn id(&self) -> EndpointId {
385 EndpointId {
386 namespace: self.component.namespace().name().to_string(),
387 component: self.component.name().to_string(),
388 name: self.name().to_string(),
389 }
390 }
391
392 pub fn name(&self) -> &str {
393 &self.name
394 }
395
396 pub fn component(&self) -> &Component {
397 &self.component
398 }
399
400 pub async fn client(&self) -> anyhow::Result<client::Client> {
401 client::Client::new(self.clone()).await
402 }
403
404 pub fn endpoint_builder(&self) -> endpoint::EndpointConfigBuilder {
405 endpoint::EndpointConfigBuilder::from_endpoint(self.clone())
406 }
407}
408
409#[derive(Builder, Clone, Validate)]
410#[builder(pattern = "owned")]
411pub struct Namespace {
412 #[builder(private)]
413 runtime: Arc<DistributedRuntime>,
414
415 #[validate(custom(function = "validate_allowed_chars"))]
416 name: String,
417
418 #[builder(default = "None")]
419 parent: Option<Arc<Namespace>>,
420
421 #[builder(default = "Vec::new()")]
423 labels: Vec<(String, String)>,
424
425 #[builder(default = "crate::MetricsRegistry::new()")]
427 metrics_registry: crate::MetricsRegistry,
428}
429
430impl DistributedRuntimeProvider for Namespace {
431 fn drt(&self) -> &DistributedRuntime {
432 &self.runtime
433 }
434}
435
436impl std::fmt::Debug for Namespace {
437 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
438 write!(
439 f,
440 "Namespace {{ name: {}; parent: {:?} }}",
441 self.name, self.parent
442 )
443 }
444}
445
446impl RuntimeProvider for Namespace {
447 fn rt(&self) -> &Runtime {
448 self.runtime.rt()
449 }
450}
451
452impl std::fmt::Display for Namespace {
453 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
454 write!(f, "{}", self.name)
455 }
456}
457
458impl Namespace {
459 pub(crate) fn new(runtime: DistributedRuntime, name: String) -> anyhow::Result<Self> {
460 let ns = NamespaceBuilder::default()
461 .runtime(Arc::new(runtime))
462 .name(name)
463 .build()?;
464 ns.drt()
466 .get_metrics_registry()
467 .add_child_registry(ns.get_metrics_registry());
468 Ok(ns)
469 }
470
471 pub fn component(&self, name: impl Into<String>) -> anyhow::Result<Component> {
473 let component = ComponentBuilder::from_runtime(self.runtime.clone())
474 .name(name)
475 .namespace(self.clone())
476 .build()?;
477 self.get_metrics_registry()
479 .add_child_registry(component.get_metrics_registry());
480 Ok(component)
481 }
482
483 pub fn namespace(&self, name: impl Into<String>) -> anyhow::Result<Namespace> {
485 let child = NamespaceBuilder::default()
486 .runtime(self.runtime.clone())
487 .name(name.into())
488 .parent(Some(Arc::new(self.clone())))
489 .build()?;
490 self.get_metrics_registry()
492 .add_child_registry(child.get_metrics_registry());
493 Ok(child)
494 }
495
496 pub fn name(&self) -> String {
497 match &self.parent {
498 Some(parent) => format!("{}.{}", parent.name(), self.name),
499 None => self.name.clone(),
500 }
501 }
502}
503
504fn validate_allowed_chars(input: &str) -> Result<(), ValidationError> {
506 let regex = regex::Regex::new(r"^[a-z0-9-_]+$").unwrap();
508
509 if regex.is_match(input) {
510 Ok(())
511 } else {
512 Err(ValidationError::new("invalid_characters"))
513 }
514}