libdd-trace-utils 4.0.0

Trace utilities including span processing, MessagePack encoding/decoding, payload handling, and HTTP transport with retry logic for Datadog APM
Documentation
// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

#[cfg(feature = "mini_agent")]
pub use mini_agent::*;

#[cfg(feature = "mini_agent")]
mod mini_agent {
    use bytes::{Buf, Bytes};
    use http_body_util::BodyExt;
    use libdd_capabilities::HttpClientCapability;
    use libdd_common::http_common;
    use libdd_common::Endpoint;
    use libdd_trace_protobuf::pb;
    use std::io::Write;
    use tracing::debug;

    pub async fn get_stats_from_request_body(
        body: http_common::Body,
    ) -> anyhow::Result<pb::ClientStatsPayload> {
        let buffer = BodyExt::collect(body).await?.aggregate();

        let client_stats_payload: pb::ClientStatsPayload =
            match rmp_serde::from_read(buffer.reader()) {
                Ok(res) => res,
                Err(err) => {
                    anyhow::bail!("Error deserializing stats from request body: {err}")
                }
            };

        if client_stats_payload.stats.is_empty() {
            debug!("Empty trace stats payload received, but this is okay");
        }
        Ok(client_stats_payload)
    }

    pub fn construct_stats_payload(stats: Vec<pb::ClientStatsPayload>) -> pb::StatsPayload {
        // set hostname on stats from tracer to empty string for serverless
        let stats = stats
            .into_iter()
            .map(|mut stat| {
                stat.hostname = "".to_string();
                stat
            })
            .collect();
        pb::StatsPayload {
            agent_hostname: "".to_string(),
            agent_env: "".to_string(),
            stats,
            agent_version: "".to_string(),
            client_computed: true,
            split_payload: false,
        }
    }

    pub fn serialize_stats_payload(payload: pb::StatsPayload) -> anyhow::Result<Vec<u8>> {
        let msgpack = rmp_serde::to_vec_named(&payload)?;
        let mut encoder = flate2::write::GzEncoder::new(Vec::new(), flate2::Compression::fast());
        encoder.write_all(&msgpack)?;
        match encoder.finish() {
            Ok(res) => Ok(res),
            Err(e) => anyhow::bail!("Error serializing stats payload: {e}"),
        }
    }

    pub async fn send_stats_payload<H: HttpClientCapability>(
        data: Vec<u8>,
        target: &Endpoint,
        api_key: &str,
    ) -> anyhow::Result<()> {
        let client = H::new_client();
        let req = http::Request::builder()
            .method(http::Method::POST)
            .uri(target.url.clone())
            .header("Content-Type", "application/msgpack")
            .header("Content-Encoding", "gzip")
            .header("DD-API-KEY", api_key)
            .body(Bytes::from(data))?;

        let response = client
            .request(req)
            .await
            .map_err(|e| anyhow::anyhow!("Failed to send trace stats: {e}"))?;

        if response.status() != http::StatusCode::ACCEPTED {
            let response_body =
                String::from_utf8(response.into_body().to_vec()).unwrap_or_default();
            anyhow::bail!("Server did not accept trace stats: {response_body}");
        }
        Ok(())
    }
}

#[cfg(test)]
#[cfg(feature = "mini_agent")]
mod mini_agent_tests {
    use crate::stats_utils;
    use http::Request;
    use libdd_common::http_common;
    use libdd_trace_protobuf::pb::{
        ClientGroupedStats, ClientStatsBucket, ClientStatsPayload, Trilean::NotSet,
    };
    use serde_json::Value;

    #[tokio::test]
    #[cfg_attr(all(miri, target_os = "macos"), ignore)]
    async fn test_get_stats_from_request_body() {
        let stats_json = r#"{
            "Hostname": "TestHost",
            "Env": "test",
            "Version": "1.0.0",
            "Stats": [
                {
                    "Start": 0,
                    "Duration": 10000000000,
                    "Stats": [
                        {
                            "Name": "test-span",
                            "Service": "test-service",
                            "Resource": "test-span",
                            "Type": "",
                            "HTTPStatusCode": 0,
                            "Synthetics": false,
                            "Hits": 1,
                            "TopLevelHits": 1,
                            "Errors": 0,
                            "Duration": 10000000,
                            "OkSummary": [
                                0,
                                0,
                                0
                            ],
                            "ErrorSummary": [
                                0,
                                0,
                                0
                            ],
                            "GRPCStatusCode": "0",
                            "HTTPMethod": "GET",
                            "HTTPEndpoint": "/test"
                        }
                    ]
                }
            ],
            "Lang": "javascript",
            "TracerVersion": "1.0.0",
            "RuntimeID": "00000000-0000-0000-0000-000000000000",
            "Sequence": 1
        }"#;

        let v: Value = match serde_json::from_str(stats_json) {
            Ok(value) => value,
            Err(err) => {
                panic!("Failed to parse stats JSON: {err}");
            }
        };

        let bytes = rmp_serde::to_vec(&v).unwrap();
        let request = Request::builder()
            .body(http_common::Body::from(bytes))
            .unwrap();

        let res = stats_utils::get_stats_from_request_body(request.into_body()).await;

        let client_stats_payload = ClientStatsPayload {
            hostname: "TestHost".to_string(),
            env: "test".to_string(),
            version: "1.0.0".to_string(),
            stats: vec![ClientStatsBucket {
                start: 0,
                duration: 10000000000,
                stats: vec![ClientGroupedStats {
                    service: "test-service".to_string(),
                    name: "test-span".to_string(),
                    resource: "test-span".to_string(),
                    http_status_code: 0,
                    r#type: "".to_string(),
                    db_type: "".to_string(),
                    hits: 1,
                    errors: 0,
                    duration: 10000000,
                    ok_summary: vec![0, 0, 0],
                    error_summary: vec![0, 0, 0],
                    synthetics: false,
                    top_level_hits: 1,
                    span_kind: "".to_string(),
                    peer_tags: vec![],
                    is_trace_root: NotSet.into(),
                    grpc_status_code: "0".to_string(),
                    http_endpoint: "/test".to_string(),
                    http_method: "GET".to_string(),
                    service_source: "".to_string(),
                    span_derived_primary_tags: vec![],
                }],
                agent_time_shift: 0,
            }],
            lang: "javascript".to_string(),
            tracer_version: "1.0.0".to_string(),
            runtime_id: "00000000-0000-0000-0000-000000000000".to_string(),
            sequence: 1,
            agent_aggregation: "".to_string(),
            service: "".to_string(),
            container_id: "".to_string(),
            tags: vec![],
            git_commit_sha: "".to_string(),
            image_tag: "".to_string(),
            process_tags_hash: 0,
            process_tags: "".to_string(),
        };

        assert!(
            res.is_ok(),
            "Expected Ok result, but got Err: {}",
            res.unwrap_err()
        );
        assert_eq!(res.unwrap(), client_stats_payload)
    }

    #[tokio::test]
    #[cfg_attr(all(miri, target_os = "macos"), ignore)]
    async fn test_get_stats_from_request_body_without_stats() {
        let stats_json = r#"{
            "Hostname": "TestHost",
            "Env": "test",
            "Version": "1.0.0",
            "Lang": "javascript",
            "TracerVersion": "1.0.0",
            "RuntimeID": "00000000-0000-0000-0000-000000000000",
            "Sequence": 1
        }"#;

        let v: Value = match serde_json::from_str(stats_json) {
            Ok(value) => value,
            Err(err) => {
                panic!("Failed to parse stats JSON: {err}");
            }
        };

        let bytes = rmp_serde::to_vec(&v).unwrap();
        let request = Request::builder()
            .body(http_common::Body::from(bytes))
            .unwrap();

        let res = stats_utils::get_stats_from_request_body(request.into_body()).await;

        let client_stats_payload = ClientStatsPayload {
            hostname: "TestHost".to_string(),
            env: "test".to_string(),
            version: "1.0.0".to_string(),
            stats: vec![],
            lang: "javascript".to_string(),
            tracer_version: "1.0.0".to_string(),
            runtime_id: "00000000-0000-0000-0000-000000000000".to_string(),
            sequence: 1,
            agent_aggregation: "".to_string(),
            service: "".to_string(),
            container_id: "".to_string(),
            tags: vec![],
            git_commit_sha: "".to_string(),
            image_tag: "".to_string(),
            process_tags_hash: 0,
            process_tags: "".to_string(),
        };

        assert!(
            res.is_ok(),
            "Expected Ok result, but got Err: {}",
            res.unwrap_err()
        );
        assert_eq!(res.unwrap(), client_stats_payload)
    }

    #[tokio::test]
    #[cfg_attr(all(miri, target_os = "macos"), ignore)]
    async fn test_serialize_client_stats_payload_without_stats() {
        let client_stats_payload_without_stats = ClientStatsPayload {
            hostname: "TestHost".to_string(),
            env: "test".to_string(),
            version: "1.0.0".to_string(),
            stats: vec![],
            lang: "javascript".to_string(),
            tracer_version: "1.0.0".to_string(),
            runtime_id: "00000000-0000-0000-0000-000000000000".to_string(),
            sequence: 1,
            agent_aggregation: "".to_string(),
            service: "".to_string(),
            container_id: "".to_string(),
            tags: vec![],
            git_commit_sha: "".to_string(),
            image_tag: "".to_string(),
            process_tags_hash: 0,
            process_tags: "".to_string(),
        };

        let client_stats_payload_without_inner_stats = ClientStatsPayload {
            hostname: "TestHost".to_string(),
            env: "test".to_string(),
            version: "1.0.0".to_string(),
            stats: vec![ClientStatsBucket {
                start: 0,
                duration: 10000000000,
                stats: vec![],
                agent_time_shift: 0,
            }],
            lang: "javascript".to_string(),
            tracer_version: "1.0.0".to_string(),
            runtime_id: "00000000-0000-0000-0000-000000000000".to_string(),
            sequence: 1,
            agent_aggregation: "".to_string(),
            service: "".to_string(),
            container_id: "".to_string(),
            tags: vec![],
            git_commit_sha: "".to_string(),
            image_tag: "".to_string(),
            process_tags_hash: 0,
            process_tags: "".to_string(),
        };

        let res = stats_utils::serialize_stats_payload(stats_utils::construct_stats_payload(vec![
            client_stats_payload_without_stats,
            client_stats_payload_without_inner_stats,
        ]));

        assert!(
            res.is_ok(),
            "Expected Ok result, but got Err: {}",
            res.unwrap_err()
        );
    }
}