use std::io::Write;
use std::path::Path;
use std::sync::mpsc;
use std::time::Duration;
use notify::{PollWatcher, RecursiveMode, Watcher};
use sled::Db;
use crate::colours;
use crate::config::Config;
use crate::error::AppError;
use crate::ingest;
use crate::search;
pub fn run(config: &Config, db: &Db, index: &tantivy::Index) -> Result<(), AppError> {
let screenshots_dir = &config.paths.screenshots;
if !screenshots_dir.exists() {
return Err(AppError::Io(std::io::Error::new(
std::io::ErrorKind::NotFound,
format!(
"Screenshots directory does not exist: {}",
screenshots_dir.display()
),
)));
}
let canonical =
std::fs::canonicalize(screenshots_dir).unwrap_or_else(|_| screenshots_dir.clone());
colours::info(&format!(" Resolved watch path: {}", canonical.display()));
let (tx, rx) = mpsc::channel();
let poll_config = notify::Config::default().with_poll_interval(Duration::from_secs(2));
let mut watcher = PollWatcher::new(
move |res: Result<notify::Event, notify::Error>| match res {
Ok(event) => {
if let Err(e) = tx.send(event) {
eprintln!(" [poll-watcher] channel send failed: {}", e);
}
}
Err(e) => {
eprintln!(" [poll-watcher] β error: {}", e);
}
},
poll_config,
)
.map_err(|e| AppError::Io(std::io::Error::new(std::io::ErrorKind::Other, e)))?;
watcher
.watch(&canonical, RecursiveMode::Recursive)
.map_err(|e| AppError::Io(std::io::Error::new(std::io::ErrorKind::Other, e)))?;
colours::success(&format!(
"π Watching {} for new screenshots (Ctrl-C to stop)",
canonical.display()
));
let _ = std::io::stdout().flush();
let mut tantivy_writer = search::writer(index).map_err(|e| AppError::Search(e.to_string()))?;
loop {
match rx.recv_timeout(Duration::from_secs(30)) {
Ok(event) => {
let kind_label = format!("{:?}", event.kind);
for p in &event.paths {
colours::info(&format!(" π‘ {} β {}", kind_label, p.display()));
}
let _ = std::io::stdout().flush();
let dominated_event = matches!(
event.kind,
notify::EventKind::Create(_) | notify::EventKind::Modify(_)
);
let mut paths: Vec<std::path::PathBuf> = if dominated_event {
event
.paths
.into_iter()
.filter(|p| ingest::is_png(p))
.collect()
} else {
Vec::new()
};
let debounce = Duration::from_millis(1500);
while let Ok(extra) = rx.recv_timeout(debounce) {
for p in &extra.paths {
colours::info(&format!(" π‘ {:?} β {}", extra.kind, p.display()));
}
if matches!(
extra.kind,
notify::EventKind::Create(_) | notify::EventKind::Modify(_)
) {
paths.extend(extra.paths.into_iter().filter(|p| ingest::is_png(p)));
}
}
paths.sort();
paths.dedup();
if paths.is_empty() {
colours::info(" β³ no new PNG files in this batch, skipping");
let _ = std::io::stdout().flush();
continue;
}
colours::info(&format!(" β³ processing {} PNG file(s)β¦", paths.len()));
let _ = std::io::stdout().flush();
for path in &paths {
process_and_commit(path, config, db, &mut tantivy_writer);
}
}
Err(mpsc::RecvTimeoutError::Timeout) => {
colours::info(" π still watchingβ¦ (no events in the last 30s)");
let _ = std::io::stdout().flush();
}
Err(mpsc::RecvTimeoutError::Disconnected) => {
colours::warn("File watcher stopped unexpectedly (channel closed).");
break;
}
}
}
Ok(())
}
fn process_and_commit(
path: &Path,
config: &Config,
db: &Db,
tantivy_writer: &mut tantivy::IndexWriter,
) {
match ingest::process_single_file(path, config, db, tantivy_writer) {
Ok(()) => {
if let Err(e) = tantivy_writer.commit() {
colours::warn(&format!(" β Tantivy commit failed: {}", e));
}
}
Err(e) => {
colours::warn(&format!(" β {}: {}", path.display(), e));
}
}
}