use crate::builder::LoggerError;
use crate::client::compressed_log_upload;
use crate::client::plaintext_log_upload;
use crate::compression::Compression;
use crate::compression::Encoder;
use crate::compression::FinishValue;
use actix::System;
use lazy_static::lazy_static;
use log::{Level, Log, Metadata, Record};
use std::cell::RefCell;
use std::fs::OpenOptions;
use std::io::Write;
use std::sync::Mutex;
use std::sync::{Arc, RwLock};
use std::thread;
use std::time::{Duration, Instant};
lazy_static! {
    /// Timestamp of the last upload; compared against `LOG_TIMEOUT` in
    /// `Logger::log` and reset by `upload_logs` after each batch is shipped.
    pub static ref TIMER: Arc<RwLock<Instant>> = Arc::new(RwLock::new(Instant::now()));
}
/// Force an upload if nothing has been shipped within this window (10 min),
/// even when the buffer has not reached its size threshold.
const LOG_TIMEOUT: Duration = Duration::from_secs(10 * 60);
/// Rotate /tmp/compressed_logs_2 over /tmp/compressed_logs_1 once the tmp
/// log file grows past this many bytes (~500 KB).
const LOG_FILE_THRESH: u64 = 500_000;
/// Wire payload of raw, uncompressed log lines, uploaded via
/// `plaintext_log_upload`.
// NOTE(review): the Serialize/Deserialize derives rely on a serde derive
// import at the crate root (e.g. `#[macro_use] extern crate serde_derive`)
// that is not visible in this file — confirm it exists.
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct PlaintextLogs {
    /// One entry per formatted log line.
    pub logs: Vec<String>,
}
/// Wire payload of compressed log data, uploaded via `compressed_log_upload`.
// NOTE(review): as with `PlaintextLogs`, the serde derives depend on a
// crate-root serde import not visible in this file.
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct CompressedLogs {
    /// Compressed bytes of the plaintext log batch.
    pub compressed_plaintext_logs: Vec<u8>,
}
/// A `log`-crate backend that buffers formatted lines in an (optionally
/// compressing) encoder and ships batches to a remote HTTP sink when the
/// buffer fills up or `LOG_TIMEOUT` elapses.
pub struct Logger {
    /// Minimum severity a record must have to be processed.
    level: Level,
    /// Compression mode used for every freshly created encoder.
    compression: Compression,
    /// Upload once the uncompressed buffer reaches this many bytes; also
    /// acts as the per-line size limit.
    threshold: usize,
    /// In-flight line buffer. The `Mutex` serializes cross-thread access;
    /// the `RefCell` lets `rotate` swap in a new encoder through a shared
    /// reference to the guard's contents.
    encoder: Mutex<RefCell<Encoder>>,
    /// HTTP endpoint that finished batches are sent to.
    sink_url: String,
    /// Turns a log `Record` into the string that gets buffered.
    format: Box<dyn Fn(&Record) -> String + Sync + Send>,
    /// When true, also append every line to /tmp/compressed_logs_2.
    store_tmp_logs: bool,
}
impl Logger {
    /// Creates a fresh `Encoder` configured with the given compression mode.
    pub fn new_encoder(compression: Compression) -> Result<Encoder, LoggerError> {
        Ok(Encoder::new(compression))
    }

    /// Builds a `Logger` with an empty encoder buffer.
    ///
    /// * `level` — minimum severity to record
    /// * `compression` — compression mode for each encoder
    /// * `threshold` — uncompressed byte count that triggers an upload
    /// * `sink_url` — HTTP endpoint for finished batches
    /// * `format` — converts a `Record` into the buffered line
    /// * `store_tmp_logs` — mirror lines into /tmp/compressed_logs_2
    pub fn with_level(
        level: Level,
        compression: Compression,
        threshold: usize,
        sink_url: String,
        format: Box<dyn Fn(&Record) -> String + Sync + Send>,
        store_tmp_logs: bool,
    ) -> Result<Self, LoggerError> {
        Ok(Self {
            encoder: Mutex::new(RefCell::new(Logger::new_encoder(compression)?)),
            level,
            compression,
            threshold,
            sink_url,
            format,
            store_tmp_logs,
        })
    }

    /// Swaps a brand-new encoder into `encoder` and finishes the old one,
    /// yielding the finished (possibly compressed) payload ready for upload.
    fn rotate(&self, encoder: &RefCell<Encoder>) -> Result<FinishValue, LoggerError> {
        let fresh = Logger::new_encoder(self.compression)?;
        Ok(encoder.replace(fresh).finish()?)
    }
}
impl Drop for Logger {
    /// Best-effort flush of any buffered lines when the logger is dropped.
    ///
    /// All failures are reported via `debug_eprintln!` and then ignored:
    /// panicking inside `drop` (the old `expect` calls) aborts the whole
    /// process when the thread is already unwinding from another panic, and
    /// a logger must never take the process down on shutdown.
    fn drop(&mut self) {
        debug_eprintln!("Drop handler called!");
        // A poisoned mutex means another thread panicked mid-log; give up
        // rather than panic again.
        let encoder = match self.encoder.lock() {
            Ok(guard) => guard,
            Err(_) => {
                debug_eprintln!("Unable to acquire buffer lock in drop, logs lost");
                return;
            }
        };
        let data = match self.rotate(&encoder) {
            Ok(data) => data,
            Err(_) => {
                debug_eprintln!("Unable to rotate the buffer in drop, logs lost");
                return;
            }
        };
        // Skip the upload entirely when nothing was buffered.
        match data {
            FinishValue::Compressed(ref c) => {
                if c.compressed_plaintext_logs.is_empty() {
                    return;
                }
            }
            FinishValue::Uncompressed(ref c) => {
                if c.logs.is_empty() {
                    return;
                }
            }
        }
        upload_logs(self.sink_url.clone(), data);
    }
}
/// Truncates `s` to at most `max` bytes without splitting a UTF-8 character.
/// (`String::truncate` panics when the index is not a char boundary, so a
/// bare `truncate(1000)` would crash on multibyte log content.)
fn truncate_lossy(s: &mut String, max: usize) {
    let mut end = max.min(s.len());
    while !s.is_char_boundary(end) {
        end -= 1;
    }
    s.truncate(end);
}

/// Appends one already-formatted line to /tmp/compressed_logs_2 and, once
/// that file grows past `LOG_FILE_THRESH` bytes, rotates it over
/// /tmp/compressed_logs_1 (discarding the previous backup).
fn append_tmp_log(line: &str) {
    if let Ok(mut log_file) = OpenOptions::new()
        .append(true)
        .create(true)
        .open("/tmp/compressed_logs_2")
    {
        log_file
            .write_all(line.as_bytes())
            .expect("write failed");
        if log_file.metadata().expect("why did this fail").len() > LOG_FILE_THRESH {
            // Close the handle before renaming it out of the way.
            drop(log_file);
            if let Err(e) =
                std::fs::rename("/tmp/compressed_logs_2", "/tmp/compressed_logs_1")
            {
                debug_eprintln!("rename compressed_logs_2 failed with {}", e);
            }
        }
    } else {
        debug_eprintln!("Unable to open /tmp/compressed_logs_2");
    }
}

impl Log for Logger {
    /// A record is enabled when its level is at least as severe as the
    /// configured level.
    fn enabled(&self, metadata: &Metadata) -> bool {
        metadata.level() <= self.level
    }

    /// Formats the record and buffers the line in the encoder. When the
    /// buffered size would reach `threshold`, or nothing has been uploaded
    /// within `LOG_TIMEOUT`, the buffer is rotated and shipped first and the
    /// new line goes into the fresh encoder.
    fn log(&self, record: &Record) {
        if !self.enabled(record.metadata()) {
            return;
        }
        let encoder = self.encoder.lock().expect("Unable to acquire encoder lock");
        let mut log_string = (self.format)(record);
        let current_size = { encoder.borrow_mut().uncompressed_bytes() };
        let mut log_len = log_string.as_bytes().len();
        if log_len > self.threshold {
            // A single line bigger than the whole buffer can never be sent;
            // replace it with an error line carrying (up to) its first 1000 bytes.
            let error_str = format!("Single log line greater than log threshold of {}, please reduce log size.\n This logs starts with {:?}", self.threshold, {
                let mut log_clone = log_string.clone();
                truncate_lossy(&mut log_clone, 1000);
                log_clone
            });
            debug_eprintln!("{}", error_str);
            log_string = error_str;
            log_len = log_string.as_bytes().len();
        }
        if self.store_tmp_logs {
            append_tmp_log(&log_string);
        }
        if (current_size + log_len < self.threshold)
            && (Instant::now() - *TIMER.read().unwrap() < LOG_TIMEOUT)
        {
            encoder.borrow_mut().add_line(log_string.clone());
            debug_eprintln!(
                "Buffer {} of {} bytes",
                current_size + log_len,
                self.threshold
            );
            debug_eprintln!("First 5000 bytes of Line {:?}", {
                let mut log_clone = log_string;
                truncate_lossy(&mut log_clone, 5000);
                log_clone
            });
            return;
        }
        debug_eprintln!("Size greater than threshold or timeout hit, sending logs");
        let data = self.rotate(&encoder).expect("Unable to rotate the buffer");
        upload_logs(self.sink_url.clone(), data);
        // BUG FIX: buffer the line in the freshly rotated encoder directly.
        // The previous code recursed into `self.log(record)` here, which
        // re-ran the formatter (possibly producing a different line than the
        // one already mirrored to the tmp file), appended the line to
        // /tmp/compressed_logs_2 a second time, and could recurse without
        // bound when even the truncated error line exceeded `threshold`.
        encoder.borrow_mut().add_line(log_string);
    }

    /// Rotates whatever is buffered and ships it immediately.
    fn flush(&self) {
        let encoder = self.encoder.lock().expect("Unable to acquire encoder lock");
        let data = self.rotate(&encoder).expect("Unable to rotate the buffer");
        debug_eprintln!("Flush called, dropping logs!");
        upload_logs(self.sink_url.clone(), data);
    }
}
/// Ships one finished batch of logs to `url` from a detached worker thread,
/// then resets the shared `TIMER` so the timeout-based flush restarts.
///
/// The worker spins up a throwaway actix `System` so the async upload client
/// can run, and stops it once the upload attempt finishes. Upload errors are
/// deliberately ignored (best-effort delivery).
fn upload_logs(url: String, batch: FinishValue) {
    debug_eprintln!("Uploading logs");
    thread::spawn(move || {
        debug_eprintln!("thread spawned");
        let system = System::new();
        system.block_on(async move {
            match batch {
                FinishValue::Uncompressed(logs) => {
                    let _ = plaintext_log_upload(logs, url).await;
                }
                FinishValue::Compressed(logs) => {
                    let _ = compressed_log_upload(logs, url).await;
                }
            }
            System::current().stop();
        });
    });
    // Record "now" as the moment of the last upload.
    *TIMER.write().unwrap() = Instant::now();
}
#[test]
fn test_logging() {
    use super::*;
    use crate::builder::LoggerBuilder;
    use log::LevelFilter;
    // Points at the test sink; this test only verifies that logging a large
    // burst of lines does not panic.
    let logging_url = "https://stats.altheamesh.com:9999/compressed_sink";
    let level = LevelFilter::Info;
    let logger = LoggerBuilder::default()
        .set_level(level.to_level().unwrap())
        .set_compression_level(Compression::Fast)
        .set_sink_url(logging_url)
        .set_format(Box::new(move |record: &Record| {
            format!("compressed-logger-tester! {}\n", record.args())
        }))
        .build()
        .unwrap();
    log::set_boxed_logger(Box::new(logger)).unwrap();
    log::set_max_level(level);
    println!(
        "Remote compressed logging enabled with target {}",
        logging_url
    );
    (0..100_000).for_each(|_| info!("test!"));
}
/// Scratch test exploring append-mode writes plus size-based rotation of a
/// local file (mirrors the /tmp log rotation used by `Logger`).
#[test]
fn file_playground() {
    let mut file = OpenOptions::new()
        .create(true)
        .append(true)
        .open("./compressed_log")
        .unwrap();
    for line in ["Hello, World!\n", "Line2!\n"].iter() {
        file.write_all(line.as_bytes()).expect("write failed");
    }
    let size = file.metadata().unwrap().len();
    println!("File size is {:?}", size);
    if size > 120 {
        // Close the handle before renaming the file out of the way.
        drop(file);
        if let Err(e) = std::fs::rename("./compressed_log", "./compressed_log_backup") {
            println!("rename failed with {}", e);
        }
    }
}