//! Coroutine-based actors built on the `may` runtime.
//!
//! An [`Actor<T>`] wraps a value of type `T`. Closures sent with [`Actor::call`] or
//! [`Actor::with`] are queued on a channel and executed one after another by a
//! dedicated worker coroutine, each receiving `&mut T`.
#![deny(missing_docs)]
use std::cell::UnsafeCell;
use std::panic::{self, RefUnwindSafe};
use std::sync::{Arc, Weak};
use may::go;
use may::sync::{mpsc, AtomicOption, Blocker};
/// Shared state behind every `Actor` handle: the wrapped value plus the sending
/// half of the job queue.
///
/// `#[repr(C)]` with `data` as the first field keeps the wrapped value at offset 0.
/// `Actor::from` depends on this when it reinterprets a `&T` as a pointer to the
/// enclosing `ActorImpl<T>`, so the field order must not change.
#[repr(C)]
#[derive(Debug)]
struct ActorImpl<T> {
/// The wrapped value; mutated through `get_mut` from queued jobs.
data: UnsafeCell<T>,
/// Sender for boxed jobs; the matching receiver is drained by the worker coroutine.
tx: mpsc::Sender<Box<dyn FnOnce() + Send>>,
}
// SAFETY: in this file `data` is only reached through `get_mut`, which is called
// solely from jobs sent over `tx`. The worker coroutine runs those jobs one at a
// time, so sharing `&ActorImpl<T>` between threads does not by itself produce
// concurrent access to `T`. `T: Send` is required because the jobs (and the final
// drop) may run on a different thread from the one that created the value.
// NOTE(review): soundness depends on every `get_mut` caller keeping this discipline.
unsafe impl<T: Send> Sync for ActorImpl<T> {}
// `UnsafeCell` opts out of `RefUnwindSafe`, so the auto trait does not apply to
// `ActorImpl`; this impl asserts it manually for every `T`. Panics raised by jobs
// are caught by the worker loop (and by `Actor::with`) rather than propagated.
// NOTE(review): a job that panics midway may leave `T` partially updated — callers
// observe that state on the next message.
impl<T> RefUnwindSafe for ActorImpl<T> {}
impl<T> ActorImpl<T> {
    /// Builds the shared state around `data` and spawns the worker coroutine that
    /// drains the job queue.
    ///
    /// The worker runs each received job in turn and swallows any panic a job
    /// raises, so one failing job does not stop the ones queued after it.
    fn new(data: T) -> Self {
        let (tx, rx) = mpsc::channel::<Box<dyn FnOnce() + Send>>();
        go!(move || {
            for job in rx.into_iter() {
                // Discard the panic payload on purpose: the worker must keep running.
                let _ = panic::catch_unwind(panic::AssertUnwindSafe(job));
            }
        });
        Self {
            data: UnsafeCell::new(data),
            tx,
        }
    }

    /// Returns a mutable reference to the wrapped value.
    ///
    /// # Safety
    ///
    /// The caller must guarantee that no other reference to the value is live for
    /// as long as the returned reference is used. Within this file that is achieved
    /// by only calling it from jobs executed sequentially by the worker coroutine.
    // Handing out `&mut T` from `&self` is the whole point of the `UnsafeCell`.
    #[allow(clippy::mut_from_ref)]
    unsafe fn get_mut(&self) -> &mut T {
        let cell: *mut T = self.data.get();
        &mut *cell
    }
}
/// A cloneable handle to a value of type `T` that is mutated only through queued
/// closures.
///
/// Closures submitted with `call` or `with` are sent over a channel and executed
/// one at a time by the actor's worker coroutine, each receiving `&mut T`.
/// Cloning the handle shares the same underlying value.
#[derive(Debug)]
pub struct Actor<T> {
/// Reference-counted shared state (wrapped value and job sender).
inner: Arc<ActorImpl<T>>,
}
// NOTE(review): this impl is unconditional, so `Actor<T>` is `Send` even when `T`
// is not, and `Actor::new` places no bound on `T`. `call`/`with`/`drive_new` do
// require `T: Send`, but a handle — and therefore the final drop of `T` — can still
// move to another thread for a non-`Send` `T`. Confirm whether a `T: Send` bound
// is intended here; adding one would narrow the public API.
unsafe impl<T> Send for Actor<T> {}
impl<T> Clone for Actor<T> {
fn clone(&self) -> Self {
Actor {
inner: self.inner.clone(),
}
}
}
impl<T> Actor<T> {
pub fn new(actor: T) -> Self {
Actor {
inner: Arc::new(ActorImpl::new(actor)),
}
}
pub fn drive_new<F>(data: T, f: F) -> Self
where
F: FnOnce(DriverActor<T>) + Send + 'static,
T: Send + 'static,
{
let (tx, rx) = mpsc::channel::<Box<dyn FnOnce() + Send>>();
let actor = Actor {
inner: Arc::new(ActorImpl {
data: UnsafeCell::new(data),
tx,
}),
};
let driver_para = Arc::downgrade(&actor.inner);
let driver = go!(|| f(DriverActor { inner: driver_para }));
go!(move || {
for f in rx.into_iter() {
panic::catch_unwind(panic::AssertUnwindSafe(move || {
f();
}))
.ok();
}
unsafe { driver.coroutine().cancel() };
driver.join().ok();
});
actor
}
pub unsafe fn from(inner: &T) -> Self {
let m: *const ActorImpl<T> = (inner as *const _ as usize) as *const _;
let arc = Arc::from_raw(m);
let ret = Actor { inner: arc.clone() };
std::mem::forget(arc);
ret
}
pub fn call<F>(&self, f: F)
where
F: FnOnce(&mut T) + Send + 'static,
T: Send + 'static,
{
let actor = self.inner.clone();
let f = move || {
let data = unsafe { actor.get_mut() };
f(data);
};
self.inner.tx.send(Box::new(f)).unwrap();
}
pub fn with<R, F>(&self, f: F) -> R
where
F: FnOnce(&mut T) -> R + Send,
T: Send,
R: Send,
{
let blocker = Blocker::current();
let ret = Arc::new(AtomicOption::none());
let err = Arc::new(AtomicOption::none());
{
let ret = ret.clone();
let err = err.clone();
let blocker = blocker.clone();
let actor = self.inner.clone();
let f = move || {
let data = unsafe { actor.get_mut() };
let exit = panic::catch_unwind(panic::AssertUnwindSafe(|| f(data)));
match exit {
Ok(r) => {
ret.store(Box::new(r));
}
Err(e) => {
err.store(Box::new(e));
}
}
blocker.unpark();
};
let closure: Box<dyn FnOnce() + Send> = Box::new(f);
let closure: Box<dyn FnOnce() + Send + 'static> =
unsafe { std::mem::transmute(closure) };
self.inner.tx.send(closure).unwrap();
}
match blocker.park(None) {
Ok(_) => match ret.take() {
Some(v) => *v,
None => match err.take() {
Some(panic) => panic::resume_unwind(panic),
None => unreachable!("failed to get result"),
},
},
Err(_) => may::coroutine::trigger_cancel_panic(),
}
}
pub fn key(&self) -> usize {
let inner = self.inner.clone();
let addr = Arc::into_raw(inner);
let key = addr as usize;
unsafe { Arc::from_raw(addr) };
key
}
}
/// Handle given to the driver closure of `Actor::drive_new`.
///
/// It holds only a weak reference to the actor's shared state, so it does not keep
/// the actor alive; each operation first tries to upgrade that reference.
#[derive(Debug)]
pub struct DriverActor<T> {
/// Weak reference to the actor's shared state.
inner: Weak<ActorImpl<T>>,
}
impl<T> DriverActor<T> {
pub fn call<F>(&self, f: F)
where
F: FnOnce(&mut T) + Send + 'static,
T: Send + 'static,
{
let actor = match self.inner.upgrade() {
None => may::coroutine::trigger_cancel_panic(),
Some(inner) => Actor { inner },
};
actor.call(f);
}
pub fn with<R, F>(&self, f: F) -> R
where
F: FnOnce(&mut T) -> R + Send,
T: Send,
R: Send,
{
let actor = match self.inner.upgrade() {
None => may::coroutine::trigger_cancel_panic(),
Some(inner) => Actor { inner },
};
actor.with(f)
}
}
#[cfg(test)]
mod tests {
use super::*;
// Messages run in order, and a panicking message does not prevent later ones
// from running: 2 + 4 == 6.
#[test]
fn it_works() {
let i = 0u32;
let a = Actor::new(i);
a.call(|me| *me += 2);
a.call(|_me| panic!("support panic inside"));
a.call(|me| *me += 4);
a.with(|me| assert_eq!(*me, 6));
}
// Two actors bounce messages off each other. Each side rebuilds a handle to
// itself from `&mut self` with `Actor::from` and passes it to the other side.
#[test]
fn ping_pong() {
struct Ping {
count: u32,
// Signals the test body once the exchange is finished.
tx: mpsc::Sender<()>,
}
struct Pong {
count: u32,
}
impl Ping {
fn ping(&mut self, to: Actor<Pong>) {
// Stop after eleven rounds and notify the test.
if self.count > 10 {
self.tx.send(()).unwrap();
return;
}
println!("ping called, count={}", self.count);
self.count += 1;
// `self` is the value stored inside the `ping` actor, as `Actor::from` requires.
let ping = unsafe { Actor::from(self) };
to.call(move |pong| pong.pong(ping));
}
}
impl Pong {
fn pong(&mut self, to: Actor<Ping>) {
println!("pong called, count={}", self.count);
self.count += 1;
// `self` is the value stored inside the `pong` actor, as `Actor::from` requires.
let pong = unsafe { Actor::from(self) };
to.call(|ping| ping.ping(pong))
}
}
let (tx, rx) = mpsc::channel();
let ping = Actor::new(Ping { count: 0, tx });
let pong = Actor::new(Pong { count: 0 });
{
let pong = pong.clone();
ping.call(|me| me.ping(pong));
}
// Wait for `Ping` to report completion before inspecting the counters.
rx.recv().unwrap();
ping.with(|me| assert_eq!(me.count, 11));
pong.with(|me| assert_eq!(me.count, 11));
}
// The driver coroutine must be torn down after the actor is dropped: the
// `DropFlag` captured by the driver closure records that it was dropped.
#[test]
fn driver_actor() {
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
#[derive(Debug)]
struct DropFlag {
flag: Arc<AtomicBool>,
}
impl Drop for DropFlag {
fn drop(&mut self) {
self.flag.store(true, Ordering::Relaxed);
}
}
let flag = Arc::new(AtomicBool::new(false));
let drop_flag = DropFlag { flag: flag.clone() };
// The driver increments the actor's value once per second, forever.
let actor = Actor::drive_new(0, move |me| loop {
me.call(|v| {
*v += 1;
println!("new_value = {}", *v)
});
assert!(!drop_flag.flag.load(Ordering::Relaxed));
may::coroutine::sleep(Duration::from_secs(1));
});
may::coroutine::sleep(Duration::from_secs(3));
drop(actor);
// Give the worker time to cancel and join the driver.
may::coroutine::sleep(Duration::from_millis(100));
assert!(flag.load(Ordering::Relaxed));
}
// `with` accepts a closure that mutably borrows a local (`i`) and returns a value.
#[test]
fn with_test() {
let mut i = 100;
let a = Actor::new(0);
let v = a.with(|me| {
*me += 2;
i += *me;
*me
});
assert_eq!(100 + v, i);
}
// A panic inside a `with` closure is re-raised in the caller.
#[test]
#[should_panic]
fn with_panic() {
let a = Actor::new(0);
a.with(|_| panic!("panic inside"));
}
// Clones of one actor share the same key.
#[test]
fn test_key() {
let a = Actor::new(0);
let b = a.clone();
assert_eq!(a.key(), b.key());
}
}