use anyhow::{Context, Result, bail};
use chrono::Utc;
use fanotify_fid::prelude::*;
use fanotify_fid::types::FidEvent;
use fanotify_fid::consts::{
AT_FDCWD, FAN_CLASS_NOTIF, FAN_CLOEXEC, FAN_MARK_ADD,
FAN_MARK_FILESYSTEM, FAN_MARK_REMOVE, FAN_NONBLOCK, FAN_Q_OVERFLOW,
FAN_REPORT_DIR_FID, FAN_REPORT_FID, FAN_REPORT_NAME,
};
use dashmap::DashMap;
use std::collections::HashMap;
use std::fs::{self, OpenOptions};
use std::io::Write;
use std::num::NonZeroUsize;
use std::os::fd::{AsRawFd, FromRawFd, OwnedFd};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use lru::LruCache;
use tokio::io::unix::AsyncFd;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
use tokio::signal::unix::{SignalKind, signal};
use crate::dir_cache;
use crate::proc_cache::{self, ProcCache, ProcInfo};
use crate::socket::{SocketCmd, SocketResp};
use crate::managed::PathEntry;
use crate::managed::Managed;
use crate::utils::{format_size, get_process_info_by_pid, parse_size_filter};
use crate::filters::{self, PathOptions};
use crate::fid_parser::{FanFd, FsGroup, read_fid_events_dashmap, mask_to_event_types,
path_mask_from_options, mark_directory, mark_recursive, chown_to_user,
FILE_SIZE_CACHE_CAP, PROC_CONNECTOR_TIMEOUT_SECS};
use crate::{EventType, FileEvent};
/// State of the fanotify-based file monitor.
///
/// Built by `Monitor::new`, driven by `Monitor::run`, and mutated at runtime by
/// `add_path` / `remove_path` (control socket, SIGHUP reload, pending-path checks).
pub struct Monitor {
/// Monitored paths as returned by `filters::resolve_recursion_check`.
/// Index-aligned with `canonical_paths` (both are pushed/removed together).
paths: Vec<PathBuf>,
/// Canonicalized form of each entry in `paths`; filled in `run` and `add_path`.
canonical_paths: Vec<PathBuf>,
/// Per-path filter options, keyed by the same resolved path stored in `paths`.
path_options: HashMap<PathBuf, PathOptions>,
/// Directory that receives the per-path event logs; `write_event` is a no-op when `None`.
log_dir: Option<PathBuf>,
/// Location of the managed store that `reload_config` loads via `Managed::load`.
managed_path: Option<PathBuf>,
/// Handle returned by `proc_cache::start_proc_listener`; set in `run` and reset to
/// `None` when the listener does not become ready before the timeout.
proc_cache: Option<ProcCache>,
/// LRU cache of the last observed size per file path (capacity `FILE_SIZE_CACHE_CAP`).
file_size_cache: LruCache<PathBuf, u64>,
/// LRU cache of process info per PID (capacity 4096), filled by `build_file_event`.
pid_cache: LruCache<u32, ProcInfo>,
/// Size in bytes of the read buffer each reader task allocates (4096..=1048576).
buffer_size: usize,
/// Control socket listener; taken out of the struct by `run`.
socket_listener: Option<tokio::net::UnixListener>,
/// One fanotify group per device id (`st_dev`) seen so far.
fs_groups: Vec<FsGroup>,
/// Maps a resolved path to the index of its group in `fs_groups`.
path_to_group: HashMap<PathBuf, usize>,
/// Directory-handle cache filled during `run` start-up; `run` moves it into
/// `shared_dir_cache` before the reader tasks start.
dir_cache: DashMap<fanotify_fid::types::HandleKey, PathBuf>,
/// Sender that reader tasks use to deliver event batches to the main loop; set in `run`.
event_tx: Option<tokio::sync::mpsc::UnboundedSender<Vec<FidEvent>>>,
/// Shared handle to the directory-handle cache used by reader tasks and `add_path`.
shared_dir_cache: Option<Arc<DashMap<fanotify_fid::types::HandleKey, PathBuf>>>,
/// Paths that do not exist yet, paired with the entry passed to `add_path` once they appear.
pending_paths: Vec<(PathBuf, PathEntry)>,
/// inotify instance watching ancestors of pending paths; initialized in `run`.
inotify: Option<inotify::Inotify>,
/// Watch descriptors returned by the most recent `setup_inotify_watches` call.
_inotify_watches: Vec<inotify::WatchDescriptor>,
}
impl Monitor {
/// Creates a monitor for the given paths without touching fanotify yet.
///
/// * `paths_and_options` - paths to monitor with their filter options. Each path is
///   passed through `filters::resolve_recursion_check`; if the same resolved path
///   appears more than once it is registered once and the last options win.
/// * `log_dir` - directory for event logs, or `None` to disable logging.
/// * `managed_path` - store file consulted by `reload_config`.
/// * `buffer_size` - reader buffer size in bytes; defaults to 32 KiB.
/// * `socket_listener` - optional control socket.
///
/// # Errors
/// Fails when `buffer_size` is outside 4096..=1048576 or when the log directory
/// lies inside one of the monitored paths.
pub fn new(
    paths_and_options: Vec<(PathBuf, PathOptions)>,
    log_dir: Option<PathBuf>,
    managed_path: Option<PathBuf>,
    buffer_size: Option<usize>,
    socket_listener: Option<tokio::net::UnixListener>,
) -> Result<Self> {
    let buffer_size = buffer_size.unwrap_or(4096 * 8);
    if buffer_size < 4096 {
        bail!("buffer_size must be at least 4096 bytes (4KB)");
    }
    if buffer_size > 1024 * 1024 {
        bail!("buffer_size must not exceed 1048576 bytes (1MB)");
    }
    let log_dir_canonical = log_dir
        .as_ref()
        .map(|d| d.canonicalize().unwrap_or_else(|_| d.clone()));
    let mut paths = Vec::with_capacity(paths_and_options.len());
    let mut path_options = HashMap::with_capacity(paths_and_options.len());
    for (path, opts) in paths_and_options {
        let resolved = filters::resolve_recursion_check(&path);
        if let Some(log_dir) = log_dir_canonical.as_ref()
            && log_dir.starts_with(&resolved)
        {
            bail!(
                "Cannot monitor '{}': log directory '{}' is inside this path — \
                would cause infinite recursion on every log write.\n\
                Tip: fsmon remove {} or exclude the log directory with --exclude \
                or use a different logging.path",
                path.display(),
                log_dir.display(),
                path.display()
            );
        }
        // `run` removes one `path_options` entry per element of `paths` and panics
        // if the entry is missing, so a duplicate path must not be pushed twice.
        if path_options.insert(resolved.clone(), opts).is_none() {
            paths.push(resolved);
        }
    }
    Ok(Self {
        paths,
        canonical_paths: Vec::new(),
        path_options,
        log_dir,
        managed_path,
        proc_cache: None,
        file_size_cache: LruCache::new(NonZeroUsize::new(FILE_SIZE_CACHE_CAP).unwrap()),
        pid_cache: LruCache::new(NonZeroUsize::new(4096).unwrap()),
        buffer_size,
        socket_listener,
        fs_groups: Vec::new(),
        path_to_group: HashMap::new(),
        dir_cache: DashMap::new(),
        event_tx: None,
        shared_dir_cache: None,
        pending_paths: Vec::new(),
        inotify: None,
        _inotify_watches: Vec::new(),
    })
}
/// Duplicates `fd` into a new owned descriptor with close-on-exec set.
///
/// Plain `dup(2)` clears `FD_CLOEXEC`, which would let the duplicate leak into any
/// child process even though the source descriptors are opened close-on-exec.
/// The standard library clone uses `F_DUPFD_CLOEXEC` instead.
///
/// # Errors
/// Returns an error when `fd` is negative or the kernel refuses the duplication.
fn dup_fd(fd: &impl AsRawFd) -> std::io::Result<OwnedFd> {
    let raw = fd.as_raw_fd();
    if raw < 0 {
        return Err(std::io::Error::new(
            std::io::ErrorKind::InvalidInput,
            "cannot duplicate an invalid file descriptor",
        ));
    }
    // SAFETY: `fd` is borrowed for the whole call, so `raw` stays open while the
    // `BorrowedFd` exists, and it was checked above to not be -1.
    let borrowed = unsafe { std::os::fd::BorrowedFd::borrow_raw(raw) };
    borrowed.try_clone_to_owned()
}
/// Opens `path` as a read-only directory descriptor.
///
/// The descriptor is opened with `O_CLOEXEC` so it is not inherited by child
/// processes. `O_DIRECTORY` makes the call fail when `path` is not a directory.
///
/// # Errors
/// Returns the `open(2)` failure wrapped in an `std::io::Error`.
fn open_dir(path: &Path) -> std::io::Result<OwnedFd> {
    let raw = nix::fcntl::open(
        path,
        nix::fcntl::OFlag::O_RDONLY
            | nix::fcntl::OFlag::O_DIRECTORY
            | nix::fcntl::OFlag::O_CLOEXEC,
        nix::sys::stat::Mode::empty(),
    )
    .map_err(std::io::Error::other)?;
    // SAFETY: `raw` was just returned by a successful `open` and is owned by nobody else.
    Ok(unsafe { OwnedFd::from_raw_fd(raw) })
}
/// Reads one command message from an accepted control-socket connection.
///
/// Lines are accumulated until EOF, or until a blank line arrives after some
/// content has been received. The trimmed text is returned.
async fn read_socket_message(
    reader: tokio::net::unix::OwnedReadHalf,
) -> std::io::Result<String> {
    let mut buf_reader = tokio::io::BufReader::new(reader);
    let mut message = String::new();
    let mut line = String::new();
    loop {
        line.clear();
        if buf_reader.read_line(&mut line).await? == 0 {
            break;
        }
        if line.trim().is_empty() && !message.is_empty() {
            break;
        }
        message.push_str(&line);
    }
    Ok(message.trim().to_string())
}

/// Runs the monitor until Ctrl-C or SIGTERM is received.
///
/// Start-up sequence:
/// 1. require root and wait (bounded) for the proc connector listener;
/// 2. split configured paths into existing ones and pending ones;
/// 3. create one fanotify group per device and mark every existing path;
/// 4. prime the directory-handle cache, prepare the log directory, print a banner;
/// 5. spawn one reader task per fanotify group and, if a control socket was
///    supplied, an acceptor task.
///
/// The main loop then multiplexes fanotify event batches, signals (SIGHUP reloads
/// the store), inotify wake-ups for pending paths, and control-socket commands.
///
/// # Errors
/// Fails when not running as root, when inotify/fanotify initialization fails,
/// when the log directory cannot be created, or when signal handlers cannot be
/// installed.
pub async fn run(&mut self) -> Result<()> {
    if nix::unistd::geteuid().as_raw() != 0 {
        let installed_via_cargo = std::env::current_exe()
            .is_ok_and(|exe| exe.to_string_lossy().contains(".cargo/bin"));
        let hint = if installed_via_cargo {
            "\n\nHint: It looks like fsmon was installed via cargo install (~/.cargo/bin).\n\
            sudo cannot find it because ~/.cargo/bin is not in sudo's secure_path.\n\
            Please either:\n\
            1. Copy to system path: sudo cp ~/.cargo/bin/fsmon /usr/local/bin/\n\
            2. Or use full path: sudo ~/.cargo/bin/fsmon monitor ..."
        } else {
            ""
        };
        bail!(
            "fanotify requires root privileges, please run with sudo{}",
            hint
        );
    }

    // Wait for the proc connector with exponential back-off (1ms doubling, capped at 50ms).
    let (proc_cache, proc_ready) = proc_cache::start_proc_listener();
    self.proc_cache = Some(proc_cache);
    let deadline = tokio::time::Instant::now()
        + tokio::time::Duration::from_secs(PROC_CONNECTOR_TIMEOUT_SECS);
    let mut poll_interval = tokio::time::Duration::from_millis(1);
    while !proc_ready.load(std::sync::atomic::Ordering::Acquire) {
        if tokio::time::Instant::now() >= deadline {
            eprintln!(
                "[WARNING] proc connector subscription timed out after {}s. \
                Process name attribution may be incomplete. Monitoring continues.",
                PROC_CONNECTOR_TIMEOUT_SECS
            );
            self.proc_cache = None;
            break;
        }
        tokio::time::sleep(poll_interval).await;
        poll_interval = (poll_interval * 2).min(tokio::time::Duration::from_millis(50));
    }

    // The mask is the union over all configured paths, including pending ones.
    let combined_mask = self
        .path_options
        .values()
        .map(path_mask_from_options)
        .fold(0, |a, b| a | b);

    // Split configured paths into those that exist now and those to wait for.
    let mut keep_paths: Vec<PathBuf> = Vec::new();
    let mut keep_opts = HashMap::new();
    for path in std::mem::take(&mut self.paths) {
        let opts = self
            .path_options
            .remove(&path)
            .expect("path in paths but not in path_options");
        if path.exists() {
            let canonical = path.canonicalize().unwrap_or_else(|_| path.clone());
            self.canonical_paths.push(canonical);
            keep_paths.push(path.clone());
            keep_opts.insert(path, opts);
        } else {
            eprintln!(
                "[INFO] Path '{}' does not exist yet — will start monitoring when created.",
                path.display()
            );
            let entry_path = path.clone();
            // NOTE(review): `exclude_cmd` and the invert flags are not carried into the
            // pending entry, so they appear to be lost once the path shows up — confirm.
            self.pending_paths.push((path, PathEntry {
                path: entry_path,
                recursive: Some(opts.recursive),
                types: opts.event_types.as_ref().map(
                    |v| v.iter().map(|t| t.to_string()).collect()
                ),
                size: opts.size_filter.map(|f| format!("{}{}", f.op, format_size(f.bytes))),
                exclude: opts.exclude_regex.as_ref().map(|r| vec![r.as_str().to_string()]),
                exclude_cmd: None,
            }));
        }
    }
    self.paths = keep_paths;
    self.path_options = keep_opts;

    self.inotify = Some(inotify::Inotify::init().context("inotify_init")?);
    self.setup_inotify_watches();

    // One fanotify group per device id; paths on the same device share a group.
    for (i, canonical) in self.canonical_paths.iter().enumerate() {
        let path_mask = combined_mask;
        let dev_id = std::fs::metadata(canonical)
            .ok()
            .map(|m| std::os::linux::fs::MetadataExt::st_dev(&m))
            .unwrap_or(0);
        let recursive = self
            .paths
            .get(i)
            .and_then(|p| self.path_options.get(p))
            .is_some_and(|o| o.recursive);

        if let Some(gi) = self.fs_groups.iter().position(|g| g.dev_id == dev_id) {
            let group = &self.fs_groups[gi];
            // A filesystem mark already covers the whole device; inode-mark groups
            // need an explicit mark for every additional path.
            if !group.is_fs_mark {
                let fan_fd = &group.fan_fd;
                if let Err(e) = mark_directory(fan_fd, path_mask, canonical) {
                    eprintln!(
                        "[WARNING] Cannot inode-mark {} on fd {}: {:#}",
                        canonical.display(),
                        fan_fd.as_raw_fd(),
                        e
                    );
                } else {
                    eprintln!(
                        "[INFO] Monitoring {} (inode mark) on existing fd {}",
                        canonical.display(),
                        fan_fd.as_raw_fd()
                    );
                    if recursive && canonical.is_dir() {
                        mark_recursive(fan_fd, path_mask, canonical);
                    }
                }
            }
            self.fs_groups[gi].ref_count += 1;
            self.path_to_group.insert(self.paths[i].clone(), gi);
            continue;
        }

        let new_fd = fanotify_init(
            FAN_CLOEXEC
                | FAN_NONBLOCK
                | FAN_CLASS_NOTIF
                | FAN_REPORT_FID
                | FAN_REPORT_DIR_FID
                | FAN_REPORT_NAME,
            (libc::O_CLOEXEC | libc::O_RDONLY) as u32,
        )
        .with_context(|| {
            format!(
                "fanotify_init failed for {} (requires Linux 5.9+ kernel)",
                canonical.display()
            )
        })?;

        // Prefer a filesystem mark; fall back to inode marks when the kernel answers EXDEV.
        let is_fs_mark = match fanotify_mark(
            &new_fd,
            FAN_MARK_ADD | FAN_MARK_FILESYSTEM,
            path_mask,
            AT_FDCWD,
            canonical,
        ) {
            Ok(()) => {
                eprintln!(
                    "[INFO] Monitoring {} (filesystem mark) on fd {}",
                    canonical.display(),
                    new_fd.as_raw_fd()
                );
                true
            }
            Err(FanotifyError::Mark(code)) if code == libc::EXDEV => {
                match mark_directory(&new_fd, path_mask, canonical) {
                    Ok(()) => {
                        eprintln!(
                            "[INFO] Monitoring {} (inode mark) on fd {}",
                            canonical.display(),
                            new_fd.as_raw_fd()
                        );
                        if recursive && canonical.is_dir() {
                            mark_recursive(&new_fd, path_mask, canonical);
                        }
                        false
                    }
                    Err(e) => {
                        eprintln!(
                            "[WARNING] Cannot monitor {} (inode mark): {:#}",
                            canonical.display(),
                            e
                        );
                        continue;
                    }
                }
            }
            Err(e) => {
                eprintln!("[WARNING] Cannot monitor {}: {:#}", canonical.display(), e);
                continue;
            }
        };

        let mount_fd = match Self::open_dir(canonical) {
            Ok(fd) => fd,
            Err(e) => {
                eprintln!(
                    "[WARNING] Could not open directory fd for {}: {}",
                    canonical.display(),
                    e
                );
                continue;
            }
        };
        let gi = self.fs_groups.len();
        self.fs_groups.push(FsGroup {
            dev_id,
            is_fs_mark,
            fan_fd: new_fd,
            mount_fd,
            ref_count: 1,
        });
        self.path_to_group.insert(self.paths[i].clone(), gi);
    }

    // Prime the directory-handle cache for every monitored directory.
    let fan_group_count = self.fs_groups.len();
    if fan_group_count > 0 {
        for (i, canonical) in self.canonical_paths.iter().enumerate() {
            if canonical.is_dir() {
                let opts = self.paths.get(i).and_then(|p| self.path_options.get(p));
                if opts.is_some_and(|o| o.recursive) {
                    dir_cache::cache_recursive(&self.dir_cache, canonical);
                } else {
                    dir_cache::cache_dir_handle(&self.dir_cache, canonical);
                }
            }
        }
    } else if self.pending_paths.is_empty() {
        eprintln!("No paths configured. Waiting for socket commands (use 'fsmon add <path>').");
    }

    if let Some(ref dir) = self.log_dir {
        fs::create_dir_all(dir)
            .with_context(|| format!("Failed to create log directory {}", dir.display()))?;
        match chown_to_user(dir) {
            Ok(true) => {}
            Ok(false) => {
                eprintln!("[WARNING] Log directory '{}' is on a filesystem that does not support\n ownership changes (e.g. vfat/exfat/NFS). Log files will remain owned by root.\n Run 'sudo fsmon clean' if you cannot clean logs as a normal user.", dir.display());
            }
            Err(e) => {
                eprintln!("[WARNING] Could not chown log directory '{}': {}.\n Log files may remain owned by root.", dir.display(), e);
            }
        }
    }

    println!("Starting file trace monitor...");
    if !self.canonical_paths.is_empty() {
        println!(
            "Active paths: {}",
            self.canonical_paths
                .iter()
                .map(|p| p.display().to_string())
                .collect::<Vec<_>>()
                .join(", "),
        );
        println!(" FDs: {} file-descriptor(s)", fan_group_count);
    }
    if !self.pending_paths.is_empty() {
        println!(
            "Pending paths (waiting for directory creation): {}",
            self.pending_paths
                .iter()
                .map(|(p, _)| p.display().to_string())
                .collect::<Vec<_>>()
                .join(", "),
        );
    }

    // Publish the channel and shared cache, then start one reader per group.
    let (event_tx, mut event_rx) =
        tokio::sync::mpsc::unbounded_channel::<Vec<FidEvent>>();
    self.event_tx = Some(event_tx);
    self.shared_dir_cache = Some(Arc::new(std::mem::take(&mut self.dir_cache)));
    for gi in 0..self.fs_groups.len() {
        self.spawn_fd_reader(gi);
    }

    let mut sigterm =
        signal(SignalKind::terminate()).context("failed to create SIGTERM signal handler")?;
    let mut sighup =
        signal(SignalKind::hangup()).context("failed to create SIGHUP signal handler")?;

    // Control-socket connections are accepted and read in their own tasks. Reading
    // inside a `select!` branch would drop a half-received command whenever another
    // branch became ready first, because the branch future is cancelled.
    let (cmd_tx, mut cmd_rx) = tokio::sync::mpsc::unbounded_channel::<(
        tokio::net::unix::OwnedWriteHalf,
        String,
    )>();
    let socket_task = match self.socket_listener.take() {
        Some(listener) => Some(tokio::spawn(async move {
            loop {
                let stream = match listener.accept().await {
                    Ok((stream, _)) => stream,
                    Err(e) => {
                        eprintln!("Socket accept error: {e}");
                        // Back off briefly so a persistent accept failure cannot spin.
                        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
                        continue;
                    }
                };
                let tx = cmd_tx.clone();
                tokio::spawn(async move {
                    let (reader, writer) = stream.into_split();
                    let read = tokio::time::timeout(
                        tokio::time::Duration::from_secs(5),
                        Self::read_socket_message(reader),
                    )
                    .await;
                    match read {
                        Ok(Ok(message)) => {
                            let _ = tx.send((writer, message));
                        }
                        Ok(Err(e)) => eprintln!("Socket accept error: {e}"),
                        Err(_) => eprintln!("Socket read timed out waiting for a command"),
                    }
                });
            }
        })),
        // `cmd_tx` stays alive in this scope, so the receive branch simply never fires.
        None => None,
    };

    let inotify_async = self
        .inotify
        .as_ref()
        .map(|ino| AsyncFd::new(FanFd(ino.as_raw_fd())))
        .transpose()
        .context("failed to register inotify fd with the async runtime")?;

    loop {
        tokio::select! {
            Some(events) = event_rx.recv() => {
                for raw in &events {
                    if raw.mask & FAN_Q_OVERFLOW != 0 {
                        eprintln!("[WARNING] fanotify queue overflow - some events may have been lost");
                        continue;
                    }
                    let event_types = mask_to_event_types(raw.mask);
                    let matched_path = self.matching_path(&raw.path).cloned();
                    // A monitored root that was deleted or moved away goes back to pending.
                    let is_delete_self = event_types.contains(&EventType::DeleteSelf)
                        || event_types.contains(&EventType::MovedFrom);
                    let is_canonical_root = is_delete_self
                        && self.canonical_paths.iter().any(|cp| cp == &raw.path);
                    if is_canonical_root {
                        if let Some(ref path) = matched_path {
                            let opts = self.path_options.get(path);
                            let pending_entry = PathEntry {
                                path: path.clone(),
                                recursive: opts.map(|o| o.recursive),
                                types: opts.and_then(|o| o.event_types.as_ref().map(
                                    |v| v.iter().map(|t| t.to_string()).collect()
                                )),
                                size: opts.and_then(|o| o.size_filter.map(|f| format!("{}{}", f.op, format_size(f.bytes)))),
                                exclude: opts.and_then(|o| o.exclude_regex.as_ref().map(|r| vec![r.as_str().to_string()])),
                                exclude_cmd: None,
                            };
                            if let Err(e) = self.remove_path(path) {
                                eprintln!("[WARNING] Failed to remove deleted path '{}': {e}", path.display());
                            }
                            self.pending_paths.push((path.clone(), pending_entry));
                            self.setup_inotify_watches();
                        }
                        continue;
                    }
                    for event_type in event_types {
                        let event = self.build_file_event(raw, event_type);
                        if !self.is_path_in_scope(&event.path) {
                            continue;
                        }
                        if self.should_output(&event)
                            && let Err(e) = self.write_event(&event)
                        {
                            eprintln!("[ERROR] Failed to write event: {}", e);
                        }
                    }
                }
            }
            _ = tokio::signal::ctrl_c() => {
                break;
            }
            _ = sigterm.recv() => {
                break;
            }
            _ = sighup.recv() => {
                if let Err(e) = self.reload_config() {
                    eprintln!("Config reload error: {e}");
                }
            }
            inotify_ready = async {
                match inotify_async.as_ref() {
                    Some(afd) => afd.readable().await,
                    None => std::future::pending().await,
                }
            } => {
                if let Ok(mut guard) = inotify_ready {
                    if let Some(ref mut inotify) = self.inotify {
                        // The events are only a wake-up signal; their content is ignored.
                        let mut buf = [0u8; 4096];
                        let _ = inotify.read_events(&mut buf);
                        self.check_pending();
                    }
                    guard.clear_ready();
                }
            }
            Some((writer, cmd_str)) = cmd_rx.recv() => {
                let mut writer = writer;
                let resp = match toml::from_str::<SocketCmd>(&cmd_str) {
                    Ok(cmd) => self.handle_socket_cmd(cmd),
                    Err(e) => SocketResp::err(format!("Invalid command: {e}")),
                };
                if let Ok(toml_str) = toml::to_string(&resp) {
                    let resp_bytes = format!("{toml_str}\n");
                    let _ = writer.write_all(resp_bytes.as_bytes()).await;
                }
            }
        }
    }

    if let Some(task) = socket_task {
        task.abort();
    }
    println!("\nStopping file trace monitor...");
    Ok(())
}
/// Converts one raw fanotify record plus a decoded event type into a `FileEvent`.
///
/// Process name and user come from `pid_cache` when present, otherwise from
/// `get_process_info_by_pid`; a lookup is cached unless both values are `"unknown"`.
///
/// File size handling:
/// * Create / Modify / CloseWrite: stat the file and remember the size. If the stat
///   fails (for example the file is already gone), the last remembered size is
///   reported and kept, instead of overwriting it with 0.
/// * Delete / DeleteSelf / MovedFrom: report and forget the remembered size.
/// * Anything else: report the remembered size, or 0 when there is none.
fn build_file_event(
    &mut self,
    raw: &FidEvent,
    event_type: EventType,
) -> FileEvent {
    let pid = raw.pid.unsigned_abs();
    // NOTE(review): cached entries are never invalidated here, so a reused PID
    // could be attributed to the previous process — confirm this is acceptable.
    let (cmd, user) = if let Some(info) = self.pid_cache.get(&pid) {
        (info.cmd.clone(), info.user.clone())
    } else {
        let (cmd, user) =
            get_process_info_by_pid(pid, &raw.path, self.proc_cache.as_ref());
        if cmd != "unknown" || user != "unknown" {
            self.pid_cache.put(
                pid,
                ProcInfo {
                    cmd: cmd.clone(),
                    user: user.clone(),
                },
            );
        }
        (cmd, user)
    };
    let file_size = match event_type {
        EventType::Create | EventType::Modify | EventType::CloseWrite => {
            match fs::metadata(&raw.path) {
                Ok(meta) => {
                    let size = meta.len();
                    self.file_size_cache.put(raw.path.clone(), size);
                    size
                }
                // Keep the last known size so a following delete event can still report it.
                Err(_) => self.file_size_cache.get(&raw.path).copied().unwrap_or(0),
            }
        }
        EventType::Delete | EventType::DeleteSelf | EventType::MovedFrom => {
            self.file_size_cache.pop(&raw.path).unwrap_or(0)
        }
        _ => self.file_size_cache.get(&raw.path).copied().unwrap_or(0),
    };
    FileEvent {
        time: Utc::now(),
        event_type,
        path: raw.path.clone(),
        pid,
        cmd,
        user,
        file_size,
    }
}
/// Looks up the options of the monitored path that `path` belongs to.
///
/// Delegates to `filters::get_matching_path_options` with this monitor's path tables.
fn get_matching_path_options(&self, path: &Path) -> Option<&PathOptions> {
    let (monitored, options, canonical) =
        (&self.paths, &self.path_options, &self.canonical_paths);
    filters::get_matching_path_options(monitored, options, canonical, path)
}
fn spawn_fd_reader(&mut self, group_idx: usize) {
let tx = match self.event_tx.as_ref() {
Some(t) => t.clone(),
None => {
eprintln!("[ERROR] Cannot spawn reader: event_tx not initialized");
return;
}
};
let dc = match self.shared_dir_cache.as_ref() {
Some(d) => Arc::clone(d),
None => {
eprintln!("[ERROR] Cannot spawn reader: shared_dir_cache not initialized");
return;
}
};
let buf_size = self.buffer_size;
let group = &self.fs_groups[group_idx];
let owned_fan_fd = match Self::dup_fd(&group.fan_fd) {
Ok(fd) => fd,
Err(e) => {
eprintln!(
"[ERROR] Failed to dup fanotify fd {}: {}",
group.fan_fd.as_raw_fd(),
e
);
return;
}
};
let owned_mount_fd = match Self::dup_fd(&group.mount_fd) {
Ok(fd) => fd,
Err(e) => {
eprintln!(
"[ERROR] Failed to dup mount fd {}: {}",
group.mount_fd.as_raw_fd(),
e
);
return;
}
};
let raw_fd = owned_fan_fd.as_raw_fd();
let mfds = Arc::new(vec![owned_mount_fd]);
tokio::spawn(async move {
let afd = match AsyncFd::new(owned_fan_fd) {
Ok(a) => a,
Err(e) => {
eprintln!("[ERROR] AsyncFd for fd {}: {}", raw_fd, e);
return;
}
};
let mut buf = vec![0u8; buf_size];
loop {
let result = afd.readable().await;
let mut guard = match result {
Ok(g) => g,
Err(e) => {
eprintln!("[ERROR] fd {} readable: {}", raw_fd, e);
break;
}
};
let events =
read_fid_events_dashmap(afd.get_ref(), &mfds, dc.as_ref(), &mut buf);
if !events.is_empty() && tx.send(events).is_err() {
break;
}
guard.clear_ready();
}
});
}
/// Starts monitoring the path described by `entry`.
///
/// The path is resolved with `filters::resolve_recursion_check`. If it does not
/// exist yet it is queued in `pending_paths` (replacing any earlier queued entry
/// for the same path) and picked up later by `check_pending`.
///
/// For an existing path the matching fanotify group is chosen by device id:
/// * existing filesystem-mark group: the path's event mask is ORed into the mark,
///   since the mark may have been created with a narrower mask;
/// * existing inode-mark group: the directory (and optionally its subtree) is marked;
/// * no group yet: a new fanotify fd is created, marked, and a reader task spawned.
///
/// # Errors
/// Fails when the path is already monitored, when the log directory lies inside
/// it, when the size or exclude filters do not parse, or when a new fanotify
/// group cannot be created or marked.
pub fn add_path(&mut self, entry: &PathEntry) -> Result<()> {
    let path = filters::resolve_recursion_check(&entry.path);
    if self.path_options.contains_key(&path) {
        bail!("Path already being monitored: {}", path.display());
    }
    if let Some(ref log_dir) = self.log_dir
        && log_dir.canonicalize().unwrap_or_else(|_| log_dir.clone()).starts_with(&path)
    {
        bail!(
            "Cannot monitor '{}': log directory '{}' is inside this path — \
            every log write would trigger a new event, causing infinite recursion.\n\
            Tip: exclude the log directory with --exclude or use a different logging.path",
            path.display(),
            log_dir.display()
        );
    }
    if !path.exists() {
        eprintln!(
            "[INFO] Path '{}' does not exist yet — will start monitoring when created.",
            path.display()
        );
        // Keep at most one pending entry per path; the newest options win.
        self.pending_paths.retain(|(pending, _)| *pending != path);
        self.pending_paths.push((path, entry.clone()));
        self.setup_inotify_watches();
        return Ok(());
    }

    let canonical = path.canonicalize().unwrap_or_else(|_| path.clone());
    // Unparseable event type names are skipped.
    let event_types = entry.types.as_ref().map(|types| {
        types
            .iter()
            .filter_map(|s| s.parse::<EventType>().ok())
            .collect()
    });
    let size_filter = entry.size.as_ref().map(|s| parse_size_filter(s)).transpose()?;
    let (exclude_regex, exclude_invert) =
        filters::build_exclude_regex(entry.exclude.as_deref(), "exclude")?;
    let (exclude_cmd_regex, exclude_cmd_invert) =
        filters::build_exclude_regex(entry.exclude_cmd.as_deref(), "--exclude-cmd")?;
    let recursive = entry.recursive.unwrap_or(false);
    let opts = PathOptions {
        size_filter,
        event_types,
        exclude_regex,
        exclude_invert,
        exclude_cmd_regex,
        exclude_cmd_invert,
        recursive,
    };
    let path_mask = path_mask_from_options(&opts);

    let dev_id = std::fs::metadata(&canonical)
        .ok()
        .map(|m| std::os::linux::fs::MetadataExt::st_dev(&m))
        .unwrap_or(0);
    let existing_idx = self.fs_groups.iter().position(|g| g.dev_id == dev_id);

    let group_idx = if let Some(idx) = existing_idx {
        let group = &self.fs_groups[idx];
        let fan_fd = &group.fan_fd;
        if group.is_fs_mark {
            // FAN_MARK_ADD ORs the mask into the existing filesystem mark, so event
            // types that only this path asks for start being delivered.
            if let Err(e) = fanotify_mark(
                fan_fd,
                FAN_MARK_ADD | FAN_MARK_FILESYSTEM,
                path_mask,
                AT_FDCWD,
                &canonical,
            ) {
                eprintln!(
                    "[WARNING] Cannot extend filesystem mark for {} on fd {}: {:#}",
                    canonical.display(),
                    fan_fd.as_raw_fd(),
                    e
                );
            }
        } else if let Err(e) = mark_directory(fan_fd, path_mask, &canonical) {
            eprintln!(
                "[WARNING] Cannot inode-mark {} on fd {}: {:#}",
                canonical.display(),
                fan_fd.as_raw_fd(),
                e
            );
        } else if recursive && canonical.is_dir() {
            mark_recursive(fan_fd, path_mask, &canonical);
        }
        self.fs_groups[idx].ref_count += 1;
        eprintln!(
            "[INFO] Monitoring {} on existing fd {}",
            canonical.display(),
            self.fs_groups[idx].fan_fd.as_raw_fd()
        );
        idx
    } else {
        let new_fd = fanotify_init(
            FAN_CLOEXEC
                | FAN_NONBLOCK
                | FAN_CLASS_NOTIF
                | FAN_REPORT_FID
                | FAN_REPORT_DIR_FID
                | FAN_REPORT_NAME,
            (libc::O_CLOEXEC | libc::O_RDONLY) as u32,
        )
        .with_context(|| {
            format!(
                "fanotify_init failed for {} (requires Linux 5.9+ kernel)",
                canonical.display()
            )
        })?;
        // Prefer a filesystem mark; fall back to inode marks on EXDEV.
        let is_fs_mark = match fanotify_mark(
            &new_fd,
            FAN_MARK_ADD | FAN_MARK_FILESYSTEM,
            path_mask,
            AT_FDCWD,
            &canonical,
        ) {
            Ok(()) => {
                eprintln!(
                    "[INFO] Monitoring {} (fs mark) on new fd {}",
                    canonical.display(),
                    new_fd.as_raw_fd()
                );
                true
            }
            Err(FanotifyError::Mark(code)) if code == libc::EXDEV => {
                match mark_directory(&new_fd, path_mask, &canonical) {
                    Ok(()) => {
                        eprintln!(
                            "[INFO] Monitoring {} (inode mark) on new fd {}",
                            canonical.display(),
                            new_fd.as_raw_fd()
                        );
                        if recursive && canonical.is_dir() {
                            mark_recursive(&new_fd, path_mask, &canonical);
                        }
                        false
                    }
                    Err(e) => {
                        eprintln!(
                            "[WARNING] Cannot monitor {} (inode mark): {:#}",
                            canonical.display(),
                            e
                        );
                        bail!("Failed to mark {}: {:#}", canonical.display(), e);
                    }
                }
            }
            Err(e) => {
                eprintln!("[WARNING] Cannot monitor {}: {:#}", canonical.display(), e);
                bail!("Failed to mark {}: {:#}", canonical.display(), e);
            }
        };
        let mount_fd = Self::open_dir(&canonical)?;
        let idx = self.fs_groups.len();
        self.fs_groups.push(FsGroup {
            dev_id,
            is_fs_mark,
            fan_fd: new_fd,
            mount_fd,
            ref_count: 1,
        });
        self.spawn_fd_reader(idx);
        idx
    };

    self.path_to_group.insert(path.clone(), group_idx);
    self.paths.push(path.clone());
    self.canonical_paths.push(canonical.clone());
    // Reported only once setup can no longer fail.
    println!(
        "Added path: {} (recursive={})",
        path.display(),
        recursive,
    );
    self.path_options.insert(path, opts);

    if canonical.is_dir()
        && let Some(ref cache) = self.shared_dir_cache
    {
        if recursive {
            dir_cache::cache_recursive(cache.as_ref(), &canonical);
        } else {
            dir_cache::cache_dir_handle(cache.as_ref(), &canonical);
        }
    }
    Ok(())
}
/// Stops monitoring `path` (which must be the resolved path stored in `paths`).
///
/// Mark removal depends on the kind of group the path belongs to:
/// * inode-mark group: the mark on this path's canonical directory is removed;
/// * filesystem-mark group: the mark is shared by every path on the device, so it
///   is only touched when this was the last path using the group. Removing mask
///   bits earlier would silence the other paths on the same filesystem.
///
/// A group whose reference count drops to zero is removed from `fs_groups` and
/// the indices stored in `path_to_group` are shifted accordingly.
///
/// # Errors
/// Fails when `path` is not currently monitored or has no options recorded.
pub fn remove_path(&mut self, path: &Path) -> Result<()> {
    let pos = self
        .paths
        .iter()
        .position(|p| p == path)
        .ok_or_else(|| anyhow::anyhow!("Path not being monitored: {}", path.display()))?;
    let opts = self
        .path_options
        .get(path)
        .ok_or_else(|| anyhow::anyhow!("No options for path: {}", path.display()))?;
    let path_mask = path_mask_from_options(opts);

    if let Some(gi) = self.path_to_group.remove(path) {
        let canonical = &self.canonical_paths[pos];
        let group = &mut self.fs_groups[gi];
        group.ref_count = group.ref_count.saturating_sub(1);
        let last_user = group.ref_count == 0;

        // Failures are ignored on purpose: the path may already be gone.
        if !group.is_fs_mark {
            // NOTE(review): marks added to subdirectories by `mark_recursive` are not
            // removed here — confirm whether that is intended.
            let _ = fanotify_mark(
                &group.fan_fd,
                FAN_MARK_REMOVE,
                path_mask,
                AT_FDCWD,
                canonical,
            );
        } else if last_user {
            // NOTE(review): the mark may carry bits from other masks (e.g. the combined
            // start-up mask); only this path's bits are removed here.
            let _ = fanotify_mark(
                &group.fan_fd,
                FAN_MARK_REMOVE | FAN_MARK_FILESYSTEM,
                path_mask,
                AT_FDCWD,
                canonical,
            );
        }

        if last_user {
            self.fs_groups.remove(gi);
            // Every group after the removed one moved down by one slot.
            for idx in self.path_to_group.values_mut() {
                if *idx > gi {
                    *idx -= 1;
                }
            }
        }
    }

    self.paths.remove(pos);
    self.canonical_paths.remove(pos);
    self.path_options.remove(path);
    println!("Removed path: {}", path.display());
    Ok(())
}
fn handle_socket_cmd(&mut self, cmd: SocketCmd) -> SocketResp {
match cmd.cmd.as_str() {
"add" => {
let raw = match &cmd.path {
Some(p) => p.clone(),
None => {
return SocketResp::err("Missing 'path' field");
}
};
let path = raw;
let _ = self.remove_path(&path);
let entry = PathEntry {
path,
recursive: cmd.recursive,
types: cmd.types.clone(),
size: cmd.size.clone(),
exclude: cmd.exclude.clone(),
exclude_cmd: cmd.exclude_cmd.clone(),
};
match self.add_path(&entry) {
Ok(()) => {
SocketResp::ok()
}
Err(e) => {
let msg = e.to_string();
if msg.contains("infinite recursion") || msg.contains("log directory") {
SocketResp::permanent_err(msg)
} else {
SocketResp::err(msg)
}
},
}
}
"remove" => {
let path = match &cmd.path {
Some(p) => p.clone(),
None => {
return SocketResp::err("Missing 'path' field");
}
};
match self.remove_path(&path) {
Ok(()) => {
SocketResp::ok()
}
Err(e) => {
let msg = e.to_string();
if msg.contains("infinite recursion") || msg.contains("log directory") {
SocketResp::permanent_err(msg)
} else {
SocketResp::err(msg)
}
},
}
}
"list" => {
let paths: Vec<PathEntry> = self
.paths
.iter()
.map(|p| {
let opts = self.path_options.get(p);
PathEntry {
path: p.clone(),
recursive: opts.map(|o| o.recursive),
types: opts.and_then(|o| {
o.event_types
.as_ref()
.map(|v| v.iter().map(|t| t.to_string()).collect())
}),
size: opts.and_then(|o| o.size_filter.map(|f| format!("{}{}", f.op, format_size(f.bytes)))),
exclude: opts.and_then(|o| {
o.exclude_regex.as_ref().map(|r| vec![r.as_str().to_string()])
}),
exclude_cmd: None,
}
})
.collect();
SocketResp {
ok: true,
error: None,
error_kind: None,
paths: Some(paths),
}
}
_ => SocketResp::err(format!("Unknown command: {}", cmd.cmd)),
}
}
fn reload_config(&mut self) -> Result<()> {
let managed_path = self
.managed_path
.as_ref()
.context("No store path configured")?;
let store = Managed::load(managed_path)?;
for entry in &store.entries {
if !self.path_options.contains_key(&entry.path)
&& let Err(e) = self.add_path(entry)
{
eprintln!("Failed to add path {} on reload: {e}", entry.path.display());
}
}
let current_paths: Vec<PathBuf> = self.paths.clone();
for path in ¤t_paths {
if !store.entries.iter().any(|p| p.path == *path)
&& let Err(e) = self.remove_path(path)
{
eprintln!("Failed to remove path {} on reload: {e}", path.display());
}
}
Ok(())
}
/// Returns the closest path that is an existing directory, starting with `path`
/// itself and walking up through its parents. Returns `None` when no ancestor is
/// a directory.
fn nearest_existing_ancestor(path: &Path) -> Option<PathBuf> {
    path.ancestors()
        .find(|candidate| candidate.is_dir())
        .map(Path::to_path_buf)
}
fn setup_inotify_watches(&mut self) {
use inotify::WatchMask;
self._inotify_watches.clear();
let inotify = match self.inotify.as_ref() {
Some(ino) => ino,
None => return,
};
for (path, _) in &self.pending_paths {
if let Some(parent) = Self::nearest_existing_ancestor(path)
&& let Ok(wd) = inotify.watches().add(
&parent,
WatchMask::CREATE | WatchMask::MOVED_TO,
)
{
self._inotify_watches.push(wd);
}
}
}
/// Promotes pending paths that now exist to monitored paths.
///
/// Every pending entry whose path exists is removed from the queue and passed to
/// `add_path`; entries whose setup fails are dropped after a warning. The inotify
/// watch set is refreshed afterwards.
fn check_pending(&mut self) {
    let mut i = 0;
    while i < self.pending_paths.len() {
        if !self.pending_paths[i].0.exists() {
            i += 1;
            continue;
        }
        // `swap_remove` moves the last element into slot `i`, so the index must not
        // advance here, whether or not `add_path` succeeds; otherwise the moved
        // element would be skipped.
        let (path, entry) = self.pending_paths.swap_remove(i);
        match self.add_path(&entry) {
            Ok(()) => {
                eprintln!(
                    "[INFO] Path '{}' now exists — monitoring started.",
                    path.display()
                );
            }
            Err(e) => {
                eprintln!(
                    "[WARNING] Path '{}' exists but monitoring setup failed: {e}",
                    path.display()
                );
            }
        }
    }
    self.setup_inotify_watches();
}
/// Decides whether `event` passes the filters of the monitored path it belongs to.
///
/// The options are looked up with `get_matching_path_options` and the decision is
/// delegated to `filters::should_output`.
fn should_output(&self, event: &FileEvent) -> bool {
    filters::should_output(self.get_matching_path_options(&event.path), event)
}
/// Returns the monitored path that `path` belongs to, if any.
///
/// Delegates to `filters::matching_path` with this monitor's path tables.
fn matching_path(&self, path: &Path) -> Option<&PathBuf> {
    let (monitored, options, canonical) =
        (&self.paths, &self.path_options, &self.canonical_paths);
    filters::matching_path(monitored, options, canonical, path)
}
fn write_event(&self, event: &FileEvent) -> std::io::Result<()> {
let log_dir = match self.log_dir.as_ref() {
Some(d) => d,
None => return Ok(()),
};
let matched_path = self.matching_path(&event.path)
.cloned()
.unwrap_or_else(|| event.path.clone());
let log_path = log_dir.join(crate::utils::path_to_log_name(&matched_path));
let is_new = !log_path.exists();
let mut file = OpenOptions::new()
.create(true)
.append(true)
.open(&log_path)?;
if is_new {
match chown_to_user(&log_path) {
Ok(true) => {}
Ok(false) => {
}
Err(e) => {
eprintln!("[WARNING] Could not chown log file '{}': {}",
log_path.display(), e);
}
}
}
writeln!(file, "{}", event.to_jsonl_string())?;
Ok(())
}
/// Reports whether `path` falls inside the scope of any monitored path.
///
/// Delegates to `filters::is_path_in_scope` with this monitor's path tables.
fn is_path_in_scope(&self, path: &Path) -> bool {
    let (monitored, options, canonical) =
        (&self.paths, &self.path_options, &self.canonical_paths);
    filters::is_path_in_scope(monitored, options, canonical, path)
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Arc;
use fanotify_fid::consts::{FAN_CREATE, FAN_DELETE, FAN_EVENT_ON_CHILD, FAN_MODIFY, FAN_ONDIR};
use crate::utils::{SizeFilter, SizeOp};
/// A mask with a single event bit decodes to exactly that event type.
#[test]
fn test_mask_to_event_types_single() {
    let decoded = mask_to_event_types(FAN_CREATE);
    assert_eq!(decoded.len(), 1);
    assert_eq!(decoded[0], EventType::Create);
}
/// Several event bits decode to one event type each.
#[test]
fn test_mask_to_event_types_multiple() {
    let decoded = mask_to_event_types(FAN_CREATE | FAN_DELETE | FAN_MODIFY);
    assert_eq!(decoded.len(), 3);
    for expected in [EventType::Create, EventType::Delete, EventType::Modify] {
        assert!(decoded.contains(&expected));
    }
}
/// An empty mask decodes to no event types.
#[test]
fn test_mask_to_event_types_none() {
    assert!(mask_to_event_types(0).is_empty());
}
/// A mask with all fourteen event bits set decodes to fourteen event types.
#[test]
fn test_mask_to_event_types_all() {
    use fanotify_fid::consts::{
        FAN_ACCESS, FAN_ATTRIB, FAN_CLOSE_NOWRITE, FAN_CLOSE_WRITE,
        FAN_DELETE_SELF, FAN_FS_ERROR, FAN_MOVE_SELF, FAN_MOVED_FROM, FAN_MOVED_TO,
        FAN_OPEN, FAN_OPEN_EXEC,
    };
    let every_event = FAN_ACCESS
        | FAN_MODIFY
        | FAN_CLOSE_WRITE
        | FAN_CLOSE_NOWRITE
        | FAN_OPEN
        | FAN_OPEN_EXEC
        | FAN_ATTRIB
        | FAN_CREATE
        | FAN_DELETE
        | FAN_DELETE_SELF
        | FAN_FS_ERROR
        | FAN_MOVED_FROM
        | FAN_MOVED_TO
        | FAN_MOVE_SELF;
    let decoded = mask_to_event_types(every_event);
    assert_eq!(decoded.len(), 14);
}
/// Modifier flags (`FAN_EVENT_ON_CHILD`, `FAN_ONDIR`) do not produce event types.
#[test]
fn test_mask_to_event_types_with_flags() {
    let decoded = mask_to_event_types(FAN_CREATE | FAN_EVENT_ON_CHILD | FAN_ONDIR);
    assert_eq!(decoded.len(), 1);
    assert_eq!(decoded[0], EventType::Create);
}
/// Test helper: builds `PathOptions` with the given filters, no command
/// exclusion, and no inversion. Panics when `exclude` is not a valid regex.
fn options(
    size_filter: Option<SizeFilter>,
    event_types: Option<Vec<EventType>>,
    exclude: Option<&str>,
    recursive: bool,
) -> PathOptions {
    PathOptions {
        size_filter,
        event_types,
        exclude_regex: exclude
            .map(|pattern| regex::Regex::new(pattern).expect("invalid exclude pattern")),
        exclude_invert: false,
        exclude_cmd_regex: None,
        exclude_cmd_invert: false,
        recursive,
    }
}
fn make_monitor(
paths: Vec<&str>,
size_filter: Option<SizeFilter>,
event_types: Option<Vec<EventType>>,
exclude: Option<&str>,
recursive: bool,
) -> Monitor {
Monitor::new(
paths
.into_iter()
.map(|p| {
(
PathBuf::from(p),
options(
size_filter,
event_types.clone(),
exclude,
recursive,
),
)
})
.collect(),
None,
None,
None,
None,
)
.unwrap()
}
/// Without any filter configured, an event under the monitored path is emitted.
#[test]
fn test_should_output_no_filters() {
    let monitor = make_monitor(vec!["/tmp"], None, None, None, false);
    let created = make_event("/tmp/test.txt", EventType::Create, 1000, 1024);
    assert!(monitor.should_output(&created));
}
/// Only event types listed in the type filter are emitted.
#[test]
fn test_should_output_type_filter_match() {
    let allowed = vec![EventType::Create, EventType::Delete];
    let monitor = make_monitor(vec!["/tmp"], None, Some(allowed), None, false);
    let emits = |kind: EventType| monitor.should_output(&make_event("/tmp/a", kind, 1, 0));
    assert!(emits(EventType::Create));
    assert!(emits(EventType::Delete));
    assert!(!emits(EventType::Modify));
}
/// A `>= 1000` size filter emits larger files and suppresses smaller ones.
#[test]
fn test_should_output_size_filter() {
    let at_least_1000 = SizeFilter { op: SizeOp::Ge, bytes: 1000 };
    let monitor = make_monitor(vec!["/tmp"], Some(at_least_1000), None, None, false);
    let big = make_event("/tmp/a", EventType::Create, 1, 2000);
    let small = make_event("/tmp/a", EventType::Create, 1, 500);
    assert!(monitor.should_output(&big));
    assert!(!monitor.should_output(&small));
}
/// Paths matching the exclude pattern are suppressed regardless of event type.
#[test]
fn test_should_output_exclude_pattern() {
    let monitor = make_monitor(vec!["/tmp"], None, None, Some(".*\\.tmp$"), false);
    let created = make_event("/tmp/test.tmp", EventType::Create, 1, 0);
    let deleted = make_event("/tmp/foo.tmp", EventType::Delete, 1, 0);
    assert!(!monitor.should_output(&created));
    assert!(!monitor.should_output(&deleted));
}
/// An anchored, escaped exclude pattern suppresses only the exact suffix match.
#[test]
fn test_should_output_exclude_exact_pattern() {
    let monitor = make_monitor(vec!["/tmp"], None, None, Some("test\\.tmp$"), false);
    let emits = |path: &str, kind: EventType| {
        monitor.should_output(&make_event(path, kind, 1, 0))
    };
    assert!(emits("/tmp/test.txt", EventType::Create));
    assert!(!emits("/tmp/test.tmp", EventType::Create));
    assert!(emits("/tmp/foo.tmp", EventType::Delete));
    assert!(emits("/tmp/testXtmp", EventType::Create));
}
/// Size, type and exclude filters must all pass for an event to be emitted.
#[test]
fn test_should_output_combined_filters() {
    let at_least_100 = SizeFilter { op: SizeOp::Ge, bytes: 100 };
    let monitor = make_monitor(
        vec!["/tmp"],
        Some(at_least_100),
        Some(vec![EventType::Create]),
        Some(".*\\.log$"),
        false,
    );
    let emits = |path: &str, kind: EventType, size: u64| {
        monitor.should_output(&make_event(path, kind, 1, size))
    };
    assert!(emits("/tmp/data", EventType::Create, 200));
    assert!(!emits("/tmp/data", EventType::Delete, 200));
    assert!(!emits("/tmp/data", EventType::Create, 50));
    assert!(!emits("/tmp/app.log", EventType::Create, 200));
}
/// A recursive watch on `/tmp` covers the directory itself and anything nested
/// below it, but not unrelated paths or siblings sharing the string prefix.
#[test]
fn test_is_path_in_scope_recursive() {
    let monitor = make_monitor(vec!["/tmp"], None, None, None, true);
    let cases = [
        ("/tmp", true),
        ("/tmp/sub", true),
        ("/tmp/sub/deep/file.txt", true),
        ("/var/log", false),
        // Shares the "/tmp" string prefix but is not inside the directory.
        ("/tmpfile", false),
    ];
    for (candidate, expected) in cases {
        assert_eq!(
            monitor.is_path_in_scope(Path::new(candidate)),
            expected,
            "path: {}",
            candidate
        );
    }
}
/// A non-recursive watch on `/tmp` covers the directory and its direct
/// children only.
#[test]
fn test_is_path_in_scope_non_recursive() {
    let monitor = make_monitor(vec!["/tmp"], None, None, None, false);
    let cases = [
        ("/tmp", true),
        ("/tmp/file.txt", true),
        // One level too deep for a non-recursive watch.
        ("/tmp/sub/file.txt", false),
        ("/var/log", false),
    ];
    for (candidate, expected) in cases {
        assert_eq!(
            monitor.is_path_in_scope(Path::new(candidate)),
            expected,
            "path: {}",
            candidate
        );
    }
}
/// With two watched roots, a path under either root is in scope and a path
/// under neither is not.
#[test]
fn test_is_path_in_scope_multiple_paths() {
    let roots = vec!["/tmp", "/var/log"];
    let monitor = make_monitor(roots, None, None, None, true);

    let under_first = Path::new("/tmp/file");
    let under_second = Path::new("/var/log/syslog");
    let outside = Path::new("/etc/passwd");

    assert!(monitor.is_path_in_scope(under_first));
    assert!(monitor.is_path_in_scope(under_second));
    assert!(!monitor.is_path_in_scope(outside));
}
/// An exclude pattern with two `|`-separated alternatives suppresses paths
/// matching either one.
#[test]
fn test_should_output_exclude_pipe_multiple() {
    let monitor = make_monitor_exclude(Some(".*\\.tmp$|.*\\.log$"), None, false, false);
    let cases = [
        ("/tmp/a.tmp", false),
        ("/tmp/a.log", false),
        ("/tmp/a.txt", true),
    ];
    for (path, expected) in cases {
        let event = make_event(path, EventType::Create, 1, 0);
        assert_eq!(monitor.should_output(&event), expected, "path: {}", path);
    }
}
/// A leading `!` inverts the exclude pattern: only paths matching it are
/// emitted.
#[test]
fn test_should_output_exclude_invert() {
    let monitor = make_monitor_exclude(Some("!.*\\.py$"), None, false, false);

    let python = make_event("/tmp/main.py", EventType::Create, 1, 0);
    let rust = make_event("/tmp/main.rs", EventType::Create, 1, 0);
    let text = make_event("/tmp/a.txt", EventType::Create, 1, 0);

    assert!(monitor.should_output(&python));
    assert!(!monitor.should_output(&rust));
    assert!(!monitor.should_output(&text));
}
/// Events from a command matching the exclude-cmd pattern are suppressed;
/// events from other commands are emitted.
#[test]
fn test_should_output_exclude_cmd_basic() {
    let monitor = make_monitor_exclude(None, Some("rsync"), false, false);

    let from_rsync = make_event_cmd("/tmp/a", EventType::Create, 1, 0, "rsync");
    let from_nginx = make_event_cmd("/tmp/a", EventType::Create, 2, 0, "nginx");

    assert!(!monitor.should_output(&from_rsync));
    assert!(monitor.should_output(&from_nginx));
}
/// An exclude-cmd pattern with `|` alternatives suppresses every listed
/// command.
#[test]
fn test_should_output_exclude_cmd_pipe() {
    let monitor = make_monitor_exclude(None, Some("rsync|apt"), false, false);
    let cases = [(1, "rsync", false), (2, "apt", false), (3, "nginx", true)];
    for (pid, cmd, expected) in cases {
        let event = make_event_cmd("/tmp/a", EventType::Create, pid, 0, cmd);
        assert_eq!(monitor.should_output(&event), expected, "cmd: {}", cmd);
    }
}
/// A leading `!` on the exclude-cmd pattern inverts it: only events from a
/// matching command are emitted.
#[test]
fn test_should_output_exclude_cmd_invert() {
    let monitor = make_monitor_exclude(None, Some("!nginx"), false, false);

    let from_nginx = make_event_cmd("/tmp/a", EventType::Create, 1, 0, "nginx");
    let from_rsync = make_event_cmd("/tmp/a", EventType::Create, 2, 0, "rsync");
    let from_apt = make_event_cmd("/tmp/a", EventType::Create, 3, 0, "apt");

    assert!(monitor.should_output(&from_nginx));
    assert!(!monitor.should_output(&from_rsync));
    assert!(!monitor.should_output(&from_apt));
}
/// An inverted exclude-cmd pattern with `|` alternatives emits events from any
/// of the listed commands and suppresses the rest.
#[test]
fn test_should_output_exclude_cmd_invert_multi() {
    let monitor = make_monitor_exclude(None, Some("!nginx|python"), false, false);
    let cases = [(1, "nginx", true), (2, "python", true), (3, "rsync", false)];
    for (pid, cmd, expected) in cases {
        let event = make_event_cmd("/tmp/a", EventType::Create, pid, 0, cmd);
        assert_eq!(monitor.should_output(&event), expected, "cmd: {}", cmd);
    }
}
/// When both a path exclude and a command exclude are set, matching either one
/// is enough to suppress an event.
#[test]
fn test_should_output_exclude_and_exclude_cmd() {
    let monitor = make_monitor_exclude(Some(".*\\.tmp$"), Some("rsync"), false, false);

    // Path matches the exclude pattern; the command does not.
    let excluded_path = make_event_cmd("/tmp/a.tmp", EventType::Create, 1, 0, "vim");
    assert!(!monitor.should_output(&excluded_path));

    // Command matches the exclude-cmd pattern; the path does not.
    let excluded_cmd = make_event_cmd("/tmp/a.txt", EventType::Create, 1, 0, "rsync");
    assert!(!monitor.should_output(&excluded_cmd));

    // Neither filter matches.
    let allowed = make_event_cmd("/tmp/a.txt", EventType::Create, 2, 0, "vim");
    assert!(monitor.should_output(&allowed));
}
/// Exercises eviction on a three-entry `LruCache` keyed by path: inserting into
/// a full cache drops the least recently used key.
#[test]
fn test_file_size_cache_eviction() {
    use lru::LruCache;
    use std::num::NonZeroUsize;

    let key = |name: &str| PathBuf::from(name);
    let capacity = NonZeroUsize::new(3).unwrap();
    let mut sizes = LruCache::new(capacity);
    for (name, size) in [("/a", 100), ("/b", 200), ("/c", 300)] {
        sizes.put(key(name), size);
    }
    assert_eq!(sizes.len(), 3);

    // A fourth insert keeps the length at capacity and pushes out "/a".
    sizes.put(key("/d"), 400);
    assert_eq!(sizes.len(), 3);
    assert!(sizes.get(&key("/a")).is_none());
    assert_eq!(sizes.get(&key("/b")), Some(&200));
    assert_eq!(sizes.get(&key("/d")), Some(&400));

    // "/c" has not been read since it was inserted, so after another read of
    // "/b" it is still the entry dropped by the next insert.
    let _ = sizes.get(&key("/b"));
    sizes.put(key("/e"), 500);
    assert!(sizes.get(&key("/c")).is_none());
    assert_eq!(sizes.get(&key("/b")), Some(&200));
}
/// `Monitor::new` rejects a buffer size of 1024 and one of 2 MiB with
/// distinct error messages, and accepts 65536.
#[test]
fn test_monitor_buffer_size_validation() {
    let opts = options(None, None, None, false);
    // Only the buffer size varies between the three constructions below.
    let build = |buffer_size| {
        Monitor::new(
            vec![(PathBuf::from("/tmp"), opts.clone())],
            None,
            None,
            Some(buffer_size),
            None,
        )
    };

    let too_small = build(1024);
    assert!(too_small.is_err());
    let message = too_small.err().unwrap().to_string();
    assert!(message.contains("at least 4096"));

    let too_large = build(2 * 1024 * 1024);
    assert!(too_large.is_err());
    let message = too_large.err().unwrap().to_string();
    assert!(message.contains("not exceed"));

    let accepted = build(65536);
    assert!(accepted.is_ok());
}
/// `add_path` for `/tmp/test_add` succeeds and leaves the entry in
/// `pending_paths` rather than `path_options`; `remove_path` for a path that
/// was never added returns an error.
#[test]
fn test_add_path_and_remove_path() {
    let mut monitor = Monitor::new(vec![], None, None, None, None).unwrap();
    let target = Path::new("/tmp/test_add");
    let entry = PathEntry {
        path: target.to_path_buf(),
        recursive: Some(true),
        types: None,
        size: None,
        exclude: None,
        exclude_cmd: None,
    };

    assert!(monitor.add_path(&entry).is_ok());
    // NOTE(review): presumably queued because the path is not yet marked;
    // confirm against `add_path`.
    let queued = monitor.pending_paths.iter().any(|(path, _)| path == target);
    assert!(queued);
    assert!(!monitor.path_options.contains_key(target));

    let removal = monitor.remove_path(Path::new("/nonexistent"));
    assert!(removal.is_err());
}
/// Builds a non-recursive [`Monitor`] on `/tmp` whose only filters are the given
/// path-exclude and command-exclude patterns.
///
/// A pattern starting with `!` is compiled without that prefix and marked as
/// inverted. The two trailing boolean parameters are unused; the invert flags
/// are derived from the patterns themselves. Panics on an invalid regex or if
/// `Monitor::new` fails.
fn make_monitor_exclude(
    exclude: Option<&str>,
    exclude_cmd: Option<&str>,
    _exclude_invert: bool,
    _exclude_cmd_invert: bool,
) -> Monitor {
    /// Splits an optional pattern into its compiled regex and invert flag.
    fn compile(pattern: Option<&str>, panic_msg: &str) -> (Option<regex::Regex>, bool) {
        match pattern {
            None => (None, false),
            Some(text) => {
                let inverted = text.starts_with('!');
                let body = text.strip_prefix('!').unwrap_or(text);
                (Some(regex::Regex::new(body).expect(panic_msg)), inverted)
            }
        }
    }

    let (exclude_regex, exclude_invert) = compile(exclude, "invalid exclude pattern");
    let (exclude_cmd_regex, exclude_cmd_invert) =
        compile(exclude_cmd, "invalid exclude-cmd pattern");

    let opts = PathOptions {
        size_filter: None,
        event_types: None,
        exclude_regex,
        exclude_invert,
        exclude_cmd_regex,
        exclude_cmd_invert,
        recursive: false,
    };
    Monitor::new(vec![(PathBuf::from("/tmp"), opts)], None, None, None, None).unwrap()
}
fn make_event(path: &str, event_type: EventType, pid: u32, size: u64) -> FileEvent {
FileEvent {
time: Utc::now(),
event_type,
path: PathBuf::from(path),
pid,
cmd: "test".to_string(),
user: "root".to_string(),
file_size: size,
}
}
/// Builds a [`FileEvent`] stamped with the current time for the given path,
/// event type, pid, file size, and command name; the user is always `"root"`.
fn make_event_cmd(path: &str, event_type: EventType, pid: u32, size: u64, cmd: &str) -> FileEvent {
    let time = Utc::now();
    let path = PathBuf::from(path);
    let cmd = cmd.to_owned();
    let user = String::from("root");
    FileEvent {
        time,
        event_type,
        path,
        pid,
        cmd,
        user,
        file_size: size,
    }
}
/// Checks that `fanotify_init` succeeds with the notification-class,
/// FID-reporting flag set. Ignored by default; the assertion message indicates
/// it expects to run as root.
#[test]
#[ignore]
fn test_fanotify_init() {
    let init_flags = FAN_CLOEXEC
        | FAN_NONBLOCK
        | FAN_CLASS_NOTIF
        | FAN_REPORT_FID
        | FAN_REPORT_DIR_FID
        | FAN_REPORT_NAME;
    let event_fd_flags = (libc::O_CLOEXEC | libc::O_RDONLY) as u32;

    let outcome = fanotify_init(init_flags, event_fd_flags);
    assert!(outcome.is_ok(), "fanotify_init should succeed with root");
}
/// Checks that `fanotify_mark` with `FAN_MARK_ADD | FAN_MARK_FILESYSTEM`
/// succeeds for an existing scratch directory under the system temp dir.
///
/// Ignored by default. The scratch directory is removed before the assertion
/// runs, so a failing run does not leave it behind.
#[test]
#[ignore]
fn test_fanotify_mark_directory() {
    let test_dir = std::env::temp_dir().join("fsmon_test_mark");
    std::fs::create_dir_all(&test_dir).unwrap();
    let fd = fanotify_init(
        FAN_CLOEXEC
            | FAN_NONBLOCK
            | FAN_CLASS_NOTIF
            | FAN_REPORT_FID
            | FAN_REPORT_DIR_FID
            | FAN_REPORT_NAME,
        (libc::O_CLOEXEC | libc::O_RDONLY) as u32,
    )
    .unwrap();
    let mask = FAN_CREATE | FAN_DELETE | FAN_CLOSE_WRITE;
    // Reduce the outcome to a bool right away so nothing derived from the
    // descriptor is held while it is closed and the directory is removed.
    let marked = fanotify_mark(
        &fd,
        FAN_MARK_ADD | FAN_MARK_FILESYSTEM,
        mask,
        AT_FDCWD,
        &test_dir,
    )
    .is_ok();
    // Release resources before asserting: a panic below must not leak the
    // scratch directory.
    drop(fd);
    let _ = std::fs::remove_dir_all(&test_dir);
    assert!(
        marked,
        "fanotify_mark should succeed on existing directory"
    );
}
/// Checks that `fanotify_mark` reports an error when asked to mark a path that
/// does not exist. Ignored by default.
#[test]
#[ignore]
fn test_fanotify_mark_nonexistent_path() {
    let init_flags = FAN_CLOEXEC
        | FAN_NONBLOCK
        | FAN_CLASS_NOTIF
        | FAN_REPORT_FID
        | FAN_REPORT_DIR_FID
        | FAN_REPORT_NAME;
    let event_fd_flags = (libc::O_CLOEXEC | libc::O_RDONLY) as u32;
    let fd = fanotify_init(init_flags, event_fd_flags).unwrap();

    let missing = Path::new("/nonexistent_path_12345");
    let outcome = fanotify_mark(&fd, FAN_MARK_ADD, FAN_CREATE, AT_FDCWD, missing);
    assert!(
        outcome.is_err(),
        "fanotify_mark should fail on nonexistent path"
    );
    drop(fd);
}
/// End-to-end check that file creation is observed through fanotify.
///
/// A task on a Tokio runtime initialises fanotify, adds a
/// `FAN_MARK_FILESYSTEM` mark via a scratch directory, and polls for events for
/// 200 ms while the test thread creates three files there. The test passes if
/// at least one event was counted.
///
/// Ignored by default. The scratch directory is removed before the task result
/// and the event count are checked, so a failure there does not leave it
/// behind.
#[test]
#[ignore]
fn test_monitor_run_captures_events() {
    use std::io::Write;
    use std::sync::atomic::{AtomicUsize, Ordering};
    let test_dir = std::env::temp_dir().join("fsmon_test_events");
    std::fs::create_dir_all(&test_dir).unwrap();
    let rt = tokio::runtime::Runtime::new().unwrap();
    let counter = Arc::new(AtomicUsize::new(0));
    let counter_clone = counter.clone();
    let test_dir_clone = test_dir.clone();
    let handle = rt.spawn(async move {
        let fd = fanotify_init(
            FAN_CLOEXEC
                | FAN_NONBLOCK
                | FAN_CLASS_NOTIF
                | FAN_REPORT_FID
                | FAN_REPORT_DIR_FID
                | FAN_REPORT_NAME,
            (libc::O_CLOEXEC | libc::O_RDONLY) as u32,
        )
        .unwrap();
        let mask = FAN_CREATE | FAN_CLOSE_WRITE | FAN_EVENT_ON_CHILD | FAN_ONDIR;
        fanotify_mark(
            &fd,
            FAN_MARK_ADD | FAN_MARK_FILESYSTEM,
            mask,
            AT_FDCWD,
            &test_dir_clone,
        )
        .unwrap();
        let mut buf = vec![0u8; 4096];
        let start = std::time::Instant::now();
        // Poll every 10 ms for 200 ms; read errors are skipped and the loop
        // simply tries again on the next tick.
        while start.elapsed() < std::time::Duration::from_millis(200) {
            if let Ok(events) = fanotify_fid::read::read_fid_events(
                &fd, &[], &mut buf, None,
            ) {
                if !events.is_empty() {
                    counter_clone.fetch_add(events.len(), Ordering::SeqCst);
                }
            }
            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        }
        drop(fd);
    });
    // Wait 50 ms before generating events, giving the task a head start on
    // installing its mark.
    std::thread::sleep(std::time::Duration::from_millis(50));
    for i in 0..3 {
        let path = test_dir.join(format!("test_{}.txt", i));
        let mut f = std::fs::File::create(&path).unwrap();
        writeln!(f, "content {}", i).unwrap();
    }
    let join_result = rt.block_on(handle);
    // Clean up before inspecting results: neither a panicked task nor a failed
    // assertion should leak the scratch directory.
    let _ = std::fs::remove_dir_all(&test_dir);
    join_result.unwrap();
    let events_captured = counter.load(Ordering::SeqCst);
    assert!(
        events_captured > 0,
        "Should capture at least some events, got {}",
        events_captured
    );
}
/// With no pattern list, `build_exclude_regex` yields no regex and a cleared
/// invert flag.
#[test]
fn test_build_exclude_regex_none() {
    let built = filters::build_exclude_regex(None, "exclude").unwrap();
    assert!(matches!(built, (None, false)));
}
/// An empty pattern list gives the same result as no list at all: no regex,
/// invert flag cleared.
#[test]
fn test_build_exclude_regex_empty() {
    let built = filters::build_exclude_regex(Some(&[]), "exclude").unwrap();
    assert!(matches!(built, (None, false)));
}
/// A single pattern compiles to a non-inverted regex that matches `foo.tmp`
/// but not `foo.txt`.
#[test]
fn test_build_exclude_regex_single_pattern() {
    let patterns = vec![String::from(".*\\.tmp$")];
    let (compiled, inverted) =
        filters::build_exclude_regex(Some(&patterns), "exclude").unwrap();

    let regex = compiled.expect("a single pattern should produce a regex");
    assert!(!inverted);
    assert!(regex.is_match("foo.tmp"));
    assert!(!regex.is_match("foo.txt"));
}
/// Two patterns are combined into one non-inverted regex that matches input
/// satisfying either of them.
#[test]
fn test_build_exclude_regex_multiple_patterns() {
    let patterns = vec![String::from(".*\\.tmp$"), String::from(".*\\.log$")];
    let (compiled, inverted) =
        filters::build_exclude_regex(Some(&patterns), "exclude").unwrap();

    let regex = compiled.expect("two patterns should produce a regex");
    assert!(!inverted);
    for (input, expected) in [("foo.tmp", true), ("bar.log", true), ("foo.txt", false)] {
        assert_eq!(regex.is_match(input), expected, "input: {}", input);
    }
}
/// A pattern with a leading `!` sets the invert flag, and the returned regex
/// matches as the pattern would without the prefix.
#[test]
fn test_build_exclude_regex_invert() {
    let patterns = vec![String::from("!.*\\.py$")];
    let (compiled, inverted) =
        filters::build_exclude_regex(Some(&patterns), "exclude").unwrap();

    let regex = compiled.expect("an inverted pattern should still produce a regex");
    assert!(inverted);
    assert!(regex.is_match("foo.py"));
    assert!(!regex.is_match("foo.tmp"));
}
/// Command patterns built under the `--exclude-cmd` label produce a
/// non-inverted regex matching each listed command name.
#[test]
fn test_build_exclude_regex_cmd() {
    let patterns = vec![String::from("rsync"), String::from("apt")];
    let (compiled, inverted) =
        filters::build_exclude_regex(Some(&patterns), "--exclude-cmd").unwrap();

    let regex = compiled.expect("command patterns should produce a regex");
    assert!(!inverted);
    for (cmd, expected) in [("rsync", true), ("apt", true), ("nginx", false)] {
        assert_eq!(regex.is_match(cmd), expected, "cmd: {}", cmd);
    }
}
/// A command pattern with a `.*` suffix matches the bare command name as well
/// as longer names that start with it.
#[test]
fn test_build_exclude_regex_cmd_wildcard() {
    let patterns = vec![String::from("nginx.*")];
    // The invert flag is not part of this check.
    let (compiled, _inverted) =
        filters::build_exclude_regex(Some(&patterns), "--exclude-cmd").unwrap();

    let regex = compiled.expect("a wildcard pattern should produce a regex");
    assert!(regex.is_match("nginx"));
    assert!(regex.is_match("nginx-worker"));
    assert!(!regex.is_match("apache"));
}
}