use super::FsGroupKey;
use anyhow::{Context, Result, bail};
use fanotify_fid::consts::{
FAN_CLASS_NOTIF, FAN_CLOEXEC, FAN_NONBLOCK, FAN_REPORT_DIR_FID, FAN_REPORT_FID, FAN_REPORT_NAME,
};
use fanotify_fid::prelude::*;
use std::os::fd::AsRawFd;
use std::path::PathBuf;
use super::{EventReceiver, EventSender, FileLogWriter, Monitor};
use crate::dir_cache;
use crate::fid_parser::{DIR_CACHE_CAP, FsGroup, chown_to_user, mark_directory, mark_recursive};
use crate::filters::PathOptions;
use crate::monitored::PathEntry;
use crate::proc_cache;
use crate::proc_cache::{PID_TREE_CAP, PROC_CACHE_CAP};
use crate::utils::format_size;
use proc_connector::ProcConnector;
impl Monitor {
pub(crate) fn check_root(&self) -> Result<()> {
if nix::unistd::geteuid().as_raw() != 0 {
let hint = if let Ok(exe) = std::env::current_exe() {
if exe.to_string_lossy().contains(".cargo/bin") {
"\n\nHint: It looks like fsmon was installed via cargo install (~/.cargo/bin).\n\
sudo cannot find it because ~/.cargo/bin is not in sudo's secure_path.\n\
Please either:\n\
1. Copy to system path: sudo cp ~/.cargo/bin/fsmon /usr/local/bin/\n\
2. Or use full path: sudo ~/.cargo/bin/fsmon monitor ..."
} else {
""
}
} else {
""
};
bail!(
"fanotify requires root privileges, please run with sudo{}",
hint
);
}
Ok(())
}
pub(crate) fn init_process_cache(&mut self) -> Option<ProcConnector> {
let proc_conn = proc_cache::try_create_connector();
let proc_cache =
proc_cache::DefaultCache::new(PROC_CACHE_CAP, self.cache_config.proc_ttl_secs);
self.proc_cache = Some(proc_cache.clone());
let pid_tree = proc_cache::DefaultTree::new(PID_TREE_CAP, self.cache_config.proc_ttl_secs);
proc_tree::snapshot(&pid_tree, &proc_cache);
self.pid_tree = Some(pid_tree.clone());
proc_conn
}
pub(crate) fn init_fanotify(&mut self) -> Result<usize> {
let combined_mask = self
.monitored_entries
.iter()
.map(|(_, opts)| crate::fid_parser::path_mask_from_options(opts))
.fold(0, |a, b| a | b);
debug_log!(self.debug, "combined fanotify mask: {:#x}", combined_mask);
let mut keep_paths: Vec<PathBuf> = Vec::new();
for path in std::mem::take(&mut self.paths) {
if path.exists() {
let canonical = path.canonicalize().unwrap_or_else(|_| path.clone());
self.canonical_paths.push(canonical);
keep_paths.push(path);
} else {
eprintln!(
"[INFO] Path '{}' does not exist yet — will start monitoring when created.",
path.display()
);
let pending_opts: Vec<PathOptions> = self
.monitored_entries
.iter()
.filter(|(p, _)| p == &path)
.map(|(_, o)| o.clone())
.collect();
self.monitored_entries.retain(|(p, _)| p != &path);
for opts in pending_opts {
self.pending_paths.push((
path.clone(),
PathEntry {
path: path.clone(),
recursive: Some(opts.recursive),
types: opts
.event_types
.as_ref()
.map(|v| v.iter().map(|t| t.to_string()).collect()),
size: opts
.size_filter
.map(|f| format!("{}{}", f.op, format_size(f.bytes))),
cmd: opts.cmd,
},
));
}
}
}
self.paths = keep_paths;
self.inotify = Some(inotify::Inotify::init().context("inotify_init")?);
self.setup_inotify_watches();
let mut fs_group_devs: std::collections::HashMap<u64, FsGroupKey> =
std::collections::HashMap::new();
for (i, canonical) in self.canonical_paths.iter().enumerate() {
let path_mask = combined_mask;
let dev_id = std::fs::metadata(canonical)
.ok()
.map(|m| std::os::linux::fs::MetadataExt::st_dev(&m))
.unwrap_or(0);
if let Some(&key) = fs_group_devs.get(&dev_id) {
let fan_fd = &self.fs_groups[key].fan_fd;
if let Err(e) = mark_directory(fan_fd, path_mask, canonical) {
eprintln!(
"[WARNING] Cannot inode-mark {} on fd {}: {:#}",
canonical.display(),
fan_fd.as_raw_fd(),
e
);
} else {
eprintln!(
"[INFO] Added {} (inode mark) on existing fd {}",
canonical.display(),
fan_fd.as_raw_fd()
);
let opts = self.paths.get(i).and_then(|p| self.first_opt_for_path(p));
if opts.is_some_and(|o| o.recursive) && canonical.is_dir() {
mark_recursive(fan_fd, path_mask, canonical);
}
}
self.fs_groups[key].ref_count += 1;
self.path_to_group.insert(self.paths[i].clone(), key);
continue;
}
let new_fd = fanotify_init(
FAN_CLOEXEC
| FAN_NONBLOCK
| FAN_CLASS_NOTIF
| FAN_REPORT_FID
| FAN_REPORT_DIR_FID
| FAN_REPORT_NAME,
(libc::O_CLOEXEC | libc::O_RDONLY) as u32,
)
.with_context(|| {
format!(
"fanotify_init failed for {} (requires Linux 5.9+ kernel)",
canonical.display()
)
})?;
let opts = self.paths.get(i).and_then(|p| self.first_opt_for_path(p));
let recursive = opts.is_some_and(|o| o.recursive) && canonical.is_dir();
if self
.add_mark_upward(&new_fd, path_mask, canonical, recursive)
.is_none()
{
drop(new_fd);
continue;
}
let mount_fd = match Self::open_dir(canonical) {
Ok(fd) => fd,
Err(e) => {
eprintln!(
"[WARNING] Could not open directory fd for {}: {}",
canonical.display(),
e
);
drop(new_fd);
continue;
}
};
let key = self.fs_groups.insert(FsGroup {
dev_id,
fan_fd: new_fd,
mount_fd,
ref_count: 1,
});
fs_group_devs.insert(dev_id, key);
self.path_to_group.insert(self.paths[i].clone(), key);
}
let fan_group_count = self.fs_groups.len();
if fan_group_count > 0 {
for (i, canonical) in self.canonical_paths.iter().enumerate() {
if canonical.is_dir() {
let opts = self.paths.get(i).and_then(|p| self.first_opt_for_path(p));
let recursive = opts.is_some_and(|o| o.recursive);
if recursive {
dir_cache::cache_recursive(&self.dir_cache, canonical);
} else {
dir_cache::cache_dir_handle(&self.dir_cache, canonical);
}
}
}
} else if self.pending_paths.is_empty() {
eprintln!(
"No entries configured. Waiting for socket commands (use 'fsmon add <cmd> --path <path>')."
);
}
Ok(fan_group_count)
}
pub(crate) fn init_logging(&self) -> Result<()> {
if let Some(ref dir) = self.log_dir {
std::fs::create_dir_all(dir)
.with_context(|| format!("Failed to create log directory {}", dir.display()))?;
match chown_to_user(dir) {
Ok(true) => {}
Ok(false) => {
eprintln!(
"[WARNING] Log directory '{}' is on a filesystem that does not support\n ownership changes (e.g. vfat/exfat/NFS). Log files will remain owned by root.\n Run 'sudo fsmon clean' if you cannot clean logs as a normal user.",
dir.display()
);
}
Err(e) => {
eprintln!(
"[WARNING] Could not chown log directory '{}': {}.\n Log files may remain owned by root.",
dir.display(),
e
);
}
}
}
if let Some(ref threshold_str) = self.disk_min_free
&& let Some(ref dir) = self.log_dir
{
Self::check_disk_space(dir, threshold_str);
}
Ok(())
}
pub(crate) fn print_startup_status(&self, fan_group_count: usize) {
println!("Starting file trace monitor...");
self.metrics
.set_monitored_paths(self.monitored_entries.len() as i64);
self.metrics
.set_pending_paths(self.pending_paths.len() as i64);
self.metrics.set_reader_groups(self.fs_groups.len() as i64);
if !self.canonical_paths.is_empty() {
println!("Active paths ({} fd(s)):", fan_group_count);
for (path, opts) in &self.monitored_entries {
let label = match opts.cmd {
Some(ref name) => format!("[{}]", name),
None => "[global]".to_string(),
};
println!(" {} {}", label, path.display());
}
}
if self.debug {
debug_log!(
self.debug,
"monitored_entries ({} entries, full list):",
self.monitored_entries.len()
);
for (i, (p, o)) in self.monitored_entries.iter().enumerate() {
debug_log!(
self.debug,
" [{}] {} cmd={} recursive={}",
i,
p.display(),
o.cmd.as_deref().unwrap_or("global"),
o.recursive
);
}
debug_log!(self.debug, "--- cache stats ---");
debug_log!(
self.debug,
" dir_cache: {}/{} entries",
self.dir_cache.entry_count(),
DIR_CACHE_CAP
);
if let Some(ref c) = self.proc_cache {
debug_log!(
self.debug,
" proc_cache: {}/{} entries",
c.len(),
PROC_CACHE_CAP
);
}
if let Some(ref t) = self.pid_tree {
debug_log!(
self.debug,
" pid_tree: {}/{} entries",
t.len(),
PID_TREE_CAP
);
}
debug_log!(
self.debug,
" file_size_cache: {}/{} entries",
self.file_size_cache.len(),
self.file_size_cache.cap()
);
}
if !self.pending_paths.is_empty() {
println!("Pending paths (waiting for directory creation):");
let mut by_cmd: std::collections::BTreeMap<Option<String>, Vec<&PathBuf>> =
std::collections::BTreeMap::new();
for (path, entry) in &self.pending_paths {
let cmd = entry.cmd.as_deref().and_then(|c| {
if c == crate::monitored::CMD_GLOBAL {
None
} else {
Some(c.to_string())
}
});
by_cmd.entry(cmd).or_default().push(path);
}
for (cmd, paths) in &by_cmd {
let label = match cmd {
Some(name) => format!("[{}]", name),
None => "[global]".to_string(),
};
for path in paths {
println!(" {} {}", label, path.display());
}
}
}
}
pub(crate) fn spawn_tasks(
&mut self,
) -> (
EventReceiver,
moka::sync::Cache<fanotify_fid::types::HandleKey, std::path::PathBuf>,
) {
let (event_tx, event_rx) = match self.cache_config.channel_capacity {
Some(cap) if cap > 0 => {
let (tx, rx) = tokio::sync::mpsc::channel(cap);
(EventSender::Bounded(tx), EventReceiver::Bounded(rx))
}
_ => {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
(EventSender::Unbounded(tx), EventReceiver::Unbounded(rx))
}
};
let dir_cache = self.dir_cache.clone();
self.event_tx = Some(event_tx);
self.shared_dir_cache = Some(dir_cache.clone());
let keys: Vec<_> = self.fs_groups.keys().collect();
for key in keys {
self.spawn_fd_reader(key);
}
let fw_log_dir = self.log_dir.clone();
let fw_sync = self.sync_interval;
let fw_debug = self.debug;
let fw_local = self.local_time;
let fw_metrics = self.metrics.clone();
if let Some(fw_log_dir) = fw_log_dir
&& let Some(ref tx) = self.event_stream_tx
{
let fw_rx = tx.subscribe();
let fw = FileLogWriter::new(fw_log_dir, fw_sync, fw_debug, fw_local, fw_metrics);
tokio::spawn(async move {
fw.run(fw_rx).await;
});
}
(event_rx, dir_cache)
}
}