1use crate::{
2 models::{context_tag_keys, QuickPulseEnvelope, QuickPulseMetric},
3 tags::get_tags_for_resource,
4 trace::{get_duration, is_remote_dependency_success, is_request_success, EVENT_NAME_EXCEPTION},
5 uploader_quick_pulse::{self, PostOrPing},
6 Error, Exporter,
7};
8use futures_util::{pin_mut, select_biased, FutureExt as _, StreamExt as _};
9use opentelemetry::{trace::SpanKind, Context, Key};
10use opentelemetry_http::HttpClient;
11use opentelemetry_sdk::{
12 error::OTelSdkResult,
13 runtime::{RuntimeChannel, TrySend},
14 trace::{IdGenerator as _, RandomIdGenerator, Span, SpanData, SpanProcessor},
15 Resource,
16};
17use opentelemetry_semantic_conventions as semcov;
18use std::{
19 sync::{
20 atomic::{AtomicBool, Ordering},
21 Arc, Mutex,
22 },
23 time::{Duration, SystemTime},
24};
25use sysinfo::{Pid, ProcessRefreshKind, System};
26
/// Delay between two posts while metrics are being collected.
const POST_INTERVAL: Duration = Duration::from_secs(1);
/// Delay before the first ping, and between pings when the service has not suggested an
/// interval of its own.
const PING_INTERVAL: Duration = Duration::from_secs(5);
/// Delay used after giving up on posting, or while pings keep failing.
const FALLBACK_INTERVAL: Duration = Duration::from_secs(60);
/// How long posts may keep failing before switching back to pinging.
const MAX_POST_WAIT_TIME: Duration = Duration::from_secs(20);
/// How long pings may keep failing before the ping delay becomes `FALLBACK_INTERVAL`.
const MAX_PING_WAIT_TIME: Duration = Duration::from_secs(60);
32
// Metric names sent in QuickPulse envelopes. Raw strings keep the single backslashes readable.
const METRIC_PROCESSOR_TIME: &str = r"\Processor(_Total)\% Processor Time";
const METRIC_COMMITTED_BYTES: &str = r"\Memory\Committed Bytes";
const METRIC_REQUEST_RATE: &str = r"\ApplicationInsights\Requests/Sec";
const METRIC_REQUEST_FAILURE_RATE: &str = r"\ApplicationInsights\Requests Failed/Sec";
const METRIC_REQUEST_DURATION: &str = r"\ApplicationInsights\Request Duration";
const METRIC_DEPENDENCY_RATE: &str = r"\ApplicationInsights\Dependency Calls/Sec";
const METRIC_DEPENDENCY_FAILURE_RATE: &str = r"\ApplicationInsights\Dependency Calls Failed/Sec";
const METRIC_DEPENDENCY_DURATION: &str = r"\ApplicationInsights\Dependency Call Duration";
const METRIC_EXCEPTION_RATE: &str = r"\ApplicationInsights\Exceptions/Sec";
42
43pub struct LiveMetricsSpanProcessor<R: RuntimeChannel> {
67 is_collecting: Arc<AtomicBool>,
68 shared: Arc<Mutex<Shared>>,
69 message_sender: R::Sender<Message>,
70}
71
72impl<R: RuntimeChannel> std::fmt::Debug for LiveMetricsSpanProcessor<R> {
73 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
74 f.debug_struct("LiveMetricsSpanProcessor").finish()
75 }
76}
77
/// Events handled by the live metrics background task.
#[derive(Debug)]
enum Message {
    /// Time to ping or post. Produced by the task's own timer, not sent over the channel.
    Send,
    /// Leave the background loop. Also used when the channel yields no more messages.
    Stop,
}
83
84impl<R: RuntimeChannel> LiveMetricsSpanProcessor<R> {
85 pub fn new<C: HttpClient + 'static>(
87 exporter: Exporter<C>,
88 runtime: R,
89 ) -> LiveMetricsSpanProcessor<R> {
90 let (message_sender, message_receiver) = runtime.batch_message_channel(1);
91 let delay_runtime = runtime.clone();
92 let is_collecting_outer = Arc::new(AtomicBool::new(false));
93 let is_collecting = is_collecting_outer.clone();
94 let shared_outer = Arc::new(Mutex::new(Shared {
95 metrics_collector: MetricsCollector::new(),
96 resource_data: (&exporter.resource).into(),
97 }));
98 let shared = shared_outer.clone();
99 runtime.spawn(Box::pin(async move {
100 let mut sender = Sender::new(
101 exporter.client,
102 exporter.live_post_endpoint,
103 exporter.live_ping_endpoint,
104 );
105
106 let message_receiver = message_receiver.fuse();
107 pin_mut!(message_receiver);
108 let mut send_delay = Box::pin(delay_runtime.delay(PING_INTERVAL).fuse());
109
110 loop {
111 let msg = select_biased! {
112 msg = message_receiver.next() => msg.unwrap_or(Message::Stop),
113 _ = send_delay => Message::Send
114 };
115 match msg {
116 Message::Send => {
117 let curr_is_collecting = is_collecting.load(Ordering::SeqCst);
118 let (resource_data, metrics) = {
119 let mut shared = shared.lock().unwrap();
120 let resource_data = shared.resource_data.clone();
121 let metrics = curr_is_collecting
122 .then(|| shared.metrics_collector.collect_and_reset())
123 .unwrap_or_default();
124 (resource_data, metrics)
125 };
126 let (next_is_collecting, next_timeout) = sender
127 .send(curr_is_collecting, resource_data, metrics)
128 .await;
129 if curr_is_collecting != next_is_collecting {
130 is_collecting.store(next_is_collecting, Ordering::SeqCst);
131 if next_is_collecting {
132 shared.lock().unwrap().metrics_collector.reset();
134 }
135 }
136 send_delay = Box::pin(delay_runtime.delay(next_timeout).fuse());
137 }
138 Message::Stop => break,
139 }
140 }
141 }));
142
143 LiveMetricsSpanProcessor {
144 is_collecting: is_collecting_outer,
145 shared: shared_outer,
146 message_sender,
147 }
148 }
149}
150
151impl<R: RuntimeChannel> SpanProcessor for LiveMetricsSpanProcessor<R> {
152 fn on_start(&self, _span: &mut Span, _cx: &Context) {}
153
154 fn on_end(&self, span: SpanData) {
155 if self.is_collecting.load(Ordering::SeqCst) {
156 self.shared
157 .lock()
158 .unwrap()
159 .metrics_collector
160 .count_span(span);
161 }
162 }
163
164 fn force_flush(&self) -> OTelSdkResult {
165 Ok(())
166 }
167
168 fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
169 self.message_sender
170 .try_send(Message::Stop)
171 .map_err(Error::QuickPulseShutdown)
172 .map_err(Into::into)
173 }
174
175 fn set_resource(&mut self, resource: &Resource) {
176 let mut shared = self.shared.lock().unwrap();
177 shared.resource_data = resource.into();
178 }
179}
180
181impl<R: RuntimeChannel> Drop for LiveMetricsSpanProcessor<R> {
182 fn drop(&mut self) {
183 if let Err(err) = self.shutdown() {
184 let err: &dyn std::error::Error = &err;
185 opentelemetry::otel_warn!(name: "ApplicationInsights.LiveMetrics.ShutdownFailed", error = err);
186 }
187 }
188}
189
190struct Shared {
191 resource_data: ResourceData,
192 metrics_collector: MetricsCollector,
193}
194
/// Resource-derived fields copied into every Live Metrics envelope.
#[derive(Clone)]
struct ResourceData {
    /// SDK version tag, when present.
    version: Option<String>,
    /// `host.name` resource attribute, or `"Unknown"`.
    machine_name: String,
    /// Cloud role instance tag, falling back to `machine_name`.
    instance: String,
    /// Cloud role tag, when present.
    role_name: Option<String>,
}
202
203impl From<&Resource> for ResourceData {
204 fn from(resource: &Resource) -> Self {
205 let mut tags = get_tags_for_resource(resource);
206 let machine_name = resource
207 .get(&Key::from_static_str(semcov::resource::HOST_NAME))
208 .map(|v| v.as_str().into_owned())
209 .unwrap_or_else(|| "Unknown".into());
210 Self {
211 version: tags.remove(context_tag_keys::INTERNAL_SDK_VERSION),
212 role_name: tags.remove(context_tag_keys::CLOUD_ROLE),
213 instance: tags
214 .remove(context_tag_keys::CLOUD_ROLE_INSTANCE)
215 .unwrap_or_else(|| machine_name.clone()),
216 machine_name,
217 }
218 }
219}
220
221struct Sender<C: HttpClient + 'static> {
222 client: Arc<C>,
223 live_post_endpoint: http::Uri,
224 live_ping_endpoint: http::Uri,
225 last_success_time: SystemTime,
226 polling_interval_hint: Option<Duration>,
227 stream_id: String,
228}
229
230impl<C: HttpClient + 'static> Sender<C> {
231 fn new(client: Arc<C>, live_post_endpoint: http::Uri, live_ping_endpoint: http::Uri) -> Self {
232 Self {
233 client,
234 live_post_endpoint,
235 live_ping_endpoint,
236 last_success_time: SystemTime::now(),
237 polling_interval_hint: None,
238 stream_id: format!("{:032x}", RandomIdGenerator::default().new_trace_id()),
239 }
240 }
241
242 async fn send(
243 &mut self,
244 is_collecting: bool,
245 resource_data: ResourceData,
246 metrics: Vec<QuickPulseMetric>,
247 ) -> (bool, Duration) {
248 let now = SystemTime::now();
249 let now_ms = now
250 .duration_since(SystemTime::UNIX_EPOCH)
251 .map(|d| d.as_millis())
252 .unwrap_or(0);
253 let envelope = QuickPulseEnvelope {
254 metrics,
255 invariant_version: 1,
256 timestamp: format!("/Date({})/", now_ms),
257 version: resource_data.version,
258 stream_id: self.stream_id.clone(),
259 machine_name: resource_data.machine_name,
260 instance: resource_data.instance,
261 role_name: resource_data.role_name,
262 };
263
264 let res = uploader_quick_pulse::send(
265 self.client.as_ref(),
266 if is_collecting {
267 &self.live_post_endpoint
268 } else {
269 &self.live_ping_endpoint
270 },
271 if is_collecting {
272 PostOrPing::Post
273 } else {
274 PostOrPing::Ping
275 },
276 envelope,
277 )
278 .await;
279 let (last_send_succeeded, mut next_is_collecting) = if let Ok(res) = res {
280 self.last_success_time = now;
281 if let Some(redirected_host) = res.redirected_host {
282 self.live_post_endpoint =
283 replace_host(self.live_post_endpoint.clone(), redirected_host.clone());
284 self.live_ping_endpoint =
285 replace_host(self.live_ping_endpoint.clone(), redirected_host);
286 }
287 if res.polling_interval_hint.is_some() {
288 self.polling_interval_hint = res.polling_interval_hint;
289 }
290 (true, res.should_post)
291 } else {
292 (false, is_collecting)
293 };
294
295 let mut next_timeout = if next_is_collecting {
296 POST_INTERVAL
297 } else {
298 self.polling_interval_hint.unwrap_or(PING_INTERVAL)
299 };
300 if !last_send_succeeded {
301 let time_since_last_success = now
302 .duration_since(self.last_success_time)
303 .unwrap_or(Duration::MAX);
304 if next_is_collecting && time_since_last_success >= MAX_POST_WAIT_TIME {
305 next_is_collecting = false;
307 next_timeout = FALLBACK_INTERVAL;
308 } else if !next_is_collecting && time_since_last_success >= MAX_PING_WAIT_TIME {
309 next_timeout = FALLBACK_INTERVAL;
311 }
312 }
313
314 (next_is_collecting, next_timeout)
315 }
316}
317
318struct MetricsCollector {
319 system: System,
320 process_refresh_kind: ProcessRefreshKind,
321 process_id: Pid,
322 request_count: usize,
323 request_failed_count: usize,
324 request_duration: Duration,
325 dependency_count: usize,
326 dependency_failed_count: usize,
327 dependency_duration: Duration,
328 exception_count: usize,
329 last_collection_time: SystemTime,
330}
331
332impl MetricsCollector {
333 fn new() -> Self {
334 Self {
335 system: System::new(),
336 process_refresh_kind: ProcessRefreshKind::nothing().with_cpu().with_memory(),
337 process_id: Pid::from_u32(std::process::id()),
338 request_count: 0,
339 request_failed_count: 0,
340 request_duration: Duration::default(),
341 dependency_count: 0,
342 dependency_failed_count: 0,
343 dependency_duration: Duration::default(),
344 exception_count: 0,
345 last_collection_time: SystemTime::now(),
346 }
347 }
348
349 fn reset(&mut self) {
350 self.request_count = 0;
351 self.request_failed_count = 0;
352 self.request_duration = Duration::default();
353 self.dependency_count = 0;
354 self.dependency_failed_count = 0;
355 self.dependency_duration = Duration::default();
356 self.exception_count = 0;
357 self.last_collection_time = SystemTime::now();
358 }
359
360 fn count_span(&mut self, span: SpanData) {
361 match span.span_kind {
363 SpanKind::Server | SpanKind::Consumer => {
364 self.request_count += 1;
365 if !is_request_success(&span) {
366 self.request_failed_count += 1;
367 }
368 self.request_duration += get_duration(&span);
369 }
370 SpanKind::Client | SpanKind::Producer | SpanKind::Internal => {
371 self.dependency_count += 1;
372 if let Some(false) = is_remote_dependency_success(&span) {
373 self.dependency_failed_count += 1;
374 }
375 self.dependency_duration += get_duration(&span);
376 }
377 }
378
379 for event in span.events.iter() {
380 if event.name == EVENT_NAME_EXCEPTION {
381 self.exception_count += 1;
382 }
383 }
384 }
385
386 fn collect_and_reset(&mut self) -> Vec<QuickPulseMetric> {
387 let mut metrics = Vec::new();
388 self.system.refresh_processes_specifics(
389 sysinfo::ProcessesToUpdate::Some(&[self.process_id]),
390 true,
391 self.process_refresh_kind,
392 );
393 self.collect_cpu_usage(&mut metrics);
394 self.collect_memory_usage(&mut metrics);
395 self.collect_requests_dependencies_exceptions(&mut metrics);
396 self.reset();
397 metrics
398 }
399
400 fn collect_cpu_usage(&mut self, metrics: &mut Vec<QuickPulseMetric>) {
401 let cpu_usage = if let Some(process) = self.system.process(self.process_id) {
402 f64::from(process.cpu_usage())
403 } else {
404 0.
405 };
406
407 metrics.push(QuickPulseMetric {
408 name: METRIC_PROCESSOR_TIME,
409 value: cpu_usage,
410 weight: 1,
411 });
412 }
413
414 fn collect_memory_usage(&mut self, metrics: &mut Vec<QuickPulseMetric>) {
415 let memory_usage = if let Some(process) = self.system.process(self.process_id) {
416 process.memory()
417 } else {
418 0
419 };
420
421 metrics.push(QuickPulseMetric {
422 name: METRIC_COMMITTED_BYTES,
423 value: memory_usage as f64,
424 weight: 1,
425 });
426 }
427
428 fn collect_requests_dependencies_exceptions(&mut self, metrics: &mut Vec<QuickPulseMetric>) {
429 let elapsed_seconds = SystemTime::now()
430 .duration_since(self.last_collection_time)
431 .unwrap_or_default()
432 .as_secs();
433 if elapsed_seconds == 0 {
434 return;
435 }
436
437 metrics.push(QuickPulseMetric {
438 name: METRIC_REQUEST_RATE,
439 value: self.request_count as f64 / elapsed_seconds as f64,
440 weight: 1,
441 });
442 metrics.push(QuickPulseMetric {
443 name: METRIC_REQUEST_FAILURE_RATE,
444 value: self.request_failed_count as f64 / elapsed_seconds as f64,
445 weight: 1,
446 });
447 if self.request_count > 0 {
448 metrics.push(QuickPulseMetric {
449 name: METRIC_REQUEST_DURATION,
450 value: self.request_duration.as_millis() as f64 / self.request_count as f64,
451 weight: 1,
452 });
453 }
454
455 metrics.push(QuickPulseMetric {
456 name: METRIC_DEPENDENCY_RATE,
457 value: self.dependency_count as f64 / elapsed_seconds as f64,
458 weight: 1,
459 });
460 metrics.push(QuickPulseMetric {
461 name: METRIC_DEPENDENCY_FAILURE_RATE,
462 value: self.dependency_failed_count as f64 / elapsed_seconds as f64,
463 weight: 1,
464 });
465 if self.dependency_count > 0 {
466 metrics.push(QuickPulseMetric {
467 name: METRIC_DEPENDENCY_DURATION,
468 value: self.dependency_duration.as_millis() as f64 / self.dependency_count as f64,
469 weight: 1,
470 });
471 }
472
473 metrics.push(QuickPulseMetric {
474 name: METRIC_EXCEPTION_RATE,
475 value: self.exception_count as f64 / elapsed_seconds as f64,
476 weight: 1,
477 });
478 }
479}
480
481fn replace_host(uri: http::Uri, new_host: http::Uri) -> http::Uri {
482 let mut parts = uri.into_parts();
483 let new_parts = new_host.into_parts();
484 parts.scheme = new_parts.scheme;
485 parts.authority = new_parts.authority;
486 http::Uri::from_parts(parts).expect("valid uri + valid uri = valid uri")
487}