mod common;
use common::spawn_connexa_with_default_key;
use connexa::prelude::dht::{Mode, Quorum, RecordKey};
use connexa::prelude::{DHTEvent, Multiaddr, PeerId};
use futures::StreamExt;
use crate::common::DEFAULT_TIMEOUT;
async fn create_connected_nodes() -> (
connexa::handle::Connexa,
connexa::handle::Connexa,
PeerId,
PeerId,
) {
let [
(node1, node1_peer_id, node1_addr),
(node2, node2_peer_id, node2_addr),
] = common::spawn_connexa_nodes_with_default_keys::<2>().await;
node2.swarm().dial(node1_addr.clone()).await.unwrap();
node1
.dht()
.add_address(node2_peer_id, node2_addr)
.await
.unwrap();
node2
.dht()
.add_address(node1_peer_id, node1_addr)
.await
.unwrap();
(node1, node2, node1_peer_id, node2_peer_id)
}
#[tokio::test]
async fn test_dht_mode() {
let node = spawn_connexa_with_default_key().await;
let mode = node.dht().mode().await.unwrap();
assert_eq!(mode, Mode::Server);
node.dht().set_mode(Mode::Server).await.unwrap();
let mode = node.dht().mode().await.unwrap();
assert_eq!(mode, Mode::Server);
node.dht().set_mode(Mode::Client).await.unwrap();
let mode = node.dht().mode().await.unwrap();
assert_eq!(mode, Mode::Client);
}
#[tokio::test]
async fn test_dht_add_address() {
let node = spawn_connexa_with_default_key().await;
let peer_id = PeerId::random();
let addr: Multiaddr = "/memory/1234".parse().unwrap();
node.dht().add_address(peer_id, addr).await.unwrap();
}
#[tokio::test]
async fn test_dht_put_and_get() {
let (node1, node2, peer_id1, _peer_id2) = create_connected_nodes().await;
let key = "test-key";
let value = b"test-value";
node1
.dht()
.put(key, value.to_vec(), Quorum::One)
.await
.unwrap();
let mut stream = node2.dht().get(key).await.unwrap();
let record = tokio::time::timeout(DEFAULT_TIMEOUT, stream.next())
.await
.unwrap()
.unwrap()
.unwrap();
assert_eq!(record.record.value, value);
assert_eq!(record.record.publisher, Some(peer_id1));
}
#[tokio::test]
async fn test_dht_provide_and_get_providers() {
let (node1, node2, peer_id1, _peer_id2) = create_connected_nodes().await;
let key = "provided-content";
node1.dht().provide(key).await.unwrap();
let mut stream = node2.dht().get_providers(key).await.unwrap();
let providers = tokio::time::timeout(DEFAULT_TIMEOUT, stream.next())
.await
.unwrap()
.unwrap()
.unwrap();
assert!(providers.contains(&peer_id1));
}
#[tokio::test]
async fn test_dht_stop_provide() {
let node = spawn_connexa_with_default_key().await;
let key = "stop-provide-key";
node.dht().provide(key).await.unwrap();
node.dht().stop_provide(key).await.unwrap();
}
#[tokio::test]
async fn test_dht_find_peer() {
let (_node1, node2, peer_id1, _peer_id2) = create_connected_nodes().await;
let peer_info = node2.dht().find_peer(peer_id1).await.unwrap();
assert!(!peer_info.is_empty());
assert!(peer_info.iter().any(|info| info.peer_id == peer_id1));
}
#[tokio::test]
async fn test_dht_listener() {
let (node1, node2, _peer_id1, _peer_id2) = create_connected_nodes().await;
let mut listener = node2.dht().listener(()).await.unwrap();
let node_clone = node1.clone();
tokio::spawn(async move {
node_clone
.dht()
.put("listener-test", &b"data"[..], Quorum::One)
.await
.unwrap();
});
let event = tokio::time::timeout(DEFAULT_TIMEOUT, listener.next())
.await
.unwrap()
.unwrap();
assert!(matches!(event, DHTEvent::PutRecord { .. }))
}
#[tokio::test]
async fn test_dht_listener_with_specific_key() {
let (node1, node2, _peer_id1, _peer_id2) = create_connected_nodes().await;
let key = RecordKey::from(b"specific-key".to_vec());
let mut listener = node2.dht().listener(&key).await.unwrap();
let node_clone = node1.clone();
let key_clone = key.clone();
tokio::spawn(async move {
node_clone
.dht()
.put(&key_clone, &b"data"[..], Quorum::One)
.await
.unwrap();
});
let event = tokio::time::timeout(DEFAULT_TIMEOUT, listener.next())
.await
.unwrap()
.unwrap();
assert!(matches!(event, DHTEvent::PutRecord { .. }));
}
#[tokio::test]
async fn test_dht_bootstrap_with_no_peers() {
let node = spawn_connexa_with_default_key().await;
let result = node.dht().bootstrap().await;
assert!(result.is_err());
}
#[tokio::test]
async fn test_dht_find_non_existent_peer() {
let node = spawn_connexa_with_default_key().await;
let random_peer = PeerId::random();
let result = node.dht().find_peer(random_peer).await.unwrap();
assert!(result.is_empty());
}
#[tokio::test]
async fn test_dht_get_non_existent_key() {
let node = spawn_connexa_with_default_key().await;
let mut stream = node.dht().get("non-existent-key").await.unwrap();
stream.next().await.unwrap().unwrap_err();
}