#!/usr/bin/env rust
//! # P2P Foundation Bootstrap Node
//!
//! A dedicated bootstrap node for the P2P Foundation network.
//! Provides initial peer discovery and network connectivity for new nodes.
use anyhow::{Context, Result};
use clap::Parser;
use libp2p::{
multiaddr::Multiaddr,
PeerId,
};
use serde::{Deserialize, Serialize};
use std::{
collections::HashSet,
path::PathBuf,
time::Duration,
};
use tokio::{
signal,
time::{interval, sleep},
};
use tracing::{info, warn, error, debug};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
// Re-export the core P2P functionality (this will need to be implemented)
use p2p_foundation::{
bootstrap::WordEncoder,
network::{P2PNode, NodeConfig},
transport::QuicTransport,
dht::KademliaConfig,
};
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
/// Configuration file path
#[arg(short, long, value_name = "FILE")]
config: Option<PathBuf>,
/// Listen address (overrides config)
#[arg(short, long)]
listen: Option<Multiaddr>,
/// Bootstrap mode (always true for this binary)
#[arg(long, default_value = "true")]
bootstrap: bool,
/// Verbose logging
#[arg(short, long)]
verbose: bool,
}
#[derive(Debug, Serialize, Deserialize)]
struct BootstrapConfig {
pub node: NodeConfig,
pub logging: LoggingConfig,
pub metrics: MetricsConfig,
pub health_check: HealthCheckConfig,
pub security: SecurityConfig,
pub dht: DhtConfig,
pub tunneling: TunnelingConfig,
}
#[derive(Debug, Serialize, Deserialize)]
struct NodeConfig {
pub listen_addresses: Vec<String>,
pub announce_addresses: Vec<String>,
pub bootstrap_mode: bool,
pub enable_mdns: bool,
pub enable_kad_bootstrap: bool,
pub max_connections: u32,
pub connection_idle_timeout: String,
pub max_concurrent_streams: u32,
}
#[derive(Debug, Serialize, Deserialize)]
struct LoggingConfig {
pub level: String,
pub file: String,
pub max_size: String,
pub max_files: u32,
}
#[derive(Debug, Serialize, Deserialize)]
struct MetricsConfig {
pub enabled: bool,
pub bind_address: String,
}
#[derive(Debug, Serialize, Deserialize)]
struct HealthCheckConfig {
pub enabled: bool,
pub bind_address: String,
pub path: String,
}
#[derive(Debug, Serialize, Deserialize)]
struct SecurityConfig {
pub enable_noise: bool,
pub enable_yamux: bool,
pub connection_timeout: String,
pub handshake_timeout: String,
}
#[derive(Debug, Serialize, Deserialize)]
struct DhtConfig {
pub replication_factor: u32,
pub alpha: u32,
pub beta: u32,
pub random_walk_interval: String,
pub bootstrap_interval: String,
}
#[derive(Debug, Serialize, Deserialize)]
struct TunnelingConfig {
pub enable_6to4: bool,
pub enable_teredo: bool,
pub enable_6in4: bool,
pub enable_dslite: bool,
pub enable_isatap: bool,
pub enable_map_e: bool,
pub enable_map_t: bool,
pub auto_detect: bool,
}
impl Default for BootstrapConfig {
fn default() -> Self {
Self {
node: NodeConfig {
listen_addresses: vec!["/ip6/::/udp/9000/quic".to_string()],
announce_addresses: vec![],
bootstrap_mode: true,
enable_mdns: true,
enable_kad_bootstrap: true,
max_connections: 1000,
connection_idle_timeout: "300s".to_string(),
max_concurrent_streams: 100,
},
logging: LoggingConfig {
level: "info".to_string(),
file: "/var/log/p2p-foundation/bootstrap-node.log".to_string(),
max_size: "100MB".to_string(),
max_files: 5,
},
metrics: MetricsConfig {
enabled: true,
bind_address: "127.0.0.1:9090".to_string(),
},
health_check: HealthCheckConfig {
enabled: true,
bind_address: "0.0.0.0:8080".to_string(),
path: "/health".to_string(),
},
security: SecurityConfig {
enable_noise: true,
enable_yamux: true,
connection_timeout: "30s".to_string(),
handshake_timeout: "10s".to_string(),
},
dht: DhtConfig {
replication_factor: 20,
alpha: 3,
beta: 3,
random_walk_interval: "300s".to_string(),
bootstrap_interval: "60s".to_string(),
},
tunneling: TunnelingConfig {
enable_6to4: true,
enable_teredo: true,
enable_6in4: true,
enable_dslite: true,
enable_isatap: true,
enable_map_e: true,
enable_map_t: true,
auto_detect: true,
},
}
}
}
struct BootstrapNode {
config: BootstrapConfig,
node: P2PNode,
word_encoder: WordEncoder,
start_time: std::time::Instant,
peer_count: std::sync::Arc<std::sync::atomic::AtomicUsize>,
}
impl BootstrapNode {
pub async fn new(config: BootstrapConfig) -> Result<Self> {
info!("🚀 Starting P2P Foundation Bootstrap Node");
// Parse listen addresses
let listen_addrs: Vec<Multiaddr> = config.node.listen_addresses
.iter()
.map(|addr| addr.parse())
.collect::<Result<Vec<_>, _>>()
.context("Failed to parse listen addresses")?;
// Create node configuration
let node_config = p2p_foundation::network::NodeConfig {
listen_addresses: listen_addrs,
bootstrap_mode: true,
enable_mdns: config.node.enable_mdns,
max_connections: config.node.max_connections,
connection_idle_timeout: Duration::from_secs(300),
dht_config: KademliaConfig {
replication_factor: config.dht.replication_factor,
alpha: config.dht.alpha as u8,
beta: config.dht.beta as u8,
..Default::default()
},
..Default::default()
};
// Create P2P node
let node = P2PNode::new(node_config).await
.context("Failed to create P2P node")?;
// Create word encoder for three-word addresses
let word_encoder = WordEncoder::new();
let peer_count = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
Ok(Self {
config,
node,
word_encoder,
start_time: std::time::Instant::now(),
peer_count,
})
}
pub async fn start(&mut self) -> Result<()> {
info!("🌐 Starting bootstrap node...");
// Start the P2P node
self.node.start().await
.context("Failed to start P2P node")?;
// Get our peer ID and addresses
let peer_id = self.node.local_peer_id();
let listen_addrs = self.node.listeners();
info!("📍 Bootstrap node started:");
info!(" Peer ID: {}", peer_id);
for addr in &listen_addrs {
info!(" Listening on: {}", addr);
// Generate three-word address
if let Ok(three_words) = self.word_encoder.encode_multiaddr(addr) {
info!(" Three-word: {}", three_words);
}
}
// Start health check server if enabled
if self.config.health_check.enabled {
self.start_health_server().await?;
}
// Start metrics server if enabled
if self.config.metrics.enabled {
self.start_metrics_server().await?;
}
// Start monitoring tasks
self.start_monitoring_tasks().await;
info!("✅ Bootstrap node is ready and accepting connections");
info!("🔗 Other nodes can bootstrap using:");
for addr in &listen_addrs {
info!(" {}", addr);
if let Ok(three_words) = self.word_encoder.encode_multiaddr(addr) {
info!(" {} (three-word)", three_words);
}
}
Ok(())
}
async fn start_health_server(&self) -> Result<()> {
let bind_addr = self.config.health_check.bind_address.clone();
let peer_count = self.peer_count.clone();
let start_time = self.start_time;
tokio::spawn(async move {
use warp::Filter;
let health = warp::path("health")
.map(move || {
let uptime = start_time.elapsed().as_secs();
let peers = peer_count.load(std::sync::atomic::Ordering::Relaxed);
let response = serde_json::json!({
"status": "healthy",
"uptime_seconds": uptime,
"connected_peers": peers,
"timestamp": chrono::Utc::now().to_rfc3339(),
"version": env!("CARGO_PKG_VERSION"),
"node_type": "bootstrap"
});
warp::reply::json(&response)
});
let routes = health
.with(warp::cors().allow_any_origin());
info!("🏥 Health check server starting on http://{}/health", bind_addr);
let addr: std::net::SocketAddr = bind_addr.parse()
.expect("Invalid health check bind address");
warp::serve(routes)
.run(addr)
.await;
});
Ok(())
}
async fn start_metrics_server(&self) -> Result<()> {
let bind_addr = self.config.metrics.bind_address.clone();
let peer_count = self.peer_count.clone();
tokio::spawn(async move {
use warp::Filter;
let metrics = warp::path("metrics")
.map(move || {
let peers = peer_count.load(std::sync::atomic::Ordering::Relaxed);
// Prometheus-style metrics
let metrics = format!(
"# HELP p2p_connected_peers Number of connected peers\n\
# TYPE p2p_connected_peers gauge\n\
p2p_connected_peers {}\n\
# HELP p2p_node_info Node information\n\
# TYPE p2p_node_info gauge\n\
p2p_node_info{{version=\"{}\",type=\"bootstrap\"}} 1\n",
peers,
env!("CARGO_PKG_VERSION")
);
warp::reply::with_header(
metrics,
"content-type",
"text/plain; version=0.0.4"
)
});
info!("📊 Metrics server starting on http://{}/metrics", bind_addr);
let addr: std::net::SocketAddr = bind_addr.parse()
.expect("Invalid metrics bind address");
warp::serve(metrics)
.run(addr)
.await;
});
Ok(())
}
async fn start_monitoring_tasks(&self) {
let peer_count = self.peer_count.clone();
// Periodic peer count logging
tokio::spawn(async move {
let mut interval = interval(Duration::from_secs(60));
loop {
interval.tick().await;
let count = peer_count.load(std::sync::atomic::Ordering::Relaxed);
info!("📊 Connected peers: {}", count);
}
});
// Network status monitoring
tokio::spawn(async move {
let mut interval = interval(Duration::from_secs(300)); // 5 minutes
loop {
interval.tick().await;
// Log system resources
if let Ok(output) = tokio::process::Command::new("free")
.arg("-m")
.output()
.await
{
if let Ok(output_str) = String::from_utf8(output.stdout) {
for line in output_str.lines() {
if line.starts_with("Mem:") {
debug!("💾 Memory usage: {}", line);
break;
}
}
}
}
}
});
}
pub async fn run(&mut self) -> Result<()> {
// Handle shutdown signals
let mut sigterm = signal::unix::signal(signal::unix::SignalKind::terminate())
.context("Failed to create SIGTERM handler")?;
let mut sigint = signal::unix::signal(signal::unix::SignalKind::interrupt())
.context("Failed to create SIGINT handler")?;
tokio::select! {
_ = sigterm.recv() => {
info!("📨 Received SIGTERM, shutting down gracefully...");
}
_ = sigint.recv() => {
info!("📨 Received SIGINT, shutting down gracefully...");
}
result = self.run_forever() => {
match result {
Ok(_) => info!("✅ Bootstrap node completed successfully"),
Err(e) => error!("❌ Bootstrap node error: {}", e),
}
}
}
info!("🛑 Shutting down bootstrap node...");
self.shutdown().await?;
info!("👋 Bootstrap node shutdown complete");
Ok(())
}
async fn run_forever(&mut self) -> Result<()> {
// Main event loop - in a real implementation, this would handle
// network events, peer connections, DHT operations, etc.
loop {
sleep(Duration::from_secs(10)).await;
// Update peer count (placeholder - would come from actual node)
// let current_peers = self.node.connected_peers().len();
// self.peer_count.store(current_peers, std::sync::atomic::Ordering::Relaxed);
}
}
async fn shutdown(&mut self) -> Result<()> {
// Graceful shutdown - close connections, save state, etc.
info!("🔄 Performing graceful shutdown...");
// In a real implementation, this would:
// - Close all peer connections gracefully
// - Save DHT state
// - Stop all background tasks
// - Clean up resources
Ok(())
}
}
fn setup_logging(level: &str, file_path: Option<&str>) -> Result<()> {
let level = match level.to_lowercase().as_str() {
"trace" => tracing::Level::TRACE,
"debug" => tracing::Level::DEBUG,
"info" => tracing::Level::INFO,
"warn" => tracing::Level::WARN,
"error" => tracing::Level::ERROR,
_ => tracing::Level::INFO,
};
let stdout_layer = tracing_subscriber::fmt::layer()
.with_target(false)
.with_thread_ids(true)
.with_file(true)
.with_line_number(true);
if let Some(file_path) = file_path {
// Ensure log directory exists
if let Some(parent) = std::path::Path::new(file_path).parent() {
std::fs::create_dir_all(parent)?;
}
let file = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(file_path)?;
let file_layer = tracing_subscriber::fmt::layer()
.with_writer(file)
.with_target(true)
.with_thread_ids(true)
.with_file(true)
.with_line_number(true)
.json();
tracing_subscriber::registry()
.with(stdout_layer)
.with(file_layer)
.with(tracing_subscriber::filter::LevelFilter::from_level(level))
.init();
} else {
tracing_subscriber::registry()
.with(stdout_layer)
.with(tracing_subscriber::filter::LevelFilter::from_level(level))
.init();
}
Ok(())
}
fn load_config(config_path: Option<&PathBuf>) -> Result<BootstrapConfig> {
if let Some(path) = config_path {
let content = std::fs::read_to_string(path)
.with_context(|| format!("Failed to read config file: {:?}", path))?;
let config: BootstrapConfig = toml::from_str(&content)
.with_context(|| format!("Failed to parse config file: {:?}", path))?;
Ok(config)
} else {
info!("📋 No config file specified, using defaults");
Ok(BootstrapConfig::default())
}
}
#[tokio::main]
async fn main() -> Result<()> {
let args = Args::parse();
// Load configuration
let mut config = load_config(args.config.as_ref())
.context("Failed to load configuration")?;
// Override with command line arguments
if let Some(listen_addr) = args.listen {
config.node.listen_addresses = vec![listen_addr.to_string()];
}
// Setup logging
let log_level = if args.verbose { "debug" } else { &config.logging.level };
let log_file = if config.logging.file.is_empty() {
None
} else {
Some(config.logging.file.as_str())
};
setup_logging(log_level, log_file)
.context("Failed to setup logging")?;
info!("🚀 P2P Foundation Bootstrap Node v{}", env!("CARGO_PKG_VERSION"));
info!("📋 Configuration loaded");
// Create and start bootstrap node
let mut bootstrap_node = BootstrapNode::new(config).await
.context("Failed to create bootstrap node")?;
bootstrap_node.start().await
.context("Failed to start bootstrap node")?;
bootstrap_node.run().await
.context("Bootstrap node encountered an error")?;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_default_config() {
let config = BootstrapConfig::default();
assert!(config.node.bootstrap_mode);
assert!(config.health_check.enabled);
assert_eq!(config.node.max_connections, 1000);
}
#[tokio::test]
async fn test_config_loading() {
// Test would verify config file loading
}
}