another_rxrust/operators/
timeout.rs1use crate::internals::{function_wrapper::*, stream_controller::*};
2use crate::prelude::*;
3use scheduler::IScheduler;
4use std::marker::PhantomData;
5use std::sync::{Arc, RwLock};
6use std::time::Duration;
7
8#[derive(Clone)]
9pub struct Timeout<'a, Scheduler, Item>
10where
11 Scheduler: IScheduler<'a> + Clone + Send + Sync,
12 Item: Clone + Send + Sync,
13{
14 dur: Duration,
15 scheduler_ctor: FunctionWrapper<'a, (), Scheduler>,
16 _item: PhantomData<Item>,
17}
18
19impl<'a, Scheduler, Item> Timeout<'a, Scheduler, Item>
20where
21 Scheduler: IScheduler<'a> + Clone + Send + Sync,
22 Item: Clone + Send + Sync,
23{
24 pub fn new<SchedulerCreator>(
25 dur: Duration,
26 scheduler_ctor: SchedulerCreator,
27 ) -> Timeout<'a, Scheduler, Item>
28 where
29 SchedulerCreator: Fn() -> Scheduler + Send + Sync + 'a,
30 {
31 Timeout {
32 dur,
33 scheduler_ctor: FunctionWrapper::new(move |_| scheduler_ctor()),
34 _item: PhantomData,
35 }
36 }
37
38 pub fn execute(&self, source: Observable<'a, Item>) -> Observable<'a, Item> {
39 let scheduler_ctor = self.scheduler_ctor.clone();
40 let dur = self.dur;
41 Observable::create(move |s| {
42 let sctl = StreamController::new(s);
43 let timer = Arc::new(RwLock::new(None::<Subscription<'a>>));
44 let scheduler_ctor = scheduler_ctor.clone();
45
46 let sctl_next = sctl.clone();
47 let sctl_error = sctl.clone();
48 let sctl_complete = sctl.clone();
49 source.inner_subscribe(sctl.new_observer(
50 move |_, x| {
51 {
52 let mut timer = timer.write().unwrap();
53 if let Some(timer) = &*timer {
54 timer.unsubscribe();
55 }
56 *timer = None;
57 }
58
59 sctl_next.sink_next(x);
60
61 {
62 let sctl = sctl_next.clone();
63 let scheduler_ctor = scheduler_ctor.clone();
64 *timer.write().unwrap() = Some(
65 observables::interval(dur, move || scheduler_ctor.call(()))
66 .take(1)
67 .subscribe(
68 move |_| {
69 sctl.sink_error(RxError::from_error(
70 std::io::Error::from(std::io::ErrorKind::TimedOut),
71 ));
72 },
73 junk_error!(),
74 junk_complete!(),
75 ),
76 );
77 }
78 },
79 move |_, e| {
80 sctl_error.sink_error(e);
81 },
82 move |serial| sctl_complete.sink_complete(&serial),
83 ));
84 })
85 }
86}
87
88impl<'a, Item> Observable<'a, Item>
89where
90 Item: Clone + Send + Sync,
91{
92 pub fn timeout<Scheduler, SchedulerCreator>(
93 &self,
94 dur: Duration,
95 scheduler_ctor: SchedulerCreator,
96 ) -> Observable<'a, Item>
97 where
98 Scheduler: IScheduler<'a> + Clone + Send + Sync + 'a,
99 SchedulerCreator: Fn() -> Scheduler + Send + Sync + 'a,
100 {
101 Timeout::new(dur, scheduler_ctor).execute(self.clone())
102 }
103}
104
#[cfg(test)]
mod test {
    use crate::prelude::*;
    use std::{thread, time};

    /// Smoke test: items 1-4 are sent 10 ms apart (well inside the 100 ms
    /// limit), then item 5 is sent 200 ms after item 4, i.e. past the limit.
    /// Output is only printed, not asserted.
    #[test]
    fn basic() {
        // Blocks the test thread for `ms` milliseconds.
        let pause = |ms: u64| thread::sleep(time::Duration::from_millis(ms));

        let sbj = subjects::Subject::new();

        sbj.observable()
            .timeout(
                time::Duration::from_millis(100),
                schedulers::new_thread_scheduler(),
            )
            .subscribe(
                print_next_fmt!("{}"),
                print_error_as!(std::io::Error),
                print_complete!(),
            );

        // Items that arrive within the limit.
        for x in 1..=3 {
            sbj.next(x);
            pause(10);
        }
        sbj.next(4);

        // Exceed the limit before sending the next item.
        pause(200);
        sbj.next(5);

        // Leave time for any timer thread to finish before the test exits.
        pause(1000);
    }
}