use crate::common::CurrentNetwork;
use snarkos_node_bft_events::{Event, EventCodec};
use std::{
io,
net::{IpAddr, Ipv4Addr, SocketAddr},
time::Duration,
};
use pea2pea::{
Config,
Connection,
ConnectionSide,
Node,
Pea2Pea,
protocols::{Handshake, OnDisconnect, Reading, Writing},
};
use tokio::{
sync::mpsc::{self, Receiver, Sender},
time::timeout,
};
/// A peer for use in tests: pairs a pea2pea-backed [`InnerNode`] with the receiving half of the
/// channel that the node forwards its inbound events to.
pub struct TestPeer {
/// The networking node; it owns the sending half of the inbound channel.
inner_node: InnerNode,
/// Receiving half of the inbound channel, yielding `(peer address, event)` pairs.
inbound_rx: Receiver<(SocketAddr, Event<CurrentNetwork>)>,
}
/// The network-facing part of a [`TestPeer`]; the pea2pea protocol traits in this file are
/// implemented on this type.
#[derive(Clone)]
struct InnerNode {
/// The underlying pea2pea node.
node: Node,
/// Sending half of the channel used to hand received `(peer address, event)` pairs to the
/// owning `TestPeer`.
inbound_tx: Sender<(SocketAddr, Event<CurrentNetwork>)>,
}
impl TestPeer {
    /// Builds a test peer and starts it listening on the IPv4 loopback address.
    ///
    /// The node is configured with at most 200 connections and requests port `0`, so the
    /// concrete port should be read back through [`TestPeer::listening_addr`]. The handshake,
    /// reading, writing and disconnect protocols are enabled (in that order) before the
    /// listener is started. Inbound events are buffered in a channel of capacity 100.
    ///
    /// # Panics
    ///
    /// Panics if the node fails to start listening.
    pub async fn new() -> Self {
        let (inbound_tx, inbound_rx) = mpsc::channel(100);

        let loopback = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0);
        let config = Config { max_connections: 200, listener_addr: Some(loopback), ..Default::default() };
        let inner_node = InnerNode { node: Node::new(config), inbound_tx };

        // Protocols are enabled before the listener starts accepting connections.
        inner_node.enable_handshake().await;
        inner_node.enable_reading().await;
        inner_node.enable_writing().await;
        inner_node.enable_disconnect().await;
        inner_node.node().start_listening().await.unwrap();

        Self { inner_node, inbound_rx }
    }

    /// Returns the address the underlying node is listening on.
    ///
    /// # Panics
    ///
    /// Panics if the node does not report a listening address.
    pub fn listening_addr(&self) -> SocketAddr {
        let node = self.inner_node.node();
        node.listening_addr().expect("addr should be present")
    }

    /// Connects the underlying node to `target`.
    ///
    /// # Errors
    ///
    /// Returns the I/O error reported by the node if the connection attempt fails.
    pub async fn connect(&self, target: SocketAddr) -> io::Result<()> {
        let outcome = self.inner_node.node().connect(target).await;
        outcome.map(|_| ())
    }

    /// Sends `message` to the peer at `target`.
    ///
    /// The value the inner node's `unicast` yields on success is dropped; only the immediate
    /// success or failure of the call is reported.
    ///
    /// # Errors
    ///
    /// Returns the I/O error reported by the inner node's `unicast`.
    pub fn unicast(&self, target: SocketAddr, message: Event<CurrentNetwork>) -> io::Result<()> {
        self.inner_node.unicast(target, message).map(drop)
    }

    /// Waits for the next inbound event and returns it along with the sender's address.
    ///
    /// # Panics
    ///
    /// Panics if the inbound channel is closed, i.e. every sender has been dropped.
    pub async fn recv(&mut self) -> (SocketAddr, Event<CurrentNetwork>) {
        let Some(message) = self.inbound_rx.recv().await else {
            panic!("all senders dropped!");
        };
        message
    }

    /// Like [`TestPeer::recv`], but gives up once `duration` has elapsed.
    ///
    /// # Panics
    ///
    /// Panics if no event arrives within `duration`, or if the inbound channel is closed.
    pub async fn recv_timeout(&mut self, duration: Duration) -> (SocketAddr, Event<CurrentNetwork>) {
        let Ok(message) = timeout(duration, self.recv()).await else {
            panic!("timed out waiting for message");
        };
        message
    }
}
/// Gives pea2pea access to the node wrapped by [`InnerNode`].
impl Pea2Pea for InnerNode {
/// Returns a reference to the underlying pea2pea node.
fn node(&self) -> &Node {
&self.node
}
}
/// Handshake protocol for the test peer; the implementation exchanges no handshake messages.
impl Handshake for InnerNode {
// Sets the trait's `TIMEOUT_MS` associated constant; the `_MS` suffix suggests milliseconds
// (i.e. 10 seconds) — confirm against the pea2pea `Handshake` documentation.
const TIMEOUT_MS: u64 = 10_000;
/// Accepts every connection immediately by returning it unchanged.
async fn perform_handshake(&self, connection: Connection) -> io::Result<Connection> {
Ok(connection)
}
}
impl Writing for InnerNode {
type Codec = EventCodec<CurrentNetwork>;
type Message = Event<CurrentNetwork>;
fn codec(&self, _peer_addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
Default::default()
}
}
impl Reading for InnerNode {
type Codec = EventCodec<CurrentNetwork>;
type Message = Event<CurrentNetwork>;
fn codec(&self, _peer_addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
Default::default()
}
async fn process_message(&self, peer_addr: SocketAddr, message: Self::Message) -> io::Result<()> {
self.inbound_tx
.send((peer_addr, message))
.await
.map_err(|_| io::Error::other("failed to send message to test peer, all receivers have been dropped"))
}
}
/// Disconnect hook for the test peer.
impl OnDisconnect for InnerNode {
/// Intentionally a no-op: nothing is done when a peer disconnects.
async fn on_disconnect(&self, _peer_addr: SocketAddr) {}
}