klauthed_data/idempotency/
mod.rs1#[cfg(feature = "redis")]
34pub mod redis;
35
36#[cfg(feature = "mongodb")]
37pub mod mongo;
38
39use async_trait::async_trait;
40use klauthed_core::time::Timestamp;
41use serde::{Deserialize, Serialize};
42use std::collections::HashMap;
43use std::sync::Mutex;
44
45use crate::error::DataError;
46
47#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
49pub enum IdempotencyStatus {
50 InProgress,
52 Completed,
54}
55
56#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
58pub struct IdempotencyRecord {
59 pub key: String,
61 pub status: IdempotencyStatus,
63 pub response: Option<serde_json::Value>,
66 pub created_at: Timestamp,
68 pub updated_at: Timestamp,
70}
71
72#[derive(Debug, Clone, PartialEq, Eq)]
74pub enum Outcome {
75 New,
77 InProgress,
79 Completed(serde_json::Value),
81}
82
83#[async_trait]
85pub trait IdempotencyStore: Send + Sync {
86 async fn begin(&self, key: &str) -> Result<Outcome, DataError>;
92
93 async fn complete(&self, key: &str, response: serde_json::Value) -> Result<(), DataError>;
98
99 async fn get(&self, key: &str) -> Result<Option<IdempotencyRecord>, DataError>;
101}
102
103#[derive(Default)]
105pub struct InMemoryIdempotencyStore {
106 records: Mutex<HashMap<String, IdempotencyRecord>>,
107}
108
109impl InMemoryIdempotencyStore {
110 pub fn new() -> Self {
112 Self::default()
113 }
114}
115
116#[async_trait]
117impl IdempotencyStore for InMemoryIdempotencyStore {
118 async fn begin(&self, key: &str) -> Result<Outcome, DataError> {
119 let now = Timestamp::now();
120 let mut guard = self.records.lock().unwrap_or_else(std::sync::PoisonError::into_inner);
121 match guard.get(key) {
122 Some(record) => match record.status {
123 IdempotencyStatus::InProgress => Ok(Outcome::InProgress),
124 IdempotencyStatus::Completed => {
125 let response = record.response.clone().unwrap_or(serde_json::Value::Null);
127 Ok(Outcome::Completed(response))
128 }
129 },
130 None => {
131 guard.insert(
132 key.to_owned(),
133 IdempotencyRecord {
134 key: key.to_owned(),
135 status: IdempotencyStatus::InProgress,
136 response: None,
137 created_at: now,
138 updated_at: now,
139 },
140 );
141 Ok(Outcome::New)
142 }
143 }
144 }
145
146 async fn complete(&self, key: &str, response: serde_json::Value) -> Result<(), DataError> {
147 let now = Timestamp::now();
148 let mut guard = self.records.lock().unwrap_or_else(std::sync::PoisonError::into_inner);
149 match guard.get_mut(key) {
150 Some(record) => {
151 record.status = IdempotencyStatus::Completed;
152 record.response = Some(response);
153 record.updated_at = now;
154 Ok(())
155 }
156 None => Err(DataError::Idempotency(format!(
157 "cannot complete unknown idempotency key '{key}'"
158 ))),
159 }
160 }
161
162 async fn get(&self, key: &str) -> Result<Option<IdempotencyRecord>, DataError> {
163 Ok(self.records.lock().unwrap_or_else(std::sync::PoisonError::into_inner).get(key).cloned())
164 }
165}
166
167#[cfg(test)]
168mod tests {
169 use super::*;
170
171 #[tokio::test]
172 async fn new_then_in_progress_then_completed_replay() {
173 let store = InMemoryIdempotencyStore::new();
174
175 assert_eq!(store.begin("k").await.unwrap(), Outcome::New);
177
178 assert_eq!(store.begin("k").await.unwrap(), Outcome::InProgress);
180
181 let response = serde_json::json!({ "charged": true, "amount": 100 });
183 store.complete("k", response.clone()).await.unwrap();
184
185 assert_eq!(store.begin("k").await.unwrap(), Outcome::Completed(response));
187 }
188
189 #[tokio::test]
190 async fn distinct_keys_are_independent() {
191 let store = InMemoryIdempotencyStore::new();
192 assert_eq!(store.begin("a").await.unwrap(), Outcome::New);
193 assert_eq!(store.begin("b").await.unwrap(), Outcome::New);
194 }
195
196 #[tokio::test]
197 async fn complete_unknown_key_errors() {
198 let store = InMemoryIdempotencyStore::new();
199 let err = store.complete("missing", serde_json::Value::Null).await.unwrap_err();
200 assert!(matches!(err, DataError::Idempotency(_)));
201 }
202
203 #[tokio::test]
204 async fn get_exposes_record_lifecycle() {
205 let store = InMemoryIdempotencyStore::new();
206 store.begin("k").await.unwrap();
207 let rec = store.get("k").await.unwrap().unwrap();
208 assert_eq!(rec.status, IdempotencyStatus::InProgress);
209 assert!(rec.response.is_none());
210
211 store.complete("k", serde_json::json!(1)).await.unwrap();
212 let rec = store.get("k").await.unwrap().unwrap();
213 assert_eq!(rec.status, IdempotencyStatus::Completed);
214 assert_eq!(rec.response, Some(serde_json::json!(1)));
215 }
216}