another_rxrust/operators/
sample.rs1use crate::internals::stream_controller::*;
2use crate::prelude::*;
3use std::marker::PhantomData;
4use std::sync::{Arc, RwLock};
5
6#[derive(Clone)]
7pub struct Sample<'a, Item, TriggerValue>
8where
9 Item: Clone + Send + Sync,
10 TriggerValue: Clone + Send + Sync,
11{
12 trigger: Observable<'a, TriggerValue>,
13 _item: PhantomData<Item>,
14}
15
16impl<'a, Item, TriggerValue> Sample<'a, Item, TriggerValue>
17where
18 Item: Clone + Send + Sync,
19 TriggerValue: Clone + Send + Sync,
20{
21 pub fn new(
22 trigger: Observable<'a, TriggerValue>,
23 ) -> Sample<'a, Item, TriggerValue>
24 where
25 TriggerValue: Clone + Send + Sync,
26 {
27 Sample { trigger, _item: PhantomData }
28 }
29 pub fn execute(&self, source: Observable<'a, Item>) -> Observable<'a, Item> {
30 let trigger = self.trigger.clone();
31 Observable::<Item>::create(move |s| {
32 let value = Arc::new(RwLock::new(None::<Item>));
33 let sctl = StreamController::new(s);
34
35 let obs_trigger = {
36 let value_trigger_next = Arc::clone(&value);
37 let sctl_trigger_next = sctl.clone();
38
39 sctl.new_observer(
40 move |_, _| {
41 let value = {
42 let mut v = value_trigger_next.write().unwrap();
43 let vv = v.clone();
44 *v = None;
45 vv
46 };
47 if let Some(v) = value {
48 sctl_trigger_next.sink_next(v);
49 }
50 },
51 |_, _| {},
52 |_| {},
53 )
54 };
55
56 let obs_source = {
57 let value_next = Arc::clone(&value);
58 let sctl_error = sctl.clone();
59 let sctl_complete = sctl.clone();
60
61 sctl.new_observer(
62 move |_, x| {
63 *value_next.write().unwrap() = Some(x);
64 },
65 move |_, e| {
66 sctl_error.sink_error(e);
67 },
68 move |_| sctl_complete.sink_complete_force(), )
70 };
71
72 trigger.inner_subscribe(obs_trigger);
73 source.inner_subscribe(obs_source);
74 })
75 }
76}
77
78impl<'a, Item> Observable<'a, Item>
79where
80 Item: Clone + Send + Sync,
81{
82 pub fn sample<TriggerValue>(
83 &self,
84 trigger: Observable<'a, TriggerValue>,
85 ) -> Observable<'a, Item>
86 where
87 TriggerValue: Clone + Send + Sync,
88 {
89 Sample::new(trigger).execute(self.clone())
90 }
91}
92
#[cfg(all(test, not(feature = "web")))]
mod test {
    use crate::prelude::*;
    use std::{thread, time};

    /// Smoke test: samples a 100 ms interval with a subject that is fired
    /// three times, 500 ms apart, and prints whatever the pipeline emits.
    /// There are no assertions; the output is for manual inspection.
    #[test]
    fn basic() {
        let tick = time::Duration::from_millis(100);
        let pause = time::Duration::from_millis(500);

        let sbj = subjects::Subject::new();
        observables::interval(tick, schedulers::new_thread_scheduler())
            .sample(sbj.observable())
            .take(3)
            .subscribe(
                print_next_fmt!("{}"),
                print_error!(),
                print_complete!(),
            );

        // Fire the trigger three times, letting the interval advance in between.
        for _ in 0..3 {
            thread::sleep(pause);
            sbj.next(());
        }
        sbj.complete();

        // Give the background scheduler thread time to finish printing.
        thread::sleep(pause);
    }
}