use super::ThrottleRate;
use futures::channel::oneshot::{Receiver, Sender};
use futures::Future;
use log::{log_enabled, trace};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
/// Cheaply cloneable handle to a pool of throttle slots.
///
/// The slot state lives behind an `Arc`, so every clone of a pool operates on
/// the same set of slots.
#[derive(Clone)]
pub struct ThrottlePool {
    /// Shared slot state and the configured rate period.
    inner: Arc<ThrottlePoolInner>,
}
impl ThrottlePool {
    /// Builds a pool with `rate.count()` slots, each guarded by its own mutex and
    /// using `rate.duration()` as the cool-down period.
    ///
    /// Every slot starts out free.
    pub fn new(rate: ThrottleRate) -> Self {
        let rate_duration = rate.duration();
        let now = Instant::now();
        // `Instant - Duration` panics when the result cannot be represented
        // (e.g. a long period subtracted from an instant close to the clock's
        // origin). A slot counts as free whenever `wait_until <= now`, so
        // falling back to `now` still leaves the slot free.
        let initially_free = now.checked_sub(rate_duration).unwrap_or(now);

        let slots = (0..rate.count())
            .map(|_| {
                Mutex::new(Slot {
                    wait_until: initially_free,
                    hold: None,
                })
            })
            .collect();

        Self {
            inner: Arc::new(ThrottlePoolInner {
                rate_duration,
                slots,
            }),
        }
    }

    /// Returns a future that completes once a slot has been acquired.
    ///
    /// The hold obtained from [`ThrottlePool::queue_with_hold`] is dropped as
    /// soon as the slot is acquired, i.e. the slot is released immediately.
    pub fn queue(&self) -> impl Future<Output = ()> {
        let acquire = self.queue_with_hold();
        async move {
            // Dropping the handle right away releases the hold.
            drop(acquire.await);
        }
    }

    /// Returns a future that resolves to a [`HoldHandle`] once a slot has been
    /// acquired.
    ///
    /// A slot can be acquired when its `wait_until` instant has passed and no
    /// hold is recorded on it. The slot stays held for as long as the sender
    /// inside the returned handle is alive.
    ///
    /// The cool-down of a released slot (`now + rate_duration`) is started when
    /// a later call observes that the sender is gone, not at the moment the
    /// handle was dropped.
    pub fn queue_with_hold(&self) -> impl Future<Output = HoldHandle> {
        let inner = self.inner.clone();
        async move {
            loop {
                let now = Instant::now();
                // Upper bound for the nap; shortened below when a slot frees up sooner.
                let mut sleep = inner.rate_duration;

                for slot in &inner.slots {
                    let mut slot = match slot.try_lock() {
                        Ok(guard) => guard,
                        // Any `try_lock` failure (contended or poisoned mutex)
                        // restarts the scan after a zero-length delay.
                        Err(_) => {
                            sleep = Duration::from_secs(0);
                            break;
                        }
                    };

                    if slot.wait_until > now {
                        // Still cooling down: wake up no later than its expiry.
                        sleep = std::cmp::min(slot.wait_until - now, sleep);
                        continue;
                    }

                    // `None`: no hold recorded. `Some(true)`: the sender was
                    // dropped, so the hold is over. `Some(false)`: still held.
                    let hold_released = slot.hold.as_mut().map(|rx| rx.try_recv().is_err());
                    match hold_released {
                        None => {
                            let (tx, rx) = futures::channel::oneshot::channel();
                            slot.hold = Some(rx);
                            return HoldHandle { tx: Some(tx) };
                        }
                        Some(true) => {
                            // Start the cool-down now and move on to the next slot.
                            slot.wait_until = now + inner.rate_duration;
                            slot.hold = None;
                        }
                        Some(false) => {
                            // NOTE(review): a held slot ends the scan, so later
                            // slots are not examined in this pass and the loop
                            // re-polls with a zero-length delay. Confirm this
                            // is intended.
                            sleep = Duration::from_secs(0);
                            break;
                        }
                    }
                }

                if log_enabled!(log::Level::Trace) {
                    trace!("Sleeping for {:?}", sleep);
                }
                delay_for(sleep).await;
            }
        }
    }
}
/// State shared by every clone of a `ThrottlePool`.
#[derive(Debug)]
struct ThrottlePoolInner {
    /// Cool-down applied to a slot once its hold is observed as released;
    /// also the upper bound for a single sleep in the acquisition loop.
    rate_duration: Duration,
    /// One independently locked entry per slot.
    slots: Vec<Mutex<Slot>>,
}
/// A single throttle slot.
#[derive(Debug)]
struct Slot {
    /// The slot may be handed out only once this instant has passed.
    wait_until: Instant,
    /// Receiving half of the channel whose sender sits in the outstanding
    /// `HoldHandle`; `None` while no hold is recorded on the slot.
    hold: Option<Receiver<()>>,
}
/// Handle for a slot acquired through `ThrottlePool::queue_with_hold`.
///
/// The pool treats the slot as held until the sender stored here is dropped.
pub struct HoldHandle {
    /// Sending half of the slot's hold channel; nothing is ever sent on it,
    /// only its drop is observed by the pool.
    tx: Option<Sender<()>>,
}
impl HoldHandle {
    /// Consumes the handle and drops its sender, ending the hold on the slot.
    pub fn release(mut self) {
        drop(self.tx.take());
    }
}
/// Returns a future that completes after `dur`, backed by `tokio::time::sleep`.
///
/// Compiled only when the `timer-tokio` feature is enabled.
#[cfg(feature = "timer-tokio")]
fn delay_for(dur: Duration) -> impl Future<Output = ()> {
tokio::time::sleep(dur)
}
/// Returns a future that completes after `dur`, backed by `futures_timer::Delay`.
///
/// Compiled only when the `timer-futures-timer` feature is enabled.
#[cfg(feature = "timer-futures-timer")]
fn delay_for(dur: Duration) -> impl Future<Output = ()> {
futures_timer::Delay::new(dur)
}