another_rxrust/operators/
subscribe_on.rs

1use crate::internals::{function_wrapper::*, stream_controller::*};
2use crate::prelude::*;
3use scheduler::IScheduler;
4use std::marker::PhantomData;
5
6#[derive(Clone)]
7pub struct SubscribeOn<'a, Scheduler, Item>
8where
9  Scheduler: IScheduler<'a> + Clone + Send + Sync,
10  Item: Clone + Send + Sync,
11{
12  scheduler_ctor: FunctionWrapper<'a, (), Scheduler>,
13  _item: PhantomData<Item>,
14  _lifetime: PhantomData<&'a ()>,
15}
16
17impl<'a, Scheduler, Item> SubscribeOn<'a, Scheduler, Item>
18where
19  Scheduler: IScheduler<'a> + Clone + Send + Sync,
20  Item: Clone + Send + Sync,
21{
22  pub fn new<SchedulerCreator>(
23    scheduler_ctor: SchedulerCreator,
24  ) -> SubscribeOn<'a, Scheduler, Item>
25  where
26    SchedulerCreator: Fn() -> Scheduler + Send + Sync + 'a,
27  {
28    SubscribeOn {
29      scheduler_ctor: FunctionWrapper::new(move |_| scheduler_ctor()),
30      _item: PhantomData,
31      _lifetime: PhantomData,
32    }
33  }
34
35  pub fn execute(&self, source: Observable<'a, Item>) -> Observable<'a, Item> {
36    let scheduler_ctor = self.scheduler_ctor.clone();
37    Observable::create(move |s| {
38      let scheduler = scheduler_ctor.call(());
39      let sctl = StreamController::new(s);
40      let source_next = source.clone();
41
42      let scheduler_on_finalize = scheduler.clone();
43      sctl.set_on_finalize(move || {
44        scheduler_on_finalize.abort();
45      });
46
47      scheduler.post(move || {
48        let sctl_next = sctl.clone();
49        let sctl_error = sctl.clone();
50        let sctl_complete = sctl.clone();
51        source_next.inner_subscribe(sctl.new_observer(
52          move |_, x| {
53            sctl_next.sink_next(x);
54          },
55          move |_, e| {
56            sctl_error.sink_error(e);
57          },
58          move |serial| sctl_complete.sink_complete(&serial),
59        ));
60      });
61    })
62  }
63}
64
65impl<'a, Item> Observable<'a, Item>
66where
67  Item: Clone + Send + Sync,
68{
69  pub fn subscribe_on<Scheduler, SchedulerCreator>(
70    &self,
71    scheduler_ctor: SchedulerCreator,
72  ) -> Observable<'a, Item>
73  where
74    Scheduler: IScheduler<'a> + Clone + Send + Sync + 'a,
75    SchedulerCreator: Fn() -> Scheduler + Send + Sync + 'a,
76  {
77    SubscribeOn::new(scheduler_ctor).execute(self.clone())
78  }
79}
80
#[cfg(all(test, not(feature = "web")))]
mod test {
  use crate::prelude::*;
  use std::{thread, time::Duration};

  /// Pause between two consecutive emissions of the test source.
  const EMIT_INTERVAL_MS: u64 = 100;
  /// How long each test keeps the main thread alive so the scheduled work can
  /// run before the test returns.
  const SETTLE_MS: u64 = 1000;

  /// Source used by both tests: emits 0 through 4 with a pause after each
  /// value, then completes.
  fn slow_counter() -> Observable<'static, i32> {
    Observable::create(|s| {
      for n in 0..5 {
        s.next(n);
        thread::sleep(Duration::from_millis(EMIT_INTERVAL_MS));
      }
      s.complete();
    })
  }

  // Smoke test: a single subscriber; events are printed, nothing is asserted.
  #[test]
  fn basic() {
    let source = slow_counter();
    let scheduled = source.subscribe_on(schedulers::new_thread_scheduler());

    scheduled.subscribe(
      print_next_fmt!("{}"),
      print_error!(),
      print_complete!(),
    );
    thread::sleep(Duration::from_millis(SETTLE_MS));
  }

  // Smoke test: two subscribers on the same scheduled observable; events are
  // printed, nothing is asserted.
  #[test]
  fn multiple() {
    let source = slow_counter();
    let scheduled = source.subscribe_on(schedulers::new_thread_scheduler());

    scheduled.subscribe(
      |value| println!("#1 next {}", value),
      |err| println!("#1 error {:?}", err),
      || println!("#1 complete"),
    );

    scheduled.subscribe(
      |value| println!("#2 next {}", value),
      |err| println!("#2 error {:?}", err),
      || println!("#2 complete"),
    );

    thread::sleep(Duration::from_millis(SETTLE_MS));
  }
}