use crate::{Error, Result, ScanConfig, ScanEntry, Scanner};
use globset::GlobSet;
use notify::{EventKind, RecommendedWatcher, RecursiveMode, Watcher};
use std::path::{Path, PathBuf};
use std::sync::mpsc::{Receiver, Sender};
use std::time::SystemTime;
/// An event delivered through a [`ScanStream`].
///
/// `Initial` / `InitialComplete` are sent by the background initial walk;
/// `Created` / `Modified` / `Deleted` are sent from the filesystem watcher
/// callback. Both sources feed the same channel, so live events may arrive
/// before `InitialComplete`.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum ScanEvent {
/// An entry yielded by the initial walk of the scan root.
Initial(ScanEntry),
/// The initial walk ran to the end; no further `Initial` events follow.
InitialComplete,
/// The watcher reported a creation and the path passed the file/size checks.
Created(ScanEntry),
/// The watcher reported a modification and the path passed the file/size checks.
Modified(ScanEntry),
/// The watcher reported that this path is gone. Carries only the path,
/// because no metadata can be read for it any more.
Deleted(PathBuf),
}
/// Receiving side of a scan: hands out [`ScanEvent`]s from an internal channel.
///
/// Events can be pulled without blocking via `try_next`, or by blocking through
/// the `Iterator` implementation. The stream owns the filesystem watcher, so
/// the watcher is dropped when the stream is dropped.
pub struct ScanStream {
// Channel end on which walk and watcher events arrive.
rx: Receiver<ScanEvent>,
// Not read anywhere in this file; held so the watcher lives exactly as long
// as the stream does.
_watcher: RecommendedWatcher,
}
impl ScanStream {
    /// Polls for the next event without blocking.
    ///
    /// Returns `Ok(Some(event))` when an event is already queued and `Ok(None)`
    /// when the queue is empty right now.
    ///
    /// # Errors
    ///
    /// Returns [`std::sync::mpsc::TryRecvError::Disconnected`] when the channel
    /// reports that its sending side has hung up.
    pub fn try_next(
        &self,
    ) -> std::result::Result<Option<ScanEvent>, std::sync::mpsc::TryRecvError> {
        use std::sync::mpsc::TryRecvError;

        // "Empty" is not an error for a poll: fold it into `Ok(None)` and let
        // only a real disconnect surface as `Err`.
        self.rx.try_recv().map(Some).or_else(|err| match err {
            TryRecvError::Empty => Ok(None),
            TryRecvError::Disconnected => Err(err),
        })
    }
}
impl Iterator for ScanStream {
    type Item = ScanEvent;

    /// Blocks until the next event is available.
    ///
    /// Yields `None` only when the channel has disconnected, i.e. a blocking
    /// receive fails.
    fn next(&mut self) -> Option<Self::Item> {
        // `Receiver::iter` is the blocking iterator over the channel; taking a
        // single item from it is one blocking receive with the error mapped to
        // `None`.
        self.rx.iter().next()
    }
}
impl Scanner {
    /// Starts a live scan of `root`: an initial walk plus filesystem change events.
    ///
    /// The returned [`ScanStream`] receives one [`ScanEvent::Initial`] per entry
    /// found by a background walk, then [`ScanEvent::InitialComplete`], together
    /// with `Created` / `Modified` / `Deleted` events translated from `notify`.
    ///
    /// The watcher is registered *before* the walk is started, so a file that
    /// appears while the walk is in progress is reported by at least one of the
    /// two sources. As a consequence the same path may be reported twice (once
    /// as `Initial`, once as a live event), and live events can arrive before
    /// `InitialComplete`.
    ///
    /// Errors delivered to the watcher callback are dropped.
    ///
    /// # Errors
    ///
    /// Returns [`Error::Watch`] if the watcher cannot be created, if `root`
    /// cannot be watched, or if the initial-walk thread cannot be spawned.
    #[cfg(feature = "watch")]
    pub fn scan<P: AsRef<Path>>(&self, root: P) -> Result<ScanStream> {
        let root = root.as_ref().to_path_buf();
        let (tx, rx) = std::sync::mpsc::channel();
        // The walk thread needs its own sender; the original moves into the
        // watcher callback below.
        let walk_tx = tx.clone();

        let excludes = self.excludes_for_watch().clone();
        let max_size = self.config().max_file_size_bytes;
        let mut watcher = notify::recommended_watcher(move |res: notify::Result<notify::Event>| {
            if let Ok(event) = res {
                handle_notify_event(event, &excludes, max_size, &tx);
            }
        })
        .map_err(|e| Error::Watch(format!("notify init failed: {e}")))?;
        watcher
            .watch(&root, RecursiveMode::Recursive)
            .map_err(|e| Error::Watch(format!("could not watch {}: {e}", root.display())))?;

        // Only start walking once the watch is in place: otherwise a file
        // created after the walker passed its directory but before the watch
        // was registered would be seen by neither side. This also avoids
        // kicking off a walk that is thrown away when watcher setup fails.
        let walk_config = self.config().clone();
        std::thread::Builder::new()
            .name("scankit-initial-walk".into())
            .spawn(move || initial_walk(walk_config, root, walk_tx))
            .map_err(|e| Error::Watch(format!("could not spawn initial-walk thread: {e}")))?;

        Ok(ScanStream {
            rx,
            _watcher: watcher,
        })
    }
}
/// Thread body for the initial walk: sends every entry under `root` as
/// [`ScanEvent::Initial`], then a final [`ScanEvent::InitialComplete`].
///
/// Stops silently, without sending the completion marker, if a scanner cannot
/// be built from `config` or if the receiving side has gone away. Items the
/// walk yields as failures are skipped.
// Arguments are taken by value because this runs inside a `move` closure on a
// spawned thread and must own everything it touches.
#[allow(clippy::needless_pass_by_value)]
fn initial_walk(config: ScanConfig, root: PathBuf, tx: Sender<ScanEvent>) {
    let Ok(scanner) = Scanner::new(config) else {
        return;
    };

    // `any` short-circuits on the first failed send, i.e. as soon as the
    // receiver is gone.
    let receiver_gone = scanner
        .walk(&root)
        .flatten()
        .any(|entry| tx.send(ScanEvent::Initial(entry)).is_err());
    if receiver_gone {
        return;
    }

    let _ = tx.send(ScanEvent::InitialComplete);
}
/// Translates one `notify` event into zero or more [`ScanEvent`]s sent on `tx`.
///
/// Each path carried by the event is handled independently:
///
/// * paths matching `excludes` are skipped;
/// * `Create` / `Modify` become `Created` / `Modified` when the path currently
///   resolves to a regular file no larger than `max_size` (see
///   `entry_from_path`), and are dropped otherwise;
/// * `Remove` becomes `Deleted`;
/// * the "renamed away from this path" notification
///   (`Modify(Name(RenameMode::From))`) also becomes `Deleted`. The old path no
///   longer exists, so treating it as an ordinary modification would silently
///   drop it and consumers would never learn that the path is gone. Other
///   rename notifications still go through the generic `Modify` handling;
/// * every other event kind is ignored.
///
/// Send failures are ignored: they only occur when the receiver was dropped.
fn handle_notify_event(
    event: notify::Event,
    excludes: &GlobSet,
    max_size: Option<u64>,
    tx: &Sender<ScanEvent>,
) {
    let kind = event.kind;
    for path in event.paths {
        if excludes.is_match(&path) {
            continue;
        }

        let scan_event = match kind {
            // Must come before the generic `Modify` arm below.
            EventKind::Remove(_)
            | EventKind::Modify(notify::event::ModifyKind::Name(
                notify::event::RenameMode::From,
            )) => Some(ScanEvent::Deleted(path)),
            EventKind::Create(_) => entry_from_path(&path, max_size).map(ScanEvent::Created),
            EventKind::Modify(_) => entry_from_path(&path, max_size).map(ScanEvent::Modified),
            _ => None,
        };

        if let Some(scan_event) = scan_event {
            // Best effort: an error here means nobody is listening any more.
            let _ = tx.send(scan_event);
        }
    }
}
/// Builds a [`ScanEntry`] for `path` from its current on-disk metadata.
///
/// Returns `None` when the metadata cannot be read, when the path is not a
/// regular file, or when `max_size` is set and the file is larger than it.
///
/// The entry's `extension` is the ASCII-lowercased file extension, or an empty
/// string when there is none or it is not valid UTF-8. `modified` is `None`
/// when the modification time is unavailable.
fn entry_from_path(path: &Path, max_size: Option<u64>) -> Option<ScanEntry> {
    let meta = std::fs::metadata(path)
        .ok()
        .filter(std::fs::Metadata::is_file)?;

    let size_bytes = meta.len();
    if matches!(max_size, Some(cap) if size_bytes > cap) {
        return None;
    }

    let extension = match path.extension().and_then(std::ffi::OsStr::to_str) {
        Some(ext) => ext.to_ascii_lowercase(),
        None => String::new(),
    };
    let modified: Option<SystemTime> = meta.modified().ok();

    Some(ScanEntry {
        path: path.to_path_buf(),
        size_bytes,
        modified,
        extension,
    })
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use std::time::{Duration, Instant};

    /// Polls `stream` until `InitialComplete` arrives, the channel disconnects,
    /// or `max_wait` elapses. Returns every event seen, marker included.
    fn drain_initial(stream: &mut ScanStream, max_wait: Duration) -> Vec<ScanEvent> {
        let deadline = Instant::now() + max_wait;
        let mut events = Vec::new();
        while Instant::now() < deadline {
            match stream.try_next() {
                Ok(Some(ev @ ScanEvent::InitialComplete)) => {
                    events.push(ev);
                    return events;
                }
                Ok(Some(ev)) => events.push(ev),
                Ok(None) => std::thread::sleep(Duration::from_millis(20)),
                Err(_) => return events,
            }
        }
        events
    }

    /// Files that exist before the scan starts are reported as `Initial`, and
    /// the drained batch ends with `InitialComplete`.
    #[test]
    fn initial_walk_emits_existing_files_then_marker() {
        let dir = tempfile::tempdir().unwrap();
        fs::write(dir.path().join("a.txt"), "hi").unwrap();
        fs::write(dir.path().join("b.txt"), "yo").unwrap();
        let scanner = Scanner::new(ScanConfig::default()).unwrap();
        let mut stream = scanner.scan(dir.path()).expect("watch start");
        let events = drain_initial(&mut stream, Duration::from_secs(2));
        let initials: Vec<_> = events
            .iter()
            .filter_map(|e| match e {
                ScanEvent::Initial(entry) => Some(entry.path.clone()),
                _ => None,
            })
            .collect();
        assert_eq!(initials.len(), 2, "got {events:?}");
        assert!(matches!(events.last(), Some(ScanEvent::InitialComplete)));
    }

    /// A file written after the initial walk shows up as a `Created` event.
    #[test]
    fn live_creates_emit_after_initial_complete() {
        let dir = tempfile::tempdir().unwrap();
        let scanner = Scanner::new(ScanConfig::default()).unwrap();
        let mut stream = scanner.scan(dir.path()).expect("watch start");
        let _initial = drain_initial(&mut stream, Duration::from_secs(2));
        fs::write(dir.path().join("new.txt"), "fresh").unwrap();
        let deadline = Instant::now() + Duration::from_secs(5);
        let mut saw_created = false;
        while Instant::now() < deadline {
            match stream.try_next() {
                Ok(Some(ScanEvent::Created(entry))) => {
                    assert!(entry.path.ends_with("new.txt"));
                    saw_created = true;
                    break;
                }
                // Any other event is irrelevant here; keep polling.
                Ok(Some(_other)) => {}
                Ok(None) => std::thread::sleep(Duration::from_millis(20)),
                Err(_) => break,
            }
        }
        assert!(
            saw_created,
            "did not see Created event for new.txt within 5s"
        );
    }

    /// Smoke test: writing into the directory after the stream was dropped
    /// must not panic. There is nothing to receive on, so nothing is asserted.
    #[test]
    fn dropping_stream_stops_the_watcher() {
        let dir = tempfile::tempdir().unwrap();
        let scanner = Scanner::new(ScanConfig::default()).unwrap();
        let stream = scanner.scan(dir.path()).expect("watch start");
        drop(stream);
        fs::write(dir.path().join("post.txt"), "ignored").unwrap();
    }

    /// Entries under an excluded directory are absent from the initial walk.
    /// Only the initial phase is asserted here.
    #[test]
    fn excludes_apply_to_initial_and_live_events() {
        let dir = tempfile::tempdir().unwrap();
        fs::create_dir_all(dir.path().join(".git")).unwrap();
        fs::write(dir.path().join(".git/HEAD"), "ref").unwrap();
        fs::write(dir.path().join("normal.txt"), "ok").unwrap();
        let scanner =
            Scanner::new(ScanConfig::default().add_exclude("**/.git/**").unwrap()).unwrap();
        let mut stream = scanner.scan(dir.path()).expect("watch start");
        let events = drain_initial(&mut stream, Duration::from_secs(2));
        let paths: Vec<_> = events
            .iter()
            .filter_map(|e| match e {
                ScanEvent::Initial(entry) => Some(entry.path.clone()),
                _ => None,
            })
            .collect();
        // Compare path components rather than searching a string for "/.git/":
        // the string check can never fail on platforms whose separator is not
        // `/`, which would make this assertion vacuous there.
        assert!(
            !paths
                .iter()
                .any(|p| p.components().any(|c| c.as_os_str() == ".git")),
            "got {paths:?}"
        );
        assert!(paths.iter().any(|p| p.ends_with("normal.txt")));
    }
}