use crate::UsenetDownloader;
use crate::config::{WatchFolderAction, WatchFolderConfig};
use crate::error::{Error, Result};
use crate::types::DownloadOptions;
use notify::{
Config as NotifyConfig, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher,
};
use std::path::Path;
use std::sync::Arc;
use tokio::sync::mpsc;
use tracing::{debug, error, info, warn};
pub struct FolderWatcher {
watcher: RecommendedWatcher,
rx: mpsc::UnboundedReceiver<notify::Result<Event>>,
downloader: Arc<UsenetDownloader>,
configs: Vec<WatchFolderConfig>,
}
impl FolderWatcher {
pub fn new(downloader: Arc<UsenetDownloader>, configs: Vec<WatchFolderConfig>) -> Result<Self> {
let (tx, rx) = mpsc::unbounded_channel();
let watcher = RecommendedWatcher::new(
move |res| {
if let Err(e) = tx.send(res) {
error!("Failed to send filesystem event: {}", e);
}
},
NotifyConfig::default(),
)
.map_err(|e| Error::FolderWatch(e.to_string()))?;
Ok(Self {
watcher,
rx,
downloader,
configs,
})
}
pub fn start(&mut self) -> Result<()> {
for config in &self.configs {
if !config.path.exists() {
std::fs::create_dir_all(&config.path).map_err(|e| {
Error::FolderWatch(format!("Failed to create watch folder: {}", e))
})?;
info!("Created watch folder: {}", config.path.display());
}
self.watcher
.watch(&config.path, RecursiveMode::NonRecursive)
.map_err(|e| Error::FolderWatch(format!("Failed to watch folder: {}", e)))?;
info!(
"Watching folder: {} (category: {:?})",
config.path.display(),
config.category.as_deref().unwrap_or("default")
);
}
Ok(())
}
pub async fn run(mut self) {
info!("Folder watcher started");
while let Some(result) = self.rx.recv().await {
match result {
Ok(event) => {
if let Err(e) = self.handle_event(event).await {
error!("Error handling folder event: {}", e);
}
}
Err(e) => {
error!("Filesystem watcher error: {}", e);
}
}
}
info!("Folder watcher stopped");
}
pub fn stop(self) {
drop(self.watcher);
info!("Folder watcher stopped");
}
async fn handle_event(&self, event: Event) -> Result<()> {
match event.kind {
EventKind::Create(_) | EventKind::Modify(_) => {
for path in event.paths {
if self.is_nzb_file(&path) {
self.process_nzb_file(&path).await?;
}
}
}
_ => {
}
}
Ok(())
}
fn is_nzb_file(&self, path: &Path) -> bool {
path.extension()
.and_then(|ext| ext.to_str())
.map(|ext| ext.eq_ignore_ascii_case("nzb"))
.unwrap_or(false)
}
async fn process_nzb_file(&self, path: &Path) -> Result<()> {
debug!("Processing NZB file: {}", path.display());
let config = self.find_config_for_path(path)?;
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
let options = DownloadOptions {
category: config.category.clone(),
..Default::default()
};
match self.downloader.add_nzb(path, options).await {
Ok(id) => {
info!(
"Added NZB from watch folder: {} (download_id: {}, category: {:?})",
path.display(),
id,
config.category.as_deref().unwrap_or("default")
);
if let Err(e) = self.handle_after_import(path, config).await {
error!(
"Failed to handle after_import action for {}: {}",
path.display(),
e
);
}
}
Err(e) => {
error!(
"Failed to add NZB from watch folder {}: {}",
path.display(),
e
);
return Err(e);
}
}
Ok(())
}
fn find_config_for_path(&self, path: &Path) -> Result<&WatchFolderConfig> {
let parent = path
.parent()
.ok_or_else(|| Error::FolderWatch("File has no parent directory".to_string()))?;
self.configs
.iter()
.find(|c| c.path == parent)
.ok_or_else(|| {
Error::FolderWatch(format!(
"No watch folder config found for: {}",
parent.display()
))
})
}
async fn handle_after_import(&self, path: &Path, config: &WatchFolderConfig) -> Result<()> {
match config.after_import {
WatchFolderAction::Delete => {
debug!("Deleting NZB file: {}", path.display());
tokio::fs::remove_file(path)
.await
.map_err(|e| Error::FolderWatch(format!("Failed to delete file: {}", e)))?;
info!("Deleted processed NZB: {}", path.display());
}
WatchFolderAction::MoveToProcessed => {
let parent = path.parent().ok_or_else(|| {
Error::FolderWatch("File has no parent directory".to_string())
})?;
let processed_dir = parent.join("processed");
if !processed_dir.exists() {
tokio::fs::create_dir(&processed_dir).await.map_err(|e| {
Error::FolderWatch(format!("Failed to create processed directory: {}", e))
})?;
}
let dest = processed_dir.join(
path.file_name()
.ok_or_else(|| Error::FolderWatch("File has no filename".to_string()))?,
);
debug!("Moving NZB file: {} -> {}", path.display(), dest.display());
tokio::fs::rename(path, &dest)
.await
.map_err(|e| Error::FolderWatch(format!("Failed to move file: {}", e)))?;
info!("Moved processed NZB to: {}", dest.display());
}
WatchFolderAction::Keep => {
debug!("Keeping NZB file in place: {}", path.display());
if let Err(e) = self.downloader.mark_nzb_processed(path).await {
warn!("Failed to mark NZB as processed in database: {}", e);
}
}
}
Ok(())
}
}
#[allow(clippy::unwrap_used, clippy::expect_used)]
#[cfg(test)]
mod tests {
use super::*;
use crate::config::{Config, WatchFolderAction, WatchFolderConfig};
use tempfile::TempDir;
use tokio::time::{Duration, sleep};
async fn create_test_downloader() -> Arc<UsenetDownloader> {
let (downloader, _temp_dir) =
crate::downloader::test_helpers::create_test_downloader().await;
Arc::new(downloader)
}
#[tokio::test]
async fn test_is_nzb_file() {
let downloader = create_test_downloader().await;
let watcher = FolderWatcher::new(downloader, vec![]).unwrap();
assert!(watcher.is_nzb_file(Path::new("test.nzb")));
assert!(watcher.is_nzb_file(Path::new("test.NZB")));
assert!(watcher.is_nzb_file(Path::new("/path/to/file.nzb")));
assert!(!watcher.is_nzb_file(Path::new("test.txt")));
assert!(!watcher.is_nzb_file(Path::new("test")));
assert!(!watcher.is_nzb_file(Path::new("test.zip")));
}
#[tokio::test]
async fn test_folder_watcher_start() {
let downloader = create_test_downloader().await;
let temp_dir = TempDir::new().unwrap();
let watch_path = temp_dir.path().join("watch");
let config = WatchFolderConfig {
path: watch_path.clone(),
after_import: WatchFolderAction::Delete,
category: Some("test".to_string()),
scan_interval: Duration::from_secs(5),
};
let mut watcher = FolderWatcher::new(downloader, vec![config]).unwrap();
assert!(!watch_path.exists());
watcher.start().unwrap();
assert!(watch_path.exists());
}
#[tokio::test]
async fn test_find_config_for_path() {
let downloader = create_test_downloader().await;
let temp_dir = TempDir::new().unwrap();
let watch_path = temp_dir.path().join("watch");
std::fs::create_dir_all(&watch_path).unwrap();
let config = WatchFolderConfig {
path: watch_path.clone(),
after_import: WatchFolderAction::Delete,
category: Some("test".to_string()),
scan_interval: Duration::from_secs(5),
};
let watcher = FolderWatcher::new(downloader, vec![config]).unwrap();
let test_file = watch_path.join("test.nzb");
let found_config = watcher.find_config_for_path(&test_file).unwrap();
assert_eq!(found_config.path, watch_path);
assert_eq!(found_config.category.as_deref(), Some("test"));
}
const TEST_NZB: &str = r#"<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE nzb PUBLIC "-//newzBin//DTD NZB 1.1//EN" "http://www.newzbin.com/DTD/nzb/nzb-1.1.dtd">
<nzb xmlns="http://www.newzbin.com/DTD/2003/nzb">
<file poster="test@example.com" date="1234567890" subject="test file">
<groups><group>alt.binaries.test</group></groups>
<segments>
<segment bytes="1024" number="1">test-msg-id@example.com</segment>
</segments>
</file>
</nzb>"#;
async fn create_watcher_with_watch_dir() -> (
FolderWatcher,
Arc<UsenetDownloader>,
std::path::PathBuf,
TempDir,
) {
let temp_dir = TempDir::new().unwrap();
let watch_path = temp_dir.path().join("watch");
std::fs::create_dir_all(&watch_path).unwrap();
let mut config = Config::default();
config.persistence.database_path = temp_dir.path().join("test.db");
config.download.download_dir = temp_dir.path().join("downloads");
config.download.temp_dir = temp_dir.path().join("temp");
let downloader = Arc::new(UsenetDownloader::new(config).await.unwrap());
let watch_config = WatchFolderConfig {
path: watch_path.clone(),
after_import: WatchFolderAction::Keep,
category: Some("test-cat".to_string()),
scan_interval: Duration::from_secs(5),
};
let watcher = FolderWatcher::new(downloader.clone(), vec![watch_config]).unwrap();
(watcher, downloader, watch_path, temp_dir)
}
#[tokio::test]
async fn handle_event_create_nzb_triggers_processing() {
let (watcher, downloader, watch_path, _temp_dir) = create_watcher_with_watch_dir().await;
let nzb_path = watch_path.join("movie.nzb");
std::fs::write(&nzb_path, TEST_NZB).unwrap();
let event = Event {
kind: EventKind::Create(notify::event::CreateKind::File),
paths: vec![nzb_path],
attrs: Default::default(),
};
watcher.handle_event(event).await.unwrap();
let downloads = downloader.db.list_downloads().await.unwrap();
assert_eq!(
downloads.len(),
1,
"Create event for .nzb file should add it to the download queue"
);
assert_eq!(
downloads[0].category.as_deref(),
Some("test-cat"),
"download should inherit the watch folder category"
);
}
#[tokio::test]
async fn handle_event_create_txt_file_is_ignored() {
let (watcher, downloader, watch_path, _temp_dir) = create_watcher_with_watch_dir().await;
let txt_path = watch_path.join("readme.txt");
std::fs::write(&txt_path, "hello world").unwrap();
let event = Event {
kind: EventKind::Create(notify::event::CreateKind::File),
paths: vec![txt_path],
attrs: Default::default(),
};
watcher.handle_event(event).await.unwrap();
let downloads = downloader.db.list_downloads().await.unwrap();
assert_eq!(
downloads.len(),
0,
"Create event for non-.nzb file should be ignored"
);
}
#[tokio::test]
async fn handle_event_modify_nzb_triggers_processing() {
let (watcher, downloader, watch_path, _temp_dir) = create_watcher_with_watch_dir().await;
let nzb_path = watch_path.join("show.nzb");
std::fs::write(&nzb_path, TEST_NZB).unwrap();
let event = Event {
kind: EventKind::Modify(notify::event::ModifyKind::Data(
notify::event::DataChange::Content,
)),
paths: vec![nzb_path],
attrs: Default::default(),
};
watcher.handle_event(event).await.unwrap();
let downloads = downloader.db.list_downloads().await.unwrap();
assert_eq!(
downloads.len(),
1,
"Modify event for .nzb file should trigger processing"
);
}
#[tokio::test]
async fn handle_event_remove_nzb_is_ignored() {
let (watcher, downloader, watch_path, _temp_dir) = create_watcher_with_watch_dir().await;
let nzb_path = watch_path.join("deleted.nzb");
let event = Event {
kind: EventKind::Remove(notify::event::RemoveKind::File),
paths: vec![nzb_path],
attrs: Default::default(),
};
watcher.handle_event(event).await.unwrap();
let downloads = downloader.db.list_downloads().await.unwrap();
assert_eq!(
downloads.len(),
0,
"Remove events should be ignored — only Create and Modify trigger processing"
);
}
#[tokio::test]
async fn test_folder_watching_with_file_creation() {
let temp_dir = TempDir::new().unwrap();
let watch_path = temp_dir.path().join("watch");
std::fs::create_dir_all(&watch_path).unwrap();
let mut config = Config::default();
config.persistence.database_path = temp_dir.path().join("test.db");
config.download.download_dir = temp_dir.path().join("downloads");
config.download.temp_dir = temp_dir.path().join("temp");
let downloader = Arc::new(UsenetDownloader::new(config).await.unwrap());
let watch_config = WatchFolderConfig {
path: watch_path.clone(),
after_import: WatchFolderAction::Delete,
category: Some("movies".to_string()),
scan_interval: Duration::from_secs(1),
};
let mut watcher = FolderWatcher::new(downloader.clone(), vec![watch_config]).unwrap();
watcher.start().unwrap();
let watcher_handle = tokio::spawn(async move {
watcher.run().await;
});
sleep(Duration::from_millis(100)).await;
let nzb_content = r#"<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE nzb PUBLIC "-//newzBin//DTD NZB 1.1//EN" "http://www.newzbin.com/DTD/nzb/nzb-1.1.dtd">
<nzb xmlns="http://www.newzbin.com/DTD/2003/nzb">
<file poster="test@example.com" date="1234567890" subject="test file">
<groups>
<group>alt.binaries.test</group>
</groups>
<segments>
<segment bytes="1024" number="1">test-message-id@example.com</segment>
</segments>
</file>
</nzb>"#;
let nzb_path = watch_path.join("test_movie.nzb");
std::fs::write(&nzb_path, nzb_content).unwrap();
sleep(Duration::from_millis(500)).await;
assert!(
!nzb_path.exists(),
"NZB file should have been deleted after import"
);
let downloads = downloader.db.list_downloads().await.unwrap();
assert_eq!(downloads.len(), 1, "Expected 1 download in queue");
let download = &downloads[0];
assert_eq!(download.category.as_deref(), Some("movies"));
assert!(download.name.contains("test_movie") || download.name.contains("test file"));
watcher_handle.abort();
let _ = watcher_handle.await;
}
}