Skip to main content

piper/
lib.rs

1//! A bounded single-producer single-consumer pipe.
2//!
3//! This crate provides a ring buffer that can be asynchronously read from and written to. It is
4//! created via the [`pipe`] function, which returns a pair of [`Reader`] and [`Writer`] handles.
5//! They implement the [`AsyncRead`] and [`AsyncWrite`] traits, respectively.
6//!
7//! The handles are single-producer/single-consumer; to clarify, they cannot be cloned and need `&mut`
8//! access to read or write to them. If multiple-producer/multiple-consumer handles are needed,
9//! consider wrapping them in an `Arc<Mutex<...>>` or similar.
10//!
11//! When the sender is dropped, remaining bytes in the pipe can still be read. After that, attempts
12//! to read will result in `Ok(0)`, i.e. they will always 'successfully' read 0 bytes.
13//!
14//! When the receiver is dropped, the pipe is closed and no more bytes can be written into it.
15//! Further writes will result in `Ok(0)`, i.e. they will always 'successfully' write 0 bytes.
16//!
17//! # Version 0.2.0 Notes
18//!
19//! Previously, this crate contained other synchronization primitives, such as bounded channels, locks,
20//! and event listeners. These have been split out into their own crates:
21//!
22//! - [`async-channel`](https://docs.rs/async-channel)
23//! - [`async-dup`](https://docs.rs/async-dup)
24//! - [`async-lock`](https://docs.rs/async-lock)
25//! - [`async-mutex`](https://docs.rs/async-mutex)
26//! - [`event-listener`](https://docs.rs/event-listener)
27//!
28//! # Examples
29//!
30//! ## Asynchronous Tasks
31//!
32//! Communicate between asynchronous tasks, potentially on other threads.
33//!
34//! ```
35//! use async_channel::unbounded;
36//! use async_executor::Executor;
37//! use easy_parallel::Parallel;
38//! use futures_lite::{future, prelude::*};
39//! use std::time::Duration;
40//!
41//! # if cfg!(miri) { return; }
42//!
43//! // Create a pair of handles.
44//! let (mut reader, mut writer) = piper::pipe(1024);
45//!
46//! // Create the executor.
47//! let ex = Executor::new();
48//! let (signal, shutdown) = unbounded::<()>();
49//!
50//! // Spawn a detached task for random data to the pipe.
51//! let writer = ex.spawn(async move {
52//!     for _ in 0..1_000 {
53//!         // Generate 8 random numnbers.
54//!         let random = fastrand::u64(..).to_le_bytes();
55//!
56//!         // Write them to the pipe.
57//!         writer.write_all(&random).await.unwrap();
58//!
59//!         // Wait a bit.
60//!         async_io::Timer::after(Duration::from_millis(5)).await;
61//!     }
62//!
63//!     // Drop the writer to close the pipe.
64//!     drop(writer);
65//! });
66//!
67//! // Detach the task so that it runs in the background.
68//! writer.detach();
69//!
70//! // Spawn a task for reading from the pipe.
71//! let reader = ex.spawn(async move {
72//!     let mut buf = vec![];
73//!
74//!     // Read all bytes from the pipe.
75//!     reader.read_to_end(&mut buf).await.unwrap();
76//!
77//!     println!("Random data: {:#?}", buf);
78//! });
79//!
80//! Parallel::new()
81//!     // Run four executor threads.
82//!     .each(0..4, |_| future::block_on(ex.run(shutdown.recv())))
83//!     // Run the main future on the current thread.
84//!     .finish(|| future::block_on(async {
85//!         // Wait for the reader to finish.
86//!         reader.await;
87//!
88//!         // Signal the executor threads to shut down.
89//!         drop(signal);
90//!     }));
91//! ```
92//!
93//! ## Blocking I/O
94//!
95//! File I/O is blocking; therefore, in `async` code, you must run it on another thread. This example
96//! spawns another thread for reading a file and writing it to a pipe.
97//!
98//! ```no_run
99//! use futures_lite::{future, prelude::*};
100//! use std::fs::File;
101//! use std::io::prelude::*;
102//! use std::thread;
103//!
104//! // Create a pair of handles.
105//! let (mut r, mut w) = piper::pipe(1024);
106//!
107//! // Spawn a thread for reading a file.
108//! thread::spawn(move || {
109//!     let mut file = File::open("Cargo.toml").unwrap();
110//!
111//!     // Read the file into a buffer.
112//!     let mut buf = [0u8; 16384];
113//!     future::block_on(async move {
114//!         loop {
115//!             // Read a chunk of bytes from the file.
116//!             // Blocking is okay here, since this is a separate thread.
117//!             let n = file.read(&mut buf).unwrap();
118//!             if n == 0 {
119//!                 break;
120//!             }
121//!
122//!             // Write the chunk to the pipe.
123//!             w.write_all(&buf[..n]).await.unwrap();
124//!         }
125//!
126//!         // Close the pipe.
127//!         drop(w);
128//!     });
129//! });
130//!
131//! # future::block_on(async move {
132//! // Read bytes from the pipe.
133//! let mut buf = vec![];
134//! r.read_to_end(&mut buf).await.unwrap();
135//!
136//! println!("Read {} bytes", buf.len());
137//! # });
138//! ```
139//!
140//! However, the lower-level [`poll_fill`] and [`poll_drain`] methods take `impl Read` and `impl Write`
141//! arguments, respectively. This allows you to skip the buffer entirely and read/write directly from
142//! the file into the pipe. This approach should be preferred when possible, as it avoids an extra
143//! copy.
144//!
145//! ```no_run
146//! # use futures_lite::future;
147//! # use std::fs::File;
148//! # let mut file: File = unimplemented!();
149//! # let mut w: piper::Writer = unimplemented!();
150//! // In the `future::block_on` call above...
151//! # future::block_on(async move {
152//! loop {
153//!     let n = future::poll_fn(|cx| w.poll_fill(cx, &mut file)).await.unwrap();
154//!     if n == 0 {
155//!         break;
156//!     }
157//! }
158//! # });
159//! ```
160//!
161//! The [`blocking`] crate is preferred in this use case, since it uses more efficient strategies for
162//! thread management and pipes.
163//!
164//! [`poll_fill`]: struct.Writer.html#method.poll_fill
165//! [`poll_drain`]: struct.Reader.html#method.poll_drain
166//! [`blocking`]: https://docs.rs/blocking
167
168#![cfg_attr(not(feature = "std"), no_std)]
169#![forbid(missing_docs)]
170#![doc(
171    html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
172)]
173#![doc(
174    html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
175)]
176
177extern crate alloc;
178
179use core::convert::Infallible;
180use core::mem;
181use core::slice;
182use core::task::{Context, Poll};
183
184use alloc::vec::Vec;
185
186use sync::atomic::{self, AtomicBool, AtomicUsize, Ordering};
187use sync::Arc;
188
189#[cfg(feature = "std")]
190use std::{
191    io::{self, Read, Write},
192    pin::Pin,
193};
194
195use atomic_waker::AtomicWaker;
196
197#[cfg(feature = "std")]
198use futures_io::{AsyncBufRead, AsyncRead, AsyncWrite};
199
200macro_rules! ready {
201    ($e:expr) => {{
202        match $e {
203            Poll::Ready(t) => t,
204            Poll::Pending => return Poll::Pending,
205        }
206    }};
207}
208
209/// Creates a bounded single-producer single-consumer pipe.
210///
211/// A pipe is a ring buffer of `cap` bytes that can be asynchronously read from and written to.
212///
213/// See the [crate-level documentation](index.html) for more details.
214///
215/// # Panics
216///
217/// This function panics if `cap` is 0 or if `cap * 2` overflows a `usize`.
218#[allow(clippy::incompatible_msrv)] // false positive: https://github.com/rust-lang/rust-clippy/issues/12280
219pub fn pipe(cap: usize) -> (Reader, Writer) {
220    assert!(cap > 0, "capacity must be positive");
221    assert!(cap.checked_mul(2).is_some(), "capacity is too large");
222
223    // Allocate the ring buffer.
224    let mut v = Vec::with_capacity(cap);
225    let buffer = v.as_mut_ptr();
226    mem::forget(v);
227
228    let inner = Arc::new(Pipe {
229        head: AtomicUsize::new(0),
230        tail: AtomicUsize::new(0),
231        reader: AtomicWaker::new(),
232        writer: AtomicWaker::new(),
233        closed: AtomicBool::new(false),
234        buffer,
235        cap,
236    });
237
238    // Use a random number generator to randomize fair yielding behavior.
239    let mut rng = rng();
240
241    let r = Reader {
242        inner: inner.clone(),
243        head: 0,
244        tail: 0,
245        rng: rng.fork(),
246    };
247
248    let w = Writer {
249        inner,
250        head: 0,
251        tail: 0,
252        zeroed_until: 0,
253        rng,
254    };
255
256    (r, w)
257}
258
259/// The reading side of a pipe.
260///
261/// This type is created by the [`pipe`] function. See its documentation for more details.
262pub struct Reader {
263    /// The inner ring buffer.
264    inner: Arc<Pipe>,
265
266    /// The head index, moved by the reader, in the range `0..2*cap`.
267    ///
268    /// This index always matches `inner.head`.
269    head: usize,
270
271    /// The tail index, moved by the writer, in the range `0..2*cap`.
272    ///
273    /// This index is a snapshot of `index.tail` that might become stale at any point.
274    tail: usize,
275
276    /// Random number generator.
277    rng: fastrand::Rng,
278}
279
280/// The writing side of a pipe.
281///
282/// This type is created by the [`pipe`] function. See its documentation for more details.
283pub struct Writer {
284    /// The inner ring buffer.
285    inner: Arc<Pipe>,
286
287    /// The head index, moved by the reader, in the range `0..2*cap`.
288    ///
289    /// This index is a snapshot of `index.head` that might become stale at any point.
290    head: usize,
291
292    /// The tail index, moved by the writer, in the range `0..2*cap`.
293    ///
294    /// This index always matches `inner.tail`.
295    tail: usize,
296
297    /// How many bytes at the beginning of the buffer have been zeroed.
298    ///
299    /// The pipe allocates an uninitialized buffer, and we must be careful about passing
300    /// uninitialized data to user code. Zeroing the buffer right after allocation would be too
301    /// expensive, so we zero it in smaller chunks as the writer makes progress.
302    zeroed_until: usize,
303
304    /// Random number generator.
305    rng: fastrand::Rng,
306}
307
308/// The inner ring buffer.
309///
310/// Head and tail indices are in the range `0..2*cap`, even though they really map onto the
311/// `0..cap` range. The distance between head and tail indices is never more than `cap`.
312///
313/// The reason why indices are not in the range `0..cap` is because we need to distinguish between
314/// the pipe being empty and being full. If head and tail were in `0..cap`, then `head == tail`
315/// could mean the pipe is either empty or full, but we don't know which!
316struct Pipe {
317    /// The head index, moved by the reader, in the range `0..2*cap`.
318    head: AtomicUsize,
319
320    /// The tail index, moved by the writer, in the range `0..2*cap`.
321    tail: AtomicUsize,
322
323    /// A waker representing the blocked reader.
324    reader: AtomicWaker,
325
326    /// A waker representing the blocked writer.
327    writer: AtomicWaker,
328
329    /// Set to `true` if the reader or writer was dropped.
330    closed: AtomicBool,
331
332    /// The byte buffer.
333    buffer: *mut u8,
334
335    /// The buffer capacity.
336    cap: usize,
337}
338
339unsafe impl Sync for Pipe {}
340unsafe impl Send for Pipe {}
341
342impl Drop for Pipe {
343    fn drop(&mut self) {
344        // Deallocate the byte buffer.
345        unsafe {
346            Vec::from_raw_parts(self.buffer, 0, self.cap);
347        }
348    }
349}
350
351impl Drop for Reader {
352    fn drop(&mut self) {
353        // Dropping closes the pipe and then wakes the writer.
354        self.inner.closed.store(true, Ordering::SeqCst);
355        self.inner.writer.wake();
356    }
357}
358
359impl Drop for Writer {
360    fn drop(&mut self) {
361        // Dropping closes the pipe and then wakes the reader.
362        self.inner.closed.store(true, Ordering::SeqCst);
363        self.inner.reader.wake();
364    }
365}
366
367impl Pipe {
368    /// Get the length of the data in the pipe.
369    fn len(&self) -> usize {
370        let head = self.head.load(Ordering::Acquire);
371        let tail = self.tail.load(Ordering::Acquire);
372
373        if head <= tail {
374            tail - head
375        } else {
376            (2 * self.cap) - (head - tail)
377        }
378    }
379
380    /// Given an index in `0..2*cap`, returns the real index in `0..cap`.
381    #[inline]
382    fn real_index(&self, i: usize) -> usize {
383        if i < self.cap {
384            i
385        } else {
386            i - self.cap
387        }
388    }
389}
390
391impl Reader {
392    /// Gets the total length of the data in the pipe.
393    ///
394    /// This method returns the number of bytes that have been written into the pipe but haven't been
395    /// read yet.
396    ///
397    /// # Examples
398    ///
399    /// ```
400    /// let (mut reader, mut writer) = piper::pipe(10);
401    /// let _ = writer.try_fill(&[0u8; 5]);
402    /// assert_eq!(reader.len(), 5);
403    /// ```
404    pub fn len(&self) -> usize {
405        self.inner.len()
406    }
407
408    /// Tell whether or not the pipe is empty.
409    ///
410    /// This method returns `true` if the pipe is empty, and `false` otherwise.
411    ///
412    /// # Examples
413    ///
414    /// ```
415    /// let (mut reader, mut writer) = piper::pipe(10);
416    /// assert!(reader.is_empty());
417    /// let _ = writer.try_fill(&[0u8; 5]);
418    /// assert!(!reader.is_empty());
419    /// ```
420    pub fn is_empty(&self) -> bool {
421        self.inner.len() == 0
422    }
423
424    /// Gets the total capacity of the pipe.
425    ///
426    /// This method returns the number of bytes that the pipe can hold at a time.
427    ///
428    /// # Examples
429    ///
430    /// ```
431    /// # futures_lite::future::block_on(async {
432    /// let (reader, _) = piper::pipe(10);
433    /// assert_eq!(reader.capacity(), 10);
434    /// # });
435    /// ```
436    pub fn capacity(&self) -> usize {
437        self.inner.cap
438    }
439
440    /// Tell whether or not the pipe is full.
441    ///
442    /// The pipe is full if the number of bytes written into it is equal to its capacity. At this point,
443    /// writes will block until some data is read from the pipe.
444    ///
445    /// This method returns `true` if the pipe is full, and `false` otherwise.
446    ///
447    /// # Examples
448    ///
449    /// ```
450    /// let (mut reader, mut writer) = piper::pipe(10);
451    /// assert!(!reader.is_full());
452    /// let _ = writer.try_fill(&[0u8; 10]);
453    /// assert!(reader.is_full());
454    /// let _ = reader.try_drain(&mut [0u8; 5]);
455    /// assert!(!reader.is_full());
456    /// ```
457    pub fn is_full(&self) -> bool {
458        self.inner.len() == self.inner.cap
459    }
460
461    /// Tell whether or not the pipe is closed.
462    ///
463    /// The pipe is closed if either the reader or the writer has been dropped. At this point, attempting
464    /// to write into the pipe will return `Poll::Ready(Ok(0))` and attempting to read from the pipe after
465    /// any previously written bytes are read will return `Poll::Ready(Ok(0))`.
466    ///
467    /// # Examples
468    ///
469    /// ```
470    /// # futures_lite::future::block_on(async {
471    /// let (mut reader, mut writer) = piper::pipe(10);
472    /// assert!(!reader.is_closed());
473    /// drop(writer);
474    /// assert!(reader.is_closed());
475    /// # });
476    /// ```
477    pub fn is_closed(&self) -> bool {
478        self.inner.closed.load(Ordering::SeqCst)
479    }
480
481    /// Reads bytes from this reader and writes into blocking `dest`.
482    ///
483    /// This method reads directly from the pipe's internal buffer into `dest`. This avoids an extra copy,
484    /// but it may block the thread if `dest` blocks.
485    ///
486    /// If the pipe is empty, this method returns `Poll::Pending`. If the pipe is closed, this method
487    /// returns `Poll::Ready(Ok(0))`. Errors in `dest` are bubbled up through `Poll::Ready(Err(e))`.
488    /// Otherwise, this method returns `Poll::Ready(Ok(n))` where `n` is the number of bytes written.
489    ///
490    /// This method is only available when the `std` feature is enabled. For `no_std` environments,
491    /// consider using [`poll_drain_bytes`] instead.
492    ///
493    /// [`poll_drain_bytes`]: #method.poll_drain_bytes
494    ///
495    /// # Examples
496    ///
497    /// ```
498    /// use futures_lite::{future, prelude::*};
499    /// # future::block_on(async {
500    ///
501    /// let (mut r, mut w) = piper::pipe(1024);
502    ///
503    /// // Write some data to the pipe.
504    /// w.write_all(b"hello world").await.unwrap();
505    ///
506    /// // Try reading from the pipe.
507    /// let mut buf = [0; 1024];
508    /// let n = future::poll_fn(|cx| r.poll_drain(cx, &mut buf[..])).await.unwrap();
509    ///
510    /// // The data was written to the buffer.
511    /// assert_eq!(&buf[..n], b"hello world");
512    /// # });
513    /// ```
514    #[cfg(feature = "std")]
515    pub fn poll_drain(
516        &mut self,
517        cx: &mut Context<'_>,
518        dest: impl Write,
519    ) -> Poll<io::Result<usize>> {
520        self.drain_inner(Some(cx), dest)
521    }
522
523    /// Reads bytes from this reader.
524    ///
525    /// Rather than taking a `Write` trait object, this method takes a slice of bytes to write into.
526    /// Because of this, it is infallible and can be used in `no_std` environments.
527    ///
528    /// The same conditions that apply to [`poll_drain`] apply to this method.
529    ///
530    /// [`poll_drain`]: #method.poll_drain
531    ///
532    /// # Examples
533    ///
534    /// ```
535    /// use futures_lite::{future, prelude::*};
536    /// # future::block_on(async {
537    /// let (mut r, mut w) = piper::pipe(1024);
538    ///
539    /// // Write some data to the pipe.
540    /// w.write_all(b"hello world").await.unwrap();
541    ///
542    /// // Try reading from the pipe.
543    /// let mut buf = [0; 1024];
544    /// let n = future::poll_fn(|cx| r.poll_drain_bytes(cx, &mut buf[..])).await;
545    ///
546    /// // The data was written to the buffer.
547    /// assert_eq!(&buf[..n], b"hello world");
548    /// # });
549    /// ```
550    pub fn poll_drain_bytes(&mut self, cx: &mut Context<'_>, dest: &mut [u8]) -> Poll<usize> {
551        match self.drain_inner(Some(cx), WriteBytes(dest)) {
552            Poll::Ready(Ok(n)) => Poll::Ready(n),
553            Poll::Ready(Err(e)) => match e {},
554            Poll::Pending => Poll::Pending,
555        }
556    }
557
558    /// Poll for data to become available in the pipe or the write side to be closed.
559    ///
560    /// Returns `Poll::Ready(true)` when data is ready. Call
561    /// [`peek_buf()`][Self::peek_buf] to access the data, and
562    /// [`consume()`][Self::consume] to advance the read position.
563    ///
564    /// A return value of `Poll::Ready(false)` indicates that the pipe is closed.
565    ///
566    /// If no data is available, this method will return `Poll::Pending` and register the waker
567    /// to receive a notification when data is written to the pipe or the write end is closed.
568    ///
569    /// Unlike `AsyncBufRead::poll_fill_buf` method, this method is infallible and does not
570    /// require the `std` feature. It separates the polling, buffer access, and consume steps
571    /// for compatibility with `poll_fn`'s lifetime requirements.
572    ///
573    /// # Example
574    ///
575    /// ```
576    /// use futures_lite::{future, prelude::*};
577    ///
578    /// # future::block_on(async {
579    /// let (mut r, mut w) = piper::pipe(1024);
580    ///
581    /// // Write some data to the pipe.
582    /// w.write_all(b"hello world").await.unwrap();
583    ///
584    /// future::poll_fn(|cx| r.poll(cx)).await;
585    /// let buf = r.peek_buf();
586    /// assert_eq!(buf, &b"hello world"[..buf.len()]);
587    ///
588    /// // Consume one byte
589    /// r.consume(1);
590    ///
591    /// future::poll_fn(|cx| r.poll(cx)).await;
592    /// let buf = r.peek_buf();
593    /// assert_eq!(buf, &b"ello world"[..buf.len()]);
594    ///
595    /// r.consume(buf.len());
596    /// # });
597    /// ```
598    pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll<bool> {
599        self.poll_available(Some(cx))
600    }
601
602    /// Return the contents of the internal buffer that are available immediately.
603    ///
604    /// Call [`Self::consume()`] to consume the bytes returned by this method. The buffer might
605    /// not re-fill until another call to [`Self::poll()`] returns `Poll::Ready`.
606    pub fn peek_buf(&self) -> &[u8] {
607        let n = self
608            .available_data() // No more than bytes in the pipe.
609            .min(self.inner.cap - self.inner.real_index(self.head)); // Don't go past the buffer boundary.
610
611        unsafe { slice::from_raw_parts(self.inner.buffer.add(self.inner.real_index(self.head)), n) }
612    }
613
614    /// Consume `amt` bytes from the pipe.
615    ///
616    /// Panics if `amt` is greater than the length of the buffer returned by [`Self::peek_buf()`].
617    pub fn consume(&mut self, amt: usize) {
618        let cap = self.inner.cap;
619
620        assert!(
621            amt <= self.available_data() && self.head + amt <= 2 * cap,
622            "cannot consume more bytes than available in the pipe"
623        );
624
625        // Move the head forward.
626        if self.head + amt < 2 * cap {
627            self.head += amt;
628        } else {
629            self.head = 0;
630        }
631
632        // Store the current head index.
633        self.inner.head.store(self.head, Ordering::Release);
634
635        // Wake the writer because the pipe is not full.
636        self.inner.writer.wake();
637    }
638
639    /// Tries to read bytes from this reader.
640    ///
641    /// Returns the total number of bytes that were read from this reader.
642    ///
643    /// # Examples
644    ///
645    /// ```
646    /// let (mut r, mut w) = piper::pipe(1024);
647    ///
648    /// // `try_drain()` returns 0 off the bat.
649    /// let mut buf = [0; 10];
650    /// assert_eq!(r.try_drain(&mut buf), 0);
651    ///
652    /// // After a write it returns the data.
653    /// w.try_fill(&[0, 1, 2, 3, 4]);
654    /// assert_eq!(r.try_drain(&mut buf), 5);
655    /// assert_eq!(&buf[..5], &[0, 1, 2, 3, 4]);
656    /// ```
657    pub fn try_drain(&mut self, dest: &mut [u8]) -> usize {
658        match self.drain_inner(None, WriteBytes(dest)) {
659            Poll::Ready(Ok(n)) => n,
660            Poll::Ready(Err(e)) => match e {},
661            Poll::Pending => 0,
662        }
663    }
664
665    /// Get the number of bytes available to read without synchronization.
666    #[inline]
667    fn available_data(&self) -> usize {
668        let a = self.head;
669        let b = self.tail;
670        if a <= b {
671            b - a
672        } else {
673            2 * self.inner.cap - (a - b)
674        }
675    }
676
677    /// Poll for available data or end of stream.
678    ///
679    /// Returns `Poll::Ready(true)` if data is available, `Poll::Ready(false)` if the pipe is closed,
680    /// or `Poll::Pending` if the pipe is empty and the waker has been registered.
681    fn poll_available(&mut self, mut cx: Option<&mut Context<'_>>) -> Poll<bool> {
682        // If the pipe appears to be empty...
683        if self.available_data() == 0 {
684            // Reload the tail in case it's become stale.
685            self.tail = self.inner.tail.load(Ordering::Acquire);
686
687            // If the pipe is now really empty...
688            if self.available_data() == 0 {
689                // Register the waker.
690                if let Some(cx) = cx.as_mut() {
691                    self.inner.reader.register(cx.waker());
692                }
693                atomic::fence(Ordering::SeqCst);
694
695                // Load whether the channel is closed or not early, so that we don't miss any writes
696                // between updating the tail and checking for close.
697                let is_closed = self.inner.closed.load(Ordering::Acquire);
698
699                // Reload the tail after registering the waker.
700                self.tail = self.inner.tail.load(Ordering::Acquire);
701
702                // If the pipe is still empty...
703                if self.available_data() == 0 {
704                    // Check whether the pipe is closed or just empty.
705                    if is_closed {
706                        return Poll::Ready(false);
707                    } else {
708                        return Poll::Pending;
709                    }
710                }
711            }
712        }
713
714        // The pipe is not empty so remove the waker.
715        self.inner.reader.take();
716        Poll::Ready(true)
717    }
718
719    /// Reads bytes from this reader and writes into blocking `dest`.
720    #[inline]
721    fn drain_inner<W: WriteLike>(
722        &mut self,
723        mut cx: Option<&mut Context<'_>>,
724        mut dest: W,
725    ) -> Poll<Result<usize, W::Error>> {
726        if !ready!(self.poll_available(cx.as_mut().map(|c| &mut **c))) {
727            // The pipe is closed
728            return Poll::Ready(Ok(0));
729        }
730
731        // Yield with some small probability - this improves fairness.
732        if let Some(cx) = cx {
733            ready!(maybe_yield(&mut self.rng, cx));
734        }
735
736        // Number of bytes read so far.
737        let mut count = 0;
738
739        loop {
740            // Create a slice of data in the pipe buffer.
741            let pipe_slice = self.peek_buf();
742
743            // Not too many bytes in one go - better to wake the writer soon!
744            let pipe_slice = &pipe_slice[..pipe_slice.len().min(128 * 1024)];
745
746            // Copy bytes from the pipe buffer into `dest`.
747            let n = dest.write(pipe_slice)?;
748            count += n;
749
750            // If pipe is empty or `dest` is full, return.
751            if n == 0 {
752                return Poll::Ready(Ok(count));
753            }
754
755            self.consume(n);
756        }
757    }
758}
759
760#[cfg(feature = "std")]
761impl AsyncRead for Reader {
762    fn poll_read(
763        mut self: Pin<&mut Self>,
764        cx: &mut Context<'_>,
765        buf: &mut [u8],
766    ) -> Poll<io::Result<usize>> {
767        self.poll_drain_bytes(cx, buf).map(Ok)
768    }
769}
770
771#[cfg(feature = "std")]
772impl AsyncBufRead for Reader {
773    fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
774        ready!(self.poll(cx));
775        // SAFETY: Reader is `Unpin` (equivalent to the safe `Pin::into_inner` but it needs a higher MSRV)
776        let this = unsafe { self.get_unchecked_mut() };
777        Poll::Ready(Ok(this.peek_buf()))
778    }
779
780    fn consume(mut self: Pin<&mut Self>, amt: usize) {
781        (*self).consume(amt)
782    }
783}
784
785impl Writer {
786    /// Gets the total length of the data in the pipe.
787    ///
788    /// This method returns the number of bytes that have been written into the pipe but haven't been
789    /// read yet.
790    ///
791    /// # Examples
792    ///
793    /// ```
794    /// let (_reader, mut writer) = piper::pipe(10);
795    /// let _ = writer.try_fill(&[0u8; 5]);
796    /// assert_eq!(writer.len(), 5);
797    /// ```
798    pub fn len(&self) -> usize {
799        self.inner.len()
800    }
801
802    /// Tell whether or not the pipe is empty.
803    ///
804    /// This method returns `true` if the pipe is empty, and `false` otherwise.
805    ///
806    /// # Examples
807    ///
808    /// ```
809    /// let (_reader, mut writer) = piper::pipe(10);
810    /// assert!(writer.is_empty());
811    /// let _ = writer.try_fill(&[0u8; 5]);
812    /// assert!(!writer.is_empty());
813    /// ```
814    pub fn is_empty(&self) -> bool {
815        self.inner.len() == 0
816    }
817
818    /// Gets the total capacity of the pipe.
819    ///
820    /// This method returns the number of bytes that the pipe can hold at a time.
821    ///
822    /// # Examples
823    ///
824    /// ```
825    /// # futures_lite::future::block_on(async {
826    /// let (_, writer) = piper::pipe(10);
827    /// assert_eq!(writer.capacity(), 10);
828    /// # });
829    /// ```
830    pub fn capacity(&self) -> usize {
831        self.inner.cap
832    }
833
834    /// Tell whether or not the pipe is full.
835    ///
836    /// The pipe is full if the number of bytes written into it is equal to its capacity. At this point,
837    /// writes will block until some data is read from the pipe.
838    ///
839    /// This method returns `true` if the pipe is full, and `false` otherwise.
840    ///
841    /// # Examples
842    ///
843    /// ```
844    /// let (mut reader, mut writer) = piper::pipe(10);
845    /// assert!(!writer.is_full());
846    /// let _ = writer.try_fill(&[0u8; 10]);
847    /// assert!(writer.is_full());
848    /// let _ = reader.try_drain(&mut [0u8; 5]);
849    /// assert!(!writer.is_full());
850    /// ```
851    pub fn is_full(&self) -> bool {
852        self.inner.len() == self.inner.cap
853    }
854
855    /// Tell whether or not the pipe is closed.
856    ///
857    /// The pipe is closed if either the reader or the writer has been dropped. At this point, attempting
858    /// to write into the pipe will return `Poll::Ready(Ok(0))` and attempting to read from the pipe after
859    /// any previously written bytes are read will return `Poll::Ready(Ok(0))`.
860    ///
861    /// # Examples
862    ///
863    /// ```
864    /// # futures_lite::future::block_on(async {
865    /// let (reader, writer) = piper::pipe(10);
866    /// assert!(!writer.is_closed());
867    /// drop(reader);
868    /// assert!(writer.is_closed());
869    /// # });
870    /// ```
871    pub fn is_closed(&self) -> bool {
872        self.inner.closed.load(Ordering::SeqCst)
873    }
874
875    /// Reads bytes from blocking `src` and writes into this writer.
876    ///
877    /// This method writes directly from `src` into the pipe's internal buffer. This avoids an extra copy,
878    /// but it may block the thread if `src` blocks.
879    ///
880    /// If the pipe is full, this method returns `Poll::Pending`. If the pipe is closed, this method
881    /// returns `Poll::Ready(Ok(0))`. Errors in `src` are bubbled up through `Poll::Ready(Err(e))`.
882    /// Otherwise, this method returns `Poll::Ready(Ok(n))` where `n` is the number of bytes read.
883    ///
884    /// This method is only available when the `std` feature is enabled. For `no_std` environments,
885    /// consider using [`poll_fill_bytes`] instead.
886    ///
887    /// [`poll_fill_bytes`]: #method.poll_fill_bytes
888    ///
889    /// # Examples
890    ///
891    /// ```
892    /// use futures_lite::{future, prelude::*};
893    /// # future::block_on(async {
894    ///
895    /// // Create a pipe.
896    /// let (mut reader, mut writer) = piper::pipe(1024);
897    ///
898    /// // Fill the pipe with some bytes.
899    /// let data = b"hello world";
900    /// let n = future::poll_fn(|cx| writer.poll_fill(cx, &data[..])).await.unwrap();
901    /// assert_eq!(n, data.len());
902    ///
903    /// // Read the bytes back.
904    /// let mut buf = [0; 1024];
905    /// reader.read_exact(&mut buf[..data.len()]).await.unwrap();
906    /// assert_eq!(&buf[..data.len()], data);
907    /// # });
908    /// ```
909    #[cfg(feature = "std")]
910    pub fn poll_fill(&mut self, cx: &mut Context<'_>, src: impl Read) -> Poll<io::Result<usize>> {
911        self.fill_inner(Some(cx), src)
912    }
913
914    /// Writes bytes into this writer.
915    ///
916    /// Rather than taking a `Read` trait object, this method takes a slice of bytes to read from.
917    /// Because of this, it is infallible and can be used in `no_std` environments.
918    ///
919    /// The same conditions that apply to [`poll_fill`] apply to this method.
920    ///
921    /// [`poll_fill`]: #method.poll_fill
922    ///
923    /// # Examples
924    ///
925    /// ```
926    /// use futures_lite::{future, prelude::*};
927    /// # future::block_on(async {
928    ///
929    /// // Create a pipe.
930    /// let (mut reader, mut writer) = piper::pipe(1024);
931    ///
932    /// // Fill the pipe with some bytes.
933    /// let data = b"hello world";
934    /// let n = future::poll_fn(|cx| writer.poll_fill_bytes(cx, &data[..])).await;
935    /// assert_eq!(n, data.len());
936    ///
937    /// // Read the bytes back.
938    /// let mut buf = [0; 1024];
939    /// reader.read_exact(&mut buf[..data.len()]).await.unwrap();
940    /// assert_eq!(&buf[..data.len()], data);
941    /// # });
942    /// ```
943    pub fn poll_fill_bytes(&mut self, cx: &mut Context<'_>, bytes: &[u8]) -> Poll<usize> {
944        match self.fill_inner(Some(cx), ReadBytes(bytes)) {
945            Poll::Ready(Ok(n)) => Poll::Ready(n),
946            Poll::Ready(Err(e)) => match e {},
947            Poll::Pending => Poll::Pending,
948        }
949    }
950
951    /// Tries to write bytes to this writer.
952    ///
953    /// Returns the total number of bytes that were read from this reader.
954    ///
955    /// # Examples
956    ///
957    /// ```
958    /// let (mut r, mut w) = piper::pipe(1024);
959    ///
960    /// let mut buf = [0; 10];
961    /// assert_eq!(w.try_fill(&[0, 1, 2, 3, 4]), 5);
962    /// assert_eq!(r.try_drain(&mut buf), 5);
963    /// assert_eq!(&buf[..5], &[0, 1, 2, 3, 4]);
964    /// ```
965    pub fn try_fill(&mut self, dest: &[u8]) -> usize {
966        match self.fill_inner(None, ReadBytes(dest)) {
967            Poll::Ready(Ok(n)) => n,
968            Poll::Ready(Err(e)) => match e {},
969            Poll::Pending => 0,
970        }
971    }
972
973    /// Get the free space in bytes that can be written without synchronization.
974    #[inline]
975    fn available_space(&self) -> usize {
976        let a = self.head;
977        let b = self.tail;
978        if a <= b {
979            self.inner.cap - (b - a)
980        } else {
981            (a - b) - self.inner.cap
982        }
983    }
984
985    /// Wait for available space in the pipe or the read side to be closed.
986    ///
987    /// Returns `Poll::Ready(true)` when space is available, or `Poll::Ready(false)` if the pipe is closed.
988    #[inline]
989    fn poll_inner(&mut self, cx: Option<&mut Context<'_>>) -> Poll<bool> {
990        // Just a quick check if the pipe is closed, which is why a relaxed load is okay.
991        if self.inner.closed.load(Ordering::Relaxed) {
992            return Poll::Ready(false);
993        }
994
995        // If the pipe appears to be full...
996        if self.available_space() == 0 {
997            // Reload the head in case it's become stale.
998            self.head = self.inner.head.load(Ordering::Acquire);
999
1000            // If the pipe is now really still full...
1001            if self.available_space() == 0 {
1002                // Register the waker.
1003                if let Some(cx) = cx {
1004                    self.inner.writer.register(cx.waker());
1005                }
1006                atomic::fence(Ordering::SeqCst);
1007
1008                // Reload the head after registering the waker.
1009                self.head = self.inner.head.load(Ordering::Acquire);
1010
1011                // If the pipe is still full...
1012                if self.available_space() == 0 {
1013                    // Check whether the pipe is closed or just full.
1014                    if self.inner.closed.load(Ordering::Relaxed) {
1015                        return Poll::Ready(false);
1016                    } else {
1017                        return Poll::Pending;
1018                    }
1019                }
1020            }
1021        }
1022
1023        // The pipe is not full so remove the waker.
1024        self.inner.writer.take();
1025
1026        Poll::Ready(true)
1027    }
1028
1029    /// Poll for available space in the pipe or the read side to be closed.
1030    ///
1031    /// Returns `Poll::Ready(true)` when space is available to write. Call
1032    /// [`write_buf()`][Self::write_buf] to obtain a mutable slice of the buffer
1033    /// to write into, then call [`produced(n)`][Self::produced] once the data
1034    /// is written.
1035    ///
1036    /// A return value of `Poll::Ready(false)` indicates that the pipe is
1037    /// closed.
1038    ///
1039    /// If no space is available, this method will return `Poll::Pending` and
1040    /// register the waker to receive a notification when space becomes
1041    /// available or the read end is closed.
1042    ///
1043    /// # Example
1044    ///
1045    /// ```
1046    /// use futures_lite::{future, prelude::*};
1047    ///
1048    /// # future::block_on(async {
1049    /// let (mut r, mut w) = piper::pipe(1024);
1050    ///
1051    /// future::poll_fn(|cx| w.poll(cx)).await;
1052    /// let data = b"hello world";
1053    /// let mut remaining = &data[..];
1054    ///
1055    /// while !remaining.is_empty() {
1056    ///     let buf = w.write_buf(remaining.len());
1057    ///     let n = buf.len();
1058    ///     buf[..n].copy_from_slice(&remaining[..n]);
1059    ///     w.produced(n);
1060    ///     remaining = &remaining[n..];
1061    /// }
1062    ///
1063    /// // Read the bytes back.
1064    /// let mut buf = [0; 64];
1065    /// r.read_exact(&mut buf[..data.len()]).await.unwrap();
1066    /// assert_eq!(&buf[..data.len()], b"hello world");
1067    /// # });
1068    /// ```
1069    pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll<bool> {
1070        self.poll_inner(Some(cx))
1071    }
1072
1073    /// Reads bytes from blocking `src` and writes into this writer.
1074    #[inline]
1075    fn fill_inner<R: ReadLike>(
1076        &mut self,
1077        mut cx: Option<&mut Context<'_>>,
1078        mut src: R,
1079    ) -> Poll<Result<usize, R::Error>> {
1080        if !ready!(self.poll_inner(cx.as_mut().map(|c| &mut **c))) {
1081            return Poll::Ready(Ok(0));
1082        }
1083
1084        // Yield with some small probability - this improves fairness.
1085        if let Some(cx) = cx {
1086            ready!(maybe_yield(&mut self.rng, cx));
1087        }
1088
1089        // Number of bytes written so far.
1090        let mut count = 0;
1091
1092        loop {
1093            // Not too many bytes in one go - better to wake the reader soon!
1094            let pipe_slice_mut = self.write_buf(128 * 1024);
1095
1096            // Copy bytes from `src` into the piper buffer.
1097            let n = src.read(pipe_slice_mut)?;
1098            count += n;
1099
1100            // If the pipe is full or closed, or `src` is empty, return.
1101            if n == 0 || self.inner.closed.load(Ordering::Relaxed) {
1102                return Poll::Ready(Ok(count));
1103            }
1104
1105            self.produced(n);
1106        }
1107    }
1108
1109    /// Get a mutable slice of the pipe's internal buffer that can be written to.
1110    ///
1111    /// The contents of the slice are initialized but unspecified and you should
1112    /// not read from it or assume its contents are zeroed.
1113    ///
1114    /// The `max` parameter is an upper bound on the size of the slice returned,
1115    /// limiting the number of bytes that will be initialized when using the
1116    /// buffer for the first time.
1117    ///
1118    /// After writing to the buffer, you should call [`produced(n)`] to notify the
1119    /// pipe that `n` bytes have been written to the buffer and make them available
1120    /// to the reader.
1121    pub fn write_buf(&mut self, max: usize) -> &mut [u8] {
1122        let n = max
1123            .min(self.zeroed_until * 2 + 4096) // Don't zero too many bytes when starting.
1124            .min(self.available_space()) // No more than space in the pipe.
1125            .min(self.inner.cap - self.inner.real_index(self.tail)); // Don't go past the buffer boundary.
1126
1127        // Create a slice of available space in the pipe buffer.
1128        unsafe {
1129            let from = self.inner.real_index(self.tail);
1130            let to = from + n;
1131
1132            // Make sure all bytes in the slice are initialized.
1133            if self.zeroed_until < to {
1134                self.inner
1135                    .buffer
1136                    .add(self.zeroed_until)
1137                    .write_bytes(0u8, to - self.zeroed_until);
1138                self.zeroed_until = to;
1139            }
1140
1141            slice::from_raw_parts_mut(self.inner.buffer.add(from), n)
1142        }
1143    }
1144
1145    /// Notify the pipe that `n` bytes have been written to the buffer returned by `write_buf()`.
1146    ///
1147    /// ## Panics
1148    ///   * if `n` is greater than the size of the buffer returned by `write_buf()`.
1149    pub fn produced(&mut self, n: usize) {
1150        assert!(
1151            n <= self.available_space()
1152                && (self.zeroed_until == self.inner.cap || self.tail + n <= self.zeroed_until)
1153                && self.tail + n <= 2 * self.inner.cap,
1154            "cannot write more bytes than available space"
1155        );
1156
1157        // Move the tail forward.
1158        if self.tail + n < 2 * self.inner.cap {
1159            self.tail += n;
1160        } else {
1161            self.tail = 0;
1162        }
1163
1164        // Store the current tail index.
1165        self.inner.tail.store(self.tail, Ordering::Release);
1166
1167        // Wake the reader because the pipe is not empty.
1168        self.inner.reader.wake();
1169    }
1170}
1171
1172#[cfg(feature = "std")]
1173impl AsyncWrite for Writer {
1174    fn poll_write(
1175        mut self: Pin<&mut Self>,
1176        cx: &mut Context<'_>,
1177        buf: &[u8],
1178    ) -> Poll<io::Result<usize>> {
1179        self.poll_fill_bytes(cx, buf).map(Ok)
1180    }
1181
1182    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1183        // Nothing to flush.
1184        Poll::Ready(Ok(()))
1185    }
1186
1187    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1188        // Set the closed flag.
1189        self.inner.closed.store(true, Ordering::Release);
1190
1191        // Wake up any tasks that may be waiting on the pipe.
1192        self.inner.reader.wake();
1193        self.inner.writer.wake();
1194
1195        // The pipe is now closed.
1196        Poll::Ready(Ok(()))
1197    }
1198}
1199
1200/// A trait for reading bytes into a pipe.
1201trait ReadLike {
1202    /// The error type.
1203    type Error;
1204
1205    /// Reads bytes into the given buffer.
1206    fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error>;
1207}
1208
1209#[cfg(feature = "std")]
1210impl<R: Read> ReadLike for R {
1211    type Error = io::Error;
1212
1213    fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
1214        Read::read(self, buf)
1215    }
1216}
1217
1218/// Implements `no_std` reading around a byte slice.
1219struct ReadBytes<'a>(&'a [u8]);
1220
1221impl ReadLike for ReadBytes<'_> {
1222    type Error = Infallible;
1223
1224    fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
1225        let n = self.0.len().min(buf.len());
1226        buf[..n].copy_from_slice(&self.0[..n]);
1227        self.0 = &self.0[n..];
1228        Ok(n)
1229    }
1230}
1231
1232/// A trait for writing bytes from a pipe.
1233trait WriteLike {
1234    /// The error type.
1235    type Error;
1236
1237    /// Writes bytes from the given buffer.
1238    fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error>;
1239}
1240
1241#[cfg(feature = "std")]
1242impl<W: Write> WriteLike for W {
1243    type Error = io::Error;
1244
1245    fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
1246        Write::write(self, buf)
1247    }
1248}
1249
1250/// Implements `no_std` writing around a byte slice.
1251struct WriteBytes<'a>(&'a mut [u8]);
1252
1253impl WriteLike for WriteBytes<'_> {
1254    type Error = Infallible;
1255
1256    fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
1257        let n = self.0.len().min(buf.len());
1258        self.0[..n].copy_from_slice(&buf[..n]);
1259
1260        // mem::take() is not available on 1.36
1261        #[allow(clippy::mem_replace_with_default)]
1262        {
1263            let slice = mem::replace(&mut self.0, &mut []);
1264            self.0 = &mut slice[n..];
1265        }
1266
1267        Ok(n)
1268    }
1269}
1270
1271/// Yield with some small probability.
1272fn maybe_yield(rng: &mut fastrand::Rng, cx: &mut Context<'_>) -> Poll<()> {
1273    if rng.usize(..100) == 0 {
1274        cx.waker().wake_by_ref();
1275        Poll::Pending
1276    } else {
1277        Poll::Ready(())
1278    }
1279}
1280
1281/// Get a random number generator.
1282#[cfg(feature = "std")]
1283#[inline]
1284fn rng() -> fastrand::Rng {
1285    fastrand::Rng::new()
1286}
1287
1288/// Get a random number generator.
1289///
1290/// This uses a fixed seed due to the lack of a good RNG in `no_std` environments.
1291#[cfg(not(feature = "std"))]
1292#[inline]
1293fn rng() -> fastrand::Rng {
1294    // Chosen by fair roll of the dice.
1295    fastrand::Rng::with_seed(0x7e9b496634c97ec6)
1296}
1297
1298/// ```
1299/// use piper::{Reader, Writer};
1300/// fn _send_sync<T: Send + Sync>() {}
1301/// _send_sync::<Reader>();
1302/// _send_sync::<Writer>();
1303/// ```
1304fn _assert_send_sync() {}
1305
1306mod sync {
1307    #[cfg(not(feature = "portable-atomic"))]
1308    pub use core::sync::atomic;
1309
1310    #[cfg(not(feature = "portable-atomic"))]
1311    pub use alloc::sync::Arc;
1312
1313    #[cfg(feature = "portable-atomic")]
1314    pub use portable_atomic_crate as atomic;
1315
1316    #[cfg(feature = "portable-atomic")]
1317    pub use portable_atomic_util::Arc;
1318}