another_rxrust/operators/
window_with_count.rs

1use crate::internals::stream_controller::*;
2use crate::prelude::*;
3use std::{
4  marker::PhantomData,
5  sync::{Arc, RwLock},
6};
7
8#[derive(Clone)]
9pub struct WindowWithCount<Item>
10where
11  Item: Clone + Send + Sync,
12{
13  count: usize,
14  _item: PhantomData<Item>,
15}
16
17impl<'a, Item> WindowWithCount<Item>
18where
19  Item: Clone + Send + Sync,
20{
21  pub fn new(count: usize) -> WindowWithCount<Item> {
22    WindowWithCount { count, _item: PhantomData }
23  }
24  pub fn execute(
25    &self,
26    source: Observable<'a, Item>,
27  ) -> Observable<'a, Observable<'a, Item>> {
28    let count = self.count;
29
30    Observable::create(move |s| {
31      let n = Arc::new(RwLock::new(0));
32      let sbj = Arc::new(RwLock::new(
33        subjects::Subject::<Item>::new(),
34      ));
35
36      let sctl = StreamController::new(s);
37      let sctl_next = sctl.clone();
38      let sctl_error = sctl.clone();
39      let sctl_complete = sctl.clone();
40
41      let sbj_next = Arc::clone(&sbj);
42      let sbj_error = Arc::clone(&sbj);
43      let sbj_complete = Arc::clone(&sbj);
44
45      source.inner_subscribe(sctl.new_observer(
46        move |_, x| {
47          let mut n = n.write().unwrap();
48          if *n == 0 {
49            sctl_next.sink_next(sbj_next.read().unwrap().observable());
50            sbj_next.read().unwrap().next(x);
51            *n += 1;
52          } else {
53            sbj_next.read().unwrap().next(x);
54            *n += 1;
55            if *n == count {
56              sbj_next.read().unwrap().complete();
57              *sbj_next.write().unwrap() = subjects::Subject::<Item>::new();
58              *n = 0;
59            }
60          }
61        },
62        move |_, e| {
63          sbj_error.read().unwrap().error(e.clone());
64          sctl_error.sink_error(e);
65        },
66        move |serial| {
67          sbj_complete.read().unwrap().complete();
68          sctl_complete.sink_complete(&serial);
69        },
70      ));
71    })
72  }
73}
74
75impl<'a, Item> Observable<'a, Item>
76where
77  Item: Clone + Send + Sync,
78{
79  pub fn window_with_count(
80    &self,
81    count: usize,
82  ) -> Observable<'a, Observable<'a, Item>> {
83    WindowWithCount::new(count).execute(self.clone())
84  }
85}
86
87#[cfg(test)]
88mod test {
89  use crate::prelude::*;
90  use std::sync::{Arc, RwLock};
91
92  #[test]
93  fn basic() {
94    let n = Arc::new(RwLock::new(0));
95    observables::from_iter(0..10)
96      .window_with_count(3)
97      .subscribe(
98        move |x| {
99          let nn = *n.read().unwrap();
100          *n.write().unwrap() += 1;
101          x.subscribe(
102            move |y| println!("next ({}) - {}", nn, y),
103            move |e| println!("error ({}) - {:?}", nn, e),
104            move || println!("complete ({})", nn),
105          );
106        },
107        print_error!(),
108        print_complete!(),
109      );
110  }
111
112  #[test]
113  fn error() {
114    let n = Arc::new(RwLock::new(0));
115    observables::from_iter(0..10)
116      .flat_map(|x| {
117        if x == 7 {
118          observables::error(RxError::from_error("it's 7!!"))
119        } else {
120          observables::just(x)
121        }
122      })
123      .window_with_count(3)
124      .subscribe(
125        move |x| {
126          let nn = *n.read().unwrap();
127          *n.write().unwrap() += 1;
128          x.subscribe(
129            move |y| println!("next ({}) - {}", nn, y),
130            move |e| println!("error ({}) - {:?}", nn, e),
131            move || println!("complete ({})", nn),
132          );
133        },
134        print_error!(),
135        print_complete!(),
136      );
137  }
138}