// rx_rust/operators/mathematical_aggregate/count.rs

1use crate::utils::types::{MarkerType, NecessarySendSync};
2use crate::{
3    disposable::subscription::Subscription,
4    observable::Observable,
5    observer::{Observer, Termination},
6};
7use educe::Educe;
8use std::marker::PhantomData;
9
10/// Counts the number of items emitted by the source Observable and emits this count.
11/// See <https://reactivex.io/documentation/operators/count.html>
12///
13/// # Examples
14/// ```rust
15/// use rx_rust::{
16///     observable::observable_ext::ObservableExt,
17///     observer::Termination,
18///     operators::{
19///         creating::from_iter::FromIter,
20///         mathematical_aggregate::count::Count,
21///     },
22/// };
23///
24/// let mut values = Vec::new();
25/// let mut terminations = Vec::new();
26///
27/// let observable = Count::new(FromIter::new(vec![1, 2, 3, 4]));
28/// observable.subscribe_with_callback(
29///     |value| values.push(value),
30///     |termination| terminations.push(termination),
31/// );
32///
33/// assert_eq!(values, vec![4]);
34/// assert_eq!(terminations, vec![Termination::Completed]);
35/// ```
36#[derive(Educe)]
37#[educe(Debug, Clone)]
38pub struct Count<T, OE> {
39    source: OE,
40    _marker: MarkerType<T>,
41}
42
43impl<T, OE> Count<T, OE> {
44    pub fn new<'or, 'sub, E>(source: OE) -> Self
45    where
46        OE: Observable<'or, 'sub, T, E>,
47    {
48        Self {
49            source,
50            _marker: PhantomData,
51        }
52    }
53}
54
55impl<'or, 'sub, T, E, OE> Observable<'or, 'sub, usize, E> for Count<T, OE>
56where
57    OE: Observable<'or, 'sub, T, E>,
58{
59    fn subscribe(
60        self,
61        observer: impl Observer<usize, E> + NecessarySendSync + 'or,
62    ) -> Subscription<'sub> {
63        let observer = CountObserver { observer, count: 0 };
64        self.source.subscribe(observer)
65    }
66}
67
/// Adapter placed between the source and the downstream observer: it keeps a
/// running tally of items seen instead of forwarding them.
struct CountObserver<OR> {
    /// Downstream observer that eventually receives the tally and the termination.
    observer: OR,
    /// Number of items received from the source so far.
    count: usize,
}
72
73impl<T, E, OR> Observer<T, E> for CountObserver<OR>
74where
75    OR: Observer<usize, E>,
76{
77    fn on_next(&mut self, _: T) {
78        self.count += 1;
79    }
80
81    fn on_termination(mut self, termination: Termination<E>) {
82        match termination {
83            Termination::Completed => {
84                self.observer.on_next(self.count);
85            }
86            Termination::Error(_) => {}
87        }
88        self.observer.on_termination(termination)
89    }
90}