1use crate::stream::{BoxStream, NotUsed, Sink, Source, StreamCompletion};
2use crate::{StreamError, StreamResult};
3use std::collections::VecDeque;
4use std::fs::{File, OpenOptions};
5use std::io::{self, Read, Write};
6use std::path::PathBuf;
7use std::sync::{
8 Arc, Condvar, Mutex,
9 atomic::{AtomicBool, Ordering},
10};
11use std::time::Duration;
12
/// Chunk size, in bytes, used by `FileIO::from_path_default`.
const DEFAULT_CHUNK_SIZE: usize = 8 * 1024;
/// Maximum number of chunks a `SourceQueue` holds before its producer blocks.
const READER_QUEUE_CAPACITY: usize = 8;
/// Maximum number of chunks buffered for an `InputStreamHandle` before the worker blocks.
const INPUT_STREAM_BUFFER_CAPACITY: usize = 16;
/// Maximum number of chunks buffered by an `OutputStreamHandle` before `write` blocks.
const OUTPUT_STREAM_BUFFER_CAPACITY: usize = 16;
17
18fn io_error(error: std::io::Error) -> StreamError {
19 StreamError::Failed(error.to_string())
20}
21
22#[derive(Clone)]
23enum SourceTerminal {
24 Complete,
25 Error(StreamError),
26}
27
28struct SourceQueueState {
29 queue: VecDeque<Vec<u8>>,
30 terminal: Option<SourceTerminal>,
31}
32
33struct SourceQueue {
34 state: Mutex<SourceQueueState>,
35 available: Condvar,
36 space: Condvar,
37 capacity: usize,
38 cancelled: Arc<AtomicBool>,
39}
40
41impl SourceQueue {
42 fn new() -> Arc<Self> {
43 Arc::new(Self {
44 state: Mutex::new(SourceQueueState {
45 queue: VecDeque::new(),
46 terminal: None,
47 }),
48 available: Condvar::new(),
49 space: Condvar::new(),
50 capacity: READER_QUEUE_CAPACITY,
51 cancelled: Arc::new(AtomicBool::new(false)),
52 })
53 }
54
55 fn push(&self, chunk: Vec<u8>) -> bool {
56 let mut state = self.state.lock().expect("io source queue poisoned");
57 while state.queue.len() >= self.capacity
58 && state.terminal.is_none()
59 && !self.cancelled.load(Ordering::SeqCst)
60 {
61 state = self
62 .space
63 .wait(state)
64 .expect("io source queue poisoned while waiting for space");
65 }
66
67 if state.terminal.is_some() || self.cancelled.load(Ordering::SeqCst) {
68 return false;
69 }
70
71 if state.terminal.is_none() {
72 state.queue.push_back(chunk);
73 }
74 drop(state);
75 self.available.notify_all();
76 true
77 }
78
79 fn finish(&self, terminal: SourceTerminal) {
80 let mut state = self.state.lock().expect("io source queue poisoned");
81 if state.terminal.is_none() {
82 state.terminal = Some(terminal);
83 }
84 drop(state);
85 self.available.notify_all();
86 self.space.notify_all();
87 }
88}
89
90struct ReaderWorkerGuard {
91 queue: Arc<SourceQueue>,
92 armed: bool,
93}
94
95impl ReaderWorkerGuard {
96 fn new(queue: Arc<SourceQueue>) -> Self {
97 Self { queue, armed: true }
98 }
99
100 fn disarm(&mut self) {
101 self.armed = false;
102 }
103}
104
105impl Drop for ReaderWorkerGuard {
106 fn drop(&mut self) {
107 if self.armed {
108 self.queue
109 .finish(SourceTerminal::Error(StreamError::AbruptTermination));
110 }
111 }
112}
113
114struct ReaderSourceStream {
115 queue: Arc<SourceQueue>,
116 completion: Option<StreamCompletion<NotUsed>>,
117}
118
119impl Iterator for ReaderSourceStream {
120 type Item = StreamResult<Vec<u8>>;
121
122 fn next(&mut self) -> Option<Self::Item> {
123 let mut state = self.queue.state.lock().expect("io source queue poisoned");
124 loop {
125 if let Some(chunk) = state.queue.pop_front() {
126 self.queue.space.notify_all();
127 return Some(Ok(chunk));
128 }
129 if let Some(terminal) = state.terminal.clone() {
130 return match terminal {
131 SourceTerminal::Complete => None,
132 SourceTerminal::Error(error) => Some(Err(error)),
133 };
134 }
135 state = self
136 .queue
137 .available
138 .wait(state)
139 .expect("io source queue poisoned while waiting");
140 }
141 }
142}
143
144impl Drop for ReaderSourceStream {
145 fn drop(&mut self) {
146 self.queue.cancelled.store(true, Ordering::SeqCst);
147 drop(self.queue.state.lock().unwrap_or_else(|p| p.into_inner()));
155 self.queue.available.notify_all();
156 self.queue.space.notify_all();
157 let _ = self.completion.take();
158 }
159}
160
161struct WriterGuard<W: Write> {
162 writer: W,
163 flushed: bool,
164}
165
166impl<W: Write> WriterGuard<W> {
167 fn new(writer: W) -> Self {
168 Self {
169 writer,
170 flushed: false,
171 }
172 }
173
174 fn writer_mut(&mut self) -> &mut W {
175 &mut self.writer
176 }
177
178 fn flush_once(&mut self) -> StreamResult<()> {
179 if self.flushed {
180 return Ok(());
181 }
182 self.writer.flush().map_err(io_error)?;
183 self.flushed = true;
184 Ok(())
185 }
186}
187
188impl<W: Write> Drop for WriterGuard<W> {
189 fn drop(&mut self) {
190 let _ = self.flush_once();
191 }
192}
193
194#[derive(Clone)]
197enum InputStreamTerminal {
198 Complete,
199 Error(StreamError),
200}
201
202struct InputStreamBufferState {
203 chunks: VecDeque<Vec<u8>>,
204 terminal: Option<InputStreamTerminal>,
205}
206
207struct InputStreamShared {
208 state: Mutex<InputStreamBufferState>,
209 available: Condvar,
210 space: Condvar,
211 cancelled: AtomicBool,
212}
213
214impl InputStreamShared {
215 fn new() -> Self {
216 Self {
217 state: Mutex::new(InputStreamBufferState {
218 chunks: VecDeque::new(),
219 terminal: None,
220 }),
221 available: Condvar::new(),
222 space: Condvar::new(),
223 cancelled: AtomicBool::new(false),
224 }
225 }
226
227 fn set_terminal(&self, terminal: InputStreamTerminal) {
228 let mut state = self.state.lock().expect("input stream buffer poisoned");
229 if state.terminal.is_none() {
230 state.terminal = Some(terminal);
231 }
232 drop(state);
233 self.available.notify_all();
234 self.space.notify_all();
235 }
236}
237
238pub struct InputStreamHandle {
245 shared: Arc<InputStreamShared>,
246 detached: Vec<u8>,
247 detached_offset: usize,
248 read_timeout: Duration,
249 stream_closed: bool,
250 _completion: StreamCompletion<NotUsed>,
251}
252
253impl InputStreamHandle {
254 fn new(
255 shared: Arc<InputStreamShared>,
256 read_timeout: Duration,
257 completion: StreamCompletion<NotUsed>,
258 ) -> Self {
259 Self {
260 shared,
261 detached: Vec::new(),
262 detached_offset: 0,
263 read_timeout,
264 stream_closed: false,
265 _completion: completion,
266 }
267 }
268}
269
270impl Read for InputStreamHandle {
271 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
272 if self.stream_closed {
273 return Err(io::Error::other(
274 "stream is terminated, no reads are possible",
275 ));
276 }
277 if buf.is_empty() {
278 return Ok(0);
279 }
280
281 let mut total = 0_usize;
282
283 if self.detached_offset < self.detached.len() {
285 let available = self.detached.len() - self.detached_offset;
286 let n = available.min(buf.len());
287 buf[..n]
288 .copy_from_slice(&self.detached[self.detached_offset..self.detached_offset + n]);
289 self.detached_offset += n;
290 total += n;
291 if self.detached_offset >= self.detached.len() {
292 self.detached.clear();
293 self.detached_offset = 0;
294 }
295 if total == buf.len() {
296 return Ok(total);
297 }
298 }
299
300 let mut state = self
301 .shared
302 .state
303 .lock()
304 .expect("input stream buffer poisoned");
305 loop {
306 while total < buf.len() {
308 if let Some(chunk) = state.chunks.pop_front() {
309 self.shared.space.notify_all();
310 drop(state);
311
312 let space = buf.len() - total;
313 let n = chunk.len().min(space);
314 buf[total..total + n].copy_from_slice(&chunk[..n]);
315 total += n;
316 if n < chunk.len() {
317 self.detached = chunk;
318 self.detached_offset = n;
319 }
320
321 state = self
323 .shared
324 .state
325 .lock()
326 .expect("input stream buffer poisoned");
327 continue;
328 }
329 break;
330 }
331
332 if total > 0 {
334 return Ok(total);
335 }
336
337 if let Some(terminal) = state.terminal.clone() {
339 return match terminal {
340 InputStreamTerminal::Complete => Ok(0),
341 InputStreamTerminal::Error(e) => {
342 Err(io::Error::other(format!("stream failed: {e}")))
343 }
344 };
345 }
346
347 let (new_state, timeout) = self
349 .shared
350 .available
351 .wait_timeout(state, self.read_timeout)
352 .expect("input stream buffer poisoned while waiting");
353 state = new_state;
354 if timeout.timed_out() && state.chunks.is_empty() && state.terminal.is_none() {
355 return Err(io::Error::new(
356 io::ErrorKind::TimedOut,
357 format!("timeout after {:?} waiting for new data", self.read_timeout),
358 ));
359 }
360 }
361 }
362}
363
364impl Drop for InputStreamHandle {
365 fn drop(&mut self) {
366 self.shared.cancelled.store(true, Ordering::SeqCst);
367 drop(self.shared.state.lock().unwrap_or_else(|p| p.into_inner()));
368 self.shared.available.notify_all();
369 self.shared.space.notify_all();
370 }
371}
372
373#[derive(Clone)]
376#[allow(dead_code)]
377enum OutputStreamTerminal {
378 Complete,
379 Error(StreamError),
380}
381
382struct OutputStreamBufferState {
383 chunks: VecDeque<Vec<u8>>,
384 terminal: Option<OutputStreamTerminal>,
385}
386
387struct OutputStreamShared {
388 state: Mutex<OutputStreamBufferState>,
389 available: Condvar,
390 space: Condvar,
391 cancelled: AtomicBool,
392}
393
394impl OutputStreamShared {
395 fn new() -> Self {
396 Self {
397 state: Mutex::new(OutputStreamBufferState {
398 chunks: VecDeque::new(),
399 terminal: None,
400 }),
401 available: Condvar::new(),
402 space: Condvar::new(),
403 cancelled: AtomicBool::new(false),
404 }
405 }
406}
407
408struct OutputStreamSourceStream {
409 shared: Arc<OutputStreamShared>,
410 done: bool,
411}
412
413impl Iterator for OutputStreamSourceStream {
414 type Item = StreamResult<Vec<u8>>;
415
416 fn next(&mut self) -> Option<Self::Item> {
417 if self.done {
418 return None;
419 }
420
421 let mut state = self
422 .shared
423 .state
424 .lock()
425 .expect("output stream buffer poisoned");
426 loop {
427 if let Some(chunk) = state.chunks.pop_front() {
428 self.shared.space.notify_all();
429 return Some(Ok(chunk));
430 }
431
432 match &state.terminal {
433 Some(OutputStreamTerminal::Complete) => {
434 self.done = true;
435 return None;
436 }
437 Some(OutputStreamTerminal::Error(e)) => {
438 self.done = true;
439 return Some(Err(e.clone()));
440 }
441 None => {}
442 }
443
444 state = self
445 .shared
446 .available
447 .wait(state)
448 .expect("output stream buffer poisoned while waiting");
449 }
450 }
451}
452
453impl Drop for OutputStreamSourceStream {
454 fn drop(&mut self) {
455 self.shared.cancelled.store(true, Ordering::SeqCst);
456 drop(self.shared.state.lock().unwrap_or_else(|p| p.into_inner()));
457 self.shared.available.notify_all();
458 self.shared.space.notify_all();
459 }
460}
461
462pub struct OutputStreamHandle {
469 shared: Arc<OutputStreamShared>,
470 write_timeout: Duration,
471 closed: AtomicBool,
472}
473
474impl OutputStreamHandle {
475 fn new(shared: Arc<OutputStreamShared>, write_timeout: Duration) -> Self {
476 Self {
477 shared,
478 write_timeout,
479 closed: AtomicBool::new(false),
480 }
481 }
482
483 pub fn close(&self) -> io::Result<()> {
488 self.closed.store(true, Ordering::SeqCst);
489 let mut state = self
490 .shared
491 .state
492 .lock()
493 .expect("output stream buffer poisoned");
494 if state.terminal.is_none() {
495 state.terminal = Some(OutputStreamTerminal::Complete);
496 }
497 drop(state);
498 self.shared.available.notify_all();
499 self.shared.space.notify_all();
500 Ok(())
501 }
502}
503
504impl Write for OutputStreamHandle {
505 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
506 if self.closed.load(Ordering::SeqCst) {
507 return Err(io::Error::new(
508 io::ErrorKind::BrokenPipe,
509 "stream is closed, no writes are possible",
510 ));
511 }
512 if buf.is_empty() {
513 return Ok(0);
514 }
515
516 let mut state = self
517 .shared
518 .state
519 .lock()
520 .expect("output stream buffer poisoned");
521 loop {
522 if self.closed.load(Ordering::SeqCst) || self.shared.cancelled.load(Ordering::SeqCst) {
523 return Err(io::Error::new(
524 io::ErrorKind::BrokenPipe,
525 "stream is closed, no writes are possible",
526 ));
527 }
528
529 if let Some(OutputStreamTerminal::Error(e)) = &state.terminal {
530 return Err(io::Error::other(format!("stream failed: {e}")));
531 }
532
533 if state.chunks.len() < OUTPUT_STREAM_BUFFER_CAPACITY {
534 state.chunks.push_back(buf.to_vec());
535 drop(state);
536 self.shared.available.notify_all();
537 return Ok(buf.len());
538 }
539
540 let (new_state, timeout) = self
541 .shared
542 .space
543 .wait_timeout(state, self.write_timeout)
544 .expect("output stream buffer poisoned while waiting");
545 state = new_state;
546 if timeout.timed_out()
547 && state.chunks.len() >= OUTPUT_STREAM_BUFFER_CAPACITY
548 && state.terminal.is_none()
549 {
550 return Err(io::Error::new(
551 io::ErrorKind::TimedOut,
552 format!(
553 "timed out trying to write data to stream after {:?}",
554 self.write_timeout
555 ),
556 ));
557 }
558 }
559 }
560
561 fn flush(&mut self) -> io::Result<()> {
562 Ok(())
563 }
564}
565
566impl Drop for OutputStreamHandle {
567 fn drop(&mut self) {
568 self.shared.cancelled.store(true, Ordering::SeqCst);
569 let _ = self.close();
570 }
571}
572
573pub struct StreamConverters;
574
575impl StreamConverters {
576 #[must_use]
577 pub fn from_reader<F, R>(factory: F, chunk_size: usize) -> Source<Vec<u8>>
578 where
579 F: Fn() -> std::io::Result<R> + Send + Sync + 'static,
580 R: Read + Send + 'static,
581 {
582 assert!(chunk_size > 0, "chunk size must be greater than zero");
583 Source::from_materialized_factory(move |materializer| {
584 let reader = factory().map_err(io_error)?;
585 let queue = SourceQueue::new();
586 let queue_for_worker = Arc::clone(&queue);
587 let cancelled = Arc::clone(&queue.cancelled);
588 let completion = materializer.spawn_stream(move |_worker_cancelled| {
589 let mut reader = reader;
590 let mut guard = ReaderWorkerGuard::new(Arc::clone(&queue_for_worker));
591 let mut buffer = vec![0_u8; chunk_size];
592
593 loop {
594 if cancelled.load(Ordering::SeqCst) {
595 guard.disarm();
596 return Ok(NotUsed);
597 }
598
599 match reader.read(&mut buffer) {
600 Ok(0) => {
601 guard.disarm();
602 queue_for_worker.finish(SourceTerminal::Complete);
603 return Ok(NotUsed);
604 }
605 Ok(read) => {
606 if !queue_for_worker.push(buffer[..read].to_vec()) {
607 guard.disarm();
608 return Ok(NotUsed);
609 }
610 }
611 Err(error) => {
612 guard.disarm();
613 queue_for_worker.finish(SourceTerminal::Error(io_error(error)));
614 return Ok(NotUsed);
615 }
616 }
617 }
618 });
619
620 Ok((
621 Box::new(ReaderSourceStream {
622 queue,
623 completion: Some(completion),
624 }) as BoxStream<Vec<u8>>,
625 NotUsed,
626 ))
627 })
628 }
629
630 #[must_use]
631 pub fn to_writer<F, W>(factory: F) -> Sink<Vec<u8>, StreamCompletion<NotUsed>>
632 where
633 F: Fn() -> std::io::Result<W> + Send + Sync + 'static,
634 W: Write + Send + 'static,
635 {
636 Sink::from_runner(move |input: BoxStream<Vec<u8>>, materializer| {
637 let writer = WriterGuard::new(factory().map_err(io_error)?);
638 Ok(materializer.spawn_stream(move |cancelled| {
639 let mut input = input;
640 let mut writer = writer;
641 loop {
642 if cancelled.load(Ordering::SeqCst) {
643 let _ = writer.flush_once();
644 return Err(StreamError::Cancelled);
645 }
646
647 match input.next() {
648 Some(Ok(chunk)) => {
649 writer.writer_mut().write_all(&chunk).map_err(io_error)?
650 }
651 Some(Err(error)) => {
652 let _ = writer.flush_once();
653 return Err(error);
654 }
655 None => {
656 writer.flush_once()?;
657 return Ok(NotUsed);
658 }
659 }
660 }
661 }))
662 })
663 }
664
665 #[must_use]
679 pub fn as_input_stream(read_timeout: Duration) -> Sink<Vec<u8>, InputStreamHandle> {
680 Sink::from_runner(move |input: BoxStream<Vec<u8>>, materializer| {
681 let shared = Arc::new(InputStreamShared::new());
682 let shared_for_worker = Arc::clone(&shared);
683
684 let completion = materializer.spawn_stream(move |_task_cancelled| {
685 let mut input = input;
686 loop {
687 if shared_for_worker.cancelled.load(Ordering::SeqCst) {
688 return Ok(NotUsed);
689 }
690
691 match input.next() {
692 Some(Ok(chunk)) => {
693 let mut state = shared_for_worker
694 .state
695 .lock()
696 .expect("input stream buffer poisoned");
697 while state.chunks.len() >= INPUT_STREAM_BUFFER_CAPACITY
698 && state.terminal.is_none()
699 && !shared_for_worker.cancelled.load(Ordering::SeqCst)
700 {
701 state = shared_for_worker
702 .space
703 .wait(state)
704 .expect("input stream buffer poisoned while waiting");
705 }
706
707 if state.terminal.is_some()
708 || shared_for_worker.cancelled.load(Ordering::SeqCst)
709 {
710 return Ok(NotUsed);
711 }
712
713 if !chunk.is_empty() {
714 state.chunks.push_back(chunk);
715 }
716 drop(state);
717 shared_for_worker.available.notify_all();
718 }
719 Some(Err(e)) => {
720 shared_for_worker.set_terminal(InputStreamTerminal::Error(e));
721 return Ok(NotUsed);
722 }
723 None => {
724 shared_for_worker.set_terminal(InputStreamTerminal::Complete);
725 return Ok(NotUsed);
726 }
727 }
728 }
729 });
730
731 Ok(InputStreamHandle::new(shared, read_timeout, completion))
732 })
733 }
734
735 #[must_use]
749 pub fn as_output_stream(write_timeout: Duration) -> Source<Vec<u8>, OutputStreamHandle> {
750 Source::from_materialized_factory(move |_materializer| {
751 let shared = Arc::new(OutputStreamShared::new());
752 let handle = OutputStreamHandle::new(Arc::clone(&shared), write_timeout);
753 let stream = OutputStreamSourceStream {
754 shared,
755 done: false,
756 };
757 Ok((Box::new(stream) as BoxStream<Vec<u8>>, handle))
758 })
759 }
760}
761
762pub struct FileIO;
763
764impl FileIO {
765 #[must_use]
766 pub fn from_path(path: impl Into<PathBuf>, chunk_size: usize) -> Source<Vec<u8>> {
767 let path = path.into();
768 StreamConverters::from_reader(move || File::open(&path), chunk_size)
769 }
770
771 #[must_use]
772 pub fn from_path_default(path: impl Into<PathBuf>) -> Source<Vec<u8>> {
773 Self::from_path(path, DEFAULT_CHUNK_SIZE)
774 }
775
776 #[must_use]
777 pub fn to_path(path: impl Into<PathBuf>) -> Sink<Vec<u8>, StreamCompletion<NotUsed>> {
778 let path = path.into();
779 StreamConverters::to_writer(move || {
780 OpenOptions::new()
781 .create(true)
782 .truncate(true)
783 .write(true)
784 .open(&path)
785 })
786 }
787}
788
789#[cfg(test)]
790mod tests {
791 use super::*;
792 use crate::Source;
793 use crate::testkit::TestSink;
794 use std::io::Cursor;
795 use std::sync::atomic::{AtomicU64, AtomicUsize};
796 use std::thread;
797 use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
798
/// Polls `counter` every 5 ms for up to one second until it equals `expected`.
///
/// Panics (via `assert_eq!`) if the value still differs once the second has elapsed.
fn wait_for_counter(counter: &AtomicUsize, expected: usize) {
    let deadline = Instant::now() + Duration::from_secs(1);
    loop {
        if Instant::now() >= deadline {
            break;
        }
        if counter.load(Ordering::SeqCst) == expected {
            return;
        }
        thread::sleep(Duration::from_millis(5));
    }
    assert_eq!(counter.load(Ordering::SeqCst), expected);
}
809
/// Polls `condition` every 5 ms until it returns `true` or `timeout` elapses.
///
/// Returns `true` as soon as the condition holds; after the deadline the condition is
/// evaluated one final time and that result is returned.
fn wait_until(timeout: Duration, mut condition: impl FnMut() -> bool) -> bool {
    let deadline = Instant::now() + timeout;
    loop {
        if Instant::now() >= deadline {
            return condition();
        }
        if condition() {
            return true;
        }
        thread::sleep(Duration::from_millis(5));
    }
}
820
/// Builds a path in the system temp directory whose file name combines `name`, the
/// process id and the current time in nanoseconds since the Unix epoch.
///
/// Panics if the system clock is before the Unix epoch.
fn unique_temp_path(name: &str) -> PathBuf {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("clock after epoch");
    let nanos = since_epoch.as_nanos();
    let file_name = format!(
        "datum-wp12-{name}-{}-{nanos}.bin",
        std::process::id()
    );
    std::env::temp_dir().join(file_name)
}
831
/// Test reader over an in-memory buffer that counts how many times it is dropped.
struct CountingReader {
    /// Source of the bytes handed out by `read`.
    inner: Cursor<Vec<u8>>,
    /// Incremented once when the reader is dropped.
    drops: Arc<AtomicUsize>,
}

impl Read for CountingReader {
    /// Delegates to the wrapped cursor.
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        let cursor = &mut self.inner;
        cursor.read(buf)
    }
}

impl Drop for CountingReader {
    fn drop(&mut self) {
        let counter = &self.drops;
        counter.fetch_add(1, Ordering::SeqCst);
    }
}
848
/// Test writer that records every write, counts flushes and drops, and can be told to
/// fail all writes.
struct CountingWriter {
    /// One entry per successful `write` call, holding the bytes written.
    writes: Arc<Mutex<Vec<Vec<u8>>>>,
    /// Incremented on every `flush` call.
    flushes: Arc<AtomicUsize>,
    /// Incremented once when the writer is dropped.
    drops: Arc<AtomicUsize>,
    /// When `true`, every `write` fails with "writer boom".
    fail_write: bool,
}

impl Write for CountingWriter {
    /// Records `buf` and reports it as fully written, or fails when `fail_write` is set.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        if self.fail_write {
            return Err(std::io::Error::other("writer boom"));
        }
        let mut log = self.writes.lock().expect("writer log poisoned");
        log.push(buf.to_vec());
        Ok(buf.len())
    }

    /// Counts the flush and succeeds.
    fn flush(&mut self) -> std::io::Result<()> {
        let counter = &self.flushes;
        counter.fetch_add(1, Ordering::SeqCst);
        Ok(())
    }
}

impl Drop for CountingWriter {
    fn drop(&mut self) {
        let counter = &self.drops;
        counter.fetch_add(1, Ordering::SeqCst);
    }
}
879
/// Test reader that hands out at most `chunk_size` bytes per call and counts the calls.
struct CountingChunkReader {
    /// Source of the bytes handed out by `read`.
    inner: Cursor<Vec<u8>>,
    /// Upper bound on the number of bytes returned by a single `read`.
    chunk_size: usize,
    /// Incremented on every `read` call.
    reads: Arc<AtomicUsize>,
}

impl Read for CountingChunkReader {
    /// Counts the call, then reads at most `min(chunk_size, buf.len())` bytes into the
    /// front of `buf`.
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        self.reads.fetch_add(1, Ordering::SeqCst);
        let limit = self.chunk_size.min(buf.len());
        self.inner.read(&mut buf[..limit])
    }
}
895
/// Six bytes read with a chunk size of two arrive as three 2-byte chunks, then the
/// stream completes.
#[test]
fn from_reader_emits_chunked_bytes_and_completes() {
    let source = StreamConverters::from_reader(|| Ok(Cursor::new(b"abcdef".to_vec())), 2);
    let probe = source
        .run_with(TestSink::probe())
        .expect("reader source materializes");

    probe.request(4);
    probe.assert_next_n([b"ab".to_vec(), b"cd".to_vec(), b"ef".to_vec()]);
    probe.expect_complete();
}
906
/// After the stream completes, the reader has been dropped exactly once.
#[test]
fn from_reader_closes_exactly_once_on_completion() {
    let drops = Arc::new(AtomicUsize::new(0));
    let factory = {
        let drops = Arc::clone(&drops);
        move || -> std::io::Result<CountingReader> {
            Ok(CountingReader {
                inner: Cursor::new(b"hello".to_vec()),
                drops: Arc::clone(&drops),
            })
        }
    };
    let probe = StreamConverters::from_reader(factory, 8)
        .run_with(TestSink::probe())
        .expect("reader source materializes");

    probe.request(2);
    probe.assert_next(b"hello".to_vec());
    probe.expect_complete();
    wait_for_counter(&drops, 1);
}
928
/// Cancelling the downstream probe after one chunk drops the reader exactly once.
#[test]
fn from_reader_closes_exactly_once_on_cancellation() {
    let drops = Arc::new(AtomicUsize::new(0));
    let factory = {
        let drops = Arc::clone(&drops);
        move || -> std::io::Result<CountingReader> {
            Ok(CountingReader {
                inner: Cursor::new(vec![1_u8; 32]),
                drops: Arc::clone(&drops),
            })
        }
    };
    let mut probe = StreamConverters::from_reader(factory, 4)
        .run_with(TestSink::probe())
        .expect("reader source materializes");

    probe.request(1);
    probe.assert_next(vec![1_u8; 4]);
    probe.cancel();
    wait_for_counter(&drops, 1);
}
950
/// A reader whose `read` fails makes the stream fail with the error's message.
#[test]
fn from_reader_surfaces_read_failure() {
    /// Reader that fails every `read` with "reader boom".
    struct FailingReader;

    impl Read for FailingReader {
        fn read(&mut self, _buf: &mut [u8]) -> std::io::Result<usize> {
            Err(std::io::Error::other("reader boom"))
        }
    }

    let source = StreamConverters::from_reader(|| Ok(FailingReader), 8);
    let probe = source
        .run_with(TestSink::probe())
        .expect("reader source materializes");

    probe.request(1);
    let failure = probe.expect_error();
    assert_eq!(failure, StreamError::Failed("reader boom".to_owned()));
}
971
/// With a consumer that asked for a single element, the number of reader calls must
/// plateau near the bounded queue capacity; the full payload is still delivered once
/// demand is unlimited.
#[test]
fn from_reader_caps_buffered_read_ahead_for_slow_consumers() {
    let payload: Vec<u8> = (0..64_u8).cycle().take(8192 * 16).collect();
    let reads = Arc::new(AtomicUsize::new(0));
    let factory = {
        let bytes = payload.clone();
        let reads = Arc::clone(&reads);
        move || -> std::io::Result<CountingChunkReader> {
            Ok(CountingChunkReader {
                inner: Cursor::new(bytes.clone()),
                chunk_size: 256,
                reads: Arc::clone(&reads),
            })
        }
    };
    let probe = StreamConverters::from_reader(factory, 256)
        .run_with(TestSink::probe())
        .expect("reader source materializes");

    probe.request(1);
    let first = probe.expect_next();
    assert_eq!(first.len(), 256);

    // Wait until the read counter is non-zero and has not changed for at least 100 ms.
    let last_seen = Arc::new(AtomicUsize::new(0));
    let quiet_since_ms = Arc::new(AtomicU64::new(0));
    let start = Instant::now();
    let plateaued = {
        let last_seen = Arc::clone(&last_seen);
        let quiet_since_ms = Arc::clone(&quiet_since_ms);
        let reads = Arc::clone(&reads);
        move || {
            let current = reads.load(Ordering::SeqCst);
            if current != last_seen.load(Ordering::SeqCst) {
                // Still reading: restart the quiet period.
                last_seen.store(current, Ordering::SeqCst);
                quiet_since_ms.store(start.elapsed().as_millis() as u64, Ordering::SeqCst);
                return false;
            }

            let now_ms = start.elapsed().as_millis() as u64;
            let quiet_for = now_ms - quiet_since_ms.load(Ordering::SeqCst);
            current > 0 && quiet_for >= 100
        }
    };
    assert!(wait_until(Duration::from_secs(2), plateaued));

    assert!(
        reads.load(Ordering::SeqCst) <= READER_QUEUE_CAPACITY + 2,
        "reader should plateau near the bounded queue capacity"
    );

    probe.request(usize::MAX);
    let mut collected = first;
    for chunk in probe.drain_until_complete() {
        collected.extend_from_slice(&chunk);
    }
    assert_eq!(collected, payload);
}
1029
/// Two chunks are written in order, the writer is flushed once and dropped once.
#[test]
fn to_writer_writes_all_chunks_and_flushes_once() {
    let writes = Arc::new(Mutex::new(Vec::new()));
    let flushes = Arc::new(AtomicUsize::new(0));
    let drops = Arc::new(AtomicUsize::new(0));
    let sink = StreamConverters::to_writer({
        let writes = Arc::clone(&writes);
        let flushes = Arc::clone(&flushes);
        let drops = Arc::clone(&drops);
        move || {
            Ok(CountingWriter {
                writes: Arc::clone(&writes),
                flushes: Arc::clone(&flushes),
                drops: Arc::clone(&drops),
                fail_write: false,
            })
        }
    });
    let completion = Source::from_iter([b"ab".to_vec(), b"cd".to_vec()])
        .run_with(sink)
        .expect("writer sink materializes");

    completion.wait().expect("writer sink completes");
    assert_eq!(
        writes.lock().expect("writes poisoned").as_slice(),
        &[b"ab".to_vec(), b"cd".to_vec()]
    );
    assert_eq!(flushes.load(Ordering::SeqCst), 1);
    assert_eq!(drops.load(Ordering::SeqCst), 1);
}
1059
/// When upstream fails, the completion carries that error and the writer is still
/// flushed once and dropped once.
#[test]
fn to_writer_flushes_and_drops_once_on_failure() {
    let flushes = Arc::new(AtomicUsize::new(0));
    let drops = Arc::new(AtomicUsize::new(0));
    let sink = StreamConverters::to_writer({
        let flushes = Arc::clone(&flushes);
        let drops = Arc::clone(&drops);
        move || {
            Ok(CountingWriter {
                writes: Arc::new(Mutex::new(Vec::new())),
                flushes: Arc::clone(&flushes),
                drops: Arc::clone(&drops),
                fail_write: false,
            })
        }
    });
    let completion = Source::<Vec<u8>>::failed(StreamError::Failed("upstream boom".to_owned()))
        .run_with(sink)
        .expect("writer sink materializes");

    let outcome = completion.wait();
    assert_eq!(
        outcome,
        Err(StreamError::Failed("upstream boom".to_owned()))
    );
    assert_eq!(flushes.load(Ordering::SeqCst), 1);
    assert_eq!(drops.load(Ordering::SeqCst), 1);
}
1086
/// Dropping the completion of an endless stream leads to one flush and one drop of the
/// writer.
#[test]
fn to_writer_flushes_and_drops_once_on_cancellation() {
    let flushes = Arc::new(AtomicUsize::new(0));
    let drops = Arc::new(AtomicUsize::new(0));
    let sink = StreamConverters::to_writer({
        let flushes = Arc::clone(&flushes);
        let drops = Arc::clone(&drops);
        move || {
            Ok(CountingWriter {
                writes: Arc::new(Mutex::new(Vec::new())),
                flushes: Arc::clone(&flushes),
                drops: Arc::clone(&drops),
                fail_write: false,
            })
        }
    });
    let completion = Source::repeat(vec![7_u8; 4])
        .run_with(sink)
        .expect("writer sink materializes");

    drop(completion);
    wait_for_counter(&flushes, 1);
    wait_for_counter(&drops, 1);
}
1110
/// A failing write surfaces through the completion as `StreamError::Failed`.
#[test]
fn to_writer_surfaces_write_failure() {
    let sink = StreamConverters::to_writer(|| {
        Ok(CountingWriter {
            writes: Arc::new(Mutex::new(Vec::new())),
            flushes: Arc::new(AtomicUsize::new(0)),
            drops: Arc::new(AtomicUsize::new(0)),
            fail_write: true,
        })
    });
    let completion = Source::single(vec![1_u8])
        .run_with(sink)
        .expect("writer sink materializes");

    let outcome = completion.wait();
    assert_eq!(outcome, Err(StreamError::Failed("writer boom".to_owned())));
}
1129
/// Bytes written through `FileIO::to_path` are read back unchanged by
/// `FileIO::from_path`.
#[test]
fn file_io_round_trips_bytes() {
    let path = unique_temp_path("roundtrip");

    let written = Source::from_iter([b"ab".to_vec(), b"cd".to_vec()])
        .run_with(FileIO::to_path(path.clone()))
        .expect("file sink materializes");
    written.wait().expect("file write completes");

    let probe = FileIO::from_path(path.clone(), 2)
        .run_with(TestSink::probe())
        .expect("file source materializes");
    probe.request(4);
    probe.assert_next_n([b"ab".to_vec(), b"cd".to_vec()]);
    probe.expect_complete();

    std::fs::remove_file(path).expect("remove roundtrip file");
}
1147
/// Materializing a source over a non-existent file fails with `StreamError::Failed`.
#[test]
fn file_io_source_surfaces_open_failure() {
    let missing = unique_temp_path("missing");
    let source = FileIO::from_path(missing, 4);
    let result = source.run_with(TestSink::probe());
    assert!(matches!(result, Err(StreamError::Failed(_))));
}
1154
/// Writing to an existing file replaces its previous content.
#[test]
fn file_io_sink_creates_and_truncates_file() {
    let path = unique_temp_path("truncate");
    std::fs::write(&path, b"stale bytes").expect("seed file");

    let sink = FileIO::to_path(path.clone());
    let completion = Source::single(b"ok".to_vec())
        .run_with(sink)
        .expect("file sink materializes");
    completion.wait().expect("file write completes");

    let on_disk = std::fs::read(&path).expect("read file");
    assert_eq!(on_disk, b"ok");

    std::fs::remove_file(path).expect("remove truncate file");
}
1168
/// A small file read with the default chunk size arrives as a single chunk.
#[test]
fn file_io_source_default_chunk_size_works() {
    let path = unique_temp_path("default");
    std::fs::write(&path, b"hi").expect("write seed file");

    let source = FileIO::from_path_default(path.clone());
    let probe = source
        .run_with(TestSink::probe())
        .expect("file source materializes");
    probe.request(2);
    probe.assert_next(b"hi".to_vec());
    probe.expect_complete();

    std::fs::remove_file(path).expect("remove default file");
}
1183
/// Reading the handle until end of file yields the concatenation of all chunks.
#[test]
fn as_input_stream_reads_data_written_by_stream() {
    let sink = StreamConverters::as_input_stream(Duration::from_secs(5));
    let mut handle: InputStreamHandle = Source::from_iter([b"hello".to_vec(), b"world".to_vec()])
        .run_with(sink)
        .expect("input stream sink materializes");

    let mut buf = [0_u8; 32];
    let mut filled = 0_usize;
    loop {
        match handle.read(&mut buf[filled..]).expect("read succeeds") {
            0 => break,
            n => filled += n,
        }
    }
    assert_eq!(&buf[..filled], b"helloworld");
}
1207
/// After the only chunk has been read, the next read reports end of file.
#[test]
fn as_input_stream_eof_when_stream_completes() {
    let sink = StreamConverters::as_input_stream(Duration::from_secs(5));
    let mut handle: InputStreamHandle = Source::from_iter([b"x".to_vec()])
        .run_with(sink)
        .expect("input stream sink materializes");

    let mut buf = [0_u8; 4];
    let first = handle.read(&mut buf).expect("first read succeeds");
    assert_eq!(&buf[..first], b"x");

    let second = handle.read(&mut buf).expect("second read returns eof");
    assert_eq!(second, 0);
}
1222
/// A five-byte chunk read through a two-byte buffer arrives as 2 + 2 + 1 bytes,
/// followed by end of file.
#[test]
fn as_input_stream_partial_reads_work() {
    let sink = StreamConverters::as_input_stream(Duration::from_secs(5));
    let mut handle: InputStreamHandle = Source::single(b"abcde".to_vec())
        .run_with(sink)
        .expect("input stream sink materializes");

    let mut small = [0_u8; 2];

    let first = handle.read(&mut small).expect("small read succeeds");
    assert_eq!(first, 2);
    assert_eq!(&small[..], b"ab");

    let second = handle.read(&mut small).expect("second small read succeeds");
    assert_eq!(second, 2);
    assert_eq!(&small[..], b"cd");

    let third = handle.read(&mut small).expect("third small read succeeds");
    assert_eq!(third, 1);
    assert_eq!(&small[..1], b"e");

    let fourth = handle.read(&mut small).expect("fourth read returns eof");
    assert_eq!(fourth, 0);
}
1248
/// An upstream failure is reported by `read` as an I/O error that mentions the cause.
#[test]
fn as_input_stream_error_surfaces_as_io_error() {
    let sink = StreamConverters::as_input_stream(Duration::from_secs(5));
    let mut handle: InputStreamHandle =
        Source::<Vec<u8>>::failed(StreamError::Failed("upstream boom".to_owned()))
            .run_with(sink)
            .expect("input stream sink materializes");

    let mut buf = [0_u8; 8];
    let msg = handle
        .read(&mut buf)
        .expect_err("read surfaces error")
        .to_string();
    assert!(msg.contains("upstream boom"), "got: {msg}");
}
1261
/// A handle over an endless stream can be read from and then dropped.
#[test]
fn as_input_stream_cancellation_stops_reads() {
    let sink = StreamConverters::as_input_stream(Duration::from_secs(5));
    let mut handle: InputStreamHandle = Source::repeat(b"x".to_vec())
        .run_with(sink)
        .expect("input stream sink materializes");

    let mut buf = [0_u8; 3];
    let n = handle.read(&mut buf).expect("first read succeeds");
    assert!((1..=3).contains(&n), "expected 1..=3 bytes, got {n}");

    // Dropping the handle cancels the worker that feeds it.
    drop(handle);
}
1280
1281 #[test]
1282 fn as_input_stream_read_timeout_returns_timed_out() {
1283 let mut handle: InputStreamHandle = Source::<Vec<u8>>::never()
1285 .run_with(StreamConverters::as_input_stream(Duration::from_millis(10)))
1286 .expect("input stream sink materializes");
1287
1288 let mut buf = [0_u8; 4];
1289 let err = handle.read(&mut buf).expect_err("read times out");
1290 assert_eq!(err.kind(), io::ErrorKind::TimedOut);
1291 }
1292
1293 #[test]
1294 fn as_output_stream_writes_data_appear_in_stream() {
1295 let (mut handle, completion) = StreamConverters::as_output_stream(Duration::from_secs(5))
1296 .to_mat(Sink::collect(), crate::Keep::both)
1297 .run()
1298 .expect("output stream source materializes");
1299
1300 handle.write_all(b"alpha").expect("first write succeeds");
1301 handle.write_all(b"beta").expect("second write succeeds");
1302 handle.close().expect("close succeeds");
1303
1304 let chunks = completion.wait().expect("stream completes");
1305 assert_eq!(chunks, vec![b"alpha".to_vec(), b"beta".to_vec()]);
1306 }
1307
1308 #[test]
1309 fn as_output_stream_close_completes_stream() {
1310 let (mut handle, completion) = StreamConverters::as_output_stream(Duration::from_secs(5))
1311 .to_mat(Sink::collect(), crate::Keep::both)
1312 .run()
1313 .expect("output stream source materializes");
1314
1315 handle.write_all(b"done").expect("write succeeds");
1316 let result = handle.close();
1317 assert!(result.is_ok());
1318
1319 let chunks = completion.wait().expect("stream completes after close");
1320 assert_eq!(chunks, vec![b"done".to_vec()]);
1321 }
1322
1323 #[test]
1324 fn as_output_stream_write_after_close_fails() {
1325 let (mut handle, _completion) = StreamConverters::as_output_stream(Duration::from_secs(5))
1326 .to_mat(Sink::ignore(), crate::Keep::both)
1327 .run()
1328 .expect("output stream source materializes");
1329
1330 handle.close().expect("first close succeeds");
1331 let err = handle.write(b"late").expect_err("write after close fails");
1332 assert_eq!(err.kind(), io::ErrorKind::BrokenPipe);
1333 }
1334
1335 #[test]
1336 fn as_output_stream_cancellation_stops_writes() {
1337 let (mut handle, completion) = StreamConverters::as_output_stream(Duration::from_secs(5))
1338 .to_mat(Sink::ignore(), crate::Keep::both)
1339 .run()
1340 .expect("output stream source materializes");
1341
1342 handle.write_all(b"ok").expect("write succeeds");
1343
1344 drop(completion);
1346
1347 let deadline = std::time::Instant::now() + Duration::from_secs(1);
1349 let mut last_err = None;
1350 while std::time::Instant::now() < deadline {
1351 match handle.write(b"after cancel") {
1352 Err(e) => {
1353 last_err = Some(e);
1354 break;
1355 }
1356 _ => thread::sleep(Duration::from_millis(5)),
1357 }
1358 }
1359 let err = last_err.expect("write after cancellation should fail");
1360 assert_eq!(err.kind(), io::ErrorKind::BrokenPipe);
1361 }
1362
1363 #[test]
1364 fn as_output_stream_write_timeout_returns_timed_out() {
1365 let hang_sink: Sink<Vec<u8>, StreamCompletion<NotUsed>> =
1368 Sink::from_runner(move |input: BoxStream<Vec<u8>>, materializer| {
1369 Ok(materializer.spawn_stream(move |cancelled| {
1370 let _input = input;
1371 loop {
1372 if cancelled.load(Ordering::SeqCst) {
1373 return Ok(NotUsed);
1374 }
1375 thread::sleep(Duration::from_millis(1));
1376 }
1377 }))
1378 });
1379 let (mut handle, _hang_completion) =
1380 StreamConverters::as_output_stream(Duration::from_millis(50))
1381 .to_mat(hang_sink, crate::Keep::both)
1382 .run()
1383 .expect("output stream source materializes");
1384
1385 let capacity = 16_usize;
1386
1387 for _ in 0..capacity {
1389 handle.write_all(b"x").expect("buffer-fill write succeeds");
1390 }
1391
1392 let err = handle.write(b"overflow").expect_err("write times out");
1394 assert_eq!(err.kind(), io::ErrorKind::TimedOut);
1395 }
1396
1397 #[test]
1398 fn as_output_stream_flush_is_noop() {
1399 let (mut handle, completion) = StreamConverters::as_output_stream(Duration::from_secs(5))
1400 .to_mat(Sink::collect(), crate::Keep::both)
1401 .run()
1402 .expect("output stream source materializes");
1403
1404 handle.write_all(b"data").expect("write succeeds");
1405 handle.flush().expect("flush is a noop");
1406 handle.close().expect("close succeeds");
1407
1408 let chunks = completion.wait().expect("stream completes");
1409 assert_eq!(chunks, vec![b"data".to_vec()]);
1410 }
1411
1412 #[test]
1413 fn as_output_stream_drop_completes_stream() {
1414 let (mut handle, completion) = StreamConverters::as_output_stream(Duration::from_secs(5))
1415 .to_mat(Sink::collect(), crate::Keep::both)
1416 .run()
1417 .expect("output stream source materializes");
1418
1419 handle.write_all(b"drop-me").expect("write succeeds");
1420 drop(handle);
1421
1422 let chunks = completion.wait().expect("stream completes after drop");
1423 assert_eq!(chunks, vec![b"drop-me".to_vec()]);
1424 }
1425
1426 #[test]
1427 fn round_trip_output_stream_to_input_stream() {
1428 let (mut out_handle, mut in_handle): (OutputStreamHandle, InputStreamHandle) =
1431 StreamConverters::as_output_stream(Duration::from_secs(5))
1432 .to_mat(
1433 StreamConverters::as_input_stream(Duration::from_secs(5)),
1434 crate::Keep::both,
1435 )
1436 .run()
1437 .expect("round-trip stream materializes");
1438
1439 out_handle.write_all(b"round").expect("write round");
1440 out_handle.write_all(b"trip").expect("write trip");
1441 out_handle.close().expect("close output");
1442
1443 let mut buf = [0_u8; 16];
1444 let mut total = 0_usize;
1445 loop {
1446 let n = in_handle.read(&mut buf[total..]).expect("read");
1447 if n == 0 {
1448 break;
1449 }
1450 total += n;
1451 }
1452 assert_eq!(&buf[..total], b"roundtrip");
1453 }
1454
1455 #[test]
1456 fn as_input_stream_empty_buf_read_returns_zero() {
1457 let mut handle: InputStreamHandle = Source::single(b"abc".to_vec())
1458 .run_with(StreamConverters::as_input_stream(Duration::from_secs(5)))
1459 .expect("input stream sink materializes");
1460
1461 let n = handle.read(&mut []).expect("empty read succeeds");
1462 assert_eq!(n, 0);
1463 }
1464
1465 #[test]
1466 fn as_input_stream_large_read_across_multiple_chunks() {
1467 let chunks: Vec<Vec<u8>> = (0..10).map(|i| vec![i as u8; 3]).collect();
1470 let total_bytes: usize = chunks.iter().map(|c| c.len()).sum();
1471
1472 let mut handle: InputStreamHandle = Source::from_iter(chunks)
1473 .run_with(StreamConverters::as_input_stream(Duration::from_secs(5)))
1474 .expect("input stream sink materializes");
1475
1476 let mut buf = vec![0_u8; total_bytes];
1477 let mut total = 0_usize;
1478 loop {
1479 let n = handle.read(&mut buf[total..]).expect("large read succeeds");
1480 if n == 0 {
1481 break;
1482 }
1483 total += n;
1484 }
1485 assert_eq!(total, total_bytes);
1486 }
1487}