use std::collections::HashMap;
use futures_util::StreamExt;
use p2panda_core::test_utils::setup_logging;
use p2panda_core::{Body, Topic};
use p2panda_net::test_utils::TestNode;
use p2panda_sync::protocols::TopicLogSyncEvent;
#[tokio::test]
async fn gossip_and_sync_with_same_topic() {
setup_logging();
let topic: Topic = [1; 32].into();
let mut panda = TestNode::spawn([98; 32], None).await;
let panda_gossip_handle = panda.gossip.stream(topic).await.unwrap();
let mut panda_gossip_rx = panda_gossip_handle.subscribe();
let panda_gossip_task = tokio::spawn(async move {
while let Some(Ok(bytes)) = panda_gossip_rx.next().await {
return Some(bytes);
}
return None;
});
let panda_sync_handle = panda.log_sync.stream(topic, true).await.unwrap();
let mut panda_sync_rx = panda_sync_handle.subscribe().await.unwrap();
let panda_sync_task = tokio::spawn(async move {
while let Some(Ok(item)) = panda_sync_rx.next().await {
if let TopicLogSyncEvent::OperationReceived { operation, .. } = item.event {
return Some(operation);
}
}
return None;
});
let mut penguin = TestNode::spawn([99; 32], None).await;
penguin
.address_book
.insert_node_info(panda.node_info().bootstrap())
.await
.unwrap();
let penguin_gossip_handle = penguin.gossip.stream(topic).await.unwrap();
penguin_gossip_handle
.publish(b"Hello, Panda!")
.await
.unwrap();
let log_id = 0;
penguin
.client
.create_operation(b"Hello, again, Panda!", log_id)
.await;
penguin
.client
.associate(
&topic,
&HashMap::from([(penguin.client_id(), vec![log_id])]),
)
.await;
let _penguin_sync_handle = penguin.log_sync.stream(topic, true).await.unwrap();
let message = panda_gossip_task.await.unwrap();
assert_eq!(message, Some(b"Hello, Panda!".to_vec()));
let operation = panda_sync_task.await.unwrap().unwrap();
assert_eq!(operation.body, Some(Body::new(b"Hello, again, Panda!")));
}