callbag/filter.rs
1use arc_swap::ArcSwapOption;
2use std::sync::Arc;
3
4use crate::{Message, Source};
5
6/// Callbag operator that conditionally lets data pass through.
7///
8/// Works on either pullable or listenable sources.
9///
10/// See <https://github.com/staltz/callbag-filter/blob/01212b2d17622cae31545200235e9db3f1b0e235/readme.js#L23-L36>
11///
12/// # Examples
13///
14/// ```
15/// use crossbeam_queue::SegQueue;
16/// use std::sync::Arc;
17///
18/// use callbag::{filter, for_each, from_iter};
19///
20/// let actual = Arc::new(SegQueue::new());
21///
22/// let source = filter(|x| x % 2 == 1)(from_iter([1, 2, 3, 4, 5]));
23///
24/// for_each({
25///     let actual = Arc::clone(&actual);
26///     move |x| {
27///         println!("{}", x);
28///         actual.push(x);
29///     }
30/// })(source);
31///
32/// assert_eq!(
33///     &{
34///         let mut v = vec![];
35///         for _i in 0..actual.len() {
36///             v.push(actual.pop().unwrap());
37///         }
38///         v
39///     }[..],
40///     [1, 3, 5]
41/// );
42/// ```
43pub fn filter<I: 'static, F: 'static, S>(condition: F) -> Box<dyn Fn(S) -> Source<I>>
44where
45    F: Fn(&I) -> bool + Send + Sync + Clone,
46    S: Into<Arc<Source<I>>>,
47{
48    Box::new(move |source| {
49        let source: Arc<Source<I>> = source.into();
50        {
51            let condition = condition.clone();
52            move |message| {
53                if let Message::Handshake(sink) = message {
54                    let talkback = Arc::new(ArcSwapOption::from(None));
55                    source(Message::Handshake(Arc::new(
56                        {
57                            let condition = condition.clone();
58                            move |message| {
59                                let talkback = Arc::clone(&talkback);
60                                match message {
61                                    Message::Handshake(source) => {
62                                        talkback.store(Some(source));
63                                        sink(Message::Handshake(Arc::new(
64                                            (move |message| match message {
65                                                Message::Handshake(_) => {
66                                                    panic!("sink handshake has already occurred");
67                                                },
68                                                Message::Data(_) => {
69                                                    panic!("sink must not send data");
70                                                },
71                                                Message::Pull => {
72                                                    let talkback = talkback.load();
73                                                    let source = talkback
74                                                        .as_ref()
75                                                        .expect("source talkback not set");
76                                                    source(Message::Pull);
77                                                },
78                                                Message::Error(error) => {
79                                                    let talkback = talkback.load();
80                                                    let source = talkback
81                                                        .as_ref()
82                                                        .expect("source talkback not set");
83                                                    source(Message::Error(error));
84                                                },
85                                                Message::Terminate => {
86                                                    let talkback = talkback.load();
87                                                    let source = talkback
88                                                        .as_ref()
89                                                        .expect("source talkback not set");
90                                                    source(Message::Terminate);
91                                                },
92                                            })
93                                            .into(),
94                                        )));
95                                    },
96                                    Message::Data(data) => {
97                                        if condition(&data) {
98                                            sink(Message::Data(data));
99                                        } else {
100                                            let talkback = talkback.load();
101                                            let talkback =
102                                                talkback.as_ref().expect("source talkback not set");
103                                            talkback(Message::Pull);
104                                        }
105                                    },
106                                    Message::Pull => {
107                                        panic!("source must not pull");
108                                    },
109                                    Message::Error(error) => {
110                                        sink(Message::Error(error));
111                                    },
112                                    Message::Terminate => {
113                                        sink(Message::Terminate);
114                                    },
115                                }
116                            }
117                        }
118                        .into(),
119                    )));
120                }
121            }
122        }
123        .into()
124    })
125}