use super::future_hash_map::FutureHashMap;
use crate::scheduler::{self, ScheduleRequest, Scheduler};
use futures::{Future, Stream, StreamExt};
use pin_project::pin_project;
use std::{
hash::Hash,
pin::Pin,
task::{Context, Poll},
};
/// Stream adapter that takes messages from a [`Scheduler`] and runs one future per message.
///
/// A message is only taken from the scheduler while no future for an equal message is
/// still stored in `slots`, so two futures for the same message never run at once.
#[pin_project]
pub struct Runner<T, R, F, MkF> {
    /// Source of messages; structurally pinned because it is polled as a stream.
    #[pin]
    scheduler: Scheduler<T, R>,
    /// Factory called with a reference to each accepted message to build its future.
    run_msg: MkF,
    /// Futures that are currently running, keyed by the message that started them.
    slots: FutureHashMap<T, F>,
}
impl<T, R, F, MkF> Runner<T, R, F, MkF>
where
    F: Future + Unpin,
    MkF: FnMut(&T) -> F,
{
    /// Builds a runner that takes messages from `scheduler` and calls `run_msg` on each
    /// accepted message to obtain the future to run for it.
    ///
    /// The runner starts with no running futures.
    pub fn new(scheduler: Scheduler<T, R>, run_msg: MkF) -> Self {
        // Nothing is running yet, so begin with an empty slot map.
        let slots = FutureHashMap::default();
        Self {
            scheduler,
            run_msg,
            slots,
        }
    }
}
impl<T, R, F, MkF> Stream for Runner<T, R, F, MkF>
where
    T: Eq + Hash + Clone + Unpin,
    R: Stream<Item = ScheduleRequest<T>>,
    F: Future + Unpin,
    MkF: FnMut(&T) -> F,
{
    type Item = scheduler::Result<F::Output>;

    /// Yields the output of the next finished future, or a scheduler error.
    ///
    /// Each call first polls the running futures and returns immediately if one has
    /// finished. Otherwise it takes every message the scheduler can hand out right now
    /// (skipping messages that already have a running future) and starts a future for each.
    ///
    /// The stream ends only once the scheduler has ended *and* no future is running,
    /// including futures that were started during this very call.
    ///
    /// # Panics
    ///
    /// Panics if a message is accepted while a future for an equal message is still
    /// stored; `hold_unless` is expected to prevent this.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();
        let slots = this.slots;
        let scheduler = &mut this.scheduler;
        // Give running futures a chance to finish first; a finished one is yielded right away.
        let mut has_active_slots = match slots.poll_next_unpin(cx) {
            Poll::Ready(Some(result)) => return Poll::Ready(Some(Ok(result))),
            Poll::Ready(None) => false,
            Poll::Pending => true,
        };
        loop {
            // Only take messages that are not already being processed; the rest stay held
            // by the scheduler until their slot is free again.
            let next_msg_poll = scheduler
                .as_mut()
                .hold_unless(|msg| !slots.contains_key(msg))
                .poll_next_unpin(cx);
            match next_msg_poll {
                Poll::Ready(Some(Ok(msg))) => {
                    let msg_fut = (this.run_msg)(&msg);
                    assert!(
                        slots.insert(msg, msg_fut).is_none(),
                        "Runner tried to replace a running future.. please report this as a kube-rs bug!"
                    );
                    // A future is now running, so the stream must not end in this call even
                    // if the scheduler reports completion on the next loop iteration.
                    has_active_slots = true;
                    // Slots are only polled at the top of `poll_next`, so the new future has
                    // not registered a waker yet; request an immediate re-poll.
                    cx.waker().wake_by_ref();
                }
                Poll::Ready(Some(Err(err))) => break Poll::Ready(Some(Err(err))),
                Poll::Ready(None) => {
                    break if has_active_slots {
                        // No more messages will arrive, but some futures have not finished yet.
                        Poll::Pending
                    } else {
                        Poll::Ready(None)
                    };
                }
                Poll::Pending => break Poll::Pending,
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::Runner;
    use crate::scheduler::{scheduler, ScheduleRequest};
    use futures::{
        channel::{mpsc, oneshot},
        future, poll, SinkExt, TryStreamExt,
    };
    use std::{cell::RefCell, time::Duration};
    use tokio::{
        runtime::Handle,
        task::yield_now,
        time::{pause, sleep, timeout, Instant},
    };

    /// Two requests for the same message must run one after the other, never concurrently.
    #[tokio::test]
    async fn runner_should_never_run_two_instances_at_once() {
        pause();
        // Each run holds a `borrow_mut` guard for its whole duration, so a second
        // concurrent run would panic on the double borrow.
        let exclusive = RefCell::new(());
        let mut started = 0;
        let (mut request_tx, request_rx) = mpsc::unbounded();
        let mut drained = Box::pin(
            Runner::new(scheduler(request_rx), |_| {
                started += 1;
                let guard = exclusive.borrow_mut();
                Box::pin(async move {
                    sleep(Duration::from_secs(1)).await;
                    drop(guard);
                })
            })
            .try_for_each(|_| async { Ok(()) }),
        );

        let first = ScheduleRequest {
            message: (),
            run_at: Instant::now(),
        };
        request_tx.send(first).await.unwrap();
        // Polling once starts the first run, which is still sleeping afterwards.
        assert!(poll!(drained.as_mut()).is_pending());

        // Ask for the same message again while the first run is still in flight.
        let second = ScheduleRequest {
            message: (),
            run_at: Instant::now(),
        };
        request_tx.send(second).await.unwrap();

        // Close the request channel after a delay so the runner can drain and finish.
        let close_later = async {
            sleep(Duration::from_secs(5)).await;
            drop(request_tx);
        };
        let ((), outcome) = future::join(close_later, drained).await;
        outcome.unwrap();
        // Both requests were executed.
        assert_eq!(started, 2);
    }

    /// A runner that is already waiting must be woken when a new message is scheduled.
    #[tokio::test(flavor = "current_thread")]
    async fn runner_should_wake_when_scheduling_messages() {
        let (mut request_tx, request_rx) = mpsc::unbounded();
        let (outcome_tx, outcome_rx) = oneshot::channel();
        let mut runner = Runner::new(scheduler(request_rx), |msg: &u8| future::ready(*msg));
        Handle::current().spawn(async move {
            let first_output = runner.try_next().await.unwrap();
            outcome_tx.send(first_output).unwrap()
        });
        // Let the spawned task poll the runner before anything has been scheduled.
        yield_now().await;

        let request = ScheduleRequest {
            message: 8,
            run_at: Instant::now(),
        };
        request_tx.send(request).await.unwrap();

        // The result must arrive within the timeout.
        let delivered = timeout(Duration::from_secs(1), outcome_rx)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(delivered, Some(8));
    }
}