use super::scheduler::*;
use std::sync::{Arc};
use std::marker::{PhantomData};
use futures::prelude::*;
use futures::channel::oneshot;
use futures::future::{Future, BoxFuture};
use std::cell::{UnsafeCell};
use std::mem;
use std::panic::{UnwindSafe};
use std::result::{Result};
/// Handle to a heap-allocated `T` that is only touched from jobs scheduled on
/// a dedicated queue.
///
/// The value is boxed by `new` and kept as a raw pointer; the methods of this
/// type dereference that pointer exclusively inside closures that are handed
/// to the scheduler together with `queue`.
pub struct Desync<T: Send> {
    /// Job queue on which every operation on `data` is scheduled.
    queue: Arc<JobQueue>,

    /// Raw pointer to the heap allocation that stores the wrapped value.
    data: *mut T,

    /// Zero-sized marker: for variance and auto-trait purposes the struct
    /// behaves as if it contained an `UnsafeCell<T>`.
    _marker: PhantomData<UnsafeCell<T>>,
}
// SAFETY: the raw `data` pointer is what stops `Send` from being derived
// automatically. Moving the handle to another thread moves ownership of the
// boxed `T`, which is itself `Send`.
unsafe impl<T> Send for Desync<T> where T: Send {}

// SAFETY: in this file `data` is only dereferenced inside closures passed to
// the scheduler along with `queue`, never directly through `&self`.
// NOTE(review): this relies on the scheduler never running two jobs of the
// same queue concurrently -- confirm against the scheduler module.
unsafe impl<T> Sync for Desync<T> where T: Send {}

// The raw pointer and the `UnsafeCell` marker suppress the automatic
// `UnwindSafe` impl, so it is opted into explicitly when `T` is unwind safe.
impl<T> UnwindSafe for Desync<T> where T: Send + UnwindSafe {}
/// Thin wrapper around the raw data pointer so that it can be moved into
/// closures which are required to be `Send` (a bare `*mut T` is not).
///
/// NOTE(review): the closures in this file read the pointer through the `.0`
/// field. Under edition-2021 disjoint closure captures such a closure would
/// capture the bare pointer instead of this wrapper -- confirm the crate
/// edition before changing it.
struct DataRef<T: Send>(*mut T);

// SAFETY: the wrapper only carries the address of a `T: Send` value; what may
// be done with that address is decided by the code that dereferences it.
unsafe impl<T> Send for DataRef<T> where T: Send {}
// Invariants behind every `unsafe` dereference in this impl:
//   * `self.data` comes from `Box::into_raw` in `new` and is released by the
//     `Drop` impl, which does so from a job on the same queue.
//   * The pointer is only dereferenced inside closures handed to the scheduler
//     together with `self.queue`.
// NOTE(review): exclusive access to the value additionally relies on the
// scheduler running at most one job per queue at a time -- confirm against the
// scheduler module.
impl<T> Desync<T>
where
    T: 'static + Send,
{
    /// Creates a new `Desync` around `data`.
    ///
    /// A fresh queue is obtained from the scheduler's `queue()` and the value
    /// is moved to the heap; only the raw pointer to it is stored.
    pub fn new(data: T) -> Desync<T> {
        Desync {
            queue: queue(),
            data: Box::into_raw(Box::new(data)),
            _marker: PhantomData,
        }
    }

    /// Deprecated alias that forwards to [`Desync::desync`].
    #[inline]
    #[deprecated(since="0.3.0", note="please use `desync` instead")]
    pub fn r#async<TFn>(&self, job: TFn)
    where
        TFn: 'static + Send + FnOnce(&mut T) -> (),
    {
        self.desync(job)
    }

    /// Hands `job` to the scheduler's `desync` function on this object's queue.
    ///
    /// The job receives a mutable reference to the wrapped value. Nothing is
    /// returned, so no result of the job is available to the caller.
    /// NOTE(review): presumably the call returns without waiting for the job
    /// (hence the `'static` bound) -- confirm against the scheduler.
    pub fn desync<TFn>(&self, job: TFn)
    where
        TFn: 'static + Send + FnOnce(&mut T) -> (),
    {
        let target = DataRef::<T>(self.data);

        desync(&self.queue, move || {
            let ptr = target.0;
            // SAFETY: see the invariants listed at the top of this impl.
            let value = unsafe { &mut *ptr };
            job(value);
        })
    }

    /// Runs `job` through the scheduler's `sync` function on this object's
    /// queue and returns whatever the job returned.
    ///
    /// NOTE(review): the job is not required to be `'static`, which suggests
    /// the call does not return before the job has finished -- confirm.
    pub fn sync<TFn, Result>(&self, job: TFn) -> Result
    where
        TFn: Send + FnOnce(&mut T) -> Result,
        Result: Send,
    {
        let target = DataRef::<T>(self.data);

        sync(&self.queue, move || {
            let ptr = target.0;
            // SAFETY: see the invariants listed at the top of this impl.
            job(unsafe { &mut *ptr })
        })
    }

    /// Like [`Desync::sync`], but goes through the scheduler's `try_sync`.
    ///
    /// Returns the job's result wrapped in `Ok`, or the `TrySyncError`
    /// reported by the scheduler; the conditions under which an error is
    /// produced are defined by the scheduler, not here.
    pub fn try_sync<TFn, FnResult>(&self, job: TFn) -> Result<FnResult, TrySyncError>
    where
        TFn: Send + FnOnce(&mut T) -> FnResult,
        FnResult: Send,
    {
        let target = DataRef::<T>(self.data);

        try_sync(&self.queue, move || {
            let ptr = target.0;
            // SAFETY: see the invariants listed at the top of this impl.
            job(unsafe { &mut *ptr })
        })
    }

    /// Deprecated alias that forwards to [`Desync::future_desync`].
    #[inline]
    #[deprecated(since="0.7.0", note="please use either `future_desync` or `future_sync` to schedule futures")]
    pub fn future<TFn, TOutput>(&self, job: TFn) -> impl Future<Output=Result<TOutput, oneshot::Canceled>>+Send
    where
        TFn: 'static + Send + for<'a> FnOnce(&'a mut T) -> BoxFuture<'a, TOutput>,
        TOutput: 'static + Send,
    {
        self.future_desync(job)
    }

    /// Schedules a future-producing job via the scheduler's `future_desync`.
    ///
    /// `job` is called with a mutable reference to the wrapped value and
    /// returns a boxed future that may keep borrowing it; that future is
    /// awaited as part of the scheduled job. The returned `SchedulerFuture`
    /// yields the future's output.
    pub fn future_desync<TFn, TOutput>(&self, job: TFn) -> SchedulerFuture<TOutput>
    where
        TFn: 'static + Send + for<'borrow> FnOnce(&'borrow mut T) -> BoxFuture<'borrow, TOutput>,
        TOutput: 'static + Send,
    {
        let target = DataRef::<T>(self.data);

        scheduler().future_desync(&self.queue, move || {
            let ptr = target.0;
            // SAFETY: see the invariants listed at the top of this impl.
            let pending = job(unsafe { &mut *ptr });
            async move { pending.await }
        })
    }

    /// Schedules a future-producing job via the scheduler's `future_sync`.
    ///
    /// Works like [`Desync::future_desync`], except that the job only has to
    /// outlive `'a` and the returned future is tied to the borrow of `self`.
    pub fn future_sync<'a, TFn, TOutput>(&'a self, job: TFn) -> impl 'a + Future<Output=Result<TOutput, oneshot::Canceled>> + Send
    where
        TFn: 'a + Send + for<'borrow> FnOnce(&'borrow mut T) -> BoxFuture<'borrow, TOutput>,
        TOutput: 'a + Send,
    {
        let target = DataRef::<T>(self.data);

        scheduler().future_sync(&self.queue, move || {
            let ptr = target.0;
            // SAFETY: see the invariants listed at the top of this impl.
            let pending = job(unsafe { &mut *ptr });
            async move { pending.await }
        })
    }

    /// Waits for the future `after`, then calls `job` with the wrapped value
    /// and the future's output.
    ///
    /// Both steps happen inside a single job scheduled with
    /// [`Desync::future_desync`], so `after` is awaited from within that job.
    /// The returned future resolves to the value produced by `job`.
    pub fn after<TFn, Res, Fut>(&self, after: Fut, job: TFn) -> impl Future<Output=Result<Res, oneshot::Canceled>> + Send
    where
        Res: 'static + Send,
        Fut: 'static + Future + Send,
        TFn: 'static + Send + FnOnce(&mut T, Fut::Output) -> Res,
    {
        self.future_desync(move |state| {
            async move {
                let output = after.await;
                job(state, output)
            }
            .boxed()
        })
    }
}
impl<T: Send> Drop for Desync<T> {
    /// Releases the boxed value from a job scheduled on the object's queue.
    ///
    /// When the current thread is already panicking the job is submitted via
    /// the scheduler's `sync_no_panic`; otherwise the regular `sync` is used.
    /// NOTE(review): presumably this avoids a second panic while unwinding --
    /// confirm against the scheduler.
    fn drop(&mut self) {
        let target = DataRef::<T>(self.data);

        // The release step is identical on both paths; only the scheduler
        // entry point used to run it differs.
        let release = move || {
            let ptr = target.0;
            // SAFETY: `ptr` is the pointer stored in `self.data`, which `new`
            // obtains from `Box::into_raw`; `drop` runs once per value, so the
            // box is rebuilt and freed exactly once.
            mem::drop(unsafe { Box::from_raw(ptr) });
        };

        if std::thread::panicking() {
            scheduler().sync_no_panic(&self.queue, release);
        } else {
            sync(&self.queue, release);
        }
    }
}