use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::{AcqRel, Acquire};
use std::time::Duration;
use futures_util::StreamExt;
use futures_util::stream::FuturesUnordered;
use tokio::sync::Notify;
use uniflight::Merger;
/// Returns a future that never resolves.
///
/// The tests hand this to `execute` for callers whose own closure must not
/// be the one that produces the result.
fn unreachable_future() -> std::future::Pending<String> {
    std::future::pending::<String>()
}
/// Waits for `notify` to be signalled, giving up after five seconds.
///
/// # Panics
///
/// Panics with `msg` if the notification does not arrive within the timeout.
async fn await_notify(notify: &Notify, msg: &str) {
    let limit = Duration::from_secs(5);
    let outcome = tokio::time::timeout(limit, notify.notified()).await;
    outcome.expect(msg);
}
/// Executes a single closure under one key and checks that the closure's
/// value comes back as `Ok`.
#[cfg_attr(miri, ignore)]
#[tokio::test]
async fn direct_call() {
    let merger = Merger::<String, String, _>::new_per_process();
    let work = || async {
        tokio::time::sleep(Duration::from_millis(10)).await;
        "Result".to_string()
    };
    let outcome = merger.execute("key", work).await;
    assert_eq!(outcome, Ok(String::from("Result")));
}
#[cfg_attr(miri, ignore)]
#[tokio::test]
async fn parallel_call() {
let call_counter = AtomicUsize::default();
let group = Merger::<String, String, _>::new_per_process();
let futures = FuturesUnordered::new();
for _ in 0..10 {
futures.push(group.execute("key", || async {
tokio::time::sleep(Duration::from_millis(100)).await;
call_counter.fetch_add(1, AcqRel);
"Result".to_string()
}));
}
assert!(futures.all(|out| async move { out == Ok("Result".to_string()) }).await);
assert_eq!(call_counter.load(Acquire), 1);
}
/// Creates ten `execute` futures for the same key up front, then awaits them
/// one after another; each must yield "Result" and the closure counter must
/// end at exactly one.
#[cfg_attr(miri, ignore)]
#[tokio::test]
async fn parallel_call_seq_await() {
    let invocations = AtomicUsize::default();
    let merger = Merger::<String, String, _>::new_per_process();
    let queued: Vec<_> = (0..10)
        .map(|_| {
            merger.execute("key", || async {
                tokio::time::sleep(Duration::from_millis(100)).await;
                invocations.fetch_add(1, AcqRel);
                "Result".to_string()
            })
        })
        .collect();
    for pending in queued {
        let outcome = pending.await;
        assert_eq!(outcome, Ok("Result".to_string()));
    }
    assert_eq!(invocations.load(Acquire), 1);
}
/// Passes a `&'static str` literal as the key and checks that the closure's
/// value is returned.
#[cfg_attr(miri, ignore)]
#[tokio::test]
async fn call_with_static_str_key() {
    let merger = Merger::<String, String, _>::new_per_process();
    let key: &'static str = "key";
    let pending = merger.execute(key, || async {
        tokio::time::sleep(Duration::from_millis(1)).await;
        "Result".to_string()
    });
    let outcome = pending.await;
    assert_eq!(outcome, Ok(String::from("Result")));
}
/// Passes a reference to an owned `String` as the key and checks that the
/// closure's value is returned.
#[cfg_attr(miri, ignore)]
#[tokio::test]
async fn call_with_static_string_key() {
    let group = Merger::<String, String, _>::new_per_process();
    // Use `&String` rather than a `&str` literal: with a literal this test
    // would be a byte-for-byte duplicate of `call_with_static_str_key` and
    // the `String`-key form would go unexercised.
    let key = "key".to_string();
    let result = group
        .execute(&key, || async {
            tokio::time::sleep(Duration::from_millis(1)).await;
            "Result".to_string()
        })
        .await;
    assert_eq!(result, Ok("Result".to_string()));
}
/// Uses a locally defined newtype (`Clone + PartialEq + Eq + Hash`) as the
/// key type and checks that the closure's value is returned.
#[cfg_attr(miri, ignore)]
#[tokio::test]
async fn call_with_custom_key() {
    #[derive(Clone, PartialEq, Eq, Hash)]
    struct K(i32);

    let merger = Merger::<K, String, _>::new_per_process();
    let key = K(1);
    let outcome = merger
        .execute(&key, || async {
            tokio::time::sleep(Duration::from_millis(1)).await;
            "Result".to_string()
        })
        .await;
    assert_eq!(outcome, Ok(String::from("Result")));
}
/// Creates two `execute` futures for the same key, awaits the first, sleeps,
/// and only then awaits the second.
///
/// The second future was given a never-resolving closure, so the test can
/// only finish if it receives the first caller's "Result".
#[cfg_attr(miri, ignore)]
#[tokio::test]
async fn late_wait() {
    let merger = Merger::<String, String, _>::new_per_process();
    let first = merger.execute("key", || async {
        tokio::time::sleep(Duration::from_millis(20)).await;
        "Result".to_string()
    });
    let second = merger.execute("key", unreachable_future);

    let first_outcome = first.await;
    assert_eq!(first_outcome, Ok("Result".to_string()));

    // Let time pass before the second future is polled at all.
    tokio::time::sleep(Duration::from_millis(50)).await;

    let second_outcome = second.await;
    assert_eq!(second_outcome, Ok("Result".to_string()));
}
/// Exercises the key after an `execute` future is abandoned.
///
/// Phase 1: a never-resolving call is wrapped in a 10 ms timeout and its
/// outcome is discarded. Phase 2: a fresh call on the same key must return
/// its own "Result2". Phase 3: a 2 s call and a never-resolving call on the
/// same key are joined; both must yield "Result1" and more than 1.5 s must
/// have elapsed.
#[cfg_attr(miri, ignore)]
#[tokio::test]
async fn cancel() {
    let merger = Merger::<String, String, _>::new_per_process();
    let owned_key = "key".to_string();

    // Phase 1: start a call that can never finish and abandon it via timeout.
    let abandoned = merger.execute(&owned_key, unreachable_future);
    let _ = tokio::time::timeout(Duration::from_millis(10), abandoned).await;

    // Phase 2: the same key must be usable again.
    let retry = merger.execute("key", || async { "Result2".to_string() });
    assert_eq!(retry.await, Ok("Result2".to_string()));

    // Phase 3: two concurrent callers on one key share the slow result.
    let started = tokio::time::Instant::now();
    let slow = merger.execute("key", || async {
        tokio::time::sleep(Duration::from_secs(2)).await;
        "Result1".to_string()
    });
    let never = merger.execute(&owned_key, unreachable_future);
    let (slow_outcome, never_outcome) = tokio::join!(slow, never);
    assert_eq!(slow_outcome, Ok("Result1".to_string()));
    assert_eq!(never_outcome, Ok("Result1".to_string()));
    assert!(started.elapsed() > Duration::from_millis(1500));
}
/// Spawns one task whose closure panics and a second task on the same key,
/// then checks that both tasks receive an error whose message is
/// "leader panicked".
///
/// Two `Notify` handles order the tasks: the second task is spawned only
/// after the first has created its `execute` future, and the first task's
/// closure panics only after the second has created its own.
#[cfg_attr(miri, ignore)]
#[tokio::test]
async fn leader_panic_returns_error_to_all() {
    let merger: Arc<Merger<String, String>> = Arc::new(Merger::new());
    let leader_ready = Arc::new(Notify::new());
    let follower_ready = Arc::new(Notify::new());

    let leader_task = {
        let merger = Arc::clone(&merger);
        let leader_ready = Arc::clone(&leader_ready);
        let follower_ready = Arc::clone(&follower_ready);
        tokio::spawn(async move {
            let pending = merger.execute("key", || async move {
                await_notify(&follower_ready, "follower should register before timeout").await;
                panic!("leader panicked")
            });
            leader_ready.notify_one();
            pending.await
        })
    };

    await_notify(&leader_ready, "leader should register before timeout").await;

    let follower_task = {
        let merger = Arc::clone(&merger);
        let follower_ready = Arc::clone(&follower_ready);
        tokio::spawn(async move {
            let pending = merger.execute("key", || async {
                "follower result".to_string()
            });
            follower_ready.notify_one();
            pending.await
        })
    };

    let leader_outcome = leader_task
        .await
        .expect("task should not panic - panic is caught");
    let leader_err = leader_outcome.unwrap_err();
    assert_eq!(leader_err.message(), "leader panicked");

    let follower_outcome = follower_task
        .await
        .expect("follower task should not panic");
    let follower_err = follower_outcome.unwrap_err();
    assert_eq!(follower_err.message(), "leader panicked");
}
/// Checks the `Debug` output of a `Merger` twice: before any work is
/// submitted, and after an `execute` future has been created but not yet
/// awaited.
#[cfg_attr(miri, ignore)]
#[tokio::test]
async fn debug_impl() {
    let merger: Merger<String, String> = Merger::new();

    let idle_repr = format!("{:?}", merger);
    assert!(idle_repr.contains("Merger"));

    let pending = merger.execute("key", || async {
        tokio::time::sleep(Duration::from_millis(100)).await;
        "Result".to_string()
    });

    let busy_repr = format!("{:?}", merger);
    assert!(busy_repr.contains("Merger"));
    assert!(busy_repr.contains("DashMap"));

    let outcome = pending.await;
    assert_eq!(outcome, Ok("Result".to_string()));
}
/// Builds a `Merger` with `new_per_process` and checks that a single
/// `execute` call returns the closure's value.
#[cfg_attr(miri, ignore)]
#[tokio::test]
async fn per_process_strategy() {
    let merger = Merger::<String, String, _>::new_per_process();
    let pending = merger.execute("key", || async { "Result".to_string() });
    let outcome = pending.await;
    assert_eq!(outcome, Ok(String::from("Result")));
}
/// Builds a `Merger` with `new_per_numa` and checks that a single `execute`
/// call returns the closure's value.
#[cfg_attr(miri, ignore)]
#[tokio::test]
async fn per_numa_strategy() {
    let merger = Merger::<String, String, _>::new_per_numa();
    let pending = merger.execute("key", || async { "Result".to_string() });
    let outcome = pending.await;
    assert_eq!(outcome, Ok(String::from("Result")));
}
/// Builds a `Merger` with `new_per_core` and checks that a single `execute`
/// call returns the closure's value.
#[cfg_attr(miri, ignore)]
#[tokio::test]
async fn per_core_strategy() {
    let merger = Merger::<String, String, _>::new_per_core();
    let pending = merger.execute("key", || async { "Result".to_string() });
    let outcome = pending.await;
    assert_eq!(outcome, Ok(String::from("Result")));
}
/// Calls `execute` on a `Merger` and on its clone with the same key, joins
/// both futures, and checks that both yield the first closure's "Result"
/// while the shared counter shows only one closure ran.
#[cfg_attr(miri, ignore)]
#[tokio::test]
async fn clone_shares_state() {
    let original = Merger::<String, String, _>::new_per_process();
    let duplicate = original.clone();
    let invocations = AtomicUsize::default();

    let via_original = original.execute("key", || async {
        tokio::time::sleep(Duration::from_millis(50)).await;
        invocations.fetch_add(1, AcqRel);
        "Result".to_string()
    });
    let via_duplicate = duplicate.execute("key", || async {
        invocations.fetch_add(1, AcqRel);
        "Unreachable".to_string()
    });

    let (first_outcome, second_outcome) = tokio::join!(via_original, via_duplicate);
    assert_eq!(first_outcome, Ok("Result".to_string()));
    assert_eq!(second_outcome, Ok("Result".to_string()));
    assert_eq!(invocations.load(Acquire), 1);
}
/// Triggers a panic inside the executed closure and inspects the returned
/// error: its `message()`, `Display` and `Debug` output, `Clone` and
/// `PartialEq` behaviour, and that `std::error::Error::source` is `None`.
#[cfg_attr(miri, ignore)]
#[tokio::test]
async fn leader_panicked_error_traits() {
    let merger: Merger<String, String> = Merger::new();
    let outcome = merger.execute("key", || async { panic!("test message") }).await;
    let error = match outcome {
        Err(error) => error,
        Ok(_) => panic!("expected Err"),
    };

    // Accessor.
    assert_eq!(error.message(), "test message");

    // Display.
    let shown = error.to_string();
    assert!(shown.contains("leader task panicked"));
    assert!(shown.contains("test message"));

    // Debug.
    let debugged = format!("{:?}", error);
    assert!(debugged.contains("LeaderPanicked"));

    // Clone + PartialEq.
    let copy = error.clone();
    assert_eq!(copy.message(), error.message());
    assert_eq!(error, copy);

    // std::error::Error, used through a trait object.
    let as_std_error: &dyn std::error::Error = &error;
    assert!(as_std_error.source().is_none());
}
/// Runs a panicking closure under a key, checks the reported message, then
/// runs a second closure under the same key and checks that it succeeds.
#[cfg_attr(miri, ignore)]
#[tokio::test]
async fn retry_after_panic_succeeds() {
    let merger: Merger<String, String> = Merger::new();

    let first_attempt = merger.execute("key", || async { panic!("intentional panic") }).await;
    let failure = match first_attempt {
        Err(failure) => failure,
        Ok(_) => panic!("expected Err"),
    };
    assert_eq!(failure.message(), "intentional panic");

    let second_attempt = merger.execute("key", || async { "success".to_string() }).await;
    assert_eq!(second_attempt, Ok(String::from("success")));
}
/// Builds one `Merger` via `Default` and one via `new`, and checks that an
/// `execute` call on each returns the closure's value.
#[cfg_attr(miri, ignore)]
#[tokio::test]
async fn default_impl() {
    let from_default: Merger<String, String> = Default::default();
    let from_new: Merger<String, String> = Merger::new();

    let default_outcome = from_default.execute("key", || async { "value".to_string() }).await;
    let new_outcome = from_new.execute("key", || async { "value".to_string() }).await;

    assert_eq!(default_outcome, Ok(String::from("value")));
    assert_eq!(new_outcome, Ok(String::from("value")));
}
/// Joins two `execute` calls on different keys, one whose closure panics and
/// one whose closure succeeds, and checks that each key gets its own
/// outcome.
#[cfg_attr(miri, ignore)]
#[tokio::test]
async fn mixed_panic_and_success() {
    let merger: Merger<String, String> = Merger::new();

    let failing = merger.execute("panic_key", || async {
        tokio::time::sleep(Duration::from_millis(10)).await;
        panic!("intentional panic")
    });
    let succeeding = merger.execute("success_key", || async {
        tokio::time::sleep(Duration::from_millis(10)).await;
        "success".to_string()
    });

    let (failing_outcome, succeeding_outcome) = tokio::join!(failing, succeeding);

    let failure = match failing_outcome {
        Err(failure) => failure,
        Ok(_) => panic!("expected Err"),
    };
    assert_eq!(failure.message(), "intentional panic");
    assert_eq!(succeeding_outcome, Ok("success".to_string()));
}
/// Spawns one task whose closure panics and a second task on the same key
/// whose closure bumps a counter, then checks that both tasks see the
/// "leader panic" error and that the counter is still zero.
///
/// Two `Notify` handles order the tasks: the second task is spawned only
/// after the first has created its `execute` future, and the first task's
/// closure panics only after the second has created its own.
#[cfg_attr(miri, ignore)]
#[tokio::test]
async fn follower_closure_not_called_on_panic() {
    let merger: Arc<Merger<String, String>> = Arc::new(Merger::new());
    let follower_runs = Arc::new(AtomicUsize::new(0));
    let leader_ready = Arc::new(Notify::new());
    let follower_ready = Arc::new(Notify::new());

    let leader_task = {
        let merger = Arc::clone(&merger);
        let leader_ready = Arc::clone(&leader_ready);
        let follower_ready = Arc::clone(&follower_ready);
        tokio::spawn(async move {
            let pending = merger.execute("key", || async move {
                await_notify(&follower_ready, "follower should register before timeout").await;
                panic!("leader panic")
            });
            leader_ready.notify_one();
            pending.await
        })
    };

    await_notify(&leader_ready, "leader should register before timeout").await;

    let follower_task = {
        let merger = Arc::clone(&merger);
        let follower_runs = Arc::clone(&follower_runs);
        let follower_ready = Arc::clone(&follower_ready);
        tokio::spawn(async move {
            let pending = merger.execute("key", || async {
                follower_runs.fetch_add(1, AcqRel);
                "follower result".to_string()
            });
            follower_ready.notify_one();
            pending.await
        })
    };

    let leader_outcome = leader_task.await.expect("task join");
    let leader_err = leader_outcome.unwrap_err();
    assert_eq!(leader_err.message(), "leader panic");

    let follower_outcome = follower_task.await.expect("task join");
    let follower_err = follower_outcome.unwrap_err();
    assert_eq!(follower_err.message(), "leader panic");

    assert_eq!(follower_runs.load(Acquire), 0);
}