callbag/map.rs
1use std::sync::Arc;
2
3use crate::{Message, Source};
4
5/// Callbag operator that applies a transformation on data passing through it.
6///
7/// Works on either pullable or listenable sources.
8///
9/// See <https://github.com/staltz/callbag-map/blob/b9d984b78bf4301d0525b21f928d896842e17a0a/readme.js#L24-L29>
10///
11/// # Examples
12///
13/// ```
14/// use crossbeam_queue::SegQueue;
15/// use std::sync::Arc;
16///
17/// use callbag::{for_each, from_iter, map};
18///
19/// let actual = Arc::new(SegQueue::new());
20///
21/// let source = map(|x| (x as f64 * 0.1) as usize)(from_iter([10, 20, 30, 40]));
22///
23/// for_each({
24/// let actual = Arc::clone(&actual);
25/// move |x| {
26/// println!("{}", x);
27/// actual.push(x);
28/// }
29/// })(source);
30///
31/// assert_eq!(
32/// &{
33/// let mut v = vec![];
34/// for _i in 0..actual.len() {
35/// v.push(actual.pop().unwrap());
36/// }
37/// v
38/// }[..],
39/// [1, 2, 3, 4]
40/// );
41/// ```
42pub fn map<I: 'static, O: 'static, F: 'static, S>(f: F) -> Box<dyn Fn(S) -> Source<O>>
43where
44 F: Fn(I) -> O + Send + Sync + Clone,
45 S: Into<Arc<Source<I>>>,
46{
47 Box::new(move |source| {
48 let source: Arc<Source<I>> = source.into();
49 {
50 let f = f.clone();
51 move |message| {
52 if let Message::Handshake(sink) = message {
53 source(Message::Handshake(Arc::new(
54 {
55 let f = f.clone();
56 move |message| match message {
57 Message::Handshake(source) => {
58 sink(Message::Handshake(Arc::new(
59 (move |message| match message {
60 Message::Handshake(_) => {
61 panic!("sink handshake has already occurred");
62 },
63 Message::Data(_) => {
64 panic!("sink must not send data");
65 },
66 Message::Pull => {
67 source(Message::Pull);
68 },
69 Message::Error(error) => {
70 source(Message::Error(error));
71 },
72 Message::Terminate => {
73 source(Message::Terminate);
74 },
75 })
76 .into(),
77 )));
78 },
79 Message::Data(data) => {
80 sink(Message::Data(f(data)));
81 },
82 Message::Pull => {
83 panic!("source must not pull");
84 },
85 Message::Error(error) => {
86 sink(Message::Error(error));
87 },
88 Message::Terminate => {
89 sink(Message::Terminate);
90 },
91 }
92 }
93 .into(),
94 )));
95 }
96 }
97 }
98 .into()
99 })
100}