Skip to main content

datum/io/
adapters.rs

1use crate::stream::{BoxStream, NotUsed, Sink, Source, StreamCompletion};
2use crate::{StreamError, StreamResult};
3use std::collections::VecDeque;
4use std::fs::{File, OpenOptions};
5use std::io::{Read, Write};
6use std::path::PathBuf;
7use std::sync::{
8    Arc, Condvar, Mutex,
9    atomic::{AtomicBool, Ordering},
10};
11
/// Chunk size in bytes used by `FileIO::from_path_default` (8 KiB).
const DEFAULT_CHUNK_SIZE: usize = 8 * 1024;
/// Maximum number of read-but-unconsumed chunks a reader worker may queue
/// before `SourceQueue::push` blocks.
const READER_QUEUE_CAPACITY: usize = 8;
14
15fn io_error(error: std::io::Error) -> StreamError {
16    StreamError::Failed(error.to_string())
17}
18
/// Final state of a reader-backed source. Only the first terminal state passed
/// to `SourceQueue::finish` is kept.
// `Clone` lets the consumer report the terminal state without removing it.
#[derive(Clone)]
enum SourceTerminal {
    /// The reader returned a zero-length read (end of input).
    Complete,
    /// The source failed; this error is handed to the consumer.
    Error(StreamError),
}
24
/// Mutable state guarded by `SourceQueue::state`.
struct SourceQueueState {
    /// Chunks that have been read but not yet handed to the consumer, oldest first.
    queue: VecDeque<Vec<u8>>,
    /// `None` while the worker is still producing; set once by `SourceQueue::finish`.
    terminal: Option<SourceTerminal>,
}
29
/// Bounded hand-off between the reader worker (producer, via `push`/`finish`)
/// and `ReaderSourceStream` (consumer).
struct SourceQueue {
    /// Queued chunks plus the terminal state.
    state: Mutex<SourceQueueState>,
    /// Notified when a chunk is pushed or a terminal state is recorded.
    available: Condvar,
    /// Notified when the consumer pops a chunk, on `finish`, and on cancellation.
    space: Condvar,
    /// Number of queued chunks at which `push` blocks.
    capacity: usize,
    /// Set when the consumer is dropped. Kept in its own `Arc` so it can be
    /// shared without going through the state lock.
    cancelled: Arc<AtomicBool>,
}
37
38impl SourceQueue {
39    fn new() -> Arc<Self> {
40        Arc::new(Self {
41            state: Mutex::new(SourceQueueState {
42                queue: VecDeque::new(),
43                terminal: None,
44            }),
45            available: Condvar::new(),
46            space: Condvar::new(),
47            capacity: READER_QUEUE_CAPACITY,
48            cancelled: Arc::new(AtomicBool::new(false)),
49        })
50    }
51
52    fn push(&self, chunk: Vec<u8>) -> bool {
53        let mut state = self.state.lock().expect("io source queue poisoned");
54        while state.queue.len() >= self.capacity
55            && state.terminal.is_none()
56            && !self.cancelled.load(Ordering::SeqCst)
57        {
58            state = self
59                .space
60                .wait(state)
61                .expect("io source queue poisoned while waiting for space");
62        }
63
64        if state.terminal.is_some() || self.cancelled.load(Ordering::SeqCst) {
65            return false;
66        }
67
68        if state.terminal.is_none() {
69            state.queue.push_back(chunk);
70        }
71        drop(state);
72        self.available.notify_all();
73        true
74    }
75
76    fn finish(&self, terminal: SourceTerminal) {
77        let mut state = self.state.lock().expect("io source queue poisoned");
78        if state.terminal.is_none() {
79            state.terminal = Some(terminal);
80        }
81        drop(state);
82        self.available.notify_all();
83        self.space.notify_all();
84    }
85}
86
87struct ReaderWorkerGuard {
88    queue: Arc<SourceQueue>,
89    armed: bool,
90}
91
92impl ReaderWorkerGuard {
93    fn new(queue: Arc<SourceQueue>) -> Self {
94        Self { queue, armed: true }
95    }
96
97    fn disarm(&mut self) {
98        self.armed = false;
99    }
100}
101
102impl Drop for ReaderWorkerGuard {
103    fn drop(&mut self) {
104        if self.armed {
105            self.queue
106                .finish(SourceTerminal::Error(StreamError::AbruptTermination));
107        }
108    }
109}
110
/// Consumer end of a reader-backed source. It yields queued chunks in order,
/// then the terminal state recorded by the worker.
struct ReaderSourceStream {
    /// Queue shared with the reader worker.
    queue: Arc<SourceQueue>,
    /// Completion handle of the spawned reader worker. It is an `Option` so
    /// `Drop` can take it after cancellation has been signalled.
    completion: Option<StreamCompletion<NotUsed>>,
}
115
116impl Iterator for ReaderSourceStream {
117    type Item = StreamResult<Vec<u8>>;
118
119    fn next(&mut self) -> Option<Self::Item> {
120        let mut state = self.queue.state.lock().expect("io source queue poisoned");
121        loop {
122            if let Some(chunk) = state.queue.pop_front() {
123                self.queue.space.notify_all();
124                return Some(Ok(chunk));
125            }
126            if let Some(terminal) = state.terminal.clone() {
127                return match terminal {
128                    SourceTerminal::Complete => None,
129                    SourceTerminal::Error(error) => Some(Err(error)),
130                };
131            }
132            state = self
133                .queue
134                .available
135                .wait(state)
136                .expect("io source queue poisoned while waiting");
137        }
138    }
139}
140
// Dropping the consumer cancels the reader worker: the worker loop and
// `SourceQueue::push` both stop once `cancelled` is observed.
impl Drop for ReaderSourceStream {
    fn drop(&mut self) {
        // Must be stored before the lock/unlock below for the hand-shake to work.
        self.queue.cancelled.store(true, Ordering::SeqCst);
        // Take and release the state lock before notifying: a producer that
        // already read `cancelled == false` under the lock is then guaranteed
        // to be parked inside `space.wait` before the notification fires.
        // Without this the wake-up can land in that gap and be lost, parking
        // the reader worker forever (no later notifier exists once the
        // consumer is gone). `unwrap_or_else` instead of `expect`: panicking
        // in Drop during unwind would abort.
        drop(self.queue.state.lock().unwrap_or_else(|p| p.into_inner()));
        self.queue.available.notify_all();
        self.queue.space.notify_all();
        // Release the worker's completion handle only after the worker has
        // been told to stop. NOTE(review): dropping a `StreamCompletion`
        // appears to cancel its worker (see the to_writer cancellation test);
        // confirm against `StreamCompletion`'s Drop.
        let _ = self.completion.take();
    }
}
157
158struct WriterGuard<W: Write> {
159    writer: W,
160    flushed: bool,
161}
162
163impl<W: Write> WriterGuard<W> {
164    fn new(writer: W) -> Self {
165        Self {
166            writer,
167            flushed: false,
168        }
169    }
170
171    fn writer_mut(&mut self) -> &mut W {
172        &mut self.writer
173    }
174
175    fn flush_once(&mut self) -> StreamResult<()> {
176        if self.flushed {
177            return Ok(());
178        }
179        self.writer.flush().map_err(io_error)?;
180        self.flushed = true;
181        Ok(())
182    }
183}
184
185impl<W: Write> Drop for WriterGuard<W> {
186    fn drop(&mut self) {
187        let _ = self.flush_once();
188    }
189}
190
191pub struct StreamConverters;
192
193impl StreamConverters {
194    #[must_use]
195    pub fn from_reader<F, R>(factory: F, chunk_size: usize) -> Source<Vec<u8>>
196    where
197        F: Fn() -> std::io::Result<R> + Send + Sync + 'static,
198        R: Read + Send + 'static,
199    {
200        assert!(chunk_size > 0, "chunk size must be greater than zero");
201        Source::from_materialized_factory(move |materializer| {
202            let reader = factory().map_err(io_error)?;
203            let queue = SourceQueue::new();
204            let queue_for_worker = Arc::clone(&queue);
205            let cancelled = Arc::clone(&queue.cancelled);
206            let completion = materializer.spawn_stream(move |_worker_cancelled| {
207                let mut reader = reader;
208                let mut guard = ReaderWorkerGuard::new(Arc::clone(&queue_for_worker));
209                let mut buffer = vec![0_u8; chunk_size];
210
211                loop {
212                    if cancelled.load(Ordering::SeqCst) {
213                        guard.disarm();
214                        return Ok(NotUsed);
215                    }
216
217                    match reader.read(&mut buffer) {
218                        Ok(0) => {
219                            guard.disarm();
220                            queue_for_worker.finish(SourceTerminal::Complete);
221                            return Ok(NotUsed);
222                        }
223                        Ok(read) => {
224                            if !queue_for_worker.push(buffer[..read].to_vec()) {
225                                guard.disarm();
226                                return Ok(NotUsed);
227                            }
228                        }
229                        Err(error) => {
230                            guard.disarm();
231                            queue_for_worker.finish(SourceTerminal::Error(io_error(error)));
232                            return Ok(NotUsed);
233                        }
234                    }
235                }
236            });
237
238            Ok((
239                Box::new(ReaderSourceStream {
240                    queue,
241                    completion: Some(completion),
242                }) as BoxStream<Vec<u8>>,
243                NotUsed,
244            ))
245        })
246    }
247
248    #[must_use]
249    pub fn to_writer<F, W>(factory: F) -> Sink<Vec<u8>, StreamCompletion<NotUsed>>
250    where
251        F: Fn() -> std::io::Result<W> + Send + Sync + 'static,
252        W: Write + Send + 'static,
253    {
254        Sink::from_runner(move |input: BoxStream<Vec<u8>>, materializer| {
255            let writer = WriterGuard::new(factory().map_err(io_error)?);
256            Ok(materializer.spawn_stream(move |cancelled| {
257                let mut input = input;
258                let mut writer = writer;
259                loop {
260                    if cancelled.load(Ordering::SeqCst) {
261                        let _ = writer.flush_once();
262                        return Err(StreamError::Cancelled);
263                    }
264
265                    match input.next() {
266                        Some(Ok(chunk)) => {
267                            writer.writer_mut().write_all(&chunk).map_err(io_error)?
268                        }
269                        Some(Err(error)) => {
270                            let _ = writer.flush_once();
271                            return Err(error);
272                        }
273                        None => {
274                            writer.flush_once()?;
275                            return Ok(NotUsed);
276                        }
277                    }
278                }
279            }))
280        })
281    }
282}
283
284pub struct FileIO;
285
286impl FileIO {
287    #[must_use]
288    pub fn from_path(path: impl Into<PathBuf>, chunk_size: usize) -> Source<Vec<u8>> {
289        let path = path.into();
290        StreamConverters::from_reader(move || File::open(&path), chunk_size)
291    }
292
293    #[must_use]
294    pub fn from_path_default(path: impl Into<PathBuf>) -> Source<Vec<u8>> {
295        Self::from_path(path, DEFAULT_CHUNK_SIZE)
296    }
297
298    #[must_use]
299    pub fn to_path(path: impl Into<PathBuf>) -> Sink<Vec<u8>, StreamCompletion<NotUsed>> {
300        let path = path.into();
301        StreamConverters::to_writer(move || {
302            OpenOptions::new()
303                .create(true)
304                .truncate(true)
305                .write(true)
306                .open(&path)
307        })
308    }
309}
310
// Tests for the reader/writer adapters: chunking, completion, cancellation,
// bounded read-ahead, flush/drop accounting, and file round trips.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Source;
    use crate::testkit::TestSink;
    use std::io::Cursor;
    use std::sync::atomic::{AtomicU64, AtomicUsize};
    use std::thread;
    use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

    // Polls `counter` every 5 ms for up to one second, then asserts it equals `expected`.
    fn wait_for_counter(counter: &AtomicUsize, expected: usize) {
        let deadline = std::time::Instant::now() + Duration::from_secs(1);
        while std::time::Instant::now() < deadline {
            if counter.load(Ordering::SeqCst) == expected {
                return;
            }
            thread::sleep(Duration::from_millis(5));
        }
        assert_eq!(counter.load(Ordering::SeqCst), expected);
    }

    // Polls `condition` every 5 ms until it holds or `timeout` elapses;
    // returns the result of one final evaluation after the deadline.
    fn wait_until(timeout: Duration, mut condition: impl FnMut() -> bool) -> bool {
        let deadline = Instant::now() + timeout;
        while Instant::now() < deadline {
            if condition() {
                return true;
            }
            thread::sleep(Duration::from_millis(5));
        }
        condition()
    }

    // Temp-dir path made unique by the process id and the current time in nanoseconds.
    fn unique_temp_path(name: &str) -> PathBuf {
        let nanos = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("clock after epoch")
            .as_nanos();
        std::env::temp_dir().join(format!(
            "datum-wp12-{name}-{}-{nanos}.bin",
            std::process::id()
        ))
    }

    // In-memory reader that counts how many times it is dropped.
    struct CountingReader {
        inner: Cursor<Vec<u8>>,
        drops: Arc<AtomicUsize>,
    }

    impl Read for CountingReader {
        fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
            self.inner.read(buf)
        }
    }

    impl Drop for CountingReader {
        fn drop(&mut self) {
            self.drops.fetch_add(1, Ordering::SeqCst);
        }
    }

    // Writer that logs each write, counts flushes and drops, and fails every
    // write when `fail_write` is set.
    struct CountingWriter {
        writes: Arc<Mutex<Vec<Vec<u8>>>>,
        flushes: Arc<AtomicUsize>,
        drops: Arc<AtomicUsize>,
        fail_write: bool,
    }

    impl Write for CountingWriter {
        fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
            if self.fail_write {
                return Err(std::io::Error::other("writer boom"));
            }
            self.writes
                .lock()
                .expect("writer log poisoned")
                .push(buf.to_vec());
            Ok(buf.len())
        }

        fn flush(&mut self) -> std::io::Result<()> {
            self.flushes.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }
    }

    impl Drop for CountingWriter {
        fn drop(&mut self) {
            self.drops.fetch_add(1, Ordering::SeqCst);
        }
    }

    // Reader that returns at most `chunk_size` bytes per call and counts read calls.
    struct CountingChunkReader {
        inner: Cursor<Vec<u8>>,
        chunk_size: usize,
        reads: Arc<AtomicUsize>,
    }

    impl Read for CountingChunkReader {
        fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
            self.reads.fetch_add(1, Ordering::SeqCst);
            let mut chunk = vec![0_u8; self.chunk_size.min(buf.len())];
            let read = self.inner.read(&mut chunk)?;
            buf[..read].copy_from_slice(&chunk[..read]);
            Ok(read)
        }
    }

    // Six bytes read with chunk size 2 arrive as three chunks, then completion.
    #[test]
    fn from_reader_emits_chunked_bytes_and_completes() {
        let sink = StreamConverters::from_reader(|| Ok(Cursor::new(b"abcdef".to_vec())), 2)
            .run_with(TestSink::probe())
            .expect("reader source materializes");

        sink.request(4);
        sink.assert_next_n([b"ab".to_vec(), b"cd".to_vec(), b"ef".to_vec()]);
        sink.expect_complete();
    }

    // The reader is dropped exactly once after the source completes.
    #[test]
    fn from_reader_closes_exactly_once_on_completion() {
        let drops = Arc::new(AtomicUsize::new(0));
        let drops_for_reader = Arc::clone(&drops);
        let sink = StreamConverters::from_reader(
            move || {
                Ok(CountingReader {
                    inner: Cursor::new(b"hello".to_vec()),
                    drops: Arc::clone(&drops_for_reader),
                })
            },
            8,
        )
        .run_with(TestSink::probe())
        .expect("reader source materializes");

        sink.request(2);
        sink.assert_next(b"hello".to_vec());
        sink.expect_complete();
        wait_for_counter(&drops, 1);
    }

    // The reader is dropped exactly once after the downstream cancels mid-stream.
    #[test]
    fn from_reader_closes_exactly_once_on_cancellation() {
        let drops = Arc::new(AtomicUsize::new(0));
        let drops_for_reader = Arc::clone(&drops);
        let mut sink = StreamConverters::from_reader(
            move || {
                Ok(CountingReader {
                    inner: Cursor::new(vec![1_u8; 32]),
                    drops: Arc::clone(&drops_for_reader),
                })
            },
            4,
        )
        .run_with(TestSink::probe())
        .expect("reader source materializes");

        sink.request(1);
        sink.assert_next(vec![1_u8; 4]);
        sink.cancel();
        wait_for_counter(&drops, 1);
    }

    // A read error is surfaced downstream as `StreamError::Failed` with the error text.
    #[test]
    fn from_reader_surfaces_read_failure() {
        struct FailingReader;

        impl Read for FailingReader {
            fn read(&mut self, _buf: &mut [u8]) -> std::io::Result<usize> {
                Err(std::io::Error::other("reader boom"))
            }
        }

        let sink = StreamConverters::from_reader(|| Ok(FailingReader), 8)
            .run_with(TestSink::probe())
            .expect("reader source materializes");

        sink.request(1);
        assert_eq!(
            sink.expect_error(),
            StreamError::Failed("reader boom".to_owned())
        );
    }

    // With only one chunk requested, the number of read calls should level off
    // near the queue capacity instead of consuming the whole payload. All
    // bytes must still arrive once demand is raised.
    #[test]
    fn from_reader_caps_buffered_read_ahead_for_slow_consumers() {
        let payload: Vec<u8> = (0..64_u8).cycle().take(8192 * 16).collect();
        let payload_for_reader = payload.clone();
        let reads = Arc::new(AtomicUsize::new(0));
        let reads_for_reader = Arc::clone(&reads);
        let sink = StreamConverters::from_reader(
            move || {
                Ok(CountingChunkReader {
                    inner: Cursor::new(payload_for_reader.clone()),
                    chunk_size: 256,
                    reads: Arc::clone(&reads_for_reader),
                })
            },
            256,
        )
        .run_with(TestSink::probe())
        .expect("reader source materializes");

        sink.request(1);
        let first = sink.expect_next();
        assert_eq!(first.len(), 256);

        // Wait (up to 2 s) until the read counter is non-zero and has not
        // changed for at least 100 ms, presumably because the worker is
        // blocked on the full queue.
        let last_seen = Arc::new(AtomicUsize::new(0));
        let quiet_since_ms = Arc::new(AtomicU64::new(0));
        let start = Instant::now();
        assert!(wait_until(Duration::from_secs(2), {
            let last_seen = Arc::clone(&last_seen);
            let quiet_since_ms = Arc::clone(&quiet_since_ms);
            let reads = Arc::clone(&reads);
            move || {
                let current = reads.load(Ordering::SeqCst);
                let last = last_seen.load(Ordering::SeqCst);
                if current != last {
                    last_seen.store(current, Ordering::SeqCst);
                    quiet_since_ms.store(start.elapsed().as_millis() as u64, Ordering::SeqCst);
                    return false;
                }

                let quiet_for =
                    start.elapsed().as_millis() as u64 - quiet_since_ms.load(Ordering::SeqCst);
                current > 0 && quiet_for >= 100
            }
        }));

        assert!(
            reads.load(Ordering::SeqCst) <= READER_QUEUE_CAPACITY + 2,
            "reader should plateau near the bounded queue capacity"
        );

        sink.request(usize::MAX);
        let mut collected = first;
        for chunk in sink.drain_until_complete() {
            collected.extend_from_slice(&chunk);
        }
        assert_eq!(collected, payload);
    }

    // Every chunk is written in order; the writer is flushed once and dropped once.
    #[test]
    fn to_writer_writes_all_chunks_and_flushes_once() {
        let writes = Arc::new(Mutex::new(Vec::new()));
        let flushes = Arc::new(AtomicUsize::new(0));
        let drops = Arc::new(AtomicUsize::new(0));
        let completion = Source::from_iter([b"ab".to_vec(), b"cd".to_vec()])
            .run_with(StreamConverters::to_writer({
                let writes = Arc::clone(&writes);
                let flushes = Arc::clone(&flushes);
                let drops = Arc::clone(&drops);
                move || {
                    Ok(CountingWriter {
                        writes: Arc::clone(&writes),
                        flushes: Arc::clone(&flushes),
                        drops: Arc::clone(&drops),
                        fail_write: false,
                    })
                }
            }))
            .expect("writer sink materializes");

        completion.wait().expect("writer sink completes");
        assert_eq!(
            writes.lock().expect("writes poisoned").as_slice(),
            &[b"ab".to_vec(), b"cd".to_vec()]
        );
        assert_eq!(flushes.load(Ordering::SeqCst), 1);
        assert_eq!(drops.load(Ordering::SeqCst), 1);
    }

    // Upstream failure is propagated; the writer is still flushed once and dropped once.
    #[test]
    fn to_writer_flushes_and_drops_once_on_failure() {
        let flushes = Arc::new(AtomicUsize::new(0));
        let drops = Arc::new(AtomicUsize::new(0));
        let completion = Source::<Vec<u8>>::failed(StreamError::Failed("upstream boom".to_owned()))
            .run_with(StreamConverters::to_writer({
                let flushes = Arc::clone(&flushes);
                let drops = Arc::clone(&drops);
                move || {
                    Ok(CountingWriter {
                        writes: Arc::new(Mutex::new(Vec::new())),
                        flushes: Arc::clone(&flushes),
                        drops: Arc::clone(&drops),
                        fail_write: false,
                    })
                }
            }))
            .expect("writer sink materializes");

        assert_eq!(
            completion.wait(),
            Err(StreamError::Failed("upstream boom".to_owned()))
        );
        assert_eq!(flushes.load(Ordering::SeqCst), 1);
        assert_eq!(drops.load(Ordering::SeqCst), 1);
    }

    // Dropping the completion of an endless stream cancels the sink; the
    // writer is flushed once and dropped once.
    #[test]
    fn to_writer_flushes_and_drops_once_on_cancellation() {
        let flushes = Arc::new(AtomicUsize::new(0));
        let drops = Arc::new(AtomicUsize::new(0));
        let completion = Source::repeat(vec![7_u8; 4])
            .run_with(StreamConverters::to_writer({
                let flushes = Arc::clone(&flushes);
                let drops = Arc::clone(&drops);
                move || {
                    Ok(CountingWriter {
                        writes: Arc::new(Mutex::new(Vec::new())),
                        flushes: Arc::clone(&flushes),
                        drops: Arc::clone(&drops),
                        fail_write: false,
                    })
                }
            }))
            .expect("writer sink materializes");

        drop(completion);
        wait_for_counter(&flushes, 1);
        wait_for_counter(&drops, 1);
    }

    // A write error fails the completion with `StreamError::Failed` and the error text.
    #[test]
    fn to_writer_surfaces_write_failure() {
        let completion = Source::single(vec![1_u8])
            .run_with(StreamConverters::to_writer(|| {
                Ok(CountingWriter {
                    writes: Arc::new(Mutex::new(Vec::new())),
                    flushes: Arc::new(AtomicUsize::new(0)),
                    drops: Arc::new(AtomicUsize::new(0)),
                    fail_write: true,
                })
            }))
            .expect("writer sink materializes");

        assert_eq!(
            completion.wait(),
            Err(StreamError::Failed("writer boom".to_owned()))
        );
    }

    // Bytes written with `FileIO::to_path` read back unchanged with `FileIO::from_path`.
    #[test]
    fn file_io_round_trips_bytes() {
        let path = unique_temp_path("roundtrip");
        let write_completion = Source::from_iter([b"ab".to_vec(), b"cd".to_vec()])
            .run_with(FileIO::to_path(path.clone()))
            .expect("file sink materializes");
        write_completion.wait().expect("file write completes");

        let sink = FileIO::from_path(path.clone(), 2)
            .run_with(TestSink::probe())
            .expect("file source materializes");
        sink.request(4);
        sink.assert_next_n([b"ab".to_vec(), b"cd".to_vec()]);
        sink.expect_complete();

        std::fs::remove_file(path).expect("remove roundtrip file");
    }

    // Opening a missing file fails materialization with `StreamError::Failed`.
    #[test]
    fn file_io_source_surfaces_open_failure() {
        let missing = unique_temp_path("missing");
        let result = FileIO::from_path(missing, 4).run_with(TestSink::probe());
        assert!(matches!(result, Err(StreamError::Failed(_))));
    }

    // `to_path` replaces any existing contents rather than appending.
    #[test]
    fn file_io_sink_creates_and_truncates_file() {
        let path = unique_temp_path("truncate");
        std::fs::write(&path, b"stale bytes").expect("seed file");

        let completion = Source::single(b"ok".to_vec())
            .run_with(FileIO::to_path(path.clone()))
            .expect("file sink materializes");
        completion.wait().expect("file write completes");
        assert_eq!(std::fs::read(&path).expect("read file"), b"ok");

        std::fs::remove_file(path).expect("remove truncate file");
    }

    // `from_path_default` reads a small file as a single chunk.
    #[test]
    fn file_io_source_default_chunk_size_works() {
        let path = unique_temp_path("default");
        std::fs::write(&path, b"hi").expect("write seed file");

        let sink = FileIO::from_path_default(path.clone())
            .run_with(TestSink::probe())
            .expect("file source materializes");
        sink.request(2);
        sink.assert_next(b"hi".to_vec());
        sink.expect_complete();

        std::fs::remove_file(path).expect("remove default file");
    }
}