libdd_telemetry/worker/
http_client.rs1use http_body_util::BodyExt;
5use libdd_common::{hyper_migration, HttpRequestBuilder};
6use std::{
7 fs::OpenOptions,
8 future::Future,
9 io::Write,
10 pin::Pin,
11 sync::{Arc, Mutex},
12};
13
14use crate::config::Config;
15use tracing::{debug, error};
16
17pub mod header {
18 #![allow(clippy::declare_interior_mutable_const)]
19 use http::header::HeaderName;
20 pub const REQUEST_TYPE: HeaderName = HeaderName::from_static("dd-telemetry-request-type");
21 pub const API_VERSION: HeaderName = HeaderName::from_static("dd-telemetry-api-version");
22 pub const LIBRARY_LANGUAGE: HeaderName = HeaderName::from_static("dd-client-library-language");
23 pub const LIBRARY_VERSION: HeaderName = HeaderName::from_static("dd-client-library-version");
24
25 pub const DEBUG_ENABLED: HeaderName = HeaderName::from_static("dd-telemetry-debug-enabled");
27}
28
29pub type ResponseFuture = Pin<
30 Box<dyn Future<Output = Result<hyper_migration::HttpResponse, hyper_migration::Error>> + Send>,
31>;
32
33pub trait HttpClient {
34 fn request(&self, req: hyper_migration::HttpRequest) -> ResponseFuture;
35}
36
37pub fn request_builder(c: &Config) -> anyhow::Result<HttpRequestBuilder> {
38 match &c.endpoint {
39 Some(e) => {
40 debug!(
41 endpoint.url = %e.url,
42 endpoint.timeout_ms = e.timeout_ms,
43 telemetry.version = env!("CARGO_PKG_VERSION"),
44 "Building telemetry request"
45 );
46 let mut builder =
47 e.to_request_builder(concat!("telemetry/", env!("CARGO_PKG_VERSION")));
48 if c.debug_enabled {
49 debug!(
50 telemetry.debug_enabled = true,
51 "Telemetry debug mode enabled"
52 );
53 builder = Ok(builder?.header(header::DEBUG_ENABLED, "true"))
54 }
55 builder
56 }
57 None => {
58 error!("No valid telemetry endpoint found, cannot build request");
59 Err(anyhow::Error::msg(
60 "no valid endpoint found, can't build the request".to_string(),
61 ))
62 }
63 }
64}
65
66pub fn from_config(c: &Config) -> Box<dyn HttpClient + Sync + Send> {
67 match &c.endpoint {
68 Some(e) if e.url.scheme_str() == Some("file") => {
69 #[allow(clippy::expect_used)]
70 let file_path = libdd_common::decode_uri_path_in_authority(&e.url)
71 .expect("file urls should always have been encoded in authority");
72 debug!(
73 file.path = ?file_path,
74 "Using file-based mock telemetry client"
75 );
76 return Box::new(MockClient {
77 #[allow(clippy::expect_used)]
78 file: Arc::new(Mutex::new(Box::new(
79 OpenOptions::new()
80 .create(true)
81 .append(true)
82 .open(file_path.as_path())
83 .expect("Couldn't open mock client file"),
84 ))),
85 });
86 }
87 Some(e) => {
88 debug!(
89 endpoint.url = %e.url,
90 endpoint.timeout_ms = e.timeout_ms,
91 "Using HTTP telemetry client"
92 );
93 }
94 None => {
95 debug!(
96 endpoint = "default",
97 "No telemetry endpoint configured, using default HTTP client"
98 );
99 }
100 };
101 Box::new(HyperClient {
102 inner: hyper_migration::new_client_periodic(),
103 })
104}
105
106pub struct HyperClient {
107 inner: libdd_common::HttpClient,
108}
109
110impl HttpClient for HyperClient {
111 fn request(&self, req: hyper_migration::HttpRequest) -> ResponseFuture {
112 let resp = self.inner.request(req);
113 Box::pin(async move {
114 match resp.await {
115 Ok(response) => Ok(hyper_migration::into_response(response)),
116 Err(e) => Err(e.into()),
117 }
118 })
119 }
120}
121
/// [`HttpClient`] implementation that writes request bodies to a shared
/// writer instead of sending them anywhere.
///
/// Cloning is cheap: clones share the same underlying writer.
#[derive(Clone)]
pub struct MockClient {
    /// Destination for request bodies, shared between clones behind a mutex.
    file: Arc<Mutex<Box<dyn Write + Sync + Send>>>,
}
126
127impl HttpClient for MockClient {
128 fn request(&self, req: hyper_migration::HttpRequest) -> ResponseFuture {
129 let s = self.clone();
130 Box::pin(async move {
131 debug!("MockClient writing request to file");
132 let mut body = req.collect().await?.to_bytes().to_vec();
133 body.push(b'\n');
134
135 {
136 #[allow(clippy::expect_used)]
137 let mut writer = s.file.lock().expect("mutex poisoned");
138
139 match writer.write_all(body.as_ref()) {
140 Ok(()) => debug!(
141 file.bytes_written = body.len(),
142 "Successfully wrote payload to mock file"
143 ),
144 Err(e) => {
145 error!(
146 error = %e,
147 "Failed to write to mock file"
148 );
149 return Err(hyper_migration::Error::from(e));
150 }
151 }
152 }
153
154 debug!(http.status = 202, "MockClient returning success response");
155 hyper_migration::empty_response(hyper::Response::builder().status(202))
156 })
157 }
158}
159
#[cfg(test)]
mod tests {
    use libdd_common::HttpRequestBuilder;

    use super::*;

    /// Sends a single request through a `MockClient` backed by an in-memory
    /// buffer and expects the call to succeed.
    #[tokio::test]
    #[cfg_attr(miri, ignore)]
    async fn test_mock_client() {
        let sink: Vec<u8> = Vec::new();
        let client = MockClient {
            file: Arc::new(Mutex::new(Box::new(sink))),
        };

        let request = HttpRequestBuilder::new()
            .body(hyper_migration::Body::from("hello world\n"))
            .unwrap();

        client.request(request).await.unwrap();
    }
}