Skip to main content

streaming_crypto/core_api/stream_v2/
io.rs

1// # 📂 src/stream_v2/io.rs
2
3// ## 📂 File: `src/stream_v2/io.rs`
4// ## Normalized I/O + ordered encrypted writer (production-ready)
5
6use std::io::{Cursor, Read, Seek, SeekFrom, Write};
7use std::path::PathBuf;
8use std::collections::BTreeMap;
9use std::sync::{Arc, Mutex};
10use bytes::Bytes;
11use tracing::{debug, error};
12
13use crate::{
14    headers::HeaderV1,
15    stream_v2::{
16        segmenting::{SegmentHeader, encode_segment, decode_segment_header, types::SegmentFlags},
17        segment_worker::{DecryptedSegment, EncryptedSegment},
18    },
19    types::StreamError
20};
21
/// The kinds of input a stream operation can read from.
///
/// Pass a value to [`open_input`] to obtain a uniform boxed reader.
pub enum InputSource {
    /// A caller-supplied reader that is already open.
    Reader(Box<dyn Read + Send>),
    /// Path of a file to read from.
    File(PathBuf),
    /// Bytes held entirely in memory.
    Memory(Vec<u8>),
}
28
/// The kinds of destination a stream operation can write to.
///
/// Pass a value to [`open_output`] to obtain a uniform boxed writer.
pub enum OutputSink {
    /// A caller-supplied writer that is already open.
    Writer(Box<dyn Write + Send>),
    /// Path of a file to write to.
    File(PathBuf),
    /// Collect the output in memory.
    Memory,
}
35
36/// Normalize input source into a boxed reader
37pub fn open_input(src: InputSource) -> Result<Box<dyn Read + Send>, StreamError> {
38    let reader: Box<dyn Read + Send> = match src {
39        InputSource::Reader(r) => r,
40        InputSource::File(p) => Box::new(std::fs::File::open(p)?),
41        InputSource::Memory(b) => Box::new(std::io::Cursor::new(b)),
42    };
43    Ok(reader)
44}
45
46/// Normalize output sink into a boxed writer
47pub fn open_output(
48    sink: OutputSink,
49    with_buf: Option<bool>,
50) -> Result<(Box<dyn Write + Send>, Option<Arc<Mutex<Vec<u8>>>>), StreamError> {
51    match sink {
52        OutputSink::Writer(w) => Ok((w, None)),
53        OutputSink::File(p) => Ok((Box::new(std::fs::File::create(p)?), None)),
54        OutputSink::Memory => {
55            match with_buf {
56                Some(true) => {
57                    let buf = Arc::new(Mutex::new(Vec::new()));
58                    let writer = SharedBufferWriter { buf: buf.clone() };
59                    Ok((Box::new(writer), Some(buf)))
60                },
61                _ => {
62                    // If we don’t need concurrent access, we can just use Cursor<Vec<u8>> directly:
63                    let cursor = Cursor::new(Vec::new());
64                    Ok((Box::new(cursor), None))
65                }
66            }
67        }
68    }
69}
70
/// A `Write` sink that appends every byte it receives to a shared, mutex-guarded `Vec<u8>`.
///
/// The buffer lives behind an `Arc`, so whoever holds another clone of it can inspect the
/// captured output while or after the writer is in use.
///
/// # Examples
/// ```ignore
/// let (mut writer, maybe_buf) = open_output(OutputSink::Memory, Some(true))?;
/// writer.write_all(b"hello world")?;
///
/// if let Some(buf) = maybe_buf {
///     let data = buf.lock().unwrap();
///     println!("Captured output: {:?}", String::from_utf8_lossy(&data));
/// }
/// ```
pub struct SharedBufferWriter {
    buf: Arc<Mutex<Vec<u8>>>,
}

impl Write for SharedBufferWriter {
    /// Append `data` to the shared buffer and report all of it as written.
    ///
    /// # Errors
    /// Returns an `io::Error` (instead of panicking) when the buffer's mutex is poisoned,
    /// i.e. another holder panicked while it had the lock.
    fn write(&mut self, data: &[u8]) -> std::io::Result<usize> {
        let mut guard = self.buf.lock().map_err(|_| {
            std::io::Error::new(
                std::io::ErrorKind::Other,
                "shared output buffer lock is poisoned",
            )
        })?;
        // Append: earlier writes must be preserved, never overwritten.
        guard.extend_from_slice(data);
        Ok(data.len())
    }

    /// Nothing is buffered outside the shared `Vec`, so there is nothing to flush.
    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}
96
97// ================= Header =================
98
99pub fn write_header<W: Write>(w: &mut W, h: &HeaderV1) -> Result<(), StreamError> {
100    let buf = crate::headers::encode_header_le(h).map_err(|e| StreamError::Header(e))?;
101    w.write_all(&buf)?;
102    Ok(())
103}
104
105fn read_header<R: Read>(r: &mut R) -> Result<HeaderV1, StreamError> {
106    let mut buf = [0u8; HeaderV1::LEN];
107    r.read_exact(&mut buf)?;
108    Ok(crate::headers::decode_header_le(&buf).map_err(|e| StreamError::Header(e))?)
109}
110
111#[derive(Debug)]
112pub struct PayloadReader<R: Read> {
113    inner: R,
114}
115
116impl<R: Read> PayloadReader<R> {
117    /// Construct without consuming header (rarely used)
118    pub fn new(reader: R) -> Self {
119        PayloadReader { inner: reader }
120    }
121
122    /// Consume header and return both parsed header and payload reader
123    pub fn with_header(mut reader: R) -> Result<(HeaderV1, Self), StreamError> {
124        let header = read_header(&mut reader)?;
125        Ok((header, PayloadReader { inner: reader }))
126    }
127}
128
129impl<R: Read> Read for PayloadReader<R> {
130    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
131        self.inner.read(buf)
132    }
133}
134
135impl<R: Read + Seek> Seek for PayloadReader<R> {
136    fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
137        self.inner.seek(pos)
138    }
139}
140
141// ================= Utilities =================
142/// Ensure the reader has advanced past the header (default 80 bytes).
143pub fn assert_reader_after_header<R: Read + Seek + Send>(reader: &mut R, header_len: usize) -> Result<(), StreamError> {
144    let pos = reader.seek(SeekFrom::Current(0))?;
145    if pos < header_len as u64 {
146        return Err(StreamError::Validation(format!(
147            "Reader not advanced past header: pos={pos}, expected >= {header_len}"
148        )));
149    }
150    Ok(())
151}
152
153// We need this version for segmenting into canonical chunk sizes like 64 KiB, 128 KiB, etc.
154pub fn read_exact_or_eof<R: Read>(
155    r: &mut R,
156    len: usize,
157) -> Result<Bytes, StreamError> {
158    let mut buf = vec![0u8; len];
159    let mut off = 0;
160
161    while off < len {
162        let n = r.read(&mut buf[off..])?;
163        if n == 0 {
164            break;
165        }
166        off += n;
167    }
168
169    buf.truncate(off);
170    Ok(Bytes::from(buf))
171}
172pub fn read_exact_or_eof_1<R: Read>(
173    r: &mut R,
174    len: usize,
175) -> Result<Bytes, StreamError> {
176    let mut buf = vec![0u8; len];
177    let n = r.read(&mut buf)?;
178    if n == 0 {
179        // EOF
180        return Ok(Bytes::new());
181    }
182    
183    buf.truncate(n);
184    Ok(Bytes::from(buf))
185}
186
187// ================= Segment I/O =================
188
189pub fn read_segment<R: Read>(
190    r: &mut R,
191) -> Result<Option<(SegmentHeader, Bytes)>, StreamError> {
192    let mut hdr_buf = [0u8; SegmentHeader::LEN];
193
194    // If we can't read a full header, it's true EOF
195    if let Err(_) = r.read_exact(&mut hdr_buf) {
196        return Ok(None);
197    }
198
199    let header = decode_segment_header(&hdr_buf).map_err(StreamError::Segment)?;
200    // 🔍 Debug header summary
201    // debug!("[IO:DECRYPT] Parsed header: {}", header.summary());
202
203    // Allocate wire buffer according to header
204    let mut wire = vec![0u8; header.wire_len() as usize];
205    if header.wire_len() > 0 {
206        r.read_exact(&mut wire)?;
207    }
208
209    // ✅ Special case: final empty segment
210    // if header.flags.contains(SegmentFlags::FINAL_SEGMENT) && header.wire_len == 0 {
211    //     debug!("[IO:DECRYPT] Empty FINAL_SEGMENT detected at index {}", header.segment_index);
212    //     return Ok(Some((header, Bytes::new())));
213    // }
214
215    Ok(Some((header, Bytes::from(wire))))
216}
217
218// pub fn read_segment<R: Read>(
219//     r: &mut R,
220// ) -> Result<Option<(SegmentHeader, Bytes)>, StreamError> {
221//     let mut hdr_buf = [0u8; SegmentHeader::LEN];
222
223//     match r.read_exact(&mut hdr_buf) {
224//         Ok(()) => {
225//             // Successfully read a full segment header
226//             let header = decode_segment_header(&hdr_buf)
227//                 .map_err(StreamError::Segment)?;
228//             let mut wire = vec![0u8; header.wire_len as usize];
229//             r.read_exact(&mut wire)?;
230//             Ok(Some((header, Bytes::from(wire))))
231//         }
232//         Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
233//             // Graceful end of stream: no more segments
234//             Ok(None)
235//         }
236//         Err(e) => {
237//             // Propagate other I/O errors
238//             Err(StreamError::Io(e))
239//         }
240//     }
241// }
242
243// ================= Ordered writers =================
244
245pub struct OrderedEncryptedWriter<'a, W: Write> {
246    out: &'a mut W,
247    next: u32,
248    pending: BTreeMap<u32, EncryptedSegment>,
249    final_index: Option<u32>,
250}
251
252impl<'a, W: Write> OrderedEncryptedWriter<'a, W> {
253    pub fn new(out: &'a mut W) -> Self {
254        Self {
255            out,
256            next: 0,
257            pending: BTreeMap::new(),
258            final_index: None,
259        }
260    }
261
262    pub fn push(&mut self, segment: EncryptedSegment) -> Result<(), StreamError> {
263        // Accept empty wire if FINAL_SEGMENT is set
264        if segment.header.flags().contains(SegmentFlags::FINAL_SEGMENT) && segment.wire.is_empty() {
265            debug!("[ENCRYPT WRITER] Final empty segment {} detected", segment.header.segment_index());
266            self.final_index = Some(segment.header.segment_index());
267        }
268        // Don’t write immediately — enqueue it
269        self.pending.insert(segment.header.segment_index(), segment);
270        self.flush_ready()
271    }
272
273    pub fn finish(&mut self) -> Result<(), StreamError> {
274        debug!("[ENCRYPT WRITER] finish() called, pending: {}, next: {}", self.pending.len(), self.next);
275        
276        // Flush any pending segments in order
277        while let Some(seg) = self.pending.remove(&self.next) {
278            self.write(seg)?;
279            self.next += 1;
280        }
281
282        // Validation: final marker must have been seen
283        if self.final_index.is_none() {
284            return Err(StreamError::Validation("Missing final segment".into()));
285        }
286
287        debug!("[ENCRYPT WRITER] finish() completed successfully");
288        Ok(())
289    }
290
291    fn flush_ready(&mut self) -> Result<(), StreamError> {
292        while let Some(seg) = self.pending.remove(&self.next) {
293            self.write(seg)?;
294            self.next += 1;
295        }
296        Ok(())
297    }
298
299    fn write(&mut self, segment: EncryptedSegment) -> Result<(), StreamError> {
300        let idx = segment.header.segment_index();
301        debug!("[ENCRYPT WRITER] Writing segment {}", idx);
302        
303        let segment_enc = encode_segment(&segment.header, &segment.wire)
304            .map_err(|e| StreamError::Segment(e))?;
305        
306        debug!(
307            "[ENCRYPT WRITER] Encoded segment {} ({} bytes): {}",
308            idx, segment_enc.len(), segment.header.summary()
309        );
310
311        // self.out.write_all(&segment_enc)?;  // ⚠️ This converts io::Error to StreamError
312        // ✅ CRITICAL: Explicit write_all with better error context
313        match self.out.write_all(&segment_enc) {
314            Ok(()) => {
315                debug!("[ENCRYPT WRITER] Successfully wrote segment {}", idx);
316                Ok(())
317            }
318            Err(e) => {
319                error!("[ENCRYPT WRITER] ❌ WRITE FAILED for segment {}: {}", idx, e);
320                // Ensure error is properly wrapped
321                Err(StreamError::from(e))
322            }
323        }
324        // Ok(())
325    }
326}
327
328pub struct OrderedPlaintextWriter<'a, W: Write> {
329    out: &'a mut W,
330    next: u32,
331    pending: BTreeMap<u32, DecryptedSegment>,
332    final_index: Option<u32>,
333}
334
335impl<'a, W: Write> OrderedPlaintextWriter<'a, W> {
336    pub fn new(out: &'a mut W) -> Self {
337        Self {
338            out,
339            next: 0,
340            pending: BTreeMap::new(),
341            final_index: None,
342        }
343    }
344    pub fn push(&mut self, segment: &DecryptedSegment) -> Result<(), StreamError> {
345        // Accept empty wire if FINAL_SEGMENT is set
346        if segment.header.flags().contains(SegmentFlags::FINAL_SEGMENT) && segment.bytes.is_empty() {
347            debug!("[PLAINTEXT WRITER] Final empty segment {} detected", segment.header.segment_index());
348            self.final_index = Some(segment.header.segment_index());
349
350            // Enqueue the final marker like any other segment
351        }
352
353        // Normal push logic
354        debug!("[PLAINTEXT WRITER] Queuing segment {}", segment.header.segment_index());
355        self.pending.insert(segment.header.segment_index(), segment.clone());
356        self.flush_ready()
357    }
358
359    pub fn finish(&mut self) -> Result<(), StreamError> {
360        // Flush any pending segments in order
361        while let Some(segment) = self.pending.remove(&self.next) {
362            self.write(segment)?;
363            self.next += 1;
364        }
365
366        // Validation: final marker must have been seen
367        if self.final_index.is_none() {
368            return Err(StreamError::Validation("Missing final segment".into()));
369        }
370
371        debug!("[PLAINTEXT WRITER] Finished, final marker index {:?}", self.final_index);
372        Ok(())
373    }
374
375    fn flush_ready(&mut self) -> Result<(), StreamError> {
376        while let Some(segment) = self.pending.remove(&self.next) {
377            self.write(segment)?;
378            self.next += 1;
379        }
380        Ok(())
381    }
382
383    fn write(&mut self, segment: DecryptedSegment) -> Result<(), StreamError> {
384        debug!("[PLAINTEXT WRITER] Writing segment {}", segment.header.segment_index());
385        self.out.write_all(&segment.bytes)?;
386        Ok(())
387    }
388}