#[cfg(test)]
mod tests {
use aex::{
connection::{
global::GlobalContext, manager::ConnectionManager, node::Node, scope::NetworkScope,
types::BiDirectionalConnections,
},
tcp::types::{Command, RawCodec},
time::SystemTime,
};
use std::{
collections::HashSet,
net::{IpAddr, Ipv4Addr, SocketAddr},
sync::{
Arc,
atomic::{AtomicBool, Ordering},
},
time::Duration,
};
use tokio::{io::AsyncReadExt, net::TcpListener};
/// A freshly constructed manager holds no connections and its own
/// cancellation token has not been triggered.
#[tokio::test]
async fn test_new_manager() {
    let fresh = ConnectionManager::new();
    let tracked_nothing = fresh.connections.is_empty();
    let already_cancelled = fresh.cancel_token.is_cancelled();
    assert!(tracked_nothing);
    assert!(!already_cancelled);
}
#[tokio::test]
async fn test_add_and_remove_logic() {
let manager = ConnectionManager::new();
let addr: SocketAddr = "1.1.1.1:8080".parse().unwrap();
let handle =
tokio::spawn(async { tokio::time::sleep(std::time::Duration::from_secs(10)).await })
.abort_handle();
let loopback: SocketAddr = "127.0.0.1:9000".parse().unwrap();
let cancellation_token: tokio_util::sync::CancellationToken =
tokio_util::sync::CancellationToken::new();
manager.add(
loopback,
handle.clone(),
cancellation_token.clone(),
true,
None,
);
assert!(manager.connections.is_empty(), "Loopback should be ignored");
manager.add(addr, handle.clone(), cancellation_token.clone(), true, None);
assert_eq!(manager.connections.len(), 1);
let addr2: SocketAddr = "1.1.1.1:8081".parse().unwrap();
manager.add(
addr2,
handle.clone(),
cancellation_token.clone(),
false,
None,
);
{
let bucket = manager
.connections
.get(&(addr.ip(), NetworkScope::from_ip(&addr.ip())))
.unwrap();
assert_eq!(bucket.clients.len(), 1);
assert_eq!(bucket.servers.len(), 1);
}
manager.remove(addr, true); {
let bucket = manager
.connections
.get(&(addr.ip(), NetworkScope::from_ip(&addr.ip())))
.unwrap();
assert_eq!(bucket.clients.len(), 0);
assert_eq!(bucket.servers.len(), 1);
}
manager.remove(addr2, false); assert!(
manager.connections.is_empty(),
"Bucket should be cleaned up"
);
}
#[tokio::test]
async fn test_cancel_operations() {
tokio::time::timeout(std::time::Duration::from_secs(5), async {
println!(">>> 开始测试: 初始化 Manager");
let manager = ConnectionManager::new();
let addr: SocketAddr = "1.2.3.4:5000".parse().unwrap();
println!(">>> 正在添加连接");
let handle = tokio::spawn(async {
loop {
tokio::task::yield_now().await;
}
})
.abort_handle();
let cancellation_token: tokio_util::sync::CancellationToken =
tokio_util::sync::CancellationToken::new();
manager.add(addr, handle.clone(), cancellation_token.clone(), true, None);
println!(">>> 正在执行 cancel_gracefully");
assert!(manager.cancel_gracefully(addr));
println!(">>> 检查 cancel_token 状态");
{
let ip_key = (addr.ip(), NetworkScope::from_ip(&addr.ip()));
if let Some(bucket) = manager.connections.get(&ip_key) {
if let Some(entry) = bucket.clients.get(&addr) {
assert!(entry.cancel_token.is_cancelled());
}
}
}
println!(">>> 正在执行 cancel_by_addr (最可能的死锁点)");
manager.cancel_by_addr(addr);
println!(">>> 正在执行 cancel_all_by_ip");
manager.add(addr, handle.clone(), cancellation_token.clone(), true, None);
manager.cancel_all_by_ip(addr.ip());
println!(">>> 测试完成!");
})
.await
.expect("测试因超时被迫中止,确认发生了死锁!");
}
/// Checks the counters reported by `status()` for a single registered entry,
/// then verifies that `deactivate(0, 0)` empties the map and that the
/// resulting status reports an average uptime of zero.
#[tokio::test]
async fn test_deactivate_and_status() {
    let manager = ConnectionManager::new();
    let peer: SocketAddr = "8.8.8.8:80".parse().unwrap();
    let task = tokio::spawn(async {}).abort_handle();
    let token = tokio_util::sync::CancellationToken::new();
    manager.add(peer, task, token.clone(), true, None);

    // One IP, registered with the flag that places it under `clients`.
    let populated = manager.status();
    assert_eq!(populated.total_ips, 1);
    assert_eq!(populated.total_clients, 1);
    assert_eq!(populated.total_servers, 0);

    manager.deactivate(0, 0);
    assert!(manager.connections.is_empty());

    let drained = manager.status();
    assert_eq!(drained.average_uptime, 0);
}
/// After `shutdown()` the manager's own cancellation token is cancelled and
/// no connections remain in the map.
#[tokio::test]
async fn test_shutdown() {
    let manager = ConnectionManager::new();
    let peer: SocketAddr = "10.0.0.1:443".parse().unwrap();
    let token = tokio_util::sync::CancellationToken::new();
    let task = tokio::spawn(async {}).abort_handle();
    manager.add(peer, task, token, false, None);

    manager.shutdown();

    let root_cancelled = manager.cancel_token.is_cancelled();
    assert!(root_cancelled);
    assert!(manager.connections.is_empty());
}
/// Inserts an empty bucket directly into the map and checks that
/// `check_and_cleanup_bucket` removes it (and returns, rather than hanging).
#[test]
fn test_cleanup_deadlock_prevention() {
    let manager = ConnectionManager::new();
    let ip = IpAddr::from(Ipv4Addr::new(192, 168, 1, 1));
    let bucket_key = (ip, NetworkScope::from_ip(&ip));

    let empty_bucket = BiDirectionalConnections::new();
    manager.connections.insert(bucket_key, empty_bucket);

    manager.check_and_cleanup_bucket(bucket_key);
    assert!(manager.connections.is_empty());
}
/// Registers one entry and checks that `deactivate(0, 0)` leaves the map empty.
#[tokio::test]
async fn test_extreme_deactivate() {
    let manager = ConnectionManager::new();
    let peer: SocketAddr = "1.1.1.1:80".parse().unwrap();
    let token = tokio_util::sync::CancellationToken::new();
    let task = tokio::spawn(async {}).abort_handle();
    manager.add(peer, task, token, true, None);

    manager.deactivate(0, 0);

    assert!(manager.connections.is_empty());
}
/// Registers one private-range address and one public address and checks
/// that `status()` reports both an intranet and an extranet connection
/// across two distinct IPs.
#[tokio::test]
async fn test_network_scope_coverage() {
    let manager = ConnectionManager::new();
    let token = tokio_util::sync::CancellationToken::new();

    // (address, flag passed as the fourth argument of `add`)
    for (text, flag) in [("10.0.0.1:80", true), ("1.1.1.1:80", false)] {
        let peer: SocketAddr = text.parse().unwrap();
        let task = tokio::spawn(async {}).abort_handle();
        manager.add(peer, task, token.clone(), flag, None);
    }

    let report = manager.status();
    assert!(report.intranet_conns > 0);
    assert!(report.extranet_conns > 0);
    assert_eq!(report.total_ips, 2);
}
/// `status()` on a manager with no connections reports zero IPs and an
/// average uptime of zero.
#[test]
fn test_status_empty_manager() {
    let report = ConnectionManager::new().status();
    assert_eq!(report.total_ips, 0);
    assert_eq!(report.average_uptime, 0);
}
/// Attaches a `Node` to a registered entry, then checks that `notify` hands
/// the callback exactly that entry for the matching node id and an empty
/// list for an id that no entry carries.
#[tokio::test]
async fn test_connection_notify_by_node_id() {
    let manager = ConnectionManager::new();
    let peer: SocketAddr = "192.168.1.100:8080".parse().unwrap();
    let wanted_id = vec![1, 2, 3, 4];

    let task = tokio::spawn(async {}).abort_handle();
    let token = tokio_util::sync::CancellationToken::new();
    manager.add(peer, task, token.clone(), true, None);

    let identity = Node {
        id: wanted_id.clone(),
        version: 1,
        started_at: SystemTime::now_ts(),
        port: 8080,
        protocols: HashSet::new(),
        ips: Vec::new(),
    };
    {
        // Guards are confined to this scope so they are released before `notify`.
        let bucket_key = (peer.ip(), NetworkScope::from_ip(&peer.ip()));
        let shared = manager.connections.get(&bucket_key).unwrap();
        let record = shared.clients.get(&peer).unwrap();
        let mut slot = record.node.write().await;
        *slot = Some(identity);
    }

    // Flag flipped from inside the callback to prove that it actually ran.
    let fired = Arc::new(AtomicBool::new(false));
    let fired_for_callback = Arc::clone(&fired);
    manager
        .notify(&wanted_id, |found| {
            let flag = fired_for_callback;
            async move {
                assert_eq!(found.len(), 1, "应该找到一个匹配的连接");
                assert_eq!(found[0].addr, peer);
                flag.store(true, Ordering::SeqCst);
            }
        })
        .await;
    assert!(fired.load(Ordering::SeqCst), "Notify 应该修改了原子变量");

    // An id that was never attached to any entry.
    manager
        .notify(&vec![9, 9, 9], |found| async move {
            assert!(found.is_empty(), "不匹配的 ID 应该返回空列表");
        })
        .await;
}
/// Two entries on different IPs are tagged with the same node id; `notify`
/// for that id must pass both of them to the callback.
#[tokio::test]
async fn test_notify_multiple_connections() {
    let manager = ConnectionManager::new();
    let shared_id = vec![42];

    for text in ["1.1.1.1:1000", "2.2.2.2:2000"] {
        let peer = text.parse::<SocketAddr>().unwrap();
        let task = tokio::spawn(async {}).abort_handle();
        let token = tokio_util::sync::CancellationToken::new();
        manager.add(peer, task, token.clone(), true, None);

        // Look the new entry up again and attach the shared identity to it.
        let bucket_key = (peer.ip(), NetworkScope::from_ip(&peer.ip()));
        let bucket = manager
            .connections
            .get(&bucket_key)
            .expect("Bucket missing");
        let record = bucket.clients.get(&peer).expect("Entry missing");
        let mut slot = record.node.write().await;
        *slot = Some(Node {
            id: shared_id.clone(),
            version: 1,
            started_at: SystemTime::now_ts(),
            port: 8080,
            protocols: HashSet::new(),
            ips: Vec::new(),
        });
    }

    manager
        .notify(&shared_id, |found| async move {
            assert_eq!(found.len(), 2, "同一个 Node ID 应该能搜到多个连接");
        })
        .await;
}
#[tokio::test] async fn test_connection_forward_all() {
let manager = ConnectionManager::new();
let addrs = vec![
"1.1.1.1:1000".parse::<SocketAddr>().unwrap(),
"2.2.2.2:2000".parse::<SocketAddr>().unwrap(),
"3.3.3.3:3000".parse::<SocketAddr>().unwrap(),
];
for &addr in &addrs {
let handle = tokio::spawn(async {
tokio::time::sleep(std::time::Duration::from_secs(10)).await;
})
.abort_handle();
let cancellation_token: tokio_util::sync::CancellationToken =
tokio_util::sync::CancellationToken::new();
manager.add(addr, handle, cancellation_token.clone(), true, None);
}
manager
.forward(|entries| async move {
assert_eq!(entries.len(), 3, "应该获取到所有 3 个连接");
for addr in ["1.1.1.1:1000", "2.2.2.2:2000", "3.3.3.3:3000"] {
let target_addr: SocketAddr = addr.parse().unwrap();
assert!(
entries.iter().any(|e| e.addr == target_addr),
"未能找到地址为 {} 的连接",
addr
);
}
})
.await; }
/// Starts a throwaway TCP server for the `connect` tests and returns the
/// address it is listening on.
///
/// The listener binds to `127.0.0.1:0`. A background task accepts
/// connections one at a time, performs a single read of up to 10 bytes on
/// each, and then drops the stream. The task stops at the first `accept`
/// error.
async fn setup_test_server() -> SocketAddr {
    let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
    let bound = listener.local_addr().unwrap();

    tokio::spawn(async move {
        loop {
            match listener.accept().await {
                Ok((mut stream, _peer)) => {
                    let mut scratch = [0u8; 10];
                    // The read result is irrelevant; the server only has to accept.
                    let _ = stream.read(&mut scratch).await;
                }
                Err(_) => break,
            }
        }
    });

    bound
}
/// `connect` to a live local test server with a no-op handler returns `Ok`.
#[tokio::test]
async fn test_connect_success() {
    let target = setup_test_server().await;
    let manager = ConnectionManager::new();
    let context = Arc::new(GlobalContext::new(target, None));

    let outcome = manager
        .connect::<RawCodec, RawCodec, _, _>(target, context, |_ctx| async move {}, Some(10))
        .await;

    assert!(outcome.is_ok());
}
/// Registers the server address in the manager up front (flag `false`), then
/// calls `connect` for the same address and expects `Ok`.
///
/// NOTE(review): only the `Ok` result is asserted; the test does not verify
/// that no second entry was created for the address.
#[tokio::test]
async fn test_connect_duplicate_prevented() {
    let target = setup_test_server().await;
    let manager = ConnectionManager::new();

    // Pre-existing entry for the address that `connect` is about to dial.
    let task = tokio::spawn(async {}).abort_handle();
    let token = tokio_util::sync::CancellationToken::new();
    manager.add(target, task, token, false, None);

    let context = Arc::new(GlobalContext::new(target, None));
    let outcome = manager
        .connect::<RawCodec, RawCodec, _, _>(target, context, |_ctx| async move {}, Some(10))
        .await;

    assert!(outcome.is_ok());
}
/// `connect` to `127.0.0.1:1` must return `Err`.
///
/// No server is started for this address — presumably nothing listens on
/// port 1 in the test environment.
#[tokio::test]
async fn test_connect_physical_failure() {
    let unreachable: SocketAddr = "127.0.0.1:1".parse().unwrap();
    let manager = ConnectionManager::new();
    let context = Arc::new(GlobalContext::new(unreachable, None));

    let outcome = manager
        .connect::<RawCodec, RawCodec, _, _>(
            unreachable,
            context,
            |_ctx| async move {},
            Some(10),
        )
        .await;

    assert!(outcome.is_err());
}
/// Verifies that the handler closure passed to `connect` is actually run.
///
/// The closure reports back through a oneshot channel; the test waits up to
/// one second for that signal. The `connect` result itself is asserted first,
/// so a connection failure is reported as such instead of showing up later
/// as an opaque "handler never ran" failure.
#[tokio::test]
async fn test_connect_closure_execution() {
    let manager = ConnectionManager::new();
    let addr = setup_test_server().await;
    let (tx, rx) = tokio::sync::oneshot::channel();
    let global = Arc::new(GlobalContext::new(addr, None));

    let result = manager
        .connect::<RawCodec, RawCodec, _, _>(
            addr,
            global,
            |_ctx| async move {
                // The receiver may already be gone if the test timed out; ignore that.
                let _ = tx.send(true);
            },
            Some(10),
        )
        .await;
    assert!(
        result.is_ok(),
        "connect to the local test server should succeed"
    );

    let executed = tokio::time::timeout(Duration::from_secs(1), rx).await;
    assert!(
        matches!(executed, Ok(Ok(true))),
        "handler closure did not signal execution within 1s: {:?}",
        executed
    );
}
}