rxrust/observable/
from_iter.rs1use crate::prelude::*;
2use std::iter::{Repeat, Take};
3
4pub fn from_iter<Iter, Item>(iter: Iter) -> ObservableBase<IterEmitter<Iter>>
32where
33 Iter: IntoIterator<Item = Item>,
34{
35 ObservableBase::new(IterEmitter(iter))
36}
37
38#[derive(Clone)]
39pub struct IterEmitter<Iter>(Iter);
40
41#[doc(hidden)]
42macro_rules! iter_emitter {
43 ($subscription:ty, $($marker:ident +)* $lf: lifetime) => {
44 fn emit<O>(self, mut subscriber: Subscriber<O, $subscription>)
45 where
46 O: Observer<Item=Self::Item, Err=Self::Err> + $($marker +)* $lf
47 {
48 for v in self.0.into_iter() {
49 if !subscriber.is_finished() {
50 subscriber.next(v);
51 } else {
52 break;
53 }
54 }
55 if !subscriber.is_finished() {
56 subscriber.complete();
57 }
58 }
59}
60}
61
62impl<Iter, Item> Emitter for IterEmitter<Iter>
63where
64 Iter: IntoIterator<Item = Item>,
65{
66 type Item = Item;
67 type Err = ();
68}
69
70impl<'a, Iter, Item> LocalEmitter<'a> for IterEmitter<Iter>
71where
72 Iter: IntoIterator<Item = Item>,
73{
74 iter_emitter!(LocalSubscription, 'a);
75}
76
77impl<Iter, Item> SharedEmitter for IterEmitter<Iter>
78where
79 Iter: IntoIterator<Item = Item>,
80{
81 iter_emitter!(SharedSubscription, Send + Sync + 'static);
82}
83
84pub fn repeat<Item>(
107 v: Item,
108 n: usize,
109) -> ObservableBase<IterEmitter<Take<Repeat<Item>>>>
110where
111 Item: Clone,
112{
113 from_iter(std::iter::repeat(v).take(n))
114}
115
116#[cfg(test)]
117mod test {
118 use crate::prelude::*;
119 use bencher::Bencher;
120
121 #[test]
122 fn from_range() {
123 let mut hit_count = 0;
124 let mut completed = false;
125 observable::from_iter(0..100)
126 .subscribe_complete(|_| hit_count += 1, || completed = true);
127
128 assert_eq!(hit_count, 100);
129 assert!(completed);
130 }
131
132 #[test]
133 fn from_vec() {
134 let mut hit_count = 0;
135 let mut completed = false;
136 observable::from_iter(vec![0; 100])
137 .subscribe_complete(|_| hit_count += 1, || completed = true);
138
139 assert_eq!(hit_count, 100);
140 assert!(completed);
141 }
142
143 #[test]
144 fn repeat_three_times() {
145 let mut hit_count = 0;
146 let mut completed = false;
147 repeat(123, 5).subscribe_complete(
148 |v| {
149 hit_count += 1;
150 assert_eq!(123, v);
151 },
152 || completed = true,
153 );
154 assert_eq!(5, hit_count);
155 assert!(completed);
156 }
157
158 #[test]
159 fn repeat_zero_times() {
160 let mut hit_count = 0;
161 let mut completed = false;
162 repeat(123, 0).subscribe_complete(
163 |v| {
164 hit_count += 1;
165 assert_eq!(123, v);
166 },
167 || completed = true,
168 );
169 assert_eq!(0, hit_count);
170 assert!(completed);
171 }
172 #[test]
173 fn bench() { do_bench(); }
174
175 benchmark_group!(do_bench, bench_from_iter);
176
177 fn bench_from_iter(b: &mut Bencher) { b.iter(from_range); }
178}