klauthed_data/idempotency/
mongo.rs1use async_trait::async_trait;
34use klauthed_core::time::Timestamp;
35use mongodb::Collection;
36use mongodb::bson::{Bson, Document, doc, from_bson, to_bson};
37use mongodb::options::{FindOneAndUpdateOptions, IndexOptions, ReturnDocument};
38use mongodb::{Database, IndexModel};
39
40use crate::error::DataError;
41use crate::idempotency::{IdempotencyRecord, IdempotencyStatus, IdempotencyStore, Outcome};
42
43const DEFAULT_TTL_SECS: u64 = 24 * 60 * 60;
45
46const DEFAULT_COLLECTION: &str = "idempotency_keys";
48
49#[derive(Clone)]
53pub struct MongoIdempotencyStore {
54 collection: Collection<Document>,
55 ttl_secs: u64,
56}
57
58impl MongoIdempotencyStore {
59 pub fn new(db: &Database) -> Self {
61 Self::with_collection(db, DEFAULT_COLLECTION)
62 }
63
64 pub fn with_collection(db: &Database, collection_name: &str) -> Self {
66 Self { collection: db.collection(collection_name), ttl_secs: DEFAULT_TTL_SECS }
67 }
68
69 pub fn with_ttl_secs(mut self, ttl_secs: u64) -> Self {
71 self.ttl_secs = ttl_secs;
72 self
73 }
74
75 pub async fn ensure_schema(&self) -> Result<(), DataError> {
79 let index = IndexModel::builder()
80 .keys(doc! { "expires_at": 1 })
81 .options(
82 IndexOptions::builder()
83 .name(Some("expires_at_ttl".to_owned()))
84 .expire_after(Some(std::time::Duration::from_secs(0)))
85 .build(),
86 )
87 .build();
88
89 self.collection
90 .create_index(index)
91 .await
92 .map_err(|e| DataError::Idempotency(format!("mongo create TTL index failed: {e}")))?;
93
94 Ok(())
95 }
96
97 fn expires_at(&self) -> Result<Timestamp, DataError> {
99 let now = Timestamp::now();
100 let ttl = klauthed_core::time::Duration::seconds(self.ttl_secs as i64);
101 now.checked_add(ttl).ok_or_else(|| DataError::Idempotency("TTL overflow".to_owned()))
102 }
103}
104
105fn status_to_str(status: IdempotencyStatus) -> &'static str {
106 match status {
107 IdempotencyStatus::InProgress => "in_progress",
108 IdempotencyStatus::Completed => "completed",
109 }
110}
111
112fn str_to_status(s: &str) -> Result<IdempotencyStatus, DataError> {
113 match s {
114 "in_progress" => Ok(IdempotencyStatus::InProgress),
115 "completed" => Ok(IdempotencyStatus::Completed),
116 other => Err(DataError::Idempotency(format!("unknown idempotency status '{other}'"))),
117 }
118}
119
120fn parse_timestamp(s: &str) -> Result<Timestamp, DataError> {
121 serde_json::from_value(serde_json::Value::String(s.to_owned()))
122 .map_err(|e| DataError::Idempotency(format!("invalid timestamp '{s}': {e}")))
123}
124
125fn doc_to_record(key: &str, doc: &Document) -> Result<IdempotencyRecord, DataError> {
126 let status_str = doc
127 .get_str("status")
128 .map_err(|e| DataError::Idempotency(format!("missing status: {e}")))?;
129 let status = str_to_status(status_str)?;
130
131 let response: Option<serde_json::Value> = match doc.get("response") {
132 Some(Bson::Null) | None => None,
133 Some(bson) => Some(
134 from_bson(bson.clone())
135 .map_err(|e| DataError::Idempotency(format!("response bson→json: {e}")))?,
136 ),
137 };
138
139 let created_at = parse_timestamp(
140 doc.get_str("created_at")
141 .map_err(|e| DataError::Idempotency(format!("missing created_at: {e}")))?,
142 )?;
143 let updated_at = parse_timestamp(
144 doc.get_str("updated_at")
145 .map_err(|e| DataError::Idempotency(format!("missing updated_at: {e}")))?,
146 )?;
147
148 Ok(IdempotencyRecord { key: key.to_owned(), status, response, created_at, updated_at })
149}
150
151#[async_trait]
152impl IdempotencyStore for MongoIdempotencyStore {
153 async fn begin(&self, key: &str) -> Result<Outcome, DataError> {
154 let now = Timestamp::now();
155 let expires_at = self.expires_at()?;
156 let now_str = now.to_rfc3339();
157 let expires_str = expires_at.to_rfc3339();
158
159 let filter = doc! { "_id": key };
162 let update = doc! {
163 "$setOnInsert": {
164 "status": status_to_str(IdempotencyStatus::InProgress),
165 "response": Bson::Null,
166 "created_at": &now_str,
167 "updated_at": &now_str,
168 "expires_at": &expires_str,
169 }
170 };
171 let options = FindOneAndUpdateOptions::builder()
172 .upsert(Some(true))
173 .return_document(Some(ReturnDocument::Before))
174 .build();
175
176 let existing = self
177 .collection
178 .find_one_and_update(filter, update)
179 .with_options(options)
180 .await
181 .map_err(|e| {
182 DataError::Idempotency(format!("mongo find_one_and_update failed: {e}"))
183 })?;
184
185 match existing {
186 None => Ok(Outcome::New),
188 Some(doc) => {
190 let record = doc_to_record(key, &doc)?;
191 match record.status {
192 IdempotencyStatus::InProgress => Ok(Outcome::InProgress),
193 IdempotencyStatus::Completed => {
194 let response = record.response.unwrap_or(serde_json::Value::Null);
195 Ok(Outcome::Completed(response))
196 }
197 }
198 }
199 }
200 }
201
202 async fn complete(&self, key: &str, response: serde_json::Value) -> Result<(), DataError> {
203 let now = Timestamp::now().to_rfc3339();
204 let response_bson = to_bson(&response)
205 .map_err(|e| DataError::Idempotency(format!("json→bson failed: {e}")))?;
206
207 let filter = doc! { "_id": key };
208 let update = doc! {
209 "$set": {
210 "status": status_to_str(IdempotencyStatus::Completed),
211 "response": response_bson,
212 "updated_at": now,
213 }
214 };
215
216 let result = self
217 .collection
218 .update_one(filter, update)
219 .await
220 .map_err(|e| DataError::Idempotency(format!("mongo update_one failed: {e}")))?;
221
222 if result.matched_count == 0 {
223 return Err(DataError::Idempotency(format!(
224 "cannot complete unknown idempotency key '{key}'"
225 )));
226 }
227
228 Ok(())
229 }
230
231 async fn get(&self, key: &str) -> Result<Option<IdempotencyRecord>, DataError> {
232 let filter = doc! { "_id": key };
233 let doc = self
234 .collection
235 .find_one(filter)
236 .await
237 .map_err(|e| DataError::Idempotency(format!("mongo find_one failed: {e}")))?;
238
239 match doc {
240 None => Ok(None),
241 Some(d) => Ok(Some(doc_to_record(key, &d)?)),
242 }
243 }
244}
245
246#[cfg(test)]
247mod tests {
248 use super::*;
249 use klauthed_core::id::Id;
250
251 async fn live_store() -> MongoIdempotencyStore {
252 let url =
253 std::env::var("MONGODB_URL").unwrap_or_else(|_| "mongodb://127.0.0.1:27017".to_owned());
254 let client = mongodb::Client::with_uri_str(&url).await.expect("connect mongodb");
255 let db_name = format!("klauthed_test_{}", Id::<()>::new());
256 let store = MongoIdempotencyStore::new(&client.database(&db_name));
257 store.ensure_schema().await.expect("ensure schema");
258 store
259 }
260
261 #[tokio::test]
262 #[ignore = "requires a live MongoDB at MONGODB_URL"]
263 async fn new_in_progress_complete_replay() {
264 let store = live_store().await;
265 let key = format!("test:{}", Id::<()>::new());
266
267 assert_eq!(store.begin(&key).await.unwrap(), Outcome::New);
268 assert_eq!(store.begin(&key).await.unwrap(), Outcome::InProgress);
269
270 let response = serde_json::json!({ "charged": true });
271 store.complete(&key, response.clone()).await.unwrap();
272
273 assert_eq!(store.begin(&key).await.unwrap(), Outcome::Completed(response));
274 }
275
276 #[tokio::test]
277 #[ignore = "requires a live MongoDB at MONGODB_URL"]
278 async fn complete_unknown_key_errors() {
279 let store = live_store().await;
280 let key = format!("test:{}:missing", Id::<()>::new());
281 let err = store.complete(&key, serde_json::Value::Null).await.unwrap_err();
282 assert!(matches!(err, DataError::Idempotency(_)));
283 }
284}