Skip to main content

libdd_data_pipeline/agent_info/
fetcher.rs

1// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
2// SPDX-License-Identifier: Apache-2.0
3//! Provides utilities to fetch the agent /info endpoint and an automatic fetcher to keep info
4//! up-to-date
5
6use super::{schema::AgentInfo, AGENT_INFO_CACHE};
7use anyhow::{anyhow, Result};
8use http::header::HeaderName;
9use http_body_util::BodyExt;
10use libdd_common::{http_common, worker::Worker, Endpoint};
11use sha2::{Digest, Sha256};
12use std::sync::Arc;
13use std::time::Duration;
14use tokio::sync::mpsc;
15use tokio::time::sleep;
16use tracing::{debug, warn};
17
/// Name of the HTTP header through which the agent reports its current state hash.
///
/// [`ResponseObserver::check_response`] compares this header's value with the state hash of the
/// cached agent info to decide whether an early `/info` refresh should be triggered.
const DATADOG_AGENT_STATE: HeaderName = HeaderName::from_static("datadog-agent-state");
/// Outcome of [`fetch_info_with_state`]: whether the agent reported the same state as the one
/// the caller already knows about.
#[derive(Debug)]
pub enum FetchInfoStatus {
    /// The state hash of the fetched info matches the caller's current state hash.
    SameState,
    /// The state hash differs (or no current hash was given); carries the freshly parsed info.
    NewState(Box<AgentInfo>),
}
28
29/// Fetch info from the given info_endpoint and compare its state to the current state hash.
30///
31/// If the state hash is different from the current one:
32/// - Return a `FetchInfoStatus::NewState` of the info struct
33/// - Else return `FetchInfoStatus::SameState`
34pub async fn fetch_info_with_state(
35    info_endpoint: &Endpoint,
36    current_state_hash: Option<&str>,
37) -> Result<FetchInfoStatus> {
38    let (new_state_hash, body_data) = fetch_and_hash_response(info_endpoint).await?;
39
40    if current_state_hash.is_some_and(|state| state == new_state_hash) {
41        return Ok(FetchInfoStatus::SameState);
42    }
43
44    let info = Box::new(AgentInfo {
45        state_hash: new_state_hash,
46        info: serde_json::from_slice(&body_data)?,
47    });
48    Ok(FetchInfoStatus::NewState(info))
49}
50
51/// Fetch the info endpoint once and return the info.
52///
53/// Can be used for one-time access to the agent's info. If you need to access the info several
54/// times use `AgentInfoFetcher` to keep the info up-to-date.
55///
56/// # Example
57/// ```no_run
58/// # use anyhow::Result;
59/// # #[tokio::main]
60/// # async fn main() -> Result<()> {
61/// // Define the endpoint
62/// let endpoint = libdd_common::Endpoint::from_url("http://localhost:8126/info".parse().unwrap());
63/// // Fetch the info
64/// let agent_info = libdd_data_pipeline::agent_info::fetch_info(&endpoint)
65///     .await
66///     .unwrap();
67/// println!("Agent version is {}", agent_info.info.version.unwrap());
68/// # Ok(())
69/// # }
70/// ```
71pub async fn fetch_info(info_endpoint: &Endpoint) -> Result<Box<AgentInfo>> {
72    match fetch_info_with_state(info_endpoint, None).await? {
73        FetchInfoStatus::NewState(info) => Ok(info),
74        // Should never be reached since there is no previous state.
75        FetchInfoStatus::SameState => Err(anyhow!("Invalid state header")),
76    }
77}
78
79/// Fetch and hash the response from the agent info endpoint.
80///
81/// Returns a tuple of (state_hash, response_body_bytes).
82/// The hash is calculated using SHA256 to match the agent's calculation method.
83async fn fetch_and_hash_response(info_endpoint: &Endpoint) -> Result<(String, bytes::Bytes)> {
84    let req = info_endpoint
85        .to_request_builder(concat!("Libdatadog/", env!("CARGO_PKG_VERSION")))?
86        .method(http::Method::GET)
87        .body(http_common::Body::empty());
88    let client = http_common::new_default_client();
89    let res = client.request(req?).await?;
90
91    let body_bytes = res.into_body().collect().await?;
92    let body_data = body_bytes.to_bytes();
93    let hash = format!("{:x}", Sha256::digest(&body_data));
94
95    Ok((hash, body_data))
96}
97
/// Periodically fetch the agent `/info` endpoint and keep the global agent info cache
/// up-to-date.
///
/// Once [`Worker::run`] has been started, the fetcher refreshes the global info state based on
/// the given refresh interval, and additionally whenever a refresh is triggered through the
/// associated [`ResponseObserver`]. The cache is only replaced when the fetched state hash differs
/// from the cached one. You can access the current state with
/// [`crate::agent_info::get_agent_info`].
///
/// # Response observer
/// When the fetcher is created it also returns a [`ResponseObserver`] which can be used to check
/// the `Datadog-Agent-State` header of an agent response and trigger early refresh if a new state
/// is detected.
///
/// # Example
/// ```no_run
/// # use anyhow::Result;
/// # use libdd_common::worker::Worker;
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// // Define the endpoint
/// use libdd_data_pipeline::agent_info;
/// let endpoint = libdd_common::Endpoint::from_url("http://localhost:8126/info".parse().unwrap());
/// // Create the fetcher
/// let (mut fetcher, _response_observer) = libdd_data_pipeline::agent_info::AgentInfoFetcher::new(
///     endpoint,
///     std::time::Duration::from_secs(5 * 60),
/// );
/// // Start the runner
/// tokio::spawn(async move {
///     fetcher.run().await;
/// });
///
/// // Get the Arc to access the info
/// let agent_info_arc = agent_info::get_agent_info();
///
/// // Access the info
/// if let Some(agent_info) = agent_info_arc.as_ref() {
///     println!(
///         "Agent version is {}",
///         agent_info.info.version.as_ref().unwrap()
///     );
/// }
/// # Ok(())
/// # }
/// ```
#[derive(Debug)]
pub struct AgentInfoFetcher {
    /// Endpoint of the agent `/info` route that is fetched.
    info_endpoint: Endpoint,
    /// Interval used for the periodic (non-triggered) fetches.
    refresh_interval: Duration,
    /// Receiving end of the trigger channel fed by [`ResponseObserver`]. Set to `None` by the run
    /// loop once the channel is closed, after which only periodic fetches happen.
    trigger_rx: Option<mpsc::Receiver<()>>,
}
147
148impl AgentInfoFetcher {
149    /// Return a new `AgentInfoFetcher` fetching the `info_endpoint` on each `refresh_interval`
150    /// and updating the stored info.
151    ///
152    /// Returns a tuple of (fetcher, trigger_component) where:
153    /// - `fetcher`: The AgentInfoFetcher to be run in a background task
154    /// - `response_observer`: The ResponseObserver component for checking HTTP responses
155    pub fn new(info_endpoint: Endpoint, refresh_interval: Duration) -> (Self, ResponseObserver) {
156        // The trigger channel stores a single message to avoid multiple triggers.
157        let (trigger_tx, trigger_rx) = mpsc::channel(1);
158
159        let fetcher = Self {
160            info_endpoint,
161            refresh_interval,
162            trigger_rx: Some(trigger_rx),
163        };
164
165        let response_observer = ResponseObserver::new(trigger_tx);
166
167        (fetcher, response_observer)
168    }
169
170    /// Drain message from the trigger channel.
171    pub fn drain(&mut self) {
172        // We read only once as the channel has a capacity of 1
173        if let Some(rx) = &mut self.trigger_rx {
174            let _ = rx.try_recv();
175        }
176    }
177}
178
impl Worker for AgentInfoFetcher {
    /// Start fetching the info endpoint with the given interval.
    ///
    /// A fetch happens when a trigger message arrives from the [`ResponseObserver`] channel or
    /// when `refresh_interval` elapses without a trigger, whichever comes first. Once the trigger
    /// channel is closed (all senders dropped), only the periodic fetch remains.
    ///
    /// # Warning
    /// This method does not return and should be called within a dedicated task.
    async fn run(&mut self) {
        // Skip the first fetch if some info is present to avoid calling the /info endpoint
        // at fork for heavy-forking environment.
        if AGENT_INFO_CACHE.load().is_none() {
            self.fetch_and_update().await;
        }

        // Main loop waiting for a trigger event or the end of the refresh interval to trigger the
        // fetch. A fresh `sleep` is created on every iteration, so the periodic interval restarts
        // after each fetch, whether it was triggered or timed.
        loop {
            match &mut self.trigger_rx {
                Some(trigger_rx) => {
                    tokio::select! {
                        // Wait for a trigger sent by the ResponseObserver (state change seen in a
                        // response header, or a manual trigger)
                        trigger = trigger_rx.recv() => {
                            if trigger.is_some() {
                                self.fetch_and_update().await;
                            } else {
                                // The channel has been closed
                                self.trigger_rx = None;
                            }
                        }
                        // Regular periodic fetch timer
                        _ = sleep(self.refresh_interval) => {
                            self.fetch_and_update().await;
                        }
                    };
                }
                None => {
                    // If the trigger channel is closed we only use timed fetch.
                    sleep(self.refresh_interval).await;
                    self.fetch_and_update().await;
                }
            }
        }
    }
}
221
222impl AgentInfoFetcher {
223    /// Fetch agent info and update cache if needed
224    async fn fetch_and_update(&self) {
225        let current_info = AGENT_INFO_CACHE.load();
226        let current_hash = current_info.as_ref().map(|info| info.state_hash.as_str());
227        let res = fetch_info_with_state(&self.info_endpoint, current_hash).await;
228        match res {
229            Ok(FetchInfoStatus::NewState(new_info)) => {
230                debug!("New /info state received");
231                AGENT_INFO_CACHE.store(Some(Arc::new(*new_info)));
232            }
233            Ok(FetchInfoStatus::SameState) => {
234                debug!("Agent info is up-to-date")
235            }
236            Err(err) => {
237                warn!(?err, "Error while fetching /info");
238            }
239        }
240    }
241}
242
/// Component for observing HTTP responses and triggering agent info fetches.
///
/// This component checks HTTP responses for the `Datadog-Agent-State` header and sends trigger
/// messages to the agent info fetcher when the header value differs from the state hash of the
/// cached agent info. Clones send into the same trigger channel.
#[derive(Debug, Clone)]
pub struct ResponseObserver {
    /// Sending half of the trigger channel read by [`AgentInfoFetcher`].
    trigger_tx: mpsc::Sender<()>,
}
251
252impl ResponseObserver {
253    /// Create a new ResponseObserver with the given channel sender.
254    pub fn new(trigger_tx: mpsc::Sender<()>) -> Self {
255        Self { trigger_tx }
256    }
257
258    /// Check the given HTTP response for agent state changes and trigger a fetch if needed.
259    ///
260    /// This method examines the `Datadog-Agent-State` header in the response and compares
261    /// it with the previously seen state. If the state has changed, it sends a trigger
262    /// message to the agent info fetcher.
263    pub fn check_response(&self, response: &http_common::HttpResponse) {
264        if let Some(agent_state) = response.headers().get(DATADOG_AGENT_STATE) {
265            if let Ok(state_str) = agent_state.to_str() {
266                let current_state = AGENT_INFO_CACHE.load();
267                if current_state.as_ref().map(|s| s.state_hash.as_str()) != Some(state_str) {
268                    match self.trigger_tx.try_send(()) {
269                        Ok(_) => {}
270                        Err(mpsc::error::TrySendError::Full(_)) => {
271                            debug!(
272                                "Response observer channel full, fetch has already been triggered"
273                            );
274                        }
275                        Err(mpsc::error::TrySendError::Closed(_)) => {
276                            debug!("Agent info fetcher channel closed, unable to trigger refresh");
277                        }
278                    }
279                }
280            }
281        }
282    }
283
284    /// Manually send a message to the trigger channel.
285    pub fn manual_trigger(&self) {
286        let _ = self.trigger_tx.try_send(());
287    }
288}
289
#[cfg(test)]
mod single_threaded_tests {
    // NOTE: these tests read and write the global `AGENT_INFO_CACHE`, so they presumably must not
    // run concurrently with each other (as the module name suggests).
    use super::*;
    use crate::agent_info;
    use httpmock::prelude::*;

    const TEST_INFO: &str = r#"{
        "version": "0.0.0",
        "git_commit": "0101010",
        "endpoints": [
                "/v0.4/traces",
                "/v0.6/stats"
        ],
        "client_drop_p0s": true,
        "span_meta_structs": true,
        "long_running_spans": true,
        "evp_proxy_allowed_headers": [
                "Content-Type",
                "Accept-Encoding"
        ],
        "config": {
                "default_env": "none",
                "target_tps": 10,
                "max_eps": 200,
                "receiver_port": 8126,
                "receiver_socket": "",
                "connection_limit": 0,
                "receiver_timeout": 0,
                "max_request_bytes": 26214400,
                "statsd_port": 8125,
                "max_memory": 0,
                "max_cpu": 0,
                "analyzed_spans_by_service": {},
                "obfuscation": {
                        "elastic_search": true,
                        "mongo": true,
                        "sql_exec_plan": false,
                        "sql_exec_plan_normalize": false,
                        "http": {
                                "remove_query_string": false,
                                "remove_path_digits": false
                        },
                        "remove_stack_traces": false,
                        "redis": {
                                "Enabled": true,
                                "RemoveAllArgs": false
                        },
                        "memcached": {
                                "Enabled": true,
                                "KeepCommand": false
                        }
                }
        },
        "peer_tags": ["db.hostname","http.host","aws.s3.bucket"]
    }"#;

    /// SHA256 hex digest of `json`, computed the same way as the fetcher's state hash.
    fn calculate_hash(json: &str) -> String {
        format!("{:x}", Sha256::digest(json.as_bytes()))
    }

    // Expected state hash of the exact `TEST_INFO` bytes above.
    const TEST_INFO_HASH: &str = "b7709671827946c15603847bca76c90438579c038ec134eae19c51f1f3e3dfea";

    #[cfg_attr(miri, ignore)]
    #[tokio::test]
    async fn test_fetch_info_without_state() {
        let server = MockServer::start();
        let mock = server
            .mock_async(|when, then| {
                when.path("/info");
                then.status(200)
                    .header("content-type", "application/json")
                    .body(TEST_INFO);
            })
            .await;
        let endpoint = Endpoint::from_url(server.url("/info").parse().unwrap());

        let info_status = fetch_info_with_state(&endpoint, None).await.unwrap();
        mock.assert();
        assert!(
            matches!(info_status, FetchInfoStatus::NewState(info) if *info == AgentInfo {
                        state_hash: TEST_INFO_HASH.to_string(),
                        info: serde_json::from_str(TEST_INFO).unwrap(),
                    }
            )
        );
    }

    #[cfg_attr(miri, ignore)]
    #[tokio::test]
    async fn test_fetch_info_with_state() {
        let server = MockServer::start();
        let mock = server
            .mock_async(|when, then| {
                when.path("/info");
                then.status(200)
                    .header("content-type", "application/json")
                    .body(TEST_INFO);
            })
            .await;
        let endpoint = Endpoint::from_url(server.url("/info").parse().unwrap());

        let new_state_info_status = fetch_info_with_state(&endpoint, Some("state"))
            .await
            .unwrap();
        let same_state_info_status = fetch_info_with_state(&endpoint, Some(TEST_INFO_HASH))
            .await
            .unwrap();

        mock.assert_calls(2);
        assert!(
            matches!(new_state_info_status, FetchInfoStatus::NewState(info) if *info == AgentInfo {
                        state_hash: TEST_INFO_HASH.to_string(),
                        info: serde_json::from_str(TEST_INFO).unwrap(),
                    }
            )
        );
        assert!(matches!(same_state_info_status, FetchInfoStatus::SameState));
    }

    #[cfg_attr(miri, ignore)]
    #[tokio::test]
    async fn test_fetch_info() {
        let server = MockServer::start();
        let mock = server
            .mock_async(|when, then| {
                when.path("/info");
                then.status(200)
                    .header("content-type", "application/json")
                    .body(TEST_INFO);
            })
            .await;
        let endpoint = Endpoint::from_url(server.url("/info").parse().unwrap());

        let agent_info = fetch_info(&endpoint).await.unwrap();
        mock.assert();
        assert_eq!(
            *agent_info,
            AgentInfo {
                state_hash: TEST_INFO_HASH.to_string(),
                info: serde_json::from_str(TEST_INFO).unwrap(),
            }
        );
    }

    #[cfg_attr(miri, ignore)]
    #[tokio::test]
    async fn test_agent_info_fetcher_run() {
        // Upper bound for the polling loops below (50 * 100ms = 5s) so that a broken fetcher
        // makes the test fail instead of hanging forever.
        const MAX_ATTEMPTS: u32 = 50;
        const SLEEP_DURATION: Duration = Duration::from_millis(100);

        AGENT_INFO_CACHE.store(None);
        let server = MockServer::start();
        let mock_v1 = server
            .mock_async(|when, then| {
                when.path("/info");
                then.status(200)
                    .header("content-type", "application/json")
                    .body(r#"{"version":"1"}"#);
            })
            .await;
        let endpoint = Endpoint::from_url(server.url("/info").parse().unwrap());
        let (mut fetcher, _response_observer) =
            AgentInfoFetcher::new(endpoint.clone(), Duration::from_millis(100));
        assert!(agent_info::get_agent_info().is_none());
        tokio::spawn(async move {
            fetcher.run().await;
        });

        // Wait until the info is fetched
        let mut attempts = 0;
        while agent_info::get_agent_info().is_none() && attempts < MAX_ATTEMPTS {
            attempts += 1;
            tokio::time::sleep(SLEEP_DURATION).await;
        }
        assert!(
            agent_info::get_agent_info().is_some(),
            "agent info was never fetched"
        );

        let version_1 = agent_info::get_agent_info()
            .as_ref()
            .unwrap()
            .info
            .version
            .clone()
            .unwrap();
        assert_eq!(version_1, "1");
        mock_v1.assert_async().await;

        // Update the info endpoint
        mock_v1.delete_async().await;
        let mock_v2 = server
            .mock_async(|when, then| {
                when.path("/info");
                then.status(200)
                    .header("content-type", "application/json")
                    .body(r#"{"version":"2"}"#);
            })
            .await;

        // Wait for second fetch
        let mut attempts = 0;
        while mock_v2.calls_async().await == 0 && attempts < MAX_ATTEMPTS {
            attempts += 1;
            tokio::time::sleep(SLEEP_DURATION).await;
        }

        // This check is not 100% deterministic, but between the time the mock returns the response
        // and we swap the atomic pointer holding the agent_info we only need to perform
        // very few operations. We wait for a maximum of 1s before failing the test and that should
        // give way more time than necessary.
        let mut version_2 = version_1.clone();
        for _ in 0..10 {
            version_2 = agent_info::get_agent_info()
                .as_ref()
                .unwrap()
                .info
                .version
                .clone()
                .unwrap();
            if version_2 != version_1 {
                break;
            }
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
        // Assert after the loop so the test fails if the cache was never updated.
        assert_eq!(
            version_2, "2",
            "agent info was not updated to the new version within 1s"
        );
    }

    #[cfg_attr(miri, ignore)]
    #[tokio::test]
    async fn test_agent_info_trigger_different_state() {
        let server = MockServer::start();
        let mock = server
            .mock_async(|when, then| {
                when.path("/info");
                then.status(200)
                    .header("content-type", "application/json")
                    .body(r#"{"version":"triggered"}"#);
            })
            .await;

        // Populate the cache with initial state
        AGENT_INFO_CACHE.store(Some(Arc::new(AgentInfo {
            state_hash: "old_state".to_string(),
            info: serde_json::from_str(r#"{"version":"old"}"#).unwrap(),
        })));

        let endpoint = Endpoint::from_url(server.url("/info").parse().unwrap());
        let (mut fetcher, response_observer) =
            // Interval is too long to fetch during the test
            AgentInfoFetcher::new(endpoint, Duration::from_secs(3600));

        tokio::spawn(async move {
            fetcher.run().await;
        });

        // Create a mock HTTP response with the new agent state
        let response = http_common::empty_response(
            http::Response::builder()
                .status(200)
                .header("datadog-agent-state", "new_state"),
        )
        .unwrap();

        // Use the trigger component to check the response
        response_observer.check_response(&response);

        // Wait for the fetch to complete
        const MAX_ATTEMPTS: u32 = 500;
        const SLEEP_DURATION_MS: u64 = 10;

        let mut attempts = 0;
        while mock.calls_async().await == 0 && attempts < MAX_ATTEMPTS {
            attempts += 1;
            tokio::time::sleep(Duration::from_millis(SLEEP_DURATION_MS)).await;
        }

        // Should trigger a fetch since the state is different
        mock.assert_calls_async(1).await;

        // Wait for the cache to be updated with proper timeout
        let mut attempts = 0;
        let expected_hash = calculate_hash(r#"{"version":"triggered"}"#);

        while attempts < MAX_ATTEMPTS {
            let updated_info = AGENT_INFO_CACHE.load();
            if let Some(info) = updated_info.as_ref() {
                if info.state_hash == expected_hash {
                    break;
                }
            }
            attempts += 1;
            tokio::time::sleep(Duration::from_millis(SLEEP_DURATION_MS)).await;
        }

        // Verify the cache was updated
        let updated_info = AGENT_INFO_CACHE.load();
        assert!(updated_info.is_some());
        assert_eq!(updated_info.as_ref().unwrap().state_hash, expected_hash);
        assert_eq!(
            updated_info
                .as_ref()
                .unwrap()
                .info
                .version
                .as_ref()
                .unwrap(),
            "triggered"
        );
    }

    #[cfg_attr(miri, ignore)]
    #[tokio::test]
    async fn test_agent_info_trigger_same_state() {
        let server = MockServer::start();
        let mock = server
            .mock_async(|when, then| {
                when.path("/info");
                then.status(200)
                    .header("content-type", "application/json")
                    .body(r#"{"version":"same"}"#);
            })
            .await;

        let same_json = r#"{"version":"same"}"#;
        let same_hash = calculate_hash(same_json);

        // Populate the cache with the same state
        AGENT_INFO_CACHE.store(Some(Arc::new(AgentInfo {
            state_hash: same_hash.clone(),
            info: serde_json::from_str(same_json).unwrap(),
        })));

        let endpoint = Endpoint::from_url(server.url("/info").parse().unwrap());
        let (mut fetcher, response_observer) =
            AgentInfoFetcher::new(endpoint, Duration::from_secs(3600)); // Very long interval

        tokio::spawn(async move {
            fetcher.run().await;
        });

        // Create a mock HTTP response with the same agent state
        let response = http_common::empty_response(
            http::Response::builder()
                .status(200)
                .header("datadog-agent-state", &same_hash),
        )
        .unwrap();

        // Use the trigger component to check the response
        response_observer.check_response(&response);

        // Wait to ensure no fetch occurs
        tokio::time::sleep(Duration::from_millis(500)).await;

        // Should not trigger a fetch since the state is the same
        mock.assert_calls_async(0).await;
    }
}