1use crate::{ConcurrentPeerManager, ConcurrentWantList, Session};
7use std::collections::HashMap;
8use std::fmt;
9use std::time::Duration;
10
11#[derive(Debug, Clone)]
13pub struct DiagnosticReport {
14 pub timestamp: std::time::SystemTime,
16 pub health_status: HealthStatus,
18 pub want_list: WantListDiagnostics,
20 pub peer_manager: PeerManagerDiagnostics,
22 pub sessions: Vec<SessionDiagnostics>,
24 pub issues: Vec<DiagnosticIssue>,
26 pub recommendations: Vec<String>,
28}
29
/// Overall health classification attached to a diagnostic report.
///
/// The engine derives it from the most severe detected issue: no issue above
/// `Info` maps to `Healthy`, `Warning` to `Degraded`, `Error` to `Warning`
/// and `Critical` to `Critical`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum HealthStatus {
    /// No issue more severe than informational was found.
    Healthy,
    /// The most severe issue found was a warning.
    Degraded,
    /// The most severe issue found was an error.
    Warning,
    /// At least one critical issue was found.
    Critical,
}
42
/// Aggregated statistics describing the current contents of the want list.
#[derive(Clone, Debug)]
pub struct WantListDiagnostics {
    /// Number of wants included in the snapshot.
    pub total_wants: usize,
    /// Number of wants per priority label
    /// (`"Critical"`, `"Urgent"`, `"High"`, `"Normal"` or `"Low"`).
    pub priority_distribution: HashMap<String, usize>,
    /// Number of wants whose deadline had already passed.
    pub expired_wants: usize,
    /// Number of wants that have been retried at least once.
    pub retry_pending: usize,
    /// Mean age of the wants in the snapshot; zero when there are none.
    pub avg_queue_time: Duration,
    /// Age of the oldest want, or `None` when the want list is empty.
    pub oldest_want_age: Option<Duration>,
}
59
/// Aggregated statistics describing the peers known to the peer manager.
#[derive(Clone, Debug)]
pub struct PeerManagerDiagnostics {
    /// Number of peers known to the peer manager.
    pub total_peers: usize,
    /// Number of peers the peer manager reports as connected.
    pub active_peers: usize,
    /// Number of peers that are currently blacklisted.
    pub blacklisted_peers: usize,
    /// Mean peer score as reported by the peer manager.
    pub avg_peer_score: f64,
    /// Number of peers with an open circuit breaker.
    ///
    /// NOTE(review): the engine in this file always fills this with `0`.
    pub circuit_breaker_open: usize,
    /// Mean peer latency, when available.
    pub avg_latency: Option<Duration>,
    /// Aggregate bandwidth across peers.
    ///
    /// NOTE(review): the engine in this file always fills this with `0`;
    /// the intended unit is not visible here.
    pub total_bandwidth: u64,
}
78
/// Progress and performance figures for a single session.
#[derive(Clone, Debug)]
pub struct SessionDiagnostics {
    /// Identifier of the session these figures belong to.
    pub session_id: u64,
    /// `Debug` rendering of the session's state.
    pub state: String,
    /// Total number of blocks the session is fetching.
    pub blocks_requested: usize,
    /// Number of blocks received so far.
    pub blocks_received: usize,
    /// Received blocks as a percentage of requested blocks
    /// (`0.0` when nothing was requested).
    pub progress_percent: f64,
    /// Time between the session start and its completion (or "now" when it is
    /// still running); zero when the session has not started.
    pub elapsed_time: Duration,
    /// Projected time until completion, when it can be estimated.
    pub estimated_remaining: Option<Duration>,
    /// Transfer rate in bytes per second.
    pub throughput: u64,
}
99
100#[derive(Debug, Clone)]
102pub struct DiagnosticIssue {
103 pub severity: IssueSeverity,
105 pub category: IssueCategory,
107 pub description: String,
109 pub details: Option<String>,
111}
112
/// Severity of a diagnostic issue.
///
/// Variants are declared from least to most severe; the derived ordering
/// relies on that declaration order, so it must not be rearranged.
#[derive(Clone, Copy, Debug, Eq, Ord, PartialEq, PartialOrd)]
pub enum IssueSeverity {
    /// Informational only.
    Info,
    /// Something is suboptimal but still working.
    Warning,
    /// Something is failing.
    Error,
    /// Something is failing badly enough to need immediate attention.
    Critical,
}
125
/// Subsystem a diagnostic issue belongs to.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum IssueCategory {
    /// Problems with the want list (queueing, expiry).
    WantList,
    /// Problems with peer availability or quality.
    PeerManagement,
    /// Problems with an individual session.
    Session,
    /// Throughput or retry related problems.
    Performance,
    /// Configuration problems.
    Configuration,
}
140
141pub struct DiagnosticEngine {
143 config: DiagnosticConfig,
145}
146
/// Thresholds that decide when a measurement is reported as an issue.
#[derive(Clone, Debug)]
pub struct DiagnosticConfig {
    /// Average want queue time above which a warning is raised.
    pub max_queue_time: Duration,
    /// Minimum number of active peers; fewer is reported as critical.
    pub min_active_peers: usize,
    /// Minimum acceptable average peer score; lower raises a warning.
    pub min_avg_score: f64,
    /// Maximum tolerated fraction (0.0 to 1.0) of expired wants; more is an error.
    pub max_expired_ratio: f64,
    /// Minimum acceptable progress rate.
    ///
    /// NOTE(review): no code visible in this file reads this value and its
    /// unit is not evident here; confirm before relying on it.
    pub min_progress_rate: f64,
}
161
162impl Default for DiagnosticConfig {
163 fn default() -> Self {
164 Self {
165 max_queue_time: Duration::from_secs(60),
166 min_active_peers: 3,
167 min_avg_score: 0.5,
168 max_expired_ratio: 0.1, min_progress_rate: 1.0, }
171 }
172}
173
174impl DiagnosticEngine {
175 pub fn new() -> Self {
177 Self {
178 config: DiagnosticConfig::default(),
179 }
180 }
181
182 pub fn with_config(config: DiagnosticConfig) -> Self {
184 Self { config }
185 }
186
187 pub fn generate_report(
189 &self,
190 want_list: &ConcurrentWantList,
191 peer_manager: &ConcurrentPeerManager,
192 sessions: &[&Session],
193 ) -> DiagnosticReport {
194 let want_diag = self.diagnose_want_list(want_list);
195 let peer_diag = self.diagnose_peer_manager(peer_manager);
196 let session_diags: Vec<_> = sessions.iter().map(|s| self.diagnose_session(s)).collect();
197
198 let mut issues = Vec::new();
199 issues.extend(self.detect_want_list_issues(&want_diag));
200 issues.extend(self.detect_peer_issues(&peer_diag));
201 issues.extend(self.detect_session_issues(&session_diags));
202
203 let health_status = self.determine_health_status(&issues);
204 let recommendations = self.generate_recommendations(&issues, &want_diag, &peer_diag);
205
206 DiagnosticReport {
207 timestamp: std::time::SystemTime::now(),
208 health_status,
209 want_list: want_diag,
210 peer_manager: peer_diag,
211 sessions: session_diags,
212 issues,
213 recommendations,
214 }
215 }
216
217 fn diagnose_want_list(&self, want_list: &ConcurrentWantList) -> WantListDiagnostics {
219 let cids = want_list.cids();
221 let all_wants = want_list.get_batch(&cids);
222 let total_wants = all_wants.len();
223
224 let mut priority_distribution = HashMap::new();
225 let mut expired_count = 0;
226 let mut retry_count = 0;
227 let mut total_age = Duration::ZERO;
228 let mut oldest_age = None;
229
230 let now = std::time::Instant::now();
231
232 for want in &all_wants {
233 let priority_label = match want.priority {
235 p if p >= 900 => "Critical",
236 p if p >= 700 => "Urgent",
237 p if p >= 500 => "High",
238 p if p >= 300 => "Normal",
239 _ => "Low",
240 }
241 .to_string();
242 *priority_distribution.entry(priority_label).or_insert(0) += 1;
243
244 if let Some(deadline) = want.deadline {
246 if now >= deadline {
247 expired_count += 1;
248 }
249 }
250
251 if want.retry_count > 0 {
253 retry_count += 1;
254 }
255
256 let age = now.duration_since(want.created_at);
258 total_age += age;
259 oldest_age = Some(oldest_age.map_or(age, |old: Duration| old.max(age)));
260 }
261
262 let avg_queue_time = if total_wants > 0 {
263 total_age / total_wants as u32
264 } else {
265 Duration::ZERO
266 };
267
268 WantListDiagnostics {
269 total_wants,
270 priority_distribution,
271 expired_wants: expired_count,
272 retry_pending: retry_count,
273 avg_queue_time,
274 oldest_want_age: oldest_age,
275 }
276 }
277
278 fn diagnose_peer_manager(
280 &self,
281 peer_manager: &ConcurrentPeerManager,
282 ) -> PeerManagerDiagnostics {
283 let stats = peer_manager.stats();
284
285 PeerManagerDiagnostics {
286 total_peers: stats.total_peers,
287 active_peers: stats.connected_peers,
288 blacklisted_peers: stats.blacklisted_peers,
289 avg_peer_score: stats.avg_score,
290 circuit_breaker_open: 0, avg_latency: Some(Duration::from_millis(stats.avg_latency_ms as u64)),
292 total_bandwidth: 0, }
294 }
295
296 fn diagnose_session(&self, session: &Session) -> SessionDiagnostics {
298 let stats = session.stats();
299 let state = format!("{:?}", session.state());
300
301 let progress_percent = if stats.total_blocks > 0 {
302 (stats.blocks_received as f64 / stats.total_blocks as f64) * 100.0
303 } else {
304 0.0
305 };
306
307 let elapsed_time = if let Some(start) = stats.started_at {
309 if let Some(end) = stats.completed_at {
310 end.duration_since(start)
311 } else {
312 std::time::Instant::now().duration_since(start)
313 }
314 } else {
315 Duration::ZERO
316 };
317
318 let estimated_remaining = if stats.bytes_transferred > 0
320 && stats.total_blocks > stats.blocks_received
321 {
322 let remaining_blocks = stats.total_blocks - stats.blocks_received;
323 let blocks_per_sec = stats.blocks_received as f64 / elapsed_time.as_secs_f64().max(1.0);
324 if blocks_per_sec > 0.0 {
325 Some(Duration::from_secs_f64(
326 remaining_blocks as f64 / blocks_per_sec,
327 ))
328 } else {
329 None
330 }
331 } else {
332 None
333 };
334
335 let throughput = if elapsed_time.as_secs() > 0 {
336 stats.bytes_transferred / elapsed_time.as_secs()
337 } else {
338 0
339 };
340
341 SessionDiagnostics {
342 session_id: session.id(),
343 state,
344 blocks_requested: stats.total_blocks,
345 blocks_received: stats.blocks_received,
346 progress_percent,
347 elapsed_time,
348 estimated_remaining,
349 throughput,
350 }
351 }
352
353 fn detect_want_list_issues(&self, diag: &WantListDiagnostics) -> Vec<DiagnosticIssue> {
355 let mut issues = Vec::new();
356
357 if diag.avg_queue_time > self.config.max_queue_time {
359 issues.push(DiagnosticIssue {
360 severity: IssueSeverity::Warning,
361 category: IssueCategory::WantList,
362 description: "High average queue time for wants".to_string(),
363 details: Some(format!(
364 "Average queue time is {:?}, exceeds threshold of {:?}",
365 diag.avg_queue_time, self.config.max_queue_time
366 )),
367 });
368 }
369
370 if diag.total_wants > 0 {
372 let expired_ratio = diag.expired_wants as f64 / diag.total_wants as f64;
373 if expired_ratio > self.config.max_expired_ratio {
374 issues.push(DiagnosticIssue {
375 severity: IssueSeverity::Error,
376 category: IssueCategory::WantList,
377 description: "High ratio of expired wants".to_string(),
378 details: Some(format!(
379 "{:.1}% of wants have expired (threshold: {:.1}%)",
380 expired_ratio * 100.0,
381 self.config.max_expired_ratio * 100.0
382 )),
383 });
384 }
385 }
386
387 if diag.retry_pending > diag.total_wants / 2 {
389 issues.push(DiagnosticIssue {
390 severity: IssueSeverity::Warning,
391 category: IssueCategory::Performance,
392 description: "High number of wants awaiting retry".to_string(),
393 details: Some(format!(
394 "{} out of {} wants are awaiting retry",
395 diag.retry_pending, diag.total_wants
396 )),
397 });
398 }
399
400 issues
401 }
402
403 fn detect_peer_issues(&self, diag: &PeerManagerDiagnostics) -> Vec<DiagnosticIssue> {
405 let mut issues = Vec::new();
406
407 if diag.active_peers < self.config.min_active_peers {
409 issues.push(DiagnosticIssue {
410 severity: IssueSeverity::Critical,
411 category: IssueCategory::PeerManagement,
412 description: "Insufficient active peers".to_string(),
413 details: Some(format!(
414 "Only {} active peers (minimum recommended: {})",
415 diag.active_peers, self.config.min_active_peers
416 )),
417 });
418 }
419
420 if diag.avg_peer_score < self.config.min_avg_score {
422 issues.push(DiagnosticIssue {
423 severity: IssueSeverity::Warning,
424 category: IssueCategory::PeerManagement,
425 description: "Low average peer score".to_string(),
426 details: Some(format!(
427 "Average peer score is {:.2} (threshold: {:.2})",
428 diag.avg_peer_score, self.config.min_avg_score
429 )),
430 });
431 }
432
433 if diag.total_peers > 0 {
435 let blacklist_ratio = diag.blacklisted_peers as f64 / diag.total_peers as f64;
436 if blacklist_ratio > 0.3 {
437 issues.push(DiagnosticIssue {
438 severity: IssueSeverity::Warning,
439 category: IssueCategory::PeerManagement,
440 description: "High percentage of blacklisted peers".to_string(),
441 details: Some(format!(
442 "{:.1}% of peers are blacklisted",
443 blacklist_ratio * 100.0
444 )),
445 });
446 }
447 }
448
449 issues
450 }
451
452 fn detect_session_issues(&self, sessions: &[SessionDiagnostics]) -> Vec<DiagnosticIssue> {
454 let mut issues = Vec::new();
455
456 for session in sessions {
457 if session.blocks_requested > 0
459 && session.progress_percent < 10.0
460 && session.elapsed_time > Duration::from_secs(30)
461 {
462 issues.push(DiagnosticIssue {
463 severity: IssueSeverity::Warning,
464 category: IssueCategory::Session,
465 description: format!("Session {} appears stalled", session.session_id),
466 details: Some(format!(
467 "Only {:.1}% progress after {:?}",
468 session.progress_percent, session.elapsed_time
469 )),
470 });
471 }
472
473 if session.elapsed_time > Duration::from_secs(10) && session.throughput < 10_000 {
475 issues.push(DiagnosticIssue {
477 severity: IssueSeverity::Warning,
478 category: IssueCategory::Performance,
479 description: format!("Low throughput for session {}", session.session_id),
480 details: Some(format!(
481 "Current throughput: {} bytes/sec",
482 session.throughput
483 )),
484 });
485 }
486 }
487
488 issues
489 }
490
491 fn determine_health_status(&self, issues: &[DiagnosticIssue]) -> HealthStatus {
493 let mut max_severity = IssueSeverity::Info;
494
495 for issue in issues {
496 if issue.severity > max_severity {
497 max_severity = issue.severity;
498 }
499 }
500
501 match max_severity {
502 IssueSeverity::Info => HealthStatus::Healthy,
503 IssueSeverity::Warning => HealthStatus::Degraded,
504 IssueSeverity::Error => HealthStatus::Warning,
505 IssueSeverity::Critical => HealthStatus::Critical,
506 }
507 }
508
509 fn generate_recommendations(
511 &self,
512 issues: &[DiagnosticIssue],
513 want_diag: &WantListDiagnostics,
514 peer_diag: &PeerManagerDiagnostics,
515 ) -> Vec<String> {
516 let mut recommendations = Vec::new();
517
518 for issue in issues {
520 match issue.category {
521 IssueCategory::PeerManagement
522 if peer_diag.active_peers < self.config.min_active_peers =>
523 {
524 recommendations.push(
525 "Consider connecting to more peers to improve redundancy and performance"
526 .to_string(),
527 );
528 }
529 IssueCategory::WantList if want_diag.expired_wants > 0 => {
530 recommendations.push(
531 "Consider increasing timeout duration or improving network connectivity"
532 .to_string(),
533 );
534 }
535 IssueCategory::Performance => {
536 recommendations.push(
537 "Check network conditions and consider adjusting batch sizes or concurrency limits".to_string()
538 );
539 }
540 _ => {}
541 }
542 }
543
544 if peer_diag.avg_peer_score < 0.7 {
546 recommendations.push(
547 "Peer quality is suboptimal. Consider finding better peers or adjusting scoring weights".to_string()
548 );
549 }
550
551 if want_diag.total_wants > 1000 {
552 recommendations.push(
553 "Large number of pending wants. Consider increasing max_concurrent_blocks or adding more peers".to_string()
554 );
555 }
556
557 recommendations.dedup();
558 recommendations
559 }
560}
561
562impl Default for DiagnosticEngine {
563 fn default() -> Self {
564 Self::new()
565 }
566}
567
568impl fmt::Display for DiagnosticReport {
569 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
570 writeln!(f, "=== Transport Diagnostic Report ===")?;
571 writeln!(f, "Generated: {:?}", self.timestamp)?;
572 writeln!(f, "Health Status: {:?}", self.health_status)?;
573 writeln!(f)?;
574
575 writeln!(f, "Want List:")?;
576 writeln!(f, " Total wants: {}", self.want_list.total_wants)?;
577 writeln!(f, " Expired: {}", self.want_list.expired_wants)?;
578 writeln!(f, " Retry pending: {}", self.want_list.retry_pending)?;
579 writeln!(f, " Avg queue time: {:?}", self.want_list.avg_queue_time)?;
580 writeln!(f)?;
581
582 writeln!(f, "Peer Manager:")?;
583 writeln!(f, " Total peers: {}", self.peer_manager.total_peers)?;
584 writeln!(f, " Active: {}", self.peer_manager.active_peers)?;
585 writeln!(f, " Blacklisted: {}", self.peer_manager.blacklisted_peers)?;
586 writeln!(f, " Avg score: {:.2}", self.peer_manager.avg_peer_score)?;
587 writeln!(f)?;
588
589 if !self.sessions.is_empty() {
590 writeln!(f, "Sessions:")?;
591 for session in &self.sessions {
592 writeln!(
593 f,
594 " Session {}: {:.1}% complete ({}/{})",
595 session.session_id,
596 session.progress_percent,
597 session.blocks_received,
598 session.blocks_requested
599 )?;
600 }
601 writeln!(f)?;
602 }
603
604 if !self.issues.is_empty() {
605 writeln!(f, "Issues:")?;
606 for issue in &self.issues {
607 writeln!(
608 f,
609 " [{:?}] {}: {}",
610 issue.severity, issue.category, issue.description
611 )?;
612 if let Some(details) = &issue.details {
613 writeln!(f, " {}", details)?;
614 }
615 }
616 writeln!(f)?;
617 }
618
619 if !self.recommendations.is_empty() {
620 writeln!(f, "Recommendations:")?;
621 for (i, rec) in self.recommendations.iter().enumerate() {
622 writeln!(f, " {}. {}", i + 1, rec)?;
623 }
624 }
625
626 Ok(())
627 }
628}
629
630impl fmt::Display for IssueCategory {
631 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
632 match self {
633 IssueCategory::WantList => write!(f, "WantList"),
634 IssueCategory::PeerManagement => write!(f, "PeerManagement"),
635 IssueCategory::Session => write!(f, "Session"),
636 IssueCategory::Performance => write!(f, "Performance"),
637 IssueCategory::Configuration => write!(f, "Configuration"),
638 }
639 }
640}
641
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Priority, WantListConfig};
    use ipfrs_core::Cid;
    use std::time::Duration;

    /// Runs a default-configured engine over the given components with no sessions.
    fn report_without_sessions(
        wants: &ConcurrentWantList,
        peers: &ConcurrentPeerManager,
    ) -> DiagnosticReport {
        DiagnosticEngine::new().generate_report(wants, peers, &[])
    }

    /// Builds a placeholder issue with the given severity and category.
    fn sample_issue(severity: IssueSeverity, category: IssueCategory) -> DiagnosticIssue {
        DiagnosticIssue {
            severity,
            category,
            description: "Test".to_string(),
            details: None,
        }
    }

    /// Both constructors must store the thresholds they were given.
    #[test]
    fn test_diagnostic_engine_creation() {
        let default_engine = DiagnosticEngine::new();
        assert_eq!(default_engine.config.min_active_peers, 3);

        let tuned_engine = DiagnosticEngine::with_config(DiagnosticConfig {
            min_active_peers: 5,
            ..Default::default()
        });
        assert_eq!(tuned_engine.config.min_active_peers, 5);
    }

    /// An empty system yields an empty (but well-formed) report.
    #[test]
    fn test_empty_report() {
        let want_list = ConcurrentWantList::new(WantListConfig::default());
        let peer_manager = ConcurrentPeerManager::new(Default::default());

        let report = report_without_sessions(&want_list, &peer_manager);

        assert_eq!(report.want_list.total_wants, 0);
        assert_eq!(report.peer_manager.total_peers, 0);
        assert!(report.sessions.is_empty());
    }

    /// With no peers at all, the peer shortage is reported as critical.
    #[test]
    fn test_detect_low_peer_count() {
        let want_list = ConcurrentWantList::new(WantListConfig::default());
        let peer_manager = ConcurrentPeerManager::new(Default::default());

        let report = report_without_sessions(&want_list, &peer_manager);

        let has_peer_issue = report
            .issues
            .iter()
            .any(|issue| issue.category == IssueCategory::PeerManagement);
        assert!(has_peer_issue);
        assert_eq!(report.health_status, HealthStatus::Critical);
    }

    /// A single queued want shows up in the totals and the priority histogram.
    #[test]
    fn test_want_list_diagnostics() {
        let want_list = ConcurrentWantList::new(WantListConfig::default());
        want_list.add_simple(Cid::default(), Priority::High as i32);
        let peer_manager = ConcurrentPeerManager::new(Default::default());

        let report = report_without_sessions(&want_list, &peer_manager);

        assert_eq!(report.want_list.total_wants, 1);
        assert!(!report.want_list.priority_distribution.is_empty());
    }

    /// The overall status follows the most severe issue.
    #[test]
    fn test_health_status_determination() {
        let engine = DiagnosticEngine::new();

        assert_eq!(engine.determine_health_status(&[]), HealthStatus::Healthy);

        let warning_only = [sample_issue(
            IssueSeverity::Warning,
            IssueCategory::Performance,
        )];
        assert_eq!(
            engine.determine_health_status(&warning_only),
            HealthStatus::Degraded
        );

        let critical_only = [sample_issue(
            IssueSeverity::Critical,
            IssueCategory::PeerManagement,
        )];
        assert_eq!(
            engine.determine_health_status(&critical_only),
            HealthStatus::Critical
        );
    }

    /// The textual rendering contains every fixed section heading.
    #[test]
    fn test_report_display() {
        let want_list = ConcurrentWantList::new(WantListConfig::default());
        let peer_manager = ConcurrentPeerManager::new(Default::default());

        let rendered = report_without_sessions(&want_list, &peer_manager).to_string();

        for heading in [
            "Transport Diagnostic Report",
            "Health Status",
            "Want List",
            "Peer Manager",
        ]
        .iter()
        {
            assert!(rendered.contains(heading));
        }
    }

    /// Severities are ordered from `Info` (lowest) to `Critical` (highest).
    #[test]
    fn test_issue_severity_ordering() {
        assert!(IssueSeverity::Critical > IssueSeverity::Error);
        assert!(IssueSeverity::Error > IssueSeverity::Warning);
        assert!(IssueSeverity::Warning > IssueSeverity::Info);
    }

    /// A system without peers must produce at least one recommendation.
    #[test]
    fn test_recommendations_generation() {
        let want_list = ConcurrentWantList::new(WantListConfig::default());
        let peer_manager = ConcurrentPeerManager::new(Default::default());

        let report = report_without_sessions(&want_list, &peer_manager);

        assert!(!report.recommendations.is_empty());
    }

    /// The stock thresholds are part of the observable behaviour.
    #[test]
    fn test_diagnostic_config_default() {
        let config = DiagnosticConfig::default();
        assert_eq!(config.min_active_peers, 3);
        assert_eq!(config.max_queue_time, Duration::from_secs(60));
        assert_eq!(config.min_avg_score, 0.5);
    }
}