compio_send_wrapper/
futures.rs1use std::{future::Future, pin::Pin, task};
3
4use futures_core::Stream;
5
6use crate::{SendWrapper, invalid_deref, invalid_poll};
7
8impl<F: Future> Future for SendWrapper<F> {
9 type Output = F::Output;
10
11 #[track_caller]
18 fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
19 self.get_pinned_mut()
20 .unwrap_or_else(|| invalid_poll())
21 .poll(cx)
22 }
23}
24
25impl<S: Stream> Stream for SendWrapper<S> {
26 type Item = S::Item;
27
28 #[track_caller]
35 fn poll_next(
36 self: Pin<&mut Self>,
37 cx: &mut task::Context<'_>,
38 ) -> task::Poll<Option<Self::Item>> {
39 self.get_pinned_mut()
40 .unwrap_or_else(|| invalid_poll())
41 .poll_next(cx)
42 }
43
44 #[inline]
45 fn size_hint(&self) -> (usize, Option<usize>) {
46 self.get().unwrap_or_else(|| invalid_deref()).size_hint()
47 }
48}
49
#[cfg(test)]
mod tests {
    use std::thread;

    use futures_executor as executor;
    use futures_util::{StreamExt, future, stream};

    use crate::SendWrapper;

    /// A wrapped future and its clone produce the same output when both are
    /// driven to completion on the thread that created them.
    #[test]
    fn test_future() {
        let original = SendWrapper::new(future::ready(42));
        let duplicate = original.clone();

        let from_original = format!("{:?}", executor::block_on(original));
        let from_duplicate = format!("{:?}", executor::block_on(duplicate));

        assert_eq!(from_original, from_duplicate);
    }

    /// Driving a wrapped future on a freshly spawned thread must make that
    /// thread panic, which surfaces as an `Err` from `join`.
    #[test]
    fn test_future_panic() {
        let wrapped = SendWrapper::new(future::ready(42));
        let outcome = thread::spawn(move || executor::block_on(wrapped)).join();
        assert!(outcome.is_err());
    }

    /// Two independently wrapped, identical streams yield the same first item
    /// when polled on the thread that created them.
    #[test]
    fn test_stream() {
        let mut first = SendWrapper::new(stream::once(future::ready(42)));
        let mut second = SendWrapper::new(stream::once(future::ready(42)));

        let from_first = format!("{:?}", executor::block_on(first.next()));
        let from_second = format!("{:?}", executor::block_on(second.next()));

        assert_eq!(from_first, from_second);
    }

    /// Polling a wrapped stream on a freshly spawned thread must make that
    /// thread panic, which surfaces as an `Err` from `join`.
    #[test]
    fn test_stream_panic() {
        let mut wrapped = SendWrapper::new(stream::once(future::ready(42)));
        let outcome = thread::spawn(move || executor::block_on(wrapped.next())).join();
        assert!(outcome.is_err());
    }
}