use std::sync::{
atomic::{AtomicBool, Ordering},
mpsc::{channel, Receiver, Sender},
Arc, Mutex,
};
use std::thread;
#[macro_export]
macro_rules! do_m {
($this:expr) => {{
let mut _do_m = cor_newmutex!($this, (), ());
{
let mut do_m = _do_m.lock().unwrap();
do_m.set_async(false);
}
cor_start!(_do_m);
}};
}
#[macro_export]
macro_rules! do_m_pattern {
(
let $p:pat = $e:expr ; $( $t:tt )*
) => (
{ let $p = $e ; do_m_pattern! { $( $t )* } }
);
(
let $p:ident : $ty:ty = $e:expr ; $( $t:tt )*
) => (
{ let $p:$ty = $e ; do_m_pattern! { $( $t )* } }
);
(
let mut $p:ident : $ty:ty = $e:expr ; $( $t:tt )*
) => (
{ let mut $p:$ty = $e ; do_m_pattern! { $( $t )* } }
);
(
$p:ident = $e:expr ; $( $t:tt )*
) => (
{ $p = $e ; do_m_pattern! { $( $t )* } }
);
(
exec $b:expr ; $( $t:tt )*
) => (
{ $b ; do_m_pattern! { $( $t )* } }
);
(
ret $e:expr
) => (
$e
);
($p:ident $ty:ty, $val:expr, yield_from $cor:expr; $($t:tt)*) => ({
let $p: Arc<Mutex<Option<$ty>>> = Arc::new(Mutex::new(None::<$ty>));
let _p = $p.clone();
let mut _do_m = cor_newmutex!(
move |this| {
let mut p = _p.lock().unwrap();
*p = cor_yield_from!(this, $cor, $val);
},
(),
()
);
{
let mut do_m = _do_m.lock().unwrap();
do_m.set_async(false);
}
cor_start!(_do_m);
do_m_pattern! { $( $t )* }
});
}
#[macro_export]
macro_rules! cor_newmutex {
($func:expr, $RETURN:ty, $RECEIVE:ty) => {
<Cor<$RETURN, $RECEIVE>>::new_with_mutex($func)
};
}
#[macro_export]
macro_rules! cor_newmutex_and_start {
($func:expr, $RETURN:ty, $RECEIVE:ty) => {{
let new_one = <Cor<$RETURN, $RECEIVE>>::new_with_mutex($func);
cor_start!(new_one);
new_one
}};
}
#[macro_export]
macro_rules! cor_yield {
($this:expr, $given_to_outside:expr) => {
Cor::yield_ref($this, $given_to_outside)
};
}
#[macro_export]
macro_rules! cor_yield_from {
($this:expr, $target:expr, $sent_to_inside:expr) => {
Cor::yield_from($this.clone(), $target.clone(), $sent_to_inside)
};
}
#[macro_export]
macro_rules! cor_start {
($this:expr) => {
Cor::start($this.clone())
};
}
pub struct CorOp<RETURN: 'static, RECEIVE: 'static> {
pub result_ch_sender: Arc<Mutex<Sender<Option<RETURN>>>>,
pub val: Option<RECEIVE>,
}
impl<RETURN, RECEIVE> CorOp<RETURN, RECEIVE> {}
type CorEffect<RETURN, RECEIVE> =
dyn FnMut(Arc<Mutex<Cor<RETURN, RECEIVE>>>) + Send + Sync + 'static;
#[derive(Clone)]
pub struct Cor<RETURN: 'static, RECEIVE: 'static> {
is_async: bool,
started_alive: Arc<Mutex<(AtomicBool, AtomicBool)>>,
op_ch_sender: Arc<Mutex<Sender<CorOp<RETURN, RECEIVE>>>>,
op_ch_receiver: Arc<Mutex<Receiver<CorOp<RETURN, RECEIVE>>>>,
effect: Arc<Mutex<CorEffect<RETURN, RECEIVE>>>,
}
impl<RETURN: Send + Sync + 'static, RECEIVE: Send + Sync + 'static> Cor<RETURN, RECEIVE> {
pub fn new(
effect: impl FnMut(Arc<Mutex<Cor<RETURN, RECEIVE>>>) + Send + Sync + 'static,
) -> Cor<RETURN, RECEIVE> {
let (op_ch_sender, op_ch_receiver) = channel();
Cor {
is_async: true,
started_alive: Arc::new(Mutex::new((AtomicBool::new(false), AtomicBool::new(false)))),
op_ch_sender: Arc::new(Mutex::new(op_ch_sender)),
op_ch_receiver: Arc::new(Mutex::new(op_ch_receiver)),
effect: Arc::new(Mutex::new(effect)),
}
}
pub fn new_with_mutex(
effect: impl FnMut(Arc<Mutex<Cor<RETURN, RECEIVE>>>) + Send + Sync + 'static,
) -> Arc<Mutex<Cor<RETURN, RECEIVE>>> {
Arc::new(Mutex::new(<Cor<RETURN, RECEIVE>>::new(effect)))
}
pub fn yield_from<RETURNTARGET: Send + Sync + 'static, RECEIVETARGET: Send + Sync + 'static>(
this: Arc<Mutex<Cor<RETURN, RECEIVE>>>,
target: Arc<Mutex<Cor<RETURNTARGET, RECEIVETARGET>>>,
sent_to_inside: Option<RECEIVETARGET>,
) -> Option<RETURNTARGET> {
{
let me = this.lock().unwrap();
if !me.is_alive() {
return None;
}
}
{
let (result_ch_sender, result_ch_receiver) = channel();
let _result_ch_sender = Arc::new(Mutex::new(result_ch_sender));
let _result_ch_receiver = Arc::new(Mutex::new(result_ch_receiver));
{
target
.lock()
.unwrap()
.receive(_result_ch_sender.clone(), sent_to_inside);
}
let result;
{
let result_ch_receiver = _result_ch_receiver.lock().unwrap();
result = result_ch_receiver.recv();
}
{
drop(_result_ch_sender.lock().unwrap());
}
if let Ok(_x) = result {
return _x;
}
}
None
}
pub fn yield_none(this: Arc<Mutex<Cor<RETURN, RECEIVE>>>) -> Option<RECEIVE> {
Cor::yield_ref(this, None)
}
pub fn yield_ref(
this: Arc<Mutex<Cor<RETURN, RECEIVE>>>,
given_to_outside: Option<RETURN>,
) -> Option<RECEIVE> {
let _op_ch_receiver;
{
let me = this.lock().unwrap();
if !me.is_alive() {
return None;
}
_op_ch_receiver = me.op_ch_receiver.clone();
}
let op;
{
op = _op_ch_receiver.lock().unwrap().recv();
}
if let Ok(_x) = op {
{
let _result = _x.result_ch_sender.lock().unwrap().send(given_to_outside);
}
return _x.val;
}
None
}
pub fn start(this: Arc<Mutex<Cor<RETURN, RECEIVE>>>) {
let is_async;
{
let me = this.lock().unwrap();
is_async = me.is_async;
let started_alive = me.started_alive.lock().unwrap();
let &(ref started, ref alive) = &*started_alive;
if started.load(Ordering::SeqCst) {
return;
}
started.store(true, Ordering::SeqCst);
alive.store(true, Ordering::SeqCst);
}
{
let do_things = move || {
{
let mut _effect;
{
_effect = this.lock().unwrap().effect.clone();
}
(_effect.lock().unwrap())(this.clone());
}
{
this.lock().unwrap().stop();
}
};
if is_async {
thread::spawn(do_things);
} else {
do_things();
}
}
}
pub fn set_async(&mut self, is_async: bool) {
self.is_async = is_async;
}
pub fn is_started(&self) -> bool {
let started_alive = self.started_alive.lock().unwrap();
let &(ref started, _) = &*started_alive;
started.load(Ordering::SeqCst)
}
pub fn is_alive(&self) -> bool {
let started_alive = self.started_alive.lock().unwrap();
let &(_, ref alive) = &*started_alive;
alive.load(Ordering::SeqCst)
}
pub fn stop(&mut self) {
{
let started_alive = self.started_alive.lock().unwrap();
let &(ref started, ref alive) = &*started_alive;
if !started.load(Ordering::SeqCst) {
return;
}
if !alive.load(Ordering::SeqCst) {
return;
}
alive.store(false, Ordering::SeqCst);
{
drop(self.op_ch_sender.lock().unwrap());
}
}
}
fn receive(
&mut self,
result_ch_sender: Arc<Mutex<Sender<Option<RETURN>>>>,
given_as_request: Option<RECEIVE>,
) {
let started_alive = self.started_alive.lock().unwrap();
let &(_, ref alive) = &*started_alive;
if !alive.load(Ordering::SeqCst) {
return;
}
{
let op_ch_sender = self.op_ch_sender.lock().unwrap();
let _result = op_ch_sender.send(CorOp {
result_ch_sender,
val: given_as_request,
});
}
}
}
#[test]
fn test_cor_do_m() {
let v = Arc::new(Mutex::new(String::from("")));
let _v = v.clone();
do_m!(move |this| {
println!("test_cor_do_m started");
let cor_inner1 = cor_newmutex_and_start!(
|this| {
let s = cor_yield!(this, Some(String::from("1")));
println!("cor_inner1 {:?}", s);
},
String,
i16
);
let cor_inner2 = cor_newmutex_and_start!(
|this| {
let s = cor_yield!(this, Some(String::from("2")));
println!("cor_inner2 {:?}", s);
},
String,
i16
);
let cor_inner3 = cor_newmutex_and_start!(
|this| {
let s = cor_yield!(this, Some(String::from("3")));
println!("cor_inner3 {:?}", s);
},
String,
i16
);
{
(*_v.lock().unwrap()) = [
cor_yield_from!(this, cor_inner1, Some(1)).unwrap(),
cor_yield_from!(this, cor_inner2, Some(2)).unwrap(),
cor_yield_from!(this, cor_inner3, Some(3)).unwrap(),
]
.join("");
}
});
let _v = v.clone();
{
assert_eq!("123", *_v.lock().unwrap());
}
}
#[test]
fn test_cor_do_m_pattern() {
let _r = do_m_pattern! {
let mut _v4 = String::from("");
_v4 = String::from("4");
exec {
println!("do_m_pattern _v4:{:?}", _v4)
};
_v1 String, Some(1), yield_from cor_newmutex_and_start!(
|this| {
let s = cor_yield!(this, Some(String::from("1")));
println!("cor_inner1 {:?}", s);
},
String,
i16
);
_v2 String, Some(2), yield_from cor_newmutex_and_start!(
|this| {
let s = cor_yield!(this, Some(String::from("2")));
println!("cor_inner2 {:?}", s);
},
String,
i16
);
_v3 String, Some(3), yield_from cor_newmutex_and_start!(
|this| {
let s = cor_yield!(this, Some(String::from("3")));
println!("cor_inner3 {:?}", s);
},
String,
i16
);
let _v1 = _v1.lock().unwrap();
let _v2 = _v2.lock().unwrap();
let _v3 = _v3.lock().unwrap();
ret [
_v1.clone().unwrap(),
_v2.clone().unwrap(),
_v3.clone().unwrap(),
_v4,
].join("")
};
assert_eq!("1234", _r);
}
#[test]
fn test_cor_new() {
use std::time;
println!("test_cor_new");
let _cor1 = cor_newmutex!(
|this| {
println!("cor1 started");
let s = cor_yield!(this, Some(String::from("given_to_outside")));
println!("cor1 {:?}", s);
},
String,
i16
);
let cor1 = _cor1.clone();
let _cor2 = cor_newmutex!(
move |this| {
println!("cor2 started");
println!("cor2 yield_from before");
let s = cor_yield_from!(this, cor1, Some(3));
println!("cor2 {:?}", s);
},
i16,
i16
);
{
let cor1 = _cor1.clone();
cor1.lock().unwrap().set_async(true); }
{
let cor2 = _cor2.clone();
cor2.lock().unwrap().set_async(false);
}
cor_start!(_cor1);
cor_start!(_cor2);
thread::sleep(time::Duration::from_millis(1));
}