use crate::{
DistributedRuntime,
component::Component,
metrics::{MetricsHierarchy, prometheus_names},
traits::*,
transports::nats,
utils::stream,
};
use anyhow::Result;
use anyhow::anyhow as error;
use async_nats::Message;
use async_stream::try_stream;
use bytes::Bytes;
use derive_getters::Dissolve;
use futures::stream::{StreamExt, TryStreamExt};
use prometheus;
use serde::{Deserialize, Serialize, de::DeserializeOwned};
use std::time::Duration;
/// Client for talking to services over NATS.
///
/// Holds the `nats::Client` that this type's methods use for request/reply
/// calls (`unary`) and for scraping service information (`collect_services`).
pub struct ServiceClient {
// Underlying NATS client; every method on this type goes through it.
nats_client: nats::Client,
}
impl ServiceClient {
    /// Creates a [`ServiceClient`] that issues its requests through `nats_client`.
    pub fn new(nats_client: nats::Client) -> Self {
        Self { nats_client }
    }
}
/// A collection of [`ServiceInfo`] records.
///
/// `ServiceClient::collect_services` builds one of these from the messages it
/// manages to decode.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServiceSet {
// Private; read through `services()` or consumed through `into_endpoints()`.
services: Vec<ServiceInfo>,
}
/// Description of a single service and the endpoints it exposes.
///
/// `ServiceClient::collect_services` deserializes one of these from the JSON
/// payload of each message it receives.
///
/// NOTE(review): the field layout presumably mirrors the NATS service
/// info/stats response — confirm against the NATS service API schema.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServiceInfo {
/// Name of the service.
pub name: String,
/// Identifier of this service record (presumably unique per running
/// instance — TODO confirm).
pub id: String,
/// Version string reported for the service.
pub version: String,
/// Start time as reported by the service; kept as an unparsed string, so no
/// particular format is enforced here.
pub started: String,
/// Endpoints exposed by this service.
pub endpoints: Vec<EndpointInfo>,
}
/// Description of a single endpoint of a service, optionally with its stats.
///
/// The `Dissolve` derive (from `derive_getters`) additionally provides a
/// consuming `dissolve` method that breaks the struct into its fields.
#[derive(Debug, Clone, Serialize, Deserialize, Dissolve)]
pub struct EndpointInfo {
/// Name of the endpoint.
pub name: String,
/// Subject of the endpoint. `EndpointInfo::id` parses the text after the
/// last `-` in this string as a hexadecimal id.
pub subject: String,
/// Statistics for the endpoint, when available. Because of
/// `#[serde(flatten)]`, the stats fields sit at the same JSON level as
/// `name` and `subject` instead of under a nested `data` key.
#[serde(flatten)]
pub data: Option<NatsStatsMetrics>,
}
impl EndpointInfo {
pub fn id(&self) -> Result<i64> {
let id = self
.subject
.split('-')
.next_back()
.ok_or_else(|| error!("No id found in subject"))?;
i64::from_str_radix(id, 16).map_err(|e| error!("Invalid id format: {}", e))
}
}
/// Statistics reported for a single endpoint, plus a free-form `data` payload.
///
/// This struct is embedded (flattened) into `EndpointInfo`. The `Dissolve`
/// derive (from `derive_getters`) additionally provides a consuming `dissolve`
/// method that breaks the struct into its fields.
///
/// NOTE(review): the time fields are presumably in nanoseconds, as in the NATS
/// service stats schema — the unit is not stated in this file; confirm.
#[derive(Debug, Clone, Serialize, Deserialize, Dissolve)]
pub struct NatsStatsMetrics {
// `average_processing_time`: average time spent handling a request (unit: see
// the note on the struct).
// `last_error`: text of the most recent error reported for the endpoint.
pub average_processing_time: u64, pub last_error: String,
/// Number of errors reported for the endpoint.
pub num_errors: u64,
/// Number of requests reported for the endpoint.
pub num_requests: u64,
// `processing_time`: total time spent handling requests (unit: see the note on
// the struct).
// `queue_group`: name of the queue group the endpoint belongs to.
pub processing_time: u64, pub queue_group: String,
/// Arbitrary JSON payload attached to the stats; `decode` converts it into a
/// caller-chosen type.
pub data: serde_json::Value,
}
impl NatsStatsMetrics {
    /// Consumes the metrics and deserializes the JSON `data` payload into `T`.
    ///
    /// All other fields are dropped.
    ///
    /// # Errors
    ///
    /// Returns the `serde_json` error when `data` does not have the shape
    /// expected by `T`.
    pub fn decode<T: for<'de> Deserialize<'de>>(self) -> Result<T> {
        let decoded = serde_json::from_value::<T>(self.data)?;
        Ok(decoded)
    }
}
impl ServiceClient {
    /// Sends `payload` to `subject` as a single request and waits for the reply.
    ///
    /// # Errors
    ///
    /// Returns an error when the underlying NATS request fails.
    pub async fn unary(
        &self,
        subject: impl Into<String>,
        payload: impl Into<Bytes>,
    ) -> Result<Message> {
        let client = self.nats_client.client();
        client
            .request(subject.into(), payload.into())
            .await
            .map_err(Into::into)
    }

    /// Gathers [`ServiceInfo`] responses for `service_name` until `timeout` has
    /// elapsed.
    ///
    /// The subscription returned by `scrape_service` is read through
    /// `stream::until_deadline`, using a deadline of `timeout` from the moment
    /// the subscription is obtained. Each message is handled as follows:
    ///
    /// * an empty payload is skipped (logged at `trace` level);
    /// * a payload that fails to decode as [`ServiceInfo`] JSON is skipped
    ///   (logged at `debug` level together with the lossily decoded payload);
    /// * anything else is added to the returned [`ServiceSet`].
    ///
    /// A warning is logged when `timeout` is zero or longer than 10 seconds;
    /// the call proceeds either way.
    ///
    /// # Errors
    ///
    /// Returns an error only when `scrape_service` itself fails; undecodable
    /// responses never fail the call.
    pub async fn collect_services(
        &self,
        service_name: &str,
        timeout: Duration,
    ) -> Result<ServiceSet> {
        let sub = self.nats_client.scrape_service(service_name).await?;

        // Suspicious timeouts are only reported, never rejected.
        if timeout.is_zero() {
            tracing::warn!("collect_services: timeout is zero");
        }
        if timeout > Duration::from_secs(10) {
            tracing::warn!("collect_services: timeout is greater than 10 seconds");
        }

        let deadline = tokio::time::Instant::now() + timeout;
        let mut replies = stream::until_deadline(sub, deadline);
        let mut collected: Vec<ServiceInfo> = Vec::new();

        while let Some(reply) = replies.next().await {
            if reply.payload.is_empty() {
                tracing::trace!(service_name, "collect_services: empty payload from nats");
                continue;
            }

            match serde_json::from_slice::<ServiceInfo>(&reply.payload) {
                Ok(info) => collected.push(info),
                Err(err) => {
                    let payload = String::from_utf8_lossy(&reply.payload);
                    tracing::debug!(%err, service_name, %payload, "error decoding service info");
                }
            }
        }

        Ok(ServiceSet {
            services: collected,
        })
    }
}
impl ServiceSet {
    /// Consumes the set and yields every endpoint of every service, in service
    /// order and then in endpoint order within each service.
    pub fn into_endpoints(self) -> impl Iterator<Item = EndpointInfo> {
        self.services
            .into_iter()
            .flat_map(|service| service.endpoints)
    }

    /// Borrows the services contained in this set.
    pub fn services(&self) -> &[ServiceInfo] {
        self.services.as_slice()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the stats fixture shared by all endpoints; only the value stored
    /// under `"key"` in `data` varies.
    fn sample_metrics(value: &str) -> NatsStatsMetrics {
        NatsStatsMetrics {
            average_processing_time: 100_000,
            last_error: "none".to_string(),
            num_errors: 0,
            num_requests: 10,
            processing_time: 100,
            queue_group: "group1".to_string(),
            data: serde_json::json!({ "key": value }),
        }
    }

    /// Builds an endpoint fixture carrying the shared stats.
    fn sample_endpoint(name: &str, subject: &str, value: &str) -> EndpointInfo {
        EndpointInfo {
            name: name.to_string(),
            subject: subject.to_string(),
            data: Some(sample_metrics(value)),
        }
    }

    /// Builds a `service1` instance fixture with the given id and endpoints.
    fn sample_service(id: &str, endpoints: Vec<EndpointInfo>) -> ServiceInfo {
        ServiceInfo {
            name: "service1".to_string(),
            id: id.to_string(),
            version: "1.0".to_string(),
            started: "2021-01-01".to_string(),
            endpoints,
        }
    }

    /// `into_endpoints` must flatten the endpoints of every service instance.
    #[test]
    fn test_service_set() {
        let service_set = ServiceSet {
            services: vec![
                sample_service(
                    "1",
                    vec![
                        sample_endpoint("endpoint1", "subject1", "value1"),
                        sample_endpoint("endpoint2-foo", "subject2", "value1"),
                    ],
                ),
                sample_service(
                    "2",
                    vec![
                        sample_endpoint("endpoint1", "subject1", "value1"),
                        sample_endpoint("endpoint2-bar", "subject2", "value2"),
                    ],
                ),
            ],
        };

        // One `endpoint2-*` endpoint per instance is expected to survive the filter.
        let matching = service_set
            .into_endpoints()
            .filter(|endpoint| endpoint.name.starts_with("endpoint2"))
            .count();
        assert_eq!(matching, 2);
    }
}