1use std::fmt;
33
34use crate::{
35 config::{HealthStatus, RequestPlaneMode},
36 metrics::{MetricsHierarchy, MetricsRegistry, prometheus_names},
37 service::ServiceSet,
38 transports::etcd::{ETCD_ROOT_PATH, EtcdPath},
39};
40
41use super::{
42 DistributedRuntime, Runtime,
43 traits::*,
44 transports::etcd::{COMPONENT_KEYWORD, ENDPOINT_KEYWORD},
45 transports::nats::Slug,
46 utils::Duration,
47};
48
49use crate::pipeline::network::{PushWorkHandler, ingress::push_endpoint::PushEndpoint};
50use crate::protocols::EndpointId;
51use crate::service::ComponentNatsServerPrometheusMetrics;
52use async_nats::{
53 rustls::quic,
54 service::{Service, ServiceExt},
55};
56use derive_builder::Builder;
57use derive_getters::Getters;
58use educe::Educe;
59use serde::{Deserialize, Serialize};
60use service::EndpointStatsHandler;
61use std::{collections::HashMap, hash::Hash, sync::Arc};
62use validator::{Validate, ValidationError};
63
64mod client;
65#[allow(clippy::module_inception)]
66mod component;
67mod endpoint;
68mod namespace;
69mod registry;
70pub mod service;
71
72pub use client::{Client, InstanceSource};
73
/// Prefix of instance paths: `Component::instance_root` builds
/// `{INSTANCE_ROOT_PATH}/{namespace}/{component}` and `Endpoint::etcd_path_with_lease_id`
/// builds `{INSTANCE_ROOT_PATH}/{namespace}/{component}/{endpoint}/{lease_id:x}`.
pub const INSTANCE_ROOT_PATH: &str = "v1/instances";
77
/// Transport used to reach an [`Instance`].
///
/// Serialized with serde's default (externally tagged) enum representation, with variant
/// names in snake_case except `Nats`, which is renamed to `nats_tcp`.
/// NOTE(review): the `String` payload is presumably the address/subject for that
/// transport — confirm against the code that constructs these values.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Hash)]
#[serde(rename_all = "snake_case")]
pub enum TransportType {
    /// NATS transport; serialized under the tag `"nats_tcp"`.
    #[serde(rename = "nats_tcp")]
    Nats(String),
    /// HTTP transport; serialized under the tag `"http"`.
    Http(String),
    /// TCP transport; serialized under the tag `"tcp"`.
    Tcp(String),
}
86
/// State behind [`Registry`]'s lock.
#[derive(Default)]
pub struct RegistryInner {
    /// NATS services keyed by service name (`Component::add_stats_service` inserts under
    /// `Component::service_name`).
    pub(crate) services: HashMap<String, Service>,
    /// Stats handlers registered alongside each service, keyed by the same service name.
    /// NOTE(review): the inner map is presumably keyed by endpoint name — confirm in the
    /// `service` module.
    pub(crate) stats_handlers:
        HashMap<String, Arc<parking_lot::Mutex<HashMap<String, EndpointStatsHandler>>>>,
}
93
/// Registry of component NATS services and their stats handlers.
///
/// Cloning shares the same underlying state (the `inner` field is an `Arc`).
#[derive(Clone)]
pub struct Registry {
    /// Shared state behind an async (tokio) mutex.
    pub(crate) inner: Arc<tokio::sync::Mutex<RegistryInner>>,
    /// NOTE(review): not read anywhere in this section — confirm its purpose where the
    /// registry is constructed.
    is_static: bool,
}
99
/// One registered endpoint instance, as returned by discovery (see
/// `Component::list_instances`).
///
/// Displayed as `namespace/component/endpoint/instance_id`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct Instance {
    /// Component name.
    pub component: String,
    /// Endpoint name.
    pub endpoint: String,
    /// Namespace name.
    pub namespace: String,
    /// Numeric id of this instance (returned by `Instance::id`).
    pub instance_id: u64,
    /// How to reach this instance.
    pub transport: TransportType,
}
108
109impl Instance {
110 pub fn id(&self) -> u64 {
111 self.instance_id
112 }
113 pub fn endpoint_id(&self) -> EndpointId {
114 EndpointId {
115 namespace: self.namespace.clone(),
116 component: self.component.clone(),
117 name: self.endpoint.clone(),
118 }
119 }
120}
121
122impl fmt::Display for Instance {
123 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
124 write!(
125 f,
126 "{}/{}/{}/{}",
127 self.namespace, self.component, self.endpoint, self.instance_id
128 )
129 }
130}
131
132impl std::cmp::Ord for Instance {
134 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
135 self.to_string().cmp(&other.to_string())
136 }
137}
138
139impl PartialOrd for Instance {
140 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
141 Some(self.cmp(other))
143 }
144}
145
/// A named component inside a [`Namespace`], bound to a [`DistributedRuntime`].
///
/// Equality and hashing consider only the full namespace name, the component name and
/// `is_static`; the runtime handle, labels and metrics registry are ignored.
///
/// NOTE(review): `Validate` is derived, but there is no `build_fn(validate = ...)` on the
/// builder, so `ComponentBuilder::build()` does not run `validate_allowed_chars`. The
/// check only happens where `validate()` is called explicitly — confirm callers do so.
#[derive(Educe, Builder, Clone, Validate)]
#[educe(Debug)]
#[builder(pattern = "owned")]
pub struct Component {
    /// Runtime handle. The builder setter is private (use `ComponentBuilder::from_runtime`)
    /// and the field is omitted from `Debug` output.
    #[builder(private)]
    #[educe(Debug(ignore))]
    drt: Arc<DistributedRuntime>,

    /// Component name (single segment); validated by `validate_allowed_chars` when
    /// `validate()` is called.
    #[builder(setter(into))]
    #[validate(custom(function = "validate_allowed_chars"))]
    name: String,

    /// Key/value labels; empty unless set on the builder.
    #[builder(default = "Vec::new()")]
    labels: Vec<(String, String)>,

    /// Owning namespace.
    #[builder(setter(into))]
    namespace: Namespace,

    /// Static/dynamic flag. It is copied to endpoints created by `Component::endpoint`,
    /// where it selects the static vs. dynamic client, path and name.
    is_static: bool,

    /// Metrics registry for this component; defaults to `crate::MetricsRegistry::new()`.
    #[builder(default = "crate::MetricsRegistry::new()")]
    metrics_registry: crate::MetricsRegistry,
}
182
183impl Hash for Component {
184 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
185 self.namespace.name().hash(state);
186 self.name.hash(state);
187 self.is_static.hash(state);
188 }
189}
190
191impl PartialEq for Component {
192 fn eq(&self, other: &Self) -> bool {
193 self.namespace.name() == other.namespace.name()
194 && self.name == other.name
195 && self.is_static == other.is_static
196 }
197}
198
199impl Eq for Component {}
200
201impl std::fmt::Display for Component {
202 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
203 write!(f, "{}.{}", self.namespace.name(), self.name)
204 }
205}
206
207impl DistributedRuntimeProvider for Component {
208 fn drt(&self) -> &DistributedRuntime {
209 &self.drt
210 }
211}
212
213impl RuntimeProvider for Component {
214 fn rt(&self) -> &Runtime {
215 self.drt.rt()
216 }
217}
218
219impl MetricsHierarchy for Component {
220 fn basename(&self) -> String {
221 self.name.clone()
222 }
223
224 fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy> {
225 let mut parents = vec![];
226
227 parents.extend(self.namespace.parent_hierarchies());
229
230 parents.push(&self.namespace as &dyn MetricsHierarchy);
232
233 parents
234 }
235
236 fn get_metrics_registry(&self) -> &MetricsRegistry {
237 &self.metrics_registry
238 }
239}
240
241impl Component {
242 pub fn instance_root(&self) -> String {
244 let ns = self.namespace.name();
245 let cp = &self.name;
246 format!("{INSTANCE_ROOT_PATH}/{ns}/{cp}")
247 }
248
249 pub fn service_name(&self) -> String {
250 let service_name = format!("{}_{}", self.namespace.name(), self.name);
251 Slug::slugify(&service_name).to_string()
252 }
253
254 pub fn path(&self) -> String {
255 format!("{}/{}", self.namespace.name(), self.name)
256 }
257
258 pub fn etcd_path(&self) -> EtcdPath {
259 EtcdPath::new_component(&self.namespace.name(), &self.name)
260 .expect("Component name and namespace should be valid")
261 }
262
263 pub fn namespace(&self) -> &Namespace {
264 &self.namespace
265 }
266
267 pub fn name(&self) -> &str {
268 &self.name
269 }
270
271 pub fn labels(&self) -> &[(String, String)] {
272 &self.labels
273 }
274
275 pub fn endpoint(&self, endpoint: impl Into<String>) -> Endpoint {
276 Endpoint {
277 component: self.clone(),
278 name: endpoint.into(),
279 is_static: self.is_static,
280 labels: Vec::new(),
281 metrics_registry: crate::MetricsRegistry::new(),
282 }
283 }
284
285 pub async fn list_instances(&self) -> anyhow::Result<Vec<Instance>> {
286 let discovery = self.drt.discovery();
287
288 let discovery_query = crate::discovery::DiscoveryQuery::ComponentEndpoints {
289 namespace: self.namespace.name(),
290 component: self.name.clone(),
291 };
292
293 let discovery_instances = discovery.list(discovery_query).await?;
294
295 let mut instances: Vec<Instance> = discovery_instances
297 .into_iter()
298 .filter_map(|di| match di {
299 crate::discovery::DiscoveryInstance::Endpoint(instance) => Some(instance),
300 _ => None, })
302 .collect();
303
304 instances.sort();
305 Ok(instances)
306 }
307
308 pub async fn scrape_stats(&self, timeout: Duration) -> anyhow::Result<ServiceSet> {
311 let service_name = self.service_name();
313 let Some(service_client) = self.drt().service_client() else {
314 anyhow::bail!("ServiceSet is gathered via NATS, do not call this in non-NATS setups.");
315 };
316 service_client
317 .collect_services(&service_name, timeout)
318 .await
319 }
320
321 pub fn start_scraping_nats_service_component_metrics(&self) -> anyhow::Result<()> {
329 const MAX_WAIT_MS: std::time::Duration = std::time::Duration::from_millis(9800); let component_metrics = ComponentNatsServerPrometheusMetrics::new(self)?;
333
334 let component_clone = self.clone();
335
336 let m = component_metrics.clone();
338 let c = component_clone.clone();
339
340 c.drt().runtime().secondary().spawn(async move {
349 let timeout = std::time::Duration::from_millis(500);
350 let mut interval = tokio::time::interval(MAX_WAIT_MS);
351 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
352
353 loop {
354 match c.scrape_stats(timeout).await {
355 Ok(service_set) => {
356 m.update_from_service_set(&service_set);
357 }
358 Err(err) => {
359 tracing::error!(
360 "Background scrape failed for {}: {}",
361 c.service_name(),
362 err
363 );
364 m.reset_to_zeros();
365 }
366 }
367
368 interval.tick().await;
369 }
370 });
371
372 Ok(())
373 }
374
375 pub async fn stats_stream(&self) -> anyhow::Result<()> {
382 unimplemented!("collect_stats")
383 }
384
385 pub async fn add_stats_service(&mut self) -> anyhow::Result<()> {
386 let service_name = self.service_name();
387
388 if self
390 .drt
391 .component_registry()
392 .inner
393 .lock()
394 .await
395 .services
396 .contains_key(&service_name)
397 {
398 anyhow::bail!("Service {service_name} already exists");
399 }
400
401 let Some(nats_client) = self.drt.nats_client() else {
402 anyhow::bail!("Cannot create NATS service without NATS.");
403 };
404 let description = None;
405 let (nats_service, stats_reg) =
406 service::build_nats_service(nats_client, self, description).await?;
407
408 let mut guard = self.drt.component_registry().inner.lock().await;
409 if !guard.services.contains_key(&service_name) {
410 guard.services.insert(service_name.clone(), nats_service);
412 guard.stats_handlers.insert(service_name.clone(), stats_reg);
413 drop(guard);
414 } else {
415 drop(guard);
416 let _ = nats_service.stop().await;
417 return Err(anyhow::anyhow!(
418 "Service create race for {service_name}, now already exists"
419 ));
420 }
421
422 let request_plane_mode = RequestPlaneMode::get();
425 match request_plane_mode {
426 RequestPlaneMode::Nats => {
427 if let Err(err) = self.start_scraping_nats_service_component_metrics() {
428 tracing::debug!(
429 "Metrics registration failed for '{}': {}",
430 self.service_name(),
431 err
432 );
433 }
434 }
435 _ => {
436 tracing::info!(
437 "Skipping NATS service metrics collection for '{}' - request plane mode is '{}'",
438 self.service_name(),
439 request_plane_mode
440 );
441 }
442 }
443 Ok(())
444 }
445}
446
447impl ComponentBuilder {
448 pub fn from_runtime(drt: Arc<DistributedRuntime>) -> Self {
449 Self::default().drt(drt)
450 }
451}
452
/// A named endpoint of a [`Component`], normally created via `Component::endpoint`.
///
/// Equality and hashing use the owning component, the endpoint name and `is_static`;
/// labels and the metrics registry are ignored.
#[derive(Debug, Clone)]
pub struct Endpoint {
    /// Owning component.
    component: Component,

    /// Endpoint name.
    name: String,

    /// Copied from the component. It selects the static client, the lease-free etcd path
    /// and the unsuffixed name.
    is_static: bool,

    /// Key/value labels. NOTE(review): `Component::endpoint` always starts this empty and
    /// nothing visible here reads it — confirm where it is populated.
    labels: Vec<(String, String)>,

    /// Metrics registry for this endpoint.
    metrics_registry: crate::MetricsRegistry,
}
469
470impl Hash for Endpoint {
471 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
472 self.component.hash(state);
473 self.name.hash(state);
474 self.is_static.hash(state);
475 }
476}
477
478impl PartialEq for Endpoint {
479 fn eq(&self, other: &Self) -> bool {
480 self.component == other.component
481 && self.name == other.name
482 && self.is_static == other.is_static
483 }
484}
485
486impl Eq for Endpoint {}
487
488impl DistributedRuntimeProvider for Endpoint {
489 fn drt(&self) -> &DistributedRuntime {
490 self.component.drt()
491 }
492}
493
494impl RuntimeProvider for Endpoint {
495 fn rt(&self) -> &Runtime {
496 self.component.rt()
497 }
498}
499
500impl MetricsHierarchy for Endpoint {
501 fn basename(&self) -> String {
502 self.name.clone()
503 }
504
505 fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy> {
506 let mut parents = vec![];
507
508 parents.extend(self.component.parent_hierarchies());
510
511 parents.push(&self.component as &dyn MetricsHierarchy);
513
514 parents
515 }
516
517 fn get_metrics_registry(&self) -> &MetricsRegistry {
518 &self.metrics_registry
519 }
520}
521
522impl Endpoint {
523 pub fn id(&self) -> EndpointId {
524 EndpointId {
525 namespace: self.component.namespace().name().to_string(),
526 component: self.component.name().to_string(),
527 name: self.name().to_string(),
528 }
529 }
530
531 pub fn name(&self) -> &str {
532 &self.name
533 }
534
535 pub fn component(&self) -> &Component {
536 &self.component
537 }
538
539 pub fn path(&self) -> String {
541 format!(
542 "{}/{}/{}",
543 self.component.path(),
544 ENDPOINT_KEYWORD,
545 self.name
546 )
547 }
548
549 pub fn etcd_root(&self) -> String {
551 let component_path = self.component.instance_root();
552 let endpoint_name = &self.name;
553 format!("{component_path}/{endpoint_name}")
554 }
555
556 pub fn etcd_path(&self) -> EtcdPath {
558 EtcdPath::new_endpoint(
559 &self.component.namespace().name(),
560 self.component.name(),
561 &self.name,
562 )
563 .expect("Endpoint name and component name should be valid")
564 }
565
566 pub fn etcd_path_with_lease_id(&self, lease_id: u64) -> String {
568 format!("{INSTANCE_ROOT_PATH}/{}", self.unique_path(lease_id))
569 }
570
571 pub fn unique_path(&self, lease_id: u64) -> String {
573 let ns = self.component.namespace().name();
574 let cp = self.component.name();
575 let ep = self.name();
576 format!("{ns}/{cp}/{ep}/{lease_id:x}")
577 }
578
579 pub fn etcd_path_object_with_lease_id(&self, lease_id: i64) -> EtcdPath {
581 if self.is_static {
582 self.etcd_path()
583 } else {
584 EtcdPath::new_endpoint_with_lease(
585 &self.component.namespace().name(),
586 self.component.name(),
587 &self.name,
588 lease_id,
589 )
590 .expect("Endpoint name and component name should be valid")
591 }
592 }
593
594 pub fn name_with_id(&self, lease_id: u64) -> String {
595 if self.is_static {
596 self.name.clone()
597 } else {
598 format!("{}-{:x}", self.name, lease_id)
599 }
600 }
601
602 pub fn subject(&self) -> String {
603 format!("{}.{}", self.component.service_name(), self.name)
604 }
605
606 pub fn subject_to(&self, lease_id: u64) -> String {
608 format!(
609 "{}.{}",
610 self.component.service_name(),
611 self.name_with_id(lease_id)
612 )
613 }
614
615 pub async fn client(&self) -> anyhow::Result<client::Client> {
616 if self.is_static {
617 client::Client::new_static(self.clone()).await
618 } else {
619 client::Client::new_dynamic(self.clone()).await
620 }
621 }
622
623 pub fn endpoint_builder(&self) -> endpoint::EndpointConfigBuilder {
624 endpoint::EndpointConfigBuilder::from_endpoint(self.clone())
625 }
626}
627
/// A (possibly nested) namespace bound to a [`DistributedRuntime`].
///
/// Nested namespaces keep a pointer to their parent. `Namespace::name` returns the full
/// dotted name, while the `name` field holds only this segment.
///
/// NOTE(review): `Validate` is derived, but `NamespaceBuilder::build()` does not call
/// `validate()` (there is no `build_fn(validate = ...)`), so `validate_allowed_chars` is
/// not enforced on construction — confirm validation happens elsewhere.
#[derive(Builder, Clone, Validate)]
#[builder(pattern = "owned")]
pub struct Namespace {
    /// Runtime handle; the builder setter is private to this module.
    #[builder(private)]
    runtime: Arc<DistributedRuntime>,

    /// This namespace's own segment (not including parents).
    #[validate(custom(function = "validate_allowed_chars"))]
    name: String,

    /// Static/dynamic flag, inherited by child namespaces and components.
    is_static: bool,

    /// Parent namespace, if this one is nested.
    #[builder(default = "None")]
    parent: Option<Arc<Namespace>>,

    /// Key/value labels; empty unless set on the builder.
    #[builder(default = "Vec::new()")]
    labels: Vec<(String, String)>,

    /// Metrics registry; defaults to `crate::MetricsRegistry::new()`.
    #[builder(default = "crate::MetricsRegistry::new()")]
    metrics_registry: crate::MetricsRegistry,
}
650
651impl DistributedRuntimeProvider for Namespace {
652 fn drt(&self) -> &DistributedRuntime {
653 &self.runtime
654 }
655}
656
657impl std::fmt::Debug for Namespace {
658 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
659 write!(
660 f,
661 "Namespace {{ name: {}; is_static: {}; parent: {:?} }}",
662 self.name, self.is_static, self.parent
663 )
664 }
665}
666
667impl RuntimeProvider for Namespace {
668 fn rt(&self) -> &Runtime {
669 self.runtime.rt()
670 }
671}
672
673impl std::fmt::Display for Namespace {
674 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
675 write!(f, "{}", self.name)
676 }
677}
678
679impl Namespace {
680 pub(crate) fn new(
681 runtime: DistributedRuntime,
682 name: String,
683 is_static: bool,
684 ) -> anyhow::Result<Self> {
685 Ok(NamespaceBuilder::default()
686 .runtime(Arc::new(runtime))
687 .name(name)
688 .is_static(is_static)
689 .build()?)
690 }
691
692 pub fn component(&self, name: impl Into<String>) -> anyhow::Result<Component> {
694 Ok(ComponentBuilder::from_runtime(self.runtime.clone())
695 .name(name)
696 .namespace(self.clone())
697 .is_static(self.is_static)
698 .build()?)
699 }
700
701 pub fn namespace(&self, name: impl Into<String>) -> anyhow::Result<Namespace> {
703 Ok(NamespaceBuilder::default()
704 .runtime(self.runtime.clone())
705 .name(name.into())
706 .is_static(self.is_static)
707 .parent(Some(Arc::new(self.clone())))
708 .build()?)
709 }
710
711 pub fn etcd_path(&self) -> String {
712 format!("{ETCD_ROOT_PATH}{}", self.name())
713 }
714
715 pub fn name(&self) -> String {
716 match &self.parent {
717 Some(parent) => format!("{}.{}", parent.name(), self.name),
718 None => self.name.clone(),
719 }
720 }
721}
722
723fn validate_allowed_chars(input: &str) -> Result<(), ValidationError> {
725 let regex = regex::Regex::new(r"^[a-z0-9-_]+$").unwrap();
727
728 if regex.is_match(input) {
729 Ok(())
730 } else {
731 Err(ValidationError::new("invalid_characters"))
732 }
733}
734
735