1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
use arc_swap::ArcSwapOption;
use std::sync::Arc;

use crate::{Message, Source};

/// Callbag operator that conditionally lets data pass through.
///
/// Works on either pullable or listenable sources.
///
/// See <https://github.com/staltz/callbag-filter/blob/01212b2d17622cae31545200235e9db3f1b0e235/readme.js#L23-L36>
///
/// # Examples
///
/// ```
/// use crossbeam_queue::SegQueue;
/// use std::sync::Arc;
///
/// use callbag::{filter, for_each, from_iter};
///
/// let actual = Arc::new(SegQueue::new());
///
/// let source = filter(|x| x % 2 == 1)(from_iter([1, 2, 3, 4, 5]));
///
/// for_each({
///     let actual = Arc::clone(&actual);
///     move |x| {
///         println!("{}", x);
///         actual.push(x);
///     }
/// })(source);
///
/// assert_eq!(
///     &{
///         let mut v = vec![];
///         for _i in 0..actual.len() {
///             v.push(actual.pop().unwrap());
///         }
///         v
///     }[..],
///     [1, 3, 5]
/// );
/// ```
pub fn filter<I: 'static, F: 'static, S>(condition: F) -> Box<dyn Fn(S) -> Source<I>>
where
    F: Fn(&I) -> bool + Send + Sync + Clone,
    S: Into<Arc<Source<I>>>,
{
    Box::new(move |source| {
        let source: Arc<Source<I>> = source.into();
        {
            let condition = condition.clone();
            move |message| {
                if let Message::Handshake(sink) = message {
                    let talkback = Arc::new(ArcSwapOption::from(None));
                    source(Message::Handshake(Arc::new(
                        {
                            let condition = condition.clone();
                            move |message| {
                                let talkback = Arc::clone(&talkback);
                                match message {
                                    Message::Handshake(source) => {
                                        talkback.store(Some(source));
                                        sink(Message::Handshake(Arc::new(
                                            (move |message| match message {
                                                Message::Handshake(_) => {
                                                    panic!("sink handshake has already occurred");
                                                },
                                                Message::Data(_) => {
                                                    panic!("sink must not send data");
                                                },
                                                Message::Pull => {
                                                    let talkback = talkback.load();
                                                    let source = talkback
                                                        .as_ref()
                                                        .expect("source talkback not set");
                                                    source(Message::Pull);
                                                },
                                                Message::Error(error) => {
                                                    let talkback = talkback.load();
                                                    let source = talkback
                                                        .as_ref()
                                                        .expect("source talkback not set");
                                                    source(Message::Error(error));
                                                },
                                                Message::Terminate => {
                                                    let talkback = talkback.load();
                                                    let source = talkback
                                                        .as_ref()
                                                        .expect("source talkback not set");
                                                    source(Message::Terminate);
                                                },
                                            })
                                            .into(),
                                        )));
                                    },
                                    Message::Data(data) => {
                                        if condition(&data) {
                                            sink(Message::Data(data));
                                        } else {
                                            let talkback = talkback.load();
                                            let talkback =
                                                talkback.as_ref().expect("source talkback not set");
                                            talkback(Message::Pull);
                                        }
                                    },
                                    Message::Pull => {
                                        panic!("source must not pull");
                                    },
                                    Message::Error(error) => {
                                        sink(Message::Error(error));
                                    },
                                    Message::Terminate => {
                                        sink(Message::Terminate);
                                    },
                                }
                            }
                        }
                        .into(),
                    )));
                }
            }
        }
        .into()
    })
}