use std::collections::VecDeque;
use std::time::Instant;
use super::device::DeviceId;
/// Aggregated telemetry for host<->device data movement: PCIe link state,
/// memory-bus rates, pinned-memory usage, and bounded histories for graphing.
#[derive(Debug, Clone)]
pub struct DataFlowMetrics {
/// PCIe generation of the link (1-6).
pub pcie_generation: u8,
/// Number of PCIe lanes (e.g. 16 for an x16 slot).
pub pcie_width: u8,
/// Theoretical one-directional link bandwidth in GB/s for the configured link.
pub pcie_theoretical_gbps: f64,
/// Current host-to-device (TX) throughput in GB/s.
pub pcie_tx_gbps: f64,
/// Current device-to-host (RX) throughput in GB/s.
pub pcie_rx_gbps: f64,
/// Transfers currently in flight.
pub active_transfers: Vec<Transfer>,
/// Recently finished transfers, bounded by `MAX_COMPLETED_TRANSFERS`.
pub completed_transfers: VecDeque<Transfer>,
/// Memory-bus utilization as a percentage.
pub memory_bus_utilization_pct: f64,
/// Memory-bus read throughput in GB/s.
pub memory_read_gbps: f64,
/// Memory-bus write throughput in GB/s.
pub memory_write_gbps: f64,
/// Bytes of pinned (page-locked) host memory currently in use.
pub pinned_memory_used_bytes: u64,
/// Total bytes of pinned host memory available (0 when unconfigured).
pub pinned_memory_total_bytes: u64,
/// Bytes currently held in staging buffers.
pub staging_buffer_used_bytes: u64,
/// Rolling TX-rate samples, bounded by `MAX_HISTORY_POINTS`.
pub pcie_tx_history: VecDeque<f64>,
/// Rolling RX-rate samples, bounded by `MAX_HISTORY_POINTS`.
pub pcie_rx_history: VecDeque<f64>,
/// Rolling memory-bus-utilization samples, bounded by `MAX_HISTORY_POINTS`.
pub memory_bus_history: VecDeque<f64>,
}
impl DataFlowMetrics {
pub const MAX_HISTORY_POINTS: usize = 60;
pub const MAX_COMPLETED_TRANSFERS: usize = 100;
#[must_use]
pub fn new() -> Self {
Self::default()
}
#[must_use]
pub fn calculate_pcie_bandwidth(generation: u8, width: u8) -> f64 {
let gt_per_lane = match generation {
1 => 2.5, 2 => 5.0, 3 => 8.0, 4 => 16.0, 5 => 32.0, 6 => 64.0, _ => 0.0,
};
let encoding_efficiency = if generation >= 3 { 128.0 / 130.0 } else { 0.8 };
gt_per_lane * width as f64 * encoding_efficiency / 8.0
}
pub fn set_pcie_config(&mut self, generation: u8, width: u8) {
self.pcie_generation = generation;
self.pcie_width = width;
self.pcie_theoretical_gbps = Self::calculate_pcie_bandwidth(generation, width);
}
#[must_use]
pub fn pcie_tx_utilization_pct(&self) -> f64 {
if self.pcie_theoretical_gbps > 0.0 {
(self.pcie_tx_gbps / self.pcie_theoretical_gbps) * 100.0
} else {
0.0
}
}
#[must_use]
pub fn pcie_rx_utilization_pct(&self) -> f64 {
if self.pcie_theoretical_gbps > 0.0 {
(self.pcie_rx_gbps / self.pcie_theoretical_gbps) * 100.0
} else {
0.0
}
}
pub fn start_transfer(&mut self, transfer: Transfer) {
self.active_transfers.push(transfer);
}
pub fn complete_transfer(&mut self, transfer_id: TransferId) {
if let Some(idx) = self
.active_transfers
.iter()
.position(|t| t.id == transfer_id)
{
let mut transfer = self.active_transfers.remove(idx);
transfer.complete();
self.completed_transfers.push_back(transfer);
if self.completed_transfers.len() > Self::MAX_COMPLETED_TRANSFERS {
self.completed_transfers.pop_front();
}
}
}
pub fn update_history(&mut self) {
self.pcie_tx_history.push_back(self.pcie_tx_gbps);
if self.pcie_tx_history.len() > Self::MAX_HISTORY_POINTS {
self.pcie_tx_history.pop_front();
}
self.pcie_rx_history.push_back(self.pcie_rx_gbps);
if self.pcie_rx_history.len() > Self::MAX_HISTORY_POINTS {
self.pcie_rx_history.pop_front();
}
self.memory_bus_history
.push_back(self.memory_bus_utilization_pct);
if self.memory_bus_history.len() > Self::MAX_HISTORY_POINTS {
self.memory_bus_history.pop_front();
}
}
#[must_use]
pub fn bytes_in_flight(&self) -> u64 {
self.active_transfers
.iter()
.map(|t| t.size_bytes.saturating_sub(t.transferred_bytes))
.sum()
}
#[must_use]
pub fn pinned_memory_utilization_pct(&self) -> f64 {
if self.pinned_memory_total_bytes > 0 {
(self.pinned_memory_used_bytes as f64 / self.pinned_memory_total_bytes as f64) * 100.0
} else {
0.0
}
}
}
impl Default for DataFlowMetrics {
    /// Zero-valued metrics assuming a PCIe Gen4 x16 link.
    fn default() -> Self {
        // Default link assumption: PCIe Gen4 with 16 lanes.
        let (generation, width) = (4, 16);
        let mut metrics = Self {
            pcie_generation: generation,
            pcie_width: width,
            // Placeholder; computed below via set_pcie_config.
            pcie_theoretical_gbps: 0.0,
            pcie_tx_gbps: 0.0,
            pcie_rx_gbps: 0.0,
            active_transfers: Vec::new(),
            completed_transfers: VecDeque::with_capacity(Self::MAX_COMPLETED_TRANSFERS),
            memory_bus_utilization_pct: 0.0,
            memory_read_gbps: 0.0,
            memory_write_gbps: 0.0,
            pinned_memory_used_bytes: 0,
            pinned_memory_total_bytes: 0,
            staging_buffer_used_bytes: 0,
            pcie_tx_history: VecDeque::with_capacity(Self::MAX_HISTORY_POINTS),
            pcie_rx_history: VecDeque::with_capacity(Self::MAX_HISTORY_POINTS),
            memory_bus_history: VecDeque::with_capacity(Self::MAX_HISTORY_POINTS),
        };
        // Derive the theoretical bandwidth from the link configuration.
        metrics.set_pcie_config(generation, width);
        metrics
    }
}
/// Process-unique identifier for a [`Transfer`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct TransferId(pub u64);
impl TransferId {
#[must_use]
pub fn new() -> Self {
use std::sync::atomic::{AtomicU64, Ordering};
static COUNTER: AtomicU64 = AtomicU64::new(1);
Self(COUNTER.fetch_add(1, Ordering::Relaxed))
}
}
impl Default for TransferId {
fn default() -> Self {
Self::new()
}
}
/// A single host/device memory transfer tracked from start to completion.
#[derive(Debug, Clone)]
pub struct Transfer {
/// Unique id assigned at construction.
pub id: TransferId,
/// Direction of data movement (H2D, D2H, ...).
pub direction: TransferDirection,
/// Where the bytes come from.
pub source: MemoryLocation,
/// Where the bytes go.
pub destination: MemoryLocation,
/// Total payload size in bytes.
pub size_bytes: u64,
/// Bytes moved so far.
pub transferred_bytes: u64,
/// When the transfer started.
pub start_time: Instant,
/// When the transfer finished or failed; `None` while in flight.
pub end_time: Option<Instant>,
/// Current lifecycle state.
pub status: TransferStatus,
/// Optional human-readable label (e.g. a tensor name).
pub label: String,
}
impl Transfer {
    /// Creates a pending transfer with a freshly allocated id and the
    /// clock started now.
    #[must_use]
    pub fn new(
        direction: TransferDirection,
        source: MemoryLocation,
        destination: MemoryLocation,
        size_bytes: u64,
    ) -> Self {
        Self {
            id: TransferId::new(),
            direction,
            source,
            destination,
            size_bytes,
            transferred_bytes: 0,
            start_time: Instant::now(),
            end_time: None,
            status: TransferStatus::Pending,
            label: String::new(),
        }
    }

    /// Convenience constructor for a host RAM -> GPU VRAM upload.
    #[must_use]
    pub fn host_to_device(size_bytes: u64, device_id: DeviceId) -> Self {
        Self::new(
            TransferDirection::HostToDevice,
            MemoryLocation::SystemRam,
            MemoryLocation::GpuVram(device_id),
            size_bytes,
        )
    }

    /// Convenience constructor for a GPU VRAM -> host RAM download.
    #[must_use]
    pub fn device_to_host(size_bytes: u64, device_id: DeviceId) -> Self {
        Self::new(
            TransferDirection::DeviceToHost,
            MemoryLocation::GpuVram(device_id),
            MemoryLocation::SystemRam,
            size_bytes,
        )
    }

    /// Builder-style setter for a human-readable label.
    #[must_use]
    pub fn with_label(mut self, label: impl Into<String>) -> Self {
        self.label = label.into();
        self
    }

    /// Completion percentage in `[0.0, 100.0]`. Zero-sized transfers report
    /// 100%; over-reported byte counts are clamped to 100%.
    #[must_use]
    pub fn progress_pct(&self) -> f64 {
        if self.size_bytes == 0 {
            return 100.0;
        }
        // Clamp so a caller over-reporting progress via update_progress
        // can never yield a percentage above 100.
        ((self.transferred_bytes as f64 / self.size_bytes as f64) * 100.0).min(100.0)
    }

    /// Wall-clock duration: start to end for finished transfers, start to
    /// now for in-flight ones.
    #[must_use]
    pub fn elapsed(&self) -> std::time::Duration {
        match self.end_time {
            Some(end) => end.duration_since(self.start_time),
            None => self.start_time.elapsed(),
        }
    }

    /// [`Self::elapsed`] expressed in milliseconds.
    #[must_use]
    pub fn elapsed_ms(&self) -> f64 {
        self.elapsed().as_secs_f64() * 1000.0
    }

    /// Effective throughput in GB/s (decimal: bytes / 1e9 per second);
    /// 0.0 before any measurable time has passed.
    #[must_use]
    pub fn bandwidth_gbps(&self) -> f64 {
        let elapsed_s = self.elapsed().as_secs_f64();
        if elapsed_s > 0.0 {
            self.transferred_bytes as f64 / elapsed_s / 1e9
        } else {
            0.0
        }
    }

    /// Records progress and flips a pending transfer to in-progress.
    pub fn update_progress(&mut self, bytes_transferred: u64) {
        self.transferred_bytes = bytes_transferred;
        if self.status == TransferStatus::Pending {
            self.status = TransferStatus::InProgress;
        }
    }

    /// Marks the transfer finished: all bytes accounted for, status set to
    /// `Completed`, and the end timestamp captured.
    pub fn complete(&mut self) {
        self.transferred_bytes = self.size_bytes;
        self.status = TransferStatus::Completed;
        self.end_time = Some(Instant::now());
    }

    /// Marks the transfer failed and stamps the end time.
    ///
    /// NOTE(review): the failure reason is currently discarded because
    /// `Transfer` has no field to hold it; plumb one through if callers
    /// need the reason surfaced.
    pub fn fail(&mut self, _reason: &str) {
        self.status = TransferStatus::Failed;
        self.end_time = Some(Instant::now());
    }
}
/// Direction of a memory transfer relative to the host.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransferDirection {
/// Host RAM to device memory.
HostToDevice,
/// Device memory to host RAM.
DeviceToHost,
/// Within a single device.
DeviceToDevice,
/// Directly between two devices (peer-to-peer).
PeerToPeer,
}
impl std::fmt::Display for TransferDirection {
    /// Renders the compact arrow notation ("H→D", "D→H", "D→D", "P2P").
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let glyph = match self {
            Self::HostToDevice => "H→D",
            Self::DeviceToHost => "D→H",
            Self::DeviceToDevice => "D→D",
            Self::PeerToPeer => "P2P",
        };
        f.write_str(glyph)
    }
}
/// Physical location of a transfer endpoint.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MemoryLocation {
/// Ordinary (non-pinned) system RAM.
SystemRam,
/// Page-locked (pinned) host memory.
PinnedMemory,
/// Dedicated GPU memory on the given device.
GpuVram(DeviceId),
/// Memory shared between host and device (unified/managed).
UnifiedMemory,
}
impl std::fmt::Display for MemoryLocation {
    /// Renders a short location name; VRAM includes the owning device id.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            // Only the VRAM variant carries data that needs formatting.
            Self::GpuVram(id) => write!(f, "VRAM:{}", id),
            Self::SystemRam => f.write_str("RAM"),
            Self::PinnedMemory => f.write_str("Pinned"),
            Self::UnifiedMemory => f.write_str("Unified"),
        }
    }
}
/// Lifecycle state of a [`Transfer`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransferStatus {
/// Created but no progress reported yet.
Pending,
/// At least one progress update received.
InProgress,
/// Finished successfully.
Completed,
/// Aborted with an error.
Failed,
}
#[cfg(test)]
mod tests {
use super::*;
// --- calculate_pcie_bandwidth: theoretical rates per generation/width ---
// Gen4 x16: 16 GT/s * 16 lanes * 128/130 / 8 ≈ 31.5 GB/s.
#[test]
fn h031_pcie_bandwidth_gen4_x16() {
let bw = DataFlowMetrics::calculate_pcie_bandwidth(4, 16);
assert!((bw - 31.5).abs() < 0.5, "Expected ~31.5 GB/s, got {}", bw);
}
#[test]
fn h031_pcie_bandwidth_gen5_x16() {
let bw = DataFlowMetrics::calculate_pcie_bandwidth(5, 16);
assert!((bw - 63.0).abs() < 1.0, "Expected ~63 GB/s, got {}", bw);
}
#[test]
fn h031_pcie_bandwidth_gen3_x16() {
let bw = DataFlowMetrics::calculate_pcie_bandwidth(3, 16);
assert!((bw - 15.75).abs() < 0.5, "Expected ~15.75 GB/s, got {}", bw);
}
// Halving the lane count halves the bandwidth.
#[test]
fn h031_pcie_bandwidth_gen4_x8() {
let bw = DataFlowMetrics::calculate_pcie_bandwidth(4, 8);
assert!((bw - 15.75).abs() < 0.5, "Expected ~15.75 GB/s, got {}", bw);
}
// Unrecognized generations fall through to 0.0.
#[test]
fn h031_pcie_bandwidth_unknown_gen() {
let bw = DataFlowMetrics::calculate_pcie_bandwidth(99, 16);
assert_eq!(bw, 0.0);
}
// --- defaults and PCIe configuration ---
#[test]
fn h032_data_flow_default() {
let metrics = DataFlowMetrics::default();
assert_eq!(metrics.pcie_generation, 4);
assert_eq!(metrics.pcie_width, 16);
assert!(metrics.pcie_theoretical_gbps > 30.0);
}
#[test]
fn h032_data_flow_set_pcie_config() {
let mut metrics = DataFlowMetrics::new();
metrics.set_pcie_config(5, 16);
assert_eq!(metrics.pcie_generation, 5);
assert_eq!(metrics.pcie_width, 16);
assert!(metrics.pcie_theoretical_gbps > 60.0);
}
// --- utilization percentages ---
#[test]
fn h032_data_flow_tx_utilization() {
let mut metrics = DataFlowMetrics::new();
metrics.pcie_theoretical_gbps = 31.5;
metrics.pcie_tx_gbps = 15.75;
assert!((metrics.pcie_tx_utilization_pct() - 50.0).abs() < 1.0);
}
#[test]
fn h032_data_flow_rx_utilization() {
let mut metrics = DataFlowMetrics::new();
metrics.pcie_theoretical_gbps = 31.5;
metrics.pcie_rx_gbps = 7.875;
assert!((metrics.pcie_rx_utilization_pct() - 25.0).abs() < 1.0);
}
// Zero theoretical bandwidth must not divide by zero; both report 0%.
#[test]
fn h032_data_flow_utilization_zero_theoretical() {
let mut metrics = DataFlowMetrics::new();
metrics.pcie_theoretical_gbps = 0.0;
assert_eq!(metrics.pcie_tx_utilization_pct(), 0.0);
assert_eq!(metrics.pcie_rx_utilization_pct(), 0.0);
}
// --- Transfer construction and lifecycle ---
#[test]
fn h033_transfer_id_unique() {
let id1 = TransferId::new();
let id2 = TransferId::new();
assert_ne!(id1, id2);
}
#[test]
fn h033_transfer_new() {
let transfer = Transfer::new(
TransferDirection::HostToDevice,
MemoryLocation::SystemRam,
MemoryLocation::GpuVram(DeviceId::nvidia(0)),
1024 * 1024,
);
assert_eq!(transfer.direction, TransferDirection::HostToDevice);
assert_eq!(transfer.size_bytes, 1024 * 1024);
assert_eq!(transfer.transferred_bytes, 0);
assert_eq!(transfer.status, TransferStatus::Pending);
}
#[test]
fn h033_transfer_h2d_helper() {
let transfer = Transfer::host_to_device(1024 * 1024, DeviceId::nvidia(0));
assert_eq!(transfer.direction, TransferDirection::HostToDevice);
assert_eq!(transfer.source, MemoryLocation::SystemRam);
}
#[test]
fn h033_transfer_d2h_helper() {
let transfer = Transfer::device_to_host(1024 * 1024, DeviceId::nvidia(0));
assert_eq!(transfer.direction, TransferDirection::DeviceToHost);
assert_eq!(transfer.destination, MemoryLocation::SystemRam);
}
#[test]
fn h033_transfer_with_label() {
let transfer = Transfer::host_to_device(1024, DeviceId::nvidia(0)).with_label("tensor_a");
assert_eq!(transfer.label, "tensor_a");
}
// Progress walks Pending -> InProgress -> Completed.
#[test]
fn h033_transfer_progress() {
let mut transfer = Transfer::host_to_device(1000, DeviceId::nvidia(0));
assert_eq!(transfer.progress_pct(), 0.0);
transfer.update_progress(500);
assert!((transfer.progress_pct() - 50.0).abs() < 0.01);
assert_eq!(transfer.status, TransferStatus::InProgress);
transfer.complete();
assert_eq!(transfer.progress_pct(), 100.0);
assert_eq!(transfer.status, TransferStatus::Completed);
}
// Zero-byte transfers are defined as already 100% complete.
#[test]
fn h033_transfer_progress_zero_size() {
let transfer = Transfer::host_to_device(0, DeviceId::nvidia(0));
assert_eq!(transfer.progress_pct(), 100.0);
}
#[test]
fn h033_transfer_elapsed() {
let transfer = Transfer::host_to_device(1024, DeviceId::nvidia(0));
std::thread::sleep(std::time::Duration::from_millis(10));
assert!(transfer.elapsed_ms() >= 10.0);
}
// ~100 MB over ~100 ms should land near 1 GB/s.
#[test]
fn h033_transfer_bandwidth() {
let mut transfer = Transfer::host_to_device(1_000_000_000, DeviceId::nvidia(0)); std::thread::sleep(std::time::Duration::from_millis(100));
transfer.transferred_bytes = 100_000_000;
let bw = transfer.bandwidth_gbps();
assert!(bw > 0.5 && bw < 2.0, "Bandwidth {} GB/s unexpected", bw);
}
// --- Display implementations ---
#[test]
fn h034_transfer_direction_display() {
assert_eq!(format!("{}", TransferDirection::HostToDevice), "H→D");
assert_eq!(format!("{}", TransferDirection::DeviceToHost), "D→H");
assert_eq!(format!("{}", TransferDirection::DeviceToDevice), "D→D");
assert_eq!(format!("{}", TransferDirection::PeerToPeer), "P2P");
}
#[test]
fn h035_memory_location_display() {
assert_eq!(format!("{}", MemoryLocation::SystemRam), "RAM");
assert_eq!(format!("{}", MemoryLocation::PinnedMemory), "Pinned");
assert_eq!(format!("{}", MemoryLocation::UnifiedMemory), "Unified");
assert!(format!("{}", MemoryLocation::GpuVram(DeviceId::nvidia(0))).contains("NVIDIA"));
}
// --- bounded history rings ---
// 100 samples pushed, only the most recent MAX_HISTORY_POINTS retained.
#[test]
fn h036_history_update() {
let mut metrics = DataFlowMetrics::new();
for i in 0..100 {
metrics.pcie_tx_gbps = i as f64;
metrics.pcie_rx_gbps = i as f64 * 0.5;
metrics.memory_bus_utilization_pct = i as f64;
metrics.update_history();
}
assert_eq!(
metrics.pcie_tx_history.len(),
DataFlowMetrics::MAX_HISTORY_POINTS
);
assert_eq!(
metrics.pcie_rx_history.len(),
DataFlowMetrics::MAX_HISTORY_POINTS
);
assert_eq!(
metrics.memory_bus_history.len(),
DataFlowMetrics::MAX_HISTORY_POINTS
);
}
// --- active/completed transfer bookkeeping ---
#[test]
fn h037_start_and_complete_transfer() {
let mut metrics = DataFlowMetrics::new();
let transfer = Transfer::host_to_device(1024, DeviceId::nvidia(0));
let id = transfer.id;
metrics.start_transfer(transfer);
assert_eq!(metrics.active_transfers.len(), 1);
metrics.complete_transfer(id);
assert_eq!(metrics.active_transfers.len(), 0);
assert_eq!(metrics.completed_transfers.len(), 1);
}
// Completed queue is capped at MAX_COMPLETED_TRANSFERS.
#[test]
fn h037_completed_transfer_limit() {
let mut metrics = DataFlowMetrics::new();
for _ in 0..150 {
let transfer = Transfer::host_to_device(1024, DeviceId::nvidia(0));
let id = transfer.id;
metrics.start_transfer(transfer);
metrics.complete_transfer(id);
}
assert_eq!(
metrics.completed_transfers.len(),
DataFlowMetrics::MAX_COMPLETED_TRANSFERS
);
}
// (1000-400) + (2000-500) = 2100 bytes still pending.
#[test]
fn h037_bytes_in_flight() {
let mut metrics = DataFlowMetrics::new();
let mut t1 = Transfer::host_to_device(1000, DeviceId::nvidia(0));
t1.transferred_bytes = 400;
let mut t2 = Transfer::host_to_device(2000, DeviceId::nvidia(0));
t2.transferred_bytes = 500;
metrics.start_transfer(t1);
metrics.start_transfer(t2);
assert_eq!(metrics.bytes_in_flight(), 2100);
}
// --- pinned-memory accounting ---
#[test]
fn h038_pinned_memory_utilization() {
let mut metrics = DataFlowMetrics::new();
metrics.pinned_memory_used_bytes = 512 * 1024 * 1024; metrics.pinned_memory_total_bytes = 1024 * 1024 * 1024;
assert!((metrics.pinned_memory_utilization_pct() - 50.0).abs() < 0.01);
}
// Default has no pinned pool configured; must report 0%, not NaN.
#[test]
fn h038_pinned_memory_utilization_zero_total() {
let metrics = DataFlowMetrics::new();
assert_eq!(metrics.pinned_memory_utilization_pct(), 0.0);
}
}