use fehler::{throw, throws};
use log::error;
use rusoto_core::RusotoError;
use rusoto_logs::{
CloudWatchLogs, CloudWatchLogsClient, DescribeLogStreamsError,
DescribeLogStreamsRequest, InputLogEvent, PutLogEventsError,
PutLogEventsRequest,
};
use std::{
io,
sync::{Arc, Mutex},
thread,
time::{Duration, SystemTime},
};
/// Maximum number of events allowed in one PutLogEvents batch
/// (CloudWatch Logs API limit).
pub const MAX_EVENTS_IN_BATCH: usize = 10_000;
/// Maximum batch payload size in bytes (1 MiB, CloudWatch Logs API limit).
pub const MAX_BATCH_SIZE: usize = 1_048_576;
/// Fixed per-event overhead (in bytes) that CloudWatch adds to the UTF-8
/// message length when computing batch size.
pub const EVENT_OVERHEAD: usize = 26;
/// Event timestamp: milliseconds since the Unix epoch.
pub type Timestamp = i64;
/// Maximum timestamp span a single batch may cover (24 hours, per the
/// PutLogEvents constraints).
pub const MAX_DURATION_MILLIS: i64 = 24 * 60 * 60 * 1000;
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns 0 if the system clock reports a time before the epoch
/// (`duration_since` fails), rather than panicking.
pub fn get_current_timestamp() -> Timestamp {
    SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .map(|elapsed| elapsed.as_millis() as Timestamp)
        .unwrap_or(0)
}
/// Errors produced by queuing and uploading log batches.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// A single event's encoded size (message bytes + [`EVENT_OVERHEAD`])
    /// exceeds [`MAX_BATCH_SIZE`]; the payload is the offending size.
    #[error("event exceeds the max batch size")]
    EventTooLarge(usize),
    /// PutLogEvents call failed.
    #[error("failed to upload log batch: {0}")]
    PutLogsError(#[from] RusotoError<PutLogEventsError>),
    /// DescribeLogStreams call (used to fetch the sequence token) failed.
    #[error("failed to get sequence token: {0}")]
    SequenceTokenError(#[from] RusotoError<DescribeLogStreamsError>),
    /// The target log stream was missing from the DescribeLogStreams
    /// response, or the returned name did not match exactly.
    #[error("invalid log stream")]
    InvalidLogStream,
    /// The internal mutex was poisoned by a panicking thread.
    #[error("failed to lock the mutex")]
    PoisonedLock,
    /// `start_background_thread` was called more than once.
    #[error("upload thread already started")]
    ThreadAlreadyStarted,
    /// The OS refused to spawn the background thread.
    #[error("failed to spawn thread: {0}")]
    SpawnError(io::Error),
}
/// Closed interval of timestamps, used to track the span of a batch so it
/// can be kept within [`MAX_DURATION_MILLIS`].
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct TimestampRange {
    /// Earliest timestamp in the range (inclusive).
    pub start: Timestamp,
    /// Latest timestamp in the range (inclusive).
    pub end: Timestamp,
}
impl TimestampRange {
    /// Create a degenerate range containing only the instant `t`.
    pub fn new(t: Timestamp) -> TimestampRange {
        TimestampRange { start: t, end: t }
    }

    /// Length of the range in milliseconds (`end - start`).
    pub fn duration_in_millis(&self) -> i64 {
        self.end - self.start
    }

    /// Grow the range in place so that it contains `t`.
    pub fn expand_to_include(&mut self, t: Timestamp) {
        self.start = self.start.min(t);
        self.end = self.end.max(t);
    }

    /// Non-mutating variant of [`TimestampRange::expand_to_include`]:
    /// returns a copy of the range grown to contain `t`.
    pub fn expand_to_include_copy(&self, t: Timestamp) -> TimestampRange {
        let mut expanded = *self;
        expanded.expand_to_include(t);
        expanded
    }
}
/// Log events grouped into batches, each respecting the CloudWatch
/// PutLogEvents limits (event count, payload size, timestamp span).
/// The last element of `batches` is the batch currently being filled.
#[derive(Default)]
pub struct QueuedBatches {
    // NOTE(review): the Box is deliberate (clippy::vec_box is allowed),
    // presumably so growing `batches` never copies the event vectors —
    // confirm before removing.
    #[allow(clippy::vec_box)]
    batches: Vec<Box<Vec<InputLogEvent>>>,
    // Accumulated size (message bytes + EVENT_OVERHEAD each) of the
    // last batch in `batches`.
    current_batch_size: usize,
    // Timestamp span of the last batch in `batches`.
    current_batch_time_range: TimestampRange,
}
impl QueuedBatches {
    /// Queue `event`, opening a fresh batch whenever appending it to the
    /// current one would break a CloudWatch PutLogEvents limit.
    ///
    /// Throws [`Error::EventTooLarge`] if the event by itself exceeds the
    /// maximum batch payload size.
    #[throws]
    pub fn add_event(&mut self, event: InputLogEvent) {
        // `String::len` is the UTF-8 byte length, which is what the
        // CloudWatch size accounting is based on.
        let event_size = event.message.len() + EVENT_OVERHEAD;
        if event_size > MAX_BATCH_SIZE {
            throw!(Error::EventTooLarge(event_size));
        }
        if self.is_new_batch_needed(&event, event_size) {
            // Open a fresh batch and reset the per-batch accounting.
            self.batches.push(Box::new(Vec::new()));
            self.current_batch_size = 0;
            self.current_batch_time_range =
                TimestampRange::new(event.timestamp);
        }
        self.current_batch_size += event_size;
        self.current_batch_time_range
            .expand_to_include(event.timestamp);
        // A batch is guaranteed to exist here: either one was pushed just
        // above, or `is_new_batch_needed` observed a non-empty list.
        self.batches.last_mut().unwrap().push(event);
    }

    /// Would adding `event` (of encoded size `event_size`) to the current
    /// batch violate any PutLogEvents constraint?
    fn is_new_batch_needed(
        &self,
        event: &InputLogEvent,
        event_size: usize,
    ) -> bool {
        let current = match self.batches.last() {
            Some(batch) => batch,
            // No batch yet: the first event always starts one.
            None => return true,
        };
        let too_many_events = current.len() >= MAX_EVENTS_IN_BATCH;
        let too_many_bytes =
            self.current_batch_size + event_size > MAX_BATCH_SIZE;
        // The 24-hour-span rule only applies once the batch has content;
        // an empty batch adopts the event's timestamp instead.
        let spans_too_long = !current.is_empty()
            && self
                .current_batch_time_range
                .expand_to_include_copy(event.timestamp)
                .duration_in_millis()
                > MAX_DURATION_MILLIS;
        too_many_events || too_many_bytes || spans_too_long
    }
}
/// Destination for uploads: a CloudWatch Logs group/stream pair.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct UploadTarget {
    /// Log group name.
    pub group: String,
    /// Log stream name within the group.
    pub stream: String,
}
/// Mutex-protected state shared between callers of `add_event` and the
/// background upload thread.
struct BatchUploaderInternal {
    // Group/stream the events are sent to.
    target: UploadTarget,
    // Events waiting to be uploaded, pre-grouped into legal batches.
    queued_batches: QueuedBatches,
    client: CloudWatchLogsClient,
    // Sequence token for the next PutLogEvents call; `None` means it must
    // be (re)fetched via DescribeLogStreams first.
    next_sequence_token: Option<String>,
    // Guards against starting the background thread twice.
    thread_started: bool,
}
impl BatchUploaderInternal {
    /// Fetch the upload sequence token for the target stream via
    /// DescribeLogStreams and cache it in `next_sequence_token`.
    ///
    /// Throws [`Error::SequenceTokenError`] on an API failure and
    /// [`Error::InvalidLogStream`] when the response contains no stream or
    /// a stream whose name is not an exact match.
    #[throws]
    fn refresh_sequence_token(&mut self) {
        let resp = self
            .client
            .describe_log_streams(DescribeLogStreamsRequest {
                limit: Some(1),
                order_by: Some("LogStreamName".into()),
                log_group_name: self.target.group.clone(),
                log_stream_name_prefix: Some(self.target.stream.clone()),
                ..Default::default()
            })
            .sync()?;
        let log_streams = resp.log_streams.ok_or(Error::InvalidLogStream)?;
        let log_stream = log_streams.first().ok_or(Error::InvalidLogStream)?;
        // The prefix query can match a longer stream name; require an
        // exact match so events never land in the wrong stream.
        if Some(self.target.stream.clone()) != log_stream.log_stream_name {
            error!(
                "log stream name {} != {:?}",
                self.target.stream, log_stream.log_stream_name
            );
            throw!(Error::InvalidLogStream);
        }
        self.next_sequence_token = log_stream.upload_sequence_token.clone();
    }

    /// Upload the oldest queued batch (if any) via PutLogEvents.
    ///
    /// Fix: this previously used `Vec::pop`, dequeuing the *newest* batch —
    /// the one `QueuedBatches` is still filling. That uploaded batches in
    /// reverse-chronological order and, worse, left
    /// `current_batch_size`/`current_batch_time_range` describing the
    /// removed batch while `batches.last()` now pointed at an older batch,
    /// so later `add_event` calls appended to that old batch with stale
    /// accounting (able to overshoot the size/duration limits). Removing
    /// from the front uploads oldest-first and leaves the in-progress
    /// (last) batch and its accounting intact; if the only batch is
    /// removed, the next `add_event` starts a fresh batch and resets the
    /// accounting.
    ///
    /// On upload failure the cached sequence token is cleared so the next
    /// attempt refreshes it. NOTE(review): the failed batch is dropped —
    /// events are lost on upload failure; confirm this best-effort policy
    /// is intended.
    #[throws]
    fn upload_batch(&mut self) {
        if self.queued_batches.batches.is_empty() {
            return;
        }
        if self.next_sequence_token.is_none() {
            // Refresh before dequeuing so a token failure does not cost
            // us the batch.
            self.refresh_sequence_token()?;
        }
        // Oldest batch first (FIFO); batch counts are small enough that
        // the O(n) front removal is irrelevant next to the network call.
        let mut batch = *self.queued_batches.batches.remove(0);
        // PutLogEvents requires events ordered by timestamp.
        batch.sort_unstable_by_key(|event| event.timestamp);
        let req = PutLogEventsRequest {
            log_events: batch,
            sequence_token: self.next_sequence_token.clone(),
            log_group_name: self.target.group.clone(),
            log_stream_name: self.target.stream.clone(),
        };
        match self.client.put_log_events(req).sync() {
            Ok(resp) => {
                self.next_sequence_token = resp.next_sequence_token;
            }
            Err(err) => {
                // The token may be stale or out of sync; force a refresh
                // on the next attempt.
                self.next_sequence_token = None;
                throw!(err);
            }
        }
    }
}
/// Cloneable handle to the shared uploader state; clones share the same
/// queue and background thread.
#[derive(Clone)]
pub struct BatchUploader {
    internal: Arc<Mutex<BatchUploaderInternal>>,
}
impl BatchUploader {
    /// Create an uploader that sends batches to `target` using `client`.
    /// The background thread is not started until
    /// [`BatchUploader::start_background_thread`] is called.
    pub fn new(
        client: CloudWatchLogsClient,
        target: UploadTarget,
    ) -> BatchUploader {
        BatchUploader {
            internal: Arc::new(Mutex::new(BatchUploaderInternal {
                target,
                client,
                queued_batches: QueuedBatches::default(),
                next_sequence_token: None,
                thread_started: false,
            })),
        }
    }
    /// Queue a single event for upload.
    ///
    /// Throws [`Error::PoisonedLock`] if the shared mutex is poisoned, or
    /// propagates errors from [`QueuedBatches::add_event`] (e.g.
    /// [`Error::EventTooLarge`]).
    #[throws]
    pub fn add_event(&self, event: InputLogEvent) {
        let mut guard =
            self.internal.lock().map_err(|_| Error::PoisonedLock)?;
        guard.queued_batches.add_event(event)?;
    }
    /// Spawn the background upload thread. At most one thread is allowed
    /// per shared state; a second call throws
    /// [`Error::ThreadAlreadyStarted`].
    ///
    /// Each iteration the thread holds the lock while attempting up to 5
    /// batch uploads (logging failures rather than stopping), then sleeps
    /// one second. NOTE(review): there is no shutdown signal — the loop
    /// (and the `Arc`'d state it captures) runs for the rest of the
    /// process even after every `BatchUploader` clone is dropped; confirm
    /// that is acceptable for callers.
    #[throws]
    pub fn start_background_thread(&self) -> thread::JoinHandle<()> {
        let mut guard =
            self.internal.lock().map_err(|_| Error::PoisonedLock)?;
        if guard.thread_started {
            throw!(Error::ThreadAlreadyStarted);
        }
        guard.thread_started = true;
        let builder =
            thread::Builder::new().name("cloudwatch-logs-upload".into());
        let internal = self.internal.clone();
        let handle = builder
            .spawn(move || loop {
                if let Ok(mut guard) = internal.lock() {
                    // Cap work per wakeup at 5 batches so the lock is not
                    // held indefinitely when the queue is deep.
                    for _ in 0..5 {
                        if let Err(err) = guard.upload_batch() {
                            error!(
                                "CloudWatch Logs batch upload failed: {}",
                                err
                            );
                        }
                    }
                } else {
                    error!("CloudWatch Logs bad lock");
                }
                thread::sleep(Duration::from_secs(1));
            })
            .map_err(Error::SpawnError)?;
        handle
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    // An event whose encoded size is exactly MAX_BATCH_SIZE is accepted;
    // one more byte must be rejected with EventTooLarge carrying the
    // offending size.
    #[test]
    fn test_event_too_large() {
        let mut qb = QueuedBatches::default();
        let max_message_size = MAX_BATCH_SIZE - EVENT_OVERHEAD;
        let mut message = String::with_capacity(max_message_size + 1);
        for _ in 0..message.capacity() - 1 {
            message.push('x');
        }
        qb.add_event(InputLogEvent {
            message: message.clone(),
            timestamp: 0,
        })
        .unwrap();
        message.push('x');
        assert!(matches!(
            qb.add_event(InputLogEvent {
                message,
                timestamp: 0,
            }),
            Err(Error::EventTooLarge(size)) if size == MAX_BATCH_SIZE + 1
        ));
    }
    // The MAX_EVENTS_IN_BATCH-th event still fits in the first batch; the
    // next event must open a second batch.
    #[test]
    fn test_max_events_in_batch() {
        let mut qb = QueuedBatches::default();
        for _ in 0..MAX_EVENTS_IN_BATCH {
            qb.add_event(InputLogEvent {
                ..Default::default()
            })
            .unwrap();
            assert_eq!(qb.batches.len(), 1);
        }
        qb.add_event(InputLogEvent {
            ..Default::default()
        })
        .unwrap();
        assert_eq!(qb.batches.len(), 2);
    }
    // Fill a batch to exactly MAX_BATCH_SIZE (large event + one empty
    // event's overhead); a further event must start a new batch whose
    // accounting is reset to just that event's overhead.
    #[test]
    fn test_max_batch_size() {
        let mut qb = QueuedBatches::default();
        let message_size = MAX_BATCH_SIZE - EVENT_OVERHEAD * 2;
        let mut message = String::with_capacity(message_size);
        for _ in 0..message.capacity() {
            message.push('x');
        }
        qb.add_event(InputLogEvent {
            message: message.clone(),
            timestamp: 0,
        })
        .unwrap();
        assert_eq!(qb.batches.len(), 1);
        assert_eq!(qb.current_batch_size, message_size + EVENT_OVERHEAD);
        qb.add_event(InputLogEvent {
            message: "".to_string(),
            timestamp: 0,
        })
        .unwrap();
        assert_eq!(qb.batches.len(), 1);
        assert_eq!(qb.current_batch_size, message_size + EVENT_OVERHEAD * 2);
        qb.add_event(InputLogEvent {
            message: "".to_string(),
            timestamp: 0,
        })
        .unwrap();
        assert_eq!(qb.batches.len(), 2);
        assert_eq!(qb.current_batch_size, EVENT_OVERHEAD);
    }
    // Events may arrive with out-of-order timestamps and still share a
    // batch (sorting happens at upload time, not queueing time).
    #[test]
    fn test_timestamp_order() {
        let mut qb = QueuedBatches::default();
        qb.add_event(InputLogEvent {
            message: "".to_string(),
            timestamp: 1,
        })
        .unwrap();
        assert_eq!(qb.batches.len(), 1);
        qb.add_event(InputLogEvent {
            message: "".to_string(),
            timestamp: 0,
        })
        .unwrap();
        assert_eq!(qb.batches.len(), 1);
    }
    // An event that would stretch the batch's span past 24 hours must
    // open a second batch.
    #[test]
    fn test_batch_max_duration() {
        let mut qb = QueuedBatches::default();
        qb.add_event(InputLogEvent {
            message: "".to_string(),
            timestamp: 0,
        })
        .unwrap();
        assert_eq!(qb.batches.len(), 1);
        qb.add_event(InputLogEvent {
            message: "".to_string(),
            timestamp: MAX_DURATION_MILLIS + 1,
        })
        .unwrap();
        assert_eq!(qb.batches.len(), 2);
    }
}