tracing_cloudwatch/export.rs

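//! Batch export of buffered log events to CloudWatch Logs.
//!
//! Events are queued as they arrive and flushed either when the configured
//! batch size is reached or when the flush interval elapses, whichever comes
//! first.
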
use std::fmt::Debug;
use std::num::NonZeroUsize;
use std::time::Duration;

use tokio::{sync::mpsc::UnboundedReceiver, time::interval};

use crate::{client::NoopClient, dispatch::LogEvent, CloudWatchClient};

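/// Configuration for the batch exporter: maximum batch size, flush interval,
/// and the destination log group/stream.
///
/// Illustrative builder-style usage (the values shown are placeholders):
///
/// ```ignore
/// let config = ExportConfig::default()
///     .with_batch_size(10usize)
///     .with_interval(std::time::Duration::from_secs(2))
///     .with_log_group_name("example-log-group")
///     .with_log_stream_name("example-log-stream");
/// ```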
#[derive(Debug, Clone)]
pub struct ExportConfig {
    batch_size: NonZeroUsize,
    interval: Duration,
    destination: LogDestination,
}

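/// The CloudWatch Logs log group and log stream that batches are written to.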
#[derive(Debug, Clone, Default)]
pub struct LogDestination {
    pub log_group_name: String,
    pub log_stream_name: String,
}

impl Default for ExportConfig {
    fn default() -> Self {
        Self {
            batch_size: NonZeroUsize::new(5).unwrap(),
            interval: Duration::from_secs(5),
            destination: LogDestination::default(),
        }
    }
}

impl ExportConfig {
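    /// Sets the maximum number of events collected before a batch is exported.
    ///
    /// Panics if `batch_size` cannot be converted into a `NonZeroUsize`
    /// (for example, if it is zero).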
    pub fn with_batch_size<T>(self, batch_size: T) -> Self
    where
        T: TryInto<NonZeroUsize>,
        <T as TryInto<NonZeroUsize>>::Error: Debug,
    {
        Self {
            batch_size: batch_size
                .try_into()
                .expect("batch size must be greater than or equal to 1"),
            ..self
        }
    }

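    /// Sets how often queued events are flushed even if the batch is not yet full.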
    pub fn with_interval(self, interval: Duration) -> Self {
        Self { interval, ..self }
    }

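    /// Sets the destination CloudWatch log group name.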
    pub fn with_log_group_name(self, log_group_name: impl Into<String>) -> Self {
        Self {
            destination: LogDestination {
                log_group_name: log_group_name.into(),
                log_stream_name: self.destination.log_stream_name,
            },
            ..self
        }
    }

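    /// Sets the destination CloudWatch log stream name.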
    pub fn with_log_stream_name(self, log_stream_name: impl Into<String>) -> Self {
        Self {
            destination: LogDestination {
                log_stream_name: log_stream_name.into(),
                log_group_name: self.destination.log_group_name,
            },
            ..self
        }
    }
}

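/// Buffers log events received over a channel and writes them to CloudWatch
/// in batches via the configured client.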
pub(crate) struct BatchExporter<C> {
    client: C,
    queue: Vec<LogEvent>,
    config: ExportConfig,
}

impl Default for BatchExporter<NoopClient> {
    fn default() -> Self {
        Self::new(NoopClient::new(), ExportConfig::default())
    }
}

impl<C> BatchExporter<C> {
    pub(crate) fn new(client: C, config: ExportConfig) -> Self {
        Self {
            client,
            config,
            queue: Vec::new(),
        }
    }
}

impl<C> BatchExporter<C>
where
    C: CloudWatchClient + Send + Sync + 'static,
{
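    /// Drives the export loop: drains `rx` into the queue and flushes whenever
    /// the queue reaches `batch_size` events or the flush interval elapses,
    /// whichever comes first. The loop exits when the sending half of the
    /// channel is dropped.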
    pub(crate) async fn run(self, mut rx: UnboundedReceiver<LogEvent>) {
        let BatchExporter {
            client,
            mut queue,
            config,
        } = self;

        let mut interval = interval(config.interval);

        loop {
            tokio::select! {
                _ = interval.tick() => {
                    if queue.is_empty() {
                        continue;
                    }
                }
                event = rx.recv() => {
                    let Some(event) = event else {
                        break;
                    };

                    queue.push(event);
                    if queue.len() < config.batch_size.get() {
                        continue;
                    }
                }
            }

            let logs: Vec<LogEvent> = Self::take_from_queue(&mut queue);

            if let Err(err) = client.put_logs(config.destination.clone(), logs).await {
                eprintln!(
                    "[tracing-cloudwatch] Unable to put logs to cloudwatch. Error: {err:?} {:?}",
                    config.destination
                );
            }
        }
    }

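    /// Drains the queue for export. With the `ordered_logs` feature enabled,
    /// events are sorted by timestamp first; otherwise they are returned in
    /// arrival order.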
    fn take_from_queue(queue: &mut Vec<LogEvent>) -> Vec<LogEvent> {
        if cfg!(feature = "ordered_logs") {
            let mut logs = std::mem::take(queue);
            logs.sort_by_key(|log| log.timestamp);
            logs
        } else {
            std::mem::take(queue)
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use chrono::{DateTime, Utc};

    const ONE_DAY_NS: i64 = 86_400_000_000_000;
    const DAY_ONE: DateTime<Utc> = DateTime::from_timestamp_nanos(0 + ONE_DAY_NS);
    const DAY_TWO: DateTime<Utc> = DateTime::from_timestamp_nanos(0 + (ONE_DAY_NS * 2));
    const DAY_THREE: DateTime<Utc> = DateTime::from_timestamp_nanos(0 + (ONE_DAY_NS * 3));

    #[cfg(not(feature = "ordered_logs"))]
    #[test]
    fn does_not_order_logs_by_default() {
        let mut unordered_queue = vec![
            LogEvent {
                message: "1".to_string(),
                timestamp: DAY_ONE,
            },
            LogEvent {
                message: "3".to_string(),
                timestamp: DAY_THREE,
            },
            LogEvent {
                message: "2".to_string(),
                timestamp: DAY_TWO,
            },
        ];
        let still_unordered_queue =
            BatchExporter::<NoopClient>::take_from_queue(&mut unordered_queue);

        let mut still_unordered_queue_iter = still_unordered_queue.iter();
        assert_eq!(
            DAY_ONE,
            still_unordered_queue_iter.next().unwrap().timestamp
        );
        assert_eq!(
            DAY_THREE,
            still_unordered_queue_iter.next().unwrap().timestamp
        );
        assert_eq!(
            DAY_TWO,
            still_unordered_queue_iter.next().unwrap().timestamp
        );
    }

    #[cfg(feature = "ordered_logs")]
    mod ordering {
        use super::*;

        fn assert_is_ordered(logs: Vec<LogEvent>) {
            let mut last_timestamp = DateTime::from_timestamp_nanos(0);

            for log in logs {
                assert!(
                    log.timestamp > last_timestamp,
                    "Not true: {} > {}",
                    log.timestamp,
                    last_timestamp
                );
                last_timestamp = log.timestamp;
            }
        }

        #[test]
        fn orders_logs_when_enabled() {
            let mut unordered_queue = vec![
                LogEvent {
                    message: "1".to_string(),
                    timestamp: DAY_ONE,
                },
                LogEvent {
                    message: "3".to_string(),
                    timestamp: DAY_THREE,
                },
                LogEvent {
                    message: "2".to_string(),
                    timestamp: DAY_TWO,
                },
            ];
            let ordered_queue = BatchExporter::<NoopClient>::take_from_queue(&mut unordered_queue);
            assert_is_ordered(ordered_queue);
        }
    }
}