1use std::{
3 future::Future,
4 ops::{Deref as _, DerefMut as _},
5 pin::Pin,
6 task,
7};
8
9use futures_core::Stream;
10
11use crate::SendWrapper;
12
13impl<F: Future> Future for SendWrapper<F> {
14 type Output = F::Output;
15
16 #[track_caller]
23 fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
24 self.assert_valid_for_poll();
25 unsafe { self.map_unchecked_mut(Self::deref_mut) }.poll(cx)
28 }
29}
30
31impl<S: Stream> Stream for SendWrapper<S> {
32 type Item = S::Item;
33
34 #[track_caller]
41 fn poll_next(
42 self: Pin<&mut Self>,
43 cx: &mut task::Context<'_>,
44 ) -> task::Poll<Option<Self::Item>> {
45 self.assert_valid_for_poll();
46 unsafe { self.map_unchecked_mut(Self::deref_mut) }.poll_next(cx)
49 }
50
51 #[inline]
52 fn size_hint(&self) -> (usize, Option<usize>) {
53 self.deref().size_hint()
54 }
55}
56
57#[cfg(test)]
58mod tests {
59 use std::thread;
60
61 use futures_executor as executor;
62 use futures_util::{future, stream, StreamExt};
63
64 use crate::SendWrapper;
65
66 #[test]
67 fn test_future() {
68 let w1 = SendWrapper::new(future::ready(42));
69 let w2 = w1.clone();
70 assert_eq!(
71 format!("{:?}", executor::block_on(w1)),
72 format!("{:?}", executor::block_on(w2)),
73 );
74 }
75
76 #[test]
77 fn test_future_panic() {
78 let w = SendWrapper::new(future::ready(42));
79 let t = thread::spawn(move || executor::block_on(w));
80 assert!(t.join().is_err());
81 }
82
83 #[test]
84 fn test_stream() {
85 let mut w1 = SendWrapper::new(stream::once(future::ready(42)));
86 let mut w2 = SendWrapper::new(stream::once(future::ready(42)));
87 assert_eq!(
88 format!("{:?}", executor::block_on(w1.next())),
89 format!("{:?}", executor::block_on(w2.next())),
90 );
91 }
92
93 #[test]
94 fn test_stream_panic() {
95 let mut w = SendWrapper::new(stream::once(future::ready(42)));
96 let t = thread::spawn(move || executor::block_on(w.next()));
97 assert!(t.join().is_err());
98 }
99}