1use std::fmt;
33
34use crate::{
35 config::HealthStatus,
36 discovery::Lease,
37 metrics::{MetricsHierarchy, MetricsRegistry, prometheus_names},
38 service::ServiceSet,
39 transports::etcd::{ETCD_ROOT_PATH, EtcdPath},
40};
41
42use super::{
43 DistributedRuntime, Result, Runtime, error,
44 traits::*,
45 transports::etcd::{COMPONENT_KEYWORD, ENDPOINT_KEYWORD},
46 transports::nats::Slug,
47 utils::Duration,
48};
49
50use crate::pipeline::network::{PushWorkHandler, ingress::push_endpoint::PushEndpoint};
51use crate::protocols::EndpointId;
52use crate::service::ComponentNatsServerPrometheusMetrics;
53use async_nats::{
54 rustls::quic,
55 service::{Service, ServiceExt},
56};
57use derive_builder::Builder;
58use derive_getters::Getters;
59use educe::Educe;
60use serde::{Deserialize, Serialize};
61use service::EndpointStatsHandler;
62use std::{collections::HashMap, hash::Hash, sync::Arc};
63use validator::{Validate, ValidationError};
64
65mod client;
66#[allow(clippy::module_inception)]
67mod component;
68mod endpoint;
69mod namespace;
70mod registry;
71pub mod service;
72
73pub use client::{Client, InstanceSource};
74
/// Root key prefix of the store buckets that hold serialized [`Instance`] records.
///
/// Component buckets live at `{INSTANCE_ROOT_PATH}/<namespace>/<component>`.
pub const INSTANCE_ROOT_PATH: &str = "v1/instances";
78
79#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
80#[serde(rename_all = "snake_case")]
81pub enum TransportType {
82 NatsTcp(String),
83}
84
85#[derive(Default)]
86pub struct RegistryInner {
87 services: HashMap<String, Service>,
88 stats_handlers: HashMap<String, Arc<parking_lot::Mutex<HashMap<String, EndpointStatsHandler>>>>,
89}
90
91#[derive(Clone)]
92pub struct Registry {
93 inner: Arc<tokio::sync::Mutex<RegistryInner>>,
94}
95
96#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
97pub struct Instance {
98 pub component: String,
99 pub endpoint: String,
100 pub namespace: String,
101 pub instance_id: u64,
102 pub transport: TransportType,
103}
104
105impl Instance {
106 pub fn id(&self) -> u64 {
107 self.instance_id
108 }
109 pub fn endpoint_id(&self) -> EndpointId {
110 EndpointId {
111 namespace: self.namespace.clone(),
112 component: self.component.clone(),
113 name: self.endpoint.clone(),
114 }
115 }
116}
117
118impl fmt::Display for Instance {
119 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
120 write!(
121 f,
122 "{}/{}/{}/{}",
123 self.namespace, self.component, self.endpoint, self.instance_id
124 )
125 }
126}
127
128impl std::cmp::Ord for Instance {
130 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
131 self.to_string().cmp(&other.to_string())
132 }
133}
134
135impl PartialOrd for Instance {
136 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
137 Some(self.cmp(other))
139 }
140}
141
142#[derive(Educe, Builder, Clone, Validate)]
148#[educe(Debug)]
149#[builder(pattern = "owned")]
150pub struct Component {
151 #[builder(private)]
152 #[educe(Debug(ignore))]
153 drt: Arc<DistributedRuntime>,
154
155 #[builder(setter(into))]
158 #[validate(custom(function = "validate_allowed_chars"))]
159 name: String,
160
161 #[builder(default = "Vec::new()")]
163 labels: Vec<(String, String)>,
164
165 #[builder(setter(into))]
168 namespace: Namespace,
169
170 is_static: bool,
173
174 #[builder(default = "crate::MetricsRegistry::new()")]
176 metrics_registry: crate::MetricsRegistry,
177}
178
179impl Hash for Component {
180 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
181 self.namespace.name().hash(state);
182 self.name.hash(state);
183 self.is_static.hash(state);
184 }
185}
186
187impl PartialEq for Component {
188 fn eq(&self, other: &Self) -> bool {
189 self.namespace.name() == other.namespace.name()
190 && self.name == other.name
191 && self.is_static == other.is_static
192 }
193}
194
195impl Eq for Component {}
196
197impl std::fmt::Display for Component {
198 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
199 write!(f, "{}.{}", self.namespace.name(), self.name)
200 }
201}
202
203impl DistributedRuntimeProvider for Component {
204 fn drt(&self) -> &DistributedRuntime {
205 &self.drt
206 }
207}
208
209impl RuntimeProvider for Component {
210 fn rt(&self) -> &Runtime {
211 self.drt.rt()
212 }
213}
214
215impl MetricsHierarchy for Component {
216 fn basename(&self) -> String {
217 self.name.clone()
218 }
219
220 fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy> {
221 let mut parents = vec![];
222
223 parents.extend(self.namespace.parent_hierarchies());
225
226 parents.push(&self.namespace as &dyn MetricsHierarchy);
228
229 parents
230 }
231
232 fn get_metrics_registry(&self) -> &MetricsRegistry {
233 &self.metrics_registry
234 }
235}
236
237impl Component {
238 pub fn instance_root(&self) -> String {
240 let ns = self.namespace.name();
241 let cp = &self.name;
242 format!("{INSTANCE_ROOT_PATH}/{ns}/{cp}")
243 }
244
245 pub fn service_name(&self) -> String {
246 let service_name = format!("{}_{}", self.namespace.name(), self.name);
247 Slug::slugify(&service_name).to_string()
248 }
249
250 pub fn path(&self) -> String {
251 format!("{}/{}", self.namespace.name(), self.name)
252 }
253
254 pub fn etcd_path(&self) -> EtcdPath {
255 EtcdPath::new_component(&self.namespace.name(), &self.name)
256 .expect("Component name and namespace should be valid")
257 }
258
259 pub fn namespace(&self) -> &Namespace {
260 &self.namespace
261 }
262
263 pub fn name(&self) -> &str {
264 &self.name
265 }
266
267 pub fn labels(&self) -> &[(String, String)] {
268 &self.labels
269 }
270
271 pub fn endpoint(&self, endpoint: impl Into<String>) -> Endpoint {
272 Endpoint {
273 component: self.clone(),
274 name: endpoint.into(),
275 is_static: self.is_static,
276 labels: Vec::new(),
277 metrics_registry: crate::MetricsRegistry::new(),
278 }
279 }
280
281 pub async fn list_instances(&self) -> anyhow::Result<Vec<Instance>> {
282 let client = self.drt.store();
283 let Some(bucket) = client.get_bucket(&self.instance_root()).await? else {
284 return Ok(vec![]);
285 };
286 let entries = bucket.entries().await?;
287 let mut instances = Vec::with_capacity(entries.len());
288 for (name, bytes) in entries.into_iter() {
289 let val = match serde_json::from_slice::<Instance>(&bytes) {
290 Ok(val) => val,
291 Err(err) => {
292 anyhow::bail!("Error converting storage response to Instance: {err}. {name}",);
293 }
294 };
295 instances.push(val);
296 }
297 instances.sort();
298 Ok(instances)
299 }
300
301 pub async fn scrape_stats(&self, timeout: Duration) -> Result<ServiceSet> {
304 let service_name = self.service_name();
306 let Some(service_client) = self.drt().service_client() else {
307 anyhow::bail!("ServiceSet is gathered via NATS, do not call this in non-NATS setups.");
308 };
309 service_client
310 .collect_services(&service_name, timeout)
311 .await
312 }
313
314 pub fn start_scraping_nats_service_component_metrics(&self) -> Result<()> {
322 const MAX_WAIT_MS: std::time::Duration = std::time::Duration::from_millis(9800); let component_metrics = ComponentNatsServerPrometheusMetrics::new(self)?;
326
327 let component_clone = self.clone();
328
329 let m = component_metrics.clone();
331 let c = component_clone.clone();
332
333 c.drt().runtime().secondary().spawn(async move {
342 let timeout = std::time::Duration::from_millis(500);
343 let mut interval = tokio::time::interval(MAX_WAIT_MS);
344 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
345
346 loop {
347 match c.scrape_stats(timeout).await {
348 Ok(service_set) => {
349 m.update_from_service_set(&service_set);
350 }
351 Err(err) => {
352 tracing::error!(
353 "Background scrape failed for {}: {}",
354 c.service_name(),
355 err
356 );
357 m.reset_to_zeros();
358 }
359 }
360
361 interval.tick().await;
362 }
363 });
364
365 Ok(())
366 }
367
368 pub async fn stats_stream(&self) -> Result<()> {
375 unimplemented!("collect_stats")
376 }
377
378 pub async fn add_stats_service(&mut self) -> anyhow::Result<()> {
379 let service_name = self.service_name();
380
381 if self
383 .drt
384 .component_registry
385 .inner
386 .lock()
387 .await
388 .services
389 .contains_key(&service_name)
390 {
391 anyhow::bail!("Service {service_name} already exists");
392 }
393
394 let Some(nats_client) = self.drt.nats_client() else {
395 anyhow::bail!("Cannot create NATS service without NATS.");
396 };
397 let description = None;
398 let (nats_service, stats_reg) =
399 service::build_nats_service(nats_client, self, description).await?;
400
401 let mut guard = self.drt.component_registry.inner.lock().await;
402 if !guard.services.contains_key(&service_name) {
403 guard.services.insert(service_name.clone(), nats_service);
405 guard.stats_handlers.insert(service_name.clone(), stats_reg);
406 drop(guard);
407 } else {
408 drop(guard);
409 let _ = nats_service.stop().await;
410 return Err(anyhow::anyhow!(
411 "Service create race for {service_name}, now already exists"
412 ));
413 }
414
415 if let Err(err) = self.start_scraping_nats_service_component_metrics() {
417 tracing::debug!(service_name, error = %err, "Metrics registration failed");
418 }
419 Ok(())
420 }
421}
422
423impl ComponentBuilder {
424 pub fn from_runtime(drt: Arc<DistributedRuntime>) -> Self {
425 Self::default().drt(drt)
426 }
427}
428
429#[derive(Debug, Clone)]
430pub struct Endpoint {
431 component: Component,
432
433 name: String,
436
437 is_static: bool,
438
439 labels: Vec<(String, String)>,
441
442 metrics_registry: crate::MetricsRegistry,
444}
445
446impl Hash for Endpoint {
447 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
448 self.component.hash(state);
449 self.name.hash(state);
450 self.is_static.hash(state);
451 }
452}
453
454impl PartialEq for Endpoint {
455 fn eq(&self, other: &Self) -> bool {
456 self.component == other.component
457 && self.name == other.name
458 && self.is_static == other.is_static
459 }
460}
461
462impl Eq for Endpoint {}
463
464impl DistributedRuntimeProvider for Endpoint {
465 fn drt(&self) -> &DistributedRuntime {
466 self.component.drt()
467 }
468}
469
470impl RuntimeProvider for Endpoint {
471 fn rt(&self) -> &Runtime {
472 self.component.rt()
473 }
474}
475
476impl MetricsHierarchy for Endpoint {
477 fn basename(&self) -> String {
478 self.name.clone()
479 }
480
481 fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy> {
482 let mut parents = vec![];
483
484 parents.extend(self.component.parent_hierarchies());
486
487 parents.push(&self.component as &dyn MetricsHierarchy);
489
490 parents
491 }
492
493 fn get_metrics_registry(&self) -> &MetricsRegistry {
494 &self.metrics_registry
495 }
496}
497
498impl Endpoint {
499 pub fn id(&self) -> EndpointId {
500 EndpointId {
501 namespace: self.component.namespace().name().to_string(),
502 component: self.component.name().to_string(),
503 name: self.name().to_string(),
504 }
505 }
506
507 pub fn name(&self) -> &str {
508 &self.name
509 }
510
511 pub fn component(&self) -> &Component {
512 &self.component
513 }
514
515 pub fn path(&self) -> String {
517 format!(
518 "{}/{}/{}",
519 self.component.path(),
520 ENDPOINT_KEYWORD,
521 self.name
522 )
523 }
524
525 pub fn etcd_root(&self) -> String {
527 let component_path = self.component.instance_root();
528 let endpoint_name = &self.name;
529 format!("{component_path}/{endpoint_name}")
530 }
531
532 pub fn etcd_path(&self) -> EtcdPath {
534 EtcdPath::new_endpoint(
535 &self.component.namespace().name(),
536 self.component.name(),
537 &self.name,
538 )
539 .expect("Endpoint name and component name should be valid")
540 }
541
542 pub fn etcd_path_with_lease_id(&self, lease_id: u64) -> String {
544 format!("{INSTANCE_ROOT_PATH}/{}", self.unique_path(lease_id))
545 }
546
547 pub fn unique_path(&self, lease_id: u64) -> String {
549 let ns = self.component.namespace().name();
550 let cp = self.component.name();
551 let ep = self.name();
552 format!("{ns}/{cp}/{ep}/{lease_id:x}")
553 }
554
555 pub fn etcd_path_object_with_lease_id(&self, lease_id: i64) -> EtcdPath {
557 if self.is_static {
558 self.etcd_path()
559 } else {
560 EtcdPath::new_endpoint_with_lease(
561 &self.component.namespace().name(),
562 self.component.name(),
563 &self.name,
564 lease_id,
565 )
566 .expect("Endpoint name and component name should be valid")
567 }
568 }
569
570 pub fn name_with_id(&self, lease_id: u64) -> String {
571 if self.is_static {
572 self.name.clone()
573 } else {
574 format!("{}-{:x}", self.name, lease_id)
575 }
576 }
577
578 pub fn subject(&self) -> String {
579 format!("{}.{}", self.component.service_name(), self.name)
580 }
581
582 pub fn subject_to(&self, lease_id: u64) -> String {
584 format!(
585 "{}.{}",
586 self.component.service_name(),
587 self.name_with_id(lease_id)
588 )
589 }
590
591 pub async fn client(&self) -> Result<client::Client> {
592 if self.is_static {
593 client::Client::new_static(self.clone()).await
594 } else {
595 client::Client::new_dynamic(self.clone()).await
596 }
597 }
598
599 pub fn endpoint_builder(&self) -> endpoint::EndpointConfigBuilder {
600 endpoint::EndpointConfigBuilder::from_endpoint(self.clone())
601 }
602}
603
604#[derive(Builder, Clone, Validate)]
605#[builder(pattern = "owned")]
606pub struct Namespace {
607 #[builder(private)]
608 runtime: Arc<DistributedRuntime>,
609
610 #[validate(custom(function = "validate_allowed_chars"))]
611 name: String,
612
613 is_static: bool,
614
615 #[builder(default = "None")]
616 parent: Option<Arc<Namespace>>,
617
618 #[builder(default = "Vec::new()")]
620 labels: Vec<(String, String)>,
621
622 #[builder(default = "crate::MetricsRegistry::new()")]
624 metrics_registry: crate::MetricsRegistry,
625}
626
627impl DistributedRuntimeProvider for Namespace {
628 fn drt(&self) -> &DistributedRuntime {
629 &self.runtime
630 }
631}
632
633impl std::fmt::Debug for Namespace {
634 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
635 write!(
636 f,
637 "Namespace {{ name: {}; is_static: {}; parent: {:?} }}",
638 self.name, self.is_static, self.parent
639 )
640 }
641}
642
643impl RuntimeProvider for Namespace {
644 fn rt(&self) -> &Runtime {
645 self.runtime.rt()
646 }
647}
648
649impl std::fmt::Display for Namespace {
650 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
651 write!(f, "{}", self.name)
652 }
653}
654
655impl Namespace {
656 pub(crate) fn new(runtime: DistributedRuntime, name: String, is_static: bool) -> Result<Self> {
657 Ok(NamespaceBuilder::default()
658 .runtime(Arc::new(runtime))
659 .name(name)
660 .is_static(is_static)
661 .build()?)
662 }
663
664 pub fn component(&self, name: impl Into<String>) -> Result<Component> {
666 Ok(ComponentBuilder::from_runtime(self.runtime.clone())
667 .name(name)
668 .namespace(self.clone())
669 .is_static(self.is_static)
670 .build()?)
671 }
672
673 pub fn namespace(&self, name: impl Into<String>) -> Result<Namespace> {
675 Ok(NamespaceBuilder::default()
676 .runtime(self.runtime.clone())
677 .name(name.into())
678 .is_static(self.is_static)
679 .parent(Some(Arc::new(self.clone())))
680 .build()?)
681 }
682
683 pub fn etcd_path(&self) -> String {
684 format!("{ETCD_ROOT_PATH}{}", self.name())
685 }
686
687 pub fn name(&self) -> String {
688 match &self.parent {
689 Some(parent) => format!("{}.{}", parent.name(), self.name),
690 None => self.name.clone(),
691 }
692 }
693}
694
695fn validate_allowed_chars(input: &str) -> Result<(), ValidationError> {
697 let regex = regex::Regex::new(r"^[a-z0-9-_]+$").unwrap();
699
700 if regex.is_match(input) {
701 Ok(())
702 } else {
703 Err(ValidationError::new("invalid_characters"))
704 }
705}
706
707