tracing_betterstack/
export.rs1use std::time::Duration;
2use tokio::sync::mpsc::UnboundedReceiver;
3use tokio::time::interval;
4
5use crate::{
6 client::{BetterstackClientTrait, NoopBetterstackClient},
7 dispatch::LogEvent,
8};
9
/// Tuning knobs for batched log export.
///
/// The batch exporter in this module flushes as soon as `batch_size` events
/// are queued, and additionally flushes any non-empty queue once per
/// `interval`.
#[derive(Clone, Debug)]
pub struct ExportConfig {
    /// Number of queued events that triggers an immediate flush; also used
    /// as the initial capacity of the exporter's queue.
    pub batch_size: usize,
    /// Period of the timer that flushes whatever is currently queued.
    pub interval: Duration,
}
15
16impl Default for ExportConfig {
17 fn default() -> Self {
18 Self {
19 batch_size: 100,
20 interval: Duration::from_secs(5),
21 }
22 }
23}
24
25impl ExportConfig {
26 pub fn with_batch_size(self, batch_size: usize) -> Self {
27 Self { batch_size, ..self }
28 }
29
30 pub fn with_interval(self, interval: Duration) -> Self {
31 Self { interval, ..self }
32 }
33}
34
/// Unit marker passed as the destination argument of the client's
/// `put_logs` call.
///
/// It has no fields, so every value of this type is interchangeable.
#[derive(Clone, Debug, Default)]
pub struct LogDestination;
37
38pub(crate) struct BatchExporter<C> {
39 client: C,
40 queue: Vec<LogEvent>,
41 config: ExportConfig,
42}
43
44impl Default for BatchExporter<NoopBetterstackClient> {
45 fn default() -> Self {
46 Self::new(NoopBetterstackClient::new(), ExportConfig::default())
47 }
48}
49
50impl<C> BatchExporter<C> {
51 pub(crate) fn new(client: C, config: ExportConfig) -> Self {
52 let queue = Vec::with_capacity(config.batch_size);
53 Self {
54 client,
55 config,
56 queue,
57 }
58 }
59}
60
61impl<C> BatchExporter<C>
62where
63 C: BetterstackClientTrait + Send + Sync + 'static,
64{
65 pub(crate) async fn run(mut self, mut rx: UnboundedReceiver<LogEvent>) {
66 let mut interval = interval(self.config.interval);
67
68 loop {
69 tokio::select! {
70 _ = interval.tick() => {
71 if !self.queue.is_empty() {
72 self.flush_queue().await;
73 }
74 }
75 event = rx.recv() => {
76 match event {
77 Some(event) => {
78 self.queue.push(event);
79 if self.queue.len() >= self.config.batch_size {
80 self.flush_queue().await;
81 }
82 }
83 None => {
84 if !self.queue.is_empty() {
86 self.flush_queue().await;
87 }
88 break;
89 }
90 }
91 }
92 }
93 }
94 }
95
96 async fn flush_queue(&mut self) {
97 if let Err(err) = self
98 .client
99 .put_logs(LogDestination, std::mem::take(&mut self.queue))
100 .await
101 {
102 eprintln!("[tracing-betterstack] Failed to send logs: {}", err);
103 }
104 self.queue.clear();
105 self.queue.reserve(self.config.batch_size);
106 }
107}
108
#[cfg(test)]
mod tests {
    use crate::client::BetterstackError;

    use super::*;
    use std::{
        future::Future,
        pin::Pin,
        sync::{Arc, Mutex},
    };
    use tokio::sync::mpsc;

    /// Client double that records every event passed to `put_logs`.
    struct TestClient {
        received_logs: Arc<Mutex<Vec<LogEvent>>>,
    }

    impl TestClient {
        fn new() -> Self {
            Self {
                received_logs: Arc::new(Mutex::new(Vec::new())),
            }
        }
    }

    impl BetterstackClientTrait for TestClient {
        fn put_logs<'a>(
            &'a self,
            _: LogDestination,
            logs: Vec<LogEvent>,
        ) -> Pin<Box<dyn Future<Output = Result<(), BetterstackError>> + Send + 'a>> {
            let received_logs = self.received_logs.clone();
            Box::pin(async move {
                received_logs.lock().unwrap().extend(logs);
                Ok(())
            })
        }
    }

    #[tokio::test]
    async fn test_batch_exporter_sends_on_full_batch() {
        let client = TestClient::new();
        let received_logs = client.received_logs.clone();
        let config = ExportConfig {
            batch_size: 2,
            interval: Duration::from_secs(5),
        };

        let (tx, rx) = mpsc::unbounded_channel();
        let exporter = BatchExporter::new(client, config);

        let handle = tokio::spawn(exporter.run(rx));

        let event1 = LogEvent::new("test1".into());
        let event2 = LogEvent::new("test2".into());
        tx.send(event1).unwrap();
        tx.send(event2).unwrap();

        tokio::time::sleep(Duration::from_millis(100)).await;

        // Keep the guard in its own scope: holding a `std::sync::Mutex`
        // guard across the `.await` below could deadlock the exporter task.
        {
            let logs = received_logs.lock().unwrap();
            assert_eq!(logs.len(), 2);
            assert_eq!(logs[0].message, "test1");
            assert_eq!(logs[1].message, "test2");
        }

        drop(tx);
        handle.await.expect("exporter task panicked");
    }

    #[tokio::test]
    async fn test_batch_exporter_sends_on_interval() {
        let client = TestClient::new();
        let received_logs = client.received_logs.clone();
        let config = ExportConfig {
            batch_size: 10,
            interval: Duration::from_millis(100),
        };

        let (tx, rx) = mpsc::unbounded_channel();
        let exporter = BatchExporter::new(client, config);

        let handle = tokio::spawn(exporter.run(rx));

        let event = LogEvent::new("test".into());
        tx.send(event).unwrap();

        tokio::time::sleep(Duration::from_millis(150)).await;

        // Release the lock before awaiting the exporter's shutdown.
        {
            let logs = received_logs.lock().unwrap();
            assert_eq!(logs.len(), 1);
            assert_eq!(logs[0].message, "test");
        }

        drop(tx);
        handle.await.expect("exporter task panicked");
    }

    #[tokio::test]
    async fn test_batch_exporter_flushes_on_drop() {
        let client = TestClient::new();
        let received_logs = client.received_logs.clone();
        let config = ExportConfig {
            batch_size: 10,
            interval: Duration::from_secs(5),
        };

        let (tx, rx) = mpsc::unbounded_channel();
        let exporter = BatchExporter::new(client, config);

        let handle = tokio::spawn(exporter.run(rx));

        let event = LogEvent::new("test".into());
        tx.send(event).unwrap();

        // Closing the channel must trigger the final flush.
        drop(tx);
        handle.await.expect("exporter task panicked");

        let logs = received_logs.lock().unwrap();
        assert_eq!(logs.len(), 1);
        assert_eq!(logs[0].message, "test");
    }

    #[test]
    fn test_export_config() {
        let config = ExportConfig::default()
            .with_batch_size(50)
            .with_interval(Duration::from_secs(10));

        assert_eq!(config.batch_size, 50);
        assert_eq!(config.interval, Duration::from_secs(10));
    }
}