//! Upload batches of log events to AWS CloudWatch Logs, respecting
//! the service's per-batch limits (event count, payload size, and
//! 24-hour time span).
use fehler::{throw, throws};
use log::error;
use rusoto_core::RusotoError;
use rusoto_logs::{
    CloudWatchLogs, CloudWatchLogsClient, DescribeLogStreamsError,
    DescribeLogStreamsRequest, InputLogEvent, PutLogEventsError,
    PutLogEventsRequest,
};
use std::{
    io,
    sync::{Arc, Mutex},
    thread,
    time::{Duration, SystemTime},
};

/// The maximum number of log events in a batch is 10,000.
pub const MAX_EVENTS_IN_BATCH: usize = 10_000;

/// The maximum batch size is 1,048,576 bytes; this size is
/// calculated as the sum of all event messages in UTF-8, plus 26
/// bytes for each log event.
pub const MAX_BATCH_SIZE: usize = 1_048_576;

/// Fixed per-event overhead: each log event contributes 26 bytes on
/// top of its UTF-8 message when computing the batch size (see
/// [`MAX_BATCH_SIZE`]).
pub const EVENT_OVERHEAD: usize = 26;

/// The time the event occurred, expressed as the number of
/// milliseconds after Jan 1, 1970 00:00:00 UTC.
pub type Timestamp = i64;

/// A batch of log events in a single request cannot span more than 24
/// hours. This constant is in milliseconds.
pub const MAX_DURATION_MILLIS: i64 = 24 * 60 * 60 * 1000;

/// Get the current timestamp. Returns 0 if the time is before the
/// unix epoch.
pub fn get_current_timestamp() -> Timestamp {
    SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .map(|elapsed| elapsed.as_millis() as Timestamp)
        .unwrap_or(0)
}

/// Unified error type for upload errors.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// Event message string is larger than 1,048,550 bytes
    /// (`MAX_BATCH_SIZE` minus the 26-byte per-event overhead). The
    /// payload is the computed size of the offending event.
    #[error("event exceeds the max batch size")]
    EventTooLarge(usize),

    /// An API error was returned when trying to upload a batch.
    #[error("failed to upload log batch: {0}")]
    PutLogsError(#[from] RusotoError<PutLogEventsError>),

    /// An API error was returned when trying to get the log stream
    /// upload sequence token.
    #[error("failed to get sequence token: {0}")]
    SequenceTokenError(#[from] RusotoError<DescribeLogStreamsError>),

    /// The log stream does not exist.
    #[error("invalid log stream")]
    InvalidLogStream,

    /// An internal mutex error occurred (another thread panicked
    /// while holding the lock).
    #[error("failed to lock the mutex")]
    PoisonedLock,

    /// The background upload thread can only be started once.
    #[error("upload thread already started")]
    ThreadAlreadyStarted,

    /// Failed to start the background upload thread.
    #[error("failed to spawn thread: {0}")]
    SpawnError(io::Error),
}

/// An inclusive range of timestamps.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct TimestampRange {
    pub start: Timestamp,
    pub end: Timestamp,
}

impl TimestampRange {
    /// Create a range covering exactly the single instant `t`.
    pub fn new(t: Timestamp) -> TimestampRange {
        TimestampRange { start: t, end: t }
    }

    /// Time in milliseconds between start and end.
    pub fn duration_in_millis(&self) -> i64 {
        self.end - self.start
    }

    /// Widen the range just enough that it contains `t`.
    pub fn expand_to_include(&mut self, t: Timestamp) {
        self.start = self.start.min(t);
        self.end = self.end.max(t);
    }

    /// Return a copy of this range widened to contain `t`, leaving
    /// the original untouched.
    pub fn expand_to_include_copy(&self, t: Timestamp) -> TimestampRange {
        let mut widened = *self;
        widened.expand_to_include(t);
        widened
    }
}

/// Internal interface for creating batches of events.
///
/// This shouldn't normally need to be used directly, but is exposed
/// in case the BatchUploader interface doesn't behave quite the way
/// you want, you might still find value in using QueuedBatches to
/// handle the various batch upload limits.
#[derive(Default)]
pub struct QueuedBatches {
    /// Queued batches that haven't been sent yet. The batch at the
    /// end of the vector is the one currently being filled.
    ///
    /// Box the batch so that adding and removing elements from the
    /// batches vector is cheap.
    ///
    /// Events are queued up in a vector that is not necessarily in
    /// order. CloudWatch Logs requires events to be sorted by
    /// timestamp, but in practice if you have a lot of events coming
    /// in from different threads they will end up being out of order
    /// due to waiting on a shared lock. So if you try to keep the
    /// events in each batch in order, and start a new batch every
    /// time an out-of-order event is received, the batches end up
    /// being very small.
    #[allow(clippy::vec_box)]
    batches: Vec<Box<Vec<InputLogEvent>>>,

    /// Total size of the batch at the end of the batches vector,
    /// including the per-event overhead.
    current_batch_size: usize,

    /// Oldest and newest timestamps of events in the batch at the end
    /// of the batches vector. This is used to ensure that a batch
    /// does not exceed the 24-hour limit.
    current_batch_time_range: TimestampRange,
}

impl QueuedBatches {
    /// Add a new event.
    ///
    /// There are a couple AWS limits not enforced yet:
    ///
    /// - None of the log events in the batch can be more than 2 hours
    ///   in the future
    ///
    /// - None of the log events in the batch can be older than 14 days
    ///   or older than the retention period of the log group
    #[throws]
    pub fn add_event(&mut self, event: InputLogEvent) {
        // String::len is the UTF-8 byte length of the message.
        let event_size = event.message.len() + EVENT_OVERHEAD;
        if event_size > MAX_BATCH_SIZE {
            // An event this large can never fit in any batch.
            throw!(Error::EventTooLarge(event_size));
        }

        if self.is_new_batch_needed(&event, event_size) {
            self.batches.push(Box::new(Vec::new()));
            self.current_batch_size = 0;
            self.current_batch_time_range =
                TimestampRange::new(event.timestamp);
        }

        self.current_batch_size += event_size;
        self.current_batch_time_range
            .expand_to_include(event.timestamp);
        // The branch above guarantees at least one batch exists.
        self.batches
            .last_mut()
            .expect("at least one batch is available")
            .push(event);
    }

    /// Decide whether `event` (of size `event_size` including
    /// overhead) must start a fresh batch instead of joining the
    /// current one.
    fn is_new_batch_needed(
        &self,
        event: &InputLogEvent,
        event_size: usize,
    ) -> bool {
        let batch = match self.batches.last() {
            Some(batch) => batch,
            // No batch exists yet; one must be created.
            None => return true,
        };

        // Per-batch event-count limit.
        if batch.len() >= MAX_EVENTS_IN_BATCH {
            return true;
        }

        // Per-batch payload-size limit.
        if self.current_batch_size + event_size > MAX_BATCH_SIZE {
            return true;
        }

        // Per-batch 24-hour span limit.
        if !batch.is_empty() {
            let widened = self
                .current_batch_time_range
                .expand_to_include_copy(event.timestamp);
            if widened.duration_in_millis() > MAX_DURATION_MILLIS {
                return true;
            }
        }

        false
    }
}

/// Log group and stream names identifying where batches are uploaded.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct UploadTarget {
    /// Log group name.
    pub group: String,
    /// Log stream name.
    pub stream: String,
}

/// State shared behind the `BatchUploader` mutex.
struct BatchUploaderInternal {
    /// Log group and stream to upload to.
    target: UploadTarget,

    /// Batches waiting to be uploaded.
    queued_batches: QueuedBatches,

    /// Client used for all CloudWatch Logs API calls.
    client: CloudWatchLogsClient,
    /// Sequence token for the next PutLogEvents call; `None` means it
    /// must be (re)fetched before the next upload.
    next_sequence_token: Option<String>,

    /// Set once the background upload thread has been started, to
    /// prevent starting a second one.
    thread_started: bool,
}

impl BatchUploaderInternal {
    /// Fetch the upload sequence token for the target log stream via
    /// DescribeLogStreams and store it in `next_sequence_token`.
    ///
    /// Throws `Error::InvalidLogStream` if the stream cannot be found
    /// or the returned name doesn't match, and
    /// `Error::SequenceTokenError` if the API call fails.
    #[throws]
    fn refresh_sequence_token(&mut self) {
        let resp = self
            .client
            .describe_log_streams(DescribeLogStreamsRequest {
                limit: Some(1),
                order_by: Some("LogStreamName".into()),
                log_group_name: self.target.group.clone(),
                log_stream_name_prefix: Some(self.target.stream.clone()),
                ..Default::default()
            })
            .sync()?;
        let log_streams = resp.log_streams.ok_or(Error::InvalidLogStream)?;
        // TODO: need to verify that this is correct if you have
        // multiple log streams with the same prefix. I *think* it's
        // good because we are sorting by stream name and limiting to
        // 1, so as long as "myPrefix" gets sorted before
        // "myPrefixAndOtherStuff" this should be correct. Needs
        // testing though.
        let log_stream = log_streams.first().ok_or(Error::InvalidLogStream)?;
        // Compare borrowed strings instead of cloning the target
        // stream name into a temporary Option<String>
        // (clippy::cmp_owned).
        if log_stream.log_stream_name.as_deref()
            != Some(self.target.stream.as_str())
        {
            // This should never happen
            error!(
                "log stream name {} != {:?}",
                self.target.stream, log_stream.log_stream_name
            );
            throw!(Error::InvalidLogStream);
        }
        self.next_sequence_token = log_stream.upload_sequence_token.clone();
    }

    /// Upload a single queued batch (the most recently started one,
    /// via `Vec::pop`) with PutLogEvents. Does nothing if the queue
    /// is empty.
    ///
    /// On failure the sequence token is cleared so it gets refreshed
    /// on the next attempt; note the popped batch is dropped, so its
    /// events are lost (see TODO below).
    #[throws]
    fn upload_batch(&mut self) {
        let mut batch = if let Some(batch) = self.queued_batches.batches.pop() {
            *batch
        } else {
            return;
        };

        // Refresh the sequence token if necessary. This actually
        // isn't needed right after creating the log stream, so
        // there's an unnecessary fetch here the first time, but
        // that's probably fine.
        if self.next_sequence_token.is_none() {
            self.refresh_sequence_token()?;
        }

        // The events must be sorted by timestamp
        batch.sort_unstable_by_key(|event| event.timestamp);

        let req = PutLogEventsRequest {
            log_events: batch,
            sequence_token: self.next_sequence_token.clone(),
            log_group_name: self.target.group.clone(),
            log_stream_name: self.target.stream.clone(),
        };

        match self.client.put_log_events(req).sync() {
            Ok(resp) => {
                self.next_sequence_token = resp.next_sequence_token;

                // TODO: handle rejected events
            }
            Err(err) => {
                // TODO: if the batch upload failed, consider putting
                // the batch back.

                // Clear the sequence token so that it gets refreshed
                // next time
                self.next_sequence_token = None;

                throw!(err);
            }
        }
    }
}

/// Main interface for uploading logs in batches to AWS CloudWatch Logs.
///
/// This can be safely used from multiple threads by cloning it.
#[derive(Clone)]
pub struct BatchUploader {
    /// Shared state; all clones point at the same queue and client.
    internal: Arc<Mutex<BatchUploaderInternal>>,
}

impl BatchUploader {
    /// Create an uploader that sends events to `target` using
    /// `client`. No API calls are made until events are added and the
    /// background thread is started.
    pub fn new(
        client: CloudWatchLogsClient,
        target: UploadTarget,
    ) -> BatchUploader {
        BatchUploader {
            internal: Arc::new(Mutex::new(BatchUploaderInternal {
                target,
                client,
                queued_batches: QueuedBatches::default(),
                next_sequence_token: None,
                thread_started: false,
            })),
        }
    }

    /// Add a new event.
    ///
    /// There are a couple AWS limits not enforced yet:
    ///
    /// - None of the log events in the batch can be more than 2 hours
    ///   in the future
    ///
    /// - None of the log events in the batch can be older than 14 days
    ///   or older than the retention period of the log group
    #[throws]
    pub fn add_event(&self, event: InputLogEvent) {
        let mut guard =
            self.internal.lock().map_err(|_| Error::PoisonedLock)?;
        guard.queued_batches.add_event(event)?;
    }

    /// Start a background thread for uploading batches of events.
    ///
    /// Returns the `JoinHandle` of the spawned thread, which loops
    /// forever: each iteration uploads up to five batches, then
    /// sleeps for one second.
    ///
    /// NOTE(review): the internal mutex is held while the synchronous
    /// uploads run, so `add_event` callers block while a request is
    /// in flight — confirm this is acceptable for latency-sensitive
    /// callers. The lock guard is dropped before the sleep, so the
    /// sleep itself does not hold the lock.
    #[throws]
    pub fn start_background_thread(&self) -> thread::JoinHandle<()> {
        let mut guard =
            self.internal.lock().map_err(|_| Error::PoisonedLock)?;
        // Prevent multiple upload threads from being started
        if guard.thread_started {
            throw!(Error::ThreadAlreadyStarted);
        }
        guard.thread_started = true;

        let builder =
            thread::Builder::new().name("cloudwatch-logs-upload".into());
        let internal = self.internal.clone();
        let handle = builder
            .spawn(move || loop {
                if let Ok(mut guard) = internal.lock() {
                    // There is a quota of 5 requests per second per log
                    // stream, so upload up to five batches, then sleep
                    // for at least one second.
                    for _ in 0..5 {
                        if let Err(err) = guard.upload_batch() {
                            error!(
                                "CloudWatch Logs batch upload failed: {}",
                                err
                            );
                        }
                    }
                } else {
                    error!("CloudWatch Logs bad lock");
                }
                thread::sleep(Duration::from_secs(1));
            })
            .map_err(Error::SpawnError)?;
        handle
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Build a message of exactly `len` bytes.
    fn make_message(len: usize) -> String {
        "x".repeat(len)
    }

    #[test]
    fn test_event_too_large() {
        let mut qb = QueuedBatches::default();

        // Create a message exactly at the per-event limit. Size it
        // from the computed length rather than `String::capacity()`:
        // `with_capacity(n)` only guarantees a capacity of *at least*
        // `n`, so the old capacity-based loop could over-fill the
        // message on an allocator that rounds up.
        let max_message_size = MAX_BATCH_SIZE - EVENT_OVERHEAD;
        let mut message = make_message(max_message_size);

        // Verify the message is added successfully
        qb.add_event(InputLogEvent {
            message: message.clone(),
            timestamp: 0,
        })
        .unwrap();

        // Make the message one byte too big
        message.push('x');
        assert!(matches!(
            qb.add_event(InputLogEvent {
                message,
                timestamp: 0,
            }),
            Err(Error::EventTooLarge(size)) if size == MAX_BATCH_SIZE + 1
        ));
    }

    #[test]
    fn test_max_events_in_batch() {
        let mut qb = QueuedBatches::default();
        for _ in 0..MAX_EVENTS_IN_BATCH {
            qb.add_event(InputLogEvent {
                ..Default::default()
            })
            .unwrap();
            // All of these events should fit in one batch
            assert_eq!(qb.batches.len(), 1);
        }

        // Verify that adding one more event creates a new batch
        qb.add_event(InputLogEvent {
            ..Default::default()
        })
        .unwrap();
        assert_eq!(qb.batches.len(), 2);
    }

    #[test]
    fn test_max_batch_size() {
        let mut qb = QueuedBatches::default();

        // Create a message slightly under the limit, sized exactly
        // (see note in test_event_too_large about capacity()).
        let message_size = MAX_BATCH_SIZE - EVENT_OVERHEAD * 2;
        let message = make_message(message_size);

        // Verify the message is added successfully
        qb.add_event(InputLogEvent {
            message: message.clone(),
            timestamp: 0,
        })
        .unwrap();
        assert_eq!(qb.batches.len(), 1);
        assert_eq!(qb.current_batch_size, message_size + EVENT_OVERHEAD);

        // Verify that adding one more message within the batch is OK
        qb.add_event(InputLogEvent {
            message: "".to_string(),
            timestamp: 0,
        })
        .unwrap();
        assert_eq!(qb.batches.len(), 1);
        assert_eq!(qb.current_batch_size, message_size + EVENT_OVERHEAD * 2);

        // Verify that adding anything else goes into a new batch
        qb.add_event(InputLogEvent {
            message: "".to_string(),
            timestamp: 0,
        })
        .unwrap();
        assert_eq!(qb.batches.len(), 2);
        assert_eq!(qb.current_batch_size, EVENT_OVERHEAD);
    }

    #[test]
    fn test_timestamp_order() {
        let mut qb = QueuedBatches::default();

        // Add an event at time 1
        qb.add_event(InputLogEvent {
            message: "".to_string(),
            timestamp: 1,
        })
        .unwrap();
        assert_eq!(qb.batches.len(), 1);

        // Add an event at time 0, verify it goes into the same batch
        qb.add_event(InputLogEvent {
            message: "".to_string(),
            timestamp: 0,
        })
        .unwrap();
        assert_eq!(qb.batches.len(), 1);
    }

    #[test]
    fn test_batch_max_duration() {
        let mut qb = QueuedBatches::default();

        // Add an event at time 0
        qb.add_event(InputLogEvent {
            message: "".to_string(),
            timestamp: 0,
        })
        .unwrap();
        assert_eq!(qb.batches.len(), 1);

        // Add an event over 24 hours later, verify it goes into a new batch
        qb.add_event(InputLogEvent {
            message: "".to_string(),
            timestamp: MAX_DURATION_MILLIS + 1,
        })
        .unwrap();
        assert_eq!(qb.batches.len(), 2);
    }
}