tracing_cloudwatch/
export.rs

1use std::fmt::Debug;
2use std::num::NonZeroUsize;
3use std::time::Duration;
4
5use tokio::{sync::mpsc::UnboundedReceiver, time::interval};
6
7use crate::{client::NoopClient, dispatch::LogEvent, CloudWatchClient};
8
9/// Configurations to control the behavior of exporting logs to CloudWatch.
10#[derive(Debug, Clone)]
11pub struct ExportConfig {
12    /// The number of logs to retain in the buffer within the interval period.
13    batch_size: NonZeroUsize,
14    /// The interval for putting logs.
15    interval: Duration,
16    /// To which logs sended.
17    destination: LogDestination,
18}
19
/// The CloudWatch log group and log stream that logs are sent to.
///
/// The [`Default`] value has both names empty.
#[derive(Debug, Clone, Default)]
pub struct LogDestination {
    /// The name of the log group.
    pub log_group_name: String,
    /// The name of the log stream.
    pub log_stream_name: String,
}
28
29impl Default for ExportConfig {
30    fn default() -> Self {
31        Self {
32            batch_size: NonZeroUsize::new(5).unwrap(),
33            interval: Duration::from_secs(5),
34            destination: LogDestination::default(),
35        }
36    }
37}
38
39impl ExportConfig {
40    /// Set batch size.
41    pub fn with_batch_size<T>(self, batch_size: T) -> Self
42    where
43        T: TryInto<NonZeroUsize>,
44        <T as TryInto<NonZeroUsize>>::Error: Debug,
45    {
46        Self {
47            batch_size: batch_size
48                .try_into()
49                .expect("batch size must be greater than or equal to 1"),
50            ..self
51        }
52    }
53
54    /// Set export interval.
55    pub fn with_interval(self, interval: Duration) -> Self {
56        Self { interval, ..self }
57    }
58
59    /// Set log group name.
60    pub fn with_log_group_name(self, log_group_name: impl Into<String>) -> Self {
61        Self {
62            destination: LogDestination {
63                log_group_name: log_group_name.into(),
64                log_stream_name: self.destination.log_stream_name,
65            },
66            ..self
67        }
68    }
69
70    /// Set log stream name.
71    pub fn with_log_stream_name(self, log_stream_name: impl Into<String>) -> Self {
72        Self {
73            destination: LogDestination {
74                log_stream_name: log_stream_name.into(),
75                log_group_name: self.destination.log_group_name,
76            },
77            ..self
78        }
79    }
80}
81
82pub(crate) struct BatchExporter<C> {
83    client: C,
84    queue: Vec<LogEvent>,
85    config: ExportConfig,
86}
87
88impl Default for BatchExporter<NoopClient> {
89    fn default() -> Self {
90        Self::new(NoopClient::new(), ExportConfig::default())
91    }
92}
93
94impl<C> BatchExporter<C> {
95    pub(crate) fn new(client: C, config: ExportConfig) -> Self {
96        Self {
97            client,
98            config,
99            queue: Vec::new(),
100        }
101    }
102}
103
104impl<C> BatchExporter<C>
105where
106    C: CloudWatchClient + Send + Sync + 'static,
107{
108    pub(crate) async fn run(self, mut rx: UnboundedReceiver<LogEvent>) {
109        let BatchExporter {
110            client,
111            mut queue,
112            config,
113        } = self;
114
115        let mut interval = interval(config.interval);
116
117        loop {
118            tokio::select! {
119                 _ = interval.tick() => {
120                    if queue.is_empty() {
121                        continue;
122                    }
123                }
124                event = rx.recv() => {
125                    let Some(event) = event else {
126                        break;
127                    };
128
129                    queue.push(event);
130                    if queue.len() < <NonZeroUsize as Into<usize>>::into(config.batch_size) {
131                        continue
132                    }
133                }
134            }
135
136            let logs: Vec<LogEvent> = Self::take_from_queue(&mut queue);
137
138            if let Err(err) = client.put_logs(config.destination.clone(), logs).await {
139                eprintln!(
140                    "[tracing-cloudwatch] Unable to put logs to cloudwatch. Error: {err:?} {:?}",
141                    config.destination
142                );
143            }
144        }
145    }
146
147    fn take_from_queue(queue: &mut Vec<LogEvent>) -> Vec<LogEvent> {
148        if cfg!(feature = "ordered_logs") {
149            let mut logs = std::mem::take(queue);
150            logs.sort_by_key(|log| log.timestamp);
151            logs
152        } else {
153            std::mem::take(queue)
154        }
155    }
156}
157
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::{DateTime, Utc};

    const ONE_DAY_NS: i64 = 86_400_000_000_000;
    const DAY_ONE: DateTime<Utc> = DateTime::from_timestamp_nanos(ONE_DAY_NS);
    const DAY_TWO: DateTime<Utc> = DateTime::from_timestamp_nanos(ONE_DAY_NS * 2);
    const DAY_THREE: DateTime<Utc> = DateTime::from_timestamp_nanos(ONE_DAY_NS * 3);

    /// Events for day one, three and two: deliberately not chronological.
    fn unordered_events() -> Vec<LogEvent> {
        vec![
            LogEvent {
                message: "1".to_string(),
                timestamp: DAY_ONE,
            },
            LogEvent {
                message: "3".to_string(),
                timestamp: DAY_THREE,
            },
            LogEvent {
                message: "2".to_string(),
                timestamp: DAY_TWO,
            },
        ]
    }

    /// Without the `ordered_logs` feature the insertion order must survive.
    #[cfg(not(feature = "ordered_logs"))]
    #[test]
    fn does_not_order_logs_by_default() {
        let mut unordered_queue = unordered_events();
        let still_unordered_queue =
            BatchExporter::<NoopClient>::take_from_queue(&mut unordered_queue);

        // Taking from the queue drains it.
        assert!(unordered_queue.is_empty());

        let timestamps: Vec<DateTime<Utc>> = still_unordered_queue
            .iter()
            .map(|log| log.timestamp)
            .collect();
        assert_eq!(vec![DAY_ONE, DAY_THREE, DAY_TWO], timestamps);
    }

    #[cfg(feature = "ordered_logs")]
    mod ordering {
        use super::*;

        /// Asserts that timestamps are strictly increasing, starting after
        /// the Unix epoch.
        fn assert_is_ordered(logs: Vec<LogEvent>) {
            let mut last_timestamp = DateTime::from_timestamp_nanos(0);

            for log in logs {
                assert!(
                    log.timestamp > last_timestamp,
                    "Not true: {} > {}",
                    log.timestamp,
                    last_timestamp
                );
                last_timestamp = log.timestamp;
            }
        }

        /// With the `ordered_logs` feature the events come back sorted.
        #[test]
        fn orders_logs_when_enabled() {
            let mut unordered_queue = unordered_events();
            let ordered_queue = BatchExporter::<NoopClient>::take_from_queue(&mut unordered_queue);
            assert_is_ordered(ordered_queue);
        }
    }
}