rxrust/observable.rs
1#![macro_use]
2mod trivial;
3pub use trivial::*;
4
5mod from_iter;
6pub use from_iter::{from_iter, repeat};
7
8pub mod of;
9pub use of::{of, of_fn, of_option, of_result};
10
11pub(crate) mod from_future;
12pub use from_future::{from_future, from_future_result};
13
14pub mod interval;
15pub use interval::{interval, interval_at};
16
17pub(crate) mod connectable_observable;
18pub use connectable_observable::{
19 ConnectableObservable, LocalConnectableObservable,
20 SharedConnectableObservable,
21};
22
23mod observable_block_all;
24#[cfg(test)]
25pub use observable_block_all::*;
26
27mod observable_block;
28#[cfg(test)]
29pub use observable_block::*;
30
31mod base;
32pub use base::*;
33
34pub mod from_fn;
35pub use from_fn::*;
36
37pub mod timer;
38pub use timer::{timer, timer_at};
39mod observable_all;
40pub use observable_all::*;
41mod observable_err;
42pub use observable_err::*;
43mod observable_next;
44pub use observable_next::*;
45mod defer;
46mod observable_comp;
47pub use defer::*;
48
49use crate::prelude::*;
50pub use observable_comp::*;
51
52use crate::ops::default_if_empty::DefaultIfEmptyOp;
53use ops::{
54 box_it::{BoxOp, IntoBox},
55 buffer::{BufferWithCountOp, BufferWithCountOrTimerOp, BufferWithTimeOp},
56 contains::ContainsOp,
57 debounce::DebounceOp,
58 delay::DelayOp,
59 distinct::DistinctOp,
60 filter::FilterOp,
61 filter_map::FilterMapOp,
62 finalize::FinalizeOp,
63 flatten::FlattenOp,
64 group_by::GroupByOp,
65 last::LastOp,
66 map::MapOp,
67 map_to::MapToOp,
68 merge::MergeOp,
69 merge_all::MergeAllOp,
70 observe_on::ObserveOnOp,
71 ref_count::{RefCount, RefCountCreator},
72 sample::SampleOp,
73 scan::ScanOp,
74 skip::SkipOp,
75 skip_last::SkipLastOp,
76 skip_while::SkipWhileOp,
77 subscribe_on::SubscribeOnOP,
78 take::TakeOp,
79 take_last::TakeLastOp,
80 take_until::TakeUntilOp,
81 take_while::TakeWhileOp,
82 throttle_time::{ThrottleEdge, ThrottleTimeOp},
83 zip::ZipOp,
84 Accum, AverageOp, CountOp, FlatMapOp, MinMaxOp, ReduceOp, SumOp,
85};
86use std::ops::{Add, Mul};
87use std::time::{Duration, Instant};
88
/// Operator chain returned by `Observable::all`: the source is mapped through
/// the predicate `F`, filtered with a `fn(&bool) -> bool`, limited by a
/// `TakeOp` and finally wrapped in a `DefaultIfEmptyOp`.
type ALLOp<O, F> =
  DefaultIfEmptyOp<TakeOp<FilterOp<MapOp<O, F>, fn(&bool) -> bool>>>;
91
92pub trait Observable: Sized {
93 type Item;
94 type Err;
95
96 /// emit only the first item emitted by an Observable
97 #[inline]
98 fn first(self) -> TakeOp<Self> { self.take(1) }
99
100 /// emit only the first item emitted by an Observable
101 #[inline]
102 fn first_or(self, default: Self::Item) -> DefaultIfEmptyOp<TakeOp<Self>> {
103 self.first().default_if_empty(default)
104 }
105
106 /// Emit only the last final item emitted by a source observable or a
107 /// default item given.
108 ///
109 /// Completes right after emitting the single item. Emits error when
110 /// source observable emits it.
111 ///
112 /// # Examples
113 ///
114 /// ```
115 /// use rxrust::prelude::*;
116 ///
117 /// observable::empty()
118 /// .last_or(1234)
119 /// .subscribe(|v| println!("{}", v));
120 ///
121 /// // print log:
122 /// // 1234
123 /// ```
124 #[inline]
125 fn last_or(
126 self,
127 default: Self::Item,
128 ) -> DefaultIfEmptyOp<LastOp<Self, Self::Item>> {
129 self.last().default_if_empty(default)
130 }
131
132 /// Emit only item n (0-indexed) emitted by an Observable
133 #[inline]
134 fn element_at(self, nth: u32) -> TakeOp<SkipOp<Self>> {
135 self.skip(nth).first()
136 }
137
138 /// Do not emit any items from an Observable but mirror its termination
139 /// notification
140 #[inline]
141 fn ignore_elements(self) -> FilterOp<Self, fn(&Self::Item) -> bool> {
142 fn always_false<Item>(_: &Item) -> bool { false }
143 self.filter(always_false as fn(&Self::Item) -> bool)
144 }
145
146 /// Determine whether all items emitted by an Observable meet some criteria
147 #[inline]
148 fn all<F>(self, pred: F) -> ALLOp<Self, F>
149 where
150 F: Fn(Self::Item) -> bool,
151 {
152 fn not(b: &bool) -> bool { !b }
153 self
154 .map(pred)
155 .filter(not as fn(&bool) -> bool)
156 .first_or(true)
157 }
158
159 /// Determine whether an Observable emits a particular item or not
160 fn contains(self, target: Self::Item) -> ContainsOp<Self, Self::Item> {
161 ContainsOp {
162 source: self,
163 target,
164 }
165 }
166
167 /// Emits only last final item emitted by a source observable.
168 ///
169 /// Completes right after emitting the single last item, or when source
170 /// observable completed, being an empty one. Emits error when source
171 /// observable emits it.
172 ///
173 /// # Examples
174 ///
175 /// ```
176 /// use rxrust::prelude::*;
177 ///
178 /// observable::from_iter(0..100)
179 /// .last()
180 /// .subscribe(|v| println!("{}", v));
181 ///
182 /// // print log:
183 /// // 99
184 /// ```
185 #[inline]
186 fn last(self) -> LastOp<Self, Self::Item> {
187 LastOp {
188 source: self,
189 last: None,
190 }
191 }
192
193 /// Call a function when observable completes, errors or is unsubscribed from.
194 #[inline]
195 fn finalize<F>(self, f: F) -> FinalizeOp<Self, F>
196 where
197 F: FnMut(),
198 {
199 FinalizeOp {
200 source: self,
201 func: f,
202 }
203 }
204
205 /// Creates an Observable that combines all the emissions from Observables
206 /// that get emitted from an Observable.
207 ///
208 /// # Example
209 ///
210 /// ```
211 /// # use rxrust::prelude::*;
212 /// let mut source = LocalSubject::new();
213 /// let numbers = LocalSubject::new();
214 /// // create a even stream by filter
215 /// let even = numbers.clone().filter((|v| *v % 2 == 0) as fn(&i32) -> bool);
216 /// // create an odd stream by filter
217 /// let odd = numbers.clone().filter((|v| *v % 2 != 0) as fn(&i32) -> bool);
218 ///
219 /// // merge odd and even stream again
220 /// let out = source.clone().flatten();
221 ///
222 /// source.next(even);
223 /// source.next(odd);
224 ///
225 /// // attach observers
226 /// out.subscribe(|v: i32| println!("{} ", v));
227 /// ```
228 #[inline]
229 fn flatten<Inner, A>(self) -> FlattenOp<Self, Inner>
230 where
231 Inner: Observable<Item = A, Err = Self::Err>,
232 {
233 FlattenOp {
234 source: self,
235 marker: std::marker::PhantomData::<Inner>,
236 }
237 }
238
239 /// Applies given function to each item emitted by this Observable, where
240 /// that function returns an Observable that itself emits items. It then
241 /// merges the emissions of these resulting Observables, emitting these
242 /// merged results as its own sequence.
243 #[inline]
244 fn flat_map<Inner, B, F>(self, f: F) -> FlatMapOp<Self, Inner, F>
245 where
246 Inner: Observable<Item = B, Err = Self::Err>,
247 F: Fn(Self::Item) -> Inner,
248 {
249 FlattenOp {
250 source: MapOp {
251 source: self,
252 func: f,
253 },
254 marker: std::marker::PhantomData::<Inner>,
255 }
256 }
257
258 /// Groups items emited by the source Observable into Observables.
259 /// Each emited Observable emits items matching the key returned
260 /// by the discriminator function.
261 ///
262 /// # Example
263 ///
264 /// ```
265 /// use rxrust::prelude::*;
266 ///
267 /// #[derive(Clone)]
268 /// struct Person {
269 /// name: String,
270 /// age: u32,
271 /// }
272 ///
273 /// observable::from_iter([
274 /// Person{ name: String::from("John"), age: 26 },
275 /// Person{ name: String::from("Anne"), age: 28 },
276 /// Person{ name: String::from("Gregory"), age: 24 },
277 /// Person{ name: String::from("Alice"), age: 28 },
278 /// ])
279 /// .group_by(|person: &Person| person.age)
280 /// .subscribe(|group| {
281 /// group
282 /// .reduce(|acc, person| format!("{} {}", acc, person.name))
283 /// .subscribe(|result| println!("{}", result));
284 /// });
285 ///
286 /// // Prints:
287 /// // John
288 /// // Anne Alice
289 /// // Gregory
290 /// ```
291 #[inline]
292 fn group_by<D, Item, Key>(self, discr: D) -> GroupByOp<Self, D>
293 where
294 D: FnMut(&Item) -> Key,
295 {
296 GroupByOp {
297 source: self,
298 discr,
299 }
300 }
301
302 /// Creates a new stream which calls a closure on each element and uses
303 /// its return as the value.
304 #[inline]
305 fn map<B, F>(self, f: F) -> MapOp<Self, F>
306 where
307 F: Fn(Self::Item) -> B,
308 {
309 MapOp {
310 source: self,
311 func: f,
312 }
313 }
314
315 /// Maps emissions to a constant value.
316 #[inline]
317 fn map_to<B>(self, value: B) -> MapToOp<Self, B> {
318 MapToOp {
319 source: self,
320 value,
321 }
322 }
323
324 /// combine two Observables into one by merging their emissions
325 ///
326 /// # Example
327 ///
328 /// ```
329 /// # use rxrust::prelude::*;
330 /// let numbers = LocalSubject::new();
331 /// // create a even stream by filter
332 /// let even = numbers.clone().filter(|v| *v % 2 == 0);
333 /// // create an odd stream by filter
334 /// let odd = numbers.clone().filter(|v| *v % 2 != 0);
335 ///
336 /// // merge odd and even stream again
337 /// let merged = even.merge(odd);
338 ///
339 /// // attach observers
340 /// merged.subscribe(|v: &i32| println!("{} ", v));
341 /// ```
342 #[inline]
343 fn merge<S>(self, o: S) -> MergeOp<Self, S>
344 where
345 S: Observable<Item = Self::Item, Err = Self::Err>,
346 {
347 MergeOp {
348 source1: self,
349 source2: o,
350 }
351 }
352
353 /// Converts a higher-order Observable into a first-order Observable which
354 /// concurrently delivers all values that are emitted on the inner
355 /// Observables.
356 ///
357 /// # Example
358 ///
359 /// ```
360 /// # use rxrust::prelude::*;
361 /// # use futures::executor::LocalPool;
362 /// # use std::time::Duration;
363 /// let mut local = LocalPool::new();
364 /// observable::from_iter(
365 /// (0..3)
366 /// .map(|_| interval(Duration::from_millis(1), local.spawner()).take(5)),
367 /// )
368 /// .merge_all(2)
369 /// .subscribe(move |i| println!("{}", i));
370 /// local.run();
371 /// ```
372 #[inline]
373 fn merge_all(self, concurrent: usize) -> MergeAllOp<Self> {
374 MergeAllOp {
375 source: self,
376 concurrent,
377 }
378 }
379
380 /// Emit only those items from an Observable that pass a predicate test
381 /// # Example
382 ///
383 /// ```
384 /// use rxrust:: prelude::*;
385 ///
386 /// let mut coll = vec![];
387 /// let coll_clone = coll.clone();
388 ///
389 /// observable::from_iter(0..10)
390 /// .filter(|v| *v % 2 == 0)
391 /// .subscribe(|v| { coll.push(v); });
392 ///
393 /// // only even numbers received.
394 /// assert_eq!(coll, vec![0, 2, 4, 6, 8]);
395 /// ```
396 #[inline]
397 fn filter<F>(self, filter: F) -> FilterOp<Self, F>
398 where
399 F: Fn(&Self::Item) -> bool,
400 {
401 FilterOp {
402 source: self,
403 filter,
404 }
405 }
406
407 /// The closure must return an Option<T>. filter_map creates an iterator which
408 /// calls this closure on each element. If the closure returns Some(element),
409 /// then that element is returned. If the closure returns None, it will try
410 /// again, and call the closure on the next element, seeing if it will return
411 /// Some.
412 ///
413 /// Why filter_map and not just filter and map? The key is in this part:
414 ///
415 /// If the closure returns Some(element), then that element is returned.
416 ///
417 /// In other words, it removes the Option<T> layer automatically. If your
418 /// mapping is already returning an Option<T> and you want to skip over Nones,
419 /// then filter_map is much, much nicer to use.
420 ///
421 /// # Examples
422 ///
423 /// ```
424 /// # use rxrust::prelude::*;
425 /// let mut res: Vec<i32> = vec![];
426 /// observable::from_iter(["1", "lol", "3", "NaN", "5"].iter())
427 /// .filter_map(|s: &&str| s.parse().ok())
428 /// .subscribe(|v| res.push(v));
429 ///
430 /// assert_eq!(res, [1, 3, 5]);
431 /// ```
432 #[inline]
433 fn filter_map<F, SourceItem, Item>(self, f: F) -> FilterMapOp<Self, F>
434 where
435 F: FnMut(SourceItem) -> Option<Item>,
436 {
437 FilterMapOp { source: self, f }
438 }
439
440 /// box an observable to a safety object and convert it to a simple type
441 /// `BoxOp`, which only care `Item` and `Err` Observable emitted.
442 ///
443 /// # Example
444 /// ```
445 /// use rxrust::prelude::*;
446 /// use ops::box_it::LocalBoxOp;
447 ///
448 /// let mut boxed: LocalBoxOp<'_, i32, ()> = observable::of(1)
449 /// .map(|v| v).box_it();
450 ///
451 /// // BoxOp can box any observable type
452 /// boxed = observable::empty().box_it();
453 ///
454 /// boxed.subscribe(|_| {});
455 /// ```
456 #[inline]
457 fn box_it<O: IntoBox<Self>>(self) -> BoxOp<O>
458 where
459 BoxOp<O>: Observable<Item = Self::Item, Err = Self::Err>,
460 {
461 O::box_it(self)
462 }
463
464 /// Ignore the first `count` values emitted by the source Observable.
465 ///
466 /// `skip` returns an Observable that ignore the first `count` values
467 /// emitted by the source Observable. If the source emits fewer than `count`
468 /// values then 0 of its values are emitted. After that, it completes,
469 /// regardless if the source completes.
470 ///
471 /// # Example
472 /// Ignore the first 5 seconds of an infinite 1-second interval Observable
473 ///
474 /// ```
475 /// # use rxrust::prelude::*;
476 ///
477 /// observable::from_iter(0..10).skip(5).subscribe(|v| println!("{}", v));
478
479 /// // print logs:
480 /// // 6
481 /// // 7
482 /// // 8
483 /// // 9
484 /// // 10
485 /// ```
486 #[inline]
487 fn skip(self, count: u32) -> SkipOp<Self> {
488 SkipOp {
489 source: self,
490 count,
491 }
492 }
493
494 /// Ignore values while result of a callback is true.
495 ///
496 /// `skip_while` returns an Observable that ignores values while result of an
497 /// callback is true emitted by the source Observable.
498 ///
499 /// # Example
500 /// Suppress the first 5 items of an infinite 1-second interval Observable
501 ///
502 /// ```
503 /// # use rxrust::prelude::*;
504 ///
505 /// observable::from_iter(0..10)
506 /// .skip_while(|v| v < &5)
507 /// .subscribe(|v| println!("{}", v));
508 ///
509 /// // print logs:
510 /// // 5
511 /// // 6
512 /// // 7
513 /// // 8
514 /// // 9
515 /// ```
516 #[inline]
517 fn skip_while<F>(self, callback: F) -> SkipWhileOp<Self, F>
518 where
519 F: FnMut(&Self::Item) -> bool,
520 {
521 SkipWhileOp {
522 source: self,
523 callback,
524 }
525 }
526
527 /// Ignore the last `count` values emitted by the source Observable.
528 ///
529 /// `skip_last` returns an Observable that ignore the last `count` values
530 /// emitted by the source Observable. If the source emits fewer than `count`
531 /// values then 0 of its values are emitted.
532 /// It will not emit values until source Observable complete.
533 ///
534 /// # Example
535 /// Skip the last 5 seconds of an infinite 1-second interval Observable
536 ///
537 /// ```
538 /// # use rxrust::prelude::*;
539 ///
540 /// observable::from_iter(0..10)
541 /// .skip_last(5)
542 /// .subscribe(|v| println!("{}", v));
543 ///
544 /// // print logs:
545 /// // 0
546 /// // 1
547 /// // 2
548 /// // 3
549 /// // 4
550 /// ```
551 #[inline]
552 fn skip_last(self, count: usize) -> SkipLastOp<Self> {
553 SkipLastOp {
554 source: self,
555 count,
556 }
557 }
558
559 /// Emits only the first `count` values emitted by the source Observable.
560 ///
561 /// `take` returns an Observable that emits only the first `count` values
562 /// emitted by the source Observable. If the source emits fewer than `count`
563 /// values then all of its values are emitted. After that, it completes,
564 /// regardless if the source completes.
565 ///
566 /// # Example
567 /// Take the first 5 seconds of an infinite 1-second interval Observable
568 ///
569 /// ```
570 /// # use rxrust::prelude::*;
571 ///
572 /// observable::from_iter(0..10).take(5).subscribe(|v| println!("{}", v));
573
574 /// // print logs:
575 /// // 0
576 /// // 1
577 /// // 2
578 /// // 3
579 /// // 4
580 /// ```
581 ///
582 #[inline]
583 fn take(self, count: u32) -> TakeOp<Self> {
584 TakeOp {
585 source: self,
586 count,
587 }
588 }
589
590 /// Emits the values emitted by the source Observable until a `notifier`
591 /// Observable emits a value.
592 ///
593 /// `take_until` subscribes and begins mirroring the source Observable. It
594 /// also monitors a second Observable, `notifier` that you provide. If the
595 /// `notifier` emits a value, the output Observable stops mirroring the source
596 /// Observable and completes. If the `notifier` doesn't emit any value and
597 /// completes then `take_until` will pass all values.
598 #[inline]
599 fn take_until<T>(self, notifier: T) -> TakeUntilOp<Self, T> {
600 TakeUntilOp {
601 source: self,
602 notifier,
603 }
604 }
605
606 /// Emits values while result of an callback is true.
607 ///
608 /// `take_while` returns an Observable that emits values while result of an
609 /// callback is true emitted by the source Observable.
610 /// It will not emit values until source Observable complete.
611 ///
612 /// # Example
613 /// Take the first 5 seconds of an infinite 1-second interval Observable
614 ///
615 /// ```
616 /// # use rxrust::prelude::*;
617 ///
618 /// observable::from_iter(0..10)
619 /// .take_while(|v| v < &5)
620 /// .subscribe(|v| println!("{}", v));
621
622 /// // print logs:
623 /// // 0
624 /// // 1
625 /// // 2
626 /// // 3
627 /// // 4
628 /// ```
629 ///
630 #[inline]
631 fn take_while<F>(self, callback: F) -> TakeWhileOp<Self, F>
632 where
633 F: FnMut(&Self::Item) -> bool,
634 {
635 TakeWhileOp {
636 source: self,
637 callback,
638 }
639 }
640
641 /// Emits only the last `count` values emitted by the source Observable.
642 ///
643 /// `take_last` returns an Observable that emits only the last `count` values
644 /// emitted by the source Observable. If the source emits fewer than `count`
645 /// values then all of its values are emitted.
646 /// It will not emit values until source Observable complete.
647 ///
648 /// # Example
649 /// Take the last 5 seconds of an infinite 1-second interval Observable
650 ///
651 /// ```
652 /// # use rxrust::prelude::*;
653 ///
654 /// observable::from_iter(0..10)
655 /// .take_last(5)
656 /// .subscribe(|v| println!("{}", v));
657
658 /// // print logs:
659 /// // 5
660 /// // 6
661 /// // 7
662 /// // 8
663 /// // 9
664 /// ```
665 ///
666 #[inline]
667 fn take_last(self, count: usize) -> TakeLastOp<Self> {
668 TakeLastOp {
669 source: self,
670 count,
671 }
672 }
673
674 /// Emits item it has most recently emitted since the previous sampling
675 ///
676 ///
677 /// It will emit values when sampling observable complete.
678 ///
679 /// #Example
680 /// Sampling every 5ms of an infinite 1ms interval Observable
681 /// ```
682 /// use rxrust::prelude::*;
683 /// use std::time::Duration;
684 /// use futures::executor::LocalPool;
685 ///
686 /// let mut local_scheduler = LocalPool::new();
687 /// let spawner = local_scheduler.spawner();
688 /// observable::interval(Duration::from_millis(2), spawner.clone())
689 /// .sample(observable::interval(Duration::from_millis(5), spawner))
690 /// .take(5)
691 /// .subscribe(move |v| println!("{}", v));
692 ///
693 /// local_scheduler.run();
694 /// // print logs:
695 /// // 1
696 /// // 4
697 /// // 6
698 /// // 9
699 /// // ...
700 /// ```
701 #[inline]
702 fn sample<O>(self, sampling: O) -> SampleOp<Self, O>
703 where
704 O: Observable,
705 {
706 SampleOp {
707 source: self,
708 sampling,
709 }
710 }
711
712 /// The Scan operator applies a function to the first item emitted by the
713 /// source observable and then emits the result of that function as its
714 /// own first emission. It also feeds the result of the function back into
715 /// the function along with the second item emitted by the source observable
716 /// in order to generate its second emission. It continues to feed back its
717 /// own subsequent emissions along with the subsequent emissions from the
718 /// source Observable in order to create the rest of its sequence.
719 ///
720 /// Applies a binary operator closure to each item emitted from source
721 /// observable and emits successive values.
722 ///
723 /// Completes when source observable completes.
724 /// Emits error when source observable emits it.
725 ///
726 /// This version starts with an user-specified initial value for when the
727 /// binary operator is called with the first item processed.
728 ///
729 /// # Arguments
730 ///
731 /// * `initial_value` - An initial value to start the successive accumulations
732 /// from.
733 /// * `binary_op` - A closure or function acting as a binary operator.
734 ///
735 /// # Examples
736 ///
737 /// ```
738 /// use rxrust::prelude::*;
739 ///
740 /// observable::from_iter(vec![1, 1, 1, 1, 1])
741 /// .scan_initial(100, |acc, v| acc + v)
742 /// .subscribe(|v| println!("{}", v));
743 ///
744 /// // print log:
745 /// // 101
746 /// // 102
747 /// // 103
748 /// // 104
749 /// // 105
750 /// ```
751 #[inline]
752 fn scan_initial<OutputItem, BinaryOp>(
753 self,
754 initial_value: OutputItem,
755 binary_op: BinaryOp,
756 ) -> ScanOp<Self, BinaryOp, OutputItem>
757 where
758 BinaryOp: Fn(OutputItem, Self::Item) -> OutputItem,
759 OutputItem: Clone,
760 {
761 ScanOp {
762 source_observable: self,
763 binary_op,
764 initial_value,
765 }
766 }
767
768 /// Works like [`scan_initial`](Observable::scan_initial) but starts with a
769 /// value defined by a [`Default`] trait for the first argument `binary_op`
770 /// operator operates on.
771 ///
772 /// # Arguments
773 ///
774 /// * `binary_op` - A closure or function acting as a binary operator.
775 #[inline]
776 fn scan<OutputItem, BinaryOp>(
777 self,
778 binary_op: BinaryOp,
779 ) -> ScanOp<Self, BinaryOp, OutputItem>
780 where
781 BinaryOp: Fn(OutputItem, Self::Item) -> OutputItem,
782 OutputItem: Default + Clone,
783 {
784 self.scan_initial(OutputItem::default(), binary_op)
785 }
786
787 /// Apply a function to each item emitted by an observable, sequentially,
788 /// and emit the final value, after source observable completes.
789 ///
790 /// Emits error when source observable emits it.
791 ///
792 /// # Arguments
793 ///
794 /// * `initial` - An initial value to start the successive reduction from.
795 /// * `binary_op` - A closure acting as a binary (folding) operator.
796 ///
797 /// # Examples
798 ///
799 /// ```
800 /// use rxrust::prelude::*;
801 ///
802 /// observable::from_iter(vec![1, 1, 1, 1, 1])
803 /// .reduce_initial(100, |acc, v| acc + v)
804 /// .subscribe(|v| println!("{}", v));
805 ///
806 /// // print log:
807 /// // 105
808 /// ```
809 #[inline]
810 fn reduce_initial<OutputItem, BinaryOp>(
811 self,
812 initial: OutputItem,
813 binary_op: BinaryOp,
814 ) -> ReduceOp<Self, BinaryOp, OutputItem>
815 where
816 BinaryOp: Fn(OutputItem, Self::Item) -> OutputItem,
817 OutputItem: Clone,
818 {
819 // realised as a composition of `scan`, and `last`
820 self
821 .scan_initial(initial.clone(), binary_op)
822 .last_or(initial)
823 }
824
825 /// Works like [`reduce_initial`](Observable::reduce_initial) but starts with
826 /// a value defined by a [`Default`] trait for the first argument `f`
827 /// operator operates on.
828 ///
829 /// # Arguments
830 ///
831 /// * `binary_op` - A closure acting as a binary operator.
832 #[inline]
833 fn reduce<OutputItem, BinaryOp>(
834 self,
835 binary_op: BinaryOp,
836 ) -> DefaultIfEmptyOp<LastOp<ScanOp<Self, BinaryOp, OutputItem>, OutputItem>>
837 where
838 BinaryOp: Fn(OutputItem, Self::Item) -> OutputItem,
839 OutputItem: Default + Clone,
840 {
841 self.reduce_initial(OutputItem::default(), binary_op)
842 }
843
844 /// Emits the item from the source observable that had the maximum value.
845 ///
846 /// Emits error when source observable emits it.
847 ///
848 /// # Examples
849 ///
850 /// ```
851 /// use rxrust::prelude::*;
852 ///
853 /// observable::from_iter(vec![3., 4., 7., 5., 6.])
854 /// .max()
855 /// .subscribe(|v| println!("{}", v));
856 ///
857 /// // print log:
858 /// // 7
859 /// ```
860 #[inline]
861 fn max(self) -> MinMaxOp<Self, Self::Item>
862 where
863 Self::Item: Clone + Send + PartialOrd<Self::Item>,
864 {
865 fn get_greater<Item>(i: Option<Item>, v: Item) -> Option<Item>
866 where
867 Item: Clone + PartialOrd<Item>,
868 {
869 i.map(|vv| if vv < v { v.clone() } else { vv }).or(Some(v))
870 }
871 let get_greater_func =
872 get_greater as fn(Option<Self::Item>, Self::Item) -> Option<Self::Item>;
873
874 self
875 .scan_initial(None, get_greater_func)
876 .last()
877 // we can safely unwrap, because we will ever get this item
878 // once a max value exists and is there.
879 .map(|v| v.unwrap())
880 }
881
882 /// Emits the item from the source observable that had the minimum value.
883 ///
884 /// Emits error when source observable emits it.
885 ///
886 /// # Examples
887 ///
888 /// ```
889 /// use rxrust::prelude::*;
890 ///
891 /// observable::from_iter(vec![3., 4., 7., 5., 6.])
892 /// .min()
893 /// .subscribe(|v| println!("{}", v));
894 ///
895 /// // print log:
896 /// // 3
897 /// ```
898 #[inline]
899 fn min(self) -> MinMaxOp<Self, Self::Item>
900 where
901 Self::Item: Clone + Send + PartialOrd<Self::Item>,
902 {
903 fn get_lesser<Item>(i: Option<Item>, v: Item) -> Option<Item>
904 where
905 Item: Clone + PartialOrd<Item>,
906 {
907 i.map(|vv| if vv > v { v.clone() } else { vv }).or(Some(v))
908 }
909
910 let get_lesser_func =
911 get_lesser as fn(Option<Self::Item>, Self::Item) -> Option<Self::Item>;
912
913 self
914 .scan_initial(None, get_lesser_func)
915 .last()
916 // we can safely unwrap, because we will ever get this item
917 // once a max value exists and is there.
918 .map(|v| v.unwrap())
919 }
920
921 /// Calculates the sum of numbers emitted by an source observable and emits
922 /// this sum when source completes.
923 ///
924 /// Emits zero when source completed as an and empty sequence.
925 /// Emits error when source observable emits it.
926 ///
927 /// # Examples
928 ///
929 /// ```
930 /// use rxrust::prelude::*;
931 ///
932 /// observable::from_iter(vec![1, 1, 1, 1, 1])
933 /// .sum()
934 /// .subscribe(|v| println!("{}", v));
935 ///
936 /// // p rint log:
937 /// // 5
938 /// ```
939 #[inline]
940 fn sum(self) -> SumOp<Self, Self::Item>
941 where
942 Self::Item: Clone + Default + Add<Self::Item, Output = Self::Item>,
943 {
944 self.reduce(|acc, v| acc + v)
945 }
946
947 /// Emits the number of items emitted by a source observable when this source
948 /// completes.
949 ///
950 /// The output type of this operator is fixed to [`usize`].
951 ///
952 /// Emits zero when source completed as an and empty sequence.
953 /// Emits error when source observable emits it.
954 ///
955 /// # Examples
956 ///
957 /// ```
958 /// use rxrust::prelude::*;
959 ///
960 /// observable::from_iter(vec!['1', '7', '3', '0', '4'])
961 /// .count()
962 /// .subscribe(|v| println!("{}", v));
963 ///
964 /// // print log:
965 /// // 5
966 /// ```
967 #[inline]
968 fn count(self) -> CountOp<Self, Self::Item> { self.reduce(|acc, _v| acc + 1) }
969
970 /// Calculates the sum of numbers emitted by an source observable and emits
971 /// this sum when source completes.
972 ///
973 /// Emits zero when source completed as an and empty sequence.
974 /// Emits error when source observable emits it.
975 ///
976 /// # Examples
977 ///
978 /// ```
979 /// use rxrust::prelude::*;
980 ///
981 /// observable::from_iter(vec![3., 4., 5., 6., 7.])
982 /// .average()
983 /// .subscribe(|v| println!("{}", v));
984 ///
985 /// // print log:
986 /// // 5
987 /// ```
988 #[inline]
989 fn average(self) -> AverageOp<Self, Self::Item>
990 where
991 Self::Item: Clone
992 + Send
993 + Default
994 + Add<Self::Item, Output = Self::Item>
995 + Mul<f64, Output = Self::Item>,
996 {
997 /// Computing an average by multiplying accumulated nominator by a
998 /// reciprocal of accumulated denominator. In this way some generic
999 /// types that support linear scaling over floats values could be
1000 /// averaged (e.g. vectors)
1001 fn average_floats<T>(acc: Accum<T>) -> T
1002 where
1003 T: Default + Clone + Send + Mul<f64, Output = T>,
1004 {
1005 // Note: we will never be dividing by zero here, as
1006 // the acc.1 will be always >= 1.
1007 // It would have be zero if we've would have received an element
1008 // when the source observable is empty but beacuse of how
1009 // `scan` works, we will transparently not receive anything in
1010 // such case.
1011 acc.0 * (1.0 / (acc.1 as f64))
1012 }
1013
1014 fn accumulate_item<T>(acc: Accum<T>, v: T) -> Accum<T>
1015 where
1016 T: Clone + Add<T, Output = T>,
1017 {
1018 let newacc = acc.0 + v;
1019 let newcount = acc.1 + 1;
1020 (newacc, newcount)
1021 }
1022
1023 // our starting point
1024 let start = (Self::Item::default(), 0);
1025
1026 let acc =
1027 accumulate_item as fn(Accum<Self::Item>, Self::Item) -> Accum<Self::Item>;
1028 let avg = average_floats as fn(Accum<Self::Item>) -> Self::Item;
1029
1030 self.scan_initial(start, acc).last().map(avg)
1031 }
1032
1033 /// Returns a ConnectableObservable. A ConnectableObservable Observable
1034 /// resembles an ordinary Observable, except that it does not begin emitting
1035 /// items when it is subscribed to, but only when the Connect operator is
1036 /// applied to it. In this way you can wait for all intended observers to
1037 /// subscribe to the Observable before the Observable begins emitting items.
1038 #[inline]
1039 fn publish<Subject: Default>(self) -> ConnectableObservable<Self, Subject> {
1040 ConnectableObservable {
1041 source: self,
1042 subject: Subject::default(),
1043 }
1044 }
1045
1046 /// Returns a new Observable that multicast (shares) the original
1047 /// Observable. As long as there is at least one Subscriber this
1048 /// Observable will be subscribed and emitting data. When all subscribers
1049 /// have unsubscribed it will unsubscribe from the source Observable.
1050 /// Because the Observable is multicasting it makes the stream `hot`.
1051 /// This is an alias for `publish().ref_count()`
1052 #[inline]
1053 fn share<Subject, Inner>(
1054 self,
1055 ) -> RefCount<Inner, ConnectableObservable<Self, Subject>>
1056 where
1057 Inner: RefCountCreator<Connectable = ConnectableObservable<Self, Subject>>,
1058 Subject: Default,
1059 Self: Clone,
1060 {
1061 self.publish::<Subject>().ref_count::<Inner>()
1062 }
1063
1064 /// Delays the emission of items from the source Observable by a given timeout
1065 /// or until a given `Instant`.
1066 #[inline]
1067 fn delay<SD>(self, dur: Duration, scheduler: SD) -> DelayOp<Self, SD> {
1068 DelayOp {
1069 source: self,
1070 delay: dur,
1071 scheduler,
1072 }
1073 }
1074
1075 #[inline]
1076 fn delay_at<SD>(self, at: Instant, scheduler: SD) -> DelayOp<Self, SD> {
1077 DelayOp {
1078 source: self,
1079 delay: at.elapsed(),
1080 scheduler,
1081 }
1082 }
1083
1084 /// Specify the Scheduler on which an Observable will operate
1085 ///
1086 /// With `SubscribeON` you can decide what type of scheduler a specific
1087 /// Observable will be using when it is subscribed to.
1088 ///
1089 /// Schedulers control the speed and order of emissions to observers from an
1090 /// Observable stream.
1091 ///
1092 /// # Example
1093 /// Given the following code:
1094 /// ```rust
1095 /// use rxrust::prelude::*;
1096 ///
1097 /// let a = observable::from_iter(1..5);
1098 /// let b = observable::from_iter(5..10);
1099 /// a.merge(b).subscribe(|v| print!("{} ", v));
1100 /// ```
1101 ///
1102 /// Both Observable `a` and `b` will emit their values directly and
1103 /// synchronously once they are subscribed to.
1104 /// This will result in the output of `1 2 3 4 5 6 7 8 9`.
1105 ///
1106 /// But if we instead use the `subscribe_on` operator declaring that we want
1107 /// to use the new thread scheduler for values emitted by Observable `a`:
1108 /// ```rust
1109 /// use rxrust::prelude::*;
1110 /// use std::thread;
1111 /// use futures::executor::ThreadPool;
1112 ///
1113 /// let pool = ThreadPool::new().unwrap();
1114 /// let a = observable::from_iter(1..5).subscribe_on(pool);
1115 /// let b = observable::from_iter(5..10);
1116 /// a.merge(b).into_shared().subscribe(|v|{
1117 /// let handle = thread::current();
1118 /// print!("{}({:?}) ", v, handle.id())
1119 /// });
1120 /// ```
1121 ///
1122 /// The output will instead by `1(thread 1) 2(thread 1) 3(thread 1) 4(thread
1123 /// 1) 5(thread 2) 6(thread 2) 7(thread 2) 8(thread 2) 9(thread id2)`.
1124 /// The reason for this is that Observable `b` emits its values directly like
1125 /// before, but the emissions from `a` are scheduled on a new thread because
1126 /// we are now using the `NewThread` Scheduler for that specific Observable.
1127 #[inline]
1128 fn subscribe_on<SD>(self, scheduler: SD) -> SubscribeOnOP<Self, SD> {
1129 SubscribeOnOP {
1130 source: self,
1131 scheduler,
1132 }
1133 }
1134
1135 /// Re-emits all notifications from source Observable with specified
1136 /// scheduler.
1137 ///
1138 /// `ObserveOn` is an operator that accepts a scheduler as the parameter,
1139 /// which will be used to reschedule notifications emitted by the source
1140 /// Observable.
1141 #[inline]
1142 fn observe_on<SD>(self, scheduler: SD) -> ObserveOnOp<Self, SD> {
1143 ObserveOnOp {
1144 source: self,
1145 scheduler,
1146 }
1147 }
1148
1149 /// Emits a value from the source Observable only after a particular time span
1150 /// has passed without another source emission.
1151 #[inline]
1152 fn debounce<SD>(
1153 self,
1154 duration: Duration,
1155 scheduler: SD,
1156 ) -> DebounceOp<Self, SD> {
1157 DebounceOp {
1158 source: self,
1159 duration,
1160 scheduler,
1161 }
1162 }
1163
1164 /// Emits a value from the source Observable, then ignores subsequent source
1165 /// values for duration milliseconds, then repeats this process.
1166 ///
1167 /// #Example
1168 /// ```
1169 /// use rxrust::{ prelude::*, ops::throttle_time::ThrottleEdge };
1170 /// use std::time::Duration;
1171 /// use futures::executor::LocalPool;
1172 ///
1173 /// let mut local_scheduler = LocalPool::new();
1174 /// let spawner = local_scheduler.spawner();
1175 /// observable::interval(Duration::from_millis(1), spawner.clone())
1176 /// .throttle_time(Duration::from_millis(9), ThrottleEdge::Leading, spawner)
1177 /// .take(5)
1178 /// .subscribe(move |v| println!("{}", v));
1179 ///
1180 /// local_scheduler.run();
1181 /// ```
1182 #[inline]
1183 fn throttle_time<SD>(
1184 self,
1185 duration: Duration,
1186 edge: ThrottleEdge,
1187 scheduler: SD,
1188 ) -> ThrottleTimeOp<Self, SD> {
1189 ThrottleTimeOp {
1190 source: self,
1191 duration,
1192 edge,
1193 scheduler,
1194 }
1195 }
1196
1197 /// Returns an Observable that emits all items emitted by the source
1198 /// Observable that are distinct by comparison from previous items.
1199 #[inline]
1200 fn distinct(self) -> DistinctOp<Self> { DistinctOp { source: self } }
1201
1202 /// 'Zips up' two observable into a single observable of pairs.
1203 ///
1204 /// zip() returns a new observable that will emit over two other
1205 /// observables, returning a tuple where the first element comes from the
1206 /// first observable, and the second element comes from the second
1207 /// observable.
1208 ///
1209 /// In other words, it zips two observables together, into a single one.
1210 #[inline]
1211 fn zip<U>(self, other: U) -> ZipOp<Self, U>
1212 where
1213 U: Observable,
1214 {
1215 ZipOp { a: self, b: other }
1216 }
1217
1218 /// Emits default value if Observable completed with empty result
1219 ///
1220 /// #Example
1221 /// ```
1222 /// use rxrust::prelude::*;
1223 ///
1224 /// observable::empty()
1225 /// .default_if_empty(5)
1226 /// .subscribe(|v| println!("{}", v));
1227 ///
1228 /// // Prints:
1229 /// // 5
1230 /// ```
1231 #[inline]
1232 fn default_if_empty(
1233 self,
1234 default_value: Self::Item,
1235 ) -> DefaultIfEmptyOp<Self> {
1236 DefaultIfEmptyOp {
1237 source: self,
1238 is_empty: true,
1239 default_value,
1240 }
1241 }
1242
1243 /// Buffers emitted values of type T in a Vec<T> and
1244 /// emits that Vec<T> as soon as the buffer's size equals
1245 /// the given count.
1246 /// On complete, if the buffer is not empty,
1247 /// it will be emitted.
1248 /// On error, the buffer will be discarded.
1249 ///
1250 /// The operator never returns an empty buffer.
1251 ///
1252 /// #Example
1253 /// ```
1254 /// use rxrust::prelude::*;
1255 ///
1256 /// observable::from_iter(0..6)
1257 /// .buffer_with_count(3)
1258 /// .subscribe(|vec| println!("{:?}", vec));
1259 ///
1260 /// // Prints:
1261 /// // [0, 1, 2]
1262 /// // [3, 4, 5]
1263 /// ```
1264 #[inline]
1265 fn buffer_with_count(self, count: usize) -> BufferWithCountOp<Self> {
1266 BufferWithCountOp {
1267 source: self,
1268 count,
1269 }
1270 }
1271
1272 /// Buffers emitted values of type T in a Vec<T> and
1273 /// emits that Vec<T> periodically.
1274 ///
1275 /// On complete, if the buffer is not empty,
1276 /// it will be emitted.
1277 /// On error, the buffer will be discarded.
1278 ///
1279 /// The operator never returns an empty buffer.
1280 ///
1281 /// #Example
1282 /// ```
1283 /// use rxrust::prelude::*;
1284 /// use std::time::Duration;
1285 /// use futures::executor::ThreadPool;
1286 ///
1287 /// let pool = ThreadPool::new().unwrap();
1288 ///
1289 /// observable::create(|mut subscriber| {
1290 /// subscriber.next(0);
1291 /// subscriber.next(1);
1292 /// std::thread::sleep(Duration::from_millis(100));
1293 /// subscriber.next(2);
1294 /// subscriber.next(3);
1295 /// subscriber.complete();
1296 /// })
1297 /// .buffer_with_time(Duration::from_millis(50), pool)
1298 /// .into_shared()
1299 /// .subscribe(|vec| println!("{:?}", vec));
1300 ///
1301 /// // Prints:
1302 /// // [0, 1]
1303 /// // [2, 3]
1304 /// ```
1305 #[inline]
1306 fn buffer_with_time<S>(
1307 self,
1308 time: Duration,
1309 scheduler: S,
1310 ) -> BufferWithTimeOp<Self, S> {
1311 BufferWithTimeOp {
1312 source: self,
1313 time,
1314 scheduler,
1315 }
1316 }
1317
1318 /// Buffers emitted values of type T in a Vec<T> and
1319 /// emits that Vec<T> either if the buffer's size equals count, or
1320 /// periodically. This operator combines the functionality of
1321 /// buffer_with_count and buffer_with_time.
1322 ///
1323 /// #Example
1324 /// ```
1325 /// use rxrust::prelude::*;
1326 /// use std::time::Duration;
1327 /// use futures::executor::ThreadPool;
1328 ///
1329 /// let pool = ThreadPool::new().unwrap();
1330 ///
1331 /// observable::create(|mut subscriber| {
1332 /// subscriber.next(0);
1333 /// subscriber.next(1);
1334 /// subscriber.next(2);
1335 /// std::thread::sleep(Duration::from_millis(100));
1336 /// subscriber.next(3);
1337 /// subscriber.next(4);
1338 /// subscriber.complete();
1339 /// })
1340 /// .buffer_with_count_and_time(2, Duration::from_millis(50), pool)
1341 /// .into_shared()
1342 /// .subscribe(|vec| println!("{:?}", vec));
1343 ///
1344 /// // Prints:
1345 /// // [0, 1]
1346 /// // [2]
1347 /// // [3, 4]
1348 /// ```
1349 #[inline]
1350 fn buffer_with_count_and_time<S>(
1351 self,
1352 count: usize,
1353 time: Duration,
1354 scheduler: S,
1355 ) -> BufferWithCountOrTimerOp<Self, S> {
1356 BufferWithCountOrTimerOp {
1357 source: self,
1358 count,
1359 time,
1360 scheduler,
1361 }
1362 }
1363}
1364
1365pub trait LocalObservable<'a>: Observable {
1366 type Unsub: SubscriptionLike + 'static;
1367 fn actual_subscribe<O: Observer<Item = Self::Item, Err = Self::Err> + 'a>(
1368 self,
1369 subscriber: Subscriber<O, LocalSubscription>,
1370 ) -> Self::Unsub;
1371}
1372
/// Implements `Observable` for a wrapper type by forwarding the `Item` and
/// `Err` associated types of the type parameter it wraps.
///
/// Arguments, in order:
/// 1. the name of the wrapper type;
/// 2. the name of the type parameter that must itself be an `Observable`;
/// 3. optionally, one lifetime parameter of the wrapper type;
/// 4. any number of further type parameter names.
///
/// In the generated `impl` the lifetime (when given) is placed first in the
/// generic list, followed by the wrapped parameter and then the remaining
/// parameters, so the wrapper type must declare its generics in that order.
#[macro_export]
macro_rules! observable_proxy_impl {
  ($op: ident, $source: ident $(, $lt: lifetime)? $(, $param: ident)*) => {
    impl<$($lt, )? $source, $($param ,)*> Observable
      for $op<$($lt, )? $source, $($param ,)*>
    where
      $source: Observable,
    {
      type Item = $source::Item;
      type Err = $source::Err;
    }
  };
}
1386
/// Smoke tests and benchmark wrappers for `element_at`, `first`, `first_or`,
/// `ignore_elements` and `all`.
///
/// Every `bench_*` test simply runs the matching `benchmark_group!` entry
/// point, which in turn drives the smoke test through `Bencher::iter`.
#[cfg(test)]
mod tests {
  use super::*;

  #[test]
  fn smoke_element_at() {
    let s = observable::from_iter(0..20);
    s.clone().element_at(0).subscribe(|v| assert_eq!(v, 0));
    s.clone().element_at(5).subscribe(|v| assert_eq!(v, 5));
    // NOTE(review): `0..20` has no element at index 20, so presumably this
    // callback never runs and the assertion is vacuous — confirm.
    s.clone().element_at(20).subscribe(|v| assert_eq!(v, 20));
    s.element_at(21).subscribe(|_| panic!());
  }

  #[test]
  fn bench_element_at() { do_bench_element_at(); }

  benchmark_group!(do_bench_element_at, element_at_bench);

  fn element_at_bench(b: &mut bencher::Bencher) { b.iter(smoke_element_at); }

  #[test]
  fn first() {
    let mut completed = 0;
    let mut next_count = 0;

    observable::from_iter(0..2)
      .first()
      .subscribe_complete(|_| next_count += 1, || completed += 1);

    assert_eq!(completed, 1);
    assert_eq!(next_count, 1);
  }

  #[test]
  fn bench_first() { do_bench_first(); }

  benchmark_group!(do_bench_first, first_bench);

  fn first_bench(b: &mut bencher::Bencher) { b.iter(first); }

  #[test]
  fn first_or() {
    let mut completed = false;
    let mut next_count = 0;

    // A non-empty source: the default must not add an extra emission.
    observable::from_iter(0..2)
      .first_or(100)
      .subscribe_complete(|_| next_count += 1, || completed = true);

    assert_eq!(next_count, 1);
    assert!(completed);

    // An empty source: the default value is what gets emitted.
    completed = false;
    let mut v = 0;
    observable::empty()
      .first_or(100)
      .subscribe_complete(|value| v = value, || completed = true);

    assert!(completed);
    assert_eq!(v, 100);
  }

  #[test]
  fn bench_first_or() { do_bench_first_or(); }

  benchmark_group!(do_bench_first_or, first_or_bench);

  fn first_or_bench(b: &mut bencher::Bencher) { b.iter(first_or); }

  #[test]
  fn first_support_fork() {
    let mut value = 0;
    let mut value2 = 0;
    {
      let o = observable::from_iter(1..100).first();
      let o1 = o.clone().first();
      let o2 = o.first();
      o1.subscribe(|v| value = v);
      o2.subscribe(|v| value2 = v);
    }
    assert_eq!(value, 1);
    assert_eq!(value2, 1);
  }

  #[test]
  fn first_or_support_fork() {
    let mut default = 0;
    let mut default2 = 0;
    let o = observable::create(|mut subscriber| {
      subscriber.complete();
    })
    .first_or(100);
    let o1 = o.clone().first_or(0);
    // Last use of `o`, so it can be moved instead of cloned.
    let o2 = o.first_or(0);
    o1.subscribe(|v| default = v);
    o2.subscribe(|v| default2 = v);
    // Both forks must see the default of the shared upstream `first_or(100)`.
    assert_eq!(default, 100);
    assert_eq!(default2, 100);
  }

  #[test]
  fn smoke_ignore_elements() {
    observable::from_iter(0..20)
      .ignore_elements()
      .subscribe(move |_| panic!());
  }

  #[test]
  fn bench_ignore() { do_bench_ignore(); }

  benchmark_group!(do_bench_ignore, ignore_elements_bench);

  fn ignore_elements_bench(b: &mut bencher::Bencher) {
    b.iter(smoke_ignore_elements);
  }

  #[test]
  fn shared_ignore_elements() {
    observable::from_iter(0..20)
      .ignore_elements()
      .into_shared()
      .subscribe(|_| panic!());
  }

  #[test]
  fn smoke_all() {
    observable::from_iter(0..10)
      .all(|v| v < 10)
      .subscribe(|b| assert!(b));
    observable::from_iter(0..10)
      .all(|v| v < 5)
      .subscribe(|b| assert!(!b));
  }

  #[test]
  fn bench_all() { do_bench_all(); }

  benchmark_group!(do_bench_all, all_bench);

  fn all_bench(b: &mut bencher::Bencher) { b.iter(smoke_all); }
}