rings-core 0.7.0

Chord DHT implementation with ICE
Documentation
use async_trait::async_trait;

use crate::dht::Chord;
use crate::dht::ChordStorageSync;
use crate::dht::PeerRingAction;
use crate::dht::PeerRingRemoteAction;
use crate::error::Result;
use crate::message::types::Message;
use crate::message::types::NotifyPredecessorReport;
use crate::message::types::NotifyPredecessorSend;
use crate::message::types::SyncVNodeWithSuccessor;
use crate::message::HandleMsg;
use crate::message::MessageHandler;
use crate::message::MessagePayload;
use crate::message::PayloadSender;

#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
impl HandleMsg<NotifyPredecessorSend> for MessageHandler {
    async fn handle(&self, ctx: &MessagePayload, msg: &NotifyPredecessorSend) -> Result<()> {
        let predecessor = self.dht.notify(msg.did)?;

        if predecessor != ctx.relay.origin_sender() {
            return self
                .transport
                .send_report_message(
                    ctx,
                    Message::NotifyPredecessorReport(NotifyPredecessorReport { did: predecessor }),
                )
                .await;
        }

        Ok(())
    }
}

#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
impl HandleMsg<NotifyPredecessorReport> for MessageHandler {
    async fn handle(&self, _ctx: &MessagePayload, msg: &NotifyPredecessorReport) -> Result<()> {
        self.transport
            .connect(msg.did, self.inner_callback())
            .await?;

        if let Ok(PeerRingAction::RemoteAction(
            next,
            PeerRingRemoteAction::SyncVNodeWithSuccessor(data),
        )) = self.dht.sync_vnode_with_successor(msg.did).await
        {
            self.transport
                .send_message(
                    Message::SyncVNodeWithSuccessor(SyncVNodeWithSuccessor { data }),
                    next,
                )
                .await?;
        }

        Ok(())
    }
}

#[cfg(not(feature = "wasm"))]
#[cfg(test)]
mod test {
    use std::sync::Arc;

    use super::*;
    use crate::dht::successor::SuccessorReader;
    use crate::ecc::tests::gen_ordered_keys;
    use crate::ecc::SecretKey;
    use crate::swarm::Swarm;
    use crate::tests::default::assert_no_more_msg;
    use crate::tests::default::prepare_node;
    use crate::tests::default::wait_for_msgs;
    use crate::tests::manually_establish_connection;

    #[tokio::test]
    async fn test_triple_nodes_stabilization_1_2_3() -> Result<()> {
        let keys = gen_ordered_keys(3);
        let (key1, key2, key3) = (keys[0], keys[1], keys[2]);
        test_triple_ordered_nodes_stabilization(key1, key2, key3).await
    }

    #[tokio::test]
    async fn test_triple_nodes_stabilization_2_3_1() -> Result<()> {
        let keys = gen_ordered_keys(3);
        let (key1, key2, key3) = (keys[0], keys[1], keys[2]);

        test_triple_ordered_nodes_stabilization(key2, key3, key1).await
    }

    #[tokio::test]
    async fn test_triple_nodes_stabilization_3_1_2() -> Result<()> {
        let keys = gen_ordered_keys(3);
        let (key1, key2, key3) = (keys[0], keys[1], keys[2]);
        test_triple_ordered_nodes_stabilization(key3, key1, key2).await
    }

    #[tokio::test]
    async fn test_triple_nodes_stabilization_3_2_1() -> Result<()> {
        let keys = gen_ordered_keys(3);
        let (key1, key2, key3) = (keys[0], keys[1], keys[2]);
        test_triple_desc_ordered_nodes_stabilization(key3, key2, key1).await
    }

    #[tokio::test]
    async fn test_triple_nodes_stabilization_2_1_3() -> Result<()> {
        let keys = gen_ordered_keys(3);
        let (key1, key2, key3) = (keys[0], keys[1], keys[2]);
        test_triple_desc_ordered_nodes_stabilization(key2, key1, key3).await
    }

    #[tokio::test]
    async fn test_triple_nodes_stabilization_1_3_2() -> Result<()> {
        let keys = gen_ordered_keys(3);
        let (key1, key2, key3) = (keys[0], keys[1], keys[2]);
        test_triple_desc_ordered_nodes_stabilization(key1, key3, key2).await
    }

    async fn test_triple_ordered_nodes_stabilization(
        key1: SecretKey,
        key2: SecretKey,
        key3: SecretKey,
    ) -> Result<()> {
        let node1 = prepare_node(key1).await;
        let node2 = prepare_node(key2).await;
        let node3 = prepare_node(key3).await;

        println!("========================================");
        println!("||  now we connect node1 and node2    ||");
        println!("========================================");

        manually_establish_connection(&node1.swarm, &node2.swarm).await;
        wait_for_msgs([&node1, &node2, &node3]).await;
        assert_no_more_msg([&node1, &node2, &node3]).await;

        println!("========================================");
        println!("||  now we start join node3 to node2  ||");
        println!("========================================");

        manually_establish_connection(&node3.swarm, &node2.swarm).await;
        wait_for_msgs([&node1, &node2, &node3]).await;
        assert_no_more_msg([&node1, &node2, &node3]).await;

        println!("=== Check state before stabilization ===");
        assert_eq!(node1.dht().successors().list()?, vec![node2.did()]);
        assert_eq!(node2.dht().successors().list()?, vec![
            node3.did(),
            node1.did()
        ]);
        assert_eq!(node3.dht().successors().list()?, vec![node2.did()]);
        assert!(node1.dht().lock_predecessor()?.is_none());
        assert!(node2.dht().lock_predecessor()?.is_none());
        assert!(node3.dht().lock_predecessor()?.is_none());

        println!("========================================");
        println!("||  now we start first stabilization  ||");
        println!("========================================");

        run_stabilization_once(node1.swarm.clone()).await?;
        run_stabilization_once(node2.swarm.clone()).await?;
        run_stabilization_once(node3.swarm.clone()).await?;

        wait_for_msgs([&node1, &node2, &node3]).await;
        assert_no_more_msg([&node1, &node2, &node3]).await;

        println!("=== Check state after first stabilization ===");
        assert!(node1.dht().successors().list()?.contains(&node2.did()));
        assert_eq!(node2.dht().successors().list()?, vec![
            node3.did(),
            node1.did()
        ]);
        assert!(node3.dht().successors().list()?.contains(&node2.did()));

        println!("==========================================");
        println!("||  now we start 5 times stabilization  ||");
        println!("==========================================");

        for _ in 0..5 {
            run_stabilization_once(node1.swarm.clone()).await?;
            run_stabilization_once(node2.swarm.clone()).await?;
            run_stabilization_once(node3.swarm.clone()).await?;

            wait_for_msgs([&node1, &node2, &node3]).await;
            assert_no_more_msg([&node1, &node2, &node3]).await;

            println!("=== Check state after stabilization ===");
            assert_eq!(node1.dht().successors().list()?, vec![
                node2.did(),
                node3.did()
            ]);
            assert_eq!(node2.dht().successors().list()?, vec![
                node3.did(),
                node1.did()
            ]);
            assert_eq!(node3.dht().successors().list()?, vec![
                node1.did(),
                node2.did()
            ]);
        }

        println!("=== Check predecessor after all stabilization ===");
        assert_eq!(*node1.dht().lock_predecessor()?, Some(node3.did()));
        assert_eq!(*node2.dht().lock_predecessor()?, Some(node1.did()));
        assert_eq!(*node3.dht().lock_predecessor()?, Some(node2.did()));
        Ok(())
    }

    async fn test_triple_desc_ordered_nodes_stabilization(
        key1: SecretKey,
        key2: SecretKey,
        key3: SecretKey,
    ) -> Result<()> {
        let node1 = prepare_node(key1).await;
        let node2 = prepare_node(key2).await;
        let node3 = prepare_node(key3).await;

        println!("========================================");
        println!("||  now we connect node1 and node2    ||");
        println!("========================================");

        manually_establish_connection(&node1.swarm, &node2.swarm).await;
        wait_for_msgs([&node1, &node2, &node3]).await;
        assert_no_more_msg([&node1, &node2, &node3]).await;

        println!("========================================");
        println!("||  now we start join node3 to node2  ||");
        println!("========================================");

        manually_establish_connection(&node3.swarm, &node2.swarm).await;
        wait_for_msgs([&node1, &node2, &node3]).await;
        assert_no_more_msg([&node1, &node2, &node3]).await;

        println!("=== Check state before stabilization ===");
        assert_eq!(node1.dht().successors().list()?, vec![node2.did()]);
        assert_eq!(node2.dht().successors().list()?, vec![
            node1.did(),
            node3.did()
        ]);
        assert_eq!(node3.dht().successors().list()?, vec![node2.did()]);
        assert!(node1.dht().lock_predecessor()?.is_none());
        assert!(node2.dht().lock_predecessor()?.is_none());
        assert!(node3.dht().lock_predecessor()?.is_none());

        println!("========================================");
        println!("||  now we start first stabilization  ||");
        println!("========================================");

        run_stabilization_once(node1.swarm.clone()).await?;
        run_stabilization_once(node2.swarm.clone()).await?;
        run_stabilization_once(node3.swarm.clone()).await?;

        wait_for_msgs([&node1, &node2, &node3]).await;
        assert_no_more_msg([&node1, &node2, &node3]).await;

        println!("=== Check state after first stabilization ===");
        assert!(node1.dht().successors().list()?.contains(&node2.did()));
        assert_eq!(node2.dht().successors().list()?, vec![
            node1.did(),
            node3.did()
        ]);
        assert!(node3.dht().successors().list()?.contains(&node2.did()));

        println!("==========================================");
        println!("||  now we start 5 times stabilization  ||");
        println!("==========================================");

        for _ in 0..5 {
            run_stabilization_once(node1.swarm.clone()).await?;
            run_stabilization_once(node2.swarm.clone()).await?;
            run_stabilization_once(node3.swarm.clone()).await?;

            wait_for_msgs([&node1, &node2, &node3]).await;
            assert_no_more_msg([&node1, &node2, &node3]).await;

            println!("=== Check state after stabilization ===");
            assert_eq!(node1.dht().successors().list()?, vec![
                node3.did(),
                node2.did()
            ]);
            assert_eq!(node2.dht().successors().list()?, vec![
                node1.did(),
                node3.did()
            ]);
            assert_eq!(node3.dht().successors().list()?, vec![
                node2.did(),
                node1.did()
            ]);
        }

        println!("=== Check predecessor after all stabilization ===");
        assert_eq!(*node1.dht().lock_predecessor()?, Some(node2.did()));
        assert_eq!(*node2.dht().lock_predecessor()?, Some(node3.did()));
        assert_eq!(*node3.dht().lock_predecessor()?, Some(node1.did()));

        Ok(())
    }

    async fn run_stabilization_once(swarm: Arc<Swarm>) -> Result<()> {
        let stab = swarm.stabilizer();
        stab.notify_predecessor().await
    }
}