use std::{collections::HashMap, net::Ipv4Addr};
use std::error::Error;
use std::sync::Arc;
use std::time::Duration;
use libp2p::Transport;
use libp2p::dns::TokioDnsConfig;
use libp2p::swarm::SwarmBuilder;
use libp2p::{
core::{muxing::StreamMuxerBox, transport::Boxed},
gossipsub::Event,
identify::Event as IdentifyEvent,
swarm::SwarmEvent,
tcp::Config as TcpConfig,
identity,
noise,
yamux,
PeerId,
};
use libp2p_mplex::MplexConfig;
use futures::StreamExt;
use tokio::sync::Mutex;
use pchain_types::cryptography::PublicAddress;
use crate::messages::NetworkTopic;
use crate::network::Network;
use crate::{
behaviour::{PeerNetworkBehaviour, PeerNetworkEvent},
configuration::Config,
conversions,
messages::{Envelope, MessageGateChain},
network::SendCommand,
};
pub async fn start(
config: Config,
subscribe_topics: Vec<NetworkTopic>,
message_gates: MessageGateChain,
) -> Result<Network, Box<dyn Error>> {
let local_public_address = conversions::public_address(&config.keypair.public()).unwrap();
let local_peer_id = config.keypair.public().to_peer_id();
let local_keypair = config.keypair;
log::info!(
"Local peer id: {:?} {:?}",
local_peer_id,
local_peer_id.to_bytes()
);
let transport = build_transport(local_keypair.clone()).await?;
let behaviour: PeerNetworkBehaviour =
PeerNetworkBehaviour::new(local_public_address, &local_keypair, 10);
let mut swarm = SwarmBuilder::with_tokio_executor(transport, behaviour, local_peer_id).build();
swarm.listen_on(conversions::multiaddr(
Ipv4Addr::new(0, 0, 0, 0),
config.port,
))?;
if !config.boot_nodes.is_empty() {
config.boot_nodes.iter().for_each(|peer_info| {
swarm.behaviour_mut().add_address(
&peer_info.peer_id,
conversions::multiaddr(peer_info.ip_address, peer_info.port),
);
});
}
swarm.behaviour_mut().subscribe(subscribe_topics)?;
let peer_lookup: Arc<Mutex<HashMap<PeerId, PublicAddress>>> =
Arc::new(Mutex::new(HashMap::new()));
let peer_lookup_in_networking = peer_lookup.clone();
let (sender, mut receiver) =
tokio::sync::mpsc::channel::<SendCommand>(config.send_command_buffer_size);
let mut discover_tick =
tokio::time::interval(Duration::from_secs(config.peer_discovery_interval));
let network_thread_handle = tokio::task::spawn(async move {
loop {
let (send_command, event) = tokio::select! {
biased;
event = swarm.select_next_some() => {
(None, Some(event))
},
send_command = receiver.recv() => {
(send_command, None)
},
_ = discover_tick.tick() => {
swarm.behaviour_mut().random_walk();
(None, None)
},
};
if let Some(send_command) = send_command {
match send_command {
SendCommand::SendTo(address, raw_message) => {
log::info!("SendTo: {}", conversions::base64_string(address).as_str());
if address == local_public_address {
let envelope = Envelope {
origin: local_public_address,
message: raw_message,
};
message_gates
.message_in(
&NetworkTopic::from(local_public_address).hash(),
envelope,
)
.await;
} else if let Err(e) = swarm.behaviour_mut().send_to(address, raw_message) {
log::error!("{:?}", e);
}
}
SendCommand::Broadcast(topic, msg) => {
log::info!("Broadcast (Topic: {:?})", topic);
if let Err(e) = swarm.behaviour_mut().broadcast(topic.into(), msg) {
log::debug!("{:?}", e);
}
}
}
}
if let Some(event) = event {
match event {
SwarmEvent::Behaviour(
PeerNetworkEvent::Gossip(
Event::Message {
message,
..
})) => {
if let Some(src_peer_id) = &message.source {
if let Some(public_addr) = peer_lookup_in_networking
.clone()
.lock()
.await
.get(src_peer_id)
{
if swarm.behaviour().is_subscribed(&message) {
let envelope = Envelope {
origin: *public_addr,
message: message.data,
};
message_gates.message_in(&message.topic, envelope).await;
} else {
log::debug!("Receive unknown gossip message");
}
} else {
log::debug!("Sender is not In List. {}", src_peer_id);
}
}
}
SwarmEvent::Behaviour(PeerNetworkEvent::Identify(
IdentifyEvent::Received { peer_id, info },
)) => {
info.listen_addrs.iter().for_each(|a| {
swarm.behaviour_mut().add_address(&peer_id, a.clone());
});
if let Some(identified_address) =
conversions::public_address(&info.public_key)
{
log::debug!(
"Identify PeerID: {} Address: {}",
peer_id,
conversions::base64_string(identified_address).as_str()
);
peer_lookup_in_networking
.clone()
.lock()
.await
.insert(peer_id, identified_address);
}
}
SwarmEvent::ConnectionClosed { peer_id, .. } => {
log::debug!("ConnectionClosed {}", peer_id);
swarm.behaviour_mut().remove_peer(&peer_id);
peer_lookup_in_networking
.clone()
.lock()
.await
.remove(&peer_id);
}
_ => {}
}
}
}
});
Ok(Network {
network_thread: network_thread_handle,
peer_public_addrs: peer_lookup,
sender,
})
}
async fn build_transport(
keypair: identity::Keypair,
) -> std::io::Result<Boxed<(PeerId, StreamMuxerBox)>> {
let transport = {
let tcp = libp2p::tcp::tokio::Transport::new(TcpConfig::new().nodelay(true));
TokioDnsConfig::system(tcp)?
};
let upgrade = libp2p::core::upgrade::SelectUpgrade::new(
yamux::Config::default(),
MplexConfig::default(),
);
Ok(transport
.upgrade(libp2p::core::upgrade::Version::V1)
.authenticate(noise::Config::new(&keypair).unwrap())
.multiplex(upgrade)
.timeout(std::time::Duration::from_secs(20))
.boxed())
}
#[cfg(test)]
mod tests {
use super::build_transport;
use crate::engine::start;
use crate::{configuration::Config, messages::MessageGateChain};
#[tokio::test]
async fn network_test_start() {
let test_config = Config::new();
let test_subscribe_topics = vec![];
let test_gates = MessageGateChain::new();
let result = start(test_config, test_subscribe_topics, test_gates).await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_build_transport() {
let test_keypair = libp2p::identity::Keypair::generate_ed25519();
let result = build_transport(test_keypair).await;
assert!(result.is_ok());
}
}