another_rxrust/operators/
skip_until.rs

1use crate::internals::stream_controller::*;
2use crate::prelude::*;
3use std::{
4  marker::PhantomData,
5  sync::{Arc, RwLock},
6};
7
8#[derive(Clone)]
9pub struct SkipUntil<'a, Item, TriggerValue>
10where
11  Item: Clone + Send + Sync,
12  TriggerValue: Clone + Send + Sync,
13{
14  trigger: Observable<'a, TriggerValue>,
15  _item: PhantomData<Item>,
16}
17
18impl<'a, Item, TriggerValue> SkipUntil<'a, Item, TriggerValue>
19where
20  Item: Clone + Send + Sync,
21  TriggerValue: Clone + Send + Sync,
22{
23  pub fn new(
24    trigger: Observable<'a, TriggerValue>,
25  ) -> SkipUntil<'a, Item, TriggerValue>
26  where
27    TriggerValue: Clone + Send + Sync,
28  {
29    SkipUntil { trigger, _item: PhantomData }
30  }
31  pub fn execute(&self, source: Observable<'a, Item>) -> Observable<'a, Item> {
32    let trigger = self.trigger.clone();
33    Observable::<Item>::create(move |s| {
34      let enable = Arc::new(RwLock::new(false));
35      let sctl = StreamController::new(s);
36
37      let obs_trigger = {
38        let enable_triggner_next = Arc::clone(&enable);
39        let sctl_trigger_next = sctl.clone();
40
41        sctl.new_observer(
42          move |serial, _| {
43            *enable_triggner_next.write().unwrap() = true;
44            sctl_trigger_next.upstream_abort_observe(&serial);
45          },
46          |_, _| {},
47          |_| {},
48        )
49      };
50
51      let obs_source = {
52        let sctl_next = sctl.clone();
53        let sctl_error = sctl.clone();
54        let sctl_complete = sctl.clone();
55
56        sctl.new_observer(
57          move |_, x| {
58            if *enable.read().unwrap() {
59              sctl_next.sink_next(x);
60            }
61          },
62          move |_, e| {
63            sctl_error.sink_error(e);
64          },
65          move |_| sctl_complete.sink_complete_force(), // trigger also unsubscribe
66        )
67      };
68
69      trigger.inner_subscribe(obs_trigger);
70      source.inner_subscribe(obs_source);
71    })
72  }
73}
74
75impl<'a, Item> Observable<'a, Item>
76where
77  Item: Clone + Send + Sync,
78{
79  pub fn skip_until<TriggerValue>(
80    &self,
81    trigger: Observable<'a, TriggerValue>,
82  ) -> Observable<'a, Item>
83  where
84    TriggerValue: Clone + Send + Sync,
85  {
86    SkipUntil::new(trigger).execute(self.clone())
87  }
88}
89
90#[cfg(all(test, not(feature = "web")))]
91mod test {
92  use crate::prelude::*;
93  use std::{thread, time};
94
95  #[test]
96  fn basic() {
97    let o = Observable::create(|s| {
98      for n in 0..10 {
99        thread::sleep(time::Duration::from_millis(100));
100        s.next(n);
101      }
102      s.complete();
103    });
104
105    o.skip_until(observables::timer(
106      time::Duration::from_millis(500),
107      schedulers::new_thread_scheduler(),
108    ))
109    .subscribe(
110      print_next_fmt!("{}"),
111      print_error!(),
112      print_complete!(),
113    );
114  }
115}