use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use bytes::Bytes;
use futures::StreamExt as _;
use http::StatusCode;
use sha2::{Digest, Sha256};
use super::blob::{Blob, BlobMeta};
use super::{BlobFuture, BlobStore, BlobStoreError, ByteStream, validate_key};
#[derive(Clone)]
pub struct SigningKey(Arc<Vec<u8>>);
impl std::fmt::Debug for SigningKey {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SigningKey")
.field("len", &self.0.len())
.finish()
}
}
impl SigningKey {
#[must_use]
pub fn new(bytes: Vec<u8>) -> Self {
Self(Arc::new(bytes))
}
#[must_use]
pub fn random() -> Self {
let mut bytes = vec![0u8; 32];
let a = uuid::Uuid::new_v4();
let b = uuid::Uuid::new_v4();
bytes[..16].copy_from_slice(a.as_bytes());
bytes[16..].copy_from_slice(b.as_bytes());
Self::new(bytes)
}
fn as_bytes(&self) -> &[u8] {
&self.0
}
}
#[derive(Clone, Debug)]
pub struct LocalBlobStore {
inner: Arc<LocalInner>,
}
#[derive(Debug)]
struct LocalInner {
provider_id: String,
root: PathBuf,
canonical_root: PathBuf,
mount_path: String,
default_expiry: Duration,
signing_key: SigningKey,
previous_signing_keys: Vec<SigningKey>,
}
impl LocalBlobStore {
pub fn new(
provider_id: impl Into<String>,
root: impl Into<PathBuf>,
mount_path: impl Into<String>,
default_expiry: Duration,
signing_key: SigningKey,
previous_signing_keys: Vec<SigningKey>,
) -> Result<Self, BlobStoreError> {
let mount_path = mount_path.into();
if !mount_path.starts_with('/') {
return Err(BlobStoreError::InvalidInput(format!(
"storage.local.mount_path must start with '/' (got {mount_path:?})"
)));
}
let root = root.into();
std::fs::create_dir_all(&root).map_err(BlobStoreError::io)?;
let canonical_root = std::fs::canonicalize(&root).map_err(BlobStoreError::io)?;
Ok(Self {
inner: Arc::new(LocalInner {
provider_id: provider_id.into(),
root,
canonical_root,
mount_path,
default_expiry,
signing_key,
previous_signing_keys,
}),
})
}
#[must_use]
pub fn mount_path(&self) -> &str {
&self.inner.mount_path
}
#[must_use]
pub fn signing_key(&self) -> SigningKey {
self.inner.signing_key.clone()
}
#[must_use]
pub fn root(&self) -> &Path {
&self.inner.root
}
async fn safe_path_for_key(&self, key: &str) -> Result<PathBuf, BlobStoreError> {
validate_key(key)?;
let target = self.inner.root.join(key);
let mut probe = target.clone();
let canon_existing = loop {
match tokio::fs::canonicalize(&probe).await {
Ok(p) => break p,
Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
if !probe.pop() {
return Err(BlobStoreError::io(
"storage root vanished while resolving blob key",
));
}
}
Err(err) => return Err(BlobStoreError::io(err)),
}
};
if !canon_existing.starts_with(&self.inner.canonical_root) {
return Err(BlobStoreError::PermissionDenied(
"blob key resolves outside storage root".into(),
));
}
Ok(target)
}
pub(crate) async fn get_with_meta(
&self,
key: &str,
) -> Result<(Bytes, Option<StoredBlobMeta>), BlobStoreError> {
let path = self.safe_path_for_key(key).await?;
let bytes = match tokio::fs::read(&path).await {
Ok(bytes) => Bytes::from(bytes),
Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
return Err(BlobStoreError::NotFound(key.to_owned()));
}
Err(err) => return Err(BlobStoreError::io(err)),
};
let meta = read_meta_sidecar(&path).await;
Ok((bytes, meta))
}
}
impl BlobStore for LocalBlobStore {
fn provider_id(&self) -> &str {
&self.inner.provider_id
}
fn put<'a>(
&'a self,
key: &'a str,
content_type: &'a str,
bytes: Bytes,
) -> BlobFuture<'a, Blob> {
Box::pin(async move {
let path = self.safe_path_for_key(key).await?;
if let Some(parent) = path.parent() {
tokio::fs::create_dir_all(parent)
.await
.map_err(BlobStoreError::io)?;
}
let etag = sha256_hex(&bytes);
let tmp_path = temp_sibling_path(&path);
if let Err(err) = tokio::fs::write(&tmp_path, &bytes).await {
let _ = tokio::fs::remove_file(&tmp_path).await;
return Err(BlobStoreError::io(err));
}
if let Err(err) = atomic_replace(&tmp_path, &path).await {
let _ = tokio::fs::remove_file(&tmp_path).await;
return Err(BlobStoreError::io(err));
}
if write_meta_sidecar(
&path,
&StoredBlobMeta {
content_type: content_type.to_owned(),
etag: Some(etag.clone()),
},
)
.await
.is_err()
{
drop_stale_sidecar(&path).await;
}
Ok(Blob {
provider_id: self.inner.provider_id.clone(),
key: key.to_owned(),
content_type: content_type.to_owned(),
byte_size: bytes.len() as u64,
etag: Some(etag),
})
})
}
fn put_stream<'a>(
&'a self,
key: &'a str,
content_type: &'a str,
mut data: ByteStream<'a>,
) -> BlobFuture<'a, Blob> {
Box::pin(async move {
use tokio::io::AsyncWriteExt as _;
let path = self.safe_path_for_key(key).await?;
if let Some(parent) = path.parent() {
tokio::fs::create_dir_all(parent)
.await
.map_err(BlobStoreError::io)?;
}
let tmp_path = temp_sibling_path(&path);
let result = async {
let mut file = tokio::fs::File::create(&tmp_path)
.await
.map_err(BlobStoreError::io)?;
let mut hasher = Sha256::new();
let mut byte_size: u64 = 0;
while let Some(chunk) = data.next().await {
let chunk = chunk?;
hasher.update(&chunk);
byte_size = byte_size.saturating_add(chunk.len() as u64);
file.write_all(&chunk).await.map_err(BlobStoreError::io)?;
}
file.flush().await.map_err(BlobStoreError::io)?;
Ok::<(u64, String), BlobStoreError>((byte_size, hex(hasher.finalize())))
}
.await;
match result {
Ok((byte_size, etag)) => {
if let Err(err) = atomic_replace(&tmp_path, &path).await {
let _ = tokio::fs::remove_file(&tmp_path).await;
return Err(BlobStoreError::io(err));
}
if write_meta_sidecar(
&path,
&StoredBlobMeta {
content_type: content_type.to_owned(),
etag: Some(etag.clone()),
},
)
.await
.is_err()
{
drop_stale_sidecar(&path).await;
}
Ok(Blob {
provider_id: self.inner.provider_id.clone(),
key: key.to_owned(),
content_type: content_type.to_owned(),
byte_size,
etag: Some(etag),
})
}
Err(err) => {
let _ = tokio::fs::remove_file(&tmp_path).await;
Err(err)
}
}
})
}
fn get<'a>(&'a self, key: &'a str) -> BlobFuture<'a, Bytes> {
Box::pin(async move {
let path = self.safe_path_for_key(key).await?;
match tokio::fs::read(&path).await {
Ok(bytes) => Ok(Bytes::from(bytes)),
Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
Err(BlobStoreError::NotFound(key.to_owned()))
}
Err(err) => Err(BlobStoreError::io(err)),
}
})
}
fn delete<'a>(&'a self, key: &'a str) -> BlobFuture<'a, ()> {
Box::pin(async move {
let path = self.safe_path_for_key(key).await?;
match tokio::fs::remove_file(&path).await {
Ok(()) => {}
Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
Err(err) => return Err(BlobStoreError::io(err)),
}
let _ = tokio::fs::remove_file(meta_sidecar_path(&path)).await;
Ok(())
})
}
fn head<'a>(&'a self, key: &'a str) -> BlobFuture<'a, Option<BlobMeta>> {
Box::pin(async move {
let path = self.safe_path_for_key(key).await?;
match tokio::fs::metadata(&path).await {
Ok(fs_meta) => {
let sidecar = read_meta_sidecar(&path).await;
Ok(Some(BlobMeta {
key: key.to_owned(),
content_type: sidecar.as_ref().map_or_else(
|| "application/octet-stream".to_owned(),
|m| m.content_type.clone(),
),
byte_size: fs_meta.len(),
etag: sidecar.and_then(|m| m.etag),
}))
}
Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(None),
Err(err) => Err(BlobStoreError::io(err)),
}
})
}
fn presigned_url<'a>(&'a self, key: &'a str, expires_in: Duration) -> BlobFuture<'a, String> {
Box::pin(async move {
validate_key(key)?;
let expires_in = if expires_in.is_zero() {
self.inner.default_expiry
} else {
expires_in
};
let exp_at = SystemTime::now()
.checked_add(expires_in)
.unwrap_or(UNIX_EPOCH)
.duration_since(UNIX_EPOCH)
.map_or(0, |d| d.as_secs());
let signature = sign(self.inner.signing_key.as_bytes(), key, exp_at);
let encoded_key = encode_key_path(key);
let url = format!(
"{base}/{encoded_key}?exp={exp_at}&sig={signature}",
base = self.inner.mount_path.trim_end_matches('/'),
);
Ok(url)
})
}
}
#[must_use]
pub fn sign(key_bytes: &[u8], blob_key: &str, expires_at: u64) -> String {
use hmac::{Hmac, Mac};
let mut mac =
<Hmac<Sha256> as Mac>::new_from_slice(key_bytes).expect("HMAC accepts any key length");
mac.update(blob_key.as_bytes());
mac.update(b":");
mac.update(expires_at.to_string().as_bytes());
hex(mac.finalize().into_bytes())
}
pub fn verify(
signing_key: &[u8],
blob_key: &str,
expires_at: u64,
signature: &str,
) -> Result<(), BlobStoreError> {
let expected = sign(signing_key, blob_key, expires_at);
if !constant_time_eq(expected.as_bytes(), signature.as_bytes()) {
return Err(BlobStoreError::Signature("signature mismatch".into()));
}
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map_or(0, |d| d.as_secs());
if expires_at < now {
return Err(BlobStoreError::Signature("signed url expired".into()));
}
Ok(())
}
fn sha256_hex(bytes: &[u8]) -> String {
let mut hasher = Sha256::new();
hasher.update(bytes);
hex(hasher.finalize())
}
async fn atomic_replace(src: &std::path::Path, dst: &std::path::Path) -> std::io::Result<()> {
match tokio::fs::rename(src, dst).await {
Ok(()) => Ok(()),
Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => {
let backup = backup_sibling_path(dst);
tokio::fs::rename(dst, &backup).await?;
match tokio::fs::rename(src, dst).await {
Ok(()) => {
let _ = tokio::fs::remove_file(&backup).await;
Ok(())
}
Err(rename_err) => {
let _ = tokio::fs::rename(&backup, dst).await;
Err(rename_err)
}
}
}
Err(err) => Err(err),
}
}
fn backup_sibling_path(path: &std::path::Path) -> std::path::PathBuf {
let id = uuid::Uuid::new_v4().simple().to_string();
let mut name = path.file_name().map_or_else(
|| std::ffi::OsString::from("blob"),
std::ffi::OsStr::to_owned,
);
name.push(".bak.");
name.push(&id);
path.with_file_name(name)
}
fn temp_sibling_path(path: &std::path::Path) -> std::path::PathBuf {
let id = uuid::Uuid::new_v4().simple().to_string();
let mut name = path.file_name().map_or_else(
|| std::ffi::OsString::from("blob"),
std::ffi::OsStr::to_owned,
);
name.push(".tmp.");
name.push(&id);
path.with_file_name(name)
}
#[derive(serde::Serialize, serde::Deserialize)]
pub(crate) struct StoredBlobMeta {
pub content_type: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub etag: Option<String>,
}
fn meta_sidecar_path(path: &std::path::Path) -> std::path::PathBuf {
let mut name = path.file_name().map_or_else(
|| std::ffi::OsString::from("blob"),
std::ffi::OsStr::to_owned,
);
name.push(".meta");
path.with_file_name(name)
}
#[allow(clippy::cognitive_complexity)]
async fn write_meta_sidecar(blob_path: &std::path::Path, meta: &StoredBlobMeta) -> Result<(), ()> {
use tokio::io::AsyncWriteExt as _;
let path = meta_sidecar_path(blob_path);
let bytes = match serde_json::to_vec(meta) {
Ok(b) => b,
Err(err) => {
tracing::warn!(error = %err, "failed to serialize blob metadata sidecar");
return Err(());
}
};
let tmp = temp_sibling_path(&path);
let mut file = match tokio::fs::OpenOptions::new()
.write(true)
.create_new(true)
.open(&tmp)
.await
{
Ok(f) => f,
Err(err) => {
tracing::warn!(
error = %err,
tmp = %tmp.display(),
"failed to create blob metadata sidecar temp file"
);
return Err(());
}
};
if let Err(err) = file.write_all(&bytes).await {
let _ = tokio::fs::remove_file(&tmp).await;
tracing::warn!(error = %err, "failed to write blob metadata sidecar bytes");
return Err(());
}
if let Err(err) = file.flush().await {
let _ = tokio::fs::remove_file(&tmp).await;
tracing::warn!(error = %err, "failed to flush blob metadata sidecar");
return Err(());
}
drop(file);
if let Err(err) = atomic_replace(&tmp, &path).await {
let _ = tokio::fs::remove_file(&tmp).await;
tracing::warn!(
error = %err,
sidecar = %path.display(),
"failed to commit blob metadata sidecar"
);
return Err(());
}
Ok(())
}
async fn drop_stale_sidecar(blob_path: &std::path::Path) {
let path = meta_sidecar_path(blob_path);
if let Err(err) = tokio::fs::remove_file(&path).await
&& err.kind() != std::io::ErrorKind::NotFound
{
tracing::warn!(
error = %err,
sidecar = %path.display(),
"failed to clear stale blob metadata sidecar after sidecar-write failure"
);
}
}
async fn read_meta_sidecar(blob_path: &std::path::Path) -> Option<StoredBlobMeta> {
let path = meta_sidecar_path(blob_path);
let bytes = tokio::fs::read(&path).await.ok()?;
serde_json::from_slice(&bytes).ok()
}
fn encode_key_path(key: &str) -> String {
use percent_encoding::{AsciiSet, CONTROLS, utf8_percent_encode};
const PATH_SEGMENT: &AsciiSet = &CONTROLS
.add(b' ')
.add(b'"')
.add(b'#')
.add(b'%')
.add(b'/')
.add(b'<')
.add(b'>')
.add(b'?')
.add(b'`')
.add(b'{')
.add(b'}')
.add(b'\\');
let mut result = String::with_capacity(key.len() + 16);
let mut first = true;
for segment in key.split('/') {
if !first {
result.push('/');
}
first = false;
result.extend(utf8_percent_encode(segment, PATH_SEGMENT));
}
result
}
fn hex<B: AsRef<[u8]>>(bytes: B) -> String {
let mut s = String::with_capacity(bytes.as_ref().len() * 2);
for b in bytes.as_ref() {
use std::fmt::Write as _;
let _ = write!(s, "{b:02x}");
}
s
}
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
use subtle::ConstantTimeEq;
a.ct_eq(b).into()
}
pub(crate) fn verify_with_rotation(
current: &SigningKey,
previous: &[SigningKey],
blob_key: &str,
expires_at: u64,
signature: &str,
) -> Result<(), BlobStoreError> {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map_or(0, |d| d.as_secs());
if expires_at < now {
return Err(BlobStoreError::Signature("signed url expired".into()));
}
let expected_current = sign(current.as_bytes(), blob_key, expires_at);
if constant_time_eq(expected_current.as_bytes(), signature.as_bytes()) {
return Ok(());
}
for prev in previous {
let expected = sign(prev.as_bytes(), blob_key, expires_at);
if constant_time_eq(expected.as_bytes(), signature.as_bytes()) {
return Ok(());
}
}
Err(BlobStoreError::Signature("signature mismatch".into()))
}
pub fn serve_router(store: &LocalBlobStore) -> axum::Router<crate::AppState> {
use axum::extract::{Path, Query};
use axum::response::IntoResponse;
#[derive(Debug, serde::Deserialize)]
struct SignedQuery {
exp: u64,
sig: String,
}
let store_for_route = store.clone();
let mount = format!("{}/{{*key}}", store.mount_path().trim_end_matches('/'));
let handler = move |Path(blob_key): Path<String>, Query(q): Query<SignedQuery>| {
let store = store_for_route.clone();
async move {
if let Err(err) = verify_with_rotation(
&store.inner.signing_key,
&store.inner.previous_signing_keys,
&blob_key,
q.exp,
&q.sig,
) {
return (StatusCode::FORBIDDEN, err.to_string()).into_response();
}
match store.get_with_meta(&blob_key).await {
Ok((bytes, meta)) => {
let content_type = meta
.map_or_else(|| "application/octet-stream".to_owned(), |m| m.content_type);
([(http::header::CONTENT_TYPE, content_type)], bytes).into_response()
}
Err(BlobStoreError::NotFound(_)) => {
(StatusCode::NOT_FOUND, "not found").into_response()
}
Err(err) => err.into_autumn_error().into_response(),
}
}
};
axum::Router::new().route(&mount, axum::routing::get(handler))
}
#[cfg(test)]
mod tests {
use super::*;
use bytes::Bytes;
use futures::stream;
fn temp_root() -> tempfile::TempDir {
tempfile::tempdir().unwrap()
}
fn store(root: &Path) -> LocalBlobStore {
LocalBlobStore::new(
"test",
root.to_path_buf(),
"/_blobs",
Duration::from_secs(60),
SigningKey::new(b"test-key".to_vec()),
vec![],
)
.unwrap()
}
#[tokio::test]
async fn put_get_round_trip() {
let dir = temp_root();
let s = store(dir.path());
let blob = s
.put("a/b.png", "image/png", Bytes::from_static(b"abc"))
.await
.unwrap();
assert_eq!(blob.byte_size, 3);
assert!(blob.etag.is_some());
let bytes = s.get("a/b.png").await.unwrap();
assert_eq!(&bytes[..], b"abc");
}
#[tokio::test]
async fn put_stream_round_trip() {
let dir = temp_root();
let s = store(dir.path());
let chunks: Vec<Result<Bytes, BlobStoreError>> = vec![
Ok(Bytes::from_static(b"hello, ")),
Ok(Bytes::from_static(b"world")),
];
let stream: ByteStream<'static> = Box::pin(stream::iter(chunks));
let blob = s
.put_stream("greet.txt", "text/plain", stream)
.await
.unwrap();
assert_eq!(blob.byte_size, 12);
let bytes = s.get("greet.txt").await.unwrap();
assert_eq!(&bytes[..], b"hello, world");
}
#[tokio::test]
async fn get_missing_returns_not_found() {
let dir = temp_root();
let s = store(dir.path());
let err = s.get("missing.txt").await.unwrap_err();
assert!(matches!(err, BlobStoreError::NotFound(_)));
}
#[tokio::test]
async fn delete_idempotent() {
let dir = temp_root();
let s = store(dir.path());
s.delete("nope").await.unwrap();
let _ = s
.put("k.txt", "text/plain", Bytes::from_static(b"x"))
.await
.unwrap();
s.delete("k.txt").await.unwrap();
assert!(matches!(
s.get("k.txt").await.unwrap_err(),
BlobStoreError::NotFound(_)
));
}
#[tokio::test]
async fn rejects_traversal_keys() {
let dir = temp_root();
let s = store(dir.path());
let err = s
.put("../escape.txt", "text/plain", Bytes::from_static(b"x"))
.await
.unwrap_err();
assert!(matches!(err, BlobStoreError::InvalidInput(_)));
}
#[test]
fn new_rejects_mount_path_without_leading_slash() {
let dir = temp_root();
let err = LocalBlobStore::new(
"test",
dir.path().to_path_buf(),
"_blobs", Duration::from_secs(60),
SigningKey::new(b"k".to_vec()),
vec![],
)
.unwrap_err();
assert!(matches!(err, BlobStoreError::InvalidInput(_)));
}
#[cfg(unix)]
#[tokio::test]
async fn rejects_keys_traversing_root_escaping_symlinks() {
use std::os::unix::fs::symlink;
let outside = tempfile::tempdir().unwrap();
std::fs::write(outside.path().join("secret"), b"do not read").unwrap();
let dir = temp_root();
symlink(outside.path(), dir.path().join("escape")).unwrap();
let s = store(dir.path());
let err = s.get("escape/secret").await.unwrap_err();
assert!(
matches!(err, BlobStoreError::PermissionDenied(_)),
"expected PermissionDenied, got {err:?}"
);
let err = s
.put(
"escape/leaked.txt",
"text/plain",
Bytes::from_static(b"oops"),
)
.await
.unwrap_err();
assert!(matches!(err, BlobStoreError::PermissionDenied(_)));
assert!(!outside.path().join("leaked.txt").exists());
}
#[cfg(unix)]
#[tokio::test]
async fn sidecar_write_does_not_follow_hostile_symlink() {
use std::os::unix::fs::symlink;
let outside = tempfile::tempdir().unwrap();
let target = outside.path().join("untouchable");
std::fs::write(&target, b"original-contents").unwrap();
let dir = temp_root();
let s = store(dir.path());
let sidecar_path = dir.path().join("victim.bin.meta");
symlink(&target, &sidecar_path).unwrap();
s.put("victim.bin", "image/png", Bytes::from_static(b"pixels"))
.await
.unwrap();
assert_eq!(std::fs::read(&target).unwrap(), b"original-contents");
}
#[test]
fn signature_round_trip() {
let key = b"k";
let sig = sign(key, "blob/1.png", 99);
verify(key, "blob/1.png", u64::MAX / 2, &sig).unwrap_err();
let exp = SystemTime::now()
.checked_add(Duration::from_secs(60))
.unwrap()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs();
let sig = sign(key, "blob/1.png", exp);
verify(key, "blob/1.png", exp, &sig).unwrap();
}
#[test]
fn signature_rejects_wrong_key() {
let exp = SystemTime::now()
.checked_add(Duration::from_secs(60))
.unwrap()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs();
let sig = sign(b"alpha", "blob/1.png", exp);
let err = verify(b"beta", "blob/1.png", exp, &sig).unwrap_err();
assert!(matches!(err, BlobStoreError::Signature(_)));
}
#[test]
fn signature_rejects_expired() {
let exp = 1; let sig = sign(b"k", "blob/1.png", exp);
let err = verify(b"k", "blob/1.png", exp, &sig).unwrap_err();
assert!(matches!(err, BlobStoreError::Signature(_)));
}
#[tokio::test]
async fn presigned_url_includes_signature_and_exp() {
let dir = temp_root();
let s = store(dir.path());
let url = s
.presigned_url("a/b.png", Duration::from_secs(120))
.await
.unwrap();
assert!(url.starts_with("/_blobs/a/b.png?exp="));
assert!(url.contains("&sig="));
}
#[tokio::test]
async fn presigned_url_percent_encodes_reserved_chars() {
let dir = temp_root();
let s = store(dir.path());
let url = s
.presigned_url("user 1/note#1.png", Duration::from_secs(120))
.await
.unwrap();
assert!(
url.starts_with("/_blobs/user%201/note%231.png?exp="),
"unexpected URL: {url}"
);
assert!(url.contains("&sig="));
}
#[tokio::test]
async fn put_stream_cleans_up_partial_file_on_error() {
use futures::stream;
let dir = temp_root();
let s = store(dir.path());
let chunks: Vec<Result<Bytes, BlobStoreError>> = vec![
Ok(Bytes::from_static(b"first")),
Err(BlobStoreError::Backend("boom".into())),
];
let stream: ByteStream<'static> = Box::pin(stream::iter(chunks));
let err = s
.put_stream("interrupted.bin", "application/octet-stream", stream)
.await
.unwrap_err();
assert!(matches!(err, BlobStoreError::Backend(_)));
let path = dir.path().join("interrupted.bin");
assert!(!path.exists(), "partial blob was not cleaned up");
assert!(matches!(
s.get("interrupted.bin").await.unwrap_err(),
BlobStoreError::NotFound(_)
));
}
#[test]
fn encode_key_path_passes_segments_separately() {
assert_eq!(encode_key_path("foo"), "foo");
assert_eq!(encode_key_path("a/b/c"), "a/b/c");
assert_eq!(encode_key_path("a b/c?d"), "a%20b/c%3Fd");
assert_eq!(encode_key_path("hash#frag/q"), "hash%23frag/q");
assert_eq!(encode_key_path(""), "");
assert_eq!(encode_key_path("a/"), "a/");
assert_eq!(encode_key_path("/b"), "/b");
assert_eq!(encode_key_path("🚀/path"), "%F0%9F%9A%80/path");
}
#[tokio::test]
async fn put_replaces_atomically() {
let dir = temp_root();
let s = store(dir.path());
s.put(
"k.bin",
"application/octet-stream",
Bytes::from_static(b"first"),
)
.await
.unwrap();
s.put(
"k.bin",
"application/octet-stream",
Bytes::from_static(b"second"),
)
.await
.unwrap();
assert_eq!(&s.get("k.bin").await.unwrap()[..], b"second");
let entries: Vec<_> = std::fs::read_dir(dir.path())
.unwrap()
.filter_map(Result::ok)
.map(|e| e.file_name().to_string_lossy().into_owned())
.collect();
assert!(
!entries.iter().any(|n| n.contains(".tmp.")),
"temp file leaked: {entries:?}"
);
}
#[tokio::test]
async fn atomic_replace_overwrites_existing_destination() {
let dir = temp_root();
let dst = dir.path().join("target.bin");
tokio::fs::write(&dst, b"old").await.unwrap();
let tmp = dir.path().join("staging.tmp");
tokio::fs::write(&tmp, b"new").await.unwrap();
atomic_replace(&tmp, &dst).await.unwrap();
assert_eq!(tokio::fs::read(&dst).await.unwrap(), b"new");
assert!(!tmp.exists(), "temp file should be consumed by rename");
}
#[tokio::test]
async fn atomic_replace_creates_new_destination() {
let dir = temp_root();
let dst = dir.path().join("fresh.bin");
let tmp = dir.path().join("staging.tmp");
tokio::fs::write(&tmp, b"hello").await.unwrap();
atomic_replace(&tmp, &dst).await.unwrap();
assert_eq!(tokio::fs::read(&dst).await.unwrap(), b"hello");
}
#[tokio::test]
async fn atomic_replace_propagates_io_errors() {
let dir = temp_root();
let tmp = dir.path().join("missing.tmp"); let dst = dir.path().join("target.bin");
let err = atomic_replace(&tmp, &dst).await.unwrap_err();
assert!(
matches!(err.kind(), std::io::ErrorKind::NotFound),
"expected NotFound, got {:?}",
err.kind()
);
}
#[test]
fn temp_sibling_path_keeps_parent_directory() {
let original = std::path::Path::new("/var/lib/blobs/avatars/me.png");
let tmp = temp_sibling_path(original);
assert_eq!(tmp.parent(), original.parent());
let name = tmp.file_name().unwrap().to_string_lossy();
assert!(name.starts_with("me.png.tmp."));
}
#[test]
fn backup_sibling_path_keeps_parent_directory() {
let original = std::path::Path::new("/var/lib/blobs/avatars/me.png");
let backup = backup_sibling_path(original);
assert_eq!(backup.parent(), original.parent());
let name = backup.file_name().unwrap().to_string_lossy();
assert!(name.starts_with("me.png.bak."));
}
#[test]
fn meta_sidecar_path_appends_meta_suffix() {
let blob = std::path::Path::new("/var/lib/blobs/avatars/me.png");
let sidecar = meta_sidecar_path(blob);
assert_eq!(sidecar.parent(), blob.parent());
assert_eq!(sidecar.file_name().unwrap(), "me.png.meta");
}
#[tokio::test]
async fn put_persists_content_type_for_head_and_serve() {
let dir = temp_root();
let s = store(dir.path());
let blob = s
.put("a/b.png", "image/png", Bytes::from_static(b"abc"))
.await
.unwrap();
assert_eq!(blob.content_type, "image/png");
let meta = s.head("a/b.png").await.unwrap().expect("blob exists");
assert_eq!(meta.content_type, "image/png");
assert!(meta.etag.is_some(), "etag should round-trip via sidecar");
}
#[tokio::test]
async fn delete_cleans_up_meta_sidecar() {
let dir = temp_root();
let s = store(dir.path());
s.put("k.png", "image/png", Bytes::from_static(b"x"))
.await
.unwrap();
let resolved = s.safe_path_for_key("k.png").await.unwrap();
assert!(meta_sidecar_path(&resolved).exists());
s.delete("k.png").await.unwrap();
assert!(!meta_sidecar_path(&resolved).exists());
}
#[tokio::test]
async fn delete_keeps_sidecar_when_blob_remove_fails() {
let dir = temp_root();
let s = store(dir.path());
let blob_path = dir.path().join("pinned.bin");
tokio::fs::create_dir(&blob_path).await.unwrap();
let sidecar = meta_sidecar_path(&blob_path);
tokio::fs::write(&sidecar, br#"{"content_type":"image/png"}"#)
.await
.unwrap();
assert!(blob_path.is_dir());
assert!(sidecar.is_file());
let result = s.delete("pinned.bin").await;
assert!(
result.is_err(),
"expected error: blob path is a directory, remove_file should fail"
);
assert!(
sidecar.exists(),
"sidecar must survive a failed blob delete"
);
assert!(blob_path.is_dir());
}
#[tokio::test]
async fn head_falls_back_to_octet_stream_without_sidecar() {
let dir = temp_root();
let s = store(dir.path());
let path = dir.path().join("legacy.bin");
tokio::fs::write(&path, b"raw").await.unwrap();
let meta = s.head("legacy.bin").await.unwrap().expect("blob exists");
assert_eq!(meta.content_type, "application/octet-stream");
assert_eq!(meta.byte_size, 3);
assert!(meta.etag.is_none());
}
#[tokio::test]
async fn drop_stale_sidecar_removes_existing_metadata() {
let dir = temp_root();
let blob = dir.path().join("victim.bin");
let sidecar = meta_sidecar_path(&blob);
tokio::fs::write(&sidecar, br#"{"content_type":"image/png"}"#)
.await
.unwrap();
assert!(sidecar.exists());
drop_stale_sidecar(&blob).await;
assert!(!sidecar.exists());
drop_stale_sidecar(&blob).await;
}
#[tokio::test]
async fn read_meta_sidecar_handles_missing_and_malformed() {
let dir = temp_root();
let blob_path = dir.path().join("absent.bin");
assert!(read_meta_sidecar(&blob_path).await.is_none());
let blob_path = dir.path().join("malformed.bin");
tokio::fs::write(meta_sidecar_path(&blob_path), b"not json")
.await
.unwrap();
assert!(read_meta_sidecar(&blob_path).await.is_none());
}
#[tokio::test]
async fn get_with_meta_returns_bytes_plus_sidecar_metadata() {
let dir = temp_root();
let s = store(dir.path());
s.put(
"doc.pdf",
"application/pdf",
Bytes::from_static(b"%PDF-1.4"),
)
.await
.unwrap();
let (bytes, meta) = s.get_with_meta("doc.pdf").await.unwrap();
assert_eq!(&bytes[..], b"%PDF-1.4");
let m = meta.expect("sidecar should be present");
assert_eq!(m.content_type, "application/pdf");
assert!(m.etag.is_some());
}
#[tokio::test]
async fn atomic_replace_recovery_restores_dst_on_failure() {
let dir = temp_root();
let dst = dir.path().join("target.bin");
tokio::fs::write(&dst, b"old-blob").await.unwrap();
let backup = backup_sibling_path(&dst);
tokio::fs::rename(&dst, &backup).await.unwrap();
let err = tokio::fs::rename(dir.path().join("never.tmp"), &dst)
.await
.unwrap_err();
assert_eq!(err.kind(), std::io::ErrorKind::NotFound);
tokio::fs::rename(&backup, &dst).await.unwrap();
assert_eq!(tokio::fs::read(&dst).await.unwrap(), b"old-blob");
}
#[test]
fn encode_key_path_does_not_skip_leading_slash() {
let key = "/some/key";
let encoded = encode_key_path(key);
assert_eq!(encoded, "/some/key");
}
#[test]
fn sha256_hex_computes_correct_hash() {
assert_eq!(
sha256_hex(b""),
"e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
);
}
#[test]
fn hex_computes_correct_hex() {
assert_eq!(hex(b"xyz"), "78797a");
}
#[test]
fn verify_rejects_empty_signature() {
let key = b"secret";
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs();
let expires = now + 3600;
let result = verify(key, "blob", expires, "");
assert!(matches!(result, Err(BlobStoreError::Signature(_))));
}
#[tokio::test]
async fn safe_path_for_key_rejects_missing_directory() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().to_path_buf();
let s2 = store(&path);
dir.close().unwrap();
let err = s2.safe_path_for_key("some_blob").await.unwrap_err();
assert!(
matches!(
err,
BlobStoreError::Io(_) | BlobStoreError::PermissionDenied(_)
),
"Unexpected error: {err:?}"
);
}
#[tokio::test]
async fn get_with_meta_propagates_io_errors() {
let dir = temp_root();
let s = store(dir.path());
let path = dir.path().join("some_blob");
tokio::fs::create_dir(&path).await.unwrap();
let Err(err) = s.get_with_meta("some_blob").await else {
panic!("Expected error");
};
assert!(matches!(err, BlobStoreError::Io(_)));
}
#[tokio::test]
async fn drop_stale_sidecar_is_idempotent() {
let dir = temp_root();
let blob = dir.path().join("victim.bin");
let sidecar = meta_sidecar_path(&blob);
drop_stale_sidecar(&blob).await;
drop_stale_sidecar(&blob).await;
assert!(!sidecar.exists());
}
#[tokio::test]
async fn drop_stale_sidecar_ignores_and_survives_non_not_found_errors() {
let dir = temp_root();
let blob = dir.path().join("victim.bin");
let sidecar = meta_sidecar_path(&blob);
tokio::fs::create_dir(&sidecar).await.unwrap();
drop_stale_sidecar(&blob).await;
assert!(sidecar.exists()); }
#[tokio::test]
async fn get_with_meta_when_meta_missing_returns_none() {
let dir = temp_root();
let s = store(dir.path());
let path = dir.path().join("blob.bin");
tokio::fs::write(&path, b"data").await.unwrap();
let (bytes, meta) = s.get_with_meta("blob.bin").await.unwrap();
assert_eq!(&bytes[..], b"data");
assert!(meta.is_none());
}
#[tokio::test]
async fn atomic_replace_handles_already_exists_on_backup_rename() {
let dir = temp_root();
let dst = dir.path().join("target.bin");
tokio::fs::write(&dst, b"old").await.unwrap();
let tmp = dir.path().join("staging.tmp");
tokio::fs::write(&tmp, b"new").await.unwrap();
let backup = backup_sibling_path(&dst);
tokio::fs::write(&backup, b"interloper").await.unwrap();
atomic_replace(&tmp, &dst).await.unwrap();
assert_eq!(tokio::fs::read(&dst).await.unwrap(), b"new");
}
#[tokio::test]
async fn put_rejects_missing_directory_to_trigger_io_error() {
let missing = tempfile::tempdir().unwrap().path().to_path_buf();
let s2 = store(&missing);
std::fs::remove_dir_all(&missing).unwrap();
let err = s2
.put("some_blob", "text/plain", bytes::Bytes::from_static(b"xyz"))
.await
.unwrap_err();
assert!(matches!(
err,
BlobStoreError::Io(_) | BlobStoreError::PermissionDenied(_)
));
}
#[tokio::test]
async fn delete_rejects_missing_directory_to_trigger_io_error() {
let missing = tempfile::tempdir().unwrap().path().to_path_buf();
let s2 = store(&missing);
std::fs::remove_dir_all(&missing).unwrap();
let err = s2.delete("some_blob").await.unwrap_err();
assert!(matches!(
err,
BlobStoreError::Io(_) | BlobStoreError::PermissionDenied(_)
));
}
#[tokio::test]
async fn head_rejects_missing_directory_to_trigger_io_error() {
let missing = tempfile::tempdir().unwrap().path().to_path_buf();
let s2 = store(&missing);
std::fs::remove_dir_all(&missing).unwrap();
let err = s2.head("some_blob").await.unwrap_err();
assert!(matches!(
err,
BlobStoreError::Io(_) | BlobStoreError::PermissionDenied(_)
));
}
#[test]
fn provider_id_is_local() {
let dir = temp_root();
let s = LocalBlobStore::new(
"local_test_id",
dir.path().to_path_buf(),
"/mnt",
std::time::Duration::from_secs(3600),
SigningKey::random(),
vec![],
)
.unwrap();
assert_eq!(s.provider_id(), "local_test_id");
}
#[test]
fn signing_key_debug_does_not_leak_material() {
let key = SigningKey::new(b"super-secret".to_vec());
let dbg = format!("{key:?}");
assert!(!dbg.contains("super-secret"));
assert!(dbg.contains("len"));
}
#[tokio::test]
async fn blob_url_signed_with_previous_key_still_verifies() {
let dir = temp_root();
let old_key = SigningKey::new(b"old-key-32-bytes-xxxxxxxxxxxxxxx".to_vec());
let new_key = SigningKey::new(b"new-key-32-bytes-xxxxxxxxxxxxxxx".to_vec());
let store = LocalBlobStore::new(
"test",
dir.path(),
"/_blobs",
Duration::from_secs(60),
new_key,
vec![old_key.clone()],
)
.unwrap();
store
.put("a/b.txt", "text/plain", bytes::Bytes::from_static(b"hi"))
.await
.unwrap();
let exp = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs()
+ 3600;
let old_sig = sign(old_key.as_bytes(), "a/b.txt", exp);
assert!(
verify_with_rotation(
&store.inner.signing_key,
&store.inner.previous_signing_keys,
"a/b.txt",
exp,
&old_sig
)
.is_ok(),
"old-key signed URL must verify during grace window"
);
}
#[test]
fn blob_url_expired_with_previous_key_still_rejects() {
let old_key = SigningKey::new(b"old-key-32-bytes-xxxxxxxxxxxxxxx".to_vec());
let new_key = SigningKey::new(b"new-key-32-bytes-xxxxxxxxxxxxxxx".to_vec());
let expired_exp = 1u64; let old_sig = sign(old_key.as_bytes(), "a/b.txt", expired_exp);
let result = verify_with_rotation(&new_key, &[old_key], "a/b.txt", expired_exp, &old_sig);
assert!(
result.is_err(),
"expired URL must be rejected even with valid previous key"
);
}
}