mini_io_queue/blocking.rs
1//! Synchronous reader/writer queue for generic items or byte arrays.
2//!
3//! Each queue has a [`Reader`] and [`Writer`] part. Data can be copied into the writer's buffer and
//! sent to the reader without allocations, allowing communication across threads.
5//!
6//! Reading and writing with the queue does not require any allocation, with the downside that the
7//! queue has a fixed capacity on creation.
8//!
9//! Unlike [`nonblocking`], this queue blocks to wait for data on the reader end, or wait for space
10//! on the writer end. For [`u8`] storage, this means the queue can be used as a [`Read`] or
11//! [`Write`].
12//!
13//! If you are using an async runtime, you are probably more interested in the [`asyncio`] queue,
14//! which does not block.
15//!
16//! # Example
17//! ```
18//! use mini_io_queue::blocking::queue;
19//!
20//! let (mut reader, mut writer) = queue(8);
21//!
22//! let write_thread = std::thread::spawn(move || {
23//! for i in 0..16 {
24//! writer.write(&[i]);
25//! }
26//! });
27//!
28//! let read_thread = std::thread::spawn(move || {
29//! for i in 0..16 {
30//! let mut buf = [0];
31//! reader.read_exact(&mut buf).unwrap();
32//!
33//! assert_eq!(buf[0], i);
34//! }
35//! });
36//!
37//! write_thread.join().unwrap();
38//! read_thread.join().unwrap();
39//! ```
40//!
41//! [`Reader`]: self::Reader
42//! [`Writer`]: self::Writer
43//! [`nonblocking`]: crate::nonblocking
44//! [`Read`]: std::io::Read
45//! [`Write`]: std::io::Write
46//! [`asyncio`]: crate::asyncio
47
48use crate::storage::Storage;
49use crate::{Region, RegionMut, Ring};
50use std::sync::atomic::{AtomicBool, Ordering};
51use std::sync::{Arc, Condvar, Mutex};
52use std::{error, fmt, io};
53
54/// Creates a queue that is backed by a specific storage. The queue will use the storage's entire
55/// capacity, and will be initialized with an empty read buffer and a full write buffer.
56///
57/// Note that the reader and writer will only implement [`Send`] and [`Sync`] if the storage also
58/// does.
59///
60/// # Example
61/// ```
62/// use mini_io_queue::blocking::queue_from;
63/// use mini_io_queue::storage::HeapBuffer;
64///
65/// let buffer = HeapBuffer::<u8>::new(100);
66/// let (reader, writer) = queue_from(buffer);
67/// ```
68///
69/// [`Send`]: std::marker::Send
70/// [`Sync`] std::marker::Sync
71pub fn queue_from<T, S>(storage: S) -> (Reader<S>, Writer<S>)
72where
73 S: Storage<T>,
74{
75 let ring = Ring::new(storage.capacity());
76 queue_from_parts(ring, storage)
77}
78
79/// Creates a queue from a separately allocated ring and storage. The queue will use the ring's
80/// capacity, and be initialized with a read buffer from the ring's left region and a write buffer
81/// from the ring's right region.
82///
83/// It is up to the user to ensure the storage has enough capacity for the ring. If the ring's
84/// capacity is larger than the storage's length, the reader and writer may panic.
85///
86/// Note that the reader and writer will only implement [`Send`] and [`Sync`] if the storage also
87/// does.
88///
89/// # Example
90/// ```
91/// use mini_io_queue::Ring;
92/// use mini_io_queue::blocking::queue_from_parts;
93/// use mini_io_queue::storage::{HeapBuffer, Storage};
94///
95/// // Create a queue with half of the underlying buffer in the read side.
96/// let ring = Ring::new(10);
97/// ring.advance_right(5);
98///
99/// let mut buffer = HeapBuffer::new(10);
100/// buffer.slice_mut(0..5).copy_from_slice(&[1, 2, 3, 4, 5]);
101///
102/// let (reader, writer) = queue_from_parts(ring, buffer);
103/// ```
104///
105/// [`Send`]: std::marker::Send
106/// [`Sync`]: std::marker::Sync
107pub fn queue_from_parts<S>(ring: Ring, storage: S) -> (Reader<S>, Writer<S>) {
108 let state = Arc::new(State {
109 ring,
110 storage,
111
112 is_reader_open: AtomicBool::new(true),
113 is_writer_open: AtomicBool::new(true),
114
115 data_available_cond: Condvar::new(),
116 space_available_cond: Condvar::new(),
117 });
118
119 let reader = Reader {
120 state: state.clone(),
121 data_available_mutex: Mutex::new(()),
122 };
123 let writer = Writer {
124 state,
125 space_available_mutex: Mutex::new(()),
126 };
127
128 (reader, writer)
129}
130
#[cfg(feature = "heap-buffer")]
mod heap_constructors {
    use crate::blocking::{queue_from_parts, Reader, Writer};
    use crate::storage::HeapBuffer;
    use crate::Ring;

    /// Creates a heap-backed queue able to hold `capacity` items.
    ///
    /// The queue starts with an empty read buffer and a full write buffer whose slots contain the
    /// element type's default value.
    ///
    /// Note that the reader and writer only implement [`Send`] and [`Sync`] when the element type
    /// does too.
    ///
    /// # Example
    /// ```
    /// use mini_io_queue::blocking::queue;
    ///
    /// let (reader, writer) = queue::<u8>(100);
    /// ```
    ///
    /// [`Send`]: std::marker::Send
    /// [`Sync`]: std::marker::Sync
    #[cfg_attr(docsrs, doc(cfg(feature = "heap-buffer")))]
    pub fn queue<T>(capacity: usize) -> (Reader<HeapBuffer<T>>, Writer<HeapBuffer<T>>)
    where
        T: Default,
    {
        // The ring and the buffer are created with the same capacity so they always agree.
        queue_from_parts(Ring::new(capacity), HeapBuffer::new(capacity))
    }
}
164
165#[cfg(feature = "heap-buffer")]
166pub use self::heap_constructors::*;
167
/// State shared between the [`Reader`] and [`Writer`] halves of a queue.
#[derive(Debug)]
struct State<S> {
    /// Tracks which index ranges of `storage` belong to the reader (left region) and to the
    /// writer (right region).
    ring: Ring,
    /// Backing storage that holds the queue's items.
    storage: S,

    /// `true` until the reader is closed; cleared by `close_reader`.
    is_reader_open: AtomicBool,
    /// `true` until the writer is closed; cleared by `close_writer`.
    is_writer_open: AtomicBool,

    /// Notified when the writer feeds items or closes, to wake a reader waiting for data.
    data_available_cond: Condvar,
    /// Notified when the reader consumes items or closes, to wake a writer waiting for space.
    space_available_cond: Condvar,
}
179
/// An error indicating why a writer operation (writing, flushing or closing) failed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WriteError {
    /// The operation failed because the reader was closed, so queued items will never be consumed
    /// and no further space will become available.
    ReaderClosed,
}
186
/// An error indicating why an exact-length read failed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReadExactError {
    /// Reading failed because the writer was closed before enough items were available, meaning
    /// no more data will become available.
    WriterClosed,
}
193
/// Receives items from the queue.
///
/// Values sent by the writer will be added to the end of the reader's buffer, and capacity can be
/// sent back to the writer from the start of the reader's buffer to allow it to write more data.
#[derive(Debug)]
pub struct Reader<S> {
    /// State shared with the corresponding [`Writer`].
    state: Arc<State<S>>,
    /// Mutex handed to the "data available" condition variable while blocking for data. It
    /// guards no data of its own.
    data_available_mutex: Mutex<()>,
}
203
/// Adds items to the queue.
///
/// Values sent by the writer will be added to the end of the reader's buffer, and capacity can be
/// sent back to the writer from the start of the reader's buffer to allow it to write more data.
#[derive(Debug)]
pub struct Writer<S> {
    /// State shared with the corresponding [`Reader`].
    state: Arc<State<S>>,
    /// Mutex handed to the "space available" condition variable while blocking for space or for
    /// a flush to complete. It guards no data of its own.
    space_available_mutex: Mutex<()>,
}
213
214impl<S> State<S> {
215 fn close_reader(&self) {
216 let was_open = self.is_reader_open.swap(false, Ordering::AcqRel);
217 if was_open {
218 self.space_available_cond.notify_all();
219 }
220 }
221
222 fn close_writer(&self) {
223 let was_open = self.is_writer_open.swap(false, Ordering::AcqRel);
224 if was_open {
225 self.data_available_cond.notify_all();
226 }
227 }
228}
229
230impl<S> Reader<S> {
231 /// Returns if the corresponding writer is still open.
232 ///
233 /// If this is `false`, unread data will still be available to read but a well-behaved writer
234 /// will not provide any new data.
235 #[inline]
236 pub fn is_writer_open(&self) -> bool {
237 self.state.is_writer_open.load(Ordering::Acquire)
238 }
239
240 /// Returns if data is available in the reader's buffer.
241 ///
242 /// If this is true it is guaranteed that the next call to [`fill_buf`] will return a non-empty
243 /// slice, unless [`consume`] is called first.
244 ///
245 /// Keep in mind that when using a reader and writer on separate threads, a reader that has no
246 /// data can receive data at any time - even between calls to `has_data` and other functions.
247 ///
248 /// [`fill_buf`]: Reader::fill_buf
249 /// [`consume`]: Reader::consume
250 #[inline]
251 pub fn has_data(&self) -> bool {
252 let (r1, r2) = self.state.ring.left_ranges();
253 !r1.is_empty() || !r2.is_empty()
254 }
255
256 /// Returns if the buffer is full, i.e all space is allocated to the reader and any write
257 /// operations will block.
258 ///
259 /// If this is true a reader can only resume the writer by calling [`consume`] to pass capacity
260 /// to the writer.
261 ///
262 /// Keep in mind that when using a reader and writer on separate threads, a reader that is not
263 /// full can become full at any time - even between calls to `is_full` and other functions.
264 ///
265 /// [`consume`]: Reader::consume
266 #[inline]
267 pub fn is_full(&self) -> bool {
268 let (r1, r2) = self.state.ring.right_ranges();
269 r1.is_empty() && r2.is_empty()
270 }
271
272 /// Attempt to read from the reader's buffer, blocking to wait for more data if it is empty.
273 ///
274 /// This function is a lower-level call. It needs to be paired with the [`consume`] method to
275 /// function properly. When calling this method, none of the contents will be "read" in the
276 /// sense that later calling `fill_buf` may return the same contents. As such, [`consume`] must
277 /// be called with the number of bytes that are consumed from this buffer to ensure that the
278 /// items are never returned twice.
279 ///
280 /// An empty buffer returned indicates that all data has been read and the writer has closed.
281 ///
282 /// [`consume`]: Reader::consume
283 pub fn fill_buf<T>(&mut self) -> Region<T>
284 where
285 S: Storage<T>,
286 {
287 if self.has_data() {
288 return self.buf();
289 }
290
291 // If the writer is closed, we've now read everything we could.
292 if !self.is_writer_open() {
293 // Empty slice indicates the writer closed.
294 return Default::default();
295 }
296
297 // If no data is available, park and ask the writer to unpark us when it writes something.
298 let mut lock = self.data_available_mutex.lock().unwrap();
299 loop {
300 lock = self.state.data_available_cond.wait(lock).unwrap();
301
302 if self.has_data() {
303 return self.buf();
304 }
305
306 // If the writer is closed, we've now read everything we could.
307 if !self.is_writer_open() {
308 // Empty slice indicates the writer closed.
309 return Default::default();
310 }
311 }
312 }
313
314 /// Marks items at the start of the reader buffer as consumed, removing them from the slice
315 /// returned by [`fill_buf`] and adding their capacity to the end of the writer's buffer.
316 /// Since queues have a fixed underlying length, calling this is required to allow the transfer
317 /// of more data.
318 ///
319 /// # Panics
320 /// This function will panic if `amt` is larger than the reader's available data buffer.
321 ///
322 /// [`fill_buf`]: Reader::fill_buf
323 pub fn consume(&mut self, amt: usize) {
324 self.state.ring.advance_left(amt);
325
326 // Unpark the writer if it was waiting for space.
327 self.state.space_available_cond.notify_all();
328 }
329
330 /// Pulls some items from this queue into the specified buffer, returning how many items were
331 /// read.
332 ///
333 /// This method will complete immediately if at least one item is available to read, otherwise
334 /// it will block until some are available.
335 ///
336 /// # Return
337 /// It is guaranteed that the return value is `<= buf.len()`.
338 ///
339 /// A return value of `0` indicates one of these two scenarios:
340 /// 1. The writer has closed and all items have been read.
341 /// 2. The buffer specified had a length of 0.
342 pub fn read<T>(&mut self, buf: &mut [T]) -> usize
343 where
344 S: Storage<T>,
345 T: Clone,
346 {
347 let src_buf = self.fill_buf();
348
349 if src_buf.is_empty() {
350 // This indicates the writer has closed and all data has been read.
351 return 0;
352 }
353
354 let len = src_buf.len().min(buf.len());
355 src_buf.slice(..len).clone_to_slice(&mut buf[..len]);
356
357 self.consume(len);
358 len
359 }
360
361 /// Reads the exact number of items required to fill `buf`.
362 ///
363 /// If the writer closes before the buffer is completely filled, an error of the kind
364 /// [`ReadExactError::WriterClosed`] will be returned.
365 ///
366 /// # Return
367 /// If the return value is `Ok(n)`, it is guaranteed that `n == buf.len()`.
368 pub fn read_exact<T>(&mut self, buf: &mut [T]) -> Result<usize, ReadExactError>
369 where
370 S: Storage<T>,
371 T: Clone,
372 {
373 let len = buf.len();
374 let src_buf = loop {
375 let src_buf = self.fill_buf();
376
377 if src_buf.len() >= len {
378 break src_buf;
379 }
380
381 if !self.is_writer_open() {
382 // The writer has closed, required data will never be ready.
383 return Err(ReadExactError::WriterClosed);
384 }
385 };
386
387 src_buf.slice(..len).clone_to_slice(buf);
388 self.consume(len);
389
390 Ok(len)
391 }
392
393 /// Close the reader, indicating to the writer that no more data will be read.
394 ///
395 /// Any in-progress writes or flushes on the writer will be interrupted, and any future
396 /// operations will fail. Closing the reader multiple times has no effect.
397 ///
398 /// Dropping the reader object will also close it.
399 #[inline]
400 pub fn close(&mut self) {
401 self.state.close_reader();
402 }
403
404 #[inline]
405 fn buf<T>(&self) -> Region<T>
406 where
407 S: Storage<T>,
408 {
409 let (range_0, range_1) = self.state.ring.left_ranges();
410 Region::new(
411 self.state.storage.slice(range_0),
412 self.state.storage.slice(range_1),
413 )
414 }
415}
416
417impl<S> io::Read for Reader<S>
418where
419 S: Storage<u8>,
420{
421 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
422 let src_buf = self.fill_buf();
423
424 let len = src_buf.len().min(buf.len());
425 src_buf.slice(..len).copy_to_slice(&mut buf[..len]);
426
427 self.consume(len);
428
429 Ok(len)
430 }
431}
432
433impl<S> io::BufRead for Reader<S>
434where
435 S: Storage<u8>,
436{
437 fn fill_buf(&mut self) -> io::Result<&[u8]> {
438 Ok(self.fill_buf().contiguous())
439 }
440
441 fn consume(&mut self, amt: usize) {
442 self.consume(amt);
443 }
444}
445
446impl<S> Drop for Reader<S> {
447 fn drop(&mut self) {
448 self.state.close_reader();
449 }
450}
451
452impl<S> Writer<S> {
453 /// Returns if the corresponding reader is still open.
454 ///
455 /// If this is `false`, any attempt to write or flush the object will fail.
456 #[inline]
457 pub fn is_reader_open(&self) -> bool {
458 self.state.is_reader_open.load(Ordering::Acquire)
459 }
460
461 /// Returns if space is available in the writer's buffer.
462 ///
463 /// If this is true it is guaranteed that the next call to [`empty_buf`] will return a non-empty
464 /// slice, unless [`feed`] is called first.
465 ///
466 /// Keep in mind that when using a reader and writer on separate threads, a writer that has no
467 /// space can have more made available at any time - even between calls to `has_space` and other
468 /// functions.
469 ///
470 /// [`empty_buf`]: Writer::empty_buf
471 /// [`feed`]: Writer::feed
472 #[inline]
473 pub fn has_space(&self) -> bool {
474 let (r0, r1) = self.state.ring.right_ranges();
475 !r0.is_empty() || !r1.is_empty()
476 }
477
478 /// Returns if the buffer is flushed, i.e there are no items to read and any read operations
479 /// will stall.
480 ///
481 /// If this is true a writer can only resume the reader by calling [`feed`] to pass items to
482 /// the reader.
483 ///
484 /// Keep in mind that when using a reader and writer on separate threads, a writer that is not
485 /// flushed can become flushed at any time - even between calls to `is_flushed` and other
486 /// functions.
487 ///
488 /// [`feed`]: Writer::feed
489 #[inline]
490 pub fn is_flushed(&self) -> bool {
491 let (r0, r1) = self.state.ring.left_ranges();
492 r0.is_empty() && r1.is_empty()
493 }
494
495 fn get_flush_state(&self) -> Option<Result<(), WriteError>> {
496 if self.is_flushed() {
497 return Some(Ok(()));
498 }
499 if !self.is_reader_open() {
500 return Some(Err(WriteError::ReaderClosed));
501 }
502 None
503 }
504
505 #[inline]
506 fn buf<T>(&mut self) -> RegionMut<T>
507 where
508 S: Storage<T>,
509 {
510 let (range_0, range_1) = self.state.ring.right_ranges();
511
512 // `Ring` guarantees that a left region will only overlap a right region when this order
513 // is followed:
514 // - Get the right region range
515 // - Advance the right region
516 // - Get the left region range
517 // Given that the borrow checker prevents this here (`buf` and `consume` both take
518 // &mut self), and assuming the Reader behaves correctly and does not hold references to the
519 // left region's buffer while advancing it, there is no way to get another range that
520 // overlaps this one.
521 RegionMut::new(
522 unsafe { self.state.storage.slice_mut_unchecked(range_0) },
523 unsafe { self.state.storage.slice_mut_unchecked(range_1) },
524 )
525 }
526
527 /// Attempt to get the writable buffer, blocking to wait for more space if it is empty.
528 ///
529 /// This functions is a lower-level call. It needs to be paired with the [`feed`] method to
530 /// function properly. When calling this method, none of the contents will be "written" in the
531 /// sense that later calling `empty_buf` may return the same contents. As such, [`feed`] must be
532 /// called with the number of items that have been written to the buffer to ensure that the
533 /// items are never returned twice.
534 ///
535 /// An empty buffer returned indicates that the queue cannot be written to as the reader has
536 /// closed.
537 ///
538 /// [`feed`]: Writer::feed
539 pub fn empty_buf<T>(&mut self) -> RegionMut<T>
540 where
541 S: Storage<T>,
542 {
543 // If the reader is closed there is no point in writing anything, even if space is
544 // available.
545 if !self.is_reader_open() {
546 // Empty slice indicates the reader closed.
547 return Default::default();
548 }
549 if self.has_space() {
550 return self.buf();
551 }
552
553 // If no space is available, park and ask the reader to unpark us when it writes something.
554 {
555 let mut lock = self.space_available_mutex.lock().unwrap();
556 loop {
557 lock = self.state.space_available_cond.wait(lock).unwrap();
558
559 if !self.is_reader_open() {
560 // Empty slice indicates the reader closed.
561 return Default::default();
562 }
563 if self.has_space() {
564 break;
565 }
566 }
567 }
568
569 self.buf()
570 }
571
572 /// Marks items at the start of the writer buffer as ready to be read, removing them from the
573 /// slice returned by [`empty_buf`] and making them available in the reader's buffer.
574 ///
575 /// # Panics
576 /// This function will panic if `amt` is larger than the writer's available space buffer.
577 ///
578 /// [`empty_buf`]: Writer::empty_buf
579 pub fn feed(&mut self, len: usize) {
580 self.state.ring.advance_right(len);
581
582 // Unpark the reader if it was waiting for data.
583 self.state.data_available_cond.notify_all();
584 }
585
586 /// Writes some items from a buffer into this queue, returning how many items were written.
587 ///
588 /// This function will attempt to write the entire contents of `buf`, but the entire write may
589 /// not succeed if not enough space is available.
590 ///
591 /// # Return
592 /// It is guaranteed that the return value is `<= buf.len()`.
593 ///
594 /// A return value of `0` indicates one of these two scenarios:
595 /// 1. The reader has closed.
596 /// 2. The buffer specified had a length of 0.
597 pub fn write<T>(&mut self, buf: &[T]) -> usize
598 where
599 S: Storage<T>,
600 T: Clone,
601 {
602 let mut dest_buf = self.empty_buf();
603
604 if dest_buf.is_empty() {
605 // This indicates the reader has closed.
606 return 0;
607 }
608
609 let len = dest_buf.len().min(buf.len());
610 dest_buf.slice_mut(..len).clone_from_slice(&buf[..len]);
611
612 self.feed(len);
613 len
614 }
615
616 /// Attempts to write all items in a buffer into this queue.
617 ///
618 /// If the reader closes before all items are written, an error of the kind
619 /// [`WriteError::ReaderClosed`] will be returned.
620 ///
621 /// # Return
622 /// If the return value is `Ok(n)`, it is guaranteed that `n == buf.len()`.
623 pub fn write_all<T>(&mut self, buf: &[T]) -> Result<usize, WriteError>
624 where
625 S: Storage<T>,
626 T: Clone,
627 {
628 let len = buf.len();
629 let mut dest_buf = loop {
630 let dest_buf = self.empty_buf();
631
632 if dest_buf.is_empty() {
633 // The reader has closed.
634 return Err(WriteError::ReaderClosed);
635 }
636
637 if dest_buf.len() >= len {
638 break dest_buf;
639 }
640 };
641
642 dest_buf.slice_mut(..len).clone_from_slice(buf);
643 self.feed(len);
644
645 Ok(len)
646 }
647
648 /// Flush the buffer, ensuring that any items waiting to be read are consumed by the reader.
649 ///
650 /// If the reader is closed, returns `FlushError::ReaderClosed`. If blocking cannot be completed
651 /// immediately, this method blocks until the reader closed or the buffer is flushed.
652 pub fn flush(&mut self) -> Result<(), WriteError> {
653 if let Some(flush_state) = self.get_flush_state() {
654 return flush_state;
655 }
656
657 // Wait for more data to be read.
658 let mut lock = self.space_available_mutex.lock().unwrap();
659 loop {
660 lock = self.state.space_available_cond.wait(lock).unwrap();
661
662 if let Some(flush_state) = self.get_flush_state() {
663 return flush_state;
664 }
665 }
666 }
667
668 /// Close the writer, flushing any remaining data and indicating to the reader that no more data
669 /// will be written.
670 ///
671 /// Any future read operations will fail. Closing the writer multiple times has no effect.
672 pub fn close(&mut self) -> Result<(), WriteError> {
673 self.flush()?;
674 self.state.close_writer();
675 Ok(())
676 }
677}
678
679impl<S> io::Write for Writer<S>
680where
681 S: Storage<u8>,
682{
683 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
684 let mut dest_buf = self.empty_buf();
685
686 let len = dest_buf.len().min(buf.len());
687 dest_buf.slice_mut(..len).copy_from_slice(&buf[..len]);
688
689 self.feed(len);
690
691 Ok(len)
692 }
693
694 fn flush(&mut self) -> io::Result<()> {
695 self.flush().map_err(Into::into)
696 }
697}
698
699impl<S> Drop for Writer<S> {
700 #[inline]
701 fn drop(&mut self) {
702 self.state.close_writer();
703 }
704}
705
// `WriteError` wraps no underlying cause, so the default `Error` methods are used as-is.
impl error::Error for WriteError {}
707
708impl fmt::Display for WriteError {
709 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
710 match self {
711 WriteError::ReaderClosed => write!(f, "reader closed"),
712 }
713 }
714}
715
716impl From<WriteError> for io::Error {
717 fn from(err: WriteError) -> Self {
718 match err {
719 WriteError::ReaderClosed => io::ErrorKind::UnexpectedEof.into(),
720 }
721 }
722}
723
// `ReadExactError` wraps no underlying cause, so the default `Error` methods are used as-is.
impl error::Error for ReadExactError {}
725
726impl fmt::Display for ReadExactError {
727 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
728 match self {
729 ReadExactError::WriterClosed => write!(f, "writer closed"),
730 }
731 }
732}
733
734impl From<ReadExactError> for io::Error {
735 fn from(err: ReadExactError) -> Self {
736 match err {
737 ReadExactError::WriterClosed => io::ErrorKind::UnexpectedEof.into(),
738 }
739 }
740}