use std::{
panic,
rc::Rc,
any::Any,
cell::UnsafeCell,
mem::MaybeUninit,
collections::HashMap,
sync::{
Arc,
atomic::{AtomicU32, Ordering},
},
};
use omango_util::lock::Spinlock;
use crate::{
wg::WaitGroup,
error::WrapError,
};
pub type Fn<T> = fn() -> Result<T, WrapError>;
macro_rules! get_result {
($call:expr) => {
unsafe { (*$call.result.get()).assume_init_ref().clone() }
};
}
macro_rules! set_result {
($call:expr, $result:expr) => {
unsafe { $call.result.get().write(MaybeUninit::new(Arc::new($result))) }
};
}
struct Call<T: Any> {
wg: WaitGroup,
count: AtomicU32,
result: UnsafeCell<MaybeUninit<Arc<Result<T, WrapError>>>>,
}
impl<T: Any> Default for Call<T> {
#[inline(always)]
fn default() -> Self {
Self {
wg: WaitGroup::default(),
count: AtomicU32::new(1),
result: UnsafeCell::new(MaybeUninit::uninit()),
}
}
}
pub struct Group {
guard: Spinlock<HashMap<String, Rc<dyn Any>>>,
}
impl Default for Group {
#[inline(always)]
fn default() -> Self {
Self {
guard: Spinlock::new(HashMap::default()),
}
}
}
impl Group {
pub fn exec<T: Any>(&self, key: &str, func: Fn<T>) -> (Arc<Result<T, WrapError>>, u32) {
match self.get(key) {
Some(any) => {
let call = any.downcast_ref::<Call<T>>().unwrap();
call.count.fetch_add(1, Ordering::Relaxed);
call.wg.wait();
(get_result!(call), call.count.load(Ordering::Relaxed))
}
None => {
let oc = Rc::<Call<T>>::default();
let call = oc.clone();
oc.wg.add(1);
self.guard.lock().insert(key.to_string(), oc);
let result = panic::catch_unwind(|| {
func()
});
let out = match result {
Ok(result) => {
set_result!(call, result);
(get_result!(call), call.count.load(Ordering::Relaxed))
}
Err(_) => {
set_result!(call, Err(WrapError("function of user panic".to_string())));
(get_result!(call), call.count.load(Ordering::Relaxed))
}
};
call.wg.done();
out
}
}
}
#[inline(always)]
pub fn forgot(&self, key: &str) -> bool {
self.guard.lock().remove(key).is_some()
}
#[allow(clippy::map_clone)]
#[inline(always)]
fn get(&self, key: &str) -> Option<Rc<dyn Any>> {
self.guard.lock().get(key).map(|v| v.clone())
}
}
unsafe impl Send for Group {}
unsafe impl Sync for Group {}
mod test {
#[test]
fn test() {
let g = std::sync::Arc::new(crate::single::flight::Group::default());
let g_clone = g.clone();
let thread = std::thread::spawn(move || {
let (rs, _) = g.exec("google", move || {
std::thread::sleep(std::time::Duration::from_secs(1));
Ok(1i32)
});
match rs.as_ref() {
Ok(v) => assert_eq!(v, &1i32),
Err(_) => panic!("should be success"),
}
});
let (rs, times) = g_clone.exec("google", move || {
std::thread::sleep(std::time::Duration::from_secs(1));
Ok(1i32)
});
thread.join().unwrap();
match rs.as_ref() {
Ok(v) => assert_eq!(v, &1i32),
Err(_) => panic!("should be success"),
}
assert_eq!(times, 2);
g_clone.forgot("google");
}
}