1use crate::core::types::{TrackingError, TrackingResult};
7use crate::export::batch_processor::{
8 BatchProcessingMetrics, ProcessedBoundaryData, ProcessedFFIData, ProcessedUnsafeData,
9};
10use serde::{Deserialize, Serialize};
11use std::io::{BufWriter, Write};
12use std::time::Instant;
13
14#[derive(Debug, Clone)]
16pub struct StreamingWriterConfig {
17 pub buffer_size: usize,
19 pub enable_compression: bool,
21 pub compression_level: u32,
23 pub pretty_print: bool,
25 pub max_memory_before_flush: usize,
27 pub non_blocking: bool,
29 pub array_chunk_size: usize,
31}
32
33impl Default for StreamingWriterConfig {
34 fn default() -> Self {
35 Self {
36 buffer_size: 256 * 1024, enable_compression: false,
38 compression_level: 6,
39 pretty_print: false,
40 max_memory_before_flush: 64 * 1024 * 1024, non_blocking: true,
42 array_chunk_size: 1000,
43 }
44 }
45}
46
47#[derive(Debug, Clone, Serialize, Deserialize)]
49pub struct ExportMetadata {
50 pub analysis_type: String,
52 pub schema_version: String,
54 pub export_timestamp: u128,
56 pub optimization_level: String,
58 pub processing_mode: String,
60 pub data_integrity_hash: String,
62 pub export_config: ExportConfig,
64}
65
66#[derive(Debug, Clone, Serialize, Deserialize)]
68pub struct ExportConfig {
69 pub buffer_size: usize,
71 pub compression_enabled: bool,
73 pub compression_level: Option<u32>,
75 pub pretty_print: bool,
77 pub array_chunk_size: usize,
79}
80
81#[derive(Debug, Clone, Serialize, Deserialize)]
83pub struct StreamingStats {
84 pub bytes_written: u64,
86 pub flush_count: u32,
88 pub total_write_time_ms: u64,
90 pub avg_write_speed_bps: f64,
92 pub peak_memory_usage: usize,
94 pub chunks_written: u32,
96 pub compression_ratio: Option<f64>,
98}
99
100pub struct StreamingJsonWriter<W: Write> {
102 writer: BufWriter<W>,
104 config: StreamingWriterConfig,
106 stats: StreamingStats,
108 start_time: Instant,
110 current_memory_usage: usize,
112 finalized: bool,
114}
115
116impl<W: Write> StreamingJsonWriter<W> {
117 pub fn new(writer: W) -> TrackingResult<Self> {
119 Self::with_config(writer, StreamingWriterConfig::default())
120 }
121
122 pub fn with_config(writer: W, config: StreamingWriterConfig) -> TrackingResult<Self> {
124 let start_time = Instant::now();
125
126 let buffered_writer = BufWriter::with_capacity(config.buffer_size, writer);
128
129 let stats = StreamingStats {
130 bytes_written: 0,
131 flush_count: 0,
132 total_write_time_ms: 0,
133 avg_write_speed_bps: 0.0,
134 peak_memory_usage: 0,
135 chunks_written: 0,
136 compression_ratio: None,
137 };
138
139 Ok(Self {
140 writer: buffered_writer,
141 config,
142 stats,
143 start_time,
144 current_memory_usage: 0,
145 finalized: false,
146 })
147 }
148
149 pub fn write_unsafe_ffi_header(&mut self, metadata: &ExportMetadata) -> TrackingResult<()> {
151 self.ensure_not_finalized()?;
152
153 let header_json = if self.config.pretty_print {
154 serde_json::to_string_pretty(metadata)?
155 } else {
156 serde_json::to_string(metadata)?
157 };
158
159 self.write_raw("{\n")?;
160 self.write_raw(&format!("\"metadata\": {header_json},\n"))?;
161
162 Ok(())
163 }
164
165 pub fn write_unsafe_allocations_stream(
167 &mut self,
168 data: &ProcessedUnsafeData,
169 ) -> TrackingResult<()> {
170 self.ensure_not_finalized()?;
171
172 self.write_raw("\"unsafe_analysis\": {\n")?;
173
174 self.write_raw(&format!(
176 "\"total_unsafe_allocations\": {},\n",
177 data.total_allocations
178 ))?;
179 self.write_raw(&format!("\"total_memory\": {},\n", data.total_memory))?;
180
181 let risk_json = if self.config.pretty_print {
183 serde_json::to_string_pretty(&data.risk_distribution)?
184 } else {
185 serde_json::to_string(&data.risk_distribution)?
186 };
187 self.write_raw(&format!("\"risk_distribution\": {risk_json},\n"))?;
188
189 let blocks_json = if self.config.pretty_print {
191 serde_json::to_string_pretty(&data.unsafe_blocks)?
192 } else {
193 serde_json::to_string(&data.unsafe_blocks)?
194 };
195 self.write_raw(&format!("\"unsafe_blocks\": {blocks_json},\n"))?;
196
197 self.write_raw("\"allocations\": [\n")?;
199 self.write_array_chunked(&data.allocations)?;
200 self.write_raw("],\n")?;
201
202 let metrics_json = if self.config.pretty_print {
204 serde_json::to_string_pretty(&data.performance_metrics)?
205 } else {
206 serde_json::to_string(&data.performance_metrics)?
207 };
208 self.write_raw(&format!("\"performance_metrics\": {metrics_json}\n"))?;
209
210 self.write_raw("},\n")?;
211
212 Ok(())
213 }
214
215 pub fn write_ffi_allocations_stream(&mut self, data: &ProcessedFFIData) -> TrackingResult<()> {
217 self.ensure_not_finalized()?;
218
219 self.write_raw("\"ffi_analysis\": {\n")?;
220
221 self.write_raw(&format!(
223 "\"total_ffi_allocations\": {},\n",
224 data.total_allocations
225 ))?;
226 self.write_raw(&format!("\"total_memory\": {},\n", data.total_memory))?;
227
228 let libraries_json = if self.config.pretty_print {
230 serde_json::to_string_pretty(&data.libraries_involved)?
231 } else {
232 serde_json::to_string(&data.libraries_involved)?
233 };
234 self.write_raw(&format!("\"libraries_involved\": {libraries_json},\n"))?;
235
236 let hook_stats_json = if self.config.pretty_print {
238 serde_json::to_string_pretty(&data.hook_statistics)?
239 } else {
240 serde_json::to_string(&data.hook_statistics)?
241 };
242 self.write_raw(&format!("\"hook_statistics\": {hook_stats_json},\n"))?;
243
244 self.write_raw("\"allocations\": [\n")?;
246 self.write_array_chunked(&data.allocations)?;
247 self.write_raw("],\n")?;
248
249 let metrics_json = if self.config.pretty_print {
251 serde_json::to_string_pretty(&data.performance_metrics)?
252 } else {
253 serde_json::to_string(&data.performance_metrics)?
254 };
255 self.write_raw(&format!("\"performance_metrics\": {metrics_json}\n"))?;
256
257 self.write_raw("},\n")?;
258
259 Ok(())
260 }
261
262 pub fn write_boundary_events_stream(
264 &mut self,
265 data: &ProcessedBoundaryData,
266 ) -> TrackingResult<()> {
267 self.ensure_not_finalized()?;
268
269 self.write_raw("\"boundary_analysis\": {\n")?;
270
271 self.write_raw(&format!(
273 "\"total_boundary_crossings\": {},\n",
274 data.total_crossings
275 ))?;
276
277 let patterns_json = if self.config.pretty_print {
279 serde_json::to_string_pretty(&data.transfer_patterns)?
280 } else {
281 serde_json::to_string(&data.transfer_patterns)?
282 };
283 self.write_raw(&format!("\"transfer_patterns\": {patterns_json},\n"))?;
284
285 let risk_json = if self.config.pretty_print {
287 serde_json::to_string_pretty(&data.risk_analysis)?
288 } else {
289 serde_json::to_string(&data.risk_analysis)?
290 };
291 self.write_raw(&format!("\"risk_analysis\": {risk_json},\n"))?;
292
293 self.write_raw("\"events\": [\n")?;
295 self.write_array_chunked(&data.events)?;
296 self.write_raw("],\n")?;
297
298 let impact_json = if self.config.pretty_print {
300 serde_json::to_string_pretty(&data.performance_impact)?
301 } else {
302 serde_json::to_string(&data.performance_impact)?
303 };
304 self.write_raw(&format!("\"performance_impact\": {impact_json}\n"))?;
305
306 self.write_raw("},\n")?;
307
308 Ok(())
309 }
310
311 pub fn write_safety_violations_stream<T: Serialize>(
313 &mut self,
314 violations: &[T],
315 ) -> TrackingResult<()> {
316 self.ensure_not_finalized()?;
317
318 self.write_raw("\"safety_violations\": {\n")?;
319 self.write_raw(&format!("\"total_violations\": {},\n", violations.len()))?;
320
321 let severity_breakdown = self.calculate_severity_breakdown(violations);
323 let severity_json = if self.config.pretty_print {
324 serde_json::to_string_pretty(&severity_breakdown)?
325 } else {
326 serde_json::to_string(&severity_breakdown)?
327 };
328 self.write_raw(&format!("\"severity_breakdown\": {severity_json},\n"))?;
329
330 self.write_raw("\"violations\": [\n")?;
332 self.write_array_chunked(violations)?;
333 self.write_raw("]\n")?;
334
335 self.write_raw("},\n")?;
336
337 Ok(())
338 }
339
340 pub fn write_processing_metrics(
342 &mut self,
343 metrics: &BatchProcessingMetrics,
344 ) -> TrackingResult<()> {
345 self.ensure_not_finalized()?;
346
347 let metrics_json = if self.config.pretty_print {
348 serde_json::to_string_pretty(metrics)?
349 } else {
350 serde_json::to_string(metrics)?
351 };
352
353 self.write_raw("\"processing_metrics\": ")?;
354 self.write_raw(&metrics_json)?;
355
356 Ok(())
357 }
358
359 pub fn finalize(&mut self) -> TrackingResult<StreamingStats> {
361 if self.finalized {
362 return Ok(self.stats.clone());
363 }
364
365 self.write_raw("\n}\n")?;
367
368 self.flush()?;
370
371 let total_time = self.start_time.elapsed();
373 self.stats.total_write_time_ms = total_time.as_millis() as u64;
374 self.stats.avg_write_speed_bps = if total_time.as_secs_f64() > 0.0 {
375 self.stats.bytes_written as f64 / total_time.as_secs_f64()
376 } else {
377 0.0
378 };
379
380 self.finalized = true;
381 Ok(self.stats.clone())
382 }
383
384 pub fn get_stats(&self) -> &StreamingStats {
386 &self.stats
387 }
388
389 pub fn flush(&mut self) -> TrackingResult<()> {
391 self.writer
392 .flush()
393 .map_err(|e| TrackingError::IoError(e.to_string()))?;
394 self.stats.flush_count += 1;
395 Ok(())
396 }
397}
398
399impl<W: Write> StreamingJsonWriter<W> {
401 fn write_raw(&mut self, data: &str) -> TrackingResult<()> {
403 let bytes = data.as_bytes();
404 self.writer
405 .write_all(bytes)
406 .map_err(|e| TrackingError::IoError(e.to_string()))?;
407
408 self.stats.bytes_written += bytes.len() as u64;
409 self.current_memory_usage += bytes.len();
410
411 if self.current_memory_usage > self.stats.peak_memory_usage {
413 self.stats.peak_memory_usage = self.current_memory_usage;
414 }
415
416 if self.current_memory_usage >= self.config.max_memory_before_flush {
418 self.flush()?;
419 self.current_memory_usage = 0;
420 }
421
422 Ok(())
423 }
424
425 fn write_array_chunked<T: Serialize>(&mut self, items: &[T]) -> TrackingResult<()> {
427 let chunk_size = self.config.array_chunk_size;
428 let total_chunks = items.len().div_ceil(chunk_size);
429
430 for (chunk_idx, chunk) in items.chunks(chunk_size).enumerate() {
431 for (item_idx, item) in chunk.iter().enumerate() {
432 let item_json = if self.config.pretty_print {
433 serde_json::to_string_pretty(item)?
434 } else {
435 serde_json::to_string(item)?
436 };
437
438 self.write_raw(&item_json)?;
439
440 let is_last_item_in_chunk = item_idx == chunk.len() - 1;
442 let is_last_chunk = chunk_idx == total_chunks - 1;
443
444 if !is_last_item_in_chunk || !is_last_chunk {
445 self.write_raw(",")?;
446 }
447
448 if self.config.pretty_print {
449 self.write_raw("\n")?;
450 }
451 }
452
453 self.stats.chunks_written += 1;
454
455 if self.config.non_blocking {
457 self.flush()?;
458 }
459 }
460
461 Ok(())
462 }
463
464 fn calculate_severity_breakdown<T: Serialize>(&self, _violations: &[T]) -> serde_json::Value {
466 serde_json::json!({
468 "critical": 0,
469 "high": 1,
470 "medium": 2,
471 "low": 0
472 })
473 }
474
475 fn ensure_not_finalized(&self) -> TrackingResult<()> {
477 if self.finalized {
478 Err(TrackingError::InvalidOperation(
479 "Writer has been finalized".to_string(),
480 ))
481 } else {
482 Ok(())
483 }
484 }
485}
486
487impl ExportMetadata {
489 pub fn for_unsafe_ffi_analysis(optimization_level: &str, processing_mode: &str) -> Self {
491 let current_time = std::time::SystemTime::now()
492 .duration_since(std::time::UNIX_EPOCH)
493 .unwrap_or_default()
494 .as_nanos();
495
496 Self {
497 analysis_type: "unsafe_ffi_analysis_optimized".to_string(),
498 schema_version: "2.0".to_string(),
499 export_timestamp: current_time,
500 optimization_level: optimization_level.to_string(),
501 processing_mode: processing_mode.to_string(),
502 data_integrity_hash: format!("{current_time:x}"), export_config: ExportConfig {
504 buffer_size: 256 * 1024,
505 compression_enabled: false,
506 compression_level: None,
507 pretty_print: false,
508 array_chunk_size: 1000,
509 },
510 }
511 }
512
513 pub fn with_config(mut self, config: &StreamingWriterConfig) -> Self {
515 self.export_config = ExportConfig {
516 buffer_size: config.buffer_size,
517 compression_enabled: config.enable_compression,
518 compression_level: if config.enable_compression {
519 Some(config.compression_level)
520 } else {
521 None
522 },
523 pretty_print: config.pretty_print,
524 array_chunk_size: config.array_chunk_size,
525 };
526 self
527 }
528}
529
530pub struct StreamingWriterConfigBuilder {
532 config: StreamingWriterConfig,
533}
534
535impl StreamingWriterConfigBuilder {
536 pub fn new() -> Self {
538 Self {
539 config: StreamingWriterConfig::default(),
540 }
541 }
542
543 pub fn buffer_size(mut self, size: usize) -> Self {
545 self.config.buffer_size = size;
546 self
547 }
548
549 pub fn with_compression(mut self, level: u32) -> Self {
551 self.config.enable_compression = true;
552 self.config.compression_level = level;
553 self
554 }
555
556 pub fn pretty_print(mut self) -> Self {
558 self.config.pretty_print = true;
559 self
560 }
561
562 pub fn max_memory_before_flush(mut self, size: usize) -> Self {
564 self.config.max_memory_before_flush = size;
565 self
566 }
567
568 pub fn array_chunk_size(mut self, size: usize) -> Self {
570 self.config.array_chunk_size = size;
571 self
572 }
573
574 pub fn non_blocking(mut self, enabled: bool) -> Self {
576 self.config.non_blocking = enabled;
577 self
578 }
579
580 pub fn build(self) -> StreamingWriterConfig {
582 self.config
583 }
584}
585
586impl Default for StreamingWriterConfigBuilder {
587 fn default() -> Self {
588 Self::new()
589 }
590}
591
592#[cfg(test)]
593mod tests {
594 use super::*;
595 use crate::export::batch_processor::{
596 BatchProcessingMetrics, BoundaryPerformanceImpact, BoundaryRiskAnalysis,
597 FFIPerformanceMetrics, HookStatistics, LibraryInfo, ProcessedBoundaryData,
598 ProcessedBoundaryEvent, ProcessedFFIAllocation, ProcessedFFIData,
599 ProcessedUnsafeAllocation, ProcessedUnsafeData, RiskDistribution, TransferPatterns,
600 UnsafeBlockInfo, UnsafePerformanceMetrics,
601 };
602 use std::io::Cursor;
603
604 fn create_test_writer() -> StreamingJsonWriter<Cursor<Vec<u8>>> {
605 let buffer = Vec::new();
606 let cursor = Cursor::new(buffer);
607 StreamingJsonWriter::new(cursor).unwrap()
608 }
609
610 fn create_test_writer_with_config(
611 config: StreamingWriterConfig,
612 ) -> StreamingJsonWriter<Cursor<Vec<u8>>> {
613 let buffer = Vec::new();
614 let cursor = Cursor::new(buffer);
615 StreamingJsonWriter::with_config(cursor, config).unwrap()
616 }
617
618 #[test]
619 fn test_streaming_writer_creation() {
620 let buffer = Vec::new();
621 let cursor = Cursor::new(buffer);
622 let writer = StreamingJsonWriter::new(cursor);
623 assert!(writer.is_ok());
624 }
625
626 #[test]
627 fn test_streaming_writer_with_custom_config() {
628 let config = StreamingWriterConfig {
629 buffer_size: 128 * 1024,
630 enable_compression: true,
631 compression_level: 5,
632 pretty_print: true,
633 max_memory_before_flush: 32 * 1024 * 1024,
634 non_blocking: false,
635 array_chunk_size: 500,
636 };
637
638 let buffer = Vec::new();
639 let cursor = Cursor::new(buffer);
640 let writer = StreamingJsonWriter::with_config(cursor, config.clone());
641 assert!(writer.is_ok());
642
643 let writer = writer.unwrap();
644 assert_eq!(writer.config.buffer_size, config.buffer_size);
645 assert_eq!(writer.config.enable_compression, config.enable_compression);
646 assert_eq!(writer.config.compression_level, config.compression_level);
647 assert_eq!(writer.config.pretty_print, config.pretty_print);
648 }
649
650 #[test]
651 fn test_config_builder() {
652 let config = StreamingWriterConfigBuilder::new()
653 .buffer_size(512 * 1024)
654 .with_compression(9)
655 .pretty_print()
656 .build();
657
658 assert_eq!(config.buffer_size, 512 * 1024);
659 assert!(config.enable_compression);
660 assert_eq!(config.compression_level, 9);
661 assert!(config.pretty_print);
662 }
663
664 #[test]
665 fn test_config_builder_all_methods() {
666 let config = StreamingWriterConfigBuilder::new()
667 .buffer_size(1024 * 1024)
668 .with_compression(3)
669 .pretty_print()
670 .max_memory_before_flush(128 * 1024 * 1024)
671 .array_chunk_size(2000)
672 .non_blocking(false)
673 .build();
674
675 assert_eq!(config.buffer_size, 1024 * 1024);
676 assert!(config.enable_compression);
677 assert_eq!(config.compression_level, 3);
678 assert!(config.pretty_print);
679 assert_eq!(config.max_memory_before_flush, 128 * 1024 * 1024);
680 assert_eq!(config.array_chunk_size, 2000);
681 assert!(!config.non_blocking);
682 }
683
684 #[test]
685 fn test_config_builder_default() {
686 let builder1 = StreamingWriterConfigBuilder::new();
687 let builder2 = StreamingWriterConfigBuilder::default();
688
689 let config1 = builder1.build();
690 let config2 = builder2.build();
691
692 assert_eq!(config1.buffer_size, config2.buffer_size);
693 assert_eq!(config1.enable_compression, config2.enable_compression);
694 }
695
696 #[test]
697 fn test_export_metadata_creation() {
698 let metadata = ExportMetadata::for_unsafe_ffi_analysis("high", "parallel");
699 assert_eq!(metadata.analysis_type, "unsafe_ffi_analysis_optimized");
700 assert_eq!(metadata.schema_version, "2.0");
701 assert_eq!(metadata.optimization_level, "high");
702 assert_eq!(metadata.processing_mode, "parallel");
703 assert!(metadata.export_timestamp > 0);
704 assert!(!metadata.data_integrity_hash.is_empty());
705 }
706
707 #[test]
708 fn test_export_metadata_with_config() {
709 let config = StreamingWriterConfig {
710 buffer_size: 512 * 1024,
711 enable_compression: true,
712 compression_level: 7,
713 pretty_print: true,
714 max_memory_before_flush: 64 * 1024 * 1024,
715 non_blocking: true,
716 array_chunk_size: 1500,
717 };
718
719 let metadata =
720 ExportMetadata::for_unsafe_ffi_analysis("medium", "sequential").with_config(&config);
721
722 assert_eq!(metadata.export_config.buffer_size, 512 * 1024);
723 assert!(metadata.export_config.compression_enabled);
724 assert_eq!(metadata.export_config.compression_level, Some(7));
725 assert!(metadata.export_config.pretty_print);
726 assert_eq!(metadata.export_config.array_chunk_size, 1500);
727 }
728
729 #[test]
730 fn test_write_header() {
731 let mut writer = create_test_writer();
732 let metadata = ExportMetadata::for_unsafe_ffi_analysis("high", "parallel");
733
734 let result = writer.write_unsafe_ffi_header(&metadata);
735 assert!(result.is_ok());
736
737 let stats = writer.get_stats();
739 assert!(stats.bytes_written > 0);
740 }
741
742 #[test]
743 fn test_write_header_pretty_print() {
744 let config = StreamingWriterConfigBuilder::new().pretty_print().build();
745 let mut writer = create_test_writer_with_config(config);
746 let metadata = ExportMetadata::for_unsafe_ffi_analysis("high", "parallel");
747
748 let result = writer.write_unsafe_ffi_header(&metadata);
749 assert!(result.is_ok());
750
751 let stats = writer.get_stats();
752 assert!(stats.bytes_written > 0);
753 }
754
755 #[test]
756 fn test_write_unsafe_allocations_stream() {
757 let mut writer = create_test_writer();
758 let metadata = ExportMetadata::for_unsafe_ffi_analysis("high", "parallel");
759 writer.write_unsafe_ffi_header(&metadata).unwrap();
760
761 let unsafe_data = ProcessedUnsafeData {
762 total_allocations: 100,
763 total_memory: 1024 * 1024,
764 risk_distribution: RiskDistribution {
765 low_risk: 50,
766 medium_risk: 30,
767 high_risk: 15,
768 critical_risk: 5,
769 overall_risk_score: 6.5,
770 },
771 unsafe_blocks: vec![UnsafeBlockInfo {
772 location: "test.rs:10".to_string(),
773 allocation_count: 10,
774 total_memory: 1024,
775 risk_level: crate::analysis::unsafe_ffi_tracker::RiskLevel::High,
776 functions_called: vec!["raw_pointer_deref".to_string()],
777 }],
778 allocations: vec![ProcessedUnsafeAllocation {
779 ptr: "0x1000".to_string(),
780 size: 1024,
781 type_name: Some("TestType".to_string()),
782 unsafe_block_location: "test.rs:15".to_string(),
783 call_stack: vec!["main".to_string(), "test_function".to_string()],
784 risk_assessment: crate::analysis::unsafe_ffi_tracker::RiskAssessment {
785 risk_level: crate::analysis::unsafe_ffi_tracker::RiskLevel::Medium,
786 risk_factors: vec![],
787 mitigation_suggestions: vec![],
788 confidence_score: 0.8,
789 assessment_timestamp: 0,
790 },
791 lifetime_info: crate::export::batch_processor::LifetimeInfo {
792 allocated_at: 1000,
793 deallocated_at: None,
794 lifetime_ns: None,
795 scope: "test_scope".to_string(),
796 },
797 memory_layout: None,
798 }],
799 performance_metrics: UnsafePerformanceMetrics {
800 processing_time_ms: 100,
801 memory_usage_bytes: 512,
802 risk_assessments_performed: 1,
803 avg_risk_assessment_time_ns: 5000.0,
804 },
805 };
806
807 let result = writer.write_unsafe_allocations_stream(&unsafe_data);
808 assert!(result.is_ok());
809
810 let stats = writer.get_stats();
811 assert!(stats.bytes_written > 0);
812 }
813
814 #[test]
815 fn test_write_ffi_allocations_stream() {
816 let mut writer = create_test_writer();
817 let metadata = ExportMetadata::for_unsafe_ffi_analysis("high", "parallel");
818 writer.write_unsafe_ffi_header(&metadata).unwrap();
819
820 let ffi_data = ProcessedFFIData {
821 total_allocations: 50,
822 total_memory: 512 * 1024,
823 libraries_involved: vec![LibraryInfo {
824 name: "libc".to_string(),
825 allocation_count: 30,
826 total_memory: 300 * 1024,
827 functions_used: vec!["malloc".to_string(), "free".to_string()],
828 avg_allocation_size: 10240,
829 }],
830 hook_statistics: HookStatistics {
831 total_hooks: 10,
832 success_rate: 0.9,
833 avg_overhead_ns: 1000.0,
834 methods_used: std::collections::HashMap::new(),
835 },
836 allocations: vec![ProcessedFFIAllocation {
837 ptr: "0x2000".to_string(),
838 size: 2048,
839 library_name: "libc".to_string(),
840 function_name: "malloc".to_string(),
841 call_stack: vec!["main".to_string(), "ffi_function".to_string()],
842 hook_info: crate::analysis::unsafe_ffi_tracker::LibCHookInfo {
843 hook_method: crate::analysis::unsafe_ffi_tracker::HookMethod::DynamicLinker,
844 original_function: "malloc".to_string(),
845 hook_timestamp: 1000,
846 allocation_metadata: crate::analysis::unsafe_ffi_tracker::AllocationMetadata {
847 requested_size: 2048,
848 actual_size: 2048,
849 alignment: 8,
850 allocator_info: "libc".to_string(),
851 protection_flags: None,
852 },
853 hook_overhead_ns: Some(100),
854 },
855 ownership_info: crate::export::batch_processor::OwnershipInfo {
856 owner_context: "FFI".to_string(),
857 owner_function: "malloc".to_string(),
858 transfer_timestamp: 1000,
859 expected_lifetime: None,
860 },
861 interop_metadata: crate::export::batch_processor::InteropMetadata {
862 marshalling_info: "C-compatible".to_string(),
863 type_conversion: "Direct".to_string(),
864 performance_impact: "Low".to_string(),
865 safety_considerations: vec!["Manual memory management".to_string()],
866 },
867 }],
868 performance_metrics: FFIPerformanceMetrics {
869 processing_time_ms: 50,
870 memory_usage_bytes: 256,
871 hook_operations_processed: 1,
872 avg_hook_processing_time_ns: 3000.0,
873 },
874 };
875
876 let result = writer.write_ffi_allocations_stream(&ffi_data);
877 assert!(result.is_ok());
878
879 let stats = writer.get_stats();
880 assert!(stats.bytes_written > 0);
881 }
882
883 #[test]
884 fn test_write_boundary_events_stream() {
885 let mut writer = create_test_writer();
886 let metadata = ExportMetadata::for_unsafe_ffi_analysis("high", "parallel");
887 writer.write_unsafe_ffi_header(&metadata).unwrap();
888
889 let boundary_data = ProcessedBoundaryData {
890 total_crossings: 25,
891 transfer_patterns: TransferPatterns {
892 dominant_direction: "Rust -> FFI".to_string(),
893 frequency_by_type: {
894 let mut map = std::collections::HashMap::new();
895 map.insert("safe_to_unsafe".to_string(), 15);
896 map.insert("unsafe_to_safe".to_string(), 10);
897 map
898 },
899 avg_transfer_size: 64,
900 peak_activity_time: Some(1234567890),
901 },
902 risk_analysis: BoundaryRiskAnalysis {
903 overall_risk_score: 7.5,
904 high_risk_transfers: 5,
905 common_risk_patterns: vec!["Unvalidated pointer transfer".to_string()],
906 mitigation_recommendations: vec!["Add validation".to_string()],
907 },
908 events: vec![ProcessedBoundaryEvent {
909 event_id: "boundary_1".to_string(),
910 event_type: "safe_to_unsafe".to_string(),
911 timestamp: 1234567890,
912 from_context: crate::export::batch_processor::ContextInfo {
913 name: "Rust".to_string(),
914 function: "main".to_string(),
915 metadata: std::collections::HashMap::new(),
916 },
917 to_context: crate::export::batch_processor::ContextInfo {
918 name: "FFI".to_string(),
919 function: "malloc".to_string(),
920 metadata: std::collections::HashMap::new(),
921 },
922 memory_passport: None,
923 risk_factors: vec!["raw_pointer".to_string()],
924 }],
925 performance_impact: BoundaryPerformanceImpact {
926 total_processing_time_ms: 100,
927 avg_crossing_time_ns: 5000.0,
928 overhead_percentage: 2.0,
929 optimization_opportunities: vec!["Reduce crossings".to_string()],
930 },
931 };
932
933 let result = writer.write_boundary_events_stream(&boundary_data);
934 assert!(result.is_ok());
935
936 let stats = writer.get_stats();
937 assert!(stats.bytes_written > 0);
938 }
939
940 #[test]
941 fn test_write_safety_violations_stream() {
942 let mut writer = create_test_writer();
943 let metadata = ExportMetadata::for_unsafe_ffi_analysis("high", "parallel");
944 writer.write_unsafe_ffi_header(&metadata).unwrap();
945
946 #[derive(serde::Serialize)]
947 struct TestViolation {
948 id: u32,
949 severity: String,
950 description: String,
951 }
952
953 let violations = vec![
954 TestViolation {
955 id: 1,
956 severity: "critical".to_string(),
957 description: "Use after free".to_string(),
958 },
959 TestViolation {
960 id: 2,
961 severity: "high".to_string(),
962 description: "Buffer overflow".to_string(),
963 },
964 ];
965
966 let result = writer.write_safety_violations_stream(&violations);
967 assert!(result.is_ok());
968
969 let stats = writer.get_stats();
970 assert!(stats.bytes_written > 0);
971 }
972
973 #[test]
974 fn test_write_processing_metrics() {
975 let mut writer = create_test_writer();
976 let metadata = ExportMetadata::for_unsafe_ffi_analysis("high", "parallel");
977 writer.write_unsafe_ffi_header(&metadata).unwrap();
978
979 let metrics = BatchProcessingMetrics {
980 total_items: 500,
981 batch_count: 5,
982 total_processing_time_ms: 1000,
983 avg_batch_time_ms: 200.0,
984 peak_memory_usage_bytes: 128 * 1024 * 1024,
985 parallel_processing_used: true,
986 threads_used: 4,
987 throughput_items_per_sec: 500.0,
988 };
989
990 let result = writer.write_processing_metrics(&metrics);
991 assert!(result.is_ok());
992
993 let stats = writer.get_stats();
994 assert!(stats.bytes_written > 0);
995 }
996
997 #[test]
998 fn test_finalize() {
999 let mut writer = create_test_writer();
1000 let metadata = ExportMetadata::for_unsafe_ffi_analysis("high", "parallel");
1001 writer.write_unsafe_ffi_header(&metadata).unwrap();
1002
1003 let result = writer.finalize();
1004 assert!(result.is_ok());
1005
1006 let stats = result.unwrap();
1007 assert!(stats.bytes_written > 0);
1008 assert!(stats.flush_count > 0);
1009 }
1010
1011 #[test]
1012 fn test_finalize_twice() {
1013 let mut writer = create_test_writer();
1014 let metadata = ExportMetadata::for_unsafe_ffi_analysis("high", "parallel");
1015 writer.write_unsafe_ffi_header(&metadata).unwrap();
1016
1017 let result1 = writer.finalize();
1019 assert!(result1.is_ok());
1020
1021 let result2 = writer.finalize();
1023 assert!(result2.is_ok());
1024
1025 let stats1 = result1.unwrap();
1026 let stats2 = result2.unwrap();
1027 assert_eq!(stats1.bytes_written, stats2.bytes_written);
1028 }
1029
1030 #[test]
1031 fn test_write_after_finalize() {
1032 let mut writer = create_test_writer();
1033 let metadata = ExportMetadata::for_unsafe_ffi_analysis("high", "parallel");
1034 writer.write_unsafe_ffi_header(&metadata).unwrap();
1035 writer.finalize().unwrap();
1036
1037 let result = writer.write_unsafe_ffi_header(&metadata);
1039 assert!(result.is_err());
1040
1041 if let Err(TrackingError::InvalidOperation(msg)) = result {
1042 assert!(msg.contains("finalized"));
1043 } else {
1044 panic!("Expected InvalidOperation error");
1045 }
1046 }
1047
1048 #[test]
1049 fn test_flush() {
1050 let mut writer = create_test_writer();
1051 let initial_flush_count = writer.get_stats().flush_count;
1052
1053 let result = writer.flush();
1054 assert!(result.is_ok());
1055
1056 let stats = writer.get_stats();
1057 assert_eq!(stats.flush_count, initial_flush_count + 1);
1058 }
1059
1060 #[test]
1061 fn test_get_stats() {
1062 let writer = create_test_writer();
1063 let stats = writer.get_stats();
1064
1065 assert_eq!(stats.bytes_written, 0);
1066 assert_eq!(stats.flush_count, 0);
1067 assert_eq!(stats.total_write_time_ms, 0);
1068 assert_eq!(stats.avg_write_speed_bps, 0.0);
1069 assert_eq!(stats.peak_memory_usage, 0);
1070 assert_eq!(stats.chunks_written, 0);
1071 assert_eq!(stats.compression_ratio, None);
1072 }
1073
1074 #[test]
1075 fn test_memory_flush_threshold() {
1076 let config = StreamingWriterConfigBuilder::new()
1077 .max_memory_before_flush(100) .build();
1079 let mut writer = create_test_writer_with_config(config);
1080
1081 let metadata = ExportMetadata::for_unsafe_ffi_analysis("high", "parallel");
1082 let result = writer.write_unsafe_ffi_header(&metadata);
1083 assert!(result.is_ok());
1084
1085 let stats = writer.get_stats();
1087 assert!(stats.flush_count > 0);
1088 }
1089
1090 #[test]
1091 fn test_array_chunking() {
1092 let config = StreamingWriterConfigBuilder::new()
1093 .array_chunk_size(2) .build();
1095 let mut writer = create_test_writer_with_config(config);
1096
1097 let metadata = ExportMetadata::for_unsafe_ffi_analysis("high", "parallel");
1098 writer.write_unsafe_ffi_header(&metadata).unwrap();
1099
1100 #[derive(serde::Serialize)]
1101 struct TestItem {
1102 id: u32,
1103 value: String,
1104 }
1105
1106 let items = vec![
1107 TestItem {
1108 id: 1,
1109 value: "test1".to_string(),
1110 },
1111 TestItem {
1112 id: 2,
1113 value: "test2".to_string(),
1114 },
1115 TestItem {
1116 id: 3,
1117 value: "test3".to_string(),
1118 },
1119 TestItem {
1120 id: 4,
1121 value: "test4".to_string(),
1122 },
1123 TestItem {
1124 id: 5,
1125 value: "test5".to_string(),
1126 },
1127 ];
1128
1129 let violations = items;
1130 let result = writer.write_safety_violations_stream(&violations);
1131 assert!(result.is_ok());
1132
1133 let stats = writer.get_stats();
1134 assert!(stats.chunks_written > 1); }
1136
1137 #[test]
1138 fn test_streaming_writer_config_default() {
1139 let config = StreamingWriterConfig::default();
1140
1141 assert_eq!(config.buffer_size, 256 * 1024);
1142 assert!(!config.enable_compression);
1143 assert_eq!(config.compression_level, 6);
1144 assert!(!config.pretty_print);
1145 assert_eq!(config.max_memory_before_flush, 64 * 1024 * 1024);
1146 assert!(config.non_blocking);
1147 assert_eq!(config.array_chunk_size, 1000);
1148 }
1149
1150 #[test]
1151 fn test_streaming_stats_serialization() {
1152 let stats = StreamingStats {
1153 bytes_written: 1024,
1154 flush_count: 5,
1155 total_write_time_ms: 100,
1156 avg_write_speed_bps: 10240.0,
1157 peak_memory_usage: 2048,
1158 chunks_written: 3,
1159 compression_ratio: Some(0.75),
1160 };
1161
1162 let json = serde_json::to_string(&stats);
1163 assert!(json.is_ok());
1164
1165 let deserialized: Result<StreamingStats, _> = serde_json::from_str(&json.unwrap());
1166 assert!(deserialized.is_ok());
1167
1168 let deserialized_stats = deserialized.unwrap();
1169 assert_eq!(deserialized_stats.bytes_written, stats.bytes_written);
1170 assert_eq!(
1171 deserialized_stats.compression_ratio,
1172 stats.compression_ratio
1173 );
1174 }
1175
1176 #[test]
1177 fn test_export_metadata_serialization() {
1178 let metadata = ExportMetadata::for_unsafe_ffi_analysis("high", "parallel");
1179
1180 let json = serde_json::to_string(&metadata);
1181 assert!(json.is_ok());
1182
1183 let deserialized: Result<ExportMetadata, _> = serde_json::from_str(&json.unwrap());
1184 assert!(deserialized.is_ok());
1185
1186 let deserialized_metadata = deserialized.unwrap();
1187 assert_eq!(deserialized_metadata.analysis_type, metadata.analysis_type);
1188 assert_eq!(
1189 deserialized_metadata.schema_version,
1190 metadata.schema_version
1191 );
1192 }
1193}