use crate::{
Error,
sched::{
Poll,
subscription::{RwEventFlags, Subscription},
},
};
use std::future::{self, Future};
use std::pin::{Pin, pin};
use std::task::{Context, Poll as FPoll};
struct FirstReady<'a, T>(Vec<Pin<Box<dyn Future<Output = T> + Send + 'a>>>);
impl<'a, T> FirstReady<'a, T> {
fn new() -> Self {
FirstReady(Vec::new())
}
fn push(&mut self, f: impl Future<Output = T> + Send + 'a) {
self.0.push(Box::pin(f));
}
}
impl<'a, T> Future for FirstReady<'a, T> {
type Output = T;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> FPoll<T> {
let mut result = FPoll::Pending;
for f in self.as_mut().0.iter_mut() {
match f.as_mut().poll(cx) {
FPoll::Ready(r) => match result {
FPoll::Pending => {
result = FPoll::Ready(r);
}
_ => {}
},
_ => continue,
}
}
return result;
}
}
pub async fn poll_oneoff<'a>(poll: &mut Poll<'a>) -> Result<(), Error> {
if poll.is_empty() {
return Ok(());
}
let duration = poll
.earliest_clock_deadline()
.map(|sub| sub.duration_until());
let mut futures = FirstReady::new();
for s in poll.rw_subscriptions() {
match s {
Subscription::Read(f) => {
futures.push(async move {
f.file
.readable()
.await
.map_err(|e| e.context("readable future"))?;
f.complete(
f.file
.num_ready_bytes()
.map_err(|e| e.context("read num_ready_bytes"))?,
RwEventFlags::empty(),
);
Ok::<(), Error>(())
});
}
Subscription::Write(f) => {
futures.push(async move {
f.file
.writable()
.await
.map_err(|e| e.context("writable future"))?;
f.complete(0, RwEventFlags::empty());
Ok(())
});
}
Subscription::MonotonicClock { .. } => unreachable!(),
}
}
match duration {
Some(Some(remaining)) => match tokio::time::timeout(remaining, futures).await {
Ok(r) => r?,
Err(_deadline_elapsed) => {}
},
Some(None) => {
let mut futures = pin!(futures);
future::poll_fn(|cx| match futures.as_mut().poll(cx) {
FPoll::Ready(e) => FPoll::Ready(e),
FPoll::Pending => FPoll::Ready(Ok(())),
})
.await?
}
None => futures.await?,
}
Ok(())
}