1use crate::{
11 DistributedRuntime, Result,
12 component::Component,
13 error,
14 metrics::{MetricsHierarchy, prometheus_names, prometheus_names::nats_service},
15 traits::*,
16 transports::nats,
17 utils::stream,
18};
19
20use async_nats::Message;
21use async_stream::try_stream;
22use bytes::Bytes;
23use derive_getters::Dissolve;
24use futures::stream::{StreamExt, TryStreamExt};
25use prometheus;
26use serde::{Deserialize, Serialize, de::DeserializeOwned};
27use std::time::Duration;
28
/// Thin client for talking to services over NATS.
///
/// Wraps a [`nats::Client`] and exposes request/reply ([`ServiceClient::unary`])
/// and service discovery ([`ServiceClient::collect_services`]).
pub struct ServiceClient {
    /// Underlying NATS client used for every request issued by this type.
    nats_client: nats::Client,
}
32
33impl ServiceClient {
34 pub fn new(nats_client: nats::Client) -> Self {
35 ServiceClient { nats_client }
36 }
37}
38
/// A collection of [`ServiceInfo`] records.
///
/// [`ServiceClient::collect_services`] builds one of these from the replies
/// it manages to decode before its deadline.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServiceSet {
    /// The collected service records, in the order they were pushed.
    services: Vec<ServiceInfo>,
}
64
/// Description of a single service, decoded from the JSON payload of one
/// reply in [`ServiceClient::collect_services`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServiceInfo {
    /// Service name as reported in the reply.
    pub name: String,
    /// Identifier reported for this service.
    // NOTE(review): presumably unique per running instance — confirm.
    pub id: String,
    /// Version string reported by the service.
    pub version: String,
    /// Start time as reported by the service, kept as an unparsed string.
    // NOTE(review): presumably a timestamp; the exact format is not visible here.
    pub started: String,
    /// Endpoints exposed by this service.
    pub endpoints: Vec<EndpointInfo>,
}
96
/// Description of one endpoint of a service, optionally carrying its stats.
#[derive(Debug, Clone, Serialize, Deserialize, Dissolve)]
pub struct EndpointInfo {
    /// Endpoint name.
    pub name: String,
    /// Subject of the endpoint. [`EndpointInfo::id`] parses the text after the
    /// last `-` in this string as a hexadecimal id.
    pub subject: String,

    /// Stats for this endpoint, if present. Because of `#[serde(flatten)]` the
    /// stats fields live at the same JSON object level as `name` and `subject`
    /// rather than in a nested object.
    #[serde(flatten)]
    pub data: Option<NatsStatsMetrics>,
}
107
108impl EndpointInfo {
109 pub fn id(&self) -> Result<i64> {
110 let id = self
111 .subject
112 .split('-')
113 .next_back()
114 .ok_or_else(|| error!("No id found in subject"))?;
115
116 i64::from_str_radix(id, 16).map_err(|e| error!("Invalid id format: {}", e))
117 }
118}
119
/// Per-endpoint statistics carried alongside an [`EndpointInfo`].
#[derive(Debug, Clone, Serialize, Deserialize, Dissolve)]
pub struct NatsStatsMetrics {
    /// Average processing time per request.
    // NOTE(review): presumably nanoseconds, like `processing_time` — confirm.
    pub average_processing_time: u64,
    /// Most recent error reported for the endpoint, as text.
    pub last_error: String,
    /// Number of errors recorded for the endpoint.
    pub num_errors: u64,
    /// Number of requests recorded for the endpoint.
    pub num_requests: u64,
    /// Cumulative processing time.
    /// `ComponentNatsServerPrometheusMetrics::update_from_service_set` treats
    /// this value as nanoseconds.
    pub processing_time: u64,
    /// Queue group reported for the endpoint.
    pub queue_group: String,
    /// Free-form JSON payload; use [`NatsStatsMetrics::decode`] to turn it
    /// into a typed value.
    pub data: serde_json::Value,
}
138
139impl NatsStatsMetrics {
140 pub fn decode<T: for<'de> Deserialize<'de>>(self) -> Result<T> {
141 serde_json::from_value(self.data).map_err(Into::into)
142 }
143}
144
145impl ServiceClient {
146 pub async fn unary(
147 &self,
148 subject: impl Into<String>,
149 payload: impl Into<Bytes>,
150 ) -> Result<Message> {
151 let response = self
152 .nats_client
153 .client()
154 .request(subject.into(), payload.into())
155 .await?;
156 Ok(response)
157 }
158
159 pub async fn collect_services(
160 &self,
161 service_name: &str,
162 timeout: Duration,
163 ) -> Result<ServiceSet> {
164 let sub = self.nats_client.scrape_service(service_name).await?;
165 if timeout.is_zero() {
166 tracing::warn!("collect_services: timeout is zero");
167 }
168 if timeout > Duration::from_secs(10) {
169 tracing::warn!("collect_services: timeout is greater than 10 seconds");
170 }
171 let deadline = tokio::time::Instant::now() + timeout;
172
173 let mut services = vec![];
174 let mut s = stream::until_deadline(sub, deadline);
175 while let Some(message) = s.next().await {
176 if message.payload.is_empty() {
177 tracing::trace!(service_name, "collect_services: empty payload from nats");
179 continue;
180 }
181 let info = serde_json::from_slice::<ServiceInfo>(&message.payload);
182 match info {
183 Ok(info) => services.push(info),
184 Err(err) => {
185 let payload = String::from_utf8_lossy(&message.payload);
186 tracing::debug!(%err, service_name, %payload, "error decoding service info");
187 }
188 }
189 }
190
191 Ok(ServiceSet { services })
192 }
193}
194
195impl ServiceSet {
196 pub fn into_endpoints(self) -> impl Iterator<Item = EndpointInfo> {
197 self.services
198 .into_iter()
199 .flat_map(|s| s.endpoints.into_iter())
200 }
201
202 pub fn services(&self) -> &[ServiceInfo] {
204 &self.services
205 }
206}
207
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the stats fixture; only the value stored under `"key"` in
    /// `data` varies between endpoints.
    fn sample_stats(value: &str) -> NatsStatsMetrics {
        NatsStatsMetrics {
            average_processing_time: 100_000,
            last_error: "none".to_string(),
            num_errors: 0,
            num_requests: 10,
            processing_time: 100,
            queue_group: "group1".to_string(),
            data: serde_json::json!({ "key": value }),
        }
    }

    /// Builds an endpoint fixture that carries stats.
    fn sample_endpoint(name: &str, subject: &str, value: &str) -> EndpointInfo {
        EndpointInfo {
            name: name.to_string(),
            subject: subject.to_string(),
            data: Some(sample_stats(value)),
        }
    }

    /// Builds a `service1` fixture with the given instance id and endpoints.
    fn sample_service(id: &str, endpoints: Vec<EndpointInfo>) -> ServiceInfo {
        ServiceInfo {
            name: "service1".to_string(),
            id: id.to_string(),
            version: "1.0".to_string(),
            started: "2021-01-01".to_string(),
            endpoints,
        }
    }

    /// `into_endpoints` must flatten endpoints across all services, so both
    /// `endpoint2-*` entries (one per service) are visible to the filter.
    #[test]
    fn test_service_set() {
        let service_set = ServiceSet {
            services: vec![
                sample_service(
                    "1",
                    vec![
                        sample_endpoint("endpoint1", "subject1", "value1"),
                        sample_endpoint("endpoint2-foo", "subject2", "value1"),
                    ],
                ),
                sample_service(
                    "2",
                    vec![
                        sample_endpoint("endpoint1", "subject1", "value1"),
                        sample_endpoint("endpoint2-bar", "subject2", "value2"),
                    ],
                ),
            ],
        };

        let matching = service_set
            .into_endpoints()
            .filter(|endpoint| endpoint.name.starts_with("endpoint2"))
            .count();

        assert_eq!(matching, 2);
    }
}
296
/// Prometheus gauges summarising the NATS service stats of one component.
///
/// Every gauge is an aggregate over all endpoints of all services in a
/// [`ServiceSet`]; see `update_from_service_set` for how the values are
/// derived.
#[derive(Debug, Clone)]
pub struct ComponentNatsServerPrometheusMetrics {
    /// Average processing time per request across all endpoints, in milliseconds.
    pub service_processing_ms_avg: prometheus::Gauge,
    /// Sum of the error counts of all endpoints.
    pub service_errors_total: prometheus::IntGauge,
    /// Sum of the request counts of all endpoints.
    pub service_requests_total: prometheus::IntGauge,
    /// Sum of the processing time of all endpoints, in whole milliseconds.
    pub service_processing_ms_total: prometheus::IntGauge,
    /// Number of services in the most recently applied service set.
    pub service_active_services: prometheus::IntGauge,
    /// Number of endpoints across all services in the most recently applied
    /// service set.
    pub service_active_endpoints: prometheus::IntGauge,
}
326
327impl ComponentNatsServerPrometheusMetrics {
328 pub fn new(component: &Component) -> Result<Self> {
330 let service_name = component.service_name();
331
332 let mut labels_vec = vec![("service_name", service_name.as_str())];
334
335 for (key, value) in component.labels() {
337 labels_vec.push((key.as_str(), value.as_str()));
338 }
339
340 let labels: &[(&str, &str)] = &labels_vec;
341
342 let service_processing_ms_avg = component.metrics().create_gauge(
343 nats_service::PROCESSING_MS_AVG,
344 "Average processing time across all component endpoints in milliseconds",
345 labels,
346 )?;
347
348 let service_errors_total = component.metrics().create_intgauge(
349 nats_service::ERRORS_TOTAL,
350 "Total number of errors across all component endpoints",
351 labels,
352 )?;
353
354 let service_requests_total = component.metrics().create_intgauge(
355 nats_service::REQUESTS_TOTAL,
356 "Total number of requests across all component endpoints",
357 labels,
358 )?;
359
360 let service_processing_ms_total = component.metrics().create_intgauge(
361 nats_service::PROCESSING_MS_TOTAL,
362 "Total processing time across all component endpoints in milliseconds",
363 labels,
364 )?;
365
366 let service_active_services = component.metrics().create_intgauge(
367 nats_service::ACTIVE_SERVICES,
368 "Number of active services in this component",
369 labels,
370 )?;
371
372 let service_active_endpoints = component.metrics().create_intgauge(
373 nats_service::ACTIVE_ENDPOINTS,
374 "Number of active endpoints across all services",
375 labels,
376 )?;
377
378 Ok(Self {
379 service_processing_ms_avg,
380 service_errors_total,
381 service_requests_total,
382 service_processing_ms_total,
383 service_active_services,
384 service_active_endpoints,
385 })
386 }
387
388 pub fn update_from_service_set(&self, service_set: &ServiceSet) {
390 let mut processing_time_samples = 0u64; let mut total_errors = 0u64; let mut total_requests = 0u64; let mut total_processing_time_nanos = 0u64; let mut endpoint_count = 0u64; let service_count = service_set.services().len() as i64;
398
399 for service in service_set.services() {
400 for endpoint in &service.endpoints {
401 endpoint_count += 1;
402
403 if let Some(ref stats) = endpoint.data {
404 total_errors += stats.num_errors;
405 total_requests += stats.num_requests;
406 total_processing_time_nanos += stats.processing_time;
407
408 if stats.num_requests > 0 {
409 processing_time_samples += 1;
410 }
411 }
412 }
413 }
414
415 if processing_time_samples > 0 && total_requests > 0 {
418 let avg_time_nanos = total_processing_time_nanos as f64 / total_requests as f64;
419 let avg_time_ms = avg_time_nanos / 1_000_000.0; self.service_processing_ms_avg.set(avg_time_ms);
421 } else {
422 self.service_processing_ms_avg.set(0.0);
423 }
424
425 self.service_errors_total.set(total_errors as i64); self.service_requests_total.set(total_requests as i64); self.service_processing_ms_total
428 .set((total_processing_time_nanos / 1_000_000) as i64); self.service_active_services.set(service_count); self.service_active_endpoints.set(endpoint_count as i64); }
432
433 pub fn reset_to_zeros(&self) {
435 self.service_processing_ms_avg.set(0.0);
436 self.service_errors_total.set(0);
437 self.service_requests_total.set(0);
438 self.service_processing_ms_total.set(0);
439 self.service_active_services.set(0);
440 self.service_active_endpoints.set(0);
441 }
442}