// another_rxrust/operators/timeout.rs

1use crate::internals::{function_wrapper::*, stream_controller::*};
2use crate::prelude::*;
3use scheduler::IScheduler;
4use std::marker::PhantomData;
5use std::sync::{Arc, RwLock};
6use std::time::Duration;
7
8#[derive(Clone)]
9pub struct Timeout<'a, Scheduler, Item>
10where
11  Scheduler: IScheduler<'a> + Clone + Send + Sync,
12  Item: Clone + Send + Sync,
13{
14  dur: Duration,
15  scheduler_ctor: FunctionWrapper<'a, (), Scheduler>,
16  _item: PhantomData<Item>,
17}
18
19impl<'a, Scheduler, Item> Timeout<'a, Scheduler, Item>
20where
21  Scheduler: IScheduler<'a> + Clone + Send + Sync,
22  Item: Clone + Send + Sync,
23{
24  pub fn new<SchedulerCreator>(
25    dur: Duration,
26    scheduler_ctor: SchedulerCreator,
27  ) -> Timeout<'a, Scheduler, Item>
28  where
29    SchedulerCreator: Fn() -> Scheduler + Send + Sync + 'a,
30  {
31    Timeout {
32      dur,
33      scheduler_ctor: FunctionWrapper::new(move |_| scheduler_ctor()),
34      _item: PhantomData,
35    }
36  }
37
38  pub fn execute(&self, source: Observable<'a, Item>) -> Observable<'a, Item> {
39    let scheduler_ctor = self.scheduler_ctor.clone();
40    let dur = self.dur;
41    Observable::create(move |s| {
42      let sctl = StreamController::new(s);
43      let timer = Arc::new(RwLock::new(None::<Subscription<'a>>));
44      let scheduler_ctor = scheduler_ctor.clone();
45
46      let sctl_next = sctl.clone();
47      let sctl_error = sctl.clone();
48      let sctl_complete = sctl.clone();
49      source.inner_subscribe(sctl.new_observer(
50        move |_, x| {
51          {
52            let mut timer = timer.write().unwrap();
53            if let Some(timer) = &*timer {
54              timer.unsubscribe();
55            }
56            *timer = None;
57          }
58
59          sctl_next.sink_next(x);
60
61          {
62            let sctl = sctl_next.clone();
63            let scheduler_ctor = scheduler_ctor.clone();
64            *timer.write().unwrap() = Some(
65              observables::interval(dur, move || scheduler_ctor.call(()))
66                .take(1)
67                .subscribe(
68                  move |_| {
69                    sctl.sink_error(RxError::from_error(
70                      std::io::Error::from(std::io::ErrorKind::TimedOut),
71                    ));
72                  },
73                  junk_error!(),
74                  junk_complete!(),
75                ),
76            );
77          }
78        },
79        move |_, e| {
80          sctl_error.sink_error(e);
81        },
82        move |serial| sctl_complete.sink_complete(&serial),
83      ));
84    })
85  }
86}
87
88impl<'a, Item> Observable<'a, Item>
89where
90  Item: Clone + Send + Sync,
91{
92  pub fn timeout<Scheduler, SchedulerCreator>(
93    &self,
94    dur: Duration,
95    scheduler_ctor: SchedulerCreator,
96  ) -> Observable<'a, Item>
97  where
98    Scheduler: IScheduler<'a> + Clone + Send + Sync + 'a,
99    SchedulerCreator: Fn() -> Scheduler + Send + Sync + 'a,
100  {
101    Timeout::new(dur, scheduler_ctor).execute(self.clone())
102  }
103}
104
#[cfg(test)]
mod test {
  use crate::prelude::*;
  use std::sync::atomic::{AtomicUsize, Ordering};
  use std::sync::{Arc, RwLock};
  use std::{thread, time};

  /// Items that arrive within the timeout are forwarded; a gap longer than
  /// the timeout yields exactly one error and nothing is delivered after it.
  #[test]
  fn basic() {
    let received = Arc::new(RwLock::new(Vec::<i32>::new()));
    let errors = Arc::new(AtomicUsize::new(0));
    let completions = Arc::new(AtomicUsize::new(0));

    let sbj = subjects::Subject::new();

    let received_sink = Arc::clone(&received);
    let errors_sink = Arc::clone(&errors);
    let completions_sink = Arc::clone(&completions);
    sbj
      .observable()
      .timeout(
        time::Duration::from_millis(100),
        schedulers::new_thread_scheduler(),
      )
      .subscribe(
        move |x| {
          received_sink.write().unwrap().push(x);
        },
        move |_| {
          errors_sink.fetch_add(1, Ordering::SeqCst);
        },
        move || {
          completions_sink.fetch_add(1, Ordering::SeqCst);
        },
      );

    // Four items, each well inside the 100 ms window.
    sbj.next(1);
    thread::sleep(time::Duration::from_millis(10));
    sbj.next(2);
    thread::sleep(time::Duration::from_millis(10));
    sbj.next(3);
    thread::sleep(time::Duration::from_millis(10));
    sbj.next(4);
    assert_eq!(errors.load(Ordering::SeqCst), 0);

    // Stay silent for longer than the timeout, then try to emit again.
    thread::sleep(time::Duration::from_millis(200));
    sbj.next(5);

    // Leave room for any stray timer to fire before checking the totals.
    thread::sleep(time::Duration::from_millis(200));

    assert_eq!(*received.read().unwrap(), vec![1, 2, 3, 4]);
    assert_eq!(errors.load(Ordering::SeqCst), 1);
    assert_eq!(completions.load(Ordering::SeqCst), 0);
  }
}