1#![warn(missing_docs)]
4
5use fehler::{throw, throws};
6use log::error;
7use rusoto_core::RusotoError;
8use rusoto_logs::{
9 CloudWatchLogs, CloudWatchLogsClient, DescribeLogStreamsError,
10 DescribeLogStreamsRequest, InputLogEvent, PutLogEventsError,
11 PutLogEventsRequest,
12};
13use std::{
14 io,
15 sync::{Arc, Mutex},
16 thread,
17 time::{Duration, SystemTime},
18};
19
/// Maximum number of events allowed in a single `PutLogEvents` batch
/// (CloudWatch Logs service limit).
pub const MAX_EVENTS_IN_BATCH: usize = 10_000;

/// Maximum total size in bytes of a single `PutLogEvents` batch, where
/// each event counts as its UTF-8 message length plus [`EVENT_OVERHEAD`].
pub const MAX_BATCH_SIZE: usize = 1_048_576;

/// Fixed per-event overhead in bytes that CloudWatch Logs adds to each
/// event's message length when computing the batch size.
pub const EVENT_OVERHEAD: usize = 26;
/// Milliseconds since the Unix epoch.
pub type Timestamp = i64;

/// Maximum time span a single batch may cover: 24 hours, in
/// milliseconds (CloudWatch Logs rejects batches spanning more).
pub const MAX_DURATION_MILLIS: i64 = 24 * 60 * 60 * 1000;

/// Get the current time as a [`Timestamp`] (milliseconds since the
/// Unix epoch). Returns zero if the system clock reads earlier than
/// the epoch.
pub fn get_current_timestamp() -> Timestamp {
    SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .map(|elapsed| elapsed.as_millis() as Timestamp)
        .unwrap_or(0)
}
52
/// Errors that can occur while queueing or uploading log events.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// A single event's encoded size (message bytes plus
    /// [`EVENT_OVERHEAD`]) exceeds [`MAX_BATCH_SIZE`]; the payload is
    /// the offending size in bytes.
    #[error("event exceeds the max batch size")]
    EventTooLarge(usize),

    /// The `PutLogEvents` call failed.
    #[error("failed to upload log batch: {0}")]
    PutLogsError(#[from] RusotoError<PutLogEventsError>),

    /// The `DescribeLogStreams` call used to fetch the next upload
    /// sequence token failed.
    #[error("failed to get sequence token: {0}")]
    SequenceTokenError(#[from] RusotoError<DescribeLogStreamsError>),

    /// The target log stream was not found, or the stream returned by
    /// the prefix query did not exactly match the configured name.
    #[error("invalid log stream")]
    InvalidLogStream,

    /// The internal mutex was poisoned by a panic in another thread.
    #[error("failed to lock the mutex")]
    PoisonedLock,

    /// The background upload thread has already been started.
    #[error("upload thread already started")]
    ThreadAlreadyStarted,

    /// Spawning the background upload thread failed.
    #[error("failed to spawn thread: {0}")]
    SpawnError(io::Error),
}
85
/// Inclusive range of timestamps; used to track the time span covered
/// by the events in a batch.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct TimestampRange {
    /// Earliest timestamp in the range.
    pub start: Timestamp,
    /// Latest timestamp in the range.
    pub end: Timestamp,
}
94
95impl TimestampRange {
96 pub fn new(t: Timestamp) -> TimestampRange {
98 TimestampRange { start: t, end: t }
99 }
100
101 pub fn duration_in_millis(&self) -> i64 {
103 self.end - self.start
104 }
105
106 pub fn expand_to_include(&mut self, t: Timestamp) {
108 if t < self.start {
109 self.start = t;
110 }
111 if t > self.end {
112 self.end = t;
113 }
114 }
115
116 pub fn expand_to_include_copy(&self, t: Timestamp) -> TimestampRange {
119 let mut copy = *self;
120 copy.expand_to_include(t);
121 copy
122 }
123}
124
/// Queue of event batches, each sized to respect the CloudWatch Logs
/// `PutLogEvents` limits (event count, total byte size, time span).
#[derive(Default)]
pub struct QueuedBatches {
    // NOTE(review): batches are boxed (hence the vec_box allow),
    // presumably so moving a batch out for upload stays cheap —
    // confirm the intent before changing.
    #[allow(clippy::vec_box)]
    batches: Vec<Box<Vec<InputLogEvent>>>,

    /// Byte size of the last (current) batch: each event's message
    /// length plus [`EVENT_OVERHEAD`].
    current_batch_size: usize,

    /// Time span covered by the events in the last (current) batch.
    current_batch_time_range: TimestampRange,
}
157
158impl QueuedBatches {
159 #[throws]
169 pub fn add_event(&mut self, event: InputLogEvent) {
170 let event_size = event.message.as_bytes().len() + EVENT_OVERHEAD;
171 if event_size > MAX_BATCH_SIZE {
172 throw!(Error::EventTooLarge(event_size));
174 }
175
176 if self.is_new_batch_needed(&event, event_size) {
177 self.batches.push(Box::new(Vec::new()));
178 self.current_batch_size = 0;
179 self.current_batch_time_range =
180 TimestampRange::new(event.timestamp);
181 }
182
183 self.current_batch_size += event_size;
184 self.current_batch_time_range
185 .expand_to_include(event.timestamp);
186 let batch = self.batches.last_mut().unwrap();
189 batch.push(event);
190 }
191
192 fn is_new_batch_needed(
193 &self,
194 event: &InputLogEvent,
195 event_size: usize,
196 ) -> bool {
197 let batch = if let Some(batch) = self.batches.last() {
199 batch
200 } else {
201 return true;
202 };
203
204 if batch.len() >= MAX_EVENTS_IN_BATCH {
206 return true;
207 }
208
209 if self.current_batch_size + event_size > MAX_BATCH_SIZE {
211 return true;
212 }
213
214 if !batch.is_empty() {
216 let new_range = self
217 .current_batch_time_range
218 .expand_to_include_copy(event.timestamp);
219 if new_range.duration_in_millis() > MAX_DURATION_MILLIS {
220 return true;
221 }
222 }
223
224 false
225 }
226}
227
/// Log group and log stream that events are uploaded to.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct UploadTarget {
    /// Name of the CloudWatch Logs log group.
    pub group: String,
    /// Name of the log stream within that group.
    pub stream: String,
}
236
/// State shared behind the [`BatchUploader`] mutex.
struct BatchUploaderInternal {
    /// Log group/stream that batches are sent to.
    target: UploadTarget,

    /// Batches waiting to be uploaded.
    queued_batches: QueuedBatches,

    /// Client used for the CloudWatch Logs API calls.
    client: CloudWatchLogsClient,
    /// Sequence token required by `PutLogEvents`; `None` means it must
    /// be (re)fetched via `DescribeLogStreams` before the next upload.
    next_sequence_token: Option<String>,

    /// Set once the background upload thread has been spawned.
    thread_started: bool,
}
247
248impl BatchUploaderInternal {
249 #[throws]
250 fn refresh_sequence_token(&mut self) {
251 let resp = self
252 .client
253 .describe_log_streams(DescribeLogStreamsRequest {
254 limit: Some(1),
255 order_by: Some("LogStreamName".into()),
256 log_group_name: self.target.group.clone(),
257 log_stream_name_prefix: Some(self.target.stream.clone()),
258 ..Default::default()
259 })
260 .sync()?;
261 let log_streams = resp.log_streams.ok_or(Error::InvalidLogStream)?;
262 let log_stream = log_streams.first().ok_or(Error::InvalidLogStream)?;
269 if Some(self.target.stream.clone()) != log_stream.log_stream_name {
270 error!(
272 "log stream name {} != {:?}",
273 self.target.stream, log_stream.log_stream_name
274 );
275 throw!(Error::InvalidLogStream);
276 }
277 self.next_sequence_token = log_stream.upload_sequence_token.clone();
278 }
279
280 #[throws]
281 fn upload_batch(&mut self) {
282 let mut batch = if let Some(batch) = self.queued_batches.batches.pop() {
283 *batch
284 } else {
285 return;
286 };
287
288 if self.next_sequence_token.is_none() {
293 self.refresh_sequence_token()?;
294 }
295
296 batch.sort_unstable_by_key(|event| event.timestamp);
298
299 let req = PutLogEventsRequest {
300 log_events: batch,
301 sequence_token: self.next_sequence_token.clone(),
302 log_group_name: self.target.group.clone(),
303 log_stream_name: self.target.stream.clone(),
304 };
305
306 match self.client.put_log_events(req).sync() {
307 Ok(resp) => {
308 self.next_sequence_token = resp.next_sequence_token;
309
310 }
312 Err(err) => {
313 self.next_sequence_token = None;
319
320 throw!(err);
321 }
322 }
323 }
324}
325
/// Thread-safe handle for queueing CloudWatch Logs events and
/// uploading them in batches; clones share the same internal state.
#[derive(Clone)]
pub struct BatchUploader {
    internal: Arc<Mutex<BatchUploaderInternal>>,
}
333
334impl BatchUploader {
335 pub fn new(
337 client: CloudWatchLogsClient,
338 target: UploadTarget,
339 ) -> BatchUploader {
340 BatchUploader {
341 internal: Arc::new(Mutex::new(BatchUploaderInternal {
342 target,
343 client,
344 queued_batches: QueuedBatches::default(),
345 next_sequence_token: None,
346 thread_started: false,
347 })),
348 }
349 }
350
351 #[throws]
361 pub fn add_event(&self, event: InputLogEvent) {
362 let mut guard =
363 self.internal.lock().map_err(|_| Error::PoisonedLock)?;
364 guard.queued_batches.add_event(event)?;
365 }
366
367 #[throws]
369 pub fn start_background_thread(&self) -> thread::JoinHandle<()> {
370 let mut guard =
371 self.internal.lock().map_err(|_| Error::PoisonedLock)?;
372 if guard.thread_started {
374 throw!(Error::ThreadAlreadyStarted);
375 }
376 guard.thread_started = true;
377
378 let builder =
379 thread::Builder::new().name("cloudwatch-logs-upload".into());
380 let internal = self.internal.clone();
381 let handle = builder
382 .spawn(move || loop {
383 if let Ok(mut guard) = internal.lock() {
384 for _ in 0..5 {
388 if let Err(err) = guard.upload_batch() {
389 error!(
390 "CloudWatch Logs batch upload failed: {}",
391 err
392 );
393 }
394 }
395 } else {
396 error!("CloudWatch Logs bad lock");
397 }
398 thread::sleep(Duration::from_secs(1));
399 })
400 .map_err(Error::SpawnError)?;
401 handle
402 }
403}
404
#[cfg(test)]
mod tests {
    use super::*;

    /// Build an event with the given message and timestamp.
    fn event(message: &str, timestamp: Timestamp) -> InputLogEvent {
        InputLogEvent {
            message: message.to_string(),
            timestamp,
        }
    }

    #[test]
    fn test_event_too_large() {
        let mut qb = QueuedBatches::default();

        // Largest message that still fits in a batch on its own.
        let message = "x".repeat(MAX_BATCH_SIZE - EVENT_OVERHEAD);
        qb.add_event(event(&message, 0)).unwrap();

        // One extra byte pushes the event over the limit.
        let too_big = format!("{}x", message);
        assert!(matches!(
            qb.add_event(event(&too_big, 0)),
            Err(Error::EventTooLarge(size)) if size == MAX_BATCH_SIZE + 1
        ));
    }

    #[test]
    fn test_max_events_in_batch() {
        let mut qb = QueuedBatches::default();
        for _ in 0..MAX_EVENTS_IN_BATCH {
            qb.add_event(InputLogEvent::default()).unwrap();
            assert_eq!(qb.batches.len(), 1);
        }

        // The next event no longer fits in the first batch.
        qb.add_event(InputLogEvent::default()).unwrap();
        assert_eq!(qb.batches.len(), 2);
    }

    #[test]
    fn test_max_batch_size() {
        let mut qb = QueuedBatches::default();

        // Leave room for exactly one more empty event in the batch.
        let message_size = MAX_BATCH_SIZE - EVENT_OVERHEAD * 2;
        let message = "x".repeat(message_size);

        qb.add_event(event(&message, 0)).unwrap();
        assert_eq!(qb.batches.len(), 1);
        assert_eq!(qb.current_batch_size, message_size + EVENT_OVERHEAD);

        qb.add_event(event("", 0)).unwrap();
        assert_eq!(qb.batches.len(), 1);
        assert_eq!(qb.current_batch_size, message_size + EVENT_OVERHEAD * 2);

        // The batch is now exactly full; the next event starts a new one.
        qb.add_event(event("", 0)).unwrap();
        assert_eq!(qb.batches.len(), 2);
        assert_eq!(qb.current_batch_size, EVENT_OVERHEAD);
    }

    #[test]
    fn test_timestamp_order() {
        let mut qb = QueuedBatches::default();

        qb.add_event(event("", 1)).unwrap();
        assert_eq!(qb.batches.len(), 1);

        // An out-of-order (earlier) timestamp stays in the same batch.
        qb.add_event(event("", 0)).unwrap();
        assert_eq!(qb.batches.len(), 1);
    }

    #[test]
    fn test_batch_max_duration() {
        let mut qb = QueuedBatches::default();

        qb.add_event(event("", 0)).unwrap();
        assert_eq!(qb.batches.len(), 1);

        // Exceeding the 24-hour span forces a new batch.
        qb.add_event(event("", MAX_DURATION_MILLIS + 1)).unwrap();
        assert_eq!(qb.batches.len(), 2);
    }
}