use std::future::Future;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::sync::Semaphore;
/// Unix-epoch second recorded by the most recent drop warning (starts at 0).
/// Compared against the current second to throttle the "task dropped" log line.
static LAST_DROP_WARN_EPOCH_SECS: AtomicU64 = AtomicU64::new(0);
/// Process-wide running count of best-effort tasks that were dropped because
/// the concurrency limiter had no free permit.
static DROPPED_BEST_EFFORT_TASKS: AtomicU64 = AtomicU64::new(0);
/// Spawns `future` as a detached background task, optionally bounded by a
/// concurrency limiter.
///
/// * With `limiter == None` the future is handed straight to
///   `actix_web::rt::spawn` and the function returns `true`.
/// * With a limiter, a permit is acquired without waiting. On success the
///   future is spawned and the permit is kept alive inside the task until the
///   future finishes; the function returns `true`.
/// * If no permit is available the future is dropped without ever being
///   spawned, the global dropped-task counter is incremented, and `false` is
///   returned. A warning tagged with `task_name` is logged only when the
///   current wall-clock second differs from the one stored by the previous
///   warning, which keeps the log to roughly one line per second under
///   sustained saturation.
pub fn spawn_best_effort_logging_task<F>(
    limiter: Option<Arc<Semaphore>>,
    task_name: &'static str,
    future: F,
) -> bool
where
    F: Future<Output = ()> + Send + 'static,
{
    let semaphore = match limiter {
        Some(semaphore) => semaphore,
        None => {
            // Unbounded mode: nothing to acquire, just run it.
            actix_web::rt::spawn(future);
            return true;
        }
    };

    match semaphore.try_acquire_owned() {
        Ok(permit) => {
            actix_web::rt::spawn(async move {
                // Bind the permit to a local so it is released only once the
                // wrapped future has completed (or the task is torn down).
                let _held_permit = permit;
                future.await;
            });
            true
        }
        Err(_) => {
            let total_dropped: u64 =
                DROPPED_BEST_EFFORT_TASKS.fetch_add(1, Ordering::Relaxed) + 1;
            // A clock set before the Unix epoch degrades to second 0.
            let current_secs: u64 = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .map(|elapsed| elapsed.as_secs())
                .unwrap_or_default();
            let previous_secs: u64 = LAST_DROP_WARN_EPOCH_SECS.load(Ordering::Relaxed);

            // Only the caller that successfully swaps in the new second gets
            // to log; concurrent callers in the same second lose the exchange.
            let should_warn = previous_secs != current_secs
                && LAST_DROP_WARN_EPOCH_SECS
                    .compare_exchange(
                        previous_secs,
                        current_secs,
                        Ordering::Relaxed,
                        Ordering::Relaxed,
                    )
                    .is_ok();
            if should_warn {
                tracing::warn!(
                    target: "athena_rs::logging_tasks",
                    task = task_name,
                    dropped_total = total_dropped,
                    "background logging concurrency limit reached; dropping best-effort task"
                );
            }
            false
        }
    }
}
#[cfg(test)]
mod tests {
    use super::spawn_best_effort_logging_task;
    use std::sync::{
        Arc,
        atomic::{AtomicBool, Ordering},
    };
    use tokio::sync::{Semaphore, oneshot};
    use tokio::time::{Duration, timeout};

    // Without a limiter the task must always be accepted and run.
    #[actix_web::test]
    async fn spawn_without_limiter_runs_task() {
        let (signal_tx, signal_rx) = oneshot::channel();
        let task = async move {
            let _ = signal_tx.send(());
        };

        let accepted: bool = spawn_best_effort_logging_task(None, "test_no_limiter", task);
        assert!(accepted);

        let outcome = timeout(Duration::from_secs(1), signal_rx).await;
        outcome
            .expect("task should complete without limiter")
            .expect("sender should not be dropped");
    }

    // A free permit is taken for the lifetime of the task and handed back
    // once the task has finished.
    #[actix_web::test]
    async fn spawn_with_available_permit_runs_and_releases_it() {
        let semaphore: Arc<Semaphore> = Arc::new(Semaphore::new(1));
        let (gate_tx, gate_rx) = oneshot::channel::<()>();
        let (finished_tx, finished_rx) = oneshot::channel::<()>();

        // The task parks on the gate so the permit is observably held.
        let task = async move {
            let _ = gate_rx.await;
            let _ = finished_tx.send(());
        };
        let accepted: bool = spawn_best_effort_logging_task(
            Some(Arc::clone(&semaphore)),
            "test_with_limiter",
            task,
        );
        assert!(accepted);
        assert_eq!(semaphore.available_permits(), 0);

        let _ = gate_tx.send(());
        let finished = timeout(Duration::from_secs(1), finished_rx).await;
        finished
            .expect("task should complete once released")
            .expect("sender should not be dropped");

        // The permit is dropped slightly after the completion signal is sent,
        // so poll cooperatively until it shows up again.
        let permit_returned = async {
            while semaphore.available_permits() != 1 {
                tokio::task::yield_now().await;
            }
        };
        timeout(Duration::from_secs(1), permit_returned)
            .await
            .expect("permit should be released after task completion");
    }

    // With every permit already taken the task is rejected and never runs.
    #[actix_web::test]
    async fn spawn_with_saturated_limiter_drops_task() {
        let semaphore: Arc<Semaphore> = Arc::new(Semaphore::new(1));
        let held_permit: tokio::sync::OwnedSemaphorePermit = Arc::clone(&semaphore)
            .try_acquire_owned()
            .expect("initial permit should be available");

        let ran_flag: Arc<AtomicBool> = Arc::new(AtomicBool::new(false));
        let task = {
            let ran_flag = Arc::clone(&ran_flag);
            async move {
                ran_flag.store(true, Ordering::SeqCst);
            }
        };

        let accepted: bool =
            spawn_best_effort_logging_task(Some(semaphore), "test_saturated_limiter", task);
        assert!(!accepted);

        // Give the runtime a chance to run anything that was wrongly spawned.
        tokio::task::yield_now().await;
        assert!(!ran_flag.load(Ordering::SeqCst));
        drop(held_permit);
    }
}