use std::marker::PhantomData;
use std::pin::Pin;
use std::time::Duration;
use futures_channel::mpsc::{self, SendError};
use futures_util::{Sink, SinkExt, Stream, StreamExt};
use p2panda_core::Topic;
use p2panda_core::test_utils::setup_logging;
use p2panda_sync::traits::{Manager as SyncManagerTrait, Protocol};
use p2panda_sync::{FromSync, ToSync};
use ractor::thread_local::{ThreadLocalActor, ThreadLocalActorSpawner};
use ractor::{ActorRef, call};
use rand::random;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tokio::sync::broadcast;
use tokio_stream::wrappers::BroadcastStream;
use crate::NodeId;
use crate::address_book::AddressBook;
use crate::addrs::NodeInfo;
use crate::gossip::Gossip;
use crate::iroh_endpoint::Endpoint;
use crate::sync::actors::{SyncManager, ToSyncManager};
use crate::sync::handle::SyncHandle;
use crate::test_utils::{ApplicationArguments, test_args_from_seed};
/// Fixed 32-byte identifier handed (as a `Vec<u8>`) to every `SyncManager` actor spawned by
/// the test nodes in this file.
const TEST_PROTOCOL_ID: [u8; 32] = [101; 32];
/// Test node consisting of the generated application arguments and a handle to its running
/// sync manager actor.
struct FailingNode {
    /// Arguments generated from the seed given to `spawn` (keys, iroh configuration, ...).
    args: ApplicationArguments,
    /// Reference to the spawned sync manager actor; used to create topic managers and to stop
    /// the node.
    sync_ref: ActorRef<ToSyncManager<DummySyncMessage, DummySyncEvent>>,
}
impl FailingNode {
    /// Boots a complete test node and returns a handle to it.
    ///
    /// The start-up sequence is: derive the application arguments from `seed`, spawn an address
    /// book and insert every entry of `node_infos` into it, spawn the iroh endpoint, spawn
    /// gossip, and finally spawn the sync manager actor configured with `sync_args`.
    ///
    /// # Panics
    ///
    /// Panics if any of the spawn steps or an address book insertion fails.
    pub async fn spawn(
        seed: [u8; 32],
        node_infos: Vec<NodeInfo>,
        sync_args: FailingSyncArgs,
    ) -> Self {
        // Concrete manager implementation driven by the sync manager actor.
        type Manager = DummySyncManager<FailingSyncArgs, FailingSyncProtocol>;

        let args = test_args_from_seed(seed);

        // Register all known peers before any networking component is started.
        let address_book = AddressBook::builder().spawn().await.unwrap();
        for node_info in node_infos {
            address_book.insert_node_info(node_info).await.unwrap();
        }

        let endpoint = Endpoint::builder(address_book.clone())
            .config(args.iroh_config.clone())
            .signing_key(args.signing_key.clone())
            .spawn()
            .await
            .unwrap();

        let gossip = Gossip::builder(address_book.clone(), endpoint.clone())
            .spawn()
            .await
            .unwrap();

        let spawner = ThreadLocalActorSpawner::new();
        let manager_args = (TEST_PROTOCOL_ID.to_vec(), sync_args, endpoint, gossip);
        let (sync_ref, _) = SyncManager::<Manager>::spawn(None, manager_args, spawner)
            .await
            .unwrap();

        Self { args, sync_ref }
    }

    /// Returns the verifying key from this node's arguments, which serves as its node id.
    pub fn node_id(&self) -> NodeId {
        self.args.verifying_key
    }

    /// Asks the sync manager actor to stop, without giving a reason.
    pub fn shutdown(&self) {
        self.sync_ref.stop(None);
    }
}
/// Error type reported by `FailingSyncProtocol` when a session fails without panicking.
#[derive(Debug, Error)]
enum SyncError {
    /// Returned by the `Error` behaviour, and by the `Wait` behaviour once its incoming stream
    /// has ended.
    #[error("unexpected sync failure")]
    UnexpectedFailure,
}
/// Selects how `FailingSyncProtocol::run` fails after it has sent its single message and slept
/// for 200 ms.
#[derive(Debug, Clone)]
enum SyncBehaviour {
    /// The protocol panics.
    Panic,
    /// The protocol returns `SyncError::UnexpectedFailure` right away.
    Error,
    /// The protocol consumes the incoming stream until it ends and only then returns
    /// `SyncError::UnexpectedFailure`.
    Wait,
}
/// Sync protocol used by the tests; every run ends in a failure chosen by `behaviour`.
#[derive(Debug)]
struct FailingSyncProtocol {
    /// The failure mode applied by `run`.
    behaviour: SyncBehaviour,
}
impl Protocol for FailingSyncProtocol {
type Output = ();
type Error = SyncError;
type Message = ();
async fn run(
self,
sink: &mut (impl Sink<Self::Message, Error = impl std::fmt::Debug> + Unpin),
stream: &mut (impl Stream<Item = Result<Self::Message, impl std::fmt::Debug>> + Unpin),
) -> Result<Self::Output, Self::Error> {
let _ = sink.send(()).await;
tokio::time::sleep(Duration::from_millis(200)).await;
match self.behaviour {
SyncBehaviour::Panic => panic!(),
SyncBehaviour::Error => return Err(SyncError::UnexpectedFailure),
SyncBehaviour::Wait => {
while let Some(_) = stream.next().await {}
return Err(SyncError::UnexpectedFailure);
}
}
}
}
/// Arguments from which a `DummySyncManager` is constructed.
#[derive(Clone, Debug)]
struct FailingSyncArgs {
    /// Broadcast sender on which the manager publishes its sync events.
    pub event_tx: broadcast::Sender<FromSync<DummySyncEvent>>,
    /// Failure mode cloned into every protocol instance the manager creates.
    pub behaviour: SyncBehaviour,
}
impl FailingSyncArgs {
    /// Builds the arguments for the given failure `behaviour`.
    ///
    /// A new broadcast channel with capacity 128 is created; its sender is stored in the
    /// returned arguments and its initial receiver is returned alongside them.
    pub fn new(
        behaviour: SyncBehaviour,
    ) -> (Self, broadcast::Receiver<FromSync<DummySyncEvent>>) {
        let (event_tx, event_rx) = broadcast::channel(128);
        let sync_args = Self {
            event_tx,
            behaviour,
        };
        (sync_args, event_rx)
    }
}
/// Events published by `DummySyncManager`.
#[derive(Clone, Debug)]
// Only `SessionCreated` is constructed by the code in this file; the remaining variants are
// never built here, hence the lint is silenced.
#[allow(unused)]
enum DummySyncEvent {
    /// Emitted by the manager every time it is asked to create a session.
    SessionCreated,
    SyncStarted,
    Received(DummySyncMessage),
    SyncFinished,
}
/// Message type of `DummySyncManager` (its `SyncManagerTrait::Message`).
///
/// NOTE(review): neither variant is constructed in the code visible in this file.
#[derive(Clone, Debug, Serialize, Deserialize)]
enum DummySyncMessage {
    Data,
    Close,
}
/// Minimal sync manager for tests, generic over its arguments `C` and protocol `P`.
#[derive(Debug)]
struct DummySyncManager<C, P> {
    /// Sender used to publish events and to hand out new subscriptions.
    pub event_tx: broadcast::Sender<FromSync<DummySyncEvent>>,
    /// Receiver subscribed when the manager is constructed; never read in this file.
    /// NOTE(review): presumably held so the channel always has a live receiver and sending an
    /// event cannot fail - confirm.
    #[allow(unused)]
    pub event_rx: broadcast::Receiver<FromSync<DummySyncEvent>>,
    /// Arguments the manager was constructed from.
    pub args: C,
    /// Ties the protocol type `P` to the manager without storing a value of it.
    pub _marker: PhantomData<P>,
}
impl SyncManagerTrait<Topic> for DummySyncManager<FailingSyncArgs, FailingSyncProtocol> {
type Protocol = FailingSyncProtocol;
type Event = DummySyncEvent;
type Args = FailingSyncArgs;
type Message = DummySyncMessage;
type Error = SendError;
fn from_args(args: Self::Args) -> Self {
let event_rx = args.event_tx.subscribe();
DummySyncManager {
event_tx: args.event_tx.clone(),
event_rx,
args,
_marker: PhantomData,
}
}
async fn session(
&mut self,
session_id: u64,
config: &p2panda_sync::SessionConfig<Topic>,
) -> Self::Protocol {
self.event_tx
.send(FromSync {
session_id,
remote: config.remote,
event: DummySyncEvent::SessionCreated,
})
.unwrap();
FailingSyncProtocol {
behaviour: self.args.behaviour.clone(),
}
}
async fn session_handle(
&self,
_session_id: u64,
) -> Option<std::pin::Pin<Box<dyn Sink<ToSync<Self::Message>, Error = Self::Error>>>> {
let (tx, _) = mpsc::channel::<ToSync<Self::Message>>(128);
let sink = Box::pin(tx) as Pin<Box<dyn Sink<ToSync<Self::Message>, Error = Self::Error>>>;
Some(sink)
}
fn subscribe(&mut self) -> impl Stream<Item = FromSync<Self::Event>> + Send + Unpin + 'static {
let stream = BroadcastStream::new(self.event_tx.subscribe())
.filter_map(|event| async { event.ok() });
Box::pin(stream)
}
}
/// Checks that a second sync session is created after the first one failed.
///
/// For every pairing of failure behaviours, alice (who knows bob's node info) initiates a
/// session with bob. The test then expects two `SessionCreated` events on alice's subscription,
/// with session ids 0 and 1, both naming bob as the remote.
#[tokio::test]
async fn failed_sync_session_retry() {
    setup_logging();

    let topic = [0; 32].into();

    // (alice, bob) failure behaviours exercised by the test.
    let scenarios = [
        (SyncBehaviour::Panic, SyncBehaviour::Wait),
        (SyncBehaviour::Wait, SyncBehaviour::Panic),
        (SyncBehaviour::Error, SyncBehaviour::Wait),
        (SyncBehaviour::Wait, SyncBehaviour::Error),
        (SyncBehaviour::Error, SyncBehaviour::Error),
    ];

    for (alice_behavior, bob_behavior) in scenarios {
        // Bob starts without any known peers; alice is told about bob up front.
        let (bob_sync_config, _bob_rx) = FailingSyncArgs::new(bob_behavior);
        let mut bob = FailingNode::spawn(random(), vec![], bob_sync_config).await;

        let (alice_sync_config, _alice_rx) = FailingSyncArgs::new(alice_behavior);
        let alice_peers = vec![bob.args.node_info()];
        let alice = FailingNode::spawn(random(), alice_peers, alice_sync_config).await;

        let alice_manager_ref =
            call!(alice.sync_ref, ToSyncManager::Create, topic, true).unwrap();
        let alice_handle = SyncHandle::new(topic, alice.sync_ref.clone(), alice_manager_ref);
        let mut alice_subscription = alice_handle.subscribe().await.unwrap();

        // Bob's handle is only kept alive for the duration of this scenario.
        let bob_manager_ref = call!(bob.sync_ref, ToSyncManager::Create, topic, true).unwrap();
        let _bob_handle = SyncHandle::new(topic, bob.sync_ref.clone(), bob_manager_ref);

        alice_handle.initiate_session(bob.node_id());

        // The first session fails; a second one with the next id must follow.
        let expected_remote = bob.node_id();
        for expected_session_id in 0..2 {
            let event = alice_subscription.next().await.unwrap();
            assert!(
                matches!(
                    event,
                    Ok(FromSync {
                        session_id,
                        remote,
                        event: DummySyncEvent::SessionCreated
                    }) if session_id == expected_session_id && remote == expected_remote
                ),
                "{:#?}",
                event
            );
        }

        alice.shutdown();
        bob.shutdown();
    }
}