another_rxrust/operators/
buffer_with_count.rs

1use crate::internals::stream_controller::*;
2use crate::prelude::*;
3use std::{
4  marker::PhantomData,
5  sync::{Arc, RwLock},
6};
7
8#[derive(Clone)]
9pub struct BufferWithCount<Item>
10where
11  Item: Clone + Send + Sync,
12{
13  count: usize,
14  _item: PhantomData<Item>,
15}
16
17impl<'a, Item> BufferWithCount<Item>
18where
19  Item: Clone + Send + Sync,
20{
21  pub fn new(count: usize) -> BufferWithCount<Item> {
22    assert!(count > 0);
23    BufferWithCount { count, _item: PhantomData }
24  }
25  pub fn execute(
26    &self,
27    source: Observable<'a, Item>,
28  ) -> Observable<'a, Vec<Item>> {
29    let count = self.count;
30    Observable::<Vec<Item>>::create(move |s| {
31      let results = Arc::new(RwLock::new(Vec::new()));
32      let sctl = StreamController::new(s);
33
34      let sctl_f = sctl.clone();
35      let results_f = Arc::clone(&results);
36      let register = move |item: Item| {
37        let vec = {
38          let mut vec = results_f.write().unwrap();
39          vec.push(item);
40          if vec.len() == count {
41            let v = vec.clone();
42            vec.clear();
43            Some(v)
44          } else {
45            None
46          }
47        };
48        if let Some(vec) = vec {
49          sctl_f.sink_next(vec);
50        }
51      };
52
53      {
54        let register = register.clone();
55        let sctl_error = sctl.clone();
56        let sctl_complete = sctl.clone();
57
58        source.inner_subscribe(sctl.new_observer(
59          move |_, x| register(x),
60          move |_, e| {
61            sctl_error.sink_error(e);
62          },
63          move |serial| {
64            let vec = results.read().unwrap();
65            if vec.len() > 0 {
66              sctl_complete.sink_next(vec.clone());
67            }
68            sctl_complete.sink_complete(&serial);
69          },
70        ));
71      }
72    })
73  }
74}
75
76impl<'a, Item> Observable<'a, Item>
77where
78  Item: Clone + Send + Sync,
79{
80  pub fn buffer_with_count(&self, count: usize) -> Observable<'a, Vec<Item>> {
81    BufferWithCount::new(count).execute(self.clone())
82  }
83}
84
85#[cfg(test)]
86mod test {
87  use crate::prelude::*;
88  use std::{thread, time};
89
90  #[test]
91  fn basic() {
92    observables::from_iter(0..10)
93      .buffer_with_count(3)
94      .subscribe(
95        print_next_fmt!("{:?}"),
96        print_error!(),
97        print_complete!(),
98      );
99    thread::sleep(time::Duration::from_millis(1000));
100  }
101}