another_rxrust/operators/
window_with_count.rs1use crate::internals::stream_controller::*;
2use crate::prelude::*;
3use std::{
4 marker::PhantomData,
5 sync::{Arc, RwLock},
6};
7
/// Operator state for splitting a stream into count-based windows.
///
/// Only the window size is stored; `Item` is tracked through `PhantomData`
/// so the operator is tied to the element type without holding a value.
#[derive(Clone)]
pub struct WindowWithCount<Item: Clone + Send + Sync> {
    /// Number of items after which the current window is closed.
    count: usize,
    /// Marker binding the operator to its item type.
    _item: PhantomData<Item>,
}
16
17impl<'a, Item> WindowWithCount<Item>
18where
19 Item: Clone + Send + Sync,
20{
21 pub fn new(count: usize) -> WindowWithCount<Item> {
22 WindowWithCount { count, _item: PhantomData }
23 }
24 pub fn execute(
25 &self,
26 source: Observable<'a, Item>,
27 ) -> Observable<'a, Observable<'a, Item>> {
28 let count = self.count;
29
30 Observable::create(move |s| {
31 let n = Arc::new(RwLock::new(0));
32 let sbj = Arc::new(RwLock::new(
33 subjects::Subject::<Item>::new(),
34 ));
35
36 let sctl = StreamController::new(s);
37 let sctl_next = sctl.clone();
38 let sctl_error = sctl.clone();
39 let sctl_complete = sctl.clone();
40
41 let sbj_next = Arc::clone(&sbj);
42 let sbj_error = Arc::clone(&sbj);
43 let sbj_complete = Arc::clone(&sbj);
44
45 source.inner_subscribe(sctl.new_observer(
46 move |_, x| {
47 let mut n = n.write().unwrap();
48 if *n == 0 {
49 sctl_next.sink_next(sbj_next.read().unwrap().observable());
50 sbj_next.read().unwrap().next(x);
51 *n += 1;
52 } else {
53 sbj_next.read().unwrap().next(x);
54 *n += 1;
55 if *n == count {
56 sbj_next.read().unwrap().complete();
57 *sbj_next.write().unwrap() = subjects::Subject::<Item>::new();
58 *n = 0;
59 }
60 }
61 },
62 move |_, e| {
63 sbj_error.read().unwrap().error(e.clone());
64 sctl_error.sink_error(e);
65 },
66 move |serial| {
67 sbj_complete.read().unwrap().complete();
68 sctl_complete.sink_complete(&serial);
69 },
70 ));
71 })
72 }
73}
74
75impl<'a, Item> Observable<'a, Item>
76where
77 Item: Clone + Send + Sync,
78{
79 pub fn window_with_count(
80 &self,
81 count: usize,
82 ) -> Observable<'a, Observable<'a, Item>> {
83 WindowWithCount::new(count).execute(self.clone())
84 }
85}
86
#[cfg(test)]
mod test {
    use crate::prelude::*;
    use std::sync::{Arc, RwLock};

    /// Smoke test: windows of three over `0..10`, printing every event
    /// tagged with the index of the window it belongs to.
    #[test]
    fn basic() {
        // Running index of the next window to be received.
        let window_counter = Arc::new(RwLock::new(0));
        let windows = observables::from_iter(0..10).window_with_count(3);
        windows.subscribe(
            move |window| {
                // Take the current index and advance the counter.
                let idx = {
                    let mut slot = window_counter.write().unwrap();
                    let current = *slot;
                    *slot += 1;
                    current
                };
                window.subscribe(
                    move |item| println!("next ({}) - {}", idx, item),
                    move |err| println!("error ({}) - {:?}", idx, err),
                    move || println!("complete ({})", idx),
                );
            },
            print_error!(),
            print_complete!(),
        );
    }

    /// Smoke test: the source raises an error when it reaches 7, and the
    /// events seen by each window and by the outer stream are printed.
    #[test]
    fn error() {
        // Running index of the next window to be received.
        let window_counter = Arc::new(RwLock::new(0));
        let failing_source = observables::from_iter(0..10).flat_map(|x| {
            if x == 7 {
                observables::error(RxError::from_error("it's 7!!"))
            } else {
                observables::just(x)
            }
        });
        failing_source.window_with_count(3).subscribe(
            move |window| {
                // Take the current index and advance the counter.
                let idx = {
                    let mut slot = window_counter.write().unwrap();
                    let current = *slot;
                    *slot += 1;
                    current
                };
                window.subscribe(
                    move |item| println!("next ({}) - {}", idx, item),
                    move |err| println!("error ({}) - {:?}", idx, err),
                    move || println!("complete ({})", idx),
                );
            },
            print_error!(),
            print_complete!(),
        );
    }
}