1use crate::stream::{BoxStream, NotUsed, Sink, Source, StreamCompletion};
2use crate::{StreamError, StreamResult};
3use std::collections::VecDeque;
4use std::fs::{File, OpenOptions};
5use std::io::{Read, Write};
6use std::path::PathBuf;
7use std::sync::{
8 Arc, Condvar, Mutex,
9 atomic::{AtomicBool, Ordering},
10};
11
/// Chunk size used by `FileIO::from_path_default`: 8 KiB per read.
const DEFAULT_CHUNK_SIZE: usize = 8 * 1024;
/// Number of chunks a reader worker may buffer before `SourceQueue::push` blocks.
const READER_QUEUE_CAPACITY: usize = 8;
14
15fn io_error(error: std::io::Error) -> StreamError {
16 StreamError::Failed(error.to_string())
17}
18
19#[derive(Clone)]
20enum SourceTerminal {
21 Complete,
22 Error(StreamError),
23}
24
25struct SourceQueueState {
26 queue: VecDeque<Vec<u8>>,
27 terminal: Option<SourceTerminal>,
28}
29
30struct SourceQueue {
31 state: Mutex<SourceQueueState>,
32 available: Condvar,
33 space: Condvar,
34 capacity: usize,
35 cancelled: Arc<AtomicBool>,
36}
37
38impl SourceQueue {
39 fn new() -> Arc<Self> {
40 Arc::new(Self {
41 state: Mutex::new(SourceQueueState {
42 queue: VecDeque::new(),
43 terminal: None,
44 }),
45 available: Condvar::new(),
46 space: Condvar::new(),
47 capacity: READER_QUEUE_CAPACITY,
48 cancelled: Arc::new(AtomicBool::new(false)),
49 })
50 }
51
52 fn push(&self, chunk: Vec<u8>) -> bool {
53 let mut state = self.state.lock().expect("io source queue poisoned");
54 while state.queue.len() >= self.capacity
55 && state.terminal.is_none()
56 && !self.cancelled.load(Ordering::SeqCst)
57 {
58 state = self
59 .space
60 .wait(state)
61 .expect("io source queue poisoned while waiting for space");
62 }
63
64 if state.terminal.is_some() || self.cancelled.load(Ordering::SeqCst) {
65 return false;
66 }
67
68 if state.terminal.is_none() {
69 state.queue.push_back(chunk);
70 }
71 drop(state);
72 self.available.notify_all();
73 true
74 }
75
76 fn finish(&self, terminal: SourceTerminal) {
77 let mut state = self.state.lock().expect("io source queue poisoned");
78 if state.terminal.is_none() {
79 state.terminal = Some(terminal);
80 }
81 drop(state);
82 self.available.notify_all();
83 self.space.notify_all();
84 }
85}
86
87struct ReaderWorkerGuard {
88 queue: Arc<SourceQueue>,
89 armed: bool,
90}
91
92impl ReaderWorkerGuard {
93 fn new(queue: Arc<SourceQueue>) -> Self {
94 Self { queue, armed: true }
95 }
96
97 fn disarm(&mut self) {
98 self.armed = false;
99 }
100}
101
102impl Drop for ReaderWorkerGuard {
103 fn drop(&mut self) {
104 if self.armed {
105 self.queue
106 .finish(SourceTerminal::Error(StreamError::AbruptTermination));
107 }
108 }
109}
110
111struct ReaderSourceStream {
112 queue: Arc<SourceQueue>,
113 completion: Option<StreamCompletion<NotUsed>>,
114}
115
116impl Iterator for ReaderSourceStream {
117 type Item = StreamResult<Vec<u8>>;
118
119 fn next(&mut self) -> Option<Self::Item> {
120 let mut state = self.queue.state.lock().expect("io source queue poisoned");
121 loop {
122 if let Some(chunk) = state.queue.pop_front() {
123 self.queue.space.notify_all();
124 return Some(Ok(chunk));
125 }
126 if let Some(terminal) = state.terminal.clone() {
127 return match terminal {
128 SourceTerminal::Complete => None,
129 SourceTerminal::Error(error) => Some(Err(error)),
130 };
131 }
132 state = self
133 .queue
134 .available
135 .wait(state)
136 .expect("io source queue poisoned while waiting");
137 }
138 }
139}
140
141impl Drop for ReaderSourceStream {
142 fn drop(&mut self) {
143 self.queue.cancelled.store(true, Ordering::SeqCst);
144 drop(self.queue.state.lock().unwrap_or_else(|p| p.into_inner()));
152 self.queue.available.notify_all();
153 self.queue.space.notify_all();
154 let _ = self.completion.take();
155 }
156}
157
158struct WriterGuard<W: Write> {
159 writer: W,
160 flushed: bool,
161}
162
163impl<W: Write> WriterGuard<W> {
164 fn new(writer: W) -> Self {
165 Self {
166 writer,
167 flushed: false,
168 }
169 }
170
171 fn writer_mut(&mut self) -> &mut W {
172 &mut self.writer
173 }
174
175 fn flush_once(&mut self) -> StreamResult<()> {
176 if self.flushed {
177 return Ok(());
178 }
179 self.writer.flush().map_err(io_error)?;
180 self.flushed = true;
181 Ok(())
182 }
183}
184
185impl<W: Write> Drop for WriterGuard<W> {
186 fn drop(&mut self) {
187 let _ = self.flush_once();
188 }
189}
190
191pub struct StreamConverters;
192
193impl StreamConverters {
194 #[must_use]
195 pub fn from_reader<F, R>(factory: F, chunk_size: usize) -> Source<Vec<u8>>
196 where
197 F: Fn() -> std::io::Result<R> + Send + Sync + 'static,
198 R: Read + Send + 'static,
199 {
200 assert!(chunk_size > 0, "chunk size must be greater than zero");
201 Source::from_materialized_factory(move |materializer| {
202 let reader = factory().map_err(io_error)?;
203 let queue = SourceQueue::new();
204 let queue_for_worker = Arc::clone(&queue);
205 let cancelled = Arc::clone(&queue.cancelled);
206 let completion = materializer.spawn_stream(move |_worker_cancelled| {
207 let mut reader = reader;
208 let mut guard = ReaderWorkerGuard::new(Arc::clone(&queue_for_worker));
209 let mut buffer = vec![0_u8; chunk_size];
210
211 loop {
212 if cancelled.load(Ordering::SeqCst) {
213 guard.disarm();
214 return Ok(NotUsed);
215 }
216
217 match reader.read(&mut buffer) {
218 Ok(0) => {
219 guard.disarm();
220 queue_for_worker.finish(SourceTerminal::Complete);
221 return Ok(NotUsed);
222 }
223 Ok(read) => {
224 if !queue_for_worker.push(buffer[..read].to_vec()) {
225 guard.disarm();
226 return Ok(NotUsed);
227 }
228 }
229 Err(error) => {
230 guard.disarm();
231 queue_for_worker.finish(SourceTerminal::Error(io_error(error)));
232 return Ok(NotUsed);
233 }
234 }
235 }
236 });
237
238 Ok((
239 Box::new(ReaderSourceStream {
240 queue,
241 completion: Some(completion),
242 }) as BoxStream<Vec<u8>>,
243 NotUsed,
244 ))
245 })
246 }
247
248 #[must_use]
249 pub fn to_writer<F, W>(factory: F) -> Sink<Vec<u8>, StreamCompletion<NotUsed>>
250 where
251 F: Fn() -> std::io::Result<W> + Send + Sync + 'static,
252 W: Write + Send + 'static,
253 {
254 Sink::from_runner(move |input: BoxStream<Vec<u8>>, materializer| {
255 let writer = WriterGuard::new(factory().map_err(io_error)?);
256 Ok(materializer.spawn_stream(move |cancelled| {
257 let mut input = input;
258 let mut writer = writer;
259 loop {
260 if cancelled.load(Ordering::SeqCst) {
261 let _ = writer.flush_once();
262 return Err(StreamError::Cancelled);
263 }
264
265 match input.next() {
266 Some(Ok(chunk)) => {
267 writer.writer_mut().write_all(&chunk).map_err(io_error)?
268 }
269 Some(Err(error)) => {
270 let _ = writer.flush_once();
271 return Err(error);
272 }
273 None => {
274 writer.flush_once()?;
275 return Ok(NotUsed);
276 }
277 }
278 }
279 }))
280 })
281 }
282}
283
284pub struct FileIO;
285
286impl FileIO {
287 #[must_use]
288 pub fn from_path(path: impl Into<PathBuf>, chunk_size: usize) -> Source<Vec<u8>> {
289 let path = path.into();
290 StreamConverters::from_reader(move || File::open(&path), chunk_size)
291 }
292
293 #[must_use]
294 pub fn from_path_default(path: impl Into<PathBuf>) -> Source<Vec<u8>> {
295 Self::from_path(path, DEFAULT_CHUNK_SIZE)
296 }
297
298 #[must_use]
299 pub fn to_path(path: impl Into<PathBuf>) -> Sink<Vec<u8>, StreamCompletion<NotUsed>> {
300 let path = path.into();
301 StreamConverters::to_writer(move || {
302 OpenOptions::new()
303 .create(true)
304 .truncate(true)
305 .write(true)
306 .open(&path)
307 })
308 }
309}
310
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Source;
    use crate::testkit::TestSink;
    use std::io::Cursor;
    use std::sync::atomic::AtomicUsize;
    use std::thread;
    use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

    /// Polls `counter` for up to one second, then asserts it equals `expected`.
    fn wait_for_counter(counter: &AtomicUsize, expected: usize) {
        wait_until(Duration::from_secs(1), || counter.load(Ordering::SeqCst) == expected);
        assert_eq!(counter.load(Ordering::SeqCst), expected);
    }

    /// Polls `condition` every 5 ms until it holds or `timeout` elapses, and
    /// returns the result of one last evaluation.
    fn wait_until(timeout: Duration, mut condition: impl FnMut() -> bool) -> bool {
        let deadline = Instant::now() + timeout;
        while Instant::now() < deadline {
            if condition() {
                return true;
            }
            thread::sleep(Duration::from_millis(5));
        }
        condition()
    }

    /// Temp-dir path tagged with `name`, the process id and the current time.
    fn unique_temp_path(name: &str) -> PathBuf {
        let nanos = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("clock after epoch")
            .as_nanos();
        std::env::temp_dir().join(format!(
            "datum-wp12-{name}-{}-{nanos}.bin",
            std::process::id()
        ))
    }

    /// In-memory reader that counts how many times it is dropped.
    struct CountingReader {
        inner: Cursor<Vec<u8>>,
        drops: Arc<AtomicUsize>,
    }

    impl Read for CountingReader {
        fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
            self.inner.read(buf)
        }
    }

    impl Drop for CountingReader {
        fn drop(&mut self) {
            self.drops.fetch_add(1, Ordering::SeqCst);
        }
    }

    /// Writer that logs writes and counts flushes and drops; can be made to fail writes.
    struct CountingWriter {
        writes: Arc<Mutex<Vec<Vec<u8>>>>,
        flushes: Arc<AtomicUsize>,
        drops: Arc<AtomicUsize>,
        fail_write: bool,
    }

    impl Write for CountingWriter {
        fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
            if self.fail_write {
                return Err(std::io::Error::other("writer boom"));
            }
            self.writes
                .lock()
                .expect("writer log poisoned")
                .push(buf.to_vec());
            Ok(buf.len())
        }

        fn flush(&mut self) -> std::io::Result<()> {
            self.flushes.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }
    }

    impl Drop for CountingWriter {
        fn drop(&mut self) {
            self.drops.fetch_add(1, Ordering::SeqCst);
        }
    }

    /// Reader that returns at most `chunk_size` bytes per call and counts calls.
    struct CountingChunkReader {
        inner: Cursor<Vec<u8>>,
        chunk_size: usize,
        reads: Arc<AtomicUsize>,
    }

    impl Read for CountingChunkReader {
        fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
            self.reads.fetch_add(1, Ordering::SeqCst);
            let mut chunk = vec![0_u8; self.chunk_size.min(buf.len())];
            let read = self.inner.read(&mut chunk)?;
            buf[..read].copy_from_slice(&chunk[..read]);
            Ok(read)
        }
    }

    #[test]
    fn from_reader_emits_chunked_bytes_and_completes() {
        let sink = StreamConverters::from_reader(|| Ok(Cursor::new(b"abcdef".to_vec())), 2)
            .run_with(TestSink::probe())
            .expect("reader source materializes");

        sink.request(4);
        sink.assert_next_n([b"ab".to_vec(), b"cd".to_vec(), b"ef".to_vec()]);
        sink.expect_complete();
    }

    #[test]
    fn from_reader_closes_exactly_once_on_completion() {
        let drops = Arc::new(AtomicUsize::new(0));
        let drops_for_reader = Arc::clone(&drops);
        let sink = StreamConverters::from_reader(
            move || {
                Ok(CountingReader {
                    inner: Cursor::new(b"hello".to_vec()),
                    drops: Arc::clone(&drops_for_reader),
                })
            },
            8,
        )
        .run_with(TestSink::probe())
        .expect("reader source materializes");

        sink.request(2);
        sink.assert_next(b"hello".to_vec());
        sink.expect_complete();
        wait_for_counter(&drops, 1);
    }

    #[test]
    fn from_reader_closes_exactly_once_on_cancellation() {
        let drops = Arc::new(AtomicUsize::new(0));
        let drops_for_reader = Arc::clone(&drops);
        let mut sink = StreamConverters::from_reader(
            move || {
                Ok(CountingReader {
                    inner: Cursor::new(vec![1_u8; 32]),
                    drops: Arc::clone(&drops_for_reader),
                })
            },
            4,
        )
        .run_with(TestSink::probe())
        .expect("reader source materializes");

        sink.request(1);
        sink.assert_next(vec![1_u8; 4]);
        sink.cancel();
        wait_for_counter(&drops, 1);
    }

    #[test]
    fn from_reader_surfaces_read_failure() {
        struct FailingReader;

        impl Read for FailingReader {
            fn read(&mut self, _buf: &mut [u8]) -> std::io::Result<usize> {
                Err(std::io::Error::other("reader boom"))
            }
        }

        let sink = StreamConverters::from_reader(|| Ok(FailingReader), 8)
            .run_with(TestSink::probe())
            .expect("reader source materializes");

        sink.request(1);
        assert_eq!(
            sink.expect_error(),
            StreamError::Failed("reader boom".to_owned())
        );
    }

    #[test]
    fn from_reader_caps_buffered_read_ahead_for_slow_consumers() {
        let payload: Vec<u8> = (0..64_u8).cycle().take(8192 * 16).collect();
        let payload_for_reader = payload.clone();
        let reads = Arc::new(AtomicUsize::new(0));
        let reads_for_reader = Arc::clone(&reads);
        let sink = StreamConverters::from_reader(
            move || {
                Ok(CountingChunkReader {
                    inner: Cursor::new(payload_for_reader.clone()),
                    chunk_size: 256,
                    reads: Arc::clone(&reads_for_reader),
                })
            },
            256,
        )
        .run_with(TestSink::probe())
        .expect("reader source materializes");

        sink.request(1);
        let first = sink.expect_next();
        assert_eq!(first.len(), 256);

        // Wait until the read count has been stable for 100 ms, i.e. the worker
        // is parked on the full queue. The probe runs only on this thread, so
        // plain captured locals are enough to remember the last observation.
        let mut last_seen = 0_usize;
        let mut quiet_since = Instant::now();
        assert!(wait_until(Duration::from_secs(2), || {
            let current = reads.load(Ordering::SeqCst);
            if current != last_seen {
                last_seen = current;
                quiet_since = Instant::now();
                return false;
            }
            current > 0 && quiet_since.elapsed() >= Duration::from_millis(100)
        }));

        assert!(
            reads.load(Ordering::SeqCst) <= READER_QUEUE_CAPACITY + 2,
            "reader should plateau near the bounded queue capacity"
        );

        sink.request(usize::MAX);
        let mut collected = first;
        for chunk in sink.drain_until_complete() {
            collected.extend_from_slice(&chunk);
        }
        assert_eq!(collected, payload);
    }

    #[test]
    fn to_writer_writes_all_chunks_and_flushes_once() {
        let writes = Arc::new(Mutex::new(Vec::new()));
        let flushes = Arc::new(AtomicUsize::new(0));
        let drops = Arc::new(AtomicUsize::new(0));
        let completion = Source::from_iter([b"ab".to_vec(), b"cd".to_vec()])
            .run_with(StreamConverters::to_writer({
                let writes = Arc::clone(&writes);
                let flushes = Arc::clone(&flushes);
                let drops = Arc::clone(&drops);
                move || {
                    Ok(CountingWriter {
                        writes: Arc::clone(&writes),
                        flushes: Arc::clone(&flushes),
                        drops: Arc::clone(&drops),
                        fail_write: false,
                    })
                }
            }))
            .expect("writer sink materializes");

        completion.wait().expect("writer sink completes");
        assert_eq!(
            writes.lock().expect("writes poisoned").as_slice(),
            &[b"ab".to_vec(), b"cd".to_vec()]
        );
        assert_eq!(flushes.load(Ordering::SeqCst), 1);
        assert_eq!(drops.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn to_writer_flushes_and_drops_once_on_failure() {
        let flushes = Arc::new(AtomicUsize::new(0));
        let drops = Arc::new(AtomicUsize::new(0));
        let completion = Source::<Vec<u8>>::failed(StreamError::Failed("upstream boom".to_owned()))
            .run_with(StreamConverters::to_writer({
                let flushes = Arc::clone(&flushes);
                let drops = Arc::clone(&drops);
                move || {
                    Ok(CountingWriter {
                        writes: Arc::new(Mutex::new(Vec::new())),
                        flushes: Arc::clone(&flushes),
                        drops: Arc::clone(&drops),
                        fail_write: false,
                    })
                }
            }))
            .expect("writer sink materializes");

        assert_eq!(
            completion.wait(),
            Err(StreamError::Failed("upstream boom".to_owned()))
        );
        assert_eq!(flushes.load(Ordering::SeqCst), 1);
        assert_eq!(drops.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn to_writer_flushes_and_drops_once_on_cancellation() {
        let flushes = Arc::new(AtomicUsize::new(0));
        let drops = Arc::new(AtomicUsize::new(0));
        let completion = Source::repeat(vec![7_u8; 4])
            .run_with(StreamConverters::to_writer({
                let flushes = Arc::clone(&flushes);
                let drops = Arc::clone(&drops);
                move || {
                    Ok(CountingWriter {
                        writes: Arc::new(Mutex::new(Vec::new())),
                        flushes: Arc::clone(&flushes),
                        drops: Arc::clone(&drops),
                        fail_write: false,
                    })
                }
            }))
            .expect("writer sink materializes");

        drop(completion);
        wait_for_counter(&flushes, 1);
        wait_for_counter(&drops, 1);
    }

    #[test]
    fn to_writer_surfaces_write_failure() {
        let completion = Source::single(vec![1_u8])
            .run_with(StreamConverters::to_writer(|| {
                Ok(CountingWriter {
                    writes: Arc::new(Mutex::new(Vec::new())),
                    flushes: Arc::new(AtomicUsize::new(0)),
                    drops: Arc::new(AtomicUsize::new(0)),
                    fail_write: true,
                })
            }))
            .expect("writer sink materializes");

        assert_eq!(
            completion.wait(),
            Err(StreamError::Failed("writer boom".to_owned()))
        );
    }

    #[test]
    fn file_io_round_trips_bytes() {
        let path = unique_temp_path("roundtrip");
        let write_completion = Source::from_iter([b"ab".to_vec(), b"cd".to_vec()])
            .run_with(FileIO::to_path(path.clone()))
            .expect("file sink materializes");
        write_completion.wait().expect("file write completes");

        let sink = FileIO::from_path(path.clone(), 2)
            .run_with(TestSink::probe())
            .expect("file source materializes");
        sink.request(4);
        sink.assert_next_n([b"ab".to_vec(), b"cd".to_vec()]);
        sink.expect_complete();

        std::fs::remove_file(path).expect("remove roundtrip file");
    }

    #[test]
    fn file_io_source_surfaces_open_failure() {
        let missing = unique_temp_path("missing");
        let result = FileIO::from_path(missing, 4).run_with(TestSink::probe());
        assert!(matches!(result, Err(StreamError::Failed(_))));
    }

    #[test]
    fn file_io_sink_creates_and_truncates_file() {
        let path = unique_temp_path("truncate");
        std::fs::write(&path, b"stale bytes").expect("seed file");

        let completion = Source::single(b"ok".to_vec())
            .run_with(FileIO::to_path(path.clone()))
            .expect("file sink materializes");
        completion.wait().expect("file write completes");
        assert_eq!(std::fs::read(&path).expect("read file"), b"ok");

        std::fs::remove_file(path).expect("remove truncate file");
    }

    #[test]
    fn file_io_source_default_chunk_size_works() {
        let path = unique_temp_path("default");
        std::fs::write(&path, b"hi").expect("write seed file");

        let sink = FileIO::from_path_default(path.clone())
            .run_with(TestSink::probe())
            .expect("file source materializes");
        sink.request(2);
        sink.assert_next(b"hi".to_vec());
        sink.expect_complete();

        std::fs::remove_file(path).expect("remove default file");
    }
}