another_rxrust/operators/
count.rs

1use crate::internals::stream_controller::*;
2use crate::prelude::*;
3use std::{
4  marker::PhantomData,
5  sync::{Arc, RwLock},
6};
7
8#[derive(Clone)]
9pub struct Count<Item> {
10  _item: PhantomData<Item>,
11}
12
13impl<'a, Item> Count<Item>
14where
15  Item: Clone + Send + Sync,
16{
17  pub fn new() -> Count<Item> {
18    Count { _item: PhantomData }
19  }
20  pub fn execute(&self, source: Observable<'a, Item>) -> Observable<'a, usize> {
21    Observable::<usize>::create(move |s| {
22      let n = Arc::new(RwLock::new(0usize));
23
24      let sctl = StreamController::new(s);
25      let sctl_error = sctl.clone();
26      let sctl_complete = sctl.clone();
27
28      let n_next = Arc::clone(&n);
29
30      source.inner_subscribe(sctl.new_observer(
31        move |_, _| {
32          *n_next.write().unwrap() += 1;
33        },
34        move |_, e| {
35          sctl_error.sink_error(e);
36        },
37        move |serial| {
38          let n = *n.read().unwrap();
39          sctl_complete.sink_next(n);
40          sctl_complete.sink_complete(&serial);
41        },
42      ));
43    })
44  }
45}
46
47impl<'a, Item> Observable<'a, Item>
48where
49  Item: Clone + Send + Sync,
50{
51  pub fn count(&self) -> Observable<'a, usize> {
52    Count::new().execute(self.clone())
53  }
54}
55
56#[cfg(test)]
57mod test {
58  use crate::prelude::*;
59
60  #[test]
61  fn basic() {
62    observables::repeat(()).take(100).count().subscribe(
63      print_next_fmt!("{}"),
64      print_error!(),
65      print_complete!(),
66    );
67  }
68
69  #[test]
70  fn empty() {
71    observables::empty::<i32>().count().subscribe(
72      print_next_fmt!("{}"),
73      print_error!(),
74      print_complete!(),
75    );
76  }
77
78  #[test]
79  fn error() {
80    Observable::create(|s| {
81      s.next(1);
82      s.error(RxError::from_error("ERR!"))
83    })
84    .count()
85    .subscribe(
86      print_next_fmt!("{}"),
87      print_error_as!(&str),
88      print_complete!(),
89    );
90  }
91}