use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use libp2p::PeerId;
use cid::Cid;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BitswapStats {
pub protocol: ProtocolStats,
pub network: NetworkStats,
pub blocks: BlockStats,
pub peers: PeerStats,
pub sessions: SessionStats,
pub performance: PerformanceStats,
pub errors: ErrorStats,
pub timestamp: u64,
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ProtocolStats {
pub messages_sent: u64,
pub messages_received: u64,
pub message_bytes_sent: u64,
pub message_bytes_received: u64,
pub uptime_seconds: u64,
pub restarts: u64,
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct NetworkStats {
pub connections_established: u64,
pub connections_closed: u64,
pub active_connections: u64,
pub connection_failures: u64,
pub average_connection_duration_seconds: f64,
pub bandwidth_utilization_bps: u64,
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct BlockStats {
pub blocks_sent: u64,
pub blocks_received: u64,
pub blocks_requested: u64,
pub blocks_failed: u64,
pub block_bytes_sent: u64,
pub block_bytes_received: u64,
pub average_block_size: f64,
pub duplicate_blocks: u64,
pub hit_rate_percentage: f64,
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct PeerStats {
pub total_peers: u64,
pub connected_peers: u64,
pub high_quality_peers: u64,
pub average_peer_quality: f64,
pub peer_timeouts: u64,
pub peer_disconnections: u64,
pub average_blocks_per_peer: f64,
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct SessionStats {
pub total_sessions: u64,
pub active_sessions: u64,
pub completed_sessions: u64,
pub expired_sessions: u64,
pub average_session_duration_seconds: f64,
pub average_blocks_per_session: f64,
pub session_success_rate_percentage: f64,
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct PerformanceStats {
pub average_response_time_ms: f64,
pub p95_response_time_ms: f64,
pub p99_response_time_ms: f64,
pub throughput_blocks_per_second: f64,
pub throughput_bytes_per_second: f64,
pub memory_usage_bytes: u64,
pub cpu_utilization_percentage: f64,
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ErrorStats {
pub total_errors: u64,
pub network_errors: u64,
pub protocol_errors: u64,
pub timeout_errors: u64,
pub peer_errors: u64,
pub validation_errors: u64,
pub other_errors: u64,
}
pub struct StatsCollector {
stats: BitswapStats,
start_time: Instant,
response_times: Vec<f64>,
max_samples: usize,
peer_stats: HashMap<PeerId, PeerStatsEntry>,
session_stats: HashMap<String, SessionStatsEntry>,
}
#[derive(Debug, Clone)]
struct PeerStatsEntry {
blocks_sent: u64,
blocks_received: u64,
bytes_sent: u64,
bytes_received: u64,
connected_at: Instant,
last_activity: Instant,
quality_score: f64,
}
#[derive(Debug, Clone)]
struct SessionStatsEntry {
started_at: Instant,
blocks_requested: u64,
blocks_received: u64,
completed: bool,
expired: bool,
}
impl StatsCollector {
pub fn new() -> Self {
Self {
stats: BitswapStats::new(),
start_time: Instant::now(),
response_times: Vec::new(),
max_samples: 10000,
peer_stats: HashMap::new(),
session_stats: HashMap::new(),
}
}
pub fn record_message_sent(&mut self, size: usize) {
self.stats.protocol.messages_sent += 1;
self.stats.protocol.message_bytes_sent += size as u64;
}
pub fn record_message_received(&mut self, size: usize) {
self.stats.protocol.messages_received += 1;
self.stats.protocol.message_bytes_received += size as u64;
}
pub fn record_block_sent(&mut self, peer_id: &PeerId, cid: &Cid, size: usize) {
self.stats.blocks.blocks_sent += 1;
self.stats.blocks.block_bytes_sent += size as u64;
let peer_entry = self.peer_stats.entry(*peer_id).or_insert_with(|| PeerStatsEntry {
blocks_sent: 0,
blocks_received: 0,
bytes_sent: 0,
bytes_received: 0,
connected_at: Instant::now(),
last_activity: Instant::now(),
quality_score: 1.0,
});
peer_entry.blocks_sent += 1;
peer_entry.bytes_sent += size as u64;
peer_entry.last_activity = Instant::now();
}
pub fn record_block_received(&mut self, peer_id: &PeerId, cid: &Cid, size: usize, response_time_ms: f64) {
self.stats.blocks.blocks_received += 1;
self.stats.blocks.block_bytes_received += size as u64;
self.response_times.push(response_time_ms);
if self.response_times.len() > self.max_samples {
self.response_times.remove(0);
}
let peer_entry = self.peer_stats.entry(*peer_id).or_insert_with(|| PeerStatsEntry {
blocks_sent: 0,
blocks_received: 0,
bytes_sent: 0,
bytes_received: 0,
connected_at: Instant::now(),
last_activity: Instant::now(),
quality_score: 1.0,
});
peer_entry.blocks_received += 1;
peer_entry.bytes_received += size as u64;
peer_entry.last_activity = Instant::now();
}
pub fn record_block_requested(&mut self, cid: &Cid) {
self.stats.blocks.blocks_requested += 1;
}
pub fn record_block_failed(&mut self, cid: &Cid) {
self.stats.blocks.blocks_failed += 1;
}
pub fn record_duplicate_block(&mut self, cid: &Cid) {
self.stats.blocks.duplicate_blocks += 1;
}
pub fn record_peer_connected(&mut self, peer_id: PeerId) {
self.stats.network.connections_established += 1;
self.stats.network.active_connections += 1;
self.peer_stats.insert(peer_id, PeerStatsEntry {
blocks_sent: 0,
blocks_received: 0,
bytes_sent: 0,
bytes_received: 0,
connected_at: Instant::now(),
last_activity: Instant::now(),
quality_score: 1.0,
});
}
pub fn record_peer_disconnected(&mut self, peer_id: &PeerId) {
self.stats.network.connections_closed += 1;
if self.stats.network.active_connections > 0 {
self.stats.network.active_connections -= 1;
}
self.stats.peers.peer_disconnections += 1;
self.peer_stats.remove(peer_id);
}
pub fn record_session_started(&mut self, session_id: String) {
self.stats.sessions.total_sessions += 1;
self.stats.sessions.active_sessions += 1;
self.session_stats.insert(session_id, SessionStatsEntry {
started_at: Instant::now(),
blocks_requested: 0,
blocks_received: 0,
completed: false,
expired: false,
});
}
pub fn record_session_completed(&mut self, session_id: &str) {
if let Some(entry) = self.session_stats.get_mut(session_id) {
entry.completed = true;
}
self.stats.sessions.completed_sessions += 1;
if self.stats.sessions.active_sessions > 0 {
self.stats.sessions.active_sessions -= 1;
}
}
pub fn record_session_expired(&mut self, session_id: &str) {
if let Some(entry) = self.session_stats.get_mut(session_id) {
entry.expired = true;
}
self.stats.sessions.expired_sessions += 1;
if self.stats.sessions.active_sessions > 0 {
self.stats.sessions.active_sessions -= 1;
}
}
pub fn record_error(&mut self, error_type: ErrorType) {
self.stats.errors.total_errors += 1;
match error_type {
ErrorType::Network => self.stats.errors.network_errors += 1,
ErrorType::Protocol => self.stats.errors.protocol_errors += 1,
ErrorType::Timeout => self.stats.errors.timeout_errors += 1,
ErrorType::Peer => self.stats.errors.peer_errors += 1,
ErrorType::Validation => self.stats.errors.validation_errors += 1,
ErrorType::Other => self.stats.errors.other_errors += 1,
}
}
pub fn update_computed_stats(&mut self) {
self.stats.protocol.uptime_seconds = self.start_time.elapsed().as_secs();
if self.stats.blocks.blocks_received > 0 {
self.stats.blocks.average_block_size =
self.stats.blocks.block_bytes_received as f64 / self.stats.blocks.blocks_received as f64;
}
if self.stats.blocks.blocks_requested > 0 {
self.stats.blocks.hit_rate_percentage =
(self.stats.blocks.blocks_received as f64 / self.stats.blocks.blocks_requested as f64) * 100.0;
}
self.stats.peers.connected_peers = self.peer_stats.len() as u64;
self.stats.peers.total_peers = self.stats.network.connections_established;
if !self.peer_stats.is_empty() {
let total_quality: f64 = self.peer_stats.values().map(|p| p.quality_score).sum();
self.stats.peers.average_peer_quality = total_quality / self.peer_stats.len() as f64;
let high_quality_count = self.peer_stats.values()
.filter(|p| p.quality_score > 0.8)
.count();
self.stats.peers.high_quality_peers = high_quality_count as u64;
let total_blocks: u64 = self.peer_stats.values()
.map(|p| p.blocks_sent + p.blocks_received)
.sum();
self.stats.peers.average_blocks_per_peer = total_blocks as f64 / self.peer_stats.len() as f64;
}
if !self.response_times.is_empty() {
let sum: f64 = self.response_times.iter().sum();
self.stats.performance.average_response_time_ms = sum / self.response_times.len() as f64;
let mut sorted_times = self.response_times.clone();
sorted_times.sort_by(|a, b| a.partial_cmp(b).unwrap());
let p95_index = ((sorted_times.len() as f64) * 0.95) as usize;
let p99_index = ((sorted_times.len() as f64) * 0.99) as usize;
if p95_index < sorted_times.len() {
self.stats.performance.p95_response_time_ms = sorted_times[p95_index];
}
if p99_index < sorted_times.len() {
self.stats.performance.p99_response_time_ms = sorted_times[p99_index];
}
}
let uptime_seconds = self.stats.protocol.uptime_seconds as f64;
if uptime_seconds > 0.0 {
self.stats.performance.throughput_blocks_per_second =
self.stats.blocks.blocks_received as f64 / uptime_seconds;
self.stats.performance.throughput_bytes_per_second =
self.stats.blocks.block_bytes_received as f64 / uptime_seconds;
}
if !self.session_stats.is_empty() {
let total_duration: Duration = self.session_stats.values()
.map(|s| s.started_at.elapsed())
.sum();
self.stats.sessions.average_session_duration_seconds =
total_duration.as_secs() as f64 / self.session_stats.len() as f64;
let total_blocks: u64 = self.session_stats.values()
.map(|s| s.blocks_received)
.sum();
self.stats.sessions.average_blocks_per_session =
total_blocks as f64 / self.session_stats.len() as f64;
let completed_sessions = self.session_stats.values()
.filter(|s| s.completed)
.count();
self.stats.sessions.session_success_rate_percentage =
(completed_sessions as f64 / self.session_stats.len() as f64) * 100.0;
}
self.stats.timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
}
pub fn get_stats(&mut self) -> BitswapStats {
self.update_computed_stats();
self.stats.clone()
}
pub fn reset(&mut self) {
self.stats = BitswapStats::new();
self.start_time = Instant::now();
self.response_times.clear();
self.peer_stats.clear();
self.session_stats.clear();
}
pub fn get_peer_stats(&self, peer_id: &PeerId) -> Option<&PeerStatsEntry> {
self.peer_stats.get(peer_id)
}
pub fn get_session_stats(&self, session_id: &str) -> Option<&SessionStatsEntry> {
self.session_stats.get(session_id)
}
}
impl Default for StatsCollector {
fn default() -> Self {
Self::new()
}
}
impl BitswapStats {
pub fn new() -> Self {
Self {
protocol: ProtocolStats::default(),
network: NetworkStats::default(),
blocks: BlockStats::default(),
peers: PeerStats::default(),
sessions: SessionStats::default(),
performance: PerformanceStats::default(),
errors: ErrorStats::default(),
timestamp: SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs(),
}
}
}
impl Default for BitswapStats {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug, Clone, Copy)]
pub enum ErrorType {
Network,
Protocol,
Timeout,
Peer,
Validation,
Other,
}
#[cfg(test)]
mod tests {
use super::*;
use libp2p::identity::Keypair;
fn create_test_peer() -> PeerId {
let keypair = Keypair::generate_ed25519();
PeerId::from(keypair.public())
}
#[test]
fn test_stats_collector_creation() {
let collector = StatsCollector::new();
let stats = collector.stats;
assert_eq!(stats.protocol.messages_sent, 0);
assert_eq!(stats.blocks.blocks_sent, 0);
assert_eq!(stats.peers.connected_peers, 0);
}
#[test]
fn test_message_statistics() {
let mut collector = StatsCollector::new();
collector.record_message_sent(100);
collector.record_message_received(200);
let stats = collector.get_stats();
assert_eq!(stats.protocol.messages_sent, 1);
assert_eq!(stats.protocol.messages_received, 1);
assert_eq!(stats.protocol.message_bytes_sent, 100);
assert_eq!(stats.protocol.message_bytes_received, 200);
}
#[test]
fn test_block_statistics() {
let mut collector = StatsCollector::new();
let peer = create_test_peer();
let cid = Cid::default();
collector.record_block_sent(&peer, &cid, 1024);
collector.record_block_received(&peer, &cid, 2048, 100.0);
let stats = collector.get_stats();
assert_eq!(stats.blocks.blocks_sent, 1);
assert_eq!(stats.blocks.blocks_received, 1);
assert_eq!(stats.blocks.block_bytes_sent, 1024);
assert_eq!(stats.blocks.block_bytes_received, 2048);
assert_eq!(stats.blocks.average_block_size, 2048.0);
}
#[test]
fn test_peer_statistics() {
let mut collector = StatsCollector::new();
let peer = create_test_peer();
collector.record_peer_connected(peer);
assert_eq!(collector.get_stats().network.active_connections, 1);
collector.record_peer_disconnected(&peer);
assert_eq!(collector.get_stats().network.active_connections, 0);
assert_eq!(collector.get_stats().network.connections_closed, 1);
}
#[test]
fn test_session_statistics() {
let mut collector = StatsCollector::new();
let session_id = "test-session".to_string();
collector.record_session_started(session_id.clone());
assert_eq!(collector.get_stats().sessions.active_sessions, 1);
collector.record_session_completed(&session_id);
assert_eq!(collector.get_stats().sessions.completed_sessions, 1);
assert_eq!(collector.get_stats().sessions.active_sessions, 0);
}
#[test]
fn test_error_statistics() {
let mut collector = StatsCollector::new();
collector.record_error(ErrorType::Network);
collector.record_error(ErrorType::Protocol);
collector.record_error(ErrorType::Timeout);
let stats = collector.get_stats();
assert_eq!(stats.errors.total_errors, 3);
assert_eq!(stats.errors.network_errors, 1);
assert_eq!(stats.errors.protocol_errors, 1);
assert_eq!(stats.errors.timeout_errors, 1);
}
#[test]
fn test_response_time_percentiles() {
let mut collector = StatsCollector::new();
for i in 1..=100 {
collector.response_times.push(i as f64);
}
let stats = collector.get_stats();
assert_eq!(stats.performance.average_response_time_ms, 50.5);
assert!(stats.performance.p95_response_time_ms >= 94.0 && stats.performance.p95_response_time_ms <= 96.0);
assert!(stats.performance.p99_response_time_ms >= 98.0 && stats.performance.p99_response_time_ms <= 100.0);
}
#[test]
fn test_reset_statistics() {
let mut collector = StatsCollector::new();
collector.record_message_sent(100);
collector.record_block_received(&create_test_peer(), &Cid::default(), 1024, 50.0);
assert!(collector.get_stats().protocol.messages_sent > 0);
collector.reset();
let stats = collector.get_stats();
assert_eq!(stats.protocol.messages_sent, 0);
assert_eq!(stats.blocks.blocks_received, 0);
}
}