libdd_telemetry/worker/
http_client.rs1use http_body_util::BodyExt;
5use libdd_common::{http_common, HttpRequestBuilder};
6use std::{
7 fs::OpenOptions,
8 future::Future,
9 io::Write,
10 pin::Pin,
11 sync::{Arc, Mutex},
12};
13
14use crate::config::Config;
15use tracing::{debug, error};
16
17pub mod header {
18 #![allow(clippy::declare_interior_mutable_const)]
19 use http::header::HeaderName;
20 pub const REQUEST_TYPE: HeaderName = HeaderName::from_static("dd-telemetry-request-type");
21 pub const API_VERSION: HeaderName = HeaderName::from_static("dd-telemetry-api-version");
22 pub const LIBRARY_LANGUAGE: HeaderName = HeaderName::from_static("dd-client-library-language");
23 pub const LIBRARY_VERSION: HeaderName = HeaderName::from_static("dd-client-library-version");
24
25 pub const DEBUG_ENABLED: HeaderName = HeaderName::from_static("dd-telemetry-debug-enabled");
27}
28
29pub type ResponseFuture =
30 Pin<Box<dyn Future<Output = Result<http_common::HttpResponse, http_common::Error>> + Send>>;
31
32pub trait HttpClient {
33 fn request(&self, req: http_common::HttpRequest) -> ResponseFuture;
34}
35
36pub fn request_builder(c: &Config) -> anyhow::Result<HttpRequestBuilder> {
37 match &c.endpoint {
38 Some(e) => {
39 debug!(
40 endpoint.url = %e.url,
41 endpoint.timeout_ms = e.timeout_ms,
42 telemetry.version = env!("CARGO_PKG_VERSION"),
43 "Building telemetry request"
44 );
45 let mut builder =
46 e.to_request_builder(concat!("telemetry/", env!("CARGO_PKG_VERSION")));
47 if c.debug_enabled {
48 debug!(
49 telemetry.debug_enabled = true,
50 "Telemetry debug mode enabled"
51 );
52 builder = Ok(builder?.header(header::DEBUG_ENABLED, "true"))
53 }
54 builder
55 }
56 None => {
57 error!("No valid telemetry endpoint found, cannot build request");
58 Err(anyhow::Error::msg(
59 "no valid endpoint found, can't build the request".to_string(),
60 ))
61 }
62 }
63}
64
65pub fn from_config(c: &Config) -> Box<dyn HttpClient + Sync + Send> {
66 match &c.endpoint {
67 Some(e) if e.url.scheme_str() == Some("file") => {
68 #[allow(clippy::expect_used)]
69 let file_path = libdd_common::decode_uri_path_in_authority(&e.url)
70 .expect("file urls should always have been encoded in authority");
71 debug!(
72 file.path = ?file_path,
73 "Using file-based mock telemetry client"
74 );
75 return Box::new(MockClient {
76 #[allow(clippy::expect_used)]
77 file: Arc::new(Mutex::new(Box::new(
78 OpenOptions::new()
79 .create(true)
80 .append(true)
81 .open(file_path.as_path())
82 .expect("Couldn't open mock client file"),
83 ))),
84 });
85 }
86 Some(e) => {
87 debug!(
88 endpoint.url = %e.url,
89 endpoint.timeout_ms = e.timeout_ms,
90 "Using HTTP telemetry client"
91 );
92 }
93 None => {
94 debug!(
95 endpoint = "default",
96 "No telemetry endpoint configured, using default HTTP client"
97 );
98 }
99 };
100 Box::new(HyperClient {
101 inner: http_common::new_client_periodic(),
102 })
103}
104
105pub struct HyperClient {
106 inner: libdd_common::HttpClient,
107}
108
109impl HttpClient for HyperClient {
110 fn request(&self, req: http_common::HttpRequest) -> ResponseFuture {
111 let resp = self.inner.request(req);
112 Box::pin(async move {
113 match resp.await {
114 Ok(response) => Ok(http_common::into_response(response)),
115 Err(e) => Err(http_common::Error::Client(http_common::into_error(e))),
116 }
117 })
118 }
119}
120
/// [`HttpClient`] implementation that writes request bodies to a writer
/// instead of sending them anywhere.
///
/// Clones share the same writer through the `Arc`.
#[derive(Clone)]
pub struct MockClient {
    /// Destination of the request bodies, guarded by a mutex.
    file: Arc<Mutex<Box<dyn Write + Sync + Send>>>,
}
125
126impl HttpClient for MockClient {
127 fn request(&self, req: http_common::HttpRequest) -> ResponseFuture {
128 let s = self.clone();
129 Box::pin(async move {
130 debug!("MockClient writing request to file");
131 let mut body = req.collect().await?.to_bytes().to_vec();
132 body.push(b'\n');
133
134 {
135 #[allow(clippy::expect_used)]
136 let mut writer = s.file.lock().expect("mutex poisoned");
137
138 match writer.write_all(body.as_ref()) {
139 Ok(()) => debug!(
140 file.bytes_written = body.len(),
141 "Successfully wrote payload to mock file"
142 ),
143 Err(e) => {
144 error!(
145 error = %e,
146 "Failed to write to mock file"
147 );
148 return Err(http_common::Error::from(e));
149 }
150 }
151 }
152
153 debug!(http.status = 202, "MockClient returning success response");
154 http_common::empty_response(http::Response::builder().status(202))
155 })
156 }
157}
158
#[cfg(test)]
mod tests {
    use libdd_common::HttpRequestBuilder;

    use super::*;

    /// Smoke test: a `MockClient` backed by an in-memory `Vec<u8>` accepts a
    /// request and its future resolves to `Ok`.
    #[tokio::test]
    #[cfg_attr(miri, ignore)]
    async fn test_mock_client() {
        let sink: Box<dyn Write + Sync + Send> = Box::new(Vec::<u8>::new());
        let client = MockClient {
            file: Arc::new(Mutex::new(sink)),
        };

        let request = HttpRequestBuilder::new()
            .body(http_common::Body::from("hello world\n"))
            .unwrap();

        client.request(request).await.unwrap();
    }
}
180}