use async_trait::async_trait;
use crate::dht::Chord;
use crate::dht::ChordStorageSync;
use crate::dht::PeerRingAction;
use crate::dht::PeerRingRemoteAction;
use crate::err::Result;
use crate::message::types::Message;
use crate::message::types::NotifyPredecessorReport;
use crate::message::types::NotifyPredecessorSend;
use crate::message::types::SyncVNodeWithSuccessor;
use crate::message::HandleMsg;
use crate::message::MessageHandler;
use crate::message::MessageHandlerEvent;
use crate::message::MessagePayload;
/// Handles an incoming `NotifyPredecessorSend` message.
#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
impl HandleMsg<NotifyPredecessorSend> for MessageHandler {
    /// Passes `msg.did` to `dht.notify` and, when a predecessor was already
    /// recorded before the call and it differs from the relay sender, answers
    /// with a `NotifyPredecessorReport` carrying that earlier predecessor.
    ///
    /// Returns an empty event list otherwise. Errors from locking the
    /// predecessor or from `notify` are propagated.
    async fn handle(
        &self,
        ctx: &MessagePayload<Message>,
        msg: &NotifyPredecessorSend,
    ) -> Result<Vec<MessageHandlerEvent>> {
        // Snapshot the predecessor as it was *before* `notify` runs. The inner
        // block ends the lock guard's lifetime before `notify` is called.
        let previous = { *self.dht.lock_predecessor()? };

        self.dht.notify(msg.did)?;

        let events = match previous {
            Some(did) if did != ctx.relay.sender() => {
                let report = Message::NotifyPredecessorReport(NotifyPredecessorReport { did });
                vec![MessageHandlerEvent::SendReportMessage(report)]
            }
            _ => vec![],
        };
        Ok(events)
    }
}
/// Handles an incoming `NotifyPredecessorReport` message.
#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
impl HandleMsg<NotifyPredecessorReport> for MessageHandler {
    /// Always requests a connection to the reported `msg.did`. If
    /// `sync_vnode_with_successor` yields a remote `SyncVNodeWithSuccessor`
    /// action, the returned data is additionally forwarded to the target the
    /// action names.
    ///
    /// Any other outcome of the sync call, including an error, is ignored and
    /// only the `Connect` event is returned.
    async fn handle(
        &self,
        _ctx: &MessagePayload<Message>,
        msg: &NotifyPredecessorReport,
    ) -> Result<Vec<MessageHandlerEvent>> {
        let connect = MessageHandlerEvent::Connect(msg.did);

        match self.dht.sync_vnode_with_successor(msg.did).await {
            Ok(PeerRingAction::RemoteAction(
                next,
                PeerRingRemoteAction::SyncVNodeWithSuccessor(data),
            )) => {
                let sync = Message::SyncVNodeWithSuccessor(SyncVNodeWithSuccessor { data });
                Ok(vec![connect, MessageHandlerEvent::SendMessage(sync, next)])
            }
            // Deliberately best-effort: nothing to forward in every other case.
            _ => Ok(vec![connect]),
        }
    }
}
#[cfg(not(feature = "wasm"))]
#[cfg(test)]
mod test {
use std::sync::Arc;
use super::*;
use crate::dht::successor::SuccessorReader;
use crate::dht::Stabilization;
use crate::ecc::tests::gen_ordered_keys;
use crate::ecc::SecretKey;
use crate::message::handlers::connection::tests::test_listen_join_and_init_find_succeesor;
use crate::message::handlers::connection::tests::test_only_two_nodes_establish_connection;
use crate::message::handlers::tests::assert_no_more_msg;
use crate::message::handlers::tests::wait_for_msgs;
use crate::swarm::Swarm;
use crate::tests::default::prepare_node;
use crate::tests::manually_establish_connection;
/// Runs the ordered-ring scenario with the generated keys assigned to
/// node1, node2 and node3 in the order they were generated.
#[tokio::test]
async fn test_triple_nodes_stabilization_1_2_3() -> Result<()> {
    let ordered = gen_ordered_keys(3);
    test_triple_ordered_nodes_stabilization(ordered[0], ordered[1], ordered[2]).await
}
/// Runs the ordered-ring scenario with the keys rotated to (2, 3, 1).
///
/// The rotation keeps the cyclic order node1 -> node2 -> node3 that the
/// helper's assertions rely on, while starting the ring at a different key.
#[tokio::test]
async fn test_triple_nodes_stabilization_2_3_1() -> Result<()> {
    let keys = gen_ordered_keys(3);
    let (key1, key2, key3) = (keys[0], keys[1], keys[2]);
    // Pass the keys in the order the test name states; passing (1, 2, 3)
    // here would merely duplicate `test_triple_nodes_stabilization_1_2_3`.
    test_triple_ordered_nodes_stabilization(key2, key3, key1).await
}
/// Runs the ordered-ring scenario with the keys rotated to (3, 1, 2).
///
/// The rotation keeps the cyclic order node1 -> node2 -> node3 that the
/// helper's assertions rely on, while starting the ring at a different key.
#[tokio::test]
async fn test_triple_nodes_stabilization_3_1_2() -> Result<()> {
    let keys = gen_ordered_keys(3);
    let (key1, key2, key3) = (keys[0], keys[1], keys[2]);
    // Pass the keys in the order the test name states; passing (1, 2, 3)
    // here would merely duplicate `test_triple_nodes_stabilization_1_2_3`.
    test_triple_ordered_nodes_stabilization(key3, key1, key2).await
}
/// Runs the reverse-ordered-ring scenario with the generated keys assigned to
/// node1, node2 and node3 in the reverse of the order they were generated.
#[tokio::test]
async fn test_triple_nodes_stabilization_3_2_1() -> Result<()> {
    let ordered = gen_ordered_keys(3);
    let (first, second, third) = (ordered[0], ordered[1], ordered[2]);
    test_triple_desc_ordered_nodes_stabilization(third, second, first).await
}
/// Runs the reverse-ordered-ring scenario with the keys arranged as (2, 1, 3).
///
/// This is a rotation of (3, 2, 1), so the cyclic order expected by the
/// helper's assertions is preserved while the ring starts at a different key.
#[tokio::test]
async fn test_triple_nodes_stabilization_2_1_3() -> Result<()> {
    let keys = gen_ordered_keys(3);
    let (key1, key2, key3) = (keys[0], keys[1], keys[2]);
    // Pass the keys in the order the test name states; passing (3, 2, 1)
    // here would merely duplicate `test_triple_nodes_stabilization_3_2_1`.
    test_triple_desc_ordered_nodes_stabilization(key2, key1, key3).await
}
/// Runs the reverse-ordered-ring scenario with the keys arranged as (1, 3, 2).
///
/// This is a rotation of (3, 2, 1), so the cyclic order expected by the
/// helper's assertions is preserved while the ring starts at a different key.
#[tokio::test]
async fn test_triple_nodes_stabilization_1_3_2() -> Result<()> {
    let keys = gen_ordered_keys(3);
    let (key1, key2, key3) = (keys[0], keys[1], keys[2]);
    // Pass the keys in the order the test name states; passing (3, 2, 1)
    // here would merely duplicate `test_triple_nodes_stabilization_3_2_1`.
    test_triple_desc_ordered_nodes_stabilization(key1, key3, key2).await
}
async fn test_triple_ordered_nodes_stabilization(
key1: SecretKey,
key2: SecretKey,
key3: SecretKey,
) -> Result<()> {
let (node1, _path1) = prepare_node(key1).await;
let (node2, _path2) = prepare_node(key2).await;
let (node3, _path3) = prepare_node(key3).await;
println!("========================================");
println!("|| now we connect node1 and node2 ||");
println!("========================================");
test_only_two_nodes_establish_connection(&node1, &node2).await?;
println!("========================================");
println!("|| now we start join node3 to node2 ||");
println!("========================================");
manually_establish_connection(&node3, &node2).await?;
test_listen_join_and_init_find_succeesor(&node3, &node2).await?;
node3.listen_once().await.unwrap();
node2.listen_once().await.unwrap();
assert_no_more_msg(&node1, &node2, &node3).await;
println!("=== Check state before stabilization ===");
assert_eq!(node1.dht().successors().list()?, vec![node2.did()]);
assert_eq!(node2.dht().successors().list()?, vec![
node3.did(),
node1.did()
]);
assert_eq!(node3.dht().successors().list()?, vec![node2.did()]);
assert!(node1.dht().lock_predecessor()?.is_none());
assert!(node2.dht().lock_predecessor()?.is_none());
assert!(node3.dht().lock_predecessor()?.is_none());
println!("========================================");
println!("|| now we start first stabilization ||");
println!("========================================");
run_stabilize_once(node1.clone()).await?;
run_stabilize_once(node2.clone()).await?;
run_stabilize_once(node3.clone()).await?;
let ev1 = node1.listen_once().await.unwrap().0;
assert_eq!(ev1.addr, node2.did());
assert_eq!(ev1.relay.path, vec![node2.did()]);
assert!(matches!(
ev1.data,
Message::NotifyPredecessorSend(NotifyPredecessorSend{did}) if did == node2.did()
));
let ev2 = node2.listen_once().await.unwrap().0;
assert_eq!(ev2.addr, node1.did());
assert_eq!(ev2.relay.path, vec![node1.did()]);
assert!(matches!(
ev2.data,
Message::NotifyPredecessorSend(NotifyPredecessorSend{did}) if did == node1.did()
));
let ev3 = node3.listen_once().await.unwrap().0;
assert_eq!(ev3.addr, node2.did());
assert_eq!(ev3.relay.path, vec![node2.did()]);
assert!(matches!(
ev3.data,
Message::NotifyPredecessorSend(NotifyPredecessorSend{did}) if did == node2.did()
));
let ev2 = node2.listen_once().await.unwrap().0;
assert_eq!(ev2.addr, node3.did());
assert_eq!(ev2.relay.path, vec![node3.did()]);
assert!(matches!(
ev2.data,
Message::NotifyPredecessorSend(NotifyPredecessorSend{did}) if did == node3.did()
));
let ev3 = node3.listen_once().await.unwrap().0;
assert_eq!(ev3.addr, node2.did());
assert_eq!(ev3.relay.path, vec![node2.did()]);
assert!(matches!(
ev3.data,
Message::NotifyPredecessorReport(NotifyPredecessorReport{did}) if did == node1.did()
));
node2.listen_once().await.unwrap();
node1.listen_once().await.unwrap();
node2.listen_once().await.unwrap();
node3.listen_once().await.unwrap();
node1.listen_once().await.unwrap();
node3.listen_once().await.unwrap();
node3.listen_once().await.unwrap();
node1.listen_once().await.unwrap();
node1.listen_once().await.unwrap();
node2.listen_once().await.unwrap();
node1.listen_once().await.unwrap();
node3.listen_once().await.unwrap();
assert_no_more_msg(&node1, &node2, &node3).await;
println!("=== Check state after first stabilization ===");
assert_eq!(node1.dht().successors().list()?, vec![
node2.did(),
node3.did()
]);
assert_eq!(node2.dht().successors().list()?, vec![
node3.did(),
node1.did()
]);
assert_eq!(node3.dht().successors().list()?, vec![
node1.did(),
node2.did()
]);
assert_eq!(*node1.dht().lock_predecessor()?, Some(node2.did()));
assert_eq!(*node2.dht().lock_predecessor()?, Some(node1.did()));
assert_eq!(*node3.dht().lock_predecessor()?, Some(node2.did()));
println!("=========================================");
println!("|| now we start second stabilization ||");
println!("=========================================");
run_stabilize_once(node1.clone()).await?;
run_stabilize_once(node2.clone()).await?;
run_stabilize_once(node3.clone()).await?;
let ev3 = node3.listen_once().await.unwrap().0;
assert_eq!(ev3.addr, node1.did());
assert_eq!(ev3.relay.path, vec![node1.did()]);
assert!(matches!(
ev3.data,
Message::NotifyPredecessorSend(NotifyPredecessorSend{did}) if did == node1.did()
));
let ev1 = node1.listen_once().await.unwrap().0;
assert_eq!(ev1.addr, node2.did());
assert_eq!(ev1.relay.path, vec![node2.did()]);
assert!(matches!(
ev1.data,
Message::NotifyPredecessorSend(NotifyPredecessorSend{did}) if did == node2.did()
));
let ev1 = node1.listen_once().await.unwrap().0;
assert_eq!(ev1.addr, node3.did());
assert_eq!(ev1.relay.path, vec![node3.did()]);
assert!(matches!(
ev1.data,
Message::NotifyPredecessorSend(NotifyPredecessorSend{did}) if did == node3.did()
));
let ev2 = node2.listen_once().await.unwrap().0;
assert_eq!(ev2.addr, node1.did());
assert_eq!(ev2.relay.path, vec![node1.did()]);
assert!(matches!(
ev2.data,
Message::NotifyPredecessorSend(NotifyPredecessorSend{did}) if did == node1.did()
));
let ev2 = node2.listen_once().await.unwrap().0;
assert_eq!(ev2.addr, node3.did());
assert_eq!(ev2.relay.path, vec![node3.did()]);
assert!(matches!(
ev2.data,
Message::NotifyPredecessorSend(NotifyPredecessorSend{did}) if did == node3.did()
));
let ev3 = node3.listen_once().await.unwrap().0;
assert_eq!(ev3.addr, node2.did());
assert_eq!(ev3.relay.path, vec![node2.did()]);
assert!(matches!(
ev2.data,
Message::NotifyPredecessorSend(NotifyPredecessorSend{did}) if did == node3.did()
));
let ev3 = node3.listen_once().await.unwrap().0;
assert_eq!(ev3.addr, node1.did());
assert_eq!(ev3.relay.path, vec![node1.did()]);
assert!(matches!(
ev3.data,
Message::NotifyPredecessorReport(NotifyPredecessorReport{did}) if did == node2.did()
));
let ev3 = node3.listen_once().await.unwrap().0;
assert_eq!(ev3.addr, node2.did());
assert_eq!(ev3.relay.path, vec![node2.did()]);
assert!(matches!(
ev3.data,
Message::NotifyPredecessorReport(NotifyPredecessorReport{did}) if did == node1.did()
));
let ev1 = node1.listen_once().await.unwrap().0;
assert_eq!(ev1.addr, node3.did());
assert_eq!(ev1.relay.path, vec![node3.did()]);
assert!(matches!(
ev1.data,
Message::NotifyPredecessorReport(NotifyPredecessorReport{did}) if did == node2.did()
));
assert_no_more_msg(&node1, &node2, &node3).await;
println!("=== Check state after second stabilization ===");
assert_eq!(node1.dht().successors().list()?, vec![
node2.did(),
node3.did()
]);
assert_eq!(node2.dht().successors().list()?, vec![
node3.did(),
node1.did()
]);
assert_eq!(node3.dht().successors().list()?, vec![
node1.did(),
node2.did()
]);
assert_eq!(*node1.dht().lock_predecessor()?, Some(node3.did()));
assert_eq!(*node2.dht().lock_predecessor()?, Some(node1.did()));
assert_eq!(*node3.dht().lock_predecessor()?, Some(node2.did()));
tokio::fs::remove_dir_all("./tmp").await.ok();
Ok(())
}
/// Drives three nodes through connection setup and two rounds of
/// `run_stabilize_once`, asserting the messages of the first round and the
/// successor lists and predecessors after each phase.
///
/// The hard-coded expectations assume the keys are in the reverse cyclic
/// order node3 -> node2 -> node1 -> node3 (e.g. node2's successors are
/// `[node1, node3]`).
///
/// Returns an error if any fallible step fails; panics on a failed assertion
/// or a missing message.
async fn test_triple_desc_ordered_nodes_stabilization(
key1: SecretKey,
key2: SecretKey,
key3: SecretKey,
) -> Result<()> {
let (node1, _path1) = prepare_node(key1).await;
let (node2, _path2) = prepare_node(key2).await;
let (node3, _path3) = prepare_node(key3).await;
println!("========================================");
println!("|| now we connect node1 and node2 ||");
println!("========================================");
test_only_two_nodes_establish_connection(&node1, &node2).await?;
println!("========================================");
println!("|| now we start join node3 to node2 ||");
println!("========================================");
manually_establish_connection(&node3, &node2).await?;
test_listen_join_and_init_find_succeesor(&node3, &node2).await?;
// Consume the follow-up messages of the join; their contents are not
// asserted here, only the per-node order and count.
node1.listen_once().await.unwrap();
node2.listen_once().await.unwrap();
node2.listen_once().await.unwrap();
node3.listen_once().await.unwrap();
assert_no_more_msg(&node1, &node2, &node3).await;
println!("=== Check state before stabilization ===");
assert_eq!(node1.dht().successors().list()?, vec![node2.did()]);
assert_eq!(node2.dht().successors().list()?, vec![
node1.did(),
node3.did()
]);
assert_eq!(node3.dht().successors().list()?, vec![node2.did()]);
assert!(node1.dht().lock_predecessor()?.is_none());
assert!(node2.dht().lock_predecessor()?.is_none());
assert!(node3.dht().lock_predecessor()?.is_none());
println!("========================================");
println!("|| now we start first stabilization ||");
println!("========================================");
run_stabilize_once(node1.clone()).await?;
run_stabilize_once(node2.clone()).await?;
run_stabilize_once(node3.clone()).await?;
// node2 notifies node1.
let ev1 = node1.listen_once().await.unwrap().0;
assert_eq!(ev1.addr, node2.did());
assert_eq!(ev1.relay.path, vec![node2.did()]);
assert!(matches!(
ev1.data,
Message::NotifyPredecessorSend(NotifyPredecessorSend{did}) if did == node2.did()
));
// node1 notifies node2.
let ev2 = node2.listen_once().await.unwrap().0;
assert_eq!(ev2.addr, node1.did());
assert_eq!(ev2.relay.path, vec![node1.did()]);
assert!(matches!(
ev2.data,
Message::NotifyPredecessorSend(NotifyPredecessorSend{did}) if did == node1.did()
));
// node3 notifies node2.
let ev2 = node2.listen_once().await.unwrap().0;
assert_eq!(ev2.addr, node3.did());
assert_eq!(ev2.relay.path, vec![node3.did()]);
assert!(matches!(
ev2.data,
Message::NotifyPredecessorSend(NotifyPredecessorSend{did}) if did == node3.did()
));
// node2 notifies node3.
let ev3 = node3.listen_once().await.unwrap().0;
assert_eq!(ev3.addr, node2.did());
assert_eq!(ev3.relay.path, vec![node2.did()]);
assert!(matches!(
ev3.data,
Message::NotifyPredecessorSend(NotifyPredecessorSend{did}) if did == node2.did()
));
// node2 reports node1 to node3.
let ev3 = node3.listen_once().await.unwrap().0;
assert_eq!(ev3.addr, node2.did());
assert_eq!(ev3.relay.path, vec![node2.did()]);
assert!(matches!(
ev3.data,
Message::NotifyPredecessorReport(NotifyPredecessorReport{did}) if did == node1.did()
));
// Drain the traffic triggered by the report. The individual messages are
// not asserted (presumably connection setup between node1 and node3 —
// TODO confirm); only the per-node order and count matter here.
node2.listen_once().await.unwrap();
node1.listen_once().await.unwrap();
node2.listen_once().await.unwrap();
node3.listen_once().await.unwrap();
node1.listen_once().await.unwrap();
node3.listen_once().await.unwrap();
node3.listen_once().await.unwrap();
node1.listen_once().await.unwrap();
node2.listen_once().await.unwrap();
node3.listen_once().await.unwrap();
node3.listen_once().await.unwrap();
node1.listen_once().await.unwrap();
assert_no_more_msg(&node1, &node2, &node3).await;
println!("=== Check state after first stabilization ===");
assert_eq!(node1.dht().successors().list()?, vec![
node3.did(),
node2.did()
]);
assert_eq!(node2.dht().successors().list()?, vec![
node1.did(),
node3.did()
]);
assert_eq!(node3.dht().successors().list()?, vec![
node2.did(),
node1.did()
]);
assert_eq!(*node1.dht().lock_predecessor()?, Some(node2.did()));
assert_eq!(*node2.dht().lock_predecessor()?, Some(node3.did()));
assert_eq!(*node3.dht().lock_predecessor()?, Some(node2.did()));
println!("=========================================");
println!("|| now we start second stabilization ||");
println!("=========================================");
run_stabilize_once(node1.clone()).await?;
run_stabilize_once(node2.clone()).await?;
run_stabilize_once(node3.clone()).await?;
// Unlike the first round, the second round's messages are not asserted one
// by one; they are handed to `wait_for_msgs` and only the resulting state
// is checked.
wait_for_msgs(&node1, &node2, &node3).await;
assert_no_more_msg(&node1, &node2, &node3).await;
println!("=== Check state after second stabilization ===");
assert_eq!(node1.dht().successors().list()?, vec![
node3.did(),
node2.did()
]);
assert_eq!(node2.dht().successors().list()?, vec![
node1.did(),
node3.did()
]);
assert_eq!(node3.dht().successors().list()?, vec![
node2.did(),
node1.did()
]);
assert_eq!(*node1.dht().lock_predecessor()?, Some(node2.did()));
assert_eq!(*node2.dht().lock_predecessor()?, Some(node3.did()));
assert_eq!(*node3.dht().lock_predecessor()?, Some(node1.did()));
// Best-effort cleanup: a failure to remove the directory is ignored.
tokio::fs::remove_dir_all("./tmp").await.ok();
Ok(())
}
/// Builds a `Stabilization` for `swarm` (second constructor argument fixed at
/// `5`) and runs a single `notify_predecessor` pass, returning its result.
async fn run_stabilize_once(swarm: Arc<Swarm>) -> Result<()> {
    Stabilization::new(swarm, 5).notify_predecessor().await
}
}