Skip to main content

cdk_sql_common/mint/
proofs.rs

1//! Proofs database implementation
2
3use std::collections::HashMap;
4use std::str::FromStr;
5
6use async_trait::async_trait;
7use cdk_common::database::mint::Acquired;
8use cdk_common::database::{self, Error, MintProofsDatabase};
9use cdk_common::mint::{Operation, ProofsWithState};
10use cdk_common::nut00::ProofsMethods;
11use cdk_common::quote_id::QuoteId;
12use cdk_common::secret::Secret;
13use cdk_common::util::unix_time;
14use cdk_common::{Amount, Id, Proof, Proofs, PublicKey, State};
15
16use super::{SQLMintDatabase, SQLTransaction};
17use crate::database::DatabaseExecutor;
18use crate::pool::DatabasePool;
19use crate::stmt::{query, Column};
20use crate::{column_as_nullable_string, column_as_number, column_as_string, unpack_into};
21
22pub(super) async fn get_current_states<C>(
23    conn: &C,
24    ys: &[PublicKey],
25    for_update: bool,
26) -> Result<HashMap<PublicKey, State>, Error>
27where
28    C: DatabaseExecutor + Send + Sync,
29{
30    if ys.is_empty() {
31        return Ok(Default::default());
32    }
33    let for_update_clause = if for_update { "FOR UPDATE" } else { "" };
34
35    query(&format!(
36        r#"SELECT y, state FROM proof WHERE y IN (:ys) {}"#,
37        for_update_clause
38    ))?
39    .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
40    .fetch_all(conn)
41    .await?
42    .into_iter()
43    .map(|row| {
44        Ok((
45            column_as_string!(&row[0], PublicKey::from_hex, PublicKey::from_slice),
46            column_as_string!(&row[1], State::from_str),
47        ))
48    })
49    .collect::<Result<HashMap<_, _>, _>>()
50}
51
52pub(super) fn sql_row_to_proof(row: Vec<Column>) -> Result<Proof, Error> {
53    unpack_into!(
54        let (
55            amount,
56            keyset_id,
57            secret,
58            c,
59            witness
60        ) = row
61    );
62
63    let amount: u64 = column_as_number!(amount);
64    Ok(Proof {
65        amount: Amount::from(amount),
66        keyset_id: column_as_string!(keyset_id, Id::from_str),
67        secret: column_as_string!(secret, Secret::from_str),
68        c: column_as_string!(c, PublicKey::from_hex, PublicKey::from_slice),
69        witness: column_as_nullable_string!(witness).and_then(|w| serde_json::from_str(&w).ok()),
70        dleq: None,
71        p2pk_e: None,
72    })
73}
74
75pub(super) fn sql_row_to_proof_with_state(row: Vec<Column>) -> Result<(Proof, State), Error> {
76    unpack_into!(
77        let (
78            keyset_id, amount, secret, c, witness, state
79        ) = row
80    );
81
82    let amount: u64 = column_as_number!(amount);
83    let state = column_as_nullable_string!(state)
84        .and_then(|s| State::from_str(&s).ok())
85        .unwrap_or(State::Pending);
86
87    Ok((
88        Proof {
89            amount: Amount::from(amount),
90            keyset_id: column_as_string!(keyset_id, Id::from_str, Id::from_bytes),
91            secret: column_as_string!(secret, Secret::from_str),
92            c: column_as_string!(c, PublicKey::from_hex, PublicKey::from_slice),
93            witness: column_as_nullable_string!(witness)
94                .and_then(|w| serde_json::from_str(&w).ok()),
95            dleq: None,
96            p2pk_e: None,
97        },
98        state,
99    ))
100}
101
102pub(super) fn sql_row_to_hashmap_amount(row: Vec<Column>) -> Result<(Id, Amount), Error> {
103    unpack_into!(
104        let (
105            keyset_id, amount
106        ) = row
107    );
108
109    let amount: u64 = column_as_number!(amount);
110    Ok((
111        column_as_string!(keyset_id, Id::from_str, Id::from_bytes),
112        Amount::from(amount),
113    ))
114}
115
116#[async_trait]
117impl<RM> database::MintProofsTransaction for SQLTransaction<RM>
118where
119    RM: DatabasePool + 'static,
120{
121    type Err = Error;
122
123    /// Adds proofs to the database with initial state `Unspent`.
124    ///
125    /// This method first checks if any of the proofs already exist in the database.
126    /// If a proof exists and is spent, returns [`Error::AttemptUpdateSpentProof`].
127    /// If a proof exists in any other state, returns [`Error::Duplicate`].
128    ///
129    /// On success, returns the proofs wrapped in [`Acquired<ProofsWithState>`] with
130    /// state set to `Unspent`, indicating the rows are locked for the duration of
131    /// the transaction.
132    async fn add_proofs(
133        &mut self,
134        proofs: Proofs,
135        quote_id: Option<QuoteId>,
136        operation: &Operation,
137    ) -> Result<Acquired<ProofsWithState>, Self::Err> {
138        let current_time = unix_time();
139
140        // Check any previous proof, this query should return None in order to proceed storing
141        // Any result here would error
142        match query(r#"SELECT state FROM proof WHERE y IN (:ys) LIMIT 1 FOR UPDATE"#)?
143            .bind_vec(
144                "ys",
145                proofs
146                    .iter()
147                    .map(|y| y.y().map(|y| y.to_bytes().to_vec()))
148                    .collect::<Result<_, _>>()?,
149            )
150            .pluck(&self.inner)
151            .await?
152            .map(|state| Ok::<_, Error>(column_as_string!(&state, State::from_str)))
153            .transpose()?
154        {
155            Some(State::Spent) => Err(database::Error::AttemptUpdateSpentProof),
156            Some(_) => Err(database::Error::Duplicate),
157            None => Ok(()), // no previous record
158        }?;
159
160        for proof in &proofs {
161            let y = proof.y()?;
162
163            query(
164                r#"
165                  INSERT INTO proof
166                  (y, amount, keyset_id, secret, c, witness, state, quote_id, created_time, operation_kind, operation_id)
167                  VALUES
168                  (:y, :amount, :keyset_id, :secret, :c, :witness, :state, :quote_id, :created_time, :operation_kind, :operation_id)
169                  "#,
170            )?
171            .bind("y", y.to_bytes().to_vec())
172            .bind("amount", proof.amount.to_i64())
173            .bind("keyset_id", proof.keyset_id.to_string())
174            .bind("secret", proof.secret.to_string())
175            .bind("c", proof.c.to_bytes().to_vec())
176            .bind(
177                "witness",
178                proof.witness.clone().and_then(|w| serde_json::to_string(&w).inspect_err(|e| tracing::error!("Failed to serialize witness: {:?}", e)).ok()),
179            )
180            .bind("state", "UNSPENT".to_string())
181            .bind("quote_id", quote_id.clone().map(|q| q.to_string()))
182            .bind("created_time", current_time as i64)
183            .bind("operation_kind", operation.kind().to_string())
184            .bind("operation_id", operation.id().to_string())
185            .execute(&self.inner)
186            .await?;
187        }
188
189        Ok(ProofsWithState::new(proofs, State::Unspent).into())
190    }
191
192    /// Updates all proofs to the given state in the database.
193    ///
194    /// Also updates the `state` field on the [`ProofsWithState`] wrapper to reflect
195    /// the new state after the database update succeeds.
196    ///
197    /// When the new state is `Spent`, this method also updates the `keyset_amounts`
198    /// table to track the total redeemed amount per keyset for analytics purposes.
199    ///
200    /// # Prerequisites
201    ///
202    /// The proofs must have been previously acquired via `add_proofs`
203    /// or `get_proofs` to ensure they are locked within the current transaction.
204    async fn update_proofs_state(
205        &mut self,
206        proofs: &mut Acquired<ProofsWithState>,
207        new_state: State,
208    ) -> Result<(), Self::Err> {
209        let ys = proofs.ys()?;
210
211        query(r#"UPDATE proof SET state = :new_state WHERE y IN (:ys)"#)?
212            .bind("new_state", new_state.to_string())
213            .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
214            .execute(&self.inner)
215            .await?;
216
217        if new_state == State::Spent {
218            query(
219                    r#"
220                    INSERT INTO keyset_amounts (keyset_id, total_issued, total_redeemed)
221                    SELECT keyset_id, 0, COALESCE(SUM(amount), 0)
222                    FROM proof
223                    WHERE y IN (:ys)
224                    GROUP BY keyset_id
225                    ON CONFLICT (keyset_id)
226                    DO UPDATE SET total_redeemed = keyset_amounts.total_redeemed + EXCLUDED.total_redeemed
227                    "#,
228                )?
229                .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
230                .execute(&self.inner)
231                .await?;
232        }
233
234        proofs.state = new_state;
235
236        Ok(())
237    }
238
239    async fn remove_proofs(
240        &mut self,
241        ys: &[PublicKey],
242        _quote_id: Option<QuoteId>,
243    ) -> Result<(), Self::Err> {
244        if ys.is_empty() {
245            return Ok(());
246        }
247        let total_deleted = query(
248            r#"
249            DELETE FROM proof WHERE y IN (:ys) AND state NOT IN (:exclude_state)
250            "#,
251        )?
252        .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
253        .bind_vec("exclude_state", vec![State::Spent.to_string()])
254        .execute(&self.inner)
255        .await?;
256
257        if total_deleted != ys.len() {
258            // Query current states to provide detailed logging
259            let current_states = get_current_states(&self.inner, ys, true).await?;
260
261            let missing_count = ys.len() - current_states.len();
262            let spent_count = current_states
263                .values()
264                .filter(|s| **s == State::Spent)
265                .count();
266
267            if missing_count > 0 {
268                tracing::warn!(
269                    "remove_proofs: {} of {} proofs do not exist in database (already removed?)",
270                    missing_count,
271                    ys.len()
272                );
273            }
274
275            if spent_count > 0 {
276                tracing::warn!(
277                    "remove_proofs: {} of {} proofs are in Spent state and cannot be removed",
278                    spent_count,
279                    ys.len()
280                );
281            }
282
283            tracing::debug!(
284                "remove_proofs details: requested={}, deleted={}, missing={}, spent={}",
285                ys.len(),
286                total_deleted,
287                missing_count,
288                spent_count
289            );
290
291            return Err(Self::Err::AttemptRemoveSpentProof);
292        }
293
294        Ok(())
295    }
296
297    async fn get_proof_ys_by_quote_id(
298        &mut self,
299        quote_id: &QuoteId,
300    ) -> Result<Vec<PublicKey>, Self::Err> {
301        Ok(query(
302            r#"
303            SELECT
304                amount,
305                keyset_id,
306                secret,
307                c,
308                witness
309            FROM
310                proof
311            WHERE
312                quote_id = :quote_id
313            FOR UPDATE
314            "#,
315        )?
316        .bind("quote_id", quote_id.to_string())
317        .fetch_all(&self.inner)
318        .await?
319        .into_iter()
320        .map(sql_row_to_proof)
321        .collect::<Result<Vec<Proof>, _>>()?
322        .ys()?)
323    }
324
325    async fn get_proof_ys_by_operation_id(
326        &mut self,
327        operation_id: &uuid::Uuid,
328    ) -> Result<Vec<PublicKey>, Self::Err> {
329        Ok(query(
330            r#"
331            SELECT
332                y
333            FROM
334                proof
335            WHERE
336                operation_id = :operation_id
337            "#,
338        )?
339        .bind("operation_id", operation_id.to_string())
340        .fetch_all(&self.inner)
341        .await?
342        .into_iter()
343        .map(|row| -> Result<PublicKey, Error> {
344            Ok(column_as_string!(
345                &row[0],
346                PublicKey::from_hex,
347                PublicKey::from_slice
348            ))
349        })
350        .collect::<Result<Vec<_>, _>>()?)
351    }
352
353    async fn get_proofs(
354        &mut self,
355        ys: &[PublicKey],
356    ) -> Result<Acquired<ProofsWithState>, Self::Err> {
357        if ys.is_empty() {
358            return Err(database::Error::ProofNotFound);
359        }
360
361        let rows = query(
362            r#"
363             SELECT
364                 keyset_id,
365                 amount,
366                 secret,
367                 c,
368                 witness,
369                 state
370             FROM
371                 proof
372             WHERE
373                 y IN (:ys)
374             FOR UPDATE
375             "#,
376        )?
377        .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
378        .fetch_all(&self.inner)
379        .await?;
380
381        if rows.is_empty() || rows.len() != ys.len() {
382            return Err(database::Error::ProofNotFound);
383        }
384
385        let results: Vec<(Proof, State)> = rows
386            .into_iter()
387            .map(sql_row_to_proof_with_state)
388            .collect::<Result<Vec<_>, _>>()?;
389
390        let mut proofs = Vec::with_capacity(results.len());
391        let mut first_state: Option<State> = None;
392
393        for (proof, state) in results {
394            if let Some(first) = first_state {
395                if first != state {
396                    return Err(database::Error::Internal(
397                        "Proofs have inconsistent states".to_string(),
398                    ));
399                }
400            } else {
401                first_state = Some(state);
402            }
403
404            proofs.push(proof);
405        }
406
407        let state = first_state.unwrap_or(State::Unspent);
408        Ok(ProofsWithState::new(proofs, state).into())
409    }
410}
411
412#[async_trait]
413impl<RM> MintProofsDatabase for SQLMintDatabase<RM>
414where
415    RM: DatabasePool + 'static,
416{
417    type Err = Error;
418
419    async fn get_proofs_by_ys(&self, ys: &[PublicKey]) -> Result<Vec<Option<Proof>>, Self::Err> {
420        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
421        let mut proofs = query(
422            r#"
423            SELECT
424                amount,
425                keyset_id,
426                secret,
427                c,
428                witness,
429                y
430            FROM
431                proof
432            WHERE
433                y IN (:ys)
434            "#,
435        )?
436        .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
437        .fetch_all(&*conn)
438        .await?
439        .into_iter()
440        .map(|mut row| {
441            Ok((
442                column_as_string!(
443                    row.pop().ok_or(Error::InvalidDbResponse)?,
444                    PublicKey::from_hex,
445                    PublicKey::from_slice
446                ),
447                sql_row_to_proof(row)?,
448            ))
449        })
450        .collect::<Result<HashMap<_, _>, Error>>()?;
451
452        Ok(ys.iter().map(|y| proofs.remove(y)).collect())
453    }
454
455    async fn get_proof_ys_by_quote_id(
456        &self,
457        quote_id: &QuoteId,
458    ) -> Result<Vec<PublicKey>, Self::Err> {
459        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
460        Ok(query(
461            r#"
462            SELECT
463                amount,
464                keyset_id,
465                secret,
466                c,
467                witness
468            FROM
469                proof
470            WHERE
471                quote_id = :quote_id
472            "#,
473        )?
474        .bind("quote_id", quote_id.to_string())
475        .fetch_all(&*conn)
476        .await?
477        .into_iter()
478        .map(sql_row_to_proof)
479        .collect::<Result<Vec<Proof>, _>>()?
480        .ys()?)
481    }
482
483    async fn get_proofs_states(&self, ys: &[PublicKey]) -> Result<Vec<Option<State>>, Self::Err> {
484        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
485        let mut current_states = get_current_states(&*conn, ys, false).await?;
486
487        Ok(ys.iter().map(|y| current_states.remove(y)).collect())
488    }
489
490    async fn get_proofs_by_keyset_id(
491        &self,
492        keyset_id: &Id,
493    ) -> Result<(Proofs, Vec<Option<State>>), Self::Err> {
494        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
495
496        let (proofs, states): (Vec<Proof>, Vec<State>) = query(
497            r#"
498            SELECT
499               keyset_id,
500               amount,
501               secret,
502               c,
503               witness,
504               state
505            FROM
506                proof
507            WHERE
508                keyset_id=:keyset_id
509            "#,
510        )?
511        .bind("keyset_id", keyset_id.to_string())
512        .fetch_all(&*conn)
513        .await?
514        .into_iter()
515        .map(sql_row_to_proof_with_state)
516        .collect::<Result<Vec<_>, _>>()?
517        .into_iter()
518        .unzip();
519
520        Ok((proofs, states.into_iter().map(Some).collect()))
521    }
522
523    /// Get total proofs redeemed by keyset id
524    async fn get_total_redeemed(&self) -> Result<HashMap<Id, Amount>, Self::Err> {
525        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
526        query(
527            r#"
528            SELECT
529                keyset_id,
530                total_redeemed as amount
531            FROM
532                keyset_amounts
533        "#,
534        )?
535        .fetch_all(&*conn)
536        .await?
537        .into_iter()
538        .map(sql_row_to_hashmap_amount)
539        .collect()
540    }
541
542    async fn get_proof_ys_by_operation_id(
543        &self,
544        operation_id: &uuid::Uuid,
545    ) -> Result<Vec<PublicKey>, Self::Err> {
546        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
547        query(
548            r#"
549            SELECT
550                y
551            FROM
552                proof
553            WHERE
554                operation_id = :operation_id
555            "#,
556        )?
557        .bind("operation_id", operation_id.to_string())
558        .fetch_all(&*conn)
559        .await?
560        .into_iter()
561        .map(|row| -> Result<PublicKey, Error> {
562            Ok(column_as_string!(
563                &row[0],
564                PublicKey::from_hex,
565                PublicKey::from_slice
566            ))
567        })
568        .collect::<Result<Vec<_>, _>>()
569    }
570}