1use crate::{
5 epoch::Epoch,
6 peer::Peer,
7 protocol::{EigenTrustCodec, EigenTrustProtocol, Request, Response},
8 EigenError,
9};
10use futures::StreamExt;
11use libp2p::{
12 core::upgrade::Version,
13 identity::Keypair,
14 noise::{Keypair as NoiseKeypair, NoiseConfig, X25519Spec},
15 request_response::{
16 ProtocolSupport, RequestResponse, RequestResponseConfig, RequestResponseEvent,
17 RequestResponseMessage,
18 },
19 swarm::{ConnectionHandlerUpgrErr, ConnectionLimits, Swarm, SwarmBuilder, SwarmEvent},
20 tcp::TcpConfig,
21 yamux::YamuxConfig,
22 Multiaddr, PeerId, Transport,
23};
24use std::{io::Error as IoError, iter::once, marker::PhantomData};
25use tokio::{
26 select,
27 time::{self, Duration, Instant},
28};
29
/// Compile-time settings that parameterise a [`Node`].
pub trait NodeConfig {
    /// First argument of `Peer::new` (presumably the neighbour capacity - confirm against
    /// `Peer`). `Node::new` also converts it to `u32` and uses it as the limit on established
    /// connections per remote peer.
    const NUM_CONNECTIONS: usize;
    /// Number of seconds used both as the request timeout and as the tick period of the
    /// main loop.
    const INTERVAL: u64;
    /// Weight forwarded to `Peer::new` together with the node's pre-trust score.
    const PRE_TRUST_WEIGHT: f64;
}
41
42pub struct Node<C: NodeConfig> {
44 swarm: Swarm<RequestResponse<EigenTrustCodec>>,
46 peer: Peer,
48 local_key: Keypair,
50 bootstrap_nodes: Vec<(PeerId, Multiaddr, f64)>,
52 _config: PhantomData<C>,
53}
54
55impl<C: NodeConfig> Node<C> {
56 pub fn new(
59 local_key: Keypair,
60 local_address: Multiaddr,
61 bootstrap_nodes: Vec<(PeerId, Multiaddr, f64)>,
62 ) -> Result<Self, EigenError> {
63 let noise_keys = NoiseKeypair::<X25519Spec>::new()
64 .into_authentic(&local_key)
65 .map_err(|e| {
66 log::error!("NoiseKeypair.into_authentic {}", e);
67 EigenError::InvalidKeypair
68 })?;
69
70 let connection_duration = Duration::from_secs(86400 * 365 * 30);
73 let transport = TcpConfig::new()
74 .nodelay(true)
75 .upgrade(Version::V1)
76 .authenticate(NoiseConfig::xx(noise_keys).into_authenticated())
77 .multiplex(YamuxConfig::default())
78 .timeout(connection_duration)
79 .boxed();
80
81 let protocols = once((EigenTrustProtocol::new(), ProtocolSupport::Full));
83 let mut cfg = RequestResponseConfig::default();
84 cfg.set_connection_keep_alive(connection_duration);
86 cfg.set_request_timeout(Duration::from_secs(C::INTERVAL));
88 let req_proto = RequestResponse::new(EigenTrustCodec, protocols, cfg);
89
90 let local_peer_id = PeerId::from(local_key.public());
92 let num_connections =
95 u32::try_from(C::NUM_CONNECTIONS).map_err(|_| EigenError::InvalidNumNeighbours)?;
96 let connection_limits =
97 ConnectionLimits::default().with_max_established_per_peer(Some(num_connections));
98
99 let mut swarm = SwarmBuilder::new(transport, req_proto, local_peer_id)
100 .connection_limits(connection_limits)
101 .build();
102
103 swarm.listen_on(local_address).map_err(|e| {
104 log::debug!("swarm.listen_on {:?}", e);
105 EigenError::ListenFailed
106 })?;
107
108 let pre_trust_score = bootstrap_nodes
111 .iter()
112 .find(|x| x.0 == local_peer_id)
113 .map(|node| node.2)
114 .unwrap_or(0.0);
115 let peer = Peer::new(C::NUM_CONNECTIONS, pre_trust_score, C::PRE_TRUST_WEIGHT);
116
117 Ok(Self {
118 swarm,
119 peer,
120 local_key,
121 bootstrap_nodes,
122 _config: PhantomData,
123 })
124 }
125
126 pub fn get_peer(&self) -> &Peer {
128 &self.peer
129 }
130
131 pub fn get_peer_mut(&mut self) -> &mut Peer {
133 &mut self.peer
134 }
135
136 pub fn get_swarm_mut(&mut self) -> &mut Swarm<RequestResponse<EigenTrustCodec>> {
138 &mut self.swarm
139 }
140
141 pub fn handle_req_res_events(&mut self, event: RequestResponseEvent<Request, Response>) {
143 use RequestResponseEvent::*;
144 use RequestResponseMessage::{Request as Req, Response as Res};
145 match event {
146 Message {
147 peer,
148 message: Req {
149 request, channel, ..
150 },
151 } => {
152 let beh = self.swarm.behaviour_mut();
153 self.peer.calculate_local_opinions(request.get_epoch());
155 let opinion = self.peer.get_local_opinion(&(peer, request.get_epoch()));
157 let response = Response::Success(opinion);
158 let res = beh.send_response(channel, response);
159 if let Err(e) = res {
160 log::error!("Failed to send the response {:?}", e);
161 }
162 },
163 Message {
164 peer,
165 message: Res { response, .. },
166 } => {
167 if let Response::Success(opinion) = response {
170 self.peer
171 .cache_neighbor_opinion((peer, opinion.get_epoch()), opinion);
172 } else {
173 log::error!("Received error response {:?}", response);
174 }
175 },
176 OutboundFailure {
177 peer, request_id, ..
178 } => {
179 log::error!("Outbound failure {:?} from {:?}", request_id, peer);
180 },
181 InboundFailure {
182 peer, request_id, ..
183 } => {
184 log::error!("Inbound failure {:?} from {:?}", request_id, peer);
185 },
186 ResponseSent { peer, request_id } => {
187 log::debug!("Response sent {:?} to {:?}", request_id, peer);
188 },
189 };
190 }
191
192 pub fn handle_swarm_events(
194 &mut self,
195 event: SwarmEvent<
196 RequestResponseEvent<Request, Response>,
197 ConnectionHandlerUpgrErr<IoError>,
198 >,
199 ) {
200 match event {
201 SwarmEvent::NewListenAddr { address, .. } => log::info!("Listening on {:?}", address),
202 SwarmEvent::Behaviour(req_res_event) => self.handle_req_res_events(req_res_event),
204 SwarmEvent::ConnectionEstablished { peer_id, .. } => {
206 let res = self.peer.add_neighbor(peer_id);
207 if let Err(e) = res {
208 log::error!("Failed to add neighbor {:?}", e);
209 }
210 log::info!("Connection established with {:?}", peer_id);
211 },
212 SwarmEvent::ConnectionClosed { peer_id, cause, .. } => {
214 self.peer.remove_neighbor(peer_id);
215 log::info!("Connection closed with {:?} ({:?})", peer_id, cause);
216 },
217 SwarmEvent::Dialing(peer_id) => log::info!("Dialing {:?}", peer_id),
218 e => log::debug!("{:?}", e),
219 }
220 }
221
222 pub fn dial_neighbor(&mut self, addr: Multiaddr) {
224 let res = self.swarm.dial(addr).map_err(|_| EigenError::DialError);
225 log::debug!("swarm.dial {:?}", res);
226 }
227
228 pub fn dial_bootstrap_nodes(&mut self) {
230 let local_peer_id = self.local_key.public().to_peer_id();
232 for (peer_id, peer_addr, _) in self.bootstrap_nodes.iter_mut() {
233 if peer_id == &local_peer_id {
234 continue;
235 }
236
237 let res = self
238 .swarm
239 .dial(peer_addr.clone())
240 .map_err(|_| EigenError::DialError);
241 log::debug!("swarm.dial {:?}", res);
242 }
243 }
244
245 pub fn send_epoch_requests(&mut self, epoch: Epoch) {
247 for peer_id in self.peer.neighbors() {
248 let beh = self.swarm.behaviour_mut();
249
250 let request = Request::new(epoch);
251 beh.send_request(&peer_id, request);
252 }
253 }
254
255 pub async fn main_loop(mut self, interval_limit: Option<u32>) -> Result<(), EigenError> {
261 self.dial_bootstrap_nodes();
262
263 let now = Instant::now();
264 let secs_until_next_epoch = Epoch::secs_until_next_epoch(C::INTERVAL)?;
265 let start = now + Duration::from_secs(secs_until_next_epoch);
267 let period = Duration::from_secs(C::INTERVAL);
268
269 let mut interval = time::interval_at(start, period);
271
272 let mut count = 0;
274
275 loop {
276 select! {
277 biased;
278 _ = interval.tick() => {
280 let current_epoch = Epoch::current_epoch(C::INTERVAL)?;
281
282 let score = self.peer.calculate_global_trust_score(current_epoch.previous());
284 log::info!("{:?} finished, score: {}", current_epoch.previous(), score);
285
286 self.send_epoch_requests(current_epoch);
288
289 if let Some(num) = interval_limit {
291 count += 1;
292 if count >= num {
293 break;
294 }
295 }
296 },
297 event = self.swarm.select_next_some() => self.handle_swarm_events(event),
299 }
300 }
301
302 Ok(())
303 }
304}
305
306#[cfg(test)]
307mod tests {
308 use super::*;
309 use std::str::FromStr;
310
311 struct TestConfig;
312 impl NodeConfig for TestConfig {
313 const INTERVAL: u64 = 10;
314 const NUM_CONNECTIONS: usize = 1;
315 const PRE_TRUST_WEIGHT: f64 = 0.5;
316 }
317
318 const PRE_TRUST_SCORE: f64 = 0.5;
319
320 #[tokio::test]
321 async fn should_emit_connection_event_on_bootstrap() {
322 const ADDR_1: &str = "/ip4/127.0.0.1/tcp/56706";
323 const ADDR_2: &str = "/ip4/127.0.0.1/tcp/58601";
324
325 let local_key1 = Keypair::generate_ed25519();
326 let peer_id1 = local_key1.public().to_peer_id();
327
328 let local_key2 = Keypair::generate_ed25519();
329 let peer_id2 = local_key2.public().to_peer_id();
330
331 let local_address1 = Multiaddr::from_str(ADDR_1).unwrap();
332 let local_address2 = Multiaddr::from_str(ADDR_2).unwrap();
333
334 let bootstrap_nodes = vec![
335 (peer_id1, local_address1.clone(), PRE_TRUST_SCORE),
336 (peer_id2, local_address2.clone(), PRE_TRUST_SCORE),
337 ];
338
339 let mut node1 =
340 Node::<TestConfig>::new(local_key1, local_address1.clone(), bootstrap_nodes.clone())
341 .unwrap();
342 let mut node2 =
343 Node::<TestConfig>::new(local_key2, local_address2, bootstrap_nodes).unwrap();
344
345 node1.dial_bootstrap_nodes();
346
347 for _ in 0..5 {
355 select! {
356 event2 = node2.get_swarm_mut().select_next_some() => {
357 if let SwarmEvent::ConnectionEstablished { peer_id, .. } = event2 {
358 assert_eq!(peer_id, peer_id1);
359 }
360 },
361 event1 = node1.get_swarm_mut().select_next_some() => {
362 if let SwarmEvent::ConnectionEstablished { peer_id, .. } = event1 {
363 assert_eq!(peer_id, peer_id2);
364 }
365 },
366
367 }
368 }
369 }
370
371 #[tokio::test]
372 async fn should_add_neighbors_on_bootstrap() {
373 const ADDR_1: &str = "/ip4/127.0.0.1/tcp/56707";
374 const ADDR_2: &str = "/ip4/127.0.0.1/tcp/58602";
375
376 let local_key1 = Keypair::generate_ed25519();
377 let peer_id1 = local_key1.public().to_peer_id();
378
379 let local_key2 = Keypair::generate_ed25519();
380 let peer_id2 = local_key2.public().to_peer_id();
381
382 let local_address1 = Multiaddr::from_str(ADDR_1).unwrap();
383 let local_address2 = Multiaddr::from_str(ADDR_2).unwrap();
384
385 let bootstrap_nodes = vec![
386 (peer_id1, local_address1.clone(), PRE_TRUST_SCORE),
387 (peer_id2, local_address2.clone(), PRE_TRUST_SCORE),
388 ];
389
390 let mut node1 =
391 Node::<TestConfig>::new(local_key1, local_address1, bootstrap_nodes.clone()).unwrap();
392 let mut node2 =
393 Node::<TestConfig>::new(local_key2, local_address2, bootstrap_nodes).unwrap();
394
395 node1.dial_bootstrap_nodes();
396
397 for _ in 0..5 {
405 select! {
406 event2 = node2.get_swarm_mut().select_next_some() => node2.handle_swarm_events(event2),
407 event1 = node1.get_swarm_mut().select_next_some() => node1.handle_swarm_events(event1),
408
409 }
410 }
411
412 let neighbors1: Vec<PeerId> = node1.get_peer().neighbors();
413 let neighbors2: Vec<PeerId> = node2.get_peer().neighbors();
414 let expected_neighbor1 = vec![peer_id2];
415 let expected_neighbor2 = vec![peer_id1];
416 assert_eq!(neighbors1, expected_neighbor1);
417 assert_eq!(neighbors2, expected_neighbor2);
418
419 node2.get_swarm_mut().disconnect_peer_id(peer_id1).unwrap();
421
422 for _ in 0..2 {
424 select! {
425 event2 = node2.get_swarm_mut().select_next_some() => node2.handle_swarm_events(event2),
426 event1 = node1.get_swarm_mut().select_next_some() => node1.handle_swarm_events(event1),
427
428 }
429 }
430
431 let neighbors2: Vec<PeerId> = node2.get_peer().neighbors();
432 let neighbors1: Vec<PeerId> = node1.get_peer().neighbors();
433 assert!(neighbors2.is_empty());
434 assert!(neighbors1.is_empty());
435 }
436
437 #[tokio::test]
438 async fn should_add_neighbors_on_dial() {
439 const ADDR_1: &str = "/ip4/127.0.0.1/tcp/56717";
440 const ADDR_2: &str = "/ip4/127.0.0.1/tcp/58622";
441
442 let local_key1 = Keypair::generate_ed25519();
443 let peer_id1 = local_key1.public().to_peer_id();
444
445 let local_key2 = Keypair::generate_ed25519();
446 let peer_id2 = local_key2.public().to_peer_id();
447
448 let local_address1 = Multiaddr::from_str(ADDR_1).unwrap();
449 let local_address2 = Multiaddr::from_str(ADDR_2).unwrap();
450
451 let mut node1 = Node::<TestConfig>::new(local_key1, local_address1, Vec::new()).unwrap();
452 let mut node2 =
453 Node::<TestConfig>::new(local_key2, local_address2.clone(), Vec::new()).unwrap();
454
455 node1.dial_neighbor(local_address2);
456
457 for _ in 0..5 {
465 select! {
466 event2 = node2.get_swarm_mut().select_next_some() => node2.handle_swarm_events(event2),
467 event1 = node1.get_swarm_mut().select_next_some() => node1.handle_swarm_events(event1),
468
469 }
470 }
471
472 let neighbors1: Vec<PeerId> = node1.get_peer().neighbors();
473 let neighbors2: Vec<PeerId> = node2.get_peer().neighbors();
474 let expected_neighbor1 = vec![peer_id2];
475 let expected_neighbor2 = vec![peer_id1];
476 assert_eq!(neighbors1, expected_neighbor1);
477 assert_eq!(neighbors2, expected_neighbor2);
478
479 node2.get_swarm_mut().disconnect_peer_id(peer_id1).unwrap();
481
482 for _ in 0..2 {
484 select! {
485 event2 = node2.get_swarm_mut().select_next_some() => node2.handle_swarm_events(event2),
486 event1 = node1.get_swarm_mut().select_next_some() => node1.handle_swarm_events(event1),
487
488 }
489 }
490
491 let neighbors2: Vec<PeerId> = node2.get_peer().neighbors();
492 let neighbors1: Vec<PeerId> = node1.get_peer().neighbors();
493 assert!(neighbors2.is_empty());
494 assert!(neighbors1.is_empty());
495 }
496
497 #[tokio::test]
498 async fn should_handle_request_for_opinion() {
499 const ADDR_1: &str = "/ip4/127.0.0.1/tcp/56708";
500 const ADDR_2: &str = "/ip4/127.0.0.1/tcp/58603";
501
502 let local_key1 = Keypair::generate_ed25519();
503 let peer_id1 = local_key1.public().to_peer_id();
504
505 let local_key2 = Keypair::generate_ed25519();
506 let peer_id2 = local_key2.public().to_peer_id();
507
508 let local_address1 = Multiaddr::from_str(ADDR_1).unwrap();
509 let local_address2 = Multiaddr::from_str(ADDR_2).unwrap();
510
511 let bootstrap_nodes = vec![
512 (peer_id1, local_address1.clone(), PRE_TRUST_SCORE),
513 (peer_id2, local_address2.clone(), PRE_TRUST_SCORE),
514 ];
515
516 let mut node1 =
517 Node::<TestConfig>::new(local_key1, local_address1, bootstrap_nodes.clone()).unwrap();
518 let mut node2 =
519 Node::<TestConfig>::new(local_key2, local_address2, bootstrap_nodes).unwrap();
520
521 node1.dial_bootstrap_nodes();
522
523 for _ in 0..5 {
531 select! {
532 event2 = node2.get_swarm_mut().select_next_some() => node2.handle_swarm_events(event2),
533 event1 = node1.get_swarm_mut().select_next_some() => node1.handle_swarm_events(event1),
534 }
535 }
536
537 let peer1 = node1.get_peer_mut();
538 let peer2 = node2.get_peer_mut();
539
540 let current_epoch = Epoch(0);
541 let next_epoch = current_epoch.next();
542
543 peer1.set_score(peer_id2, 5);
544 peer2.set_score(peer_id1, 5);
545
546 peer1.calculate_local_opinions(current_epoch);
547 peer2.calculate_local_opinions(current_epoch);
548
549 node1.send_epoch_requests(next_epoch);
550 node2.send_epoch_requests(next_epoch);
551
552 for _ in 0..6 {
557 select! {
558 event1 = node1.get_swarm_mut().select_next_some() => {
559 node1.handle_swarm_events(event1);
560 },
561 event2 = node2.get_swarm_mut().select_next_some() => {
562 node2.handle_swarm_events(event2);
563 },
564 }
565 }
566
567 let peer1 = node1.get_peer();
568 let peer2 = node2.get_peer();
569 let peer1_neighbor_opinion = peer1.get_neighbor_opinion(&(peer_id2, next_epoch));
570 let peer2_neighbor_opinion = peer2.get_neighbor_opinion(&(peer_id1, next_epoch));
571
572 assert_eq!(peer1_neighbor_opinion.get_epoch(), next_epoch);
573 assert_eq!(peer1_neighbor_opinion.get_local_trust_score(), 1.0);
574 assert_eq!(peer1_neighbor_opinion.get_global_trust_score(), 0.25);
575 assert_eq!(peer1_neighbor_opinion.get_product(), 0.25);
576
577 assert_eq!(peer2_neighbor_opinion.get_epoch(), next_epoch);
578 assert_eq!(peer2_neighbor_opinion.get_local_trust_score(), 1.0);
579 assert_eq!(peer2_neighbor_opinion.get_global_trust_score(), 0.25);
580 assert_eq!(peer2_neighbor_opinion.get_product(), 0.25);
581
582 let peer1_global_score = peer1.calculate_global_trust_score(next_epoch);
583 let peer2_global_score = peer1.calculate_global_trust_score(next_epoch);
584
585 let peer_gs = (1. - TestConfig::PRE_TRUST_WEIGHT) * 0.25
586 + TestConfig::PRE_TRUST_WEIGHT * PRE_TRUST_SCORE;
587 assert_eq!(peer1_global_score, peer_gs);
588 assert_eq!(peer2_global_score, peer_gs);
589 }
590
591 #[tokio::test]
592 async fn should_run_main_loop() {
593 const ADDR_1: &str = "/ip4/127.0.0.1/tcp/56728";
594 const ADDR_2: &str = "/ip4/127.0.0.1/tcp/58623";
595
596 let local_key1 = Keypair::generate_ed25519();
597 let peer_id1 = local_key1.public().to_peer_id();
598
599 let local_key2 = Keypair::generate_ed25519();
600 let peer_id2 = local_key2.public().to_peer_id();
601
602 let local_address1 = Multiaddr::from_str(ADDR_1).unwrap();
603 let local_address2 = Multiaddr::from_str(ADDR_2).unwrap();
604
605 let bootstrap_nodes = vec![
606 (peer_id1, local_address1.clone(), PRE_TRUST_SCORE),
607 (peer_id2, local_address2.clone(), PRE_TRUST_SCORE),
608 ];
609
610 let mut node1 =
611 Node::<TestConfig>::new(local_key1, local_address1, bootstrap_nodes.clone()).unwrap();
612 let node2 = Node::<TestConfig>::new(local_key2, local_address2, bootstrap_nodes).unwrap();
613
614 node1.dial_bootstrap_nodes();
615
616 let join1 = tokio::spawn(async move { node1.main_loop(Some(1)).await });
617
618 let join2 = tokio::spawn(async move { node2.main_loop(Some(1)).await });
619
620 let (res1, res2) = tokio::join!(join1, join2);
621 res1.unwrap().unwrap();
622 res2.unwrap().unwrap();
623 }
624}