use std::collections::VecDeque;
use std::time::Instant;
use super::device::DeviceId;
/// Point-in-time telemetry for host↔device data movement: PCIe link
/// configuration and throughput, in-flight and completed transfers,
/// memory-bus load, pinned-memory usage, and short rolling histories
/// suitable for plotting.
#[derive(Debug, Clone)]
pub struct DataFlowMetrics {
    /// PCIe generation of the link (1-6; see `calculate_pcie_bandwidth`).
    pub pcie_generation: u8,
    /// Number of PCIe lanes (e.g. 16 for an x16 link).
    pub pcie_width: u8,
    /// Theoretical unidirectional link bandwidth derived from generation and
    /// width (GB/s: GT/s per lane × lanes × encoding efficiency / 8).
    pub pcie_theoretical_gbps: f64,
    /// Measured host→device throughput. Presumably the same GB/s unit as
    /// `pcie_theoretical_gbps` (they are divided in the utilization methods)
    /// — confirm at the producer.
    pub pcie_tx_gbps: f64,
    /// Measured device→host throughput (same unit caveat as tx).
    pub pcie_rx_gbps: f64,
    /// Transfers currently in flight.
    pub active_transfers: Vec<Transfer>,
    /// Recently finished transfers, oldest first; capped at
    /// `MAX_COMPLETED_TRANSFERS` by `complete_transfer`.
    pub completed_transfers: VecDeque<Transfer>,
    /// Memory-bus load in percent.
    pub memory_bus_utilization_pct: f64,
    /// Memory read throughput.
    pub memory_read_gbps: f64,
    /// Memory write throughput.
    pub memory_write_gbps: f64,
    /// Bytes of pinned host memory currently in use.
    pub pinned_memory_used_bytes: u64,
    /// Total pinned host memory available; 0 is treated as "not configured"
    /// by `pinned_memory_utilization_pct`.
    pub pinned_memory_total_bytes: u64,
    /// Bytes currently occupied in staging buffers.
    pub staging_buffer_used_bytes: u64,
    /// Rolling tx samples, oldest first; capped at `MAX_HISTORY_POINTS`.
    pub pcie_tx_history: VecDeque<f64>,
    /// Rolling rx samples, oldest first; capped at `MAX_HISTORY_POINTS`.
    pub pcie_rx_history: VecDeque<f64>,
    /// Rolling memory-bus-utilization samples; capped at `MAX_HISTORY_POINTS`.
    pub memory_bus_history: VecDeque<f64>,
}
impl DataFlowMetrics {
pub const MAX_HISTORY_POINTS: usize = 60;
pub const MAX_COMPLETED_TRANSFERS: usize = 100;
#[must_use]
pub fn new() -> Self {
Self::default()
}
#[must_use]
pub fn calculate_pcie_bandwidth(generation: u8, width: u8) -> f64 {
let gt_per_lane = match generation {
1 => 2.5, 2 => 5.0, 3 => 8.0, 4 => 16.0, 5 => 32.0, 6 => 64.0, _ => 0.0,
};
let encoding_efficiency = if generation >= 3 { 128.0 / 130.0 } else { 0.8 };
gt_per_lane * width as f64 * encoding_efficiency / 8.0
}
pub fn set_pcie_config(&mut self, generation: u8, width: u8) {
self.pcie_generation = generation;
self.pcie_width = width;
self.pcie_theoretical_gbps = Self::calculate_pcie_bandwidth(generation, width);
}
#[must_use]
pub fn pcie_tx_utilization_pct(&self) -> f64 {
if self.pcie_theoretical_gbps > 0.0 {
(self.pcie_tx_gbps / self.pcie_theoretical_gbps) * 100.0
} else {
0.0
}
}
#[must_use]
pub fn pcie_rx_utilization_pct(&self) -> f64 {
if self.pcie_theoretical_gbps > 0.0 {
(self.pcie_rx_gbps / self.pcie_theoretical_gbps) * 100.0
} else {
0.0
}
}
pub fn start_transfer(&mut self, transfer: Transfer) {
self.active_transfers.push(transfer);
}
pub fn complete_transfer(&mut self, transfer_id: TransferId) {
if let Some(idx) = self.active_transfers.iter().position(|t| t.id == transfer_id) {
let mut transfer = self.active_transfers.remove(idx);
transfer.complete();
self.completed_transfers.push_back(transfer);
if self.completed_transfers.len() > Self::MAX_COMPLETED_TRANSFERS {
self.completed_transfers.pop_front();
}
}
}
pub fn update_history(&mut self) {
self.pcie_tx_history.push_back(self.pcie_tx_gbps);
if self.pcie_tx_history.len() > Self::MAX_HISTORY_POINTS {
self.pcie_tx_history.pop_front();
}
self.pcie_rx_history.push_back(self.pcie_rx_gbps);
if self.pcie_rx_history.len() > Self::MAX_HISTORY_POINTS {
self.pcie_rx_history.pop_front();
}
self.memory_bus_history.push_back(self.memory_bus_utilization_pct);
if self.memory_bus_history.len() > Self::MAX_HISTORY_POINTS {
self.memory_bus_history.pop_front();
}
}
#[must_use]
pub fn bytes_in_flight(&self) -> u64 {
self.active_transfers.iter().map(|t| t.size_bytes.saturating_sub(t.transferred_bytes)).sum()
}
#[must_use]
pub fn pinned_memory_utilization_pct(&self) -> f64 {
if self.pinned_memory_total_bytes > 0 {
(self.pinned_memory_used_bytes as f64 / self.pinned_memory_total_bytes as f64) * 100.0
} else {
0.0
}
}
}
impl Default for DataFlowMetrics {
    /// A PCIe gen-4 x16 link with no measured activity and empty,
    /// pre-allocated history/completed rings.
    fn default() -> Self {
        // Name the link parameters once so the stored generation/width can
        // never drift out of sync with the precomputed theoretical cap
        // (previously `4`/`16` were repeated in three places).
        const DEFAULT_PCIE_GENERATION: u8 = 4;
        const DEFAULT_PCIE_WIDTH: u8 = 16;
        Self {
            pcie_generation: DEFAULT_PCIE_GENERATION,
            pcie_width: DEFAULT_PCIE_WIDTH,
            pcie_theoretical_gbps: Self::calculate_pcie_bandwidth(
                DEFAULT_PCIE_GENERATION,
                DEFAULT_PCIE_WIDTH,
            ),
            pcie_tx_gbps: 0.0,
            pcie_rx_gbps: 0.0,
            active_transfers: Vec::new(),
            completed_transfers: VecDeque::with_capacity(Self::MAX_COMPLETED_TRANSFERS),
            memory_bus_utilization_pct: 0.0,
            memory_read_gbps: 0.0,
            memory_write_gbps: 0.0,
            pinned_memory_used_bytes: 0,
            pinned_memory_total_bytes: 0,
            staging_buffer_used_bytes: 0,
            pcie_tx_history: VecDeque::with_capacity(Self::MAX_HISTORY_POINTS),
            pcie_rx_history: VecDeque::with_capacity(Self::MAX_HISTORY_POINTS),
            memory_bus_history: VecDeque::with_capacity(Self::MAX_HISTORY_POINTS),
        }
    }
}
/// Process-unique identifier for a [`Transfer`], backed by a global
/// monotonically increasing counter.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct TransferId(pub u64);

impl TransferId {
    /// Allocates the next id. Ids start at 1 and increase by one per call;
    /// `Ordering::Relaxed` suffices because only the counter itself must be
    /// consistent, not any surrounding memory.
    #[must_use]
    pub fn new() -> Self {
        use std::sync::atomic::{AtomicU64, Ordering};
        static NEXT_ID: AtomicU64 = AtomicU64::new(1);
        let id = NEXT_ID.fetch_add(1, Ordering::Relaxed);
        Self(id)
    }
}

impl Default for TransferId {
    /// Equivalent to [`TransferId::new`]: each default is a fresh id.
    fn default() -> Self {
        Self::new()
    }
}
/// A single tracked data transfer: endpoints, size, progress, timing, and
/// lifecycle status.
#[derive(Debug, Clone)]
pub struct Transfer {
    /// Process-unique id (assigned by `TransferId::new` in the constructor).
    pub id: TransferId,
    /// Direction of the copy (H→D, D→H, D→D, P2P).
    pub direction: TransferDirection,
    /// Where the bytes are read from.
    pub source: MemoryLocation,
    /// Where the bytes are written to.
    pub destination: MemoryLocation,
    /// Total bytes to move.
    pub size_bytes: u64,
    /// Bytes moved so far; updated via `update_progress`, forced to
    /// `size_bytes` by `complete`.
    pub transferred_bytes: u64,
    /// When the transfer object was created (`Transfer::new`).
    pub start_time: Instant,
    /// Set by `complete`/`fail`; `None` while the transfer is still running.
    pub end_time: Option<Instant>,
    /// Current lifecycle state (starts as `Pending`).
    pub status: TransferStatus,
    /// Optional human-readable description; empty unless set via `with_label`.
    pub label: String,
}
impl Transfer {
    /// Builds a pending transfer with a fresh id, zero progress, and the
    /// clock started immediately.
    #[must_use]
    pub fn new(
        direction: TransferDirection,
        source: MemoryLocation,
        destination: MemoryLocation,
        size_bytes: u64,
    ) -> Self {
        Self {
            id: TransferId::new(),
            status: TransferStatus::Pending,
            direction,
            source,
            destination,
            size_bytes,
            transferred_bytes: 0,
            start_time: Instant::now(),
            end_time: None,
            label: String::new(),
        }
    }

    /// Convenience constructor: system RAM → VRAM of `device_id`.
    #[must_use]
    pub fn host_to_device(size_bytes: u64, device_id: DeviceId) -> Self {
        let src = MemoryLocation::SystemRam;
        let dst = MemoryLocation::GpuVram(device_id);
        Self::new(TransferDirection::HostToDevice, src, dst, size_bytes)
    }

    /// Convenience constructor: VRAM of `device_id` → system RAM.
    #[must_use]
    pub fn device_to_host(size_bytes: u64, device_id: DeviceId) -> Self {
        let src = MemoryLocation::GpuVram(device_id);
        let dst = MemoryLocation::SystemRam;
        Self::new(TransferDirection::DeviceToHost, src, dst, size_bytes)
    }

    /// Builder-style setter for the human-readable label.
    #[must_use]
    pub fn with_label(mut self, label: impl Into<String>) -> Self {
        self.label = label.into();
        self
    }

    /// Completion percentage in `[0, 100]`-ish (not clamped if progress is
    /// over-reported). A zero-byte transfer counts as fully done.
    #[must_use]
    pub fn progress_pct(&self) -> f64 {
        match self.size_bytes {
            0 => 100.0,
            total => self.transferred_bytes as f64 / total as f64 * 100.0,
        }
    }

    /// Wall-clock duration: start → end for finished transfers, start → now
    /// for running ones.
    #[must_use]
    pub fn elapsed(&self) -> std::time::Duration {
        self.end_time.map_or_else(
            || self.start_time.elapsed(),
            |end| end.duration_since(self.start_time),
        )
    }

    /// [`elapsed`](Self::elapsed) expressed in milliseconds.
    #[must_use]
    pub fn elapsed_ms(&self) -> f64 {
        self.elapsed().as_secs_f64() * 1000.0
    }

    /// Observed throughput (bytes / elapsed / 1e9); 0.0 when no time has
    /// passed, avoiding a division by zero.
    #[must_use]
    pub fn bandwidth_gbps(&self) -> f64 {
        let seconds = self.elapsed().as_secs_f64();
        if seconds <= 0.0 {
            return 0.0;
        }
        self.transferred_bytes as f64 / seconds / 1e9
    }

    /// Records new progress and flips a pending transfer to in-progress on
    /// the first report.
    pub fn update_progress(&mut self, bytes_transferred: u64) {
        self.transferred_bytes = bytes_transferred;
        if matches!(self.status, TransferStatus::Pending) {
            self.status = TransferStatus::InProgress;
        }
    }

    /// Marks the transfer finished: full byte count, `Completed` status,
    /// end timestamp.
    pub fn complete(&mut self) {
        self.end_time = Some(Instant::now());
        self.status = TransferStatus::Completed;
        self.transferred_bytes = self.size_bytes;
    }

    /// Marks the transfer failed and stamps the end time. The reason is
    /// currently discarded (status carries no payload).
    pub fn fail(&mut self, _reason: &str) {
        self.end_time = Some(Instant::now());
        self.status = TransferStatus::Failed;
    }
}
/// Which way a transfer moves data relative to the host and devices.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransferDirection {
    HostToDevice,
    DeviceToHost,
    DeviceToDevice,
    PeerToPeer,
}

impl std::fmt::Display for TransferDirection {
    /// Compact arrow notation for UI display (e.g. "H→D").
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text = match self {
            Self::HostToDevice => "H→D",
            Self::DeviceToHost => "D→H",
            Self::DeviceToDevice => "D→D",
            Self::PeerToPeer => "P2P",
        };
        f.write_str(text)
    }
}
/// Physical/logical placement of one end of a transfer.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MemoryLocation {
    SystemRam,
    PinnedMemory,
    GpuVram(DeviceId),
    UnifiedMemory,
}

impl std::fmt::Display for MemoryLocation {
    /// Short labels for UI display; VRAM includes the device id
    /// (e.g. "VRAM:0").
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::GpuVram(id) => write!(f, "VRAM:{id}"),
            Self::SystemRam => f.write_str("RAM"),
            Self::PinnedMemory => f.write_str("Pinned"),
            Self::UnifiedMemory => f.write_str("Unified"),
        }
    }
}
/// Lifecycle state of a `Transfer`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransferStatus {
    /// Created but no progress reported yet.
    Pending,
    /// At least one `update_progress` call has been made.
    InProgress,
    /// Finished successfully (`Transfer::complete`).
    Completed,
    /// Aborted with an error (`Transfer::fail`); the reason is not stored.
    Failed,
}
// Unit tests live in the adjacent `tests` submodule; compiled only for
// `cargo test`.
#[cfg(test)]
mod tests;