cratestack_sqlx/
idempotency.rs1mod operations;
13
14use std::time::SystemTime;
15
16use async_trait::async_trait;
17use cratestack_axum::idempotency::{IDEMPOTENCY_TABLE_DDL, IdempotencyStore, ReservationOutcome};
18use cratestack_core::CoolError;
19
20use crate::sqlx;
21
22#[derive(Clone)]
23pub struct SqlxIdempotencyStore {
24 pool: sqlx::PgPool,
25}
26
27impl SqlxIdempotencyStore {
28 pub fn new(pool: sqlx::PgPool) -> Self {
29 Self { pool }
30 }
31
32 pub async fn ensure_schema(&self) -> Result<(), CoolError> {
35 for statement in IDEMPOTENCY_TABLE_DDL
39 .split(';')
40 .map(str::trim)
41 .filter(|s| !s.is_empty())
42 {
43 sqlx::query(statement)
44 .execute(&self.pool)
45 .await
46 .map_err(|error| CoolError::Database(error.to_string()))?;
47 }
48 Ok(())
49 }
50
51 pub async fn garbage_collect(&self) -> Result<u64, CoolError> {
55 let result = sqlx::query("DELETE FROM cratestack_idempotency WHERE expires_at < NOW()")
56 .execute(&self.pool)
57 .await
58 .map_err(|error| CoolError::Database(error.to_string()))?;
59 Ok(result.rows_affected())
60 }
61}
62
63#[async_trait]
64impl IdempotencyStore for SqlxIdempotencyStore {
65 async fn reserve_or_fetch(
66 &self,
67 principal: &str,
68 key: &str,
69 request_hash: [u8; 32],
70 expires_at: SystemTime,
71 ) -> Result<ReservationOutcome, CoolError> {
72 operations::reserve_or_fetch(&self.pool, principal, key, request_hash, expires_at).await
73 }
74
75 async fn complete(
76 &self,
77 principal: &str,
78 key: &str,
79 token: uuid::Uuid,
80 status: u16,
81 headers: &[u8],
82 body: &[u8],
83 ) -> Result<(), CoolError> {
84 operations::complete(&self.pool, principal, key, token, status, headers, body).await
85 }
86
87 async fn release(
88 &self,
89 principal: &str,
90 key: &str,
91 token: uuid::Uuid,
92 ) -> Result<(), CoolError> {
93 operations::release(&self.pool, principal, key, token).await
94 }
95}
96
97pub fn expiry_from(created_at: SystemTime, ttl: std::time::Duration) -> SystemTime {
102 created_at + ttl
103}
104
105#[cfg(test)]
106mod tests {
107 use super::*;
108 use std::time::{Duration, SystemTime};
109
110 #[test]
111 fn expiry_adds_ttl_to_creation() {
112 let now = SystemTime::UNIX_EPOCH;
113 let expiry = expiry_from(now, Duration::from_secs(24 * 3600));
114 assert_eq!(
115 expiry.duration_since(SystemTime::UNIX_EPOCH).unwrap(),
116 Duration::from_secs(24 * 3600),
117 );
118 }
119}