memscope_rs/export/
high_speed_buffered_writer.rs

1//! High-speed buffered writer - high performance file writing with reduced I/O overhead
2//!
3//! This module implements high-speed buffered writing functionality, using large buffers to reduce system call frequency,
4//! and provides efficient shard merging logic, significantly improving file writing performance.
5
6use crate::core::types::{TrackingError, TrackingResult};
7use crate::export::parallel_shard_processor::ProcessedShard;
8use std::fs::File;
9use std::io::{BufWriter, Write};
10use std::path::Path;
11use std::time::Instant;
12
/// Configuration for [`HighSpeedBufferedWriter`].
///
/// Build one with struct-update syntax on top of [`Default::default`], e.g.
/// `HighSpeedWriterConfig { auto_flush: false, ..Default::default() }`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct HighSpeedWriterConfig {
    /// Capacity, in bytes, of the `BufWriter` that wraps the output file.
    pub buffer_size: usize,
    /// When `true`, the writer logs progress and statistics through `tracing::info!`.
    pub enable_monitoring: bool,
    /// Expected size of the output in bytes. It is used as the initial capacity of
    /// the writer's staging buffer; `None` falls back to 1 MiB.
    pub estimated_total_size: Option<usize>,
    /// Whether to use compression when writing.
    ///
    /// NOTE(review): nothing visible in this module reads this flag, so it currently
    /// appears to have no effect here — confirm before relying on it.
    pub enable_compression: bool,
    /// When `true`, every write call flushes the file buffer before returning.
    pub auto_flush: bool,
}

impl Default for HighSpeedWriterConfig {
    /// Returns the stock configuration: 2 MiB file buffer, monitoring on, no size
    /// estimate, compression off and automatic flushing on.
    fn default() -> Self {
        Self {
            buffer_size: 2 * 1024 * 1024, // 2 MiB
            enable_monitoring: true,
            estimated_total_size: None,
            enable_compression: false,
            auto_flush: true,
        }
    }
}
39
/// Write performance statistics reported by the high-speed writer.
///
/// `Default` yields an all-zero snapshot (every counter `0`, every ratio `0.0`,
/// `preallocation_effective` set to `false`).
#[derive(Debug, Clone, Default, PartialEq)]
pub struct WritePerformanceStats {
    /// Total bytes written
    pub total_bytes_written: usize,
    /// Number of shards written
    pub shards_written: usize,
    /// Total write time (milliseconds)
    pub total_write_time_ms: u64,
    /// Average write speed (bytes per second); `0.0` when no time was measured
    pub avg_write_speed_bps: f64,
    /// Buffer flush count
    pub flush_count: usize,
    /// Whether the pre-allocated buffer was already large enough for the payload
    pub preallocation_effective: bool,
    /// Fraction of the buffer's capacity that was actually used (`0.0..=1.0`)
    pub buffer_utilization: f64,
}
58
59/// High-speed buffered writer
60pub struct HighSpeedBufferedWriter {
61    /// Internal buffer writer
62    writer: BufWriter<File>,
63    /// Configuration
64    config: HighSpeedWriterConfig,
65    /// Internal buffer
66    internal_buffer: Vec<u8>,
67    /// Write performance statistics
68    stats: WritePerformanceStats,
69    /// Start time
70    start_time: Instant,
71    /// Flush count
72    flush_count: usize,
73}
74
75impl HighSpeedBufferedWriter {
76    /// Create a new high-speed buffered writer
77    pub fn new<P: AsRef<Path>>(path: P, config: HighSpeedWriterConfig) -> TrackingResult<Self> {
78        let file = File::create(path.as_ref())
79            .map_err(|e| TrackingError::IoError(format!("create file failed: {e}")))?;
80
81        let writer = BufWriter::with_capacity(config.buffer_size, file);
82
83        // Pre-allocate internal buffer
84        let initial_capacity = config.estimated_total_size.unwrap_or(1024 * 1024); // Default 1MB
85        let internal_buffer = Vec::with_capacity(initial_capacity);
86
87        let stats = WritePerformanceStats {
88            total_bytes_written: 0,
89            shards_written: 0,
90            total_write_time_ms: 0,
91            avg_write_speed_bps: 0.0,
92            flush_count: 0,
93            preallocation_effective: false,
94            buffer_utilization: 0.0,
95        };
96
97        Ok(Self {
98            writer,
99            config,
100            internal_buffer,
101            stats,
102            start_time: Instant::now(),
103            flush_count: 0,
104        })
105    }
106
107    /// Write processed shards
108    pub fn write_processed_shards(
109        &mut self,
110        shards: &[ProcessedShard],
111    ) -> TrackingResult<WritePerformanceStats> {
112        let write_start = Instant::now();
113
114        if self.config.enable_monitoring {
115            tracing::info!(
116                "🔄 Starting high-speed buffered write for {} shards...",
117                shards.len()
118            );
119        }
120
121        // Pre-calculate total size and pre-allocate buffer
122        let total_size: usize = shards.iter().map(|s| s.data.len()).sum();
123        let estimated_final_size = total_size + 1024; // Extra space for JSON structure
124
125        // Check if pre-allocation was effective
126        self.stats.preallocation_effective =
127            self.internal_buffer.capacity() >= estimated_final_size;
128
129        if !self.stats.preallocation_effective {
130            self.internal_buffer.reserve(estimated_final_size);
131        }
132
133        // Build complete JSON structure
134        self.build_complete_json(shards)?;
135
136        // Write to file
137        self.writer
138            .write_all(&self.internal_buffer)
139            .map_err(|e| TrackingError::IoError(format!("write file failed: {e}")))?;
140
141        // flush cache
142        if self.config.auto_flush {
143            self.flush()?;
144        }
145
146        let write_time = write_start.elapsed();
147
148        // update data
149        self.stats.total_bytes_written = self.internal_buffer.len();
150        self.stats.shards_written = shards.len();
151        self.stats.total_write_time_ms = write_time.as_millis() as u64;
152        self.stats.avg_write_speed_bps = if write_time.as_secs_f64() > 0.0 {
153            self.stats.total_bytes_written as f64 / write_time.as_secs_f64()
154        } else {
155            0.0
156        };
157        self.stats.buffer_utilization = if self.internal_buffer.capacity() > 0 {
158            self.internal_buffer.len() as f64 / self.internal_buffer.capacity() as f64
159        } else {
160            0.0
161        };
162
163        if self.config.enable_monitoring {
164            self.print_write_stats();
165        }
166
167        Ok(self.stats.clone())
168    }
169
170    /// Build complete JSON structure
171    fn build_complete_json(&mut self, shards: &[ProcessedShard]) -> TrackingResult<()> {
172        // Clear internal buffer
173        self.internal_buffer.clear();
174
175        // Write JSON start
176        self.internal_buffer
177            .extend_from_slice(b"{\"allocations\":[");
178
179        // Merge all shards, add comma only between shards
180        for (i, shard) in shards.iter().enumerate() {
181            if i > 0 {
182                self.internal_buffer.extend_from_slice(b",");
183            }
184
185            // Remove shard's [ and ], only keep content
186            let shard_content = if shard.data.starts_with(b"[") && shard.data.ends_with(b"]") {
187                &shard.data[1..shard.data.len() - 1]
188            } else {
189                &shard.data
190            };
191
192            self.internal_buffer.extend_from_slice(shard_content);
193        }
194
195        // Write JSON end
196        self.internal_buffer.extend_from_slice(b"]}");
197
198        Ok(())
199    }
200
201    /// Write custom JSON data
202    pub fn write_custom_json(&mut self, json_data: &[u8]) -> TrackingResult<WritePerformanceStats> {
203        let write_start = Instant::now();
204
205        if self.config.enable_monitoring {
206            tracing::info!(
207                "🔄 Starting custom JSON data write ({} bytes)...",
208                json_data.len()
209            );
210        }
211
212        // Pre-allocate buffer
213        if self.internal_buffer.capacity() < json_data.len() {
214            self.internal_buffer.reserve(json_data.len());
215        }
216
217        // Copy data to internal buffer
218        self.internal_buffer.clear();
219        self.internal_buffer.extend_from_slice(json_data);
220
221        // Write to file
222        self.writer
223            .write_all(&self.internal_buffer)
224            .map_err(|e| TrackingError::IoError(format!("write custom JSON failed: {e}")))?;
225
226        if self.config.auto_flush {
227            self.flush()?;
228        }
229
230        let write_time = write_start.elapsed();
231
232        // Update statistics
233        self.stats.total_bytes_written = json_data.len();
234        self.stats.shards_written = 1; // Custom data counts as one shard
235        self.stats.total_write_time_ms = write_time.as_millis() as u64;
236        self.stats.avg_write_speed_bps = if write_time.as_secs_f64() > 0.0 {
237            json_data.len() as f64 / write_time.as_secs_f64()
238        } else {
239            0.0
240        };
241
242        if self.config.enable_monitoring {
243            self.print_write_stats();
244        }
245
246        Ok(self.stats.clone())
247    }
248
249    /// Force buffer flush
250    pub fn flush(&mut self) -> TrackingResult<()> {
251        self.writer
252            .flush()
253            .map_err(|e| TrackingError::IoError(format!("flush buffer failed: {e}")))?;
254
255        self.flush_count += 1;
256        self.stats.flush_count = self.flush_count;
257
258        Ok(())
259    }
260
261    /// Finalize writing and get final statistics
262    pub fn finalize(mut self) -> TrackingResult<WritePerformanceStats> {
263        // Ensure all data is written
264        self.flush()?;
265
266        // Calculate total statistics
267        let total_time = self.start_time.elapsed();
268        self.stats.total_write_time_ms = total_time.as_millis() as u64;
269
270        if self.config.enable_monitoring {
271            tracing::info!("✅ High-speed buffered write completed:");
272            tracing::info!("   Total time: {:?}", total_time);
273            self.print_write_stats();
274        }
275
276        Ok(self.stats)
277    }
278
279    /// Get current statistics
280    pub fn get_stats(&self) -> &WritePerformanceStats {
281        &self.stats
282    }
283
284    /// Get configuration
285    pub fn get_config(&self) -> &HighSpeedWriterConfig {
286        &self.config
287    }
288
289    /// Print write statistics
290    fn print_write_stats(&self) {
291        tracing::info!(
292            "   Bytes written: {} ({:.2} MB)",
293            self.stats.total_bytes_written,
294            self.stats.total_bytes_written as f64 / 1024.0 / 1024.0
295        );
296        tracing::info!("   Shards written: {}", self.stats.shards_written);
297        tracing::info!(
298            "   Write speed: {:.2} MB/s",
299            self.stats.avg_write_speed_bps / 1024.0 / 1024.0
300        );
301        tracing::info!(
302            "   Buffer utilization: {:.1}%",
303            self.stats.buffer_utilization * 100.0
304        );
305        tracing::info!("   Flush count: {}", self.stats.flush_count);
306        tracing::info!(
307            "   Preallocation effective: {}",
308            self.stats.preallocation_effective
309        );
310    }
311}
312
313/// Convenience function: fast write shards
314pub fn write_shards_fast<P: AsRef<Path>>(
315    path: P,
316    shards: &[ProcessedShard],
317) -> TrackingResult<WritePerformanceStats> {
318    let total_size: usize = shards.iter().map(|s| s.data.len()).sum();
319    let config = HighSpeedWriterConfig {
320        estimated_total_size: Some(total_size + 1024),
321        ..Default::default()
322    };
323
324    let mut writer = HighSpeedBufferedWriter::new(path, config)?;
325    writer.write_processed_shards(shards)
326}
327
328/// Convenience function: write shards with custom config
329pub fn write_shards_with_config<P: AsRef<Path>>(
330    path: P,
331    shards: &[ProcessedShard],
332    config: HighSpeedWriterConfig,
333) -> TrackingResult<WritePerformanceStats> {
334    let mut writer = HighSpeedBufferedWriter::new(path, config)?;
335    writer.write_processed_shards(shards)
336}
337
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use tempfile::NamedTempFile;

    /// Builds `count` shards whose data is a bracketed run of
    /// `{"test_data_<i>": <i>}` objects, repeated `size_per_shard / 20` times.
    fn create_test_shards(count: usize, size_per_shard: usize) -> Vec<ProcessedShard> {
        let mut shards = Vec::new();
        for i in 0..count {
            let data = format!("{{\"test_data_{i}\": {i}}}").repeat(size_per_shard / 20);
            shards.push(ProcessedShard {
                data: format!("[{data}]").into_bytes(),
                allocation_count: 1,
                shard_index: i,
                processing_time_ms: 1,
            });
        }
        shards
    }

    /// A writer can be created for a fresh temp file with the default config.
    #[test]
    fn test_high_speed_writer_creation() {
        let temp_file = NamedTempFile::new().expect("Failed to create temp file");
        let config = HighSpeedWriterConfig::default();
        let writer = HighSpeedBufferedWriter::new(temp_file.path(), config);
        assert!(writer.is_ok());
    }

    /// Writing shards reports plausible stats and wraps the output in the
    /// `{"allocations":[ ... ]}` envelope.
    #[test]
    fn test_write_processed_shards() {
        let temp_file = NamedTempFile::new().expect("Failed to create temp file");
        let config = HighSpeedWriterConfig::default();
        let mut writer = HighSpeedBufferedWriter::new(temp_file.path(), config)
            .expect("Failed to create writer");

        let shards = create_test_shards(3, 100);
        let result = writer.write_processed_shards(&shards);
        assert!(result.is_ok());

        let stats = result.expect("Failed to write shards");
        assert_eq!(stats.shards_written, 3);
        assert!(stats.total_bytes_written > 0);
        assert!(stats.avg_write_speed_bps > 0.0);

        // Verify file content
        let content = fs::read_to_string(temp_file.path()).expect("Failed to read temp file");
        assert!(content.starts_with("{\"allocations\":["));
        assert!(content.ends_with("]}"));
    }

    /// Custom JSON is written verbatim and its length is reported.
    #[test]
    fn test_write_custom_json() {
        let temp_file = NamedTempFile::new().expect("Failed to create temp file");
        let config = HighSpeedWriterConfig::default();
        let mut writer = HighSpeedBufferedWriter::new(temp_file.path(), config)
            .expect("Failed to create writer");

        let json_data = b"{\"test\": \"data\"}";
        let result = writer.write_custom_json(json_data);
        assert!(result.is_ok());

        let stats = result.expect("Failed to get write stats");
        assert_eq!(stats.total_bytes_written, json_data.len());

        // Verify file content
        let content = fs::read(temp_file.path()).expect("Failed to read temp file");
        assert_eq!(content, json_data);
    }

    /// `preallocation_effective` is true with a sufficient size estimate and false
    /// with one that is too small.
    #[test]
    fn test_preallocation_effectiveness() {
        let temp_file = NamedTempFile::new().expect("Failed to create temp file");
        let shards = create_test_shards(5, 200);
        let total_size: usize = shards.iter().map(|s| s.data.len()).sum();

        // Test effective preallocation
        let config = HighSpeedWriterConfig {
            estimated_total_size: Some(total_size + 1024),
            enable_monitoring: false,
            ..Default::default()
        };
        let mut writer = HighSpeedBufferedWriter::new(temp_file.path(), config)
            .expect("Failed to create writer");
        let stats = writer
            .write_processed_shards(&shards)
            .expect("Failed to write shards");
        assert!(stats.preallocation_effective);

        // Test ineffective preallocation
        let temp_file2 = NamedTempFile::new().expect("Failed to create temp file 2");
        let config2 = HighSpeedWriterConfig {
            estimated_total_size: Some(100), // too small preallocation
            enable_monitoring: false,
            ..Default::default()
        };
        let mut writer2 = HighSpeedBufferedWriter::new(temp_file2.path(), config2)
            .expect("Failed to create writer 2");
        let stats2 = writer2
            .write_processed_shards(&shards)
            .expect("Failed to write shards 2");
        assert!(!stats2.preallocation_effective);
    }

    /// Both free-standing helper functions succeed on a small input.
    #[test]
    fn test_convenience_functions() {
        let temp_file = NamedTempFile::new().expect("Failed to create temp file");
        let shards = create_test_shards(2, 150);

        // Test fast write function
        let result = write_shards_fast(temp_file.path(), &shards);
        assert!(result.is_ok());

        // Test custom config function
        let temp_file2 = NamedTempFile::new().expect("Failed to create temp file 2");
        let config = HighSpeedWriterConfig {
            buffer_size: 1024 * 1024,
            enable_monitoring: false,
            ..Default::default()
        };
        let result2 = write_shards_with_config(temp_file2.path(), &shards, config);
        assert!(result2.is_ok());
    }

    /// With `auto_flush` off, only the manual flush is counted.
    #[test]
    fn test_flush_functionality() {
        let temp_file = NamedTempFile::new().expect("Failed to create temp file");
        let config = HighSpeedWriterConfig {
            auto_flush: false,
            enable_monitoring: false,
            ..Default::default()
        };
        let mut writer = HighSpeedBufferedWriter::new(temp_file.path(), config)
            .expect("Failed to create writer");

        let shards = create_test_shards(1, 100);
        let _stats = writer
            .write_processed_shards(&shards)
            .expect("Failed to write shards");

        // Manually flush
        let flush_result = writer.flush();
        assert!(flush_result.is_ok());
        assert_eq!(writer.get_stats().flush_count, 1);
    }

    /// `finalize` succeeds after a write and still reports the bytes written.
    #[test]
    fn test_finalize() {
        let temp_file = NamedTempFile::new().expect("Failed to create temp file");
        let config = HighSpeedWriterConfig {
            enable_monitoring: false,
            ..Default::default()
        };
        let mut writer = HighSpeedBufferedWriter::new(temp_file.path(), config)
            .expect("Failed to create writer");

        let shards = create_test_shards(2, 100);
        let _stats = writer
            .write_processed_shards(&shards)
            .expect("Failed to write shards");

        let final_stats = writer.finalize();
        assert!(final_stats.is_ok());

        let stats = final_stats.expect("Failed to finalize");
        // Stats should be valid after successful write
        assert!(stats.total_bytes_written > 0);
    }
}