use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::time::Duration;
use crate::singleflight::Group;
/// A lone caller gets back exactly the value its closure produced.
#[tokio::test]
async fn test_simple() {
    let flight = Group::new();

    let outcome = flight.work("key", || async { "val" }).await;

    assert_eq!(outcome, "val");
}
#[tokio::test]
async fn test_coalescing() {
let group = Arc::new(Group::new());
let counter = Arc::new(AtomicUsize::new(0));
let mut handles = Vec::new();
for _ in 0..10 {
let group = group.clone();
let counter = counter.clone();
handles.push(tokio::spawn(async move {
group
.work("key", || async move {
tokio::time::sleep(Duration::from_millis(100)).await;
counter.fetch_add(1, Ordering::SeqCst);
"val"
})
.await
}));
}
for handle in handles {
assert_eq!(handle.await.unwrap(), "val");
}
assert_eq!(counter.load(Ordering::SeqCst), 1);
}
#[tokio::test]
async fn test_multiple_keys() {
let group = Arc::new(Group::new());
let counter = Arc::new(AtomicUsize::new(0));
let g1 = group.clone();
let c1 = counter.clone();
let h1 = tokio::spawn(async move {
g1.work("key1", || async move {
tokio::time::sleep(Duration::from_millis(50)).await;
c1.fetch_add(1, Ordering::SeqCst);
"val1"
})
.await
});
let g2 = group.clone();
let c2 = counter.clone();
let h2 = tokio::spawn(async move {
g2.work("key2", || async move {
tokio::time::sleep(Duration::from_millis(50)).await;
c2.fetch_add(1, Ordering::SeqCst);
"val2"
})
.await
});
assert_eq!(h1.await.unwrap(), "val1");
assert_eq!(h2.await.unwrap(), "val2");
assert_eq!(counter.load(Ordering::SeqCst), 2);
}
#[tokio::test]
async fn test_forget() {
let group = Arc::new(Group::new());
let counter = Arc::new(AtomicUsize::new(0));
let g1 = group.clone();
let c1 = counter.clone();
let h1 = tokio::spawn(async move {
g1.work("key", || async move {
tokio::time::sleep(Duration::from_millis(100)).await;
c1.fetch_add(1, Ordering::SeqCst);
"val1"
})
.await
});
tokio::time::sleep(Duration::from_millis(10)).await;
group.forget(&"key");
let g2 = group.clone();
let c2 = counter.clone();
let h2 = tokio::spawn(async move {
g2.work("key", || async move {
tokio::time::sleep(Duration::from_millis(100)).await;
c2.fetch_add(1, Ordering::SeqCst);
"val2"
})
.await
});
assert_eq!(h1.await.unwrap(), "val1");
assert_eq!(h2.await.unwrap(), "val2");
assert_eq!(counter.load(Ordering::SeqCst), 2);
}
#[tokio::test]
async fn test_panic_safe() {
let group = Arc::new(Group::<&str, String>::new());
let g1 = group.clone();
let h1 = tokio::spawn(async move {
g1.work("key", || async {
panic!("oops");
})
.await
});
let err = h1.await.unwrap_err();
assert!(err.is_panic());
let res = group.work("key", || async { "success".to_string() }).await;
assert_eq!(res, "success");
}
/// Two sequential `try_work` calls on the same key each yield the `Ok` value
/// produced by their own closure.
#[tokio::test]
async fn test_try_work_simple() {
    let flight = Group::new();

    let first = flight
        .try_work("key", || async { Ok::<&str, ()>("val") })
        .await;
    assert_eq!(first, Ok("val"));

    let second = flight
        .try_work("key", || async { Ok::<&str, ()>("val2") })
        .await;
    assert_eq!(second, Ok("val2"));
}
#[tokio::test]
async fn test_try_work_coalescing() {
let group = Arc::new(Group::new());
let counter = Arc::new(AtomicUsize::new(0));
let mut handles = Vec::new();
for _ in 0..10 {
let group = group.clone();
let counter = counter.clone();
handles.push(tokio::spawn(async move {
group
.try_work("key", || async move {
tokio::time::sleep(Duration::from_millis(100)).await;
counter.fetch_add(1, Ordering::SeqCst);
Ok::<&str, ()>("val")
})
.await
}));
}
for handle in handles {
assert_eq!(handle.await.unwrap(), Ok("val"));
}
assert_eq!(counter.load(Ordering::SeqCst), 1);
}
/// An `Err` from the closure is handed back to the caller, and a following
/// `try_work` on the same key returns the `Ok` value of its own closure.
#[tokio::test]
async fn test_try_work_failure() {
    let flight = Group::new();

    let failed = flight
        .try_work("key", || async { Err::<&str, &str>("error") })
        .await;
    assert_eq!(failed, Err("error"));

    let succeeded = flight
        .try_work("key", || async { Ok::<&str, ()>("success") })
        .await;
    assert_eq!(succeeded, Ok("success"));
}
#[tokio::test]
async fn test_try_work_wait_and_retry() {
let group = Arc::new(Group::new());
let counter = Arc::new(AtomicUsize::new(0));
let g1 = group.clone();
let c1 = counter.clone();
let h1 = tokio::spawn(async move {
g1.try_work("key", || async move {
c1.fetch_add(1, Ordering::SeqCst);
tokio::time::sleep(Duration::from_millis(100)).await;
Err::<&str, &str>("fail")
})
.await
});
let g2 = group.clone();
let c2 = counter.clone();
let h2 = tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(10)).await;
g2.try_work("key", || async move {
c2.fetch_add(1, Ordering::SeqCst);
Ok::<&str, ()>("success")
})
.await
});
assert_eq!(h1.await.unwrap(), Err("fail"));
assert_eq!(h2.await.unwrap(), Ok("success"));
assert_eq!(counter.load(Ordering::SeqCst), 2);
}