send_wrapper/
futures.rs

1//! [`Future`] and [`Stream`] support for [`SendWrapper`].
2use std::{
3	future::Future,
4	ops::{Deref as _, DerefMut as _},
5	pin::Pin,
6	task,
7};
8
9use futures_core::Stream;
10
11use crate::SendWrapper;
12
13impl<F: Future> Future for SendWrapper<F> {
14	type Output = F::Output;
15
16	/// Polls this [`SendWrapper`] [`Future`].
17	///
18	/// # Panics
19	///
20	/// Polling panics if it is done from a different thread than the one the [`SendWrapper`]
21	/// instance has been created with.
22	#[track_caller]
23	fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
24		self.assert_valid_for_poll();
25		// This is safe as `SendWrapper` itself points to the inner `Future`.
26		// So, as long as `SendWrapper` is pinned, the inner `Future` is pinned too.
27		unsafe { self.map_unchecked_mut(Self::deref_mut) }.poll(cx)
28	}
29}
30
31impl<S: Stream> Stream for SendWrapper<S> {
32	type Item = S::Item;
33
34	/// Polls this [`SendWrapper`] [`Stream`].
35	///
36	/// # Panics
37	///
38	/// Polling panics if it is done from a different thread than the one the [`SendWrapper`]
39	/// instance has been created with.
40	#[track_caller]
41	fn poll_next(
42		self: Pin<&mut Self>,
43		cx: &mut task::Context<'_>,
44	) -> task::Poll<Option<Self::Item>> {
45		self.assert_valid_for_poll();
46		// This is safe as `SendWrapper` itself points to the inner `Stream`.
47		// So, as long as `SendWrapper` is pinned, the inner `Stream` is pinned too.
48		unsafe { self.map_unchecked_mut(Self::deref_mut) }.poll_next(cx)
49	}
50
51	#[inline]
52	fn size_hint(&self) -> (usize, Option<usize>) {
53		self.deref().size_hint()
54	}
55}
56
#[cfg(test)]
mod tests {
	use std::thread;

	use futures_executor as executor;
	use futures_util::{future, stream, StreamExt};

	use crate::SendWrapper;

	/// A wrapped future and its clone resolve to the same output when driven on the
	/// thread that created them.
	#[test]
	fn test_future() {
		let original = SendWrapper::new(future::ready(42));
		let duplicate = original.clone();

		let first = format!("{:?}", executor::block_on(original));
		let second = format!("{:?}", executor::block_on(duplicate));
		assert_eq!(first, second);
	}

	/// Driving a wrapped future on a foreign thread makes that thread panic.
	#[test]
	fn test_future_panic() {
		let wrapped = SendWrapper::new(future::ready(42));

		let handle = thread::spawn(move || executor::block_on(wrapped));
		let outcome = handle.join();
		assert!(outcome.is_err());
	}

	/// Two identically built wrapped streams yield the same first item when driven on
	/// the thread that created them.
	#[test]
	fn test_stream() {
		let mut left = SendWrapper::new(stream::once(future::ready(42)));
		let mut right = SendWrapper::new(stream::once(future::ready(42)));

		let first = format!("{:?}", executor::block_on(left.next()));
		let second = format!("{:?}", executor::block_on(right.next()));
		assert_eq!(first, second);
	}

	/// Driving a wrapped stream on a foreign thread makes that thread panic.
	#[test]
	fn test_stream_panic() {
		let mut wrapped = SendWrapper::new(stream::once(future::ready(42)));

		let handle = thread::spawn(move || {
			let next_item = wrapped.next();
			executor::block_on(next_item)
		});
		let outcome = handle.join();
		assert!(outcome.is_err());
	}
}