streaming_crypto/core_api/stream_v2/
io.rs1use std::io::{Cursor, Read, Seek, SeekFrom, Write};
7use std::path::PathBuf;
8use std::collections::BTreeMap;
9use std::sync::{Arc, Mutex};
10use bytes::Bytes;
11use tracing::{debug, error};
12
13use crate::{
14 headers::HeaderV1,
15 stream_v2::{
16 segmenting::{SegmentHeader, encode_segment, decode_segment_header, types::SegmentFlags},
17 segment_worker::{DecryptedSegment, EncryptedSegment},
18 },
19 types::StreamError
20};
21
22pub enum InputSource {
24 Reader(Box<dyn Read + Send>),
25 File(PathBuf),
26 Memory(Vec<u8>),
27}
28
29pub enum OutputSink {
31 Writer(Box<dyn Write + Send>),
32 File(PathBuf),
33 Memory,
34}
35
36pub fn open_input(src: InputSource) -> Result<Box<dyn Read + Send>, StreamError> {
38 let reader: Box<dyn Read + Send> = match src {
39 InputSource::Reader(r) => r,
40 InputSource::File(p) => Box::new(std::fs::File::open(p)?),
41 InputSource::Memory(b) => Box::new(std::io::Cursor::new(b)),
42 };
43 Ok(reader)
44}
45
46pub fn open_output(
48 sink: OutputSink,
49 with_buf: Option<bool>,
50) -> Result<(Box<dyn Write + Send>, Option<Arc<Mutex<Vec<u8>>>>), StreamError> {
51 match sink {
52 OutputSink::Writer(w) => Ok((w, None)),
53 OutputSink::File(p) => Ok((Box::new(std::fs::File::create(p)?), None)),
54 OutputSink::Memory => {
55 match with_buf {
56 Some(true) => {
57 let buf = Arc::new(Mutex::new(Vec::new()));
58 let writer = SharedBufferWriter { buf: buf.clone() };
59 Ok((Box::new(writer), Some(buf)))
60 },
61 _ => {
62 let cursor = Cursor::new(Vec::new());
64 Ok((Box::new(cursor), None))
65 }
66 }
67 }
68 }
69}
70
71pub struct SharedBufferWriter {
72 buf: Arc<Mutex<Vec<u8>>>,
73}
74impl Write for SharedBufferWriter {
75 fn write(&mut self, data: &[u8]) -> std::io::Result<usize> {
76 let mut guard = self.buf.lock().unwrap();
77 guard.extend_from_slice(data); Ok(data.len())
79 }
80
81 fn flush(&mut self) -> std::io::Result<()> {
82 Ok(())
83 }
84 }
96
97pub fn write_header<W: Write>(w: &mut W, h: &HeaderV1) -> Result<(), StreamError> {
100 let buf = crate::headers::encode_header_le(h).map_err(|e| StreamError::Header(e))?;
101 w.write_all(&buf)?;
102 Ok(())
103}
104
105fn read_header<R: Read>(r: &mut R) -> Result<HeaderV1, StreamError> {
106 let mut buf = [0u8; HeaderV1::LEN];
107 r.read_exact(&mut buf)?;
108 Ok(crate::headers::decode_header_le(&buf).map_err(|e| StreamError::Header(e))?)
109}
110
111#[derive(Debug)]
112pub struct PayloadReader<R: Read> {
113 inner: R,
114}
115
116impl<R: Read> PayloadReader<R> {
117 pub fn new(reader: R) -> Self {
119 PayloadReader { inner: reader }
120 }
121
122 pub fn with_header(mut reader: R) -> Result<(HeaderV1, Self), StreamError> {
124 let header = read_header(&mut reader)?;
125 Ok((header, PayloadReader { inner: reader }))
126 }
127}
128
129impl<R: Read> Read for PayloadReader<R> {
130 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
131 self.inner.read(buf)
132 }
133}
134
135impl<R: Read + Seek> Seek for PayloadReader<R> {
136 fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
137 self.inner.seek(pos)
138 }
139}
140
141pub fn assert_reader_after_header<R: Read + Seek + Send>(reader: &mut R, header_len: usize) -> Result<(), StreamError> {
144 let pos = reader.seek(SeekFrom::Current(0))?;
145 if pos < header_len as u64 {
146 return Err(StreamError::Validation(format!(
147 "Reader not advanced past header: pos={pos}, expected >= {header_len}"
148 )));
149 }
150 Ok(())
151}
152
153pub fn read_exact_or_eof<R: Read>(
155 r: &mut R,
156 len: usize,
157) -> Result<Bytes, StreamError> {
158 let mut buf = vec![0u8; len];
159 let mut off = 0;
160
161 while off < len {
162 let n = r.read(&mut buf[off..])?;
163 if n == 0 {
164 break;
165 }
166 off += n;
167 }
168
169 buf.truncate(off);
170 Ok(Bytes::from(buf))
171}
172pub fn read_exact_or_eof_1<R: Read>(
173 r: &mut R,
174 len: usize,
175) -> Result<Bytes, StreamError> {
176 let mut buf = vec![0u8; len];
177 let n = r.read(&mut buf)?;
178 if n == 0 {
179 return Ok(Bytes::new());
181 }
182
183 buf.truncate(n);
184 Ok(Bytes::from(buf))
185}
186
187pub fn read_segment<R: Read>(
190 r: &mut R,
191) -> Result<Option<(SegmentHeader, Bytes)>, StreamError> {
192 let mut hdr_buf = [0u8; SegmentHeader::LEN];
193
194 if let Err(_) = r.read_exact(&mut hdr_buf) {
196 return Ok(None);
197 }
198
199 let header = decode_segment_header(&hdr_buf).map_err(StreamError::Segment)?;
200 let mut wire = vec![0u8; header.wire_len() as usize];
205 if header.wire_len() > 0 {
206 r.read_exact(&mut wire)?;
207 }
208
209 Ok(Some((header, Bytes::from(wire))))
216}
217
218pub struct OrderedEncryptedWriter<'a, W: Write> {
246 out: &'a mut W,
247 next: u32,
248 pending: BTreeMap<u32, EncryptedSegment>,
249 final_index: Option<u32>,
250}
251
252impl<'a, W: Write> OrderedEncryptedWriter<'a, W> {
253 pub fn new(out: &'a mut W) -> Self {
254 Self {
255 out,
256 next: 0,
257 pending: BTreeMap::new(),
258 final_index: None,
259 }
260 }
261
262 pub fn push(&mut self, segment: EncryptedSegment) -> Result<(), StreamError> {
263 if segment.header.flags().contains(SegmentFlags::FINAL_SEGMENT) && segment.wire.is_empty() {
265 debug!("[ENCRYPT WRITER] Final empty segment {} detected", segment.header.segment_index());
266 self.final_index = Some(segment.header.segment_index());
267 }
268 self.pending.insert(segment.header.segment_index(), segment);
270 self.flush_ready()
271 }
272
273 pub fn finish(&mut self) -> Result<(), StreamError> {
274 debug!("[ENCRYPT WRITER] finish() called, pending: {}, next: {}", self.pending.len(), self.next);
275
276 while let Some(seg) = self.pending.remove(&self.next) {
278 self.write(seg)?;
279 self.next += 1;
280 }
281
282 if self.final_index.is_none() {
284 return Err(StreamError::Validation("Missing final segment".into()));
285 }
286
287 debug!("[ENCRYPT WRITER] finish() completed successfully");
288 Ok(())
289 }
290
291 fn flush_ready(&mut self) -> Result<(), StreamError> {
292 while let Some(seg) = self.pending.remove(&self.next) {
293 self.write(seg)?;
294 self.next += 1;
295 }
296 Ok(())
297 }
298
299 fn write(&mut self, segment: EncryptedSegment) -> Result<(), StreamError> {
300 let idx = segment.header.segment_index();
301 debug!("[ENCRYPT WRITER] Writing segment {}", idx);
302
303 let segment_enc = encode_segment(&segment.header, &segment.wire)
304 .map_err(|e| StreamError::Segment(e))?;
305
306 debug!(
307 "[ENCRYPT WRITER] Encoded segment {} ({} bytes): {}",
308 idx, segment_enc.len(), segment.header.summary()
309 );
310
311 match self.out.write_all(&segment_enc) {
314 Ok(()) => {
315 debug!("[ENCRYPT WRITER] Successfully wrote segment {}", idx);
316 Ok(())
317 }
318 Err(e) => {
319 error!("[ENCRYPT WRITER] ❌ WRITE FAILED for segment {}: {}", idx, e);
320 Err(StreamError::from(e))
322 }
323 }
324 }
326}
327
328pub struct OrderedPlaintextWriter<'a, W: Write> {
329 out: &'a mut W,
330 next: u32,
331 pending: BTreeMap<u32, DecryptedSegment>,
332 final_index: Option<u32>,
333}
334
335impl<'a, W: Write> OrderedPlaintextWriter<'a, W> {
336 pub fn new(out: &'a mut W) -> Self {
337 Self {
338 out,
339 next: 0,
340 pending: BTreeMap::new(),
341 final_index: None,
342 }
343 }
344 pub fn push(&mut self, segment: &DecryptedSegment) -> Result<(), StreamError> {
345 if segment.header.flags().contains(SegmentFlags::FINAL_SEGMENT) && segment.bytes.is_empty() {
347 debug!("[PLAINTEXT WRITER] Final empty segment {} detected", segment.header.segment_index());
348 self.final_index = Some(segment.header.segment_index());
349
350 }
352
353 debug!("[PLAINTEXT WRITER] Queuing segment {}", segment.header.segment_index());
355 self.pending.insert(segment.header.segment_index(), segment.clone());
356 self.flush_ready()
357 }
358
359 pub fn finish(&mut self) -> Result<(), StreamError> {
360 while let Some(segment) = self.pending.remove(&self.next) {
362 self.write(segment)?;
363 self.next += 1;
364 }
365
366 if self.final_index.is_none() {
368 return Err(StreamError::Validation("Missing final segment".into()));
369 }
370
371 debug!("[PLAINTEXT WRITER] Finished, final marker index {:?}", self.final_index);
372 Ok(())
373 }
374
375 fn flush_ready(&mut self) -> Result<(), StreamError> {
376 while let Some(segment) = self.pending.remove(&self.next) {
377 self.write(segment)?;
378 self.next += 1;
379 }
380 Ok(())
381 }
382
383 fn write(&mut self, segment: DecryptedSegment) -> Result<(), StreamError> {
384 debug!("[PLAINTEXT WRITER] Writing segment {}", segment.header.segment_index());
385 self.out.write_all(&segment.bytes)?;
386 Ok(())
387 }
388}