trueno_gpu/monitor/data_flow/
mod.rs1use std::collections::VecDeque;
19use std::time::Instant;
20
21use super::device::DeviceId;
22
23#[derive(Debug, Clone)]
29pub struct DataFlowMetrics {
30 pub pcie_generation: u8,
33 pub pcie_width: u8,
35 pub pcie_theoretical_gbps: f64,
37 pub pcie_tx_gbps: f64,
39 pub pcie_rx_gbps: f64,
41
42 pub active_transfers: Vec<Transfer>,
45 pub completed_transfers: VecDeque<Transfer>,
47
48 pub memory_bus_utilization_pct: f64,
51 pub memory_read_gbps: f64,
53 pub memory_write_gbps: f64,
55
56 pub pinned_memory_used_bytes: u64,
59 pub pinned_memory_total_bytes: u64,
61 pub staging_buffer_used_bytes: u64,
63
64 pub pcie_tx_history: VecDeque<f64>,
67 pub pcie_rx_history: VecDeque<f64>,
69 pub memory_bus_history: VecDeque<f64>,
71}
72
73impl DataFlowMetrics {
74 pub const MAX_HISTORY_POINTS: usize = 60;
76 pub const MAX_COMPLETED_TRANSFERS: usize = 100;
78
79 #[must_use]
81 pub fn new() -> Self {
82 Self::default()
83 }
84
85 #[must_use]
87 pub fn calculate_pcie_bandwidth(generation: u8, width: u8) -> f64 {
88 let gt_per_lane = match generation {
90 1 => 2.5, 2 => 5.0, 3 => 8.0, 4 => 16.0, 5 => 32.0, 6 => 64.0, _ => 0.0,
97 };
98
99 let encoding_efficiency = if generation >= 3 { 128.0 / 130.0 } else { 0.8 };
101
102 gt_per_lane * width as f64 * encoding_efficiency / 8.0
104 }
105
106 pub fn set_pcie_config(&mut self, generation: u8, width: u8) {
108 self.pcie_generation = generation;
109 self.pcie_width = width;
110 self.pcie_theoretical_gbps = Self::calculate_pcie_bandwidth(generation, width);
111 }
112
113 #[must_use]
115 pub fn pcie_tx_utilization_pct(&self) -> f64 {
116 if self.pcie_theoretical_gbps > 0.0 {
117 (self.pcie_tx_gbps / self.pcie_theoretical_gbps) * 100.0
118 } else {
119 0.0
120 }
121 }
122
123 #[must_use]
125 pub fn pcie_rx_utilization_pct(&self) -> f64 {
126 if self.pcie_theoretical_gbps > 0.0 {
127 (self.pcie_rx_gbps / self.pcie_theoretical_gbps) * 100.0
128 } else {
129 0.0
130 }
131 }
132
133 pub fn start_transfer(&mut self, transfer: Transfer) {
135 self.active_transfers.push(transfer);
136 }
137
138 pub fn complete_transfer(&mut self, transfer_id: TransferId) {
140 if let Some(idx) = self
141 .active_transfers
142 .iter()
143 .position(|t| t.id == transfer_id)
144 {
145 let mut transfer = self.active_transfers.remove(idx);
146 transfer.complete();
147 self.completed_transfers.push_back(transfer);
148 if self.completed_transfers.len() > Self::MAX_COMPLETED_TRANSFERS {
149 self.completed_transfers.pop_front();
150 }
151 }
152 }
153
154 pub fn update_history(&mut self) {
156 self.pcie_tx_history.push_back(self.pcie_tx_gbps);
157 if self.pcie_tx_history.len() > Self::MAX_HISTORY_POINTS {
158 self.pcie_tx_history.pop_front();
159 }
160
161 self.pcie_rx_history.push_back(self.pcie_rx_gbps);
162 if self.pcie_rx_history.len() > Self::MAX_HISTORY_POINTS {
163 self.pcie_rx_history.pop_front();
164 }
165
166 self.memory_bus_history
167 .push_back(self.memory_bus_utilization_pct);
168 if self.memory_bus_history.len() > Self::MAX_HISTORY_POINTS {
169 self.memory_bus_history.pop_front();
170 }
171 }
172
173 #[must_use]
175 pub fn bytes_in_flight(&self) -> u64 {
176 self.active_transfers
177 .iter()
178 .map(|t| t.size_bytes.saturating_sub(t.transferred_bytes))
179 .sum()
180 }
181
182 #[must_use]
184 pub fn pinned_memory_utilization_pct(&self) -> f64 {
185 if self.pinned_memory_total_bytes > 0 {
186 (self.pinned_memory_used_bytes as f64 / self.pinned_memory_total_bytes as f64) * 100.0
187 } else {
188 0.0
189 }
190 }
191}
192
193impl Default for DataFlowMetrics {
194 fn default() -> Self {
195 Self {
196 pcie_generation: 4,
197 pcie_width: 16,
198 pcie_theoretical_gbps: Self::calculate_pcie_bandwidth(4, 16),
199 pcie_tx_gbps: 0.0,
200 pcie_rx_gbps: 0.0,
201 active_transfers: Vec::new(),
202 completed_transfers: VecDeque::with_capacity(Self::MAX_COMPLETED_TRANSFERS),
203 memory_bus_utilization_pct: 0.0,
204 memory_read_gbps: 0.0,
205 memory_write_gbps: 0.0,
206 pinned_memory_used_bytes: 0,
207 pinned_memory_total_bytes: 0,
208 staging_buffer_used_bytes: 0,
209 pcie_tx_history: VecDeque::with_capacity(Self::MAX_HISTORY_POINTS),
210 pcie_rx_history: VecDeque::with_capacity(Self::MAX_HISTORY_POINTS),
211 memory_bus_history: VecDeque::with_capacity(Self::MAX_HISTORY_POINTS),
212 }
213 }
214}
215
/// Identifier for a tracked transfer, drawn from a process-wide counter starting at 1.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct TransferId(pub u64);

impl TransferId {
    /// Allocates the next id from a global atomic counter.
    ///
    /// `Relaxed` ordering suffices: each call only needs a distinct value, not
    /// synchronisation with other memory operations.
    #[must_use]
    pub fn new() -> Self {
        static NEXT_ID: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(1);
        let raw = NEXT_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        TransferId(raw)
    }
}

impl Default for TransferId {
    /// Equivalent to `TransferId::new()`: every default id is freshly allocated.
    fn default() -> Self {
        TransferId::new()
    }
}
239
240#[derive(Debug, Clone)]
242pub struct Transfer {
243 pub id: TransferId,
245 pub direction: TransferDirection,
247 pub source: MemoryLocation,
249 pub destination: MemoryLocation,
251 pub size_bytes: u64,
253 pub transferred_bytes: u64,
255 pub start_time: Instant,
257 pub end_time: Option<Instant>,
259 pub status: TransferStatus,
261 pub label: String,
263}
264
265impl Transfer {
266 #[must_use]
268 pub fn new(
269 direction: TransferDirection,
270 source: MemoryLocation,
271 destination: MemoryLocation,
272 size_bytes: u64,
273 ) -> Self {
274 Self {
275 id: TransferId::new(),
276 direction,
277 source,
278 destination,
279 size_bytes,
280 transferred_bytes: 0,
281 start_time: Instant::now(),
282 end_time: None,
283 status: TransferStatus::Pending,
284 label: String::new(),
285 }
286 }
287
288 #[must_use]
290 pub fn host_to_device(size_bytes: u64, device_id: DeviceId) -> Self {
291 Self::new(
292 TransferDirection::HostToDevice,
293 MemoryLocation::SystemRam,
294 MemoryLocation::GpuVram(device_id),
295 size_bytes,
296 )
297 }
298
299 #[must_use]
301 pub fn device_to_host(size_bytes: u64, device_id: DeviceId) -> Self {
302 Self::new(
303 TransferDirection::DeviceToHost,
304 MemoryLocation::GpuVram(device_id),
305 MemoryLocation::SystemRam,
306 size_bytes,
307 )
308 }
309
310 #[must_use]
312 pub fn with_label(mut self, label: impl Into<String>) -> Self {
313 self.label = label.into();
314 self
315 }
316
317 #[must_use]
319 pub fn progress_pct(&self) -> f64 {
320 if self.size_bytes == 0 {
321 return 100.0;
322 }
323 (self.transferred_bytes as f64 / self.size_bytes as f64) * 100.0
324 }
325
326 #[must_use]
328 pub fn elapsed(&self) -> std::time::Duration {
329 match self.end_time {
330 Some(end) => end.duration_since(self.start_time),
331 None => self.start_time.elapsed(),
332 }
333 }
334
335 #[must_use]
337 pub fn elapsed_ms(&self) -> f64 {
338 self.elapsed().as_secs_f64() * 1000.0
339 }
340
341 #[must_use]
343 pub fn bandwidth_gbps(&self) -> f64 {
344 let elapsed_s = self.elapsed().as_secs_f64();
345 if elapsed_s > 0.0 {
346 self.transferred_bytes as f64 / elapsed_s / 1e9
347 } else {
348 0.0
349 }
350 }
351
352 pub fn update_progress(&mut self, bytes_transferred: u64) {
354 self.transferred_bytes = bytes_transferred;
355 if self.status == TransferStatus::Pending {
356 self.status = TransferStatus::InProgress;
357 }
358 }
359
360 pub fn complete(&mut self) {
362 self.transferred_bytes = self.size_bytes;
363 self.status = TransferStatus::Completed;
364 self.end_time = Some(Instant::now());
365 }
366
367 pub fn fail(&mut self, _reason: &str) {
369 self.status = TransferStatus::Failed;
370 self.end_time = Some(Instant::now());
371 }
372}
373
/// Direction of a memory transfer.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransferDirection {
    /// Host to device (displayed as `H→D`).
    HostToDevice,
    /// Device to host (displayed as `D→H`).
    DeviceToHost,
    /// Device to device (displayed as `D→D`).
    DeviceToDevice,
    /// Peer to peer (displayed as `P2P`).
    PeerToPeer,
}

impl std::fmt::Display for TransferDirection {
    /// Writes the short arrow label.
    ///
    /// Uses `Formatter::pad`, so width, fill and alignment flags (e.g. `{:<5}` in table
    /// columns) are honored; plain `{}` output is unchanged.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::HostToDevice => "H→D",
            Self::DeviceToHost => "D→H",
            Self::DeviceToDevice => "D→D",
            Self::PeerToPeer => "P2P",
        };
        f.pad(label)
    }
}
397
398#[derive(Debug, Clone, Copy, PartialEq, Eq)]
400pub enum MemoryLocation {
401 SystemRam,
403 PinnedMemory,
405 GpuVram(DeviceId),
407 UnifiedMemory,
409}
410
411impl std::fmt::Display for MemoryLocation {
412 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
413 match self {
414 Self::SystemRam => write!(f, "RAM"),
415 Self::PinnedMemory => write!(f, "Pinned"),
416 Self::GpuVram(id) => write!(f, "VRAM:{}", id),
417 Self::UnifiedMemory => write!(f, "Unified"),
418 }
419 }
420}
421
/// Lifecycle state of a [`Transfer`].
///
/// `Transfer::new` starts in `Pending`, `update_progress` moves to `InProgress`, and
/// `complete` / `fail` set the terminal states.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum TransferStatus {
    /// Created but no progress reported yet.
    Pending,
    /// At least one progress update has been recorded.
    InProgress,
    /// Finished successfully.
    Completed,
    /// Finished with an error.
    Failed,
}
434
// Unit tests for this module live in a separate `tests` module file and are compiled
// only for test builds.
#[cfg(test)]
mod tests;