opentelemetry_application_insights/
trace.rs

1use crate::{
2    convert::{
3        attrs_map_to_properties, attrs_to_map, attrs_to_properties, duration_to_string,
4        status_to_result_code, time_to_string, value_to_severity_level,
5    },
6    models::{
7        context_tag_keys::attrs::CUSTOM_EVENT_NAME, Data, Envelope, EventData, ExceptionData,
8        ExceptionDetails, LimitedLenString, MessageData, RemoteDependencyData, RequestData,
9    },
10    tags::{get_tags_for_event, get_tags_for_span},
11    Exporter,
12};
13use opentelemetry::{
14    trace::{Event, SpanKind, Status},
15    Value,
16};
17use opentelemetry_http::HttpClient;
18use opentelemetry_sdk::{
19    error::OTelSdkResult,
20    trace::{SpanData, SpanExporter},
21    Resource,
22};
23use opentelemetry_semantic_conventions as semcov;
24use std::{borrow::Cow, collections::HashMap, sync::Arc, time::Duration};
25
/// Deprecated semantic convention key for HTTP host.
///
/// Removed in <https://github.com/open-telemetry/opentelemetry-specification/pull/2469>.
const DEPRECATED_HTTP_HOST: &str = "http.host";

/// Semantic convention key for HTTP 'Host' request header.
const HTTP_REQUEST_HEADER_HOST: &str = "http.request.header.host";

/// Deprecated semantic convention key for peer IP.
///
/// Replaced in <https://github.com/open-telemetry/opentelemetry-specification/pull/2614> with
/// `net.sock.peer.addr`.
const DEPRECATED_NET_PEER_IP: &str = "net.peer.ip";

/// Deprecated semantic convention key for HTTP client IP.
///
/// Replaced in <https://github.com/open-telemetry/opentelemetry-specification/pull/3402> with
/// `client.address`.
const DEPRECATED_HTTP_CLIENT_IP: &str = "http.client_ip";

/// Deprecated semantic convention key for client socket address.
///
/// Replaced in <https://github.com/open-telemetry/opentelemetry-specification/pull/3713> with
/// `network.peer.address`.
const DEPRECATED_CLIENT_SOCKET_ADDRESS: &str = "client.socket.address";

/// Deprecated semantic convention key for server socket address.
///
/// Replaced in <https://github.com/open-telemetry/opentelemetry-specification/pull/3713> with
/// `network.local.address`.
const DEPRECATED_SERVER_SOCKET_ADDRESS: &str = "server.socket.address";

/// Deprecated semantic convention key for server socket port.
///
/// Replaced in <https://github.com/open-telemetry/opentelemetry-specification/pull/3713> with
/// `network.local.port`.
const DEPRECATED_SERVER_SOCKET_PORT: &str = "server.socket.port";

/// Name of span events that are exported as `Microsoft.ApplicationInsights.Event` envelopes.
pub(crate) const EVENT_NAME_CUSTOM: &str = "ai.custom";

/// Name of span events that are exported as `Microsoft.ApplicationInsights.Exception` envelopes.
pub(crate) const EVENT_NAME_EXCEPTION: &str = "exception";
66
67impl<C> Exporter<C> {
68    fn create_envelopes_for_span(&self, span: SpanData, resource: &Resource) -> Vec<Envelope> {
69        let mut result = Vec::with_capacity(1 + span.events.len());
70
71        let (data, tags, name) = match span.span_kind {
72            SpanKind::Server | SpanKind::Consumer => {
73                let data: RequestData = SpanAndResource(&span, resource).into();
74                let tags = get_tags_for_span(&span, resource);
75                (
76                    Data::Request(data),
77                    tags,
78                    "Microsoft.ApplicationInsights.Request",
79                )
80            }
81            SpanKind::Client | SpanKind::Producer | SpanKind::Internal => {
82                let data: RemoteDependencyData = SpanAndResource(&span, resource).into();
83                let tags = get_tags_for_span(&span, resource);
84                (
85                    Data::RemoteDependency(data),
86                    tags,
87                    "Microsoft.ApplicationInsights.RemoteDependency",
88                )
89            }
90        };
91        result.push(Envelope {
92            name,
93            time: time_to_string(span.start_time).into(),
94            sample_rate: Some(self.sample_rate),
95            i_key: Some(self.instrumentation_key.clone().into()),
96            tags: Some(tags),
97            data: Some(data),
98        });
99
100        let event_resource = if self.resource_attributes_in_events_and_logs {
101            Some(resource)
102        } else {
103            None
104        };
105        for event in span.events.iter() {
106            let (data, name) = match event.name.as_ref() {
107                x if x == EVENT_NAME_CUSTOM => (
108                    Data::Event(EventAndResource(event, event_resource).into()),
109                    "Microsoft.ApplicationInsights.Event",
110                ),
111                x if x == EVENT_NAME_EXCEPTION => (
112                    Data::Exception(EventAndResource(event, event_resource).into()),
113                    "Microsoft.ApplicationInsights.Exception",
114                ),
115                _ => (
116                    Data::Message(EventAndResource(event, event_resource).into()),
117                    "Microsoft.ApplicationInsights.Message",
118                ),
119            };
120            result.push(Envelope {
121                name,
122                time: time_to_string(event.timestamp).into(),
123                sample_rate: Some(self.sample_rate),
124                i_key: Some(self.instrumentation_key.clone().into()),
125                tags: Some(get_tags_for_event(&span, resource)),
126                data: Some(data),
127            });
128        }
129
130        result
131    }
132}
133
134#[cfg_attr(docsrs, doc(cfg(feature = "trace")))]
135impl<C> SpanExporter for Exporter<C>
136where
137    C: HttpClient + 'static,
138{
139    /// Export spans to Application Insights
140    async fn export(&self, batch: Vec<SpanData>) -> OTelSdkResult {
141        let client = Arc::clone(&self.client);
142        let endpoint = Arc::clone(&self.track_endpoint);
143        let envelopes: Vec<_> = batch
144            .into_iter()
145            .flat_map(|span| self.create_envelopes_for_span(span, &self.resource))
146            .collect();
147
148        crate::uploader::send(
149            client.as_ref(),
150            endpoint.as_ref(),
151            envelopes,
152            self.retry_notify.clone(),
153        )
154        .await
155        .map_err(Into::into)
156    }
157
158    fn set_resource(&mut self, resource: &Resource) {
159        self.resource = resource.clone();
160    }
161}
162
163fn get_url_path_and_query<'v>(attrs: &HashMap<&str, &'v Value>) -> Option<Cow<'v, str>> {
164    if let Some(path) = attrs.get(semcov::trace::URL_PATH) {
165        if let Some(query) = attrs.get(semcov::trace::URL_QUERY) {
166            Some(format!("{}?{}", path, query).into())
167        } else {
168            Some(path.as_str())
169        }
170    } else {
171        attrs
172            .get(
173                #[allow(deprecated)]
174                semcov::attribute::HTTP_TARGET,
175            )
176            .map(|target| target.as_str())
177    }
178}
179
180fn get_server_host<'v>(attrs: &HashMap<&str, &'v Value>) -> Option<Cow<'v, str>> {
181    if let Some(host) = attrs.get(HTTP_REQUEST_HEADER_HOST) {
182        Some(host.as_str())
183    } else if let Some(host) = attrs.get(DEPRECATED_HTTP_HOST) {
184        Some(host.as_str())
185    } else if let (Some(host_name), Some(host_port)) = (
186        attrs.get(semcov::trace::SERVER_ADDRESS).or_else(|| {
187            attrs.get(
188                #[allow(deprecated)]
189                semcov::attribute::NET_HOST_NAME,
190            )
191        }),
192        attrs.get(semcov::trace::SERVER_PORT).or_else(|| {
193            attrs.get(
194                #[allow(deprecated)]
195                semcov::attribute::NET_HOST_PORT,
196            )
197        }),
198    ) {
199        Some(format!("{}:{}", host_name.as_str(), host_port.as_str()).into())
200    } else {
201        None
202    }
203}
204
205pub(crate) fn get_duration(span: &SpanData) -> Duration {
206    span.end_time
207        .duration_since(span.start_time)
208        .unwrap_or_default()
209}
210
211pub(crate) fn is_request_success(span: &SpanData) -> bool {
212    !matches!(span.status, Status::Error { .. })
213}
214
215pub(crate) fn is_remote_dependency_success(span: &SpanData) -> Option<bool> {
216    match span.status {
217        Status::Unset => None,
218        Status::Ok => Some(true),
219        Status::Error { .. } => Some(false),
220    }
221}
222
223struct SpanAndResource<'a>(&'a SpanData, &'a Resource);
224
225impl<'a> From<SpanAndResource<'a>> for RequestData {
226    fn from(SpanAndResource(span, resource): SpanAndResource<'a>) -> RequestData {
227        let mut data = RequestData {
228            ver: 2,
229            id: span.span_context.span_id().to_string().into(),
230            name: Some(LimitedLenString::<1024>::from(span.name.clone()))
231                .filter(|x| !x.as_ref().is_empty()),
232            duration: duration_to_string(get_duration(span)),
233            response_code: status_to_result_code(&span.status).to_string().into(),
234            success: is_request_success(span),
235            source: None,
236            url: None,
237            properties: attrs_to_properties(
238                span.attributes.iter(),
239                Some(resource),
240                &span.links.links,
241            ),
242        };
243
244        let attrs: HashMap<&str, &Value> = span
245            .attributes
246            .iter()
247            .map(|kv| (kv.key.as_str(), &kv.value))
248            .collect();
249
250        if let Some(&method) = attrs.get(semcov::trace::HTTP_REQUEST_METHOD).or_else(|| {
251            #[allow(deprecated)]
252            attrs.get(semcov::attribute::HTTP_METHOD)
253        }) {
254            data.name = Some(if let Some(route) = attrs.get(semcov::trace::HTTP_ROUTE) {
255                format!("{} {}", method.as_str(), route.as_str()).into()
256            } else {
257                method.into()
258            });
259        }
260
261        if let Some(&status_code) = attrs.get(semcov::trace::HTTP_RESPONSE_STATUS_CODE) {
262            data.response_code = status_code.into();
263        } else if let Some(&status_code) = attrs.get(
264            #[allow(deprecated)]
265            semcov::attribute::HTTP_STATUS_CODE,
266        ) {
267            data.response_code = status_code.into();
268        }
269
270        if let Some(&url) = attrs.get(semcov::trace::URL_FULL) {
271            data.url = Some(url.into());
272        } else if let Some(&url) = attrs.get(
273            #[allow(deprecated)]
274            semcov::attribute::HTTP_URL,
275        ) {
276            data.url = Some(url.into());
277        } else if let Some(target) = get_url_path_and_query(&attrs) {
278            let mut target = target.into_owned();
279            if !target.starts_with('/') {
280                target.insert(0, '/');
281            }
282
283            if let (Some(scheme), Some(host)) = (
284                attrs.get(semcov::trace::URL_SCHEME).or_else(|| {
285                    attrs.get(
286                        #[allow(deprecated)]
287                        semcov::attribute::HTTP_SCHEME,
288                    )
289                }),
290                get_server_host(&attrs),
291            ) {
292                data.url = Some(format!("{}://{}{}", scheme.as_str(), host, target).into());
293            } else {
294                data.url = Some(target.into());
295            }
296        }
297
298        if let Some(&client_address) = attrs.get(semcov::trace::CLIENT_ADDRESS) {
299            data.source = Some(client_address.into());
300        } else if let Some(&client_ip) = attrs.get(DEPRECATED_HTTP_CLIENT_IP) {
301            data.source = Some(client_ip.into());
302        } else if let Some(&peer_addr) = attrs.get(semcov::trace::NETWORK_PEER_ADDRESS) {
303            data.source = Some(peer_addr.into());
304        } else if let Some(&peer_addr) = attrs.get(DEPRECATED_CLIENT_SOCKET_ADDRESS) {
305            data.source = Some(peer_addr.into());
306        } else if let Some(&peer_addr) = attrs.get(
307            #[allow(deprecated)]
308            semcov::attribute::NET_SOCK_PEER_ADDR,
309        ) {
310            data.source = Some(peer_addr.into());
311        } else if let Some(&peer_ip) = attrs.get(DEPRECATED_NET_PEER_IP) {
312            data.source = Some(peer_ip.into());
313        }
314
315        data
316    }
317}
318
319impl<'a> From<SpanAndResource<'a>> for RemoteDependencyData {
320    fn from(SpanAndResource(span, resource): SpanAndResource<'a>) -> RemoteDependencyData {
321        let mut data = RemoteDependencyData {
322            ver: 2,
323            id: Some(span.span_context.span_id().to_string().into()),
324            name: span.name.clone().into(),
325            duration: duration_to_string(get_duration(span)),
326            result_code: Some(status_to_result_code(&span.status).to_string().into()),
327            success: is_remote_dependency_success(span),
328            data: None,
329            target: None,
330            type_: None,
331            properties: attrs_to_properties(
332                span.attributes.iter(),
333                Some(resource),
334                &span.links.links,
335            ),
336        };
337
338        let attrs: HashMap<&str, &Value> = span
339            .attributes
340            .iter()
341            .map(|kv| (kv.key.as_str(), &kv.value))
342            .collect();
343
344        if let Some(&status_code) = attrs.get(semcov::trace::HTTP_RESPONSE_STATUS_CODE) {
345            data.result_code = Some(status_code.into());
346        } else if let Some(&status_code) = attrs.get(
347            #[allow(deprecated)]
348            semcov::attribute::HTTP_STATUS_CODE,
349        ) {
350            data.result_code = Some(status_code.into());
351        }
352
353        if let Some(&url) = attrs.get(semcov::trace::URL_FULL) {
354            data.data = Some(url.into());
355        } else if let Some(&url) = attrs.get(
356            #[allow(deprecated)]
357            semcov::attribute::HTTP_URL,
358        ) {
359            data.data = Some(url.into());
360        } else if let Some(&statement) = attrs.get(semcov::attribute::DB_QUERY_TEXT).or_else(|| {
361            attrs.get(
362                #[allow(deprecated)]
363                semcov::attribute::DB_STATEMENT,
364            )
365        }) {
366            data.data = Some(statement.into());
367        }
368
369        if let Some(&host) = attrs.get(HTTP_REQUEST_HEADER_HOST) {
370            data.target = Some(host.into());
371        } else if let Some(&host) = attrs.get(DEPRECATED_HTTP_HOST) {
372            data.target = Some(host.into());
373        } else if let Some(&peer_name) = attrs
374            .get(semcov::trace::SERVER_ADDRESS)
375            .or_else(|| attrs.get(semcov::trace::NETWORK_PEER_ADDRESS))
376            .or_else(|| attrs.get(DEPRECATED_SERVER_SOCKET_ADDRESS))
377            .or_else(|| {
378                attrs.get(
379                    #[allow(deprecated)]
380                    semcov::attribute::NET_SOCK_PEER_NAME,
381                )
382            })
383            .or_else(|| {
384                attrs.get(
385                    #[allow(deprecated)]
386                    semcov::attribute::NET_PEER_NAME,
387                )
388            })
389            .or_else(|| {
390                attrs.get(
391                    #[allow(deprecated)]
392                    semcov::attribute::NET_SOCK_PEER_ADDR,
393                )
394            })
395            .or_else(|| attrs.get(DEPRECATED_NET_PEER_IP))
396        {
397            if let Some(peer_port) = attrs
398                .get(semcov::trace::SERVER_PORT)
399                .or_else(|| attrs.get(semcov::trace::NETWORK_PEER_PORT))
400                .or_else(|| attrs.get(DEPRECATED_SERVER_SOCKET_PORT))
401                .or_else(|| {
402                    attrs.get(
403                        #[allow(deprecated)]
404                        semcov::attribute::NET_SOCK_PEER_PORT,
405                    )
406                })
407                .or_else(|| {
408                    attrs.get(
409                        #[allow(deprecated)]
410                        semcov::attribute::NET_PEER_PORT,
411                    )
412                })
413            {
414                data.target = Some(format!("{}:{}", peer_name.as_str(), peer_port.as_str()).into());
415            } else {
416                data.target = Some(peer_name.into());
417            }
418        } else if let Some(&db_name) = attrs.get(semcov::attribute::DB_NAMESPACE).or_else(|| {
419            attrs.get(
420                #[allow(deprecated)]
421                semcov::attribute::DB_NAME,
422            )
423        }) {
424            data.target = Some(db_name.into());
425        }
426
427        if span.span_kind == SpanKind::Internal {
428            data.type_ = Some("InProc".into());
429        } else if let Some(&db_system) = attrs.get(semcov::trace::DB_SYSTEM_NAME).or_else(|| {
430            attrs.get(
431                #[allow(deprecated)]
432                semcov::attribute::DB_SYSTEM,
433            )
434        }) {
435            data.type_ = Some(db_system.into());
436        } else if let Some(&messaging_system) = attrs.get(semcov::attribute::MESSAGING_SYSTEM) {
437            data.type_ = Some(messaging_system.into());
438        } else if let Some(&rpc_system) = attrs.get(semcov::trace::RPC_SYSTEM) {
439            data.type_ = Some(rpc_system.into());
440        } else if let Some(ref properties) = data.properties {
441            if properties.keys().any(|x| x.as_ref().starts_with("http.")) {
442                data.type_ = Some("HTTP".into());
443            } else if properties.keys().any(|x| x.as_ref().starts_with("db.")) {
444                data.type_ = Some("DB".into());
445            }
446        }
447
448        data
449    }
450}
451
452struct EventAndResource<'a>(&'a Event, Option<&'a Resource>);
453
454impl From<EventAndResource<'_>> for ExceptionData {
455    fn from(EventAndResource(event, resource): EventAndResource<'_>) -> Self {
456        let mut attrs = attrs_to_map(event.attributes.iter());
457        let exception = ExceptionDetails {
458            type_name: attrs
459                .remove(semcov::trace::EXCEPTION_TYPE)
460                .map(Into::into)
461                .unwrap_or_else(|| "<no type>".into()),
462            message: attrs
463                .remove(semcov::trace::EXCEPTION_MESSAGE)
464                .map(Into::into)
465                .unwrap_or_else(|| "<no message>".into()),
466            stack: attrs
467                .remove(semcov::trace::EXCEPTION_STACKTRACE)
468                .map(Into::into),
469        };
470        ExceptionData {
471            ver: 2,
472            exceptions: vec![exception],
473            severity_level: None,
474            properties: attrs_map_to_properties(attrs, resource),
475        }
476    }
477}
478
479impl From<EventAndResource<'_>> for EventData {
480    fn from(EventAndResource(event, resource): EventAndResource<'_>) -> Self {
481        let mut attrs = attrs_to_map(event.attributes.iter());
482        EventData {
483            ver: 2,
484            name: attrs
485                .remove(CUSTOM_EVENT_NAME)
486                .map(Into::into)
487                .unwrap_or_else(|| "<no name>".into()),
488            properties: attrs_map_to_properties(attrs, resource),
489        }
490    }
491}
492
/// The `tracing` crate includes the severity level in an attribute called "level".
///
/// <https://github.com/tokio-rs/tracing/blob/a0126b2e2d465e8e6d514acdf128fcef5b863d27/tracing-opentelemetry/src/subscriber.rs#L839>
const LEVEL: &str = "level";
497
498impl From<EventAndResource<'_>> for MessageData {
499    fn from(EventAndResource(event, resource): EventAndResource<'_>) -> Self {
500        let mut attrs = attrs_to_map(event.attributes.iter());
501        let severity_level = attrs.get(LEVEL).and_then(|&x| value_to_severity_level(x));
502        if severity_level.is_some() {
503            attrs.remove(LEVEL);
504        }
505        MessageData {
506            ver: 2,
507            severity_level,
508            message: if event.name.is_empty() {
509                "<no message>".into()
510            } else {
511                event.name.clone().into_owned().into()
512            },
513            properties: attrs_map_to_properties(attrs, resource),
514        }
515    }
516}