use std::collections::HashSet;
use std::time::Duration;
use futures_test::task::noop_context;
use futures_util::TryStreamExt;
use p2panda_core::Topic;
use tokio::time::sleep;
use tokio_stream::StreamExt;
use crate::address_book::AddressBook;
use crate::gossip::api::GossipPublishError;
use crate::gossip::{DEFAULT_MAX_MESSAGE_SIZE, Gossip, GossipEvent};
use crate::iroh_endpoint::Endpoint;
use crate::test_utils::{setup_logging, test_args};
/// Verifies that ant's gossip event stream yields a `Joined` event after both
/// nodes open a stream on the same topic, and a `Left` event once ant's only
/// topic handle is dropped.
#[tokio::test]
async fn joined_and_left_events_are_received() {
    setup_logging();

    let mut ant_args = test_args();
    let mut bat_args = test_args();
    let topic: Topic = [1; 32].into();

    // One address book and one endpoint per node.
    let ant_address_book = AddressBook::builder().spawn().await.unwrap();
    let bat_address_book = AddressBook::builder().spawn().await.unwrap();

    let ant_endpoint = Endpoint::builder(ant_address_book.clone())
        .config(ant_args.iroh_config.clone())
        .signing_key(ant_args.signing_key.clone())
        .spawn()
        .await
        .unwrap();
    let bat_endpoint = Endpoint::builder(bat_address_book.clone())
        .config(bat_args.iroh_config.clone())
        .signing_key(bat_args.signing_key.clone())
        .spawn()
        .await
        .unwrap();

    // Cross-register the nodes: each address book records the other node's
    // topic set and node info.
    let ant_node = ant_args.node_info().bootstrap();
    let bat_node = bat_args.node_info().bootstrap();

    bat_address_book
        .set_topics(ant_node.node_id, [topic])
        .await
        .unwrap();
    bat_address_book.insert_node_info(ant_node).await.unwrap();

    ant_address_book
        .set_topics(bat_node.node_id, [topic])
        .await
        .unwrap();
    ant_address_book.insert_node_info(bat_node).await.unwrap();

    let ant_gossip = Gossip::builder(ant_address_book.clone(), ant_endpoint.clone())
        .spawn()
        .await
        .unwrap();
    let bat_gossip = Gossip::builder(bat_address_book.clone(), bat_endpoint.clone())
        .spawn()
        .await
        .unwrap();

    // The event receiver is obtained before any topic stream is opened.
    let mut ant_events = ant_gossip.events().await.unwrap();

    let ant_topic_handle = ant_gossip.stream(topic).await.unwrap();
    // Bound to a named variable so bat's handle lives until the end of the test.
    let _bat_topic_handle = bat_gossip.stream(topic).await.unwrap();

    // First event observed on ant's side must be `Joined`.
    assert!(matches!(
        ant_events.recv().await,
        Ok(GossipEvent::Joined { .. })
    ));

    // Dropping ant's only handle for the topic must be followed by `Left`.
    drop(ant_topic_handle);
    assert!(matches!(
        ant_events.recv().await,
        Ok(GossipEvent::Left { .. })
    ));
}
/// Ant starts with an empty address book while bat and cat both know ant for
/// the topic. The test collects the neighbours reported by ant's first two
/// gossip events and expects them to be exactly bat and cat.
#[tokio::test]
async fn join_without_bootstrap() {
    setup_logging();

    let mut ant_args = test_args();
    let bat_args = test_args();
    let cat_args = test_args();
    let topic: Topic = [1; 32].into();

    let ant_address_book = AddressBook::builder().spawn().await.unwrap();
    let bat_address_book = AddressBook::builder().spawn().await.unwrap();
    let cat_address_book = AddressBook::builder().spawn().await.unwrap();

    let ant_endpoint = Endpoint::builder(ant_address_book.clone())
        .config(ant_args.iroh_config.clone())
        .signing_key(ant_args.signing_key.clone())
        .spawn()
        .await
        .unwrap();
    let bat_endpoint = Endpoint::builder(bat_address_book.clone())
        .config(bat_args.iroh_config.clone())
        .signing_key(bat_args.signing_key.clone())
        .spawn()
        .await
        .unwrap();
    let cat_endpoint = Endpoint::builder(cat_address_book.clone())
        .config(cat_args.iroh_config.clone())
        .signing_key(cat_args.signing_key.clone())
        .spawn()
        .await
        .unwrap();

    // Only bat and cat learn about ant; ant's own address book stays empty.
    let ant_node = ant_args.node_info().bootstrap();

    bat_address_book
        .insert_node_info(ant_node.clone())
        .await
        .unwrap();
    bat_address_book
        .set_topics(ant_node.node_id, [topic])
        .await
        .unwrap();

    cat_address_book
        .insert_node_info(ant_node.clone())
        .await
        .unwrap();
    cat_address_book
        .set_topics(ant_node.node_id, [topic])
        .await
        .unwrap();

    let ant_gossip = Gossip::builder(ant_address_book.clone(), ant_endpoint.clone())
        .spawn()
        .await
        .unwrap();
    let bat_gossip = Gossip::builder(bat_address_book.clone(), bat_endpoint.clone())
        .spawn()
        .await
        .unwrap();
    let cat_gossip = Gossip::builder(cat_address_book.clone(), cat_endpoint.clone())
        .spawn()
        .await
        .unwrap();

    let mut ant_events = ant_gossip.events().await.unwrap();

    // Handles are kept in named bindings so they live until the test ends.
    let _ant_stream = ant_gossip.stream(topic).await.unwrap();
    let _bat_stream = bat_gossip.stream(topic).await.unwrap();
    let _cat_stream = cat_gossip.stream(topic).await.unwrap();

    let mut seen_neighbours = HashSet::new();

    // First event: if it is `Joined`, record every node it lists. Any other
    // variant is ignored here and surfaces through the final assertion.
    match ant_events.recv().await.unwrap() {
        GossipEvent::Joined {
            topic: joined_topic,
            nodes,
        } => {
            assert_eq!(joined_topic, topic);
            seen_neighbours.extend(nodes);
        }
        _ => {}
    }

    // Second event: if it is `NeighbourUp`, record the single node it names.
    match ant_events.recv().await.unwrap() {
        GossipEvent::NeighbourUp {
            topic: up_topic,
            node,
        } => {
            assert_eq!(up_topic, topic);
            seen_neighbours.insert(node);
        }
        _ => {}
    }

    assert_eq!(
        seen_neighbours,
        HashSet::from([bat_args.verifying_key, cat_args.verifying_key])
    );
}
/// Two nodes exchange one message in each direction over a shared topic.
///
/// Bat's address book is given ant's node info and topic; ant's address book
/// is left empty. Ant publishes first and bat must receive exactly that
/// payload, then bat replies and ant must receive the reply.
#[tokio::test]
async fn two_peer_gossip() {
    setup_logging();

    let mut ant_args = test_args();
    let bat_args = test_args();
    let topic: Topic = [7; 32].into();

    let ant_address_book = AddressBook::builder().spawn().await.unwrap();
    let bat_address_book = AddressBook::builder().spawn().await.unwrap();

    let ant_endpoint = Endpoint::builder(ant_address_book.clone())
        .config(ant_args.iroh_config.clone())
        .signing_key(ant_args.signing_key.clone())
        .spawn()
        .await
        .unwrap();
    let bat_endpoint = Endpoint::builder(bat_address_book.clone())
        .config(bat_args.iroh_config.clone())
        .signing_key(bat_args.signing_key.clone())
        .spawn()
        .await
        .unwrap();

    // Bat learns about ant and the topic ant is associated with.
    let ant_info = ant_args.node_info().bootstrap();
    bat_address_book
        .insert_node_info(ant_info.clone())
        .await
        .unwrap();
    bat_address_book
        .set_topics(ant_info.node_id, [topic])
        .await
        .unwrap();

    let ant_gossip = Gossip::builder(ant_address_book.clone(), ant_endpoint.clone())
        .spawn()
        .await
        .unwrap();
    let bat_gossip = Gossip::builder(bat_address_book.clone(), bat_endpoint.clone())
        .spawn()
        .await
        .unwrap();

    let ant_handle = ant_gossip.stream(topic).await.unwrap();
    let bat_handle = bat_gossip.stream(topic).await.unwrap();

    // Subscribe on both sides BEFORE anything is published. Subscribing only
    // after the peer has been asked to publish leaves a window in which the
    // message could arrive before the subscription exists and be missed,
    // making `next().await` below wait forever. The other multi-peer tests in
    // this file already subscribe first; they also show that a node's own
    // publish is not delivered back to its own subscription.
    let mut ant_from_gossip_rx = ant_handle.subscribe();
    let mut bat_from_gossip_rx = bat_handle.subscribe();

    // Ant -> bat.
    ant_handle.publish(b"hi, bat!").await.unwrap();
    let Some(Ok(msg)) = bat_from_gossip_rx.next().await else {
        panic!("expected msg from ant")
    };
    assert_eq!(msg, b"hi, bat!".to_vec());

    // Bat -> ant.
    bat_handle.publish(b"oh hey ant!").await.unwrap();
    let Some(Ok(msg)) = ant_from_gossip_rx.next().await else {
        panic!("expected msg from bat")
    };
    assert_eq!(msg, b"oh hey ant!".to_vec());
}
/// Three-node scenario: ant and bat open the topic first (bat's address book
/// lists ant), then cat opens the topic knowing only bat. A message published
/// by cat must reach bat, and a message published by ant must reach cat.
///
/// NOTE(review): the test is ignored as flaky; the fixed 250 ms sleep before
/// cat publishes is a plausible source of the timing sensitivity — TODO confirm.
#[ignore = "flaky"]
#[tokio::test]
async fn third_peer_joins_non_bootstrap() {
setup_logging();
let mut ant_args = test_args();
let mut bat_args = test_args();
let cat_args = test_args();
let topic: Topic = [11; 32].into();
// One address book and one endpoint per node.
let ant_address_book = AddressBook::builder().spawn().await.unwrap();
let bat_address_book = AddressBook::builder().spawn().await.unwrap();
let cat_address_book = AddressBook::builder().spawn().await.unwrap();
let ant_endpoint = Endpoint::builder(ant_address_book.clone())
.config(ant_args.iroh_config.clone())
.signing_key(ant_args.signing_key.clone())
.spawn()
.await
.unwrap();
let bat_endpoint = Endpoint::builder(bat_address_book.clone())
.config(bat_args.iroh_config.clone())
.signing_key(bat_args.signing_key.clone())
.spawn()
.await
.unwrap();
let cat_endpoint = Endpoint::builder(cat_address_book.clone())
.config(cat_args.iroh_config.clone())
.signing_key(cat_args.signing_key.clone())
.spawn()
.await
.unwrap();
// Bat's address book receives ant's node info and topic; ant's and cat's
// address books are still empty at this point.
let ant_info = ant_args.node_info().bootstrap();
bat_address_book
.insert_node_info(ant_info.clone())
.await
.unwrap();
bat_address_book
.set_topics(ant_info.node_id, [topic])
.await
.unwrap();
let ant_gossip = Gossip::builder(ant_address_book.clone(), ant_endpoint.clone())
.spawn()
.await
.unwrap();
let bat_gossip = Gossip::builder(bat_address_book.clone(), bat_endpoint.clone())
.spawn()
.await
.unwrap();
let cat_gossip = Gossip::builder(cat_address_book.clone(), cat_endpoint.clone())
.spawn()
.await
.unwrap();
// Ant and bat open the topic; bat subscribes before cat is introduced.
let ant_handle = ant_gossip.stream(topic).await.unwrap();
let bat_handle = bat_gossip.stream(topic).await.unwrap();
let mut bat_from_gossip_rx = bat_handle.subscribe();
// Cat learns about bat only (not ant) and then opens the topic itself.
let bat_info = bat_args.node_info().bootstrap();
cat_address_book
.insert_node_info(bat_info.clone())
.await
.unwrap();
cat_address_book
.set_topics(bat_info.node_id, [topic])
.await
.unwrap();
let cat_handle = cat_gossip.stream(topic).await.unwrap();
let mut cat_from_gossip_rx = cat_handle.subscribe();
// Fixed pause before cat publishes.
// NOTE(review): presumably gives cat time to connect to the overlay — confirm.
sleep(Duration::from_millis(250)).await;
// Cat publishes; bat's subscription must yield exactly that payload next.
let cat_msg_to_ant_and_bat = b"hi ant and bat!".to_vec();
cat_handle
.publish(cat_msg_to_ant_and_bat.clone())
.await
.unwrap();
let Some(Ok(msg)) = bat_from_gossip_rx.next().await else {
panic!("expected msg from cat")
};
assert_eq!(msg, cat_msg_to_ant_and_bat);
// Ant publishes; cat, which was never given ant's node info directly, must
// receive it.
let ant_msg_to_bat_and_cat = b"hi bat and cat!".to_vec();
ant_handle
.publish(ant_msg_to_bat_and_cat.clone())
.await
.unwrap();
let Some(Ok(msg)) = cat_from_gossip_rx.next().await else {
panic!("expected msg from ant")
};
assert_eq!(msg, ant_msg_to_bat_and_cat);
}
/// Ant and bat exchange messages, then cat opens the same topic without
/// knowing any peer. While cat has no address-book entries, messages must not
/// cross between bat and cat in either direction. Once cat is given bat's node
/// info and topic, both directions must work.
#[tokio::test]
async fn three_peer_gossip_with_rejoin() {
    setup_logging();

    let mut ant_args = test_args();
    let mut bat_args = test_args();
    let cat_args = test_args();
    let topic: Topic = [9; 32].into();

    let ant_address_book = AddressBook::builder().spawn().await.unwrap();
    let bat_address_book = AddressBook::builder().spawn().await.unwrap();
    let cat_address_book = AddressBook::builder().spawn().await.unwrap();

    let ant_endpoint = Endpoint::builder(ant_address_book.clone())
        .config(ant_args.iroh_config.clone())
        .signing_key(ant_args.signing_key.clone())
        .spawn()
        .await
        .unwrap();
    let bat_endpoint = Endpoint::builder(bat_address_book.clone())
        .config(bat_args.iroh_config.clone())
        .signing_key(bat_args.signing_key.clone())
        .spawn()
        .await
        .unwrap();
    let cat_endpoint = Endpoint::builder(cat_address_book.clone())
        .config(cat_args.iroh_config.clone())
        .signing_key(cat_args.signing_key.clone())
        .spawn()
        .await
        .unwrap();

    // Bat learns about ant and ant's topic; ant and cat start with empty
    // address books.
    let ant_info = ant_args.node_info().bootstrap();
    bat_address_book
        .insert_node_info(ant_info.clone())
        .await
        .unwrap();
    bat_address_book
        .set_topics(ant_info.node_id, [topic])
        .await
        .unwrap();

    let ant_gossip = Gossip::builder(ant_address_book.clone(), ant_endpoint.clone())
        .spawn()
        .await
        .unwrap();
    let bat_gossip = Gossip::builder(bat_address_book.clone(), bat_endpoint.clone())
        .spawn()
        .await
        .unwrap();
    let cat_gossip = Gossip::builder(cat_address_book.clone(), cat_endpoint.clone())
        .spawn()
        .await
        .unwrap();

    let ant_handle = ant_gossip.stream(topic).await.unwrap();
    let bat_handle = bat_gossip.stream(topic).await.unwrap();
    let mut ant_from_gossip_rx = ant_handle.subscribe();
    let mut bat_from_gossip_rx = bat_handle.subscribe();

    // Ant -> bat.
    let ant_msg_to_bat = b"hi bat!".to_vec();
    ant_handle.publish(ant_msg_to_bat.clone()).await.unwrap();
    let Some(Ok(msg)) = bat_from_gossip_rx.next().await else {
        panic!("expected msg from ant")
    };
    assert_eq!(msg, ant_msg_to_bat);

    // Bat -> ant.
    let bat_msg_to_ant = b"oh hey ant!".to_vec();
    bat_handle.publish(bat_msg_to_ant.clone()).await.unwrap();
    let Some(Ok(msg)) = ant_from_gossip_rx.next().await else {
        panic!("expected msg from bat")
    };
    assert_eq!(msg, bat_msg_to_ant);

    // Release this test's own bindings to ant's address book and endpoint.
    // NOTE(review): `ant_gossip` was built from clones of both, so whether
    // this takes ant offline depends on their drop semantics — confirm.
    drop(ant_address_book);
    drop(ant_endpoint);

    // Cat opens the topic while its address book is still empty.
    let cat_handle = cat_gossip.stream(topic).await.unwrap();
    let mut cat_from_gossip_rx = cat_handle.subscribe();

    // A no-op task context lets us poll the streams once without blocking.
    let mut cx = noop_context();

    // Cat publishes; after a short pause bat must not have received anything.
    let cat_msg_to_bat = b"hi bat!".to_vec();
    cat_handle.publish(cat_msg_to_bat.clone()).await.unwrap();
    sleep(Duration::from_millis(50)).await;
    assert!(bat_from_gossip_rx.try_poll_next_unpin(&mut cx).is_pending());

    // Bat publishes; after a short pause cat must not have received anything.
    // This check previously polled bat's own receiver a second time, which
    // never verified that the message failed to reach cat.
    let bat_msg_to_cat = b"anyone out there?".to_vec();
    bat_handle.publish(bat_msg_to_cat.clone()).await.unwrap();
    sleep(Duration::from_millis(50)).await;
    assert!(cat_from_gossip_rx.try_poll_next_unpin(&mut cx).is_pending());

    // Now introduce bat to cat via cat's address book.
    let bat_info = bat_args.node_info().bootstrap();
    cat_address_book
        .insert_node_info(bat_info.clone())
        .await
        .unwrap();
    cat_address_book
        .set_topics(bat_info.node_id, [topic])
        .await
        .unwrap();

    // Cat -> bat must now be delivered.
    let cat_msg_to_bat = b"you there bat?".to_vec();
    cat_handle.publish(cat_msg_to_bat.clone()).await.unwrap();
    let Some(Ok(msg)) = bat_from_gossip_rx.next().await else {
        panic!("expected msg from cat")
    };
    assert_eq!(msg, cat_msg_to_bat);

    // Bat -> cat must now be delivered.
    let bat_msg_to_cat = b"yoyo!".to_vec();
    bat_handle.publish(bat_msg_to_cat.clone()).await.unwrap();
    let Some(Ok(msg)) = cat_from_gossip_rx.next().await else {
        panic!("expected msg from bat")
    };
    assert_eq!(msg, bat_msg_to_cat);
}
/// Exercises how dropping gossip objects, topic handles and subscriptions
/// affects publishing and the `Left` event.
///
/// The sequence checked is: publishing works; dropping a clone of the gossip
/// object changes nothing; dropping the last `ant_gossip` binding makes
/// publishing on the old handle fail; a freshly spawned gossip emits `Joined`,
/// and emits `Left` only after every handle and every subscription for the
/// topic has been dropped; opening the topic again afterwards works.
#[tokio::test]
async fn leave_overlay_on_drop() {
setup_logging();
let mut ant_args = test_args();
let mut bat_args = test_args();
let topic: Topic = [1; 32].into();
let ant_address_book = AddressBook::builder().spawn().await.unwrap();
let bat_address_book = AddressBook::builder().spawn().await.unwrap();
let ant_endpoint = Endpoint::builder(ant_address_book.clone())
.config(ant_args.iroh_config.clone())
.signing_key(ant_args.signing_key.clone())
.spawn()
.await
.unwrap();
let bat_endpoint = Endpoint::builder(bat_address_book.clone())
.config(bat_args.iroh_config.clone())
.signing_key(bat_args.signing_key.clone())
.spawn()
.await
.unwrap();
// Cross-register the two nodes: each address book records the other node's
// topic set and node info.
let ant_info = ant_args.node_info().bootstrap();
let bat_info = bat_args.node_info().bootstrap();
bat_address_book
.set_topics(ant_info.node_id, [topic])
.await
.unwrap();
bat_address_book.insert_node_info(ant_info).await.unwrap();
ant_address_book
.set_topics(bat_info.node_id, [topic])
.await
.unwrap();
ant_address_book.insert_node_info(bat_info).await.unwrap();
let ant_gossip = Gossip::builder(ant_address_book.clone(), ant_endpoint.clone())
.spawn()
.await
.unwrap();
let bat_gossip = Gossip::builder(bat_address_book.clone(), bat_endpoint.clone())
.spawn()
.await
.unwrap();
let ant_handle = ant_gossip.stream(topic).await.unwrap();
let bat_handle = bat_gossip.stream(topic).await.unwrap();
// Bat's single subscription is used for every delivery check below.
let mut bat_rx = bat_handle.subscribe();
// Baseline: ant can publish and bat receives the payload.
assert!(ant_handle.publish(b"test 0").await.is_ok());
assert_eq!(bat_rx.next().await.unwrap().unwrap(), b"test 0".to_vec());
// Dropping a clone of the gossip object must not affect publishing.
let ant_gossip_2 = ant_gossip.clone();
drop(ant_gossip_2);
tokio::time::sleep(Duration::from_millis(50)).await;
assert!(ant_handle.publish(b"test 1").await.is_ok());
assert_eq!(bat_rx.next().await.unwrap().unwrap(), b"test 1".to_vec());
// After the remaining `ant_gossip` binding is dropped, publishing on the
// still-existing handle must fail.
drop(ant_gossip);
tokio::time::sleep(Duration::from_millis(50)).await;
assert!(ant_handle.publish(b"test 1").await.is_err());
drop(ant_handle);
// Spawn a fresh gossip instance on the same address book and endpoint.
let ant_gossip = Gossip::builder(ant_address_book.clone(), ant_endpoint.clone())
.spawn()
.await
.unwrap();
let mut events = ant_gossip.events().await.unwrap();
// Three handles for the same topic; a `Joined` event is expected next.
let ant_handle_1 = ant_gossip.stream(topic).await.unwrap();
let ant_handle_2 = ant_gossip.stream(topic).await.unwrap();
let ant_handle_3 = ant_gossip.stream(topic).await.unwrap();
assert!(matches!(
events.recv().await,
Ok(GossipEvent::Joined { .. })
));
// Four subscriptions: two from handle 1 and two from handle 2 (handle 3 has
// none).
let ant_rx_1 = ant_handle_1.subscribe();
let ant_rx_2 = ant_handle_2.subscribe();
let ant_rx_3 = ant_handle_2.subscribe();
let ant_rx_4 = ant_handle_1.subscribe();
// Dropping handle 2 together with both of its subscriptions must leave
// publishing through handle 1 working.
drop(ant_handle_2);
drop(ant_rx_3);
drop(ant_rx_2);
assert!(ant_handle_1.publish(b"test 2").await.is_ok());
assert_eq!(bat_rx.next().await.unwrap().unwrap(), b"test 2".to_vec());
// Drop every remaining handle and all subscriptions except `ant_rx_4`.
drop(ant_rx_1);
drop(ant_handle_3);
drop(ant_handle_1);
// With one subscription still alive, no event may be queued yet.
assert_eq!(
events.try_recv(),
Err(tokio::sync::broadcast::error::TryRecvError::Empty)
);
// Dropping the last subscription must be followed by a `Left` event.
drop(ant_rx_4);
assert!(matches!(events.recv().await, Ok(GossipEvent::Left { .. })));
// Opening the topic again after leaving must allow publishing to bat.
let handle = ant_gossip.stream(topic).await.unwrap();
assert!(handle.publish(b"test 3").await.is_ok());
assert_eq!(bat_rx.next().await.unwrap().unwrap(), b"test 3".to_vec());
}
/// Publishing an oversized payload must be rejected with
/// `GossipPublishError::MessageTooLarge`, carrying the payload length together
/// with `DEFAULT_MAX_MESSAGE_SIZE`.
#[tokio::test]
async fn large_message_error() {
    setup_logging();

    let args = test_args();
    let topic = Topic::random();

    // A single node is enough: the publish call itself is expected to fail.
    let address_book = AddressBook::builder().spawn().await.unwrap();
    let endpoint = Endpoint::builder(address_book.clone())
        .config(args.iroh_config.clone())
        .signing_key(args.signing_key.clone())
        .spawn()
        .await
        .unwrap();
    let gossip = Gossip::builder(address_book.clone(), endpoint.clone())
        .spawn()
        .await
        .unwrap();
    let handle = gossip.stream(topic).await.unwrap();

    // Hex-encoding 2500 bytes produces the oversized payload; its length is
    // captured before the payload is moved into `publish`.
    let oversized_payload = hex::encode([255; 2500]);
    let oversized_len = oversized_payload.len();

    let outcome = handle.publish(oversized_payload).await;

    assert_eq!(
        outcome,
        Err(GossipPublishError::MessageTooLarge((
            oversized_len,
            DEFAULT_MAX_MESSAGE_SIZE
        )))
    );
}