1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
use crate::shared::*;
use crate::{flavor::FlavorMP, AsyncTx, MAsyncTx, TrySendError};
use std::fmt;
use std::mem::MaybeUninit;
use std::ops::Deref;
use std::task::*;
/// An async sink that allows you to write custom futures with `poll_send(ctx)`.
pub struct AsyncSink<F: Flavor> {
tx: AsyncTx<F>,
waker: Option<<F::Send as Registry>::Waker>,
}
impl<F: Flavor> fmt::Debug for AsyncSink<F> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "AsyncSink")
}
}
impl<F: Flavor> fmt::Display for AsyncSink<F> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "AsyncSink")
}
}
impl<F: Flavor> AsyncSink<F> {
#[inline]
pub fn new(tx: AsyncTx<F>) -> Self {
Self { tx, waker: None }
}
}
impl<F: Flavor> Deref for AsyncSink<F> {
type Target = AsyncTx<F>;
#[inline]
fn deref(&self) -> &Self::Target {
&self.tx
}
}
impl<F: Flavor> From<AsyncTx<F>> for AsyncSink<F> {
#[inline]
fn from(tx: AsyncTx<F>) -> Self {
tx.into_sink()
}
}
impl<F: Flavor + FlavorMP> From<MAsyncTx<F>> for AsyncSink<F> {
#[inline]
fn from(tx: MAsyncTx<F>) -> Self {
tx.into_sink()
}
}
impl<F: Flavor> AsyncSink<F>
where
F::Item: Unpin,
{
/// `poll_send()` will try to send a message.
/// If the channel is full, it will register a notification for the next poll.
///
/// # Behavior
///
/// The polling behavior is different from [SendFuture](crate::SendFuture).
/// Because the waker is not exposed to the user, you cannot perform delicate operations on
/// the waker (compared to the `Drop` handler in `SendFuture`).
/// To make sure no deadlock happens on cancellation, the `WakerState` will be `Init`
/// after being registered (and will not be converted to `Waiting`).
/// The receivers will wake up all `Init` state wakers until they find a normal
/// pending sender in the `Waiting` state.
///
/// # Return value:
///
/// Returns `Ok(())` on message sent.
///
/// Returns `Err([crate::TrySendError::Full])` for a `Poll::Pending` case.
/// The next time the channel is not full, your future will be woken again.
/// You should then continue calling `poll_send()` to send the message.
/// If you want to cancel, just don't call `poll_send()` again. There are no side effects,
/// and other senders will have a chance to send their messages.
///
/// Returns `Err([crate::TrySendError::Disconnected])` when all `Rx` are dropped.
#[inline]
pub fn poll_send(
&mut self, ctx: &mut Context, item: F::Item,
) -> Result<(), TrySendError<F::Item>> {
let _item = MaybeUninit::new(item);
let shared = &self.tx.shared;
if shared.inner.try_send(&_item) {
shared.on_send();
return Ok(());
}
match self.tx.poll_send::<true>(ctx, &_item, &mut self.waker) {
Poll::Ready(Ok(())) => Ok(()),
Poll::Ready(Err(())) => Err(TrySendError::Disconnected(unsafe { _item.assume_init() })),
Poll::Pending => Err(TrySendError::Full(unsafe { _item.assume_init() })),
}
}
}
impl<F: Flavor> Drop for AsyncSink<F> {
fn drop(&mut self) {
if let Some(waker) = self.waker.as_ref() {
self.tx.shared.abandon_send_waker(waker);
}
}
}