marigold_impl/
run_stream.rs1use core::pin::Pin;
2use futures::Stream;
3
4#[cfg(any(feature = "async-std", feature = "tokio"))]
5use futures::StreamExt;
6
/// Runs `s` on a task spawned through `crate::async_runtime::spawn` and returns
/// a stream of its items.
///
/// Items are forwarded from the spawned task into a bounded
/// `futures::channel::mpsc` channel; the receiving half of that channel is
/// what the caller gets back (boxed and pinned). The buffer-size argument
/// passed to the channel is one less than the CPU count reported by
/// `num_cpus::get()`, with a floor of two.
///
/// If the returned stream is dropped before `s` is exhausted, the channel
/// becomes disconnected and the forwarding task simply stops: an early drop
/// by the consumer is a normal way to finish, not an internal error.
///
/// # Panics
///
/// The spawned task panics if forwarding fails for any reason other than the
/// receiver having been dropped.
#[cfg(any(feature = "async-std", feature = "tokio"))]
#[inline]
pub fn run_stream<
    T: std::marker::Send + 'static,
    S: Stream<Item = T> + 'static + std::marker::Send + std::marker::Unpin,
>(
    s: S,
) -> Pin<Box<dyn Stream<Item = T>>> {
    // `saturating_sub` keeps the arithmetic safe even if the reported CPU
    // count were ever zero; the `max` enforces the floor of two.
    let buffer_size = std::cmp::max(num_cpus::get().saturating_sub(1), 2);
    let (mut sender, receiver) = futures::channel::mpsc::channel(buffer_size);
    crate::async_runtime::spawn(async move {
        if let Err(error) = s.map(Ok).forward(&mut sender).await {
            // A disconnected channel only means the consumer dropped the
            // receiving stream early; stop quietly in that case.
            if !error.is_disconnected() {
                panic!(
                    "Internal marigold error: could not write stream results to channel: {:?}",
                    error
                );
            }
        }
    });
    Box::pin(receiver)
}
25
26#[cfg(not(any(feature = "async-std", feature = "tokio")))]
27#[inline]
28pub fn run_stream<
29 T: std::marker::Send + 'static,
30 S: Stream<Item = T> + 'static + std::marker::Send + std::marker::Unpin,
31>(
32 s: S,
33) -> Pin<Box<dyn Stream<Item = T>>> {
34 Box::pin(s)
35}
36
#[cfg(test)]
mod tests {
    use super::*;
    use futures::stream::StreamExt;

    /// Every item of the input stream comes out of `run_stream`, in order.
    #[tokio::test]
    async fn combinations() {
        let source = futures::stream::iter(0_u32..3_u32);
        let collected: Vec<u32> = run_stream(source).collect().await;
        assert_eq!(collected, vec![0_u32, 1_u32, 2_u32]);
    }
}