Skip to main content

ts_runtime/
taildrop.rs

1//! Taildrop file store — the receiving half of Tailscale's peer-to-peer file transfer.
2//!
3//! A peer sends a file to this node via the peerAPI route `PUT /v0/put/<name>` (handled in
4//! `peerapi`). This module owns the on-disk store those puts land in, faithfully mirroring
5//! Go's `taildrop.manager`:
6//!
7//! - Incoming bytes are written to a per-transfer **partial** file (`<base>.partial`) so an
8//!   interrupted transfer never exposes a truncated file under its real name, and can be resumed
9//!   from an offset.
10//! - On successful completion the partial is **atomically renamed** to the final base name. If the
11//!   final name already exists, a non-clobbering ` (n)` suffix is chosen (Go `nextFilename`).
12//! - File names are strictly validated ([`validate_base_name`](crate::taildrop::validate_base_name)) to defeat path traversal and
13//!   reserved-suffix abuse before any path is constructed — this is the security boundary.
14//!
15//! # Anti-abuse / safety
16//!
17//! Every name is validated to be a single, local, non-traversing path component before it touches
18//! the filesystem; a name containing `/`, `\`, `..`, a NUL, control chars, or the reserved
19//! `.partial` / `.deleted` suffixes is rejected with [`TaildropError::InvalidFileName`](crate::taildrop::TaildropError::InvalidFileName). The store
20//! root is fixed at construction; all I/O is confined to it by joining only validated base names.
21
22use std::{
23    collections::HashSet,
24    io::{self, Seek, Write},
25    path::{Path, PathBuf},
26    sync::{Arc, Mutex},
27};
28
29use tokio::io::{AsyncRead, AsyncReadExt};
30
31/// Suffix for in-progress transfers. A completed transfer is renamed off this suffix; a name
32/// ending in it is itself never accepted as a base name (Go `partialSuffix`).
33const PARTIAL_SUFFIX: &str = ".partial";
34/// Suffix Go uses to tombstone files pending deletion on platforms with async close; we reject it
35/// as a base name for parity so a sender can't create one (Go `deletedSuffix`).
36const DELETED_SUFFIX: &str = ".deleted";
37/// Maximum base-name length in bytes (Go `validateBaseName`: 255).
38const MAX_BASE_NAME_LEN: usize = 255;
39
40/// Errors from the Taildrop file store.
41#[derive(Debug)]
42pub enum TaildropError {
43    /// The requested file name is invalid (traversal, reserved suffix, empty, too long, bad runes).
44    /// Maps to peerAPI `400 Bad Request`.
45    InvalidFileName,
46    /// A transfer for this exact base name is already in progress. Maps to peerAPI `409 Conflict`.
47    FileExists,
48    /// Underlying filesystem I/O failure. Maps to peerAPI `500`.
49    Io(io::Error),
50}
51
52impl core::fmt::Display for TaildropError {
53    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
54        match self {
55            TaildropError::InvalidFileName => write!(f, "invalid taildrop file name"),
56            TaildropError::FileExists => {
57                write!(f, "a transfer for this file is already in progress")
58            }
59            TaildropError::Io(e) => write!(f, "taildrop I/O error: {e}"),
60        }
61    }
62}
63
64impl std::error::Error for TaildropError {
65    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
66        match self {
67            TaildropError::Io(e) => Some(e),
68            _ => None,
69        }
70    }
71}
72
73impl From<io::Error> for TaildropError {
74    fn from(e: io::Error) -> Self {
75        TaildropError::Io(e)
76    }
77}
78
79/// A waiting (fully-received) Taildrop file, as reported to the embedder. Mirrors Go
80/// `apitype.WaitingFile` (default field-name JSON marshalling: `Name`, `Size`).
81#[derive(Debug, Clone, PartialEq, Eq)]
82pub struct WaitingFile {
83    /// The file's base name.
84    pub name: String,
85    /// The file's size in bytes.
86    pub size: u64,
87}
88
89/// Validate a Taildrop base name, mirroring Go `taildrop.validateBaseName`.
90///
91/// Returns the name unchanged when it is a safe, single, local path component; otherwise `None`.
92/// Rejection rules (any one fails): empty or `> 255` bytes; leading/trailing ASCII space; contains
93/// a path separator (`/` or `\`), a NUL, or an ASCII control char; is `.` or `..`; equals a cleaned
94/// path other than itself (catches embedded `..`/`.` segments and absolute paths); or ends in the
95/// reserved `.partial` / `.deleted` suffixes.
96pub fn validate_base_name(name: &str) -> Option<&str> {
97    if name.is_empty() || name.len() > MAX_BASE_NAME_LEN {
98        return None;
99    }
100    if name.starts_with(' ') || name.ends_with(' ') {
101        return None;
102    }
103    if name == "." || name == ".." {
104        return None;
105    }
106    if name.ends_with(PARTIAL_SUFFIX) || name.ends_with(DELETED_SUFFIX) {
107        return None;
108    }
109    // Reject any separator, NUL, or control character outright. This is the core traversal guard:
110    // with no `/`, `\`, or `..` segment possible, the name can only ever be a leaf in the store dir.
111    for ch in name.chars() {
112        if ch == '/' || ch == '\\' || ch == '\0' || ch.is_control() {
113            return None;
114        }
115    }
116    // Defense in depth: a name that does not survive `Path` normalization as a single normal
117    // component is rejected (catches `..`, absolute paths, and any platform-specific oddity).
118    let p = Path::new(name);
119    let mut comps = p.components();
120    match (comps.next(), comps.next()) {
121        (Some(std::path::Component::Normal(c)), None) if c == name => Some(name),
122        _ => None,
123    }
124}
125
126/// Choose a non-clobbering final name for `base` within `dir`, mirroring Go `nextFilename`:
127/// `foo.txt` -> `foo (1).txt` -> `foo (2).txt` ... inserting ` (n)` before the extension. Returns
128/// the first candidate (incl. `base` itself) whose path does not yet exist. Bounded to avoid an
129/// unbounded loop on a pathological directory.
130fn next_available_name(dir: &Path, base: &str) -> String {
131    if !path_present(&dir.join(base)) {
132        return base.to_string();
133    }
134    let (stem, ext) = match base.rsplit_once('.') {
135        // Keep the dot with the extension; an empty stem (dotfile like ".bashrc") has no split.
136        Some((stem, ext)) if !stem.is_empty() => (stem, format!(".{ext}")),
137        _ => (base, String::new()),
138    };
139    for n in 1..=10_000u32 {
140        let candidate = format!("{stem} ({n}){ext}");
141        if !path_present(&dir.join(&candidate)) {
142            return candidate;
143        }
144    }
145    // Pathological fallback: suffix with a high counter; extremely unlikely to be reached.
146    format!("{stem} (overflow){ext}")
147}
148
149/// Whether a path is present, treating a symlink (even a dangling one) as present. Unlike
150/// `Path::exists()` (which follows the link and returns `false` for a dangling symlink), this uses
151/// `symlink_metadata` so a planted symlink can never be mistaken for a free name in
152/// [`next_available_name`] — we must not select, then rename onto, a symlink.
153fn path_present(path: &Path) -> bool {
154    std::fs::symlink_metadata(path).is_ok()
155}
156
157/// Reject a path that is (or whose final component is) a symlink, mirroring the intent of Go's
158/// `O_NOFOLLOW` on the taildrop file ops. `validate_base_name` already blocks a traversing *name*,
159/// but not a symlink **component already planted in the store root** by a local attacker (e.g.
160/// `root/foo.txt -> /etc/cron.d/x`), which a plain `open`/`rename`/`remove` would follow. Uses
161/// `symlink_metadata` (lstat — does NOT follow the final symlink); a non-existent path is fine
162/// (returns `Ok(())`), only an existing symlink is refused.
163///
164/// This is a check-then-act guard, so it is not atomic with the open/rename/remove that follows.
165/// It kills the **persistent-plant** attack (a symlink left in the store root is no longer followed
166/// deterministically), and the per-name in-flight lock serializes our OWN operations on a name, but
167/// it does not close a sub-millisecond race where an external process swaps the path for a symlink
168/// between this lstat and the syscall. That residual requires an external writer who already holds
169/// store-dir write access (the threat bound for this hardening), and is the one axis where this is
170/// weaker than Go's atomic `O_NOFOLLOW`. The `offset == 0` put is additionally protected by
171/// `create_new` (`O_EXCL`, which refuses an existing symlink atomically); the `offset > 0` resume
172/// open is plain `write(true)` and relies on this advisory check until `O_NOFOLLOW` is wired
173/// (tracked as a follow-up — its raw value is arch-dependent, so a portable open flag is deferred).
174fn refuse_symlink(path: &Path) -> Result<(), TaildropError> {
175    match std::fs::symlink_metadata(path) {
176        Ok(meta) if meta.file_type().is_symlink() => Err(TaildropError::Io(io::Error::new(
177            io::ErrorKind::InvalidInput,
178            "taildrop path is a symlink; refusing to follow it",
179        ))),
180        Ok(_) => Ok(()),
181        // Not present yet (the common case for a fresh partial / final name) — nothing to refuse.
182        Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(()),
183        Err(e) => Err(e.into()),
184    }
185}
186
187/// A Taildrop file store rooted at a fixed directory. All operations are confined to this root by
188/// joining only [`validate_base_name`]-validated names.
189#[derive(Debug, Clone)]
190pub struct TaildropStore {
191    root: PathBuf,
192    /// Base names with a transfer currently in flight. A `put_file` claims its base name here for
193    /// the whole receive (both a fresh `offset == 0` transfer and a resumed `offset > 0` one), so
194    /// two concurrent PUTs for the same name cannot interleave `set_len`/`seek`/`write_all` and
195    /// corrupt the shared `.partial`. Shared (`Arc`) so it survives `TaildropStore::clone()` — the
196    /// store is handed around as `Arc<TaildropStore>` but cloning it must not fork the guard set.
197    in_flight: Arc<Mutex<HashSet<String>>>,
198}
199
200/// RAII claim on an in-flight transfer name; releasing it (on drop) frees the name for the next
201/// transfer. Holds the shared guard set so the entry is removed even on an early return / error /
202/// panic in `put_file`.
203struct InFlightGuard {
204    set: Arc<Mutex<HashSet<String>>>,
205    name: String,
206}
207
208impl Drop for InFlightGuard {
209    fn drop(&mut self) {
210        // A poisoned lock still lets us recover the set and remove our entry — leaving a stale name
211        // claimed would wedge all future transfers of that name behind a phantom conflict.
212        let mut set = self.set.lock().unwrap_or_else(|p| p.into_inner());
213        set.remove(&self.name);
214    }
215}
216
217impl TaildropStore {
218    /// Create a store rooted at `root`, creating the directory (and parents) if needed.
219    pub fn new(root: impl Into<PathBuf>) -> Result<Self, TaildropError> {
220        let root = root.into();
221        std::fs::create_dir_all(&root)?;
222        Ok(Self {
223            root,
224            in_flight: Arc::new(Mutex::new(HashSet::new())),
225        })
226    }
227
228    /// Claim `base` as in-flight, returning an RAII guard that frees it on drop. Returns
229    /// [`TaildropError::FileExists`] if another transfer already holds the name — this is the
230    /// concurrency analog of the on-disk `.partial` conflict, and it serializes all transfers of one
231    /// name so a resume (`offset > 0`) cannot race a concurrent transfer's `set_len`/`seek`/`write`.
232    fn claim_in_flight(&self, base: &str) -> Result<InFlightGuard, TaildropError> {
233        let name = base.to_string();
234        let mut set = self.in_flight.lock().unwrap_or_else(|p| p.into_inner());
235        if !set.insert(name.clone()) {
236            return Err(TaildropError::FileExists);
237        }
238        Ok(InFlightGuard {
239            set: self.in_flight.clone(),
240            name,
241        })
242    }
243
244    /// The partial-file path for an already-validated base name.
245    fn partial_path(&self, base: &str) -> PathBuf {
246        self.root.join(format!("{base}{PARTIAL_SUFFIX}"))
247    }
248
249    /// Receive a file named `name` from `reader`, writing to `<name>.partial` then atomically
250    /// renaming to a non-clobbering final name on success. Mirrors Go `manager.PutFile`.
251    ///
252    /// `offset` lets a resumed transfer append past already-written bytes (the partial is opened, the
253    /// write starts at `offset`, and any bytes already on disk past `offset` are truncated away).
254    /// `expected_len` is the declared total length of the completed file (the request's
255    /// `Content-Length` plus `offset`); the transfer is finalized only if exactly that many bytes are
256    /// present. Returns the total number of bytes in the completed file.
257    ///
258    /// Fail-closed: an invalid name is rejected before any path is built; an in-progress partial for
259    /// the same name yields [`TaildropError::FileExists`]; an out-of-range resume `offset` (past the
260    /// current partial length) is rejected; an I/O error mid-transfer — or a body that ends before
261    /// `expected_len` (a short/interrupted stream) — leaves the `.partial` on disk and the final name
262    /// is never created. This matches Go `feature/taildrop/send.go`, which errors when the copied
263    /// length does not equal the declared length rather than publishing a truncated file.
264    ///
265    /// The retained `.partial` is resumable only by a peer that issues a ranged retry (an `offset > 0`
266    /// PUT); a sender that always restarts at `offset == 0` will instead hit the in-progress-conflict
267    /// path ([`TaildropError::FileExists`]) until the stale partial is cleared. There is no automatic
268    /// reaper for an abandoned partial yet (Go's `fileDeleter` GCs one after ~1h); tracked separately.
269    pub async fn put_file<R>(
270        &self,
271        name: &str,
272        mut reader: R,
273        offset: u64,
274        expected_len: u64,
275    ) -> Result<u64, TaildropError>
276    where
277        R: AsyncRead + Unpin,
278    {
279        let base = validate_base_name(name).ok_or(TaildropError::InvalidFileName)?;
280        let partial = self.partial_path(base);
281
282        // Claim the name for the whole transfer FIRST, so two concurrent PUTs for the same base name
283        // (especially two resumes, `offset > 0`, which reopen the same `.partial`) cannot interleave
284        // their `set_len`/`seek`/`write_all` and corrupt the shared partial. The fresh-transfer path
285        // is already protected on disk by `create_new`, but the resume path opens with plain
286        // `write(true)` and needs this lock. The guard frees the name on drop (incl. early return /
287        // error / panic). Held across the await — it is a cheap `HashSet` membership marker, not a
288        // lock held during I/O, so it never blocks the runtime.
289        let _claim = self.claim_in_flight(base)?;
290
291        // Refuse to follow a symlink planted in the store root (Go's `O_NOFOLLOW` intent): the
292        // partial must be a regular file we create/own, never a pre-existing symlink to elsewhere.
293        refuse_symlink(&partial)?;
294
295        // A fresh transfer (offset 0) must not collide with another in-flight transfer of the same
296        // name; a resume (offset > 0) reopens the existing partial. File handles are std (the tokio
297        // `fs` feature is intentionally not enabled in this crate); the body is read async off the
298        // overlay stream and written to the blocking handle in a bounded loop.
299        let mut file = if offset == 0 {
300            match std::fs::OpenOptions::new()
301                .write(true)
302                .create_new(true)
303                .open(&partial)
304            {
305                Ok(f) => f,
306                Err(e) if e.kind() == io::ErrorKind::AlreadyExists => {
307                    return Err(TaildropError::FileExists);
308                }
309                Err(e) => return Err(e.into()),
310            }
311        } else {
312            let mut f = std::fs::OpenOptions::new().write(true).open(&partial)?;
313            // Bound the resume offset to the current partial length and truncate any bytes past it,
314            // matching Go `feature/taildrop/fileops_fs.go` (`OpenWriter` rejects `offset > curr` and
315            // `Truncate(offset)`s). Without the bound a too-large offset would leave a zero-filled
316            // sparse hole; without the truncate a shorter resumed body would leave a prior attempt's
317            // stale tail past the new end. `metadata().len()` is the partial's current size.
318            let current = f.metadata()?.len();
319            if offset > current {
320                return Err(TaildropError::Io(io::Error::new(
321                    io::ErrorKind::InvalidInput,
322                    "taildrop resume offset is past the end of the partial file",
323                )));
324            }
325            f.set_len(offset)?;
326            f.seek(io::SeekFrom::Start(offset))?;
327            f
328        };
329
330        let mut copied: u64 = 0;
331        let mut buf = [0u8; 64 * 1024];
332        loop {
333            let n = reader.read(&mut buf).await?;
334            if n == 0 {
335                break;
336            }
337            // Each `write_all` only pushes the chunk into the page cache (microseconds); the
338            // genuinely blocking cost is the terminal `flush`/`sync_all`/`rename` below, which we
339            // hand to a blocking thread so a flood of concurrent transfers can't starve the tokio
340            // worker pool on fsync (see `peerapi::MAX_INFLIGHT`).
341            file.write_all(&buf[..n])?;
342            copied += n as u64;
343        }
344
345        // Length check (Go `send.go`: error when `copyLength != length`). A body that ended before the
346        // declared length — an interrupted/short stream — must NOT be finalized as a complete file;
347        // leave the `.partial` on disk (with the bytes received so far) so a Range-capable peer can
348        // resume it. `checked_add` rather than a bare `+`: `offset` is an attacker-supplied header and
349        // the bound above already rejects an `offset` past the (real, on-disk) partial length, so this
350        // cannot overflow in practice — but treat an overflow as a length mismatch rather than a panic.
351        let total = match offset.checked_add(copied) {
352            Some(t) if t == expected_len => t,
353            _ => {
354                return Err(TaildropError::Io(io::Error::new(
355                    io::ErrorKind::UnexpectedEof,
356                    format!(
357                        "taildrop body ended early: got {copied} of {expected_len} expected bytes \
358                         at offset {offset}; leaving partial for resume"
359                    ),
360                )));
361            }
362        };
363
364        // Finalize off the async runtime: `sync_all` (fsync) and `rename` are the dominant blocking
365        // operations, so run them on a blocking thread. The `File` and both paths are owned by the
366        // closure (`Send + 'static`), and `next_available_name` (which `stat`s candidates) goes with
367        // them. Fail-closed: any I/O error — or a join failure — propagates without ever publishing
368        // the final name, leaving the `.partial` in place for a later resume.
369        let root = self.root.clone();
370        let base = base.to_string();
371        tokio::task::spawn_blocking(move || -> io::Result<()> {
372            file.flush()?;
373            file.sync_all()?;
374            drop(file);
375
376            // Atomically publish under a non-clobbering final name. `next_available_name` probes
377            // candidates with `symlink_metadata` (not `exists`, which follows symlinks), so it will
378            // not treat a planted symlink as "free" and rename onto it; and we refuse to rename onto
379            // an existing symlink target outright (Go `O_NOFOLLOW` intent). The `_claim` guard (held
380            // by the caller for the whole transfer) keeps this name serialized against other PUTs.
381            let final_name = next_available_name(&root, &base);
382            let final_path = root.join(&final_name);
383            if let Err(e) = refuse_symlink(&final_path) {
384                return Err(match e {
385                    TaildropError::Io(io_err) => io_err,
386                    other => io::Error::other(other.to_string()),
387                });
388            }
389            std::fs::rename(&partial, &final_path)?;
390            Ok(())
391        })
392        .await
393        .map_err(|join_err| {
394            // A panicked/cancelled finalize task: surface as I/O so the caller maps it to a 500 and
395            // the partial is left untouched (never publishes the final name).
396            TaildropError::Io(io::Error::other(format!(
397                "taildrop finalize task failed: {join_err}"
398            )))
399        })??;
400
401        Ok(total)
402    }
403
404    /// List fully-received (non-partial) files, sorted by name (Go `WaitingFiles`).
405    pub fn waiting_files(&self) -> Result<Vec<WaitingFile>, TaildropError> {
406        let mut out = Vec::new();
407        let entries = match std::fs::read_dir(&self.root) {
408            Ok(e) => e,
409            Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(out),
410            Err(e) => return Err(e.into()),
411        };
412        for entry in entries {
413            let entry = entry?;
414            // `entry.metadata()` does NOT follow symlinks (it is `lstat`-based), so a planted
415            // symlink has `is_file() == false` here and is skipped — a symlink in the store root is
416            // never reported as a waiting file (Go `O_NOFOLLOW` intent), even one pointing at a real
417            // regular file elsewhere.
418            let meta = entry.metadata()?;
419            if meta.file_type().is_symlink() || !meta.is_file() {
420                continue;
421            }
422            let Ok(name) = entry.file_name().into_string() else {
423                continue;
424            };
425            // Skip in-progress / tombstoned files.
426            if name.ends_with(PARTIAL_SUFFIX) || name.ends_with(DELETED_SUFFIX) {
427                continue;
428            }
429            out.push(WaitingFile {
430                name,
431                size: meta.len(),
432            });
433        }
434        out.sort_by(|a, b| a.name.cmp(&b.name));
435        Ok(out)
436    }
437
438    /// Delete a fully-received file by base name (Go `DeleteFile`). The name is validated first, so a
439    /// traversal attempt can never escape the store root, and a symlink at the target is refused (Go
440    /// `O_NOFOLLOW` intent) rather than followed — a planted `root/foo.txt -> /etc/passwd` must not
441    /// let a `delete foo.txt` remove the link's target.
442    pub fn delete_file(&self, name: &str) -> Result<(), TaildropError> {
443        let base = validate_base_name(name).ok_or(TaildropError::InvalidFileName)?;
444        let path = self.root.join(base);
445        refuse_symlink(&path)?;
446        std::fs::remove_file(path)?;
447        Ok(())
448    }
449
450    /// Open a fully-received file by base name for reading, returning the handle and its size (Go
451    /// `OpenFile`). The name is validated first, and a symlink at the target is refused (Go
452    /// `O_NOFOLLOW` intent) so a planted symlink cannot redirect the read to an arbitrary file.
453    pub fn open_file(&self, name: &str) -> Result<(std::fs::File, u64), TaildropError> {
454        let base = validate_base_name(name).ok_or(TaildropError::InvalidFileName)?;
455        let path = self.root.join(base);
456        refuse_symlink(&path)?;
457        let f = std::fs::File::open(path)?;
458        let size = f.metadata()?.len();
459        Ok((f, size))
460    }
461}
462
463#[cfg(test)]
464mod tests {
465    use super::*;
466
467    fn tmp_root() -> PathBuf {
468        // A per-call atomic counter guarantees uniqueness across tests that run concurrently in the
469        // same binary. A timestamp alone is NOT enough: `SystemTime` resolution is coarse on some
470        // platforms, so two tests starting in the same tick would collide on one dir and stomp each
471        // other's files (the cause of intermittent taildrop-test flakiness under parallel runs).
472        static COUNTER: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
473        let n = COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
474        let mut p = std::env::temp_dir();
475        p.push(format!("taildrop-test-{}-{n}", std::process::id()));
476        p
477    }
478
479    #[test]
480    fn validate_rejects_traversal_and_reserved() {
481        // Valid leaf names.
482        assert_eq!(validate_base_name("photo.jpg"), Some("photo.jpg"));
483        assert_eq!(
484            validate_base_name("a file with spaces.txt"),
485            Some("a file with spaces.txt")
486        );
487        assert_eq!(validate_base_name(".bashrc"), Some(".bashrc"));
488
489        // Traversal / separators.
490        assert_eq!(validate_base_name("../etc/passwd"), None);
491        assert_eq!(validate_base_name("a/b"), None);
492        assert_eq!(validate_base_name("a\\b"), None);
493        assert_eq!(validate_base_name("/abs"), None);
494        assert_eq!(validate_base_name(".."), None);
495        assert_eq!(validate_base_name("."), None);
496
497        // NUL / control.
498        assert_eq!(validate_base_name("a\0b"), None);
499        assert_eq!(validate_base_name("a\nb"), None);
500
501        // Reserved suffixes.
502        assert_eq!(validate_base_name("x.partial"), None);
503        assert_eq!(validate_base_name("x.deleted"), None);
504
505        // Edges.
506        assert_eq!(validate_base_name(""), None);
507        assert_eq!(validate_base_name(" leading"), None);
508        assert_eq!(validate_base_name("trailing "), None);
509        assert_eq!(validate_base_name(&"a".repeat(256)), None);
510        assert_eq!(
511            validate_base_name(&"a".repeat(255)).map(|s| s.len()),
512            Some(255)
513        );
514    }
515
516    #[tokio::test]
517    async fn put_file_writes_then_atomically_renames() {
518        let root = tmp_root();
519        let store = TaildropStore::new(&root).unwrap();
520
521        let data = b"hello taildrop";
522        let n = store
523            .put_file("greeting.txt", &data[..], 0, data.len() as u64)
524            .await
525            .unwrap();
526        assert_eq!(n, data.len() as u64);
527
528        // The final file exists; no .partial remains.
529        let body = std::fs::read(root.join("greeting.txt")).unwrap();
530        assert_eq!(body, data);
531        assert!(!root.join("greeting.txt.partial").exists());
532
533        let wf = store.waiting_files().unwrap();
534        assert_eq!(wf.len(), 1);
535        assert_eq!(wf[0].name, "greeting.txt");
536        assert_eq!(wf[0].size, data.len() as u64);
537
538        std::fs::remove_dir_all(&root).ok();
539    }
540
541    #[tokio::test]
542    async fn put_file_resumes_from_offset() {
543        let root = tmp_root();
544        let store = TaildropStore::new(&root).unwrap();
545
546        // Pre-write a prefix into the `.partial` directly, simulating bytes already received by an
547        // earlier (interrupted) transfer.
548        let prefix = b"the first half ";
549        let partial = root.join("resume.txt.partial");
550        std::fs::write(&partial, prefix).unwrap();
551
552        // Resume at offset == the prefix length: `put_file` opens the existing partial, seeks past
553        // the prefix, and appends the rest.
554        let rest = b"and the second half";
555        let total = store
556            .put_file(
557                "resume.txt",
558                &rest[..],
559                prefix.len() as u64,
560                (prefix.len() + rest.len()) as u64,
561            )
562            .await
563            .unwrap();
564
565        // The returned count is offset + freshly-copied bytes, and the final file is the prefix and
566        // the resumed bytes concatenated (the seek positioned the write correctly).
567        assert_eq!(total, (prefix.len() + rest.len()) as u64);
568        let body = std::fs::read(root.join("resume.txt")).unwrap();
569        let mut expected = prefix.to_vec();
570        expected.extend_from_slice(rest);
571        assert_eq!(body, expected);
572        assert!(!partial.exists());
573
574        std::fs::remove_dir_all(&root).ok();
575    }
576
577    #[tokio::test]
578    async fn put_file_short_body_leaves_partial_not_truncated_final() {
579        // F2: a body that ends before the declared length must NOT be finalized as a complete (but
580        // truncated) file under the real name. Go errors when copyLength != length; we leave the
581        // `.partial` in place for resume.
582        let root = tmp_root();
583        let store = TaildropStore::new(&root).unwrap();
584
585        // Reader yields 5 bytes but we declare 10 expected (a short/interrupted stream).
586        let err = store
587            .put_file("short.txt", &b"world"[..], 0, 10)
588            .await
589            .unwrap_err();
590        assert!(
591            matches!(err, TaildropError::Io(ref e) if e.kind() == io::ErrorKind::UnexpectedEof),
592            "a short body must error, got {err:?}"
593        );
594        // The final name was NEVER created; the partial remains with the bytes received so far.
595        assert!(!root.join("short.txt").exists(), "no truncated final file");
596        let partial = std::fs::read(root.join("short.txt.partial")).unwrap();
597        assert_eq!(
598            partial, b"world",
599            "partial holds the received prefix for resume"
600        );
601        assert!(store.waiting_files().unwrap().is_empty());
602
603        std::fs::remove_dir_all(&root).ok();
604    }
605
606    #[tokio::test]
607    async fn put_file_resume_offset_past_end_is_rejected() {
608        // F3: a resume offset beyond the current partial length must be rejected (Go errors
609        // "offset out of range"), not produce a zero-filled sparse hole.
610        let root = tmp_root();
611        let store = TaildropStore::new(&root).unwrap();
612        std::fs::write(root.join("sparse.txt.partial"), b"abc").unwrap(); // 3 bytes on disk
613
614        let err = store
615            .put_file("sparse.txt", &b"xyz"[..], 99, 102)
616            .await
617            .unwrap_err();
618        assert!(
619            matches!(err, TaildropError::Io(ref e) if e.kind() == io::ErrorKind::InvalidInput),
620            "offset past end must be rejected, got {err:?}"
621        );
622        // The partial is untouched (still 3 bytes), no final file.
623        assert_eq!(
624            std::fs::read(root.join("sparse.txt.partial")).unwrap(),
625            b"abc"
626        );
627        assert!(!root.join("sparse.txt").exists());
628
629        std::fs::remove_dir_all(&root).ok();
630    }
631
632    #[tokio::test]
633    async fn put_file_resume_truncates_stale_tail() {
634        // F3: resuming at an offset LESS than the current partial length must truncate the bytes
635        // past the offset (Go `Truncate(offset)`), so a stale tail from a prior attempt cannot
636        // survive past the newly-written end.
637        let root = tmp_root();
638        let store = TaildropStore::new(&root).unwrap();
639        // A prior attempt left 20 bytes; we resume at offset 5 with a 3-byte tail ⇒ final is 8 bytes.
640        std::fs::write(root.join("retry.txt.partial"), b"KEEPme-STALE-TAILxxx").unwrap();
641
642        let total = store
643            .put_file("retry.txt", &b"NEW"[..], 5, 8)
644            .await
645            .unwrap();
646        assert_eq!(total, 8);
647        let body = std::fs::read(root.join("retry.txt")).unwrap();
648        assert_eq!(
649            body, b"KEEPmNEW",
650            "bytes past offset 5 truncated, then NEW appended"
651        );
652
653        std::fs::remove_dir_all(&root).ok();
654    }
655
656    #[tokio::test]
657    async fn put_file_conflict_picks_non_clobbering_name() {
658        let root = tmp_root();
659        let store = TaildropStore::new(&root).unwrap();
660
661        store
662            .put_file("dup.txt", &b"first"[..], 0, 5)
663            .await
664            .unwrap();
665        store
666            .put_file("dup.txt", &b"second"[..], 0, 6)
667            .await
668            .unwrap();
669        store
670            .put_file("dup.txt", &b"third"[..], 0, 5)
671            .await
672            .unwrap();
673
674        // Original plus two non-clobbering renames.
675        assert!(root.join("dup.txt").exists());
676        assert!(root.join("dup (1).txt").exists());
677        assert!(root.join("dup (2).txt").exists());
678
679        let wf = store.waiting_files().unwrap();
680        assert_eq!(wf.len(), 3);
681
682        std::fs::remove_dir_all(&root).ok();
683    }
684
685    #[tokio::test]
686    async fn put_file_in_progress_partial_is_conflict() {
687        let root = tmp_root();
688        let store = TaildropStore::new(&root).unwrap();
689
690        // Simulate an in-flight transfer by pre-creating the .partial file.
691        std::fs::write(root.join("busy.txt.partial"), b"partial").unwrap();
692
693        let err = store
694            .put_file("busy.txt", &b"x"[..], 0, 1)
695            .await
696            .unwrap_err();
697        assert!(matches!(err, TaildropError::FileExists));
698
699        std::fs::remove_dir_all(&root).ok();
700    }
701
702    #[tokio::test]
703    async fn put_file_rejects_bad_name_before_any_io() {
704        let root = tmp_root();
705        let store = TaildropStore::new(&root).unwrap();
706
707        let err = store
708            .put_file("../escape", &b"x"[..], 0, 1)
709            .await
710            .unwrap_err();
711        assert!(matches!(err, TaildropError::InvalidFileName));
712        // Nothing was written anywhere.
713        assert!(store.waiting_files().unwrap().is_empty());
714
715        std::fs::remove_dir_all(&root).ok();
716    }
717
718    #[tokio::test]
719    async fn delete_and_open_roundtrip() {
720        let root = tmp_root();
721        let store = TaildropStore::new(&root).unwrap();
722
723        store.put_file("doc.bin", &b"abc"[..], 0, 3).await.unwrap();
724        let (_f, size) = store.open_file("doc.bin").unwrap();
725        assert_eq!(size, 3);
726
727        store.delete_file("doc.bin").unwrap();
728        assert!(store.waiting_files().unwrap().is_empty());
729
730        // Traversal can't reach outside the root.
731        assert!(matches!(
732            store.delete_file("../../etc/passwd"),
733            Err(TaildropError::InvalidFileName)
734        ));
735
736        std::fs::remove_dir_all(&root).ok();
737    }
738
739    #[tokio::test]
740    async fn concurrent_resume_for_same_name_is_serialized() {
741        // The in-flight name guard: while one transfer holds a base name, a second PUT for the SAME
742        // name (the resume-race the lock closes) is rejected with FileExists rather than interleaving
743        // writes into the shared `.partial`. We hold the first transfer open with a reader that never
744        // completes until we let it, then fire the second concurrently.
745        let root = tmp_root();
746        let store = Arc::new(TaildropStore::new(&root).unwrap());
747
748        // A reader that delivers a byte, then blocks until released — keeps transfer #1 in flight
749        // (and thus the name claimed) while we attempt transfer #2.
750        let (release_tx, release_rx) = tokio::sync::oneshot::channel::<()>();
751        struct BlockingReader {
752            sent: bool,
753            release: Option<tokio::sync::oneshot::Receiver<()>>,
754        }
755        impl AsyncRead for BlockingReader {
756            fn poll_read(
757                mut self: std::pin::Pin<&mut Self>,
758                cx: &mut std::task::Context<'_>,
759                buf: &mut tokio::io::ReadBuf<'_>,
760            ) -> std::task::Poll<io::Result<()>> {
761                if !self.sent {
762                    buf.put_slice(b"x");
763                    self.sent = true;
764                    return std::task::Poll::Ready(Ok(()));
765                }
766                // After the first byte, park until released, then report EOF.
767                match self.release.as_mut() {
768                    Some(rx) => match std::pin::Pin::new(rx).poll(cx) {
769                        std::task::Poll::Ready(_) => {
770                            self.release = None;
771                            std::task::Poll::Ready(Ok(())) // EOF (no bytes written)
772                        }
773                        std::task::Poll::Pending => std::task::Poll::Pending,
774                    },
775                    None => std::task::Poll::Ready(Ok(())),
776                }
777            }
778        }
779
780        let s1 = store.clone();
781        let t1 = tokio::spawn(async move {
782            let reader = BlockingReader {
783                sent: false,
784                release: Some(release_rx),
785            };
786            // expected_len 1: completes once the single byte is read and the reader returns EOF.
787            s1.put_file("race.bin", reader, 0, 1).await
788        });
789
790        // Wait until transfer #1 has actually claimed the name (its partial exists).
791        let partial = root.join("race.bin.partial");
792        for _ in 0..200 {
793            if partial.exists() {
794                break;
795            }
796            tokio::time::sleep(std::time::Duration::from_millis(5)).await;
797        }
798        assert!(
799            partial.exists(),
800            "transfer #1 should have created the partial"
801        );
802
803        // Transfer #2 for the SAME name, while #1 still holds the claim → FileExists, no interleave.
804        let err = store
805            .put_file("race.bin", &b"yy"[..], 1, 3)
806            .await
807            .unwrap_err();
808        assert!(
809            matches!(err, TaildropError::FileExists),
810            "a concurrent transfer for an in-flight name must be rejected, got {err:?}"
811        );
812
813        // Release #1 so it finalizes cleanly, and confirm the name frees afterward.
814        release_tx.send(()).unwrap();
815        t1.await.unwrap().unwrap();
816        // Now the name is free: a fresh transfer succeeds (the guard was released on drop).
817        store.put_file("race.bin", &b"z"[..], 0, 1).await.unwrap();
818
819        std::fs::remove_dir_all(&root).ok();
820    }
821
822    #[cfg(unix)]
823    #[tokio::test]
824    async fn symlink_in_store_root_is_refused_not_followed() {
825        use std::os::unix::fs::symlink;
826
827        let root = tmp_root();
828        let store = TaildropStore::new(&root).unwrap();
829
830        // An attacker-planted symlink in the store root, pointing at a sensitive file OUTSIDE it.
831        let outside = tmp_root();
832        std::fs::create_dir_all(&outside).unwrap();
833        let secret = outside.join("secret");
834        std::fs::write(&secret, b"TOP SECRET").unwrap();
835
836        // (a) open_file must refuse a symlink target, never read through it.
837        let link = root.join("link.txt");
838        symlink(&secret, &link).unwrap();
839        let open_err = store.open_file("link.txt").unwrap_err();
840        assert!(
841            matches!(open_err, TaildropError::Io(ref e) if e.kind() == io::ErrorKind::InvalidInput),
842            "open_file must refuse a symlink, got {open_err:?}"
843        );
844
845        // (b) delete_file must refuse the symlink, leaving BOTH the link and its target intact.
846        let del_err = store.delete_file("link.txt").unwrap_err();
847        assert!(
848            matches!(del_err, TaildropError::Io(ref e) if e.kind() == io::ErrorKind::InvalidInput),
849            "delete_file must refuse a symlink, got {del_err:?}"
850        );
851        assert!(
852            secret.exists(),
853            "the symlink target must NOT have been deleted"
854        );
855        assert_eq!(std::fs::read(&secret).unwrap(), b"TOP SECRET");
856
857        // (c) waiting_files must not report the symlink as a waiting file.
858        assert!(
859            store.waiting_files().unwrap().is_empty(),
860            "a symlink in the store root must not be listed as a waiting file"
861        );
862
863        // (d) put_file onto a symlinked partial must refuse rather than write through the link.
864        let link_partial = root.join("evil.bin.partial");
865        symlink(&secret, &link_partial).unwrap();
866        let put_err = store
867            .put_file("evil.bin", &b"data"[..], 0, 4)
868            .await
869            .unwrap_err();
870        assert!(
871            matches!(put_err, TaildropError::Io(ref e) if e.kind() == io::ErrorKind::InvalidInput),
872            "put_file must refuse a symlinked partial, got {put_err:?}"
873        );
874        assert_eq!(
875            std::fs::read(&secret).unwrap(),
876            b"TOP SECRET",
877            "the symlink target must NOT have been written through"
878        );
879
880        std::fs::remove_dir_all(&root).ok();
881        std::fs::remove_dir_all(&outside).ok();
882    }
883
884    #[test]
885    fn next_available_name_inserts_before_extension() {
886        let root = tmp_root();
887        std::fs::create_dir_all(&root).unwrap();
888        assert_eq!(next_available_name(&root, "a.txt"), "a.txt");
889        std::fs::write(root.join("a.txt"), b"x").unwrap();
890        assert_eq!(next_available_name(&root, "a.txt"), "a (1).txt");
891        std::fs::write(root.join("a (1).txt"), b"x").unwrap();
892        assert_eq!(next_available_name(&root, "a.txt"), "a (2).txt");
893        // Dotfile (no real extension) appends at end.
894        std::fs::write(root.join(".env"), b"x").unwrap();
895        assert_eq!(next_available_name(&root, ".env"), ".env (1)");
896        std::fs::remove_dir_all(&root).ok();
897    }
898}