mini_io_queue/asyncio.rs
//! Async reader/writer queue for generic items or byte arrays.
//!
//! Each queue has a [`Reader`] and [`Writer`] part. Data can be copied into the writer's buffer and
//! sent to the reader without locks or allocation, allowing nonblocking communication across
//! threads.
//!
//! Reading and writing with the queue does not require any allocation, with the downside that the
//! queue has a fixed capacity on creation.
//!
//! Unlike [`nonblocking`], this queue allows asynchronously waiting for data on the reader end,
//! or waiting for space on the writer end. For [`u8`] storage, this means the queue can be used
//! as a [`futures::AsyncRead`] and [`futures::AsyncWrite`] if the `std-io` feature is enabled.
//!
//! If you are not using an async runtime, you are probably more interested in the [`blocking`]
//! queue, which blocks instead of waiting asynchronously.
//!
//! # Example
//! ```
//! use futures::join;
//! use futures::executor::block_on;
//! use mini_io_queue::asyncio::queue;
//!
//! let (mut reader, mut writer) = queue(8);
//!
//! let write_loop = async {
//!     for i in 0..16 {
//!         writer.write(&[i]).await;
//!     }
//! };
//!
//! let read_loop = async {
//!     for i in 0..16 {
//!         let mut buf = [0];
//!         reader.read_exact(&mut buf).await.unwrap();
//!
//!         assert_eq!(buf[0], i);
//!     }
//! };
//!
//! block_on(async { join!(write_loop, read_loop) });
//! ```
//!
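//! If the `std-io` feature is enabled and the queue stores [`u8`], the reader and writer can
//! also be driven through the `futures` I/O traits. A minimal sketch of that usage (marked
//! `ignore` because it needs the optional `std-io` feature):
//! ```ignore
//! use futures::{join, AsyncReadExt};
//! use futures::executor::block_on;
//! use mini_io_queue::asyncio::queue;
//!
//! let (mut reader, mut writer) = queue::<u8>(8);
//!
//! block_on(async {
//!     let write = async {
//!         writer.write_all(b"hello").await.unwrap();
//!         // Closing the writer lets `read_to_end` observe end-of-stream.
//!         writer.close().await.unwrap();
//!     };
//!     let read = async {
//!         let mut data = Vec::new();
//!         // `read_to_end` comes from `futures::AsyncReadExt`.
//!         reader.read_to_end(&mut data).await.unwrap();
//!         assert_eq!(data, b"hello".to_vec());
//!     };
//!     join!(write, read);
//! });
//! ```
//!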
//! [`Reader`]: self::Reader
//! [`Writer`]: self::Writer
//! [`nonblocking`]: crate::nonblocking
//! [`blocking`]: crate::blocking
//! [`futures::AsyncRead`]: futures::AsyncRead
//! [`futures::AsyncWrite`]: futures::AsyncWrite

use crate::storage::Storage;
use crate::{Region, RegionMut, Ring};
use alloc::sync::Arc;
use core::fmt;
use core::future::Future;
use core::pin::Pin;
use core::sync::atomic::{AtomicBool, Ordering};
use core::task::{Context, Poll};
use futures::task::AtomicWaker;

/// Creates a queue that is backed by a specific storage. The queue will use the storage's entire
/// capacity, and will be initialized with an empty read buffer and a full write buffer.
///
/// Note that the reader and writer will only implement [`Send`] and [`Sync`] if the storage also
/// does.
///
/// # Example
/// ```
/// use mini_io_queue::asyncio::queue_from;
/// use mini_io_queue::storage::HeapBuffer;
///
/// let buffer: HeapBuffer<u8> = HeapBuffer::new(100);
/// let (reader, writer) = queue_from(buffer);
/// ```
///
/// [`Send`]: std::marker::Send
/// [`Sync`]: std::marker::Sync
pub fn queue_from<T, S>(storage: S) -> (Reader<S>, Writer<S>)
where
    S: Storage<T>,
{
    let ring = Ring::new(storage.capacity());
    queue_from_parts(ring, storage)
}

/// Creates a queue from a separately allocated ring and storage. The queue will use the ring's
/// capacity, and be initialized with a read buffer from the ring's left region and a write buffer
/// from the ring's right region.
///
/// It is up to the user to ensure the storage has enough capacity for the ring. If the ring's
/// capacity is larger than the storage's length, the reader and writer may panic.
///
/// Note that the reader and writer will only implement [`Send`] and [`Sync`] if the storage also
/// does.
///
/// # Example
/// ```
/// use mini_io_queue::Ring;
/// use mini_io_queue::asyncio::queue_from_parts;
/// use mini_io_queue::storage::{HeapBuffer, Storage};
///
/// // Create a queue with half of the underlying buffer in the read side.
/// let ring = Ring::new(10);
/// ring.advance_right(5);
///
/// let mut buffer = HeapBuffer::new(10);
/// buffer.slice_mut(0..5).copy_from_slice(&[1, 2, 3, 4, 5]);
///
/// let (reader, writer) = queue_from_parts(ring, buffer);
/// ```
///
/// [`Send`]: std::marker::Send
/// [`Sync`]: std::marker::Sync
pub fn queue_from_parts<S>(ring: Ring, storage: S) -> (Reader<S>, Writer<S>) {
    let state = Arc::new(State {
        ring,
        storage,

        is_reader_open: AtomicBool::new(true),
        is_writer_open: AtomicBool::new(true),

        data_available_waker: AtomicWaker::new(),
        space_available_waker: AtomicWaker::new(),
    });

    let reader = Reader {
        state: state.clone(),
    };
    let writer = Writer { state };

    (reader, writer)
}

#[cfg(feature = "heap-buffer")]
mod heap_constructors {
    use crate::asyncio::{queue_from_parts, Reader, Writer};
    use crate::storage::HeapBuffer;
    use crate::Ring;

    /// Creates a queue with a specific capacity, allocating storage on the heap. The queue will
    /// be initialized with an empty read buffer and a full write buffer containing the element's
    /// default value.
    ///
    /// Note that the reader and writer will only implement [`Send`] and [`Sync`] if the element
    /// type also does.
    ///
    /// # Example
    /// ```
    /// use mini_io_queue::asyncio::queue;
    ///
    /// let (reader, writer) = queue::<u8>(100);
    /// ```
    ///
    /// [`Send`]: std::marker::Send
    /// [`Sync`]: std::marker::Sync
    #[cfg_attr(docsrs, doc(cfg(feature = "heap-buffer")))]
    pub fn queue<T>(capacity: usize) -> (Reader<HeapBuffer<T>>, Writer<HeapBuffer<T>>)
    where
        T: Default,
    {
        let ring = Ring::new(capacity);
        let buffer = HeapBuffer::new(capacity);

        queue_from_parts(ring, buffer)
    }
}

#[cfg(feature = "heap-buffer")]
pub use self::heap_constructors::*;

#[derive(Debug)]
struct State<S> {
    ring: Ring,
    storage: S,

    is_reader_open: AtomicBool,
    is_writer_open: AtomicBool,

    data_available_waker: AtomicWaker,
    space_available_waker: AtomicWaker,
}

/// An error indicating why a write operation failed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WriteError {
    /// Writing failed because the reader was closed, preventing the read buffer from emptying.
    ReaderClosed,
}

/// An error indicating why reading failed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReadExactError {
    /// Reading failed because the writer was closed, meaning no more data will become available.
    WriterClosed,
}

/// Receives items from the queue.
///
/// Values sent by the writer will be added to the end of the reader's buffer, and capacity can be
/// sent back to the writer from the start of the reader's buffer to allow it to write more data.
///
/// A reader will automatically [`close`] itself when dropped.
///
/// [`close`]: Reader::close
#[derive(Debug)]
pub struct Reader<S> {
    state: Arc<State<S>>,
}

/// Adds items to the queue.
///
/// Values sent by the writer will be added to the end of the reader's buffer, and capacity can be
/// sent back to the writer from the start of the reader's buffer to allow it to write more data.
///
/// A writer will automatically close itself when dropped.
#[derive(Debug)]
pub struct Writer<S> {
    state: Arc<State<S>>,
}

impl<S> State<S> {
    fn close_reader(&self) {
        let was_open = self.is_reader_open.swap(false, Ordering::AcqRel);
        if was_open {
            self.space_available_waker.wake();
        }
    }

    fn close_writer(&self) {
        let was_open = self.is_writer_open.swap(false, Ordering::AcqRel);
        if was_open {
            self.data_available_waker.wake();
        }
    }
}

impl<S> Reader<S> {
    /// Returns if the corresponding writer is still open.
    ///
    /// If this is `false`, unread data will still be available to read but a well-behaved writer
    /// will not provide any new data.
    #[inline]
    pub fn is_writer_open(&self) -> bool {
        self.state.is_writer_open.load(Ordering::Acquire)
    }

    /// Returns if data is available in the reader's buffer.
    ///
    /// If this is true, it is guaranteed that the next call to [`poll_fill_buf`] will return a
    /// non-empty slice, unless [`consume`] is called first.
    ///
    /// Keep in mind that when using a reader and writer on separate threads, a reader that has no
    /// data can receive data at any time - even between calls to `has_data` and other functions.
    ///
    /// [`poll_fill_buf`]: Reader::poll_fill_buf
    /// [`consume`]: Reader::consume
    #[inline]
    pub fn has_data(&self) -> bool {
        let (r0, r1) = self.state.ring.left_ranges();
        !r0.is_empty() || !r1.is_empty()
    }

    /// Returns if the buffer is full, i.e. all space is allocated to the reader and any write
    /// operations will stall.
    ///
    /// If this is true, a reader can only resume the writer by calling [`consume`] to pass
    /// capacity to the writer.
    ///
    /// Keep in mind that when using a reader and writer on separate threads, a reader that is not
    /// full can become full at any time - even between calls to `is_full` and other functions.
    ///
    /// [`consume`]: Reader::consume
    #[inline]
    pub fn is_full(&self) -> bool {
        let (r0, r1) = self.state.ring.right_ranges();
        r0.is_empty() && r1.is_empty()
    }

    /// Attempt to read from the reader's buffer, waiting for more data if it is empty.
    ///
    /// On success, returns `Poll::Ready(buf)`.
    ///
    /// If no data is available for reading, the method returns `Poll::Pending` and arranges for
    /// the current task to receive a notification when the writer provides data or is closed.
    ///
    /// This function is a lower-level call. It needs to be paired with the [`consume`] method to
    /// function properly. When calling this method, none of the contents will be "read" in the
    /// sense that later calling `poll_fill_buf` may return the same contents. As such,
    /// [`consume`] must be called with the number of items that are consumed from this buffer to
    /// ensure that the items are never returned twice.
    ///
    /// If the returned buffer is empty, all data has been read and the writer has closed.
    ///
    /// [`consume`]: Reader::consume
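    ///
    /// # Example
    /// A minimal sketch of pairing `poll_fill_buf` with [`consume`] through
    /// `futures::future::poll_fn` (this example assumes the default `heap-buffer` feature):
    /// ```
    /// use futures::executor::block_on;
    /// use futures::future::poll_fn;
    /// use mini_io_queue::asyncio::queue;
    ///
    /// let (mut reader, mut writer) = queue::<u8>(4);
    ///
    /// block_on(async {
    ///     writer.write_all(&[1, 2, 3]).await.unwrap();
    ///
    ///     // Peek at the readable region, but only report its length back.
    ///     let len = poll_fn(|cx| reader.poll_fill_buf(cx).map(|buf| buf.len())).await;
    ///     assert_eq!(len, 3);
    ///
    ///     // Mark everything we looked at as consumed so the writer regains space.
    ///     reader.consume(len);
    /// });
    /// ```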
    pub fn poll_fill_buf<T>(&mut self, cx: &mut Context<'_>) -> Poll<Region<T>>
    where
        S: Storage<T>,
    {
        if self.has_data() {
            return Poll::Ready(self.buf());
        }

        // If no data is available, ask the writer to wake us when it writes something.
        self.state.data_available_waker.register(cx.waker());

        // Check if data appeared between the last check and the waker being set.
        if self.has_data() {
            // Data became available, remove the waker to avoid unnecessarily waking this task.
            self.state.data_available_waker.take();

            return Poll::Ready(self.buf());
        }

        // If the writer is closed, we've now read everything we could.
        // This must be checked after registering a waker to ensure we will see new
        // `is_writer_open` values if the writer closed while registering the waker.
        if !self.is_writer_open() {
            // Remove the waker to avoid unnecessarily waking this task now it's done.
            self.state.data_available_waker.take();

            // Empty slice indicates the writer closed.
            return Poll::Ready(Default::default());
        }

        // Still empty, park until writer indicates data available.
        Poll::Pending
    }

    /// Marks items at the start of the reader buffer as consumed, removing them from the slice
    /// returned by [`poll_fill_buf`] and adding their capacity to the end of the writer's buffer.
    /// Since queues have a fixed underlying length, calling this is required to allow the transfer
    /// of more data.
    ///
    /// # Panics
    /// This function will panic if `amt` is larger than the reader's available data buffer.
    ///
    /// [`poll_fill_buf`]: Reader::poll_fill_buf
    pub fn consume(&mut self, amt: usize) {
        self.state.ring.advance_left(amt);

        // Wake the writer if it was waiting for space.
        self.state.space_available_waker.wake();
    }

    /// Pulls some items from this queue into the specified buffer, returning how many items were
    /// read.
    ///
    /// This method will complete immediately if at least one item is available to be read.
    ///
    /// # Return
    /// It is guaranteed that the return value is `<= buf.len()`.
    ///
    /// A return value of `0` indicates one of these two scenarios:
    /// 1. The writer has closed and all items have been read.
    /// 2. The buffer specified had a length of 0.
    ///
    /// # Cancel safety
    /// This method is cancel safe. If you use it in a `select!` statement and some other branch
    /// completes first, then it is guaranteed that no data was read.
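    ///
    /// # Example
    /// A small usage sketch (assumes the default `heap-buffer` feature):
    /// ```
    /// use futures::executor::block_on;
    /// use mini_io_queue::asyncio::queue;
    ///
    /// let (mut reader, mut writer) = queue::<u8>(8);
    ///
    /// block_on(async {
    ///     writer.write_all(&[1, 2, 3]).await.unwrap();
    ///
    ///     let mut buf = [0u8; 8];
    ///     let len = reader.read(&mut buf).await;
    ///     assert_eq!(&buf[..len], &[1, 2, 3]);
    /// });
    /// ```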
    pub async fn read<T>(&mut self, buf: &mut [T]) -> usize
    where
        S: Storage<T>,
        T: Clone,
    {
        Read { reader: self, buf }.await
    }

    /// Reads the exact number of items required to fill `buf`.
    ///
    /// If the writer closes before the buffer is completely filled, an error of the kind
    /// [`ReadExactError::WriterClosed`] will be returned.
    ///
    /// # Return
    /// If the return value is `Ok(n)`, it is guaranteed that `n == buf.len()`.
    ///
    /// # Cancel safety
    /// This method is not cancellation safe. If the method is used in a `select!` statement and
    /// some other branch completes first, then some data may already have been read into `buf`.
    pub async fn read_exact<T>(&mut self, buf: &mut [T]) -> Result<usize, ReadExactError>
    where
        S: Storage<T>,
        T: Clone,
    {
        ReadExact {
            reader: self,
            buf,
            read_bytes: 0,
        }
        .await
    }

    /// Close the reader, indicating to the writer that no more data will be read.
    ///
    /// Any in-progress writes or flushes on the writer will be interrupted, and any future
    /// operations will fail. Closing the reader multiple times has no effect.
    ///
    /// Dropping the reader object will also close it.
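    ///
    /// # Example
    /// A small sketch of the effect on the writer (assumes the default `heap-buffer` feature):
    /// ```
    /// use futures::executor::block_on;
    /// use mini_io_queue::asyncio::{queue, WriteError};
    ///
    /// let (mut reader, mut writer) = queue::<u8>(8);
    ///
    /// block_on(async {
    ///     reader.close();
    ///
    ///     // With the reader closed, the writer can no longer make progress.
    ///     assert_eq!(writer.write_all(&[1]).await, Err(WriteError::ReaderClosed));
    /// });
    /// ```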
    #[inline]
    pub fn close(&mut self) {
        self.state.close_reader();
    }

    #[inline]
    fn buf<T>(&self) -> Region<T>
    where
        S: Storage<T>,
    {
        let (range_0, range_1) = self.state.ring.left_ranges();
        Region::new(
            self.state.storage.slice(range_0),
            self.state.storage.slice(range_1),
        )
    }
}

impl<S> Drop for Reader<S> {
    /// Closes the reader.
    #[inline]
    fn drop(&mut self) {
        self.state.close_reader();
    }
}

struct Read<'a, T, S> {
    reader: &'a mut Reader<S>,
    buf: &'a mut [T],
}

impl<'a, T, S> Future for Read<'a, T, S>
where
    S: Storage<T>,
    T: Clone,
{
    type Output = usize;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let me = self.get_mut();

        let src_buf = match me.reader.poll_fill_buf(cx) {
            Poll::Ready(src_buf) => src_buf,
            Poll::Pending => return Poll::Pending,
        };

        if src_buf.is_empty() {
            // This indicates the writer has closed and all data has been read.
            return Poll::Ready(0);
        }

        let len = src_buf.len().min(me.buf.len());
        src_buf.slice(..len).clone_to_slice(&mut me.buf[..len]);

        me.reader.consume(len);
        Poll::Ready(len)
    }
}

struct ReadExact<'a, T, S> {
    reader: &'a mut Reader<S>,
    buf: &'a mut [T],
    read_bytes: usize,
}

impl<'a, T, S> Future for ReadExact<'a, T, S>
where
    S: Storage<T>,
    T: Clone,
{
    type Output = Result<usize, ReadExactError>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let me = self.get_mut();

        // Avoid polling the reader if the buffer is empty. This branch should only be reachable
        // on the first poll, before any data has been read.
        if me.buf.is_empty() {
            debug_assert_eq!(me.read_bytes, 0);
            return Poll::Ready(Ok(0));
        }

        // Keep copying until the buffer is full, the writer closes, or the reader runs dry.
        // Looping here (instead of returning `Pending` after a partial read) ensures that
        // `poll_fill_buf` is the only place that can return `Pending`, so a waker is always
        // registered before this future suspends.
        loop {
            let src_buf = match me.reader.poll_fill_buf(cx) {
                Poll::Ready(src_buf) => src_buf,
                Poll::Pending => return Poll::Pending,
            };
            let dest_buf = &mut me.buf[me.read_bytes..];

            // Copy as much data as possible.
            let read_len = dest_buf.len().min(src_buf.len());
            src_buf
                .slice(..read_len)
                .clone_to_slice(&mut dest_buf[..read_len]);
            me.reader.consume(read_len);

            // Offset buffer for the next read.
            me.read_bytes += read_len;

            if dest_buf.len() == read_len {
                // The buffer has been filled.
                return Poll::Ready(Ok(me.read_bytes));
            }
            if !me.reader.is_writer_open() {
                // The writer has closed, required data will never be ready.
                return Poll::Ready(Err(ReadExactError::WriterClosed));
            }
        }
    }
}

impl<S> Writer<S> {
    /// Returns if the corresponding reader is still open.
    ///
    /// If this is `false`, any attempt to write or flush the object will fail.
    #[inline]
    pub fn is_reader_open(&self) -> bool {
        self.state.is_reader_open.load(Ordering::Acquire)
    }

    /// Returns if space is available in the writer's buffer.
    ///
    /// If this is true, it is guaranteed that the next call to [`poll_empty_buf`] will return a
    /// non-empty slice, unless [`feed`] is called first.
    ///
    /// Keep in mind that when using a reader and writer on separate threads, a writer that has no
    /// space can have more made available at any time - even between calls to `has_space` and
    /// other functions.
    ///
    /// [`poll_empty_buf`]: Writer::poll_empty_buf
    /// [`feed`]: Writer::feed
    #[inline]
    pub fn has_space(&self) -> bool {
        let (r0, r1) = self.state.ring.right_ranges();
        !r0.is_empty() || !r1.is_empty()
    }

    /// Returns if the buffer is flushed, i.e. there are no items to read and any read operations
    /// will stall.
    ///
    /// If this is true, a writer can only resume the reader by calling [`feed`] to pass items to
    /// the reader.
    ///
    /// Keep in mind that when using a reader and writer on separate threads, a writer that is not
    /// flushed can become flushed at any time - even between calls to `is_flushed` and other
    /// functions.
    ///
    /// [`feed`]: Writer::feed
    #[inline]
    pub fn is_flushed(&self) -> bool {
        let (r0, r1) = self.state.ring.left_ranges();
        r0.is_empty() && r1.is_empty()
    }

    fn get_flush_state(&self) -> Option<Result<(), WriteError>> {
        if self.is_flushed() {
            return Some(Ok(()));
        }
        if !self.is_reader_open() {
            return Some(Err(WriteError::ReaderClosed));
        }
        None
    }

    #[inline]
    fn buf<T>(&mut self) -> RegionMut<T>
    where
        S: Storage<T>,
    {
        let (range_0, range_1) = self.state.ring.right_ranges();

        // `Ring` guarantees that a left region will only overlap a right region when this order
        // is followed:
        // - Get the right region range
        // - Advance the right region
        // - Get the left region range
        // Given that the borrow checker prevents this here (`buf` and `feed` both take
        // &mut self), and assuming the Reader behaves correctly and does not hold references to
        // the left region's buffer while advancing it, there is no way to get another range that
        // overlaps this one.
        RegionMut::new(
            unsafe { self.state.storage.slice_mut_unchecked(range_0) },
            unsafe { self.state.storage.slice_mut_unchecked(range_1) },
        )
    }

    /// Attempt to get the writable buffer, waiting for more space if it is empty.
    ///
    /// On success, returns `Poll::Ready(buf)`.
    ///
    /// If no space is available for writing, the method returns `Poll::Pending` and arranges for
    /// the current task to receive a notification when the reader provides space or is closed.
    ///
    /// This function is a lower-level call. It needs to be paired with the [`feed`] method to
    /// function properly. When calling this method, none of the contents will be "written" in the
    /// sense that later calling `poll_empty_buf` may return the same contents. As such, [`feed`]
    /// must be called with the number of items that have been written to the buffer to ensure that
    /// the items are never returned twice.
    ///
    /// If the returned buffer is empty, the queue cannot be written to because the reader has
    /// closed.
    ///
    /// [`feed`]: Writer::feed
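    ///
    /// # Example
    /// A minimal sketch of pairing `poll_empty_buf` with [`feed`] through
    /// `futures::future::poll_fn` (this example assumes the default `heap-buffer` feature):
    /// ```
    /// use futures::executor::block_on;
    /// use futures::future::poll_fn;
    /// use mini_io_queue::asyncio::queue;
    ///
    /// let (mut reader, mut writer) = queue::<u8>(4);
    ///
    /// block_on(async {
    ///     // Write two items into the empty region, then mark them as ready to read.
    ///     let len = poll_fn(|cx| {
    ///         writer.poll_empty_buf(cx).map(|mut buf| {
    ///             buf.slice_mut(..2).copy_from_slice(&[1, 2]);
    ///             2
    ///         })
    ///     })
    ///     .await;
    ///     writer.feed(len);
    ///
    ///     let mut out = [0u8; 2];
    ///     reader.read_exact(&mut out).await.unwrap();
    ///     assert_eq!(out, [1, 2]);
    /// });
    /// ```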
    pub fn poll_empty_buf<T>(&mut self, cx: &mut Context<'_>) -> Poll<RegionMut<T>>
    where
        S: Storage<T>,
    {
        // If the reader is closed there is no point in writing anything, even if space
        // is available.
        if !self.is_reader_open() {
            // Empty slice indicates the reader closed.
            return Poll::Ready(Default::default());
        }

        if self.has_space() {
            return Poll::Ready(self.buf());
        }

        // If no space is available, ask the reader to wake us when it reads something.
        self.state.space_available_waker.register(cx.waker());

        // Check if the reader is closed again in case it was closed between the last check and the
        // waker being set.
        if !self.is_reader_open() {
            // Remove the waker to avoid unnecessarily waking this task now it's done.
            self.state.space_available_waker.take();

            // Empty slice indicates the reader closed.
            return Poll::Ready(Default::default());
        }

        // Check if space appeared between the last check and the waker being set.
        if self.has_space() {
            // Space became available, remove the waker to avoid unnecessarily waking this task.
            self.state.space_available_waker.take();

            return Poll::Ready(self.buf());
        }

        // Still empty, park until reader indicates space available.
        Poll::Pending
    }

    /// Marks items at the start of the writer buffer as ready to be read, removing them from the
    /// slice returned by [`poll_empty_buf`] and making them available in the reader's buffer.
    ///
    /// # Panics
    /// This function will panic if `len` is larger than the writer's available space buffer.
    ///
    /// [`poll_empty_buf`]: Writer::poll_empty_buf
    pub fn feed(&mut self, len: usize) {
        self.state.ring.advance_right(len);

        // Wake the reader if it was waiting for data.
        self.state.data_available_waker.wake();
    }

    /// Writes some items from a buffer into this queue, returning how many items were written.
    ///
    /// This function will attempt to write the entire contents of `buf`, but the entire write may
    /// not succeed if not enough space is available.
    ///
    /// # Return
    /// It is guaranteed that the return value is `<= buf.len()`.
    ///
    /// A return value of `0` indicates one of these two scenarios:
    /// 1. The reader has closed.
    /// 2. The buffer specified had a length of 0.
    ///
    /// # Cancel safety
    /// This method is cancel safe. If you use it in a `select!` statement and some other branch
    /// completes first, then it is guaranteed that no data was written.
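    ///
    /// # Example
    /// A small usage sketch (assumes the default `heap-buffer` feature):
    /// ```
    /// use futures::executor::block_on;
    /// use mini_io_queue::asyncio::queue;
    ///
    /// let (_reader, mut writer) = queue::<u8>(4);
    ///
    /// block_on(async {
    ///     // Only as much as fits in the queue is written in one call.
    ///     let written = writer.write(&[1, 2, 3, 4, 5, 6]).await;
    ///     assert_eq!(written, 4);
    /// });
    /// ```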
    pub async fn write<T>(&mut self, buf: &[T]) -> usize
    where
        S: Storage<T>,
        T: Clone,
    {
        Write { writer: self, buf }.await
    }

    /// Attempts to write all items in a buffer into this queue.
    ///
    /// If the reader closes before all items are written, an error of the kind
    /// [`WriteError::ReaderClosed`] will be returned.
    ///
    /// # Return
    /// If the return value is `Ok(n)`, it is guaranteed that `n == buf.len()`.
    ///
    /// # Cancel safety
    /// This method is not cancellation safe. If it is used in a `select!` statement and some other
    /// branch completes first, then the provided buffer may have been partially written, but
    /// future calls to `write_all` will start over from the beginning of the buffer.
    pub async fn write_all<T>(&mut self, buf: &[T]) -> Result<usize, WriteError>
    where
        S: Storage<T>,
        T: Clone,
    {
        WriteAll {
            writer: self,
            buf,
            written_bytes: 0,
        }
        .await
    }

    /// Attempt to flush the buffer, ensuring that any items waiting to be read are consumed by the
    /// reader.
    ///
    /// On success, returns `Poll::Ready(Ok(()))`. If the reader is closed, returns
    /// `Poll::Ready(Err(WriteError::ReaderClosed))`.
    ///
    /// If flushing cannot immediately complete, this method returns `Poll::Pending` and arranges
    /// for the current task to receive a notification when the object can make progress towards
    /// flushing.
    pub fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), WriteError>> {
        if let Some(flush_state) = self.get_flush_state() {
            return Poll::Ready(flush_state);
        }

        // Wait for more data to be read before checking if the data is flushed.
        self.state.space_available_waker.register(cx.waker());

        // Check the flush state again in case the reader read between the last check and the waker
        // being set.
        if let Some(flush_state) = self.get_flush_state() {
            // Flush is complete, remove the waker to avoid unnecessarily waking this task.
            self.state.space_available_waker.take();

            return Poll::Ready(flush_state);
        }

        Poll::Pending
    }

    /// Flushes the buffer, ensuring that any items waiting to be read are consumed by the reader.
    ///
    /// If the reader closes before the buffer is flushed, an error of the kind
    /// [`WriteError::ReaderClosed`] will be returned.
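    ///
    /// # Example
    /// A small sketch of flushing concurrently with a reader (assumes the default `heap-buffer`
    /// feature):
    /// ```
    /// use futures::{executor::block_on, join};
    /// use mini_io_queue::asyncio::queue;
    ///
    /// let (mut reader, mut writer) = queue::<u8>(8);
    ///
    /// block_on(async {
    ///     writer.write_all(&[1, 2, 3]).await.unwrap();
    ///
    ///     let flush = async {
    ///         // Completes once the reader has consumed everything written so far.
    ///         writer.flush().await.unwrap();
    ///     };
    ///     let read = async {
    ///         let mut buf = [0u8; 3];
    ///         reader.read_exact(&mut buf).await.unwrap();
    ///     };
    ///     join!(flush, read);
    /// });
    /// ```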
    pub async fn flush(&mut self) -> Result<(), WriteError> {
        Flush(self).await
    }

    /// Attempt to close the writer, flushing any remaining data and indicating to the reader that
    /// no more data will be written.
    ///
    /// On success, returns `Poll::Ready(Ok(()))`. Any future read operations will fail. Closing
    /// the writer multiple times has no effect.
    pub fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), WriteError>> {
        // Wait for data to be flushed.
        match self.poll_flush(cx) {
            Poll::Ready(Ok(())) => {}
            Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
            Poll::Pending => return Poll::Pending,
        }

        self.state.close_writer();
        Poll::Ready(Ok(()))
    }

    /// Closes the writer, flushing any remaining data and indicating to the reader that no more
    /// data will be written.
    ///
    /// If the reader closes before the buffer is flushed, an error of the kind
    /// [`WriteError::ReaderClosed`] will be returned.
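    ///
    /// # Example
    /// A small sketch of closing concurrently with a reader (assumes the default `heap-buffer`
    /// feature):
    /// ```
    /// use futures::{executor::block_on, join};
    /// use mini_io_queue::asyncio::queue;
    ///
    /// let (mut reader, mut writer) = queue::<u8>(8);
    ///
    /// block_on(async {
    ///     writer.write_all(&[1, 2, 3]).await.unwrap();
    ///
    ///     let close = async {
    ///         // Waits for the remaining data to be flushed, then closes the writer.
    ///         writer.close().await.unwrap();
    ///     };
    ///     let read = async {
    ///         let mut buf = [0u8; 3];
    ///         reader.read_exact(&mut buf).await.unwrap();
    ///     };
    ///     join!(close, read);
    /// });
    /// ```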
    pub async fn close(&mut self) -> Result<(), WriteError> {
        Close(self).await
    }
}

impl<S> Drop for Writer<S> {
    /// Closes the writer without flushing.
    #[inline]
    fn drop(&mut self) {
        self.state.close_writer();
    }
}

struct Write<'a, T, S> {
    writer: &'a mut Writer<S>,
    buf: &'a [T],
}

impl<'a, T, S> Future for Write<'a, T, S>
where
    S: Storage<T>,
    T: Clone,
{
    type Output = usize;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let me = self.get_mut();

        let mut dest_buf = match me.writer.poll_empty_buf(cx) {
            Poll::Ready(dest_buf) => dest_buf,
            Poll::Pending => return Poll::Pending,
        };

        if dest_buf.is_empty() {
            // This indicates the reader has closed.
            return Poll::Ready(0);
        }

        let len = dest_buf.len().min(me.buf.len());
        dest_buf.slice_mut(..len).clone_from_slice(&me.buf[..len]);

        me.writer.feed(len);
        Poll::Ready(len)
    }
}

struct WriteAll<'a, T, S> {
    writer: &'a mut Writer<S>,
    buf: &'a [T],
    written_bytes: usize,
}
812
impl<'a, T, S> Future for WriteAll<'a, T, S>
where
    S: Storage<T>,
    T: Clone,
{
    type Output = Result<usize, WriteError>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let me = self.get_mut();

        // Avoid polling the writer if the buffer is empty. This branch should only be reachable
        // on the first poll, before any data has been written.
        if me.buf.is_empty() {
            debug_assert_eq!(me.written_bytes, 0);
            return Poll::Ready(Ok(0));
        }

        // Keep copying until the whole buffer is written or the reader closes. Looping here
        // (instead of returning `Pending` after a partial write) ensures that `poll_empty_buf`
        // is the only place that can return `Pending`, so a waker is always registered before
        // this future suspends.
        loop {
            let mut dest_buf = match me.writer.poll_empty_buf(cx) {
                Poll::Ready(dest_buf) => dest_buf,
                Poll::Pending => return Poll::Pending,
            };
            let src_buf = &me.buf[me.written_bytes..];

            if dest_buf.is_empty() {
                // The reader has closed.
                return Poll::Ready(Err(WriteError::ReaderClosed));
            }

            // Copy as much data as possible.
            let write_len = dest_buf.len().min(src_buf.len());
            dest_buf
                .slice_mut(..write_len)
                .clone_from_slice(&src_buf[..write_len]);
            me.writer.feed(write_len);

            // Offset buffer for the next write.
            me.written_bytes += write_len;

            if src_buf.len() == write_len {
                // All data has been written.
                return Poll::Ready(Ok(me.written_bytes));
            }
        }
    }
}

struct Flush<'a, S>(&'a mut Writer<S>);

impl<'a, S> Future for Flush<'a, S> {
    type Output = Result<(), WriteError>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.get_mut().0.poll_flush(cx)
    }
}

struct Close<'a, S>(&'a mut Writer<S>);

impl<'a, S> Future for Close<'a, S> {
    type Output = Result<(), WriteError>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.get_mut().0.poll_close(cx)
    }
}

impl fmt::Display for WriteError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            WriteError::ReaderClosed => write!(f, "reader closed"),
        }
    }
}

impl fmt::Display for ReadExactError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ReadExactError::WriterClosed => write!(f, "writer closed"),
        }
    }
}

#[cfg(feature = "std")]
mod std_impls {
    use crate::asyncio::{ReadExactError, WriteError};
    use std::{error, io};

    #[cfg_attr(docsrs, doc(cfg(feature = "std")))]
    impl error::Error for WriteError {}

    #[cfg_attr(docsrs, doc(cfg(feature = "std")))]
    impl From<WriteError> for io::Error {
        fn from(err: WriteError) -> Self {
            match err {
                WriteError::ReaderClosed => io::ErrorKind::UnexpectedEof.into(),
            }
        }
    }

    #[cfg_attr(docsrs, doc(cfg(feature = "std")))]
    impl error::Error for ReadExactError {}

    #[cfg_attr(docsrs, doc(cfg(feature = "std")))]
    impl From<ReadExactError> for io::Error {
        fn from(err: ReadExactError) -> Self {
            match err {
                ReadExactError::WriterClosed => io::ErrorKind::UnexpectedEof.into(),
            }
        }
    }
}

#[cfg(feature = "std")]
pub use self::std_impls::*;

#[cfg(feature = "std-io")]
mod io_impls {
    use crate::asyncio::{Reader, Writer};
    use crate::storage::Storage;
    use core::pin::Pin;
    use core::task::{Context, Poll};
    use futures::{io, AsyncBufRead, AsyncRead, AsyncWrite};

    #[cfg_attr(docsrs, doc(cfg(feature = "std-io")))]
    impl<S> AsyncRead for Reader<S>
    where
        S: Storage<u8>,
    {
        fn poll_read(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: &mut [u8],
        ) -> Poll<io::Result<usize>> {
            let me = self.get_mut();

            let src_buf = match me.poll_fill_buf(cx) {
                Poll::Ready(src_buf) => src_buf,
                Poll::Pending => return Poll::Pending,
            };

            if src_buf.is_empty() {
                // This indicates the writer has closed and all data has been read.
                return Poll::Ready(Ok(0));
            }

            let len = src_buf.len().min(buf.len());
            src_buf.slice(..len).copy_to_slice(&mut buf[..len]);

            me.consume(len);

            Poll::Ready(Ok(len))
        }
    }

    #[cfg_attr(docsrs, doc(cfg(feature = "std-io")))]
    impl<S> AsyncBufRead for Reader<S>
    where
        S: Storage<u8>,
    {
        fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
            self.get_mut()
                .poll_fill_buf(cx)
                .map(|region| Ok(region.contiguous()))
        }

        fn consume(self: Pin<&mut Self>, amt: usize) {
            self.get_mut().consume(amt);
        }
    }

    #[cfg_attr(docsrs, doc(cfg(feature = "std-io")))]
    impl<S> AsyncWrite for Writer<S>
    where
        S: Storage<u8>,
    {
        fn poll_write(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: &[u8],
        ) -> Poll<io::Result<usize>> {
            let me = self.get_mut();

            let mut dest_buf = match me.poll_empty_buf(cx) {
                Poll::Ready(dest_buf) => dest_buf,
                Poll::Pending => return Poll::Pending,
            };

            if dest_buf.is_empty() {
                // This indicates the reader has closed.
                return Poll::Ready(Ok(0));
            }

            let len = dest_buf.len().min(buf.len());
            dest_buf.slice_mut(..len).copy_from_slice(&buf[..len]);

            me.feed(len);

            Poll::Ready(Ok(len))
        }

        fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            self.get_mut().poll_flush(cx).map(|r| r.map_err(Into::into))
        }

        fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            self.get_mut().poll_close(cx).map(|r| r.map_err(Into::into))
        }
    }
}

#[cfg(feature = "std-io")]
pub use self::io_impls::*;