use std::sync::{Arc, Weak};
use std::sync::atomic::{self, Ordering::*};
use std::ptr::NonNull;
use std::ops::{CoerceUnsized, DispatchFromDyn};
use std::marker::Unsize;
use std::cell::UnsafeCell;
use std::alloc::{Global, Alloc, Layout, handle_alloc_error};
use crate::{
thread, Thread, Signal, Handler, MutHandler, Future, Emitter, Expose, Subscription
};
mod actor;
use actor::Actor;
/// Reports whether `ptr` carries the sentinel address `usize::MAX`.
///
/// Only the address is inspected: the pointer is first narrowed to a thin
/// `NonNull<()>`, so any metadata attached to a pointer to an unsized `T` is
/// ignored.
fn is_dangling<T: ?Sized>(ptr: NonNull<T>) -> bool {
    let thin: NonNull<()> = ptr.cast();
    thin.as_ptr() as usize == std::usize::MAX
}
// Soft ceiling for the strong and weak counters: the clone/upgrade paths in
// this file abort the process once a counter is observed above this value.
const MAX_REFCOUNT: usize = (std::isize::MAX) as usize;
/// Strong, reference-counted handle to a shared `Inner<T>` allocation.
///
/// Cloning increments `Inner::strong`; when the last strong handle is dropped
/// the actor stored in the allocation is destroyed.
pub struct Remote<T: ?Sized> {
// Non-null pointer to the shared allocation (counters, owning thread, actor).
ptr: NonNull<Inner<T>>
}
// Unsizing support: a `Remote<T>` can coerce to `Remote<U>` whenever
// `T: Unsize<U>` (for example to a `Remote<dyn Handler<S>>`), and can act as
// a receiver type for dynamically dispatched methods.
impl<T: ?Sized + Unsize<U>, U: ?Sized> CoerceUnsized<Remote<U>> for Remote<T> {}
impl<T: ?Sized + Unsize<U>, U: ?Sized> DispatchFromDyn<Remote<U>> for Remote<T> {}
// SAFETY (review): both impls are unconditional — no `T: Send` / `T: Sync`
// bound is required. NOTE(review): presumably justified by the actor only
// being touched on its owning thread; confirm, since `ptr` / `ptr_mut` hand
// out raw access to the actor from any thread.
unsafe impl<T: ?Sized> Sync for Remote<T> {}
unsafe impl<T: ?Sized> Send for Remote<T> {}
/// Weak counterpart of `Remote`.
///
/// A `WeakRemote` participates in the weak counter only, so it does not keep
/// the actor alive; use `upgrade` to obtain a `Remote` while strong handles
/// still exist.
pub struct WeakRemote<T: ?Sized> {
// Either a pointer to a live allocation or, for handles produced by
// `WeakRemote::new`, the sentinel address `usize::MAX` (see `is_dangling`).
ptr: NonNull<Inner<T>>
}
// Unsizing support mirroring `Remote`: `WeakRemote<T>` coerces to
// `WeakRemote<U>` whenever `T: Unsize<U>`.
impl<T: ?Sized + Unsize<U>, U: ?Sized> CoerceUnsized<WeakRemote<U>> for WeakRemote<T> {}
impl<T: ?Sized + Unsize<U>, U: ?Sized> DispatchFromDyn<WeakRemote<U>> for WeakRemote<T> {}
// SAFETY (review): unconditional, with no `Send` / `Sync` bound on `T`.
// NOTE(review): a `WeakRemote` can be upgraded into a `Remote` on any thread,
// so the same justification as for `Remote` is needed — confirm.
unsafe impl<T: ?Sized> Sync for WeakRemote<T> {}
unsafe impl<T: ?Sized> Send for WeakRemote<T> {}
/// Identity comparison: two `Remote`s are equal when they point at the same
/// shared allocation. The actors themselves are never compared.
impl<T: ?Sized> PartialEq for Remote<T> {
    fn eq(&self, other: &Remote<T>) -> bool {
        let (lhs, rhs) = (self.ptr, other.ptr);
        lhs == rhs
    }
}
// Pointer equality (see the `PartialEq` impl) is a full equivalence relation,
// so `Eq` is implemented as a marker with no methods.
impl<T: ?Sized> Eq for Remote<T> {
}
/// A one-shot unit of work handed to a thread through `thread::Query`.
struct Delivery<F: FnOnce() -> ()> {
// The closure to run; the first `process` call takes it and leaves `None`.
callback: Option<F>,
// Identifies the actor this delivery is addressed to; reported by
// `Query::receiver`.
receiver: Receiver
}
// SAFETY (review): asserted for every `F`, including closures that are not
// themselves `Send` (the closures built in `Remote::send` capture a message
// type with no `Send` bound). NOTE(review): presumably relies on the closure
// being executed exactly once on the target thread — confirm.
unsafe impl<F: FnOnce() -> ()> Send for Delivery<F> {}
impl<F: FnOnce() -> ()> thread::Query for Delivery<F> {
    /// Runs the stored callback, if it is still present.
    ///
    /// The callback is moved out of `self` before it is invoked, so a second
    /// call finds `None` and does nothing.
    fn process(&mut self) {
        if let Some(run) = self.callback.take() {
            run()
        }
    }

    /// Returns the receiver this delivery is addressed to (always `Some`).
    fn receiver(&self) -> Option<Receiver> {
        let target = self.receiver;
        Some(target)
    }
}
impl<T: 'static + ?Sized> Remote<T> {
pub fn from<F: 'static + FnOnce(Remote<T>) -> T + Send>(thread: &Arc<Thread>, constructor: F) -> Remote<T> where T: Sized {
let mut inner : Box<Inner<T>> = Box::new(Inner {
strong: atomic::AtomicUsize::new(1),
weak: atomic::AtomicUsize::new(1),
thread: Arc::downgrade(thread),
actor: UnsafeCell::new(unsafe { Actor::uninitialized() })
});
let remote = Remote {
ptr: Box::into_raw_non_null(inner)
};
let this = remote.clone();
match Thread::current() {
Some(current_thread) if Arc::ptr_eq(¤t_thread, thread) => {
unsafe {
let actor_mut: &mut Actor<T> = &mut *this.inner().actor.get();
actor_mut.initialize(constructor(this))
}
},
_ => {
thread.query(thread::Callback::new(move || {
unsafe {
let actor_mut: &mut Actor<T> = &mut *this.inner().actor.get();
actor_mut.initialize(constructor(this))
}
}));
}
}
remote
}
/// Wraps an already constructed value `t` in a new `Remote` associated with
/// `thread`.
///
/// Both counters start at 1 and the actor is stored fully initialized; nothing
/// is queued on `thread`, which is only remembered through a weak reference.
pub fn new(thread: &Arc<Thread>, t: T) -> Remote<T> where T: Send + Sized {
    let owner = Arc::downgrade(thread);
    let actor = UnsafeCell::new(Actor::new(t));
    let boxed: Box<Inner<T>> = Box::new(Inner {
        strong: atomic::AtomicUsize::new(1),
        weak: atomic::AtomicUsize::new(1),
        thread: owner,
        actor,
    });
    Remote { ptr: Box::into_raw_non_null(boxed) }
}
/// Wraps `t` in a `Remote` associated with the thread reported by
/// `Thread::current`.
///
/// Returns `Err(t)`, handing the value back untouched, when
/// `Thread::current` yields `None`.
pub fn local(t: T) -> std::result::Result<Remote<T>, T> where T: Sized {
    let thread = match Thread::current() {
        Some(thread) => thread,
        None => return Err(t),
    };
    let boxed: Box<Inner<T>> = Box::new(Inner {
        strong: atomic::AtomicUsize::new(1),
        weak: atomic::AtomicUsize::new(1),
        thread: Arc::downgrade(&thread),
        actor: UnsafeCell::new(Actor::new(t)),
    });
    Ok(Remote { ptr: Box::into_raw_non_null(boxed) })
}
/// Queues `msg` for the actor on its owning thread and returns a `Future`
/// that will receive the handler's response.
///
/// The queued closure borrows the actor immutably and calls
/// `Handler::handle`; the response travels back through a one-slot channel.
///
/// # Panics
///
/// Panics with `"thread died"` when the weak reference to the owning thread
/// can no longer be upgraded.
pub fn send<S: Signal + 'static>(&self, msg: S) -> Future<S::Response> where T: Handler<S> {
    let thread = match self.inner().thread.upgrade() {
        Some(thread) => thread,
        None => panic!("thread died"),
    };
    let (reply_tx, reply_rx) = crossbeam_channel::bounded(1);
    // The closure keeps its own strong handle so the actor outlives the query.
    let target = self.clone();
    let callback = move || {
        let actor = unsafe { &*target.inner().actor.get() };
        // A failed send (receiving side gone) is deliberately ignored.
        let _ = reply_tx.send(actor.data.handle(msg));
    };
    thread.query(Delivery {
        callback: Some(callback),
        receiver: self.receiver(),
    });
    Future { receiver: reply_rx }
}
/// Queues `msg` for the actor on its owning thread, giving the handler
/// mutable access, and returns a `Future` for the response.
///
/// Identical to `send` except that the queued closure borrows the actor
/// mutably and calls `MutHandler::handle_mut`.
///
/// # Panics
///
/// Panics with `"thread died"` when the weak reference to the owning thread
/// can no longer be upgraded.
pub fn send_mut<S: Signal + 'static>(&self, msg: S) -> Future<S::Response> where T: MutHandler<S> {
    let thread = match self.inner().thread.upgrade() {
        Some(thread) => thread,
        None => panic!("thread died"),
    };
    let (reply_tx, reply_rx) = crossbeam_channel::bounded(1);
    // The closure keeps its own strong handle so the actor outlives the query.
    let target = self.clone();
    let callback = move || {
        let actor = unsafe { &mut *target.inner().actor.get() };
        // A failed send (receiving side gone) is deliberately ignored.
        let _ = reply_tx.send(actor.data.handle_mut(msg));
    };
    thread.query(Delivery {
        callback: Some(callback),
        receiver: self.receiver(),
    });
    Future { receiver: reply_rx }
}
}
impl<T: ?Sized> Remote<T> {
/// Returns a raw const pointer to the actor's `data` field.
///
/// No thread or initialization check is performed here.
pub fn ptr(&self) -> *const T {
    let cell = self.inner().actor.get();
    unsafe { &(*cell).data as *const T }
}
/// Returns a raw mutable pointer to the actor's `data` field.
///
/// No thread or initialization check is performed here.
pub fn ptr_mut(&self) -> *mut T {
    let cell = self.inner().actor.get();
    unsafe { &mut (*cell).data as *mut T }
}
/// Forwards to `Actor::is_initialized` for the actor stored in the shared
/// allocation.
pub fn is_initialized(&self) -> bool {
    unsafe { &*self.inner().actor.get() }.is_initialized()
}
/// Reads a value of type `D` exposed by the actor, when it is safe to do so
/// synchronously.
///
/// Returns `Some` only if the actor is initialized **and** the calling thread
/// is the actor's owning thread (both `Thread::current` and the stored weak
/// thread reference resolve, and `Arc::ptr_eq` reports them identical).
/// Returns `None` in every other case.
pub fn get<D>(&self) -> Option<D> where T: Expose<D> {
    if !self.is_initialized() {
        return None;
    }
    match (self.inner().thread.upgrade(), Thread::current()) {
        (Some(thread), Some(current)) if Arc::ptr_eq(&thread, &current) => {
            let actor = unsafe { &*self.inner().actor.get() };
            Some(actor.data.get())
        },
        _ => None
    }
}
/// Builds a type-erased `Receiver` describing this remote.
///
/// `owner` is the address of the actor's `data` field and `inner` is the
/// address of the shared allocation; no reference count is touched.
pub fn receiver(&self) -> Receiver {
    let data: *const T = unsafe { &(*self.inner().actor.get()).data };
    Receiver {
        owner: data as *const (),
        inner: self.ptr.as_ptr() as *mut (),
    }
}
/// Creates a `WeakRemote` to the same allocation by incrementing the weak
/// counter with a compare-and-swap loop.
pub fn downgrade(&self) -> WeakRemote<T> {
let mut cur = self.inner().weak.load(Relaxed);
loop {
// A reading of `usize::MAX` is treated as "temporarily unavailable":
// reload the counter and try again.
// NOTE(review): nothing visible in this file ever stores `usize::MAX` into
// `weak`; presumably carried over from `std::sync::Arc` — confirm.
if cur == std::usize::MAX {
cur = self.inner().weak.load(Relaxed);
continue;
}
// Weak CAS: it may fail spuriously or because another thread changed the
// counter; either way retry with the value it reports.
match self.inner().weak.compare_exchange_weak(cur, cur + 1, Acquire, Relaxed) {
Ok(_) => {
// A strong handle always points at a real allocation, never the sentinel.
debug_assert!(!is_dangling(self.ptr));
return WeakRemote { ptr: self.ptr };
}
Err(old) => cur = old,
}
}
}
/// Borrows the shared allocation this handle points at.
#[inline]
pub(crate) fn inner(&self) -> &Inner<T> {
    // A `Remote` owns a strong count, which keeps the allocation alive for
    // as long as `self` exists.
    unsafe { &*self.ptr.as_ptr() }
}
/// Tears down the shared state once the last strong handle is gone.
///
/// Drops the actor and the weak thread reference in place, then releases the
/// implicit weak count held on behalf of all strong handles; if that was the
/// last weak count, the allocation itself is freed.
///
/// # Safety
///
/// Must be called at most once, and only after the strong counter has
/// reached zero (see `Drop for Remote`).
#[inline(never)]
unsafe fn drop_slow(&mut self) {
    {
        let inner = self.ptr.as_mut();
        std::ptr::drop_in_place(&mut inner.actor);
        // The allocation is later released with a raw `dealloc`, which runs
        // no destructors, so the `Weak<Thread>` has to be dropped explicitly
        // or its weak count on the `Thread` would leak. Only strong handles
        // read this field, and none exist any more.
        std::ptr::drop_in_place(&mut inner.thread);
    }
    if self.inner().weak.fetch_sub(1, Release) == 1 {
        // Synchronize with every earlier `Release` decrement before freeing.
        atomic::fence(Acquire);
        std::alloc::Global.dealloc(self.ptr.cast(), Layout::for_value(self.ptr.as_ref()))
    }
}
}
impl<T: 'static + ?Sized> Clone for Remote<T> {
    /// Produces another strong handle to the same allocation.
    ///
    /// The strong counter is bumped with a relaxed increment; if the value
    /// observed before the increment already exceeds `MAX_REFCOUNT`, the
    /// process is aborted.
    #[inline]
    fn clone(&self) -> Remote<T> {
        let previous = self.inner().strong.fetch_add(1, Relaxed);
        if previous > MAX_REFCOUNT {
            std::process::abort();
        }
        Remote { ptr: self.ptr }
    }
}
unsafe impl<#[may_dangle] T: ?Sized> Drop for Remote<T> {
    /// Releases one strong count; the handle that releases the last one runs
    /// `drop_slow` to destroy the shared state.
    #[inline]
    fn drop(&mut self) {
        let was_last = self.inner().strong.fetch_sub(1, Release) == 1;
        if was_last {
            // Pairs with the `Release` decrements of the other handles so
            // that their writes are visible before teardown.
            atomic::fence(Acquire);
            unsafe { self.drop_slow() }
        }
    }
}
/// Any remote whose actor handles `Subscription<S>` can act as an emitter of
/// `S`: subscribing is just sending the matching `Subscription` message.
impl<S: Signal, T: 'static + ?Sized + Handler<Subscription<S>>> Emitter<S> for Remote<T> {
    /// Sends `Subscription::Const(actor)`; the returned future is discarded.
    fn subscribe(&self, actor: Remote<dyn 'static + Handler<S>>) {
        let _ = self.send(Subscription::Const(actor));
    }

    /// Sends `Subscription::Mut(actor)`; the returned future is discarded.
    fn subscribe_mut(&self, actor: Remote<dyn 'static + MutHandler<S>>) {
        let _ = self.send(Subscription::Mut(actor));
    }
}
impl<T: 'static> WeakRemote<T> {
    /// Upgrades this handle and, on success, forwards `msg` through
    /// `Remote::send`. Returns `None` when the upgrade fails.
    pub fn try_send<S: Signal + 'static>(&self, msg: S) -> Option<Future<S::Response>> where T: Handler<S> {
        self.upgrade().map(|remote| remote.send(msg))
    }

    /// Upgrades this handle and, on success, forwards `msg` through
    /// `Remote::send_mut`. Returns `None` when the upgrade fails.
    pub fn try_send_mut<S: Signal + 'static>(&self, msg: S) -> Option<Future<S::Response>> where T: MutHandler<S> {
        self.upgrade().map(|remote| remote.send_mut(msg))
    }
}
impl<T: ?Sized> WeakRemote<T> {
pub fn new() -> WeakRemote<T> {
let dangling = std::usize::MAX;
let ptr = &dangling as *const usize as (*const *mut Inner<T>);
let dangling_ptr = unsafe { *ptr };
WeakRemote {
ptr: NonNull::new(dangling_ptr).expect("MAX is not 0"),
}
}
/// Borrows the shared allocation, or returns `None` for a handle that
/// carries the sentinel address (see `is_dangling`).
#[inline]
fn inner(&self) -> Option<&Inner<T>> {
    match is_dangling(self.ptr) {
        true => None,
        false => Some(unsafe { &*self.ptr.as_ptr() }),
    }
}
/// Attempts to obtain a strong handle.
///
/// Returns `None` for a sentinel handle or once the strong counter has
/// reached zero. Otherwise the counter is incremented with a compare-and-swap
/// loop, so a count of zero is never revived. Aborts the process if the
/// observed count exceeds `MAX_REFCOUNT`.
pub fn upgrade(&self) -> Option<Remote<T>> {
    let shared = self.inner()?;
    let mut observed = shared.strong.load(Relaxed);
    while observed != 0 {
        if observed > MAX_REFCOUNT {
            std::process::abort();
        }
        match shared.strong.compare_exchange_weak(observed, observed + 1, Relaxed, Relaxed) {
            Ok(_) => return Some(Remote { ptr: self.ptr }),
            // Lost a race or failed spuriously: retry with the fresh value.
            Err(actual) => observed = actual,
        }
    }
    None
}
}
impl<T: ?Sized> Clone for WeakRemote<T> {
    /// Produces another weak handle to the same allocation.
    ///
    /// Sentinel handles are copied without touching any counter. Otherwise
    /// the weak counter is incremented, aborting the process if the value
    /// observed before the increment exceeds `MAX_REFCOUNT`.
    #[inline]
    fn clone(&self) -> WeakRemote<T> {
        if let Some(shared) = self.inner() {
            let previous = shared.weak.fetch_add(1, Relaxed);
            if previous > MAX_REFCOUNT {
                std::process::abort();
            }
        }
        WeakRemote { ptr: self.ptr }
    }
}
impl<T> Default for WeakRemote<T> {
fn default() -> WeakRemote<T> {
WeakRemote::new()
}
}
impl<T: ?Sized> Drop for WeakRemote<T> {
    /// Releases one weak count and frees the allocation when it was the last.
    ///
    /// Sentinel handles own nothing and return immediately.
    fn drop(&mut self) {
        let last_reference = match self.inner() {
            Some(shared) => shared.weak.fetch_sub(1, Release) == 1,
            None => return,
        };
        if last_reference {
            // Pairs with the `Release` decrements of the other handles.
            atomic::fence(Acquire);
            unsafe {
                Global.dealloc(self.ptr.cast(), Layout::for_value(self.ptr.as_ref()))
            }
        }
    }
}
/// Heap allocation shared by every `Remote` and `WeakRemote` to one actor.
pub(crate) struct Inner<T: ?Sized> {
/// Number of live strong handles; the constructors start it at 1.
strong: atomic::AtomicUsize,
/// Weak counter; the constructors also start it at 1, and `drop_slow`
/// releases that initial count once the last strong handle is gone.
weak: atomic::AtomicUsize,
/// Weak reference to the thread the actor is associated with; `send`,
/// `send_mut` and `get` upgrade it.
thread: Weak<Thread>,
/// The actor itself, behind `UnsafeCell` because it is read and mutated
/// through shared handles.
pub(crate) actor: UnsafeCell<Actor<T>>,
}
/// Type-erased, copyable description of a remote, built by
/// `Remote::receiver`. It holds no reference count of its own.
#[derive(Clone, Copy)]
pub struct Receiver {
// Address of the actor's `data` field, used by `upgrade` as an identity check.
owner: *const (),
// Address of the shared `Inner` allocation, with its type erased.
inner: *mut ()
}
// SAFETY (review): `Receiver` only stores raw addresses and never
// dereferences them outside the `unsafe fn upgrade`, whose caller carries the
// proof obligations. NOTE(review): confirm no other module dereferences them.
unsafe impl Sync for Receiver {}
unsafe impl Send for Receiver {}
impl Receiver {
pub(crate) unsafe fn upgrade<T>(&self, data: &T) -> Option<Remote<T>> {
if data as *const T as *const () == self.owner {
let inner = &mut *(self.inner as *mut Inner<T>);
let old_size = inner.strong.fetch_add(1, Relaxed);
if old_size > MAX_REFCOUNT {
unsafe {
std::process::abort();
}
}
Some(Remote {
ptr: NonNull::new(inner as *mut Inner<T>).unwrap()
})
} else {
None
}
}
}