Skip to main content

uactor/data/datasource_combinators/
filter_impl.rs

1use crate::data::datasource::{DataSource, DataSourceResult};
2
3pub struct DataSourceFilter<D, F>
4where
5    F: Fn(&D::Item) -> bool + Send,
6    D: DataSource + Send,
7{
8    datasource: D,
9    filter_fn: F,
10}
11
12impl<D, F> DataSourceFilterExt<D, F> for DataSourceFilter<D, F>
13where
14    F: Fn(&D::Item) -> bool + Send,
15    D: DataSource + Send,
16{
17    fn filter(self, map_fn: F) -> DataSourceFilter<D, F> {
18        DataSourceFilter {
19            datasource: self.datasource,
20            filter_fn: map_fn,
21        }
22    }
23}
24
25impl<D, F> DataSource for DataSourceFilter<D, F>
26where
27    F: Fn(&D::Item) -> bool + Send,
28    D: DataSource + Send,
29{
30    type Item = D::Item;
31
32    async fn next(&mut self) -> DataSourceResult<Self::Item> {
33        loop {
34            let value = DataSource::next(&mut self.datasource).await?;
35            if (self.filter_fn)(&value) {
36                return Ok(value);
37            } else {
38                continue;
39            }
40        }
41    }
42}
43
44pub trait DataSourceFilterExt<D, F>
45where
46    F: Fn(&D::Item) -> bool + Send,
47    D: DataSource + Send,
48{
49    #[allow(dead_code)]
50    fn filter(self, map_fn: F) -> DataSourceFilter<D, F>;
51}
52
53impl<D, F> DataSourceFilterExt<D, F> for D
54where
55    F: Fn(&D::Item) -> bool + Send,
56    D: DataSource + Send,
57{
58    fn filter(self, map_fn: F) -> DataSourceFilter<D, F> {
59        DataSourceFilter {
60            datasource: self,
61            filter_fn: map_fn,
62        }
63    }
64}
65
66#[cfg(test)]
67mod tests {
68    use super::DataSourceFilterExt;
69    use crate::data::datasource::DataSource;
70
71    #[tokio::test]
72    async fn test_map() {
73        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
74        tx.send(1).unwrap();
75        tx.send(-2).unwrap();
76        tx.send(3).unwrap();
77        tx.send(-4).unwrap();
78        tx.send(5).unwrap();
79
80        drop(tx);
81
82        let mut stream = rx.filter(|i| i > &0);
83
84        let mut sum = 0;
85        while let Ok(value) = stream.next().await {
86            assert!(value > 0);
87            sum += value;
88        }
89
90        assert_eq!(sum, 9);
91    }
92}