eigen_trust/
node.rs

1//! The module for the node setup, running the main loop, and handling network
2//! events.
3
4use crate::{
5	epoch::Epoch,
6	peer::Peer,
7	protocol::{EigenTrustCodec, EigenTrustProtocol, Request, Response},
8	EigenError,
9};
10use futures::StreamExt;
11use libp2p::{
12	core::upgrade::Version,
13	identity::Keypair,
14	noise::{Keypair as NoiseKeypair, NoiseConfig, X25519Spec},
15	request_response::{
16		ProtocolSupport, RequestResponse, RequestResponseConfig, RequestResponseEvent,
17		RequestResponseMessage,
18	},
19	swarm::{ConnectionHandlerUpgrErr, ConnectionLimits, Swarm, SwarmBuilder, SwarmEvent},
20	tcp::TcpConfig,
21	yamux::YamuxConfig,
22	Multiaddr, PeerId, Transport,
23};
24use std::{io::Error as IoError, iter::once, marker::PhantomData};
25use tokio::{
26	select,
27	time::{self, Duration, Instant},
28};
29
30/// Node configuration crate.
31pub trait NodeConfig {
32	/// The number of neighbors the peer can have.
33	/// This is also the maximum number of peers that can be connected to the
34	/// node.
35	const NUM_CONNECTIONS: usize;
36	/// Duration of the Epoch.
37	const INTERVAL: u64;
38	/// Weight of the pre-trusted score.
39	const PRE_TRUST_WEIGHT: f64;
40}
41
42/// The Node struct.
43pub struct Node<C: NodeConfig> {
44	/// Swarm object.
45	swarm: Swarm<RequestResponse<EigenTrustCodec>>,
46	/// Peer managed by the node.
47	peer: Peer,
48	/// Local keypair.
49	local_key: Keypair,
50	/// Bootstrap nodes.
51	bootstrap_nodes: Vec<(PeerId, Multiaddr, f64)>,
52	_config: PhantomData<C>,
53}
54
55impl<C: NodeConfig> Node<C> {
56	/// Create a new node, given the local keypair, local address, and bootstrap
57	/// nodes.
58	pub fn new(
59		local_key: Keypair,
60		local_address: Multiaddr,
61		bootstrap_nodes: Vec<(PeerId, Multiaddr, f64)>,
62	) -> Result<Self, EigenError> {
63		let noise_keys = NoiseKeypair::<X25519Spec>::new()
64			.into_authentic(&local_key)
65			.map_err(|e| {
66				log::error!("NoiseKeypair.into_authentic {}", e);
67				EigenError::InvalidKeypair
68			})?;
69
70		// 30 years in seconds
71		// Basically, we want connections to be open for a long time.
72		let connection_duration = Duration::from_secs(86400 * 365 * 30);
73		let transport = TcpConfig::new()
74			.nodelay(true)
75			.upgrade(Version::V1)
76			.authenticate(NoiseConfig::xx(noise_keys).into_authenticated())
77			.multiplex(YamuxConfig::default())
78			.timeout(connection_duration)
79			.boxed();
80
81		// Setting up the request/response protocol.
82		let protocols = once((EigenTrustProtocol::new(), ProtocolSupport::Full));
83		let mut cfg = RequestResponseConfig::default();
84		// Keep the connection alive in request/response protocol
85		cfg.set_connection_keep_alive(connection_duration);
86		// If we failed to get response during the interval duration, cancel it.
87		cfg.set_request_timeout(Duration::from_secs(C::INTERVAL));
88		let req_proto = RequestResponse::new(EigenTrustCodec, protocols, cfg);
89
90		// Setting up the transport and swarm.
91		let local_peer_id = PeerId::from(local_key.public());
92		// Limit the number of connections to be same as the number of neighbors in the
93		// config.
94		let num_connections =
95			u32::try_from(C::NUM_CONNECTIONS).map_err(|_| EigenError::InvalidNumNeighbours)?;
96		let connection_limits =
97			ConnectionLimits::default().with_max_established_per_peer(Some(num_connections));
98
99		let mut swarm = SwarmBuilder::new(transport, req_proto, local_peer_id)
100			.connection_limits(connection_limits)
101			.build();
102
103		swarm.listen_on(local_address).map_err(|e| {
104			log::debug!("swarm.listen_on {:?}", e);
105			EigenError::ListenFailed
106		})?;
107
108		// Init the peer struct and give it a pre trust score, if we are a bootstrap
109		// node.
110		let pre_trust_score = bootstrap_nodes
111			.iter()
112			.find(|x| x.0 == local_peer_id)
113			.map(|node| node.2)
114			.unwrap_or(0.0);
115		let peer = Peer::new(C::NUM_CONNECTIONS, pre_trust_score, C::PRE_TRUST_WEIGHT);
116
117		Ok(Self {
118			swarm,
119			peer,
120			local_key,
121			bootstrap_nodes,
122			_config: PhantomData,
123		})
124	}
125
126	/// Get the peer struct.
127	pub fn get_peer(&self) -> &Peer {
128		&self.peer
129	}
130
131	/// Get the mutable peer struct.
132	pub fn get_peer_mut(&mut self) -> &mut Peer {
133		&mut self.peer
134	}
135
136	/// Get the mutable swarm.
137	pub fn get_swarm_mut(&mut self) -> &mut Swarm<RequestResponse<EigenTrustCodec>> {
138		&mut self.swarm
139	}
140
141	/// Method for handling the request/response events.
142	pub fn handle_req_res_events(&mut self, event: RequestResponseEvent<Request, Response>) {
143		use RequestResponseEvent::*;
144		use RequestResponseMessage::{Request as Req, Response as Res};
145		match event {
146			Message {
147				peer,
148				message: Req {
149					request, channel, ..
150				},
151			} => {
152				let beh = self.swarm.behaviour_mut();
153				// First we calculate the local opinions for the requested epoch.
154				self.peer.calculate_local_opinions(request.get_epoch());
155				// Then we send the local opinion to the peer.
156				let opinion = self.peer.get_local_opinion(&(peer, request.get_epoch()));
157				let response = Response::Success(opinion);
158				let res = beh.send_response(channel, response);
159				if let Err(e) = res {
160					log::error!("Failed to send the response {:?}", e);
161				}
162			},
163			Message {
164				peer,
165				message: Res { response, .. },
166			} => {
167				// If we receive a response, we update the neighbors's opinion about us.
168				// TODO: Check the validity of the opinion, by verifying a zero-knowledge proof.
169				if let Response::Success(opinion) = response {
170					self.peer
171						.cache_neighbor_opinion((peer, opinion.get_epoch()), opinion);
172				} else {
173					log::error!("Received error response {:?}", response);
174				}
175			},
176			OutboundFailure {
177				peer, request_id, ..
178			} => {
179				log::error!("Outbound failure {:?} from {:?}", request_id, peer);
180			},
181			InboundFailure {
182				peer, request_id, ..
183			} => {
184				log::error!("Inbound failure {:?} from {:?}", request_id, peer);
185			},
186			ResponseSent { peer, request_id } => {
187				log::debug!("Response sent {:?} to {:?}", request_id, peer);
188			},
189		};
190	}
191
192	/// A method for handling the swarm events.
193	pub fn handle_swarm_events(
194		&mut self,
195		event: SwarmEvent<
196			RequestResponseEvent<Request, Response>,
197			ConnectionHandlerUpgrErr<IoError>,
198		>,
199	) {
200		match event {
201			SwarmEvent::NewListenAddr { address, .. } => log::info!("Listening on {:?}", address),
202			// Handle request/response events.
203			SwarmEvent::Behaviour(req_res_event) => self.handle_req_res_events(req_res_event),
204			// When we connect to a peer, we automatically add him as a neighbor.
205			SwarmEvent::ConnectionEstablished { peer_id, .. } => {
206				let res = self.peer.add_neighbor(peer_id);
207				if let Err(e) = res {
208					log::error!("Failed to add neighbor {:?}", e);
209				}
210				log::info!("Connection established with {:?}", peer_id);
211			},
212			// When we disconnect from a peer, we automatically remove him from the neighbors list.
213			SwarmEvent::ConnectionClosed { peer_id, cause, .. } => {
214				self.peer.remove_neighbor(peer_id);
215				log::info!("Connection closed with {:?} ({:?})", peer_id, cause);
216			},
217			SwarmEvent::Dialing(peer_id) => log::info!("Dialing {:?}", peer_id),
218			e => log::debug!("{:?}", e),
219		}
220	}
221
222	/// Dial the neighbor directly.
223	pub fn dial_neighbor(&mut self, addr: Multiaddr) {
224		let res = self.swarm.dial(addr).map_err(|_| EigenError::DialError);
225		log::debug!("swarm.dial {:?}", res);
226	}
227
228	/// Dial pre-configured bootstrap nodes.
229	pub fn dial_bootstrap_nodes(&mut self) {
230		// We want to connect to all bootstrap nodes.
231		let local_peer_id = self.local_key.public().to_peer_id();
232		for (peer_id, peer_addr, _) in self.bootstrap_nodes.iter_mut() {
233			if peer_id == &local_peer_id {
234				continue;
235			}
236
237			let res = self
238				.swarm
239				.dial(peer_addr.clone())
240				.map_err(|_| EigenError::DialError);
241			log::debug!("swarm.dial {:?}", res);
242		}
243	}
244
245	/// Send the request for an opinion to all neighbors, in the passed epoch.
246	pub fn send_epoch_requests(&mut self, epoch: Epoch) {
247		for peer_id in self.peer.neighbors() {
248			let beh = self.swarm.behaviour_mut();
249
250			let request = Request::new(epoch);
251			beh.send_request(&peer_id, request);
252		}
253	}
254
255	/// Start the main loop of the program. This function has two main tasks:
256	/// - To start an interval timer for sending the request for opinions.
257	/// - To handle the swarm + request/response events.
258	/// The amount of intervals/epochs is determined by the `interval_limit`
259	/// parameter.
260	pub async fn main_loop(mut self, interval_limit: Option<u32>) -> Result<(), EigenError> {
261		self.dial_bootstrap_nodes();
262
263		let now = Instant::now();
264		let secs_until_next_epoch = Epoch::secs_until_next_epoch(C::INTERVAL)?;
265		// Figure out when the next epoch will start.
266		let start = now + Duration::from_secs(secs_until_next_epoch);
267		let period = Duration::from_secs(C::INTERVAL);
268
269		// Setup the interval timer.
270		let mut interval = time::interval_at(start, period);
271
272		// Count the number of epochs passed
273		let mut count = 0;
274
275		loop {
276			select! {
277				biased;
278				// The interval timer tick. This is where we request opinions from the neighbors.
279				_ = interval.tick() => {
280					let current_epoch = Epoch::current_epoch(C::INTERVAL)?;
281
282					// Log out the global trust score for the previous epoch.
283					let score = self.peer.calculate_global_trust_score(current_epoch.previous());
284					log::info!("{:?} finished, score: {}", current_epoch.previous(), score);
285
286					// Send the request for opinions to all neighbors.
287					self.send_epoch_requests(current_epoch);
288
289					// Increment the epoch counter, break out of the loop if we reached the limit
290					if let Some(num) = interval_limit {
291						count += 1;
292						if count >= num {
293							break;
294						}
295					}
296				},
297				// The swarm event.
298				event = self.swarm.select_next_some() => self.handle_swarm_events(event),
299			}
300		}
301
302		Ok(())
303	}
304}
305
306#[cfg(test)]
307mod tests {
308	use super::*;
309	use std::str::FromStr;
310
311	struct TestConfig;
312	impl NodeConfig for TestConfig {
313		const INTERVAL: u64 = 10;
314		const NUM_CONNECTIONS: usize = 1;
315		const PRE_TRUST_WEIGHT: f64 = 0.5;
316	}
317
318	const PRE_TRUST_SCORE: f64 = 0.5;
319
320	#[tokio::test]
321	async fn should_emit_connection_event_on_bootstrap() {
322		const ADDR_1: &str = "/ip4/127.0.0.1/tcp/56706";
323		const ADDR_2: &str = "/ip4/127.0.0.1/tcp/58601";
324
325		let local_key1 = Keypair::generate_ed25519();
326		let peer_id1 = local_key1.public().to_peer_id();
327
328		let local_key2 = Keypair::generate_ed25519();
329		let peer_id2 = local_key2.public().to_peer_id();
330
331		let local_address1 = Multiaddr::from_str(ADDR_1).unwrap();
332		let local_address2 = Multiaddr::from_str(ADDR_2).unwrap();
333
334		let bootstrap_nodes = vec![
335			(peer_id1, local_address1.clone(), PRE_TRUST_SCORE),
336			(peer_id2, local_address2.clone(), PRE_TRUST_SCORE),
337		];
338
339		let mut node1 =
340			Node::<TestConfig>::new(local_key1, local_address1.clone(), bootstrap_nodes.clone())
341				.unwrap();
342		let mut node2 =
343			Node::<TestConfig>::new(local_key2, local_address2, bootstrap_nodes).unwrap();
344
345		node1.dial_bootstrap_nodes();
346
347		// For node 2
348		// 1. New listen addr
349		// 2. Incoming connection
350		// 3. Connection established
351		// For node 1
352		// 1. New listen addr
353		// 2. Connection established
354		for _ in 0..5 {
355			select! {
356				event2 = node2.get_swarm_mut().select_next_some() => {
357					if let SwarmEvent::ConnectionEstablished { peer_id, .. } = event2 {
358						assert_eq!(peer_id, peer_id1);
359					}
360				},
361				event1 = node1.get_swarm_mut().select_next_some() => {
362					if let SwarmEvent::ConnectionEstablished { peer_id, .. } = event1 {
363						assert_eq!(peer_id, peer_id2);
364					}
365				},
366
367			}
368		}
369	}
370
371	#[tokio::test]
372	async fn should_add_neighbors_on_bootstrap() {
373		const ADDR_1: &str = "/ip4/127.0.0.1/tcp/56707";
374		const ADDR_2: &str = "/ip4/127.0.0.1/tcp/58602";
375
376		let local_key1 = Keypair::generate_ed25519();
377		let peer_id1 = local_key1.public().to_peer_id();
378
379		let local_key2 = Keypair::generate_ed25519();
380		let peer_id2 = local_key2.public().to_peer_id();
381
382		let local_address1 = Multiaddr::from_str(ADDR_1).unwrap();
383		let local_address2 = Multiaddr::from_str(ADDR_2).unwrap();
384
385		let bootstrap_nodes = vec![
386			(peer_id1, local_address1.clone(), PRE_TRUST_SCORE),
387			(peer_id2, local_address2.clone(), PRE_TRUST_SCORE),
388		];
389
390		let mut node1 =
391			Node::<TestConfig>::new(local_key1, local_address1, bootstrap_nodes.clone()).unwrap();
392		let mut node2 =
393			Node::<TestConfig>::new(local_key2, local_address2, bootstrap_nodes).unwrap();
394
395		node1.dial_bootstrap_nodes();
396
397		// For node 2
398		// 1. New listen addr
399		// 2. Incoming connection
400		// 3. Connection established
401		// For node 1
402		// 1. New listen addr
403		// 2. Connection established
404		for _ in 0..5 {
405			select! {
406				event2 = node2.get_swarm_mut().select_next_some() => node2.handle_swarm_events(event2),
407				event1 = node1.get_swarm_mut().select_next_some() => node1.handle_swarm_events(event1),
408
409			}
410		}
411
412		let neighbors1: Vec<PeerId> = node1.get_peer().neighbors();
413		let neighbors2: Vec<PeerId> = node2.get_peer().neighbors();
414		let expected_neighbor1 = vec![peer_id2];
415		let expected_neighbor2 = vec![peer_id1];
416		assert_eq!(neighbors1, expected_neighbor1);
417		assert_eq!(neighbors2, expected_neighbor2);
418
419		// Disconnect from peer
420		node2.get_swarm_mut().disconnect_peer_id(peer_id1).unwrap();
421
422		// Two disconnect events
423		for _ in 0..2 {
424			select! {
425				event2 = node2.get_swarm_mut().select_next_some() => node2.handle_swarm_events(event2),
426				event1 = node1.get_swarm_mut().select_next_some() => node1.handle_swarm_events(event1),
427
428			}
429		}
430
431		let neighbors2: Vec<PeerId> = node2.get_peer().neighbors();
432		let neighbors1: Vec<PeerId> = node1.get_peer().neighbors();
433		assert!(neighbors2.is_empty());
434		assert!(neighbors1.is_empty());
435	}
436
437	#[tokio::test]
438	async fn should_add_neighbors_on_dial() {
439		const ADDR_1: &str = "/ip4/127.0.0.1/tcp/56717";
440		const ADDR_2: &str = "/ip4/127.0.0.1/tcp/58622";
441
442		let local_key1 = Keypair::generate_ed25519();
443		let peer_id1 = local_key1.public().to_peer_id();
444
445		let local_key2 = Keypair::generate_ed25519();
446		let peer_id2 = local_key2.public().to_peer_id();
447
448		let local_address1 = Multiaddr::from_str(ADDR_1).unwrap();
449		let local_address2 = Multiaddr::from_str(ADDR_2).unwrap();
450
451		let mut node1 = Node::<TestConfig>::new(local_key1, local_address1, Vec::new()).unwrap();
452		let mut node2 =
453			Node::<TestConfig>::new(local_key2, local_address2.clone(), Vec::new()).unwrap();
454
455		node1.dial_neighbor(local_address2);
456
457		// For node 2
458		// 1. New listen addr
459		// 2. Incoming connection
460		// 3. Connection established
461		// For node 1
462		// 1. New listen addr
463		// 2. Connection established
464		for _ in 0..5 {
465			select! {
466				event2 = node2.get_swarm_mut().select_next_some() => node2.handle_swarm_events(event2),
467				event1 = node1.get_swarm_mut().select_next_some() => node1.handle_swarm_events(event1),
468
469			}
470		}
471
472		let neighbors1: Vec<PeerId> = node1.get_peer().neighbors();
473		let neighbors2: Vec<PeerId> = node2.get_peer().neighbors();
474		let expected_neighbor1 = vec![peer_id2];
475		let expected_neighbor2 = vec![peer_id1];
476		assert_eq!(neighbors1, expected_neighbor1);
477		assert_eq!(neighbors2, expected_neighbor2);
478
479		// Disconnect from peer
480		node2.get_swarm_mut().disconnect_peer_id(peer_id1).unwrap();
481
482		// Two disconnect events
483		for _ in 0..2 {
484			select! {
485				event2 = node2.get_swarm_mut().select_next_some() => node2.handle_swarm_events(event2),
486				event1 = node1.get_swarm_mut().select_next_some() => node1.handle_swarm_events(event1),
487
488			}
489		}
490
491		let neighbors2: Vec<PeerId> = node2.get_peer().neighbors();
492		let neighbors1: Vec<PeerId> = node1.get_peer().neighbors();
493		assert!(neighbors2.is_empty());
494		assert!(neighbors1.is_empty());
495	}
496
497	#[tokio::test]
498	async fn should_handle_request_for_opinion() {
499		const ADDR_1: &str = "/ip4/127.0.0.1/tcp/56708";
500		const ADDR_2: &str = "/ip4/127.0.0.1/tcp/58603";
501
502		let local_key1 = Keypair::generate_ed25519();
503		let peer_id1 = local_key1.public().to_peer_id();
504
505		let local_key2 = Keypair::generate_ed25519();
506		let peer_id2 = local_key2.public().to_peer_id();
507
508		let local_address1 = Multiaddr::from_str(ADDR_1).unwrap();
509		let local_address2 = Multiaddr::from_str(ADDR_2).unwrap();
510
511		let bootstrap_nodes = vec![
512			(peer_id1, local_address1.clone(), PRE_TRUST_SCORE),
513			(peer_id2, local_address2.clone(), PRE_TRUST_SCORE),
514		];
515
516		let mut node1 =
517			Node::<TestConfig>::new(local_key1, local_address1, bootstrap_nodes.clone()).unwrap();
518		let mut node2 =
519			Node::<TestConfig>::new(local_key2, local_address2, bootstrap_nodes).unwrap();
520
521		node1.dial_bootstrap_nodes();
522
523		// For node 2
524		// 1. New listen addr
525		// 2. Incoming connection
526		// 3. Connection established
527		// For node 1
528		// 1. New listen addr
529		// 2. Connection established
530		for _ in 0..5 {
531			select! {
532				event2 = node2.get_swarm_mut().select_next_some() => node2.handle_swarm_events(event2),
533				event1 = node1.get_swarm_mut().select_next_some() => node1.handle_swarm_events(event1),
534			}
535		}
536
537		let peer1 = node1.get_peer_mut();
538		let peer2 = node2.get_peer_mut();
539
540		let current_epoch = Epoch(0);
541		let next_epoch = current_epoch.next();
542
543		peer1.set_score(peer_id2, 5);
544		peer2.set_score(peer_id1, 5);
545
546		peer1.calculate_local_opinions(current_epoch);
547		peer2.calculate_local_opinions(current_epoch);
548
549		node1.send_epoch_requests(next_epoch);
550		node2.send_epoch_requests(next_epoch);
551
552		// Expecting 2 request messages
553		// Expecting 2 response sent messages
554		// Expecting 2 response received messages
555		// Total of 6 messages
556		for _ in 0..6 {
557			select! {
558				event1 = node1.get_swarm_mut().select_next_some() => {
559					node1.handle_swarm_events(event1);
560				},
561				event2 = node2.get_swarm_mut().select_next_some() => {
562					node2.handle_swarm_events(event2);
563				},
564			}
565		}
566
567		let peer1 = node1.get_peer();
568		let peer2 = node2.get_peer();
569		let peer1_neighbor_opinion = peer1.get_neighbor_opinion(&(peer_id2, next_epoch));
570		let peer2_neighbor_opinion = peer2.get_neighbor_opinion(&(peer_id1, next_epoch));
571
572		assert_eq!(peer1_neighbor_opinion.get_epoch(), next_epoch);
573		assert_eq!(peer1_neighbor_opinion.get_local_trust_score(), 1.0);
574		assert_eq!(peer1_neighbor_opinion.get_global_trust_score(), 0.25);
575		assert_eq!(peer1_neighbor_opinion.get_product(), 0.25);
576
577		assert_eq!(peer2_neighbor_opinion.get_epoch(), next_epoch);
578		assert_eq!(peer2_neighbor_opinion.get_local_trust_score(), 1.0);
579		assert_eq!(peer2_neighbor_opinion.get_global_trust_score(), 0.25);
580		assert_eq!(peer2_neighbor_opinion.get_product(), 0.25);
581
582		let peer1_global_score = peer1.calculate_global_trust_score(next_epoch);
583		let peer2_global_score = peer1.calculate_global_trust_score(next_epoch);
584
585		let peer_gs = (1. - TestConfig::PRE_TRUST_WEIGHT) * 0.25
586			+ TestConfig::PRE_TRUST_WEIGHT * PRE_TRUST_SCORE;
587		assert_eq!(peer1_global_score, peer_gs);
588		assert_eq!(peer2_global_score, peer_gs);
589	}
590
591	#[tokio::test]
592	async fn should_run_main_loop() {
593		const ADDR_1: &str = "/ip4/127.0.0.1/tcp/56728";
594		const ADDR_2: &str = "/ip4/127.0.0.1/tcp/58623";
595
596		let local_key1 = Keypair::generate_ed25519();
597		let peer_id1 = local_key1.public().to_peer_id();
598
599		let local_key2 = Keypair::generate_ed25519();
600		let peer_id2 = local_key2.public().to_peer_id();
601
602		let local_address1 = Multiaddr::from_str(ADDR_1).unwrap();
603		let local_address2 = Multiaddr::from_str(ADDR_2).unwrap();
604
605		let bootstrap_nodes = vec![
606			(peer_id1, local_address1.clone(), PRE_TRUST_SCORE),
607			(peer_id2, local_address2.clone(), PRE_TRUST_SCORE),
608		];
609
610		let mut node1 =
611			Node::<TestConfig>::new(local_key1, local_address1, bootstrap_nodes.clone()).unwrap();
612		let node2 = Node::<TestConfig>::new(local_key2, local_address2, bootstrap_nodes).unwrap();
613
614		node1.dial_bootstrap_nodes();
615
616		let join1 = tokio::spawn(async move { node1.main_loop(Some(1)).await });
617
618		let join2 = tokio::spawn(async move { node2.main_loop(Some(1)).await });
619
620		let (res1, res2) = tokio::join!(join1, join2);
621		res1.unwrap().unwrap();
622		res2.unwrap().unwrap();
623	}
624}