1use crate::{
33 config::HealthStatus,
34 discovery::Lease,
35 metrics::{MetricsRegistry, prometheus_names},
36 service::ServiceSet,
37 transports::etcd::EtcdPath,
38};
39
40use super::{
41 DistributedRuntime, Result, Runtime, error,
42 traits::*,
43 transports::etcd::{COMPONENT_KEYWORD, ENDPOINT_KEYWORD},
44 transports::nats::Slug,
45 utils::Duration,
46};
47
48use crate::pipeline::network::{PushWorkHandler, ingress::push_endpoint::PushEndpoint};
49use crate::protocols::EndpointId;
50use crate::service::ComponentNatsServerPrometheusMetrics;
51use async_nats::{
52 rustls::quic,
53 service::{Service, ServiceExt},
54};
55use derive_builder::Builder;
56use derive_getters::Getters;
57use educe::Educe;
58use serde::{Deserialize, Serialize};
59use service::EndpointStatsHandler;
60use std::{collections::HashMap, hash::Hash, sync::Arc};
61use validator::{Validate, ValidationError};
62
63mod client;
64#[allow(clippy::module_inception)]
65mod component;
66mod endpoint;
67mod namespace;
68mod registry;
69pub mod service;
70
71pub use client::{Client, InstanceSource};
72
/// Leading path segment of instance keys; `Component::etcd_root` builds
/// `{INSTANCE_ROOT_PATH}/<namespace>/<component>` from it.
pub const INSTANCE_ROOT_PATH: &str = "instances";

/// Prefix that `Namespace::etcd_path` prepends to the fully-qualified namespace name.
pub const ETCD_ROOT_PATH: &str = "dynamo://";
79
/// Transport descriptor stored in the `transport` field of [`Instance`].
///
/// Serialized with `snake_case` variant names.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum TransportType {
    /// NATS over TCP.
    /// NOTE(review): the payload is presumably the NATS subject of the
    /// instance — confirm against the code that constructs this variant.
    NatsTcp(String),
}
85
/// State held behind the lock of a [`Registry`]; starts out empty via `Default`.
#[derive(Default)]
pub struct RegistryInner {
    /// NATS services indexed by a string key.
    /// NOTE(review): the key is presumably the component service name — confirm.
    services: HashMap<String, Service>,
    /// Per-key maps of endpoint stats handlers, each shared behind its own
    /// `std::sync::Mutex`.
    /// NOTE(review): the outer key presumably matches `services` and the inner
    /// key is presumably the endpoint name — confirm.
    stats_handlers: HashMap<String, Arc<std::sync::Mutex<HashMap<String, EndpointStatsHandler>>>>,
}
91
/// Handle to a shared [`RegistryInner`] guarded by a `tokio::sync::Mutex`.
///
/// Cloning the handle clones the `Arc`, so all clones refer to the same state.
#[derive(Clone)]
pub struct Registry {
    inner: Arc<tokio::sync::Mutex<RegistryInner>>,
}
96
/// Serializable record describing one endpoint instance.
///
/// `Component::list_instances` decodes these from the JSON values stored in etcd.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Instance {
    /// Name of the component the instance belongs to.
    pub component: String,
    /// Name of the endpoint the instance serves.
    pub endpoint: String,
    /// Name of the namespace the component lives in.
    pub namespace: String,
    /// Numeric identifier of the instance.
    /// NOTE(review): presumably the etcd lease id — confirm with the writer side.
    pub instance_id: i64,
    /// Transport descriptor for the instance.
    pub transport: TransportType,
}
105
106impl Instance {
107 pub fn id(&self) -> i64 {
108 self.instance_id
109 }
110}
111
/// A named entity inside a [`Namespace`] from which [`Endpoint`]s are created.
///
/// Instances are produced by the generated `ComponentBuilder` (owned-builder
/// pattern). Equality and hashing look only at the fully-qualified namespace
/// name, the component name and `is_static`.
#[derive(Educe, Builder, Clone, Validate)]
#[educe(Debug)]
#[builder(pattern = "owned")]
pub struct Component {
    /// Distributed runtime this component belongs to. Skipped in `Debug`
    /// output; its builder setter is private.
    #[builder(private)]
    #[educe(Debug(ignore))]
    drt: Arc<DistributedRuntime>,

    /// Component name; validated by `validate_allowed_chars`.
    #[builder(setter(into))]
    #[validate(custom(function = "validate_allowed_chars"))]
    name: String,

    /// Key/value labels; the builder defaults this to an empty list.
    #[builder(default = "Vec::new()")]
    labels: Vec<(String, String)>,

    /// Namespace that owns this component.
    #[builder(setter(into))]
    namespace: Namespace,

    /// Static-mode flag; copied into every endpoint created by `Component::endpoint`.
    is_static: bool,
}
144
145impl Hash for Component {
146 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
147 self.namespace.name().hash(state);
148 self.name.hash(state);
149 self.is_static.hash(state);
150 }
151}
152
153impl PartialEq for Component {
154 fn eq(&self, other: &Self) -> bool {
155 self.namespace.name() == other.namespace.name()
156 && self.name == other.name
157 && self.is_static == other.is_static
158 }
159}
160
161impl Eq for Component {}
162
163impl std::fmt::Display for Component {
164 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
165 write!(f, "{}.{}", self.namespace.name(), self.name)
166 }
167}
168
169impl DistributedRuntimeProvider for Component {
170 fn drt(&self) -> &DistributedRuntime {
171 &self.drt
172 }
173}
174
175impl RuntimeProvider for Component {
176 fn rt(&self) -> &Runtime {
177 self.drt.rt()
178 }
179}
180
181impl MetricsRegistry for Component {
182 fn basename(&self) -> String {
183 self.name.clone()
184 }
185
186 fn parent_hierarchy(&self) -> Vec<String> {
187 [
188 self.namespace.parent_hierarchy(),
189 vec![self.namespace.basename()],
190 ]
191 .concat()
192 }
193}
194
195impl Component {
196 pub fn etcd_root(&self) -> String {
198 let ns = self.namespace.name();
199 let cp = &self.name;
200 format!("{INSTANCE_ROOT_PATH}/{ns}/{cp}")
201 }
202
203 pub fn service_name(&self) -> String {
204 let service_name = format!("{}_{}", self.namespace.name(), self.name);
205 Slug::slugify(&service_name).to_string()
206 }
207
208 pub fn path(&self) -> String {
209 format!("{}/{}", self.namespace.name(), self.name)
210 }
211
212 pub fn etcd_path(&self) -> EtcdPath {
213 EtcdPath::new_component(&self.namespace.name(), &self.name)
214 .expect("Component name and namespace should be valid")
215 }
216
217 pub fn namespace(&self) -> &Namespace {
218 &self.namespace
219 }
220
221 pub fn name(&self) -> String {
222 self.name.clone()
223 }
224
225 pub fn labels(&self) -> &[(String, String)] {
226 &self.labels
227 }
228
229 pub fn endpoint(&self, endpoint: impl Into<String>) -> Endpoint {
230 Endpoint {
231 component: self.clone(),
232 name: endpoint.into(),
233 is_static: self.is_static,
234 labels: Vec::new(),
235 }
236 }
237
238 pub async fn list_instances(&self) -> anyhow::Result<Vec<Instance>> {
239 let Some(etcd_client) = self.drt.etcd_client() else {
240 return Ok(vec![]);
241 };
242 let mut out = vec![];
243 for kv in etcd_client
245 .kv_get_prefix(format!("{}/", self.etcd_root()))
246 .await?
247 {
248 let val = match serde_json::from_slice::<Instance>(kv.value()) {
249 Ok(val) => val,
250 Err(err) => {
251 anyhow::bail!(
252 "Error converting etcd response to Instance: {err}. {}",
253 kv.value_str()?
254 );
255 }
256 };
257 out.push(val);
258 }
259 Ok(out)
260 }
261
262 pub async fn scrape_stats(&self, timeout: Duration) -> Result<ServiceSet> {
265 let service_name = self.service_name();
267 let service_client = self.drt().service_client();
268 service_client
269 .collect_services(&service_name, timeout)
270 .await
271 }
272
273 pub fn start_scraping_nats_service_component_metrics(&self) -> Result<()> {
281 const MAX_WAIT_MS: std::time::Duration = std::time::Duration::from_millis(9800); let component_metrics = ComponentNatsServerPrometheusMetrics::new(self)?;
285
286 let component_clone = self.clone();
287 let mut hierarchies = self.parent_hierarchy();
288 hierarchies.push(self.hierarchy());
289 debug_assert!(
290 hierarchies
291 .last()
292 .map(|x| x.as_str())
293 .unwrap_or_default()
294 .eq_ignore_ascii_case(&self.service_name())
295 ); let m = component_metrics.clone();
299 let c = component_clone.clone();
300
301 c.drt().runtime().secondary().spawn(async move {
310 let timeout = std::time::Duration::from_millis(500);
311 let mut interval = tokio::time::interval(MAX_WAIT_MS);
312 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
313
314 loop {
315 match c.scrape_stats(timeout).await {
316 Ok(service_set) => {
317 m.update_from_service_set(&service_set);
318 }
319 Err(err) => {
320 tracing::error!(
321 "Background scrape failed for {}: {}",
322 c.service_name(),
323 err
324 );
325 m.reset_to_zeros();
326 }
327 }
328
329 interval.tick().await;
330 }
331 });
332
333 Ok(())
334 }
335
    /// Not implemented.
    ///
    /// # Panics
    ///
    /// The returned future always panics via `unimplemented!` when polled; it
    /// never resolves to a value.
    pub async fn stats_stream(&self) -> Result<()> {
        unimplemented!("collect_stats")
    }
345
346 pub fn service_builder(&self) -> service::ServiceConfigBuilder {
347 service::ServiceConfigBuilder::from_component(self.clone())
348 }
349}
350
351impl ComponentBuilder {
352 pub fn from_runtime(drt: Arc<DistributedRuntime>) -> Self {
353 Self::default().drt(drt)
354 }
355}
356
/// A named endpoint belonging to a [`Component`].
///
/// Created through `Component::endpoint`. Equality and hashing consider the
/// owning component, the endpoint name and `is_static`; `labels` are ignored.
#[derive(Debug, Clone)]
pub struct Endpoint {
    /// Component this endpoint was created from.
    component: Component,

    /// Endpoint name; used when building subjects and etcd keys.
    name: String,

    /// Static-mode flag. When set, lease ids are not appended to generated
    /// names and paths, and `client` builds a static client.
    is_static: bool,

    /// Key/value labels; `Component::endpoint` initializes this to an empty list.
    labels: Vec<(String, String)>,
}
370
371impl Hash for Endpoint {
372 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
373 self.component.hash(state);
374 self.name.hash(state);
375 self.is_static.hash(state);
376 }
377}
378
379impl PartialEq for Endpoint {
380 fn eq(&self, other: &Self) -> bool {
381 self.component == other.component
382 && self.name == other.name
383 && self.is_static == other.is_static
384 }
385}
386
387impl Eq for Endpoint {}
388
389impl DistributedRuntimeProvider for Endpoint {
390 fn drt(&self) -> &DistributedRuntime {
391 self.component.drt()
392 }
393}
394
395impl RuntimeProvider for Endpoint {
396 fn rt(&self) -> &Runtime {
397 self.component.rt()
398 }
399}
400
401impl MetricsRegistry for Endpoint {
402 fn basename(&self) -> String {
403 self.name.clone()
404 }
405
406 fn parent_hierarchy(&self) -> Vec<String> {
407 [
408 self.component.parent_hierarchy(),
409 vec![self.component.basename()],
410 ]
411 .concat()
412 }
413}
414
415impl Endpoint {
416 pub fn id(&self) -> EndpointId {
417 EndpointId {
418 namespace: self.component.namespace().name().to_string(),
419 component: self.component.name().to_string(),
420 name: self.name().to_string(),
421 }
422 }
423
424 pub fn name(&self) -> &str {
425 &self.name
426 }
427
428 pub fn component(&self) -> &Component {
429 &self.component
430 }
431
432 pub fn path(&self) -> String {
434 format!(
435 "{}/{}/{}",
436 self.component.path(),
437 ENDPOINT_KEYWORD,
438 self.name
439 )
440 }
441
442 pub fn etcd_root(&self) -> String {
444 let component_path = self.component.etcd_root();
445 let endpoint_name = &self.name;
446 format!("{component_path}/{endpoint_name}")
447 }
448
449 pub fn etcd_path(&self) -> EtcdPath {
451 EtcdPath::new_endpoint(
452 &self.component.namespace().name(),
453 &self.component.name(),
454 &self.name,
455 )
456 .expect("Endpoint name and component name should be valid")
457 }
458
459 pub fn etcd_path_with_lease_id(&self, lease_id: i64) -> String {
461 let endpoint_root = self.etcd_root();
462 if self.is_static {
463 endpoint_root
464 } else {
465 format!("{endpoint_root}:{lease_id:x}")
466 }
467 }
468
469 pub fn etcd_path_object_with_lease_id(&self, lease_id: i64) -> EtcdPath {
471 if self.is_static {
472 self.etcd_path()
473 } else {
474 EtcdPath::new_endpoint_with_lease(
475 &self.component.namespace().name(),
476 &self.component.name(),
477 &self.name,
478 lease_id,
479 )
480 .expect("Endpoint name and component name should be valid")
481 }
482 }
483
484 pub fn name_with_id(&self, lease_id: i64) -> String {
485 if self.is_static {
486 self.name.clone()
487 } else {
488 format!("{}-{:x}", self.name, lease_id)
489 }
490 }
491
492 pub fn subject(&self) -> String {
493 format!("{}.{}", self.component.service_name(), self.name)
494 }
495
496 pub fn subject_to(&self, lease_id: i64) -> String {
498 format!(
499 "{}.{}",
500 self.component.service_name(),
501 self.name_with_id(lease_id)
502 )
503 }
504
505 pub async fn client(&self) -> Result<client::Client> {
506 if self.is_static {
507 client::Client::new_static(self.clone()).await
508 } else {
509 client::Client::new_dynamic(self.clone()).await
510 }
511 }
512
513 pub fn endpoint_builder(&self) -> endpoint::EndpointConfigBuilder {
514 endpoint::EndpointConfigBuilder::from_endpoint(self.clone())
515 }
516}
517
/// A naming scope, optionally nested under a parent namespace, from which
/// [`Component`]s and child namespaces are created.
///
/// Built through the generated `NamespaceBuilder` (owned-builder pattern).
#[derive(Builder, Clone, Validate)]
#[builder(pattern = "owned")]
pub struct Namespace {
    /// Distributed runtime this namespace belongs to; its builder setter is private.
    #[builder(private)]
    runtime: Arc<DistributedRuntime>,

    /// This namespace's own (unqualified) name; validated by `validate_allowed_chars`.
    #[validate(custom(function = "validate_allowed_chars"))]
    name: String,

    /// Static-mode flag; passed on to components and child namespaces created from it.
    is_static: bool,

    /// Enclosing namespace, if any; the builder defaults this to `None`.
    #[builder(default = "None")]
    parent: Option<Arc<Namespace>>,

    /// Key/value labels; the builder defaults this to an empty list.
    #[builder(default = "Vec::new()")]
    labels: Vec<(String, String)>,
}
536
537impl DistributedRuntimeProvider for Namespace {
538 fn drt(&self) -> &DistributedRuntime {
539 &self.runtime
540 }
541}
542
543impl std::fmt::Debug for Namespace {
544 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
545 write!(
546 f,
547 "Namespace {{ name: {}; is_static: {}; parent: {:?} }}",
548 self.name, self.is_static, self.parent
549 )
550 }
551}
552
553impl RuntimeProvider for Namespace {
554 fn rt(&self) -> &Runtime {
555 self.runtime.rt()
556 }
557}
558
559impl std::fmt::Display for Namespace {
560 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
561 write!(f, "{}", self.name)
562 }
563}
564
565impl Namespace {
566 pub(crate) fn new(runtime: DistributedRuntime, name: String, is_static: bool) -> Result<Self> {
567 Ok(NamespaceBuilder::default()
568 .runtime(Arc::new(runtime))
569 .name(name)
570 .is_static(is_static)
571 .build()?)
572 }
573
574 pub fn component(&self, name: impl Into<String>) -> Result<Component> {
576 Ok(ComponentBuilder::from_runtime(self.runtime.clone())
577 .name(name)
578 .namespace(self.clone())
579 .is_static(self.is_static)
580 .build()?)
581 }
582
583 pub fn namespace(&self, name: impl Into<String>) -> Result<Namespace> {
585 Ok(NamespaceBuilder::default()
586 .runtime(self.runtime.clone())
587 .name(name.into())
588 .is_static(self.is_static)
589 .parent(Some(Arc::new(self.clone())))
590 .build()?)
591 }
592
593 pub fn etcd_path(&self) -> String {
594 format!("{}{}", ETCD_ROOT_PATH, self.name())
595 }
596
597 pub fn name(&self) -> String {
598 match &self.parent {
599 Some(parent) => format!("{}.{}", parent.name(), self.name),
600 None => self.name.clone(),
601 }
602 }
603}
604
605fn validate_allowed_chars(input: &str) -> Result<(), ValidationError> {
607 let regex = regex::Regex::new(r"^[a-z0-9-_]+$").unwrap();
609
610 if regex.is_match(input) {
611 Ok(())
612 } else {
613 Err(ValidationError::new("invalid_characters"))
614 }
615}
616
617