another_rxrust/operators/
buffer_with_count.rs1use crate::internals::stream_controller::*;
2use crate::prelude::*;
3use std::{
4 marker::PhantomData,
5 sync::{Arc, RwLock},
6};
7
8#[derive(Clone)]
9pub struct BufferWithCount<Item>
10where
11 Item: Clone + Send + Sync,
12{
13 count: usize,
14 _item: PhantomData<Item>,
15}
16
17impl<'a, Item> BufferWithCount<Item>
18where
19 Item: Clone + Send + Sync,
20{
21 pub fn new(count: usize) -> BufferWithCount<Item> {
22 assert!(count > 0);
23 BufferWithCount { count, _item: PhantomData }
24 }
25 pub fn execute(
26 &self,
27 source: Observable<'a, Item>,
28 ) -> Observable<'a, Vec<Item>> {
29 let count = self.count;
30 Observable::<Vec<Item>>::create(move |s| {
31 let results = Arc::new(RwLock::new(Vec::new()));
32 let sctl = StreamController::new(s);
33
34 let sctl_f = sctl.clone();
35 let results_f = Arc::clone(&results);
36 let register = move |item: Item| {
37 let vec = {
38 let mut vec = results_f.write().unwrap();
39 vec.push(item);
40 if vec.len() == count {
41 let v = vec.clone();
42 vec.clear();
43 Some(v)
44 } else {
45 None
46 }
47 };
48 if let Some(vec) = vec {
49 sctl_f.sink_next(vec);
50 }
51 };
52
53 {
54 let register = register.clone();
55 let sctl_error = sctl.clone();
56 let sctl_complete = sctl.clone();
57
58 source.inner_subscribe(sctl.new_observer(
59 move |_, x| register(x),
60 move |_, e| {
61 sctl_error.sink_error(e);
62 },
63 move |serial| {
64 let vec = results.read().unwrap();
65 if vec.len() > 0 {
66 sctl_complete.sink_next(vec.clone());
67 }
68 sctl_complete.sink_complete(&serial);
69 },
70 ));
71 }
72 })
73 }
74}
75
76impl<'a, Item> Observable<'a, Item>
77where
78 Item: Clone + Send + Sync,
79{
80 pub fn buffer_with_count(&self, count: usize) -> Observable<'a, Vec<Item>> {
81 BufferWithCount::new(count).execute(self.clone())
82 }
83}
84
85#[cfg(test)]
86mod test {
87 use crate::prelude::*;
88 use std::{thread, time};
89
90 #[test]
91 fn basic() {
92 observables::from_iter(0..10)
93 .buffer_with_count(3)
94 .subscribe(
95 print_next_fmt!("{:?}"),
96 print_error!(),
97 print_complete!(),
98 );
99 thread::sleep(time::Duration::from_millis(1000));
100 }
101}