tracing_betterstack/
export.rs

1use std::time::Duration;
2use tokio::sync::mpsc::UnboundedReceiver;
3use tokio::time::interval;
4
5use crate::{
6    client::{BetterstackClientTrait, NoopBetterstackClient},
7    dispatch::LogEvent,
8};
9
/// Tuning knobs for batched log export.
///
/// The exporter sends a batch as soon as `batch_size` events are queued, and
/// additionally on every tick of a timer whose period is `interval`.
#[derive(Debug, Clone)]
pub struct ExportConfig {
    /// Number of queued events that triggers an immediate send.
    pub batch_size: usize,
    /// Period of the timer that sends whatever is currently queued.
    pub interval: Duration,
}

impl Default for ExportConfig {
    /// Defaults to batches of 100 events and a 5 second timer.
    fn default() -> Self {
        ExportConfig {
            batch_size: 100,
            interval: Duration::from_secs(5),
        }
    }
}

impl ExportConfig {
    /// Returns this configuration with `batch_size` replaced.
    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
        self.batch_size = batch_size;
        self
    }

    /// Returns this configuration with `interval` replaced.
    pub fn with_interval(mut self, interval: Duration) -> Self {
        self.interval = interval;
        self
    }
}
34
/// Data-less marker value that the exporter passes to the client's `put_logs`
/// call alongside each batch.
#[derive(Clone, Debug, Default)]
pub struct LogDestination;
37
38pub(crate) struct BatchExporter<C> {
39    client: C,
40    queue: Vec<LogEvent>,
41    config: ExportConfig,
42}
43
44impl Default for BatchExporter<NoopBetterstackClient> {
45    fn default() -> Self {
46        Self::new(NoopBetterstackClient::new(), ExportConfig::default())
47    }
48}
49
50impl<C> BatchExporter<C> {
51    pub(crate) fn new(client: C, config: ExportConfig) -> Self {
52        let queue = Vec::with_capacity(config.batch_size);
53        Self {
54            client,
55            config,
56            queue,
57        }
58    }
59}
60
61impl<C> BatchExporter<C>
62where
63    C: BetterstackClientTrait + Send + Sync + 'static,
64{
65    pub(crate) async fn run(mut self, mut rx: UnboundedReceiver<LogEvent>) {
66        let mut interval = interval(self.config.interval);
67
68        loop {
69            tokio::select! {
70                _ = interval.tick() => {
71                    if !self.queue.is_empty() {
72                        self.flush_queue().await;
73                    }
74                }
75                event = rx.recv() => {
76                    match event {
77                        Some(event) => {
78                            self.queue.push(event);
79                            if self.queue.len() >= self.config.batch_size {
80                                self.flush_queue().await;
81                            }
82                        }
83                        None => {
84                            // Channel closed, flush remaining events
85                            if !self.queue.is_empty() {
86                                self.flush_queue().await;
87                            }
88                            break;
89                        }
90                    }
91                }
92            }
93        }
94    }
95
96    async fn flush_queue(&mut self) {
97        if let Err(err) = self
98            .client
99            .put_logs(LogDestination, std::mem::take(&mut self.queue))
100            .await
101        {
102            eprintln!("[tracing-betterstack] Failed to send logs: {}", err);
103        }
104        self.queue.clear();
105        self.queue.reserve(self.config.batch_size);
106    }
107}
108
#[cfg(test)]
mod tests {
    use crate::client::BetterstackError;

    use super::*;
    use std::{
        future::Future,
        pin::Pin,
        sync::{Arc, Mutex},
    };
    use tokio::sync::mpsc;

    /// Client double that appends every batch it is given to a shared vector,
    /// so tests can inspect what the exporter sent.
    struct TestClient {
        received_logs: Arc<Mutex<Vec<LogEvent>>>,
    }

    impl TestClient {
        fn new() -> Self {
            Self {
                received_logs: Arc::new(Mutex::new(Vec::new())),
            }
        }
    }

    impl BetterstackClientTrait for TestClient {
        fn put_logs<'a>(
            &'a self,
            _: LogDestination,
            logs: Vec<LogEvent>,
        ) -> Pin<Box<dyn Future<Output = Result<(), BetterstackError>> + Send + 'a>> {
            let received_logs = self.received_logs.clone();
            Box::pin(async move {
                received_logs.lock().unwrap().extend(logs);
                Ok(())
            })
        }
    }

    /// Reaching `batch_size` must trigger a send without waiting for the timer.
    #[tokio::test]
    async fn test_batch_exporter_sends_on_full_batch() {
        let client = TestClient::new();
        let received_logs = client.received_logs.clone();
        let config = ExportConfig {
            batch_size: 2,
            interval: Duration::from_secs(5),
        };

        let (tx, rx) = mpsc::unbounded_channel();
        let exporter = BatchExporter::new(client, config);

        let handle = tokio::spawn(exporter.run(rx));

        // Send exactly one batch worth of events.
        tx.send(LogEvent::new("test1".into())).unwrap();
        tx.send(LogEvent::new("test2".into())).unwrap();

        // Give the exporter task time to process them.
        tokio::time::sleep(Duration::from_millis(100)).await;

        // Scope the guard so the lock is released before awaiting the task;
        // holding it across `.await` could deadlock if the exporter flushed
        // again during shutdown.
        {
            let logs = received_logs.lock().unwrap();
            assert_eq!(logs.len(), 2);
            assert_eq!(logs[0].message, "test1");
            assert_eq!(logs[1].message, "test2");
        }

        // Cleanup
        drop(tx);
        let _ = handle.await;
    }

    /// A partially filled queue must be sent when the timer fires.
    #[tokio::test]
    async fn test_batch_exporter_sends_on_interval() {
        let client = TestClient::new();
        let received_logs = client.received_logs.clone();
        let config = ExportConfig {
            batch_size: 10,                       // Larger than what we'll send
            interval: Duration::from_millis(100), // Short interval for testing
        };

        let (tx, rx) = mpsc::unbounded_channel();
        let exporter = BatchExporter::new(client, config);

        let handle = tokio::spawn(exporter.run(rx));

        // Send one event, fewer than a full batch.
        tx.send(LogEvent::new("test".into())).unwrap();

        // Wait for the interval to trigger.
        tokio::time::sleep(Duration::from_millis(150)).await;

        // Release the lock before awaiting the exporter task below.
        {
            let logs = received_logs.lock().unwrap();
            assert_eq!(logs.len(), 1);
            assert_eq!(logs[0].message, "test");
        }

        // Cleanup
        drop(tx);
        let _ = handle.await;
    }

    /// Closing the channel must send whatever is still queued.
    #[tokio::test]
    async fn test_batch_exporter_flushes_on_drop() {
        let client = TestClient::new();
        let received_logs = client.received_logs.clone();
        let config = ExportConfig {
            batch_size: 10,
            interval: Duration::from_secs(5),
        };

        let (tx, rx) = mpsc::unbounded_channel();
        let exporter = BatchExporter::new(client, config);

        let handle = tokio::spawn(exporter.run(rx));

        // Send an event
        tx.send(LogEvent::new("test".into())).unwrap();

        // Drop the sender to trigger the final flush, then wait for the task.
        drop(tx);
        let _ = handle.await;

        // Check that logs were flushed
        let logs = received_logs.lock().unwrap();
        assert_eq!(logs.len(), 1);
        assert_eq!(logs[0].message, "test");
    }

    /// The builder methods must override only the field they name.
    #[test]
    fn test_export_config() {
        let config = ExportConfig::default()
            .with_batch_size(50)
            .with_interval(Duration::from_secs(10));

        assert_eq!(config.batch_size, 50);
        assert_eq!(config.interval, Duration::from_secs(10));
    }
}