use crate::error::{BitcoinError, Result};
use bitcoin::{
BlockHash, Network, Transaction, Txid,
consensus::encode,
p2p::{
ServiceFlags,
message::{NetworkMessage, RawNetworkMessage},
message_blockdata::Inventory,
message_network::VersionMessage,
},
};
use std::collections::{HashMap, HashSet};
use std::net::{IpAddr, SocketAddr};
use std::sync::{Arc, RwLock};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio::sync::mpsc;
use tracing::{debug, error, info, warn};
#[derive(Debug, Clone)]
pub struct P2pConfig {
pub network: Network,
pub user_agent: String,
pub protocol_version: u32,
pub services: ServiceFlags,
pub max_peers: usize,
pub connection_timeout: Duration,
pub ping_interval: Duration,
}
impl Default for P2pConfig {
fn default() -> Self {
Self {
network: Network::Bitcoin,
user_agent: "/kaccy-bitcoin:0.1.0/".to_string(),
protocol_version: 70016,
services: ServiceFlags::NONE,
max_peers: 8,
connection_timeout: Duration::from_secs(30),
ping_interval: Duration::from_secs(120),
}
}
}
#[derive(Debug, Clone)]
pub struct PeerInfo {
pub address: SocketAddr,
pub version: Option<VersionMessage>,
pub connected_at: SystemTime,
pub last_seen: SystemTime,
pub messages_sent: u64,
pub messages_received: u64,
pub services: ServiceFlags,
pub inbound: bool,
}
impl PeerInfo {
pub fn new(address: SocketAddr, inbound: bool) -> Self {
let now = SystemTime::now();
Self {
address,
version: None,
connected_at: now,
last_seen: now,
messages_sent: 0,
messages_received: 0,
services: ServiceFlags::NONE,
inbound,
}
}
pub fn update_last_seen(&mut self) {
self.last_seen = SystemTime::now();
}
pub fn connection_duration(&self) -> Duration {
SystemTime::now()
.duration_since(self.connected_at)
.unwrap_or_default()
}
pub fn is_stale(&self, timeout: Duration) -> bool {
SystemTime::now()
.duration_since(self.last_seen)
.unwrap_or_default()
> timeout
}
}
#[derive(Debug, Clone)]
pub enum P2pEvent {
PeerConnected(SocketAddr),
PeerDisconnected(SocketAddr),
TransactionReceived(Txid),
BlockReceived(BlockHash),
VersionHandshake(SocketAddr, VersionMessage),
}
pub struct PeerManager {
peers: Arc<RwLock<HashMap<SocketAddr, PeerInfo>>>,
config: P2pConfig,
event_tx: mpsc::UnboundedSender<P2pEvent>,
}
impl PeerManager {
pub fn new(config: P2pConfig) -> (Self, mpsc::UnboundedReceiver<P2pEvent>) {
let (event_tx, event_rx) = mpsc::unbounded_channel();
let manager = Self {
peers: Arc::new(RwLock::new(HashMap::new())),
config,
event_tx,
};
(manager, event_rx)
}
pub fn add_peer(&self, address: SocketAddr, inbound: bool) -> bool {
let mut peers = self.peers.write().unwrap();
if peers.len() >= self.config.max_peers {
return false;
}
if peers.contains_key(&address) {
return false;
}
peers.insert(address, PeerInfo::new(address, inbound));
let _ = self.event_tx.send(P2pEvent::PeerConnected(address));
true
}
pub fn remove_peer(&self, address: &SocketAddr) {
let mut peers = self.peers.write().unwrap();
if peers.remove(address).is_some() {
let _ = self.event_tx.send(P2pEvent::PeerDisconnected(*address));
}
}
pub fn update_peer_version(&self, address: &SocketAddr, version: VersionMessage) {
let mut peers = self.peers.write().unwrap();
if let Some(peer) = peers.get_mut(address) {
peer.services = version.services;
peer.version = Some(version.clone());
peer.update_last_seen();
let _ = self
.event_tx
.send(P2pEvent::VersionHandshake(*address, version));
}
}
pub fn peer_count(&self) -> usize {
self.peers.read().unwrap().len()
}
pub fn get_peers(&self) -> Vec<SocketAddr> {
self.peers.read().unwrap().keys().copied().collect()
}
pub fn get_peer_info(&self, address: &SocketAddr) -> Option<PeerInfo> {
self.peers.read().unwrap().get(address).cloned()
}
pub fn remove_stale_peers(&self, timeout: Duration) {
let stale_peers: Vec<SocketAddr> = self
.peers
.read()
.unwrap()
.iter()
.filter(|(_, info)| info.is_stale(timeout))
.map(|(addr, _)| *addr)
.collect();
for addr in stale_peers {
self.remove_peer(&addr);
}
}
}
pub struct P2pClient {
config: P2pConfig,
peer_manager: Arc<PeerManager>,
known_txs: Arc<RwLock<HashSet<Txid>>>,
#[allow(dead_code)]
known_blocks: Arc<RwLock<HashSet<BlockHash>>>,
}
impl P2pClient {
pub fn new(config: P2pConfig) -> (Self, mpsc::UnboundedReceiver<P2pEvent>) {
let (peer_manager, event_rx) = PeerManager::new(config.clone());
let client = Self {
config,
peer_manager: Arc::new(peer_manager),
known_txs: Arc::new(RwLock::new(HashSet::new())),
known_blocks: Arc::new(RwLock::new(HashSet::new())),
};
(client, event_rx)
}
pub async fn connect_peer(&self, address: SocketAddr) -> Result<()> {
info!(?address, "Connecting to peer");
if !self.peer_manager.add_peer(address, false) {
warn!(
?address,
"Failed to add peer (max peers reached or already connected)"
);
return Err(BitcoinError::ConnectionFailed(
"Max peers reached or already connected".to_string(),
));
}
match tokio::time::timeout(self.config.connection_timeout, TcpStream::connect(address))
.await
{
Ok(Ok(mut stream)) => {
debug!(?address, "TCP connection established");
let version_msg = self.create_version_message(address)?;
self.send_message(&mut stream, NetworkMessage::Version(version_msg))
.await?;
let peer_manager = Arc::clone(&self.peer_manager);
let network = self.config.network;
tokio::spawn(async move {
if let Err(e) =
Self::handle_peer_connection(stream, peer_manager, network).await
{
error!(?address, error = ?e, "Peer connection error");
}
});
Ok(())
}
Ok(Err(e)) => {
self.peer_manager.remove_peer(&address);
Err(BitcoinError::ConnectionFailed(format!(
"Failed to connect to {}: {}",
address, e
)))
}
Err(_) => {
self.peer_manager.remove_peer(&address);
Err(BitcoinError::ConnectionTimeout {
timeout_secs: self.config.connection_timeout.as_secs(),
})
}
}
}
pub async fn broadcast_transaction(&self, tx: &Transaction) -> Result<()> {
let txid = tx.compute_txid();
info!(?txid, "Broadcasting transaction");
self.known_txs.write().unwrap().insert(txid);
let inv = Inventory::Transaction(txid);
let _msg = NetworkMessage::Inv(vec![inv]);
let peers = self.peer_manager.get_peers();
if peers.is_empty() {
return Err(BitcoinError::ConnectionFailed(
"No peers connected".to_string(),
));
}
for peer_addr in peers {
debug!(?peer_addr, ?txid, "Would send tx to peer");
}
Ok(())
}
pub async fn request_blocks(&self, block_hashes: Vec<BlockHash>) -> Result<()> {
info!(count = block_hashes.len(), "Requesting blocks");
let peers = self.peer_manager.get_peers();
if peers.is_empty() {
return Err(BitcoinError::ConnectionFailed(
"No peers connected".to_string(),
));
}
let inv: Vec<Inventory> = block_hashes
.iter()
.map(|hash| Inventory::Block(*hash))
.collect();
let _msg = NetworkMessage::GetData(inv);
let peer_addr = peers[0];
debug!(
?peer_addr,
count = block_hashes.len(),
"Requesting blocks from peer"
);
Ok(())
}
pub fn peer_count(&self) -> usize {
self.peer_manager.peer_count()
}
pub fn get_peers(&self) -> Vec<SocketAddr> {
self.peer_manager.get_peers()
}
fn create_version_message(&self, peer_addr: SocketAddr) -> Result<VersionMessage> {
let timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs() as i64;
let receiver = bitcoin::p2p::address::Address {
services: ServiceFlags::NONE,
address: match peer_addr.ip() {
IpAddr::V4(ip) => ip.to_ipv6_mapped().segments(),
IpAddr::V6(ip) => ip.segments(),
},
port: peer_addr.port(),
};
let sender = bitcoin::p2p::address::Address {
services: self.config.services,
address: [0; 8], port: 0,
};
Ok(VersionMessage {
version: self.config.protocol_version,
services: self.config.services,
timestamp,
receiver,
sender,
nonce: rand::random(),
user_agent: self.config.user_agent.clone(),
start_height: 0,
relay: false,
})
}
async fn send_message(&self, stream: &mut TcpStream, message: NetworkMessage) -> Result<()> {
let raw_msg = RawNetworkMessage::new(self.config.network.magic(), message);
let bytes = encode::serialize(&raw_msg);
stream
.write_all(&bytes)
.await
.map_err(|e| BitcoinError::ConnectionFailed(e.to_string()))?;
Ok(())
}
async fn handle_peer_connection(
mut stream: TcpStream,
peer_manager: Arc<PeerManager>,
network: Network,
) -> Result<()> {
let peer_addr = stream.peer_addr().map_err(|e| {
BitcoinError::ConnectionFailed(format!("Failed to get peer address: {}", e))
})?;
let mut buffer = vec![0u8; 1024 * 1024];
loop {
match stream.read(&mut buffer).await {
Ok(0) => {
peer_manager.remove_peer(&peer_addr);
break;
}
Ok(n) => {
let data = &buffer[..n];
match encode::deserialize::<RawNetworkMessage>(data) {
Ok(msg) => {
debug!(?peer_addr, command = ?msg.command(), "Received message");
match msg.payload() {
NetworkMessage::Version(v) => {
peer_manager.update_peer_version(&peer_addr, v.clone());
let verack = RawNetworkMessage::new(
network.magic(),
NetworkMessage::Verack,
);
let bytes = encode::serialize(&verack);
let _ = stream.write_all(&bytes).await;
}
NetworkMessage::Verack => {
debug!(?peer_addr, "Handshake completed");
}
NetworkMessage::Ping(nonce) => {
let pong = RawNetworkMessage::new(
network.magic(),
NetworkMessage::Pong(*nonce),
);
let bytes = encode::serialize(&pong);
let _ = stream.write_all(&bytes).await;
}
_ => {
}
}
}
Err(e) => {
warn!(?peer_addr, error = ?e, "Failed to parse message");
}
}
}
Err(e) => {
error!(?peer_addr, error = ?e, "Read error");
peer_manager.remove_peer(&peer_addr);
break;
}
}
}
Ok(())
}
}
#[derive(Debug, Clone, Default)]
pub struct P2pStats {
pub total_peers: usize,
pub transactions_broadcast: u64,
pub blocks_downloaded: u64,
pub bytes_sent: u64,
pub bytes_received: u64,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_p2p_config_default() {
let config = P2pConfig::default();
assert_eq!(config.network, Network::Bitcoin);
assert_eq!(config.max_peers, 8);
assert_eq!(config.protocol_version, 70016);
}
#[test]
fn test_peer_info_creation() {
let addr: SocketAddr = "127.0.0.1:8333".parse().unwrap();
let peer = PeerInfo::new(addr, false);
assert_eq!(peer.address, addr);
assert!(!peer.inbound);
assert_eq!(peer.messages_sent, 0);
assert_eq!(peer.messages_received, 0);
}
#[test]
fn test_peer_info_stale() {
let addr: SocketAddr = "127.0.0.1:8333".parse().unwrap();
let mut peer = PeerInfo::new(addr, false);
assert!(!peer.is_stale(Duration::from_secs(10)));
peer.last_seen = SystemTime::now() - Duration::from_secs(20);
assert!(peer.is_stale(Duration::from_secs(10)));
}
#[test]
fn test_peer_manager() {
let config = P2pConfig::default();
let (manager, _rx) = PeerManager::new(config);
let addr: SocketAddr = "127.0.0.1:8333".parse().unwrap();
assert!(manager.add_peer(addr, false));
assert_eq!(manager.peer_count(), 1);
assert!(!manager.add_peer(addr, false));
assert_eq!(manager.peer_count(), 1);
manager.remove_peer(&addr);
assert_eq!(manager.peer_count(), 0);
}
#[tokio::test]
async fn test_p2p_client_creation() {
let config = P2pConfig::default();
let (client, _rx) = P2pClient::new(config);
assert_eq!(client.peer_count(), 0);
}
#[test]
fn test_p2p_stats() {
let stats = P2pStats::default();
assert_eq!(stats.total_peers, 0);
assert_eq!(stats.transactions_broadcast, 0);
assert_eq!(stats.blocks_downloaded, 0);
}
}