use crate::{
channels::local_channel::{self, LocalReceiver, LocalSender},
controllers::ControllerStatus,
enclose,
task,
Latency,
Shares,
SharesManager,
TaskQueueHandle,
};
use futures_lite::StreamExt;
use log::{trace, warn};
use std::{
cell::{Cell, RefCell},
collections::VecDeque,
fmt,
future::Future,
io,
pin::Pin,
rc::Rc,
time::{Duration, Instant},
};
pub trait DeadlineSource {
type Output;
fn expected_duration(&self) -> Duration;
fn action(self: Rc<Self>) -> Pin<Box<dyn Future<Output = Self::Output> + 'static>>;
fn total_units(&self) -> u64;
fn processed_units(&self) -> u64;
}
impl<T> fmt::Debug for dyn DeadlineSource<Output = T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"DeadlineSource processed {} out of {}",
self.processed_units(),
self.total_units()
)
}
}
#[derive(Debug)]
pub struct PriorityBump<T> {
queue: Rc<InnerQueue<T>>,
}
impl<T> PriorityBump<T> {
fn new(queue: Rc<InnerQueue<T>>) -> PriorityBump<T> {
queue.min_shares.set(250);
PriorityBump { queue }
}
}
impl<T> Drop for PriorityBump<T> {
fn drop(&mut self) {
self.queue.min_shares.set(1);
}
}
type QueueItem<T> = Rc<dyn DeadlineSource<Output = T>>;
#[derive(Debug)]
struct InnerQueue<T> {
queue: RefCell<VecDeque<(Instant, QueueItem<T>)>>,
last_admitted: Cell<Instant>,
_last_adjusted: Cell<Instant>,
last_shares: Cell<usize>,
accumulated_error: Cell<f64>,
adjustment_period: Duration,
last_error: Cell<f64>,
min_shares: Cell<usize>,
state: Cell<ControllerStatus>,
}
impl<T> SharesManager for InnerQueue<T> {
fn shares(&self) -> usize {
if let ControllerStatus::Disabled(shares) = self.state.get() {
return shares;
}
let queue = self.queue.borrow();
let mut expected = 0.0;
let mut processed = 0.0;
let now = Instant::now();
for (exp, source) in queue.iter() {
let remaining_time = exp.saturating_duration_since(now);
trace!(
"Remaining time for this source: {:#?}, total_units {}",
remaining_time,
source.total_units()
);
let time_fraction =
1.0 - (remaining_time.as_secs_f64() / source.expected_duration().as_secs_f64());
if remaining_time.as_nanos() == 0 && now.saturating_duration_since(*exp).as_secs() > 5 {
self.last_shares.set(1000);
return 1000;
}
expected += source.total_units() as f64 * time_fraction;
processed += source.processed_units() as f64;
}
if expected < 0.01 {
return self.last_shares.get();
}
let error = 1.0 - processed / expected;
let accumulated_error = self.accumulated_error.get();
let acc = accumulated_error + error;
self.accumulated_error.set(acc);
let delta_error = error - self.last_error.get();
self.last_error.set(error);
let kp = 850.0;
let ki = kp / 6.0;
let dshares = ki * error + kp * delta_error;
let mut shares = (dshares + self.last_shares.get() as f64) as isize;
shares = std::cmp::min(shares, 1000);
shares = std::cmp::max(shares, self.min_shares.get() as isize);
let shares = shares as usize;
trace!(
"processed: {}. expected: {} error: {}, delta_error {} , kp term {}, ki term {}, \
shares: {}",
processed,
expected,
error,
delta_error,
ki * error,
kp * delta_error,
shares
);
self.last_shares.set(shares);
shares
}
fn adjustment_period(&self) -> Duration {
self.adjustment_period
}
}
impl<T> InnerQueue<T> {
fn new(adjustment_period: Duration) -> Self {
let now = Instant::now();
Self {
queue: RefCell::new(VecDeque::new()),
last_admitted: Cell::new(now),
_last_adjusted: Cell::new(now),
last_shares: Cell::new(1),
accumulated_error: Cell::new(0.0),
adjustment_period,
last_error: Cell::new(0.0),
min_shares: Cell::new(1),
state: Cell::new(ControllerStatus::Enabled),
}
}
fn admit(&self, source: Rc<dyn DeadlineSource<Output = T>>) -> io::Result<()> {
let expiration = Instant::now()
.checked_add(source.expected_duration())
.ok_or_else(|| io::Error::from(io::ErrorKind::InvalidInput))?;
expiration
.checked_duration_since(self.last_admitted.get())
.ok_or_else(|| io::Error::from(io::ErrorKind::InvalidInput))?;
self.last_admitted.set(expiration);
let mut queue = self.queue.borrow_mut();
queue.push_back((expiration, source.clone()));
Ok(())
}
}
#[derive(Debug)]
pub struct DeadlineQueue<T> {
tq: TaskQueueHandle,
sender: LocalSender<Rc<dyn DeadlineSource<Output = T>>>,
responder: LocalReceiver<T>,
_handle: task::join_handle::JoinHandle<()>,
queue: Rc<InnerQueue<T>>,
}
impl<T: 'static> DeadlineQueue<T> {
pub fn new(name: &'static str, adjustment_period: Duration) -> DeadlineQueue<T> {
let queue = Rc::new(InnerQueue::new(adjustment_period));
let lat = {
if adjustment_period < Duration::from_millis(100) {
Latency::Matters(adjustment_period)
} else {
Latency::NotImportant
}
};
let tq = crate::executor().create_task_queue(Shares::Dynamic(queue.clone()), lat, name);
let (sender, receiver): (LocalSender<QueueItem<T>>, LocalReceiver<QueueItem<T>>) =
local_channel::new_bounded(1);
let (response_sender, responder): (LocalSender<T>, LocalReceiver<T>) =
local_channel::new_bounded(1);
let queue_weak = Rc::downgrade(&queue);
let handle = crate::spawn_local_into(
enclose! { (queue_weak) async move {
let response = Rc::new(response_sender);
let mut stream = receiver.stream();
while let Some(request) = stream.next().await {
let res = request.action().await;
if let Some(queue) = queue_weak.upgrade() {
let mut queue = queue.queue.borrow_mut();
queue.pop_front().unwrap();
}
if response.send(res).await.is_err() {
warn!("receiver channel broken!");
break;
}
}
}},
tq,
)
.unwrap()
.detach();
DeadlineQueue {
tq,
sender,
responder,
_handle: handle,
queue,
}
}
pub async fn push_work(&self, source: Rc<dyn DeadlineSource<Output = T>>) -> io::Result<T> {
self.queue.admit(source.clone())?;
self.sender.send(source.clone()).await?;
self.responder.recv().await.ok_or_else(|| {
io::Error::new(io::ErrorKind::Other, "no response from response channel")
})
}
pub fn task_queue(&self) -> TaskQueueHandle {
self.tq
}
pub fn bump_priority(&self) -> PriorityBump<T> {
PriorityBump::new(self.queue.clone())
}
pub fn disable(&self, mut shares: usize) {
shares = std::cmp::min(shares, 1000);
shares = std::cmp::max(shares, 1);
self.queue.state.set(ControllerStatus::Disabled(shares));
}
pub fn enable(&self) {
self.queue.state.set(ControllerStatus::Enabled);
}
pub fn status(&self) -> ControllerStatus {
self.queue.state.get()
}
}
#[cfg(test)]
mod test {
use super::*;
use crate::{
enclose,
timer::{Timer, TimerActionRepeat},
};
struct DeadlineSourceTest {
duration: Duration,
total_units: usize,
processed_units: Cell<usize>,
drop_guarantee: Rc<Cell<bool>>,
}
impl DeadlineSourceTest {
fn new(duration: Duration, total_units: usize) -> Rc<Self> {
Rc::new(Self {
duration,
total_units,
processed_units: Cell::new(0),
drop_guarantee: Rc::new(Cell::new(false)),
})
}
async fn wait(self: Rc<Self>) -> usize {
Timer::new(self.duration).await;
0
}
}
impl Drop for DeadlineSourceTest {
fn drop(&mut self) {
self.drop_guarantee.set(true);
}
}
impl DeadlineSource for DeadlineSourceTest {
type Output = usize;
fn expected_duration(&self) -> Duration {
self.duration
}
fn action(self: Rc<Self>) -> Pin<Box<dyn Future<Output = Self::Output> + 'static>> {
Box::pin(self.wait())
}
fn total_units(&self) -> u64 {
self.total_units as _
}
fn processed_units(&self) -> u64 {
self.processed_units.get() as _
}
}
#[test]
fn deadline_queue_does_not_accept_non_monotonic_durations() {
test_executor!(async move {
let queue = Rc::new(DeadlineQueue::new("example", Duration::from_millis(1)));
let tq = crate::spawn_local(enclose! { (queue) async move {
let test = DeadlineSourceTest::new(Duration::from_secs(2_u64), 1);
let res = queue.push_work(test).await.unwrap();
assert_eq!(res, 0);
}})
.detach();
Timer::new(Duration::from_millis(1)).await;
let test = DeadlineSourceTest::new(Duration::from_secs(1_u64), 1);
match queue.push_work(test).await {
Err(x) => assert_eq!(x.kind(), io::ErrorKind::InvalidInput),
Ok(_) => panic!("should have failed"),
}
tq.await;
});
}
#[test]
fn deadline_queue_behaves_well_with_zero_total_units() {
test_executor!(async move {
let queue = Rc::new(DeadlineQueue::<usize>::new(
"example",
Duration::from_millis(1),
));
let test = DeadlineSourceTest::new(Duration::from_secs(1_u64), 0);
let res = queue.push_work(test).await.unwrap();
assert_eq!(res, 0);
});
}
#[test]
fn deadline_queue_successfully_drops_item_when_done() {
test_executor!(async move {
let queue = Rc::new(DeadlineQueue::<usize>::new(
"example",
Duration::from_millis(1),
));
let test = DeadlineSourceTest::new(Duration::from_secs(1_u64), 0);
let drop_happens = test.drop_guarantee.clone();
let res = queue.push_work(test).await.unwrap();
assert_eq!(res, 0);
assert!(drop_happens.get());
});
}
#[test]
fn deadline_queue_behaves_well_if_we_process_too_much() {
test_executor!(async move {
let queue = Rc::new(DeadlineQueue::<usize>::new(
"example",
Duration::from_millis(1),
));
let test = DeadlineSourceTest::new(Duration::from_secs(1_u64), 1000);
let tq = crate::spawn_local(enclose! { (queue, test) async move {
let res = queue.push_work(test).await.unwrap();
assert_eq!(res, 0);
}})
.detach();
Timer::new(Duration::from_millis(2)).await;
test.processed_units.set(1000 * 1000);
Timer::new(Duration::from_millis(2)).await;
assert_eq!(queue.queue.shares(), 1);
tq.await;
});
}
#[test]
fn deadline_queue_shares_increase_if_we_dont_process() {
test_executor!(async move {
let queue = Rc::new(DeadlineQueue::<usize>::new(
"example",
Duration::from_millis(1),
));
let test = DeadlineSourceTest::new(Duration::from_secs(1_u64), 1000);
let tq = crate::spawn_local(enclose! { (queue, test) async move {
let res = queue.push_work(test).await.unwrap();
assert_eq!(res, 0);
}})
.detach();
Timer::new(Duration::from_millis(900)).await;
assert!(queue.queue.shares() > 800);
tq.await;
});
}
#[test]
fn deadline_queue_shares_ok_if_we_process_smoothly() {
test_executor!(async move {
let queue = Rc::new(DeadlineQueue::<usize>::new(
"example",
Duration::from_millis(10),
));
let test = DeadlineSourceTest::new(Duration::from_secs(1_u64), 1000);
let tq = crate::spawn_local(enclose! { (queue, test) async move {
let res = queue.push_work(test).await.unwrap();
assert_eq!(res, 0);
}})
.detach();
let start = Instant::now();
let last_shares = Rc::new(Cell::new(0));
let action = TimerActionRepeat::repeat(move || {
enclose! { (queue, test, last_shares) async move {
let elapsed = start.elapsed().as_millis();
let shares = queue.queue.shares();
let old = last_shares.replace(shares) as isize;
if elapsed > 500 && elapsed < 850 {
let diff = old - shares as isize;
assert!(diff.abs() < 200, "Found diff: {}", diff);
}
if test.processed_units.replace(elapsed as usize) < 1000 {
Some(Duration::from_millis(50))
} else {
None
}
}}
});
tq.await;
action.join().await;
});
}
#[test]
fn deadline_queue_second_queued_item_increases_slope() {
test_executor!(async move {
let queue = Rc::new(DeadlineQueue::new("example", Duration::from_millis(1)));
let tq = crate::spawn_local(enclose! { (queue) async move {
let test = DeadlineSourceTest::new(Duration::from_secs(1_u64), 1);
let res = queue.push_work(test).await.unwrap();
assert_eq!(res, 0);
}})
.detach();
Timer::new(Duration::from_millis(2)).await;
let shares_first = queue.queue.shares();
let tq2 = crate::spawn_local(enclose! { (queue) async move {
let test = DeadlineSourceTest::new(Duration::from_secs(1_u64), 1000);
let res = queue.push_work(test).await.unwrap();
assert_eq!(res, 0);
}})
.detach();
Timer::new(Duration::from_millis(2)).await;
let shares_second = queue.queue.shares();
assert!(shares_second > shares_first);
tq.await;
tq2.await;
});
}
}