memscope_rs/cli/commands/html_from_json/large_file_optimizer.rs

//! Large file optimization module for JSON processing
//!
//! This module provides streaming JSON parsing and memory optimization
//! for handling large JSON files efficiently without memory issues.
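//!
//! A minimal usage sketch (the file name and file type are illustrative):
//!
//! ```ignore
//! let optimizer = LargeFileOptimizer::default();
//! let (json, stats) = optimizer
//!     .process_file("memory_analysis.json", "memory_analysis")
//!     .expect("processing failed");
//! println!(
//!     "parsed {} objects in {} ms",
//!     stats.objects_processed, stats.processing_time_ms
//! );
//! ```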

use serde_json::Value;
use std::error::Error;
use std::fmt;
use std::fs::File;
use std::io::{BufReader, Read};
use std::path::Path;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Instant;

/// Configuration for large file processing
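///
/// Limits can be tightened per run with struct update syntax; a sketch
/// (the values are illustrative, not recommendations):
///
/// ```ignore
/// let config = LargeFileConfig {
///     max_memory_bytes: 128 * 1024 * 1024, // 128MB budget
///     stream_chunk_size: 32 * 1024,        // 32KB chunks
///     ..LargeFileConfig::default()
/// };
/// let optimizer = LargeFileOptimizer::new(config);
/// ```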
#[derive(Debug, Clone)]
pub struct LargeFileConfig {
    /// Maximum memory usage in bytes before switching to streaming mode
    pub max_memory_bytes: usize,
    /// Chunk size for streaming processing in bytes
    pub stream_chunk_size: usize,
    /// Enable memory usage monitoring
    pub enable_memory_monitoring: bool,
    /// Enable progress reporting
    pub enable_progress_reporting: bool,
    /// Maximum file size to process in bytes (safety limit)
    pub max_file_size_bytes: usize,
}

impl Default for LargeFileConfig {
    fn default() -> Self {
        Self {
            max_memory_bytes: 512 * 1024 * 1024, // 512MB
            stream_chunk_size: 64 * 1024,        // 64KB chunks
            enable_memory_monitoring: true,
            enable_progress_reporting: true,
            max_file_size_bytes: 2 * 1024 * 1024 * 1024, // 2GB limit
        }
    }
}

/// Memory usage statistics
#[derive(Debug, Clone)]
pub struct MemoryStats {
    /// Current memory usage in bytes
    pub current_usage_bytes: usize,
    /// Peak memory usage in bytes
    pub peak_usage_bytes: usize,
    /// Number of memory allocations tracked
    pub allocation_count: usize,
    /// Memory efficiency ratio (0.0 to 1.0): current usage relative to peak
    pub efficiency_ratio: f64,
}

/// Processing statistics for large files
#[derive(Debug)]
pub struct ProcessingStats {
    /// File size in bytes
    pub file_size_bytes: usize,
    /// Processing time in milliseconds
    pub processing_time_ms: u64,
    /// Whether streaming mode was used
    pub streaming_mode_used: bool,
    /// Memory statistics
    pub memory_stats: MemoryStats,
    /// Throughput in MB/s
    pub throughput_mb_per_sec: f64,
    /// Number of JSON objects processed
    pub objects_processed: usize,
}

/// Errors that can occur during large file processing
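///
/// Callers can match on the variants to tell recoverable cases apart; a
/// sketch (the handling shown is illustrative):
///
/// ```ignore
/// match optimizer.process_file(path, "memory_analysis") {
///     Ok((json, stats)) => { /* render output from `json` */ }
///     Err(LargeFileError::FileTooLarge(size, limit)) => {
///         eprintln!("skipping: {size} bytes exceeds the {limit}-byte limit");
///     }
///     Err(err) => eprintln!("processing failed: {err}"),
/// }
/// ```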
#[derive(Debug)]
pub enum LargeFileError {
    /// File is too large to process safely (actual size, limit)
    FileTooLarge(usize, usize),
    /// Memory limit exceeded during processing (usage, limit)
    MemoryLimitExceeded(usize, usize),
    /// Streaming JSON parsing failed
    StreamingParseError(String),
    /// IO error during file processing
    IoError(std::io::Error),
    /// JSON structure validation failed
    ValidationError(String),
}

impl fmt::Display for LargeFileError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            LargeFileError::FileTooLarge(size, limit) => {
                write!(f, "File size ({size} bytes) exceeds limit ({limit} bytes)")
            }
            LargeFileError::MemoryLimitExceeded(used, limit) => {
                write!(f, "Memory usage ({used} bytes) exceeds limit ({limit} bytes)")
            }
            LargeFileError::StreamingParseError(msg) => {
                write!(f, "Streaming parse error: {msg}")
            }
            LargeFileError::IoError(err) => {
                write!(f, "IO error: {err}")
            }
            LargeFileError::ValidationError(msg) => {
                write!(f, "Validation error: {msg}")
            }
        }
    }
}

impl Error for LargeFileError {}

/// Memory monitor for tracking usage during processing
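///
/// Every tracked `allocate` should be paired with a matching `deallocate`;
/// a sketch (sizes are illustrative):
///
/// ```ignore
/// let monitor = MemoryMonitor::new(1024 * 1024, true); // 1MB limit
/// monitor.allocate(4096).expect("over the memory limit");
/// // ... use the tracked buffer ...
/// monitor.deallocate(4096);
/// assert_eq!(monitor.get_stats().peak_usage_bytes, 4096);
/// ```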
pub struct MemoryMonitor {
    /// Current memory usage counter
    current_usage: Arc<AtomicUsize>,
    /// Peak memory usage
    peak_usage: Arc<AtomicUsize>,
    /// Number of tracked allocations
    allocation_count: Arc<AtomicUsize>,
    /// Memory limit in bytes
    memory_limit: usize,
    /// Enable monitoring flag
    enabled: bool,
}

impl MemoryMonitor {
    /// Create a new memory monitor
    pub fn new(memory_limit: usize, enabled: bool) -> Self {
        Self {
            current_usage: Arc::new(AtomicUsize::new(0)),
            peak_usage: Arc::new(AtomicUsize::new(0)),
            allocation_count: Arc::new(AtomicUsize::new(0)),
            memory_limit,
            enabled,
        }
    }

    /// Allocate memory and track usage
    ///
    /// Note: usage is recorded even when the limit check fails, so callers
    /// should still `deallocate` the same size on the error path.
    pub fn allocate(&self, size: usize) -> Result<(), LargeFileError> {
        if !self.enabled {
            return Ok(());
        }

        let new_usage = self.current_usage.fetch_add(size, Ordering::Relaxed) + size;
        self.allocation_count.fetch_add(1, Ordering::Relaxed);

        // Update peak usage with a compare-and-swap loop: retry until either
        // our value is published or another thread has recorded a higher peak.
        let mut peak = self.peak_usage.load(Ordering::Relaxed);
        while new_usage > peak {
            match self.peak_usage.compare_exchange_weak(
                peak,
                new_usage,
                Ordering::Relaxed,
                Ordering::Relaxed,
            ) {
                Ok(_) => break,
                Err(current_peak) => peak = current_peak,
            }
        }

        // Check memory limit
        if new_usage > self.memory_limit {
            return Err(LargeFileError::MemoryLimitExceeded(
                new_usage,
                self.memory_limit,
            ));
        }

        Ok(())
    }

    /// Deallocate memory and update tracking
    pub fn deallocate(&self, size: usize) {
        if self.enabled {
            self.current_usage.fetch_sub(size, Ordering::Relaxed);
        }
    }

    /// Get current memory statistics
    pub fn get_stats(&self) -> MemoryStats {
        let current = self.current_usage.load(Ordering::Relaxed);
        let peak = self.peak_usage.load(Ordering::Relaxed);

        MemoryStats {
            current_usage_bytes: current,
            peak_usage_bytes: peak,
            allocation_count: self.allocation_count.load(Ordering::Relaxed),
            efficiency_ratio: if peak > 0 {
                current as f64 / peak as f64
            } else {
                1.0
            },
        }
    }
}

/// Large file optimizer for JSON processing
pub struct LargeFileOptimizer {
    /// Configuration settings
    config: LargeFileConfig,
    /// Memory monitor
    memory_monitor: MemoryMonitor,
}

impl LargeFileOptimizer {
    /// Create a new large file optimizer
    pub fn new(config: LargeFileConfig) -> Self {
        let memory_monitor =
            MemoryMonitor::new(config.max_memory_bytes, config.enable_memory_monitoring);

        Self {
            config,
            memory_monitor,
        }
    }

    /// Process a large JSON file with optimization
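    ///
    /// Returns the parsed JSON together with processing statistics. A sketch
    /// (the file name and type are illustrative):
    ///
    /// ```ignore
    /// let optimizer = LargeFileOptimizer::default();
    /// let (json, stats) = optimizer.process_file("memory_analysis.json", "memory_analysis")?;
    /// if stats.streaming_mode_used {
    ///     println!("streamed at {:.1} MB/s", stats.throughput_mb_per_sec);
    /// }
    /// ```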
    pub fn process_file<P: AsRef<Path>>(
        &self,
        file_path: P,
        file_type: &str,
    ) -> Result<(Value, ProcessingStats), LargeFileError> {
        let start_time = Instant::now();
        let path = file_path.as_ref();

        // Check file size against the configured safety limit
        let file_size = std::fs::metadata(path)
            .map_err(LargeFileError::IoError)?
            .len() as usize;

        if file_size > self.config.max_file_size_bytes {
            return Err(LargeFileError::FileTooLarge(
                file_size,
                self.config.max_file_size_bytes,
            ));
        }

        if self.config.enable_progress_reporting {
            println!(
                "🔧 Processing large file: {} ({:.1} MB)",
                path.display(),
                file_size as f64 / 1024.0 / 1024.0
            );
        }

        // Choose the strategy: stream when the file could consume more than
        // half of the memory budget, otherwise parse fully in memory.
        let use_streaming = file_size > self.config.max_memory_bytes / 2;

        let (json_value, objects_processed) = if use_streaming {
            if self.config.enable_progress_reporting {
                println!("📡 Using streaming mode for large file processing");
            }
            self.process_streaming(path, file_type)?
        } else {
            if self.config.enable_progress_reporting {
                println!("💾 Using memory-optimized mode for file processing");
            }
            self.process_memory_optimized(path, file_type)?
        };

        let processing_time = start_time.elapsed().as_millis() as u64;
        let throughput = if processing_time > 0 {
            (file_size as f64 / 1024.0 / 1024.0) / (processing_time as f64 / 1000.0)
        } else {
            0.0
        };

        let stats = ProcessingStats {
            file_size_bytes: file_size,
            processing_time_ms: processing_time,
            streaming_mode_used: use_streaming,
            memory_stats: self.memory_monitor.get_stats(),
            throughput_mb_per_sec: throughput,
            objects_processed,
        };

        if self.config.enable_progress_reporting {
            println!(
                "✅ File processed: {:.1} MB/s, {} objects, {}ms",
                throughput, objects_processed, processing_time
            );
        }

        Ok((json_value, stats))
    }

    /// Process the file in "streaming" mode: read through a chunked
    /// `BufReader` with memory accounting, then parse and validate.
    ///
    /// Note: the content is still buffered in full before parsing; the
    /// chunked reader bounds read sizes, while the monitor enforces the
    /// overall memory budget.
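    ///
    /// A possible refinement (a sketch, not wired in) would be to hand the
    /// reader to serde_json directly and skip the intermediate `String`,
    /// though serde_json still builds the full `Value` tree in memory:
    ///
    /// ```ignore
    /// let json_value: Value = serde_json::from_reader(reader)
    ///     .map_err(|e| LargeFileError::StreamingParseError(e.to_string()))?;
    /// ```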
    fn process_streaming<P: AsRef<Path>>(
        &self,
        file_path: P,
        file_type: &str,
    ) -> Result<(Value, usize), LargeFileError> {
        let file = File::open(file_path).map_err(LargeFileError::IoError)?;
        let mut reader = BufReader::with_capacity(self.config.stream_chunk_size, file);

        // Run the fallible steps in a closure so the tracking below is
        // unwound on every path, including early error returns.
        let mut buffer_len = 0;
        let result: Result<(Value, usize), LargeFileError> = (|| {
            // Track memory for the reader's internal buffer
            self.memory_monitor
                .allocate(self.config.stream_chunk_size)?;

            // Read the JSON into a buffer through the chunked reader
            let mut buffer = String::new();
            reader
                .read_to_string(&mut buffer)
                .map_err(LargeFileError::IoError)?;

            // Track memory for the buffer
            buffer_len = buffer.len();
            self.memory_monitor.allocate(buffer_len)?;

            // Parse and validate the JSON structure
            let json_value: Value = serde_json::from_str(&buffer)
                .map_err(|e| LargeFileError::StreamingParseError(e.to_string()))?;
            self.validate_json_structure(&json_value, file_type)?;

            // Count objects processed (simplified)
            let objects_processed = self.count_json_objects(&json_value);

            Ok((json_value, objects_processed))
        })();

        // Clean up memory tracking (`allocate` records usage even when it
        // errors, so this must also run on the error path).
        self.memory_monitor.deallocate(buffer_len);
        self.memory_monitor
            .deallocate(self.config.stream_chunk_size);

        result
    }

    /// Process file using the memory-optimized approach (full read + parse)
    fn process_memory_optimized<P: AsRef<Path>>(
        &self,
        file_path: P,
        file_type: &str,
    ) -> Result<(Value, usize), LargeFileError> {
        // Read file with memory tracking
        let content = std::fs::read_to_string(file_path).map_err(LargeFileError::IoError)?;
        let content_len = content.len();

        // As in `process_streaming`, run the fallible steps in a closure so
        // the tracked memory is released on every path.
        let result: Result<(Value, usize), LargeFileError> = (|| {
            self.memory_monitor.allocate(content_len)?;

            // Parse JSON
            let json_value: Value = serde_json::from_str(&content)
                .map_err(|e| LargeFileError::StreamingParseError(e.to_string()))?;

            // Validate structure
            self.validate_json_structure(&json_value, file_type)?;

            // Count objects
            let objects_processed = self.count_json_objects(&json_value);

            Ok((json_value, objects_processed))
        })();

        // Clean up memory tracking
        self.memory_monitor.deallocate(content_len);

        result
    }

    /// Validate JSON structure based on file type
    fn validate_json_structure(&self, json: &Value, file_type: &str) -> Result<(), LargeFileError> {
        match file_type {
            "memory_analysis" => {
                let obj = json.as_object().ok_or_else(|| {
                    LargeFileError::ValidationError(
                        "Memory analysis JSON must be an object".to_string(),
                    )
                })?;

                // Check for required fields
                if !obj.contains_key("allocations") && !obj.contains_key("summary") {
                    return Err(LargeFileError::ValidationError(
                        "Memory analysis JSON must contain 'allocations' or 'summary' field"
                            .to_string(),
                    ));
                }
            }
            "unsafe_ffi" => {
                let obj = json.as_object().ok_or_else(|| {
                    LargeFileError::ValidationError(
                        "Unsafe FFI JSON must be an object".to_string(),
                    )
                })?;

                if !obj.contains_key("enhanced_ffi_data") && !obj.contains_key("summary") {
                    return Err(LargeFileError::ValidationError(
                        "Unsafe FFI JSON must contain 'enhanced_ffi_data' or 'summary' field"
                            .to_string(),
                    ));
                }
            }
            "performance" => {
                let obj = json.as_object().ok_or_else(|| {
                    LargeFileError::ValidationError(
                        "Performance JSON must be an object".to_string(),
                    )
                })?;

                if !obj.contains_key("memory_performance")
                    && !obj.contains_key("allocation_distribution")
                {
                    return Err(LargeFileError::ValidationError(
                        "Performance JSON must contain performance-related fields".to_string(),
                    ));
                }
            }
            "lifetime" => {
                let obj = json.as_object().ok_or_else(|| {
                    LargeFileError::ValidationError(
                        "Lifetime JSON must be an object".to_string(),
                    )
                })?;

                if !obj.contains_key("lifecycle_events") {
                    return Err(LargeFileError::ValidationError(
                        "Lifetime JSON must contain 'lifecycle_events' field".to_string(),
                    ));
                }
            }
            "complex_types" => {
                let obj = json.as_object().ok_or_else(|| {
                    LargeFileError::ValidationError(
                        "Complex types JSON must be an object".to_string(),
                    )
                })?;

                if !obj.contains_key("categorized_types") && !obj.contains_key("generic_types") {
                    return Err(LargeFileError::ValidationError(
                        "Complex types JSON must contain type-related fields".to_string(),
                    ));
                }
            }
            _ => {
                // Basic validation for other file types
                if !json.is_object() && !json.is_array() {
                    return Err(LargeFileError::ValidationError(
                        "JSON must be an object or array".to_string(),
                    ));
                }
            }
        }

        Ok(())
    }

    /// Count the number of JSON objects processed
    fn count_json_objects(&self, json: &Value) -> usize {
        match json {
            Value::Object(obj) => {
                let mut count = 1; // The object itself

                // Count items in the array fields that typically hold the
                // bulk of the data
                for (key, value) in obj {
                    match key.as_str() {
                        "allocations" | "lifecycle_events" | "enhanced_ffi_data"
                        | "boundary_events" | "categorized_types" | "generic_types" => {
                            if let Value::Array(arr) = value {
                                count += arr.len();
                            }
                        }
                        _ => {}
                    }
                }

                count
            }
            Value::Array(arr) => arr.len(),
            _ => 1,
        }
    }

    /// Get current memory usage statistics
    pub fn get_memory_stats(&self) -> MemoryStats {
        self.memory_monitor.get_stats()
    }
}

impl Default for LargeFileOptimizer {
    /// Create an optimizer with the default configuration
    fn default() -> Self {
        Self::new(LargeFileConfig::default())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use tempfile::TempDir;

    #[test]
    fn test_large_file_config_default() {
        let config = LargeFileConfig::default();
        assert_eq!(config.max_memory_bytes, 512 * 1024 * 1024);
        assert_eq!(config.stream_chunk_size, 64 * 1024);
        assert!(config.enable_memory_monitoring);
        assert!(config.enable_progress_reporting);
    }

    #[test]
    fn test_memory_monitor() {
        let monitor = MemoryMonitor::new(1024, true);

        // Test allocation
        assert!(monitor.allocate(512).is_ok());
        assert_eq!(monitor.get_stats().current_usage_bytes, 512);

        // Test deallocation
        monitor.deallocate(256);
        assert_eq!(monitor.get_stats().current_usage_bytes, 256);

        // Test memory limit
        assert!(monitor.allocate(1024).is_err());
    }

    #[test]
    fn test_process_small_file() {
        let temp_dir = TempDir::new().unwrap();
        let file_path = temp_dir.path().join("test.json");

        let test_data =
            r#"{"allocations": [{"ptr": "0x123", "size": 100}], "summary": {"total": 1}}"#;
        fs::write(&file_path, test_data).unwrap();

        let optimizer = LargeFileOptimizer::default();
        let result = optimizer.process_file(&file_path, "memory_analysis");

        assert!(result.is_ok());
        let (json_value, stats) = result.unwrap();
        assert!(json_value.is_object());
        assert!(!stats.streaming_mode_used);
        assert_eq!(stats.objects_processed, 2); // 1 object + 1 allocation
    }

    #[test]
    fn test_json_validation() {
        let optimizer = LargeFileOptimizer::default();

        // Test valid memory analysis JSON
        let valid_json = serde_json::json!({
            "allocations": [],
            "summary": {"total": 0}
        });
        assert!(optimizer
            .validate_json_structure(&valid_json, "memory_analysis")
            .is_ok());

        // Test invalid memory analysis JSON
        let invalid_json = serde_json::json!({
            "invalid_field": "value"
        });
        assert!(optimizer
            .validate_json_structure(&invalid_json, "memory_analysis")
            .is_err());
    }
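
    #[test]
    fn test_process_file_streaming_mode() {
        // A small sketch exercising the streaming path: the budget below is
        // sized so the file exceeds half of `max_memory_bytes` (which forces
        // streaming) while the chunk buffer plus content still fit under the
        // limit. The exact values are illustrative.
        let temp_dir = TempDir::new().unwrap();
        let file_path = temp_dir.path().join("test.json");

        let test_data =
            r#"{"allocations": [{"ptr": "0x123", "size": 100}], "summary": {"total": 1}}"#;
        fs::write(&file_path, test_data).unwrap();

        let config = LargeFileConfig {
            max_memory_bytes: 128, // the ~73-byte file is larger than 128 / 2
            stream_chunk_size: 16, // chunk buffer + content stay under 128
            ..LargeFileConfig::default()
        };
        let optimizer = LargeFileOptimizer::new(config);

        let (json_value, stats) = optimizer
            .process_file(&file_path, "memory_analysis")
            .expect("streaming processing should succeed");
        assert!(json_value.is_object());
        assert!(stats.streaming_mode_used);
        assert_eq!(stats.objects_processed, 2); // 1 object + 1 allocation
    }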
}