use std::collections::HashMap;
use assert_matches::assert_matches;
use iroh::Endpoint;
use iroh::endpoint::{Connection, presets};
use iroh::protocol::{AcceptError, ProtocolHandler, Router};
use p2panda_core::test_utils::setup_logging;
use p2panda_core::{Operation, Topic};
use p2panda_net::cbor::{into_cbor_sink, into_cbor_stream};
use p2panda_sync::FromSync;
use p2panda_sync::protocols::{Logs, TopicLogSyncEvent as Event};
use p2panda_sync::test_utils::{Peer, TestTopicSyncMessage};
use p2panda_sync::traits::Protocol;
use tokio_stream::StreamExt;
use crate::test_utils::TestNode;
#[tokio::test]
async fn e2e_log_sync() {
setup_logging();
let topic: Topic = [0; 32].into();
let log_id = 0;
let mut bob = TestNode::spawn([11; 32], None).await;
let mut alice = TestNode::spawn([10; 32], Some(bob.node_info())).await;
alice
.client
.create_operation(b"Hello from Alice", log_id)
.await;
alice
.client
.associate(&topic, &HashMap::from([(alice.client_id(), vec![log_id])]))
.await;
bob.client.create_operation(b"Hello from Bob", log_id).await;
bob.client
.associate(&topic, &HashMap::from([(bob.client_id(), vec![log_id])]))
.await;
let alice_handle = alice.log_sync.stream(topic, true).await.unwrap();
let mut alice_subscription = alice_handle.subscribe().await.unwrap();
let bob_handle = bob.log_sync.stream(topic, true).await.unwrap();
let mut bob_subscription = bob_handle.subscribe().await.unwrap();
alice_handle.initiate_session(bob.node_id());
let bob_id = bob.node_id();
let event = alice_subscription.next().await.unwrap();
assert_matches!(
event,
Ok(FromSync {
session_id: 0,
remote,
event: Event::SyncStarted { .. },
}) if remote == bob_id
);
let event = alice_subscription.next().await.unwrap();
assert_matches!(
event,
Ok(FromSync {
event: Event::OperationReceived { .. },
..
})
);
let event = alice_subscription.next().await.unwrap();
assert_matches!(
event,
Ok(FromSync {
event: Event::SyncFinished { .. },
..
})
);
let event = alice_subscription.next().await.unwrap();
assert_matches!(
event,
Ok(FromSync {
event: Event::LiveModeStarted,
..
})
);
let alice_id = alice.node_id();
let event = bob_subscription.next().await.unwrap();
assert_matches!(
event,
Ok(FromSync {
session_id: 0,
remote,
event: Event::SyncStarted { .. },
}) if remote == alice_id
);
let event = bob_subscription.next().await.unwrap();
assert_matches!(
event,
Ok(FromSync {
event: Event::OperationReceived { .. },
..
})
);
let event = bob_subscription.next().await.unwrap();
assert_matches!(
event,
Ok(FromSync {
event: Event::SyncFinished { .. },
..
})
);
let event = bob_subscription.next().await.unwrap();
assert_matches!(
event,
Ok(FromSync {
event: Event::LiveModeStarted,
..
})
);
let (header, _, body) = alice
.client
.create_operation(b"live message from Alice", log_id)
.await;
alice_handle
.publish(Operation {
hash: header.hash(),
header,
body: Some(body),
})
.await
.unwrap();
let event = bob_subscription.next().await.unwrap();
assert_matches!(
event,
Ok(FromSync {
event: Event::OperationReceived { .. },
..
})
);
drop(alice_handle);
let event = bob_subscription.next().await.unwrap();
assert_matches!(
event,
Ok(FromSync {
event: Event::SessionFinished { .. },
..
})
);
let event = alice_subscription.next().await.unwrap();
assert_matches!(
event,
Ok(FromSync {
event: Event::SessionFinished { .. },
..
})
);
}
#[tokio::test]
async fn e2e_three_party_sync() {
setup_logging();
let topic: Topic = [0; 32].into();
let log_id = 0;
let mut bob = TestNode::spawn([30; 32], None).await;
let mut alice = TestNode::spawn([31; 32], Some(bob.node_info())).await;
let mut carol = TestNode::spawn([32; 32], Some(alice.node_info())).await;
alice
.client
.create_operation(b"Hello from Alice", log_id)
.await;
alice
.client
.associate(&topic, &HashMap::from([(alice.client_id(), vec![log_id])]))
.await;
bob.client.create_operation(b"Hello from Bob", log_id).await;
bob.client
.associate(&topic, &HashMap::from([(bob.client_id(), vec![log_id])]))
.await;
carol
.client
.create_operation(b"Hello from Carol", log_id)
.await;
carol
.client
.associate(&topic, &HashMap::from([(carol.client_id(), vec![log_id])]))
.await;
let alice_handle = alice.log_sync.stream(topic, true).await.unwrap();
let mut alice_subscription = alice_handle.subscribe().await.unwrap();
let bob_handle = bob.log_sync.stream(topic, true).await.unwrap();
let mut bob_subscription = bob_handle.subscribe().await.unwrap();
alice_handle.initiate_session(bob.node_id());
let bob_id = bob.node_id();
let event = alice_subscription.next().await.unwrap();
assert_matches!(
event,
Ok(FromSync {
session_id: 0,
remote,
event: Event::SyncStarted { .. },
}) if remote == bob_id
);
let event = alice_subscription.next().await.unwrap();
assert_matches!(
event,
Ok(FromSync {
event: Event::OperationReceived { .. },
..
})
);
let event = alice_subscription.next().await.unwrap();
assert_matches!(
event,
Ok(FromSync {
event: Event::SyncFinished { .. },
..
})
);
let event = alice_subscription.next().await.unwrap();
assert_matches!(
event,
Ok(FromSync {
event: Event::LiveModeStarted,
..
})
);
let alice_id = alice.node_id();
let event = bob_subscription.next().await.unwrap();
assert_matches!(
event,
Ok(FromSync {
session_id: 0,
remote,
event: Event::SyncStarted { .. },
}) if remote == alice_id
);
let event = bob_subscription.next().await.unwrap();
assert_matches!(
event,
Ok(FromSync {
event: Event::OperationReceived { .. },
..
})
);
let event = bob_subscription.next().await.unwrap();
assert_matches!(
event,
Ok(FromSync {
event: Event::SyncFinished { .. },
..
})
);
let event = bob_subscription.next().await.unwrap();
assert_matches!(
event,
Ok(FromSync {
event: Event::LiveModeStarted,
..
})
);
let (header, _, body) = alice
.client
.create_operation(b"live message from Alice", log_id)
.await;
alice_handle
.publish(Operation {
hash: header.hash(),
header,
body: Some(body),
})
.await
.unwrap();
let event = bob_subscription.next().await.unwrap();
assert_matches!(
event,
Ok(FromSync {
event: Event::OperationReceived { .. },
..
})
);
let carol_handle = carol.log_sync.stream(topic, true).await.unwrap();
let mut carol_subscription = carol_handle.subscribe().await.unwrap();
carol_handle.initiate_session(alice.node_id());
let event = carol_subscription.next().await.unwrap();
assert_matches!(
event,
Ok(FromSync {
session_id: 0,
event: Event::SyncStarted { .. },
..
})
);
let event = carol_subscription.next().await.unwrap();
assert_matches!(
event,
Ok(FromSync {
event: Event::OperationReceived { .. },
..
})
);
let event = carol_subscription.next().await.unwrap();
assert_matches!(
event,
Ok(FromSync {
event: Event::OperationReceived { .. },
..
})
);
let event = carol_subscription.next().await.unwrap();
assert_matches!(
event,
Ok(FromSync {
event: Event::SyncFinished { .. },
..
})
);
let event = carol_subscription.next().await.unwrap();
assert_matches!(
event,
Ok(FromSync {
event: Event::LiveModeStarted,
..
})
);
}
#[tokio::test]
async fn unsubscribe_from_gossip_after_drop() {
setup_logging();
let sync_topic: Topic = [0; 32].into();
let alice = TestNode::spawn([73; 32], None).await;
let alice_handle = alice.log_sync.stream(sync_topic, true).await.unwrap();
let mut watcher = alice
.address_book
.watch_node_topics(alice.node_id(), false)
.await
.unwrap();
while let Some(event) = watcher.recv().await {
if !event.value.contains(&sync_topic) && event.value.len() == 1 {
break;
}
}
drop(alice_handle);
while let Some(event) = watcher.recv().await {
if event.value.is_empty() {
break;
}
}
}
const ALPN: &[u8] = b"iroh/smol/0";
#[derive(Debug, Clone, Default)]
struct TestProtocol {}
impl ProtocolHandler for TestProtocol {
async fn accept(&self, connection: Connection) -> Result<(), AcceptError> {
let _ = connection.accept_bi().await;
Ok(())
}
async fn shutdown(&self) {}
}
#[tokio::test]
async fn panic_on_sink_closure_after_error_regression() {
setup_logging();
let topic = Topic::random();
let mut peer = Peer::new(0).await;
peer.associate(&topic, &Logs::default()).await;
let (session, _events_rx, _live_tx) = peer.topic_sync_protocol(topic.clone(), true);
let acceptor = Endpoint::bind(presets::Minimal).await.unwrap();
let acceptor_router = Router::builder(acceptor)
.accept(ALPN, TestProtocol::default())
.spawn();
let addr = acceptor_router.endpoint().addr();
let initiator = Endpoint::bind(presets::Minimal).await.unwrap();
let connection = initiator.connect(addr, ALPN).await.unwrap();
let (tx, rx) = connection.open_bi().await.unwrap();
let mut tx = into_cbor_sink::<TestTopicSyncMessage, _>(tx);
let mut rx = into_cbor_stream::<TestTopicSyncMessage, _>(rx);
let handle = tokio::spawn(async move { session.run(&mut tx, &mut rx).await });
connection.close(0u32.into(), b"testing");
let result = handle.await.unwrap();
assert!(result.is_err());
}