ts_runtime/taildrop.rs
1//! Taildrop file store — the receiving half of Tailscale's peer-to-peer file transfer.
2//!
3//! A peer sends a file to this node via the peerAPI route `PUT /v0/put/<name>` (handled in
4//! `peerapi`). This module owns the on-disk store those puts land in, faithfully mirroring
5//! Go's `taildrop.manager`:
6//!
7//! - Incoming bytes are written to a per-transfer **partial** file (`<base>.partial`) so an
8//! interrupted transfer never exposes a truncated file under its real name, and can be resumed
9//! from an offset.
10//! - On successful completion the partial is **atomically renamed** to the final base name. If the
11//! final name already exists, a non-clobbering ` (n)` suffix is chosen (Go `nextFilename`).
12//! - File names are strictly validated ([`validate_base_name`](crate::taildrop::validate_base_name)) to defeat path traversal and
13//! reserved-suffix abuse before any path is constructed — this is the security boundary.
14//!
15//! # Anti-abuse / safety
16//!
//! Every name is validated to be a single, local, non-traversing path component before it touches
//! the filesystem; a name that is exactly `.` or `..`, that contains `/`, `\`, a NUL, or any other
//! control character, or that ends in the reserved `.partial` / `.deleted` suffixes is rejected
//! with [`TaildropError::InvalidFileName`](crate::taildrop::TaildropError::InvalidFileName). The
//! store root is fixed at construction; all I/O is confined to it by joining only validated base
//! names.
21
22use std::{
23 collections::HashSet,
24 io::{self, Seek, Write},
25 path::{Path, PathBuf},
26 sync::{Arc, Mutex},
27 time::{Duration, SystemTime},
28};
29
30use tokio::io::{AsyncRead, AsyncReadExt};
31
/// Grace period before the reaper deletes an abandoned `.partial` (Go
/// `feature/taildrop/delete.go` `deleteDelay = time.Hour`). Resuming inside the window keeps the
/// partial alive: every write advances its mtime, and a transfer that is in flight is skipped by
/// the reaper outright. Only leftovers nobody came back for are reclaimed.
pub const DELETE_DELAY: Duration = Duration::from_secs(3_600);

/// File-name suffix carried by a transfer that has not completed yet (Go `partialSuffix`). The
/// partial is renamed off this suffix on completion, and a base name ending in it is never
/// accepted from a sender.
const PARTIAL_SUFFIX: &str = ".partial";
/// File-name suffix Go uses to tombstone files pending deletion on platforms with async close (Go
/// `deletedSuffix`). Rejected as a base name for parity, so a sender cannot create such a file.
const DELETED_SUFFIX: &str = ".deleted";
/// Upper bound on a base name's length, in bytes (Go `validateBaseName`: 255).
const MAX_BASE_NAME_LEN: usize = 255;
46
/// Everything that can go wrong in the Taildrop file store.
#[derive(Debug)]
pub enum TaildropError {
    /// The supplied file name failed validation: traversal, a reserved suffix, empty, too long,
    /// or a forbidden character. The peerAPI maps this to `400 Bad Request`.
    InvalidFileName,
    /// Another transfer for exactly this base name is already in progress. The peerAPI maps this
    /// to `409 Conflict`.
    FileExists,
    /// The filesystem reported an I/O failure. The peerAPI maps this to `500`.
    Io(std::io::Error),
}
58
59impl core::fmt::Display for TaildropError {
60 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
61 match self {
62 TaildropError::InvalidFileName => write!(f, "invalid taildrop file name"),
63 TaildropError::FileExists => {
64 write!(f, "a transfer for this file is already in progress")
65 }
66 TaildropError::Io(e) => write!(f, "taildrop I/O error: {e}"),
67 }
68 }
69}
70
71impl std::error::Error for TaildropError {
72 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
73 match self {
74 TaildropError::Io(e) => Some(e),
75 _ => None,
76 }
77 }
78}
79
80impl From<io::Error> for TaildropError {
81 fn from(e: io::Error) -> Self {
82 TaildropError::Io(e)
83 }
84}
85
/// One fully-received Taildrop file as surfaced to the embedder. Counterpart of Go
/// `apitype.WaitingFile`, whose default JSON field names are `Name` and `Size`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct WaitingFile {
    /// Base name of the file inside the store.
    pub name: String,
    /// Size of the file, in bytes.
    pub size: u64,
}
95
96/// Validate a Taildrop base name, mirroring Go `taildrop.validateBaseName`.
97///
98/// Returns the name unchanged when it is a safe, single, local path component; otherwise `None`.
99/// Rejection rules (any one fails): empty or `> 255` bytes; leading/trailing ASCII space; contains
100/// a path separator (`/` or `\`), a NUL, or an ASCII control char; is `.` or `..`; equals a cleaned
101/// path other than itself (catches embedded `..`/`.` segments and absolute paths); or ends in the
102/// reserved `.partial` / `.deleted` suffixes.
103pub fn validate_base_name(name: &str) -> Option<&str> {
104 if name.is_empty() || name.len() > MAX_BASE_NAME_LEN {
105 return None;
106 }
107 if name.starts_with(' ') || name.ends_with(' ') {
108 return None;
109 }
110 if name == "." || name == ".." {
111 return None;
112 }
113 if name.ends_with(PARTIAL_SUFFIX) || name.ends_with(DELETED_SUFFIX) {
114 return None;
115 }
116 // Reject any separator, NUL, or control character outright. This is the core traversal guard:
117 // with no `/`, `\`, or `..` segment possible, the name can only ever be a leaf in the store dir.
118 for ch in name.chars() {
119 if ch == '/' || ch == '\\' || ch == '\0' || ch.is_control() {
120 return None;
121 }
122 }
123 // Defense in depth: a name that does not survive `Path` normalization as a single normal
124 // component is rejected (catches `..`, absolute paths, and any platform-specific oddity).
125 let p = Path::new(name);
126 let mut comps = p.components();
127 match (comps.next(), comps.next()) {
128 (Some(std::path::Component::Normal(c)), None) if c == name => Some(name),
129 _ => None,
130 }
131}
132
133/// Choose a non-clobbering final name for `base` within `dir`, mirroring Go `nextFilename`:
134/// `foo.txt` -> `foo (1).txt` -> `foo (2).txt` ... inserting ` (n)` before the extension. Returns
135/// the first candidate (incl. `base` itself) whose path does not yet exist. Bounded to avoid an
136/// unbounded loop on a pathological directory.
137fn next_available_name(dir: &Path, base: &str) -> String {
138 if !path_present(&dir.join(base)) {
139 return base.to_string();
140 }
141 let (stem, ext) = match base.rsplit_once('.') {
142 // Keep the dot with the extension; an empty stem (dotfile like ".bashrc") has no split.
143 Some((stem, ext)) if !stem.is_empty() => (stem, format!(".{ext}")),
144 _ => (base, String::new()),
145 };
146 for n in 1..=10_000u32 {
147 let candidate = format!("{stem} ({n}){ext}");
148 if !path_present(&dir.join(&candidate)) {
149 return candidate;
150 }
151 }
152 // Pathological fallback: suffix with a high counter; extremely unlikely to be reached.
153 format!("{stem} (overflow){ext}")
154}
155
/// Report whether anything occupies `path`, counting a symlink — even a dangling one — as
/// occupying it. `Path::exists()` follows links and therefore answers `false` for a dangling
/// symlink; the lstat-style `symlink_metadata` used here does not, so [`next_available_name`] can
/// never pick a planted symlink as a "free" name and then rename onto it.
fn path_present(path: &Path) -> bool {
    path.symlink_metadata().is_ok()
}
163
164/// Reject a path that is (or whose final component is) a symlink. This is hardening **beyond**
165/// upstream Go, whose taildrop (`feature/taildrop/fileops_fs.go` at v1.100.0) opens with a plain
166/// `os.OpenFile(O_CREATE|O_RDWR)` / `os.Open` and refuses symlinks nowhere — it relies on name
167/// validation (`joinDir`) alone. `validate_base_name` already blocks a traversing *name*,
168/// but not a symlink **component already planted in the store root** by a local attacker (e.g.
169/// `root/foo.txt -> /etc/cron.d/x`), which a plain `open`/`rename`/`remove` would follow. Uses
170/// `symlink_metadata` (lstat — does NOT follow the final symlink); a non-existent path is fine
171/// (returns `Ok(())`), only an existing symlink is refused.
172///
173/// This is a check-then-act guard, so it is not atomic with the open/rename/remove that follows.
174/// It kills the **persistent-plant** attack (a symlink left in the store root is no longer followed
175/// deterministically), and the per-name in-flight lock serializes our OWN operations on a name, but
176/// on its own it does not close a sub-millisecond race where an external process swaps the path for
177/// a symlink between this lstat and the syscall. The two paths that actually *open* a store file —
178/// the `offset > 0` resume write-open and the read-open — additionally pass `O_NOFOLLOW`
179/// ([`open_nofollow`]) so the kernel refuses a final-component symlink atomically, closing that
180/// residual race; `refuse_symlink` is retained ahead of them as a
181/// portable defense-in-depth check that also yields a clean typed error. The `offset == 0` put is
182/// atomically protected by `create_new` (`O_EXCL`, which refuses an existing symlink), and the
183/// non-opening ops (`rename`/`remove_file`/`read_dir`) act on the link itself rather than following
184/// it, so the advisory check is sufficient there. The residual external-swap window therefore only
185/// requires an external writer who already holds store-dir write access (the threat bound for this
186/// hardening).
187fn refuse_symlink(path: &Path) -> Result<(), TaildropError> {
188 match std::fs::symlink_metadata(path) {
189 Ok(meta) if meta.file_type().is_symlink() => Err(TaildropError::Io(io::Error::new(
190 io::ErrorKind::InvalidInput,
191 "taildrop path is a symlink; refusing to follow it",
192 ))),
193 Ok(_) => Ok(()),
194 // Not present yet (the common case for a fresh partial / final name) — nothing to refuse.
195 Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(()),
196 Err(e) => Err(e.into()),
197 }
198}
199
200/// Open an existing store file with the given [`OpenOptions`](std::fs::OpenOptions), refusing a
201/// final-component symlink **atomically** in the kernel via `O_NOFOLLOW` on Unix. This is the atomic
202/// counterpart to the advisory [`refuse_symlink`] check: where `refuse_symlink` lstat's the path
203/// first (a check-then-act guard with a sub-millisecond swap window), `O_NOFOLLOW` makes the kernel
204/// fail the `open` itself (`ELOOP`) if the final path component is a symlink, so an external process
205/// cannot win a race by swapping the path for a symlink after the lstat. This is fork hardening with
206/// no upstream-Go equivalent (Go's taildrop opens without `O_NOFOLLOW`).
207///
208/// On non-Unix targets `O_NOFOLLOW` has no portable equivalent, so this is a plain `open` and the
209/// preceding `refuse_symlink` advisory check is the only symlink defense there (Windows does not use
210/// the Unix symlink threat model for this store).
211fn open_nofollow(opts: &mut std::fs::OpenOptions, path: &Path) -> io::Result<std::fs::File> {
212 #[cfg(unix)]
213 {
214 use std::os::unix::fs::OpenOptionsExt;
215 // `custom_flags` sets the raw `open(2)` flag set, which `open()` then ORs with the access
216 // mode (`O_RDONLY`/`O_WRONLY`) derived from `.read`/`.write` — so the access mode is
217 // preserved alongside `O_NOFOLLOW`. `O_NOFOLLOW` makes the kernel return `ELOOP` instead of
218 // following a final-component symlink. It does not affect non-final components, but the store
219 // root is fixed and names are validated to a single component, so the final component is the
220 // only attacker-influenceable one.
221 opts.custom_flags(libc::O_NOFOLLOW);
222 }
223 opts.open(path)
224}
225
/// The Taildrop file store: a fixed root directory plus the set of names currently being
/// received. Operations stay inside the root because only names accepted by
/// [`validate_base_name`] are ever joined onto it.
#[derive(Clone, Debug)]
pub struct TaildropStore {
    /// Directory every stored file lives in; fixed at construction.
    root: PathBuf,
    /// Base names that have a transfer in flight. `put_file` claims its base name here for the
    /// whole receive — fresh (`offset == 0`) and resumed (`offset > 0`) alike — so two concurrent
    /// PUTs for one name cannot interleave `set_len`/`seek`/`write_all` on the shared `.partial`.
    /// Behind an `Arc` so that `TaildropStore::clone()` shares the set instead of forking it.
    in_flight: Arc<Mutex<HashSet<String>>>,
}
238
/// Claim on one in-flight transfer name. Its `Drop` impl removes `name` from the shared set, so
/// the name is freed for the next transfer however `put_file` exits — normal return, early
/// return, error, or panic.
struct InFlightGuard {
    /// The store's shared in-flight set this claim was registered in.
    set: Arc<Mutex<HashSet<String>>>,
    /// The claimed base name.
    name: String,
}
246
247impl Drop for InFlightGuard {
248 fn drop(&mut self) {
249 // A poisoned lock still lets us recover the set and remove our entry — leaving a stale name
250 // claimed would wedge all future transfers of that name behind a phantom conflict.
251 let mut set = self.set.lock().unwrap_or_else(|p| p.into_inner());
252 set.remove(&self.name);
253 }
254}
255
256impl TaildropStore {
257 /// Create a store rooted at `root`, creating the directory (and parents) if needed.
258 pub fn new(root: impl Into<PathBuf>) -> Result<Self, TaildropError> {
259 let root = root.into();
260 std::fs::create_dir_all(&root)?;
261 Ok(Self {
262 root,
263 in_flight: Arc::new(Mutex::new(HashSet::new())),
264 })
265 }
266
267 /// Claim `base` as in-flight, returning an RAII guard that frees it on drop. Returns
268 /// [`TaildropError::FileExists`] if another transfer already holds the name — this is the
269 /// concurrency analog of the on-disk `.partial` conflict, and it serializes all transfers of one
270 /// name so a resume (`offset > 0`) cannot race a concurrent transfer's `set_len`/`seek`/`write`.
271 fn claim_in_flight(&self, base: &str) -> Result<InFlightGuard, TaildropError> {
272 let name = base.to_string();
273 let mut set = self.in_flight.lock().unwrap_or_else(|p| p.into_inner());
274 if !set.insert(name.clone()) {
275 return Err(TaildropError::FileExists);
276 }
277 Ok(InFlightGuard {
278 set: self.in_flight.clone(),
279 name,
280 })
281 }
282
283 /// The partial-file path for an already-validated base name.
284 fn partial_path(&self, base: &str) -> PathBuf {
285 self.root.join(format!("{base}{PARTIAL_SUFFIX}"))
286 }
287
288 /// Reap abandoned `.partial` files: delete every `<base>.partial` in the store root whose last
289 /// modification is older than `delete_delay` relative to `now` AND whose base name has no
290 /// in-flight transfer. Returns the number deleted. Mirrors Go `feature/taildrop/delete.go`'s
291 /// `fileDeleter`, which GCs a partial `deleteDelay` (1h) after it was last touched, sparing one
292 /// that an active put is still writing.
293 ///
294 /// This fork has no per-file timer queue (the store is a passive `Arc`, not an actor); instead a
295 /// periodic background sweep — see [`spawn_partial_reaper`] — calls this. The two Go cancellation
296 /// signals are both honored: an **active** transfer's base name is in `in_flight` (skipped here,
297 /// the analog of Go's "no active put" check), and a **resumed** transfer advances the partial's
298 /// mtime on every write (so a partial resumed within the window looks recent and is spared). A
299 /// permanently-abandoned partial is neither, so it ages out and is deleted, reclaiming the disk
300 /// and clearing the stale-partial `409` that would otherwise block an `offset == 0` re-send of
301 /// the same name forever.
302 ///
303 /// `now` and `delete_delay` are parameters (not read from the clock here) so the reap logic is
304 /// deterministically testable. A partial whose mtime is unreadable or in the future is treated as
305 /// fresh (kept) — fail-safe toward never deleting a file that might still be live.
306 pub fn reap_abandoned_partials(&self, now: SystemTime, delete_delay: Duration) -> usize {
307 let entries = match std::fs::read_dir(&self.root) {
308 Ok(e) => e,
309 Err(e) if e.kind() == io::ErrorKind::NotFound => return 0,
310 Err(e) => {
311 tracing::warn!(error = %e, "taildrop reaper: cannot read store dir");
312 return 0;
313 }
314 };
315 // Snapshot the in-flight names once; an active transfer must never have its partial reaped.
316 let in_flight: HashSet<String> = self
317 .in_flight
318 .lock()
319 .unwrap_or_else(|p| p.into_inner())
320 .clone();
321
322 let mut deleted = 0usize;
323 for entry in entries.flatten() {
324 // `metadata()` here is lstat-based (does not follow symlinks); a symlink is never a
325 // partial we created, so `is_file()` is false for it and it is skipped — consistent with
326 // the symlink refusal elsewhere in this store.
327 let Ok(meta) = entry.metadata() else { continue };
328 if !meta.is_file() {
329 continue;
330 }
331 let Ok(name) = entry.file_name().into_string() else {
332 continue;
333 };
334 let Some(base) = name.strip_suffix(PARTIAL_SUFFIX) else {
335 continue; // not a partial
336 };
337 if in_flight.contains(base) {
338 continue; // an active transfer owns this partial — never reap it (Go "no active put")
339 }
340 // Age check: keep anything modified within `delete_delay` of `now`. An unreadable or
341 // future mtime is treated as fresh (kept) — fail-safe toward not deleting a live file.
342 let age_ok = meta
343 .modified()
344 .ok()
345 .and_then(|m| now.duration_since(m).ok())
346 .is_some_and(|age| age >= delete_delay);
347 if !age_ok {
348 continue;
349 }
350 // Final guard against the snapshot TOCTOU: re-check in-flight membership under the lock
351 // immediately before deleting, so a transfer that claimed this base AFTER the upfront
352 // snapshot (the microsecond window between the snapshot and here) still spares its
353 // partial. The mtime check above already makes a wrong delete unreachable in practice (a
354 // live partial is < delete_delay old), but this closes the window completely and cheaply
355 // (one lock per aged candidate, which is rare).
356 if self
357 .in_flight
358 .lock()
359 .unwrap_or_else(|p| p.into_inner())
360 .contains(base)
361 {
362 continue;
363 }
364 match std::fs::remove_file(entry.path()) {
365 Ok(()) => {
366 deleted += 1;
367 tracing::info!(partial = %name, "taildrop reaper: deleted abandoned partial");
368 }
369 Err(e) if e.kind() == io::ErrorKind::NotFound => {} // already gone, fine
370 Err(e) => {
371 tracing::warn!(error = %e, partial = %name, "taildrop reaper: delete failed")
372 }
373 }
374 }
375 deleted
376 }
377
378 /// Receive a file named `name` from `reader`, writing to `<name>.partial` then atomically
379 /// renaming to a non-clobbering final name on success. Mirrors Go `manager.PutFile`.
380 ///
381 /// `offset` lets a resumed transfer append past already-written bytes (the partial is opened, the
382 /// write starts at `offset`, and any bytes already on disk past `offset` are truncated away).
383 /// `expected_len` is the declared total length of the completed file (the request's
384 /// `Content-Length` plus `offset`); the transfer is finalized only if exactly that many bytes are
385 /// present. Returns the total number of bytes in the completed file.
386 ///
387 /// Fail-closed: an invalid name is rejected before any path is built; an in-progress partial for
388 /// the same name yields [`TaildropError::FileExists`]; an out-of-range resume `offset` (past the
389 /// current partial length) is rejected; an I/O error mid-transfer — or a body that ends before
390 /// `expected_len` (a short/interrupted stream) — leaves the `.partial` on disk and the final name
391 /// is never created. This matches Go `feature/taildrop/send.go`, which errors when the copied
392 /// length does not equal the declared length rather than publishing a truncated file.
393 ///
394 /// The retained `.partial` is resumable only by a peer that issues a ranged retry (an `offset > 0`
395 /// PUT); a sender that always restarts at `offset == 0` will instead hit the in-progress-conflict
396 /// path ([`TaildropError::FileExists`]) until the stale partial is cleared. A permanently-abandoned
397 /// partial is reclaimed by the background reaper after [`DELETE_DELAY`] (Go's `fileDeleter`
398 /// equivalent — see [`reap_abandoned_partials`](Self::reap_abandoned_partials) /
399 /// [`spawn_partial_reaper`]), which also clears that `offset == 0` conflict once the stale partial
400 /// ages out.
401 pub async fn put_file<R>(
402 &self,
403 name: &str,
404 mut reader: R,
405 offset: u64,
406 expected_len: u64,
407 ) -> Result<u64, TaildropError>
408 where
409 R: AsyncRead + Unpin,
410 {
411 let base = validate_base_name(name).ok_or(TaildropError::InvalidFileName)?;
412 let partial = self.partial_path(base);
413
414 // Claim the name for the whole transfer FIRST, so two concurrent PUTs for the same base name
415 // (especially two resumes, `offset > 0`, which reopen the same `.partial`) cannot interleave
416 // their `set_len`/`seek`/`write_all` and corrupt the shared partial. The fresh-transfer path
417 // is already protected on disk by `create_new`, but the resume path opens with plain
418 // `write(true)` and needs this lock. The guard frees the name on drop (incl. early return /
419 // error / panic). Held across the await — it is a cheap `HashSet` membership marker, not a
420 // lock held during I/O, so it never blocks the runtime.
421 let _claim = self.claim_in_flight(base)?;
422
423 // Refuse to follow a symlink planted in the store root (fork hardening, no upstream-Go
424 // equivalent): the partial must be a regular file we create/own, never a pre-existing
425 // symlink to elsewhere.
426 refuse_symlink(&partial)?;
427
428 // A fresh transfer (offset 0) must not collide with another in-flight transfer of the same
429 // name; a resume (offset > 0) reopens the existing partial. File handles are std (the tokio
430 // `fs` feature is intentionally not enabled in this crate); the body is read async off the
431 // overlay stream and written to the blocking handle in a bounded loop.
432 let mut file = if offset == 0 {
433 match std::fs::OpenOptions::new()
434 .write(true)
435 .create_new(true)
436 .open(&partial)
437 {
438 Ok(f) => f,
439 Err(e) if e.kind() == io::ErrorKind::AlreadyExists => {
440 return Err(TaildropError::FileExists);
441 }
442 Err(e) => return Err(e.into()),
443 }
444 } else {
445 // Resume open: `O_NOFOLLOW` so a symlink swapped in for the partial after the
446 // `refuse_symlink` lstat above is refused atomically by the kernel rather than followed.
447 let mut f = open_nofollow(std::fs::OpenOptions::new().write(true), &partial)?;
448 // Bound the resume offset to the current partial length and truncate any bytes past it,
449 // matching Go `feature/taildrop/fileops_fs.go` (`OpenWriter` rejects `offset > curr` and
450 // `Truncate(offset)`s). Without the bound a too-large offset would leave a zero-filled
451 // sparse hole; without the truncate a shorter resumed body would leave a prior attempt's
452 // stale tail past the new end. `metadata().len()` is the partial's current size.
453 let current = f.metadata()?.len();
454 if offset > current {
455 return Err(TaildropError::Io(io::Error::new(
456 io::ErrorKind::InvalidInput,
457 "taildrop resume offset is past the end of the partial file",
458 )));
459 }
460 f.set_len(offset)?;
461 f.seek(io::SeekFrom::Start(offset))?;
462 f
463 };
464
465 let mut copied: u64 = 0;
466 let mut buf = [0u8; 64 * 1024];
467 loop {
468 let n = reader.read(&mut buf).await?;
469 if n == 0 {
470 break;
471 }
472 // Each `write_all` only pushes the chunk into the page cache (microseconds); the
473 // genuinely blocking cost is the terminal `flush`/`sync_all`/`rename` below, which we
474 // hand to a blocking thread so a flood of concurrent transfers can't starve the tokio
475 // worker pool on fsync (see `peerapi::MAX_INFLIGHT`).
476 file.write_all(&buf[..n])?;
477 copied += n as u64;
478 }
479
480 // Length check (Go `send.go`: error when `copyLength != length`). A body that ended before the
481 // declared length — an interrupted/short stream — must NOT be finalized as a complete file;
482 // leave the `.partial` on disk (with the bytes received so far) so a Range-capable peer can
483 // resume it. `checked_add` rather than a bare `+`: `offset` is an attacker-supplied header and
484 // the bound above already rejects an `offset` past the (real, on-disk) partial length, so this
485 // cannot overflow in practice — but treat an overflow as a length mismatch rather than a panic.
486 let total = match offset.checked_add(copied) {
487 Some(t) if t == expected_len => t,
488 _ => {
489 return Err(TaildropError::Io(io::Error::new(
490 io::ErrorKind::UnexpectedEof,
491 format!(
492 "taildrop body ended early: got {copied} of {expected_len} expected bytes \
493 at offset {offset}; leaving partial for resume"
494 ),
495 )));
496 }
497 };
498
499 // Finalize off the async runtime: `sync_all` (fsync) and `rename` are the dominant blocking
500 // operations, so run them on a blocking thread. The `File` and both paths are owned by the
501 // closure (`Send + 'static`), and `next_available_name` (which `stat`s candidates) goes with
502 // them. Fail-closed: any I/O error — or a join failure — propagates without ever publishing
503 // the final name, leaving the `.partial` in place for a later resume.
504 let root = self.root.clone();
505 let base = base.to_string();
506 tokio::task::spawn_blocking(move || -> io::Result<()> {
507 file.flush()?;
508 file.sync_all()?;
509 drop(file);
510
511 // Atomically publish under a non-clobbering final name. `next_available_name` probes
512 // candidates with `symlink_metadata` (not `exists`, which follows symlinks), so it will
513 // not treat a planted symlink as "free" and rename onto it; and we refuse to rename onto
514 // an existing symlink target outright (fork hardening, beyond Go). The `_claim` guard (held
515 // by the caller for the whole transfer) keeps this name serialized against other PUTs.
516 let final_name = next_available_name(&root, &base);
517 let final_path = root.join(&final_name);
518 if let Err(e) = refuse_symlink(&final_path) {
519 return Err(match e {
520 TaildropError::Io(io_err) => io_err,
521 other => io::Error::other(other.to_string()),
522 });
523 }
524 std::fs::rename(&partial, &final_path)?;
525 Ok(())
526 })
527 .await
528 .map_err(|join_err| {
529 // A panicked/cancelled finalize task: surface as I/O so the caller maps it to a 500 and
530 // the partial is left untouched (never publishes the final name).
531 TaildropError::Io(io::Error::other(format!(
532 "taildrop finalize task failed: {join_err}"
533 )))
534 })??;
535
536 Ok(total)
537 }
538
539 /// List fully-received (non-partial) files, sorted by name (Go `WaitingFiles`).
540 pub fn waiting_files(&self) -> Result<Vec<WaitingFile>, TaildropError> {
541 let mut out = Vec::new();
542 let entries = match std::fs::read_dir(&self.root) {
543 Ok(e) => e,
544 Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(out),
545 Err(e) => return Err(e.into()),
546 };
547 for entry in entries {
548 let entry = entry?;
549 // `entry.metadata()` does NOT follow symlinks (it is `lstat`-based), so a planted
550 // symlink has `is_file() == false` here and is skipped — a symlink in the store root is
551 // never reported as a waiting file (fork hardening, beyond Go), even one pointing at a real
552 // regular file elsewhere.
553 let meta = entry.metadata()?;
554 if meta.file_type().is_symlink() || !meta.is_file() {
555 continue;
556 }
557 let Ok(name) = entry.file_name().into_string() else {
558 continue;
559 };
560 // Skip in-progress / tombstoned files.
561 if name.ends_with(PARTIAL_SUFFIX) || name.ends_with(DELETED_SUFFIX) {
562 continue;
563 }
564 out.push(WaitingFile {
565 name,
566 size: meta.len(),
567 });
568 }
569 out.sort_by(|a, b| a.name.cmp(&b.name));
570 Ok(out)
571 }
572
573 /// Delete a fully-received file by base name (Go `DeleteFile`). The name is validated first, so a
574 /// traversal attempt can never escape the store root, and a symlink at the target is refused
575 /// (fork hardening, beyond Go) rather than followed — a planted `root/foo.txt -> /etc/passwd`
576 /// must not let a `delete foo.txt` remove the link's target. (`remove_file` unlinks the symlink
577 /// itself rather than its referent, so the advisory `refuse_symlink` is sufficient here.)
578 pub fn delete_file(&self, name: &str) -> Result<(), TaildropError> {
579 let base = validate_base_name(name).ok_or(TaildropError::InvalidFileName)?;
580 let path = self.root.join(base);
581 refuse_symlink(&path)?;
582 std::fs::remove_file(path)?;
583 Ok(())
584 }
585
586 /// Open a fully-received file by base name for reading, returning the handle and its size (Go
587 /// `OpenFile`). The name is validated first, and a symlink at the target is refused — advisorily
588 /// by `refuse_symlink` and then atomically by `O_NOFOLLOW` on the open itself (fork hardening
589 /// with no upstream-Go equivalent) — so a planted (or race-swapped) symlink cannot redirect the
590 /// read to an arbitrary file.
591 pub fn open_file(&self, name: &str) -> Result<(std::fs::File, u64), TaildropError> {
592 let base = validate_base_name(name).ok_or(TaildropError::InvalidFileName)?;
593 let path = self.root.join(base);
594 refuse_symlink(&path)?;
595 // `O_NOFOLLOW` so a symlink swapped in after the `refuse_symlink` lstat is refused atomically
596 // by the kernel rather than followed, never redirecting the read to an arbitrary file.
597 let f = open_nofollow(std::fs::OpenOptions::new().read(true), &path)?;
598 let size = f.metadata()?.len();
599 Ok((f, size))
600 }
601}
602
603/// Spawn the background reaper that periodically GCs abandoned `.partial` files (Go
604/// `feature/taildrop/delete.go`'s `fileDeleter`). It sweeps every [`DELETE_DELAY`], deleting partials
605/// older than `DELETE_DELAY` that have no in-flight transfer (see
606/// [`TaildropStore::reap_abandoned_partials`]), and exits when `shutdown` flips to `true`.
607///
608/// Returns a [`JoinHandle`](tokio::task::JoinHandle) the caller should abort on drop so the task
609/// never outlives the runtime (the established `reauth_bridge` / `DerpLatencyMeasurer` pattern). An
610/// `Arc<TaildropStore>` is held (cheap clone), so the sweep sees live `in_flight` state.
611///
612/// The first sweep is deferred one full `DELETE_DELAY` (not run at startup): a partial on disk at
613/// boot is by definition at least 0s old, and Go likewise only deletes after the delay elapses, so
614/// waiting one interval avoids reaping a partial a just-restarted node might still resume.
615pub fn spawn_partial_reaper(
616 store: Arc<TaildropStore>,
617 mut shutdown: tokio::sync::watch::Receiver<bool>,
618) -> tokio::task::JoinHandle<()> {
619 tokio::spawn(async move {
620 let mut tick = tokio::time::interval(DELETE_DELAY);
621 // `interval` fires immediately on the first `tick()`; consume that so the first real sweep is
622 // one `DELETE_DELAY` out (no startup reap — a partial must age the full delay first).
623 tick.tick().await;
624 loop {
625 tokio::select! {
626 _ = tick.tick() => {
627 let n = store.reap_abandoned_partials(SystemTime::now(), DELETE_DELAY);
628 if n > 0 {
629 tracing::info!(deleted = n, "taildrop reaper: swept abandoned partials");
630 }
631 }
632 _ = shutdown.wait_for(|x| *x) => break,
633 }
634 }
635 })
636}
637
638#[cfg(test)]
639mod tests {
640 use super::*;
641
/// Build a fresh, per-call path under the system temp directory for a test's store root. The
/// directory itself is not created here.
fn tmp_root() -> PathBuf {
    use std::sync::atomic::{AtomicU64, Ordering};

    // Uniqueness comes from a process-wide counter, not a timestamp: `SystemTime` resolution is
    // coarse on some platforms, so two tests starting in the same tick would share one directory
    // and stomp each other's files (the cause of intermittent taildrop-test flakiness under
    // parallel runs).
    static COUNTER: AtomicU64 = AtomicU64::new(0);
    let seq = COUNTER.fetch_add(1, Ordering::Relaxed);
    let pid = std::process::id();
    std::env::temp_dir().join(format!("taildrop-test-{pid}-{seq}"))
}
653
/// Table-driven check of the name validator: plain leaf names pass through unchanged, while
/// anything that could traverse, embed control bytes, hit a reserved suffix, or is otherwise
/// malformed is rejected.
#[test]
fn validate_rejects_traversal_and_reserved() {
    // Ordinary single-component names are returned as-is.
    let accepted = ["photo.jpg", "a file with spaces.txt", ".bashrc"];
    for name in accepted {
        assert_eq!(validate_base_name(name), Some(name));
    }

    let rejected = [
        // Traversal / separators.
        "../etc/passwd",
        "a/b",
        "a\\b",
        "/abs",
        "..",
        ".",
        // NUL / control.
        "a\0b",
        "a\nb",
        // Reserved suffixes.
        "x.partial",
        "x.deleted",
        // Empty, and leading / trailing whitespace.
        "",
        " leading",
        "trailing ",
    ];
    for name in rejected {
        assert_eq!(validate_base_name(name), None);
    }

    // Length boundary: 255 bytes is the longest accepted name, 256 is refused.
    let too_long = "a".repeat(256);
    assert_eq!(validate_base_name(&too_long), None);
    let longest_ok = "a".repeat(255);
    assert_eq!(validate_base_name(&longest_ok).map(|s| s.len()), Some(255));
}
690
691 #[tokio::test]
692 async fn put_file_writes_then_atomically_renames() {
693 let root = tmp_root();
694 let store = TaildropStore::new(&root).unwrap();
695
696 let data = b"hello taildrop";
697 let n = store
698 .put_file("greeting.txt", &data[..], 0, data.len() as u64)
699 .await
700 .unwrap();
701 assert_eq!(n, data.len() as u64);
702
703 // The final file exists; no .partial remains.
704 let body = std::fs::read(root.join("greeting.txt")).unwrap();
705 assert_eq!(body, data);
706 assert!(!root.join("greeting.txt.partial").exists());
707
708 let wf = store.waiting_files().unwrap();
709 assert_eq!(wf.len(), 1);
710 assert_eq!(wf[0].name, "greeting.txt");
711 assert_eq!(wf[0].size, data.len() as u64);
712
713 std::fs::remove_dir_all(&root).ok();
714 }
715
716 #[tokio::test]
717 async fn put_file_resumes_from_offset() {
718 let root = tmp_root();
719 let store = TaildropStore::new(&root).unwrap();
720
721 // Pre-write a prefix into the `.partial` directly, simulating bytes already received by an
722 // earlier (interrupted) transfer.
723 let prefix = b"the first half ";
724 let partial = root.join("resume.txt.partial");
725 std::fs::write(&partial, prefix).unwrap();
726
727 // Resume at offset == the prefix length: `put_file` opens the existing partial, seeks past
728 // the prefix, and appends the rest.
729 let rest = b"and the second half";
730 let total = store
731 .put_file(
732 "resume.txt",
733 &rest[..],
734 prefix.len() as u64,
735 (prefix.len() + rest.len()) as u64,
736 )
737 .await
738 .unwrap();
739
740 // The returned count is offset + freshly-copied bytes, and the final file is the prefix and
741 // the resumed bytes concatenated (the seek positioned the write correctly).
742 assert_eq!(total, (prefix.len() + rest.len()) as u64);
743 let body = std::fs::read(root.join("resume.txt")).unwrap();
744 let mut expected = prefix.to_vec();
745 expected.extend_from_slice(rest);
746 assert_eq!(body, expected);
747 assert!(!partial.exists());
748
749 std::fs::remove_dir_all(&root).ok();
750 }
751
752 #[tokio::test]
753 async fn put_file_short_body_leaves_partial_not_truncated_final() {
754 // F2: a body that ends before the declared length must NOT be finalized as a complete (but
755 // truncated) file under the real name. Go errors when copyLength != length; we leave the
756 // `.partial` in place for resume.
757 let root = tmp_root();
758 let store = TaildropStore::new(&root).unwrap();
759
760 // Reader yields 5 bytes but we declare 10 expected (a short/interrupted stream).
761 let err = store
762 .put_file("short.txt", &b"world"[..], 0, 10)
763 .await
764 .unwrap_err();
765 assert!(
766 matches!(err, TaildropError::Io(ref e) if e.kind() == io::ErrorKind::UnexpectedEof),
767 "a short body must error, got {err:?}"
768 );
769 // The final name was NEVER created; the partial remains with the bytes received so far.
770 assert!(!root.join("short.txt").exists(), "no truncated final file");
771 let partial = std::fs::read(root.join("short.txt.partial")).unwrap();
772 assert_eq!(
773 partial, b"world",
774 "partial holds the received prefix for resume"
775 );
776 assert!(store.waiting_files().unwrap().is_empty());
777
778 std::fs::remove_dir_all(&root).ok();
779 }
780
781 #[tokio::test]
782 async fn put_file_resume_offset_past_end_is_rejected() {
783 // F3: a resume offset beyond the current partial length must be rejected (Go errors
784 // "offset out of range"), not produce a zero-filled sparse hole.
785 let root = tmp_root();
786 let store = TaildropStore::new(&root).unwrap();
787 std::fs::write(root.join("sparse.txt.partial"), b"abc").unwrap(); // 3 bytes on disk
788
789 let err = store
790 .put_file("sparse.txt", &b"xyz"[..], 99, 102)
791 .await
792 .unwrap_err();
793 assert!(
794 matches!(err, TaildropError::Io(ref e) if e.kind() == io::ErrorKind::InvalidInput),
795 "offset past end must be rejected, got {err:?}"
796 );
797 // The partial is untouched (still 3 bytes), no final file.
798 assert_eq!(
799 std::fs::read(root.join("sparse.txt.partial")).unwrap(),
800 b"abc"
801 );
802 assert!(!root.join("sparse.txt").exists());
803
804 std::fs::remove_dir_all(&root).ok();
805 }
806
807 #[tokio::test]
808 async fn put_file_resume_truncates_stale_tail() {
809 // F3: resuming at an offset LESS than the current partial length must truncate the bytes
810 // past the offset (Go `Truncate(offset)`), so a stale tail from a prior attempt cannot
811 // survive past the newly-written end.
812 let root = tmp_root();
813 let store = TaildropStore::new(&root).unwrap();
814 // A prior attempt left 20 bytes; we resume at offset 5 with a 3-byte tail ⇒ final is 8 bytes.
815 std::fs::write(root.join("retry.txt.partial"), b"KEEPme-STALE-TAILxxx").unwrap();
816
817 let total = store
818 .put_file("retry.txt", &b"NEW"[..], 5, 8)
819 .await
820 .unwrap();
821 assert_eq!(total, 8);
822 let body = std::fs::read(root.join("retry.txt")).unwrap();
823 assert_eq!(
824 body, b"KEEPmNEW",
825 "bytes past offset 5 truncated, then NEW appended"
826 );
827
828 std::fs::remove_dir_all(&root).ok();
829 }
830
831 #[tokio::test]
832 async fn put_file_conflict_picks_non_clobbering_name() {
833 let root = tmp_root();
834 let store = TaildropStore::new(&root).unwrap();
835
836 store
837 .put_file("dup.txt", &b"first"[..], 0, 5)
838 .await
839 .unwrap();
840 store
841 .put_file("dup.txt", &b"second"[..], 0, 6)
842 .await
843 .unwrap();
844 store
845 .put_file("dup.txt", &b"third"[..], 0, 5)
846 .await
847 .unwrap();
848
849 // Original plus two non-clobbering renames.
850 assert!(root.join("dup.txt").exists());
851 assert!(root.join("dup (1).txt").exists());
852 assert!(root.join("dup (2).txt").exists());
853
854 let wf = store.waiting_files().unwrap();
855 assert_eq!(wf.len(), 3);
856
857 std::fs::remove_dir_all(&root).ok();
858 }
859
860 #[tokio::test]
861 async fn put_file_in_progress_partial_is_conflict() {
862 let root = tmp_root();
863 let store = TaildropStore::new(&root).unwrap();
864
865 // Simulate an in-flight transfer by pre-creating the .partial file.
866 std::fs::write(root.join("busy.txt.partial"), b"partial").unwrap();
867
868 let err = store
869 .put_file("busy.txt", &b"x"[..], 0, 1)
870 .await
871 .unwrap_err();
872 assert!(matches!(err, TaildropError::FileExists));
873
874 std::fs::remove_dir_all(&root).ok();
875 }
876
877 #[tokio::test]
878 async fn put_file_rejects_bad_name_before_any_io() {
879 let root = tmp_root();
880 let store = TaildropStore::new(&root).unwrap();
881
882 let err = store
883 .put_file("../escape", &b"x"[..], 0, 1)
884 .await
885 .unwrap_err();
886 assert!(matches!(err, TaildropError::InvalidFileName));
887 // Nothing was written anywhere.
888 assert!(store.waiting_files().unwrap().is_empty());
889
890 std::fs::remove_dir_all(&root).ok();
891 }
892
893 #[tokio::test]
894 async fn delete_and_open_roundtrip() {
895 let root = tmp_root();
896 let store = TaildropStore::new(&root).unwrap();
897
898 store.put_file("doc.bin", &b"abc"[..], 0, 3).await.unwrap();
899 let (_f, size) = store.open_file("doc.bin").unwrap();
900 assert_eq!(size, 3);
901
902 store.delete_file("doc.bin").unwrap();
903 assert!(store.waiting_files().unwrap().is_empty());
904
905 // Traversal can't reach outside the root.
906 assert!(matches!(
907 store.delete_file("../../etc/passwd"),
908 Err(TaildropError::InvalidFileName)
909 ));
910
911 std::fs::remove_dir_all(&root).ok();
912 }
913
914 #[tokio::test]
915 async fn concurrent_resume_for_same_name_is_serialized() {
916 // The in-flight name guard: while one transfer holds a base name, a second PUT for the SAME
917 // name (the resume-race the lock closes) is rejected with FileExists rather than interleaving
918 // writes into the shared `.partial`. We hold the first transfer open with a reader that never
919 // completes until we let it, then fire the second concurrently.
920 let root = tmp_root();
921 let store = Arc::new(TaildropStore::new(&root).unwrap());
922
923 // A reader that delivers a byte, then blocks until released — keeps transfer #1 in flight
924 // (and thus the name claimed) while we attempt transfer #2.
925 let (release_tx, release_rx) = tokio::sync::oneshot::channel::<()>();
926 struct BlockingReader {
927 sent: bool,
928 release: Option<tokio::sync::oneshot::Receiver<()>>,
929 }
930 impl AsyncRead for BlockingReader {
931 fn poll_read(
932 mut self: std::pin::Pin<&mut Self>,
933 cx: &mut std::task::Context<'_>,
934 buf: &mut tokio::io::ReadBuf<'_>,
935 ) -> std::task::Poll<io::Result<()>> {
936 if !self.sent {
937 buf.put_slice(b"x");
938 self.sent = true;
939 return std::task::Poll::Ready(Ok(()));
940 }
941 // After the first byte, park until released, then report EOF.
942 match self.release.as_mut() {
943 Some(rx) => match std::pin::Pin::new(rx).poll(cx) {
944 std::task::Poll::Ready(_) => {
945 self.release = None;
946 std::task::Poll::Ready(Ok(())) // EOF (no bytes written)
947 }
948 std::task::Poll::Pending => std::task::Poll::Pending,
949 },
950 None => std::task::Poll::Ready(Ok(())),
951 }
952 }
953 }
954
955 let s1 = store.clone();
956 let t1 = tokio::spawn(async move {
957 let reader = BlockingReader {
958 sent: false,
959 release: Some(release_rx),
960 };
961 // expected_len 1: completes once the single byte is read and the reader returns EOF.
962 s1.put_file("race.bin", reader, 0, 1).await
963 });
964
965 // Wait until transfer #1 has actually claimed the name (its partial exists).
966 let partial = root.join("race.bin.partial");
967 for _ in 0..200 {
968 if partial.exists() {
969 break;
970 }
971 tokio::time::sleep(std::time::Duration::from_millis(5)).await;
972 }
973 assert!(
974 partial.exists(),
975 "transfer #1 should have created the partial"
976 );
977
978 // Transfer #2 for the SAME name, while #1 still holds the claim → FileExists, no interleave.
979 let err = store
980 .put_file("race.bin", &b"yy"[..], 1, 3)
981 .await
982 .unwrap_err();
983 assert!(
984 matches!(err, TaildropError::FileExists),
985 "a concurrent transfer for an in-flight name must be rejected, got {err:?}"
986 );
987
988 // Release #1 so it finalizes cleanly, and confirm the name frees afterward.
989 release_tx.send(()).unwrap();
990 t1.await.unwrap().unwrap();
991 // Now the name is free: a fresh transfer succeeds (the guard was released on drop).
992 store.put_file("race.bin", &b"z"[..], 0, 1).await.unwrap();
993
994 std::fs::remove_dir_all(&root).ok();
995 }
996
997 #[cfg(unix)]
998 #[tokio::test]
999 async fn symlink_in_store_root_is_refused_not_followed() {
1000 use std::os::unix::fs::symlink;
1001
1002 let root = tmp_root();
1003 let store = TaildropStore::new(&root).unwrap();
1004
1005 // An attacker-planted symlink in the store root, pointing at a sensitive file OUTSIDE it.
1006 let outside = tmp_root();
1007 std::fs::create_dir_all(&outside).unwrap();
1008 let secret = outside.join("secret");
1009 std::fs::write(&secret, b"TOP SECRET").unwrap();
1010
1011 // (a) open_file must refuse a symlink target, never read through it.
1012 let link = root.join("link.txt");
1013 symlink(&secret, &link).unwrap();
1014 let open_err = store.open_file("link.txt").unwrap_err();
1015 assert!(
1016 matches!(open_err, TaildropError::Io(ref e) if e.kind() == io::ErrorKind::InvalidInput),
1017 "open_file must refuse a symlink, got {open_err:?}"
1018 );
1019
1020 // (b) delete_file must refuse the symlink, leaving BOTH the link and its target intact.
1021 let del_err = store.delete_file("link.txt").unwrap_err();
1022 assert!(
1023 matches!(del_err, TaildropError::Io(ref e) if e.kind() == io::ErrorKind::InvalidInput),
1024 "delete_file must refuse a symlink, got {del_err:?}"
1025 );
1026 assert!(
1027 secret.exists(),
1028 "the symlink target must NOT have been deleted"
1029 );
1030 assert_eq!(std::fs::read(&secret).unwrap(), b"TOP SECRET");
1031
1032 // (c) waiting_files must not report the symlink as a waiting file.
1033 assert!(
1034 store.waiting_files().unwrap().is_empty(),
1035 "a symlink in the store root must not be listed as a waiting file"
1036 );
1037
1038 // (d) put_file onto a symlinked partial must refuse rather than write through the link.
1039 let link_partial = root.join("evil.bin.partial");
1040 symlink(&secret, &link_partial).unwrap();
1041 let put_err = store
1042 .put_file("evil.bin", &b"data"[..], 0, 4)
1043 .await
1044 .unwrap_err();
1045 assert!(
1046 matches!(put_err, TaildropError::Io(ref e) if e.kind() == io::ErrorKind::InvalidInput),
1047 "put_file must refuse a symlinked partial, got {put_err:?}"
1048 );
1049 assert_eq!(
1050 std::fs::read(&secret).unwrap(),
1051 b"TOP SECRET",
1052 "the symlink target must NOT have been written through"
1053 );
1054
1055 std::fs::remove_dir_all(&root).ok();
1056 std::fs::remove_dir_all(&outside).ok();
1057 }
1058
/// `open_nofollow` must refuse a final-component symlink **atomically in the kernel**, not merely
/// via the advisory `refuse_symlink` lstat. This is the property that closes the TOCTOU window an
/// external process could otherwise win by swapping the path for a symlink after the lstat: the
/// open itself fails (`ELOOP`) on a symlinked final component. The test deliberately bypasses
/// `refuse_symlink` and points `open_nofollow` straight at a symlink. If the flag were dropped the
/// open would succeed by following the link, and the `expect_err` calls below would fail — so this
/// is a real regression guard for the `O_NOFOLLOW` wiring, not just a re-test of the advisory check.
#[cfg(unix)]
#[test]
fn open_nofollow_refuses_a_symlinked_target_atomically() {
    use std::os::unix::fs::symlink;

    let root = tmp_root();
    std::fs::create_dir_all(&root).unwrap();
    let outside = tmp_root();
    std::fs::create_dir_all(&outside).unwrap();
    let secret = outside.join("secret");
    std::fs::write(&secret, b"TOP SECRET").unwrap();

    let link = root.join("link.bin");
    symlink(&secret, &link).unwrap();

    // Read open through the symlink must fail at the kernel (ELOOP), never returning a handle to
    // the target. Assert on the raw `ELOOP` errno rather than an `ErrorKind`, so the guard holds
    // even if the errno-to-kind mapping changes.
    let read_err =
        open_nofollow(std::fs::OpenOptions::new().read(true), &link).expect_err("read open");
    assert_eq!(
        read_err.raw_os_error(),
        Some(libc::ELOOP),
        "O_NOFOLLOW read open of a symlink must fail with ELOOP, got {read_err:?}"
    );

    // Write open (the resume path) likewise refuses the symlink atomically.
    let write_err =
        open_nofollow(std::fs::OpenOptions::new().write(true), &link).expect_err("write open");
    assert_eq!(
        write_err.raw_os_error(),
        Some(libc::ELOOP),
        "O_NOFOLLOW write open of a symlink must fail with ELOOP, got {write_err:?}"
    );

    // The target was never opened/written through.
    assert_eq!(std::fs::read(&secret).unwrap(), b"TOP SECRET");

    // A real (non-symlink) file at the same final name opens fine — O_NOFOLLOW only rejects a
    // symlinked final component, so the normal store path is unaffected.
    let regular = root.join("regular.bin");
    std::fs::write(&regular, b"hello").unwrap();
    let mut f = open_nofollow(std::fs::OpenOptions::new().read(true), &regular)
        .expect("O_NOFOLLOW open of a regular file must succeed");
    let mut got = String::new();
    std::io::Read::read_to_string(&mut f, &mut got).unwrap();
    assert_eq!(got, "hello");

    std::fs::remove_dir_all(&root).ok();
    std::fs::remove_dir_all(&outside).ok();
}
1117
/// `next_available_name` returns the name unchanged while it is free, then ` (1)`, ` (2)`, …
/// placed before the extension; a dotfile, having no real extension, gets the suffix at the end.
#[test]
fn next_available_name_inserts_before_extension() {
    let root = tmp_root();
    std::fs::create_dir_all(&root).unwrap();

    // Marks `name` as taken by creating a one-byte file with that name in the root.
    let occupy = |name: &str| std::fs::write(root.join(name), b"x").unwrap();

    // Free name → returned untouched.
    assert_eq!(next_available_name(&root, "a.txt"), "a.txt");

    // Each time the current candidate is taken, the counter moves on by one.
    occupy("a.txt");
    assert_eq!(next_available_name(&root, "a.txt"), "a (1).txt");
    occupy("a (1).txt");
    assert_eq!(next_available_name(&root, "a.txt"), "a (2).txt");

    // Dotfile (no real extension) appends at end.
    occupy(".env");
    assert_eq!(next_available_name(&root, ".env"), ".env (1)");

    std::fs::remove_dir_all(&root).ok();
}
1132
/// A sweep removes a `.partial` only once it is older than `delete_delay`, and never touches a
/// finished file. Age is simulated through the `now` argument (the partial's real mtime is ~now):
/// sweeping at `now` with a 1h delay sees it as ~0s old (kept), while sweeping at `now + 2h` sees
/// it as 2h old (reaped).
#[test]
fn reaper_deletes_aged_partial_keeps_fresh_and_final() {
    let root = tmp_root();
    let store = TaildropStore::new(&root).unwrap();

    let abandoned = root.join("abandoned.bin.partial");
    let finished = root.join("done.bin"); // a completed file, not a partial
    std::fs::write(&abandoned, b"half").unwrap();
    std::fs::write(&finished, b"complete").unwrap();

    let one_hour = Duration::from_secs(3600);
    let present = SystemTime::now();
    let two_hours_on = present + Duration::from_secs(2 * 3600);

    // Swept "now": the partial has not aged past the delay, so nothing goes.
    let fresh_sweep = store.reap_abandoned_partials(present, one_hour);
    assert_eq!(fresh_sweep, 0, "nothing aged out");
    assert!(abandoned.exists());

    // Swept "two hours later": the partial is over the 1h delay and is removed; the finished
    // file is left alone.
    let aged_sweep = store.reap_abandoned_partials(two_hours_on, one_hour);
    assert_eq!(aged_sweep, 1, "the aged partial is reaped");
    assert!(!abandoned.exists(), "aged partial deleted");
    assert!(
        finished.exists(),
        "a completed (non-partial) file must never be reaped"
    );

    std::fs::remove_dir_all(&root).ok();
}
1169
1170 /// An in-flight transfer's partial is NEVER reaped, even when aged (Go's "no active put" check):
1171 /// while a base name is claimed, the reaper skips its partial regardless of mtime.
1172 #[tokio::test]
1173 async fn reaper_spares_in_flight_partial() {
1174 let root = tmp_root();
1175 let store = TaildropStore::new(&root).unwrap();
1176
1177 // Pre-write an (old) partial and claim its base name as in-flight.
1178 std::fs::write(root.join("live.bin.partial"), b"in progress").unwrap();
1179 let _claim = store.claim_in_flight("live.bin").expect("claim");
1180
1181 // Even with a far-future `now` (well past the delay), the in-flight partial is spared.
1182 let reaped = store.reap_abandoned_partials(
1183 SystemTime::now() + Duration::from_secs(48 * 3600),
1184 Duration::from_secs(3600),
1185 );
1186 assert_eq!(reaped, 0, "an in-flight partial must not be reaped");
1187 assert!(
1188 root.join("live.bin.partial").exists(),
1189 "the in-flight partial survives the sweep"
1190 );
1191
1192 // Once the claim drops, the same aged partial IS reaped.
1193 drop(_claim);
1194 let reaped = store.reap_abandoned_partials(
1195 SystemTime::now() + Duration::from_secs(48 * 3600),
1196 Duration::from_secs(3600),
1197 );
1198 assert_eq!(
1199 reaped, 1,
1200 "after the claim drops, the aged partial is reaped"
1201 );
1202
1203 std::fs::remove_dir_all(&root).ok();
1204 }
1205
1206 /// The background reaper task: (1) does NOT reap at startup, and (2) terminates when shutdown
1207 /// flips true. With the real DELETE_DELAY (1h) interval, the first real sweep is an hour out, so
1208 /// within this millisecond-scale test no sweep ever fires — which is exactly what proves both
1209 /// "the startup tick is skipped" (an aged partial present at start survives) and "the task is
1210 /// parked on the interval, woken only by shutdown". (`test-util`/paused-time is intentionally not
1211 /// pulled into this crate's deps, so this uses real time and the fact that 1h >> the test.)
1212 #[tokio::test]
1213 async fn reaper_task_skips_startup_then_terminates_on_shutdown() {
1214 let root = tmp_root();
1215 let store = Arc::new(TaildropStore::new(&root).unwrap());
1216 // A partial present at boot — it must survive (no startup reap fires within the test window).
1217 std::fs::write(root.join("boot.bin.partial"), b"x").unwrap();
1218
1219 let (tx, rx) = tokio::sync::watch::channel(false);
1220 let handle = spawn_partial_reaper(store.clone(), rx);
1221
1222 // Give the task a moment to reach its select!; the first real sweep is DELETE_DELAY (1h) out,
1223 // so nothing is reaped here.
1224 tokio::time::sleep(Duration::from_millis(50)).await;
1225 assert!(
1226 root.join("boot.bin.partial").exists(),
1227 "no reap before the first real sweep (startup tick is skipped; first sweep is 1h out)"
1228 );
1229
1230 // Flip shutdown: the task must terminate via its select! shutdown arm, not hang on the 1h
1231 // interval. If the shutdown arm were broken this `timeout` would elapse (the next tick is ~1h
1232 // away), so the timeout firing IS the regression signal.
1233 tx.send_replace(true);
1234 tokio::time::timeout(Duration::from_secs(5), handle)
1235 .await
1236 .expect("reaper must terminate on shutdown (not wait for the 1h interval)")
1237 .expect("reaper task must not panic");
1238
1239 std::fs::remove_dir_all(&root).ok();
1240 }
1241
1242 /// A reaper spawned when shutdown is ALREADY true terminates on its first loop iteration
1243 /// (`watch::Receiver::wait_for` returns immediately when the predicate already holds), rather
1244 /// than waiting a full DELETE_DELAY (1h) for the first tick.
1245 #[tokio::test]
1246 async fn reaper_task_terminates_when_shutdown_already_set() {
1247 let root = tmp_root();
1248 let store = Arc::new(TaildropStore::new(&root).unwrap());
1249 let (_tx, rx) = tokio::sync::watch::channel(true); // already shut down
1250 let handle = spawn_partial_reaper(store, rx);
1251 tokio::time::timeout(Duration::from_secs(5), handle)
1252 .await
1253 .expect("a reaper spawned post-shutdown must exit promptly, not wait the 1h interval")
1254 .expect("reaper task must not panic");
1255 std::fs::remove_dir_all(&root).ok();
1256 }
1257}