marigold_impl/
combinations.rs

1use async_trait::async_trait;
2use futures::stream::Stream;
3use futures::stream::StreamExt;
4use itertools::Combinations;
5use tracing::instrument;
6
7#[async_trait]
8pub trait Combinable<T> {
9    async fn combinations(
10        self,
11        k: usize,
12    ) -> futures::stream::Iter<Combinations<std::vec::IntoIter<T>>>;
13}
14
15/// This is a glue trait to allow streams to use Combinable in itertools.
16/// The current implementation eagerly consumes the parent stream.
17#[async_trait]
18impl<T, SInput> Combinable<T> for SInput
19where
20    SInput: Stream<Item = T> + Send,
21    T: Clone + Send + std::fmt::Debug,
22{
23    #[instrument(skip(self))]
24    async fn combinations(
25        self,
26        k: usize,
27    ) -> futures::stream::Iter<Combinations<std::vec::IntoIter<T>>> {
28        use itertools::Itertools;
29
30        let combinations_iterable = self.collect::<Vec<_>>().await.into_iter().combinations(k);
31        futures::stream::iter(combinations_iterable)
32    }
33}
34
#[cfg(test)]
mod tests {
    use super::Combinable;
    use futures::stream::StreamExt;

    /// Choosing 2 items out of the stream `[1, 2, 3]` yields the three expected
    /// pairs, in the order asserted below.
    #[tokio::test]
    async fn combinations() {
        let source = futures::stream::iter(vec![1, 2, 3]);

        let actual = source.combinations(2).await.collect::<Vec<_>>().await;

        let expected = vec![vec![1, 2], vec![1, 3], vec![2, 3]];
        assert_eq!(actual, expected);
    }
}