another_rxrust/operators/
element_at.rs1use crate::internals::stream_controller::*;
2use crate::prelude::*;
3
4#[derive(Clone)]
5pub struct ElementAt<Item>
6where
7 Item: Clone + Send + Sync,
8{
9 take_op: operators::Take<Item>,
10}
11
12impl<'a, Item> ElementAt<Item>
13where
14 Item: Clone + Send + Sync,
15{
16 pub fn new(count: usize) -> ElementAt<Item> {
17 ElementAt {
18 take_op: operators::Take::<Item>::new(count),
19 }
20 }
21 pub fn execute(&self, source: Observable<'a, Item>) -> Observable<'a, Item> {
22 let take_op = self.take_op.clone();
23
24 Observable::<Item>::create(move |s| {
25 let source = source.clone();
26
27 let sctl = StreamController::new(s);
28 let sctl_next = sctl.clone();
29 let sctl_error = sctl.clone();
30 let sctl_complete = sctl.clone();
31
32 take_op
33 .execute(source)
34 .last()
35 .inner_subscribe(sctl.new_observer(
36 move |_, x| {
37 sctl_next.sink_next(x);
38 },
39 move |_, e| {
40 sctl_error.sink_error(e);
41 },
42 move |serial| sctl_complete.sink_complete(&serial),
43 ));
44 })
45 }
46}
47
48impl<'a, Item> Observable<'a, Item>
49where
50 Item: Clone + Send + Sync,
51{
52 pub fn element_at(&self, count: usize) -> Observable<'a, Item> {
53 ElementAt::new(count).execute(self.clone())
54 }
55}
56
#[cfg(test)]
mod test {
    use crate::prelude::*;

    /// `element_at(86)` over the values `1..100` must deliver the value `86`.
    #[test]
    fn basic() {
        let numbers = observables::from_iter(1..100);
        let selected = numbers.element_at(86);

        // `tap` checks the value on its way through; the error and completion
        // hooks are the crate's no-op placeholder macros.
        let verified = selected.tap(
            |x| {
                assert_eq!(x, 86);
            },
            junk_error!(),
            junk_complete!(),
        );

        verified.subscribe(print_next_fmt!("{}"), print_error!(), print_complete!());
    }
}