use crate::error::Result;
use crate::filter::Filter;
use crate::format::Formatter;
use crate::level::Level;
use crate::record::LogRecord;
use crate::rotation::{RotationManager, RotationPolicy};
use crossbeam_channel::{Sender, bounded};
use parking_lot::RwLock;
use std::fs::{File, OpenOptions};
use std::io::{BufWriter, Write};
use std::path::PathBuf;
use std::sync::Arc;
/// Configuration used to build a [`Sink`].
///
/// A sink with a `path` writes to that file; a sink without one prints to
/// stdout from `Sink::log` when console output is globally enabled.
pub struct SinkConfig {
    /// Destination file. When set, the file is opened in create/append mode
    /// and missing parent directories are created.
    pub path: Option<PathBuf>,
    /// Rotation interval, handed to `RotationPolicy::Time` / `RotationPolicy::Both`.
    /// Only consulted when `path` is set.
    pub rotation: Option<String>,
    /// Size limit, handed to `RotationPolicy::Size` / `RotationPolicy::Both`.
    /// Only consulted when `path` is set.
    pub size_limit: Option<u64>,
    /// Retention setting passed through to `RotationManager::new`.
    pub retention: Option<usize>,
    /// Minimum level argument for `Filter::new`.
    pub filter_min_level: Option<Level>,
    /// Module argument for `Filter::new`.
    pub filter_module: Option<String>,
    /// Function argument for `Filter::new`.
    pub filter_function: Option<String>,
    /// When `true`, file writes are performed by a background thread that is
    /// fed through a bounded channel.
    pub async_write: bool,
    /// Capacity of the `BufWriter` wrapping the destination file.
    pub buffer_size: usize,
    /// Flush interval setting.
    // NOTE(review): not read anywhere in this file; unit is presumably
    // milliseconds — confirm against the consumers of this field.
    pub flush_interval: u64,
    /// Capacity of the bounded channel used for asynchronous writes.
    pub max_buffered_lines: usize,
    /// Date style argument for `Formatter::new`.
    pub date_style: Option<String>,
    /// Date-enabled flag for `Formatter::new`.
    pub date_enabled: bool,
    /// Format argument for `Formatter::new`.
    pub format: Option<String>,
    /// JSON flag for `Formatter::new`.
    pub json: bool,
    /// Color flag applied through `Formatter::with_color`.
    pub color: bool,
}
impl Default for SinkConfig {
fn default() -> Self {
Self {
path: None,
rotation: None,
size_limit: None,
retention: None,
filter_min_level: None,
filter_module: None,
filter_function: None,
async_write: true,
buffer_size: 8192,
flush_interval: 100,
max_buffered_lines: 1000,
date_style: None,
date_enabled: false,
format: None,
json: false,
color: true, }
}
}
/// A single log destination: either a file (when the configuration has a
/// path) or the console.
pub struct Sink {
    /// Identifier supplied by the caller at construction; returned by `id()`.
    id: usize,
    /// The configuration this sink was built from.
    config: SinkConfig,
    /// Buffered handle to the destination file, `None` when no path is
    /// configured. Shared with the background writer thread and swapped for
    /// a freshly opened file after a rotation.
    writer: Arc<RwLock<Option<BufWriter<File>>>>,
    /// Decides whether a record is accepted by this sink.
    filter: Filter,
    /// Renders records to text.
    formatter: Formatter,
    /// Runtime on/off switch toggled by `enable()` / `disable()`.
    enabled: Arc<RwLock<bool>>,
    /// Channel to the background writer thread; `None` when the sink writes
    /// synchronously.
    sender: Option<Sender<LogRecord>>,
    /// Rotation state, present only when a path is configured together with
    /// a rotation interval and/or a size limit.
    rotation_manager: Arc<RwLock<Option<RotationManager>>>,
}
impl Sink {
    /// Replaces this sink's formatter with a copy that has `colors` applied
    /// through `Formatter::with_level_colors`.
    ///
    /// NOTE(review): a sink that writes asynchronously hands its background
    /// thread a separate clone of the formatter at construction time; that
    /// clone is not touched here, so lines written by the background thread
    /// keep the colors that were in effect when the sink was created.
    pub fn set_level_colors(
        &mut self,
        colors: std::collections::HashMap<crate::level::Level, String>,
    ) {
        let recolored = self.formatter.clone().with_level_colors(colors);
        self.formatter = recolored;
    }
}
impl Sink {
pub fn new(id: usize, config: SinkConfig) -> Result<Self> {
let filter = Filter::new(
config.filter_min_level,
config.filter_module.clone(),
config.filter_function.clone(),
);
let formatter = Formatter::new(
config.format.clone(),
config.json,
config.date_enabled,
config.date_style.clone(),
)
.with_color(config.color);
let writer = if let Some(ref path) = config.path {
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent)?;
}
let file = OpenOptions::new().create(true).append(true).open(path)?;
Some(BufWriter::with_capacity(config.buffer_size, file))
} else {
None
};
let (sender, writer_arc) = if config.async_write {
let (s, r) = bounded(config.max_buffered_lines);
let writer_clone = Arc::new(RwLock::new(writer));
let writer_ref = Arc::clone(&writer_clone);
let formatter_clone = formatter.clone();
std::thread::spawn(move || {
while let Ok(record) = r.recv() {
if let Some(ref mut w) = *writer_ref.write() {
let formatted = formatter_clone.format(&record);
let _ = writeln!(w, "{}", formatted);
let _ = w.flush();
}
}
});
(Some(s), writer_clone)
} else {
(None, Arc::new(RwLock::new(writer)))
};
let rotation_manager = if let Some(ref path) = config.path {
if config.rotation.is_some() || config.size_limit.is_some() {
let policy = match (&config.rotation, config.size_limit) {
(Some(interval), Some(size)) => RotationPolicy::Both(size, interval.clone()),
(Some(interval), None) => RotationPolicy::Time(interval.clone()),
(None, Some(size)) => RotationPolicy::Size(size),
_ => RotationPolicy::Size(10 * 1024 * 1024), };
Some(RotationManager::new(path.clone(), policy, config.retention))
} else {
None
}
} else {
None
};
Ok(Self {
id,
config,
writer: writer_arc,
filter,
formatter,
enabled: Arc::new(RwLock::new(true)),
sender,
rotation_manager: Arc::new(RwLock::new(rotation_manager)),
})
}
pub fn log(
&self,
record: &LogRecord,
global_console: bool,
global_storage: bool,
) -> Result<()> {
if !*self.enabled.read() {
return Ok(());
}
if !self.filter.matches(record) {
return Ok(());
}
let formatted = self.formatter.format(record);
let data_size = formatted.len() as u64;
if self.config.path.is_none() && global_console {
println!("{}", formatted);
return Ok(());
}
if !global_storage {
return Ok(());
}
if let Some(ref mut rotation) = *self.rotation_manager.write() {
if rotation.should_rotate(data_size) {
rotation.rotate()?;
if let Some(ref path) = self.config.path {
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent)?;
}
let file = OpenOptions::new().create(true).append(true).open(path)?;
*self.writer.write() =
Some(BufWriter::with_capacity(self.config.buffer_size, file));
}
}
rotation.update_size(data_size);
}
if let Some(ref sender) = self.sender {
sender
.send(record.clone())
.map_err(|_| crate::error::LoglyError::ChannelSend)?;
} else if let Some(ref mut writer) = *self.writer.write() {
writeln!(writer, "{}", formatted)?;
writer.flush()?;
}
Ok(())
}
pub fn id(&self) -> usize {
self.id
}
pub fn enable(&self) {
*self.enabled.write() = true;
}
pub fn disable(&self) {
*self.enabled.write() = false;
}
pub fn is_enabled(&self) -> bool {
*self.enabled.read()
}
}