Skip to main content

cdk_sql_common/mint/
proofs.rs

1//! Proofs database implementation
2
3use std::collections::HashMap;
4use std::str::FromStr;
5
6use async_trait::async_trait;
7use cdk_common::database::mint::Acquired;
8use cdk_common::database::{self, Error, MintProofsDatabase};
9use cdk_common::mint::{Operation, ProofsWithState};
10use cdk_common::nut00::ProofsMethods;
11use cdk_common::quote_id::QuoteId;
12use cdk_common::secret::Secret;
13use cdk_common::util::unix_time;
14use cdk_common::{Amount, Id, Proof, Proofs, PublicKey, State};
15
16use super::{SQLMintDatabase, SQLTransaction};
17use crate::database::DatabaseExecutor;
18use crate::pool::DatabasePool;
19use crate::stmt::{query, Column};
20use crate::{column_as_nullable_string, column_as_number, column_as_string, unpack_into};
21
22pub(super) async fn get_current_states<C>(
23    conn: &C,
24    ys: &[PublicKey],
25    for_update: bool,
26) -> Result<HashMap<PublicKey, State>, Error>
27where
28    C: DatabaseExecutor + Send + Sync,
29{
30    let for_update_clause = if for_update { "FOR UPDATE" } else { "" };
31
32    query(&format!(
33        r#"SELECT y, state FROM proof WHERE y IN (:ys) {}"#,
34        for_update_clause
35    ))?
36    .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())?
37    .fetch_all(conn)
38    .await?
39    .into_iter()
40    .map(|row| {
41        Ok((
42            column_as_string!(&row[0], PublicKey::from_hex, PublicKey::from_slice),
43            column_as_string!(&row[1], State::from_str),
44        ))
45    })
46    .collect::<Result<HashMap<_, _>, _>>()
47}
48
49pub(super) fn sql_row_to_proof(row: Vec<Column>) -> Result<Proof, Error> {
50    unpack_into!(
51        let (
52            amount,
53            keyset_id,
54            secret,
55            c,
56            witness
57        ) = row
58    );
59
60    let amount: u64 = column_as_number!(amount);
61    Ok(Proof {
62        amount: Amount::from(amount),
63        keyset_id: column_as_string!(keyset_id, Id::from_str),
64        secret: column_as_string!(secret, Secret::from_str),
65        c: column_as_string!(c, PublicKey::from_hex, PublicKey::from_slice),
66        witness: column_as_nullable_string!(witness).and_then(|w| serde_json::from_str(&w).ok()),
67        dleq: None,
68        p2pk_e: None,
69    })
70}
71
72pub(super) fn sql_row_to_proof_with_state(row: Vec<Column>) -> Result<(Proof, State), Error> {
73    unpack_into!(
74        let (
75            keyset_id, amount, secret, c, witness, state
76        ) = row
77    );
78
79    let amount: u64 = column_as_number!(amount);
80    let state = column_as_nullable_string!(state)
81        .and_then(|s| State::from_str(&s).ok())
82        .unwrap_or(State::Pending);
83
84    Ok((
85        Proof {
86            amount: Amount::from(amount),
87            keyset_id: column_as_string!(keyset_id, Id::from_str, Id::from_bytes),
88            secret: column_as_string!(secret, Secret::from_str),
89            c: column_as_string!(c, PublicKey::from_hex, PublicKey::from_slice),
90            witness: column_as_nullable_string!(witness)
91                .and_then(|w| serde_json::from_str(&w).ok()),
92            dleq: None,
93            p2pk_e: None,
94        },
95        state,
96    ))
97}
98
99pub(super) fn sql_row_to_hashmap_amount(row: Vec<Column>) -> Result<(Id, Amount), Error> {
100    unpack_into!(
101        let (
102            keyset_id, amount
103        ) = row
104    );
105
106    let amount: u64 = column_as_number!(amount);
107    Ok((
108        column_as_string!(keyset_id, Id::from_str, Id::from_bytes),
109        Amount::from(amount),
110    ))
111}
112
113#[async_trait]
114impl<RM> database::MintProofsTransaction for SQLTransaction<RM>
115where
116    RM: DatabasePool + 'static,
117{
118    type Err = Error;
119
120    /// Adds proofs to the database with initial state `Unspent`.
121    ///
122    /// This method first checks if any of the proofs already exist in the database.
123    /// If a proof exists and is spent, returns [`Error::AttemptUpdateSpentProof`].
124    /// If a proof exists in any other state, returns [`Error::Duplicate`].
125    ///
126    /// On success, returns the proofs wrapped in [`Acquired<ProofsWithState>`] with
127    /// state set to `Unspent`, indicating the rows are locked for the duration of
128    /// the transaction.
129    async fn add_proofs(
130        &mut self,
131        proofs: Proofs,
132        quote_id: Option<QuoteId>,
133        operation: &Operation,
134    ) -> Result<Acquired<ProofsWithState>, Self::Err> {
135        let current_time = unix_time();
136
137        // Check any previous proof, this query should return None in order to proceed storing
138        // Any result here would error
139        match query(r#"SELECT state FROM proof WHERE y IN (:ys) LIMIT 1 FOR UPDATE"#)?
140            .bind_vec(
141                "ys",
142                proofs
143                    .iter()
144                    .map(|y| y.y().map(|y| y.to_bytes().to_vec()))
145                    .collect::<Result<_, _>>()?,
146            )?
147            .pluck(&self.inner)
148            .await?
149            .map(|state| Ok::<_, Error>(column_as_string!(&state, State::from_str)))
150            .transpose()?
151        {
152            Some(State::Spent) => Err(database::Error::AttemptUpdateSpentProof),
153            Some(_) => Err(database::Error::Duplicate),
154            None => Ok(()), // no previous record
155        }?;
156
157        for proof in &proofs {
158            let y = proof.y()?;
159
160            query(
161                r#"
162                  INSERT INTO proof
163                  (y, amount, keyset_id, secret, c, witness, state, quote_id, created_time, operation_kind, operation_id)
164                  VALUES
165                  (:y, :amount, :keyset_id, :secret, :c, :witness, :state, :quote_id, :created_time, :operation_kind, :operation_id)
166                  "#,
167            )?
168            .bind("y", y.to_bytes().to_vec())
169            .bind("amount", proof.amount.to_i64())
170            .bind("keyset_id", proof.keyset_id.to_string())
171            .bind("secret", proof.secret.to_string())
172            .bind("c", proof.c.to_bytes().to_vec())
173            .bind(
174                "witness",
175                proof.witness.clone().and_then(|w| serde_json::to_string(&w).inspect_err(|e| tracing::error!("Failed to serialize witness: {:?}", e)).ok()),
176            )
177            .bind("state", "UNSPENT".to_string())
178            .bind("quote_id", quote_id.clone().map(|q| q.to_string()))
179            .bind("created_time", current_time as i64)
180            .bind("operation_kind", operation.kind().to_string())
181            .bind("operation_id", operation.id().to_string())
182            .execute(&self.inner)
183            .await?;
184        }
185
186        Ok(ProofsWithState::new(proofs, State::Unspent).into())
187    }
188
189    /// Updates all proofs to the given state in the database.
190    ///
191    /// Also updates the `state` field on the [`ProofsWithState`] wrapper to reflect
192    /// the new state after the database update succeeds.
193    ///
194    /// When the new state is `Spent`, this method also updates the `keyset_amounts`
195    /// table to track the total redeemed amount per keyset for analytics purposes.
196    ///
197    /// # Prerequisites
198    ///
199    /// The proofs must have been previously acquired via `add_proofs`
200    /// or `get_proofs` to ensure they are locked within the current transaction.
201    async fn update_proofs_state(
202        &mut self,
203        proofs: &mut Acquired<ProofsWithState>,
204        new_state: State,
205    ) -> Result<(), Self::Err> {
206        let ys = proofs.ys()?;
207
208        query(r#"UPDATE proof SET state = :new_state WHERE y IN (:ys)"#)?
209            .bind("new_state", new_state.to_string())
210            .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())?
211            .execute(&self.inner)
212            .await?;
213
214        if new_state == State::Spent {
215            query(
216                    r#"
217                    INSERT INTO keyset_amounts (keyset_id, total_issued, total_redeemed)
218                    SELECT keyset_id, 0, COALESCE(SUM(amount), 0)
219                    FROM proof
220                    WHERE y IN (:ys)
221                    GROUP BY keyset_id
222                    ON CONFLICT (keyset_id)
223                    DO UPDATE SET total_redeemed = keyset_amounts.total_redeemed + EXCLUDED.total_redeemed
224                    "#,
225                )?
226                .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())?
227                .execute(&self.inner)
228                .await?;
229        }
230
231        proofs.state = new_state;
232
233        Ok(())
234    }
235
236    async fn remove_proofs(
237        &mut self,
238        ys: &[PublicKey],
239        _quote_id: Option<QuoteId>,
240    ) -> Result<(), Self::Err> {
241        let total_deleted = query(
242            r#"
243            DELETE FROM proof WHERE y IN (:ys) AND state NOT IN (:exclude_state)
244            "#,
245        )?
246        .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())?
247        .bind_vec("exclude_state", vec![State::Spent.to_string()])?
248        .execute(&self.inner)
249        .await?;
250
251        if total_deleted != ys.len() {
252            // Query current states to provide detailed logging
253            let current_states = get_current_states(&self.inner, ys, true).await?;
254
255            let missing_count = ys.len() - current_states.len();
256            let spent_count = current_states
257                .values()
258                .filter(|s| **s == State::Spent)
259                .count();
260
261            if missing_count > 0 {
262                tracing::warn!(
263                    "remove_proofs: {} of {} proofs do not exist in database (already removed?)",
264                    missing_count,
265                    ys.len()
266                );
267            }
268
269            if spent_count > 0 {
270                tracing::warn!(
271                    "remove_proofs: {} of {} proofs are in Spent state and cannot be removed",
272                    spent_count,
273                    ys.len()
274                );
275            }
276
277            tracing::debug!(
278                "remove_proofs details: requested={}, deleted={}, missing={}, spent={}",
279                ys.len(),
280                total_deleted,
281                missing_count,
282                spent_count
283            );
284
285            return Err(Self::Err::AttemptRemoveSpentProof);
286        }
287
288        Ok(())
289    }
290
291    async fn get_proof_ys_by_quote_id(
292        &mut self,
293        quote_id: &QuoteId,
294    ) -> Result<Vec<PublicKey>, Self::Err> {
295        Ok(query(
296            r#"
297            SELECT
298                amount,
299                keyset_id,
300                secret,
301                c,
302                witness
303            FROM
304                proof
305            WHERE
306                quote_id = :quote_id
307            FOR UPDATE
308            "#,
309        )?
310        .bind("quote_id", quote_id.to_string())
311        .fetch_all(&self.inner)
312        .await?
313        .into_iter()
314        .map(sql_row_to_proof)
315        .collect::<Result<Vec<Proof>, _>>()?
316        .ys()?)
317    }
318
319    async fn get_proof_ys_by_operation_id(
320        &mut self,
321        operation_id: &uuid::Uuid,
322    ) -> Result<Vec<PublicKey>, Self::Err> {
323        Ok(query(
324            r#"
325            SELECT
326                y
327            FROM
328                proof
329            WHERE
330                operation_id = :operation_id
331            "#,
332        )?
333        .bind("operation_id", operation_id.to_string())
334        .fetch_all(&self.inner)
335        .await?
336        .into_iter()
337        .map(|row| -> Result<PublicKey, Error> {
338            Ok(column_as_string!(
339                &row[0],
340                PublicKey::from_hex,
341                PublicKey::from_slice
342            ))
343        })
344        .collect::<Result<Vec<_>, _>>()?)
345    }
346
347    async fn get_proofs(
348        &mut self,
349        ys: &[PublicKey],
350    ) -> Result<Acquired<ProofsWithState>, Self::Err> {
351        let rows = query(
352            r#"
353             SELECT
354                 keyset_id,
355                 amount,
356                 secret,
357                 c,
358                 witness,
359                 state
360             FROM
361                 proof
362             WHERE
363                 y IN (:ys)
364             FOR UPDATE
365             "#,
366        )?
367        .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())?
368        .fetch_all(&self.inner)
369        .await?;
370
371        if rows.is_empty() || rows.len() != ys.len() {
372            return Err(database::Error::ProofNotFound);
373        }
374
375        let results: Vec<(Proof, State)> = rows
376            .into_iter()
377            .map(sql_row_to_proof_with_state)
378            .collect::<Result<Vec<_>, _>>()?;
379
380        let mut proofs = Vec::with_capacity(results.len());
381        let mut first_state: Option<State> = None;
382
383        for (proof, state) in results {
384            if let Some(first) = first_state {
385                if first != state {
386                    return Err(database::Error::Internal(
387                        "Proofs have inconsistent states".to_string(),
388                    ));
389                }
390            } else {
391                first_state = Some(state);
392            }
393
394            proofs.push(proof);
395        }
396
397        let state = first_state.unwrap_or(State::Unspent);
398        Ok(ProofsWithState::new(proofs, state).into())
399    }
400}
401
402#[async_trait]
403impl<RM> MintProofsDatabase for SQLMintDatabase<RM>
404where
405    RM: DatabasePool + 'static,
406{
407    type Err = Error;
408
409    async fn get_proofs_by_ys(&self, ys: &[PublicKey]) -> Result<Vec<Option<Proof>>, Self::Err> {
410        let conn = self
411            .pool
412            .get()
413            .await
414            .map_err(|e| Error::Database(Box::new(e)))?;
415        let mut proofs = query(
416            r#"
417            SELECT
418                amount,
419                keyset_id,
420                secret,
421                c,
422                witness,
423                y
424            FROM
425                proof
426            WHERE
427                y IN (:ys)
428            "#,
429        )?
430        .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())?
431        .fetch_all(&*conn)
432        .await?
433        .into_iter()
434        .map(|mut row| {
435            Ok((
436                column_as_string!(
437                    row.pop().ok_or(Error::InvalidDbResponse)?,
438                    PublicKey::from_hex,
439                    PublicKey::from_slice
440                ),
441                sql_row_to_proof(row)?,
442            ))
443        })
444        .collect::<Result<HashMap<_, _>, Error>>()?;
445
446        Ok(ys.iter().map(|y| proofs.remove(y)).collect())
447    }
448
449    async fn get_proof_ys_by_quote_id(
450        &self,
451        quote_id: &QuoteId,
452    ) -> Result<Vec<PublicKey>, Self::Err> {
453        let conn = self
454            .pool
455            .get()
456            .await
457            .map_err(|e| Error::Database(Box::new(e)))?;
458        Ok(query(
459            r#"
460            SELECT
461                amount,
462                keyset_id,
463                secret,
464                c,
465                witness
466            FROM
467                proof
468            WHERE
469                quote_id = :quote_id
470            "#,
471        )?
472        .bind("quote_id", quote_id.to_string())
473        .fetch_all(&*conn)
474        .await?
475        .into_iter()
476        .map(sql_row_to_proof)
477        .collect::<Result<Vec<Proof>, _>>()?
478        .ys()?)
479    }
480
481    async fn get_proofs_states(&self, ys: &[PublicKey]) -> Result<Vec<Option<State>>, Self::Err> {
482        let conn = self
483            .pool
484            .get()
485            .await
486            .map_err(|e| Error::Database(Box::new(e)))?;
487        let mut current_states = get_current_states(&*conn, ys, false).await?;
488
489        Ok(ys.iter().map(|y| current_states.remove(y)).collect())
490    }
491
492    async fn get_proofs_by_keyset_id(
493        &self,
494        keyset_id: &Id,
495    ) -> Result<(Proofs, Vec<Option<State>>), Self::Err> {
496        let conn = self
497            .pool
498            .get()
499            .await
500            .map_err(|e| Error::Database(Box::new(e)))?;
501
502        let (proofs, states): (Vec<Proof>, Vec<State>) = query(
503            r#"
504            SELECT
505               keyset_id,
506               amount,
507               secret,
508               c,
509               witness,
510               state
511            FROM
512                proof
513            WHERE
514                keyset_id=:keyset_id
515            "#,
516        )?
517        .bind("keyset_id", keyset_id.to_string())
518        .fetch_all(&*conn)
519        .await?
520        .into_iter()
521        .map(sql_row_to_proof_with_state)
522        .collect::<Result<Vec<_>, _>>()?
523        .into_iter()
524        .unzip();
525
526        Ok((proofs, states.into_iter().map(Some).collect()))
527    }
528
529    /// Get total proofs redeemed by keyset id
530    async fn get_total_redeemed(&self) -> Result<HashMap<Id, Amount>, Self::Err> {
531        let conn = self
532            .pool
533            .get()
534            .await
535            .map_err(|e| Error::Database(Box::new(e)))?;
536        query(
537            r#"
538            SELECT
539                keyset_id,
540                total_redeemed as amount
541            FROM
542                keyset_amounts
543        "#,
544        )?
545        .fetch_all(&*conn)
546        .await?
547        .into_iter()
548        .map(sql_row_to_hashmap_amount)
549        .collect()
550    }
551
552    async fn get_proof_ys_by_operation_id(
553        &self,
554        operation_id: &uuid::Uuid,
555    ) -> Result<Vec<PublicKey>, Self::Err> {
556        let conn = self
557            .pool
558            .get()
559            .await
560            .map_err(|e| Error::Database(Box::new(e)))?;
561        query(
562            r#"
563            SELECT
564                y
565            FROM
566                proof
567            WHERE
568                operation_id = :operation_id
569            "#,
570        )?
571        .bind("operation_id", operation_id.to_string())
572        .fetch_all(&*conn)
573        .await?
574        .into_iter()
575        .map(|row| -> Result<PublicKey, Error> {
576            Ok(column_as_string!(
577                &row[0],
578                PublicKey::from_hex,
579                PublicKey::from_slice
580            ))
581        })
582        .collect::<Result<Vec<_>, _>>()
583    }
584}