use crate::{
config::{LogFormat, SinkConfigBuild, SinkConfigTrait},
log_impl::{LogSink, LogSinkTrait},
rotation::*,
time::Timer,
};
use log::{Level, Record};
use std::fs::metadata;
use std::hash::{Hash, Hasher};
use std::os::unix::prelude::*;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Once};
use std::time::{Duration, SystemTime};
use crate::file_impl::open_file;
use crossfire::{mpsc, MTx, RecvTimeoutError, Rx};
use std::thread;
/// Default buffer threshold in bytes. It is the initial `flush_size` of a
/// `LogBufFile` and the fallback used when a configured `flush_size` is 0.
const FLUSH_SIZE_DEFAULT: usize = 4096;
/// Configuration for a file sink that buffers formatted lines in memory and
/// writes them to the file from a background thread.
///
/// The struct derives `Hash`; the field declaration order therefore affects
/// the value produced by `write_hash`.
#[derive(Hash)]
pub struct LogBufFile {
/// Maximum level accepted by the sink; records with a level greater than
/// this are not written.
pub level: Level,
/// Formatter used to turn each record into a line of text.
pub format: LogFormat,
/// Full path of the log file (directory joined with the file name).
pub file_path: Box<Path>,
/// Flush interval in milliseconds. When the sink is built, 0 or any value
/// above 1000 is replaced by 1000.
pub flush_millis: usize,
/// Optional rotation policy; `None` unless set through `rotation()`.
pub rotation: Option<Rotation>,
/// Number of buffered bytes that triggers a flush. A value of 0 is
/// replaced by `FLUSH_SIZE_DEFAULT` when the sink is built.
pub flush_size: usize,
}
impl LogBufFile {
    /// Creates a buffered file sink configuration for `dir/file_name`.
    ///
    /// * `dir` - directory holding the log file; created (including any
    ///   missing parent directories) when it does not exist yet.
    /// * `file_name` - file name joined onto `dir`.
    /// * `level` - maximum level accepted by the sink.
    /// * `format` - formatter applied to every record.
    /// * `flush_millis` - flush interval in milliseconds.
    ///
    /// Rotation is disabled and `flush_size` is `FLUSH_SIZE_DEFAULT` until
    /// changed by the caller.
    ///
    /// # Panics
    ///
    /// Panics when the directory does not exist and cannot be created.
    pub fn new<P1, P2>(
        dir: P1, file_name: P2, level: Level, format: LogFormat, flush_millis: usize,
    ) -> Self
    where
        P1: Into<PathBuf>,
        P2: Into<PathBuf>,
    {
        let dir_path: PathBuf = dir.into();
        if !dir_path.exists() {
            // `create_dir_all` also builds missing parents, accepts an empty
            // (current-directory) path and tolerates another thread or process
            // creating the directory between the `exists()` check and this call.
            std::fs::create_dir_all(&dir_path).expect("create dir for log");
        }
        let file_path = dir_path.join(file_name.into()).into_boxed_path();
        Self {
            level,
            format,
            file_path,
            flush_millis,
            rotation: None,
            flush_size: FLUSH_SIZE_DEFAULT,
        }
    }

    /// Enables rotation with the given policy and returns the updated
    /// configuration (builder style).
    pub fn rotation(mut self, ro: Rotation) -> Self {
        self.rotation = Some(ro);
        self
    }
}
impl SinkConfigBuild for LogBufFile {
    /// Builds the runtime sink for this configuration and wraps it in the
    /// `LogSink::BufFile` variant.
    fn build(&self) -> LogSink {
        let sink = LogSinkBufFile::new(self);
        LogSink::BufFile(sink)
    }
}
impl SinkConfigTrait for LogBufFile {
    /// Returns the maximum level accepted by this sink.
    fn get_level(&self) -> Level {
        self.level
    }

    /// Returns a copy of the configured log file path; always `Some`.
    fn get_file_path(&self) -> Option<Box<Path>> {
        let path = self.file_path.clone();
        Some(path)
    }

    /// Feeds the configuration into `hasher`: first every field (through the
    /// derived `Hash`), then the literal tag `LogBufFile`.
    fn write_hash(&self, hasher: &mut Box<dyn Hasher>) {
        Hash::hash(self, &mut *hasher);
        hasher.write(b"LogBufFile");
    }
}
/// Runtime half of the buffered file sink: formats records on the calling
/// thread and hands them to a background writer thread over a channel.
pub(crate) struct LogSinkBufFile {
/// Records with a level greater than this are skipped in `log`.
max_level: Level,
/// Formatter that turns a record into the line sent to the writer.
formatter: LogFormat,
/// Handle of the writer thread spawned in `new`; it is stored but not
/// joined anywhere in the code shown here.
_th: thread::JoinHandle<()>,
/// Sending side of the bounded channel read by the writer thread.
tx: MTx<mpsc::Array<Msg>>,
}
impl LogSinkBufFile {
    /// Creates the sink from `config` and starts its writer thread.
    ///
    /// The configuration is normalised first: a `flush_millis` of 0 or above
    /// 1000 becomes 1000, and a `flush_size` of 0 becomes
    /// `FLUSH_SIZE_DEFAULT`. The writer thread owns the file state and
    /// receives work through a bounded channel of 1024 messages.
    fn new(config: &LogBufFile) -> Self {
        let (tx, rx) = mpsc::bounded_blocking(1024);

        let flush_millis = match config.flush_millis {
            0 => 1000,
            ms if ms > 1000 => 1000,
            ms => ms,
        };
        let flush_size =
            if config.flush_size == 0 { FLUSH_SIZE_DEFAULT } else { config.flush_size };
        let rotate: Option<LogRotate> =
            config.rotation.as_ref().map(|r| r.build(&config.file_path));

        let mut inner = BufFileInner {
            size: 0,
            create_time: None,
            path: config.file_path.to_path_buf(),
            f: None,
            flush_millis,
            flush_size,
            buf: Vec::with_capacity(4096),
            rotate,
        };
        // The writer state moves into the thread; only the channel is shared.
        let _th = thread::spawn(move || inner.log_writer(rx));

        Self { max_level: config.level, formatter: config.format.clone(), tx, _th }
    }
}
impl LogSinkTrait for LogSinkBufFile {
    /// Opening is the same operation as reopening for this sink.
    #[inline]
    fn open(&self) -> std::io::Result<()> {
        self.reopen()
    }

    /// Asks the writer thread to reopen the log file.
    ///
    /// The request is best effort: a failed send is ignored and `Ok(())` is
    /// always returned.
    fn reopen(&self) -> std::io::Result<()> {
        let _ = self.tx.send(Msg::Reopen);
        Ok(())
    }

    /// Formats `r` and queues the resulting line for the writer thread.
    /// Records above `max_level` are skipped; send failures are ignored.
    #[inline(always)]
    fn log(&self, now: &Timer, r: &Record) {
        if r.level() > self.max_level {
            return;
        }
        let line = self.formatter.process(now, r);
        let _ = self.tx.send(Msg::Line(line));
    }

    /// Queues a flush request and blocks until the writer thread has
    /// completed it. Returns immediately when the request cannot be sent.
    #[inline(always)]
    fn flush(&self) {
        let done = Arc::new(Once::new());
        let sent = self.tx.send(Msg::Flush(Arc::clone(&done)));
        if sent.is_ok() {
            done.wait();
        }
    }
}
/// Requests sent from the sink handle to the writer thread.
enum Msg {
/// A formatted log line to append to the write buffer.
Line(String),
/// Reopen the log file.
Reopen,
/// Flush the buffer (waiting for rotation), then complete the `Once` so
/// the sender blocked in `wait()` can continue.
Flush(Arc<Once>),
}
/// State owned by the writer thread: the open file, the pending bytes and
/// the flush/rotation settings.
struct BufFileInner {
/// Tracked file size in bytes: read from metadata on reopen and increased
/// on every flush.
size: u64,
/// Set from the file's modification time on the first successful open and
/// not changed afterwards; `None` until then.
create_time: Option<SystemTime>,
/// Path of the log file.
path: PathBuf,
/// Open file handle; `None` until an open succeeds.
f: Option<std::fs::File>,
/// Bytes waiting to be written to the file.
buf: Vec<u8>,
/// Receive timeout in milliseconds used by the writer loop; a timeout
/// triggers a flush.
flush_millis: usize,
/// Rotation handler, when rotation is configured.
rotate: Option<LogRotate>,
/// Number of buffered bytes that triggers a flush.
flush_size: usize,
}
impl FileSinkTrait for BufFileInner {
    /// Returns the time recorded when the file was first opened.
    ///
    /// While no file has been opened yet (the open failed, so `create_time`
    /// is still `None`) the current time is returned instead of panicking, so
    /// a rotation check made in that state cannot kill the writer thread.
    #[inline(always)]
    fn get_create_time(&self) -> SystemTime {
        self.create_time.unwrap_or_else(SystemTime::now)
    }

    /// Returns the tracked size of the log file in bytes.
    #[inline(always)]
    fn get_size(&self) -> u64 {
        self.size
    }
}
impl BufFileInner {
    /// Opens (or reopens) the log file at `self.path` and refreshes the
    /// tracked size from its metadata.
    ///
    /// Failures are reported on stderr and never panic: when the open fails
    /// the previous handle (if any) stays in place; when only the metadata
    /// lookup fails the new handle is still installed with a size of 0.
    /// `create_time` is set only the first time a file is opened.
    fn reopen(&mut self) {
        let file = match open_file(&self.path) {
            Ok(file) => file,
            Err(e) => {
                eprintln!("open logfile {:#?} failed: {:?}", &self.path, e);
                return;
            }
        };
        match metadata(&self.path) {
            Ok(mt) => {
                self.size = mt.len();
                if self.create_time.is_none() {
                    // Not every platform reports a modification time; fall
                    // back to "now" instead of panicking.
                    self.create_time =
                        Some(mt.modified().unwrap_or_else(|_| SystemTime::now()));
                }
            }
            Err(e) => {
                eprintln!("get metadata of logfile {:#?} failed: {:?}", &self.path, e);
                self.size = 0;
                if self.create_time.is_none() {
                    self.create_time = Some(SystemTime::now());
                }
            }
        }
        self.f = Some(file);
    }

    /// Appends `s` to the buffer, flushing before the append when the line
    /// would push a non-empty buffer past `flush_size`, and after it when the
    /// buffer has reached `flush_size`.
    ///
    /// NOTE(review): while no file is open `flush` cannot drain the buffer,
    /// so it keeps growing until an open succeeds.
    fn write(&mut self, s: Vec<u8>) {
        if !self.buf.is_empty() && self.buf.len() + s.len() > self.flush_size {
            self.flush(false);
        }
        self.buf.extend_from_slice(&s);
        if self.buf.len() >= self.flush_size {
            self.flush(false);
        }
    }

    /// Runs the rotation check and reopens the file when a rotation happened.
    ///
    /// Skipped while no file is open: there is nothing to rotate and the
    /// creation time is not known yet.
    #[inline(always)]
    fn check_rotate(&mut self) {
        if self.f.is_none() {
            return;
        }
        let rotated = match self.rotate.as_ref() {
            Some(ro) => ro.rotate(self),
            None => false,
        };
        if rotated {
            self.reopen();
        }
    }

    /// Writes all of `data` to `f`, retrying partial writes and `EINTR`.
    ///
    /// Returns the number of bytes actually written. Any other error (or a
    /// write that makes no progress) is reported on stderr and ends the
    /// attempt early.
    fn write_fully(f: &std::fs::File, data: &[u8], path: &Path) -> usize {
        let fd = f.as_raw_fd();
        let mut written = 0usize;
        while written < data.len() {
            let rest = &data[written..];
            // SAFETY: `rest` is a live, initialised slice for the duration of
            // the call, and `fd` belongs to `f`, which stays borrowed (and so
            // open) until this function returns.
            let r = unsafe {
                libc::write(fd as libc::c_int, rest.as_ptr() as *const libc::c_void, rest.len())
            };
            if r > 0 {
                written += r as usize;
                continue;
            }
            if r == 0 {
                // No progress on a non-empty write: stop instead of spinning.
                eprintln!("write logfile {:#?} failed: wrote 0 bytes", path);
                break;
            }
            let e = std::io::Error::last_os_error();
            if e.kind() == std::io::ErrorKind::Interrupted {
                continue;
            }
            eprintln!("write logfile {:#?} failed: {:?}", path, e);
            break;
        }
        written
    }

    /// Writes the buffered bytes to the file, clears the buffer and runs the
    /// rotation check. Does nothing to the buffer while no file is open.
    ///
    /// Only the bytes that were actually written are added to the tracked
    /// size. Bytes that could not be written are dropped with the rest of the
    /// buffer so a persistent write error cannot grow memory without bound.
    ///
    /// * `wait_rotate` - when true, also waits on the rotation handler
    ///   (`LogRotate::wait`) before returning.
    fn flush(&mut self, wait_rotate: bool) {
        if let Some(f) = self.f.as_ref() {
            let written = Self::write_fully(f, &self.buf, &self.path);
            self.size += written as u64;
            self.buf.clear();
            // Runs even for an empty buffer so rotation is still checked on
            // idle timeouts.
            self.check_rotate();
        }
        if wait_rotate {
            if let Some(ro) = self.rotate.as_ref() {
                ro.wait();
            }
        }
    }

    /// Handles one message received from the sink handle.
    fn process_msg(&mut self, msg: Msg) {
        match msg {
            Msg::Line(line) => self.write(line.into_bytes()),
            Msg::Reopen => self.reopen(),
            Msg::Flush(o) => {
                self.flush(true);
                // Completing the `Once` releases the caller blocked in `wait()`.
                o.call_once(|| {});
            }
        }
    }

    /// Body of the writer thread: opens the file, then processes messages
    /// until every sender is gone, flushing one last time before returning.
    ///
    /// With a non-zero `flush_millis` the buffer is flushed whenever the
    /// channel stays idle for that long, and also after a batch of messages
    /// once that much time has passed since the previous timed flush, so a
    /// steady trickle of small lines cannot postpone the flush indefinitely.
    /// With `flush_millis == 0` the buffer is flushed after every batch.
    fn log_writer(&mut self, rx: Rx<mpsc::Array<Msg>>) {
        self.reopen();
        self.check_rotate();

        if self.flush_millis > 0 {
            let interval = Duration::from_millis(self.flush_millis as u64);
            let mut last_flush = std::time::Instant::now();
            loop {
                match rx.recv_timeout(interval) {
                    Ok(msg) => {
                        self.process_msg(msg);
                        // Drain whatever else is already queued.
                        while let Ok(msg) = rx.try_recv() {
                            self.process_msg(msg);
                        }
                        if last_flush.elapsed() < interval {
                            continue;
                        }
                    }
                    Err(RecvTimeoutError::Timeout) => {}
                    Err(RecvTimeoutError::Disconnected) => {
                        self.flush(true);
                        return;
                    }
                }
                self.flush(false);
                last_flush = std::time::Instant::now();
            }
        } else {
            loop {
                match rx.recv() {
                    Ok(msg) => {
                        self.process_msg(msg);
                        while let Ok(msg) = rx.try_recv() {
                            self.process_msg(msg);
                        }
                        self.flush(false);
                    }
                    Err(_) => {
                        self.flush(true);
                        return;
                    }
                }
            }
        }
    }
}