#[test]
fn test_module_path() {
let path = module_path!();
println!("Actual module path: {}", path);
assert!(path.contains("cla::tests"));
}
#[test]
fn test_current_module() {
let current_module = module_path!();
println!("Current module path: {}", current_module);
assert!(current_module.contains("cla::tests"));
}
use crate::bpv7::bundle::{Bundle, PrimaryBlock};
use crate::bpv7::EndpointId;
use crate::cla::manager::*;
use crate::cla::peer::ClaPeer;
use crate::cla::tcp::client::*;
use crate::cla::tcp::server::*;
use crate::cla::ConvergenceLayer;
use crate::consts::tcp::*;
use async_trait::async_trait;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use tempfile::TempDir;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::Mutex;
use tokio::time::Duration;
fn create_test_bundle(source: &str, destination: &str, payload: &[u8]) -> Bundle {
Bundle {
primary: PrimaryBlock {
version: 7,
source: source.to_string(),
destination: destination.to_string(),
report_to: "none".to_string(),
creation_timestamp: SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs(),
lifetime: 3600,
},
payload: payload.to_vec(),
}
}
#[derive(Debug, Clone)]
struct MockCla {
address: String,
should_fail: bool,
activation_counter: Arc<AtomicUsize>,
}
impl MockCla {
fn new(address: &str) -> Self {
Self {
address: address.to_string(),
should_fail: false,
activation_counter: Arc::new(AtomicUsize::new(0)),
}
}
fn new_failing(address: &str) -> Self {
Self {
address: address.to_string(),
should_fail: true,
activation_counter: Arc::new(AtomicUsize::new(0)),
}
}
fn activation_count(&self) -> usize {
self.activation_counter.load(Ordering::SeqCst)
}
}
#[async_trait]
impl ConvergenceLayer for MockCla {
fn address(&self) -> String {
self.address.clone()
}
async fn activate(&self) -> anyhow::Result<()> {
self.activation_counter.fetch_add(1, Ordering::SeqCst);
if self.should_fail {
return Err(anyhow::anyhow!("Mock activation failure"));
}
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
Ok(())
}
}
#[async_trait]
impl ClaPeer for MockCla {
fn get_peer_endpoint_id(&self) -> EndpointId {
EndpointId::from(self.address.as_str())
}
async fn is_reachable(&self) -> bool {
!self.should_fail
}
fn get_cla_type(&self) -> &str {
"mock"
}
fn get_connection_address(&self) -> String {
self.address.clone()
}
fn clone_box(&self) -> Box<dyn ClaPeer> {
Box::new(self.clone())
}
async fn activate(&self) -> anyhow::Result<()> {
<Self as ConvergenceLayer>::activate(self).await
}
}
#[tokio::test]
async fn test_cla_manager_new() {
let counter = Arc::new(AtomicUsize::new(0));
let counter_clone = Arc::clone(&counter);
let manager = ClaManager::new(move |_bundle| {
counter_clone.fetch_add(1, Ordering::SeqCst);
});
let peers = manager.list_reachable_peers().await;
let addresses: Vec<String> = peers.iter().map(|p| p.get_connection_address()).collect();
assert!(addresses.is_empty());
}
#[tokio::test]
async fn test_register_single_cla() {
let counter = Arc::new(AtomicUsize::new(0));
let counter_clone = Arc::clone(&counter);
let manager = ClaManager::new(move |_bundle| {
counter_clone.fetch_add(1, Ordering::SeqCst);
});
let mock_cla = Box::new(MockCla::new("test://127.0.0.1:8080"));
manager.register_peer(mock_cla).await;
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
let peers = manager.list_reachable_peers().await;
assert_eq!(peers.len(), 1);
let addresses: Vec<String> = peers.iter().map(|p| p.get_connection_address()).collect();
assert!(addresses.contains(&"test://127.0.0.1:8080".to_string()));
}
#[tokio::test]
async fn test_register_multiple_clas() {
let counter = Arc::new(AtomicUsize::new(0));
let counter_clone = Arc::clone(&counter);
let manager = ClaManager::new(move |_bundle| {
counter_clone.fetch_add(1, Ordering::SeqCst);
});
let cla1 = Box::new(MockCla::new("test://127.0.0.1:8080"));
let cla2 = Box::new(MockCla::new("test://127.0.0.1:8081"));
let cla3 = Box::new(MockCla::new("test://127.0.0.1:8082"));
manager.register_peer(cla1).await;
manager.register_peer(cla2).await;
manager.register_peer(cla3).await;
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
let peers = manager.list_reachable_peers().await;
assert_eq!(peers.len(), 3);
let addresses: Vec<String> = peers.iter().map(|p| p.get_connection_address()).collect();
assert!(addresses.contains(&"test://127.0.0.1:8080".to_string()));
assert!(addresses.contains(&"test://127.0.0.1:8081".to_string()));
assert!(addresses.contains(&"test://127.0.0.1:8082".to_string()));
}
#[tokio::test]
async fn test_register_duplicate_cla() {
let counter = Arc::new(AtomicUsize::new(0));
let counter_clone = Arc::clone(&counter);
let manager = ClaManager::new(move |_bundle| {
counter_clone.fetch_add(1, Ordering::SeqCst);
});
let cla1 = Box::new(MockCla::new("test://127.0.0.1:8080"));
let cla2 = Box::new(MockCla::new("test://127.0.0.1:8080"));
manager.register_peer(cla1).await;
manager.register_peer(cla2).await;
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
let peers = manager.list_reachable_peers().await;
assert_eq!(peers.len(), 1);
}
#[tokio::test]
async fn test_register_failing_cla() {
let counter = Arc::new(AtomicUsize::new(0));
let counter_clone = Arc::clone(&counter);
let manager = ClaManager::new(move |_bundle| {
counter_clone.fetch_add(1, Ordering::SeqCst);
});
let failing_cla = Box::new(MockCla::new_failing("test://127.0.0.1:8080"));
manager.register_peer(failing_cla.clone()).await;
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
let all_peers = manager.list_all_peers().await;
assert_eq!(all_peers.len(), 1);
let reachable_peers = manager.list_reachable_peers().await;
assert_eq!(reachable_peers.len(), 0);
assert_eq!(failing_cla.activation_count(), 1);
}
#[tokio::test]
async fn test_notify_receive() {
let received_bundles = Arc::new(Mutex::new(Vec::new()));
let bundles_clone = Arc::clone(&received_bundles);
let manager = ClaManager::new(move |bundle| {
let bundles = Arc::clone(&bundles_clone);
tokio::spawn(async move {
let mut guard = bundles.lock().await;
guard.push(bundle);
});
});
let test_bundle = create_test_bundle("dtn://source", "dtn://dest", b"test payload");
manager.notify_receive(test_bundle.clone());
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
let received = received_bundles.lock().await;
assert_eq!(received.len(), 1);
assert_eq!(received[0].primary.source, "dtn://source");
assert_eq!(received[0].primary.destination, "dtn://dest");
}
#[tokio::test]
async fn test_notify_receive_multiple_bundles() {
let received_count = Arc::new(AtomicUsize::new(0));
let count_clone = Arc::clone(&received_count);
let manager = ClaManager::new(move |_bundle| {
count_clone.fetch_add(1, Ordering::SeqCst);
});
for i in 0..5 {
let bundle = create_test_bundle(
&format!("dtn://source{}", i),
&format!("dtn://dest{}", i),
b"test payload",
);
manager.notify_receive(bundle);
}
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
assert_eq!(received_count.load(Ordering::SeqCst), 5);
}
#[tokio::test]
async fn test_manager_clone() {
let counter = Arc::new(AtomicUsize::new(0));
let counter_clone = Arc::clone(&counter);
let manager1 = ClaManager::new(move |_bundle| {
counter_clone.fetch_add(1, Ordering::SeqCst);
});
let manager2 = manager1.clone();
let cla1 = Box::new(MockCla::new("test://127.0.0.1:8080"));
let cla2 = Box::new(MockCla::new("test://127.0.0.1:8081"));
manager1.register_peer(cla1).await;
manager2.register_peer(cla2).await;
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
let peers1 = manager1.list_reachable_peers().await;
let peers2 = manager2.list_reachable_peers().await;
let addresses1: Vec<String> = peers1.iter().map(|p| p.get_connection_address()).collect();
let addresses2: Vec<String> = peers2.iter().map(|p| p.get_connection_address()).collect();
assert_eq!(addresses1.len(), 2);
assert_eq!(addresses2.len(), 2);
assert_eq!(addresses1, addresses2);
}
#[tokio::test]
async fn test_list_active_empty() {
let manager = ClaManager::new(|_bundle| {});
let peers = manager.list_reachable_peers().await;
assert!(peers.is_empty());
}
#[test]
fn test_mock_cla_address() {
let mock_cla = MockCla::new("test://example.com:1234");
assert_eq!(mock_cla.address(), "test://example.com:1234");
}
#[tokio::test]
async fn test_mock_cla_activation_success() {
let mock_cla = MockCla::new("test://example.com");
let result = <MockCla as ConvergenceLayer>::activate(&mock_cla).await;
assert!(result.is_ok());
assert_eq!(mock_cla.activation_count(), 1);
}
#[tokio::test]
async fn test_mock_cla_activation_failure() {
let mock_cla = MockCla::new_failing("test://example.com");
let result = <MockCla as ConvergenceLayer>::activate(&mock_cla).await;
assert!(result.is_err());
assert_eq!(mock_cla.activation_count(), 1);
}
#[test]
fn test_tcp_cla_dialer_new() {
let dialer = TcpClaClient {
target_addr: "127.0.0.1:8080".to_string(),
connection_info: None,
};
assert_eq!(dialer.target_addr, "127.0.0.1:8080");
}
#[test]
fn test_tcp_cla_dialer_address() {
let dialer = TcpClaClient {
target_addr: "localhost:9090".to_string(),
connection_info: None,
};
assert_eq!(dialer.address(), "localhost:9090");
}
#[test]
fn test_create_bundle_simple() {
let bundle = create_bundle("dtn://source", "dtn://dest", b"hello".to_vec());
assert_eq!(bundle.primary.source, "dtn://source");
assert_eq!(bundle.primary.destination, "dtn://dest");
assert_eq!(bundle.payload, b"hello");
assert_eq!(bundle.primary.version, 7);
assert_eq!(bundle.primary.report_to, "none");
assert_eq!(bundle.primary.lifetime, 3600);
}
#[test]
fn test_create_bundle_with_various_payloads() {
let test_cases = vec![
("empty", b"".to_vec()),
("simple", b"hello world".to_vec()),
("unicode", "こんにちは世界".as_bytes().to_vec()),
("numbers", b"123456789".to_vec()),
("binary", vec![0, 1, 2, 255, 254, 253]),
];
for (name, payload) in test_cases {
let bundle = create_bundle(
&format!("dtn://source_{}", name),
&format!("dtn://dest_{}", name),
payload.clone(),
);
assert_eq!(bundle.payload, payload);
assert!(bundle.primary.creation_timestamp > 0);
}
}
#[test]
fn test_create_bundle_timing() {
let before = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs();
let bundle = create_bundle("dtn://source", "dtn://dest", b"test".to_vec());
let after = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs();
assert!(bundle.primary.creation_timestamp >= before);
assert!(bundle.primary.creation_timestamp <= after);
}
async fn mock_tcp_server(
response: &'static str,
) -> anyhow::Result<(u16, tokio::task::JoinHandle<()>)> {
let listener = TcpListener::bind("127.0.0.1:0").await?;
let port = listener.local_addr()?.port();
let handle = tokio::spawn(async move {
if let Ok((mut stream, _)) = listener.accept().await {
let mut len_buf = [0u8; 4];
if stream.read_exact(&mut len_buf).await.is_ok() {
let len = u32::from_be_bytes(len_buf) as usize;
let mut data = vec![0u8; len];
if stream.read_exact(&mut data).await.is_ok() {
let _ = stream.write_all(response.as_bytes()).await;
}
}
}
});
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
Ok((port, handle))
}
#[tokio::test]
async fn test_send_bundle_success() -> anyhow::Result<()> {
let (port, _handle) = mock_tcp_server(OK).await?;
let mut stream = TcpStream::connect(format!("127.0.0.1:{}", port)).await?;
let bundle = create_test_bundle("dtn://source", "dtn://dest", b"test payload");
let result = send_bundle(&mut stream, &bundle).await;
assert!(result.is_ok());
Ok(())
}
#[tokio::test]
async fn test_send_bundle_with_different_acks() -> anyhow::Result<()> {
let test_cases = [OK, ACK, SUCCESS, RECEIVED];
for (i, ack) in test_cases.iter().enumerate() {
let (port, _handle) = mock_tcp_server(ack).await?;
let mut stream = TcpStream::connect(format!("127.0.0.1:{}", port)).await?;
let bundle = create_test_bundle(
&format!("dtn://source_{}", i),
&format!("dtn://dest_{}", i),
format!("test payload {}", i).as_bytes(),
);
let result = send_bundle(&mut stream, &bundle).await;
assert!(result.is_ok(), "Failed for ACK: {}", ack);
}
Ok(())
}
#[tokio::test]
async fn test_send_bundle_large_payload() -> anyhow::Result<()> {
let (port, _handle) = mock_tcp_server(OK).await?;
let mut stream = TcpStream::connect(format!("127.0.0.1:{}", port)).await?;
let large_payload = vec![42u8; 10000];
let bundle = create_test_bundle("dtn://source", "dtn://dest", &large_payload);
let result = send_bundle(&mut stream, &bundle).await;
assert!(result.is_ok());
Ok(())
}
#[tokio::test]
async fn test_send_bundle_serialization() -> anyhow::Result<()> {
let bundle = create_test_bundle("dtn://source", "dtn://dest", b"test");
let encoded = serde_cbor::to_vec(&bundle);
assert!(encoded.is_ok());
let encoded_data = encoded.unwrap();
assert!(!encoded_data.is_empty());
let decoded: Result<Bundle, _> = serde_cbor::from_slice(&encoded_data);
assert!(decoded.is_ok());
let decoded_bundle = decoded.unwrap();
assert_eq!(decoded_bundle.primary.source, bundle.primary.source);
assert_eq!(
decoded_bundle.primary.destination,
bundle.primary.destination
);
assert_eq!(decoded_bundle.payload, bundle.payload);
Ok(())
}
#[tokio::test]
async fn test_tcp_cla_dialer_activate_no_server() {
let dialer = TcpClaClient {
target_addr: "127.0.0.1:19999".to_string(), connection_info: None,
};
let result = dialer.activate().await;
assert!(result.is_err());
}
#[tokio::test]
async fn test_tcp_cla_dialer_activate_with_empty_store() -> anyhow::Result<()> {
let (port, _handle) = mock_tcp_server(OK).await?;
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
let temp_dir = TempDir::new()?;
let _temp_bundles_dir = temp_dir.path().join("test_bundles");
let _dialer = TcpClaClient {
target_addr: format!("127.0.0.1:{}", port),
connection_info: None,
};
Ok(())
}
#[test]
fn test_create_bundle_different_addresses() {
let test_cases = vec![
("dtn://node1", "dtn://node2"),
("tcp://localhost:8080", "tcp://remote:9090"),
("http://example.com", "https://secure.example.com"),
("", ""), ];
for (source, dest) in test_cases {
let bundle = create_bundle(source, dest, b"test".to_vec());
assert_eq!(bundle.primary.source, source);
assert_eq!(bundle.primary.destination, dest);
}
}
#[test]
fn test_create_bundle_consistency() {
for i in 0..10 {
let bundle = create_bundle(
&format!("dtn://source{}", i),
&format!("dtn://dest{}", i),
format!("payload{}", i).into_bytes(),
);
assert_eq!(bundle.primary.version, 7);
assert_eq!(bundle.primary.report_to, "none");
assert_eq!(bundle.primary.lifetime, 3600);
assert!(bundle.primary.creation_timestamp > 0);
}
}
#[test]
fn test_tcp_cla_listener_new() {
let callback = Arc::new(|_bundle: Bundle| {});
let listener = TcpClaListener {
bind_addr: "127.0.0.1:8080".to_string(),
receive_callback: callback,
};
assert_eq!(listener.bind_addr, "127.0.0.1:8080");
}
#[test]
fn test_tcp_cla_listener_address() {
let callback = Arc::new(|_bundle: Bundle| {});
let listener = TcpClaListener {
bind_addr: "0.0.0.0:9090".to_string(),
receive_callback: callback,
};
assert_eq!(listener.address(), "0.0.0.0:9090");
}
async fn _send_bundle_to_server(addr: &str, bundle: &Bundle) -> anyhow::Result<String> {
let mut stream = TcpStream::connect(addr).await?;
let encoded = serde_cbor::to_vec(bundle)?;
let len = encoded.len() as u32;
stream.write_all(&len.to_be_bytes()).await?;
stream.write_all(&encoded).await?;
let mut response = String::new();
stream.read_to_string(&mut response).await?;
Ok(response)
}
#[tokio::test]
async fn test_handle_connection_single_bundle() -> anyhow::Result<()> {
let received_bundles = Arc::new(Mutex::new(Vec::new()));
let bundles_clone = Arc::clone(&received_bundles);
let callback = {
let bundles_ref = Arc::clone(&bundles_clone);
Arc::new(move |bundle: Bundle| {
let bundles = Arc::clone(&bundles_ref);
tokio::spawn(async move {
let mut guard = bundles.lock().await;
guard.push(bundle);
});
})
};
let (client, server) = tokio::io::duplex(1024);
let handle = tokio::spawn(async move { handle_connection(server, callback).await });
let bundle = create_test_bundle("dtn://source", "dtn://dest", b"test payload");
let encoded = serde_cbor::to_vec(&bundle)?;
let len = encoded.len() as u32;
let mut client = client;
client.write_all(&len.to_be_bytes()).await?;
client.write_all(&encoded).await?;
let mut response = [0u8; 2];
client.read_exact(&mut response).await?;
assert_eq!(&response, b"OK");
drop(client);
let _ = tokio::time::timeout(Duration::from_millis(100), handle).await;
let received = received_bundles.lock().await;
assert_eq!(received.len(), 1);
assert_eq!(received[0].primary.source, "dtn://source");
assert_eq!(received[0].primary.destination, "dtn://dest");
assert_eq!(received[0].payload, b"test payload");
Ok(())
}
#[tokio::test]
async fn test_handle_connection_multiple_bundles() -> anyhow::Result<()> {
let received_count = Arc::new(AtomicUsize::new(0));
let count_clone = Arc::clone(&received_count);
let callback = Arc::new(move |_bundle: Bundle| {
count_clone.fetch_add(1, Ordering::SeqCst);
});
let (client, server) = tokio::io::duplex(2048);
let handle = tokio::spawn(async move { handle_connection(server, callback).await });
let mut client = client;
for i in 0..3 {
let bundle = create_test_bundle(
&format!("dtn://source{}", i),
&format!("dtn://dest{}", i),
format!("payload {}", i).as_bytes(),
);
let encoded = serde_cbor::to_vec(&bundle)?;
let len = encoded.len() as u32;
client.write_all(&len.to_be_bytes()).await?;
client.write_all(&encoded).await?;
let mut response = [0u8; 2];
client.read_exact(&mut response).await?;
assert_eq!(&response, b"OK");
}
drop(client);
let _ = tokio::time::timeout(Duration::from_millis(100), handle).await;
assert_eq!(received_count.load(Ordering::SeqCst), 3);
Ok(())
}
#[tokio::test]
async fn test_handle_connection_large_bundle() -> anyhow::Result<()> {
let received_bundles = Arc::new(Mutex::new(Vec::new()));
let bundles_clone = Arc::clone(&received_bundles);
let callback = {
let bundles_ref = Arc::clone(&bundles_clone);
Arc::new(move |bundle: Bundle| {
let bundles = Arc::clone(&bundles_ref);
tokio::spawn(async move {
let mut guard = bundles.lock().await;
guard.push(bundle);
});
})
};
let (client, server) = tokio::io::duplex(20000);
let handle = tokio::spawn(async move { handle_connection(server, callback).await });
let large_payload = vec![42u8; 10000];
let bundle = create_test_bundle("dtn://source", "dtn://dest", &large_payload);
let encoded = serde_cbor::to_vec(&bundle)?;
let len = encoded.len() as u32;
let mut client = client;
client.write_all(&len.to_be_bytes()).await?;
client.write_all(&encoded).await?;
let mut response = [0u8; 2];
client.read_exact(&mut response).await?;
assert_eq!(&response, b"OK");
drop(client);
let _ = tokio::time::timeout(Duration::from_millis(100), handle).await;
let received = received_bundles.lock().await;
assert_eq!(received.len(), 1);
assert_eq!(received[0].payload.len(), 10000);
Ok(())
}
#[tokio::test]
async fn test_handle_connection_eof() -> anyhow::Result<()> {
let callback = Arc::new(|_bundle: Bundle| {});
let (client, server) = tokio::io::duplex(1024);
let handle = tokio::spawn(async move { handle_connection(server, callback).await });
drop(client);
let result = tokio::time::timeout(Duration::from_millis(100), handle).await;
assert!(result.is_ok());
Ok(())
}
#[tokio::test]
async fn test_handle_connection_invalid_data() -> anyhow::Result<()> {
let callback = Arc::new(|_bundle: Bundle| {});
let (client, server) = tokio::io::duplex(1024);
let handle = tokio::spawn(async move { handle_connection(server, callback).await });
let mut client = client;
let invalid_len = 0xFFFFFFFFu32;
client.write_all(&invalid_len.to_be_bytes()).await?;
drop(client);
let _result = tokio::time::timeout(Duration::from_millis(100), handle).await;
Ok(())
}
#[tokio::test]
async fn test_handle_connection_partial_data() -> anyhow::Result<()> {
let callback = Arc::new(|_bundle: Bundle| {});
let (client, server) = tokio::io::duplex(1024);
let handle = tokio::spawn(async move { handle_connection(server, callback).await });
let mut client = client;
let len = 100u32;
client.write_all(&len.to_be_bytes()).await?;
client.write_all(b"incomplete").await?;
drop(client);
let _result = tokio::time::timeout(Duration::from_millis(100), handle).await;
Ok(())
}
#[tokio::test]
async fn test_tcp_cla_listener_activate_bind_error() {
let callback = Arc::new(|_bundle: Bundle| {});
let listener = TcpClaListener {
bind_addr: "invalid:address".to_string(),
receive_callback: callback,
};
let result = listener.activate().await;
assert!(result.is_err());
}
#[tokio::test]
async fn test_bundle_serialization_roundtrip() -> anyhow::Result<()> {
let original_bundle = create_test_bundle(
"dtn://test_source",
"dtn://test_destination",
b"test payload data",
);
let encoded = serde_cbor::to_vec(&original_bundle);
assert!(encoded.is_ok());
let encoded_data = encoded.unwrap();
assert!(!encoded_data.is_empty());
let decoded_bundle: Bundle = serde_cbor::from_slice(&encoded_data)?;
assert_eq!(
decoded_bundle.primary.version,
original_bundle.primary.version
);
assert_eq!(
decoded_bundle.primary.source,
original_bundle.primary.source
);
assert_eq!(
decoded_bundle.primary.destination,
original_bundle.primary.destination
);
assert_eq!(
decoded_bundle.primary.report_to,
original_bundle.primary.report_to
);
assert_eq!(
decoded_bundle.primary.creation_timestamp,
original_bundle.primary.creation_timestamp
);
assert_eq!(
decoded_bundle.primary.lifetime,
original_bundle.primary.lifetime
);
assert_eq!(decoded_bundle.payload, original_bundle.payload);
Ok(())
}
#[test]
fn test_create_test_bundle_fields() {
let bundle = create_test_bundle("source", "dest", b"payload");
assert_eq!(bundle.primary.version, 7);
assert_eq!(bundle.primary.source, "source");
assert_eq!(bundle.primary.destination, "dest");
assert_eq!(bundle.primary.report_to, "none");
assert_eq!(bundle.primary.lifetime, 3600);
assert_eq!(bundle.payload, b"payload");
assert!(bundle.primary.creation_timestamp > 0);
}
#[test]
fn test_create_test_bundle_timing() {
let before = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs();
let bundle = create_test_bundle("src", "dst", b"test");
let after = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs();
assert!(bundle.primary.creation_timestamp >= before);
assert!(bundle.primary.creation_timestamp <= after);
}
#[test]
fn test_create_test_bundle_various_payloads() {
let test_cases = vec![
b"".to_vec(),
b"simple".to_vec(),
"unicode: こんにちは".as_bytes().to_vec(),
vec![0, 1, 2, 255, 254, 253], ];
for payload in test_cases {
let bundle = create_test_bundle("src", "dst", &payload);
assert_eq!(bundle.payload, payload);
}
}
#[test]
fn test_tcp_peer_new() {
let eid = crate::bpv7::EndpointId::from("dtn://test-peer");
let peer = crate::cla::TcpPeer::new(eid.clone(), "192.168.1.100:8080".to_string());
assert_eq!(peer.peer_id, eid);
assert_eq!(peer.address, "192.168.1.100:8080");
}
#[test]
fn test_tcp_peer_from_endpoint_id() {
let eid = crate::bpv7::EndpointId::from("dtn://test-node");
let peer = crate::cla::TcpPeer::from_endpoint_id(eid.clone());
assert_eq!(peer.peer_id, eid);
assert_eq!(peer.address, "dtn://test-node");
}
#[test]
fn test_tcp_peer_for_test() {
let eid = crate::bpv7::EndpointId::from("dtn://test-endpoint");
let peer = crate::cla::TcpPeer::for_test(eid.clone());
assert_eq!(peer.peer_id, eid);
assert_eq!(peer.address, "dtn://test-endpoint");
}
#[test]
fn test_tcp_peer_get_peer_endpoint_id() {
let eid = crate::bpv7::EndpointId::from("dtn://my-peer");
let peer = crate::cla::TcpPeer::new(eid.clone(), "10.0.0.1:9090".to_string());
assert_eq!(peer.get_peer_endpoint_id(), eid);
}
#[test]
fn test_tcp_peer_get_cla_type() {
let eid = crate::bpv7::EndpointId::from("dtn://any-peer");
let peer = crate::cla::TcpPeer::new(eid, "localhost:8080".to_string());
assert_eq!(peer.get_cla_type(), "tcp");
}
#[test]
fn test_tcp_peer_get_connection_address() {
let eid = crate::bpv7::EndpointId::from("dtn://addr-test");
let address = "example.com:1234".to_string();
let peer = crate::cla::TcpPeer::new(eid, address.clone());
assert_eq!(peer.get_connection_address(), address);
}
#[tokio::test]
async fn test_tcp_peer_is_reachable_unreachable() {
let eid = crate::bpv7::EndpointId::from("dtn://unreachable");
let peer = crate::cla::TcpPeer::new(eid, "127.0.0.1:19998".to_string());
let reachable = peer.is_reachable().await;
assert!(!reachable);
}
#[tokio::test]
async fn test_tcp_peer_is_reachable_timeout() {
let eid = crate::bpv7::EndpointId::from("dtn://timeout-test");
let peer = crate::cla::TcpPeer::new(eid, "192.0.2.1:80".to_string());
let reachable = peer.is_reachable().await;
assert!(!reachable);
}
#[tokio::test]
async fn test_tcp_peer_is_reachable_with_mock_server() -> anyhow::Result<()> {
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
let port = listener.local_addr()?.port();
tokio::spawn(async move {
if let Ok((stream, _)) = listener.accept().await {
drop(stream); }
});
let eid = crate::bpv7::EndpointId::from("dtn://reachable");
let peer = crate::cla::TcpPeer::new(eid, format!("127.0.0.1:{}", port));
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
let reachable = peer.is_reachable().await;
assert!(reachable);
Ok(())
}
#[test]
fn test_tcp_cla_client_new() {
let client = TcpClaClient {
target_addr: "test.example.com:8080".to_string(),
connection_info: None,
};
assert_eq!(client.target_addr, "test.example.com:8080");
}
#[tokio::test]
async fn test_tcp_cla_client_activate_connection_refused() {
let client = TcpClaClient {
target_addr: "127.0.0.1:19997".to_string(), connection_info: None,
};
let result = client.activate().await;
assert!(result.is_err());
}
#[tokio::test]
async fn test_tcp_cla_client_activate_invalid_address() {
let client = TcpClaClient {
target_addr: "invalid-hostname:8080".to_string(),
connection_info: None,
};
let result = client.activate().await;
assert!(result.is_err());
}
#[test]
fn test_create_bundle_empty_payload() {
let bundle = create_bundle("dtn://src", "dtn://dst", vec![]);
assert_eq!(bundle.payload, b"");
assert_eq!(bundle.primary.source, "dtn://src");
assert_eq!(bundle.primary.destination, "dtn://dst");
}
#[test]
fn test_create_bundle_large_payload() {
let large_payload = vec![0xFF; 1000];
let bundle = create_bundle("dtn://big-src", "dtn://big-dst", large_payload.clone());
assert_eq!(bundle.payload, large_payload);
}
#[test]
fn test_create_bundle_unicode_addresses() {
let bundle = create_bundle(
"dtn://テスト送信",
"dtn://テスト受信",
b"unicode test".to_vec(),
);
assert_eq!(bundle.primary.source, "dtn://テスト送信");
assert_eq!(bundle.primary.destination, "dtn://テスト受信");
}