flexi_logger_datadog/
writer.rs1use crate::error::Error::ChannelError;
4use crate::error::{log_error, Error};
5use crate::DataDogConfig;
6use chrono::{DateTime, Duration, Utc};
7use flume::RecvTimeoutError;
8use itertools::Itertools;
9use log::debug;
10use reqwest::header::CONTENT_TYPE;
11use reqwest::Client;
12use std::time;
13
14const POLL_TIMEOUT_MS: u64 = 100;
16
17pub struct DataDogHttpWriter {
19 client: Client,
21 api_host: String,
23 api_key: String,
25 query: Vec<(String, String)>,
27 max_log_lines: usize,
29 max_payload_size: usize,
31 flush_interval: Option<Duration>,
33 last_flushed: DateTime<Utc>,
35 logs: flume::Receiver<String>,
37 flush_request: flume::Receiver<()>,
39 flush_response: flume::Sender<Result<(), Error>>,
41 buffer_lines: Vec<String>,
43 buffer_size: usize,
45}
46
47impl DataDogHttpWriter {
48 pub fn new(
50 datadog_config: DataDogConfig,
51 flush_interval: Option<Duration>,
52 logs: flume::Receiver<String>,
53 flush_request: flume::Receiver<()>,
54 flush_response: flume::Sender<Result<(), Error>>,
55 ) -> Self {
56 let query = vec![
57 ("host".to_string(), datadog_config.hostname),
58 ("service".to_string(), datadog_config.service),
59 ("ddsource".to_string(), datadog_config.source),
60 (
61 "ddtags".to_string(),
62 datadog_config
63 .tags
64 .into_iter()
65 .map(|(k, v)| format!("{}:{}", k, v))
66 .join(","),
67 ),
68 ];
69 Self {
70 client: Client::default(),
71 api_host: datadog_config.api_host,
72 api_key: datadog_config.api_key,
73 query,
74 max_log_lines: datadog_config.max_log_lines,
75 max_payload_size: datadog_config.max_payload_size,
76 flush_interval,
77 last_flushed: Utc::now(),
78 logs,
79 flush_request,
80 flush_response,
81 buffer_lines: vec![],
82 buffer_size: 0,
83 }
84 }
85
86 pub async fn poll(&mut self) {
90 let timeout = time::Duration::from_millis(POLL_TIMEOUT_MS);
91 loop {
92 if let Err(e) = self.time_based_flush().await {
94 log_error(e);
95 }
96
97 match self.receive_logs(timeout).await {
99 Ok(true) => (),
100 Ok(false) => break,
101 Err(e) => log_error(e),
102 }
103
104 match self.receive_flush(timeout).await {
106 Ok(true) => (),
107 Ok(false) => break,
108 Err(e) => log_error(e),
109 }
110 }
111
112 if let Err(e) = self.drain().await {
115 log_error(e);
116 }
117 if let Err(e) = self.flush().await {
118 log_error(e);
119 }
120 }
121
122 async fn receive_logs(&mut self, timeout: time::Duration) -> Result<bool, Error> {
124 match self.logs.recv_timeout(timeout) {
125 Ok(l) => {
126 self.on_message(l).await;
127 self.check_flush().await?;
128 Ok(true)
129 }
130 Err(RecvTimeoutError::Timeout) => Ok(true),
131 Err(RecvTimeoutError::Disconnected) => Ok(false),
132 }
133 }
134
135 async fn receive_flush(&mut self, timeout: time::Duration) -> Result<bool, Error> {
137 match self.flush_request.recv_timeout(timeout / 2) {
138 Ok(_) => {
139 self.drain().await?;
141 let flush_result = self.flush().await.map_err(|e| {
142 eprintln!("Failed to flush logs: {}", e);
143 e
144 });
145 self.flush_response
146 .send(flush_result)
147 .map_err(|e| ChannelError(format!("Failed to send flush response: {}", e)))?;
148 Ok(true)
149 }
150 Err(RecvTimeoutError::Timeout) => Ok(true),
151 Err(RecvTimeoutError::Disconnected) => Ok(false),
152 }
153 }
154
155 async fn on_message(&mut self, message: String) {
157 self.buffer_size += message.as_bytes().len();
158 self.buffer_lines.push(message);
159 }
160
161 async fn flush(&mut self) -> Result<(), Error> {
163 if self.buffer_size > 0 {
164 debug!("Flushing logger");
165 self.send().await?;
166 self.buffer_lines = vec![];
167 self.buffer_size = 0;
168 self.last_flushed = Utc::now();
169 }
170 Ok(())
171 }
172
173 async fn send(&mut self) -> Result<(), Error> {
175 debug!("Sending {} log lines", self.buffer_lines.len());
176 match self
177 .client
178 .post(&self.api_host)
179 .query(&self.query)
180 .header("DD-API-KEY", &self.api_key)
181 .header(CONTENT_TYPE, "text/plain")
182 .body(self.buffer_lines.join("\n"))
183 .send()
184 .await
185 {
186 Ok(r) => {
187 r.error_for_status()?;
188 Ok(())
189 }
190 Err(e) => Err(e.into()),
191 }
192 }
193
194 async fn time_based_flush(&mut self) -> Result<(), Error> {
196 if let Some(d) = self.flush_interval {
197 if Utc::now() > self.last_flushed + d {
198 self.flush().await?;
199 }
200 }
201 Ok(())
202 }
203
204 async fn drain(&mut self) -> Result<(), Error> {
206 let drained = self.logs.drain().collect_vec();
207 for message in drained {
208 self.on_message(message).await;
209 self.check_flush().await?;
210 }
211 Ok(())
212 }
213
214 async fn check_flush(&mut self) -> Result<(), Error> {
216 if self.buffer_lines.len() == self.max_log_lines {
217 self.flush().await
218 } else if self.buffer_size >= self.max_payload_size {
219 self.flush().await
220 } else {
221 Ok(())
222 }
223 }
224}