Skip to main content

klauthed_data/idempotency/
mod.rs

1//! Idempotency keys.
2//!
3//! To make a non-idempotent operation safe to retry, the caller attaches an
4//! **idempotency key**. Before doing the work it calls [`begin`](IdempotencyStore::begin):
5//!
6//! * [`Outcome::New`] — no record yet; this caller claimed the key and should
7//!   proceed, then call [`complete`](IdempotencyStore::complete) with the response.
8//! * [`Outcome::InProgress`] — another attempt claimed the key and has not
9//!   finished; the caller should back off / reject the duplicate.
10//! * [`Outcome::Completed`] — the work already ran; the stored response is
11//!   replayed instead of re-executing.
12//!
13//! This module provides the backend-agnostic [`IdempotencyStore`] trait, the
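//! [`IdempotencyRecord`] model, and an in-memory implementation. Backends for
//! external stores are compiled in behind the `redis` and `mongodb` cargo
//! features (submodules `redis` and `mongo`); a Redis store can claim a key
//! atomically with `SET key val NX`.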
16//!
17//! ```
18//! use klauthed_data::idempotency::{IdempotencyStore, InMemoryIdempotencyStore, Outcome};
19//!
20//! # async fn run() -> Result<(), klauthed_data::DataError> {
21//! let store = InMemoryIdempotencyStore::new();
22//! match store.begin("charge-42").await? {
23//!     Outcome::New => {
24//!         store.complete("charge-42", serde_json::json!({"ok": true})).await?;
25//!     }
26//!     Outcome::InProgress => { /* duplicate in flight */ }
27//!     Outcome::Completed(_response) => { /* replay stored response */ }
28//! }
29//! # Ok(())
30//! # }
31//! ```
32
33#[cfg(feature = "redis")]
34pub mod redis;
35
36#[cfg(feature = "mongodb")]
37pub mod mongo;
38
39use async_trait::async_trait;
40use klauthed_core::time::Timestamp;
41use serde::{Deserialize, Serialize};
42use std::collections::HashMap;
43use std::sync::Mutex;
44
45use crate::error::DataError;
46
47/// The lifecycle state of an idempotency key.
48#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
49pub enum IdempotencyStatus {
50    /// Claimed by a caller that is still executing the operation.
51    InProgress,
52    /// The operation finished; a response is stored for replay.
53    Completed,
54}
55
56/// The persisted state for one idempotency key.
57#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
58pub struct IdempotencyRecord {
59    /// The idempotency key this record belongs to.
60    pub key: String,
61    /// Where the keyed operation is in its lifecycle.
62    pub status: IdempotencyStatus,
63    /// The stored response, present once `status` is
64    /// [`Completed`](IdempotencyStatus::Completed).
65    pub response: Option<serde_json::Value>,
66    /// When the key was first claimed.
67    pub created_at: Timestamp,
68    /// When the record last changed (claim or completion).
69    pub updated_at: Timestamp,
70}
71
72/// The result of claiming an idempotency key with [`IdempotencyStore::begin`].
73#[derive(Debug, Clone, PartialEq, Eq)]
74pub enum Outcome {
75    /// The key was free and is now claimed by this caller; proceed with the work.
76    New,
77    /// The key is claimed by an attempt that has not yet completed.
78    InProgress,
79    /// The work already completed; the stored response is returned for replay.
80    Completed(serde_json::Value),
81}
82
83/// A store that deduplicates operations by idempotency key.
84#[async_trait]
85pub trait IdempotencyStore: Send + Sync {
86    /// Atomically claim `key` if it is free, otherwise report its current state.
87    ///
88    /// Returns [`Outcome::New`] when this caller wins the claim (a record is
89    /// created `InProgress`), [`Outcome::InProgress`] if another claim is live,
90    /// or [`Outcome::Completed`] with the stored response if the work is done.
91    async fn begin(&self, key: &str) -> Result<Outcome, DataError>;
92
93    /// Mark `key`'s operation completed and store `response` for future replays.
94    ///
95    /// # Errors
96    /// Returns [`DataError::Idempotency`] if the key was never claimed.
97    async fn complete(&self, key: &str, response: serde_json::Value) -> Result<(), DataError>;
98
99    /// Fetch the raw record for `key`, if any.
100    async fn get(&self, key: &str) -> Result<Option<IdempotencyRecord>, DataError>;
101}
102
103/// A thread-safe, in-memory [`IdempotencyStore`] for tests and single-process use.
104#[derive(Default)]
105pub struct InMemoryIdempotencyStore {
106    records: Mutex<HashMap<String, IdempotencyRecord>>,
107}
108
109impl InMemoryIdempotencyStore {
110    /// An empty store.
111    pub fn new() -> Self {
112        Self::default()
113    }
114}
115
116#[async_trait]
117impl IdempotencyStore for InMemoryIdempotencyStore {
118    async fn begin(&self, key: &str) -> Result<Outcome, DataError> {
119        let now = Timestamp::now();
120        let mut guard = self.records.lock().unwrap_or_else(std::sync::PoisonError::into_inner);
121        match guard.get(key) {
122            Some(record) => match record.status {
123                IdempotencyStatus::InProgress => Ok(Outcome::InProgress),
124                IdempotencyStatus::Completed => {
125                    // A completed record always carries its response.
126                    let response = record.response.clone().unwrap_or(serde_json::Value::Null);
127                    Ok(Outcome::Completed(response))
128                }
129            },
130            None => {
131                guard.insert(
132                    key.to_owned(),
133                    IdempotencyRecord {
134                        key: key.to_owned(),
135                        status: IdempotencyStatus::InProgress,
136                        response: None,
137                        created_at: now,
138                        updated_at: now,
139                    },
140                );
141                Ok(Outcome::New)
142            }
143        }
144    }
145
146    async fn complete(&self, key: &str, response: serde_json::Value) -> Result<(), DataError> {
147        let now = Timestamp::now();
148        let mut guard = self.records.lock().unwrap_or_else(std::sync::PoisonError::into_inner);
149        match guard.get_mut(key) {
150            Some(record) => {
151                record.status = IdempotencyStatus::Completed;
152                record.response = Some(response);
153                record.updated_at = now;
154                Ok(())
155            }
156            None => Err(DataError::Idempotency(format!(
157                "cannot complete unknown idempotency key '{key}'"
158            ))),
159        }
160    }
161
162    async fn get(&self, key: &str) -> Result<Option<IdempotencyRecord>, DataError> {
163        Ok(self.records.lock().unwrap_or_else(std::sync::PoisonError::into_inner).get(key).cloned())
164    }
165}
166
#[cfg(test)]
mod tests {
    use super::*;

    /// Walks a single key through claim, duplicate detection, completion and replay.
    #[tokio::test]
    async fn new_then_in_progress_then_completed_replay() {
        let store = InMemoryIdempotencyStore::new();
        let key = "k";

        // The very first attempt wins the claim.
        let first_attempt = store.begin(key).await.unwrap();
        assert_eq!(first_attempt, Outcome::New);

        // A duplicate arriving before completion is told the work is in flight.
        let duplicate = store.begin(key).await.unwrap();
        assert_eq!(duplicate, Outcome::InProgress);

        // The winner records its response.
        let stored = serde_json::json!({ "charged": true, "amount": 100 });
        store.complete(key, stored.clone()).await.unwrap();

        // Any later attempt gets the recorded response back instead of re-running.
        let replay = store.begin(key).await.unwrap();
        assert_eq!(replay, Outcome::Completed(stored));
    }

    /// Claiming one key must not affect another.
    #[tokio::test]
    async fn distinct_keys_are_independent() {
        let store = InMemoryIdempotencyStore::new();
        for key in ["a", "b"] {
            assert_eq!(store.begin(key).await.unwrap(), Outcome::New);
        }
    }

    /// Completing a key that was never claimed is rejected.
    #[tokio::test]
    async fn complete_unknown_key_errors() {
        let store = InMemoryIdempotencyStore::new();
        let result = store.complete("missing", serde_json::Value::Null).await;
        assert!(matches!(result, Err(DataError::Idempotency(_))));
    }

    /// `get` reflects the record before and after completion.
    #[tokio::test]
    async fn get_exposes_record_lifecycle() {
        let store = InMemoryIdempotencyStore::new();
        let key = "k";

        store.begin(key).await.unwrap();
        let claimed = store.get(key).await.unwrap().unwrap();
        assert_eq!(claimed.status, IdempotencyStatus::InProgress);
        assert!(claimed.response.is_none());

        store.complete(key, serde_json::json!(1)).await.unwrap();
        let finished = store.get(key).await.unwrap().unwrap();
        assert_eq!(finished.status, IdempotencyStatus::Completed);
        assert_eq!(finished.response, Some(serde_json::json!(1)));
    }
}
216}