ts_runtime/taildrop.rs
1//! Taildrop file store — the receiving half of Tailscale's peer-to-peer file transfer.
2//!
3//! A peer sends a file to this node via the peerAPI route `PUT /v0/put/<name>` (handled in
4//! `peerapi`). This module owns the on-disk store those puts land in, faithfully mirroring
5//! Go's `taildrop.manager`:
6//!
7//! - Incoming bytes are written to a per-transfer **partial** file (`<base>.partial`) so an
8//! interrupted transfer never exposes a truncated file under its real name, and can be resumed
9//! from an offset.
10//! - On successful completion the partial is **atomically renamed** to the final base name. If the
11//! final name already exists, a non-clobbering ` (n)` suffix is chosen (Go `nextFilename`).
12//! - File names are strictly validated ([`validate_base_name`](crate::taildrop::validate_base_name)) to defeat path traversal and
13//! reserved-suffix abuse before any path is constructed — this is the security boundary.
14//!
15//! # Anti-abuse / safety
16//!
17//! Every name is validated to be a single, local, non-traversing path component before it touches
18//! the filesystem; a name containing `/`, `\`, `..`, a NUL, control chars, or the reserved
19//! `.partial` / `.deleted` suffixes is rejected with [`TaildropError::InvalidFileName`](crate::taildrop::TaildropError::InvalidFileName). The store
20//! root is fixed at construction; all I/O is confined to it by joining only validated base names.
21
22use std::{
23 collections::HashSet,
24 io::{self, Seek, Write},
25 path::{Path, PathBuf},
26 sync::{Arc, Mutex},
27};
28
29use tokio::io::{AsyncRead, AsyncReadExt};
30
31/// Suffix for in-progress transfers. A completed transfer is renamed off this suffix; a name
32/// ending in it is itself never accepted as a base name (Go `partialSuffix`).
33const PARTIAL_SUFFIX: &str = ".partial";
34/// Suffix Go uses to tombstone files pending deletion on platforms with async close; we reject it
35/// as a base name for parity so a sender can't create one (Go `deletedSuffix`).
36const DELETED_SUFFIX: &str = ".deleted";
37/// Maximum base-name length in bytes (Go `validateBaseName`: 255).
38const MAX_BASE_NAME_LEN: usize = 255;
39
40/// Errors from the Taildrop file store.
41#[derive(Debug)]
42pub enum TaildropError {
43 /// The requested file name is invalid (traversal, reserved suffix, empty, too long, bad runes).
44 /// Maps to peerAPI `400 Bad Request`.
45 InvalidFileName,
46 /// A transfer for this exact base name is already in progress. Maps to peerAPI `409 Conflict`.
47 FileExists,
48 /// Underlying filesystem I/O failure. Maps to peerAPI `500`.
49 Io(io::Error),
50}
51
52impl core::fmt::Display for TaildropError {
53 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
54 match self {
55 TaildropError::InvalidFileName => write!(f, "invalid taildrop file name"),
56 TaildropError::FileExists => {
57 write!(f, "a transfer for this file is already in progress")
58 }
59 TaildropError::Io(e) => write!(f, "taildrop I/O error: {e}"),
60 }
61 }
62}
63
64impl std::error::Error for TaildropError {
65 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
66 match self {
67 TaildropError::Io(e) => Some(e),
68 _ => None,
69 }
70 }
71}
72
73impl From<io::Error> for TaildropError {
74 fn from(e: io::Error) -> Self {
75 TaildropError::Io(e)
76 }
77}
78
79/// A waiting (fully-received) Taildrop file, as reported to the embedder. Mirrors Go
80/// `apitype.WaitingFile` (default field-name JSON marshalling: `Name`, `Size`).
81#[derive(Debug, Clone, PartialEq, Eq)]
82pub struct WaitingFile {
83 /// The file's base name.
84 pub name: String,
85 /// The file's size in bytes.
86 pub size: u64,
87}
88
89/// Validate a Taildrop base name, mirroring Go `taildrop.validateBaseName`.
90///
91/// Returns the name unchanged when it is a safe, single, local path component; otherwise `None`.
92/// Rejection rules (any one fails): empty or `> 255` bytes; leading/trailing ASCII space; contains
93/// a path separator (`/` or `\`), a NUL, or an ASCII control char; is `.` or `..`; equals a cleaned
94/// path other than itself (catches embedded `..`/`.` segments and absolute paths); or ends in the
95/// reserved `.partial` / `.deleted` suffixes.
96pub fn validate_base_name(name: &str) -> Option<&str> {
97 if name.is_empty() || name.len() > MAX_BASE_NAME_LEN {
98 return None;
99 }
100 if name.starts_with(' ') || name.ends_with(' ') {
101 return None;
102 }
103 if name == "." || name == ".." {
104 return None;
105 }
106 if name.ends_with(PARTIAL_SUFFIX) || name.ends_with(DELETED_SUFFIX) {
107 return None;
108 }
109 // Reject any separator, NUL, or control character outright. This is the core traversal guard:
110 // with no `/`, `\`, or `..` segment possible, the name can only ever be a leaf in the store dir.
111 for ch in name.chars() {
112 if ch == '/' || ch == '\\' || ch == '\0' || ch.is_control() {
113 return None;
114 }
115 }
116 // Defense in depth: a name that does not survive `Path` normalization as a single normal
117 // component is rejected (catches `..`, absolute paths, and any platform-specific oddity).
118 let p = Path::new(name);
119 let mut comps = p.components();
120 match (comps.next(), comps.next()) {
121 (Some(std::path::Component::Normal(c)), None) if c == name => Some(name),
122 _ => None,
123 }
124}
125
126/// Choose a non-clobbering final name for `base` within `dir`, mirroring Go `nextFilename`:
127/// `foo.txt` -> `foo (1).txt` -> `foo (2).txt` ... inserting ` (n)` before the extension. Returns
128/// the first candidate (incl. `base` itself) whose path does not yet exist. Bounded to avoid an
129/// unbounded loop on a pathological directory.
130fn next_available_name(dir: &Path, base: &str) -> String {
131 if !path_present(&dir.join(base)) {
132 return base.to_string();
133 }
134 let (stem, ext) = match base.rsplit_once('.') {
135 // Keep the dot with the extension; an empty stem (dotfile like ".bashrc") has no split.
136 Some((stem, ext)) if !stem.is_empty() => (stem, format!(".{ext}")),
137 _ => (base, String::new()),
138 };
139 for n in 1..=10_000u32 {
140 let candidate = format!("{stem} ({n}){ext}");
141 if !path_present(&dir.join(&candidate)) {
142 return candidate;
143 }
144 }
145 // Pathological fallback: suffix with a high counter; extremely unlikely to be reached.
146 format!("{stem} (overflow){ext}")
147}
148
149/// Whether a path is present, treating a symlink (even a dangling one) as present. Unlike
150/// `Path::exists()` (which follows the link and returns `false` for a dangling symlink), this uses
151/// `symlink_metadata` so a planted symlink can never be mistaken for a free name in
152/// [`next_available_name`] — we must not select, then rename onto, a symlink.
153fn path_present(path: &Path) -> bool {
154 std::fs::symlink_metadata(path).is_ok()
155}
156
157/// Reject a path that is (or whose final component is) a symlink, mirroring the intent of Go's
158/// `O_NOFOLLOW` on the taildrop file ops. `validate_base_name` already blocks a traversing *name*,
159/// but not a symlink **component already planted in the store root** by a local attacker (e.g.
160/// `root/foo.txt -> /etc/cron.d/x`), which a plain `open`/`rename`/`remove` would follow. Uses
161/// `symlink_metadata` (lstat — does NOT follow the final symlink); a non-existent path is fine
162/// (returns `Ok(())`), only an existing symlink is refused.
163///
164/// This is a check-then-act guard, so it is not atomic with the open/rename/remove that follows.
165/// It kills the **persistent-plant** attack (a symlink left in the store root is no longer followed
166/// deterministically), and the per-name in-flight lock serializes our OWN operations on a name, but
167/// it does not close a sub-millisecond race where an external process swaps the path for a symlink
168/// between this lstat and the syscall. That residual requires an external writer who already holds
169/// store-dir write access (the threat bound for this hardening), and is the one axis where this is
170/// weaker than Go's atomic `O_NOFOLLOW`. The `offset == 0` put is additionally protected by
171/// `create_new` (`O_EXCL`, which refuses an existing symlink atomically); the `offset > 0` resume
172/// open is plain `write(true)` and relies on this advisory check until `O_NOFOLLOW` is wired
173/// (tracked as a follow-up — its raw value is arch-dependent, so a portable open flag is deferred).
174fn refuse_symlink(path: &Path) -> Result<(), TaildropError> {
175 match std::fs::symlink_metadata(path) {
176 Ok(meta) if meta.file_type().is_symlink() => Err(TaildropError::Io(io::Error::new(
177 io::ErrorKind::InvalidInput,
178 "taildrop path is a symlink; refusing to follow it",
179 ))),
180 Ok(_) => Ok(()),
181 // Not present yet (the common case for a fresh partial / final name) — nothing to refuse.
182 Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(()),
183 Err(e) => Err(e.into()),
184 }
185}
186
187/// A Taildrop file store rooted at a fixed directory. All operations are confined to this root by
188/// joining only [`validate_base_name`]-validated names.
189#[derive(Debug, Clone)]
190pub struct TaildropStore {
191 root: PathBuf,
192 /// Base names with a transfer currently in flight. A `put_file` claims its base name here for
193 /// the whole receive (both a fresh `offset == 0` transfer and a resumed `offset > 0` one), so
194 /// two concurrent PUTs for the same name cannot interleave `set_len`/`seek`/`write_all` and
195 /// corrupt the shared `.partial`. Shared (`Arc`) so it survives `TaildropStore::clone()` — the
196 /// store is handed around as `Arc<TaildropStore>` but cloning it must not fork the guard set.
197 in_flight: Arc<Mutex<HashSet<String>>>,
198}
199
200/// RAII claim on an in-flight transfer name; releasing it (on drop) frees the name for the next
201/// transfer. Holds the shared guard set so the entry is removed even on an early return / error /
202/// panic in `put_file`.
203struct InFlightGuard {
204 set: Arc<Mutex<HashSet<String>>>,
205 name: String,
206}
207
208impl Drop for InFlightGuard {
209 fn drop(&mut self) {
210 // A poisoned lock still lets us recover the set and remove our entry — leaving a stale name
211 // claimed would wedge all future transfers of that name behind a phantom conflict.
212 let mut set = self.set.lock().unwrap_or_else(|p| p.into_inner());
213 set.remove(&self.name);
214 }
215}
216
217impl TaildropStore {
218 /// Create a store rooted at `root`, creating the directory (and parents) if needed.
219 pub fn new(root: impl Into<PathBuf>) -> Result<Self, TaildropError> {
220 let root = root.into();
221 std::fs::create_dir_all(&root)?;
222 Ok(Self {
223 root,
224 in_flight: Arc::new(Mutex::new(HashSet::new())),
225 })
226 }
227
228 /// Claim `base` as in-flight, returning an RAII guard that frees it on drop. Returns
229 /// [`TaildropError::FileExists`] if another transfer already holds the name — this is the
230 /// concurrency analog of the on-disk `.partial` conflict, and it serializes all transfers of one
231 /// name so a resume (`offset > 0`) cannot race a concurrent transfer's `set_len`/`seek`/`write`.
232 fn claim_in_flight(&self, base: &str) -> Result<InFlightGuard, TaildropError> {
233 let name = base.to_string();
234 let mut set = self.in_flight.lock().unwrap_or_else(|p| p.into_inner());
235 if !set.insert(name.clone()) {
236 return Err(TaildropError::FileExists);
237 }
238 Ok(InFlightGuard {
239 set: self.in_flight.clone(),
240 name,
241 })
242 }
243
244 /// The partial-file path for an already-validated base name.
245 fn partial_path(&self, base: &str) -> PathBuf {
246 self.root.join(format!("{base}{PARTIAL_SUFFIX}"))
247 }
248
249 /// Receive a file named `name` from `reader`, writing to `<name>.partial` then atomically
250 /// renaming to a non-clobbering final name on success. Mirrors Go `manager.PutFile`.
251 ///
252 /// `offset` lets a resumed transfer append past already-written bytes (the partial is opened, the
253 /// write starts at `offset`, and any bytes already on disk past `offset` are truncated away).
254 /// `expected_len` is the declared total length of the completed file (the request's
255 /// `Content-Length` plus `offset`); the transfer is finalized only if exactly that many bytes are
256 /// present. Returns the total number of bytes in the completed file.
257 ///
258 /// Fail-closed: an invalid name is rejected before any path is built; an in-progress partial for
259 /// the same name yields [`TaildropError::FileExists`]; an out-of-range resume `offset` (past the
260 /// current partial length) is rejected; an I/O error mid-transfer — or a body that ends before
261 /// `expected_len` (a short/interrupted stream) — leaves the `.partial` on disk and the final name
262 /// is never created. This matches Go `feature/taildrop/send.go`, which errors when the copied
263 /// length does not equal the declared length rather than publishing a truncated file.
264 ///
265 /// The retained `.partial` is resumable only by a peer that issues a ranged retry (an `offset > 0`
266 /// PUT); a sender that always restarts at `offset == 0` will instead hit the in-progress-conflict
267 /// path ([`TaildropError::FileExists`]) until the stale partial is cleared. There is no automatic
268 /// reaper for an abandoned partial yet (Go's `fileDeleter` GCs one after ~1h); tracked separately.
269 pub async fn put_file<R>(
270 &self,
271 name: &str,
272 mut reader: R,
273 offset: u64,
274 expected_len: u64,
275 ) -> Result<u64, TaildropError>
276 where
277 R: AsyncRead + Unpin,
278 {
279 let base = validate_base_name(name).ok_or(TaildropError::InvalidFileName)?;
280 let partial = self.partial_path(base);
281
282 // Claim the name for the whole transfer FIRST, so two concurrent PUTs for the same base name
283 // (especially two resumes, `offset > 0`, which reopen the same `.partial`) cannot interleave
284 // their `set_len`/`seek`/`write_all` and corrupt the shared partial. The fresh-transfer path
285 // is already protected on disk by `create_new`, but the resume path opens with plain
286 // `write(true)` and needs this lock. The guard frees the name on drop (incl. early return /
287 // error / panic). Held across the await — it is a cheap `HashSet` membership marker, not a
288 // lock held during I/O, so it never blocks the runtime.
289 let _claim = self.claim_in_flight(base)?;
290
291 // Refuse to follow a symlink planted in the store root (Go's `O_NOFOLLOW` intent): the
292 // partial must be a regular file we create/own, never a pre-existing symlink to elsewhere.
293 refuse_symlink(&partial)?;
294
295 // A fresh transfer (offset 0) must not collide with another in-flight transfer of the same
296 // name; a resume (offset > 0) reopens the existing partial. File handles are std (the tokio
297 // `fs` feature is intentionally not enabled in this crate); the body is read async off the
298 // overlay stream and written to the blocking handle in a bounded loop.
299 let mut file = if offset == 0 {
300 match std::fs::OpenOptions::new()
301 .write(true)
302 .create_new(true)
303 .open(&partial)
304 {
305 Ok(f) => f,
306 Err(e) if e.kind() == io::ErrorKind::AlreadyExists => {
307 return Err(TaildropError::FileExists);
308 }
309 Err(e) => return Err(e.into()),
310 }
311 } else {
312 let mut f = std::fs::OpenOptions::new().write(true).open(&partial)?;
313 // Bound the resume offset to the current partial length and truncate any bytes past it,
314 // matching Go `feature/taildrop/fileops_fs.go` (`OpenWriter` rejects `offset > curr` and
315 // `Truncate(offset)`s). Without the bound a too-large offset would leave a zero-filled
316 // sparse hole; without the truncate a shorter resumed body would leave a prior attempt's
317 // stale tail past the new end. `metadata().len()` is the partial's current size.
318 let current = f.metadata()?.len();
319 if offset > current {
320 return Err(TaildropError::Io(io::Error::new(
321 io::ErrorKind::InvalidInput,
322 "taildrop resume offset is past the end of the partial file",
323 )));
324 }
325 f.set_len(offset)?;
326 f.seek(io::SeekFrom::Start(offset))?;
327 f
328 };
329
330 let mut copied: u64 = 0;
331 let mut buf = [0u8; 64 * 1024];
332 loop {
333 let n = reader.read(&mut buf).await?;
334 if n == 0 {
335 break;
336 }
337 // Each `write_all` only pushes the chunk into the page cache (microseconds); the
338 // genuinely blocking cost is the terminal `flush`/`sync_all`/`rename` below, which we
339 // hand to a blocking thread so a flood of concurrent transfers can't starve the tokio
340 // worker pool on fsync (see `peerapi::MAX_INFLIGHT`).
341 file.write_all(&buf[..n])?;
342 copied += n as u64;
343 }
344
345 // Length check (Go `send.go`: error when `copyLength != length`). A body that ended before the
346 // declared length — an interrupted/short stream — must NOT be finalized as a complete file;
347 // leave the `.partial` on disk (with the bytes received so far) so a Range-capable peer can
348 // resume it. `checked_add` rather than a bare `+`: `offset` is an attacker-supplied header and
349 // the bound above already rejects an `offset` past the (real, on-disk) partial length, so this
350 // cannot overflow in practice — but treat an overflow as a length mismatch rather than a panic.
351 let total = match offset.checked_add(copied) {
352 Some(t) if t == expected_len => t,
353 _ => {
354 return Err(TaildropError::Io(io::Error::new(
355 io::ErrorKind::UnexpectedEof,
356 format!(
357 "taildrop body ended early: got {copied} of {expected_len} expected bytes \
358 at offset {offset}; leaving partial for resume"
359 ),
360 )));
361 }
362 };
363
364 // Finalize off the async runtime: `sync_all` (fsync) and `rename` are the dominant blocking
365 // operations, so run them on a blocking thread. The `File` and both paths are owned by the
366 // closure (`Send + 'static`), and `next_available_name` (which `stat`s candidates) goes with
367 // them. Fail-closed: any I/O error — or a join failure — propagates without ever publishing
368 // the final name, leaving the `.partial` in place for a later resume.
369 let root = self.root.clone();
370 let base = base.to_string();
371 tokio::task::spawn_blocking(move || -> io::Result<()> {
372 file.flush()?;
373 file.sync_all()?;
374 drop(file);
375
376 // Atomically publish under a non-clobbering final name. `next_available_name` probes
377 // candidates with `symlink_metadata` (not `exists`, which follows symlinks), so it will
378 // not treat a planted symlink as "free" and rename onto it; and we refuse to rename onto
379 // an existing symlink target outright (Go `O_NOFOLLOW` intent). The `_claim` guard (held
380 // by the caller for the whole transfer) keeps this name serialized against other PUTs.
381 let final_name = next_available_name(&root, &base);
382 let final_path = root.join(&final_name);
383 if let Err(e) = refuse_symlink(&final_path) {
384 return Err(match e {
385 TaildropError::Io(io_err) => io_err,
386 other => io::Error::other(other.to_string()),
387 });
388 }
389 std::fs::rename(&partial, &final_path)?;
390 Ok(())
391 })
392 .await
393 .map_err(|join_err| {
394 // A panicked/cancelled finalize task: surface as I/O so the caller maps it to a 500 and
395 // the partial is left untouched (never publishes the final name).
396 TaildropError::Io(io::Error::other(format!(
397 "taildrop finalize task failed: {join_err}"
398 )))
399 })??;
400
401 Ok(total)
402 }
403
404 /// List fully-received (non-partial) files, sorted by name (Go `WaitingFiles`).
405 pub fn waiting_files(&self) -> Result<Vec<WaitingFile>, TaildropError> {
406 let mut out = Vec::new();
407 let entries = match std::fs::read_dir(&self.root) {
408 Ok(e) => e,
409 Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(out),
410 Err(e) => return Err(e.into()),
411 };
412 for entry in entries {
413 let entry = entry?;
414 // `entry.metadata()` does NOT follow symlinks (it is `lstat`-based), so a planted
415 // symlink has `is_file() == false` here and is skipped — a symlink in the store root is
416 // never reported as a waiting file (Go `O_NOFOLLOW` intent), even one pointing at a real
417 // regular file elsewhere.
418 let meta = entry.metadata()?;
419 if meta.file_type().is_symlink() || !meta.is_file() {
420 continue;
421 }
422 let Ok(name) = entry.file_name().into_string() else {
423 continue;
424 };
425 // Skip in-progress / tombstoned files.
426 if name.ends_with(PARTIAL_SUFFIX) || name.ends_with(DELETED_SUFFIX) {
427 continue;
428 }
429 out.push(WaitingFile {
430 name,
431 size: meta.len(),
432 });
433 }
434 out.sort_by(|a, b| a.name.cmp(&b.name));
435 Ok(out)
436 }
437
438 /// Delete a fully-received file by base name (Go `DeleteFile`). The name is validated first, so a
439 /// traversal attempt can never escape the store root, and a symlink at the target is refused (Go
440 /// `O_NOFOLLOW` intent) rather than followed — a planted `root/foo.txt -> /etc/passwd` must not
441 /// let a `delete foo.txt` remove the link's target.
442 pub fn delete_file(&self, name: &str) -> Result<(), TaildropError> {
443 let base = validate_base_name(name).ok_or(TaildropError::InvalidFileName)?;
444 let path = self.root.join(base);
445 refuse_symlink(&path)?;
446 std::fs::remove_file(path)?;
447 Ok(())
448 }
449
450 /// Open a fully-received file by base name for reading, returning the handle and its size (Go
451 /// `OpenFile`). The name is validated first, and a symlink at the target is refused (Go
452 /// `O_NOFOLLOW` intent) so a planted symlink cannot redirect the read to an arbitrary file.
453 pub fn open_file(&self, name: &str) -> Result<(std::fs::File, u64), TaildropError> {
454 let base = validate_base_name(name).ok_or(TaildropError::InvalidFileName)?;
455 let path = self.root.join(base);
456 refuse_symlink(&path)?;
457 let f = std::fs::File::open(path)?;
458 let size = f.metadata()?.len();
459 Ok((f, size))
460 }
461}
462
463#[cfg(test)]
464mod tests {
465 use super::*;
466
467 fn tmp_root() -> PathBuf {
468 // A per-call atomic counter guarantees uniqueness across tests that run concurrently in the
469 // same binary. A timestamp alone is NOT enough: `SystemTime` resolution is coarse on some
470 // platforms, so two tests starting in the same tick would collide on one dir and stomp each
471 // other's files (the cause of intermittent taildrop-test flakiness under parallel runs).
472 static COUNTER: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
473 let n = COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
474 let mut p = std::env::temp_dir();
475 p.push(format!("taildrop-test-{}-{n}", std::process::id()));
476 p
477 }
478
479 #[test]
480 fn validate_rejects_traversal_and_reserved() {
481 // Valid leaf names.
482 assert_eq!(validate_base_name("photo.jpg"), Some("photo.jpg"));
483 assert_eq!(
484 validate_base_name("a file with spaces.txt"),
485 Some("a file with spaces.txt")
486 );
487 assert_eq!(validate_base_name(".bashrc"), Some(".bashrc"));
488
489 // Traversal / separators.
490 assert_eq!(validate_base_name("../etc/passwd"), None);
491 assert_eq!(validate_base_name("a/b"), None);
492 assert_eq!(validate_base_name("a\\b"), None);
493 assert_eq!(validate_base_name("/abs"), None);
494 assert_eq!(validate_base_name(".."), None);
495 assert_eq!(validate_base_name("."), None);
496
497 // NUL / control.
498 assert_eq!(validate_base_name("a\0b"), None);
499 assert_eq!(validate_base_name("a\nb"), None);
500
501 // Reserved suffixes.
502 assert_eq!(validate_base_name("x.partial"), None);
503 assert_eq!(validate_base_name("x.deleted"), None);
504
505 // Edges.
506 assert_eq!(validate_base_name(""), None);
507 assert_eq!(validate_base_name(" leading"), None);
508 assert_eq!(validate_base_name("trailing "), None);
509 assert_eq!(validate_base_name(&"a".repeat(256)), None);
510 assert_eq!(
511 validate_base_name(&"a".repeat(255)).map(|s| s.len()),
512 Some(255)
513 );
514 }
515
516 #[tokio::test]
517 async fn put_file_writes_then_atomically_renames() {
518 let root = tmp_root();
519 let store = TaildropStore::new(&root).unwrap();
520
521 let data = b"hello taildrop";
522 let n = store
523 .put_file("greeting.txt", &data[..], 0, data.len() as u64)
524 .await
525 .unwrap();
526 assert_eq!(n, data.len() as u64);
527
528 // The final file exists; no .partial remains.
529 let body = std::fs::read(root.join("greeting.txt")).unwrap();
530 assert_eq!(body, data);
531 assert!(!root.join("greeting.txt.partial").exists());
532
533 let wf = store.waiting_files().unwrap();
534 assert_eq!(wf.len(), 1);
535 assert_eq!(wf[0].name, "greeting.txt");
536 assert_eq!(wf[0].size, data.len() as u64);
537
538 std::fs::remove_dir_all(&root).ok();
539 }
540
541 #[tokio::test]
542 async fn put_file_resumes_from_offset() {
543 let root = tmp_root();
544 let store = TaildropStore::new(&root).unwrap();
545
546 // Pre-write a prefix into the `.partial` directly, simulating bytes already received by an
547 // earlier (interrupted) transfer.
548 let prefix = b"the first half ";
549 let partial = root.join("resume.txt.partial");
550 std::fs::write(&partial, prefix).unwrap();
551
552 // Resume at offset == the prefix length: `put_file` opens the existing partial, seeks past
553 // the prefix, and appends the rest.
554 let rest = b"and the second half";
555 let total = store
556 .put_file(
557 "resume.txt",
558 &rest[..],
559 prefix.len() as u64,
560 (prefix.len() + rest.len()) as u64,
561 )
562 .await
563 .unwrap();
564
565 // The returned count is offset + freshly-copied bytes, and the final file is the prefix and
566 // the resumed bytes concatenated (the seek positioned the write correctly).
567 assert_eq!(total, (prefix.len() + rest.len()) as u64);
568 let body = std::fs::read(root.join("resume.txt")).unwrap();
569 let mut expected = prefix.to_vec();
570 expected.extend_from_slice(rest);
571 assert_eq!(body, expected);
572 assert!(!partial.exists());
573
574 std::fs::remove_dir_all(&root).ok();
575 }
576
577 #[tokio::test]
578 async fn put_file_short_body_leaves_partial_not_truncated_final() {
579 // F2: a body that ends before the declared length must NOT be finalized as a complete (but
580 // truncated) file under the real name. Go errors when copyLength != length; we leave the
581 // `.partial` in place for resume.
582 let root = tmp_root();
583 let store = TaildropStore::new(&root).unwrap();
584
585 // Reader yields 5 bytes but we declare 10 expected (a short/interrupted stream).
586 let err = store
587 .put_file("short.txt", &b"world"[..], 0, 10)
588 .await
589 .unwrap_err();
590 assert!(
591 matches!(err, TaildropError::Io(ref e) if e.kind() == io::ErrorKind::UnexpectedEof),
592 "a short body must error, got {err:?}"
593 );
594 // The final name was NEVER created; the partial remains with the bytes received so far.
595 assert!(!root.join("short.txt").exists(), "no truncated final file");
596 let partial = std::fs::read(root.join("short.txt.partial")).unwrap();
597 assert_eq!(
598 partial, b"world",
599 "partial holds the received prefix for resume"
600 );
601 assert!(store.waiting_files().unwrap().is_empty());
602
603 std::fs::remove_dir_all(&root).ok();
604 }
605
606 #[tokio::test]
607 async fn put_file_resume_offset_past_end_is_rejected() {
608 // F3: a resume offset beyond the current partial length must be rejected (Go errors
609 // "offset out of range"), not produce a zero-filled sparse hole.
610 let root = tmp_root();
611 let store = TaildropStore::new(&root).unwrap();
612 std::fs::write(root.join("sparse.txt.partial"), b"abc").unwrap(); // 3 bytes on disk
613
614 let err = store
615 .put_file("sparse.txt", &b"xyz"[..], 99, 102)
616 .await
617 .unwrap_err();
618 assert!(
619 matches!(err, TaildropError::Io(ref e) if e.kind() == io::ErrorKind::InvalidInput),
620 "offset past end must be rejected, got {err:?}"
621 );
622 // The partial is untouched (still 3 bytes), no final file.
623 assert_eq!(
624 std::fs::read(root.join("sparse.txt.partial")).unwrap(),
625 b"abc"
626 );
627 assert!(!root.join("sparse.txt").exists());
628
629 std::fs::remove_dir_all(&root).ok();
630 }
631
632 #[tokio::test]
633 async fn put_file_resume_truncates_stale_tail() {
634 // F3: resuming at an offset LESS than the current partial length must truncate the bytes
635 // past the offset (Go `Truncate(offset)`), so a stale tail from a prior attempt cannot
636 // survive past the newly-written end.
637 let root = tmp_root();
638 let store = TaildropStore::new(&root).unwrap();
639 // A prior attempt left 20 bytes; we resume at offset 5 with a 3-byte tail ⇒ final is 8 bytes.
640 std::fs::write(root.join("retry.txt.partial"), b"KEEPme-STALE-TAILxxx").unwrap();
641
642 let total = store
643 .put_file("retry.txt", &b"NEW"[..], 5, 8)
644 .await
645 .unwrap();
646 assert_eq!(total, 8);
647 let body = std::fs::read(root.join("retry.txt")).unwrap();
648 assert_eq!(
649 body, b"KEEPmNEW",
650 "bytes past offset 5 truncated, then NEW appended"
651 );
652
653 std::fs::remove_dir_all(&root).ok();
654 }
655
656 #[tokio::test]
657 async fn put_file_conflict_picks_non_clobbering_name() {
658 let root = tmp_root();
659 let store = TaildropStore::new(&root).unwrap();
660
661 store
662 .put_file("dup.txt", &b"first"[..], 0, 5)
663 .await
664 .unwrap();
665 store
666 .put_file("dup.txt", &b"second"[..], 0, 6)
667 .await
668 .unwrap();
669 store
670 .put_file("dup.txt", &b"third"[..], 0, 5)
671 .await
672 .unwrap();
673
674 // Original plus two non-clobbering renames.
675 assert!(root.join("dup.txt").exists());
676 assert!(root.join("dup (1).txt").exists());
677 assert!(root.join("dup (2).txt").exists());
678
679 let wf = store.waiting_files().unwrap();
680 assert_eq!(wf.len(), 3);
681
682 std::fs::remove_dir_all(&root).ok();
683 }
684
685 #[tokio::test]
686 async fn put_file_in_progress_partial_is_conflict() {
687 let root = tmp_root();
688 let store = TaildropStore::new(&root).unwrap();
689
690 // Simulate an in-flight transfer by pre-creating the .partial file.
691 std::fs::write(root.join("busy.txt.partial"), b"partial").unwrap();
692
693 let err = store
694 .put_file("busy.txt", &b"x"[..], 0, 1)
695 .await
696 .unwrap_err();
697 assert!(matches!(err, TaildropError::FileExists));
698
699 std::fs::remove_dir_all(&root).ok();
700 }
701
702 #[tokio::test]
703 async fn put_file_rejects_bad_name_before_any_io() {
704 let root = tmp_root();
705 let store = TaildropStore::new(&root).unwrap();
706
707 let err = store
708 .put_file("../escape", &b"x"[..], 0, 1)
709 .await
710 .unwrap_err();
711 assert!(matches!(err, TaildropError::InvalidFileName));
712 // Nothing was written anywhere.
713 assert!(store.waiting_files().unwrap().is_empty());
714
715 std::fs::remove_dir_all(&root).ok();
716 }
717
718 #[tokio::test]
719 async fn delete_and_open_roundtrip() {
720 let root = tmp_root();
721 let store = TaildropStore::new(&root).unwrap();
722
723 store.put_file("doc.bin", &b"abc"[..], 0, 3).await.unwrap();
724 let (_f, size) = store.open_file("doc.bin").unwrap();
725 assert_eq!(size, 3);
726
727 store.delete_file("doc.bin").unwrap();
728 assert!(store.waiting_files().unwrap().is_empty());
729
730 // Traversal can't reach outside the root.
731 assert!(matches!(
732 store.delete_file("../../etc/passwd"),
733 Err(TaildropError::InvalidFileName)
734 ));
735
736 std::fs::remove_dir_all(&root).ok();
737 }
738
739 #[tokio::test]
740 async fn concurrent_resume_for_same_name_is_serialized() {
741 // The in-flight name guard: while one transfer holds a base name, a second PUT for the SAME
742 // name (the resume-race the lock closes) is rejected with FileExists rather than interleaving
743 // writes into the shared `.partial`. We hold the first transfer open with a reader that never
744 // completes until we let it, then fire the second concurrently.
745 let root = tmp_root();
746 let store = Arc::new(TaildropStore::new(&root).unwrap());
747
748 // A reader that delivers a byte, then blocks until released — keeps transfer #1 in flight
749 // (and thus the name claimed) while we attempt transfer #2.
750 let (release_tx, release_rx) = tokio::sync::oneshot::channel::<()>();
751 struct BlockingReader {
752 sent: bool,
753 release: Option<tokio::sync::oneshot::Receiver<()>>,
754 }
755 impl AsyncRead for BlockingReader {
756 fn poll_read(
757 mut self: std::pin::Pin<&mut Self>,
758 cx: &mut std::task::Context<'_>,
759 buf: &mut tokio::io::ReadBuf<'_>,
760 ) -> std::task::Poll<io::Result<()>> {
761 if !self.sent {
762 buf.put_slice(b"x");
763 self.sent = true;
764 return std::task::Poll::Ready(Ok(()));
765 }
766 // After the first byte, park until released, then report EOF.
767 match self.release.as_mut() {
768 Some(rx) => match std::pin::Pin::new(rx).poll(cx) {
769 std::task::Poll::Ready(_) => {
770 self.release = None;
771 std::task::Poll::Ready(Ok(())) // EOF (no bytes written)
772 }
773 std::task::Poll::Pending => std::task::Poll::Pending,
774 },
775 None => std::task::Poll::Ready(Ok(())),
776 }
777 }
778 }
779
780 let s1 = store.clone();
781 let t1 = tokio::spawn(async move {
782 let reader = BlockingReader {
783 sent: false,
784 release: Some(release_rx),
785 };
786 // expected_len 1: completes once the single byte is read and the reader returns EOF.
787 s1.put_file("race.bin", reader, 0, 1).await
788 });
789
790 // Wait until transfer #1 has actually claimed the name (its partial exists).
791 let partial = root.join("race.bin.partial");
792 for _ in 0..200 {
793 if partial.exists() {
794 break;
795 }
796 tokio::time::sleep(std::time::Duration::from_millis(5)).await;
797 }
798 assert!(
799 partial.exists(),
800 "transfer #1 should have created the partial"
801 );
802
803 // Transfer #2 for the SAME name, while #1 still holds the claim → FileExists, no interleave.
804 let err = store
805 .put_file("race.bin", &b"yy"[..], 1, 3)
806 .await
807 .unwrap_err();
808 assert!(
809 matches!(err, TaildropError::FileExists),
810 "a concurrent transfer for an in-flight name must be rejected, got {err:?}"
811 );
812
813 // Release #1 so it finalizes cleanly, and confirm the name frees afterward.
814 release_tx.send(()).unwrap();
815 t1.await.unwrap().unwrap();
816 // Now the name is free: a fresh transfer succeeds (the guard was released on drop).
817 store.put_file("race.bin", &b"z"[..], 0, 1).await.unwrap();
818
819 std::fs::remove_dir_all(&root).ok();
820 }
821
822 #[cfg(unix)]
823 #[tokio::test]
824 async fn symlink_in_store_root_is_refused_not_followed() {
825 use std::os::unix::fs::symlink;
826
827 let root = tmp_root();
828 let store = TaildropStore::new(&root).unwrap();
829
830 // An attacker-planted symlink in the store root, pointing at a sensitive file OUTSIDE it.
831 let outside = tmp_root();
832 std::fs::create_dir_all(&outside).unwrap();
833 let secret = outside.join("secret");
834 std::fs::write(&secret, b"TOP SECRET").unwrap();
835
836 // (a) open_file must refuse a symlink target, never read through it.
837 let link = root.join("link.txt");
838 symlink(&secret, &link).unwrap();
839 let open_err = store.open_file("link.txt").unwrap_err();
840 assert!(
841 matches!(open_err, TaildropError::Io(ref e) if e.kind() == io::ErrorKind::InvalidInput),
842 "open_file must refuse a symlink, got {open_err:?}"
843 );
844
845 // (b) delete_file must refuse the symlink, leaving BOTH the link and its target intact.
846 let del_err = store.delete_file("link.txt").unwrap_err();
847 assert!(
848 matches!(del_err, TaildropError::Io(ref e) if e.kind() == io::ErrorKind::InvalidInput),
849 "delete_file must refuse a symlink, got {del_err:?}"
850 );
851 assert!(
852 secret.exists(),
853 "the symlink target must NOT have been deleted"
854 );
855 assert_eq!(std::fs::read(&secret).unwrap(), b"TOP SECRET");
856
857 // (c) waiting_files must not report the symlink as a waiting file.
858 assert!(
859 store.waiting_files().unwrap().is_empty(),
860 "a symlink in the store root must not be listed as a waiting file"
861 );
862
863 // (d) put_file onto a symlinked partial must refuse rather than write through the link.
864 let link_partial = root.join("evil.bin.partial");
865 symlink(&secret, &link_partial).unwrap();
866 let put_err = store
867 .put_file("evil.bin", &b"data"[..], 0, 4)
868 .await
869 .unwrap_err();
870 assert!(
871 matches!(put_err, TaildropError::Io(ref e) if e.kind() == io::ErrorKind::InvalidInput),
872 "put_file must refuse a symlinked partial, got {put_err:?}"
873 );
874 assert_eq!(
875 std::fs::read(&secret).unwrap(),
876 b"TOP SECRET",
877 "the symlink target must NOT have been written through"
878 );
879
880 std::fs::remove_dir_all(&root).ok();
881 std::fs::remove_dir_all(&outside).ok();
882 }
883
884 #[test]
885 fn next_available_name_inserts_before_extension() {
886 let root = tmp_root();
887 std::fs::create_dir_all(&root).unwrap();
888 assert_eq!(next_available_name(&root, "a.txt"), "a.txt");
889 std::fs::write(root.join("a.txt"), b"x").unwrap();
890 assert_eq!(next_available_name(&root, "a.txt"), "a (1).txt");
891 std::fs::write(root.join("a (1).txt"), b"x").unwrap();
892 assert_eq!(next_available_name(&root, "a.txt"), "a (2).txt");
893 // Dotfile (no real extension) appends at end.
894 std::fs::write(root.join(".env"), b"x").unwrap();
895 assert_eq!(next_available_name(&root, ".env"), ".env (1)");
896 std::fs::remove_dir_all(&root).ok();
897 }
898}