1use crate::{
2 convert::{
3 attrs_map_to_properties, attrs_to_map, attrs_to_properties, duration_to_string,
4 status_to_result_code, time_to_string, value_to_severity_level,
5 },
6 models::{
7 context_tag_keys::attrs::CUSTOM_EVENT_NAME, Data, Envelope, EventData, ExceptionData,
8 ExceptionDetails, LimitedLenString, MessageData, RemoteDependencyData, RequestData,
9 },
10 tags::{get_tags_for_event, get_tags_for_span},
11 Exporter,
12};
13use opentelemetry::{
14 trace::{Event, SpanKind, Status},
15 Value,
16};
17use opentelemetry_http::HttpClient;
18use opentelemetry_sdk::{
19 error::OTelSdkResult,
20 trace::{SpanData, SpanExporter},
21 Resource,
22};
23use opentelemetry_semantic_conventions as semcov;
24use std::{borrow::Cow, collections::HashMap, sync::Arc, time::Duration};
25
/// Attribute key `http.host`. Consulted in this module as a fallback after
/// [`HTTP_REQUEST_HEADER_HOST`] when resolving the host part of a request URL or the target of a
/// dependency.
const DEPRECATED_HTTP_HOST: &str = "http.host";

/// Attribute key `http.request.header.host`. The first key this module consults when it needs a
/// host for a request URL or a dependency target.
const HTTP_REQUEST_HEADER_HOST: &str = "http.request.header.host";

/// Attribute key `net.peer.ip`. The last fallback this module tries for a request's source and
/// for the host part of a dependency's target.
const DEPRECATED_NET_PEER_IP: &str = "net.peer.ip";

/// Attribute key `http.client_ip`. Used for a request's source when
/// `semcov::trace::CLIENT_ADDRESS` is not set on the span.
const DEPRECATED_HTTP_CLIENT_IP: &str = "http.client_ip";

/// Attribute key `client.socket.address`. Used for a request's source when
/// `semcov::trace::NETWORK_PEER_ADDRESS` is not set on the span.
const DEPRECATED_CLIENT_SOCKET_ADDRESS: &str = "client.socket.address";

/// Attribute key `server.socket.address`. One of the fallbacks for the host part of a
/// dependency's target.
const DEPRECATED_SERVER_SOCKET_ADDRESS: &str = "server.socket.address";

/// Attribute key `server.socket.port`. One of the fallbacks for the port part of a dependency's
/// target.
const DEPRECATED_SERVER_SOCKET_PORT: &str = "server.socket.port";

/// Span events with this name are exported as `Microsoft.ApplicationInsights.Event` envelopes.
pub(crate) const EVENT_NAME_CUSTOM: &str = "ai.custom";
/// Span events with this name are exported as `Microsoft.ApplicationInsights.Exception`
/// envelopes.
pub(crate) const EVENT_NAME_EXCEPTION: &str = "exception";
66
67impl<C> Exporter<C> {
68 fn create_envelopes_for_span(&self, span: SpanData, resource: &Resource) -> Vec<Envelope> {
69 let mut result = Vec::with_capacity(1 + span.events.len());
70
71 let (data, tags, name) = match span.span_kind {
72 SpanKind::Server | SpanKind::Consumer => {
73 let data: RequestData = SpanAndResource(&span, resource).into();
74 let tags = get_tags_for_span(&span, resource);
75 (
76 Data::Request(data),
77 tags,
78 "Microsoft.ApplicationInsights.Request",
79 )
80 }
81 SpanKind::Client | SpanKind::Producer | SpanKind::Internal => {
82 let data: RemoteDependencyData = SpanAndResource(&span, resource).into();
83 let tags = get_tags_for_span(&span, resource);
84 (
85 Data::RemoteDependency(data),
86 tags,
87 "Microsoft.ApplicationInsights.RemoteDependency",
88 )
89 }
90 };
91 result.push(Envelope {
92 name,
93 time: time_to_string(span.start_time).into(),
94 sample_rate: Some(self.sample_rate),
95 i_key: Some(self.instrumentation_key.clone().into()),
96 tags: Some(tags),
97 data: Some(data),
98 });
99
100 let event_resource = if self.resource_attributes_in_events_and_logs {
101 Some(resource)
102 } else {
103 None
104 };
105 for event in span.events.iter() {
106 let (data, name) = match event.name.as_ref() {
107 x if x == EVENT_NAME_CUSTOM => (
108 Data::Event(EventAndResource(event, event_resource).into()),
109 "Microsoft.ApplicationInsights.Event",
110 ),
111 x if x == EVENT_NAME_EXCEPTION => (
112 Data::Exception(EventAndResource(event, event_resource).into()),
113 "Microsoft.ApplicationInsights.Exception",
114 ),
115 _ => (
116 Data::Message(EventAndResource(event, event_resource).into()),
117 "Microsoft.ApplicationInsights.Message",
118 ),
119 };
120 result.push(Envelope {
121 name,
122 time: time_to_string(event.timestamp).into(),
123 sample_rate: Some(self.sample_rate),
124 i_key: Some(self.instrumentation_key.clone().into()),
125 tags: Some(get_tags_for_event(&span, resource)),
126 data: Some(data),
127 });
128 }
129
130 result
131 }
132}
133
134#[cfg_attr(docsrs, doc(cfg(feature = "trace")))]
135impl<C> SpanExporter for Exporter<C>
136where
137 C: HttpClient + 'static,
138{
139 async fn export(&self, batch: Vec<SpanData>) -> OTelSdkResult {
141 let client = Arc::clone(&self.client);
142 let endpoint = Arc::clone(&self.track_endpoint);
143 let envelopes: Vec<_> = batch
144 .into_iter()
145 .flat_map(|span| self.create_envelopes_for_span(span, &self.resource))
146 .collect();
147
148 crate::uploader::send(
149 client.as_ref(),
150 endpoint.as_ref(),
151 envelopes,
152 self.retry_notify.clone(),
153 )
154 .await
155 .map_err(Into::into)
156 }
157
158 fn set_resource(&mut self, resource: &Resource) {
159 self.resource = resource.clone();
160 }
161}
162
163fn get_url_path_and_query<'v>(attrs: &HashMap<&str, &'v Value>) -> Option<Cow<'v, str>> {
164 if let Some(path) = attrs.get(semcov::trace::URL_PATH) {
165 if let Some(query) = attrs.get(semcov::trace::URL_QUERY) {
166 Some(format!("{}?{}", path, query).into())
167 } else {
168 Some(path.as_str())
169 }
170 } else {
171 attrs
172 .get(
173 #[allow(deprecated)]
174 semcov::attribute::HTTP_TARGET,
175 )
176 .map(|target| target.as_str())
177 }
178}
179
180fn get_server_host<'v>(attrs: &HashMap<&str, &'v Value>) -> Option<Cow<'v, str>> {
181 if let Some(host) = attrs.get(HTTP_REQUEST_HEADER_HOST) {
182 Some(host.as_str())
183 } else if let Some(host) = attrs.get(DEPRECATED_HTTP_HOST) {
184 Some(host.as_str())
185 } else if let (Some(host_name), Some(host_port)) = (
186 attrs.get(semcov::trace::SERVER_ADDRESS).or_else(|| {
187 attrs.get(
188 #[allow(deprecated)]
189 semcov::attribute::NET_HOST_NAME,
190 )
191 }),
192 attrs.get(semcov::trace::SERVER_PORT).or_else(|| {
193 attrs.get(
194 #[allow(deprecated)]
195 semcov::attribute::NET_HOST_PORT,
196 )
197 }),
198 ) {
199 Some(format!("{}:{}", host_name.as_str(), host_port.as_str()).into())
200 } else {
201 None
202 }
203}
204
205pub(crate) fn get_duration(span: &SpanData) -> Duration {
206 span.end_time
207 .duration_since(span.start_time)
208 .unwrap_or_default()
209}
210
211pub(crate) fn is_request_success(span: &SpanData) -> bool {
212 !matches!(span.status, Status::Error { .. })
213}
214
215pub(crate) fn is_remote_dependency_success(span: &SpanData) -> Option<bool> {
216 match span.status {
217 Status::Unset => None,
218 Status::Ok => Some(true),
219 Status::Error { .. } => Some(false),
220 }
221}
222
223struct SpanAndResource<'a>(&'a SpanData, &'a Resource);
224
225impl<'a> From<SpanAndResource<'a>> for RequestData {
226 fn from(SpanAndResource(span, resource): SpanAndResource<'a>) -> RequestData {
227 let mut data = RequestData {
228 ver: 2,
229 id: span.span_context.span_id().to_string().into(),
230 name: Some(LimitedLenString::<1024>::from(span.name.clone()))
231 .filter(|x| !x.as_ref().is_empty()),
232 duration: duration_to_string(get_duration(span)),
233 response_code: status_to_result_code(&span.status).to_string().into(),
234 success: is_request_success(span),
235 source: None,
236 url: None,
237 properties: attrs_to_properties(
238 span.attributes.iter(),
239 Some(resource),
240 &span.links.links,
241 ),
242 };
243
244 let attrs: HashMap<&str, &Value> = span
245 .attributes
246 .iter()
247 .map(|kv| (kv.key.as_str(), &kv.value))
248 .collect();
249
250 if let Some(&method) = attrs.get(semcov::trace::HTTP_REQUEST_METHOD).or_else(|| {
251 #[allow(deprecated)]
252 attrs.get(semcov::attribute::HTTP_METHOD)
253 }) {
254 data.name = Some(if let Some(route) = attrs.get(semcov::trace::HTTP_ROUTE) {
255 format!("{} {}", method.as_str(), route.as_str()).into()
256 } else {
257 method.into()
258 });
259 }
260
261 if let Some(&status_code) = attrs.get(semcov::trace::HTTP_RESPONSE_STATUS_CODE) {
262 data.response_code = status_code.into();
263 } else if let Some(&status_code) = attrs.get(
264 #[allow(deprecated)]
265 semcov::attribute::HTTP_STATUS_CODE,
266 ) {
267 data.response_code = status_code.into();
268 }
269
270 if let Some(&url) = attrs.get(semcov::trace::URL_FULL) {
271 data.url = Some(url.into());
272 } else if let Some(&url) = attrs.get(
273 #[allow(deprecated)]
274 semcov::attribute::HTTP_URL,
275 ) {
276 data.url = Some(url.into());
277 } else if let Some(target) = get_url_path_and_query(&attrs) {
278 let mut target = target.into_owned();
279 if !target.starts_with('/') {
280 target.insert(0, '/');
281 }
282
283 if let (Some(scheme), Some(host)) = (
284 attrs.get(semcov::trace::URL_SCHEME).or_else(|| {
285 attrs.get(
286 #[allow(deprecated)]
287 semcov::attribute::HTTP_SCHEME,
288 )
289 }),
290 get_server_host(&attrs),
291 ) {
292 data.url = Some(format!("{}://{}{}", scheme.as_str(), host, target).into());
293 } else {
294 data.url = Some(target.into());
295 }
296 }
297
298 if let Some(&client_address) = attrs.get(semcov::trace::CLIENT_ADDRESS) {
299 data.source = Some(client_address.into());
300 } else if let Some(&client_ip) = attrs.get(DEPRECATED_HTTP_CLIENT_IP) {
301 data.source = Some(client_ip.into());
302 } else if let Some(&peer_addr) = attrs.get(semcov::trace::NETWORK_PEER_ADDRESS) {
303 data.source = Some(peer_addr.into());
304 } else if let Some(&peer_addr) = attrs.get(DEPRECATED_CLIENT_SOCKET_ADDRESS) {
305 data.source = Some(peer_addr.into());
306 } else if let Some(&peer_addr) = attrs.get(
307 #[allow(deprecated)]
308 semcov::attribute::NET_SOCK_PEER_ADDR,
309 ) {
310 data.source = Some(peer_addr.into());
311 } else if let Some(&peer_ip) = attrs.get(DEPRECATED_NET_PEER_IP) {
312 data.source = Some(peer_ip.into());
313 }
314
315 data
316 }
317}
318
319impl<'a> From<SpanAndResource<'a>> for RemoteDependencyData {
320 fn from(SpanAndResource(span, resource): SpanAndResource<'a>) -> RemoteDependencyData {
321 let mut data = RemoteDependencyData {
322 ver: 2,
323 id: Some(span.span_context.span_id().to_string().into()),
324 name: span.name.clone().into(),
325 duration: duration_to_string(get_duration(span)),
326 result_code: Some(status_to_result_code(&span.status).to_string().into()),
327 success: is_remote_dependency_success(span),
328 data: None,
329 target: None,
330 type_: None,
331 properties: attrs_to_properties(
332 span.attributes.iter(),
333 Some(resource),
334 &span.links.links,
335 ),
336 };
337
338 let attrs: HashMap<&str, &Value> = span
339 .attributes
340 .iter()
341 .map(|kv| (kv.key.as_str(), &kv.value))
342 .collect();
343
344 if let Some(&status_code) = attrs.get(semcov::trace::HTTP_RESPONSE_STATUS_CODE) {
345 data.result_code = Some(status_code.into());
346 } else if let Some(&status_code) = attrs.get(
347 #[allow(deprecated)]
348 semcov::attribute::HTTP_STATUS_CODE,
349 ) {
350 data.result_code = Some(status_code.into());
351 }
352
353 if let Some(&url) = attrs.get(semcov::trace::URL_FULL) {
354 data.data = Some(url.into());
355 } else if let Some(&url) = attrs.get(
356 #[allow(deprecated)]
357 semcov::attribute::HTTP_URL,
358 ) {
359 data.data = Some(url.into());
360 } else if let Some(&statement) = attrs.get(semcov::attribute::DB_QUERY_TEXT).or_else(|| {
361 attrs.get(
362 #[allow(deprecated)]
363 semcov::attribute::DB_STATEMENT,
364 )
365 }) {
366 data.data = Some(statement.into());
367 }
368
369 if let Some(&host) = attrs.get(HTTP_REQUEST_HEADER_HOST) {
370 data.target = Some(host.into());
371 } else if let Some(&host) = attrs.get(DEPRECATED_HTTP_HOST) {
372 data.target = Some(host.into());
373 } else if let Some(&peer_name) = attrs
374 .get(semcov::trace::SERVER_ADDRESS)
375 .or_else(|| attrs.get(semcov::trace::NETWORK_PEER_ADDRESS))
376 .or_else(|| attrs.get(DEPRECATED_SERVER_SOCKET_ADDRESS))
377 .or_else(|| {
378 attrs.get(
379 #[allow(deprecated)]
380 semcov::attribute::NET_SOCK_PEER_NAME,
381 )
382 })
383 .or_else(|| {
384 attrs.get(
385 #[allow(deprecated)]
386 semcov::attribute::NET_PEER_NAME,
387 )
388 })
389 .or_else(|| {
390 attrs.get(
391 #[allow(deprecated)]
392 semcov::attribute::NET_SOCK_PEER_ADDR,
393 )
394 })
395 .or_else(|| attrs.get(DEPRECATED_NET_PEER_IP))
396 {
397 if let Some(peer_port) = attrs
398 .get(semcov::trace::SERVER_PORT)
399 .or_else(|| attrs.get(semcov::trace::NETWORK_PEER_PORT))
400 .or_else(|| attrs.get(DEPRECATED_SERVER_SOCKET_PORT))
401 .or_else(|| {
402 attrs.get(
403 #[allow(deprecated)]
404 semcov::attribute::NET_SOCK_PEER_PORT,
405 )
406 })
407 .or_else(|| {
408 attrs.get(
409 #[allow(deprecated)]
410 semcov::attribute::NET_PEER_PORT,
411 )
412 })
413 {
414 data.target = Some(format!("{}:{}", peer_name.as_str(), peer_port.as_str()).into());
415 } else {
416 data.target = Some(peer_name.into());
417 }
418 } else if let Some(&db_name) = attrs.get(semcov::attribute::DB_NAMESPACE).or_else(|| {
419 attrs.get(
420 #[allow(deprecated)]
421 semcov::attribute::DB_NAME,
422 )
423 }) {
424 data.target = Some(db_name.into());
425 }
426
427 if span.span_kind == SpanKind::Internal {
428 data.type_ = Some("InProc".into());
429 } else if let Some(&db_system) = attrs.get(semcov::trace::DB_SYSTEM_NAME).or_else(|| {
430 attrs.get(
431 #[allow(deprecated)]
432 semcov::attribute::DB_SYSTEM,
433 )
434 }) {
435 data.type_ = Some(db_system.into());
436 } else if let Some(&messaging_system) = attrs.get(semcov::attribute::MESSAGING_SYSTEM) {
437 data.type_ = Some(messaging_system.into());
438 } else if let Some(&rpc_system) = attrs.get(semcov::trace::RPC_SYSTEM) {
439 data.type_ = Some(rpc_system.into());
440 } else if let Some(ref properties) = data.properties {
441 if properties.keys().any(|x| x.as_ref().starts_with("http.")) {
442 data.type_ = Some("HTTP".into());
443 } else if properties.keys().any(|x| x.as_ref().starts_with("db.")) {
444 data.type_ = Some("DB".into());
445 }
446 }
447
448 data
449 }
450}
451
452struct EventAndResource<'a>(&'a Event, Option<&'a Resource>);
453
454impl From<EventAndResource<'_>> for ExceptionData {
455 fn from(EventAndResource(event, resource): EventAndResource<'_>) -> Self {
456 let mut attrs = attrs_to_map(event.attributes.iter());
457 let exception = ExceptionDetails {
458 type_name: attrs
459 .remove(semcov::trace::EXCEPTION_TYPE)
460 .map(Into::into)
461 .unwrap_or_else(|| "<no type>".into()),
462 message: attrs
463 .remove(semcov::trace::EXCEPTION_MESSAGE)
464 .map(Into::into)
465 .unwrap_or_else(|| "<no message>".into()),
466 stack: attrs
467 .remove(semcov::trace::EXCEPTION_STACKTRACE)
468 .map(Into::into),
469 };
470 ExceptionData {
471 ver: 2,
472 exceptions: vec![exception],
473 severity_level: None,
474 properties: attrs_map_to_properties(attrs, resource),
475 }
476 }
477}
478
479impl From<EventAndResource<'_>> for EventData {
480 fn from(EventAndResource(event, resource): EventAndResource<'_>) -> Self {
481 let mut attrs = attrs_to_map(event.attributes.iter());
482 EventData {
483 ver: 2,
484 name: attrs
485 .remove(CUSTOM_EVENT_NAME)
486 .map(Into::into)
487 .unwrap_or_else(|| "<no name>".into()),
488 properties: attrs_map_to_properties(attrs, resource),
489 }
490 }
491}
492
493const LEVEL: &str = "level";
497
498impl From<EventAndResource<'_>> for MessageData {
499 fn from(EventAndResource(event, resource): EventAndResource<'_>) -> Self {
500 let mut attrs = attrs_to_map(event.attributes.iter());
501 let severity_level = attrs.get(LEVEL).and_then(|&x| value_to_severity_level(x));
502 if severity_level.is_some() {
503 attrs.remove(LEVEL);
504 }
505 MessageData {
506 ver: 2,
507 severity_level,
508 message: if event.name.is_empty() {
509 "<no message>".into()
510 } else {
511 event.name.clone().into_owned().into()
512 },
513 properties: attrs_map_to_properties(attrs, resource),
514 }
515 }
516}