1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
pub mod behaviour;
pub mod multiaddr;
pub mod ratelimit;
use behaviour::Behaviour;
use libp2p::core::upgrade;
use libp2p::gossipsub::IdentTopic;
use libp2p::gossipsub::TopicHash;
use libp2p::identity;
use libp2p::mplex;
use libp2p::noise;
use libp2p::swarm::ConnectionLimits;
use libp2p::swarm::SwarmBuilder;
use libp2p::tcp;
use libp2p::Multiaddr;
use libp2p::PeerId;
use libp2p::Swarm;
use libp2p::Transport;
use ratelimit::Endpoint;
use ratelimit::Ratelimit;
use sha2::Digest;
use sha2::Sha256;
use std::collections::HashMap;
use std::collections::HashSet;
use std::error::Error;
use std::time::Duration;
use tofuri_core::*;
pub struct P2p {
pub swarm: Swarm<Behaviour>,
pub filter: HashSet<Hash>,
pub connections: HashMap<Multiaddr, PeerId>,
pub ratelimit: Ratelimit,
pub unknown: HashSet<Multiaddr>,
pub known: HashSet<Multiaddr>,
}
impl P2p {
pub async fn new(max_established: Option<u32>, timeout: u64, known: HashSet<Multiaddr>) -> Result<P2p, Box<dyn Error>> {
Ok(P2p {
swarm: swarm(max_established, timeout).await?,
filter: HashSet::new(),
connections: HashMap::new(),
ratelimit: Ratelimit::default(),
unknown: HashSet::new(),
known,
})
}
pub fn ratelimit(&mut self, peer_id: PeerId, endpoint: Endpoint) -> Result<(), Box<dyn Error>> {
let (multiaddr, _) = self.connections.iter().find(|x| x.1 == &peer_id).unwrap();
let addr = multiaddr::ip_addr(multiaddr).expect("multiaddr to include ip");
if self.ratelimit.add(addr, endpoint) {
let _ = self.swarm.disconnect_peer_id(peer_id);
return Err("ratelimited".into());
}
Ok(())
}
pub fn filter(&mut self, data: &[u8]) -> bool {
let mut hasher = Sha256::new();
hasher.update(data);
!self.filter.insert(hasher.finalize().into())
}
fn gossipsub_has_mesh_peers(&self, topic: &str) -> bool {
self.swarm.behaviour().gossipsub.mesh_peers(&TopicHash::from_raw(topic)).count() != 0
}
pub fn gossipsub_publish(&mut self, topic: &str, data: Vec<u8>) -> Result<(), Box<dyn Error>> {
if !self.gossipsub_has_mesh_peers(topic) {
return Ok(());
}
if self.filter(&data) {
return Err(format!("gossipsub_publish filter {topic}").into());
}
if let Err(err) = self.swarm.behaviour_mut().gossipsub.publish(IdentTopic::new(topic), data) {
return Err(err.into());
}
Ok(())
}
}
async fn swarm(max_established: Option<u32>, timeout: u64) -> Result<Swarm<Behaviour>, Box<dyn Error>> {
let local_key = identity::Keypair::generate_ed25519();
let local_peer_id = PeerId::from(local_key.public());
let transport = tcp::tokio::Transport::new(tcp::Config::default().nodelay(true))
.upgrade(upgrade::Version::V1)
.authenticate(noise::NoiseAuthenticated::xx(&local_key).expect("Signing libp2p-noise static DH keypair failed."))
.multiplex(mplex::MplexConfig::new())
.timeout(Duration::from_millis(timeout))
.boxed();
let mut behaviour = Behaviour::new(local_key).await?;
for ident_topic in [
IdentTopic::new("block"),
IdentTopic::new("stake"),
IdentTopic::new("transaction"),
IdentTopic::new("multiaddr"),
]
.iter()
{
behaviour.gossipsub.subscribe(ident_topic)?;
}
let mut limits = ConnectionLimits::default();
limits = limits.with_max_established_per_peer(Some(1));
limits = limits.with_max_established(max_established);
Ok(SwarmBuilder::with_tokio_executor(transport, behaviour, local_peer_id)
.connection_limits(limits)
.build())
}