mini_io_queue/nonblocking.rs
1//! Non-blocking reader/writer queue for generic items or byte arrays.
2//!
3//! Each queue has a [`Reader`] and [`Writer`] part. Data can be copied into the writer's buffer
4//! and sent to the reader without locks or allocation, allowing nonblocking communication across
5//! threads.
6//!
7//! Reading and writing with the queue does not require any allocation, with the downside that the
8//! queue has a fixed capacity on creation.
9//!
10//! `Reader` and `Writer` can implement [`Read`] and [`Write`] if the `std-io` feature is
11//! enabled. These implementations will return [`WouldBlock`] errors instead of blocking.
12//!
13//! # Example
14//! ```
15//! use mini_io_queue::nonblocking::queue;
16//!
17//! let (mut reader, mut writer) = queue(8);
18//!
19//! let write_thread = std::thread::spawn(move || {
20//! for i in 0..16 {
21//! let buf = [i];
22//!
23//! // spin until there is space to write
24//! loop {
25//! let write_len = writer.write(&buf);
26//! if write_len == 1 {
27//! break;
28//! }
29//! }
30//! }
31//! });
32//!
33//! let read_thread = std::thread::spawn(move || {
34//! for i in 0..16 {
35//! let mut buf = [0];
36//!
37//! // spin until there is data to read
38//! loop {
39//! let read_len = reader.read(&mut buf);
40//! if read_len == 1 {
41//! break;
42//! }
43//! }
44//!
45//! assert_eq!(buf[0], i);
46//! }
47//! });
48//!
49//! write_thread.join().unwrap();
50//! read_thread.join().unwrap();
51//! ```
52//!
53//! [`Reader`]: self::Reader
54//! [`Writer`]: self::Writer
55//! [`Read`]: std::io::Read
56//! [`Write`]: std::io::Write
57//! [`WouldBlock`]: std::io::ErrorKind::WouldBlock
58
59use crate::storage::Storage;
60use crate::{Region, RegionMut, Ring};
61use alloc::sync::Arc;
62use core::sync::atomic::{AtomicBool, Ordering};
63
64/// Creates a queue that is backed by a specific storage. The queue will use the storage's entire
65/// capacity, and will be initialized with an empty read buffer and a full write buffer.
66///
67/// Note that the reader and writer will only implement [`Send`] and [`Sync`] if the storage also
68/// does.
69///
70/// # Example
71/// ```
72/// use mini_io_queue::nonblocking::queue_from;
73/// use mini_io_queue::storage::HeapBuffer;
74///
75/// let buffer = HeapBuffer::<u8>::new(100);
76/// let (reader, writer) = queue_from(buffer);
77/// ```
78///
79/// [`Send`]: std::marker::Send
80/// [`Sync`]: std::marker::Sync
81pub fn queue_from<T, S>(storage: S) -> (Reader<S>, Writer<S>)
82where
83 S: Storage<T>,
84{
85 let ring = Ring::new(storage.capacity());
86 queue_from_parts(ring, storage)
87}
88
89/// Creates a queue from a separately allocated ring and storage. The queue will use the ring's
90/// capacity, and be initialized with a read buffer from the ring's left region and a write buffer
91/// from the ring's right region.
92///
93/// It is up to the user to ensure the storage has enough capacity for the ring. If the ring's
94/// capacity is larger than the storage's length, the reader and writer may panic.
95///
96/// Note that the reader and writer will only implement [`Send`] and [`Sync`] if the storage also
97/// does.
98///
99/// # Example
100/// ```
101/// use mini_io_queue::Ring;
102/// use mini_io_queue::nonblocking::queue_from_parts;
103/// use mini_io_queue::storage::{HeapBuffer, Storage};
104///
105/// // Create a queue with half of the underlying buffer in the read side.
106/// let ring = Ring::new(10);
107/// ring.advance_right(5);
108///
109/// let mut buffer = HeapBuffer::new(10);
110/// buffer.slice_mut(0..5).copy_from_slice(&[1, 2, 3, 4, 5]);
111///
112/// let (reader, writer) = queue_from_parts(ring, buffer);
113/// ```
114///
115/// [`Send`]: std::marker::Send
116/// [`Sync`]: std::marker::Sync
117pub fn queue_from_parts<S>(ring: Ring, storage: S) -> (Reader<S>, Writer<S>) {
118 let state = Arc::new(State {
119 ring,
120 storage,
121
122 is_reader_open: AtomicBool::new(true),
123 is_writer_open: AtomicBool::new(true),
124 });
125
126 let reader = Reader {
127 state: state.clone(),
128 };
129 let writer = Writer { state };
130
131 (reader, writer)
132}
133
#[cfg(feature = "heap-buffer")]
mod heap_constructors {
    use super::{queue_from_parts, Reader, Writer};
    use crate::storage::HeapBuffer;
    use crate::Ring;

    /// Allocates heap storage for `capacity` items and builds a queue on top of it.
    ///
    /// The queue starts with an empty read buffer and a full write buffer whose slots hold the
    /// element type's default value.
    ///
    /// The returned reader and writer only implement [`Send`] and [`Sync`] when the element
    /// type does.
    ///
    /// # Example
    /// ```
    /// use mini_io_queue::nonblocking::queue;
    ///
    /// let (reader, writer) = queue::<u8>(100);
    /// ```
    ///
    /// [`Send`]: std::marker::Send
    /// [`Sync`]: std::marker::Sync
    #[cfg_attr(docsrs, doc(cfg(feature = "heap-buffer")))]
    pub fn queue<T>(capacity: usize) -> (Reader<HeapBuffer<T>>, Writer<HeapBuffer<T>>)
    where
        T: Default,
    {
        // Ring and buffer are sized identically so the ring never indexes past the storage.
        queue_from_parts(Ring::new(capacity), HeapBuffer::new(capacity))
    }
}
167
168#[cfg(feature = "heap-buffer")]
169pub use self::heap_constructors::*;
170
171#[derive(Debug)]
172struct State<S> {
173 ring: Ring,
174 storage: S,
175
176 is_reader_open: AtomicBool,
177 is_writer_open: AtomicBool,
178}
179
180/// Receives items from the queue.
181///
182/// Values sent by the writer will be added to the end of the reader's buffer, and capacity can be
183/// sent back to the writer from the start of the reader's buffer to allow it to write more data.
184#[derive(Debug)]
185pub struct Reader<S> {
186 state: Arc<State<S>>,
187}
188
189/// Adds items to the queue.
190///
191/// Values sent by the writer will be added to the end of the reader's buffer, and capacity can be
192/// sent back to the writer from the start of the reader's buffer to allow it to write more data.
193#[derive(Debug)]
194pub struct Writer<S> {
195 state: Arc<State<S>>,
196}
197
198impl<S> State<S> {
199 fn close_reader(&self) {
200 self.is_reader_open.store(false, Ordering::Release);
201 }
202
203 fn close_writer(&self) {
204 self.is_writer_open.store(false, Ordering::Release);
205 }
206}
207
208impl<S> Reader<S> {
209 /// Returns if the corresponding writer is still open.
210 ///
211 /// If this is `false`, unread data will still be available to read but a well-behaved writer
212 /// will not provide any new data.
213 #[inline]
214 pub fn is_writer_open(&self) -> bool {
215 self.state.is_writer_open.load(Ordering::Acquire)
216 }
217
218 /// Returns if data is available in the reader's buffer.
219 ///
220 /// If this is true it is guaranteed that the next call to [`buf`] will return a non-empty
221 /// slice, unless [`consume`] is called first.
222 ///
223 /// Keep in mind that when using a reader and writer on separate threads, a reader that has no
224 /// data can receive data at any time - even between calls to `has_data` and other functions.
225 ///
226 /// [`buf`]: Reader::buf
227 /// [`consume`]: Reader::consume
228 #[inline]
229 pub fn has_data(&self) -> bool {
230 let (r1, r2) = self.state.ring.left_ranges();
231 !r1.is_empty() || !r2.is_empty()
232 }
233
234 /// Returns if the buffer is full, i.e all space is allocated to the reader and any write
235 /// operations will fail.
236 ///
237 /// If this is true a reader can only resume the writer by calling [`consume`] to pass capacity
238 /// to the writer.
239 ///
240 /// Keep in mind that when using a reader and writer on separate threads, a reader that is not
241 /// full can become full at any time - even between calls to `is_full` and other functions.
242 ///
243 /// [`consume`]: Reader::consume
244 #[inline]
245 pub fn is_full(&self) -> bool {
246 let (r1, r2) = self.state.ring.right_ranges();
247 r1.is_empty() && r2.is_empty()
248 }
249
250 /// Gets the readable buffer.
251 ///
252 /// This function is a lower-level call. It needs to be paired with the [`consume`] method to
253 /// function properly. When calling this method, none of the contents will be "read" in the
254 /// sense that later calling `buf` may return the same contents. As such, [`consume`] must
255 /// be called with the number of bytes that are consumed from this buffer to ensure that the
256 /// items are never returned twice.
257 ///
258 /// An empty buffer returned indicates that no data is available to read.
259 ///
260 /// # Panics
261 /// This function may panic if the underlying storage panics when trying to get a slice to the
262 /// data. This may happen if queue was created with a ring that has a larger capacity than the
263 /// storage.
264 ///
265 /// [`consume`]: Reader::consume
266 #[inline]
267 pub fn buf<T>(&self) -> Region<T>
268 where
269 S: Storage<T>,
270 {
271 let (range_0, range_1) = self.state.ring.left_ranges();
272 Region::new(
273 self.state.storage.slice(range_0),
274 self.state.storage.slice(range_1),
275 )
276 }
277
278 /// Marks items at the start of the reader buffer as consumed, removing them from the slice
279 /// returned by [`buf`] and adding their capacity to the end of the writer's buffer. Since
280 /// queues have a fixed underlying length, calling this is required to allow the transfer of
281 /// more data.
282 ///
283 /// # Panics
284 /// This function will panic if `amt` is larger than the reader's available data buffer.
285 ///
286 /// [`buf`]: Reader::buf
287 #[inline]
288 pub fn consume(&mut self, amt: usize) {
289 self.state.ring.advance_left(amt);
290 }
291
292 /// Pulls some items from this queue into the specified buffer, returning how many items were
293 /// read.
294 ///
295 /// # Return
296 /// It is guaranteed that the return value is `<= buf.len()`.
297 ///
298 /// A return value of `0` indicates one of these three scenarios:
299 /// 1. No data was available to read.
300 /// 2. The writer has closed and all items have been read.
301 /// 3. The buffer specified had a length of 0.
302 pub fn read<T>(&mut self, buf: &mut [T]) -> usize
303 where
304 S: Storage<T>,
305 T: Clone,
306 {
307 let src_buf = self.buf();
308
309 let len = src_buf.len().min(buf.len());
310 src_buf.slice(..len).clone_to_slice(&mut buf[..len]);
311
312 self.consume(len);
313 len
314 }
315
316 /// Close the reader, indicating to the writer that no more data will be read.
317 ///
318 /// Any future write operations will fail. Closing the reader multiple times has no effect.
319 ///
320 /// Dropping the reader object will also close it.
321 #[inline]
322 pub fn close(&mut self) {
323 self.state.close_reader();
324 }
325}
326
327impl<S> Drop for Reader<S> {
328 #[inline]
329 fn drop(&mut self) {
330 self.state.close_reader();
331 }
332}
333
334impl<S> Writer<S> {
335 /// Returns if the corresponding reader is still open.
336 ///
337 /// If this is `false`, any attempt to write or flush the object will fail.
338 #[inline]
339 pub fn is_reader_open(&self) -> bool {
340 self.state.is_reader_open.load(Ordering::Acquire)
341 }
342
343 /// Returns if space is available in the writer's buffer.
344 ///
345 /// If this is true it is guaranteed that the next call to [`buf`] will return a non-empty
346 /// slice, unless [`feed`] is called first.
347 ///
348 /// Keep in mind that when using a reader and writer on separate threads, a writer that has no
349 /// space can have more made available at any time - even between calls to `has_space` and other
350 /// functions.
351 ///
352 /// [`buf`]: Writer::buf
353 /// [`feed`]: Writer::feed
354 #[inline]
355 pub fn has_space(&self) -> bool {
356 let (r1, r2) = self.state.ring.right_ranges();
357 !r1.is_empty() || !r2.is_empty()
358 }
359
360 /// Returns if the buffer is flushed, i.e there are no items to read and any read operations
361 /// will stall.
362 ///
363 /// If this is true a writer can only resume the reader by calling [`feed`] to pass items to
364 /// the reader.
365 ///
366 /// Keep in mind that when using a reader and writer on separate threads, a writer that is not
367 /// flushed can become flushed at any time - even between calls to `is_flushed` and other
368 /// functions.
369 ///
370 /// [`feed`]: Writer::feed
371 #[inline]
372 pub fn is_flushed(&self) -> bool {
373 let (r1, r2) = self.state.ring.left_ranges();
374 r1.is_empty() && r2.is_empty()
375 }
376
377 /// Gets the writable buffer.
378 ///
379 /// This functions is a lower-level call. It needs to be paired with the [`feed`] method to
380 /// function properly. When calling this method, none of the contents will be "written" in the
381 /// sense that later calling `buf` may return the same contents. As such, [`feed`] must be
382 /// called with the number of items that have been written to the buffer to ensure that the
383 /// items are never returned twice.
384 ///
385 /// An empty buffer returned indicates that no space is currently available, or the reader has
386 /// closed.
387 ///
388 /// # Panics
389 /// This function may panic if the underlying storage panics when trying to get a slice to the
390 /// data. This may happen if queue was created with a ring that has a larger capacity than the
391 /// storage.
392 ///
393 /// [`feed`]: Writer::feed
394 pub fn buf<T>(&mut self) -> RegionMut<T>
395 where
396 S: Storage<T>,
397 {
398 // If the reader is closed there is no point in writing anything, even if space is
399 // available.
400 if !self.is_reader_open() {
401 // Empty slice indicates the reader closed.
402 return Default::default();
403 }
404
405 let (range_0, range_1) = self.state.ring.right_ranges();
406
407 // `Ring` guarantees that a left region will only overlap a right region when this order
408 // is followed:
409 // - Get the right region range
410 // - Advance the right region
411 // - Get the left region range
412 // Given that the borrow checker prevents this here (`buf` and `consume` both take
413 // &mut self), and assuming the Reader behaves correctly and does not hold references to the
414 // left region's buffer while advancing it, there is no way to get another range that
415 // overlaps this one.
416 RegionMut::new(
417 unsafe { self.state.storage.slice_mut_unchecked(range_0) },
418 unsafe { self.state.storage.slice_mut_unchecked(range_1) },
419 )
420 }
421
422 /// Mark items at the start of the writer buffer as ready to be read, removing them from the
423 /// slice returned by [`buf`] and making them available in the reader's buffer.
424 ///
425 /// # Panics
426 /// This function will panic if `amt` is larger than the writer's available space buffer.
427 ///
428 /// [`buf`]: Writer::buf
429 #[inline]
430 pub fn feed(&mut self, amt: usize) {
431 self.state.ring.advance_right(amt);
432 }
433
434 /// Writes some items from a buffer into this queue, returning how many items were written.
435 ///
436 /// This function will attempt to write the entire contents of `buf`, but the entire write may
437 /// not succeed if not enough space is available.
438 ///
439 /// # Return
440 /// It is guaranteed that the return value is `<= buf.len()`.
441 ///
442 /// A return value of `0` indicates one of these three scenarios:
443 /// 1. No space is available to write.
444 /// 2. The reader has closed.
445 /// 3. The buffer specified had a length of 0.
446 pub fn write<T>(&mut self, buf: &[T]) -> usize
447 where
448 S: Storage<T>,
449 T: Clone,
450 {
451 let mut dest_buf = self.buf();
452
453 let len = dest_buf.len().min(buf.len());
454 dest_buf.slice_mut(..len).clone_from_slice(&buf[..len]);
455
456 self.feed(len);
457 len
458 }
459
460 /// Close the writer, indicating to the reader that no more data will be written.
461 ///
462 /// Closing the writer multiple times has no effect.
463 ///
464 /// Dropping the writer object will also close it.
465 #[inline]
466 pub fn close(&mut self) {
467 self.state.close_writer();
468 }
469}
470
471impl<S> Drop for Writer<S> {
472 #[inline]
473 fn drop(&mut self) {
474 self.state.close_writer();
475 }
476}
477
#[cfg(feature = "std-io")]
mod io_impls {
    use crate::nonblocking::{Reader, Writer};
    use crate::storage::Storage;
    use std::io::{BufRead, ErrorKind, Read, Result, Write};

    /// Non-blocking [`Read`] for byte queues.
    ///
    /// Returns a [`WouldBlock`](ErrorKind::WouldBlock) error when no data is queued and the
    /// writer is still open, and `Ok(0)` (EOF) once the writer has closed and all data has been
    /// read.
    #[cfg_attr(docsrs, doc(cfg(feature = "std-io")))]
    impl<S> Read for Reader<S>
    where
        S: Storage<u8>,
    {
        fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
            // Sample the writer's state BEFORE looking at the buffer. If the order were
            // reversed, a writer could feed data and then close between the two steps, and the
            // reader would report EOF while unread data is still queued. This relies on the
            // writer not feeding after it has closed; the `Acquire` load pairs with the
            // `Release` store made when the writer closes.
            let writer_open = self.is_writer_open();
            let src_buf = self.buf();

            if src_buf.is_empty() {
                return if writer_open {
                    // No data to read, this is an error since blocking would be required.
                    Err(ErrorKind::WouldBlock.into())
                } else {
                    // Writer was already closed and all data has been read: 0 signals EOF.
                    Ok(0)
                };
            }

            let len = src_buf.len().min(buf.len());
            src_buf.slice(..len).copy_to_slice(&mut buf[..len]);

            // Resolves to the inherent `Reader::consume`, which returns capacity to the writer.
            self.consume(len);

            Ok(len)
        }
    }

    /// Non-blocking [`BufRead`] for byte queues.
    ///
    /// `fill_buf` exposes the contiguous part of the readable region and follows the same
    /// `WouldBlock` / EOF rules as the [`Read`] implementation.
    #[cfg_attr(docsrs, doc(cfg(feature = "std-io")))]
    impl<S> BufRead for Reader<S>
    where
        S: Storage<u8>,
    {
        fn fill_buf(&mut self) -> Result<&[u8]> {
            // As in `read`, the writer's state is sampled before the buffer so that data fed
            // right before the writer closed is never mistaken for EOF.
            let writer_open = self.is_writer_open();
            let buf = self.buf().contiguous();

            if buf.is_empty() && writer_open {
                // No data to read, this is an error since blocking would be required.
                return Err(ErrorKind::WouldBlock.into());
            }

            // Either data is available, or the writer was closed and everything has been read,
            // in which case the empty slice signals EOF.
            Ok(buf)
        }

        fn consume(&mut self, amt: usize) {
            // Not recursive: inherent methods take precedence over trait methods, so this
            // calls the inherent `Reader::consume`.
            self.consume(amt);
        }
    }

    /// Non-blocking [`Write`] for byte queues.
    ///
    /// `write` returns a [`WouldBlock`](ErrorKind::WouldBlock) error when the queue has no
    /// space and the reader is still open, and `Ok(0)` once the reader has closed.
    #[cfg_attr(docsrs, doc(cfg(feature = "std-io")))]
    impl<S> Write for Writer<S>
    where
        S: Storage<u8>,
    {
        fn write(&mut self, buf: &[u8]) -> Result<usize> {
            // `Writer::buf` yields an empty region both when the queue is full and when the
            // reader has closed; the two cases are told apart below.
            let mut dest_buf = self.buf();

            if !dest_buf.is_empty() {
                let len = dest_buf.len().min(buf.len());
                dest_buf.slice_mut(..len).copy_from_slice(&buf[..len]);

                self.feed(len);

                return Ok(len);
            }

            if !self.is_reader_open() {
                // The reader is gone, so nothing can be written anymore: report 0 bytes.
                return Ok(0);
            }

            // No space to write, this is an error since blocking would be required.
            Err(ErrorKind::WouldBlock.into())
        }

        /// Succeeds once the reader has consumed everything that was fed.
        ///
        /// Returns `WouldBlock` while unread data remains and the reader is open, and
        /// `UnexpectedEof` when unread data remains but the reader has closed.
        fn flush(&mut self) -> Result<()> {
            if self.is_flushed() {
                return Ok(());
            }
            if self.is_reader_open() {
                return Err(ErrorKind::WouldBlock.into());
            }
            Err(ErrorKind::UnexpectedEof.into())
        }
    }
}
574
575#[cfg(feature = "std-io")]
576pub use self::io_impls::*;