1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
use ArcSwapOption;
use Arc;
use crate::;
/// Callbag sink that consumes both pullable and listenable sources.
///
/// When called on a pullable source, it will iterate through its data. When called on a listenable
/// source, it will observe its data.
///
/// See <https://github.com/staltz/callbag-for-each/blob/a7550690afca2a27324ea5634a32a313f826d61a/readme.js#L40-L47>
///
/// # Examples
///
/// Consume a pullable source:
///
/// ```
/// use crossbeam_queue::SegQueue;
/// use std::sync::Arc;
///
/// use callbag::{for_each, from_iter};
///
/// let actual = Arc::new(SegQueue::new());
///
/// let source = from_iter([10, 20, 30, 40]);
///
/// for_each({
///     let actual = Arc::clone(&actual);
///     move |x| {
///         println!("{}", x);
///         actual.push(x);
///     }
/// })(source);
///
/// assert_eq!(
///     &{
///         let mut v = vec![];
///         for _i in 0..actual.len() {
///             v.push(actual.pop().unwrap());
///         }
///         v
///     }[..],
///     [10, 20, 30, 40]
/// );
/// ```
///
/// Consume a listenable source:
///
/// ```
/// use async_executors::TimerExt;
/// use async_nursery::Nursery;
/// use crossbeam_queue::SegQueue;
/// use std::{sync::Arc, time::Duration};
///
/// use callbag::{for_each, interval};
///
/// let (nursery, nursery_out) = Nursery::new(async_executors::AsyncStd);
///
/// let actual = Arc::new(SegQueue::new());
///
/// let source = interval(Duration::from_millis(1_000), nursery.clone());
///
/// for_each({
///     let actual = Arc::clone(&actual);
///     move |x| {
///         println!("{}", x);
///         actual.push(x);
///     }
/// })(source);
///
/// let nursery_out = nursery.timeout(Duration::from_millis(4_500), nursery_out);
/// drop(nursery);
/// async_std::task::block_on(nursery_out);
///
/// assert_eq!(
///     &{
///         let mut v = vec![];
///         for _i in 0..actual.len() {
///             v.push(actual.pop().unwrap());
///         }
///         v
///     }[..],
///     [0, 1, 2, 3]
/// );
/// ```