use std::sync::Arc;
use std::time::Duration;
use dashmap::DashMap;
use tokio::sync::Notify;
pub(super) const PENDING_WAIT_TIMEOUT: Duration = Duration::from_millis(5);
#[cfg(test)]
pub(super) const PENDING_ENTRY_TTL_MS: u64 = 100;
pub(super) fn register(pending: &DashMap<String, Arc<Notify>>, key: &str) -> Arc<Notify> {
pending
.entry(key.to_string())
.or_insert_with(|| Arc::new(Notify::new()))
.clone()
}
pub(super) fn complete(pending: &DashMap<String, Arc<Notify>>, key: &str) {
if let Some((_, notify)) = pending.remove(key) {
notify.notify_waiters();
}
}
pub(super) async fn await_pending(
pending: &DashMap<String, Arc<Notify>>,
key: &str,
timeout: Duration,
) -> bool {
let Some(notify) = pending.get(key).map(|entry| Arc::clone(&entry)) else {
return false;
};
let capped = timeout.min(PENDING_WAIT_TIMEOUT);
if capped.is_zero() {
return true;
}
let _ = tokio::time::timeout(capped, notify.notified()).await;
true
}
#[cfg(test)]
mod tests {
use super::*;
use std::time::Instant;
#[tokio::test]
async fn register_is_idempotent_for_same_key() {
let pending: DashMap<String, Arc<Notify>> = DashMap::new();
let a = register(&pending, "deadbeef");
let b = register(&pending, "deadbeef");
assert!(Arc::ptr_eq(&a, &b));
assert_eq!(pending.len(), 1);
}
#[tokio::test]
async fn complete_notifies_waiters_and_removes_entry() {
let pending: Arc<DashMap<String, Arc<Notify>>> = Arc::new(DashMap::new());
let _notify = register(&pending, "feedface");
let pending_for_waiter = Arc::clone(&pending);
let wait = tokio::spawn(async move {
await_pending(&pending_for_waiter, "feedface", PENDING_WAIT_TIMEOUT).await
});
tokio::time::sleep(Duration::from_millis(1)).await;
complete(&pending, "feedface");
let observed = wait.await.unwrap();
assert!(observed, "waiter must observe pending entry");
assert!(pending.is_empty(), "complete must remove entry");
}
#[tokio::test]
async fn await_pending_times_out_and_does_not_leak() {
let pending: DashMap<String, Arc<Notify>> = DashMap::new();
let _registered = register(&pending, "cafebabe");
let start = Instant::now();
let observed = await_pending(&pending, "cafebabe", PENDING_WAIT_TIMEOUT).await;
let elapsed = start.elapsed();
assert!(observed, "pending entry was present — must report true");
assert!(
elapsed >= PENDING_WAIT_TIMEOUT,
"timeout must elapse, got {elapsed:?}"
);
assert!(
elapsed < Duration::from_millis(PENDING_ENTRY_TTL_MS),
"timeout must not exceed the blast-radius bound ({PENDING_ENTRY_TTL_MS} ms), got {elapsed:?}"
);
assert_eq!(pending.len(), 1);
complete(&pending, "cafebabe");
assert!(pending.is_empty());
}
#[tokio::test]
async fn await_pending_returns_false_immediately_when_not_registered() {
let pending: DashMap<String, Arc<Notify>> = DashMap::new();
let start = Instant::now();
let observed = await_pending(&pending, "nothere", PENDING_WAIT_TIMEOUT).await;
let elapsed = start.elapsed();
assert!(!observed);
assert!(
elapsed < Duration::from_millis(1),
"no-wait path took {elapsed:?}"
);
}
#[tokio::test]
async fn await_pending_caps_caller_timeout_at_the_blast_radius_bound() {
let pending: DashMap<String, Arc<Notify>> = DashMap::new();
let _registered = register(&pending, "longshot");
let start = Instant::now();
let _observed = await_pending(&pending, "longshot", Duration::from_secs(1)).await;
let elapsed = start.elapsed();
assert!(
elapsed < Duration::from_millis(PENDING_ENTRY_TTL_MS),
"caller-supplied 1s timeout was not capped, elapsed {elapsed:?}"
);
}
}