use crate::storage::ChunkStorage;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
/// Snapshot of storage usage produced by `AnalyticsCollector::storage_analytics`.
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
pub struct StorageAnalytics {
    /// Maximum capacity in bytes (the storage backend's `max_bytes`).
    pub total_capacity: u64,
    /// Bytes currently in use (the storage backend's `used_bytes`).
    pub used_storage: u64,
    /// Bytes still available (the storage backend's `available_bytes`).
    pub free_storage: u64,
    /// Utilization percentage, passed through from the storage backend.
    pub utilization_percent: f64,
    /// Number of stored chunks; the collector in this file always reports 0.
    pub chunk_count: u64,
    /// Number of content items; the collector fills this from the pinned-content count.
    pub content_count: u64,
    /// Average chunk size; the collector in this file always reports 0.
    pub avg_chunk_size: u64,
    /// Size of the largest content item; the collector in this file always reports 0.
    pub largest_content: u64,
}
/// Upload/download volume and rate figures produced by
/// `AnalyticsCollector::transfer_analytics`.
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
pub struct TransferAnalytics {
    /// Bytes uploaded by successful transfers since the collector was created.
    pub total_uploaded: u64,
    /// Bytes downloaded by successful transfers since the collector was created.
    pub total_downloaded: u64,
    /// Upload volume in bytes for the trailing 24-hour window.
    pub uploaded_today: u64,
    /// Download volume in bytes for the trailing 24-hour window.
    pub downloaded_today: u64,
    /// Upload volume in bytes for the trailing 7-day window.
    pub uploaded_week: u64,
    /// Download volume in bytes for the trailing 7-day window.
    pub downloaded_week: u64,
    /// Upload volume in bytes for the trailing 30-day window.
    pub uploaded_month: u64,
    /// Download volume in bytes for the trailing 30-day window.
    pub downloaded_month: u64,
    /// Average upload rate in bytes per second over the last 60 seconds.
    pub upload_rate: f64,
    /// Average download rate in bytes per second over the last 60 seconds.
    pub download_rate: f64,
    /// Peak upload rate; the collector in this file always reports 0.0.
    pub peak_upload_rate: f64,
    /// Peak download rate; the collector in this file always reports 0.0.
    pub peak_download_rate: f64,
    /// Successful transfers recorded since the most recent UTC midnight.
    pub transfers_today: u64,
    /// Failed transfers recorded since the most recent UTC midnight.
    pub failed_transfers_today: u64,
}
/// Earnings summary produced by `AnalyticsCollector::earning_analytics`.
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
pub struct EarningAnalytics {
    /// Sum of every amount recorded since the collector was created.
    pub total_earned: u64,
    /// Amount earned since the most recent UTC midnight.
    pub earned_today: u64,
    /// Amount earned during the trailing 7 days.
    pub earned_week: u64,
    /// Amount earned during the trailing 30 days.
    pub earned_month: u64,
    /// `earned_month` divided by 30.
    pub daily_rate: f64,
    /// Number of earnings (proofs) recorded since the most recent UTC midnight.
    pub proofs_today: u64,
    /// The collector in this file sets this equal to `proofs_today`.
    pub successful_proofs_today: u64,
    /// `earned_today / proofs_today`, or 0.0 when nothing was recorded today.
    pub avg_reward_per_proof: f64,
    /// Up to ten content items with the highest earnings, highest first.
    pub top_earners: Vec<ContentEarning>,
}
/// Earnings attributed to a single content item.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct ContentEarning {
    /// Content identifier the earnings were recorded against.
    pub cid: String,
    /// Optional display title; the collector in this file leaves it `None`.
    pub title: Option<String>,
    /// Total amount recorded for this content item.
    pub total_earned: u64,
    /// Transfer count for this item; the collector in this file always reports 0.
    pub transfer_count: u64,
}
/// Content-level statistics.
///
/// The collector in this file does not compute these yet; its dashboard
/// embeds `ContentAnalytics::default()`.
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
pub struct ContentAnalytics {
    /// Count of pinned content items.
    pub pinned_count: u64,
    /// Count of cached content items.
    pub cached_count: u64,
    /// Access records for the most frequently accessed content.
    pub most_accessed: Vec<ContentAccess>,
    /// Access records for recently accessed content.
    pub recent_access: Vec<ContentAccess>,
    /// Counts keyed by category.
    pub by_category: HashMap<String, u64>,
}
/// Access statistics for a single content item.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct ContentAccess {
    /// Content identifier.
    pub cid: String,
    /// Optional display title.
    pub title: Option<String>,
    /// Number of times the content was accessed.
    pub access_count: u64,
    /// Time of the most recent access (unit is not established in this file).
    pub last_accessed: u64,
}
/// Node performance figures produced by `AnalyticsCollector::performance_analytics`.
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
pub struct PerformanceAnalytics {
    /// Whole seconds elapsed since the collector was created.
    pub uptime_secs: u64,
    /// Uptime percentage over 7 days; the collector in this file always reports 100.0.
    pub uptime_percent_7d: f64,
    /// Mean of the retained latency samples, in milliseconds.
    pub avg_latency_ms: f64,
    /// 50th-percentile latency of the retained samples, in milliseconds.
    pub p50_latency_ms: f64,
    /// 95th-percentile latency of the retained samples, in milliseconds.
    pub p95_latency_ms: f64,
    /// 99th-percentile latency of the retained samples, in milliseconds.
    pub p99_latency_ms: f64,
    /// Connected peer count; the collector in this file always reports 0.
    pub connected_peers: u64,
    /// Active transfer count; the collector in this file always reports 0.
    pub active_transfers: u64,
    /// CPU usage percentage; the collector in this file always reports 0.0.
    pub cpu_usage_percent: f64,
    /// Memory usage; the collector in this file always reports 0.
    pub memory_usage: u64,
    /// Disk I/O rate; the collector in this file always reports 0.
    pub disk_io_rate: u64,
}
/// Aggregate of every analytics section, produced by
/// `AnalyticsCollector::dashboard_analytics`.
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
pub struct DashboardAnalytics {
    /// Storage usage section.
    pub storage: StorageAnalytics,
    /// Transfer volume and rate section.
    pub transfer: TransferAnalytics,
    /// Earnings section.
    pub earning: EarningAnalytics,
    /// Content section.
    pub content: ContentAnalytics,
    /// Performance section.
    pub performance: PerformanceAnalytics,
    /// Time the snapshot was assembled, in seconds since the Unix epoch.
    pub last_updated: u64,
}
/// One sample of a time series.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct TimeSeriesPoint {
    /// Start of the bucket; `AnalyticsCollector::historical_data` emits
    /// milliseconds since the Unix epoch.
    pub timestamp: u64,
    /// Aggregated value for the bucket.
    pub value: f64,
}
/// Time-series data produced by `AnalyticsCollector::historical_data`.
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
pub struct HistoricalData {
    /// Uploaded bytes per hour for the last 24 hourly buckets, newest first.
    pub upload_hourly: Vec<TimeSeriesPoint>,
    /// Downloaded bytes per hour for the last 24 hourly buckets, newest first.
    pub download_hourly: Vec<TimeSeriesPoint>,
    /// Earnings per UTC day for the last 30 daily buckets, newest first.
    pub earnings_daily: Vec<TimeSeriesPoint>,
    /// Daily storage series; the collector in this file leaves it empty.
    pub storage_daily: Vec<TimeSeriesPoint>,
    /// Daily transfer series; the collector in this file leaves it empty.
    pub transfers_daily: Vec<TimeSeriesPoint>,
}
/// In-memory collector that records transfers, earnings and latency samples
/// and turns them into the analytics snapshots defined in this module.
pub struct AnalyticsCollector {
    /// Shared handle to the chunk store whose stats feed the storage section.
    storage: Arc<RwLock<ChunkStorage>>,
    /// Moment the collector was constructed; used to report uptime.
    start_time: Instant,
    /// Transfer counters, recent samples and the capped transfer log.
    transfers: RwLock<TransferRecords>,
    /// Earnings counters and the per-proof log.
    earnings: RwLock<EarningRecords>,
    /// Most recent latency samples in milliseconds, oldest first.
    latency_samples: RwLock<Vec<f64>>,
    /// Limits applied to the in-memory logs.
    config: AnalyticsConfig,
}
/// Mutable transfer state guarded by the collector's `transfers` lock.
#[derive(Default, Debug)]
struct TransferRecords {
    /// Bytes uploaded by successful transfers.
    total_uploaded: u64,
    /// Bytes downloaded by successful transfers.
    total_downloaded: u64,
    /// `(recorded_at, bytes)` for recent uploads, in insertion order.
    recent_uploads: Vec<(Instant, u64)>,
    /// `(recorded_at, bytes)` for recent downloads, in insertion order.
    recent_downloads: Vec<(Instant, u64)>,
    /// Log of individual transfers, capped by `AnalyticsConfig::max_transfer_records`.
    history: Vec<TransferRecord>,
}
/// One entry in the transfer log.
#[derive(Clone, Debug)]
struct TransferRecord {
    /// Time the transfer was recorded, in seconds since the Unix epoch.
    timestamp: u64,
    /// Bytes uploaded by this transfer (0 for downloads).
    uploaded: u64,
    /// Bytes downloaded by this transfer (0 for uploads).
    downloaded: u64,
    /// Whether the transfer completed successfully.
    success: bool,
}
/// Mutable earnings state guarded by the collector's `earnings` lock.
// `dead_code` is allowed because not every field is read yet
// (`daily_history` is never touched in this file).
#[derive(Default, Debug)]
#[allow(dead_code)]
struct EarningRecords {
    /// Sum of every recorded amount.
    total_earned: u64,
    /// `(unix_seconds, amount)` for each recorded earning, in insertion order.
    proofs: Vec<(u64, u64)>,
    /// Amount earned per content identifier.
    by_content: HashMap<String, u64>,
    /// Reserved for per-day aggregates; currently unused.
    daily_history: Vec<(u64, u64)>,
}
/// Limits applied to the collector's in-memory logs.
#[derive(Clone, Debug)]
pub struct AnalyticsConfig {
    /// Maximum number of latency samples kept; the oldest are dropped first.
    pub max_latency_samples: usize,
    /// Maximum number of entries kept in the transfer log; the oldest are dropped first.
    pub max_transfer_records: usize,
    /// Number of days of history to retain.
    pub history_retention_days: u64,
}

impl Default for AnalyticsConfig {
    /// Returns the stock limits: 1 000 latency samples, 10 000 transfer
    /// records and 30 days of history.
    fn default() -> Self {
        AnalyticsConfig {
            history_retention_days: 30,
            max_transfer_records: 10_000,
            max_latency_samples: 1_000,
        }
    }
}
impl AnalyticsCollector {
#[inline]
#[must_use]
pub fn new(storage: Arc<RwLock<ChunkStorage>>, config: AnalyticsConfig) -> Self {
Self {
storage,
start_time: Instant::now(),
transfers: RwLock::new(TransferRecords::default()),
earnings: RwLock::new(EarningRecords::default()),
latency_samples: RwLock::new(Vec::new()),
config,
}
}
#[inline]
pub fn record_upload(&self, bytes: u64, success: bool) {
let mut transfers = self.transfers.write().unwrap();
if success {
transfers.total_uploaded += bytes;
}
transfers.recent_uploads.push((Instant::now(), bytes));
let cutoff = Instant::now() - Duration::from_secs(86400);
transfers.recent_uploads.retain(|(t, _)| *t > cutoff);
transfers.history.push(TransferRecord {
timestamp: current_timestamp(),
uploaded: bytes,
downloaded: 0,
success,
});
if transfers.history.len() > self.config.max_transfer_records {
transfers.history.remove(0);
}
}
#[inline]
pub fn record_download(&self, bytes: u64, success: bool) {
let mut transfers = self.transfers.write().unwrap();
if success {
transfers.total_downloaded += bytes;
}
transfers.recent_downloads.push((Instant::now(), bytes));
let cutoff = Instant::now() - Duration::from_secs(86400);
transfers.recent_downloads.retain(|(t, _)| *t > cutoff);
transfers.history.push(TransferRecord {
timestamp: current_timestamp(),
uploaded: 0,
downloaded: bytes,
success,
});
if transfers.history.len() > self.config.max_transfer_records {
transfers.history.remove(0);
}
}
#[inline]
pub fn record_earning(&self, amount: u64, content_cid: Option<&str>) {
let mut earnings = self.earnings.write().unwrap();
earnings.total_earned += amount;
earnings.proofs.push((current_timestamp(), amount));
if let Some(cid) = content_cid {
*earnings.by_content.entry(cid.to_string()).or_insert(0) += amount;
}
}
#[inline]
pub fn record_latency(&self, latency_ms: f64) {
let mut samples = self.latency_samples.write().unwrap();
samples.push(latency_ms);
if samples.len() > self.config.max_latency_samples {
samples.remove(0);
}
}
#[must_use]
pub fn storage_analytics(&self) -> StorageAnalytics {
let storage = self.storage.read().unwrap();
let stats = storage.stats();
let used = stats.used_bytes;
let total = stats.max_bytes;
let free = stats.available_bytes;
StorageAnalytics {
total_capacity: total,
used_storage: used,
free_storage: free,
utilization_percent: stats.usage_percent,
chunk_count: 0, content_count: stats.pinned_content_count as u64,
avg_chunk_size: 0,
largest_content: 0,
}
}
#[must_use]
pub fn transfer_analytics(&self) -> TransferAnalytics {
let transfers = self.transfers.read().unwrap();
let now = Instant::now();
let day_ago = now - Duration::from_secs(86400);
let week_ago = now - Duration::from_secs(7 * 86400);
let month_ago = now - Duration::from_secs(30 * 86400);
let uploaded_today: u64 = transfers
.recent_uploads
.iter()
.filter(|(t, _)| *t > day_ago)
.map(|(_, b)| b)
.sum();
let downloaded_today: u64 = transfers
.recent_downloads
.iter()
.filter(|(t, _)| *t > day_ago)
.map(|(_, b)| b)
.sum();
let minute_ago = now - Duration::from_secs(60);
let upload_rate: f64 = transfers
.recent_uploads
.iter()
.filter(|(t, _)| *t > minute_ago)
.map(|(_, b)| *b as f64)
.sum::<f64>()
/ 60.0;
let download_rate: f64 = transfers
.recent_downloads
.iter()
.filter(|(t, _)| *t > minute_ago)
.map(|(_, b)| *b as f64)
.sum::<f64>()
/ 60.0;
TransferAnalytics {
total_uploaded: transfers.total_uploaded,
total_downloaded: transfers.total_downloaded,
uploaded_today,
downloaded_today,
uploaded_week: calculate_period_sum(&transfers.recent_uploads, week_ago),
downloaded_week: calculate_period_sum(&transfers.recent_downloads, week_ago),
uploaded_month: calculate_period_sum(&transfers.recent_uploads, month_ago),
downloaded_month: calculate_period_sum(&transfers.recent_downloads, month_ago),
upload_rate,
download_rate,
peak_upload_rate: 0.0, peak_download_rate: 0.0,
transfers_today: transfers
.history
.iter()
.filter(|r| r.success && is_today(r.timestamp))
.count() as u64,
failed_transfers_today: transfers
.history
.iter()
.filter(|r| !r.success && is_today(r.timestamp))
.count() as u64,
}
}
#[must_use]
pub fn earning_analytics(&self) -> EarningAnalytics {
let earnings = self.earnings.read().unwrap();
let now = current_timestamp();
let day_start = now - (now % 86400);
let week_start = now - 7 * 86400;
let month_start = now - 30 * 86400;
let earned_today: u64 = earnings
.proofs
.iter()
.filter(|(t, _)| *t >= day_start)
.map(|(_, a)| a)
.sum();
let earned_week: u64 = earnings
.proofs
.iter()
.filter(|(t, _)| *t >= week_start)
.map(|(_, a)| a)
.sum();
let earned_month: u64 = earnings
.proofs
.iter()
.filter(|(t, _)| *t >= month_start)
.map(|(_, a)| a)
.sum();
let proofs_today = earnings
.proofs
.iter()
.filter(|(t, _)| *t >= day_start)
.count() as u64;
let avg_reward = if proofs_today > 0 {
earned_today as f64 / proofs_today as f64
} else {
0.0
};
let mut top_earners: Vec<_> = earnings
.by_content
.iter()
.map(|(cid, total)| ContentEarning {
cid: cid.clone(),
title: None,
total_earned: *total,
transfer_count: 0,
})
.collect();
top_earners.sort_by(|a, b| b.total_earned.cmp(&a.total_earned));
top_earners.truncate(10);
EarningAnalytics {
total_earned: earnings.total_earned,
earned_today,
earned_week,
earned_month,
daily_rate: if self.start_time.elapsed().as_secs() > 0 {
earned_month as f64 / 30.0
} else {
0.0
},
proofs_today,
successful_proofs_today: proofs_today,
avg_reward_per_proof: avg_reward,
top_earners,
}
}
#[must_use]
pub fn performance_analytics(&self) -> PerformanceAnalytics {
let samples = self.latency_samples.read().unwrap();
let (avg, p50, p95, p99) = if !samples.is_empty() {
let mut sorted: Vec<f64> = samples.clone();
sorted.sort_by(|a, b| a.partial_cmp(b).unwrap());
let avg = sorted.iter().sum::<f64>() / sorted.len() as f64;
let p50 = sorted[sorted.len() / 2];
let p95 = sorted[(sorted.len() * 95) / 100];
let p99 = sorted[(sorted.len() * 99) / 100];
(avg, p50, p95, p99)
} else {
(0.0, 0.0, 0.0, 0.0)
};
PerformanceAnalytics {
uptime_secs: self.start_time.elapsed().as_secs(),
uptime_percent_7d: 100.0, avg_latency_ms: avg,
p50_latency_ms: p50,
p95_latency_ms: p95,
p99_latency_ms: p99,
connected_peers: 0, active_transfers: 0,
cpu_usage_percent: 0.0,
memory_usage: 0,
disk_io_rate: 0,
}
}
#[must_use]
pub fn dashboard_analytics(&self) -> DashboardAnalytics {
DashboardAnalytics {
storage: self.storage_analytics(),
transfer: self.transfer_analytics(),
earning: self.earning_analytics(),
content: ContentAnalytics::default(), performance: self.performance_analytics(),
last_updated: current_timestamp(),
}
}
#[must_use]
pub fn historical_data(&self) -> HistoricalData {
let transfers = self.transfers.read().unwrap();
let earnings = self.earnings.read().unwrap();
let now = current_timestamp();
let mut upload_hourly = Vec::new();
let mut download_hourly = Vec::new();
for h in 0..24 {
let hour_start = now - (now % 3600) - (h * 3600);
let hour_end = hour_start + 3600;
let upload: u64 = transfers
.history
.iter()
.filter(|r| r.timestamp >= hour_start && r.timestamp < hour_end)
.map(|r| r.uploaded)
.sum();
let download: u64 = transfers
.history
.iter()
.filter(|r| r.timestamp >= hour_start && r.timestamp < hour_end)
.map(|r| r.downloaded)
.sum();
upload_hourly.push(TimeSeriesPoint {
timestamp: hour_start * 1000,
value: upload as f64,
});
download_hourly.push(TimeSeriesPoint {
timestamp: hour_start * 1000,
value: download as f64,
});
}
let mut earnings_daily = Vec::new();
for d in 0..30 {
let day_start = now - (now % 86400) - (d * 86400);
let day_end = day_start + 86400;
let earned: u64 = earnings
.proofs
.iter()
.filter(|(t, _)| *t >= day_start && *t < day_end)
.map(|(_, a)| a)
.sum();
earnings_daily.push(TimeSeriesPoint {
timestamp: day_start * 1000,
value: earned as f64,
});
}
HistoricalData {
upload_hourly,
download_hourly,
earnings_daily,
storage_daily: Vec::new(), transfers_daily: Vec::new(),
}
}
}
/// Adds up the byte counts of every record stamped strictly after `since`.
///
/// Returns 0 for an empty slice or when no record is newer than `since`.
#[inline]
fn calculate_period_sum(records: &[(Instant, u64)], since: Instant) -> u64 {
    let mut total: u64 = 0;
    for &(recorded_at, bytes) in records {
        if recorded_at > since {
            total += bytes;
        }
    }
    total
}
/// Reports whether `timestamp` (seconds since the Unix epoch) falls at or
/// after the most recent UTC midnight, judged against the current system time.
#[inline]
fn is_today(timestamp: u64) -> bool {
    let now = current_timestamp();
    let seconds_into_day = now % 86400;
    let midnight = now - seconds_into_day;
    midnight <= timestamp
}
/// Returns the current system time as whole seconds since the Unix epoch.
///
/// Falls back to 0 when the system clock reads earlier than the epoch.
#[inline]
fn current_timestamp() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::{tempdir, TempDir};

    /// Builds a collector backed by a fresh 100 MiB chunk store in a
    /// temporary directory.
    ///
    /// The `TempDir` is returned alongside the collector so the directory
    /// outlives the test body; dropping it early would delete the store.
    async fn fixture() -> (TempDir, AnalyticsCollector) {
        let dir = tempdir().unwrap();
        let storage = ChunkStorage::new(dir.path().to_path_buf(), 1024 * 1024 * 100)
            .await
            .unwrap();
        let collector =
            AnalyticsCollector::new(Arc::new(RwLock::new(storage)), AnalyticsConfig::default());
        (dir, collector)
    }

    /// A fresh store reports a non-zero capacity.
    #[tokio::test]
    async fn test_storage_analytics() {
        let (_dir, collector) = fixture().await;

        let report = collector.storage_analytics();

        assert!(report.total_capacity > 0);
    }

    /// Successful transfers are reflected in the lifetime totals.
    #[tokio::test]
    async fn test_transfer_recording() {
        let (_dir, collector) = fixture().await;

        collector.record_upload(1024, true);
        collector.record_download(2048, true);

        let report = collector.transfer_analytics();
        assert_eq!(report.total_uploaded, 1024);
        assert_eq!(report.total_downloaded, 2048);
    }

    /// Earnings across several content items add up in the lifetime total.
    #[tokio::test]
    async fn test_earning_recording() {
        let (_dir, collector) = fixture().await;

        for (amount, cid) in [(100, "QmTest1"), (200, "QmTest1"), (50, "QmTest2")] {
            collector.record_earning(amount, Some(cid));
        }

        let report = collector.earning_analytics();
        assert_eq!(report.total_earned, 350);
    }

    /// Percentiles are positive and ordered for samples 1..=100.
    #[tokio::test]
    async fn test_latency_percentiles() {
        let (_dir, collector) = fixture().await;

        (1..=100).for_each(|ms| collector.record_latency(ms as f64));

        let report = collector.performance_analytics();
        assert!(report.avg_latency_ms > 0.0);
        assert!(report.p50_latency_ms > 0.0);
        assert!(report.p95_latency_ms > report.p50_latency_ms);
    }
}