uactor/data/datasource_combinators/
filter_impl.rs1use crate::data::datasource::{DataSource, DataSourceResult};
2
/// A [`DataSource`] adapter that yields only those items of an inner source
/// for which a predicate returns `true`.
///
/// The struct deliberately carries no trait bounds: its fields do not need
/// them, and the `impl` blocks that do need them state them themselves.
pub struct DataSourceFilter<D, F> {
    /// The wrapped source that items are pulled from.
    datasource: D,
    /// Predicate applied to each pulled item to decide whether it is yielded.
    filter_fn: F,
}
11
12impl<D, F> DataSourceFilterExt<D, F> for DataSourceFilter<D, F>
13where
14 F: Fn(&D::Item) -> bool + Send,
15 D: DataSource + Send,
16{
17 fn filter(self, map_fn: F) -> DataSourceFilter<D, F> {
18 DataSourceFilter {
19 datasource: self.datasource,
20 filter_fn: map_fn,
21 }
22 }
23}
24
25impl<D, F> DataSource for DataSourceFilter<D, F>
26where
27 F: Fn(&D::Item) -> bool + Send,
28 D: DataSource + Send,
29{
30 type Item = D::Item;
31
32 async fn next(&mut self) -> DataSourceResult<Self::Item> {
33 loop {
34 let value = DataSource::next(&mut self.datasource).await?;
35 if (self.filter_fn)(&value) {
36 return Ok(value);
37 } else {
38 continue;
39 }
40 }
41 }
42}
43
44pub trait DataSourceFilterExt<D, F>
45where
46 F: Fn(&D::Item) -> bool + Send,
47 D: DataSource + Send,
48{
49 #[allow(dead_code)]
50 fn filter(self, map_fn: F) -> DataSourceFilter<D, F>;
51}
52
53impl<D, F> DataSourceFilterExt<D, F> for D
54where
55 F: Fn(&D::Item) -> bool + Send,
56 D: DataSource + Send,
57{
58 fn filter(self, map_fn: F) -> DataSourceFilter<D, F> {
59 DataSourceFilter {
60 datasource: self,
61 filter_fn: map_fn,
62 }
63 }
64}
65
#[cfg(test)]
mod tests {
    use super::DataSourceFilterExt;
    use crate::data::datasource::DataSource;

    /// Feeds a mix of positive and negative integers through `filter` and
    /// checks that exactly the positive ones come out, in their original order.
    #[tokio::test]
    async fn test_filter() {
        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
        for value in [1, -2, 3, -4, 5] {
            tx.send(value).unwrap();
        }

        // Close the channel: the loop below ends as soon as `next()` stops
        // returning `Ok`, which is expected once the queue has been drained.
        drop(tx);

        let mut stream = rx.filter(|value| *value > 0);

        let mut kept = Vec::new();
        while let Ok(value) = stream.next().await {
            kept.push(value);
        }

        assert_eq!(kept, vec![1, 3, 5]);
        assert_eq!(kept.iter().sum::<i32>(), 9);
    }
}
92}