use std::io::{Read, Seek, SeekFrom};
use std::ops::Range;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use async_trait::async_trait;
use bytes::Bytes;
use tokio::sync::Semaphore;
use super::adapter::{BlobAdapter, BlobByteStream};
use super::blob_ref::BlobRef;
use super::error::BlobError;
/// Size in bytes (256 KiB) of the read buffer used by `fetch_stream`; every
/// chunk yielded by that stream is at most this long.
pub const FS_STREAM_CHUNK_BYTES: usize = 256 * 1024;
/// Pulls the content hash and URI out of a `BlobRef::Small`.
///
/// # Errors
/// Returns `BlobError::Backend` for the `Manifest` and `Tree` variants, which
/// this adapter does not handle.
fn expect_small(blob_ref: &BlobRef) -> Result<([u8; 32], &str), BlobError> {
    const NOT_SMALL: &str = "FileSystemAdapter operates on Small blobs only; \
                             chunked blobs are handled by the layer above";
    // The match stays exhaustive on purpose: a new variant must be classified here.
    match blob_ref {
        BlobRef::Manifest { .. } | BlobRef::Tree { .. } => {
            Err(BlobError::Backend(NOT_SMALL.to_owned()))
        }
        BlobRef::Small { hash, uri, .. } => Ok((*hash, uri.as_str())),
    }
}
/// Number of semaphore permits a `FileSystemAdapter` created with `new` starts
/// with; `with_concurrency` replaces it.
pub const DEFAULT_FS_ADAPTER_CONCURRENCY: usize = 64;
/// Blob adapter that keeps each `Small` blob as one file under `root`, named by
/// the lowercase hex of its 32-byte hash inside a two-hex-character shard
/// directory.
#[derive(Debug, Clone)]
pub struct FileSystemAdapter {
/// Identifier returned by `adapter_id`.
id: String,
/// Directory under which the shard directories and blob files are placed.
root: PathBuf,
/// Semaphore every operation takes a permit from before doing blocking
/// filesystem work. It is behind an `Arc`, so clones of the adapter share it.
concurrency: Arc<Semaphore>,
}
impl FileSystemAdapter {
    /// Builds an adapter identified by `id` that stores blobs under `root`,
    /// with `DEFAULT_FS_ADAPTER_CONCURRENCY` permits.
    pub fn new(id: impl Into<String>, root: impl Into<PathBuf>) -> Self {
        let id = id.into();
        let root = root.into();
        let concurrency = Arc::new(Semaphore::new(DEFAULT_FS_ADAPTER_CONCURRENCY));
        Self {
            id,
            root,
            concurrency,
        }
    }

    /// Replaces the concurrency limit with a fresh semaphore of `cap` permits.
    /// A `cap` of zero is raised to one.
    pub fn with_concurrency(self, cap: usize) -> Self {
        let permits = cap.max(1);
        Self {
            concurrency: Arc::new(Semaphore::new(permits)),
            ..self
        }
    }

    /// Maps a hash to `<root>/<first two hex chars>/<64 hex chars>`.
    fn path_for(&self, hash: &[u8; 32]) -> PathBuf {
        use std::fmt::Write;
        let hex = hash
            .iter()
            .fold(String::with_capacity(64), |mut acc, byte| {
                // Writing into a `String` cannot fail.
                let _ = write!(acc, "{:02x}", byte);
                acc
            });
        self.root.join(&hex[..2]).join(&hex)
    }
}
/// Wraps anything displayable into `BlobError::Backend`, using its `Display`
/// text as the message.
fn backend(e: impl std::fmt::Display) -> BlobError {
    let message = e.to_string();
    BlobError::Backend(message)
}
/// Produces a copy of `uri` that is safe to embed in an error message.
///
/// Input longer than 256 bytes is cut at the last character boundary at or
/// below byte 256 and gets a trailing `…`. Every control character is replaced
/// by `\x` followed by its code point in at least two uppercase hex digits.
fn sanitize_uri_for_error(uri: &str) -> String {
    use std::fmt::Write;
    const MAX_LEN: usize = 256;

    let truncated = uri.len() > MAX_LEN;
    let kept = if truncated {
        // Walk back from the limit until the cut no longer splits a character.
        // Offset 0 is always a boundary, so this terminates without underflow.
        let mut cut = MAX_LEN;
        while !uri.is_char_boundary(cut) {
            cut -= 1;
        }
        &uri[..cut]
    } else {
        uri
    };

    let mut out = String::with_capacity(kept.len());
    for ch in kept.chars() {
        match ch {
            c if c.is_control() => {
                // Writing into a `String` cannot fail.
                let _ = write!(out, "\\x{:02X}", c as u32);
            }
            c => out.push(c),
        }
    }
    if truncated {
        out.push('…');
    }
    out
}
#[async_trait]
impl BlobAdapter for FileSystemAdapter {
/// Returns the identifier this adapter was constructed with.
fn adapter_id(&self) -> &str {
    self.id.as_str()
}
/// Returns the URI schemes this adapter reports as accepted: only `file`.
fn accepted_schemes(&self) -> &[&str] {
    const SCHEMES: &[&str] = &["file"];
    SCHEMES
}
/// Writes `bytes` to the content-addressed file for `blob_ref`.
///
/// The bytes are hashed with BLAKE3 and rejected unless they match the hash in
/// `blob_ref`. The data is written to a uniquely named temp file beside the
/// destination, synced, and renamed into place. If the rename fails but the
/// destination already holds content with the expected hash, the store
/// succeeds. The temp file is removed on every failure path after it has been
/// created, including failed writes and syncs.
///
/// # Errors
/// * `BlobError::HashMismatch` when `bytes` do not hash to the expected value.
/// * `BlobError::Backend` for non-`Small` refs, a closed semaphore, a shard
///   directory that resolves outside the root, any I/O failure, or a failed
///   rename onto a destination whose content does not match.
async fn store(&self, blob_ref: &BlobRef, bytes: &[u8]) -> Result<(), BlobError> {
    let (expected_hash, _uri) = expect_small(blob_ref)?;
    let computed: [u8; 32] = blake3::hash(bytes).into();
    if computed != expected_hash {
        return Err(BlobError::HashMismatch {
            expected: expected_hash,
            actual: computed,
        });
    }
    let path = self.path_for(&expected_hash);
    let root = self.root.clone();
    let bytes = bytes.to_vec();
    let permit = self
        .concurrency
        .clone()
        .acquire_owned()
        .await
        .map_err(|_| backend("adapter concurrency semaphore closed"))?;
    tokio::task::spawn_blocking(move || -> Result<(), BlobError> {
        use std::io::Write;
        use std::sync::atomic::{AtomicU64, Ordering};
        static TMP_COUNTER: AtomicU64 = AtomicU64::new(0);

        // Keep the permit alive until the blocking work is done.
        let _permit = permit;

        if let Some(parent) = path.parent() {
            std::fs::create_dir_all(parent).map_err(backend)?;
            // Canonicalizing resolves symlinks, so a shard directory that is a
            // symlink to somewhere outside the root is caught before any write.
            let parent_canon = std::fs::canonicalize(parent).map_err(backend)?;
            let root_canon = std::fs::canonicalize(&root).map_err(backend)?;
            if !parent_canon.starts_with(&root_canon) {
                return Err(BlobError::Backend(format!(
                    "fs adapter: shard dir escapes root (parent={:?} root={:?})",
                    parent_canon, root_canon,
                )));
            }
        }

        // Temp name = final name + pid + process-wide counter + wall-clock
        // nanoseconds, so concurrent stores of the same blob do not collide.
        let counter = TMP_COUNTER.fetch_add(1, Ordering::Relaxed);
        let nanos = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_nanos())
            .unwrap_or(0);
        let mut tmp = path.clone();
        let mut name = tmp
            .file_name()
            .ok_or_else(|| backend("path has no file name"))?
            .to_owned();
        name.push(format!(".{}-{}-{}.tmp", std::process::id(), counter, nanos));
        tmp.set_file_name(name);

        let mut f = std::fs::File::create(&tmp).map_err(backend)?;
        let written = f.write_all(&bytes).and_then(|()| f.sync_all());
        // Close the handle before any cleanup or rename.
        drop(f);
        if let Err(e) = written {
            // Do not leave a partial temp file behind (e.g. on a full disk).
            let _ = std::fs::remove_file(&tmp);
            return Err(backend(e));
        }

        if let Err(rename_err) = std::fs::rename(&tmp, &path) {
            // The rename failed; whatever happens next, the temp file goes.
            let existing = std::fs::read(&path);
            let _ = std::fs::remove_file(&tmp);
            let existing = match existing {
                Ok(buf) => buf,
                // Nothing at the destination: the rename error is the real cause.
                Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
                    return Err(backend(rename_err));
                }
                Err(e) => return Err(backend(e)),
            };
            // A destination that already holds the right bytes counts as success.
            let existing_hash: [u8; 32] = blake3::hash(&existing).into();
            if existing_hash != expected_hash {
                return Err(BlobError::Backend(format!(
                    "fs adapter: canonical path exists with mismatched content \
                     (rename err: {})",
                    rename_err
                )));
            }
        }

        // Best-effort sync of the directory entry; failures here are ignored.
        if let Some(parent) = path.parent() {
            if let Ok(dir) = std::fs::File::open(parent) {
                let _ = dir.sync_all();
            }
        }
        Ok(())
    })
    .await
    .map_err(|e| backend(format!("join error: {}", e)))?
}
async fn fetch(&self, blob_ref: &BlobRef) -> Result<Bytes, BlobError> {
let (hash, uri) = expect_small(blob_ref)?;
let path = self.path_for(&hash);
let uri = sanitize_uri_for_error(uri);
let _permit = self
.concurrency
.clone()
.acquire_owned()
.await
.map_err(|_| backend("adapter concurrency semaphore closed"))?;
tokio::task::spawn_blocking(move || -> Result<Bytes, BlobError> {
let _permit = _permit;
match std::fs::read(&path) {
Ok(bytes) => Ok(Bytes::from(bytes)),
Err(e) if e.kind() == std::io::ErrorKind::NotFound => Err(BlobError::NotFound(uri)),
Err(e) => Err(backend(e)),
}
})
.await
.map_err(|e| backend(format!("join error: {}", e)))?
}
/// Reads the bytes `range.start..range.end` of the blob file.
///
/// An empty range returns empty `Bytes` without touching the filesystem. The
/// range is checked against the file's length before the buffer is allocated,
/// so a range past the end of the blob fails with an error instead of
/// attempting a caller-sized allocation.
///
/// # Errors
/// * `BlobError::Backend` when `range.start > range.end`, the length does not
///   fit in `usize`, the ref is not `Small`, the range extends past the end of
///   the file, or on any other I/O / task failure.
/// * `BlobError::NotFound` (carrying the sanitized URI) when the file is absent.
async fn fetch_range(&self, blob_ref: &BlobRef, range: Range<u64>) -> Result<Bytes, BlobError> {
    if range.start > range.end {
        return Err(backend(format!(
            "range.start ({}) > range.end ({})",
            range.start, range.end
        )));
    }
    let len = range.end.saturating_sub(range.start);
    if len == 0 {
        return Ok(Bytes::new());
    }
    if len > usize::MAX as u64 {
        return Err(backend(format!(
            "range length {} exceeds usize::MAX on this target",
            len
        )));
    }
    let (hash, uri) = expect_small(blob_ref)?;
    let path = self.path_for(&hash);
    let uri = sanitize_uri_for_error(uri);
    let Range { start, end } = range;
    let permit = self
        .concurrency
        .clone()
        .acquire_owned()
        .await
        .map_err(|_| backend("adapter concurrency semaphore closed"))?;
    tokio::task::spawn_blocking(move || -> Result<Bytes, BlobError> {
        let _permit = permit;
        let mut f = match std::fs::File::open(&path) {
            Ok(f) => f,
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
                return Err(BlobError::NotFound(uri))
            }
            Err(e) => return Err(backend(e)),
        };
        // Validate against the real file size BEFORE allocating `len` bytes.
        // Without this, an oversized range makes `vec![0; len]` try to reserve
        // a caller-chosen amount of memory.
        let file_len = f.metadata().map_err(backend)?.len();
        if end > file_len {
            return Err(backend(format!(
                "range {}..{} exceeds blob length {}",
                start, end, file_len
            )));
        }
        f.seek(SeekFrom::Start(start)).map_err(backend)?;
        // Cannot truncate: `len <= usize::MAX` was checked above.
        let mut buf = vec![0u8; len as usize];
        f.read_exact(&mut buf).map_err(backend)?;
        Ok(Bytes::from(buf))
    })
    .await
    .map_err(|e| backend(format!("join error: {}", e)))?
}
async fn exists(&self, blob_ref: &BlobRef) -> Result<bool, BlobError> {
let (hash, _uri) = expect_small(blob_ref)?;
let path = self.path_for(&hash);
let permit = self
.concurrency
.clone()
.acquire_owned()
.await
.map_err(|_| backend("adapter concurrency semaphore closed"))?;
let res = tokio::task::spawn_blocking(move || {
let _permit = permit;
Path::new(&path).is_file()
})
.await
.map_err(|e| backend(format!("join error: {}", e)))?;
Ok(res)
}
/// Streams the blob file in chunks of at most `FS_STREAM_CHUNK_BYTES`.
///
/// A blocking task reads the file and pushes chunks through a channel with
/// room for 4 items; the returned stream drains that channel. Open and read
/// failures are delivered as an `Err` item on the stream (a missing file
/// yields `BlobError::NotFound`), after which the stream ends. The concurrency
/// permit is held by the reader task until it exits.
///
/// # Errors
/// The call itself fails with `BlobError::Backend` only for non-`Small` refs
/// or a closed semaphore.
async fn fetch_stream(&self, blob_ref: &BlobRef) -> Result<BlobByteStream, BlobError> {
    let (hash, uri) = expect_small(blob_ref)?;
    let path = self.path_for(&hash);
    let uri = sanitize_uri_for_error(uri);
    let permit = self
        .concurrency
        .clone()
        .acquire_owned()
        .await
        .map_err(|_| backend("adapter concurrency semaphore closed"))?;
    let (tx, rx) = tokio::sync::mpsc::channel::<Result<Bytes, BlobError>>(4);
    tokio::task::spawn_blocking(move || {
        let _permit = permit;
        let mut file = match std::fs::File::open(&path) {
            Ok(file) => file,
            Err(open_err) => {
                let report = if open_err.kind() == std::io::ErrorKind::NotFound {
                    BlobError::NotFound(uri)
                } else {
                    backend(open_err)
                };
                let _ = tx.blocking_send(Err(report));
                return;
            }
        };
        let mut scratch = vec![0u8; FS_STREAM_CHUNK_BYTES];
        loop {
            let item = match file.read(&mut scratch) {
                // End of file: dropping `tx` closes the stream.
                Ok(0) => break,
                Ok(filled) => Ok(Bytes::copy_from_slice(&scratch[..filled])),
                Err(read_err) => Err(backend(read_err)),
            };
            let is_failure = item.is_err();
            // Stop when the receiver is gone, or right after reporting an error.
            if tx.blocking_send(item).is_err() || is_failure {
                break;
            }
        }
    });
    let stream = futures::stream::unfold(rx, |mut rx| async move {
        rx.recv().await.map(|item| (item, rx))
    });
    Ok(Box::pin(stream))
}
/// Removes the blob file. Deleting a blob that is not present succeeds.
///
/// # Errors
/// `BlobError::Backend` for non-`Small` refs, a closed semaphore, an I/O
/// failure other than not-found, or a failed blocking task.
async fn delete(&self, blob_ref: &BlobRef) -> Result<(), BlobError> {
    // The URI is never reported by this method, so it is not sanitized here.
    let (hash, _uri) = expect_small(blob_ref)?;
    let path = self.path_for(&hash);
    let permit = self
        .concurrency
        .clone()
        .acquire_owned()
        .await
        .map_err(|_| backend("adapter concurrency semaphore closed"))?;
    tokio::task::spawn_blocking(move || -> Result<(), BlobError> {
        let _permit = permit;
        match std::fs::remove_file(&path) {
            Ok(()) => Ok(()),
            // An already-absent file is treated as a successful delete.
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
            Err(e) => Err(backend(e)),
        }
    })
    .await
    .map_err(|e| backend(format!("join error: {}", e)))?
}
async fn stat(&self, blob_ref: &BlobRef) -> Result<super::adapter::BlobStat, BlobError> {
let (hash, uri) = expect_small(blob_ref)?;
let path = self.path_for(&hash);
let uri = sanitize_uri_for_error(uri);
let permit = self
.concurrency
.clone()
.acquire_owned()
.await
.map_err(|_| backend("adapter concurrency semaphore closed"))?;
let advertised_size = blob_ref.size();
let advertised_encoding = blob_ref.encoding();
tokio::task::spawn_blocking(move || -> Result<super::adapter::BlobStat, BlobError> {
let _permit = permit;
let meta = match std::fs::metadata(&path) {
Ok(m) => m,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
return Err(BlobError::NotFound(uri))
}
Err(e) => return Err(backend(e)),
};
let last_seen_unix_ms = meta
.modified()
.ok()
.and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
.map(|d| d.as_millis() as u64);
Ok(super::adapter::BlobStat {
size: advertised_size.max(meta.len()),
replicas_observed: 0,
replica_target: None,
last_seen_unix_ms,
encoding: advertised_encoding,
})
})
.await
.map_err(|e| backend(format!("join error: {}", e)))?
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::{AtomicU64, Ordering};
/// A URI whose 256-byte cut point falls inside a multi-byte character must be
/// cut at the previous character boundary instead of panicking.
#[test]
fn sanitize_uri_handles_multibyte_at_boundary() {
    // 255 ASCII bytes followed by a 4-byte emoji: byte 256 is mid-character.
    let uri = format!("{}{}", "a".repeat(255), "🦀");
    let out = sanitize_uri_for_error(&uri);
    assert!(
        out.ends_with('…'),
        "expected truncation marker, got {:?}",
        out
    );
    // Pin the exact result: the emoji is dropped whole and nothing else is lost.
    assert_eq!(out, format!("{}…", "a".repeat(255)));
}
/// Short input without control characters passes through unchanged, including
/// multi-byte characters.
#[test]
fn sanitize_uri_preserves_short_input() {
    let original = "file:///🦀/path";
    assert_eq!(sanitize_uri_for_error(original), original);
}
/// Returns a fresh path under the system temp directory, made unique by the
/// process id and a per-process counter. The directory itself is not created.
fn unique_root() -> PathBuf {
    static COUNTER: AtomicU64 = AtomicU64::new(0);
    let seq = COUNTER.fetch_add(1, Ordering::Relaxed);
    let dir_name = format!("net-blob-fs-test-{}-{}", std::process::id(), seq);
    let mut root = std::env::temp_dir();
    root.push(dir_name);
    root
}
/// Builds a `Small` blob reference for `payload`: BLAKE3 hash of the bytes,
/// the given `uri`, and the payload length as size.
fn make_ref(payload: &[u8], uri: &str) -> BlobRef {
    let size = payload.len() as u64;
    let digest: [u8; 32] = blake3::hash(payload).into();
    BlobRef::small(uri, digest, size)
}
/// Removes the test directory tree. Failures (including a directory that was
/// never created) are deliberately ignored.
fn cleanup(root: &Path) {
    drop(std::fs::remove_dir_all(root));
}
/// Bytes written with `store` come back unchanged from `fetch` and pass the
/// blob reference's own `verify` check.
#[tokio::test]
async fn store_fetch_round_trip() {
    let root = unique_root();
    let fs = FileSystemAdapter::new("fs-test", &root);
    let data = b"hello dataforts";
    let blob = make_ref(data, "file:///test/key");

    fs.store(&blob, data).await.unwrap();
    let got = fs.fetch(&blob).await.unwrap();

    assert_eq!(got.as_ref(), data);
    blob.verify(&got).unwrap();
    cleanup(&root);
}
/// Fetching a blob that was never stored yields `NotFound` carrying the URI.
#[tokio::test]
async fn fetch_missing_returns_not_found() {
    let root = unique_root();
    let fs = FileSystemAdapter::new("fs-test", &root);
    let ghost = BlobRef::small("file:///ghost", [0xFF; 32], 0);

    match fs.fetch(&ghost).await.unwrap_err() {
        BlobError::NotFound(uri) => assert_eq!(uri, "file:///ghost"),
        other => panic!("expected NotFound, got {:?}", other),
    }
    cleanup(&root);
}
/// `exists` is false before the blob is stored and true afterwards.
#[tokio::test]
async fn exists_true_only_after_store() {
    let root = unique_root();
    let fs = FileSystemAdapter::new("fs-test", &root);
    let data = b"x";
    let blob = make_ref(data, "file:///x");

    let before = fs.exists(&blob).await.unwrap();
    assert!(!before);

    fs.store(&blob, data).await.unwrap();
    let after = fs.exists(&blob).await.unwrap();
    assert!(after);
    cleanup(&root);
}
/// `fetch_range` returns exactly the requested half-open byte range.
#[tokio::test]
async fn fetch_range_returns_slice() {
    let root = unique_root();
    let fs = FileSystemAdapter::new("fs-test", &root);
    let letters: &[u8] = b"abcdefghij";
    let blob = make_ref(letters, "file:///alphabet");
    fs.store(&blob, letters).await.unwrap();

    let slice = fs.fetch_range(&blob, 3..7).await.unwrap();

    assert_eq!(slice.as_ref(), b"defg");
    cleanup(&root);
}
/// A zero-length range yields empty bytes rather than an error.
#[tokio::test]
async fn fetch_range_empty_is_empty() {
    let root = unique_root();
    let fs = FileSystemAdapter::new("fs-test", &root);
    let data = b"data";
    let blob = make_ref(data, "file:///data");
    fs.store(&blob, data).await.unwrap();

    let nothing = fs.fetch_range(&blob, 2..2).await.unwrap();

    assert!(nothing.is_empty());
    cleanup(&root);
}
/// A range whose start is past its end is rejected with a `Backend` error.
#[tokio::test]
// The reversed range literal is the input under test.
#[allow(clippy::reversed_empty_ranges)]
async fn fetch_range_reversed_is_error() {
    let root = unique_root();
    let fs = FileSystemAdapter::new("fs-test", &root);
    let blob = BlobRef::small("file:///r", [0x00; 32], 10);

    let failure = fs.fetch_range(&blob, 5..3).await.unwrap_err();

    assert!(matches!(failure, BlobError::Backend(_)));
    cleanup(&root);
}
/// Storing bytes that do not hash to the reference's hash fails with
/// `HashMismatch` and leaves nothing behind that `exists` would report.
#[tokio::test]
async fn store_rejects_mismatched_bytes_vs_hash() {
    let root = unique_root();
    let fs = FileSystemAdapter::new("fs-test", &root);
    let genuine = b"real content";
    let forged = b"fake content";
    let genuine_hash: [u8; 32] = blake3::hash(genuine).into();
    // The reference claims the genuine hash but is paired with forged bytes.
    let blob = BlobRef::small("file:///impostor", genuine_hash, forged.len() as u64);

    match fs.store(&blob, forged).await.unwrap_err() {
        BlobError::HashMismatch { expected, actual } => {
            assert_eq!(expected, genuine_hash);
            assert_ne!(actual, genuine_hash);
        }
        other => panic!("expected HashMismatch, got {:?}", other),
    }
    let present = fs.exists(&blob).await.unwrap();
    assert!(!present);
    cleanup(&root);
}
/// Pre-seeds the blob's canonical path with bytes that do not match its hash,
/// then stores the real payload.
///
/// NOTE(review): despite the "rejects" in the name, the assertions require
/// `store` to succeed and the canonical file to hold the correct payload
/// afterwards; confirm whether the name or the expectation is the intended one.
#[tokio::test]
async fn store_rejects_canonical_path_with_mismatched_content() {
    let root = unique_root();
    let fs = FileSystemAdapter::new("fs-toctou", &root);
    let data = b"the right content";
    let digest: [u8; 32] = blake3::hash(data).into();
    let blob = BlobRef::small("file:///toctou", digest, data.len() as u64);

    // Rebuild the on-disk location by hand: <root>/<first byte hex>/<full hex>.
    let mut hex = String::with_capacity(64);
    for byte in digest.iter() {
        hex.push_str(&format!("{:02x}", byte));
    }
    let shard_dir = root.join(&hex[..2]);
    let canonical = shard_dir.join(&hex);
    std::fs::create_dir_all(&shard_dir).unwrap();
    std::fs::write(&canonical, b"GARBAGE-not-matching-hash").unwrap();

    fs.store(&blob, data).await.unwrap();

    assert_eq!(
        std::fs::read(&canonical).unwrap(),
        data,
        "canonical slot must hold correct bytes"
    );
    cleanup(&root);
}
/// When the shard directory is a symlink pointing outside the root, `store`
/// must fail with an "escapes root" backend error and write nothing into the
/// symlink's target.
#[cfg(unix)]
#[tokio::test]
async fn store_rejects_shard_dir_symlink_escape() {
    let root = unique_root();
    std::fs::create_dir_all(&root).unwrap();

    // A sibling directory of the root, which the adapter must never touch.
    let stamp = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap()
        .as_nanos();
    let outside_name = format!("outside-{}-{}", std::process::id(), stamp);
    let outside = root.parent().unwrap().join(outside_name);
    std::fs::create_dir_all(&outside).unwrap();

    let data = b"escape-test";
    let digest: [u8; 32] = blake3::hash(data).into();
    // Plant the symlink exactly where the adapter will look for the shard dir.
    let shard_link = root.join(format!("{:02x}", digest[0]));
    std::os::unix::fs::symlink(&outside, &shard_link).unwrap();

    let fs = FileSystemAdapter::new("fs-symlink", &root);
    let blob = BlobRef::small("file:///escape", digest, data.len() as u64);
    match fs.store(&blob, data).await.unwrap_err() {
        BlobError::Backend(msg) => assert!(
            msg.contains("escapes root"),
            "expected escape-root rejection; got {msg}"
        ),
        other => panic!("expected Backend(escapes root), got {:?}", other),
    }

    let leaked: Vec<_> = std::fs::read_dir(&outside)
        .unwrap()
        .filter_map(Result::ok)
        .collect();
    assert!(
        leaked.is_empty(),
        "adapter wrote outside its root: {:?}",
        leaked
    );
    cleanup(&root);
    let _ = std::fs::remove_dir_all(&outside);
}
/// A payload a little over three chunk sizes must arrive as at least four
/// stream items that concatenate back to the original bytes.
#[tokio::test]
async fn fetch_stream_yields_multi_chunk_for_large_blobs() {
    use futures::StreamExt;
    let root = unique_root();
    let fs = FileSystemAdapter::new("fs-stream", &root);
    let data = vec![0xCDu8; FS_STREAM_CHUNK_BYTES * 3 + 17];
    let blob = make_ref(&data, "file:///fs-stream");
    fs.store(&blob, &data).await.unwrap();

    let mut stream = fs.fetch_stream(&blob).await.unwrap();
    let mut seen = 0usize;
    let mut assembled = Vec::with_capacity(data.len());
    while let Some(item) = stream.next().await {
        let piece = item.unwrap();
        assembled.extend_from_slice(&piece);
        seen += 1;
    }

    assert!(
        seen >= 4,
        "FS adapter must yield multiple chunks for a multi-chunk payload; got {}",
        seen
    );
    assert_eq!(assembled, data);
    cleanup(&root);
}
/// For a missing blob, `fetch_stream` itself succeeds and the first stream
/// item is the `NotFound` error.
#[tokio::test]
async fn fetch_stream_returns_not_found_on_missing_blob() {
    use futures::StreamExt;
    let root = unique_root();
    let fs = FileSystemAdapter::new("fs-stream-miss", &root);
    let ghost = BlobRef::small("file:///ghost", [0xFF; 32], 0);

    let mut stream = fs.fetch_stream(&ghost).await.unwrap();
    match stream.next().await.expect("must yield NotFound chunk") {
        Err(BlobError::NotFound(_)) => {}
        other => panic!("expected NotFound from fetch_stream, got {:?}", other),
    }
    cleanup(&root);
}
/// Two payloads stored under the same URI are addressed by their own hashes,
/// so both remain fetchable with their original content.
#[tokio::test]
async fn store_overwrites_atomically() {
    let root = unique_root();
    let fs = FileSystemAdapter::new("fs-test", &root);
    let first = b"version one";
    let second = b"version two -- longer";

    let first_ref = make_ref(first, "file:///a");
    fs.store(&first_ref, first).await.unwrap();
    let second_ref = make_ref(second, "file:///a");
    fs.store(&second_ref, second).await.unwrap();

    let first_back = fs.fetch(&first_ref).await.unwrap();
    assert_eq!(first_back.as_ref(), first);
    let second_back = fs.fetch(&second_ref).await.unwrap();
    assert_eq!(second_back.as_ref(), second);
    cleanup(&root);
}
}