// another_rxrust/operators/observe_on.rs

1use crate::internals::{function_wrapper::*, stream_controller::*};
2use crate::prelude::*;
3use scheduler::IScheduler;
4use std::marker::PhantomData;
5
6#[derive(Clone)]
7pub struct ObserveOn<'a, Scheduler, Item>
8where
9  Scheduler: IScheduler<'a> + Clone + Send + Sync,
10  Item: Clone + Send + Sync,
11{
12  scheduler_ctor: FunctionWrapper<'a, (), Scheduler>,
13  _item: PhantomData<Item>,
14  _lifetime: PhantomData<&'a ()>,
15}
16
17impl<'a, Scheduler, Item> ObserveOn<'a, Scheduler, Item>
18where
19  Scheduler: IScheduler<'a> + Clone + Send + Sync,
20  Item: Clone + Send + Sync,
21{
22  pub fn new<SchedulerCreator>(
23    scheduler_ctor: SchedulerCreator,
24  ) -> ObserveOn<'a, Scheduler, Item>
25  where
26    SchedulerCreator: Fn() -> Scheduler + Send + Sync + 'a,
27  {
28    ObserveOn {
29      scheduler_ctor: FunctionWrapper::new(move |_| scheduler_ctor()),
30      _item: PhantomData,
31      _lifetime: PhantomData,
32    }
33  }
34
35  pub fn execute(&self, source: Observable<'a, Item>) -> Observable<'a, Item> {
36    let scheduler_ctor = self.scheduler_ctor.clone();
37    Observable::create(move |s| {
38      let scheduler = scheduler_ctor.call(());
39
40      let sctl = StreamController::new(s);
41
42      let sctl_next = sctl.clone();
43      let sctl_error = sctl.clone();
44      let sctl_complete = sctl.clone();
45
46      let source_next = source.clone();
47
48      let scheduler_next = scheduler.clone();
49      let scheduler_error = scheduler.clone();
50      let scheduler_complete = scheduler.clone();
51
52      let scheduler_on_finalize = scheduler.clone();
53      sctl.set_on_finalize(move || {
54        scheduler_on_finalize.abort();
55      });
56
57      source_next.inner_subscribe(sctl.new_observer(
58        move |_, x: Item| {
59          let sctl_next = sctl_next.clone();
60          scheduler_next.post(move || {
61            sctl_next.sink_next(x.clone());
62          });
63        },
64        move |_, e| {
65          let sctl_error = sctl_error.clone();
66          scheduler_error.post(move || {
67            sctl_error.sink_error(e.clone());
68          });
69        },
70        move |serial| {
71          let sctl_complete = sctl_complete.clone();
72          scheduler_complete.post(move || {
73            sctl_complete.sink_complete(&serial);
74          });
75        },
76      ));
77    })
78  }
79}
80
81impl<'a, Item> Observable<'a, Item>
82where
83  Item: Clone + Send + Sync,
84{
85  pub fn observe_on<Scheduler, SchedulerCreator>(
86    &self,
87    scheduler_ctor: SchedulerCreator,
88  ) -> Observable<'a, Item>
89  where
90    Scheduler: IScheduler<'a> + Clone + Send + Sync + 'a,
91    SchedulerCreator: Fn() -> Scheduler + Send + Sync + 'a,
92  {
93    ObserveOn::new(scheduler_ctor).execute(self.clone())
94  }
95}
96
#[cfg(all(test, not(feature = "web")))]
mod test {
  use crate::prelude::*;
  use std::{thread, time};

  /// Blocks the calling thread for `millis` milliseconds.
  fn pause(millis: u64) {
    thread::sleep(time::Duration::from_millis(millis));
  }

  /// Smoke test: one subscriber on an observable passed through
  /// `observe_on` with a new-thread scheduler. Output is only printed;
  /// nothing is asserted.
  #[test]
  fn basic() {
    // Emits 0..5, pausing 100 ms after each value, then completes.
    let source = Observable::create(|s| {
      (0..5).for_each(|n| {
        s.next(n);
        pause(100);
      });
      s.complete();
    });

    source
      .observe_on(schedulers::new_thread_scheduler())
      .subscribe(print_next_fmt!("{}"), print_error!(), print_complete!());

    // Presumably leaves time for work posted to the scheduler to run
    // before the test returns.
    pause(1000);
  }

  /// Smoke test: two subscribers on the same `observe_on` observable.
  /// Output is only printed; nothing is asserted.
  #[test]
  fn multiple() {
    // Emits 0..5, pausing 100 ms after each value, then completes.
    let observed = Observable::create(|s| {
      (0..5).for_each(|n| {
        s.next(n);
        pause(100);
      });
      s.complete();
    })
    .observe_on(schedulers::new_thread_scheduler());

    observed.subscribe(
      |x| println!("#1 next {}", x),
      |e| println!("#1 error {:?}", e),
      || println!("#1 complete"),
    );

    observed.subscribe(
      |x| println!("#2 next {}", x),
      |e| println!("#2 error {:?}", e),
      || println!("#2 complete"),
    );

    // Presumably leaves time for work posted to the scheduler to run
    // before the test returns.
    pause(1000);
  }
}