1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0.

use minitrace::prelude::*;
use rmp_serde::Serializer;
use serde::Serialize;
use std::collections::HashMap;
use std::error::Error;
use std::net::SocketAddr;

#[allow(clippy::too_many_arguments)]
pub fn encode(
    service_name: &str,
    trace_type: &str,
    resource: &str,
    error_code: i32,
    trace_id: u64,
    root_parent_span_id: u64,
    span_id_prefix: u32,
    spans: &[SpanRecord],
) -> Result<Vec<u8>, Box<dyn Error + Send + Sync + 'static>> {
    let spans = spans.iter().map(|s| MPSpan {
        name: s.event,
        service: service_name,
        trace_type,
        resource,
        start: s.begin_unix_time_ns as i64,
        duration: s.duration_ns as i64,
        meta: if s.properties.is_empty() {
            None
        } else {
            Some(s.properties.iter().map(|(k, v)| (*k, v.as_ref())).collect())
        },
        error_code,
        span_id: (span_id_prefix as u64) << 32 | s.id as u64,
        trace_id,
        parent_id: if s.parent_id == 0 {
            root_parent_span_id
        } else {
            (span_id_prefix as u64) << 32 | s.parent_id as u64
        },
    });

    let mut buf = vec![0b10010001];
    spans
        .collect::<Vec<_>>()
        .serialize(&mut Serializer::new(&mut buf).with_struct_map())?;

    Ok(buf)
}

pub fn report_blocking(
    agent: SocketAddr,
    bytes: Vec<u8>,
) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
    let client = reqwest::blocking::Client::new();
    let rep = client
        .post(&format!("http://{}/v0.4/traces", agent))
        .header("Datadog-Meta-Tracer-Version", "v1.27.0")
        .header("Content-Type", "application/msgpack")
        .body(bytes)
        .send()?;

    if rep.status().as_u16() >= 400 {
        let status = rep.status();
        return Err(format!("{} (Status: {})", rep.text()?, status).into());
    }

    Ok(())
}

pub async fn report(
    agent: SocketAddr,
    bytes: Vec<u8>,
) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
    let client = reqwest::Client::new();
    let rep = client
        .post(&format!("http://{}/v0.4/traces", agent))
        .header("Datadog-Meta-Tracer-Version", "v1.27.0")
        .header("Content-Type", "application/msgpack")
        .body(bytes)
        .send()
        .await?;

    if rep.status().as_u16() >= 400 {
        let status = rep.status();
        return Err(format!("{} (Status: {})", rep.text().await?, status).into());
    }

    Ok(())
}

#[derive(Serialize)]
struct MPSpan<'a> {
    name: &'a str,
    service: &'a str,
    #[serde(rename = "type")]
    trace_type: &'a str,
    resource: &'a str,
    start: i64,
    duration: i64,
    #[serde(skip_serializing_if = "Option::is_none")]
    meta: Option<HashMap<&'a str, &'a str>>,
    error_code: i32,
    span_id: u64,
    trace_id: u64,
    parent_id: u64,
}