use std::sync::{Arc, Mutex};
#[cfg(feature = "for_futures")]
use super::common::shared_thread_pool;
#[cfg(feature = "for_futures")]
use crate::futures::task::SpawnExt;
#[cfg(feature = "for_futures")]
use std::error::Error;
use super::handler::Handler;
use super::sync::CountDownLatch;
use super::common::{RawFunc, Subscription, SubscriptionFunc};
/// A deferred computation producing a `Y`, together with the optional
/// handlers that `subscribe` posts work to.
///
/// All fields are reference-counted, so cloning a `MonadIO` yields a second
/// handle onto the *same* underlying effect closure and handlers.
pub struct MonadIO<Y> {
    /// The wrapped effect. Stored behind `Arc<Mutex<..>>` so that clones and
    /// derived values (`map`, `fmap`) can invoke the same `FnMut`.
    effect: Arc<Mutex<dyn FnMut() -> Y + Send + Sync + 'static>>,
    /// Handler that `subscribe` posts to first, when present.
    ob_handler: Option<Arc<Mutex<dyn Handler>>>,
    /// Handler that the task posted to `ob_handler` re-posts to, when present.
    sub_handler: Option<Arc<Mutex<dyn Handler>>>,
}

// Implemented by hand rather than derived: `#[derive(Clone)]` would add a
// `Y: Clone` bound even though only `Arc`s are cloned here and no `Y` value is
// ever duplicated.
impl<Y> Clone for MonadIO<Y> {
    fn clone(&self) -> Self {
        MonadIO {
            effect: self.effect.clone(),
            ob_handler: self.ob_handler.clone(),
            sub_handler: self.sub_handler.clone(),
        }
    }
}
/// Builds a closure that returns a fresh clone of `r` every time it is called.
///
/// The value is captured directly by the closure; each call costs exactly one
/// `Z::clone`, with no intermediate heap allocation.
pub fn of<Z: 'static + Send + Sync + Clone>(r: Z) -> impl FnMut() -> Z + Send + Sync + 'static {
    move || r.clone()
}
impl<Y: 'static + Send + Sync + Clone> From<Y> for MonadIO<Y> {
fn from(r: Y) -> Self {
MonadIO::just(r)
}
}
impl<Y: 'static + Send + Sync + Clone> MonadIO<Y> {
    /// Creates a `MonadIO` whose effect returns a clone of `r` each time it runs.
    pub fn just(r: Y) -> MonadIO<Y> {
        Self::new(of(r))
    }

    /// Evaluates this `MonadIO` on the shared thread pool and resolves to the
    /// produced value.
    ///
    /// A clone of `self` is moved into a task that calls `eval()`; the returned
    /// handle is then awaited.
    ///
    /// # Errors
    /// Propagates the error returned by `spawn_with_handle` if the task cannot
    /// be spawned.
    ///
    /// # Panics
    /// Panics if the thread pool's mutex is poisoned.
    #[cfg(feature = "for_futures")]
    pub async fn to_future(&self) -> Result<Arc<Y>, Box<dyn Error>> {
        let io = self.clone();
        // The spawn is kept inside its own block so the pool's lock guard is
        // released before the handle is awaited.
        let handle = {
            shared_thread_pool()
                .inner
                .lock()
                .unwrap()
                .spawn_with_handle(async move { io.eval() })?
        };
        Ok(handle.await)
    }
}
impl<Y: 'static + Send + Sync> MonadIO<Y> {
    /// Wraps `effect` in a `MonadIO` with no handlers attached.
    pub fn new(effect: impl FnMut() -> Y + Send + Sync + 'static) -> MonadIO<Y> {
        MonadIO::new_with_handlers(effect, None, None)
    }

    /// Wraps `effect` in a `MonadIO` using `ob` as the observe handler and
    /// `sub` as the subscribe handler (either may be `None`).
    pub fn new_with_handlers(
        effect: impl FnMut() -> Y + Send + Sync + 'static,
        ob: Option<Arc<Mutex<dyn Handler + 'static>>>,
        sub: Option<Arc<Mutex<dyn Handler + 'static>>>,
    ) -> MonadIO<Y> {
        MonadIO {
            effect: Arc::new(Mutex::new(effect)),
            ob_handler: ob,
            sub_handler: sub,
        }
    }

    /// Replaces the observe handler (the one `subscribe` posts to first).
    pub fn observe_on(&mut self, h: Option<Arc<Mutex<dyn Handler + 'static>>>) {
        self.ob_handler = h;
    }

    /// Replaces the subscribe handler (the one the observe-handler task posts to).
    pub fn subscribe_on(&mut self, h: Option<Arc<Mutex<dyn Handler + 'static>>>) {
        self.sub_handler = h;
    }

    /// Returns a new `MonadIO` whose effect runs this one's effect and then
    /// applies `func` to the result. Both handlers are carried over.
    ///
    /// The upstream effect stays locked while `func` runs, so `func` must not
    /// re-enter the same effect.
    pub fn map<Z: 'static + Send + Sync>(
        &self,
        mut func: impl FnMut(Y) -> Z + Send + Sync + 'static,
    ) -> MonadIO<Z> {
        let upstream = self.effect.clone();
        // `func` is owned by the new effect closure, which `new_with_handlers`
        // already places behind its own mutex; no extra locking is needed.
        MonadIO::new_with_handlers(
            move || func((upstream.lock().unwrap())()),
            self.ob_handler.clone(),
            self.sub_handler.clone(),
        )
    }

    /// Like `map`, but `func` returns a `MonadIO<Z>` whose effect is run
    /// immediately to obtain the `Z`.
    ///
    /// Only the inner value's effect is used; the handlers of the `MonadIO`
    /// returned by `func` are not consulted.
    pub fn fmap<Z: 'static + Send + Sync>(
        &self,
        mut func: impl FnMut(Y) -> MonadIO<Z> + Send + Sync + 'static,
    ) -> MonadIO<Z> {
        self.map(move |y: Y| (func(y).effect.lock().unwrap())())
    }

    /// Runs the effect and delivers its result to `s.on_next`.
    ///
    /// * No observe handler: the effect runs on the calling thread and `s` is
    ///   notified before this method returns. The subscribe handler is not
    ///   used in this case.
    /// * Observe handler only: a task is posted to it that runs the effect and
    ///   notifies `s`.
    /// * Both handlers: the task posted to the observe handler posts a second
    ///   task to the subscribe handler, which runs the effect and notifies `s`.
    ///
    /// In every case the effect's lock is released before `on_next` is called,
    /// so a subscriber may evaluate the same effect again without deadlocking.
    ///
    /// # Panics
    /// Panics if a handler mutex or the effect mutex is poisoned.
    pub fn subscribe(&self, s: Arc<impl Subscription<Y>>) {
        let effect = self.effect.clone();
        match &self.ob_handler {
            Some(ob_handler) => {
                let sub_handler = self.sub_handler.clone();
                ob_handler.lock().unwrap().post(RawFunc::new(move || {
                    match &sub_handler {
                        Some(sub_handler) => {
                            // Fresh handles for the nested task; the outer
                            // closure keeps its own so it can run again.
                            let effect = effect.clone();
                            let s = s.clone();
                            sub_handler.lock().unwrap().post(RawFunc::new(move || {
                                let result = Arc::new(effect.lock().unwrap()());
                                s.on_next(result);
                            }));
                        }
                        None => {
                            let result = Arc::new(effect.lock().unwrap()());
                            s.on_next(result);
                        }
                    }
                }));
            }
            None => {
                let result = Arc::new(effect.lock().unwrap()());
                s.on_next(result);
            }
        }
    }

    /// Convenience wrapper: subscribes with a plain closure.
    pub fn subscribe_fn(&self, func: impl FnMut(Arc<Y>) + Send + Sync + 'static) {
        self.subscribe(Arc::new(SubscriptionFunc::new(func)))
    }

    /// Runs the effect and blocks the calling thread until its result has been
    /// delivered, then returns that result.
    ///
    /// # Panics
    /// Panics if the internal result mutex is poisoned or if the latch is
    /// released without a value having been stored.
    pub fn eval(&self) -> Arc<Y> {
        let latch = CountDownLatch::new(1);
        let latch_thread = latch.clone();
        let slot = Arc::new(Mutex::new(None::<Arc<Y>>));
        let slot_thread = slot.clone();
        self.subscribe_fn(move |y| {
            // Store first, then release the waiter, so the value is visible
            // once `wait` returns.
            slot_thread.lock().unwrap().replace(y);
            latch_thread.countdown();
        });
        latch.wait();
        let delivered = slot.lock().unwrap().clone();
        delivered.expect("MonadIO::eval: latch released before a value was delivered")
    }
}
/// Checks that `eval` and `to_future` agree, both for a plain value and for a
/// mapped one.
#[cfg(feature = "for_futures")]
#[futures_test::test]
async fn test_monadio_async() {
    // Blocking evaluation.
    let evaluated = MonadIO::just(3).eval();
    assert_eq!(Arc::new(3), evaluated);

    // Same value through the future-based path.
    let awaited = MonadIO::just(3).to_future().await.ok().unwrap();
    assert_eq!(Arc::new(3), awaited);

    // A mapped effect through the future-based path.
    let doubled = MonadIO::just(3)
        .map(|i| i * 2)
        .to_future()
        .await
        .ok()
        .unwrap();
    assert_eq!(Arc::new(6), doubled);
}
/// Exercises `MonadIO` synchronously (`just`, `map`, `fmap`, `subscribe`) and
/// then with both handlers attached, waiting on a latch for the async result.
#[test]
fn test_monadio_new() {
    use super::common::SubscriptionFunc;
    use super::handler::HandlerThread;
    use super::sync::CountDownLatch;
    use std::sync::Arc;
    use std::{thread, time};

    // --- Synchronous: run the raw effect directly. ---
    let three = MonadIO::just(3);
    let raw = (three.effect.lock().unwrap())();
    assert_eq!(3, raw);

    // --- Synchronous: map + subscribe_fn. ---
    let tripled = three.map(|x| x * 3);
    tripled.subscribe_fn(move |x| {
        println!("monadio_simple_map {:?}", x);
        assert_eq!(9, *x);
    });

    // --- Synchronous: fmap/map chain delivered to an explicit subscription. ---
    let sync_subscription = Arc::new(SubscriptionFunc::new(move |x: Arc<u16>| {
        println!("monadio_sync {:?}", x);
        assert_eq!(36, *x);
    }));
    let sync_subscription_handle = sync_subscription.clone();
    let chained = MonadIO::just(1)
        .fmap(|x| MonadIO::new(move || x * 4))
        .map(|x| x * 3)
        .map(|x| x * 3);
    chained.subscribe(sync_subscription_handle);

    // --- Asynchronous: both handlers attached. ---
    let observe_handler = HandlerThread::new_with_mutex();
    let subscribe_handler = HandlerThread::new_with_mutex();
    let monadio_async = MonadIO::new_with_handlers(
        || {
            println!("In string");
            String::from("ok")
        },
        Some(observe_handler.clone()),
        Some(subscribe_handler.clone()),
    );

    let latch = CountDownLatch::new(1);
    let latch_for_callback = latch.clone();

    thread::sleep(time::Duration::from_millis(1));

    // Subscriptions are registered before the handlers are started.
    let first_subscription = Arc::new(SubscriptionFunc::new(move |x: Arc<String>| {
        println!("monadio_async {:?}", x);
        latch_for_callback.countdown();
    }));
    monadio_async.subscribe(first_subscription);
    monadio_async.subscribe(Arc::new(SubscriptionFunc::new(move |x: Arc<String>| {
        println!("monadio_async sub2 {:?}", x);
    })));

    {
        // Both guards are held for the whole block and released together.
        let mut observe = observe_handler.lock().unwrap();
        let mut subscribe = subscribe_handler.lock().unwrap();

        println!("hh2");
        observe.start();
        subscribe.start();
        println!("hh2 running");

        // Five no-op tasks queued behind the subscriptions.
        for _ in 0..5 {
            observe.post(RawFunc::new(move || {}));
        }
    }

    thread::sleep(time::Duration::from_millis(1));

    // Block until the first async subscription has fired.
    latch.clone().wait();
}