callbag/
skip.rs

1use arc_swap::ArcSwapOption;
2use std::sync::{
3    atomic::{AtomicUsize, Ordering as AtomicOrdering},
4    Arc,
5};
6
7use crate::{Message, Source};
8
9/// Callbag operator that skips the first N data points of a source.
10///
11/// Works on either pullable and listenable sources.
12///
13/// See <https://github.com/staltz/callbag-skip/blob/698d6b7805c9bcddac038ceff25a0f0362adb25a/index.js#L1-L18>
14///
15/// # Examples
16///
17/// On a listenable source:
18///
19/// ```
20/// use async_executors::TimerExt;
21/// use async_nursery::Nursery;
22/// use crossbeam_queue::SegQueue;
23/// use std::{sync::Arc, time::Duration};
24///
25/// use callbag::{for_each, interval, skip};
26///
27/// let (nursery, nursery_out) = Nursery::new(async_executors::AsyncStd);
28///
29/// let actual = Arc::new(SegQueue::new());
30///
31/// let source = skip(3)(interval(Duration::from_millis(1_000), nursery.clone()));
32///
33/// for_each({
34///     let actual = Arc::clone(&actual);
35///     move |x| {
36///         println!("{}", x);
37///         actual.push(x);
38///     }
39/// })(source);
40///
41/// let nursery_out = nursery.timeout(Duration::from_millis(7_500), nursery_out);
42/// drop(nursery);
43/// async_std::task::block_on(nursery_out);
44///
45/// assert_eq!(
46///     &{
47///         let mut v = vec![];
48///         for _i in 0..actual.len() {
49///             v.push(actual.pop().unwrap());
50///         }
51///         v
52///     }[..],
53///     [3, 4, 5, 6]
54/// );
55/// ```
56///
57/// On a pullable source:
58///
59/// ```
60/// use crossbeam_queue::SegQueue;
61/// use std::sync::Arc;
62///
63/// use callbag::{for_each, from_iter, skip};
64///
65/// #[derive(Clone)]
66/// struct Range {
67///     i: usize,
68///     to: usize,
69/// }
70///
71/// impl Range {
72///     fn new(from: usize, to: usize) -> Self {
73///         Range { i: from, to }
74///     }
75/// }
76///
77/// impl Iterator for Range {
78///     type Item = usize;
79///
80///     fn next(&mut self) -> Option<Self::Item> {
81///         let i = self.i;
82///         if i <= self.to {
83///             self.i += 1;
84///             Some(i)
85///         } else {
86///             None
87///         }
88///     }
89/// }
90///
91/// let actual = Arc::new(SegQueue::new());
92///
93/// let source = skip(4)(from_iter(Range::new(10, 20)));
94///
95/// for_each({
96///     let actual = Arc::clone(&actual);
97///     move |x| {
98///         println!("{}", x);
99///         actual.push(x);
100///     }
101/// })(source);
102///
103/// assert_eq!(
104///     &{
105///         let mut v = vec![];
106///         for _i in 0..actual.len() {
107///             v.push(actual.pop().unwrap());
108///         }
109///         v
110///     }[..],
111///     [14, 15, 16, 17, 18, 19, 20]
112/// );
113/// ```
114pub fn skip<T: 'static, S>(max: usize) -> Box<dyn Fn(S) -> Source<T>>
115where
116    S: Into<Arc<Source<T>>>,
117{
118    Box::new(move |source| {
119        let source: Arc<Source<T>> = source.into();
120        (move |message| {
121            if let Message::Handshake(sink) = message {
122                let skipped = Arc::new(AtomicUsize::new(0));
123                let talkback: Arc<ArcSwapOption<Source<T>>> = Arc::new(ArcSwapOption::from(None));
124                source(Message::Handshake(Arc::new(
125                    (move |message| match message {
126                        Message::Handshake(source) => {
127                            talkback.store(Some(source));
128                            sink(Message::Handshake(Arc::new(
129                                {
130                                    let talkback = Arc::clone(&talkback);
131                                    move |message| match message {
132                                        Message::Handshake(_) => {
133                                            panic!("sink handshake has already occurred");
134                                        },
135                                        Message::Data(_) => {
136                                            panic!("sink must not send data");
137                                        },
138                                        Message::Pull => {
139                                            let talkback = talkback.load();
140                                            let source =
141                                                talkback.as_ref().expect("source talkback not set");
142                                            source(Message::Pull);
143                                        },
144                                        Message::Error(error) => {
145                                            let talkback = talkback.load();
146                                            let source =
147                                                talkback.as_ref().expect("source talkback not set");
148                                            source(Message::Error(error));
149                                        },
150                                        Message::Terminate => {
151                                            let talkback = talkback.load();
152                                            let source =
153                                                talkback.as_ref().expect("source talkback not set");
154                                            source(Message::Terminate);
155                                        },
156                                    }
157                                }
158                                .into(),
159                            )));
160                        },
161                        Message::Data(data) => {
162                            if skipped.load(AtomicOrdering::Acquire) < max {
163                                skipped.fetch_add(1, AtomicOrdering::AcqRel);
164                                {
165                                    let talkback = talkback.load();
166                                    let talkback =
167                                        talkback.as_ref().expect("source talkback not set");
168                                    talkback(Message::Pull);
169                                }
170                            } else {
171                                sink(Message::Data(data));
172                            }
173                        },
174                        Message::Pull => {
175                            panic!("source must not pull");
176                        },
177                        Message::Error(error) => {
178                            sink(Message::Error(error));
179                        },
180                        Message::Terminate => {
181                            sink(Message::Terminate);
182                        },
183                    })
184                    .into(),
185                )))
186            }
187        })
188        .into()
189    })
190}