// Optional submodule, compiled only when the `redis` cargo feature is enabled.
// NOTE(review): presumably holds a Redis-backed `IdempotencyStore`; its contents
// are not visible from this file — confirm.
#[cfg(feature = "redis")]
pub mod redis;
// Optional submodule, compiled only when the `mongodb` cargo feature is enabled.
// NOTE(review): presumably holds a MongoDB-backed `IdempotencyStore`; its contents
// are not visible from this file — confirm.
#[cfg(feature = "mongodb")]
pub mod mongo;
use async_trait::async_trait;
use klauthed_core::time::Timestamp;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Mutex;
use crate::error::DataError;
/// Lifecycle state of a stored [`IdempotencyRecord`].
///
/// The descriptions below reflect how [`InMemoryIdempotencyStore`] uses the
/// variants; the type is serde-serializable.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum IdempotencyStatus {
/// The key has been registered by `begin` but no response has been stored yet.
InProgress,
/// A response has been stored for the key by `complete`.
Completed,
}
/// Everything a store keeps about a single idempotency key.
///
/// Field descriptions reflect how [`InMemoryIdempotencyStore`] populates the
/// record. NOTE(review): confirm other backends fill the fields the same way.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct IdempotencyRecord {
/// The idempotency key this record belongs to (the in-memory store also uses
/// it as the map key).
pub key: String,
/// Whether the operation is still in progress or has completed.
pub status: IdempotencyStatus,
/// Stored response; `None` when the record is created, `Some` once `complete`
/// has been called for the key.
pub response: Option<serde_json::Value>,
/// Time at which the record was first created by `begin`.
pub created_at: Timestamp,
/// Time of the last modification: equal to `created_at` on creation, refreshed
/// by `complete`.
pub updated_at: Timestamp,
}
/// Result of calling [`IdempotencyStore::begin`] for a key.
///
/// Variant descriptions reflect the behavior of [`InMemoryIdempotencyStore`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Outcome {
/// The key was not known before; a new in-progress record has been created.
New,
/// The key is already registered and has not been completed yet.
InProgress,
/// The key has already been completed; carries the stored response
/// (`serde_json::Value::Null` if the record holds no response).
Completed(serde_json::Value),
}
/// Asynchronous storage backend for idempotency records, keyed by a string.
///
/// Implementors must be `Send + Sync`. The per-method notes describe what
/// [`InMemoryIdempotencyStore`] does, as it is the only implementation visible
/// in this file. NOTE(review): confirm other backends honor the same contract.
#[async_trait]
pub trait IdempotencyStore: Send + Sync {
/// Registers `key`, or reports the state of an already-registered key.
///
/// The in-memory store returns [`Outcome::New`] (and records the key as in
/// progress) for an unknown key, [`Outcome::InProgress`] for a key that has
/// not been completed, and [`Outcome::Completed`] with the stored response
/// for a completed key.
///
/// # Errors
///
/// Returns a [`DataError`] if the backend fails; the in-memory store never
/// returns an error from this method.
async fn begin(&self, key: &str) -> Result<Outcome, DataError>;
/// Marks `key` as completed and stores `response` for later replay.
///
/// # Errors
///
/// The in-memory store returns `DataError::Idempotency` when `key` has never
/// been registered.
async fn complete(&self, key: &str, response: serde_json::Value) -> Result<(), DataError>;
/// Returns a copy of the record stored for `key`, or `None` if there is none.
///
/// # Errors
///
/// Returns a [`DataError`] if the backend fails; the in-memory store never
/// returns an error from this method.
async fn get(&self, key: &str) -> Result<Option<IdempotencyRecord>, DataError>;
}
/// [`IdempotencyStore`] implementation that keeps all records in process memory.
///
/// Records live in a `HashMap` guarded by a `std::sync::Mutex`; the derived
/// `Default` yields an empty store.
#[derive(Default)]
pub struct InMemoryIdempotencyStore {
// Map from idempotency key to its record; every access goes through the mutex.
records: Mutex<HashMap<String, IdempotencyRecord>>,
}
impl InMemoryIdempotencyStore {
pub fn new() -> Self {
Self::default()
}
}
/// In-process implementation of [`IdempotencyStore`] on top of a mutex-guarded map.
///
/// Every method recovers from a poisoned mutex by taking the inner guard
/// instead of panicking or returning an error.
#[async_trait]
impl IdempotencyStore for InMemoryIdempotencyStore {
    /// Registers `key` if it is unknown, otherwise reports its current state.
    ///
    /// * unknown key: inserts an in-progress record and returns [`Outcome::New`];
    /// * in-progress key: returns [`Outcome::InProgress`];
    /// * completed key: returns [`Outcome::Completed`] with a clone of the stored
    ///   response (`Null` when the record carries none).
    ///
    /// This implementation always returns `Ok`.
    async fn begin(&self, key: &str) -> Result<Outcome, DataError> {
        // The timestamp is sampled before the lock is acquired.
        let started_at = Timestamp::now();
        let mut records = self
            .records
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());

        // Known key: report its state without touching the record.
        if let Some(existing) = records.get(key) {
            let outcome = match existing.status {
                IdempotencyStatus::InProgress => Outcome::InProgress,
                IdempotencyStatus::Completed => Outcome::Completed(
                    existing.response.clone().unwrap_or(serde_json::Value::Null),
                ),
            };
            return Ok(outcome);
        }

        // Unknown key: register it as in progress with no response yet.
        let fresh = IdempotencyRecord {
            key: key.to_owned(),
            status: IdempotencyStatus::InProgress,
            response: None,
            created_at: started_at,
            updated_at: started_at,
        };
        records.insert(key.to_owned(), fresh);
        Ok(Outcome::New)
    }

    /// Stores `response` for `key` and marks the record as completed.
    ///
    /// The record's `updated_at` is refreshed; `created_at` is left untouched.
    /// A record that is already completed is overwritten with the new response.
    ///
    /// # Errors
    ///
    /// Returns `DataError::Idempotency` when no record exists for `key`.
    async fn complete(&self, key: &str, response: serde_json::Value) -> Result<(), DataError> {
        // The timestamp is sampled before the lock is acquired.
        let finished_at = Timestamp::now();
        let mut records = self
            .records
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());

        let entry = records.get_mut(key).ok_or_else(|| {
            DataError::Idempotency(format!(
                "cannot complete unknown idempotency key '{key}'"
            ))
        })?;

        entry.status = IdempotencyStatus::Completed;
        entry.response = Some(response);
        entry.updated_at = finished_at;
        Ok(())
    }

    /// Returns a clone of the record stored for `key`, or `None` if the key is unknown.
    ///
    /// This implementation always returns `Ok`.
    async fn get(&self, key: &str) -> Result<Option<IdempotencyRecord>, DataError> {
        let records = self
            .records
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        let snapshot = records.get(key).cloned();
        Ok(snapshot)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A key goes `New` -> `InProgress` on repeated `begin`, and replays the
    /// stored response once it has been completed.
    #[tokio::test]
    async fn new_then_in_progress_then_completed_replay() {
        let store = InMemoryIdempotencyStore::new();
        let payload = serde_json::json!({ "charged": true, "amount": 100 });

        let first = store.begin("k").await.unwrap();
        assert_eq!(first, Outcome::New);

        let second = store.begin("k").await.unwrap();
        assert_eq!(second, Outcome::InProgress);

        store.complete("k", payload.clone()).await.unwrap();

        let replay = store.begin("k").await.unwrap();
        assert_eq!(replay, Outcome::Completed(payload));
    }

    /// Registering one key does not affect the state of another.
    #[tokio::test]
    async fn distinct_keys_are_independent() {
        let store = InMemoryIdempotencyStore::new();
        for key in ["a", "b"] {
            assert_eq!(store.begin(key).await.unwrap(), Outcome::New);
        }
    }

    /// Completing a key that was never begun is reported as an idempotency error.
    #[tokio::test]
    async fn complete_unknown_key_errors() {
        let store = InMemoryIdempotencyStore::new();
        let outcome = store.complete("missing", serde_json::Value::Null).await;
        assert!(matches!(outcome, Err(DataError::Idempotency(_))));
    }

    /// `get` shows the record before and after completion.
    #[tokio::test]
    async fn get_exposes_record_lifecycle() {
        let store = InMemoryIdempotencyStore::new();
        store.begin("k").await.unwrap();

        let pending = store.get("k").await.unwrap().unwrap();
        assert_eq!(pending.status, IdempotencyStatus::InProgress);
        assert!(pending.response.is_none());

        store.complete("k", serde_json::json!(1)).await.unwrap();

        let finished = store.get("k").await.unwrap().unwrap();
        assert_eq!(finished.status, IdempotencyStatus::Completed);
        assert_eq!(finished.response, Some(serde_json::json!(1)));
    }
}