libdd_telemetry/worker/
http_client.rs

1// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/
2// SPDX-License-Identifier: Apache-2.0
3
4use http_body_util::BodyExt;
5use libdd_common::{hyper_migration, HttpRequestBuilder};
6use std::{
7    fs::OpenOptions,
8    future::Future,
9    io::Write,
10    pin::Pin,
11    sync::{Arc, Mutex},
12};
13
14use crate::config::Config;
15use tracing::{debug, error};
16
/// Names of the HTTP headers attached to telemetry requests.
pub mod header {
    // Declaring `HeaderName` values as `const` trips clippy's
    // `declare_interior_mutable_const` lint, so it is silenced for this module.
    #![allow(clippy::declare_interior_mutable_const)]
    use http::header::HeaderName;
    /// Header name `dd-telemetry-request-type`.
    pub const REQUEST_TYPE: HeaderName = HeaderName::from_static("dd-telemetry-request-type");
    /// Header name `dd-telemetry-api-version`.
    pub const API_VERSION: HeaderName = HeaderName::from_static("dd-telemetry-api-version");
    /// Header name `dd-client-library-language`.
    pub const LIBRARY_LANGUAGE: HeaderName = HeaderName::from_static("dd-client-library-language");
    /// Header name `dd-client-library-version`.
    pub const LIBRARY_VERSION: HeaderName = HeaderName::from_static("dd-client-library-version");

    /// Header key for whether to enable debug mode of telemetry.
    pub const DEBUG_ENABLED: HeaderName = HeaderName::from_static("dd-telemetry-debug-enabled");
}
28
/// Boxed, pinned, `Send` future returned by [`HttpClient::request`].
///
/// It resolves to either a `hyper_migration::HttpResponse` or a
/// `hyper_migration::Error`.
pub type ResponseFuture = Pin<
    Box<dyn Future<Output = Result<hyper_migration::HttpResponse, hyper_migration::Error>> + Send>,
>;
32
/// Abstraction over the transport used to deliver a telemetry request.
///
/// Implemented in this file by [`HyperClient`] and [`MockClient`].
pub trait HttpClient {
    /// Consumes `req` and returns a future that yields the response or an error.
    fn request(&self, req: hyper_migration::HttpRequest) -> ResponseFuture;
}
36
37pub fn request_builder(c: &Config) -> anyhow::Result<HttpRequestBuilder> {
38    match &c.endpoint {
39        Some(e) => {
40            debug!(
41                endpoint.url = %e.url,
42                endpoint.timeout_ms = e.timeout_ms,
43                telemetry.version = env!("CARGO_PKG_VERSION"),
44                "Building telemetry request"
45            );
46            let mut builder =
47                e.to_request_builder(concat!("telemetry/", env!("CARGO_PKG_VERSION")));
48            if c.debug_enabled {
49                debug!(
50                    telemetry.debug_enabled = true,
51                    "Telemetry debug mode enabled"
52                );
53                builder = Ok(builder?.header(header::DEBUG_ENABLED, "true"))
54            }
55            builder
56        }
57        None => {
58            error!("No valid telemetry endpoint found, cannot build request");
59            Err(anyhow::Error::msg(
60                "no valid endpoint found, can't build the request".to_string(),
61            ))
62        }
63    }
64}
65
66pub fn from_config(c: &Config) -> Box<dyn HttpClient + Sync + Send> {
67    match &c.endpoint {
68        Some(e) if e.url.scheme_str() == Some("file") => {
69            #[allow(clippy::expect_used)]
70            let file_path = libdd_common::decode_uri_path_in_authority(&e.url)
71                .expect("file urls should always have been encoded in authority");
72            debug!(
73                file.path = ?file_path,
74                "Using file-based mock telemetry client"
75            );
76            return Box::new(MockClient {
77                #[allow(clippy::expect_used)]
78                file: Arc::new(Mutex::new(Box::new(
79                    OpenOptions::new()
80                        .create(true)
81                        .append(true)
82                        .open(file_path.as_path())
83                        .expect("Couldn't open mock client file"),
84                ))),
85            });
86        }
87        Some(e) => {
88            debug!(
89                endpoint.url = %e.url,
90                endpoint.timeout_ms = e.timeout_ms,
91                "Using HTTP telemetry client"
92            );
93        }
94        None => {
95            debug!(
96                endpoint = "default",
97                "No telemetry endpoint configured, using default HTTP client"
98            );
99        }
100    };
101    Box::new(HyperClient {
102        inner: hyper_migration::new_client_periodic(),
103    })
104}
105
/// [`HttpClient`] implementation that forwards requests to a
/// `libdd_common::HttpClient`.
pub struct HyperClient {
    /// Underlying client that every request is delegated to.
    inner: libdd_common::HttpClient,
}
109
110impl HttpClient for HyperClient {
111    fn request(&self, req: hyper_migration::HttpRequest) -> ResponseFuture {
112        let resp = self.inner.request(req);
113        Box::pin(async move {
114            match resp.await {
115                Ok(response) => Ok(hyper_migration::into_response(response)),
116                Err(e) => Err(e.into()),
117            }
118        })
119    }
120}
121
/// [`HttpClient`] implementation that writes request bodies to a shared writer
/// instead of sending them anywhere.
///
/// Clones share the same writer through the `Arc`.
#[derive(Clone)]
pub struct MockClient {
    /// Destination for request bodies; the mutex serialises concurrent writes.
    file: Arc<Mutex<Box<dyn Write + Sync + Send>>>,
}
126
127impl HttpClient for MockClient {
128    fn request(&self, req: hyper_migration::HttpRequest) -> ResponseFuture {
129        let s = self.clone();
130        Box::pin(async move {
131            debug!("MockClient writing request to file");
132            let mut body = req.collect().await?.to_bytes().to_vec();
133            body.push(b'\n');
134
135            {
136                #[allow(clippy::expect_used)]
137                let mut writer = s.file.lock().expect("mutex poisoned");
138
139                match writer.write_all(body.as_ref()) {
140                    Ok(()) => debug!(
141                        file.bytes_written = body.len(),
142                        "Successfully wrote payload to mock file"
143                    ),
144                    Err(e) => {
145                        error!(
146                            error = %e,
147                            "Failed to write to mock file"
148                        );
149                        return Err(hyper_migration::Error::from(e));
150                    }
151                }
152            }
153
154            debug!(http.status = 202, "MockClient returning success response");
155            hyper_migration::empty_response(hyper::Response::builder().status(202))
156        })
157    }
158}
159
#[cfg(test)]
mod tests {
    use libdd_common::HttpRequestBuilder;

    use super::*;

    /// In-memory writer whose contents stay readable after a clone of it has
    /// been moved into a `MockClient`.
    #[derive(Clone, Default)]
    struct SharedBuf(Arc<Mutex<Vec<u8>>>);

    impl std::io::Write for SharedBuf {
        fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
            self.0.lock().unwrap().extend_from_slice(buf);
            Ok(buf.len())
        }

        fn flush(&mut self) -> std::io::Result<()> {
            Ok(())
        }
    }

    /// The mock client must write the request body plus a trailing newline and
    /// answer with status 202.
    #[tokio::test]
    #[cfg_attr(miri, ignore)]
    async fn test_mock_client() {
        let captured = SharedBuf::default();
        let c = MockClient {
            file: Arc::new(Mutex::new(Box::new(captured.clone()))),
        };
        let response = c
            .request(
                HttpRequestBuilder::new()
                    .body(hyper_migration::Body::from("hello world\n"))
                    .unwrap(),
            )
            .await
            .unwrap();

        assert_eq!(response.status().as_u16(), 202);
        // Body bytes followed by the newline the client appends.
        assert_eq!(
            captured.0.lock().unwrap().as_slice(),
            &b"hello world\n\n"[..]
        );
    }
}
181}