1use std::fmt;
33
34use crate::{
35 config::HealthStatus,
36 discovery::Lease,
37 metrics::{MetricsRegistry, prometheus_names},
38 service::ServiceSet,
39 transports::etcd::{ETCD_ROOT_PATH, EtcdPath},
40};
41
42use super::{
43 DistributedRuntime, Result, Runtime, error,
44 traits::*,
45 transports::etcd::{COMPONENT_KEYWORD, ENDPOINT_KEYWORD},
46 transports::nats::Slug,
47 utils::Duration,
48};
49
50use crate::pipeline::network::{PushWorkHandler, ingress::push_endpoint::PushEndpoint};
51use crate::protocols::EndpointId;
52use crate::service::ComponentNatsServerPrometheusMetrics;
53use async_nats::{
54 rustls::quic,
55 service::{Service, ServiceExt},
56};
57use derive_builder::Builder;
58use derive_getters::Getters;
59use educe::Educe;
60use serde::{Deserialize, Serialize};
61use service::EndpointStatsHandler;
62use std::{collections::HashMap, hash::Hash, sync::Arc};
63use validator::{Validate, ValidationError};
64
65mod client;
66#[allow(clippy::module_inception)]
67mod component;
68mod endpoint;
69mod namespace;
70mod registry;
71pub mod service;
72
73pub use client::{Client, InstanceSource};
74
/// Root key prefix for instance records.
///
/// [`Component::instance_root`] and [`Endpoint::etcd_path_with_lease_id`]
/// build their keys by appending path segments to this prefix.
pub const INSTANCE_ROOT_PATH: &str = "v1/instances";
78
/// Transport descriptor stored in the `transport` field of an [`Instance`].
///
/// Variant names are serialized in `snake_case` (see the `serde` attribute).
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum TransportType {
    /// NOTE(review): presumably NATS over TCP, with the string being the
    /// subject the instance is reachable on — confirm against the producer.
    NatsTcp(String),
}
84
/// Mutable state wrapped by [`Registry`]; starts out empty via `Default`.
#[derive(Default)]
pub struct RegistryInner {
    /// NATS [`Service`] handles keyed by string.
    /// NOTE(review): the key is presumably the service name — confirm.
    services: HashMap<String, Service>,
    /// For each outer key, a shared, mutex-guarded map of
    /// [`EndpointStatsHandler`]s keyed by string.
    /// NOTE(review): presumably service name -> endpoint name -> handler — confirm.
    stats_handlers: HashMap<String, Arc<std::sync::Mutex<HashMap<String, EndpointStatsHandler>>>>,
}
90
/// Cloneable handle to a shared [`RegistryInner`].
///
/// Clones share the same state: the inner value sits behind an `Arc` and is
/// guarded by a Tokio async mutex.
#[derive(Clone)]
pub struct Registry {
    /// Shared, lock-protected registry state.
    inner: Arc<tokio::sync::Mutex<RegistryInner>>,
}
95
/// Serializable record identifying one instance of an endpoint.
///
/// [`Component::list_instances`] deserializes these from JSON bytes held in
/// the store.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct Instance {
    /// Name of the component the instance belongs to.
    pub component: String,
    /// Name of the endpoint the instance serves.
    pub endpoint: String,
    /// Name of the namespace the component lives in.
    pub namespace: String,
    /// Numeric identifier of the instance.
    /// NOTE(review): presumably the lease id it registered with — confirm.
    pub instance_id: i64,
    /// Transport descriptor for the instance.
    pub transport: TransportType,
}
104
105impl Instance {
106 pub fn id(&self) -> i64 {
107 self.instance_id
108 }
109 pub fn endpoint_id(&self) -> EndpointId {
110 EndpointId {
111 namespace: self.namespace.clone(),
112 component: self.component.clone(),
113 name: self.endpoint.clone(),
114 }
115 }
116}
117
118impl fmt::Display for Instance {
119 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
120 write!(
121 f,
122 "{}/{}/{}/{}",
123 self.namespace, self.component, self.endpoint, self.instance_id
124 )
125 }
126}
127
128impl std::cmp::Ord for Instance {
130 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
131 self.to_string().cmp(&other.to_string())
132 }
133}
134
135impl PartialOrd for Instance {
136 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
137 Some(self.cmp(other))
139 }
140}
141
/// A named component inside a [`Namespace`].
///
/// Built through the derived `ComponentBuilder` (owned pattern); see
/// [`Namespace::component`] for the construction path visible in this module.
#[derive(Educe, Builder, Clone, Validate)]
#[educe(Debug)]
#[builder(pattern = "owned")]
pub struct Component {
    /// Shared distributed runtime. The builder setter is private and the
    /// field is left out of the `Debug` output.
    #[builder(private)]
    #[educe(Debug(ignore))]
    drt: Arc<DistributedRuntime>,

    /// Component name; validation runs `validate_allowed_chars` on it.
    #[builder(setter(into))]
    #[validate(custom(function = "validate_allowed_chars"))]
    name: String,

    /// Key/value label pairs; empty unless set on the builder.
    #[builder(default = "Vec::new()")]
    labels: Vec<(String, String)>,

    /// Namespace this component belongs to.
    #[builder(setter(into))]
    namespace: Namespace,

    /// Static flag; [`Namespace::component`] copies it from the namespace,
    /// and [`Component::endpoint`] passes it on to endpoints.
    is_static: bool,
}
174
175impl Hash for Component {
176 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
177 self.namespace.name().hash(state);
178 self.name.hash(state);
179 self.is_static.hash(state);
180 }
181}
182
183impl PartialEq for Component {
184 fn eq(&self, other: &Self) -> bool {
185 self.namespace.name() == other.namespace.name()
186 && self.name == other.name
187 && self.is_static == other.is_static
188 }
189}
190
191impl Eq for Component {}
192
193impl std::fmt::Display for Component {
194 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
195 write!(f, "{}.{}", self.namespace.name(), self.name)
196 }
197}
198
199impl DistributedRuntimeProvider for Component {
200 fn drt(&self) -> &DistributedRuntime {
201 &self.drt
202 }
203}
204
205impl RuntimeProvider for Component {
206 fn rt(&self) -> &Runtime {
207 self.drt.rt()
208 }
209}
210
211impl MetricsRegistry for Component {
212 fn basename(&self) -> String {
213 self.name.clone()
214 }
215
216 fn parent_hierarchy(&self) -> Vec<String> {
217 [
218 self.namespace.parent_hierarchy(),
219 vec![self.namespace.basename()],
220 ]
221 .concat()
222 }
223}
224
225impl Component {
226 pub fn instance_root(&self) -> String {
228 let ns = self.namespace.name();
229 let cp = &self.name;
230 format!("{INSTANCE_ROOT_PATH}/{ns}/{cp}")
231 }
232
233 pub fn service_name(&self) -> String {
234 let service_name = format!("{}_{}", self.namespace.name(), self.name);
235 Slug::slugify(&service_name).to_string()
236 }
237
238 pub fn path(&self) -> String {
239 format!("{}/{}", self.namespace.name(), self.name)
240 }
241
242 pub fn etcd_path(&self) -> EtcdPath {
243 EtcdPath::new_component(&self.namespace.name(), &self.name)
244 .expect("Component name and namespace should be valid")
245 }
246
247 pub fn namespace(&self) -> &Namespace {
248 &self.namespace
249 }
250
251 pub fn name(&self) -> &str {
252 &self.name
253 }
254
255 pub fn labels(&self) -> &[(String, String)] {
256 &self.labels
257 }
258
259 pub fn endpoint(&self, endpoint: impl Into<String>) -> Endpoint {
260 Endpoint {
261 component: self.clone(),
262 name: endpoint.into(),
263 is_static: self.is_static,
264 labels: Vec::new(),
265 }
266 }
267
268 pub async fn list_instances(&self) -> anyhow::Result<Vec<Instance>> {
269 let client = self.drt.store();
270 let Some(bucket) = client.get_bucket(&self.instance_root()).await? else {
271 return Ok(vec![]);
272 };
273 let entries = bucket.entries().await?;
274 let mut instances = Vec::with_capacity(entries.len());
275 for (name, bytes) in entries.into_iter() {
276 let val = match serde_json::from_slice::<Instance>(&bytes) {
277 Ok(val) => val,
278 Err(err) => {
279 anyhow::bail!("Error converting storage response to Instance: {err}. {name}",);
280 }
281 };
282 instances.push(val);
283 }
284 instances.sort();
285 Ok(instances)
286 }
287
288 pub async fn scrape_stats(&self, timeout: Duration) -> Result<ServiceSet> {
291 let service_name = self.service_name();
293 let service_client = self.drt().service_client();
294 service_client
295 .collect_services(&service_name, timeout)
296 .await
297 }
298
299 pub fn start_scraping_nats_service_component_metrics(&self) -> Result<()> {
307 const MAX_WAIT_MS: std::time::Duration = std::time::Duration::from_millis(9800); let component_metrics = ComponentNatsServerPrometheusMetrics::new(self)?;
311
312 let component_clone = self.clone();
313 let mut hierarchies = self.parent_hierarchy();
314 hierarchies.push(self.hierarchy());
315 debug_assert!(
316 hierarchies
317 .last()
318 .map(|x| x.as_str())
319 .unwrap_or_default()
320 .eq_ignore_ascii_case(&self.service_name())
321 ); let m = component_metrics.clone();
325 let c = component_clone.clone();
326
327 c.drt().runtime().secondary().spawn(async move {
336 let timeout = std::time::Duration::from_millis(500);
337 let mut interval = tokio::time::interval(MAX_WAIT_MS);
338 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
339
340 loop {
341 match c.scrape_stats(timeout).await {
342 Ok(service_set) => {
343 m.update_from_service_set(&service_set);
344 }
345 Err(err) => {
346 tracing::error!(
347 "Background scrape failed for {}: {}",
348 c.service_name(),
349 err
350 );
351 m.reset_to_zeros();
352 }
353 }
354
355 interval.tick().await;
356 }
357 });
358
359 Ok(())
360 }
361
362 pub async fn stats_stream(&self) -> Result<()> {
369 unimplemented!("collect_stats")
370 }
371
372 pub fn service_builder(&self) -> service::ServiceConfigBuilder {
373 service::ServiceConfigBuilder::from_component(self.clone())
374 }
375}
376
377impl ComponentBuilder {
378 pub fn from_runtime(drt: Arc<DistributedRuntime>) -> Self {
379 Self::default().drt(drt)
380 }
381}
382
/// A named endpoint exposed by a [`Component`].
///
/// Created by [`Component::endpoint`].
#[derive(Debug, Clone)]
pub struct Endpoint {
    /// Owning component (a clone of the component the endpoint was created on).
    component: Component,

    /// Endpoint name.
    name: String,

    /// Static flag copied from the component. When set, lease ids are left
    /// out of [`Endpoint::name_with_id`] and
    /// [`Endpoint::etcd_path_object_with_lease_id`], and [`Endpoint::client`]
    /// builds a static client.
    is_static: bool,

    /// Key/value label pairs; [`Component::endpoint`] initialises this empty.
    labels: Vec<(String, String)>,
}
396
397impl Hash for Endpoint {
398 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
399 self.component.hash(state);
400 self.name.hash(state);
401 self.is_static.hash(state);
402 }
403}
404
405impl PartialEq for Endpoint {
406 fn eq(&self, other: &Self) -> bool {
407 self.component == other.component
408 && self.name == other.name
409 && self.is_static == other.is_static
410 }
411}
412
413impl Eq for Endpoint {}
414
415impl DistributedRuntimeProvider for Endpoint {
416 fn drt(&self) -> &DistributedRuntime {
417 self.component.drt()
418 }
419}
420
421impl RuntimeProvider for Endpoint {
422 fn rt(&self) -> &Runtime {
423 self.component.rt()
424 }
425}
426
427impl MetricsRegistry for Endpoint {
428 fn basename(&self) -> String {
429 self.name.clone()
430 }
431
432 fn parent_hierarchy(&self) -> Vec<String> {
433 [
434 self.component.parent_hierarchy(),
435 vec![self.component.basename()],
436 ]
437 .concat()
438 }
439}
440
441impl Endpoint {
442 pub fn id(&self) -> EndpointId {
443 EndpointId {
444 namespace: self.component.namespace().name().to_string(),
445 component: self.component.name().to_string(),
446 name: self.name().to_string(),
447 }
448 }
449
450 pub fn name(&self) -> &str {
451 &self.name
452 }
453
454 pub fn component(&self) -> &Component {
455 &self.component
456 }
457
458 pub fn path(&self) -> String {
460 format!(
461 "{}/{}/{}",
462 self.component.path(),
463 ENDPOINT_KEYWORD,
464 self.name
465 )
466 }
467
468 pub fn etcd_root(&self) -> String {
470 let component_path = self.component.instance_root();
471 let endpoint_name = &self.name;
472 format!("{component_path}/{endpoint_name}")
473 }
474
475 pub fn etcd_path(&self) -> EtcdPath {
477 EtcdPath::new_endpoint(
478 &self.component.namespace().name(),
479 self.component.name(),
480 &self.name,
481 )
482 .expect("Endpoint name and component name should be valid")
483 }
484
485 pub fn etcd_path_with_lease_id(&self, lease_id: i64) -> String {
487 format!("{INSTANCE_ROOT_PATH}/{}", self.unique_path(lease_id))
488 }
489
490 pub fn unique_path(&self, lease_id: i64) -> String {
492 let ns = self.component.namespace().name();
493 let cp = self.component.name();
494 let ep = self.name();
495 format!("{ns}/{cp}/{ep}/{lease_id:x}")
496 }
497
498 pub fn etcd_path_object_with_lease_id(&self, lease_id: i64) -> EtcdPath {
500 if self.is_static {
501 self.etcd_path()
502 } else {
503 EtcdPath::new_endpoint_with_lease(
504 &self.component.namespace().name(),
505 self.component.name(),
506 &self.name,
507 lease_id,
508 )
509 .expect("Endpoint name and component name should be valid")
510 }
511 }
512
513 pub fn name_with_id(&self, lease_id: i64) -> String {
514 if self.is_static {
515 self.name.clone()
516 } else {
517 format!("{}-{:x}", self.name, lease_id)
518 }
519 }
520
521 pub fn subject(&self) -> String {
522 format!("{}.{}", self.component.service_name(), self.name)
523 }
524
525 pub fn subject_to(&self, lease_id: i64) -> String {
527 format!(
528 "{}.{}",
529 self.component.service_name(),
530 self.name_with_id(lease_id)
531 )
532 }
533
534 pub async fn client(&self) -> Result<client::Client> {
535 if self.is_static {
536 client::Client::new_static(self.clone()).await
537 } else {
538 client::Client::new_dynamic(self.clone()).await
539 }
540 }
541
542 pub fn endpoint_builder(&self) -> endpoint::EndpointConfigBuilder {
543 endpoint::EndpointConfigBuilder::from_endpoint(self.clone())
544 }
545}
546
/// A namespace, optionally nested under a parent namespace.
///
/// Built through the derived `NamespaceBuilder` (owned pattern).
#[derive(Builder, Clone, Validate)]
#[builder(pattern = "owned")]
pub struct Namespace {
    /// Shared distributed runtime; the builder setter is private.
    #[builder(private)]
    runtime: Arc<DistributedRuntime>,

    /// This namespace's own name segment; validation runs
    /// `validate_allowed_chars` on it. The dotted full name including
    /// parents is produced by [`Namespace::name`].
    #[validate(custom(function = "validate_allowed_chars"))]
    name: String,

    /// Static flag, passed on to child namespaces and components.
    is_static: bool,

    /// Parent namespace, or `None` (the builder default) for a root namespace.
    #[builder(default = "None")]
    parent: Option<Arc<Namespace>>,

    /// Key/value label pairs; empty unless set on the builder.
    #[builder(default = "Vec::new()")]
    labels: Vec<(String, String)>,
}
565
566impl DistributedRuntimeProvider for Namespace {
567 fn drt(&self) -> &DistributedRuntime {
568 &self.runtime
569 }
570}
571
572impl std::fmt::Debug for Namespace {
573 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
574 write!(
575 f,
576 "Namespace {{ name: {}; is_static: {}; parent: {:?} }}",
577 self.name, self.is_static, self.parent
578 )
579 }
580}
581
582impl RuntimeProvider for Namespace {
583 fn rt(&self) -> &Runtime {
584 self.runtime.rt()
585 }
586}
587
588impl std::fmt::Display for Namespace {
589 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
590 write!(f, "{}", self.name)
591 }
592}
593
594impl Namespace {
595 pub(crate) fn new(runtime: DistributedRuntime, name: String, is_static: bool) -> Result<Self> {
596 Ok(NamespaceBuilder::default()
597 .runtime(Arc::new(runtime))
598 .name(name)
599 .is_static(is_static)
600 .build()?)
601 }
602
603 pub fn component(&self, name: impl Into<String>) -> Result<Component> {
605 Ok(ComponentBuilder::from_runtime(self.runtime.clone())
606 .name(name)
607 .namespace(self.clone())
608 .is_static(self.is_static)
609 .build()?)
610 }
611
612 pub fn namespace(&self, name: impl Into<String>) -> Result<Namespace> {
614 Ok(NamespaceBuilder::default()
615 .runtime(self.runtime.clone())
616 .name(name.into())
617 .is_static(self.is_static)
618 .parent(Some(Arc::new(self.clone())))
619 .build()?)
620 }
621
622 pub fn etcd_path(&self) -> String {
623 format!("{ETCD_ROOT_PATH}{}", self.name())
624 }
625
626 pub fn name(&self) -> String {
627 match &self.parent {
628 Some(parent) => format!("{}.{}", parent.name(), self.name),
629 None => self.name.clone(),
630 }
631 }
632}
633
634fn validate_allowed_chars(input: &str) -> Result<(), ValidationError> {
636 let regex = regex::Regex::new(r"^[a-z0-9-_]+$").unwrap();
638
639 if regex.is_match(input) {
640 Ok(())
641 } else {
642 Err(ValidationError::new("invalid_characters"))
643 }
644}
645
646