another_rxrust/operators/
min.rs

1use crate::internals::stream_controller::*;
2use crate::prelude::*;
3use std::{
4  marker::PhantomData,
5  sync::{Arc, RwLock},
6};
7
8#[derive(Clone)]
9pub struct Min<Item> {
10  _item: PhantomData<Item>,
11}
12
13impl<'a, Item> Min<Item>
14where
15  Item: Clone + Send + Sync + PartialOrd,
16{
17  pub fn new() -> Min<Item> {
18    Min { _item: PhantomData }
19  }
20  pub fn execute(&self, source: Observable<'a, Item>) -> Observable<'a, Item> {
21    Observable::<Item>::create(move |s| {
22      let result = Arc::new(RwLock::new(None::<Item>));
23
24      let sctl = StreamController::new(s);
25      let sctl_error = sctl.clone();
26      let sctl_complete = sctl.clone();
27
28      let result_next = Arc::clone(&result);
29
30      source.inner_subscribe(sctl.new_observer(
31        move |_, x| {
32          let mut r = result_next.write().unwrap();
33          if let Some(xx) = &*r {
34            if x < *xx {
35              *r = Some(x);
36            }
37          } else {
38            *r = Some(x);
39          }
40        },
41        move |_, e| {
42          sctl_error.sink_error(e);
43        },
44        move |serial| {
45          if let Some(x) = &*result.read().unwrap() {
46            sctl_complete.sink_next(x.clone());
47          }
48          sctl_complete.sink_complete(&serial);
49        },
50      ));
51    })
52  }
53}
54
55impl<'a, Item> Observable<'a, Item>
56where
57  Item: Clone + Send + Sync + PartialOrd,
58{
59  pub fn min(&self) -> Observable<'a, Item> {
60    Min::new().execute(self.clone())
61  }
62}
63
64#[cfg(test)]
65mod test {
66  use crate::prelude::*;
67
68  #[test]
69  fn basic() {
70    observables::from_iter([5, 6, 2, 7].into_iter())
71      .min()
72      .subscribe(
73        print_next_fmt!("{}"),
74        print_error!(),
75        print_complete!(),
76      );
77  }
78
79  #[test]
80  fn empty() {
81    observables::empty::<i32>().min().subscribe(
82      print_next_fmt!("{}"),
83      print_error!(),
84      print_complete!(),
85    );
86  }
87
88  #[test]
89  fn error() {
90    Observable::create(|s| {
91      s.next(1);
92      s.error(RxError::from_error("ERR!"))
93    })
94    .min()
95    .subscribe(
96      print_next_fmt!("{}"),
97      print_error_as!(&str),
98      print_complete!(),
99    );
100  }
101}