use {ConnectedChannel};
use bip_peer::{PeerManagerBuilder, PeerInfo, IPeerManagerMessage, OPeerManagerMessage};
use bip_peer::protocols::{NullProtocol};
use bip_peer::messages::PeerWireProtocolMessage;
use bip_handshake::Extensions;
use bip_util::bt;
use futures::{future, Future, AsyncSink};
use futures::sink::Sink;
use futures::stream::Stream;
use tokio_core::reactor::Core;
#[test]
fn positive_peer_manager_send_backpressure() {
let mut core = Core::new().unwrap();
let manager = PeerManagerBuilder::new()
.with_peer_capacity(1)
.build(core.handle());
let (peer_one, peer_two): (ConnectedChannel<PeerWireProtocolMessage<NullProtocol>, PeerWireProtocolMessage<NullProtocol>>,
ConnectedChannel<PeerWireProtocolMessage<NullProtocol>, PeerWireProtocolMessage<NullProtocol>>) = ::connected_channel(5);
let peer_one_info = PeerInfo::new("127.0.0.1:0".parse().unwrap(), [0u8; bt::PEER_ID_LEN].into(), [0u8; bt::INFO_HASH_LEN].into(), Extensions::new());
let peer_two_info = PeerInfo::new("127.0.0.1:1".parse().unwrap(), [1u8; bt::PEER_ID_LEN].into(), [1u8; bt::INFO_HASH_LEN].into(), Extensions::new());
let manager = core.run(manager.send(IPeerManagerMessage::AddPeer(peer_one_info, peer_one))).unwrap();
let (response, mut manager) = core.run(manager.into_future().map(|(opt_item, stream)| (opt_item.unwrap(), stream)).map_err(|_| ())).unwrap();
match response {
OPeerManagerMessage::PeerAdded(info) => assert_eq!(peer_one_info, info),
_ => panic!("Unexpected First Peer Manager Response")
};
let (response, manager) = core.run(future::lazy(|| {
future::ok::<_, ()>((manager.start_send(IPeerManagerMessage::AddPeer(peer_two_info, peer_two)), manager))
})).unwrap();
let peer_two = match response {
Ok(AsyncSink::NotReady(IPeerManagerMessage::AddPeer(info, peer_two))) => { assert_eq!(peer_two_info, info); peer_two },
_ => panic!("Unexpected Second Peer Manager Response")
};
let manager = core.run(manager.send(IPeerManagerMessage::RemovePeer(peer_one_info))).unwrap();
let (response, manager) = core.run(manager.into_future().map(|(opt_item, stream)| (opt_item.unwrap(), stream)).map_err(|_| ())).unwrap();
match response {
OPeerManagerMessage::PeerRemoved(info) => assert_eq!(peer_one_info, info),
_ => panic!("Unexpected Third Peer Manager Response")
};
let manager = core.run(manager.send(IPeerManagerMessage::AddPeer(peer_two_info, peer_two))).unwrap();
let (response, _manager) = core.run(manager.into_future().map(|(opt_item, stream)| (opt_item.unwrap(), stream)).map_err(|_| ())).unwrap();
match response {
OPeerManagerMessage::PeerAdded(info) => assert_eq!(peer_two_info, info),
_ => panic!("Unexpected Fourth Peer Manager Response")
};
}