use std::{
collections::HashSet,
io::{self, Seek, Write},
path::{Path, PathBuf},
sync::{Arc, Mutex},
};
use tokio::io::{AsyncRead, AsyncReadExt};
/// Suffix appended to a file's name while it is still being received.
const PARTIAL_SUFFIX: &str = ".partial";
/// Reserved suffix: names ending in it are rejected by `validate_base_name`
/// and skipped by `TaildropStore::waiting_files`.
const DELETED_SUFFIX: &str = ".deleted";
/// Longest accepted base name, measured in bytes of its UTF-8 encoding.
const MAX_BASE_NAME_LEN: usize = 255;
/// Errors returned by taildrop store operations.
#[derive(Debug)]
pub enum TaildropError {
    /// The requested name was rejected by `validate_base_name`.
    InvalidFileName,
    /// A transfer for this name is already running, or a fresh transfer found
    /// a leftover `.partial` file for it.
    FileExists,
    /// An error from the filesystem or from the body reader.
    Io(io::Error),
}

impl core::fmt::Display for TaildropError {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        match self {
            Self::InvalidFileName => f.write_str("invalid taildrop file name"),
            Self::FileExists => f.write_str("a transfer for this file is already in progress"),
            Self::Io(inner) => write!(f, "taildrop I/O error: {inner}"),
        }
    }
}

impl std::error::Error for TaildropError {
    /// Only the `Io` variant wraps an underlying error.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        if let Self::Io(inner) = self {
            Some(inner)
        } else {
            None
        }
    }
}

impl From<io::Error> for TaildropError {
    fn from(err: io::Error) -> Self {
        Self::Io(err)
    }
}
/// A finished file sitting in the store root, as reported by
/// `TaildropStore::waiting_files`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WaitingFile {
    /// File name relative to the store root.
    pub name: String,
    /// File size in bytes at the time of listing.
    pub size: u64,
}
/// Checks that `name` is safe to use as a single file name inside the store root.
///
/// Returns `Some(name)` when it is accepted and `None` otherwise. The following are rejected:
/// * the empty string, and anything longer than `MAX_BASE_NAME_LEN` bytes;
/// * names with a leading or trailing space;
/// * `.` and `..`;
/// * names ending in the reserved `.partial` / `.deleted` suffixes;
/// * names containing `/`, `\`, NUL or any other control character;
/// * anything `std::path` does not parse as exactly one normal component.
pub fn validate_base_name(name: &str) -> Option<&str> {
    let bad_len = name.is_empty() || name.len() > MAX_BASE_NAME_LEN;
    let space_padded = name.starts_with(' ') || name.ends_with(' ');
    let dot_entry = matches!(name, "." | "..");
    let reserved_suffix = [PARTIAL_SUFFIX, DELETED_SUFFIX]
        .iter()
        .any(|&suffix| name.ends_with(suffix));
    let forbidden_char = name
        .chars()
        .any(|c| matches!(c, '/' | '\\' | '\0') || c.is_control());
    if bad_len || space_padded || dot_entry || reserved_suffix || forbidden_char {
        return None;
    }
    // Last line of defence: let the platform path parser confirm this is one
    // plain component and nothing else (e.g. not a Windows prefix like `C:`).
    let mut parts = Path::new(name).components();
    let single_normal = matches!(
        parts.next(),
        Some(std::path::Component::Normal(part)) if part == name
    );
    (single_normal && parts.next().is_none()).then_some(name)
}
/// Picks a name in `dir` for a finished transfer of `base` that does not
/// collide with any existing entry.
///
/// If `base` itself is free, it is returned unchanged. Otherwise ` (n)` is
/// inserted before the last extension (`"a.txt"` becomes `"a (1).txt"`), using
/// the smallest free `n >= 1`. A name whose only dot is the leading one is
/// treated as having no extension (`".env"` becomes `".env (1)"`).
///
/// The search deliberately has no upper bound. Any cap would need a fallback
/// name, and a fallback that is not itself checked may already exist, in which
/// case the caller's rename would silently replace that file. The loop always
/// terminates: `path_present` is true only for entries that exist, and a
/// directory holds finitely many entries.
///
/// This only checks presence; it does not reserve the returned name. Callers
/// must keep their own check-then-rename step from interleaving.
fn next_available_name(dir: &Path, base: &str) -> String {
    if !path_present(&dir.join(base)) {
        return base.to_string();
    }
    // Split at the last dot, but keep a leading-dot name whole so the counter
    // is not placed in front of the dot.
    let (stem, ext) = match base.rsplit_once('.') {
        Some((stem, ext)) if !stem.is_empty() => (stem, format!(".{ext}")),
        _ => (base, String::new()),
    };
    let mut n: u64 = 1;
    loop {
        let candidate = format!("{stem} ({n}){ext}");
        if !path_present(&dir.join(&candidate)) {
            return candidate;
        }
        n += 1;
    }
}
/// Reports whether `path` names an existing directory entry, without following
/// a symlink in its final component.
///
/// Every metadata error is treated as "absent", not just NotFound. For
/// example, a permission or name-length error also yields `false`.
fn path_present(path: &Path) -> bool {
    let lookup = std::fs::symlink_metadata(path);
    lookup.is_ok()
}
fn refuse_symlink(path: &Path) -> Result<(), TaildropError> {
match std::fs::symlink_metadata(path) {
Ok(meta) if meta.file_type().is_symlink() => Err(TaildropError::Io(io::Error::new(
io::ErrorKind::InvalidInput,
"taildrop path is a symlink; refusing to follow it",
))),
Ok(_) => Ok(()),
Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(()),
Err(e) => Err(e.into()),
}
}
/// A directory of received taildrop files.
///
/// Incoming data is written to `<name>.partial` and renamed into place once it
/// is complete. The in-flight set sits behind an `Arc`, so every clone of a
/// store shares the same set of running transfers.
#[derive(Debug, Clone)]
pub struct TaildropStore {
    /// Directory holding finished files and `.partial` files.
    root: PathBuf,
    /// Base names for which a `put_file` call is currently running.
    in_flight: Arc<Mutex<HashSet<String>>>,
}
/// Keeps one name registered in a store's in-flight set. Dropping the guard
/// unregisters the name.
struct InFlightGuard {
    /// The set shared with the owning store.
    set: Arc<Mutex<HashSet<String>>>,
    /// The registered name to remove on drop.
    name: String,
}

impl Drop for InFlightGuard {
    fn drop(&mut self) {
        // Poisoning only means some other holder panicked; the set is still
        // usable. Leaking the name would make that name unclaimable for the
        // rest of the store's lifetime.
        let mut names = match self.set.lock() {
            Ok(guard) => guard,
            Err(poisoned) => poisoned.into_inner(),
        };
        names.remove(&self.name);
    }
}
impl TaildropStore {
pub fn new(root: impl Into<PathBuf>) -> Result<Self, TaildropError> {
let root = root.into();
std::fs::create_dir_all(&root)?;
Ok(Self {
root,
in_flight: Arc::new(Mutex::new(HashSet::new())),
})
}
fn claim_in_flight(&self, base: &str) -> Result<InFlightGuard, TaildropError> {
let name = base.to_string();
let mut set = self.in_flight.lock().unwrap_or_else(|p| p.into_inner());
if !set.insert(name.clone()) {
return Err(TaildropError::FileExists);
}
Ok(InFlightGuard {
set: self.in_flight.clone(),
name,
})
}
fn partial_path(&self, base: &str) -> PathBuf {
self.root.join(format!("{base}{PARTIAL_SUFFIX}"))
}
pub async fn put_file<R>(
&self,
name: &str,
mut reader: R,
offset: u64,
expected_len: u64,
) -> Result<u64, TaildropError>
where
R: AsyncRead + Unpin,
{
let base = validate_base_name(name).ok_or(TaildropError::InvalidFileName)?;
let partial = self.partial_path(base);
let _claim = self.claim_in_flight(base)?;
refuse_symlink(&partial)?;
let mut file = if offset == 0 {
match std::fs::OpenOptions::new()
.write(true)
.create_new(true)
.open(&partial)
{
Ok(f) => f,
Err(e) if e.kind() == io::ErrorKind::AlreadyExists => {
return Err(TaildropError::FileExists);
}
Err(e) => return Err(e.into()),
}
} else {
let mut f = std::fs::OpenOptions::new().write(true).open(&partial)?;
let current = f.metadata()?.len();
if offset > current {
return Err(TaildropError::Io(io::Error::new(
io::ErrorKind::InvalidInput,
"taildrop resume offset is past the end of the partial file",
)));
}
f.set_len(offset)?;
f.seek(io::SeekFrom::Start(offset))?;
f
};
let mut copied: u64 = 0;
let mut buf = [0u8; 64 * 1024];
loop {
let n = reader.read(&mut buf).await?;
if n == 0 {
break;
}
file.write_all(&buf[..n])?;
copied += n as u64;
}
let total = match offset.checked_add(copied) {
Some(t) if t == expected_len => t,
_ => {
return Err(TaildropError::Io(io::Error::new(
io::ErrorKind::UnexpectedEof,
format!(
"taildrop body ended early: got {copied} of {expected_len} expected bytes \
at offset {offset}; leaving partial for resume"
),
)));
}
};
let root = self.root.clone();
let base = base.to_string();
tokio::task::spawn_blocking(move || -> io::Result<()> {
file.flush()?;
file.sync_all()?;
drop(file);
let final_name = next_available_name(&root, &base);
let final_path = root.join(&final_name);
if let Err(e) = refuse_symlink(&final_path) {
return Err(match e {
TaildropError::Io(io_err) => io_err,
other => io::Error::other(other.to_string()),
});
}
std::fs::rename(&partial, &final_path)?;
Ok(())
})
.await
.map_err(|join_err| {
TaildropError::Io(io::Error::other(format!(
"taildrop finalize task failed: {join_err}"
)))
})??;
Ok(total)
}
pub fn waiting_files(&self) -> Result<Vec<WaitingFile>, TaildropError> {
let mut out = Vec::new();
let entries = match std::fs::read_dir(&self.root) {
Ok(e) => e,
Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(out),
Err(e) => return Err(e.into()),
};
for entry in entries {
let entry = entry?;
let meta = entry.metadata()?;
if meta.file_type().is_symlink() || !meta.is_file() {
continue;
}
let Ok(name) = entry.file_name().into_string() else {
continue;
};
if name.ends_with(PARTIAL_SUFFIX) || name.ends_with(DELETED_SUFFIX) {
continue;
}
out.push(WaitingFile {
name,
size: meta.len(),
});
}
out.sort_by(|a, b| a.name.cmp(&b.name));
Ok(out)
}
pub fn delete_file(&self, name: &str) -> Result<(), TaildropError> {
let base = validate_base_name(name).ok_or(TaildropError::InvalidFileName)?;
let path = self.root.join(base);
refuse_symlink(&path)?;
std::fs::remove_file(path)?;
Ok(())
}
pub fn open_file(&self, name: &str) -> Result<(std::fs::File, u64), TaildropError> {
let base = validate_base_name(name).ok_or(TaildropError::InvalidFileName)?;
let path = self.root.join(base);
refuse_symlink(&path)?;
let f = std::fs::File::open(path)?;
let size = f.metadata()?.len();
Ok((f, size))
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns a per-process, per-call path under the system temp dir.
    /// The directory itself is not created here.
    fn tmp_root() -> PathBuf {
        static COUNTER: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
        let n = COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        let mut p = std::env::temp_dir();
        p.push(format!("taildrop-test-{}-{n}", std::process::id()));
        p
    }

    #[test]
    fn validate_rejects_traversal_and_reserved() {
        assert_eq!(validate_base_name("photo.jpg"), Some("photo.jpg"));
        assert_eq!(
            validate_base_name("a file with spaces.txt"),
            Some("a file with spaces.txt")
        );
        assert_eq!(validate_base_name(".bashrc"), Some(".bashrc"));
        assert_eq!(validate_base_name("../etc/passwd"), None);
        assert_eq!(validate_base_name("a/b"), None);
        assert_eq!(validate_base_name("a\\b"), None);
        assert_eq!(validate_base_name("/abs"), None);
        assert_eq!(validate_base_name(".."), None);
        assert_eq!(validate_base_name("."), None);
        assert_eq!(validate_base_name("a\0b"), None);
        assert_eq!(validate_base_name("a\nb"), None);
        assert_eq!(validate_base_name("x.partial"), None);
        assert_eq!(validate_base_name("x.deleted"), None);
        assert_eq!(validate_base_name(""), None);
        assert_eq!(validate_base_name(" leading"), None);
        assert_eq!(validate_base_name("trailing "), None);
        assert_eq!(validate_base_name(&"a".repeat(256)), None);
        assert_eq!(
            validate_base_name(&"a".repeat(255)).map(|s| s.len()),
            Some(255)
        );
    }

    #[tokio::test]
    async fn put_file_writes_then_atomically_renames() {
        let root = tmp_root();
        let store = TaildropStore::new(&root).unwrap();
        let data = b"hello taildrop";
        let n = store
            .put_file("greeting.txt", &data[..], 0, data.len() as u64)
            .await
            .unwrap();
        assert_eq!(n, data.len() as u64);
        let body = std::fs::read(root.join("greeting.txt")).unwrap();
        assert_eq!(body, data);
        assert!(!root.join("greeting.txt.partial").exists());
        let wf = store.waiting_files().unwrap();
        assert_eq!(wf.len(), 1);
        assert_eq!(wf[0].name, "greeting.txt");
        assert_eq!(wf[0].size, data.len() as u64);
        std::fs::remove_dir_all(&root).ok();
    }

    #[tokio::test]
    async fn put_file_resumes_from_offset() {
        let root = tmp_root();
        let store = TaildropStore::new(&root).unwrap();
        let prefix = b"the first half ";
        let partial = root.join("resume.txt.partial");
        std::fs::write(&partial, prefix).unwrap();
        let rest = b"and the second half";
        let total = store
            .put_file(
                "resume.txt",
                &rest[..],
                prefix.len() as u64,
                (prefix.len() + rest.len()) as u64,
            )
            .await
            .unwrap();
        assert_eq!(total, (prefix.len() + rest.len()) as u64);
        let body = std::fs::read(root.join("resume.txt")).unwrap();
        let mut expected = prefix.to_vec();
        expected.extend_from_slice(rest);
        assert_eq!(body, expected);
        assert!(!partial.exists());
        std::fs::remove_dir_all(&root).ok();
    }

    #[tokio::test]
    async fn put_file_short_body_leaves_partial_not_truncated_final() {
        let root = tmp_root();
        let store = TaildropStore::new(&root).unwrap();
        let err = store
            .put_file("short.txt", &b"world"[..], 0, 10)
            .await
            .unwrap_err();
        assert!(
            matches!(err, TaildropError::Io(ref e) if e.kind() == io::ErrorKind::UnexpectedEof),
            "a short body must error, got {err:?}"
        );
        assert!(!root.join("short.txt").exists(), "no truncated final file");
        let partial = std::fs::read(root.join("short.txt.partial")).unwrap();
        assert_eq!(
            partial, b"world",
            "partial holds the received prefix for resume"
        );
        assert!(store.waiting_files().unwrap().is_empty());
        std::fs::remove_dir_all(&root).ok();
    }

    #[tokio::test]
    async fn put_file_overlong_body_is_rejected() {
        let root = tmp_root();
        let store = TaildropStore::new(&root).unwrap();
        let err = store
            .put_file("long.txt", &b"too many bytes"[..], 0, 3)
            .await
            .unwrap_err();
        assert!(
            matches!(err, TaildropError::Io(_)),
            "a body longer than expected_len must error, got {err:?}"
        );
        assert!(!root.join("long.txt").exists(), "no final file for a bad body");
        std::fs::remove_dir_all(&root).ok();
    }

    #[tokio::test]
    async fn put_file_resume_offset_past_end_is_rejected() {
        let root = tmp_root();
        let store = TaildropStore::new(&root).unwrap();
        std::fs::write(root.join("sparse.txt.partial"), b"abc").unwrap();
        let err = store
            .put_file("sparse.txt", &b"xyz"[..], 99, 102)
            .await
            .unwrap_err();
        assert!(
            matches!(err, TaildropError::Io(ref e) if e.kind() == io::ErrorKind::InvalidInput),
            "offset past end must be rejected, got {err:?}"
        );
        assert_eq!(
            std::fs::read(root.join("sparse.txt.partial")).unwrap(),
            b"abc"
        );
        assert!(!root.join("sparse.txt").exists());
        std::fs::remove_dir_all(&root).ok();
    }

    #[tokio::test]
    async fn put_file_resume_truncates_stale_tail() {
        let root = tmp_root();
        let store = TaildropStore::new(&root).unwrap();
        std::fs::write(root.join("retry.txt.partial"), b"KEEPme-STALE-TAILxxx").unwrap();
        let total = store
            .put_file("retry.txt", &b"NEW"[..], 5, 8)
            .await
            .unwrap();
        assert_eq!(total, 8);
        let body = std::fs::read(root.join("retry.txt")).unwrap();
        assert_eq!(
            body, b"KEEPmNEW",
            "bytes past offset 5 truncated, then NEW appended"
        );
        std::fs::remove_dir_all(&root).ok();
    }

    #[tokio::test]
    async fn put_file_conflict_picks_non_clobbering_name() {
        let root = tmp_root();
        let store = TaildropStore::new(&root).unwrap();
        store
            .put_file("dup.txt", &b"first"[..], 0, 5)
            .await
            .unwrap();
        store
            .put_file("dup.txt", &b"second"[..], 0, 6)
            .await
            .unwrap();
        store
            .put_file("dup.txt", &b"third"[..], 0, 5)
            .await
            .unwrap();
        assert!(root.join("dup.txt").exists());
        assert!(root.join("dup (1).txt").exists());
        assert!(root.join("dup (2).txt").exists());
        let wf = store.waiting_files().unwrap();
        assert_eq!(wf.len(), 3);
        std::fs::remove_dir_all(&root).ok();
    }

    #[tokio::test]
    async fn put_file_in_progress_partial_is_conflict() {
        let root = tmp_root();
        let store = TaildropStore::new(&root).unwrap();
        std::fs::write(root.join("busy.txt.partial"), b"partial").unwrap();
        let err = store
            .put_file("busy.txt", &b"x"[..], 0, 1)
            .await
            .unwrap_err();
        assert!(matches!(err, TaildropError::FileExists));
        std::fs::remove_dir_all(&root).ok();
    }

    #[tokio::test]
    async fn put_file_rejects_bad_name_before_any_io() {
        let root = tmp_root();
        let store = TaildropStore::new(&root).unwrap();
        let err = store
            .put_file("../escape", &b"x"[..], 0, 1)
            .await
            .unwrap_err();
        assert!(matches!(err, TaildropError::InvalidFileName));
        assert!(store.waiting_files().unwrap().is_empty());
        std::fs::remove_dir_all(&root).ok();
    }

    #[tokio::test]
    async fn delete_and_open_roundtrip() {
        let root = tmp_root();
        let store = TaildropStore::new(&root).unwrap();
        store.put_file("doc.bin", &b"abc"[..], 0, 3).await.unwrap();
        let (_f, size) = store.open_file("doc.bin").unwrap();
        assert_eq!(size, 3);
        store.delete_file("doc.bin").unwrap();
        assert!(store.waiting_files().unwrap().is_empty());
        assert!(matches!(
            store.delete_file("../../etc/passwd"),
            Err(TaildropError::InvalidFileName)
        ));
        std::fs::remove_dir_all(&root).ok();
    }

    #[tokio::test]
    async fn concurrent_resume_for_same_name_is_serialized() {
        let root = tmp_root();
        let store = Arc::new(TaildropStore::new(&root).unwrap());
        let (release_tx, release_rx) = tokio::sync::oneshot::channel::<()>();

        /// Yields one byte, then blocks until `release` fires, then reports EOF.
        /// This keeps transfer #1 in flight while the test races a second one.
        struct BlockingReader {
            sent: bool,
            release: Option<tokio::sync::oneshot::Receiver<()>>,
        }
        impl AsyncRead for BlockingReader {
            fn poll_read(
                mut self: std::pin::Pin<&mut Self>,
                cx: &mut std::task::Context<'_>,
                buf: &mut tokio::io::ReadBuf<'_>,
            ) -> std::task::Poll<io::Result<()>> {
                if !self.sent {
                    buf.put_slice(b"x");
                    self.sent = true;
                    return std::task::Poll::Ready(Ok(()));
                }
                match self.release.as_mut() {
                    // Called through the trait path so no `Future` import is
                    // needed (`Future` is only in the prelude from edition 2024).
                    Some(rx) => match std::future::Future::poll(std::pin::Pin::new(rx), cx) {
                        std::task::Poll::Ready(_) => {
                            self.release = None;
                            std::task::Poll::Ready(Ok(()))
                        }
                        std::task::Poll::Pending => std::task::Poll::Pending,
                    },
                    None => std::task::Poll::Ready(Ok(())),
                }
            }
        }

        let s1 = store.clone();
        let t1 = tokio::spawn(async move {
            let reader = BlockingReader {
                sent: false,
                release: Some(release_rx),
            };
            s1.put_file("race.bin", reader, 0, 1).await
        });
        let partial = root.join("race.bin.partial");
        for _ in 0..200 {
            if partial.exists() {
                break;
            }
            tokio::time::sleep(std::time::Duration::from_millis(5)).await;
        }
        assert!(
            partial.exists(),
            "transfer #1 should have created the partial"
        );
        let err = store
            .put_file("race.bin", &b"yy"[..], 1, 3)
            .await
            .unwrap_err();
        assert!(
            matches!(err, TaildropError::FileExists),
            "a concurrent transfer for an in-flight name must be rejected, got {err:?}"
        );
        release_tx.send(()).unwrap();
        t1.await.unwrap().unwrap();
        store.put_file("race.bin", &b"z"[..], 0, 1).await.unwrap();
        std::fs::remove_dir_all(&root).ok();
    }

    #[cfg(unix)]
    #[tokio::test]
    async fn symlink_in_store_root_is_refused_not_followed() {
        use std::os::unix::fs::symlink;
        let root = tmp_root();
        let store = TaildropStore::new(&root).unwrap();
        let outside = tmp_root();
        std::fs::create_dir_all(&outside).unwrap();
        let secret = outside.join("secret");
        std::fs::write(&secret, b"TOP SECRET").unwrap();
        let link = root.join("link.txt");
        symlink(&secret, &link).unwrap();
        let open_err = store.open_file("link.txt").unwrap_err();
        assert!(
            matches!(open_err, TaildropError::Io(ref e) if e.kind() == io::ErrorKind::InvalidInput),
            "open_file must refuse a symlink, got {open_err:?}"
        );
        let del_err = store.delete_file("link.txt").unwrap_err();
        assert!(
            matches!(del_err, TaildropError::Io(ref e) if e.kind() == io::ErrorKind::InvalidInput),
            "delete_file must refuse a symlink, got {del_err:?}"
        );
        assert!(
            secret.exists(),
            "the symlink target must NOT have been deleted"
        );
        assert_eq!(std::fs::read(&secret).unwrap(), b"TOP SECRET");
        assert!(
            store.waiting_files().unwrap().is_empty(),
            "a symlink in the store root must not be listed as a waiting file"
        );
        let link_partial = root.join("evil.bin.partial");
        symlink(&secret, &link_partial).unwrap();
        let put_err = store
            .put_file("evil.bin", &b"data"[..], 0, 4)
            .await
            .unwrap_err();
        assert!(
            matches!(put_err, TaildropError::Io(ref e) if e.kind() == io::ErrorKind::InvalidInput),
            "put_file must refuse a symlinked partial, got {put_err:?}"
        );
        assert_eq!(
            std::fs::read(&secret).unwrap(),
            b"TOP SECRET",
            "the symlink target must NOT have been written through"
        );
        std::fs::remove_dir_all(&root).ok();
        std::fs::remove_dir_all(&outside).ok();
    }

    #[test]
    fn next_available_name_inserts_before_extension() {
        let root = tmp_root();
        std::fs::create_dir_all(&root).unwrap();
        assert_eq!(next_available_name(&root, "a.txt"), "a.txt");
        std::fs::write(root.join("a.txt"), b"x").unwrap();
        assert_eq!(next_available_name(&root, "a.txt"), "a (1).txt");
        std::fs::write(root.join("a (1).txt"), b"x").unwrap();
        assert_eq!(next_available_name(&root, "a.txt"), "a (2).txt");
        std::fs::write(root.join(".env"), b"x").unwrap();
        assert_eq!(next_available_name(&root, ".env"), ".env (1)");
        std::fs::remove_dir_all(&root).ok();
    }
}