Skip to main content

ts_runtime/
taildrop.rs

1//! Taildrop file store — the receiving half of Tailscale's peer-to-peer file transfer.
2//!
3//! A peer sends a file to this node via the peerAPI route `PUT /v0/put/<name>` (handled in
4//! `peerapi`). This module owns the on-disk store those puts land in, faithfully mirroring
5//! Go's `taildrop.manager`:
6//!
7//! - Incoming bytes are written to a per-transfer **partial** file (`<base>.partial`) so an
8//!   interrupted transfer never exposes a truncated file under its real name, and can be resumed
9//!   from an offset.
10//! - On successful completion the partial is **atomically renamed** to the final base name. If the
11//!   final name already exists, a non-clobbering ` (n)` suffix is chosen (Go `nextFilename`).
12//! - File names are strictly validated ([`validate_base_name`](crate::taildrop::validate_base_name)) to defeat path traversal and
13//!   reserved-suffix abuse before any path is constructed — this is the security boundary.
14//!
15//! # Anti-abuse / safety
16//!
17//! Every name is validated to be a single, local, non-traversing path component before it touches
18//! the filesystem; a name containing `/`, `\`, `..`, a NUL, control chars, or the reserved
19//! `.partial` / `.deleted` suffixes is rejected with [`TaildropError::InvalidFileName`](crate::taildrop::TaildropError::InvalidFileName). The store
20//! root is fixed at construction; all I/O is confined to it by joining only validated base names.
21
22use std::{
23    collections::HashSet,
24    io::{self, Seek, Write},
25    path::{Path, PathBuf},
26    sync::{Arc, Mutex},
27    time::{Duration, SystemTime},
28};
29
30use tokio::io::{AsyncRead, AsyncReadExt};
31
/// Grace period an abandoned `.partial` survives before the reaper removes it (Go
/// `feature/taildrop/delete.go`, `deleteDelay = time.Hour`). A resumed transfer keeps its partial
/// alive: every write advances the mtime, and a partial whose transfer is in flight is skipped, so
/// this is the window a sender has to resume before the leftovers are reclaimed.
pub const DELETE_DELAY: Duration = Duration::from_secs(3_600);

/// Name suffix of an in-progress transfer (Go `partialSuffix`). Completed transfers are renamed off
/// it, and a base name that itself ends in it is never accepted.
const PARTIAL_SUFFIX: &str = ".partial";
/// Name suffix Go uses to tombstone files awaiting deletion where closes are asynchronous (Go
/// `deletedSuffix`). Rejected as a base name for parity, so a sender cannot create one.
const DELETED_SUFFIX: &str = ".deleted";
/// Longest accepted base name, in bytes (Go `validateBaseName`: 255).
const MAX_BASE_NAME_LEN: usize = 255;
46
/// Failure modes of the Taildrop file store; each variant corresponds to a peerAPI status code.
#[derive(Debug)]
pub enum TaildropError {
    /// The file name failed validation (empty, too long, traversal, reserved suffix, or a
    /// disallowed character). peerAPI answers `400 Bad Request`.
    InvalidFileName,
    /// A transfer for this exact base name is already in progress. peerAPI answers
    /// `409 Conflict`.
    FileExists,
    /// A filesystem operation failed. peerAPI answers `500`.
    Io(io::Error),
}

impl std::fmt::Display for TaildropError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text = match self {
            Self::InvalidFileName => "invalid taildrop file name",
            Self::FileExists => "a transfer for this file is already in progress",
            // The I/O variant embeds the underlying error's own message.
            Self::Io(inner) => return write!(f, "taildrop I/O error: {inner}"),
        };
        f.write_str(text)
    }
}

impl std::error::Error for TaildropError {
    /// Only the I/O variant wraps a lower-level cause.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        if let Self::Io(inner) = self {
            Some(inner)
        } else {
            None
        }
    }
}

impl From<io::Error> for TaildropError {
    /// Lets `?` lift raw filesystem errors into [`TaildropError::Io`].
    fn from(err: io::Error) -> Self {
        Self::Io(err)
    }
}
85
/// A fully-received file waiting for the embedder to collect it. Its shape mirrors Go's
/// `apitype.WaitingFile`, whose default JSON field names are `Name` and `Size`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WaitingFile {
    /// Base name of the file inside the store root.
    pub name: String,
    /// Length of the file in bytes.
    pub size: u64,
}
95
96/// Validate a Taildrop base name, mirroring Go `taildrop.validateBaseName`.
97///
98/// Returns the name unchanged when it is a safe, single, local path component; otherwise `None`.
99/// Rejection rules (any one fails): empty or `> 255` bytes; leading/trailing ASCII space; contains
100/// a path separator (`/` or `\`), a NUL, or an ASCII control char; is `.` or `..`; equals a cleaned
101/// path other than itself (catches embedded `..`/`.` segments and absolute paths); or ends in the
102/// reserved `.partial` / `.deleted` suffixes.
103pub fn validate_base_name(name: &str) -> Option<&str> {
104    if name.is_empty() || name.len() > MAX_BASE_NAME_LEN {
105        return None;
106    }
107    if name.starts_with(' ') || name.ends_with(' ') {
108        return None;
109    }
110    if name == "." || name == ".." {
111        return None;
112    }
113    if name.ends_with(PARTIAL_SUFFIX) || name.ends_with(DELETED_SUFFIX) {
114        return None;
115    }
116    // Reject any separator, NUL, or control character outright. This is the core traversal guard:
117    // with no `/`, `\`, or `..` segment possible, the name can only ever be a leaf in the store dir.
118    for ch in name.chars() {
119        if ch == '/' || ch == '\\' || ch == '\0' || ch.is_control() {
120            return None;
121        }
122    }
123    // Defense in depth: a name that does not survive `Path` normalization as a single normal
124    // component is rejected (catches `..`, absolute paths, and any platform-specific oddity).
125    let p = Path::new(name);
126    let mut comps = p.components();
127    match (comps.next(), comps.next()) {
128        (Some(std::path::Component::Normal(c)), None) if c == name => Some(name),
129        _ => None,
130    }
131}
132
133/// Choose a non-clobbering final name for `base` within `dir`, mirroring Go `nextFilename`:
134/// `foo.txt` -> `foo (1).txt` -> `foo (2).txt` ... inserting ` (n)` before the extension. Returns
135/// the first candidate (incl. `base` itself) whose path does not yet exist. Bounded to avoid an
136/// unbounded loop on a pathological directory.
137fn next_available_name(dir: &Path, base: &str) -> String {
138    if !path_present(&dir.join(base)) {
139        return base.to_string();
140    }
141    let (stem, ext) = match base.rsplit_once('.') {
142        // Keep the dot with the extension; an empty stem (dotfile like ".bashrc") has no split.
143        Some((stem, ext)) if !stem.is_empty() => (stem, format!(".{ext}")),
144        _ => (base, String::new()),
145    };
146    for n in 1..=10_000u32 {
147        let candidate = format!("{stem} ({n}){ext}");
148        if !path_present(&dir.join(&candidate)) {
149            return candidate;
150        }
151    }
152    // Pathological fallback: suffix with a high counter; extremely unlikely to be reached.
153    format!("{stem} (overflow){ext}")
154}
155
/// Report whether anything, including a symlink (dangling or not), occupies `path`.
///
/// `Path::exists()` follows links and says `false` for a dangling one. This instead asks
/// `symlink_metadata` (lstat), which describes the link itself. As a result,
/// [`next_available_name`] never treats a planted symlink as a free name that a later rename
/// would land on.
fn path_present(path: &Path) -> bool {
    let lstat = std::fs::symlink_metadata(path);
    lstat.is_ok()
}
163
164/// Reject a path that is (or whose final component is) a symlink. This is hardening **beyond**
165/// upstream Go, whose taildrop (`feature/taildrop/fileops_fs.go` at v1.100.0) opens with a plain
166/// `os.OpenFile(O_CREATE|O_RDWR)` / `os.Open` and refuses symlinks nowhere — it relies on name
167/// validation (`joinDir`) alone. `validate_base_name` already blocks a traversing *name*,
168/// but not a symlink **component already planted in the store root** by a local attacker (e.g.
169/// `root/foo.txt -> /etc/cron.d/x`), which a plain `open`/`rename`/`remove` would follow. Uses
170/// `symlink_metadata` (lstat — does NOT follow the final symlink); a non-existent path is fine
171/// (returns `Ok(())`), only an existing symlink is refused.
172///
173/// This is a check-then-act guard, so it is not atomic with the open/rename/remove that follows.
174/// It kills the **persistent-plant** attack (a symlink left in the store root is no longer followed
175/// deterministically), and the per-name in-flight lock serializes our OWN operations on a name, but
176/// on its own it does not close a sub-millisecond race where an external process swaps the path for
177/// a symlink between this lstat and the syscall. The two paths that actually *open* a store file —
178/// the `offset > 0` resume write-open and the read-open — additionally pass `O_NOFOLLOW`
179/// ([`open_nofollow`]) so the kernel refuses a final-component symlink atomically, closing that
180/// residual race; `refuse_symlink` is retained ahead of them as a
181/// portable defense-in-depth check that also yields a clean typed error. The `offset == 0` put is
182/// atomically protected by `create_new` (`O_EXCL`, which refuses an existing symlink), and the
183/// non-opening ops (`rename`/`remove_file`/`read_dir`) act on the link itself rather than following
184/// it, so the advisory check is sufficient there. The residual external-swap window therefore only
185/// requires an external writer who already holds store-dir write access (the threat bound for this
186/// hardening).
187fn refuse_symlink(path: &Path) -> Result<(), TaildropError> {
188    match std::fs::symlink_metadata(path) {
189        Ok(meta) if meta.file_type().is_symlink() => Err(TaildropError::Io(io::Error::new(
190            io::ErrorKind::InvalidInput,
191            "taildrop path is a symlink; refusing to follow it",
192        ))),
193        Ok(_) => Ok(()),
194        // Not present yet (the common case for a fresh partial / final name) — nothing to refuse.
195        Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(()),
196        Err(e) => Err(e.into()),
197    }
198}
199
200/// Open an existing store file with the given [`OpenOptions`](std::fs::OpenOptions), refusing a
201/// final-component symlink **atomically** in the kernel via `O_NOFOLLOW` on Unix. This is the atomic
202/// counterpart to the advisory [`refuse_symlink`] check: where `refuse_symlink` lstat's the path
203/// first (a check-then-act guard with a sub-millisecond swap window), `O_NOFOLLOW` makes the kernel
204/// fail the `open` itself (`ELOOP`) if the final path component is a symlink, so an external process
205/// cannot win a race by swapping the path for a symlink after the lstat. This is fork hardening with
206/// no upstream-Go equivalent (Go's taildrop opens without `O_NOFOLLOW`).
207///
208/// On non-Unix targets `O_NOFOLLOW` has no portable equivalent, so this is a plain `open` and the
209/// preceding `refuse_symlink` advisory check is the only symlink defense there (Windows does not use
210/// the Unix symlink threat model for this store).
211fn open_nofollow(opts: &mut std::fs::OpenOptions, path: &Path) -> io::Result<std::fs::File> {
212    #[cfg(unix)]
213    {
214        use std::os::unix::fs::OpenOptionsExt;
215        // `custom_flags` sets the raw `open(2)` flag set, which `open()` then ORs with the access
216        // mode (`O_RDONLY`/`O_WRONLY`) derived from `.read`/`.write` — so the access mode is
217        // preserved alongside `O_NOFOLLOW`. `O_NOFOLLOW` makes the kernel return `ELOOP` instead of
218        // following a final-component symlink. It does not affect non-final components, but the store
219        // root is fixed and names are validated to a single component, so the final component is the
220        // only attacker-influenceable one.
221        opts.custom_flags(libc::O_NOFOLLOW);
222    }
223    opts.open(path)
224}
225
/// Taildrop file store confined to one fixed root directory: every path it touches is that root
/// joined with a name that passed [`validate_base_name`].
#[derive(Debug, Clone)]
pub struct TaildropStore {
    /// Store directory, fixed at construction.
    root: PathBuf,
    /// Base names whose transfer is currently being received. `put_file` registers its name here
    /// for the entire receive, whether a fresh `offset == 0` transfer or an `offset > 0` resume.
    /// Two concurrent PUTs of one name therefore cannot interleave `set_len`/`seek`/`write_all`
    /// on the shared `.partial`. Held behind an `Arc` so every clone of the store shares one set.
    /// The store is passed around as `Arc<TaildropStore>`, but a `clone()` must not split the
    /// guard.
    in_flight: Arc<Mutex<HashSet<String>>>,
}
238
/// Marks that a transfer currently owns `name` in the store's shared in-flight set. Dropping the
/// guard removes the name again, and that includes leaving `put_file` by success, early return,
/// error or unwinding. The next transfer of that file can then proceed.
struct InFlightGuard {
    /// The store's shared in-flight set (the same `Arc` the store holds).
    set: Arc<Mutex<HashSet<String>>>,
    /// The claimed base name.
    name: String,
}

impl Drop for InFlightGuard {
    fn drop(&mut self) {
        // Recover from a poisoned lock instead of skipping the removal. A name left behind would
        // make every later transfer of it fail with a phantom conflict.
        let mut claimed = match self.set.lock() {
            Ok(guard) => guard,
            Err(poisoned) => poisoned.into_inner(),
        };
        claimed.remove(self.name.as_str());
    }
}
255
256impl TaildropStore {
257    /// Create a store rooted at `root`, creating the directory (and parents) if needed.
258    pub fn new(root: impl Into<PathBuf>) -> Result<Self, TaildropError> {
259        let root = root.into();
260        std::fs::create_dir_all(&root)?;
261        Ok(Self {
262            root,
263            in_flight: Arc::new(Mutex::new(HashSet::new())),
264        })
265    }
266
267    /// Claim `base` as in-flight, returning an RAII guard that frees it on drop. Returns
268    /// [`TaildropError::FileExists`] if another transfer already holds the name — this is the
269    /// concurrency analog of the on-disk `.partial` conflict, and it serializes all transfers of one
270    /// name so a resume (`offset > 0`) cannot race a concurrent transfer's `set_len`/`seek`/`write`.
271    fn claim_in_flight(&self, base: &str) -> Result<InFlightGuard, TaildropError> {
272        let name = base.to_string();
273        let mut set = self.in_flight.lock().unwrap_or_else(|p| p.into_inner());
274        if !set.insert(name.clone()) {
275            return Err(TaildropError::FileExists);
276        }
277        Ok(InFlightGuard {
278            set: self.in_flight.clone(),
279            name,
280        })
281    }
282
283    /// The partial-file path for an already-validated base name.
284    fn partial_path(&self, base: &str) -> PathBuf {
285        self.root.join(format!("{base}{PARTIAL_SUFFIX}"))
286    }
287
288    /// Reap abandoned `.partial` files: delete every `<base>.partial` in the store root whose last
289    /// modification is older than `delete_delay` relative to `now` AND whose base name has no
290    /// in-flight transfer. Returns the number deleted. Mirrors Go `feature/taildrop/delete.go`'s
291    /// `fileDeleter`, which GCs a partial `deleteDelay` (1h) after it was last touched, sparing one
292    /// that an active put is still writing.
293    ///
294    /// This fork has no per-file timer queue (the store is a passive `Arc`, not an actor); instead a
295    /// periodic background sweep — see [`spawn_partial_reaper`] — calls this. The two Go cancellation
296    /// signals are both honored: an **active** transfer's base name is in `in_flight` (skipped here,
297    /// the analog of Go's "no active put" check), and a **resumed** transfer advances the partial's
298    /// mtime on every write (so a partial resumed within the window looks recent and is spared). A
299    /// permanently-abandoned partial is neither, so it ages out and is deleted, reclaiming the disk
300    /// and clearing the stale-partial `409` that would otherwise block an `offset == 0` re-send of
301    /// the same name forever.
302    ///
303    /// `now` and `delete_delay` are parameters (not read from the clock here) so the reap logic is
304    /// deterministically testable. A partial whose mtime is unreadable or in the future is treated as
305    /// fresh (kept) — fail-safe toward never deleting a file that might still be live.
306    pub fn reap_abandoned_partials(&self, now: SystemTime, delete_delay: Duration) -> usize {
307        let entries = match std::fs::read_dir(&self.root) {
308            Ok(e) => e,
309            Err(e) if e.kind() == io::ErrorKind::NotFound => return 0,
310            Err(e) => {
311                tracing::warn!(error = %e, "taildrop reaper: cannot read store dir");
312                return 0;
313            }
314        };
315        // Snapshot the in-flight names once; an active transfer must never have its partial reaped.
316        let in_flight: HashSet<String> = self
317            .in_flight
318            .lock()
319            .unwrap_or_else(|p| p.into_inner())
320            .clone();
321
322        let mut deleted = 0usize;
323        for entry in entries.flatten() {
324            // `metadata()` here is lstat-based (does not follow symlinks); a symlink is never a
325            // partial we created, so `is_file()` is false for it and it is skipped — consistent with
326            // the symlink refusal elsewhere in this store.
327            let Ok(meta) = entry.metadata() else { continue };
328            if !meta.is_file() {
329                continue;
330            }
331            let Ok(name) = entry.file_name().into_string() else {
332                continue;
333            };
334            let Some(base) = name.strip_suffix(PARTIAL_SUFFIX) else {
335                continue; // not a partial
336            };
337            if in_flight.contains(base) {
338                continue; // an active transfer owns this partial — never reap it (Go "no active put")
339            }
340            // Age check: keep anything modified within `delete_delay` of `now`. An unreadable or
341            // future mtime is treated as fresh (kept) — fail-safe toward not deleting a live file.
342            let age_ok = meta
343                .modified()
344                .ok()
345                .and_then(|m| now.duration_since(m).ok())
346                .is_some_and(|age| age >= delete_delay);
347            if !age_ok {
348                continue;
349            }
350            // Final guard against the snapshot TOCTOU: re-check in-flight membership under the lock
351            // immediately before deleting, so a transfer that claimed this base AFTER the upfront
352            // snapshot (the microsecond window between the snapshot and here) still spares its
353            // partial. The mtime check above already makes a wrong delete unreachable in practice (a
354            // live partial is < delete_delay old), but this closes the window completely and cheaply
355            // (one lock per aged candidate, which is rare).
356            if self
357                .in_flight
358                .lock()
359                .unwrap_or_else(|p| p.into_inner())
360                .contains(base)
361            {
362                continue;
363            }
364            match std::fs::remove_file(entry.path()) {
365                Ok(()) => {
366                    deleted += 1;
367                    tracing::info!(partial = %name, "taildrop reaper: deleted abandoned partial");
368                }
369                Err(e) if e.kind() == io::ErrorKind::NotFound => {} // already gone, fine
370                Err(e) => {
371                    tracing::warn!(error = %e, partial = %name, "taildrop reaper: delete failed")
372                }
373            }
374        }
375        deleted
376    }
377
378    /// Receive a file named `name` from `reader`, writing to `<name>.partial` then atomically
379    /// renaming to a non-clobbering final name on success. Mirrors Go `manager.PutFile`.
380    ///
381    /// `offset` lets a resumed transfer append past already-written bytes (the partial is opened, the
382    /// write starts at `offset`, and any bytes already on disk past `offset` are truncated away).
383    /// `expected_len` is the declared total length of the completed file (the request's
384    /// `Content-Length` plus `offset`); the transfer is finalized only if exactly that many bytes are
385    /// present. Returns the total number of bytes in the completed file.
386    ///
387    /// Fail-closed: an invalid name is rejected before any path is built; an in-progress partial for
388    /// the same name yields [`TaildropError::FileExists`]; an out-of-range resume `offset` (past the
389    /// current partial length) is rejected; an I/O error mid-transfer — or a body that ends before
390    /// `expected_len` (a short/interrupted stream) — leaves the `.partial` on disk and the final name
391    /// is never created. This matches Go `feature/taildrop/send.go`, which errors when the copied
392    /// length does not equal the declared length rather than publishing a truncated file.
393    ///
394    /// The retained `.partial` is resumable only by a peer that issues a ranged retry (an `offset > 0`
395    /// PUT); a sender that always restarts at `offset == 0` will instead hit the in-progress-conflict
396    /// path ([`TaildropError::FileExists`]) until the stale partial is cleared. A permanently-abandoned
397    /// partial is reclaimed by the background reaper after [`DELETE_DELAY`] (Go's `fileDeleter`
398    /// equivalent — see [`reap_abandoned_partials`](Self::reap_abandoned_partials) /
399    /// [`spawn_partial_reaper`]), which also clears that `offset == 0` conflict once the stale partial
400    /// ages out.
401    pub async fn put_file<R>(
402        &self,
403        name: &str,
404        mut reader: R,
405        offset: u64,
406        expected_len: u64,
407    ) -> Result<u64, TaildropError>
408    where
409        R: AsyncRead + Unpin,
410    {
411        let base = validate_base_name(name).ok_or(TaildropError::InvalidFileName)?;
412        let partial = self.partial_path(base);
413
414        // Claim the name for the whole transfer FIRST, so two concurrent PUTs for the same base name
415        // (especially two resumes, `offset > 0`, which reopen the same `.partial`) cannot interleave
416        // their `set_len`/`seek`/`write_all` and corrupt the shared partial. The fresh-transfer path
417        // is already protected on disk by `create_new`, but the resume path opens with plain
418        // `write(true)` and needs this lock. The guard frees the name on drop (incl. early return /
419        // error / panic). Held across the await — it is a cheap `HashSet` membership marker, not a
420        // lock held during I/O, so it never blocks the runtime.
421        let _claim = self.claim_in_flight(base)?;
422
423        // Refuse to follow a symlink planted in the store root (fork hardening, no upstream-Go
424        // equivalent): the partial must be a regular file we create/own, never a pre-existing
425        // symlink to elsewhere.
426        refuse_symlink(&partial)?;
427
428        // A fresh transfer (offset 0) must not collide with another in-flight transfer of the same
429        // name; a resume (offset > 0) reopens the existing partial. File handles are std (the tokio
430        // `fs` feature is intentionally not enabled in this crate); the body is read async off the
431        // overlay stream and written to the blocking handle in a bounded loop.
432        let mut file = if offset == 0 {
433            match std::fs::OpenOptions::new()
434                .write(true)
435                .create_new(true)
436                .open(&partial)
437            {
438                Ok(f) => f,
439                Err(e) if e.kind() == io::ErrorKind::AlreadyExists => {
440                    return Err(TaildropError::FileExists);
441                }
442                Err(e) => return Err(e.into()),
443            }
444        } else {
445            // Resume open: `O_NOFOLLOW` so a symlink swapped in for the partial after the
446            // `refuse_symlink` lstat above is refused atomically by the kernel rather than followed.
447            let mut f = open_nofollow(std::fs::OpenOptions::new().write(true), &partial)?;
448            // Bound the resume offset to the current partial length and truncate any bytes past it,
449            // matching Go `feature/taildrop/fileops_fs.go` (`OpenWriter` rejects `offset > curr` and
450            // `Truncate(offset)`s). Without the bound a too-large offset would leave a zero-filled
451            // sparse hole; without the truncate a shorter resumed body would leave a prior attempt's
452            // stale tail past the new end. `metadata().len()` is the partial's current size.
453            let current = f.metadata()?.len();
454            if offset > current {
455                return Err(TaildropError::Io(io::Error::new(
456                    io::ErrorKind::InvalidInput,
457                    "taildrop resume offset is past the end of the partial file",
458                )));
459            }
460            f.set_len(offset)?;
461            f.seek(io::SeekFrom::Start(offset))?;
462            f
463        };
464
465        let mut copied: u64 = 0;
466        let mut buf = [0u8; 64 * 1024];
467        loop {
468            let n = reader.read(&mut buf).await?;
469            if n == 0 {
470                break;
471            }
472            // Each `write_all` only pushes the chunk into the page cache (microseconds); the
473            // genuinely blocking cost is the terminal `flush`/`sync_all`/`rename` below, which we
474            // hand to a blocking thread so a flood of concurrent transfers can't starve the tokio
475            // worker pool on fsync (see `peerapi::MAX_INFLIGHT`).
476            file.write_all(&buf[..n])?;
477            copied += n as u64;
478        }
479
480        // Length check (Go `send.go`: error when `copyLength != length`). A body that ended before the
481        // declared length — an interrupted/short stream — must NOT be finalized as a complete file;
482        // leave the `.partial` on disk (with the bytes received so far) so a Range-capable peer can
483        // resume it. `checked_add` rather than a bare `+`: `offset` is an attacker-supplied header and
484        // the bound above already rejects an `offset` past the (real, on-disk) partial length, so this
485        // cannot overflow in practice — but treat an overflow as a length mismatch rather than a panic.
486        let total = match offset.checked_add(copied) {
487            Some(t) if t == expected_len => t,
488            _ => {
489                return Err(TaildropError::Io(io::Error::new(
490                    io::ErrorKind::UnexpectedEof,
491                    format!(
492                        "taildrop body ended early: got {copied} of {expected_len} expected bytes \
493                         at offset {offset}; leaving partial for resume"
494                    ),
495                )));
496            }
497        };
498
499        // Finalize off the async runtime: `sync_all` (fsync) and `rename` are the dominant blocking
500        // operations, so run them on a blocking thread. The `File` and both paths are owned by the
501        // closure (`Send + 'static`), and `next_available_name` (which `stat`s candidates) goes with
502        // them. Fail-closed: any I/O error — or a join failure — propagates without ever publishing
503        // the final name, leaving the `.partial` in place for a later resume.
504        let root = self.root.clone();
505        let base = base.to_string();
506        tokio::task::spawn_blocking(move || -> io::Result<()> {
507            file.flush()?;
508            file.sync_all()?;
509            drop(file);
510
511            // Atomically publish under a non-clobbering final name. `next_available_name` probes
512            // candidates with `symlink_metadata` (not `exists`, which follows symlinks), so it will
513            // not treat a planted symlink as "free" and rename onto it; and we refuse to rename onto
514            // an existing symlink target outright (fork hardening, beyond Go). The `_claim` guard (held
515            // by the caller for the whole transfer) keeps this name serialized against other PUTs.
516            let final_name = next_available_name(&root, &base);
517            let final_path = root.join(&final_name);
518            if let Err(e) = refuse_symlink(&final_path) {
519                return Err(match e {
520                    TaildropError::Io(io_err) => io_err,
521                    other => io::Error::other(other.to_string()),
522                });
523            }
524            std::fs::rename(&partial, &final_path)?;
525            Ok(())
526        })
527        .await
528        .map_err(|join_err| {
529            // A panicked/cancelled finalize task: surface as I/O so the caller maps it to a 500 and
530            // the partial is left untouched (never publishes the final name).
531            TaildropError::Io(io::Error::other(format!(
532                "taildrop finalize task failed: {join_err}"
533            )))
534        })??;
535
536        Ok(total)
537    }
538
539    /// List fully-received (non-partial) files, sorted by name (Go `WaitingFiles`).
540    pub fn waiting_files(&self) -> Result<Vec<WaitingFile>, TaildropError> {
541        let mut out = Vec::new();
542        let entries = match std::fs::read_dir(&self.root) {
543            Ok(e) => e,
544            Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(out),
545            Err(e) => return Err(e.into()),
546        };
547        for entry in entries {
548            let entry = entry?;
549            // `entry.metadata()` does NOT follow symlinks (it is `lstat`-based), so a planted
550            // symlink has `is_file() == false` here and is skipped — a symlink in the store root is
551            // never reported as a waiting file (fork hardening, beyond Go), even one pointing at a real
552            // regular file elsewhere.
553            let meta = entry.metadata()?;
554            if meta.file_type().is_symlink() || !meta.is_file() {
555                continue;
556            }
557            let Ok(name) = entry.file_name().into_string() else {
558                continue;
559            };
560            // Skip in-progress / tombstoned files.
561            if name.ends_with(PARTIAL_SUFFIX) || name.ends_with(DELETED_SUFFIX) {
562                continue;
563            }
564            out.push(WaitingFile {
565                name,
566                size: meta.len(),
567            });
568        }
569        out.sort_by(|a, b| a.name.cmp(&b.name));
570        Ok(out)
571    }
572
573    /// Delete a fully-received file by base name (Go `DeleteFile`). The name is validated first, so a
574    /// traversal attempt can never escape the store root, and a symlink at the target is refused
575    /// (fork hardening, beyond Go) rather than followed — a planted `root/foo.txt -> /etc/passwd`
576    /// must not let a `delete foo.txt` remove the link's target. (`remove_file` unlinks the symlink
577    /// itself rather than its referent, so the advisory `refuse_symlink` is sufficient here.)
578    pub fn delete_file(&self, name: &str) -> Result<(), TaildropError> {
579        let base = validate_base_name(name).ok_or(TaildropError::InvalidFileName)?;
580        let path = self.root.join(base);
581        refuse_symlink(&path)?;
582        std::fs::remove_file(path)?;
583        Ok(())
584    }
585
586    /// Open a fully-received file by base name for reading, returning the handle and its size (Go
587    /// `OpenFile`). The name is validated first, and a symlink at the target is refused — advisorily
588    /// by `refuse_symlink` and then atomically by `O_NOFOLLOW` on the open itself (fork hardening
589    /// with no upstream-Go equivalent) — so a planted (or race-swapped) symlink cannot redirect the
590    /// read to an arbitrary file.
591    pub fn open_file(&self, name: &str) -> Result<(std::fs::File, u64), TaildropError> {
592        let base = validate_base_name(name).ok_or(TaildropError::InvalidFileName)?;
593        let path = self.root.join(base);
594        refuse_symlink(&path)?;
595        // `O_NOFOLLOW` so a symlink swapped in after the `refuse_symlink` lstat is refused atomically
596        // by the kernel rather than followed, never redirecting the read to an arbitrary file.
597        let f = open_nofollow(std::fs::OpenOptions::new().read(true), &path)?;
598        let size = f.metadata()?.len();
599        Ok((f, size))
600    }
601}
602
603/// Spawn the background reaper that periodically GCs abandoned `.partial` files (Go
604/// `feature/taildrop/delete.go`'s `fileDeleter`). It sweeps every [`DELETE_DELAY`], deleting partials
605/// older than `DELETE_DELAY` that have no in-flight transfer (see
606/// [`TaildropStore::reap_abandoned_partials`]), and exits when `shutdown` flips to `true`.
607///
608/// Returns a [`JoinHandle`](tokio::task::JoinHandle) the caller should abort on drop so the task
609/// never outlives the runtime (the established `reauth_bridge` / `DerpLatencyMeasurer` pattern). An
610/// `Arc<TaildropStore>` is held (cheap clone), so the sweep sees live `in_flight` state.
611///
612/// The first sweep is deferred one full `DELETE_DELAY` (not run at startup): a partial on disk at
613/// boot is by definition at least 0s old, and Go likewise only deletes after the delay elapses, so
614/// waiting one interval avoids reaping a partial a just-restarted node might still resume.
615pub fn spawn_partial_reaper(
616    store: Arc<TaildropStore>,
617    mut shutdown: tokio::sync::watch::Receiver<bool>,
618) -> tokio::task::JoinHandle<()> {
619    tokio::spawn(async move {
620        let mut tick = tokio::time::interval(DELETE_DELAY);
621        // `interval` fires immediately on the first `tick()`; consume that so the first real sweep is
622        // one `DELETE_DELAY` out (no startup reap — a partial must age the full delay first).
623        tick.tick().await;
624        loop {
625            tokio::select! {
626                _ = tick.tick() => {
627                    let n = store.reap_abandoned_partials(SystemTime::now(), DELETE_DELAY);
628                    if n > 0 {
629                        tracing::info!(deleted = n, "taildrop reaper: swept abandoned partials");
630                    }
631                }
632                _ = shutdown.wait_for(|x| *x) => break,
633            }
634        }
635    })
636}
637
// Unit tests for the Taildrop store: name validation, put/resume/finalize, non-clobbering conflict
// naming, delete/open, the in-flight name guard, symlink hardening, and the `.partial` reaper.
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh, unique path under the system temp dir for one test's store root. The directory is
    /// NOT created here. Tests that construct a `TaildropStore` rely on `TaildropStore::new` to
    /// create it, since they write into `root` immediately afterwards. The other tests call
    /// `create_dir_all` themselves.
    fn tmp_root() -> PathBuf {
        // A per-call atomic counter guarantees uniqueness across tests that run concurrently in the
        // same binary. A timestamp alone is NOT enough: `SystemTime` resolution is coarse on some
        // platforms, so two tests starting in the same tick would collide on one dir and stomp each
        // other's files (the cause of intermittent taildrop-test flakiness under parallel runs).
        static COUNTER: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
        let n = COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        let mut p = std::env::temp_dir();
        // The process id separates concurrent test binaries; `n` separates calls within one.
        p.push(format!("taildrop-test-{}-{n}", std::process::id()));
        p
    }

    /// Pins the name-validation security boundary.
    /// - Accepted unchanged: ordinary leaf names, including dotfiles and interior spaces.
    /// - Rejected: separators, traversal components, NUL and control characters, the reserved
    ///   `.partial` / `.deleted` suffixes, the empty name, and leading or trailing spaces.
    /// - Length: a 256-char name is rejected, while 255 chars is accepted.
    #[test]
    fn validate_rejects_traversal_and_reserved() {
        // Valid leaf names.
        assert_eq!(validate_base_name("photo.jpg"), Some("photo.jpg"));
        assert_eq!(
            validate_base_name("a file with spaces.txt"),
            Some("a file with spaces.txt")
        );
        assert_eq!(validate_base_name(".bashrc"), Some(".bashrc"));

        // Traversal / separators.
        assert_eq!(validate_base_name("../etc/passwd"), None);
        assert_eq!(validate_base_name("a/b"), None);
        assert_eq!(validate_base_name("a\\b"), None);
        assert_eq!(validate_base_name("/abs"), None);
        assert_eq!(validate_base_name(".."), None);
        assert_eq!(validate_base_name("."), None);

        // NUL / control.
        assert_eq!(validate_base_name("a\0b"), None);
        assert_eq!(validate_base_name("a\nb"), None);

        // Reserved suffixes.
        assert_eq!(validate_base_name("x.partial"), None);
        assert_eq!(validate_base_name("x.deleted"), None);

        // Edges.
        assert_eq!(validate_base_name(""), None);
        assert_eq!(validate_base_name(" leading"), None);
        assert_eq!(validate_base_name("trailing "), None);
        assert_eq!(validate_base_name(&"a".repeat(256)), None);
        assert_eq!(
            validate_base_name(&"a".repeat(255)).map(|s| s.len()),
            Some(255)
        );
    }

    /// A complete transfer from offset 0 is finalized under its real name:
    /// - the returned count equals the body length;
    /// - the bytes land intact and no `.partial` is left behind;
    /// - the file is listed by `waiting_files` with its size.
    #[tokio::test]
    async fn put_file_writes_then_atomically_renames() {
        let root = tmp_root();
        let store = TaildropStore::new(&root).unwrap();

        let data = b"hello taildrop";
        let n = store
            .put_file("greeting.txt", &data[..], 0, data.len() as u64)
            .await
            .unwrap();
        assert_eq!(n, data.len() as u64);

        // The final file exists; no .partial remains.
        let body = std::fs::read(root.join("greeting.txt")).unwrap();
        assert_eq!(body, data);
        assert!(!root.join("greeting.txt.partial").exists());

        let wf = store.waiting_files().unwrap();
        assert_eq!(wf.len(), 1);
        assert_eq!(wf[0].name, "greeting.txt");
        assert_eq!(wf[0].size, data.len() as u64);

        std::fs::remove_dir_all(&root).ok();
    }

    /// Resuming at an offset equal to the existing `.partial` length appends the new bytes after
    /// the prefix already on disk, then finalizes the concatenation under the real name.
    #[tokio::test]
    async fn put_file_resumes_from_offset() {
        let root = tmp_root();
        let store = TaildropStore::new(&root).unwrap();

        // Pre-write a prefix into the `.partial` directly, simulating bytes already received by an
        // earlier (interrupted) transfer.
        let prefix = b"the first half ";
        let partial = root.join("resume.txt.partial");
        std::fs::write(&partial, prefix).unwrap();

        // Resume at offset == the prefix length: `put_file` opens the existing partial, seeks past
        // the prefix, and appends the rest.
        let rest = b"and the second half";
        let total = store
            .put_file(
                "resume.txt",
                &rest[..],
                prefix.len() as u64,
                (prefix.len() + rest.len()) as u64,
            )
            .await
            .unwrap();

        // The returned count is offset + freshly-copied bytes, and the final file is the prefix and
        // the resumed bytes concatenated (the seek positioned the write correctly).
        assert_eq!(total, (prefix.len() + rest.len()) as u64);
        let body = std::fs::read(root.join("resume.txt")).unwrap();
        let mut expected = prefix.to_vec();
        expected.extend_from_slice(rest);
        assert_eq!(body, expected);
        assert!(!partial.exists());

        std::fs::remove_dir_all(&root).ok();
    }

    #[tokio::test]
    async fn put_file_short_body_leaves_partial_not_truncated_final() {
        // F2: a body that ends before the declared length must NOT be finalized as a complete (but
        // truncated) file under the real name. Go errors when copyLength != length; we leave the
        // `.partial` in place for resume.
        let root = tmp_root();
        let store = TaildropStore::new(&root).unwrap();

        // Reader yields 5 bytes but we declare 10 expected (a short/interrupted stream).
        let err = store
            .put_file("short.txt", &b"world"[..], 0, 10)
            .await
            .unwrap_err();
        assert!(
            matches!(err, TaildropError::Io(ref e) if e.kind() == io::ErrorKind::UnexpectedEof),
            "a short body must error, got {err:?}"
        );
        // The final name was NEVER created; the partial remains with the bytes received so far.
        assert!(!root.join("short.txt").exists(), "no truncated final file");
        let partial = std::fs::read(root.join("short.txt.partial")).unwrap();
        assert_eq!(
            partial, b"world",
            "partial holds the received prefix for resume"
        );
        assert!(store.waiting_files().unwrap().is_empty());

        std::fs::remove_dir_all(&root).ok();
    }

    #[tokio::test]
    async fn put_file_resume_offset_past_end_is_rejected() {
        // F3: a resume offset beyond the current partial length must be rejected (Go errors
        // "offset out of range"), not produce a zero-filled sparse hole.
        let root = tmp_root();
        let store = TaildropStore::new(&root).unwrap();
        std::fs::write(root.join("sparse.txt.partial"), b"abc").unwrap(); // 3 bytes on disk

        let err = store
            .put_file("sparse.txt", &b"xyz"[..], 99, 102)
            .await
            .unwrap_err();
        assert!(
            matches!(err, TaildropError::Io(ref e) if e.kind() == io::ErrorKind::InvalidInput),
            "offset past end must be rejected, got {err:?}"
        );
        // The partial is untouched (still 3 bytes), no final file.
        assert_eq!(
            std::fs::read(root.join("sparse.txt.partial")).unwrap(),
            b"abc"
        );
        assert!(!root.join("sparse.txt").exists());

        std::fs::remove_dir_all(&root).ok();
    }

    #[tokio::test]
    async fn put_file_resume_truncates_stale_tail() {
        // F3: resuming at an offset LESS than the current partial length must truncate the bytes
        // past the offset (Go `Truncate(offset)`), so a stale tail from a prior attempt cannot
        // survive past the newly-written end.
        let root = tmp_root();
        let store = TaildropStore::new(&root).unwrap();
        // A prior attempt left 20 bytes; we resume at offset 5 with a 3-byte tail ⇒ final is 8 bytes.
        std::fs::write(root.join("retry.txt.partial"), b"KEEPme-STALE-TAILxxx").unwrap();

        let total = store
            .put_file("retry.txt", &b"NEW"[..], 5, 8)
            .await
            .unwrap();
        assert_eq!(total, 8);
        let body = std::fs::read(root.join("retry.txt")).unwrap();
        assert_eq!(
            body, b"KEEPmNEW",
            "bytes past offset 5 truncated, then NEW appended"
        );

        std::fs::remove_dir_all(&root).ok();
    }

    /// Finalizing onto a name that already exists never overwrites it. Successive transfers of
    /// `dup.txt` land as `dup.txt`, `dup (1).txt` and `dup (2).txt`, and all three are listed as
    /// waiting.
    #[tokio::test]
    async fn put_file_conflict_picks_non_clobbering_name() {
        let root = tmp_root();
        let store = TaildropStore::new(&root).unwrap();

        store
            .put_file("dup.txt", &b"first"[..], 0, 5)
            .await
            .unwrap();
        store
            .put_file("dup.txt", &b"second"[..], 0, 6)
            .await
            .unwrap();
        store
            .put_file("dup.txt", &b"third"[..], 0, 5)
            .await
            .unwrap();

        // Original plus two non-clobbering renames.
        assert!(root.join("dup.txt").exists());
        assert!(root.join("dup (1).txt").exists());
        assert!(root.join("dup (2).txt").exists());

        let wf = store.waiting_files().unwrap();
        assert_eq!(wf.len(), 3);

        std::fs::remove_dir_all(&root).ok();
    }

    /// A fresh (offset 0) transfer for a base name whose `.partial` already exists is rejected with
    /// `FileExists`. The pre-existing partial is treated as a transfer already in progress.
    #[tokio::test]
    async fn put_file_in_progress_partial_is_conflict() {
        let root = tmp_root();
        let store = TaildropStore::new(&root).unwrap();

        // Simulate an in-flight transfer by pre-creating the .partial file.
        std::fs::write(root.join("busy.txt.partial"), b"partial").unwrap();

        let err = store
            .put_file("busy.txt", &b"x"[..], 0, 1)
            .await
            .unwrap_err();
        assert!(matches!(err, TaildropError::FileExists));

        std::fs::remove_dir_all(&root).ok();
    }

    /// A traversal name is rejected with `InvalidFileName` before anything is written. The store
    /// still lists no waiting files afterwards.
    #[tokio::test]
    async fn put_file_rejects_bad_name_before_any_io() {
        let root = tmp_root();
        let store = TaildropStore::new(&root).unwrap();

        let err = store
            .put_file("../escape", &b"x"[..], 0, 1)
            .await
            .unwrap_err();
        assert!(matches!(err, TaildropError::InvalidFileName));
        // Nothing was written anywhere.
        assert!(store.waiting_files().unwrap().is_empty());

        std::fs::remove_dir_all(&root).ok();
    }

    /// Round trip of the read/delete API:
    /// - `open_file` reports the stored size;
    /// - `delete_file` removes the file from the waiting list;
    /// - a traversal name passed to `delete_file` is rejected with `InvalidFileName`.
    #[tokio::test]
    async fn delete_and_open_roundtrip() {
        let root = tmp_root();
        let store = TaildropStore::new(&root).unwrap();

        store.put_file("doc.bin", &b"abc"[..], 0, 3).await.unwrap();
        // NOTE: `_f` (unlike `_`) keeps the handle open until the end of the test, so the delete
        // below runs while the file is still open.
        let (_f, size) = store.open_file("doc.bin").unwrap();
        assert_eq!(size, 3);

        store.delete_file("doc.bin").unwrap();
        assert!(store.waiting_files().unwrap().is_empty());

        // Traversal can't reach outside the root.
        assert!(matches!(
            store.delete_file("../../etc/passwd"),
            Err(TaildropError::InvalidFileName)
        ));

        std::fs::remove_dir_all(&root).ok();
    }

    #[tokio::test]
    async fn concurrent_resume_for_same_name_is_serialized() {
        // The in-flight name guard: while one transfer holds a base name, a second PUT for the SAME
        // name (the resume-race the lock closes) is rejected with FileExists rather than interleaving
        // writes into the shared `.partial`. We hold the first transfer open with a reader that never
        // completes until we let it, then fire the second concurrently.
        let root = tmp_root();
        let store = Arc::new(TaildropStore::new(&root).unwrap());

        // A reader that delivers a byte, then blocks until released — keeps transfer #1 in flight
        // (and thus the name claimed) while we attempt transfer #2.
        let (release_tx, release_rx) = tokio::sync::oneshot::channel::<()>();
        struct BlockingReader {
            sent: bool,
            release: Option<tokio::sync::oneshot::Receiver<()>>,
        }
        impl AsyncRead for BlockingReader {
            fn poll_read(
                mut self: std::pin::Pin<&mut Self>,
                cx: &mut std::task::Context<'_>,
                buf: &mut tokio::io::ReadBuf<'_>,
            ) -> std::task::Poll<io::Result<()>> {
                if !self.sent {
                    buf.put_slice(b"x");
                    self.sent = true;
                    return std::task::Poll::Ready(Ok(()));
                }
                // After the first byte, park until released, then report EOF.
                match self.release.as_mut() {
                    Some(rx) => match std::pin::Pin::new(rx).poll(cx) {
                        std::task::Poll::Ready(_) => {
                            self.release = None;
                            std::task::Poll::Ready(Ok(())) // EOF (no bytes written)
                        }
                        std::task::Poll::Pending => std::task::Poll::Pending,
                    },
                    None => std::task::Poll::Ready(Ok(())),
                }
            }
        }

        let s1 = store.clone();
        let t1 = tokio::spawn(async move {
            let reader = BlockingReader {
                sent: false,
                release: Some(release_rx),
            };
            // expected_len 1: completes once the single byte is read and the reader returns EOF.
            s1.put_file("race.bin", reader, 0, 1).await
        });

        // Wait until transfer #1 has actually claimed the name (its partial exists).
        let partial = root.join("race.bin.partial");
        for _ in 0..200 {
            if partial.exists() {
                break;
            }
            tokio::time::sleep(std::time::Duration::from_millis(5)).await;
        }
        assert!(
            partial.exists(),
            "transfer #1 should have created the partial"
        );

        // Transfer #2 for the SAME name, while #1 still holds the claim → FileExists, no interleave.
        let err = store
            .put_file("race.bin", &b"yy"[..], 1, 3)
            .await
            .unwrap_err();
        assert!(
            matches!(err, TaildropError::FileExists),
            "a concurrent transfer for an in-flight name must be rejected, got {err:?}"
        );

        // Release #1 so it finalizes cleanly, and confirm the name frees afterward.
        release_tx.send(()).unwrap();
        t1.await.unwrap().unwrap();
        // Now the name is free: a fresh transfer succeeds (the guard was released on drop).
        store.put_file("race.bin", &b"z"[..], 0, 1).await.unwrap();

        std::fs::remove_dir_all(&root).ok();
    }

    /// Fork hardening (beyond Go): a symlink planted in the store root, here pointing at a file
    /// outside it, is never read through, deleted through, listed as waiting, or written through
    /// when used as a `.partial`. The target's contents are checked after each step.
    #[cfg(unix)]
    #[tokio::test]
    async fn symlink_in_store_root_is_refused_not_followed() {
        use std::os::unix::fs::symlink;

        let root = tmp_root();
        let store = TaildropStore::new(&root).unwrap();

        // An attacker-planted symlink in the store root, pointing at a sensitive file OUTSIDE it.
        let outside = tmp_root();
        std::fs::create_dir_all(&outside).unwrap();
        let secret = outside.join("secret");
        std::fs::write(&secret, b"TOP SECRET").unwrap();

        // (a) open_file must refuse a symlink target, never read through it.
        let link = root.join("link.txt");
        symlink(&secret, &link).unwrap();
        let open_err = store.open_file("link.txt").unwrap_err();
        assert!(
            matches!(open_err, TaildropError::Io(ref e) if e.kind() == io::ErrorKind::InvalidInput),
            "open_file must refuse a symlink, got {open_err:?}"
        );

        // (b) delete_file must refuse the symlink, leaving BOTH the link and its target intact.
        let del_err = store.delete_file("link.txt").unwrap_err();
        assert!(
            matches!(del_err, TaildropError::Io(ref e) if e.kind() == io::ErrorKind::InvalidInput),
            "delete_file must refuse a symlink, got {del_err:?}"
        );
        assert!(
            secret.exists(),
            "the symlink target must NOT have been deleted"
        );
        assert_eq!(std::fs::read(&secret).unwrap(), b"TOP SECRET");

        // (c) waiting_files must not report the symlink as a waiting file.
        assert!(
            store.waiting_files().unwrap().is_empty(),
            "a symlink in the store root must not be listed as a waiting file"
        );

        // (d) put_file onto a symlinked partial must refuse rather than write through the link.
        let link_partial = root.join("evil.bin.partial");
        symlink(&secret, &link_partial).unwrap();
        let put_err = store
            .put_file("evil.bin", &b"data"[..], 0, 4)
            .await
            .unwrap_err();
        assert!(
            matches!(put_err, TaildropError::Io(ref e) if e.kind() == io::ErrorKind::InvalidInput),
            "put_file must refuse a symlinked partial, got {put_err:?}"
        );
        assert_eq!(
            std::fs::read(&secret).unwrap(),
            b"TOP SECRET",
            "the symlink target must NOT have been written through"
        );

        std::fs::remove_dir_all(&root).ok();
        std::fs::remove_dir_all(&outside).ok();
    }

    /// `open_nofollow` must refuse a final-component symlink **atomically in the kernel**, not merely
    /// via the advisory `refuse_symlink` lstat. This is the property that closes the TOCTOU window an
    /// external process could otherwise win by swapping the path for a symlink after the lstat: the
    /// open itself fails (`ELOOP`) on a symlinked final component.
    ///
    /// The test deliberately bypasses `refuse_symlink` and points `open_nofollow` straight at a
    /// symlink. If the flag were dropped, the open would succeed by following the link to the
    /// target, and the `expect_err` calls below would fail. That makes this a real regression guard
    /// for the `O_NOFOLLOW` wiring, not just a re-test of the advisory check.
    ///
    /// NOTE(review): the `ELOOP` expectation matches Linux and macOS. Some BSDs document a
    /// different errno for `O_NOFOLLOW` on a symlink (FreeBSD's open(2) lists `EMLINK`, NetBSD's
    /// `EFTYPE`), so this assertion would need per-platform expectations there.
    #[cfg(unix)]
    #[test]
    fn open_nofollow_refuses_a_symlinked_target_atomically() {
        use std::os::unix::fs::symlink;

        let root = tmp_root();
        std::fs::create_dir_all(&root).unwrap();
        let outside = tmp_root();
        std::fs::create_dir_all(&outside).unwrap();
        let secret = outside.join("secret");
        std::fs::write(&secret, b"TOP SECRET").unwrap();

        let link = root.join("link.bin");
        symlink(&secret, &link).unwrap();

        // A read open through the symlink must fail in the kernel (ELOOP), never returning a handle
        // to the target. The assertion is on the raw `ELOOP` errno rather than on the mapped
        // `io::ErrorKind`, so the guard holds even if std's errno-to-kind mapping changes.
        let read_err =
            open_nofollow(std::fs::OpenOptions::new().read(true), &link).expect_err("read open");
        assert_eq!(
            read_err.raw_os_error(),
            Some(libc::ELOOP),
            "O_NOFOLLOW read open of a symlink must fail with ELOOP, got {read_err:?}"
        );

        // Write open (the resume path) likewise refuses the symlink atomically.
        let write_err =
            open_nofollow(std::fs::OpenOptions::new().write(true), &link).expect_err("write open");
        assert_eq!(
            write_err.raw_os_error(),
            Some(libc::ELOOP),
            "O_NOFOLLOW write open of a symlink must fail with ELOOP, got {write_err:?}"
        );

        // The target was never opened/written through.
        assert_eq!(std::fs::read(&secret).unwrap(), b"TOP SECRET");

        // A real (non-symlink) file at the same final name opens fine — O_NOFOLLOW only rejects a
        // symlinked final component, so the normal store path is unaffected.
        let regular = root.join("regular.bin");
        std::fs::write(&regular, b"hello").unwrap();
        let mut f = open_nofollow(std::fs::OpenOptions::new().read(true), &regular)
            .expect("O_NOFOLLOW open of a regular file must succeed");
        let mut got = String::new();
        std::io::Read::read_to_string(&mut f, &mut got).unwrap();
        assert_eq!(got, "hello");

        std::fs::remove_dir_all(&root).ok();
        std::fs::remove_dir_all(&outside).ok();
    }

    /// `next_available_name` returns the name unchanged when it is free. Otherwise it inserts the
    /// next free ` (n)` counter (1, then 2, ...) before the extension. For a dotfile like `.env`,
    /// which has no real extension, the counter is appended at the end.
    #[test]
    fn next_available_name_inserts_before_extension() {
        let root = tmp_root();
        std::fs::create_dir_all(&root).unwrap();
        assert_eq!(next_available_name(&root, "a.txt"), "a.txt");
        std::fs::write(root.join("a.txt"), b"x").unwrap();
        assert_eq!(next_available_name(&root, "a.txt"), "a (1).txt");
        std::fs::write(root.join("a (1).txt"), b"x").unwrap();
        assert_eq!(next_available_name(&root, "a.txt"), "a (2).txt");
        // Dotfile (no real extension) appends at end.
        std::fs::write(root.join(".env"), b"x").unwrap();
        assert_eq!(next_available_name(&root, ".env"), ".env (1)");
        std::fs::remove_dir_all(&root).ok();
    }

    /// The reaper deletes a `.partial` older than `delete_delay` and keeps a fresh one. Aging is
    /// driven by the passed-in `now` (the partial's real mtime is ~now): `now + 2h` with a 1h delay
    /// makes an existing partial look 2h old (reaped); `now` with a 1h delay keeps it (~0s old).
    #[test]
    fn reaper_deletes_aged_partial_keeps_fresh_and_final() {
        let root = tmp_root();
        let store = TaildropStore::new(&root).unwrap();

        std::fs::write(root.join("abandoned.bin.partial"), b"half").unwrap();
        std::fs::write(root.join("done.bin"), b"complete").unwrap(); // a finished file, not a partial

        let now = SystemTime::now();
        let delay = Duration::from_secs(3600);

        // Fresh: nothing is older than the delay yet.
        assert_eq!(
            store.reap_abandoned_partials(now, delay),
            0,
            "nothing aged out"
        );
        assert!(root.join("abandoned.bin.partial").exists());

        // Aged: 2h in the future vs a 1h delay → the partial is reaped, the final file is untouched.
        let reaped = store.reap_abandoned_partials(now + Duration::from_secs(2 * 3600), delay);
        assert_eq!(reaped, 1, "the aged partial is reaped");
        assert!(
            !root.join("abandoned.bin.partial").exists(),
            "aged partial deleted"
        );
        assert!(
            root.join("done.bin").exists(),
            "a completed (non-partial) file must never be reaped"
        );

        std::fs::remove_dir_all(&root).ok();
    }

    /// An in-flight transfer's partial is NEVER reaped, even when aged (Go's "no active put" check):
    /// while a base name is claimed, the reaper skips its partial regardless of mtime.
    #[tokio::test]
    async fn reaper_spares_in_flight_partial() {
        let root = tmp_root();
        let store = TaildropStore::new(&root).unwrap();

        // Pre-write an (old) partial and claim its base name as in-flight.
        std::fs::write(root.join("live.bin.partial"), b"in progress").unwrap();
        let _claim = store.claim_in_flight("live.bin").expect("claim");

        // Even with a far-future `now` (well past the delay), the in-flight partial is spared.
        let reaped = store.reap_abandoned_partials(
            SystemTime::now() + Duration::from_secs(48 * 3600),
            Duration::from_secs(3600),
        );
        assert_eq!(reaped, 0, "an in-flight partial must not be reaped");
        assert!(
            root.join("live.bin.partial").exists(),
            "the in-flight partial survives the sweep"
        );

        // Once the claim drops, the same aged partial IS reaped.
        drop(_claim);
        let reaped = store.reap_abandoned_partials(
            SystemTime::now() + Duration::from_secs(48 * 3600),
            Duration::from_secs(3600),
        );
        assert_eq!(
            reaped, 1,
            "after the claim drops, the aged partial is reaped"
        );

        std::fs::remove_dir_all(&root).ok();
    }

    /// The background reaper task: (1) does NOT reap at startup, and (2) terminates when shutdown
    /// flips true. With the real DELETE_DELAY (1h) interval, the first real sweep is an hour out, so
    /// within this millisecond-scale test no sweep ever fires — which is exactly what proves both
    /// "the startup tick is skipped" (an aged partial present at start survives) and "the task is
    /// parked on the interval, woken only by shutdown". (`test-util`/paused-time is intentionally not
    /// pulled into this crate's deps, so this uses real time and the fact that 1h >> the test.)
    #[tokio::test]
    async fn reaper_task_skips_startup_then_terminates_on_shutdown() {
        let root = tmp_root();
        let store = Arc::new(TaildropStore::new(&root).unwrap());
        // A partial present at boot — it must survive (no startup reap fires within the test window).
        std::fs::write(root.join("boot.bin.partial"), b"x").unwrap();

        let (tx, rx) = tokio::sync::watch::channel(false);
        let handle = spawn_partial_reaper(store.clone(), rx);

        // Give the task a moment to reach its select!; the first real sweep is DELETE_DELAY (1h) out,
        // so nothing is reaped here.
        tokio::time::sleep(Duration::from_millis(50)).await;
        assert!(
            root.join("boot.bin.partial").exists(),
            "no reap before the first real sweep (startup tick is skipped; first sweep is 1h out)"
        );

        // Flip shutdown: the task must terminate via its select! shutdown arm, not hang on the 1h
        // interval. If the shutdown arm were broken this `timeout` would elapse (the next tick is ~1h
        // away), so the timeout firing IS the regression signal.
        tx.send_replace(true);
        tokio::time::timeout(Duration::from_secs(5), handle)
            .await
            .expect("reaper must terminate on shutdown (not wait for the 1h interval)")
            .expect("reaper task must not panic");

        std::fs::remove_dir_all(&root).ok();
    }

    /// A reaper spawned when shutdown is ALREADY true terminates on its first loop iteration
    /// (`watch::Receiver::wait_for` returns immediately when the predicate already holds), rather
    /// than waiting a full DELETE_DELAY (1h) for the first tick.
    #[tokio::test]
    async fn reaper_task_terminates_when_shutdown_already_set() {
        let root = tmp_root();
        let store = Arc::new(TaildropStore::new(&root).unwrap());
        let (_tx, rx) = tokio::sync::watch::channel(true); // already shut down
        let handle = spawn_partial_reaper(store, rx);
        tokio::time::timeout(Duration::from_secs(5), handle)
            .await
            .expect("a reaper spawned post-shutdown must exit promptly, not wait the 1h interval")
            .expect("reaper task must not panic");
        std::fs::remove_dir_all(&root).ok();
    }
}