use core::fmt::Debug;
use core::marker::PhantomData;
use core::time::Duration;
use core::ops::DerefMut;
use codec::{Encode, Decode};
use libp2p::{identity, NetworkBehaviour, PeerId};
use libp2p::mdns::Mdns;
use libp2p::floodsub::{Floodsub, Topic, TopicBuilder};
use libp2p::kad::Kademlia;
use libp2p::core::swarm::{NetworkBehaviourEventProcess, NetworkBehaviourAction};
use futures::{Async, stream::Stream};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_timer::Interval;
use blockchain::backend::{SharedCommittable, ChainQuery, ImportLock};
use blockchain::import::BlockImporter;
use crate::{SimpleSyncMessage, SimpleSync, NetworkEnvironment, NetworkHandle, NetworkEvent, StatusProducer};
/// Composite libp2p network behaviour used by the simple-sync networking code.
///
/// It bundles floodsub, Kademlia and mDNS, and emits
/// `(PeerId, SimpleSyncMessage<B, S>)` pairs as its out-event through the
/// custom `poll` method named in the `behaviour` attribute below.
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "(PeerId, SimpleSyncMessage<B, S>)", poll_method = "poll")]
struct Behaviour<TSubstream: AsyncRead + AsyncWrite, B, S> {
/// Floodsub instance through which all sync messages are published and received.
floodsub: Floodsub<TSubstream>,
/// Kademlia instance; peers it discovers are added to the floodsub partial view.
kademlia: Kademlia<TSubstream>,
/// mDNS instance; discovered peers are added to, and expired peers removed
/// from, the floodsub partial view.
mdns: Mdns<TSubstream>,
/// Floodsub topic every outgoing message is published on.
#[behaviour(ignore)]
topic: Topic,
/// Decoded incoming messages, paired with their source peer, waiting to be
/// handed out by `poll`.
#[behaviour(ignore)]
events: Vec<(PeerId, SimpleSyncMessage<B, S>)>,
}
impl<TSubstream: AsyncRead + AsyncWrite, B, S> Behaviour<TSubstream, B, S> {
    /// Custom poll hook referenced by `poll_method = "poll"` on the behaviour derive.
    ///
    /// Hands out the oldest buffered `(PeerId, SimpleSyncMessage)` pair as a
    /// `GenerateEvent` action, or reports `NotReady` when nothing is buffered.
    fn poll<TEv>(&mut self) -> Async<NetworkBehaviourAction<TEv, (PeerId, SimpleSyncMessage<B, S>)>> {
        if self.events.is_empty() {
            return Async::NotReady;
        }
        // Index 0 is the oldest entry, so events leave in the order they arrived.
        let oldest = self.events.remove(0);
        Async::Ready(NetworkBehaviourAction::GenerateEvent(oldest))
    }
}
/// Binds the sync layer's abstract peer and message types to their libp2p
/// counterparts.
impl<TSubstream, B, S> NetworkEnvironment for Behaviour<TSubstream, B, S>
where
    TSubstream: AsyncRead + AsyncWrite,
{
    /// Peers are identified by their libp2p `PeerId`.
    type PeerId = PeerId;
    /// Messages exchanged over the network are `SimpleSyncMessage`s.
    type Message = SimpleSyncMessage<B, S>;
}
/// Outgoing-message handle backed by floodsub.
impl<TSubstream, B, S> NetworkHandle for Behaviour<TSubstream, B, S>
where
    TSubstream: AsyncRead + AsyncWrite,
    B: Encode,
    S: Encode,
{
    /// Sends `message`; the target peer is ignored and the message is
    /// published on the shared topic exactly as `broadcast` does.
    fn send(&mut self, _peer: &PeerId, message: SimpleSyncMessage<B, S>) {
        self.broadcast(message);
    }

    /// Encodes `message` and publishes the bytes on the behaviour's topic.
    fn broadcast(&mut self, message: SimpleSyncMessage<B, S>) {
        let payload = message.encode();
        self.floodsub.publish(&self.topic, payload);
    }
}
/// Handles events coming out of the floodsub sub-behaviour.
impl<TSubstream, B, S> NetworkBehaviourEventProcess<libp2p::floodsub::FloodsubEvent> for Behaviour<TSubstream, B, S>
where
    TSubstream: AsyncRead + AsyncWrite,
    B: Encode + Decode + Debug,
    S: Encode + Decode + Debug,
{
    /// Decodes an incoming floodsub message and buffers it, together with its
    /// source peer, for `poll` to hand out. Other floodsub events are ignored.
    ///
    /// The payload comes from a remote peer and is therefore untrusted: a
    /// payload that does not decode is logged and dropped rather than
    /// panicking, so a single malformed message cannot take the node down.
    fn inject_event(&mut self, floodsub_message: libp2p::floodsub::FloodsubEvent) {
        if let libp2p::floodsub::FloodsubEvent::Message(floodsub_message) = floodsub_message {
            let decoded = SimpleSyncMessage::<B, S>::decode(&mut &floodsub_message.data[..]);
            // `into_iter().next()` yields `Some(message)` on success and `None`
            // on failure regardless of whether this codec version's `decode`
            // returns an `Option` or a `Result`.
            match decoded.into_iter().next() {
                Some(message) => self.events.push((floodsub_message.source, message)),
                None => println!(
                    "Dropping undecodable message from {:?}",
                    floodsub_message.source
                ),
            }
        }
    }
}
/// Handles events coming out of the Kademlia sub-behaviour.
impl<TSubstream, B, S> NetworkBehaviourEventProcess<libp2p::kad::KademliaOut> for Behaviour<TSubstream, B, S>
where
    TSubstream: AsyncRead + AsyncWrite,
{
    /// Logs every peer Kademlia discovers and adds it to the floodsub partial
    /// view; all other Kademlia events are ignored.
    fn inject_event(&mut self, message: libp2p::kad::KademliaOut) {
        match message {
            libp2p::kad::KademliaOut::Discovered { peer_id, .. } => {
                println!("Discovered via Kademlia {:?}", peer_id);
                self.floodsub.add_node_to_partial_view(peer_id);
            }
            _ => {}
        }
    }
}
/// Handles events coming out of the mDNS sub-behaviour.
impl<TSubstream, B, S> NetworkBehaviourEventProcess<libp2p::mdns::MdnsEvent> for Behaviour<TSubstream, B, S>
where
    TSubstream: AsyncRead + AsyncWrite,
{
    /// Keeps the floodsub partial view in step with mDNS: discovered peers are
    /// added, and expired peers are removed once mDNS no longer reports them.
    fn inject_event(&mut self, event: libp2p::mdns::MdnsEvent) {
        use libp2p::mdns::MdnsEvent;

        match event {
            MdnsEvent::Discovered(discovered) => {
                for (peer_id, _) in discovered {
                    self.floodsub.add_node_to_partial_view(peer_id);
                }
            }
            MdnsEvent::Expired(expired) => {
                for (peer_id, _) in expired {
                    // Leave the peer in the view while mDNS still knows about it.
                    if self.mdns.has_node(&peer_id) {
                        continue;
                    }
                    self.floodsub.remove_node_from_partial_view(&peer_id);
                }
            }
        }
    }
}
pub fn start_network_simple_sync<Ba, I, St>(
port: &str,
backend: Ba,
import_lock: ImportLock,
importer: I,
status: St,
) where
Ba: SharedCommittable + ChainQuery + Send + Sync + 'static,
Ba::Block: Debug + Encode + Decode + Send + Sync,
I: BlockImporter<Block=Ba::Block> + Send + Sync + 'static,
St: StatusProducer + Send + Sync + 'static,
St::Status: Debug + Clone + Send + Sync,
{
let local_key = identity::Keypair::generate_ed25519();
let local_peer_id = PeerId::from(local_key.public());
println!("Local peer id: {:?}", local_peer_id);
let transport = libp2p::build_tcp_ws_secio_mplex_yamux(local_key);
let topic = TopicBuilder::new("blocks").build();
let mut sync = SimpleSync {
backend, importer, status, import_lock,
_marker: PhantomData,
};
let mut swarm = {
let mut behaviour = Behaviour {
floodsub: Floodsub::new(local_peer_id.clone()),
kademlia: Kademlia::new(local_peer_id.clone()),
mdns: libp2p::mdns::Mdns::new().expect("Failed to create mDNS service"),
topic: topic.clone(),
events: Vec::new(),
};
assert!(behaviour.floodsub.subscribe(topic.clone()));
libp2p::Swarm::new(transport, behaviour, local_peer_id)
};
let addr = libp2p::Swarm::listen_on(&mut swarm, format!("/ip4/0.0.0.0/tcp/{}", port).parse().unwrap()).unwrap();
println!("Listening on {:?}", addr);
let mut interval = Interval::new_interval(Duration::new(5, 0));
let mut listening = false;
tokio::run(futures::future::poll_fn(move || -> Result<_, ()> {
loop {
match interval.poll().expect("Error while polling interval") {
Async::Ready(Some(_)) => {
sync.on_tick(swarm.deref_mut());
},
Async::Ready(None) => panic!("Interval closed"),
Async::NotReady => break,
};
}
loop {
match swarm.poll().expect("Error while polling swarm") {
Async::Ready(Some((peer_id, message))) => {
println!("Received: {:?} from {:?}", message, peer_id);
sync.on_message(swarm.deref_mut(), &peer_id, message);
},
Async::Ready(None) | Async::NotReady => {
if !listening {
if let Some(a) = libp2p::Swarm::listeners(&swarm).next() {
println!("Listening on {:?}", a);
listening = true;
}
}
break
}
}
}
Ok(Async::NotReady)
}));
}