ts_runtime/taildrop.rs
1//! Taildrop file store — the receiving half of Tailscale's peer-to-peer file transfer.
2//!
3//! A peer sends a file to this node via the peerAPI route `PUT /v0/put/<name>` (handled in
4//! `peerapi`). This module owns the on-disk store those puts land in, faithfully mirroring
5//! Go's `taildrop.manager`:
6//!
7//! - Incoming bytes are written to a per-transfer **partial** file (`<base>.partial`) so an
8//! interrupted transfer never exposes a truncated file under its real name, and can be resumed
9//! from an offset.
10//! - On successful completion the partial is **atomically renamed** to the final base name. If the
11//! final name already exists, a non-clobbering ` (n)` suffix is chosen (Go `nextFilename`).
12//! - File names are strictly validated ([`validate_base_name`](crate::taildrop::validate_base_name)) to defeat path traversal and
13//! reserved-suffix abuse before any path is constructed — this is the security boundary.
14//!
15//! # Anti-abuse / safety
16//!
//! Every name is validated to be a single, local, non-traversing path component before it touches
//! the filesystem; a name that is exactly `.` or `..`, that contains `/`, `\`, a NUL or any other
//! control character, or that ends in the reserved `.partial` / `.deleted` suffixes is rejected with
//! [`TaildropError::InvalidFileName`](crate::taildrop::TaildropError::InvalidFileName). The store
//! root is fixed at construction; all I/O is confined to it by joining only validated base names.
21
22use std::{
23 collections::HashSet,
24 io::{self, Seek, Write},
25 path::{Path, PathBuf},
26 sync::{Arc, Mutex},
27 time::{Duration, SystemTime},
28};
29
30use tokio::io::{AsyncRead, AsyncReadExt};
31
/// Grace period an abandoned `.partial` is allowed to sit on disk before the reaper removes it
/// (Go `feature/taildrop/delete.go`: `deleteDelay = time.Hour`). The reaper measures age from the
/// partial's modification time and skips any name with a transfer in flight, so a transfer that is
/// resumed inside this window keeps its partial.
pub const DELETE_DELAY: Duration = Duration::from_secs(3600);

/// File-name suffix a transfer carries while it is still being received (Go `partialSuffix`). The
/// finished file is renamed off this suffix, and a base name ending in it is never accepted.
const PARTIAL_SUFFIX: &str = ".partial";
/// File-name suffix Go uses to tombstone files awaiting deletion (Go `deletedSuffix`). It is
/// rejected as a base name for parity, so a sender cannot create such a file.
const DELETED_SUFFIX: &str = ".deleted";
/// Largest accepted base name, in bytes (Go `validateBaseName`: 255).
const MAX_BASE_NAME_LEN: usize = 255;
46
/// Failure modes of the Taildrop file store.
#[derive(Debug)]
pub enum TaildropError {
    /// The requested file name failed validation (traversal, reserved suffix, empty, too long, or
    /// forbidden characters). Maps to peerAPI `400 Bad Request`.
    InvalidFileName,
    /// Another transfer for exactly this base name is already in progress. Maps to peerAPI
    /// `409 Conflict`.
    FileExists,
    /// The underlying filesystem operation failed. Maps to peerAPI `500`.
    Io(io::Error),
}

impl std::fmt::Display for TaildropError {
    /// Render the human-readable message for each variant; the I/O variant appends its cause.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::InvalidFileName => f.write_str("invalid taildrop file name"),
            Self::FileExists => f.write_str("a transfer for this file is already in progress"),
            Self::Io(cause) => write!(f, "taildrop I/O error: {cause}"),
        }
    }
}

impl std::error::Error for TaildropError {
    /// Only the I/O variant wraps an underlying error; the other variants have no source.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Self::Io(cause) => Some(cause),
            Self::InvalidFileName | Self::FileExists => None,
        }
    }
}

impl From<io::Error> for TaildropError {
    /// Wrap a raw I/O error so `?` can be used directly on filesystem calls.
    fn from(cause: io::Error) -> Self {
        Self::Io(cause)
    }
}
85
/// One fully-received Taildrop file, as reported to the embedder. Mirrors Go
/// `apitype.WaitingFile` (whose default JSON field names are `Name` and `Size`).
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct WaitingFile {
    /// Base name of the file inside the store.
    pub name: String,
    /// Size of the file in bytes.
    pub size: u64,
}
95
96/// Validate a Taildrop base name, mirroring Go `taildrop.validateBaseName`.
97///
98/// Returns the name unchanged when it is a safe, single, local path component; otherwise `None`.
99/// Rejection rules (any one fails): empty or `> 255` bytes; leading/trailing ASCII space; contains
100/// a path separator (`/` or `\`), a NUL, or an ASCII control char; is `.` or `..`; equals a cleaned
101/// path other than itself (catches embedded `..`/`.` segments and absolute paths); or ends in the
102/// reserved `.partial` / `.deleted` suffixes.
103pub fn validate_base_name(name: &str) -> Option<&str> {
104 if name.is_empty() || name.len() > MAX_BASE_NAME_LEN {
105 return None;
106 }
107 if name.starts_with(' ') || name.ends_with(' ') {
108 return None;
109 }
110 if name == "." || name == ".." {
111 return None;
112 }
113 if name.ends_with(PARTIAL_SUFFIX) || name.ends_with(DELETED_SUFFIX) {
114 return None;
115 }
116 // Reject any separator, NUL, or control character outright. This is the core traversal guard:
117 // with no `/`, `\`, or `..` segment possible, the name can only ever be a leaf in the store dir.
118 for ch in name.chars() {
119 if ch == '/' || ch == '\\' || ch == '\0' || ch.is_control() {
120 return None;
121 }
122 }
123 // Defense in depth: a name that does not survive `Path` normalization as a single normal
124 // component is rejected (catches `..`, absolute paths, and any platform-specific oddity).
125 let p = Path::new(name);
126 let mut comps = p.components();
127 match (comps.next(), comps.next()) {
128 (Some(std::path::Component::Normal(c)), None) if c == name => Some(name),
129 _ => None,
130 }
131}
132
133/// Choose a non-clobbering final name for `base` within `dir`, mirroring Go `nextFilename`:
134/// `foo.txt` -> `foo (1).txt` -> `foo (2).txt` ... inserting ` (n)` before the extension. Returns
135/// the first candidate (incl. `base` itself) whose path does not yet exist. Bounded to avoid an
136/// unbounded loop on a pathological directory.
137fn next_available_name(dir: &Path, base: &str) -> String {
138 if !path_present(&dir.join(base)) {
139 return base.to_string();
140 }
141 let (stem, ext) = match base.rsplit_once('.') {
142 // Keep the dot with the extension; an empty stem (dotfile like ".bashrc") has no split.
143 Some((stem, ext)) if !stem.is_empty() => (stem, format!(".{ext}")),
144 _ => (base, String::new()),
145 };
146 for n in 1..=10_000u32 {
147 let candidate = format!("{stem} ({n}){ext}");
148 if !path_present(&dir.join(&candidate)) {
149 return candidate;
150 }
151 }
152 // Pathological fallback: suffix with a high counter; extremely unlikely to be reached.
153 format!("{stem} (overflow){ext}")
154}
155
/// Report whether anything exists at `path`, counting a symlink (even a dangling one) as present.
///
/// `Path::exists()` follows symlinks and so answers `false` for a dangling link; this uses
/// `symlink_metadata`, which does not follow the final component. That keeps
/// [`next_available_name`] from treating a planted symlink as a free name and then renaming onto it.
fn path_present(path: &Path) -> bool {
    path.symlink_metadata().is_ok()
}
163
164/// Reject a path that is (or whose final component is) a symlink, mirroring the intent of Go's
165/// `O_NOFOLLOW` on the taildrop file ops. `validate_base_name` already blocks a traversing *name*,
166/// but not a symlink **component already planted in the store root** by a local attacker (e.g.
167/// `root/foo.txt -> /etc/cron.d/x`), which a plain `open`/`rename`/`remove` would follow. Uses
168/// `symlink_metadata` (lstat — does NOT follow the final symlink); a non-existent path is fine
169/// (returns `Ok(())`), only an existing symlink is refused.
170///
171/// This is a check-then-act guard, so it is not atomic with the open/rename/remove that follows.
172/// It kills the **persistent-plant** attack (a symlink left in the store root is no longer followed
173/// deterministically), and the per-name in-flight lock serializes our OWN operations on a name, but
174/// it does not close a sub-millisecond race where an external process swaps the path for a symlink
175/// between this lstat and the syscall. That residual requires an external writer who already holds
176/// store-dir write access (the threat bound for this hardening), and is the one axis where this is
177/// weaker than Go's atomic `O_NOFOLLOW`. The `offset == 0` put is additionally protected by
178/// `create_new` (`O_EXCL`, which refuses an existing symlink atomically); the `offset > 0` resume
179/// open is plain `write(true)` and relies on this advisory check until `O_NOFOLLOW` is wired
180/// (tracked as a follow-up — its raw value is arch-dependent, so a portable open flag is deferred).
181fn refuse_symlink(path: &Path) -> Result<(), TaildropError> {
182 match std::fs::symlink_metadata(path) {
183 Ok(meta) if meta.file_type().is_symlink() => Err(TaildropError::Io(io::Error::new(
184 io::ErrorKind::InvalidInput,
185 "taildrop path is a symlink; refusing to follow it",
186 ))),
187 Ok(_) => Ok(()),
188 // Not present yet (the common case for a fresh partial / final name) — nothing to refuse.
189 Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(()),
190 Err(e) => Err(e.into()),
191 }
192}
193
/// A Taildrop file store rooted at one fixed directory. Every operation stays inside that root by
/// joining only names accepted by [`validate_base_name`].
#[derive(Clone, Debug)]
pub struct TaildropStore {
    /// Directory that holds both partial and completed files.
    root: PathBuf,
    /// Base names that currently have a transfer in flight. `put_file` claims its base name here
    /// for the whole receive (fresh `offset == 0` and resumed `offset > 0` alike), so two
    /// concurrent PUTs for one name cannot interleave `set_len`/`seek`/`write_all` on the shared
    /// `.partial`. Kept behind an `Arc` so that cloning the store shares the set rather than
    /// forking it.
    in_flight: Arc<Mutex<HashSet<String>>>,
}
206
/// Claim on one in-flight transfer name. Dropping the value removes `name` from the shared set,
/// which frees the name for the next transfer; because the removal lives in `Drop`, it also runs
/// on an early return, an error, or a panic unwinding through the holder.
struct InFlightGuard {
    /// The shared set of claimed names this guard will remove itself from.
    set: Arc<Mutex<HashSet<String>>>,
    /// The base name this guard holds.
    name: String,
}

impl Drop for InFlightGuard {
    fn drop(&mut self) {
        // A poisoned lock still hands back the set, so the entry is removed regardless. Leaving a
        // stale name claimed would wedge every later transfer of that name behind a phantom
        // conflict.
        let mut claimed = match self.set.lock() {
            Ok(guard) => guard,
            Err(poisoned) => poisoned.into_inner(),
        };
        claimed.remove(&self.name);
    }
}
223
224impl TaildropStore {
225 /// Create a store rooted at `root`, creating the directory (and parents) if needed.
226 pub fn new(root: impl Into<PathBuf>) -> Result<Self, TaildropError> {
227 let root = root.into();
228 std::fs::create_dir_all(&root)?;
229 Ok(Self {
230 root,
231 in_flight: Arc::new(Mutex::new(HashSet::new())),
232 })
233 }
234
235 /// Claim `base` as in-flight, returning an RAII guard that frees it on drop. Returns
236 /// [`TaildropError::FileExists`] if another transfer already holds the name — this is the
237 /// concurrency analog of the on-disk `.partial` conflict, and it serializes all transfers of one
238 /// name so a resume (`offset > 0`) cannot race a concurrent transfer's `set_len`/`seek`/`write`.
239 fn claim_in_flight(&self, base: &str) -> Result<InFlightGuard, TaildropError> {
240 let name = base.to_string();
241 let mut set = self.in_flight.lock().unwrap_or_else(|p| p.into_inner());
242 if !set.insert(name.clone()) {
243 return Err(TaildropError::FileExists);
244 }
245 Ok(InFlightGuard {
246 set: self.in_flight.clone(),
247 name,
248 })
249 }
250
251 /// The partial-file path for an already-validated base name.
252 fn partial_path(&self, base: &str) -> PathBuf {
253 self.root.join(format!("{base}{PARTIAL_SUFFIX}"))
254 }
255
256 /// Reap abandoned `.partial` files: delete every `<base>.partial` in the store root whose last
257 /// modification is older than `delete_delay` relative to `now` AND whose base name has no
258 /// in-flight transfer. Returns the number deleted. Mirrors Go `feature/taildrop/delete.go`'s
259 /// `fileDeleter`, which GCs a partial `deleteDelay` (1h) after it was last touched, sparing one
260 /// that an active put is still writing.
261 ///
262 /// This fork has no per-file timer queue (the store is a passive `Arc`, not an actor); instead a
263 /// periodic background sweep — see [`spawn_partial_reaper`] — calls this. The two Go cancellation
264 /// signals are both honored: an **active** transfer's base name is in `in_flight` (skipped here,
265 /// the analog of Go's "no active put" check), and a **resumed** transfer advances the partial's
266 /// mtime on every write (so a partial resumed within the window looks recent and is spared). A
267 /// permanently-abandoned partial is neither, so it ages out and is deleted, reclaiming the disk
268 /// and clearing the stale-partial `409` that would otherwise block an `offset == 0` re-send of
269 /// the same name forever.
270 ///
271 /// `now` and `delete_delay` are parameters (not read from the clock here) so the reap logic is
272 /// deterministically testable. A partial whose mtime is unreadable or in the future is treated as
273 /// fresh (kept) — fail-safe toward never deleting a file that might still be live.
274 pub fn reap_abandoned_partials(&self, now: SystemTime, delete_delay: Duration) -> usize {
275 let entries = match std::fs::read_dir(&self.root) {
276 Ok(e) => e,
277 Err(e) if e.kind() == io::ErrorKind::NotFound => return 0,
278 Err(e) => {
279 tracing::warn!(error = %e, "taildrop reaper: cannot read store dir");
280 return 0;
281 }
282 };
283 // Snapshot the in-flight names once; an active transfer must never have its partial reaped.
284 let in_flight: HashSet<String> = self
285 .in_flight
286 .lock()
287 .unwrap_or_else(|p| p.into_inner())
288 .clone();
289
290 let mut deleted = 0usize;
291 for entry in entries.flatten() {
292 // `metadata()` here is lstat-based (does not follow symlinks); a symlink is never a
293 // partial we created, so `is_file()` is false for it and it is skipped — consistent with
294 // the symlink refusal elsewhere in this store.
295 let Ok(meta) = entry.metadata() else { continue };
296 if !meta.is_file() {
297 continue;
298 }
299 let Ok(name) = entry.file_name().into_string() else {
300 continue;
301 };
302 let Some(base) = name.strip_suffix(PARTIAL_SUFFIX) else {
303 continue; // not a partial
304 };
305 if in_flight.contains(base) {
306 continue; // an active transfer owns this partial — never reap it (Go "no active put")
307 }
308 // Age check: keep anything modified within `delete_delay` of `now`. An unreadable or
309 // future mtime is treated as fresh (kept) — fail-safe toward not deleting a live file.
310 let age_ok = meta
311 .modified()
312 .ok()
313 .and_then(|m| now.duration_since(m).ok())
314 .is_some_and(|age| age >= delete_delay);
315 if !age_ok {
316 continue;
317 }
318 // Final guard against the snapshot TOCTOU: re-check in-flight membership under the lock
319 // immediately before deleting, so a transfer that claimed this base AFTER the upfront
320 // snapshot (the microsecond window between the snapshot and here) still spares its
321 // partial. The mtime check above already makes a wrong delete unreachable in practice (a
322 // live partial is < delete_delay old), but this closes the window completely and cheaply
323 // (one lock per aged candidate, which is rare).
324 if self
325 .in_flight
326 .lock()
327 .unwrap_or_else(|p| p.into_inner())
328 .contains(base)
329 {
330 continue;
331 }
332 match std::fs::remove_file(entry.path()) {
333 Ok(()) => {
334 deleted += 1;
335 tracing::info!(partial = %name, "taildrop reaper: deleted abandoned partial");
336 }
337 Err(e) if e.kind() == io::ErrorKind::NotFound => {} // already gone, fine
338 Err(e) => {
339 tracing::warn!(error = %e, partial = %name, "taildrop reaper: delete failed")
340 }
341 }
342 }
343 deleted
344 }
345
346 /// Receive a file named `name` from `reader`, writing to `<name>.partial` then atomically
347 /// renaming to a non-clobbering final name on success. Mirrors Go `manager.PutFile`.
348 ///
349 /// `offset` lets a resumed transfer append past already-written bytes (the partial is opened, the
350 /// write starts at `offset`, and any bytes already on disk past `offset` are truncated away).
351 /// `expected_len` is the declared total length of the completed file (the request's
352 /// `Content-Length` plus `offset`); the transfer is finalized only if exactly that many bytes are
353 /// present. Returns the total number of bytes in the completed file.
354 ///
355 /// Fail-closed: an invalid name is rejected before any path is built; an in-progress partial for
356 /// the same name yields [`TaildropError::FileExists`]; an out-of-range resume `offset` (past the
357 /// current partial length) is rejected; an I/O error mid-transfer — or a body that ends before
358 /// `expected_len` (a short/interrupted stream) — leaves the `.partial` on disk and the final name
359 /// is never created. This matches Go `feature/taildrop/send.go`, which errors when the copied
360 /// length does not equal the declared length rather than publishing a truncated file.
361 ///
362 /// The retained `.partial` is resumable only by a peer that issues a ranged retry (an `offset > 0`
363 /// PUT); a sender that always restarts at `offset == 0` will instead hit the in-progress-conflict
364 /// path ([`TaildropError::FileExists`]) until the stale partial is cleared. A permanently-abandoned
365 /// partial is reclaimed by the background reaper after [`DELETE_DELAY`] (Go's `fileDeleter`
366 /// equivalent — see [`reap_abandoned_partials`](Self::reap_abandoned_partials) /
367 /// [`spawn_partial_reaper`]), which also clears that `offset == 0` conflict once the stale partial
368 /// ages out.
369 pub async fn put_file<R>(
370 &self,
371 name: &str,
372 mut reader: R,
373 offset: u64,
374 expected_len: u64,
375 ) -> Result<u64, TaildropError>
376 where
377 R: AsyncRead + Unpin,
378 {
379 let base = validate_base_name(name).ok_or(TaildropError::InvalidFileName)?;
380 let partial = self.partial_path(base);
381
382 // Claim the name for the whole transfer FIRST, so two concurrent PUTs for the same base name
383 // (especially two resumes, `offset > 0`, which reopen the same `.partial`) cannot interleave
384 // their `set_len`/`seek`/`write_all` and corrupt the shared partial. The fresh-transfer path
385 // is already protected on disk by `create_new`, but the resume path opens with plain
386 // `write(true)` and needs this lock. The guard frees the name on drop (incl. early return /
387 // error / panic). Held across the await — it is a cheap `HashSet` membership marker, not a
388 // lock held during I/O, so it never blocks the runtime.
389 let _claim = self.claim_in_flight(base)?;
390
391 // Refuse to follow a symlink planted in the store root (Go's `O_NOFOLLOW` intent): the
392 // partial must be a regular file we create/own, never a pre-existing symlink to elsewhere.
393 refuse_symlink(&partial)?;
394
395 // A fresh transfer (offset 0) must not collide with another in-flight transfer of the same
396 // name; a resume (offset > 0) reopens the existing partial. File handles are std (the tokio
397 // `fs` feature is intentionally not enabled in this crate); the body is read async off the
398 // overlay stream and written to the blocking handle in a bounded loop.
399 let mut file = if offset == 0 {
400 match std::fs::OpenOptions::new()
401 .write(true)
402 .create_new(true)
403 .open(&partial)
404 {
405 Ok(f) => f,
406 Err(e) if e.kind() == io::ErrorKind::AlreadyExists => {
407 return Err(TaildropError::FileExists);
408 }
409 Err(e) => return Err(e.into()),
410 }
411 } else {
412 let mut f = std::fs::OpenOptions::new().write(true).open(&partial)?;
413 // Bound the resume offset to the current partial length and truncate any bytes past it,
414 // matching Go `feature/taildrop/fileops_fs.go` (`OpenWriter` rejects `offset > curr` and
415 // `Truncate(offset)`s). Without the bound a too-large offset would leave a zero-filled
416 // sparse hole; without the truncate a shorter resumed body would leave a prior attempt's
417 // stale tail past the new end. `metadata().len()` is the partial's current size.
418 let current = f.metadata()?.len();
419 if offset > current {
420 return Err(TaildropError::Io(io::Error::new(
421 io::ErrorKind::InvalidInput,
422 "taildrop resume offset is past the end of the partial file",
423 )));
424 }
425 f.set_len(offset)?;
426 f.seek(io::SeekFrom::Start(offset))?;
427 f
428 };
429
430 let mut copied: u64 = 0;
431 let mut buf = [0u8; 64 * 1024];
432 loop {
433 let n = reader.read(&mut buf).await?;
434 if n == 0 {
435 break;
436 }
437 // Each `write_all` only pushes the chunk into the page cache (microseconds); the
438 // genuinely blocking cost is the terminal `flush`/`sync_all`/`rename` below, which we
439 // hand to a blocking thread so a flood of concurrent transfers can't starve the tokio
440 // worker pool on fsync (see `peerapi::MAX_INFLIGHT`).
441 file.write_all(&buf[..n])?;
442 copied += n as u64;
443 }
444
445 // Length check (Go `send.go`: error when `copyLength != length`). A body that ended before the
446 // declared length — an interrupted/short stream — must NOT be finalized as a complete file;
447 // leave the `.partial` on disk (with the bytes received so far) so a Range-capable peer can
448 // resume it. `checked_add` rather than a bare `+`: `offset` is an attacker-supplied header and
449 // the bound above already rejects an `offset` past the (real, on-disk) partial length, so this
450 // cannot overflow in practice — but treat an overflow as a length mismatch rather than a panic.
451 let total = match offset.checked_add(copied) {
452 Some(t) if t == expected_len => t,
453 _ => {
454 return Err(TaildropError::Io(io::Error::new(
455 io::ErrorKind::UnexpectedEof,
456 format!(
457 "taildrop body ended early: got {copied} of {expected_len} expected bytes \
458 at offset {offset}; leaving partial for resume"
459 ),
460 )));
461 }
462 };
463
464 // Finalize off the async runtime: `sync_all` (fsync) and `rename` are the dominant blocking
465 // operations, so run them on a blocking thread. The `File` and both paths are owned by the
466 // closure (`Send + 'static`), and `next_available_name` (which `stat`s candidates) goes with
467 // them. Fail-closed: any I/O error — or a join failure — propagates without ever publishing
468 // the final name, leaving the `.partial` in place for a later resume.
469 let root = self.root.clone();
470 let base = base.to_string();
471 tokio::task::spawn_blocking(move || -> io::Result<()> {
472 file.flush()?;
473 file.sync_all()?;
474 drop(file);
475
476 // Atomically publish under a non-clobbering final name. `next_available_name` probes
477 // candidates with `symlink_metadata` (not `exists`, which follows symlinks), so it will
478 // not treat a planted symlink as "free" and rename onto it; and we refuse to rename onto
479 // an existing symlink target outright (Go `O_NOFOLLOW` intent). The `_claim` guard (held
480 // by the caller for the whole transfer) keeps this name serialized against other PUTs.
481 let final_name = next_available_name(&root, &base);
482 let final_path = root.join(&final_name);
483 if let Err(e) = refuse_symlink(&final_path) {
484 return Err(match e {
485 TaildropError::Io(io_err) => io_err,
486 other => io::Error::other(other.to_string()),
487 });
488 }
489 std::fs::rename(&partial, &final_path)?;
490 Ok(())
491 })
492 .await
493 .map_err(|join_err| {
494 // A panicked/cancelled finalize task: surface as I/O so the caller maps it to a 500 and
495 // the partial is left untouched (never publishes the final name).
496 TaildropError::Io(io::Error::other(format!(
497 "taildrop finalize task failed: {join_err}"
498 )))
499 })??;
500
501 Ok(total)
502 }
503
504 /// List fully-received (non-partial) files, sorted by name (Go `WaitingFiles`).
505 pub fn waiting_files(&self) -> Result<Vec<WaitingFile>, TaildropError> {
506 let mut out = Vec::new();
507 let entries = match std::fs::read_dir(&self.root) {
508 Ok(e) => e,
509 Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(out),
510 Err(e) => return Err(e.into()),
511 };
512 for entry in entries {
513 let entry = entry?;
514 // `entry.metadata()` does NOT follow symlinks (it is `lstat`-based), so a planted
515 // symlink has `is_file() == false` here and is skipped — a symlink in the store root is
516 // never reported as a waiting file (Go `O_NOFOLLOW` intent), even one pointing at a real
517 // regular file elsewhere.
518 let meta = entry.metadata()?;
519 if meta.file_type().is_symlink() || !meta.is_file() {
520 continue;
521 }
522 let Ok(name) = entry.file_name().into_string() else {
523 continue;
524 };
525 // Skip in-progress / tombstoned files.
526 if name.ends_with(PARTIAL_SUFFIX) || name.ends_with(DELETED_SUFFIX) {
527 continue;
528 }
529 out.push(WaitingFile {
530 name,
531 size: meta.len(),
532 });
533 }
534 out.sort_by(|a, b| a.name.cmp(&b.name));
535 Ok(out)
536 }
537
538 /// Delete a fully-received file by base name (Go `DeleteFile`). The name is validated first, so a
539 /// traversal attempt can never escape the store root, and a symlink at the target is refused (Go
540 /// `O_NOFOLLOW` intent) rather than followed — a planted `root/foo.txt -> /etc/passwd` must not
541 /// let a `delete foo.txt` remove the link's target.
542 pub fn delete_file(&self, name: &str) -> Result<(), TaildropError> {
543 let base = validate_base_name(name).ok_or(TaildropError::InvalidFileName)?;
544 let path = self.root.join(base);
545 refuse_symlink(&path)?;
546 std::fs::remove_file(path)?;
547 Ok(())
548 }
549
550 /// Open a fully-received file by base name for reading, returning the handle and its size (Go
551 /// `OpenFile`). The name is validated first, and a symlink at the target is refused (Go
552 /// `O_NOFOLLOW` intent) so a planted symlink cannot redirect the read to an arbitrary file.
553 pub fn open_file(&self, name: &str) -> Result<(std::fs::File, u64), TaildropError> {
554 let base = validate_base_name(name).ok_or(TaildropError::InvalidFileName)?;
555 let path = self.root.join(base);
556 refuse_symlink(&path)?;
557 let f = std::fs::File::open(path)?;
558 let size = f.metadata()?.len();
559 Ok((f, size))
560 }
561}
562
563/// Spawn the background reaper that periodically GCs abandoned `.partial` files (Go
564/// `feature/taildrop/delete.go`'s `fileDeleter`). It sweeps every [`DELETE_DELAY`], deleting partials
565/// older than `DELETE_DELAY` that have no in-flight transfer (see
566/// [`TaildropStore::reap_abandoned_partials`]), and exits when `shutdown` flips to `true`.
567///
568/// Returns a [`JoinHandle`](tokio::task::JoinHandle) the caller should abort on drop so the task
569/// never outlives the runtime (the established `reauth_bridge` / `DerpLatencyMeasurer` pattern). An
570/// `Arc<TaildropStore>` is held (cheap clone), so the sweep sees live `in_flight` state.
571///
572/// The first sweep is deferred one full `DELETE_DELAY` (not run at startup): a partial on disk at
573/// boot is by definition at least 0s old, and Go likewise only deletes after the delay elapses, so
574/// waiting one interval avoids reaping a partial a just-restarted node might still resume.
575pub fn spawn_partial_reaper(
576 store: Arc<TaildropStore>,
577 mut shutdown: tokio::sync::watch::Receiver<bool>,
578) -> tokio::task::JoinHandle<()> {
579 tokio::spawn(async move {
580 let mut tick = tokio::time::interval(DELETE_DELAY);
581 // `interval` fires immediately on the first `tick()`; consume that so the first real sweep is
582 // one `DELETE_DELAY` out (no startup reap — a partial must age the full delay first).
583 tick.tick().await;
584 loop {
585 tokio::select! {
586 _ = tick.tick() => {
587 let n = store.reap_abandoned_partials(SystemTime::now(), DELETE_DELAY);
588 if n > 0 {
589 tracing::info!(deleted = n, "taildrop reaper: swept abandoned partials");
590 }
591 }
592 _ = shutdown.wait_for(|x| *x) => break,
593 }
594 }
595 })
596}
597
598#[cfg(test)]
599mod tests {
600 use super::*;
601
602 fn tmp_root() -> PathBuf {
603 // A per-call atomic counter guarantees uniqueness across tests that run concurrently in the
604 // same binary. A timestamp alone is NOT enough: `SystemTime` resolution is coarse on some
605 // platforms, so two tests starting in the same tick would collide on one dir and stomp each
606 // other's files (the cause of intermittent taildrop-test flakiness under parallel runs).
607 static COUNTER: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
608 let n = COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
609 let mut p = std::env::temp_dir();
610 p.push(format!("taildrop-test-{}-{n}", std::process::id()));
611 p
612 }
613
614 #[test]
615 fn validate_rejects_traversal_and_reserved() {
616 // Valid leaf names.
617 assert_eq!(validate_base_name("photo.jpg"), Some("photo.jpg"));
618 assert_eq!(
619 validate_base_name("a file with spaces.txt"),
620 Some("a file with spaces.txt")
621 );
622 assert_eq!(validate_base_name(".bashrc"), Some(".bashrc"));
623
624 // Traversal / separators.
625 assert_eq!(validate_base_name("../etc/passwd"), None);
626 assert_eq!(validate_base_name("a/b"), None);
627 assert_eq!(validate_base_name("a\\b"), None);
628 assert_eq!(validate_base_name("/abs"), None);
629 assert_eq!(validate_base_name(".."), None);
630 assert_eq!(validate_base_name("."), None);
631
632 // NUL / control.
633 assert_eq!(validate_base_name("a\0b"), None);
634 assert_eq!(validate_base_name("a\nb"), None);
635
636 // Reserved suffixes.
637 assert_eq!(validate_base_name("x.partial"), None);
638 assert_eq!(validate_base_name("x.deleted"), None);
639
640 // Edges.
641 assert_eq!(validate_base_name(""), None);
642 assert_eq!(validate_base_name(" leading"), None);
643 assert_eq!(validate_base_name("trailing "), None);
644 assert_eq!(validate_base_name(&"a".repeat(256)), None);
645 assert_eq!(
646 validate_base_name(&"a".repeat(255)).map(|s| s.len()),
647 Some(255)
648 );
649 }
650
651 #[tokio::test]
652 async fn put_file_writes_then_atomically_renames() {
653 let root = tmp_root();
654 let store = TaildropStore::new(&root).unwrap();
655
656 let data = b"hello taildrop";
657 let n = store
658 .put_file("greeting.txt", &data[..], 0, data.len() as u64)
659 .await
660 .unwrap();
661 assert_eq!(n, data.len() as u64);
662
663 // The final file exists; no .partial remains.
664 let body = std::fs::read(root.join("greeting.txt")).unwrap();
665 assert_eq!(body, data);
666 assert!(!root.join("greeting.txt.partial").exists());
667
668 let wf = store.waiting_files().unwrap();
669 assert_eq!(wf.len(), 1);
670 assert_eq!(wf[0].name, "greeting.txt");
671 assert_eq!(wf[0].size, data.len() as u64);
672
673 std::fs::remove_dir_all(&root).ok();
674 }
675
676 #[tokio::test]
677 async fn put_file_resumes_from_offset() {
678 let root = tmp_root();
679 let store = TaildropStore::new(&root).unwrap();
680
681 // Pre-write a prefix into the `.partial` directly, simulating bytes already received by an
682 // earlier (interrupted) transfer.
683 let prefix = b"the first half ";
684 let partial = root.join("resume.txt.partial");
685 std::fs::write(&partial, prefix).unwrap();
686
687 // Resume at offset == the prefix length: `put_file` opens the existing partial, seeks past
688 // the prefix, and appends the rest.
689 let rest = b"and the second half";
690 let total = store
691 .put_file(
692 "resume.txt",
693 &rest[..],
694 prefix.len() as u64,
695 (prefix.len() + rest.len()) as u64,
696 )
697 .await
698 .unwrap();
699
700 // The returned count is offset + freshly-copied bytes, and the final file is the prefix and
701 // the resumed bytes concatenated (the seek positioned the write correctly).
702 assert_eq!(total, (prefix.len() + rest.len()) as u64);
703 let body = std::fs::read(root.join("resume.txt")).unwrap();
704 let mut expected = prefix.to_vec();
705 expected.extend_from_slice(rest);
706 assert_eq!(body, expected);
707 assert!(!partial.exists());
708
709 std::fs::remove_dir_all(&root).ok();
710 }
711
712 #[tokio::test]
713 async fn put_file_short_body_leaves_partial_not_truncated_final() {
714 // F2: a body that ends before the declared length must NOT be finalized as a complete (but
715 // truncated) file under the real name. Go errors when copyLength != length; we leave the
716 // `.partial` in place for resume.
717 let root = tmp_root();
718 let store = TaildropStore::new(&root).unwrap();
719
720 // Reader yields 5 bytes but we declare 10 expected (a short/interrupted stream).
721 let err = store
722 .put_file("short.txt", &b"world"[..], 0, 10)
723 .await
724 .unwrap_err();
725 assert!(
726 matches!(err, TaildropError::Io(ref e) if e.kind() == io::ErrorKind::UnexpectedEof),
727 "a short body must error, got {err:?}"
728 );
729 // The final name was NEVER created; the partial remains with the bytes received so far.
730 assert!(!root.join("short.txt").exists(), "no truncated final file");
731 let partial = std::fs::read(root.join("short.txt.partial")).unwrap();
732 assert_eq!(
733 partial, b"world",
734 "partial holds the received prefix for resume"
735 );
736 assert!(store.waiting_files().unwrap().is_empty());
737
738 std::fs::remove_dir_all(&root).ok();
739 }
740
741 #[tokio::test]
742 async fn put_file_resume_offset_past_end_is_rejected() {
743 // F3: a resume offset beyond the current partial length must be rejected (Go errors
744 // "offset out of range"), not produce a zero-filled sparse hole.
745 let root = tmp_root();
746 let store = TaildropStore::new(&root).unwrap();
747 std::fs::write(root.join("sparse.txt.partial"), b"abc").unwrap(); // 3 bytes on disk
748
749 let err = store
750 .put_file("sparse.txt", &b"xyz"[..], 99, 102)
751 .await
752 .unwrap_err();
753 assert!(
754 matches!(err, TaildropError::Io(ref e) if e.kind() == io::ErrorKind::InvalidInput),
755 "offset past end must be rejected, got {err:?}"
756 );
757 // The partial is untouched (still 3 bytes), no final file.
758 assert_eq!(
759 std::fs::read(root.join("sparse.txt.partial")).unwrap(),
760 b"abc"
761 );
762 assert!(!root.join("sparse.txt").exists());
763
764 std::fs::remove_dir_all(&root).ok();
765 }
766
767 #[tokio::test]
768 async fn put_file_resume_truncates_stale_tail() {
769 // F3: resuming at an offset LESS than the current partial length must truncate the bytes
770 // past the offset (Go `Truncate(offset)`), so a stale tail from a prior attempt cannot
771 // survive past the newly-written end.
772 let root = tmp_root();
773 let store = TaildropStore::new(&root).unwrap();
774 // A prior attempt left 20 bytes; we resume at offset 5 with a 3-byte tail ⇒ final is 8 bytes.
775 std::fs::write(root.join("retry.txt.partial"), b"KEEPme-STALE-TAILxxx").unwrap();
776
777 let total = store
778 .put_file("retry.txt", &b"NEW"[..], 5, 8)
779 .await
780 .unwrap();
781 assert_eq!(total, 8);
782 let body = std::fs::read(root.join("retry.txt")).unwrap();
783 assert_eq!(
784 body, b"KEEPmNEW",
785 "bytes past offset 5 truncated, then NEW appended"
786 );
787
788 std::fs::remove_dir_all(&root).ok();
789 }
790
791 #[tokio::test]
792 async fn put_file_conflict_picks_non_clobbering_name() {
793 let root = tmp_root();
794 let store = TaildropStore::new(&root).unwrap();
795
796 store
797 .put_file("dup.txt", &b"first"[..], 0, 5)
798 .await
799 .unwrap();
800 store
801 .put_file("dup.txt", &b"second"[..], 0, 6)
802 .await
803 .unwrap();
804 store
805 .put_file("dup.txt", &b"third"[..], 0, 5)
806 .await
807 .unwrap();
808
809 // Original plus two non-clobbering renames.
810 assert!(root.join("dup.txt").exists());
811 assert!(root.join("dup (1).txt").exists());
812 assert!(root.join("dup (2).txt").exists());
813
814 let wf = store.waiting_files().unwrap();
815 assert_eq!(wf.len(), 3);
816
817 std::fs::remove_dir_all(&root).ok();
818 }
819
820 #[tokio::test]
821 async fn put_file_in_progress_partial_is_conflict() {
822 let root = tmp_root();
823 let store = TaildropStore::new(&root).unwrap();
824
825 // Simulate an in-flight transfer by pre-creating the .partial file.
826 std::fs::write(root.join("busy.txt.partial"), b"partial").unwrap();
827
828 let err = store
829 .put_file("busy.txt", &b"x"[..], 0, 1)
830 .await
831 .unwrap_err();
832 assert!(matches!(err, TaildropError::FileExists));
833
834 std::fs::remove_dir_all(&root).ok();
835 }
836
837 #[tokio::test]
838 async fn put_file_rejects_bad_name_before_any_io() {
839 let root = tmp_root();
840 let store = TaildropStore::new(&root).unwrap();
841
842 let err = store
843 .put_file("../escape", &b"x"[..], 0, 1)
844 .await
845 .unwrap_err();
846 assert!(matches!(err, TaildropError::InvalidFileName));
847 // Nothing was written anywhere.
848 assert!(store.waiting_files().unwrap().is_empty());
849
850 std::fs::remove_dir_all(&root).ok();
851 }
852
853 #[tokio::test]
854 async fn delete_and_open_roundtrip() {
855 let root = tmp_root();
856 let store = TaildropStore::new(&root).unwrap();
857
858 store.put_file("doc.bin", &b"abc"[..], 0, 3).await.unwrap();
859 let (_f, size) = store.open_file("doc.bin").unwrap();
860 assert_eq!(size, 3);
861
862 store.delete_file("doc.bin").unwrap();
863 assert!(store.waiting_files().unwrap().is_empty());
864
865 // Traversal can't reach outside the root.
866 assert!(matches!(
867 store.delete_file("../../etc/passwd"),
868 Err(TaildropError::InvalidFileName)
869 ));
870
871 std::fs::remove_dir_all(&root).ok();
872 }
873
874 #[tokio::test]
875 async fn concurrent_resume_for_same_name_is_serialized() {
876 // The in-flight name guard: while one transfer holds a base name, a second PUT for the SAME
877 // name (the resume-race the lock closes) is rejected with FileExists rather than interleaving
878 // writes into the shared `.partial`. We hold the first transfer open with a reader that never
879 // completes until we let it, then fire the second concurrently.
880 let root = tmp_root();
881 let store = Arc::new(TaildropStore::new(&root).unwrap());
882
883 // A reader that delivers a byte, then blocks until released — keeps transfer #1 in flight
884 // (and thus the name claimed) while we attempt transfer #2.
885 let (release_tx, release_rx) = tokio::sync::oneshot::channel::<()>();
886 struct BlockingReader {
887 sent: bool,
888 release: Option<tokio::sync::oneshot::Receiver<()>>,
889 }
890 impl AsyncRead for BlockingReader {
891 fn poll_read(
892 mut self: std::pin::Pin<&mut Self>,
893 cx: &mut std::task::Context<'_>,
894 buf: &mut tokio::io::ReadBuf<'_>,
895 ) -> std::task::Poll<io::Result<()>> {
896 if !self.sent {
897 buf.put_slice(b"x");
898 self.sent = true;
899 return std::task::Poll::Ready(Ok(()));
900 }
901 // After the first byte, park until released, then report EOF.
902 match self.release.as_mut() {
903 Some(rx) => match std::pin::Pin::new(rx).poll(cx) {
904 std::task::Poll::Ready(_) => {
905 self.release = None;
906 std::task::Poll::Ready(Ok(())) // EOF (no bytes written)
907 }
908 std::task::Poll::Pending => std::task::Poll::Pending,
909 },
910 None => std::task::Poll::Ready(Ok(())),
911 }
912 }
913 }
914
915 let s1 = store.clone();
916 let t1 = tokio::spawn(async move {
917 let reader = BlockingReader {
918 sent: false,
919 release: Some(release_rx),
920 };
921 // expected_len 1: completes once the single byte is read and the reader returns EOF.
922 s1.put_file("race.bin", reader, 0, 1).await
923 });
924
925 // Wait until transfer #1 has actually claimed the name (its partial exists).
926 let partial = root.join("race.bin.partial");
927 for _ in 0..200 {
928 if partial.exists() {
929 break;
930 }
931 tokio::time::sleep(std::time::Duration::from_millis(5)).await;
932 }
933 assert!(
934 partial.exists(),
935 "transfer #1 should have created the partial"
936 );
937
938 // Transfer #2 for the SAME name, while #1 still holds the claim → FileExists, no interleave.
939 let err = store
940 .put_file("race.bin", &b"yy"[..], 1, 3)
941 .await
942 .unwrap_err();
943 assert!(
944 matches!(err, TaildropError::FileExists),
945 "a concurrent transfer for an in-flight name must be rejected, got {err:?}"
946 );
947
948 // Release #1 so it finalizes cleanly, and confirm the name frees afterward.
949 release_tx.send(()).unwrap();
950 t1.await.unwrap().unwrap();
951 // Now the name is free: a fresh transfer succeeds (the guard was released on drop).
952 store.put_file("race.bin", &b"z"[..], 0, 1).await.unwrap();
953
954 std::fs::remove_dir_all(&root).ok();
955 }
956
957 #[cfg(unix)]
958 #[tokio::test]
959 async fn symlink_in_store_root_is_refused_not_followed() {
960 use std::os::unix::fs::symlink;
961
962 let root = tmp_root();
963 let store = TaildropStore::new(&root).unwrap();
964
965 // An attacker-planted symlink in the store root, pointing at a sensitive file OUTSIDE it.
966 let outside = tmp_root();
967 std::fs::create_dir_all(&outside).unwrap();
968 let secret = outside.join("secret");
969 std::fs::write(&secret, b"TOP SECRET").unwrap();
970
971 // (a) open_file must refuse a symlink target, never read through it.
972 let link = root.join("link.txt");
973 symlink(&secret, &link).unwrap();
974 let open_err = store.open_file("link.txt").unwrap_err();
975 assert!(
976 matches!(open_err, TaildropError::Io(ref e) if e.kind() == io::ErrorKind::InvalidInput),
977 "open_file must refuse a symlink, got {open_err:?}"
978 );
979
980 // (b) delete_file must refuse the symlink, leaving BOTH the link and its target intact.
981 let del_err = store.delete_file("link.txt").unwrap_err();
982 assert!(
983 matches!(del_err, TaildropError::Io(ref e) if e.kind() == io::ErrorKind::InvalidInput),
984 "delete_file must refuse a symlink, got {del_err:?}"
985 );
986 assert!(
987 secret.exists(),
988 "the symlink target must NOT have been deleted"
989 );
990 assert_eq!(std::fs::read(&secret).unwrap(), b"TOP SECRET");
991
992 // (c) waiting_files must not report the symlink as a waiting file.
993 assert!(
994 store.waiting_files().unwrap().is_empty(),
995 "a symlink in the store root must not be listed as a waiting file"
996 );
997
998 // (d) put_file onto a symlinked partial must refuse rather than write through the link.
999 let link_partial = root.join("evil.bin.partial");
1000 symlink(&secret, &link_partial).unwrap();
1001 let put_err = store
1002 .put_file("evil.bin", &b"data"[..], 0, 4)
1003 .await
1004 .unwrap_err();
1005 assert!(
1006 matches!(put_err, TaildropError::Io(ref e) if e.kind() == io::ErrorKind::InvalidInput),
1007 "put_file must refuse a symlinked partial, got {put_err:?}"
1008 );
1009 assert_eq!(
1010 std::fs::read(&secret).unwrap(),
1011 b"TOP SECRET",
1012 "the symlink target must NOT have been written through"
1013 );
1014
1015 std::fs::remove_dir_all(&root).ok();
1016 std::fs::remove_dir_all(&outside).ok();
1017 }
1018
/// `next_available_name` returns the requested name while it is free, then ` (n)` variants with
/// the counter placed before the extension; a dotfile gets the counter appended at the end.
#[test]
fn next_available_name_inserts_before_extension() {
    let dir = tmp_root();
    std::fs::create_dir_all(&dir).unwrap();
    // Occupies `name` inside the scratch directory so the next lookup has to skip it.
    let occupy = |name: &str| std::fs::write(dir.join(name), b"x").unwrap();

    assert_eq!(next_available_name(&dir, "a.txt"), "a.txt");

    occupy("a.txt");
    assert_eq!(next_available_name(&dir, "a.txt"), "a (1).txt");

    occupy("a (1).txt");
    assert_eq!(next_available_name(&dir, "a.txt"), "a (2).txt");

    // Dotfile (no real extension) appends at end.
    occupy(".env");
    assert_eq!(next_available_name(&dir, ".env"), ".env (1)");

    std::fs::remove_dir_all(&dir).ok();
}
1033
/// The reaper removes a `.partial` older than `delete_delay` and leaves a fresh one alone. Age is
/// controlled through the `now` argument (the partial's real mtime is ~now): sweeping at `now`
/// with a 1h delay sees a ~0s-old partial (kept); sweeping at `now + 2h` with the same delay sees
/// a 2h-old partial (reaped). A finished, non-partial file is never touched.
#[test]
fn reaper_deletes_aged_partial_keeps_fresh_and_final() {
    let dir = tmp_root();
    let store = TaildropStore::new(&dir).unwrap();

    let abandoned = dir.join("abandoned.bin.partial");
    let finished = dir.join("done.bin"); // a finished file, not a partial
    std::fs::write(&abandoned, b"half").unwrap();
    std::fs::write(&finished, b"complete").unwrap();

    let now = SystemTime::now();
    let delay = Duration::from_secs(3600);
    let two_hours_later = now + Duration::from_secs(2 * 3600);

    // Fresh: nothing is older than the delay yet.
    let reaped_while_fresh = store.reap_abandoned_partials(now, delay);
    assert_eq!(reaped_while_fresh, 0, "nothing aged out");
    assert!(abandoned.exists());

    // Aged: 2h in the future vs a 1h delay → the partial is reaped, the final file is untouched.
    let reaped = store.reap_abandoned_partials(two_hours_later, delay);
    assert_eq!(reaped, 1, "the aged partial is reaped");
    assert!(!abandoned.exists(), "aged partial deleted");
    assert!(
        finished.exists(),
        "a completed (non-partial) file must never be reaped"
    );

    std::fs::remove_dir_all(&dir).ok();
}
1070
1071 /// An in-flight transfer's partial is NEVER reaped, even when aged (Go's "no active put" check):
1072 /// while a base name is claimed, the reaper skips its partial regardless of mtime.
1073 #[tokio::test]
1074 async fn reaper_spares_in_flight_partial() {
1075 let root = tmp_root();
1076 let store = TaildropStore::new(&root).unwrap();
1077
1078 // Pre-write an (old) partial and claim its base name as in-flight.
1079 std::fs::write(root.join("live.bin.partial"), b"in progress").unwrap();
1080 let _claim = store.claim_in_flight("live.bin").expect("claim");
1081
1082 // Even with a far-future `now` (well past the delay), the in-flight partial is spared.
1083 let reaped = store.reap_abandoned_partials(
1084 SystemTime::now() + Duration::from_secs(48 * 3600),
1085 Duration::from_secs(3600),
1086 );
1087 assert_eq!(reaped, 0, "an in-flight partial must not be reaped");
1088 assert!(
1089 root.join("live.bin.partial").exists(),
1090 "the in-flight partial survives the sweep"
1091 );
1092
1093 // Once the claim drops, the same aged partial IS reaped.
1094 drop(_claim);
1095 let reaped = store.reap_abandoned_partials(
1096 SystemTime::now() + Duration::from_secs(48 * 3600),
1097 Duration::from_secs(3600),
1098 );
1099 assert_eq!(
1100 reaped, 1,
1101 "after the claim drops, the aged partial is reaped"
1102 );
1103
1104 std::fs::remove_dir_all(&root).ok();
1105 }
1106
1107 /// The background reaper task: (1) does NOT reap at startup, and (2) terminates when shutdown
1108 /// flips true. With the real DELETE_DELAY (1h) interval, the first real sweep is an hour out, so
1109 /// within this millisecond-scale test no sweep ever fires — which is exactly what proves both
1110 /// "the startup tick is skipped" (an aged partial present at start survives) and "the task is
1111 /// parked on the interval, woken only by shutdown". (`test-util`/paused-time is intentionally not
1112 /// pulled into this crate's deps, so this uses real time and the fact that 1h >> the test.)
1113 #[tokio::test]
1114 async fn reaper_task_skips_startup_then_terminates_on_shutdown() {
1115 let root = tmp_root();
1116 let store = Arc::new(TaildropStore::new(&root).unwrap());
1117 // A partial present at boot — it must survive (no startup reap fires within the test window).
1118 std::fs::write(root.join("boot.bin.partial"), b"x").unwrap();
1119
1120 let (tx, rx) = tokio::sync::watch::channel(false);
1121 let handle = spawn_partial_reaper(store.clone(), rx);
1122
1123 // Give the task a moment to reach its select!; the first real sweep is DELETE_DELAY (1h) out,
1124 // so nothing is reaped here.
1125 tokio::time::sleep(Duration::from_millis(50)).await;
1126 assert!(
1127 root.join("boot.bin.partial").exists(),
1128 "no reap before the first real sweep (startup tick is skipped; first sweep is 1h out)"
1129 );
1130
1131 // Flip shutdown: the task must terminate via its select! shutdown arm, not hang on the 1h
1132 // interval. If the shutdown arm were broken this `timeout` would elapse (the next tick is ~1h
1133 // away), so the timeout firing IS the regression signal.
1134 tx.send_replace(true);
1135 tokio::time::timeout(Duration::from_secs(5), handle)
1136 .await
1137 .expect("reaper must terminate on shutdown (not wait for the 1h interval)")
1138 .expect("reaper task must not panic");
1139
1140 std::fs::remove_dir_all(&root).ok();
1141 }
1142
1143 /// A reaper spawned when shutdown is ALREADY true terminates on its first loop iteration
1144 /// (`watch::Receiver::wait_for` returns immediately when the predicate already holds), rather
1145 /// than waiting a full DELETE_DELAY (1h) for the first tick.
1146 #[tokio::test]
1147 async fn reaper_task_terminates_when_shutdown_already_set() {
1148 let root = tmp_root();
1149 let store = Arc::new(TaildropStore::new(&root).unwrap());
1150 let (_tx, rx) = tokio::sync::watch::channel(true); // already shut down
1151 let handle = spawn_partial_reaper(store, rx);
1152 tokio::time::timeout(Duration::from_secs(5), handle)
1153 .await
1154 .expect("a reaper spawned post-shutdown must exit promptly, not wait the 1h interval")
1155 .expect("reaper task must not panic");
1156 std::fs::remove_dir_all(&root).ok();
1157 }
1158}