1use crate::{
33 config::HealthStatus,
34 discovery::Lease,
35 metrics::{prometheus_names, MetricsRegistry},
36 service::ServiceSet,
37 transports::etcd::EtcdPath,
38};
39
40use super::{
41 error,
42 traits::*,
43 transports::etcd::{COMPONENT_KEYWORD, ENDPOINT_KEYWORD},
44 transports::nats::Slug,
45 utils::Duration,
46 DistributedRuntime, Result, Runtime,
47};
48
49use crate::pipeline::network::{ingress::push_endpoint::PushEndpoint, PushWorkHandler};
50use crate::protocols::Endpoint as EndpointId;
51use crate::service::ComponentNatsServerPrometheusMetrics;
52use async_nats::{
53 rustls::quic,
54 service::{Service, ServiceExt},
55};
56use derive_builder::Builder;
57use derive_getters::Getters;
58use educe::Educe;
59use serde::{Deserialize, Serialize};
60use service::EndpointStatsHandler;
61use std::{collections::HashMap, hash::Hash, sync::Arc};
62use validator::{Validate, ValidationError};
63
64mod client;
65#[allow(clippy::module_inception)]
66mod component;
67mod endpoint;
68mod namespace;
69mod registry;
70pub mod service;
71
72pub use client::{Client, InstanceSource};
73
74pub const INSTANCE_ROOT_PATH: &str = "instances";
77
78pub const ETCD_ROOT_PATH: &str = "dynamo://";
80
81#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
82#[serde(rename_all = "snake_case")]
83pub enum TransportType {
84 NatsTcp(String),
85}
86
87#[derive(Default)]
88pub struct RegistryInner {
89 services: HashMap<String, Service>,
90 stats_handlers: HashMap<String, Arc<std::sync::Mutex<HashMap<String, EndpointStatsHandler>>>>,
91}
92
93#[derive(Clone)]
94pub struct Registry {
95 inner: Arc<tokio::sync::Mutex<RegistryInner>>,
96}
97
98#[derive(Debug, Clone, Serialize, Deserialize)]
99pub struct Instance {
100 pub component: String,
101 pub endpoint: String,
102 pub namespace: String,
103 pub instance_id: i64,
104 pub transport: TransportType,
105}
106
107impl Instance {
108 pub fn id(&self) -> i64 {
109 self.instance_id
110 }
111}
112
113#[derive(Educe, Builder, Clone, Validate)]
119#[educe(Debug)]
120#[builder(pattern = "owned")]
121pub struct Component {
122 #[builder(private)]
123 #[educe(Debug(ignore))]
124 drt: Arc<DistributedRuntime>,
125
126 #[builder(setter(into))]
129 #[validate(custom(function = "validate_allowed_chars"))]
130 name: String,
131
132 #[builder(default = "Vec::new()")]
134 labels: Vec<(String, String)>,
135
136 #[builder(setter(into))]
139 namespace: Namespace,
140
141 is_static: bool,
144}
145
146impl Hash for Component {
147 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
148 self.namespace.name().hash(state);
149 self.name.hash(state);
150 self.is_static.hash(state);
151 }
152}
153
154impl PartialEq for Component {
155 fn eq(&self, other: &Self) -> bool {
156 self.namespace.name() == other.namespace.name()
157 && self.name == other.name
158 && self.is_static == other.is_static
159 }
160}
161
162impl Eq for Component {}
163
164impl std::fmt::Display for Component {
165 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
166 write!(f, "{}.{}", self.namespace.name(), self.name)
167 }
168}
169
170impl DistributedRuntimeProvider for Component {
171 fn drt(&self) -> &DistributedRuntime {
172 &self.drt
173 }
174}
175
176impl RuntimeProvider for Component {
177 fn rt(&self) -> &Runtime {
178 self.drt.rt()
179 }
180}
181
182impl MetricsRegistry for Component {
183 fn basename(&self) -> String {
184 self.name.clone()
185 }
186
187 fn parent_hierarchy(&self) -> Vec<String> {
188 [
189 self.namespace.parent_hierarchy(),
190 vec![self.namespace.basename()],
191 ]
192 .concat()
193 }
194}
195
196impl Component {
197 pub fn etcd_root(&self) -> String {
199 let ns = self.namespace.name();
200 let cp = &self.name;
201 format!("{INSTANCE_ROOT_PATH}/{ns}/{cp}")
202 }
203
204 pub fn service_name(&self) -> String {
205 let service_name = format!("{}_{}", self.namespace.name(), self.name);
206 Slug::slugify(&service_name).to_string()
207 }
208
209 pub fn path(&self) -> String {
210 format!("{}/{}", self.namespace.name(), self.name)
211 }
212
213 pub fn etcd_path(&self) -> EtcdPath {
214 EtcdPath::new_component(&self.namespace.name(), &self.name)
215 .expect("Component name and namespace should be valid")
216 }
217
218 pub fn namespace(&self) -> &Namespace {
219 &self.namespace
220 }
221
222 pub fn name(&self) -> String {
223 self.name.clone()
224 }
225
226 pub fn labels(&self) -> &[(String, String)] {
227 &self.labels
228 }
229
230 pub fn endpoint(&self, endpoint: impl Into<String>) -> Endpoint {
231 Endpoint {
232 component: self.clone(),
233 name: endpoint.into(),
234 is_static: self.is_static,
235 labels: Vec::new(),
236 }
237 }
238
239 pub async fn list_instances(&self) -> anyhow::Result<Vec<Instance>> {
240 let Some(etcd_client) = self.drt.etcd_client() else {
241 return Ok(vec![]);
242 };
243 let mut out = vec![];
244 for kv in etcd_client
246 .kv_get_prefix(format!("{}/", self.etcd_root()))
247 .await?
248 {
249 let val = match serde_json::from_slice::<Instance>(kv.value()) {
250 Ok(val) => val,
251 Err(err) => {
252 anyhow::bail!(
253 "Error converting etcd response to Instance: {err}. {}",
254 kv.value_str()?
255 );
256 }
257 };
258 out.push(val);
259 }
260 Ok(out)
261 }
262
263 pub async fn scrape_stats(&self, timeout: Duration) -> Result<ServiceSet> {
266 let service_name = self.service_name();
268 let service_client = self.drt().service_client();
269 service_client
270 .collect_services(&service_name, timeout)
271 .await
272 }
273
274 pub fn start_scraping_nats_service_component_metrics(&self) -> Result<()> {
281 const NATS_TIMEOUT_AND_INITIAL_DELAY_MS: std::time::Duration =
282 std::time::Duration::from_millis(300);
283 const MAX_DELAY_MS: std::time::Duration = std::time::Duration::from_millis(873);
284
285 let component_metrics = ComponentNatsServerPrometheusMetrics::new(self)?;
287
288 let component_clone = self.clone();
289 let mut hierarchies = self.parent_hierarchy();
290 hierarchies.push(self.hierarchy());
291 debug_assert!(hierarchies
292 .last()
293 .map(|x| x.as_str())
294 .unwrap_or_default()
295 .eq_ignore_ascii_case(&self.service_name())); let m = component_metrics.clone();
299 let c = component_clone.clone();
300
301 c.drt().runtime().secondary().spawn(async move {
310 let timeout = NATS_TIMEOUT_AND_INITIAL_DELAY_MS;
311 let mut interval = tokio::time::interval(MAX_DELAY_MS);
312 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
313
314 loop {
315 match c.scrape_stats(timeout).await {
316 Ok(service_set) => {
317 m.update_from_service_set(&service_set);
318 }
319 Err(err) => {
320 tracing::error!(
321 "Background scrape failed for {}: {}",
322 c.service_name(),
323 err
324 );
325 m.reset_to_zeros();
326 }
327 }
328 interval.tick().await;
329 }
330 });
331
332 Ok(())
333 }
334
335 pub async fn stats_stream(&self) -> Result<()> {
342 unimplemented!("collect_stats")
343 }
344
345 pub fn service_builder(&self) -> service::ServiceConfigBuilder {
346 service::ServiceConfigBuilder::from_component(self.clone())
347 }
348}
349
350impl ComponentBuilder {
351 pub fn from_runtime(drt: Arc<DistributedRuntime>) -> Self {
352 Self::default().drt(drt)
353 }
354}
355
356#[derive(Debug, Clone)]
357pub struct Endpoint {
358 component: Component,
359
360 name: String,
363
364 is_static: bool,
365
366 labels: Vec<(String, String)>,
368}
369
370impl Hash for Endpoint {
371 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
372 self.component.hash(state);
373 self.name.hash(state);
374 self.is_static.hash(state);
375 }
376}
377
378impl PartialEq for Endpoint {
379 fn eq(&self, other: &Self) -> bool {
380 self.component == other.component
381 && self.name == other.name
382 && self.is_static == other.is_static
383 }
384}
385
386impl Eq for Endpoint {}
387
388impl DistributedRuntimeProvider for Endpoint {
389 fn drt(&self) -> &DistributedRuntime {
390 self.component.drt()
391 }
392}
393
394impl RuntimeProvider for Endpoint {
395 fn rt(&self) -> &Runtime {
396 self.component.rt()
397 }
398}
399
400impl MetricsRegistry for Endpoint {
401 fn basename(&self) -> String {
402 self.name.clone()
403 }
404
405 fn parent_hierarchy(&self) -> Vec<String> {
406 [
407 self.component.parent_hierarchy(),
408 vec![self.component.basename()],
409 ]
410 .concat()
411 }
412}
413
414impl Endpoint {
415 pub fn id(&self) -> EndpointId {
416 EndpointId {
417 namespace: self.component.namespace().name().to_string(),
418 component: self.component.name().to_string(),
419 name: self.name().to_string(),
420 }
421 }
422
423 pub fn name(&self) -> &str {
424 &self.name
425 }
426
427 pub fn component(&self) -> &Component {
428 &self.component
429 }
430
431 pub fn path(&self) -> String {
433 format!(
434 "{}/{}/{}",
435 self.component.path(),
436 ENDPOINT_KEYWORD,
437 self.name
438 )
439 }
440
441 pub fn etcd_root(&self) -> String {
443 let component_path = self.component.etcd_root();
444 let endpoint_name = &self.name;
445 format!("{component_path}/{endpoint_name}")
446 }
447
448 pub fn etcd_path(&self) -> EtcdPath {
450 EtcdPath::new_endpoint(
451 &self.component.namespace().name(),
452 &self.component.name(),
453 &self.name,
454 )
455 .expect("Endpoint name and component name should be valid")
456 }
457
458 pub fn etcd_path_with_lease_id(&self, lease_id: i64) -> String {
460 let endpoint_root = self.etcd_root();
461 if self.is_static {
462 endpoint_root
463 } else {
464 format!("{endpoint_root}:{lease_id:x}")
465 }
466 }
467
468 pub fn etcd_path_object_with_lease_id(&self, lease_id: i64) -> EtcdPath {
470 if self.is_static {
471 self.etcd_path()
472 } else {
473 EtcdPath::new_endpoint_with_lease(
474 &self.component.namespace().name(),
475 &self.component.name(),
476 &self.name,
477 lease_id,
478 )
479 .expect("Endpoint name and component name should be valid")
480 }
481 }
482
483 pub fn name_with_id(&self, lease_id: i64) -> String {
484 if self.is_static {
485 self.name.clone()
486 } else {
487 format!("{}-{:x}", self.name, lease_id)
488 }
489 }
490
491 pub fn subject(&self) -> String {
492 format!("{}.{}", self.component.service_name(), self.name)
493 }
494
495 pub fn subject_to(&self, lease_id: i64) -> String {
497 format!(
498 "{}.{}",
499 self.component.service_name(),
500 self.name_with_id(lease_id)
501 )
502 }
503
504 pub async fn client(&self) -> Result<client::Client> {
505 if self.is_static {
506 client::Client::new_static(self.clone()).await
507 } else {
508 client::Client::new_dynamic(self.clone()).await
509 }
510 }
511
512 pub fn endpoint_builder(&self) -> endpoint::EndpointConfigBuilder {
513 endpoint::EndpointConfigBuilder::from_endpoint(self.clone())
514 }
515}
516
517#[derive(Builder, Clone, Validate)]
518#[builder(pattern = "owned")]
519pub struct Namespace {
520 #[builder(private)]
521 runtime: Arc<DistributedRuntime>,
522
523 #[validate(custom(function = "validate_allowed_chars"))]
524 name: String,
525
526 is_static: bool,
527
528 #[builder(default = "None")]
529 parent: Option<Arc<Namespace>>,
530
531 #[builder(default = "Vec::new()")]
533 labels: Vec<(String, String)>,
534}
535
536impl DistributedRuntimeProvider for Namespace {
537 fn drt(&self) -> &DistributedRuntime {
538 &self.runtime
539 }
540}
541
542impl std::fmt::Debug for Namespace {
543 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
544 write!(
545 f,
546 "Namespace {{ name: {}; is_static: {}; parent: {:?} }}",
547 self.name, self.is_static, self.parent
548 )
549 }
550}
551
552impl RuntimeProvider for Namespace {
553 fn rt(&self) -> &Runtime {
554 self.runtime.rt()
555 }
556}
557
558impl std::fmt::Display for Namespace {
559 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
560 write!(f, "{}", self.name)
561 }
562}
563
564impl Namespace {
565 pub(crate) fn new(runtime: DistributedRuntime, name: String, is_static: bool) -> Result<Self> {
566 Ok(NamespaceBuilder::default()
567 .runtime(Arc::new(runtime))
568 .name(name)
569 .is_static(is_static)
570 .build()?)
571 }
572
573 pub fn component(&self, name: impl Into<String>) -> Result<Component> {
575 let component = ComponentBuilder::from_runtime(self.runtime.clone())
576 .name(name)
577 .namespace(self.clone())
578 .is_static(self.is_static)
579 .build()?;
580
581 if let Err(err) = component.start_scraping_nats_service_component_metrics() {
585 let error_str = err.to_string();
586
587 if error_str.contains("Duplicate metrics") {
590 tracing::debug!(
593 "Duplicate metrics registration for component '{}' (expected when multiple components share the same service_name): {}",
594 component.service_name(),
595 error_str
596 );
597 } else {
598 tracing::warn!(
600 "Failed to start scraping metrics for component '{}': {}",
601 component.service_name(),
602 err
603 );
604 }
605 }
606
607 Ok(component)
608 }
609
610 pub fn namespace(&self, name: impl Into<String>) -> Result<Namespace> {
612 Ok(NamespaceBuilder::default()
613 .runtime(self.runtime.clone())
614 .name(name.into())
615 .is_static(self.is_static)
616 .parent(Some(Arc::new(self.clone())))
617 .build()?)
618 }
619
620 pub fn etcd_path(&self) -> String {
621 format!("{}{}", ETCD_ROOT_PATH, self.name())
622 }
623
624 pub fn name(&self) -> String {
625 match &self.parent {
626 Some(parent) => format!("{}.{}", parent.name(), self.name),
627 None => self.name.clone(),
628 }
629 }
630}
631
632fn validate_allowed_chars(input: &str) -> Result<(), ValidationError> {
634 let regex = regex::Regex::new(r"^[a-z0-9-_]+$").unwrap();
636
637 if regex.is_match(input) {
638 Ok(())
639 } else {
640 Err(ValidationError::new("invalid_characters"))
641 }
642}
643
644