use crate::messages::{Message, NetworkTopic};
use libp2p::{
gossipsub::{Event, ConfigBuilder, IdentTopic, Message as GossipsubMessage, MessageId},
identity::{Keypair, PublicKey, ed25519},
kad::{Kademlia, KademliaEvent, store::MemoryStore, KademliaConfig},
ping::{Behaviour as Ping, Event as PingEvent, Config as PingConfig},
identify::{Behaviour as Identify, Config as IdentifyConfig, Event as IdentifyEvent},
Multiaddr, PeerId, StreamProtocol
};
use libp2p::swarm::NetworkBehaviour;
use pchain_types::cryptography::PublicAddress;
use std::{time::Duration, vec};
#[derive(NetworkBehaviour)]
#[behaviour(to_swarm = "PeerNetworkEvent")]
pub(crate) struct PeerNetworkBehaviour {
kad: Kademlia<MemoryStore>,
gossip: libp2p::gossipsub::Behaviour,
identify: Identify,
ping: Ping,
}
impl PeerNetworkBehaviour {
pub fn new(id: PublicAddress, local_key: &Keypair, heartbeat_secs: u64) -> Self {
let proto_version = "/pchain_p2p/1.0.0";
let public_key: PublicKey = ed25519::PublicKey::try_from_bytes(&id).expect("Invalid public key to setup peer newtork.").into();
let local_peer_id = public_key.to_peer_id();
let proto_names= vec![StreamProtocol::new(proto_version)];
let kad_config = KademliaConfig::default()
.set_protocol_names(proto_names)
.to_owned();
let mut kad = Kademlia::<MemoryStore>::with_config(
local_peer_id,
MemoryStore::new(local_peer_id),
kad_config,
);
kad.set_mode(Some(libp2p::kad::Mode::Server));
let identify_config = IdentifyConfig::new(proto_version.to_string(), local_key.public());
let identify = Identify::new(identify_config);
let message_id_fn = |message: &GossipsubMessage| {
let mut id_str = message.topic.as_str().to_string();
let src_str = match message.source {
Some(src) => base64url::encode(src.to_bytes()),
None => "none".to_string(),
};
id_str.push_str(&src_str);
id_str.push_str(&message.sequence_number.unwrap_or_default().to_string());
MessageId::from(id_str.to_string())
};
let mut gossip = libp2p::gossipsub::Behaviour::new(
libp2p::gossipsub::MessageAuthenticity::Signed(local_key.clone()),
ConfigBuilder::default()
.max_transmit_size(4*1024*1024) .message_id_fn(message_id_fn)
.heartbeat_interval(Duration::from_secs(heartbeat_secs))
.build()
.unwrap(),
)
.unwrap();
gossip.subscribe(&NetworkTopic::from(id).into()).unwrap();
let ping = Ping::new(PingConfig::new());
Self {
gossip,
kad,
identify,
ping,
}
}
}
impl PeerNetworkBehaviour {
pub fn add_address(&mut self, peer: &PeerId, address: Multiaddr) {
self.kad.add_address(peer, address);
}
pub fn remove_peer(&mut self, peer: &PeerId) {
self.kad.remove_peer(peer);
}
pub fn random_walk(&mut self) {
self.kad.get_closest_peers(PeerId::random());
}
pub fn subscribe(
&mut self,
topics: Vec<NetworkTopic>,
) -> Result<(), Box<dyn std::error::Error>> {
for topic in topics {
self.gossip.subscribe(&topic.into())?;
}
Ok(())
}
pub fn send_to(&mut self, address: PublicAddress, msg: Message) -> Result<libp2p::gossipsub::MessageId, libp2p::gossipsub::PublishError> {
let topic: IdentTopic = NetworkTopic::from(address).into();
self.gossip.publish(topic, msg)
}
pub fn broadcast(&mut self, topic: IdentTopic, msg: Message) -> Result<libp2p::gossipsub::MessageId, libp2p::gossipsub::PublishError> {
self.gossip.publish(topic, msg)
}
pub fn is_subscribed(&self, message: &GossipsubMessage) -> bool {
self.gossip.topics().any(|topic| message.topic.eq(topic))
}
}
pub(crate) enum PeerNetworkEvent {
Kad(KademliaEvent),
Gossip(Event),
Ping(PingEvent),
Identify(IdentifyEvent),
}
impl From<Event> for PeerNetworkEvent {
fn from(event: Event) -> Self {
Self::Gossip(event)
}
}
impl From<KademliaEvent> for PeerNetworkEvent {
fn from(event: KademliaEvent) -> Self {
Self::Kad(event)
}
}
impl From<PingEvent> for PeerNetworkEvent {
fn from(event: PingEvent) -> Self {
Self::Ping(event)
}
}
impl From<IdentifyEvent> for PeerNetworkEvent {
fn from(event: IdentifyEvent) -> Self {
Self::Identify(event)
}
}
#[cfg(test)]
mod test {
use std::net::Ipv4Addr;
use super::PeerNetworkBehaviour;
use crate::{conversions, messages::NetworkTopic, configuration::Config};
use libp2p::{
gossipsub::{Message as GossipsubMessage, TopicHash},
Multiaddr, PeerId,
};
use pchain_types::cryptography::PublicAddress;
struct PeerInfo {
public_key: PublicAddress,
peer_id: PeerId,
multi_addr: Multiaddr
}
fn setup_network_behaviour() -> PeerNetworkBehaviour {
let config = Config::new();
let local_public_address = conversions::public_address(&config.keypair.public()).unwrap();
PeerNetworkBehaviour::new(
local_public_address,
&config.keypair,
config.peer_discovery_interval
)
}
fn create_new_peer() -> PeerInfo {
let peer_config = Config::new();
let peer_id = peer_config.keypair.public().to_peer_id();
let peer_ip_addr = Ipv4Addr::new(127, 0, 0, 1);
let peer_multiaddr = conversions::multiaddr(peer_ip_addr, peer_config.port);
PeerInfo {
public_key: conversions::public_address(&peer_config.keypair.public()).unwrap(),
peer_id,
multi_addr: peer_multiaddr
}
}
#[test]
fn test_init_network_behaviour() {
let network_behaviour = setup_network_behaviour();
let subscribed_topics: Vec<&TopicHash> = network_behaviour.gossip.topics().collect();
assert_eq!(subscribed_topics.len(), 1);
}
#[test]
fn test_add_and_remove_peer() {
let mut network_behaviour = setup_network_behaviour();
let peer1 = create_new_peer();
let peer2 = create_new_peer();
network_behaviour.add_address(&peer1.peer_id, peer1.multi_addr);
network_behaviour.add_address(&peer2.peer_id, peer2.multi_addr);
let peer_num: usize = network_behaviour.kad.kbuckets().map(|x| x.num_entries()).sum();
assert_eq!(peer_num, 2);
network_behaviour.remove_peer(&peer1.peer_id);
let peer_num: usize = network_behaviour.kad.kbuckets().map(|x| x.num_entries()).sum();
assert_eq!(peer_num, 1);
}
#[test]
fn test_subscribe_topics() {
let mut network_behaviour = setup_network_behaviour();
let peer1 = create_new_peer();
let topic1 = NetworkTopic::new("hello world".to_string());
let topic1_hash = topic1.hash().clone();
let topic2 = NetworkTopic::from(peer1.public_key);
let topic2_hash = topic2.hash().clone();
let topics = vec![topic1, topic2];
let _ = network_behaviour.subscribe(topics);
let subscribed_topics: Vec<&TopicHash> = network_behaviour.gossip.topics().collect();
assert_eq!(subscribed_topics.len(), 3);
let topic1_message = GossipsubMessage {
source: None,
data: vec![],
sequence_number: None,
topic: topic1_hash,
};
let topic2_message = GossipsubMessage {
source: None,
data: vec![],
sequence_number: None,
topic: topic2_hash,
};
assert!(network_behaviour.is_subscribed(&topic1_message));
assert!(network_behaviour.is_subscribed(&topic2_message));
}
}