use std::collections::HashSet;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use notify_debouncer_full::{
Debouncer, RecommendedCache,
notify::{EventKind, RecommendedWatcher, RecursiveMode},
new_debouncer,
};
use tokio::sync::{Notify, Semaphore, mpsc, oneshot, watch};
use tracing::{debug, info, warn};
use crate::session::{AddTorrentParams, SessionHandle};
use crate::settings::Settings;
/// Number of permits in the semaphore that gates import tasks, i.e. the maximum
/// number of files imported at the same time.
const IMPORT_CONCURRENCY: usize = 4;
/// Delays slept before each retry when reading/parsing a `.torrent` file.
///
/// The first attempt happens immediately; these three delays precede the
/// second, third and fourth attempts (800 ms of cumulative waiting).
const RETRY_BACKOFFS: [Duration; 3] = [
Duration::from_millis(100),
Duration::from_millis(200),
Duration::from_millis(500),
];
/// Reports whether `path` carries a `.torrent` extension (ASCII case-insensitive).
///
/// Only the extension is inspected; the file is never opened. A path without an
/// extension (for example a bare `torrent`) is rejected.
#[must_use]
pub fn is_torrent_file(path: &Path) -> bool {
    match path.extension() {
        Some(ext) => ext.eq_ignore_ascii_case("torrent"),
        None => false,
    }
}
/// Result of trying to import a single file from the watched folder.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ImportOutcome {
/// The session accepted the torrent.
Added,
/// The session reported the torrent as already present.
Duplicate,
/// The file could not be read and parsed as a torrent after all retries.
Malformed,
/// The file was not imported: it failed the sandbox check or the session
/// rejected it for a reason other than being a duplicate.
Skipped,
}
/// Reads `path` and returns its bytes once they parse as a torrent.
///
/// The first attempt runs immediately; each further attempt is preceded by the
/// next delay from `RETRY_BACKOFFS`. An attempt is retried when the read fails,
/// when the content does not start with `d` (the opening byte of a bencoded
/// dictionary), or when the torrent parser rejects the bytes.
///
/// # Errors
///
/// Returns `Err(())` when every attempt failed.
async fn read_and_validate(path: &Path) -> Result<Vec<u8>, ()> {
    let delays = std::iter::once(Duration::ZERO).chain(RETRY_BACKOFFS.iter().copied());
    for (attempt, backoff) in delays.enumerate() {
        // Only the very first attempt has a zero delay.
        if !backoff.is_zero() {
            tokio::time::sleep(backoff).await;
        }
        match tokio::fs::read(path).await {
            Err(e) => {
                debug!(
                    path = %path.display(),
                    attempt,
                    error = %e,
                    "watched folder: read attempt failed"
                );
            }
            // Cheap pre-check that also covers the empty-file case.
            Ok(bytes) if bytes.first() != Some(&b'd') => {
                debug!(
                    path = %path.display(),
                    attempt,
                    len = bytes.len(),
                    "watched folder: bytes don't start with bencode dict; retry"
                );
            }
            Ok(bytes) if irontide_core::torrent_from_bytes_any(&bytes).is_err() => {
                debug!(
                    path = %path.display(),
                    attempt,
                    "watched folder: bencode parse failed; retry"
                );
            }
            Ok(bytes) => return Ok(bytes),
        }
    }
    Err(())
}
/// Decides whether `path` may be imported from the watched folder.
///
/// Returns `true` only when `path` is a regular file (not a symlink, not a
/// directory) whose canonical form lies under `canonical_root`. Any metadata or
/// canonicalization failure yields `false`.
async fn validate_sandboxed_path(path: &Path, canonical_root: &Path) -> bool {
    // `symlink_metadata` does not follow links, so a symlink is seen as such.
    let metadata = match tokio::fs::symlink_metadata(path).await {
        Ok(md) => md,
        Err(e) => {
            debug!(path = %path.display(), error = %e, "watched folder: symlink_metadata failed");
            return false;
        }
    };
    if metadata.file_type().is_symlink() {
        warn!(path = %path.display(), "watched folder: rejecting symlink");
        return false;
    }
    if !metadata.is_file() {
        return false;
    }

    let canonical = match tokio::fs::canonicalize(path).await {
        Ok(resolved) => resolved,
        Err(e) => {
            debug!(
                path = %path.display(),
                error = %e,
                "watched folder: canonicalize failed (file likely deleted mid-event)"
            );
            return false;
        }
    };
    if !canonical.starts_with(canonical_root) {
        warn!(
            path = %path.display(),
            canonical = %canonical.display(),
            root = %canonical_root.display(),
            "watched folder: rejecting path that escapes the watched-folder root"
        );
        return false;
    }
    true
}
/// Renames `path` so that its extension becomes `new_ext`.
///
/// Best effort: a failed rename is logged as a warning and otherwise ignored.
async fn rename_with_extension(path: &Path, new_ext: &str) {
    let target = path.with_extension(new_ext);
    let outcome = tokio::fs::rename(path, &target).await;
    if let Err(e) = outcome {
        warn!(
            from = %path.display(),
            to = %target.display(),
            error = %e,
            "watched folder: rename failed"
        );
    }
}
/// Imports one watched-folder file into the session and reports what happened.
///
/// Steps, in order:
/// 1. Sandbox check against `canonical_root`; failure yields `Skipped`.
/// 2. Read and parse with retries; failure renames the file to `.malformed`
///    and yields `Malformed`.
/// 3. Hand the bytes to the session:
///    - accepted: yields `Added`; the file is deleted when `delete_after_add`;
///    - duplicate: yields `Duplicate`; the file is deleted when
///      `delete_after_add`, otherwise renamed to `.duplicate`;
///    - any other error: yields `Skipped` and the file is left in place.
///
/// File-system clean-up failures are logged and never change the outcome.
async fn import_one(
    path: PathBuf,
    canonical_root: PathBuf,
    session: SessionHandle,
    delete_after_add: bool,
) -> ImportOutcome {
    let allowed = validate_sandboxed_path(&path, &canonical_root).await;
    if !allowed {
        return ImportOutcome::Skipped;
    }

    let bytes = match read_and_validate(&path).await {
        Ok(contents) => contents,
        Err(()) => {
            warn!(
                path = %path.display(),
                "watched folder: parse failed after retries; renaming to .malformed"
            );
            rename_with_extension(&path, "malformed").await;
            return ImportOutcome::Malformed;
        }
    };

    match session.add_torrent(AddTorrentParams::bytes(bytes)).await {
        Ok(info_hash) => {
            info!(
                path = %path.display(),
                hash = %info_hash.to_hex(),
                "watched folder: torrent added"
            );
            if delete_after_add {
                if let Err(e) = tokio::fs::remove_file(&path).await {
                    warn!(
                        path = %path.display(),
                        error = %e,
                        "watched folder: delete_after_add failed (torrent already accepted)"
                    );
                }
            }
            ImportOutcome::Added
        }
        Err(crate::Error::DuplicateTorrent(hash)) => {
            info!(
                path = %path.display(),
                hash = %hash.to_hex(),
                "watched folder: torrent already in session (duplicate)"
            );
            if !delete_after_add {
                // Rename so the file no longer matches the `.torrent` filter.
                rename_with_extension(&path, "duplicate").await;
            } else if let Err(e) = tokio::fs::remove_file(&path).await {
                warn!(
                    path = %path.display(),
                    error = %e,
                    "watched folder: failed to delete duplicate"
                );
            }
            ImportOutcome::Duplicate
        }
        Err(e) => {
            warn!(
                path = %path.display(),
                error = %e,
                "watched folder: add_torrent failed; file left in place"
            );
            ImportOutcome::Skipped
        }
    }
}
/// Enqueues every `.torrent` entry found directly inside `root` onto `queue`.
///
/// Only the entry's extension is checked here; whether it is a regular file
/// inside the sandbox is decided later by the import step. The scan is not
/// recursive.
///
/// Failures are logged and end the scan: both a failing `read_dir` and an error
/// while iterating entries (previously the latter stopped the scan silently,
/// leaving no trace of files that were never enqueued).
async fn initial_scan(root: &Path, queue: &mpsc::UnboundedSender<PathBuf>) {
    let mut dir = match tokio::fs::read_dir(root).await {
        Ok(d) => d,
        Err(e) => {
            warn!(
                root = %root.display(),
                error = %e,
                "watched folder: initial scan read_dir failed"
            );
            return;
        }
    };
    loop {
        let entry = match dir.next_entry().await {
            Ok(Some(entry)) => entry,
            Ok(None) => break,
            Err(e) => {
                warn!(
                    root = %root.display(),
                    error = %e,
                    "watched folder: initial scan aborted while reading directory entries"
                );
                break;
            }
        };
        let path = entry.path();
        if is_torrent_file(&path) {
            debug!(path = %path.display(), "watched folder: initial scan enqueue");
            // A send error only means the receiver is gone; nothing to recover.
            let _ = queue.send(path);
        }
    }
}
fn build_debouncer(
root: &Path,
out: mpsc::UnboundedSender<PathBuf>,
) -> Result<Debouncer<RecommendedWatcher, RecommendedCache>, notify_debouncer_full::notify::Error> {
let mut debouncer = new_debouncer(
Duration::from_millis(500),
None,
move |result: notify_debouncer_full::DebounceEventResult| {
let Ok(events) = result else { return };
let mut seen: HashSet<PathBuf> = HashSet::new();
for event in events {
match event.kind {
EventKind::Create(_) | EventKind::Modify(_) => {}
_ => continue,
}
for path in &event.paths {
if is_torrent_file(path) && seen.insert(path.clone()) {
let _ = out.send(path.clone());
}
}
}
},
)?;
debouncer.watch(root, RecursiveMode::NonRecursive)?;
Ok(debouncer)
}
/// Spawns the background task that owns the watched-folder watcher and
/// dispatches imports.
///
/// The task builds the watcher once from the current settings and then loops,
/// reacting to three sources:
/// - `shutdown_rx` resolving: the watcher is dropped and the task ends;
/// - `watched_folder_changed` being notified: the watcher is rebuilt from the
///   current value in `settings_rx`;
/// - a path arriving from the watcher or the initial scan: a separate task is
///   spawned to import it, gated by a semaphore with `IMPORT_CONCURRENCY`
///   permits.
///
/// Returns the join handle of the dispatcher task. Import tasks are spawned
/// detached; their handles are not kept.
#[must_use]
pub fn spawn_watched_folder_dispatcher(
session: SessionHandle,
settings_rx: watch::Receiver<Settings>,
watched_folder_changed: Arc<Notify>,
shutdown_rx: oneshot::Receiver<()>,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
// Channel carrying candidate paths from the watcher callback and the
// initial scan to this loop. The sender is kept here so it can be cloned
// into every rebuilt watcher.
let (event_tx, mut event_rx) = mpsc::unbounded_channel::<PathBuf>();
let semaphore = Arc::new(Semaphore::new(IMPORT_CONCURRENCY));
// Both stay `None` while no folder is being watched.
let mut debouncer: Option<Debouncer<RecommendedWatcher, RecommendedCache>> = None;
let mut canonical_root: Option<PathBuf> = None;
let mut shutdown_rx = shutdown_rx;
rebuild_debouncer(
&settings_rx,
&event_tx,
&mut debouncer,
&mut canonical_root,
)
.await;
loop {
tokio::select! {
// The `_` pattern accepts any result of the receiver, so this branch
// does not distinguish a sent signal from any other completion.
_ = &mut shutdown_rx => {
debug!("watched folder: shutdown signal");
drop(debouncer.take());
break;
}
() = watched_folder_changed.notified() => {
rebuild_debouncer(
&settings_rx,
&event_tx,
&mut debouncer,
&mut canonical_root,
)
.await;
}
Some(path) = event_rx.recv() => {
// Without an active root there is nothing to validate against;
// the path is discarded.
let Some(root) = canonical_root.clone() else {
continue;
};
// Read the flag at dispatch time so each import uses the settings
// current when its event was received.
let delete_after_add = settings_rx
.borrow()
.delete_torrent_after_add;
let session_clone = session.clone();
let semaphore_clone = Arc::clone(&semaphore);
tokio::spawn(async move {
// The permit is held until the import finishes; if acquiring it
// fails the import is abandoned.
let Ok(_permit) = semaphore_clone.acquire_owned().await else {
return;
};
import_one(path, root, session_clone, delete_after_add).await;
});
}
}
}
})
}
/// Tears down the current watcher and, if a folder is configured, starts a new one.
///
/// `debouncer` and `canonical_root` are always reset to `None` first. They are
/// set again only when the configured folder can be canonicalized and watched;
/// in that case the folder is also scanned once so that files already present
/// are enqueued on `event_tx`. Every failure is logged and leaves the watcher
/// idle.
async fn rebuild_debouncer(
    settings_rx: &watch::Receiver<Settings>,
    event_tx: &mpsc::UnboundedSender<PathBuf>,
    debouncer: &mut Option<Debouncer<RecommendedWatcher, RecommendedCache>>,
    canonical_root: &mut Option<PathBuf>,
) {
    // Assigning `None` drops the previous watcher, which stops it.
    *debouncer = None;
    *canonical_root = None;

    // Copy the setting out in its own statement so the borrow ends here.
    let configured = settings_rx.borrow().watched_folder.clone();
    let Some(path) = configured else {
        debug!("watched folder: setting cleared; watcher idle");
        return;
    };

    let canonical = match tokio::fs::canonicalize(&path).await {
        Ok(resolved) => resolved,
        Err(e) => {
            warn!(
                path = %path.display(),
                error = %e,
                "watched folder: canonicalize failed; watcher stays idle until next apply_settings"
            );
            return;
        }
    };

    let watcher = match build_debouncer(&canonical, event_tx.clone()) {
        Ok(watcher) => watcher,
        Err(e) => {
            warn!(
                path = %canonical.display(),
                error = %e,
                "watched folder: build_debouncer failed; watcher stays idle"
            );
            return;
        }
    };

    info!(path = %canonical.display(), "watched folder: watcher active");
    *debouncer = Some(watcher);
    *canonical_root = Some(canonical.clone());
    // Runs after the watcher is live, so files created in between are still seen.
    initial_scan(&canonical, event_tx).await;
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    /// Generates the bytes of a valid torrent describing one 16 KiB file.
    fn dummy_torrent_bytes() -> Vec<u8> {
        let mut tmp = tempfile::NamedTempFile::new().unwrap();
        std::io::Write::write_all(&mut tmp, &[0xABu8; 16384]).unwrap();
        let result = irontide_core::CreateTorrent::new()
            .add_file(tmp.path())
            .set_name("test.bin")
            .generate()
            .unwrap();
        result.bytes
    }

    #[test]
    fn is_torrent_file_accepts_torrent_case_insensitive() {
        assert!(is_torrent_file(&PathBuf::from("a.torrent")));
        assert!(is_torrent_file(&PathBuf::from("a.TORRENT")));
        assert!(is_torrent_file(&PathBuf::from("a.Torrent")));
    }

    #[test]
    fn is_torrent_file_rejects_other_extensions() {
        assert!(!is_torrent_file(&PathBuf::from("a.duplicate")));
        assert!(!is_torrent_file(&PathBuf::from("a.malformed")));
        assert!(!is_torrent_file(&PathBuf::from("a.txt")));
        assert!(!is_torrent_file(&PathBuf::from("torrent")));
    }

    #[tokio::test]
    async fn read_and_validate_succeeds_on_well_formed_torrent_bytes() {
        let tmp = tempfile::tempdir().unwrap();
        let path = tmp.path().join("good.torrent");
        let bytes = dummy_torrent_bytes();
        tokio::fs::write(&path, &bytes).await.unwrap();
        let parsed = read_and_validate(&path).await.expect("parse must succeed");
        assert_eq!(parsed, bytes);
    }

    /// An empty file must exhaust every retry, so the call takes at least the
    /// sum of all backoffs (100 + 200 + 500 ms).
    #[tokio::test]
    async fn read_and_validate_rejects_empty_file_after_retries() {
        let tmp = tempfile::tempdir().unwrap();
        let path = tmp.path().join("empty.torrent");
        tokio::fs::write(&path, []).await.unwrap();
        let start = std::time::Instant::now();
        let result = read_and_validate(&path).await;
        let elapsed = start.elapsed();
        assert!(result.is_err());
        assert!(
            elapsed >= Duration::from_millis(800),
            "expected ≥800ms cumulative backoff before giving up; got {elapsed:?}"
        );
    }

    #[tokio::test]
    async fn read_and_validate_rejects_non_bencode_content() {
        let tmp = tempfile::tempdir().unwrap();
        let path = tmp.path().join("garbage.torrent");
        tokio::fs::write(&path, b"this is not bencode")
            .await
            .unwrap();
        let result = read_and_validate(&path).await;
        assert!(result.is_err());
    }

    // Symlink creation below uses the Unix-only std API, so the whole test is
    // gated instead of returning early on other targets (which left
    // unreachable code and an unused variable behind).
    #[cfg(unix)]
    #[tokio::test]
    async fn validate_sandboxed_path_rejects_symlink_inside_root() {
        let tmp = tempfile::tempdir().unwrap();
        let canonical_root = tokio::fs::canonicalize(tmp.path()).await.unwrap();
        let real = canonical_root.join("real.torrent");
        tokio::fs::write(&real, dummy_torrent_bytes()).await.unwrap();
        let link = canonical_root.join("link.torrent");
        std::os::unix::fs::symlink(&real, &link).unwrap();
        assert!(!validate_sandboxed_path(&link, &canonical_root).await);
        assert!(validate_sandboxed_path(&real, &canonical_root).await);
    }

    #[tokio::test]
    async fn validate_sandboxed_path_rejects_missing_file() {
        let tmp = tempfile::tempdir().unwrap();
        let canonical_root = tokio::fs::canonicalize(tmp.path()).await.unwrap();
        let missing = canonical_root.join("ghost.torrent");
        assert!(!validate_sandboxed_path(&missing, &canonical_root).await);
    }

    #[tokio::test]
    async fn validate_sandboxed_path_rejects_directory() {
        let tmp = tempfile::tempdir().unwrap();
        let canonical_root = tokio::fs::canonicalize(tmp.path()).await.unwrap();
        let subdir = canonical_root.join("subdir");
        tokio::fs::create_dir(&subdir).await.unwrap();
        assert!(!validate_sandboxed_path(&subdir, &canonical_root).await);
    }

    #[tokio::test]
    async fn rename_with_extension_swaps_suffix() {
        let tmp = tempfile::tempdir().unwrap();
        let original = tmp.path().join("foo.torrent");
        tokio::fs::write(&original, b"hello").await.unwrap();
        rename_with_extension(&original, "duplicate").await;
        assert!(!original.exists());
        assert!(tmp.path().join("foo.duplicate").exists());
    }

    /// End-to-end check: a `.torrent` that exists before the dispatcher starts
    /// must show up in the session without any file-system event.
    #[tokio::test]
    async fn watcher_initial_scan_picks_up_preexisting_file() {
        use std::time::Instant;
        let tmp = tempfile::tempdir().unwrap();
        let watched = tmp.path().join("watched");
        tokio::fs::create_dir(&watched).await.unwrap();
        let download = tmp.path().join("download");
        tokio::fs::create_dir(&download).await.unwrap();
        let resume = tmp.path().join("resume");
        tokio::fs::create_dir(&resume).await.unwrap();
        let path = watched.join("preexisting.torrent");
        tokio::fs::write(&path, dummy_torrent_bytes()).await.unwrap();
        let settings = Settings {
            download_dir: download.clone(),
            resume_data_dir: Some(resume),
            watched_folder: Some(watched.clone()),
            ..Default::default()
        };
        let session = SessionHandle::start(settings.clone()).await.unwrap();
        let (settings_tx, settings_rx) = watch::channel(settings);
        let watched_changed = Arc::new(Notify::new());
        let (shutdown_tx, shutdown_rx) = oneshot::channel();
        let _join = spawn_watched_folder_dispatcher(
            session.clone(),
            settings_rx,
            Arc::clone(&watched_changed),
            shutdown_rx,
        );
        // Poll with a capped exponential backoff until the torrent appears or
        // the deadline passes.
        let deadline = Instant::now() + Duration::from_secs(5);
        let mut backoff = Duration::from_millis(50);
        let mut found = false;
        while Instant::now() < deadline {
            let count = session.list_torrents().await.unwrap().len();
            if count >= 1 {
                found = true;
                break;
            }
            tokio::time::sleep(backoff).await;
            backoff = (backoff * 2).min(Duration::from_millis(500));
        }
        // Marks the sender as used; it stays alive until the end of the test.
        let _ = settings_tx;
        let _ = shutdown_tx.send(());
        session.shutdown().await.unwrap();
        assert!(
            found,
            "F2 initial scan must import the preexisting .torrent within 5s"
        );
    }
}