1use std::fmt;
33
34use crate::{
35 config::HealthStatus,
36 distributed::RequestPlaneMode,
37 metrics::{MetricsHierarchy, MetricsRegistry, prometheus_names},
38 service::ServiceClient,
39 service::ServiceSet,
40};
41
42use super::{DistributedRuntime, Runtime, traits::*, transports::nats::Slug, utils::Duration};
43
44use crate::pipeline::network::{PushWorkHandler, ingress::push_endpoint::PushEndpoint};
45use crate::protocols::EndpointId;
46use async_nats::{
47 rustls::quic,
48 service::{Service, ServiceExt},
49};
50use derive_builder::Builder;
51use derive_getters::Getters;
52use educe::Educe;
53use serde::{Deserialize, Serialize};
54use std::{collections::HashMap, hash::Hash, sync::Arc};
55use validator::{Validate, ValidationError};
56
57mod client;
58#[allow(clippy::module_inception)]
59mod component;
60mod endpoint;
61mod namespace;
62mod registry;
63pub mod service;
64
65pub use client::Client;
66pub use endpoint::build_transport_type;
67
68#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Hash)]
69#[serde(rename_all = "snake_case")]
70pub enum TransportType {
71 #[serde(rename = "nats_tcp")]
72 Nats(String),
73 Http(String),
74 Tcp(String),
75}
76
77#[derive(Default)]
78pub struct RegistryInner {
79 pub(crate) services: HashMap<String, Service>,
80}
81
82#[derive(Clone)]
83pub struct Registry {
84 pub(crate) inner: Arc<tokio::sync::Mutex<RegistryInner>>,
85}
86
87#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
88pub struct Instance {
89 pub component: String,
90 pub endpoint: String,
91 pub namespace: String,
92 pub instance_id: u64,
93 pub transport: TransportType,
94}
95
96impl Instance {
97 pub fn id(&self) -> u64 {
98 self.instance_id
99 }
100 pub fn endpoint_id(&self) -> EndpointId {
101 EndpointId {
102 namespace: self.namespace.clone(),
103 component: self.component.clone(),
104 name: self.endpoint.clone(),
105 }
106 }
107}
108
109impl fmt::Display for Instance {
110 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
111 write!(
112 f,
113 "{}/{}/{}/{}",
114 self.namespace, self.component, self.endpoint, self.instance_id
115 )
116 }
117}
118
119impl std::cmp::Ord for Instance {
121 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
122 self.to_string().cmp(&other.to_string())
123 }
124}
125
126impl PartialOrd for Instance {
127 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
128 Some(self.cmp(other))
130 }
131}
132
133#[derive(Educe, Builder, Clone, Validate)]
139#[educe(Debug)]
140#[builder(pattern = "owned", build_fn(private, name = "build_internal"))]
141pub struct Component {
142 #[builder(private)]
143 #[educe(Debug(ignore))]
144 drt: Arc<DistributedRuntime>,
145
146 #[builder(setter(into))]
148 #[validate(custom(function = "validate_allowed_chars"))]
149 name: String,
150
151 #[builder(default = "Vec::new()")]
153 labels: Vec<(String, String)>,
154
155 #[builder(setter(into))]
158 namespace: Namespace,
159
160 #[builder(default = "crate::MetricsRegistry::new()")]
162 metrics_registry: crate::MetricsRegistry,
163}
164
165impl Hash for Component {
166 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
167 self.namespace.name().hash(state);
168 self.name.hash(state);
169 }
170}
171
172impl PartialEq for Component {
173 fn eq(&self, other: &Self) -> bool {
174 self.namespace.name() == other.namespace.name() && self.name == other.name
175 }
176}
177
178impl Eq for Component {}
179
180impl std::fmt::Display for Component {
181 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
182 write!(f, "{}.{}", self.namespace.name(), self.name)
183 }
184}
185
186impl DistributedRuntimeProvider for Component {
187 fn drt(&self) -> &DistributedRuntime {
188 &self.drt
189 }
190}
191
192impl RuntimeProvider for Component {
193 fn rt(&self) -> &Runtime {
194 self.drt.rt()
195 }
196}
197
198impl MetricsHierarchy for Component {
199 fn basename(&self) -> String {
200 self.name.clone()
201 }
202
203 fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy> {
204 let mut parents = vec![];
205
206 parents.extend(self.namespace.parent_hierarchies());
208
209 parents.push(&self.namespace as &dyn MetricsHierarchy);
211
212 parents
213 }
214
215 fn get_metrics_registry(&self) -> &MetricsRegistry {
216 &self.metrics_registry
217 }
218}
219
220impl Component {
221 pub fn service_name(&self) -> String {
222 let service_name = format!("{}_{}", self.namespace.name(), self.name);
223 Slug::slugify(&service_name).to_string()
224 }
225
226 pub fn namespace(&self) -> &Namespace {
227 &self.namespace
228 }
229
230 pub fn name(&self) -> &str {
231 &self.name
232 }
233
234 pub fn labels(&self) -> &[(String, String)] {
235 &self.labels
236 }
237
238 pub fn endpoint(&self, endpoint: impl Into<String>) -> Endpoint {
239 Endpoint {
240 component: self.clone(),
241 name: endpoint.into(),
242 labels: Vec::new(),
243 metrics_registry: crate::MetricsRegistry::new(),
244 }
245 }
246
247 pub async fn list_instances(&self) -> anyhow::Result<Vec<Instance>> {
248 let discovery = self.drt.discovery();
249
250 let discovery_query = crate::discovery::DiscoveryQuery::ComponentEndpoints {
251 namespace: self.namespace.name(),
252 component: self.name.clone(),
253 };
254
255 let discovery_instances = discovery.list(discovery_query).await?;
256
257 let mut instances: Vec<Instance> = discovery_instances
259 .into_iter()
260 .filter_map(|di| match di {
261 crate::discovery::DiscoveryInstance::Endpoint(instance) => Some(instance),
262 _ => None, })
264 .collect();
265
266 instances.sort();
267 Ok(instances)
268 }
269}
270
271impl ComponentBuilder {
272 pub fn from_runtime(drt: Arc<DistributedRuntime>) -> Self {
273 Self::default().drt(drt)
274 }
275
276 pub fn build(self) -> Result<Component, anyhow::Error> {
277 let component = self.build_internal()?;
278 let drt = component.drt();
282 if drt.request_plane().is_nats() {
283 let mut rx = drt.register_nats_service(component.clone());
284 let result = tokio::task::block_in_place(|| rx.blocking_recv());
289 match result {
290 Some(Ok(())) => {
291 tracing::debug!(
292 component = component.service_name(),
293 "NATS service registration completed"
294 );
295 }
296 Some(Err(e)) => {
297 return Err(anyhow::anyhow!(
298 "NATS service registration failed for component '{}': {}",
299 component.service_name(),
300 e
301 ));
302 }
303 None => {
304 return Err(anyhow::anyhow!(
305 "NATS service registration channel closed unexpectedly for component '{}'",
306 component.service_name()
307 ));
308 }
309 }
310 }
311 Ok(component)
312 }
313}
314
315#[derive(Debug, Clone)]
316pub struct Endpoint {
317 component: Component,
318
319 name: String,
322
323 labels: Vec<(String, String)>,
325
326 metrics_registry: crate::MetricsRegistry,
328}
329
330impl Hash for Endpoint {
331 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
332 self.component.hash(state);
333 self.name.hash(state);
334 }
335}
336
337impl PartialEq for Endpoint {
338 fn eq(&self, other: &Self) -> bool {
339 self.component == other.component && self.name == other.name
340 }
341}
342
343impl Eq for Endpoint {}
344
345impl DistributedRuntimeProvider for Endpoint {
346 fn drt(&self) -> &DistributedRuntime {
347 self.component.drt()
348 }
349}
350
351impl RuntimeProvider for Endpoint {
352 fn rt(&self) -> &Runtime {
353 self.component.rt()
354 }
355}
356
357impl MetricsHierarchy for Endpoint {
358 fn basename(&self) -> String {
359 self.name.clone()
360 }
361
362 fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy> {
363 let mut parents = vec![];
364
365 parents.extend(self.component.parent_hierarchies());
367
368 parents.push(&self.component as &dyn MetricsHierarchy);
370
371 parents
372 }
373
374 fn get_metrics_registry(&self) -> &MetricsRegistry {
375 &self.metrics_registry
376 }
377}
378
379impl Endpoint {
380 pub fn id(&self) -> EndpointId {
381 EndpointId {
382 namespace: self.component.namespace().name().to_string(),
383 component: self.component.name().to_string(),
384 name: self.name().to_string(),
385 }
386 }
387
388 pub fn name(&self) -> &str {
389 &self.name
390 }
391
392 pub fn component(&self) -> &Component {
393 &self.component
394 }
395
396 pub async fn client(&self) -> anyhow::Result<client::Client> {
397 client::Client::new(self.clone()).await
398 }
399
400 pub fn endpoint_builder(&self) -> endpoint::EndpointConfigBuilder {
401 endpoint::EndpointConfigBuilder::from_endpoint(self.clone())
402 }
403}
404
405#[derive(Builder, Clone, Validate)]
406#[builder(pattern = "owned")]
407pub struct Namespace {
408 #[builder(private)]
409 runtime: Arc<DistributedRuntime>,
410
411 #[validate(custom(function = "validate_allowed_chars"))]
412 name: String,
413
414 #[builder(default = "None")]
415 parent: Option<Arc<Namespace>>,
416
417 #[builder(default = "Vec::new()")]
419 labels: Vec<(String, String)>,
420
421 #[builder(default = "crate::MetricsRegistry::new()")]
423 metrics_registry: crate::MetricsRegistry,
424}
425
426impl DistributedRuntimeProvider for Namespace {
427 fn drt(&self) -> &DistributedRuntime {
428 &self.runtime
429 }
430}
431
432impl std::fmt::Debug for Namespace {
433 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
434 write!(
435 f,
436 "Namespace {{ name: {}; parent: {:?} }}",
437 self.name, self.parent
438 )
439 }
440}
441
442impl RuntimeProvider for Namespace {
443 fn rt(&self) -> &Runtime {
444 self.runtime.rt()
445 }
446}
447
448impl std::fmt::Display for Namespace {
449 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
450 write!(f, "{}", self.name)
451 }
452}
453
454impl Namespace {
455 pub(crate) fn new(runtime: DistributedRuntime, name: String) -> anyhow::Result<Self> {
456 Ok(NamespaceBuilder::default()
457 .runtime(Arc::new(runtime))
458 .name(name)
459 .build()?)
460 }
461
462 pub fn component(&self, name: impl Into<String>) -> anyhow::Result<Component> {
464 ComponentBuilder::from_runtime(self.runtime.clone())
465 .name(name)
466 .namespace(self.clone())
467 .build()
468 }
469
470 pub fn namespace(&self, name: impl Into<String>) -> anyhow::Result<Namespace> {
472 Ok(NamespaceBuilder::default()
473 .runtime(self.runtime.clone())
474 .name(name.into())
475 .parent(Some(Arc::new(self.clone())))
476 .build()?)
477 }
478
479 pub fn name(&self) -> String {
480 match &self.parent {
481 Some(parent) => format!("{}.{}", parent.name(), self.name),
482 None => self.name.clone(),
483 }
484 }
485}
486
487fn validate_allowed_chars(input: &str) -> Result<(), ValidationError> {
489 let regex = regex::Regex::new(r"^[a-z0-9-_]+$").unwrap();
491
492 if regex.is_match(input) {
493 Ok(())
494 } else {
495 Err(ValidationError::new("invalid_characters"))
496 }
497}