flexi_logger_datadog/
writer.rs

1//! Writer task that posts data to the api
2
3use crate::error::Error::ChannelError;
4use crate::error::{log_error, Error};
5use crate::DataDogConfig;
6use chrono::{DateTime, Duration, Utc};
7use flume::RecvTimeoutError;
8use itertools::Itertools;
9use log::debug;
10use reqwest::header::CONTENT_TYPE;
11use reqwest::Client;
12use std::time;
13
/// Timeout, in milliseconds, for each blocking channel receive in the poll loop.
///
/// The log channel waits for this long per iteration; the flush-request
/// channel is polled with half of this value.
const POLL_TIMEOUT_MS: u64 = 100;
16
17/// API writer
18pub struct DataDogHttpWriter {
19    /// HTTP client
20    client: Client,
21    /// DataDog api url
22    api_host: String,
23    /// DataDog api key
24    api_key: String,
25    /// Query path
26    query: Vec<(String, String)>,
27    /// Maximum log lines in a single request
28    max_log_lines: usize,
29    /// Maximum allowed request size
30    max_payload_size: usize,
31    /// How often to flush writer (never if [`None`])
32    flush_interval: Option<Duration>,
33    /// When logs were last flushed
34    last_flushed: DateTime<Utc>,
35    /// Log receiver
36    logs: flume::Receiver<String>,
37    /// Flush request receiver
38    flush_request: flume::Receiver<()>,
39    /// Flush response sender
40    flush_response: flume::Sender<Result<(), Error>>,
41    /// Log buffer
42    buffer_lines: Vec<String>,
43    /// Size of buffer
44    buffer_size: usize,
45}
46
47impl DataDogHttpWriter {
48    /// Create new [`DataDogHttpWriter`]
49    pub fn new(
50        datadog_config: DataDogConfig,
51        flush_interval: Option<Duration>,
52        logs: flume::Receiver<String>,
53        flush_request: flume::Receiver<()>,
54        flush_response: flume::Sender<Result<(), Error>>,
55    ) -> Self {
56        let query = vec![
57            ("host".to_string(), datadog_config.hostname),
58            ("service".to_string(), datadog_config.service),
59            ("ddsource".to_string(), datadog_config.source),
60            (
61                "ddtags".to_string(),
62                datadog_config
63                    .tags
64                    .into_iter()
65                    .map(|(k, v)| format!("{}:{}", k, v))
66                    .join(","),
67            ),
68        ];
69        Self {
70            client: Client::default(),
71            api_host: datadog_config.api_host,
72            api_key: datadog_config.api_key,
73            query,
74            max_log_lines: datadog_config.max_log_lines,
75            max_payload_size: datadog_config.max_payload_size,
76            flush_interval,
77            last_flushed: Utc::now(),
78            logs,
79            flush_request,
80            flush_response,
81            buffer_lines: vec![],
82            buffer_size: 0,
83        }
84    }
85
86    /// Writer poll loop.
87    ///
88    /// This is what drives the actual execution of the logger
89    pub async fn poll(&mut self) {
90        let timeout = time::Duration::from_millis(POLL_TIMEOUT_MS);
91        loop {
92            // Check if a flush is necessary
93            if let Err(e) = self.time_based_flush().await {
94                log_error(e);
95            }
96
97            // Retrieve and handle any new log messages
98            match self.receive_logs(timeout).await {
99                Ok(true) => (),
100                Ok(false) => break,
101                Err(e) => log_error(e),
102            }
103
104            // Check for any flush requests
105            match self.receive_flush(timeout).await {
106                Ok(true) => (),
107                Ok(false) => break,
108                Err(e) => log_error(e),
109            }
110        }
111
112        // Loop has been exited here from one or all of the channels closing
113        // Drain and handle any remaining messages from the log channel and flush one last time
114        if let Err(e) = self.drain().await {
115            log_error(e);
116        }
117        if let Err(e) = self.flush().await {
118            log_error(e);
119        }
120    }
121
122    /// Receive and process any incoming log lines
123    async fn receive_logs(&mut self, timeout: time::Duration) -> Result<bool, Error> {
124        match self.logs.recv_timeout(timeout) {
125            Ok(l) => {
126                self.on_message(l).await;
127                self.check_flush().await?;
128                Ok(true)
129            }
130            Err(RecvTimeoutError::Timeout) => Ok(true),
131            Err(RecvTimeoutError::Disconnected) => Ok(false),
132        }
133    }
134
135    /// Receive and process any incoming flush requests
136    async fn receive_flush(&mut self, timeout: time::Duration) -> Result<bool, Error> {
137        match self.flush_request.recv_timeout(timeout / 2) {
138            Ok(_) => {
139                // On flush request, perform a flush and send the result back over the channel
140                self.drain().await?;
141                let flush_result = self.flush().await.map_err(|e| {
142                    eprintln!("Failed to flush logs: {}", e);
143                    e
144                });
145                self.flush_response
146                    .send(flush_result)
147                    .map_err(|e| ChannelError(format!("Failed to send flush response: {}", e)))?;
148                Ok(true)
149            }
150            Err(RecvTimeoutError::Timeout) => Ok(true),
151            Err(RecvTimeoutError::Disconnected) => Ok(false),
152        }
153    }
154
155    /// Handle incoming log line
156    async fn on_message(&mut self, message: String) {
157        self.buffer_size += message.as_bytes().len();
158        self.buffer_lines.push(message);
159    }
160
161    /// Flush log lines in buffer
162    async fn flush(&mut self) -> Result<(), Error> {
163        if self.buffer_size > 0 {
164            debug!("Flushing logger");
165            self.send().await?;
166            self.buffer_lines = vec![];
167            self.buffer_size = 0;
168            self.last_flushed = Utc::now();
169        }
170        Ok(())
171    }
172
173    /// Post data to api
174    async fn send(&mut self) -> Result<(), Error> {
175        debug!("Sending {} log lines", self.buffer_lines.len());
176        match self
177            .client
178            .post(&self.api_host)
179            .query(&self.query)
180            .header("DD-API-KEY", &self.api_key)
181            .header(CONTENT_TYPE, "text/plain")
182            .body(self.buffer_lines.join("\n"))
183            .send()
184            .await
185        {
186            Ok(r) => {
187                r.error_for_status()?;
188                Ok(())
189            }
190            Err(e) => Err(e.into()),
191        }
192    }
193
194    /// Check if flush interval has elapsed since last send, and flush if so
195    async fn time_based_flush(&mut self) -> Result<(), Error> {
196        if let Some(d) = self.flush_interval {
197            if Utc::now() > self.last_flushed + d {
198                self.flush().await?;
199            }
200        }
201        Ok(())
202    }
203
204    /// Drain and handle any messages on the log channel
205    async fn drain(&mut self) -> Result<(), Error> {
206        let drained = self.logs.drain().collect_vec();
207        for message in drained {
208            self.on_message(message).await;
209            self.check_flush().await?;
210        }
211        Ok(())
212    }
213
214    /// Check if buffer
215    async fn check_flush(&mut self) -> Result<(), Error> {
216        if self.buffer_lines.len() == self.max_log_lines {
217            self.flush().await
218        } else if self.buffer_size >= self.max_payload_size {
219            self.flush().await
220        } else {
221            Ok(())
222        }
223    }
224}