rx_rust/operators/mathematical_aggregate/
count.rs1use crate::utils::types::{MarkerType, NecessarySendSync};
2use crate::{
3 disposable::subscription::Subscription,
4 observable::Observable,
5 observer::{Observer, Termination},
6};
7use educe::Educe;
8use std::marker::PhantomData;
9
10#[derive(Educe)]
37#[educe(Debug, Clone)]
38pub struct Count<T, OE> {
39 source: OE,
40 _marker: MarkerType<T>,
41}
42
43impl<T, OE> Count<T, OE> {
44 pub fn new<'or, 'sub, E>(source: OE) -> Self
45 where
46 OE: Observable<'or, 'sub, T, E>,
47 {
48 Self {
49 source,
50 _marker: PhantomData,
51 }
52 }
53}
54
55impl<'or, 'sub, T, E, OE> Observable<'or, 'sub, usize, E> for Count<T, OE>
56where
57 OE: Observable<'or, 'sub, T, E>,
58{
59 fn subscribe(
60 self,
61 observer: impl Observer<usize, E> + NecessarySendSync + 'or,
62 ) -> Subscription<'sub> {
63 let observer = CountObserver { observer, count: 0 };
64 self.source.subscribe(observer)
65 }
66}
67
68struct CountObserver<OR> {
69 observer: OR,
70 count: usize,
71}
72
73impl<T, E, OR> Observer<T, E> for CountObserver<OR>
74where
75 OR: Observer<usize, E>,
76{
77 fn on_next(&mut self, _: T) {
78 self.count += 1;
79 }
80
81 fn on_termination(mut self, termination: Termination<E>) {
82 match termination {
83 Termination::Completed => {
84 self.observer.on_next(self.count);
85 }
86 Termination::Error(_) => {}
87 }
88 self.observer.on_termination(termination)
89 }
90}