use async_trait::async_trait;
use crate::dht::successor::SuccessorWriter;
use crate::dht::Chord;
use crate::dht::ChordStorageSync;
use crate::dht::PeerRingAction;
use crate::dht::PeerRingRemoteAction;
use crate::err::Error;
use crate::err::Result;
use crate::message::types::ConnectNodeReport;
use crate::message::types::ConnectNodeSend;
use crate::message::types::FindSuccessorReport;
use crate::message::types::FindSuccessorSend;
use crate::message::types::JoinDHT;
use crate::message::types::Message;
use crate::message::types::SyncVNodeWithSuccessor;
use crate::message::FindSuccessorReportHandler;
use crate::message::FindSuccessorThen;
use crate::message::HandleMsg;
use crate::message::LeaveDHT;
use crate::message::MessageHandler;
use crate::message::MessageHandlerEvent;
use crate::message::MessagePayload;
async fn handle_join_dht(
act: PeerRingAction,
ctx: &MessagePayload<Message>,
) -> Result<Vec<MessageHandlerEvent>> {
let mut events = vec![];
match act {
PeerRingAction::None => {}
PeerRingAction::RemoteAction(next, PeerRingRemoteAction::FindSuccessorForConnect(did)) => {
if next != ctx.addr {
events.push(MessageHandlerEvent::SendDirectMessage(
Message::FindSuccessorSend(FindSuccessorSend {
did,
strict: false,
then: FindSuccessorThen::Report(FindSuccessorReportHandler::Connect),
}),
next,
));
}
}
_ => unreachable!(),
}
Ok(events)
}
#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
impl HandleMsg<LeaveDHT> for MessageHandler {
async fn handle(
&self,
_ctx: &MessagePayload<Message>,
msg: &LeaveDHT,
) -> Result<Vec<MessageHandlerEvent>> {
Ok(vec![MessageHandlerEvent::Disconnect(msg.did)])
}
}
#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
impl HandleMsg<JoinDHT> for MessageHandler {
async fn handle(
&self,
ctx: &MessagePayload<Message>,
msg: &JoinDHT,
) -> Result<Vec<MessageHandlerEvent>> {
let act = self.dht.join(msg.did)?;
handle_join_dht(act, ctx).await
}
}
#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
impl HandleMsg<ConnectNodeSend> for MessageHandler {
async fn handle(
&self,
ctx: &MessagePayload<Message>,
msg: &ConnectNodeSend,
) -> Result<Vec<MessageHandlerEvent>> {
if self.dht.did != ctx.relay.destination {
Ok(vec![MessageHandlerEvent::ForwardPayload(None)])
} else {
Ok(vec![MessageHandlerEvent::AnswerOffer(msg.clone())])
}
}
}
#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
impl HandleMsg<ConnectNodeReport> for MessageHandler {
async fn handle(
&self,
ctx: &MessagePayload<Message>,
msg: &ConnectNodeReport,
) -> Result<Vec<MessageHandlerEvent>> {
if self.dht.did != ctx.relay.destination {
Ok(vec![MessageHandlerEvent::ForwardPayload(None)])
} else {
Ok(vec![MessageHandlerEvent::AcceptAnswer(msg.clone())])
}
}
}
#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
impl HandleMsg<FindSuccessorSend> for MessageHandler {
async fn handle(
&self,
_ctx: &MessagePayload<Message>,
msg: &FindSuccessorSend,
) -> Result<Vec<MessageHandlerEvent>> {
match self.dht.find_successor(msg.did)? {
PeerRingAction::Some(did) => {
if !msg.strict || self.dht.did == msg.did {
match &msg.then {
FindSuccessorThen::Report(handler) => {
Ok(vec![MessageHandlerEvent::SendReportMessage(
Message::FindSuccessorReport(FindSuccessorReport {
did,
handler: handler.clone(),
}),
)])
}
}
} else {
Ok(vec![MessageHandlerEvent::ForwardPayload(Some(did))])
}
}
PeerRingAction::RemoteAction(next, _) => {
Ok(vec![MessageHandlerEvent::ResetDestination(next)])
}
act => Err(Error::PeerRingUnexpectedAction(act)),
}
}
}
#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
impl HandleMsg<FindSuccessorReport> for MessageHandler {
async fn handle(
&self,
ctx: &MessagePayload<Message>,
msg: &FindSuccessorReport,
) -> Result<Vec<MessageHandlerEvent>> {
if self.dht.did != ctx.relay.destination {
return Ok(vec![MessageHandlerEvent::ForwardPayload(None)]);
}
match &msg.handler {
FindSuccessorReportHandler::FixFingerTable => {
Ok(vec![MessageHandlerEvent::JoinDHT(msg.did)])
}
FindSuccessorReportHandler::Connect => Ok(vec![MessageHandlerEvent::Connect(msg.did)]),
FindSuccessorReportHandler::SyncStorage => {
self.dht.successors().update(msg.did)?;
if let Ok(PeerRingAction::RemoteAction(
next,
PeerRingRemoteAction::SyncVNodeWithSuccessor(data),
)) = self.dht.sync_vnode_with_successor(msg.did).await
{
Ok(vec![MessageHandlerEvent::SendMessage(
Message::SyncVNodeWithSuccessor(SyncVNodeWithSuccessor { data }),
next,
)])
} else {
Ok(vec![])
}
}
_ => Ok(vec![]),
}
}
}
#[cfg(not(feature = "wasm"))]
#[cfg(test)]
pub mod tests {
use std::matches;
use std::sync::Arc;
use tokio::time::sleep;
use tokio::time::Duration;
use super::*;
use crate::dht::successor::SuccessorReader;
use crate::dht::Did;
use crate::ecc::tests::gen_ordered_keys;
use crate::ecc::SecretKey;
use crate::message::handlers::tests::assert_no_more_msg;
use crate::swarm::Swarm;
use crate::tests::default::prepare_node;
use crate::tests::manually_establish_connection;
use crate::transports::manager::TransportManager;
use crate::types::ice_transport::IceTransportInterface;
#[tokio::test]
async fn test_triple_nodes_connection_1_2_3() -> Result<()> {
let keys = gen_ordered_keys(3);
let (key1, key2, key3) = (keys[0], keys[1], keys[2]);
test_triple_ordered_nodes_connection(key1, key2, key3).await?;
Ok(())
}
#[tokio::test]
async fn test_triple_nodes_connection_2_3_1() -> Result<()> {
let keys = gen_ordered_keys(3);
let (key1, key2, key3) = (keys[0], keys[1], keys[2]);
test_triple_ordered_nodes_connection(key2, key3, key1).await?;
Ok(())
}
#[tokio::test]
async fn test_triple_nodes_connection_3_1_2() -> Result<()> {
let keys = gen_ordered_keys(3);
let (key1, key2, key3) = (keys[0], keys[1], keys[2]);
test_triple_ordered_nodes_connection(key3, key1, key2).await?;
Ok(())
}
#[tokio::test]
async fn test_triple_nodes_connection_3_2_1() -> Result<()> {
let keys = gen_ordered_keys(3);
let (key1, key2, key3) = (keys[0], keys[1], keys[2]);
test_triple_desc_ordered_nodes_connection(key3, key2, key1).await?;
Ok(())
}
#[tokio::test]
async fn test_triple_nodes_connection_2_1_3() -> Result<()> {
let keys = gen_ordered_keys(3);
let (key1, key2, key3) = (keys[0], keys[1], keys[2]);
test_triple_desc_ordered_nodes_connection(key2, key1, key3).await?;
Ok(())
}
#[tokio::test]
async fn test_triple_nodes_connection_1_3_2() -> Result<()> {
let keys = gen_ordered_keys(3);
let (key1, key2, key3) = (keys[0], keys[1], keys[2]);
test_triple_desc_ordered_nodes_connection(key1, key3, key2).await?;
Ok(())
}
async fn test_triple_ordered_nodes_connection(
key1: SecretKey,
key2: SecretKey,
key3: SecretKey,
) -> Result<(Arc<Swarm>, Arc<Swarm>, Arc<Swarm>)> {
let (node1, _path1) = prepare_node(key1).await;
let (node2, _path2) = prepare_node(key2).await;
let (node3, _path3) = prepare_node(key3).await;
println!("========================================");
println!("|| now we connect node1 and node2 ||");
println!("========================================");
test_only_two_nodes_establish_connection(&node1, &node2).await?;
assert_eq!(node1.dht().successors().list()?, vec![node2.did()]);
assert_eq!(node2.dht().successors().list()?, vec![node1.did()]);
assert_eq!(node3.dht().successors().list()?, vec![]);
println!("========================================");
println!("|| now we start join node3 to node2 ||");
println!("========================================");
manually_establish_connection(&node3, &node2).await?;
test_listen_join_and_init_find_succeesor(&node3, &node2).await?;
assert_eq!(node1.dht().successors().list()?, vec![node2.did()]);
assert_eq!(node2.dht().successors().list()?, vec![
node3.did(),
node1.did()
]);
assert_eq!(node3.dht().successors().list()?, vec![node2.did()]);
let ev_3 = node3.listen_once().await.unwrap().0;
assert_eq!(ev_3.addr, node2.did());
assert_eq!(ev_3.relay.path, vec![node2.did()]);
assert!(matches!(
ev_3.data,
Message::FindSuccessorReport(FindSuccessorReport{did, handler: FindSuccessorReportHandler::Connect}) if did == node3.did()
));
assert!(!node3.dht().successors().list()?.contains(&node3.did()));
let ev_2 = node2.listen_once().await.unwrap().0;
assert_eq!(ev_2.addr, node3.did());
assert_eq!(ev_2.relay.path, vec![node3.did()]);
assert!(matches!(
ev_2.data,
Message::FindSuccessorReport(FindSuccessorReport{did, handler: FindSuccessorReportHandler::Connect}) if did == node2.did()
));
assert!(!node2.dht().successors().list()?.contains(&node2.did()));
println!("=== Check state before connect via DHT ===");
assert_transports(node1.clone(), vec![node2.did()]);
assert_transports(node2.clone(), vec![node1.did(), node3.did()]);
assert_transports(node3.clone(), vec![node2.did()]);
assert_eq!(node1.dht().successors().list()?, vec![node2.did()]);
assert_eq!(node2.dht().successors().list()?, vec![
node3.did(),
node1.did()
]);
assert_eq!(node3.dht().successors().list()?, vec![node2.did()]);
println!("=============================================");
println!("|| now we connect node1 to node3 via DHT ||");
println!("=============================================");
test_connect_via_dht_and_init_find_succeesor(&node1, &node2, &node3).await?;
let ev_2 = node2.listen_once().await.unwrap().0;
assert_eq!(ev_2.addr, node1.did());
assert_eq!(ev_2.relay.path, vec![node3.did(), node1.did()]);
assert!(matches!(
ev_2.data,
Message::FindSuccessorSend(FindSuccessorSend{did, strict: false, then: FindSuccessorThen::Report(FindSuccessorReportHandler::Connect)}) if did == node3.did()
));
let ev_1 = node1.listen_once().await.unwrap().0;
assert_eq!(ev_1.addr, node3.did());
assert_eq!(ev_1.relay.path, vec![node3.did()]);
assert!(matches!(
ev_1.data,
Message::FindSuccessorReport(FindSuccessorReport{did, handler: FindSuccessorReportHandler::Connect}) if did == node1.did()
));
assert!(!node1.dht().successors().list()?.contains(&node1.did()));
let ev_1 = node1.listen_once().await.unwrap().0;
assert_eq!(ev_1.addr, node2.did());
assert_eq!(ev_1.relay.path, vec![node2.did()]);
let ev_3 = node3.listen_once().await.unwrap().0;
assert_eq!(ev_3.addr, node1.did());
assert_eq!(ev_3.relay.path, vec![node2.did(), node1.did()]);
assert!(matches!(
ev_3.data,
Message::FindSuccessorReport(FindSuccessorReport{did, handler: FindSuccessorReportHandler::Connect}) if did == node3.did()
));
assert!(!node3.dht().successors().list()?.contains(&node3.did()));
assert_no_more_msg(&node1, &node2, &node3).await;
println!("=== Check state after connect via DHT ===");
assert_transports(node1.clone(), vec![node2.did(), node3.did()]);
assert_transports(node2.clone(), vec![node1.did(), node3.did()]);
assert_transports(node3.clone(), vec![node1.did(), node2.did()]);
assert_eq!(node1.dht().successors().list()?, vec![
node2.did(),
node3.did()
]);
assert_eq!(node2.dht().successors().list()?, vec![
node3.did(),
node1.did()
]);
assert_eq!(node3.dht().successors().list()?, vec![
node1.did(),
node2.did()
]);
tokio::fs::remove_dir_all("./tmp").await.ok();
Ok((node1.clone(), node2.clone(), node3.clone()))
}
async fn test_triple_desc_ordered_nodes_connection(
key1: SecretKey,
key2: SecretKey,
key3: SecretKey,
) -> Result<(Arc<Swarm>, Arc<Swarm>, Arc<Swarm>)> {
let (node1, _path1) = prepare_node(key1).await;
let (node2, _path2) = prepare_node(key2).await;
let (node3, _path3) = prepare_node(key3).await;
println!("========================================");
println!("|| now we connect node1 and node2 ||");
println!("========================================");
test_only_two_nodes_establish_connection(&node1, &node2).await?;
assert_eq!(node1.dht().successors().list()?, vec![node2.did()]);
assert_eq!(node2.dht().successors().list()?, vec![node1.did()]);
assert_eq!(node3.dht().successors().list()?, vec![]);
println!("========================================");
println!("|| now we start join node3 to node2 ||");
println!("========================================");
manually_establish_connection(&node3, &node2).await?;
test_listen_join_and_init_find_succeesor(&node3, &node2).await?;
assert_eq!(node1.dht().successors().list()?, vec![node2.did()]);
assert_eq!(node2.dht().successors().list()?, vec![
node1.did(),
node3.did()
]);
assert_eq!(node3.dht().successors().list()?, vec![node2.did()]);
let ev_1 = node1.listen_once().await.unwrap().0;
assert_eq!(ev_1.addr, node2.did());
assert_eq!(ev_1.relay.path, vec![node3.did(), node2.did()]);
assert!(matches!(
ev_1.data,
Message::FindSuccessorSend(FindSuccessorSend{did, strict: false, then: FindSuccessorThen::Report(FindSuccessorReportHandler::Connect)}) if did == node3.did()
));
let ev_2 = node2.listen_once().await.unwrap().0;
assert_eq!(ev_2.addr, node3.did());
assert_eq!(ev_2.relay.path, vec![node3.did()]);
assert!(matches!(
ev_2.data,
Message::FindSuccessorReport(FindSuccessorReport{did, handler: FindSuccessorReportHandler::Connect}) if did == node2.did()
));
assert!(!node2.dht().successors().list()?.contains(&node2.did()));
let ev_2 = node2.listen_once().await.unwrap().0;
assert_eq!(ev_2.addr, node1.did());
assert_eq!(ev_2.relay.path, vec![node1.did()]);
assert!(matches!(
ev_2.data,
Message::FindSuccessorReport(FindSuccessorReport{did, handler: FindSuccessorReportHandler::Connect}) if did == node2.did()
));
let ev_3 = node3.listen_once().await.unwrap().0;
assert_eq!(ev_3.addr, node2.did());
assert_eq!(ev_3.relay.path, vec![node1.did(), node2.did()]);
assert!(matches!(
ev_3.data,
Message::FindSuccessorReport(FindSuccessorReport{did, handler: FindSuccessorReportHandler::Connect}) if did == node2.did()
));
println!("=== Check state before connect via DHT ===");
assert_transports(node1.clone(), vec![node2.did()]);
assert_transports(node2.clone(), vec![node1.did(), node3.did()]);
assert_transports(node3.clone(), vec![node2.did()]);
assert_eq!(node1.dht().successors().list()?, vec![node2.did()]);
assert_eq!(node2.dht().successors().list()?, vec![
node1.did(),
node3.did()
]);
assert_eq!(node3.dht().successors().list()?, vec![node2.did()]);
println!("=============================================");
println!("|| now we connect node1 to node3 via DHT ||");
println!("=============================================");
test_connect_via_dht_and_init_find_succeesor(&node1, &node2, &node3).await?;
let ev_2 = node2.listen_once().await.unwrap().0;
assert_eq!(ev_2.addr, node3.did());
assert_eq!(ev_2.relay.path, vec![node1.did(), node3.did()]);
assert!(matches!(
ev_2.data,
Message::FindSuccessorSend(FindSuccessorSend{did, strict: false, then: FindSuccessorThen::Report(FindSuccessorReportHandler::Connect)}) if did == node1.did()
));
let ev_3 = node3.listen_once().await.unwrap().0;
assert_eq!(ev_3.addr, node1.did());
assert_eq!(ev_3.relay.path, vec![node1.did()]);
assert!(matches!(
ev_3.data,
Message::FindSuccessorReport(FindSuccessorReport{did, handler: FindSuccessorReportHandler::Connect}) if did == node3.did()
));
assert!(!node3.dht.successors().list()?.contains(&node3.did()));
let ev_3 = node3.listen_once().await.unwrap().0;
assert_eq!(ev_3.addr, node2.did());
assert_eq!(ev_3.relay.path, vec![node2.did()]);
let ev_1 = node1.listen_once().await.unwrap().0;
assert_eq!(ev_1.addr, node3.did());
assert_eq!(ev_1.relay.path, vec![node2.did(), node3.did()]);
assert!(matches!(
ev_1.data,
Message::FindSuccessorReport(FindSuccessorReport{did, handler: FindSuccessorReportHandler::Connect}) if did == node1.did()
));
assert!(!node1.dht.successors().list()?.contains(&node1.did()));
assert_no_more_msg(&node1, &node2, &node3).await;
println!("=== Check state after connect via DHT ===");
assert_transports(node1.clone(), vec![node2.did(), node3.did()]);
assert_transports(node2.clone(), vec![node1.did(), node3.did()]);
assert_transports(node3.clone(), vec![node1.did(), node2.did()]);
assert_eq!(node1.dht().successors().list()?, vec![
node3.did(),
node2.did()
]);
assert_eq!(node2.dht().successors().list()?, vec![
node1.did(),
node3.did()
]);
assert_eq!(node3.dht().successors().list()?, vec![
node2.did(),
node1.did()
]);
tokio::fs::remove_dir_all("./tmp").await.ok();
Ok((node1.clone(), node2.clone(), node3.clone()))
}
pub async fn test_listen_join_and_init_find_succeesor(
node1: &Swarm,
node2: &Swarm,
) -> Result<()> {
let ev_1 = node1.listen_once().await.unwrap().0;
assert_eq!(ev_1.addr, node1.did());
assert_eq!(ev_1.relay.path, vec![node1.did()]);
assert!(matches!(ev_1.data, Message::JoinDHT(JoinDHT{did, ..}) if did == node2.did()));
let ev_2 = node2.listen_once().await.unwrap().0;
assert_eq!(ev_2.addr, node2.did());
assert_eq!(ev_2.relay.path, vec![node2.did()]);
assert!(matches!(ev_2.data, Message::JoinDHT(JoinDHT{did, ..}) if did == node1.did()));
let ev_1 = node1.listen_once().await.unwrap().0;
assert_eq!(ev_1.addr, node2.did());
assert_eq!(ev_1.relay.path, vec![node2.did()]);
assert!(matches!(
ev_1.data,
Message::FindSuccessorSend(FindSuccessorSend{did, then: FindSuccessorThen::Report(FindSuccessorReportHandler::Connect), strict: false}) if did == node2.did()
));
let ev_2 = node2.listen_once().await.unwrap().0;
assert_eq!(ev_2.addr, node1.did());
assert_eq!(ev_2.relay.path, vec![node1.did()]);
assert!(matches!(
ev_2.data,
Message::FindSuccessorSend(FindSuccessorSend{did, then: FindSuccessorThen::Report(FindSuccessorReportHandler::Connect), strict: false}) if did == node1.did()
));
Ok(())
}
pub async fn test_only_two_nodes_establish_connection(
node1: &Swarm,
node2: &Swarm,
) -> Result<()> {
manually_establish_connection(node1, node2).await?;
test_listen_join_and_init_find_succeesor(node1, node2).await?;
let ev_1 = node1.listen_once().await.unwrap().0;
assert_eq!(ev_1.addr, node2.did());
assert_eq!(ev_1.relay.path, vec![node2.did()]);
assert!(matches!(
ev_1.data,
Message::FindSuccessorReport(FindSuccessorReport{did, handler: FindSuccessorReportHandler::Connect}) if did == node1.did()
));
assert!(!node1.dht().successors().list()?.contains(&node1.did()));
let ev_2 = node2.listen_once().await.unwrap().0;
assert_eq!(ev_2.addr, node1.did());
assert_eq!(ev_2.relay.path, vec![node1.did()]);
assert!(matches!(
ev_2.data,
Message::FindSuccessorReport(FindSuccessorReport{did, handler: FindSuccessorReportHandler::Connect}) if did == node2.did()
));
assert!(!node2.dht().successors().list()?.contains(&node2.did()));
Ok(())
}
async fn test_connect_via_dht_and_init_find_succeesor(
node1: &Swarm,
node2: &Swarm,
node3: &Swarm,
) -> Result<()> {
assert!(node1.get_transport(node3.did()).is_none());
assert_eq!(node1.dht.successors().max()?, node2.did());
node1.connect(node3.did()).await.unwrap();
let ev2 = node2.listen_once().await.unwrap().0;
assert_eq!(ev2.addr, node1.did());
assert_eq!(ev2.relay.path, vec![node1.did()]);
assert!(matches!(ev2.data, Message::ConnectNodeSend(_)));
let ev3 = node3.listen_once().await.unwrap().0;
assert_eq!(ev3.addr, node2.did());
assert_eq!(ev3.relay.path, vec![node1.did(), node2.did()]);
assert!(matches!(ev3.data, Message::ConnectNodeSend(_)));
let ev2 = node2.listen_once().await.unwrap().0;
assert_eq!(ev2.addr, node3.did());
assert_eq!(ev2.relay.path, vec![node3.did()]);
assert!(matches!(ev2.data, Message::ConnectNodeReport(_)));
let ev1 = node1.listen_once().await.unwrap().0;
assert_eq!(ev1.addr, node2.did());
assert_eq!(ev1.relay.path, vec![node3.did(), node2.did()]);
assert!(matches!(ev1.data, Message::ConnectNodeReport(_)));
let ev_1 = node1.listen_once().await.unwrap().0;
assert_eq!(ev_1.addr, node1.did());
assert_eq!(ev_1.relay.path, vec![node1.did()]);
assert!(matches!(ev_1.data, Message::JoinDHT(JoinDHT{did, ..}) if did == node3.did()));
let ev_3 = node3.listen_once().await.unwrap().0;
assert_eq!(ev_3.addr, node3.did());
assert_eq!(ev_3.relay.path, vec![node3.did()]);
assert!(matches!(ev_3.data, Message::JoinDHT(JoinDHT{did, ..}) if did == node1.did()));
let ev_1 = node1.listen_once().await.unwrap().0;
assert_eq!(ev_1.addr, node3.did());
assert_eq!(ev_1.relay.path, vec![node3.did()]);
assert!(matches!(
ev_1.data,
Message::FindSuccessorSend(FindSuccessorSend{did, strict: false, then: FindSuccessorThen::Report(FindSuccessorReportHandler::Connect)}) if did == node3.did()
));
let ev_3 = node3.listen_once().await.unwrap().0;
assert_eq!(ev_3.addr, node1.did());
assert_eq!(ev_3.relay.path, vec![node1.did()]);
assert!(matches!(
ev_3.data,
Message::FindSuccessorSend(FindSuccessorSend{did, strict: false, then: FindSuccessorThen::Report(FindSuccessorReportHandler::Connect)}) if did == node1.did()
));
Ok(())
}
fn assert_transports(swarm: Arc<Swarm>, addresses: Vec<Did>) {
println!(
"Check transport of {:?}: {:?} for addresses {:?}",
swarm.did(),
swarm.get_dids(),
addresses
);
assert_eq!(swarm.get_transports().len(), addresses.len());
for addr in addresses {
assert!(swarm.get_transport(addr).is_some());
}
}
#[tokio::test]
async fn test_quadra_desc_node_connection() -> Result<()> {
let keys = gen_ordered_keys(4);
let (key1, key2, key3, key4) = (keys[0], keys[1], keys[2], keys[3]);
let (node1, node2, node3) = test_triple_ordered_nodes_connection(key1, key2, key3).await?;
let (node4, _path4) = prepare_node(key4).await;
manually_establish_connection(&node4, &node2).await?;
test_listen_join_and_init_find_succeesor(&node4, &node2).await?;
let _ = node2.listen_once().await.unwrap();
let _ = node3.listen_once().await.unwrap();
let _ = node2.listen_once().await.unwrap();
let _ = node4.listen_once().await.unwrap();
let _ = node2.listen_once().await.unwrap();
println!("==================================================");
println!("| test connect node 4 from node 1 via node 2 |");
println!("==================================================");
println!(
"node1.did(): {:?}, node2.did(): {:?}, node3.did(): {:?}, node4.did(): {:?}",
node1.did(),
node2.did(),
node3.did(),
node4.did(),
);
println!("==================================================");
node4.connect(node1.did()).await?;
assert!(matches!(
node2.listen_once().await.unwrap().0.data,
Message::ConnectNodeSend(_)
));
assert!(matches!(
node1.listen_once().await.unwrap().0.data,
Message::ConnectNodeSend(_)
));
assert!(matches!(
node2.listen_once().await.unwrap().0.data,
Message::ConnectNodeReport(_)
));
assert!(matches!(
node4.listen_once().await.unwrap().0.data,
Message::ConnectNodeReport(_)
));
println!("=================Finish handshake here=================");
Ok(())
}
#[tokio::test]
async fn test_finger_when_disconnect() -> Result<()> {
let key1 = SecretKey::random();
let key2 = SecretKey::random();
let key3 = SecretKey::random();
let (node1, _path1) = prepare_node(key1).await;
let (node2, _path2) = prepare_node(key2).await;
let (node3, _path3) = prepare_node(key3).await;
{
assert!(node1.dht().lock_finger()?.is_empty());
assert!(node1.dht().lock_finger()?.is_empty());
}
test_only_two_nodes_establish_connection(&node1, &node2).await?;
assert_no_more_msg(&node1, &node2, &node3).await;
assert_transports(node1.clone(), vec![node2.did()]);
assert_transports(node2.clone(), vec![node1.did()]);
{
let finger1 = node1.dht().lock_finger()?.clone().clone_finger();
let finger2 = node2.dht().lock_finger()?.clone().clone_finger();
assert!(finger1.into_iter().any(|x| x == Some(node2.did())));
assert!(finger2.into_iter().any(|x| x == Some(node1.did())));
}
println!("===================================");
println!("| test disconnect node1 and node2 |");
println!("===================================");
node1.disconnect(node2.did()).await?;
let ev1 = node1.listen_once().await;
assert!(ev1.is_none());
#[cfg(not(feature = "wasm"))]
node2
.get_transport(node1.did())
.unwrap()
.close()
.await
.unwrap();
for _ in 1..10 {
println!("wait 3 seconds for node2's transport 2to1 closing");
sleep(Duration::from_secs(3)).await;
if let Some(t) = node2.get_transport(node1.did()) {
if t.is_disconnected().await {
println!("transport 2to1 is disconnected!!!!");
break;
}
} else {
println!("transport 2to1 is disappeared!!!!");
break;
}
}
let ev2 = node2.listen_once().await.unwrap().0;
assert_eq!(ev2.addr, node2.did());
assert!(matches!(ev2.data, Message::LeaveDHT(LeaveDHT{did}) if did == node1.did()));
assert_no_more_msg(&node1, &node2, &node3).await;
assert_transports(node1.clone(), vec![]);
assert_transports(node2.clone(), vec![]);
{
let finger1 = node1.dht().lock_finger()?.clone().clone_finger();
let finger2 = node2.dht().lock_finger()?.clone().clone_finger();
assert!(finger1.into_iter().all(|x| x.is_none()));
assert!(finger2.into_iter().all(|x| x.is_none()));
}
Ok(())
}
#[tokio::test]
async fn test_already_connect_fixture() -> Result<()> {
let keys = gen_ordered_keys(3);
let (key1, key2, key3) = (keys[0], keys[1], keys[2]);
let (node1, _path1) = prepare_node(key1).await;
let (node2, _path2) = prepare_node(key2).await;
let (node3, _path3) = prepare_node(key3).await;
test_only_two_nodes_establish_connection(&node1, &node2).await?;
assert_no_more_msg(&node1, &node2, &node3).await;
test_only_two_nodes_establish_connection(&node3, &node2).await?;
assert_no_more_msg(&node1, &node2, &node3).await;
println!("node1 connect node2 twice here");
let _ = node1.connect(node3.did()).await.unwrap();
let t_1_3_b = node1.connect(node3.did()).await.unwrap();
let _ = node2.listen_once().await.unwrap();
let _ = node2.listen_once().await.unwrap();
let _ = node3.listen_once().await.unwrap();
let _ = node3.listen_once().await.unwrap();
let _ = node2.listen_once().await.unwrap();
let _ = node2.listen_once().await.unwrap();
let _ = node1.listen_once().await.unwrap();
let _ = node1.listen_once().await.unwrap();
println!("wait for handshake here");
sleep(Duration::from_secs(3)).await;
let ev3 = node3.listen_once().await.unwrap().0;
assert!(matches!(ev3.data, Message::JoinDHT(_)));
let _ = node3.listen_once().await.is_none();
let ev1 = node1.listen_once().await.unwrap().0;
assert!(matches!(ev1.data, Message::JoinDHT(_)));
let _ = node1.listen_once().await.is_none();
let t1_3 = node1.get_transport(node3.did()).unwrap();
println!("transport is replace by second");
assert_eq!(t1_3.id, t_1_3_b.id);
let t3_1 = node3.get_transport(node1.did()).unwrap();
assert!(t1_3.is_connected().await);
assert!(t3_1.is_connected().await);
Ok(())
}
}