#![cfg(feature = "dataforts")]
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use net::adapter::net::dataforts::blob::{BlobAdapter, BlobRef, MeshBlobAdapter};
use net::adapter::net::redex::Redex;
use net::adapter::net::{EntityKeypair, MeshNode, MeshNodeConfig, SocketBufferConfig};
/// 32-byte key (every byte `0x42`) handed to `MeshNodeConfig::new` for each
/// test node, so all nodes in a test share the same value.
const PSK: [u8; 32] = [0x42; 32];
/// Size in bytes (8 MiB) applied to both the send and the receive socket
/// buffer of every test node.
const SOCKET_BUF: usize = 8 << 20;
/// Builds the `MeshNodeConfig` used by every node in these tests.
///
/// The node binds to `127.0.0.1` on port `0`, is keyed with `PSK`, and gets
/// `SOCKET_BUF`-sized send and receive socket buffers.
fn test_config() -> MeshNodeConfig {
    // Loopback with port 0; the bound address is read back later through
    // `local_addr()` when peers connect.
    let bind_addr = SocketAddr::from(([127, 0, 0, 1], 0));
    let buffers = SocketBufferConfig {
        send_buffer_size: SOCKET_BUF,
        recv_buffer_size: SOCKET_BUF,
    };
    // NOTE(review): `with_handshake(3, 2s)` presumably means 3 attempts with a
    // 2-second timeout each — confirm against `MeshNodeConfig`.
    let mut config = MeshNodeConfig::new(bind_addr, PSK)
        .with_heartbeat_interval(Duration::from_millis(200))
        .with_session_timeout(Duration::from_secs(15))
        .with_handshake(3, Duration::from_secs(2));
    config.socket_buffers = buffers;
    config
}
/// Creates a mesh node with the shared test configuration and a freshly
/// generated keypair, wrapped in an `Arc` so it can be shared across tasks.
///
/// # Panics
///
/// Panics with `"MeshNode::new"` if node construction fails.
async fn build_node() -> Arc<MeshNode> {
    let config = test_config();
    let identity = EntityKeypair::generate();
    let node = MeshNode::new(identity, config)
        .await
        .expect("MeshNode::new");
    Arc::new(node)
}
async fn handshake(a: &Arc<MeshNode>, b: &Arc<MeshNode>) {
let a_id = a.node_id();
let b_id = b.node_id();
let b_pub = *b.public_key();
let b_addr = b.local_addr();
let b_clone = b.clone();
let accept = tokio::spawn(async move { b_clone.accept(a_id).await });
a.connect(b_addr, &b_pub, b_id).await.expect("connect");
accept.await.expect("accept task").expect("accept");
a.start();
b.start();
}
/// Produces `len` bytes of a deterministic pattern: the values `0..=250`
/// repeated in order, i.e. byte `i` equals `i % 251`.
fn payload(len: usize) -> Vec<u8> {
    (0..=250u8).cycle().take(len).collect()
}
/// Hashes `bytes` with BLAKE3 and returns that 32-byte digest together with a
/// `BlobRef::small` built from the fixed label `"mesh://transfer"`, the
/// digest, and the byte length.
fn small_ref(bytes: &[u8]) -> ([u8; 32], BlobRef) {
    let digest: [u8; 32] = blake3::hash(bytes).into();
    let size = bytes.len() as u64;
    let reference = BlobRef::small("mesh://transfer", digest, size);
    (digest, reference)
}
/// Round-trips a 50 000-byte chunk: node A stores it, node B fetches it from
/// A by hash, and the fetched bytes must equal the original payload.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn transfer_fetch_chunk_round_trip() {
    let node_a = build_node().await;
    let node_b = build_node().await;
    // B is the connecting side, A the accepting side.
    handshake(&node_b, &node_a).await;
    let a_id = node_a.node_id();

    // Each node gets its own adapter backed by its own `Redex`.
    let redex_a = Arc::new(Redex::new());
    let adapter_a = Arc::new(MeshBlobAdapter::new("a", redex_a));
    let redex_b = Arc::new(Redex::new());
    let adapter_b = Arc::new(MeshBlobAdapter::new("b", redex_b));
    // A's adapter is cloned because the test stores into it below.
    node_a.serve_blob_transfer(adapter_a.clone());
    // B's adapter is not used again, so it is handed over without a clone.
    node_b.serve_blob_transfer(adapter_b);

    // Only A holds the chunk.
    let bytes = payload(50_000);
    let (hash, blob_ref) = small_ref(&bytes);
    adapter_a.store(&blob_ref, &bytes).await.expect("A store");

    let got = node_b
        .transfer_fetch_chunk(a_id, hash)
        .await
        .expect("B transfer_fetch_chunk");
    assert_eq!(
        got.as_ref(),
        bytes.as_slice(),
        "transferred chunk must match byte-for-byte",
    );
}
/// Fetches a 2 MiB chunk from the node that stores it and checks both the
/// length and the content of the result.
///
/// NOTE(review): going by the test name, the size is meant to be large
/// enough to force a flow-control window refill; the window size itself is
/// not visible in this file — confirm.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn transfer_fetch_chunk_large_exercises_window_refill() {
    let holder = build_node().await;
    let fetcher = build_node().await;
    // The fetcher is the connecting side, the holder the accepting side.
    handshake(&fetcher, &holder).await;
    let holder_id = holder.node_id();

    let holder_adapter = Arc::new(MeshBlobAdapter::new("a", Arc::new(Redex::new())));
    let fetcher_adapter = Arc::new(MeshBlobAdapter::new("b", Arc::new(Redex::new())));
    holder.serve_blob_transfer(holder_adapter.clone());
    fetcher.serve_blob_transfer(fetcher_adapter);

    // Only the holder's adapter contains the chunk.
    let bytes = payload(2 * 1024 * 1024);
    let (hash, blob_ref) = small_ref(&bytes);
    holder_adapter
        .store(&blob_ref, &bytes)
        .await
        .expect("A store");

    let fetched = fetcher.transfer_fetch_chunk(holder_id, hash).await;
    let got = fetched.expect("B transfer_fetch_chunk (2 MiB)");
    assert_eq!(got.len(), bytes.len(), "length match");
    assert_eq!(got.as_ref(), bytes.as_slice(), "2 MiB chunk byte-for-byte");
}
/// With B connected to both A and C, and only A storing the chunk, B's
/// discovery-based fetch (no peer id supplied) must return the chunk bytes.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn transfer_fetch_chunk_discovered_finds_the_holder() {
    let node_a = build_node().await;
    let node_b = build_node().await;
    let node_c = build_node().await;
    // B is the connecting side towards both A and C.
    // NOTE(review): `handshake` calls `start()` on both nodes, so B is
    // started twice here — assumes `start()` is idempotent; confirm.
    handshake(&node_b, &node_a).await;
    handshake(&node_b, &node_c).await;

    let redex_a = Arc::new(Redex::new());
    let adapter_a = Arc::new(MeshBlobAdapter::new("a", redex_a));
    let redex_b = Arc::new(Redex::new());
    let adapter_b = Arc::new(MeshBlobAdapter::new("b", redex_b));
    let redex_c = Arc::new(Redex::new());
    let adapter_c = Arc::new(MeshBlobAdapter::new("c", redex_c));
    // A's adapter is cloned because the test stores into it below.
    node_a.serve_blob_transfer(adapter_a.clone());
    node_b.serve_blob_transfer(adapter_b);
    node_c.serve_blob_transfer(adapter_c);

    // Only A holds the chunk; C serves an empty adapter.
    let bytes = payload(40_000);
    let (hash, blob_ref) = small_ref(&bytes);
    adapter_a.store(&blob_ref, &bytes).await.expect("A store");

    // The call takes only the hash, so B has to locate the holder itself.
    let got = node_b
        .transfer_fetch_chunk_discovered(hash)
        .await
        .expect("B discovers + fetches from whichever peer holds it");
    assert_eq!(
        got.as_ref(),
        bytes.as_slice(),
        "discovered chunk must match byte-for-byte",
    );
}
/// When no node has stored the chunk, the discovery-based fetch must fail
/// with `BlobError::NotFound`.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn transfer_fetch_chunk_discovered_none_hold_it() {
    use net::adapter::net::dataforts::blob::BlobError;

    let peer = build_node().await;
    let fetcher = build_node().await;
    // The fetcher is the connecting side, the peer the accepting side.
    handshake(&fetcher, &peer).await;

    let peer_adapter = Arc::new(MeshBlobAdapter::new("a", Arc::new(Redex::new())));
    let fetcher_adapter = Arc::new(MeshBlobAdapter::new("b", Arc::new(Redex::new())));
    peer.serve_blob_transfer(peer_adapter);
    fetcher.serve_blob_transfer(fetcher_adapter);

    // The hash is computed but the bytes are never stored in any adapter.
    let bytes = payload(2048);
    let (hash, _) = small_ref(&bytes);

    let outcome = fetcher.transfer_fetch_chunk_discovered(hash).await;
    let err = outcome.expect_err("no peer holds it");
    assert!(matches!(err, BlobError::NotFound(_)), "got {err:?}");
}
/// Fetching a chunk directly from a peer that never stored it must fail with
/// `BlobError::NotFound`.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn transfer_fetch_chunk_not_found() {
    use net::adapter::net::dataforts::blob::BlobError;

    let peer = build_node().await;
    let fetcher = build_node().await;
    // The fetcher is the connecting side, the peer the accepting side.
    handshake(&fetcher, &peer).await;
    let peer_id = peer.node_id();

    let peer_adapter = Arc::new(MeshBlobAdapter::new("a", Arc::new(Redex::new())));
    let fetcher_adapter = Arc::new(MeshBlobAdapter::new("b", Arc::new(Redex::new())));
    peer.serve_blob_transfer(peer_adapter);
    fetcher.serve_blob_transfer(fetcher_adapter);

    // The hash is computed but the bytes are never stored in any adapter.
    let bytes = payload(1024);
    let (hash, _) = small_ref(&bytes);

    let outcome = fetcher.transfer_fetch_chunk(peer_id, hash).await;
    let err = outcome.expect_err("unheld chunk must error");
    assert!(matches!(err, BlobError::NotFound(_)), "got {err:?}");
}