// trueno_gpu/monitor/data_flow/mod.rs

//! Data Flow Tracking (TRUENO-SPEC-023)
//!
//! PCIe bandwidth monitoring and memory transfer tracking for
//! host-device data movement.
//!
//! # Transfer Directions
//!
//! - H2D: Host to Device (CPU → GPU)
//! - D2H: Device to Host (GPU → CPU)
//! - D2D: Device to Device (GPU → GPU, same device)
//! - P2P: Peer to Peer (GPU → GPU, NVLink/PCIe)
//!
//! # References
//!
//! - PCIe 4.0: 31.5 GB/s theoretical (x16)
//! - PCIe 5.0: 63 GB/s theoretical (x16)

use std::collections::VecDeque;
use std::time::Instant;

use super::device::DeviceId;

// ============================================================================
// Data Flow Metrics (TRUENO-SPEC-023 Section 5.2)
// ============================================================================

/// Data flow and transfer metrics
///
/// Aggregates PCIe link state, in-flight and recently completed transfers,
/// memory-bus bandwidth, buffer-pool usage, and 60-point sparkline histories
/// for the monitor UI.
#[derive(Debug, Clone)]
pub struct DataFlowMetrics {
    // PCIe metrics
    /// PCIe generation (4, 5, etc.)
    pub pcie_generation: u8,
    /// PCIe link width (x1, x4, x8, x16)
    pub pcie_width: u8,
    /// Theoretical PCIe bandwidth in GB/s (derived from generation + width)
    pub pcie_theoretical_gbps: f64,
    /// Current TX bandwidth in GB/s
    pub pcie_tx_gbps: f64,
    /// Current RX bandwidth in GB/s
    pub pcie_rx_gbps: f64,

    // Active transfers
    /// Currently active transfers
    pub active_transfers: Vec<Transfer>,
    /// Recently completed transfers (bounded by `MAX_COMPLETED_TRANSFERS`)
    pub completed_transfers: VecDeque<Transfer>,

    // Memory bus
    /// GPU memory bus utilization percentage
    pub memory_bus_utilization_pct: f64,
    /// Memory read bandwidth in GB/s
    pub memory_read_gbps: f64,
    /// Memory write bandwidth in GB/s
    pub memory_write_gbps: f64,

    // Buffer pools
    /// Pinned memory used in bytes
    pub pinned_memory_used_bytes: u64,
    /// Pinned memory total in bytes
    pub pinned_memory_total_bytes: u64,
    /// Staging buffer used in bytes
    pub staging_buffer_used_bytes: u64,

    // History (sparklines, bounded by `MAX_HISTORY_POINTS`)
    /// PCIe TX history
    pub pcie_tx_history: VecDeque<f64>,
    /// PCIe RX history
    pub pcie_rx_history: VecDeque<f64>,
    /// Memory bus utilization history
    pub memory_bus_history: VecDeque<f64>,
}

73impl DataFlowMetrics {
74    /// Maximum history points
75    pub const MAX_HISTORY_POINTS: usize = 60;
76    /// Maximum completed transfers to keep
77    pub const MAX_COMPLETED_TRANSFERS: usize = 100;
78
79    /// Create new data flow metrics
80    #[must_use]
81    pub fn new() -> Self {
82        Self::default()
83    }
84
85    /// Calculate PCIe theoretical bandwidth based on generation and width
86    #[must_use]
87    pub fn calculate_pcie_bandwidth(generation: u8, width: u8) -> f64 {
88        // GT/s per lane by generation
89        let gt_per_lane = match generation {
90            1 => 2.5,  // PCIe 1.0
91            2 => 5.0,  // PCIe 2.0
92            3 => 8.0,  // PCIe 3.0
93            4 => 16.0, // PCIe 4.0
94            5 => 32.0, // PCIe 5.0
95            6 => 64.0, // PCIe 6.0
96            _ => 0.0,
97        };
98
99        // 128b/130b encoding for PCIe 3.0+, 8b/10b for 1.0/2.0
100        let encoding_efficiency = if generation >= 3 { 128.0 / 130.0 } else { 0.8 };
101
102        // GB/s = GT/s * lanes * encoding / 8 bits per byte
103        gt_per_lane * width as f64 * encoding_efficiency / 8.0
104    }
105
106    /// Set PCIe configuration
107    pub fn set_pcie_config(&mut self, generation: u8, width: u8) {
108        self.pcie_generation = generation;
109        self.pcie_width = width;
110        self.pcie_theoretical_gbps = Self::calculate_pcie_bandwidth(generation, width);
111    }
112
113    /// Get PCIe TX utilization percentage
114    #[must_use]
115    pub fn pcie_tx_utilization_pct(&self) -> f64 {
116        if self.pcie_theoretical_gbps > 0.0 {
117            (self.pcie_tx_gbps / self.pcie_theoretical_gbps) * 100.0
118        } else {
119            0.0
120        }
121    }
122
123    /// Get PCIe RX utilization percentage
124    #[must_use]
125    pub fn pcie_rx_utilization_pct(&self) -> f64 {
126        if self.pcie_theoretical_gbps > 0.0 {
127            (self.pcie_rx_gbps / self.pcie_theoretical_gbps) * 100.0
128        } else {
129            0.0
130        }
131    }
132
133    /// Start tracking a new transfer
134    pub fn start_transfer(&mut self, transfer: Transfer) {
135        self.active_transfers.push(transfer);
136    }
137
138    /// Complete a transfer and move to history
139    pub fn complete_transfer(&mut self, transfer_id: TransferId) {
140        if let Some(idx) = self
141            .active_transfers
142            .iter()
143            .position(|t| t.id == transfer_id)
144        {
145            let mut transfer = self.active_transfers.remove(idx);
146            transfer.complete();
147            self.completed_transfers.push_back(transfer);
148            if self.completed_transfers.len() > Self::MAX_COMPLETED_TRANSFERS {
149                self.completed_transfers.pop_front();
150            }
151        }
152    }
153
154    /// Update history sparklines
155    pub fn update_history(&mut self) {
156        self.pcie_tx_history.push_back(self.pcie_tx_gbps);
157        if self.pcie_tx_history.len() > Self::MAX_HISTORY_POINTS {
158            self.pcie_tx_history.pop_front();
159        }
160
161        self.pcie_rx_history.push_back(self.pcie_rx_gbps);
162        if self.pcie_rx_history.len() > Self::MAX_HISTORY_POINTS {
163            self.pcie_rx_history.pop_front();
164        }
165
166        self.memory_bus_history
167            .push_back(self.memory_bus_utilization_pct);
168        if self.memory_bus_history.len() > Self::MAX_HISTORY_POINTS {
169            self.memory_bus_history.pop_front();
170        }
171    }
172
173    /// Get total bytes currently being transferred
174    #[must_use]
175    pub fn bytes_in_flight(&self) -> u64 {
176        self.active_transfers
177            .iter()
178            .map(|t| t.size_bytes.saturating_sub(t.transferred_bytes))
179            .sum()
180    }
181
182    /// Get pinned memory utilization percentage
183    #[must_use]
184    pub fn pinned_memory_utilization_pct(&self) -> f64 {
185        if self.pinned_memory_total_bytes > 0 {
186            (self.pinned_memory_used_bytes as f64 / self.pinned_memory_total_bytes as f64) * 100.0
187        } else {
188            0.0
189        }
190    }
191}
192
193impl Default for DataFlowMetrics {
194    fn default() -> Self {
195        Self {
196            pcie_generation: 4,
197            pcie_width: 16,
198            pcie_theoretical_gbps: Self::calculate_pcie_bandwidth(4, 16),
199            pcie_tx_gbps: 0.0,
200            pcie_rx_gbps: 0.0,
201            active_transfers: Vec::new(),
202            completed_transfers: VecDeque::with_capacity(Self::MAX_COMPLETED_TRANSFERS),
203            memory_bus_utilization_pct: 0.0,
204            memory_read_gbps: 0.0,
205            memory_write_gbps: 0.0,
206            pinned_memory_used_bytes: 0,
207            pinned_memory_total_bytes: 0,
208            staging_buffer_used_bytes: 0,
209            pcie_tx_history: VecDeque::with_capacity(Self::MAX_HISTORY_POINTS),
210            pcie_rx_history: VecDeque::with_capacity(Self::MAX_HISTORY_POINTS),
211            memory_bus_history: VecDeque::with_capacity(Self::MAX_HISTORY_POINTS),
212        }
213    }
214}
215
// ============================================================================
// Transfer Tracking
// ============================================================================

/// Unique transfer identifier
///
/// Ids are allocated from a process-wide atomic counter (see
/// [`TransferId::new`]), so each is distinct within the process.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct TransferId(pub u64);

224impl TransferId {
225    /// Generate a new unique transfer ID
226    #[must_use]
227    pub fn new() -> Self {
228        use std::sync::atomic::{AtomicU64, Ordering};
229        static COUNTER: AtomicU64 = AtomicU64::new(1);
230        Self(COUNTER.fetch_add(1, Ordering::Relaxed))
231    }
232}
233
impl Default for TransferId {
    // NOTE(review): unlike most `Default` impls this is not a fixed canonical
    // value — each call allocates a fresh unique id via `TransferId::new`.
    fn default() -> Self {
        Self::new()
    }
}

/// Memory transfer between host and device
///
/// Tracks one copy operation from creation through completion or failure,
/// carrying progress and timing data for bandwidth reporting.
#[derive(Debug, Clone)]
pub struct Transfer {
    /// Unique transfer ID
    pub id: TransferId,
    /// Transfer direction
    pub direction: TransferDirection,
    /// Source memory location
    pub source: MemoryLocation,
    /// Destination memory location
    pub destination: MemoryLocation,
    /// Total transfer size in bytes
    pub size_bytes: u64,
    /// Bytes transferred so far (updated via `update_progress`)
    pub transferred_bytes: u64,
    /// Transfer start time (captured at construction, so queue time counts)
    pub start_time: Instant,
    /// Transfer end time (`Some` once completed or failed)
    pub end_time: Option<Instant>,
    /// Transfer status
    pub status: TransferStatus,
    /// Human-readable label (empty by default; set via `with_label`)
    pub label: String,
}

265impl Transfer {
266    /// Create a new transfer
267    #[must_use]
268    pub fn new(
269        direction: TransferDirection,
270        source: MemoryLocation,
271        destination: MemoryLocation,
272        size_bytes: u64,
273    ) -> Self {
274        Self {
275            id: TransferId::new(),
276            direction,
277            source,
278            destination,
279            size_bytes,
280            transferred_bytes: 0,
281            start_time: Instant::now(),
282            end_time: None,
283            status: TransferStatus::Pending,
284            label: String::new(),
285        }
286    }
287
288    /// Create H2D transfer
289    #[must_use]
290    pub fn host_to_device(size_bytes: u64, device_id: DeviceId) -> Self {
291        Self::new(
292            TransferDirection::HostToDevice,
293            MemoryLocation::SystemRam,
294            MemoryLocation::GpuVram(device_id),
295            size_bytes,
296        )
297    }
298
299    /// Create D2H transfer
300    #[must_use]
301    pub fn device_to_host(size_bytes: u64, device_id: DeviceId) -> Self {
302        Self::new(
303            TransferDirection::DeviceToHost,
304            MemoryLocation::GpuVram(device_id),
305            MemoryLocation::SystemRam,
306            size_bytes,
307        )
308    }
309
310    /// Set label
311    #[must_use]
312    pub fn with_label(mut self, label: impl Into<String>) -> Self {
313        self.label = label.into();
314        self
315    }
316
317    /// Get transfer progress percentage (0.0-100.0)
318    #[must_use]
319    pub fn progress_pct(&self) -> f64 {
320        if self.size_bytes == 0 {
321            return 100.0;
322        }
323        (self.transferred_bytes as f64 / self.size_bytes as f64) * 100.0
324    }
325
326    /// Get elapsed time
327    #[must_use]
328    pub fn elapsed(&self) -> std::time::Duration {
329        match self.end_time {
330            Some(end) => end.duration_since(self.start_time),
331            None => self.start_time.elapsed(),
332        }
333    }
334
335    /// Get elapsed time in milliseconds
336    #[must_use]
337    pub fn elapsed_ms(&self) -> f64 {
338        self.elapsed().as_secs_f64() * 1000.0
339    }
340
341    /// Get current bandwidth in GB/s
342    #[must_use]
343    pub fn bandwidth_gbps(&self) -> f64 {
344        let elapsed_s = self.elapsed().as_secs_f64();
345        if elapsed_s > 0.0 {
346            self.transferred_bytes as f64 / elapsed_s / 1e9
347        } else {
348            0.0
349        }
350    }
351
352    /// Update transfer progress
353    pub fn update_progress(&mut self, bytes_transferred: u64) {
354        self.transferred_bytes = bytes_transferred;
355        if self.status == TransferStatus::Pending {
356            self.status = TransferStatus::InProgress;
357        }
358    }
359
360    /// Mark transfer as complete
361    pub fn complete(&mut self) {
362        self.transferred_bytes = self.size_bytes;
363        self.status = TransferStatus::Completed;
364        self.end_time = Some(Instant::now());
365    }
366
367    /// Mark transfer as failed
368    pub fn fail(&mut self, _reason: &str) {
369        self.status = TransferStatus::Failed;
370        self.end_time = Some(Instant::now());
371    }
372}
373
/// Transfer direction
///
/// Rendered by the `Display` impl as a compact arrow form (e.g. "H→D").
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransferDirection {
    /// Host (CPU) to Device (GPU)
    HostToDevice,
    /// Device (GPU) to Host (CPU)
    DeviceToHost,
    /// Device to Device (same GPU)
    DeviceToDevice,
    /// Peer to Peer (GPU to GPU via NVLink/PCIe)
    PeerToPeer,
}

387impl std::fmt::Display for TransferDirection {
388    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
389        match self {
390            Self::HostToDevice => write!(f, "H→D"),
391            Self::DeviceToHost => write!(f, "D→H"),
392            Self::DeviceToDevice => write!(f, "D→D"),
393            Self::PeerToPeer => write!(f, "P2P"),
394        }
395    }
396}
397
/// Memory location
///
/// Source or destination endpoint of a [`Transfer`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MemoryLocation {
    /// System RAM
    SystemRam,
    /// Pinned (page-locked) memory
    PinnedMemory,
    /// GPU VRAM on the given device
    GpuVram(DeviceId),
    /// Unified/managed memory
    UnifiedMemory,
}

411impl std::fmt::Display for MemoryLocation {
412    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
413        match self {
414            Self::SystemRam => write!(f, "RAM"),
415            Self::PinnedMemory => write!(f, "Pinned"),
416            Self::GpuVram(id) => write!(f, "VRAM:{}", id),
417            Self::UnifiedMemory => write!(f, "Unified"),
418        }
419    }
420}
421
/// Transfer status
///
/// Lifecycle: `Pending` → `InProgress` → (`Completed` | `Failed`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransferStatus {
    /// Transfer queued, no bytes moved yet
    Pending,
    /// Transfer in progress
    InProgress,
    /// Transfer completed successfully
    Completed,
    /// Transfer failed
    Failed,
}

// Unit tests for this module; compiled only in test builds.
#[cfg(test)]
mod tests;