use crate::core::error::Result;
use crate::error::TrustformersError;
use crate::hub_differential::{EnhancedDeltaInfo, ModelVersion};
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use std::collections::{HashMap, HashSet};
use std::net::SocketAddr;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::fs;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream, UdpSocket};
use tokio::sync::{broadcast, RwLock};
use tokio::time::{interval, timeout};
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PeerInfo {
pub peer_id: String,
pub address: SocketAddr,
pub public_key: String,
pub last_seen: SystemTime,
pub reputation: f64, pub capabilities: PeerCapabilities,
pub bandwidth: BandwidthInfo,
pub models: Vec<ModelAdvertisement>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PeerCapabilities {
pub can_serve_models: bool,
pub can_compute_diffs: bool,
pub supported_algorithms: Vec<String>,
pub max_model_size: u64,
pub storage_capacity: u64,
pub compute_power: f64, }
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct BandwidthInfo {
pub upload_mbps: f64,
pub download_mbps: f64,
pub latency_ms: u32,
pub measured_at: SystemTime,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ModelAdvertisement {
pub model_id: String,
pub version: String,
pub size: u64,
pub checksum: String,
pub availability: f64, pub last_updated: SystemTime,
pub metadata: HashMap<String, String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum P2PMessage {
PeerAnnouncement(PeerInfo),
PeerDiscovery,
PeerList(Vec<PeerInfo>),
ModelRequest {
model_id: String,
version: String,
requestor_id: String,
},
ModelResponse {
model_id: String,
version: String,
available: bool,
estimated_time: Duration,
chunk_info: ChunkInfo,
},
ModelChunk {
model_id: String,
version: String,
chunk_id: u32,
data: Vec<u8>,
checksum: String,
},
DeltaRequest {
model_id: String,
from_version: String,
to_version: String,
requestor_id: String,
},
DeltaResponse {
model_id: String,
from_version: String,
to_version: String,
delta_info: Option<EnhancedDeltaInfo>,
available: bool,
},
Heartbeat {
peer_id: String,
timestamp: SystemTime,
},
PingRequest {
peer_id: String,
timestamp: SystemTime,
},
PingResponse {
peer_id: String,
timestamp: SystemTime,
latency: Duration,
},
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ChunkInfo {
pub total_chunks: u32,
pub chunk_size: u32,
pub total_size: u64,
pub checksums: Vec<String>,
}
#[derive(Debug, Clone)]
pub struct P2PConfig {
pub listen_address: SocketAddr,
pub discovery_port: u16,
pub max_peers: usize,
pub heartbeat_interval: Duration,
pub peer_timeout: Duration,
pub chunk_size: u32,
pub max_concurrent_transfers: usize,
pub enable_dht: bool,
pub bootstrap_peers: Vec<SocketAddr>,
pub reputation_threshold: f64,
pub storage_path: PathBuf,
}
impl Default for P2PConfig {
fn default() -> Self {
Self {
listen_address: "127.0.0.1:8080".parse().expect("failed to parse"),
discovery_port: 8081,
max_peers: 100,
heartbeat_interval: Duration::from_secs(30),
peer_timeout: Duration::from_secs(300),
chunk_size: 1024 * 1024, max_concurrent_transfers: 5,
enable_dht: true,
bootstrap_peers: Vec::new(),
reputation_threshold: 0.5,
storage_path: PathBuf::from("./p2p_storage"),
}
}
}
#[derive(Debug)]
pub struct ReputationTracker {
reputation_scores: HashMap<String, f64>,
interaction_history: HashMap<String, Vec<ReputationEvent>>,
decay_factor: f64,
}
#[derive(Debug, Clone)]
pub struct ReputationEvent {
event_type: ReputationEventType,
timestamp: SystemTime,
score_delta: f64,
}
#[derive(Debug, Clone)]
pub enum ReputationEventType {
SuccessfulTransfer,
FailedTransfer,
InvalidData,
SlowResponse,
FastResponse,
Availability,
}
impl Default for ReputationTracker {
fn default() -> Self {
Self::new()
}
}
impl ReputationTracker {
pub fn new() -> Self {
Self {
reputation_scores: HashMap::new(),
interaction_history: HashMap::new(),
decay_factor: 0.95, }
}
pub fn get_reputation(&self, peer_id: &str) -> f64 {
self.reputation_scores.get(peer_id).copied().unwrap_or(0.5)
}
pub fn update_reputation(&mut self, peer_id: &str, event: ReputationEvent) {
let current_score = self.get_reputation(peer_id);
let new_score = (current_score + event.score_delta).clamp(0.0, 1.0);
self.reputation_scores.insert(peer_id.to_string(), new_score);
self.interaction_history.entry(peer_id.to_string()).or_default().push(event);
}
pub fn decay_reputations(&mut self) {
for score in self.reputation_scores.values_mut() {
*score = (*score * self.decay_factor).max(0.0);
}
}
}
pub struct P2PNode {
config: P2PConfig,
peer_id: String,
peers: Arc<RwLock<HashMap<String, PeerInfo>>>,
reputation: Arc<Mutex<ReputationTracker>>,
message_sender: broadcast::Sender<P2PMessage>,
message_receiver: broadcast::Receiver<P2PMessage>,
models: Arc<RwLock<HashMap<String, ModelVersion>>>,
active_transfers: Arc<RwLock<HashMap<String, TransferState>>>,
}
#[derive(Debug, Clone)]
pub struct TransferState {
pub model_id: String,
pub version: String,
pub peer_id: String,
pub progress: f64,
pub start_time: SystemTime,
pub chunks_received: HashSet<u32>,
pub total_chunks: u32,
}
impl P2PNode {
pub async fn new(config: P2PConfig) -> Result<Self> {
let peer_id = Self::generate_peer_id();
let (message_sender, message_receiver) = broadcast::channel(1000);
tokio::fs::create_dir_all(&config.storage_path).await.map_err(|e| {
TrustformersError::Io {
message: format!("Failed to create storage directory: {}", e),
path: None,
suggestion: Some(
"Check if you have write permissions in the parent directory".to_string(),
),
}
})?;
Ok(Self {
config,
peer_id,
peers: Arc::new(RwLock::new(HashMap::new())),
reputation: Arc::new(Mutex::new(ReputationTracker::new())),
message_sender,
message_receiver,
models: Arc::new(RwLock::new(HashMap::new())),
active_transfers: Arc::new(RwLock::new(HashMap::new())),
})
}
pub async fn start(&mut self) -> Result<()> {
let tcp_listener = TcpListener::bind(&self.config.listen_address).await.map_err(|e| {
TrustformersError::Network {
message: format!("Failed to bind TCP listener: {}", e),
url: None,
status_code: None,
suggestion: Some("Check if the port is already in use".to_string()),
retry_recommended: false,
}
})?;
let discovery_addr =
SocketAddr::new(self.config.listen_address.ip(), self.config.discovery_port);
let udp_socket =
UdpSocket::bind(discovery_addr).await.map_err(|e| TrustformersError::Network {
message: format!("Failed to bind UDP socket: {}", e),
url: None,
status_code: None,
suggestion: Some("Check if the port is already in use".to_string()),
retry_recommended: false,
})?;
self.spawn_tcp_handler(tcp_listener).await;
self.spawn_discovery_handler(udp_socket).await;
self.spawn_heartbeat_task().await;
self.spawn_reputation_decay_task().await;
self.connect_to_bootstrap_peers().await?;
println!(
"P2P node {} started on {}",
self.peer_id, self.config.listen_address
);
Ok(())
}
async fn spawn_tcp_handler(&self, listener: TcpListener) {
let peers = self.peers.clone();
let reputation = self.reputation.clone();
let message_sender = self.message_sender.clone();
let models = self.models.clone();
let active_transfers = self.active_transfers.clone();
let peer_id = self.peer_id.clone();
tokio::spawn(async move {
loop {
match listener.accept().await {
Ok((stream, addr)) => {
let peers = peers.clone();
let reputation = reputation.clone();
let message_sender = message_sender.clone();
let models = models.clone();
let active_transfers = active_transfers.clone();
let peer_id = peer_id.clone();
tokio::spawn(async move {
if let Err(e) = Self::handle_peer_connection(
stream,
addr,
peers,
reputation,
message_sender,
models,
active_transfers,
peer_id,
)
.await
{
eprintln!("Error handling peer connection: {}", e);
}
});
},
Err(e) => {
eprintln!("Failed to accept connection: {}", e);
},
}
}
});
}
async fn spawn_discovery_handler(&self, socket: UdpSocket) {
let peers = self.peers.clone();
let peer_id = self.peer_id.clone();
let message_sender = self.message_sender.clone();
tokio::spawn(async move {
let mut buf = [0u8; 4096];
loop {
match socket.recv_from(&mut buf).await {
Ok((len, addr)) => {
if let Ok(message) = serde_json::from_slice::<P2PMessage>(&buf[..len]) {
match message {
P2PMessage::PeerDiscovery => {
let our_info = Self::create_peer_info(&peer_id, addr);
let response = P2PMessage::PeerAnnouncement(our_info);
if let Ok(response_data) = serde_json::to_vec(&response) {
let _ = socket.send_to(&response_data, addr).await;
}
},
P2PMessage::PeerAnnouncement(peer_info) => {
let mut peers_lock = peers.write().await;
peers_lock.insert(peer_info.peer_id.clone(), peer_info);
},
_ => {},
}
}
},
Err(e) => {
eprintln!("Discovery error: {}", e);
},
}
}
});
}
async fn spawn_heartbeat_task(&self) {
let peers = self.peers.clone();
let reputation = self.reputation.clone();
let heartbeat_interval = self.config.heartbeat_interval;
let peer_timeout = self.config.peer_timeout;
let peer_id = self.peer_id.clone();
tokio::spawn(async move {
let mut interval = interval(heartbeat_interval);
loop {
interval.tick().await;
let heartbeat = P2PMessage::Heartbeat {
peer_id: peer_id.clone(),
timestamp: SystemTime::now(),
};
let mut peers_lock = peers.write().await;
let now = SystemTime::now();
peers_lock.retain(|_, peer| {
let time_since_seen =
now.duration_since(peer.last_seen).unwrap_or(Duration::from_secs(0));
time_since_seen < peer_timeout
});
drop(peers_lock);
}
});
}
async fn spawn_reputation_decay_task(&self) {
let reputation = self.reputation.clone();
tokio::spawn(async move {
let mut interval = interval(Duration::from_secs(3600));
loop {
interval.tick().await;
let mut reputation_lock = reputation.lock().expect("lock should not be poisoned");
reputation_lock.decay_reputations();
}
});
}
async fn handle_peer_connection(
mut stream: TcpStream,
addr: SocketAddr,
peers: Arc<RwLock<HashMap<String, PeerInfo>>>,
reputation: Arc<Mutex<ReputationTracker>>,
message_sender: broadcast::Sender<P2PMessage>,
models: Arc<RwLock<HashMap<String, ModelVersion>>>,
active_transfers: Arc<RwLock<HashMap<String, TransferState>>>,
peer_id: String,
) -> Result<()> {
let mut buffer = [0u8; 4096];
loop {
match timeout(Duration::from_secs(30), stream.read(&mut buffer)).await {
Ok(Ok(0)) => break, Ok(Ok(n)) => {
if let Ok(message) = serde_json::from_slice::<P2PMessage>(&buffer[..n]) {
Self::handle_message(
message,
&mut stream,
&peers,
&reputation,
&message_sender,
&models,
&active_transfers,
&peer_id,
)
.await?;
}
},
Ok(Err(e)) => {
eprintln!("Read error: {}", e);
break;
},
Err(_) => {
break;
},
}
}
Ok(())
}
async fn handle_message(
message: P2PMessage,
stream: &mut TcpStream,
peers: &Arc<RwLock<HashMap<String, PeerInfo>>>,
reputation: &Arc<Mutex<ReputationTracker>>,
message_sender: &broadcast::Sender<P2PMessage>,
models: &Arc<RwLock<HashMap<String, ModelVersion>>>,
active_transfers: &Arc<RwLock<HashMap<String, TransferState>>>,
peer_id: &str,
) -> Result<()> {
match message {
P2PMessage::ModelRequest {
model_id,
version,
requestor_id,
} => {
let models_lock = models.read().await;
let model_key = format!("{}:{}", model_id, version);
if let Some(model) = models_lock.get(&model_key) {
let chunk_info = ChunkInfo {
total_chunks: (model.file_size / 1024 / 1024) as u32 + 1, chunk_size: 1024 * 1024,
total_size: model.file_size,
checksums: vec!["dummy".to_string()], };
let response = P2PMessage::ModelResponse {
model_id: model_id.clone(),
version: version.clone(),
available: true,
estimated_time: Duration::from_secs(60),
chunk_info,
};
let response_data = serde_json::to_vec(&response).map_err(|e| {
TrustformersError::InvalidInput {
message: format!("Failed to serialize response: {}", e),
parameter: Some("response".to_string()),
expected: Some("serializable data".to_string()),
received: None,
suggestion: Some("Check if the response data is valid".to_string()),
}
})?;
stream.write_all(&response_data).await.map_err(|e| {
TrustformersError::Network {
message: format!("Failed to send response: {}", e),
url: None,
status_code: None,
suggestion: Some("Check network connection".to_string()),
retry_recommended: true,
}
})?;
}
},
P2PMessage::DeltaRequest {
model_id,
from_version,
to_version,
requestor_id,
} => {
let models_lock = models.read().await;
let from_key = format!("{}:{}", model_id, from_version);
let to_key = format!("{}:{}", model_id, to_version);
let available =
models_lock.contains_key(&from_key) && models_lock.contains_key(&to_key);
let response = P2PMessage::DeltaResponse {
model_id,
from_version,
to_version,
delta_info: None, available,
};
let response_data =
serde_json::to_vec(&response).map_err(|e| TrustformersError::InvalidInput {
message: format!("Failed to serialize delta response: {}", e),
parameter: Some("delta_response".to_string()),
expected: Some("serializable data".to_string()),
received: None,
suggestion: Some("Check if the delta response data is valid".to_string()),
})?;
stream.write_all(&response_data).await.map_err(|e| TrustformersError::Network {
message: format!("Failed to send delta response: {}", e),
url: None,
status_code: None,
suggestion: Some("Check network connection".to_string()),
retry_recommended: true,
})?;
},
P2PMessage::PingRequest {
peer_id: requester_id,
timestamp,
} => {
let response = P2PMessage::PingResponse {
peer_id: peer_id.to_string(),
timestamp: SystemTime::now(),
latency: SystemTime::now().duration_since(timestamp).unwrap_or(Duration::ZERO),
};
let response_data =
serde_json::to_vec(&response).map_err(|e| TrustformersError::InvalidInput {
message: format!("Failed to serialize ping response: {}", e),
parameter: Some("ping_response".to_string()),
expected: Some("serializable data".to_string()),
received: None,
suggestion: Some("Check if the ping response data is valid".to_string()),
})?;
stream.write_all(&response_data).await.map_err(|e| TrustformersError::Network {
message: format!("Failed to send ping response: {}", e),
url: None,
status_code: None,
suggestion: Some("Check network connection".to_string()),
retry_recommended: true,
})?;
},
_ => {
},
}
Ok(())
}
async fn connect_to_bootstrap_peers(&self) -> Result<()> {
for peer_addr in &self.config.bootstrap_peers {
match TcpStream::connect(peer_addr).await {
Ok(mut stream) => {
let discovery = P2PMessage::PeerDiscovery;
let data = serde_json::to_vec(&discovery).map_err(|e| {
TrustformersError::InvalidInput {
message: format!("Failed to serialize discovery: {}", e),
parameter: Some("discovery".to_string()),
expected: Some("serializable data".to_string()),
received: None,
suggestion: Some("Check if the discovery data is valid".to_string()),
}
})?;
if let Err(e) = stream.write_all(&data).await {
eprintln!("Failed to send discovery to {}: {}", peer_addr, e);
}
},
Err(e) => {
eprintln!("Failed to connect to bootstrap peer {}: {}", peer_addr, e);
},
}
}
Ok(())
}
pub async fn request_model(&self, model_id: &str, version: &str) -> Result<PathBuf> {
let peers = self.find_peers_with_model(model_id, version).await;
if peers.is_empty() {
return Err(TrustformersError::Hub {
message: format!("No peers found with model {}:{}", model_id, version),
model_id: model_id.to_string(),
endpoint: None,
suggestion: Some(
"Try connecting to more bootstrap peers or wait for peer discovery".to_string(),
),
recovery_actions: vec![],
}
.into());
}
let best_peer = self.select_best_peer(&peers).await;
self.download_model_from_peer(&best_peer, model_id, version).await
}
pub async fn share_model(
&self,
model_path: &Path,
model_id: &str,
version: &str,
) -> Result<()> {
let model_data = fs::read(model_path).await.map_err(|e| TrustformersError::Io {
message: format!("Failed to read model file: {}", e),
path: None,
suggestion: Some("Check if the file exists and is readable".to_string()),
})?;
let model_version = ModelVersion {
id: format!("{}:{}", model_id, version),
model_id: model_id.to_string(),
version: version.to_string(),
parent_version: None,
created_at: SystemTime::now(),
file_hash: Self::calculate_hash(&model_data),
file_size: model_data.len() as u64,
compressed_size: None,
description: None,
changes: Vec::new(),
metadata: HashMap::new(),
};
let storage_path = self.config.storage_path.join(format!("{}_{}.model", model_id, version));
fs::write(&storage_path, model_data).await.map_err(|e| TrustformersError::Io {
message: format!("Failed to store model: {}", e),
path: None,
suggestion: Some("Check if you have write permissions".to_string()),
})?;
let mut models_lock = self.models.write().await;
models_lock.insert(model_version.id.clone(), model_version);
println!("Model {}:{} is now being shared", model_id, version);
Ok(())
}
async fn find_peers_with_model(&self, model_id: &str, version: &str) -> Vec<PeerInfo> {
let peers_lock = self.peers.read().await;
peers_lock
.values()
.filter(|peer| {
peer.models.iter().any(|ad| ad.model_id == model_id && ad.version == version)
})
.cloned()
.collect()
}
async fn select_best_peer(&self, peers: &[PeerInfo]) -> PeerInfo {
let reputation_lock = self.reputation.lock().expect("lock should not be poisoned");
peers
.iter()
.max_by(|a, b| {
let score_a =
reputation_lock.get_reputation(&a.peer_id) * a.bandwidth.download_mbps;
let score_b =
reputation_lock.get_reputation(&b.peer_id) * b.bandwidth.download_mbps;
score_a.partial_cmp(&score_b).unwrap_or(std::cmp::Ordering::Equal)
})
.cloned()
.unwrap_or_else(|| peers[0].clone())
}
async fn download_model_from_peer(
&self,
peer: &PeerInfo,
model_id: &str,
version: &str,
) -> Result<PathBuf> {
let mut stream =
TcpStream::connect(&peer.address)
.await
.map_err(|e| TrustformersError::Network {
message: format!("Failed to connect to peer: {}", e),
url: None,
status_code: None,
suggestion: Some("Check network connection and peer availability".to_string()),
retry_recommended: true,
})?;
let request = P2PMessage::ModelRequest {
model_id: model_id.to_string(),
version: version.to_string(),
requestor_id: self.peer_id.clone(),
};
let request_data =
serde_json::to_vec(&request).map_err(|e| TrustformersError::InvalidInput {
message: format!("Failed to serialize request: {}", e),
parameter: Some("request".to_string()),
expected: Some("serializable data".to_string()),
received: None,
suggestion: Some("Check if the request data is valid".to_string()),
})?;
stream.write_all(&request_data).await.map_err(|e| TrustformersError::Network {
message: format!("Failed to send request: {}", e),
url: None,
status_code: None,
suggestion: Some("Check network connection".to_string()),
retry_recommended: true,
})?;
let mut buffer = [0u8; 4096];
let n = stream.read(&mut buffer).await.map_err(|e| TrustformersError::Network {
message: format!("Failed to read response: {}", e),
url: None,
status_code: None,
suggestion: Some("Check network connection".to_string()),
retry_recommended: true,
})?;
let response: P2PMessage =
serde_json::from_slice(&buffer[..n]).map_err(|e| TrustformersError::InvalidInput {
message: format!("Failed to parse response: {}", e),
parameter: Some("response".to_string()),
expected: Some("valid JSON".to_string()),
received: None,
suggestion: Some("Check if the response is corrupted".to_string()),
})?;
match response {
P2PMessage::ModelResponse {
available: true,
chunk_info,
..
} => {
let output_path =
self.config.storage_path.join(format!("{}_{}.model", model_id, version));
self.download_chunks(&mut stream, &chunk_info, &output_path).await?;
Ok(output_path)
},
_ => Err(TrustformersError::Hub {
message: "Model not available from peer".to_string(),
model_id: model_id.to_string(),
endpoint: None,
suggestion: Some("Try requesting from a different peer".to_string()),
recovery_actions: vec![],
}
.into()),
}
}
async fn download_chunks(
&self,
stream: &mut TcpStream,
chunk_info: &ChunkInfo,
output_path: &Path,
) -> Result<()> {
let mut file_data = Vec::with_capacity(chunk_info.total_size as usize);
for chunk_id in 0..chunk_info.total_chunks {
let mut chunk_buffer = vec![0u8; chunk_info.chunk_size as usize];
let n =
stream.read(&mut chunk_buffer).await.map_err(|e| TrustformersError::Network {
message: format!("Failed to read chunk: {}", e),
url: None,
status_code: None,
suggestion: Some("Check network connection".to_string()),
retry_recommended: true,
})?;
chunk_buffer.truncate(n);
file_data.extend_from_slice(&chunk_buffer);
}
fs::write(output_path, file_data).await.map_err(|e| TrustformersError::Io {
message: format!("Failed to write model file: {}", e),
path: None,
suggestion: Some("Check if you have write permissions".to_string()),
})?;
Ok(())
}
fn generate_peer_id() -> String {
let mut hasher = Sha256::new();
hasher.update(
SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("SystemTime should be after UNIX_EPOCH")
.as_nanos()
.to_be_bytes(),
);
hex::encode(hasher.finalize())[..16].to_string()
}
fn create_peer_info(peer_id: &str, addr: SocketAddr) -> PeerInfo {
PeerInfo {
peer_id: peer_id.to_string(),
address: addr,
public_key: "dummy_key".to_string(),
last_seen: SystemTime::now(),
reputation: 0.5,
capabilities: PeerCapabilities {
can_serve_models: true,
can_compute_diffs: true,
supported_algorithms: vec!["xdelta3".to_string(), "layerwise".to_string()],
max_model_size: 10 * 1024 * 1024 * 1024, storage_capacity: 100 * 1024 * 1024 * 1024, compute_power: 1.0,
},
bandwidth: BandwidthInfo {
upload_mbps: 100.0,
download_mbps: 100.0,
latency_ms: 50,
measured_at: SystemTime::now(),
},
models: Vec::new(),
}
}
fn calculate_hash(data: &[u8]) -> String {
let mut hasher = Sha256::new();
hasher.update(data);
hex::encode(hasher.finalize())
}
pub async fn get_peer_count(&self) -> usize {
let peers_lock = self.peers.read().await;
peers_lock.len()
}
pub async fn get_model_count(&self) -> usize {
let models_lock = self.models.read().await;
models_lock.len()
}
pub fn get_peer_id(&self) -> &str {
&self.peer_id
}
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
#[tokio::test]
async fn test_peer_creation() {
let temp_dir = TempDir::new().expect("failed to create temp dir");
let config = P2PConfig {
storage_path: temp_dir.path().to_path_buf(),
..Default::default()
};
let node = P2PNode::new(config).await.expect("async operation failed");
assert!(!node.peer_id.is_empty());
}
#[test]
fn test_reputation_tracker() {
let mut tracker = ReputationTracker::new();
let event = ReputationEvent {
event_type: ReputationEventType::SuccessfulTransfer,
timestamp: SystemTime::now(),
score_delta: 0.1,
};
tracker.update_reputation("peer1", event);
assert_eq!(tracker.get_reputation("peer1"), 0.6);
tracker.decay_reputations();
assert!(tracker.get_reputation("peer1") < 0.6);
}
#[test]
fn test_peer_info_serialization() {
let peer_info = PeerInfo {
peer_id: "test_peer".to_string(),
address: "127.0.0.1:8080".parse().expect("failed to parse"),
public_key: "test_key".to_string(),
last_seen: SystemTime::now(),
reputation: 0.8,
capabilities: PeerCapabilities {
can_serve_models: true,
can_compute_diffs: true,
supported_algorithms: vec!["xdelta3".to_string()],
max_model_size: 1024,
storage_capacity: 2048,
compute_power: 1.0,
},
bandwidth: BandwidthInfo {
upload_mbps: 100.0,
download_mbps: 100.0,
latency_ms: 50,
measured_at: SystemTime::now(),
},
models: Vec::new(),
};
let serialized = serde_json::to_string(&peer_info).expect("JSON serialization failed");
let deserialized: PeerInfo =
serde_json::from_str(&serialized).expect("JSON deserialization failed");
assert_eq!(peer_info.peer_id, deserialized.peer_id);
assert_eq!(peer_info.reputation, deserialized.reputation);
}
#[test]
fn test_p2p_config_default() {
let config = P2PConfig::default();
assert_eq!(config.max_peers, 100);
assert_eq!(config.heartbeat_interval, Duration::from_secs(30));
assert_eq!(config.peer_timeout, Duration::from_secs(300));
assert_eq!(config.chunk_size, 1024 * 1024);
assert_eq!(config.max_concurrent_transfers, 5);
assert!(config.enable_dht);
assert!(config.bootstrap_peers.is_empty());
assert!((config.reputation_threshold - 0.5).abs() < f64::EPSILON);
}
#[test]
fn test_reputation_tracker_default_score() {
let tracker = ReputationTracker::new();
assert!((tracker.get_reputation("unknown_peer") - 0.5).abs() < f64::EPSILON);
}
#[test]
fn test_reputation_tracker_multiple_events() {
let mut tracker = ReputationTracker::new();
for _ in 0..5 {
let event = ReputationEvent {
event_type: ReputationEventType::SuccessfulTransfer,
timestamp: SystemTime::now(),
score_delta: 0.05,
};
tracker.update_reputation("peer_a", event);
}
let rep = tracker.get_reputation("peer_a");
assert!(rep > 0.5);
assert!(rep <= 1.0);
}
#[test]
fn test_reputation_tracker_negative_event() {
let mut tracker = ReputationTracker::new();
let event = ReputationEvent {
event_type: ReputationEventType::InvalidData,
timestamp: SystemTime::now(),
score_delta: -0.3,
};
tracker.update_reputation("bad_peer", event);
let rep = tracker.get_reputation("bad_peer");
assert!(rep < 0.5);
assert!(rep >= 0.0);
}
#[test]
fn test_reputation_tracker_clamp_to_one() {
let mut tracker = ReputationTracker::new();
let event = ReputationEvent {
event_type: ReputationEventType::SuccessfulTransfer,
timestamp: SystemTime::now(),
score_delta: 0.6,
};
tracker.update_reputation("great_peer", event);
assert!((tracker.get_reputation("great_peer") - 1.0).abs() < f64::EPSILON);
}
#[test]
fn test_reputation_tracker_clamp_to_zero() {
let mut tracker = ReputationTracker::new();
let event = ReputationEvent {
event_type: ReputationEventType::InvalidData,
timestamp: SystemTime::now(),
score_delta: -1.0,
};
tracker.update_reputation("terrible_peer", event);
assert!((tracker.get_reputation("terrible_peer") - 0.0).abs() < f64::EPSILON);
}
#[test]
fn test_reputation_decay() {
let mut tracker = ReputationTracker::new();
let event = ReputationEvent {
event_type: ReputationEventType::SuccessfulTransfer,
timestamp: SystemTime::now(),
score_delta: 0.3,
};
tracker.update_reputation("peer_x", event);
let before_decay = tracker.get_reputation("peer_x");
tracker.decay_reputations();
let after_decay = tracker.get_reputation("peer_x");
assert!(after_decay < before_decay);
}
#[test]
fn test_reputation_decay_multiple_rounds() {
let mut tracker = ReputationTracker::new();
let event = ReputationEvent {
event_type: ReputationEventType::Availability,
timestamp: SystemTime::now(),
score_delta: 0.4,
};
tracker.update_reputation("peer_y", event);
for _ in 0..10 {
tracker.decay_reputations();
}
let rep = tracker.get_reputation("peer_y");
assert!(rep >= 0.0);
assert!(rep < 0.9); }
#[test]
fn test_peer_capabilities_creation() {
let caps = PeerCapabilities {
can_serve_models: true,
can_compute_diffs: false,
supported_algorithms: vec!["bsdiff".to_string()],
max_model_size: 10_000_000_000,
storage_capacity: 100_000_000_000,
compute_power: 2.5,
};
assert!(caps.can_serve_models);
assert!(!caps.can_compute_diffs);
assert_eq!(caps.supported_algorithms.len(), 1);
}
#[test]
fn test_bandwidth_info_creation() {
let info = BandwidthInfo {
upload_mbps: 50.0,
download_mbps: 200.0,
latency_ms: 25,
measured_at: SystemTime::now(),
};
assert!(info.download_mbps > info.upload_mbps);
assert!(info.latency_ms > 0);
}
#[test]
fn test_model_advertisement_creation() {
let ad = ModelAdvertisement {
model_id: "llama-7b".to_string(),
version: "v2".to_string(),
size: 7_000_000_000,
checksum: "sha256_hash".to_string(),
availability: 0.95,
last_updated: SystemTime::now(),
metadata: {
let mut m = HashMap::new();
m.insert("task".to_string(), "generation".to_string());
m
},
};
assert_eq!(ad.model_id, "llama-7b");
assert!(ad.availability > 0.0 && ad.availability <= 1.0);
}
#[test]
fn test_chunk_info_creation() {
let info = ChunkInfo {
total_chunks: 100,
chunk_size: 1024 * 1024,
total_size: 100 * 1024 * 1024,
checksums: vec!["hash".to_string(); 100],
};
assert_eq!(info.total_chunks, 100);
assert_eq!(info.checksums.len(), info.total_chunks as usize);
}
#[test]
fn test_transfer_state_creation() {
let state = TransferState {
model_id: "model_1".to_string(),
version: "1.0".to_string(),
peer_id: "peer_abc".to_string(),
progress: 0.5,
start_time: SystemTime::now(),
chunks_received: {
let mut set = HashSet::new();
set.insert(0);
set.insert(1);
set.insert(2);
set
},
total_chunks: 10,
};
assert_eq!(state.chunks_received.len(), 3);
assert!((state.progress - 0.5).abs() < f64::EPSILON);
}
#[test]
fn test_reputation_event_type_variants() {
let events = [
ReputationEventType::SuccessfulTransfer,
ReputationEventType::FailedTransfer,
ReputationEventType::InvalidData,
ReputationEventType::SlowResponse,
ReputationEventType::FastResponse,
ReputationEventType::Availability,
];
assert_eq!(events.len(), 6);
}
#[test]
fn test_p2p_message_peer_discovery() {
let msg = P2PMessage::PeerDiscovery;
assert!(matches!(msg, P2PMessage::PeerDiscovery));
}
#[test]
fn test_p2p_message_heartbeat() {
let msg = P2PMessage::Heartbeat {
peer_id: "peer_1".to_string(),
timestamp: SystemTime::now(),
};
if let P2PMessage::Heartbeat { peer_id, .. } = &msg {
assert_eq!(peer_id, "peer_1");
}
}
#[test]
fn test_p2p_message_model_request() {
let msg = P2PMessage::ModelRequest {
model_id: "gpt2".to_string(),
version: "1.0".to_string(),
requestor_id: "peer_2".to_string(),
};
if let P2PMessage::ModelRequest { model_id, .. } = &msg {
assert_eq!(model_id, "gpt2");
}
}
#[test]
fn test_p2p_message_model_chunk() {
let msg = P2PMessage::ModelChunk {
model_id: "model_a".to_string(),
version: "1.0".to_string(),
chunk_id: 5,
data: vec![0u8; 1024],
checksum: "chunk_hash".to_string(),
};
if let P2PMessage::ModelChunk { chunk_id, data, .. } = &msg {
assert_eq!(*chunk_id, 5);
assert_eq!(data.len(), 1024);
}
}
#[test]
fn test_peer_info_creation() {
let info = PeerInfo {
peer_id: "peer_test".to_string(),
address: "192.168.1.100:9090".parse().expect("failed to parse"),
public_key: "pubkey_123".to_string(),
last_seen: SystemTime::now(),
reputation: 0.7,
capabilities: PeerCapabilities {
can_serve_models: true,
can_compute_diffs: true,
supported_algorithms: vec![],
max_model_size: 1024,
storage_capacity: 2048,
compute_power: 1.0,
},
bandwidth: BandwidthInfo {
upload_mbps: 10.0,
download_mbps: 50.0,
latency_ms: 30,
measured_at: SystemTime::now(),
},
models: vec![],
};
assert_eq!(info.peer_id, "peer_test");
assert!((info.reputation - 0.7).abs() < f64::EPSILON);
}
#[tokio::test]
async fn test_peer_id_generation_unique() {
let temp_dir1 = TempDir::new().expect("failed to create temp dir");
let temp_dir2 = TempDir::new().expect("failed to create temp dir");
let config1 = P2PConfig {
storage_path: temp_dir1.path().to_path_buf(),
listen_address: "127.0.0.1:0".parse().expect("failed to parse"),
..Default::default()
};
let config2 = P2PConfig {
storage_path: temp_dir2.path().to_path_buf(),
listen_address: "127.0.0.1:0".parse().expect("failed to parse"),
..Default::default()
};
let node1 = P2PNode::new(config1).await.expect("async operation failed");
let node2 = P2PNode::new(config2).await.expect("async operation failed");
assert_ne!(node1.peer_id, node2.peer_id);
}
}