#![cfg(feature = "dataforts")]
use std::collections::BTreeMap;
use std::net::SocketAddr;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use net::adapter::net::dataforts::blob::{
chunk_payload, BlobAdapter, BlobRef, Encoding, MeshBlobAdapter,
};
use net::adapter::net::dataforts::dir::{
fetch_dir, store_dir, DirEntry, DirManifest, EntryKind, DIR_MANIFEST_VERSION,
};
use net::adapter::net::redex::Redex;
use net::adapter::net::{EntityKeypair, MeshNode, MeshNodeConfig, SocketBufferConfig};
/// Key material passed as the second argument to `MeshNodeConfig::new`; every node
/// built by `test_config` uses this same value (presumably the mesh pre-shared key).
const PSK: [u8; 32] = [0x42u8; 32];
/// Size (16 MiB) used for both the send and the receive socket buffer of each test node.
const SOCKET_BUF: usize = 16 * 1024 * 1024;
/// Builds the `MeshNodeConfig` shared by every node in this file.
///
/// The node binds to `127.0.0.1:0`, uses a 200 ms heartbeat interval, a 15 s
/// session timeout, handshake parameters `(3, 2 s)`, and `SOCKET_BUF`-sized
/// send/receive socket buffers.
fn test_config() -> MeshNodeConfig {
    let bind_addr = "127.0.0.1:0".parse::<SocketAddr>().unwrap();
    let heartbeat = Duration::from_millis(200);
    let session_timeout = Duration::from_secs(15);
    let handshake_duration = Duration::from_secs(2);

    let mut config = MeshNodeConfig::new(bind_addr, PSK)
        .with_heartbeat_interval(heartbeat)
        .with_session_timeout(session_timeout)
        .with_handshake(3, handshake_duration);

    // Same size in both directions.
    config.socket_buffers = SocketBufferConfig {
        send_buffer_size: SOCKET_BUF,
        recv_buffer_size: SOCKET_BUF,
    };
    config
}
/// Creates a `MeshNode` from the shared test config and a freshly generated
/// keypair, wrapped in an `Arc`.
///
/// Panics with the message `MeshNode::new` if construction fails.
async fn build_node() -> Arc<MeshNode> {
    let config = test_config();
    let identity = EntityKeypair::generate();
    let node = MeshNode::new(identity, config)
        .await
        .expect("MeshNode::new");
    Arc::new(node)
}
async fn handshake(a: &Arc<MeshNode>, b: &Arc<MeshNode>) {
let a_id = a.node_id();
let b_id = b.node_id();
let b_pub = *b.public_key();
let b_addr = b.local_addr();
let b_clone = b.clone();
let accept = tokio::spawn(async move { b_clone.accept(a_id).await });
a.connect(b_addr, &b_pub, b_id).await.expect("connect");
accept.await.expect("accept task").expect("accept");
a.start();
b.start();
}
/// A scratch directory under the system temp dir that is removed recursively
/// when the value is dropped.
struct TempDir(PathBuf);

impl TempDir {
    /// Creates a new directory named
    /// `net-dir-xfer-<tag>-<pid>-<sequence>-<nanoseconds since the Unix epoch>`.
    ///
    /// The process-wide sequence counter keeps names distinct even when two
    /// calls observe the same clock reading. Panics if the directory cannot be
    /// created.
    fn new(tag: &str) -> Self {
        static SEQ: AtomicU64 = AtomicU64::new(0);

        let nanos = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_nanos();
        let pid = std::process::id();
        let seq = SEQ.fetch_add(1, Ordering::Relaxed);

        let dir = std::env::temp_dir().join(format!(
            "net-dir-xfer-{tag}-{}-{}-{}",
            pid, seq, nanos
        ));
        std::fs::create_dir_all(&dir).expect("create temp dir");
        TempDir(dir)
    }

    /// Path of the managed directory.
    fn path(&self) -> &Path {
        self.0.as_path()
    }
}

impl Drop for TempDir {
    /// Best-effort recursive removal; errors are deliberately ignored.
    fn drop(&mut self) {
        let _ = std::fs::remove_dir_all(self.0.as_path());
    }
}
/// Reads every regular file below `root` into a map keyed by its path relative
/// to `root`, with `\` normalised to `/`.
///
/// Directories are descended into but produce no entry of their own. Anything
/// that `symlink_metadata` reports as neither a directory nor a regular file
/// (for example a symlink) is skipped. Panics on any I/O error.
fn read_tree(root: &Path) -> BTreeMap<String, Vec<u8>> {
    let mut files = BTreeMap::new();
    // Directories still to visit; the map is ordered by key, so visit order is irrelevant.
    let mut pending = vec![root.to_path_buf()];

    while let Some(dir) = pending.pop() {
        for item in std::fs::read_dir(&dir).unwrap() {
            let path = item.unwrap().path();
            // symlink_metadata does not follow links, so links are never traversed.
            let meta = std::fs::symlink_metadata(&path).unwrap();
            if meta.is_dir() {
                pending.push(path);
                continue;
            }
            if !meta.is_file() {
                continue;
            }
            let key = path
                .strip_prefix(root)
                .unwrap()
                .to_string_lossy()
                .replace('\\', "/");
            let contents = std::fs::read(&path).unwrap();
            files.insert(key, contents);
        }
    }
    files
}
/// Writes `bytes` to `root/rel`, creating any missing parent directories first.
///
/// Panics if the joined path has no parent or if any filesystem call fails.
fn write(root: &Path, rel: &str, bytes: &[u8]) {
    let target = root.join(rel);
    let parent = target.parent().unwrap();
    std::fs::create_dir_all(parent).unwrap();
    std::fs::write(&target, bytes).unwrap();
}
/// Stores a small mixed tree (text file, nested binaries, an empty file and an
/// empty directory) on node A, fetches it from node B, and checks that the
/// destination matches the source byte-for-byte.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn directory_transfer_reconstructs_tree() {
    // Node B initiates the session; node A is the peer the tree is fetched from.
    let provider = build_node().await;
    let fetcher = build_node().await;
    handshake(&fetcher, &provider).await;
    let a_id = provider.node_id();

    let provider_blobs = Arc::new(MeshBlobAdapter::new("a", Arc::new(Redex::new())));
    let fetcher_blobs = Arc::new(MeshBlobAdapter::new("b", Arc::new(Redex::new())));
    provider.serve_blob_transfer(provider_blobs.clone());
    fetcher.serve_blob_transfer(fetcher_blobs);

    // Source tree.
    let src = TempDir::new("src");
    let wrapping_bytes: Vec<u8> = (0..5000u32).map(|i| i as u8).collect();
    let patterned_bytes: Vec<u8> = (0..50_000u32).map(|i| (i % 251) as u8).collect();
    write(src.path(), "readme.txt", b"hello directory transfer");
    write(src.path(), "a/one.bin", &wrapping_bytes);
    write(src.path(), "a/b/two.bin", &patterned_bytes);
    write(src.path(), "a/b/empty.dat", b"");
    std::fs::create_dir_all(src.path().join("c/empty_dir")).unwrap();

    let manifest_ref = store_dir(&provider_blobs, src.path())
        .await
        .expect("store_dir");

    // NOTE(review): the trailing `0` is presumably "default concurrency" — confirm against `fetch_dir`.
    let dest = TempDir::new("dest");
    let stats = fetch_dir(&fetcher, a_id, &manifest_ref, dest.path(), 0)
        .await
        .expect("fetch_dir");

    let expected = read_tree(src.path());
    let actual = read_tree(dest.path());
    assert_eq!(
        actual, expected,
        "reconstructed tree must match source byte-for-byte"
    );
    assert_eq!(stats.files, expected.len(), "stats.files matches file count");

    // `read_tree` only reports files, so the empty directory is checked separately.
    let recreated_dir = dest.path().join("c/empty_dir");
    assert!(recreated_dir.is_dir(), "empty directory must be recreated");
}
/// Transfers 200 small text files spread over 12 `pkgN/` directories and checks
/// both the reported file count and the reconstructed contents.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn directory_transfer_many_small_files() {
    let provider = build_node().await;
    let fetcher = build_node().await;
    handshake(&fetcher, &provider).await;
    let a_id = provider.node_id();

    let provider_blobs = Arc::new(MeshBlobAdapter::new("a", Arc::new(Redex::new())));
    let fetcher_blobs = Arc::new(MeshBlobAdapter::new("b", Arc::new(Redex::new())));
    provider.serve_blob_transfer(provider_blobs.clone());
    fetcher.serve_blob_transfer(fetcher_blobs);

    let src = TempDir::new("manysrc");
    let n = 200usize;
    (0..n).for_each(|i| {
        // Body length varies with `i % 64`, so the files are not all the same size.
        let body = format!("file {i} contents — {}", "x".repeat(i % 64));
        let rel = format!("pkg{}/mod{}.js", i % 12, i);
        write(src.path(), &rel, body.as_bytes());
    });

    let manifest_ref = store_dir(&provider_blobs, src.path())
        .await
        .expect("store_dir");

    let dest = TempDir::new("manydest");
    let stats = fetch_dir(&fetcher, a_id, &manifest_ref, dest.path(), 0)
        .await
        .expect("fetch_dir");

    assert_eq!(stats.files, n, "all {n} files transferred");
    let fetched = read_tree(dest.path());
    let original = read_tree(src.path());
    assert_eq!(fetched, original, "trees match");
}
/// Transfers six 4 MiB files in one fetch, passing `16` as the final
/// `fetch_dir` argument, and checks the file count and the contents.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn directory_transfer_many_large_files() {
    const FILE_LEN: usize = 4 * 1024 * 1024;

    let provider = build_node().await;
    let fetcher = build_node().await;
    handshake(&fetcher, &provider).await;
    let a_id = provider.node_id();

    let provider_blobs = Arc::new(MeshBlobAdapter::new("a", Arc::new(Redex::new())));
    let fetcher_blobs = Arc::new(MeshBlobAdapter::new("b", Arc::new(Redex::new())));
    provider.serve_blob_transfer(provider_blobs.clone());
    fetcher.serve_blob_transfer(fetcher_blobs);

    let src = TempDir::new("biglots-src");
    let n = 6usize;
    for i in 0..n {
        // Offsetting the pattern by `i` makes every artifact's contents distinct.
        let mut body: Vec<u8> = Vec::with_capacity(FILE_LEN);
        body.extend((0..FILE_LEN).map(|j| ((j + i) % 251) as u8));
        write(src.path(), &format!("artifact{i}.bin"), &body);
    }

    let manifest_ref = store_dir(&provider_blobs, src.path())
        .await
        .expect("store_dir");

    let dest = TempDir::new("biglots-dest");
    let stats = fetch_dir(&fetcher, a_id, &manifest_ref, dest.path(), 16)
        .await
        .expect("fetch_dir (many large files)");

    assert_eq!(stats.files, n, "all {n} large files transferred");
    let fetched = read_tree(dest.path());
    let original = read_tree(src.path());
    assert_eq!(fetched, original, "trees match");
}
/// Populates `root` with a synthetic `node_modules` tree of `packages` packages
/// and returns `(regular file count, total bytes written)`.
///
/// Each package gets `package.json`, `index.js`, `README.md` and 3–7
/// `lib/mod{f}.js` files. Every seventh package (starting with package 0) also
/// gets `dist/bundle.min.js` and a nested `node_modules/dep{p}/index.js`. File
/// sizes are deterministic functions of the package index. On Unix a
/// `node_modules/.bin/cli` symlink is attempted as well; it is best-effort and
/// is not included in the returned counts.
fn gen_node_modules(root: &Path, packages: usize) -> (usize, u64) {
    /// Deterministic filler: byte `i` is `(i + seed) % 251`.
    fn filler(len: usize, seed: u8) -> Vec<u8> {
        let offset = seed as usize;
        (0..len).map(|i| ((i + offset) % 251) as u8).collect()
    }

    // First lay out what to write as (relative path, length, seed) ...
    let mut plan: Vec<(String, usize, u8)> = Vec::new();
    for p in 0..packages {
        let pkg = format!("node_modules/pkg{p:04}");
        plan.push((format!("{pkg}/package.json"), 200 + (p % 400), 1));
        plan.push((format!("{pkg}/index.js"), 800 + (p * 7 % 4000), 2));
        plan.push((format!("{pkg}/README.md"), 500 + (p % 1500), 3));

        let lib_modules = 3 + p % 5;
        for f in 0..lib_modules {
            plan.push((
                format!("{pkg}/lib/mod{f}.js"),
                300 + (p * f % 6000),
                (f + 4) as u8,
            ));
        }

        if p % 7 != 0 {
            continue;
        }
        plan.push((
            format!("{pkg}/dist/bundle.min.js"),
            40_000 + (p % 20_000),
            9,
        ));
        plan.push((
            format!("{pkg}/node_modules/dep{p}/index.js"),
            600 + (p % 2000),
            11,
        ));
    }

    // ... then write the files in that same order while tallying the totals.
    let mut files = 0usize;
    let mut bytes = 0u64;
    for (rel, len, seed) in plan {
        write(root, &rel, &filler(len, seed));
        files += 1;
        bytes += len as u64;
    }

    #[cfg(unix)]
    {
        // Failures are ignored on purpose: the link is an extra, not part of the counts.
        let link = root.join("node_modules/.bin/cli");
        let _ = std::fs::create_dir_all(link.parent().unwrap());
        let _ = std::os::unix::fs::symlink("../pkg0000/index.js", &link);
    }

    (files, bytes)
}
/// Ignored-by-default benchmark: generates a 1500-package synthetic
/// `node_modules` tree, times `store_dir` and `fetch_dir`, prints throughput
/// figures, and verifies the file count, byte total and contents.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[ignore = "scale benchmark — run with --ignored --nocapture"]
async fn bench_nodemodules_scale() {
    const PACKAGES: usize = 1500;
    const CONCURRENCY: usize = 16;

    let provider = build_node().await;
    let fetcher = build_node().await;
    handshake(&fetcher, &provider).await;
    let a_id = provider.node_id();

    let provider_blobs = Arc::new(MeshBlobAdapter::new("a", Arc::new(Redex::new())));
    let fetcher_blobs = Arc::new(MeshBlobAdapter::new("b", Arc::new(Redex::new())));
    provider.serve_blob_transfer(provider_blobs.clone());
    fetcher.serve_blob_transfer(fetcher_blobs);

    let src = TempDir::new("nm-src");
    let (file_count, total_bytes) = gen_node_modules(src.path(), PACKAGES);
    let mib = total_bytes as f64 / (1024.0 * 1024.0);

    // Time the store phase.
    let store_timer = Instant::now();
    let manifest_ref = store_dir(&provider_blobs, src.path())
        .await
        .expect("store_dir");
    let store_elapsed = store_timer.elapsed();

    // Time the fetch phase; creating the destination is kept outside the measurement.
    let dest = TempDir::new("nm-dest");
    let xfer_timer = Instant::now();
    let stats = fetch_dir(&fetcher, a_id, &manifest_ref, dest.path(), CONCURRENCY)
        .await
        .expect("fetch_dir");
    let xfer_elapsed = xfer_timer.elapsed();

    println!("── node_modules-scale transfer ──");
    println!(" tree: {file_count} files, {mib:.1} MiB (deep-nested, mixed sizes)");
    println!(" store: {store_elapsed:?}");
    println!(
        " transfer: {xfer_elapsed:?} = {:.1} MiB/s, {:.0} files/s",
        mib / xfer_elapsed.as_secs_f64(),
        stats.files as f64 / xfer_elapsed.as_secs_f64()
    );
    println!(
        " stats: {} files, {} dirs, {} bytes",
        stats.files, stats.dirs, stats.bytes
    );

    assert_eq!(stats.files, file_count, "every file transferred");
    assert_eq!(stats.bytes, total_bytes, "byte total matches");
    let fetched = read_tree(dest.path());
    let original = read_tree(src.path());
    assert_eq!(
        fetched, original,
        "reconstructed node_modules matches source byte-for-byte",
    );
}
/// Ignored-by-default benchmark comparing transfer throughput for the same
/// 8 MiB volume split into a few large files versus many small files.
///
/// Only prints the result; the 0.80 ratio target is reported, not asserted.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[ignore = "scale benchmark — run with --ignored --nocapture"]
async fn bench_throughput_invariance() {
    const KIB: usize = 1024;
    const MIB: usize = 1024 * 1024;
    const VOLUME: usize = 8 * MIB;
    const CONCURRENCY: usize = 8;

    /// Runs one arm on a fresh pair of nodes: writes `files` files of `file_len`
    /// bytes each, stores them with `cap` passed to
    /// `with_chunk_file_max_memory_bytes`, fetches them, and returns the measured
    /// MiB/s of the fetch alone.
    async fn run_arm(label: &str, files: usize, file_len: usize, cap: usize) -> f64 {
        let provider = build_node().await;
        let fetcher = build_node().await;
        handshake(&fetcher, &provider).await;
        let a_id = provider.node_id();

        let provider_blobs = Arc::new(
            MeshBlobAdapter::new("a", Arc::new(Redex::new()))
                .with_chunk_file_max_memory_bytes(cap),
        );
        let fetcher_blobs = Arc::new(MeshBlobAdapter::new("b", Arc::new(Redex::new())));
        provider.serve_blob_transfer(provider_blobs.clone());
        fetcher.serve_blob_transfer(fetcher_blobs);

        // Files are spread over 32 `dN/` directories.
        let src = TempDir::new("inv-src");
        for f in 0..files {
            let body: Vec<u8> = (0..file_len).map(|i| ((i + f) % 251) as u8).collect();
            let rel = format!("d{}/f{f}.bin", f % 32);
            write(src.path(), &rel, &body);
        }

        let manifest_ref = store_dir(&provider_blobs, src.path())
            .await
            .expect("store_dir");

        let dest = TempDir::new("inv-dest");
        let timer = Instant::now();
        let stats = fetch_dir(&fetcher, a_id, &manifest_ref, dest.path(), CONCURRENCY)
            .await
            .expect("fetch_dir");
        let elapsed = timer.elapsed();

        let mib = stats.bytes as f64 / (1024.0 * 1024.0);
        let rate = mib / elapsed.as_secs_f64();
        println!(" {label}: {files} files × {file_len} B = {mib:.1} MiB in {elapsed:?} = {rate:.2} MiB/s");
        rate
    }

    println!(
        "── throughput invariance (equal {} MiB volume) ──",
        VOLUME / MIB
    );

    let large_len = 4 * MIB;
    let low = run_arm("few-large ", VOLUME / large_len, large_len, 5 * MIB).await;

    let small_len = 8 * KIB;
    let high = run_arm("many-small", VOLUME / small_len, small_len, 256 * KIB).await;

    let ratio = high / low;
    println!(" invariance ratio (many-small / few-large): {ratio:.2} (plan target ≥ 0.80)");
    if ratio >= 0.80 {
        println!(" ✓ throughput scales with volume, not file count");
    } else {
        println!(
            " ✗ per-file overhead is significant: many-small is {:.0}% of few-large throughput",
            ratio * 100.0
        );
    }
}
/// Transfers a tree containing one 9 MiB file (asserted to be larger than
/// `BLOB_CHUNK_SIZE_BYTES`, i.e. more than one chunk) next to a small sibling,
/// then checks the file count, byte total and reconstructed contents.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn directory_transfer_large_multichunk_file() {
    use net::adapter::net::dataforts::blob::BLOB_CHUNK_SIZE_BYTES;

    // Single source of truth for the sibling's contents, so the expected byte
    // total below cannot drift from what is actually written.
    const SMALL: &[u8] = b"a small sibling";

    let node_a = build_node().await;
    let node_b = build_node().await;
    handshake(&node_b, &node_a).await;
    let a_id = node_a.node_id();

    let redex_a = Arc::new(Redex::new());
    let adapter_a = Arc::new(MeshBlobAdapter::new("a", redex_a));
    let redex_b = Arc::new(Redex::new());
    let adapter_b = Arc::new(MeshBlobAdapter::new("b", redex_b));
    node_a.serve_blob_transfer(adapter_a.clone());
    node_b.serve_blob_transfer(adapter_b);

    let big_len = 9 * 1024 * 1024usize;
    assert!(
        big_len as u64 > BLOB_CHUNK_SIZE_BYTES,
        "must exceed one chunk"
    );
    let big: Vec<u8> = (0..big_len).map(|i| (i % 251) as u8).collect();

    let src = TempDir::new("bigsrc");
    write(src.path(), "small.txt", SMALL);
    write(src.path(), "data/big.bin", &big);
    let manifest_ref = store_dir(&adapter_a, src.path()).await.expect("store_dir");

    let dest = TempDir::new("bigdest");
    let stats = fetch_dir(&node_b, a_id, &manifest_ref, dest.path(), 0)
        .await
        .expect("fetch_dir");

    assert_eq!(stats.files, 2, "both files transferred");
    assert_eq!(stats.bytes, (big_len + SMALL.len()) as u64, "byte total");

    // Read the destination once and reuse it for both checks instead of
    // pulling the 9 MiB tree off disk a second time.
    let got = read_tree(dest.path());
    assert_eq!(got.get("data/big.bin").map(|v| v.len()), Some(big_len));
    assert_eq!(
        got,
        read_tree(src.path()),
        "multi-chunk file + sibling reconstruct byte-for-byte"
    );
}
/// Renders a 32-byte value as 64 lowercase hexadecimal characters.
fn hex32(h: &[u8; 32]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";

    let mut out = String::with_capacity(64);
    for &byte in h.iter() {
        // High nibble first, then low nibble.
        out.push(char::from(DIGITS[usize::from(byte >> 4)]));
        out.push(char::from(DIGITS[usize::from(byte & 0x0f)]));
    }
    out
}
/// Serialises `manifest` with postcard, stores the bytes in `adapter` under a
/// `mesh://<blake3 hex>` URI with `Encoding::Replicated`, and returns the
/// resulting `BlobRef`.
///
/// Lets tests publish hand-built manifests without going through `store_dir`.
/// Panics if serialisation, chunking, reference construction or the store fails.
async fn store_manifest(adapter: &MeshBlobAdapter, manifest: &DirManifest) -> BlobRef {
    let encoded = postcard::to_allocvec(manifest).unwrap();
    let digest: [u8; 32] = blake3::hash(&encoded).into();
    let uri = format!("mesh://{}", hex32(&digest));

    let blob_ref = chunk_payload(&encoded)
        .unwrap()
        .into_blob_ref(uri, Encoding::Replicated)
        .unwrap();

    adapter.store(&blob_ref, &encoded).await.unwrap();
    blob_ref
}
/// Lists the names of entries directly inside `parent` that start with
/// `.{base}.fetch_` or `.{base}.replaced_`.
///
/// The tests use this to look for leftover staging or backup entries next to a
/// destination named `base`. An unreadable or missing `parent` yields an empty
/// list, and entries that fail to read are skipped.
fn temp_orphans(parent: &Path, base: &str) -> Vec<String> {
    let prefixes = [format!(".{base}.fetch_"), format!(".{base}.replaced_")];

    let entries = match std::fs::read_dir(parent) {
        Ok(entries) => entries,
        Err(_) => return Vec::new(),
    };

    entries
        .flatten()
        .map(|entry| entry.file_name().to_string_lossy().into_owned())
        .filter(|name| prefixes.iter().any(|p| name.starts_with(p.as_str())))
        .collect()
}
/// Fetches tree v1 (`a.txt`, `b.txt`) into a destination and then tree v2 (only
/// `a.txt`, new contents) into the same destination.
///
/// Afterwards the stale `b.txt` must be gone, `a.txt` must hold the v2 bytes,
/// and no `.dest.fetch_*` / `.dest.replaced_*` entries may remain next to the
/// destination.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn fetch_dir_replaces_and_removes_stale_files() {
    let provider = build_node().await;
    let fetcher = build_node().await;
    handshake(&fetcher, &provider).await;
    let a_id = provider.node_id();

    let provider_blobs = Arc::new(MeshBlobAdapter::new("a", Arc::new(Redex::new())));
    provider.serve_blob_transfer(provider_blobs.clone());
    let fetcher_blobs = Arc::new(MeshBlobAdapter::new("b", Arc::new(Redex::new())));
    fetcher.serve_blob_transfer(fetcher_blobs);

    // Version 1: two files.
    let src1 = TempDir::new("src1");
    write(src1.path(), "a.txt", b"A1");
    write(src1.path(), "b.txt", b"B1");
    let manifest_v1 = store_dir(&provider_blobs, src1.path())
        .await
        .expect("store v1");

    // Version 2: `b.txt` is gone and `a.txt` has changed.
    let src2 = TempDir::new("src2");
    write(src2.path(), "a.txt", b"A2");
    let manifest_v2 = store_dir(&provider_blobs, src2.path())
        .await
        .expect("store v2");

    // The destination lives inside its own parent so sibling leftovers can be inspected.
    let parent = TempDir::new("parent");
    let dest = parent.path().join("dest");

    fetch_dir(&fetcher, a_id, &manifest_v1, &dest, 0)
        .await
        .expect("fetch v1");
    let v1_names: Vec<String> = read_tree(&dest).keys().cloned().collect();
    assert_eq!(
        v1_names,
        vec!["a.txt".to_string(), "b.txt".to_string()],
        "v1 has both files"
    );

    fetch_dir(&fetcher, a_id, &manifest_v2, &dest, 0)
        .await
        .expect("fetch v2");
    let got = read_tree(&dest);
    let v2_names: Vec<String> = got.keys().cloned().collect();
    assert_eq!(
        v2_names,
        vec!["a.txt".to_string()],
        "stale b.txt removed by the atomic replace"
    );
    assert_eq!(
        got.get("a.txt").map(|bytes| bytes.as_slice()),
        Some(&b"A2"[..]),
        "new content"
    );

    let leftovers = temp_orphans(parent.path(), "dest");
    assert!(
        leftovers.is_empty(),
        "no temp/backup orphans after a successful replace"
    );
}
/// A failed fetch into an already-populated destination must leave that
/// destination untouched.
///
/// The destination is seeded from a good manifest. The test then fetches a
/// hand-built manifest whose only entry has the path `../escape.txt`. It
/// requires that the fetch fails, that the destination tree is unchanged, that
/// nothing was written to `escape.txt` beside the destination, and that no
/// `.dest.fetch_*` / `.dest.replaced_*` leftovers remain.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn fetch_dir_failure_preserves_existing_dest() {
    let node_a = build_node().await;
    let node_b = build_node().await;
    handshake(&node_b, &node_a).await;
    let a_id = node_a.node_id();

    let adapter_a = Arc::new(MeshBlobAdapter::new("a", Arc::new(Redex::new())));
    node_a.serve_blob_transfer(adapter_a.clone());
    node_b.serve_blob_transfer(Arc::new(MeshBlobAdapter::new("b", Arc::new(Redex::new()))));

    // Seed the destination with a known-good tree.
    let src = TempDir::new("src");
    write(src.path(), "keep.txt", b"original");
    let m_good = store_dir(&adapter_a, src.path()).await.expect("store good");

    let parent = TempDir::new("parent");
    let dest = parent.path().join("dest");
    fetch_dir(&node_b, a_id, &m_good, &dest, 0)
        .await
        .expect("seed dest");
    let before = read_tree(&dest);

    // Manifest with a single entry that tries to climb out of the destination.
    let bad = DirManifest {
        version: DIR_MANIFEST_VERSION,
        entries: vec![DirEntry {
            path: "../escape.txt".into(),
            kind: EntryKind::File {
                mode: 0o644,
                blob: BlobRef::small("mesh://x", [0xAB; 32], 4).encode(),
            },
        }],
    };
    let m_bad = store_manifest(&adapter_a, &bad).await;

    let err = fetch_dir(&node_b, a_id, &m_bad, &dest, 0).await;
    assert!(err.is_err(), "unsafe manifest path must fail the fetch");
    assert_eq!(read_tree(&dest), before, "dest unchanged after the failure");

    // `..` from either `dest` or a staging directory beside it resolves to
    // `parent`, so this is where an escaped write would land.
    assert!(
        !parent.path().join("escape.txt").exists(),
        "traversal entry must not be written outside dest"
    );
    assert!(
        temp_orphans(parent.path(), "dest").is_empty(),
        "temp tree cleaned up on failure"
    );
}
/// A failed fetch into a destination that does not exist yet must not create it.
///
/// The test fetches a hand-built manifest whose only entry has the path
/// `../escape.txt`. It requires that the fetch fails, that the destination
/// still does not exist, that nothing was written to `escape.txt` beside it,
/// and that no `.never.fetch_*` / `.never.replaced_*` leftovers remain.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn fetch_dir_failure_does_not_create_dest() {
    let node_a = build_node().await;
    let node_b = build_node().await;
    handshake(&node_b, &node_a).await;
    let a_id = node_a.node_id();

    let adapter_a = Arc::new(MeshBlobAdapter::new("a", Arc::new(Redex::new())));
    node_a.serve_blob_transfer(adapter_a.clone());
    node_b.serve_blob_transfer(Arc::new(MeshBlobAdapter::new("b", Arc::new(Redex::new()))));

    // Manifest with a single entry that tries to climb out of the destination.
    let bad = DirManifest {
        version: DIR_MANIFEST_VERSION,
        entries: vec![DirEntry {
            path: "../escape.txt".into(),
            kind: EntryKind::File {
                mode: 0o644,
                blob: BlobRef::small("mesh://x", [0xCD; 32], 4).encode(),
            },
        }],
    };
    let m_bad = store_manifest(&adapter_a, &bad).await;

    let parent = TempDir::new("parent");
    let dest = parent.path().join("never");
    assert!(fetch_dir(&node_b, a_id, &m_bad, &dest, 0).await.is_err());
    assert!(!dest.exists(), "dest must not exist after a failed fetch");

    // `..` from either `dest` or a staging directory beside it resolves to
    // `parent`, so this is where an escaped write would land.
    assert!(
        !parent.path().join("escape.txt").exists(),
        "traversal entry must not be written outside dest"
    );
    assert!(
        temp_orphans(parent.path(), "never").is_empty(),
        "no temp orphan after a failed fetch into a fresh dest"
    );
}
/// Runs two fetches concurrently into sibling destinations (`parent/a` and
/// `parent/b`) and checks that each ends up with its own tree and that neither
/// leaves temp or backup entries in the shared parent.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn concurrent_fetch_into_adjacent_dests() {
    let provider = build_node().await;
    let fetcher = build_node().await;
    handshake(&fetcher, &provider).await;
    let a_id = provider.node_id();

    let provider_blobs = Arc::new(MeshBlobAdapter::new("a", Arc::new(Redex::new())));
    provider.serve_blob_transfer(provider_blobs.clone());
    let fetcher_blobs = Arc::new(MeshBlobAdapter::new("b", Arc::new(Redex::new())));
    fetcher.serve_blob_transfer(fetcher_blobs);

    // Two unrelated single-file trees.
    let src1 = TempDir::new("src1");
    write(src1.path(), "x.txt", b"tree-one");
    let manifest_one = store_dir(&provider_blobs, src1.path())
        .await
        .expect("store 1");

    let src2 = TempDir::new("src2");
    write(src2.path(), "y.txt", b"tree-two");
    let manifest_two = store_dir(&provider_blobs, src2.path())
        .await
        .expect("store 2");

    // Both destinations share one parent directory.
    let parent = TempDir::new("parent");
    let dest_a = parent.path().join("a");
    let dest_b = parent.path().join("b");

    let (first, second) = tokio::join!(
        fetch_dir(&fetcher, a_id, &manifest_one, &dest_a, 0),
        fetch_dir(&fetcher, a_id, &manifest_two, &dest_b, 0),
    );
    first.expect("fetch a");
    second.expect("fetch b");

    assert_eq!(read_tree(&dest_a), read_tree(src1.path()), "dest_a == src1");
    assert_eq!(read_tree(&dest_b), read_tree(src2.path()), "dest_b == src2");
    for base in ["a", "b"].iter() {
        assert!(temp_orphans(parent.path(), base).is_empty());
    }
}