use std::cell::Cell;
use std::cmp;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::time::Instant;
use futures_lite::prelude::*;
use parking::Parker;
use crate::event_loop::Wakeup;
use crate::reactor::Reactor;
use crate::sync::ThreadSafety;
use winit::event::Event;
use winit::event_loop::{ControlFlow, EventLoop, EventLoopProxy, EventLoopWindowTarget};
/// Outcome of a single `Filter::handle_event` call.
///
/// `handle_event` returns `ReturnOrFinish<(), F::Output>`: it starts from
/// `Output(())` and switches to `FutureReturned(value)` once the driven future
/// has produced its value.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum ReturnOrFinish<O, T> {
/// The call returned normally without the future finishing; carries the
/// regular output of the call.
Output(O),
/// The driven future ran to completion; carries the value it returned.
FutureReturned(T),
}
/// Per-event-loop state used by `handle_event` to route winit events through
/// the reactor and to decide when the user future must be polled.
pub struct Filter<TS: ThreadSafety> {
/// Deadline returned by `Reactor::process_timers` when `Event::NewEvents` was
/// handled; used for `ControlFlow::WaitUntil` and as an upper bound when parking.
deadline: Option<Instant>,
/// Scratch buffer handed to `Reactor::process_timers`; its contents are
/// drained and woken by `handle_event`.
wakers: Vec<Waker>,
/// Parker used to block the thread while an event is waiting in the reactor.
parker: Parker,
/// Shared notification state; the same `Arc` is registered with the reactor
/// through `set_proxy`.
notifier: Arc<ReactorWaker>,
/// `Waker` built from `notifier`; used when polling the user future directly.
notifier_waker: Waker,
/// `Waker` built from an `EventPostWaker`: waking it notifies `notifier` and
/// unparks `parker`. Used while blocked inside `handle_event`.
parker_waker: Waker,
/// Set when the future was still notified right after returning `Pending`;
/// makes `handle_event` request `ControlFlow::Poll`.
yielding: bool,
/// Reactor handle obtained from `Reactor::<TS>::get()`.
reactor: TS::Rc<Reactor<TS>>,
}
impl<TS: ThreadSafety> Filter<TS> {
    /// Creates a filter for the given event loop.
    ///
    /// A proxy of `inner` is wrapped in a shared [`ReactorWaker`], which is registered with
    /// the reactor through `set_proxy`. Two wakers are prepared up front: one that only
    /// notifies, and one that additionally unparks this filter's [`Parker`].
    pub fn new(inner: &EventLoop<Wakeup>) -> Filter<TS> {
        let reactor = Reactor::<TS>::get();

        // `notified` starts as `true`, so the first `handle_event` call polls the future.
        let notifier = Arc::new(ReactorWaker {
            proxy: Mutex::new(inner.create_proxy()),
            notified: AtomicBool::new(true),
            awake: AtomicBool::new(false),
        });
        let notifier_waker = Waker::from(notifier.clone());
        reactor.set_proxy(notifier.clone());

        // Waking `parker_waker` notifies the reactor waker and unparks `parker`.
        let (parker, unparker) = parking::pair();
        let parker_waker = Waker::from(Arc::new(EventPostWaker {
            reactor_waker: notifier.clone(),
            unparker,
        }));

        Filter {
            deadline: None,
            wakers: vec![],
            parker,
            notifier,
            notifier_waker,
            parker_waker,
            yielding: false,
            reactor,
        }
    }

    /// Handles one winit event: posts it to the reactor and drives `future` as needed.
    ///
    /// # Parameters
    /// - `future`: the pinned user future to drive.
    /// - `event`: the winit event to post to the reactor.
    /// - `elwt`: window target passed to `Reactor::drain_loop_queue`.
    /// - `flow`: control flow to update before returning.
    ///
    /// # Returns
    /// `ReturnOrFinish::FutureReturned(value)` if `future` completed during this call,
    /// otherwise `ReturnOrFinish::Output(())`.
    ///
    /// # Control flow
    /// Before returning, `flow` is set to: exit with the reactor's code if an exit was
    /// requested; otherwise `Poll` if the future is yielding; otherwise `WaitUntil` the stored
    /// timer deadline if there is one; otherwise `Wait`.
    ///
    /// Once `future` has completed it is not polled again within this call.
    pub fn handle_event<F>(
        &mut self,
        future: Pin<&mut F>,
        event: Event<'_, Wakeup>,
        elwt: &EventLoopWindowTarget<Wakeup>,
        flow: &mut ControlFlow,
    ) -> ReturnOrFinish<(), F::Output>
    where
        F: Future,
    {
        // Wrap the user future so that it yields `()` and stores its value in `output`.
        // `finished` records completion: polling a completed `async` block panics, so every
        // later poll site must be able to check it.
        let output = Cell::new(ReturnOrFinish::Output(()));
        let finished = Cell::new(false);
        let future = {
            let output = &output;
            let finished = &finished;
            async move {
                output.set(ReturnOrFinish::FutureReturned(future.await));
                finished.set(true);
            }
        };
        futures_lite::pin!(future);

        // Track the awake/asleep state of the event loop from the bracketing events.
        let about_to_sleep = match &event {
            Event::NewEvents(_) => {
                // A new batch of events begins: reset yielding, mark the loop as awake (so
                // `ReactorWaker::notify` skips sending proxy events) and refresh the deadline.
                self.yielding = false;
                self.notifier.awake.store(true, Ordering::SeqCst);
                self.deadline = self.reactor.process_timers(&mut self.wakers);
                false
            }
            Event::RedrawEventsCleared => {
                // Mark the loop as no longer awake so that `notify` sends proxy events again.
                self.notifier.awake.store(false, Ordering::SeqCst);
                true
            }
            _ => {
                false
            }
        };

        // Hand the event to the reactor; the returned future is polled until it is ready.
        let notifier = self.reactor.post_event(event);
        futures_lite::pin!(notifier);

        let mut cx = Context::from_waker(&self.parker_waker);
        if notifier.as_mut().poll(&mut cx).is_pending() {
            // The posted event is not done yet: block here, driving the user future together
            // with the event notifier until either one finishes.
            self.yielding = false;
            let driver = future.as_mut().or(notifier);
            futures_lite::pin!(driver);

            self.reactor.drain_loop_queue(elwt);

            loop {
                if let Poll::Ready(()) = driver.as_mut().poll(&mut cx) {
                    break;
                }

                self.reactor.drain_loop_queue(elwt);

                // Park no longer than the earlier of the freshly computed deadline and the
                // one stored when the current batch of events began.
                let deadline = {
                    let current_deadline = self.reactor.process_timers(&mut self.wakers);
                    match (current_deadline, self.deadline) {
                        (None, None) => None,
                        (Some(x), None) | (None, Some(x)) => Some(x),
                        (Some(a), Some(b)) => Some(cmp::min(a, b)),
                    }
                };

                // Wake whatever `process_timers` collected before going to sleep.
                for waker in self.wakers.drain(..) {
                    waker.wake();
                }

                // `parker_waker` unparks this parker, so a wake-up ends the park early.
                match deadline {
                    None => self.parker.park(),
                    Some(deadline) => {
                        self.parker.park_deadline(deadline);
                    }
                }
            }
        }

        // Poll the future for as long as it keeps being notified, unless it is yielding.
        // `finished` is checked first: the future may already have completed in the driver
        // loop above (whose waker also sets `notified`) or in a previous iteration here, and
        // polling it again would panic.
        while !finished.get()
            && !self.yielding
            && self.notifier.notified.swap(false, Ordering::SeqCst)
        {
            let mut cx = Context::from_waker(&self.notifier_waker);
            if future.as_mut().poll(&mut cx).is_pending() {
                // Notified again while returning `Pending`: treat this as a request to yield
                // and let the event loop run before polling again.
                if self.notifier.notified.load(Ordering::SeqCst) {
                    self.yielding = true;
                }
                self.reactor.drain_loop_queue(elwt);
            }
        }

        // Last chance before the loop goes to sleep: flush queued requests and pending wakers.
        if about_to_sleep {
            self.reactor.drain_loop_queue(elwt);
            for waker in self.wakers.drain(..) {
                waker.wake();
            }
        }

        // Tell winit how to schedule the next iteration.
        if let Some(code) = self.reactor.exit_requested() {
            flow.set_exit_with_code(code);
        } else if self.yielding {
            flow.set_poll();
        } else if let Some(deadline) = self.deadline {
            flow.set_wait_until(deadline);
        } else {
            flow.set_wait();
        }

        // Take the result out, leaving the placeholder behind.
        output.replace(ReturnOrFinish::Output(()))
    }
}
/// Shared notification state between the filter and whoever wants to wake it.
pub(crate) struct ReactorWaker {
/// Event loop proxy used by `notify` to send a `Wakeup` user event.
proxy: Mutex<EventLoopProxy<Wakeup>>,
/// Set by `notify`; swapped back to `false` by `Filter::handle_event` each
/// time it decides to poll the future.
notified: AtomicBool,
/// Set to `true` when `Filter::handle_event` sees `Event::NewEvents` and to
/// `false` on `Event::RedrawEventsCleared`; while `true`, `notify` does not
/// send a proxy event.
awake: AtomicBool,
}
impl ReactorWaker {
    /// Records a notification and, if necessary, wakes the event loop.
    ///
    /// The `notified` flag is always set. A `Wakeup` event is sent through the proxy only
    /// when no notification was already pending and the loop is not marked awake. A failed
    /// send is ignored.
    pub(crate) fn notify(&self) {
        let was_pending = self.notified.swap(true, Ordering::SeqCst);

        // `awake` is only consulted when this call is the one that raised the flag.
        if !was_pending && !self.awake.load(Ordering::SeqCst) {
            // The lock guard is a temporary and is released at the end of this statement.
            let _ = self
                .proxy
                .lock()
                .unwrap()
                .send_event(Wakeup { _private: () });
        }
    }
}
impl Wake for ReactorWaker {
fn wake(self: Arc<Self>) {
self.notify()
}
fn wake_by_ref(self: &Arc<Self>) {
self.notify()
}
}
/// Waker state used while `Filter::handle_event` is blocked on a posted event:
/// waking it both notifies the reactor waker and unparks the waiting thread.
struct EventPostWaker {
/// Shared notification state that is notified on every wake.
reactor_waker: Arc<ReactorWaker>,
/// Unparker paired with the `Parker` stored in `Filter`.
unparker: parking::Unparker,
}
impl Wake for EventPostWaker {
fn wake(self: Arc<Self>) {
self.reactor_waker.notify();
self.unparker.unpark();
}
fn wake_by_ref(self: &Arc<Self>) {
self.reactor_waker.notify();
self.unparker.unpark();
}
}