Skip to main content

cdk_sql_common/mint/
proofs.rs

1//! Proofs database implementation
2
3use std::collections::HashMap;
4use std::str::FromStr;
5
6use async_trait::async_trait;
7use cdk_common::database::mint::Acquired;
8use cdk_common::database::{self, Error, MintProofsDatabase};
9use cdk_common::mint::{Operation, ProofsWithState};
10use cdk_common::nut00::ProofsMethods;
11use cdk_common::quote_id::QuoteId;
12use cdk_common::secret::Secret;
13use cdk_common::util::unix_time;
14use cdk_common::{Amount, Id, Proof, Proofs, PublicKey, State};
15
16use super::{SQLMintDatabase, SQLTransaction};
17use crate::database::DatabaseExecutor;
18use crate::pool::DatabasePool;
19use crate::stmt::{query, Column};
20use crate::{column_as_nullable_string, column_as_number, column_as_string, unpack_into};
21
22pub(super) async fn get_current_states<C>(
23    conn: &C,
24    ys: &[PublicKey],
25    for_update: bool,
26) -> Result<HashMap<PublicKey, State>, Error>
27where
28    C: DatabaseExecutor + Send + Sync,
29{
30    if ys.is_empty() {
31        return Ok(Default::default());
32    }
33    let for_update_clause = if for_update { "FOR UPDATE" } else { "" };
34
35    query(&format!(
36        r#"SELECT y, state FROM proof WHERE y IN (:ys) {}"#,
37        for_update_clause
38    ))?
39    .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
40    .fetch_all(conn)
41    .await?
42    .into_iter()
43    .map(|row| {
44        Ok((
45            column_as_string!(&row[0], PublicKey::from_hex, PublicKey::from_slice),
46            column_as_string!(&row[1], State::from_str),
47        ))
48    })
49    .collect::<Result<HashMap<_, _>, _>>()
50}
51
52pub(super) fn sql_row_to_proof(row: Vec<Column>) -> Result<Proof, Error> {
53    unpack_into!(
54        let (
55            amount,
56            keyset_id,
57            secret,
58            c,
59            witness
60        ) = row
61    );
62
63    let amount: u64 = column_as_number!(amount);
64    Ok(Proof {
65        amount: Amount::from(amount),
66        keyset_id: column_as_string!(keyset_id, Id::from_str),
67        secret: column_as_string!(secret, Secret::from_str),
68        c: column_as_string!(c, PublicKey::from_hex, PublicKey::from_slice),
69        witness: column_as_nullable_string!(witness).and_then(|w| serde_json::from_str(&w).ok()),
70        dleq: None,
71    })
72}
73
74pub(super) fn sql_row_to_proof_with_state(row: Vec<Column>) -> Result<(Proof, State), Error> {
75    unpack_into!(
76        let (
77            keyset_id, amount, secret, c, witness, state
78        ) = row
79    );
80
81    let amount: u64 = column_as_number!(amount);
82    let state = column_as_nullable_string!(state)
83        .and_then(|s| State::from_str(&s).ok())
84        .unwrap_or(State::Pending);
85
86    Ok((
87        Proof {
88            amount: Amount::from(amount),
89            keyset_id: column_as_string!(keyset_id, Id::from_str, Id::from_bytes),
90            secret: column_as_string!(secret, Secret::from_str),
91            c: column_as_string!(c, PublicKey::from_hex, PublicKey::from_slice),
92            witness: column_as_nullable_string!(witness)
93                .and_then(|w| serde_json::from_str(&w).ok()),
94            dleq: None,
95        },
96        state,
97    ))
98}
99
100pub(super) fn sql_row_to_hashmap_amount(row: Vec<Column>) -> Result<(Id, Amount), Error> {
101    unpack_into!(
102        let (
103            keyset_id, amount
104        ) = row
105    );
106
107    let amount: u64 = column_as_number!(amount);
108    Ok((
109        column_as_string!(keyset_id, Id::from_str, Id::from_bytes),
110        Amount::from(amount),
111    ))
112}
113
114#[async_trait]
115impl<RM> database::MintProofsTransaction for SQLTransaction<RM>
116where
117    RM: DatabasePool + 'static,
118{
119    type Err = Error;
120
121    /// Adds proofs to the database with initial state `Unspent`.
122    ///
123    /// This method first checks if any of the proofs already exist in the database.
124    /// If a proof exists and is spent, returns [`Error::AttemptUpdateSpentProof`].
125    /// If a proof exists in any other state, returns [`Error::Duplicate`].
126    ///
127    /// On success, returns the proofs wrapped in [`Acquired<ProofsWithState>`] with
128    /// state set to `Unspent`, indicating the rows are locked for the duration of
129    /// the transaction.
130    async fn add_proofs(
131        &mut self,
132        proofs: Proofs,
133        quote_id: Option<QuoteId>,
134        operation: &Operation,
135    ) -> Result<Acquired<ProofsWithState>, Self::Err> {
136        let current_time = unix_time();
137
138        // Check any previous proof, this query should return None in order to proceed storing
139        // Any result here would error
140        match query(r#"SELECT state FROM proof WHERE y IN (:ys) LIMIT 1 FOR UPDATE"#)?
141            .bind_vec(
142                "ys",
143                proofs
144                    .iter()
145                    .map(|y| y.y().map(|y| y.to_bytes().to_vec()))
146                    .collect::<Result<_, _>>()?,
147            )
148            .pluck(&self.inner)
149            .await?
150            .map(|state| Ok::<_, Error>(column_as_string!(&state, State::from_str)))
151            .transpose()?
152        {
153            Some(State::Spent) => Err(database::Error::AttemptUpdateSpentProof),
154            Some(_) => Err(database::Error::Duplicate),
155            None => Ok(()), // no previous record
156        }?;
157
158        for proof in &proofs {
159            let y = proof.y()?;
160
161            query(
162                r#"
163                  INSERT INTO proof
164                  (y, amount, keyset_id, secret, c, witness, state, quote_id, created_time, operation_kind, operation_id)
165                  VALUES
166                  (:y, :amount, :keyset_id, :secret, :c, :witness, :state, :quote_id, :created_time, :operation_kind, :operation_id)
167                  "#,
168            )?
169            .bind("y", y.to_bytes().to_vec())
170            .bind("amount", proof.amount.to_i64())
171            .bind("keyset_id", proof.keyset_id.to_string())
172            .bind("secret", proof.secret.to_string())
173            .bind("c", proof.c.to_bytes().to_vec())
174            .bind(
175                "witness",
176                proof.witness.clone().and_then(|w| serde_json::to_string(&w).inspect_err(|e| tracing::error!("Failed to serialize witness: {:?}", e)).ok()),
177            )
178            .bind("state", "UNSPENT".to_string())
179            .bind("quote_id", quote_id.clone().map(|q| q.to_string()))
180            .bind("created_time", current_time as i64)
181            .bind("operation_kind", operation.kind().to_string())
182            .bind("operation_id", operation.id().to_string())
183            .execute(&self.inner)
184            .await?;
185        }
186
187        Ok(ProofsWithState::new(proofs, State::Unspent).into())
188    }
189
190    /// Updates all proofs to the given state in the database.
191    ///
192    /// Also updates the `state` field on the [`ProofsWithState`] wrapper to reflect
193    /// the new state after the database update succeeds.
194    ///
195    /// When the new state is `Spent`, this method also updates the `keyset_amounts`
196    /// table to track the total redeemed amount per keyset for analytics purposes.
197    ///
198    /// # Prerequisites
199    ///
200    /// The proofs must have been previously acquired via `add_proofs`
201    /// or `get_proofs` to ensure they are locked within the current transaction.
202    async fn update_proofs_state(
203        &mut self,
204        proofs: &mut Acquired<ProofsWithState>,
205        new_state: State,
206    ) -> Result<(), Self::Err> {
207        let ys = proofs.ys()?;
208
209        query(r#"UPDATE proof SET state = :new_state WHERE y IN (:ys)"#)?
210            .bind("new_state", new_state.to_string())
211            .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
212            .execute(&self.inner)
213            .await?;
214
215        if new_state == State::Spent {
216            query(
217                    r#"
218                    INSERT INTO keyset_amounts (keyset_id, total_issued, total_redeemed)
219                    SELECT keyset_id, 0, COALESCE(SUM(amount), 0)
220                    FROM proof
221                    WHERE y IN (:ys)
222                    GROUP BY keyset_id
223                    ON CONFLICT (keyset_id)
224                    DO UPDATE SET total_redeemed = keyset_amounts.total_redeemed + EXCLUDED.total_redeemed
225                    "#,
226                )?
227                .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
228                .execute(&self.inner)
229                .await?;
230        }
231
232        proofs.state = new_state;
233
234        Ok(())
235    }
236
237    async fn remove_proofs(
238        &mut self,
239        ys: &[PublicKey],
240        _quote_id: Option<QuoteId>,
241    ) -> Result<(), Self::Err> {
242        if ys.is_empty() {
243            return Ok(());
244        }
245        let total_deleted = query(
246            r#"
247            DELETE FROM proof WHERE y IN (:ys) AND state NOT IN (:exclude_state)
248            "#,
249        )?
250        .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
251        .bind_vec("exclude_state", vec![State::Spent.to_string()])
252        .execute(&self.inner)
253        .await?;
254
255        if total_deleted != ys.len() {
256            // Query current states to provide detailed logging
257            let current_states = get_current_states(&self.inner, ys, true).await?;
258
259            let missing_count = ys.len() - current_states.len();
260            let spent_count = current_states
261                .values()
262                .filter(|s| **s == State::Spent)
263                .count();
264
265            if missing_count > 0 {
266                tracing::warn!(
267                    "remove_proofs: {} of {} proofs do not exist in database (already removed?)",
268                    missing_count,
269                    ys.len()
270                );
271            }
272
273            if spent_count > 0 {
274                tracing::warn!(
275                    "remove_proofs: {} of {} proofs are in Spent state and cannot be removed",
276                    spent_count,
277                    ys.len()
278                );
279            }
280
281            tracing::debug!(
282                "remove_proofs details: requested={}, deleted={}, missing={}, spent={}",
283                ys.len(),
284                total_deleted,
285                missing_count,
286                spent_count
287            );
288
289            return Err(Self::Err::AttemptRemoveSpentProof);
290        }
291
292        Ok(())
293    }
294
295    async fn get_proof_ys_by_quote_id(
296        &mut self,
297        quote_id: &QuoteId,
298    ) -> Result<Vec<PublicKey>, Self::Err> {
299        Ok(query(
300            r#"
301            SELECT
302                amount,
303                keyset_id,
304                secret,
305                c,
306                witness
307            FROM
308                proof
309            WHERE
310                quote_id = :quote_id
311            FOR UPDATE
312            "#,
313        )?
314        .bind("quote_id", quote_id.to_string())
315        .fetch_all(&self.inner)
316        .await?
317        .into_iter()
318        .map(sql_row_to_proof)
319        .collect::<Result<Vec<Proof>, _>>()?
320        .ys()?)
321    }
322
323    async fn get_proof_ys_by_operation_id(
324        &mut self,
325        operation_id: &uuid::Uuid,
326    ) -> Result<Vec<PublicKey>, Self::Err> {
327        Ok(query(
328            r#"
329            SELECT
330                y
331            FROM
332                proof
333            WHERE
334                operation_id = :operation_id
335            "#,
336        )?
337        .bind("operation_id", operation_id.to_string())
338        .fetch_all(&self.inner)
339        .await?
340        .into_iter()
341        .map(|row| -> Result<PublicKey, Error> {
342            Ok(column_as_string!(
343                &row[0],
344                PublicKey::from_hex,
345                PublicKey::from_slice
346            ))
347        })
348        .collect::<Result<Vec<_>, _>>()?)
349    }
350
351    async fn get_proofs(
352        &mut self,
353        ys: &[PublicKey],
354    ) -> Result<Acquired<ProofsWithState>, Self::Err> {
355        if ys.is_empty() {
356            return Err(database::Error::ProofNotFound);
357        }
358
359        let rows = query(
360            r#"
361             SELECT
362                 keyset_id,
363                 amount,
364                 secret,
365                 c,
366                 witness,
367                 state
368             FROM
369                 proof
370             WHERE
371                 y IN (:ys)
372             FOR UPDATE
373             "#,
374        )?
375        .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
376        .fetch_all(&self.inner)
377        .await?;
378
379        if rows.is_empty() || rows.len() != ys.len() {
380            return Err(database::Error::ProofNotFound);
381        }
382
383        let results: Vec<(Proof, State)> = rows
384            .into_iter()
385            .map(sql_row_to_proof_with_state)
386            .collect::<Result<Vec<_>, _>>()?;
387
388        let mut proofs = Vec::with_capacity(results.len());
389        let mut first_state: Option<State> = None;
390
391        for (proof, state) in results {
392            if let Some(first) = first_state {
393                if first != state {
394                    return Err(database::Error::Internal(
395                        "Proofs have inconsistent states".to_string(),
396                    ));
397                }
398            } else {
399                first_state = Some(state);
400            }
401
402            proofs.push(proof);
403        }
404
405        let state = first_state.unwrap_or(State::Unspent);
406        Ok(ProofsWithState::new(proofs, state).into())
407    }
408}
409
410#[async_trait]
411impl<RM> MintProofsDatabase for SQLMintDatabase<RM>
412where
413    RM: DatabasePool + 'static,
414{
415    type Err = Error;
416
417    async fn get_proofs_by_ys(&self, ys: &[PublicKey]) -> Result<Vec<Option<Proof>>, Self::Err> {
418        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
419        let mut proofs = query(
420            r#"
421            SELECT
422                amount,
423                keyset_id,
424                secret,
425                c,
426                witness,
427                y
428            FROM
429                proof
430            WHERE
431                y IN (:ys)
432            "#,
433        )?
434        .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
435        .fetch_all(&*conn)
436        .await?
437        .into_iter()
438        .map(|mut row| {
439            Ok((
440                column_as_string!(
441                    row.pop().ok_or(Error::InvalidDbResponse)?,
442                    PublicKey::from_hex,
443                    PublicKey::from_slice
444                ),
445                sql_row_to_proof(row)?,
446            ))
447        })
448        .collect::<Result<HashMap<_, _>, Error>>()?;
449
450        Ok(ys.iter().map(|y| proofs.remove(y)).collect())
451    }
452
453    async fn get_proof_ys_by_quote_id(
454        &self,
455        quote_id: &QuoteId,
456    ) -> Result<Vec<PublicKey>, Self::Err> {
457        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
458        Ok(query(
459            r#"
460            SELECT
461                amount,
462                keyset_id,
463                secret,
464                c,
465                witness
466            FROM
467                proof
468            WHERE
469                quote_id = :quote_id
470            "#,
471        )?
472        .bind("quote_id", quote_id.to_string())
473        .fetch_all(&*conn)
474        .await?
475        .into_iter()
476        .map(sql_row_to_proof)
477        .collect::<Result<Vec<Proof>, _>>()?
478        .ys()?)
479    }
480
481    async fn get_proofs_states(&self, ys: &[PublicKey]) -> Result<Vec<Option<State>>, Self::Err> {
482        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
483        let mut current_states = get_current_states(&*conn, ys, false).await?;
484
485        Ok(ys.iter().map(|y| current_states.remove(y)).collect())
486    }
487
488    async fn get_proofs_by_keyset_id(
489        &self,
490        keyset_id: &Id,
491    ) -> Result<(Proofs, Vec<Option<State>>), Self::Err> {
492        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
493
494        let (proofs, states): (Vec<Proof>, Vec<State>) = query(
495            r#"
496            SELECT
497               keyset_id,
498               amount,
499               secret,
500               c,
501               witness,
502               state
503            FROM
504                proof
505            WHERE
506                keyset_id=:keyset_id
507            "#,
508        )?
509        .bind("keyset_id", keyset_id.to_string())
510        .fetch_all(&*conn)
511        .await?
512        .into_iter()
513        .map(sql_row_to_proof_with_state)
514        .collect::<Result<Vec<_>, _>>()?
515        .into_iter()
516        .unzip();
517
518        Ok((proofs, states.into_iter().map(Some).collect()))
519    }
520
521    /// Get total proofs redeemed by keyset id
522    async fn get_total_redeemed(&self) -> Result<HashMap<Id, Amount>, Self::Err> {
523        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
524        query(
525            r#"
526            SELECT
527                keyset_id,
528                total_redeemed as amount
529            FROM
530                keyset_amounts
531        "#,
532        )?
533        .fetch_all(&*conn)
534        .await?
535        .into_iter()
536        .map(sql_row_to_hashmap_amount)
537        .collect()
538    }
539
540    async fn get_proof_ys_by_operation_id(
541        &self,
542        operation_id: &uuid::Uuid,
543    ) -> Result<Vec<PublicKey>, Self::Err> {
544        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
545        query(
546            r#"
547            SELECT
548                y
549            FROM
550                proof
551            WHERE
552                operation_id = :operation_id
553            "#,
554        )?
555        .bind("operation_id", operation_id.to_string())
556        .fetch_all(&*conn)
557        .await?
558        .into_iter()
559        .map(|row| -> Result<PublicKey, Error> {
560            Ok(column_as_string!(
561                &row[0],
562                PublicKey::from_hex,
563                PublicKey::from_slice
564            ))
565        })
566        .collect::<Result<Vec<_>, _>>()
567    }
568}