use crate::stream::{BoxStream, NotUsed, Sink, Source, StreamCompletion};
use crate::{StreamError, StreamResult};
use std::collections::VecDeque;
use std::fs::{File, OpenOptions};
use std::io::{self, Read, Write};
use std::path::PathBuf;
use std::sync::{
Arc, Condvar, Mutex,
atomic::{AtomicBool, Ordering},
};
use std::time::Duration;
const DEFAULT_CHUNK_SIZE: usize = 8192;
const READER_QUEUE_CAPACITY: usize = 8;
const INPUT_STREAM_BUFFER_CAPACITY: usize = 16;
const OUTPUT_STREAM_BUFFER_CAPACITY: usize = 16;
fn io_error(error: std::io::Error) -> StreamError {
StreamError::Failed(error.to_string())
}
#[derive(Clone)]
enum SourceTerminal {
Complete,
Error(StreamError),
}
struct SourceQueueState {
queue: VecDeque<Vec<u8>>,
terminal: Option<SourceTerminal>,
}
struct SourceQueue {
state: Mutex<SourceQueueState>,
available: Condvar,
space: Condvar,
capacity: usize,
cancelled: Arc<AtomicBool>,
}
impl SourceQueue {
fn new() -> Arc<Self> {
Arc::new(Self {
state: Mutex::new(SourceQueueState {
queue: VecDeque::new(),
terminal: None,
}),
available: Condvar::new(),
space: Condvar::new(),
capacity: READER_QUEUE_CAPACITY,
cancelled: Arc::new(AtomicBool::new(false)),
})
}
fn push(&self, chunk: Vec<u8>) -> bool {
let mut state = self.state.lock().expect("io source queue poisoned");
while state.queue.len() >= self.capacity
&& state.terminal.is_none()
&& !self.cancelled.load(Ordering::SeqCst)
{
state = self
.space
.wait(state)
.expect("io source queue poisoned while waiting for space");
}
if state.terminal.is_some() || self.cancelled.load(Ordering::SeqCst) {
return false;
}
if state.terminal.is_none() {
state.queue.push_back(chunk);
}
drop(state);
self.available.notify_all();
true
}
fn finish(&self, terminal: SourceTerminal) {
let mut state = self.state.lock().expect("io source queue poisoned");
if state.terminal.is_none() {
state.terminal = Some(terminal);
}
drop(state);
self.available.notify_all();
self.space.notify_all();
}
}
struct ReaderWorkerGuard {
queue: Arc<SourceQueue>,
armed: bool,
}
impl ReaderWorkerGuard {
fn new(queue: Arc<SourceQueue>) -> Self {
Self { queue, armed: true }
}
fn disarm(&mut self) {
self.armed = false;
}
}
impl Drop for ReaderWorkerGuard {
fn drop(&mut self) {
if self.armed {
self.queue
.finish(SourceTerminal::Error(StreamError::AbruptTermination));
}
}
}
struct ReaderSourceStream {
queue: Arc<SourceQueue>,
completion: Option<StreamCompletion<NotUsed>>,
}
impl Iterator for ReaderSourceStream {
type Item = StreamResult<Vec<u8>>;
fn next(&mut self) -> Option<Self::Item> {
let mut state = self.queue.state.lock().expect("io source queue poisoned");
loop {
if let Some(chunk) = state.queue.pop_front() {
self.queue.space.notify_all();
return Some(Ok(chunk));
}
if let Some(terminal) = state.terminal.clone() {
return match terminal {
SourceTerminal::Complete => None,
SourceTerminal::Error(error) => Some(Err(error)),
};
}
state = self
.queue
.available
.wait(state)
.expect("io source queue poisoned while waiting");
}
}
}
impl Drop for ReaderSourceStream {
fn drop(&mut self) {
self.queue.cancelled.store(true, Ordering::SeqCst);
drop(self.queue.state.lock().unwrap_or_else(|p| p.into_inner()));
self.queue.available.notify_all();
self.queue.space.notify_all();
let _ = self.completion.take();
}
}
struct WriterGuard<W: Write> {
writer: W,
flushed: bool,
}
impl<W: Write> WriterGuard<W> {
fn new(writer: W) -> Self {
Self {
writer,
flushed: false,
}
}
fn writer_mut(&mut self) -> &mut W {
&mut self.writer
}
fn flush_once(&mut self) -> StreamResult<()> {
if self.flushed {
return Ok(());
}
self.writer.flush().map_err(io_error)?;
self.flushed = true;
Ok(())
}
}
impl<W: Write> Drop for WriterGuard<W> {
fn drop(&mut self) {
let _ = self.flush_once();
}
}
#[derive(Clone)]
enum InputStreamTerminal {
Complete,
Error(StreamError),
}
struct InputStreamBufferState {
chunks: VecDeque<Vec<u8>>,
terminal: Option<InputStreamTerminal>,
}
struct InputStreamShared {
state: Mutex<InputStreamBufferState>,
available: Condvar,
space: Condvar,
cancelled: AtomicBool,
}
impl InputStreamShared {
fn new() -> Self {
Self {
state: Mutex::new(InputStreamBufferState {
chunks: VecDeque::new(),
terminal: None,
}),
available: Condvar::new(),
space: Condvar::new(),
cancelled: AtomicBool::new(false),
}
}
fn set_terminal(&self, terminal: InputStreamTerminal) {
let mut state = self.state.lock().expect("input stream buffer poisoned");
if state.terminal.is_none() {
state.terminal = Some(terminal);
}
drop(state);
self.available.notify_all();
self.space.notify_all();
}
}
pub struct InputStreamHandle {
shared: Arc<InputStreamShared>,
detached: Vec<u8>,
detached_offset: usize,
read_timeout: Duration,
stream_closed: bool,
_completion: StreamCompletion<NotUsed>,
}
impl InputStreamHandle {
fn new(
shared: Arc<InputStreamShared>,
read_timeout: Duration,
completion: StreamCompletion<NotUsed>,
) -> Self {
Self {
shared,
detached: Vec::new(),
detached_offset: 0,
read_timeout,
stream_closed: false,
_completion: completion,
}
}
}
impl Read for InputStreamHandle {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
if self.stream_closed {
return Err(io::Error::other(
"stream is terminated, no reads are possible",
));
}
if buf.is_empty() {
return Ok(0);
}
let mut total = 0_usize;
if self.detached_offset < self.detached.len() {
let available = self.detached.len() - self.detached_offset;
let n = available.min(buf.len());
buf[..n]
.copy_from_slice(&self.detached[self.detached_offset..self.detached_offset + n]);
self.detached_offset += n;
total += n;
if self.detached_offset >= self.detached.len() {
self.detached.clear();
self.detached_offset = 0;
}
if total == buf.len() {
return Ok(total);
}
}
let mut state = self
.shared
.state
.lock()
.expect("input stream buffer poisoned");
loop {
while total < buf.len() {
if let Some(chunk) = state.chunks.pop_front() {
self.shared.space.notify_all();
drop(state);
let space = buf.len() - total;
let n = chunk.len().min(space);
buf[total..total + n].copy_from_slice(&chunk[..n]);
total += n;
if n < chunk.len() {
self.detached = chunk;
self.detached_offset = n;
}
state = self
.shared
.state
.lock()
.expect("input stream buffer poisoned");
continue;
}
break;
}
if total > 0 {
return Ok(total);
}
if let Some(terminal) = state.terminal.clone() {
return match terminal {
InputStreamTerminal::Complete => Ok(0),
InputStreamTerminal::Error(e) => {
Err(io::Error::other(format!("stream failed: {e}")))
}
};
}
let (new_state, timeout) = self
.shared
.available
.wait_timeout(state, self.read_timeout)
.expect("input stream buffer poisoned while waiting");
state = new_state;
if timeout.timed_out() && state.chunks.is_empty() && state.terminal.is_none() {
return Err(io::Error::new(
io::ErrorKind::TimedOut,
format!("timeout after {:?} waiting for new data", self.read_timeout),
));
}
}
}
}
impl Drop for InputStreamHandle {
fn drop(&mut self) {
self.shared.cancelled.store(true, Ordering::SeqCst);
drop(self.shared.state.lock().unwrap_or_else(|p| p.into_inner()));
self.shared.available.notify_all();
self.shared.space.notify_all();
}
}
#[derive(Clone)]
#[allow(dead_code)]
enum OutputStreamTerminal {
Complete,
Error(StreamError),
}
struct OutputStreamBufferState {
chunks: VecDeque<Vec<u8>>,
terminal: Option<OutputStreamTerminal>,
}
struct OutputStreamShared {
state: Mutex<OutputStreamBufferState>,
available: Condvar,
space: Condvar,
cancelled: AtomicBool,
}
impl OutputStreamShared {
fn new() -> Self {
Self {
state: Mutex::new(OutputStreamBufferState {
chunks: VecDeque::new(),
terminal: None,
}),
available: Condvar::new(),
space: Condvar::new(),
cancelled: AtomicBool::new(false),
}
}
}
struct OutputStreamSourceStream {
shared: Arc<OutputStreamShared>,
done: bool,
}
impl Iterator for OutputStreamSourceStream {
type Item = StreamResult<Vec<u8>>;
fn next(&mut self) -> Option<Self::Item> {
if self.done {
return None;
}
let mut state = self
.shared
.state
.lock()
.expect("output stream buffer poisoned");
loop {
if let Some(chunk) = state.chunks.pop_front() {
self.shared.space.notify_all();
return Some(Ok(chunk));
}
match &state.terminal {
Some(OutputStreamTerminal::Complete) => {
self.done = true;
return None;
}
Some(OutputStreamTerminal::Error(e)) => {
self.done = true;
return Some(Err(e.clone()));
}
None => {}
}
state = self
.shared
.available
.wait(state)
.expect("output stream buffer poisoned while waiting");
}
}
}
impl Drop for OutputStreamSourceStream {
fn drop(&mut self) {
self.shared.cancelled.store(true, Ordering::SeqCst);
drop(self.shared.state.lock().unwrap_or_else(|p| p.into_inner()));
self.shared.available.notify_all();
self.shared.space.notify_all();
}
}
pub struct OutputStreamHandle {
shared: Arc<OutputStreamShared>,
write_timeout: Duration,
closed: AtomicBool,
}
impl OutputStreamHandle {
fn new(shared: Arc<OutputStreamShared>, write_timeout: Duration) -> Self {
Self {
shared,
write_timeout,
closed: AtomicBool::new(false),
}
}
pub fn close(&self) -> io::Result<()> {
self.closed.store(true, Ordering::SeqCst);
let mut state = self
.shared
.state
.lock()
.expect("output stream buffer poisoned");
if state.terminal.is_none() {
state.terminal = Some(OutputStreamTerminal::Complete);
}
drop(state);
self.shared.available.notify_all();
self.shared.space.notify_all();
Ok(())
}
}
impl Write for OutputStreamHandle {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
if self.closed.load(Ordering::SeqCst) {
return Err(io::Error::new(
io::ErrorKind::BrokenPipe,
"stream is closed, no writes are possible",
));
}
if buf.is_empty() {
return Ok(0);
}
let mut state = self
.shared
.state
.lock()
.expect("output stream buffer poisoned");
loop {
if self.closed.load(Ordering::SeqCst) || self.shared.cancelled.load(Ordering::SeqCst) {
return Err(io::Error::new(
io::ErrorKind::BrokenPipe,
"stream is closed, no writes are possible",
));
}
if let Some(OutputStreamTerminal::Error(e)) = &state.terminal {
return Err(io::Error::other(format!("stream failed: {e}")));
}
if state.chunks.len() < OUTPUT_STREAM_BUFFER_CAPACITY {
state.chunks.push_back(buf.to_vec());
drop(state);
self.shared.available.notify_all();
return Ok(buf.len());
}
let (new_state, timeout) = self
.shared
.space
.wait_timeout(state, self.write_timeout)
.expect("output stream buffer poisoned while waiting");
state = new_state;
if timeout.timed_out()
&& state.chunks.len() >= OUTPUT_STREAM_BUFFER_CAPACITY
&& state.terminal.is_none()
{
return Err(io::Error::new(
io::ErrorKind::TimedOut,
format!(
"timed out trying to write data to stream after {:?}",
self.write_timeout
),
));
}
}
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
impl Drop for OutputStreamHandle {
fn drop(&mut self) {
self.shared.cancelled.store(true, Ordering::SeqCst);
let _ = self.close();
}
}
pub struct StreamConverters;
impl StreamConverters {
#[must_use]
pub fn from_reader<F, R>(factory: F, chunk_size: usize) -> Source<Vec<u8>>
where
F: Fn() -> std::io::Result<R> + Send + Sync + 'static,
R: Read + Send + 'static,
{
assert!(chunk_size > 0, "chunk size must be greater than zero");
Source::from_materialized_factory(move |materializer| {
let reader = factory().map_err(io_error)?;
let queue = SourceQueue::new();
let queue_for_worker = Arc::clone(&queue);
let cancelled = Arc::clone(&queue.cancelled);
let completion = materializer.spawn_stream(move |_worker_cancelled| {
let mut reader = reader;
let mut guard = ReaderWorkerGuard::new(Arc::clone(&queue_for_worker));
let mut buffer = vec![0_u8; chunk_size];
loop {
if cancelled.load(Ordering::SeqCst) {
guard.disarm();
return Ok(NotUsed);
}
match reader.read(&mut buffer) {
Ok(0) => {
guard.disarm();
queue_for_worker.finish(SourceTerminal::Complete);
return Ok(NotUsed);
}
Ok(read) => {
if !queue_for_worker.push(buffer[..read].to_vec()) {
guard.disarm();
return Ok(NotUsed);
}
}
Err(error) => {
guard.disarm();
queue_for_worker.finish(SourceTerminal::Error(io_error(error)));
return Ok(NotUsed);
}
}
}
});
Ok((
Box::new(ReaderSourceStream {
queue,
completion: Some(completion),
}) as BoxStream<Vec<u8>>,
NotUsed,
))
})
}
#[must_use]
pub fn to_writer<F, W>(factory: F) -> Sink<Vec<u8>, StreamCompletion<NotUsed>>
where
F: Fn() -> std::io::Result<W> + Send + Sync + 'static,
W: Write + Send + 'static,
{
Sink::from_runner(move |input: BoxStream<Vec<u8>>, materializer| {
let writer = WriterGuard::new(factory().map_err(io_error)?);
Ok(materializer.spawn_stream(move |cancelled| {
let mut input = input;
let mut writer = writer;
loop {
if cancelled.load(Ordering::SeqCst) {
let _ = writer.flush_once();
return Err(StreamError::Cancelled);
}
match input.next() {
Some(Ok(chunk)) => {
writer.writer_mut().write_all(&chunk).map_err(io_error)?
}
Some(Err(error)) => {
let _ = writer.flush_once();
return Err(error);
}
None => {
writer.flush_once()?;
return Ok(NotUsed);
}
}
}
}))
})
}
#[must_use]
pub fn as_input_stream(read_timeout: Duration) -> Sink<Vec<u8>, InputStreamHandle> {
Sink::from_runner(move |input: BoxStream<Vec<u8>>, materializer| {
let shared = Arc::new(InputStreamShared::new());
let shared_for_worker = Arc::clone(&shared);
let completion = materializer.spawn_stream(move |_task_cancelled| {
let mut input = input;
loop {
if shared_for_worker.cancelled.load(Ordering::SeqCst) {
return Ok(NotUsed);
}
match input.next() {
Some(Ok(chunk)) => {
let mut state = shared_for_worker
.state
.lock()
.expect("input stream buffer poisoned");
while state.chunks.len() >= INPUT_STREAM_BUFFER_CAPACITY
&& state.terminal.is_none()
&& !shared_for_worker.cancelled.load(Ordering::SeqCst)
{
state = shared_for_worker
.space
.wait(state)
.expect("input stream buffer poisoned while waiting");
}
if state.terminal.is_some()
|| shared_for_worker.cancelled.load(Ordering::SeqCst)
{
return Ok(NotUsed);
}
if !chunk.is_empty() {
state.chunks.push_back(chunk);
}
drop(state);
shared_for_worker.available.notify_all();
}
Some(Err(e)) => {
shared_for_worker.set_terminal(InputStreamTerminal::Error(e));
return Ok(NotUsed);
}
None => {
shared_for_worker.set_terminal(InputStreamTerminal::Complete);
return Ok(NotUsed);
}
}
}
});
Ok(InputStreamHandle::new(shared, read_timeout, completion))
})
}
#[must_use]
pub fn as_output_stream(write_timeout: Duration) -> Source<Vec<u8>, OutputStreamHandle> {
Source::from_materialized_factory(move |_materializer| {
let shared = Arc::new(OutputStreamShared::new());
let handle = OutputStreamHandle::new(Arc::clone(&shared), write_timeout);
let stream = OutputStreamSourceStream {
shared,
done: false,
};
Ok((Box::new(stream) as BoxStream<Vec<u8>>, handle))
})
}
}
pub struct FileIO;
impl FileIO {
#[must_use]
pub fn from_path(path: impl Into<PathBuf>, chunk_size: usize) -> Source<Vec<u8>> {
let path = path.into();
StreamConverters::from_reader(move || File::open(&path), chunk_size)
}
#[must_use]
pub fn from_path_default(path: impl Into<PathBuf>) -> Source<Vec<u8>> {
Self::from_path(path, DEFAULT_CHUNK_SIZE)
}
#[must_use]
pub fn to_path(path: impl Into<PathBuf>) -> Sink<Vec<u8>, StreamCompletion<NotUsed>> {
let path = path.into();
StreamConverters::to_writer(move || {
OpenOptions::new()
.create(true)
.truncate(true)
.write(true)
.open(&path)
})
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::Source;
use crate::testkit::TestSink;
use std::io::Cursor;
use std::sync::atomic::{AtomicU64, AtomicUsize};
use std::thread;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
fn wait_for_counter(counter: &AtomicUsize, expected: usize) {
let deadline = std::time::Instant::now() + Duration::from_secs(1);
while std::time::Instant::now() < deadline {
if counter.load(Ordering::SeqCst) == expected {
return;
}
thread::sleep(Duration::from_millis(5));
}
assert_eq!(counter.load(Ordering::SeqCst), expected);
}
fn wait_until(timeout: Duration, mut condition: impl FnMut() -> bool) -> bool {
let deadline = Instant::now() + timeout;
while Instant::now() < deadline {
if condition() {
return true;
}
thread::sleep(Duration::from_millis(5));
}
condition()
}
fn unique_temp_path(name: &str) -> PathBuf {
let nanos = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("clock after epoch")
.as_nanos();
std::env::temp_dir().join(format!(
"datum-wp12-{name}-{}-{nanos}.bin",
std::process::id()
))
}
struct CountingReader {
inner: Cursor<Vec<u8>>,
drops: Arc<AtomicUsize>,
}
impl Read for CountingReader {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
self.inner.read(buf)
}
}
impl Drop for CountingReader {
fn drop(&mut self) {
self.drops.fetch_add(1, Ordering::SeqCst);
}
}
struct CountingWriter {
writes: Arc<Mutex<Vec<Vec<u8>>>>,
flushes: Arc<AtomicUsize>,
drops: Arc<AtomicUsize>,
fail_write: bool,
}
impl Write for CountingWriter {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
if self.fail_write {
return Err(std::io::Error::other("writer boom"));
}
self.writes
.lock()
.expect("writer log poisoned")
.push(buf.to_vec());
Ok(buf.len())
}
fn flush(&mut self) -> std::io::Result<()> {
self.flushes.fetch_add(1, Ordering::SeqCst);
Ok(())
}
}
impl Drop for CountingWriter {
fn drop(&mut self) {
self.drops.fetch_add(1, Ordering::SeqCst);
}
}
struct CountingChunkReader {
inner: Cursor<Vec<u8>>,
chunk_size: usize,
reads: Arc<AtomicUsize>,
}
impl Read for CountingChunkReader {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
self.reads.fetch_add(1, Ordering::SeqCst);
let mut chunk = vec![0_u8; self.chunk_size.min(buf.len())];
let read = self.inner.read(&mut chunk)?;
buf[..read].copy_from_slice(&chunk[..read]);
Ok(read)
}
}
#[test]
fn from_reader_emits_chunked_bytes_and_completes() {
let sink = StreamConverters::from_reader(|| Ok(Cursor::new(b"abcdef".to_vec())), 2)
.run_with(TestSink::probe())
.expect("reader source materializes");
sink.request(4);
sink.assert_next_n([b"ab".to_vec(), b"cd".to_vec(), b"ef".to_vec()]);
sink.expect_complete();
}
#[test]
fn from_reader_closes_exactly_once_on_completion() {
let drops = Arc::new(AtomicUsize::new(0));
let drops_for_reader = Arc::clone(&drops);
let sink = StreamConverters::from_reader(
move || {
Ok(CountingReader {
inner: Cursor::new(b"hello".to_vec()),
drops: Arc::clone(&drops_for_reader),
})
},
8,
)
.run_with(TestSink::probe())
.expect("reader source materializes");
sink.request(2);
sink.assert_next(b"hello".to_vec());
sink.expect_complete();
wait_for_counter(&drops, 1);
}
#[test]
fn from_reader_closes_exactly_once_on_cancellation() {
let drops = Arc::new(AtomicUsize::new(0));
let drops_for_reader = Arc::clone(&drops);
let mut sink = StreamConverters::from_reader(
move || {
Ok(CountingReader {
inner: Cursor::new(vec![1_u8; 32]),
drops: Arc::clone(&drops_for_reader),
})
},
4,
)
.run_with(TestSink::probe())
.expect("reader source materializes");
sink.request(1);
sink.assert_next(vec![1_u8; 4]);
sink.cancel();
wait_for_counter(&drops, 1);
}
#[test]
fn from_reader_surfaces_read_failure() {
struct FailingReader;
impl Read for FailingReader {
fn read(&mut self, _buf: &mut [u8]) -> std::io::Result<usize> {
Err(std::io::Error::other("reader boom"))
}
}
let sink = StreamConverters::from_reader(|| Ok(FailingReader), 8)
.run_with(TestSink::probe())
.expect("reader source materializes");
sink.request(1);
assert_eq!(
sink.expect_error(),
StreamError::Failed("reader boom".to_owned())
);
}
#[test]
fn from_reader_caps_buffered_read_ahead_for_slow_consumers() {
let payload: Vec<u8> = (0..64_u8).cycle().take(8192 * 16).collect();
let payload_for_reader = payload.clone();
let reads = Arc::new(AtomicUsize::new(0));
let reads_for_reader = Arc::clone(&reads);
let sink = StreamConverters::from_reader(
move || {
Ok(CountingChunkReader {
inner: Cursor::new(payload_for_reader.clone()),
chunk_size: 256,
reads: Arc::clone(&reads_for_reader),
})
},
256,
)
.run_with(TestSink::probe())
.expect("reader source materializes");
sink.request(1);
let first = sink.expect_next();
assert_eq!(first.len(), 256);
let last_seen = Arc::new(AtomicUsize::new(0));
let quiet_since_ms = Arc::new(AtomicU64::new(0));
let start = Instant::now();
assert!(wait_until(Duration::from_secs(2), {
let last_seen = Arc::clone(&last_seen);
let quiet_since_ms = Arc::clone(&quiet_since_ms);
let reads = Arc::clone(&reads);
move || {
let current = reads.load(Ordering::SeqCst);
let last = last_seen.load(Ordering::SeqCst);
if current != last {
last_seen.store(current, Ordering::SeqCst);
quiet_since_ms.store(start.elapsed().as_millis() as u64, Ordering::SeqCst);
return false;
}
let quiet_for =
start.elapsed().as_millis() as u64 - quiet_since_ms.load(Ordering::SeqCst);
current > 0 && quiet_for >= 100
}
}));
assert!(
reads.load(Ordering::SeqCst) <= READER_QUEUE_CAPACITY + 2,
"reader should plateau near the bounded queue capacity"
);
sink.request(usize::MAX);
let mut collected = first;
for chunk in sink.drain_until_complete() {
collected.extend_from_slice(&chunk);
}
assert_eq!(collected, payload);
}
#[test]
fn to_writer_writes_all_chunks_and_flushes_once() {
let writes = Arc::new(Mutex::new(Vec::new()));
let flushes = Arc::new(AtomicUsize::new(0));
let drops = Arc::new(AtomicUsize::new(0));
let completion = Source::from_iter([b"ab".to_vec(), b"cd".to_vec()])
.run_with(StreamConverters::to_writer({
let writes = Arc::clone(&writes);
let flushes = Arc::clone(&flushes);
let drops = Arc::clone(&drops);
move || {
Ok(CountingWriter {
writes: Arc::clone(&writes),
flushes: Arc::clone(&flushes),
drops: Arc::clone(&drops),
fail_write: false,
})
}
}))
.expect("writer sink materializes");
completion.wait().expect("writer sink completes");
assert_eq!(
writes.lock().expect("writes poisoned").as_slice(),
&[b"ab".to_vec(), b"cd".to_vec()]
);
assert_eq!(flushes.load(Ordering::SeqCst), 1);
assert_eq!(drops.load(Ordering::SeqCst), 1);
}
#[test]
fn to_writer_flushes_and_drops_once_on_failure() {
let flushes = Arc::new(AtomicUsize::new(0));
let drops = Arc::new(AtomicUsize::new(0));
let completion = Source::<Vec<u8>>::failed(StreamError::Failed("upstream boom".to_owned()))
.run_with(StreamConverters::to_writer({
let flushes = Arc::clone(&flushes);
let drops = Arc::clone(&drops);
move || {
Ok(CountingWriter {
writes: Arc::new(Mutex::new(Vec::new())),
flushes: Arc::clone(&flushes),
drops: Arc::clone(&drops),
fail_write: false,
})
}
}))
.expect("writer sink materializes");
assert_eq!(
completion.wait(),
Err(StreamError::Failed("upstream boom".to_owned()))
);
assert_eq!(flushes.load(Ordering::SeqCst), 1);
assert_eq!(drops.load(Ordering::SeqCst), 1);
}
#[test]
fn to_writer_flushes_and_drops_once_on_cancellation() {
let flushes = Arc::new(AtomicUsize::new(0));
let drops = Arc::new(AtomicUsize::new(0));
let completion = Source::repeat(vec![7_u8; 4])
.run_with(StreamConverters::to_writer({
let flushes = Arc::clone(&flushes);
let drops = Arc::clone(&drops);
move || {
Ok(CountingWriter {
writes: Arc::new(Mutex::new(Vec::new())),
flushes: Arc::clone(&flushes),
drops: Arc::clone(&drops),
fail_write: false,
})
}
}))
.expect("writer sink materializes");
drop(completion);
wait_for_counter(&flushes, 1);
wait_for_counter(&drops, 1);
}
#[test]
fn to_writer_surfaces_write_failure() {
let completion = Source::single(vec![1_u8])
.run_with(StreamConverters::to_writer(|| {
Ok(CountingWriter {
writes: Arc::new(Mutex::new(Vec::new())),
flushes: Arc::new(AtomicUsize::new(0)),
drops: Arc::new(AtomicUsize::new(0)),
fail_write: true,
})
}))
.expect("writer sink materializes");
assert_eq!(
completion.wait(),
Err(StreamError::Failed("writer boom".to_owned()))
);
}
#[test]
fn file_io_round_trips_bytes() {
let path = unique_temp_path("roundtrip");
let write_completion = Source::from_iter([b"ab".to_vec(), b"cd".to_vec()])
.run_with(FileIO::to_path(path.clone()))
.expect("file sink materializes");
write_completion.wait().expect("file write completes");
let sink = FileIO::from_path(path.clone(), 2)
.run_with(TestSink::probe())
.expect("file source materializes");
sink.request(4);
sink.assert_next_n([b"ab".to_vec(), b"cd".to_vec()]);
sink.expect_complete();
std::fs::remove_file(path).expect("remove roundtrip file");
}
#[test]
fn file_io_source_surfaces_open_failure() {
let missing = unique_temp_path("missing");
let result = FileIO::from_path(missing, 4).run_with(TestSink::probe());
assert!(matches!(result, Err(StreamError::Failed(_))));
}
#[test]
fn file_io_sink_creates_and_truncates_file() {
let path = unique_temp_path("truncate");
std::fs::write(&path, b"stale bytes").expect("seed file");
let completion = Source::single(b"ok".to_vec())
.run_with(FileIO::to_path(path.clone()))
.expect("file sink materializes");
completion.wait().expect("file write completes");
assert_eq!(std::fs::read(&path).expect("read file"), b"ok");
std::fs::remove_file(path).expect("remove truncate file");
}
#[test]
fn file_io_source_default_chunk_size_works() {
let path = unique_temp_path("default");
std::fs::write(&path, b"hi").expect("write seed file");
let sink = FileIO::from_path_default(path.clone())
.run_with(TestSink::probe())
.expect("file source materializes");
sink.request(2);
sink.assert_next(b"hi".to_vec());
sink.expect_complete();
std::fs::remove_file(path).expect("remove default file");
}
#[test]
fn as_input_stream_reads_data_written_by_stream() {
let mut handle: InputStreamHandle =
Source::from_iter([b"hello".to_vec(), b"world".to_vec()])
.run_with(StreamConverters::as_input_stream(Duration::from_secs(5)))
.expect("input stream sink materializes");
let mut buf = [0_u8; 32];
let mut total = 0_usize;
loop {
let n = handle.read(&mut buf[total..]).expect("read succeeds");
if n == 0 {
break;
}
total += n;
}
assert_eq!(&buf[..total], b"helloworld");
}
#[test]
fn as_input_stream_eof_when_stream_completes() {
let mut handle: InputStreamHandle = Source::from_iter([b"x".to_vec()])
.run_with(StreamConverters::as_input_stream(Duration::from_secs(5)))
.expect("input stream sink materializes");
let mut buf = [0_u8; 4];
let n = handle.read(&mut buf).expect("first read succeeds");
assert_eq!(&buf[..n], b"x");
let n = handle.read(&mut buf).expect("second read returns eof");
assert_eq!(n, 0);
}
#[test]
fn as_input_stream_partial_reads_work() {
let mut handle: InputStreamHandle = Source::single(b"abcde".to_vec())
.run_with(StreamConverters::as_input_stream(Duration::from_secs(5)))
.expect("input stream sink materializes");
let mut small = [0_u8; 2];
let n = handle.read(&mut small).expect("small read succeeds");
assert_eq!(n, 2);
assert_eq!(&small[..], b"ab");
let n = handle.read(&mut small).expect("second small read succeeds");
assert_eq!(n, 2);
assert_eq!(&small[..], b"cd");
let n = handle.read(&mut small).expect("third small read succeeds");
assert_eq!(n, 1);
assert_eq!(&small[..1], b"e");
let n = handle.read(&mut small).expect("fourth read returns eof");
assert_eq!(n, 0);
}
#[test]
fn as_input_stream_error_surfaces_as_io_error() {
let mut handle: InputStreamHandle =
Source::<Vec<u8>>::failed(StreamError::Failed("upstream boom".to_owned()))
.run_with(StreamConverters::as_input_stream(Duration::from_secs(5)))
.expect("input stream sink materializes");
let mut buf = [0_u8; 8];
let err = handle.read(&mut buf).expect_err("read surfaces error");
let msg = err.to_string();
assert!(msg.contains("upstream boom"), "got: {msg}");
}
#[test]
fn as_input_stream_cancellation_stops_reads() {
let mut handle: InputStreamHandle = Source::repeat(b"x".to_vec())
.run_with(StreamConverters::as_input_stream(Duration::from_secs(5)))
.expect("input stream sink materializes");
let mut buf = [0_u8; 3];
let n = handle.read(&mut buf).expect("first read succeeds");
assert!((1..=3).contains(&n), "expected 1..=3 bytes, got {n}");
drop(handle);
}
#[test]
fn as_input_stream_read_timeout_returns_timed_out() {
let mut handle: InputStreamHandle = Source::<Vec<u8>>::never()
.run_with(StreamConverters::as_input_stream(Duration::from_millis(10)))
.expect("input stream sink materializes");
let mut buf = [0_u8; 4];
let err = handle.read(&mut buf).expect_err("read times out");
assert_eq!(err.kind(), io::ErrorKind::TimedOut);
}
#[test]
fn as_output_stream_writes_data_appear_in_stream() {
let (mut handle, completion) = StreamConverters::as_output_stream(Duration::from_secs(5))
.to_mat(Sink::collect(), crate::Keep::both)
.run()
.expect("output stream source materializes");
handle.write_all(b"alpha").expect("first write succeeds");
handle.write_all(b"beta").expect("second write succeeds");
handle.close().expect("close succeeds");
let chunks = completion.wait().expect("stream completes");
assert_eq!(chunks, vec![b"alpha".to_vec(), b"beta".to_vec()]);
}
#[test]
fn as_output_stream_close_completes_stream() {
let (mut handle, completion) = StreamConverters::as_output_stream(Duration::from_secs(5))
.to_mat(Sink::collect(), crate::Keep::both)
.run()
.expect("output stream source materializes");
handle.write_all(b"done").expect("write succeeds");
let result = handle.close();
assert!(result.is_ok());
let chunks = completion.wait().expect("stream completes after close");
assert_eq!(chunks, vec![b"done".to_vec()]);
}
#[test]
fn as_output_stream_write_after_close_fails() {
let (mut handle, _completion) = StreamConverters::as_output_stream(Duration::from_secs(5))
.to_mat(Sink::ignore(), crate::Keep::both)
.run()
.expect("output stream source materializes");
handle.close().expect("first close succeeds");
let err = handle.write(b"late").expect_err("write after close fails");
assert_eq!(err.kind(), io::ErrorKind::BrokenPipe);
}
#[test]
fn as_output_stream_cancellation_stops_writes() {
let (mut handle, completion) = StreamConverters::as_output_stream(Duration::from_secs(5))
.to_mat(Sink::ignore(), crate::Keep::both)
.run()
.expect("output stream source materializes");
handle.write_all(b"ok").expect("write succeeds");
drop(completion);
let deadline = std::time::Instant::now() + Duration::from_secs(1);
let mut last_err = None;
while std::time::Instant::now() < deadline {
match handle.write(b"after cancel") {
Err(e) => {
last_err = Some(e);
break;
}
_ => thread::sleep(Duration::from_millis(5)),
}
}
let err = last_err.expect("write after cancellation should fail");
assert_eq!(err.kind(), io::ErrorKind::BrokenPipe);
}
#[test]
fn as_output_stream_write_timeout_returns_timed_out() {
let hang_sink: Sink<Vec<u8>, StreamCompletion<NotUsed>> =
Sink::from_runner(move |input: BoxStream<Vec<u8>>, materializer| {
Ok(materializer.spawn_stream(move |cancelled| {
let _input = input;
loop {
if cancelled.load(Ordering::SeqCst) {
return Ok(NotUsed);
}
thread::sleep(Duration::from_millis(1));
}
}))
});
let (mut handle, _hang_completion) =
StreamConverters::as_output_stream(Duration::from_millis(50))
.to_mat(hang_sink, crate::Keep::both)
.run()
.expect("output stream source materializes");
let capacity = 16_usize;
for _ in 0..capacity {
handle.write_all(b"x").expect("buffer-fill write succeeds");
}
let err = handle.write(b"overflow").expect_err("write times out");
assert_eq!(err.kind(), io::ErrorKind::TimedOut);
}
#[test]
fn as_output_stream_flush_is_noop() {
let (mut handle, completion) = StreamConverters::as_output_stream(Duration::from_secs(5))
.to_mat(Sink::collect(), crate::Keep::both)
.run()
.expect("output stream source materializes");
handle.write_all(b"data").expect("write succeeds");
handle.flush().expect("flush is a noop");
handle.close().expect("close succeeds");
let chunks = completion.wait().expect("stream completes");
assert_eq!(chunks, vec![b"data".to_vec()]);
}
#[test]
fn as_output_stream_drop_completes_stream() {
let (mut handle, completion) = StreamConverters::as_output_stream(Duration::from_secs(5))
.to_mat(Sink::collect(), crate::Keep::both)
.run()
.expect("output stream source materializes");
handle.write_all(b"drop-me").expect("write succeeds");
drop(handle);
let chunks = completion.wait().expect("stream completes after drop");
assert_eq!(chunks, vec![b"drop-me".to_vec()]);
}
#[test]
fn round_trip_output_stream_to_input_stream() {
let (mut out_handle, mut in_handle): (OutputStreamHandle, InputStreamHandle) =
StreamConverters::as_output_stream(Duration::from_secs(5))
.to_mat(
StreamConverters::as_input_stream(Duration::from_secs(5)),
crate::Keep::both,
)
.run()
.expect("round-trip stream materializes");
out_handle.write_all(b"round").expect("write round");
out_handle.write_all(b"trip").expect("write trip");
out_handle.close().expect("close output");
let mut buf = [0_u8; 16];
let mut total = 0_usize;
loop {
let n = in_handle.read(&mut buf[total..]).expect("read");
if n == 0 {
break;
}
total += n;
}
assert_eq!(&buf[..total], b"roundtrip");
}
#[test]
fn as_input_stream_empty_buf_read_returns_zero() {
let mut handle: InputStreamHandle = Source::single(b"abc".to_vec())
.run_with(StreamConverters::as_input_stream(Duration::from_secs(5)))
.expect("input stream sink materializes");
let n = handle.read(&mut []).expect("empty read succeeds");
assert_eq!(n, 0);
}
#[test]
fn as_input_stream_large_read_across_multiple_chunks() {
let chunks: Vec<Vec<u8>> = (0..10).map(|i| vec![i as u8; 3]).collect();
let total_bytes: usize = chunks.iter().map(|c| c.len()).sum();
let mut handle: InputStreamHandle = Source::from_iter(chunks)
.run_with(StreamConverters::as_input_stream(Duration::from_secs(5)))
.expect("input stream sink materializes");
let mut buf = vec![0_u8; total_bytes];
let mut total = 0_usize;
loop {
let n = handle.read(&mut buf[total..]).expect("large read succeeds");
if n == 0 {
break;
}
total += n;
}
assert_eq!(total, total_bytes);
}
}