use std::collections::{HashMap, HashSet, VecDeque};
use std::fs::{self, File, OpenOptions};
use std::io::{BufWriter, Write};
use std::path::PathBuf;
use std::time::Duration;
use crate::FileEvent;
use crate::metrics::MetricsRegistry;
/// Maximum number of log file handles a `FileLogWriter` keeps open at the same time.
const MAX_OPEN_HANDLES: usize = 64;
/// Appends events as JSON lines to per-command log files under `log_dir`, keeping a
/// bounded cache of open handles and holding failed events in memory for a later retry.
pub(crate) struct FileLogWriter {
// Directory containing the per-command log files (created on demand).
log_dir: PathBuf,
// Events (with their command name) whose write failed and that await a retry.
disk_buf: VecDeque<(FileEvent, String)>,
// Cleared when a write fails; set again by a successful write or a full replay of disk_buf.
disk_healthy: bool,
// Time of the last write failure or replay attempt; replays wait 10 s from this point.
last_disk_check: std::time::Instant,
// Log files written since the last fdatasync pass (only tracked when sync_interval is Some).
dirty_logs: HashSet<PathBuf>,
// Period of the fdatasync timer in `run`; None disables the timer.
sync_interval: Option<Duration>,
// When true, per-event write errors are printed to stderr.
debug: bool,
// Selects `FileEvent::to_jsonl_string_local` over `to_jsonl_string` (presumably local-time timestamps).
local_time: bool,
// Receives the current length of disk_buf.
metrics: MetricsRegistry,
// Cached append handles keyed by log path; at most MAX_OPEN_HANDLES entries.
handles: HashMap<PathBuf, BufWriter<File>>,
}
impl FileLogWriter {
pub(crate) fn new(
log_dir: PathBuf,
sync_interval: Option<Duration>,
debug: bool,
local_time: bool,
metrics: MetricsRegistry,
) -> Self {
Self {
log_dir,
disk_buf: VecDeque::with_capacity(10_000),
disk_healthy: true,
last_disk_check: std::time::Instant::now(),
dirty_logs: HashSet::new(),
sync_interval,
metrics,
debug,
local_time,
handles: HashMap::new(),
}
}
fn jsonl_string(&self, event: &FileEvent) -> String {
if self.local_time {
event.to_jsonl_string_local()
} else {
event.to_jsonl_string()
}
}
pub(crate) async fn run(
mut self,
mut rx: tokio::sync::broadcast::Receiver<(FileEvent, String)>,
) {
use tokio::time::interval;
let mut sync_timer = self.sync_interval.map(|d| interval(d));
loop {
tokio::select! {
result = rx.recv() => {
match result {
Ok((event, cmd_name)) => {
if let Err(e) = self.write_event(&event, &cmd_name)
&& self.debug {
eprintln!("[DEBUG] FileLogWriter write error: {}", e);
}
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
eprintln!("[WARNING] FileLogWriter dropped {} events (disk too slow)", n);
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
break;
}
}
}
_ = async {
match sync_timer.as_mut() {
Some(timer) => timer.tick().await,
None => std::future::pending().await,
}
} => {
self.sync_dirty_logs();
}
}
}
self.sync_dirty_logs();
}
fn get_or_open(&mut self, log_path: &PathBuf) -> std::io::Result<&mut BufWriter<File>> {
if !self.handles.contains_key(log_path) {
if self.handles.len() >= MAX_OPEN_HANDLES
&& let Some(evict_path) = self.handles.keys().next().cloned()
{
self.handles.remove(&evict_path);
}
let file = open_log_file(log_path, &self.log_dir)?;
self.handles.insert(log_path.clone(), BufWriter::new(file));
}
Ok(self.handles.get_mut(log_path).unwrap())
}
fn write_event(&mut self, event: &FileEvent, cmd_name: &str) -> std::io::Result<()> {
let log_path = self.log_dir.join(crate::utils::cmd_to_log_name(cmd_name));
if !self.disk_healthy
&& self.last_disk_check.elapsed() >= std::time::Duration::from_secs(10)
{
self.flush_disk_buf();
}
let line = self.jsonl_string(event);
let is_new = !log_path.exists();
match self.get_or_open(&log_path) {
Ok(writer) => {
if is_new && let Err(e) = crate::fid_parser::chown_to_user(&log_path) {
eprintln!(
"[WARNING] Could not chown log file '{}': {}",
log_path.display(),
e
);
}
writeln!(writer, "{}", line)?;
if self.sync_interval.is_some() {
self.dirty_logs.insert(log_path);
}
self.disk_healthy = true;
Ok(())
}
Err(e) => {
self.handles.remove(&log_path);
self.disk_healthy = false;
self.last_disk_check = std::time::Instant::now();
if self.disk_buf.len() < 10_000 {
self.disk_buf
.push_back((event.clone(), cmd_name.to_string()));
}
self.metrics
.set_disk_buffer_events(self.disk_buf.len() as i64);
Err(e)
}
}
}
fn flush_disk_buf(&mut self) {
if self.disk_buf.is_empty() {
self.disk_healthy = true;
return;
}
let mut remaining = VecDeque::new();
while let Some((event, cmd_name)) = self.disk_buf.pop_front() {
let log_path = self.log_dir.join(crate::utils::cmd_to_log_name(&cmd_name));
let line = self.jsonl_string(&event);
match self.get_or_open(&log_path) {
Ok(writer) => {
if writeln!(writer, "{}", line).is_err() {
self.handles.remove(&log_path);
remaining.push_back((event, cmd_name));
}
}
Err(_) => {
remaining.push_back((event, cmd_name));
}
}
}
self.disk_buf = remaining;
self.disk_healthy = self.disk_buf.is_empty();
self.last_disk_check = std::time::Instant::now();
self.metrics
.set_disk_buffer_events(self.disk_buf.len() as i64);
}
fn sync_dirty_logs(&mut self) {
if self.dirty_logs.is_empty() {
return;
}
let paths: Vec<PathBuf> = self.dirty_logs.drain().collect();
for path in &paths {
if let Some(writer) = self.handles.get(path) {
if let Err(e) = writer.get_ref().sync_data() {
eprintln!("[WARNING] fdatasync failed for '{}': {}", path.display(), e);
}
} else {
match File::options().write(true).open(path) {
Ok(file) => {
if let Err(e) = file.sync_data() {
eprintln!("[WARNING] fdatasync failed for '{}': {}", path.display(), e);
}
}
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
Err(e) => {
eprintln!(
"[WARNING] Could not open '{}' for sync: {}",
path.display(),
e
);
}
}
}
}
}
}
/// Opens `log_path` for appending, creating the file if it does not exist.
///
/// If the first open fails with `NotFound` (e.g. `log_dir` has not been created yet), the
/// following happens:
/// * `log_dir` is created recursively;
/// * `log_dir` is chowned to the user on a best-effort basis;
/// * the open is retried once.
///
/// # Errors
/// * Any error from the initial open other than `NotFound`.
/// * An error from `create_dir_all`. This is returned rather than masked by a second,
///   less informative `NotFound`.
/// * An error from the retried open.
fn open_log_file(log_path: &PathBuf, log_dir: &PathBuf) -> std::io::Result<File> {
    let open = || OpenOptions::new().create(true).append(true).open(log_path);
    match open() {
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
            fs::create_dir_all(log_dir)?;
            // Ownership only matters for later access by the user; it must not block logging.
            let _ = crate::fid_parser::chown_to_user(log_dir);
            open()
        }
        other => other,
    }
}