use crate::atp::mailbox::{MailboxTransferId, PeerId};
use crate::types::Time;
use serde::{Deserialize, Serialize};
use std::collections::{BTreeSet, HashMap};
use std::fmt;
use std::net::SocketAddr;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
pub mod coordinator;
pub mod peer_selection;
pub mod piece_tracker;
pub mod quality;
pub mod strategy;
pub use coordinator::SwarmCoordinator;
pub use peer_selection::{PathQuality, PeerQuality, PeerReputation, PeerScore, PeerSelector};
pub use piece_tracker::{PieceMap, PieceStatus, PieceTracker};
pub use quality::QualityMetrics;
pub use strategy::{PieceSelectionStrategy, SwarmStrategy};
/// Identifier for a piece, stored as a newtype around a raw `u64`.
///
/// The derived `PartialOrd`/`Ord` compare the wrapped integer, so ids sort
/// numerically when kept in ordered collections such as `BTreeSet<PieceId>`.
/// The type is `Copy`, hashable, and serializable through serde.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
pub struct PieceId(pub u64);
impl PieceId {
    /// Wraps a raw `u64` as a `PieceId`.
    ///
    /// Declared `const` so ids can also be built in `const` and `static`
    /// initializers.
    #[must_use]
    pub const fn new(id: u64) -> Self {
        Self(id)
    }

    /// Returns the wrapped `u64` value.
    #[must_use]
    pub const fn as_u64(self) -> u64 {
        self.0
    }
}
impl fmt::Display for PieceId {
    /// Renders the id as its decimal integer value.
    ///
    /// Delegates to `u64`'s `Display` impl using the caller's formatter, so
    /// width, fill, alignment and zero-padding flags (for example `{:>8}` or
    /// `{:04}`) are honored. Going through `write!(f, "{}", ..)` would build
    /// fresh format arguments and silently drop those flags. Output for a
    /// plain `{}` is unchanged.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Display::fmt(&self.0, f)
    }
}
pub(crate) fn swarm_time_now() -> Time {
let nanos = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_nanos()
.min(u128::from(u64::MAX)) as u64;
Time::from_nanos(nanos)
}
/// Configuration values for swarm behavior.
///
/// The `Default` impl in this file supplies the stock value quoted on each
/// field. NOTE(review): the field meanings below are read off the names and
/// types only; the code that consumes them is not visible here — confirm
/// against the coordinator before relying on them.
#[derive(Debug, Clone)]
pub struct SwarmConfig {
/// Cap on the number of peers (default 8).
pub max_peers: usize,
/// Piece selection strategy to use (default `RarestFirst`).
pub piece_selection_strategy: PieceSelectionStrategy,
/// Peer quality cutoff (default 0.5). Presumably the scale matches the
/// `f64` quality values carried by events and errors — TODO confirm.
pub peer_quality_threshold: f64,
/// Per-peer cap on pieces (default 4); presumably concurrent requests —
/// TODO confirm.
pub max_pieces_per_peer: usize,
/// Timeout for a piece request (default 30 seconds).
pub piece_request_timeout: Duration,
/// Interval between quality assessments (default 60 seconds).
pub quality_assessment_interval: Duration,
/// Toggle for the incentive mechanism (default `true`).
pub enable_incentives: bool,
/// Reciprocity ratio (default 1.2). NOTE(review): how the ratio is applied
/// is not shown in this file.
pub reciprocity_ratio: f64,
/// Upper bound on how long a transfer may run (default 3600 seconds).
pub max_transfer_duration: Duration,
}
impl Default for SwarmConfig {
fn default() -> Self {
Self {
max_peers: 8,
piece_selection_strategy: PieceSelectionStrategy::RarestFirst,
peer_quality_threshold: 0.5,
max_pieces_per_peer: 4,
piece_request_timeout: Duration::from_secs(30),
quality_assessment_interval: Duration::from_secs(60),
enable_incentives: true,
reciprocity_ratio: 1.2, max_transfer_duration: Duration::from_secs(3600), }
}
}
/// Record describing one peer in a swarm; cloneable and serde-serializable.
///
/// NOTE(review): field descriptions paraphrase the field names and types;
/// the code that maintains these values is outside this file section.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SwarmPeer {
/// Identity of the peer.
pub peer_id: PeerId,
/// Socket address associated with the peer.
pub endpoint: SocketAddr,
/// Ordered set of piece ids; presumably the pieces the peer can serve.
pub available_pieces: BTreeSet<PieceId>,
/// Quality record for the peer (type defined in `peer_selection`).
pub quality: PeerQuality,
/// Reputation record for the peer (type defined in `peer_selection`).
pub reputation: PeerReputation,
/// Timestamp of the most recent sighting of the peer — presumably updated
/// on contact; TODO confirm.
pub last_seen: Time,
/// Ordered set of piece ids; presumably requests sent to this peer that
/// have not completed yet.
pub pending_requests: BTreeSet<PieceId>,
/// Capabilities associated with the peer.
pub capabilities: PeerCapabilities,
}
/// Capability description for a peer; cloneable and serde-serializable.
///
/// The `Default` impl in this file supplies the stock value quoted on each
/// field. NOTE(review): meanings are inferred from field names; confirm
/// against the code that negotiates capabilities.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerCapabilities {
/// Cap on simultaneous uploads (default 4).
pub max_concurrent_uploads: usize,
/// Preferred chunk size (default `1024 * 1024`); presumably bytes — TODO
/// confirm the unit.
pub preferred_chunk_size: usize,
/// Strategies the peer supports (default: `RarestFirst` and `Sequential`).
pub supported_strategies: Vec<PieceSelectionStrategy>,
/// Optional bandwidth estimate (default `None`). NOTE(review): the unit is
/// not stated anywhere in this file.
pub bandwidth_estimate: Option<u64>,
/// Whether repair symbols are supported (default `false`).
pub supports_repair_symbols: bool,
/// Whether the peer takes part in incentives (default `true`).
pub participates_in_incentives: bool,
}
impl Default for PeerCapabilities {
fn default() -> Self {
Self {
max_concurrent_uploads: 4,
preferred_chunk_size: 1024 * 1024, supported_strategies: vec![
PieceSelectionStrategy::RarestFirst,
PieceSelectionStrategy::Sequential,
],
bandwidth_estimate: None,
supports_repair_symbols: false,
participates_in_incentives: true,
}
}
}
/// Pairing of a piece with a peer, plus scheduling metadata; cloneable and
/// serde-serializable.
///
/// NOTE(review): field descriptions paraphrase names and types; the code
/// that creates and consumes assignments is outside this file section.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PieceAssignment {
/// Peer the piece is assigned to.
pub peer_id: PeerId,
/// Piece being assigned.
pub piece_id: PieceId,
/// Priority value. NOTE(review): whether larger means more urgent is not
/// established here.
pub priority: u32,
/// Estimated completion time for the assignment.
pub estimated_completion: Time,
/// Retry counter; presumably the number of retries so far.
pub retry_count: u32,
/// Time at which the assignment was made.
pub assigned_at: Time,
}
/// Snapshot-style status record for one swarm transfer; cloneable and
/// serde-serializable.
///
/// NOTE(review): field descriptions paraphrase names and types. How the
/// piece counters relate to each other (for example whether `pending_pieces`
/// is counted inside `remaining_pieces`) is not established in this file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SwarmTransferStatus {
/// Transfer this status belongs to.
pub transfer_id: MailboxTransferId,
/// Total number of pieces in the transfer.
pub total_pieces: u64,
/// Number of pieces completed.
pub completed_pieces: u64,
/// Number of pieces pending.
pub pending_pieces: u64,
/// Number of pieces remaining.
pub remaining_pieces: u64,
/// Active peers, keyed by peer id.
pub active_peers: HashMap<PeerId, SwarmPeer>,
/// Download rate. NOTE(review): the unit is not stated in this file.
pub download_rate: f64,
/// Upload rate. NOTE(review): the unit is not stated in this file.
pub upload_rate: f64,
/// Estimated completion time, when one is available.
pub estimated_completion: Option<Time>,
/// Aggregate quality metrics for the swarm.
pub quality_metrics: SwarmQualityMetrics,
}
/// Aggregate quality figures for a swarm; cloneable and serde-serializable.
///
/// NOTE(review): the ranges and formulas behind the `f64` fields are not
/// visible in this file; the descriptions paraphrase the field names only.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SwarmQualityMetrics {
/// Average peer response time.
pub avg_peer_response_time: Duration,
/// Rate of verification failures.
pub verification_failure_rate: f64,
/// Rate of peer churn.
pub peer_churn_rate: f64,
/// Average piece redundancy.
pub avg_piece_redundancy: f64,
/// Incentive balance score.
pub incentive_balance_score: f64,
/// Overall health score.
pub health_score: f64,
}
/// Events describing swarm activity; cloneable and serde-serializable.
///
/// NOTE(review): the code that emits these events is outside this file
/// section, so the per-variant descriptions paraphrase the variant and field
/// names rather than documenting exactly when each one fires.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SwarmEvent {
/// A transfer started; carries the object id, total piece count and peer
/// count.
TransferStarted {
transfer_id: MailboxTransferId,
object_id: String,
total_pieces: u64,
peer_count: usize,
},
/// A peer joined, carrying its piece set and capabilities.
PeerJoined {
peer_id: PeerId,
available_pieces: BTreeSet<PieceId>,
capabilities: PeerCapabilities,
},
/// A peer left; carries a free-form reason and its contributed piece count.
PeerLeft {
peer_id: PeerId,
reason: String,
contributed_pieces: u64,
},
/// A piece was requested from a peer at the given priority.
PieceRequested {
peer_id: PeerId,
piece_id: PieceId,
priority: u32,
},
/// A piece was received from a peer; carries a verification status string
/// and the download time.
PieceReceived {
peer_id: PeerId,
piece_id: PieceId,
verification_status: String,
download_time: Duration,
},
/// Verification of a piece from a peer failed; carries error details.
PieceVerificationFailed {
peer_id: PeerId,
piece_id: PieceId,
error_details: String,
},
/// A peer's quality value changed from `old_quality` to `new_quality`.
PeerQualityUpdated {
peer_id: PeerId,
old_quality: f64,
new_quality: f64,
reason: String,
},
/// The piece selection strategy changed from `old_strategy` to
/// `new_strategy`.
StrategyAdapted {
old_strategy: PieceSelectionStrategy,
new_strategy: PieceSelectionStrategy,
adaptation_reason: String,
},
/// A transfer completed; carries its duration, piece and peer counts and
/// an average quality value.
TransferCompleted {
transfer_id: MailboxTransferId,
duration: Duration,
total_pieces: u64,
peer_count: usize,
avg_quality: f64,
},
/// A transfer failed; carries the reason and completed/total piece counts.
TransferFailed {
transfer_id: MailboxTransferId,
reason: String,
completed_pieces: u64,
total_pieces: u64,
},
}
/// Error type for swarm operations.
///
/// `Display` and `std::error::Error` are generated by `thiserror` from the
/// `#[error(...)]` attribute on each variant.
#[derive(Debug, thiserror::Error)]
pub enum SwarmError {
/// No suitable peer was available; `details` is appended to the message.
#[error("No suitable peers available: {details}")]
NoPeersAvailable { details: String },
/// Communication with a peer failed; the message names the peer and the
/// error text.
#[error("Peer communication failed: {peer_id}, {error}")]
PeerCommunicationFailed { peer_id: PeerId, error: String },
/// A piece from the given peer failed verification.
#[error("Piece verification failed: {piece_id} from {peer_id}")]
PieceVerificationFailed { piece_id: PieceId, peer_id: PeerId },
/// Coordination timed out; the duration is rendered with `Debug` formatting.
#[error("Swarm coordination timeout after {duration:?}")]
CoordinationTimeout { duration: Duration },
/// The named piece selection strategy was rejected as invalid.
#[error("Invalid piece selection strategy: {strategy}")]
InvalidStrategy { strategy: String },
/// A peer's quality was below the threshold; the message reports the peer,
/// the quality and the threshold.
#[error("Peer quality below threshold: {peer_id}, quality {quality}, threshold {threshold}")]
PeerQualityBelowThreshold {
peer_id: PeerId,
quality: f64,
threshold: f64,
},
/// The incentive system reported an error; `details` is appended.
#[error("Incentive system error: {details}")]
IncentiveError { details: String },
/// The swarm configuration was invalid; `details` is appended.
#[error("Invalid swarm configuration: {details}")]
ConfigurationError { details: String },
/// No transfer exists for the given transfer id.
#[error("Transfer not found: {transfer_id}")]
TransferNotFound { transfer_id: MailboxTransferId },
/// No piece exists for the given piece id.
#[error("Piece not found: {piece_id}")]
PieceNotFound { piece_id: PieceId },
/// A piece was in a state that does not permit the operation; the message
/// reports the piece and its current state.
#[error("Invalid piece state for {piece_id}: {current_state}")]
InvalidPieceState {
piece_id: PieceId,
current_state: String,
},
}
/// Shorthand for a `Result` whose error type is `SwarmError`.
pub type SwarmResult<T> = Result<T, SwarmError>;
#[cfg(test)]
mod tests {
    use super::*;

    /// `PieceId` compares and orders by its wrapped integer.
    #[test]
    fn test_piece_id_ordering() {
        let (low, high, low_again) = (PieceId::new(1), PieceId::new(2), PieceId::new(1));

        assert!(low < high);
        assert_eq!(low, low_again);
        assert_ne!(low, high);
    }

    /// Spot-checks the values produced by `SwarmConfig::default()`.
    #[test]
    fn test_swarm_config_defaults() {
        let defaults = SwarmConfig::default();

        assert_eq!(defaults.max_peers, 8);
        assert_eq!(
            defaults.piece_selection_strategy,
            PieceSelectionStrategy::RarestFirst
        );
        assert_eq!(defaults.peer_quality_threshold, 0.5);
        assert!(defaults.enable_incentives);
    }

    /// Spot-checks the values produced by `PeerCapabilities::default()`.
    #[test]
    fn test_peer_capabilities_defaults() {
        let defaults = PeerCapabilities::default();

        assert_eq!(defaults.max_concurrent_uploads, 4);
        assert_eq!(defaults.preferred_chunk_size, 1024 * 1024);
        assert!(defaults.participates_in_incentives);
        assert!(!defaults.supports_repair_symbols);
    }

    /// A `PieceAssignment` keeps its piece id, priority and retry count
    /// across a JSON round trip.
    #[test]
    fn test_piece_assignment_serialization() {
        let original = PieceAssignment {
            peer_id: PeerId::new("test-peer"),
            piece_id: PieceId::new(42),
            priority: 100,
            estimated_completion: swarm_time_now(),
            retry_count: 0,
            assigned_at: swarm_time_now(),
        };

        let json = serde_json::to_string(&original).unwrap();
        let restored: PieceAssignment = serde_json::from_str(&json).unwrap();

        assert_eq!(original.piece_id, restored.piece_id);
        assert_eq!(original.priority, restored.priority);
        assert_eq!(original.retry_count, restored.retry_count);
    }

    /// The rendered `PeerQualityBelowThreshold` message mentions the peer,
    /// the quality and the threshold.
    #[test]
    fn test_swarm_error_display() {
        let rendered = SwarmError::PeerQualityBelowThreshold {
            peer_id: PeerId::new("bad-peer"),
            quality: 0.3,
            threshold: 0.5,
        }
        .to_string();

        let expected_fragments = ["Peer quality below threshold", "bad-peer", "0.3", "0.5"];
        for fragment in expected_fragments.iter() {
            assert!(rendered.contains(*fragment));
        }
    }

    /// A `PeerJoined` event keeps its variant and piece set across a JSON
    /// round trip.
    #[test]
    fn test_swarm_event_serialization() {
        let original = SwarmEvent::PeerJoined {
            peer_id: PeerId::new("new-peer"),
            available_pieces: (1..=2).map(PieceId::new).collect(),
            capabilities: PeerCapabilities::default(),
        };

        let json = serde_json::to_string(&original).unwrap();
        let restored: SwarmEvent = serde_json::from_str(&json).unwrap();

        // Pulls the piece set out of a `PeerJoined` event; any other variant
        // means the round trip changed the event type.
        let pieces_of = |event: SwarmEvent| match event {
            SwarmEvent::PeerJoined {
                available_pieces, ..
            } => available_pieces,
            _ => panic!("Event type mismatch after serialization"),
        };

        assert_eq!(pieces_of(original), pieces_of(restored));
    }
}