another_rxrust/operators/
element_at.rs

1use crate::internals::stream_controller::*;
2use crate::prelude::*;
3
4#[derive(Clone)]
5pub struct ElementAt<Item>
6where
7  Item: Clone + Send + Sync,
8{
9  take_op: operators::Take<Item>,
10}
11
12impl<'a, Item> ElementAt<Item>
13where
14  Item: Clone + Send + Sync,
15{
16  pub fn new(count: usize) -> ElementAt<Item> {
17    ElementAt {
18      take_op: operators::Take::<Item>::new(count),
19    }
20  }
21  pub fn execute(&self, source: Observable<'a, Item>) -> Observable<'a, Item> {
22    let take_op = self.take_op.clone();
23
24    Observable::<Item>::create(move |s| {
25      let source = source.clone();
26
27      let sctl = StreamController::new(s);
28      let sctl_next = sctl.clone();
29      let sctl_error = sctl.clone();
30      let sctl_complete = sctl.clone();
31
32      take_op
33        .execute(source)
34        .last()
35        .inner_subscribe(sctl.new_observer(
36          move |_, x| {
37            sctl_next.sink_next(x);
38          },
39          move |_, e| {
40            sctl_error.sink_error(e);
41          },
42          move |serial| sctl_complete.sink_complete(&serial),
43        ));
44    })
45  }
46}
47
48impl<'a, Item> Observable<'a, Item>
49where
50  Item: Clone + Send + Sync,
51{
52  pub fn element_at(&self, count: usize) -> Observable<'a, Item> {
53    ElementAt::new(count).execute(self.clone())
54  }
55}
56
57#[cfg(test)]
58mod test {
59  use crate::prelude::*;
60
61  #[test]
62  fn basic() {
63    observables::from_iter(1..100)
64      .element_at(86)
65      .tap(
66        |x| assert_eq!(x, 86),
67        junk_error!(),
68        junk_complete!(),
69      )
70      .subscribe(
71        print_next_fmt!("{}"),
72        print_error!(),
73        print_complete!(),
74      );
75  }
76}