another_rxrust/operators/
sample.rs

1use crate::internals::stream_controller::*;
2use crate::prelude::*;
3use std::marker::PhantomData;
4use std::sync::{Arc, RwLock};
5
6#[derive(Clone)]
7pub struct Sample<'a, Item, TriggerValue>
8where
9  Item: Clone + Send + Sync,
10  TriggerValue: Clone + Send + Sync,
11{
12  trigger: Observable<'a, TriggerValue>,
13  _item: PhantomData<Item>,
14}
15
16impl<'a, Item, TriggerValue> Sample<'a, Item, TriggerValue>
17where
18  Item: Clone + Send + Sync,
19  TriggerValue: Clone + Send + Sync,
20{
21  pub fn new(
22    trigger: Observable<'a, TriggerValue>,
23  ) -> Sample<'a, Item, TriggerValue>
24  where
25    TriggerValue: Clone + Send + Sync,
26  {
27    Sample { trigger, _item: PhantomData }
28  }
29  pub fn execute(&self, source: Observable<'a, Item>) -> Observable<'a, Item> {
30    let trigger = self.trigger.clone();
31    Observable::<Item>::create(move |s| {
32      let value = Arc::new(RwLock::new(None::<Item>));
33      let sctl = StreamController::new(s);
34
35      let obs_trigger = {
36        let value_trigger_next = Arc::clone(&value);
37        let sctl_trigger_next = sctl.clone();
38
39        sctl.new_observer(
40          move |_, _| {
41            let value = {
42              let mut v = value_trigger_next.write().unwrap();
43              let vv = v.clone();
44              *v = None;
45              vv
46            };
47            if let Some(v) = value {
48              sctl_trigger_next.sink_next(v);
49            }
50          },
51          |_, _| {},
52          |_| {},
53        )
54      };
55
56      let obs_source = {
57        let value_next = Arc::clone(&value);
58        let sctl_error = sctl.clone();
59        let sctl_complete = sctl.clone();
60
61        sctl.new_observer(
62          move |_, x| {
63            *value_next.write().unwrap() = Some(x);
64          },
65          move |_, e| {
66            sctl_error.sink_error(e);
67          },
68          move |_| sctl_complete.sink_complete_force(), // trigger also unsubscribe
69        )
70      };
71
72      trigger.inner_subscribe(obs_trigger);
73      source.inner_subscribe(obs_source);
74    })
75  }
76}
77
78impl<'a, Item> Observable<'a, Item>
79where
80  Item: Clone + Send + Sync,
81{
82  pub fn sample<TriggerValue>(
83    &self,
84    trigger: Observable<'a, TriggerValue>,
85  ) -> Observable<'a, Item>
86  where
87    TriggerValue: Clone + Send + Sync,
88  {
89    Sample::new(trigger).execute(self.clone())
90  }
91}
92
#[cfg(all(test, not(feature = "web")))]
mod test {
  use crate::prelude::*;
  use std::{thread, time};

  /// Smoke test: samples a 100 ms interval with a subject that fires three
  /// times, 500 ms apart, and prints whatever comes through. It makes no
  /// assertions; it only checks that the pipeline runs.
  #[test]
  fn basic() {
    let tick = time::Duration::from_millis(100);
    let pause = time::Duration::from_millis(500);

    let sbj = subjects::Subject::new();
    observables::interval(tick, schedulers::new_thread_scheduler())
      .sample(sbj.observable())
      .take(3)
      .subscribe(
        print_next_fmt!("{}"),
        print_error!(),
        print_complete!(),
      );

    // Fire the trigger three times, waiting before each emission so the
    // interval has produced an item in the meantime.
    for _ in 0..3 {
      thread::sleep(pause);
      sbj.next(());
    }
    sbj.complete();

    // Keep the test alive briefly after the trigger completes.
    thread::sleep(pause);
  }
}