mini_io_queue/
asyncio.rs

1//! Async reader/writer queue for generic items or byte arrays.
2//!
3//! Each queue has a [`Reader`] and [`Writer`] part. Data can be copied into the writer's buffer and
4//! sent to the reader without locks or allocation, allowing nonblocking communication across
5//! threads.
6//!
7//! Reading and writing with the queue does not require any allocation, with the downside that the
8//! queue has a fixed capacity on creation.
9//!
10//! Unlike [`nonblocking`], this queue allows asynchronously waiting for data on the reader end,
11//! or waiting for space on the writer end. For [`u8`] storage, this means the queue can be used
12//! as a [`futures::AsyncRead`] and [`futures::AsyncWrite`] if the `std-io` feature is enabled.
13//!
14//! If you are not using an async runtime, you are probably more interested in the [`blocking`]
15//! queue, which blocks instead of waiting asynchronously.
16//!
17//! # Example
18//! ```
19//! use futures::join;
20//! use futures::executor::block_on;
21//! use mini_io_queue::asyncio::queue;
22//!
23//! let (mut reader, mut writer) = queue(8);
24//!
25//! let write_loop = async {
26//!     for i in 0..16 {
27//!         writer.write(&[i]).await;
28//!     }
29//! };
30//!
31//! let read_loop = async {
32//!     for i in 0..16 {
33//!         let mut buf = [0];
34//!         reader.read_exact(&mut buf).await.unwrap();
35//!
36//!         assert_eq!(buf[0], i);
37//!     }
38//! };
39//!
40//! block_on(async { join!(write_loop, read_loop) });
41//! ```
42//!
43//! [`Reader`]: self::Reader
44//! [`Writer`]: self::Writer
45//! [`nonblocking`]: crate::nonblocking
46//! [`blocking`]: crate::blocking
47//! [`futures::AsyncRead`]: futures::AsyncRead
48//! [`futures::AsyncWrite`]: futures::AsyncWrite
49
50use crate::storage::Storage;
51use crate::{Region, RegionMut, Ring};
52use alloc::sync::Arc;
53use core::fmt;
54use core::future::Future;
55use core::pin::Pin;
56use core::sync::atomic::{AtomicBool, Ordering};
57use core::task::{Context, Poll};
58use futures::task::AtomicWaker;
59
60/// Creates a queue that is backed by a specific storage. The queue will use the storage's entire
61/// capacity, and will be initialized with an empty read buffer and a full write buffer.
62///
63/// Note that the reader and writer will only implement [`Send`] and [`Sync`] if the storage also
64/// does.
65///
66/// # Example
67/// ```
68/// use mini_io_queue::asyncio::queue_from;
69/// use mini_io_queue::storage::HeapBuffer;
70///
71/// let buffer: HeapBuffer<u8> = HeapBuffer::new(100);
72/// let (reader, writer) = queue_from(buffer);
73/// ```
74///
75/// [`Send`]: std::marker::Send
76/// [`Sync`]: std::marker::Sync
77pub fn queue_from<T, S>(storage: S) -> (Reader<S>, Writer<S>)
78where
79    S: Storage<T>,
80{
81    let ring = Ring::new(storage.capacity());
82    queue_from_parts(ring, storage)
83}
84
85/// Creates a queue from a separately allocated ring and storage. The queue will use the ring's
86/// capacity, and be initialized with a read buffer from the ring's left region and a write buffer
87/// from the ring's right region.
88///
89/// It is up to the user to ensure the storage has enough capacity for the ring. If the ring's
90/// capacity is larger than the storage's length, the reader and writer may panic.
91///
92/// Note that the reader and writer will only implement [`Send`] and [`Sync`] if the storage also
93/// does.
94///
95/// # Example
96/// ```
97/// use mini_io_queue::Ring;
98/// use mini_io_queue::asyncio::queue_from_parts;
99/// use mini_io_queue::storage::{HeapBuffer, Storage};
100///
101/// // Create a queue with half of the underlying buffer in the read side.
102/// let ring = Ring::new(10);
103/// ring.advance_right(5);
104///
105/// let mut buffer = HeapBuffer::new(10);
106/// buffer.slice_mut(0..5).copy_from_slice(&[1, 2, 3, 4, 5]);
107///
108/// let (reader, writer) = queue_from_parts(ring, buffer);
109/// ```
110///
111/// [`Send`]: std::marker::Send
112/// [`Sync`]: std::marker::Sync
113pub fn queue_from_parts<S>(ring: Ring, storage: S) -> (Reader<S>, Writer<S>) {
114    let state = Arc::new(State {
115        ring,
116        storage,
117
118        is_reader_open: AtomicBool::new(true),
119        is_writer_open: AtomicBool::new(true),
120
121        data_available_waker: AtomicWaker::new(),
122        space_available_waker: AtomicWaker::new(),
123    });
124
125    let reader = Reader {
126        state: state.clone(),
127    };
128    let writer = Writer { state };
129
130    (reader, writer)
131}
132
133#[cfg(feature = "heap-buffer")]
134mod heap_constructors {
135    use crate::asyncio::{queue_from_parts, Reader, Writer};
136    use crate::storage::HeapBuffer;
137    use crate::Ring;
138
139    /// Creates a queue with a specific capacity, allocating storage on the heap. The queue will
140    /// be initialized with an empty read buffer and a full write buffer containing the element's
141    /// default value.
142    ///
143    /// Note that the reader and writer will only implement [`Send`] and [`Sync`] if the element
144    /// type also does.
145    ///
146    /// # Example
147    /// ```
148    /// use mini_io_queue::asyncio::queue;
149    ///
150    /// let (reader, writer) = queue::<u8>(100);
151    /// ```
152    ///
153    /// [`Send`]: std::marker::Send
154    /// [`Sync`]: std::marker::Sync
155    #[cfg_attr(docsrs, doc(cfg(feature = "heap-buffer")))]
156    pub fn queue<T>(capacity: usize) -> (Reader<HeapBuffer<T>>, Writer<HeapBuffer<T>>)
157    where
158        T: Default,
159    {
160        let ring = Ring::new(capacity);
161        let buffer = HeapBuffer::new(capacity);
162
163        queue_from_parts(ring, buffer)
164    }
165}
166
167#[cfg(feature = "heap-buffer")]
168pub use self::heap_constructors::*;
169
170#[derive(Debug)]
171struct State<S> {
172    ring: Ring,
173    storage: S,
174
175    is_reader_open: AtomicBool,
176    is_writer_open: AtomicBool,
177
178    data_available_waker: AtomicWaker,
179    space_available_waker: AtomicWaker,
180}
181
182/// An error indicating why a writer failed.
183#[derive(Debug, Clone, Copy, PartialEq, Eq)]
184pub enum WriteError {
185    /// Writing failed because the reader was closed, preventing the read buffer from emptying.
186    ReaderClosed,
187}
188
189/// An error indicating why reading failed.
190#[derive(Debug, Clone, Copy, PartialEq, Eq)]
191pub enum ReadExactError {
192    /// Reading failed because the writer was closed, meaning no more data will become available.
193    WriterClosed,
194}
195
196/// Receives items from the queue.
197///
198/// Values sent by the writer will be added to the end of the reader's buffer, and capacity can be
199/// sent back to the writer from the start of the reader's buffer to allow it to write more data.
200///
201/// A reader will automatically [`close`] itself when dropped.
202///
203/// [`close`]: Reader::close
204#[derive(Debug)]
205pub struct Reader<S> {
206    state: Arc<State<S>>,
207}
208
209/// Adds items to the queue.
210///
211/// Values sent by the writer will be added to the end of the reader's buffer, and capacity can be
212/// sent back to the writer from the start of the reader's buffer to allow it to write more data.
213///
214/// A writer will automatically close itself when dropped.
215#[derive(Debug)]
216pub struct Writer<S> {
217    state: Arc<State<S>>,
218}
219
220impl<S> State<S> {
221    fn close_reader(&self) {
222        let was_open = self.is_reader_open.swap(false, Ordering::AcqRel);
223        if was_open {
224            self.space_available_waker.wake();
225        }
226    }
227
228    fn close_writer(&self) {
229        let was_open = self.is_writer_open.swap(false, Ordering::AcqRel);
230        if was_open {
231            self.data_available_waker.wake();
232        }
233    }
234}
235
236impl<S> Reader<S> {
237    /// Returns if the corresponding writer is still open.
238    ///
239    /// If this is `false`, unread data will still be available to read but a well-behaved writer
240    /// will not provide any new data.
241    #[inline]
242    pub fn is_writer_open(&self) -> bool {
243        self.state.is_writer_open.load(Ordering::Acquire)
244    }
245
246    /// Returns if data is available in the reader's buffer.
247    ///
248    /// If this is true it is guaranteed that the next call to [`poll_fill_buf`] will return a
249    /// non-empty slice, unless [`consume`] is called first.
250    ///
251    /// Keep in mind that when using a reader and writer on separate threads, a reader that has no
252    /// data can receive data at any time - even between calls to `has_data` and other functions.
253    ///
254    /// [`poll_fill_buf`]: Reader::poll_fill_buf
255    /// [`consume`]: Reader::consume
256    #[inline]
257    pub fn has_data(&self) -> bool {
258        let (r0, r1) = self.state.ring.left_ranges();
259        !r0.is_empty() || !r1.is_empty()
260    }
261
262    /// Returns if the buffer is full, i.e all space is allocated to the reader and any write
263    /// operations will stall.
264    ///
265    /// If this is true a reader can only resume the writer by calling [`consume`] to pass capacity
266    /// to the writer.
267    ///
268    /// Keep in mind that when using a reader and writer on separate threads, a reader that is not
269    /// full can become full at any time - even between calls to `is_full` and other functions.
270    ///
271    /// [`consume`]: Reader::consume
272    #[inline]
273    pub fn is_full(&self) -> bool {
274        let (r0, r1) = self.state.ring.right_ranges();
275        r0.is_empty() && r1.is_empty()
276    }
277
278    /// Attempt to read from the reader's buffer, waiting for more data if it is empty.
279    ///
280    /// On success, returns `Poll::Ready(Ok(buf))`.
281    ///
282    /// If no data is available for reading, the method returns `Poll::Pending` and arranges for
283    /// the current task to receive a notification when the writer provides data or is closed.
284    ///
285    /// This function is a lower-level call. It needs to be paired with the [`consume`] method to
286    /// function properly. When calling this method, none of the contents will be "read" in the
287    /// sense that later calling `poll_fill_buf` may return the same contents. As such,
288    /// [`consume`] must be called with the number of items that are consumed from this buffer to
289    /// ensure that the items are never returned twice.
290    ///
291    /// An empty buffer returned indicates that all data has been read and the writer has closed.
292    ///
293    /// [`consume`]: Reader::consume
294    pub fn poll_fill_buf<T>(&mut self, cx: &mut Context<'_>) -> Poll<Region<T>>
295    where
296        S: Storage<T>,
297    {
298        if self.has_data() {
299            return Poll::Ready(self.buf());
300        }
301
302        // If no data is available, ask the writer to wake us when it writes something.
303        self.state.data_available_waker.register(cx.waker());
304
305        // Check if data appeared between the last check and the waker being set.
306        if self.has_data() {
307            // Data became available, remove the waker to avoid unnecessarily waking this task.
308            self.state.data_available_waker.take();
309
310            return Poll::Ready(self.buf());
311        }
312
313        // If the writer is closed, we've now read everything we could.
314        // This must be checked after registering a waker to ensure we will see new
315        // `is_writer_closed` values if the writer closed while registering the waker.
316        if !self.is_writer_open() {
317            // Remove the waker to avoid unnecessarily waking this task now it's done.
318            self.state.data_available_waker.take();
319
320            // Empty slice indicates the writer closed.
321            return Poll::Ready(Default::default());
322        }
323
324        // Still empty, park until writer indicates data available.
325        Poll::Pending
326    }
327
328    /// Marks items at the start of the reader buffer as consumed, removing them from the slice
329    /// returned by [`poll_fill_buf`] and adding their capacity to the end of the writer's buffer.
330    /// Since queues have a fixed underlying length, calling this is required to allow the transfer
331    /// of more data.
332    ///
333    /// # Panics
334    /// This function will panic if `amt` is larger than the reader's available data buffer.
335    ///
336    /// [`poll_fill_buf`]: Reader::poll_fill_buf
337    pub fn consume(&mut self, amt: usize) {
338        self.state.ring.advance_left(amt);
339
340        // Wake the writer if it was waiting for space.
341        self.state.space_available_waker.wake();
342    }
343
344    /// Pulls some items from this queue into the specified buffer, returning how many items were
345    /// read.
346    ///
347    /// This method will complete immediately if at least one item is available to be read.
348    ///
349    /// # Return
350    /// It is guaranteed that the return value is `<= buf.len()`.
351    ///
352    /// A return value of `0` indicates one of these two scenarios:
353    ///  1. The writer has closed and all items have been read.
354    ///  2. The buffer specified had a length of 0.
355    ///
356    /// # Cancel safety
357    /// This method is cancel safe. If you use it in a `select!` statement and some other branch
358    /// completes first, then it is guaranteed that no data was read.
359    pub async fn read<T>(&mut self, buf: &mut [T]) -> usize
360    where
361        S: Storage<T>,
362        T: Clone,
363    {
364        Read { reader: self, buf }.await
365    }
366
367    /// Reads the exact number of items required to fill `buf`.
368    ///
369    /// If the writer closes before the buffer is completely filled, an error of the kind
370    /// [`ReadExactError::WriterClosed`] will be returned.
371    ///
372    /// # Return
373    /// If the return value is `Ok(n)`, it is guaranteed that `n == buf.len()`.
374    ///
375    /// # Cancel safety
376    /// This method is not cancellation safe. If the method is used in a `select!` statement and
377    /// some other branch completes first, then some data may already have been read into `buf`.
378    pub async fn read_exact<T>(&mut self, buf: &mut [T]) -> Result<usize, ReadExactError>
379    where
380        S: Storage<T>,
381        T: Clone,
382    {
383        ReadExact {
384            reader: self,
385            buf,
386            read_bytes: 0,
387        }
388        .await
389    }
390
391    /// Close the reader, indicating to the writer that no more data will be read.
392    ///
393    /// Any in-progress writes or flushes on the writer will be interrupted, and any future
394    /// operations will fail. Closing the reader multiple times has no effect.
395    ///
396    /// Dropping the reader object will also close it.
397    #[inline]
398    pub fn close(&mut self) {
399        self.state.close_reader();
400    }
401
402    #[inline]
403    fn buf<T>(&self) -> Region<T>
404    where
405        S: Storage<T>,
406    {
407        let (range_0, range_1) = self.state.ring.left_ranges();
408        Region::new(
409            self.state.storage.slice(range_0),
410            self.state.storage.slice(range_1),
411        )
412    }
413}
414
415impl<S> Drop for Reader<S> {
416    /// Closes the reader.
417    #[inline]
418    fn drop(&mut self) {
419        self.state.close_reader();
420    }
421}
422
423struct Read<'a, T, S> {
424    reader: &'a mut Reader<S>,
425    buf: &'a mut [T],
426}
427
428impl<'a, T, S> Future for Read<'a, T, S>
429where
430    S: Storage<T>,
431    T: Clone,
432{
433    type Output = usize;
434
435    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
436        let me = self.get_mut();
437
438        let src_buf = match me.reader.poll_fill_buf(cx) {
439            Poll::Ready(src_buf) => src_buf,
440            Poll::Pending => return Poll::Pending,
441        };
442
443        if src_buf.is_empty() {
444            // This indicates the writer has closed and all data has been read.
445            return Poll::Ready(0);
446        }
447
448        let len = src_buf.len().min(me.buf.len());
449        src_buf.slice(..len).clone_to_slice(&mut me.buf[..len]);
450
451        me.reader.consume(len);
452        Poll::Ready(len)
453    }
454}
455
456struct ReadExact<'a, T, S> {
457    reader: &'a mut Reader<S>,
458    buf: &'a mut [T],
459    read_bytes: usize,
460}
461
462impl<'a, T, S> Future for ReadExact<'a, T, S>
463where
464    S: Storage<T>,
465    T: Clone,
466{
467    type Output = Result<usize, ReadExactError>;
468
469    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
470        let me = self.get_mut();
471
472        // Avoid polling the reader if the buffer is empty. This branch should only be reachable
473        // on the first poll, before any data has been read.
474        if me.buf.is_empty() {
475            debug_assert_eq!(me.read_bytes, 0);
476            return Poll::Ready(Ok(0));
477        }
478
479        let src_buf = match me.reader.poll_fill_buf(cx) {
480            Poll::Ready(src_buf) => src_buf,
481            Poll::Pending => return Poll::Pending,
482        };
483        let dest_buf = &mut me.buf[me.read_bytes..];
484
485        // Copy as much data as possible.
486        let read_len = dest_buf.len().min(src_buf.len());
487        src_buf
488            .slice(..read_len)
489            .clone_to_slice(&mut dest_buf[..read_len]);
490        me.reader.consume(read_len);
491
492        // Offset buffer for the next read.
493        me.read_bytes += read_len;
494
495        if dest_buf.len() == read_len {
496            // The buffer has been filled.
497            Poll::Ready(Ok(me.read_bytes))
498        } else if me.reader.is_writer_open() {
499            // Need to wait for more data to fill the buffer.
500            Poll::Pending
501        } else {
502            // The writer has closed, required data will never be ready.
503            Poll::Ready(Err(ReadExactError::WriterClosed))
504        }
505    }
506}
507
508impl<S> Writer<S> {
509    /// Returns if the corresponding reader is still open.
510    ///
511    /// If this is `false`, any attempt to write or flush the object will fail.
512    #[inline]
513    pub fn is_reader_open(&self) -> bool {
514        self.state.is_reader_open.load(Ordering::Acquire)
515    }
516
517    /// Returns if space is available in the writer's buffer.
518    ///
519    /// If this is true it is guaranteed that the next call to [`poll_empty_buf`] will return a
520    /// non-empty slice, unless [`feed`] is called first.
521    ///
522    /// Keep in mind that when using a reader and writer on separate threads, a writer that has no
523    /// space can have more made available at any time - even between calls to `has_space` and other
524    /// functions.
525    ///
526    /// [`poll_empty_buf`]: Writer::poll_empty_buf
527    /// [`feed`]: Writer::feed
528    #[inline]
529    pub fn has_space(&self) -> bool {
530        let (r0, r1) = self.state.ring.right_ranges();
531        !r0.is_empty() || !r1.is_empty()
532    }
533
534    /// Returns if the buffer is flushed, i.e there are no items to read and any read operations
535    /// will stall.
536    ///
537    /// If this is true a writer can only resume the reader by calling [`feed`] to pass items to
538    /// the reader.
539    ///
540    /// Keep in mind that when using a reader and writer on separate threads, a writer that is not
541    /// flushed can become flushed at any time - even between calls to `is_flushed` and other
542    /// functions.
543    ///
544    /// [`feed`]: Writer::feed
545    #[inline]
546    pub fn is_flushed(&self) -> bool {
547        let (r0, r1) = self.state.ring.left_ranges();
548        r0.is_empty() && r1.is_empty()
549    }
550
551    fn get_flush_state(&self) -> Option<Result<(), WriteError>> {
552        if self.is_flushed() {
553            return Some(Ok(()));
554        }
555        if !self.is_reader_open() {
556            return Some(Err(WriteError::ReaderClosed));
557        }
558        None
559    }
560
561    #[inline]
562    fn buf<T>(&mut self) -> RegionMut<T>
563    where
564        S: Storage<T>,
565    {
566        let (range_0, range_1) = self.state.ring.right_ranges();
567
568        // `Ring` guarantees that a left region will only overlap a right region when this order
569        // is followed:
570        //  - Get the right region range
571        //  - Advance the right region
572        //  - Get the left region range
573        // Given that the borrow checker prevents this here (`buf` and `consume` both take
574        // &mut self), and assuming the Reader behaves correctly and does not hold references to the
575        // left region's buffer while advancing it, there is no way to get another range that
576        // overlaps this one.
577        RegionMut::new(
578            unsafe { self.state.storage.slice_mut_unchecked(range_0) },
579            unsafe { self.state.storage.slice_mut_unchecked(range_1) },
580        )
581    }
582
583    /// Attempt to get the writable buffer, waiting for more space if it is empty.
584    ///
585    /// On success, returns `Poll::Ready(Ok(buf))`.
586    ///
587    /// If no space is available for writing, the method returns `Poll::Pending` and arranges for
588    /// the current task to receive a notification when the reader provides space or is closed.
589    ///
590    /// This functions is a lower-level call. It needs to be paired with the [`feed`] method to
591    /// function properly. When calling this method, none of the contents will be "written" in the
592    /// sense that later calling `poll_empty_buf` may return the same contents. As such, [`feed`]
593    /// must be called with the number of items that have been written to the buffer to ensure that
594    /// the items are never returned twice.
595    ///
596    /// An empty buffer returned indicates that the queue cannot be written to as the reader has
597    /// closed.
598    ///
599    /// [`feed`]: Writer::feed
600    pub fn poll_empty_buf<T>(&mut self, cx: &mut Context<'_>) -> Poll<RegionMut<T>>
601    where
602        S: Storage<T>,
603    {
604        // If the reader is closed there is no point in writing anything, even if space
605        // is available.
606        if !self.is_reader_open() {
607            // Empty slice indicates the reader closed.
608            return Poll::Ready(Default::default());
609        }
610
611        if self.has_space() {
612            return Poll::Ready(self.buf());
613        }
614
615        // If no space is available, ask the reader to wake us when it reads something.
616        self.state.space_available_waker.register(cx.waker());
617
618        // Check if the reader is closed again in case it was closed between the last check and the
619        // waker being set.
620        if !self.is_reader_open() {
621            // Remove the waker to avoid unnecessarily waking this task now it's done.
622            self.state.data_available_waker.take();
623
624            // Empty slice indicates the reader closed.
625            return Poll::Ready(Default::default());
626        }
627
628        // Check if space appeared between the last check and the waker being set.
629        if self.has_space() {
630            // Space became available remove the waker to avoid unnecessarily waking this task.
631            self.state.data_available_waker.take();
632
633            return Poll::Ready(self.buf());
634        }
635
636        // Still empty, park until reader indicates space available.
637        Poll::Pending
638    }
639
640    /// Marks items at the start of the writer buffer as ready to be read, removing them from the
641    /// slice returned by [`poll_empty_buf`] and making them available in the reader's buffer.
642    ///
643    /// # Panics
644    /// This function will panic if `amt` is larger than the writer's available space buffer.
645    ///
646    /// [`poll_empty_buf`]: Writer::poll_empty_buf
647    pub fn feed(&mut self, len: usize) {
648        self.state.ring.advance_right(len);
649
650        // Wake the reader if it was waiting for data
651        self.state.data_available_waker.wake();
652    }
653
654    /// Writes some items from a buffer into this queue, returning how many items were written.
655    ///
656    /// This function will attempt to write the entire contents of `buf`, but the entire write may
657    /// not succeed if not enough space is available.
658    ///
659    /// # Return
660    /// It is guaranteed that the return value is `<= buf.len()`.
661    ///
662    /// A return value of `0` indicates one of these two scenarios:
663    ///  1. The reader has closed.
664    ///  2. The buffer specified had a length of 0.
665    ///
666    /// # Cancel safety
667    /// This method is cancel safe. If you use it in a `select!` statement and some other branch
668    /// completes first, then it is guaranteed that no data was written.
669    pub async fn write<T>(&mut self, buf: &[T]) -> usize
670    where
671        S: Storage<T>,
672        T: Clone,
673    {
674        Write { writer: self, buf }.await
675    }
676
677    /// Attempts to write all items in a buffer into this queue.
678    ///
679    /// If the reader closes before all items are written, an error of the kind
680    /// [`WriteError::ReaderClosed`] will be returned.
681    ///
682    /// # Return
683    /// If the return value is `Ok(n)`, it is guaranteed that `n == buf.len()`.
684    ///
685    /// # Cancel safety
686    /// This method is not cancellation safe. If it is used in a `select!` statement and some other
687    /// branch completes first, then the provided buffer may have been partially written, but
688    /// future calls to `write_all` will start over from the beginning of the buffer.
689    pub async fn write_all<T>(&mut self, buf: &[T]) -> Result<usize, WriteError>
690    where
691        S: Storage<T>,
692        T: Clone,
693    {
694        WriteAll {
695            writer: self,
696            buf,
697            written_bytes: 0,
698        }
699        .await
700    }
701
702    /// Attempt to flush the buffer, ensuring that any items waiting to be read are consumed by the
703    /// reader.
704    ///
705    /// On success, returns `Poll::Ready(Ok(()))`. If the reader is closed, returns
706    /// `Poll::Ready(Err(FlushError::ReaderClosed))`.
707    ///
708    /// If flushing cannot immediately complete, this method returns `Poll::Pending` and arranges
709    /// for the current task to receive a notification when the object can make progress towards
710    /// flushing.
711    pub fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), WriteError>> {
712        if let Some(flush_state) = self.get_flush_state() {
713            return Poll::Ready(flush_state);
714        }
715
716        // Wait for more data to be read before checking if the data is flushed.
717        self.state.space_available_waker.register(cx.waker());
718
719        // Check the flush state again in case the reader read between the last check and the waker
720        // being set.
721        if let Some(flush_state) = self.get_flush_state() {
722            // Flush is complete, remove the waker to avoid unnecessarily waking this task.
723            self.state.space_available_waker.take();
724
725            return Poll::Ready(flush_state);
726        }
727
728        Poll::Pending
729    }
730
731    /// Flushes the buffer, ensuring that any items waiting to be read are consumed by the reader.
732    ///
733    /// If the reader closes before the buffer is flushed, an error of the kind
734    /// [`WriteError::ReaderClosed`] will be returned.
735    pub async fn flush(&mut self) -> Result<(), WriteError> {
736        Flush(self).await
737    }
738
739    /// Attempt to close the writer, flushing any remaining data and indicating to the reader that
740    /// no more data will be written.
741    ///
742    /// On success, returns `Poll::Ready(Ok(()))`. Any future read operations will fail. Closing
743    /// the writer multiple times has no effect.
744    pub fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), WriteError>> {
745        // Wait for data to be flushed.
746        match self.poll_flush(cx) {
747            Poll::Ready(Ok(())) => {}
748            Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
749            Poll::Pending => return Poll::Pending,
750        }
751
752        self.state.close_writer();
753        Poll::Ready(Ok(()))
754    }
755
756    /// Closes the buffer, flushing any remaining data and indicating to the reader that no more
757    /// data will be written.
758    ///
759    /// If the reader closes before the buffer is flushed, an error of the kind
760    /// [`WriteError::ReaderClosed`] will be returned.
761    pub async fn close(&mut self) -> Result<(), WriteError> {
762        Flush(self).await
763    }
764}
765
766impl<S> Drop for Writer<S> {
767    /// Closes the writer without flushing.
768    #[inline]
769    fn drop(&mut self) {
770        self.state.close_writer();
771    }
772}
773
774struct Write<'a, T, S> {
775    writer: &'a mut Writer<S>,
776    buf: &'a [T],
777}
778
779impl<'a, T, S> Future for Write<'a, T, S>
780where
781    S: Storage<T>,
782    T: Clone,
783{
784    type Output = usize;
785
786    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
787        let me = self.get_mut();
788
789        let mut dest_buf = match me.writer.poll_empty_buf(cx) {
790            Poll::Ready(dest_buf) => dest_buf,
791            Poll::Pending => return Poll::Pending,
792        };
793
794        if dest_buf.is_empty() {
795            // This indicates the reader has closed.
796            return Poll::Ready(0);
797        }
798
799        let len = dest_buf.len().min(me.buf.len());
800        dest_buf.slice_mut(..len).clone_from_slice(&me.buf[..len]);
801
802        me.writer.feed(len);
803        Poll::Ready(len)
804    }
805}
806
807struct WriteAll<'a, T, S> {
808    writer: &'a mut Writer<S>,
809    buf: &'a [T],
810    written_bytes: usize,
811}
812
813impl<'a, T, S> Future for WriteAll<'a, T, S>
814where
815    S: Storage<T>,
816    T: Clone,
817{
818    type Output = Result<usize, WriteError>;
819
820    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
821        let me = self.get_mut();
822
823        // Avoid polling the writer if the buffer is empty. This branch should only be reachable
824        // on the first poll, before any data has been written.
825        if me.buf.is_empty() {
826            debug_assert_eq!(me.written_bytes, 0);
827            return Poll::Ready(Ok(0));
828        }
829
830        let mut dest_buf = match me.writer.poll_empty_buf(cx) {
831            Poll::Ready(dest_buf) => dest_buf,
832            Poll::Pending => return Poll::Pending,
833        };
834        let src_buf = &me.buf[me.written_bytes..];
835
836        if dest_buf.is_empty() {
837            // The reader has closed.
838            return Poll::Ready(Err(WriteError::ReaderClosed));
839        }
840
841        // Copy as much data as possible.
842        let write_len = dest_buf.len().min(src_buf.len());
843        dest_buf
844            .slice_mut(..write_len)
845            .clone_from_slice(&src_buf[..write_len]);
846        me.writer.feed(write_len);
847
848        // Offset buffer for the next write.
849        me.written_bytes += write_len;
850
851        if src_buf.len() == write_len {
852            // All data has been written.
853            Poll::Ready(Ok(me.written_bytes))
854        } else {
855            // Need to wait for more space to write remaining data.
856            Poll::Pending
857        }
858    }
859}
860
861struct Flush<'a, S>(&'a mut Writer<S>);
862
863impl<'a, S> Future for Flush<'a, S> {
864    type Output = Result<(), WriteError>;
865
866    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
867        self.get_mut().0.poll_flush(cx)
868    }
869}
870
871struct Close<'a, S>(&'a mut Writer<S>);
872
873impl<'a, S> Future for Close<'a, S> {
874    type Output = Result<(), WriteError>;
875
876    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
877        self.get_mut().0.poll_close(cx)
878    }
879}
880
881impl fmt::Display for WriteError {
882    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
883        match self {
884            WriteError::ReaderClosed => write!(f, "reader closed"),
885        }
886    }
887}
888
889impl fmt::Display for ReadExactError {
890    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
891        match self {
892            ReadExactError::WriterClosed => write!(f, "writer closed"),
893        }
894    }
895}
896
897#[cfg(feature = "std")]
898mod std_impls {
899    use crate::asyncio::{ReadExactError, WriteError};
900    use std::{error, io};
901
902    #[cfg_attr(docsrs, doc(cfg(feature = "std")))]
903    impl error::Error for WriteError {}
904
905    #[cfg_attr(docsrs, doc(cfg(feature = "std")))]
906    impl From<WriteError> for io::Error {
907        fn from(err: WriteError) -> Self {
908            match err {
909                WriteError::ReaderClosed => io::ErrorKind::UnexpectedEof.into(),
910            }
911        }
912    }
913
914    #[cfg_attr(docsrs, doc(cfg(feature = "std")))]
915    impl error::Error for ReadExactError {}
916
917    #[cfg_attr(docsrs, doc(cfg(feature = "std")))]
918    impl From<ReadExactError> for io::Error {
919        fn from(err: ReadExactError) -> Self {
920            match err {
921                ReadExactError::WriterClosed => io::ErrorKind::UnexpectedEof.into(),
922            }
923        }
924    }
925}
926
927#[cfg(feature = "std")]
928pub use self::std_impls::*;
929
930#[cfg(feature = "std-io")]
931mod io_impls {
932    use crate::asyncio::{Reader, Writer};
933    use crate::storage::Storage;
934    use core::pin::Pin;
935    use core::task::{Context, Poll};
936    use futures::{io, AsyncBufRead, AsyncRead, AsyncWrite};
937
938    #[cfg_attr(docsrs, doc(cfg(feature = "std-io")))]
939    impl<S> AsyncRead for Reader<S>
940    where
941        S: Storage<u8>,
942    {
943        fn poll_read(
944            self: Pin<&mut Self>,
945            cx: &mut Context<'_>,
946            buf: &mut [u8],
947        ) -> Poll<io::Result<usize>> {
948            let me = self.get_mut();
949
950            let src_buf = match me.poll_fill_buf(cx) {
951                Poll::Ready(src_buf) => src_buf,
952                Poll::Pending => return Poll::Pending,
953            };
954
955            if src_buf.is_empty() {
956                // This indicates the writer has closed and all data has been read.
957                return Poll::Ready(Ok(0));
958            }
959
960            let len = src_buf.len().min(buf.len());
961            src_buf.slice(..len).copy_to_slice(&mut buf[..len]);
962
963            me.consume(len);
964
965            Poll::Ready(Ok(len))
966        }
967    }
968
969    #[cfg_attr(docsrs, doc(cfg(feature = "std-io")))]
970    impl<S> AsyncBufRead for Reader<S>
971    where
972        S: Storage<u8>,
973    {
974        fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
975            self.get_mut()
976                .poll_fill_buf(cx)
977                .map(|region| Ok(region.contiguous()))
978        }
979
980        fn consume(self: Pin<&mut Self>, amt: usize) {
981            self.get_mut().consume(amt);
982        }
983    }
984
985    #[cfg_attr(docsrs, doc(cfg(feature = "std-io")))]
986    impl<S> AsyncWrite for Writer<S>
987    where
988        S: Storage<u8>,
989    {
990        fn poll_write(
991            self: Pin<&mut Self>,
992            cx: &mut Context<'_>,
993            buf: &[u8],
994        ) -> Poll<io::Result<usize>> {
995            let me = self.get_mut();
996
997            let mut dest_buf = match me.poll_empty_buf(cx) {
998                Poll::Ready(dest_buf) => dest_buf,
999                Poll::Pending => return Poll::Pending,
1000            };
1001
1002            if dest_buf.is_empty() {
1003                // This indicates the reader has closed.
1004                return Poll::Ready(Ok(0));
1005            }
1006
1007            let len = dest_buf.len().min(buf.len());
1008            dest_buf.slice_mut(..len).copy_from_slice(&buf[..len]);
1009
1010            me.feed(len);
1011
1012            Poll::Ready(Ok(len))
1013        }
1014
1015        fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1016            self.get_mut().poll_flush(cx).map(|r| r.map_err(Into::into))
1017        }
1018
1019        fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1020            self.get_mut().poll_close(cx).map(|r| r.map_err(Into::into))
1021        }
1022    }
1023}
1024
1025#[cfg(feature = "std-io")]
1026pub use self::io_impls::*;