Skip to main content

datum/io/
adapters.rs

1use crate::stream::{BoxStream, NotUsed, Sink, Source, StreamCompletion};
2use crate::{StreamError, StreamResult};
3use std::collections::VecDeque;
4use std::fs::{File, OpenOptions};
5use std::io::{self, Read, Write};
6use std::path::PathBuf;
7use std::sync::{
8    Arc, Condvar, Mutex,
9    atomic::{AtomicBool, Ordering},
10};
11use std::time::Duration;
12
13const DEFAULT_CHUNK_SIZE: usize = 8192;
14const READER_QUEUE_CAPACITY: usize = 8;
15const INPUT_STREAM_BUFFER_CAPACITY: usize = 16;
16const OUTPUT_STREAM_BUFFER_CAPACITY: usize = 16;
17
18fn io_error(error: std::io::Error) -> StreamError {
19    StreamError::Failed(error.to_string())
20}
21
22#[derive(Clone)]
23enum SourceTerminal {
24    Complete,
25    Error(StreamError),
26}
27
28struct SourceQueueState {
29    queue: VecDeque<Vec<u8>>,
30    terminal: Option<SourceTerminal>,
31}
32
33struct SourceQueue {
34    state: Mutex<SourceQueueState>,
35    available: Condvar,
36    space: Condvar,
37    capacity: usize,
38    cancelled: Arc<AtomicBool>,
39}
40
41impl SourceQueue {
42    fn new() -> Arc<Self> {
43        Arc::new(Self {
44            state: Mutex::new(SourceQueueState {
45                queue: VecDeque::new(),
46                terminal: None,
47            }),
48            available: Condvar::new(),
49            space: Condvar::new(),
50            capacity: READER_QUEUE_CAPACITY,
51            cancelled: Arc::new(AtomicBool::new(false)),
52        })
53    }
54
55    fn push(&self, chunk: Vec<u8>) -> bool {
56        let mut state = self.state.lock().expect("io source queue poisoned");
57        while state.queue.len() >= self.capacity
58            && state.terminal.is_none()
59            && !self.cancelled.load(Ordering::SeqCst)
60        {
61            state = self
62                .space
63                .wait(state)
64                .expect("io source queue poisoned while waiting for space");
65        }
66
67        if state.terminal.is_some() || self.cancelled.load(Ordering::SeqCst) {
68            return false;
69        }
70
71        if state.terminal.is_none() {
72            state.queue.push_back(chunk);
73        }
74        drop(state);
75        self.available.notify_all();
76        true
77    }
78
79    fn finish(&self, terminal: SourceTerminal) {
80        let mut state = self.state.lock().expect("io source queue poisoned");
81        if state.terminal.is_none() {
82            state.terminal = Some(terminal);
83        }
84        drop(state);
85        self.available.notify_all();
86        self.space.notify_all();
87    }
88}
89
90struct ReaderWorkerGuard {
91    queue: Arc<SourceQueue>,
92    armed: bool,
93}
94
95impl ReaderWorkerGuard {
96    fn new(queue: Arc<SourceQueue>) -> Self {
97        Self { queue, armed: true }
98    }
99
100    fn disarm(&mut self) {
101        self.armed = false;
102    }
103}
104
105impl Drop for ReaderWorkerGuard {
106    fn drop(&mut self) {
107        if self.armed {
108            self.queue
109                .finish(SourceTerminal::Error(StreamError::AbruptTermination));
110        }
111    }
112}
113
114struct ReaderSourceStream {
115    queue: Arc<SourceQueue>,
116    completion: Option<StreamCompletion<NotUsed>>,
117}
118
119impl Iterator for ReaderSourceStream {
120    type Item = StreamResult<Vec<u8>>;
121
122    fn next(&mut self) -> Option<Self::Item> {
123        let mut state = self.queue.state.lock().expect("io source queue poisoned");
124        loop {
125            if let Some(chunk) = state.queue.pop_front() {
126                self.queue.space.notify_all();
127                return Some(Ok(chunk));
128            }
129            if let Some(terminal) = state.terminal.clone() {
130                return match terminal {
131                    SourceTerminal::Complete => None,
132                    SourceTerminal::Error(error) => Some(Err(error)),
133                };
134            }
135            state = self
136                .queue
137                .available
138                .wait(state)
139                .expect("io source queue poisoned while waiting");
140        }
141    }
142}
143
144impl Drop for ReaderSourceStream {
145    fn drop(&mut self) {
146        self.queue.cancelled.store(true, Ordering::SeqCst);
147        // Take and release the state lock before notifying: a producer that
148        // already read `cancelled == false` under the lock is then guaranteed
149        // to be parked inside `space.wait` before the notification fires.
150        // Without this the wake-up can land in that gap and be lost, parking
151        // the reader worker forever (no later notifier exists once the
152        // consumer is gone). `unwrap_or_else` instead of `expect`: panicking
153        // in Drop during unwind would abort.
154        drop(self.queue.state.lock().unwrap_or_else(|p| p.into_inner()));
155        self.queue.available.notify_all();
156        self.queue.space.notify_all();
157        let _ = self.completion.take();
158    }
159}
160
161struct WriterGuard<W: Write> {
162    writer: W,
163    flushed: bool,
164}
165
166impl<W: Write> WriterGuard<W> {
167    fn new(writer: W) -> Self {
168        Self {
169            writer,
170            flushed: false,
171        }
172    }
173
174    fn writer_mut(&mut self) -> &mut W {
175        &mut self.writer
176    }
177
178    fn flush_once(&mut self) -> StreamResult<()> {
179        if self.flushed {
180            return Ok(());
181        }
182        self.writer.flush().map_err(io_error)?;
183        self.flushed = true;
184        Ok(())
185    }
186}
187
188impl<W: Write> Drop for WriterGuard<W> {
189    fn drop(&mut self) {
190        let _ = self.flush_once();
191    }
192}
193
194// ── as_input_stream ──────────────────────────────────────────────────────────
195
196#[derive(Clone)]
197enum InputStreamTerminal {
198    Complete,
199    Error(StreamError),
200}
201
202struct InputStreamBufferState {
203    chunks: VecDeque<Vec<u8>>,
204    terminal: Option<InputStreamTerminal>,
205}
206
207struct InputStreamShared {
208    state: Mutex<InputStreamBufferState>,
209    available: Condvar,
210    space: Condvar,
211    cancelled: AtomicBool,
212}
213
214impl InputStreamShared {
215    fn new() -> Self {
216        Self {
217            state: Mutex::new(InputStreamBufferState {
218                chunks: VecDeque::new(),
219                terminal: None,
220            }),
221            available: Condvar::new(),
222            space: Condvar::new(),
223            cancelled: AtomicBool::new(false),
224        }
225    }
226
227    fn set_terminal(&self, terminal: InputStreamTerminal) {
228        let mut state = self.state.lock().expect("input stream buffer poisoned");
229        if state.terminal.is_none() {
230            state.terminal = Some(terminal);
231        }
232        drop(state);
233        self.available.notify_all();
234        self.space.notify_all();
235    }
236}
237
238/// A blocking `std::io::Read` handle materialized by [`StreamConverters::as_input_stream`].
239///
240/// This handle bridges a Datum byte stream (`Sink<Vec<u8>>`) into synchronous, blocking
241/// read calls. It matches Akka's `InputStream` semantics: partial reads are supported,
242/// EOF is `Ok(0)` when the stream completes, stream errors surface as `io::Error`, and
243/// the `read_timeout` bounds how long `read()` waits for new data.
244pub struct InputStreamHandle {
245    shared: Arc<InputStreamShared>,
246    detached: Vec<u8>,
247    detached_offset: usize,
248    read_timeout: Duration,
249    stream_closed: bool,
250    _completion: StreamCompletion<NotUsed>,
251}
252
253impl InputStreamHandle {
254    fn new(
255        shared: Arc<InputStreamShared>,
256        read_timeout: Duration,
257        completion: StreamCompletion<NotUsed>,
258    ) -> Self {
259        Self {
260            shared,
261            detached: Vec::new(),
262            detached_offset: 0,
263            read_timeout,
264            stream_closed: false,
265            _completion: completion,
266        }
267    }
268}
269
270impl Read for InputStreamHandle {
271    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
272        if self.stream_closed {
273            return Err(io::Error::other(
274                "stream is terminated, no reads are possible",
275            ));
276        }
277        if buf.is_empty() {
278            return Ok(0);
279        }
280
281        let mut total = 0_usize;
282
283        // Serve from detached chunk first
284        if self.detached_offset < self.detached.len() {
285            let available = self.detached.len() - self.detached_offset;
286            let n = available.min(buf.len());
287            buf[..n]
288                .copy_from_slice(&self.detached[self.detached_offset..self.detached_offset + n]);
289            self.detached_offset += n;
290            total += n;
291            if self.detached_offset >= self.detached.len() {
292                self.detached.clear();
293                self.detached_offset = 0;
294            }
295            if total == buf.len() {
296                return Ok(total);
297            }
298        }
299
300        let mut state = self
301            .shared
302            .state
303            .lock()
304            .expect("input stream buffer poisoned");
305        loop {
306            // Drain as many chunks as possible without blocking
307            while total < buf.len() {
308                if let Some(chunk) = state.chunks.pop_front() {
309                    self.shared.space.notify_all();
310                    drop(state);
311
312                    let space = buf.len() - total;
313                    let n = chunk.len().min(space);
314                    buf[total..total + n].copy_from_slice(&chunk[..n]);
315                    total += n;
316                    if n < chunk.len() {
317                        self.detached = chunk;
318                        self.detached_offset = n;
319                    }
320
321                    // Re-acquire lock for next iteration
322                    state = self
323                        .shared
324                        .state
325                        .lock()
326                        .expect("input stream buffer poisoned");
327                    continue;
328                }
329                break;
330            }
331
332            // If we have data, return it
333            if total > 0 {
334                return Ok(total);
335            }
336
337            // Check terminal
338            if let Some(terminal) = state.terminal.clone() {
339                return match terminal {
340                    InputStreamTerminal::Complete => Ok(0),
341                    InputStreamTerminal::Error(e) => {
342                        Err(io::Error::other(format!("stream failed: {e}")))
343                    }
344                };
345            }
346
347            // Wait for new data
348            let (new_state, timeout) = self
349                .shared
350                .available
351                .wait_timeout(state, self.read_timeout)
352                .expect("input stream buffer poisoned while waiting");
353            state = new_state;
354            if timeout.timed_out() && state.chunks.is_empty() && state.terminal.is_none() {
355                return Err(io::Error::new(
356                    io::ErrorKind::TimedOut,
357                    format!("timeout after {:?} waiting for new data", self.read_timeout),
358                ));
359            }
360        }
361    }
362}
363
364impl Drop for InputStreamHandle {
365    fn drop(&mut self) {
366        self.shared.cancelled.store(true, Ordering::SeqCst);
367        drop(self.shared.state.lock().unwrap_or_else(|p| p.into_inner()));
368        self.shared.available.notify_all();
369        self.shared.space.notify_all();
370    }
371}
372
373// ── as_output_stream ─────────────────────────────────────────────────────────
374
375#[derive(Clone)]
376#[allow(dead_code)]
377enum OutputStreamTerminal {
378    Complete,
379    Error(StreamError),
380}
381
382struct OutputStreamBufferState {
383    chunks: VecDeque<Vec<u8>>,
384    terminal: Option<OutputStreamTerminal>,
385}
386
387struct OutputStreamShared {
388    state: Mutex<OutputStreamBufferState>,
389    available: Condvar,
390    space: Condvar,
391    cancelled: AtomicBool,
392}
393
394impl OutputStreamShared {
395    fn new() -> Self {
396        Self {
397            state: Mutex::new(OutputStreamBufferState {
398                chunks: VecDeque::new(),
399                terminal: None,
400            }),
401            available: Condvar::new(),
402            space: Condvar::new(),
403            cancelled: AtomicBool::new(false),
404        }
405    }
406}
407
408struct OutputStreamSourceStream {
409    shared: Arc<OutputStreamShared>,
410    done: bool,
411}
412
413impl Iterator for OutputStreamSourceStream {
414    type Item = StreamResult<Vec<u8>>;
415
416    fn next(&mut self) -> Option<Self::Item> {
417        if self.done {
418            return None;
419        }
420
421        let mut state = self
422            .shared
423            .state
424            .lock()
425            .expect("output stream buffer poisoned");
426        loop {
427            if let Some(chunk) = state.chunks.pop_front() {
428                self.shared.space.notify_all();
429                return Some(Ok(chunk));
430            }
431
432            match &state.terminal {
433                Some(OutputStreamTerminal::Complete) => {
434                    self.done = true;
435                    return None;
436                }
437                // Kept for symmetry with the input side.
438                Some(OutputStreamTerminal::Error(e)) => {
439                    self.done = true;
440                    return Some(Err(e.clone()));
441                }
442                None => {}
443            }
444
445            state = self
446                .shared
447                .available
448                .wait(state)
449                .expect("output stream buffer poisoned while waiting");
450        }
451    }
452}
453
454impl Drop for OutputStreamSourceStream {
455    fn drop(&mut self) {
456        self.shared.cancelled.store(true, Ordering::SeqCst);
457        drop(self.shared.state.lock().unwrap_or_else(|p| p.into_inner()));
458        self.shared.available.notify_all();
459        self.shared.space.notify_all();
460    }
461}
462
463/// A blocking `std::io::Write` handle materialized by [`StreamConverters::as_output_stream`].
464///
465/// This handle bridges a `std::io::Write` into a Datum byte stream (`Source<Vec<u8>>`).
466/// It matches Akka's `OutputStream` semantics: each `write()` call produces one stream
467/// element; backpressure blocks the writer when the stream is not ready; `write_timeout`
468/// bounds how long writes block; `flush()` is a no-op; `close()` completes the stream.
469pub struct OutputStreamHandle {
470    shared: Arc<OutputStreamShared>,
471    write_timeout: Duration,
472    closed: AtomicBool,
473}
474
475impl OutputStreamHandle {
476    fn new(shared: Arc<OutputStreamShared>, write_timeout: Duration) -> Self {
477        Self {
478            shared,
479            write_timeout,
480            closed: AtomicBool::new(false),
481        }
482    }
483
484    /// Completes the stream, signalling EOF to downstream consumers.
485    ///
486    /// After `close()`, subsequent writes return `ErrorKind::BrokenPipe`. Calling
487    /// `close()` more than once is safe — the stream completes at most once.
488    pub fn close(&self) -> io::Result<()> {
489        self.closed.store(true, Ordering::SeqCst);
490        let mut state = self
491            .shared
492            .state
493            .lock()
494            .expect("output stream buffer poisoned");
495        if state.terminal.is_none() {
496            state.terminal = Some(OutputStreamTerminal::Complete);
497        }
498        drop(state);
499        self.shared.available.notify_all();
500        self.shared.space.notify_all();
501        Ok(())
502    }
503}
504
505impl Write for OutputStreamHandle {
506    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
507        if self.closed.load(Ordering::SeqCst) {
508            return Err(io::Error::new(
509                io::ErrorKind::BrokenPipe,
510                "stream is closed, no writes are possible",
511            ));
512        }
513        if buf.is_empty() {
514            return Ok(0);
515        }
516
517        let mut state = self
518            .shared
519            .state
520            .lock()
521            .expect("output stream buffer poisoned");
522        loop {
523            if self.closed.load(Ordering::SeqCst) || self.shared.cancelled.load(Ordering::SeqCst) {
524                return Err(io::Error::new(
525                    io::ErrorKind::BrokenPipe,
526                    "stream is closed, no writes are possible",
527                ));
528            }
529
530            if let Some(OutputStreamTerminal::Error(e)) = &state.terminal {
531                return Err(io::Error::other(format!("stream failed: {e}")));
532            }
533
534            if state.chunks.len() < OUTPUT_STREAM_BUFFER_CAPACITY {
535                state.chunks.push_back(buf.to_vec());
536                drop(state);
537                self.shared.available.notify_all();
538                return Ok(buf.len());
539            }
540
541            let (new_state, timeout) = self
542                .shared
543                .space
544                .wait_timeout(state, self.write_timeout)
545                .expect("output stream buffer poisoned while waiting");
546            state = new_state;
547            if timeout.timed_out()
548                && state.chunks.len() >= OUTPUT_STREAM_BUFFER_CAPACITY
549                && state.terminal.is_none()
550            {
551                return Err(io::Error::new(
552                    io::ErrorKind::TimedOut,
553                    format!(
554                        "timed out trying to write data to stream after {:?}",
555                        self.write_timeout
556                    ),
557                ));
558            }
559        }
560    }
561
562    fn flush(&mut self) -> io::Result<()> {
563        Ok(())
564    }
565}
566
567impl Drop for OutputStreamHandle {
568    fn drop(&mut self) {
569        self.shared.cancelled.store(true, Ordering::SeqCst);
570        let _ = self.close();
571    }
572}
573
574/// Bridges between `std::io::Read`/`Write` and Datum byte streams. Mirrors Akka's
575/// `StreamConverters`.
576///
577/// `from_reader`/`to_writer` adapt blocking readers/writers into a `Source`/`Sink`;
578/// `as_input_stream`/`as_output_stream` go the other way, materializing a blocking
579/// [`InputStreamHandle`]/[`OutputStreamHandle`] over a stream. All factory closures run at
580/// materialization, never at blueprint construction.
581pub struct StreamConverters;
582
583impl StreamConverters {
584    /// Wraps any blocking `std::io::Read` as a `Source<Vec<u8>>` emitting up to `chunk_size` bytes
585    /// per element.
586    ///
587    /// `factory` is invoked at materialization and the reader is driven on the Datum thread pool,
588    /// with a small bounded reader queue so a slow consumer backpressures the reader instead of
589    /// buffering unboundedly. Read errors fail the stream. Panics if `chunk_size == 0`.
590    #[must_use]
591    pub fn from_reader<F, R>(factory: F, chunk_size: usize) -> Source<Vec<u8>>
592    where
593        F: Fn() -> std::io::Result<R> + Send + Sync + 'static,
594        R: Read + Send + 'static,
595    {
596        assert!(chunk_size > 0, "chunk size must be greater than zero");
597        Source::from_materialized_factory(move |materializer| {
598            let reader = factory().map_err(io_error)?;
599            let queue = SourceQueue::new();
600            let queue_for_worker = Arc::clone(&queue);
601            let cancelled = Arc::clone(&queue.cancelled);
602            let completion = materializer.spawn_stream(move |_worker_cancelled| {
603                let mut reader = reader;
604                let mut guard = ReaderWorkerGuard::new(Arc::clone(&queue_for_worker));
605                let mut buffer = vec![0_u8; chunk_size];
606
607                loop {
608                    if cancelled.load(Ordering::SeqCst) {
609                        guard.disarm();
610                        return Ok(NotUsed);
611                    }
612
613                    match reader.read(&mut buffer) {
614                        Ok(0) => {
615                            guard.disarm();
616                            queue_for_worker.finish(SourceTerminal::Complete);
617                            return Ok(NotUsed);
618                        }
619                        Ok(read) => {
620                            if !queue_for_worker.push(buffer[..read].to_vec()) {
621                                guard.disarm();
622                                return Ok(NotUsed);
623                            }
624                        }
625                        Err(error) => {
626                            guard.disarm();
627                            queue_for_worker.finish(SourceTerminal::Error(io_error(error)));
628                            return Ok(NotUsed);
629                        }
630                    }
631                }
632            });
633
634            Ok((
635                Box::new(ReaderSourceStream {
636                    queue,
637                    completion: Some(completion),
638                }) as BoxStream<Vec<u8>>,
639                NotUsed,
640            ))
641        })
642    }
643
644    /// Wraps any blocking `std::io::Write` as a `Sink<Vec<u8>>`, writing each upstream chunk in
645    /// order.
646    ///
647    /// `factory` runs at materialization. The writer is flushed exactly once — on completion or on
648    /// error — and dropped afterwards; upstream errors are forwarded to the materialized
649    /// [`StreamCompletion`].
650    #[must_use]
651    pub fn to_writer<F, W>(factory: F) -> Sink<Vec<u8>, StreamCompletion<NotUsed>>
652    where
653        F: Fn() -> std::io::Result<W> + Send + Sync + 'static,
654        W: Write + Send + 'static,
655    {
656        Sink::from_runner(move |input: BoxStream<Vec<u8>>, materializer| {
657            let writer = WriterGuard::new(factory().map_err(io_error)?);
658            Ok(materializer.spawn_stream(move |cancelled| {
659                let mut input = input;
660                let mut writer = writer;
661                loop {
662                    if cancelled.load(Ordering::SeqCst) {
663                        let _ = writer.flush_once();
664                        return Err(StreamError::Cancelled);
665                    }
666
667                    match input.next() {
668                        Some(Ok(chunk)) => {
669                            writer.writer_mut().write_all(&chunk).map_err(io_error)?
670                        }
671                        Some(Err(error)) => {
672                            let _ = writer.flush_once();
673                            return Err(error);
674                        }
675                        None => {
676                            writer.flush_once()?;
677                            return Ok(NotUsed);
678                        }
679                    }
680                }
681            }))
682        })
683    }
684
685    /// Creates a `Sink` which when materialized returns an [`InputStreamHandle`] that can
686    /// be used to read the values produced by the stream this sink is attached to.
687    ///
688    /// This sink bridges Datum streams to synchronous blocking code. The materialized
689    /// handle implements `std::io::Read`: each `read()` call blocks the caller (up to
690    /// `read_timeout`) until upstream produces a chunk, then copies as many bytes as
691    /// possible into the provided buffer. Partial reads are supported — the handle
692    /// retains any leftover bytes from a previous chunk and serves them on the next call.
693    ///
694    /// EOF is signalled by `read()` returning `Ok(0)` when the stream completes. Stream
695    /// errors surface as `io::Error`. Dropping the handle cancels the stream.
696    ///
697    /// Matches Akka `StreamConverters.asInputStream` semantics.
698    #[must_use]
699    pub fn as_input_stream(read_timeout: Duration) -> Sink<Vec<u8>, InputStreamHandle> {
700        Sink::from_runner(move |input: BoxStream<Vec<u8>>, materializer| {
701            let shared = Arc::new(InputStreamShared::new());
702            let shared_for_worker = Arc::clone(&shared);
703
704            let completion = materializer.spawn_stream(move |_task_cancelled| {
705                let mut input = input;
706                loop {
707                    if shared_for_worker.cancelled.load(Ordering::SeqCst) {
708                        return Ok(NotUsed);
709                    }
710
711                    match input.next() {
712                        Some(Ok(chunk)) => {
713                            let mut state = shared_for_worker
714                                .state
715                                .lock()
716                                .expect("input stream buffer poisoned");
717                            while state.chunks.len() >= INPUT_STREAM_BUFFER_CAPACITY
718                                && state.terminal.is_none()
719                                && !shared_for_worker.cancelled.load(Ordering::SeqCst)
720                            {
721                                state = shared_for_worker
722                                    .space
723                                    .wait(state)
724                                    .expect("input stream buffer poisoned while waiting");
725                            }
726
727                            if state.terminal.is_some()
728                                || shared_for_worker.cancelled.load(Ordering::SeqCst)
729                            {
730                                return Ok(NotUsed);
731                            }
732
733                            if !chunk.is_empty() {
734                                state.chunks.push_back(chunk);
735                            }
736                            drop(state);
737                            shared_for_worker.available.notify_all();
738                        }
739                        Some(Err(e)) => {
740                            shared_for_worker.set_terminal(InputStreamTerminal::Error(e));
741                            return Ok(NotUsed);
742                        }
743                        None => {
744                            shared_for_worker.set_terminal(InputStreamTerminal::Complete);
745                            return Ok(NotUsed);
746                        }
747                    }
748                }
749            });
750
751            Ok(InputStreamHandle::new(shared, read_timeout, completion))
752        })
753    }
754
755    /// Creates a `Source` which when materialized returns an [`OutputStreamHandle`] that
756    /// can be used to write bytes into the stream.
757    ///
758    /// This source is intended for inter-operation with blocking APIs. The materialized
759    /// handle implements `std::io::Write`: each `write()` call blocks (up to
760    /// `write_timeout`) until the stream has capacity, then produces one stream element
761    /// from the written bytes. Backpressure is respected — writes block when the internal
762    /// buffer is full.
763    ///
764    /// `flush()` is a no-op. [`OutputStreamHandle::close()`] completes the stream (signals
765    /// EOF to downstream). Dropping the handle cancels the stream.
766    ///
767    /// Matches Akka `StreamConverters.asOutputStream` semantics.
768    #[must_use]
769    pub fn as_output_stream(write_timeout: Duration) -> Source<Vec<u8>, OutputStreamHandle> {
770        Source::from_materialized_factory(move |_materializer| {
771            let shared = Arc::new(OutputStreamShared::new());
772            let handle = OutputStreamHandle::new(Arc::clone(&shared), write_timeout);
773            let stream = OutputStreamSourceStream {
774                shared,
775                done: false,
776            };
777            Ok((Box::new(stream) as BoxStream<Vec<u8>>, handle))
778        })
779    }
780}
781
782/// Synchronous, low-overhead file source and sink, backed by [`StreamConverters`] over
783/// `std::fs::File`. Mirrors Akka's `FileIO`; see [`super::TokioFileIO`] for the async variant.
784pub struct FileIO;
785
786impl FileIO {
787    /// Reads `path` into `Vec<u8>` chunks of at most `chunk_size` bytes. The file is opened at
788    /// materialization; an open failure fails the stream.
789    #[must_use]
790    pub fn from_path(path: impl Into<PathBuf>, chunk_size: usize) -> Source<Vec<u8>> {
791        let path = path.into();
792        StreamConverters::from_reader(move || File::open(&path), chunk_size)
793    }
794
795    /// [`FileIO::from_path`] with the default 8 KiB chunk size.
796    #[must_use]
797    pub fn from_path_default(path: impl Into<PathBuf>) -> Source<Vec<u8>> {
798        Self::from_path(path, DEFAULT_CHUNK_SIZE)
799    }
800
801    /// Writes the byte stream to `path`, creating the file if absent and truncating it otherwise.
802    /// The materialized [`StreamCompletion`] resolves once the file is flushed and closed.
803    #[must_use]
804    pub fn to_path(path: impl Into<PathBuf>) -> Sink<Vec<u8>, StreamCompletion<NotUsed>> {
805        let path = path.into();
806        StreamConverters::to_writer(move || {
807            OpenOptions::new()
808                .create(true)
809                .truncate(true)
810                .write(true)
811                .open(&path)
812        })
813    }
814}
815
816#[cfg(test)]
817mod tests {
818    use super::*;
819    use crate::Source;
820    use crate::testkit::TestSink;
821    use std::io::Cursor;
822    use std::sync::atomic::{AtomicU64, AtomicUsize};
823    use std::thread;
824    use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
825
826    fn wait_for_counter(counter: &AtomicUsize, expected: usize) {
827        let deadline = std::time::Instant::now() + Duration::from_secs(1);
828        while std::time::Instant::now() < deadline {
829            if counter.load(Ordering::SeqCst) == expected {
830                return;
831            }
832            thread::sleep(Duration::from_millis(5));
833        }
834        assert_eq!(counter.load(Ordering::SeqCst), expected);
835    }
836
837    fn wait_until(timeout: Duration, mut condition: impl FnMut() -> bool) -> bool {
838        let deadline = Instant::now() + timeout;
839        while Instant::now() < deadline {
840            if condition() {
841                return true;
842            }
843            thread::sleep(Duration::from_millis(5));
844        }
845        condition()
846    }
847
848    fn unique_temp_path(name: &str) -> PathBuf {
849        let nanos = SystemTime::now()
850            .duration_since(UNIX_EPOCH)
851            .expect("clock after epoch")
852            .as_nanos();
853        std::env::temp_dir().join(format!(
854            "datum-wp12-{name}-{}-{nanos}.bin",
855            std::process::id()
856        ))
857    }
858
859    struct CountingReader {
860        inner: Cursor<Vec<u8>>,
861        drops: Arc<AtomicUsize>,
862    }
863
864    impl Read for CountingReader {
865        fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
866            self.inner.read(buf)
867        }
868    }
869
870    impl Drop for CountingReader {
871        fn drop(&mut self) {
872            self.drops.fetch_add(1, Ordering::SeqCst);
873        }
874    }
875
876    struct CountingWriter {
877        writes: Arc<Mutex<Vec<Vec<u8>>>>,
878        flushes: Arc<AtomicUsize>,
879        drops: Arc<AtomicUsize>,
880        fail_write: bool,
881    }
882
883    impl Write for CountingWriter {
884        fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
885            if self.fail_write {
886                return Err(std::io::Error::other("writer boom"));
887            }
888            self.writes
889                .lock()
890                .expect("writer log poisoned")
891                .push(buf.to_vec());
892            Ok(buf.len())
893        }
894
895        fn flush(&mut self) -> std::io::Result<()> {
896            self.flushes.fetch_add(1, Ordering::SeqCst);
897            Ok(())
898        }
899    }
900
901    impl Drop for CountingWriter {
902        fn drop(&mut self) {
903            self.drops.fetch_add(1, Ordering::SeqCst);
904        }
905    }
906
907    struct CountingChunkReader {
908        inner: Cursor<Vec<u8>>,
909        chunk_size: usize,
910        reads: Arc<AtomicUsize>,
911    }
912
913    impl Read for CountingChunkReader {
914        fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
915            self.reads.fetch_add(1, Ordering::SeqCst);
916            let mut chunk = vec![0_u8; self.chunk_size.min(buf.len())];
917            let read = self.inner.read(&mut chunk)?;
918            buf[..read].copy_from_slice(&chunk[..read]);
919            Ok(read)
920        }
921    }
922
923    #[test]
924    fn from_reader_emits_chunked_bytes_and_completes() {
925        let sink = StreamConverters::from_reader(|| Ok(Cursor::new(b"abcdef".to_vec())), 2)
926            .run_with(TestSink::probe())
927            .expect("reader source materializes");
928
929        sink.request(4);
930        sink.assert_next_n([b"ab".to_vec(), b"cd".to_vec(), b"ef".to_vec()]);
931        sink.expect_complete();
932    }
933
934    #[test]
935    fn from_reader_closes_exactly_once_on_completion() {
936        let drops = Arc::new(AtomicUsize::new(0));
937        let drops_for_reader = Arc::clone(&drops);
938        let sink = StreamConverters::from_reader(
939            move || {
940                Ok(CountingReader {
941                    inner: Cursor::new(b"hello".to_vec()),
942                    drops: Arc::clone(&drops_for_reader),
943                })
944            },
945            8,
946        )
947        .run_with(TestSink::probe())
948        .expect("reader source materializes");
949
950        sink.request(2);
951        sink.assert_next(b"hello".to_vec());
952        sink.expect_complete();
953        wait_for_counter(&drops, 1);
954    }
955
956    #[test]
957    fn from_reader_closes_exactly_once_on_cancellation() {
958        let drops = Arc::new(AtomicUsize::new(0));
959        let drops_for_reader = Arc::clone(&drops);
960        let mut sink = StreamConverters::from_reader(
961            move || {
962                Ok(CountingReader {
963                    inner: Cursor::new(vec![1_u8; 32]),
964                    drops: Arc::clone(&drops_for_reader),
965                })
966            },
967            4,
968        )
969        .run_with(TestSink::probe())
970        .expect("reader source materializes");
971
972        sink.request(1);
973        sink.assert_next(vec![1_u8; 4]);
974        sink.cancel();
975        wait_for_counter(&drops, 1);
976    }
977
978    #[test]
979    fn from_reader_surfaces_read_failure() {
980        struct FailingReader;
981
982        impl Read for FailingReader {
983            fn read(&mut self, _buf: &mut [u8]) -> std::io::Result<usize> {
984                Err(std::io::Error::other("reader boom"))
985            }
986        }
987
988        let sink = StreamConverters::from_reader(|| Ok(FailingReader), 8)
989            .run_with(TestSink::probe())
990            .expect("reader source materializes");
991
992        sink.request(1);
993        assert_eq!(
994            sink.expect_error(),
995            StreamError::Failed("reader boom".to_owned())
996        );
997    }
998
999    #[test]
1000    fn from_reader_caps_buffered_read_ahead_for_slow_consumers() {
1001        let payload: Vec<u8> = (0..64_u8).cycle().take(8192 * 16).collect();
1002        let payload_for_reader = payload.clone();
1003        let reads = Arc::new(AtomicUsize::new(0));
1004        let reads_for_reader = Arc::clone(&reads);
1005        let sink = StreamConverters::from_reader(
1006            move || {
1007                Ok(CountingChunkReader {
1008                    inner: Cursor::new(payload_for_reader.clone()),
1009                    chunk_size: 256,
1010                    reads: Arc::clone(&reads_for_reader),
1011                })
1012            },
1013            256,
1014        )
1015        .run_with(TestSink::probe())
1016        .expect("reader source materializes");
1017
1018        sink.request(1);
1019        let first = sink.expect_next();
1020        assert_eq!(first.len(), 256);
1021
1022        let last_seen = Arc::new(AtomicUsize::new(0));
1023        let quiet_since_ms = Arc::new(AtomicU64::new(0));
1024        let start = Instant::now();
1025        assert!(wait_until(Duration::from_secs(2), {
1026            let last_seen = Arc::clone(&last_seen);
1027            let quiet_since_ms = Arc::clone(&quiet_since_ms);
1028            let reads = Arc::clone(&reads);
1029            move || {
1030                let current = reads.load(Ordering::SeqCst);
1031                let last = last_seen.load(Ordering::SeqCst);
1032                if current != last {
1033                    last_seen.store(current, Ordering::SeqCst);
1034                    quiet_since_ms.store(start.elapsed().as_millis() as u64, Ordering::SeqCst);
1035                    return false;
1036                }
1037
1038                let quiet_for =
1039                    start.elapsed().as_millis() as u64 - quiet_since_ms.load(Ordering::SeqCst);
1040                current > 0 && quiet_for >= 100
1041            }
1042        }));
1043
1044        assert!(
1045            reads.load(Ordering::SeqCst) <= READER_QUEUE_CAPACITY + 2,
1046            "reader should plateau near the bounded queue capacity"
1047        );
1048
1049        sink.request(usize::MAX);
1050        let mut collected = first;
1051        for chunk in sink.drain_until_complete() {
1052            collected.extend_from_slice(&chunk);
1053        }
1054        assert_eq!(collected, payload);
1055    }
1056
1057    #[test]
1058    fn to_writer_writes_all_chunks_and_flushes_once() {
1059        let writes = Arc::new(Mutex::new(Vec::new()));
1060        let flushes = Arc::new(AtomicUsize::new(0));
1061        let drops = Arc::new(AtomicUsize::new(0));
1062        let completion = Source::from_iter([b"ab".to_vec(), b"cd".to_vec()])
1063            .run_with(StreamConverters::to_writer({
1064                let writes = Arc::clone(&writes);
1065                let flushes = Arc::clone(&flushes);
1066                let drops = Arc::clone(&drops);
1067                move || {
1068                    Ok(CountingWriter {
1069                        writes: Arc::clone(&writes),
1070                        flushes: Arc::clone(&flushes),
1071                        drops: Arc::clone(&drops),
1072                        fail_write: false,
1073                    })
1074                }
1075            }))
1076            .expect("writer sink materializes");
1077
1078        completion.wait().expect("writer sink completes");
1079        assert_eq!(
1080            writes.lock().expect("writes poisoned").as_slice(),
1081            &[b"ab".to_vec(), b"cd".to_vec()]
1082        );
1083        assert_eq!(flushes.load(Ordering::SeqCst), 1);
1084        assert_eq!(drops.load(Ordering::SeqCst), 1);
1085    }
1086
1087    #[test]
1088    fn to_writer_flushes_and_drops_once_on_failure() {
1089        let flushes = Arc::new(AtomicUsize::new(0));
1090        let drops = Arc::new(AtomicUsize::new(0));
1091        let completion = Source::<Vec<u8>>::failed(StreamError::Failed("upstream boom".to_owned()))
1092            .run_with(StreamConverters::to_writer({
1093                let flushes = Arc::clone(&flushes);
1094                let drops = Arc::clone(&drops);
1095                move || {
1096                    Ok(CountingWriter {
1097                        writes: Arc::new(Mutex::new(Vec::new())),
1098                        flushes: Arc::clone(&flushes),
1099                        drops: Arc::clone(&drops),
1100                        fail_write: false,
1101                    })
1102                }
1103            }))
1104            .expect("writer sink materializes");
1105
1106        assert_eq!(
1107            completion.wait(),
1108            Err(StreamError::Failed("upstream boom".to_owned()))
1109        );
1110        assert_eq!(flushes.load(Ordering::SeqCst), 1);
1111        assert_eq!(drops.load(Ordering::SeqCst), 1);
1112    }
1113
1114    #[test]
1115    fn to_writer_flushes_and_drops_once_on_cancellation() {
1116        let flushes = Arc::new(AtomicUsize::new(0));
1117        let drops = Arc::new(AtomicUsize::new(0));
1118        let completion = Source::repeat(vec![7_u8; 4])
1119            .run_with(StreamConverters::to_writer({
1120                let flushes = Arc::clone(&flushes);
1121                let drops = Arc::clone(&drops);
1122                move || {
1123                    Ok(CountingWriter {
1124                        writes: Arc::new(Mutex::new(Vec::new())),
1125                        flushes: Arc::clone(&flushes),
1126                        drops: Arc::clone(&drops),
1127                        fail_write: false,
1128                    })
1129                }
1130            }))
1131            .expect("writer sink materializes");
1132
1133        drop(completion);
1134        wait_for_counter(&flushes, 1);
1135        wait_for_counter(&drops, 1);
1136    }
1137
1138    #[test]
1139    fn to_writer_surfaces_write_failure() {
1140        let completion = Source::single(vec![1_u8])
1141            .run_with(StreamConverters::to_writer(|| {
1142                Ok(CountingWriter {
1143                    writes: Arc::new(Mutex::new(Vec::new())),
1144                    flushes: Arc::new(AtomicUsize::new(0)),
1145                    drops: Arc::new(AtomicUsize::new(0)),
1146                    fail_write: true,
1147                })
1148            }))
1149            .expect("writer sink materializes");
1150
1151        assert_eq!(
1152            completion.wait(),
1153            Err(StreamError::Failed("writer boom".to_owned()))
1154        );
1155    }
1156
1157    #[test]
1158    fn file_io_round_trips_bytes() {
1159        let path = unique_temp_path("roundtrip");
1160        let write_completion = Source::from_iter([b"ab".to_vec(), b"cd".to_vec()])
1161            .run_with(FileIO::to_path(path.clone()))
1162            .expect("file sink materializes");
1163        write_completion.wait().expect("file write completes");
1164
1165        let sink = FileIO::from_path(path.clone(), 2)
1166            .run_with(TestSink::probe())
1167            .expect("file source materializes");
1168        sink.request(4);
1169        sink.assert_next_n([b"ab".to_vec(), b"cd".to_vec()]);
1170        sink.expect_complete();
1171
1172        std::fs::remove_file(path).expect("remove roundtrip file");
1173    }
1174
1175    #[test]
1176    fn file_io_source_surfaces_open_failure() {
1177        let missing = unique_temp_path("missing");
1178        let result = FileIO::from_path(missing, 4).run_with(TestSink::probe());
1179        assert!(matches!(result, Err(StreamError::Failed(_))));
1180    }
1181
1182    #[test]
1183    fn file_io_sink_creates_and_truncates_file() {
1184        let path = unique_temp_path("truncate");
1185        std::fs::write(&path, b"stale bytes").expect("seed file");
1186
1187        let completion = Source::single(b"ok".to_vec())
1188            .run_with(FileIO::to_path(path.clone()))
1189            .expect("file sink materializes");
1190        completion.wait().expect("file write completes");
1191        assert_eq!(std::fs::read(&path).expect("read file"), b"ok");
1192
1193        std::fs::remove_file(path).expect("remove truncate file");
1194    }
1195
1196    #[test]
1197    fn file_io_source_default_chunk_size_works() {
1198        let path = unique_temp_path("default");
1199        std::fs::write(&path, b"hi").expect("write seed file");
1200
1201        let sink = FileIO::from_path_default(path.clone())
1202            .run_with(TestSink::probe())
1203            .expect("file source materializes");
1204        sink.request(2);
1205        sink.assert_next(b"hi".to_vec());
1206        sink.expect_complete();
1207
1208        std::fs::remove_file(path).expect("remove default file");
1209    }
1210
1211    // ── as_input_stream / as_output_stream tests ───────────────────────────
1212
1213    #[test]
1214    fn as_input_stream_reads_data_written_by_stream() {
1215        let mut handle: InputStreamHandle =
1216            Source::from_iter([b"hello".to_vec(), b"world".to_vec()])
1217                .run_with(StreamConverters::as_input_stream(Duration::from_secs(5)))
1218                .expect("input stream sink materializes");
1219
1220        // A single read() may return fewer bytes than requested (std::io::Read
1221        // contract); under scheduler load the second chunk may not be buffered
1222        // yet. Drain in a loop until EOF.
1223        let mut buf = [0_u8; 32];
1224        let mut total = 0_usize;
1225        loop {
1226            let n = handle.read(&mut buf[total..]).expect("read succeeds");
1227            if n == 0 {
1228                break;
1229            }
1230            total += n;
1231        }
1232        assert_eq!(&buf[..total], b"helloworld");
1233    }
1234
1235    #[test]
1236    fn as_input_stream_eof_when_stream_completes() {
1237        let mut handle: InputStreamHandle = Source::from_iter([b"x".to_vec()])
1238            .run_with(StreamConverters::as_input_stream(Duration::from_secs(5)))
1239            .expect("input stream sink materializes");
1240
1241        let mut buf = [0_u8; 4];
1242        let n = handle.read(&mut buf).expect("first read succeeds");
1243        assert_eq!(&buf[..n], b"x");
1244
1245        // EOF
1246        let n = handle.read(&mut buf).expect("second read returns eof");
1247        assert_eq!(n, 0);
1248    }
1249
1250    #[test]
1251    fn as_input_stream_partial_reads_work() {
1252        let mut handle: InputStreamHandle = Source::single(b"abcde".to_vec())
1253            .run_with(StreamConverters::as_input_stream(Duration::from_secs(5)))
1254            .expect("input stream sink materializes");
1255
1256        let mut small = [0_u8; 2];
1257        let n = handle.read(&mut small).expect("small read succeeds");
1258        assert_eq!(n, 2);
1259        assert_eq!(&small[..], b"ab");
1260
1261        // remainder from same chunk
1262        let n = handle.read(&mut small).expect("second small read succeeds");
1263        assert_eq!(n, 2);
1264        assert_eq!(&small[..], b"cd");
1265
1266        // final byte
1267        let n = handle.read(&mut small).expect("third small read succeeds");
1268        assert_eq!(n, 1);
1269        assert_eq!(&small[..1], b"e");
1270
1271        // EOF
1272        let n = handle.read(&mut small).expect("fourth read returns eof");
1273        assert_eq!(n, 0);
1274    }
1275
1276    #[test]
1277    fn as_input_stream_error_surfaces_as_io_error() {
1278        let mut handle: InputStreamHandle =
1279            Source::<Vec<u8>>::failed(StreamError::Failed("upstream boom".to_owned()))
1280                .run_with(StreamConverters::as_input_stream(Duration::from_secs(5)))
1281                .expect("input stream sink materializes");
1282
1283        let mut buf = [0_u8; 8];
1284        let err = handle.read(&mut buf).expect_err("read surfaces error");
1285        let msg = err.to_string();
1286        assert!(msg.contains("upstream boom"), "got: {msg}");
1287    }
1288
1289    #[test]
1290    fn as_input_stream_cancellation_stops_reads() {
1291        let mut handle: InputStreamHandle = Source::repeat(b"x".to_vec())
1292            .run_with(StreamConverters::as_input_stream(Duration::from_secs(5)))
1293            .expect("input stream sink materializes");
1294
1295        let mut buf = [0_u8; 3];
1296        let n = handle.read(&mut buf).expect("first read succeeds");
1297        // A single read() may return fewer bytes than requested; under load not
1298        // all repeated chunks are buffered yet. We only need some data to confirm
1299        // reads work before drop cancels the stream.
1300        assert!((1..=3).contains(&n), "expected 1..=3 bytes, got {n}");
1301
1302        drop(handle);
1303
1304        // After drop, the handle is gone — the worker thread was signalled to stop
1305        // and the shared state was cleaned up. No further reads possible.
1306    }
1307
1308    #[test]
1309    fn as_input_stream_read_timeout_returns_timed_out() {
1310        // A source that produces nothing and never completes — read should time out.
1311        let mut handle: InputStreamHandle = Source::<Vec<u8>>::never()
1312            .run_with(StreamConverters::as_input_stream(Duration::from_millis(10)))
1313            .expect("input stream sink materializes");
1314
1315        let mut buf = [0_u8; 4];
1316        let err = handle.read(&mut buf).expect_err("read times out");
1317        assert_eq!(err.kind(), io::ErrorKind::TimedOut);
1318    }
1319
1320    #[test]
1321    fn as_output_stream_writes_data_appear_in_stream() {
1322        let (mut handle, completion) = StreamConverters::as_output_stream(Duration::from_secs(5))
1323            .to_mat(Sink::collect(), crate::Keep::both)
1324            .run()
1325            .expect("output stream source materializes");
1326
1327        handle.write_all(b"alpha").expect("first write succeeds");
1328        handle.write_all(b"beta").expect("second write succeeds");
1329        handle.close().expect("close succeeds");
1330
1331        let chunks = completion.wait().expect("stream completes");
1332        assert_eq!(chunks, vec![b"alpha".to_vec(), b"beta".to_vec()]);
1333    }
1334
1335    #[test]
1336    fn as_output_stream_close_completes_stream() {
1337        let (mut handle, completion) = StreamConverters::as_output_stream(Duration::from_secs(5))
1338            .to_mat(Sink::collect(), crate::Keep::both)
1339            .run()
1340            .expect("output stream source materializes");
1341
1342        handle.write_all(b"done").expect("write succeeds");
1343        let result = handle.close();
1344        assert!(result.is_ok());
1345
1346        let chunks = completion.wait().expect("stream completes after close");
1347        assert_eq!(chunks, vec![b"done".to_vec()]);
1348    }
1349
1350    #[test]
1351    fn as_output_stream_write_after_close_fails() {
1352        let (mut handle, _completion) = StreamConverters::as_output_stream(Duration::from_secs(5))
1353            .to_mat(Sink::ignore(), crate::Keep::both)
1354            .run()
1355            .expect("output stream source materializes");
1356
1357        handle.close().expect("first close succeeds");
1358        let err = handle.write(b"late").expect_err("write after close fails");
1359        assert_eq!(err.kind(), io::ErrorKind::BrokenPipe);
1360    }
1361
1362    #[test]
1363    fn as_output_stream_cancellation_stops_writes() {
1364        let (mut handle, completion) = StreamConverters::as_output_stream(Duration::from_secs(5))
1365            .to_mat(Sink::ignore(), crate::Keep::both)
1366            .run()
1367            .expect("output stream source materializes");
1368
1369        handle.write_all(b"ok").expect("write succeeds");
1370
1371        // Drop the stream consumer side — this cancels the stream
1372        drop(completion);
1373
1374        // Give cancellation time to propagate
1375        let deadline = std::time::Instant::now() + Duration::from_secs(1);
1376        let mut last_err = None;
1377        while std::time::Instant::now() < deadline {
1378            match handle.write(b"after cancel") {
1379                Err(e) => {
1380                    last_err = Some(e);
1381                    break;
1382                }
1383                _ => thread::sleep(Duration::from_millis(5)),
1384            }
1385        }
1386        let err = last_err.expect("write after cancellation should fail");
1387        assert_eq!(err.kind(), io::ErrorKind::BrokenPipe);
1388    }
1389
1390    #[test]
1391    fn as_output_stream_write_timeout_returns_timed_out() {
1392        // Use a sink that blocks without consuming — this keeps the graph alive
1393        // while the output buffer fills up so writes time out.
1394        let hang_sink: Sink<Vec<u8>, StreamCompletion<NotUsed>> =
1395            Sink::from_runner(move |input: BoxStream<Vec<u8>>, materializer| {
1396                Ok(materializer.spawn_stream(move |cancelled| {
1397                    let _input = input;
1398                    loop {
1399                        if cancelled.load(Ordering::SeqCst) {
1400                            return Ok(NotUsed);
1401                        }
1402                        thread::sleep(Duration::from_millis(1));
1403                    }
1404                }))
1405            });
1406        let (mut handle, _hang_completion) =
1407            StreamConverters::as_output_stream(Duration::from_millis(50))
1408                .to_mat(hang_sink, crate::Keep::both)
1409                .run()
1410                .expect("output stream source materializes");
1411
1412        let capacity = 16_usize;
1413
1414        // Fill the buffer to capacity
1415        for _ in 0..capacity {
1416            handle.write_all(b"x").expect("buffer-fill write succeeds");
1417        }
1418
1419        // This write should time out because no consumer is draining
1420        let err = handle.write(b"overflow").expect_err("write times out");
1421        assert_eq!(err.kind(), io::ErrorKind::TimedOut);
1422    }
1423
1424    #[test]
1425    fn as_output_stream_flush_is_noop() {
1426        let (mut handle, completion) = StreamConverters::as_output_stream(Duration::from_secs(5))
1427            .to_mat(Sink::collect(), crate::Keep::both)
1428            .run()
1429            .expect("output stream source materializes");
1430
1431        handle.write_all(b"data").expect("write succeeds");
1432        handle.flush().expect("flush is a noop");
1433        handle.close().expect("close succeeds");
1434
1435        let chunks = completion.wait().expect("stream completes");
1436        assert_eq!(chunks, vec![b"data".to_vec()]);
1437    }
1438
1439    #[test]
1440    fn as_output_stream_drop_completes_stream() {
1441        let (mut handle, completion) = StreamConverters::as_output_stream(Duration::from_secs(5))
1442            .to_mat(Sink::collect(), crate::Keep::both)
1443            .run()
1444            .expect("output stream source materializes");
1445
1446        handle.write_all(b"drop-me").expect("write succeeds");
1447        drop(handle);
1448
1449        let chunks = completion.wait().expect("stream completes after drop");
1450        assert_eq!(chunks, vec![b"drop-me".to_vec()]);
1451    }
1452
1453    #[test]
1454    fn round_trip_output_stream_to_input_stream() {
1455        // Write bytes to an output stream handle, pipe through the stream,
1456        // read them back from the input stream handle.
1457        let (mut out_handle, mut in_handle): (OutputStreamHandle, InputStreamHandle) =
1458            StreamConverters::as_output_stream(Duration::from_secs(5))
1459                .to_mat(
1460                    StreamConverters::as_input_stream(Duration::from_secs(5)),
1461                    crate::Keep::both,
1462                )
1463                .run()
1464                .expect("round-trip stream materializes");
1465
1466        out_handle.write_all(b"round").expect("write round");
1467        out_handle.write_all(b"trip").expect("write trip");
1468        out_handle.close().expect("close output");
1469
1470        let mut buf = [0_u8; 16];
1471        let mut total = 0_usize;
1472        loop {
1473            let n = in_handle.read(&mut buf[total..]).expect("read");
1474            if n == 0 {
1475                break;
1476            }
1477            total += n;
1478        }
1479        assert_eq!(&buf[..total], b"roundtrip");
1480    }
1481
1482    #[test]
1483    fn as_input_stream_empty_buf_read_returns_zero() {
1484        let mut handle: InputStreamHandle = Source::single(b"abc".to_vec())
1485            .run_with(StreamConverters::as_input_stream(Duration::from_secs(5)))
1486            .expect("input stream sink materializes");
1487
1488        let n = handle.read(&mut []).expect("empty read succeeds");
1489        assert_eq!(n, 0);
1490    }
1491
1492    #[test]
1493    fn as_input_stream_large_read_across_multiple_chunks() {
1494        // Source produces many small chunks; drain with a read loop
1495        // because std::io::Read may return fewer bytes than requested.
1496        let chunks: Vec<Vec<u8>> = (0..10).map(|i| vec![i as u8; 3]).collect();
1497        let total_bytes: usize = chunks.iter().map(|c| c.len()).sum();
1498
1499        let mut handle: InputStreamHandle = Source::from_iter(chunks)
1500            .run_with(StreamConverters::as_input_stream(Duration::from_secs(5)))
1501            .expect("input stream sink materializes");
1502
1503        let mut buf = vec![0_u8; total_bytes];
1504        let mut total = 0_usize;
1505        loop {
1506            let n = handle.read(&mut buf[total..]).expect("large read succeeds");
1507            if n == 0 {
1508                break;
1509            }
1510            total += n;
1511        }
1512        assert_eq!(total, total_bytes);
1513    }
1514}