another_rxrust/operators/
skip_until.rs1use crate::internals::stream_controller::*;
2use crate::prelude::*;
3use std::{
4 marker::PhantomData,
5 sync::{Arc, RwLock},
6};
7
8#[derive(Clone)]
9pub struct SkipUntil<'a, Item, TriggerValue>
10where
11 Item: Clone + Send + Sync,
12 TriggerValue: Clone + Send + Sync,
13{
14 trigger: Observable<'a, TriggerValue>,
15 _item: PhantomData<Item>,
16}
17
18impl<'a, Item, TriggerValue> SkipUntil<'a, Item, TriggerValue>
19where
20 Item: Clone + Send + Sync,
21 TriggerValue: Clone + Send + Sync,
22{
23 pub fn new(
24 trigger: Observable<'a, TriggerValue>,
25 ) -> SkipUntil<'a, Item, TriggerValue>
26 where
27 TriggerValue: Clone + Send + Sync,
28 {
29 SkipUntil { trigger, _item: PhantomData }
30 }
31 pub fn execute(&self, source: Observable<'a, Item>) -> Observable<'a, Item> {
32 let trigger = self.trigger.clone();
33 Observable::<Item>::create(move |s| {
34 let enable = Arc::new(RwLock::new(false));
35 let sctl = StreamController::new(s);
36
37 let obs_trigger = {
38 let enable_triggner_next = Arc::clone(&enable);
39 let sctl_trigger_next = sctl.clone();
40
41 sctl.new_observer(
42 move |serial, _| {
43 *enable_triggner_next.write().unwrap() = true;
44 sctl_trigger_next.upstream_abort_observe(&serial);
45 },
46 |_, _| {},
47 |_| {},
48 )
49 };
50
51 let obs_source = {
52 let sctl_next = sctl.clone();
53 let sctl_error = sctl.clone();
54 let sctl_complete = sctl.clone();
55
56 sctl.new_observer(
57 move |_, x| {
58 if *enable.read().unwrap() {
59 sctl_next.sink_next(x);
60 }
61 },
62 move |_, e| {
63 sctl_error.sink_error(e);
64 },
65 move |_| sctl_complete.sink_complete_force(), )
67 };
68
69 trigger.inner_subscribe(obs_trigger);
70 source.inner_subscribe(obs_source);
71 })
72 }
73}
74
75impl<'a, Item> Observable<'a, Item>
76where
77 Item: Clone + Send + Sync,
78{
79 pub fn skip_until<TriggerValue>(
80 &self,
81 trigger: Observable<'a, TriggerValue>,
82 ) -> Observable<'a, Item>
83 where
84 TriggerValue: Clone + Send + Sync,
85 {
86 SkipUntil::new(trigger).execute(self.clone())
87 }
88}
89
90#[cfg(all(test, not(feature = "web")))]
91mod test {
92 use crate::prelude::*;
93 use std::{thread, time};
94
95 #[test]
96 fn basic() {
97 let o = Observable::create(|s| {
98 for n in 0..10 {
99 thread::sleep(time::Duration::from_millis(100));
100 s.next(n);
101 }
102 s.complete();
103 });
104
105 o.skip_until(observables::timer(
106 time::Duration::from_millis(500),
107 schedulers::new_thread_scheduler(),
108 ))
109 .subscribe(
110 print_next_fmt!("{}"),
111 print_error!(),
112 print_complete!(),
113 );
114 }
115}