use crate::stream::{BoxStream, NotUsed, Sink, Source, StreamCompletion};
use crate::{StreamError, StreamResult};
use std::collections::VecDeque;
use std::fs::{File, OpenOptions};
use std::io::{Read, Write};
use std::path::PathBuf;
use std::sync::{
Arc, Condvar, Mutex,
atomic::{AtomicBool, Ordering},
};
/// Chunk size in bytes that `FileIO::from_path_default` passes to `FileIO::from_path`.
const DEFAULT_CHUNK_SIZE: usize = 8192;
/// Number of chunks a `SourceQueue` may hold before `SourceQueue::push` blocks the caller.
const READER_QUEUE_CAPACITY: usize = 8;
/// Converts an I/O error into `StreamError::Failed`, keeping only the error's display text.
fn io_error(error: std::io::Error) -> StreamError {
    let message = error.to_string();
    StreamError::Failed(message)
}
/// Terminal signal stored on a `SourceQueue`; once set, `SourceQueue::push` rejects new chunks.
#[derive(Clone)]
enum SourceTerminal {
/// The reader reported end of input; the consuming iterator ends with `None`.
Complete,
/// The worker stopped with this error; the consuming iterator yields it as `Err`.
Error(StreamError),
}
/// Contents of a `SourceQueue` that are protected by its mutex.
struct SourceQueueState {
/// Chunks that were pushed and not yet popped, oldest at the front.
queue: VecDeque<Vec<u8>>,
/// Terminal signal; stays `None` until `SourceQueue::finish` records one.
terminal: Option<SourceTerminal>,
}
/// Bounded, blocking hand-off queue between the reader worker and the consuming stream.
struct SourceQueue {
    /// Buffered chunks and the terminal signal, guarded together.
    state: Mutex<SourceQueueState>,
    /// Notified when a chunk is queued or a terminal signal is recorded.
    available: Condvar,
    /// Notified when a chunk is taken, a terminal signal is recorded, or the consumer is dropped.
    space: Condvar,
    /// Maximum number of chunks `push` lets the queue hold.
    capacity: usize,
    /// Set to `true` when the consuming stream is dropped.
    cancelled: Arc<AtomicBool>,
}

impl SourceQueue {
    /// Creates an empty, non-terminated queue with `READER_QUEUE_CAPACITY` slots.
    fn new() -> Arc<Self> {
        let empty = SourceQueueState {
            queue: VecDeque::new(),
            terminal: None,
        };
        Arc::new(Self {
            state: Mutex::new(empty),
            available: Condvar::new(),
            space: Condvar::new(),
            capacity: READER_QUEUE_CAPACITY,
            cancelled: Arc::new(AtomicBool::new(false)),
        })
    }

    /// Appends `chunk`, blocking while the queue is full.
    ///
    /// Returns `false` without queuing the chunk when a terminal signal has been
    /// recorded or the queue has been cancelled; returns `true` once the chunk is queued.
    fn push(&self, chunk: Vec<u8>) -> bool {
        let mut guard = self.state.lock().expect("io source queue poisoned");
        loop {
            // Shutdown is checked before capacity so a blocked producer gives up
            // as soon as it is woken by a terminal signal or cancellation.
            let shut_down = guard.terminal.is_some() || self.cancelled.load(Ordering::SeqCst);
            if shut_down {
                return false;
            }
            if guard.queue.len() < self.capacity {
                break;
            }
            guard = self
                .space
                .wait(guard)
                .expect("io source queue poisoned while waiting for space");
        }
        guard.queue.push_back(chunk);
        drop(guard);
        self.available.notify_all();
        true
    }

    /// Records `terminal` unless a terminal signal is already present, then wakes all waiters.
    fn finish(&self, terminal: SourceTerminal) {
        {
            let mut guard = self.state.lock().expect("io source queue poisoned");
            // The first terminal signal wins; later ones are discarded.
            let _ = guard.terminal.get_or_insert(terminal);
        }
        self.available.notify_all();
        self.space.notify_all();
    }
}
struct ReaderWorkerGuard {
queue: Arc<SourceQueue>,
armed: bool,
}
impl ReaderWorkerGuard {
fn new(queue: Arc<SourceQueue>) -> Self {
Self { queue, armed: true }
}
fn disarm(&mut self) {
self.armed = false;
}
}
impl Drop for ReaderWorkerGuard {
fn drop(&mut self) {
if self.armed {
self.queue
.finish(SourceTerminal::Error(StreamError::AbruptTermination));
}
}
}
struct ReaderSourceStream {
queue: Arc<SourceQueue>,
completion: Option<StreamCompletion<NotUsed>>,
}
impl Iterator for ReaderSourceStream {
type Item = StreamResult<Vec<u8>>;
fn next(&mut self) -> Option<Self::Item> {
let mut state = self.queue.state.lock().expect("io source queue poisoned");
loop {
if let Some(chunk) = state.queue.pop_front() {
self.queue.space.notify_all();
return Some(Ok(chunk));
}
if let Some(terminal) = state.terminal.clone() {
return match terminal {
SourceTerminal::Complete => None,
SourceTerminal::Error(error) => Some(Err(error)),
};
}
state = self
.queue
.available
.wait(state)
.expect("io source queue poisoned while waiting");
}
}
}
impl Drop for ReaderSourceStream {
fn drop(&mut self) {
self.queue.cancelled.store(true, Ordering::SeqCst);
drop(self.queue.state.lock().unwrap_or_else(|p| p.into_inner()));
self.queue.available.notify_all();
self.queue.space.notify_all();
let _ = self.completion.take();
}
}
struct WriterGuard<W: Write> {
writer: W,
flushed: bool,
}
impl<W: Write> WriterGuard<W> {
fn new(writer: W) -> Self {
Self {
writer,
flushed: false,
}
}
fn writer_mut(&mut self) -> &mut W {
&mut self.writer
}
fn flush_once(&mut self) -> StreamResult<()> {
if self.flushed {
return Ok(());
}
self.writer.flush().map_err(io_error)?;
self.flushed = true;
Ok(())
}
}
impl<W: Write> Drop for WriterGuard<W> {
fn drop(&mut self) {
let _ = self.flush_once();
}
}
pub struct StreamConverters;
impl StreamConverters {
#[must_use]
pub fn from_reader<F, R>(factory: F, chunk_size: usize) -> Source<Vec<u8>>
where
F: Fn() -> std::io::Result<R> + Send + Sync + 'static,
R: Read + Send + 'static,
{
assert!(chunk_size > 0, "chunk size must be greater than zero");
Source::from_materialized_factory(move |materializer| {
let reader = factory().map_err(io_error)?;
let queue = SourceQueue::new();
let queue_for_worker = Arc::clone(&queue);
let cancelled = Arc::clone(&queue.cancelled);
let completion = materializer.spawn_stream(move |_worker_cancelled| {
let mut reader = reader;
let mut guard = ReaderWorkerGuard::new(Arc::clone(&queue_for_worker));
let mut buffer = vec![0_u8; chunk_size];
loop {
if cancelled.load(Ordering::SeqCst) {
guard.disarm();
return Ok(NotUsed);
}
match reader.read(&mut buffer) {
Ok(0) => {
guard.disarm();
queue_for_worker.finish(SourceTerminal::Complete);
return Ok(NotUsed);
}
Ok(read) => {
if !queue_for_worker.push(buffer[..read].to_vec()) {
guard.disarm();
return Ok(NotUsed);
}
}
Err(error) => {
guard.disarm();
queue_for_worker.finish(SourceTerminal::Error(io_error(error)));
return Ok(NotUsed);
}
}
}
});
Ok((
Box::new(ReaderSourceStream {
queue,
completion: Some(completion),
}) as BoxStream<Vec<u8>>,
NotUsed,
))
})
}
#[must_use]
pub fn to_writer<F, W>(factory: F) -> Sink<Vec<u8>, StreamCompletion<NotUsed>>
where
F: Fn() -> std::io::Result<W> + Send + Sync + 'static,
W: Write + Send + 'static,
{
Sink::from_runner(move |input: BoxStream<Vec<u8>>, materializer| {
let writer = WriterGuard::new(factory().map_err(io_error)?);
Ok(materializer.spawn_stream(move |cancelled| {
let mut input = input;
let mut writer = writer;
loop {
if cancelled.load(Ordering::SeqCst) {
let _ = writer.flush_once();
return Err(StreamError::Cancelled);
}
match input.next() {
Some(Ok(chunk)) => {
writer.writer_mut().write_all(&chunk).map_err(io_error)?
}
Some(Err(error)) => {
let _ = writer.flush_once();
return Err(error);
}
None => {
writer.flush_once()?;
return Ok(NotUsed);
}
}
}
}))
})
}
}
/// Namespace type grouping file-backed sources and sinks.
pub struct FileIO;

impl FileIO {
    /// Builds a source that opens `path` with `File::open` and emits its bytes in
    /// chunks of at most `chunk_size` bytes via `StreamConverters::from_reader`.
    #[must_use]
    pub fn from_path(path: impl Into<PathBuf>, chunk_size: usize) -> Source<Vec<u8>> {
        let target: PathBuf = path.into();
        let open_for_read = move || File::open(&target);
        StreamConverters::from_reader(open_for_read, chunk_size)
    }

    /// Same as `from_path`, using `DEFAULT_CHUNK_SIZE` as the chunk size.
    #[must_use]
    pub fn from_path_default(path: impl Into<PathBuf>) -> Source<Vec<u8>> {
        FileIO::from_path(path, DEFAULT_CHUNK_SIZE)
    }

    /// Builds a sink that writes incoming chunks to `path` via
    /// `StreamConverters::to_writer`, creating the file if needed and truncating it.
    #[must_use]
    pub fn to_path(path: impl Into<PathBuf>) -> Sink<Vec<u8>, StreamCompletion<NotUsed>> {
        let target: PathBuf = path.into();
        let open_for_write = move || {
            let mut options = OpenOptions::new();
            options.write(true).create(true).truncate(true);
            options.open(&target)
        };
        StreamConverters::to_writer(open_for_write)
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::Source;
use crate::testkit::TestSink;
use std::io::Cursor;
use std::sync::atomic::{AtomicU64, AtomicUsize};
use std::thread;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
/// Polls `counter` every 5 ms for up to one second and asserts that it equals `expected`.
fn wait_for_counter(counter: &AtomicUsize, expected: usize) {
    let started = Instant::now();
    let budget = Duration::from_secs(1);
    while started.elapsed() < budget {
        let observed = counter.load(Ordering::SeqCst);
        if observed == expected {
            return;
        }
        thread::sleep(Duration::from_millis(5));
    }
    // Final check after the budget is spent; fails the test on a mismatch.
    assert_eq!(counter.load(Ordering::SeqCst), expected);
}
/// Polls `condition` every 5 ms until it returns `true` or `timeout` elapses.
///
/// Returns `true` as soon as the condition holds; after the timeout the condition is
/// evaluated one last time and that result is returned.
fn wait_until(timeout: Duration, mut condition: impl FnMut() -> bool) -> bool {
    let started = Instant::now();
    loop {
        if started.elapsed() >= timeout {
            return condition();
        }
        if condition() {
            return true;
        }
        thread::sleep(Duration::from_millis(5));
    }
}
/// Builds a path in the system temp directory whose file name combines `name`, the
/// current process id, and the current time in nanoseconds since the Unix epoch.
fn unique_temp_path(name: &str) -> PathBuf {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("clock after epoch");
    let nanos = since_epoch.as_nanos();
    let file_name = format!(
        "datum-wp12-{name}-{}-{nanos}.bin",
        std::process::id()
    );
    let mut path = std::env::temp_dir();
    path.push(file_name);
    path
}
/// In-memory reader that increments a shared counter when it is dropped.
struct CountingReader {
    /// Source of the bytes handed out by `read`.
    inner: Cursor<Vec<u8>>,
    /// Incremented once per drop of this reader.
    drops: Arc<AtomicUsize>,
}

impl Read for CountingReader {
    /// Delegates to the wrapped cursor.
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        Read::read(&mut self.inner, buf)
    }
}

impl Drop for CountingReader {
    fn drop(&mut self) {
        let _previous = self.drops.fetch_add(1, Ordering::SeqCst);
    }
}
/// Writer that records every write, counts flushes and drops, and can be told to fail.
struct CountingWriter {
    /// One entry per successful `write` call, holding the bytes passed in.
    writes: Arc<Mutex<Vec<Vec<u8>>>>,
    /// Incremented on every `flush` call.
    flushes: Arc<AtomicUsize>,
    /// Incremented once per drop of this writer.
    drops: Arc<AtomicUsize>,
    /// When `true`, every `write` call fails with "writer boom".
    fail_write: bool,
}

impl Write for CountingWriter {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        if !self.fail_write {
            let mut log = self.writes.lock().expect("writer log poisoned");
            log.push(buf.to_vec());
            return Ok(buf.len());
        }
        Err(std::io::Error::other("writer boom"))
    }

    fn flush(&mut self) -> std::io::Result<()> {
        let _previous = self.flushes.fetch_add(1, Ordering::SeqCst);
        Ok(())
    }
}

impl Drop for CountingWriter {
    fn drop(&mut self) {
        let _previous = self.drops.fetch_add(1, Ordering::SeqCst);
    }
}
/// In-memory reader that counts `read` calls and never returns more than
/// `chunk_size` bytes per call.
struct CountingChunkReader {
    /// Source of the bytes handed out by `read`.
    inner: Cursor<Vec<u8>>,
    /// Upper bound on the number of bytes returned by a single `read` call.
    chunk_size: usize,
    /// Incremented on every `read` call, including the one that reports end of input.
    reads: Arc<AtomicUsize>,
}

impl Read for CountingChunkReader {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        self.reads.fetch_add(1, Ordering::SeqCst);
        // Read straight into the capped prefix of the caller's buffer; no scratch
        // allocation or extra copy is needed to enforce the per-call limit.
        let limit = self.chunk_size.min(buf.len());
        self.inner.read(&mut buf[..limit])
    }
}
#[test]
fn from_reader_emits_chunked_bytes_and_completes() {
    // Six bytes read two at a time must arrive as three chunks, followed by completion.
    let expected_chunks = [b"ab".to_vec(), b"cd".to_vec(), b"ef".to_vec()];
    let probe = StreamConverters::from_reader(|| Ok(Cursor::new(b"abcdef".to_vec())), 2)
        .run_with(TestSink::probe())
        .expect("reader source materializes");
    probe.request(4);
    probe.assert_next_n(expected_chunks);
    probe.expect_complete();
}
#[test]
fn from_reader_closes_exactly_once_on_completion() {
    // The reader must have been dropped exactly once after the stream completes.
    let drops = Arc::new(AtomicUsize::new(0));
    let probe = StreamConverters::from_reader(
        {
            let drops = Arc::clone(&drops);
            move || {
                Ok(CountingReader {
                    inner: Cursor::new(b"hello".to_vec()),
                    drops: Arc::clone(&drops),
                })
            }
        },
        8,
    )
    .run_with(TestSink::probe())
    .expect("reader source materializes");
    probe.request(2);
    probe.assert_next(b"hello".to_vec());
    probe.expect_complete();
    wait_for_counter(&drops, 1);
}
#[test]
fn from_reader_closes_exactly_once_on_cancellation() {
    // Cancelling after the first chunk must still drop the reader exactly once.
    let drops = Arc::new(AtomicUsize::new(0));
    let mut probe = StreamConverters::from_reader(
        {
            let drops = Arc::clone(&drops);
            move || {
                Ok(CountingReader {
                    inner: Cursor::new(vec![1_u8; 32]),
                    drops: Arc::clone(&drops),
                })
            }
        },
        4,
    )
    .run_with(TestSink::probe())
    .expect("reader source materializes");
    probe.request(1);
    probe.assert_next(vec![1_u8; 4]);
    probe.cancel();
    wait_for_counter(&drops, 1);
}
#[test]
fn from_reader_surfaces_read_failure() {
    /// Reader whose every `read` call fails with "reader boom".
    struct FailingReader;

    impl Read for FailingReader {
        fn read(&mut self, _buf: &mut [u8]) -> std::io::Result<usize> {
            Err(std::io::Error::other("reader boom"))
        }
    }

    let probe = StreamConverters::from_reader(|| Ok(FailingReader), 8)
        .run_with(TestSink::probe())
        .expect("reader source materializes");
    probe.request(1);
    let observed = probe.expect_error();
    assert_eq!(observed, StreamError::Failed("reader boom".to_owned()));
}
#[test]
fn from_reader_caps_buffered_read_ahead_for_slow_consumers() {
    let payload: Vec<u8> = (0..64_u8).cycle().take(8192 * 16).collect();
    let reads = Arc::new(AtomicUsize::new(0));
    let probe = StreamConverters::from_reader(
        {
            let payload = payload.clone();
            let reads = Arc::clone(&reads);
            move || {
                Ok(CountingChunkReader {
                    inner: Cursor::new(payload.clone()),
                    chunk_size: 256,
                    reads: Arc::clone(&reads),
                })
            }
        },
        256,
    )
    .run_with(TestSink::probe())
    .expect("reader source materializes");

    // Take a single chunk, then stop consuming.
    probe.request(1);
    let first = probe.expect_next();
    assert_eq!(first.len(), 256);

    // Wait until the read counter has been non-zero and unchanged for at least 100 ms.
    let started = Instant::now();
    let last_seen = AtomicUsize::new(0);
    let quiet_since_ms = AtomicU64::new(0);
    let elapsed_ms = || started.elapsed().as_millis() as u64;
    let plateaued = wait_until(Duration::from_secs(2), || {
        let current = reads.load(Ordering::SeqCst);
        if current != last_seen.load(Ordering::SeqCst) {
            // The counter moved: restart the quiet period from now.
            last_seen.store(current, Ordering::SeqCst);
            quiet_since_ms.store(elapsed_ms(), Ordering::SeqCst);
            return false;
        }
        let quiet_for = elapsed_ms() - quiet_since_ms.load(Ordering::SeqCst);
        current > 0 && quiet_for >= 100
    });
    assert!(plateaued);
    assert!(
        reads.load(Ordering::SeqCst) <= READER_QUEUE_CAPACITY + 2,
        "reader should plateau near the bounded queue capacity"
    );

    // Resume consumption and check that the whole payload arrives intact.
    probe.request(usize::MAX);
    let mut collected = first;
    for chunk in probe.drain_until_complete() {
        collected.extend_from_slice(&chunk);
    }
    assert_eq!(collected, payload);
}
#[test]
fn to_writer_writes_all_chunks_and_flushes_once() {
    let writes = Arc::new(Mutex::new(Vec::new()));
    let flushes = Arc::new(AtomicUsize::new(0));
    let drops = Arc::new(AtomicUsize::new(0));
    let writer_sink = StreamConverters::to_writer({
        let writes = Arc::clone(&writes);
        let flushes = Arc::clone(&flushes);
        let drops = Arc::clone(&drops);
        move || {
            Ok(CountingWriter {
                writes: Arc::clone(&writes),
                flushes: Arc::clone(&flushes),
                drops: Arc::clone(&drops),
                fail_write: false,
            })
        }
    });
    let completion = Source::from_iter([b"ab".to_vec(), b"cd".to_vec()])
        .run_with(writer_sink)
        .expect("writer sink materializes");
    completion.wait().expect("writer sink completes");

    // Both chunks were written in order, with one flush and one drop of the writer.
    assert_eq!(
        writes.lock().expect("writes poisoned").as_slice(),
        &[b"ab".to_vec(), b"cd".to_vec()]
    );
    let flush_count = flushes.load(Ordering::SeqCst);
    let drop_count = drops.load(Ordering::SeqCst);
    assert_eq!(flush_count, 1);
    assert_eq!(drop_count, 1);
}
#[test]
fn to_writer_flushes_and_drops_once_on_failure() {
    let flushes = Arc::new(AtomicUsize::new(0));
    let drops = Arc::new(AtomicUsize::new(0));
    let writer_sink = StreamConverters::to_writer({
        let flushes = Arc::clone(&flushes);
        let drops = Arc::clone(&drops);
        move || {
            Ok(CountingWriter {
                writes: Arc::new(Mutex::new(Vec::new())),
                flushes: Arc::clone(&flushes),
                drops: Arc::clone(&drops),
                fail_write: false,
            })
        }
    });
    let completion = Source::<Vec<u8>>::failed(StreamError::Failed("upstream boom".to_owned()))
        .run_with(writer_sink)
        .expect("writer sink materializes");

    // The upstream failure is reported, and the writer is flushed and dropped once.
    let outcome = completion.wait();
    assert_eq!(
        outcome,
        Err(StreamError::Failed("upstream boom".to_owned()))
    );
    let flush_count = flushes.load(Ordering::SeqCst);
    let drop_count = drops.load(Ordering::SeqCst);
    assert_eq!(flush_count, 1);
    assert_eq!(drop_count, 1);
}
#[test]
fn to_writer_flushes_and_drops_once_on_cancellation() {
    let flushes = Arc::new(AtomicUsize::new(0));
    let drops = Arc::new(AtomicUsize::new(0));
    let writer_sink = StreamConverters::to_writer({
        let flushes = Arc::clone(&flushes);
        let drops = Arc::clone(&drops);
        move || {
            Ok(CountingWriter {
                writes: Arc::new(Mutex::new(Vec::new())),
                flushes: Arc::clone(&flushes),
                drops: Arc::clone(&drops),
                fail_write: false,
            })
        }
    });
    let completion = Source::repeat(vec![7_u8; 4])
        .run_with(writer_sink)
        .expect("writer sink materializes");

    // Dropping the completion handle of the endless stream is the cancellation under test.
    drop(completion);
    wait_for_counter(&flushes, 1);
    wait_for_counter(&drops, 1);
}
#[test]
fn to_writer_surfaces_write_failure() {
    // A writer whose `write` always fails must fail the sink's completion.
    let failing_sink = StreamConverters::to_writer(|| {
        Ok(CountingWriter {
            writes: Arc::new(Mutex::new(Vec::new())),
            flushes: Arc::new(AtomicUsize::new(0)),
            drops: Arc::new(AtomicUsize::new(0)),
            fail_write: true,
        })
    });
    let completion = Source::single(vec![1_u8])
        .run_with(failing_sink)
        .expect("writer sink materializes");
    let outcome = completion.wait();
    assert_eq!(outcome, Err(StreamError::Failed("writer boom".to_owned())));
}
#[test]
fn file_io_round_trips_bytes() {
    let path = unique_temp_path("roundtrip");

    // Write two chunks through the file sink.
    let file_sink = FileIO::to_path(path.clone());
    let write_completion = Source::from_iter([b"ab".to_vec(), b"cd".to_vec()])
        .run_with(file_sink)
        .expect("file sink materializes");
    write_completion.wait().expect("file write completes");

    // Read them back two bytes at a time through the file source.
    let probe = FileIO::from_path(path.clone(), 2)
        .run_with(TestSink::probe())
        .expect("file source materializes");
    probe.request(4);
    probe.assert_next_n([b"ab".to_vec(), b"cd".to_vec()]);
    probe.expect_complete();

    std::fs::remove_file(path).expect("remove roundtrip file");
}
#[test]
fn file_io_source_surfaces_open_failure() {
    // The path is freshly generated and never created, so opening it must fail.
    let absent = unique_temp_path("missing");
    let outcome = FileIO::from_path(absent, 4).run_with(TestSink::probe());
    assert!(matches!(outcome, Err(StreamError::Failed(_))));
}
#[test]
fn file_io_sink_creates_and_truncates_file() {
    // Seed the file with longer content so that truncation is observable.
    let path = unique_temp_path("truncate");
    std::fs::write(&path, b"stale bytes").expect("seed file");

    let file_sink = FileIO::to_path(path.clone());
    let completion = Source::single(b"ok".to_vec())
        .run_with(file_sink)
        .expect("file sink materializes");
    completion.wait().expect("file write completes");

    let contents = std::fs::read(&path).expect("read file");
    assert_eq!(contents, b"ok");
    std::fs::remove_file(path).expect("remove truncate file");
}
#[test]
fn file_io_source_default_chunk_size_works() {
    // A two-byte file fits in one default-sized chunk.
    let path = unique_temp_path("default");
    std::fs::write(&path, b"hi").expect("write seed file");

    let probe = FileIO::from_path_default(path.clone())
        .run_with(TestSink::probe())
        .expect("file source materializes");
    probe.request(2);
    probe.assert_next(b"hi".to_vec());
    probe.expect_complete();

    std::fs::remove_file(path).expect("remove default file");
}
}