1use crate::prelude::*;
2
/// Builds an observable that emits every listed expression, in the order
/// written, and then completes.
///
/// The expansion calls `$crate::observable::create` with a closure that
/// invokes `next` once per argument followed by a single `complete`.
/// A trailing comma after the last item is accepted, as with `vec!`.
#[macro_export]
macro_rules! of_sequence {
  ( $( $item:expr ),* $(,)? ) => {
    {
      $crate::observable::create(|mut s| {
        $(
          s.next($item);
        )*
        s.complete();
      })
    }
  }
}
38
39pub fn of<Item>(v: Item) -> ObservableBase<OfEmitter<Item>> {
56 ObservableBase::new(OfEmitter(v))
57}
58
/// Emitter behind [`of`]: a tuple wrapper around the one value to emit.
#[derive(Clone)]
pub struct OfEmitter<Item>(
  /// The value handed to the subscriber's `next`.
  pub(crate) Item,
);
61
// Generates the `emit` method shared by the local and shared emitter impls
// of `OfEmitter`.
//
// Arguments: the subscription type carried by the `Subscriber`, then the
// extra bounds required of the observer (zero or more marker traits, each
// followed by `+`, ending with a lifetime).
//
// The generated method sends the wrapped value through `next` and then
// calls `complete`.
#[doc(hidden)]
macro_rules! of_emitter {
  ($sub:ty, $($bound:ident +)* $life:lifetime) => {
    fn emit<O>(self, mut subscriber: Subscriber<O, $sub>)
    where
      O: Observer<Item = Self::Item, Err = Self::Err> + $($bound +)* $life,
    {
      let value = self.0;
      subscriber.next(value);
      subscriber.complete();
    }
  };
}
74
75impl<Item> Emitter for OfEmitter<Item> {
76 type Item = Item;
77 type Err = ();
78}
79
80impl<'a, Item> LocalEmitter<'a> for OfEmitter<Item> {
81 of_emitter!(LocalSubscription, 'a);
82}
83
84impl<Item> SharedEmitter for OfEmitter<Item> {
85 of_emitter!(SharedSubscription, Send + Sync + 'static);
86}
87
88pub fn of_result<Item, Err>(
112 r: Result<Item, Err>,
113) -> ObservableBase<ResultEmitter<Item, Err>> {
114 ObservableBase::new(ResultEmitter(r))
115}
116
// Generates the `emit` method shared by the local and shared emitter impls
// of `ResultEmitter`.
//
// Arguments: the subscription type carried by the `Subscriber`, then the
// extra bounds required of the observer (zero or more marker traits, each
// followed by `+`, ending with a lifetime).
//
// `Ok(v)` is delivered as `next(v)` followed by `complete()`. `Err(e)` is
// delivered as `error(e)` only: error and completion are both terminal
// notifications, so `complete()` must not follow an error.
#[doc(hidden)]
macro_rules! of_result_emitter {
  ($subscription:ty, $($marker:ident +)* $lf: lifetime) => {
    fn emit<O>(self, mut subscriber: Subscriber<O, $subscription>)
    where
      O: Observer<Item = Self::Item, Err = Self::Err> + $($marker +)* $lf
    {
      match self.0 {
        Ok(v) => {
          subscriber.next(v);
          subscriber.complete();
        }
        Err(e) => subscriber.error(e),
      }
    }
  }
}
132
/// Emitter behind [`of_result`]: a tuple wrapper around the `Result` to
/// report.
#[derive(Clone)]
pub struct ResultEmitter<Item, Err>(
  /// The outcome whose `Ok` or `Err` payload is forwarded to the subscriber.
  pub(crate) Result<Item, Err>,
);
135
136impl<Item, Err> Emitter for ResultEmitter<Item, Err> {
137 type Item = Item;
138 type Err = Err;
139}
140
141impl<'a, Item, Err> LocalEmitter<'a> for ResultEmitter<Item, Err> {
142 of_result_emitter!(LocalSubscription, 'a);
143}
144
145impl<Item, Err> SharedEmitter for ResultEmitter<Item, Err> {
146 of_result_emitter!(SharedSubscription, Send + Sync + 'static);
147}
148
149pub fn of_option<Item>(o: Option<Item>) -> ObservableBase<OptionEmitter<Item>> {
167 ObservableBase::new(OptionEmitter(o))
168}
169
/// Emitter behind [`of_option`]: a tuple wrapper around the optional value.
#[derive(Clone)]
pub struct OptionEmitter<Item>(
  /// Emitted through `next` when it is `Some`.
  pub(crate) Option<Item>,
);
172
// Generates the `emit` method shared by the local and shared emitter impls
// of `OptionEmitter`.
//
// Arguments: the subscription type carried by the `Subscriber`, then the
// extra bounds required of the observer (zero or more marker traits, each
// followed by `+`, ending with a lifetime).
//
// The generated method forwards a `Some` payload through `next`, sends
// nothing for `None`, and calls `complete` in both cases.
#[doc(hidden)]
macro_rules! of_option_emitter {
  ($sub:ty, $($bound:ident +)* $life:lifetime) => {
    fn emit<O>(self, mut subscriber: Subscriber<O, $sub>)
    where
      O: Observer<Item = Self::Item, Err = Self::Err> + $($bound +)* $life,
    {
      match self.0 {
        Some(item) => subscriber.next(item),
        None => {}
      }
      subscriber.complete();
    }
  };
}
187
188impl<Item> Emitter for OptionEmitter<Item> {
189 type Item = Item;
190 type Err = ();
191}
192
193impl<'a, Item> LocalEmitter<'a> for OptionEmitter<Item> {
194 of_option_emitter!(LocalSubscription, 'a);
195}
196
197impl<Item> SharedEmitter for OptionEmitter<Item> {
198 of_option_emitter!(SharedSubscription, Send + Sync + 'static);
199}
200
201pub fn of_fn<F, Item>(f: F) -> ObservableBase<CallableEmitter<F>>
218where
219 F: FnOnce() -> Item,
220{
221 ObservableBase::new(CallableEmitter(f))
222}
223
/// Emitter behind [`of_fn`]: a tuple wrapper around the closure that
/// produces the item.
#[derive(Clone)]
pub struct CallableEmitter<F>(
  /// Invoked during emission; its return value is the emitted item.
  F,
);
226
// Generates the `emit` method shared by the local and shared emitter impls
// of `CallableEmitter`.
//
// Arguments: the subscription type carried by the `Subscriber`, then the
// extra bounds required of the observer (zero or more marker traits, each
// followed by `+`, ending with a lifetime).
//
// The generated method calls the wrapped closure, sends its return value
// through `next`, and then calls `complete`.
#[doc(hidden)]
macro_rules! of_fn_emitter {
  ($sub:ty, $($bound:ident +)* $life:lifetime) => {
    fn emit<O>(self, mut subscriber: Subscriber<O, $sub>)
    where
      O: Observer<Item = Self::Item, Err = Self::Err> + $($bound +)* $life,
    {
      let produce = self.0;
      let item = produce();
      subscriber.next(item);
      subscriber.complete();
    }
  };
}
239
240impl<Item, F> Emitter for CallableEmitter<F>
241where
242 F: FnOnce() -> Item,
243{
244 type Item = Item;
245 type Err = ();
246}
247
248impl<'a, Item, F> LocalEmitter<'a> for CallableEmitter<F>
249where
250 F: FnOnce() -> Item,
251{
252 of_fn_emitter!(LocalSubscription, 'a);
253}
254
255impl<Item, F> SharedEmitter for CallableEmitter<F>
256where
257 F: FnOnce() -> Item,
258{
259 of_fn_emitter!(SharedSubscription, Send + Sync + 'static);
260}
261
#[cfg(test)]
mod test {
  use crate::prelude::*;

  /// `of_fn` delivers the closure's return value and reports completion.
  #[test]
  fn from_fn() {
    let mut received = 0;
    let mut done = false;
    let produce = || 123;
    observable::of_fn(produce)
      .subscribe_complete(|item| received = item, || done = true);

    assert_eq!(received, 123);
    assert!(done);
  }

  /// `of_option` delivers a `Some` payload, skips `None`, and reports
  /// completion in both cases.
  #[test]
  fn of_option() {
    // `Some`: the payload reaches the `next` callback.
    let mut some_value = 0;
    let mut some_done = false;
    observable::of_option(Some(123))
      .subscribe_complete(|item| some_value = item, || some_done = true);

    assert_eq!(some_value, 123);
    assert!(some_done);

    // `None`: the `next` callback never runs, so the initial 0 survives.
    let mut none_value = 0;
    let mut none_done = false;
    observable::of_option(None)
      .subscribe_complete(|item| none_value = item, || none_done = true);

    assert_eq!(none_value, 0);
    assert!(none_done);
  }

  /// `of_result` routes `Ok` to the `next` callback and `Err` to the error
  /// callback.
  #[test]
  fn of_result() {
    // `Ok`: the value is delivered and completion is reported.
    let mut ok_value = 0;
    let mut ok_done = false;
    let ok: Result<i32, &str> = Ok(123);
    observable::of_result(ok).subscribe_all(
      |item| ok_value = item,
      |_| {},
      || ok_done = true,
    );

    assert_eq!(ok_value, 123);
    assert!(ok_done);

    // `Err`: only the error callback fires; `next` must leave the 0 alone.
    let mut err_value = 0;
    let mut saw_error = false;
    let failed: Result<i32, &str> = Err("error");
    observable::of_result(failed)
      .subscribe_err(|_| err_value = 123, |_| saw_error = true);

    assert_eq!(err_value, 0);
    assert!(saw_error);
  }

  /// `of` delivers its single value and reports completion.
  #[test]
  fn of() {
    let mut received = 0;
    let mut done = false;
    observable::of(100)
      .subscribe_complete(|item| received = item, || done = true);

    assert_eq!(received, 100);
    assert!(done);
  }

  /// `of_sequence!` delivers every listed item; their sum is checked.
  #[test]
  fn of_macros() {
    let mut total = 0;
    of_sequence!(1, 2, 3).subscribe(|item| total += item);

    assert_eq!(total, 6);
  }

  /// Runs the benchmark group once as an ordinary test.
  #[test]
  fn bench() { do_bench(); }

  benchmark_group!(do_bench, bench_of);

  /// Benchmarks the `of` test body above.
  fn bench_of(b: &mut bencher::Bencher) { b.iter(of); }
}