#![cfg(feature = "aws")]
#![cfg_attr(docsrs, doc(cfg(feature = "aws")))]
use crate::logger::Status;
use crate::service::aws::{Config, Data, StandardMessageFormatter, MessageFormatter};
use crate::service::{CloudWatchMessage, ServiceError};
use crate::{Fallback, Message};
use aws_config::{BehaviorVersion, Region};
use aws_sdk_cloudwatchlogs::types::InputLogEvent;
use chrono::{SecondsFormat, Utc};
use std::sync::mpsc::Receiver;
use std::sync::Arc;
use tokio::runtime::Builder;
/// A log sink that can report its health and consume queued messages.
///
/// Implementors must also provide [`Fallback`] and be shareable across
/// threads (`Send + Sync`).
pub trait CloudWatch: Fallback + Send + Sync {
    /// Returns the sink's current [`Status`].
    fn status(&self) -> Status;

    /// Processes the [`CloudWatchMessage`]s delivered through `receiver`.
    fn work(&self, receiver: Receiver<CloudWatchMessage>);
}
/// CloudWatch Logs sink backed by a shared [`Data`] bundle.
///
/// The bundle carries the AWS client, the Tokio runtime used to drive it,
/// the target log group name and the message formatter.
pub struct SimpleCloudWatch {
// Reference-counted so `get_data` can hand out additional handles.
data: Arc<Data>,
}
impl SimpleCloudWatch {
pub fn new(config: Config) -> Box<dyn CloudWatch + Send + Sync> {
Self::new_formatted(config, StandardMessageFormatter {})
}
pub fn new_formatted<F>(config: Config, formatter: F) -> Box<dyn CloudWatch + Send + Sync>
where
F: MessageFormatter + Send + Sync + 'static,
{
let rt = Builder::new_multi_thread()
.enable_all()
.build()
.expect("Failed to create Tokio runtime");
let access_key_id = config.get_access_key_id().to_string();
let access_key_secret = config.get_access_key_secret().to_string();
let session_token = config.get_session_token().map(|t| t.to_string());
let expires_in = config.get_expires_in();
let provider = config.get_provider();
let region = config.get_region().to_string();
let client = rt.block_on(async {
let creds = aws_sdk_cloudwatchlogs::config::Credentials::new(
access_key_id,
access_key_secret,
session_token,
expires_in,
provider,
);
let sdk_config = aws_config::defaults(BehaviorVersion::latest())
.region(Region::new(region))
.credentials_provider(creds)
.load()
.await;
aws_sdk_cloudwatchlogs::Client::new(&sdk_config)
});
Box::new(SimpleCloudWatch {
data: Arc::new(Data {
client,
rt,
log_group: config.get_log_group().to_string(),
formatter: Box::new(formatter),
}),
})
}
pub fn from_env(log_group: String) -> Box<dyn CloudWatch + Send + Sync> {
Self::from_env_formatted(log_group, StandardMessageFormatter {})
}
pub fn from_env_formatted<F>(
log_group: String,
formatter: F,
) -> Box<dyn CloudWatch + Send + Sync>
where
F: MessageFormatter + Send + Sync + 'static,
{
let rt = Builder::new_multi_thread()
.enable_all()
.build()
.expect("Failed to create Tokio runtime");
let client = rt.block_on(async {
let sdk_config = aws_config::defaults(BehaviorVersion::latest()).load().await;
aws_sdk_cloudwatchlogs::Client::new(&sdk_config)
});
Box::new(SimpleCloudWatch {
data: Arc::new(Data {
client,
rt,
log_group,
formatter: Box::new(formatter),
}),
})
}
pub fn log_group(&self) -> &str {
&self.data.log_group
}
pub fn get_data(&self) -> Arc<Data> {
self.data.clone()
}
}
impl CloudWatch for SimpleCloudWatch {
fn status(&self) -> Status {
let result = self.data.rt.block_on(async {
self.data
.client
.describe_log_groups()
.log_group_name_prefix(&self.data.log_group)
.send()
.await
});
match result {
Ok(output) => {
let exists = output
.log_groups()
.iter()
.any(|g| g.log_group_name() == Some(&self.data.log_group));
if exists {
Status::Running
} else {
Status::Broken
}
}
Err(_) => {
Status::Broken
}
}
}
fn work(&self, receiver: Receiver<CloudWatchMessage>) {
let mut messages = Vec::with_capacity(128);
while let Ok(message) = receiver.recv() {
messages.clear();
messages.push(message);
while let Ok(message) = receiver.try_recv() {
messages.push(message);
}
let data = &self.data;
data.rt.block_on(async {
let mut log_events = Vec::with_capacity(messages.len());
for msg in &messages {
let event = InputLogEvent::builder()
.message(data.formatter.format(&msg.message))
.timestamp(msg.timestamp) .build()
.expect("Failed to build log event");
log_events.push(event);
}
let _ = data
.client
.put_log_events()
.log_group_name(&data.log_group)
.log_stream_name("my-stream-name") .set_log_events(Some(log_events))
.send()
.await;
});
} }
}
impl Fallback for SimpleCloudWatch {
    /// Prints `msg` to standard output as `<timestamp> [<level>] | <content>`.
    ///
    /// The timestamp is the message instant rendered as RFC 3339 in UTC with
    /// nanosecond precision and a `Z` suffix. The error is not used.
    fn fallback(&self, _error: &ServiceError, msg: &Message) {
        let moment: chrono::DateTime<Utc> = msg.instant().into();
        let stamp = moment.to_rfc3339_opts(SecondsFormat::Nanos, true);
        let level = msg.level();
        let content = msg.content();
        println!("{} [{}] | {}", stamp, level, content);
    }
}