libdd_data_pipeline/agent_info/
fetcher.rs1use super::{schema::AgentInfo, AGENT_INFO_CACHE};
7use anyhow::{anyhow, Result};
8use http::header::HeaderName;
9use http_body_util::BodyExt;
10use libdd_common::{http_common, worker::Worker, Endpoint};
11use sha2::{Digest, Sha256};
12use std::sync::Arc;
13use std::time::Duration;
14use tokio::sync::mpsc;
15use tokio::time::sleep;
16use tracing::{debug, warn};
17
/// Response header whose value [`ResponseObserver::check_response`] compares against the cached
/// `/info` `state_hash` to decide whether a refresh should be triggered.
const DATADOG_AGENT_STATE: HeaderName = HeaderName::from_static("datadog-agent-state");
/// Outcome of [`fetch_info_with_state`].
#[derive(Debug)]
pub enum FetchInfoStatus {
    /// The lowercase hex SHA-256 of the fetched body equals the hash supplied by the caller, so
    /// the body was not parsed.
    SameState,
    /// The body hashed differently (or no hash was supplied) and was parsed into a new
    /// [`AgentInfo`] carrying that hash.
    NewState(Box<AgentInfo>),
}
28
29pub async fn fetch_info_with_state(
35 info_endpoint: &Endpoint,
36 current_state_hash: Option<&str>,
37) -> Result<FetchInfoStatus> {
38 let (new_state_hash, body_data) = fetch_and_hash_response(info_endpoint).await?;
39
40 if current_state_hash.is_some_and(|state| state == new_state_hash) {
41 return Ok(FetchInfoStatus::SameState);
42 }
43
44 let info = Box::new(AgentInfo {
45 state_hash: new_state_hash,
46 info: serde_json::from_slice(&body_data)?,
47 });
48 Ok(FetchInfoStatus::NewState(info))
49}
50
51pub async fn fetch_info(info_endpoint: &Endpoint) -> Result<Box<AgentInfo>> {
72 match fetch_info_with_state(info_endpoint, None).await? {
73 FetchInfoStatus::NewState(info) => Ok(info),
74 FetchInfoStatus::SameState => Err(anyhow!("Invalid state header")),
76 }
77}
78
79async fn fetch_and_hash_response(info_endpoint: &Endpoint) -> Result<(String, bytes::Bytes)> {
84 let req = info_endpoint
85 .to_request_builder(concat!("Libdatadog/", env!("CARGO_PKG_VERSION")))?
86 .method(http::Method::GET)
87 .body(http_common::Body::empty());
88 let client = http_common::new_default_client();
89 let res = client.request(req?).await?;
90
91 let body_bytes = res.into_body().collect().await?;
92 let body_data = body_bytes.to_bytes();
93 let hash = format!("{:x}", Sha256::digest(&body_data));
94
95 Ok((hash, body_data))
96}
97
/// Background worker that keeps `AGENT_INFO_CACHE` up to date by fetching `/info`
/// periodically and whenever a paired [`ResponseObserver`] requests a refresh.
#[derive(Debug)]
pub struct AgentInfoFetcher {
    /// Endpoint queried for agent info.
    info_endpoint: Endpoint,
    /// Delay between periodic refreshes when no trigger arrives.
    refresh_interval: Duration,
    /// Receives refresh triggers; set to `None` once the channel reports it is closed.
    trigger_rx: Option<mpsc::Receiver<()>>,
}
147
148impl AgentInfoFetcher {
149 pub fn new(info_endpoint: Endpoint, refresh_interval: Duration) -> (Self, ResponseObserver) {
156 let (trigger_tx, trigger_rx) = mpsc::channel(1);
158
159 let fetcher = Self {
160 info_endpoint,
161 refresh_interval,
162 trigger_rx: Some(trigger_rx),
163 };
164
165 let response_observer = ResponseObserver::new(trigger_tx);
166
167 (fetcher, response_observer)
168 }
169
170 pub fn drain(&mut self) {
172 if let Some(rx) = &mut self.trigger_rx {
174 let _ = rx.try_recv();
175 }
176 }
177}
178
179impl Worker for AgentInfoFetcher {
180 async fn run(&mut self) {
185 if AGENT_INFO_CACHE.load().is_none() {
188 self.fetch_and_update().await;
189 }
190
191 loop {
194 match &mut self.trigger_rx {
195 Some(trigger_rx) => {
196 tokio::select! {
197 trigger = trigger_rx.recv() => {
199 if trigger.is_some() {
200 self.fetch_and_update().await;
201 } else {
202 self.trigger_rx = None;
204 }
205 }
206 _ = sleep(self.refresh_interval) => {
208 self.fetch_and_update().await;
209 }
210 };
211 }
212 None => {
213 sleep(self.refresh_interval).await;
215 self.fetch_and_update().await;
216 }
217 }
218 }
219 }
220}
221
222impl AgentInfoFetcher {
223 async fn fetch_and_update(&self) {
225 let current_info = AGENT_INFO_CACHE.load();
226 let current_hash = current_info.as_ref().map(|info| info.state_hash.as_str());
227 let res = fetch_info_with_state(&self.info_endpoint, current_hash).await;
228 match res {
229 Ok(FetchInfoStatus::NewState(new_info)) => {
230 debug!("New /info state received");
231 AGENT_INFO_CACHE.store(Some(Arc::new(*new_info)));
232 }
233 Ok(FetchInfoStatus::SameState) => {
234 debug!("Agent info is up-to-date")
235 }
236 Err(err) => {
237 warn!(?err, "Error while fetching /info");
238 }
239 }
240 }
241}
242
/// Cloneable handle used to ask an [`AgentInfoFetcher`] for an early refresh, either from
/// agent response headers ([`ResponseObserver::check_response`]) or explicitly
/// ([`ResponseObserver::manual_trigger`]).
#[derive(Debug, Clone)]
pub struct ResponseObserver {
    /// Sending half of the fetcher's trigger channel.
    trigger_tx: mpsc::Sender<()>,
}
251
252impl ResponseObserver {
253 pub fn new(trigger_tx: mpsc::Sender<()>) -> Self {
255 Self { trigger_tx }
256 }
257
258 pub fn check_response(&self, response: &http_common::HttpResponse) {
264 if let Some(agent_state) = response.headers().get(DATADOG_AGENT_STATE) {
265 if let Ok(state_str) = agent_state.to_str() {
266 let current_state = AGENT_INFO_CACHE.load();
267 if current_state.as_ref().map(|s| s.state_hash.as_str()) != Some(state_str) {
268 match self.trigger_tx.try_send(()) {
269 Ok(_) => {}
270 Err(mpsc::error::TrySendError::Full(_)) => {
271 debug!(
272 "Response observer channel full, fetch has already been triggered"
273 );
274 }
275 Err(mpsc::error::TrySendError::Closed(_)) => {
276 debug!("Agent info fetcher channel closed, unable to trigger refresh");
277 }
278 }
279 }
280 }
281 }
282 }
283
284 pub fn manual_trigger(&self) {
286 let _ = self.trigger_tx.try_send(());
287 }
288}
289
#[cfg(test)]
mod single_threaded_tests {
    // Several tests below overwrite the process-wide `AGENT_INFO_CACHE`; they presumably rely on
    // the runner executing this module serially (hence its name) — TODO confirm.
    use super::*;
    use crate::agent_info;
    use httpmock::prelude::*;

    const TEST_INFO: &str = r#"{
        "version": "0.0.0",
        "git_commit": "0101010",
        "endpoints": [
            "/v0.4/traces",
            "/v0.6/stats"
        ],
        "client_drop_p0s": true,
        "span_meta_structs": true,
        "long_running_spans": true,
        "evp_proxy_allowed_headers": [
            "Content-Type",
            "Accept-Encoding"
        ],
        "config": {
            "default_env": "none",
            "target_tps": 10,
            "max_eps": 200,
            "receiver_port": 8126,
            "receiver_socket": "",
            "connection_limit": 0,
            "receiver_timeout": 0,
            "max_request_bytes": 26214400,
            "statsd_port": 8125,
            "max_memory": 0,
            "max_cpu": 0,
            "analyzed_spans_by_service": {},
            "obfuscation": {
                "elastic_search": true,
                "mongo": true,
                "sql_exec_plan": false,
                "sql_exec_plan_normalize": false,
                "http": {
                    "remove_query_string": false,
                    "remove_path_digits": false
                },
                "remove_stack_traces": false,
                "redis": {
                    "Enabled": true,
                    "RemoveAllArgs": false
                },
                "memcached": {
                    "Enabled": true,
                    "KeepCommand": false
                }
            }
        },
        "peer_tags": ["db.hostname","http.host","aws.s3.bucket"]
    }"#;

    /// Lowercase hex SHA-256 of the empty input. Pins the digest format independently of
    /// `calculate_hash`, which mirrors the production formula.
    const EMPTY_BODY_HASH: &str =
        "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855";

    /// Upper bound on every wait for the background fetcher, so a regression fails the test
    /// instead of hanging the whole suite.
    const WAIT_TIMEOUT: Duration = Duration::from_secs(5);
    /// Delay between two polls of a condition updated by the background fetcher.
    const POLL_INTERVAL: Duration = Duration::from_millis(10);

    /// Hashes `json` the same way the fetcher hashes a response body.
    fn calculate_hash(json: &str) -> String {
        format!("{:x}", Sha256::digest(json.as_bytes()))
    }

    /// Expected `AgentInfo` for a response whose body is `TEST_INFO`. The hash is derived from
    /// the literal so re-indenting `TEST_INFO` cannot silently invalidate a hard-coded value.
    fn expected_test_info() -> AgentInfo {
        AgentInfo {
            state_hash: calculate_hash(TEST_INFO),
            info: serde_json::from_str(TEST_INFO).unwrap(),
        }
    }

    /// Version currently published through `agent_info::get_agent_info`, if any.
    fn cached_version() -> Option<String> {
        agent_info::get_agent_info()
            .as_ref()
            .and_then(|info| info.info.version.clone())
    }

    #[cfg_attr(miri, ignore)]
    #[tokio::test]
    async fn test_fetch_and_hash_response_hash_format() {
        let server = MockServer::start();
        let mock = server
            .mock_async(|when, then| {
                when.path("/info");
                then.status(200);
            })
            .await;
        let endpoint = Endpoint::from_url(server.url("/info").parse().unwrap());

        let (hash, body) = fetch_and_hash_response(&endpoint).await.unwrap();
        mock.assert();
        assert!(body.is_empty());
        assert_eq!(hash, EMPTY_BODY_HASH);
    }

    #[cfg_attr(miri, ignore)]
    #[tokio::test]
    async fn test_fetch_info_without_state() {
        let server = MockServer::start();
        let mock = server
            .mock_async(|when, then| {
                when.path("/info");
                then.status(200)
                    .header("content-type", "application/json")
                    .body(TEST_INFO);
            })
            .await;
        let endpoint = Endpoint::from_url(server.url("/info").parse().unwrap());

        let info_status = fetch_info_with_state(&endpoint, None).await.unwrap();
        mock.assert();
        assert!(
            matches!(info_status, FetchInfoStatus::NewState(info) if *info == expected_test_info())
        );
    }

    #[cfg_attr(miri, ignore)]
    #[tokio::test]
    async fn test_fetch_info_with_state() {
        let server = MockServer::start();
        let mock = server
            .mock_async(|when, then| {
                when.path("/info");
                then.status(200)
                    .header("content-type", "application/json")
                    .body(TEST_INFO);
            })
            .await;
        let endpoint = Endpoint::from_url(server.url("/info").parse().unwrap());
        let info_hash = calculate_hash(TEST_INFO);

        let new_state_info_status = fetch_info_with_state(&endpoint, Some("state"))
            .await
            .unwrap();
        let same_state_info_status = fetch_info_with_state(&endpoint, Some(info_hash.as_str()))
            .await
            .unwrap();

        mock.assert_calls(2);
        assert!(
            matches!(new_state_info_status, FetchInfoStatus::NewState(info) if *info == expected_test_info())
        );
        assert!(matches!(same_state_info_status, FetchInfoStatus::SameState));
    }

    #[cfg_attr(miri, ignore)]
    #[tokio::test]
    async fn test_fetch_info() {
        let server = MockServer::start();
        let mock = server
            .mock_async(|when, then| {
                when.path("/info");
                then.status(200)
                    .header("content-type", "application/json")
                    .body(TEST_INFO);
            })
            .await;
        let endpoint = Endpoint::from_url(server.url("/info").parse().unwrap());

        let fetched = fetch_info(&endpoint).await.unwrap();
        mock.assert();
        assert_eq!(*fetched, expected_test_info());
    }

    #[cfg_attr(miri, ignore)]
    #[tokio::test]
    async fn test_agent_info_fetcher_run() {
        AGENT_INFO_CACHE.store(None);
        let server = MockServer::start();
        let mock_v1 = server
            .mock_async(|when, then| {
                when.path("/info");
                then.status(200)
                    .header("content-type", "application/json")
                    .body(r#"{"version":"1"}"#);
            })
            .await;
        let endpoint = Endpoint::from_url(server.url("/info").parse().unwrap());
        let (mut fetcher, _response_observer) =
            AgentInfoFetcher::new(endpoint, Duration::from_millis(100));
        assert!(agent_info::get_agent_info().is_none());
        tokio::spawn(async move {
            fetcher.run().await;
        });

        // The cache starts empty, so the fetcher must perform an initial fetch.
        tokio::time::timeout(WAIT_TIMEOUT, async {
            while agent_info::get_agent_info().is_none() {
                tokio::time::sleep(POLL_INTERVAL).await;
            }
        })
        .await
        .expect("the fetcher never populated the agent info cache");

        assert_eq!(cached_version().as_deref(), Some("1"));
        // At least one call: a periodic refresh (every 100 ms) may already have happened too.
        assert!(mock_v1.calls_async().await >= 1);

        mock_v1.delete_async().await;
        let mock_v2 = server
            .mock_async(|when, then| {
                when.path("/info");
                then.status(200)
                    .header("content-type", "application/json")
                    .body(r#"{"version":"2"}"#);
            })
            .await;

        // A periodic refresh must eventually publish the new body.
        tokio::time::timeout(WAIT_TIMEOUT, async {
            while cached_version().as_deref() != Some("2") {
                tokio::time::sleep(POLL_INTERVAL).await;
            }
        })
        .await
        .expect("the fetcher never picked up the updated /info response");
        assert!(mock_v2.calls_async().await >= 1);
    }

    #[cfg_attr(miri, ignore)]
    #[tokio::test]
    async fn test_agent_info_trigger_different_state() {
        let server = MockServer::start();
        let mock = server
            .mock_async(|when, then| {
                when.path("/info");
                then.status(200)
                    .header("content-type", "application/json")
                    .body(r#"{"version":"triggered"}"#);
            })
            .await;

        // Pre-populate the cache so the fetcher skips its initial fetch.
        AGENT_INFO_CACHE.store(Some(Arc::new(AgentInfo {
            state_hash: "old_state".to_string(),
            info: serde_json::from_str(r#"{"version":"old"}"#).unwrap(),
        })));

        let endpoint = Endpoint::from_url(server.url("/info").parse().unwrap());
        // Long refresh interval: any fetch observed below must come from the trigger.
        let (mut fetcher, response_observer) =
            AgentInfoFetcher::new(endpoint, Duration::from_secs(3600));

        tokio::spawn(async move {
            fetcher.run().await;
        });

        let response = http_common::empty_response(
            http::Response::builder()
                .status(200)
                .header("datadog-agent-state", "new_state"),
        )
        .unwrap();

        response_observer.check_response(&response);

        tokio::time::timeout(WAIT_TIMEOUT, async {
            while mock.calls_async().await == 0 {
                tokio::time::sleep(POLL_INTERVAL).await;
            }
        })
        .await
        .expect("the state change did not trigger a fetch");

        mock.assert_calls_async(1).await;

        let expected_hash = calculate_hash(r#"{"version":"triggered"}"#);
        let cache_has_expected_hash = || {
            AGENT_INFO_CACHE
                .load()
                .as_ref()
                .is_some_and(|info| info.state_hash == expected_hash)
        };

        tokio::time::timeout(WAIT_TIMEOUT, async {
            while !cache_has_expected_hash() {
                tokio::time::sleep(POLL_INTERVAL).await;
            }
        })
        .await
        .expect("the triggered fetch never updated the cache");

        let updated_info = AGENT_INFO_CACHE.load();
        assert!(updated_info.is_some());
        assert_eq!(updated_info.as_ref().unwrap().state_hash, expected_hash);
        assert_eq!(
            updated_info
                .as_ref()
                .unwrap()
                .info
                .version
                .as_ref()
                .unwrap(),
            "triggered"
        );
    }

    #[cfg_attr(miri, ignore)]
    #[tokio::test]
    async fn test_agent_info_trigger_same_state() {
        let server = MockServer::start();
        let mock = server
            .mock_async(|when, then| {
                when.path("/info");
                then.status(200)
                    .header("content-type", "application/json")
                    .body(r#"{"version":"same"}"#);
            })
            .await;

        let same_json = r#"{"version":"same"}"#;
        let same_hash = calculate_hash(same_json);

        // The cached hash already matches the header sent below.
        AGENT_INFO_CACHE.store(Some(Arc::new(AgentInfo {
            state_hash: same_hash.clone(),
            info: serde_json::from_str(same_json).unwrap(),
        })));

        let endpoint = Endpoint::from_url(server.url("/info").parse().unwrap());
        let (mut fetcher, response_observer) =
            AgentInfoFetcher::new(endpoint, Duration::from_secs(3600));
        tokio::spawn(async move {
            fetcher.run().await;
        });

        let response = http_common::empty_response(
            http::Response::builder()
                .status(200)
                .header("datadog-agent-state", &same_hash),
        )
        .unwrap();

        response_observer.check_response(&response);

        // Give a (wrongly) triggered fetch time to happen before asserting it did not.
        tokio::time::sleep(Duration::from_millis(500)).await;

        mock.assert_calls_async(0).await;
    }
}