futures_intrusive/channel/channel_future.rs
1use super::ChannelSendError;
2use crate::intrusive_double_linked_list::ListNode;
3use core::marker::PhantomData;
4use core::pin::Pin;
5use futures_core::future::{FusedFuture, Future};
6use futures_core::task::{Context, Poll, Waker};
7
/// Describes the outcome of a `close` operation on a channel.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
pub enum CloseStatus {
    /// The channel was closed by this very operation.
    NewlyClosed,

    /// The channel had already been closed before the operation ran.
    AlreadyClosed,
}

impl CloseStatus {
    /// Returns `true` if the value is the `NewlyClosed` variant.
    pub fn is_newly_closed(self) -> bool {
        // Exhaustive on purpose: adding a variant forces a decision here.
        match self {
            Self::NewlyClosed => true,
            Self::AlreadyClosed => false,
        }
    }

    /// Returns `true` if the value is the `AlreadyClosed` variant.
    pub fn is_already_closed(self) -> bool {
        match self {
            Self::AlreadyClosed => true,
            Self::NewlyClosed => false,
        }
    }
}
36
/// Tracks how a receive future has interacted with the channel.
#[derive(PartialEq, Debug)]
pub enum RecvPollState {
    /// The task is not registered in the wait queue of the channel.
    Unregistered,
    /// The task was added to the wait queue of the channel.
    Registered,
    /// The task was notified that a value is available or can be sent,
    /// but has not interacted with the channel since then.
    Notified,
}

/// Tracks the waiting state of a channel receive future.
///
/// Access to this struct is synchronized through the channel.
#[derive(Debug)]
pub struct RecvWaitQueueEntry {
    /// The task handle of the waiting task.
    pub task: Option<Waker>,
    /// Current polling state.
    pub state: RecvPollState,
}

impl RecvWaitQueueEntry {
    /// Creates a new `RecvWaitQueueEntry` that holds no task handle and is
    /// in the [`RecvPollState::Unregistered`] state.
    pub fn new() -> RecvWaitQueueEntry {
        RecvWaitQueueEntry {
            task: None,
            state: RecvPollState::Unregistered,
        }
    }
}

impl Default for RecvWaitQueueEntry {
    /// Equivalent to [`RecvWaitQueueEntry::new`].
    fn default() -> Self {
        Self::new()
    }
}
68
/// Tracks how a send future has interacted with the channel.
#[derive(PartialEq, Debug)]
pub enum SendPollState {
    /// The task is not registered in the wait queue of the channel.
    Unregistered,
    /// The task was added to the wait queue of the channel.
    Registered,
    /// The value has been transmitted to the other task.
    SendComplete,
}

/// Tracks the waiting state of a channel send future.
///
/// Access to this struct is synchronized through the channel.
pub struct SendWaitQueueEntry<T> {
    /// The task handle of the waiting task.
    pub task: Option<Waker>,
    /// Current polling state.
    pub state: SendPollState,
    /// The value to send.
    pub value: Option<T>,
}

impl<T> core::fmt::Debug for SendWaitQueueEntry<T> {
    /// Formats the `task` and `state` fields. `value` is not printed, which
    /// keeps this impl free of a `T: Debug` bound.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let mut builder = f.debug_struct("SendWaitQueueEntry");
        builder.field("task", &self.task);
        builder.field("state", &self.state);
        builder.finish()
    }
}

impl<T> SendWaitQueueEntry<T> {
    /// Creates a new `SendWaitQueueEntry` that carries `value`, holds no
    /// task handle and is in the [`SendPollState::Unregistered`] state.
    pub fn new(value: T) -> SendWaitQueueEntry<T> {
        Self {
            value: Some(value),
            state: SendPollState::Unregistered,
            task: None,
        }
    }
}
113
114/// Adapter trait that allows Futures to generically interact with Channel
115/// implementations via dynamic dispatch.
116pub trait ChannelSendAccess<T> {
117 unsafe fn send_or_register(
118 &self,
119 wait_node: &mut ListNode<SendWaitQueueEntry<T>>,
120 cx: &mut Context<'_>,
121 ) -> (Poll<()>, Option<T>);
122
123 fn remove_send_waiter(
124 &self,
125 wait_node: &mut ListNode<SendWaitQueueEntry<T>>,
126 );
127}
128
129/// Adapter trait that allows Futures to generically interact with Channel
130/// implementations via dynamic dispatch.
131pub trait ChannelReceiveAccess<T> {
132 unsafe fn receive_or_register(
133 &self,
134 wait_node: &mut ListNode<RecvWaitQueueEntry>,
135 cx: &mut Context<'_>,
136 ) -> Poll<Option<T>>;
137
138 fn remove_receive_waiter(
139 &self,
140 wait_node: &mut ListNode<RecvWaitQueueEntry>,
141 );
142}
143
144/// A Future that is returned by the `receive` function on a channel.
145/// The future gets resolved with `Some(value)` when a value could be
146/// received from the channel.
147/// If the channels gets closed and no items are still enqueued inside the
148/// channel, the future will resolve to `None`.
149#[must_use = "futures do nothing unless polled"]
150pub struct ChannelReceiveFuture<'a, MutexType, T> {
151 /// The channel that is associated with this ChannelReceiveFuture
152 pub(crate) channel: Option<&'a dyn ChannelReceiveAccess<T>>,
153 /// Node for waiting on the channel
154 pub(crate) wait_node: ListNode<RecvWaitQueueEntry>,
155 /// Marker for mutex type
156 pub(crate) _phantom: PhantomData<MutexType>,
157}
158
159// Safety: Channel futures can be sent between threads as long as the underlying
160// channel is thread-safe (Sync), which allows to poll/register/unregister from
161// a different thread.
162unsafe impl<'a, MutexType: Sync, T: Send> Send
163 for ChannelReceiveFuture<'a, MutexType, T>
164{
165}
166
167impl<'a, MutexType, T> core::fmt::Debug
168 for ChannelReceiveFuture<'a, MutexType, T>
169{
170 fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
171 f.debug_struct("ChannelReceiveFuture").finish()
172 }
173}
174
175impl<'a, MutexType, T> Future for ChannelReceiveFuture<'a, MutexType, T> {
176 type Output = Option<T>;
177
178 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
179 // It might be possible to use Pin::map_unchecked here instead of the two unsafe APIs.
180 // However this didn't seem to work for some borrow checker reasons
181
182 // Safety: The next operations are safe, because Pin promises us that
183 // the address of the wait queue entry inside ChannelReceiveFuture is stable,
184 // and we don't move any fields inside the future until it gets dropped.
185 let mut_self: &mut ChannelReceiveFuture<MutexType, T> =
186 unsafe { Pin::get_unchecked_mut(self) };
187
188 let channel = mut_self
189 .channel
190 .expect("polled ChannelReceiveFuture after completion");
191
192 let poll_res =
193 unsafe { channel.receive_or_register(&mut mut_self.wait_node, cx) };
194
195 if poll_res.is_ready() {
196 // A value was available
197 mut_self.channel = None;
198 }
199
200 poll_res
201 }
202}
203
204impl<'a, MutexType, T> FusedFuture for ChannelReceiveFuture<'a, MutexType, T> {
205 fn is_terminated(&self) -> bool {
206 self.channel.is_none()
207 }
208}
209
210impl<'a, MutexType, T> Drop for ChannelReceiveFuture<'a, MutexType, T> {
211 fn drop(&mut self) {
212 // If this ChannelReceiveFuture has been polled and it was added to the
213 // wait queue at the channel, it must be removed before dropping.
214 // Otherwise the channel would access invalid memory.
215 if let Some(channel) = self.channel {
216 channel.remove_receive_waiter(&mut self.wait_node);
217 }
218 }
219}
220
221/// A Future that is returned by the `send` function on a channel.
222/// The future gets resolved with `None` when a value could be
223/// written to the channel.
224/// If the channel gets closed the send operation will fail, and the
225/// Future will resolve to `ChannelSendError(T)` and return the item to send.
226#[must_use = "futures do nothing unless polled"]
227pub struct ChannelSendFuture<'a, MutexType, T> {
228 /// The Channel that is associated with this ChannelSendFuture
229 pub(crate) channel: Option<&'a dyn ChannelSendAccess<T>>,
230 /// Node for waiting on the channel
231 pub(crate) wait_node: ListNode<SendWaitQueueEntry<T>>,
232 /// Marker for mutex type
233 pub(crate) _phantom: PhantomData<MutexType>,
234}
235
236// Safety: Channel futures can be sent between threads as long as the underlying
237// channel is thread-safe (Sync), which allows to poll/register/unregister from
238// a different thread.
239unsafe impl<'a, MutexType: Sync, T: Send> Send
240 for ChannelSendFuture<'a, MutexType, T>
241{
242}
243
244impl<'a, MutexType, T> core::fmt::Debug
245 for ChannelSendFuture<'a, MutexType, T>
246{
247 fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
248 f.debug_struct("ChannelSendFuture").finish()
249 }
250}
251
252impl<'a, MutexType, T> ChannelSendFuture<'a, MutexType, T> {
253 /// Tries to cancel the ongoing send operation
254 pub fn cancel(&mut self) -> Option<T> {
255 let channel = self.channel.take();
256 match channel {
257 None => None,
258 Some(channel) => {
259 channel.remove_send_waiter(&mut self.wait_node);
260 self.wait_node.value.take()
261 }
262 }
263 }
264}
265
266impl<'a, MutexType, T> Future for ChannelSendFuture<'a, MutexType, T> {
267 type Output = Result<(), ChannelSendError<T>>;
268
269 fn poll(
270 self: Pin<&mut Self>,
271 cx: &mut Context<'_>,
272 ) -> Poll<Result<(), ChannelSendError<T>>> {
273 // It might be possible to use Pin::map_unchecked here instead of the two unsafe APIs.
274 // However this didn't seem to work for some borrow checker reasons
275
276 // Safety: The next operations are safe, because Pin promises us that
277 // the address of the wait queue entry inside ChannelSendFuture is stable,
278 // and we don't move any fields inside the future until it gets dropped.
279 let mut_self: &mut ChannelSendFuture<MutexType, T> =
280 unsafe { Pin::get_unchecked_mut(self) };
281
282 let channel = mut_self
283 .channel
284 .expect("polled ChannelSendFuture after completion");
285
286 let send_res =
287 unsafe { channel.send_or_register(&mut mut_self.wait_node, cx) };
288
289 match send_res.0 {
290 Poll::Ready(()) => {
291 // Value has been transmitted or channel was closed
292 mut_self.channel = None;
293 match send_res.1 {
294 Some(v) => {
295 // Channel must have been closed
296 Poll::Ready(Err(ChannelSendError(v)))
297 }
298 None => Poll::Ready(Ok(())),
299 }
300 }
301 Poll::Pending => Poll::Pending,
302 }
303 }
304}
305
306impl<'a, MutexType, T> FusedFuture for ChannelSendFuture<'a, MutexType, T> {
307 fn is_terminated(&self) -> bool {
308 self.channel.is_none()
309 }
310}
311
312impl<'a, MutexType, T> Drop for ChannelSendFuture<'a, MutexType, T> {
313 fn drop(&mut self) {
314 // If this ChannelSendFuture has been polled and it was added to the
315 // wait queue at the channel, it must be removed before dropping.
316 // Otherwise the channel would access invalid memory.
317 if let Some(channel) = self.channel {
318 channel.remove_send_waiter(&mut self.wait_node);
319 }
320 }
321}
322
323#[cfg(feature = "alloc")]
324mod if_alloc {
325 use super::*;
326
327 pub mod shared {
328 use super::*;
329
330 /// A Future that is returned by the `receive` function on a channel.
331 /// The future gets resolved with `Some(value)` when a value could be
332 /// received from the channel.
333 /// If the channels gets closed and no items are still enqueued inside the
334 /// channel, the future will resolve to `None`.
335 #[must_use = "futures do nothing unless polled"]
336 pub struct ChannelReceiveFuture<MutexType, T> {
337 /// The Channel that is associated with this ChannelReceiveFuture
338 pub(crate) channel:
339 Option<alloc::sync::Arc<dyn ChannelReceiveAccess<T>>>,
340 /// Node for waiting on the channel
341 pub(crate) wait_node: ListNode<RecvWaitQueueEntry>,
342 /// Marker for mutex type
343 pub(crate) _phantom: PhantomData<MutexType>,
344 }
345
346 // Safety: Channel futures can be sent between threads as long as the underlying
347 // channel is thread-safe (Sync), which allows to poll/register/unregister from
348 // a different thread.
349 unsafe impl<MutexType: Sync, T: Send> Send
350 for ChannelReceiveFuture<MutexType, T>
351 {
352 }
353
354 impl<MutexType, T> core::fmt::Debug for ChannelReceiveFuture<MutexType, T> {
355 fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
356 f.debug_struct("ChannelReceiveFuture").finish()
357 }
358 }
359
360 impl<MutexType, T> Future for ChannelReceiveFuture<MutexType, T> {
361 type Output = Option<T>;
362
363 fn poll(
364 self: Pin<&mut Self>,
365 cx: &mut Context<'_>,
366 ) -> Poll<Option<T>> {
367 // It might be possible to use Pin::map_unchecked here instead of the two unsafe APIs.
368 // However this didn't seem to work for some borrow checker reasons
369
370 // Safety: The next operations are safe, because Pin promises us that
371 // the address of the wait queue entry inside ChannelReceiveFuture is stable,
372 // and we don't move any fields inside the future until it gets dropped.
373 let mut_self: &mut ChannelReceiveFuture<MutexType, T> =
374 unsafe { Pin::get_unchecked_mut(self) };
375
376 let channel = mut_self
377 .channel
378 .take()
379 .expect("polled ChannelReceiveFuture after completion");
380
381 let poll_res = unsafe {
382 channel.receive_or_register(&mut mut_self.wait_node, cx)
383 };
384
385 if poll_res.is_ready() {
386 // A value was available
387 mut_self.channel = None;
388 } else {
389 mut_self.channel = Some(channel)
390 }
391
392 poll_res
393 }
394 }
395
396 impl<MutexType, T> FusedFuture for ChannelReceiveFuture<MutexType, T> {
397 fn is_terminated(&self) -> bool {
398 self.channel.is_none()
399 }
400 }
401
402 impl<MutexType, T> Drop for ChannelReceiveFuture<MutexType, T> {
403 fn drop(&mut self) {
404 // If this ChannelReceiveFuture has been polled and it was added to the
405 // wait queue at the channel, it must be removed before dropping.
406 // Otherwise the channel would access invalid memory.
407 if let Some(channel) = &self.channel {
408 channel.remove_receive_waiter(&mut self.wait_node);
409 }
410 }
411 }
412
413 /// A Future that is returned by the `send` function on a channel.
414 /// The future gets resolved with `None` when a value could be
415 /// written to the channel.
416 /// If the channel gets closed the send operation will fail, and the
417 /// Future will resolve to `ChannelSendError(T)` and return the item
418 /// to send.
419 #[must_use = "futures do nothing unless polled"]
420 pub struct ChannelSendFuture<MutexType, T> {
421 /// The LocalChannel that is associated with this ChannelSendFuture
422 pub(crate) channel:
423 Option<alloc::sync::Arc<dyn ChannelSendAccess<T>>>,
424 /// Node for waiting on the channel
425 pub(crate) wait_node: ListNode<SendWaitQueueEntry<T>>,
426 /// Marker for mutex type
427 pub(crate) _phantom: PhantomData<MutexType>,
428 }
429
430 impl<MutexType, T> ChannelSendFuture<MutexType, T> {
431 /// Tries to cancel the ongoing send operation
432 pub fn cancel(&mut self) -> Option<T> {
433 let channel = self.channel.take();
434 match channel {
435 None => None,
436 Some(channel) => {
437 channel.remove_send_waiter(&mut self.wait_node);
438 self.wait_node.value.take()
439 }
440 }
441 }
442 }
443
444 // Safety: Channel futures can be sent between threads as long as the underlying
445 // channel is thread-safe (Sync), which allows to poll/register/unregister from
446 // a different thread.
447 unsafe impl<MutexType: Sync, T: Send> Send for ChannelSendFuture<MutexType, T> {}
448
449 impl<MutexType, T> core::fmt::Debug for ChannelSendFuture<MutexType, T> {
450 fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
451 f.debug_struct("ChannelSendFuture").finish()
452 }
453 }
454
455 impl<MutexType, T> Future for ChannelSendFuture<MutexType, T> {
456 type Output = Result<(), ChannelSendError<T>>;
457
458 fn poll(
459 self: Pin<&mut Self>,
460 cx: &mut Context<'_>,
461 ) -> Poll<Result<(), ChannelSendError<T>>> {
462 // It might be possible to use Pin::map_unchecked here instead of the two unsafe APIs.
463 // However this didn't seem to work for some borrow checker reasons
464
465 // Safety: The next operations are safe, because Pin promises us that
466 // the address of the wait queue entry inside ChannelSendFuture is stable,
467 // and we don't move any fields inside the future until it gets dropped.
468 let mut_self: &mut ChannelSendFuture<MutexType, T> =
469 unsafe { Pin::get_unchecked_mut(self) };
470
471 let channel = mut_self
472 .channel
473 .take()
474 .expect("polled ChannelSendFuture after completion");
475
476 let send_res = unsafe {
477 channel.send_or_register(&mut mut_self.wait_node, cx)
478 };
479
480 match send_res.0 {
481 Poll::Ready(()) => {
482 // Value has been transmitted or channel was closed
483 match send_res.1 {
484 Some(v) => {
485 // Channel must have been closed
486 Poll::Ready(Err(ChannelSendError(v)))
487 }
488 None => Poll::Ready(Ok(())),
489 }
490 }
491 Poll::Pending => {
492 mut_self.channel = Some(channel);
493 Poll::Pending
494 }
495 }
496 }
497 }
498
499 impl<MutexType, T> FusedFuture for ChannelSendFuture<MutexType, T> {
500 fn is_terminated(&self) -> bool {
501 self.channel.is_none()
502 }
503 }
504
505 impl<MutexType, T> Drop for ChannelSendFuture<MutexType, T> {
506 fn drop(&mut self) {
507 // If this ChannelSendFuture has been polled and it was added to the
508 // wait queue at the channel, it must be removed before dropping.
509 // Otherwise the channel would access invalid memory.
510 if let Some(channel) = &self.channel {
511 channel.remove_send_waiter(&mut self.wait_node);
512 }
513 }
514 }
515 }
516}
517
518#[cfg(feature = "alloc")]
519pub use self::if_alloc::*;