another_rxrust/operators/
subscribe_on.rs1use crate::internals::{function_wrapper::*, stream_controller::*};
2use crate::prelude::*;
3use scheduler::IScheduler;
4use std::marker::PhantomData;
5
6#[derive(Clone)]
7pub struct SubscribeOn<'a, Scheduler, Item>
8where
9 Scheduler: IScheduler<'a> + Clone + Send + Sync,
10 Item: Clone + Send + Sync,
11{
12 scheduler_ctor: FunctionWrapper<'a, (), Scheduler>,
13 _item: PhantomData<Item>,
14 _lifetime: PhantomData<&'a ()>,
15}
16
17impl<'a, Scheduler, Item> SubscribeOn<'a, Scheduler, Item>
18where
19 Scheduler: IScheduler<'a> + Clone + Send + Sync,
20 Item: Clone + Send + Sync,
21{
22 pub fn new<SchedulerCreator>(
23 scheduler_ctor: SchedulerCreator,
24 ) -> SubscribeOn<'a, Scheduler, Item>
25 where
26 SchedulerCreator: Fn() -> Scheduler + Send + Sync + 'a,
27 {
28 SubscribeOn {
29 scheduler_ctor: FunctionWrapper::new(move |_| scheduler_ctor()),
30 _item: PhantomData,
31 _lifetime: PhantomData,
32 }
33 }
34
35 pub fn execute(&self, source: Observable<'a, Item>) -> Observable<'a, Item> {
36 let scheduler_ctor = self.scheduler_ctor.clone();
37 Observable::create(move |s| {
38 let scheduler = scheduler_ctor.call(());
39 let sctl = StreamController::new(s);
40 let source_next = source.clone();
41
42 let scheduler_on_finalize = scheduler.clone();
43 sctl.set_on_finalize(move || {
44 scheduler_on_finalize.abort();
45 });
46
47 scheduler.post(move || {
48 let sctl_next = sctl.clone();
49 let sctl_error = sctl.clone();
50 let sctl_complete = sctl.clone();
51 source_next.inner_subscribe(sctl.new_observer(
52 move |_, x| {
53 sctl_next.sink_next(x);
54 },
55 move |_, e| {
56 sctl_error.sink_error(e);
57 },
58 move |serial| sctl_complete.sink_complete(&serial),
59 ));
60 });
61 })
62 }
63}
64
65impl<'a, Item> Observable<'a, Item>
66where
67 Item: Clone + Send + Sync,
68{
69 pub fn subscribe_on<Scheduler, SchedulerCreator>(
70 &self,
71 scheduler_ctor: SchedulerCreator,
72 ) -> Observable<'a, Item>
73 where
74 Scheduler: IScheduler<'a> + Clone + Send + Sync + 'a,
75 SchedulerCreator: Fn() -> Scheduler + Send + Sync + 'a,
76 {
77 SubscribeOn::new(scheduler_ctor).execute(self.clone())
78 }
79}
80
#[cfg(all(test, not(feature = "web")))]
mod test {
    use crate::prelude::*;
    use std::thread::sleep;
    use std::time::Duration;

    /// Pause after each value emitted by the test source.
    const EMIT_INTERVAL: Duration = Duration::from_millis(100);
    /// Time each test waits before returning, so the scheduled work can run.
    const SETTLE_TIME: Duration = Duration::from_millis(1000);

    /// Source that emits `0..5`, sleeping after every value, then completes.
    fn slow_counter<'a>() -> Observable<'a, i32> {
        Observable::create(|emitter| {
            for value in 0..5 {
                emitter.next(value);
                sleep(EMIT_INTERVAL);
            }
            emitter.complete();
        })
    }

    /// Single subscriber through `subscribe_on`; output is printed only.
    #[test]
    fn basic() {
        let source = slow_counter();

        source
            .subscribe_on(schedulers::new_thread_scheduler())
            .subscribe(print_next_fmt!("{}"), print_error!(), print_complete!());

        sleep(SETTLE_TIME);
    }

    /// Two subscribers on the same `subscribe_on` observable; output is printed only.
    #[test]
    fn multiple() {
        let shared = slow_counter().subscribe_on(schedulers::new_thread_scheduler());

        shared.subscribe(
            |x| println!("#1 next {}", x),
            |e| println!("#1 error {:?}", e),
            || println!("#1 complete"),
        );

        shared.subscribe(
            |x| println!("#2 next {}", x),
            |e| println!("#2 error {:?}", e),
            || println!("#2 complete"),
        );

        sleep(SETTLE_TIME);
    }
}