1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
use core::fmt::Debug;
use core::marker::PhantomData;
use core::time::Duration;
use core::ops::DerefMut;
use codec::{Encode, Decode};
use libp2p::{identity, NetworkBehaviour, PeerId};
use libp2p::mdns::Mdns;
use libp2p::floodsub::{Floodsub, Topic, TopicBuilder};
use libp2p::kad::Kademlia;
use libp2p::core::swarm::{NetworkBehaviourEventProcess, NetworkBehaviourAction};
use futures::{Async, stream::Stream};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_timer::Interval;
use blockchain::backend::{SharedCommittable, ChainQuery, ImportLock};
use blockchain::import::BlockImporter;
use crate::{SimpleSyncMessage, SimpleSync, NetworkEnvironment, NetworkHandle, NetworkEvent, StatusProducer};

#[derive(NetworkBehaviour)]
#[behaviour(out_event = "(PeerId, SimpleSyncMessage<B, S>)", poll_method = "poll")]
struct Behaviour<TSubstream: AsyncRead + AsyncWrite, B, S> {
	floodsub: Floodsub<TSubstream>,
	kademlia: Kademlia<TSubstream>,
	mdns: Mdns<TSubstream>,

	#[behaviour(ignore)]
	topic: Topic,
	#[behaviour(ignore)]
	events: Vec<(PeerId, SimpleSyncMessage<B, S>)>,
}

impl<TSubstream: AsyncRead + AsyncWrite, B, S> Behaviour<TSubstream, B, S> {
	fn poll<TEv>(&mut self) -> Async<NetworkBehaviourAction<TEv, (PeerId, SimpleSyncMessage<B, S>)>> {
		if !self.events.is_empty() {
			return Async::Ready(NetworkBehaviourAction::GenerateEvent(self.events.remove(0)))
		}

		Async::NotReady
	}
}

impl<TSubstream: AsyncRead + AsyncWrite, B, S> NetworkEnvironment for Behaviour<TSubstream, B, S> {
	type PeerId = PeerId;
	type Message = SimpleSyncMessage<B, S>;
}

impl<TSubstream: AsyncRead + AsyncWrite, B, S> NetworkHandle for Behaviour<TSubstream, B, S>  where
	B: Encode,
	S: Encode,
{
	fn send(&mut self, _peer: &PeerId, message: SimpleSyncMessage<B, S>) {
		self.floodsub.publish(&self.topic, message.encode());
	}

	fn broadcast(&mut self, message: SimpleSyncMessage<B, S>) {
		self.floodsub.publish(&self.topic, message.encode());
	}
}

impl<TSubstream: AsyncRead + AsyncWrite, B, S> NetworkBehaviourEventProcess<libp2p::floodsub::FloodsubEvent> for Behaviour<TSubstream, B, S> where
	B: Encode + Decode + Debug,
	S: Encode + Decode + Debug,
{
	fn inject_event(&mut self, floodsub_message: libp2p::floodsub::FloodsubEvent) {
		if let libp2p::floodsub::FloodsubEvent::Message(floodsub_message) = floodsub_message {
			let message = SimpleSyncMessage::<B, S>::decode(&mut &floodsub_message.data[..]).unwrap();

			self.events.push((floodsub_message.source.clone(), message));
		}
	}
}


impl<TSubstream: AsyncRead + AsyncWrite, B, S> NetworkBehaviourEventProcess<libp2p::kad::KademliaOut> for Behaviour<TSubstream, B, S> {
	fn inject_event(&mut self, message: libp2p::kad::KademliaOut) {
		if let libp2p::kad::KademliaOut::Discovered { peer_id, .. } = message {
			println!("Discovered via Kademlia {:?}", peer_id);
			self.floodsub.add_node_to_partial_view(peer_id);
		}
	}
}

impl<TSubstream: AsyncRead + AsyncWrite, B, S> NetworkBehaviourEventProcess<libp2p::mdns::MdnsEvent> for Behaviour<TSubstream, B, S> {
    fn inject_event(&mut self, event: libp2p::mdns::MdnsEvent) {
        match event {
            libp2p::mdns::MdnsEvent::Discovered(list) => {
                for (peer, _) in list {
                    self.floodsub.add_node_to_partial_view(peer);
                }
            },
            libp2p::mdns::MdnsEvent::Expired(list) => {
                for (peer, _) in list {
                    if !self.mdns.has_node(&peer) {
                        self.floodsub.remove_node_from_partial_view(&peer);
                    }
                }
            }
        }
    }
}

pub fn start_network_simple_sync<Ba, I, St>(
	port: &str,
	backend: Ba,
	import_lock: ImportLock,
	importer: I,
	status: St,
) where
	Ba: SharedCommittable + ChainQuery + Send + Sync + 'static,
	Ba::Block: Debug + Encode + Decode + Send + Sync,
	I: BlockImporter<Block=Ba::Block> + Send + Sync + 'static,
	St: StatusProducer + Send + Sync + 'static,
	St::Status: Debug + Clone + Send + Sync,
{
    // Create a random PeerId
    let local_key = identity::Keypair::generate_ed25519();
    let local_peer_id = PeerId::from(local_key.public());
	println!("Local peer id: {:?}", local_peer_id);

	let transport = libp2p::build_tcp_ws_secio_mplex_yamux(local_key);
	let topic = TopicBuilder::new("blocks").build();

	let mut sync = SimpleSync {
		backend, importer, status, import_lock,
		_marker: PhantomData,
	};

	let mut swarm = {
		let mut behaviour = Behaviour {
			floodsub: Floodsub::new(local_peer_id.clone()),
			kademlia: Kademlia::new(local_peer_id.clone()),
			mdns: libp2p::mdns::Mdns::new().expect("Failed to create mDNS service"),

			topic: topic.clone(),
			events: Vec::new(),
		};

		assert!(behaviour.floodsub.subscribe(topic.clone()));
		libp2p::Swarm::new(transport, behaviour, local_peer_id)
	};

	// Listen on all interfaces and whatever port the OS assigns
	let addr = libp2p::Swarm::listen_on(&mut swarm, format!("/ip4/0.0.0.0/tcp/{}", port).parse().unwrap()).unwrap();
	println!("Listening on {:?}", addr);

	let mut interval = Interval::new_interval(Duration::new(5, 0));
	let mut listening = false;
    tokio::run(futures::future::poll_fn(move || -> Result<_, ()> {
        loop {
            match interval.poll().expect("Error while polling interval") {
                Async::Ready(Some(_)) => {
					sync.on_tick(swarm.deref_mut());
				},
                Async::Ready(None) => panic!("Interval closed"),
                Async::NotReady => break,
            };
        }

        loop {
            match swarm.poll().expect("Error while polling swarm") {
                Async::Ready(Some((peer_id, message))) => {
					println!("Received: {:?} from {:?}", message, peer_id);
					sync.on_message(swarm.deref_mut(), &peer_id, message);
				},
                Async::Ready(None) | Async::NotReady => {
                    if !listening {
                        if let Some(a) = libp2p::Swarm::listeners(&swarm).next() {
                            println!("Listening on {:?}", a);
                            listening = true;
                        }
                    }
                    break
                }
            }
        }

        Ok(Async::NotReady)
	}));
}