Skip to main content

klauthed_data/idempotency/
mongo.rs

1//! MongoDB-backed [`IdempotencyStore`].
2//!
3//! Documents in the collection follow this shape:
4//!
5//! ```text
6//! {
7//!   _id:        <idempotency-key>,
8//!   status:     "in_progress" | "completed",
9//!   response:   <BSON value> | null,
10//!   created_at: <RFC3339 string>,
11//!   updated_at: <RFC3339 string>,
12//!   expires_at: <RFC3339 string>,
13//! }
14//! ```
15//!
16//! **begin** — `find_one_and_update` with `$setOnInsert` and `upsert: true`,
17//! `returnDocument: Before`. A `None` result means the document was newly
18//! inserted (`Outcome::New`); a returned doc reveals the existing status.
19//!
20//! **complete** — `update_one({_id: key}, {$set: {status, response, updated_at}})`.
21//!
22//! **get** — `find_one({_id: key})`.
23//!
24//! A TTL index on `expires_at` auto-expires keys; default TTL is 24 hours,
25//! configurable with [`MongoIdempotencyStore::with_ttl_secs`].
26//!
27//! Live tests are marked `#[ignore]`; run with a running MongoDB at
28//! `MONGODB_URL` via:
29//! ```text
30//! cargo test -p klauthed-data --features mongodb -- --ignored
31//! ```
32
33use async_trait::async_trait;
34use klauthed_core::time::Timestamp;
35use mongodb::Collection;
36use mongodb::bson::{Bson, Document, doc, from_bson, to_bson};
37use mongodb::options::{FindOneAndUpdateOptions, IndexOptions, ReturnDocument};
38use mongodb::{Database, IndexModel};
39
40use crate::error::DataError;
41use crate::idempotency::{IdempotencyRecord, IdempotencyStatus, IdempotencyStore, Outcome};
42
/// How long a claimed idempotency key lives when no custom TTL is set:
/// one day, expressed in seconds.
const DEFAULT_TTL_SECS: u64 = 86_400;

/// Name of the collection used when the caller does not pick one.
const DEFAULT_COLLECTION: &str = "idempotency_keys";
48
49/// A MongoDB-backed [`IdempotencyStore`].
50///
51/// Clone-cheap: holds only the collection handle (an `Arc` internally).
52#[derive(Clone)]
53pub struct MongoIdempotencyStore {
54    collection: Collection<Document>,
55    ttl_secs: u64,
56}
57
58impl MongoIdempotencyStore {
59    /// Wrap an existing database with the default 24-hour TTL.
60    pub fn new(db: &Database) -> Self {
61        Self::with_collection(db, DEFAULT_COLLECTION)
62    }
63
64    /// Wrap an existing database using `collection_name`.
65    pub fn with_collection(db: &Database, collection_name: &str) -> Self {
66        Self { collection: db.collection(collection_name), ttl_secs: DEFAULT_TTL_SECS }
67    }
68
69    /// Set a custom TTL in seconds for idempotency keys.
70    pub fn with_ttl_secs(mut self, ttl_secs: u64) -> Self {
71        self.ttl_secs = ttl_secs;
72        self
73    }
74
75    /// Create the TTL index on `expires_at`.
76    ///
77    /// Must be called once (or at each startup — it is idempotent).
78    pub async fn ensure_schema(&self) -> Result<(), DataError> {
79        let index = IndexModel::builder()
80            .keys(doc! { "expires_at": 1 })
81            .options(
82                IndexOptions::builder()
83                    .name(Some("expires_at_ttl".to_owned()))
84                    .expire_after(Some(std::time::Duration::from_secs(0)))
85                    .build(),
86            )
87            .build();
88
89        self.collection
90            .create_index(index)
91            .await
92            .map_err(|e| DataError::Idempotency(format!("mongo create TTL index failed: {e}")))?;
93
94        Ok(())
95    }
96
97    /// Compute the `expires_at` timestamp for a newly claimed key.
98    fn expires_at(&self) -> Result<Timestamp, DataError> {
99        let now = Timestamp::now();
100        let ttl = klauthed_core::time::Duration::seconds(self.ttl_secs as i64);
101        now.checked_add(ttl).ok_or_else(|| DataError::Idempotency("TTL overflow".to_owned()))
102    }
103}
104
105fn status_to_str(status: IdempotencyStatus) -> &'static str {
106    match status {
107        IdempotencyStatus::InProgress => "in_progress",
108        IdempotencyStatus::Completed => "completed",
109    }
110}
111
112fn str_to_status(s: &str) -> Result<IdempotencyStatus, DataError> {
113    match s {
114        "in_progress" => Ok(IdempotencyStatus::InProgress),
115        "completed" => Ok(IdempotencyStatus::Completed),
116        other => Err(DataError::Idempotency(format!("unknown idempotency status '{other}'"))),
117    }
118}
119
120fn parse_timestamp(s: &str) -> Result<Timestamp, DataError> {
121    serde_json::from_value(serde_json::Value::String(s.to_owned()))
122        .map_err(|e| DataError::Idempotency(format!("invalid timestamp '{s}': {e}")))
123}
124
125fn doc_to_record(key: &str, doc: &Document) -> Result<IdempotencyRecord, DataError> {
126    let status_str = doc
127        .get_str("status")
128        .map_err(|e| DataError::Idempotency(format!("missing status: {e}")))?;
129    let status = str_to_status(status_str)?;
130
131    let response: Option<serde_json::Value> = match doc.get("response") {
132        Some(Bson::Null) | None => None,
133        Some(bson) => Some(
134            from_bson(bson.clone())
135                .map_err(|e| DataError::Idempotency(format!("response bson→json: {e}")))?,
136        ),
137    };
138
139    let created_at = parse_timestamp(
140        doc.get_str("created_at")
141            .map_err(|e| DataError::Idempotency(format!("missing created_at: {e}")))?,
142    )?;
143    let updated_at = parse_timestamp(
144        doc.get_str("updated_at")
145            .map_err(|e| DataError::Idempotency(format!("missing updated_at: {e}")))?,
146    )?;
147
148    Ok(IdempotencyRecord { key: key.to_owned(), status, response, created_at, updated_at })
149}
150
151#[async_trait]
152impl IdempotencyStore for MongoIdempotencyStore {
153    async fn begin(&self, key: &str) -> Result<Outcome, DataError> {
154        let now = Timestamp::now();
155        let expires_at = self.expires_at()?;
156        let now_str = now.to_rfc3339();
157        let expires_str = expires_at.to_rfc3339();
158
159        // `$setOnInsert` only writes when the document is being inserted (new key).
160        // `returnDocument: Before` → None means the doc was just created.
161        let filter = doc! { "_id": key };
162        let update = doc! {
163            "$setOnInsert": {
164                "status":     status_to_str(IdempotencyStatus::InProgress),
165                "response":   Bson::Null,
166                "created_at": &now_str,
167                "updated_at": &now_str,
168                "expires_at": &expires_str,
169            }
170        };
171        let options = FindOneAndUpdateOptions::builder()
172            .upsert(Some(true))
173            .return_document(Some(ReturnDocument::Before))
174            .build();
175
176        let existing = self
177            .collection
178            .find_one_and_update(filter, update)
179            .with_options(options)
180            .await
181            .map_err(|e| {
182                DataError::Idempotency(format!("mongo find_one_and_update failed: {e}"))
183            })?;
184
185        match existing {
186            // Document did not exist before — we just inserted it.
187            None => Ok(Outcome::New),
188            // Document already existed; inspect its status.
189            Some(doc) => {
190                let record = doc_to_record(key, &doc)?;
191                match record.status {
192                    IdempotencyStatus::InProgress => Ok(Outcome::InProgress),
193                    IdempotencyStatus::Completed => {
194                        let response = record.response.unwrap_or(serde_json::Value::Null);
195                        Ok(Outcome::Completed(response))
196                    }
197                }
198            }
199        }
200    }
201
202    async fn complete(&self, key: &str, response: serde_json::Value) -> Result<(), DataError> {
203        let now = Timestamp::now().to_rfc3339();
204        let response_bson = to_bson(&response)
205            .map_err(|e| DataError::Idempotency(format!("json→bson failed: {e}")))?;
206
207        let filter = doc! { "_id": key };
208        let update = doc! {
209            "$set": {
210                "status":     status_to_str(IdempotencyStatus::Completed),
211                "response":   response_bson,
212                "updated_at": now,
213            }
214        };
215
216        let result = self
217            .collection
218            .update_one(filter, update)
219            .await
220            .map_err(|e| DataError::Idempotency(format!("mongo update_one failed: {e}")))?;
221
222        if result.matched_count == 0 {
223            return Err(DataError::Idempotency(format!(
224                "cannot complete unknown idempotency key '{key}'"
225            )));
226        }
227
228        Ok(())
229    }
230
231    async fn get(&self, key: &str) -> Result<Option<IdempotencyRecord>, DataError> {
232        let filter = doc! { "_id": key };
233        let doc = self
234            .collection
235            .find_one(filter)
236            .await
237            .map_err(|e| DataError::Idempotency(format!("mongo find_one failed: {e}")))?;
238
239        match doc {
240            None => Ok(None),
241            Some(d) => Ok(Some(doc_to_record(key, &d)?)),
242        }
243    }
244}
245
#[cfg(test)]
mod tests {
    use super::*;
    use klauthed_core::id::Id;

    /// Connect to the MongoDB at `MONGODB_URL` (falling back to localhost) and
    /// return a store backed by a freshly named test database with its schema
    /// in place.
    async fn live_store() -> MongoIdempotencyStore {
        let url = match std::env::var("MONGODB_URL") {
            Ok(value) => value,
            Err(_) => "mongodb://127.0.0.1:27017".to_owned(),
        };
        let client = mongodb::Client::with_uri_str(&url).await.expect("connect mongodb");

        // A unique database name per call keeps test runs isolated.
        let db_name = format!("klauthed_test_{}", Id::<()>::new());
        let database = client.database(&db_name);

        let store = MongoIdempotencyStore::new(&database);
        store.ensure_schema().await.expect("ensure schema");
        store
    }

    /// Full lifecycle: first claim is new, second sees it in progress, and a
    /// claim after completion replays the stored response.
    #[tokio::test]
    #[ignore = "requires a live MongoDB at MONGODB_URL"]
    async fn new_in_progress_complete_replay() {
        let store = live_store().await;
        let key = format!("test:{}", Id::<()>::new());

        let first = store.begin(&key).await.unwrap();
        assert_eq!(first, Outcome::New);

        let second = store.begin(&key).await.unwrap();
        assert_eq!(second, Outcome::InProgress);

        let response = serde_json::json!({ "charged": true });
        store.complete(&key, response.clone()).await.unwrap();

        let replay = store.begin(&key).await.unwrap();
        assert_eq!(replay, Outcome::Completed(response));
    }

    /// Completing a key that was never claimed is an idempotency error.
    #[tokio::test]
    #[ignore = "requires a live MongoDB at MONGODB_URL"]
    async fn complete_unknown_key_errors() {
        let store = live_store().await;
        let key = format!("test:{}:missing", Id::<()>::new());

        let outcome = store.complete(&key, serde_json::Value::Null).await;
        let err = outcome.unwrap_err();
        assert!(matches!(err, DataError::Idempotency(_)));
    }
}