Skip to main content

bus_core/
idempotency.rs

1use crate::{error::BusError, id::MessageId};
2use async_trait::async_trait;
3use std::time::Duration;
4
5/// Result of a `try_claim` call.
6#[derive(Debug, Clone, Copy, PartialEq, Eq)]
7pub enum ClaimOutcome {
8    /// First time the key has been seen — caller now owns the pending claim.
9    Claimed,
10    /// Key already exists in `pending` state; another worker may be holding
11    /// the claim, or a previous attempt did not complete. Caller should
12    /// proceed only if it can guarantee no concurrent execution (e.g. a
13    /// JetStream redelivery to the same consumer).
14    AlreadyPending,
15    /// Key exists in `done` state; the message was successfully processed
16    /// before. Caller MUST treat this as a duplicate and ack without running
17    /// the handler.
18    AlreadyDone,
19}
20
21/// Backend-agnostic idempotency store for deduplicating handler executions.
22///
23/// Implementations must use atomic operations to remain safe under concurrent
24/// delivery.
25#[async_trait]
26pub trait IdempotencyStore: Send + Sync {
27    /// State-aware claim. Atomically inserts the key in `pending` state if
28    /// absent, otherwise reports the existing state.
29    async fn try_claim(&self, key: &MessageId, ttl: Duration) -> Result<ClaimOutcome, BusError>;
30
31    /// Mark a previously claimed key as successfully processed.
32    async fn mark_done(&self, key: &MessageId) -> Result<(), BusError>;
33
34    /// Release a `pending` claim so the message can be retried.
35    async fn release(&self, key: &MessageId) -> Result<(), BusError>;
36}
37
38#[cfg(test)]
39mod tests {
40    use super::ClaimOutcome;
41
42    #[test]
43    fn claim_outcome_variants_exist() {
44        let _ = ClaimOutcome::Claimed;
45        let _ = ClaimOutcome::AlreadyPending;
46        let _ = ClaimOutcome::AlreadyDone;
47    }
48
49    #[test]
50    fn claim_outcome_is_eq_and_debug() {
51        assert_eq!(ClaimOutcome::Claimed, ClaimOutcome::Claimed);
52        assert_ne!(ClaimOutcome::Claimed, ClaimOutcome::AlreadyDone);
53        let _ = format!("{:?}", ClaimOutcome::AlreadyPending);
54    }
55}