use crate::{
DistributedRuntime,
component::Component,
metrics::{MetricsHierarchy, prometheus_names, prometheus_names::nats_service},
traits::*,
transports::nats,
utils::stream,
};
use anyhow::Result;
use anyhow::anyhow as error;
use async_nats::Message;
use async_stream::try_stream;
use bytes::Bytes;
use derive_getters::Dissolve;
use futures::stream::{StreamExt, TryStreamExt};
use prometheus;
use serde::{Deserialize, Serialize, de::DeserializeOwned};
use std::time::Duration;
/// Thin client for talking to NATS micro-services: unary request/reply
/// plus service-stats scraping (see `collect_services`).
pub struct ServiceClient {
    /// Underlying NATS connection; all requests go through it.
    nats_client: nats::Client,
}
impl ServiceClient {
    /// Wrap an existing NATS connection in a service client.
    pub fn new(nats_client: nats::Client) -> Self {
        Self { nats_client }
    }
}
/// Result of one service-discovery scrape: every service instance that
/// replied before the deadline (see `ServiceClient::collect_services`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServiceSet {
    /// One entry per responding service instance.
    services: Vec<ServiceInfo>,
}
/// Identity and endpoint listing for a single service instance, decoded
/// from a NATS discovery reply payload.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServiceInfo {
    /// Service name (shared by all instances of the same service).
    pub name: String,
    /// Instance identifier.
    pub id: String,
    /// Reported service version string.
    pub version: String,
    /// Instance start time, kept as the raw reported string — not parsed here.
    pub started: String,
    /// Endpoints exposed by this instance.
    pub endpoints: Vec<EndpointInfo>,
}
/// A single endpoint advertised by a service instance.
#[derive(Debug, Clone, Serialize, Deserialize, Dissolve)]
pub struct EndpointInfo {
    /// Endpoint name as reported by the service.
    pub name: String,
    /// NATS subject for the endpoint; `EndpointInfo::id` parses the hex
    /// segment after the last '-' as the instance id.
    pub subject: String,
    /// Per-endpoint stats; `flatten` merges the stats fields into this
    /// object's JSON, so a reply without stats fields decodes as `None`.
    #[serde(flatten)]
    pub data: Option<NatsStatsMetrics>,
}
impl EndpointInfo {
pub fn id(&self) -> Result<i64> {
let id = self
.subject
.split('-')
.next_back()
.ok_or_else(|| error!("No id found in subject"))?;
i64::from_str_radix(id, 16).map_err(|e| error!("Invalid id format: {}", e))
}
}
/// Per-endpoint statistics from a NATS service stats reply.
///
/// Field order must stay fixed: `Dissolve` generates a tuple in
/// declaration order.
#[derive(Debug, Clone, Serialize, Deserialize, Dissolve)]
pub struct NatsStatsMetrics {
    /// Average processing time per request (treated as nanoseconds by the
    /// metrics aggregation in this file).
    pub average_processing_time: u64,
    /// Text of the most recent error, if any.
    pub last_error: String,
    /// Total number of failed requests.
    pub num_errors: u64,
    /// Total number of requests served.
    pub num_requests: u64,
    /// Cumulative processing time (treated as nanoseconds by the metrics
    /// aggregation in this file).
    pub processing_time: u64,
    /// Queue group the endpoint subscribes with.
    pub queue_group: String,
    /// Endpoint-specific payload; see [`NatsStatsMetrics::decode`].
    pub data: serde_json::Value,
}
impl NatsStatsMetrics {
pub fn decode<T: for<'de> Deserialize<'de>>(self) -> Result<T> {
serde_json::from_value(self.data).map_err(Into::into)
}
}
impl ServiceClient {
    /// Send one request on `subject` and await a single reply.
    ///
    /// # Errors
    /// Propagates any NATS request failure (e.g. no responders).
    pub async fn unary(
        &self,
        subject: impl Into<String>,
        payload: impl Into<Bytes>,
    ) -> Result<Message> {
        let reply = self
            .nats_client
            .client()
            .request(subject.into(), payload.into())
            .await?;
        Ok(reply)
    }

    /// Scrape `service_name`, collecting every well-formed reply that
    /// arrives before `timeout` elapses.
    ///
    /// Empty or undecodable payloads are logged and skipped rather than
    /// aborting the scrape.
    ///
    /// # Errors
    /// Fails only if the scrape subscription itself cannot be created.
    pub async fn collect_services(
        &self,
        service_name: &str,
        timeout: Duration,
    ) -> Result<ServiceSet> {
        let subscription = self.nats_client.scrape_service(service_name).await?;

        // Sanity warnings on suspicious timeouts; the scrape still proceeds.
        if timeout.is_zero() {
            tracing::warn!("collect_services: timeout is zero");
        }
        if timeout > Duration::from_secs(10) {
            tracing::warn!("collect_services: timeout is greater than 10 seconds");
        }

        let deadline = tokio::time::Instant::now() + timeout;
        let mut replies = stream::until_deadline(subscription, deadline);

        let mut services = Vec::new();
        while let Some(message) = replies.next().await {
            if message.payload.is_empty() {
                tracing::trace!(service_name, "collect_services: empty payload from nats");
                continue;
            }
            match serde_json::from_slice::<ServiceInfo>(&message.payload) {
                Ok(info) => services.push(info),
                Err(err) => {
                    let payload = String::from_utf8_lossy(&message.payload);
                    tracing::debug!(%err, service_name, %payload, "error decoding service info");
                }
            }
        }

        Ok(ServiceSet { services })
    }
}
impl ServiceSet {
    /// Consume the set, yielding every endpoint of every service in turn.
    pub fn into_endpoints(self) -> impl Iterator<Item = EndpointInfo> {
        self.services
            .into_iter()
            .flat_map(|service| service.endpoints)
    }

    /// Borrow the collected service records.
    pub fn services(&self) -> &[ServiceInfo] {
        self.services.as_slice()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a stats record with fixed counters and the given payload value.
    fn sample_metrics(value: &str) -> NatsStatsMetrics {
        NatsStatsMetrics {
            average_processing_time: 100_000,
            last_error: "none".to_string(),
            num_errors: 0,
            num_requests: 10,
            processing_time: 100,
            queue_group: "group1".to_string(),
            data: serde_json::json!({"key": value}),
        }
    }

    /// Build an endpoint named `name` on `subject` with stats carrying `value`.
    fn sample_endpoint(name: &str, subject: &str, value: &str) -> EndpointInfo {
        EndpointInfo {
            name: name.to_string(),
            subject: subject.to_string(),
            data: Some(sample_metrics(value)),
        }
    }

    /// Build a "service1" instance with the given id and endpoints.
    fn sample_service(id: &str, endpoints: Vec<EndpointInfo>) -> ServiceInfo {
        ServiceInfo {
            name: "service1".to_string(),
            id: id.to_string(),
            version: "1.0".to_string(),
            started: "2021-01-01".to_string(),
            endpoints,
        }
    }

    #[test]
    fn test_service_set() {
        // Two instances of the same service, each with one "endpoint1" and
        // one "endpoint2*" endpoint.
        let services = vec![
            sample_service(
                "1",
                vec![
                    sample_endpoint("endpoint1", "subject1", "value1"),
                    sample_endpoint("endpoint2-foo", "subject2", "value1"),
                ],
            ),
            sample_service(
                "2",
                vec![
                    sample_endpoint("endpoint1", "subject1", "value1"),
                    sample_endpoint("endpoint2-bar", "subject2", "value2"),
                ],
            ),
        ];
        let service_set = ServiceSet { services };

        // Filtering by prefix should keep exactly one endpoint per instance.
        let endpoints: Vec<_> = service_set
            .into_endpoints()
            .filter(|e| e.name.starts_with("endpoint2"))
            .collect();
        assert_eq!(endpoints.len(), 2);
    }
}
/// Prometheus gauges aggregating NATS service stats for one component.
/// Populated by `update_from_service_set`, cleared by `reset_to_zeros`.
#[derive(Debug, Clone)]
pub struct ComponentNatsServerPrometheusMetrics {
    /// Average processing time per request, in milliseconds.
    pub service_processing_ms_avg: prometheus::Gauge,
    /// Total errors summed over all endpoints.
    pub service_errors_total: prometheus::IntGauge,
    /// Total requests summed over all endpoints.
    pub service_requests_total: prometheus::IntGauge,
    /// Total processing time summed over all endpoints, in milliseconds.
    pub service_processing_ms_total: prometheus::IntGauge,
    /// Number of service instances seen in the last scrape.
    pub service_active_services: prometheus::IntGauge,
    /// Number of endpoints seen in the last scrape.
    pub service_active_endpoints: prometheus::IntGauge,
}
impl ComponentNatsServerPrometheusMetrics {
    /// Register the six component-level gauges on the component's metrics
    /// hierarchy. Every gauge carries the component's service name plus any
    /// component-specific labels.
    ///
    /// # Errors
    /// Returns an error if any gauge fails to register.
    pub fn new(component: &Component) -> Result<Self> {
        let service_name = component.service_name();

        // Shared label set: service_name first, then the component labels.
        let mut label_pairs = vec![("service_name", service_name.as_str())];
        for (key, value) in component.labels() {
            label_pairs.push((key.as_str(), value.as_str()));
        }
        let labels: &[(&str, &str)] = &label_pairs;

        let registry = component.metrics();
        let service_processing_ms_avg = registry.create_gauge(
            nats_service::PROCESSING_MS_AVG,
            "Average processing time across all component endpoints in milliseconds",
            labels,
        )?;
        let service_errors_total = registry.create_intgauge(
            nats_service::ERRORS_TOTAL,
            "Total number of errors across all component endpoints",
            labels,
        )?;
        let service_requests_total = registry.create_intgauge(
            nats_service::REQUESTS_TOTAL,
            "Total number of requests across all component endpoints",
            labels,
        )?;
        let service_processing_ms_total = registry.create_intgauge(
            nats_service::PROCESSING_MS_TOTAL,
            "Total processing time across all component endpoints in milliseconds",
            labels,
        )?;
        let service_active_services = registry.create_intgauge(
            nats_service::ACTIVE_SERVICES,
            "Number of active services in this component",
            labels,
        )?;
        let service_active_endpoints = registry.create_intgauge(
            nats_service::ACTIVE_ENDPOINTS,
            "Number of active endpoints across all services",
            labels,
        )?;

        Ok(Self {
            service_processing_ms_avg,
            service_errors_total,
            service_requests_total,
            service_processing_ms_total,
            service_active_services,
            service_active_endpoints,
        })
    }

    /// Fold a scraped [`ServiceSet`] into the gauges.
    ///
    /// Endpoints without stats still count toward the endpoint total but
    /// contribute nothing to the request/error/time sums.
    pub fn update_from_service_set(&self, service_set: &ServiceSet) {
        let mut errors: u64 = 0;
        let mut requests: u64 = 0;
        let mut processing_nanos: u64 = 0;
        let mut sampled_endpoints: u64 = 0;
        let mut endpoint_total: u64 = 0;

        for endpoint in service_set
            .services()
            .iter()
            .flat_map(|service| service.endpoints.iter())
        {
            endpoint_total += 1;
            let Some(stats) = endpoint.data.as_ref() else {
                continue;
            };
            errors += stats.num_errors;
            requests += stats.num_requests;
            processing_nanos += stats.processing_time;
            if stats.num_requests > 0 {
                sampled_endpoints += 1;
            }
        }

        // processing_time is accumulated in nanoseconds; the gauges are
        // exposed in milliseconds.
        let avg_ms = if sampled_endpoints > 0 && requests > 0 {
            (processing_nanos as f64 / requests as f64) / 1_000_000.0
        } else {
            0.0
        };
        self.service_processing_ms_avg.set(avg_ms);
        self.service_errors_total.set(errors as i64);
        self.service_requests_total.set(requests as i64);
        self.service_processing_ms_total
            .set((processing_nanos / 1_000_000) as i64);
        self.service_active_services
            .set(service_set.services().len() as i64);
        self.service_active_endpoints.set(endpoint_total as i64);
    }

    /// Reset every gauge to zero.
    pub fn reset_to_zeros(&self) {
        self.service_processing_ms_avg.set(0.0);
        self.service_errors_total.set(0);
        self.service_requests_total.set(0);
        self.service_processing_ms_total.set(0);
        self.service_active_services.set(0);
        self.service_active_endpoints.set(0);
    }
}