pyra-streams 0.4.2

Redis Stream consumer infrastructure for Pyra services
Documentation
use deadpool_redis::Pool;
use redis::AsyncCommands;

use crate::error::StreamResult;

/// Try to acquire a dedup lock using Redis SET NX EX.
///
/// Returns `true` if the lock was acquired (first time seeing this key),
/// `false` if the key already exists (duplicate).
pub async fn try_acquire_dedup_lock(
    pool: &Pool,
    key: &str,
    ttl_secs: u64,
) -> StreamResult<bool> {
    let mut conn = pool.get().await?;
    let result: Option<String> = redis::cmd("SET")
        .arg(key)
        .arg("1")
        .arg("NX")
        .arg("EX")
        .arg(ttl_secs)
        .query_async(&mut *conn)
        .await?;
    Ok(result.is_some())
}

/// Release a dedup lock by deleting the key.
///
/// Used when a handler fails and the message should be retried —
/// removing the lock allows the retry to re-acquire it.
pub async fn release_dedup_lock(pool: &Pool, key: &str) -> StreamResult<()> {
    let mut conn = pool.get().await?;
    let _: () = conn.del(key).await?;
    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;

    // Dedup lock tests require a real Redis connection.
    // These are validated via integration tests in consuming services.
    // Unit tests here verify the function signatures compile correctly.

    #[test]
    fn test_dedup_functions_exist() {
        // Type-level check: ensure the public API compiles
        fn _assert_acquire<'a>(_pool: &'a Pool, _key: &'a str, _ttl: u64) -> impl std::future::Future<Output = StreamResult<bool>> + 'a {
            try_acquire_dedup_lock(_pool, _key, _ttl)
        }
        fn _assert_release<'a>(_pool: &'a Pool, _key: &'a str) -> impl std::future::Future<Output = StreamResult<()>> + 'a {
            release_dedup_lock(_pool, _key)
        }
    }
}