use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll, Waker};
use futures::{select, FutureExt};
use crate::state::SimulationState;
use crate::{Event, EventData, Id, TypedEvent};
/// Numeric key that can optionally accompany an awaited event (see the `event_key` fields
/// used throughout this module).
// NOTE(review): presumably used to tell apart otherwise identical awaited events — confirm.
pub type EventKey = u64;
/// Outcome of waiting for an event with a timeout, as produced by `EventFuture::with_timeout`.
pub enum AwaitResult<T: EventData> {
/// The event future completed; carries the received event.
Ok(TypedEvent<T>),
/// The timer created for the wait completed instead of the event future.
Timeout {
/// `src` of the future that was awaited.
/// NOTE(review): presumably the expected event source — confirm.
src: Option<Id>,
/// `event_key` of the future that was awaited.
event_key: Option<EventKey>,
/// The timeout value that was passed to `with_timeout`.
timeout: f64,
},
}
/// Future that resolves to a `TypedEvent<T>` once its shared state has been completed
/// through the paired `EventPromise`.
pub struct EventFuture<T: EventData> {
/// Identifier handed to `SimulationState` when creating a timer or reporting an incomplete drop.
dst: Id,
/// Optional identifier reported together with `dst`.
/// NOTE(review): presumably the expected event source — confirm.
src: Option<Id>,
/// Optional key reported together with `dst` and `src`.
event_key: Option<EventKey>,
/// State shared with the promise: completion flags, the delivered event and the stored waker.
state: Rc<RefCell<TypedEventAwaitState<T>>>,
/// Shared simulation state, used to create timers and to report incomplete drops.
sim_state: Rc<RefCell<SimulationState>>,
}
impl<T: EventData> EventFuture<T> {
    /// Assembles a future from its parts. Used by `EventPromise::contract`, which supplies
    /// the shared `state`.
    fn new(
        dst: Id,
        src: Option<Id>,
        event_key: Option<EventKey>,
        state: Rc<RefCell<TypedEventAwaitState<T>>>,
        sim_state: Rc<RefCell<SimulationState>>,
    ) -> Self {
        Self { dst, src, event_key, state, sim_state }
    }

    /// Waits for the event, racing it against a timer created with the given `timeout`.
    ///
    /// Returns `AwaitResult::Ok` with the event when the event future completes, or
    /// `AwaitResult::Timeout` (carrying this future's `src` and `event_key` plus `timeout`)
    /// when the timer completes instead.
    ///
    /// # Panics
    ///
    /// Panics if `timeout` is negative or NaN.
    pub async fn with_timeout(self, timeout: f64) -> AwaitResult<T> {
        assert!(timeout >= 0., "Timeout must be a positive value");

        // Copy out what the timeout branch reports: `self` is consumed by `fuse()` below.
        let (src, event_key) = (self.src, self.event_key);

        // The timer is registered under this future's `dst`.
        let sim_state_handle = self.sim_state.clone();
        let timer = self
            .sim_state
            .borrow_mut()
            .create_timer(self.dst, timeout, sim_state_handle);

        select! {
            event = self.fuse() => {
                AwaitResult::Ok(event)
            }
            _ = timer.fuse() => {
                AwaitResult::Timeout { src, event_key, timeout }
            }
        }
    }
}
impl<T: EventData> Future for EventFuture<T> {
    type Output = TypedEvent<T>;

    /// Returns the delivered event once the shared state is marked completed; until then,
    /// stores the current task's waker in the state and reports `Pending`.
    ///
    /// # Panics
    ///
    /// Panics if the state is completed but holds no event, which is the case when the
    /// future is polled again after it has already returned `Ready`.
    fn poll(self: Pin<&mut Self>, async_ctx: &mut Context) -> Poll<Self::Output> {
        let mut shared = self.state.borrow_mut();

        if !shared.completed {
            // Replace any previously stored waker with the one from this poll.
            shared.waker = Some(async_ctx.waker().clone());
            return Poll::Pending;
        }

        let event = shared
            .event
            .take()
            .expect("Completed EventFuture contains no event");
        Poll::Ready(event)
    }
}
impl<T: EventData> Drop for EventFuture<T> {
    /// Reports the drop to the simulation state when the future is destroyed before being
    /// completed, unless its state was already dropped manually.
    fn drop(&mut self) {
        // Read both flags under one short-lived borrow, released before `sim_state` is used.
        let finished_or_cancelled = {
            let shared = self.state.borrow();
            shared.completed || shared.manually_dropped
        };

        if !finished_or_cancelled {
            self.sim_state
                .borrow_mut()
                .on_incomplete_event_future_drop::<T>(self.dst, &self.src, self.event_key);
        }
    }
}
/// Type-erased handle to the await state of an `EventFuture`, used to complete the wait
/// or to drop its state.
#[derive(Clone)]
pub(crate) struct EventPromise {
/// The same allocation as the future's `state`, viewed through the `EventAwaitState` trait object.
state: Rc<RefCell<dyn EventAwaitState>>,
}
impl EventPromise {
    /// Creates a promise/future pair backed by one freshly allocated await state.
    ///
    /// `dst`, `src`, `event_key` and `sim_state` are forwarded unchanged to the future.
    pub fn contract<T: EventData>(
        dst: Id,
        src: Option<Id>,
        event_key: Option<EventKey>,
        sim_state: Rc<RefCell<SimulationState>>,
    ) -> (Self, EventFuture<T>) {
        let shared = Rc::new(RefCell::new(TypedEventAwaitState::<T>::default()));
        let future = EventFuture::new(dst, src, event_key, Rc::clone(&shared), sim_state);
        // Storing the typed state in the promise erases `T` behind `dyn EventAwaitState`.
        let promise = Self { state: shared };
        (promise, future)
    }

    /// Delivers `e` to the shared state, completing the wait.
    ///
    /// # Panics
    ///
    /// Panics if this promise holds the only strong reference to the state, i.e. nothing
    /// else shares it any more.
    pub fn complete(&self, e: Event) {
        if Rc::strong_count(&self.state) <= 1 {
            panic!("Trying to complete promise which state is no longer shared");
        }
        self.state.borrow_mut().complete(e);
    }

    /// Invokes the state's `drop` operation and then releases the waker it hands back.
    pub fn drop_state(&mut self) {
        // The `RefMut` temporary is released at the end of this statement, so the waker bound
        // here is destroyed only afterwards, when the function returns.
        // NOTE(review): presumably dropping the waker can run code that borrows this state
        // again — confirm before restructuring this statement.
        let _waker = self.state.borrow_mut().drop();
    }
}
/// State shared between an `EventFuture<T>` and its `EventPromise`.
struct TypedEventAwaitState<T: EventData> {
/// Set to `true` by `complete`; checked by the future's `poll` and `Drop`.
pub completed: bool,
/// Set to `true` by the trait's `drop` operation; makes the future's `Drop` skip its notification.
pub manually_dropped: bool,
/// The delivered event; taken out by the future's `poll` when it returns `Ready`.
pub event: Option<TypedEvent<T>>,
/// Waker stored by the most recent `poll` that returned `Pending`.
pub waker: Option<Waker>,
}
// Written by hand rather than derived: `#[derive(Default)]` would add a `T: Default` bound.
impl<T: EventData> Default for TypedEventAwaitState<T> {
    /// Creates the initial state: not completed, not manually dropped, with no event and
    /// no waker stored.
    fn default() -> Self {
        Self {
            event: None,
            waker: None,
            completed: false,
            manually_dropped: false,
        }
    }
}
/// Operations on an await state that do not mention the event type, which lets
/// `EventPromise` hold the state as a trait object.
trait EventAwaitState {
/// Delivers `event` to the state. The implementation in this file stores the event,
/// marks the state completed and wakes the stored waker, if any.
fn complete(&mut self, event: Event);
/// Cancels the state. The implementation in this file marks it as manually dropped,
/// clears the stored event and returns the stored waker, if any.
fn drop(&mut self) -> Option<Waker>;
}
impl<T: EventData> EventAwaitState for TypedEventAwaitState<T> {
    /// Marks the state completed, stores `e` downcast to `TypedEvent<T>` and wakes the
    /// stored waker, if there is one.
    ///
    /// # Panics
    ///
    /// Panics if the state has already been completed.
    fn complete(&mut self, e: Event) {
        assert!(!self.completed, "Trying to complete already completed state");

        // The flag is raised before the downcast, matching the order callers may observe.
        self.completed = true;
        self.event = Some(Event::downcast::<T>(e));

        let stored_waker = self.waker.take();
        if let Some(waker) = stored_waker {
            waker.wake();
        }
    }

    /// Marks the state as manually dropped, discards any stored event and hands the stored
    /// waker (if any) back to the caller instead of dropping it here.
    fn drop(&mut self) -> Option<Waker> {
        self.manually_dropped = true;
        self.event = None;
        let stored_waker = self.waker.take();
        stored_waker
    }
}