use std::collections::BTreeSet;
use std::path::{Component, Path, PathBuf};
use std::sync::Arc;
use bytes::{BufMut, BytesMut};
use serde::{Deserialize, Serialize};
use super::blob::{
chunk_payload, BlobAdapter, BlobError, BlobRef, Encoding, MeshBlobAdapter,
BLOB_CHUNK_SIZE_BYTES,
};
use crate::adapter::net::MeshNode;
/// Wire-format version written into every [`DirManifest`]; `fetch_dir`
/// rejects manifests that carry any other value.
pub const DIR_MANIFEST_VERSION: u8 = 1;
/// Parallel file fetches used by `fetch_dir` when its `concurrency`
/// argument is 0.
pub const DEFAULT_FETCH_CONCURRENCY: usize = 16;
/// Files larger than this (256 KiB) are read/written via
/// `tokio::task::spawn_blocking`; smaller ones are handled inline on the
/// async task.
const BLOCKING_FS_THRESHOLD: u64 = 256 * 1024;
/// Total permits (in bytes, 8 MiB) of the semaphore that throttles concurrent
/// file fetches in `fetch_dir`.
pub const DEFAULT_INFLIGHT_BUDGET_BYTES: usize = 8 * 1024 * 1024;
/// Errors returned by [`store_dir`] and [`fetch_dir`].
///
/// The `Display` text already embeds the wrapped error, so no `source()`
/// chain is exposed (that would print the inner error twice in reports).
#[derive(Debug)]
pub enum DirError {
    /// Local filesystem failure, or a blocking task that failed to join.
    Io(std::io::Error),
    /// Failure from the blob layer; fetch-task failures are also wrapped
    /// here as `BlobError::Backend`.
    Blob(BlobError),
    /// A manifest path that is empty or contains a non-normal component
    /// (root, prefix, `.`, `..`), or a destination with no final component.
    UnsafePath(String),
    /// Manifest encode/decode failure, unsupported version, or an entry that
    /// lacks its blob ref.
    Manifest(String),
}

impl std::fmt::Display for DirError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            DirError::Io(e) => write!(f, "dir transfer io: {e}"),
            DirError::Blob(e) => write!(f, "dir transfer blob: {e}"),
            DirError::UnsafePath(p) => {
                write!(f, "dir transfer: unsafe manifest path {p:?}")
            }
            DirError::Manifest(m) => {
                write!(f, "dir transfer: bad manifest: {m}")
            }
        }
    }
}

impl std::error::Error for DirError {}

impl From<std::io::Error> for DirError {
    fn from(source: std::io::Error) -> Self {
        DirError::Io(source)
    }
}

impl From<BlobError> for DirError {
    fn from(source: BlobError) -> Self {
        DirError::Blob(source)
    }
}
/// One entry of a [`DirManifest`].
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct DirEntry {
    /// Path relative to the stored root, normal components joined with `/`.
    /// `fetch_dir` rejects empty paths and any non-normal component.
    pub path: String,
    /// What the entry is, plus its kind-specific payload.
    pub kind: EntryKind,
}
/// Kind-specific data for a [`DirEntry`].
///
/// Manifests are serialized with postcard, so variant and field order are
/// part of the wire format: do not reorder.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum EntryKind {
    /// A regular file.
    File {
        /// Mode bits recorded by `mode_of` (0 on non-Unix senders; 0 is
        /// skipped when applying on fetch).
        mode: u32,
        /// `BlobRef::encode()` of the file contents.
        blob: Vec<u8>,
    },
    /// A directory.
    Dir {
        /// Mode bits recorded by `mode_of`; not applied by `fetch_dir`.
        mode: u32,
    },
    /// A symbolic link.
    Symlink {
        /// Link target as returned by `read_link` (lossy UTF-8). Not
        /// validated: it may be absolute or point outside the tree.
        target: String,
    },
}
/// Directory snapshot produced by `store_dir`: a version tag plus every
/// entry, sorted by `path`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct DirManifest {
    /// Must equal [`DIR_MANIFEST_VERSION`] to be accepted by `fetch_dir`.
    pub version: u8,
    /// All entries of the tree.
    pub entries: Vec<DirEntry>,
}
impl DirManifest {
    /// Number of regular-file entries; directories and symlinks are not
    /// counted.
    pub fn file_count(&self) -> usize {
        self.entries.iter().fold(0, |count, entry| match entry.kind {
            EntryKind::File { .. } => count + 1,
            EntryKind::Dir { .. } | EntryKind::Symlink { .. } => count,
        })
    }
}
/// Summary of a completed `fetch_dir`.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct DirStats {
    /// Regular files fetched and written.
    pub files: usize,
    /// Directories created: members of the needed set (explicit `Dir`
    /// entries plus parents of files/symlinks) that did not already exist.
    pub dirs: usize,
    /// Symlinks successfully created; failed ones are skipped, not counted.
    pub symlinks: usize,
    /// Total bytes written across all files.
    pub bytes: u64,
}
pub async fn store_dir(adapter: &MeshBlobAdapter, root: &Path) -> Result<BlobRef, DirError> {
let mut entries: Vec<DirEntry> = Vec::new();
let root_buf = root.to_path_buf();
let mut raw = tokio::task::spawn_blocking(
move || -> Result<Vec<(String, std::fs::Metadata, PathBuf)>, DirError> {
let mut raw = Vec::new();
walk(&root_buf, &root_buf, &mut raw)?;
Ok(raw)
},
)
.await
.map_err(|e| DirError::Io(std::io::Error::other(e)))??;
raw.sort_by(|a, b| a.0.cmp(&b.0));
for (rel, meta, abs) in raw {
let file_type = meta.file_type();
if file_type.is_symlink() {
let target = std::fs::read_link(&abs)?;
entries.push(DirEntry {
path: rel,
kind: EntryKind::Symlink {
target: target.to_string_lossy().into_owned(),
},
});
} else if file_type.is_dir() {
entries.push(DirEntry {
path: rel,
kind: EntryKind::Dir {
mode: mode_of(&meta),
},
});
} else if file_type.is_file() {
let bytes = if meta.len() > BLOCKING_FS_THRESHOLD {
tokio::task::spawn_blocking(move || std::fs::read(&abs))
.await
.map_err(|e| DirError::Io(std::io::Error::other(e)))??
} else {
std::fs::read(&abs)?
};
let chunked = chunk_payload(&bytes)?;
let hash: [u8; 32] = blake3::hash(&bytes).into();
let uri = format!("mesh://{}", hex(&hash));
let blob_ref = chunked.into_blob_ref(uri, Encoding::Replicated)?;
adapter.store(&blob_ref, &bytes).await?;
entries.push(DirEntry {
path: rel,
kind: EntryKind::File {
mode: mode_of(&meta),
blob: blob_ref.encode(),
},
});
}
}
let manifest = DirManifest {
version: DIR_MANIFEST_VERSION,
entries,
};
let manifest_bytes =
postcard::to_allocvec(&manifest).map_err(|e| DirError::Manifest(format!("encode: {e}")))?;
let chunked = chunk_payload(&manifest_bytes)?;
let mhash: [u8; 32] = blake3::hash(&manifest_bytes).into();
let manifest_ref =
chunked.into_blob_ref(format!("mesh://{}", hex(&mhash)), Encoding::Replicated)?;
adapter.store(&manifest_ref, &manifest_bytes).await?;
Ok(manifest_ref)
}
/// Recursively collect every entry beneath `dir` into `out`.
///
/// Each item is `(path relative to root as produced by rel_path, lstat
/// metadata, absolute path)`. Metadata comes from `symlink_metadata`, so
/// symlinks are recorded as links and never followed; only real directories
/// are descended into. A directory is pushed before its contents. Sibling
/// order is whatever `read_dir` yields; `store_dir` sorts afterwards.
fn walk(
    root: &Path,
    dir: &Path,
    out: &mut Vec<(String, std::fs::Metadata, PathBuf)>,
) -> Result<(), DirError> {
    for dirent in std::fs::read_dir(dir)? {
        let path = dirent?.path();
        let meta = std::fs::symlink_metadata(&path)?;
        let ft = meta.file_type();
        let descend = !ft.is_symlink() && ft.is_dir();
        out.push((rel_path(root, &path), meta, path.clone()));
        if descend {
            walk(root, &path, out)?;
        }
    }
    Ok(())
}
/// Path of `abs` relative to `root`, as `/`-joined normal components.
///
/// If `abs` is not under `root`, `abs` itself is used. Non-normal components
/// (root, prefix, `.`, `..`) are dropped, and non-UTF-8 names are converted
/// lossily.
fn rel_path(root: &Path, abs: &Path) -> String {
    abs.strip_prefix(root)
        .unwrap_or(abs)
        .components()
        .filter_map(|comp| match comp {
            Component::Normal(name) => Some(name.to_string_lossy().into_owned()),
            _ => None,
        })
        .collect::<Vec<_>>()
        .join("/")
}
/// Mode bits recorded for an entry: `Permissions::mode()` on Unix.
#[cfg(unix)]
fn mode_of(meta: &std::fs::Metadata) -> u32 {
    let perms = meta.permissions();
    std::os::unix::fs::PermissionsExt::mode(&perms)
}
/// Non-Unix platforms record no mode bits; always 0, which `apply_mode`
/// skips on the receiving side.
#[cfg(not(unix))]
fn mode_of(_meta: &std::fs::Metadata) -> u32 {
    0
}
/// Fetch the directory tree described by `manifest_ref` from peer `source`
/// and install it at `dest`.
///
/// The tree is first rebuilt in a fresh sibling directory
/// (`.<name>.fetch_<hex>`). Only after every entry has been written is it
/// moved into place by `install_tree`, which replaces any existing `dest`. If
/// reconstruction fails, that directory is removed (best-effort).
///
/// `concurrency` bounds parallel file fetches; 0 selects
/// [`DEFAULT_FETCH_CONCURRENCY`].
///
/// # Errors
/// [`DirError::Manifest`] if the manifest cannot be decoded or has an
/// unsupported version. Fetch, path-safety and I/O errors are propagated.
pub async fn fetch_dir(
    node: &Arc<MeshNode>,
    source: u64,
    manifest_ref: &BlobRef,
    dest: &Path,
    concurrency: usize,
) -> Result<DirStats, DirError> {
    let raw = transfer_fetch_blob(node, source, manifest_ref).await?;
    let manifest: DirManifest =
        postcard::from_bytes(&raw).map_err(|e| DirError::Manifest(format!("decode: {e}")))?;
    if manifest.version != DIR_MANIFEST_VERSION {
        let msg = format!("unsupported manifest version {}", manifest.version);
        return Err(DirError::Manifest(msg));
    }

    let target = dest.to_path_buf();
    let staging = alloc_temp_dir(&target).await?;
    match reconstruct_tree(node, source, &manifest, &staging, concurrency).await {
        Ok(stats) => {
            install_tree(staging, target).await?;
            Ok(stats)
        }
        Err(err) => {
            let _ = tokio::task::spawn_blocking(move || std::fs::remove_dir_all(&staging)).await;
            Err(err)
        }
    }
}
async fn reconstruct_tree(
node: &Arc<MeshNode>,
source: u64,
manifest: &DirManifest,
root: &Path,
concurrency: usize,
) -> Result<DirStats, DirError> {
let root = root.to_path_buf();
let mut stats = DirStats::default();
let mut want_dirs: BTreeSet<PathBuf> = BTreeSet::new();
for entry in &manifest.entries {
let safe = safe_join(&root, &entry.path)?;
match &entry.kind {
EntryKind::Dir { .. } => {
want_dirs.insert(safe);
}
EntryKind::File { .. } | EntryKind::Symlink { .. } => {
if let Some(parent) = safe.parent() {
want_dirs.insert(parent.to_path_buf());
}
}
}
}
let root_for_dirs = root.clone();
stats.dirs = tokio::task::spawn_blocking(move || -> Result<usize, DirError> {
std::fs::create_dir_all(&root_for_dirs)?;
let mut n = 0;
for dir in &want_dirs {
if !dir.exists() {
std::fs::create_dir_all(dir)?;
n += 1;
}
}
Ok(n)
})
.await
.map_err(|e| DirError::Io(std::io::Error::other(e)))??;
let concurrency = if concurrency == 0 {
DEFAULT_FETCH_CONCURRENCY
} else {
concurrency
};
let sem = Arc::new(tokio::sync::Semaphore::new(concurrency));
let budget = u32::try_from(DEFAULT_INFLIGHT_BUDGET_BYTES).unwrap_or(u32::MAX);
let byte_sem = Arc::new(tokio::sync::Semaphore::new(budget as usize));
let mut tasks = Vec::new();
for entry in &manifest.entries {
let EntryKind::File { mode, blob } = &entry.kind else {
continue;
};
let safe = safe_join(&root, &entry.path)?;
let blob_ref = BlobRef::decode(blob)
.map_err(DirError::Blob)?
.ok_or_else(|| DirError::Manifest(format!("entry {} has no blob ref", entry.path)))?;
let in_flight = blob_ref
.size()
.min(BLOB_CHUNK_SIZE_BYTES)
.min(budget as u64)
.max(1) as u32;
let node = node.clone();
let sem = sem.clone();
let byte_sem = byte_sem.clone();
let mode = *mode;
tasks.push(tokio::spawn(async move {
let _permit = sem.acquire().await.map_err(|_| {
DirError::Blob(BlobError::Backend("dir fetch: semaphore closed".into()))
})?;
let _bytes_permit = byte_sem.acquire_many(in_flight).await.map_err(|_| {
DirError::Blob(BlobError::Backend(
"dir fetch: byte semaphore closed".into(),
))
})?;
let bytes = transfer_fetch_blob(&node, source, &blob_ref).await?;
let len = bytes.len() as u64;
if len > BLOCKING_FS_THRESHOLD {
tokio::task::spawn_blocking(move || write_file(&safe, &bytes, mode))
.await
.map_err(|e| DirError::Io(std::io::Error::other(e)))??;
} else {
write_file(&safe, &bytes, mode)?;
}
Ok::<u64, DirError>(len)
}));
}
for task in tasks {
match task.await {
Ok(Ok(n)) => {
stats.files += 1;
stats.bytes += n;
}
Ok(Err(e)) => return Err(e),
Err(join) => {
return Err(DirError::Blob(BlobError::Backend(format!(
"dir fetch task panicked: {join}"
))))
}
}
}
let mut links: Vec<(String, PathBuf)> = Vec::new();
for entry in &manifest.entries {
if let EntryKind::Symlink { target } = &entry.kind {
links.push((target.clone(), safe_join(&root, &entry.path)?));
}
}
stats.symlinks = tokio::task::spawn_blocking(move || {
links
.into_iter()
.filter(|(target, safe)| make_symlink(target, safe).is_ok())
.count()
})
.await
.map_err(|e| DirError::Io(std::io::Error::other(e)))?;
Ok(stats)
}
/// Best-effort unique 64-bit tag for temp/backup names.
///
/// Mixes a process-wide counter, the wall-clock nanoseconds and the process
/// id. It is not guaranteed collision-free; callers retry on collision.
fn unique_suffix() -> u64 {
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};
    static COUNTER: AtomicU64 = AtomicU64::new(0);
    let counter = COUNTER.fetch_add(1, Ordering::Relaxed);
    let clock = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_nanos() as u64,
        Err(_) => 0,
    };
    let pid = u64::from(std::process::id());
    counter ^ clock.rotate_left(17) ^ pid.rotate_left(43)
}
async fn alloc_temp_dir(dest: &Path) -> Result<PathBuf, DirError> {
let parent = match dest.parent() {
Some(p) if !p.as_os_str().is_empty() => p.to_path_buf(),
_ => PathBuf::from("."),
};
let base = dest
.file_name()
.ok_or_else(|| DirError::UnsafePath(dest.to_string_lossy().into_owned()))?
.to_string_lossy()
.into_owned();
tokio::task::spawn_blocking(move || -> Result<PathBuf, DirError> {
std::fs::create_dir_all(&parent)?;
for _ in 0..8 {
let work = parent.join(format!(".{base}.fetch_{:016x}", unique_suffix()));
match std::fs::create_dir(&work) {
Ok(()) => return Ok(work),
Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => continue,
Err(e) => return Err(DirError::Io(e)),
}
}
Err(DirError::Io(std::io::Error::new(
std::io::ErrorKind::AlreadyExists,
"fetch_dir: could not allocate a unique temp directory",
)))
})
.await
.map_err(|e| DirError::Io(std::io::Error::other(e)))?
}
async fn install_tree(work: PathBuf, dest: PathBuf) -> Result<(), DirError> {
tokio::task::spawn_blocking(move || -> Result<(), DirError> {
if !dest.exists() {
return std::fs::rename(&work, &dest).map_err(|e| {
let _ = std::fs::remove_dir_all(&work);
DirError::Io(e)
});
}
let parent = match dest.parent() {
Some(p) if !p.as_os_str().is_empty() => p.to_path_buf(),
_ => PathBuf::from("."),
};
let base = dest
.file_name()
.ok_or_else(|| DirError::UnsafePath(dest.to_string_lossy().into_owned()))?
.to_string_lossy()
.into_owned();
let mut backup = None;
for _ in 0..8 {
let cand = parent.join(format!(".{base}.replaced_{:016x}", unique_suffix()));
if !cand.exists() {
backup = Some(cand);
break;
}
}
let backup = backup.ok_or_else(|| {
DirError::Io(std::io::Error::new(
std::io::ErrorKind::AlreadyExists,
"fetch_dir: could not allocate a backup path",
))
})?;
std::fs::rename(&dest, &backup).map_err(DirError::Io)?;
if let Err(e) = std::fs::rename(&work, &dest) {
let _ = std::fs::rename(&backup, &dest);
let _ = std::fs::remove_dir_all(&work);
return Err(DirError::Io(e));
}
let _ = std::fs::remove_dir_all(&backup);
Ok(())
})
.await
.map_err(|e| DirError::Io(std::io::Error::other(e)))?
}
/// Fetch the complete payload of `blob_ref` from peer `source`.
///
/// `Small` refs are a single chunk fetched by hash. `Manifest` refs are
/// fetched chunk by chunk, sequentially, and concatenated in manifest order.
/// `Tree` refs are not supported by the directory wrapper.
///
/// # Errors
/// Chunk-fetch errors are propagated. A `BlobError::Backend` is returned for
/// `Tree` refs.
async fn transfer_fetch_blob(
    node: &Arc<MeshNode>,
    source: u64,
    blob_ref: &BlobRef,
) -> Result<bytes::Bytes, DirError> {
    // Cap on the up-front reservation. `size()` can come from a manifest sent
    // by a remote peer; feeding an unchecked value to `with_capacity` lets a
    // bogus size abort the process on allocation failure (or panic on
    // capacity overflow). The buffer still grows as real chunk bytes arrive.
    const MAX_PREALLOC_BYTES: usize = 64 * 1024 * 1024;
    match blob_ref {
        BlobRef::Small { hash, .. } => Ok(node.transfer_fetch_chunk(source, *hash).await?),
        BlobRef::Manifest { chunks, .. } => {
            let reserve = usize::try_from(blob_ref.size())
                .unwrap_or(usize::MAX)
                .min(MAX_PREALLOC_BYTES);
            let mut buf = BytesMut::with_capacity(reserve);
            for chunk in chunks {
                let bytes = node.transfer_fetch_chunk(source, chunk.hash).await?;
                buf.put_slice(&bytes);
            }
            Ok(buf.freeze())
        }
        BlobRef::Tree { .. } => Err(DirError::Blob(BlobError::Backend(
            "dir transfer: BlobRef::Tree not supported by the directory wrapper".into(),
        ))),
    }
}
/// Join the manifest path `rel` onto `dest`, refusing anything that could
/// escape it.
///
/// Only `Component::Normal` components are accepted. Empty paths, absolute
/// paths, prefixes, and leading `.` or any `..` yield
/// [`DirError::UnsafePath`].
fn safe_join(dest: &Path, rel: &str) -> Result<PathBuf, DirError> {
    let reject = || DirError::UnsafePath(rel.to_owned());
    if rel.is_empty() {
        return Err(reject());
    }
    Path::new(rel)
        .components()
        .try_fold(dest.to_path_buf(), |mut joined, comp| match comp {
            Component::Normal(part) => {
                joined.push(part);
                Ok(joined)
            }
            _ => Err(reject()),
        })
}
/// Create or truncate `path`, write `bytes`, then apply `mode` via
/// `apply_mode`.
fn write_file(path: &Path, bytes: &[u8], mode: u32) -> Result<(), DirError> {
    std::fs::write(path, bytes)?;
    apply_mode(path, mode)
}
/// Apply the permission bits of `mode` to `path`. A `mode` of 0 leaves the
/// default permissions in place.
///
/// Only the rwx bits (`0o777`) are applied. `mode` comes from a manifest
/// produced by a remote peer and, as recorded by `mode_of`
/// (`Permissions::mode()`), may carry bits beyond the permission triplets,
/// including setuid/setgid/sticky. Honouring those would let a peer plant a
/// setuid executable on the receiving host.
#[cfg(unix)]
fn apply_mode(path: &Path, mode: u32) -> Result<(), DirError> {
    const PERMISSION_BITS: u32 = 0o777;
    if mode != 0 {
        use std::os::unix::fs::PermissionsExt;
        let perms = std::fs::Permissions::from_mode(mode & PERMISSION_BITS);
        std::fs::set_permissions(path, perms)?;
    }
    Ok(())
}
/// Non-Unix: mode bits are not applied.
#[cfg(not(unix))]
fn apply_mode(_path: &Path, _mode: u32) -> Result<(), DirError> {
    Ok(())
}
/// Create a symlink at `link` pointing to `target` (stored verbatim).
#[cfg(unix)]
fn make_symlink(target: &str, link: &Path) -> std::io::Result<()> {
    std::os::unix::fs::symlink(target, link)
}
/// Create a symlink at `link` pointing to `target`.
///
/// Windows distinguishes file and directory symlinks, and a file symlink that
/// points at a directory cannot be traversed. `target` is resolved the way
/// the OS will resolve it (relative to the link's parent; absolute targets
/// replace it), and the matching kind is chosen. Dangling targets fall back
/// to a file symlink.
#[cfg(windows)]
fn make_symlink(target: &str, link: &Path) -> std::io::Result<()> {
    let resolved = link.parent().unwrap_or(Path::new("")).join(target);
    if resolved.is_dir() {
        std::os::windows::fs::symlink_dir(target, link)
    } else {
        std::os::windows::fs::symlink_file(target, link)
    }
}
/// Platforms without symlink support always fail with `Unsupported`.
#[cfg(not(any(unix, windows)))]
fn make_symlink(_target: &str, _link: &Path) -> std::io::Result<()> {
    Err(std::io::Error::new(
        std::io::ErrorKind::Unsupported,
        "symlinks unsupported on this platform",
    ))
}
/// Lowercase hex encoding of a 32-byte hash (64 characters, two per byte,
/// most-significant nibble first).
fn hex(hash: &[u8; 32]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";
    let mut out = String::with_capacity(hash.len() * 2);
    for &byte in hash {
        out.push(char::from(DIGITS[usize::from(byte >> 4)]));
        out.push(char::from(DIGITS[usize::from(byte & 0x0f)]));
    }
    out
}
#[cfg(test)]
mod tests {
    use super::*;
    #[test]
    fn safe_join_accepts_plain_relative_paths() {
        let dest = Path::new("/tmp/dest");
        let p = safe_join(dest, "a/b/c.txt").unwrap();
        assert!(p.ends_with("a/b/c.txt") || p.ends_with("a\\b\\c.txt"));
    }
    #[test]
    fn safe_join_rejects_escapes() {
        let dest = Path::new("/tmp/dest");
        assert!(safe_join(dest, "../escape").is_err());
        assert!(safe_join(dest, "a/../../escape").is_err());
        assert!(safe_join(dest, "/abs/path").is_err());
        assert!(safe_join(dest, "").is_err());
    }
    #[test]
    fn safe_join_handles_dot_components() {
        let dest = Path::new("/tmp/dest");
        // A leading `.` surfaces as `Component::CurDir` and is rejected.
        assert!(safe_join(dest, ".").is_err());
        assert!(safe_join(dest, "./a").is_err());
        // Interior `.` and repeated separators are normalised away by
        // `Path::components`.
        assert!(safe_join(dest, "a/./b").unwrap().ends_with("a/b"));
        assert!(safe_join(dest, "a//b").unwrap().ends_with("a/b"));
    }
    #[test]
    fn rel_path_joins_normal_components_with_slash() {
        let root = Path::new("root");
        assert_eq!(rel_path(root, &root.join("x").join("y.txt")), "x/y.txt");
        assert_eq!(rel_path(root, root), "");
    }
    #[test]
    fn hex_is_lowercase_two_digits_per_byte() {
        let mut h = [0u8; 32];
        h[0] = 0xab;
        h[31] = 0x0f;
        let s = hex(&h);
        assert_eq!(s.len(), 64);
        assert!(s.starts_with("ab00"));
        assert!(s.ends_with("000f"));
    }
    #[test]
    fn manifest_round_trips_through_postcard() {
        let manifest = DirManifest {
            version: DIR_MANIFEST_VERSION,
            entries: vec![
                DirEntry {
                    path: "dir".into(),
                    kind: EntryKind::Dir { mode: 0o755 },
                },
                DirEntry {
                    path: "dir/file.txt".into(),
                    kind: EntryKind::File {
                        mode: 0o644,
                        blob: BlobRef::small("mesh://x", [7u8; 32], 3).encode(),
                    },
                },
                DirEntry {
                    path: "link".into(),
                    kind: EntryKind::Symlink {
                        target: "dir/file.txt".into(),
                    },
                },
            ],
        };
        let bytes = postcard::to_allocvec(&manifest).unwrap();
        let decoded: DirManifest = postcard::from_bytes(&bytes).unwrap();
        assert_eq!(decoded, manifest);
        assert_eq!(decoded.file_count(), 1);
    }
}