memscope_rs/cli/commands/html_from_json/large_file_optimizer.rs

use serde_json::Value;
use std::error::Error;
use std::fmt;
use std::fs::File;
use std::io::BufReader;
use std::path::Path;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Instant;

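/// Configuration for large file processing.
///
/// A minimal sketch of overriding the defaults (the values shown are
/// illustrative, not recommendations):
///
/// ```ignore
/// let config = LargeFileConfig {
///     max_memory_bytes: 256 * 1024 * 1024, // lower the memory cap to 256 MB
///     ..LargeFileConfig::default()
/// };
/// ```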
#[derive(Debug, Clone)]
pub struct LargeFileConfig {
    /// Maximum memory budget for processing, in bytes.
    pub max_memory_bytes: usize,
    /// Buffer size used for streaming reads, in bytes.
    pub stream_chunk_size: usize,
    /// Whether to track memory usage during processing.
    pub enable_memory_monitoring: bool,
    /// Whether to emit progress information while processing.
    pub enable_progress_reporting: bool,
    /// Hard upper bound on accepted file size, in bytes.
    pub max_file_size_bytes: usize,
}

impl Default for LargeFileConfig {
    fn default() -> Self {
        Self {
            max_memory_bytes: 512 * 1024 * 1024, // 512 MB
            stream_chunk_size: 64 * 1024,        // 64 KB
            enable_memory_monitoring: true,
            enable_progress_reporting: true,
            max_file_size_bytes: 2 * 1024 * 1024 * 1024, // 2 GB
        }
    }
}

#[derive(Debug, Clone)]
pub struct MemoryStats {
    /// Bytes currently tracked as in use.
    pub current_usage_bytes: usize,
    /// Highest usage observed so far, in bytes.
    pub peak_usage_bytes: usize,
    /// Number of tracked allocations.
    pub allocation_count: usize,
    /// Ratio of current usage to peak usage (1.0 when no peak recorded).
    pub efficiency_ratio: f64,
}

#[derive(Debug)]
pub struct ProcessingStats {
    /// Size of the processed file, in bytes.
    pub file_size_bytes: usize,
    /// Wall-clock processing time, in milliseconds.
    pub processing_time_ms: u64,
    /// Whether the streaming path was used instead of the in-memory path.
    pub streaming_mode_used: bool,
    /// Memory usage recorded by the monitor during processing.
    pub memory_stats: MemoryStats,
    /// Effective throughput, in megabytes per second.
    pub throughput_mb_per_sec: f64,
    /// Number of JSON objects counted in the parsed document.
    pub objects_processed: usize,
}

#[derive(Debug)]
pub enum LargeFileError {
    /// File size exceeds the configured limit: (actual, limit).
    FileTooLarge(usize, usize),
    /// Tracked memory usage exceeds the configured limit: (used, limit).
    MemoryLimitExceeded(usize, usize),
    /// The JSON parser failed while reading the file.
    StreamingParseError(String),
    /// An underlying I/O operation failed.
    IoError(std::io::Error),
    /// The parsed JSON does not match the expected structure.
    ValidationError(String),
}

impl fmt::Display for LargeFileError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            LargeFileError::FileTooLarge(size, limit) => {
                write!(f, "File size ({size} bytes) exceeds limit ({limit} bytes)")
            }
            LargeFileError::MemoryLimitExceeded(used, limit) => {
                write!(f, "Memory usage ({used} bytes) exceeds limit ({limit} bytes)")
            }
            LargeFileError::StreamingParseError(msg) => {
                write!(f, "Streaming parse error: {msg}")
            }
            LargeFileError::IoError(err) => {
                write!(f, "IO error: {err}")
            }
            LargeFileError::ValidationError(msg) => {
                write!(f, "Validation error: {msg}")
            }
        }
    }
}

impl Error for LargeFileError {}

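/// Thread-safe tracker for current and peak memory usage against a fixed
/// limit. Tracking is cooperative: callers report usage via `allocate` and
/// `deallocate`; nothing is measured automatically.
///
/// A minimal sketch of cooperative tracking (mirrors the unit test below):
///
/// ```ignore
/// let monitor = MemoryMonitor::new(1024, true);
/// monitor.allocate(512)?; // ok, within the 1024-byte limit
/// monitor.deallocate(512);
/// ```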
pub struct MemoryMonitor {
    /// Bytes currently tracked as allocated.
    current_usage: Arc<AtomicUsize>,
    /// Highest tracked usage seen so far.
    peak_usage: Arc<AtomicUsize>,
    /// Usage above this limit causes `allocate` to fail.
    memory_limit: usize,
    /// When false, all tracking is a no-op.
    enabled: bool,
}

impl MemoryMonitor {
    pub fn new(memory_limit: usize, enabled: bool) -> Self {
        Self {
            current_usage: Arc::new(AtomicUsize::new(0)),
            peak_usage: Arc::new(AtomicUsize::new(0)),
            memory_limit,
            enabled,
        }
    }

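    /// Records an allocation of `size` bytes.
    ///
    /// The current-usage counter is bumped with a relaxed `fetch_add`, and the
    /// peak is raised via a compare-exchange loop so concurrent callers never
    /// lose a higher peak. Returns `MemoryLimitExceeded` if the new total goes
    /// over the limit; note the counter has already been bumped at that point.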
    pub fn allocate(&self, size: usize) -> Result<(), LargeFileError> {
        if !self.enabled {
            return Ok(());
        }

        let new_usage = self.current_usage.fetch_add(size, Ordering::Relaxed) + size;

        // Raise the recorded peak if this allocation set a new high-water mark.
        let mut peak = self.peak_usage.load(Ordering::Relaxed);
        while new_usage > peak {
            match self.peak_usage.compare_exchange_weak(
                peak,
                new_usage,
                Ordering::Relaxed,
                Ordering::Relaxed,
            ) {
                Ok(_) => break,
                Err(current_peak) => peak = current_peak,
            }
        }

        // Fail if this allocation pushed usage over the configured limit.
        if new_usage > self.memory_limit {
            return Err(LargeFileError::MemoryLimitExceeded(
                new_usage,
                self.memory_limit,
            ));
        }

        Ok(())
    }

    pub fn deallocate(&self, size: usize) {
        if self.enabled {
            self.current_usage.fetch_sub(size, Ordering::Relaxed);
        }
    }

    pub fn get_stats(&self) -> MemoryStats {
        let current = self.current_usage.load(Ordering::Relaxed);
        let peak = self.peak_usage.load(Ordering::Relaxed);

        MemoryStats {
            current_usage_bytes: current,
            peak_usage_bytes: peak,
            allocation_count: 1, // simplified: individual allocations are not counted
            efficiency_ratio: if peak > 0 {
                current as f64 / peak as f64
            } else {
                1.0
            },
        }
    }
}

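/// Entry point for loading large JSON exports with bounded memory use.
///
/// A minimal usage sketch (the file name shown is hypothetical):
///
/// ```ignore
/// let optimizer = LargeFileOptimizer::new_default();
/// let (json, stats) = optimizer.process_file("memory_analysis.json", "memory_analysis")?;
/// println!("{:.1} MB/s, {} objects", stats.throughput_mb_per_sec, stats.objects_processed);
/// ```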
pub struct LargeFileOptimizer {
    /// Processing configuration.
    config: LargeFileConfig,
    /// Tracks memory usage while files are processed.
    memory_monitor: MemoryMonitor,
}

impl LargeFileOptimizer {
    pub fn new(config: LargeFileConfig) -> Self {
        let memory_monitor =
            MemoryMonitor::new(config.max_memory_bytes, config.enable_memory_monitoring);

        Self {
            config,
            memory_monitor,
        }
    }

    pub fn new_default() -> Self {
        Self::new(LargeFileConfig::default())
    }

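    /// Parses `file_path` as JSON and validates it against `file_type`.
    ///
    /// Files larger than the configured hard limit are rejected outright.
    /// Files larger than half of `max_memory_bytes` take the streaming path;
    /// smaller files take the memory-optimized path. Returns the parsed value
    /// together with the `ProcessingStats` for the run.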
    pub fn process_file<P: AsRef<Path>>(
        &self,
        file_path: P,
        file_type: &str,
    ) -> Result<(Value, ProcessingStats), LargeFileError> {
        let start_time = Instant::now();
        let path = file_path.as_ref();

        // Reject files over the hard size limit before reading anything.
        let file_size = std::fs::metadata(path)
            .map_err(LargeFileError::IoError)?
            .len() as usize;

        if file_size > self.config.max_file_size_bytes {
            return Err(LargeFileError::FileTooLarge(
                file_size,
                self.config.max_file_size_bytes,
            ));
        }

        tracing::info!(
            "🔧 Processing large file: {} ({:.1} MB)",
            path.display(),
            file_size as f64 / 1024.0 / 1024.0
        );

        // Stream when the file could plausibly exhaust the memory budget.
        let use_streaming = file_size > self.config.max_memory_bytes / 2;

        let (json_value, objects_processed) = if use_streaming {
            tracing::info!("📡 Using streaming mode for large file processing");
            self.process_streaming(path, file_type)?
        } else {
            tracing::info!("💾 Using memory-optimized mode for file processing");
            self.process_memory_optimized(path, file_type)?
        };

        let processing_time = start_time.elapsed().as_millis() as u64;
        let throughput = if processing_time > 0 {
            (file_size as f64 / 1024.0 / 1024.0) / (processing_time as f64 / 1000.0)
        } else {
            0.0
        };

        let stats = ProcessingStats {
            file_size_bytes: file_size,
            processing_time_ms: processing_time,
            streaming_mode_used: use_streaming,
            memory_stats: self.memory_monitor.get_stats(),
            throughput_mb_per_sec: throughput,
            objects_processed,
        };

        tracing::info!(
            "✅ File processed: {:.1} MB/s, {} objects, {}ms",
            throughput,
            objects_processed,
            processing_time
        );

        Ok((json_value, stats))
    }

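    /// Parses the file through a fixed-size buffered reader, charging the
    /// buffer against the memory monitor for the duration of the parse. Note
    /// that `serde_json::from_reader` still materializes the full `Value` in
    /// memory; the buffer only bounds read I/O, not the parsed result.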
    fn process_streaming<P: AsRef<Path>>(
        &self,
        file_path: P,
        file_type: &str,
    ) -> Result<(Value, usize), LargeFileError> {
        let file = File::open(file_path).map_err(LargeFileError::IoError)?;
        let reader = BufReader::with_capacity(self.config.stream_chunk_size, file);

        // Charge the read buffer against the memory budget while parsing.
        self.memory_monitor
            .allocate(self.config.stream_chunk_size)?;

        let json_value: Value = serde_json::from_reader(reader)
            .map_err(|e| LargeFileError::StreamingParseError(e.to_string()))?;

        self.validate_json_structure(&json_value, file_type)?;

        let objects_processed = self.count_json_objects(&json_value);

        self.memory_monitor
            .deallocate(self.config.stream_chunk_size);

        Ok((json_value, objects_processed))
    }

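    /// Parses the file with a default-capacity buffered reader. Intended for
    /// files small enough to fit comfortably within the memory budget; only a
    /// nominal overhead is charged against the monitor.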
    fn process_memory_optimized<P: AsRef<Path>>(
        &self,
        file_path: P,
        file_type: &str,
    ) -> Result<(Value, usize), LargeFileError> {
        let file = File::open(file_path).map_err(LargeFileError::IoError)?;
        let reader = BufReader::new(file);

        // Charge a nominal 8 KB of parsing overhead against the memory budget.
        self.memory_monitor.allocate(8192)?;

        let json_value: Value = serde_json::from_reader(reader)
            .map_err(|e| LargeFileError::StreamingParseError(e.to_string()))?;

        self.validate_json_structure(&json_value, file_type)?;

        let objects_processed = self.count_json_objects(&json_value);

        self.memory_monitor.deallocate(8192);

        Ok((json_value, objects_processed))
    }

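    /// Checks that the parsed JSON has the top-level shape expected for
    /// `file_type`:
    ///
    /// - `memory_analysis`: object with `allocations` or `summary`
    /// - `unsafe_ffi`: object with `enhanced_ffi_data` or `summary`
    /// - `performance`: object with `memory_performance` or `allocation_distribution`
    /// - `lifetime`: object with `lifecycle_events`
    /// - `complex_types`: object with `categorized_types` or `generic_types`
    /// - anything else: any object or array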
    fn validate_json_structure(&self, json: &Value, file_type: &str) -> Result<(), LargeFileError> {
        match file_type {
            "memory_analysis" => {
                if !json.is_object() {
                    return Err(LargeFileError::ValidationError(
                        "Memory analysis JSON must be an object".to_string(),
                    ));
                }

                let obj = json.as_object().expect("checked is_object above");
                if !obj.contains_key("allocations") && !obj.contains_key("summary") {
                    return Err(LargeFileError::ValidationError(
                        "Memory analysis JSON must contain 'allocations' or 'summary' field"
                            .to_string(),
                    ));
                }
            }
            "unsafe_ffi" => {
                if !json.is_object() {
                    return Err(LargeFileError::ValidationError(
                        "Unsafe FFI JSON must be an object".to_string(),
                    ));
                }

                let obj = json.as_object().expect("checked is_object above");
                if !obj.contains_key("enhanced_ffi_data") && !obj.contains_key("summary") {
                    return Err(LargeFileError::ValidationError(
                        "Unsafe FFI JSON must contain 'enhanced_ffi_data' or 'summary' field"
                            .to_string(),
                    ));
                }
            }
            "performance" => {
                if !json.is_object() {
                    return Err(LargeFileError::ValidationError(
                        "Performance JSON must be an object".to_string(),
                    ));
                }

                let obj = json.as_object().expect("checked is_object above");
                if !obj.contains_key("memory_performance")
                    && !obj.contains_key("allocation_distribution")
                {
                    return Err(LargeFileError::ValidationError(
                        "Performance JSON must contain performance-related fields".to_string(),
                    ));
                }
            }
            "lifetime" => {
                if !json.is_object() {
                    return Err(LargeFileError::ValidationError(
                        "Lifetime JSON must be an object".to_string(),
                    ));
                }

                let obj = json.as_object().expect("checked is_object above");
                if !obj.contains_key("lifecycle_events") {
                    return Err(LargeFileError::ValidationError(
                        "Lifetime JSON must contain 'lifecycle_events' field".to_string(),
                    ));
                }
            }
            "complex_types" => {
                if !json.is_object() {
                    return Err(LargeFileError::ValidationError(
                        "Complex types JSON must be an object".to_string(),
                    ));
                }

                let obj = json.as_object().expect("checked is_object above");
                if !obj.contains_key("categorized_types") && !obj.contains_key("generic_types") {
                    return Err(LargeFileError::ValidationError(
                        "Complex types JSON must contain type-related fields".to_string(),
                    ));
                }
            }
            _ => {
                if !json.is_object() && !json.is_array() {
                    return Err(LargeFileError::ValidationError(
                        "JSON must be an object or array".to_string(),
                    ));
                }
            }
        }

        Ok(())
    }

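    /// Counts the root value plus the elements of known collection fields
    /// (`allocations`, `lifecycle_events`, `enhanced_ffi_data`,
    /// `boundary_events`, `categorized_types`, `generic_types`). A bare array
    /// counts its elements; any other value counts as one object.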
    fn count_json_objects(&self, json: &Value) -> usize {
        match json {
            Value::Object(obj) => {
                let mut count = 1; // the root object itself

                // Add the lengths of known collection fields.
                for (key, value) in obj {
                    match key.as_str() {
                        "allocations" | "lifecycle_events" | "enhanced_ffi_data"
                        | "boundary_events" | "categorized_types" | "generic_types" => {
                            if let Value::Array(arr) = value {
                                count += arr.len();
                            }
                        }
                        _ => {}
                    }
                }

                count
            }
            Value::Array(arr) => arr.len(),
            _ => 1,
        }
    }

    pub fn get_memory_stats(&self) -> MemoryStats {
        self.memory_monitor.get_stats()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use tempfile::TempDir;

    #[test]
    fn test_large_file_config_default() {
        let config = LargeFileConfig::default();
        assert_eq!(config.max_memory_bytes, 512 * 1024 * 1024);
        assert_eq!(config.stream_chunk_size, 64 * 1024);
        assert!(config.enable_memory_monitoring);
        assert!(config.enable_progress_reporting);
    }

    #[test]
    fn test_memory_monitor() {
        let monitor = MemoryMonitor::new(1024, true);

        // An allocation within the limit succeeds and is tracked.
        assert!(monitor.allocate(512).is_ok());
        assert_eq!(monitor.get_stats().current_usage_bytes, 512);

        // Deallocation reduces the tracked usage.
        monitor.deallocate(256);
        assert_eq!(monitor.get_stats().current_usage_bytes, 256);

        // An allocation that would exceed the limit fails.
        assert!(monitor.allocate(1024).is_err());
    }

    #[test]
    fn test_process_small_file() {
        let temp_dir = TempDir::new().expect("Failed to create temp dir");
        let file_path = temp_dir.path().join("test.json");

        let test_data =
            r#"{"allocations": [{"ptr": "0x123", "size": 100}], "summary": {"total": 1}}"#;
        fs::write(&file_path, test_data).expect("Failed to write test file");

        let optimizer = LargeFileOptimizer::new_default();
        let result = optimizer.process_file(&file_path, "memory_analysis");

        assert!(result.is_ok());
        let (json_value, stats) = result.expect("Processing should succeed");
        assert!(json_value.is_object());
        assert!(!stats.streaming_mode_used);
        assert_eq!(stats.objects_processed, 2); // root object + 1 allocation entry
    }

    #[test]
    fn test_json_validation() {
        let optimizer = LargeFileOptimizer::new_default();

        // A memory analysis document with the expected top-level fields.
        let valid_json = serde_json::json!({
            "allocations": [],
            "summary": {"total": 0}
        });
        assert!(optimizer
            .validate_json_structure(&valid_json, "memory_analysis")
            .is_ok());

        // A document missing the required fields is rejected.
        let invalid_json = serde_json::json!({
            "invalid_field": "value"
        });
        assert!(optimizer
            .validate_json_structure(&invalid_json, "memory_analysis")
            .is_err());
    }
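
    // Two additional sketches exercising paths not covered above; both follow
    // directly from the code in this module.

    #[test]
    fn test_memory_monitor_disabled() {
        // With monitoring disabled, allocations are never tracked or rejected.
        let monitor = MemoryMonitor::new(16, false);
        assert!(monitor.allocate(1024).is_ok());
        assert_eq!(monitor.get_stats().current_usage_bytes, 0);
    }

    #[test]
    fn test_file_too_large() {
        let temp_dir = TempDir::new().expect("Failed to create temp dir");
        let file_path = temp_dir.path().join("big.json");
        fs::write(&file_path, "{}").expect("Failed to write test file");

        // A 1-byte cap forces any real file onto the FileTooLarge path.
        let config = LargeFileConfig {
            max_file_size_bytes: 1,
            ..LargeFileConfig::default()
        };
        let optimizer = LargeFileOptimizer::new(config);
        let result = optimizer.process_file(&file_path, "memory_analysis");
        assert!(matches!(result, Err(LargeFileError::FileTooLarge(_, _))));
    }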
}