Skip to main content

trustformers_debug/profiler/
io_monitor.rs

1//! I/O operation monitoring and bandwidth tracking
2
3use serde::{Deserialize, Serialize};
4use std::collections::HashMap;
5use std::time::{Duration, Instant, SystemTime};
6use uuid::Uuid;
7
8/// I/O operation profiling
9#[derive(Debug, Clone, Serialize, Deserialize)]
10pub struct IoProfile {
11    pub operation_type: IoOperationType,
12    pub file_path: Option<String>,
13    pub bytes_transferred: usize,
14    pub duration: Duration,
15    pub bandwidth_mb_s: f64,
16    pub queue_time: Duration,
17    pub device_type: IoDeviceType,
18}
19
20#[derive(Debug, Clone, Serialize, Deserialize)]
21pub enum IoOperationType {
22    FileRead,
23    FileWrite,
24    NetworkRead,
25    NetworkWrite,
26    DatabaseQuery,
27    CacheLoad,
28    CacheStore,
29}
30
31#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
32pub enum IoDeviceType {
33    SSD,
34    HDD,
35    Network,
36    Memory,
37    Cache,
38}
39
40/// Layer-wise latency analysis
41#[derive(Debug, Clone, Serialize, Deserialize)]
42pub struct LayerLatencyProfile {
43    pub layer_name: String,
44    pub layer_type: String,
45    pub input_shapes: Vec<Vec<usize>>,
46    pub output_shapes: Vec<Vec<usize>>,
47    pub cpu_time: Duration,
48    pub gpu_time: Duration,
49    pub memory_copy_time: Duration,
50    pub sync_time: Duration,
51    pub parameter_count: usize,
52    pub flops: u64,
53    pub memory_footprint_bytes: usize,
54    pub cache_hit_rate: f64,
55}
56
57#[derive(Debug, Serialize, Deserialize)]
58pub struct IoPerformanceSummary {
59    pub total_operations: usize,
60    pub total_bytes_transferred: usize,
61    pub avg_bandwidth_by_device: HashMap<IoDeviceType, f64>,
62    pub slowest_operations: Vec<String>,
63}
64
65#[derive(Debug, Clone, Serialize, Deserialize)]
66pub struct BandwidthSample {
67    pub timestamp: SystemTime,
68    pub bandwidth_mb_s: f64,
69    pub device_type: IoDeviceType,
70}
71
72/// I/O operation monitor
73#[derive(Debug)]
74pub struct IoMonitor {
75    pub(crate) active_operations: HashMap<Uuid, IoOperation>,
76    pub(crate) bandwidth_history: Vec<BandwidthSample>,
77    pub(crate) io_queue_depth: usize,
78}
79
80#[allow(dead_code)]
81#[derive(Debug)]
82pub struct IoOperation {
83    #[allow(dead_code)]
84    pub(crate) operation_id: Uuid,
85    pub(crate) start_time: Instant,
86    pub(crate) operation_type: IoOperationType,
87    pub(crate) bytes_expected: usize,
88}
89
90impl Default for IoMonitor {
91    fn default() -> Self {
92        Self::new()
93    }
94}
95
96impl IoMonitor {
97    pub fn new() -> Self {
98        Self {
99            active_operations: HashMap::new(),
100            bandwidth_history: Vec::new(),
101            io_queue_depth: 0,
102        }
103    }
104
105    pub fn start_io_operation(
106        &mut self,
107        operation_type: IoOperationType,
108        bytes_expected: usize,
109    ) -> Uuid {
110        let operation_id = Uuid::new_v4();
111        let operation = IoOperation {
112            operation_id,
113            start_time: Instant::now(),
114            operation_type,
115            bytes_expected,
116        };
117
118        self.active_operations.insert(operation_id, operation);
119        self.io_queue_depth += 1;
120        operation_id
121    }
122
123    pub fn finish_io_operation(
124        &mut self,
125        operation_id: Uuid,
126        bytes_transferred: usize,
127    ) -> Option<IoProfile> {
128        if let Some(operation) = self.active_operations.remove(&operation_id) {
129            let duration = operation.start_time.elapsed();
130            let bandwidth_mb_s = if duration.as_secs_f64() > 0.0 {
131                bytes_transferred as f64 / (1024.0 * 1024.0) / duration.as_secs_f64()
132            } else {
133                0.0
134            };
135
136            self.io_queue_depth = self.io_queue_depth.saturating_sub(1);
137
138            let device_type = match operation.operation_type {
139                IoOperationType::FileRead | IoOperationType::FileWrite => IoDeviceType::SSD,
140                IoOperationType::NetworkRead | IoOperationType::NetworkWrite => {
141                    IoDeviceType::Network
142                },
143                IoOperationType::CacheLoad | IoOperationType::CacheStore => IoDeviceType::Cache,
144                _ => IoDeviceType::Memory,
145            };
146
147            // Record bandwidth sample
148            self.bandwidth_history.push(BandwidthSample {
149                timestamp: SystemTime::now(),
150                bandwidth_mb_s,
151                device_type: device_type.clone(),
152            });
153
154            // Keep only recent samples
155            if self.bandwidth_history.len() > 1000 {
156                self.bandwidth_history.drain(0..500);
157            }
158
159            Some(IoProfile {
160                operation_type: operation.operation_type,
161                file_path: None, // Would be filled in practice
162                bytes_transferred,
163                duration,
164                bandwidth_mb_s,
165                queue_time: Duration::from_millis(self.io_queue_depth as u64 * 10), // Simplified
166                device_type,
167            })
168        } else {
169            None
170        }
171    }
172
173    pub fn get_average_bandwidth(&self, device_type: &IoDeviceType) -> f64 {
174        let samples: Vec<f64> = self
175            .bandwidth_history
176            .iter()
177            .filter(|s| &s.device_type == device_type)
178            .map(|s| s.bandwidth_mb_s)
179            .collect();
180
181        if samples.is_empty() {
182            0.0
183        } else {
184            samples.iter().sum::<f64>() / samples.len() as f64
185        }
186    }
187}
188
#[cfg(test)]
mod tests {
    use super::*;

    /// Starts and immediately finishes one operation, returning its profile.
    fn run_op(monitor: &mut IoMonitor, kind: IoOperationType, bytes: usize) -> IoProfile {
        let id = monitor.start_io_operation(kind, bytes);
        monitor
            .finish_io_operation(id, bytes)
            .expect("a just-started operation must be finishable")
    }

    #[test]
    fn test_io_monitor_new() {
        let monitor = IoMonitor::new();
        assert_eq!(monitor.io_queue_depth, 0);
        assert!(monitor.bandwidth_history.is_empty());
        assert!(monitor.active_operations.is_empty());
    }

    #[test]
    fn test_io_monitor_start_operation() {
        let mut monitor = IoMonitor::new();
        let _id = monitor.start_io_operation(IoOperationType::FileRead, 4096);
        assert_eq!(monitor.io_queue_depth, 1);
        assert_eq!(monitor.active_operations.len(), 1);
    }

    #[test]
    fn test_io_monitor_finish_operation() {
        let mut monitor = IoMonitor::new();
        let p = run_op(&mut monitor, IoOperationType::FileWrite, 8192);
        assert_eq!(p.bytes_transferred, 8192);
        assert_eq!(p.device_type, IoDeviceType::SSD);
        assert_eq!(monitor.io_queue_depth, 0);
        assert_eq!(monitor.bandwidth_history.len(), 1);
    }

    #[test]
    fn test_io_monitor_finish_nonexistent() {
        let mut monitor = IoMonitor::new();
        assert!(monitor.finish_io_operation(Uuid::new_v4(), 100).is_none());
    }

    #[test]
    fn test_io_monitor_finish_twice_returns_none() {
        let mut monitor = IoMonitor::new();
        let id = monitor.start_io_operation(IoOperationType::FileRead, 10);
        assert!(monitor.finish_io_operation(id, 10).is_some());
        assert!(monitor.finish_io_operation(id, 10).is_none());
        assert_eq!(monitor.io_queue_depth, 0);
    }

    #[test]
    fn test_io_monitor_average_bandwidth_empty() {
        let monitor = IoMonitor::new();
        assert!((monitor.get_average_bandwidth(&IoDeviceType::SSD) - 0.0).abs() < 1e-9);
    }

    #[test]
    fn test_io_monitor_average_bandwidth_filters_by_device() {
        let mut monitor = IoMonitor::new();
        let samples = vec![
            (10.0, IoDeviceType::SSD),
            (20.0, IoDeviceType::SSD),
            (5.0, IoDeviceType::Network),
        ];
        for (bandwidth_mb_s, device_type) in samples {
            monitor.bandwidth_history.push(BandwidthSample {
                timestamp: SystemTime::now(),
                bandwidth_mb_s,
                device_type,
            });
        }
        assert!((monitor.get_average_bandwidth(&IoDeviceType::SSD) - 15.0).abs() < 1e-9);
        assert!((monitor.get_average_bandwidth(&IoDeviceType::Network) - 5.0).abs() < 1e-9);
        assert!((monitor.get_average_bandwidth(&IoDeviceType::HDD) - 0.0).abs() < 1e-9);
    }

    #[test]
    fn test_io_monitor_device_type_mapping() {
        let mut monitor = IoMonitor::new();
        let p = run_op(&mut monitor, IoOperationType::NetworkRead, 1024);
        assert_eq!(p.device_type, IoDeviceType::Network);
    }

    #[test]
    fn test_io_monitor_cache_device_type() {
        let mut monitor = IoMonitor::new();
        let p = run_op(&mut monitor, IoOperationType::CacheLoad, 512);
        assert_eq!(p.device_type, IoDeviceType::Cache);
    }

    #[test]
    fn test_io_monitor_database_query_maps_to_memory() {
        let mut monitor = IoMonitor::new();
        let p = run_op(&mut monitor, IoOperationType::DatabaseQuery, 64);
        assert_eq!(p.device_type, IoDeviceType::Memory);
    }

    #[test]
    fn test_io_monitor_bandwidth_history_is_bounded() {
        let mut monitor = IoMonitor::new();
        for _ in 0..1001 {
            run_op(&mut monitor, IoOperationType::CacheStore, 1);
        }
        // The 1001st sample pushes the history past its 1000-sample cap, which drops the
        // oldest 500 samples.
        assert_eq!(monitor.bandwidth_history.len(), 501);
        assert!(monitor.active_operations.is_empty());
        assert_eq!(monitor.io_queue_depth, 0);
    }
}