Skip to main content

compio_send_wrapper/
futures.rs

1//! [`Future`] and [`Stream`] support for [`SendWrapper`].
2use std::{future::Future, pin::Pin, task};
3
4use futures_core::Stream;
5
6use crate::{SendWrapper, invalid_deref, invalid_poll};
7
8impl<F: Future> Future for SendWrapper<F> {
9    type Output = F::Output;
10
11    /// Polls this [`SendWrapper`] [`Future`].
12    ///
13    /// # Panics
14    ///
15    /// Polling panics if it is done from a different thread than the one the
16    /// [`SendWrapper`] instance has been created with.
17    #[track_caller]
18    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
19        self.get_pinned_mut()
20            .unwrap_or_else(|| invalid_poll())
21            .poll(cx)
22    }
23}
24
25impl<S: Stream> Stream for SendWrapper<S> {
26    type Item = S::Item;
27
28    /// Polls this [`SendWrapper`] [`Stream`].
29    ///
30    /// # Panics
31    ///
32    /// Polling panics if it is done from a different thread than the one the
33    /// [`SendWrapper`] instance has been created with.
34    #[track_caller]
35    fn poll_next(
36        self: Pin<&mut Self>,
37        cx: &mut task::Context<'_>,
38    ) -> task::Poll<Option<Self::Item>> {
39        self.get_pinned_mut()
40            .unwrap_or_else(|| invalid_poll())
41            .poll_next(cx)
42    }
43
44    #[inline]
45    fn size_hint(&self) -> (usize, Option<usize>) {
46        self.get().unwrap_or_else(|| invalid_deref()).size_hint()
47    }
48}
49
#[cfg(test)]
mod tests {
    use std::thread;

    use futures_executor as executor;
    use futures_util::{StreamExt, future, stream};

    use crate::SendWrapper;

    /// A wrapped future and its clone both resolve to the inner future's
    /// output when driven on the thread that created them.
    #[test]
    fn test_future() {
        let w1 = SendWrapper::new(future::ready(42));
        let w2 = w1.clone();
        // Assert the concrete value rather than comparing the two wrappers to
        // each other, which would also pass if both produced the same wrong
        // result.
        assert_eq!(executor::block_on(w1), 42);
        assert_eq!(executor::block_on(w2), 42);
    }

    /// Driving a wrapped future on a thread other than the creating one
    /// makes that thread panic.
    #[test]
    fn test_future_panic() {
        let w = SendWrapper::new(future::ready(42));
        let t = thread::spawn(move || executor::block_on(w));
        assert!(t.join().is_err());
    }

    /// A wrapped stream yields the inner stream's item and then terminates
    /// when driven on the thread that created it.
    #[test]
    fn test_stream() {
        let mut w = SendWrapper::new(stream::once(future::ready(42)));
        assert_eq!(executor::block_on(w.next()), Some(42));
        // `stream::once` is exhausted after its single item.
        assert_eq!(executor::block_on(w.next()), None);
    }

    /// Driving a wrapped stream on a thread other than the creating one
    /// makes that thread panic.
    #[test]
    fn test_stream_panic() {
        let mut w = SendWrapper::new(stream::once(future::ready(42)));
        let t = thread::spawn(move || executor::block_on(w.next()));
        assert!(t.join().is_err());
    }
}