use crate::thread_guard::ThreadGuard;
use crate::translate::*;
use futures_core::future::Future;
use futures_core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError};
use std::mem;
use std::pin::{self, Pin};
use std::ptr;
use crate::MainContext;
use crate::MainLoop;
use crate::Priority;
use crate::Source;
use crate::SourceId;
enum FutureWrapper {
Send(FutureObj<'static, ()>),
NonSend(ThreadGuard<LocalFutureObj<'static, ()>>),
}
impl Future for FutureWrapper {
type Output = ();
fn poll(self: pin::Pin<&mut Self>, ctx: &mut Context) -> Poll<()> {
match self.get_mut() {
FutureWrapper::Send(fut) => Pin::new(fut).poll(ctx),
FutureWrapper::NonSend(fut) => Pin::new(fut.get_mut()).poll(ctx),
}
}
}
#[repr(C)]
struct TaskSource {
source: ffi::GSource,
future: FutureWrapper,
waker: Waker,
}
#[repr(C)]
struct WakerSource {
source: ffi::GSource,
}
impl TaskSource {
unsafe extern "C" fn dispatch(
source: *mut ffi::GSource,
callback: ffi::GSourceFunc,
_user_data: ffi::gpointer,
) -> ffi::gboolean {
let source = &mut *(source as *mut TaskSource);
assert!(callback.is_none());
if let Poll::Ready(()) = source.poll() {
ffi::G_SOURCE_REMOVE
} else {
ffi::G_SOURCE_CONTINUE
}
}
unsafe extern "C" fn finalize(source: *mut ffi::GSource) {
let source = source as *mut TaskSource;
match (*source).future {
FutureWrapper::Send(_) => {
ptr::drop_in_place(&mut (*source).future);
}
FutureWrapper::NonSend(ref mut future) if future.is_owner() => {
ptr::drop_in_place(&mut (*source).future);
}
FutureWrapper::NonSend(ref mut future) => {
let context =
ffi::g_source_get_context(source as *mut TaskSource as *mut ffi::GSource);
if !context.is_null() {
let future = ptr::read(future);
let context = MainContext::from_glib_none(context);
context.invoke(move || {
drop(future);
});
} else {
ptr::drop_in_place(&mut (*source).future);
}
}
}
ptr::drop_in_place(&mut (*source).waker);
}
}
impl WakerSource {
unsafe fn clone_raw(waker: *const ()) -> RawWaker {
static VTABLE: RawWakerVTable = RawWakerVTable::new(
WakerSource::clone_raw,
WakerSource::wake_raw,
WakerSource::wake_by_ref_raw,
WakerSource::drop_raw,
);
let waker = waker as *const ffi::GSource;
ffi::g_source_ref(mut_override(waker));
RawWaker::new(waker as *const (), &VTABLE)
}
unsafe fn wake_raw(waker: *const ()) {
Self::wake_by_ref_raw(waker);
Self::drop_raw(waker);
}
unsafe fn wake_by_ref_raw(waker: *const ()) {
let waker = waker as *const ffi::GSource;
ffi::g_source_set_ready_time(mut_override(waker), 0);
}
unsafe fn drop_raw(waker: *const ()) {
let waker = waker as *const ffi::GSource;
ffi::g_source_unref(mut_override(waker));
}
unsafe extern "C" fn dispatch(
source: *mut ffi::GSource,
_callback: ffi::GSourceFunc,
_user_data: ffi::gpointer,
) -> ffi::gboolean {
ffi::g_source_set_ready_time(mut_override(source), -1);
ffi::G_SOURCE_CONTINUE
}
}
unsafe impl Send for TaskSource {}
unsafe impl Sync for TaskSource {}
unsafe impl Send for WakerSource {}
unsafe impl Sync for WakerSource {}
impl TaskSource {
#[allow(clippy::new_ret_no_self)]
fn new(priority: Priority, future: FutureWrapper) -> Source {
unsafe {
static TASK_SOURCE_FUNCS: ffi::GSourceFuncs = ffi::GSourceFuncs {
check: None,
prepare: None,
dispatch: Some(TaskSource::dispatch),
finalize: Some(TaskSource::finalize),
closure_callback: None,
closure_marshal: None,
};
static WAKER_SOURCE_FUNCS: ffi::GSourceFuncs = ffi::GSourceFuncs {
check: None,
prepare: None,
dispatch: Some(WakerSource::dispatch),
finalize: None,
closure_callback: None,
closure_marshal: None,
};
let source = ffi::g_source_new(
mut_override(&TASK_SOURCE_FUNCS),
mem::size_of::<TaskSource>() as u32,
);
let waker_source = ffi::g_source_new(
mut_override(&WAKER_SOURCE_FUNCS),
mem::size_of::<WakerSource>() as u32,
);
ffi::g_source_set_priority(source, priority.into_glib());
ffi::g_source_add_child_source(source, waker_source);
{
let source = &mut *(source as *mut TaskSource);
ptr::write(&mut source.future, future);
let waker = Waker::from_raw(WakerSource::clone_raw(waker_source as *const ()));
ptr::write(&mut source.waker, waker);
}
ffi::g_source_set_ready_time(waker_source, 0);
ffi::g_source_unref(waker_source);
from_glib_full(source)
}
}
fn poll(&mut self) -> Poll<()> {
let source = &self.source as *const _;
let executor: Borrowed<MainContext> =
unsafe { from_glib_borrow(ffi::g_source_get_context(mut_override(source))) };
assert!(
executor.is_owner(),
"Polling futures only allowed if the thread is owning the MainContext"
);
executor
.with_thread_default(|| {
let _enter = futures_executor::enter().unwrap();
let mut context = Context::from_waker(&self.waker);
Pin::new(&mut self.future).poll(&mut context)
})
.expect("current thread is not owner of the main context")
}
}
impl MainContext {
pub fn spawn<F: Future<Output = ()> + Send + 'static>(&self, f: F) -> SourceId {
self.spawn_with_priority(crate::PRIORITY_DEFAULT, f)
}
pub fn spawn_local<F: Future<Output = ()> + 'static>(&self, f: F) -> SourceId {
self.spawn_local_with_priority(crate::PRIORITY_DEFAULT, f)
}
pub fn spawn_with_priority<F: Future<Output = ()> + Send + 'static>(
&self,
priority: Priority,
f: F,
) -> SourceId {
let f = FutureObj::new(Box::new(f));
let source = TaskSource::new(priority, FutureWrapper::Send(f));
source.attach(Some(self))
}
pub fn spawn_local_with_priority<F: Future<Output = ()> + 'static>(
&self,
priority: Priority,
f: F,
) -> SourceId {
let _acquire = self
.acquire()
.expect("Spawning local futures only allowed on the thread owning the MainContext");
let f = LocalFutureObj::new(Box::new(f));
let source = TaskSource::new(priority, FutureWrapper::NonSend(ThreadGuard::new(f)));
source.attach(Some(self))
}
#[allow(clippy::transmute_ptr_to_ptr)]
pub fn block_on<F: Future>(&self, f: F) -> F::Output {
let mut res = None;
let l = MainLoop::new(Some(self), false);
let l_clone = l.clone();
let f = async {
res = Some(f.await);
l_clone.quit();
};
unsafe {
let f = LocalFutureObj::new(Box::new(f));
let f: LocalFutureObj<'static, ()> = mem::transmute(f);
let source = TaskSource::new(
crate::PRIORITY_DEFAULT,
FutureWrapper::NonSend(ThreadGuard::new(f)),
);
source.attach(Some(self));
}
l.run();
res.unwrap()
}
}
impl Spawn for MainContext {
fn spawn_obj(&self, f: FutureObj<'static, ()>) -> Result<(), SpawnError> {
let source = TaskSource::new(crate::PRIORITY_DEFAULT, FutureWrapper::Send(f));
source.attach(Some(self));
Ok(())
}
}
impl LocalSpawn for MainContext {
fn spawn_local_obj(&self, f: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> {
let source = TaskSource::new(
crate::PRIORITY_DEFAULT,
FutureWrapper::NonSend(ThreadGuard::new(f)),
);
source.attach(Some(self));
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use futures_channel::oneshot;
use futures_util::future::{FutureExt, TryFutureExt};
use std::sync::mpsc;
use std::thread;
#[test]
fn test_spawn() {
let c = MainContext::new();
let l = crate::MainLoop::new(Some(&c), false);
let (sender, receiver) = mpsc::channel();
let (o_sender, o_receiver) = oneshot::channel();
let l_clone = l.clone();
c.spawn(
o_receiver
.and_then(move |()| {
sender.send(()).unwrap();
l_clone.quit();
futures_util::future::ok(())
})
.then(|res| {
assert!(res.is_ok());
futures_util::future::ready(())
}),
);
thread::spawn(move || {
l.run();
});
o_sender.send(()).unwrap();
receiver.recv().unwrap();
}
#[test]
fn test_spawn_local() {
let c = MainContext::new();
let l = crate::MainLoop::new(Some(&c), false);
c.with_thread_default(|| {
let l_clone = l.clone();
c.spawn_local(futures_util::future::lazy(move |_ctx| {
l_clone.quit();
}));
l.run();
})
.unwrap();
}
#[test]
fn test_block_on() {
let c = MainContext::new();
let mut v = None;
{
let v = &mut v;
let future = futures_util::future::lazy(|_ctx| {
*v = Some(123);
Ok::<i32, ()>(123)
});
let res = c.block_on(future);
assert_eq!(res, Ok(123));
}
assert_eq!(v, Some(123));
}
}