mini_io_queue/
nonblocking.rs

1//! Non-blocking reader/writer queue for generic items or byte arrays.
2//!
3//! Each queue has a [`Reader`] and [`Writer`] part. Data can be copied into the writer's buffer
4//! and sent to the reader without locks or allocation, allowing nonblocking communication across
5//! threads.
6//!
7//! Reading and writing with the queue does not require any allocation, with the downside that the
8//! queue has a fixed capacity on creation.
9//!
10//! `Reader` and `Writer` can implement [`Read`] and [`Write`] if the `std-io` feature is
11//! enabled. These implementations will return [`WouldBlock`] errors instead of blocking.
12//!
13//! # Example
14//! ```
15//! use mini_io_queue::nonblocking::queue;
16//!
17//! let (mut reader, mut writer) = queue(8);
18//!
19//! let write_thread = std::thread::spawn(move || {
20//!     for i in 0..16 {
21//!         let buf = [i];
22//!
23//!         // spin until there is space to write
24//!         loop {
25//!             let write_len = writer.write(&buf);
26//!             if write_len == 1 {
27//!                 break;
28//!             }
29//!         }
30//!     }
31//! });
32//!
33//! let read_thread = std::thread::spawn(move || {
34//!     for i in 0..16 {
35//!         let mut buf = [0];
36//!
37//!         // spin until there is data to read
38//!         loop {
39//!             let read_len = reader.read(&mut buf);
40//!             if read_len == 1 {
41//!                 break;
42//!             }
43//!         }
44//!
45//!         assert_eq!(buf[0], i);
46//!     }
47//! });
48//!
49//! write_thread.join().unwrap();
50//! read_thread.join().unwrap();
51//! ```
52//!
53//! [`Reader`]: self::Reader
54//! [`Writer`]: self::Writer
55//! [`Read`]: std::io::Read
56//! [`Write`]: std::io::Write
57//! [`WouldBlock`]: std::io::ErrorKind::WouldBlock
58
59use crate::storage::Storage;
60use crate::{Region, RegionMut, Ring};
61use alloc::sync::Arc;
62use core::sync::atomic::{AtomicBool, Ordering};
63
64/// Creates a queue that is backed by a specific storage. The queue will use the storage's entire
65/// capacity, and will be initialized with an empty read buffer and a full write buffer.
66///
67/// Note that the reader and writer will only implement [`Send`] and [`Sync`] if the storage also
68/// does.
69///
70/// # Example
71/// ```
72/// use mini_io_queue::nonblocking::queue_from;
73/// use mini_io_queue::storage::HeapBuffer;
74///
75/// let buffer = HeapBuffer::<u8>::new(100);
76/// let (reader, writer) = queue_from(buffer);
77/// ```
78///
79/// [`Send`]: std::marker::Send
80/// [`Sync`]: std::marker::Sync
81pub fn queue_from<T, S>(storage: S) -> (Reader<S>, Writer<S>)
82where
83    S: Storage<T>,
84{
85    let ring = Ring::new(storage.capacity());
86    queue_from_parts(ring, storage)
87}
88
89/// Creates a queue from a separately allocated ring and storage. The queue will use the ring's
90/// capacity, and be initialized with a read buffer from the ring's left region and a write buffer
91/// from the ring's right region.
92///
93/// It is up to the user to ensure the storage has enough capacity for the ring. If the ring's
94/// capacity is larger than the storage's length, the reader and writer may panic.
95///
96/// Note that the reader and writer will only implement [`Send`] and [`Sync`] if the storage also
97/// does.
98///
99/// # Example
100/// ```
101/// use mini_io_queue::Ring;
102/// use mini_io_queue::nonblocking::queue_from_parts;
103/// use mini_io_queue::storage::{HeapBuffer, Storage};
104///
105/// // Create a queue with half of the underlying buffer in the read side.
106/// let ring = Ring::new(10);
107/// ring.advance_right(5);
108///
109/// let mut buffer = HeapBuffer::new(10);
110/// buffer.slice_mut(0..5).copy_from_slice(&[1, 2, 3, 4, 5]);
111///
112/// let (reader, writer) = queue_from_parts(ring, buffer);
113/// ```
114///
115/// [`Send`]: std::marker::Send
116/// [`Sync`]: std::marker::Sync
117pub fn queue_from_parts<S>(ring: Ring, storage: S) -> (Reader<S>, Writer<S>) {
118    let state = Arc::new(State {
119        ring,
120        storage,
121
122        is_reader_open: AtomicBool::new(true),
123        is_writer_open: AtomicBool::new(true),
124    });
125
126    let reader = Reader {
127        state: state.clone(),
128    };
129    let writer = Writer { state };
130
131    (reader, writer)
132}
133
#[cfg(feature = "heap-buffer")]
mod heap_constructors {
    use crate::nonblocking::{queue_from_parts, Reader, Writer};
    use crate::storage::HeapBuffer;
    use crate::Ring;

    /// Builds a queue of the requested capacity with heap-allocated storage.
    ///
    /// The queue starts with an empty read buffer and a full write buffer whose slots hold the
    /// element type's default value.
    ///
    /// The returned reader and writer only implement [`Send`] and [`Sync`] when the element
    /// type does as well.
    ///
    /// # Example
    /// ```
    /// use mini_io_queue::nonblocking::queue;
    ///
    /// let (reader, writer) = queue::<u8>(100);
    /// ```
    ///
    /// [`Send`]: std::marker::Send
    /// [`Sync`]: std::marker::Sync
    #[cfg_attr(docsrs, doc(cfg(feature = "heap-buffer")))]
    pub fn queue<T>(capacity: usize) -> (Reader<HeapBuffer<T>>, Writer<HeapBuffer<T>>)
    where
        T: Default,
    {
        // Ring and buffer are sized identically so every ring index maps to a storage slot.
        queue_from_parts(Ring::new(capacity), HeapBuffer::new(capacity))
    }
}
167
168#[cfg(feature = "heap-buffer")]
169pub use self::heap_constructors::*;
170
171#[derive(Debug)]
172struct State<S> {
173    ring: Ring,
174    storage: S,
175
176    is_reader_open: AtomicBool,
177    is_writer_open: AtomicBool,
178}
179
180/// Receives items from the queue.
181///
182/// Values sent by the writer will be added to the end of the reader's buffer, and capacity can be
183/// sent back to the writer from the start of the reader's buffer to allow it to write more data.
184#[derive(Debug)]
185pub struct Reader<S> {
186    state: Arc<State<S>>,
187}
188
189/// Adds items to the queue.
190///
191/// Values sent by the writer will be added to the end of the reader's buffer, and capacity can be
192/// sent back to the writer from the start of the reader's buffer to allow it to write more data.
193#[derive(Debug)]
194pub struct Writer<S> {
195    state: Arc<State<S>>,
196}
197
198impl<S> State<S> {
199    fn close_reader(&self) {
200        self.is_reader_open.store(false, Ordering::Release);
201    }
202
203    fn close_writer(&self) {
204        self.is_writer_open.store(false, Ordering::Release);
205    }
206}
207
208impl<S> Reader<S> {
209    /// Returns if the corresponding writer is still open.
210    ///
211    /// If this is `false`, unread data will still be available to read but a well-behaved writer
212    /// will not provide any new data.
213    #[inline]
214    pub fn is_writer_open(&self) -> bool {
215        self.state.is_writer_open.load(Ordering::Acquire)
216    }
217
218    /// Returns if data is available in the reader's buffer.
219    ///
220    /// If this is true it is guaranteed that the next call to [`buf`] will return a non-empty
221    /// slice, unless [`consume`] is called first.
222    ///
223    /// Keep in mind that when using a reader and writer on separate threads, a reader that has no
224    /// data can receive data at any time - even between calls to `has_data` and other functions.
225    ///
226    /// [`buf`]: Reader::buf
227    /// [`consume`]: Reader::consume
228    #[inline]
229    pub fn has_data(&self) -> bool {
230        let (r1, r2) = self.state.ring.left_ranges();
231        !r1.is_empty() || !r2.is_empty()
232    }
233
234    /// Returns if the buffer is full, i.e all space is allocated to the reader and any write
235    /// operations will fail.
236    ///
237    /// If this is true a reader can only resume the writer by calling [`consume`] to pass capacity
238    /// to the writer.
239    ///
240    /// Keep in mind that when using a reader and writer on separate threads, a reader that is not
241    /// full can become full at any time - even between calls to `is_full` and other functions.
242    ///
243    /// [`consume`]: Reader::consume
244    #[inline]
245    pub fn is_full(&self) -> bool {
246        let (r1, r2) = self.state.ring.right_ranges();
247        r1.is_empty() && r2.is_empty()
248    }
249
250    /// Gets the readable buffer.
251    ///
252    /// This function is a lower-level call. It needs to be paired with the [`consume`] method to
253    /// function properly. When calling this method, none of the contents will be "read" in the
254    /// sense that later calling `buf` may return the same contents. As such, [`consume`] must
255    /// be called with the number of bytes that are consumed from this buffer to ensure that the
256    /// items are never returned twice.
257    ///
258    /// An empty buffer returned indicates that no data is available to read.
259    ///
260    /// # Panics
261    /// This function may panic if the underlying storage panics when trying to get a slice to the
262    /// data. This may happen if queue was created with a ring that has a larger capacity than the
263    /// storage.
264    ///
265    /// [`consume`]: Reader::consume
266    #[inline]
267    pub fn buf<T>(&self) -> Region<T>
268    where
269        S: Storage<T>,
270    {
271        let (range_0, range_1) = self.state.ring.left_ranges();
272        Region::new(
273            self.state.storage.slice(range_0),
274            self.state.storage.slice(range_1),
275        )
276    }
277
278    /// Marks items at the start of the reader buffer as consumed, removing them from the slice
279    /// returned by [`buf`] and adding their capacity to the end of the writer's buffer. Since
280    /// queues have a fixed underlying length, calling this is required to allow the transfer of
281    /// more data.
282    ///
283    /// # Panics
284    /// This function will panic if `amt` is larger than the reader's available data buffer.
285    ///
286    /// [`buf`]: Reader::buf
287    #[inline]
288    pub fn consume(&mut self, amt: usize) {
289        self.state.ring.advance_left(amt);
290    }
291
292    /// Pulls some items from this queue into the specified buffer, returning how many items were
293    /// read.
294    ///
295    /// # Return
296    /// It is guaranteed that the return value is `<= buf.len()`.
297    ///
298    /// A return value of `0` indicates one of these three scenarios:
299    ///  1. No data was available to read.
300    ///  2. The writer has closed and all items have been read.
301    ///  3. The buffer specified had a length of 0.
302    pub fn read<T>(&mut self, buf: &mut [T]) -> usize
303    where
304        S: Storage<T>,
305        T: Clone,
306    {
307        let src_buf = self.buf();
308
309        let len = src_buf.len().min(buf.len());
310        src_buf.slice(..len).clone_to_slice(&mut buf[..len]);
311
312        self.consume(len);
313        len
314    }
315
316    /// Close the reader, indicating to the writer that no more data will be read.
317    ///
318    /// Any future write operations will fail. Closing the reader multiple times has no effect.
319    ///
320    /// Dropping the reader object will also close it.
321    #[inline]
322    pub fn close(&mut self) {
323        self.state.close_reader();
324    }
325}
326
327impl<S> Drop for Reader<S> {
328    #[inline]
329    fn drop(&mut self) {
330        self.state.close_reader();
331    }
332}
333
334impl<S> Writer<S> {
335    /// Returns if the corresponding reader is still open.
336    ///
337    /// If this is `false`, any attempt to write or flush the object will fail.
338    #[inline]
339    pub fn is_reader_open(&self) -> bool {
340        self.state.is_reader_open.load(Ordering::Acquire)
341    }
342
343    /// Returns if space is available in the writer's buffer.
344    ///
345    /// If this is true it is guaranteed that the next call to [`buf`] will return a non-empty
346    /// slice, unless [`feed`] is called first.
347    ///
348    /// Keep in mind that when using a reader and writer on separate threads, a writer that has no
349    /// space can have more made available at any time - even between calls to `has_space` and other
350    /// functions.
351    ///
352    /// [`buf`]: Writer::buf
353    /// [`feed`]: Writer::feed
354    #[inline]
355    pub fn has_space(&self) -> bool {
356        let (r1, r2) = self.state.ring.right_ranges();
357        !r1.is_empty() || !r2.is_empty()
358    }
359
360    /// Returns if the buffer is flushed, i.e there are no items to read and any read operations
361    /// will stall.
362    ///
363    /// If this is true a writer can only resume the reader by calling [`feed`] to pass items to
364    /// the reader.
365    ///
366    /// Keep in mind that when using a reader and writer on separate threads, a writer that is not
367    /// flushed can become flushed at any time - even between calls to `is_flushed` and other
368    /// functions.
369    ///
370    /// [`feed`]: Writer::feed
371    #[inline]
372    pub fn is_flushed(&self) -> bool {
373        let (r1, r2) = self.state.ring.left_ranges();
374        r1.is_empty() && r2.is_empty()
375    }
376
377    /// Gets the writable buffer.
378    ///
379    /// This functions is a lower-level call. It needs to be paired with the [`feed`] method to
380    /// function properly. When calling this method, none of the contents will be "written" in the
381    /// sense that later calling `buf` may return the same contents. As such, [`feed`] must be
382    /// called with the number of items that have been written to the buffer to ensure that the
383    /// items are never returned twice.
384    ///
385    /// An empty buffer returned indicates that no space is currently available, or the reader has
386    /// closed.
387    ///
388    /// # Panics
389    /// This function may panic if the underlying storage panics when trying to get a slice to the
390    /// data. This may happen if queue was created with a ring that has a larger capacity than the
391    /// storage.
392    ///
393    /// [`feed`]: Writer::feed
394    pub fn buf<T>(&mut self) -> RegionMut<T>
395    where
396        S: Storage<T>,
397    {
398        // If the reader is closed there is no point in writing anything, even if space is
399        // available.
400        if !self.is_reader_open() {
401            // Empty slice indicates the reader closed.
402            return Default::default();
403        }
404
405        let (range_0, range_1) = self.state.ring.right_ranges();
406
407        // `Ring` guarantees that a left region will only overlap a right region when this order
408        // is followed:
409        //  - Get the right region range
410        //  - Advance the right region
411        //  - Get the left region range
412        // Given that the borrow checker prevents this here (`buf` and `consume` both take
413        // &mut self), and assuming the Reader behaves correctly and does not hold references to the
414        // left region's buffer while advancing it, there is no way to get another range that
415        // overlaps this one.
416        RegionMut::new(
417            unsafe { self.state.storage.slice_mut_unchecked(range_0) },
418            unsafe { self.state.storage.slice_mut_unchecked(range_1) },
419        )
420    }
421
422    /// Mark items at the start of the writer buffer as ready to be read, removing them from the
423    /// slice returned by [`buf`] and making them available in the reader's buffer.
424    ///
425    /// # Panics
426    /// This function will panic if `amt` is larger than the writer's available space buffer.
427    ///
428    /// [`buf`]: Writer::buf
429    #[inline]
430    pub fn feed(&mut self, amt: usize) {
431        self.state.ring.advance_right(amt);
432    }
433
434    /// Writes some items from a buffer into this queue, returning how many items were written.
435    ///
436    /// This function will attempt to write the entire contents of `buf`, but the entire write may
437    /// not succeed if not enough space is available.
438    ///
439    /// # Return
440    /// It is guaranteed that the return value is `<= buf.len()`.
441    ///
442    /// A return value of `0` indicates one of these three scenarios:
443    ///  1. No space is available to write.
444    ///  2. The reader has closed.
445    ///  3. The buffer specified had a length of 0.
446    pub fn write<T>(&mut self, buf: &[T]) -> usize
447    where
448        S: Storage<T>,
449        T: Clone,
450    {
451        let mut dest_buf = self.buf();
452
453        let len = dest_buf.len().min(buf.len());
454        dest_buf.slice_mut(..len).clone_from_slice(&buf[..len]);
455
456        self.feed(len);
457        len
458    }
459
460    /// Close the writer, indicating to the reader that no more data will be written.
461    ///
462    /// Closing the writer multiple times has no effect.
463    ///
464    /// Dropping the writer object will also close it.
465    #[inline]
466    pub fn close(&mut self) {
467        self.state.close_writer();
468    }
469}
470
471impl<S> Drop for Writer<S> {
472    #[inline]
473    fn drop(&mut self) {
474        self.state.close_writer();
475    }
476}
477
#[cfg(feature = "std-io")]
mod io_impls {
    use crate::nonblocking::{Reader, Writer};
    use crate::storage::Storage;
    use std::io::{BufRead, ErrorKind, Read, Result, Write};

    /// Non-blocking [`Read`] implementation for byte queues.
    ///
    /// Returns [`ErrorKind::WouldBlock`] when no data is available but the writer is still
    /// open, and `Ok(0)` (EOF) once the writer has closed and all data has been read.
    #[cfg_attr(docsrs, doc(cfg(feature = "std-io")))]
    impl<S> Read for Reader<S>
    where
        S: Storage<u8>,
    {
        fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
            // Sample the writer state *before* inspecting the buffer. The writer closes with a
            // Release store and this is an Acquire load, so if the writer is seen as closed,
            // everything it fed beforehand is visible in the buffer below. Checking in the
            // opposite order could observe an empty buffer, miss a feed-then-close by the
            // writer, and report EOF while unread data is still queued.
            let writer_open = self.is_writer_open();
            let src_buf = self.buf();

            if src_buf.is_empty() {
                return if writer_open {
                    // No data to read, this is an error since blocking would be required.
                    Err(ErrorKind::WouldBlock.into())
                } else {
                    // Writer is closed and all data has been read, return 0 to indicate EOF.
                    Ok(0)
                };
            }

            let len = src_buf.len().min(buf.len());
            src_buf.slice(..len).copy_to_slice(&mut buf[..len]);

            // Hand the capacity of the copied bytes back to the writer.
            self.consume(len);

            Ok(len)
        }
    }

    /// Non-blocking [`BufRead`] implementation for byte queues.
    ///
    /// `fill_buf` returns [`ErrorKind::WouldBlock`] when no data is available but the writer is
    /// still open, and an empty slice (EOF) once the writer has closed and all data has been
    /// read.
    #[cfg_attr(docsrs, doc(cfg(feature = "std-io")))]
    impl<S> BufRead for Reader<S>
    where
        S: Storage<u8>,
    {
        fn fill_buf(&mut self) -> Result<&[u8]> {
            // As in `read`: load the writer state first so that a closed writer implies all of
            // its data is already visible, and EOF is never reported with data still pending.
            let writer_open = self.is_writer_open();
            let buf = self.buf().contiguous();

            if !buf.is_empty() {
                return Ok(buf);
            }

            if writer_open {
                // No data to read, this is an error since blocking would be required.
                return Err(ErrorKind::WouldBlock.into());
            }

            // Writer is closed and all data has been read, return an empty slice to indicate EOF.
            Ok(Default::default())
        }

        fn consume(&mut self, amt: usize) {
            // Resolves to the inherent `Reader::consume` (inherent methods take precedence over
            // trait methods), so this does not recurse.
            Reader::consume(self, amt);
        }
    }

    /// Non-blocking [`Write`] implementation for byte queues.
    ///
    /// `write` returns [`ErrorKind::WouldBlock`] when no space is available but the reader is
    /// still open, and `Ok(0)` once the reader has closed.
    #[cfg_attr(docsrs, doc(cfg(feature = "std-io")))]
    impl<S> Write for Writer<S>
    where
        S: Storage<u8>,
    {
        fn write(&mut self, buf: &[u8]) -> Result<usize> {
            // `Writer::buf` yields an empty region both when the queue is full and when the
            // reader has closed; the two cases are told apart below.
            let mut dest_buf = self.buf();

            if !dest_buf.is_empty() {
                let len = dest_buf.len().min(buf.len());
                dest_buf.slice_mut(..len).copy_from_slice(&buf[..len]);

                // Publish the written bytes to the reader.
                self.feed(len);

                return Ok(len);
            }

            if !self.is_reader_open() {
                // The reader is gone, so nothing can be written anymore: report zero bytes.
                return Ok(0);
            }

            // No space to write, this is an error since blocking would be required.
            Err(ErrorKind::WouldBlock.into())
        }

        fn flush(&mut self) -> Result<()> {
            // Load the reader state first. The reader closes with a Release store and this is
            // an Acquire load, so if the reader is seen as closed, everything it consumed
            // beforehand is reflected in `is_flushed`. This avoids reporting `UnexpectedEof`
            // when the reader drained the queue and then closed.
            let reader_open = self.is_reader_open();

            if self.is_flushed() {
                return Ok(());
            }

            if reader_open {
                // Data is still pending; waiting for the reader would require blocking.
                return Err(ErrorKind::WouldBlock.into());
            }

            // The reader closed with data still unread; it can never be flushed.
            Err(ErrorKind::UnexpectedEof.into())
        }
    }
}
574
575#[cfg(feature = "std-io")]
576pub use self::io_impls::*;