use futures::stream::StreamExt;
use libp2p::gossipsub::MessageAcceptance;
use libp2p::multiaddr::Protocol;
use libp2p::{gossipsub, kad, noise, swarm::NetworkBehaviour, swarm::SwarmEvent, tcp, yamux};
use libp2p::{identify, identity, Multiaddr, PeerId, StreamProtocol};
use log::warn;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::time::Duration;
use thiserror::Error;
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::{io, select, time};
// DNS-based lookup of bootstrap peers; `build` calls `dns::resolve_peers_from_dns`
// when no known peers were configured.
mod dns;
/// Maximum offer size in bytes (300 KiB). Used both by `Splash::validate_offer`
/// and as the gossipsub `max_transmit_size`.
const MAX_OFFER_SIZE: usize = 300 * 1024;
/// Errors returned when validating an offer or submitting it for broadcast.
#[derive(Error, Debug)]
pub enum SplashError {
/// The offer is longer than the allowed maximum; the payload is that maximum in bytes.
#[error("Offer exceeds maximum size of {0} bytes")]
OfferTooLarge(usize),
/// The offer does not start with `offer1` or could not be bech32-decoded.
#[error("Invalid offer format: not a valid bech32 string")]
InvalidOfferFormat,
/// The offer could not be queued on the internal submission channel.
#[error("Failed to send offer to network")]
SendError,
}
/// Events emitted by a running node on the `SplashContext::events` channel.
///
/// `Debug` is derived so consumers can log events or use them in assertions.
#[derive(Debug)]
pub enum SplashEvent {
    /// The node has been set up; carries the local peer id derived from the node keys.
    Initialized(PeerId),
    /// A connection to the given peer was established.
    PeerConnected(PeerId),
    /// A connection to the given peer was closed.
    PeerDisconnected(PeerId),
    /// An offer that passed validation was received over gossipsub; carries the offer text.
    OfferReceived(String),
    /// The swarm started listening on a new address.
    NewListenAddress(Multiaddr),
    /// A locally submitted offer was passed to gossipsub `publish`; carries the offer text.
    OfferBroadcasted(String),
    /// Publishing a locally submitted offer failed with the given gossipsub error.
    OfferBroadcastFailed(gossipsub::PublishError),
}
/// Configuration and submission handle for a Splash node.
///
/// Configure it with the `with_*` builder methods, then call `build` to start
/// the network task. Clones share the same submission channel and can be used
/// to broadcast offers.
pub struct Splash {
/// Addresses the swarm should listen on. When empty, `build` listens on
/// `/ip4/0.0.0.0/tcp/0` and `/ip6/::/tcp/0`.
pub listen_addresses: Vec<Multiaddr>,
/// Multiaddrs of bootstrap peers; each must end in a `/p2p/<peer id>` component.
/// When empty, `build` resolves peers via the `dns` module.
pub known_peers: Vec<Multiaddr>,
/// Identity keypair used to build the swarm.
pub keys: identity::Keypair,
// Network name used to derive the Kademlia/identify protocol names and the
// offers topic, and passed to DNS peer resolution.
network_name: String,
// Sending half of the bounded channel carrying raw offer bytes to the network task.
submission: Sender<Vec<u8>>,
// Receiving half of that channel; taken by `build`, and `None` in clones.
submission_receiver: Option<Receiver<Vec<u8>>>,
}
/// Result of `Splash::build`: the configured node plus the stream of its events.
pub struct SplashContext {
/// The node that was built; can be used (or cloned) to broadcast offers.
pub node: Splash,
/// Receiving end of the bounded event channel fed by the background network task.
pub events: mpsc::Receiver<SplashEvent>,
}
impl Clone for Splash {
fn clone(&self) -> Self {
Splash {
listen_addresses: self.listen_addresses.clone(),
known_peers: self.known_peers.clone(),
keys: self.keys.clone(),
network_name: self.network_name.clone(),
submission: self.submission.clone(),
submission_receiver: None,
}
}
}
// Combined libp2p behaviour of a Splash node. The derive generates the
// `SplashBehaviourEvent` enum matched in the event loop of `Splash::build`.
#[derive(NetworkBehaviour)]
struct SplashBehaviour {
// Pub/sub used to publish and receive offers.
gossipsub: gossipsub::Behaviour,
// Kademlia DHT with an in-memory record store, used for peer discovery.
kademlia: kad::Behaviour<kad::store::MemoryStore>,
// Identify protocol; its results feed peer addresses into Kademlia.
identify: identify::Behaviour,
}
impl Splash {
/// Creates a node with default settings: no listen addresses, no known peers,
/// a freshly generated Ed25519 identity and the `splash` network name.
pub fn new() -> Splash {
    // Bounded queue (capacity 100) carrying raw offer bytes to the network task.
    let (offer_tx, offer_rx) = tokio::sync::mpsc::channel::<Vec<u8>>(100);
    let generated_keys = identity::Keypair::generate_ed25519();
    Splash {
        listen_addresses: Vec::new(),
        known_peers: Vec::new(),
        keys: generated_keys,
        network_name: String::from("splash"),
        submission: offer_tx,
        submission_receiver: Some(offer_rx),
    }
}
/// Checks that `offer` is acceptable for the network.
///
/// # Errors
/// * `SplashError::OfferTooLarge` when the offer is longer than `MAX_OFFER_SIZE` bytes.
/// * `SplashError::InvalidOfferFormat` when it does not start with `offer1` or
///   fails bech32 decoding.
pub fn validate_offer(offer: &str) -> Result<(), SplashError> {
    let within_limit = offer.len() <= MAX_OFFER_SIZE;
    if !within_limit {
        return Err(SplashError::OfferTooLarge(MAX_OFFER_SIZE));
    }
    // The prefix test comes first so decoding is only attempted on plausible offers.
    let well_formed = offer.starts_with("offer1") && bech32::decode(offer).is_ok();
    if well_formed {
        Ok(())
    } else {
        Err(SplashError::InvalidOfferFormat)
    }
}
/// Validates `offer` and queues it for publication by the network task.
///
/// Waits while the submission queue is full.
///
/// # Errors
/// Returns the validation error from `validate_offer`, or
/// `SplashError::SendError` when the submission channel is closed.
pub async fn broadcast_offer(&self, offer: &str) -> Result<(), SplashError> {
    Splash::validate_offer(offer)?;
    let payload = offer.as_bytes().to_vec();
    match self.submission.send(payload).await {
        Ok(()) => Ok(()),
        Err(_) => Err(SplashError::SendError),
    }
}
pub fn with_listen_addresses(mut self, listen_addresses: Vec<Multiaddr>) -> Self {
self.listen_addresses = listen_addresses;
self
}
pub fn with_known_peers(mut self, known_peers: Vec<Multiaddr>) -> Self {
self.known_peers = known_peers;
self
}
pub fn with_keys(mut self, keys: identity::Keypair) -> Self {
self.keys = keys;
self
}
pub fn with_testnet(mut self) -> Self {
self.network_name = "splash-testnet".to_string();
self
}
pub async fn build(mut self) -> Result<SplashContext, Box<dyn std::error::Error>> {
let (event_tx, event_rx) = mpsc::channel(100);
if self.known_peers.is_empty() {
self.known_peers = dns::resolve_peers_from_dns(self.network_name.clone())
.await
.map_err(|e| format!("Failed to resolve peers from DNS: {}", e))?;
}
let mut swarm = libp2p::SwarmBuilder::with_existing_identity(self.keys.clone())
.with_tokio()
.with_tcp(
tcp::Config::default(),
noise::Config::new,
yamux::Config::default,
)?
.with_behaviour(|key| {
let unique_offer_fn = |message: &gossipsub::Message| {
let mut s = DefaultHasher::new();
message.data.hash(&mut s);
gossipsub::MessageId::from(s.finish().to_string())
};
let gossipsub_config = gossipsub::ConfigBuilder::default()
.heartbeat_interval(Duration::from_secs(5)) .message_id_fn(unique_offer_fn) .max_transmit_size(MAX_OFFER_SIZE)
.validate_messages()
.validation_mode(gossipsub::ValidationMode::Permissive)
.build()
.map_err(|msg| io::Error::new(io::ErrorKind::Other, msg))?;
let dummy_key = identity::Keypair::generate_ed25519();
let gossipsub = gossipsub::Behaviour::new(
gossipsub::MessageAuthenticity::Signed(dummy_key),
gossipsub_config,
)?;
let mut cfg = kad::Config::new(
StreamProtocol::try_from_owned(format!("/{}/kad/1", self.network_name))
.expect("protocol name is valid"),
);
cfg.set_query_timeout(Duration::from_secs(60));
let store = kad::store::MemoryStore::new(key.public().to_peer_id());
let mut kademlia =
kad::Behaviour::with_config(key.public().to_peer_id(), store, cfg);
for addr in self.known_peers.iter() {
let Some(Protocol::P2p(peer_id)) = addr.iter().last() else {
return Err("Expect peer multiaddr to contain peer ID.".into());
};
kademlia.add_address(&peer_id, addr.clone());
}
kademlia.bootstrap().unwrap();
let identify = identify::Behaviour::new(
identify::Config::new(
format!("/{}/id/1", self.network_name),
key.public().clone(),
)
.with_agent_version(format!("splash/{}", env!("CARGO_PKG_VERSION"))),
);
Ok(SplashBehaviour {
gossipsub,
kademlia,
identify,
})
})?
.with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(60)))
.build();
if !self.listen_addresses.is_empty() {
for addr in self.listen_addresses.iter() {
swarm.listen_on(addr.clone())?;
}
} else {
swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;
swarm.listen_on("/ip6/::/tcp/0".parse()?)?;
}
let topic = gossipsub::IdentTopic::new(format!("/{}/offers/1", self.network_name));
swarm.behaviour_mut().gossipsub.subscribe(&topic)?;
let mut peer_discovery_interval = time::interval(time::Duration::from_secs(10));
let mut submission_receiver = self
.submission_receiver
.take()
.ok_or("Submission receiver already consumed")?;
event_tx
.send(SplashEvent::Initialized(self.keys.public().to_peer_id()))
.await
.ok();
tokio::spawn(async move {
loop {
select! {
Some(offer) = submission_receiver.recv() => {
if let Err(e) = swarm.behaviour_mut().gossipsub.publish(topic.clone(), offer.clone()) {
event_tx.send(SplashEvent::OfferBroadcastFailed(e)).await.ok();
}
event_tx.send(SplashEvent::OfferBroadcasted(String::from_utf8_lossy(&offer).to_string())).await.ok();
},
_ = peer_discovery_interval.tick() => {
swarm.behaviour_mut().kademlia.get_closest_peers(PeerId::random());
},
event = swarm.select_next_some() => match event {
SwarmEvent::ConnectionEstablished { peer_id, .. } => {
event_tx.send(SplashEvent::PeerConnected(peer_id)).await.ok();
},
SwarmEvent::ConnectionClosed { peer_id, .. } => {
event_tx.send(SplashEvent::PeerDisconnected(peer_id)).await.ok();
},
SwarmEvent::Behaviour(SplashBehaviourEvent::Gossipsub(gossipsub::Event::Message {
propagation_source,
message_id,
message,
})) => {
let msg_str = String::from_utf8_lossy(&message.data).into_owned();
match Splash::validate_offer(&msg_str) {
Ok(_) => {
event_tx.send(SplashEvent::OfferReceived(msg_str)).await.ok();
swarm.behaviour_mut().gossipsub.report_message_validation_result(&message_id, &propagation_source, MessageAcceptance::Accept).ok();
}
Err(e) => {
warn!("Received invalid offer: {}", e);
swarm.behaviour_mut().gossipsub.report_message_validation_result(&message_id, &propagation_source, MessageAcceptance::Reject).ok();
}
}
},
SwarmEvent::Behaviour(SplashBehaviourEvent::Identify(identify::Event::Received { info: identify::Info { observed_addr, listen_addrs, .. }, peer_id, connection_id: _ })) => {
for addr in listen_addrs {
let is_non_global = addr.iter().any(|p| match p {
Protocol::Ip4(addr) => addr.is_loopback() || addr.is_private(),
Protocol::Ip6(addr) => addr.is_loopback(),
_ => false,
});
if is_non_global {
continue;
}
swarm.behaviour_mut().kademlia.add_address(&peer_id, addr);
}
swarm.add_external_address(observed_addr);
},
SwarmEvent::NewListenAddr { address, .. } => {
event_tx.send(SplashEvent::NewListenAddress(address)).await.ok();
},
_ => {}
}
}
}
});
Ok(SplashContext {
node: self,
events: event_rx,
})
}
}