use std::fmt;
use crate::{
config::{HealthStatus, RequestPlaneMode},
metrics::{MetricsHierarchy, MetricsRegistry, prometheus_names},
service::ServiceSet,
transports::etcd::{ETCD_ROOT_PATH, EtcdPath},
};
use super::{
DistributedRuntime, Runtime,
traits::*,
transports::etcd::{COMPONENT_KEYWORD, ENDPOINT_KEYWORD},
transports::nats::Slug,
utils::Duration,
};
use crate::pipeline::network::{PushWorkHandler, ingress::push_endpoint::PushEndpoint};
use crate::protocols::EndpointId;
use crate::service::ComponentNatsServerPrometheusMetrics;
use async_nats::{
rustls::quic,
service::{Service, ServiceExt},
};
use derive_builder::Builder;
use derive_getters::Getters;
use educe::Educe;
use serde::{Deserialize, Serialize};
use service::EndpointStatsHandler;
use std::{collections::HashMap, hash::Hash, sync::Arc};
use validator::{Validate, ValidationError};
mod client;
#[allow(clippy::module_inception)]
mod component;
mod endpoint;
mod namespace;
mod registry;
pub mod service;
pub use client::{Client, InstanceSource};
/// Key prefix for instance entries; prepended by `Component::instance_root`
/// and `Endpoint::etcd_path_with_lease_id`.
pub const INSTANCE_ROOT_PATH: &str = "v1/instances";
/// Transport descriptor stored in [`Instance::transport`]; every variant
/// carries a single `String` payload.
///
/// Variant names are serialized in snake_case, except `Nats`, which is
/// explicitly renamed to `nats_tcp`.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Hash)]
#[serde(rename_all = "snake_case")]
pub enum TransportType {
/// Serialized under the tag `nats_tcp` instead of the snake_case default.
#[serde(rename = "nats_tcp")]
Nats(String),
/// NOTE(review): payload is presumably an HTTP address/URL — confirm.
Http(String),
/// NOTE(review): payload is presumably a TCP address — confirm.
Tcp(String),
}
/// Mutable state guarded by the lock inside [`Registry`].
#[derive(Default)]
pub struct RegistryInner {
/// NATS services by name; `Component::add_stats_service` keys entries by
/// `Component::service_name()`.
pub(crate) services: HashMap<String, Service>,
/// Shared stats-handler tables, inserted by `Component::add_stats_service`
/// under the same key as `services`.
/// NOTE(review): the inner map is presumably keyed by endpoint name — confirm.
pub(crate) stats_handlers:
HashMap<String, Arc<parking_lot::Mutex<HashMap<String, EndpointStatsHandler>>>>,
}
/// Cloneable handle to a shared [`RegistryInner`]; clones share the same
/// `Arc`-wrapped, async-mutex-protected state.
#[derive(Clone)]
pub struct Registry {
/// Shared state; lock with `.lock().await` before reading or writing.
pub(crate) inner: Arc<tokio::sync::Mutex<RegistryInner>>,
/// NOTE(review): presumably mirrors the runtime's static/dynamic mode — confirm.
is_static: bool,
}
/// One discovered instance of an endpoint, identified by its namespace,
/// component, endpoint name and numeric instance id.
///
/// Derived equality compares every field, including `transport`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct Instance {
/// Name of the component that owns the endpoint.
pub component: String,
/// Endpoint name.
pub endpoint: String,
/// Namespace name.
pub namespace: String,
/// Numeric identifier of this instance; returned by [`Instance::id`].
pub instance_id: u64,
/// Transport descriptor for this instance.
pub transport: TransportType,
}
impl Instance {
    /// Returns the numeric identifier of this instance (`instance_id`).
    pub fn id(&self) -> u64 {
        let Self { instance_id, .. } = self;
        *instance_id
    }

    /// Builds an [`EndpointId`] from copies of this instance's namespace,
    /// component and endpoint names.
    pub fn endpoint_id(&self) -> EndpointId {
        let namespace = self.namespace.clone();
        let component = self.component.clone();
        let name = self.endpoint.clone();
        EndpointId {
            namespace,
            component,
            name,
        }
    }
}
impl fmt::Display for Instance {
    /// Renders the instance as `namespace/component/endpoint/instance_id`,
    /// with the id in decimal.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            namespace,
            component,
            endpoint,
            instance_id,
            ..
        } = self;
        write!(f, "{}/{}/{}/{}", namespace, component, endpoint, instance_id)
    }
}
impl std::cmp::Ord for Instance {
    /// Orders instances by comparing their `Display` renderings as strings.
    ///
    /// The comparison is therefore purely textual: instance ids compare as
    /// decimal text rather than numerically, and `transport` does not
    /// participate at all.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        let lhs = self.to_string();
        let rhs = other.to_string();
        lhs.cmp(&rhs)
    }
}
impl PartialOrd for Instance {
    /// Always returns `Some`, delegating to the total order defined by `Ord`.
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(Ord::cmp(self, other))
    }
}
/// A named unit inside a [`Namespace`], constructed through the derived
/// `ComponentBuilder` (owned builder pattern).
///
/// The manual `Hash`/`PartialEq` impls consider only the namespace's full
/// name, `name` and `is_static`.
#[derive(Educe, Builder, Clone, Validate)]
#[educe(Debug)]
#[builder(pattern = "owned")]
pub struct Component {
/// Shared handle to the distributed runtime. Excluded from `Debug` output;
/// its builder setter is private.
#[builder(private)]
#[educe(Debug(ignore))]
drt: Arc<DistributedRuntime>,
/// Component name; the `Validate` derive checks it with
/// `validate_allowed_chars`.
#[builder(setter(into))]
#[validate(custom(function = "validate_allowed_chars"))]
name: String,
/// Key/value label pairs; defaults to an empty list.
#[builder(default = "Vec::new()")]
labels: Vec<(String, String)>,
/// Namespace this component belongs to.
#[builder(setter(into))]
namespace: Namespace,
/// Copied into every `Endpoint` created by `Component::endpoint`.
/// NOTE(review): presumably selects static vs. dynamic discovery — confirm.
is_static: bool,
/// Metrics registry for this component; defaults to a fresh registry.
#[builder(default = "crate::MetricsRegistry::new()")]
metrics_registry: crate::MetricsRegistry,
}
impl Hash for Component {
    /// Feeds the namespace's full name, the component name and the static
    /// flag into the hasher, in that order.
    fn hash<H>(&self, state: &mut H)
    where
        H: std::hash::Hasher,
    {
        let namespace_name = self.namespace.name();
        // Tuples hash their elements one after another, in order.
        (&namespace_name, &self.name, &self.is_static).hash(state);
    }
}
impl PartialEq for Component {
    /// Two components are equal when their namespace full names, component
    /// names and static flags all match.
    fn eq(&self, other: &Self) -> bool {
        if self.is_static != other.is_static {
            return false;
        }
        if self.name != other.name {
            return false;
        }
        self.namespace.name() == other.namespace.name()
    }
}

impl Eq for Component {}
impl std::fmt::Display for Component {
    /// Renders the component as `<namespace full name>.<component name>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let namespace_name = self.namespace.name();
        let component_name = &self.name;
        write!(f, "{}.{}", namespace_name, component_name)
    }
}
impl DistributedRuntimeProvider for Component {
    /// Borrows the distributed runtime this component was built with.
    fn drt(&self) -> &DistributedRuntime {
        self.drt.as_ref()
    }
}
impl RuntimeProvider for Component {
fn rt(&self) -> &Runtime {
self.drt.rt()
}
}
impl MetricsHierarchy for Component {
    /// The component name serves as this level's basename.
    fn basename(&self) -> String {
        self.name.to_owned()
    }

    /// Returns the namespace's own parents followed by the namespace itself.
    fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy> {
        let owner = &self.namespace;
        let mut chain: Vec<&dyn MetricsHierarchy> = Vec::new();
        chain.extend(owner.parent_hierarchies());
        chain.push(owner);
        chain
    }

    /// Borrows the registry owned by this component.
    fn get_metrics_registry(&self) -> &MetricsRegistry {
        let registry = &self.metrics_registry;
        registry
    }
}
impl Component {
/// Returns `INSTANCE_ROOT_PATH/<namespace full name>/<component name>`.
pub fn instance_root(&self) -> String {
    let namespace_name = self.namespace.name();
    [
        INSTANCE_ROOT_PATH,
        namespace_name.as_str(),
        self.name.as_str(),
    ]
    .join("/")
}
/// Returns the slugified form of `<namespace full name>_<component name>`.
pub fn service_name(&self) -> String {
    let raw = format!("{}_{}", self.namespace.name(), self.name);
    let slug = Slug::slugify(&raw);
    slug.to_string()
}
/// Returns `<namespace full name>/<component name>`.
pub fn path(&self) -> String {
    let namespace_name = self.namespace.name();
    let component_name = &self.name;
    format!("{}/{}", namespace_name, component_name)
}
/// Builds the [`EtcdPath`] for this component.
///
/// # Panics
///
/// Panics if `EtcdPath::new_component` rejects the namespace or component
/// name.
pub fn etcd_path(&self) -> EtcdPath {
    let namespace_name = self.namespace.name();
    let built = EtcdPath::new_component(&namespace_name, &self.name);
    built.expect("Component name and namespace should be valid")
}
/// Borrows the namespace this component belongs to.
pub fn namespace(&self) -> &Namespace {
&self.namespace
}
/// Borrows the component's own name (without the namespace).
pub fn name(&self) -> &str {
&self.name
}
/// Borrows the component's key/value label pairs.
pub fn labels(&self) -> &[(String, String)] {
&self.labels
}
/// Creates an [`Endpoint`] named `endpoint` under a clone of this component.
///
/// The endpoint inherits the component's `is_static` flag and starts with no
/// labels and a fresh metrics registry.
pub fn endpoint(&self, endpoint: impl Into<String>) -> Endpoint {
    let component = self.clone();
    let name: String = endpoint.into();
    Endpoint {
        component,
        name,
        is_static: self.is_static,
        labels: vec![],
        metrics_registry: crate::MetricsRegistry::new(),
    }
}
/// Lists the endpoint instances discovery reports for this component,
/// sorted by `Instance`'s `Ord` impl.
///
/// Discovery results that are not the `Endpoint` variant are dropped.
///
/// # Errors
///
/// Propagates any error returned by the discovery `list` call.
pub async fn list_instances(&self) -> anyhow::Result<Vec<Instance>> {
    let discovery = self.drt.discovery();
    let query = crate::discovery::DiscoveryQuery::ComponentEndpoints {
        namespace: self.namespace.name(),
        component: self.name.clone(),
    };
    let found = discovery.list(query).await?;

    let mut instances: Vec<Instance> = Vec::new();
    for item in found {
        if let crate::discovery::DiscoveryInstance::Endpoint(instance) = item {
            instances.push(instance);
        }
    }
    instances.sort();
    Ok(instances)
}
pub async fn scrape_stats(&self, timeout: Duration) -> anyhow::Result<ServiceSet> {
let service_name = self.service_name();
let Some(service_client) = self.drt().service_client() else {
anyhow::bail!("ServiceSet is gathered via NATS, do not call this in non-NATS setups.");
};
service_client
.collect_services(&service_name, timeout)
.await
}
/// Spawns a background task on the runtime's secondary handle that
/// repeatedly scrapes this component's service stats and mirrors them into
/// Prometheus metrics.
///
/// Each iteration calls `scrape_stats` with a 500 ms timeout. On success the
/// metrics are updated from the returned service set; on failure the error
/// is logged and the metrics are reset to zeros. The loop never exits on its
/// own and the task's join handle is dropped.
///
/// # Errors
///
/// Returns an error if `ComponentNatsServerPrometheusMetrics::new` fails;
/// no task is spawned in that case.
pub fn start_scraping_nats_service_component_metrics(&self) -> anyhow::Result<()> {
    /// Period of the interval timer that paces the scrape loop.
    const MAX_WAIT_MS: std::time::Duration = std::time::Duration::from_millis(9800);
    /// Timeout handed to every individual `scrape_stats` call.
    const SCRAPE_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(500);

    let metrics = ComponentNatsServerPrometheusMetrics::new(self)?;
    // The spawned future must own its data, so it takes exactly one clone of
    // the component; the metrics handle is moved in directly.
    let component = self.clone();

    self.drt().runtime().secondary().spawn(async move {
        // Created inside the task so the timer is built on the runtime that
        // drives it.
        let mut interval = tokio::time::interval(MAX_WAIT_MS);
        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
        loop {
            match component.scrape_stats(SCRAPE_TIMEOUT).await {
                Ok(service_set) => {
                    metrics.update_from_service_set(&service_set);
                }
                Err(err) => {
                    tracing::error!(
                        "Background scrape failed for {}: {}",
                        component.service_name(),
                        err
                    );
                    metrics.reset_to_zeros();
                }
            }
            // NOTE(review): Tokio documents that an interval's first tick
            // completes immediately, so the second scrape presumably follows
            // the first without a pause — confirm this is intended.
            interval.tick().await;
        }
    });
    Ok(())
}
/// Not implemented yet.
///
/// # Panics
///
/// Always panics via `unimplemented!`.
pub async fn stats_stream(&self) -> anyhow::Result<()> {
unimplemented!("collect_stats")
}
/// Builds the NATS service for this component, records it (and its stats
/// handler table) in the component registry, and — when the request plane
/// mode is NATS — starts the background metrics scraper.
///
/// # Errors
///
/// Fails when a service with this component's service name is already
/// registered, when the runtime has no NATS client, when building the NATS
/// service fails, or when another caller registered the same service name
/// while this one was being built. A failure to start the metrics scraper
/// is only logged and does not fail the call.
pub async fn add_stats_service(&mut self) -> anyhow::Result<()> {
let service_name = self.service_name();
// Fast-path duplicate check. The lock guard is a temporary here, so it is
// released before the service is built below.
if self
.drt
.component_registry()
.inner
.lock()
.await
.services
.contains_key(&service_name)
{
anyhow::bail!("Service {service_name} already exists");
}
let Some(nats_client) = self.drt.nats_client() else {
anyhow::bail!("Cannot create NATS service without NATS.");
};
let description = None;
let (nats_service, stats_reg) =
service::build_nats_service(nats_client, self, description).await?;
// Re-acquire the lock and re-check: the registry was unlocked while the
// service was being built, so another caller may have registered the same
// name in the meantime.
let mut guard = self.drt.component_registry().inner.lock().await;
if !guard.services.contains_key(&service_name) {
guard.services.insert(service_name.clone(), nats_service);
guard.stats_handlers.insert(service_name.clone(), stats_reg);
drop(guard);
} else {
// Lost the race: release the lock first, then stop the service that was
// just built. The result of `stop` is deliberately ignored.
drop(guard);
let _ = nats_service.stop().await;
return Err(anyhow::anyhow!(
"Service create race for {service_name}, now already exists"
));
}
// The metrics scraper is started only when the request plane mode is NATS.
let request_plane_mode = RequestPlaneMode::get();
match request_plane_mode {
RequestPlaneMode::Nats => {
// Best effort: a failure here is logged at debug level and otherwise ignored.
if let Err(err) = self.start_scraping_nats_service_component_metrics() {
tracing::debug!(
"Metrics registration failed for '{}': {}",
self.service_name(),
err
);
}
}
_ => {
tracing::info!(
"Skipping NATS service metrics collection for '{}' - request plane mode is '{}'",
self.service_name(),
request_plane_mode
);
}
}
Ok(())
}
}
impl ComponentBuilder {
    /// Starts a default builder with its runtime handle already set to `drt`.
    pub fn from_runtime(drt: Arc<DistributedRuntime>) -> Self {
        let builder = Self::default();
        builder.drt(drt)
    }
}
/// A named endpoint belonging to a [`Component`]; created by
/// `Component::endpoint`.
///
/// The manual `Hash`/`PartialEq` impls consider only `component`, `name`
/// and `is_static`.
#[derive(Debug, Clone)]
pub struct Endpoint {
/// Owning component (a clone of the component that created the endpoint).
component: Component,
/// Endpoint name.
name: String,
/// Copied from the owning component at creation; selects the static or
/// dynamic variant in `client`, `name_with_id` and
/// `etcd_path_object_with_lease_id`.
is_static: bool,
/// Key/value label pairs; empty when created by `Component::endpoint`.
labels: Vec<(String, String)>,
/// Metrics registry owned by this endpoint.
metrics_registry: crate::MetricsRegistry,
}
impl Hash for Endpoint {
    /// Feeds the owning component, the endpoint name and the static flag
    /// into the hasher, in that order.
    fn hash<H>(&self, state: &mut H)
    where
        H: std::hash::Hasher,
    {
        // Tuples hash their elements one after another, in order.
        (&self.component, &self.name, &self.is_static).hash(state);
    }
}
impl PartialEq for Endpoint {
    /// Two endpoints are equal when their components, names and static flags
    /// all match.
    fn eq(&self, other: &Self) -> bool {
        if self.is_static != other.is_static {
            return false;
        }
        if self.name != other.name {
            return false;
        }
        self.component == other.component
    }
}

impl Eq for Endpoint {}
impl DistributedRuntimeProvider for Endpoint {
    /// Borrows the distributed runtime of the owning component.
    fn drt(&self) -> &DistributedRuntime {
        let owner = &self.component;
        owner.drt()
    }
}
impl RuntimeProvider for Endpoint {
    /// Borrows the `Runtime` of the owning component.
    fn rt(&self) -> &Runtime {
        let owner = &self.component;
        owner.rt()
    }
}
impl MetricsHierarchy for Endpoint {
    /// The endpoint name serves as this level's basename.
    fn basename(&self) -> String {
        self.name.to_owned()
    }

    /// Returns the component's own parents followed by the component itself.
    fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy> {
        let owner = &self.component;
        let mut chain: Vec<&dyn MetricsHierarchy> = Vec::new();
        chain.extend(owner.parent_hierarchies());
        chain.push(owner);
        chain
    }

    /// Borrows the registry owned by this endpoint.
    fn get_metrics_registry(&self) -> &MetricsRegistry {
        let registry = &self.metrics_registry;
        registry
    }
}
impl Endpoint {
/// Builds the [`EndpointId`] (namespace full name, component name, endpoint
/// name) for this endpoint.
pub fn id(&self) -> EndpointId {
    let owner = &self.component;
    let namespace = owner.namespace().name();
    let component = owner.name().to_owned();
    let name = self.name.clone();
    EndpointId {
        namespace,
        component,
        name,
    }
}
/// Borrows the endpoint's own name.
pub fn name(&self) -> &str {
&self.name
}
/// Borrows the component that owns this endpoint.
pub fn component(&self) -> &Component {
&self.component
}
/// Returns `<component path>/ENDPOINT_KEYWORD/<endpoint name>`.
pub fn path(&self) -> String {
    let base = self.component.path();
    let leaf = &self.name;
    format!("{}/{}/{}", base, ENDPOINT_KEYWORD, leaf)
}
/// Returns `<component instance root>/<endpoint name>`.
pub fn etcd_root(&self) -> String {
    let mut root = self.component.instance_root();
    root.push('/');
    root.push_str(&self.name);
    root
}
/// Builds the [`EtcdPath`] for this endpoint, without a lease id.
///
/// # Panics
///
/// Panics if `EtcdPath::new_endpoint` rejects any of the names.
pub fn etcd_path(&self) -> EtcdPath {
    let owner = &self.component;
    let namespace_name = owner.namespace().name();
    let built = EtcdPath::new_endpoint(&namespace_name, owner.name(), &self.name);
    built.expect("Endpoint name and component name should be valid")
}
/// Returns `INSTANCE_ROOT_PATH/<unique path for lease_id>`.
pub fn etcd_path_with_lease_id(&self, lease_id: u64) -> String {
    let suffix = self.unique_path(lease_id);
    [INSTANCE_ROOT_PATH, suffix.as_str()].join("/")
}
/// Returns `<namespace>/<component>/<endpoint>/<lease_id>`, with the lease
/// id rendered in lowercase hexadecimal.
pub fn unique_path(&self, lease_id: u64) -> String {
    let owner = &self.component;
    format!(
        "{}/{}/{}/{:x}",
        owner.namespace().name(),
        owner.name(),
        self.name,
        lease_id
    )
}
/// Builds the [`EtcdPath`] for this endpoint: the plain endpoint path when
/// the endpoint is static, otherwise the path carrying `lease_id`.
///
/// # Panics
///
/// Panics if the underlying `EtcdPath` constructor rejects any of the names.
pub fn etcd_path_object_with_lease_id(&self, lease_id: i64) -> EtcdPath {
    // Static endpoints ignore the lease id.
    if self.is_static {
        return self.etcd_path();
    }
    let owner = &self.component;
    let namespace_name = owner.namespace().name();
    let built =
        EtcdPath::new_endpoint_with_lease(&namespace_name, owner.name(), &self.name, lease_id);
    built.expect("Endpoint name and component name should be valid")
}
/// Returns the bare endpoint name for static endpoints, otherwise
/// `<name>-<lease_id>` with the lease id in lowercase hexadecimal.
pub fn name_with_id(&self, lease_id: u64) -> String {
    if !self.is_static {
        return format!("{}-{:x}", self.name, lease_id);
    }
    self.name.clone()
}
/// Returns `<component service name>.<endpoint name>`.
pub fn subject(&self) -> String {
    let service = self.component.service_name();
    let leaf = &self.name;
    format!("{}.{}", service, leaf)
}
/// Returns `<component service name>.<name_with_id(lease_id)>`.
pub fn subject_to(&self, lease_id: u64) -> String {
    let service = self.component.service_name();
    let target = self.name_with_id(lease_id);
    format!("{}.{}", service, target)
}
/// Creates a client for a clone of this endpoint: a static client when the
/// endpoint is static, a dynamic one otherwise.
///
/// # Errors
///
/// Propagates the error from the chosen `Client` constructor.
pub async fn client(&self) -> anyhow::Result<client::Client> {
    let endpoint = self.clone();
    if !self.is_static {
        return client::Client::new_dynamic(endpoint).await;
    }
    client::Client::new_static(endpoint).await
}
/// Returns an `EndpointConfigBuilder` seeded with a clone of this endpoint.
pub fn endpoint_builder(&self) -> endpoint::EndpointConfigBuilder {
    let endpoint = self.clone();
    endpoint::EndpointConfigBuilder::from_endpoint(endpoint)
}
}
/// A possibly nested namespace, constructed through the derived
/// `NamespaceBuilder` (owned builder pattern).
///
/// `Namespace::name` returns the dot-joined full name that includes every
/// ancestor.
#[derive(Builder, Clone, Validate)]
#[builder(pattern = "owned")]
pub struct Namespace {
/// Shared handle to the distributed runtime; its builder setter is private.
#[builder(private)]
runtime: Arc<DistributedRuntime>,
/// This namespace's own name segment (no ancestors); the `Validate` derive
/// checks it with `validate_allowed_chars`.
#[validate(custom(function = "validate_allowed_chars"))]
name: String,
/// Passed on to components and child namespaces created from this one.
/// NOTE(review): presumably selects static vs. dynamic discovery — confirm.
is_static: bool,
/// Parent namespace, if any; defaults to `None`.
#[builder(default = "None")]
parent: Option<Arc<Namespace>>,
/// Key/value label pairs; defaults to an empty list.
#[builder(default = "Vec::new()")]
labels: Vec<(String, String)>,
/// Metrics registry for this namespace; defaults to a fresh registry.
#[builder(default = "crate::MetricsRegistry::new()")]
metrics_registry: crate::MetricsRegistry,
}
impl DistributedRuntimeProvider for Namespace {
    /// Borrows the distributed runtime this namespace was built with.
    fn drt(&self) -> &DistributedRuntime {
        self.runtime.as_ref()
    }
}
impl std::fmt::Debug for Namespace {
    /// Prints only the name segment, the static flag and the parent chain;
    /// the runtime handle, labels and metrics registry are left out.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            name,
            is_static,
            parent,
            ..
        } = self;
        write!(
            f,
            "Namespace {{ name: {}; is_static: {}; parent: {:?} }}",
            name, is_static, parent
        )
    }
}
impl RuntimeProvider for Namespace {
fn rt(&self) -> &Runtime {
self.runtime.rt()
}
}
impl std::fmt::Display for Namespace {
    /// Writes only this namespace's own name segment (ancestors are not
    /// included), ignoring any width or padding flags on the formatter.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.name)
    }
}
impl Namespace {
/// Builds a root namespace (no parent) called `name` on `runtime`.
///
/// # Errors
///
/// Returns the builder's error if `build` fails.
pub(crate) fn new(
    runtime: DistributedRuntime,
    name: String,
    is_static: bool,
) -> anyhow::Result<Self> {
    let shared = Arc::new(runtime);
    let namespace = NamespaceBuilder::default()
        .runtime(shared)
        .name(name)
        .is_static(is_static)
        .build()?;
    Ok(namespace)
}
/// Builds a [`Component`] called `name` inside a clone of this namespace,
/// sharing its runtime handle and `is_static` flag.
///
/// # Errors
///
/// Returns the builder's error if `build` fails.
pub fn component(&self, name: impl Into<String>) -> anyhow::Result<Component> {
    let runtime = self.runtime.clone();
    let component = ComponentBuilder::from_runtime(runtime)
        .name(name)
        .namespace(self.clone())
        .is_static(self.is_static)
        .build()?;
    Ok(component)
}
/// Builds a child namespace called `name` whose parent is a clone of this
/// namespace, sharing its runtime handle and `is_static` flag.
///
/// # Errors
///
/// Returns the builder's error if `build` fails.
pub fn namespace(&self, name: impl Into<String>) -> anyhow::Result<Namespace> {
    let runtime = self.runtime.clone();
    let child_name: String = name.into();
    let parent = Some(Arc::new(self.clone()));
    let child = NamespaceBuilder::default()
        .runtime(runtime)
        .name(child_name)
        .is_static(self.is_static)
        .parent(parent)
        .build()?;
    Ok(child)
}
/// Returns `ETCD_ROOT_PATH` immediately followed by the namespace's full
/// name (no separator is inserted between them).
pub fn etcd_path(&self) -> String {
    let full_name = self.name();
    format!("{}{}", ETCD_ROOT_PATH, full_name)
}
/// Returns the full namespace name: the ancestors' names and this
/// namespace's own segment joined with `.`, outermost first.
pub fn name(&self) -> String {
    let Some(parent) = &self.parent else {
        // Root namespace: the full name is just the segment itself.
        return self.name.clone();
    };
    format!("{}.{}", parent.name(), self.name)
}
}
/// Validator callback: accepts `input` only if it is non-empty and consists
/// solely of lowercase ASCII letters, ASCII digits, `-` and `_`.
///
/// This accepts exactly the strings matched by `^[a-z0-9-_]+$`, but checks
/// the characters directly, so no regex is compiled (or unwrapped) on every
/// validation call.
///
/// # Errors
///
/// Returns a `ValidationError` with code `invalid_characters` for an empty
/// string or any string containing a character outside the allowed set.
fn validate_allowed_chars(input: &str) -> Result<(), ValidationError> {
    let is_allowed = |c: char| matches!(c, 'a'..='z' | '0'..='9' | '-' | '_');
    // `all` is vacuously true for an empty string, which the `+` quantifier
    // in the pattern rejects, so emptiness is checked explicitly.
    if !input.is_empty() && input.chars().all(is_allowed) {
        Ok(())
    } else {
        Err(ValidationError::new("invalid_characters"))
    }
}