Skip to main content

ts_runtime/
taildrop.rs

1//! Taildrop file store — the receiving half of Tailscale's peer-to-peer file transfer.
2//!
3//! A peer sends a file to this node via the peerAPI route `PUT /v0/put/<name>` (handled in
4//! `peerapi`). This module owns the on-disk store those puts land in, faithfully mirroring
5//! Go's `taildrop.manager`:
6//!
7//! - Incoming bytes are written to a per-transfer **partial** file (`<base>.partial`) so an
8//!   interrupted transfer never exposes a truncated file under its real name, and can be resumed
9//!   from an offset.
10//! - On successful completion the partial is **atomically renamed** to the final base name. If the
11//!   final name already exists, a non-clobbering ` (n)` suffix is chosen (Go `nextFilename`).
12//! - File names are strictly validated ([`validate_base_name`](crate::taildrop::validate_base_name)) to defeat path traversal and
13//!   reserved-suffix abuse before any path is constructed — this is the security boundary.
14//!
15//! # Anti-abuse / safety
16//!
//! Every name is validated to be a single, local, non-traversing path component before it touches
//! the filesystem: a name that is exactly `.` or `..`, contains `/`, `\`, a NUL, or any other
//! control character, or ends in the reserved `.partial` / `.deleted` suffixes is rejected with
//! [`TaildropError::InvalidFileName`](crate::taildrop::TaildropError::InvalidFileName). The store
20//! root is fixed at construction; all I/O is confined to it by joining only validated base names.
21
22use std::{
23    collections::HashSet,
24    io::{self, Seek, Write},
25    path::{Path, PathBuf},
26    sync::{Arc, Mutex},
27    time::{Duration, SystemTime},
28};
29
30use tokio::io::{AsyncRead, AsyncReadExt};
31
/// Grace period before the reaper removes an abandoned `.partial` file: one hour, matching Go's
/// `feature/taildrop/delete.go` `deleteDelay`. A partial whose transfer is still running is never
/// reaped (its base name is claimed in-flight), and every write bumps its mtime, so a transfer that
/// is resumed inside this window keeps its partial; only leftovers untouched for the full delay are
/// reclaimed.
pub const DELETE_DELAY: Duration = Duration::from_secs(3_600);
37
/// Marker suffix for an in-progress transfer (Go `partialSuffix`). A finished transfer is
/// published by renaming off this suffix, and no base name may itself end with it.
const PARTIAL_SUFFIX: &str = ".partial";
/// Tombstone suffix Go (`deletedSuffix`) uses for files awaiting deletion on platforms where close
/// is asynchronous. It is refused as a base name for parity, so a sender can never create one.
const DELETED_SUFFIX: &str = ".deleted";
/// Largest accepted base name, in bytes (Go `validateBaseName` also caps at 255).
const MAX_BASE_NAME_LEN: usize = 0xFF;
46
/// Failure modes of the Taildrop file store.
#[derive(Debug)]
pub enum TaildropError {
    /// The file name failed validation (traversal attempt, reserved suffix, empty, over-long, or
    /// disallowed characters). The peerAPI answers this with `400 Bad Request`.
    InvalidFileName,
    /// Another transfer already owns this exact base name. The peerAPI answers this with
    /// `409 Conflict`.
    FileExists,
    /// A filesystem operation failed. The peerAPI answers this with `500`.
    Io(io::Error),
}

impl core::fmt::Display for TaildropError {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let message = match self {
            Self::InvalidFileName => "invalid taildrop file name",
            Self::FileExists => "a transfer for this file is already in progress",
            // The only variant whose text depends on a payload.
            Self::Io(err) => return write!(f, "taildrop I/O error: {err}"),
        };
        f.write_str(message)
    }
}

impl std::error::Error for TaildropError {
    /// Exposes the wrapped [`io::Error`] for the `Io` variant; the other variants have no cause.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Self::Io(err) => Some(err),
            Self::InvalidFileName | Self::FileExists => None,
        }
    }
}

impl From<io::Error> for TaildropError {
    /// Wrap a raw I/O failure so `?` can propagate it from store operations.
    fn from(err: io::Error) -> Self {
        Self::Io(err)
    }
}
85
/// A fully-received Taildrop file that is waiting for the embedder to collect it. Counterpart of
/// Go's `apitype.WaitingFile`, whose default JSON marshalling uses the field names `Name` and
/// `Size`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WaitingFile {
    /// Base name of the file inside the store root.
    pub name: String,
    /// Length of the file, in bytes.
    pub size: u64,
}
95
96/// Validate a Taildrop base name, mirroring Go `taildrop.validateBaseName`.
97///
98/// Returns the name unchanged when it is a safe, single, local path component; otherwise `None`.
99/// Rejection rules (any one fails): empty or `> 255` bytes; leading/trailing ASCII space; contains
100/// a path separator (`/` or `\`), a NUL, or an ASCII control char; is `.` or `..`; equals a cleaned
101/// path other than itself (catches embedded `..`/`.` segments and absolute paths); or ends in the
102/// reserved `.partial` / `.deleted` suffixes.
103pub fn validate_base_name(name: &str) -> Option<&str> {
104    if name.is_empty() || name.len() > MAX_BASE_NAME_LEN {
105        return None;
106    }
107    if name.starts_with(' ') || name.ends_with(' ') {
108        return None;
109    }
110    if name == "." || name == ".." {
111        return None;
112    }
113    if name.ends_with(PARTIAL_SUFFIX) || name.ends_with(DELETED_SUFFIX) {
114        return None;
115    }
116    // Reject any separator, NUL, or control character outright. This is the core traversal guard:
117    // with no `/`, `\`, or `..` segment possible, the name can only ever be a leaf in the store dir.
118    for ch in name.chars() {
119        if ch == '/' || ch == '\\' || ch == '\0' || ch.is_control() {
120            return None;
121        }
122    }
123    // Defense in depth: a name that does not survive `Path` normalization as a single normal
124    // component is rejected (catches `..`, absolute paths, and any platform-specific oddity).
125    let p = Path::new(name);
126    let mut comps = p.components();
127    match (comps.next(), comps.next()) {
128        (Some(std::path::Component::Normal(c)), None) if c == name => Some(name),
129        _ => None,
130    }
131}
132
/// How many ` (n)` candidates [`next_available_name`] probes sequentially before switching to a
/// time-derived suffix. This keeps the common path predictable (`foo (1).txt`, `foo (2).txt`, ...)
/// while bounding the linear scan in a directory that already holds thousands of same-named files.
const NEXT_NAME_SEQUENTIAL_LIMIT: u32 = 10_000;

/// Choose a non-clobbering final name for `base` within `dir`, mirroring Go `nextFilename`:
/// `foo.txt` -> `foo (1).txt` -> `foo (2).txt` ... inserting ` (n)` before the extension. A name
/// whose only dot is leading (a dotfile such as `.bashrc`) has no extension and becomes
/// `.bashrc (1)`. Returns the first candidate (incl. `base` itself) for which [`path_present`] is
/// false.
///
/// Once [`NEXT_NAME_SEQUENTIAL_LIMIT`] numbered candidates are all taken, the search falls back to
/// a suffix derived from the current time in nanoseconds. That fallback is still probed for
/// presence, so it can never hand back an occupied name. An unchecked fixed fallback would be
/// renamed over, clobbering a previously received file, on every later overflow.
///
/// The returned name is only known to be free at probe time; a concurrent creator can still take
/// it before the caller publishes.
fn next_available_name(dir: &Path, base: &str) -> String {
    next_available_name_bounded(dir, base, NEXT_NAME_SEQUENTIAL_LIMIT)
}

/// [`next_available_name`] with the number of sequential ` (n)` probes as a parameter.
fn next_available_name_bounded(dir: &Path, base: &str, sequential_limit: u32) -> String {
    if !path_present(&dir.join(base)) {
        return base.to_string();
    }
    let (stem, ext) = match base.rsplit_once('.') {
        // Keep the dot with the extension; an empty stem (dotfile like ".bashrc") has no split.
        Some((stem, ext)) if !stem.is_empty() => (stem, format!(".{ext}")),
        _ => (base, String::new()),
    };
    let is_free = |candidate: &str| !path_present(&dir.join(candidate));

    if let Some(candidate) = (1..=sequential_limit)
        .map(|n| format!("{stem} ({n}){ext}"))
        .find(|candidate| is_free(candidate.as_str()))
    {
        return candidate;
    }

    // Pathological directory: every numbered slot is taken. Jump to a clock-derived suffix (a
    // collision is vanishingly unlikely) and keep probing until a free name turns up. Each
    // iteration tests a distinct name and a directory holds finitely many entries, so this
    // terminates. A failing `stat` counts as "free" and also ends the loop.
    let mut nonce = SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_nanos());
    loop {
        let candidate = format!("{stem} ({nonce}){ext}");
        if is_free(candidate.as_str()) {
            return candidate;
        }
        nonce = nonce.wrapping_add(1);
    }
}

/// Whether a path is present, treating a symlink (even a dangling one) as present. Unlike
/// `Path::exists()` (which follows the link and returns `false` for a dangling symlink), this uses
/// `symlink_metadata` so a planted symlink can never be mistaken for a free name in
/// [`next_available_name`]; we must not select, then rename onto, a symlink. Any `stat` error
/// (not only `NotFound`) reports the path as absent.
fn path_present(path: &Path) -> bool {
    std::fs::symlink_metadata(path).is_ok()
}
163
164/// Reject a path that is (or whose final component is) a symlink, mirroring the intent of Go's
165/// `O_NOFOLLOW` on the taildrop file ops. `validate_base_name` already blocks a traversing *name*,
166/// but not a symlink **component already planted in the store root** by a local attacker (e.g.
167/// `root/foo.txt -> /etc/cron.d/x`), which a plain `open`/`rename`/`remove` would follow. Uses
168/// `symlink_metadata` (lstat — does NOT follow the final symlink); a non-existent path is fine
169/// (returns `Ok(())`), only an existing symlink is refused.
170///
171/// This is a check-then-act guard, so it is not atomic with the open/rename/remove that follows.
172/// It kills the **persistent-plant** attack (a symlink left in the store root is no longer followed
173/// deterministically), and the per-name in-flight lock serializes our OWN operations on a name, but
174/// it does not close a sub-millisecond race where an external process swaps the path for a symlink
175/// between this lstat and the syscall. That residual requires an external writer who already holds
176/// store-dir write access (the threat bound for this hardening), and is the one axis where this is
177/// weaker than Go's atomic `O_NOFOLLOW`. The `offset == 0` put is additionally protected by
178/// `create_new` (`O_EXCL`, which refuses an existing symlink atomically); the `offset > 0` resume
179/// open is plain `write(true)` and relies on this advisory check until `O_NOFOLLOW` is wired
180/// (tracked as a follow-up — its raw value is arch-dependent, so a portable open flag is deferred).
181fn refuse_symlink(path: &Path) -> Result<(), TaildropError> {
182    match std::fs::symlink_metadata(path) {
183        Ok(meta) if meta.file_type().is_symlink() => Err(TaildropError::Io(io::Error::new(
184            io::ErrorKind::InvalidInput,
185            "taildrop path is a symlink; refusing to follow it",
186        ))),
187        Ok(_) => Ok(()),
188        // Not present yet (the common case for a fresh partial / final name) — nothing to refuse.
189        Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(()),
190        Err(e) => Err(e.into()),
191    }
192}
193
/// Taildrop file store confined to one fixed root directory: every path it touches is `root` joined
/// with a name that passed [`validate_base_name`].
#[derive(Debug, Clone)]
pub struct TaildropStore {
    /// Directory holding both `.partial` and completed files.
    root: PathBuf,
    /// Base names whose transfer is currently running. `put_file` claims its base name here for
    /// the entire receive, whether fresh (`offset == 0`) or resumed (`offset > 0`). This keeps two
    /// concurrent PUTs of one name from interleaving `set_len`/`seek`/`write_all` on the shared
    /// `.partial`.
    ///
    /// The set sits behind an `Arc` so that `TaildropStore::clone()` shares it instead of forking
    /// it. The store is normally passed around as `Arc<TaildropStore>`, but a plain clone must still
    /// see the same claims.
    in_flight: Arc<Mutex<HashSet<String>>>,
}
206
/// A claim on one in-flight transfer name, released automatically when the value is dropped.
/// Because it owns a handle to the shared claim set, the name is freed on every exit path of
/// `put_file`: normal return, early `?` return, or unwinding panic.
struct InFlightGuard {
    /// Shared set of claimed names (the same `Arc` the store holds).
    set: Arc<Mutex<HashSet<String>>>,
    /// The name this guard claimed and will release.
    name: String,
}

impl Drop for InFlightGuard {
    fn drop(&mut self) {
        // Recover the set even from a poisoned lock: skipping the removal would leave the name
        // claimed forever, so every later transfer of it would hit a phantom conflict.
        let mut claimed = match self.set.lock() {
            Ok(guard) => guard,
            Err(poisoned) => poisoned.into_inner(),
        };
        claimed.remove(self.name.as_str());
    }
}
223
224impl TaildropStore {
225    /// Create a store rooted at `root`, creating the directory (and parents) if needed.
226    pub fn new(root: impl Into<PathBuf>) -> Result<Self, TaildropError> {
227        let root = root.into();
228        std::fs::create_dir_all(&root)?;
229        Ok(Self {
230            root,
231            in_flight: Arc::new(Mutex::new(HashSet::new())),
232        })
233    }
234
235    /// Claim `base` as in-flight, returning an RAII guard that frees it on drop. Returns
236    /// [`TaildropError::FileExists`] if another transfer already holds the name — this is the
237    /// concurrency analog of the on-disk `.partial` conflict, and it serializes all transfers of one
238    /// name so a resume (`offset > 0`) cannot race a concurrent transfer's `set_len`/`seek`/`write`.
239    fn claim_in_flight(&self, base: &str) -> Result<InFlightGuard, TaildropError> {
240        let name = base.to_string();
241        let mut set = self.in_flight.lock().unwrap_or_else(|p| p.into_inner());
242        if !set.insert(name.clone()) {
243            return Err(TaildropError::FileExists);
244        }
245        Ok(InFlightGuard {
246            set: self.in_flight.clone(),
247            name,
248        })
249    }
250
251    /// The partial-file path for an already-validated base name.
252    fn partial_path(&self, base: &str) -> PathBuf {
253        self.root.join(format!("{base}{PARTIAL_SUFFIX}"))
254    }
255
256    /// Reap abandoned `.partial` files: delete every `<base>.partial` in the store root whose last
257    /// modification is older than `delete_delay` relative to `now` AND whose base name has no
258    /// in-flight transfer. Returns the number deleted. Mirrors Go `feature/taildrop/delete.go`'s
259    /// `fileDeleter`, which GCs a partial `deleteDelay` (1h) after it was last touched, sparing one
260    /// that an active put is still writing.
261    ///
262    /// This fork has no per-file timer queue (the store is a passive `Arc`, not an actor); instead a
263    /// periodic background sweep — see [`spawn_partial_reaper`] — calls this. The two Go cancellation
264    /// signals are both honored: an **active** transfer's base name is in `in_flight` (skipped here,
265    /// the analog of Go's "no active put" check), and a **resumed** transfer advances the partial's
266    /// mtime on every write (so a partial resumed within the window looks recent and is spared). A
267    /// permanently-abandoned partial is neither, so it ages out and is deleted, reclaiming the disk
268    /// and clearing the stale-partial `409` that would otherwise block an `offset == 0` re-send of
269    /// the same name forever.
270    ///
271    /// `now` and `delete_delay` are parameters (not read from the clock here) so the reap logic is
272    /// deterministically testable. A partial whose mtime is unreadable or in the future is treated as
273    /// fresh (kept) — fail-safe toward never deleting a file that might still be live.
274    pub fn reap_abandoned_partials(&self, now: SystemTime, delete_delay: Duration) -> usize {
275        let entries = match std::fs::read_dir(&self.root) {
276            Ok(e) => e,
277            Err(e) if e.kind() == io::ErrorKind::NotFound => return 0,
278            Err(e) => {
279                tracing::warn!(error = %e, "taildrop reaper: cannot read store dir");
280                return 0;
281            }
282        };
283        // Snapshot the in-flight names once; an active transfer must never have its partial reaped.
284        let in_flight: HashSet<String> = self
285            .in_flight
286            .lock()
287            .unwrap_or_else(|p| p.into_inner())
288            .clone();
289
290        let mut deleted = 0usize;
291        for entry in entries.flatten() {
292            // `metadata()` here is lstat-based (does not follow symlinks); a symlink is never a
293            // partial we created, so `is_file()` is false for it and it is skipped — consistent with
294            // the symlink refusal elsewhere in this store.
295            let Ok(meta) = entry.metadata() else { continue };
296            if !meta.is_file() {
297                continue;
298            }
299            let Ok(name) = entry.file_name().into_string() else {
300                continue;
301            };
302            let Some(base) = name.strip_suffix(PARTIAL_SUFFIX) else {
303                continue; // not a partial
304            };
305            if in_flight.contains(base) {
306                continue; // an active transfer owns this partial — never reap it (Go "no active put")
307            }
308            // Age check: keep anything modified within `delete_delay` of `now`. An unreadable or
309            // future mtime is treated as fresh (kept) — fail-safe toward not deleting a live file.
310            let age_ok = meta
311                .modified()
312                .ok()
313                .and_then(|m| now.duration_since(m).ok())
314                .is_some_and(|age| age >= delete_delay);
315            if !age_ok {
316                continue;
317            }
318            // Final guard against the snapshot TOCTOU: re-check in-flight membership under the lock
319            // immediately before deleting, so a transfer that claimed this base AFTER the upfront
320            // snapshot (the microsecond window between the snapshot and here) still spares its
321            // partial. The mtime check above already makes a wrong delete unreachable in practice (a
322            // live partial is < delete_delay old), but this closes the window completely and cheaply
323            // (one lock per aged candidate, which is rare).
324            if self
325                .in_flight
326                .lock()
327                .unwrap_or_else(|p| p.into_inner())
328                .contains(base)
329            {
330                continue;
331            }
332            match std::fs::remove_file(entry.path()) {
333                Ok(()) => {
334                    deleted += 1;
335                    tracing::info!(partial = %name, "taildrop reaper: deleted abandoned partial");
336                }
337                Err(e) if e.kind() == io::ErrorKind::NotFound => {} // already gone, fine
338                Err(e) => {
339                    tracing::warn!(error = %e, partial = %name, "taildrop reaper: delete failed")
340                }
341            }
342        }
343        deleted
344    }
345
346    /// Receive a file named `name` from `reader`, writing to `<name>.partial` then atomically
347    /// renaming to a non-clobbering final name on success. Mirrors Go `manager.PutFile`.
348    ///
349    /// `offset` lets a resumed transfer append past already-written bytes (the partial is opened, the
350    /// write starts at `offset`, and any bytes already on disk past `offset` are truncated away).
351    /// `expected_len` is the declared total length of the completed file (the request's
352    /// `Content-Length` plus `offset`); the transfer is finalized only if exactly that many bytes are
353    /// present. Returns the total number of bytes in the completed file.
354    ///
355    /// Fail-closed: an invalid name is rejected before any path is built; an in-progress partial for
356    /// the same name yields [`TaildropError::FileExists`]; an out-of-range resume `offset` (past the
357    /// current partial length) is rejected; an I/O error mid-transfer — or a body that ends before
358    /// `expected_len` (a short/interrupted stream) — leaves the `.partial` on disk and the final name
359    /// is never created. This matches Go `feature/taildrop/send.go`, which errors when the copied
360    /// length does not equal the declared length rather than publishing a truncated file.
361    ///
362    /// The retained `.partial` is resumable only by a peer that issues a ranged retry (an `offset > 0`
363    /// PUT); a sender that always restarts at `offset == 0` will instead hit the in-progress-conflict
364    /// path ([`TaildropError::FileExists`]) until the stale partial is cleared. A permanently-abandoned
365    /// partial is reclaimed by the background reaper after [`DELETE_DELAY`] (Go's `fileDeleter`
366    /// equivalent — see [`reap_abandoned_partials`](Self::reap_abandoned_partials) /
367    /// [`spawn_partial_reaper`]), which also clears that `offset == 0` conflict once the stale partial
368    /// ages out.
369    pub async fn put_file<R>(
370        &self,
371        name: &str,
372        mut reader: R,
373        offset: u64,
374        expected_len: u64,
375    ) -> Result<u64, TaildropError>
376    where
377        R: AsyncRead + Unpin,
378    {
379        let base = validate_base_name(name).ok_or(TaildropError::InvalidFileName)?;
380        let partial = self.partial_path(base);
381
382        // Claim the name for the whole transfer FIRST, so two concurrent PUTs for the same base name
383        // (especially two resumes, `offset > 0`, which reopen the same `.partial`) cannot interleave
384        // their `set_len`/`seek`/`write_all` and corrupt the shared partial. The fresh-transfer path
385        // is already protected on disk by `create_new`, but the resume path opens with plain
386        // `write(true)` and needs this lock. The guard frees the name on drop (incl. early return /
387        // error / panic). Held across the await — it is a cheap `HashSet` membership marker, not a
388        // lock held during I/O, so it never blocks the runtime.
389        let _claim = self.claim_in_flight(base)?;
390
391        // Refuse to follow a symlink planted in the store root (Go's `O_NOFOLLOW` intent): the
392        // partial must be a regular file we create/own, never a pre-existing symlink to elsewhere.
393        refuse_symlink(&partial)?;
394
395        // A fresh transfer (offset 0) must not collide with another in-flight transfer of the same
396        // name; a resume (offset > 0) reopens the existing partial. File handles are std (the tokio
397        // `fs` feature is intentionally not enabled in this crate); the body is read async off the
398        // overlay stream and written to the blocking handle in a bounded loop.
399        let mut file = if offset == 0 {
400            match std::fs::OpenOptions::new()
401                .write(true)
402                .create_new(true)
403                .open(&partial)
404            {
405                Ok(f) => f,
406                Err(e) if e.kind() == io::ErrorKind::AlreadyExists => {
407                    return Err(TaildropError::FileExists);
408                }
409                Err(e) => return Err(e.into()),
410            }
411        } else {
412            let mut f = std::fs::OpenOptions::new().write(true).open(&partial)?;
413            // Bound the resume offset to the current partial length and truncate any bytes past it,
414            // matching Go `feature/taildrop/fileops_fs.go` (`OpenWriter` rejects `offset > curr` and
415            // `Truncate(offset)`s). Without the bound a too-large offset would leave a zero-filled
416            // sparse hole; without the truncate a shorter resumed body would leave a prior attempt's
417            // stale tail past the new end. `metadata().len()` is the partial's current size.
418            let current = f.metadata()?.len();
419            if offset > current {
420                return Err(TaildropError::Io(io::Error::new(
421                    io::ErrorKind::InvalidInput,
422                    "taildrop resume offset is past the end of the partial file",
423                )));
424            }
425            f.set_len(offset)?;
426            f.seek(io::SeekFrom::Start(offset))?;
427            f
428        };
429
430        let mut copied: u64 = 0;
431        let mut buf = [0u8; 64 * 1024];
432        loop {
433            let n = reader.read(&mut buf).await?;
434            if n == 0 {
435                break;
436            }
437            // Each `write_all` only pushes the chunk into the page cache (microseconds); the
438            // genuinely blocking cost is the terminal `flush`/`sync_all`/`rename` below, which we
439            // hand to a blocking thread so a flood of concurrent transfers can't starve the tokio
440            // worker pool on fsync (see `peerapi::MAX_INFLIGHT`).
441            file.write_all(&buf[..n])?;
442            copied += n as u64;
443        }
444
445        // Length check (Go `send.go`: error when `copyLength != length`). A body that ended before the
446        // declared length — an interrupted/short stream — must NOT be finalized as a complete file;
447        // leave the `.partial` on disk (with the bytes received so far) so a Range-capable peer can
448        // resume it. `checked_add` rather than a bare `+`: `offset` is an attacker-supplied header and
449        // the bound above already rejects an `offset` past the (real, on-disk) partial length, so this
450        // cannot overflow in practice — but treat an overflow as a length mismatch rather than a panic.
451        let total = match offset.checked_add(copied) {
452            Some(t) if t == expected_len => t,
453            _ => {
454                return Err(TaildropError::Io(io::Error::new(
455                    io::ErrorKind::UnexpectedEof,
456                    format!(
457                        "taildrop body ended early: got {copied} of {expected_len} expected bytes \
458                         at offset {offset}; leaving partial for resume"
459                    ),
460                )));
461            }
462        };
463
464        // Finalize off the async runtime: `sync_all` (fsync) and `rename` are the dominant blocking
465        // operations, so run them on a blocking thread. The `File` and both paths are owned by the
466        // closure (`Send + 'static`), and `next_available_name` (which `stat`s candidates) goes with
467        // them. Fail-closed: any I/O error — or a join failure — propagates without ever publishing
468        // the final name, leaving the `.partial` in place for a later resume.
469        let root = self.root.clone();
470        let base = base.to_string();
471        tokio::task::spawn_blocking(move || -> io::Result<()> {
472            file.flush()?;
473            file.sync_all()?;
474            drop(file);
475
476            // Atomically publish under a non-clobbering final name. `next_available_name` probes
477            // candidates with `symlink_metadata` (not `exists`, which follows symlinks), so it will
478            // not treat a planted symlink as "free" and rename onto it; and we refuse to rename onto
479            // an existing symlink target outright (Go `O_NOFOLLOW` intent). The `_claim` guard (held
480            // by the caller for the whole transfer) keeps this name serialized against other PUTs.
481            let final_name = next_available_name(&root, &base);
482            let final_path = root.join(&final_name);
483            if let Err(e) = refuse_symlink(&final_path) {
484                return Err(match e {
485                    TaildropError::Io(io_err) => io_err,
486                    other => io::Error::other(other.to_string()),
487                });
488            }
489            std::fs::rename(&partial, &final_path)?;
490            Ok(())
491        })
492        .await
493        .map_err(|join_err| {
494            // A panicked/cancelled finalize task: surface as I/O so the caller maps it to a 500 and
495            // the partial is left untouched (never publishes the final name).
496            TaildropError::Io(io::Error::other(format!(
497                "taildrop finalize task failed: {join_err}"
498            )))
499        })??;
500
501        Ok(total)
502    }
503
504    /// List fully-received (non-partial) files, sorted by name (Go `WaitingFiles`).
505    pub fn waiting_files(&self) -> Result<Vec<WaitingFile>, TaildropError> {
506        let mut out = Vec::new();
507        let entries = match std::fs::read_dir(&self.root) {
508            Ok(e) => e,
509            Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(out),
510            Err(e) => return Err(e.into()),
511        };
512        for entry in entries {
513            let entry = entry?;
514            // `entry.metadata()` does NOT follow symlinks (it is `lstat`-based), so a planted
515            // symlink has `is_file() == false` here and is skipped — a symlink in the store root is
516            // never reported as a waiting file (Go `O_NOFOLLOW` intent), even one pointing at a real
517            // regular file elsewhere.
518            let meta = entry.metadata()?;
519            if meta.file_type().is_symlink() || !meta.is_file() {
520                continue;
521            }
522            let Ok(name) = entry.file_name().into_string() else {
523                continue;
524            };
525            // Skip in-progress / tombstoned files.
526            if name.ends_with(PARTIAL_SUFFIX) || name.ends_with(DELETED_SUFFIX) {
527                continue;
528            }
529            out.push(WaitingFile {
530                name,
531                size: meta.len(),
532            });
533        }
534        out.sort_by(|a, b| a.name.cmp(&b.name));
535        Ok(out)
536    }
537
538    /// Delete a fully-received file by base name (Go `DeleteFile`). The name is validated first, so a
539    /// traversal attempt can never escape the store root, and a symlink at the target is refused (Go
540    /// `O_NOFOLLOW` intent) rather than followed — a planted `root/foo.txt -> /etc/passwd` must not
541    /// let a `delete foo.txt` remove the link's target.
542    pub fn delete_file(&self, name: &str) -> Result<(), TaildropError> {
543        let base = validate_base_name(name).ok_or(TaildropError::InvalidFileName)?;
544        let path = self.root.join(base);
545        refuse_symlink(&path)?;
546        std::fs::remove_file(path)?;
547        Ok(())
548    }
549
550    /// Open a fully-received file by base name for reading, returning the handle and its size (Go
551    /// `OpenFile`). The name is validated first, and a symlink at the target is refused (Go
552    /// `O_NOFOLLOW` intent) so a planted symlink cannot redirect the read to an arbitrary file.
553    pub fn open_file(&self, name: &str) -> Result<(std::fs::File, u64), TaildropError> {
554        let base = validate_base_name(name).ok_or(TaildropError::InvalidFileName)?;
555        let path = self.root.join(base);
556        refuse_symlink(&path)?;
557        let f = std::fs::File::open(path)?;
558        let size = f.metadata()?.len();
559        Ok((f, size))
560    }
561}
562
563/// Spawn the background reaper that periodically GCs abandoned `.partial` files (Go
564/// `feature/taildrop/delete.go`'s `fileDeleter`). It sweeps every [`DELETE_DELAY`], deleting partials
565/// older than `DELETE_DELAY` that have no in-flight transfer (see
566/// [`TaildropStore::reap_abandoned_partials`]), and exits when `shutdown` flips to `true`.
567///
568/// Returns a [`JoinHandle`](tokio::task::JoinHandle) the caller should abort on drop so the task
569/// never outlives the runtime (the established `reauth_bridge` / `DerpLatencyMeasurer` pattern). An
570/// `Arc<TaildropStore>` is held (cheap clone), so the sweep sees live `in_flight` state.
571///
572/// The first sweep is deferred one full `DELETE_DELAY` (not run at startup): a partial on disk at
573/// boot is by definition at least 0s old, and Go likewise only deletes after the delay elapses, so
574/// waiting one interval avoids reaping a partial a just-restarted node might still resume.
575pub fn spawn_partial_reaper(
576    store: Arc<TaildropStore>,
577    mut shutdown: tokio::sync::watch::Receiver<bool>,
578) -> tokio::task::JoinHandle<()> {
579    tokio::spawn(async move {
580        let mut tick = tokio::time::interval(DELETE_DELAY);
581        // `interval` fires immediately on the first `tick()`; consume that so the first real sweep is
582        // one `DELETE_DELAY` out (no startup reap — a partial must age the full delay first).
583        tick.tick().await;
584        loop {
585            tokio::select! {
586                _ = tick.tick() => {
587                    let n = store.reap_abandoned_partials(SystemTime::now(), DELETE_DELAY);
588                    if n > 0 {
589                        tracing::info!(deleted = n, "taildrop reaper: swept abandoned partials");
590                    }
591                }
592                _ = shutdown.wait_for(|x| *x) => break,
593            }
594        }
595    })
596}
597
#[cfg(test)]
mod tests {
    use super::*;

    /// A unique store-root path for one test. The directory itself is not created here.
    fn tmp_root() -> PathBuf {
        // A per-call atomic counter guarantees uniqueness across tests that run concurrently in the
        // same binary. A timestamp alone is NOT enough: `SystemTime` resolution is coarse on some
        // platforms, so two tests starting in the same tick would collide on one dir and stomp each
        // other's files (the cause of intermittent taildrop-test flakiness under parallel runs).
        static COUNTER: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
        let n = COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        let mut p = std::env::temp_dir();
        p.push(format!("taildrop-test-{}-{n}", std::process::id()));
        // Tests only remove their root on success, so a run that panicked can leave this exact
        // directory behind. A later run that is assigned the same (recycled) PID would then start
        // with stale files in its store (e.g. a leftover `dup.txt` shifting the ` (n)` suffixes).
        // Clear it up front; failure (normally NotFound) is expected and ignored.
        let _ = std::fs::remove_dir_all(&p);
        p
    }

    #[test]
    fn validate_rejects_traversal_and_reserved() {
        // Valid leaf names.
        assert_eq!(validate_base_name("photo.jpg"), Some("photo.jpg"));
        assert_eq!(
            validate_base_name("a file with spaces.txt"),
            Some("a file with spaces.txt")
        );
        assert_eq!(validate_base_name(".bashrc"), Some(".bashrc"));

        // Traversal / separators.
        assert_eq!(validate_base_name("../etc/passwd"), None);
        assert_eq!(validate_base_name("a/b"), None);
        assert_eq!(validate_base_name("a\\b"), None);
        assert_eq!(validate_base_name("/abs"), None);
        assert_eq!(validate_base_name(".."), None);
        assert_eq!(validate_base_name("."), None);

        // NUL / control.
        assert_eq!(validate_base_name("a\0b"), None);
        assert_eq!(validate_base_name("a\nb"), None);

        // Reserved suffixes.
        assert_eq!(validate_base_name("x.partial"), None);
        assert_eq!(validate_base_name("x.deleted"), None);

        // Edges.
        assert_eq!(validate_base_name(""), None);
        assert_eq!(validate_base_name(" leading"), None);
        assert_eq!(validate_base_name("trailing "), None);
        assert_eq!(validate_base_name(&"a".repeat(256)), None);
        assert_eq!(
            validate_base_name(&"a".repeat(255)).map(|s| s.len()),
            Some(255)
        );
    }

    #[tokio::test]
    async fn put_file_writes_then_atomically_renames() {
        let root = tmp_root();
        let store = TaildropStore::new(&root).unwrap();

        let data = b"hello taildrop";
        let n = store
            .put_file("greeting.txt", &data[..], 0, data.len() as u64)
            .await
            .unwrap();
        assert_eq!(n, data.len() as u64);

        // The final file exists; no .partial remains.
        let body = std::fs::read(root.join("greeting.txt")).unwrap();
        assert_eq!(body, data);
        assert!(!root.join("greeting.txt.partial").exists());

        let wf = store.waiting_files().unwrap();
        assert_eq!(wf.len(), 1);
        assert_eq!(wf[0].name, "greeting.txt");
        assert_eq!(wf[0].size, data.len() as u64);

        std::fs::remove_dir_all(&root).ok();
    }

    #[tokio::test]
    async fn put_file_resumes_from_offset() {
        let root = tmp_root();
        let store = TaildropStore::new(&root).unwrap();

        // Pre-write a prefix into the `.partial` directly, simulating bytes already received by an
        // earlier (interrupted) transfer.
        let prefix = b"the first half ";
        let partial = root.join("resume.txt.partial");
        std::fs::write(&partial, prefix).unwrap();

        // Resume at offset == the prefix length: `put_file` opens the existing partial, seeks past
        // the prefix, and appends the rest.
        let rest = b"and the second half";
        let total = store
            .put_file(
                "resume.txt",
                &rest[..],
                prefix.len() as u64,
                (prefix.len() + rest.len()) as u64,
            )
            .await
            .unwrap();

        // The returned count is offset + freshly-copied bytes, and the final file is the prefix and
        // the resumed bytes concatenated (the seek positioned the write correctly).
        assert_eq!(total, (prefix.len() + rest.len()) as u64);
        let body = std::fs::read(root.join("resume.txt")).unwrap();
        let mut expected = prefix.to_vec();
        expected.extend_from_slice(rest);
        assert_eq!(body, expected);
        assert!(!partial.exists());

        std::fs::remove_dir_all(&root).ok();
    }

    #[tokio::test]
    async fn put_file_short_body_leaves_partial_not_truncated_final() {
        // F2: a body that ends before the declared length must NOT be finalized as a complete (but
        // truncated) file under the real name. Go errors when copyLength != length; we leave the
        // `.partial` in place for resume.
        let root = tmp_root();
        let store = TaildropStore::new(&root).unwrap();

        // Reader yields 5 bytes but we declare 10 expected (a short/interrupted stream).
        let err = store
            .put_file("short.txt", &b"world"[..], 0, 10)
            .await
            .unwrap_err();
        assert!(
            matches!(err, TaildropError::Io(ref e) if e.kind() == io::ErrorKind::UnexpectedEof),
            "a short body must error, got {err:?}"
        );
        // The final name was NEVER created; the partial remains with the bytes received so far.
        assert!(!root.join("short.txt").exists(), "no truncated final file");
        let partial = std::fs::read(root.join("short.txt.partial")).unwrap();
        assert_eq!(
            partial, b"world",
            "partial holds the received prefix for resume"
        );
        assert!(store.waiting_files().unwrap().is_empty());

        std::fs::remove_dir_all(&root).ok();
    }

    #[tokio::test]
    async fn put_file_resume_offset_past_end_is_rejected() {
        // F3: a resume offset beyond the current partial length must be rejected (Go errors
        // "offset out of range"), not produce a zero-filled sparse hole.
        let root = tmp_root();
        let store = TaildropStore::new(&root).unwrap();
        std::fs::write(root.join("sparse.txt.partial"), b"abc").unwrap(); // 3 bytes on disk

        let err = store
            .put_file("sparse.txt", &b"xyz"[..], 99, 102)
            .await
            .unwrap_err();
        assert!(
            matches!(err, TaildropError::Io(ref e) if e.kind() == io::ErrorKind::InvalidInput),
            "offset past end must be rejected, got {err:?}"
        );
        // The partial is untouched (still 3 bytes), no final file.
        assert_eq!(
            std::fs::read(root.join("sparse.txt.partial")).unwrap(),
            b"abc"
        );
        assert!(!root.join("sparse.txt").exists());

        std::fs::remove_dir_all(&root).ok();
    }

    #[tokio::test]
    async fn put_file_resume_truncates_stale_tail() {
        // F3: resuming at an offset LESS than the current partial length must truncate the bytes
        // past the offset (Go `Truncate(offset)`), so a stale tail from a prior attempt cannot
        // survive past the newly-written end.
        let root = tmp_root();
        let store = TaildropStore::new(&root).unwrap();
        // A prior attempt left 20 bytes; we resume at offset 5 with a 3-byte tail ⇒ final is 8 bytes.
        std::fs::write(root.join("retry.txt.partial"), b"KEEPme-STALE-TAILxxx").unwrap();

        let total = store
            .put_file("retry.txt", &b"NEW"[..], 5, 8)
            .await
            .unwrap();
        assert_eq!(total, 8);
        let body = std::fs::read(root.join("retry.txt")).unwrap();
        assert_eq!(
            body, b"KEEPmNEW",
            "bytes past offset 5 truncated, then NEW appended"
        );

        std::fs::remove_dir_all(&root).ok();
    }

    #[tokio::test]
    async fn put_file_conflict_picks_non_clobbering_name() {
        let root = tmp_root();
        let store = TaildropStore::new(&root).unwrap();

        store
            .put_file("dup.txt", &b"first"[..], 0, 5)
            .await
            .unwrap();
        store
            .put_file("dup.txt", &b"second"[..], 0, 6)
            .await
            .unwrap();
        store
            .put_file("dup.txt", &b"third"[..], 0, 5)
            .await
            .unwrap();

        // Original plus two non-clobbering renames.
        assert!(root.join("dup.txt").exists());
        assert!(root.join("dup (1).txt").exists());
        assert!(root.join("dup (2).txt").exists());

        let wf = store.waiting_files().unwrap();
        assert_eq!(wf.len(), 3);

        std::fs::remove_dir_all(&root).ok();
    }

    #[tokio::test]
    async fn put_file_in_progress_partial_is_conflict() {
        let root = tmp_root();
        let store = TaildropStore::new(&root).unwrap();

        // Simulate an in-flight transfer by pre-creating the .partial file; a fresh (offset 0) put
        // for the same name must then be refused.
        std::fs::write(root.join("busy.txt.partial"), b"partial").unwrap();

        let err = store
            .put_file("busy.txt", &b"x"[..], 0, 1)
            .await
            .unwrap_err();
        assert!(matches!(err, TaildropError::FileExists));

        std::fs::remove_dir_all(&root).ok();
    }

    #[tokio::test]
    async fn put_file_rejects_bad_name_before_any_io() {
        let root = tmp_root();
        let store = TaildropStore::new(&root).unwrap();

        let err = store
            .put_file("../escape", &b"x"[..], 0, 1)
            .await
            .unwrap_err();
        assert!(matches!(err, TaildropError::InvalidFileName));
        // Nothing was written anywhere.
        assert!(store.waiting_files().unwrap().is_empty());

        std::fs::remove_dir_all(&root).ok();
    }

    #[tokio::test]
    async fn delete_and_open_roundtrip() {
        let root = tmp_root();
        let store = TaildropStore::new(&root).unwrap();

        store.put_file("doc.bin", &b"abc"[..], 0, 3).await.unwrap();
        let (_f, size) = store.open_file("doc.bin").unwrap();
        assert_eq!(size, 3);

        store.delete_file("doc.bin").unwrap();
        assert!(store.waiting_files().unwrap().is_empty());

        // Traversal can't reach outside the root.
        assert!(matches!(
            store.delete_file("../../etc/passwd"),
            Err(TaildropError::InvalidFileName)
        ));

        std::fs::remove_dir_all(&root).ok();
    }

    #[tokio::test]
    async fn concurrent_resume_for_same_name_is_serialized() {
        // The in-flight name guard: while one transfer holds a base name, a second PUT for the SAME
        // name (the resume-race the lock closes) is rejected with FileExists rather than interleaving
        // writes into the shared `.partial`. We hold the first transfer open with a reader that never
        // completes until we let it, then fire the second concurrently.
        let root = tmp_root();
        let store = Arc::new(TaildropStore::new(&root).unwrap());

        // A reader that delivers a byte, then blocks until released — keeps transfer #1 in flight
        // (and thus the name claimed) while we attempt transfer #2.
        let (release_tx, release_rx) = tokio::sync::oneshot::channel::<()>();
        struct BlockingReader {
            sent: bool,
            release: Option<tokio::sync::oneshot::Receiver<()>>,
        }
        impl AsyncRead for BlockingReader {
            fn poll_read(
                mut self: std::pin::Pin<&mut Self>,
                cx: &mut std::task::Context<'_>,
                buf: &mut tokio::io::ReadBuf<'_>,
            ) -> std::task::Poll<io::Result<()>> {
                if !self.sent {
                    buf.put_slice(b"x");
                    self.sent = true;
                    return std::task::Poll::Ready(Ok(()));
                }
                // After the first byte, park until released, then report EOF.
                match self.release.as_mut() {
                    Some(rx) => match std::pin::Pin::new(rx).poll(cx) {
                        std::task::Poll::Ready(_) => {
                            self.release = None;
                            std::task::Poll::Ready(Ok(())) // EOF (no bytes written)
                        }
                        std::task::Poll::Pending => std::task::Poll::Pending,
                    },
                    None => std::task::Poll::Ready(Ok(())),
                }
            }
        }

        let s1 = store.clone();
        let t1 = tokio::spawn(async move {
            let reader = BlockingReader {
                sent: false,
                release: Some(release_rx),
            };
            // expected_len 1: completes once the single byte is read and the reader returns EOF.
            s1.put_file("race.bin", reader, 0, 1).await
        });

        // Wait until transfer #1 has actually claimed the name (its partial exists).
        let partial = root.join("race.bin.partial");
        for _ in 0..200 {
            if partial.exists() {
                break;
            }
            tokio::time::sleep(std::time::Duration::from_millis(5)).await;
        }
        assert!(
            partial.exists(),
            "transfer #1 should have created the partial"
        );

        // Transfer #2 for the SAME name, while #1 still holds the claim → FileExists, no interleave.
        let err = store
            .put_file("race.bin", &b"yy"[..], 1, 3)
            .await
            .unwrap_err();
        assert!(
            matches!(err, TaildropError::FileExists),
            "a concurrent transfer for an in-flight name must be rejected, got {err:?}"
        );

        // Release #1 so it finalizes cleanly, and confirm the name frees afterward.
        release_tx.send(()).unwrap();
        t1.await.unwrap().unwrap();
        // Now the name is free: a fresh transfer succeeds (the guard was released on drop).
        store.put_file("race.bin", &b"z"[..], 0, 1).await.unwrap();

        std::fs::remove_dir_all(&root).ok();
    }

    #[cfg(unix)]
    #[tokio::test]
    async fn symlink_in_store_root_is_refused_not_followed() {
        use std::os::unix::fs::symlink;

        let root = tmp_root();
        let store = TaildropStore::new(&root).unwrap();

        // An attacker-planted symlink in the store root, pointing at a sensitive file OUTSIDE it.
        let outside = tmp_root();
        std::fs::create_dir_all(&outside).unwrap();
        let secret = outside.join("secret");
        std::fs::write(&secret, b"TOP SECRET").unwrap();

        // (a) open_file must refuse a symlink target, never read through it.
        let link = root.join("link.txt");
        symlink(&secret, &link).unwrap();
        let open_err = store.open_file("link.txt").unwrap_err();
        assert!(
            matches!(open_err, TaildropError::Io(ref e) if e.kind() == io::ErrorKind::InvalidInput),
            "open_file must refuse a symlink, got {open_err:?}"
        );

        // (b) delete_file must refuse the symlink, leaving BOTH the link and its target intact.
        let del_err = store.delete_file("link.txt").unwrap_err();
        assert!(
            matches!(del_err, TaildropError::Io(ref e) if e.kind() == io::ErrorKind::InvalidInput),
            "delete_file must refuse a symlink, got {del_err:?}"
        );
        assert!(
            secret.exists(),
            "the symlink target must NOT have been deleted"
        );
        assert_eq!(std::fs::read(&secret).unwrap(), b"TOP SECRET");

        // (c) waiting_files must not report the symlink as a waiting file.
        assert!(
            store.waiting_files().unwrap().is_empty(),
            "a symlink in the store root must not be listed as a waiting file"
        );

        // (d) put_file onto a symlinked partial must refuse rather than write through the link.
        let link_partial = root.join("evil.bin.partial");
        symlink(&secret, &link_partial).unwrap();
        let put_err = store
            .put_file("evil.bin", &b"data"[..], 0, 4)
            .await
            .unwrap_err();
        assert!(
            matches!(put_err, TaildropError::Io(ref e) if e.kind() == io::ErrorKind::InvalidInput),
            "put_file must refuse a symlinked partial, got {put_err:?}"
        );
        assert_eq!(
            std::fs::read(&secret).unwrap(),
            b"TOP SECRET",
            "the symlink target must NOT have been written through"
        );

        std::fs::remove_dir_all(&root).ok();
        std::fs::remove_dir_all(&outside).ok();
    }

    #[test]
    fn next_available_name_inserts_before_extension() {
        let root = tmp_root();
        std::fs::create_dir_all(&root).unwrap();
        assert_eq!(next_available_name(&root, "a.txt"), "a.txt");
        std::fs::write(root.join("a.txt"), b"x").unwrap();
        assert_eq!(next_available_name(&root, "a.txt"), "a (1).txt");
        std::fs::write(root.join("a (1).txt"), b"x").unwrap();
        assert_eq!(next_available_name(&root, "a.txt"), "a (2).txt");
        // Dotfile (no real extension) appends at end.
        std::fs::write(root.join(".env"), b"x").unwrap();
        assert_eq!(next_available_name(&root, ".env"), ".env (1)");
        std::fs::remove_dir_all(&root).ok();
    }

    /// The reaper deletes a `.partial` older than `delete_delay` and keeps a fresh one. Aging is
    /// driven by the passed-in `now` (the partial's real mtime is ~now): `now + 2h` with a 1h delay
    /// makes an existing partial look 2h old (reaped); `now` with a 1h delay keeps it (~0s old).
    #[test]
    fn reaper_deletes_aged_partial_keeps_fresh_and_final() {
        let root = tmp_root();
        let store = TaildropStore::new(&root).unwrap();

        std::fs::write(root.join("abandoned.bin.partial"), b"half").unwrap();
        std::fs::write(root.join("done.bin"), b"complete").unwrap(); // a finished file, not a partial

        let now = SystemTime::now();
        let delay = Duration::from_secs(3600);

        // Fresh: nothing is older than the delay yet.
        assert_eq!(
            store.reap_abandoned_partials(now, delay),
            0,
            "nothing aged out"
        );
        assert!(root.join("abandoned.bin.partial").exists());

        // Aged: 2h in the future vs a 1h delay → the partial is reaped, the final file is untouched.
        let reaped = store.reap_abandoned_partials(now + Duration::from_secs(2 * 3600), delay);
        assert_eq!(reaped, 1, "the aged partial is reaped");
        assert!(
            !root.join("abandoned.bin.partial").exists(),
            "aged partial deleted"
        );
        assert!(
            root.join("done.bin").exists(),
            "a completed (non-partial) file must never be reaped"
        );

        std::fs::remove_dir_all(&root).ok();
    }

    /// An in-flight transfer's partial is NEVER reaped, even when aged (Go's "no active put" check):
    /// while a base name is claimed, the reaper skips its partial regardless of mtime.
    #[tokio::test]
    async fn reaper_spares_in_flight_partial() {
        let root = tmp_root();
        let store = TaildropStore::new(&root).unwrap();

        // Pre-write an (old) partial and claim its base name as in-flight.
        std::fs::write(root.join("live.bin.partial"), b"in progress").unwrap();
        let claim = store.claim_in_flight("live.bin").expect("claim");

        // Even with a far-future `now` (well past the delay), the in-flight partial is spared.
        let reaped = store.reap_abandoned_partials(
            SystemTime::now() + Duration::from_secs(48 * 3600),
            Duration::from_secs(3600),
        );
        assert_eq!(reaped, 0, "an in-flight partial must not be reaped");
        assert!(
            root.join("live.bin.partial").exists(),
            "the in-flight partial survives the sweep"
        );

        // Once the claim drops, the same aged partial IS reaped.
        drop(claim);
        let reaped = store.reap_abandoned_partials(
            SystemTime::now() + Duration::from_secs(48 * 3600),
            Duration::from_secs(3600),
        );
        assert_eq!(
            reaped, 1,
            "after the claim drops, the aged partial is reaped"
        );

        std::fs::remove_dir_all(&root).ok();
    }

    /// The background reaper task: (1) does NOT reap at startup, and (2) terminates when shutdown
    /// flips true. With the real DELETE_DELAY (1h) interval, the first real sweep is an hour out, so
    /// within this millisecond-scale test no scheduled sweep fires. The boot partial is backdated
    /// past DELETE_DELAY, so if the startup tick were NOT skipped, the immediate sweep would reap it.
    /// Its survival therefore proves "the startup tick is skipped". The prompt exit after shutdown
    /// proves "the task is parked on the interval, woken only by shutdown". (`test-util`/paused-time
    /// is intentionally not pulled into this crate's deps, so this uses real time and the fact that
    /// 1h >> the test.)
    #[tokio::test]
    async fn reaper_task_skips_startup_then_terminates_on_shutdown() {
        let root = tmp_root();
        let store = Arc::new(TaildropStore::new(&root).unwrap());
        // A partial present at boot, aged well past DELETE_DELAY. Without the backdate it would be
        // ~0s old and survive even an immediate startup sweep, making the check below vacuous.
        let boot_partial = root.join("boot.bin.partial");
        std::fs::write(&boot_partial, b"x").unwrap();
        std::fs::OpenOptions::new()
            .write(true)
            .open(&boot_partial)
            .unwrap()
            .set_modified(SystemTime::now() - DELETE_DELAY * 2)
            .unwrap();

        let (tx, rx) = tokio::sync::watch::channel(false);
        let handle = spawn_partial_reaper(store.clone(), rx);

        // Give the task a moment to reach its select!; the first real sweep is DELETE_DELAY (1h) out,
        // so nothing is reaped here.
        tokio::time::sleep(Duration::from_millis(50)).await;
        assert!(
            boot_partial.exists(),
            "no reap before the first real sweep (startup tick is skipped; first sweep is 1h out)"
        );

        // Flip shutdown: the task must terminate via its select! shutdown arm, not hang on the 1h
        // interval. If the shutdown arm were broken this `timeout` would elapse (the next tick is ~1h
        // away), so the timeout firing IS the regression signal.
        tx.send_replace(true);
        tokio::time::timeout(Duration::from_secs(5), handle)
            .await
            .expect("reaper must terminate on shutdown (not wait for the 1h interval)")
            .expect("reaper task must not panic");

        std::fs::remove_dir_all(&root).ok();
    }

    /// A reaper spawned when shutdown is ALREADY true terminates on its first loop iteration
    /// (`watch::Receiver::wait_for` returns immediately when the predicate already holds), rather
    /// than waiting a full DELETE_DELAY (1h) for the first tick.
    #[tokio::test]
    async fn reaper_task_terminates_when_shutdown_already_set() {
        let root = tmp_root();
        let store = Arc::new(TaildropStore::new(&root).unwrap());
        let (_tx, rx) = tokio::sync::watch::channel(true); // already shut down
        let handle = spawn_partial_reaper(store, rx);
        tokio::time::timeout(Duration::from_secs(5), handle)
            .await
            .expect("a reaper spawned post-shutdown must exit promptly, not wait the 1h interval")
            .expect("reaper task must not panic");
        std::fs::remove_dir_all(&root).ok();
    }
}