opentelemetry_application_insights/
quick_pulse.rs

1use crate::{
2    models::{context_tag_keys, QuickPulseEnvelope, QuickPulseMetric},
3    tags::get_tags_for_resource,
4    trace::{get_duration, is_remote_dependency_success, is_request_success, EVENT_NAME_EXCEPTION},
5    uploader_quick_pulse::{self, PostOrPing},
6    Error, Exporter,
7};
8use futures_util::{pin_mut, select_biased, FutureExt as _, StreamExt as _};
9use opentelemetry::{trace::SpanKind, Context, Key};
10use opentelemetry_http::HttpClient;
11use opentelemetry_sdk::{
12    error::OTelSdkResult,
13    runtime::{RuntimeChannel, TrySend},
14    trace::{IdGenerator as _, RandomIdGenerator, Span, SpanData, SpanProcessor},
15    Resource,
16};
17use opentelemetry_semantic_conventions as semcov;
18use std::{
19    sync::{
20        atomic::{AtomicBool, Ordering},
21        Arc, Mutex,
22    },
23    time::{Duration, SystemTime},
24};
25use sysinfo::{Pid, ProcessRefreshKind, System};
26
/// While posting, how long sends may keep failing (measured from the last successful send)
/// before `Sender::send` switches back to pinging.
const MAX_POST_WAIT_TIME: Duration = Duration::from_secs(20);
/// While pinging, how long sends may keep failing (measured from the last successful send)
/// before `Sender::send` backs off to `FALLBACK_INTERVAL`.
const MAX_PING_WAIT_TIME: Duration = Duration::from_secs(60);
/// Delay before the next attempt once one of the two wait limits above has been reached.
const FALLBACK_INTERVAL: Duration = Duration::from_secs(60);
/// Delay before the first send, and between pings when no polling interval hint has been
/// received.
const PING_INTERVAL: Duration = Duration::from_secs(5);
/// Delay before the next send while metrics are being collected and posted.
const POST_INTERVAL: Duration = Duration::from_secs(1);
32
// Names placed in `QuickPulseMetric::name` by `MetricsCollector`.

/// Reported with the CPU usage that `sysinfo` returns for the current process.
const METRIC_PROCESSOR_TIME: &str = "\\Processor(_Total)\\% Processor Time";
/// Reported with the memory usage that `sysinfo` returns for the current process.
const METRIC_COMMITTED_BYTES: &str = "\\Memory\\Committed Bytes";
/// Counted server/consumer spans per second of the collection window.
const METRIC_REQUEST_RATE: &str = "\\ApplicationInsights\\Requests/Sec";
/// Counted server/consumer spans that were not successful, per second.
const METRIC_REQUEST_FAILURE_RATE: &str = "\\ApplicationInsights\\Requests Failed/Sec";
/// Average duration of the counted server/consumer spans, in milliseconds.
const METRIC_REQUEST_DURATION: &str = "\\ApplicationInsights\\Request Duration";
/// Counted client/producer/internal spans per second of the collection window.
const METRIC_DEPENDENCY_RATE: &str = "\\ApplicationInsights\\Dependency Calls/Sec";
/// Counted client/producer/internal spans explicitly marked unsuccessful, per second.
const METRIC_DEPENDENCY_FAILURE_RATE: &str = "\\ApplicationInsights\\Dependency Calls Failed/Sec";
/// Average duration of the counted client/producer/internal spans, in milliseconds.
const METRIC_DEPENDENCY_DURATION: &str = "\\ApplicationInsights\\Dependency Call Duration";
/// Exception events found on counted spans, per second of the collection window.
const METRIC_EXCEPTION_RATE: &str = "\\ApplicationInsights\\Exceptions/Sec";
42
/// Application Insights live metrics span processor
///
/// Enables live metrics collection: <https://learn.microsoft.com/en-us/azure/azure-monitor/app/live-stream>.
///
/// Creating the processor spawns a background task on the given runtime. That task
/// periodically sends to the live metrics endpoints and switches span counting on or off
/// according to the outcome of each send.
///
/// ```no_run
/// #[tokio::main]
/// async fn main() {
///     let exporter = opentelemetry_application_insights::Exporter::new_from_connection_string(
///         "connection_string",
///         reqwest::Client::new(),
///     )
///     .expect("valid connection string");
///     let tracer_provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
///        .with_span_processor(opentelemetry_sdk::trace::span_processor_with_async_runtime::BatchSpanProcessor::builder(exporter.clone(), opentelemetry_sdk::runtime::Tokio).build())
///        .with_span_processor(opentelemetry_application_insights::LiveMetricsSpanProcessor::new(exporter, opentelemetry_sdk::runtime::Tokio))
///        .build();
///     opentelemetry::global::set_tracer_provider(tracer_provider.clone());
///
///     // ... send traces ...
///
///     tracer_provider.shutdown().unwrap();
/// }
/// ```
pub struct LiveMetricsSpanProcessor<R: RuntimeChannel> {
    /// Whether ended spans are currently counted. Written by the background task, read in
    /// `on_end`.
    is_collecting: Arc<AtomicBool>,
    /// Resource data and metric counters shared with the background task.
    shared: Arc<Mutex<Shared>>,
    /// Channel used to ask the background task to stop.
    message_sender: R::Sender<Message>,
}
71
72impl<R: RuntimeChannel> std::fmt::Debug for LiveMetricsSpanProcessor<R> {
73    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
74        f.debug_struct("LiveMetricsSpanProcessor").finish()
75    }
76}
77
/// Commands handled by the background task spawned in `LiveMetricsSpanProcessor::new`.
#[derive(Debug)]
enum Message {
    /// Perform the next send (produced by the task's own delay timer).
    Send,
    /// Leave the task loop. Also assumed when the message channel yields no more messages.
    Stop,
}
83
84impl<R: RuntimeChannel> LiveMetricsSpanProcessor<R> {
85    /// Create new live metrics span processor.
86    pub fn new<C: HttpClient + 'static>(
87        exporter: Exporter<C>,
88        runtime: R,
89    ) -> LiveMetricsSpanProcessor<R> {
90        let (message_sender, message_receiver) = runtime.batch_message_channel(1);
91        let delay_runtime = runtime.clone();
92        let is_collecting_outer = Arc::new(AtomicBool::new(false));
93        let is_collecting = is_collecting_outer.clone();
94        let shared_outer = Arc::new(Mutex::new(Shared {
95            metrics_collector: MetricsCollector::new(),
96            resource_data: (&exporter.resource).into(),
97        }));
98        let shared = shared_outer.clone();
99        runtime.spawn(Box::pin(async move {
100            let mut sender = Sender::new(
101                exporter.client,
102                exporter.live_post_endpoint,
103                exporter.live_ping_endpoint,
104            );
105
106            let message_receiver = message_receiver.fuse();
107            pin_mut!(message_receiver);
108            let mut send_delay = Box::pin(delay_runtime.delay(PING_INTERVAL).fuse());
109
110            loop {
111                let msg = select_biased! {
112                    msg = message_receiver.next() => msg.unwrap_or(Message::Stop),
113                    _ = send_delay => Message::Send
114                };
115                match msg {
116                    Message::Send => {
117                        let curr_is_collecting = is_collecting.load(Ordering::SeqCst);
118                        let (resource_data, metrics) = {
119                            let mut shared = shared.lock().unwrap();
120                            let resource_data = shared.resource_data.clone();
121                            let metrics = curr_is_collecting
122                                .then(|| shared.metrics_collector.collect_and_reset())
123                                .unwrap_or_default();
124                            (resource_data, metrics)
125                        };
126                        let (next_is_collecting, next_timeout) = sender
127                            .send(curr_is_collecting, resource_data, metrics)
128                            .await;
129                        if curr_is_collecting != next_is_collecting {
130                            is_collecting.store(next_is_collecting, Ordering::SeqCst);
131                            if next_is_collecting {
132                                // Reset last collection time to get accurate metrics on next collection.
133                                shared.lock().unwrap().metrics_collector.reset();
134                            }
135                        }
136                        send_delay = Box::pin(delay_runtime.delay(next_timeout).fuse());
137                    }
138                    Message::Stop => break,
139                }
140            }
141        }));
142
143        LiveMetricsSpanProcessor {
144            is_collecting: is_collecting_outer,
145            shared: shared_outer,
146            message_sender,
147        }
148    }
149}
150
151impl<R: RuntimeChannel> SpanProcessor for LiveMetricsSpanProcessor<R> {
152    fn on_start(&self, _span: &mut Span, _cx: &Context) {}
153
154    fn on_end(&self, span: SpanData) {
155        if self.is_collecting.load(Ordering::SeqCst) {
156            self.shared
157                .lock()
158                .unwrap()
159                .metrics_collector
160                .count_span(span);
161        }
162    }
163
164    fn force_flush(&self) -> OTelSdkResult {
165        Ok(())
166    }
167
168    fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
169        self.message_sender
170            .try_send(Message::Stop)
171            .map_err(Error::QuickPulseShutdown)
172            .map_err(Into::into)
173    }
174
175    fn set_resource(&mut self, resource: &Resource) {
176        let mut shared = self.shared.lock().unwrap();
177        shared.resource_data = resource.into();
178    }
179}
180
181impl<R: RuntimeChannel> Drop for LiveMetricsSpanProcessor<R> {
182    fn drop(&mut self) {
183        if let Err(err) = self.shutdown() {
184            let err: &dyn std::error::Error = &err;
185            opentelemetry::otel_warn!(name: "ApplicationInsights.LiveMetrics.ShutdownFailed", error = err);
186        }
187    }
188}
189
/// State shared, behind a mutex, between the span processor and its background task.
struct Shared {
    /// Resource-derived fields copied into every envelope that is sent.
    resource_data: ResourceData,
    /// Span counters and process statistics for the current collection window.
    metrics_collector: MetricsCollector,
}
194
/// Resource-derived values placed in each `QuickPulseEnvelope`.
#[derive(Clone)]
struct ResourceData {
    /// Value of the internal SDK version tag, if the resource produced one.
    version: Option<String>,
    /// The resource's host name attribute, or `"Unknown"` when it is absent.
    machine_name: String,
    /// Value of the cloud role instance tag; falls back to `machine_name`.
    instance: String,
    /// Value of the cloud role tag, if the resource produced one.
    role_name: Option<String>,
}
202
203impl From<&Resource> for ResourceData {
204    fn from(resource: &Resource) -> Self {
205        let mut tags = get_tags_for_resource(resource);
206        let machine_name = resource
207            .get(&Key::from_static_str(semcov::resource::HOST_NAME))
208            .map(|v| v.as_str().into_owned())
209            .unwrap_or_else(|| "Unknown".into());
210        Self {
211            version: tags.remove(context_tag_keys::INTERNAL_SDK_VERSION),
212            role_name: tags.remove(context_tag_keys::CLOUD_ROLE),
213            instance: tags
214                .remove(context_tag_keys::CLOUD_ROLE_INSTANCE)
215                .unwrap_or_else(|| machine_name.clone()),
216            machine_name,
217        }
218    }
219}
220
/// Sends pings and posts to the live metrics endpoints and tracks the state that determines
/// when the next send should happen.
struct Sender<C: HttpClient + 'static> {
    /// HTTP client used for every send.
    client: Arc<C>,
    /// Endpoint used while collecting (post); its host may be replaced after a redirect.
    live_post_endpoint: http::Uri,
    /// Endpoint used while not collecting (ping); its host may be replaced after a redirect.
    live_ping_endpoint: http::Uri,
    /// Time of the most recent successful send; starts at the time the sender was created.
    last_success_time: SystemTime,
    /// Most recent polling interval hint taken from a successful response, if any.
    polling_interval_hint: Option<Duration>,
    /// Identifier generated once per sender (32 hex digits) and included in every envelope.
    stream_id: String,
}
229
230impl<C: HttpClient + 'static> Sender<C> {
231    fn new(client: Arc<C>, live_post_endpoint: http::Uri, live_ping_endpoint: http::Uri) -> Self {
232        Self {
233            client,
234            live_post_endpoint,
235            live_ping_endpoint,
236            last_success_time: SystemTime::now(),
237            polling_interval_hint: None,
238            stream_id: format!("{:032x}", RandomIdGenerator::default().new_trace_id()),
239        }
240    }
241
242    async fn send(
243        &mut self,
244        is_collecting: bool,
245        resource_data: ResourceData,
246        metrics: Vec<QuickPulseMetric>,
247    ) -> (bool, Duration) {
248        let now = SystemTime::now();
249        let now_ms = now
250            .duration_since(SystemTime::UNIX_EPOCH)
251            .map(|d| d.as_millis())
252            .unwrap_or(0);
253        let envelope = QuickPulseEnvelope {
254            metrics,
255            invariant_version: 1,
256            timestamp: format!("/Date({})/", now_ms),
257            version: resource_data.version,
258            stream_id: self.stream_id.clone(),
259            machine_name: resource_data.machine_name,
260            instance: resource_data.instance,
261            role_name: resource_data.role_name,
262        };
263
264        let res = uploader_quick_pulse::send(
265            self.client.as_ref(),
266            if is_collecting {
267                &self.live_post_endpoint
268            } else {
269                &self.live_ping_endpoint
270            },
271            if is_collecting {
272                PostOrPing::Post
273            } else {
274                PostOrPing::Ping
275            },
276            envelope,
277        )
278        .await;
279        let (last_send_succeeded, mut next_is_collecting) = if let Ok(res) = res {
280            self.last_success_time = now;
281            if let Some(redirected_host) = res.redirected_host {
282                self.live_post_endpoint =
283                    replace_host(self.live_post_endpoint.clone(), redirected_host.clone());
284                self.live_ping_endpoint =
285                    replace_host(self.live_ping_endpoint.clone(), redirected_host);
286            }
287            if res.polling_interval_hint.is_some() {
288                self.polling_interval_hint = res.polling_interval_hint;
289            }
290            (true, res.should_post)
291        } else {
292            (false, is_collecting)
293        };
294
295        let mut next_timeout = if next_is_collecting {
296            POST_INTERVAL
297        } else {
298            self.polling_interval_hint.unwrap_or(PING_INTERVAL)
299        };
300        if !last_send_succeeded {
301            let time_since_last_success = now
302                .duration_since(self.last_success_time)
303                .unwrap_or(Duration::MAX);
304            if next_is_collecting && time_since_last_success >= MAX_POST_WAIT_TIME {
305                // Haven't posted successfully in 20 seconds, so wait 60 seconds and ping
306                next_is_collecting = false;
307                next_timeout = FALLBACK_INTERVAL;
308            } else if !next_is_collecting && time_since_last_success >= MAX_PING_WAIT_TIME {
309                // Haven't pinged successfully in 60 seconds, so wait another 60 seconds
310                next_timeout = FALLBACK_INTERVAL;
311            }
312        }
313
314        (next_is_collecting, next_timeout)
315    }
316}
317
/// Accumulates span counters and reads process statistics for one collection window.
struct MetricsCollector {
    /// `sysinfo` handle used to read statistics of the current process.
    system: System,
    /// Which process data is refreshed before reading (CPU and memory).
    process_refresh_kind: ProcessRefreshKind,
    /// Id of the current process.
    process_id: Pid,
    /// Number of server/consumer spans counted in the current window.
    request_count: usize,
    /// Number of those spans that were not successful.
    request_failed_count: usize,
    /// Summed duration of the counted server/consumer spans.
    request_duration: Duration,
    /// Number of client/producer/internal spans counted in the current window.
    dependency_count: usize,
    /// Number of those spans explicitly marked unsuccessful.
    dependency_failed_count: usize,
    /// Summed duration of the counted client/producer/internal spans.
    dependency_duration: Duration,
    /// Number of exception events seen on counted spans.
    exception_count: usize,
    /// Start of the current window: time of creation or of the last reset.
    last_collection_time: SystemTime,
}
331
332impl MetricsCollector {
333    fn new() -> Self {
334        Self {
335            system: System::new(),
336            process_refresh_kind: ProcessRefreshKind::nothing().with_cpu().with_memory(),
337            process_id: Pid::from_u32(std::process::id()),
338            request_count: 0,
339            request_failed_count: 0,
340            request_duration: Duration::default(),
341            dependency_count: 0,
342            dependency_failed_count: 0,
343            dependency_duration: Duration::default(),
344            exception_count: 0,
345            last_collection_time: SystemTime::now(),
346        }
347    }
348
349    fn reset(&mut self) {
350        self.request_count = 0;
351        self.request_failed_count = 0;
352        self.request_duration = Duration::default();
353        self.dependency_count = 0;
354        self.dependency_failed_count = 0;
355        self.dependency_duration = Duration::default();
356        self.exception_count = 0;
357        self.last_collection_time = SystemTime::now();
358    }
359
360    fn count_span(&mut self, span: SpanData) {
361        // https://github.com/microsoft/ApplicationInsights-node.js/blob/aaafbfd8ffbc454d4a5c30cda4492891410b9f66/TelemetryProcessors/PerformanceMetricsTelemetryProcessor.ts#L6
362        match span.span_kind {
363            SpanKind::Server | SpanKind::Consumer => {
364                self.request_count += 1;
365                if !is_request_success(&span) {
366                    self.request_failed_count += 1;
367                }
368                self.request_duration += get_duration(&span);
369            }
370            SpanKind::Client | SpanKind::Producer | SpanKind::Internal => {
371                self.dependency_count += 1;
372                if let Some(false) = is_remote_dependency_success(&span) {
373                    self.dependency_failed_count += 1;
374                }
375                self.dependency_duration += get_duration(&span);
376            }
377        }
378
379        for event in span.events.iter() {
380            if event.name == EVENT_NAME_EXCEPTION {
381                self.exception_count += 1;
382            }
383        }
384    }
385
386    fn collect_and_reset(&mut self) -> Vec<QuickPulseMetric> {
387        let mut metrics = Vec::new();
388        self.system.refresh_processes_specifics(
389            sysinfo::ProcessesToUpdate::Some(&[self.process_id]),
390            true,
391            self.process_refresh_kind,
392        );
393        self.collect_cpu_usage(&mut metrics);
394        self.collect_memory_usage(&mut metrics);
395        self.collect_requests_dependencies_exceptions(&mut metrics);
396        self.reset();
397        metrics
398    }
399
400    fn collect_cpu_usage(&mut self, metrics: &mut Vec<QuickPulseMetric>) {
401        let cpu_usage = if let Some(process) = self.system.process(self.process_id) {
402            f64::from(process.cpu_usage())
403        } else {
404            0.
405        };
406
407        metrics.push(QuickPulseMetric {
408            name: METRIC_PROCESSOR_TIME,
409            value: cpu_usage,
410            weight: 1,
411        });
412    }
413
414    fn collect_memory_usage(&mut self, metrics: &mut Vec<QuickPulseMetric>) {
415        let memory_usage = if let Some(process) = self.system.process(self.process_id) {
416            process.memory()
417        } else {
418            0
419        };
420
421        metrics.push(QuickPulseMetric {
422            name: METRIC_COMMITTED_BYTES,
423            value: memory_usage as f64,
424            weight: 1,
425        });
426    }
427
428    fn collect_requests_dependencies_exceptions(&mut self, metrics: &mut Vec<QuickPulseMetric>) {
429        let elapsed_seconds = SystemTime::now()
430            .duration_since(self.last_collection_time)
431            .unwrap_or_default()
432            .as_secs();
433        if elapsed_seconds == 0 {
434            return;
435        }
436
437        metrics.push(QuickPulseMetric {
438            name: METRIC_REQUEST_RATE,
439            value: self.request_count as f64 / elapsed_seconds as f64,
440            weight: 1,
441        });
442        metrics.push(QuickPulseMetric {
443            name: METRIC_REQUEST_FAILURE_RATE,
444            value: self.request_failed_count as f64 / elapsed_seconds as f64,
445            weight: 1,
446        });
447        if self.request_count > 0 {
448            metrics.push(QuickPulseMetric {
449                name: METRIC_REQUEST_DURATION,
450                value: self.request_duration.as_millis() as f64 / self.request_count as f64,
451                weight: 1,
452            });
453        }
454
455        metrics.push(QuickPulseMetric {
456            name: METRIC_DEPENDENCY_RATE,
457            value: self.dependency_count as f64 / elapsed_seconds as f64,
458            weight: 1,
459        });
460        metrics.push(QuickPulseMetric {
461            name: METRIC_DEPENDENCY_FAILURE_RATE,
462            value: self.dependency_failed_count as f64 / elapsed_seconds as f64,
463            weight: 1,
464        });
465        if self.dependency_count > 0 {
466            metrics.push(QuickPulseMetric {
467                name: METRIC_DEPENDENCY_DURATION,
468                value: self.dependency_duration.as_millis() as f64 / self.dependency_count as f64,
469                weight: 1,
470            });
471        }
472
473        metrics.push(QuickPulseMetric {
474            name: METRIC_EXCEPTION_RATE,
475            value: self.exception_count as f64 / elapsed_seconds as f64,
476            weight: 1,
477        });
478    }
479}
480
481fn replace_host(uri: http::Uri, new_host: http::Uri) -> http::Uri {
482    let mut parts = uri.into_parts();
483    let new_parts = new_host.into_parts();
484    parts.scheme = new_parts.scheme;
485    parts.authority = new_parts.authority;
486    http::Uri::from_parts(parts).expect("valid uri + valid uri = valid uri")
487}