use chrono::DateTime;
use chrono::Datelike;
use chrono::Local;
use chrono::SecondsFormat;
use chrono::TimeZone;
use chrono::Utc;
use lazy_static::lazy_static;
use serde::Deserialize;
use serde::Serialize;
use serde_json::json;
use snafu::Snafu;
use std::collections::HashMap;
use std::string::ToString;
use std::sync::RwLock;
use std::sync::atomic::AtomicBool;
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
use strum_macros::Display;
use strum_macros::EnumString;
lazy_static! {
static ref LOG: RwLock<Option<std::sync::mpsc::SyncSender<LogMessage>>> = RwLock::new(None);
}
lazy_static! {
static ref SHOULD_LOOP: AtomicBool = AtomicBool::new(false);
}
lazy_static! {
static ref IS_FLUSHED: AtomicBool = AtomicBool::new(false);
}
lazy_static! {
static ref LOG_TO_CONSOLE: AtomicBool = AtomicBool::new(false);
}
lazy_static! {
static ref LOG_TO_ELASTIC: AtomicBool = AtomicBool::new(false);
}
static WAIT: Duration = Duration::from_secs(1);
/// Errors reported by `post_to_elastic` when shipping a bulk request.
#[derive(Debug, Snafu)]
pub enum Error {
    /// Sending the HTTP request failed.
    #[snafu(display("Request failed: {}", inner))]
    RequestFailed { inner: reqwest::Error },
    /// A successful response could not be decoded as an `ElasticReply`.
    #[snafu(display("Failed to deserialize log API reply: {}", inner))]
    DeserializationFailed { inner: reqwest::Error },
    /// The request succeeded, but at least one item in the reply carried an
    /// error; `errors` holds one `[type]: reason` line per failed item.
    #[snafu(display(
        "Some, or all chunked logs, were not accepted by the log API: {}",
        errors
    ))]
    SomeLogsWereNotAccepted { errors: String },
    /// The API answered with a non-success status; `errors` holds either the
    /// decoded error reply as JSON or the payload that was sent.
    #[snafu(display("The log API rejected the whole chunked log request: {}", errors))]
    ApiRejectedLogPayload { errors: String },
}
/// Severity of a log message.
///
/// The `strum` `to_string` attributes give each level the three-letter code
/// that its `Display` implementation prints (used for the console prefix and
/// the `level` field of the payload).
#[derive(Display, Debug, PartialEq, Eq, EnumString)]
pub enum LogLevel {
    /// Rendered as `DBG`; skipped by the elastic worker in `Production`.
    #[strum(to_string = "DBG")]
    Debug,
    /// Rendered as `INF`.
    #[strum(to_string = "INF")]
    Information,
    /// Rendered as `WRN`.
    #[strum(to_string = "WRN")]
    Warning,
    /// Rendered as `ERR`.
    #[strum(to_string = "ERR")]
    Error,
    /// Rendered as `FTL`.
    #[strum(to_string = "FTL")]
    Fatal,
}
/// Hosting environment of the application.
///
/// Its lower-cased name becomes part of the index name and the
/// `HostingEnvironment` field built in `setup_elastic_log`.
#[derive(Serialize, Deserialize, Display, PartialEq, Eq, Hash, EnumString, Clone)]
pub enum LogEnvironment {
    /// Debug-level messages are not forwarded in this environment.
    Production,
    /// All levels are forwarded.
    Development,
}
/// A single log entry as it travels through the queue to the worker thread.
pub struct LogMessage {
    /// Severity of the entry.
    level: LogLevel,
    /// The template exactly as passed to `log`, placeholders included.
    message_template: String,
    /// The template with `{key}` placeholders replaced by their values.
    message: String,
    /// Placeholder names mapped to their stringified values.
    fields: HashMap<String, String>,
}
/// Inner object of a bulk action line; names the target index.
#[derive(Serialize)]
struct LogInnerIndex {
    /// Target index name, serialized under the key `_index`.
    #[serde(rename(serialize = "_index"))]
    index: String,
}
/// Bulk action line that precedes each `LogPayload`; serializes as
/// `{"index": {"_index": ...}}`.
#[derive(Serialize)]
struct LogIndex {
    /// The wrapped index descriptor.
    index: LogInnerIndex,
}
/// The `caused_by` object of a per-item error in a bulk reply.
#[derive(Deserialize, Serialize, Debug)]
struct ElasticErrorCause {
    /// Error type reported by the API.
    r#type: String,
    /// Explanation reported by the API.
    reason: String,
}
/// Per-item error of a bulk reply; only its `caused_by` part is read.
#[derive(Deserialize, Serialize, Debug)]
struct ElasticError {
    /// Required during deserialization, since the field is not optional.
    caused_by: ElasticErrorCause,
}
/// Outcome of one indexed document inside a bulk reply.
///
/// The renames apply to deserialization only; when serialized the fields keep
/// their Rust names.
#[derive(Serialize, Deserialize)]
struct ElasticSingleLogStatus {
    /// Read from the key `_index`.
    #[serde(rename(deserialize = "_index"))]
    index: String,
    /// Status code reported for this item.
    status: u16,
    /// Read from the key `_id`.
    #[serde(rename(deserialize = "_id"))]
    id: String,
    /// Present when this item was rejected.
    error: Option<ElasticError>,
}
/// One entry of `ElasticReply::items`, wrapping the status under `index`.
#[derive(serde::Deserialize, Serialize)]
struct ElasticSingleReplyIndex {
    /// Status of the corresponding index action.
    index: ElasticSingleLogStatus,
}
/// Body of a successful bulk response.
#[derive(serde::Deserialize, Serialize)]
struct ElasticReply {
    /// Top-level error flag of the reply; `post_to_elastic` inspects the
    /// individual items instead of this flag.
    errors: bool,
    /// One entry per submitted document.
    items: Vec<ElasticSingleReplyIndex>,
}
/// The `error` object of a bulk response with a non-success status.
#[derive(serde::Deserialize, Serialize)]
struct ElasticErrorReplyError {
    /// Explanation reported by the API.
    reason: String,
    /// Error type reported by the API.
    r#type: String,
}
/// Body decoded by `post_to_elastic` when the response status is not a success.
#[derive(serde::Deserialize, Serialize)]
struct ElasticErrorReply {
    /// Details of the rejection.
    error: ElasticErrorReplyError,
    /// Status code as repeated in the body.
    status: u16,
}
/// Renders one console line in the form `[HH:MM:SS LVL]: message`.
///
/// The clock time is taken from `time` and the level is printed through its
/// `Display` implementation.
pub fn create_console_message<T>(level: &LogLevel, message: &str, time: &DateTime<T>) -> String
where
    T: TimeZone,
{
    format!(
        "[{} {}]: {}",
        time.time().format("%H:%M:%S"),
        level,
        message
    )
}
/// Records one log entry.
///
/// The first occurrence of each `{key}` placeholder in `template` is replaced
/// by the matching value from `fields`. The entry is then handed to the
/// elastic worker queue when one is installed.
///
/// When no queue is installed, or the queue can no longer accept messages
/// because the worker has gone away, the entry is printed to the console
/// instead. If elastic logging was requested, a warning line is printed first.
///
/// # Panics
///
/// Panics if the lock guarding the queue sender is poisoned.
pub fn log<S>(level: LogLevel, template: &str, fields: HashMap<S, String>)
where
    S: std::fmt::Display,
{
    // Render the message and build the owned field map in a single pass.
    let mut message = template.to_string();
    let mut owned_fields = HashMap::<String, String>::with_capacity(fields.len());
    for (key, value) in fields {
        let key = key.to_string();
        message = message.replacen(format!("{{{}}}", key).as_str(), value.as_str(), 1);
        owned_fields.insert(key, value);
    }
    let log_msg = LogMessage {
        level,
        message_template: template.to_string(),
        message,
        fields: owned_fields,
    };

    // The read guard is a temporary of this statement, so the lock is released
    // as soon as the send attempt is over. A failed send hands the message
    // back, which lets us fall back to the console instead of panicking.
    let send_result = match LOG.read().unwrap().as_ref() {
        Some(sender) => sender.send(log_msg).map_err(|failed| failed.0),
        None => Err(log_msg),
    };
    let log_msg = match send_result {
        Ok(()) => return,
        Err(undelivered) => undelivered,
    };

    let time = chrono::Local::now();
    if LOG_TO_ELASTIC.load(std::sync::atomic::Ordering::SeqCst) {
        println!(
            "{}",
            create_console_message(
                &LogLevel::Warning,
                "Could not log to elastic. Logging to console as fallback",
                &time
            )
        );
    }
    println!(
        "{}",
        create_console_message(&log_msg.level, &log_msg.message, &time)
    );
}
/// Extracts the placeholder names from a message template, in order of
/// appearance.
///
/// A placeholder is the text between a `{` and the next `}`. A second `{`
/// before the closing brace restarts the capture, and an empty pair `{}`
/// yields an empty name. A `}` without a preceding `{` is ignored, so plain
/// text in front of it is never reported as a placeholder. An unterminated
/// `{...` yields nothing.
pub fn get_matches(input: &str) -> std::collections::VecDeque<String> {
    let mut matches = std::collections::VecDeque::new();
    let mut word = String::new();
    let mut inside_braces = false;
    for c in input.chars() {
        match c {
            '{' => {
                word.clear();
                inside_braces = true;
            }
            '}' => {
                if inside_braces {
                    // Hand the buffer over without cloning it.
                    matches.push_back(std::mem::take(&mut word));
                    inside_braces = false;
                }
            }
            _ => {
                if inside_braces {
                    word.push(c);
                }
            }
        }
    }
    matches
}
/// Logs a message at debug level.
///
/// The first argument is a literal template. Each following argument is
/// stringified with `to_string()` and bound, in order, to the placeholder
/// names found in the template. Arguments beyond the number of placeholders
/// are ignored. The template may be used without arguments, and a trailing
/// comma is accepted.
#[macro_export]
macro_rules! log_debug {
    ($fmt:literal $(,)?) => {{
        $crate::log::log(
            $crate::log::LogLevel::Debug,
            $fmt,
            std::collections::HashMap::<String, String>::new(),
        );
    }};
    ($fmt:literal, $($arg:expr),+ $(,)?) => {{
        let mut matches = $crate::log::get_matches($fmt);
        let mut map = std::collections::HashMap::new();
        $(
            if let Some(log_variable_name) = matches.pop_front() {
                map.insert(log_variable_name, $arg.to_string());
            }
        )+
        $crate::log::log($crate::log::LogLevel::Debug, $fmt, map);
    }};
}
/// Logs a message at information level.
///
/// The first argument is a literal template. Each following argument is
/// stringified with `to_string()` and bound, in order, to the placeholder
/// names found in the template. Arguments beyond the number of placeholders
/// are ignored. The template may be used without arguments, and a trailing
/// comma is accepted.
#[macro_export]
macro_rules! log_information {
    ($fmt:literal $(,)?) => {{
        $crate::log::log(
            $crate::log::LogLevel::Information,
            $fmt,
            std::collections::HashMap::<String, String>::new(),
        );
    }};
    ($fmt:literal, $($arg:expr),+ $(,)?) => {{
        let mut matches = $crate::log::get_matches($fmt);
        let mut map = std::collections::HashMap::new();
        $(
            if let Some(log_variable_name) = matches.pop_front() {
                map.insert(log_variable_name, $arg.to_string());
            }
        )+
        $crate::log::log($crate::log::LogLevel::Information, $fmt, map);
    }};
}
/// Logs a message at warning level.
///
/// The first argument is a literal template. Each following argument is
/// stringified with `to_string()` and bound, in order, to the placeholder
/// names found in the template. Arguments beyond the number of placeholders
/// are ignored. The template may be used without arguments, and a trailing
/// comma is accepted.
#[macro_export]
macro_rules! log_warning {
    ($fmt:literal $(,)?) => {{
        $crate::log::log(
            $crate::log::LogLevel::Warning,
            $fmt,
            std::collections::HashMap::<String, String>::new(),
        );
    }};
    ($fmt:literal, $($arg:expr),+ $(,)?) => {{
        let mut matches = $crate::log::get_matches($fmt);
        let mut map = std::collections::HashMap::new();
        $(
            if let Some(log_variable_name) = matches.pop_front() {
                map.insert(log_variable_name, $arg.to_string());
            }
        )+
        $crate::log::log($crate::log::LogLevel::Warning, $fmt, map);
    }};
}
/// Logs a message at error level.
///
/// The first argument is a literal template. Each following argument is
/// stringified with `to_string()` and bound, in order, to the placeholder
/// names found in the template. Arguments beyond the number of placeholders
/// are ignored. The template may be used without arguments, and a trailing
/// comma is accepted.
#[macro_export]
macro_rules! log_error {
    ($fmt:literal $(,)?) => {{
        $crate::log::log(
            $crate::log::LogLevel::Error,
            $fmt,
            std::collections::HashMap::<String, String>::new(),
        );
    }};
    ($fmt:literal, $($arg:expr),+ $(,)?) => {{
        let mut matches = $crate::log::get_matches($fmt);
        let mut map = std::collections::HashMap::new();
        $(
            if let Some(log_variable_name) = matches.pop_front() {
                map.insert(log_variable_name, $arg.to_string());
            }
        )+
        $crate::log::log($crate::log::LogLevel::Error, $fmt, map);
    }};
}
/// Logs a message at fatal level.
///
/// The first argument is a literal template. Each following argument is
/// stringified with `to_string()` and bound, in order, to the placeholder
/// names found in the template. Arguments beyond the number of placeholders
/// are ignored. The template may be used without arguments, and a trailing
/// comma is accepted.
#[macro_export]
macro_rules! log_fatal {
    ($fmt:literal $(,)?) => {{
        $crate::log::log(
            $crate::log::LogLevel::Fatal,
            $fmt,
            std::collections::HashMap::<String, String>::new(),
        );
    }};
    ($fmt:literal, $($arg:expr),+ $(,)?) => {{
        let mut matches = $crate::log::get_matches($fmt);
        let mut map = std::collections::HashMap::new();
        $(
            if let Some(log_variable_name) = matches.pop_front() {
                map.insert(log_variable_name, $arg.to_string());
            }
        )+
        $crate::log::log($crate::log::LogLevel::Fatal, $fmt, map);
    }};
}
/// Logs `template` with `fields` at [`LogLevel::Information`] by delegating to
/// [`log`].
pub fn information(template: &str, fields: HashMap<&str, String>) {
    log::<&str>(LogLevel::Information, template, fields);
}
pub fn error(template: &str, fields: HashMap<&str, String>) {
log(LogLevel::Error, template, fields)
}
/// Logs `template` with `fields` at [`LogLevel::Debug`] by delegating to
/// [`log`].
pub fn debug(template: &str, fields: HashMap<&str, String>) {
    log::<&str>(LogLevel::Debug, template, fields);
}
/// Logs `template` with `fields` at [`LogLevel::Fatal`] by delegating to
/// [`log`].
pub fn fatal(template: &str, fields: HashMap<&str, String>) {
    log::<&str>(LogLevel::Fatal, template, fields);
}
/// Logs `template` with `fields` at [`LogLevel::Warning`] by delegating to
/// [`log`].
pub fn warning(template: &str, fields: HashMap<&str, String>) {
    log::<&str>(LogLevel::Warning, template, fields);
}
pub fn flush() {
println!("Waiting for log grace-period to expire...");
thread::sleep(WAIT);
println!("Closing log transceiver");
{
let mut sender = LOG.write().unwrap();
*sender = None;
}
println!("Flushing logs");
SHOULD_LOOP.store(false, std::sync::atomic::Ordering::SeqCst);
while !IS_FLUSHED.load(std::sync::atomic::Ordering::SeqCst) {
println!("Waiting for logs to flush...");
thread::sleep(WAIT);
}
println!("Flushed logs");
}
/// Document line of a bulk request; follows its `LogIndex` action line.
#[derive(Serialize)]
struct LogPayload {
    /// RFC 3339 timestamp string, serialized under the key `@timestamp`.
    #[serde(rename(serialize = "@timestamp"))]
    timestamp: String,
    /// Textual form of the log level.
    level: String,
    /// Unrendered template, serialized under the key `messageTemplate`.
    #[serde(rename(serialize = "messageTemplate"))]
    message_template: String,
    /// Rendered message.
    message: String,
    /// Default fields merged with the fields of the individual message.
    fields: HashMap<String, String>,
}
/// Settings for `setup_elastic_log`.
#[derive(Clone, PartialEq)]
pub struct ElasticConfig {
    /// User name for HTTP basic authentication.
    pub username: String,
    /// Password for HTTP basic authentication.
    pub password: String,
    /// Base URL of the log API; `/_bulk` is appended when posting.
    pub url: String,
    /// Hosting environment; debug messages are dropped in `Production`.
    pub environment: LogEnvironment,
    /// Application name; lower-cased for the index name and the
    /// `ApplicationName` field.
    pub application_name: String,
    /// Whether forwarded messages are also printed to the console; `None` is
    /// treated as `true`.
    pub log_to_console: Option<bool>,
}
/// Merges the default fields with the fields of a single message.
///
/// Returns a new map holding a copy of every entry in `default_fields`, with
/// the entries of `extra_fields` added on top. When a key is present in both,
/// the value from `extra_fields` wins.
fn concat_fields(
    default_fields: &HashMap<String, String>,
    extra_fields: HashMap<String, String>,
) -> HashMap<String, String> {
    let mut merged = default_fields.clone();
    merged.extend(extra_fields);
    merged
}
/// Test-only entry point that posts `json` through `post_to_elastic` using a
/// freshly created blocking HTTP client.
#[cfg(test)]
pub fn test_post_to_elastic(elastic_credentials: ElasticConfig, json: String) -> Result<(), Error> {
    post_to_elastic(reqwest::blocking::Client::new(), elastic_credentials, json)
}
fn post_to_elastic(
client: reqwest::blocking::Client,
elastic_credentials: ElasticConfig,
json: String,
) -> Result<(), Error> {
let result = client
.post(format!("{}/_bulk", elastic_credentials.url))
.basic_auth(
elastic_credentials.username,
Some(elastic_credentials.password),
)
.header("Content-Type", "application/json")
.header("Accept", "application/json")
.body(json.to_owned())
.send()
.map_err(|x| Error::RequestFailed { inner: x })?;
if result.status().is_success() {
let ok_result = result
.json::<ElasticReply>()
.map_err(|x| Error::DeserializationFailed { inner: x })?;
let mut errors = Vec::<String>::new();
for index in ok_result.items.into_iter() {
if let Some(some_error) = index.index.error {
errors.push(format!(
"[{}]: {}",
some_error.caused_by.r#type, some_error.caused_by.reason
));
}
}
if !errors.is_empty() {
return Err(Error::SomeLogsWereNotAccepted {
errors: errors.join("\n"),
});
}
return Ok(());
}
let err_result = result.json::<ElasticErrorReply>();
if let Ok(some_err_result) = err_result {
Err(Error::ApiRejectedLogPayload {
errors: json!(some_err_result).to_string(),
})
} else {
Err(Error::ApiRejectedLogPayload {
errors: format!("[Unhandled Error]: Payload {}", json),
})
}
}
/// Renders index/payload pairs as a newline-delimited bulk request body.
///
/// Every pair contributes two lines, the action line followed by the document
/// line, each terminated by `\n`. One extra `\n` is appended at the very end,
/// so an empty input yields just `"\n"`.
fn serialize_to_chunks(log_chunks: Vec<(LogIndex, LogPayload)>) -> String {
    let mut body = String::new();
    for (action, document) in log_chunks {
        body.push_str(&json!(action).to_string());
        body.push('\n');
        body.push_str(&json!(document).to_string());
        body.push('\n');
    }
    body.push('\n');
    body
}
/// Serializes `log_chunks` and posts them as one bulk request.
///
/// A failure is reported on standard error and otherwise ignored, so the
/// caller is never interrupted by a delivery problem.
fn log_to_elastic(
    client: reqwest::blocking::Client,
    elastic_credentials: ElasticConfig,
    log_chunks: Vec<(LogIndex, LogPayload)>,
) {
    let body = serialize_to_chunks(log_chunks);
    match post_to_elastic(client, elastic_credentials, body) {
        Ok(()) => {}
        Err(failure) => eprintln!("error: {}", failure),
    }
}
pub fn setup_console_log() {
LOG_TO_CONSOLE.store(true, std::sync::atomic::Ordering::SeqCst);
SHOULD_LOOP.store(false, std::sync::atomic::Ordering::SeqCst);
IS_FLUSHED.store(true, std::sync::atomic::Ordering::SeqCst);
LOG_TO_ELASTIC.store(false, std::sync::atomic::Ordering::SeqCst);
println!(
"{}",
create_console_message(
&LogLevel::Information,
"Relastic initialized to log to console",
&Local::now()
)
);
}
/// Installs the elastic logger and starts its background worker thread.
///
/// `buffer_size` is the capacity of the bounded queue between `log` and the
/// worker. It also caps how many additional queued messages are folded into
/// one bulk request.
///
/// The worker blocks for up to one `WAIT` interval for a message. It then
/// drains whatever else is already queued and posts the batch in a single
/// request. Debug messages are dropped when the environment is `Production`.
/// When `log_to_console` is unset or `true`, forwarded messages are echoed to
/// the console.
///
/// The worker stops, and sets `IS_FLUSHED`, once the queue sender has been
/// dropped and the queue is empty. It also stops when a stop was requested and
/// no message arrived within one `WAIT` interval. Queued messages are
/// therefore delivered before the worker reports completion.
///
/// # Panics
///
/// Panics if the lock guarding the queue sender is poisoned.
pub fn setup_elastic_log(config: ElasticConfig, buffer_size: usize) {
    use std::sync::atomic::Ordering::SeqCst;

    LOG_TO_CONSOLE.store(config.log_to_console.unwrap_or(true), SeqCst);
    LOG_TO_ELASTIC.store(true, SeqCst);
    let client = reqwest::blocking::Client::new();
    let (tx, rx) = mpsc::sync_channel::<LogMessage>(buffer_size);
    let app_name_lowercase = config.application_name.to_lowercase();
    let env_str_lowercase = config.environment.to_string().to_lowercase();
    let default_fields = HashMap::from([
        ("ApplicationName".to_string(), app_name_lowercase.clone()),
        ("HostingEnvironment".to_string(), env_str_lowercase.clone()),
    ]);
    // A fresh worker has not flushed anything yet. Without this reset a flag
    // left over from an earlier `flush` or console setup would make the next
    // `flush` return immediately.
    IS_FLUSHED.store(false, SeqCst);
    SHOULD_LOOP.store(true, SeqCst);
    {
        let mut logger = LOG.write().unwrap();
        *logger = Some(tx);
    }
    thread::spawn(move || loop {
        let first = match rx.recv_timeout(WAIT) {
            Ok(message) => message,
            // Idle but still wanted: keep waiting.
            Err(mpsc::RecvTimeoutError::Timeout) if SHOULD_LOOP.load(SeqCst) => continue,
            // Either a stop was requested and the queue stayed empty, or the
            // sender is gone and nothing more can arrive.
            Err(_) => {
                IS_FLUSHED.store(true, SeqCst);
                return;
            }
        };

        // Batch the message we waited for with everything already queued.
        let mut payload_chunks = Vec::<(LogIndex, LogPayload)>::new();
        for received in std::iter::once(first).chain(rx.try_iter().take(buffer_size)) {
            if received.level == LogLevel::Debug
                && config.environment == LogEnvironment::Production
            {
                continue;
            }
            let dt = Utc::now();
            let str_index: String = format!(
                "{}-{}-{}.{}",
                app_name_lowercase,
                env_str_lowercase,
                dt.date_naive().year(),
                dt.date_naive().month()
            );
            let index = LogIndex {
                index: LogInnerIndex { index: str_index },
            };
            if LOG_TO_CONSOLE.load(SeqCst) {
                println!(
                    "{}",
                    create_console_message(&received.level, &received.message, &Local::now())
                );
            }
            let log_payload = LogPayload {
                timestamp: dt.to_rfc3339_opts(SecondsFormat::Nanos, true),
                level: received.level.to_string(),
                message_template: received.message_template,
                message: received.message,
                fields: concat_fields(&default_fields, received.fields),
            };
            payload_chunks.push((index, log_payload));
        }
        if !payload_chunks.is_empty() {
            log_to_elastic(client.clone(), config.clone(), payload_chunks);
        }
        // No exit check here: the next receive either yields more messages or
        // reports that it is time to stop, so nothing queued is left behind.
    });
}