another_rxrust/operators/
take_until.rs

1use crate::internals::stream_controller::*;
2use crate::prelude::*;
3use std::marker::PhantomData;
4
5#[derive(Clone)]
6pub struct TakeUntil<'a, Item, TriggerValue>
7where
8  Item: Clone + Send + Sync,
9  TriggerValue: Clone + Send + Sync,
10{
11  trigger: Observable<'a, TriggerValue>,
12  _item: PhantomData<Item>,
13}
14
15impl<'a, Item, TriggerValue> TakeUntil<'a, Item, TriggerValue>
16where
17  Item: Clone + Send + Sync,
18  TriggerValue: Clone + Send + Sync,
19{
20  pub fn new(
21    trigger: Observable<'a, TriggerValue>,
22  ) -> TakeUntil<'a, Item, TriggerValue>
23  where
24    TriggerValue: Clone + Send + Sync,
25  {
26    TakeUntil { trigger, _item: PhantomData }
27  }
28  pub fn execute(&self, source: Observable<'a, Item>) -> Observable<'a, Item> {
29    let trigger = self.trigger.clone();
30    Observable::<Item>::create(move |s| {
31      let sctl = StreamController::new(s);
32
33      let obs_trigger = {
34        let sctl_trigger_next = sctl.clone();
35
36        sctl.new_observer(
37          move |_, _| {
38            sctl_trigger_next.sink_complete_force();
39          },
40          |_, _| {},
41          |_| {},
42        )
43      };
44
45      let obs_source = {
46        let sctl_next = sctl.clone();
47        let sctl_error = sctl.clone();
48        let sctl_complete = sctl.clone();
49
50        sctl.new_observer(
51          move |_, x| {
52            sctl_next.sink_next(x);
53          },
54          move |_, e| {
55            sctl_error.sink_error(e);
56          },
57          move |_| sctl_complete.sink_complete_force(), // trigger also unsubscribe
58        )
59      };
60
61      trigger.inner_subscribe(obs_trigger);
62      source.inner_subscribe(obs_source);
63    })
64  }
65}
66
67impl<'a, Item> Observable<'a, Item>
68where
69  Item: Clone + Send + Sync,
70{
71  pub fn take_until<TriggerValue>(
72    &self,
73    trigger: Observable<'a, TriggerValue>,
74  ) -> Observable<'a, Item>
75  where
76    TriggerValue: Clone + Send + Sync,
77  {
78    TakeUntil::new(trigger).execute(self.clone())
79  }
80}
81
#[cfg(all(test, not(feature = "web")))]
mod test {
  use crate::prelude::*;
  use std::{thread, time};

  /// A source emitting 0..10 at 100 ms intervals, cut off by a 500 ms timer
  /// running on its own thread; the notifications are printed.
  #[test]
  fn basic() {
    let source = Observable::create(|subscriber| {
      for value in 0..10 {
        thread::sleep(time::Duration::from_millis(100));
        subscriber.next(value);
      }
      subscriber.complete();
    });

    let stop_timer = observables::timer(
      time::Duration::from_millis(500),
      schedulers::new_thread_scheduler(),
    );

    source.take_until(stop_timer).subscribe(
      print_next_fmt!("{}"),
      print_error!(),
      print_complete!(),
    );
  }

  /// An interval stream that is stopped through a subject: the downstream
  /// `flat_map` fires the subject once the value 10 arrives.
  #[test]
  fn with_subject() {
    let subject = subjects::Subject::new();
    let notifier = subject.clone();

    let ticks = observables::interval(
      time::Duration::from_millis(100),
      schedulers::new_thread_scheduler(),
    );

    let subscription = ticks
      .take_until(subject.observable())
      .flat_map(move |x| {
        if x == 10 {
          notifier.next(());
        }
        observables::empty::<u64>()
      })
      .subscribe(
        junk_next!(),
        junk_error!(),
        junk_complete!(),
      );

    // Spin until the subscription has been torn down.
    while subscription.is_subscribed() {}
  }
}