marigold_impl/
run_stream.rs

1use core::pin::Pin;
2use futures::Stream;
3
4#[cfg(any(feature = "async-std", feature = "tokio"))]
5use futures::StreamExt;
6
7#[cfg(any(feature = "async-std", feature = "tokio"))]
8#[inline]
9pub fn run_stream<
10    T: std::marker::Send + 'static,
11    S: Stream<Item = T> + 'static + std::marker::Send + std::marker::Unpin,
12>(
13    s: S,
14) -> Pin<Box<dyn Stream<Item = T>>> {
15    let (sender, receiver) = futures::channel::mpsc::channel(std::cmp::max(num_cpus::get() - 1, 2));
16    crate::async_runtime::spawn(async move {
17        let mut moved_sender = sender;
18        s.map(Ok)
19            .forward(&mut moved_sender)
20            .await
21            .expect("Internal marigold error: could not write stream results to channel");
22    });
23    Box::pin(receiver)
24}
25
26#[cfg(not(any(feature = "async-std", feature = "tokio")))]
27#[inline]
28pub fn run_stream<
29    T: std::marker::Send + 'static,
30    S: Stream<Item = T> + 'static + std::marker::Send + std::marker::Unpin,
31>(
32    s: S,
33) -> Pin<Box<dyn Stream<Item = T>>> {
34    Box::pin(s)
35}
36
37#[cfg(test)]
38mod tests {
39    use super::*;
40    use futures::stream::StreamExt;
41
42    #[tokio::test]
43    async fn combinations() {
44        assert_eq!(
45            run_stream(futures::stream::iter(0_u32..3_u32))
46                .collect::<Vec<_>>()
47                .await,
48            vec![0_u32, 1_u32, 2_u32]
49        );
50    }
51}