irontide-session 1.0.1

BitTorrent session management: peers, torrents, and piece selection
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
//! M226 Step 6: engine-side watched-folder subsystem.
//!
//! Watches `settings.watched_folder` for `.torrent` files via
//! `notify-debouncer-full` (500 ms debounce) and auto-adds them through
//! the regular [`SessionHandle::add_torrent`] path. When the user lands
//! a torrent file in the dropbox directory it shows up in the engine
//! within ~1 second; the file is then either deleted
//! (`settings.delete_torrent_after_add = true`) or renamed to
//! `*.duplicate` / `*.malformed` so the next startup scan skips it.
//!
//! # Hardening (M226 plan §6 — Gemini OV auto-fixes)
//!
//! - **F1**: notify-debouncer-full's event callback is synchronous and
//!   runs on the notify worker thread; events are funnelled into the
//!   async loop via an `mpsc::UnboundedSender` so no blocking call
//!   touches the tokio runtime.
//! - **F2**: on startup AND every live-swap rebuild, sweep the new path
//!   once via [`tokio::fs::read_dir`] so torrents dropped while the
//!   engine was offline are not silently ignored.
//! - **F6**: parsing a `.torrent` may race the writer (rsync / NFS /
//!   browser-saved file mid-flight). Retry parse up to 3 times with
//!   backoff 100/200/500 ms; after that, rename the file to
//!   `<name>.malformed` so subsequent startup scans skip it instead of
//!   re-trying corrupt input on every restart.
//! - **F7/G1**: every candidate path is checked with
//!   [`tokio::fs::symlink_metadata`] (NOT [`tokio::fs::metadata`] —
//!   `metadata` follows symlinks and silently masks them). Symlinks
//!   are rejected outright. The resolved canonical path must also
//!   begin with the canonical watched-folder root so attackers cannot
//!   plant a symlink pointing outside the dropbox.
//! - **F8**: imports are gated by [`tokio::sync::Semaphore::new(4)`]
//!   so a mass-drop of 1,000 files doesn't fan out 1,000 concurrent
//!   `add_torrent` calls. Permits acquired before file-read.
//! - **G2**: on `DuplicateTorrent` either delete the file (when
//!   `delete_torrent_after_add = true`) or rename to `*.duplicate` so
//!   subsequent startup scans don't infinite-log the dup.
//! - **G3**: on live-swap the old `Debouncer` is dropped EXPLICITLY
//!   before constructing the new one so its inotify FDs + worker
//!   thread are released; otherwise repeated live-swaps exhaust
//!   `/proc/sys/fs/inotify/max_user_watches`.

use std::collections::HashSet;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;

use notify_debouncer_full::{
    Debouncer, RecommendedCache,
    notify::{EventKind, RecommendedWatcher, RecursiveMode},
    new_debouncer,
};
use tokio::sync::{Notify, Semaphore, mpsc, oneshot, watch};
use tracing::{debug, info, warn};

use crate::session::{AddTorrentParams, SessionHandle};
use crate::settings::Settings;

/// Maximum concurrent in-flight imports (F8 cap). Below
/// `Settings::hashing_threads` default (7) so genuine downloads still
/// get CPU; small enough that 1k-file mass-drops can't OOM the engine.
const IMPORT_CONCURRENCY: usize = 4;
/// Parse-retry backoffs (F6) — 3 attempts total at 100/200/500 ms.
const RETRY_BACKOFFS: [Duration; 3] = [
    Duration::from_millis(100),
    Duration::from_millis(200),
    Duration::from_millis(500),
];

/// `.torrent` case-insensitive file-name filter. Subsequent rename
/// passes flip the extension to `.duplicate` / `.malformed` so the
/// filter naturally rejects them on the next pass.
#[must_use]
pub fn is_torrent_file(path: &Path) -> bool {
    path.extension()
        .is_some_and(|e| e.eq_ignore_ascii_case("torrent"))
}

/// Outcome of an import attempt — kept narrow so callers / tests don't
/// have to discriminate on the engine's full `Error` surface.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ImportOutcome {
    /// Engine accepted the torrent. The file should be deleted if
    /// `delete_torrent_after_add = true`.
    Added,
    /// Engine already has this info-hash. File should be renamed
    /// to `.duplicate` (or deleted if the flag is set).
    Duplicate,
    /// Bencode parse failed after every retry. File renamed to
    /// `.malformed` so the next startup scan skips it.
    Malformed,
    /// Transient I/O error or rejected by security check; the file
    /// stays in place so the next startup scan or live event retries.
    Skipped,
}

/// Read the file and attempt to construct an [`AddTorrentParams`].
/// Retries up to 3 times on parse failure (F6 — partial-file race).
/// Returns `Ok(bytes)` on success or `Err(())` when every retry failed.
async fn read_and_validate(path: &Path) -> Result<Vec<u8>, ()> {
    for (attempt, backoff) in std::iter::once(Duration::ZERO)
        .chain(RETRY_BACKOFFS.iter().copied())
        .enumerate()
    {
        if backoff != Duration::ZERO {
            tokio::time::sleep(backoff).await;
        }
        let bytes = match tokio::fs::read(path).await {
            Ok(b) => b,
            Err(e) => {
                debug!(
                    path = %path.display(),
                    attempt,
                    error = %e,
                    "watched folder: read attempt failed"
                );
                continue;
            }
        };
        // BEP 3 .torrent files are bencoded dicts; the first byte is
        // ALWAYS 'd'. A zero-length or non-dict file is mid-write or
        // garbage — neither a transient read error nor parseable.
        if bytes.is_empty() || bytes[0] != b'd' {
            debug!(
                path = %path.display(),
                attempt,
                len = bytes.len(),
                "watched folder: bytes don't start with bencode dict; retry"
            );
            continue;
        }
        // Probe-parse via core's auto-detect parser — same path
        // `session.add_torrent` runs internally. Cheap (~µs) so the
        // double-parse is fine.
        if irontide_core::torrent_from_bytes_any(&bytes).is_err() {
            debug!(
                path = %path.display(),
                attempt,
                "watched folder: bencode parse failed; retry"
            );
            continue;
        }
        return Ok(bytes);
    }
    Err(())
}

/// Reject symlinks AND paths whose canonical form escapes the
/// canonical watched-folder root (F7 / G1). `symlink_metadata` is the
/// critical primitive: `metadata` follows symlinks and would silently
/// mask them.
async fn validate_sandboxed_path(path: &Path, canonical_root: &Path) -> bool {
    match tokio::fs::symlink_metadata(path).await {
        Ok(md) if md.file_type().is_symlink() => {
            warn!(path = %path.display(), "watched folder: rejecting symlink");
            return false;
        }
        Ok(md) if !md.is_file() => {
            // Directories, FIFOs, sockets, devices — skip silently.
            return false;
        }
        Err(e) => {
            debug!(path = %path.display(), error = %e, "watched folder: symlink_metadata failed");
            return false;
        }
        _ => {}
    }
    match tokio::fs::canonicalize(path).await {
        Ok(canonical) => {
            if canonical.starts_with(canonical_root) {
                true
            } else {
                warn!(
                    path = %path.display(),
                    canonical = %canonical.display(),
                    root = %canonical_root.display(),
                    "watched folder: rejecting path that escapes the watched-folder root"
                );
                false
            }
        }
        Err(e) => {
            debug!(
                path = %path.display(),
                error = %e,
                "watched folder: canonicalize failed (file likely deleted mid-event)"
            );
            false
        }
    }
}

/// Rename `path` to `<stem>.<new_ext>` — used by G2 (`.duplicate`) and
/// F6 (`.malformed`). Logs WARN on failure; does NOT propagate the
/// error since rename failure is non-fatal (the file stays put,
/// startup scan re-tries on next run).
async fn rename_with_extension(path: &Path, new_ext: &str) {
    let mut new_path = path.to_path_buf();
    new_path.set_extension(new_ext);
    if let Err(e) = tokio::fs::rename(path, &new_path).await {
        warn!(
            from = %path.display(),
            to = %new_path.display(),
            error = %e,
            "watched folder: rename failed"
        );
    }
}

/// Single-file import flow. Runs under a semaphore permit (F8) so the
/// caller MUST acquire the permit before invoking this. The body never
/// panics; every error path logs + classifies via [`ImportOutcome`].
async fn import_one(
    path: PathBuf,
    canonical_root: PathBuf,
    session: SessionHandle,
    delete_after_add: bool,
) -> ImportOutcome {
    if !validate_sandboxed_path(&path, &canonical_root).await {
        return ImportOutcome::Skipped;
    }
    let Ok(bytes) = read_and_validate(&path).await else {
        warn!(
            path = %path.display(),
            "watched folder: parse failed after retries; renaming to .malformed"
        );
        rename_with_extension(&path, "malformed").await;
        return ImportOutcome::Malformed;
    };
    let params = AddTorrentParams::bytes(bytes);
    match session.add_torrent(params).await {
        Ok(info_hash) => {
            info!(
                path = %path.display(),
                hash = %info_hash.to_hex(),
                "watched folder: torrent added"
            );
            if delete_after_add
                && let Err(e) = tokio::fs::remove_file(&path).await
            {
                warn!(
                    path = %path.display(),
                    error = %e,
                    "watched folder: delete_after_add failed (torrent already accepted)"
                );
            }
            ImportOutcome::Added
        }
        Err(crate::Error::DuplicateTorrent(hash)) => {
            info!(
                path = %path.display(),
                hash = %hash.to_hex(),
                "watched folder: torrent already in session (duplicate)"
            );
            if delete_after_add {
                if let Err(e) = tokio::fs::remove_file(&path).await {
                    warn!(
                        path = %path.display(),
                        error = %e,
                        "watched folder: failed to delete duplicate"
                    );
                }
            } else {
                rename_with_extension(&path, "duplicate").await;
            }
            ImportOutcome::Duplicate
        }
        Err(e) => {
            warn!(
                path = %path.display(),
                error = %e,
                "watched folder: add_torrent failed; file left in place"
            );
            ImportOutcome::Skipped
        }
    }
}

/// One-time directory walk (F2) — every `.torrent` file currently in
/// the folder is enqueued through the live-event channel as if a
/// filesystem event had just fired. Idempotent: if the same file fires
/// later via inotify the per-info-hash dedup happens at engine level.
async fn initial_scan(root: &Path, queue: &mpsc::UnboundedSender<PathBuf>) {
    let mut dir = match tokio::fs::read_dir(root).await {
        Ok(d) => d,
        Err(e) => {
            warn!(
                root = %root.display(),
                error = %e,
                "watched folder: initial scan read_dir failed"
            );
            return;
        }
    };
    while let Ok(Some(entry)) = dir.next_entry().await {
        let path = entry.path();
        if is_torrent_file(&path) {
            debug!(path = %path.display(), "watched folder: initial scan enqueue");
            let _ = queue.send(path);
        }
    }
}

/// Build the debouncer + start watching `root`. The 500 ms debounce
/// matches the GUI's M194 implementation and gives mid-flight writes a
/// chance to land before we read.
fn build_debouncer(
    root: &Path,
    out: mpsc::UnboundedSender<PathBuf>,
) -> Result<Debouncer<RecommendedWatcher, RecommendedCache>, notify_debouncer_full::notify::Error> {
    let mut debouncer = new_debouncer(
        Duration::from_millis(500),
        None,
        move |result: notify_debouncer_full::DebounceEventResult| {
            let Ok(events) = result else { return };
            // Dedup within this debounce window — a single mv can fire
            // both Create and Modify; we'd then queue the same path
            // twice for the async worker.
            let mut seen: HashSet<PathBuf> = HashSet::new();
            for event in events {
                match event.kind {
                    EventKind::Create(_) | EventKind::Modify(_) => {}
                    _ => continue,
                }
                for path in &event.paths {
                    if is_torrent_file(path) && seen.insert(path.clone()) {
                        let _ = out.send(path.clone());
                    }
                }
            }
        },
    )?;
    debouncer.watch(root, RecursiveMode::NonRecursive)?;
    Ok(debouncer)
}

/// Spawn the engine-side watched-folder dispatcher task.
///
/// The watcher subscribes to:
/// 1. `watched_folder_changed.notified()` — fires when
///    `handle_apply_settings` detects a delta on `watched_folder` OR
///    `delete_torrent_after_add`. Triggers a full debouncer rebuild
///    plus initial-scan.
/// 2. `shutdown_rx` — single-shot; when the receiver resolves (sender
///    sent OR sender was dropped) the watcher tears down cleanly.
/// 3. Internal mpsc fed by the debouncer callback.
#[must_use]
pub fn spawn_watched_folder_dispatcher(
    session: SessionHandle,
    settings_rx: watch::Receiver<Settings>,
    watched_folder_changed: Arc<Notify>,
    shutdown_rx: oneshot::Receiver<()>,
) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move {
        let (event_tx, mut event_rx) = mpsc::unbounded_channel::<PathBuf>();
        let semaphore = Arc::new(Semaphore::new(IMPORT_CONCURRENCY));
        let mut debouncer: Option<Debouncer<RecommendedWatcher, RecommendedCache>> = None;
        let mut canonical_root: Option<PathBuf> = None;
        let mut shutdown_rx = shutdown_rx;

        // Build initial debouncer from the current settings snapshot.
        rebuild_debouncer(
            &settings_rx,
            &event_tx,
            &mut debouncer,
            &mut canonical_root,
        )
        .await;

        loop {
            tokio::select! {
                _ = &mut shutdown_rx => {
                    debug!("watched folder: shutdown signal");
                    // G3: explicit drop before the function returns —
                    // dropping inside the loop is functionally
                    // equivalent but explicit makes the intent
                    // unambiguous for future readers.
                    drop(debouncer.take());
                    break;
                }
                () = watched_folder_changed.notified() => {
                    rebuild_debouncer(
                        &settings_rx,
                        &event_tx,
                        &mut debouncer,
                        &mut canonical_root,
                    )
                    .await;
                }
                Some(path) = event_rx.recv() => {
                    let Some(root) = canonical_root.clone() else {
                        // The watcher fired but the live-swap raced us
                        // and disabled the folder. Drop the event.
                        continue;
                    };
                    let delete_after_add = settings_rx
                        .borrow()
                        .delete_torrent_after_add;
                    let session_clone = session.clone();
                    let semaphore_clone = Arc::clone(&semaphore);
                    // Spawn the per-file import so a slow add_torrent
                    // (e.g. metadata fetch on a magnet derived from a
                    // small redirect torrent) doesn't block the select!
                    // loop. The semaphore caps in-flight concurrency.
                    tokio::spawn(async move {
                        let Ok(_permit) = semaphore_clone.acquire_owned().await else {
                            // Semaphore is never closed in production;
                            // this branch only fires if a test closes
                            // it mid-loop. Treat as shutdown — drop.
                            return;
                        };
                        import_one(path, root, session_clone, delete_after_add).await;
                    });
                }
            }
        }
    })
}

/// Drop the old debouncer (G3) and build a new one from the latest
/// settings snapshot. Tolerates absent / bad paths — if the rebuild
/// fails the watcher stays running with no debouncer; the user can
/// fix the setting via `apply_settings` and the next notify fires this
/// path again.
async fn rebuild_debouncer(
    settings_rx: &watch::Receiver<Settings>,
    event_tx: &mpsc::UnboundedSender<PathBuf>,
    debouncer: &mut Option<Debouncer<RecommendedWatcher, RecommendedCache>>,
    canonical_root: &mut Option<PathBuf>,
) {
    // G3: drop FIRST, before touching the new path. This releases
    // inotify FDs + the debouncer's internal worker thread.
    drop(debouncer.take());
    *canonical_root = None;

    let Some(path) = settings_rx.borrow().watched_folder.clone() else {
        debug!("watched folder: setting cleared; watcher idle");
        return;
    };
    let canonical = match tokio::fs::canonicalize(&path).await {
        Ok(c) => c,
        Err(e) => {
            warn!(
                path = %path.display(),
                error = %e,
                "watched folder: canonicalize failed; watcher stays idle until next apply_settings"
            );
            return;
        }
    };
    match build_debouncer(&canonical, event_tx.clone()) {
        Ok(d) => {
            info!(path = %canonical.display(), "watched folder: watcher active");
            *debouncer = Some(d);
            *canonical_root = Some(canonical.clone());
            initial_scan(&canonical, event_tx).await;
        }
        Err(e) => {
            warn!(
                path = %canonical.display(),
                error = %e,
                "watched folder: build_debouncer failed; watcher stays idle"
            );
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    fn dummy_torrent_bytes() -> Vec<u8> {
        // Minimal-but-valid v1 .torrent assembled by `CreateTorrent` so
        // we don't smuggle raw bencode through `format!` (which would
        // be UB on non-UTF-8 SHA1 hash bytes). 16 KiB single-file piece
        // with arbitrary data; the exact contents don't matter — the
        // parser only needs the structure.
        let mut tmp = tempfile::NamedTempFile::new().unwrap();
        std::io::Write::write_all(&mut tmp, &vec![0xABu8; 16384]).unwrap();
        let result = irontide_core::CreateTorrent::new()
            .add_file(tmp.path())
            .set_name("test.bin")
            .generate()
            .unwrap();
        result.bytes
    }

    #[test]
    fn is_torrent_file_accepts_torrent_case_insensitive() {
        assert!(is_torrent_file(&PathBuf::from("a.torrent")));
        assert!(is_torrent_file(&PathBuf::from("a.TORRENT")));
        assert!(is_torrent_file(&PathBuf::from("a.Torrent")));
    }

    #[test]
    fn is_torrent_file_rejects_other_extensions() {
        // The G2 rename-to-`.duplicate` / F6 rename-to-`.malformed`
        // tactic only works because the filter naturally drops them.
        assert!(!is_torrent_file(&PathBuf::from("a.duplicate")));
        assert!(!is_torrent_file(&PathBuf::from("a.malformed")));
        assert!(!is_torrent_file(&PathBuf::from("a.txt")));
        assert!(!is_torrent_file(&PathBuf::from("torrent")));
    }

    #[tokio::test]
    async fn read_and_validate_succeeds_on_well_formed_torrent_bytes() {
        let tmp = tempfile::tempdir().unwrap();
        let path = tmp.path().join("good.torrent");
        let bytes = dummy_torrent_bytes();
        tokio::fs::write(&path, &bytes).await.unwrap();
        let parsed = read_and_validate(&path).await.expect("parse must succeed");
        assert_eq!(parsed, bytes);
    }

    #[tokio::test]
    async fn read_and_validate_rejects_empty_file_after_retries() {
        let tmp = tempfile::tempdir().unwrap();
        let path = tmp.path().join("empty.torrent");
        tokio::fs::write(&path, []).await.unwrap();
        let start = std::time::Instant::now();
        let result = read_and_validate(&path).await;
        let elapsed = start.elapsed();
        assert!(result.is_err());
        // 100+200+500 ms = 800 ms of cumulative backoff.
        assert!(
            elapsed >= Duration::from_millis(800),
            "expected ≥800ms cumulative backoff before giving up; got {elapsed:?}"
        );
    }

    #[tokio::test]
    async fn read_and_validate_rejects_non_bencode_content() {
        let tmp = tempfile::tempdir().unwrap();
        let path = tmp.path().join("garbage.torrent");
        tokio::fs::write(&path, b"this is not bencode")
            .await
            .unwrap();
        let result = read_and_validate(&path).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn validate_sandboxed_path_rejects_symlink_inside_root() {
        // F7 / G1: symlinks (even pointing to files inside the same
        // root) must be rejected, because an attacker who controls
        // the watched-folder DAC could plant a symlink → /etc/anything.
        let tmp = tempfile::tempdir().unwrap();
        let canonical_root = tokio::fs::canonicalize(tmp.path()).await.unwrap();
        let real = canonical_root.join("real.torrent");
        tokio::fs::write(&real, dummy_torrent_bytes()).await.unwrap();
        let link = canonical_root.join("link.torrent");
        #[cfg(unix)]
        std::os::unix::fs::symlink(&real, &link).unwrap();
        #[cfg(not(unix))]
        {
            // Windows: skip on non-Unix targets — symlink creation
            // needs admin in default configurations. The F7 regression
            // is Linux/macOS-relevant only.
            return;
        }
        assert!(!validate_sandboxed_path(&link, &canonical_root).await);
        // Real file still passes.
        assert!(validate_sandboxed_path(&real, &canonical_root).await);
    }

    #[tokio::test]
    async fn validate_sandboxed_path_rejects_missing_file() {
        let tmp = tempfile::tempdir().unwrap();
        let canonical_root = tokio::fs::canonicalize(tmp.path()).await.unwrap();
        let missing = canonical_root.join("ghost.torrent");
        assert!(!validate_sandboxed_path(&missing, &canonical_root).await);
    }

    #[tokio::test]
    async fn validate_sandboxed_path_rejects_directory() {
        let tmp = tempfile::tempdir().unwrap();
        let canonical_root = tokio::fs::canonicalize(tmp.path()).await.unwrap();
        let subdir = canonical_root.join("subdir");
        tokio::fs::create_dir(&subdir).await.unwrap();
        assert!(!validate_sandboxed_path(&subdir, &canonical_root).await);
    }

    #[tokio::test]
    async fn rename_with_extension_swaps_suffix() {
        let tmp = tempfile::tempdir().unwrap();
        let original = tmp.path().join("foo.torrent");
        tokio::fs::write(&original, b"hello").await.unwrap();
        rename_with_extension(&original, "duplicate").await;
        assert!(!original.exists());
        assert!(tmp.path().join("foo.duplicate").exists());
    }

    /// End-to-end watcher pipeline using a real `SessionHandle`. Drops
    /// a valid `.torrent` in the watched dir BEFORE the watcher starts
    /// → F2 initial scan must pick it up.
    #[tokio::test]
    async fn watcher_initial_scan_picks_up_preexisting_file() {
        use std::time::Instant;

        let tmp = tempfile::tempdir().unwrap();
        let watched = tmp.path().join("watched");
        tokio::fs::create_dir(&watched).await.unwrap();
        let download = tmp.path().join("download");
        tokio::fs::create_dir(&download).await.unwrap();
        let resume = tmp.path().join("resume");
        tokio::fs::create_dir(&resume).await.unwrap();

        // Drop the file BEFORE spawning the watcher (F2 invariant).
        let path = watched.join("preexisting.torrent");
        tokio::fs::write(&path, dummy_torrent_bytes()).await.unwrap();

        let settings = Settings {
            download_dir: download.clone(),
            resume_data_dir: Some(resume),
            watched_folder: Some(watched.clone()),
            ..Default::default()
        };
        let session = SessionHandle::start(settings.clone()).await.unwrap();

        let (settings_tx, settings_rx) = watch::channel(settings);
        let watched_changed = Arc::new(Notify::new());
        let (shutdown_tx, shutdown_rx) = oneshot::channel();
        let _join = spawn_watched_folder_dispatcher(
            session.clone(),
            settings_rx,
            Arc::clone(&watched_changed),
            shutdown_rx,
        );

        // Exponential-backoff poll for the torrent appearing in session.
        let deadline = Instant::now() + Duration::from_secs(5);
        let mut backoff = Duration::from_millis(50);
        let mut found = false;
        while Instant::now() < deadline {
            let count = session.list_torrents().await.unwrap().len();
            if count >= 1 {
                found = true;
                break;
            }
            tokio::time::sleep(backoff).await;
            backoff = (backoff * 2).min(Duration::from_millis(500));
        }
        let _ = settings_tx;
        let _ = shutdown_tx.send(());
        session.shutdown().await.unwrap();
        assert!(
            found,
            "F2 initial scan must import the preexisting .torrent within 5s"
        );
    }
}