use std::ops::Deref;
use async_trait::async_trait;
use super::dht;
use crate::dht::types::CorrectChord;
use crate::dht::Chord;
use crate::dht::PeerRingAction;
use crate::dht::TopoInfo;
use crate::error::Error;
use crate::error::Result;
use crate::message::types::ConnectNodeReport;
use crate::message::types::ConnectNodeSend;
use crate::message::types::FindSuccessorReport;
use crate::message::types::FindSuccessorSend;
use crate::message::types::JoinDHT;
use crate::message::types::Message;
use crate::message::types::QueryForTopoInfoReport;
use crate::message::types::QueryForTopoInfoSend;
use crate::message::types::Then;
use crate::message::FindSuccessorReportHandler;
use crate::message::FindSuccessorThen;
use crate::message::HandleMsg;
use crate::message::LeaveDHT;
use crate::message::MessageHandler;
use crate::message::MessageHandlerEvent;
use crate::message::MessagePayload;
#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
impl HandleMsg<QueryForTopoInfoSend> for MessageHandler {
async fn handle(
&self,
ctx: &MessagePayload<Message>,
msg: &QueryForTopoInfoSend,
) -> Result<Vec<MessageHandlerEvent>> {
let info: TopoInfo = TopoInfo::try_from(self.dht.deref())?;
if msg.did == self.dht.did {
Ok(vec![MessageHandlerEvent::SendReportMessage(
ctx.clone(),
Message::QueryForTopoInfoReport(msg.resp(info)),
)])
} else {
Ok(vec![])
}
}
}
#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
impl HandleMsg<QueryForTopoInfoReport> for MessageHandler {
async fn handle(
&self,
ctx: &MessagePayload<Message>,
msg: &QueryForTopoInfoReport,
) -> Result<Vec<MessageHandlerEvent>> {
match msg.then {
<QueryForTopoInfoReport as Then>::Then::SyncSuccessor => Ok(msg
.info
.successors
.iter()
.map(|did| MessageHandlerEvent::JoinDHT(ctx.clone(), *did))
.collect()),
<QueryForTopoInfoReport as Then>::Then::Stabilization => {
let ev = self.dht.stabilize(msg.info.clone())?;
dht::handle_dht_events(&ev, ctx).await
}
}
}
}
#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
impl HandleMsg<LeaveDHT> for MessageHandler {
async fn handle(
&self,
_ctx: &MessagePayload<Message>,
msg: &LeaveDHT,
) -> Result<Vec<MessageHandlerEvent>> {
Ok(vec![MessageHandlerEvent::Disconnect(msg.did)])
}
}
#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
impl HandleMsg<JoinDHT> for MessageHandler {
async fn handle(
&self,
ctx: &MessagePayload<Message>,
msg: &JoinDHT,
) -> Result<Vec<MessageHandlerEvent>> {
Ok(vec![MessageHandlerEvent::JoinDHT(ctx.clone(), msg.did)])
}
}
#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
impl HandleMsg<ConnectNodeSend> for MessageHandler {
async fn handle(
&self,
ctx: &MessagePayload<Message>,
msg: &ConnectNodeSend,
) -> Result<Vec<MessageHandlerEvent>> {
if self.dht.did != ctx.relay.destination {
Ok(vec![MessageHandlerEvent::ForwardPayload(ctx.clone(), None)])
} else {
Ok(vec![MessageHandlerEvent::AnswerOffer(
ctx.clone(),
msg.clone(),
)])
}
}
}
#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
impl HandleMsg<ConnectNodeReport> for MessageHandler {
async fn handle(
&self,
ctx: &MessagePayload<Message>,
msg: &ConnectNodeReport,
) -> Result<Vec<MessageHandlerEvent>> {
if self.dht.did != ctx.relay.destination {
Ok(vec![MessageHandlerEvent::ForwardPayload(ctx.clone(), None)])
} else {
Ok(vec![MessageHandlerEvent::AcceptAnswer(
ctx.relay.origin_sender(),
msg.clone(),
)])
}
}
}
#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
impl HandleMsg<FindSuccessorSend> for MessageHandler {
async fn handle(
&self,
ctx: &MessagePayload<Message>,
msg: &FindSuccessorSend,
) -> Result<Vec<MessageHandlerEvent>> {
match self.dht.find_successor(msg.did)? {
PeerRingAction::Some(did) => {
if !msg.strict || self.dht.did == msg.did {
match &msg.then {
FindSuccessorThen::Report(handler) => {
Ok(vec![MessageHandlerEvent::SendReportMessage(
ctx.clone(),
Message::FindSuccessorReport(FindSuccessorReport {
did,
handler: handler.clone(),
}),
)])
}
}
} else {
Ok(vec![MessageHandlerEvent::ForwardPayload(
ctx.clone(),
Some(did),
)])
}
}
PeerRingAction::RemoteAction(next, _) => {
Ok(vec![MessageHandlerEvent::ResetDestination(
ctx.clone(),
next,
)])
}
act => Err(Error::PeerRingUnexpectedAction(act)),
}
}
}
#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
impl HandleMsg<FindSuccessorReport> for MessageHandler {
async fn handle(
&self,
ctx: &MessagePayload<Message>,
msg: &FindSuccessorReport,
) -> Result<Vec<MessageHandlerEvent>> {
if self.dht.did != ctx.relay.destination {
return Ok(vec![MessageHandlerEvent::ForwardPayload(ctx.clone(), None)]);
}
match &msg.handler {
FindSuccessorReportHandler::FixFingerTable => {
Ok(vec![MessageHandlerEvent::Connect(msg.did)])
}
FindSuccessorReportHandler::Connect => Ok(vec![MessageHandlerEvent::Connect(msg.did)]),
_ => Ok(vec![]),
}
}
}
#[cfg(not(feature = "wasm"))]
#[cfg(test)]
pub mod tests {
use std::matches;
use std::sync::Arc;
use rings_transport::core::transport::ConnectionInterface;
use tokio::time::sleep;
use tokio::time::Duration;
use super::*;
use crate::dht::successor::SuccessorReader;
use crate::dht::Did;
use crate::ecc::tests::gen_ordered_keys;
use crate::ecc::SecretKey;
use crate::message::handlers::tests::assert_no_more_msg;
use crate::swarm::Swarm;
use crate::tests::default::prepare_node;
use crate::tests::manually_establish_connection;
#[tokio::test]
async fn test_triple_nodes_connection_1_2_3() -> Result<()> {
let keys = gen_ordered_keys(3);
let (key1, key2, key3) = (keys[0], keys[1], keys[2]);
test_triple_ordered_nodes_connection(key1, key2, key3).await?;
Ok(())
}
#[tokio::test]
async fn test_triple_nodes_connection_2_3_1() -> Result<()> {
let keys = gen_ordered_keys(3);
let (key1, key2, key3) = (keys[0], keys[1], keys[2]);
test_triple_ordered_nodes_connection(key2, key3, key1).await?;
Ok(())
}
#[tokio::test]
async fn test_triple_nodes_connection_3_1_2() -> Result<()> {
let keys = gen_ordered_keys(3);
let (key1, key2, key3) = (keys[0], keys[1], keys[2]);
test_triple_ordered_nodes_connection(key3, key1, key2).await?;
Ok(())
}
#[tokio::test]
async fn test_triple_nodes_connection_3_2_1() -> Result<()> {
let keys = gen_ordered_keys(3);
let (key1, key2, key3) = (keys[0], keys[1], keys[2]);
test_triple_desc_ordered_nodes_connection(key3, key2, key1).await?;
Ok(())
}
#[tokio::test]
async fn test_triple_nodes_connection_2_1_3() -> Result<()> {
let keys = gen_ordered_keys(3);
let (key1, key2, key3) = (keys[0], keys[1], keys[2]);
test_triple_desc_ordered_nodes_connection(key2, key1, key3).await?;
Ok(())
}
#[tokio::test]
async fn test_triple_nodes_connection_1_3_2() -> Result<()> {
let keys = gen_ordered_keys(3);
let (key1, key2, key3) = (keys[0], keys[1], keys[2]);
test_triple_desc_ordered_nodes_connection(key1, key3, key2).await?;
Ok(())
}
async fn test_triple_ordered_nodes_connection(
key1: SecretKey,
key2: SecretKey,
key3: SecretKey,
) -> Result<(Arc<Swarm>, Arc<Swarm>, Arc<Swarm>)> {
let (node1, _path1) = prepare_node(key1).await;
let (node2, _path2) = prepare_node(key2).await;
let (node3, _path3) = prepare_node(key3).await;
println!("========================================");
println!("|| now we connect node1 and node2 ||");
println!("========================================");
test_only_two_nodes_establish_connection(&node1, &node2).await?;
assert_eq!(node1.dht().successors().list()?, vec![node2.did()]);
assert_eq!(node2.dht().successors().list()?, vec![node1.did()]);
assert_eq!(node3.dht().successors().list()?, vec![]);
println!("========================================");
println!("|| now we start join node3 to node2 ||");
println!("========================================");
manually_establish_connection(&node3, &node2).await;
test_listen_join_and_init_find_succeesor(&node3, &node2).await?;
assert_eq!(node1.dht().successors().list()?, vec![node2.did()]);
assert_eq!(node2.dht().successors().list()?, vec![
node3.did(),
node1.did()
]);
assert_eq!(node3.dht().successors().list()?, vec![node2.did()]);
let ev_3 = node3.listen_once().await.unwrap().0;
assert_eq!(ev_3.addr, node2.did());
assert_eq!(ev_3.relay.path, vec![node2.did()]);
assert!(matches!(
ev_3.data,
Message::FindSuccessorReport(FindSuccessorReport{did, handler: FindSuccessorReportHandler::Connect}) if did == node3.did()
));
assert!(!node3.dht().successors().list()?.contains(&node3.did()));
let ev_2 = node2.listen_once().await.unwrap().0;
assert_eq!(ev_2.addr, node3.did());
assert_eq!(ev_2.relay.path, vec![node3.did()]);
assert!(matches!(
ev_2.data,
Message::FindSuccessorReport(FindSuccessorReport{did, handler: FindSuccessorReportHandler::Connect}) if did == node2.did()
));
assert!(!node2.dht().successors().list()?.contains(&node2.did()));
println!("=== Check state before connect via DHT ===");
assert_transports(node1.clone(), vec![node2.did()]);
assert_transports(node2.clone(), vec![node1.did(), node3.did()]);
assert_transports(node3.clone(), vec![node2.did()]);
assert_eq!(node1.dht().successors().list()?, vec![node2.did()]);
assert_eq!(node2.dht().successors().list()?, vec![
node3.did(),
node1.did()
]);
assert_eq!(node3.dht().successors().list()?, vec![node2.did()]);
println!("=============================================");
println!("|| now we connect node1 to node3 via DHT ||");
println!("=============================================");
test_connect_via_dht_and_init_find_succeesor(&node1, &node2, &node3).await?;
let ev_2 = node2.listen_once().await.unwrap().0;
assert_eq!(ev_2.addr, node1.did());
assert_eq!(ev_2.relay.path, vec![node3.did(), node1.did()]);
assert!(matches!(
ev_2.data,
Message::FindSuccessorSend(FindSuccessorSend{did, strict: false, then: FindSuccessorThen::Report(FindSuccessorReportHandler::Connect)}) if did == node3.did()
));
let ev_1 = node1.listen_once().await.unwrap().0;
assert_eq!(ev_1.addr, node3.did());
assert_eq!(ev_1.relay.path, vec![node3.did()]);
assert!(matches!(
ev_1.data,
Message::FindSuccessorReport(FindSuccessorReport{did, handler: FindSuccessorReportHandler::Connect}) if did == node1.did()
));
assert!(!node1.dht().successors().list()?.contains(&node1.did()));
let ev_1 = node1.listen_once().await.unwrap().0;
assert_eq!(ev_1.addr, node2.did());
assert_eq!(ev_1.relay.path, vec![node2.did()]);
let ev_3 = node3.listen_once().await.unwrap().0;
assert_eq!(ev_3.addr, node1.did());
assert_eq!(ev_3.relay.path, vec![node2.did(), node1.did()]);
assert!(matches!(
ev_3.data,
Message::FindSuccessorReport(FindSuccessorReport{did, handler: FindSuccessorReportHandler::Connect}) if did == node3.did()
));
assert!(!node3.dht().successors().list()?.contains(&node3.did()));
assert_no_more_msg(&node1, &node2, &node3).await;
println!("=== Check state after connect via DHT ===");
assert_transports(node1.clone(), vec![node2.did(), node3.did()]);
assert_transports(node2.clone(), vec![node1.did(), node3.did()]);
assert_transports(node3.clone(), vec![node1.did(), node2.did()]);
assert_eq!(node1.dht().successors().list()?, vec![
node2.did(),
node3.did()
]);
assert_eq!(node2.dht().successors().list()?, vec![
node3.did(),
node1.did()
]);
assert_eq!(node3.dht().successors().list()?, vec![
node1.did(),
node2.did()
]);
tokio::fs::remove_dir_all("./tmp").await.ok();
Ok((node1.clone(), node2.clone(), node3.clone()))
}
async fn test_triple_desc_ordered_nodes_connection(
key1: SecretKey,
key2: SecretKey,
key3: SecretKey,
) -> Result<(Arc<Swarm>, Arc<Swarm>, Arc<Swarm>)> {
let (node1, _path1) = prepare_node(key1).await;
let (node2, _path2) = prepare_node(key2).await;
let (node3, _path3) = prepare_node(key3).await;
println!("========================================");
println!("|| now we connect node1 and node2 ||");
println!("========================================");
test_only_two_nodes_establish_connection(&node1, &node2).await?;
assert_eq!(node1.dht().successors().list()?, vec![node2.did()]);
assert_eq!(node2.dht().successors().list()?, vec![node1.did()]);
assert_eq!(node3.dht().successors().list()?, vec![]);
println!("========================================");
println!("|| now we start join node3 to node2 ||");
println!("========================================");
manually_establish_connection(&node3, &node2).await;
test_listen_join_and_init_find_succeesor(&node3, &node2).await?;
assert_eq!(node1.dht().successors().list()?, vec![node2.did()]);
assert_eq!(node2.dht().successors().list()?, vec![
node1.did(),
node3.did()
]);
assert_eq!(node3.dht().successors().list()?, vec![node2.did()]);
let ev_1 = node1.listen_once().await.unwrap().0;
assert_eq!(ev_1.addr, node2.did());
assert_eq!(ev_1.relay.path, vec![node3.did(), node2.did()]);
assert!(matches!(
ev_1.data,
Message::FindSuccessorSend(FindSuccessorSend{did, strict: false, then: FindSuccessorThen::Report(FindSuccessorReportHandler::Connect)}) if did == node3.did()
));
let ev_2 = node2.listen_once().await.unwrap().0;
assert_eq!(ev_2.addr, node3.did());
assert_eq!(ev_2.relay.path, vec![node3.did()]);
assert!(matches!(
ev_2.data,
Message::FindSuccessorReport(FindSuccessorReport{did, handler: FindSuccessorReportHandler::Connect}) if did == node2.did()
));
assert!(!node2.dht().successors().list()?.contains(&node2.did()));
let ev_2 = node2.listen_once().await.unwrap().0;
assert_eq!(ev_2.addr, node1.did());
assert_eq!(ev_2.relay.path, vec![node1.did()]);
assert!(matches!(
ev_2.data,
Message::FindSuccessorReport(FindSuccessorReport{did, handler: FindSuccessorReportHandler::Connect}) if did == node2.did()
));
let ev_3 = node3.listen_once().await.unwrap().0;
assert_eq!(ev_3.addr, node2.did());
assert_eq!(ev_3.relay.path, vec![node1.did(), node2.did()]);
assert!(matches!(
ev_3.data,
Message::FindSuccessorReport(FindSuccessorReport{did, handler: FindSuccessorReportHandler::Connect}) if did == node2.did()
));
println!("=== Check state before connect via DHT ===");
assert_transports(node1.clone(), vec![node2.did()]);
assert_transports(node2.clone(), vec![node1.did(), node3.did()]);
assert_transports(node3.clone(), vec![node2.did()]);
assert_eq!(node1.dht().successors().list()?, vec![node2.did()]);
assert_eq!(node2.dht().successors().list()?, vec![
node1.did(),
node3.did()
]);
assert_eq!(node3.dht().successors().list()?, vec![node2.did()]);
println!("=============================================");
println!("|| now we connect node1 to node3 via DHT ||");
println!("=============================================");
test_connect_via_dht_and_init_find_succeesor(&node1, &node2, &node3).await?;
let ev_2 = node2.listen_once().await.unwrap().0;
assert_eq!(ev_2.addr, node3.did());
assert_eq!(ev_2.relay.path, vec![node1.did(), node3.did()]);
assert!(matches!(
ev_2.data,
Message::FindSuccessorSend(FindSuccessorSend{did, strict: false, then: FindSuccessorThen::Report(FindSuccessorReportHandler::Connect)}) if did == node1.did()
));
let ev_3 = node3.listen_once().await.unwrap().0;
assert_eq!(ev_3.addr, node1.did());
assert_eq!(ev_3.relay.path, vec![node1.did()]);
assert!(matches!(
ev_3.data,
Message::FindSuccessorReport(FindSuccessorReport{did, handler: FindSuccessorReportHandler::Connect}) if did == node3.did()
));
assert!(!node3.dht.successors().list()?.contains(&node3.did()));
let ev_3 = node3.listen_once().await.unwrap().0;
assert_eq!(ev_3.addr, node2.did());
assert_eq!(ev_3.relay.path, vec![node2.did()]);
let ev_1 = node1.listen_once().await.unwrap().0;
assert_eq!(ev_1.addr, node3.did());
assert_eq!(ev_1.relay.path, vec![node2.did(), node3.did()]);
assert!(matches!(
ev_1.data,
Message::FindSuccessorReport(FindSuccessorReport{did, handler: FindSuccessorReportHandler::Connect}) if did == node1.did()
));
assert!(!node1.dht.successors().list()?.contains(&node1.did()));
assert_no_more_msg(&node1, &node2, &node3).await;
println!("=== Check state after connect via DHT ===");
assert_transports(node1.clone(), vec![node2.did(), node3.did()]);
assert_transports(node2.clone(), vec![node1.did(), node3.did()]);
assert_transports(node3.clone(), vec![node1.did(), node2.did()]);
assert_eq!(node1.dht().successors().list()?, vec![
node3.did(),
node2.did()
]);
assert_eq!(node2.dht().successors().list()?, vec![
node1.did(),
node3.did()
]);
assert_eq!(node3.dht().successors().list()?, vec![
node2.did(),
node1.did()
]);
tokio::fs::remove_dir_all("./tmp").await.ok();
Ok((node1.clone(), node2.clone(), node3.clone()))
}
pub async fn test_listen_join_and_init_find_succeesor(
node1: &Swarm,
node2: &Swarm,
) -> Result<()> {
let ev_1 = node1.listen_once().await.unwrap().0;
assert_eq!(ev_1.addr, node1.did());
assert_eq!(ev_1.relay.path, vec![node1.did()]);
assert!(matches!(ev_1.data, Message::JoinDHT(JoinDHT{did, ..}) if did == node2.did()));
let ev_2 = node2.listen_once().await.unwrap().0;
assert_eq!(ev_2.addr, node2.did());
assert_eq!(ev_2.relay.path, vec![node2.did()]);
assert!(matches!(ev_2.data, Message::JoinDHT(JoinDHT{did, ..}) if did == node1.did()));
let ev_1 = node1.listen_once().await.unwrap().0;
assert_eq!(ev_1.addr, node2.did());
assert_eq!(ev_1.relay.path, vec![node2.did()]);
assert!(matches!(
ev_1.data,
Message::FindSuccessorSend(FindSuccessorSend{did, then: FindSuccessorThen::Report(FindSuccessorReportHandler::Connect), strict: false}) if did == node2.did()
));
let ev_2 = node2.listen_once().await.unwrap().0;
assert_eq!(ev_2.addr, node1.did());
assert_eq!(ev_2.relay.path, vec![node1.did()]);
assert!(matches!(
ev_2.data,
Message::FindSuccessorSend(FindSuccessorSend{did, then: FindSuccessorThen::Report(FindSuccessorReportHandler::Connect), strict: false}) if did == node1.did()
));
Ok(())
}
pub async fn test_only_two_nodes_establish_connection(
node1: &Swarm,
node2: &Swarm,
) -> Result<()> {
manually_establish_connection(node1, node2).await;
test_listen_join_and_init_find_succeesor(node1, node2).await?;
let ev_1 = node1.listen_once().await.unwrap().0;
assert_eq!(ev_1.addr, node2.did());
assert_eq!(ev_1.relay.path, vec![node2.did()]);
assert!(matches!(
ev_1.data,
Message::FindSuccessorReport(FindSuccessorReport{did, handler: FindSuccessorReportHandler::Connect}) if did == node1.did()
));
assert!(!node1.dht().successors().list()?.contains(&node1.did()));
let ev_2 = node2.listen_once().await.unwrap().0;
assert_eq!(ev_2.addr, node1.did());
assert_eq!(ev_2.relay.path, vec![node1.did()]);
assert!(matches!(
ev_2.data,
Message::FindSuccessorReport(FindSuccessorReport{did, handler: FindSuccessorReportHandler::Connect}) if did == node2.did()
));
assert!(!node2.dht().successors().list()?.contains(&node2.did()));
Ok(())
}
async fn test_connect_via_dht_and_init_find_succeesor(
node1: &Swarm,
node2: &Swarm,
node3: &Swarm,
) -> Result<()> {
assert!(node1.get_connection(node3.did()).is_none());
assert_eq!(node1.dht.successors().max()?, node2.did());
node1.connect(node3.did()).await.unwrap();
let ev2 = node2.listen_once().await.unwrap().0;
assert_eq!(ev2.addr, node1.did());
assert_eq!(ev2.relay.path, vec![node1.did()]);
assert!(matches!(ev2.data, Message::ConnectNodeSend(_)));
let ev3 = node3.listen_once().await.unwrap().0;
assert_eq!(ev3.addr, node2.did());
assert_eq!(ev3.relay.path, vec![node1.did(), node2.did()]);
assert!(matches!(ev3.data, Message::ConnectNodeSend(_)));
let ev2 = node2.listen_once().await.unwrap().0;
assert_eq!(ev2.addr, node3.did());
assert_eq!(ev2.relay.path, vec![node3.did()]);
assert!(matches!(ev2.data, Message::ConnectNodeReport(_)));
let ev1 = node1.listen_once().await.unwrap().0;
assert_eq!(ev1.addr, node2.did());
assert_eq!(ev1.relay.path, vec![node3.did(), node2.did()]);
assert!(matches!(ev1.data, Message::ConnectNodeReport(_)));
let ev_1 = node1.listen_once().await.unwrap().0;
assert_eq!(ev_1.addr, node1.did());
assert_eq!(ev_1.relay.path, vec![node1.did()]);
assert!(matches!(ev_1.data, Message::JoinDHT(JoinDHT{did, ..}) if did == node3.did()));
let ev_3 = node3.listen_once().await.unwrap().0;
assert_eq!(ev_3.addr, node3.did());
assert_eq!(ev_3.relay.path, vec![node3.did()]);
assert!(matches!(ev_3.data, Message::JoinDHT(JoinDHT{did, ..}) if did == node1.did()));
let ev_1 = node1.listen_once().await.unwrap().0;
assert_eq!(ev_1.addr, node3.did());
assert_eq!(ev_1.relay.path, vec![node3.did()]);
assert!(matches!(
ev_1.data,
Message::FindSuccessorSend(FindSuccessorSend{did, strict: false, then: FindSuccessorThen::Report(FindSuccessorReportHandler::Connect)}) if did == node3.did()
));
let ev_3 = node3.listen_once().await.unwrap().0;
assert_eq!(ev_3.addr, node1.did());
assert_eq!(ev_3.relay.path, vec![node1.did()]);
assert!(matches!(
ev_3.data,
Message::FindSuccessorSend(FindSuccessorSend{did, strict: false, then: FindSuccessorThen::Report(FindSuccessorReportHandler::Connect)}) if did == node1.did()
));
Ok(())
}
fn assert_transports(swarm: Arc<Swarm>, addresses: Vec<Did>) {
println!(
"Check transport of {:?}: {:?} for addresses {:?}",
swarm.did(),
swarm.get_connection_ids(),
addresses
);
assert_eq!(swarm.get_connections().len(), addresses.len());
for addr in addresses {
assert!(swarm.get_connection(addr).is_some());
}
}
#[tokio::test]
async fn test_fourth_node_connection() -> Result<()> {
let keys = gen_ordered_keys(4);
let (key1, key2, key3, key4) = (keys[0], keys[1], keys[2], keys[3]);
let (node1, node2, node3) = test_triple_ordered_nodes_connection(key1, key2, key3).await?;
let (node4, _path4) = prepare_node(key4).await;
tokio::select! {
_ = async {
futures::join!(
async { node1.clone().listen().await },
async { node2.clone().listen().await },
async { node3.clone().listen().await },
async { node4.clone().listen().await },
)
} => {unreachable!();}
_ = async {
manually_establish_connection(&node4, &node2).await;
tokio::time::sleep(Duration::from_secs(3)).await;
println!("=== Check state before connect via DHT ===");
assert_transports(node1.clone(), vec![node2.did(), node3.did(), node4.did()]);
assert_transports(node2.clone(), vec![node3.did(), node4.did(), node1.did()]);
assert_transports(node3.clone(), vec![node1.did(), node2.did()]);
assert_transports(node4.clone(), vec![node1.did(), node2.did()]);
assert_eq!(node1.dht().successors().list().unwrap(), vec![
node2.did(),
node3.did(),
node4.did(),
]);
assert_eq!(node2.dht().successors().list().unwrap(), vec![
node3.did(),
node4.did(),
node1.did(),
]);
assert_eq!(node3.dht().successors().list().unwrap(), vec![
node1.did(),
node2.did(),
]);
assert_eq!(node4.dht().successors().list().unwrap(), vec![
node1.did(),
node2.did(),
]);
println!("========================================");
println!("| test node4 connect node3 via dht |");
println!("========================================");
println!(
"node1.did(): {:?}, node2.did(): {:?}, node3.did(): {:?}, node4.did(): {:?}",
node1.did(),
node2.did(),
node3.did(),
node4.did(),
);
println!("==================================================");
node4.connect(node3.did()).await.unwrap();
tokio::time::sleep(Duration::from_secs(3)).await;
println!("=== Check state after connect via DHT ===");
assert_transports(node1.clone(), vec![node2.did(), node3.did(), node4.did()]);
assert_transports(node2.clone(), vec![node3.did(), node4.did(), node1.did()]);
assert_transports(node3.clone(), vec![node4.did(), node1.did(), node2.did()]);
assert_transports(node4.clone(), vec![node1.did(), node2.did(), node3.did()]);
assert_eq!(node1.dht().successors().list().unwrap(), vec![
node2.did(),
node3.did(),
node4.did()
]);
assert_eq!(node2.dht().successors().list().unwrap(), vec![
node3.did(),
node4.did(),
node1.did(),
]);
assert_eq!(node3.dht().successors().list().unwrap(), vec![
node4.did(),
node1.did(),
node2.did(),
]);
assert_eq!(node4.dht().successors().list().unwrap(), vec![
node1.did(),
node2.did(),
node3.did(),
]);
} => {}
}
Ok(())
}
#[tokio::test]
async fn test_finger_when_disconnect() -> Result<()> {
let key1 = SecretKey::random();
let key2 = SecretKey::random();
let key3 = SecretKey::random();
let (node1, _path1) = prepare_node(key1).await;
let (node2, _path2) = prepare_node(key2).await;
let (node3, _path3) = prepare_node(key3).await;
{
assert!(node1.dht().lock_finger()?.is_empty());
assert!(node1.dht().lock_finger()?.is_empty());
}
test_only_two_nodes_establish_connection(&node1, &node2).await?;
assert_no_more_msg(&node1, &node2, &node3).await;
assert_transports(node1.clone(), vec![node2.did()]);
assert_transports(node2.clone(), vec![node1.did()]);
{
let finger1 = node1.dht().lock_finger()?.clone().clone_finger();
let finger2 = node2.dht().lock_finger()?.clone().clone_finger();
assert!(finger1.into_iter().any(|x| x == Some(node2.did())));
assert!(finger2.into_iter().any(|x| x == Some(node1.did())));
}
println!("===================================");
println!("| test disconnect node1 and node2 |");
println!("===================================");
node1.disconnect(node2.did()).await?;
let ev1 = node1.listen_once().await.unwrap().0;
assert!(matches!(ev1.data, Message::LeaveDHT(LeaveDHT{did}) if did == node2.did()));
#[cfg(not(feature = "wasm"))]
node2.disconnect(node1.did()).await.unwrap();
for _ in 1..10 {
println!("wait 3 seconds for node2's transport 2to1 closing");
sleep(Duration::from_secs(3)).await;
if let Some(t) = node2.get_connection(node1.did()) {
if t.is_disconnected().await {
println!("transport 2to1 is disconnected!!!!");
break;
}
} else {
println!("transport 2to1 is disappeared!!!!");
break;
}
}
let ev2 = node2.listen_once().await.unwrap().0;
assert_eq!(ev2.addr, node2.did());
assert!(matches!(ev2.data, Message::LeaveDHT(LeaveDHT{did}) if did == node1.did()));
assert_no_more_msg(&node1, &node2, &node3).await;
assert_transports(node1.clone(), vec![]);
assert_transports(node2.clone(), vec![]);
{
let finger1 = node1.dht().lock_finger()?.clone().clone_finger();
let finger2 = node2.dht().lock_finger()?.clone().clone_finger();
assert!(finger1.into_iter().all(|x| x.is_none()));
assert!(finger2.into_iter().all(|x| x.is_none()));
}
Ok(())
}
}