mini_io_queue/
blocking.rs

1//! Synchronous reader/writer queue for generic items or byte arrays.
2//!
//! Each queue has a [`Reader`] and [`Writer`] part. Data can be copied into the writer's buffer and
//! sent to the reader without allocations, allowing communication across threads.
5//!
6//! Reading and writing with the queue does not require any allocation, with the downside that the
7//! queue has a fixed capacity on creation.
8//!
9//! Unlike [`nonblocking`], this queue blocks to wait for data on the reader end, or wait for space
10//! on the writer end. For [`u8`] storage, this means the queue can be used as a [`Read`] or
11//! [`Write`].
12//!
13//! If you are using an async runtime, you are probably more interested in the [`asyncio`] queue,
14//! which does not block.
15//!
16//! # Example
17//! ```
18//! use mini_io_queue::blocking::queue;
19//!
20//! let (mut reader, mut writer) = queue(8);
21//!
22//! let write_thread = std::thread::spawn(move || {
23//!     for i in 0..16 {
24//!         writer.write(&[i]);
25//!     }
26//! });
27//!
28//! let read_thread = std::thread::spawn(move || {
29//!     for i in 0..16 {
30//!         let mut buf = [0];
31//!         reader.read_exact(&mut buf).unwrap();
32//!
33//!         assert_eq!(buf[0], i);
34//!     }
35//! });
36//!
37//! write_thread.join().unwrap();
38//! read_thread.join().unwrap();
39//! ```
40//!
41//! [`Reader`]: self::Reader
42//! [`Writer`]: self::Writer
43//! [`nonblocking`]: crate::nonblocking
44//! [`Read`]: std::io::Read
45//! [`Write`]: std::io::Write
46//! [`asyncio`]: crate::asyncio
47
48use crate::storage::Storage;
49use crate::{Region, RegionMut, Ring};
50use std::sync::atomic::{AtomicBool, Ordering};
51use std::sync::{Arc, Condvar, Mutex};
52use std::{error, fmt, io};
53
54/// Creates a queue that is backed by a specific storage. The queue will use the storage's entire
55/// capacity, and will be initialized with an empty read buffer and a full write buffer.
56///
57/// Note that the reader and writer will only implement [`Send`] and [`Sync`] if the storage also
58/// does.
59///
60/// # Example
61/// ```
62/// use mini_io_queue::blocking::queue_from;
63/// use mini_io_queue::storage::HeapBuffer;
64///
65/// let buffer = HeapBuffer::<u8>::new(100);
66/// let (reader, writer) = queue_from(buffer);
67/// ```
68///
69/// [`Send`]: std::marker::Send
70/// [`Sync`] std::marker::Sync
71pub fn queue_from<T, S>(storage: S) -> (Reader<S>, Writer<S>)
72where
73    S: Storage<T>,
74{
75    let ring = Ring::new(storage.capacity());
76    queue_from_parts(ring, storage)
77}
78
79/// Creates a queue from a separately allocated ring and storage. The queue will use the ring's
80/// capacity, and be initialized with a read buffer from the ring's left region and a write buffer
81/// from the ring's right region.
82///
83/// It is up to the user to ensure the storage has enough capacity for the ring. If the ring's
84/// capacity is larger than the storage's length, the reader and writer may panic.
85///
86/// Note that the reader and writer will only implement [`Send`] and [`Sync`] if the storage also
87/// does.
88///
89/// # Example
90/// ```
91/// use mini_io_queue::Ring;
92/// use mini_io_queue::blocking::queue_from_parts;
93/// use mini_io_queue::storage::{HeapBuffer, Storage};
94///
95/// // Create a queue with half of the underlying buffer in the read side.
96/// let ring = Ring::new(10);
97/// ring.advance_right(5);
98///
99/// let mut buffer = HeapBuffer::new(10);
100/// buffer.slice_mut(0..5).copy_from_slice(&[1, 2, 3, 4, 5]);
101///
102/// let (reader, writer) = queue_from_parts(ring, buffer);
103/// ```
104///
105/// [`Send`]: std::marker::Send
106/// [`Sync`]: std::marker::Sync
107pub fn queue_from_parts<S>(ring: Ring, storage: S) -> (Reader<S>, Writer<S>) {
108    let state = Arc::new(State {
109        ring,
110        storage,
111
112        is_reader_open: AtomicBool::new(true),
113        is_writer_open: AtomicBool::new(true),
114
115        data_available_cond: Condvar::new(),
116        space_available_cond: Condvar::new(),
117    });
118
119    let reader = Reader {
120        state: state.clone(),
121        data_available_mutex: Mutex::new(()),
122    };
123    let writer = Writer {
124        state,
125        space_available_mutex: Mutex::new(()),
126    };
127
128    (reader, writer)
129}
130
#[cfg(feature = "heap-buffer")]
mod heap_constructors {
    use crate::blocking::{queue_from_parts, Reader, Writer};
    use crate::storage::HeapBuffer;
    use crate::Ring;

    /// Creates a queue of the given capacity whose storage is allocated on the heap.
    ///
    /// The queue starts with an empty read buffer and a full write buffer containing the
    /// element's default value.
    ///
    /// Note that the reader and writer will only implement [`Send`] and [`Sync`] if the element
    /// type also does.
    ///
    /// # Example
    /// ```
    /// use mini_io_queue::blocking::queue;
    ///
    /// let (reader, writer) = queue::<u8>(100);
    /// ```
    ///
    /// [`Send`]: std::marker::Send
    /// [`Sync`]: std::marker::Sync
    #[cfg_attr(docsrs, doc(cfg(feature = "heap-buffer")))]
    pub fn queue<T>(capacity: usize) -> (Reader<HeapBuffer<T>>, Writer<HeapBuffer<T>>)
    where
        T: Default,
    {
        // Ring and buffer are sized identically so every ring index maps into the buffer.
        queue_from_parts(Ring::new(capacity), HeapBuffer::new(capacity))
    }
}
164
165#[cfg(feature = "heap-buffer")]
166pub use self::heap_constructors::*;
167
168#[derive(Debug)]
169struct State<S> {
170    ring: Ring,
171    storage: S,
172
173    is_reader_open: AtomicBool,
174    is_writer_open: AtomicBool,
175
176    data_available_cond: Condvar,
177    space_available_cond: Condvar,
178}
179
/// The reason a write or flush on a [`Writer`] could not complete.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WriteError {
    /// The reader was closed, so the read buffer can no longer be emptied.
    ReaderClosed,
}
186
/// The reason an exact-length read on a [`Reader`] could not complete.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReadExactError {
    /// The writer was closed, so no more data will ever become available.
    WriterClosed,
}
193
194/// Receives items from the queue.
195///
196/// Values sent by the writer will be added to the end of the reader's buffer, and capacity can be
197/// sent back to the writer from the start of the reader's buffer to allow it to write more data.
198#[derive(Debug)]
199pub struct Reader<S> {
200    state: Arc<State<S>>,
201    data_available_mutex: Mutex<()>,
202}
203
204/// Adds items to the queue.
205///
206/// Values sent by the writer will be added to the end of the reader's buffer, and capacity can be
207/// sent back to the writer from the start of the reader's buffer to allow it to write more data.
208#[derive(Debug)]
209pub struct Writer<S> {
210    state: Arc<State<S>>,
211    space_available_mutex: Mutex<()>,
212}
213
214impl<S> State<S> {
215    fn close_reader(&self) {
216        let was_open = self.is_reader_open.swap(false, Ordering::AcqRel);
217        if was_open {
218            self.space_available_cond.notify_all();
219        }
220    }
221
222    fn close_writer(&self) {
223        let was_open = self.is_writer_open.swap(false, Ordering::AcqRel);
224        if was_open {
225            self.data_available_cond.notify_all();
226        }
227    }
228}
229
230impl<S> Reader<S> {
231    /// Returns if the corresponding writer is still open.
232    ///
233    /// If this is `false`, unread data will still be available to read but a well-behaved writer
234    /// will not provide any new data.
235    #[inline]
236    pub fn is_writer_open(&self) -> bool {
237        self.state.is_writer_open.load(Ordering::Acquire)
238    }
239
240    /// Returns if data is available in the reader's buffer.
241    ///
242    /// If this is true it is guaranteed that the next call to [`fill_buf`] will return a non-empty
243    /// slice, unless [`consume`] is called first.
244    ///
245    /// Keep in mind that when using a reader and writer on separate threads, a reader that has no
246    /// data can receive data at any time - even between calls to `has_data` and other functions.
247    ///
248    /// [`fill_buf`]: Reader::fill_buf
249    /// [`consume`]: Reader::consume
250    #[inline]
251    pub fn has_data(&self) -> bool {
252        let (r1, r2) = self.state.ring.left_ranges();
253        !r1.is_empty() || !r2.is_empty()
254    }
255
256    /// Returns if the buffer is full, i.e all space is allocated to the reader and any write
257    /// operations will block.
258    ///
259    /// If this is true a reader can only resume the writer by calling [`consume`] to pass capacity
260    /// to the writer.
261    ///
262    /// Keep in mind that when using a reader and writer on separate threads, a reader that is not
263    /// full can become full at any time - even between calls to `is_full` and other functions.
264    ///
265    /// [`consume`]: Reader::consume
266    #[inline]
267    pub fn is_full(&self) -> bool {
268        let (r1, r2) = self.state.ring.right_ranges();
269        r1.is_empty() && r2.is_empty()
270    }
271
272    /// Attempt to read from the reader's buffer, blocking to wait for more data if it is empty.
273    ///
274    /// This function is a lower-level call. It needs to be paired with the [`consume`] method to
275    /// function properly. When calling this method, none of the contents will be "read" in the
276    /// sense that later calling `fill_buf` may return the same contents. As such, [`consume`] must
277    /// be called with the number of bytes that are consumed from this buffer to ensure that the
278    /// items are never returned twice.
279    ///
280    /// An empty buffer returned indicates that all data has been read and the writer has closed.
281    ///
282    /// [`consume`]: Reader::consume
283    pub fn fill_buf<T>(&mut self) -> Region<T>
284    where
285        S: Storage<T>,
286    {
287        if self.has_data() {
288            return self.buf();
289        }
290
291        // If the writer is closed, we've now read everything we could.
292        if !self.is_writer_open() {
293            // Empty slice indicates the writer closed.
294            return Default::default();
295        }
296
297        // If no data is available, park and ask the writer to unpark us when it writes something.
298        let mut lock = self.data_available_mutex.lock().unwrap();
299        loop {
300            lock = self.state.data_available_cond.wait(lock).unwrap();
301
302            if self.has_data() {
303                return self.buf();
304            }
305
306            // If the writer is closed, we've now read everything we could.
307            if !self.is_writer_open() {
308                // Empty slice indicates the writer closed.
309                return Default::default();
310            }
311        }
312    }
313
314    /// Marks items at the start of the reader buffer as consumed, removing them from the slice
315    /// returned by [`fill_buf`] and adding their capacity to the end of the writer's buffer.
316    /// Since queues have a fixed underlying length, calling this is required to allow the transfer
317    /// of more data.
318    ///
319    /// # Panics
320    /// This function will panic if `amt` is larger than the reader's available data buffer.
321    ///
322    /// [`fill_buf`]: Reader::fill_buf
323    pub fn consume(&mut self, amt: usize) {
324        self.state.ring.advance_left(amt);
325
326        // Unpark the writer if it was waiting for space.
327        self.state.space_available_cond.notify_all();
328    }
329
330    /// Pulls some items from this queue into the specified buffer, returning how many items were
331    /// read.
332    ///
333    /// This method will complete immediately if at least one item is available to read, otherwise
334    /// it will block until some are available.
335    ///
336    /// # Return
337    /// It is guaranteed that the return value is `<= buf.len()`.
338    ///
339    /// A return value of `0` indicates one of these two scenarios:
340    ///  1. The writer has closed and all items have been read.
341    ///  2. The buffer specified had a length of 0.
342    pub fn read<T>(&mut self, buf: &mut [T]) -> usize
343    where
344        S: Storage<T>,
345        T: Clone,
346    {
347        let src_buf = self.fill_buf();
348
349        if src_buf.is_empty() {
350            // This indicates the writer has closed and all data has been read.
351            return 0;
352        }
353
354        let len = src_buf.len().min(buf.len());
355        src_buf.slice(..len).clone_to_slice(&mut buf[..len]);
356
357        self.consume(len);
358        len
359    }
360
361    /// Reads the exact number of items required to fill `buf`.
362    ///
363    /// If the writer closes before the buffer is completely filled, an error of the kind
364    /// [`ReadExactError::WriterClosed`] will be returned.
365    ///
366    /// # Return
367    /// If the return value is `Ok(n)`, it is guaranteed that `n == buf.len()`.
368    pub fn read_exact<T>(&mut self, buf: &mut [T]) -> Result<usize, ReadExactError>
369    where
370        S: Storage<T>,
371        T: Clone,
372    {
373        let len = buf.len();
374        let src_buf = loop {
375            let src_buf = self.fill_buf();
376
377            if src_buf.len() >= len {
378                break src_buf;
379            }
380
381            if !self.is_writer_open() {
382                // The writer has closed, required data will never be ready.
383                return Err(ReadExactError::WriterClosed);
384            }
385        };
386
387        src_buf.slice(..len).clone_to_slice(buf);
388        self.consume(len);
389
390        Ok(len)
391    }
392
393    /// Close the reader, indicating to the writer that no more data will be read.
394    ///
395    /// Any in-progress writes or flushes on the writer will be interrupted, and any future
396    /// operations will fail. Closing the reader multiple times has no effect.
397    ///
398    /// Dropping the reader object will also close it.
399    #[inline]
400    pub fn close(&mut self) {
401        self.state.close_reader();
402    }
403
404    #[inline]
405    fn buf<T>(&self) -> Region<T>
406    where
407        S: Storage<T>,
408    {
409        let (range_0, range_1) = self.state.ring.left_ranges();
410        Region::new(
411            self.state.storage.slice(range_0),
412            self.state.storage.slice(range_1),
413        )
414    }
415}
416
417impl<S> io::Read for Reader<S>
418where
419    S: Storage<u8>,
420{
421    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
422        let src_buf = self.fill_buf();
423
424        let len = src_buf.len().min(buf.len());
425        src_buf.slice(..len).copy_to_slice(&mut buf[..len]);
426
427        self.consume(len);
428
429        Ok(len)
430    }
431}
432
433impl<S> io::BufRead for Reader<S>
434where
435    S: Storage<u8>,
436{
437    fn fill_buf(&mut self) -> io::Result<&[u8]> {
438        Ok(self.fill_buf().contiguous())
439    }
440
441    fn consume(&mut self, amt: usize) {
442        self.consume(amt);
443    }
444}
445
446impl<S> Drop for Reader<S> {
447    fn drop(&mut self) {
448        self.state.close_reader();
449    }
450}
451
452impl<S> Writer<S> {
453    /// Returns if the corresponding reader is still open.
454    ///
455    /// If this is `false`, any attempt to write or flush the object will fail.
456    #[inline]
457    pub fn is_reader_open(&self) -> bool {
458        self.state.is_reader_open.load(Ordering::Acquire)
459    }
460
461    /// Returns if space is available in the writer's buffer.
462    ///
463    /// If this is true it is guaranteed that the next call to [`empty_buf`] will return a non-empty
464    /// slice, unless [`feed`] is called first.
465    ///
466    /// Keep in mind that when using a reader and writer on separate threads, a writer that has no
467    /// space can have more made available at any time - even between calls to `has_space` and other
468    /// functions.
469    ///
470    /// [`empty_buf`]: Writer::empty_buf
471    /// [`feed`]: Writer::feed
472    #[inline]
473    pub fn has_space(&self) -> bool {
474        let (r0, r1) = self.state.ring.right_ranges();
475        !r0.is_empty() || !r1.is_empty()
476    }
477
478    /// Returns if the buffer is flushed, i.e there are no items to read and any read operations
479    /// will stall.
480    ///
481    /// If this is true a writer can only resume the reader by calling [`feed`] to pass items to
482    /// the reader.
483    ///
484    /// Keep in mind that when using a reader and writer on separate threads, a writer that is not
485    /// flushed can become flushed at any time - even between calls to `is_flushed` and other
486    /// functions.
487    ///
488    /// [`feed`]: Writer::feed
489    #[inline]
490    pub fn is_flushed(&self) -> bool {
491        let (r0, r1) = self.state.ring.left_ranges();
492        r0.is_empty() && r1.is_empty()
493    }
494
495    fn get_flush_state(&self) -> Option<Result<(), WriteError>> {
496        if self.is_flushed() {
497            return Some(Ok(()));
498        }
499        if !self.is_reader_open() {
500            return Some(Err(WriteError::ReaderClosed));
501        }
502        None
503    }
504
505    #[inline]
506    fn buf<T>(&mut self) -> RegionMut<T>
507    where
508        S: Storage<T>,
509    {
510        let (range_0, range_1) = self.state.ring.right_ranges();
511
512        // `Ring` guarantees that a left region will only overlap a right region when this order
513        // is followed:
514        //  - Get the right region range
515        //  - Advance the right region
516        //  - Get the left region range
517        // Given that the borrow checker prevents this here (`buf` and `consume` both take
518        // &mut self), and assuming the Reader behaves correctly and does not hold references to the
519        // left region's buffer while advancing it, there is no way to get another range that
520        // overlaps this one.
521        RegionMut::new(
522            unsafe { self.state.storage.slice_mut_unchecked(range_0) },
523            unsafe { self.state.storage.slice_mut_unchecked(range_1) },
524        )
525    }
526
527    /// Attempt to get the writable buffer, blocking to wait for more space if it is empty.
528    ///
529    /// This functions is a lower-level call. It needs to be paired with the [`feed`] method to
530    /// function properly. When calling this method, none of the contents will be "written" in the
531    /// sense that later calling `empty_buf` may return the same contents. As such, [`feed`] must be
532    /// called with the number of items that have been written to the buffer to ensure that the
533    /// items are never returned twice.
534    ///
535    /// An empty buffer returned indicates that the queue cannot be written to as the reader has
536    /// closed.
537    ///
538    /// [`feed`]: Writer::feed
539    pub fn empty_buf<T>(&mut self) -> RegionMut<T>
540    where
541        S: Storage<T>,
542    {
543        // If the reader is closed there is no point in writing anything, even if space is
544        // available.
545        if !self.is_reader_open() {
546            // Empty slice indicates the reader closed.
547            return Default::default();
548        }
549        if self.has_space() {
550            return self.buf();
551        }
552
553        // If no space is available, park and ask the reader to unpark us when it writes something.
554        {
555            let mut lock = self.space_available_mutex.lock().unwrap();
556            loop {
557                lock = self.state.space_available_cond.wait(lock).unwrap();
558
559                if !self.is_reader_open() {
560                    // Empty slice indicates the reader closed.
561                    return Default::default();
562                }
563                if self.has_space() {
564                    break;
565                }
566            }
567        }
568
569        self.buf()
570    }
571
572    /// Marks items at the start of the writer buffer as ready to be read, removing them from the
573    /// slice returned by [`empty_buf`] and making them available in the reader's buffer.
574    ///
575    /// # Panics
576    /// This function will panic if `amt` is larger than the writer's available space buffer.
577    ///
578    /// [`empty_buf`]: Writer::empty_buf
579    pub fn feed(&mut self, len: usize) {
580        self.state.ring.advance_right(len);
581
582        // Unpark the reader if it was waiting for data.
583        self.state.data_available_cond.notify_all();
584    }
585
586    /// Writes some items from a buffer into this queue, returning how many items were written.
587    ///
588    /// This function will attempt to write the entire contents of `buf`, but the entire write may
589    /// not succeed if not enough space is available.
590    ///
591    /// # Return
592    /// It is guaranteed that the return value is `<= buf.len()`.
593    ///
594    /// A return value of `0` indicates one of these two scenarios:
595    ///  1. The reader has closed.
596    ///  2. The buffer specified had a length of 0.
597    pub fn write<T>(&mut self, buf: &[T]) -> usize
598    where
599        S: Storage<T>,
600        T: Clone,
601    {
602        let mut dest_buf = self.empty_buf();
603
604        if dest_buf.is_empty() {
605            // This indicates the reader has closed.
606            return 0;
607        }
608
609        let len = dest_buf.len().min(buf.len());
610        dest_buf.slice_mut(..len).clone_from_slice(&buf[..len]);
611
612        self.feed(len);
613        len
614    }
615
616    /// Attempts to write all items in a buffer into this queue.
617    ///
618    /// If the reader closes before all items are written, an error of the kind
619    /// [`WriteError::ReaderClosed`] will be returned.
620    ///
621    /// # Return
622    /// If the return value is `Ok(n)`, it is guaranteed that `n == buf.len()`.
623    pub fn write_all<T>(&mut self, buf: &[T]) -> Result<usize, WriteError>
624    where
625        S: Storage<T>,
626        T: Clone,
627    {
628        let len = buf.len();
629        let mut dest_buf = loop {
630            let dest_buf = self.empty_buf();
631
632            if dest_buf.is_empty() {
633                // The reader has closed.
634                return Err(WriteError::ReaderClosed);
635            }
636
637            if dest_buf.len() >= len {
638                break dest_buf;
639            }
640        };
641
642        dest_buf.slice_mut(..len).clone_from_slice(buf);
643        self.feed(len);
644
645        Ok(len)
646    }
647
648    /// Flush the buffer, ensuring that any items waiting to be read are consumed by the reader.
649    ///
650    /// If the reader is closed, returns `FlushError::ReaderClosed`. If blocking cannot be completed
651    /// immediately, this method blocks until the reader closed or the buffer is flushed.
652    pub fn flush(&mut self) -> Result<(), WriteError> {
653        if let Some(flush_state) = self.get_flush_state() {
654            return flush_state;
655        }
656
657        // Wait for more data to be read.
658        let mut lock = self.space_available_mutex.lock().unwrap();
659        loop {
660            lock = self.state.space_available_cond.wait(lock).unwrap();
661
662            if let Some(flush_state) = self.get_flush_state() {
663                return flush_state;
664            }
665        }
666    }
667
668    /// Close the writer, flushing any remaining data and indicating to the reader that no more data
669    /// will be written.
670    ///
671    /// Any future read operations will fail. Closing the writer multiple times has no effect.
672    pub fn close(&mut self) -> Result<(), WriteError> {
673        self.flush()?;
674        self.state.close_writer();
675        Ok(())
676    }
677}
678
679impl<S> io::Write for Writer<S>
680where
681    S: Storage<u8>,
682{
683    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
684        let mut dest_buf = self.empty_buf();
685
686        let len = dest_buf.len().min(buf.len());
687        dest_buf.slice_mut(..len).copy_from_slice(&buf[..len]);
688
689        self.feed(len);
690
691        Ok(len)
692    }
693
694    fn flush(&mut self) -> io::Result<()> {
695        self.flush().map_err(Into::into)
696    }
697}
698
699impl<S> Drop for Writer<S> {
700    #[inline]
701    fn drop(&mut self) {
702        self.state.close_writer();
703    }
704}
705
706impl error::Error for WriteError {}
707
708impl fmt::Display for WriteError {
709    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
710        match self {
711            WriteError::ReaderClosed => write!(f, "reader closed"),
712        }
713    }
714}
715
716impl From<WriteError> for io::Error {
717    fn from(err: WriteError) -> Self {
718        match err {
719            WriteError::ReaderClosed => io::ErrorKind::UnexpectedEof.into(),
720        }
721    }
722}
723
724impl error::Error for ReadExactError {}
725
726impl fmt::Display for ReadExactError {
727    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
728        match self {
729            ReadExactError::WriterClosed => write!(f, "writer closed"),
730        }
731    }
732}
733
734impl From<ReadExactError> for io::Error {
735    fn from(err: ReadExactError) -> Self {
736        match err {
737            ReadExactError::WriterClosed => io::ErrorKind::UnexpectedEof.into(),
738        }
739    }
740}