#![cfg_attr(not(feature = "std"), no_std)]
#![forbid(missing_docs)]
#![doc(
html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
)]
#![doc(
html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
)]
extern crate alloc;
use core::convert::Infallible;
use core::mem;
use core::slice;
use core::task::{Context, Poll};
use alloc::vec::Vec;
use sync::atomic::{self, AtomicBool, AtomicUsize, Ordering};
use sync::Arc;
#[cfg(feature = "std")]
use std::{
io::{self, Read, Write},
pin::Pin,
};
use atomic_waker::AtomicWaker;
#[cfg(feature = "std")]
use futures_io::{AsyncBufRead, AsyncRead, AsyncWrite};
/// Unwraps `Poll::Ready(t)`, or returns `Poll::Pending` from the enclosing
/// function. A local equivalent of `core::task::ready!`.
macro_rules! ready {
    ($e:expr) => {{
        match $e {
            Poll::Ready(t) => t,
            Poll::Pending => return Poll::Pending,
        }
    }};
}
/// Creates a bounded single-producer single-consumer pipe.
///
/// The pipe can hold up to `cap` bytes at a time. Returns the reading and
/// writing ends; dropping either end closes the pipe.
///
/// # Panics
///
/// Panics if `cap` is zero, or if `cap * 2` would overflow `usize` (indices
/// run over a `2 * cap` cycle so "empty" and "full" are distinguishable).
#[allow(clippy::incompatible_msrv)] pub fn pipe(cap: usize) -> (Reader, Writer) {
    assert!(cap > 0, "capacity must be positive");
    assert!(cap.checked_mul(2).is_some(), "capacity is too large");
    // Allocate the ring buffer and leak the `Vec`, keeping only the raw
    // pointer; `Pipe::drop` rebuilds the `Vec` to free the allocation.
    let mut v = Vec::with_capacity(cap);
    let buffer = v.as_mut_ptr();
    mem::forget(v);
    let inner = Arc::new(Pipe {
        head: AtomicUsize::new(0),
        tail: AtomicUsize::new(0),
        reader: AtomicWaker::new(),
        writer: AtomicWaker::new(),
        closed: AtomicBool::new(false),
        buffer,
        cap,
    });
    // Each end gets its own RNG (the reader's is forked from the writer's)
    // to drive the occasional cooperative yield.
    let mut rng = rng();
    let r = Reader {
        inner: inner.clone(),
        head: 0,
        tail: 0,
        rng: rng.fork(),
    };
    let w = Writer {
        inner,
        head: 0,
        tail: 0,
        zeroed_until: 0,
        rng,
    };
    (r, w)
}
/// The reading end of a pipe created by [`pipe()`].
pub struct Reader {
    /// Ring-buffer state shared with the `Writer`.
    inner: Arc<Pipe>,
    /// Cached copy of `inner.head`; only this end advances the head.
    head: usize,
    /// Cached copy of `inner.tail`, refreshed from the atomic when the
    /// cached view looks empty.
    tail: usize,
    /// RNG driving the occasional cooperative yield in `drain_inner`.
    rng: fastrand::Rng,
}
/// The writing end of a pipe created by [`pipe()`].
pub struct Writer {
    /// Ring-buffer state shared with the `Reader`.
    inner: Arc<Pipe>,
    /// Cached copy of `inner.head`, refreshed from the atomic when the
    /// cached view looks full.
    head: usize,
    /// Cached copy of `inner.tail`; only this end advances the tail.
    tail: usize,
    /// Bytes `[0, zeroed_until)` of the buffer have been zero-initialized
    /// (the buffer is zeroed lazily in `write_buf`).
    zeroed_until: usize,
    /// RNG driving the occasional cooperative yield in `fill_inner`.
    rng: fastrand::Rng,
}
/// State shared between the `Reader` and `Writer` halves.
///
/// Head and tail indices run over a `2 * cap` cycle rather than `cap`, so
/// that `head == tail` always means "empty" while a full pipe remains
/// distinguishable.
struct Pipe {
    /// Logical index of the next byte to read (in `0..2 * cap`).
    head: AtomicUsize,
    /// Logical index of the next byte to write (in `0..2 * cap`).
    tail: AtomicUsize,
    /// Waker of a reader parked on an empty pipe.
    reader: AtomicWaker,
    /// Waker of a writer parked on a full pipe.
    writer: AtomicWaker,
    /// Set when either end is dropped or the writer is closed.
    closed: AtomicBool,
    /// Raw ring-buffer allocation of `cap` bytes; freed in `Drop`.
    buffer: *mut u8,
    /// Buffer capacity in bytes.
    cap: usize,
}
// SAFETY: `Pipe` is not auto-Send/Sync only because of the raw `buffer`
// pointer. The allocation is uniquely owned by the `Pipe`, and the two ends
// coordinate all access to it through the atomic head/tail indices, so
// sharing and sending it across threads is sound.
unsafe impl Sync for Pipe {}
unsafe impl Send for Pipe {}
impl Drop for Pipe {
    fn drop(&mut self) {
        // SAFETY: `buffer` came from `Vec::with_capacity(cap)` in `pipe()`
        // and was leaked with `mem::forget`. Rebuilding the `Vec` with
        // length 0 (no element drops) releases the allocation exactly once.
        unsafe {
            Vec::from_raw_parts(self.buffer, 0, self.cap);
        }
    }
}
impl Drop for Reader {
    fn drop(&mut self) {
        // Mark the pipe closed first, then wake a writer that may be parked
        // waiting for free space so it can observe the closure.
        let pipe = &self.inner;
        pipe.closed.store(true, Ordering::SeqCst);
        pipe.writer.wake();
    }
}
impl Drop for Writer {
    fn drop(&mut self) {
        // Mark the pipe closed first, then wake a reader that may be parked
        // waiting for data so it can observe EOF.
        let pipe = &self.inner;
        pipe.closed.store(true, Ordering::SeqCst);
        pipe.reader.wake();
    }
}
impl Pipe {
    /// Number of bytes currently stored, computed from the shared atomic
    /// head/tail indices.
    fn len(&self) -> usize {
        let head = self.head.load(Ordering::Acquire);
        let tail = self.tail.load(Ordering::Acquire);
        // Indices live on a `2 * cap` cycle, so the distance wraps there.
        if tail >= head {
            tail - head
        } else {
            2 * self.cap - head + tail
        }
    }
    /// Maps a logical index in `0..2 * cap` to a physical buffer offset in
    /// `0..cap`.
    #[inline]
    fn real_index(&self, i: usize) -> usize {
        if i >= self.cap {
            i - self.cap
        } else {
            i
        }
    }
}
impl Reader {
    /// Returns the number of bytes stored in the pipe.
    pub fn len(&self) -> usize {
        self.inner.len()
    }
    /// Returns `true` if the pipe holds no bytes.
    pub fn is_empty(&self) -> bool {
        self.inner.len() == 0
    }
    /// Returns the pipe's total byte capacity.
    pub fn capacity(&self) -> usize {
        self.inner.cap
    }
    /// Returns `true` if the pipe is filled to capacity.
    pub fn is_full(&self) -> bool {
        self.inner.len() == self.inner.cap
    }
    /// Returns `true` if the pipe has been closed (either end dropped, or
    /// the writer closed).
    pub fn is_closed(&self) -> bool {
        self.inner.closed.load(Ordering::SeqCst)
    }
    /// Drains bytes from the pipe into `dest`.
    ///
    /// Returns the number of bytes moved. `Ready(Ok(0))` means the pipe is
    /// closed and fully drained; `Pending` means it is empty but still open.
    /// Errors from `dest` are propagated.
    #[cfg(feature = "std")]
    pub fn poll_drain(
        &mut self,
        cx: &mut Context<'_>,
        dest: impl Write,
    ) -> Poll<io::Result<usize>> {
        self.drain_inner(Some(cx), dest)
    }
    /// Drains bytes from the pipe into the byte slice `dest`.
    ///
    /// Like `poll_drain`, but infallible since writing to a slice cannot
    /// fail.
    pub fn poll_drain_bytes(&mut self, cx: &mut Context<'_>, dest: &mut [u8]) -> Poll<usize> {
        match self.drain_inner(Some(cx), WriteBytes(dest)) {
            Poll::Ready(Ok(n)) => Poll::Ready(n),
            // `Infallible` has no values, so this arm can never execute.
            Poll::Ready(Err(e)) => match e {},
            Poll::Pending => Poll::Pending,
        }
    }
    /// Waits until the pipe has data to read.
    ///
    /// `Ready(true)` means data is available; `Ready(false)` means the pipe
    /// is closed and fully drained.
    pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll<bool> {
        self.poll_available(Some(cx))
    }
    /// Returns the contiguous run of readable bytes at the current head.
    ///
    /// The slice stops at the physical end of the ring buffer, so it may be
    /// shorter than `len()`; `consume` and peek again for the remainder.
    pub fn peek_buf(&self) -> &[u8] {
        let n = self
            .available_data() .min(self.inner.cap - self.inner.real_index(self.head));
        // SAFETY: the `n` bytes starting at the head's physical offset lie
        // inside the buffer and within the filled region, which the writer
        // zero-initialized before publishing (see `Writer::write_buf`) and
        // will not overwrite until they are consumed.
        unsafe { slice::from_raw_parts(self.inner.buffer.add(self.inner.real_index(self.head)), n) }
    }
    /// Marks `amt` bytes as read, freeing their space for the writer.
    ///
    /// # Panics
    ///
    /// Panics if `amt` exceeds the bytes currently available.
    pub fn consume(&mut self, amt: usize) {
        let cap = self.inner.cap;
        assert!(
            amt <= self.available_data() && self.head + amt <= 2 * cap,
            "cannot consume more bytes than available in the pipe"
        );
        // Advance over the 2*cap index cycle; landing exactly on 2*cap wraps
        // back to 0.
        if self.head + amt < 2 * cap {
            self.head += amt;
        } else {
            self.head = 0;
        }
        self.inner.head.store(self.head, Ordering::Release);
        // Space was just freed — wake a writer parked on a full pipe.
        self.inner.writer.wake();
    }
    /// Non-blocking drain into `dest`; returns the number of bytes moved
    /// (0 when the pipe is empty, or closed and drained).
    pub fn try_drain(&mut self, dest: &mut [u8]) -> usize {
        match self.drain_inner(None, WriteBytes(dest)) {
            Poll::Ready(Ok(n)) => n,
            Poll::Ready(Err(e)) => match e {},
            Poll::Pending => 0,
        }
    }
    /// Bytes readable according to the *cached* head/tail indices.
    #[inline]
    fn available_data(&self) -> usize {
        let a = self.head;
        let b = self.tail;
        if a <= b {
            b - a
        } else {
            2 * self.inner.cap - (a - b)
        }
    }
    /// Core readiness check: `Ready(true)` when data is available,
    /// `Ready(false)` when the pipe is closed and drained, else `Pending`.
    /// With `cx == None` no waker is registered (try-style call).
    fn poll_available(&mut self, mut cx: Option<&mut Context<'_>>) -> Poll<bool> {
        if self.available_data() == 0 {
            // The cached tail may be stale — refresh from the shared atomic.
            self.tail = self.inner.tail.load(Ordering::Acquire);
            if self.available_data() == 0 {
                // Register interest *before* the final recheck so a write
                // racing with us cannot slip in unnoticed between the check
                // and the registration.
                if let Some(cx) = cx.as_mut() {
                    self.inner.reader.register(cx.waker());
                }
                // Order the registration against the loads below.
                atomic::fence(Ordering::SeqCst);
                let is_closed = self.inner.closed.load(Ordering::Acquire);
                self.tail = self.inner.tail.load(Ordering::Acquire);
                if self.available_data() == 0 {
                    if is_closed {
                        return Poll::Ready(false);
                    } else {
                        return Poll::Pending;
                    }
                }
            }
        }
        // Data is available — drop any stale waker registration.
        self.inner.reader.take();
        Poll::Ready(true)
    }
    /// Shared implementation of the drain family: repeatedly copies bytes
    /// from the pipe into `dest` until the pipe empties or `dest` stops
    /// accepting bytes.
    #[inline]
    fn drain_inner<W: WriteLike>(
        &mut self,
        mut cx: Option<&mut Context<'_>>,
        mut dest: W,
    ) -> Poll<Result<usize, W::Error>> {
        if !ready!(self.poll_available(cx.as_mut().map(|c| &mut **c))) {
            // Closed and fully drained: report EOF.
            return Poll::Ready(Ok(0));
        }
        // Occasionally yield so a fast reader cannot starve other tasks.
        if let Some(cx) = cx {
            ready!(maybe_yield(&mut self.rng, cx));
        }
        let mut count = 0;
        loop {
            // Copy at most 128 KiB per iteration.
            let pipe_slice = self.peek_buf();
            let pipe_slice = &pipe_slice[..pipe_slice.len().min(128 * 1024)];
            let n = dest.write(pipe_slice)?;
            count += n;
            // `n == 0` means either the pipe is empty or `dest` is full.
            if n == 0 {
                return Poll::Ready(Ok(count));
            }
            self.consume(n);
        }
    }
}
#[cfg(feature = "std")]
impl AsyncRead for Reader {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        // Delegate to the infallible byte drain and wrap the count in `Ok`.
        match self.poll_drain_bytes(cx, buf) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(n) => Poll::Ready(Ok(n)),
        }
    }
}
#[cfg(feature = "std")]
impl AsyncBufRead for Reader {
    /// Waits for data and returns the contiguous readable slice.
    ///
    /// An empty slice means the pipe is closed and fully drained (EOF).
    fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
        ready!(self.poll(cx));
        // SAFETY: `Reader` holds no self-referential or address-sensitive
        // state (an `Arc`, two `usize`s, and an RNG), so taking it out of
        // the pin is sound.
        let this = unsafe { self.get_unchecked_mut() };
        Poll::Ready(Ok(this.peek_buf()))
    }
    fn consume(mut self: Pin<&mut Self>, amt: usize) {
        (*self).consume(amt)
    }
}
impl Writer {
    /// Returns the number of bytes stored in the pipe.
    pub fn len(&self) -> usize {
        self.inner.len()
    }
    /// Returns `true` if the pipe holds no bytes.
    pub fn is_empty(&self) -> bool {
        self.inner.len() == 0
    }
    /// Returns the pipe's total byte capacity.
    pub fn capacity(&self) -> usize {
        self.inner.cap
    }
    /// Returns `true` if the pipe is filled to capacity.
    pub fn is_full(&self) -> bool {
        self.inner.len() == self.inner.cap
    }
    /// Returns `true` if the pipe has been closed (either end dropped, or
    /// the writer closed).
    pub fn is_closed(&self) -> bool {
        self.inner.closed.load(Ordering::SeqCst)
    }
    /// Fills the pipe with bytes read from `src`.
    ///
    /// Returns the number of bytes moved. `Ready(Ok(0))` means the pipe is
    /// closed; `Pending` means it is full but still open. Errors from `src`
    /// are propagated.
    #[cfg(feature = "std")]
    pub fn poll_fill(&mut self, cx: &mut Context<'_>, src: impl Read) -> Poll<io::Result<usize>> {
        self.fill_inner(Some(cx), src)
    }
    /// Fills the pipe from the byte slice `bytes`.
    ///
    /// Like `poll_fill`, but infallible since reading from a slice cannot
    /// fail.
    pub fn poll_fill_bytes(&mut self, cx: &mut Context<'_>, bytes: &[u8]) -> Poll<usize> {
        match self.fill_inner(Some(cx), ReadBytes(bytes)) {
            Poll::Ready(Ok(n)) => Poll::Ready(n),
            // `Infallible` has no values, so this arm can never execute.
            Poll::Ready(Err(e)) => match e {},
            Poll::Pending => Poll::Pending,
        }
    }
    /// Non-blocking fill from `dest`; returns the number of bytes moved
    /// (0 when the pipe is full or closed).
    pub fn try_fill(&mut self, dest: &[u8]) -> usize {
        match self.fill_inner(None, ReadBytes(dest)) {
            Poll::Ready(Ok(n)) => n,
            Poll::Ready(Err(e)) => match e {},
            Poll::Pending => 0,
        }
    }
    /// Free space according to the *cached* head/tail indices.
    #[inline]
    fn available_space(&self) -> usize {
        let a = self.head;
        let b = self.tail;
        if a <= b {
            self.inner.cap - (b - a)
        } else {
            (a - b) - self.inner.cap
        }
    }
    /// Core readiness check: `Ready(true)` when space is available,
    /// `Ready(false)` when the pipe is closed, else `Pending`. With
    /// `cx == None` no waker is registered (try-style call).
    #[inline]
    fn poll_inner(&mut self, cx: Option<&mut Context<'_>>) -> Poll<bool> {
        // Writing to a closed pipe is pointless even if space remains.
        if self.inner.closed.load(Ordering::Relaxed) {
            return Poll::Ready(false);
        }
        if self.available_space() == 0 {
            // The cached head may be stale — refresh from the shared atomic.
            self.head = self.inner.head.load(Ordering::Acquire);
            if self.available_space() == 0 {
                // Register interest *before* the final recheck so a consume
                // racing with us cannot slip in unnoticed between the check
                // and the registration.
                if let Some(cx) = cx {
                    self.inner.writer.register(cx.waker());
                }
                // Order the registration against the loads below.
                atomic::fence(Ordering::SeqCst);
                self.head = self.inner.head.load(Ordering::Acquire);
                if self.available_space() == 0 {
                    if self.inner.closed.load(Ordering::Relaxed) {
                        return Poll::Ready(false);
                    } else {
                        return Poll::Pending;
                    }
                }
            }
        }
        // Space is available — drop any stale waker registration.
        self.inner.writer.take();
        Poll::Ready(true)
    }
    /// Waits until the pipe has room for more bytes.
    ///
    /// `Ready(true)` means space is available; `Ready(false)` means the
    /// pipe is closed.
    pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll<bool> {
        self.poll_inner(Some(cx))
    }
    /// Shared implementation of the fill family: repeatedly copies bytes
    /// from `src` into the pipe until the pipe fills, `src` runs dry, or
    /// the pipe is closed.
    #[inline]
    fn fill_inner<R: ReadLike>(
        &mut self,
        mut cx: Option<&mut Context<'_>>,
        mut src: R,
    ) -> Poll<Result<usize, R::Error>> {
        if !ready!(self.poll_inner(cx.as_mut().map(|c| &mut **c))) {
            // Closed: nothing can be written.
            return Poll::Ready(Ok(0));
        }
        // Occasionally yield so a fast writer cannot starve other tasks.
        if let Some(cx) = cx {
            ready!(maybe_yield(&mut self.rng, cx));
        }
        let mut count = 0;
        loop {
            // Copy at most 128 KiB per iteration.
            let pipe_slice_mut = self.write_buf(128 * 1024);
            let n = src.read(pipe_slice_mut)?;
            count += n;
            // NOTE(review): if the pipe closes mid-loop, the final `n` bytes
            // are counted but never `produced` (they are discarded) — verify
            // this accounting is intended.
            if n == 0 || self.inner.closed.load(Ordering::Relaxed) {
                return Poll::Ready(Ok(count));
            }
            self.produced(n);
        }
    }
    /// Returns a zero-initialized writable slice of at most `max` bytes.
    ///
    /// The slice is contiguous (it stops at the physical end of the ring)
    /// and the buffer is zeroed lazily: `zeroed_until` tracks how much has
    /// been initialized, and the `zeroed_until * 2 + 4096` cap grows the
    /// zeroed region geometrically to amortize the zeroing cost.
    pub fn write_buf(&mut self, max: usize) -> &mut [u8] {
        let n = max
            .min(self.zeroed_until * 2 + 4096) .min(self.available_space()) .min(self.inner.cap - self.inner.real_index(self.tail));
        unsafe {
            let from = self.inner.real_index(self.tail);
            let to = from + n;
            // Zero any not-yet-initialized bytes before handing them out so
            // the returned slice never exposes uninitialized memory.
            if self.zeroed_until < to {
                self.inner
                    .buffer
                    .add(self.zeroed_until)
                    .write_bytes(0u8, to - self.zeroed_until);
                self.zeroed_until = to;
            }
            // SAFETY: `[from, from + n)` lies inside the buffer, inside the
            // free region, and is now zero-initialized; the reader will not
            // touch it until `produced` publishes it.
            slice::from_raw_parts_mut(self.inner.buffer.add(from), n)
        }
    }
    /// Publishes `n` bytes previously written into the slice returned by
    /// [`Writer::write_buf`], making them visible to the reader.
    ///
    /// # Panics
    ///
    /// Panics if `n` exceeds the available space or the zero-initialized
    /// region handed out by `write_buf`.
    pub fn produced(&mut self, n: usize) {
        assert!(
            n <= self.available_space()
                && (self.zeroed_until == self.inner.cap || self.tail + n <= self.zeroed_until)
                && self.tail + n <= 2 * self.inner.cap,
            "cannot write more bytes than available space"
        );
        // Advance over the 2*cap index cycle; landing exactly on 2*cap wraps
        // back to 0.
        if self.tail + n < 2 * self.inner.cap {
            self.tail += n;
        } else {
            self.tail = 0;
        }
        self.inner.tail.store(self.tail, Ordering::Release);
        // Data just arrived — wake a reader parked on an empty pipe.
        self.inner.reader.wake();
    }
}
#[cfg(feature = "std")]
impl AsyncWrite for Writer {
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        // Delegate to the infallible byte fill and wrap the count in `Ok`.
        match self.poll_fill_bytes(cx, buf) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(n) => Poll::Ready(Ok(n)),
        }
    }
    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        // Bytes are visible to the reader as soon as they are produced, so
        // flushing is a no-op.
        Poll::Ready(Ok(()))
    }
    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        // Mark the pipe closed, then wake both ends so each observes it.
        let pipe = &self.inner;
        pipe.closed.store(true, Ordering::Release);
        pipe.reader.wake();
        pipe.writer.wake();
        Poll::Ready(Ok(()))
    }
}
/// A minimal `Read`-like abstraction so the fill logic can be written once
/// for both `std::io::Read` sources and plain byte slices.
trait ReadLike {
    /// Error type returned by `read` (`Infallible` for slice sources).
    type Error;
    /// Reads into `buf`, returning the number of bytes read.
    fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error>;
}
#[cfg(feature = "std")]
impl<R: Read> ReadLike for R {
    type Error = io::Error;
    fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
        // Fully-qualified call: a plain `self.read(buf)` would be ambiguous
        // between `Read::read` and `ReadLike::read`.
        <R as Read>::read(self, buf)
    }
}
struct ReadBytes<'a>(&'a [u8]);
impl ReadLike for ReadBytes<'_> {
    type Error = Infallible;
    fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
        // Copy as many bytes as fit, then keep only the unread remainder.
        let n = buf.len().min(self.0.len());
        let (chunk, rest) = self.0.split_at(n);
        buf[..n].copy_from_slice(chunk);
        self.0 = rest;
        Ok(n)
    }
}
/// A minimal `Write`-like abstraction so the drain logic can be written once
/// for both `std::io::Write` sinks and plain byte slices.
trait WriteLike {
    /// Error type returned by `write` (`Infallible` for slice sinks).
    type Error;
    /// Writes from `buf`, returning the number of bytes accepted.
    fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error>;
}
#[cfg(feature = "std")]
impl<W: Write> WriteLike for W {
    type Error = io::Error;
    fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
        // Fully-qualified call: a plain `self.write(buf)` would be ambiguous
        // between `Write::write` and `WriteLike::write`.
        <W as Write>::write(self, buf)
    }
}
struct WriteBytes<'a>(&'a mut [u8]);
impl WriteLike for WriteBytes<'_> {
    type Error = Infallible;
    fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
        let n = buf.len().min(self.0.len());
        // Move the slice out (leaving an empty one) so it can be re-sliced
        // without fighting the borrow checker, then keep only the unwritten
        // tail.
        let dest = mem::take(&mut self.0);
        let (filled, rest) = dest.split_at_mut(n);
        filled.copy_from_slice(&buf[..n]);
        self.0 = rest;
        Ok(n)
    }
}
/// Yields on roughly 1% of calls so one end of the pipe cannot monopolize
/// the executor.
fn maybe_yield(rng: &mut fastrand::Rng, cx: &mut Context<'_>) -> Poll<()> {
    let should_yield = rng.usize(..100) == 0;
    if !should_yield {
        return Poll::Ready(());
    }
    // Reschedule this task immediately and let others run first.
    cx.waker().wake_by_ref();
    Poll::Pending
}
/// Returns the random number generator used for the yield heuristic.
#[cfg(feature = "std")]
#[inline]
fn rng() -> fastrand::Rng {
    fastrand::Rng::new()
}
/// Without `std` there is no entropy source, so fall back to a fixed seed.
#[cfg(not(feature = "std"))]
#[inline]
fn rng() -> fastrand::Rng {
    fastrand::Rng::with_seed(0x7e9b496634c97ec6)
}
fn _assert_send_sync() {}
/// Atomic and `Arc` re-exports, switchable to the `portable-atomic` polyfill
/// crates for targets without native atomic support.
mod sync {
    // Default: the standard `core`/`alloc` primitives.
    #[cfg(not(feature = "portable-atomic"))]
    pub use core::sync::atomic;
    #[cfg(not(feature = "portable-atomic"))]
    pub use alloc::sync::Arc;
    // With the `portable-atomic` feature, use the polyfill crates instead.
    #[cfg(feature = "portable-atomic")]
    pub use portable_atomic_crate as atomic;
    #[cfg(feature = "portable-atomic")]
    pub use portable_atomic_util::Arc;
}