use anyhow::{Context, Result, bail};
use std::collections::HashMap;
use std::num::NonZeroUsize;
use std::os::fd::AsRawFd;
use std::path::PathBuf;
use std::time::Duration;
use lru::LruCache;
use tokio::io::AsyncBufReadExt;
use tokio::io::unix::AsyncFd;
use tokio::signal::unix::{SignalKind, signal};
use moka::sync::Cache;
use crate::FileEvent;
use crate::config::ResolvedCacheConfig;
use crate::fid_parser::FsGroup;
use crate::filters::{self, PathOptions};
use crate::metrics::MetricsRegistry;
use crate::monitored::PathEntry;
use crate::proc_cache::{self, PidTree, ProcCache};
use crate::watchdog::Watchdog;
use serde_json;
use slotmap::SlotMap;
/// Key of one filesystem group (`FsGroup`) stored in `Monitor::fs_groups`.
///
/// The same key is what reader tasks report on the reader-death channel, so the
/// main loop can restart the reader for exactly that group.
pub(crate) type FsGroupKey = slotmap::DefaultKey;
// Prints a `[DEBUG]`-prefixed line to stderr when `$debug` evaluates to true.
//
// Usage: `debug_log!(flag, "fmt {}", args...)` — everything after the flag is
// passed through as `format!`-style arguments. The message is only formatted,
// and its arguments only evaluated, when the flag is set.
macro_rules! debug_log {
    ($debug:expr, $($arg:tt)*) => {
        if $debug {
            // `format_args!` formats straight into stderr; no intermediate String.
            eprintln!("[DEBUG] {}", format_args!($($arg)*));
        }
    };
}
mod channel;
mod events;
mod file_writer;
mod filtering;
mod init;
mod live_path;
mod reader;
mod socket_handler;
pub(crate) use channel::{EventReceiver, EventSender};
pub(crate) use events::PendingEvent;
pub(crate) use file_writer::FileLogWriter;
pub(crate) use reader::ReaderState;
#[cfg(test)]
pub(crate) use socket_handler::chains_contain;
pub(crate) use socket_handler::tokio_io_oneshot;
pub struct MonitorConfig {
pub paths_and_options: Vec<(PathBuf, PathOptions)>,
pub log_dir: Option<PathBuf>,
pub monitored_path: Option<PathBuf>,
pub buffer_size: Option<usize>,
pub socket_listener: Option<tokio::net::UnixListener>,
pub debug: bool,
pub cache_config: Option<ResolvedCacheConfig>,
pub disk_min_free: Option<String>,
pub sync_interval: Option<std::time::Duration>,
pub subscribe_buf: Option<usize>,
pub local_time: bool,
pub metrics_interval: Option<u64>,
pub watchdog_interval: Option<u64>,
}
/// The file-monitoring daemon state: configuration, kernel handles, caches and
/// the channels used by the event loop in [`Monitor::run`].
///
/// Field order is kept as-is on purpose: struct fields drop in declaration order.
pub struct Monitor {
    /// Resolved watch paths, de-duplicated, in first-seen order.
    pub(crate) paths: Vec<PathBuf>,
    /// Empty after construction; filled in elsewhere in the module.
    pub(crate) canonical_paths: Vec<PathBuf>,
    /// One `(resolved path, options)` record per configured entry (duplicates kept).
    pub(crate) monitored_entries: Vec<(PathBuf, PathOptions)>,
    pub(crate) log_dir: Option<PathBuf>,
    pub(crate) monitored_path: Option<PathBuf>,
    /// `None` until process-cache initialisation; `run` requires it afterwards.
    pub(crate) proc_cache: Option<ProcCache>,
    /// `None` until process-cache initialisation; `run` requires it afterwards.
    pub(crate) pid_tree: Option<PidTree>,
    /// LRU of path -> u64, sized by `cache_config.file_size_capacity`.
    pub(crate) file_size_cache: LruCache<PathBuf, u64>,
    /// Validated to lie within 4096..=1048576 bytes.
    pub(crate) buffer_size: usize,
    /// Control socket; moved out by `run`.
    pub(crate) socket_listener: Option<tokio::net::UnixListener>,
    pub(crate) fs_groups: SlotMap<FsGroupKey, FsGroup>,
    pub(crate) path_to_group: HashMap<PathBuf, FsGroupKey>,
    /// Handle-key -> directory path cache with capacity and TTL from `cache_config`.
    pub(crate) dir_cache: Cache<fanotify_fid::types::HandleKey, PathBuf>,
    pub(crate) event_tx: Option<EventSender>,
    pub(crate) shared_dir_cache: Option<Cache<fanotify_fid::types::HandleKey, PathBuf>>,
    /// Paths awaiting later handling; rechecked after each event batch.
    pub(crate) pending_paths: Vec<(PathBuf, PathEntry)>,
    pub(crate) inotify: Option<inotify::Inotify>,
    pub(crate) _inotify_watches: Vec<(PathBuf, inotify::WatchDescriptor)>,
    /// PID of this daemon process (`std::process::id()` at construction).
    pub(crate) daemon_pid: u32,
    pub(crate) cache_config: ResolvedCacheConfig,
    pub(crate) debug: bool,
    /// Receives the key of a filesystem group whose reader task died; `run` takes it.
    pub(crate) reader_death_rx: tokio::sync::mpsc::UnboundedReceiver<FsGroupKey>,
    pub(crate) reader_death_tx: tokio::sync::mpsc::UnboundedSender<FsGroupKey>,
    /// Per-group reader bookkeeping (including the `gave_up` flag).
    pub(crate) reader_states: HashMap<FsGroupKey, ReaderState>,
    /// Construction time; the base for the reported uptime.
    pub(crate) started_at: std::time::Instant,
    disk_min_free: Option<String>,
    sync_interval: Option<Duration>,
    /// Period of the `[metrics]` report; `None` when disabled or configured as 0.
    metrics_interval: Option<Duration>,
    /// Broadcast of `(event, command name)` pairs to subscribers; dropped on shutdown.
    pub(crate) event_stream_tx: Option<tokio::sync::broadcast::Sender<(FileEvent, String)>>,
    pub(crate) local_time: bool,
    pub(crate) metrics: MetricsRegistry,
    pub(crate) temp_parent_marks: HashMap<PathBuf, (PathBuf, FsGroupKey)>,
    pub(crate) watchdog: Option<Watchdog>,
}
impl Monitor {
#[allow(clippy::too_many_arguments)]
fn new(
paths_and_options: Vec<(PathBuf, PathOptions)>,
log_dir: Option<PathBuf>,
monitored_path: Option<PathBuf>,
buffer_size: Option<usize>,
socket_listener: Option<tokio::net::UnixListener>,
debug: bool,
cache_config: Option<ResolvedCacheConfig>,
disk_min_free: Option<String>,
sync_interval: Option<std::time::Duration>,
subscribe_buf: Option<usize>,
local_time: bool,
metrics_interval: Option<u64>,
watchdog_interval: Option<u64>,
) -> Result<Self> {
let cache_config = cache_config.unwrap_or_default();
let buffer_size = buffer_size.unwrap_or(cache_config.buffer_size);
if buffer_size < 4096 {
bail!("buffer_size must be at least 4096 bytes (4KB)");
}
if buffer_size > 1024 * 1024 {
bail!("buffer_size must not exceed 1048576 bytes (1MB)");
}
let mut paths = Vec::new();
let mut seen = std::collections::HashSet::new();
let mut monitored_entries = Vec::new();
let log_dir_canonical = log_dir
.as_ref()
.map(|d| d.canonicalize().unwrap_or_else(|_| d.clone()));
for (path, opts) in &paths_and_options {
let resolved = filters::resolve_recursion_check(path);
if let Some(ref log_dir) = log_dir_canonical {
let is_exact = log_dir.as_path() == resolved;
let is_parent_recursive = opts.recursive && log_dir.starts_with(&resolved);
if is_exact || is_parent_recursive {
bail!(
"Cannot monitor '{}': {} — \
Tip: use a path outside the log directory, or use a different logging.path",
path.display(),
if is_exact {
"this path is the log directory itself".to_string()
} else {
format!("log directory '{}' is inside this path", log_dir.display())
},
);
}
}
if opts.cmd.as_deref() == Some("fsmon") {
bail!(
"Cannot monitor 'fsmon' process: fsmon daemon's own events \
are excluded from monitoring."
);
}
if seen.insert(resolved.clone()) {
paths.push(resolved.clone());
}
monitored_entries.push((resolved.clone(), opts.clone()));
}
let (reader_death_tx, reader_death_rx) =
tokio::sync::mpsc::unbounded_channel::<FsGroupKey>();
let monitor = Self {
paths,
canonical_paths: Vec::new(),
monitored_entries,
log_dir,
monitored_path,
proc_cache: None,
pid_tree: None,
file_size_cache: LruCache::new(
NonZeroUsize::new(cache_config.file_size_capacity).unwrap(),
),
buffer_size,
dir_cache: Cache::builder()
.max_capacity(cache_config.dir_capacity)
.time_to_live(Duration::from_secs(cache_config.dir_ttl_secs))
.build(),
cache_config,
socket_listener,
debug,
fs_groups: SlotMap::new(),
path_to_group: HashMap::new(),
event_tx: None,
shared_dir_cache: None,
pending_paths: Vec::new(),
inotify: None,
_inotify_watches: Vec::new(), daemon_pid: std::process::id(),
reader_death_rx,
reader_death_tx,
reader_states: HashMap::new(),
started_at: std::time::Instant::now(),
disk_min_free,
sync_interval,
metrics_interval: metrics_interval
.filter(|&n| n > 0)
.map(std::time::Duration::from_secs),
event_stream_tx: {
let cap = subscribe_buf.unwrap_or(4096).max(1);
let (tx, _) = tokio::sync::broadcast::channel::<(FileEvent, String)>(cap);
Some(tx)
},
local_time,
metrics: MetricsRegistry::new(metrics_interval.is_some()),
temp_parent_marks: HashMap::new(),
watchdog: Some(Watchdog::new(watchdog_interval)),
};
if debug {
eprintln!(
"[DEBUG] Monitor initialized with {} path entries:",
paths_and_options.len()
);
for (i, (p, o)) in paths_and_options.iter().enumerate() {
let label = o.cmd.as_deref().unwrap_or("global");
eprintln!(
"[DEBUG] [{}] {} cmd={} recursive={}",
i,
p.display(),
label,
o.recursive
);
}
eprintln!("[DEBUG] log_dir: {:?}", monitor.log_dir);
eprintln!("[DEBUG] buffer_size: {}", buffer_size);
}
Ok(monitor)
}
pub fn from_config(cfg: MonitorConfig) -> Result<Self> {
Self::new(
cfg.paths_and_options,
cfg.log_dir,
cfg.monitored_path,
cfg.buffer_size,
cfg.socket_listener,
cfg.debug,
cfg.cache_config,
cfg.disk_min_free,
cfg.sync_interval,
cfg.subscribe_buf,
cfg.local_time,
cfg.metrics_interval,
cfg.watchdog_interval,
)
}
pub async fn run(&mut self) -> Result<()> {
self.check_root()?;
let proc_conn = self.init_process_cache();
let fan_group_count = self.init_fanotify()?;
self.init_logging()?;
self.print_startup_status(fan_group_count);
let (mut event_rx, dir_cache) = self.spawn_tasks();
let mut sigterm =
signal(SignalKind::terminate()).context("failed to create SIGTERM signal handler")?;
let mut sighup =
signal(SignalKind::hangup()).context("failed to create SIGHUP signal handler")?;
let socket_listener = self.socket_listener.take();
let inotify_async = self.inotify.as_ref().map(|ino| {
let fd = ino.as_raw_fd();
AsyncFd::new(crate::fid_parser::FanFd(fd)).expect("inotify AsyncFd")
});
let proc_afd = proc_conn.and_then(|conn| {
let fd = conn.as_raw_fd();
match AsyncFd::new(conn) {
Ok(a) => Some(a),
Err(e) => {
eprintln!("[ERROR] AsyncFd for proc connector fd {}: {}", fd, e);
None
}
}
});
let mut proc_buf = vec![0u8; 65536];
let proc_cache = self.proc_cache.clone().unwrap();
let pid_tree = self.pid_tree.clone().unwrap();
let mut reader_death_rx = std::mem::replace(
&mut self.reader_death_rx,
tokio::sync::mpsc::unbounded_channel::<FsGroupKey>().1,
);
if let Err(e) = crate::watchdog::sd_notify(libsystemd::daemon::NotifyState::Ready) {
eprintln!("[WARNING] systemd notify READY failed: {}", e);
}
let mut heartbeat_tick: Option<tokio::time::Interval> =
self.watchdog.as_ref().and_then(|wd| {
if !wd.is_enabled() {
return None;
}
if self.debug {
debug_log!(
self.debug,
"systemd watchdog enabled (interval: {}s, heartbeat in main loop)",
wd.interval().as_secs()
);
}
let start = tokio::time::Instant::now() + wd.interval();
Some(tokio::time::interval_at(start, wd.interval()))
});
let mut metrics_tick: Option<tokio::time::Interval> =
self.metrics_interval.map(tokio::time::interval);
loop {
tokio::select! {
Some(events) = event_rx.recv() => {
self.drain_proc_events(&proc_afd, &mut proc_buf, &proc_cache, &pid_tree);
let mut pending = self.process_event_batch(&events);
self.drain_proc_events(&proc_afd, &mut proc_buf, &proc_cache, &pid_tree);
self.patch_pending_events(&mut pending);
self.send_pending_events(&pending);
self.check_pending();
}
_ = tokio::signal::ctrl_c() => {
self.drain_remaining_events(&mut event_rx, &proc_afd, &mut proc_buf, &proc_cache, &pid_tree);
break;
}
_ = sigterm.recv() => {
self.drain_remaining_events(&mut event_rx, &proc_afd, &mut proc_buf, &proc_cache, &pid_tree);
break;
}
_ = sighup.recv() => {
if let Err(e) = self.reload_config() {
eprintln!("Config reload error: {e}");
}
}
_ = async {
match metrics_tick.as_mut() {
Some(t) => t.tick().await,
None => std::future::pending().await,
}
} => {
let report = self.collect_metrics(&dir_cache, &proc_cache, &pid_tree);
eprintln!(
"[metrics] uptime={}s rss={:.1}MB caches(d/p/t/f)={}/{}/{}/{} readers={}/{}/{} subs={} paths={} pending={} disk_buf={}",
report.uptime_secs,
report.rss_mb,
report.dir_cache_entries,
report.proc_cache_entries,
report.pid_tree_entries,
report.file_size_cache_entries,
report.reader_groups_total,
report.reader_groups_alive,
report.reader_groups_gave_up,
report.subscribers,
report.monitored_paths,
report.pending_paths,
report.disk_buffer_events,
);
}
_ = async {
match heartbeat_tick.as_mut() {
Some(t) => t.tick().await,
None => std::future::pending().await,
}
} => {
if let Some(ref wd) = self.watchdog
&& let Err(e) = wd.send_heartbeat()
{
eprintln!("[ERROR] systemd watchdog notify failed: {}", e);
}
}
proc_readable = async {
match proc_afd.as_ref() {
Some(afd) => afd.readable().await,
None => std::future::pending().await,
}
} => {
if let Ok(mut guard) = proc_readable {
loop {
match guard.get_inner().recv_raw(&mut proc_buf) {
Ok(n) => {
proc_cache::handle_proc_events(&proc_cache, &pid_tree, &proc_buf, n);
}
Err(proc_connector::Error::WouldBlock) => break,
Err(proc_connector::Error::Interrupted) => continue,
Err(e) => {
eprintln!("proc connector error: {e}");
break;
}
}
}
guard.clear_ready();
}
}
inotify_ready = async {
match inotify_async.as_ref() {
Some(afd) => afd.readable().await,
None => std::future::pending().await,
}
} => {
debug_log!(self.debug, "inotify fd became readable");
if let Ok(mut guard) = inotify_ready {
self.handle_inotify_events();
guard.clear_ready();
}
}
accept_result = async {
match socket_listener.as_ref() {
Some(listener) => Self::accept_socket_connection(listener).await,
None => std::future::pending().await,
}
} => {
self.handle_socket_accept(accept_result).await;
}
Some(dead_idx) = reader_death_rx.recv() => {
self.restart_reader(dead_idx);
}
}
}
println!("\nStopping file trace monitor...");
drop(self.event_stream_tx.take());
Ok(())
}
pub(crate) fn collect_metrics(
&self,
dir_cache: &moka::sync::Cache<fanotify_fid::types::HandleKey, std::path::PathBuf>,
proc_cache: &crate::proc_cache::ProcCache,
pid_tree: &crate::proc_cache::PidTree,
) -> MetricsReport {
let reader_groups_alive = self.reader_states.values().filter(|s| !s.gave_up).count() as u64;
let reader_groups_gave_up =
self.reader_states.values().filter(|s| s.gave_up).count() as u64;
MetricsReport {
uptime_secs: self.started_at.elapsed().as_secs(),
rss_mb: get_rss_mb(),
dir_cache_entries: dir_cache.entry_count(),
proc_cache_entries: proc_cache.entry_count(),
pid_tree_entries: pid_tree.entry_count(),
file_size_cache_entries: self.file_size_cache.len() as u64,
reader_groups_total: self.fs_groups.len() as u64,
reader_groups_alive,
reader_groups_gave_up,
subscribers: self.metrics.subscribers() as u64,
monitored_paths: self.metrics.monitored_paths() as u64,
pending_paths: self.metrics.pending_paths() as u64,
disk_buffer_events: self.metrics.disk_buffer_events() as u64,
}
}
fn send_pending_events(&self, pending: &[PendingEvent]) {
if let Some(ref tx) = self.event_stream_tx {
for pe in pending {
let _ = tx.send((pe.event.clone(), pe.cmd_name.clone()));
}
}
}
fn drain_proc_events(
&self,
proc_afd: &Option<AsyncFd<proc_connector::ProcConnector>>,
proc_buf: &mut [u8],
proc_cache: &ProcCache,
pid_tree: &PidTree,
) {
if let Some(afd) = proc_afd.as_ref() {
let conn = afd.get_ref();
loop {
match conn.recv_raw(proc_buf) {
Ok(n) => {
proc_cache::handle_proc_events(proc_cache, pid_tree, proc_buf, n);
}
Err(proc_connector::Error::WouldBlock) => break,
Err(proc_connector::Error::Interrupted) => continue,
Err(e) => {
eprintln!("proc connector error: {e}");
break;
}
}
}
}
}
fn drain_remaining_events(
&mut self,
event_rx: &mut EventReceiver,
proc_afd: &Option<AsyncFd<proc_connector::ProcConnector>>,
proc_buf: &mut [u8],
proc_cache: &ProcCache,
pid_tree: &PidTree,
) {
while let Ok(events) = event_rx.try_recv() {
self.drain_proc_events(proc_afd, proc_buf, proc_cache, pid_tree);
let mut pending = self.process_event_batch(&events);
self.patch_pending_events(&mut pending);
self.send_pending_events(&pending);
}
}
async fn accept_socket_connection(
listener: &tokio::net::UnixListener,
) -> Result<(tokio::net::unix::OwnedWriteHalf, String), std::io::Error> {
let (stream, _) = listener.accept().await?;
let (reader, writer) = stream.into_split();
let mut buf_reader = tokio::io::BufReader::new(reader);
let mut message = String::new();
loop {
let mut line = String::new();
let bytes = buf_reader.read_line(&mut line).await?;
if bytes == 0 {
break;
}
if line.trim().is_empty() && !message.is_empty() {
break;
}
message.push_str(&line);
}
Ok((writer, message.trim().to_string()))
}
async fn handle_socket_accept(
&mut self,
result: Result<(tokio::net::unix::OwnedWriteHalf, String), std::io::Error>,
) {
match result {
Ok((writer, cmd_str)) => {
let cmd = match serde_json::from_str::<crate::socket::SocketCmd>(&cmd_str) {
Ok(c) => c,
Err(e) => {
let resp: Result<
crate::socket::SocketResponse,
crate::socket::SocketError,
> = Err(crate::socket::SocketError::Transient(format!(
"Invalid command: {e}"
)));
if let Ok(json_str) = serde_json::to_string(&resp) {
let _ = tokio_io_oneshot(writer, &format!("{json_str}\n")).await;
}
return;
}
};
if let crate::socket::SocketCmd::Subscribe { .. } = cmd {
self.handle_subscribe(writer, &cmd);
} else {
let result = self.handle_socket_cmd(cmd);
if let Ok(json_str) = serde_json::to_string(&result) {
let resp_bytes = format!("{json_str}\n");
let _ = tokio_io_oneshot(writer, &resp_bytes).await;
}
}
}
Err(e) => eprintln!("Socket accept error: {e}"),
}
}
}
/// Point-in-time health snapshot produced by `Monitor::collect_metrics`.
///
/// Cache fields are entry counts. `rss_mb` is resident memory in MiB, and is
/// `0.0` when it could not be read.
// dead_code is allowed because not every field or derived impl is necessarily
// read in every build configuration.
#[derive(Clone, Debug)]
#[allow(dead_code)]
pub(crate) struct MetricsReport {
    /// Seconds since the monitor was constructed.
    pub uptime_secs: u64,
    /// Resident set size in MiB.
    pub rss_mb: f64,
    pub dir_cache_entries: u64,
    pub proc_cache_entries: u64,
    pub pid_tree_entries: u64,
    pub file_size_cache_entries: u64,
    /// Number of filesystem groups.
    pub reader_groups_total: u64,
    /// Reader states whose `gave_up` flag is clear.
    pub reader_groups_alive: u64,
    /// Reader states whose `gave_up` flag is set.
    pub reader_groups_gave_up: u64,
    pub subscribers: u64,
    pub monitored_paths: u64,
    pub pending_paths: u64,
    pub disk_buffer_events: u64,
}
/// Returns this process's resident set size in MiB, or `0.0` if it cannot be read.
///
/// Reads the `VmRSS` line of `/proc/self/status`, which the kernel reports in kB.
/// The previous approach multiplied the page count from `/proc/self/statm` by a
/// hard-coded 4096-byte page size, which under-reports RSS on kernels built with
/// 16 KiB or 64 KiB pages.
fn get_rss_mb() -> f64 {
    std::fs::read_to_string("/proc/self/status")
        .ok()
        .as_deref()
        .and_then(parse_vm_rss_kb)
        .map(|kb| kb as f64 / 1024.0)
        .unwrap_or(0.0)
}

/// Extracts the `VmRSS` value (in kB) from the text of a `/proc/<pid>/status` file.
///
/// Returns `None` when the line is missing or its value is not an integer.
fn parse_vm_rss_kb(status: &str) -> Option<u64> {
    status
        .lines()
        .find_map(|line| line.strip_prefix("VmRSS:"))
        .and_then(|rest| rest.split_whitespace().next())
        .and_then(|value| value.parse().ok())
}
#[cfg(test)]
#[path = "tests.rs"]
mod tests;