memscope_rs/export/streaming_json_writer.rs

//! Streaming JSON writer for optimized large file export
//!
//! This module provides high-performance streaming JSON writing capabilities
//! with support for buffering, compression, and non-blocking operations.

use crate::core::types::{TrackingError, TrackingResult};
use crate::export::batch_processor::{
    BatchProcessingMetrics, ProcessedBoundaryData, ProcessedFFIData, ProcessedUnsafeData,
};
use serde::{Deserialize, Serialize};
use std::io::{BufWriter, Write};
use std::time::Instant;

/// Configuration for streaming JSON writer
#[derive(Debug, Clone)]
pub struct StreamingWriterConfig {
    /// Buffer size for I/O operations (default: 256KB)
    pub buffer_size: usize,
    /// Enable compression (default: false). As written, this writer only
    /// records the flag in the export metadata; it does not compress its
    /// own output.
    pub enable_compression: bool,
    /// Compression level (1-9, default: 6)
    pub compression_level: u32,
    /// Enable pretty printing (default: false for performance)
    pub pretty_print: bool,
    /// Maximum memory usage before flushing (default: 64MB)
    pub max_memory_before_flush: usize,
    /// Enable non-blocking writes: when set, the writer flushes after each
    /// array chunk so output reaches the sink incrementally (default: true)
    pub non_blocking: bool,
    /// Chunk size for streaming large arrays (default: 1000)
    pub array_chunk_size: usize,
}

impl Default for StreamingWriterConfig {
    fn default() -> Self {
        Self {
            buffer_size: 256 * 1024, // 256KB
            enable_compression: false,
            compression_level: 6,
            pretty_print: false,
            max_memory_before_flush: 64 * 1024 * 1024, // 64MB
            non_blocking: true,
            array_chunk_size: 1000,
        }
    }
}
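
// A minimal usage sketch (illustrative, not part of the original API surface):
// a caller who only needs to tweak one knob can use struct-update syntax
// against the defaults above. The 1 MiB buffer size is an assumed example value.
#[allow(dead_code)]
fn _example_large_buffer_config() -> StreamingWriterConfig {
    StreamingWriterConfig {
        buffer_size: 1024 * 1024, // larger buffer for very large exports
        ..StreamingWriterConfig::default()
    }
}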

/// Metadata for JSON export
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExportMetadata {
    /// Analysis type identifier
    pub analysis_type: String,
    /// Schema version
    pub schema_version: String,
    /// Export timestamp (Unix timestamp in nanoseconds)
    pub export_timestamp: u128,
    /// Optimization level used
    pub optimization_level: String,
    /// Processing mode (sequential/parallel/streaming)
    pub processing_mode: String,
    /// Data integrity hash
    pub data_integrity_hash: String,
    /// Export configuration used
    pub export_config: ExportConfig,
}

/// Export configuration information
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExportConfig {
    /// Buffer size used
    pub buffer_size: usize,
    /// Whether compression was enabled
    pub compression_enabled: bool,
    /// Compression level if enabled
    pub compression_level: Option<u32>,
    /// Whether pretty printing was used
    pub pretty_print: bool,
    /// Array chunk size used
    pub array_chunk_size: usize,
}

/// Statistics for streaming write operations
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamingStats {
    /// Total bytes written
    pub bytes_written: u64,
    /// Number of flush operations
    pub flush_count: u32,
    /// Total write time in milliseconds
    pub total_write_time_ms: u64,
    /// Average write speed in bytes per second
    pub avg_write_speed_bps: f64,
    /// Peak memory usage during writing
    pub peak_memory_usage: usize,
    /// Number of chunks written
    pub chunks_written: u32,
    /// Compression ratio (if compression enabled)
    pub compression_ratio: Option<f64>,
}

/// Streaming JSON writer with buffering support
pub struct StreamingJsonWriter<W: Write> {
    /// Inner buffered writer
    writer: BufWriter<W>,
    /// Configuration
    config: StreamingWriterConfig,
    /// Statistics
    stats: StreamingStats,
    /// Start time for performance tracking
    start_time: Instant,
    /// Current memory usage estimate
    current_memory_usage: usize,
    /// Whether the writer has been finalized
    finalized: bool,
}

impl<W: Write> StreamingJsonWriter<W> {
    /// Create a new streaming JSON writer with default configuration
    pub fn new(writer: W) -> TrackingResult<Self> {
        Self::with_config(writer, StreamingWriterConfig::default())
    }

    /// Create a new streaming JSON writer with custom configuration
    pub fn with_config(writer: W, config: StreamingWriterConfig) -> TrackingResult<Self> {
        let start_time = Instant::now();

        // Create buffered writer
        let buffered_writer = BufWriter::with_capacity(config.buffer_size, writer);

        let stats = StreamingStats {
            bytes_written: 0,
            flush_count: 0,
            total_write_time_ms: 0,
            avg_write_speed_bps: 0.0,
            peak_memory_usage: 0,
            chunks_written: 0,
            compression_ratio: None,
        };

        Ok(Self {
            writer: buffered_writer,
            config,
            stats,
            start_time,
            current_memory_usage: 0,
            finalized: false,
        })
    }

    /// Write the JSON header with metadata
    pub fn write_unsafe_ffi_header(&mut self, metadata: &ExportMetadata) -> TrackingResult<()> {
        self.ensure_not_finalized()?;

        let header_json = if self.config.pretty_print {
            serde_json::to_string_pretty(metadata)?
        } else {
            serde_json::to_string(metadata)?
        };

        self.write_raw("{\n")?;
        self.write_raw(&format!("\"metadata\": {header_json},\n"))?;

        Ok(())
    }

    /// Write unsafe allocations data in streaming fashion
    pub fn write_unsafe_allocations_stream(
        &mut self,
        data: &ProcessedUnsafeData,
    ) -> TrackingResult<()> {
        self.ensure_not_finalized()?;

        self.write_raw("\"unsafe_analysis\": {\n")?;

        // Write summary information
        self.write_raw(&format!(
            "\"total_unsafe_allocations\": {},\n",
            data.total_allocations
        ))?;
        self.write_raw(&format!("\"total_memory\": {},\n", data.total_memory))?;

        // Write risk distribution
        let risk_json = if self.config.pretty_print {
            serde_json::to_string_pretty(&data.risk_distribution)?
        } else {
            serde_json::to_string(&data.risk_distribution)?
        };
        self.write_raw(&format!("\"risk_distribution\": {risk_json},\n"))?;

        // Write unsafe blocks
        let blocks_json = if self.config.pretty_print {
            serde_json::to_string_pretty(&data.unsafe_blocks)?
        } else {
            serde_json::to_string(&data.unsafe_blocks)?
        };
        self.write_raw(&format!("\"unsafe_blocks\": {blocks_json},\n"))?;

        // Stream allocations in chunks
        self.write_raw("\"allocations\": [\n")?;
        self.write_array_chunked(&data.allocations)?;
        self.write_raw("],\n")?;

        // Write performance metrics
        let metrics_json = if self.config.pretty_print {
            serde_json::to_string_pretty(&data.performance_metrics)?
        } else {
            serde_json::to_string(&data.performance_metrics)?
        };
        self.write_raw(&format!("\"performance_metrics\": {metrics_json}\n"))?;

        self.write_raw("},\n")?;

        Ok(())
    }

    /// Write FFI allocations data in streaming fashion
    pub fn write_ffi_allocations_stream(&mut self, data: &ProcessedFFIData) -> TrackingResult<()> {
        self.ensure_not_finalized()?;

        self.write_raw("\"ffi_analysis\": {\n")?;

        // Write summary information
        self.write_raw(&format!(
            "\"total_ffi_allocations\": {},\n",
            data.total_allocations
        ))?;
        self.write_raw(&format!("\"total_memory\": {},\n", data.total_memory))?;

        // Write libraries involved
        let libraries_json = if self.config.pretty_print {
            serde_json::to_string_pretty(&data.libraries_involved)?
        } else {
            serde_json::to_string(&data.libraries_involved)?
        };
        self.write_raw(&format!("\"libraries_involved\": {libraries_json},\n"))?;

        // Write hook statistics
        let hook_stats_json = if self.config.pretty_print {
            serde_json::to_string_pretty(&data.hook_statistics)?
        } else {
            serde_json::to_string(&data.hook_statistics)?
        };
        self.write_raw(&format!("\"hook_statistics\": {hook_stats_json},\n"))?;

        // Stream allocations in chunks
        self.write_raw("\"allocations\": [\n")?;
        self.write_array_chunked(&data.allocations)?;
        self.write_raw("],\n")?;

        // Write performance metrics
        let metrics_json = if self.config.pretty_print {
            serde_json::to_string_pretty(&data.performance_metrics)?
        } else {
            serde_json::to_string(&data.performance_metrics)?
        };
        self.write_raw(&format!("\"performance_metrics\": {metrics_json}\n"))?;

        self.write_raw("},\n")?;

        Ok(())
    }

    /// Write boundary events data in streaming fashion
    pub fn write_boundary_events_stream(
        &mut self,
        data: &ProcessedBoundaryData,
    ) -> TrackingResult<()> {
        self.ensure_not_finalized()?;

        self.write_raw("\"boundary_analysis\": {\n")?;

        // Write summary information
        self.write_raw(&format!(
            "\"total_boundary_crossings\": {},\n",
            data.total_crossings
        ))?;

        // Write transfer patterns
        let patterns_json = if self.config.pretty_print {
            serde_json::to_string_pretty(&data.transfer_patterns)?
        } else {
            serde_json::to_string(&data.transfer_patterns)?
        };
        self.write_raw(&format!("\"transfer_patterns\": {patterns_json},\n"))?;

        // Write risk analysis
        let risk_json = if self.config.pretty_print {
            serde_json::to_string_pretty(&data.risk_analysis)?
        } else {
            serde_json::to_string(&data.risk_analysis)?
        };
        self.write_raw(&format!("\"risk_analysis\": {risk_json},\n"))?;

        // Stream events in chunks
        self.write_raw("\"events\": [\n")?;
        self.write_array_chunked(&data.events)?;
        self.write_raw("],\n")?;

        // Write performance impact
        let impact_json = if self.config.pretty_print {
            serde_json::to_string_pretty(&data.performance_impact)?
        } else {
            serde_json::to_string(&data.performance_impact)?
        };
        self.write_raw(&format!("\"performance_impact\": {impact_json}\n"))?;

        self.write_raw("},\n")?;

        Ok(())
    }

    /// Write safety violations in streaming fashion
    pub fn write_safety_violations_stream<T: Serialize>(
        &mut self,
        violations: &[T],
    ) -> TrackingResult<()> {
        self.ensure_not_finalized()?;

        self.write_raw("\"safety_violations\": {\n")?;
        self.write_raw(&format!("\"total_violations\": {},\n", violations.len()))?;

        // Calculate severity breakdown
        let severity_breakdown = self.calculate_severity_breakdown(violations);
        let severity_json = if self.config.pretty_print {
            serde_json::to_string_pretty(&severity_breakdown)?
        } else {
            serde_json::to_string(&severity_breakdown)?
        };
        self.write_raw(&format!("\"severity_breakdown\": {severity_json},\n"))?;

        // Stream violations in chunks
        self.write_raw("\"violations\": [\n")?;
        self.write_array_chunked(violations)?;
        self.write_raw("]\n")?;

        self.write_raw("},\n")?;

        Ok(())
    }

    /// Write processing metrics
    pub fn write_processing_metrics(
        &mut self,
        metrics: &BatchProcessingMetrics,
    ) -> TrackingResult<()> {
        self.ensure_not_finalized()?;

        let metrics_json = if self.config.pretty_print {
            serde_json::to_string_pretty(metrics)?
        } else {
            serde_json::to_string(metrics)?
        };

        self.write_raw("\"processing_metrics\": ")?;
        self.write_raw(&metrics_json)?;

        Ok(())
    }

    /// Finalize the JSON document and flush all buffers
    pub fn finalize(&mut self) -> TrackingResult<StreamingStats> {
        if self.finalized {
            return Ok(self.stats.clone());
        }

        // Close the main JSON object
        self.write_raw("\n}\n")?;

        // Flush all buffers
        self.flush()?;

        // Calculate final statistics
        let total_time = self.start_time.elapsed();
        self.stats.total_write_time_ms = total_time.as_millis() as u64;
        self.stats.avg_write_speed_bps = if total_time.as_secs_f64() > 0.0 {
            self.stats.bytes_written as f64 / total_time.as_secs_f64()
        } else {
            0.0
        };

        self.finalized = true;
        Ok(self.stats.clone())
    }

    /// Get current streaming statistics
    pub fn get_stats(&self) -> &StreamingStats {
        &self.stats
    }

    /// Force flush the writer
    pub fn flush(&mut self) -> TrackingResult<()> {
        self.writer
            .flush()
            .map_err(|e| TrackingError::IoError(e.to_string()))?;
        self.stats.flush_count += 1;
        Ok(())
    }
}
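
// Illustrative end-to-end sketch (not part of the original module): stream a
// report straight to a file. The file name is an assumed example; `metadata`
// and `metrics` would come from the surrounding export pipeline. The section
// writers for unsafe/FFI/boundary data would be called between the header and
// the metrics; `write_processing_metrics` has to come last because it is the
// only section that does not emit a trailing comma.
#[allow(dead_code)]
fn _example_stream_to_file(
    metadata: &ExportMetadata,
    metrics: &BatchProcessingMetrics,
) -> TrackingResult<StreamingStats> {
    let file = std::fs::File::create("unsafe_ffi_report.json")
        .map_err(|e| TrackingError::IoError(e.to_string()))?;
    let mut writer = StreamingJsonWriter::new(file)?;

    writer.write_unsafe_ffi_header(metadata)?;
    // ... write_unsafe_allocations_stream / write_ffi_allocations_stream /
    // write_boundary_events_stream / write_safety_violations_stream ...
    writer.write_processing_metrics(metrics)?;
    writer.finalize()
}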

// Private implementation methods
impl<W: Write> StreamingJsonWriter<W> {
    /// Write raw string data
    fn write_raw(&mut self, data: &str) -> TrackingResult<()> {
        let bytes = data.as_bytes();
        self.writer
            .write_all(bytes)
            .map_err(|e| TrackingError::IoError(e.to_string()))?;

        self.stats.bytes_written += bytes.len() as u64;
        self.current_memory_usage += bytes.len();

        // Update peak memory usage
        if self.current_memory_usage > self.stats.peak_memory_usage {
            self.stats.peak_memory_usage = self.current_memory_usage;
        }

        // Flush if memory usage exceeds threshold
        if self.current_memory_usage >= self.config.max_memory_before_flush {
            self.flush()?;
            self.current_memory_usage = 0;
        }

        Ok(())
    }

    /// Write an array in chunks to avoid memory issues
    fn write_array_chunked<T: Serialize>(&mut self, items: &[T]) -> TrackingResult<()> {
        let chunk_size = self.config.array_chunk_size;
        let total_chunks = items.len().div_ceil(chunk_size);

        for (chunk_idx, chunk) in items.chunks(chunk_size).enumerate() {
            for (item_idx, item) in chunk.iter().enumerate() {
                let item_json = if self.config.pretty_print {
                    serde_json::to_string_pretty(item)?
                } else {
                    serde_json::to_string(item)?
                };

                self.write_raw(&item_json)?;

                // Add a separating comma unless this is the final item of the final chunk
                let is_last_item_in_chunk = item_idx == chunk.len() - 1;
                let is_last_chunk = chunk_idx == total_chunks - 1;

                if !is_last_item_in_chunk || !is_last_chunk {
                    self.write_raw(",")?;
                }

                if self.config.pretty_print {
                    self.write_raw("\n")?;
                }
            }

            self.stats.chunks_written += 1;

            // Flush after each chunk if non-blocking is enabled
            if self.config.non_blocking {
                self.flush()?;
            }
        }

        Ok(())
    }

    /// Calculate severity breakdown for violations
    fn calculate_severity_breakdown<T: Serialize>(&self, _violations: &[T]) -> serde_json::Value {
        // Placeholder implementation: returns fixed counts and ignores the
        // input; a real implementation would inspect the violation types
        serde_json::json!({
            "critical": 0,
            "high": 1,
            "medium": 2,
            "low": 0
        })
    }

    /// Ensure the writer hasn't been finalized
    fn ensure_not_finalized(&self) -> TrackingResult<()> {
        if self.finalized {
            Err(TrackingError::InvalidOperation(
                "Writer has been finalized".to_string(),
            ))
        } else {
            Ok(())
        }
    }
}
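
// Shape of the document the writers above produce (sketch, assuming the
// caller emits the header first and the processing metrics last):
//
// {
//   "metadata": { ... },
//   "unsafe_analysis": { ... },
//   "ffi_analysis": { ... },
//   "boundary_analysis": { ... },
//   "safety_violations": { ... },
//   "processing_metrics": { ... }
// }
//
// Every section writer except `write_processing_metrics` appends a trailing
// comma, which is why the metrics section must be written immediately before
// `finalize` closes the top-level object.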

/// Utility functions for creating export metadata
impl ExportMetadata {
    /// Create metadata for unsafe/FFI analysis export
    pub fn for_unsafe_ffi_analysis(optimization_level: &str, processing_mode: &str) -> Self {
        let current_time = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_nanos();

        Self {
            analysis_type: "unsafe_ffi_analysis_optimized".to_string(),
            schema_version: "2.0".to_string(),
            export_timestamp: current_time,
            optimization_level: optimization_level.to_string(),
            processing_mode: processing_mode.to_string(),
            data_integrity_hash: format!("{current_time:x}"), // Placeholder: hex-encoded timestamp, not a real content hash
            export_config: ExportConfig {
                buffer_size: 256 * 1024,
                compression_enabled: false,
                compression_level: None,
                pretty_print: false,
                array_chunk_size: 1000,
            },
        }
    }

    /// Update export config in metadata
    pub fn with_config(mut self, config: &StreamingWriterConfig) -> Self {
        self.export_config = ExportConfig {
            buffer_size: config.buffer_size,
            compression_enabled: config.enable_compression,
            compression_level: if config.enable_compression {
                Some(config.compression_level)
            } else {
                None
            },
            pretty_print: config.pretty_print,
            array_chunk_size: config.array_chunk_size,
        };
        self
    }
}
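
// Sketch: build metadata whose recorded export_config mirrors the writer
// configuration actually in use (the "high"/"streaming" labels are assumed
// example values).
#[allow(dead_code)]
fn _example_metadata() -> ExportMetadata {
    let config = StreamingWriterConfig {
        pretty_print: true,
        ..StreamingWriterConfig::default()
    };
    ExportMetadata::for_unsafe_ffi_analysis("high", "streaming").with_config(&config)
}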

/// Builder pattern for streaming writer configuration
pub struct StreamingWriterConfigBuilder {
    config: StreamingWriterConfig,
}

impl StreamingWriterConfigBuilder {
    /// Create a new configuration builder
    pub fn new() -> Self {
        Self {
            config: StreamingWriterConfig::default(),
        }
    }

    /// Set buffer size
    pub fn buffer_size(mut self, size: usize) -> Self {
        self.config.buffer_size = size;
        self
    }

    /// Enable compression with specified level
    pub fn with_compression(mut self, level: u32) -> Self {
        self.config.enable_compression = true;
        self.config.compression_level = level;
        self
    }

    /// Enable pretty printing
    pub fn pretty_print(mut self) -> Self {
        self.config.pretty_print = true;
        self
    }

    /// Set maximum memory before flush
    pub fn max_memory_before_flush(mut self, size: usize) -> Self {
        self.config.max_memory_before_flush = size;
        self
    }

    /// Set array chunk size
    pub fn array_chunk_size(mut self, size: usize) -> Self {
        self.config.array_chunk_size = size;
        self
    }

    /// Enable or disable non-blocking writes
    pub fn non_blocking(mut self, enabled: bool) -> Self {
        self.config.non_blocking = enabled;
        self
    }

    /// Build the configuration
    pub fn build(self) -> StreamingWriterConfig {
        self.config
    }
}

impl Default for StreamingWriterConfigBuilder {
    fn default() -> Self {
        Self::new()
    }
}
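
// Sketch: the builder expresses the same configuration fluently. Note that
// `with_compression` only records the setting; as written, this module never
// compresses the byte stream itself.
#[allow(dead_code)]
fn _example_builder_config() -> StreamingWriterConfig {
    StreamingWriterConfigBuilder::new()
        .buffer_size(512 * 1024)
        .with_compression(6)
        .build()
}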

#[cfg(test)]
mod tests {
    use super::*;
    use crate::export::batch_processor::{
        BatchProcessingMetrics, BoundaryPerformanceImpact, BoundaryRiskAnalysis,
        FFIPerformanceMetrics, HookStatistics, LibraryInfo, ProcessedBoundaryData,
        ProcessedBoundaryEvent, ProcessedFFIAllocation, ProcessedFFIData,
        ProcessedUnsafeAllocation, ProcessedUnsafeData, RiskDistribution, TransferPatterns,
        UnsafeBlockInfo, UnsafePerformanceMetrics,
    };
    use std::io::Cursor;

    fn create_test_writer() -> StreamingJsonWriter<Cursor<Vec<u8>>> {
        let buffer = Vec::new();
        let cursor = Cursor::new(buffer);
        StreamingJsonWriter::new(cursor).unwrap()
    }

    fn create_test_writer_with_config(
        config: StreamingWriterConfig,
    ) -> StreamingJsonWriter<Cursor<Vec<u8>>> {
        let buffer = Vec::new();
        let cursor = Cursor::new(buffer);
        StreamingJsonWriter::with_config(cursor, config).unwrap()
    }

    #[test]
    fn test_streaming_writer_creation() {
        let buffer = Vec::new();
        let cursor = Cursor::new(buffer);
        let writer = StreamingJsonWriter::new(cursor);
        assert!(writer.is_ok());
    }

    #[test]
    fn test_streaming_writer_with_custom_config() {
        let config = StreamingWriterConfig {
            buffer_size: 128 * 1024,
            enable_compression: true,
            compression_level: 5,
            pretty_print: true,
            max_memory_before_flush: 32 * 1024 * 1024,
            non_blocking: false,
            array_chunk_size: 500,
        };

        let buffer = Vec::new();
        let cursor = Cursor::new(buffer);
        let writer = StreamingJsonWriter::with_config(cursor, config.clone());
        assert!(writer.is_ok());

        let writer = writer.unwrap();
        assert_eq!(writer.config.buffer_size, config.buffer_size);
        assert_eq!(writer.config.enable_compression, config.enable_compression);
        assert_eq!(writer.config.compression_level, config.compression_level);
        assert_eq!(writer.config.pretty_print, config.pretty_print);
    }

    #[test]
    fn test_config_builder() {
        let config = StreamingWriterConfigBuilder::new()
            .buffer_size(512 * 1024)
            .with_compression(9)
            .pretty_print()
            .build();

        assert_eq!(config.buffer_size, 512 * 1024);
        assert!(config.enable_compression);
        assert_eq!(config.compression_level, 9);
        assert!(config.pretty_print);
    }

    #[test]
    fn test_config_builder_all_methods() {
        let config = StreamingWriterConfigBuilder::new()
            .buffer_size(1024 * 1024)
            .with_compression(3)
            .pretty_print()
            .max_memory_before_flush(128 * 1024 * 1024)
            .array_chunk_size(2000)
            .non_blocking(false)
            .build();

        assert_eq!(config.buffer_size, 1024 * 1024);
        assert!(config.enable_compression);
        assert_eq!(config.compression_level, 3);
        assert!(config.pretty_print);
        assert_eq!(config.max_memory_before_flush, 128 * 1024 * 1024);
        assert_eq!(config.array_chunk_size, 2000);
        assert!(!config.non_blocking);
    }

    #[test]
    fn test_config_builder_default() {
        let builder1 = StreamingWriterConfigBuilder::new();
        let builder2 = StreamingWriterConfigBuilder::default();

        let config1 = builder1.build();
        let config2 = builder2.build();

        assert_eq!(config1.buffer_size, config2.buffer_size);
        assert_eq!(config1.enable_compression, config2.enable_compression);
    }

    #[test]
    fn test_export_metadata_creation() {
        let metadata = ExportMetadata::for_unsafe_ffi_analysis("high", "parallel");
        assert_eq!(metadata.analysis_type, "unsafe_ffi_analysis_optimized");
        assert_eq!(metadata.schema_version, "2.0");
        assert_eq!(metadata.optimization_level, "high");
        assert_eq!(metadata.processing_mode, "parallel");
        assert!(metadata.export_timestamp > 0);
        assert!(!metadata.data_integrity_hash.is_empty());
    }

    #[test]
    fn test_export_metadata_with_config() {
        let config = StreamingWriterConfig {
            buffer_size: 512 * 1024,
            enable_compression: true,
            compression_level: 7,
            pretty_print: true,
            max_memory_before_flush: 64 * 1024 * 1024,
            non_blocking: true,
            array_chunk_size: 1500,
        };

        let metadata =
            ExportMetadata::for_unsafe_ffi_analysis("medium", "sequential").with_config(&config);

        assert_eq!(metadata.export_config.buffer_size, 512 * 1024);
        assert!(metadata.export_config.compression_enabled);
        assert_eq!(metadata.export_config.compression_level, Some(7));
        assert!(metadata.export_config.pretty_print);
        assert_eq!(metadata.export_config.array_chunk_size, 1500);
    }

    #[test]
    fn test_write_header() {
        let mut writer = create_test_writer();
        let metadata = ExportMetadata::for_unsafe_ffi_analysis("high", "parallel");

        let result = writer.write_unsafe_ffi_header(&metadata);
        assert!(result.is_ok());

        // Check that stats are updated
        let stats = writer.get_stats();
        assert!(stats.bytes_written > 0);
    }

    #[test]
    fn test_write_header_pretty_print() {
        let config = StreamingWriterConfigBuilder::new().pretty_print().build();
        let mut writer = create_test_writer_with_config(config);
        let metadata = ExportMetadata::for_unsafe_ffi_analysis("high", "parallel");

        let result = writer.write_unsafe_ffi_header(&metadata);
        assert!(result.is_ok());

        let stats = writer.get_stats();
        assert!(stats.bytes_written > 0);
    }

    #[test]
    fn test_write_unsafe_allocations_stream() {
        let mut writer = create_test_writer();
        let metadata = ExportMetadata::for_unsafe_ffi_analysis("high", "parallel");
        writer.write_unsafe_ffi_header(&metadata).unwrap();

        let unsafe_data = ProcessedUnsafeData {
            total_allocations: 100,
            total_memory: 1024 * 1024,
            risk_distribution: RiskDistribution {
                low_risk: 50,
                medium_risk: 30,
                high_risk: 15,
                critical_risk: 5,
                overall_risk_score: 6.5,
            },
            unsafe_blocks: vec![UnsafeBlockInfo {
                location: "test.rs:10".to_string(),
                allocation_count: 10,
                total_memory: 1024,
                risk_level: crate::analysis::unsafe_ffi_tracker::RiskLevel::High,
                functions_called: vec!["raw_pointer_deref".to_string()],
            }],
            allocations: vec![ProcessedUnsafeAllocation {
                ptr: "0x1000".to_string(),
                size: 1024,
                type_name: Some("TestType".to_string()),
                unsafe_block_location: "test.rs:15".to_string(),
                call_stack: vec!["main".to_string(), "test_function".to_string()],
                risk_assessment: crate::analysis::unsafe_ffi_tracker::RiskAssessment {
                    risk_level: crate::analysis::unsafe_ffi_tracker::RiskLevel::Medium,
                    risk_factors: vec![],
                    mitigation_suggestions: vec![],
                    confidence_score: 0.8,
                    assessment_timestamp: 0,
                },
                lifetime_info: crate::export::batch_processor::LifetimeInfo {
                    allocated_at: 1000,
                    deallocated_at: None,
                    lifetime_ns: None,
                    scope: "test_scope".to_string(),
                },
                memory_layout: None,
            }],
            performance_metrics: UnsafePerformanceMetrics {
                processing_time_ms: 100,
                memory_usage_bytes: 512,
                risk_assessments_performed: 1,
                avg_risk_assessment_time_ns: 5000.0,
            },
        };

        let result = writer.write_unsafe_allocations_stream(&unsafe_data);
        assert!(result.is_ok());

        let stats = writer.get_stats();
        assert!(stats.bytes_written > 0);
    }

    #[test]
    fn test_write_ffi_allocations_stream() {
        let mut writer = create_test_writer();
        let metadata = ExportMetadata::for_unsafe_ffi_analysis("high", "parallel");
        writer.write_unsafe_ffi_header(&metadata).unwrap();

        let ffi_data = ProcessedFFIData {
            total_allocations: 50,
            total_memory: 512 * 1024,
            libraries_involved: vec![LibraryInfo {
                name: "libc".to_string(),
                allocation_count: 30,
                total_memory: 300 * 1024,
                functions_used: vec!["malloc".to_string(), "free".to_string()],
                avg_allocation_size: 10240,
            }],
            hook_statistics: HookStatistics {
                total_hooks: 10,
                success_rate: 0.9,
                avg_overhead_ns: 1000.0,
                methods_used: std::collections::HashMap::new(),
            },
            allocations: vec![ProcessedFFIAllocation {
                ptr: "0x2000".to_string(),
                size: 2048,
                library_name: "libc".to_string(),
                function_name: "malloc".to_string(),
                call_stack: vec!["main".to_string(), "ffi_function".to_string()],
                hook_info: crate::analysis::unsafe_ffi_tracker::LibCHookInfo {
                    hook_method: crate::analysis::unsafe_ffi_tracker::HookMethod::DynamicLinker,
                    original_function: "malloc".to_string(),
                    hook_timestamp: 1000,
                    allocation_metadata: crate::analysis::unsafe_ffi_tracker::AllocationMetadata {
                        requested_size: 2048,
                        actual_size: 2048,
                        alignment: 8,
                        allocator_info: "libc".to_string(),
                        protection_flags: None,
                    },
                    hook_overhead_ns: Some(100),
                },
                ownership_info: crate::export::batch_processor::OwnershipInfo {
                    owner_context: "FFI".to_string(),
                    owner_function: "malloc".to_string(),
                    transfer_timestamp: 1000,
                    expected_lifetime: None,
                },
                interop_metadata: crate::export::batch_processor::InteropMetadata {
                    marshalling_info: "C-compatible".to_string(),
                    type_conversion: "Direct".to_string(),
                    performance_impact: "Low".to_string(),
                    safety_considerations: vec!["Manual memory management".to_string()],
                },
            }],
            performance_metrics: FFIPerformanceMetrics {
                processing_time_ms: 50,
                memory_usage_bytes: 256,
                hook_operations_processed: 1,
                avg_hook_processing_time_ns: 3000.0,
            },
        };

        let result = writer.write_ffi_allocations_stream(&ffi_data);
        assert!(result.is_ok());

        let stats = writer.get_stats();
        assert!(stats.bytes_written > 0);
    }

    #[test]
    fn test_write_boundary_events_stream() {
        let mut writer = create_test_writer();
        let metadata = ExportMetadata::for_unsafe_ffi_analysis("high", "parallel");
        writer.write_unsafe_ffi_header(&metadata).unwrap();

        let boundary_data = ProcessedBoundaryData {
            total_crossings: 25,
            transfer_patterns: TransferPatterns {
                dominant_direction: "Rust -> FFI".to_string(),
                frequency_by_type: {
                    let mut map = std::collections::HashMap::new();
                    map.insert("safe_to_unsafe".to_string(), 15);
                    map.insert("unsafe_to_safe".to_string(), 10);
                    map
                },
                avg_transfer_size: 64,
                peak_activity_time: Some(1234567890),
            },
            risk_analysis: BoundaryRiskAnalysis {
                overall_risk_score: 7.5,
                high_risk_transfers: 5,
                common_risk_patterns: vec!["Unvalidated pointer transfer".to_string()],
                mitigation_recommendations: vec!["Add validation".to_string()],
            },
            events: vec![ProcessedBoundaryEvent {
                event_id: "boundary_1".to_string(),
                event_type: "safe_to_unsafe".to_string(),
                timestamp: 1234567890,
                from_context: crate::export::batch_processor::ContextInfo {
                    name: "Rust".to_string(),
                    function: "main".to_string(),
                    metadata: std::collections::HashMap::new(),
                },
                to_context: crate::export::batch_processor::ContextInfo {
                    name: "FFI".to_string(),
                    function: "malloc".to_string(),
                    metadata: std::collections::HashMap::new(),
                },
                memory_passport: None,
                risk_factors: vec!["raw_pointer".to_string()],
            }],
            performance_impact: BoundaryPerformanceImpact {
                total_processing_time_ms: 100,
                avg_crossing_time_ns: 5000.0,
                overhead_percentage: 2.0,
                optimization_opportunities: vec!["Reduce crossings".to_string()],
            },
        };

        let result = writer.write_boundary_events_stream(&boundary_data);
        assert!(result.is_ok());

        let stats = writer.get_stats();
        assert!(stats.bytes_written > 0);
    }

    #[test]
    fn test_write_safety_violations_stream() {
        let mut writer = create_test_writer();
        let metadata = ExportMetadata::for_unsafe_ffi_analysis("high", "parallel");
        writer.write_unsafe_ffi_header(&metadata).unwrap();

        #[derive(serde::Serialize)]
        struct TestViolation {
            id: u32,
            severity: String,
            description: String,
        }

        let violations = vec![
            TestViolation {
                id: 1,
                severity: "critical".to_string(),
                description: "Use after free".to_string(),
            },
            TestViolation {
                id: 2,
                severity: "high".to_string(),
                description: "Buffer overflow".to_string(),
            },
        ];

        let result = writer.write_safety_violations_stream(&violations);
        assert!(result.is_ok());

        let stats = writer.get_stats();
        assert!(stats.bytes_written > 0);
    }

    #[test]
    fn test_write_processing_metrics() {
        let mut writer = create_test_writer();
        let metadata = ExportMetadata::for_unsafe_ffi_analysis("high", "parallel");
        writer.write_unsafe_ffi_header(&metadata).unwrap();

        let metrics = BatchProcessingMetrics {
            total_items: 500,
            batch_count: 5,
            total_processing_time_ms: 1000,
            avg_batch_time_ms: 200.0,
            peak_memory_usage_bytes: 128 * 1024 * 1024,
            parallel_processing_used: true,
            threads_used: 4,
            throughput_items_per_sec: 500.0,
        };

        let result = writer.write_processing_metrics(&metrics);
        assert!(result.is_ok());

        let stats = writer.get_stats();
        assert!(stats.bytes_written > 0);
    }

    #[test]
    fn test_finalize() {
        let mut writer = create_test_writer();
        let metadata = ExportMetadata::for_unsafe_ffi_analysis("high", "parallel");
        writer.write_unsafe_ffi_header(&metadata).unwrap();

        let result = writer.finalize();
        assert!(result.is_ok());

        let stats = result.unwrap();
        assert!(stats.bytes_written > 0);
        assert!(stats.flush_count > 0);
    }

    #[test]
    fn test_finalize_twice() {
        let mut writer = create_test_writer();
        let metadata = ExportMetadata::for_unsafe_ffi_analysis("high", "parallel");
        writer.write_unsafe_ffi_header(&metadata).unwrap();

        // First finalize should succeed
        let result1 = writer.finalize();
        assert!(result1.is_ok());

        // Second finalize should return the same stats (idempotent)
        let result2 = writer.finalize();
        assert!(result2.is_ok());

        let stats1 = result1.unwrap();
        let stats2 = result2.unwrap();
        assert_eq!(stats1.bytes_written, stats2.bytes_written);
    }

    #[test]
    fn test_write_after_finalize() {
        let mut writer = create_test_writer();
        let metadata = ExportMetadata::for_unsafe_ffi_analysis("high", "parallel");
        writer.write_unsafe_ffi_header(&metadata).unwrap();
        writer.finalize().unwrap();

        // Writing after finalize should fail
        let result = writer.write_unsafe_ffi_header(&metadata);
        assert!(result.is_err());

        if let Err(TrackingError::InvalidOperation(msg)) = result {
            assert!(msg.contains("finalized"));
        } else {
            panic!("Expected InvalidOperation error");
        }
    }

    #[test]
    fn test_flush() {
        let mut writer = create_test_writer();
        let initial_flush_count = writer.get_stats().flush_count;

        let result = writer.flush();
        assert!(result.is_ok());

        let stats = writer.get_stats();
        assert_eq!(stats.flush_count, initial_flush_count + 1);
    }

    #[test]
    fn test_get_stats() {
        let writer = create_test_writer();
        let stats = writer.get_stats();

        assert_eq!(stats.bytes_written, 0);
        assert_eq!(stats.flush_count, 0);
        assert_eq!(stats.total_write_time_ms, 0);
        assert_eq!(stats.avg_write_speed_bps, 0.0);
        assert_eq!(stats.peak_memory_usage, 0);
        assert_eq!(stats.chunks_written, 0);
        assert_eq!(stats.compression_ratio, None);
    }

    #[test]
    fn test_memory_flush_threshold() {
        let config = StreamingWriterConfigBuilder::new()
            .max_memory_before_flush(100) // Very small threshold
            .build();
        let mut writer = create_test_writer_with_config(config);

        let metadata = ExportMetadata::for_unsafe_ffi_analysis("high", "parallel");
        let result = writer.write_unsafe_ffi_header(&metadata);
        assert!(result.is_ok());

        // Should have triggered flush due to small threshold
        let stats = writer.get_stats();
        assert!(stats.flush_count > 0);
    }

    #[test]
    fn test_array_chunking() {
        let config = StreamingWriterConfigBuilder::new()
            .array_chunk_size(2) // Small chunk size for testing
            .build();
        let mut writer = create_test_writer_with_config(config);

        let metadata = ExportMetadata::for_unsafe_ffi_analysis("high", "parallel");
        writer.write_unsafe_ffi_header(&metadata).unwrap();

        #[derive(serde::Serialize)]
        struct TestItem {
            id: u32,
            value: String,
        }

        let items = vec![
            TestItem {
                id: 1,
                value: "test1".to_string(),
            },
            TestItem {
                id: 2,
                value: "test2".to_string(),
            },
            TestItem {
                id: 3,
                value: "test3".to_string(),
            },
            TestItem {
                id: 4,
                value: "test4".to_string(),
            },
            TestItem {
                id: 5,
                value: "test5".to_string(),
            },
        ];

        let violations = items;
        let result = writer.write_safety_violations_stream(&violations);
        assert!(result.is_ok());

        let stats = writer.get_stats();
        assert!(stats.chunks_written > 1); // Should have multiple chunks
    }

    #[test]
    fn test_streaming_writer_config_default() {
        let config = StreamingWriterConfig::default();

        assert_eq!(config.buffer_size, 256 * 1024);
        assert!(!config.enable_compression);
        assert_eq!(config.compression_level, 6);
        assert!(!config.pretty_print);
        assert_eq!(config.max_memory_before_flush, 64 * 1024 * 1024);
        assert!(config.non_blocking);
        assert_eq!(config.array_chunk_size, 1000);
    }

    #[test]
    fn test_streaming_stats_serialization() {
        let stats = StreamingStats {
            bytes_written: 1024,
            flush_count: 5,
            total_write_time_ms: 100,
            avg_write_speed_bps: 10240.0,
            peak_memory_usage: 2048,
            chunks_written: 3,
            compression_ratio: Some(0.75),
        };

        let json = serde_json::to_string(&stats);
        assert!(json.is_ok());

        let deserialized: Result<StreamingStats, _> = serde_json::from_str(&json.unwrap());
        assert!(deserialized.is_ok());

        let deserialized_stats = deserialized.unwrap();
        assert_eq!(deserialized_stats.bytes_written, stats.bytes_written);
        assert_eq!(
            deserialized_stats.compression_ratio,
            stats.compression_ratio
        );
    }

    #[test]
    fn test_export_metadata_serialization() {
        let metadata = ExportMetadata::for_unsafe_ffi_analysis("high", "parallel");

        let json = serde_json::to_string(&metadata);
        assert!(json.is_ok());

        let deserialized: Result<ExportMetadata, _> = serde_json::from_str(&json.unwrap());
        assert!(deserialized.is_ok());

        let deserialized_metadata = deserialized.unwrap();
        assert_eq!(deserialized_metadata.analysis_type, metadata.analysis_type);
        assert_eq!(
            deserialized_metadata.schema_version,
            metadata.schema_version
        );
    }
}