another_rxrust/operators/
take_until.rs1use crate::internals::stream_controller::*;
2use crate::prelude::*;
3use std::marker::PhantomData;
4
5#[derive(Clone)]
6pub struct TakeUntil<'a, Item, TriggerValue>
7where
8 Item: Clone + Send + Sync,
9 TriggerValue: Clone + Send + Sync,
10{
11 trigger: Observable<'a, TriggerValue>,
12 _item: PhantomData<Item>,
13}
14
15impl<'a, Item, TriggerValue> TakeUntil<'a, Item, TriggerValue>
16where
17 Item: Clone + Send + Sync,
18 TriggerValue: Clone + Send + Sync,
19{
20 pub fn new(
21 trigger: Observable<'a, TriggerValue>,
22 ) -> TakeUntil<'a, Item, TriggerValue>
23 where
24 TriggerValue: Clone + Send + Sync,
25 {
26 TakeUntil { trigger, _item: PhantomData }
27 }
28 pub fn execute(&self, source: Observable<'a, Item>) -> Observable<'a, Item> {
29 let trigger = self.trigger.clone();
30 Observable::<Item>::create(move |s| {
31 let sctl = StreamController::new(s);
32
33 let obs_trigger = {
34 let sctl_trigger_next = sctl.clone();
35
36 sctl.new_observer(
37 move |_, _| {
38 sctl_trigger_next.sink_complete_force();
39 },
40 |_, _| {},
41 |_| {},
42 )
43 };
44
45 let obs_source = {
46 let sctl_next = sctl.clone();
47 let sctl_error = sctl.clone();
48 let sctl_complete = sctl.clone();
49
50 sctl.new_observer(
51 move |_, x| {
52 sctl_next.sink_next(x);
53 },
54 move |_, e| {
55 sctl_error.sink_error(e);
56 },
57 move |_| sctl_complete.sink_complete_force(), )
59 };
60
61 trigger.inner_subscribe(obs_trigger);
62 source.inner_subscribe(obs_source);
63 })
64 }
65}
66
67impl<'a, Item> Observable<'a, Item>
68where
69 Item: Clone + Send + Sync,
70{
71 pub fn take_until<TriggerValue>(
72 &self,
73 trigger: Observable<'a, TriggerValue>,
74 ) -> Observable<'a, Item>
75 where
76 TriggerValue: Clone + Send + Sync,
77 {
78 TakeUntil::new(trigger).execute(self.clone())
79 }
80}
81
#[cfg(all(test, not(feature = "web")))]
mod test {
    use crate::prelude::*;
    use std::thread;
    use std::time::Duration;

    /// A source that emits 0..10 with a 100 ms sleep before each value is
    /// combined with a 500 ms timer trigger; the notifications are printed.
    #[test]
    fn basic() {
        let source = Observable::create(|s| {
            (0..10).for_each(|n| {
                thread::sleep(Duration::from_millis(100));
                s.next(n);
            });
            s.complete();
        });

        let trigger = observables::timer(
            Duration::from_millis(500),
            schedulers::new_thread_scheduler(),
        );

        source.take_until(trigger).subscribe(
            print_next_fmt!("{}"),
            print_error!(),
            print_complete!(),
        );
    }

    /// A 100 ms interval is limited by a subject that is fired from a
    /// downstream `flat_map` once the value 10 is seen; the test spins until
    /// the subscription reports that it is no longer subscribed.
    #[test]
    fn with_subject() {
        let subject = subjects::Subject::new();
        let notifier = subject.clone();

        let ticks = observables::interval(
            Duration::from_millis(100),
            schedulers::new_thread_scheduler(),
        );

        let subscription = ticks
            .take_until(subject.observable())
            .flat_map(move |x| {
                if x == 10 {
                    notifier.next(());
                }
                observables::empty::<u64>()
            })
            .subscribe(junk_next!(), junk_error!(), junk_complete!());

        // Busy-wait until the stream has ended.
        while subscription.is_subscribed() {}
    }
}