another_rxrust/operators/
first.rs

1use crate::internals::stream_controller::*;
2use crate::prelude::*;
3
4#[derive(Clone)]
5pub struct First<Item>
6where
7  Item: Clone + Send + Sync,
8{
9  take_op: operators::Take<Item>,
10}
11
12impl<'a, Item> First<Item>
13where
14  Item: Clone + Send + Sync,
15{
16  pub fn new() -> First<Item> {
17    First { take_op: operators::Take::new(1) }
18  }
19  pub fn execute(&self, source: Observable<'a, Item>) -> Observable<'a, Item> {
20    let take_op = self.take_op.clone();
21
22    Observable::<Item>::create(move |s| {
23      let source = source.clone();
24
25      let sctl = StreamController::new(s);
26      let sctl_next = sctl.clone();
27      let sctl_error = sctl.clone();
28      let sctl_complete = sctl.clone();
29
30      take_op.execute(source).inner_subscribe(sctl.new_observer(
31        move |_, x| {
32          sctl_next.sink_next(x);
33        },
34        move |_, e| {
35          sctl_error.sink_error(e);
36        },
37        move |serial| sctl_complete.sink_complete(&serial),
38      ));
39    })
40  }
41}
42
43impl<'a, Item> Observable<'a, Item>
44where
45  Item: Clone + Send + Sync,
46{
47  pub fn first(&self) -> Observable<'a, Item> {
48    First::new().execute(self.clone())
49  }
50}
51
52#[cfg(test)]
53mod test {
54  use crate::prelude::*;
55
56  #[test]
57  fn basic() {
58    observables::from_iter(0..10).first().subscribe(
59      print_next_fmt!("{}"),
60      print_error!(),
61      print_complete!(),
62    );
63  }
64}