use std::path::{Path, PathBuf};
use std::sync::Arc;
use notify::{Event, EventKind, RecursiveMode, Watcher};
use tokio::sync::mpsc;
use tracing::{error, info, warn};
use crate::queue_manager::QueueManager;
pub struct DirWatcher {
watch_dir: PathBuf,
queue_manager: Arc<QueueManager>,
}
impl DirWatcher {
pub fn new(watch_dir: PathBuf, queue_manager: Arc<QueueManager>) -> Self {
Self {
watch_dir,
queue_manager,
}
}
pub async fn run(self) {
info!(dir = %self.watch_dir.display(), "Starting directory watcher");
if let Err(e) = std::fs::create_dir_all(&self.watch_dir) {
error!(error = %e, "Failed to create watch directory");
return;
}
self.process_existing_files().await;
let (tx, mut rx) = mpsc::channel(100);
let _watcher = {
let tx = tx.clone();
let mut watcher =
notify::recommended_watcher(move |res: Result<Event, notify::Error>| {
if let Ok(event) = res {
let _ = tx.blocking_send(event);
}
})
.expect("Failed to create file watcher");
watcher
.watch(&self.watch_dir, RecursiveMode::NonRecursive)
.expect("Failed to watch directory");
watcher };
while let Some(event) = rx.recv().await {
match event.kind {
EventKind::Create(_) | EventKind::Modify(_) => {
for path in &event.paths {
if self.is_nzb_file(path) {
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
self.process_file(path).await;
}
}
}
_ => {}
}
}
}
fn is_nzb_file(&self, path: &Path) -> bool {
path.extension().is_some_and(|ext| ext == "nzb")
|| path.to_str().is_some_and(|s| s.ends_with(".nzb.gz"))
}
async fn process_existing_files(&self) {
let entries = match std::fs::read_dir(&self.watch_dir) {
Ok(e) => e,
Err(e) => {
warn!(error = %e, "Failed to read watch directory");
return;
}
};
for entry in entries.flatten() {
let path = entry.path();
if self.is_nzb_file(&path) {
self.process_file(&path).await;
}
}
}
async fn process_file(&self, path: &Path) {
info!(file = %path.display(), "Processing NZB from watch directory");
let data = match std::fs::read(path) {
Ok(d) => d,
Err(e) => {
warn!(error = %e, file = %path.display(), "Failed to read NZB file");
return;
}
};
let name = path
.file_stem()
.and_then(|s| s.to_str())
.unwrap_or("unknown")
.to_string();
match crate::nzb_core::nzb_parser::parse_nzb(&name, &data) {
Ok(mut job) => {
job.work_dir = self.queue_manager.incomplete_dir().join(&job.id);
job.output_dir = self.queue_manager.complete_dir().join(&job.name);
if let Err(e) = std::fs::create_dir_all(&job.work_dir) {
error!(error = %e, "Failed to create work directory");
return;
}
info!(name = %job.name, id = %job.id, "Auto-enqueuing NZB from watch dir");
if let Err(e) = self.queue_manager.add_job(job, Some(data)) {
error!(error = %e, "Failed to enqueue NZB");
return;
}
let processed_dir = self.watch_dir.join("processed");
let _ = std::fs::create_dir_all(&processed_dir);
let dest = processed_dir.join(path.file_name().unwrap_or_default());
if let Err(_e) = std::fs::rename(path, &dest) {
if let Err(e2) =
std::fs::copy(path, &dest).and_then(|_| std::fs::remove_file(path))
{
warn!(error = %e2, "Failed to move processed NZB file");
}
}
}
Err(e) => {
warn!(error = %e, file = %path.display(), "Failed to parse NZB from watch dir");
}
}
}
}