// ipfs 0.2.1
//
// IPFS node implementation
// Documentation
use futures::future::pending;
use futures::stream::StreamExt;
use ipfs::Node;
use std::time::Duration;
use tokio::time::timeout;

mod common;
use common::{spawn_nodes, Topology};

// A second subscription to a topic that already has a live subscription on
// the same node is expected to be rejected.
#[tokio::test(max_threads = 1)]
async fn subscribe_only_once() {
    let node = Node::new("test_node").await;

    // The first subscription must succeed; keep its stream alive for the
    // remainder of the test so that the topic stays subscribed.
    let first_attempt = node.pubsub_subscribe("some_topic".into()).await;
    let _stream = first_attempt.unwrap();

    // The second attempt for the same topic must yield an error.
    let second_attempt = node.pubsub_subscribe("some_topic".into()).await;
    second_attempt.unwrap_err();
}

// After an explicit unsubscribe the old stream must terminate, and the same
// topic must be subscribable again.
#[tokio::test(max_threads = 1)]
async fn resubscribe_after_unsubscribe() {
    let node = Node::new("test_node").await;

    let mut messages = node.pubsub_subscribe("topic".into()).await.unwrap();
    node.pubsub_unsubscribe("topic").await.unwrap();

    // sender has been dropped, so the stream is expected to end immediately
    let next_item = messages.next().await;
    assert_eq!(next_item, None);

    // Subscribing again must succeed; the fresh stream is released right away.
    let resubscribed = node.pubsub_subscribe("topic".into()).await.unwrap();
    drop(resubscribed);
}

// Dropping the subscription stream is expected to remove the topic from the
// node's list of subscribed topics.
#[tokio::test(max_threads = 1)]
async fn unsubscribe_via_drop() {
    let node = Node::new("test_node").await;

    let subscription = node.pubsub_subscribe("topic".into()).await.unwrap();

    // While the stream is alive the topic is listed.
    let while_subscribed = node.pubsub_subscribed().await.unwrap();
    assert_eq!(while_subscribed, &["topic"]);

    drop(subscription);

    // Once the stream is gone the list must be empty.
    let after_drop = node.pubsub_subscribed().await.unwrap();
    let nothing: &[&str] = &[];
    assert_eq!(after_drop, nothing);
}

// Publishing to a topic must succeed even when the node never subscribed to it.
#[tokio::test(max_threads = 1)]
async fn can_publish_without_subscribing() {
    let node = Node::new("test_node").await;

    let payload = b"foobar".to_vec();
    let outcome = node.pubsub_publish("topic".into(), payload).await;

    outcome.unwrap()
}

// Two nodes subscribe to the same topic, wait until each lists the other as a
// pubsub peer for it, publish one message each and check that both
// subscription streams deliver both messages. Finally one subscription is
// dropped and the other node must stop listing that peer for the topic.
#[tokio::test(max_threads = 1)]
#[allow(clippy::mutable_key_type)] // clippy doesn't like Vec inside HashSet
async fn publish_between_two_nodes() {
    use futures::stream::StreamExt;
    use std::collections::HashSet;

    let nodes = spawn_nodes(2, Topology::Line).await;
    let (node_a, node_b) = (&nodes[0], &nodes[1]);

    let topic = "shared".to_owned();

    let mut a_msgs = node_a.pubsub_subscribe(topic.clone()).await.unwrap();
    let mut b_msgs = node_b.pubsub_subscribe(topic.clone()).await.unwrap();

    // Pause between polls; at most 100 polls are made per waiting phase.
    let poll_interval = Duration::from_millis(100);

    // need to wait to see both sides so that the messages will get through
    let mut appeared = false;
    for _ in 0..100usize {
        let a_sees_b = node_a
            .pubsub_peers(Some(topic.clone()))
            .await
            .unwrap()
            .contains(&node_b.id);

        // Only ask the second node once the first one already sees its peer.
        if a_sees_b {
            let b_sees_a = node_b
                .pubsub_peers(Some(topic.clone()))
                .await
                .unwrap()
                .contains(&node_a.id);

            if b_sees_a {
                appeared = true;
                break;
            }
        }

        // A never-completing future under a timeout acts as the pause.
        timeout(poll_interval, pending::<()>())
            .await
            .unwrap_err();
    }

    assert!(
        appeared,
        "timed out before both nodes appeared as pubsub peers"
    );

    node_a
        .pubsub_publish(topic.clone(), b"foobar".to_vec())
        .await
        .unwrap();
    node_b
        .pubsub_publish(topic.clone(), b"barfoo".to_vec())
        .await
        .unwrap();

    // the order is not defined, but both should see the other's message and the message they sent
    let mut expected = HashSet::new();
    expected.insert((vec![topic.clone()], node_a.id.clone(), b"foobar".to_vec()));
    expected.insert((vec![topic.clone()], node_b.id.clone(), b"barfoo".to_vec()));

    for msgs in &mut [b_msgs.by_ref(), a_msgs.by_ref()] {
        let actual = msgs
            .take(2)
            // Arc::try_unwrap will fail sometimes here as the sender side in src/p2p/pubsub.rs:305
            // can still be looping
            .map(|shared| {
                let msg = (*shared).clone();
                (msg.topics, msg.source, msg.data)
            })
            .collect::<HashSet<_>>()
            .await;
        assert_eq!(expected, actual);
    }

    // Dropping b's stream should eventually be noticed by a.
    drop(b_msgs);

    let mut disappeared = false;
    for _ in 0..100usize {
        let a_still_sees_b = node_a
            .pubsub_peers(Some(topic.clone()))
            .await
            .unwrap()
            .contains(&node_b.id);

        if !a_still_sees_b {
            disappeared = true;
            break;
        }

        timeout(poll_interval, pending::<()>())
            .await
            .unwrap_err();
    }

    assert!(disappeared, "timed out before a saw b's unsubscription");
}

// Interop check between a Rust node and a foreign node: the Rust node
// connects to the foreign node, both sides are set up to subscribe to the same
// topic, and the test then polls until each side lists the other as a pubsub
// peer for that topic.
//
// Only compiled with the `test_go_interop` or `test_js_interop` feature, and
// currently ignored ("doesn't work yet").
#[cfg(any(feature = "test_go_interop", feature = "test_js_interop"))]
#[tokio::test(max_threads = 1)]
#[ignore = "doesn't work yet"]
async fn pubsub_interop() {
    use common::interop::{api_call, ForeignNode};
    use futures::{future, pin_mut};

    let rust_node = Node::new("rusty_boi").await;
    let foreign_node = ForeignNode::new();
    let foreign_api_port = foreign_node.api_port;

    // Dial the first address reported by the foreign node.
    rust_node
        .connect(foreign_node.addrs[0].clone())
        .await
        .unwrap();

    const TOPIC: &str = "shared";

    // Keep the stream bound for the rest of the test so it is not dropped early.
    let _rust_sub_stream = rust_node.pubsub_subscribe(TOPIC.to_string()).await.unwrap();

    // Build the foreign side's subscribe call as a `MaybeDone` future without
    // awaiting it here.
    // NOTE(review): nothing visible in this test polls `foreign_sub_answer`
    // afterwards, so the subscribe request is presumably never issued — this
    // may be related to the test being ignored; confirm.
    let foreign_sub_answer = future::maybe_done(api_call(
        foreign_api_port,
        format!("pubsub/sub?arg={}", TOPIC),
    ));
    pin_mut!(foreign_sub_answer);
    // No output can be available yet since the future has not completed.
    assert_eq!(foreign_sub_answer.as_mut().output_mut(), None);

    // need to wait to see both sides so that the messages will get through
    let mut appeared = false;
    // Up to 100 polls with a 200 ms pause after each unsuccessful one.
    for _ in 0..100usize {
        // The foreign side is only queried when the Rust side already lists
        // the foreign node (`&&` short-circuits).
        // NOTE(review): assumes `api_call` resolves to something with a
        // string-like `contains` — verify against `common::interop`.
        if rust_node
            .pubsub_peers(Some(TOPIC.to_string()))
            .await
            .unwrap()
            .contains(&foreign_node.id)
            && api_call(foreign_api_port, &format!("pubsub/peers?arg={}", TOPIC))
                .await
                .contains(&rust_node.id.to_string())
        {
            appeared = true;
            break;
        }
        // A never-completing future under a timeout acts as the pause.
        timeout(Duration::from_millis(200), pending::<()>())
            .await
            .unwrap_err();
    }

    assert!(
        appeared,
        "timed out before both nodes appeared as pubsub peers"
    );
}