adaptive_pipeline/infrastructure/services/
binary_format.rs

// /////////////////////////////////////////////////////////////////////////////
// Adaptive Pipeline
// Copyright (c) 2025 Michael Gardner, A Bit of Help, Inc.
// SPDX-License-Identifier: BSD-3-Clause
// See LICENSE file in the project root.
// /////////////////////////////////////////////////////////////////////////////

// Infrastructure service with future buffered writer features
#![allow(dead_code, unused_variables)]

//! # Binary Format Service Implementation
//!
//! Services for reading and writing the Adaptive Pipeline binary format
//! (.adapipe). Provides streaming I/O, integrity verification with SHA-256
//! checksums, metadata preservation, and format versioning. On-disk layout:
//! `[CHUNK_DATA][JSON_HEADER][HEADER_LENGTH][FORMAT_VERSION][MAGIC_BYTES]`.
//! See the mdBook for the detailed format specification.
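//!
//! A byte-level sketch of that layout (the 12-byte nonce and 4-byte length
//! framing comes from the reader below; footer field widths are defined by
//! `FileHeader`, not repeated here):
//!
//! ```text
//! offset 0
//! ┌─────────────────────────────┐
//! │ chunk 0: nonce (12 bytes)   │
//! │          length (4 bytes)   │
//! │          payload            │
//! ├─────────────────────────────┤
//! │ chunk 1 ...                 │
//! ├─────────────────────────────┤
//! │ JSON header (FileHeader)    │
//! ├─────────────────────────────┤
//! │ header length               │
//! │ format version              │
//! │ magic bytes                 │  ← end of file
//! └─────────────────────────────┘
//! ```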

use async_trait::async_trait;

use adaptive_pipeline_domain::value_objects::{ChunkFormat, FileHeader};
use adaptive_pipeline_domain::PipelineError;
use sha2::{Digest, Sha256};
use std::collections::HashSet;
use std::io::SeekFrom;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use tokio::fs::{self as fs};
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
use tokio::sync::Mutex;
use tracing::{debug, warn};

/// Service for writing and reading Adaptive Pipeline processed files (.adapipe
/// format)
///
/// This trait defines the interface for handling the .adapipe binary format,
/// which is specifically designed for files that have been processed through
/// the adaptive pipeline system with compression and/or encryption.
///
/// # Important Note
///
/// This service handles .adapipe format files (processed pipeline output),
/// NOT general binary files like .png, .exe, etc. The .adapipe format is
/// a custom format designed for pipeline-processed data with embedded metadata.
///
/// # Key Features
///
/// - **Streaming I/O**: Efficient processing without loading entire files
/// - **Metadata Preservation**: Maintains original file information
/// - **Integrity Verification**: Built-in checksums and validation
/// - **Version Management**: Handles format versioning and compatibility
///
/// # Examples
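///
/// A minimal sketch of the intended flow (paths are illustrative;
/// `AdapipeFormat` below implements this trait):
///
/// ```rust,ignore
/// let service = AdapipeFormat::new();
///
/// // Inspect metadata without restoring the file
/// let header = service.read_metadata(Path::new("data.adapipe")).await?;
/// println!("chunks: {}", header.chunk_count);
///
/// // Validate format and integrity
/// let result = service.validate_file(Path::new("data.adapipe")).await?;
/// assert!(result.integrity_verified);
/// ```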
#[async_trait]
pub trait BinaryFormatService: Send + Sync {
    /// Creates a new .adapipe format writer for streaming processed output
    async fn create_writer(
        &self,
        output_path: &Path,
        header: FileHeader,
    ) -> Result<Box<dyn BinaryFormatWriter>, PipelineError>;

    /// Creates a new .adapipe format reader for streaming processed input
    async fn create_reader(&self, input_path: &Path) -> Result<Box<dyn BinaryFormatReader>, PipelineError>;

    /// Validates an .adapipe processed file without full restoration
    async fn validate_file(&self, file_path: &Path) -> Result<ValidationResult, PipelineError>;

    /// Extracts metadata from an .adapipe processed file
    async fn read_metadata(&self, file_path: &Path) -> Result<FileHeader, PipelineError>;
}

/// Writer for streaming .adapipe processed files
#[async_trait]
pub trait BinaryFormatWriter: Send + Sync {
    /// Writes a processed chunk (compressed/encrypted data) to the .adapipe
    /// file
    fn write_chunk(&mut self, chunk: ChunkFormat) -> Result<(), PipelineError>;

    /// Writes a processed chunk at a specific position for concurrent
    /// processing
    ///
    /// Takes `&self` rather than `&mut self` for thread-safe concurrent access:
    /// multiple workers can call this simultaneously without a mutex.
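    ///
    /// A minimal concurrency sketch (assumes the writer is shared via `Arc`;
    /// `chunks` is an illustrative `Vec<ChunkFormat>`):
    ///
    /// ```rust,ignore
    /// let writer: Arc<dyn BinaryFormatWriter> = Arc::from(boxed_writer);
    /// for (seq, chunk) in chunks.into_iter().enumerate() {
    ///     let w = Arc::clone(&writer);
    ///     tokio::spawn(async move { w.write_chunk_at_position(chunk, seq as u64).await });
    /// }
    /// ```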
    async fn write_chunk_at_position(&self, chunk: ChunkFormat, sequence_number: u64) -> Result<(), PipelineError>;

    /// Finalizes the .adapipe file by writing the footer with complete metadata
    ///
    /// Takes `&self` rather than `self: Box<Self>` so the writer can be shared
    /// via `Arc`. An internal `AtomicBool` prevents double-finalization.
    async fn finalize(&self, final_header: FileHeader) -> Result<u64, PipelineError>;

    /// Gets the current number of bytes written
    fn bytes_written(&self) -> u64;

    /// Gets the current number of chunks written
    fn chunks_written(&self) -> u32;
}

/// Reader for streaming .adapipe processed files
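///
/// A minimal read loop (sketch; error handling elided):
///
/// ```rust,ignore
/// let mut reader = service.create_reader(Path::new("data.adapipe")).await?;
/// let header = reader.read_header()?;
/// while let Some(chunk) = reader.read_next_chunk().await? {
///     // chunk.nonce and chunk.payload feed the decryption/decompression stages
/// }
/// ```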
#[async_trait]
pub trait BinaryFormatReader: Send + Sync {
    /// Reads the .adapipe file header/metadata
    fn read_header(&self) -> Result<FileHeader, PipelineError>;

    /// Reads the next processed chunk (compressed/encrypted data) from the
    /// .adapipe file
    async fn read_next_chunk(&mut self) -> Result<Option<ChunkFormat>, PipelineError>;

    /// Seeks to a specific chunk by index
    async fn seek_to_chunk(&mut self, chunk_index: u32) -> Result<(), PipelineError>;

    /// Validates the file integrity
    async fn validate_integrity(&mut self) -> Result<bool, PipelineError>;
}

/// Result of file validation
#[derive(Debug, Clone)]
pub struct ValidationResult {
    pub is_valid: bool,
    pub format_version: u16,
    pub file_size: u64,
    pub chunk_count: u32,
    pub processing_summary: String,
    pub integrity_verified: bool,
    pub errors: Vec<String>,
}

/// Implementation of BinaryFormatService
pub struct AdapipeFormat;

impl AdapipeFormat {
    pub fn new() -> Self {
        Self
    }
}

#[async_trait]
impl BinaryFormatService for AdapipeFormat {
    async fn create_writer(
        &self,
        output_path: &Path,
        header: FileHeader,
    ) -> Result<Box<dyn BinaryFormatWriter>, PipelineError> {
        // Create a streaming writer that supports concurrent writes
        let writer = StreamingBinaryWriter::new(output_path, header).await?;
        Ok(Box::new(writer))
    }

    async fn create_reader(&self, input_path: &Path) -> Result<Box<dyn BinaryFormatReader>, PipelineError> {
        let reader = StreamingBinaryReader::new(input_path).await?;
        Ok(Box::new(reader))
    }

    async fn validate_file(&self, file_path: &Path) -> Result<ValidationResult, PipelineError> {
        let mut reader = self.create_reader(file_path).await?;
        let header = reader.read_header()?;
        let integrity_verified = reader.validate_integrity().await?;

        let file_metadata = fs::metadata(file_path)
            .await
            .map_err(|e| PipelineError::IoError(e.to_string()))?;

        Ok(ValidationResult {
            is_valid: true,
            format_version: header.format_version,
            file_size: file_metadata.len(),
            chunk_count: header.chunk_count,
            processing_summary: header.get_processing_summary(),
            integrity_verified,
            errors: Vec::new(),
        })
    }

    async fn read_metadata(&self, file_path: &Path) -> Result<FileHeader, PipelineError> {
        let reader = self.create_reader(file_path).await?;
        reader.read_header()
    }
}

/// Buffered writer that stores chunks in memory and writes them all during
/// `finalize`. This is simpler than `StreamingBinaryWriter` and suitable for
/// tests and small files.
pub struct BufferedBinaryWriter {
    output_path: PathBuf,
    header: FileHeader,
    chunks: Vec<ChunkFormat>,
}

impl BufferedBinaryWriter {
    fn new(output_path: PathBuf, header: FileHeader) -> Self {
        Self {
            output_path,
            header,
            chunks: Vec::new(),
        }
    }
}

#[async_trait]
impl BinaryFormatWriter for BufferedBinaryWriter {
    fn write_chunk(&mut self, chunk: ChunkFormat) -> Result<(), PipelineError> {
        // Just buffer the chunk in memory
        self.chunks.push(chunk);
        Ok(())
    }

    async fn write_chunk_at_position(&self, _chunk: ChunkFormat, _sequence_number: u64) -> Result<(), PipelineError> {
        // Position-based writes would require interior mutability (Mutex<Vec>),
        // and this writer is only exercised via write_chunk() in tests, so we
        // return an error instead.
        Err(PipelineError::processing_failed(
            "BufferedBinaryWriter doesn't support concurrent writes - use StreamingBinaryWriter",
        ))
    }

    async fn finalize(&self, mut final_header: FileHeader) -> Result<u64, PipelineError> {
        // BufferedBinaryWriter is only for tests, not production
        // In production, use StreamingBinaryWriter with concurrent writes
        // This implementation writes all buffered chunks to file

        // Create the output file
        let mut file = tokio::fs::OpenOptions::new()
            .create(true)
            .write(true)
            .truncate(true)
            .open(&self.output_path)
            .await
            .map_err(|e| PipelineError::IoError(e.to_string()))?;

        // Write all buffered chunks
        let mut total_bytes = 0u64;
        let mut hasher = Sha256::new();

        for chunk in &self.chunks {
            let (chunk_bytes, chunk_size) = chunk.to_bytes_with_size();
            file.write_all(&chunk_bytes)
                .await
                .map_err(|e| PipelineError::IoError(e.to_string()))?;
            hasher.update(&chunk_bytes);
            total_bytes += chunk_size;
        }

        // Update final header with actual values
        final_header.chunk_count = self.chunks.len() as u32;
        final_header.processed_at = chrono::Utc::now();
        final_header.output_checksum = format!("{:x}", hasher.finalize());

        // Write footer
        let footer_bytes = final_header.to_footer_bytes()?;
        file.write_all(&footer_bytes)
            .await
            .map_err(|e| PipelineError::IoError(e.to_string()))?;

        file.flush().await.map_err(|e| PipelineError::IoError(e.to_string()))?;

        Ok(total_bytes + (footer_bytes.len() as u64))
    }

    fn bytes_written(&self) -> u64 {
        // 16 bytes of per-chunk framing: 12-byte nonce + 4-byte length
        self.chunks.iter().map(|c| (c.payload.len() as u64) + 16).sum()
    }

    fn chunks_written(&self) -> u32 {
        self.chunks.len() as u32
    }
}

/// Streaming writer implementation
///
/// ## Thread-Safe Concurrent Random-Access Writes
///
/// This writer supports **concurrent writes** from multiple worker tasks by
/// using:
/// 1. `Arc<std::fs::File>` - Shared file handle (no mutex needed!)
/// 2. Platform-specific atomic write operations (pwrite/seek_write)
/// 3. `&self` methods instead of `&mut self` (thread-safe)
///
/// **Educational: Why no mutex?**
/// - Each write goes to a DIFFERENT file position
/// - Platform syscalls (pwrite/seek_write) are atomic
/// - OS kernel handles concurrency safely
/// - Only shared state is atomic counters (lock-free)
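///
/// A minimal sketch of the underlying primitive (Unix shown; `write_all_at`
/// is the std positional-write API this writer uses internally):
///
/// ```rust,ignore
/// use std::os::unix::fs::FileExt;
///
/// let file = Arc::new(std::fs::File::create("out.bin")?);
/// let f = Arc::clone(&file);
/// // Two writers, two distinct positions, no mutex:
/// std::thread::spawn(move || f.write_all_at(b"chunk-1", 1024).unwrap());
/// file.write_all_at(b"chunk-0", 0)?;
/// ```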
#[allow(dead_code)]
pub struct StreamingBinaryWriter {
    /// Shared file handle for concurrent access
    /// Educational: Arc allows sharing, std::fs::File supports position-based
    /// writes
    file: Arc<std::fs::File>,

    /// Atomic counters for thread-safe statistics
    bytes_written: Arc<AtomicU64>,
    chunks_written: Arc<AtomicU64>,

    initial_header: FileHeader,

    /// Incremental checksum calculation (mutex needed - shared mutable state)
    output_hasher: Arc<Mutex<Sha256>>,

    // Flushing strategy fields
    flush_interval: u64,
    buffer_size_threshold: u64,
    bytes_since_flush: Arc<AtomicU64>,

    /// Track finalization state to prevent double-finalization
    /// Educational: AtomicBool enables thread-safe state checking without mutex
    finalized: Arc<AtomicBool>,
}

impl StreamingBinaryWriter {
    async fn new(output_path: &Path, header: FileHeader) -> Result<Self, PipelineError> {
        // Create sync file handle (std::fs::File, not tokio::fs::File)
        // Educational: We need sync file for platform-specific write_at() operations
        let file = std::fs::OpenOptions::new()
            .create(true)
            .write(true)
            .read(true) // Needed for some platform operations
            .truncate(true)
            .open(output_path)
            .map_err(|e| PipelineError::IoError(e.to_string()))?;

        Ok(Self {
            file: Arc::new(file),
            bytes_written: Arc::new(AtomicU64::new(0)),
            chunks_written: Arc::new(AtomicU64::new(0)),
            initial_header: header,
            output_hasher: Arc::new(Mutex::new(Sha256::new())),
            flush_interval: 1024 * 1024,
            buffer_size_threshold: 10 * 1024 * 1024,
            bytes_since_flush: Arc::new(AtomicU64::new(0)),
            finalized: Arc::new(AtomicBool::new(false)),
        })
    }
}

#[async_trait]
impl BinaryFormatWriter for StreamingBinaryWriter {
    fn write_chunk(&mut self, chunk: ChunkFormat) -> Result<(), PipelineError> {
        // Sequential write using current chunk count as sequence number
        // This allows StreamingBinaryWriter to work for both sequential (tests) and
        // concurrent (production) writes
        let sequence_number = self.chunks_written.load(Ordering::Relaxed);

        // Use async write_chunk_at_position internally
        // We use futures::executor::block_on instead of tokio's block_on
        // because it works both inside and outside of a tokio runtime
        futures::executor::block_on(async { self.write_chunk_at_position(chunk, sequence_number).await })
    }
    /// Writes a processed chunk at a specific position for concurrent
    /// processing
    ///
    /// This method implements **random access writing**, which is the key to
    /// achieving true concurrent chunk processing. Instead of writing
    /// chunks sequentially, each chunk is written directly to its
    /// calculated position in the file.
    ///
    /// # How Random Access Writing Works
    ///
    /// ## The Problem with Sequential Writing:
    /// ```text
    /// Traditional approach:
    /// Thread 1: Process chunk 0 → Wait for write slot → Write chunk 0
    /// Thread 2: Process chunk 1 → Wait for chunk 0 to finish → Write chunk 1
    /// Thread 3: Process chunk 2 → Wait for chunk 1 to finish → Write chunk 2
    ///
    /// Result: Processing is concurrent, but writing is still sequential!
    /// ```
    ///
    /// ## The Solution - Random Access Writing:
    /// ```text
    /// Our approach:
    /// Thread 1: Process chunk 0 → Write to position 0 (immediately)
    /// Thread 2: Process chunk 1 → Write to position 1024 (immediately)
    /// Thread 3: Process chunk 2 → Write to position 2048 (immediately)
    ///
    /// Result: Both processing AND writing are truly concurrent!
    /// ```
    ///
    /// ## Position Calculation:
    /// Each chunk's file position is calculated as:
    /// `file_position = sequence_number * chunk_size`
    ///
    /// This ensures chunks are written to the correct location in the final
    /// file, regardless of the order in which they complete processing.
    ///
    /// # Concurrency
    ///
    /// This method takes `&self` rather than `&mut self`, so it is thread-safe
    /// and can be called concurrently from multiple workers.
    ///
    /// **Old approach (BROKEN):**
    /// ```text
    /// Worker 1: Lock → Seek to pos 0 → [INTERRUPT] → Write at wrong position!
    /// Worker 2: Lock → Seek to pos 1024 → Write → Unlock
    /// ```
    ///
    /// **New approach (CORRECT):**
    /// ```text
    /// Worker 1: write_at(data, pos=0)     ← Atomic syscall!
    /// Worker 2: write_at(data, pos=1024)  ← Concurrent!
    /// Worker 3: write_at(data, pos=2048)  ← No interference!
    /// ```
    ///
    /// Platform-specific operations:
    /// - Unix/Linux/macOS: `pwrite()` via `FileExt::write_all_at()`
    /// - Windows: `WriteFile()` with `OVERLAPPED` via `FileExt::seek_write()`
    ///
    /// Both are **single atomic syscalls** that write to a specific position
    /// without moving the file pointer or requiring a mutex.
    ///
    /// # Arguments
    /// * `chunk` - The processed chunk data to write
    /// * `sequence_number` - The chunk's position in the original file (0, 1,
    ///   2, ...)
    ///
    /// # Returns
    /// * `Ok(())` if the chunk was written successfully
    /// * `Err(PipelineError)` if there was an I/O error or validation failure
    async fn write_chunk_at_position(&self, chunk: ChunkFormat, sequence_number: u64) -> Result<(), PipelineError> {
        // STEP 1: Validate chunk format
        chunk.validate()?;

        // STEP 2: Convert chunk to bytes
        let (chunk_bytes, chunk_size) = chunk.to_bytes_with_size();

        // STEP 3: Calculate file position
        // Educational: Each chunk has a pre-calculated position based on sequence
        // number
        let file_position = sequence_number * chunk_size;
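        // Note (assumption): this math relies on every chunk serializing to
        // the same size, so only the final chunk may be shorter. Variable-size
        // chunks would need an explicit offset table instead.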

        // STEP 4: Concurrent random-access write using a platform-specific
        // atomic operation.
        // Educational: This is a SINGLE atomic syscall - no seek needed, no
        // mutex needed!
        //
        // We use spawn_blocking because:
        // 1. std::fs::File operations are synchronous (blocking)
        // 2. We don't want to block the tokio runtime thread
        // 3. Tokio's blocking thread pool handles this efficiently
        let file_clone = self.file.clone();
        let chunk_bytes_clone = chunk_bytes.clone();

        tokio::task::spawn_blocking(move || {
            // Platform-specific position-based write
            #[cfg(unix)]
            {
                use std::os::unix::fs::FileExt;
                // Atomic pwrite() syscall - writes at position without seeking
                file_clone.write_all_at(&chunk_bytes_clone, file_position).map_err(|e| {
                    PipelineError::IoError(format!("Failed to write chunk at position {}: {}", file_position, e))
                })
            }

            #[cfg(windows)]
            {
                use std::os::windows::fs::FileExt;
                // Atomic WriteFile() with OVERLAPPED - writes at position
                file_clone
                    .seek_write(&chunk_bytes_clone, file_position)
                    .map(|_| ())
                    .map_err(|e| {
                        PipelineError::IoError(format!("Failed to write chunk at position {}: {}", file_position, e))
                    })
            }

            #[cfg(not(any(unix, windows)))]
            {
                compile_error!("Platform not supported for position-based writes")
            }
        })
        .await
        .map_err(|e| PipelineError::IoError(format!("Task join error: {}", e)))??;

        // STEP 5: Update incremental checksum (mutex needed - shared mutable state)
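        // Caveat: the hasher sees chunks in *completion* order. Sequential
        // callers (write_chunk) hash in file order; truly out-of-order
        // concurrent writers would need per-chunk hashing or a final re-read
        // for the checksum to match validate_integrity().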
        {
            let mut hasher = self.output_hasher.lock().await;
            hasher.update(&chunk_bytes);
        }

        // STEP 6: Update atomic statistics (lock-free!)
        self.bytes_written.fetch_add(chunk_size, Ordering::Relaxed);
        self.chunks_written.fetch_add(1, Ordering::Relaxed);
        self.bytes_since_flush.fetch_add(chunk_size, Ordering::Relaxed);

        Ok(())
    }

    async fn finalize(&self, mut final_header: FileHeader) -> Result<u64, PipelineError> {
        // Check if already finalized (prevents double-finalization)
        // Educational: swap() atomically sets to true and returns old value
        if self.finalized.swap(true, Ordering::SeqCst) {
            return Err(PipelineError::internal_error("Writer already finalized"));
        }

        // Update header with final statistics
        final_header.chunk_count = self.chunks_written.load(Ordering::Relaxed) as u32;
        final_header.processed_at = chrono::Utc::now();

        // Finalize incremental checksum calculation
        let output_checksum = {
            let mut hasher = self.output_hasher.lock().await;
            let result = hasher.finalize_reset();
            format!("{:x}", result)
        };
        final_header.output_checksum = output_checksum;

        // Write footer with calculated checksum
        let footer_bytes = final_header.to_footer_bytes()?;
        let footer_size = footer_bytes.len() as u64;

        // Use spawn_blocking for sync file operations
        let file = self.file.clone();
        tokio::task::spawn_blocking(move || {
            // Borrow the shared file handle for writing
            let file_ref = &*file;

            // Get current file size for append position
            let current_pos = file_ref.metadata().map(|m| m.len()).unwrap_or(0);

            // Write footer using platform-specific positional write
            #[cfg(unix)]
            {
                use std::os::unix::fs::FileExt;
                file_ref
                    .write_all_at(&footer_bytes, current_pos)
                    .map_err(|e| PipelineError::IoError(e.to_string()))?;
            }

            #[cfg(windows)]
            {
                use std::io::{Seek, SeekFrom, Write};
                // Note: On Windows, seek+write is not atomic, but sufficient for single-writer
                // scenario
                let mut file_mut = file_ref;
                file_mut
                    .seek(SeekFrom::Start(current_pos))
                    .map_err(|e| PipelineError::IoError(e.to_string()))?;
                file_mut
                    .write_all(&footer_bytes)
                    .map_err(|e| PipelineError::IoError(e.to_string()))?;
            }

            // Sync to disk for durability
            file_ref.sync_all().map_err(|e| PipelineError::IoError(e.to_string()))
        })
        .await
        .map_err(|e| PipelineError::IoError(format!("Task join error: {}", e)))??;

        let total_bytes = self.bytes_written.load(Ordering::Relaxed) + footer_size;

        Ok(total_bytes)
    }

    fn bytes_written(&self) -> u64 {
        self.bytes_written.load(Ordering::Relaxed)
    }

    fn chunks_written(&self) -> u32 {
        self.chunks_written.load(Ordering::Relaxed) as u32
    }
}

/// Streaming reader implementation
#[allow(dead_code)]
pub struct StreamingBinaryReader {
    file: tokio::fs::File,
    file_size: u64,
    header: Option<FileHeader>,
    current_chunk_index: u32,
    chunks_start_offset: u64,
}

impl StreamingBinaryReader {
    async fn new(input_path: &Path) -> Result<Self, PipelineError> {
        let mut file = tokio::fs::File::open(input_path)
            .await
            .map_err(|e| PipelineError::IoError(e.to_string()))?;

        let metadata = std::fs::metadata(input_path).map_err(|e| PipelineError::IoError(e.to_string()))?;
        let file_size = metadata.len();

        // Read the header from the file footer
        let mut file_data = Vec::new();
        file.read_to_end(&mut file_data)
            .await
            .map_err(|e| PipelineError::IoError(e.to_string()))?;

        let (header, _footer_size) = FileHeader::from_footer_bytes(&file_data)?;

        // Calculate where chunk data starts (beginning of file)
        let chunks_start_offset = 0;

        // Reopen file and seek to start of chunks
        let mut file = tokio::fs::File::open(input_path)
            .await
            .map_err(|e| PipelineError::IoError(e.to_string()))?;
        file.seek(SeekFrom::Start(chunks_start_offset))
            .await
            .map_err(|e| PipelineError::IoError(e.to_string()))?;

        Ok(Self {
            file,
            file_size,
            header: Some(header),
            current_chunk_index: 0,
            chunks_start_offset,
        })
    }
}

#[async_trait]
impl BinaryFormatReader for StreamingBinaryReader {
    fn read_header(&self) -> Result<FileHeader, PipelineError> {
        // Return the header that was parsed during initialization
        self.header
            .clone()
            .ok_or_else(|| PipelineError::ValidationError("Header not loaded".to_string()))
    }

    async fn read_next_chunk(&mut self) -> Result<Option<ChunkFormat>, PipelineError> {
        // Check if we've read all chunks
        let header = self
            .header
            .as_ref()
            .ok_or_else(|| PipelineError::ValidationError("Header not loaded".to_string()))?;

        if self.current_chunk_index >= header.chunk_count {
            return Ok(None); // EOF - all chunks read
        }

        // Read chunk header first (12 bytes nonce + 4 bytes length)
        let mut chunk_header = vec![0u8; 16];
        match self.file.read_exact(&mut chunk_header).await {
            Ok(_) => {}
            Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
                // Reached end of chunk data (before footer)
                return Ok(None);
            }
            Err(e) => {
                return Err(PipelineError::IoError(format!("Failed to read chunk header: {}", e)));
            }
        }

        // Parse nonce and data length
        let mut nonce = [0u8; 12];
        nonce.copy_from_slice(&chunk_header[0..12]);
        let data_length =
            u32::from_le_bytes([chunk_header[12], chunk_header[13], chunk_header[14], chunk_header[15]]) as usize;

        // Read encrypted data
        let mut encrypted_data = vec![0u8; data_length];
        self.file
            .read_exact(&mut encrypted_data)
            .await
            .map_err(|e| PipelineError::IoError(format!("Failed to read chunk data: {}", e)))?;

        // Create chunk format
        let chunk = ChunkFormat::new(nonce, encrypted_data);

        // Increment chunk index
        self.current_chunk_index += 1;

        Ok(Some(chunk))
    }

    async fn seek_to_chunk(&mut self, chunk_index: u32) -> Result<(), PipelineError> {
        // For now, we'll implement a simple approach
        // TODO: In production, we could maintain a chunk index for faster seeking

        if chunk_index == 0 {
            self.file
                .seek(SeekFrom::Start(self.chunks_start_offset))
                .await
                .map_err(|e| PipelineError::IoError(e.to_string()))?;
            self.current_chunk_index = 0;
            return Ok(());
        }

        // Reset to beginning and skip chunks
        self.file
            .seek(SeekFrom::Start(self.chunks_start_offset))
            .await
            .map_err(|e| PipelineError::IoError(e.to_string()))?;
        self.current_chunk_index = 0;

        // Skip chunks until we reach the desired index
        for _ in 0..chunk_index {
            if self.read_next_chunk().await?.is_none() {
                return Err(PipelineError::ValidationError("Chunk index out of bounds".to_string()));
            }
        }

        Ok(())
    }

    async fn validate_integrity(&mut self) -> Result<bool, PipelineError> {
        // Ensure we have header
        let header = self
            .header
            .as_ref()
            .ok_or_else(|| PipelineError::ValidationError("Header not loaded".to_string()))?;

        // We need to calculate checksum of only the chunk data (not the footer)
        // The footer contains:
        // [JSON_HEADER][HEADER_LENGTH][FORMAT_VERSION][MAGIC_BYTES]

        // First, get the footer size from the header
        let footer_bytes = header.to_footer_bytes()?;
        let footer_size = footer_bytes.len() as u64;

        // Calculate the size of chunk data (total file size - footer size)
        let chunk_data_size = self.file_size - footer_size;

        // Seek to beginning of file
        self.file
            .seek(SeekFrom::Start(0))
            .await
            .map_err(|e| PipelineError::IoError(e.to_string()))?;

        // Read only the chunk data (not the footer)
        let mut chunk_data = vec![0u8; chunk_data_size as usize];
        self.file
            .read_exact(&mut chunk_data)
            .await
            .map_err(|e| PipelineError::IoError(e.to_string()))?;

        // Calculate SHA-256 checksum of chunk data (Digest is imported at module level)
        let mut hasher = Sha256::new();
        hasher.update(&chunk_data);
        let calculated_checksum = format!("{:x}", hasher.finalize());

        // Compare with stored checksum
        let is_valid = calculated_checksum == header.output_checksum;

        // Reset file position to continue reading chunks if needed
        self.file
            .seek(SeekFrom::Start(self.chunks_start_offset))
            .await
            .map_err(|e| PipelineError::IoError(e.to_string()))?;
        self.current_chunk_index = 0;

        Ok(is_valid)
    }
}

impl Default for AdapipeFormat {
    fn default() -> Self {
        Self::new()
    }
}

// ============================================================================
// Transactional Binary Writer
// ============================================================================

/// Transactional binary writer providing ACID guarantees for concurrent chunk
/// operations.
///
/// The `TransactionalBinaryWriter` manages the complex process of writing
/// multiple data chunks to a file while maintaining transactional integrity. It
/// supports high-concurrency scenarios where multiple chunks can be written
/// simultaneously from different threads or async tasks.
///
/// ## ACID Guarantees
///
/// ### Atomicity
/// Either all chunks are successfully written and committed, or no changes
/// are made to the final output file. Partial writes are isolated in temporary
/// files until the complete transaction is ready for commit.
///
/// ### Consistency
/// The file system remains in a consistent state throughout the operation.
/// Temporary files are used to prevent corruption of the final output.
///
/// ### Isolation
/// Concurrent chunk writes do not interfere with each other. Each chunk
/// is written to its designated position without affecting other chunks.
///
/// ### Durability
/// Once committed, the written data survives system crashes and power failures.
/// Data is properly flushed to disk before the transaction is considered
/// complete.
///
/// ## Core Capabilities
///
/// - **Transactional Semantics**: All-or-nothing commit behavior
/// - **Concurrent Writing**: Multiple chunks written simultaneously
/// - **Progress Tracking**: Real-time monitoring of write completion
/// - **Crash Recovery**: Checkpoint-based recovery mechanisms
/// - **Resource Management**: Automatic cleanup of temporary resources
///
/// ## Thread Safety
///
/// All operations are thread-safe and can be called concurrently:
///
/// - File access is protected by `Arc<Mutex<File>>`
/// - Progress counters use atomic operations
/// - Chunk tracking uses a mutex-protected HashSet
/// - No data races or undefined behavior in concurrent scenarios
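///
/// ## Usage Sketch
///
/// A minimal commit flow (illustrative; error handling elided):
///
/// ```rust,ignore
/// let writer = TransactionalBinaryWriter::new(path, expected_chunks).await?;
/// writer.write_chunk_at_position(chunk, 0).await?;
/// // ... remaining chunks, possibly from concurrent tasks ...
/// writer.finalize(header).await?;
/// writer.commit().await?; // or writer.rollback().await? on failure
/// ```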
pub struct TransactionalBinaryWriter {
    /// Temporary file handle for writing chunks
    temp_file: Arc<Mutex<tokio::fs::File>>,

    /// Path to temporary file (will be renamed to final_path on commit)
    temp_path: PathBuf,

    /// Final output path where file will be moved on commit
    final_path: PathBuf,

    /// Set of completed chunk sequence numbers for tracking progress
    completed_chunks: Arc<Mutex<HashSet<u64>>>,

    /// Total number of chunks expected to be written
    expected_chunk_count: u64,

    /// Total bytes written (atomic counter for lock-free updates)
    bytes_written: Arc<AtomicU64>,

    /// Total chunks written (atomic counter for lock-free updates)
    chunks_written: Arc<AtomicU64>,

    /// Checkpoint interval - create checkpoint every N chunks
    checkpoint_interval: u64,

    /// Last checkpoint chunk count (atomic for lock-free access)
    last_checkpoint: Arc<AtomicU64>,
}

impl TransactionalBinaryWriter {
    /// Creates a new transactional binary writer.
    ///
    /// # Arguments
    /// * `output_path` - Final path where the file will be written
    /// * `expected_chunk_count` - Total number of chunks expected
    ///
    /// # Returns
    /// * `Result<Self, PipelineError>` - New writer or error
    pub async fn new(output_path: PathBuf, expected_chunk_count: u64) -> Result<Self, PipelineError> {
        // Create temporary file path with .adapipe.tmp extension
        let temp_path = output_path.with_extension("adapipe.tmp");

        // Create temporary file for writing
        let temp_file = tokio::fs::File::create(&temp_path)
            .await
            .map_err(|e| PipelineError::io_error(format!("Failed to create temporary file: {}", e)))?;

        Ok(Self {
            temp_file: Arc::new(Mutex::new(temp_file)),
            temp_path,
            final_path: output_path,
            completed_chunks: Arc::new(Mutex::new(HashSet::new())),
            expected_chunk_count,
            bytes_written: Arc::new(AtomicU64::new(0)),
            chunks_written: Arc::new(AtomicU64::new(0)),
            checkpoint_interval: 10, // Create checkpoint every 10 chunks
            last_checkpoint: Arc::new(AtomicU64::new(0)),
        })
    }

    /// Creates a checkpoint for crash recovery.
    ///
    /// Checkpoints allow the system to resume processing from a known good
    /// state if the process crashes during chunk writing.
    async fn create_checkpoint(&self) -> Result<(), PipelineError> {
        // Flush data to disk to ensure durability
        {
            let file_guard = self.temp_file.lock().await;
            file_guard
                .sync_data()
                .await
                .map_err(|e| PipelineError::io_error(format!("Failed to sync data for checkpoint: {}", e)))?;
        }

        // Update last checkpoint counter
        let current_chunks = self.chunks_written.load(Ordering::Relaxed);
        self.last_checkpoint.store(current_chunks, Ordering::Relaxed);

        debug!(
            "Created checkpoint: {} chunks completed out of {} expected",
            current_chunks, self.expected_chunk_count
        );

        Ok(())
    }

    /// Commits all written chunks atomically.
    ///
    /// This method validates that all expected chunks have been written,
    /// flushes data to disk, and atomically moves the temporary file to
    /// the final output location.
    ///
    /// # Returns
    /// * `Result<(), PipelineError>` - Success or error
    pub async fn commit(self) -> Result<(), PipelineError> {
        // Validate that all expected chunks have been written
        let completed_count = self.completed_chunks.lock().await.len() as u64;
        if completed_count != self.expected_chunk_count {
            return Err(PipelineError::ValidationError(format!(
                "Incomplete transaction: {} chunks written, {} expected",
                completed_count, self.expected_chunk_count
            )));
        }

        // Flush all data to disk before commit
        {
            let file_guard = self.temp_file.lock().await;
            file_guard
                .sync_all()
                .await
                .map_err(|e| PipelineError::io_error(format!("Failed to sync file before commit: {}", e)))?;
        }

        // Atomically move temporary file to final location
        tokio::fs::rename(&self.temp_path, &self.final_path)
            .await
            .map_err(|e| PipelineError::io_error(format!("Failed to commit transaction (rename): {}", e)))?;

        let bytes_written = self.bytes_written.load(Ordering::Relaxed);
        debug!(
            "Transaction committed successfully: {} chunks, {} bytes written to {:?}",
            completed_count, bytes_written, self.final_path
        );

        Ok(())
    }

    /// Rolls back the transaction and cleans up temporary files.
    ///
    /// # Returns
    /// * `Result<(), PipelineError>` - Success or error
    pub async fn rollback(self) -> Result<(), PipelineError> {
        // Remove temporary file if it exists
        if self.temp_path.exists() {
            tokio::fs::remove_file(&self.temp_path).await.map_err(|e| {
                PipelineError::io_error(format!("Failed to remove temporary file during rollback: {}", e))
            })?;
        }

        let completed_count = self.completed_chunks.lock().await.len();
        warn!(
            "Transaction rolled back: {} chunks were written before rollback",
            completed_count
        );

        Ok(())
    }

    /// Returns the current progress of the transaction.
    ///
    /// # Returns
    /// * `(completed_chunks, total_expected, bytes_written)` - Progress
    ///   information
    pub async fn progress(&self) -> (u64, u64, u64) {
        let completed_count = self.completed_chunks.lock().await.len() as u64;
        let bytes_written = self.bytes_written.load(Ordering::Relaxed);
        (completed_count, self.expected_chunk_count, bytes_written)
    }

    /// Checks if the transaction is complete (all chunks written).
    pub async fn is_complete(&self) -> bool {
        let completed_count = self.completed_chunks.lock().await.len() as u64;
        completed_count == self.expected_chunk_count
    }

    /// Returns the total number of chunks expected.
    pub fn total_chunks(&self) -> u64 {
        self.expected_chunk_count
    }

    /// Returns the progress as a percentage.
    pub fn progress_percentage(&self) -> f64 {
        let written = self.chunks_written.load(Ordering::Relaxed) as f64;
        let total = self.expected_chunk_count as f64;
        if total == 0.0 {
            100.0
        } else {
            (written / total) * 100.0
        }
    }

    /// Checks if a transaction is currently active.
    pub fn is_transaction_active(&self) -> bool {
        self.temp_path.exists()
    }
}

/// Implement BinaryFormatWriter trait for TransactionalBinaryWriter
#[async_trait]
impl BinaryFormatWriter for TransactionalBinaryWriter {
    fn write_chunk(&mut self, chunk: ChunkFormat) -> Result<(), PipelineError> {
        // Sequential write using current chunk count as sequence number
        let sequence_number = self.chunks_written.load(Ordering::Relaxed);

        // Block on async write_chunk_at_position
        futures::executor::block_on(async { self.write_chunk_at_position(chunk, sequence_number).await })
    }

    async fn write_chunk_at_position(&self, chunk: ChunkFormat, sequence_number: u64) -> Result<(), PipelineError> {
        // Validate chunk before writing
        chunk.validate()?;

        // Convert chunk to bytes for writing
        let (chunk_bytes, chunk_size) = chunk.to_bytes_with_size();

        // Calculate file position based on sequence number and chunk size
        let file_position = sequence_number * chunk_size;

        // Lock the file for thread-safe seeking and writing
        {
            let mut file_guard = self.temp_file.lock().await;

            // Seek to the calculated position
            file_guard
                .seek(SeekFrom::Start(file_position))
                .await
                .map_err(|e| PipelineError::io_error(format!("Failed to seek to position {}: {}", file_position, e)))?;

            // Write the chunk bytes
            file_guard.write_all(&chunk_bytes).await.map_err(|e| {
                PipelineError::io_error(format!("Failed to write chunk at position {}: {}", file_position, e))
            })?;
        }

        // Update tracking information
        {
            let mut completed = self.completed_chunks.lock().await;
            completed.insert(sequence_number);
        }

        // Update progress counters using atomic operations
        self.bytes_written.fetch_add(chunk_size, Ordering::Relaxed);
        let current_chunks = self.chunks_written.fetch_add(1, Ordering::Relaxed) + 1;

        // Check if we should create a checkpoint
        let should_checkpoint = {
            let last_checkpoint = self.last_checkpoint.load(Ordering::Relaxed);
            current_chunks - last_checkpoint >= self.checkpoint_interval
        };

        if should_checkpoint {
            self.create_checkpoint().await?;
        }

        Ok(())
    }

    async fn finalize(&self, final_header: FileHeader) -> Result<u64, PipelineError> {
        // Write footer with metadata
        let footer_bytes = final_header.to_footer_bytes()?;

        {
            let mut file_guard = self.temp_file.lock().await;

            // Positional chunk writes leave the cursor wherever the last write
            // finished, so move to the end of the file before appending the footer.
            file_guard
                .seek(SeekFrom::End(0))
                .await
                .map_err(|e| PipelineError::io_error(format!("Failed to seek to end for footer: {}", e)))?;

            file_guard
                .write_all(&footer_bytes)
                .await
                .map_err(|e| PipelineError::io_error(format!("Failed to write footer: {}", e)))?;

            file_guard
                .flush()
                .await
                .map_err(|e| PipelineError::io_error(format!("Failed to flush footer: {}", e)))?;
        }

        Ok(self.bytes_written.load(Ordering::Relaxed))
    }

    fn bytes_written(&self) -> u64 {
        self.bytes_written.load(Ordering::Relaxed)
    }

    fn chunks_written(&self) -> u32 {
        self.chunks_written.load(Ordering::Relaxed) as u32
    }
}

/// Implement Drop to ensure cleanup on panic or early termination
impl Drop for TransactionalBinaryWriter {
    fn drop(&mut self) {
        if self.temp_path.exists() {
            warn!(
                "TransactionalBinaryWriter dropped with uncommitted temporary file: {:?}",
                self.temp_path
            );
            warn!("Consider calling rollback() explicitly to clean up resources");
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use adaptive_pipeline_domain::value_objects::{ChunkFormat, FileHeader};
    use tempfile::TempDir;

    #[tokio::test]
    async fn test_binary_format_roundtrip() {
        // Create a temporary file for testing
        let temp_dir = TempDir::new().unwrap();
        let test_file_path = temp_dir.path().join("test.adapipe");

        // Create test header
        let header = FileHeader::new(
            "test_file.txt".to_string(),
            1024,
            "original_checksum_abc123".to_string(),
        )
        .add_compression_step("brotli", 6)
        .add_encryption_step("aes256gcm", "argon2", 32, 12)
        .with_chunk_info(1024, 2)
        .with_pipeline_id("test-pipeline".to_string());

        // Create test chunks
        let chunk1 = ChunkFormat::new([1u8; 12], vec![0xde, 0xad, 0xbe, 0xef]);
        let chunk2 = ChunkFormat::new([2u8; 12], vec![0xca, 0xfe, 0xba, 0xbe]);

        // Write file using StreamingBinaryWriter
        let service = AdapipeFormat::new();
        let mut writer = service.create_writer(&test_file_path, header.clone()).await.unwrap();
        writer.write_chunk(chunk1.clone()).unwrap();
        writer.write_chunk(chunk2.clone()).unwrap();

        // Finalize with updated header
        let final_header = header.clone();
        writer.finalize(final_header).await.unwrap();

        // Read the file back
        let mut reader = service.create_reader(&test_file_path).await.unwrap();

        // Test read_header
        let read_header = reader.read_header().unwrap();
        assert_eq!(read_header.original_filename, "test_file.txt");
        assert_eq!(read_header.chunk_count, 2);
        assert!(read_header.is_compressed());
        assert!(read_header.is_encrypted());

        // Test read_next_chunk
        let read_chunk1 = reader.read_next_chunk().await.unwrap();
        assert!(read_chunk1.is_some());
        let read_chunk1 = read_chunk1.unwrap();
        assert_eq!(read_chunk1.nonce, chunk1.nonce);
        assert_eq!(read_chunk1.payload, chunk1.payload);

        let read_chunk2 = reader.read_next_chunk().await.unwrap();
        assert!(read_chunk2.is_some());
        let read_chunk2 = read_chunk2.unwrap();
        assert_eq!(read_chunk2.nonce, chunk2.nonce);
        assert_eq!(read_chunk2.payload, chunk2.payload);

        // Test EOF
        let read_chunk3 = reader.read_next_chunk().await.unwrap();
        assert!(read_chunk3.is_none());

        // Test validate_integrity
        let is_valid = reader.validate_integrity().await.unwrap();
        assert!(is_valid, "File integrity validation should pass");
    }

    #[tokio::test]
    async fn test_file_validation() {
        // Create a temporary file for testing
        let temp_dir = TempDir::new().unwrap();
        let test_file_path = temp_dir.path().join("test_validation.adapipe");

        // Create test header with specific checksum
        let header = FileHeader::new(
            "validation_test.txt".to_string(),
            2048,
            "original_checksum_xyz789".to_string(),
        )
        .add_compression_step("zstd", 3)
        .with_chunk_info(1024, 1)
        .with_pipeline_id("validation-pipeline".to_string());

        // Create test chunk
        let chunk = ChunkFormat::new([5u8; 12], vec![0x12, 0x34, 0x56, 0x78]);

        // Write file
        let service = AdapipeFormat::new();
        let mut writer = service.create_writer(&test_file_path, header.clone()).await.unwrap();
        writer.write_chunk(chunk.clone()).unwrap();
        let final_header = header.clone();
        writer.finalize(final_header).await.unwrap();

        // Validate the file
        let validation_result = service.validate_file(&test_file_path).await.unwrap();
        assert!(validation_result.is_valid);
        assert_eq!(validation_result.chunk_count, 1);
        assert_eq!(validation_result.format_version, 1);
        assert!(validation_result.integrity_verified);
        assert!(validation_result.errors.is_empty());
    }

    #[tokio::test]
    async fn test_read_metadata() {
        // Create a temporary file for testing
        let temp_dir = TempDir::new().unwrap();
        let test_file_path = temp_dir.path().join("test_metadata.adapipe");

        // Create test header with metadata
        let header = FileHeader::new(
            "metadata_test.txt".to_string(),
            4096,
            "checksum_metadata_test".to_string(),
        )
        .add_encryption_step("chacha20poly1305", "pbkdf2", 32, 12)
        .with_chunk_info(2048, 2)
        .with_pipeline_id("metadata-pipeline".to_string())
        .with_metadata("custom_key".to_string(), "custom_value".to_string());

        // Create and write chunks
        let chunk1 = ChunkFormat::new([7u8; 12], vec![0xaa, 0xbb, 0xcc, 0xdd]);
        let chunk2 = ChunkFormat::new([8u8; 12], vec![0x11, 0x22, 0x33, 0x44]);

        let service = AdapipeFormat::new();
        let mut writer = service.create_writer(&test_file_path, header.clone()).await.unwrap();
        writer.write_chunk(chunk1).unwrap();
        writer.write_chunk(chunk2).unwrap();
        let final_header = header.clone();
        writer.finalize(final_header).await.unwrap();

        // Read metadata
        let metadata = service.read_metadata(&test_file_path).await.unwrap();
        assert_eq!(metadata.original_filename, "metadata_test.txt");
        assert_eq!(metadata.original_size, 4096);
        assert_eq!(metadata.chunk_count, 2);
        assert_eq!(metadata.pipeline_id, "metadata-pipeline");
        assert!(metadata.is_encrypted());
        assert!(!metadata.is_compressed());
        assert_eq!(metadata.encryption_algorithm(), Some("chacha20poly1305"));
        assert_eq!(metadata.metadata.get("custom_key"), Some(&"custom_value".to_string()));
    }

    #[tokio::test]
    async fn test_seek_to_chunk() {
        // Create a temporary file for testing
        let temp_dir = TempDir::new().unwrap();
        let test_file_path = temp_dir.path().join("test_seek.adapipe");

        // Create test header
        let header = FileHeader::new("seek_test.txt".to_string(), 3072, "checksum_seek_test".to_string())
            .with_chunk_info(1024, 3);

        // Create test chunks with distinct data
        let chunk1 = ChunkFormat::new([1u8; 12], vec![0x01, 0x02, 0x03, 0x04]);
        let chunk2 = ChunkFormat::new([2u8; 12], vec![0x05, 0x06, 0x07, 0x08]);
        let chunk3 = ChunkFormat::new([3u8; 12], vec![0x09, 0x0a, 0x0b, 0x0c]);

        // Write file
        let service = AdapipeFormat::new();
        let mut writer = service.create_writer(&test_file_path, header.clone()).await.unwrap();
        writer.write_chunk(chunk1.clone()).unwrap();
        writer.write_chunk(chunk2.clone()).unwrap();
        writer.write_chunk(chunk3.clone()).unwrap();
        let final_header = header.clone();
        writer.finalize(final_header).await.unwrap();

        // Create reader
        let mut reader = service.create_reader(&test_file_path).await.unwrap();

        // Seek to chunk 2 (0-indexed)
        reader.seek_to_chunk(2).await.unwrap();
        let read_chunk = reader.read_next_chunk().await.unwrap().unwrap();
        assert_eq!(read_chunk.nonce, chunk3.nonce);
        assert_eq!(read_chunk.payload, chunk3.payload);

        // Seek back to chunk 0
        reader.seek_to_chunk(0).await.unwrap();
        let read_chunk = reader.read_next_chunk().await.unwrap().unwrap();
        assert_eq!(read_chunk.nonce, chunk1.nonce);
        assert_eq!(read_chunk.payload, chunk1.payload);

        // Seek to chunk 1
        reader.seek_to_chunk(1).await.unwrap();
        let read_chunk = reader.read_next_chunk().await.unwrap().unwrap();
        assert_eq!(read_chunk.nonce, chunk2.nonce);
        assert_eq!(read_chunk.payload, chunk2.payload);
    }
}