1use crate::core::types::{TrackingError, TrackingResult};
7use crate::export::parallel_shard_processor::ProcessedShard;
8use std::fs::File;
9use std::io::{BufWriter, Write};
10use std::path::Path;
11use std::time::Instant;
12
/// Tunable settings for [`HighSpeedBufferedWriter`].
#[derive(Debug, Clone)]
pub struct HighSpeedWriterConfig {
    /// Capacity, in bytes, handed to the underlying `BufWriter`.
    pub buffer_size: usize,
    /// When `true`, progress and statistics are logged through `tracing::info!`.
    pub enable_monitoring: bool,
    /// Expected size of the final output in bytes; used as the initial capacity of the
    /// writer's in-memory staging buffer. `None` falls back to a built-in default.
    pub estimated_total_size: Option<usize>,
    /// Compression switch.
    // NOTE(review): nothing in the visible writer code reads this flag - confirm whether
    // compression is implemented elsewhere.
    pub enable_compression: bool,
    /// When `true`, every write call flushes the underlying writer before returning.
    pub auto_flush: bool,
}

impl Default for HighSpeedWriterConfig {
    /// Returns the stock configuration: a 2 MiB write buffer, monitoring on, no size
    /// estimate, compression off, and automatic flushing on.
    fn default() -> Self {
        const KIB: usize = 1024;
        const DEFAULT_BUFFER_SIZE: usize = 2 * KIB * KIB;

        Self {
            buffer_size: DEFAULT_BUFFER_SIZE,
            enable_monitoring: true,
            estimated_total_size: None,
            enable_compression: false,
            auto_flush: true,
        }
    }
}
39
/// Measurements collected by [`HighSpeedBufferedWriter`] while writing.
#[derive(Debug, Clone)]
pub struct WritePerformanceStats {
    /// Number of bytes handed to the underlying writer by the most recent write call.
    pub total_bytes_written: usize,
    /// Number of shards covered by the most recent write call.
    pub shards_written: usize,
    /// Elapsed wall-clock time in milliseconds.
    pub total_write_time_ms: u64,
    /// Throughput in bytes per second; `0.0` when no time was measured.
    pub avg_write_speed_bps: f64,
    /// How many times the underlying writer has been flushed.
    pub flush_count: usize,
    /// Whether the staging buffer was already large enough when the write started.
    pub preallocation_effective: bool,
    /// Ratio of the staging buffer's length to its capacity, in the range `0.0..=1.0`.
    pub buffer_utilization: f64,
}
58
59pub struct HighSpeedBufferedWriter {
61 writer: BufWriter<File>,
63 config: HighSpeedWriterConfig,
65 internal_buffer: Vec<u8>,
67 stats: WritePerformanceStats,
69 start_time: Instant,
71 flush_count: usize,
73}
74
75impl HighSpeedBufferedWriter {
76 pub fn new<P: AsRef<Path>>(path: P, config: HighSpeedWriterConfig) -> TrackingResult<Self> {
78 let file = File::create(path.as_ref())
79 .map_err(|e| TrackingError::IoError(format!("create file failed: {e}")))?;
80
81 let writer = BufWriter::with_capacity(config.buffer_size, file);
82
83 let initial_capacity = config.estimated_total_size.unwrap_or(1024 * 1024); let internal_buffer = Vec::with_capacity(initial_capacity);
86
87 let stats = WritePerformanceStats {
88 total_bytes_written: 0,
89 shards_written: 0,
90 total_write_time_ms: 0,
91 avg_write_speed_bps: 0.0,
92 flush_count: 0,
93 preallocation_effective: false,
94 buffer_utilization: 0.0,
95 };
96
97 Ok(Self {
98 writer,
99 config,
100 internal_buffer,
101 stats,
102 start_time: Instant::now(),
103 flush_count: 0,
104 })
105 }
106
107 pub fn write_processed_shards(
109 &mut self,
110 shards: &[ProcessedShard],
111 ) -> TrackingResult<WritePerformanceStats> {
112 let write_start = Instant::now();
113
114 if self.config.enable_monitoring {
115 tracing::info!(
116 "🔄 Starting high-speed buffered write for {} shards...",
117 shards.len()
118 );
119 }
120
121 let total_size: usize = shards.iter().map(|s| s.data.len()).sum();
123 let estimated_final_size = total_size + 1024; self.stats.preallocation_effective =
127 self.internal_buffer.capacity() >= estimated_final_size;
128
129 if !self.stats.preallocation_effective {
130 self.internal_buffer.reserve(estimated_final_size);
131 }
132
133 self.build_complete_json(shards)?;
135
136 self.writer
138 .write_all(&self.internal_buffer)
139 .map_err(|e| TrackingError::IoError(format!("write file failed: {e}")))?;
140
141 if self.config.auto_flush {
143 self.flush()?;
144 }
145
146 let write_time = write_start.elapsed();
147
148 self.stats.total_bytes_written = self.internal_buffer.len();
150 self.stats.shards_written = shards.len();
151 self.stats.total_write_time_ms = write_time.as_millis() as u64;
152 self.stats.avg_write_speed_bps = if write_time.as_secs_f64() > 0.0 {
153 self.stats.total_bytes_written as f64 / write_time.as_secs_f64()
154 } else {
155 0.0
156 };
157 self.stats.buffer_utilization = if self.internal_buffer.capacity() > 0 {
158 self.internal_buffer.len() as f64 / self.internal_buffer.capacity() as f64
159 } else {
160 0.0
161 };
162
163 if self.config.enable_monitoring {
164 self.print_write_stats();
165 }
166
167 Ok(self.stats.clone())
168 }
169
170 fn build_complete_json(&mut self, shards: &[ProcessedShard]) -> TrackingResult<()> {
172 self.internal_buffer.clear();
174
175 self.internal_buffer
177 .extend_from_slice(b"{\"allocations\":[");
178
179 for (i, shard) in shards.iter().enumerate() {
181 if i > 0 {
182 self.internal_buffer.extend_from_slice(b",");
183 }
184
185 let shard_content = if shard.data.starts_with(b"[") && shard.data.ends_with(b"]") {
187 &shard.data[1..shard.data.len() - 1]
188 } else {
189 &shard.data
190 };
191
192 self.internal_buffer.extend_from_slice(shard_content);
193 }
194
195 self.internal_buffer.extend_from_slice(b"]}");
197
198 Ok(())
199 }
200
201 pub fn write_custom_json(&mut self, json_data: &[u8]) -> TrackingResult<WritePerformanceStats> {
203 let write_start = Instant::now();
204
205 if self.config.enable_monitoring {
206 tracing::info!(
207 "🔄 Starting custom JSON data write ({} bytes)...",
208 json_data.len()
209 );
210 }
211
212 if self.internal_buffer.capacity() < json_data.len() {
214 self.internal_buffer.reserve(json_data.len());
215 }
216
217 self.internal_buffer.clear();
219 self.internal_buffer.extend_from_slice(json_data);
220
221 self.writer
223 .write_all(&self.internal_buffer)
224 .map_err(|e| TrackingError::IoError(format!("write custom JSON failed: {e}")))?;
225
226 if self.config.auto_flush {
227 self.flush()?;
228 }
229
230 let write_time = write_start.elapsed();
231
232 self.stats.total_bytes_written = json_data.len();
234 self.stats.shards_written = 1; self.stats.total_write_time_ms = write_time.as_millis() as u64;
236 self.stats.avg_write_speed_bps = if write_time.as_secs_f64() > 0.0 {
237 json_data.len() as f64 / write_time.as_secs_f64()
238 } else {
239 0.0
240 };
241
242 if self.config.enable_monitoring {
243 self.print_write_stats();
244 }
245
246 Ok(self.stats.clone())
247 }
248
249 pub fn flush(&mut self) -> TrackingResult<()> {
251 self.writer
252 .flush()
253 .map_err(|e| TrackingError::IoError(format!("flush buffer failed: {e}")))?;
254
255 self.flush_count += 1;
256 self.stats.flush_count = self.flush_count;
257
258 Ok(())
259 }
260
261 pub fn finalize(mut self) -> TrackingResult<WritePerformanceStats> {
263 self.flush()?;
265
266 let total_time = self.start_time.elapsed();
268 self.stats.total_write_time_ms = total_time.as_millis() as u64;
269
270 if self.config.enable_monitoring {
271 tracing::info!("✅ High-speed buffered write completed:");
272 tracing::info!(" Total time: {:?}", total_time);
273 self.print_write_stats();
274 }
275
276 Ok(self.stats)
277 }
278
279 pub fn get_stats(&self) -> &WritePerformanceStats {
281 &self.stats
282 }
283
284 pub fn get_config(&self) -> &HighSpeedWriterConfig {
286 &self.config
287 }
288
289 fn print_write_stats(&self) {
291 tracing::info!(
292 " Bytes written: {} ({:.2} MB)",
293 self.stats.total_bytes_written,
294 self.stats.total_bytes_written as f64 / 1024.0 / 1024.0
295 );
296 tracing::info!(" Shards written: {}", self.stats.shards_written);
297 tracing::info!(
298 " Write speed: {:.2} MB/s",
299 self.stats.avg_write_speed_bps / 1024.0 / 1024.0
300 );
301 tracing::info!(
302 " Buffer utilization: {:.1}%",
303 self.stats.buffer_utilization * 100.0
304 );
305 tracing::info!(" Flush count: {}", self.stats.flush_count);
306 tracing::info!(
307 " Preallocation effective: {}",
308 self.stats.preallocation_effective
309 );
310 }
311}
312
313pub fn write_shards_fast<P: AsRef<Path>>(
315 path: P,
316 shards: &[ProcessedShard],
317) -> TrackingResult<WritePerformanceStats> {
318 let total_size: usize = shards.iter().map(|s| s.data.len()).sum();
319 let config = HighSpeedWriterConfig {
320 estimated_total_size: Some(total_size + 1024),
321 ..Default::default()
322 };
323
324 let mut writer = HighSpeedBufferedWriter::new(path, config)?;
325 writer.write_processed_shards(shards)
326}
327
328pub fn write_shards_with_config<P: AsRef<Path>>(
330 path: P,
331 shards: &[ProcessedShard],
332 config: HighSpeedWriterConfig,
333) -> TrackingResult<WritePerformanceStats> {
334 let mut writer = HighSpeedBufferedWriter::new(path, config)?;
335 writer.write_processed_shards(shards)
336}
337
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use tempfile::NamedTempFile;

    /// Builds `count` shards, each holding a JSON array of `size_per_shard / 20`
    /// comma-separated objects, so every shard is itself valid JSON.
    fn create_test_shards(count: usize, size_per_shard: usize) -> Vec<ProcessedShard> {
        (0..count)
            .map(|i| {
                let object = format!("{{\"test_data_{i}\": {i}}}");
                let elements = vec![object; size_per_shard / 20].join(",");
                ProcessedShard {
                    data: format!("[{elements}]").into_bytes(),
                    allocation_count: 1,
                    shard_index: i,
                    processing_time_ms: 1,
                }
            })
            .collect()
    }

    /// Default configuration with logging switched off.
    fn quiet_config() -> HighSpeedWriterConfig {
        HighSpeedWriterConfig {
            enable_monitoring: false,
            ..Default::default()
        }
    }

    #[test]
    fn test_high_speed_writer_creation() {
        let temp_file = NamedTempFile::new().expect("Failed to create temp file");
        let writer =
            HighSpeedBufferedWriter::new(temp_file.path(), HighSpeedWriterConfig::default());
        assert!(writer.is_ok());
    }

    #[test]
    fn test_write_processed_shards() {
        let temp_file = NamedTempFile::new().expect("Failed to create temp file");
        let mut writer =
            HighSpeedBufferedWriter::new(temp_file.path(), HighSpeedWriterConfig::default())
                .expect("Failed to create writer");

        let shards = create_test_shards(3, 100);
        let stats = writer
            .write_processed_shards(&shards)
            .expect("Failed to write shards");

        assert_eq!(stats.shards_written, 3);
        assert!(stats.total_bytes_written > 0);
        assert!(stats.avg_write_speed_bps > 0.0);

        let content = fs::read_to_string(temp_file.path()).expect("Failed to read temp file");
        assert!(content.starts_with("{\"allocations\":["));
        assert!(content.ends_with("]}"));
    }

    #[test]
    fn test_shard_contents_are_merged_into_one_array() {
        let temp_file = NamedTempFile::new().expect("Failed to create temp file");
        let mut writer = HighSpeedBufferedWriter::new(temp_file.path(), quiet_config())
            .expect("Failed to create writer");

        let payloads: [&[u8]; 2] = [b"[{\"a\":1}]", b"[{\"b\":2}]"];
        let shards: Vec<ProcessedShard> = payloads
            .iter()
            .enumerate()
            .map(|(i, payload)| ProcessedShard {
                data: payload.to_vec(),
                allocation_count: 1,
                shard_index: i,
                processing_time_ms: 1,
            })
            .collect();

        writer
            .write_processed_shards(&shards)
            .expect("Failed to write shards");

        let content = fs::read_to_string(temp_file.path()).expect("Failed to read temp file");
        assert_eq!(content, "{\"allocations\":[{\"a\":1},{\"b\":2}]}");
    }

    #[test]
    fn test_write_custom_json() {
        let temp_file = NamedTempFile::new().expect("Failed to create temp file");
        let mut writer =
            HighSpeedBufferedWriter::new(temp_file.path(), HighSpeedWriterConfig::default())
                .expect("Failed to create writer");

        let json_data = b"{\"test\": \"data\"}";
        let stats = writer
            .write_custom_json(json_data)
            .expect("Failed to write custom JSON");
        assert_eq!(stats.total_bytes_written, json_data.len());

        let content = fs::read(temp_file.path()).expect("Failed to read temp file");
        assert_eq!(content, json_data);
    }

    #[test]
    fn test_preallocation_effectiveness() {
        let shards = create_test_shards(5, 200);
        let total_size: usize = shards.iter().map(|s| s.data.len()).sum();

        // A generous estimate means the staging buffer never needs to grow.
        let temp_file = NamedTempFile::new().expect("Failed to create temp file");
        let config = HighSpeedWriterConfig {
            estimated_total_size: Some(total_size + 1024),
            ..quiet_config()
        };
        let mut writer = HighSpeedBufferedWriter::new(temp_file.path(), config)
            .expect("Failed to create writer");
        let stats = writer
            .write_processed_shards(&shards)
            .expect("Failed to write shards");
        assert!(stats.preallocation_effective);

        // A far-too-small estimate forces the buffer to grow.
        let temp_file2 = NamedTempFile::new().expect("Failed to create temp file 2");
        let config2 = HighSpeedWriterConfig {
            estimated_total_size: Some(100),
            ..quiet_config()
        };
        let mut writer2 = HighSpeedBufferedWriter::new(temp_file2.path(), config2)
            .expect("Failed to create writer 2");
        let stats2 = writer2
            .write_processed_shards(&shards)
            .expect("Failed to write shards 2");
        assert!(!stats2.preallocation_effective);
    }

    #[test]
    fn test_convenience_functions() {
        let shards = create_test_shards(2, 150);

        let temp_file = NamedTempFile::new().expect("Failed to create temp file");
        let result = write_shards_fast(temp_file.path(), &shards);
        assert!(result.is_ok());

        let temp_file2 = NamedTempFile::new().expect("Failed to create temp file 2");
        let config = HighSpeedWriterConfig {
            buffer_size: 1024 * 1024,
            ..quiet_config()
        };
        let result2 = write_shards_with_config(temp_file2.path(), &shards, config);
        assert!(result2.is_ok());
    }

    #[test]
    fn test_flush_functionality() {
        let temp_file = NamedTempFile::new().expect("Failed to create temp file");
        let config = HighSpeedWriterConfig {
            auto_flush: false,
            ..quiet_config()
        };
        let mut writer = HighSpeedBufferedWriter::new(temp_file.path(), config)
            .expect("Failed to create writer");

        let shards = create_test_shards(1, 100);
        writer
            .write_processed_shards(&shards)
            .expect("Failed to write shards");

        // With auto-flush off, the only flush is the explicit one below.
        assert!(writer.flush().is_ok());
        assert_eq!(writer.get_stats().flush_count, 1);
    }

    #[test]
    fn test_finalize() {
        let temp_file = NamedTempFile::new().expect("Failed to create temp file");
        let mut writer = HighSpeedBufferedWriter::new(temp_file.path(), quiet_config())
            .expect("Failed to create writer");

        let shards = create_test_shards(2, 100);
        writer
            .write_processed_shards(&shards)
            .expect("Failed to write shards");

        let stats = writer.finalize().expect("Failed to finalize");
        assert!(stats.total_bytes_written > 0);
    }
}