Skip to main content

cratestack_sqlx/
idempotency.rs

1//! Postgres-backed [`IdempotencyStore`]. Banks need duplicate-execution
2//! protection even under concurrency, so this uses the atomic-reservation
3//! pattern: a single upsert claims the key (or surfaces the existing
4//! claim), the middleware runs the handler only when it owns the
5//! reservation, then writes the captured response with `complete`.
6//!
7//! Expired rows are replaced on the next reservation via the
8//! `ON CONFLICT DO UPDATE WHERE cratestack_idempotency.expires_at <= NOW()`
9//! clause, letting the new caller take over a stale row in the same
10//! statement that would otherwise have hit the unique-key wall.
11
12mod operations;
13
14use std::time::SystemTime;
15
16use async_trait::async_trait;
17use cratestack_axum::idempotency::{IDEMPOTENCY_TABLE_DDL, IdempotencyStore, ReservationOutcome};
18use cratestack_core::CoolError;
19
20use crate::sqlx;
21
22#[derive(Clone)]
23pub struct SqlxIdempotencyStore {
24    pool: sqlx::PgPool,
25}
26
27impl SqlxIdempotencyStore {
28    pub fn new(pool: sqlx::PgPool) -> Self {
29        Self { pool }
30    }
31
32    /// Ensure the table exists. Banks typically run this via their own
33    /// migration tooling; exposed here for convenience.
34    pub async fn ensure_schema(&self) -> Result<(), CoolError> {
35        // Multi-statement DDL (table + index) — prepared statements
36        // only accept one statement at a time, so split + execute
37        // sequentially.
38        for statement in IDEMPOTENCY_TABLE_DDL
39            .split(';')
40            .map(str::trim)
41            .filter(|s| !s.is_empty())
42        {
43            sqlx::query(statement)
44                .execute(&self.pool)
45                .await
46                .map_err(|error| CoolError::Database(error.to_string()))?;
47        }
48        Ok(())
49    }
50
51    /// Delete expired rows. Run periodically — the request path does
52    /// not auto-GC, although `reserve_or_fetch` does take over any
53    /// single expired row it tries to claim.
54    pub async fn garbage_collect(&self) -> Result<u64, CoolError> {
55        let result = sqlx::query("DELETE FROM cratestack_idempotency WHERE expires_at < NOW()")
56            .execute(&self.pool)
57            .await
58            .map_err(|error| CoolError::Database(error.to_string()))?;
59        Ok(result.rows_affected())
60    }
61}
62
63#[async_trait]
64impl IdempotencyStore for SqlxIdempotencyStore {
65    async fn reserve_or_fetch(
66        &self,
67        principal: &str,
68        key: &str,
69        request_hash: [u8; 32],
70        expires_at: SystemTime,
71    ) -> Result<ReservationOutcome, CoolError> {
72        operations::reserve_or_fetch(&self.pool, principal, key, request_hash, expires_at).await
73    }
74
75    async fn complete(
76        &self,
77        principal: &str,
78        key: &str,
79        token: uuid::Uuid,
80        status: u16,
81        headers: &[u8],
82        body: &[u8],
83    ) -> Result<(), CoolError> {
84        operations::complete(&self.pool, principal, key, token, status, headers, body).await
85    }
86
87    async fn release(
88        &self,
89        principal: &str,
90        key: &str,
91        token: uuid::Uuid,
92    ) -> Result<(), CoolError> {
93        operations::release(&self.pool, principal, key, token).await
94    }
95}
96
/// Return the point in time at which a record captured at `created_at`
/// expires, i.e. `created_at` advanced by `ttl`.
///
/// Kept as a free function so the arithmetic can be unit-tested with fixed
/// timestamps; asserting against `SystemTime` math is otherwise awkward
/// without a clock injection point.
///
/// # Panics
///
/// Adding a `Duration` to a `SystemTime` may panic when the resulting
/// instant cannot be represented by the underlying time type.
pub fn expiry_from(created_at: SystemTime, ttl: std::time::Duration) -> SystemTime {
    let mut expires_at = created_at;
    expires_at += ttl;
    expires_at
}
104
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::{Duration, SystemTime};

    /// A record created at the Unix epoch with a 24-hour TTL must expire
    /// exactly 24 hours after the epoch.
    #[test]
    fn expiry_adds_ttl_to_creation() {
        let ttl = Duration::from_secs(24 * 3600);
        let created_at = SystemTime::UNIX_EPOCH;

        let elapsed = expiry_from(created_at, ttl)
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap();

        assert_eq!(elapsed, ttl);
    }
}