piper/lib.rs
1//! A bounded single-producer single-consumer pipe.
2//!
3//! This crate provides a ring buffer that can be asynchronously read from and written to. It is
4//! created via the [`pipe`] function, which returns a pair of [`Reader`] and [`Writer`] handles.
5//! They implement the [`AsyncRead`] and [`AsyncWrite`] traits, respectively.
6//!
7//! The handles are single-producer/single-consumer; to clarify, they cannot be cloned and need `&mut`
8//! access to read or write to them. If multiple-producer/multiple-consumer handles are needed,
9//! consider wrapping them in an `Arc<Mutex<...>>` or similar.
10//!
11//! When the sender is dropped, remaining bytes in the pipe can still be read. After that, attempts
12//! to read will result in `Ok(0)`, i.e. they will always 'successfully' read 0 bytes.
13//!
14//! When the receiver is dropped, the pipe is closed and no more bytes can be written into it.
15//! Further writes will result in `Ok(0)`, i.e. they will always 'successfully' write 0 bytes.
16//!
17//! # Version 0.2.0 Notes
18//!
19//! Previously, this crate contained other synchronization primitives, such as bounded channels, locks,
20//! and event listeners. These have been split out into their own crates:
21//!
22//! - [`async-channel`](https://docs.rs/async-channel)
23//! - [`async-dup`](https://docs.rs/async-dup)
24//! - [`async-lock`](https://docs.rs/async-lock)
25//! - [`async-mutex`](https://docs.rs/async-mutex)
26//! - [`event-listener`](https://docs.rs/event-listener)
27//!
28//! # Examples
29//!
30//! ## Asynchronous Tasks
31//!
32//! Communicate between asynchronous tasks, potentially on other threads.
33//!
34//! ```
35//! use async_channel::unbounded;
36//! use async_executor::Executor;
37//! use easy_parallel::Parallel;
38//! use futures_lite::{future, prelude::*};
39//! use std::time::Duration;
40//!
41//! # if cfg!(miri) { return; }
42//!
43//! // Create a pair of handles.
44//! let (mut reader, mut writer) = piper::pipe(1024);
45//!
46//! // Create the executor.
47//! let ex = Executor::new();
48//! let (signal, shutdown) = unbounded::<()>();
49//!
50//! // Spawn a detached task for random data to the pipe.
51//! let writer = ex.spawn(async move {
52//! for _ in 0..1_000 {
53//! // Generate 8 random numnbers.
54//! let random = fastrand::u64(..).to_le_bytes();
55//!
56//! // Write them to the pipe.
57//! writer.write_all(&random).await.unwrap();
58//!
59//! // Wait a bit.
60//! async_io::Timer::after(Duration::from_millis(5)).await;
61//! }
62//!
63//! // Drop the writer to close the pipe.
64//! drop(writer);
65//! });
66//!
67//! // Detach the task so that it runs in the background.
68//! writer.detach();
69//!
70//! // Spawn a task for reading from the pipe.
71//! let reader = ex.spawn(async move {
72//! let mut buf = vec![];
73//!
74//! // Read all bytes from the pipe.
75//! reader.read_to_end(&mut buf).await.unwrap();
76//!
77//! println!("Random data: {:#?}", buf);
78//! });
79//!
80//! Parallel::new()
81//! // Run four executor threads.
82//! .each(0..4, |_| future::block_on(ex.run(shutdown.recv())))
83//! // Run the main future on the current thread.
84//! .finish(|| future::block_on(async {
85//! // Wait for the reader to finish.
86//! reader.await;
87//!
88//! // Signal the executor threads to shut down.
89//! drop(signal);
90//! }));
91//! ```
92//!
93//! ## Blocking I/O
94//!
95//! File I/O is blocking; therefore, in `async` code, you must run it on another thread. This example
96//! spawns another thread for reading a file and writing it to a pipe.
97//!
98//! ```no_run
99//! use futures_lite::{future, prelude::*};
100//! use std::fs::File;
101//! use std::io::prelude::*;
102//! use std::thread;
103//!
104//! // Create a pair of handles.
105//! let (mut r, mut w) = piper::pipe(1024);
106//!
107//! // Spawn a thread for reading a file.
108//! thread::spawn(move || {
109//! let mut file = File::open("Cargo.toml").unwrap();
110//!
111//! // Read the file into a buffer.
112//! let mut buf = [0u8; 16384];
113//! future::block_on(async move {
114//! loop {
115//! // Read a chunk of bytes from the file.
116//! // Blocking is okay here, since this is a separate thread.
117//! let n = file.read(&mut buf).unwrap();
118//! if n == 0 {
119//! break;
120//! }
121//!
122//! // Write the chunk to the pipe.
123//! w.write_all(&buf[..n]).await.unwrap();
124//! }
125//!
126//! // Close the pipe.
127//! drop(w);
128//! });
129//! });
130//!
131//! # future::block_on(async move {
132//! // Read bytes from the pipe.
133//! let mut buf = vec![];
134//! r.read_to_end(&mut buf).await.unwrap();
135//!
136//! println!("Read {} bytes", buf.len());
137//! # });
138//! ```
139//!
140//! However, the lower-level [`poll_fill`] and [`poll_drain`] methods take `impl Read` and `impl Write`
141//! arguments, respectively. This allows you to skip the buffer entirely and read/write directly from
142//! the file into the pipe. This approach should be preferred when possible, as it avoids an extra
143//! copy.
144//!
145//! ```no_run
146//! # use futures_lite::future;
147//! # use std::fs::File;
148//! # let mut file: File = unimplemented!();
149//! # let mut w: piper::Writer = unimplemented!();
150//! // In the `future::block_on` call above...
151//! # future::block_on(async move {
152//! loop {
153//! let n = future::poll_fn(|cx| w.poll_fill(cx, &mut file)).await.unwrap();
154//! if n == 0 {
155//! break;
156//! }
157//! }
158//! # });
159//! ```
160//!
161//! The [`blocking`] crate is preferred in this use case, since it uses more efficient strategies for
162//! thread management and pipes.
163//!
164//! [`poll_fill`]: struct.Writer.html#method.poll_fill
165//! [`poll_drain`]: struct.Reader.html#method.poll_drain
166//! [`blocking`]: https://docs.rs/blocking
167
168#![cfg_attr(not(feature = "std"), no_std)]
169#![forbid(missing_docs)]
170#![doc(
171 html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
172)]
173#![doc(
174 html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
175)]
176
177extern crate alloc;
178
179use core::convert::Infallible;
180use core::mem;
181use core::slice;
182use core::task::{Context, Poll};
183
184use alloc::vec::Vec;
185
186use sync::atomic::{self, AtomicBool, AtomicUsize, Ordering};
187use sync::Arc;
188
189#[cfg(feature = "std")]
190use std::{
191 io::{self, Read, Write},
192 pin::Pin,
193};
194
195use atomic_waker::AtomicWaker;
196
197#[cfg(feature = "std")]
198use futures_io::{AsyncBufRead, AsyncRead, AsyncWrite};
199
200macro_rules! ready {
201 ($e:expr) => {{
202 match $e {
203 Poll::Ready(t) => t,
204 Poll::Pending => return Poll::Pending,
205 }
206 }};
207}
208
209/// Creates a bounded single-producer single-consumer pipe.
210///
211/// A pipe is a ring buffer of `cap` bytes that can be asynchronously read from and written to.
212///
213/// See the [crate-level documentation](index.html) for more details.
214///
215/// # Panics
216///
217/// This function panics if `cap` is 0 or if `cap * 2` overflows a `usize`.
218#[allow(clippy::incompatible_msrv)] // false positive: https://github.com/rust-lang/rust-clippy/issues/12280
219pub fn pipe(cap: usize) -> (Reader, Writer) {
220 assert!(cap > 0, "capacity must be positive");
221 assert!(cap.checked_mul(2).is_some(), "capacity is too large");
222
223 // Allocate the ring buffer.
224 let mut v = Vec::with_capacity(cap);
225 let buffer = v.as_mut_ptr();
226 mem::forget(v);
227
228 let inner = Arc::new(Pipe {
229 head: AtomicUsize::new(0),
230 tail: AtomicUsize::new(0),
231 reader: AtomicWaker::new(),
232 writer: AtomicWaker::new(),
233 closed: AtomicBool::new(false),
234 buffer,
235 cap,
236 });
237
238 // Use a random number generator to randomize fair yielding behavior.
239 let mut rng = rng();
240
241 let r = Reader {
242 inner: inner.clone(),
243 head: 0,
244 tail: 0,
245 rng: rng.fork(),
246 };
247
248 let w = Writer {
249 inner,
250 head: 0,
251 tail: 0,
252 zeroed_until: 0,
253 rng,
254 };
255
256 (r, w)
257}
258
259/// The reading side of a pipe.
260///
261/// This type is created by the [`pipe`] function. See its documentation for more details.
262pub struct Reader {
263 /// The inner ring buffer.
264 inner: Arc<Pipe>,
265
266 /// The head index, moved by the reader, in the range `0..2*cap`.
267 ///
268 /// This index always matches `inner.head`.
269 head: usize,
270
271 /// The tail index, moved by the writer, in the range `0..2*cap`.
272 ///
273 /// This index is a snapshot of `index.tail` that might become stale at any point.
274 tail: usize,
275
276 /// Random number generator.
277 rng: fastrand::Rng,
278}
279
280/// The writing side of a pipe.
281///
282/// This type is created by the [`pipe`] function. See its documentation for more details.
283pub struct Writer {
284 /// The inner ring buffer.
285 inner: Arc<Pipe>,
286
287 /// The head index, moved by the reader, in the range `0..2*cap`.
288 ///
289 /// This index is a snapshot of `index.head` that might become stale at any point.
290 head: usize,
291
292 /// The tail index, moved by the writer, in the range `0..2*cap`.
293 ///
294 /// This index always matches `inner.tail`.
295 tail: usize,
296
297 /// How many bytes at the beginning of the buffer have been zeroed.
298 ///
299 /// The pipe allocates an uninitialized buffer, and we must be careful about passing
300 /// uninitialized data to user code. Zeroing the buffer right after allocation would be too
301 /// expensive, so we zero it in smaller chunks as the writer makes progress.
302 zeroed_until: usize,
303
304 /// Random number generator.
305 rng: fastrand::Rng,
306}
307
308/// The inner ring buffer.
309///
310/// Head and tail indices are in the range `0..2*cap`, even though they really map onto the
311/// `0..cap` range. The distance between head and tail indices is never more than `cap`.
312///
313/// The reason why indices are not in the range `0..cap` is because we need to distinguish between
314/// the pipe being empty and being full. If head and tail were in `0..cap`, then `head == tail`
315/// could mean the pipe is either empty or full, but we don't know which!
316struct Pipe {
317 /// The head index, moved by the reader, in the range `0..2*cap`.
318 head: AtomicUsize,
319
320 /// The tail index, moved by the writer, in the range `0..2*cap`.
321 tail: AtomicUsize,
322
323 /// A waker representing the blocked reader.
324 reader: AtomicWaker,
325
326 /// A waker representing the blocked writer.
327 writer: AtomicWaker,
328
329 /// Set to `true` if the reader or writer was dropped.
330 closed: AtomicBool,
331
332 /// The byte buffer.
333 buffer: *mut u8,
334
335 /// The buffer capacity.
336 cap: usize,
337}
338
339unsafe impl Sync for Pipe {}
340unsafe impl Send for Pipe {}
341
342impl Drop for Pipe {
343 fn drop(&mut self) {
344 // Deallocate the byte buffer.
345 unsafe {
346 Vec::from_raw_parts(self.buffer, 0, self.cap);
347 }
348 }
349}
350
351impl Drop for Reader {
352 fn drop(&mut self) {
353 // Dropping closes the pipe and then wakes the writer.
354 self.inner.closed.store(true, Ordering::SeqCst);
355 self.inner.writer.wake();
356 }
357}
358
359impl Drop for Writer {
360 fn drop(&mut self) {
361 // Dropping closes the pipe and then wakes the reader.
362 self.inner.closed.store(true, Ordering::SeqCst);
363 self.inner.reader.wake();
364 }
365}
366
367impl Pipe {
368 /// Get the length of the data in the pipe.
369 fn len(&self) -> usize {
370 let head = self.head.load(Ordering::Acquire);
371 let tail = self.tail.load(Ordering::Acquire);
372
373 if head <= tail {
374 tail - head
375 } else {
376 (2 * self.cap) - (head - tail)
377 }
378 }
379
380 /// Given an index in `0..2*cap`, returns the real index in `0..cap`.
381 #[inline]
382 fn real_index(&self, i: usize) -> usize {
383 if i < self.cap {
384 i
385 } else {
386 i - self.cap
387 }
388 }
389}
390
391impl Reader {
392 /// Gets the total length of the data in the pipe.
393 ///
394 /// This method returns the number of bytes that have been written into the pipe but haven't been
395 /// read yet.
396 ///
397 /// # Examples
398 ///
399 /// ```
400 /// let (mut reader, mut writer) = piper::pipe(10);
401 /// let _ = writer.try_fill(&[0u8; 5]);
402 /// assert_eq!(reader.len(), 5);
403 /// ```
404 pub fn len(&self) -> usize {
405 self.inner.len()
406 }
407
408 /// Tell whether or not the pipe is empty.
409 ///
410 /// This method returns `true` if the pipe is empty, and `false` otherwise.
411 ///
412 /// # Examples
413 ///
414 /// ```
415 /// let (mut reader, mut writer) = piper::pipe(10);
416 /// assert!(reader.is_empty());
417 /// let _ = writer.try_fill(&[0u8; 5]);
418 /// assert!(!reader.is_empty());
419 /// ```
420 pub fn is_empty(&self) -> bool {
421 self.inner.len() == 0
422 }
423
424 /// Gets the total capacity of the pipe.
425 ///
426 /// This method returns the number of bytes that the pipe can hold at a time.
427 ///
428 /// # Examples
429 ///
430 /// ```
431 /// # futures_lite::future::block_on(async {
432 /// let (reader, _) = piper::pipe(10);
433 /// assert_eq!(reader.capacity(), 10);
434 /// # });
435 /// ```
436 pub fn capacity(&self) -> usize {
437 self.inner.cap
438 }
439
440 /// Tell whether or not the pipe is full.
441 ///
442 /// The pipe is full if the number of bytes written into it is equal to its capacity. At this point,
443 /// writes will block until some data is read from the pipe.
444 ///
445 /// This method returns `true` if the pipe is full, and `false` otherwise.
446 ///
447 /// # Examples
448 ///
449 /// ```
450 /// let (mut reader, mut writer) = piper::pipe(10);
451 /// assert!(!reader.is_full());
452 /// let _ = writer.try_fill(&[0u8; 10]);
453 /// assert!(reader.is_full());
454 /// let _ = reader.try_drain(&mut [0u8; 5]);
455 /// assert!(!reader.is_full());
456 /// ```
457 pub fn is_full(&self) -> bool {
458 self.inner.len() == self.inner.cap
459 }
460
461 /// Tell whether or not the pipe is closed.
462 ///
463 /// The pipe is closed if either the reader or the writer has been dropped. At this point, attempting
464 /// to write into the pipe will return `Poll::Ready(Ok(0))` and attempting to read from the pipe after
465 /// any previously written bytes are read will return `Poll::Ready(Ok(0))`.
466 ///
467 /// # Examples
468 ///
469 /// ```
470 /// # futures_lite::future::block_on(async {
471 /// let (mut reader, mut writer) = piper::pipe(10);
472 /// assert!(!reader.is_closed());
473 /// drop(writer);
474 /// assert!(reader.is_closed());
475 /// # });
476 /// ```
477 pub fn is_closed(&self) -> bool {
478 self.inner.closed.load(Ordering::SeqCst)
479 }
480
481 /// Reads bytes from this reader and writes into blocking `dest`.
482 ///
483 /// This method reads directly from the pipe's internal buffer into `dest`. This avoids an extra copy,
484 /// but it may block the thread if `dest` blocks.
485 ///
486 /// If the pipe is empty, this method returns `Poll::Pending`. If the pipe is closed, this method
487 /// returns `Poll::Ready(Ok(0))`. Errors in `dest` are bubbled up through `Poll::Ready(Err(e))`.
488 /// Otherwise, this method returns `Poll::Ready(Ok(n))` where `n` is the number of bytes written.
489 ///
490 /// This method is only available when the `std` feature is enabled. For `no_std` environments,
491 /// consider using [`poll_drain_bytes`] instead.
492 ///
493 /// [`poll_drain_bytes`]: #method.poll_drain_bytes
494 ///
495 /// # Examples
496 ///
497 /// ```
498 /// use futures_lite::{future, prelude::*};
499 /// # future::block_on(async {
500 ///
501 /// let (mut r, mut w) = piper::pipe(1024);
502 ///
503 /// // Write some data to the pipe.
504 /// w.write_all(b"hello world").await.unwrap();
505 ///
506 /// // Try reading from the pipe.
507 /// let mut buf = [0; 1024];
508 /// let n = future::poll_fn(|cx| r.poll_drain(cx, &mut buf[..])).await.unwrap();
509 ///
510 /// // The data was written to the buffer.
511 /// assert_eq!(&buf[..n], b"hello world");
512 /// # });
513 /// ```
514 #[cfg(feature = "std")]
515 pub fn poll_drain(
516 &mut self,
517 cx: &mut Context<'_>,
518 dest: impl Write,
519 ) -> Poll<io::Result<usize>> {
520 self.drain_inner(Some(cx), dest)
521 }
522
523 /// Reads bytes from this reader.
524 ///
525 /// Rather than taking a `Write` trait object, this method takes a slice of bytes to write into.
526 /// Because of this, it is infallible and can be used in `no_std` environments.
527 ///
528 /// The same conditions that apply to [`poll_drain`] apply to this method.
529 ///
530 /// [`poll_drain`]: #method.poll_drain
531 ///
532 /// # Examples
533 ///
534 /// ```
535 /// use futures_lite::{future, prelude::*};
536 /// # future::block_on(async {
537 /// let (mut r, mut w) = piper::pipe(1024);
538 ///
539 /// // Write some data to the pipe.
540 /// w.write_all(b"hello world").await.unwrap();
541 ///
542 /// // Try reading from the pipe.
543 /// let mut buf = [0; 1024];
544 /// let n = future::poll_fn(|cx| r.poll_drain_bytes(cx, &mut buf[..])).await;
545 ///
546 /// // The data was written to the buffer.
547 /// assert_eq!(&buf[..n], b"hello world");
548 /// # });
549 /// ```
550 pub fn poll_drain_bytes(&mut self, cx: &mut Context<'_>, dest: &mut [u8]) -> Poll<usize> {
551 match self.drain_inner(Some(cx), WriteBytes(dest)) {
552 Poll::Ready(Ok(n)) => Poll::Ready(n),
553 Poll::Ready(Err(e)) => match e {},
554 Poll::Pending => Poll::Pending,
555 }
556 }
557
558 /// Poll for data to become available in the pipe or the write side to be closed.
559 ///
560 /// Returns `Poll::Ready(true)` when data is ready. Call
561 /// [`peek_buf()`][Self::peek_buf] to access the data, and
562 /// [`consume()`][Self::consume] to advance the read position.
563 ///
564 /// A return value of `Poll::Ready(false)` indicates that the pipe is closed.
565 ///
566 /// If no data is available, this method will return `Poll::Pending` and register the waker
567 /// to receive a notification when data is written to the pipe or the write end is closed.
568 ///
569 /// Unlike `AsyncBufRead::poll_fill_buf` method, this method is infallible and does not
570 /// require the `std` feature. It separates the polling, buffer access, and consume steps
571 /// for compatibility with `poll_fn`'s lifetime requirements.
572 ///
573 /// # Example
574 ///
575 /// ```
576 /// use futures_lite::{future, prelude::*};
577 ///
578 /// # future::block_on(async {
579 /// let (mut r, mut w) = piper::pipe(1024);
580 ///
581 /// // Write some data to the pipe.
582 /// w.write_all(b"hello world").await.unwrap();
583 ///
584 /// future::poll_fn(|cx| r.poll(cx)).await;
585 /// let buf = r.peek_buf();
586 /// assert_eq!(buf, &b"hello world"[..buf.len()]);
587 ///
588 /// // Consume one byte
589 /// r.consume(1);
590 ///
591 /// future::poll_fn(|cx| r.poll(cx)).await;
592 /// let buf = r.peek_buf();
593 /// assert_eq!(buf, &b"ello world"[..buf.len()]);
594 ///
595 /// r.consume(buf.len());
596 /// # });
597 /// ```
598 pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll<bool> {
599 self.poll_available(Some(cx))
600 }
601
602 /// Return the contents of the internal buffer that are available immediately.
603 ///
604 /// Call [`Self::consume()`] to consume the bytes returned by this method. The buffer might
605 /// not re-fill until another call to [`Self::poll()`] returns `Poll::Ready`.
606 pub fn peek_buf(&self) -> &[u8] {
607 let n = self
608 .available_data() // No more than bytes in the pipe.
609 .min(self.inner.cap - self.inner.real_index(self.head)); // Don't go past the buffer boundary.
610
611 unsafe { slice::from_raw_parts(self.inner.buffer.add(self.inner.real_index(self.head)), n) }
612 }
613
614 /// Consume `amt` bytes from the pipe.
615 ///
616 /// Panics if `amt` is greater than the length of the buffer returned by [`Self::peek_buf()`].
617 pub fn consume(&mut self, amt: usize) {
618 let cap = self.inner.cap;
619
620 assert!(
621 amt <= self.available_data() && self.head + amt <= 2 * cap,
622 "cannot consume more bytes than available in the pipe"
623 );
624
625 // Move the head forward.
626 if self.head + amt < 2 * cap {
627 self.head += amt;
628 } else {
629 self.head = 0;
630 }
631
632 // Store the current head index.
633 self.inner.head.store(self.head, Ordering::Release);
634
635 // Wake the writer because the pipe is not full.
636 self.inner.writer.wake();
637 }
638
639 /// Tries to read bytes from this reader.
640 ///
641 /// Returns the total number of bytes that were read from this reader.
642 ///
643 /// # Examples
644 ///
645 /// ```
646 /// let (mut r, mut w) = piper::pipe(1024);
647 ///
648 /// // `try_drain()` returns 0 off the bat.
649 /// let mut buf = [0; 10];
650 /// assert_eq!(r.try_drain(&mut buf), 0);
651 ///
652 /// // After a write it returns the data.
653 /// w.try_fill(&[0, 1, 2, 3, 4]);
654 /// assert_eq!(r.try_drain(&mut buf), 5);
655 /// assert_eq!(&buf[..5], &[0, 1, 2, 3, 4]);
656 /// ```
657 pub fn try_drain(&mut self, dest: &mut [u8]) -> usize {
658 match self.drain_inner(None, WriteBytes(dest)) {
659 Poll::Ready(Ok(n)) => n,
660 Poll::Ready(Err(e)) => match e {},
661 Poll::Pending => 0,
662 }
663 }
664
665 /// Get the number of bytes available to read without synchronization.
666 #[inline]
667 fn available_data(&self) -> usize {
668 let a = self.head;
669 let b = self.tail;
670 if a <= b {
671 b - a
672 } else {
673 2 * self.inner.cap - (a - b)
674 }
675 }
676
677 /// Poll for available data or end of stream.
678 ///
679 /// Returns `Poll::Ready(true)` if data is available, `Poll::Ready(false)` if the pipe is closed,
680 /// or `Poll::Pending` if the pipe is empty and the waker has been registered.
681 fn poll_available(&mut self, mut cx: Option<&mut Context<'_>>) -> Poll<bool> {
682 // If the pipe appears to be empty...
683 if self.available_data() == 0 {
684 // Reload the tail in case it's become stale.
685 self.tail = self.inner.tail.load(Ordering::Acquire);
686
687 // If the pipe is now really empty...
688 if self.available_data() == 0 {
689 // Register the waker.
690 if let Some(cx) = cx.as_mut() {
691 self.inner.reader.register(cx.waker());
692 }
693 atomic::fence(Ordering::SeqCst);
694
695 // Load whether the channel is closed or not early, so that we don't miss any writes
696 // between updating the tail and checking for close.
697 let is_closed = self.inner.closed.load(Ordering::Acquire);
698
699 // Reload the tail after registering the waker.
700 self.tail = self.inner.tail.load(Ordering::Acquire);
701
702 // If the pipe is still empty...
703 if self.available_data() == 0 {
704 // Check whether the pipe is closed or just empty.
705 if is_closed {
706 return Poll::Ready(false);
707 } else {
708 return Poll::Pending;
709 }
710 }
711 }
712 }
713
714 // The pipe is not empty so remove the waker.
715 self.inner.reader.take();
716 Poll::Ready(true)
717 }
718
719 /// Reads bytes from this reader and writes into blocking `dest`.
720 #[inline]
721 fn drain_inner<W: WriteLike>(
722 &mut self,
723 mut cx: Option<&mut Context<'_>>,
724 mut dest: W,
725 ) -> Poll<Result<usize, W::Error>> {
726 if !ready!(self.poll_available(cx.as_mut().map(|c| &mut **c))) {
727 // The pipe is closed
728 return Poll::Ready(Ok(0));
729 }
730
731 // Yield with some small probability - this improves fairness.
732 if let Some(cx) = cx {
733 ready!(maybe_yield(&mut self.rng, cx));
734 }
735
736 // Number of bytes read so far.
737 let mut count = 0;
738
739 loop {
740 // Create a slice of data in the pipe buffer.
741 let pipe_slice = self.peek_buf();
742
743 // Not too many bytes in one go - better to wake the writer soon!
744 let pipe_slice = &pipe_slice[..pipe_slice.len().min(128 * 1024)];
745
746 // Copy bytes from the pipe buffer into `dest`.
747 let n = dest.write(pipe_slice)?;
748 count += n;
749
750 // If pipe is empty or `dest` is full, return.
751 if n == 0 {
752 return Poll::Ready(Ok(count));
753 }
754
755 self.consume(n);
756 }
757 }
758}
759
760#[cfg(feature = "std")]
761impl AsyncRead for Reader {
762 fn poll_read(
763 mut self: Pin<&mut Self>,
764 cx: &mut Context<'_>,
765 buf: &mut [u8],
766 ) -> Poll<io::Result<usize>> {
767 self.poll_drain_bytes(cx, buf).map(Ok)
768 }
769}
770
771#[cfg(feature = "std")]
772impl AsyncBufRead for Reader {
773 fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
774 ready!(self.poll(cx));
775 // SAFETY: Reader is `Unpin` (equivalent to the safe `Pin::into_inner` but it needs a higher MSRV)
776 let this = unsafe { self.get_unchecked_mut() };
777 Poll::Ready(Ok(this.peek_buf()))
778 }
779
780 fn consume(mut self: Pin<&mut Self>, amt: usize) {
781 (*self).consume(amt)
782 }
783}
784
785impl Writer {
786 /// Gets the total length of the data in the pipe.
787 ///
788 /// This method returns the number of bytes that have been written into the pipe but haven't been
789 /// read yet.
790 ///
791 /// # Examples
792 ///
793 /// ```
794 /// let (_reader, mut writer) = piper::pipe(10);
795 /// let _ = writer.try_fill(&[0u8; 5]);
796 /// assert_eq!(writer.len(), 5);
797 /// ```
798 pub fn len(&self) -> usize {
799 self.inner.len()
800 }
801
802 /// Tell whether or not the pipe is empty.
803 ///
804 /// This method returns `true` if the pipe is empty, and `false` otherwise.
805 ///
806 /// # Examples
807 ///
808 /// ```
809 /// let (_reader, mut writer) = piper::pipe(10);
810 /// assert!(writer.is_empty());
811 /// let _ = writer.try_fill(&[0u8; 5]);
812 /// assert!(!writer.is_empty());
813 /// ```
814 pub fn is_empty(&self) -> bool {
815 self.inner.len() == 0
816 }
817
818 /// Gets the total capacity of the pipe.
819 ///
820 /// This method returns the number of bytes that the pipe can hold at a time.
821 ///
822 /// # Examples
823 ///
824 /// ```
825 /// # futures_lite::future::block_on(async {
826 /// let (_, writer) = piper::pipe(10);
827 /// assert_eq!(writer.capacity(), 10);
828 /// # });
829 /// ```
830 pub fn capacity(&self) -> usize {
831 self.inner.cap
832 }
833
834 /// Tell whether or not the pipe is full.
835 ///
836 /// The pipe is full if the number of bytes written into it is equal to its capacity. At this point,
837 /// writes will block until some data is read from the pipe.
838 ///
839 /// This method returns `true` if the pipe is full, and `false` otherwise.
840 ///
841 /// # Examples
842 ///
843 /// ```
844 /// let (mut reader, mut writer) = piper::pipe(10);
845 /// assert!(!writer.is_full());
846 /// let _ = writer.try_fill(&[0u8; 10]);
847 /// assert!(writer.is_full());
848 /// let _ = reader.try_drain(&mut [0u8; 5]);
849 /// assert!(!writer.is_full());
850 /// ```
851 pub fn is_full(&self) -> bool {
852 self.inner.len() == self.inner.cap
853 }
854
855 /// Tell whether or not the pipe is closed.
856 ///
857 /// The pipe is closed if either the reader or the writer has been dropped. At this point, attempting
858 /// to write into the pipe will return `Poll::Ready(Ok(0))` and attempting to read from the pipe after
859 /// any previously written bytes are read will return `Poll::Ready(Ok(0))`.
860 ///
861 /// # Examples
862 ///
863 /// ```
864 /// # futures_lite::future::block_on(async {
865 /// let (reader, writer) = piper::pipe(10);
866 /// assert!(!writer.is_closed());
867 /// drop(reader);
868 /// assert!(writer.is_closed());
869 /// # });
870 /// ```
871 pub fn is_closed(&self) -> bool {
872 self.inner.closed.load(Ordering::SeqCst)
873 }
874
875 /// Reads bytes from blocking `src` and writes into this writer.
876 ///
877 /// This method writes directly from `src` into the pipe's internal buffer. This avoids an extra copy,
878 /// but it may block the thread if `src` blocks.
879 ///
880 /// If the pipe is full, this method returns `Poll::Pending`. If the pipe is closed, this method
881 /// returns `Poll::Ready(Ok(0))`. Errors in `src` are bubbled up through `Poll::Ready(Err(e))`.
882 /// Otherwise, this method returns `Poll::Ready(Ok(n))` where `n` is the number of bytes read.
883 ///
884 /// This method is only available when the `std` feature is enabled. For `no_std` environments,
885 /// consider using [`poll_fill_bytes`] instead.
886 ///
887 /// [`poll_fill_bytes`]: #method.poll_fill_bytes
888 ///
889 /// # Examples
890 ///
891 /// ```
892 /// use futures_lite::{future, prelude::*};
893 /// # future::block_on(async {
894 ///
895 /// // Create a pipe.
896 /// let (mut reader, mut writer) = piper::pipe(1024);
897 ///
898 /// // Fill the pipe with some bytes.
899 /// let data = b"hello world";
900 /// let n = future::poll_fn(|cx| writer.poll_fill(cx, &data[..])).await.unwrap();
901 /// assert_eq!(n, data.len());
902 ///
903 /// // Read the bytes back.
904 /// let mut buf = [0; 1024];
905 /// reader.read_exact(&mut buf[..data.len()]).await.unwrap();
906 /// assert_eq!(&buf[..data.len()], data);
907 /// # });
908 /// ```
909 #[cfg(feature = "std")]
910 pub fn poll_fill(&mut self, cx: &mut Context<'_>, src: impl Read) -> Poll<io::Result<usize>> {
911 self.fill_inner(Some(cx), src)
912 }
913
914 /// Writes bytes into this writer.
915 ///
916 /// Rather than taking a `Read` trait object, this method takes a slice of bytes to read from.
917 /// Because of this, it is infallible and can be used in `no_std` environments.
918 ///
919 /// The same conditions that apply to [`poll_fill`] apply to this method.
920 ///
921 /// [`poll_fill`]: #method.poll_fill
922 ///
923 /// # Examples
924 ///
925 /// ```
926 /// use futures_lite::{future, prelude::*};
927 /// # future::block_on(async {
928 ///
929 /// // Create a pipe.
930 /// let (mut reader, mut writer) = piper::pipe(1024);
931 ///
932 /// // Fill the pipe with some bytes.
933 /// let data = b"hello world";
934 /// let n = future::poll_fn(|cx| writer.poll_fill_bytes(cx, &data[..])).await;
935 /// assert_eq!(n, data.len());
936 ///
937 /// // Read the bytes back.
938 /// let mut buf = [0; 1024];
939 /// reader.read_exact(&mut buf[..data.len()]).await.unwrap();
940 /// assert_eq!(&buf[..data.len()], data);
941 /// # });
942 /// ```
943 pub fn poll_fill_bytes(&mut self, cx: &mut Context<'_>, bytes: &[u8]) -> Poll<usize> {
944 match self.fill_inner(Some(cx), ReadBytes(bytes)) {
945 Poll::Ready(Ok(n)) => Poll::Ready(n),
946 Poll::Ready(Err(e)) => match e {},
947 Poll::Pending => Poll::Pending,
948 }
949 }
950
951 /// Tries to write bytes to this writer.
952 ///
953 /// Returns the total number of bytes that were read from this reader.
954 ///
955 /// # Examples
956 ///
957 /// ```
958 /// let (mut r, mut w) = piper::pipe(1024);
959 ///
960 /// let mut buf = [0; 10];
961 /// assert_eq!(w.try_fill(&[0, 1, 2, 3, 4]), 5);
962 /// assert_eq!(r.try_drain(&mut buf), 5);
963 /// assert_eq!(&buf[..5], &[0, 1, 2, 3, 4]);
964 /// ```
965 pub fn try_fill(&mut self, dest: &[u8]) -> usize {
966 match self.fill_inner(None, ReadBytes(dest)) {
967 Poll::Ready(Ok(n)) => n,
968 Poll::Ready(Err(e)) => match e {},
969 Poll::Pending => 0,
970 }
971 }
972
973 /// Get the free space in bytes that can be written without synchronization.
974 #[inline]
975 fn available_space(&self) -> usize {
976 let a = self.head;
977 let b = self.tail;
978 if a <= b {
979 self.inner.cap - (b - a)
980 } else {
981 (a - b) - self.inner.cap
982 }
983 }
984
985 /// Wait for available space in the pipe or the read side to be closed.
986 ///
987 /// Returns `Poll::Ready(true)` when space is available, or `Poll::Ready(false)` if the pipe is closed.
988 #[inline]
989 fn poll_inner(&mut self, cx: Option<&mut Context<'_>>) -> Poll<bool> {
990 // Just a quick check if the pipe is closed, which is why a relaxed load is okay.
991 if self.inner.closed.load(Ordering::Relaxed) {
992 return Poll::Ready(false);
993 }
994
995 // If the pipe appears to be full...
996 if self.available_space() == 0 {
997 // Reload the head in case it's become stale.
998 self.head = self.inner.head.load(Ordering::Acquire);
999
1000 // If the pipe is now really still full...
1001 if self.available_space() == 0 {
1002 // Register the waker.
1003 if let Some(cx) = cx {
1004 self.inner.writer.register(cx.waker());
1005 }
1006 atomic::fence(Ordering::SeqCst);
1007
1008 // Reload the head after registering the waker.
1009 self.head = self.inner.head.load(Ordering::Acquire);
1010
1011 // If the pipe is still full...
1012 if self.available_space() == 0 {
1013 // Check whether the pipe is closed or just full.
1014 if self.inner.closed.load(Ordering::Relaxed) {
1015 return Poll::Ready(false);
1016 } else {
1017 return Poll::Pending;
1018 }
1019 }
1020 }
1021 }
1022
1023 // The pipe is not full so remove the waker.
1024 self.inner.writer.take();
1025
1026 Poll::Ready(true)
1027 }
1028
1029 /// Poll for available space in the pipe or the read side to be closed.
1030 ///
1031 /// Returns `Poll::Ready(true)` when space is available to write. Call
1032 /// [`write_buf()`][Self::write_buf] to obtain a mutable slice of the buffer
1033 /// to write into, then call [`produced(n)`][Self::produced] once the data
1034 /// is written.
1035 ///
1036 /// A return value of `Poll::Ready(false)` indicates that the pipe is
1037 /// closed.
1038 ///
1039 /// If no space is available, this method will return `Poll::Pending` and
1040 /// register the waker to receive a notification when space becomes
1041 /// available or the read end is closed.
1042 ///
1043 /// # Example
1044 ///
1045 /// ```
1046 /// use futures_lite::{future, prelude::*};
1047 ///
1048 /// # future::block_on(async {
1049 /// let (mut r, mut w) = piper::pipe(1024);
1050 ///
1051 /// future::poll_fn(|cx| w.poll(cx)).await;
1052 /// let data = b"hello world";
1053 /// let mut remaining = &data[..];
1054 ///
1055 /// while !remaining.is_empty() {
1056 /// let buf = w.write_buf(remaining.len());
1057 /// let n = buf.len();
1058 /// buf[..n].copy_from_slice(&remaining[..n]);
1059 /// w.produced(n);
1060 /// remaining = &remaining[n..];
1061 /// }
1062 ///
1063 /// // Read the bytes back.
1064 /// let mut buf = [0; 64];
1065 /// r.read_exact(&mut buf[..data.len()]).await.unwrap();
1066 /// assert_eq!(&buf[..data.len()], b"hello world");
1067 /// # });
1068 /// ```
1069 pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll<bool> {
1070 self.poll_inner(Some(cx))
1071 }
1072
1073 /// Reads bytes from blocking `src` and writes into this writer.
1074 #[inline]
1075 fn fill_inner<R: ReadLike>(
1076 &mut self,
1077 mut cx: Option<&mut Context<'_>>,
1078 mut src: R,
1079 ) -> Poll<Result<usize, R::Error>> {
1080 if !ready!(self.poll_inner(cx.as_mut().map(|c| &mut **c))) {
1081 return Poll::Ready(Ok(0));
1082 }
1083
1084 // Yield with some small probability - this improves fairness.
1085 if let Some(cx) = cx {
1086 ready!(maybe_yield(&mut self.rng, cx));
1087 }
1088
1089 // Number of bytes written so far.
1090 let mut count = 0;
1091
1092 loop {
1093 // Not too many bytes in one go - better to wake the reader soon!
1094 let pipe_slice_mut = self.write_buf(128 * 1024);
1095
1096 // Copy bytes from `src` into the piper buffer.
1097 let n = src.read(pipe_slice_mut)?;
1098 count += n;
1099
1100 // If the pipe is full or closed, or `src` is empty, return.
1101 if n == 0 || self.inner.closed.load(Ordering::Relaxed) {
1102 return Poll::Ready(Ok(count));
1103 }
1104
1105 self.produced(n);
1106 }
1107 }
1108
1109 /// Get a mutable slice of the pipe's internal buffer that can be written to.
1110 ///
1111 /// The contents of the slice are initialized but unspecified and you should
1112 /// not read from it or assume its contents are zeroed.
1113 ///
1114 /// The `max` parameter is an upper bound on the size of the slice returned,
1115 /// limiting the number of bytes that will be initialized when using the
1116 /// buffer for the first time.
1117 ///
1118 /// After writing to the buffer, you should call [`produced(n)`] to notify the
1119 /// pipe that `n` bytes have been written to the buffer and make them available
1120 /// to the reader.
1121 pub fn write_buf(&mut self, max: usize) -> &mut [u8] {
1122 let n = max
1123 .min(self.zeroed_until * 2 + 4096) // Don't zero too many bytes when starting.
1124 .min(self.available_space()) // No more than space in the pipe.
1125 .min(self.inner.cap - self.inner.real_index(self.tail)); // Don't go past the buffer boundary.
1126
1127 // Create a slice of available space in the pipe buffer.
1128 unsafe {
1129 let from = self.inner.real_index(self.tail);
1130 let to = from + n;
1131
1132 // Make sure all bytes in the slice are initialized.
1133 if self.zeroed_until < to {
1134 self.inner
1135 .buffer
1136 .add(self.zeroed_until)
1137 .write_bytes(0u8, to - self.zeroed_until);
1138 self.zeroed_until = to;
1139 }
1140
1141 slice::from_raw_parts_mut(self.inner.buffer.add(from), n)
1142 }
1143 }
1144
1145 /// Notify the pipe that `n` bytes have been written to the buffer returned by `write_buf()`.
1146 ///
1147 /// ## Panics
1148 /// * if `n` is greater than the size of the buffer returned by `write_buf()`.
1149 pub fn produced(&mut self, n: usize) {
1150 assert!(
1151 n <= self.available_space()
1152 && (self.zeroed_until == self.inner.cap || self.tail + n <= self.zeroed_until)
1153 && self.tail + n <= 2 * self.inner.cap,
1154 "cannot write more bytes than available space"
1155 );
1156
1157 // Move the tail forward.
1158 if self.tail + n < 2 * self.inner.cap {
1159 self.tail += n;
1160 } else {
1161 self.tail = 0;
1162 }
1163
1164 // Store the current tail index.
1165 self.inner.tail.store(self.tail, Ordering::Release);
1166
1167 // Wake the reader because the pipe is not empty.
1168 self.inner.reader.wake();
1169 }
1170}
1171
1172#[cfg(feature = "std")]
1173impl AsyncWrite for Writer {
1174 fn poll_write(
1175 mut self: Pin<&mut Self>,
1176 cx: &mut Context<'_>,
1177 buf: &[u8],
1178 ) -> Poll<io::Result<usize>> {
1179 self.poll_fill_bytes(cx, buf).map(Ok)
1180 }
1181
1182 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1183 // Nothing to flush.
1184 Poll::Ready(Ok(()))
1185 }
1186
1187 fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1188 // Set the closed flag.
1189 self.inner.closed.store(true, Ordering::Release);
1190
1191 // Wake up any tasks that may be waiting on the pipe.
1192 self.inner.reader.wake();
1193 self.inner.writer.wake();
1194
1195 // The pipe is now closed.
1196 Poll::Ready(Ok(()))
1197 }
1198}
1199
1200/// A trait for reading bytes into a pipe.
1201trait ReadLike {
1202 /// The error type.
1203 type Error;
1204
1205 /// Reads bytes into the given buffer.
1206 fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error>;
1207}
1208
1209#[cfg(feature = "std")]
1210impl<R: Read> ReadLike for R {
1211 type Error = io::Error;
1212
1213 fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
1214 Read::read(self, buf)
1215 }
1216}
1217
1218/// Implements `no_std` reading around a byte slice.
1219struct ReadBytes<'a>(&'a [u8]);
1220
1221impl ReadLike for ReadBytes<'_> {
1222 type Error = Infallible;
1223
1224 fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
1225 let n = self.0.len().min(buf.len());
1226 buf[..n].copy_from_slice(&self.0[..n]);
1227 self.0 = &self.0[n..];
1228 Ok(n)
1229 }
1230}
1231
1232/// A trait for writing bytes from a pipe.
1233trait WriteLike {
1234 /// The error type.
1235 type Error;
1236
1237 /// Writes bytes from the given buffer.
1238 fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error>;
1239}
1240
1241#[cfg(feature = "std")]
1242impl<W: Write> WriteLike for W {
1243 type Error = io::Error;
1244
1245 fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
1246 Write::write(self, buf)
1247 }
1248}
1249
1250/// Implements `no_std` writing around a byte slice.
1251struct WriteBytes<'a>(&'a mut [u8]);
1252
1253impl WriteLike for WriteBytes<'_> {
1254 type Error = Infallible;
1255
1256 fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
1257 let n = self.0.len().min(buf.len());
1258 self.0[..n].copy_from_slice(&buf[..n]);
1259
1260 // mem::take() is not available on 1.36
1261 #[allow(clippy::mem_replace_with_default)]
1262 {
1263 let slice = mem::replace(&mut self.0, &mut []);
1264 self.0 = &mut slice[n..];
1265 }
1266
1267 Ok(n)
1268 }
1269}
1270
1271/// Yield with some small probability.
1272fn maybe_yield(rng: &mut fastrand::Rng, cx: &mut Context<'_>) -> Poll<()> {
1273 if rng.usize(..100) == 0 {
1274 cx.waker().wake_by_ref();
1275 Poll::Pending
1276 } else {
1277 Poll::Ready(())
1278 }
1279}
1280
1281/// Get a random number generator.
1282#[cfg(feature = "std")]
1283#[inline]
1284fn rng() -> fastrand::Rng {
1285 fastrand::Rng::new()
1286}
1287
1288/// Get a random number generator.
1289///
1290/// This uses a fixed seed due to the lack of a good RNG in `no_std` environments.
1291#[cfg(not(feature = "std"))]
1292#[inline]
1293fn rng() -> fastrand::Rng {
1294 // Chosen by fair roll of the dice.
1295 fastrand::Rng::with_seed(0x7e9b496634c97ec6)
1296}
1297
1298/// ```
1299/// use piper::{Reader, Writer};
1300/// fn _send_sync<T: Send + Sync>() {}
1301/// _send_sync::<Reader>();
1302/// _send_sync::<Writer>();
1303/// ```
1304fn _assert_send_sync() {}
1305
1306mod sync {
1307 #[cfg(not(feature = "portable-atomic"))]
1308 pub use core::sync::atomic;
1309
1310 #[cfg(not(feature = "portable-atomic"))]
1311 pub use alloc::sync::Arc;
1312
1313 #[cfg(feature = "portable-atomic")]
1314 pub use portable_atomic_crate as atomic;
1315
1316 #[cfg(feature = "portable-atomic")]
1317 pub use portable_atomic_util::Arc;
1318}