//! Futures helpers
use std::future::Future;
use std::marker::PhantomData;
use std::ops::{Deref, DerefMut};
use std::pin::Pin;
use std::task::{Context, Poll};
use futures::future::FusedFuture;
use futures::ready;
use futures::Sink;
use pin_project::pin_project;
use void::{ResultVoidExt as _, Void};
/// Debug-print helper used throughout this file; disabled by default
///
/// The active definition below discards its arguments and expands to `()`.
/// Switch to the nontrivial version of this, to get debugging output on stderr
macro_rules! dprintln { { $f:literal $($a:tt)* } => { () } }
// Nontrivial version: prints the formatted message (prefixed with a space) via `eprintln!`.
//macro_rules! dprintln { { $f:literal $($a:tt)* } => { eprintln!(concat!(" ",$f) $($a)*) } }
/// Extension trait for [`Sink`]
pub trait SinkExt<'w, OS, OM>
where
OS: Sink<OM>,
{
/// For processing an item obtained from a future, avoiding async cancel lossage
///
/// ```
/// # use futures::channel::mpsc;
/// # use tor_basic_utils::futures::SinkExt as _;
/// #
/// # #[tokio::main]
/// # async fn main() -> Result<(),mpsc::SendError> {
/// # let (mut sink, sink_r) = mpsc::unbounded::<usize>();
/// # let message_generator_future = futures::future::ready(42);
/// # let process_message = |m| Ok::<_,mpsc::SendError>(m);
/// let (message, sendable) = sink.prepare_send_from(
/// message_generator_future
/// ).await?;
/// let message = process_message(message)?;
/// sendable.send(message);
/// # Ok(())
/// # }
/// ```
///
/// Prepares to send an output message[^terminology] `OM` to an output sink `OS` (`self`),
/// where the `OM` is made from an input message `IM`,
/// and the `IM` is obtained from a future, `generator: IF`.
///
/// [^terminology]: We sometimes use slightly inconsistent terminology,
/// "item" vs "message".
/// This avoids having to have the generic parameters be named `OI` and `II`
/// where `I` is sometimes "item" and sometimes "input".
///
/// When successfully run, `prepare_send_from` gives `(IM, SinkSendable)`.
///
/// After processing `IM` into `OM`,
/// use the [`SinkSendable`] to [`send`](SinkSendable::send) the `OM` to `OS`.
///
/// # Why use this
///
/// This avoids an async cancellation hazard
/// which exists with naive use of `select!`
/// followed by `OS.send().await`. You might write this:
///
/// ```rust,ignore
/// select!{
/// message = input_stream.next() => {
/// if let Some(message) = message {
/// let message = do_our_processing(message);
/// output_sink.send(message).await; // <---**BUG**
/// }
/// }
/// control = something_else() => { .. }
/// }
/// ```
///
/// If, when we reach `BUG`, the output sink is not ready to receive the message,
/// the future for that particular `select!` branch will be suspended.
/// But when `select!` finds that *any one* of the branches is ready,
/// it *drops* the futures for the other branches.
/// That drops all the local variables, including possibly `message`, losing it.
///
/// For more about cancellation safety, see
/// [Rust for the Polyglot Programmer](https://www.chiark.greenend.org.uk/~ianmdlvl/rust-polyglot/async.html#cancellation-safety)
/// which has a general summary, and
/// Matthias Einwag's
/// [extensive discussion in his gist](https://gist.github.com/Matthias247/ffc0f189742abf6aa41a226fe07398a8#cancellation-in-async-rust)
/// with comparisons to other languages.
///
/// ## Alternatives
///
/// Unbounded mpsc channels, and certain other primitives,
/// do not suffer from this problem because they do not block.
/// `UnboundedSender` offers
/// [`unbounded_send`](futures::channel::mpsc::UnboundedSender::unbounded_send)
/// but only as an inherent method, so this does not compose with `Sink` combinators.
/// And of course unbounded channels do not implement any backpressure.
///
/// The problem can otherwise be avoided by completely eschewing use of `select!`
/// and writing manual implementations of `Future`, `Sink`, and so on.
/// However, such code is typically considerably more complex and involves
/// entangling the primary logic with future machinery.
/// It is normally better to write primary functionality in `async { }`
/// using utilities (often "futures combinators") such as this one.
///
// Personal note from @Diziet:
// IMO it is generally accepted in the Rust community that
// it is not good practice to write principal code at the manual futures level.
// However, I have not been able to find very clear support for this proposition.
// There are endless articles explaining how futures work internally,
// often by describing how to reimplement standard combinators such as `map`.
// ISTM that these exist to help understanding,
// but it seems to be only rarely stated that doing this is not generally a good idea.
//
// I did find the following:
//
// https://dev.to/mindflavor/rust-futures-an-uneducated-short-and-hopefully-not-boring-tutorial---part-4---a-real-future-from-scratch-734#conclusion
//
// Of course you generally do not write a future manually. You use the ones provided by
// libraries and compose them as needed. It's important to understand how they work
// nevertheless.
//
// And of course the existence of the `futures` crate is indicative:
// it consists almost entirely of combinators and utilities
// whose purpose is to allow you to write many structures in async code
// without needing to resort to manual future impls.
//
/// # Example
///
/// This comprehensive example demonstrates how to read from possibly multiple sources
/// and also be able to process other events:
///
/// ```
/// # #[tokio::main]
/// # async fn main() {
/// use futures::select;
/// use futures::{SinkExt as _, StreamExt as _};
/// use tor_basic_utils::futures::SinkExt as _;
///
/// let (mut input_w, mut input_r) = futures::channel::mpsc::unbounded::<usize>();
/// let (mut output_w, mut output_r) = futures::channel::mpsc::unbounded::<String>();
/// input_w.send(42).await;
/// select!{
/// ret = output_w.prepare_send_from(async {
/// select!{
/// got_input = input_r.next() => got_input.expect("input stream ended!"),
/// () = futures::future::pending() => panic!(), // other branches are OK here
/// }
/// }) => {
/// let (input_msg, sendable) = ret.unwrap();
/// let output_msg = input_msg.to_string();
/// let () = sendable.send(output_msg).unwrap();
/// },
/// () = futures::future::pending() => panic!(), // other branches are OK here
/// }
///
/// assert_eq!(output_r.next().await.unwrap(), "42");
/// # }
/// ```
///
/// # Formally
///
/// [`prepare_send_from`](SinkExt::prepare_send_from)
/// returns a [`SinkPrepareSendFuture`] which, when awaited:
///
/// * Waits for `OS` to be ready to receive an item.
/// * Runs `message_generator` to obtain an `IM`.
/// * Returns the `IM` (for processing), and a [`SinkSendable`].
///
/// The caller should then:
///
/// * Check the error from `prepare_send_from`
/// (which came from the *output* sink).
/// * Process the `IM`, making an `OM` out of it.
/// * Call [`sendable.send()`](SinkSendable::send) (and check its error).
///
/// # Flushing
///
/// `prepare_send_from` will (when awaited)
/// [`flush`](futures::SinkExt::flush) the output sink
/// when it finds the input is not ready yet.
/// Until then items may be buffered
/// (as if they had been written with [`feed`](futures::SinkExt::feed)).
///
/// # Errors
///
/// ## Output sink errors
///
/// The call site can experience output sink errors in two places,
/// [`prepare_send_from()`](SinkExt::prepare_send_from) and [`SinkSendable::send()`].
/// The caller should typically handle them the same way regardless of when they occurred.
///
/// If the error happens at [`SinkSendable::send()`],
/// the call site will usually be forced to discard the item being processed.
/// This will only occur if the sink is actually broken.
///
/// ## Errors specific to the call site: fallible input, and fallible processing
///
/// At some call sites, the input future may yield errors
/// (perhaps it is reading from a `Stream` of [`Result`]s).
/// In that case the value from the input future will be a [`Result`].
/// Then `IM` is a `Result`, and is provided in the `.0` element
/// of the "successful" return from `prepare_send_from`.
///
/// And, at some call sites, the processing of an `IM` into an `OM` is fallible.
///
/// Handling these latter two error cases is up to the caller,
/// in the code which processes `IM`.
/// The call site will often want to deal with such an error
/// without sending anything into the output sink,
/// and can then just drop the [`SinkSendable`].
///
/// # Implementations
///
/// This is an extension trait and you are not expected to need to implement it.
///
/// There are provided implementations for `Pin<&mut impl Sink>`
/// and `&mut impl Sink + Unpin`, for your convenience.
fn prepare_send_from<IF, IM>(
self,
message_generator: IF,
) -> SinkPrepareSendFuture<'w, IF, OS, OM>
where
IF: Future<Output = IM>;
}
// Primary implementation: the sink is already pinned, so we can store the
// `Pin<&'w mut OS>` directly in the returned future.
impl<'w, OS, OM> SinkExt<'w, OS, OM> for Pin<&'w mut OS>
where
    OS: Sink<OM>,
{
    // The generic parameter list matches the trait declaration exactly;
    // no extra (unused) lifetime parameter is declared here.
    fn prepare_send_from<IF, IM>(
        self,
        message_generator: IF,
    ) -> SinkPrepareSendFuture<'w, IF, OS, OM>
    where
        IF: Future<Output = IM>,
    {
        SinkPrepareSendFuture {
            // Wrapped in `Some` so that `poll` can later move it out into the `SinkSendable`.
            output: Some(self),
            generator: message_generator,
            tw: PhantomData,
        }
    }
}
// Convenience implementation for plain `&mut` references to `Unpin` sinks.
impl<'w, OS, OM> SinkExt<'w, OS, OM> for &'w mut OS
where
    OS: Sink<OM> + Unpin,
{
    // The generic parameter list matches the trait declaration exactly;
    // no extra (unused) lifetime parameter is declared here.
    fn prepare_send_from<IF, IM>(
        self,
        message_generator: IF,
    ) -> SinkPrepareSendFuture<'w, IF, OS, OM>
    where
        IF: Future<Output = IM>,
    {
        // `OS: Unpin` lets us pin the reference safely, then delegate to the
        // `Pin<&'w mut OS>` implementation.
        Pin::new(self).prepare_send_from(message_generator)
    }
}
/// Future for `SinkExt::prepare_send_from`
#[pin_project]
#[must_use]
pub struct SinkPrepareSendFuture<'w, IF, OS, OM> {
/// The input future, which `poll()` polls (once the output sink has reported
/// itself ready) to obtain the input message
#[pin]
generator: IF,
/// This Option exists because otherwise SinkPrepareSendFuture::poll()
/// can't move `output` out of this struct to put it into the `SinkSendable`.
/// (The poll() impl cannot borrow from SinkPrepareSendFuture.)
output: Option<Pin<&'w mut OS>>,
/// `fn(OM)` gives contravariance in OM.
///
/// Variance is confusing.
/// Loosely, a SinkPrepareSendFuture<..OM> consumes an OM.
/// Actually, we don't really need to add any variance restrictions wrt OM,
/// because the &mut OS already implies the correct variance,
/// so we could have used the PhantomData<fn(*const OM)> trick.
/// Happily there is no unsafe anywhere nearby, so it is not possible for us to write
/// a bug due to getting the variance wrong - only to erroneously prevent some use
/// case.
tw: PhantomData<fn(OM)>,
}
/// A [`Sink`] which is ready to receive an item
///
/// Produced by [`SinkExt::prepare_send_from`]. See there for the overview docs.
///
/// This references an output sink `OS`.
/// It offers the ability to write into the sink without blocking,
/// (and constitutes a proof token that the sink has declared itself ready for that).
///
/// The only useful method is [`send`](SinkSendable::send).
///
/// `SinkSendable` has no drop glue and can be freely dropped,
/// for example if you prepare to send a message and then
/// encounter an error when producing the output message.
#[must_use]
pub struct SinkSendable<'w, OS, OM> {
/// The output sink, into which `send` will start sending the item
output: Pin<&'w mut OS>,
/// Marker tying this value to the item type `OM` without storing one
/// (`fn(OM)`, the same marker type as in `SinkPrepareSendFuture`)
tw: PhantomData<fn(OM)>,
}
impl<'w, IF, OS, IM, OM> Future for SinkPrepareSendFuture<'w, IF, OS, OM>
where
IF: Future<Output = IM>,
OS: Sink<OM>,
{
type Output = Result<(IM, SinkSendable<'w, OS, OM>), OS::Error>;
/// Poll the output sink for readiness, then the generator for an input message
///
/// Returns `Ready(Err)` if the output sink reports an error from `poll_ready`
/// or `poll_flush`; `Pending` if the sink is not ready or the generator is pending
/// (after asking the sink to flush); and `Ready(Ok((value, sendable)))`
/// once the generator has yielded a value.
///
/// # Panics
///
/// Panics if polled again after it has returned `Ready(Ok(..))`.
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut self_ = self.project();
/// returns `&mut Pin<&'w mut OS>` from self_.output
//
// macro because the closure's type parameters would be unnameable.
macro_rules! get_output {
($self_:expr) => {
$self_.output.as_mut().expect(BAD_POLL_MSG).as_mut()
};
}
/// Panic message used when `output` has already been taken,
/// i.e. when we are polled again after having completed successfully
const BAD_POLL_MSG: &str =
"future from SinkExt::prepare_send_from (SinkPrepareSendFuture) \
polled after returning Ready(Ok)";
// Step 1: the output sink must be ready. `ready!` returns `Pending` from `poll`
// if the sink is not ready yet, so the generator is not polled in that case.
let () = match ready!(get_output!(self_).poll_ready(cx)) {
Err(e) => {
dprintln!("poll: output poll = IF.Err SO IF.Err");
// Deliberately don't fuse by `take`ing output. If we did that, we would expose
// our caller to an additional panic risk. There is no harm in polling the output
// sink again: although `Sink` documents that a sink that returns errors will
// probably continue to do so, it is not forbidden to try it and see. This is in
// any case better than definitely crashing if the `SinkPrepareSendFuture` is
// polled after it gave Ready.
return Poll::Ready(Err(e));
}
Ok(()) => {
dprintln!("poll: output poll = IF.Ok calling generator");
}
};
// Step 2: the sink is ready, so poll the generator for the input message.
let value = match self_.generator.as_mut().poll(cx) {
Poll::Pending => {
// We defer flushing the output until the input stops yielding.
// This allows our caller (which is typically a loop) to transfer multiple
// items from their input to their output between flushes.
//
// But we must not return `Pending` without flushing, or the caller could block
// without flushing output, leading to untimely delivery of buffered data.
dprintln!("poll: generator = Pending calling output flush");
let flushed = get_output!(self_).poll_flush(cx);
// Whatever the flush outcome (other than an error), we are still waiting
// for the generator, so the overall result is `Pending`.
return match flushed {
Poll::Ready(Err(e)) => {
dprintln!("poll: output flush = IF.Err SO IF.Err");
Poll::Ready(Err(e))
}
Poll::Ready(Ok(())) => {
dprintln!("poll: output flush = IF.Ok SO Pending");
Poll::Pending
}
Poll::Pending => {
dprintln!("poll: output flush = Pending SO Pending");
Poll::Pending
}
};
}
Poll::Ready(v) => {
dprintln!("poll: generator = Ready SO IF.Ok");
v
}
};
// Step 3: hand the sink reference over to the `SinkSendable`.
// Taking it leaves `output` as `None`, which is what `is_terminated` checks.
let sendable = SinkSendable {
output: self_.output.take().expect(BAD_POLL_MSG),
tw: PhantomData,
};
Poll::Ready(Ok((value, sendable)))
}
}
impl<'w, IF, OS, IM, OM> FusedFuture for SinkPrepareSendFuture<'w, IF, OS, OM>
where
    IF: Future<Output = IM>,
    OS: Sink<OM>,
{
    /// Reports whether this future has already completed successfully
    ///
    /// `output` is `None` exactly when the sink reference has been moved out,
    /// which is what a successful `poll` does.
    fn is_terminated(&self) -> bool {
        let terminated = matches!(self.output, None);
        dprintln!("is_terminated = {}", terminated);
        terminated
    }
}
impl<'w, OS, OM> SinkSendable<'w, OS, OM>
where
    OS: Sink<OM>,
{
    /// Hand `item` to the output sink `OS` (a [`Sink`]), without awaiting
    ///
    /// Returns whatever error the sink `OS` reports from starting the send.
    ///
    /// (Holding a `SinkSendable` shows that the sink has already said
    /// it is ready to accept an item,
    /// which is why this can be a plain synchronous call
    /// and is therefore free of cancellation hazards.)
    pub fn send(self, item: OM) -> Result<(), OS::Error> {
        dprintln!("send ...");
        let Self { output, .. } = self;
        let outcome = output.start_send(item);
        dprintln!("send: {:?}", outcome.as_ref().map_err(|_| ()));
        outcome
    }
}
/// Extension trait for some `postage::watch::Sender` to provide `maybe_send`
///
/// Ideally these, or something like them, would be upstream:
/// See <https://github.com/austinjones/postage-rs/issues/56>.
///
/// We provide this as an extension trait became the implementation is a bit fiddly.
/// This lets us concentrate on the actual logic, when we use it.
pub trait PostageWatchSenderExt<T> {
/// Update, by calling a fallible function, sending only if necessary
///
/// Calls `update` on the current value in the watch, to obtain a new value.
/// If the new value doesn't compare equal, updates the watch, notifying receivers.
fn try_maybe_send<F, E>(&mut self, update: F) -> Result<(), E>
where
T: PartialEq,
F: FnOnce(&T) -> Result<T, E>;
/// Update, by calling a function, sending only if necessary
///
/// Calls `update` on the current value in the watch, to obtain a new value.
/// If the new value doesn't compare equal, updates the watch, notifying receivers.
fn maybe_send<F>(&mut self, update: F)
where
T: PartialEq,
F: FnOnce(&T) -> T,
{
self.try_maybe_send(|t| Ok::<_, Void>(update(t)))
.void_unwrap();
}
}
impl<T> PostageWatchSenderExt<T> for postage::watch::Sender<T> {
    fn try_maybe_send<F, E>(&mut self, update: F) -> Result<(), E>
    where
        T: PartialEq,
        F: FnOnce(&T) -> Result<T, E>,
    {
        // Work out the replacement (if any) inside a nested scope, so that the read
        // guard is released before we call borrow_mut; otherwise borrow_mut would
        // deadlock.
        //
        // Releasing the guard introduces no race: we hold &mut self, so nobody else
        // can touch the watch in between. (postage::watch::Sender is not one of those
        // facilities which is merely a handle, and Clone.)
        let replacement = {
            let current = self.borrow();
            let candidate = update(&*current)?;
            if candidate != *current {
                Some(candidate)
            } else {
                None
            }
        };
        if let Some(changed) = replacement {
            *self.borrow_mut() = changed;
        }
        Ok(())
    }
}
#[derive(Debug)]
/// Wrapper for `postage::watch::Sender` that sends `DropNotifyEofSignallable::eof()`
/// when dropped
///
/// Derefs to the inner `Sender`.
///
/// The inner sender is held in an `Option`; it is `None` only after
/// `into_inner` has taken the sender out.
///
/// Ideally this would be behaviour promised by upstream, or something.
/// See <https://github.com/austinjones/postage-rs/issues/57>.
pub struct DropNotifyWatchSender<T: DropNotifyEofSignallable>(Option<postage::watch::Sender<T>>);
/// Types which have a distinguished value meaning "end of file"
///
/// There is an implementation for `Option`, which is usually what you want to use.
pub trait DropNotifyEofSignallable {
    /// Produce the value that represents EOF
    fn eof() -> Self;
    /// Report whether this value is the EOF marker
    fn is_eof(&self) -> bool;
}

// For `Option`, `None` is the EOF marker and every `Some` is an ordinary value.
impl<T> DropNotifyEofSignallable for Option<T> {
    fn eof() -> Self {
        Option::None
    }
    fn is_eof(&self) -> bool {
        matches!(self, None)
    }
}
impl<T: DropNotifyEofSignallable> DropNotifyWatchSender<T> {
    /// Wrap `inner`, arranging for `T::eof()` to be sent when the wrapper is dropped
    pub fn new(inner: postage::watch::Sender<T>) -> Self {
        Self(Some(inner))
    }

    /// Extract the wrapped sender, so that no EOF is sent on drop
    pub fn into_inner(mut self) -> postage::watch::Sender<T> {
        // Leaves `None` behind, which the `Drop` impl treats as "nothing to notify".
        let sender = self.0.take();
        sender.expect("inner was None")
    }
}
impl<T: DropNotifyEofSignallable> Deref for DropNotifyWatchSender<T> {
    type Target = postage::watch::Sender<T>;

    /// Shared access to the wrapped sender; panics if it has been taken out
    fn deref(&self) -> &Self::Target {
        let Self(inner) = self;
        inner.as_ref().expect("inner was None")
    }
}
impl<T: DropNotifyEofSignallable> DerefMut for DropNotifyWatchSender<T> {
    /// Exclusive access to the wrapped sender; panics if it has been taken out
    fn deref_mut(&mut self) -> &mut Self::Target {
        let Self(inner) = self;
        inner.as_mut().expect("inner was None")
    }
}
impl<T: DropNotifyEofSignallable> Drop for DropNotifyWatchSender<T> {
    /// Store the EOF value into the watch, unless the sender was already extracted
    fn drop(&mut self) {
        match self.0.take() {
            // None means into_inner() was called
            None => {}
            Some(mut sender) => {
                *sender.borrow_mut() = T::eof();
            }
        }
    }
}
#[cfg(test)]
mod test {
// @@ begin test lint list maintained by maint/add_warning @@
#![allow(clippy::bool_assert_comparison)]
#![allow(clippy::clone_on_copy)]
#![allow(clippy::dbg_macro)]
#![allow(clippy::print_stderr)]
#![allow(clippy::print_stdout)]
#![allow(clippy::single_char_pattern)]
#![allow(clippy::unwrap_used)]
//! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
use super::*;
use futures::channel::mpsc;
use futures::future::poll_fn;
use futures::select_biased;
use futures::SinkExt as _;
use futures_await_test::async_test;
use std::convert::Infallible;
use std::sync::Arc;
use std::sync::Mutex;
#[derive(Debug, Eq, PartialEq)]
struct TestError(char);
// Exercises each code path of `SinkPrepareSendFuture::poll` and `SinkSendable::send`.
#[async_test]
async fn prepare_send() {
// Early versions of this used unfold quite a lot more, but it is not really
// convenient for testing. It buffers one item internally, and is also buggy:
// https://github.com/rust-lang/futures-rs/issues/2600
// So we use mpsc channels, which (perhaps with buffering) are quite controllable.
// The eprintln!("FOR ...") calls correspond to the dprintln!() calls in the impl,
// and can check that each code path in the implementation is used,
// by turning on the debug and using `--nocapture`.
{
// Receiver already dropped: poll_ready on the output reports the error.
eprintln!("-- disconnected ---");
eprintln!("FOR poll: output poll = IF.Err SO IF.Err");
let (mut w, r) = mpsc::unbounded::<usize>();
drop(r);
let ret = w.prepare_send_from(async { Ok::<_, Infallible>(12) }).await;
assert!(ret.map(|_| ()).unwrap_err().is_disconnected());
}
{
// The generator drops the receiver and stays pending, so the error
// shows up when the buffered item is flushed.
eprintln!("-- buffered late disconnect --");
eprintln!("FOR poll: output poll = IF.Ok calling generator");
eprintln!("FOR poll: output flush = IF.Err SO IF.Err");
let (w, r) = mpsc::unbounded::<usize>();
let mut w = w.buffer(10);
let mut r = Some(r);
w.feed(66).await.unwrap();
let ret = w
.prepare_send_from(poll_fn(move |_cx| {
drop(r.take());
Poll::Pending::<usize>
}))
.await;
assert!(ret.map(|_| ()).unwrap_err().is_disconnected());
}
{
// Generator is pending and the flush succeeds: the future must stay pending,
// so the always-ready second branch is the one that runs.
eprintln!("-- flushing before wait --");
eprintln!("FOR poll: output flush = IF.Ok SO Pending");
let (mut w, _r) = mpsc::unbounded::<usize>();
let () = select_biased! {
_ = w.prepare_send_from(poll_fn(
move |_cx| {
Poll::Pending::<usize>
}
)) => panic!(),
_ = futures::future::ready(()) => { },
};
}
{
// Generator is pending and the flush is pending too: still pending overall.
eprintln!("-- flush before wait is pending --");
eprintln!("FOR poll: output flush = Pending SO Pending");
let (mut w, _r) = mpsc::channel::<usize>(0);
let () = w.feed(77).await.unwrap();
let mut w = w.buffer(10);
let () = select_biased! {
_ = w.prepare_send_from(poll_fn(
move |_cx| {
Poll::Pending::<usize>
}
)) => panic!(),
_ = futures::future::ready(()) => { },
};
}
{
// Success path: the generator is ready, we send, and the items stay buffered
// until a later prepare_send_from with a pending generator flushes them.
eprintln!("-- flush before wait is pending --");
eprintln!("FOR poll: generator = Ready SO IF.Ok");
eprintln!("FOR send ...");
eprintln!("ALSO check that bufferinrg works as expected");
let sunk = Arc::new(Mutex::new(vec![]));
let unfold = futures::sink::unfold((), |(), v| {
let sunk = sunk.clone();
async move {
dbg!();
sunk.lock().unwrap().push(v);
Ok::<_, Infallible>(())
}
});
let mut unfold = Box::pin(unfold.buffer(10));
for v in [42, 43] {
// We can only do two here because that's how many we can actually buffer in Buffer
// and Unfold. Because our closure is always ready, the buffering isn't actually
// as copious as all that. This is fine, because the point of this test is to test
// *flushing*.
dbg!(v);
let ret = unfold
.prepare_send_from(async move { Ok::<_, Infallible>(v) })
.await;
let (msg, sendable) = ret.unwrap();
let msg = msg.unwrap();
assert_eq!(msg, v);
let () = sendable.send(msg).unwrap();
assert_eq!(*sunk.lock().unwrap(), &[]); // It's still buffered
}
// A pending generator makes prepare_send_from flush the output.
select_biased! {
_ = unfold.prepare_send_from(futures::future::pending::<()>()) => panic!(),
_ = futures::future::ready(()) => { },
};
assert_eq!(*sunk.lock().unwrap(), &[42, 43]);
}
}
// Checks that maybe_send / try_maybe_send wake the receiver only when the value changes.
#[async_test]
async fn postage_sender_ext() {
use futures::stream::StreamExt;
use futures::FutureExt;
let (mut s, mut r) = postage::watch::channel_with(20);
// Receiver of a fresh watch wakes once, but let's not rely on this
select_biased! {
i = r.next().fuse() => assert_eq!(i, Some(20)),
_ = futures::future::ready(()) => { }, // tolerate nothing
};
// Now, not ready
select_biased! {
_ = r.next().fuse() => panic!(),
_ = futures::future::ready(()) => { },
};
// Same value: no notification expected
s.maybe_send(|i| *i);
// Still not ready
select_biased! {
_ = r.next().fuse() => panic!(),
_ = futures::future::ready(()) => { },
};
// Different value: receiver should be notified
s.maybe_send(|i| *i + 1);
// Ready, with 21
select_biased! {
i = r.next().fuse() => assert_eq!(i, Some(21)),
_ = futures::future::ready(()) => panic!(),
};
// Failing update: error is returned and nothing is sent
let () = s.try_maybe_send(|_i| Err(())).unwrap_err();
// Not ready
select_biased! {
_ = r.next().fuse() => panic!(),
_ = futures::future::ready(()) => { },
};
}
// Checks that dropping a DropNotifyWatchSender stores the EOF value,
// and that into_inner() defuses that.
#[async_test]
async fn postage_drop() {
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct I(i32);
impl DropNotifyEofSignallable for I {
fn eof() -> I {
I(0)
}
fn is_eof(&self) -> bool {
self.0 == 0
}
}
let (s, r) = postage::watch::channel_with(I(20));
let s = DropNotifyWatchSender::new(s);
assert_eq!(*r.borrow(), I(20));
drop(s);
assert_eq!(*r.borrow(), I(0));
let (s, r) = postage::watch::channel_with(I(44));
let s = DropNotifyWatchSender::new(s);
assert_eq!(*r.borrow(), I(44));
drop(s.into_inner());
assert_eq!(*r.borrow(), I(44));
}
}