memscope_rs/cli/commands/html_from_json/large_file_optimizer.rs

//! Large file optimization module for JSON processing
//!
//! This module provides streaming JSON parsing and memory optimization
//! for handling large JSON files efficiently without exhausting memory.
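//!
//! A minimal usage sketch (illustrative; the exact import path depends on how
//! this module is re-exported, so the doctest is ignored):
//!
//! ```ignore
//! let optimizer = LargeFileOptimizer::new_default();
//! let (json, stats) = optimizer.process_file("snapshot.json", "memory_analysis")?;
//! println!("{} objects in {} ms", stats.objects_processed, stats.processing_time_ms);
//! ```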

use serde_json::Value;
use std::error::Error;
use std::fmt;
use std::fs::File;
use std::io::BufReader;
use std::path::Path;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Instant;

/// Configuration for large file processing
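///
/// Defaults can be overridden per field (sketch; the values shown are
/// illustrative, not recommendations):
///
/// ```ignore
/// let config = LargeFileConfig {
///     max_memory_bytes: 256 * 1024 * 1024, // tighter 256MB budget
///     ..LargeFileConfig::default()
/// };
/// let optimizer = LargeFileOptimizer::new(config);
/// ```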
#[derive(Debug, Clone)]
pub struct LargeFileConfig {
    /// Maximum memory usage in bytes before switching to streaming mode
    pub max_memory_bytes: usize,
    /// Chunk size for streaming processing in bytes
    pub stream_chunk_size: usize,
    /// Enable memory usage monitoring
    pub enable_memory_monitoring: bool,
    /// Enable progress reporting
    pub enable_progress_reporting: bool,
    /// Maximum file size to process in bytes (safety limit)
    pub max_file_size_bytes: usize,
}

impl Default for LargeFileConfig {
    fn default() -> Self {
        Self {
            max_memory_bytes: 512 * 1024 * 1024, // 512MB
            stream_chunk_size: 64 * 1024,        // 64KB chunks
            enable_memory_monitoring: true,
            enable_progress_reporting: true,
            max_file_size_bytes: 2 * 1024 * 1024 * 1024, // 2GB limit
        }
    }
}

/// Memory usage statistics
#[derive(Debug, Clone)]
pub struct MemoryStats {
    /// Current memory usage in bytes
    pub current_usage_bytes: usize,
    /// Peak memory usage in bytes
    pub peak_usage_bytes: usize,
    /// Number of memory allocations tracked
    pub allocation_count: usize,
    /// Memory efficiency ratio (0.0 to 1.0)
    pub efficiency_ratio: f64,
}

/// Processing statistics for large files
#[derive(Debug)]
pub struct ProcessingStats {
    /// File size in bytes
    pub file_size_bytes: usize,
    /// Processing time in milliseconds
    pub processing_time_ms: u64,
    /// Whether streaming mode was used
    pub streaming_mode_used: bool,
    /// Memory statistics
    pub memory_stats: MemoryStats,
    /// Throughput in MB/s
    pub throughput_mb_per_sec: f64,
    /// Number of JSON objects processed
    pub objects_processed: usize,
}

/// Errors that can occur during large file processing
#[derive(Debug)]
pub enum LargeFileError {
    /// File is too large to process safely
    FileTooLarge(usize, usize),
    /// Memory limit exceeded during processing
    MemoryLimitExceeded(usize, usize),
    /// Streaming JSON parsing failed
    StreamingParseError(String),
    /// IO error during file processing
    IoError(std::io::Error),
    /// JSON structure validation failed
    ValidationError(String),
}

impl fmt::Display for LargeFileError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            LargeFileError::FileTooLarge(size, limit) => {
                write!(f, "File size ({size} bytes) exceeds limit ({limit} bytes)")
            }
            LargeFileError::MemoryLimitExceeded(used, limit) => {
                write!(
                    f,
                    "Memory usage ({used} bytes) exceeds limit ({limit} bytes)"
                )
            }
            LargeFileError::StreamingParseError(msg) => {
                write!(f, "Streaming parse error: {msg}")
            }
            LargeFileError::IoError(err) => {
                write!(f, "IO error: {err}")
            }
            LargeFileError::ValidationError(msg) => {
                write!(f, "Validation error: {msg}")
            }
        }
    }
}

impl Error for LargeFileError {}

/// Memory monitor for tracking usage during processing
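///
/// Minimal sketch of the intended tracking flow (illustrative; doctest ignored
/// because the public path is not fixed here):
///
/// ```ignore
/// let monitor = MemoryMonitor::new(1024 * 1024, true);
/// monitor.allocate(4096).expect("within the 1MB limit");
/// monitor.deallocate(4096);
/// assert_eq!(monitor.get_stats().current_usage_bytes, 0);
/// ```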
pub struct MemoryMonitor {
    /// Current memory usage counter
    current_usage: Arc<AtomicUsize>,
    /// Peak memory usage
    peak_usage: Arc<AtomicUsize>,
    /// Memory limit in bytes
    memory_limit: usize,
    /// Enable monitoring flag
    enabled: bool,
}

impl MemoryMonitor {
    /// Create a new memory monitor
    pub fn new(memory_limit: usize, enabled: bool) -> Self {
        Self {
            current_usage: Arc::new(AtomicUsize::new(0)),
            peak_usage: Arc::new(AtomicUsize::new(0)),
            memory_limit,
            enabled,
        }
    }

    /// Allocate memory and track usage
    pub fn allocate(&self, size: usize) -> Result<(), LargeFileError> {
        if !self.enabled {
            return Ok(());
        }

        let new_usage = self.current_usage.fetch_add(size, Ordering::Relaxed) + size;

        // Update peak usage
        let mut peak = self.peak_usage.load(Ordering::Relaxed);
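        // compare_exchange_weak may fail spuriously or because another thread
        // raised the peak concurrently; reload and retry until our value is no
        // longer above the stored peak.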
        while new_usage > peak {
            match self.peak_usage.compare_exchange_weak(
                peak,
                new_usage,
                Ordering::Relaxed,
                Ordering::Relaxed,
            ) {
                Ok(_) => break,
                Err(current_peak) => peak = current_peak,
            }
        }

        // Check memory limit
        if new_usage > self.memory_limit {
            return Err(LargeFileError::MemoryLimitExceeded(
                new_usage,
                self.memory_limit,
            ));
        }

        Ok(())
    }

    /// Deallocate memory and update tracking
    pub fn deallocate(&self, size: usize) {
        if self.enabled {
            self.current_usage.fetch_sub(size, Ordering::Relaxed);
        }
    }

    /// Get current memory statistics
    pub fn get_stats(&self) -> MemoryStats {
        let current = self.current_usage.load(Ordering::Relaxed);
        let peak = self.peak_usage.load(Ordering::Relaxed);

        MemoryStats {
            current_usage_bytes: current,
            peak_usage_bytes: peak,
            allocation_count: 1, // Simplified: individual allocations are not counted here
            efficiency_ratio: if peak > 0 {
                current as f64 / peak as f64
            } else {
                1.0
            },
        }
    }
}

/// Large file optimizer for JSON processing
pub struct LargeFileOptimizer {
    /// Configuration settings
    config: LargeFileConfig,
    /// Memory monitor
    memory_monitor: MemoryMonitor,
}

impl LargeFileOptimizer {
    /// Create a new large file optimizer
    pub fn new(config: LargeFileConfig) -> Self {
        let memory_monitor =
            MemoryMonitor::new(config.max_memory_bytes, config.enable_memory_monitoring);

        Self {
            config,
            memory_monitor,
        }
    }

    /// Create optimizer with default configuration
    pub fn new_default() -> Self {
        Self::new(LargeFileConfig::default())
    }

    /// Process a large JSON file with optimization
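    ///
    /// Sketch of the expected call pattern (illustrative; `file_type` must be
    /// one of the kinds handled by `validate_json_structure`, e.g.
    /// "memory_analysis"):
    ///
    /// ```ignore
    /// let (json, stats) = optimizer.process_file("data.json", "memory_analysis")?;
    /// if stats.streaming_mode_used {
    ///     println!("streamed at {:.1} MB/s", stats.throughput_mb_per_sec);
    /// }
    /// ```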
    pub fn process_file<P: AsRef<Path>>(
        &self,
        file_path: P,
        file_type: &str,
    ) -> Result<(Value, ProcessingStats), LargeFileError> {
        let start_time = Instant::now();
        let path = file_path.as_ref();

        // Check file size
        let file_size = std::fs::metadata(path)
            .map_err(LargeFileError::IoError)?
            .len() as usize;

        if file_size > self.config.max_file_size_bytes {
            return Err(LargeFileError::FileTooLarge(
                file_size,
                self.config.max_file_size_bytes,
            ));
        }

        tracing::info!(
            "🔧 Processing large file: {} ({:.1} MB)",
            path.display(),
            file_size as f64 / 1024.0 / 1024.0
        );

        // Decide processing strategy based on file size
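        // Heuristic: a parsed serde_json::Value generally occupies more memory
        // than the raw text, so switch to streaming well below the full budget.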
        let use_streaming = file_size > self.config.max_memory_bytes / 2;

        let (json_value, objects_processed) = if use_streaming {
            tracing::info!("📡 Using streaming mode for large file processing");
            self.process_streaming(path, file_type)?
        } else {
            tracing::info!("💾 Using memory-optimized mode for file processing");
            self.process_memory_optimized(path, file_type)?
        };

        let processing_time = start_time.elapsed().as_millis() as u64;
        let throughput = if processing_time > 0 {
            (file_size as f64 / 1024.0 / 1024.0) / (processing_time as f64 / 1000.0)
        } else {
            0.0
        };

        let stats = ProcessingStats {
            file_size_bytes: file_size,
            processing_time_ms: processing_time,
            streaming_mode_used: use_streaming,
            memory_stats: self.memory_monitor.get_stats(),
            throughput_mb_per_sec: throughput,
            objects_processed,
        };

        tracing::info!(
            "✅ File processed: {:.1} MB/s, {} objects, {}ms",
            throughput,
            objects_processed,
            processing_time
        );

        Ok((json_value, stats))
    }

    /// Process file using streaming JSON parsing
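    ///
    /// "Streaming" here means the input is read incrementally through a sized
    /// BufReader; the parsed Value is still built fully in memory.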
    fn process_streaming<P: AsRef<Path>>(
        &self,
        file_path: P,
        file_type: &str,
    ) -> Result<(Value, usize), LargeFileError> {
        let file = File::open(file_path).map_err(LargeFileError::IoError)?;
        let reader = BufReader::with_capacity(self.config.stream_chunk_size, file);

        // Track memory allocation for the reader buffer
        self.memory_monitor
            .allocate(self.config.stream_chunk_size)?;

        // Parse directly from the BufReader so the raw file is never buffered
        // as one in-memory String. Note that serde_json::from_reader reads the
        // input incrementally but still materializes the full Value.
        let json_value: Value = serde_json::from_reader(reader)
            .map_err(|e| LargeFileError::StreamingParseError(e.to_string()))?;

        // Validate JSON structure
        self.validate_json_structure(&json_value, file_type)?;

        // Count objects processed (simplified)
        let objects_processed = self.count_json_objects(&json_value);

        // Clean up memory tracking
        self.memory_monitor
            .deallocate(self.config.stream_chunk_size);

        Ok((json_value, objects_processed))
    }

    /// Process file using memory-optimized approach
    fn process_memory_optimized<P: AsRef<Path>>(
        &self,
        file_path: P,
        file_type: &str,
    ) -> Result<(Value, usize), LargeFileError> {
        // Read through a BufReader to avoid buffering the whole file as a String
        let file = File::open(file_path).map_err(LargeFileError::IoError)?;
        let reader = BufReader::new(file);

        // Track memory for reader buffer
        self.memory_monitor.allocate(8192)?; // BufReader's default buffer size

        // Parse JSON directly from reader
        let json_value: Value = serde_json::from_reader(reader)
            .map_err(|e| LargeFileError::StreamingParseError(e.to_string()))?;

        // Validate structure
        self.validate_json_structure(&json_value, file_type)?;

        // Count objects
        let objects_processed = self.count_json_objects(&json_value);

        // Clean up memory tracking
        self.memory_monitor.deallocate(8192);

        Ok((json_value, objects_processed))
    }

    /// Validate JSON structure based on file type
    fn validate_json_structure(&self, json: &Value, file_type: &str) -> Result<(), LargeFileError> {
        match file_type {
            "memory_analysis" => {
                if !json.is_object() {
                    return Err(LargeFileError::ValidationError(
                        "Memory analysis JSON must be an object".to_string(),
                    ));
                }

                // Check for required fields
                let obj = json.as_object().expect("is_object checked above");
                if !obj.contains_key("allocations") && !obj.contains_key("summary") {
                    return Err(LargeFileError::ValidationError(
                        "Memory analysis JSON must contain 'allocations' or 'summary' field"
                            .to_string(),
                    ));
                }
            }
            "unsafe_ffi" => {
                if !json.is_object() {
                    return Err(LargeFileError::ValidationError(
                        "Unsafe FFI JSON must be an object".to_string(),
                    ));
                }

                let obj = json.as_object().expect("is_object checked above");
                if !obj.contains_key("enhanced_ffi_data") && !obj.contains_key("summary") {
                    return Err(LargeFileError::ValidationError(
                        "Unsafe FFI JSON must contain 'enhanced_ffi_data' or 'summary' field"
                            .to_string(),
                    ));
                }
            }
            "performance" => {
                if !json.is_object() {
                    return Err(LargeFileError::ValidationError(
                        "Performance JSON must be an object".to_string(),
                    ));
                }

                let obj = json.as_object().expect("is_object checked above");
                if !obj.contains_key("memory_performance")
                    && !obj.contains_key("allocation_distribution")
                {
                    return Err(LargeFileError::ValidationError(
                        "Performance JSON must contain performance-related fields".to_string(),
                    ));
                }
            }
            "lifetime" => {
                if !json.is_object() {
                    return Err(LargeFileError::ValidationError(
                        "Lifetime JSON must be an object".to_string(),
                    ));
                }

                let obj = json.as_object().expect("is_object checked above");
                if !obj.contains_key("lifecycle_events") {
                    return Err(LargeFileError::ValidationError(
                        "Lifetime JSON must contain 'lifecycle_events' field".to_string(),
                    ));
                }
            }
            "complex_types" => {
                if !json.is_object() {
                    return Err(LargeFileError::ValidationError(
                        "Complex types JSON must be an object".to_string(),
                    ));
                }

                let obj = json.as_object().expect("is_object checked above");
                if !obj.contains_key("categorized_types") && !obj.contains_key("generic_types") {
                    return Err(LargeFileError::ValidationError(
                        "Complex types JSON must contain type-related fields".to_string(),
                    ));
                }
            }
            _ => {
                // Basic validation for other file types
                if !json.is_object() && !json.is_array() {
                    return Err(LargeFileError::ValidationError(
                        "JSON must be an object or array".to_string(),
                    ));
                }
            }
        }

        Ok(())
    }

    /// Count the number of JSON objects processed
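    ///
    /// The count is the top-level value itself plus the lengths of the known
    /// collection arrays (e.g. "allocations"); nested objects are not walked.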
    fn count_json_objects(&self, json: &Value) -> usize {
        match json {
            Value::Object(obj) => {
                let mut count = 1; // The object itself

                // Count objects in arrays that are likely to contain multiple items
                for (key, value) in obj {
                    match key.as_str() {
                        "allocations" | "lifecycle_events" | "enhanced_ffi_data"
                        | "boundary_events" | "categorized_types" | "generic_types" => {
                            if let Value::Array(arr) = value {
                                count += arr.len();
                            }
                        }
                        _ => {}
                    }
                }

                count
            }
            Value::Array(arr) => arr.len(),
            _ => 1,
        }
    }

    /// Get current memory usage statistics
    pub fn get_memory_stats(&self) -> MemoryStats {
        self.memory_monitor.get_stats()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use tempfile::TempDir;

    #[test]
    fn test_large_file_config_default() {
        let config = LargeFileConfig::default();
        assert_eq!(config.max_memory_bytes, 512 * 1024 * 1024);
        assert_eq!(config.stream_chunk_size, 64 * 1024);
        assert!(config.enable_memory_monitoring);
        assert!(config.enable_progress_reporting);
    }

    #[test]
    fn test_memory_monitor() {
        let monitor = MemoryMonitor::new(1024, true);

        // Test allocation
        assert!(monitor.allocate(512).is_ok());
        assert_eq!(monitor.get_stats().current_usage_bytes, 512);

        // Test deallocation
        monitor.deallocate(256);
        assert_eq!(monitor.get_stats().current_usage_bytes, 256);

        // Test memory limit
        assert!(monitor.allocate(1024).is_err());
    }

    #[test]
    fn test_process_small_file() {
        let temp_dir = TempDir::new().expect("Failed to create temp dir");
        let file_path = temp_dir.path().join("test.json");

        let test_data =
            r#"{"allocations": [{"ptr": "0x123", "size": 100}], "summary": {"total": 1}}"#;
        fs::write(&file_path, test_data).expect("Failed to write test file");

        let optimizer = LargeFileOptimizer::new_default();
        let result = optimizer.process_file(&file_path, "memory_analysis");

        assert!(result.is_ok());
        let (json_value, stats) = result.expect("processing should succeed");
        assert!(json_value.is_object());
        assert!(!stats.streaming_mode_used);
        assert_eq!(stats.objects_processed, 2); // 1 object + 1 allocation
    }

    #[test]
    fn test_json_validation() {
        let optimizer = LargeFileOptimizer::new_default();

        // Test valid memory analysis JSON
        let valid_json = serde_json::json!({
            "allocations": [],
            "summary": {"total": 0}
        });
        assert!(optimizer
            .validate_json_structure(&valid_json, "memory_analysis")
            .is_ok());

        // Test invalid memory analysis JSON
        let invalid_json = serde_json::json!({
            "invalid_field": "value"
        });
        assert!(optimizer
            .validate_json_structure(&invalid_json, "memory_analysis")
            .is_err());
    }
}