another_rxrust/operators/
first.rs1use crate::internals::stream_controller::*;
2use crate::prelude::*;
3
4#[derive(Clone)]
5pub struct First<Item>
6where
7 Item: Clone + Send + Sync,
8{
9 take_op: operators::Take<Item>,
10}
11
12impl<'a, Item> First<Item>
13where
14 Item: Clone + Send + Sync,
15{
16 pub fn new() -> First<Item> {
17 First { take_op: operators::Take::new(1) }
18 }
19 pub fn execute(&self, source: Observable<'a, Item>) -> Observable<'a, Item> {
20 let take_op = self.take_op.clone();
21
22 Observable::<Item>::create(move |s| {
23 let source = source.clone();
24
25 let sctl = StreamController::new(s);
26 let sctl_next = sctl.clone();
27 let sctl_error = sctl.clone();
28 let sctl_complete = sctl.clone();
29
30 take_op.execute(source).inner_subscribe(sctl.new_observer(
31 move |_, x| {
32 sctl_next.sink_next(x);
33 },
34 move |_, e| {
35 sctl_error.sink_error(e);
36 },
37 move |serial| sctl_complete.sink_complete(&serial),
38 ));
39 })
40 }
41}
42
43impl<'a, Item> Observable<'a, Item>
44where
45 Item: Clone + Send + Sync,
46{
47 pub fn first(&self) -> Observable<'a, Item> {
48 First::new().execute(self.clone())
49 }
50}
51
52#[cfg(test)]
53mod test {
54 use crate::prelude::*;
55
56 #[test]
57 fn basic() {
58 observables::from_iter(0..10).first().subscribe(
59 print_next_fmt!("{}"),
60 print_error!(),
61 print_complete!(),
62 );
63 }
64}