use std::{
fs,
io::{self, Error, ErrorKind},
mem,
path::{Path, PathBuf},
ptr,
sync::{
Arc,
atomic::{self, AtomicBool},
mpsc::{self, SyncSender},
},
thread,
time::{Duration, Instant, SystemTime},
};
use rusqlite::{Connection, ToSql};
const SQL_CREATOR: &str = include_str!("../res/db_creator.sql");
const LOG_FILE_EXT_WITH_DOT: &str = ".sqlite";
const JOURNAL_LOG_FILE_EXT_WITH_DOT: &str = ".sqlite-journal";
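// `db_creator.sql` is expected to create the `logs` table that `flush_to_file`
// inserts into below; one plausible shape, inferred from the insert statement
// (the shipped file may differ), is:
//
//     create table logs (
//         time           integer not null,
//         remote_ip      text    not null,
//         url            text    not null,
//         response_size  integer,
//         code           integer not null,
//         runtime        integer not null,
//         runtime_millis integer not null,
//         notes          text
//     );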
#[derive(Debug)]
pub struct Config {
pub work_dir: PathBuf,
pub max_file_len: u64,
pub log_files_reserved: Duration,
pub buf_len: usize,
pub disk_flush_interval: Duration,
}
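// One plausible configuration (the literal values are illustrative only, not
// defaults from this crate):
//
//     let config = Config {
//         work_dir: PathBuf::from("/var/log/myapp"),
//         max_file_len: 10 * 1024 * 1024,                          // rotate after ~10 MiB
//         log_files_reserved: Duration::from_secs(30 * 24 * 3600), // keep ~30 days of files
//         buf_len: 512,                                            // logs buffered before a forced flush
//         disk_flush_interval: Duration::from_secs(60),
//     };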
#[derive(Debug)]
pub struct Log {
pub time: u64,
pub remote_ip: String,
pub url: String,
pub response_size: Option<u64>,
pub code: u16,
pub runtime: Duration,
pub notes: Option<String>,
}
#[derive(Debug)]
pub enum Cmd {
StoreLog(Log),
FlushToDisk,
Ping,
}
pub type CmdSender = SyncSender<Cmd>;
pub struct Logger {
logs: Vec<Log>,
config: Config,
output: Option<PathBuf>,
log_file_cleaner_is_running: Arc<AtomicBool>,
}
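// Typical usage (a minimal sketch; the field values below are illustrative
// assumptions, not values mandated by this module):
//
//     let sender = Logger::make(config)?;
//     sender.send(Cmd::StoreLog(Log {
//         time: 1_700_000_000,          // e.g. seconds since the Unix epoch
//         remote_ip: "127.0.0.1".into(),
//         url: "/index.html".into(),
//         response_size: Some(1024),
//         code: 200,
//         runtime: Duration::from_millis(12),
//         notes: None,
//     }))?;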
impl Logger {
pub fn make(config: Config) -> io::Result<CmdSender> {
if config.disk_flush_interval.as_secs() == 0 {
return Err(Error::new(ErrorKind::InvalidInput, "Disk flush interval must be at least one second"));
}
let disk_flush_interval = config.disk_flush_interval;
let (sender, receiver) = mpsc::sync_channel(config.buf_len);
thread::spawn(move || {
let mut logger = Self {
logs: Vec::with_capacity(config.buf_len),
config,
output: None,
log_file_cleaner_is_running: Arc::new(AtomicBool::new(false)),
};
loop {
match receiver.recv() {
Ok(Cmd::StoreLog(log)) => logger.push(log),
Ok(Cmd::FlushToDisk) => if let Err(err) = logger.flush_to_disk_and_clear_logs() {
__e!("{}", err);
},
Ok(Cmd::Ping) => {},
Err(err) => {
__e!("Failed to receive command: {} -> stopping server", err);
break;
},
};
}
});
{
let sender = sender.clone();
thread::spawn(move || {
// Sleep for the flush interval, capped at 10 seconds, so the thread notices a closed channel promptly.
let sleep_time = Duration::from_secs(disk_flush_interval.as_secs().min(10));
let mut last_saved = Instant::now();
loop {
let now = Instant::now();
match now.duration_since(last_saved) >= disk_flush_interval {
true => match sender.send(Cmd::FlushToDisk) {
Ok(()) => last_saved = Instant::now(),
Err(err) => {
__e!("Failed sending {:?} to server -> {}", Cmd::FlushToDisk, err);
break;
},
},
false => if let Err(err) = sender.send(Cmd::Ping) {
__e!("Failed sending {:?} to server -> {}", Cmd::Ping, err);
break;
},
};
thread::sleep(sleep_time);
}
});
}
Ok(sender)
}
pub fn push(&mut self, log: Log) {
self.logs.push(log);
// The Vec was pre-sized to `buf_len`; reaching capacity forces an early flush.
if self.logs.len() >= self.logs.capacity() {
__p!("Buffer full; flushing to disk...");
if let Err(err) = self.flush_to_disk_and_clear_logs() {
__e!("Failed flushing logs to disk: {}", err);
}
}
}
fn flush_to_disk_and_clear_logs(&mut self) -> io::Result<()> {
let need_new_file = match self.output.as_ref() {
Some(output) => output.metadata()?.len() >= self.config.max_file_len,
None => true,
};
if need_new_file {
let (year, month, day, hour, min, sec) = unsafe {
// `localtime` hands back a pointer to shared static storage and is not
// thread-safe; the reentrant `localtime_r` writes into a caller-owned buffer instead.
let now = libc::time(ptr::null_mut());
let mut tm: libc::tm = mem::zeroed();
match libc::localtime_r(&now, &mut tm).is_null() {
true => return Err(Error::new(ErrorKind::Other, __!("Failed to get local time"))),
false => (
tm.tm_year.saturating_add(1900), tm.tm_mon.saturating_add(1), tm.tm_mday,
tm.tm_hour, tm.tm_min, tm.tm_sec,
),
}
};
let path = self.config.work_dir.join(
format!("{:04}-{:02}-{:02}__{:02}-{:02}-{:02}{}", year, month, day, hour, min, sec, LOG_FILE_EXT_WITH_DOT)
);
match path.exists() {
true => return Err(Error::new(ErrorKind::Other, __!("Failed to create output file (it already exists): {:?}", path))),
false => self.output = Some(path),
}
}
if let Some(output) = self.output.as_ref() {
self.flush_to_file(output)?;
}
self.logs.clear();
self.clean_up_old_log_files();
Ok(())
}
fn flush_to_file(&self, file: impl AsRef<Path>) -> io::Result<()> {
let file = file.as_ref();
let start_time = Instant::now();
let file_existed = file.exists();
let mut conn = Connection::open(file).map_err(|err|
Error::new(ErrorKind::Other, __!("Failed to open database connection: {}", err))
)?;
if !file_existed {
conn.execute_batch(SQL_CREATOR).map_err(|err| Error::new(ErrorKind::Other, __!("Failed making new database: {}", err)))?;
}
let transaction = conn.transaction().map_err(|err|
Error::new(ErrorKind::Other, __!("Failed to make new database transaction: {}", err))
)?;
{
let mut statement = transaction.prepare_cached(
"insert into logs (time, remote_ip, url, response_size, code, runtime, runtime_millis, notes) values (?,?,?,?,?,?,?,?);"
).map_err(|err| {
Error::new(ErrorKind::Other, __!("Internal error: {}", err))
})?;
for log in self.logs.iter() {
let params: &[&dyn ToSql] = &[
&(log.time as i64), &log.remote_ip, &log.url, &log.response_size.map(|s| s as i64), &log.code,
&(log.runtime.as_secs() as i64), &log.runtime.subsec_millis(), &log.notes,
];
statement.execute(params).map_err(|err| Error::new(ErrorKind::Other, __!("Failed running SQL statement: {}", err)))?;
}
}
transaction.commit().map_err(|err| Error::new(ErrorKind::Other, __!("Failed to commit: {}", err)))?;
__p!(
"Flushed {} record{} to disk successfully, in {:?}",
self.logs.len(), match self.logs.len() { 1 => "", _ => "s" }, Instant::now().duration_since(start_time),
);
Ok(())
}
fn clean_up_old_log_files(&self) {
const ATOMIC_ORDERING: atomic::Ordering = atomic::Ordering::Relaxed;
// Atomically claim the "cleaner running" flag; bail out if another instance still holds it.
if self.log_file_cleaner_is_running.compare_exchange(false, true, ATOMIC_ORDERING, ATOMIC_ORDERING).is_err() {
__p!("Log file cleaner was scheduled but another instance is still running...");
return;
}
let work_dir = self.config.work_dir.clone();
let log_files_reserved = self.config.log_files_reserved;
let log_file_cleaner_is_running = self.log_file_cleaner_is_running.clone();
thread::spawn(move || {
match fs::read_dir(&work_dir) {
Ok(read_dir) => for dir_entry in read_dir {
if let Ok(dir_entry) = dir_entry {
match dir_entry.file_name().to_str() {
Some(file_name) if file_name.ends_with(LOG_FILE_EXT_WITH_DOT) || file_name.ends_with(JOURNAL_LOG_FILE_EXT_WITH_DOT)
=> {},
_ => continue,
};
// metadata() -> modified() -> duration_since() can each fail; only a fully
// resolved age older than the retention window triggers removal.
match dir_entry.metadata().map(|meta| meta.modified().map(|modified| SystemTime::now().duration_since(modified))) {
Ok(Ok(Ok(duration))) if duration > log_files_reserved => {
let path = dir_entry.path();
if !path.is_file() {
continue;
}
match fs::remove_file(&path) {
Ok(()) => __p!("Removed old log file: {:?}", path),
Err(err) => __e!("Failed to remove old log file {:?} -> {}", path, err),
}
},
_ => continue,
};
}
},
Err(err) => __e!("Cleaning up old log files: failed to read work directory {:?} -> {}", work_dir, err),
};
log_file_cleaner_is_running.store(false, ATOMIC_ORDERING);
});
}
}
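#[cfg(test)]
mod tests {
use super::*;

// A minimal smoke-test sketch (not part of the original module): it assumes
// `Cmd::Ping` round-trips through the logger thread without any disk I/O,
// and uses the system temp dir as a stand-in work_dir.
#[test]
fn ping_round_trips() {
let sender = Logger::make(Config {
work_dir: std::env::temp_dir(),
max_file_len: 1024 * 1024,
log_files_reserved: Duration::from_secs(3600),
buf_len: 16,
disk_flush_interval: Duration::from_secs(60),
}).expect("logger thread failed to start");
sender.send(Cmd::Ping).expect("logger thread should be alive");
}
}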