callbag/
map.rs

1use std::sync::Arc;
2
3use crate::{Message, Source};
4
5/// Callbag operator that applies a transformation on data passing through it.
6///
7/// Works on either pullable or listenable sources.
8///
9/// See <https://github.com/staltz/callbag-map/blob/b9d984b78bf4301d0525b21f928d896842e17a0a/readme.js#L24-L29>
10///
11/// # Examples
12///
13/// ```
14/// use crossbeam_queue::SegQueue;
15/// use std::sync::Arc;
16///
17/// use callbag::{for_each, from_iter, map};
18///
19/// let actual = Arc::new(SegQueue::new());
20///
21/// let source = map(|x| (x as f64 * 0.1) as usize)(from_iter([10, 20, 30, 40]));
22///
23/// for_each({
24///     let actual = Arc::clone(&actual);
25///     move |x| {
26///         println!("{}", x);
27///         actual.push(x);
28///     }
29/// })(source);
30///
31/// assert_eq!(
32///     &{
33///         let mut v = vec![];
34///         for _i in 0..actual.len() {
35///             v.push(actual.pop().unwrap());
36///         }
37///         v
38///     }[..],
39///     [1, 2, 3, 4]
40/// );
41/// ```
42pub fn map<I: 'static, O: 'static, F: 'static, S>(f: F) -> Box<dyn Fn(S) -> Source<O>>
43where
44    F: Fn(I) -> O + Send + Sync + Clone,
45    S: Into<Arc<Source<I>>>,
46{
47    Box::new(move |source| {
48        let source: Arc<Source<I>> = source.into();
49        {
50            let f = f.clone();
51            move |message| {
52                if let Message::Handshake(sink) = message {
53                    source(Message::Handshake(Arc::new(
54                        {
55                            let f = f.clone();
56                            move |message| match message {
57                                Message::Handshake(source) => {
58                                    sink(Message::Handshake(Arc::new(
59                                        (move |message| match message {
60                                            Message::Handshake(_) => {
61                                                panic!("sink handshake has already occurred");
62                                            },
63                                            Message::Data(_) => {
64                                                panic!("sink must not send data");
65                                            },
66                                            Message::Pull => {
67                                                source(Message::Pull);
68                                            },
69                                            Message::Error(error) => {
70                                                source(Message::Error(error));
71                                            },
72                                            Message::Terminate => {
73                                                source(Message::Terminate);
74                                            },
75                                        })
76                                        .into(),
77                                    )));
78                                },
79                                Message::Data(data) => {
80                                    sink(Message::Data(f(data)));
81                                },
82                                Message::Pull => {
83                                    panic!("source must not pull");
84                                },
85                                Message::Error(error) => {
86                                    sink(Message::Error(error));
87                                },
88                                Message::Terminate => {
89                                    sink(Message::Terminate);
90                                },
91                            }
92                        }
93                        .into(),
94                    )));
95                }
96            }
97        }
98        .into()
99    })
100}