use std::net::SocketAddr;
use thenodes::network::peer_manager::PeerManager;
use tokio::sync::mpsc;
/// Returns the IPv4 loopback socket address `127.0.0.1:<port>`.
fn addr_local(port: u16) -> SocketAddr {
    // Build the address from its octets directly; no string parsing needed.
    SocketAddr::from(([127, 0, 0, 1], port))
}
/// A frame enqueued for `nodeB` before that peer has been added must be
/// forwarded on the peer's channel once `add_peer` registers it.
#[tokio::test]
async fn relay_store_forward_delivers_on_connect() {
    let pm = PeerManager::new();
    // NOTE(review): presumably this authorises relaying from nodeA to nodeB;
    // the meaning of `true, None, None` is defined by `PeerManager::set_binding`.
    pm.set_binding("nodeA", "nodeB", true, None, None).await;

    // Queue the frame while no peer with id "nodeB" is registered yet.
    pm.enqueue_store_forward(
        "nodeB",
        "{\"dummy\":1}".to_string(),
        None,
        false,
        false,
        None,
    )
    .await;

    let (tx, mut rx) = mpsc::channel::<String>(8);
    let addr_b = addr_local(10001);
    pm.add_peer(addr_b, tx, "nodeB".to_string()).await.unwrap();

    // Bound the wait. If the manager keeps `tx` alive and never forwards the
    // frame, a bare `recv()` would block forever and hang the test run instead
    // of reporting a failure.
    let got = tokio::time::timeout(std::time::Duration::from_secs(1), rx.recv())
        .await
        .expect("timed out waiting for the forwarded frame")
        .expect("expected one forwarded frame");
    assert!(got.contains("dummy"));
}
/// A frame enqueued with an expiry that has already passed by the time the
/// target peer is added must not be delivered.
#[tokio::test]
async fn relay_store_forward_respects_expiry() {
    use std::time::{Duration, SystemTime, UNIX_EPOCH};

    let pm = PeerManager::new();

    // The expiry passed below is "whole seconds since the Unix epoch", set one
    // second into the future.
    let unix_now = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_secs();
    let expires = unix_now + 1;

    pm.set_binding("nodeA", "nodeB", true, Some(expires), None)
        .await;
    let payload = "{\"ttl\":1}".to_string();
    pm.enqueue_store_forward("nodeB", payload, Some(expires), false, false, None)
        .await;

    // Wait in real time until the expiry is safely in the past.
    tokio::time::sleep(Duration::from_secs(2)).await;

    let (tx, mut rx) = mpsc::channel::<String>(8);
    let addr_b = addr_local(10002);
    pm.add_peer(addr_b, tx, "nodeB".to_string()).await.unwrap();

    // Nothing should arrive: the receive is expected to time out.
    let outcome = tokio::time::timeout(Duration::from_millis(200), rx.recv()).await;
    assert!(outcome.is_err(), "expired frames should not be delivered");
}
/// Enqueues more frames for a single target than the test expects to survive
/// (1030 enqueued, 1024 expected) and checks that the delivered frames are the
/// most recently enqueued ones, in order.
#[tokio::test]
async fn relay_store_forward_per_target_cap_drops_oldest() {
    // Number of frames this test expects to be delivered for one target.
    const CAP: usize = 1024;

    let pm = PeerManager::new();
    pm.set_binding("nodeA", "nodeB", true, None, None).await;

    let over = 1030usize;
    for seq in 0..over {
        let frame = format!("{{\"seq\":{}}}", seq);
        pm.enqueue_store_forward("nodeB", frame, None, false, false, None)
            .await;
    }

    let (tx, mut rx) = mpsc::channel::<String>(1200);
    let addr_b = addr_local(10003);
    pm.add_peer(addr_b, tx, "nodeB".to_string()).await.unwrap();

    // Drain until CAP frames are collected, the channel closes, or a single
    // receive takes longer than the poll interval.
    let poll = std::time::Duration::from_millis(50);
    let mut frames: Vec<String> = Vec::new();
    while frames.len() < CAP {
        match tokio::time::timeout(poll, rx.recv()).await {
            Ok(Some(frame)) => frames.push(frame),
            _ => break,
        }
    }
    assert_eq!(frames.len(), 1024, "should deliver up to per-target cap");

    // Extracts the integer "seq" field from a JSON frame.
    let seq_of = |raw: &str| -> i64 {
        serde_json::from_str::<serde_json::Value>(raw).unwrap()["seq"]
            .as_i64()
            .unwrap()
    };

    // The oldest `over - CAP` frames should be gone; the newest CAP remain.
    let first_expected = over as i64 - 1024;
    let last_expected = over as i64 - 1;
    let first_seq = seq_of(&frames[0]);
    let last_seq = seq_of(&frames[CAP - 1]);
    assert_eq!(first_seq, first_expected);
    assert_eq!(last_seq, last_expected);
}
/// Enqueues 1300 frames for each of nine targets and checks that, once all
/// nine peers have been added, exactly 8192 frames are delivered in total.
#[tokio::test]
async fn relay_store_forward_global_cap_limits_total() {
    let pm = PeerManager::new();

    // Bind nodeA to every target nodeB1..nodeB9 before anything is enqueued.
    for t in 1..=9 {
        pm.set_binding("nodeA", &format!("nodeB{}", t), true, None, None)
            .await;
    }

    for t in 1..=9 {
        let to = format!("nodeB{}", t);
        for i in 0..1300 {
            pm.enqueue_store_forward(
                &to,
                format!("{{\"t\":{},\"i\":{}}}", t, i),
                None,
                false,
                false,
                None,
            )
            .await;
        }
    }

    // Register each target with a channel large enough to hold everything
    // that was enqueued for it.
    let mut rxs: Vec<mpsc::Receiver<String>> = Vec::new();
    for t in 1..=9 {
        let (tx, rx) = mpsc::channel::<String>(1400);
        let addr = addr_local(11000 + t);
        let node_id = format!("nodeB{}", t);
        pm.add_peer(addr, tx, node_id).await.unwrap();
        rxs.push(rx);
    }

    // Drain every receiver until it closes or goes quiet for the poll
    // interval. There is deliberately no early exit on a running total, so a
    // failing run reports the real number of delivered frames.
    let poll = std::time::Duration::from_millis(10);
    let mut total = 0usize;
    for rx in rxs.iter_mut() {
        while let Ok(Some(_frame)) = tokio::time::timeout(poll, rx.recv()).await {
            total += 1;
        }
    }

    assert_eq!(
        total, 8192,
        "global cap should limit total delivered frames to 8192"
    );
}
/// Enqueues 1300 frames for each of nine targets, then counts deliveries per
/// target (stopping at 1024 each). Expects at least one target to receive
/// fewer than 1024 frames and the counts to sum to 8192.
#[tokio::test]
async fn relay_store_forward_global_cap_affects_distribution() {
    let pm = PeerManager::new();

    for t in 1..=9 {
        let target = format!("nodeB{}", t);
        pm.set_binding("nodeA", &target, true, None, None).await;
    }

    for t in 1..=9 {
        let to = format!("nodeB{}", t);
        for i in 0..1300 {
            let frame = format!("{{\"t\":{},\"i\":{}}}", t, i);
            pm.enqueue_store_forward(&to, frame, None, false, false, None)
                .await;
        }
    }

    // Pair each receiver with its 1-based target number for the tally below.
    let mut rxs: Vec<(usize, mpsc::Receiver<String>)> = Vec::new();
    for t in 1..=9 {
        let (tx, rx) = mpsc::channel::<String>(1400);
        let addr = addr_local(12000 + t);
        let node_id = format!("nodeB{}", t);
        pm.add_peer(addr, tx, node_id).await.unwrap();
        rxs.push((t as usize, rx));
    }

    let poll = std::time::Duration::from_millis(5);
    let mut per_target: [usize; 9] = [0; 9];
    for (target_no, rx) in rxs.iter_mut() {
        // Target numbers start at 1; array slots start at 0.
        let delivered = &mut per_target[*target_no - 1];
        // Count until 1024 frames, channel close, or a quiet poll interval.
        while *delivered < 1024 {
            match tokio::time::timeout(poll, rx.recv()).await {
                Ok(Some(_)) => *delivered += 1,
                _ => break,
            }
        }
    }

    let some_below_cap = per_target.iter().any(|&count| count < 1024);
    assert!(
        some_below_cap,
        "global cap should reduce some targets below per-target cap"
    );
    let total: usize = per_target.iter().sum();
    assert_eq!(total, 8192);
}