another_rxrust/operators/
observe_on.rs1use crate::internals::{function_wrapper::*, stream_controller::*};
2use crate::prelude::*;
3use scheduler::IScheduler;
4use std::marker::PhantomData;
5
6#[derive(Clone)]
7pub struct ObserveOn<'a, Scheduler, Item>
8where
9 Scheduler: IScheduler<'a> + Clone + Send + Sync,
10 Item: Clone + Send + Sync,
11{
12 scheduler_ctor: FunctionWrapper<'a, (), Scheduler>,
13 _item: PhantomData<Item>,
14 _lifetime: PhantomData<&'a ()>,
15}
16
17impl<'a, Scheduler, Item> ObserveOn<'a, Scheduler, Item>
18where
19 Scheduler: IScheduler<'a> + Clone + Send + Sync,
20 Item: Clone + Send + Sync,
21{
22 pub fn new<SchedulerCreator>(
23 scheduler_ctor: SchedulerCreator,
24 ) -> ObserveOn<'a, Scheduler, Item>
25 where
26 SchedulerCreator: Fn() -> Scheduler + Send + Sync + 'a,
27 {
28 ObserveOn {
29 scheduler_ctor: FunctionWrapper::new(move |_| scheduler_ctor()),
30 _item: PhantomData,
31 _lifetime: PhantomData,
32 }
33 }
34
35 pub fn execute(&self, source: Observable<'a, Item>) -> Observable<'a, Item> {
36 let scheduler_ctor = self.scheduler_ctor.clone();
37 Observable::create(move |s| {
38 let scheduler = scheduler_ctor.call(());
39
40 let sctl = StreamController::new(s);
41
42 let sctl_next = sctl.clone();
43 let sctl_error = sctl.clone();
44 let sctl_complete = sctl.clone();
45
46 let source_next = source.clone();
47
48 let scheduler_next = scheduler.clone();
49 let scheduler_error = scheduler.clone();
50 let scheduler_complete = scheduler.clone();
51
52 let scheduler_on_finalize = scheduler.clone();
53 sctl.set_on_finalize(move || {
54 scheduler_on_finalize.abort();
55 });
56
57 source_next.inner_subscribe(sctl.new_observer(
58 move |_, x: Item| {
59 let sctl_next = sctl_next.clone();
60 scheduler_next.post(move || {
61 sctl_next.sink_next(x.clone());
62 });
63 },
64 move |_, e| {
65 let sctl_error = sctl_error.clone();
66 scheduler_error.post(move || {
67 sctl_error.sink_error(e.clone());
68 });
69 },
70 move |serial| {
71 let sctl_complete = sctl_complete.clone();
72 scheduler_complete.post(move || {
73 sctl_complete.sink_complete(&serial);
74 });
75 },
76 ));
77 })
78 }
79}
80
81impl<'a, Item> Observable<'a, Item>
82where
83 Item: Clone + Send + Sync,
84{
85 pub fn observe_on<Scheduler, SchedulerCreator>(
86 &self,
87 scheduler_ctor: SchedulerCreator,
88 ) -> Observable<'a, Item>
89 where
90 Scheduler: IScheduler<'a> + Clone + Send + Sync + 'a,
91 SchedulerCreator: Fn() -> Scheduler + Send + Sync + 'a,
92 {
93 ObserveOn::new(scheduler_ctor).execute(self.clone())
94 }
95}
96
97#[cfg(all(test, not(feature = "web")))]
98mod test {
99 use crate::prelude::*;
100 use std::{thread, time};
101
102 #[test]
103 fn basic() {
104 let o = Observable::create(|s| {
105 for n in 0..5 {
106 s.next(n);
107 thread::sleep(time::Duration::from_millis(100));
108 }
109 s.complete();
110 });
111
112 o.observe_on(schedulers::new_thread_scheduler()).subscribe(
113 print_next_fmt!("{}"),
114 print_error!(),
115 print_complete!(),
116 );
117 thread::sleep(time::Duration::from_millis(1000));
118 }
119
120 #[test]
121 fn multiple() {
122 let o = Observable::create(|s| {
123 for n in 0..5 {
124 s.next(n);
125 thread::sleep(time::Duration::from_millis(100));
126 }
127 s.complete();
128 })
129 .observe_on(schedulers::new_thread_scheduler());
130
131 o.subscribe(
132 |x| println!("#1 next {}", x),
133 |e| println!("#1 error {:?}", e),
134 || println!("#1 complete"),
135 );
136
137 o.subscribe(
138 |x| println!("#2 next {}", x),
139 |e| println!("#2 error {:?}", e),
140 || println!("#2 complete"),
141 );
142 thread::sleep(time::Duration::from_millis(1000));
143 }
144}