use crate::prelude::*;
use std::sync::{Arc, Mutex};
use std::time::Duration;
/// Extension trait that adds the `throttle_time` operator.
///
/// The operator limits how often values are forwarded downstream: once a
/// value opens a throttle window of length `duration`, no new window is
/// started until that one has elapsed.
pub trait ThrottleTime: Sized {
    /// Wraps `self` in a [`ThrottleTimeOp`].
    ///
    /// * `duration` - length of each throttle window.
    /// * `edge` - which end of the window emits. With
    ///   [`ThrottleEdge::Leading`] the value that opens the window is
    ///   forwarded immediately; with [`ThrottleEdge::Tailing`] the most recent
    ///   value seen during the window is forwarded when the window elapses.
    ///
    /// Nothing is subscribed or scheduled here; this only builds the operator
    /// value.
    fn throttle_time(
        self,
        duration: Duration,
        edge: ThrottleEdge,
    ) -> ThrottleTimeOp<Self> {
        let source = self;
        ThrottleTimeOp {
            source,
            duration,
            edge,
        }
    }
}
/// Selects which end of a throttle window produces the emitted value.
///
/// `Debug` and `Eq` are derived in addition to `PartialEq`, `Clone` and `Copy`
/// so the enum can be printed with `{:?}` and used in `assert_eq!`.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum ThrottleEdge {
    /// Emit the most recent value seen during the window once the window
    /// elapses (the "trailing" edge; the variant keeps its existing spelling
    /// for API compatibility).
    Tailing,
    /// Emit the value that opens the window immediately and ignore the values
    /// that arrive while the window is still open.
    Leading,
}
/// Operator value returned by `ThrottleTime::throttle_time`.
///
/// It only stores the configuration; the throttling itself starts when the
/// operator is subscribed to.
pub struct ThrottleTimeOp<S> {
/// The upstream value that `throttle_time` was called on.
source: S,
/// Length of one throttle window.
duration: Duration,
/// Which end of the window emits a value.
edge: ThrottleEdge,
}
/// Forking a throttle operator forks its source and copies the throttle
/// configuration unchanged.
impl<S> Fork for ThrottleTimeOp<S>
where
    S: Fork,
{
    type Output = ThrottleTimeOp<S::Output>;

    /// Builds a new operator around `source.fork()` with the same `duration`
    /// and `edge` as `self`.
    fn fork(&self) -> Self::Output {
        let Self {
            source,
            duration,
            edge,
        } = self;
        ThrottleTimeOp {
            source: source.fork(),
            edge: *edge,
            duration: *duration,
        }
    }
}
/// Converting a throttle operator to its shared form converts the source and
/// keeps the throttle configuration unchanged.
impl<S> IntoShared for ThrottleTimeOp<S>
where
    S: IntoShared,
{
    type Shared = ThrottleTimeOp<S::Shared>;

    /// Consumes `self` and rebuilds the operator around `source.to_shared()`.
    fn to_shared(self) -> Self::Shared {
        let ThrottleTimeOp {
            source,
            duration,
            edge,
        } = self;
        ThrottleTimeOp {
            source: source.to_shared(),
            edge,
            duration,
        }
    }
}
// Blanket implementation: `throttle_time` becomes callable on every sized
// type. Whether the resulting `ThrottleTimeOp` can actually be subscribed to
// is decided by the bounds on its subscribe implementation, not here.
impl<S> ThrottleTime for S {}
/// Subscribing to a throttle operator subscribes to its source with a
/// [`ThrottleTimeObserver`] placed in front of the caller's observer.
impl<Item, Err, O, U, S> RawSubscribable<Item, Err, Subscriber<O, U>>
    for ThrottleTimeOp<S>
where
    S: RawSubscribable<
        Item,
        Err,
        Subscriber<ThrottleTimeObserver<O::Shared, Item>, SharedSubscription>,
    >,
    O: IntoShared,
    U: IntoShared<Shared = SharedSubscription>,
{
    type Unsub = S::Unsub;

    /// Converts the subscriber's observer and subscription to their shared
    /// forms, stores them together with the throttle configuration in a
    /// mutex-protected state, and subscribes the source with that state.
    ///
    /// The shared subscription is handed both to the state (so timer handles
    /// can be registered on it) and to the subscriber given to the source.
    fn raw_subscribe(self, subscriber: Subscriber<O, U>) -> Self::Unsub {
        let Subscriber {
            observer,
            subscription,
        } = subscriber;
        let subscription = subscription.to_shared();

        // Initial state: no window open and nothing buffered.
        let state = InnerThrottleTimeObserver {
            observer: observer.to_shared(),
            edge: self.edge,
            delay: self.duration,
            trailing_value: None,
            throttled: None,
            subscription: subscription.clone(),
        };

        self.source.raw_subscribe(Subscriber {
            observer: ThrottleTimeObserver(Arc::new(Mutex::new(state))),
            subscription,
        })
    }
}
/// Mutable state behind a `ThrottleTimeObserver`; it is accessed under a
/// mutex both by the observer callbacks and by the scheduled timer task.
struct InnerThrottleTimeObserver<O, Item> {
/// Downstream observer that receives the throttled notifications.
observer: O,
/// Which end of the throttle window emits.
edge: ThrottleEdge,
/// Delay passed to the scheduler when a throttle window is opened.
delay: Duration,
/// Latest value received while `edge` is `Tailing`; taken when the timer
/// fires or when the source completes.
trailing_value: Option<Item>,
/// Handle of the scheduled timer task; `Some` while a window is open.
throttled: Option<SharedSubscription>,
/// Subscription on which timer handles are registered and removed.
subscription: SharedSubscription,
}
/// Observer placed between the source and the downstream observer by
/// `ThrottleTimeOp`.
///
/// The state lives in an `Arc<Mutex<..>>` because a clone of the `Arc` is
/// moved into the scheduled timer closure.
pub struct ThrottleTimeObserver<O, Item>(
Arc<Mutex<InnerThrottleTimeObserver<O, Item>>>,
);
/// The observer already keeps its state in an `Arc<Mutex<..>>`, so its shared
/// form is the observer itself.
impl<O, Item> IntoShared for ThrottleTimeObserver<O, Item>
where
    O: Send + Sync + 'static,
    Item: Send + Sync + 'static,
{
    type Shared = Self;

    /// Identity conversion: returns `self` unchanged.
    #[inline(always)]
    fn to_shared(self) -> Self {
        self
    }
}
/// Throttling logic applied to the notifications coming from the source.
///
/// All three callbacks lock the shared state with `lock().unwrap()` and
/// therefore panic if the mutex has been poisoned by an earlier panic.
impl<O, Item, Err> Observer<Item, Err> for ThrottleTimeObserver<O, Item>
where
    O: Observer<Item, Err> + Send + 'static,
    Item: Clone + Send + 'static,
{
    /// Handles a value from the source.
    ///
    /// * With `ThrottleEdge::Tailing` the value is buffered as the latest
    ///   trailing value (overwriting any previous one).
    /// * If no throttle window is open, a timer task is scheduled on the
    ///   thread pool with the configured delay and registered on the shared
    ///   subscription; with `ThrottleEdge::Leading` the value is additionally
    ///   forwarded downstream right away.
    ///
    /// When the timer fires it forwards the buffered trailing value (if any)
    /// and closes the window so that the next value can open a new one.
    fn next(&mut self, value: &Item) {
        let mut inner = self.0.lock().unwrap();
        if inner.edge == ThrottleEdge::Tailing {
            inner.trailing_value = Some(value.clone());
        }
        if inner.throttled.is_none() {
            let c_inner = self.0.clone();
            let subscription = Schedulers::ThreadPool.schedule(
                move |_, _| {
                    let mut inner = c_inner.lock().unwrap();
                    if let Some(v) = inner.trailing_value.take() {
                        inner.observer.next(&v);
                    }
                    // Close the window and drop the finished timer handle
                    // from the shared subscription.
                    if let Some(mut throttled) = inner.throttled.take() {
                        throttled.unsubscribe();
                        inner.subscription.remove(&throttled);
                    }
                },
                Some(inner.delay),
                (),
            );
            inner.subscription.add(subscription.clone());
            inner.throttled = Some(subscription);
            if inner.edge == ThrottleEdge::Leading {
                inner.observer.next(value);
            }
        }
    }

    /// Forwards an error downstream.
    ///
    /// Any buffered trailing value is discarded and a pending timer is
    /// cancelled first, so the timer task cannot call `next` on the
    /// downstream observer after it has received the error.
    fn error(&mut self, err: &Err) {
        let mut inner = self.0.lock().unwrap();
        inner.trailing_value = None;
        if let Some(mut throttled) = inner.throttled.take() {
            throttled.unsubscribe();
            inner.subscription.remove(&throttled);
        }
        inner.observer.error(err)
    }

    /// Forwards completion downstream.
    ///
    /// A buffered trailing value is flushed before `complete`. The pending
    /// timer has nothing left to emit afterwards, so it is cancelled instead
    /// of being left scheduled.
    fn complete(&mut self) {
        let mut inner = self.0.lock().unwrap();
        if let Some(value) = inner.trailing_value.take() {
            inner.observer.next(&value);
        }
        if let Some(mut throttled) = inner.throttled.take() {
            throttled.unsubscribe();
            inner.subscription.remove(&throttled);
        }
        inner.observer.complete();
    }
}
// End-to-end check of both throttle edges against a 2 ms interval source and
// a 19 ms throttle window. The expected vectors depend on wall-clock timing
// (sleeps of 205 ms), so statement order matters here.
#[test]
fn smoke() {
// `x` collects the emitted values; `x_c` is the handle used for assertions.
let x = Arc::new(Mutex::new(vec![]));
let x_c = x.clone();
let interval = observable::interval!(Duration::from_millis(2));
// Subscribes a fresh fork of the interval, throttled on the given edge, and
// pushes every emitted value into `x`.
let throttle_subscribe = |edge| {
let x = x.clone();
interval
.fork()
.to_shared()
.throttle_time(Duration::from_millis(19), edge)
.subscribe(move |v| x.lock().unwrap().push(*v))
};
// Trailing edge: the last value of each window is expected.
let mut sub = throttle_subscribe(ThrottleEdge::Tailing);
std::thread::sleep(Duration::from_millis(205));
sub.unsubscribe();
assert_eq!(
x_c.lock().unwrap().clone(),
vec![9, 19, 29, 39, 49, 59, 69, 79, 89, 99]
);
x_c.lock().unwrap().clear();
// Leading edge: the first value of each window is expected.
// NOTE(review): this subscription handle is dropped without an explicit
// `unsubscribe()`, unlike the trailing case above — confirm that is intended.
throttle_subscribe(ThrottleEdge::Leading);
std::thread::sleep(Duration::from_millis(205));
assert_eq!(
x_c.lock().unwrap().clone(),
vec![0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100]
);
}
/// Checks that a throttled observable can be forked and converted to its
/// shared form repeatedly and still be subscribed to.
#[test]
fn fork_and_shared() {
    let throttled = observable::of!(0..10)
        .throttle_time(Duration::from_nanos(1), ThrottleEdge::Leading);
    let shared = throttled.fork().to_shared().fork().to_shared();
    shared.subscribe(|_| {});
}