Skip to main content

libdd_telemetry/worker/
http_client.rs

1// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/
2// SPDX-License-Identifier: Apache-2.0
3
4use http_body_util::BodyExt;
5use libdd_common::{http_common, HttpRequestBuilder};
6use std::{
7    fs::OpenOptions,
8    future::Future,
9    io::Write,
10    pin::Pin,
11    sync::{Arc, Mutex},
12};
13
14use crate::config::Config;
15use tracing::{debug, error};
16
17pub mod header {
18    #![allow(clippy::declare_interior_mutable_const)]
19    use http::header::HeaderName;
20    pub const REQUEST_TYPE: HeaderName = HeaderName::from_static("dd-telemetry-request-type");
21    pub const API_VERSION: HeaderName = HeaderName::from_static("dd-telemetry-api-version");
22    pub const LIBRARY_LANGUAGE: HeaderName = HeaderName::from_static("dd-client-library-language");
23    pub const LIBRARY_VERSION: HeaderName = HeaderName::from_static("dd-client-library-version");
24
25    /// Header key for whether to enable debug mode of telemetry.
26    pub const DEBUG_ENABLED: HeaderName = HeaderName::from_static("dd-telemetry-debug-enabled");
27}
28
29pub type ResponseFuture =
30    Pin<Box<dyn Future<Output = Result<http_common::HttpResponse, http_common::Error>> + Send>>;
31
32pub trait HttpClient {
33    fn request(&self, req: http_common::HttpRequest) -> ResponseFuture;
34}
35
36pub fn request_builder(c: &Config) -> anyhow::Result<HttpRequestBuilder> {
37    match &c.endpoint {
38        Some(e) => {
39            debug!(
40                endpoint.url = %e.url,
41                endpoint.timeout_ms = e.timeout_ms,
42                telemetry.version = env!("CARGO_PKG_VERSION"),
43                "Building telemetry request"
44            );
45            let mut builder =
46                e.to_request_builder(concat!("telemetry/", env!("CARGO_PKG_VERSION")));
47            if c.debug_enabled {
48                debug!(
49                    telemetry.debug_enabled = true,
50                    "Telemetry debug mode enabled"
51                );
52                builder = Ok(builder?.header(header::DEBUG_ENABLED, "true"))
53            }
54            builder
55        }
56        None => {
57            error!("No valid telemetry endpoint found, cannot build request");
58            Err(anyhow::Error::msg(
59                "no valid endpoint found, can't build the request".to_string(),
60            ))
61        }
62    }
63}
64
65pub fn from_config(c: &Config) -> Box<dyn HttpClient + Sync + Send> {
66    match &c.endpoint {
67        Some(e) if e.url.scheme_str() == Some("file") => {
68            #[allow(clippy::expect_used)]
69            let file_path = libdd_common::decode_uri_path_in_authority(&e.url)
70                .expect("file urls should always have been encoded in authority");
71            debug!(
72                file.path = ?file_path,
73                "Using file-based mock telemetry client"
74            );
75            return Box::new(MockClient {
76                #[allow(clippy::expect_used)]
77                file: Arc::new(Mutex::new(Box::new(
78                    OpenOptions::new()
79                        .create(true)
80                        .append(true)
81                        .open(file_path.as_path())
82                        .expect("Couldn't open mock client file"),
83                ))),
84            });
85        }
86        Some(e) => {
87            debug!(
88                endpoint.url = %e.url,
89                endpoint.timeout_ms = e.timeout_ms,
90                "Using HTTP telemetry client"
91            );
92        }
93        None => {
94            debug!(
95                endpoint = "default",
96                "No telemetry endpoint configured, using default HTTP client"
97            );
98        }
99    };
100    Box::new(HyperClient {
101        inner: http_common::new_client_periodic(),
102    })
103}
104
105pub struct HyperClient {
106    inner: libdd_common::HttpClient,
107}
108
109impl HttpClient for HyperClient {
110    fn request(&self, req: http_common::HttpRequest) -> ResponseFuture {
111        let resp = self.inner.request(req);
112        Box::pin(async move {
113            match resp.await {
114                Ok(response) => Ok(http_common::into_response(response)),
115                Err(e) => Err(http_common::Error::Client(http_common::into_error(e))),
116            }
117        })
118    }
119}
120
121#[derive(Clone)]
122pub struct MockClient {
123    file: Arc<Mutex<Box<dyn Write + Sync + Send>>>,
124}
125
126impl HttpClient for MockClient {
127    fn request(&self, req: http_common::HttpRequest) -> ResponseFuture {
128        let s = self.clone();
129        Box::pin(async move {
130            debug!("MockClient writing request to file");
131            let mut body = req.collect().await?.to_bytes().to_vec();
132            body.push(b'\n');
133
134            {
135                #[allow(clippy::expect_used)]
136                let mut writer = s.file.lock().expect("mutex poisoned");
137
138                match writer.write_all(body.as_ref()) {
139                    Ok(()) => debug!(
140                        file.bytes_written = body.len(),
141                        "Successfully wrote payload to mock file"
142                    ),
143                    Err(e) => {
144                        error!(
145                            error = %e,
146                            "Failed to write to mock file"
147                        );
148                        return Err(http_common::Error::from(e));
149                    }
150                }
151            }
152
153            debug!(http.status = 202, "MockClient returning success response");
154            http_common::empty_response(http::Response::builder().status(202))
155        })
156    }
157}
158
159#[cfg(test)]
160mod tests {
161    use libdd_common::HttpRequestBuilder;
162
163    use super::*;
164
165    #[tokio::test]
166    #[cfg_attr(miri, ignore)]
167    async fn test_mock_client() {
168        let output: Vec<u8> = Vec::new();
169        let c = MockClient {
170            file: Arc::new(Mutex::new(Box::new(output))),
171        };
172        c.request(
173            HttpRequestBuilder::new()
174                .body(http_common::Body::from("hello world\n"))
175                .unwrap(),
176        )
177        .await
178        .unwrap();
179    }
180}