memscope_rs/cli/commands/html_from_json/large_file_optimizer.rs

//! Memory-bounded loading and validation of large JSON exports for the
//! html_from_json command.

use serde_json::Value;
use std::error::Error;
use std::fmt;
use std::fs::File;
use std::io::{BufReader, Read};
use std::path::Path;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Instant;
/// Tunables for large-file processing.
#[derive(Debug, Clone)]
pub struct LargeFileConfig {
    /// Upper bound on memory tracked by the monitor.
    pub max_memory_bytes: usize,
    /// Buffer size for chunked reads in streaming mode.
    pub stream_chunk_size: usize,
    /// Track allocations against `max_memory_bytes`.
    pub enable_memory_monitoring: bool,
    /// Print progress messages while processing.
    pub enable_progress_reporting: bool,
    /// Reject input files larger than this.
    pub max_file_size_bytes: usize,
}

impl Default for LargeFileConfig {
    fn default() -> Self {
        Self {
            max_memory_bytes: 512 * 1024 * 1024, // 512 MB
            stream_chunk_size: 64 * 1024,        // 64 KB
            enable_memory_monitoring: true,
            enable_progress_reporting: true,
            max_file_size_bytes: 2 * 1024 * 1024 * 1024, // 2 GB
        }
    }
}

/// Snapshot of the memory monitor's counters.
#[derive(Debug, Clone)]
pub struct MemoryStats {
    /// Bytes currently tracked as allocated.
    pub current_usage_bytes: usize,
    /// Highest tracked usage observed so far.
    pub peak_usage_bytes: usize,
    /// Number of tracked allocations.
    pub allocation_count: usize,
    /// `current / peak`, or 1.0 when nothing has been tracked yet.
    pub efficiency_ratio: f64,
}

/// Metrics describing a single `process_file` run.
#[derive(Debug)]
pub struct ProcessingStats {
    /// Size of the input file in bytes.
    pub file_size_bytes: usize,
    /// Wall-clock processing time in milliseconds.
    pub processing_time_ms: u64,
    /// Whether the streaming path was taken.
    pub streaming_mode_used: bool,
    /// Memory counters captured at the end of the run.
    pub memory_stats: MemoryStats,
    /// Throughput in MB/s.
    pub throughput_mb_per_sec: f64,
    /// Number of JSON objects counted in the input.
    pub objects_processed: usize,
}

/// Errors produced while processing large files.
#[derive(Debug)]
pub enum LargeFileError {
    /// The file exceeds the configured size limit (size, limit).
    FileTooLarge(usize, usize),
    /// Tracked memory exceeds the configured limit (used, limit).
    MemoryLimitExceeded(usize, usize),
    /// The input could not be parsed as JSON.
    StreamingParseError(String),
    /// An underlying I/O failure.
    IoError(std::io::Error),
    /// The JSON parsed but failed structural validation.
    ValidationError(String),
}

impl fmt::Display for LargeFileError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            LargeFileError::FileTooLarge(size, limit) => {
                write!(f, "File size ({size} bytes) exceeds limit ({limit} bytes)")
            }
            LargeFileError::MemoryLimitExceeded(used, limit) => {
                write!(f, "Memory usage ({used} bytes) exceeds limit ({limit} bytes)")
            }
            LargeFileError::StreamingParseError(msg) => {
                write!(f, "Streaming parse error: {msg}")
            }
            LargeFileError::IoError(err) => {
                write!(f, "IO error: {err}")
            }
            LargeFileError::ValidationError(msg) => {
                write!(f, "Validation error: {msg}")
            }
        }
    }
}

impl Error for LargeFileError {}

/// Tracks approximate memory usage against a configurable limit.
pub struct MemoryMonitor {
    /// Bytes currently tracked as allocated.
    current_usage: Arc<AtomicUsize>,
    /// Highest tracked usage observed so far.
    peak_usage: Arc<AtomicUsize>,
    /// Number of tracked allocations.
    allocation_count: Arc<AtomicUsize>,
    /// Error once tracked usage exceeds this many bytes.
    memory_limit: usize,
    /// When false, all tracking calls are no-ops.
    enabled: bool,
}

impl MemoryMonitor {
    /// Creates a monitor; when `enabled` is false all tracking is skipped.
    pub fn new(memory_limit: usize, enabled: bool) -> Self {
        Self {
            current_usage: Arc::new(AtomicUsize::new(0)),
            peak_usage: Arc::new(AtomicUsize::new(0)),
            allocation_count: Arc::new(AtomicUsize::new(0)),
            memory_limit,
            enabled,
        }
    }

    /// Records an allocation of `size` bytes, updating the running peak and
    /// enforcing the memory limit.
    pub fn allocate(&self, size: usize) -> Result<(), LargeFileError> {
        if !self.enabled {
            return Ok(());
        }

        let new_usage = self.current_usage.fetch_add(size, Ordering::Relaxed) + size;
        self.allocation_count.fetch_add(1, Ordering::Relaxed);

        // Raise the recorded peak with a compare-exchange loop so concurrent
        // updates are never lost: retry until our usage is published or
        // another thread has already recorded a higher peak.
        let mut peak = self.peak_usage.load(Ordering::Relaxed);
        while new_usage > peak {
            match self.peak_usage.compare_exchange_weak(
                peak,
                new_usage,
                Ordering::Relaxed,
                Ordering::Relaxed,
            ) {
                Ok(_) => break,
                Err(current_peak) => peak = current_peak,
            }
        }

        if new_usage > self.memory_limit {
            return Err(LargeFileError::MemoryLimitExceeded(
                new_usage,
                self.memory_limit,
            ));
        }

        Ok(())
    }

    /// Records a deallocation of `size` bytes.
    pub fn deallocate(&self, size: usize) {
        if self.enabled {
            self.current_usage.fetch_sub(size, Ordering::Relaxed);
        }
    }

    /// Returns a snapshot of the current counters.
    pub fn get_stats(&self) -> MemoryStats {
        let current = self.current_usage.load(Ordering::Relaxed);
        let peak = self.peak_usage.load(Ordering::Relaxed);

        MemoryStats {
            current_usage_bytes: current,
            peak_usage_bytes: peak,
            allocation_count: self.allocation_count.load(Ordering::Relaxed),
            efficiency_ratio: if peak > 0 {
                current as f64 / peak as f64
            } else {
                1.0
            },
        }
    }
}

/// Processes large JSON files within a configurable memory budget.
pub struct LargeFileOptimizer {
    /// Active configuration.
    config: LargeFileConfig,
    /// Tracks memory use during processing.
    memory_monitor: MemoryMonitor,
}

impl Default for LargeFileOptimizer {
    fn default() -> Self {
        Self::new(LargeFileConfig::default())
    }
}

impl LargeFileOptimizer {
    /// Creates an optimizer from the given configuration.
    pub fn new(config: LargeFileConfig) -> Self {
        let memory_monitor =
            MemoryMonitor::new(config.max_memory_bytes, config.enable_memory_monitoring);

        Self {
            config,
            memory_monitor,
        }
    }

    /// Loads, parses, and validates `file_path`, choosing a processing
    /// strategy based on file size, and returns the parsed JSON together
    /// with processing statistics.
    pub fn process_file<P: AsRef<Path>>(
        &self,
        file_path: P,
        file_type: &str,
    ) -> Result<(Value, ProcessingStats), LargeFileError> {
        let start_time = Instant::now();
        let path = file_path.as_ref();

        let file_size = std::fs::metadata(path)
            .map_err(LargeFileError::IoError)?
            .len() as usize;

        if file_size > self.config.max_file_size_bytes {
            return Err(LargeFileError::FileTooLarge(
                file_size,
                self.config.max_file_size_bytes,
            ));
        }

        if self.config.enable_progress_reporting {
            println!(
                "🔧 Processing large file: {} ({:.1} MB)",
                path.display(),
                file_size as f64 / 1024.0 / 1024.0
            );
        }

        // Stream whenever the file would consume more than half the memory
        // budget; otherwise read it in a single pass.
        let use_streaming = file_size > self.config.max_memory_bytes / 2;

        let (json_value, objects_processed) = if use_streaming {
            if self.config.enable_progress_reporting {
                println!("📡 Using streaming mode for large file processing");
            }
            self.process_streaming(path, file_type)?
        } else {
            if self.config.enable_progress_reporting {
                println!("💾 Using memory-optimized mode for file processing");
            }
            self.process_memory_optimized(path, file_type)?
        };

        let processing_time = start_time.elapsed().as_millis() as u64;
        let throughput = if processing_time > 0 {
            (file_size as f64 / 1024.0 / 1024.0) / (processing_time as f64 / 1000.0)
        } else {
            0.0
        };

        let stats = ProcessingStats {
            file_size_bytes: file_size,
            processing_time_ms: processing_time,
            streaming_mode_used: use_streaming,
            memory_stats: self.memory_monitor.get_stats(),
            throughput_mb_per_sec: throughput,
            objects_processed,
        };

        if self.config.enable_progress_reporting {
            println!(
                "✅ File processed: {:.1} MB/s, {} objects, {}ms",
                throughput, objects_processed, processing_time
            );
        }

        Ok((json_value, stats))
    }

    /// Streaming-oriented path for large inputs. Note that this still buffers
    /// the full contents in memory before parsing; the `BufReader` only chunks
    /// the reads, so peak usage is bounded by the monitor rather than by true
    /// incremental parsing.
    fn process_streaming<P: AsRef<Path>>(
        &self,
        file_path: P,
        file_type: &str,
    ) -> Result<(Value, usize), LargeFileError> {
        let file = File::open(file_path).map_err(LargeFileError::IoError)?;
        let mut reader = BufReader::with_capacity(self.config.stream_chunk_size, file);

        // Charge the read buffer against the memory budget.
        self.memory_monitor
            .allocate(self.config.stream_chunk_size)?;

        let mut buffer = String::new();
        reader
            .read_to_string(&mut buffer)
            .map_err(LargeFileError::IoError)?;

        // Charge the file contents as well.
        self.memory_monitor.allocate(buffer.len())?;

        let json_value: Value = serde_json::from_str(&buffer)
            .map_err(|e| LargeFileError::StreamingParseError(e.to_string()))?;

        self.validate_json_structure(&json_value, file_type)?;

        let objects_processed = self.count_json_objects(&json_value);

        // Release the tracked buffers now that parsing is done.
        self.memory_monitor.deallocate(buffer.len());
        self.memory_monitor
            .deallocate(self.config.stream_chunk_size);

        Ok((json_value, objects_processed))
    }

    /// Single-pass path for files that comfortably fit within the memory
    /// budget.
    fn process_memory_optimized<P: AsRef<Path>>(
        &self,
        file_path: P,
        file_type: &str,
    ) -> Result<(Value, usize), LargeFileError> {
        let content = std::fs::read_to_string(file_path).map_err(LargeFileError::IoError)?;

        self.memory_monitor.allocate(content.len())?;

        let json_value: Value = serde_json::from_str(&content)
            .map_err(|e| LargeFileError::StreamingParseError(e.to_string()))?;

        self.validate_json_structure(&json_value, file_type)?;

        let objects_processed = self.count_json_objects(&json_value);

        self.memory_monitor.deallocate(content.len());

        Ok((json_value, objects_processed))
    }

    /// Checks that the parsed JSON has the top-level shape expected for the
    /// given file type.
    fn validate_json_structure(&self, json: &Value, file_type: &str) -> Result<(), LargeFileError> {
        match file_type {
            "memory_analysis" => {
                if !json.is_object() {
                    return Err(LargeFileError::ValidationError(
                        "Memory analysis JSON must be an object".to_string(),
                    ));
                }

                let obj = json.as_object().unwrap();
                if !obj.contains_key("allocations") && !obj.contains_key("summary") {
                    return Err(LargeFileError::ValidationError(
                        "Memory analysis JSON must contain 'allocations' or 'summary' field"
                            .to_string(),
                    ));
                }
            }
            "unsafe_ffi" => {
                if !json.is_object() {
                    return Err(LargeFileError::ValidationError(
                        "Unsafe FFI JSON must be an object".to_string(),
                    ));
                }

                let obj = json.as_object().unwrap();
                if !obj.contains_key("enhanced_ffi_data") && !obj.contains_key("summary") {
                    return Err(LargeFileError::ValidationError(
                        "Unsafe FFI JSON must contain 'enhanced_ffi_data' or 'summary' field"
                            .to_string(),
                    ));
                }
            }
            "performance" => {
                if !json.is_object() {
                    return Err(LargeFileError::ValidationError(
                        "Performance JSON must be an object".to_string(),
                    ));
                }

                let obj = json.as_object().unwrap();
                if !obj.contains_key("memory_performance")
                    && !obj.contains_key("allocation_distribution")
                {
                    return Err(LargeFileError::ValidationError(
                        "Performance JSON must contain performance-related fields".to_string(),
                    ));
                }
            }
            "lifetime" => {
                if !json.is_object() {
                    return Err(LargeFileError::ValidationError(
                        "Lifetime JSON must be an object".to_string(),
                    ));
                }

                let obj = json.as_object().unwrap();
                if !obj.contains_key("lifecycle_events") {
                    return Err(LargeFileError::ValidationError(
                        "Lifetime JSON must contain 'lifecycle_events' field".to_string(),
                    ));
                }
            }
            "complex_types" => {
                if !json.is_object() {
                    return Err(LargeFileError::ValidationError(
                        "Complex types JSON must be an object".to_string(),
                    ));
                }

                let obj = json.as_object().unwrap();
                if !obj.contains_key("categorized_types") && !obj.contains_key("generic_types") {
                    return Err(LargeFileError::ValidationError(
                        "Complex types JSON must contain type-related fields".to_string(),
                    ));
                }
            }
            _ => {
                // Unknown file types only need to be valid JSON containers.
                if !json.is_object() && !json.is_array() {
                    return Err(LargeFileError::ValidationError(
                        "JSON must be an object or array".to_string(),
                    ));
                }
            }
        }

        Ok(())
    }

    /// Estimates how many logical objects the JSON contains: the root object
    /// counts as one, plus the lengths of well-known array fields.
    fn count_json_objects(&self, json: &Value) -> usize {
        match json {
            Value::Object(obj) => {
                let mut count = 1; // the root object itself

                for (key, value) in obj {
                    match key.as_str() {
                        "allocations" | "lifecycle_events" | "enhanced_ffi_data"
                        | "boundary_events" | "categorized_types" | "generic_types" => {
                            if let Value::Array(arr) = value {
                                count += arr.len();
                            }
                        }
                        _ => {}
                    }
                }

                count
            }
            Value::Array(arr) => arr.len(),
            _ => 1,
        }
    }

    /// Returns the memory monitor's current counters.
    pub fn get_memory_stats(&self) -> MemoryStats {
        self.memory_monitor.get_stats()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use tempfile::TempDir;

    #[test]
    fn test_large_file_config_default() {
        let config = LargeFileConfig::default();
        assert_eq!(config.max_memory_bytes, 512 * 1024 * 1024);
        assert_eq!(config.stream_chunk_size, 64 * 1024);
        assert!(config.enable_memory_monitoring);
        assert!(config.enable_progress_reporting);
    }

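    // Illustrative sketch (not part of the original test suite): `process_file`
    // switches to streaming whenever the file exceeds half the memory budget
    // (`max_memory_bytes / 2`). Monitoring is disabled so the deliberately
    // tiny budget does not trip the allocation guard.
    #[test]
    fn test_streaming_mode_threshold() {
        let temp_dir = TempDir::new().unwrap();
        let file_path = temp_dir.path().join("threshold.json");
        fs::write(&file_path, r#"{"allocations": [], "summary": {}}"#).unwrap();

        let config = LargeFileConfig {
            max_memory_bytes: 16, // any file over 8 bytes now streams
            enable_memory_monitoring: false,
            ..LargeFileConfig::default()
        };
        let optimizer = LargeFileOptimizer::new(config);

        let (_, stats) = optimizer
            .process_file(&file_path, "memory_analysis")
            .unwrap();
        assert!(stats.streaming_mode_used);
    }
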
    #[test]
    fn test_memory_monitor() {
        let monitor = MemoryMonitor::new(1024, true);

        assert!(monitor.allocate(512).is_ok());
        assert_eq!(monitor.get_stats().current_usage_bytes, 512);

        monitor.deallocate(256);
        assert_eq!(monitor.get_stats().current_usage_bytes, 256);

        // 256 + 1024 exceeds the 1024-byte limit.
        assert!(monitor.allocate(1024).is_err());
    }

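    // Companion sketch (added for illustration): the compare-exchange loop in
    // `allocate` keeps the recorded peak monotonic even after usage drops, and
    // `efficiency_ratio` reports current/peak.
    #[test]
    fn test_memory_monitor_peak_tracking() {
        let monitor = MemoryMonitor::new(4096, true);

        monitor.allocate(1000).unwrap();
        monitor.deallocate(800);
        monitor.allocate(200).unwrap();

        let stats = monitor.get_stats();
        assert_eq!(stats.current_usage_bytes, 400);
        assert_eq!(stats.peak_usage_bytes, 1000);
        assert!((stats.efficiency_ratio - 0.4).abs() < 1e-9);
    }
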
    #[test]
    fn test_process_small_file() {
        let temp_dir = TempDir::new().unwrap();
        let file_path = temp_dir.path().join("test.json");

        let test_data =
            r#"{"allocations": [{"ptr": "0x123", "size": 100}], "summary": {"total": 1}}"#;
        fs::write(&file_path, test_data).unwrap();

        let optimizer = LargeFileOptimizer::default();
        let result = optimizer.process_file(&file_path, "memory_analysis");

        assert!(result.is_ok());
        let (json_value, stats) = result.unwrap();
        assert!(json_value.is_object());
        assert!(!stats.streaming_mode_used);
        assert_eq!(stats.objects_processed, 2); // root object + 1 allocation entry
    }

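    // Sketch of the size guard (added for illustration): a limit below the
    // input size fails fast with `FileTooLarge` before any parsing happens.
    #[test]
    fn test_file_too_large_guard() {
        let temp_dir = TempDir::new().unwrap();
        let file_path = temp_dir.path().join("big.json");
        fs::write(&file_path, r#"{"allocations": []}"#).unwrap();

        let config = LargeFileConfig {
            max_file_size_bytes: 4,
            ..LargeFileConfig::default()
        };
        let optimizer = LargeFileOptimizer::new(config);

        match optimizer.process_file(&file_path, "memory_analysis") {
            Err(LargeFileError::FileTooLarge(size, limit)) => {
                assert!(size > limit);
                assert_eq!(limit, 4);
            }
            other => panic!("expected FileTooLarge, got {other:?}"),
        }
    }
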
    #[test]
    fn test_json_validation() {
        let optimizer = LargeFileOptimizer::default();

        // A memory_analysis document with the expected top-level fields.
        let valid_json = serde_json::json!({
            "allocations": [],
            "summary": {"total": 0}
        });
        assert!(optimizer
            .validate_json_structure(&valid_json, "memory_analysis")
            .is_ok());

        // Missing both 'allocations' and 'summary'.
        let invalid_json = serde_json::json!({
            "invalid_field": "value"
        });
        assert!(optimizer
            .validate_json_structure(&invalid_json, "memory_analysis")
            .is_err());
    }
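
    // Sketch of the counting heuristic (added for illustration): the root
    // object counts as one, and entries of recognized array fields such as
    // "allocations" are added on top; other fields are ignored.
    #[test]
    fn test_count_json_objects() {
        let optimizer = LargeFileOptimizer::default();

        let json = serde_json::json!({
            "allocations": [{"size": 1}, {"size": 2}],
            "summary": {"total": 2}
        });
        // 1 (root) + 2 (allocation entries); "summary" is not a counted field.
        assert_eq!(optimizer.count_json_objects(&json), 3);

        let array = serde_json::json!([1, 2, 3]);
        assert_eq!(optimizer.count_json_objects(&array), 3);
    }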
}