Skip to main content

codlet_sqlx/
token.rs

1//! SQLite implementation of [`codlet_core::store::token::FormTokenStore`].
2
3use codlet_core::hashing::LookupKey;
4use codlet_core::state::{TokenConsumeOutcome, classify_token_consume};
5use codlet_core::store::error::StoreError;
6use codlet_core::store::token::{FormTokenRecord, FormTokenStore, TokenSubject};
7
8use crate::SqliteStore;
9
10impl FormTokenStore for SqliteStore {
11    async fn insert_form_token(&self, record: FormTokenRecord) -> Result<(), StoreError> {
12        sqlx::query(
13            "INSERT INTO codlet_form_tokens
14             (lookup_key, key_version, subject_kind, purpose, bound_resource, issued_at, expires_at)
15             VALUES (?, ?, ?, ?, ?, ?, ?)",
16        )
17        .bind(record.lookup_key.as_str())
18        .bind(record.key_version.as_str())
19        .bind(record.subject.as_binding_str())
20        .bind(&record.purpose)
21        .bind(record.bound_resource.as_deref())
22        .bind(record.issued_at as i64)
23        .bind(record.expires_at as i64)
24        .execute(&self.pool)
25        .await
26        .map_err(|e| StoreError::Backend(e.to_string()))?;
27        Ok(())
28    }
29
30    async fn consume_form_token(
31        &self,
32        candidates: &[LookupKey],
33        subject: &TokenSubject,
34        purpose: &str,
35        bound_resource: Option<&str>,
36        now: u64,
37    ) -> Result<(TokenConsumeOutcome, Option<String>), StoreError> {
38        let lookup_key = candidates.first().expect("at least one candidate");
39        let now_i = now as i64;
40        let lk = lookup_key.as_str();
41        let subj = subject.as_binding_str();
42        let br = bound_resource.unwrap_or("");
43
44        // Atomic conditional UPDATE — the single-winner transition (RFC-022).
45        let update = sqlx::query(
46            "UPDATE codlet_form_tokens
47             SET consumed_at = ?
48             WHERE lookup_key    = ?
49               AND subject_kind  = ?
50               AND purpose       = ?
51               AND COALESCE(bound_resource, '') = ?
52               AND expires_at    > ?
53               AND consumed_at   IS NULL",
54        )
55        .bind(now_i)
56        .bind(lk)
57        .bind(&subj)
58        .bind(purpose)
59        .bind(br)
60        .bind(now_i)
61        .execute(&self.pool)
62        .await
63        .map_err(|e| StoreError::Backend(e.to_string()))?;
64
65        let changed = update.rows_affected() as usize;
66        if changed > 1 {
67            return Err(StoreError::InvariantViolation(format!(
68                "consume_form_token changed {changed} rows for lookup_key={lk}"
69            )));
70        }
71        if changed == 1 {
72            return Ok((TokenConsumeOutcome::Proceed, None));
73        }
74
75        // changed == 0: follow-up SELECT to classify why (no race-sensitive write).
76        let row: Option<(Option<i64>, Option<String>, Option<String>)> = sqlx::query_as(
77            "SELECT consumed_at, result_ref, bound_resource
78             FROM codlet_form_tokens
79             WHERE lookup_key = ? AND subject_kind = ? AND purpose = ?
80             LIMIT 1",
81        )
82        .bind(lk)
83        .bind(&subj)
84        .bind(purpose)
85        .fetch_optional(&self.pool)
86        .await
87        .map_err(|e| StoreError::Backend(e.to_string()))?;
88
89        let found = row.is_some();
90        let (already_consumed, stored_rr, stored_br) = row
91            .map(|(ca, rr, b)| (ca.is_some(), rr, b))
92            .unwrap_or((false, None, None));
93
94        let binding_ok = match bound_resource {
95            Some(expected) => stored_br.as_deref().unwrap_or("") == expected,
96            None => true,
97        };
98
99        let outcome = classify_token_consume(0, found, already_consumed, binding_ok);
100        let result_ref = if outcome == TokenConsumeOutcome::Replay {
101            stored_rr
102        } else {
103            None
104        };
105        Ok((outcome, result_ref))
106    }
107
108    async fn set_token_result(
109        &self,
110        candidates: &[LookupKey],
111        result_ref: &str,
112    ) -> Result<(), StoreError> {
113        let lookup_key = candidates.first().expect("at least one candidate");
114        sqlx::query(
115            "UPDATE codlet_form_tokens
116             SET result_ref = ?
117             WHERE lookup_key = ? AND consumed_at IS NOT NULL",
118        )
119        .bind(result_ref)
120        .bind(lookup_key.as_str())
121        .execute(&self.pool)
122        .await
123        .map_err(|e| StoreError::Backend(e.to_string()))?;
124        Ok(())
125    }
126}