rxrust/
observable.rs

1#![macro_use]
2mod trivial;
3pub use trivial::*;
4
5mod from_iter;
6pub use from_iter::{from_iter, repeat};
7
8pub mod of;
9pub use of::{of, of_fn, of_option, of_result};
10
11pub(crate) mod from_future;
12pub use from_future::{from_future, from_future_result};
13
14pub mod interval;
15pub use interval::{interval, interval_at};
16
17pub(crate) mod connectable_observable;
18pub use connectable_observable::{
19  ConnectableObservable, LocalConnectableObservable,
20  SharedConnectableObservable,
21};
22
23mod observable_block_all;
24#[cfg(test)]
25pub use observable_block_all::*;
26
27mod observable_block;
28#[cfg(test)]
29pub use observable_block::*;
30
31mod base;
32pub use base::*;
33
34pub mod from_fn;
35pub use from_fn::*;
36
37pub mod timer;
38pub use timer::{timer, timer_at};
39mod observable_all;
40pub use observable_all::*;
41mod observable_err;
42pub use observable_err::*;
43mod observable_next;
44pub use observable_next::*;
45mod defer;
46mod observable_comp;
47pub use defer::*;
48
49use crate::prelude::*;
50pub use observable_comp::*;
51
52use crate::ops::default_if_empty::DefaultIfEmptyOp;
53use ops::{
54  box_it::{BoxOp, IntoBox},
55  buffer::{BufferWithCountOp, BufferWithCountOrTimerOp, BufferWithTimeOp},
56  contains::ContainsOp,
57  debounce::DebounceOp,
58  delay::DelayOp,
59  distinct::DistinctOp,
60  filter::FilterOp,
61  filter_map::FilterMapOp,
62  finalize::FinalizeOp,
63  flatten::FlattenOp,
64  group_by::GroupByOp,
65  last::LastOp,
66  map::MapOp,
67  map_to::MapToOp,
68  merge::MergeOp,
69  merge_all::MergeAllOp,
70  observe_on::ObserveOnOp,
71  ref_count::{RefCount, RefCountCreator},
72  sample::SampleOp,
73  scan::ScanOp,
74  skip::SkipOp,
75  skip_last::SkipLastOp,
76  skip_while::SkipWhileOp,
77  subscribe_on::SubscribeOnOP,
78  take::TakeOp,
79  take_last::TakeLastOp,
80  take_until::TakeUntilOp,
81  take_while::TakeWhileOp,
82  throttle_time::{ThrottleEdge, ThrottleTimeOp},
83  zip::ZipOp,
84  Accum, AverageOp, CountOp, FlatMapOp, MinMaxOp, ReduceOp, SumOp,
85};
86use std::ops::{Add, Mul};
87use std::time::{Duration, Instant};
88
89type ALLOp<O, F> =
90  DefaultIfEmptyOp<TakeOp<FilterOp<MapOp<O, F>, fn(&bool) -> bool>>>;
91
92pub trait Observable: Sized {
93  type Item;
94  type Err;
95
96  /// emit only the first item emitted by an Observable
97  #[inline]
98  fn first(self) -> TakeOp<Self> { self.take(1) }
99
100  /// emit only the first item emitted by an Observable
101  #[inline]
102  fn first_or(self, default: Self::Item) -> DefaultIfEmptyOp<TakeOp<Self>> {
103    self.first().default_if_empty(default)
104  }
105
106  /// Emit only the last final item emitted by a source observable or a
107  /// default item given.
108  ///
109  /// Completes right after emitting the single item. Emits error when
110  /// source observable emits it.
111  ///
112  /// # Examples
113  ///
114  /// ```
115  /// use rxrust::prelude::*;
116  ///
117  /// observable::empty()
118  ///   .last_or(1234)
119  ///   .subscribe(|v| println!("{}", v));
120  ///
121  /// // print log:
122  /// // 1234
123  /// ```
124  #[inline]
125  fn last_or(
126    self,
127    default: Self::Item,
128  ) -> DefaultIfEmptyOp<LastOp<Self, Self::Item>> {
129    self.last().default_if_empty(default)
130  }
131
132  /// Emit only item n (0-indexed) emitted by an Observable
133  #[inline]
134  fn element_at(self, nth: u32) -> TakeOp<SkipOp<Self>> {
135    self.skip(nth).first()
136  }
137
138  /// Do not emit any items from an Observable but mirror its termination
139  /// notification
140  #[inline]
141  fn ignore_elements(self) -> FilterOp<Self, fn(&Self::Item) -> bool> {
142    fn always_false<Item>(_: &Item) -> bool { false }
143    self.filter(always_false as fn(&Self::Item) -> bool)
144  }
145
146  /// Determine whether all items emitted by an Observable meet some criteria
147  #[inline]
148  fn all<F>(self, pred: F) -> ALLOp<Self, F>
149  where
150    F: Fn(Self::Item) -> bool,
151  {
152    fn not(b: &bool) -> bool { !b }
153    self
154      .map(pred)
155      .filter(not as fn(&bool) -> bool)
156      .first_or(true)
157  }
158
159  /// Determine whether an Observable emits a particular item or not
160  fn contains(self, target: Self::Item) -> ContainsOp<Self, Self::Item> {
161    ContainsOp {
162      source: self,
163      target,
164    }
165  }
166
167  /// Emits only last final item emitted by a source observable.
168  ///
169  /// Completes right after emitting the single last item, or when source
170  /// observable completed, being an empty one. Emits error when source
171  /// observable emits it.
172  ///
173  /// # Examples
174  ///
175  /// ```
176  /// use rxrust::prelude::*;
177  ///
178  /// observable::from_iter(0..100)
179  ///   .last()
180  ///   .subscribe(|v| println!("{}", v));
181  ///
182  /// // print log:
183  /// // 99
184  /// ```
185  #[inline]
186  fn last(self) -> LastOp<Self, Self::Item> {
187    LastOp {
188      source: self,
189      last: None,
190    }
191  }
192
193  /// Call a function when observable completes, errors or is unsubscribed from.
194  #[inline]
195  fn finalize<F>(self, f: F) -> FinalizeOp<Self, F>
196  where
197    F: FnMut(),
198  {
199    FinalizeOp {
200      source: self,
201      func: f,
202    }
203  }
204
205  /// Creates an Observable that combines all the emissions from Observables
206  /// that get emitted from an Observable.
207  ///
208  /// # Example
209  ///
210  /// ```
211  /// # use rxrust::prelude::*;
212  /// let mut source = LocalSubject::new();
213  /// let numbers = LocalSubject::new();
214  /// // create a even stream by filter
215  /// let even = numbers.clone().filter((|v| *v % 2 == 0) as fn(&i32) -> bool);
216  /// // create an odd stream by filter
217  /// let odd = numbers.clone().filter((|v| *v % 2 != 0) as fn(&i32) -> bool);
218  ///
219  /// // merge odd and even stream again
220  /// let out = source.clone().flatten();
221  ///
222  /// source.next(even);
223  /// source.next(odd);
224  ///
225  /// // attach observers
226  /// out.subscribe(|v: i32| println!("{} ", v));
227  /// ```
228  #[inline]
229  fn flatten<Inner, A>(self) -> FlattenOp<Self, Inner>
230  where
231    Inner: Observable<Item = A, Err = Self::Err>,
232  {
233    FlattenOp {
234      source: self,
235      marker: std::marker::PhantomData::<Inner>,
236    }
237  }
238
239  ///  Applies given function to each item emitted by this Observable, where
240  ///  that function returns an Observable that itself emits items. It then
241  ///  merges the emissions of these resulting Observables, emitting these
242  ///  merged results as its own sequence.
243  #[inline]
244  fn flat_map<Inner, B, F>(self, f: F) -> FlatMapOp<Self, Inner, F>
245  where
246    Inner: Observable<Item = B, Err = Self::Err>,
247    F: Fn(Self::Item) -> Inner,
248  {
249    FlattenOp {
250      source: MapOp {
251        source: self,
252        func: f,
253      },
254      marker: std::marker::PhantomData::<Inner>,
255    }
256  }
257
258  /// Groups items emited by the source Observable into Observables.
259  /// Each emited Observable emits items matching the key returned
260  /// by the discriminator function.
261  ///
262  /// # Example
263  ///
264  /// ```
265  /// use rxrust::prelude::*;
266  ///
267  /// #[derive(Clone)]
268  /// struct Person {
269  ///   name: String,
270  ///   age: u32,
271  /// }
272  ///
273  /// observable::from_iter([
274  ///   Person{ name: String::from("John"), age: 26 },
275  ///   Person{ name: String::from("Anne"), age: 28 },
276  ///   Person{ name: String::from("Gregory"), age: 24 },
277  ///   Person{ name: String::from("Alice"), age: 28 },
278  /// ])
279  /// .group_by(|person: &Person| person.age)
280  /// .subscribe(|group| {
281  ///   group
282  ///   .reduce(|acc, person| format!("{} {}", acc, person.name))
283  ///   .subscribe(|result| println!("{}", result));
284  /// });
285  ///
286  /// // Prints:
287  /// //  John
288  /// //  Anne Alice
289  /// //  Gregory
290  /// ```
291  #[inline]
292  fn group_by<D, Item, Key>(self, discr: D) -> GroupByOp<Self, D>
293  where
294    D: FnMut(&Item) -> Key,
295  {
296    GroupByOp {
297      source: self,
298      discr,
299    }
300  }
301
302  /// Creates a new stream which calls a closure on each element and uses
303  /// its return as the value.
304  #[inline]
305  fn map<B, F>(self, f: F) -> MapOp<Self, F>
306  where
307    F: Fn(Self::Item) -> B,
308  {
309    MapOp {
310      source: self,
311      func: f,
312    }
313  }
314
315  /// Maps emissions to a constant value.
316  #[inline]
317  fn map_to<B>(self, value: B) -> MapToOp<Self, B> {
318    MapToOp {
319      source: self,
320      value,
321    }
322  }
323
324  /// combine two Observables into one by merging their emissions
325  ///
326  /// # Example
327  ///
328  /// ```
329  /// # use rxrust::prelude::*;
330  /// let numbers = LocalSubject::new();
331  /// // create a even stream by filter
332  /// let even = numbers.clone().filter(|v| *v % 2 == 0);
333  /// // create an odd stream by filter
334  /// let odd = numbers.clone().filter(|v| *v % 2 != 0);
335  ///
336  /// // merge odd and even stream again
337  /// let merged = even.merge(odd);
338  ///
339  /// // attach observers
340  /// merged.subscribe(|v: &i32| println!("{} ", v));
341  /// ```
342  #[inline]
343  fn merge<S>(self, o: S) -> MergeOp<Self, S>
344  where
345    S: Observable<Item = Self::Item, Err = Self::Err>,
346  {
347    MergeOp {
348      source1: self,
349      source2: o,
350    }
351  }
352
353  /// Converts a higher-order Observable into a first-order Observable which
354  /// concurrently delivers all values that are emitted on the inner
355  /// Observables.
356  ///
357  /// # Example
358  ///
359  /// ```
360  /// # use rxrust::prelude::*;
361  /// # use futures::executor::LocalPool;
362  /// # use std::time::Duration;
363  /// let mut local = LocalPool::new();
364  /// observable::from_iter(
365  ///   (0..3)
366  ///     .map(|_| interval(Duration::from_millis(1), local.spawner()).take(5)),
367  /// )
368  /// .merge_all(2)
369  /// .subscribe(move |i| println!("{}", i));
370  /// local.run();
371  /// ```
372  #[inline]
373  fn merge_all(self, concurrent: usize) -> MergeAllOp<Self> {
374    MergeAllOp {
375      source: self,
376      concurrent,
377    }
378  }
379
380  /// Emit only those items from an Observable that pass a predicate test
381  /// # Example
382  ///
383  /// ```
384  /// use rxrust:: prelude::*;
385  ///
386  /// let mut coll = vec![];
387  /// let coll_clone = coll.clone();
388  ///
389  /// observable::from_iter(0..10)
390  ///   .filter(|v| *v % 2 == 0)
391  ///   .subscribe(|v| { coll.push(v); });
392  ///
393  /// // only even numbers received.
394  /// assert_eq!(coll, vec![0, 2, 4, 6, 8]);
395  /// ```
396  #[inline]
397  fn filter<F>(self, filter: F) -> FilterOp<Self, F>
398  where
399    F: Fn(&Self::Item) -> bool,
400  {
401    FilterOp {
402      source: self,
403      filter,
404    }
405  }
406
407  /// The closure must return an Option<T>. filter_map creates an iterator which
408  /// calls this closure on each element. If the closure returns Some(element),
409  /// then that element is returned. If the closure returns None, it will try
410  /// again, and call the closure on the next element, seeing if it will return
411  /// Some.
412  ///
413  /// Why filter_map and not just filter and map? The key is in this part:
414  ///
415  /// If the closure returns Some(element), then that element is returned.
416  ///
417  /// In other words, it removes the Option<T> layer automatically. If your
418  /// mapping is already returning an Option<T> and you want to skip over Nones,
419  /// then filter_map is much, much nicer to use.
420  ///
421  /// # Examples
422  ///
423  /// ```
424  ///  # use rxrust::prelude::*;
425  ///  let mut res: Vec<i32> = vec![];
426  ///   observable::from_iter(["1", "lol", "3", "NaN", "5"].iter())
427  ///   .filter_map(|s: &&str| s.parse().ok())
428  ///   .subscribe(|v| res.push(v));
429  ///
430  /// assert_eq!(res, [1, 3, 5]);
431  /// ```
432  #[inline]
433  fn filter_map<F, SourceItem, Item>(self, f: F) -> FilterMapOp<Self, F>
434  where
435    F: FnMut(SourceItem) -> Option<Item>,
436  {
437    FilterMapOp { source: self, f }
438  }
439
440  /// box an observable to a safety object and convert it to a simple type
441  /// `BoxOp`, which only care `Item` and `Err` Observable emitted.
442  ///
443  /// # Example
444  /// ```
445  /// use rxrust::prelude::*;
446  /// use ops::box_it::LocalBoxOp;
447  ///
448  /// let mut boxed: LocalBoxOp<'_, i32, ()> = observable::of(1)
449  ///   .map(|v| v).box_it();
450  ///
451  /// // BoxOp can box any observable type
452  /// boxed = observable::empty().box_it();
453  ///
454  /// boxed.subscribe(|_| {});
455  /// ```
456  #[inline]
457  fn box_it<O: IntoBox<Self>>(self) -> BoxOp<O>
458  where
459    BoxOp<O>: Observable<Item = Self::Item, Err = Self::Err>,
460  {
461    O::box_it(self)
462  }
463
464  /// Ignore the first `count` values emitted by the source Observable.
465  ///
466  /// `skip` returns an Observable that ignore the first `count` values
467  /// emitted by the source Observable. If the source emits fewer than `count`
468  /// values then 0 of its values are emitted. After that, it completes,
469  /// regardless if the source completes.
470  ///
471  /// # Example
472  /// Ignore the first 5 seconds of an infinite 1-second interval Observable
473  ///
474  /// ```
475  /// # use rxrust::prelude::*;
476  ///
477  /// observable::from_iter(0..10).skip(5).subscribe(|v| println!("{}", v));
478
479  /// // print logs:
480  /// // 6
481  /// // 7
482  /// // 8
483  /// // 9
484  /// // 10
485  /// ```
486  #[inline]
487  fn skip(self, count: u32) -> SkipOp<Self> {
488    SkipOp {
489      source: self,
490      count,
491    }
492  }
493
494  /// Ignore values while result of a callback is true.
495  ///
496  /// `skip_while` returns an Observable that ignores values while result of an
497  /// callback is true emitted by the source Observable.
498  ///
499  /// # Example
500  /// Suppress the first 5 items of an infinite 1-second interval Observable
501  ///
502  /// ```
503  /// # use rxrust::prelude::*;
504  ///
505  /// observable::from_iter(0..10)
506  ///   .skip_while(|v| v < &5)
507  ///   .subscribe(|v| println!("{}", v));
508  ///
509  /// // print logs:
510  /// // 5
511  /// // 6
512  /// // 7
513  /// // 8
514  /// // 9
515  /// ```
516  #[inline]
517  fn skip_while<F>(self, callback: F) -> SkipWhileOp<Self, F>
518  where
519    F: FnMut(&Self::Item) -> bool,
520  {
521    SkipWhileOp {
522      source: self,
523      callback,
524    }
525  }
526
527  /// Ignore the last `count` values emitted by the source Observable.
528  ///
529  /// `skip_last` returns an Observable that ignore the last `count` values
530  /// emitted by the source Observable. If the source emits fewer than `count`
531  /// values then 0 of its values are emitted.
532  /// It will not emit values until source Observable complete.
533  ///
534  /// # Example
535  /// Skip the last 5 seconds of an infinite 1-second interval Observable
536  ///
537  /// ```
538  /// # use rxrust::prelude::*;
539  ///
540  /// observable::from_iter(0..10)
541  ///   .skip_last(5)
542  ///   .subscribe(|v| println!("{}", v));
543  ///
544  /// // print logs:
545  /// // 0
546  /// // 1
547  /// // 2
548  /// // 3
549  /// // 4
550  /// ```
551  #[inline]
552  fn skip_last(self, count: usize) -> SkipLastOp<Self> {
553    SkipLastOp {
554      source: self,
555      count,
556    }
557  }
558
559  /// Emits only the first `count` values emitted by the source Observable.
560  ///
561  /// `take` returns an Observable that emits only the first `count` values
562  /// emitted by the source Observable. If the source emits fewer than `count`
563  /// values then all of its values are emitted. After that, it completes,
564  /// regardless if the source completes.
565  ///
566  /// # Example
567  /// Take the first 5 seconds of an infinite 1-second interval Observable
568  ///
569  /// ```
570  /// # use rxrust::prelude::*;
571  ///
572  /// observable::from_iter(0..10).take(5).subscribe(|v| println!("{}", v));
573
574  /// // print logs:
575  /// // 0
576  /// // 1
577  /// // 2
578  /// // 3
579  /// // 4
580  /// ```
581  ///
582  #[inline]
583  fn take(self, count: u32) -> TakeOp<Self> {
584    TakeOp {
585      source: self,
586      count,
587    }
588  }
589
590  /// Emits the values emitted by the source Observable until a `notifier`
591  /// Observable emits a value.
592  ///
593  /// `take_until` subscribes and begins mirroring the source Observable. It
594  /// also monitors a second Observable, `notifier` that you provide. If the
595  /// `notifier` emits a value, the output Observable stops mirroring the source
596  /// Observable and completes. If the `notifier` doesn't emit any value and
597  /// completes then `take_until` will pass all values.
598  #[inline]
599  fn take_until<T>(self, notifier: T) -> TakeUntilOp<Self, T> {
600    TakeUntilOp {
601      source: self,
602      notifier,
603    }
604  }
605
606  /// Emits values while result of an callback is true.
607  ///
608  /// `take_while` returns an Observable that emits values while result of an
609  /// callback is true emitted by the source Observable.
610  /// It will not emit values until source Observable complete.
611  ///
612  /// # Example
613  /// Take the first 5 seconds of an infinite 1-second interval Observable
614  ///
615  /// ```
616  /// # use rxrust::prelude::*;
617  ///
618  /// observable::from_iter(0..10)
619  ///   .take_while(|v| v < &5)
620  /// .subscribe(|v| println!("{}", v));
621
622  /// // print logs:
623  /// // 0
624  /// // 1
625  /// // 2
626  /// // 3
627  /// // 4
628  /// ```
629  ///
630  #[inline]
631  fn take_while<F>(self, callback: F) -> TakeWhileOp<Self, F>
632  where
633    F: FnMut(&Self::Item) -> bool,
634  {
635    TakeWhileOp {
636      source: self,
637      callback,
638    }
639  }
640
641  /// Emits only the last `count` values emitted by the source Observable.
642  ///
643  /// `take_last` returns an Observable that emits only the last `count` values
644  /// emitted by the source Observable. If the source emits fewer than `count`
645  /// values then all of its values are emitted.
646  /// It will not emit values until source Observable complete.
647  ///
648  /// # Example
649  /// Take the last 5 seconds of an infinite 1-second interval Observable
650  ///
651  /// ```
652  /// # use rxrust::prelude::*;
653  ///
654  /// observable::from_iter(0..10)
655  ///   .take_last(5)
656  /// .subscribe(|v| println!("{}", v));
657
658  /// // print logs:
659  /// // 5
660  /// // 6
661  /// // 7
662  /// // 8
663  /// // 9
664  /// ```
665  ///
666  #[inline]
667  fn take_last(self, count: usize) -> TakeLastOp<Self> {
668    TakeLastOp {
669      source: self,
670      count,
671    }
672  }
673
674  /// Emits item it has most recently emitted since the previous sampling
675  ///
676  ///
677  /// It will emit values when sampling observable complete.
678  ///
679  /// #Example
680  /// Sampling every  5ms of an infinite 1ms interval Observable
681  /// ```
682  /// use rxrust::prelude::*;
683  /// use std::time::Duration;
684  /// use futures::executor::LocalPool;
685  ///
686  /// let mut local_scheduler = LocalPool::new();
687  /// let spawner = local_scheduler.spawner();
688  /// observable::interval(Duration::from_millis(2), spawner.clone())
689  ///   .sample(observable::interval(Duration::from_millis(5), spawner))
690  ///   .take(5)
691  ///   .subscribe(move |v| println!("{}", v));
692  ///
693  /// local_scheduler.run();
694  /// // print logs:
695  /// // 1
696  /// // 4
697  /// // 6
698  /// // 9
699  /// // ...
700  /// ```
701  #[inline]
702  fn sample<O>(self, sampling: O) -> SampleOp<Self, O>
703  where
704    O: Observable,
705  {
706    SampleOp {
707      source: self,
708      sampling,
709    }
710  }
711
712  /// The Scan operator applies a function to the first item emitted by the
713  /// source observable and then emits the result of that function as its
714  /// own first emission. It also feeds the result of the function back into
715  /// the function along with the second item emitted by the source observable
716  /// in order to generate its second emission. It continues to feed back its
717  /// own subsequent emissions along with the subsequent emissions from the
718  /// source Observable in order to create the rest of its sequence.
719  ///
720  /// Applies a binary operator closure to each item emitted from source
721  /// observable and emits successive values.
722  ///
723  /// Completes when source observable completes.
724  /// Emits error when source observable emits it.
725  ///
726  /// This version starts with an user-specified initial value for when the
727  /// binary operator is called with the first item processed.
728  ///
729  /// # Arguments
730  ///
731  /// * `initial_value` - An initial value to start the successive accumulations
732  ///   from.
733  /// * `binary_op` - A closure or function acting as a binary operator.
734  ///
735  /// # Examples
736  ///
737  /// ```
738  /// use rxrust::prelude::*;
739  ///
740  /// observable::from_iter(vec![1, 1, 1, 1, 1])
741  ///   .scan_initial(100, |acc, v| acc + v)
742  ///   .subscribe(|v| println!("{}", v));
743  ///
744  /// // print log:
745  /// // 101
746  /// // 102
747  /// // 103
748  /// // 104
749  /// // 105
750  /// ```
751  #[inline]
752  fn scan_initial<OutputItem, BinaryOp>(
753    self,
754    initial_value: OutputItem,
755    binary_op: BinaryOp,
756  ) -> ScanOp<Self, BinaryOp, OutputItem>
757  where
758    BinaryOp: Fn(OutputItem, Self::Item) -> OutputItem,
759    OutputItem: Clone,
760  {
761    ScanOp {
762      source_observable: self,
763      binary_op,
764      initial_value,
765    }
766  }
767
768  /// Works like [`scan_initial`](Observable::scan_initial) but starts with a
769  /// value defined by a [`Default`] trait for the first argument `binary_op`
770  /// operator operates on.
771  ///
772  /// # Arguments
773  ///
774  /// * `binary_op` - A closure or function acting as a binary operator.
775  #[inline]
776  fn scan<OutputItem, BinaryOp>(
777    self,
778    binary_op: BinaryOp,
779  ) -> ScanOp<Self, BinaryOp, OutputItem>
780  where
781    BinaryOp: Fn(OutputItem, Self::Item) -> OutputItem,
782    OutputItem: Default + Clone,
783  {
784    self.scan_initial(OutputItem::default(), binary_op)
785  }
786
787  /// Apply a function to each item emitted by an observable, sequentially,
788  /// and emit the final value, after source observable completes.
789  ///
790  /// Emits error when source observable emits it.
791  ///
792  /// # Arguments
793  ///
794  /// * `initial` - An initial value to start the successive reduction from.
795  /// * `binary_op` - A closure acting as a binary (folding) operator.
796  ///
797  /// # Examples
798  ///
799  /// ```
800  /// use rxrust::prelude::*;
801  ///
802  /// observable::from_iter(vec![1, 1, 1, 1, 1])
803  ///   .reduce_initial(100, |acc, v| acc + v)
804  ///   .subscribe(|v| println!("{}", v));
805  ///
806  /// // print log:
807  /// // 105
808  /// ```
809  #[inline]
810  fn reduce_initial<OutputItem, BinaryOp>(
811    self,
812    initial: OutputItem,
813    binary_op: BinaryOp,
814  ) -> ReduceOp<Self, BinaryOp, OutputItem>
815  where
816    BinaryOp: Fn(OutputItem, Self::Item) -> OutputItem,
817    OutputItem: Clone,
818  {
819    // realised as a composition of `scan`, and `last`
820    self
821      .scan_initial(initial.clone(), binary_op)
822      .last_or(initial)
823  }
824
825  /// Works like [`reduce_initial`](Observable::reduce_initial) but starts with
826  /// a value defined by a [`Default`] trait for the first argument `f`
827  /// operator operates on.
828  ///
829  /// # Arguments
830  ///
831  /// * `binary_op` - A closure acting as a binary operator.
832  #[inline]
833  fn reduce<OutputItem, BinaryOp>(
834    self,
835    binary_op: BinaryOp,
836  ) -> DefaultIfEmptyOp<LastOp<ScanOp<Self, BinaryOp, OutputItem>, OutputItem>>
837  where
838    BinaryOp: Fn(OutputItem, Self::Item) -> OutputItem,
839    OutputItem: Default + Clone,
840  {
841    self.reduce_initial(OutputItem::default(), binary_op)
842  }
843
844  /// Emits the item from the source observable that had the maximum value.
845  ///
846  /// Emits error when source observable emits it.
847  ///
848  /// # Examples
849  ///
850  /// ```
851  /// use rxrust::prelude::*;
852  ///
853  /// observable::from_iter(vec![3., 4., 7., 5., 6.])
854  ///   .max()
855  ///   .subscribe(|v| println!("{}", v));
856  ///
857  /// // print log:
858  /// // 7
859  /// ```
860  #[inline]
861  fn max(self) -> MinMaxOp<Self, Self::Item>
862  where
863    Self::Item: Clone + Send + PartialOrd<Self::Item>,
864  {
865    fn get_greater<Item>(i: Option<Item>, v: Item) -> Option<Item>
866    where
867      Item: Clone + PartialOrd<Item>,
868    {
869      i.map(|vv| if vv < v { v.clone() } else { vv }).or(Some(v))
870    }
871    let get_greater_func =
872      get_greater as fn(Option<Self::Item>, Self::Item) -> Option<Self::Item>;
873
874    self
875      .scan_initial(None, get_greater_func)
876      .last()
877      // we can safely unwrap, because we will ever get this item
878      // once a max value exists and is there.
879      .map(|v| v.unwrap())
880  }
881
882  /// Emits the item from the source observable that had the minimum value.
883  ///
884  /// Emits error when source observable emits it.
885  ///
886  /// # Examples
887  ///
888  /// ```
889  /// use rxrust::prelude::*;
890  ///
891  /// observable::from_iter(vec![3., 4., 7., 5., 6.])
892  ///   .min()
893  ///   .subscribe(|v| println!("{}", v));
894  ///
895  /// // print log:
896  /// // 3
897  /// ```
898  #[inline]
899  fn min(self) -> MinMaxOp<Self, Self::Item>
900  where
901    Self::Item: Clone + Send + PartialOrd<Self::Item>,
902  {
903    fn get_lesser<Item>(i: Option<Item>, v: Item) -> Option<Item>
904    where
905      Item: Clone + PartialOrd<Item>,
906    {
907      i.map(|vv| if vv > v { v.clone() } else { vv }).or(Some(v))
908    }
909
910    let get_lesser_func =
911      get_lesser as fn(Option<Self::Item>, Self::Item) -> Option<Self::Item>;
912
913    self
914      .scan_initial(None, get_lesser_func)
915      .last()
916      // we can safely unwrap, because we will ever get this item
917      // once a max value exists and is there.
918      .map(|v| v.unwrap())
919  }
920
921  /// Calculates the sum of numbers emitted by an source observable and emits
922  /// this sum when source completes.
923  ///
924  /// Emits zero when source completed as an and empty sequence.
925  /// Emits error when source observable emits it.
926  ///
927  /// # Examples
928  ///
929  /// ```
930  /// use rxrust::prelude::*;
931  ///
932  /// observable::from_iter(vec![1, 1, 1, 1, 1])
933  ///   .sum()
934  ///   .subscribe(|v| println!("{}", v));
935  ///
936  /// // p rint log:
937  /// // 5
938  /// ```
939  #[inline]
940  fn sum(self) -> SumOp<Self, Self::Item>
941  where
942    Self::Item: Clone + Default + Add<Self::Item, Output = Self::Item>,
943  {
944    self.reduce(|acc, v| acc + v)
945  }
946
947  /// Emits the number of items emitted by a source observable when this source
948  /// completes.
949  ///
950  /// The output type of this operator is fixed to [`usize`].
951  ///
952  /// Emits zero when source completed as an and empty sequence.
953  /// Emits error when source observable emits it.
954  ///
955  /// # Examples
956  ///
957  /// ```
958  /// use rxrust::prelude::*;
959  ///
960  /// observable::from_iter(vec!['1', '7', '3', '0', '4'])
961  ///   .count()
962  ///   .subscribe(|v| println!("{}", v));
963  ///
964  /// // print log:
965  /// // 5
966  /// ```
967  #[inline]
968  fn count(self) -> CountOp<Self, Self::Item> { self.reduce(|acc, _v| acc + 1) }
969
970  /// Calculates the sum of numbers emitted by an source observable and emits
971  /// this sum when source completes.
972  ///
973  /// Emits zero when source completed as an and empty sequence.
974  /// Emits error when source observable emits it.
975  ///
976  /// # Examples
977  ///
978  /// ```
979  /// use rxrust::prelude::*;
980  ///
981  /// observable::from_iter(vec![3., 4., 5., 6., 7.])
982  ///   .average()
983  ///   .subscribe(|v| println!("{}", v));
984  ///
985  /// // print log:
986  /// // 5
987  /// ```
988  #[inline]
989  fn average(self) -> AverageOp<Self, Self::Item>
990  where
991    Self::Item: Clone
992      + Send
993      + Default
994      + Add<Self::Item, Output = Self::Item>
995      + Mul<f64, Output = Self::Item>,
996  {
997    /// Computing an average by multiplying accumulated nominator by a
998    /// reciprocal of accumulated denominator. In this way some generic
999    /// types that support linear scaling over floats values could be
1000    /// averaged (e.g. vectors)
1001    fn average_floats<T>(acc: Accum<T>) -> T
1002    where
1003      T: Default + Clone + Send + Mul<f64, Output = T>,
1004    {
1005      // Note: we will never be dividing by zero here, as
1006      // the acc.1 will be always >= 1.
1007      // It would have be zero if we've would have received an element
1008      // when the source observable is empty but beacuse of how
1009      // `scan` works, we will transparently not receive anything in
1010      // such case.
1011      acc.0 * (1.0 / (acc.1 as f64))
1012    }
1013
1014    fn accumulate_item<T>(acc: Accum<T>, v: T) -> Accum<T>
1015    where
1016      T: Clone + Add<T, Output = T>,
1017    {
1018      let newacc = acc.0 + v;
1019      let newcount = acc.1 + 1;
1020      (newacc, newcount)
1021    }
1022
1023    // our starting point
1024    let start = (Self::Item::default(), 0);
1025
1026    let acc =
1027      accumulate_item as fn(Accum<Self::Item>, Self::Item) -> Accum<Self::Item>;
1028    let avg = average_floats as fn(Accum<Self::Item>) -> Self::Item;
1029
1030    self.scan_initial(start, acc).last().map(avg)
1031  }
1032
1033  /// Returns a ConnectableObservable. A ConnectableObservable Observable
1034  /// resembles an ordinary Observable, except that it does not begin emitting
1035  /// items when it is subscribed to, but only when the Connect operator is
1036  /// applied to it. In this way you can wait for all intended observers to
1037  /// subscribe to the Observable before the Observable begins emitting items.
1038  #[inline]
1039  fn publish<Subject: Default>(self) -> ConnectableObservable<Self, Subject> {
1040    ConnectableObservable {
1041      source: self,
1042      subject: Subject::default(),
1043    }
1044  }
1045
1046  /// Returns a new Observable that multicast (shares) the original
1047  /// Observable. As long as there is at least one Subscriber this
1048  /// Observable will be subscribed and emitting data. When all subscribers
1049  /// have unsubscribed it will unsubscribe from the source Observable.
1050  /// Because the Observable is multicasting it makes the stream `hot`.
1051  /// This is an alias for `publish().ref_count()`
1052  #[inline]
1053  fn share<Subject, Inner>(
1054    self,
1055  ) -> RefCount<Inner, ConnectableObservable<Self, Subject>>
1056  where
1057    Inner: RefCountCreator<Connectable = ConnectableObservable<Self, Subject>>,
1058    Subject: Default,
1059    Self: Clone,
1060  {
1061    self.publish::<Subject>().ref_count::<Inner>()
1062  }
1063
1064  /// Delays the emission of items from the source Observable by a given timeout
1065  /// or until a given `Instant`.
1066  #[inline]
1067  fn delay<SD>(self, dur: Duration, scheduler: SD) -> DelayOp<Self, SD> {
1068    DelayOp {
1069      source: self,
1070      delay: dur,
1071      scheduler,
1072    }
1073  }
1074
1075  #[inline]
1076  fn delay_at<SD>(self, at: Instant, scheduler: SD) -> DelayOp<Self, SD> {
1077    DelayOp {
1078      source: self,
1079      delay: at.elapsed(),
1080      scheduler,
1081    }
1082  }
1083
1084  /// Specify the Scheduler on which an Observable will operate
1085  ///
1086  /// With `SubscribeON` you can decide what type of scheduler a specific
1087  /// Observable will be using when it is subscribed to.
1088  ///
1089  /// Schedulers control the speed and order of emissions to observers from an
1090  /// Observable stream.
1091  ///
1092  /// # Example
1093  /// Given the following code:
1094  /// ```rust
1095  /// use rxrust::prelude::*;
1096  ///
1097  /// let a = observable::from_iter(1..5);
1098  /// let b = observable::from_iter(5..10);
1099  /// a.merge(b).subscribe(|v| print!("{} ", v));
1100  /// ```
1101  ///
1102  /// Both Observable `a` and `b` will emit their values directly and
1103  /// synchronously once they are subscribed to.
1104  /// This will result in the output of `1 2 3 4 5 6 7 8 9`.
1105  ///
1106  /// But if we instead use the `subscribe_on` operator declaring that we want
1107  /// to use the new thread scheduler for values emitted by Observable `a`:
1108  /// ```rust
1109  /// use rxrust::prelude::*;
1110  /// use std::thread;
1111  /// use futures::executor::ThreadPool;
1112  ///
1113  /// let pool = ThreadPool::new().unwrap();
1114  /// let a = observable::from_iter(1..5).subscribe_on(pool);
1115  /// let b = observable::from_iter(5..10);
1116  /// a.merge(b).into_shared().subscribe(|v|{
1117  ///   let handle = thread::current();
1118  ///   print!("{}({:?}) ", v, handle.id())
1119  /// });
1120  /// ```
1121  ///
1122  /// The output will instead by `1(thread 1) 2(thread 1) 3(thread 1) 4(thread
1123  /// 1)  5(thread 2) 6(thread 2) 7(thread 2) 8(thread 2) 9(thread id2)`.
1124  /// The reason for this is that Observable `b` emits its values directly like
1125  /// before, but the emissions from `a` are scheduled on a new thread because
1126  /// we are now using the `NewThread` Scheduler for that specific Observable.
1127  #[inline]
1128  fn subscribe_on<SD>(self, scheduler: SD) -> SubscribeOnOP<Self, SD> {
1129    SubscribeOnOP {
1130      source: self,
1131      scheduler,
1132    }
1133  }
1134
1135  /// Re-emits all notifications from source Observable with specified
1136  /// scheduler.
1137  ///
1138  /// `ObserveOn` is an operator that accepts a scheduler as the parameter,
1139  /// which will be used to reschedule notifications emitted by the source
1140  /// Observable.
1141  #[inline]
1142  fn observe_on<SD>(self, scheduler: SD) -> ObserveOnOp<Self, SD> {
1143    ObserveOnOp {
1144      source: self,
1145      scheduler,
1146    }
1147  }
1148
1149  /// Emits a value from the source Observable only after a particular time span
1150  /// has passed without another source emission.
1151  #[inline]
1152  fn debounce<SD>(
1153    self,
1154    duration: Duration,
1155    scheduler: SD,
1156  ) -> DebounceOp<Self, SD> {
1157    DebounceOp {
1158      source: self,
1159      duration,
1160      scheduler,
1161    }
1162  }
1163
1164  /// Emits a value from the source Observable, then ignores subsequent source
1165  /// values for duration milliseconds, then repeats this process.
1166  ///
1167  /// #Example
1168  /// ```
1169  /// use rxrust::{ prelude::*, ops::throttle_time::ThrottleEdge };
1170  /// use std::time::Duration;
1171  /// use futures::executor::LocalPool;
1172  ///
1173  /// let mut local_scheduler = LocalPool::new();
1174  /// let spawner = local_scheduler.spawner();
1175  /// observable::interval(Duration::from_millis(1), spawner.clone())
1176  ///   .throttle_time(Duration::from_millis(9), ThrottleEdge::Leading, spawner)
1177  ///   .take(5)
1178  ///   .subscribe(move |v| println!("{}", v));
1179  ///
1180  /// local_scheduler.run();
1181  /// ```
1182  #[inline]
1183  fn throttle_time<SD>(
1184    self,
1185    duration: Duration,
1186    edge: ThrottleEdge,
1187    scheduler: SD,
1188  ) -> ThrottleTimeOp<Self, SD> {
1189    ThrottleTimeOp {
1190      source: self,
1191      duration,
1192      edge,
1193      scheduler,
1194    }
1195  }
1196
1197  /// Returns an Observable that emits all items emitted by the source
1198  /// Observable that are distinct by comparison from previous items.
1199  #[inline]
1200  fn distinct(self) -> DistinctOp<Self> { DistinctOp { source: self } }
1201
1202  /// 'Zips up' two observable into a single observable of pairs.
1203  ///
1204  /// zip() returns a new observable that will emit over two other
1205  /// observables,  returning a tuple where the first element comes from the
1206  /// first observable, and  the second element comes from the second
1207  /// observable.
1208  ///
1209  ///  In other words, it zips two observables together, into a single one.
1210  #[inline]
1211  fn zip<U>(self, other: U) -> ZipOp<Self, U>
1212  where
1213    U: Observable,
1214  {
1215    ZipOp { a: self, b: other }
1216  }
1217
1218  /// Emits default value if Observable completed with empty result
1219  ///
1220  /// #Example
1221  /// ```
1222  /// use rxrust::prelude::*;
1223  ///
1224  /// observable::empty()
1225  ///   .default_if_empty(5)
1226  ///   .subscribe(|v| println!("{}", v));
1227  ///
1228  /// // Prints:
1229  /// // 5
1230  /// ```
1231  #[inline]
1232  fn default_if_empty(
1233    self,
1234    default_value: Self::Item,
1235  ) -> DefaultIfEmptyOp<Self> {
1236    DefaultIfEmptyOp {
1237      source: self,
1238      is_empty: true,
1239      default_value,
1240    }
1241  }
1242
1243  /// Buffers emitted values of type T in a Vec<T> and
1244  /// emits that Vec<T> as soon as the buffer's size equals
1245  /// the given count.
1246  /// On complete, if the buffer is not empty,
1247  /// it will be emitted.
1248  /// On error, the buffer will be discarded.
1249  ///
1250  /// The operator never returns an empty buffer.
1251  ///
1252  /// #Example
1253  /// ```
1254  /// use rxrust::prelude::*;
1255  ///
1256  /// observable::from_iter(0..6)
1257  ///   .buffer_with_count(3)
1258  ///   .subscribe(|vec| println!("{:?}", vec));
1259  ///
1260  /// // Prints:
1261  /// // [0, 1, 2]
1262  /// // [3, 4, 5]
1263  /// ```
1264  #[inline]
1265  fn buffer_with_count(self, count: usize) -> BufferWithCountOp<Self> {
1266    BufferWithCountOp {
1267      source: self,
1268      count,
1269    }
1270  }
1271
1272  /// Buffers emitted values of type T in a Vec<T> and
1273  /// emits that Vec<T> periodically.
1274  ///
1275  /// On complete, if the buffer is not empty,
1276  /// it will be emitted.
1277  /// On error, the buffer will be discarded.
1278  ///
1279  /// The operator never returns an empty buffer.
1280  ///
1281  /// #Example
1282  /// ```
1283  /// use rxrust::prelude::*;
1284  /// use std::time::Duration;
1285  /// use futures::executor::ThreadPool;
1286  ///
1287  /// let pool = ThreadPool::new().unwrap();
1288  ///
1289  /// observable::create(|mut subscriber| {
1290  ///   subscriber.next(0);
1291  ///   subscriber.next(1);
1292  ///   std::thread::sleep(Duration::from_millis(100));
1293  ///   subscriber.next(2);
1294  ///   subscriber.next(3);
1295  ///   subscriber.complete();
1296  /// })
1297  ///   .buffer_with_time(Duration::from_millis(50), pool)
1298  ///   .into_shared()
1299  ///   .subscribe(|vec| println!("{:?}", vec));
1300  ///
1301  /// // Prints:
1302  /// // [0, 1]
1303  /// // [2, 3]
1304  /// ```
1305  #[inline]
1306  fn buffer_with_time<S>(
1307    self,
1308    time: Duration,
1309    scheduler: S,
1310  ) -> BufferWithTimeOp<Self, S> {
1311    BufferWithTimeOp {
1312      source: self,
1313      time,
1314      scheduler,
1315    }
1316  }
1317
1318  /// Buffers emitted values of type T in a Vec<T> and
1319  /// emits that Vec<T> either if the buffer's size equals count, or
1320  /// periodically. This operator combines the functionality of
1321  /// buffer_with_count and buffer_with_time.
1322  ///
1323  /// #Example
1324  /// ```
1325  /// use rxrust::prelude::*;
1326  /// use std::time::Duration;
1327  /// use futures::executor::ThreadPool;
1328  ///
1329  /// let pool = ThreadPool::new().unwrap();
1330  ///
1331  /// observable::create(|mut subscriber| {
1332  ///   subscriber.next(0);
1333  ///   subscriber.next(1);
1334  ///   subscriber.next(2);
1335  ///   std::thread::sleep(Duration::from_millis(100));
1336  ///   subscriber.next(3);
1337  ///   subscriber.next(4);
1338  ///   subscriber.complete();
1339  /// })
1340  ///   .buffer_with_count_and_time(2, Duration::from_millis(50), pool)
1341  ///   .into_shared()
1342  ///   .subscribe(|vec| println!("{:?}", vec));
1343  ///
1344  /// // Prints:
1345  /// // [0, 1]
1346  /// // [2]
1347  /// // [3, 4]
1348  /// ```
1349  #[inline]
1350  fn buffer_with_count_and_time<S>(
1351    self,
1352    count: usize,
1353    time: Duration,
1354    scheduler: S,
1355  ) -> BufferWithCountOrTimerOp<Self, S> {
1356    BufferWithCountOrTimerOp {
1357      source: self,
1358      count,
1359      time,
1360      scheduler,
1361    }
1362  }
1363}
1364
1365pub trait LocalObservable<'a>: Observable {
1366  type Unsub: SubscriptionLike + 'static;
1367  fn actual_subscribe<O: Observer<Item = Self::Item, Err = Self::Err> + 'a>(
1368    self,
1369    subscriber: Subscriber<O, LocalSubscription>,
1370  ) -> Self::Unsub;
1371}
1372
1373#[macro_export]
1374macro_rules! observable_proxy_impl {
1375    ($ty: ident, $host: ident$(, $lf: lifetime)?$(, $generics: ident) *) => {
1376  impl<$($lf, )? $host, $($generics ,)*> Observable
1377    for $ty<$($lf, )? $host, $($generics ,)*>
1378  where
1379    $host: Observable
1380  {
1381    type Item = $host::Item;
1382    type Err = $host::Err;
1383  }
1384}
1385}
1386
1387#[cfg(test)]
1388mod tests {
1389  use super::*;
1390
1391  #[test]
1392  fn smoke_element_at() {
1393    let s = observable::from_iter(0..20);
1394    s.clone().element_at(0).subscribe(|v| assert_eq!(v, 0));
1395    s.clone().element_at(5).subscribe(|v| assert_eq!(v, 5));
1396    s.clone().element_at(20).subscribe(|v| assert_eq!(v, 20));
1397    s.element_at(21).subscribe(|_| panic!());
1398  }
1399
1400  #[test]
1401  fn bench_element_at() { do_bench_element_at(); }
1402
1403  benchmark_group!(do_bench_element_at, element_at_bench);
1404
1405  fn element_at_bench(b: &mut bencher::Bencher) { b.iter(smoke_element_at); }
1406
1407  #[test]
1408  fn first() {
1409    let mut completed = 0;
1410    let mut next_count = 0;
1411
1412    observable::from_iter(0..2)
1413      .first()
1414      .subscribe_complete(|_| next_count += 1, || completed += 1);
1415
1416    assert_eq!(completed, 1);
1417    assert_eq!(next_count, 1);
1418  }
1419
1420  #[test]
1421  fn bench_first() { do_bench_first(); }
1422
1423  benchmark_group!(do_bench_first, first_bench);
1424
1425  fn first_bench(b: &mut bencher::Bencher) { b.iter(first); }
1426
1427  #[test]
1428  fn first_or() {
1429    let mut completed = false;
1430    let mut next_count = 0;
1431
1432    observable::from_iter(0..2)
1433      .first_or(100)
1434      .subscribe_complete(|_| next_count += 1, || completed = true);
1435
1436    assert_eq!(next_count, 1);
1437    assert!(completed);
1438
1439    completed = false;
1440    let mut v = 0;
1441    observable::empty()
1442      .first_or(100)
1443      .subscribe_complete(|value| v = value, || completed = true);
1444
1445    assert!(completed);
1446    assert_eq!(v, 100);
1447  }
1448
1449  #[test]
1450  fn bench_first_or() { do_bench_first_or(); }
1451
1452  benchmark_group!(do_bench_first_or, first_or_bench);
1453
1454  fn first_or_bench(b: &mut bencher::Bencher) { b.iter(first_or); }
1455
1456  #[test]
1457  fn first_support_fork() {
1458    let mut value = 0;
1459    let mut value2 = 0;
1460    {
1461      let o = observable::from_iter(1..100).first();
1462      let o1 = o.clone().first();
1463      let o2 = o.first();
1464      o1.subscribe(|v| value = v);
1465      o2.subscribe(|v| value2 = v);
1466    }
1467    assert_eq!(value, 1);
1468    assert_eq!(value2, 1);
1469  }
1470
1471  #[test]
1472  fn first_or_support_fork() {
1473    let mut default = 0;
1474    let mut default2 = 0;
1475    let o = observable::create(|mut subscriber| {
1476      subscriber.complete();
1477    })
1478    .first_or(100);
1479    let o1 = o.clone().first_or(0);
1480    let o2 = o.clone().first_or(0);
1481    o1.subscribe(|v| default = v);
1482    o2.subscribe(|v| default2 = v);
1483    assert_eq!(default, 100);
1484    assert_eq!(default, 100);
1485  }
1486
1487  #[test]
1488  fn smoke_ignore_elements() {
1489    observable::from_iter(0..20)
1490      .ignore_elements()
1491      .subscribe(move |_| panic!());
1492  }
1493
1494  #[test]
1495  fn bench_ignore() { do_bench_ignore(); }
1496
1497  benchmark_group!(do_bench_ignore, ignore_emements_bench);
1498
1499  fn ignore_emements_bench(b: &mut bencher::Bencher) {
1500    b.iter(smoke_ignore_elements);
1501  }
1502
1503  #[test]
1504  fn shared_ignore_elements() {
1505    observable::from_iter(0..20)
1506      .ignore_elements()
1507      .into_shared()
1508      .subscribe(|_| panic!());
1509  }
1510
1511  #[test]
1512  fn smoke_all() {
1513    observable::from_iter(0..10)
1514      .all(|v| v < 10)
1515      .subscribe(|b| assert!(b));
1516    observable::from_iter(0..10)
1517      .all(|v| v < 5)
1518      .subscribe(|b| assert!(!b));
1519  }
1520
1521  #[test]
1522  fn bench_all() { do_bench_all(); }
1523
1524  benchmark_group!(do_bench_all, all_bench);
1525
1526  fn all_bench(b: &mut bencher::Bencher) { b.iter(smoke_all); }
1527}