use crate::{lio, lio::Lio, registration::Registration, typed_op::TypedOp};
use std::{
future::Future,
pin::Pin,
sync::mpsc as std_mpsc,
task::{Context, Poll},
time::Duration,
};
/// An I/O operation that has been described but not yet scheduled.
///
/// An `Io` pairs the operation value with the `Lio` instance it will be
/// scheduled on. Nothing is scheduled until the value is consumed by
/// `wait`, `send`, `send_with`, `when_done`, or by polling the future
/// produced by `into_future`.
#[must_use = "Io doesn't schedule any operation on itself."]
pub struct Io<T>
where
T: Send,
{
/// The operation to schedule.
op: T,
/// Where the `Lio` instance comes from when the operation is consumed.
handle: LioHandle,
}
impl<T> Io<T>
where
    T: TypedOp,
{
    /// Schedules the operation and blocks the calling thread until its result
    /// arrives.
    ///
    /// Equivalent to `self.send().recv()`.
    ///
    /// # Panics
    ///
    /// Panics under the same conditions as [`Io::when_done`] and
    /// [`Receiver::recv`].
    #[inline]
    #[deprecated(
        since = "0.4.0",
        note = "wait() requires external event loop management. Use .send() with manual lio.run() calls or .when_done() instead."
    )]
    pub fn wait(self) -> T::Result
    where
        T::Result: Send,
    {
        let receiver = self.send();
        receiver.recv()
    }

    /// Schedules the operation and returns a [`Receiver`] on which the result
    /// will be delivered.
    ///
    /// A fresh `std::sync::mpsc` channel is created for this single operation.
    ///
    /// # Panics
    ///
    /// Panics under the same conditions as [`Io::when_done`].
    #[inline]
    pub fn send(self) -> Receiver<T::Result>
    where
        T::Result: Send,
    {
        let (tx, rx) = std_mpsc::channel();
        self.send_with(tx);
        Receiver { recv: Some(rx) }
    }

    /// Schedules the operation and forwards its result into `sender`.
    ///
    /// # Panics
    ///
    /// Panics under the same conditions as [`Io::when_done`].
    #[inline]
    pub fn send_with(self, sender: std_mpsc::Sender<T::Result>)
    where
        T::Result: Send,
    {
        // A failed send only means the receiving side is gone; the result is
        // deliberately discarded in that case.
        let forward = move |result: T::Result| {
            let _ = sender.send(result);
        };
        self.when_done(forward);
    }

    /// Schedules the operation and registers `f` to be called with its result.
    ///
    /// # Panics
    ///
    /// Panics if no `Lio` instance is available (none installed globally and
    /// none attached with `with_lio`), or if scheduling the operation fails.
    pub fn when_done<F>(self, f: F)
    where
        F: FnOnce(T::Result) + Send + 'static,
    {
        let (lio, typed) = self.into_lio();

        // The operation is boxed before `into_op` is called and the box is
        // handed to the registration afterwards.
        // NOTE(review): presumably so the op keeps a stable address while it
        // is in flight - confirm against `TypedOp::into_op`.
        let mut heap_op = Box::new(typed);
        let raw_op = heap_op.into_op();
        let registration = Registration::new_callback_boxed::<T, F>(f, heap_op);

        lio.schedule(raw_op, registration)
            .expect("lio error: lio should handle this");
    }
}
/// Selects which `Lio` instance an `Io` is scheduled on.
enum LioHandle {
/// Look up the globally installed instance (via `lio::get_global`) at the
/// moment the operation is consumed.
GloballyInstalled,
/// Use this explicitly supplied instance.
Custom(Lio),
}
/// Receiving end for the result of a single scheduled operation.
///
/// Wraps a `std::sync::mpsc::Receiver`. The inner receiver is consumed as soon
/// as a value has been handed out, so at most one value is ever returned;
/// every receive attempt after that panics with a "lio consumer error".
pub struct Receiver<T> {
    /// `Some` while no value has been returned yet, `None` afterwards.
    recv: Option<std_mpsc::Receiver<T>>,
}

impl<T> Receiver<T> {
    /// Blocks until the result is available and returns it.
    ///
    /// # Panics
    ///
    /// * If the sending side was dropped without sending a value.
    /// * If a value was already returned by an earlier [`Receiver::try_recv`]
    ///   or [`Receiver::recv_timeout`] call on this receiver.
    pub fn recv(mut self) -> T {
        match self.get_inner() {
            Some(receiver) => receiver
                .recv()
                .expect("internal lio error: Sender dropped without sending"),
            // Reachable: `try_recv`/`recv_timeout` consume the inner receiver
            // once they yield a value, so this is caller misuse rather than an
            // impossible state.
            None => panic!(
                "lio consumer error: Tried running Receiver::recv after a value was already returned."
            ),
        }
    }

    /// Waits up to `duration` for the result.
    ///
    /// Returns `Some(value)` if it arrived in time and `None` on timeout; after
    /// a timeout the receiver can be used again.
    ///
    /// # Panics
    ///
    /// * If the sending side was dropped without sending a value.
    /// * If a value was already returned by an earlier call.
    pub fn recv_timeout(&mut self, duration: Duration) -> Option<T> {
        match self.get_inner() {
            Some(receiver) => match receiver.recv_timeout(duration) {
                Ok(value) => Some(value),
                Err(std_mpsc::RecvTimeoutError::Timeout) => {
                    // Nothing yet: put the channel back for the next attempt.
                    self.set_inner(receiver);
                    None
                }
                // Reported the same way `try_recv` reports it.
                Err(std_mpsc::RecvTimeoutError::Disconnected) => panic!(
                    "internal lio error: sender didn't send before getting dropped."
                ),
            },
            None => panic!(
                "lio consumer error: Tried running BlockingReceiver::recv_timeout after first one returned value."
            ),
        }
    }

    /// Returns the result if it is already available, without blocking.
    ///
    /// Returns `None` while the result is pending; the receiver can then be
    /// used again.
    ///
    /// # Panics
    ///
    /// * If the sending side was dropped without sending a value.
    /// * If a value was already returned by an earlier call.
    pub fn try_recv(&mut self) -> Option<T> {
        match self.get_inner() {
            Some(receiver) => match receiver.try_recv() {
                Ok(value) => Some(value),
                Err(std_mpsc::TryRecvError::Empty) => {
                    // Nothing yet: put the channel back for the next attempt.
                    self.set_inner(receiver);
                    None
                }
                Err(std_mpsc::TryRecvError::Disconnected) => panic!(
                    "internal lio error: sender didn't send before getting dropped."
                ),
            },
            None => panic!(
                "lio consumer error: Tried running BlockingReceiver::try_recv after first one returned value."
            ),
        }
    }
}

impl<T> Receiver<T> {
    /// Takes the inner channel out, leaving `None` behind.
    fn get_inner(&mut self) -> Option<std_mpsc::Receiver<T>> {
        self.recv.take()
    }

    /// Puts the inner channel back after an attempt that produced no value.
    ///
    /// Panics if a channel is already stored, since that would mean it was
    /// never taken out.
    fn set_inner(&mut self, value: std_mpsc::Receiver<T>) {
        if self.recv.replace(value).is_some() {
            panic!("internal lio error");
        };
    }
}
impl<T> Io<T>
where
    T: Send,
{
    /// Wraps `op` in an `Io` that will use the globally installed `Lio`.
    pub fn from_op(op: T) -> Self {
        let handle = LioHandle::GloballyInstalled;
        Self { op, handle }
    }

    /// Returns the same operation bound to a clone of `lio` instead of the
    /// global instance.
    pub fn with_lio(self, lio: &Lio) -> Self {
        let handle = LioHandle::Custom(lio.clone());
        Io { op: self.op, handle }
    }

    /// Splits the value into the `Lio` to schedule on and the operation.
    ///
    /// # Panics
    ///
    /// Panics if no custom instance was attached and `lio::get_global()`
    /// yields none.
    fn into_lio(self) -> (Lio, T) {
        let Self { op, handle } = self;
        let instance = match handle {
            LioHandle::Custom(custom) => custom,
            LioHandle::GloballyInstalled => lio::get_global().expect(
                "No Lio instance available. Either call install_global(lio) or use .with_lio(&lio) before consuming the operation.",
            ),
        };
        (instance, op)
    }
}
/// Lets an `Io` be awaited directly.
impl<T> IntoFuture for Io<T>
where
    T: TypedOp + Unpin + 'static,
{
    type Output = T::Result;
    type IntoFuture = IoFuture<T>;

    /// Resolves the `Lio` instance and builds a future in its initial
    /// `Pending` state. Scheduling happens on the future's first poll, not
    /// here.
    ///
    /// # Panics
    ///
    /// Panics if no `Lio` instance is available.
    fn into_future(self) -> Self::IntoFuture {
        let (lio, typed_op) = self.into_lio();
        let state = IoFutureState::Pending(typed_op);
        IoFuture { state, lio }
    }
}
/// Future produced by `Io::into_future`.
///
/// The operation is scheduled on the first poll; later polls check whether it
/// has completed.
///
/// NOTE(review): no `Drop` impl is visible in this file - confirm what happens
/// to an in-flight operation if the future is dropped before completion.
pub struct IoFuture<T> {
/// Current position in the pending -> in-flight -> done lifecycle.
state: IoFutureState<T>,
/// The instance the operation is scheduled on and queried for completion.
lio: Lio,
}
/// Lifecycle states of an `IoFuture`.
enum IoFutureState<T> {
/// Not scheduled yet; still owns the operation by value.
Pending(T),
/// Scheduled: `id` is the value returned by `Lio::schedule`, `op` is the
/// boxed operation kept until its result is extracted.
Inflight { id: u64, op: Box<T> },
/// The result has been returned. Also the placeholder left in the state
/// slot while `poll` is working on the previous state.
Done,
}
impl<T> Future for IoFuture<T>
where
T: TypedOp + Unpin,
{
type Output = T::Result;
fn poll(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Self::Output> {
let this = &mut *self;
match std::mem::replace(&mut this.state, IoFutureState::Done) {
IoFutureState::Pending(typed) => {
let mut boxed = Box::new(typed);
let op = boxed.into_op();
let id = this
.lio
.schedule(op, Registration::new_waker(cx.waker().clone()))
.expect("lio error: failed to schedule operation");
this.state = IoFutureState::Inflight { id, op: boxed };
Poll::Pending
}
IoFutureState::Inflight { id, op } => match this.lio.check_done(id) {
Ok(result) => Poll::Ready(op.extract_result(result)),
Err(crate::lio::Error::EntryNotCompleted) => {
this.lio.set_waker(id, cx.waker().clone());
this.state = IoFutureState::Inflight { id, op };
Poll::Pending
}
Err(crate::lio::Error::EntryNotFound) => {
panic!("lio bookkeeping bug: operation entry not found");
}
},
IoFutureState::Done => {
panic!("IoFuture polled after completion");
}
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::api;
    use std::sync::{Mutex, MutexGuard, PoisonError};
    use std::task::{RawWaker, RawWakerVTable, Waker};

    /// Serializes the tests that install or uninstall the global `Lio`.
    ///
    /// The test harness runs tests on several threads by default, so without
    /// this lock those tests could interleave their install/uninstall calls.
    static GLOBAL_LIO_LOCK: Mutex<()> = Mutex::new(());

    /// Acquires [`GLOBAL_LIO_LOCK`], ignoring poisoning.
    ///
    /// Some of the guarded tests panic on purpose (`#[should_panic]`), which
    /// poisons the mutex; the `()` payload carries no state, so the poison is
    /// safe to ignore.
    fn lock_global_lio() -> MutexGuard<'static, ()> {
        GLOBAL_LIO_LOCK
            .lock()
            .unwrap_or_else(PoisonError::into_inner)
    }

    #[test]
    fn test_blocking_receiver_is_send() {
        fn assert_send<T: Send>() {}
        assert_send::<Receiver<()>>();
    }

    /// Builds a waker whose clone/wake/drop operations all do nothing.
    fn noop_waker() -> Waker {
        const VTABLE: RawWakerVTable = RawWakerVTable::new(
            |p| RawWaker::new(p, &VTABLE),
            |_| {},
            |_| {},
            |_| {},
        );
        // SAFETY: none of the vtable functions dereference the data pointer
        // or touch any state, so the null pointer is never read and every
        // `RawWaker` contract requirement is trivially upheld.
        unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
    }

    /// Gives `lio` a chance to complete submitted operations: one `try_run`
    /// call followed by one `run_timeout` call of 10 ms.
    fn run_until_done(lio: &Lio) {
        use std::time::Duration;
        lio.try_run().unwrap();
        lio.run_timeout(Duration::from_millis(10)).unwrap();
    }

    #[test]
    fn test_io_future_completes() {
        let lio = Lio::new(64).unwrap();
        let io = api::nop().with_lio(&lio);
        let mut future = io.into_future();
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);

        let poll1 = Pin::new(&mut future).poll(&mut cx);
        assert!(poll1.is_pending());

        run_until_done(&lio);

        let poll2 = Pin::new(&mut future).poll(&mut cx);
        assert!(poll2.is_ready());
    }

    #[test]
    #[should_panic(expected = "IoFuture polled after completion")]
    fn test_io_future_panics_when_polled_after_completion() {
        let lio = Lio::new(64).unwrap();
        let io = api::nop().with_lio(&lio);
        let mut future = io.into_future();
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);

        let _ = Pin::new(&mut future).poll(&mut cx);
        run_until_done(&lio);

        let poll = Pin::new(&mut future).poll(&mut cx);
        assert!(poll.is_ready());

        // Polling once more after `Ready` must panic.
        let _ = Pin::new(&mut future).poll(&mut cx);
    }

    #[test]
    fn test_io_future_state_transitions() {
        let lio = Lio::new(64).unwrap();
        let io = api::nop().with_lio(&lio);
        let mut future = io.into_future();
        assert!(matches!(future.state, IoFutureState::Pending(_)));

        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);

        let _ = Pin::new(&mut future).poll(&mut cx);
        assert!(matches!(future.state, IoFutureState::Inflight { .. }));

        run_until_done(&lio);

        let _ = Pin::new(&mut future).poll(&mut cx);
        assert!(matches!(future.state, IoFutureState::Done));
    }

    #[test]
    fn test_io_future_multiple_pending_polls() {
        let lio = Lio::new(64).unwrap();
        let io = api::nop().with_lio(&lio);
        let mut future = io.into_future();
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);

        let poll1 = Pin::new(&mut future).poll(&mut cx);
        assert!(poll1.is_pending());

        // Polling again before the event loop ran must stay pending.
        let poll2 = Pin::new(&mut future).poll(&mut cx);
        assert!(poll2.is_pending());
        assert!(matches!(future.state, IoFutureState::Inflight { .. }));

        run_until_done(&lio);

        let poll3 = Pin::new(&mut future).poll(&mut cx);
        assert!(poll3.is_ready());
    }

    #[test]
    fn test_multiple_futures_can_coexist() {
        let lio = Lio::new(64).unwrap();
        let mut fut1 = api::nop().with_lio(&lio).into_future();
        let mut fut2 = api::nop().with_lio(&lio).into_future();
        let mut fut3 = api::nop().with_lio(&lio).into_future();
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);

        let poll1 = Pin::new(&mut fut1).poll(&mut cx);
        let poll2 = Pin::new(&mut fut2).poll(&mut cx);
        let poll3 = Pin::new(&mut fut3).poll(&mut cx);
        assert!(poll1.is_pending());
        assert!(poll2.is_pending());
        assert!(poll3.is_pending());

        run_until_done(&lio);

        let poll1 = Pin::new(&mut fut1).poll(&mut cx);
        let poll2 = Pin::new(&mut fut2).poll(&mut cx);
        let poll3 = Pin::new(&mut fut3).poll(&mut cx);
        assert!(poll1.is_ready());
        assert!(poll2.is_ready());
        assert!(poll3.is_ready());
    }

    #[test]
    fn test_lio_is_clone() {
        let lio1 = Lio::new(64).unwrap();
        let lio2 = lio1.clone();
        let _fut1 = api::nop().with_lio(&lio1).into_future();
        let _fut2 = api::nop().with_lio(&lio2).into_future();
        run_until_done(&lio1);
    }

    #[test]
    fn test_global_lio_install_and_use() {
        let _guard = lock_global_lio();
        let _ = lio::uninstall_global();

        let lio = Lio::new(64).unwrap();
        let lio_for_run = lio.clone();
        lio::install_global(lio);

        let mut receiver = api::nop().send();
        run_until_done(&lio_for_run);

        let result = receiver.try_recv();
        assert!(result.is_some());
        assert!(result.unwrap().is_ok());

        let _ = lio::uninstall_global();
    }

    #[test]
    fn test_global_lio_uninstall() {
        let _guard = lock_global_lio();
        let _ = lio::uninstall_global();

        let lio = Lio::new(64).unwrap();
        lio::install_global(lio);

        let uninstalled = lio::uninstall_global();
        assert!(uninstalled.is_some());

        let uninstalled2 = lio::uninstall_global();
        assert!(uninstalled2.is_none());
    }

    #[test]
    #[should_panic(expected = "Global Lio already installed")]
    fn test_global_lio_double_install_panics() {
        let _guard = lock_global_lio();
        let _ = lio::uninstall_global();

        let lio1 = Lio::new(64).unwrap();
        let lio2 = Lio::new(64).unwrap();
        lio::install_global(lio1);
        lio::install_global(lio2);
    }

    #[test]
    #[should_panic(expected = "No Lio instance available")]
    fn test_no_global_panics() {
        let _guard = lock_global_lio();
        let _ = lio::uninstall_global();
        api::nop().when_done(|_| {});
    }
}