athena_rs 3.4.7

Database driver
Documentation
use std::future::Future;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};

use tokio::sync::Semaphore;

/// Unix-epoch second recorded the last time a "task dropped" warning was claimed.
///
/// Starts at 0 and is advanced with a compare-exchange so that concurrent
/// droppers racing within the same second produce a single warning.
static LAST_DROP_WARN_EPOCH_SECS: AtomicU64 = AtomicU64::new(0);
/// Running count of best-effort tasks dropped because the limiter had no free
/// permit. Reported as `dropped_total` in the drop warning.
static DROPPED_BEST_EFFORT_TASKS: AtomicU64 = AtomicU64::new(0);

/// Launch `future` as detached background work, optionally bounded by `limiter`.
///
/// * `limiter` — when `None`, the future is always spawned. When `Some`, a
///   permit is requested without waiting; the permit is held for as long as
///   the spawned future runs and is released once it finishes.
/// * `task_name` — label attached to the warning emitted when work is dropped.
/// * `future` — the work to run on the actix runtime.
///
/// Returns `true` when the future was handed to the runtime and `false` when
/// no permit was available and the future was discarded. Dropping rather than
/// queueing keeps audit/auth writes from monopolizing the shared logging pool.
pub fn spawn_best_effort_logging_task<F>(
    limiter: Option<Arc<Semaphore>>,
    task_name: &'static str,
    future: F,
) -> bool
where
    F: Future<Output = ()> + Send + 'static,
{
    let semaphore = match limiter {
        Some(semaphore) => semaphore,
        None => {
            // No cap configured: nothing to acquire, just run it.
            actix_web::rt::spawn(future);
            return true;
        }
    };

    match semaphore.try_acquire_owned() {
        Ok(slot) => {
            actix_web::rt::spawn(async move {
                // Bind the permit inside the task so it is only released
                // after the work has completed.
                let _held_slot = slot;
                future.await;
            });
            true
        }
        Err(_) => {
            record_dropped_task(task_name);
            false
        }
    }
}

/// Account for one dropped task and warn about it, roughly once per
/// wall-clock second.
///
/// Every call bumps the global drop counter. The warning itself is only
/// emitted by the caller that successfully moves the "last warned" timestamp
/// to the current epoch second.
fn record_dropped_task(task_name: &'static str) {
    let total_dropped: u64 = DROPPED_BEST_EFFORT_TASKS.fetch_add(1, Ordering::Relaxed) + 1;

    // A clock set before the Unix epoch is treated as second 0.
    let current_sec: u64 = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_secs())
        .unwrap_or_default();

    let previous_sec: u64 = LAST_DROP_WARN_EPOCH_SECS.load(Ordering::Relaxed);
    if current_sec == previous_sec {
        // A warning was already claimed for this second.
        return;
    }

    // Only the thread that wins the exchange gets to log.
    let won_exchange = LAST_DROP_WARN_EPOCH_SECS
        .compare_exchange(
            previous_sec,
            current_sec,
            Ordering::Relaxed,
            Ordering::Relaxed,
        )
        .is_ok();
    if !won_exchange {
        return;
    }

    tracing::warn!(
        target: "athena_rs::logging_tasks",
        task = task_name,
        dropped_total = total_dropped,
        "background logging concurrency limit reached; dropping best-effort task"
    );
}

#[cfg(test)]
mod tests {
    use super::spawn_best_effort_logging_task;
    use std::sync::{
        Arc,
        atomic::{AtomicBool, Ordering},
    };
    use tokio::sync::{Semaphore, oneshot};
    use tokio::time::{Duration, timeout};

    // With no limiter supplied the task must be accepted and run to completion.
    #[actix_web::test]
    async fn spawn_without_limiter_runs_task() {
        let (signal_tx, signal_rx) = oneshot::channel::<()>();
        let work = async move {
            let _ = signal_tx.send(());
        };

        let accepted = spawn_best_effort_logging_task(None, "test_no_limiter", work);
        assert!(accepted);

        let outcome = timeout(Duration::from_secs(1), signal_rx).await;
        outcome
            .expect("task should complete without limiter")
            .expect("sender should not be dropped");
    }

    // A free permit is taken at spawn time, held while the task runs, and
    // handed back once the task has finished.
    #[actix_web::test]
    async fn spawn_with_available_permit_runs_and_releases_it() {
        let semaphore = Arc::new(Semaphore::new(1));
        let (gate_tx, gate_rx) = oneshot::channel::<()>();
        let (finished_tx, finished_rx) = oneshot::channel::<()>();

        // The task parks on the gate so the permit stays held until we open it.
        let work = async move {
            let _ = gate_rx.await;
            let _ = finished_tx.send(());
        };
        let accepted = spawn_best_effort_logging_task(
            Some(Arc::clone(&semaphore)),
            "test_with_limiter",
            work,
        );

        assert!(accepted);
        assert_eq!(semaphore.available_permits(), 0);

        let _ = gate_tx.send(());

        let outcome = timeout(Duration::from_secs(1), finished_rx).await;
        outcome
            .expect("task should complete once released")
            .expect("sender should not be dropped");

        // The permit is dropped just after the task body ends, so poll
        // (yielding to the runtime) until it shows up again.
        let wait_for_permit = async {
            while semaphore.available_permits() != 1 {
                tokio::task::yield_now().await;
            }
        };
        timeout(Duration::from_secs(1), wait_for_permit)
            .await
            .expect("permit should be released after task completion");
    }

    // When every permit is already taken the task is rejected and never runs.
    #[actix_web::test]
    async fn spawn_with_saturated_limiter_drops_task() {
        let semaphore = Arc::new(Semaphore::new(1));
        let held_permit = Arc::clone(&semaphore)
            .try_acquire_owned()
            .expect("initial permit should be available");

        let ran = Arc::new(AtomicBool::new(false));
        let ran_in_task = Arc::clone(&ran);
        let work = async move {
            ran_in_task.store(true, Ordering::SeqCst);
        };

        let accepted =
            spawn_best_effort_logging_task(Some(semaphore), "test_saturated_limiter", work);

        assert!(!accepted);
        // Give the runtime a chance to run anything that was wrongly spawned.
        tokio::task::yield_now().await;
        assert!(!ran.load(Ordering::SeqCst));

        drop(held_permit);
    }
}