1use crate::stream::{BoxStream, NotUsed, Sink, Source, StreamCompletion};
2use crate::{StreamError, StreamResult};
3use std::collections::VecDeque;
4use std::fs::{File, OpenOptions};
5use std::io::{self, Read, Write};
6use std::path::PathBuf;
7use std::sync::{
8 Arc, Condvar, Mutex,
9 atomic::{AtomicBool, Ordering},
10};
11use std::time::Duration;
12
/// Chunk size in bytes used when a file source is created without an explicit size.
const DEFAULT_CHUNK_SIZE: usize = 8 * 1024;
/// Maximum number of chunks a `SourceQueue` holds before the reader worker blocks.
const READER_QUEUE_CAPACITY: usize = 8;
/// Maximum number of chunks buffered for an input-stream handle before its worker blocks.
const INPUT_STREAM_BUFFER_CAPACITY: usize = 16;
/// Maximum number of chunks buffered for an output-stream handle before `write` waits.
const OUTPUT_STREAM_BUFFER_CAPACITY: usize = 16;
17
18fn io_error(error: std::io::Error) -> StreamError {
19 StreamError::Failed(error.to_string())
20}
21
22#[derive(Clone)]
23enum SourceTerminal {
24 Complete,
25 Error(StreamError),
26}
27
28struct SourceQueueState {
29 queue: VecDeque<Vec<u8>>,
30 terminal: Option<SourceTerminal>,
31}
32
33struct SourceQueue {
34 state: Mutex<SourceQueueState>,
35 available: Condvar,
36 space: Condvar,
37 capacity: usize,
38 cancelled: Arc<AtomicBool>,
39}
40
41impl SourceQueue {
42 fn new() -> Arc<Self> {
43 Arc::new(Self {
44 state: Mutex::new(SourceQueueState {
45 queue: VecDeque::new(),
46 terminal: None,
47 }),
48 available: Condvar::new(),
49 space: Condvar::new(),
50 capacity: READER_QUEUE_CAPACITY,
51 cancelled: Arc::new(AtomicBool::new(false)),
52 })
53 }
54
55 fn push(&self, chunk: Vec<u8>) -> bool {
56 let mut state = self.state.lock().expect("io source queue poisoned");
57 while state.queue.len() >= self.capacity
58 && state.terminal.is_none()
59 && !self.cancelled.load(Ordering::SeqCst)
60 {
61 state = self
62 .space
63 .wait(state)
64 .expect("io source queue poisoned while waiting for space");
65 }
66
67 if state.terminal.is_some() || self.cancelled.load(Ordering::SeqCst) {
68 return false;
69 }
70
71 if state.terminal.is_none() {
72 state.queue.push_back(chunk);
73 }
74 drop(state);
75 self.available.notify_all();
76 true
77 }
78
79 fn finish(&self, terminal: SourceTerminal) {
80 let mut state = self.state.lock().expect("io source queue poisoned");
81 if state.terminal.is_none() {
82 state.terminal = Some(terminal);
83 }
84 drop(state);
85 self.available.notify_all();
86 self.space.notify_all();
87 }
88}
89
90struct ReaderWorkerGuard {
91 queue: Arc<SourceQueue>,
92 armed: bool,
93}
94
95impl ReaderWorkerGuard {
96 fn new(queue: Arc<SourceQueue>) -> Self {
97 Self { queue, armed: true }
98 }
99
100 fn disarm(&mut self) {
101 self.armed = false;
102 }
103}
104
105impl Drop for ReaderWorkerGuard {
106 fn drop(&mut self) {
107 if self.armed {
108 self.queue
109 .finish(SourceTerminal::Error(StreamError::AbruptTermination));
110 }
111 }
112}
113
114struct ReaderSourceStream {
115 queue: Arc<SourceQueue>,
116 completion: Option<StreamCompletion<NotUsed>>,
117}
118
119impl Iterator for ReaderSourceStream {
120 type Item = StreamResult<Vec<u8>>;
121
122 fn next(&mut self) -> Option<Self::Item> {
123 let mut state = self.queue.state.lock().expect("io source queue poisoned");
124 loop {
125 if let Some(chunk) = state.queue.pop_front() {
126 self.queue.space.notify_all();
127 return Some(Ok(chunk));
128 }
129 if let Some(terminal) = state.terminal.clone() {
130 return match terminal {
131 SourceTerminal::Complete => None,
132 SourceTerminal::Error(error) => Some(Err(error)),
133 };
134 }
135 state = self
136 .queue
137 .available
138 .wait(state)
139 .expect("io source queue poisoned while waiting");
140 }
141 }
142}
143
144impl Drop for ReaderSourceStream {
145 fn drop(&mut self) {
146 self.queue.cancelled.store(true, Ordering::SeqCst);
147 drop(self.queue.state.lock().unwrap_or_else(|p| p.into_inner()));
155 self.queue.available.notify_all();
156 self.queue.space.notify_all();
157 let _ = self.completion.take();
158 }
159}
160
161struct WriterGuard<W: Write> {
162 writer: W,
163 flushed: bool,
164}
165
166impl<W: Write> WriterGuard<W> {
167 fn new(writer: W) -> Self {
168 Self {
169 writer,
170 flushed: false,
171 }
172 }
173
174 fn writer_mut(&mut self) -> &mut W {
175 &mut self.writer
176 }
177
178 fn flush_once(&mut self) -> StreamResult<()> {
179 if self.flushed {
180 return Ok(());
181 }
182 self.writer.flush().map_err(io_error)?;
183 self.flushed = true;
184 Ok(())
185 }
186}
187
188impl<W: Write> Drop for WriterGuard<W> {
189 fn drop(&mut self) {
190 let _ = self.flush_once();
191 }
192}
193
194#[derive(Clone)]
197enum InputStreamTerminal {
198 Complete,
199 Error(StreamError),
200}
201
202struct InputStreamBufferState {
203 chunks: VecDeque<Vec<u8>>,
204 terminal: Option<InputStreamTerminal>,
205}
206
207struct InputStreamShared {
208 state: Mutex<InputStreamBufferState>,
209 available: Condvar,
210 space: Condvar,
211 cancelled: AtomicBool,
212}
213
214impl InputStreamShared {
215 fn new() -> Self {
216 Self {
217 state: Mutex::new(InputStreamBufferState {
218 chunks: VecDeque::new(),
219 terminal: None,
220 }),
221 available: Condvar::new(),
222 space: Condvar::new(),
223 cancelled: AtomicBool::new(false),
224 }
225 }
226
227 fn set_terminal(&self, terminal: InputStreamTerminal) {
228 let mut state = self.state.lock().expect("input stream buffer poisoned");
229 if state.terminal.is_none() {
230 state.terminal = Some(terminal);
231 }
232 drop(state);
233 self.available.notify_all();
234 self.space.notify_all();
235 }
236}
237
238pub struct InputStreamHandle {
245 shared: Arc<InputStreamShared>,
246 detached: Vec<u8>,
247 detached_offset: usize,
248 read_timeout: Duration,
249 stream_closed: bool,
250 _completion: StreamCompletion<NotUsed>,
251}
252
253impl InputStreamHandle {
254 fn new(
255 shared: Arc<InputStreamShared>,
256 read_timeout: Duration,
257 completion: StreamCompletion<NotUsed>,
258 ) -> Self {
259 Self {
260 shared,
261 detached: Vec::new(),
262 detached_offset: 0,
263 read_timeout,
264 stream_closed: false,
265 _completion: completion,
266 }
267 }
268}
269
270impl Read for InputStreamHandle {
271 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
272 if self.stream_closed {
273 return Err(io::Error::other(
274 "stream is terminated, no reads are possible",
275 ));
276 }
277 if buf.is_empty() {
278 return Ok(0);
279 }
280
281 let mut total = 0_usize;
282
283 if self.detached_offset < self.detached.len() {
285 let available = self.detached.len() - self.detached_offset;
286 let n = available.min(buf.len());
287 buf[..n]
288 .copy_from_slice(&self.detached[self.detached_offset..self.detached_offset + n]);
289 self.detached_offset += n;
290 total += n;
291 if self.detached_offset >= self.detached.len() {
292 self.detached.clear();
293 self.detached_offset = 0;
294 }
295 if total == buf.len() {
296 return Ok(total);
297 }
298 }
299
300 let mut state = self
301 .shared
302 .state
303 .lock()
304 .expect("input stream buffer poisoned");
305 loop {
306 while total < buf.len() {
308 if let Some(chunk) = state.chunks.pop_front() {
309 self.shared.space.notify_all();
310 drop(state);
311
312 let space = buf.len() - total;
313 let n = chunk.len().min(space);
314 buf[total..total + n].copy_from_slice(&chunk[..n]);
315 total += n;
316 if n < chunk.len() {
317 self.detached = chunk;
318 self.detached_offset = n;
319 }
320
321 state = self
323 .shared
324 .state
325 .lock()
326 .expect("input stream buffer poisoned");
327 continue;
328 }
329 break;
330 }
331
332 if total > 0 {
334 return Ok(total);
335 }
336
337 if let Some(terminal) = state.terminal.clone() {
339 return match terminal {
340 InputStreamTerminal::Complete => Ok(0),
341 InputStreamTerminal::Error(e) => {
342 Err(io::Error::other(format!("stream failed: {e}")))
343 }
344 };
345 }
346
347 let (new_state, timeout) = self
349 .shared
350 .available
351 .wait_timeout(state, self.read_timeout)
352 .expect("input stream buffer poisoned while waiting");
353 state = new_state;
354 if timeout.timed_out() && state.chunks.is_empty() && state.terminal.is_none() {
355 return Err(io::Error::new(
356 io::ErrorKind::TimedOut,
357 format!("timeout after {:?} waiting for new data", self.read_timeout),
358 ));
359 }
360 }
361 }
362}
363
364impl Drop for InputStreamHandle {
365 fn drop(&mut self) {
366 self.shared.cancelled.store(true, Ordering::SeqCst);
367 drop(self.shared.state.lock().unwrap_or_else(|p| p.into_inner()));
368 self.shared.available.notify_all();
369 self.shared.space.notify_all();
370 }
371}
372
373#[derive(Clone)]
376#[allow(dead_code)]
377enum OutputStreamTerminal {
378 Complete,
379 Error(StreamError),
380}
381
382struct OutputStreamBufferState {
383 chunks: VecDeque<Vec<u8>>,
384 terminal: Option<OutputStreamTerminal>,
385}
386
387struct OutputStreamShared {
388 state: Mutex<OutputStreamBufferState>,
389 available: Condvar,
390 space: Condvar,
391 cancelled: AtomicBool,
392}
393
394impl OutputStreamShared {
395 fn new() -> Self {
396 Self {
397 state: Mutex::new(OutputStreamBufferState {
398 chunks: VecDeque::new(),
399 terminal: None,
400 }),
401 available: Condvar::new(),
402 space: Condvar::new(),
403 cancelled: AtomicBool::new(false),
404 }
405 }
406}
407
408struct OutputStreamSourceStream {
409 shared: Arc<OutputStreamShared>,
410 done: bool,
411}
412
413impl Iterator for OutputStreamSourceStream {
414 type Item = StreamResult<Vec<u8>>;
415
416 fn next(&mut self) -> Option<Self::Item> {
417 if self.done {
418 return None;
419 }
420
421 let mut state = self
422 .shared
423 .state
424 .lock()
425 .expect("output stream buffer poisoned");
426 loop {
427 if let Some(chunk) = state.chunks.pop_front() {
428 self.shared.space.notify_all();
429 return Some(Ok(chunk));
430 }
431
432 match &state.terminal {
433 Some(OutputStreamTerminal::Complete) => {
434 self.done = true;
435 return None;
436 }
437 Some(OutputStreamTerminal::Error(e)) => {
439 self.done = true;
440 return Some(Err(e.clone()));
441 }
442 None => {}
443 }
444
445 state = self
446 .shared
447 .available
448 .wait(state)
449 .expect("output stream buffer poisoned while waiting");
450 }
451 }
452}
453
454impl Drop for OutputStreamSourceStream {
455 fn drop(&mut self) {
456 self.shared.cancelled.store(true, Ordering::SeqCst);
457 drop(self.shared.state.lock().unwrap_or_else(|p| p.into_inner()));
458 self.shared.available.notify_all();
459 self.shared.space.notify_all();
460 }
461}
462
463pub struct OutputStreamHandle {
470 shared: Arc<OutputStreamShared>,
471 write_timeout: Duration,
472 closed: AtomicBool,
473}
474
475impl OutputStreamHandle {
476 fn new(shared: Arc<OutputStreamShared>, write_timeout: Duration) -> Self {
477 Self {
478 shared,
479 write_timeout,
480 closed: AtomicBool::new(false),
481 }
482 }
483
484 pub fn close(&self) -> io::Result<()> {
489 self.closed.store(true, Ordering::SeqCst);
490 let mut state = self
491 .shared
492 .state
493 .lock()
494 .expect("output stream buffer poisoned");
495 if state.terminal.is_none() {
496 state.terminal = Some(OutputStreamTerminal::Complete);
497 }
498 drop(state);
499 self.shared.available.notify_all();
500 self.shared.space.notify_all();
501 Ok(())
502 }
503}
504
505impl Write for OutputStreamHandle {
506 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
507 if self.closed.load(Ordering::SeqCst) {
508 return Err(io::Error::new(
509 io::ErrorKind::BrokenPipe,
510 "stream is closed, no writes are possible",
511 ));
512 }
513 if buf.is_empty() {
514 return Ok(0);
515 }
516
517 let mut state = self
518 .shared
519 .state
520 .lock()
521 .expect("output stream buffer poisoned");
522 loop {
523 if self.closed.load(Ordering::SeqCst) || self.shared.cancelled.load(Ordering::SeqCst) {
524 return Err(io::Error::new(
525 io::ErrorKind::BrokenPipe,
526 "stream is closed, no writes are possible",
527 ));
528 }
529
530 if let Some(OutputStreamTerminal::Error(e)) = &state.terminal {
531 return Err(io::Error::other(format!("stream failed: {e}")));
532 }
533
534 if state.chunks.len() < OUTPUT_STREAM_BUFFER_CAPACITY {
535 state.chunks.push_back(buf.to_vec());
536 drop(state);
537 self.shared.available.notify_all();
538 return Ok(buf.len());
539 }
540
541 let (new_state, timeout) = self
542 .shared
543 .space
544 .wait_timeout(state, self.write_timeout)
545 .expect("output stream buffer poisoned while waiting");
546 state = new_state;
547 if timeout.timed_out()
548 && state.chunks.len() >= OUTPUT_STREAM_BUFFER_CAPACITY
549 && state.terminal.is_none()
550 {
551 return Err(io::Error::new(
552 io::ErrorKind::TimedOut,
553 format!(
554 "timed out trying to write data to stream after {:?}",
555 self.write_timeout
556 ),
557 ));
558 }
559 }
560 }
561
562 fn flush(&mut self) -> io::Result<()> {
563 Ok(())
564 }
565}
566
567impl Drop for OutputStreamHandle {
568 fn drop(&mut self) {
569 self.shared.cancelled.store(true, Ordering::SeqCst);
570 let _ = self.close();
571 }
572}
573
574pub struct StreamConverters;
582
583impl StreamConverters {
584 #[must_use]
591 pub fn from_reader<F, R>(factory: F, chunk_size: usize) -> Source<Vec<u8>>
592 where
593 F: Fn() -> std::io::Result<R> + Send + Sync + 'static,
594 R: Read + Send + 'static,
595 {
596 assert!(chunk_size > 0, "chunk size must be greater than zero");
597 Source::from_materialized_factory(move |materializer| {
598 let reader = factory().map_err(io_error)?;
599 let queue = SourceQueue::new();
600 let queue_for_worker = Arc::clone(&queue);
601 let cancelled = Arc::clone(&queue.cancelled);
602 let completion = materializer.spawn_stream(move |_worker_cancelled| {
603 let mut reader = reader;
604 let mut guard = ReaderWorkerGuard::new(Arc::clone(&queue_for_worker));
605 let mut buffer = vec![0_u8; chunk_size];
606
607 loop {
608 if cancelled.load(Ordering::SeqCst) {
609 guard.disarm();
610 return Ok(NotUsed);
611 }
612
613 match reader.read(&mut buffer) {
614 Ok(0) => {
615 guard.disarm();
616 queue_for_worker.finish(SourceTerminal::Complete);
617 return Ok(NotUsed);
618 }
619 Ok(read) => {
620 if !queue_for_worker.push(buffer[..read].to_vec()) {
621 guard.disarm();
622 return Ok(NotUsed);
623 }
624 }
625 Err(error) => {
626 guard.disarm();
627 queue_for_worker.finish(SourceTerminal::Error(io_error(error)));
628 return Ok(NotUsed);
629 }
630 }
631 }
632 });
633
634 Ok((
635 Box::new(ReaderSourceStream {
636 queue,
637 completion: Some(completion),
638 }) as BoxStream<Vec<u8>>,
639 NotUsed,
640 ))
641 })
642 }
643
644 #[must_use]
651 pub fn to_writer<F, W>(factory: F) -> Sink<Vec<u8>, StreamCompletion<NotUsed>>
652 where
653 F: Fn() -> std::io::Result<W> + Send + Sync + 'static,
654 W: Write + Send + 'static,
655 {
656 Sink::from_runner(move |input: BoxStream<Vec<u8>>, materializer| {
657 let writer = WriterGuard::new(factory().map_err(io_error)?);
658 Ok(materializer.spawn_stream(move |cancelled| {
659 let mut input = input;
660 let mut writer = writer;
661 loop {
662 if cancelled.load(Ordering::SeqCst) {
663 let _ = writer.flush_once();
664 return Err(StreamError::Cancelled);
665 }
666
667 match input.next() {
668 Some(Ok(chunk)) => {
669 writer.writer_mut().write_all(&chunk).map_err(io_error)?
670 }
671 Some(Err(error)) => {
672 let _ = writer.flush_once();
673 return Err(error);
674 }
675 None => {
676 writer.flush_once()?;
677 return Ok(NotUsed);
678 }
679 }
680 }
681 }))
682 })
683 }
684
685 #[must_use]
699 pub fn as_input_stream(read_timeout: Duration) -> Sink<Vec<u8>, InputStreamHandle> {
700 Sink::from_runner(move |input: BoxStream<Vec<u8>>, materializer| {
701 let shared = Arc::new(InputStreamShared::new());
702 let shared_for_worker = Arc::clone(&shared);
703
704 let completion = materializer.spawn_stream(move |_task_cancelled| {
705 let mut input = input;
706 loop {
707 if shared_for_worker.cancelled.load(Ordering::SeqCst) {
708 return Ok(NotUsed);
709 }
710
711 match input.next() {
712 Some(Ok(chunk)) => {
713 let mut state = shared_for_worker
714 .state
715 .lock()
716 .expect("input stream buffer poisoned");
717 while state.chunks.len() >= INPUT_STREAM_BUFFER_CAPACITY
718 && state.terminal.is_none()
719 && !shared_for_worker.cancelled.load(Ordering::SeqCst)
720 {
721 state = shared_for_worker
722 .space
723 .wait(state)
724 .expect("input stream buffer poisoned while waiting");
725 }
726
727 if state.terminal.is_some()
728 || shared_for_worker.cancelled.load(Ordering::SeqCst)
729 {
730 return Ok(NotUsed);
731 }
732
733 if !chunk.is_empty() {
734 state.chunks.push_back(chunk);
735 }
736 drop(state);
737 shared_for_worker.available.notify_all();
738 }
739 Some(Err(e)) => {
740 shared_for_worker.set_terminal(InputStreamTerminal::Error(e));
741 return Ok(NotUsed);
742 }
743 None => {
744 shared_for_worker.set_terminal(InputStreamTerminal::Complete);
745 return Ok(NotUsed);
746 }
747 }
748 }
749 });
750
751 Ok(InputStreamHandle::new(shared, read_timeout, completion))
752 })
753 }
754
755 #[must_use]
769 pub fn as_output_stream(write_timeout: Duration) -> Source<Vec<u8>, OutputStreamHandle> {
770 Source::from_materialized_factory(move |_materializer| {
771 let shared = Arc::new(OutputStreamShared::new());
772 let handle = OutputStreamHandle::new(Arc::clone(&shared), write_timeout);
773 let stream = OutputStreamSourceStream {
774 shared,
775 done: false,
776 };
777 Ok((Box::new(stream) as BoxStream<Vec<u8>>, handle))
778 })
779 }
780}
781
782pub struct FileIO;
785
786impl FileIO {
787 #[must_use]
790 pub fn from_path(path: impl Into<PathBuf>, chunk_size: usize) -> Source<Vec<u8>> {
791 let path = path.into();
792 StreamConverters::from_reader(move || File::open(&path), chunk_size)
793 }
794
795 #[must_use]
797 pub fn from_path_default(path: impl Into<PathBuf>) -> Source<Vec<u8>> {
798 Self::from_path(path, DEFAULT_CHUNK_SIZE)
799 }
800
801 #[must_use]
804 pub fn to_path(path: impl Into<PathBuf>) -> Sink<Vec<u8>, StreamCompletion<NotUsed>> {
805 let path = path.into();
806 StreamConverters::to_writer(move || {
807 OpenOptions::new()
808 .create(true)
809 .truncate(true)
810 .write(true)
811 .open(&path)
812 })
813 }
814}
815
816#[cfg(test)]
817mod tests {
818 use super::*;
819 use crate::Source;
820 use crate::testkit::TestSink;
821 use std::io::Cursor;
822 use std::sync::atomic::{AtomicU64, AtomicUsize};
823 use std::thread;
824 use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
825
/// Polls `counter` every 5 ms for up to one second until it equals
/// `expected`; asserts equality if the deadline passes first.
fn wait_for_counter(counter: &AtomicUsize, expected: usize) {
    let poll_interval = Duration::from_millis(5);
    let deadline = std::time::Instant::now() + Duration::from_secs(1);
    loop {
        if std::time::Instant::now() >= deadline {
            break;
        }
        if counter.load(Ordering::SeqCst) == expected {
            return;
        }
        thread::sleep(poll_interval);
    }
    assert_eq!(counter.load(Ordering::SeqCst), expected);
}
836
/// Polls `condition` every 5 ms until it holds or `timeout` elapses.
///
/// Returns `true` as soon as the condition holds; after the deadline the
/// condition is evaluated one final time and that result is returned.
fn wait_until(timeout: Duration, mut condition: impl FnMut() -> bool) -> bool {
    let poll_interval = Duration::from_millis(5);
    let deadline = Instant::now() + timeout;
    loop {
        if Instant::now() >= deadline {
            return condition();
        }
        if condition() {
            return true;
        }
        thread::sleep(poll_interval);
    }
}
847
/// Builds a path in the system temp directory whose file name combines
/// `name`, the process id and the current time in nanoseconds.
fn unique_temp_path(name: &str) -> PathBuf {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("clock after epoch");
    let nanos = since_epoch.as_nanos();
    let pid = std::process::id();
    let file_name = format!("datum-wp12-{name}-{pid}-{nanos}.bin");
    std::env::temp_dir().join(file_name)
}
858
/// In-memory reader that counts how many times it has been dropped.
struct CountingReader {
    /// Source of the bytes.
    inner: Cursor<Vec<u8>>,
    /// Incremented once in `Drop`.
    drops: Arc<AtomicUsize>,
}

impl Read for CountingReader {
    /// Delegates to the inner cursor.
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        let cursor = &mut self.inner;
        cursor.read(buf)
    }
}

impl Drop for CountingReader {
    fn drop(&mut self) {
        let _previous = self.drops.fetch_add(1, Ordering::SeqCst);
    }
}
875
/// Writer that records every write, counts flushes and drops, and can be
/// configured to fail all writes.
struct CountingWriter {
    /// Log of the buffers passed to `write`.
    writes: Arc<Mutex<Vec<Vec<u8>>>>,
    /// Number of `flush` calls.
    flushes: Arc<AtomicUsize>,
    /// Number of times the writer was dropped.
    drops: Arc<AtomicUsize>,
    /// When `true`, every `write` fails with "writer boom".
    fail_write: bool,
}

impl Write for CountingWriter {
    /// Records `buf` and reports it fully written, or fails when `fail_write` is set.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        if self.fail_write {
            return Err(std::io::Error::other("writer boom"));
        }
        let mut log = self.writes.lock().expect("writer log poisoned");
        log.push(buf.to_vec());
        Ok(buf.len())
    }

    /// Counts the flush and always succeeds.
    fn flush(&mut self) -> std::io::Result<()> {
        let _previous = self.flushes.fetch_add(1, Ordering::SeqCst);
        Ok(())
    }
}

impl Drop for CountingWriter {
    fn drop(&mut self) {
        let _previous = self.drops.fetch_add(1, Ordering::SeqCst);
    }
}
906
/// In-memory reader that returns at most `chunk_size` bytes per call and
/// counts how many times `read` was invoked.
struct CountingChunkReader {
    /// Source of the bytes.
    inner: Cursor<Vec<u8>>,
    /// Upper bound on the bytes returned by a single `read`.
    chunk_size: usize,
    /// Number of `read` calls so far.
    reads: Arc<AtomicUsize>,
}

impl Read for CountingChunkReader {
    /// Counts the call, then reads at most `min(chunk_size, buf.len())` bytes.
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        let _previous = self.reads.fetch_add(1, Ordering::SeqCst);
        let limit = self.chunk_size.min(buf.len());
        let filled = self.inner.read(&mut buf[..limit])?;
        Ok(filled)
    }
}
922
/// A reader source splits its input into `chunk_size` pieces and then completes.
#[test]
fn from_reader_emits_chunked_bytes_and_completes() {
    let source = StreamConverters::from_reader(|| Ok(Cursor::new(b"abcdef".to_vec())), 2);
    let probe = source
        .run_with(TestSink::probe())
        .expect("reader source materializes");

    probe.request(4);
    probe.assert_next_n([b"ab".to_vec(), b"cd".to_vec(), b"ef".to_vec()]);
    probe.expect_complete();
}
933
/// After the source completes, the reader has been dropped exactly once.
#[test]
fn from_reader_closes_exactly_once_on_completion() {
    let drops = Arc::new(AtomicUsize::new(0));
    let factory = {
        let drops = Arc::clone(&drops);
        move || {
            Ok(CountingReader {
                inner: Cursor::new(b"hello".to_vec()),
                drops: Arc::clone(&drops),
            })
        }
    };
    let probe = StreamConverters::from_reader(factory, 8)
        .run_with(TestSink::probe())
        .expect("reader source materializes");

    probe.request(2);
    probe.assert_next(b"hello".to_vec());
    probe.expect_complete();
    wait_for_counter(&drops, 1);
}
955
/// Cancelling downstream leads to the reader being dropped exactly once.
#[test]
fn from_reader_closes_exactly_once_on_cancellation() {
    let drops = Arc::new(AtomicUsize::new(0));
    let factory = {
        let drops = Arc::clone(&drops);
        move || {
            Ok(CountingReader {
                inner: Cursor::new(vec![1_u8; 32]),
                drops: Arc::clone(&drops),
            })
        }
    };
    let mut probe = StreamConverters::from_reader(factory, 4)
        .run_with(TestSink::probe())
        .expect("reader source materializes");

    probe.request(1);
    probe.assert_next(vec![1_u8; 4]);
    probe.cancel();
    wait_for_counter(&drops, 1);
}
977
/// A failing read surfaces downstream as `StreamError::Failed` with the I/O message.
#[test]
fn from_reader_surfaces_read_failure() {
    /// Reader whose every `read` fails with "reader boom".
    struct FailingReader;

    impl Read for FailingReader {
        fn read(&mut self, _buf: &mut [u8]) -> std::io::Result<usize> {
            Err(std::io::Error::other("reader boom"))
        }
    }

    let probe = StreamConverters::from_reader(|| Ok(FailingReader), 8)
        .run_with(TestSink::probe())
        .expect("reader source materializes");

    probe.request(1);
    let observed = probe.expect_error();
    assert_eq!(observed, StreamError::Failed("reader boom".to_owned()));
}
998
/// With a slow consumer, the worker stops reading once the bounded queue is
/// full; after demand resumes the whole payload arrives intact.
#[test]
fn from_reader_caps_buffered_read_ahead_for_slow_consumers() {
    let payload: Vec<u8> = (0..64_u8).cycle().take(8192 * 16).collect();
    let reads = Arc::new(AtomicUsize::new(0));
    let factory = {
        let bytes = payload.clone();
        let reads = Arc::clone(&reads);
        move || {
            Ok(CountingChunkReader {
                inner: Cursor::new(bytes.clone()),
                chunk_size: 256,
                reads: Arc::clone(&reads),
            })
        }
    };
    let sink = StreamConverters::from_reader(factory, 256)
        .run_with(TestSink::probe())
        .expect("reader source materializes");

    sink.request(1);
    let first = sink.expect_next();
    assert_eq!(first.len(), 256);

    // Wait until the read counter has been stable for at least 100 ms.
    let start = Instant::now();
    let last_seen = Arc::new(AtomicUsize::new(0));
    let quiet_since_ms = Arc::new(AtomicU64::new(0));
    let reads_have_settled = {
        let last_seen = Arc::clone(&last_seen);
        let quiet_since_ms = Arc::clone(&quiet_since_ms);
        let reads = Arc::clone(&reads);
        move || {
            let current = reads.load(Ordering::SeqCst);
            if current != last_seen.load(Ordering::SeqCst) {
                last_seen.store(current, Ordering::SeqCst);
                quiet_since_ms.store(start.elapsed().as_millis() as u64, Ordering::SeqCst);
                return false;
            }

            let now_ms = start.elapsed().as_millis() as u64;
            let quiet_for = now_ms - quiet_since_ms.load(Ordering::SeqCst);
            current > 0 && quiet_for >= 100
        }
    };
    assert!(wait_until(Duration::from_secs(2), reads_have_settled));

    assert!(
        reads.load(Ordering::SeqCst) <= READER_QUEUE_CAPACITY + 2,
        "reader should plateau near the bounded queue capacity"
    );

    sink.request(usize::MAX);
    let mut collected = first;
    for chunk in sink.drain_until_complete() {
        collected.extend_from_slice(&chunk);
    }
    assert_eq!(collected, payload);
}
1056
/// All chunks reach the writer in order; it is flushed once and dropped once.
#[test]
fn to_writer_writes_all_chunks_and_flushes_once() {
    let writes = Arc::new(Mutex::new(Vec::new()));
    let flushes = Arc::new(AtomicUsize::new(0));
    let drops = Arc::new(AtomicUsize::new(0));
    let factory = {
        let writes = Arc::clone(&writes);
        let flushes = Arc::clone(&flushes);
        let drops = Arc::clone(&drops);
        move || {
            Ok(CountingWriter {
                writes: Arc::clone(&writes),
                flushes: Arc::clone(&flushes),
                drops: Arc::clone(&drops),
                fail_write: false,
            })
        }
    };
    let completion = Source::from_iter([b"ab".to_vec(), b"cd".to_vec()])
        .run_with(StreamConverters::to_writer(factory))
        .expect("writer sink materializes");

    completion.wait().expect("writer sink completes");
    assert_eq!(
        writes.lock().expect("writes poisoned").as_slice(),
        &[b"ab".to_vec(), b"cd".to_vec()]
    );
    assert_eq!(flushes.load(Ordering::SeqCst), 1);
    assert_eq!(drops.load(Ordering::SeqCst), 1);
}
1086
/// An upstream failure is reported; the writer is still flushed once and dropped once.
#[test]
fn to_writer_flushes_and_drops_once_on_failure() {
    let flushes = Arc::new(AtomicUsize::new(0));
    let drops = Arc::new(AtomicUsize::new(0));
    let factory = {
        let flushes = Arc::clone(&flushes);
        let drops = Arc::clone(&drops);
        move || {
            Ok(CountingWriter {
                writes: Arc::new(Mutex::new(Vec::new())),
                flushes: Arc::clone(&flushes),
                drops: Arc::clone(&drops),
                fail_write: false,
            })
        }
    };
    let completion = Source::<Vec<u8>>::failed(StreamError::Failed("upstream boom".to_owned()))
        .run_with(StreamConverters::to_writer(factory))
        .expect("writer sink materializes");

    let outcome = completion.wait();
    assert_eq!(outcome, Err(StreamError::Failed("upstream boom".to_owned())));
    assert_eq!(flushes.load(Ordering::SeqCst), 1);
    assert_eq!(drops.load(Ordering::SeqCst), 1);
}
1113
/// Dropping the completion of an endless stream flushes and drops the writer once.
#[test]
fn to_writer_flushes_and_drops_once_on_cancellation() {
    let flushes = Arc::new(AtomicUsize::new(0));
    let drops = Arc::new(AtomicUsize::new(0));
    let factory = {
        let flushes = Arc::clone(&flushes);
        let drops = Arc::clone(&drops);
        move || {
            Ok(CountingWriter {
                writes: Arc::new(Mutex::new(Vec::new())),
                flushes: Arc::clone(&flushes),
                drops: Arc::clone(&drops),
                fail_write: false,
            })
        }
    };
    let completion = Source::repeat(vec![7_u8; 4])
        .run_with(StreamConverters::to_writer(factory))
        .expect("writer sink materializes");

    drop(completion);
    wait_for_counter(&flushes, 1);
    wait_for_counter(&drops, 1);
}
1137
/// A failing write is reported through the completion as `StreamError::Failed`.
#[test]
fn to_writer_surfaces_write_failure() {
    let failing_writer = || {
        Ok(CountingWriter {
            writes: Arc::new(Mutex::new(Vec::new())),
            flushes: Arc::new(AtomicUsize::new(0)),
            drops: Arc::new(AtomicUsize::new(0)),
            fail_write: true,
        })
    };
    let completion = Source::single(vec![1_u8])
        .run_with(StreamConverters::to_writer(failing_writer))
        .expect("writer sink materializes");

    let outcome = completion.wait();
    assert_eq!(outcome, Err(StreamError::Failed("writer boom".to_owned())));
}
1156
/// Bytes written through `FileIO::to_path` are read back by `FileIO::from_path`.
#[test]
fn file_io_round_trips_bytes() {
    let path = unique_temp_path("roundtrip");

    let written = Source::from_iter([b"ab".to_vec(), b"cd".to_vec()])
        .run_with(FileIO::to_path(path.clone()))
        .expect("file sink materializes");
    written.wait().expect("file write completes");

    let probe = FileIO::from_path(path.clone(), 2)
        .run_with(TestSink::probe())
        .expect("file source materializes");
    probe.request(4);
    probe.assert_next_n([b"ab".to_vec(), b"cd".to_vec()]);
    probe.expect_complete();

    std::fs::remove_file(path).expect("remove roundtrip file");
}
1174
/// Materializing a source for a missing file fails with `StreamError::Failed`.
#[test]
fn file_io_source_surfaces_open_failure() {
    let missing = unique_temp_path("missing");
    let outcome = FileIO::from_path(missing, 4).run_with(TestSink::probe());
    assert!(matches!(outcome, Err(StreamError::Failed(_))));
}
1181
/// Writing to an existing file replaces its previous contents.
#[test]
fn file_io_sink_creates_and_truncates_file() {
    let path = unique_temp_path("truncate");
    std::fs::write(&path, b"stale bytes").expect("seed file");

    let written = Source::single(b"ok".to_vec())
        .run_with(FileIO::to_path(path.clone()))
        .expect("file sink materializes");
    written.wait().expect("file write completes");

    let contents = std::fs::read(&path).expect("read file");
    assert_eq!(contents, b"ok");

    std::fs::remove_file(path).expect("remove truncate file");
}
1195
/// `from_path_default` emits a small file as a single chunk and completes.
#[test]
fn file_io_source_default_chunk_size_works() {
    let path = unique_temp_path("default");
    std::fs::write(&path, b"hi").expect("write seed file");

    let probe = FileIO::from_path_default(path.clone())
        .run_with(TestSink::probe())
        .expect("file source materializes");
    probe.request(2);
    probe.assert_next(b"hi".to_vec());
    probe.expect_complete();

    std::fs::remove_file(path).expect("remove default file");
}
1210
/// Reading the handle until EOF yields the concatenation of all stream chunks.
#[test]
fn as_input_stream_reads_data_written_by_stream() {
    let sink = StreamConverters::as_input_stream(Duration::from_secs(5));
    let mut handle: InputStreamHandle =
        Source::from_iter([b"hello".to_vec(), b"world".to_vec()])
            .run_with(sink)
            .expect("input stream sink materializes");

    let mut buf = [0_u8; 32];
    let mut filled = 0_usize;
    loop {
        match handle.read(&mut buf[filled..]).expect("read succeeds") {
            0 => break,
            n => filled += n,
        }
    }
    assert_eq!(&buf[..filled], b"helloworld");
}
1234
/// After the only chunk has been read, the next read reports EOF.
#[test]
fn as_input_stream_eof_when_stream_completes() {
    let sink = StreamConverters::as_input_stream(Duration::from_secs(5));
    let mut handle: InputStreamHandle = Source::from_iter([b"x".to_vec()])
        .run_with(sink)
        .expect("input stream sink materializes");

    let mut buf = [0_u8; 4];
    let first = handle.read(&mut buf).expect("first read succeeds");
    assert_eq!(&buf[..first], b"x");

    let second = handle.read(&mut buf).expect("second read returns eof");
    assert_eq!(second, 0);
}
1249
/// A chunk larger than the caller's buffer is handed out across several reads.
#[test]
fn as_input_stream_partial_reads_work() {
    let sink = StreamConverters::as_input_stream(Duration::from_secs(5));
    let mut handle: InputStreamHandle = Source::single(b"abcde".to_vec())
        .run_with(sink)
        .expect("input stream sink materializes");

    let mut small = [0_u8; 2];

    let first = handle.read(&mut small).expect("small read succeeds");
    assert_eq!(first, 2);
    assert_eq!(&small[..], b"ab");

    let second = handle.read(&mut small).expect("second small read succeeds");
    assert_eq!(second, 2);
    assert_eq!(&small[..], b"cd");

    let third = handle.read(&mut small).expect("third small read succeeds");
    assert_eq!(third, 1);
    assert_eq!(&small[..1], b"e");

    let fourth = handle.read(&mut small).expect("fourth read returns eof");
    assert_eq!(fourth, 0);
}
1275
/// An upstream failure is reported by `read` as an I/O error carrying its message.
#[test]
fn as_input_stream_error_surfaces_as_io_error() {
    let sink = StreamConverters::as_input_stream(Duration::from_secs(5));
    let mut handle: InputStreamHandle =
        Source::<Vec<u8>>::failed(StreamError::Failed("upstream boom".to_owned()))
            .run_with(sink)
            .expect("input stream sink materializes");

    let mut buf = [0_u8; 8];
    let failure = handle.read(&mut buf).expect_err("read surfaces error");
    let msg = failure.to_string();
    assert!(msg.contains("upstream boom"), "got: {msg}");
}
1288
/// Reads from an endless stream succeed, and the handle can then be dropped.
#[test]
fn as_input_stream_cancellation_stops_reads() {
    let sink = StreamConverters::as_input_stream(Duration::from_secs(5));
    let mut handle: InputStreamHandle = Source::repeat(b"x".to_vec())
        .run_with(sink)
        .expect("input stream sink materializes");

    let mut buf = [0_u8; 3];
    let n = handle.read(&mut buf).expect("first read succeeds");
    assert!((1..=3).contains(&n), "expected 1..=3 bytes, got {n}");

    drop(handle);
}
1307
/// With a `Source::never` upstream and a 10 ms read timeout, `read` must fail
/// with `io::ErrorKind::TimedOut`.
#[test]
fn as_input_stream_read_timeout_returns_timed_out() {
    let read_timeout = Duration::from_millis(10);
    let mut handle: InputStreamHandle = Source::<Vec<u8>>::never()
        .run_with(StreamConverters::as_input_stream(read_timeout))
        .expect("input stream sink materializes");

    let mut scratch = [0_u8; 4];
    let kind = handle
        .read(&mut scratch)
        .expect_err("read times out")
        .kind();
    assert_eq!(kind, io::ErrorKind::TimedOut);
}
1319
/// Two `write_all` calls followed by `close` must show up downstream as two
/// separate chunks, in write order.
#[test]
fn as_output_stream_writes_data_appear_in_stream() {
    let (mut handle, completion) = StreamConverters::as_output_stream(Duration::from_secs(5))
        .to_mat(Sink::collect(), crate::Keep::both)
        .run()
        .expect("output stream source materializes");

    // Each entry pairs a payload with the message used if writing it fails.
    let writes: [(&[u8], &str); 2] = [
        (b"alpha", "first write succeeds"),
        (b"beta", "second write succeeds"),
    ];
    for (payload, failure_message) in writes {
        handle.write_all(payload).expect(failure_message);
    }
    handle.close().expect("close succeeds");

    let collected = completion.wait().expect("stream completes");
    let expected = vec![b"alpha".to_vec(), b"beta".to_vec()];
    assert_eq!(collected, expected);
}
1334
/// Closing the output handle must succeed and let the collecting sink finish
/// with exactly the one chunk that was written before the close.
#[test]
fn as_output_stream_close_completes_stream() {
    let (mut handle, completion) = StreamConverters::as_output_stream(Duration::from_secs(5))
        .to_mat(Sink::collect(), crate::Keep::both)
        .run()
        .expect("output stream source materializes");

    handle.write_all(b"done").expect("write succeeds");
    // `expect` rather than `assert!(result.is_ok())`: on failure the panic
    // message then includes the error returned by `close`.
    handle.close().expect("close succeeds");

    let chunks = completion.wait().expect("stream completes after close");
    assert_eq!(chunks, vec![b"done".to_vec()]);
}
1349
/// After a successful `close`, a further `write` must be rejected with
/// `io::ErrorKind::BrokenPipe`.
#[test]
fn as_output_stream_write_after_close_fails() {
    let (mut handle, _completion) = StreamConverters::as_output_stream(Duration::from_secs(5))
        .to_mat(Sink::ignore(), crate::Keep::both)
        .run()
        .expect("output stream source materializes");

    handle.close().expect("first close succeeds");

    let late_write = handle.write(b"late");
    let kind = late_write.expect_err("write after close fails").kind();
    assert_eq!(kind, io::ErrorKind::BrokenPipe);
}
1361
/// After the materialized completion value is dropped, writes must start
/// failing with `io::ErrorKind::BrokenPipe` within one second.
///
/// NOTE(review): presumably dropping the completion is what cancels the
/// running stream — confirm against the materializer.
#[test]
fn as_output_stream_cancellation_stops_writes() {
    let (mut handle, completion) = StreamConverters::as_output_stream(Duration::from_secs(5))
        .to_mat(Sink::ignore(), crate::Keep::both)
        .run()
        .expect("output stream source materializes");

    handle.write_all(b"ok").expect("write succeeds");

    drop(completion);

    // Writes may keep succeeding for a while after the drop, so retry every
    // 5 ms until one fails or the one-second deadline passes.
    let give_up_at = std::time::Instant::now() + Duration::from_secs(1);
    let mut failure = None;
    while std::time::Instant::now() < give_up_at {
        if let Err(error) = handle.write(b"after cancel") {
            failure = Some(error);
            break;
        }
        thread::sleep(Duration::from_millis(5));
    }

    let error = failure.expect("write after cancellation should fail");
    assert_eq!(error.kind(), io::ErrorKind::BrokenPipe);
}
1389
/// When the downstream never consumes and 16 one-byte writes have already
/// succeeded, the next `write` must fail with `io::ErrorKind::TimedOut`
/// (write timeout: 50 ms).
#[test]
fn as_output_stream_write_timeout_returns_timed_out() {
    // A sink that takes ownership of its input but never pulls from it; its
    // task only polls the cancellation flag and returns once that flag is set.
    let hang_sink: Sink<Vec<u8>, StreamCompletion<NotUsed>> =
        Sink::from_runner(move |input: BoxStream<Vec<u8>>, materializer| {
            Ok(materializer.spawn_stream(move |cancelled| {
                let _input = input;
                while !cancelled.load(Ordering::SeqCst) {
                    thread::sleep(Duration::from_millis(1));
                }
                Ok(NotUsed)
            }))
        });
    let write_timeout = Duration::from_millis(50);
    let (mut handle, _hang_completion) = StreamConverters::as_output_stream(write_timeout)
        .to_mat(hang_sink, crate::Keep::both)
        .run()
        .expect("output stream source materializes");

    // NOTE(review): 16 presumably mirrors OUTPUT_STREAM_BUFFER_CAPACITY at the
    // top of the file — confirm, and consider referencing the constant.
    let capacity = 16_usize;

    for _slot in 0..capacity {
        handle.write_all(b"x").expect("buffer-fill write succeeds");
    }

    let kind = handle
        .write(b"overflow")
        .expect_err("write times out")
        .kind();
    assert_eq!(kind, io::ErrorKind::TimedOut);
}
1423
/// `flush` between a write and `close` must succeed, and the written chunk must
/// still reach the collecting sink exactly once.
#[test]
fn as_output_stream_flush_is_noop() {
    let (mut handle, completion) = StreamConverters::as_output_stream(Duration::from_secs(5))
        .to_mat(Sink::collect(), crate::Keep::both)
        .run()
        .expect("output stream source materializes");

    let payload: &[u8] = b"data";
    handle.write_all(payload).expect("write succeeds");
    handle.flush().expect("flush is a noop");
    handle.close().expect("close succeeds");

    let collected = completion.wait().expect("stream completes");
    let expected = vec![payload.to_vec()];
    assert_eq!(collected, expected);
}
1438
/// Dropping the output handle without calling `close` must still let the
/// stream complete, delivering the chunk written before the drop.
#[test]
fn as_output_stream_drop_completes_stream() {
    let (mut handle, completion) = StreamConverters::as_output_stream(Duration::from_secs(5))
        .to_mat(Sink::collect(), crate::Keep::both)
        .run()
        .expect("output stream source materializes");

    let payload: &[u8] = b"drop-me";
    handle.write_all(payload).expect("write succeeds");
    drop(handle);

    let collected = completion.wait().expect("stream completes after drop");
    let expected = vec![payload.to_vec()];
    assert_eq!(collected, expected);
}
1452
/// Connects `as_output_stream` directly to `as_input_stream` and checks that
/// bytes written on one handle are read back, in order, on the other.
#[test]
fn round_trip_output_stream_to_input_stream() {
    let (mut out_handle, mut in_handle): (OutputStreamHandle, InputStreamHandle) =
        StreamConverters::as_output_stream(Duration::from_secs(5))
            .to_mat(
                StreamConverters::as_input_stream(Duration::from_secs(5)),
                crate::Keep::both,
            )
            .run()
            .expect("round-trip stream materializes");

    // Each entry pairs a payload with the message used if writing it fails.
    let writes: [(&[u8], &str); 2] = [(b"round", "write round"), (b"trip", "write trip")];
    for (payload, failure_message) in writes {
        out_handle.write_all(payload).expect(failure_message);
    }
    out_handle.close().expect("close output");

    // Keep reading into the unused tail of the buffer until a read reports
    // zero bytes.
    let mut buf = [0_u8; 16];
    let mut filled = 0_usize;
    loop {
        match in_handle.read(&mut buf[filled..]).expect("read") {
            0 => break,
            count => filled += count,
        }
    }
    assert_eq!(&buf[..filled], b"roundtrip");
}
1481
/// Reading into a zero-length buffer must succeed and report zero bytes, even
/// though the source holds data (`abc`).
#[test]
fn as_input_stream_empty_buf_read_returns_zero() {
    let mut handle: InputStreamHandle = Source::single(b"abc".to_vec())
        .run_with(StreamConverters::as_input_stream(Duration::from_secs(5)))
        .expect("input stream sink materializes");

    let mut nothing: [u8; 0] = [];
    let count = handle.read(&mut nothing).expect("empty read succeeds");
    assert_eq!(count, 0);
}
1491
/// Reads ten three-byte chunks into one buffer sized for all of them and checks
/// both the byte count and the byte content.
///
/// Chunk `i` is filled with the byte value `i`, so comparing against the
/// concatenation of the chunks also catches reordered or duplicated data, which
/// a length-only check would miss.
#[test]
fn as_input_stream_large_read_across_multiple_chunks() {
    let chunks: Vec<Vec<u8>> = (0..10_u8).map(|fill| vec![fill; 3]).collect();
    // Capture the expected byte sequence before `chunks` is moved into the source.
    let expected: Vec<u8> = chunks.concat();
    let total_bytes = expected.len();

    let mut handle: InputStreamHandle = Source::from_iter(chunks)
        .run_with(StreamConverters::as_input_stream(Duration::from_secs(5)))
        .expect("input stream sink materializes");

    // Keep reading into the unfilled tail of the buffer until a read reports
    // zero bytes.
    let mut buf = vec![0_u8; total_bytes];
    let mut total = 0_usize;
    loop {
        let n = handle.read(&mut buf[total..]).expect("large read succeeds");
        if n == 0 {
            break;
        }
        total += n;
    }
    assert_eq!(total, total_bytes);
    assert_eq!(buf, expected);
}
1514}