#![allow(dead_code, unused_variables)]

use async_trait::async_trait;

use adaptive_pipeline_domain::value_objects::{ChunkFormat, FileHeader};
use adaptive_pipeline_domain::PipelineError;
use sha2::{Digest, Sha256};
use std::collections::HashSet;
use std::io::SeekFrom;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use tokio::fs;
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
use tokio::sync::Mutex;
use tracing::{debug, warn};

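/// Service-level entry point for creating writers and readers, validating
/// files, and reading metadata in the `.adapipe` binary format.
///
/// A minimal usage sketch (marked `ignore` since it needs an async context;
/// the path and header values are illustrative):
///
/// ```rust,ignore
/// let service = AdapipeFormat::new();
/// let header = FileHeader::new("input.txt".to_string(), 1024, "checksum".to_string());
///
/// let mut writer = service.create_writer(Path::new("out.adapipe"), header.clone()).await?;
/// writer.write_chunk(ChunkFormat::new([0u8; 12], vec![1, 2, 3]))?;
/// writer.finalize(header).await?;
///
/// let report = service.validate_file(Path::new("out.adapipe")).await?;
/// assert!(report.is_valid);
/// ```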
#[async_trait]
pub trait BinaryFormatService: Send + Sync {
    async fn create_writer(
        &self,
        output_path: &Path,
        header: FileHeader,
    ) -> Result<Box<dyn BinaryFormatWriter>, PipelineError>;

    async fn create_reader(&self, input_path: &Path) -> Result<Box<dyn BinaryFormatReader>, PipelineError>;

    async fn validate_file(&self, file_path: &Path) -> Result<ValidationResult, PipelineError>;

    async fn read_metadata(&self, file_path: &Path) -> Result<FileHeader, PipelineError>;
}

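/// Writes chunks to a `.adapipe` file and appends the footer on `finalize`.
///
/// `write_chunk` appends sequentially; `write_chunk_at_position` supports
/// out-of-order, position-based writes for implementations that allow them.
/// `finalize` consumes the final header, writes the footer, and returns the
/// total bytes written (chunks plus footer).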
#[async_trait]
pub trait BinaryFormatWriter: Send + Sync {
    fn write_chunk(&mut self, chunk: ChunkFormat) -> Result<(), PipelineError>;

    async fn write_chunk_at_position(&self, chunk: ChunkFormat, sequence_number: u64) -> Result<(), PipelineError>;

    async fn finalize(&self, final_header: FileHeader) -> Result<u64, PipelineError>;

    fn bytes_written(&self) -> u64;

    fn chunks_written(&self) -> u32;
}

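/// Reads chunks back from a `.adapipe` file.
///
/// The header is parsed from the file footer when the reader is constructed;
/// `read_next_chunk` then yields chunks in order, returning `Ok(None)` once
/// all `chunk_count` chunks have been consumed.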
#[async_trait]
pub trait BinaryFormatReader: Send + Sync {
    fn read_header(&self) -> Result<FileHeader, PipelineError>;

    async fn read_next_chunk(&mut self) -> Result<Option<ChunkFormat>, PipelineError>;

    async fn seek_to_chunk(&mut self, chunk_index: u32) -> Result<(), PipelineError>;

    async fn validate_integrity(&mut self) -> Result<bool, PipelineError>;
}

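/// Outcome of validating a `.adapipe` file: format version, size, and chunk
/// count from the footer, whether the output checksum matched the file
/// contents (`integrity_verified`), and any errors encountered along the way.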
#[derive(Debug, Clone)]
pub struct ValidationResult {
    pub is_valid: bool,
    pub format_version: u16,
    pub file_size: u64,
    pub chunk_count: u32,
    pub processing_summary: String,
    pub integrity_verified: bool,
    pub errors: Vec<String>,
}

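/// Default [`BinaryFormatService`] implementation, backed by
/// [`StreamingBinaryWriter`] and [`StreamingBinaryReader`].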
pub struct AdapipeFormat;

impl AdapipeFormat {
    pub fn new() -> Self {
        Self
    }
}

#[async_trait]
impl BinaryFormatService for AdapipeFormat {
    async fn create_writer(
        &self,
        output_path: &Path,
        header: FileHeader,
    ) -> Result<Box<dyn BinaryFormatWriter>, PipelineError> {
        let writer = StreamingBinaryWriter::new(output_path, header).await?;
        Ok(Box::new(writer))
    }

    async fn create_reader(&self, input_path: &Path) -> Result<Box<dyn BinaryFormatReader>, PipelineError> {
        let reader = StreamingBinaryReader::new(input_path).await?;
        Ok(Box::new(reader))
    }

    async fn validate_file(&self, file_path: &Path) -> Result<ValidationResult, PipelineError> {
        let mut reader = self.create_reader(file_path).await?;
        let header = reader.read_header()?;
        let integrity_verified = reader.validate_integrity().await?;

        let file_metadata = fs::metadata(file_path)
            .await
            .map_err(|e| PipelineError::IoError(e.to_string()))?;

        // Report a checksum mismatch instead of unconditionally claiming success.
        let errors = if integrity_verified {
            Vec::new()
        } else {
            vec!["output checksum does not match file contents".to_string()]
        };

        Ok(ValidationResult {
            is_valid: integrity_verified,
            format_version: header.format_version,
            file_size: file_metadata.len(),
            chunk_count: header.chunk_count,
            processing_summary: header.get_processing_summary(),
            integrity_verified,
            errors,
        })
    }

    async fn read_metadata(&self, file_path: &Path) -> Result<FileHeader, PipelineError> {
        let reader = self.create_reader(file_path).await?;
        reader.read_header()
    }
}

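/// Buffers every chunk in memory and writes the whole file (chunks followed
/// by the footer) in one pass during `finalize`. Simple and deterministic,
/// but memory usage grows with the number of chunks, so it is only suited to
/// small files.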
pub struct BufferedBinaryWriter {
    output_path: PathBuf,
    header: FileHeader,
    chunks: Vec<ChunkFormat>,
}

impl BufferedBinaryWriter {
    fn new(output_path: PathBuf, header: FileHeader) -> Self {
        Self {
            output_path,
            header,
            chunks: Vec::new(),
        }
    }
}

#[async_trait]
impl BinaryFormatWriter for BufferedBinaryWriter {
    fn write_chunk(&mut self, chunk: ChunkFormat) -> Result<(), PipelineError> {
        self.chunks.push(chunk);
        Ok(())
    }

    async fn write_chunk_at_position(&self, _chunk: ChunkFormat, _sequence_number: u64) -> Result<(), PipelineError> {
        Err(PipelineError::processing_failed(
            "BufferedBinaryWriter doesn't support concurrent writes - use StreamingBinaryWriter",
        ))
    }

    async fn finalize(&self, mut final_header: FileHeader) -> Result<u64, PipelineError> {
        let mut file = tokio::fs::OpenOptions::new()
            .create(true)
            .write(true)
            .truncate(true)
            .open(&self.output_path)
            .await
            .map_err(|e| PipelineError::IoError(e.to_string()))?;

        let mut total_bytes = 0u64;
        let mut hasher = Sha256::new();

        for chunk in &self.chunks {
            let (chunk_bytes, chunk_size) = chunk.to_bytes_with_size();
            file.write_all(&chunk_bytes)
                .await
                .map_err(|e| PipelineError::IoError(e.to_string()))?;
            hasher.update(&chunk_bytes);
            total_bytes += chunk_size;
        }

        final_header.chunk_count = self.chunks.len() as u32;
        final_header.processed_at = chrono::Utc::now();
        final_header.output_checksum = format!("{:x}", hasher.finalize());

        let footer_bytes = final_header.to_footer_bytes()?;
        file.write_all(&footer_bytes)
            .await
            .map_err(|e| PipelineError::IoError(e.to_string()))?;

        file.flush().await.map_err(|e| PipelineError::IoError(e.to_string()))?;

        Ok(total_bytes + (footer_bytes.len() as u64))
    }

    fn bytes_written(&self) -> u64 {
        self.chunks.iter().map(|c| (c.payload.len() as u64) + 16).sum()
    }

    fn chunks_written(&self) -> u32 {
        self.chunks.len() as u32
    }
}

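/// Streams chunks straight to disk using positioned writes (`write_all_at`
/// on Unix, `seek_write` on Windows), allowing multiple tasks to write
/// different sequence numbers concurrently without sharing a seek cursor.
///
/// Two assumptions are baked in: chunk offsets are computed as
/// `sequence_number * chunk_size`, which only holds when every chunk
/// serializes to the same size, and the output checksum hashes chunks in
/// arrival order, which matches on-disk order only for sequential writes.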
#[allow(dead_code)]
pub struct StreamingBinaryWriter {
    file: Arc<std::fs::File>,

    bytes_written: Arc<AtomicU64>,
    chunks_written: Arc<AtomicU64>,

    initial_header: FileHeader,

    output_hasher: Arc<Mutex<Sha256>>,

    flush_interval: u64,
    buffer_size_threshold: u64,
    bytes_since_flush: Arc<AtomicU64>,

    finalized: Arc<AtomicBool>,
}

impl StreamingBinaryWriter {
    async fn new(output_path: &Path, header: FileHeader) -> Result<Self, PipelineError> {
        let file = std::fs::OpenOptions::new()
            .create(true)
            .write(true)
            .read(true)
            .truncate(true)
            .open(output_path)
            .map_err(|e| PipelineError::IoError(e.to_string()))?;

        Ok(Self {
            file: Arc::new(file),
            bytes_written: Arc::new(AtomicU64::new(0)),
            chunks_written: Arc::new(AtomicU64::new(0)),
            initial_header: header,
            output_hasher: Arc::new(Mutex::new(Sha256::new())),
            flush_interval: 1024 * 1024,
            buffer_size_threshold: 10 * 1024 * 1024,
            bytes_since_flush: Arc::new(AtomicU64::new(0)),
            finalized: Arc::new(AtomicBool::new(false)),
        })
    }
}

#[async_trait]
impl BinaryFormatWriter for StreamingBinaryWriter {
    fn write_chunk(&mut self, chunk: ChunkFormat) -> Result<(), PipelineError> {
        let sequence_number = self.chunks_written.load(Ordering::Relaxed);

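        // NOTE: this blocks the calling thread on async work
        // (`futures::executor::block_on`); it keeps the synchronous trait
        // method usable, but from inside an async runtime prefer
        // `write_chunk_at_position` to avoid stalling a worker thread.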
        futures::executor::block_on(async { self.write_chunk_at_position(chunk, sequence_number).await })
    }

    async fn write_chunk_at_position(&self, chunk: ChunkFormat, sequence_number: u64) -> Result<(), PipelineError> {
        chunk.validate()?;

        let (chunk_bytes, chunk_size) = chunk.to_bytes_with_size();

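        // Offset math assumes fixed-size chunks: every chunk must serialize
        // to the same `chunk_size`, or positioned writes would overlap.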
        let file_position = sequence_number * chunk_size;

        let file_clone = self.file.clone();
        let chunk_bytes_clone = chunk_bytes.clone();

        tokio::task::spawn_blocking(move || {
            #[cfg(unix)]
            {
                use std::os::unix::fs::FileExt;
                file_clone.write_all_at(&chunk_bytes_clone, file_position).map_err(|e| {
                    PipelineError::IoError(format!("Failed to write chunk at position {}: {}", file_position, e))
                })
            }

            #[cfg(windows)]
            {
                use std::os::windows::fs::FileExt;
                file_clone
                    .seek_write(&chunk_bytes_clone, file_position)
                    .map(|_| ())
                    .map_err(|e| {
                        PipelineError::IoError(format!("Failed to write chunk at position {}: {}", file_position, e))
                    })
            }

            #[cfg(not(any(unix, windows)))]
            {
                compile_error!("Platform not supported for position-based writes")
            }
        })
        .await
        .map_err(|e| PipelineError::IoError(format!("Task join error: {}", e)))??;

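        // The running checksum hashes chunks in *arrival* order; if chunks
        // arrive out of order, this will not match the sequential hash that
        // `validate_integrity` computes over the finished file.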
        {
            let mut hasher = self.output_hasher.lock().await;
            hasher.update(&chunk_bytes);
        }

        self.bytes_written.fetch_add(chunk_size, Ordering::Relaxed);
        self.chunks_written.fetch_add(1, Ordering::Relaxed);
        self.bytes_since_flush.fetch_add(chunk_size, Ordering::Relaxed);

        Ok(())
    }

    async fn finalize(&self, mut final_header: FileHeader) -> Result<u64, PipelineError> {
        if self.finalized.swap(true, Ordering::SeqCst) {
            return Err(PipelineError::internal_error("Writer already finalized"));
        }

        final_header.chunk_count = self.chunks_written.load(Ordering::Relaxed) as u32;
        final_header.processed_at = chrono::Utc::now();

        let output_checksum = {
            let mut hasher = self.output_hasher.lock().await;
            let result = hasher.finalize_reset();
            format!("{:x}", result)
        };
        final_header.output_checksum = output_checksum;

        let footer_bytes = final_header.to_footer_bytes()?;
        let footer_size = footer_bytes.len() as u64;

        let file = self.file.clone();
        tokio::task::spawn_blocking(move || {
            let file_ref = &*file;

            let current_pos = file_ref.metadata().map(|m| m.len()).unwrap_or(0);

            #[cfg(unix)]
            {
                use std::os::unix::fs::FileExt;
                file_ref
                    .write_all_at(&footer_bytes, current_pos)
                    .map_err(|e| PipelineError::IoError(e.to_string()))?;
            }

            #[cfg(windows)]
            {
                use std::io::{Seek, SeekFrom, Write};
                let mut file_mut = file_ref;
                file_mut
                    .seek(SeekFrom::Start(current_pos))
                    .map_err(|e| PipelineError::IoError(e.to_string()))?;
                file_mut
                    .write_all(&footer_bytes)
                    .map_err(|e| PipelineError::IoError(e.to_string()))?;
            }

            file_ref.sync_all().map_err(|e| PipelineError::IoError(e.to_string()))
        })
        .await
        .map_err(|e| PipelineError::IoError(format!("Task join error: {}", e)))??;

        let total_bytes = self.bytes_written.load(Ordering::Relaxed) + footer_size;

        Ok(total_bytes)
    }

    fn bytes_written(&self) -> u64 {
        self.bytes_written.load(Ordering::Relaxed)
    }

    fn chunks_written(&self) -> u32 {
        self.chunks_written.load(Ordering::Relaxed) as u32
    }
}

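/// Streams chunks sequentially from a `.adapipe` file after parsing the
/// trailing footer up front.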
#[allow(dead_code)]
pub struct StreamingBinaryReader {
    file: tokio::fs::File,
    file_size: u64,
    header: Option<FileHeader>,
    current_chunk_index: u32,
    chunks_start_offset: u64,
}

impl StreamingBinaryReader {
    async fn new(input_path: &Path) -> Result<Self, PipelineError> {
        let mut file = tokio::fs::File::open(input_path)
            .await
            .map_err(|e| PipelineError::IoError(e.to_string()))?;

        let metadata = fs::metadata(input_path)
            .await
            .map_err(|e| PipelineError::IoError(e.to_string()))?;
        let file_size = metadata.len();

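        // The entire file is read into memory just to parse the trailing
        // footer; for large files, reading only the tail would be cheaper.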
        let mut file_data = Vec::new();
        file.read_to_end(&mut file_data)
            .await
            .map_err(|e| PipelineError::IoError(e.to_string()))?;

        let (header, _footer_size) = FileHeader::from_footer_bytes(&file_data)?;

        let chunks_start_offset = 0;

        let mut file = tokio::fs::File::open(input_path)
            .await
            .map_err(|e| PipelineError::IoError(e.to_string()))?;
        file.seek(SeekFrom::Start(chunks_start_offset))
            .await
            .map_err(|e| PipelineError::IoError(e.to_string()))?;

        Ok(Self {
            file,
            file_size,
            header: Some(header),
            current_chunk_index: 0,
            chunks_start_offset,
        })
    }
}

#[async_trait]
impl BinaryFormatReader for StreamingBinaryReader {
    fn read_header(&self) -> Result<FileHeader, PipelineError> {
        self.header
            .clone()
            .ok_or_else(|| PipelineError::ValidationError("Header not loaded".to_string()))
    }

    async fn read_next_chunk(&mut self) -> Result<Option<ChunkFormat>, PipelineError> {
        let header = self
            .header
            .as_ref()
            .ok_or_else(|| PipelineError::ValidationError("Header not loaded".to_string()))?;

        if self.current_chunk_index >= header.chunk_count {
            return Ok(None);
        }

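        // Wire format per chunk: a 16-byte header (12-byte nonce followed by
        // a 4-byte little-endian payload length), then the payload itself.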
        let mut chunk_header = vec![0u8; 16];
        match self.file.read_exact(&mut chunk_header).await {
            Ok(_) => {}
            Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
                return Ok(None);
            }
            Err(e) => {
                return Err(PipelineError::IoError(format!("Failed to read chunk header: {}", e)));
            }
        }

        let mut nonce = [0u8; 12];
        nonce.copy_from_slice(&chunk_header[0..12]);
        let data_length =
            u32::from_le_bytes([chunk_header[12], chunk_header[13], chunk_header[14], chunk_header[15]]) as usize;

        let mut encrypted_data = vec![0u8; data_length];
        self.file
            .read_exact(&mut encrypted_data)
            .await
            .map_err(|e| PipelineError::IoError(format!("Failed to read chunk data: {}", e)))?;

        let chunk = ChunkFormat::new(nonce, encrypted_data);

        self.current_chunk_index += 1;

        Ok(Some(chunk))
    }

    async fn seek_to_chunk(&mut self, chunk_index: u32) -> Result<(), PipelineError> {
        if chunk_index == 0 {
            self.file
                .seek(SeekFrom::Start(self.chunks_start_offset))
                .await
                .map_err(|e| PipelineError::IoError(e.to_string()))?;
            self.current_chunk_index = 0;
            return Ok(());
        }

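        // Chunks are length-prefixed but not indexed, so seeking is a linear
        // scan from the start of the chunk data.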
        self.file
            .seek(SeekFrom::Start(self.chunks_start_offset))
            .await
            .map_err(|e| PipelineError::IoError(e.to_string()))?;
        self.current_chunk_index = 0;

        for _ in 0..chunk_index {
            if self.read_next_chunk().await?.is_none() {
                return Err(PipelineError::ValidationError("Chunk index out of bounds".to_string()));
            }
        }

        Ok(())
    }

    async fn validate_integrity(&mut self) -> Result<bool, PipelineError> {
        let header = self
            .header
            .as_ref()
            .ok_or_else(|| PipelineError::ValidationError("Header not loaded".to_string()))?;

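        // Recompute the footer size by re-serializing the header; this
        // assumes `to_footer_bytes` is deterministic and produces the same
        // bytes that were written to disk.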
        let footer_bytes = header.to_footer_bytes()?;
        let footer_size = footer_bytes.len() as u64;

        let chunk_data_size = self.file_size - footer_size;

        self.file
            .seek(SeekFrom::Start(0))
            .await
            .map_err(|e| PipelineError::IoError(e.to_string()))?;

        let mut chunk_data = vec![0u8; chunk_data_size as usize];
        self.file
            .read_exact(&mut chunk_data)
            .await
            .map_err(|e| PipelineError::IoError(e.to_string()))?;

        let mut hasher = Sha256::new();
        hasher.update(&chunk_data);
        let calculated_checksum = format!("{:x}", hasher.finalize());

        let is_valid = calculated_checksum == header.output_checksum;

        self.file
            .seek(SeekFrom::Start(self.chunks_start_offset))
            .await
            .map_err(|e| PipelineError::IoError(e.to_string()))?;
        self.current_chunk_index = 0;

        Ok(is_valid)
    }
}

impl Default for AdapipeFormat {
    fn default() -> Self {
        Self::new()
    }
}

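/// Writer that stages output in a temporary `.adapipe.tmp` file and only
/// publishes it via an atomic rename when `commit` is called; `rollback`
/// deletes the staged file instead. Dropping the writer without committing
/// leaves the temporary file behind and logs a warning.
///
/// A usage sketch (error handling elided; path and chunk values are
/// illustrative):
///
/// ```rust,ignore
/// let writer = TransactionalBinaryWriter::new(PathBuf::from("out.adapipe"), 2).await?;
/// writer.write_chunk_at_position(chunk_a, 0).await?;
/// writer.write_chunk_at_position(chunk_b, 1).await?;
/// writer.finalize(header).await?;
/// writer.commit().await?; // atomic rename: out.adapipe.tmp -> out.adapipe
/// ```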
pub struct TransactionalBinaryWriter {
    temp_file: Arc<Mutex<tokio::fs::File>>,

    temp_path: PathBuf,

    final_path: PathBuf,

    completed_chunks: Arc<Mutex<HashSet<u64>>>,

    expected_chunk_count: u64,

    bytes_written: Arc<AtomicU64>,

    chunks_written: Arc<AtomicU64>,

    checkpoint_interval: u64,

    last_checkpoint: Arc<AtomicU64>,
}

impl TransactionalBinaryWriter {
    pub async fn new(output_path: PathBuf, expected_chunk_count: u64) -> Result<Self, PipelineError> {
        let temp_path = output_path.with_extension("adapipe.tmp");

        let temp_file = tokio::fs::File::create(&temp_path)
            .await
            .map_err(|e| PipelineError::io_error(format!("Failed to create temporary file: {}", e)))?;

        Ok(Self {
            temp_file: Arc::new(Mutex::new(temp_file)),
            temp_path,
            final_path: output_path,
            completed_chunks: Arc::new(Mutex::new(HashSet::new())),
            expected_chunk_count,
            bytes_written: Arc::new(AtomicU64::new(0)),
            chunks_written: Arc::new(AtomicU64::new(0)),
            checkpoint_interval: 10,
            last_checkpoint: Arc::new(AtomicU64::new(0)),
        })
    }

    async fn create_checkpoint(&self) -> Result<(), PipelineError> {
        {
            let file_guard = self.temp_file.lock().await;
            file_guard
                .sync_data()
                .await
                .map_err(|e| PipelineError::io_error(format!("Failed to sync data for checkpoint: {}", e)))?;
        }

        let current_chunks = self.chunks_written.load(Ordering::Relaxed);
        self.last_checkpoint.store(current_chunks, Ordering::Relaxed);

        debug!(
            "Created checkpoint: {} chunks completed out of {} expected",
            current_chunks, self.expected_chunk_count
        );

        Ok(())
    }

    pub async fn commit(self) -> Result<(), PipelineError> {
        let completed_count = self.completed_chunks.lock().await.len() as u64;
        if completed_count != self.expected_chunk_count {
            return Err(PipelineError::ValidationError(format!(
                "Incomplete transaction: {} chunks written, {} expected",
                completed_count, self.expected_chunk_count
            )));
        }

        {
            let file_guard = self.temp_file.lock().await;
            file_guard
                .sync_all()
                .await
                .map_err(|e| PipelineError::io_error(format!("Failed to sync file before commit: {}", e)))?;
        }

        tokio::fs::rename(&self.temp_path, &self.final_path)
            .await
            .map_err(|e| PipelineError::io_error(format!("Failed to commit transaction (rename): {}", e)))?;

        let bytes_written = self.bytes_written.load(Ordering::Relaxed);
        debug!(
            "Transaction committed successfully: {} chunks, {} bytes written to {:?}",
            completed_count, bytes_written, self.final_path
        );

        Ok(())
    }

    pub async fn rollback(self) -> Result<(), PipelineError> {
        if self.temp_path.exists() {
            tokio::fs::remove_file(&self.temp_path).await.map_err(|e| {
                PipelineError::io_error(format!("Failed to remove temporary file during rollback: {}", e))
            })?;
        }

        let completed_count = self.completed_chunks.lock().await.len();
        warn!(
            "Transaction rolled back: {} chunks were written before rollback",
            completed_count
        );

        Ok(())
    }

    pub async fn progress(&self) -> (u64, u64, u64) {
        let completed_count = self.completed_chunks.lock().await.len() as u64;
        let bytes_written = self.bytes_written.load(Ordering::Relaxed);
        (completed_count, self.expected_chunk_count, bytes_written)
    }

    pub async fn is_complete(&self) -> bool {
        let completed_count = self.completed_chunks.lock().await.len() as u64;
        completed_count == self.expected_chunk_count
    }

    pub fn total_chunks(&self) -> u64 {
        self.expected_chunk_count
    }

    pub fn progress_percentage(&self) -> f64 {
        let written = self.chunks_written.load(Ordering::Relaxed) as f64;
        let total = self.expected_chunk_count as f64;
        if total == 0.0 {
            100.0
        } else {
            (written / total) * 100.0
        }
    }

    pub fn is_transaction_active(&self) -> bool {
        self.temp_path.exists()
    }
}

#[async_trait]
impl BinaryFormatWriter for TransactionalBinaryWriter {
    fn write_chunk(&mut self, chunk: ChunkFormat) -> Result<(), PipelineError> {
        let sequence_number = self.chunks_written.load(Ordering::Relaxed);

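        // Same caveat as `StreamingBinaryWriter::write_chunk`: this blocks
        // the calling thread until the async write completes; prefer
        // `write_chunk_at_position` from async contexts.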
        futures::executor::block_on(async { self.write_chunk_at_position(chunk, sequence_number).await })
    }

    async fn write_chunk_at_position(&self, chunk: ChunkFormat, sequence_number: u64) -> Result<(), PipelineError> {
        chunk.validate()?;

        let (chunk_bytes, chunk_size) = chunk.to_bytes_with_size();

        let file_position = sequence_number * chunk_size;

        {
            let mut file_guard = self.temp_file.lock().await;

            file_guard
                .seek(SeekFrom::Start(file_position))
                .await
                .map_err(|e| PipelineError::io_error(format!("Failed to seek to position {}: {}", file_position, e)))?;

            file_guard.write_all(&chunk_bytes).await.map_err(|e| {
                PipelineError::io_error(format!("Failed to write chunk at position {}: {}", file_position, e))
            })?;
        }

        {
            let mut completed = self.completed_chunks.lock().await;
            completed.insert(sequence_number);
        }

        self.bytes_written.fetch_add(chunk_size, Ordering::Relaxed);
        let current_chunks = self.chunks_written.fetch_add(1, Ordering::Relaxed) + 1;

        let should_checkpoint = {
            let last_checkpoint = self.last_checkpoint.load(Ordering::Relaxed);
            current_chunks - last_checkpoint >= self.checkpoint_interval
        };

        if should_checkpoint {
            self.create_checkpoint().await?;
        }

        Ok(())
    }

    async fn finalize(&self, final_header: FileHeader) -> Result<u64, PipelineError> {
        let footer_bytes = final_header.to_footer_bytes()?;

        {
            let mut file_guard = self.temp_file.lock().await;

            // Positioned chunk writes can leave the cursor mid-file; seek to
            // the end so the footer is appended after the last chunk instead
            // of overwriting chunk data.
            file_guard
                .seek(SeekFrom::End(0))
                .await
                .map_err(|e| PipelineError::io_error(format!("Failed to seek to end of file: {}", e)))?;

            file_guard
                .write_all(&footer_bytes)
                .await
                .map_err(|e| PipelineError::io_error(format!("Failed to write footer: {}", e)))?;

            file_guard
                .flush()
                .await
                .map_err(|e| PipelineError::io_error(format!("Failed to flush footer: {}", e)))?;
        }

        // Match the other writers: report chunk bytes plus the footer.
        Ok(self.bytes_written.load(Ordering::Relaxed) + footer_bytes.len() as u64)
    }

    fn bytes_written(&self) -> u64 {
        self.bytes_written.load(Ordering::Relaxed)
    }

    fn chunks_written(&self) -> u32 {
        self.chunks_written.load(Ordering::Relaxed) as u32
    }
}

impl Drop for TransactionalBinaryWriter {
    fn drop(&mut self) {
        if self.temp_path.exists() {
            warn!(
                "TransactionalBinaryWriter dropped with uncommitted temporary file: {:?}",
                self.temp_path
            );
            warn!("Consider calling rollback() explicitly to clean up resources");
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use adaptive_pipeline_domain::value_objects::{ChunkFormat, FileHeader};
    use tempfile::TempDir;

    #[tokio::test]
    async fn test_binary_format_roundtrip() {
        let temp_dir = TempDir::new().unwrap();
        let test_file_path = temp_dir.path().join("test.adapipe");

        let header = FileHeader::new(
            "test_file.txt".to_string(),
            1024,
            "original_checksum_abc123".to_string(),
        )
        .add_compression_step("brotli", 6)
        .add_encryption_step("aes256gcm", "argon2", 32, 12)
        .with_chunk_info(1024, 2)
        .with_pipeline_id("test-pipeline".to_string());

        let chunk1 = ChunkFormat::new([1u8; 12], vec![0xde, 0xad, 0xbe, 0xef]);
        let chunk2 = ChunkFormat::new([2u8; 12], vec![0xca, 0xfe, 0xba, 0xbe]);

        let service = AdapipeFormat::new();
        let mut writer = service.create_writer(&test_file_path, header.clone()).await.unwrap();
        writer.write_chunk(chunk1.clone()).unwrap();
        writer.write_chunk(chunk2.clone()).unwrap();

        let final_header = header.clone();
        writer.finalize(final_header).await.unwrap();

        let mut reader = service.create_reader(&test_file_path).await.unwrap();

        let read_header = reader.read_header().unwrap();
        assert_eq!(read_header.original_filename, "test_file.txt");
        assert_eq!(read_header.chunk_count, 2);
        assert!(read_header.is_compressed());
        assert!(read_header.is_encrypted());

        let read_chunk1 = reader.read_next_chunk().await.unwrap();
        assert!(read_chunk1.is_some());
        let read_chunk1 = read_chunk1.unwrap();
        assert_eq!(read_chunk1.nonce, chunk1.nonce);
        assert_eq!(read_chunk1.payload, chunk1.payload);

        let read_chunk2 = reader.read_next_chunk().await.unwrap();
        assert!(read_chunk2.is_some());
        let read_chunk2 = read_chunk2.unwrap();
        assert_eq!(read_chunk2.nonce, chunk2.nonce);
        assert_eq!(read_chunk2.payload, chunk2.payload);

        let read_chunk3 = reader.read_next_chunk().await.unwrap();
        assert!(read_chunk3.is_none());

        let is_valid = reader.validate_integrity().await.unwrap();
        assert!(is_valid, "File integrity validation should pass");
    }

    #[tokio::test]
    async fn test_file_validation() {
        let temp_dir = TempDir::new().unwrap();
        let test_file_path = temp_dir.path().join("test_validation.adapipe");

        let header = FileHeader::new(
            "validation_test.txt".to_string(),
            2048,
            "original_checksum_xyz789".to_string(),
        )
        .add_compression_step("zstd", 3)
        .with_chunk_info(1024, 1)
        .with_pipeline_id("validation-pipeline".to_string());

        let chunk = ChunkFormat::new([5u8; 12], vec![0x12, 0x34, 0x56, 0x78]);

        let service = AdapipeFormat::new();
        let mut writer = service.create_writer(&test_file_path, header.clone()).await.unwrap();
        writer.write_chunk(chunk.clone()).unwrap();
        let final_header = header.clone();
        writer.finalize(final_header).await.unwrap();

        let validation_result = service.validate_file(&test_file_path).await.unwrap();
        assert!(validation_result.is_valid);
        assert_eq!(validation_result.chunk_count, 1);
        assert_eq!(validation_result.format_version, 1);
        assert!(validation_result.integrity_verified);
        assert!(validation_result.errors.is_empty());
    }

    #[tokio::test]
    async fn test_read_metadata() {
        let temp_dir = TempDir::new().unwrap();
        let test_file_path = temp_dir.path().join("test_metadata.adapipe");

        let header = FileHeader::new(
            "metadata_test.txt".to_string(),
            4096,
            "checksum_metadata_test".to_string(),
        )
        .add_encryption_step("chacha20poly1305", "pbkdf2", 32, 12)
        .with_chunk_info(2048, 2)
        .with_pipeline_id("metadata-pipeline".to_string())
        .with_metadata("custom_key".to_string(), "custom_value".to_string());

        let chunk1 = ChunkFormat::new([7u8; 12], vec![0xaa, 0xbb, 0xcc, 0xdd]);
        let chunk2 = ChunkFormat::new([8u8; 12], vec![0x11, 0x22, 0x33, 0x44]);

        let service = AdapipeFormat::new();
        let mut writer = service.create_writer(&test_file_path, header.clone()).await.unwrap();
        writer.write_chunk(chunk1).unwrap();
        writer.write_chunk(chunk2).unwrap();
        let final_header = header.clone();
        writer.finalize(final_header).await.unwrap();

        let metadata = service.read_metadata(&test_file_path).await.unwrap();
        assert_eq!(metadata.original_filename, "metadata_test.txt");
        assert_eq!(metadata.original_size, 4096);
        assert_eq!(metadata.chunk_count, 2);
        assert_eq!(metadata.pipeline_id, "metadata-pipeline");
        assert!(metadata.is_encrypted());
        assert!(!metadata.is_compressed());
        assert_eq!(metadata.encryption_algorithm(), Some("chacha20poly1305"));
        assert_eq!(metadata.metadata.get("custom_key"), Some(&"custom_value".to_string()));
    }

    #[tokio::test]
    async fn test_seek_to_chunk() {
        let temp_dir = TempDir::new().unwrap();
        let test_file_path = temp_dir.path().join("test_seek.adapipe");

        let header = FileHeader::new("seek_test.txt".to_string(), 3072, "checksum_seek_test".to_string())
            .with_chunk_info(1024, 3);

        let chunk1 = ChunkFormat::new([1u8; 12], vec![0x01, 0x02, 0x03, 0x04]);
        let chunk2 = ChunkFormat::new([2u8; 12], vec![0x05, 0x06, 0x07, 0x08]);
        let chunk3 = ChunkFormat::new([3u8; 12], vec![0x09, 0x0a, 0x0b, 0x0c]);

        let service = AdapipeFormat::new();
        let mut writer = service.create_writer(&test_file_path, header.clone()).await.unwrap();
        writer.write_chunk(chunk1.clone()).unwrap();
        writer.write_chunk(chunk2.clone()).unwrap();
        writer.write_chunk(chunk3.clone()).unwrap();
        let final_header = header.clone();
        writer.finalize(final_header).await.unwrap();

        let mut reader = service.create_reader(&test_file_path).await.unwrap();

        reader.seek_to_chunk(2).await.unwrap();
        let read_chunk = reader.read_next_chunk().await.unwrap().unwrap();
        assert_eq!(read_chunk.nonce, chunk3.nonce);
        assert_eq!(read_chunk.payload, chunk3.payload);

        reader.seek_to_chunk(0).await.unwrap();
        let read_chunk = reader.read_next_chunk().await.unwrap().unwrap();
        assert_eq!(read_chunk.nonce, chunk1.nonce);
        assert_eq!(read_chunk.payload, chunk1.payload);

        reader.seek_to_chunk(1).await.unwrap();
        let read_chunk = reader.read_next_chunk().await.unwrap().unwrap();
        assert_eq!(read_chunk.nonce, chunk2.nonce);
        assert_eq!(read_chunk.payload, chunk2.payload);
    }
}
1282}