Skip to main content

datum/io/
adapters.rs

1use crate::stream::{BoxStream, NotUsed, Sink, Source, StreamCompletion};
2use crate::{StreamError, StreamResult};
3use std::collections::VecDeque;
4use std::fs::{File, OpenOptions};
5use std::io::{self, Read, Write};
6use std::path::PathBuf;
7use std::sync::{
8    Arc, Condvar, Mutex,
9    atomic::{AtomicBool, Ordering},
10};
11use std::time::Duration;
12
13const DEFAULT_CHUNK_SIZE: usize = 8192;
14const READER_QUEUE_CAPACITY: usize = 8;
15const INPUT_STREAM_BUFFER_CAPACITY: usize = 16;
16const OUTPUT_STREAM_BUFFER_CAPACITY: usize = 16;
17
18fn io_error(error: std::io::Error) -> StreamError {
19    StreamError::Failed(error.to_string())
20}
21
/// Terminal marker stored in a [`SourceQueue`] once the reader side has ended.
#[derive(Clone)]
enum SourceTerminal {
    /// The reader finished normally (the worker saw a zero-length read).
    Complete,
    /// The reader side ended with the carried error.
    Error(StreamError),
}
27
/// Lock-protected contents of a [`SourceQueue`].
struct SourceQueueState {
    /// Chunks pushed by the producer and not yet taken by the consumer.
    queue: VecDeque<Vec<u8>>,
    /// `None` while the producer is still running; set at most once afterwards.
    terminal: Option<SourceTerminal>,
}
32
33struct SourceQueue {
34    state: Mutex<SourceQueueState>,
35    available: Condvar,
36    space: Condvar,
37    capacity: usize,
38    cancelled: Arc<AtomicBool>,
39}
40
41impl SourceQueue {
42    fn new() -> Arc<Self> {
43        Arc::new(Self {
44            state: Mutex::new(SourceQueueState {
45                queue: VecDeque::new(),
46                terminal: None,
47            }),
48            available: Condvar::new(),
49            space: Condvar::new(),
50            capacity: READER_QUEUE_CAPACITY,
51            cancelled: Arc::new(AtomicBool::new(false)),
52        })
53    }
54
55    fn push(&self, chunk: Vec<u8>) -> bool {
56        let mut state = self.state.lock().expect("io source queue poisoned");
57        while state.queue.len() >= self.capacity
58            && state.terminal.is_none()
59            && !self.cancelled.load(Ordering::SeqCst)
60        {
61            state = self
62                .space
63                .wait(state)
64                .expect("io source queue poisoned while waiting for space");
65        }
66
67        if state.terminal.is_some() || self.cancelled.load(Ordering::SeqCst) {
68            return false;
69        }
70
71        if state.terminal.is_none() {
72            state.queue.push_back(chunk);
73        }
74        drop(state);
75        self.available.notify_all();
76        true
77    }
78
79    fn finish(&self, terminal: SourceTerminal) {
80        let mut state = self.state.lock().expect("io source queue poisoned");
81        if state.terminal.is_none() {
82            state.terminal = Some(terminal);
83        }
84        drop(state);
85        self.available.notify_all();
86        self.space.notify_all();
87    }
88}
89
90struct ReaderWorkerGuard {
91    queue: Arc<SourceQueue>,
92    armed: bool,
93}
94
95impl ReaderWorkerGuard {
96    fn new(queue: Arc<SourceQueue>) -> Self {
97        Self { queue, armed: true }
98    }
99
100    fn disarm(&mut self) {
101        self.armed = false;
102    }
103}
104
105impl Drop for ReaderWorkerGuard {
106    fn drop(&mut self) {
107        if self.armed {
108            self.queue
109                .finish(SourceTerminal::Error(StreamError::AbruptTermination));
110        }
111    }
112}
113
114struct ReaderSourceStream {
115    queue: Arc<SourceQueue>,
116    completion: Option<StreamCompletion<NotUsed>>,
117}
118
119impl Iterator for ReaderSourceStream {
120    type Item = StreamResult<Vec<u8>>;
121
122    fn next(&mut self) -> Option<Self::Item> {
123        let mut state = self.queue.state.lock().expect("io source queue poisoned");
124        loop {
125            if let Some(chunk) = state.queue.pop_front() {
126                self.queue.space.notify_all();
127                return Some(Ok(chunk));
128            }
129            if let Some(terminal) = state.terminal.clone() {
130                return match terminal {
131                    SourceTerminal::Complete => None,
132                    SourceTerminal::Error(error) => Some(Err(error)),
133                };
134            }
135            state = self
136                .queue
137                .available
138                .wait(state)
139                .expect("io source queue poisoned while waiting");
140        }
141    }
142}
143
impl Drop for ReaderSourceStream {
    /// Signals cancellation to the reader worker and wakes anything blocked on
    /// the queue.
    fn drop(&mut self) {
        self.queue.cancelled.store(true, Ordering::SeqCst);
        // Take and release the state lock before notifying: a producer that
        // already read `cancelled == false` under the lock is then guaranteed
        // to be parked inside `space.wait` before the notification fires.
        // Without this the wake-up can land in that gap and be lost, parking
        // the reader worker forever (no later notifier exists once the
        // consumer is gone). `unwrap_or_else` instead of `expect`: panicking
        // in Drop during unwind would abort.
        drop(self.queue.state.lock().unwrap_or_else(|p| p.into_inner()));
        self.queue.available.notify_all();
        self.queue.space.notify_all();
        // Drop the worker's completion handle only after the wake-ups above.
        let _ = self.completion.take();
    }
}
160
161struct WriterGuard<W: Write> {
162    writer: W,
163    flushed: bool,
164}
165
166impl<W: Write> WriterGuard<W> {
167    fn new(writer: W) -> Self {
168        Self {
169            writer,
170            flushed: false,
171        }
172    }
173
174    fn writer_mut(&mut self) -> &mut W {
175        &mut self.writer
176    }
177
178    fn flush_once(&mut self) -> StreamResult<()> {
179        if self.flushed {
180            return Ok(());
181        }
182        self.writer.flush().map_err(io_error)?;
183        self.flushed = true;
184        Ok(())
185    }
186}
187
188impl<W: Write> Drop for WriterGuard<W> {
189    fn drop(&mut self) {
190        let _ = self.flush_once();
191    }
192}
193
194// ── as_input_stream ──────────────────────────────────────────────────────────
195
/// Terminal marker for the buffer behind an [`InputStreamHandle`].
#[derive(Clone)]
enum InputStreamTerminal {
    /// Upstream completed; reads report `Ok(0)` once the buffer is drained.
    Complete,
    /// Upstream failed; reads report the error once the buffer is drained.
    Error(StreamError),
}
201
/// Lock-protected contents of [`InputStreamShared`].
struct InputStreamBufferState {
    /// Chunks received from upstream and not yet consumed by a read.
    chunks: VecDeque<Vec<u8>>,
    /// `None` while upstream is still running; set at most once afterwards.
    terminal: Option<InputStreamTerminal>,
}
206
207struct InputStreamShared {
208    state: Mutex<InputStreamBufferState>,
209    available: Condvar,
210    space: Condvar,
211    cancelled: AtomicBool,
212}
213
214impl InputStreamShared {
215    fn new() -> Self {
216        Self {
217            state: Mutex::new(InputStreamBufferState {
218                chunks: VecDeque::new(),
219                terminal: None,
220            }),
221            available: Condvar::new(),
222            space: Condvar::new(),
223            cancelled: AtomicBool::new(false),
224        }
225    }
226
227    fn set_terminal(&self, terminal: InputStreamTerminal) {
228        let mut state = self.state.lock().expect("input stream buffer poisoned");
229        if state.terminal.is_none() {
230            state.terminal = Some(terminal);
231        }
232        drop(state);
233        self.available.notify_all();
234        self.space.notify_all();
235    }
236}
237
238/// A blocking `std::io::Read` handle materialized by [`StreamConverters::as_input_stream`].
239///
240/// This handle bridges a Datum byte stream (`Sink<Vec<u8>>`) into synchronous, blocking
241/// read calls. It matches Akka's `InputStream` semantics: partial reads are supported,
242/// EOF is `Ok(0)` when the stream completes, stream errors surface as `io::Error`, and
243/// the `read_timeout` bounds how long `read()` waits for new data.
244pub struct InputStreamHandle {
245    shared: Arc<InputStreamShared>,
246    detached: Vec<u8>,
247    detached_offset: usize,
248    read_timeout: Duration,
249    stream_closed: bool,
250    _completion: StreamCompletion<NotUsed>,
251}
252
253impl InputStreamHandle {
254    fn new(
255        shared: Arc<InputStreamShared>,
256        read_timeout: Duration,
257        completion: StreamCompletion<NotUsed>,
258    ) -> Self {
259        Self {
260            shared,
261            detached: Vec::new(),
262            detached_offset: 0,
263            read_timeout,
264            stream_closed: false,
265            _completion: completion,
266        }
267    }
268}
269
270impl Read for InputStreamHandle {
271    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
272        if self.stream_closed {
273            return Err(io::Error::other(
274                "stream is terminated, no reads are possible",
275            ));
276        }
277        if buf.is_empty() {
278            return Ok(0);
279        }
280
281        let mut total = 0_usize;
282
283        // Serve from detached chunk first
284        if self.detached_offset < self.detached.len() {
285            let available = self.detached.len() - self.detached_offset;
286            let n = available.min(buf.len());
287            buf[..n]
288                .copy_from_slice(&self.detached[self.detached_offset..self.detached_offset + n]);
289            self.detached_offset += n;
290            total += n;
291            if self.detached_offset >= self.detached.len() {
292                self.detached.clear();
293                self.detached_offset = 0;
294            }
295            if total == buf.len() {
296                return Ok(total);
297            }
298        }
299
300        let mut state = self
301            .shared
302            .state
303            .lock()
304            .expect("input stream buffer poisoned");
305        loop {
306            // Drain as many chunks as possible without blocking
307            while total < buf.len() {
308                if let Some(chunk) = state.chunks.pop_front() {
309                    self.shared.space.notify_all();
310                    drop(state);
311
312                    let space = buf.len() - total;
313                    let n = chunk.len().min(space);
314                    buf[total..total + n].copy_from_slice(&chunk[..n]);
315                    total += n;
316                    if n < chunk.len() {
317                        self.detached = chunk;
318                        self.detached_offset = n;
319                    }
320
321                    // Re-acquire lock for next iteration
322                    state = self
323                        .shared
324                        .state
325                        .lock()
326                        .expect("input stream buffer poisoned");
327                    continue;
328                }
329                break;
330            }
331
332            // If we have data, return it
333            if total > 0 {
334                return Ok(total);
335            }
336
337            // Check terminal
338            if let Some(terminal) = state.terminal.clone() {
339                return match terminal {
340                    InputStreamTerminal::Complete => Ok(0),
341                    InputStreamTerminal::Error(e) => {
342                        Err(io::Error::other(format!("stream failed: {e}")))
343                    }
344                };
345            }
346
347            // Wait for new data
348            let (new_state, timeout) = self
349                .shared
350                .available
351                .wait_timeout(state, self.read_timeout)
352                .expect("input stream buffer poisoned while waiting");
353            state = new_state;
354            if timeout.timed_out() && state.chunks.is_empty() && state.terminal.is_none() {
355                return Err(io::Error::new(
356                    io::ErrorKind::TimedOut,
357                    format!("timeout after {:?} waiting for new data", self.read_timeout),
358                ));
359            }
360        }
361    }
362}
363
364impl Drop for InputStreamHandle {
365    fn drop(&mut self) {
366        self.shared.cancelled.store(true, Ordering::SeqCst);
367        drop(self.shared.state.lock().unwrap_or_else(|p| p.into_inner()));
368        self.shared.available.notify_all();
369        self.shared.space.notify_all();
370    }
371}
372
373// ── as_output_stream ─────────────────────────────────────────────────────────
374
/// Terminal marker for the buffer behind an [`OutputStreamHandle`].
#[derive(Clone)]
// NOTE(review): presumably needed because `Error` is never constructed in this
// file — confirm and drop the `allow` if that changes.
#[allow(dead_code)]
enum OutputStreamTerminal {
    /// The writer closed the stream; downstream sees end of stream.
    Complete,
    /// The stream failed; downstream and later writes see the error.
    Error(StreamError),
}
381
/// Lock-protected contents of [`OutputStreamShared`].
struct OutputStreamBufferState {
    /// Chunks written through the handle and not yet emitted downstream.
    chunks: VecDeque<Vec<u8>>,
    /// `None` while the stream is open; set at most once afterwards.
    terminal: Option<OutputStreamTerminal>,
}
386
387struct OutputStreamShared {
388    state: Mutex<OutputStreamBufferState>,
389    available: Condvar,
390    space: Condvar,
391    cancelled: AtomicBool,
392}
393
394impl OutputStreamShared {
395    fn new() -> Self {
396        Self {
397            state: Mutex::new(OutputStreamBufferState {
398                chunks: VecDeque::new(),
399                terminal: None,
400            }),
401            available: Condvar::new(),
402            space: Condvar::new(),
403            cancelled: AtomicBool::new(false),
404        }
405    }
406}
407
408struct OutputStreamSourceStream {
409    shared: Arc<OutputStreamShared>,
410    done: bool,
411}
412
413impl Iterator for OutputStreamSourceStream {
414    type Item = StreamResult<Vec<u8>>;
415
416    fn next(&mut self) -> Option<Self::Item> {
417        if self.done {
418            return None;
419        }
420
421        let mut state = self
422            .shared
423            .state
424            .lock()
425            .expect("output stream buffer poisoned");
426        loop {
427            if let Some(chunk) = state.chunks.pop_front() {
428                self.shared.space.notify_all();
429                return Some(Ok(chunk));
430            }
431
432            match &state.terminal {
433                Some(OutputStreamTerminal::Complete) => {
434                    self.done = true;
435                    return None;
436                }
437                Some(OutputStreamTerminal::Error(e)) => {
438                    self.done = true;
439                    return Some(Err(e.clone()));
440                }
441                None => {}
442            }
443
444            state = self
445                .shared
446                .available
447                .wait(state)
448                .expect("output stream buffer poisoned while waiting");
449        }
450    }
451}
452
453impl Drop for OutputStreamSourceStream {
454    fn drop(&mut self) {
455        self.shared.cancelled.store(true, Ordering::SeqCst);
456        drop(self.shared.state.lock().unwrap_or_else(|p| p.into_inner()));
457        self.shared.available.notify_all();
458        self.shared.space.notify_all();
459    }
460}
461
462/// A blocking `std::io::Write` handle materialized by [`StreamConverters::as_output_stream`].
463///
464/// This handle bridges a `std::io::Write` into a Datum byte stream (`Source<Vec<u8>>`).
465/// It matches Akka's `OutputStream` semantics: each `write()` call produces one stream
466/// element; backpressure blocks the writer when the stream is not ready; `write_timeout`
467/// bounds how long writes block; `flush()` is a no-op; `close()` completes the stream.
468pub struct OutputStreamHandle {
469    shared: Arc<OutputStreamShared>,
470    write_timeout: Duration,
471    closed: AtomicBool,
472}
473
474impl OutputStreamHandle {
475    fn new(shared: Arc<OutputStreamShared>, write_timeout: Duration) -> Self {
476        Self {
477            shared,
478            write_timeout,
479            closed: AtomicBool::new(false),
480        }
481    }
482
483    /// Completes the stream, signalling EOF to downstream consumers.
484    ///
485    /// After `close()`, subsequent writes return `ErrorKind::BrokenPipe`. Calling
486    /// `close()` more than once is safe — the stream completes at most once.
487    pub fn close(&self) -> io::Result<()> {
488        self.closed.store(true, Ordering::SeqCst);
489        let mut state = self
490            .shared
491            .state
492            .lock()
493            .expect("output stream buffer poisoned");
494        if state.terminal.is_none() {
495            state.terminal = Some(OutputStreamTerminal::Complete);
496        }
497        drop(state);
498        self.shared.available.notify_all();
499        self.shared.space.notify_all();
500        Ok(())
501    }
502}
503
504impl Write for OutputStreamHandle {
505    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
506        if self.closed.load(Ordering::SeqCst) {
507            return Err(io::Error::new(
508                io::ErrorKind::BrokenPipe,
509                "stream is closed, no writes are possible",
510            ));
511        }
512        if buf.is_empty() {
513            return Ok(0);
514        }
515
516        let mut state = self
517            .shared
518            .state
519            .lock()
520            .expect("output stream buffer poisoned");
521        loop {
522            if self.closed.load(Ordering::SeqCst) || self.shared.cancelled.load(Ordering::SeqCst) {
523                return Err(io::Error::new(
524                    io::ErrorKind::BrokenPipe,
525                    "stream is closed, no writes are possible",
526                ));
527            }
528
529            if let Some(OutputStreamTerminal::Error(e)) = &state.terminal {
530                return Err(io::Error::other(format!("stream failed: {e}")));
531            }
532
533            if state.chunks.len() < OUTPUT_STREAM_BUFFER_CAPACITY {
534                state.chunks.push_back(buf.to_vec());
535                drop(state);
536                self.shared.available.notify_all();
537                return Ok(buf.len());
538            }
539
540            let (new_state, timeout) = self
541                .shared
542                .space
543                .wait_timeout(state, self.write_timeout)
544                .expect("output stream buffer poisoned while waiting");
545            state = new_state;
546            if timeout.timed_out()
547                && state.chunks.len() >= OUTPUT_STREAM_BUFFER_CAPACITY
548                && state.terminal.is_none()
549            {
550                return Err(io::Error::new(
551                    io::ErrorKind::TimedOut,
552                    format!(
553                        "timed out trying to write data to stream after {:?}",
554                        self.write_timeout
555                    ),
556                ));
557            }
558        }
559    }
560
561    fn flush(&mut self) -> io::Result<()> {
562        Ok(())
563    }
564}
565
566impl Drop for OutputStreamHandle {
567    fn drop(&mut self) {
568        self.shared.cancelled.store(true, Ordering::SeqCst);
569        let _ = self.close();
570    }
571}
572
/// Namespace for constructors that bridge blocking `std::io` readers and
/// writers with Datum byte streams.
pub struct StreamConverters;
574
575impl StreamConverters {
576    #[must_use]
577    pub fn from_reader<F, R>(factory: F, chunk_size: usize) -> Source<Vec<u8>>
578    where
579        F: Fn() -> std::io::Result<R> + Send + Sync + 'static,
580        R: Read + Send + 'static,
581    {
582        assert!(chunk_size > 0, "chunk size must be greater than zero");
583        Source::from_materialized_factory(move |materializer| {
584            let reader = factory().map_err(io_error)?;
585            let queue = SourceQueue::new();
586            let queue_for_worker = Arc::clone(&queue);
587            let cancelled = Arc::clone(&queue.cancelled);
588            let completion = materializer.spawn_stream(move |_worker_cancelled| {
589                let mut reader = reader;
590                let mut guard = ReaderWorkerGuard::new(Arc::clone(&queue_for_worker));
591                let mut buffer = vec![0_u8; chunk_size];
592
593                loop {
594                    if cancelled.load(Ordering::SeqCst) {
595                        guard.disarm();
596                        return Ok(NotUsed);
597                    }
598
599                    match reader.read(&mut buffer) {
600                        Ok(0) => {
601                            guard.disarm();
602                            queue_for_worker.finish(SourceTerminal::Complete);
603                            return Ok(NotUsed);
604                        }
605                        Ok(read) => {
606                            if !queue_for_worker.push(buffer[..read].to_vec()) {
607                                guard.disarm();
608                                return Ok(NotUsed);
609                            }
610                        }
611                        Err(error) => {
612                            guard.disarm();
613                            queue_for_worker.finish(SourceTerminal::Error(io_error(error)));
614                            return Ok(NotUsed);
615                        }
616                    }
617                }
618            });
619
620            Ok((
621                Box::new(ReaderSourceStream {
622                    queue,
623                    completion: Some(completion),
624                }) as BoxStream<Vec<u8>>,
625                NotUsed,
626            ))
627        })
628    }
629
630    #[must_use]
631    pub fn to_writer<F, W>(factory: F) -> Sink<Vec<u8>, StreamCompletion<NotUsed>>
632    where
633        F: Fn() -> std::io::Result<W> + Send + Sync + 'static,
634        W: Write + Send + 'static,
635    {
636        Sink::from_runner(move |input: BoxStream<Vec<u8>>, materializer| {
637            let writer = WriterGuard::new(factory().map_err(io_error)?);
638            Ok(materializer.spawn_stream(move |cancelled| {
639                let mut input = input;
640                let mut writer = writer;
641                loop {
642                    if cancelled.load(Ordering::SeqCst) {
643                        let _ = writer.flush_once();
644                        return Err(StreamError::Cancelled);
645                    }
646
647                    match input.next() {
648                        Some(Ok(chunk)) => {
649                            writer.writer_mut().write_all(&chunk).map_err(io_error)?
650                        }
651                        Some(Err(error)) => {
652                            let _ = writer.flush_once();
653                            return Err(error);
654                        }
655                        None => {
656                            writer.flush_once()?;
657                            return Ok(NotUsed);
658                        }
659                    }
660                }
661            }))
662        })
663    }
664
665    /// Creates a `Sink` which when materialized returns an [`InputStreamHandle`] that can
666    /// be used to read the values produced by the stream this sink is attached to.
667    ///
668    /// This sink bridges Datum streams to synchronous blocking code. The materialized
669    /// handle implements `std::io::Read`: each `read()` call blocks the caller (up to
670    /// `read_timeout`) until upstream produces a chunk, then copies as many bytes as
671    /// possible into the provided buffer. Partial reads are supported — the handle
672    /// retains any leftover bytes from a previous chunk and serves them on the next call.
673    ///
674    /// EOF is signalled by `read()` returning `Ok(0)` when the stream completes. Stream
675    /// errors surface as `io::Error`. Dropping the handle cancels the stream.
676    ///
677    /// Matches Akka `StreamConverters.asInputStream` semantics.
678    #[must_use]
679    pub fn as_input_stream(read_timeout: Duration) -> Sink<Vec<u8>, InputStreamHandle> {
680        Sink::from_runner(move |input: BoxStream<Vec<u8>>, materializer| {
681            let shared = Arc::new(InputStreamShared::new());
682            let shared_for_worker = Arc::clone(&shared);
683
684            let completion = materializer.spawn_stream(move |_task_cancelled| {
685                let mut input = input;
686                loop {
687                    if shared_for_worker.cancelled.load(Ordering::SeqCst) {
688                        return Ok(NotUsed);
689                    }
690
691                    match input.next() {
692                        Some(Ok(chunk)) => {
693                            let mut state = shared_for_worker
694                                .state
695                                .lock()
696                                .expect("input stream buffer poisoned");
697                            while state.chunks.len() >= INPUT_STREAM_BUFFER_CAPACITY
698                                && state.terminal.is_none()
699                                && !shared_for_worker.cancelled.load(Ordering::SeqCst)
700                            {
701                                state = shared_for_worker
702                                    .space
703                                    .wait(state)
704                                    .expect("input stream buffer poisoned while waiting");
705                            }
706
707                            if state.terminal.is_some()
708                                || shared_for_worker.cancelled.load(Ordering::SeqCst)
709                            {
710                                return Ok(NotUsed);
711                            }
712
713                            if !chunk.is_empty() {
714                                state.chunks.push_back(chunk);
715                            }
716                            drop(state);
717                            shared_for_worker.available.notify_all();
718                        }
719                        Some(Err(e)) => {
720                            shared_for_worker.set_terminal(InputStreamTerminal::Error(e));
721                            return Ok(NotUsed);
722                        }
723                        None => {
724                            shared_for_worker.set_terminal(InputStreamTerminal::Complete);
725                            return Ok(NotUsed);
726                        }
727                    }
728                }
729            });
730
731            Ok(InputStreamHandle::new(shared, read_timeout, completion))
732        })
733    }
734
735    /// Creates a `Source` which when materialized returns an [`OutputStreamHandle`] that
736    /// can be used to write bytes into the stream.
737    ///
738    /// This source is intended for inter-operation with blocking APIs. The materialized
739    /// handle implements `std::io::Write`: each `write()` call blocks (up to
740    /// `write_timeout`) until the stream has capacity, then produces one stream element
741    /// from the written bytes. Backpressure is respected — writes block when the internal
742    /// buffer is full.
743    ///
744    /// `flush()` is a no-op. [`OutputStreamHandle::close()`] completes the stream (signals
745    /// EOF to downstream). Dropping the handle cancels the stream.
746    ///
747    /// Matches Akka `StreamConverters.asOutputStream` semantics.
748    #[must_use]
749    pub fn as_output_stream(write_timeout: Duration) -> Source<Vec<u8>, OutputStreamHandle> {
750        Source::from_materialized_factory(move |_materializer| {
751            let shared = Arc::new(OutputStreamShared::new());
752            let handle = OutputStreamHandle::new(Arc::clone(&shared), write_timeout);
753            let stream = OutputStreamSourceStream {
754                shared,
755                done: false,
756            };
757            Ok((Box::new(stream) as BoxStream<Vec<u8>>, handle))
758        })
759    }
760}
761
762pub struct FileIO;
763
764impl FileIO {
765    #[must_use]
766    pub fn from_path(path: impl Into<PathBuf>, chunk_size: usize) -> Source<Vec<u8>> {
767        let path = path.into();
768        StreamConverters::from_reader(move || File::open(&path), chunk_size)
769    }
770
771    #[must_use]
772    pub fn from_path_default(path: impl Into<PathBuf>) -> Source<Vec<u8>> {
773        Self::from_path(path, DEFAULT_CHUNK_SIZE)
774    }
775
776    #[must_use]
777    pub fn to_path(path: impl Into<PathBuf>) -> Sink<Vec<u8>, StreamCompletion<NotUsed>> {
778        let path = path.into();
779        StreamConverters::to_writer(move || {
780            OpenOptions::new()
781                .create(true)
782                .truncate(true)
783                .write(true)
784                .open(&path)
785        })
786    }
787}
788
789#[cfg(test)]
790mod tests {
791    use super::*;
792    use crate::Source;
793    use crate::testkit::TestSink;
794    use std::io::Cursor;
795    use std::sync::atomic::{AtomicU64, AtomicUsize};
796    use std::thread;
797    use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
798
799    fn wait_for_counter(counter: &AtomicUsize, expected: usize) {
800        let deadline = std::time::Instant::now() + Duration::from_secs(1);
801        while std::time::Instant::now() < deadline {
802            if counter.load(Ordering::SeqCst) == expected {
803                return;
804            }
805            thread::sleep(Duration::from_millis(5));
806        }
807        assert_eq!(counter.load(Ordering::SeqCst), expected);
808    }
809
810    fn wait_until(timeout: Duration, mut condition: impl FnMut() -> bool) -> bool {
811        let deadline = Instant::now() + timeout;
812        while Instant::now() < deadline {
813            if condition() {
814                return true;
815            }
816            thread::sleep(Duration::from_millis(5));
817        }
818        condition()
819    }
820
821    fn unique_temp_path(name: &str) -> PathBuf {
822        let nanos = SystemTime::now()
823            .duration_since(UNIX_EPOCH)
824            .expect("clock after epoch")
825            .as_nanos();
826        std::env::temp_dir().join(format!(
827            "datum-wp12-{name}-{}-{nanos}.bin",
828            std::process::id()
829        ))
830    }
831
832    struct CountingReader {
833        inner: Cursor<Vec<u8>>,
834        drops: Arc<AtomicUsize>,
835    }
836
837    impl Read for CountingReader {
838        fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
839            self.inner.read(buf)
840        }
841    }
842
843    impl Drop for CountingReader {
844        fn drop(&mut self) {
845            self.drops.fetch_add(1, Ordering::SeqCst);
846        }
847    }
848
849    struct CountingWriter {
850        writes: Arc<Mutex<Vec<Vec<u8>>>>,
851        flushes: Arc<AtomicUsize>,
852        drops: Arc<AtomicUsize>,
853        fail_write: bool,
854    }
855
856    impl Write for CountingWriter {
857        fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
858            if self.fail_write {
859                return Err(std::io::Error::other("writer boom"));
860            }
861            self.writes
862                .lock()
863                .expect("writer log poisoned")
864                .push(buf.to_vec());
865            Ok(buf.len())
866        }
867
868        fn flush(&mut self) -> std::io::Result<()> {
869            self.flushes.fetch_add(1, Ordering::SeqCst);
870            Ok(())
871        }
872    }
873
874    impl Drop for CountingWriter {
875        fn drop(&mut self) {
876            self.drops.fetch_add(1, Ordering::SeqCst);
877        }
878    }
879
880    struct CountingChunkReader {
881        inner: Cursor<Vec<u8>>,
882        chunk_size: usize,
883        reads: Arc<AtomicUsize>,
884    }
885
886    impl Read for CountingChunkReader {
887        fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
888            self.reads.fetch_add(1, Ordering::SeqCst);
889            let mut chunk = vec![0_u8; self.chunk_size.min(buf.len())];
890            let read = self.inner.read(&mut chunk)?;
891            buf[..read].copy_from_slice(&chunk[..read]);
892            Ok(read)
893        }
894    }
895
896    #[test]
897    fn from_reader_emits_chunked_bytes_and_completes() {
898        let sink = StreamConverters::from_reader(|| Ok(Cursor::new(b"abcdef".to_vec())), 2)
899            .run_with(TestSink::probe())
900            .expect("reader source materializes");
901
902        sink.request(4);
903        sink.assert_next_n([b"ab".to_vec(), b"cd".to_vec(), b"ef".to_vec()]);
904        sink.expect_complete();
905    }
906
907    #[test]
908    fn from_reader_closes_exactly_once_on_completion() {
909        let drops = Arc::new(AtomicUsize::new(0));
910        let drops_for_reader = Arc::clone(&drops);
911        let sink = StreamConverters::from_reader(
912            move || {
913                Ok(CountingReader {
914                    inner: Cursor::new(b"hello".to_vec()),
915                    drops: Arc::clone(&drops_for_reader),
916                })
917            },
918            8,
919        )
920        .run_with(TestSink::probe())
921        .expect("reader source materializes");
922
923        sink.request(2);
924        sink.assert_next(b"hello".to_vec());
925        sink.expect_complete();
926        wait_for_counter(&drops, 1);
927    }
928
929    #[test]
930    fn from_reader_closes_exactly_once_on_cancellation() {
931        let drops = Arc::new(AtomicUsize::new(0));
932        let drops_for_reader = Arc::clone(&drops);
933        let mut sink = StreamConverters::from_reader(
934            move || {
935                Ok(CountingReader {
936                    inner: Cursor::new(vec![1_u8; 32]),
937                    drops: Arc::clone(&drops_for_reader),
938                })
939            },
940            4,
941        )
942        .run_with(TestSink::probe())
943        .expect("reader source materializes");
944
945        sink.request(1);
946        sink.assert_next(vec![1_u8; 4]);
947        sink.cancel();
948        wait_for_counter(&drops, 1);
949    }
950
951    #[test]
952    fn from_reader_surfaces_read_failure() {
953        struct FailingReader;
954
955        impl Read for FailingReader {
956            fn read(&mut self, _buf: &mut [u8]) -> std::io::Result<usize> {
957                Err(std::io::Error::other("reader boom"))
958            }
959        }
960
961        let sink = StreamConverters::from_reader(|| Ok(FailingReader), 8)
962            .run_with(TestSink::probe())
963            .expect("reader source materializes");
964
965        sink.request(1);
966        assert_eq!(
967            sink.expect_error(),
968            StreamError::Failed("reader boom".to_owned())
969        );
970    }
971
972    #[test]
973    fn from_reader_caps_buffered_read_ahead_for_slow_consumers() {
974        let payload: Vec<u8> = (0..64_u8).cycle().take(8192 * 16).collect();
975        let payload_for_reader = payload.clone();
976        let reads = Arc::new(AtomicUsize::new(0));
977        let reads_for_reader = Arc::clone(&reads);
978        let sink = StreamConverters::from_reader(
979            move || {
980                Ok(CountingChunkReader {
981                    inner: Cursor::new(payload_for_reader.clone()),
982                    chunk_size: 256,
983                    reads: Arc::clone(&reads_for_reader),
984                })
985            },
986            256,
987        )
988        .run_with(TestSink::probe())
989        .expect("reader source materializes");
990
991        sink.request(1);
992        let first = sink.expect_next();
993        assert_eq!(first.len(), 256);
994
995        let last_seen = Arc::new(AtomicUsize::new(0));
996        let quiet_since_ms = Arc::new(AtomicU64::new(0));
997        let start = Instant::now();
998        assert!(wait_until(Duration::from_secs(2), {
999            let last_seen = Arc::clone(&last_seen);
1000            let quiet_since_ms = Arc::clone(&quiet_since_ms);
1001            let reads = Arc::clone(&reads);
1002            move || {
1003                let current = reads.load(Ordering::SeqCst);
1004                let last = last_seen.load(Ordering::SeqCst);
1005                if current != last {
1006                    last_seen.store(current, Ordering::SeqCst);
1007                    quiet_since_ms.store(start.elapsed().as_millis() as u64, Ordering::SeqCst);
1008                    return false;
1009                }
1010
1011                let quiet_for =
1012                    start.elapsed().as_millis() as u64 - quiet_since_ms.load(Ordering::SeqCst);
1013                current > 0 && quiet_for >= 100
1014            }
1015        }));
1016
1017        assert!(
1018            reads.load(Ordering::SeqCst) <= READER_QUEUE_CAPACITY + 2,
1019            "reader should plateau near the bounded queue capacity"
1020        );
1021
1022        sink.request(usize::MAX);
1023        let mut collected = first;
1024        for chunk in sink.drain_until_complete() {
1025            collected.extend_from_slice(&chunk);
1026        }
1027        assert_eq!(collected, payload);
1028    }
1029
1030    #[test]
1031    fn to_writer_writes_all_chunks_and_flushes_once() {
1032        let writes = Arc::new(Mutex::new(Vec::new()));
1033        let flushes = Arc::new(AtomicUsize::new(0));
1034        let drops = Arc::new(AtomicUsize::new(0));
1035        let completion = Source::from_iter([b"ab".to_vec(), b"cd".to_vec()])
1036            .run_with(StreamConverters::to_writer({
1037                let writes = Arc::clone(&writes);
1038                let flushes = Arc::clone(&flushes);
1039                let drops = Arc::clone(&drops);
1040                move || {
1041                    Ok(CountingWriter {
1042                        writes: Arc::clone(&writes),
1043                        flushes: Arc::clone(&flushes),
1044                        drops: Arc::clone(&drops),
1045                        fail_write: false,
1046                    })
1047                }
1048            }))
1049            .expect("writer sink materializes");
1050
1051        completion.wait().expect("writer sink completes");
1052        assert_eq!(
1053            writes.lock().expect("writes poisoned").as_slice(),
1054            &[b"ab".to_vec(), b"cd".to_vec()]
1055        );
1056        assert_eq!(flushes.load(Ordering::SeqCst), 1);
1057        assert_eq!(drops.load(Ordering::SeqCst), 1);
1058    }
1059
1060    #[test]
1061    fn to_writer_flushes_and_drops_once_on_failure() {
1062        let flushes = Arc::new(AtomicUsize::new(0));
1063        let drops = Arc::new(AtomicUsize::new(0));
1064        let completion = Source::<Vec<u8>>::failed(StreamError::Failed("upstream boom".to_owned()))
1065            .run_with(StreamConverters::to_writer({
1066                let flushes = Arc::clone(&flushes);
1067                let drops = Arc::clone(&drops);
1068                move || {
1069                    Ok(CountingWriter {
1070                        writes: Arc::new(Mutex::new(Vec::new())),
1071                        flushes: Arc::clone(&flushes),
1072                        drops: Arc::clone(&drops),
1073                        fail_write: false,
1074                    })
1075                }
1076            }))
1077            .expect("writer sink materializes");
1078
1079        assert_eq!(
1080            completion.wait(),
1081            Err(StreamError::Failed("upstream boom".to_owned()))
1082        );
1083        assert_eq!(flushes.load(Ordering::SeqCst), 1);
1084        assert_eq!(drops.load(Ordering::SeqCst), 1);
1085    }
1086
1087    #[test]
1088    fn to_writer_flushes_and_drops_once_on_cancellation() {
1089        let flushes = Arc::new(AtomicUsize::new(0));
1090        let drops = Arc::new(AtomicUsize::new(0));
1091        let completion = Source::repeat(vec![7_u8; 4])
1092            .run_with(StreamConverters::to_writer({
1093                let flushes = Arc::clone(&flushes);
1094                let drops = Arc::clone(&drops);
1095                move || {
1096                    Ok(CountingWriter {
1097                        writes: Arc::new(Mutex::new(Vec::new())),
1098                        flushes: Arc::clone(&flushes),
1099                        drops: Arc::clone(&drops),
1100                        fail_write: false,
1101                    })
1102                }
1103            }))
1104            .expect("writer sink materializes");
1105
1106        drop(completion);
1107        wait_for_counter(&flushes, 1);
1108        wait_for_counter(&drops, 1);
1109    }
1110
1111    #[test]
1112    fn to_writer_surfaces_write_failure() {
1113        let completion = Source::single(vec![1_u8])
1114            .run_with(StreamConverters::to_writer(|| {
1115                Ok(CountingWriter {
1116                    writes: Arc::new(Mutex::new(Vec::new())),
1117                    flushes: Arc::new(AtomicUsize::new(0)),
1118                    drops: Arc::new(AtomicUsize::new(0)),
1119                    fail_write: true,
1120                })
1121            }))
1122            .expect("writer sink materializes");
1123
1124        assert_eq!(
1125            completion.wait(),
1126            Err(StreamError::Failed("writer boom".to_owned()))
1127        );
1128    }
1129
1130    #[test]
1131    fn file_io_round_trips_bytes() {
1132        let path = unique_temp_path("roundtrip");
1133        let write_completion = Source::from_iter([b"ab".to_vec(), b"cd".to_vec()])
1134            .run_with(FileIO::to_path(path.clone()))
1135            .expect("file sink materializes");
1136        write_completion.wait().expect("file write completes");
1137
1138        let sink = FileIO::from_path(path.clone(), 2)
1139            .run_with(TestSink::probe())
1140            .expect("file source materializes");
1141        sink.request(4);
1142        sink.assert_next_n([b"ab".to_vec(), b"cd".to_vec()]);
1143        sink.expect_complete();
1144
1145        std::fs::remove_file(path).expect("remove roundtrip file");
1146    }
1147
1148    #[test]
1149    fn file_io_source_surfaces_open_failure() {
1150        let missing = unique_temp_path("missing");
1151        let result = FileIO::from_path(missing, 4).run_with(TestSink::probe());
1152        assert!(matches!(result, Err(StreamError::Failed(_))));
1153    }
1154
1155    #[test]
1156    fn file_io_sink_creates_and_truncates_file() {
1157        let path = unique_temp_path("truncate");
1158        std::fs::write(&path, b"stale bytes").expect("seed file");
1159
1160        let completion = Source::single(b"ok".to_vec())
1161            .run_with(FileIO::to_path(path.clone()))
1162            .expect("file sink materializes");
1163        completion.wait().expect("file write completes");
1164        assert_eq!(std::fs::read(&path).expect("read file"), b"ok");
1165
1166        std::fs::remove_file(path).expect("remove truncate file");
1167    }
1168
1169    #[test]
1170    fn file_io_source_default_chunk_size_works() {
1171        let path = unique_temp_path("default");
1172        std::fs::write(&path, b"hi").expect("write seed file");
1173
1174        let sink = FileIO::from_path_default(path.clone())
1175            .run_with(TestSink::probe())
1176            .expect("file source materializes");
1177        sink.request(2);
1178        sink.assert_next(b"hi".to_vec());
1179        sink.expect_complete();
1180
1181        std::fs::remove_file(path).expect("remove default file");
1182    }
1183
1184    // ── as_input_stream / as_output_stream tests ───────────────────────────
1185
1186    #[test]
1187    fn as_input_stream_reads_data_written_by_stream() {
1188        let mut handle: InputStreamHandle =
1189            Source::from_iter([b"hello".to_vec(), b"world".to_vec()])
1190                .run_with(StreamConverters::as_input_stream(Duration::from_secs(5)))
1191                .expect("input stream sink materializes");
1192
1193        // A single read() may return fewer bytes than requested (std::io::Read
1194        // contract); under scheduler load the second chunk may not be buffered
1195        // yet. Drain in a loop until EOF.
1196        let mut buf = [0_u8; 32];
1197        let mut total = 0_usize;
1198        loop {
1199            let n = handle.read(&mut buf[total..]).expect("read succeeds");
1200            if n == 0 {
1201                break;
1202            }
1203            total += n;
1204        }
1205        assert_eq!(&buf[..total], b"helloworld");
1206    }
1207
1208    #[test]
1209    fn as_input_stream_eof_when_stream_completes() {
1210        let mut handle: InputStreamHandle = Source::from_iter([b"x".to_vec()])
1211            .run_with(StreamConverters::as_input_stream(Duration::from_secs(5)))
1212            .expect("input stream sink materializes");
1213
1214        let mut buf = [0_u8; 4];
1215        let n = handle.read(&mut buf).expect("first read succeeds");
1216        assert_eq!(&buf[..n], b"x");
1217
1218        // EOF
1219        let n = handle.read(&mut buf).expect("second read returns eof");
1220        assert_eq!(n, 0);
1221    }
1222
1223    #[test]
1224    fn as_input_stream_partial_reads_work() {
1225        let mut handle: InputStreamHandle = Source::single(b"abcde".to_vec())
1226            .run_with(StreamConverters::as_input_stream(Duration::from_secs(5)))
1227            .expect("input stream sink materializes");
1228
1229        let mut small = [0_u8; 2];
1230        let n = handle.read(&mut small).expect("small read succeeds");
1231        assert_eq!(n, 2);
1232        assert_eq!(&small[..], b"ab");
1233
1234        // remainder from same chunk
1235        let n = handle.read(&mut small).expect("second small read succeeds");
1236        assert_eq!(n, 2);
1237        assert_eq!(&small[..], b"cd");
1238
1239        // final byte
1240        let n = handle.read(&mut small).expect("third small read succeeds");
1241        assert_eq!(n, 1);
1242        assert_eq!(&small[..1], b"e");
1243
1244        // EOF
1245        let n = handle.read(&mut small).expect("fourth read returns eof");
1246        assert_eq!(n, 0);
1247    }
1248
1249    #[test]
1250    fn as_input_stream_error_surfaces_as_io_error() {
1251        let mut handle: InputStreamHandle =
1252            Source::<Vec<u8>>::failed(StreamError::Failed("upstream boom".to_owned()))
1253                .run_with(StreamConverters::as_input_stream(Duration::from_secs(5)))
1254                .expect("input stream sink materializes");
1255
1256        let mut buf = [0_u8; 8];
1257        let err = handle.read(&mut buf).expect_err("read surfaces error");
1258        let msg = err.to_string();
1259        assert!(msg.contains("upstream boom"), "got: {msg}");
1260    }
1261
1262    #[test]
1263    fn as_input_stream_cancellation_stops_reads() {
1264        let mut handle: InputStreamHandle = Source::repeat(b"x".to_vec())
1265            .run_with(StreamConverters::as_input_stream(Duration::from_secs(5)))
1266            .expect("input stream sink materializes");
1267
1268        let mut buf = [0_u8; 3];
1269        let n = handle.read(&mut buf).expect("first read succeeds");
1270        // A single read() may return fewer bytes than requested; under load not
1271        // all repeated chunks are buffered yet. We only need some data to confirm
1272        // reads work before drop cancels the stream.
1273        assert!((1..=3).contains(&n), "expected 1..=3 bytes, got {n}");
1274
1275        drop(handle);
1276
1277        // After drop, the handle is gone — the worker thread was signalled to stop
1278        // and the shared state was cleaned up. No further reads possible.
1279    }
1280
1281    #[test]
1282    fn as_input_stream_read_timeout_returns_timed_out() {
1283        // A source that produces nothing and never completes — read should time out.
1284        let mut handle: InputStreamHandle = Source::<Vec<u8>>::never()
1285            .run_with(StreamConverters::as_input_stream(Duration::from_millis(10)))
1286            .expect("input stream sink materializes");
1287
1288        let mut buf = [0_u8; 4];
1289        let err = handle.read(&mut buf).expect_err("read times out");
1290        assert_eq!(err.kind(), io::ErrorKind::TimedOut);
1291    }
1292
1293    #[test]
1294    fn as_output_stream_writes_data_appear_in_stream() {
1295        let (mut handle, completion) = StreamConverters::as_output_stream(Duration::from_secs(5))
1296            .to_mat(Sink::collect(), crate::Keep::both)
1297            .run()
1298            .expect("output stream source materializes");
1299
1300        handle.write_all(b"alpha").expect("first write succeeds");
1301        handle.write_all(b"beta").expect("second write succeeds");
1302        handle.close().expect("close succeeds");
1303
1304        let chunks = completion.wait().expect("stream completes");
1305        assert_eq!(chunks, vec![b"alpha".to_vec(), b"beta".to_vec()]);
1306    }
1307
1308    #[test]
1309    fn as_output_stream_close_completes_stream() {
1310        let (mut handle, completion) = StreamConverters::as_output_stream(Duration::from_secs(5))
1311            .to_mat(Sink::collect(), crate::Keep::both)
1312            .run()
1313            .expect("output stream source materializes");
1314
1315        handle.write_all(b"done").expect("write succeeds");
1316        let result = handle.close();
1317        assert!(result.is_ok());
1318
1319        let chunks = completion.wait().expect("stream completes after close");
1320        assert_eq!(chunks, vec![b"done".to_vec()]);
1321    }
1322
1323    #[test]
1324    fn as_output_stream_write_after_close_fails() {
1325        let (mut handle, _completion) = StreamConverters::as_output_stream(Duration::from_secs(5))
1326            .to_mat(Sink::ignore(), crate::Keep::both)
1327            .run()
1328            .expect("output stream source materializes");
1329
1330        handle.close().expect("first close succeeds");
1331        let err = handle.write(b"late").expect_err("write after close fails");
1332        assert_eq!(err.kind(), io::ErrorKind::BrokenPipe);
1333    }
1334
1335    #[test]
1336    fn as_output_stream_cancellation_stops_writes() {
1337        let (mut handle, completion) = StreamConverters::as_output_stream(Duration::from_secs(5))
1338            .to_mat(Sink::ignore(), crate::Keep::both)
1339            .run()
1340            .expect("output stream source materializes");
1341
1342        handle.write_all(b"ok").expect("write succeeds");
1343
1344        // Drop the stream consumer side — this cancels the stream
1345        drop(completion);
1346
1347        // Give cancellation time to propagate
1348        let deadline = std::time::Instant::now() + Duration::from_secs(1);
1349        let mut last_err = None;
1350        while std::time::Instant::now() < deadline {
1351            match handle.write(b"after cancel") {
1352                Err(e) => {
1353                    last_err = Some(e);
1354                    break;
1355                }
1356                _ => thread::sleep(Duration::from_millis(5)),
1357            }
1358        }
1359        let err = last_err.expect("write after cancellation should fail");
1360        assert_eq!(err.kind(), io::ErrorKind::BrokenPipe);
1361    }
1362
1363    #[test]
1364    fn as_output_stream_write_timeout_returns_timed_out() {
1365        // Use a sink that blocks without consuming — this keeps the graph alive
1366        // while the output buffer fills up so writes time out.
1367        let hang_sink: Sink<Vec<u8>, StreamCompletion<NotUsed>> =
1368            Sink::from_runner(move |input: BoxStream<Vec<u8>>, materializer| {
1369                Ok(materializer.spawn_stream(move |cancelled| {
1370                    let _input = input;
1371                    loop {
1372                        if cancelled.load(Ordering::SeqCst) {
1373                            return Ok(NotUsed);
1374                        }
1375                        thread::sleep(Duration::from_millis(1));
1376                    }
1377                }))
1378            });
1379        let (mut handle, _hang_completion) =
1380            StreamConverters::as_output_stream(Duration::from_millis(50))
1381                .to_mat(hang_sink, crate::Keep::both)
1382                .run()
1383                .expect("output stream source materializes");
1384
1385        let capacity = 16_usize;
1386
1387        // Fill the buffer to capacity
1388        for _ in 0..capacity {
1389            handle.write_all(b"x").expect("buffer-fill write succeeds");
1390        }
1391
1392        // This write should time out because no consumer is draining
1393        let err = handle.write(b"overflow").expect_err("write times out");
1394        assert_eq!(err.kind(), io::ErrorKind::TimedOut);
1395    }
1396
1397    #[test]
1398    fn as_output_stream_flush_is_noop() {
1399        let (mut handle, completion) = StreamConverters::as_output_stream(Duration::from_secs(5))
1400            .to_mat(Sink::collect(), crate::Keep::both)
1401            .run()
1402            .expect("output stream source materializes");
1403
1404        handle.write_all(b"data").expect("write succeeds");
1405        handle.flush().expect("flush is a noop");
1406        handle.close().expect("close succeeds");
1407
1408        let chunks = completion.wait().expect("stream completes");
1409        assert_eq!(chunks, vec![b"data".to_vec()]);
1410    }
1411
1412    #[test]
1413    fn as_output_stream_drop_completes_stream() {
1414        let (mut handle, completion) = StreamConverters::as_output_stream(Duration::from_secs(5))
1415            .to_mat(Sink::collect(), crate::Keep::both)
1416            .run()
1417            .expect("output stream source materializes");
1418
1419        handle.write_all(b"drop-me").expect("write succeeds");
1420        drop(handle);
1421
1422        let chunks = completion.wait().expect("stream completes after drop");
1423        assert_eq!(chunks, vec![b"drop-me".to_vec()]);
1424    }
1425
1426    #[test]
1427    fn round_trip_output_stream_to_input_stream() {
1428        // Write bytes to an output stream handle, pipe through the stream,
1429        // read them back from the input stream handle.
1430        let (mut out_handle, mut in_handle): (OutputStreamHandle, InputStreamHandle) =
1431            StreamConverters::as_output_stream(Duration::from_secs(5))
1432                .to_mat(
1433                    StreamConverters::as_input_stream(Duration::from_secs(5)),
1434                    crate::Keep::both,
1435                )
1436                .run()
1437                .expect("round-trip stream materializes");
1438
1439        out_handle.write_all(b"round").expect("write round");
1440        out_handle.write_all(b"trip").expect("write trip");
1441        out_handle.close().expect("close output");
1442
1443        let mut buf = [0_u8; 16];
1444        let mut total = 0_usize;
1445        loop {
1446            let n = in_handle.read(&mut buf[total..]).expect("read");
1447            if n == 0 {
1448                break;
1449            }
1450            total += n;
1451        }
1452        assert_eq!(&buf[..total], b"roundtrip");
1453    }
1454
1455    #[test]
1456    fn as_input_stream_empty_buf_read_returns_zero() {
1457        let mut handle: InputStreamHandle = Source::single(b"abc".to_vec())
1458            .run_with(StreamConverters::as_input_stream(Duration::from_secs(5)))
1459            .expect("input stream sink materializes");
1460
1461        let n = handle.read(&mut []).expect("empty read succeeds");
1462        assert_eq!(n, 0);
1463    }
1464
1465    #[test]
1466    fn as_input_stream_large_read_across_multiple_chunks() {
1467        // Source produces many small chunks; drain with a read loop
1468        // because std::io::Read may return fewer bytes than requested.
1469        let chunks: Vec<Vec<u8>> = (0..10).map(|i| vec![i as u8; 3]).collect();
1470        let total_bytes: usize = chunks.iter().map(|c| c.len()).sum();
1471
1472        let mut handle: InputStreamHandle = Source::from_iter(chunks)
1473            .run_with(StreamConverters::as_input_stream(Duration::from_secs(5)))
1474            .expect("input stream sink materializes");
1475
1476        let mut buf = vec![0_u8; total_bytes];
1477        let mut total = 0_usize;
1478        loop {
1479            let n = handle.read(&mut buf[total..]).expect("large read succeeds");
1480            if n == 0 {
1481                break;
1482            }
1483            total += n;
1484        }
1485        assert_eq!(total, total_bytes);
1486    }
1487}