use futures::task::{Context, Poll};
use std::pin::Pin;
/// Creates the sender/receiver pair that the macros in this crate obtain through
/// `$crate::channel()`.
///
/// This variant is compiled when the `crossbeam` feature is disabled and forwards to
/// [`std::sync::mpsc::channel`].
#[cfg(not(feature="crossbeam"))]
pub fn channel<T>() -> (std::sync::mpsc::Sender<T>, std::sync::mpsc::Receiver<T>) {
    use std::sync::mpsc;

    let (sender, receiver) = mpsc::channel::<T>();
    (sender, receiver)
}
/// Creates the sender/receiver pair that the macros in this crate obtain through
/// `$crate::channel()`.
///
/// This variant is compiled when the `crossbeam` feature is enabled and forwards to
/// `crossbeam_channel::unbounded`.
#[cfg(feature="crossbeam")]
pub fn channel<T>() -> (crossbeam_channel::Sender<T>, crossbeam_channel::Receiver<T>) {
    let (sender, receiver) = crossbeam_channel::unbounded::<T>();
    (sender, receiver)
}
/// Sending half of the channel backend selected at compile time.
/// Without the `crossbeam` feature this is the standard library `mpsc` sender.
#[cfg(not(feature="crossbeam"))]
type Sender<T> = std::sync::mpsc::Sender<T>;
/// Sending half of the channel backend selected at compile time.
/// With the `crossbeam` feature enabled this is the `crossbeam_channel` sender.
#[cfg(feature="crossbeam")]
type Sender<T> = crossbeam_channel::Sender<T>;
/// Wraps a call to a callback-taking function and yields a future for the callback arguments.
///
/// The callback position is written as `->(a, b, ...)`, naming the callback's parameters.
/// Three call shapes are accepted:
///
/// * `once!(func(args..., ->(a, b)))` — callback is the last argument,
/// * `once!(func(->(a, b), args...))` — callback is the first argument,
/// * `once!(func(args..., ->(a, b), more...))` — callback sits between other arguments.
///
/// `func` is called right away, while the macro expression is being evaluated. The generated
/// callback sends its arguments (as a tuple, or as the bare value when there is exactly one
/// parameter) through `$crate::channel()`. The returned `async` block fetches that value with
/// a blocking `recv()` when it is polled.
///
/// # Panics
///
/// The callback unwraps the result of `send`, and the future unwraps the result of `recv`.
#[macro_export]
macro_rules! once {
    // Callback as the trailing argument.
    ($func_name: ident($($params: expr),*, ->($($c_params: ident),*))) => {
        (|| {
            let (tx, rx) = $crate::channel();
            $func_name(
                $($params),*,
                move |$($c_params),*| { tx.send(($($c_params),*)).unwrap() }
            );
            async move { rx.recv().unwrap() }
        })()
    };
    // Callback as the leading argument.
    ($func_name: ident(->($($c_params: ident),*), $($params: expr),*)) => {
        (|| {
            let (tx, rx) = $crate::channel();
            $func_name(
                move |$($c_params),*| { tx.send(($($c_params),*)).unwrap() },
                $($params),*
            );
            async move { rx.recv().unwrap() }
        })()
    };
    // Callback in between other arguments.
    ($func_name: ident($($params: expr),+, ->($($c_params: ident),*), $($more_params: expr),+)) => {
        (|| {
            let (tx, rx) = $crate::channel();
            $func_name(
                $($params),*,
                move |$($c_params),*| { tx.send(($($c_params),*)).unwrap() },
                $($more_params),*
            );
            async move { rx.recv().unwrap() }
        })()
    };
}
/// Wraps a call to a function whose callback must *return a value*, and yields a future that
/// resolves to a `CBBlockResult` holding the callback arguments.
///
/// The callback position is written as `->(a, b, ...) -> default`, where `default` is the value
/// handed back to the function if the caller never supplies one. Three call shapes are accepted:
///
/// * `once_blocked!(func(args..., ->(a, b) -> default))` — callback is the last argument,
/// * `once_blocked!(func(->(a, b) -> default, args...))` — callback is the first argument,
/// * `once_blocked!(func(args..., ->(a, b) -> default, more...))` — callback in the middle.
///
/// `func` runs on a freshly spawned thread. Its callback first sends the received arguments to
/// the future and then blocks on a second channel until a return value arrives. That value is
/// sent either by the "return" closure given to `CBBlockResult::new` or by the "default"
/// closure, which sends `default`. The thread's join handle is passed to the `CBBlockResult`
/// as well.
///
/// # Panics
///
/// Every channel operation in the expansion is unwrapped.
#[macro_export]
macro_rules! once_blocked {
    // Callback as the trailing argument.
    ($func_name: ident($($params: expr),*, ->($($c_params: ident),*)->$c_ret: expr)) => {
        (|| {
            let (sender, receiver) = $crate::channel();
            let (ret_sender, ret_receiver) = $crate::channel();
            let default_ret_sender = ret_sender.clone();
            let thread_handle = std::thread::spawn(move || {
                $func_name($($params),*, move |$($c_params),*| {
                    sender.send(($($c_params),*)).unwrap();
                    // Park the worker until the async side answers.
                    ret_receiver.recv().unwrap()
                })
            });
            async move {
                let ($($c_params),*) = receiver.recv().unwrap();
                $crate::CBBlockResult::new(
                    ($($c_params),*),
                    move |val| {ret_sender.send(val).unwrap()},
                    move || {default_ret_sender.send($c_ret).unwrap();},
                    Some(thread_handle)
                )
            }
        })()
    };
    // Callback as the leading argument.
    ($func_name: ident(->($($c_params: ident),*)->$c_ret: expr, $($params: expr),*)) => {
        (|| {
            let (sender, receiver) = $crate::channel();
            let (ret_sender, ret_receiver) = $crate::channel();
            let default_ret_sender = ret_sender.clone();
            let thread_handle = std::thread::spawn(move || {
                // No comma after the repetition: `$params` may be empty here, and an extra
                // `,` would expand to the invalid call `func(closure, ,)`.
                $func_name(move |$($c_params),*| {
                    sender.send(($($c_params),*)).unwrap();
                    // Park the worker until the async side answers.
                    ret_receiver.recv().unwrap()
                }, $($params),*)
            });
            async move {
                let ($($c_params),*) = receiver.recv().unwrap();
                $crate::CBBlockResult::new(
                    ($($c_params),*),
                    move |val| {ret_sender.send(val).unwrap()},
                    move || {default_ret_sender.send($c_ret).unwrap();},
                    Some(thread_handle)
                )
            }
        })()
    };
    // Callback in between other arguments.
    ($func_name: ident($($params: expr),+, ->($($c_params: ident),*)->$c_ret: expr, $($more_params: expr),+)) => {
        (|| {
            let (sender, receiver) = $crate::channel();
            let (ret_sender, ret_receiver) = $crate::channel();
            let default_ret_sender = ret_sender.clone();
            let thread_handle = std::thread::spawn(move || {
                $func_name($($params),*, move |$($c_params),*| {
                    sender.send(($($c_params),*)).unwrap();
                    // Park the worker until the async side answers.
                    ret_receiver.recv().unwrap()
                }, $($more_params),*)
            });
            async move {
                let ($($c_params),*) = receiver.recv().unwrap();
                $crate::CBBlockResult::new(
                    ($($c_params),*),
                    move |val| {ret_sender.send(val).unwrap()},
                    move || {default_ret_sender.send($c_ret).unwrap();},
                    Some(thread_handle)
                )
            }
        })()
    };
}
/// Wraps a call to a function that invokes its callback repeatedly and yields a `CBStream` of
/// the callback arguments.
///
/// The callback position is written as `->(a, b, ...)`. Three call shapes are accepted:
///
/// * `stream!(func(args..., ->(a, b)))` — callback is the last argument,
/// * `stream!(func(->(a, b), args...))` — callback is the first argument,
/// * `stream!(func(args..., ->(a, b), more...))` — callback sits between other arguments.
///
/// `func` is called right away, while the macro expression is being evaluated. Every callback
/// invocation pushes its arguments (as a tuple, or as the bare value when there is exactly one
/// parameter) into a `futures` unbounded channel. The receiving end is wrapped in
/// `$crate::CBStream`.
///
/// # Panics
///
/// The callback unwraps the result of `unbounded_send`.
#[macro_export]
macro_rules! stream {
    // Callback as the trailing argument.
    ($func_name: ident($($params: expr),*, ->($($c_params: ident),*))) => {
        (|| {
            let (tx, rx) = futures::channel::mpsc::unbounded();
            $func_name(
                $($params),*,
                move |$($c_params),*| { tx.unbounded_send(($($c_params),*)).unwrap() }
            );
            $crate::CBStream::new(rx)
        })()
    };
    // Callback as the leading argument.
    ($func_name: ident(->($($c_params: ident),*), $($params: expr),*)) => {
        (|| {
            let (tx, rx) = futures::channel::mpsc::unbounded();
            $func_name(
                move |$($c_params),*| { tx.unbounded_send(($($c_params),*)).unwrap() },
                $($params),*
            );
            $crate::CBStream::new(rx)
        })()
    };
    // Callback in between other arguments.
    ($func_name: ident($($params: expr),+, ->($($c_params: ident),*), $($more_params: expr),+)) => {
        (|| {
            let (tx, rx) = futures::channel::mpsc::unbounded();
            $func_name(
                $($params),*,
                move |$($c_params),*| { tx.unbounded_send(($($c_params),*)).unwrap() },
                $($more_params),*
            );
            $crate::CBStream::new(rx)
        })()
    };
}
/// Wraps a call to a function that repeatedly invokes a value-returning callback and yields a
/// boxed `CBStreamBlocked`.
///
/// The callback position is written as `->(a, b, ...) -> default`, where `default` is handed to
/// `CBStreamBlocked::new` as the default return value. Three call shapes are accepted:
///
/// * `stream_blocked!(func(args..., ->(a, b) -> default))` — callback is the last argument,
/// * `stream_blocked!(func(->(a, b) -> default, args...))` — callback is the first argument,
/// * `stream_blocked!(func(args..., ->(a, b) -> default, more...))` — callback in the middle.
///
/// `func` runs on a freshly spawned thread whose handle is not kept. On every invocation the
/// callback pushes its arguments into a `futures` unbounded channel and then blocks on a second
/// channel until a value to return to `func` arrives.
///
/// # Panics
///
/// The callback unwraps both the `unbounded_send` and the `recv` result.
#[macro_export]
macro_rules! stream_blocked {
    // Callback as the trailing argument.
    ($func_name: ident($($params: expr),*, ->($($c_params: ident),*)->$c_ret: expr)) => {
        (|| {
            let (tx, rx) = futures::channel::mpsc::unbounded();
            let (reply_tx, reply_rx) = $crate::channel();
            std::thread::spawn(move || {
                $func_name(
                    $($params),*,
                    move |$($c_params),*| {
                        tx.unbounded_send(($($c_params),*)).unwrap();
                        // Wait for the consumer's answer and hand it back to `func`.
                        reply_rx.recv().unwrap()
                    }
                )
            });
            Box::new($crate::CBStreamBlocked::new(reply_tx, rx, $c_ret))
        })()
    };
    // Callback as the leading argument.
    ($func_name: ident(->($($c_params: ident),*)->$c_ret: expr, $($params: expr),*)) => {
        (|| {
            let (tx, rx) = futures::channel::mpsc::unbounded();
            let (reply_tx, reply_rx) = $crate::channel();
            std::thread::spawn(move || {
                $func_name(
                    move |$($c_params),*| {
                        tx.unbounded_send(($($c_params),*)).unwrap();
                        // Wait for the consumer's answer and hand it back to `func`.
                        reply_rx.recv().unwrap()
                    },
                    $($params),*
                )
            });
            Box::new($crate::CBStreamBlocked::new(reply_tx, rx, $c_ret))
        })()
    };
    // Callback in between other arguments.
    ($func_name: ident($($params: expr),+, ->($($c_params: ident),*)->$c_ret: expr, $($more_params: expr),+)) => {
        (|| {
            let (tx, rx) = futures::channel::mpsc::unbounded();
            let (reply_tx, reply_rx) = $crate::channel();
            std::thread::spawn(move || {
                $func_name(
                    $($params),*,
                    move |$($c_params),*| {
                        tx.unbounded_send(($($c_params),*)).unwrap();
                        // Wait for the consumer's answer and hand it back to `func`.
                        reply_rx.recv().unwrap()
                    },
                    $($more_params),*
                )
            });
            Box::new($crate::CBStreamBlocked::new(reply_tx, rx, $c_ret))
        })()
    };
}
/// Stream over the values pushed by a callback; the `stream!` macro constructs it around the
/// receiving end of a `futures` unbounded channel.
pub struct CBStream<T> {
/// Receiving end of the channel the callback sends into.
data_receiver: futures::channel::mpsc::UnboundedReceiver<T>,
/// Waker recorded by the most recent `poll_next` that returned `Poll::Pending`.
/// Nothing in the visible part of this file reads it back.
waker: Option<futures::task::Waker>
}
impl<T> CBStream<T> {
    /// Builds a stream around `reciever`; no waker is recorded yet.
    pub fn new(reciever: futures::channel::mpsc::UnboundedReceiver<T>) -> CBStream<T> {
        let data_receiver = reciever;
        Self { data_receiver, waker: None }
    }
}
impl<T> futures::Stream for CBStream<T> {
    type Item=T;

    /// Polls the wrapped receiver and passes its answer through unchanged.
    ///
    /// When the receiver is not ready, the task's waker is additionally stored in `self.waker`.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let outcome = futures::Stream::poll_next(Pin::new(&mut self.data_receiver), cx);
        if outcome.is_pending() {
            self.waker = Some(cx.waker().clone());
        }
        outcome
    }
}
/// Stream over the values pushed by a value-returning callback; the `stream_blocked!` macro
/// constructs it. Each item is handed out as a `CBBlockResult` through which a value of type
/// `R` can be sent back.
pub struct CBStreamBlocked<R, T> where R: 'static + Clone {
/// Receiving end of the channel the callback sends its arguments into.
data_receiver: futures::channel::mpsc::UnboundedReceiver<T>,
/// Sender used to deliver return values; it is cloned for every item produced.
ret_sender: crate::Sender<R>,
/// Value cloned into the "default return" closure of every item produced.
default_return_value: R,
/// Waker recorded by the most recent `poll_next` that returned `Poll::Pending`.
/// Nothing in the visible part of this file reads it back.
waker: Option<futures::task::Waker>
}
impl<R, T> CBStreamBlocked<R, T> where R: 'static + Clone {
    /// Assembles a blocked stream.
    ///
    /// * `return_sender` — sender through which return values are delivered,
    /// * `reciever` — receiving end carrying the callback arguments,
    /// * `default_ret_val` — value stored as the default return value.
    ///
    /// No waker is recorded yet.
    pub fn new(return_sender: crate::Sender<R>, reciever: futures::channel::mpsc::UnboundedReceiver<T>, default_ret_val: R) -> CBStreamBlocked<R, T> where R: 'static + Clone {
        let data_receiver = reciever;
        let ret_sender = return_sender;
        let default_return_value = default_ret_val;
        Self {
            data_receiver,
            ret_sender,
            default_return_value,
            waker: None,
        }
    }
}
impl<R, T> futures::Stream for Box<CBStreamBlocked<R, T>> where R: 'static + Clone {
    type Item=CBBlockResult<R, T>;

    /// Polls the wrapped receiver and wraps every received value in a `CBBlockResult`.
    ///
    /// The produced result carries one closure that sends a caller-chosen value through a clone
    /// of `ret_sender`, and one that sends a clone of the default return value. No thread handle
    /// is attached. When the receiver is not ready, the task's waker is stored in `self.waker`.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let polled = futures::Stream::poll_next(Pin::new(&mut self.data_receiver), cx);
        let received = match polled {
            Poll::Pending => {
                self.waker = Some(cx.waker().clone());
                return Poll::Pending;
            }
            Poll::Ready(received) => received,
        };
        // The clones are made for every ready poll, including the end-of-stream one.
        let reply_tx = self.ret_sender.clone();
        let fallback_tx = reply_tx.clone();
        let fallback_value = self.default_return_value.clone();
        Poll::Ready(received.map(move |payload| {
            CBBlockResult::new(
                payload,
                move |val| { reply_tx.send(val).unwrap() },
                move || fallback_tx.send(fallback_value).unwrap(),
                None
            )
        }))
    }
}
/// Error reported by `CBBlockResult::return_value` when it can no longer deliver a return
/// value because one was already supplied.
#[derive(Debug, PartialEq)]
pub struct AlreadyReturnError;
impl std::fmt::Display for AlreadyReturnError {
    /// Writes the human-readable description of the error.
    fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
        // No trailing newline: a `Display` rendering is meant to be embedded in larger
        // messages, and the caller decides how lines are terminated.
        fmt.write_str("The caller has already return a value. It shall not return other value.")
    }
}
/// Lets the error take part in `Box<dyn Error>` / `?` based error handling.
impl std::error::Error for AlreadyReturnError {}
/// Callback arguments of type `T` together with the means to send a value of type `R` back.
/// Dereferences to the stored arguments.
pub struct CBBlockResult<R, T> {
/// The values received from the callback.
result: T,
/// Closure that delivers a caller-chosen return value; taken when `return_value` succeeds.
return_fn: Option<Box<dyn FnOnce(R)>>,
/// Closure invoked on drop if it is still present, i.e. if `return_value` never succeeded.
default_return: Option<Box<dyn FnOnce()>>,
/// Optional thread handle that is joined when this value is dropped.
func_handle: Option<std::thread::JoinHandle<()>>
}
impl<R, T> core::ops::Deref for CBBlockResult<R, T> {
type Target=T;
fn deref(&self) -> &T {
&self.result
}
}
impl<R, T> CBBlockResult<R, T> where R: 'static {
    /// Bundles callback arguments with the closures used to answer the callback.
    ///
    /// * `result` — the values received from the callback,
    /// * `caller_return_fn` — invoked with the value passed to a successful `return_value`,
    /// * `default_return` — stored for use when no explicit value is returned,
    /// * `handle` — optional thread handle stored alongside.
    pub fn new<F, FR>(result: T, caller_return_fn: FR, default_return: F, handle: Option<std::thread::JoinHandle<()>>) -> CBBlockResult<R, T> where F: 'static + FnOnce(), FR: 'static + FnOnce(R) {
        let boxed_return: Box<dyn FnOnce(R)> = Box::new(caller_return_fn);
        let boxed_default: Box<dyn FnOnce()> = Box::new(default_return);
        Self {
            result,
            return_fn: Some(boxed_return),
            default_return: Some(boxed_default),
            func_handle: handle,
        }
    }

    /// Sends `value` back through the return closure.
    ///
    /// Succeeds only while both the return closure and the default closure are still present.
    /// On success the default closure is discarded without being run and the return closure is
    /// consumed. Otherwise nothing is touched and `AlreadyReturnError` is returned.
    pub fn return_value(&mut self, value: R) -> Result<(), AlreadyReturnError> {
        if self.return_fn.is_none() || self.default_return.is_none() {
            return Err(AlreadyReturnError);
        }
        // Discard the default first, then answer with the caller's value.
        drop(self.default_return.take());
        if let Some(send_back) = self.return_fn.take() {
            send_back(value);
        }
        Ok(())
    }
}
impl<R, T> Drop for CBBlockResult<R, T> {
    /// Runs the default-return closure if it is still present, then joins the attached thread
    /// if there is one. The join result is unwrapped.
    fn drop(&mut self) {
        if let Some(send_default) = self.default_return.take() {
            send_default();
        }
        if let Some(worker) = self.func_handle.take() {
            worker.join().unwrap();
        }
    }
}
// Tests for the four macros. Each one defines a small local `func` that calls its callback,
// wraps it with a macro and drives the result with `futures::executor::block_on`.
#[cfg(test)]
mod tests {
// `once!` with the callback as the last argument; `func` sleeps two seconds before calling back.
#[test]
fn test_once_postfix() {
fn func(v: i32, cb: impl FnOnce(i32, i32)) {
std::thread::sleep(std::time::Duration::from_secs(2));
cb(v, v * 2)
}
let (a, b) = futures::executor::block_on(once!(func(2 + 3, ->(a, b))));
assert_eq!(5, a);
assert_eq!(10, b);
}
// `once!` with the callback as the first argument.
#[test]
fn test_once_prefix() {
fn func(cb: impl FnOnce(i32, i32), v: i32) {
std::thread::sleep(std::time::Duration::from_secs(2));
cb(v, v * 2)
}
let (a, b) = futures::executor::block_on(once!(func(->(a, b), 2 + 3)));
assert_eq!(5, a);
assert_eq!(10, b);
}
// `once!` with the callback between two ordinary arguments.
#[test]
fn test_once_infix() {
fn func(u: i32, cb: impl FnOnce(i32, i32), v: i32) {
std::thread::sleep(std::time::Duration::from_secs(2));
cb(u, v)
}
let (a, b) = futures::executor::block_on(once!(func(1, ->(a, b), 2 + 3)));
assert_eq!(1, a);
assert_eq!(5, b);
}
// `once!` with a callback that takes no parameters; only checks that the future completes.
#[test]
fn test_once_postfix_no_args() {
fn func(_v: i32, cb: impl FnOnce()) {
std::thread::sleep(std::time::Duration::from_secs(2));
cb()
}
futures::executor::block_on(once!(func(2 + 3, -> ())));
}
// `once_blocked!`, trailing callback: an explicit `return_value(0)` satisfies `func`,
// which panics on any other value.
#[test]
fn test_once_blocked_postfix() {
fn func(v: i32, cb: impl FnOnce(i32, i32) -> i32) {
if cb(v, v * 2) == 0i32 {
dbg!("Ok !");
} else {
panic!("Something wrong")
}
}
let mut ret = futures::executor::block_on(once_blocked!(func(2 + 3, ->(a, b) -> 0i32)));
let (a, b) = *ret;
assert_eq!(5, a);
assert_eq!(10, b);
ret.return_value(0).unwrap();
}
// `once_blocked!`, trailing callback: no explicit return, so the default `1i32` is sent when
// `ret` is dropped; `func` accepts either value.
#[test]
fn test_once_blocked_default_postfix() {
fn func(v: i32, cb: impl FnOnce(i32, i32) -> i32) {
if cb(v, v * 2) == 0i32 {
dbg!("Ok !");
} else {
dbg!("Default shutdown..");
}
}
let ret = futures::executor::block_on(once_blocked!(func(2 + 3, ->(a, b) -> 1i32)));
let (a, b) = *ret;
assert_eq!(5, a);
assert_eq!(10, b);
}
// `once_blocked!`, trailing parameterless callback: the result is dropped immediately, so
// `func` must receive the default `3i32` or it panics.
#[test]
fn test_once_blocked_default_postfix_no_args() {
fn func(_v: i32, cb: impl FnOnce() -> i32) {
if cb() == 3i32 {
dbg!("Ok !");
} else {
panic!("Invalid return value")
}
}
futures::executor::block_on(once_blocked!(func(2 + 3, ->() -> {3i32})));
}
// `once_blocked!`: a second `return_value` call must fail with `AlreadyReturnError`.
#[test]
fn test_once_blocked_postfix_with_logic() {
fn func(v: i32, cb: impl FnOnce(i32, i32) -> i32) {
if cb(v, v * 2) == 0i32 {
dbg!("Ok !");
} else {
panic!("Something wrong")
}
}
let mut ret = futures::executor::block_on(once_blocked!(func(2 + 3, ->(a, b) -> 0i32)));
let (a, b) = *ret;
assert_eq!(5, a);
assert_eq!(10, b);
if a + b == 15 && a * b == 50 {
ret.return_value(0).unwrap();
}
assert_eq!(ret.return_value(0).unwrap_err(), super::AlreadyReturnError);
}
// `once_blocked!` with the callback as the first argument and an explicit return value.
#[test]
fn test_once_blocked_prefix() {
fn func(cb: impl FnOnce(i32, i32) -> i32, v: i32) {
if cb(v, v * 2) == 0i32 {
dbg!("Ok !");
} else {
panic!("Something wrong")
}
}
let mut ret = futures::executor::block_on(once_blocked!(func(->(a, b) -> 0i32, 2 + 3)));
let (a, b) = *ret;
assert_eq!(5, a);
assert_eq!(10, b);
ret.return_value(0).unwrap();
}
// `once_blocked!` with the callback as the first argument, relying on the default `1i32`.
#[test]
fn test_once_blocked_default_prefix() {
fn func(cb: impl FnOnce(i32, i32) -> i32, v: i32) {
if cb(v, v * 2) == 0i32 {
dbg!("Ok !");
} else {
dbg!("Default shutdown..");
}
}
let ret = futures::executor::block_on(once_blocked!(func(->(a, b) -> 1i32, 2 + 3)));
let (a, b) = *ret;
assert_eq!(5, a);
assert_eq!(10, b);
}
// `once_blocked!` with a leading parameterless callback, relying on the default `3i32`.
#[test]
fn test_once_blocked_default_prefix_no_args() {
fn func(cb: impl FnOnce() -> i32, _v: i32) {
if cb() == 3i32 {
dbg!("Ok !");
} else {
panic!("Invalid return value")
}
}
futures::executor::block_on(once_blocked!(func(->() -> {3i32}, 2 + 3)));
}
// `once_blocked!` with the callback between two arguments and an explicit return value.
#[test]
fn test_once_blocked_infix() {
fn func(u: i32, cb: impl FnOnce(i32, i32) -> i32, v: i32) {
if cb(u + v, u * v) == 0i32 {
dbg!("Ok !");
} else {
panic!("Something wrong")
}
}
let mut ret = futures::executor::block_on(once_blocked!(func(2i32, ->(a, b) -> 0i32, 2 + 3)));
let (a, b) = *ret;
assert_eq!(7, a);
assert_eq!(10, b);
ret.return_value(0).unwrap();
}
// `once_blocked!` with the callback between two arguments, relying on the default `0i32`.
#[test]
fn test_once_blocked_default_infix() {
fn func(u: i32, cb: impl FnOnce(i32, i32) -> i32, v: i32) {
if cb(u + v, u * v) == 0i32 {
dbg!("Ok !");
} else {
panic!("Something wrong")
}
}
let ret = futures::executor::block_on(once_blocked!(func(2i32, ->(a, b) -> 0i32, 2 + 3)));
let (a, b) = *ret;
assert_eq!(7, a);
assert_eq!(10, b);
}
// NOTE(review): despite the `_no_args` name, the callback here takes two parameters; the test
// differs from the previous one only in dropping the result without inspecting it.
#[test]
fn test_once_blocked_default_infix_no_args() {
fn func(u: i32, cb: impl FnOnce(i32, i32) -> i32, v: i32) {
if cb(u + v, u * v) == 0i32 {
dbg!("Ok !");
} else {
panic!("Something wrong")
}
}
futures::executor::block_on(once_blocked!(func(2i32, ->(a, b) -> 0i32, 2 + 3)));
}
// `stream!` with a trailing callback invoked five times; checks every item and the item count.
#[test]
fn test_stream_postfix() {
use futures::stream::StreamExt;
fn func(v: i32, mut cb: impl FnMut(i32, i32)) {
for i in 0..5 {
cb(v, v * i)
}
}
let mut counter = 0;
futures::executor::block_on(stream!(func(2 + 3, ->(a, b))).enumerate().for_each(|(i, fut)| {
counter += 1;
async move {
let (a, b) = fut;
assert_eq!(5, a);
assert_eq!(5 * i as i32, b);
}
}));
assert_eq!(5, counter);
}
// `stream!` with the callback as the first argument.
#[test]
fn test_stream_prefix() {
use futures::stream::StreamExt;
fn func(mut cb: impl FnMut(i32, i32), v: i32) {
for i in 0..5 {
cb(v, v * i)
}
}
let mut counter = 0;
futures::executor::block_on(stream!(func(->(a, b), 2 + 3)).enumerate().for_each(|(i, fut)| {
counter += 1;
async move {
let (a, b) = fut;
assert_eq!(5, a);
assert_eq!(5 * i as i32, b);
}
}));
assert_eq!(5, counter);
}
// `stream!` with the callback between two ordinary arguments.
#[test]
fn test_stream_infix() {
use futures::stream::StreamExt;
fn func(u: i32, mut cb: impl FnMut(i32, i32), v: i32) {
for i in 0..5 {
cb(u + i, v * i)
}
}
let mut counter = 0;
futures::executor::block_on(stream!(func(2 * 3, ->(a, b), 2 + 3)).enumerate().for_each(|(i, fut)| {
counter += 1;
async move {
let (a, b) = fut;
assert_eq!(2 * 3 + i as i32, a);
assert_eq!((2 + 3) * i as i32, b);
}
}));
assert_eq!(5, counter);
}
// `stream!` with a leading parameterless callback; only the number of items is checked.
#[test]
fn test_stream_prefix_no_args() {
use futures::stream::StreamExt;
fn func(mut cb: impl FnMut(), _v: i32) {
for _ in 0..5 {
cb()
}
}
let mut counter = 0;
futures::executor::block_on(stream!(func(->(), 2 + 3)).for_each(|_| {
counter += 1;
async {}
}));
assert_eq!(5, counter);
}
// `stream_blocked!` with a trailing callback; every item is answered with its index,
// which `func` ignores.
#[test]
fn test_stream_blocked_postfix() {
use futures::stream::StreamExt;
fn func(v: i32, mut cb: impl FnMut(i32, i32)->i32) {
for i in 0..5 {
cb(v, v * i);
}
}
let mut counter = 0;
futures::executor::block_on(stream_blocked!(func(2 + 3, ->(a, b)->0i32)).enumerate().for_each(|(i, fut)| {
counter += 1;
async move {
let mut result = fut;
result.return_value(i as i32).unwrap();
let (a, b) = *result;
assert_eq!(5, a);
assert_eq!(5 * i as i32, b);
}
}));
assert_eq!(5, counter);
}
// `stream_blocked!` with the callback as the first argument.
#[test]
fn test_stream_blocked_prefix() {
use futures::stream::StreamExt;
fn func(mut cb: impl FnMut(i32, i32)->i32, v: i32) {
for i in 0..5 {
cb(v, v * i);
}
}
let mut counter = 0;
futures::executor::block_on(stream_blocked!(func(->(a, b)->0i32, 2 + 3)).enumerate().for_each(|(i, fut)| {
counter += 1;
async move {
let mut result = fut;
result.return_value(i as i32).unwrap();
let (a, b) = *result;
assert_eq!(5, a);
assert_eq!(5 * i as i32, b);
}
}));
assert_eq!(5, counter);
}
// `stream_blocked!` with the callback between two arguments; here `func` feeds the value
// returned for one item into the arguments of the next callback invocation.
#[test]
fn test_stream_blocked_infix() {
use futures::stream::StreamExt;
fn func(u: i32, mut cb: impl FnMut(i32, i32)->i32, v: i32) {
let mut j = 0;
for _ in 0..5 {
j = cb(u + j, v * j);
}
}
let mut counter = 0;
futures::executor::block_on(stream_blocked!(func(2, ->(a, b)->0i32, 2 + 3)).enumerate().for_each(|(i, fut)| {
counter += 1;
async move {
let mut result = fut;
result.return_value(i as i32 + 1i32).unwrap();
let (a, b) = *result;
assert_eq!(i as i32 + 2i32, a);
assert_eq!((3i32 + 2i32) * i as i32, b);
}
}));
assert_eq!(5, counter);
}
}