bus_core/idempotency.rs
1use crate::{error::BusError, id::MessageId};
2use async_trait::async_trait;
3use std::time::Duration;
4
/// Result of a [`IdempotencyStore::try_claim`] call.
///
/// The type is `#[must_use]`: discarding the outcome (for example with
/// `store.try_claim(..).await?;`) would hide the difference between a fresh
/// claim and a duplicate, so the compiler warns when it is ignored.
#[must_use = "the claim outcome decides whether the handler may run"]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ClaimOutcome {
    /// First time the key has been seen — caller now owns the pending claim.
    Claimed,
    /// Key already exists in `pending` state; another worker may be holding
    /// the claim, or a previous attempt did not complete. Caller should
    /// proceed only if it can guarantee no concurrent execution (e.g. a
    /// JetStream redelivery to the same consumer).
    AlreadyPending,
    /// Key exists in `done` state; the message was successfully processed
    /// before. Caller MUST treat this as a duplicate and ack without running
    /// the handler.
    AlreadyDone,
}
20
21/// Backend-agnostic idempotency store for deduplicating handler executions.
22///
23/// Implementations must use atomic operations to remain safe under concurrent
24/// delivery.
25#[async_trait]
26pub trait IdempotencyStore: Send + Sync {
27 /// State-aware claim. Atomically inserts the key in `pending` state if
28 /// absent, otherwise reports the existing state.
29 async fn try_claim(&self, key: &MessageId, ttl: Duration) -> Result<ClaimOutcome, BusError>;
30
31 /// Mark a previously claimed key as successfully processed.
32 async fn mark_done(&self, key: &MessageId) -> Result<(), BusError>;
33
34 /// Release a `pending` claim so the message can be retried.
35 async fn release(&self, key: &MessageId) -> Result<(), BusError>;
36}
37
#[cfg(test)]
mod tests {
    use super::ClaimOutcome;

    /// Every variant is constructible; the exhaustive `match` (no wildcard
    /// arm) makes this test fail to compile when a variant is added or
    /// removed, so the test has to be revisited alongside the enum.
    #[test]
    fn claim_outcome_variants_exist() {
        let all = [
            ClaimOutcome::Claimed,
            ClaimOutcome::AlreadyPending,
            ClaimOutcome::AlreadyDone,
        ];
        for outcome in all {
            match outcome {
                ClaimOutcome::Claimed
                | ClaimOutcome::AlreadyPending
                | ClaimOutcome::AlreadyDone => {}
            }
        }
    }

    /// Equality distinguishes variants, values are `Copy`, and `Debug`
    /// renders the variant name.
    #[test]
    fn claim_outcome_is_eq_and_debug() {
        assert_eq!(ClaimOutcome::Claimed, ClaimOutcome::Claimed);
        assert_ne!(ClaimOutcome::Claimed, ClaimOutcome::AlreadyDone);

        // `Copy`: the original binding stays usable after being assigned away.
        let original = ClaimOutcome::AlreadyPending;
        let duplicate = original;
        assert_eq!(original, duplicate);

        assert_eq!(format!("{:?}", ClaimOutcome::AlreadyPending), "AlreadyPending");
    }
}
55}