futures_intrusive/channel/
channel_future.rs

1use super::ChannelSendError;
2use crate::intrusive_double_linked_list::ListNode;
3use core::marker::PhantomData;
4use core::pin::Pin;
5use futures_core::future::{FusedFuture, Future};
6use futures_core::task::{Context, Poll, Waker};
7
/// Conveys additional information regarding the status of a channel
/// following a `close` operation.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
pub enum CloseStatus {
    /// The channel has just been closed by the operation.
    NewlyClosed,

    /// The channel was already closed prior to the operation.
    AlreadyClosed,
}

impl CloseStatus {
    /// Returns `true` if the value is the `NewlyClosed` variant and `false`
    /// otherwise.
    pub fn is_newly_closed(self) -> bool {
        // Every variant is spelled out (no `_` arm) so that adding a variant
        // to `CloseStatus` forces this predicate to be revisited.
        match self {
            Self::NewlyClosed => true,
            Self::AlreadyClosed => false,
        }
    }

    /// Returns `true` if the value is the `AlreadyClosed` variant and `false`
    /// otherwise.
    pub fn is_already_closed(self) -> bool {
        // Every variant is spelled out (no `_` arm) so that adding a variant
        // to `CloseStatus` forces this predicate to be revisited.
        match self {
            Self::AlreadyClosed => true,
            Self::NewlyClosed => false,
        }
    }
}
36
/// Tracks how a receive future has interacted with the channel.
#[derive(PartialEq, Debug)]
pub enum RecvPollState {
    /// The task is not registered at the wait queue at the channel
    Unregistered,
    /// The task was added to the wait queue at the channel.
    Registered,
    /// The task was notified that a value is available or can be sent,
    /// but hasn't interacted with the channel since then
    Notified,
}

/// Tracks the waiting state of a channel receive future.
/// Access to this struct is synchronized through the channel.
#[derive(Debug)]
pub struct RecvWaitQueueEntry {
    /// The task handle of the waiting task
    pub task: Option<Waker>,
    /// Current polling state
    pub state: RecvPollState,
}

impl RecvWaitQueueEntry {
    /// Creates a new `RecvWaitQueueEntry` that has no task handle stored and
    /// is in the `RecvPollState::Unregistered` state.
    pub fn new() -> RecvWaitQueueEntry {
        RecvWaitQueueEntry {
            task: None,
            state: RecvPollState::Unregistered,
        }
    }
}

impl Default for RecvWaitQueueEntry {
    /// Equivalent to [`RecvWaitQueueEntry::new`].
    fn default() -> Self {
        Self::new()
    }
}
68
/// Describes how far a send future has progressed with the channel.
#[derive(PartialEq, Debug)]
pub enum SendPollState {
    /// The task is not part of the channel's wait queue.
    Unregistered,
    /// The task has been added to the channel's wait queue.
    Registered,
    /// The value has been handed over to the other task.
    SendComplete,
}

/// Waiting state of a channel send future.
/// Access to this struct is synchronized through the channel.
pub struct SendWaitQueueEntry<T> {
    /// Handle of the task that is waiting
    pub task: Option<Waker>,
    /// Polling state the entry is currently in
    pub state: SendPollState,
    /// Value that is to be sent
    pub value: Option<T>,
}

impl<T> core::fmt::Debug for SendWaitQueueEntry<T> {
    fn fmt(&self, fmt: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        // `value` is not printed: this impl places no `Debug` bound on `T`.
        let mut builder = fmt.debug_struct("SendWaitQueueEntry");
        builder.field("task", &self.task);
        builder.field("state", &self.state);
        builder.finish()
    }
}

impl<T> SendWaitQueueEntry<T> {
    /// Creates an entry that holds `value`, has no task handle stored and is
    /// in the `SendPollState::Unregistered` state.
    pub fn new(value: T) -> SendWaitQueueEntry<T> {
        let value = Some(value);
        SendWaitQueueEntry {
            value,
            state: SendPollState::Unregistered,
            task: None,
        }
    }
}
113
/// Adapter trait that allows Futures to generically interact with Channel
/// implementations via dynamic dispatch.
pub trait ChannelSendAccess<T> {
    /// Polls a send operation on behalf of a send future.
    ///
    /// The send futures in this file interpret the returned tuple as
    /// follows: `Poll::Pending` means the operation has not finished yet;
    /// `Poll::Ready(())` together with `None` means the value has been
    /// transmitted; `Poll::Ready(())` together with `Some(value)` means the
    /// channel was closed and the value is handed back to the sender.
    ///
    /// # Safety
    ///
    /// NOTE(review): the precise contract is defined by the channel
    /// implementations, which are not visible here. The callers in this file
    /// rely on `wait_node` having a stable (pinned) address, and they remove
    /// it through `remove_send_waiter` before the node is dropped. Confirm
    /// against the implementations.
    unsafe fn send_or_register(
        &self,
        wait_node: &mut ListNode<SendWaitQueueEntry<T>>,
        cx: &mut Context<'_>,
    ) -> (Poll<()>, Option<T>);

    /// Detaches `wait_node` from the channel.
    ///
    /// The send futures in this file call this from `cancel` and from their
    /// `Drop` implementation while they still hold a channel reference.
    fn remove_send_waiter(
        &self,
        wait_node: &mut ListNode<SendWaitQueueEntry<T>>,
    );
}
128
/// Adapter trait that allows Futures to generically interact with Channel
/// implementations via dynamic dispatch.
pub trait ChannelReceiveAccess<T> {
    /// Polls a receive operation on behalf of a receive future.
    ///
    /// The receive futures in this file return the result unchanged from
    /// their own `poll` and detach from the channel as soon as the result is
    /// `Poll::Ready`.
    ///
    /// # Safety
    ///
    /// NOTE(review): the precise contract is defined by the channel
    /// implementations, which are not visible here. The callers in this file
    /// rely on `wait_node` having a stable (pinned) address, and they remove
    /// it through `remove_receive_waiter` before the node is dropped. Confirm
    /// against the implementations.
    unsafe fn receive_or_register(
        &self,
        wait_node: &mut ListNode<RecvWaitQueueEntry>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<T>>;

    /// Detaches `wait_node` from the channel.
    ///
    /// The receive futures in this file call this from their `Drop`
    /// implementation while they still hold a channel reference.
    fn remove_receive_waiter(
        &self,
        wait_node: &mut ListNode<RecvWaitQueueEntry>,
    );
}
143
144/// A Future that is returned by the `receive` function on a channel.
145/// The future gets resolved with `Some(value)` when a value could be
146/// received from the channel.
147/// If the channels gets closed and no items are still enqueued inside the
148/// channel, the future will resolve to `None`.
149#[must_use = "futures do nothing unless polled"]
150pub struct ChannelReceiveFuture<'a, MutexType, T> {
151    /// The channel that is associated with this ChannelReceiveFuture
152    pub(crate) channel: Option<&'a dyn ChannelReceiveAccess<T>>,
153    /// Node for waiting on the channel
154    pub(crate) wait_node: ListNode<RecvWaitQueueEntry>,
155    /// Marker for mutex type
156    pub(crate) _phantom: PhantomData<MutexType>,
157}
158
159// Safety: Channel futures can be sent between threads as long as the underlying
160// channel is thread-safe (Sync), which allows to poll/register/unregister from
161// a different thread.
162unsafe impl<'a, MutexType: Sync, T: Send> Send
163    for ChannelReceiveFuture<'a, MutexType, T>
164{
165}
166
167impl<'a, MutexType, T> core::fmt::Debug
168    for ChannelReceiveFuture<'a, MutexType, T>
169{
170    fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
171        f.debug_struct("ChannelReceiveFuture").finish()
172    }
173}
174
175impl<'a, MutexType, T> Future for ChannelReceiveFuture<'a, MutexType, T> {
176    type Output = Option<T>;
177
178    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
179        // It might be possible to use Pin::map_unchecked here instead of the two unsafe APIs.
180        // However this didn't seem to work for some borrow checker reasons
181
182        // Safety: The next operations are safe, because Pin promises us that
183        // the address of the wait queue entry inside ChannelReceiveFuture is stable,
184        // and we don't move any fields inside the future until it gets dropped.
185        let mut_self: &mut ChannelReceiveFuture<MutexType, T> =
186            unsafe { Pin::get_unchecked_mut(self) };
187
188        let channel = mut_self
189            .channel
190            .expect("polled ChannelReceiveFuture after completion");
191
192        let poll_res =
193            unsafe { channel.receive_or_register(&mut mut_self.wait_node, cx) };
194
195        if poll_res.is_ready() {
196            // A value was available
197            mut_self.channel = None;
198        }
199
200        poll_res
201    }
202}
203
204impl<'a, MutexType, T> FusedFuture for ChannelReceiveFuture<'a, MutexType, T> {
205    fn is_terminated(&self) -> bool {
206        self.channel.is_none()
207    }
208}
209
210impl<'a, MutexType, T> Drop for ChannelReceiveFuture<'a, MutexType, T> {
211    fn drop(&mut self) {
212        // If this ChannelReceiveFuture has been polled and it was added to the
213        // wait queue at the channel, it must be removed before dropping.
214        // Otherwise the channel would access invalid memory.
215        if let Some(channel) = self.channel {
216            channel.remove_receive_waiter(&mut self.wait_node);
217        }
218    }
219}
220
221/// A Future that is returned by the `send` function on a channel.
222/// The future gets resolved with `None` when a value could be
223/// written to the channel.
224/// If the channel gets closed the send operation will fail, and the
225/// Future will resolve to `ChannelSendError(T)` and return the item to send.
226#[must_use = "futures do nothing unless polled"]
227pub struct ChannelSendFuture<'a, MutexType, T> {
228    /// The Channel that is associated with this ChannelSendFuture
229    pub(crate) channel: Option<&'a dyn ChannelSendAccess<T>>,
230    /// Node for waiting on the channel
231    pub(crate) wait_node: ListNode<SendWaitQueueEntry<T>>,
232    /// Marker for mutex type
233    pub(crate) _phantom: PhantomData<MutexType>,
234}
235
236// Safety: Channel futures can be sent between threads as long as the underlying
237// channel is thread-safe (Sync), which allows to poll/register/unregister from
238// a different thread.
239unsafe impl<'a, MutexType: Sync, T: Send> Send
240    for ChannelSendFuture<'a, MutexType, T>
241{
242}
243
244impl<'a, MutexType, T> core::fmt::Debug
245    for ChannelSendFuture<'a, MutexType, T>
246{
247    fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
248        f.debug_struct("ChannelSendFuture").finish()
249    }
250}
251
252impl<'a, MutexType, T> ChannelSendFuture<'a, MutexType, T> {
253    /// Tries to cancel the ongoing send operation
254    pub fn cancel(&mut self) -> Option<T> {
255        let channel = self.channel.take();
256        match channel {
257            None => None,
258            Some(channel) => {
259                channel.remove_send_waiter(&mut self.wait_node);
260                self.wait_node.value.take()
261            }
262        }
263    }
264}
265
266impl<'a, MutexType, T> Future for ChannelSendFuture<'a, MutexType, T> {
267    type Output = Result<(), ChannelSendError<T>>;
268
269    fn poll(
270        self: Pin<&mut Self>,
271        cx: &mut Context<'_>,
272    ) -> Poll<Result<(), ChannelSendError<T>>> {
273        // It might be possible to use Pin::map_unchecked here instead of the two unsafe APIs.
274        // However this didn't seem to work for some borrow checker reasons
275
276        // Safety: The next operations are safe, because Pin promises us that
277        // the address of the wait queue entry inside ChannelSendFuture is stable,
278        // and we don't move any fields inside the future until it gets dropped.
279        let mut_self: &mut ChannelSendFuture<MutexType, T> =
280            unsafe { Pin::get_unchecked_mut(self) };
281
282        let channel = mut_self
283            .channel
284            .expect("polled ChannelSendFuture after completion");
285
286        let send_res =
287            unsafe { channel.send_or_register(&mut mut_self.wait_node, cx) };
288
289        match send_res.0 {
290            Poll::Ready(()) => {
291                // Value has been transmitted or channel was closed
292                mut_self.channel = None;
293                match send_res.1 {
294                    Some(v) => {
295                        // Channel must have been closed
296                        Poll::Ready(Err(ChannelSendError(v)))
297                    }
298                    None => Poll::Ready(Ok(())),
299                }
300            }
301            Poll::Pending => Poll::Pending,
302        }
303    }
304}
305
306impl<'a, MutexType, T> FusedFuture for ChannelSendFuture<'a, MutexType, T> {
307    fn is_terminated(&self) -> bool {
308        self.channel.is_none()
309    }
310}
311
312impl<'a, MutexType, T> Drop for ChannelSendFuture<'a, MutexType, T> {
313    fn drop(&mut self) {
314        // If this ChannelSendFuture has been polled and it was added to the
315        // wait queue at the channel, it must be removed before dropping.
316        // Otherwise the channel would access invalid memory.
317        if let Some(channel) = self.channel {
318            channel.remove_send_waiter(&mut self.wait_node);
319        }
320    }
321}
322
323#[cfg(feature = "alloc")]
324mod if_alloc {
325    use super::*;
326
327    pub mod shared {
328        use super::*;
329
330        /// A Future that is returned by the `receive` function on a channel.
331        /// The future gets resolved with `Some(value)` when a value could be
332        /// received from the channel.
333        /// If the channels gets closed and no items are still enqueued inside the
334        /// channel, the future will resolve to `None`.
335        #[must_use = "futures do nothing unless polled"]
336        pub struct ChannelReceiveFuture<MutexType, T> {
337            /// The Channel that is associated with this ChannelReceiveFuture
338            pub(crate) channel:
339                Option<alloc::sync::Arc<dyn ChannelReceiveAccess<T>>>,
340            /// Node for waiting on the channel
341            pub(crate) wait_node: ListNode<RecvWaitQueueEntry>,
342            /// Marker for mutex type
343            pub(crate) _phantom: PhantomData<MutexType>,
344        }
345
346        // Safety: Channel futures can be sent between threads as long as the underlying
347        // channel is thread-safe (Sync), which allows to poll/register/unregister from
348        // a different thread.
349        unsafe impl<MutexType: Sync, T: Send> Send
350            for ChannelReceiveFuture<MutexType, T>
351        {
352        }
353
354        impl<MutexType, T> core::fmt::Debug for ChannelReceiveFuture<MutexType, T> {
355            fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
356                f.debug_struct("ChannelReceiveFuture").finish()
357            }
358        }
359
360        impl<MutexType, T> Future for ChannelReceiveFuture<MutexType, T> {
361            type Output = Option<T>;
362
363            fn poll(
364                self: Pin<&mut Self>,
365                cx: &mut Context<'_>,
366            ) -> Poll<Option<T>> {
367                // It might be possible to use Pin::map_unchecked here instead of the two unsafe APIs.
368                // However this didn't seem to work for some borrow checker reasons
369
370                // Safety: The next operations are safe, because Pin promises us that
371                // the address of the wait queue entry inside ChannelReceiveFuture is stable,
372                // and we don't move any fields inside the future until it gets dropped.
373                let mut_self: &mut ChannelReceiveFuture<MutexType, T> =
374                    unsafe { Pin::get_unchecked_mut(self) };
375
376                let channel = mut_self
377                    .channel
378                    .take()
379                    .expect("polled ChannelReceiveFuture after completion");
380
381                let poll_res = unsafe {
382                    channel.receive_or_register(&mut mut_self.wait_node, cx)
383                };
384
385                if poll_res.is_ready() {
386                    // A value was available
387                    mut_self.channel = None;
388                } else {
389                    mut_self.channel = Some(channel)
390                }
391
392                poll_res
393            }
394        }
395
396        impl<MutexType, T> FusedFuture for ChannelReceiveFuture<MutexType, T> {
397            fn is_terminated(&self) -> bool {
398                self.channel.is_none()
399            }
400        }
401
402        impl<MutexType, T> Drop for ChannelReceiveFuture<MutexType, T> {
403            fn drop(&mut self) {
404                // If this ChannelReceiveFuture has been polled and it was added to the
405                // wait queue at the channel, it must be removed before dropping.
406                // Otherwise the channel would access invalid memory.
407                if let Some(channel) = &self.channel {
408                    channel.remove_receive_waiter(&mut self.wait_node);
409                }
410            }
411        }
412
413        /// A Future that is returned by the `send` function on a channel.
414        /// The future gets resolved with `None` when a value could be
415        /// written to the channel.
416        /// If the channel gets closed the send operation will fail, and the
417        /// Future will resolve to `ChannelSendError(T)` and return the item
418        /// to send.
419        #[must_use = "futures do nothing unless polled"]
420        pub struct ChannelSendFuture<MutexType, T> {
421            /// The LocalChannel that is associated with this ChannelSendFuture
422            pub(crate) channel:
423                Option<alloc::sync::Arc<dyn ChannelSendAccess<T>>>,
424            /// Node for waiting on the channel
425            pub(crate) wait_node: ListNode<SendWaitQueueEntry<T>>,
426            /// Marker for mutex type
427            pub(crate) _phantom: PhantomData<MutexType>,
428        }
429
430        impl<MutexType, T> ChannelSendFuture<MutexType, T> {
431            /// Tries to cancel the ongoing send operation
432            pub fn cancel(&mut self) -> Option<T> {
433                let channel = self.channel.take();
434                match channel {
435                    None => None,
436                    Some(channel) => {
437                        channel.remove_send_waiter(&mut self.wait_node);
438                        self.wait_node.value.take()
439                    }
440                }
441            }
442        }
443
444        // Safety: Channel futures can be sent between threads as long as the underlying
445        // channel is thread-safe (Sync), which allows to poll/register/unregister from
446        // a different thread.
447        unsafe impl<MutexType: Sync, T: Send> Send for ChannelSendFuture<MutexType, T> {}
448
449        impl<MutexType, T> core::fmt::Debug for ChannelSendFuture<MutexType, T> {
450            fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
451                f.debug_struct("ChannelSendFuture").finish()
452            }
453        }
454
455        impl<MutexType, T> Future for ChannelSendFuture<MutexType, T> {
456            type Output = Result<(), ChannelSendError<T>>;
457
458            fn poll(
459                self: Pin<&mut Self>,
460                cx: &mut Context<'_>,
461            ) -> Poll<Result<(), ChannelSendError<T>>> {
462                // It might be possible to use Pin::map_unchecked here instead of the two unsafe APIs.
463                // However this didn't seem to work for some borrow checker reasons
464
465                // Safety: The next operations are safe, because Pin promises us that
466                // the address of the wait queue entry inside ChannelSendFuture is stable,
467                // and we don't move any fields inside the future until it gets dropped.
468                let mut_self: &mut ChannelSendFuture<MutexType, T> =
469                    unsafe { Pin::get_unchecked_mut(self) };
470
471                let channel = mut_self
472                    .channel
473                    .take()
474                    .expect("polled ChannelSendFuture after completion");
475
476                let send_res = unsafe {
477                    channel.send_or_register(&mut mut_self.wait_node, cx)
478                };
479
480                match send_res.0 {
481                    Poll::Ready(()) => {
482                        // Value has been transmitted or channel was closed
483                        match send_res.1 {
484                            Some(v) => {
485                                // Channel must have been closed
486                                Poll::Ready(Err(ChannelSendError(v)))
487                            }
488                            None => Poll::Ready(Ok(())),
489                        }
490                    }
491                    Poll::Pending => {
492                        mut_self.channel = Some(channel);
493                        Poll::Pending
494                    }
495                }
496            }
497        }
498
499        impl<MutexType, T> FusedFuture for ChannelSendFuture<MutexType, T> {
500            fn is_terminated(&self) -> bool {
501                self.channel.is_none()
502            }
503        }
504
505        impl<MutexType, T> Drop for ChannelSendFuture<MutexType, T> {
506            fn drop(&mut self) {
507                // If this ChannelSendFuture has been polled and it was added to the
508                // wait queue at the channel, it must be removed before dropping.
509                // Otherwise the channel would access invalid memory.
510                if let Some(channel) = &self.channel {
511                    channel.remove_send_waiter(&mut self.wait_node);
512                }
513            }
514        }
515    }
516}
517
518#[cfg(feature = "alloc")]
519pub use self::if_alloc::*;