1use crate::storage::ChunkStorage;
42use serde::{Deserialize, Serialize};
43use std::collections::HashMap;
44use std::sync::{Arc, RwLock};
45use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
46
47#[derive(Debug, Clone, Serialize, Deserialize, Default)]
49pub struct StorageAnalytics {
50 pub total_capacity: u64,
52 pub used_storage: u64,
54 pub free_storage: u64,
56 pub utilization_percent: f64,
58 pub chunk_count: u64,
60 pub content_count: u64,
62 pub avg_chunk_size: u64,
64 pub largest_content: u64,
66}
67
68#[derive(Debug, Clone, Serialize, Deserialize, Default)]
70pub struct TransferAnalytics {
71 pub total_uploaded: u64,
73 pub total_downloaded: u64,
75 pub uploaded_today: u64,
77 pub downloaded_today: u64,
79 pub uploaded_week: u64,
81 pub downloaded_week: u64,
83 pub uploaded_month: u64,
85 pub downloaded_month: u64,
87 pub upload_rate: f64,
89 pub download_rate: f64,
91 pub peak_upload_rate: f64,
93 pub peak_download_rate: f64,
95 pub transfers_today: u64,
97 pub failed_transfers_today: u64,
99}
100
101#[derive(Debug, Clone, Serialize, Deserialize, Default)]
103pub struct EarningAnalytics {
104 pub total_earned: u64,
106 pub earned_today: u64,
108 pub earned_week: u64,
110 pub earned_month: u64,
112 pub daily_rate: f64,
114 pub proofs_today: u64,
116 pub successful_proofs_today: u64,
118 pub avg_reward_per_proof: f64,
120 pub top_earners: Vec<ContentEarning>,
122}
123
124#[derive(Debug, Clone, Serialize, Deserialize)]
126pub struct ContentEarning {
127 pub cid: String,
129 pub title: Option<String>,
131 pub total_earned: u64,
133 pub transfer_count: u64,
135}
136
137#[derive(Debug, Clone, Serialize, Deserialize, Default)]
139pub struct ContentAnalytics {
140 pub pinned_count: u64,
142 pub cached_count: u64,
144 pub most_accessed: Vec<ContentAccess>,
146 pub recent_access: Vec<ContentAccess>,
148 pub by_category: HashMap<String, u64>,
150}
151
152#[derive(Debug, Clone, Serialize, Deserialize)]
154pub struct ContentAccess {
155 pub cid: String,
157 pub title: Option<String>,
159 pub access_count: u64,
161 pub last_accessed: u64,
163}
164
165#[derive(Debug, Clone, Serialize, Deserialize, Default)]
167pub struct PerformanceAnalytics {
168 pub uptime_secs: u64,
170 pub uptime_percent_7d: f64,
172 pub avg_latency_ms: f64,
174 pub p50_latency_ms: f64,
176 pub p95_latency_ms: f64,
178 pub p99_latency_ms: f64,
180 pub connected_peers: u64,
182 pub active_transfers: u64,
184 pub cpu_usage_percent: f64,
186 pub memory_usage: u64,
188 pub disk_io_rate: u64,
190}
191
192#[derive(Debug, Clone, Serialize, Deserialize, Default)]
194pub struct DashboardAnalytics {
195 pub storage: StorageAnalytics,
197 pub transfer: TransferAnalytics,
199 pub earning: EarningAnalytics,
201 pub content: ContentAnalytics,
203 pub performance: PerformanceAnalytics,
205 pub last_updated: u64,
207}
208
209#[derive(Debug, Clone, Serialize, Deserialize)]
211pub struct TimeSeriesPoint {
212 pub timestamp: u64,
214 pub value: f64,
216}
217
218#[derive(Debug, Clone, Serialize, Deserialize, Default)]
220pub struct HistoricalData {
221 pub upload_hourly: Vec<TimeSeriesPoint>,
223 pub download_hourly: Vec<TimeSeriesPoint>,
225 pub earnings_daily: Vec<TimeSeriesPoint>,
227 pub storage_daily: Vec<TimeSeriesPoint>,
229 pub transfers_daily: Vec<TimeSeriesPoint>,
231}
232
233pub struct AnalyticsCollector {
235 storage: Arc<RwLock<ChunkStorage>>,
237 start_time: Instant,
239 transfers: RwLock<TransferRecords>,
241 earnings: RwLock<EarningRecords>,
243 latency_samples: RwLock<Vec<f64>>,
245 config: AnalyticsConfig,
247}
248
249#[derive(Debug, Default)]
251struct TransferRecords {
252 total_uploaded: u64,
254 total_downloaded: u64,
256 recent_uploads: Vec<(Instant, u64)>,
258 recent_downloads: Vec<(Instant, u64)>,
260 history: Vec<TransferRecord>,
262}
263
/// One recorded transfer attempt.
///
/// The collector writes either `uploaded` or `downloaded` and leaves the
/// other at zero, depending on the direction of the transfer.
#[derive(Clone, Debug)]
struct TransferRecord {
    /// Wall-clock time of the attempt, in seconds since the UNIX epoch.
    timestamp: u64,
    /// Bytes uploaded by this attempt.
    uploaded: u64,
    /// Bytes downloaded by this attempt.
    downloaded: u64,
    /// Whether the caller reported the transfer as successful.
    success: bool,
}
272
/// Mutable earning state kept by the collector.
#[derive(Default, Debug)]
// `daily_history` is declared but never read or written in this file.
#[allow(dead_code)]
struct EarningRecords {
    /// Lifetime sum of all recorded earnings.
    total_earned: u64,
    /// `(unix_seconds, amount)` for every recorded earning, oldest first.
    proofs: Vec<(u64, u64)>,
    /// Lifetime earnings keyed by content identifier.
    by_content: HashMap<String, u64>,
    /// Reserved pair list for daily aggregates; unused in this file.
    daily_history: Vec<(u64, u64)>,
}
286
/// Retention limits for the analytics collector.
#[derive(Clone, Debug)]
pub struct AnalyticsConfig {
    /// Maximum number of latency samples kept; the oldest is evicted first.
    pub max_latency_samples: usize,
    /// Maximum number of per-transfer history records kept; the oldest is
    /// evicted first.
    pub max_transfer_records: usize,
    /// Number of days of history to retain.
    // NOTE(review): no code in this file reads this field - confirm where
    // it is meant to be applied.
    pub history_retention_days: u64,
}
297
298impl Default for AnalyticsConfig {
299 fn default() -> Self {
300 Self {
301 max_latency_samples: 1000,
302 max_transfer_records: 10000,
303 history_retention_days: 30,
304 }
305 }
306}
307
308impl AnalyticsCollector {
309 #[inline]
311 #[must_use]
312 pub fn new(storage: Arc<RwLock<ChunkStorage>>, config: AnalyticsConfig) -> Self {
313 Self {
314 storage,
315 start_time: Instant::now(),
316 transfers: RwLock::new(TransferRecords::default()),
317 earnings: RwLock::new(EarningRecords::default()),
318 latency_samples: RwLock::new(Vec::new()),
319 config,
320 }
321 }
322
323 #[inline]
325 pub fn record_upload(&self, bytes: u64, success: bool) {
326 let mut transfers = self.transfers.write().unwrap();
327 if success {
328 transfers.total_uploaded += bytes;
329 }
330 transfers.recent_uploads.push((Instant::now(), bytes));
331
332 let cutoff = Instant::now() - Duration::from_secs(86400);
334 transfers.recent_uploads.retain(|(t, _)| *t > cutoff);
335
336 transfers.history.push(TransferRecord {
338 timestamp: current_timestamp(),
339 uploaded: bytes,
340 downloaded: 0,
341 success,
342 });
343
344 if transfers.history.len() > self.config.max_transfer_records {
346 transfers.history.remove(0);
347 }
348 }
349
350 #[inline]
352 pub fn record_download(&self, bytes: u64, success: bool) {
353 let mut transfers = self.transfers.write().unwrap();
354 if success {
355 transfers.total_downloaded += bytes;
356 }
357 transfers.recent_downloads.push((Instant::now(), bytes));
358
359 let cutoff = Instant::now() - Duration::from_secs(86400);
361 transfers.recent_downloads.retain(|(t, _)| *t > cutoff);
362
363 transfers.history.push(TransferRecord {
365 timestamp: current_timestamp(),
366 uploaded: 0,
367 downloaded: bytes,
368 success,
369 });
370
371 if transfers.history.len() > self.config.max_transfer_records {
373 transfers.history.remove(0);
374 }
375 }
376
377 #[inline]
379 pub fn record_earning(&self, amount: u64, content_cid: Option<&str>) {
380 let mut earnings = self.earnings.write().unwrap();
381 earnings.total_earned += amount;
382 earnings.proofs.push((current_timestamp(), amount));
383
384 if let Some(cid) = content_cid {
385 *earnings.by_content.entry(cid.to_string()).or_insert(0) += amount;
386 }
387 }
388
389 #[inline]
391 pub fn record_latency(&self, latency_ms: f64) {
392 let mut samples = self.latency_samples.write().unwrap();
393 samples.push(latency_ms);
394
395 if samples.len() > self.config.max_latency_samples {
397 samples.remove(0);
398 }
399 }
400
401 #[must_use]
403 pub fn storage_analytics(&self) -> StorageAnalytics {
404 let storage = self.storage.read().unwrap();
405 let stats = storage.stats();
406
407 let used = stats.used_bytes;
408 let total = stats.max_bytes;
409 let free = stats.available_bytes;
410
411 StorageAnalytics {
412 total_capacity: total,
413 used_storage: used,
414 free_storage: free,
415 utilization_percent: stats.usage_percent,
416 chunk_count: 0, content_count: stats.pinned_content_count as u64,
418 avg_chunk_size: 0,
419 largest_content: 0,
420 }
421 }
422
423 #[must_use]
425 pub fn transfer_analytics(&self) -> TransferAnalytics {
426 let transfers = self.transfers.read().unwrap();
427 let now = Instant::now();
428 let day_ago = now - Duration::from_secs(86400);
429 let week_ago = now - Duration::from_secs(7 * 86400);
430 let month_ago = now - Duration::from_secs(30 * 86400);
431
432 let uploaded_today: u64 = transfers
433 .recent_uploads
434 .iter()
435 .filter(|(t, _)| *t > day_ago)
436 .map(|(_, b)| b)
437 .sum();
438
439 let downloaded_today: u64 = transfers
440 .recent_downloads
441 .iter()
442 .filter(|(t, _)| *t > day_ago)
443 .map(|(_, b)| b)
444 .sum();
445
446 let minute_ago = now - Duration::from_secs(60);
448 let upload_rate: f64 = transfers
449 .recent_uploads
450 .iter()
451 .filter(|(t, _)| *t > minute_ago)
452 .map(|(_, b)| *b as f64)
453 .sum::<f64>()
454 / 60.0;
455
456 let download_rate: f64 = transfers
457 .recent_downloads
458 .iter()
459 .filter(|(t, _)| *t > minute_ago)
460 .map(|(_, b)| *b as f64)
461 .sum::<f64>()
462 / 60.0;
463
464 TransferAnalytics {
465 total_uploaded: transfers.total_uploaded,
466 total_downloaded: transfers.total_downloaded,
467 uploaded_today,
468 downloaded_today,
469 uploaded_week: calculate_period_sum(&transfers.recent_uploads, week_ago),
470 downloaded_week: calculate_period_sum(&transfers.recent_downloads, week_ago),
471 uploaded_month: calculate_period_sum(&transfers.recent_uploads, month_ago),
472 downloaded_month: calculate_period_sum(&transfers.recent_downloads, month_ago),
473 upload_rate,
474 download_rate,
475 peak_upload_rate: 0.0, peak_download_rate: 0.0,
477 transfers_today: transfers
478 .history
479 .iter()
480 .filter(|r| r.success && is_today(r.timestamp))
481 .count() as u64,
482 failed_transfers_today: transfers
483 .history
484 .iter()
485 .filter(|r| !r.success && is_today(r.timestamp))
486 .count() as u64,
487 }
488 }
489
490 #[must_use]
492 pub fn earning_analytics(&self) -> EarningAnalytics {
493 let earnings = self.earnings.read().unwrap();
494 let now = current_timestamp();
495 let day_start = now - (now % 86400);
496 let week_start = now - 7 * 86400;
497 let month_start = now - 30 * 86400;
498
499 let earned_today: u64 = earnings
500 .proofs
501 .iter()
502 .filter(|(t, _)| *t >= day_start)
503 .map(|(_, a)| a)
504 .sum();
505
506 let earned_week: u64 = earnings
507 .proofs
508 .iter()
509 .filter(|(t, _)| *t >= week_start)
510 .map(|(_, a)| a)
511 .sum();
512
513 let earned_month: u64 = earnings
514 .proofs
515 .iter()
516 .filter(|(t, _)| *t >= month_start)
517 .map(|(_, a)| a)
518 .sum();
519
520 let proofs_today = earnings
521 .proofs
522 .iter()
523 .filter(|(t, _)| *t >= day_start)
524 .count() as u64;
525
526 let avg_reward = if proofs_today > 0 {
527 earned_today as f64 / proofs_today as f64
528 } else {
529 0.0
530 };
531
532 let mut top_earners: Vec<_> = earnings
534 .by_content
535 .iter()
536 .map(|(cid, total)| ContentEarning {
537 cid: cid.clone(),
538 title: None,
539 total_earned: *total,
540 transfer_count: 0,
541 })
542 .collect();
543 top_earners.sort_by(|a, b| b.total_earned.cmp(&a.total_earned));
544 top_earners.truncate(10);
545
546 EarningAnalytics {
547 total_earned: earnings.total_earned,
548 earned_today,
549 earned_week,
550 earned_month,
551 daily_rate: if self.start_time.elapsed().as_secs() > 0 {
552 earned_month as f64 / 30.0
553 } else {
554 0.0
555 },
556 proofs_today,
557 successful_proofs_today: proofs_today,
558 avg_reward_per_proof: avg_reward,
559 top_earners,
560 }
561 }
562
563 #[must_use]
565 pub fn performance_analytics(&self) -> PerformanceAnalytics {
566 let samples = self.latency_samples.read().unwrap();
567
568 let (avg, p50, p95, p99) = if !samples.is_empty() {
569 let mut sorted: Vec<f64> = samples.clone();
570 sorted.sort_by(|a, b| a.partial_cmp(b).unwrap());
571
572 let avg = sorted.iter().sum::<f64>() / sorted.len() as f64;
573 let p50 = sorted[sorted.len() / 2];
574 let p95 = sorted[(sorted.len() * 95) / 100];
575 let p99 = sorted[(sorted.len() * 99) / 100];
576
577 (avg, p50, p95, p99)
578 } else {
579 (0.0, 0.0, 0.0, 0.0)
580 };
581
582 PerformanceAnalytics {
583 uptime_secs: self.start_time.elapsed().as_secs(),
584 uptime_percent_7d: 100.0, avg_latency_ms: avg,
586 p50_latency_ms: p50,
587 p95_latency_ms: p95,
588 p99_latency_ms: p99,
589 connected_peers: 0, active_transfers: 0,
591 cpu_usage_percent: 0.0,
592 memory_usage: 0,
593 disk_io_rate: 0,
594 }
595 }
596
597 #[must_use]
599 pub fn dashboard_analytics(&self) -> DashboardAnalytics {
600 DashboardAnalytics {
601 storage: self.storage_analytics(),
602 transfer: self.transfer_analytics(),
603 earning: self.earning_analytics(),
604 content: ContentAnalytics::default(), performance: self.performance_analytics(),
606 last_updated: current_timestamp(),
607 }
608 }
609
610 #[must_use]
612 pub fn historical_data(&self) -> HistoricalData {
613 let transfers = self.transfers.read().unwrap();
614 let earnings = self.earnings.read().unwrap();
615
616 let now = current_timestamp();
618 let mut upload_hourly = Vec::new();
619 let mut download_hourly = Vec::new();
620
621 for h in 0..24 {
622 let hour_start = now - (now % 3600) - (h * 3600);
623 let hour_end = hour_start + 3600;
624
625 let upload: u64 = transfers
626 .history
627 .iter()
628 .filter(|r| r.timestamp >= hour_start && r.timestamp < hour_end)
629 .map(|r| r.uploaded)
630 .sum();
631
632 let download: u64 = transfers
633 .history
634 .iter()
635 .filter(|r| r.timestamp >= hour_start && r.timestamp < hour_end)
636 .map(|r| r.downloaded)
637 .sum();
638
639 upload_hourly.push(TimeSeriesPoint {
640 timestamp: hour_start * 1000,
641 value: upload as f64,
642 });
643 download_hourly.push(TimeSeriesPoint {
644 timestamp: hour_start * 1000,
645 value: download as f64,
646 });
647 }
648
649 let mut earnings_daily = Vec::new();
651 for d in 0..30 {
652 let day_start = now - (now % 86400) - (d * 86400);
653 let day_end = day_start + 86400;
654
655 let earned: u64 = earnings
656 .proofs
657 .iter()
658 .filter(|(t, _)| *t >= day_start && *t < day_end)
659 .map(|(_, a)| a)
660 .sum();
661
662 earnings_daily.push(TimeSeriesPoint {
663 timestamp: day_start * 1000,
664 value: earned as f64,
665 });
666 }
667
668 HistoricalData {
669 upload_hourly,
670 download_hourly,
671 earnings_daily,
672 storage_daily: Vec::new(), transfers_daily: Vec::new(),
674 }
675 }
676}
677
/// Adds up the byte counts of every record stamped strictly after `since`.
///
/// Returns 0 for an empty slice or when no record is newer than `since`.
#[inline]
fn calculate_period_sum(records: &[(Instant, u64)], since: Instant) -> u64 {
    let mut total = 0u64;
    for &(recorded_at, bytes) in records {
        if recorded_at > since {
            total += bytes;
        }
    }
    total
}
687
688#[inline]
690fn is_today(timestamp: u64) -> bool {
691 let now = current_timestamp();
692 let day_start = now - (now % 86400);
693 timestamp >= day_start
694}
695
/// Returns the wall-clock time in whole seconds since the UNIX epoch.
///
/// Falls back to 0 when the system clock reads earlier than the epoch.
#[inline]
fn current_timestamp() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    }
}
704
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::{tempdir, TempDir};

    /// Builds a collector over a fresh 100 MiB store in a temporary directory.
    ///
    /// The directory guard is returned so the backing files outlive the test
    /// body.
    async fn new_collector() -> (TempDir, AnalyticsCollector) {
        let dir = tempdir().unwrap();
        let storage = ChunkStorage::new(dir.path().to_path_buf(), 1024 * 1024 * 100)
            .await
            .unwrap();
        let collector =
            AnalyticsCollector::new(Arc::new(RwLock::new(storage)), AnalyticsConfig::default());
        (dir, collector)
    }

    #[tokio::test]
    async fn test_storage_analytics() {
        let (_dir, collector) = new_collector().await;

        let report = collector.storage_analytics();
        assert!(report.total_capacity > 0);
    }

    #[tokio::test]
    async fn test_transfer_recording() {
        let (_dir, collector) = new_collector().await;

        collector.record_upload(1024, true);
        collector.record_download(2048, true);

        let report = collector.transfer_analytics();
        assert_eq!(report.total_uploaded, 1024);
        assert_eq!(report.total_downloaded, 2048);
    }

    #[tokio::test]
    async fn test_earning_recording() {
        let (_dir, collector) = new_collector().await;

        for (amount, cid) in [(100, "QmTest1"), (200, "QmTest1"), (50, "QmTest2")] {
            collector.record_earning(amount, Some(cid));
        }

        let report = collector.earning_analytics();
        assert_eq!(report.total_earned, 350);
    }

    #[tokio::test]
    async fn test_latency_percentiles() {
        let (_dir, collector) = new_collector().await;

        (1..=100).for_each(|ms| collector.record_latency(ms as f64));

        let report = collector.performance_analytics();
        assert!(report.avg_latency_ms > 0.0);
        assert!(report.p50_latency_ms > 0.0);
        assert!(report.p95_latency_ms > report.p50_latency_ms);
    }
}