cloudwatch_logs_upload/lib.rs

//! Rust library for uploading events to AWS CloudWatch Logs.
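//!
//! A minimal usage sketch (the region is an example, and the log
//! group and stream names are placeholders that must already exist):
//!
//! ```no_run
//! use cloudwatch_logs_upload::{
//!     get_current_timestamp, BatchUploader, UploadTarget,
//! };
//! use rusoto_core::Region;
//! use rusoto_logs::{CloudWatchLogsClient, InputLogEvent};
//!
//! let uploader = BatchUploader::new(
//!     CloudWatchLogsClient::new(Region::UsEast1),
//!     UploadTarget {
//!         group: "my-log-group".to_string(),
//!         stream: "my-log-stream".to_string(),
//!     },
//! );
//! // Spawn the thread that periodically uploads queued batches.
//! uploader.start_background_thread().unwrap();
//! uploader
//!     .add_event(InputLogEvent {
//!         message: "hello".to_string(),
//!         timestamp: get_current_timestamp(),
//!     })
//!     .unwrap();
//! ```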

#![warn(missing_docs)]

use fehler::{throw, throws};
use log::error;
use rusoto_core::RusotoError;
use rusoto_logs::{
    CloudWatchLogs, CloudWatchLogsClient, DescribeLogStreamsError,
    DescribeLogStreamsRequest, InputLogEvent, PutLogEventsError,
    PutLogEventsRequest,
};
use std::{
    io,
    sync::{Arc, Mutex},
    thread,
    time::{Duration, SystemTime},
};

/// The maximum number of log events in a batch is 10,000.
pub const MAX_EVENTS_IN_BATCH: usize = 10_000;

/// The maximum batch size is 1,048,576 bytes, and this size is
/// calculated as the sum of all event messages in UTF-8, plus 26
/// bytes for each log event.
pub const MAX_BATCH_SIZE: usize = 1_048_576;

/// The number of bytes of overhead added per event when computing
/// the batch size: each event counts as its message length in UTF-8
/// plus 26 bytes.
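///
/// For example, a 10-byte message counts as 10 + 26 = 36 bytes
/// toward [`MAX_BATCH_SIZE`]:
///
/// ```
/// use cloudwatch_logs_upload::EVENT_OVERHEAD;
///
/// let message = "0123456789";
/// assert_eq!(message.len() + EVENT_OVERHEAD, 36);
/// ```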
pub const EVENT_OVERHEAD: usize = 26;

/// The time the event occurred, expressed as the number of
/// milliseconds after Jan 1, 1970 00:00:00 UTC.
pub type Timestamp = i64;

/// A batch of log events in a single request cannot span more than 24
/// hours. This constant is in milliseconds.
pub const MAX_DURATION_MILLIS: i64 = 24 * 60 * 60 * 1000;

/// Get the current timestamp. Returns 0 if the current time is
/// before the Unix epoch.
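///
/// ```
/// let now = cloudwatch_logs_upload::get_current_timestamp();
/// // Any clock at or after the Unix epoch yields a non-negative value.
/// assert!(now >= 0);
/// ```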
pub fn get_current_timestamp() -> Timestamp {
    if let Ok(duration) =
        SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)
    {
        duration.as_millis() as Timestamp
    } else {
        0
    }
}

/// Unified error type for upload operations.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// Event message string is larger than 1,048,550 bytes (the
    /// maximum batch size minus the per-event overhead). Contains the
    /// computed size of the rejected event, including overhead.
    #[error("event exceeds the max batch size")]
    EventTooLarge(usize),

    /// An API error was returned when trying to upload a batch.
    #[error("failed to upload log batch: {0}")]
    PutLogsError(#[from] RusotoError<PutLogEventsError>),

    /// An API error was returned when trying to get the log stream
    /// upload sequence token.
    #[error("failed to get sequence token: {0}")]
    SequenceTokenError(#[from] RusotoError<DescribeLogStreamsError>),

    /// The log stream does not exist.
    #[error("invalid log stream")]
    InvalidLogStream,

    /// An internal mutex error occurred.
    #[error("failed to lock the mutex")]
    PoisonedLock,

    /// The background upload thread can only be started once.
    #[error("upload thread already started")]
    ThreadAlreadyStarted,

    /// Failed to start the background upload thread.
    #[error("failed to spawn thread: {0}")]
    SpawnError(io::Error),
}

/// An inclusive range of timestamps.
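///
/// A small sketch of how a range grows (the timestamps are
/// arbitrary):
///
/// ```
/// use cloudwatch_logs_upload::TimestampRange;
///
/// let mut range = TimestampRange::new(100);
/// range.expand_to_include(50);
/// range.expand_to_include(200);
/// assert_eq!(range.start, 50);
/// assert_eq!(range.end, 200);
/// assert_eq!(range.duration_in_millis(), 150);
/// ```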
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct TimestampRange {
    /// Start time (inclusive).
    pub start: Timestamp,
    /// End time (inclusive).
    pub end: Timestamp,
}

impl TimestampRange {
    /// Create a range for a single instant in time.
    pub fn new(t: Timestamp) -> TimestampRange {
        TimestampRange { start: t, end: t }
    }

    /// Time in milliseconds between start and end.
    pub fn duration_in_millis(&self) -> i64 {
        self.end - self.start
    }

    /// Adjust start and end as needed to include `t`.
    pub fn expand_to_include(&mut self, t: Timestamp) {
        if t < self.start {
            self.start = t;
        }
        if t > self.end {
            self.end = t;
        }
    }

    /// Create a new TimestampRange with start and end adjusted as
    /// needed to include `t`.
    pub fn expand_to_include_copy(&self, t: Timestamp) -> TimestampRange {
        let mut copy = *self;
        copy.expand_to_include(t);
        copy
    }
}

/// Internal interface for creating batches of events.
///
/// This shouldn't normally need to be used directly, but it is
/// exposed in case the `BatchUploader` interface doesn't behave quite
/// the way you want; you might still find value in using
/// `QueuedBatches` to handle the various batch upload limits.
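///
/// A minimal sketch of direct use (the message and timestamp are
/// placeholders):
///
/// ```
/// use cloudwatch_logs_upload::QueuedBatches;
/// use rusoto_logs::InputLogEvent;
///
/// let mut queued = QueuedBatches::default();
/// queued
///     .add_event(InputLogEvent {
///         message: "hello".to_string(),
///         timestamp: 0,
///     })
///     .unwrap();
/// ```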
#[derive(Default)]
pub struct QueuedBatches {
    /// Queued batches that haven't been sent yet.
    ///
    /// Box the batch so that adding and removing elements from the
    /// batches vector is cheap.
    ///
    /// Events are queued up in a vector that is not necessarily
    /// sorted. CloudWatch Logs requires events to be sorted by
    /// timestamp, but in practice, if a lot of events come in from
    /// different threads, they will arrive out of order due to
    /// waiting on a shared lock. So if you try to keep the events in
    /// each batch in order, and start a new batch every time an
    /// out-of-order event is received, the batches end up being very
    /// small.
    #[allow(clippy::vec_box)]
    batches: Vec<Box<Vec<InputLogEvent>>>,

    /// Total size of the batch at the end of the batches vector.
    current_batch_size: usize,

    /// Oldest and newest timestamps of events in the batch at the end
    /// of the batches vector. This is used to ensure that a batch
    /// does not exceed the 24-hour limit.
    current_batch_time_range: TimestampRange,
}

impl QueuedBatches {
    /// Add a new event.
    ///
    /// There are a couple of AWS limits not enforced yet:
    ///
    /// - None of the log events in the batch can be more than 2 hours
    ///   in the future
    ///
    /// - None of the log events in the batch can be older than 14 days
    ///   or older than the retention period of the log group
    #[throws]
    pub fn add_event(&mut self, event: InputLogEvent) {
        let event_size = event.message.len() + EVENT_OVERHEAD;
        if event_size > MAX_BATCH_SIZE {
            // The event is bigger than the maximum batch size
            throw!(Error::EventTooLarge(event_size));
        }

        if self.is_new_batch_needed(&event, event_size) {
            self.batches.push(Box::new(Vec::new()));
            self.current_batch_size = 0;
            self.current_batch_time_range =
                TimestampRange::new(event.timestamp);
        }

        self.current_batch_size += event_size;
        self.current_batch_time_range
            .expand_to_include(event.timestamp);
        // Ok to unwrap here, the code above ensures there is at least
        // one available batch
        let batch = self.batches.last_mut().unwrap();
        batch.push(event);
    }

    fn is_new_batch_needed(
        &self,
        event: &InputLogEvent,
        event_size: usize,
    ) -> bool {
        // Ensure there's at least one batch
        let batch = if let Some(batch) = self.batches.last() {
            batch
        } else {
            return true;
        };

        // Check if maximum number of events exceeded
        if batch.len() >= MAX_EVENTS_IN_BATCH {
            return true;
        }

        // Check if maximum payload size exceeded
        if self.current_batch_size + event_size > MAX_BATCH_SIZE {
            return true;
        }

        // Check if 24-hour limit is exceeded
        if !batch.is_empty() {
            let new_range = self
                .current_batch_time_range
                .expand_to_include_copy(event.timestamp);
            if new_range.duration_in_millis() > MAX_DURATION_MILLIS {
                return true;
            }
        }

        false
    }
}

/// Log group and stream names.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct UploadTarget {
    /// Log group name.
    pub group: String,
    /// Log stream name.
    pub stream: String,
}

struct BatchUploaderInternal {
    target: UploadTarget,

    queued_batches: QueuedBatches,

    client: CloudWatchLogsClient,
    next_sequence_token: Option<String>,

    thread_started: bool,
}

impl BatchUploaderInternal {
    #[throws]
    fn refresh_sequence_token(&mut self) {
        let resp = self
            .client
            .describe_log_streams(DescribeLogStreamsRequest {
                limit: Some(1),
                order_by: Some("LogStreamName".into()),
                log_group_name: self.target.group.clone(),
                log_stream_name_prefix: Some(self.target.stream.clone()),
                ..Default::default()
            })
            .sync()?;
        let log_streams = resp.log_streams.ok_or(Error::InvalidLogStream)?;
        // TODO: need to verify that this is correct if you have
        // multiple log streams with the same prefix. I *think* it's
        // good because we are sorting by stream name and limiting to
        // 1, so as long as "myPrefix" gets sorted before
        // "myPrefixAndOtherStuff" this should be correct. Needs
        // testing though.
        let log_stream = log_streams.first().ok_or(Error::InvalidLogStream)?;
        if Some(self.target.stream.clone()) != log_stream.log_stream_name {
            // This should never happen
            error!(
                "log stream name {} != {:?}",
                self.target.stream, log_stream.log_stream_name
            );
            throw!(Error::InvalidLogStream);
        }
        self.next_sequence_token = log_stream.upload_sequence_token.clone();
    }

    #[throws]
    fn upload_batch(&mut self) {
        // Take the oldest batch. Taking from the front (rather than
        // popping from the back) keeps uploads in roughly
        // chronological order, and avoids removing the batch at the
        // end of the vector, whose size and time range are still
        // being tracked by `QueuedBatches`.
        if self.queued_batches.batches.is_empty() {
            return;
        }
        let mut batch = *self.queued_batches.batches.remove(0);

        // Refresh the sequence token if necessary. This actually
        // isn't needed right after creating the log stream, so
        // there's an unnecessary fetch here the first time, but
        // that's probably fine.
        if self.next_sequence_token.is_none() {
            self.refresh_sequence_token()?;
        }

        // The events must be sorted by timestamp
        batch.sort_unstable_by_key(|event| event.timestamp);

        let req = PutLogEventsRequest {
            log_events: batch,
            sequence_token: self.next_sequence_token.clone(),
            log_group_name: self.target.group.clone(),
            log_stream_name: self.target.stream.clone(),
        };

        match self.client.put_log_events(req).sync() {
            Ok(resp) => {
                self.next_sequence_token = resp.next_sequence_token;

                // TODO: handle rejected events
            }
            Err(err) => {
                // TODO: if the batch upload failed, consider putting
                // the batch back.

                // Clear the sequence token so that it gets refreshed
                // next time
                self.next_sequence_token = None;

                throw!(err);
            }
        }
    }
}

/// Main interface for uploading logs in batches to AWS CloudWatch Logs.
///
/// This can be safely used from multiple threads by cloning it.
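///
/// A sketch of sharing one uploader across threads (the region and
/// names are placeholders):
///
/// ```no_run
/// use cloudwatch_logs_upload::{BatchUploader, UploadTarget};
/// use rusoto_core::Region;
/// use rusoto_logs::{CloudWatchLogsClient, InputLogEvent};
/// use std::thread;
///
/// let uploader = BatchUploader::new(
///     CloudWatchLogsClient::new(Region::UsEast1),
///     UploadTarget {
///         group: "my-log-group".to_string(),
///         stream: "my-log-stream".to_string(),
///     },
/// );
/// let worker = uploader.clone();
/// thread::spawn(move || {
///     worker
///         .add_event(InputLogEvent {
///             message: "from another thread".to_string(),
///             timestamp: 0,
///         })
///         .unwrap();
/// });
/// ```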
#[derive(Clone)]
pub struct BatchUploader {
    internal: Arc<Mutex<BatchUploaderInternal>>,
}

impl BatchUploader {
    /// Create a new `BatchUploader`.
    pub fn new(
        client: CloudWatchLogsClient,
        target: UploadTarget,
    ) -> BatchUploader {
        BatchUploader {
            internal: Arc::new(Mutex::new(BatchUploaderInternal {
                target,
                client,
                queued_batches: QueuedBatches::default(),
                next_sequence_token: None,
                thread_started: false,
            })),
        }
    }

    /// Add a new event.
    ///
    /// There are a couple of AWS limits not enforced yet:
    ///
    /// - None of the log events in the batch can be more than 2 hours
    ///   in the future
    ///
    /// - None of the log events in the batch can be older than 14 days
    ///   or older than the retention period of the log group
    #[throws]
    pub fn add_event(&self, event: InputLogEvent) {
        let mut guard =
            self.internal.lock().map_err(|_| Error::PoisonedLock)?;
        guard.queued_batches.add_event(event)?;
    }

    /// Start a background thread for uploading batches of events.
    #[throws]
    pub fn start_background_thread(&self) -> thread::JoinHandle<()> {
        let mut guard =
            self.internal.lock().map_err(|_| Error::PoisonedLock)?;
        // Prevent multiple upload threads from being started
        if guard.thread_started {
            throw!(Error::ThreadAlreadyStarted);
        }
        guard.thread_started = true;

        let builder =
            thread::Builder::new().name("cloudwatch-logs-upload".into());
        let internal = self.internal.clone();
        let handle = builder
            .spawn(move || loop {
                if let Ok(mut guard) = internal.lock() {
                    // There is a quota of 5 requests per second per log
                    // stream, so upload up to five batches, then sleep
                    // for at least one second.
                    for _ in 0..5 {
                        if let Err(err) = guard.upload_batch() {
                            error!(
                                "CloudWatch Logs batch upload failed: {}",
                                err
                            );
                        }
                    }
                } else {
                    error!(
                        "CloudWatch Logs upload thread: failed to lock mutex"
                    );
                }
                thread::sleep(Duration::from_secs(1));
            })
            .map_err(Error::SpawnError)?;
        handle
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_event_too_large() {
        let mut qb = QueuedBatches::default();

        // Create a message that is exactly at the limit
        let max_message_size = MAX_BATCH_SIZE - EVENT_OVERHEAD;
        let mut message = "x".repeat(max_message_size);

        // Verify the message is added successfully
        qb.add_event(InputLogEvent {
            message: message.clone(),
            timestamp: 0,
        })
        .unwrap();

        // Make the message too big
        message.push('x');
        assert!(matches!(
            qb.add_event(InputLogEvent {
                message,
                timestamp: 0,
            }),
            Err(Error::EventTooLarge(size)) if size == MAX_BATCH_SIZE + 1
        ));
    }

    #[test]
    fn test_max_events_in_batch() {
        let mut qb = QueuedBatches::default();
        for _ in 0..MAX_EVENTS_IN_BATCH {
            qb.add_event(InputLogEvent {
                ..Default::default()
            })
            .unwrap();
            // All of these events should fit in one batch
            assert_eq!(qb.batches.len(), 1);
        }

        // Verify that adding one more event creates a new batch
        qb.add_event(InputLogEvent {
            ..Default::default()
        })
        .unwrap();
        assert_eq!(qb.batches.len(), 2);
    }

    #[test]
    fn test_max_batch_size() {
        let mut qb = QueuedBatches::default();

        // Create a message slightly under the limit
        let message_size = MAX_BATCH_SIZE - EVENT_OVERHEAD * 2;
        let message = "x".repeat(message_size);

        // Verify the message is added successfully
        qb.add_event(InputLogEvent {
            message,
            timestamp: 0,
        })
        .unwrap();
        assert_eq!(qb.batches.len(), 1);
        assert_eq!(qb.current_batch_size, message_size + EVENT_OVERHEAD);

        // Verify that adding one more message within the batch is OK
        qb.add_event(InputLogEvent {
            message: "".to_string(),
            timestamp: 0,
        })
        .unwrap();
        assert_eq!(qb.batches.len(), 1);
        assert_eq!(qb.current_batch_size, message_size + EVENT_OVERHEAD * 2);

        // Verify that adding anything else goes into a new batch
        qb.add_event(InputLogEvent {
            message: "".to_string(),
            timestamp: 0,
        })
        .unwrap();
        assert_eq!(qb.batches.len(), 2);
        assert_eq!(qb.current_batch_size, EVENT_OVERHEAD);
    }

    #[test]
    fn test_timestamp_order() {
        let mut qb = QueuedBatches::default();

        // Add an event at time 1
        qb.add_event(InputLogEvent {
            message: "".to_string(),
            timestamp: 1,
        })
        .unwrap();
        assert_eq!(qb.batches.len(), 1);

        // Add an event at time 0, verify it goes into the same batch
        qb.add_event(InputLogEvent {
            message: "".to_string(),
            timestamp: 0,
        })
        .unwrap();
        assert_eq!(qb.batches.len(), 1);
    }

    #[test]
    fn test_batch_max_duration() {
        let mut qb = QueuedBatches::default();

        // Add an event at time 0
        qb.add_event(InputLogEvent {
            message: "".to_string(),
            timestamp: 0,
        })
        .unwrap();
        assert_eq!(qb.batches.len(), 1);

        // Add an event over 24 hours later, verify it goes into a new batch
        qb.add_event(InputLogEvent {
            message: "".to_string(),
            timestamp: MAX_DURATION_MILLIS + 1,
        })
        .unwrap();
        assert_eq!(qb.batches.len(), 2);
    }
}