cdk_sql_common/mint/
mod.rs

1//! SQL database implementation of the Mint
2//!
3//! This is a generic SQL implementation for the mint storage layer. Any database can be plugged in
4//! as long as standard ANSI SQL is used, as Postgres and SQLite would understand it.
5//!
6//! This implementation also has a rudimentary but standard migration and versioning system.
7//!
8//! The trait expects an asynchronous interaction, but it also provides tools to spawn blocking
9//! clients in a pool and expose them to an asynchronous environment, making them compatible with
10//! Mint.
11use std::collections::HashMap;
12use std::fmt::Debug;
13use std::str::FromStr;
14use std::sync::Arc;
15
16use async_trait::async_trait;
17use bitcoin::bip32::DerivationPath;
18use cdk_common::database::mint::{validate_kvstore_params, SagaDatabase, SagaTransaction};
19use cdk_common::database::{
20    self, ConversionError, Error, MintDatabase, MintDbWriterFinalizer, MintKeyDatabaseTransaction,
21    MintKeysDatabase, MintProofsDatabase, MintQuotesDatabase, MintQuotesTransaction,
22    MintSignatureTransaction, MintSignaturesDatabase,
23};
24use cdk_common::mint::{
25    self, IncomingPayment, Issuance, MeltPaymentRequest, MeltQuote, MintKeySetInfo, MintQuote,
26    Operation,
27};
28use cdk_common::nut00::ProofsMethods;
29use cdk_common::payment::PaymentIdentifier;
30use cdk_common::quote_id::QuoteId;
31use cdk_common::secret::Secret;
32use cdk_common::state::{check_melt_quote_state_transition, check_state_transition};
33use cdk_common::util::unix_time;
34use cdk_common::{
35    Amount, BlindSignature, BlindSignatureDleq, BlindedMessage, CurrencyUnit, Id, MeltQuoteState,
36    PaymentMethod, Proof, Proofs, PublicKey, SecretKey, State,
37};
38use lightning_invoice::Bolt11Invoice;
39use migrations::MIGRATIONS;
40use tracing::instrument;
41
42use crate::common::migrate;
43use crate::database::{ConnectionWithTransaction, DatabaseExecutor};
44use crate::pool::{DatabasePool, Pool, PooledResource};
45use crate::stmt::{query, Column};
46use crate::{
47    column_as_nullable_number, column_as_nullable_string, column_as_number, column_as_string,
48    unpack_into,
49};
50
51#[cfg(feature = "auth")]
52mod auth;
53
// The migration list is not hand-written: this module pulls in `migrations_mint.rs` from the
// directory named by the `OUT_DIR` environment variable at compile time (presumably emitted by
// the crate's build script -- TODO confirm against `build.rs`).
// `rustfmt::skip` keeps the formatter away from the included code.
#[rustfmt::skip]
mod migrations {
    include!(concat!(env!("OUT_DIR"), "/migrations_mint.rs"));
}
58
59#[cfg(feature = "auth")]
60pub use auth::SQLMintAuthDatabase;
61#[cfg(feature = "prometheus")]
62use cdk_prometheus::METRICS;
63
64/// Mint SQL Database
65#[derive(Debug, Clone)]
66pub struct SQLMintDatabase<RM>
67where
68    RM: DatabasePool + 'static,
69{
70    pool: Arc<Pool<RM>>,
71}
72
73/// SQL Transaction Writer
74pub struct SQLTransaction<RM>
75where
76    RM: DatabasePool + 'static,
77{
78    inner: ConnectionWithTransaction<RM::Connection, PooledResource<RM>>,
79}
80
81#[inline(always)]
82async fn get_current_states<C>(
83    conn: &C,
84    ys: &[PublicKey],
85) -> Result<HashMap<PublicKey, State>, Error>
86where
87    C: DatabaseExecutor + Send + Sync,
88{
89    if ys.is_empty() {
90        return Ok(Default::default());
91    }
92    query(r#"SELECT y, state FROM proof WHERE y IN (:ys)"#)?
93        .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
94        .fetch_all(conn)
95        .await?
96        .into_iter()
97        .map(|row| {
98            Ok((
99                column_as_string!(&row[0], PublicKey::from_hex, PublicKey::from_slice),
100                column_as_string!(&row[1], State::from_str),
101            ))
102        })
103        .collect::<Result<HashMap<_, _>, _>>()
104}
105
106impl<RM> SQLMintDatabase<RM>
107where
108    RM: DatabasePool + 'static,
109{
110    /// Creates a new instance
111    pub async fn new<X>(db: X) -> Result<Self, Error>
112    where
113        X: Into<RM::Config>,
114    {
115        let pool = Pool::new(db.into());
116
117        Self::migrate(pool.get().map_err(|e| Error::Database(Box::new(e)))?).await?;
118
119        Ok(Self { pool })
120    }
121
122    /// Migrate
123    async fn migrate(conn: PooledResource<RM>) -> Result<(), Error> {
124        let tx = ConnectionWithTransaction::new(conn).await?;
125        migrate(&tx, RM::Connection::name(), MIGRATIONS).await?;
126        tx.commit().await?;
127        Ok(())
128    }
129}
130
131#[async_trait]
132impl<RM> database::MintProofsTransaction<'_> for SQLTransaction<RM>
133where
134    RM: DatabasePool + 'static,
135{
136    type Err = Error;
137
138    async fn add_proofs(
139        &mut self,
140        proofs: Proofs,
141        quote_id: Option<QuoteId>,
142        operation: &Operation,
143    ) -> Result<(), Self::Err> {
144        let current_time = unix_time();
145
146        // Check any previous proof, this query should return None in order to proceed storing
147        // Any result here would error
148        match query(r#"SELECT state FROM proof WHERE y IN (:ys) LIMIT 1 FOR UPDATE"#)?
149            .bind_vec(
150                "ys",
151                proofs
152                    .iter()
153                    .map(|y| y.y().map(|y| y.to_bytes().to_vec()))
154                    .collect::<Result<_, _>>()?,
155            )
156            .pluck(&self.inner)
157            .await?
158            .map(|state| Ok::<_, Error>(column_as_string!(&state, State::from_str)))
159            .transpose()?
160        {
161            Some(State::Spent) => Err(database::Error::AttemptUpdateSpentProof),
162            Some(_) => Err(database::Error::Duplicate),
163            None => Ok(()), // no previous record
164        }?;
165
166        for proof in proofs {
167            query(
168                r#"
169                  INSERT INTO proof
170                  (y, amount, keyset_id, secret, c, witness, state, quote_id, created_time, operation_kind, operation_id)
171                  VALUES
172                  (:y, :amount, :keyset_id, :secret, :c, :witness, :state, :quote_id, :created_time, :operation_kind, :operation_id)
173                  "#,
174            )?
175            .bind("y", proof.y()?.to_bytes().to_vec())
176            .bind("amount", proof.amount.to_i64())
177            .bind("keyset_id", proof.keyset_id.to_string())
178            .bind("secret", proof.secret.to_string())
179            .bind("c", proof.c.to_bytes().to_vec())
180            .bind(
181                "witness",
182                proof.witness.map(|w| serde_json::to_string(&w).unwrap()),
183            )
184            .bind("state", "UNSPENT".to_string())
185            .bind("quote_id", quote_id.clone().map(|q| q.to_string()))
186            .bind("created_time", current_time as i64)
187            .bind("operation_kind", operation.kind())
188            .bind("operation_id", operation.id().to_string())
189            .execute(&self.inner)
190            .await?;
191        }
192
193        Ok(())
194    }
195
196    async fn update_proofs_states(
197        &mut self,
198        ys: &[PublicKey],
199        new_state: State,
200    ) -> Result<Vec<Option<State>>, Self::Err> {
201        let mut current_states = get_current_states(&self.inner, ys).await?;
202
203        if current_states.len() != ys.len() {
204            tracing::warn!(
205                "Attempted to update state of non-existent proof {} {}",
206                current_states.len(),
207                ys.len()
208            );
209            return Err(database::Error::ProofNotFound);
210        }
211
212        for state in current_states.values() {
213            check_state_transition(*state, new_state)?;
214        }
215
216        query(r#"UPDATE proof SET state = :new_state WHERE y IN (:ys)"#)?
217            .bind("new_state", new_state.to_string())
218            .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
219            .execute(&self.inner)
220            .await?;
221
222        if new_state == State::Spent {
223            query(
224                r#"
225                INSERT INTO keyset_amounts (keyset_id, total_issued, total_redeemed)
226                SELECT keyset_id, 0, COALESCE(SUM(amount), 0)
227                FROM proof
228                WHERE y IN (:ys)
229                GROUP BY keyset_id
230                ON CONFLICT (keyset_id)
231                DO UPDATE SET total_redeemed = keyset_amounts.total_redeemed + EXCLUDED.total_redeemed
232                "#,
233            )?
234            .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
235            .execute(&self.inner)
236            .await?;
237        }
238
239        Ok(ys.iter().map(|y| current_states.remove(y)).collect())
240    }
241
242    async fn remove_proofs(
243        &mut self,
244        ys: &[PublicKey],
245        _quote_id: Option<QuoteId>,
246    ) -> Result<(), Self::Err> {
247        if ys.is_empty() {
248            return Ok(());
249        }
250        let total_deleted = query(
251            r#"
252            DELETE FROM proof WHERE y IN (:ys) AND state NOT IN (:exclude_state)
253            "#,
254        )?
255        .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
256        .bind_vec("exclude_state", vec![State::Spent.to_string()])
257        .execute(&self.inner)
258        .await?;
259
260        if total_deleted != ys.len() {
261            return Err(Self::Err::AttemptRemoveSpentProof);
262        }
263
264        Ok(())
265    }
266
267    async fn get_proof_ys_by_quote_id(
268        &self,
269        quote_id: &QuoteId,
270    ) -> Result<Vec<PublicKey>, Self::Err> {
271        Ok(query(
272            r#"
273            SELECT
274                amount,
275                keyset_id,
276                secret,
277                c,
278                witness
279            FROM
280                proof
281            WHERE
282                quote_id = :quote_id
283            "#,
284        )?
285        .bind("quote_id", quote_id.to_string())
286        .fetch_all(&self.inner)
287        .await?
288        .into_iter()
289        .map(sql_row_to_proof)
290        .collect::<Result<Vec<Proof>, _>>()?
291        .ys()?)
292    }
293}
294
295#[async_trait]
296impl<RM> database::MintTransaction<'_, Error> for SQLTransaction<RM> where RM: DatabasePool + 'static
297{}
298
299#[async_trait]
300impl<RM> MintDbWriterFinalizer for SQLTransaction<RM>
301where
302    RM: DatabasePool + 'static,
303{
304    type Err = Error;
305
306    async fn commit(self: Box<Self>) -> Result<(), Error> {
307        let result = self.inner.commit().await;
308        #[cfg(feature = "prometheus")]
309        {
310            let success = result.is_ok();
311            METRICS.record_mint_operation("transaction_commit", success);
312            METRICS.record_mint_operation_histogram("transaction_commit", success, 1.0);
313        }
314
315        Ok(result?)
316    }
317
318    async fn rollback(self: Box<Self>) -> Result<(), Error> {
319        let result = self.inner.rollback().await;
320
321        #[cfg(feature = "prometheus")]
322        {
323            let success = result.is_ok();
324            METRICS.record_mint_operation("transaction_rollback", success);
325            METRICS.record_mint_operation_histogram("transaction_rollback", success, 1.0);
326        }
327        Ok(result?)
328    }
329}
330
331#[inline(always)]
332async fn get_mint_quote_payments<C>(
333    conn: &C,
334    quote_id: &QuoteId,
335) -> Result<Vec<IncomingPayment>, Error>
336where
337    C: DatabaseExecutor + Send + Sync,
338{
339    // Get payment IDs and timestamps from the mint_quote_payments table
340    query(
341        r#"
342        SELECT
343            payment_id,
344            timestamp,
345            amount
346        FROM
347            mint_quote_payments
348        WHERE
349            quote_id=:quote_id
350        "#,
351    )?
352    .bind("quote_id", quote_id.to_string())
353    .fetch_all(conn)
354    .await?
355    .into_iter()
356    .map(|row| {
357        let amount: u64 = column_as_number!(row[2].clone());
358        let time: u64 = column_as_number!(row[1].clone());
359        Ok(IncomingPayment::new(
360            amount.into(),
361            column_as_string!(&row[0]),
362            time,
363        ))
364    })
365    .collect()
366}
367
368#[inline(always)]
369async fn get_mint_quote_issuance<C>(conn: &C, quote_id: &QuoteId) -> Result<Vec<Issuance>, Error>
370where
371    C: DatabaseExecutor + Send + Sync,
372{
373    // Get payment IDs and timestamps from the mint_quote_payments table
374    query(
375        r#"
376SELECT amount, timestamp
377FROM mint_quote_issued
378WHERE quote_id=:quote_id
379            "#,
380    )?
381    .bind("quote_id", quote_id.to_string())
382    .fetch_all(conn)
383    .await?
384    .into_iter()
385    .map(|row| {
386        let time: u64 = column_as_number!(row[1].clone());
387        Ok(Issuance::new(
388            Amount::from_i64(column_as_number!(row[0].clone()))
389                .expect("Is amount when put into db"),
390            time,
391        ))
392    })
393    .collect()
394}
395
396#[async_trait]
397impl<RM> MintKeyDatabaseTransaction<'_, Error> for SQLTransaction<RM>
398where
399    RM: DatabasePool + 'static,
400{
401    async fn add_keyset_info(&mut self, keyset: MintKeySetInfo) -> Result<(), Error> {
402        query(
403            r#"
404        INSERT INTO
405            keyset (
406                id, unit, active, valid_from, valid_to, derivation_path,
407                max_order, amounts, input_fee_ppk, derivation_path_index
408            )
409        VALUES (
410            :id, :unit, :active, :valid_from, :valid_to, :derivation_path,
411            :max_order, :amounts, :input_fee_ppk, :derivation_path_index
412        )
413        ON CONFLICT(id) DO UPDATE SET
414            unit = excluded.unit,
415            active = excluded.active,
416            valid_from = excluded.valid_from,
417            valid_to = excluded.valid_to,
418            derivation_path = excluded.derivation_path,
419            max_order = excluded.max_order,
420            amounts = excluded.amounts,
421            input_fee_ppk = excluded.input_fee_ppk,
422            derivation_path_index = excluded.derivation_path_index
423        "#,
424        )?
425        .bind("id", keyset.id.to_string())
426        .bind("unit", keyset.unit.to_string())
427        .bind("active", keyset.active)
428        .bind("valid_from", keyset.valid_from as i64)
429        .bind("valid_to", keyset.final_expiry.map(|v| v as i64))
430        .bind("derivation_path", keyset.derivation_path.to_string())
431        .bind("max_order", keyset.max_order)
432        .bind("amounts", serde_json::to_string(&keyset.amounts).ok())
433        .bind("input_fee_ppk", keyset.input_fee_ppk as i64)
434        .bind("derivation_path_index", keyset.derivation_path_index)
435        .execute(&self.inner)
436        .await?;
437
438        Ok(())
439    }
440
441    async fn set_active_keyset(&mut self, unit: CurrencyUnit, id: Id) -> Result<(), Error> {
442        query(r#"UPDATE keyset SET active=FALSE WHERE unit = :unit"#)?
443            .bind("unit", unit.to_string())
444            .execute(&self.inner)
445            .await?;
446
447        query(r#"UPDATE keyset SET active=TRUE WHERE unit = :unit AND id = :id"#)?
448            .bind("unit", unit.to_string())
449            .bind("id", id.to_string())
450            .execute(&self.inner)
451            .await?;
452
453        Ok(())
454    }
455}
456
457#[async_trait]
458impl<RM> MintKeysDatabase for SQLMintDatabase<RM>
459where
460    RM: DatabasePool + 'static,
461{
462    type Err = Error;
463
464    async fn begin_transaction<'a>(
465        &'a self,
466    ) -> Result<Box<dyn MintKeyDatabaseTransaction<'a, Error> + Send + Sync + 'a>, Error> {
467        let tx = SQLTransaction {
468            inner: ConnectionWithTransaction::new(
469                self.pool.get().map_err(|e| Error::Database(Box::new(e)))?,
470            )
471            .await?,
472        };
473
474        Ok(Box::new(tx))
475    }
476
477    async fn get_active_keyset_id(&self, unit: &CurrencyUnit) -> Result<Option<Id>, Self::Err> {
478        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
479        Ok(
480            query(r#" SELECT id FROM keyset WHERE active = :active AND unit = :unit"#)?
481                .bind("active", true)
482                .bind("unit", unit.to_string())
483                .pluck(&*conn)
484                .await?
485                .map(|id| match id {
486                    Column::Text(text) => Ok(Id::from_str(&text)?),
487                    Column::Blob(id) => Ok(Id::from_bytes(&id)?),
488                    _ => Err(Error::InvalidKeysetId),
489                })
490                .transpose()?,
491        )
492    }
493
494    async fn get_active_keysets(&self) -> Result<HashMap<CurrencyUnit, Id>, Self::Err> {
495        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
496        Ok(
497            query(r#"SELECT id, unit FROM keyset WHERE active = :active"#)?
498                .bind("active", true)
499                .fetch_all(&*conn)
500                .await?
501                .into_iter()
502                .map(|row| {
503                    Ok((
504                        column_as_string!(&row[1], CurrencyUnit::from_str),
505                        column_as_string!(&row[0], Id::from_str, Id::from_bytes),
506                    ))
507                })
508                .collect::<Result<HashMap<_, _>, Error>>()?,
509        )
510    }
511
512    async fn get_keyset_info(&self, id: &Id) -> Result<Option<MintKeySetInfo>, Self::Err> {
513        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
514        Ok(query(
515            r#"SELECT
516                id,
517                unit,
518                active,
519                valid_from,
520                valid_to,
521                derivation_path,
522                derivation_path_index,
523                max_order,
524                amounts,
525                input_fee_ppk
526            FROM
527                keyset
528                WHERE id=:id"#,
529        )?
530        .bind("id", id.to_string())
531        .fetch_one(&*conn)
532        .await?
533        .map(sql_row_to_keyset_info)
534        .transpose()?)
535    }
536
537    async fn get_keyset_infos(&self) -> Result<Vec<MintKeySetInfo>, Self::Err> {
538        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
539        Ok(query(
540            r#"SELECT
541                id,
542                unit,
543                active,
544                valid_from,
545                valid_to,
546                derivation_path,
547                derivation_path_index,
548                max_order,
549                amounts,
550                input_fee_ppk
551            FROM
552                keyset
553            "#,
554        )?
555        .fetch_all(&*conn)
556        .await?
557        .into_iter()
558        .map(sql_row_to_keyset_info)
559        .collect::<Result<Vec<_>, _>>()?)
560    }
561}
562
563#[async_trait]
564impl<RM> MintQuotesTransaction<'_> for SQLTransaction<RM>
565where
566    RM: DatabasePool + 'static,
567{
568    type Err = Error;
569
570    async fn add_melt_request(
571        &mut self,
572        quote_id: &QuoteId,
573        inputs_amount: Amount,
574        inputs_fee: Amount,
575    ) -> Result<(), Self::Err> {
576        // Insert melt_request
577        query(
578            r#"
579            INSERT INTO melt_request
580            (quote_id, inputs_amount, inputs_fee)
581            VALUES
582            (:quote_id, :inputs_amount, :inputs_fee)
583            "#,
584        )?
585        .bind("quote_id", quote_id.to_string())
586        .bind("inputs_amount", inputs_amount.to_i64())
587        .bind("inputs_fee", inputs_fee.to_i64())
588        .execute(&self.inner)
589        .await?;
590
591        Ok(())
592    }
593
594    async fn add_blinded_messages(
595        &mut self,
596        quote_id: Option<&QuoteId>,
597        blinded_messages: &[BlindedMessage],
598        operation: &Operation,
599    ) -> Result<(), Self::Err> {
600        let current_time = unix_time();
601
602        // Insert blinded_messages directly into blind_signature with c = NULL
603        // Let the database constraint handle duplicate detection
604        for message in blinded_messages {
605            match query(
606                r#"
607                INSERT INTO blind_signature
608                (blinded_message, amount, keyset_id, c, quote_id, created_time, operation_kind, operation_id)
609                VALUES
610                (:blinded_message, :amount, :keyset_id, NULL, :quote_id, :created_time, :operation_kind, :operation_id)
611                "#,
612            )?
613            .bind(
614                "blinded_message",
615                message.blinded_secret.to_bytes().to_vec(),
616            )
617            .bind("amount", message.amount.to_i64())
618            .bind("keyset_id", message.keyset_id.to_string())
619            .bind("quote_id", quote_id.map(|q| q.to_string()))
620            .bind("created_time", current_time as i64)
621            .bind("operation_kind", operation.kind())
622            .bind("operation_id", operation.id().to_string())
623            .execute(&self.inner)
624            .await
625            {
626                Ok(_) => continue,
627                Err(database::Error::Duplicate) => {
628                    // Primary key constraint violation - blinded message already exists
629                    // This could be either:
630                    // 1. Already signed (c IS NOT NULL) - definitely an error
631                    // 2. Already pending (c IS NULL) - also an error
632                    return Err(database::Error::Duplicate);
633                }
634                Err(err) => return Err(err),
635            }
636        }
637
638        Ok(())
639    }
640
641    async fn delete_blinded_messages(
642        &mut self,
643        blinded_secrets: &[PublicKey],
644    ) -> Result<(), Self::Err> {
645        if blinded_secrets.is_empty() {
646            return Ok(());
647        }
648
649        // Delete blinded messages from blind_signature table where c IS NULL
650        // (only delete unsigned blinded messages)
651        query(
652            r#"
653            DELETE FROM blind_signature
654            WHERE blinded_message IN (:blinded_secrets) AND c IS NULL
655            "#,
656        )?
657        .bind_vec(
658            "blinded_secrets",
659            blinded_secrets
660                .iter()
661                .map(|secret| secret.to_bytes().to_vec())
662                .collect(),
663        )
664        .execute(&self.inner)
665        .await?;
666
667        Ok(())
668    }
669
670    async fn get_melt_request_and_blinded_messages(
671        &mut self,
672        quote_id: &QuoteId,
673    ) -> Result<Option<database::mint::MeltRequestInfo>, Self::Err> {
674        let melt_request_row = query(
675            r#"
676            SELECT inputs_amount, inputs_fee
677            FROM melt_request
678            WHERE quote_id = :quote_id
679            FOR UPDATE
680            "#,
681        )?
682        .bind("quote_id", quote_id.to_string())
683        .fetch_one(&self.inner)
684        .await?;
685
686        if let Some(row) = melt_request_row {
687            let inputs_amount: u64 = column_as_number!(row[0].clone());
688            let inputs_fee: u64 = column_as_number!(row[1].clone());
689
690            // Get blinded messages from blind_signature table where c IS NULL
691            let blinded_messages_rows = query(
692                r#"
693                SELECT blinded_message, keyset_id, amount
694                FROM blind_signature
695                WHERE quote_id = :quote_id AND c IS NULL
696                "#,
697            )?
698            .bind("quote_id", quote_id.to_string())
699            .fetch_all(&self.inner)
700            .await?;
701
702            let blinded_messages: Result<Vec<BlindedMessage>, Error> = blinded_messages_rows
703                .into_iter()
704                .map(|row| -> Result<BlindedMessage, Error> {
705                    let blinded_message_key =
706                        column_as_string!(&row[0], PublicKey::from_hex, PublicKey::from_slice);
707                    let keyset_id = column_as_string!(&row[1], Id::from_str, Id::from_bytes);
708                    let amount: u64 = column_as_number!(row[2].clone());
709
710                    Ok(BlindedMessage {
711                        blinded_secret: blinded_message_key,
712                        keyset_id,
713                        amount: Amount::from(amount),
714                        witness: None, // Not storing witness in database currently
715                    })
716                })
717                .collect();
718            let blinded_messages = blinded_messages?;
719
720            Ok(Some(database::mint::MeltRequestInfo {
721                inputs_amount: Amount::from(inputs_amount),
722                inputs_fee: Amount::from(inputs_fee),
723                change_outputs: blinded_messages,
724            }))
725        } else {
726            Ok(None)
727        }
728    }
729
730    async fn delete_melt_request(&mut self, quote_id: &QuoteId) -> Result<(), Self::Err> {
731        // Delete from melt_request table
732        query(
733            r#"
734            DELETE FROM melt_request
735            WHERE quote_id = :quote_id
736            "#,
737        )?
738        .bind("quote_id", quote_id.to_string())
739        .execute(&self.inner)
740        .await?;
741
742        // Also delete blinded messages (where c IS NULL) from blind_signature table
743        query(
744            r#"
745            DELETE FROM blind_signature
746            WHERE quote_id = :quote_id AND c IS NULL
747            "#,
748        )?
749        .bind("quote_id", quote_id.to_string())
750        .execute(&self.inner)
751        .await?;
752
753        Ok(())
754    }
755
756    #[instrument(skip(self))]
757    async fn increment_mint_quote_amount_paid(
758        &mut self,
759        quote_id: &QuoteId,
760        amount_paid: Amount,
761        payment_id: String,
762    ) -> Result<Amount, Self::Err> {
763        if amount_paid == Amount::ZERO {
764            tracing::warn!("Amount payments of zero amount should not be recorded.");
765            return Err(Error::Duplicate);
766        }
767
768        // Check if payment_id already exists in mint_quote_payments
769        let exists = query(
770            r#"
771            SELECT payment_id
772            FROM mint_quote_payments
773            WHERE payment_id = :payment_id
774            FOR UPDATE
775            "#,
776        )?
777        .bind("payment_id", payment_id.clone())
778        .fetch_one(&self.inner)
779        .await?;
780
781        if exists.is_some() {
782            tracing::error!("Payment ID already exists: {}", payment_id);
783            return Err(database::Error::Duplicate);
784        }
785
786        // Get current amount_paid from quote
787        let current_amount = query(
788            r#"
789            SELECT amount_paid
790            FROM mint_quote
791            WHERE id = :quote_id
792            FOR UPDATE
793            "#,
794        )?
795        .bind("quote_id", quote_id.to_string())
796        .fetch_one(&self.inner)
797        .await
798        .inspect_err(|err| {
799            tracing::error!("SQLite could not get mint quote amount_paid: {}", err);
800        })?;
801
802        let current_amount_paid = if let Some(current_amount) = current_amount {
803            let amount: u64 = column_as_number!(current_amount[0].clone());
804            Amount::from(amount)
805        } else {
806            Amount::ZERO
807        };
808
809        // Calculate new amount_paid with overflow check
810        let new_amount_paid = current_amount_paid
811            .checked_add(amount_paid)
812            .ok_or_else(|| database::Error::AmountOverflow)?;
813
814        tracing::debug!(
815            "Mint quote {} amount paid was {} is now {}.",
816            quote_id,
817            current_amount_paid,
818            new_amount_paid
819        );
820
821        // Update the amount_paid
822        query(
823            r#"
824            UPDATE mint_quote
825            SET amount_paid = :amount_paid
826            WHERE id = :quote_id
827            "#,
828        )?
829        .bind("amount_paid", new_amount_paid.to_i64())
830        .bind("quote_id", quote_id.to_string())
831        .execute(&self.inner)
832        .await
833        .inspect_err(|err| {
834            tracing::error!("SQLite could not update mint quote amount_paid: {}", err);
835        })?;
836
837        // Add payment_id to mint_quote_payments table
838        query(
839            r#"
840            INSERT INTO mint_quote_payments
841            (quote_id, payment_id, amount, timestamp)
842            VALUES (:quote_id, :payment_id, :amount, :timestamp)
843            "#,
844        )?
845        .bind("quote_id", quote_id.to_string())
846        .bind("payment_id", payment_id)
847        .bind("amount", amount_paid.to_i64())
848        .bind("timestamp", unix_time() as i64)
849        .execute(&self.inner)
850        .await
851        .map_err(|err| {
852            tracing::error!("SQLite could not insert payment ID: {}", err);
853            err
854        })?;
855
856        Ok(new_amount_paid)
857    }
858
859    #[instrument(skip_all)]
860    async fn increment_mint_quote_amount_issued(
861        &mut self,
862        quote_id: &QuoteId,
863        amount_issued: Amount,
864    ) -> Result<Amount, Self::Err> {
865        // Get current amount_issued from quote
866        let current_amounts = query(
867            r#"
868            SELECT amount_issued, amount_paid
869            FROM mint_quote
870            WHERE id = :quote_id
871            FOR UPDATE
872            "#,
873        )?
874        .bind("quote_id", quote_id.to_string())
875        .fetch_one(&self.inner)
876        .await
877        .inspect_err(|err| {
878            tracing::error!("SQLite could not get mint quote amount_issued: {}", err);
879        })?
880        .ok_or(Error::QuoteNotFound)?;
881
882        let new_amount_issued = {
883            // Make sure the db protects issuing not paid quotes
884            unpack_into!(
885                let (current_amount_issued, current_amount_paid) = current_amounts
886            );
887
888            let current_amount_issued: u64 = column_as_number!(current_amount_issued);
889            let current_amount_paid: u64 = column_as_number!(current_amount_paid);
890
891            let current_amount_issued = Amount::from(current_amount_issued);
892            let current_amount_paid = Amount::from(current_amount_paid);
893
894            // Calculate new amount_issued with overflow check
895            let new_amount_issued = current_amount_issued
896                .checked_add(amount_issued)
897                .ok_or_else(|| database::Error::AmountOverflow)?;
898
899            current_amount_paid
900                .checked_sub(new_amount_issued)
901                .ok_or(Error::Internal("Over-issued not allowed".to_owned()))?;
902
903            new_amount_issued
904        };
905
906        // Update the amount_issued
907        query(
908            r#"
909            UPDATE mint_quote
910            SET amount_issued = :amount_issued
911            WHERE id = :quote_id
912            "#,
913        )?
914        .bind("amount_issued", new_amount_issued.to_i64())
915        .bind("quote_id", quote_id.to_string())
916        .execute(&self.inner)
917        .await
918        .inspect_err(|err| {
919            tracing::error!("SQLite could not update mint quote amount_issued: {}", err);
920        })?;
921
922        let current_time = unix_time();
923
924        query(
925            r#"
926INSERT INTO mint_quote_issued
927(quote_id, amount, timestamp)
928VALUES (:quote_id, :amount, :timestamp);
929            "#,
930        )?
931        .bind("quote_id", quote_id.to_string())
932        .bind("amount", amount_issued.to_i64())
933        .bind("timestamp", current_time as i64)
934        .execute(&self.inner)
935        .await?;
936
937        Ok(new_amount_issued)
938    }
939
940    #[instrument(skip_all)]
941    async fn add_mint_quote(&mut self, quote: MintQuote) -> Result<(), Self::Err> {
942        query(
943            r#"
944                INSERT INTO mint_quote (
945                id, amount, unit, request, expiry, request_lookup_id, pubkey, created_time, payment_method, request_lookup_id_kind
946                )
947                VALUES (
948                :id, :amount, :unit, :request, :expiry, :request_lookup_id, :pubkey, :created_time, :payment_method, :request_lookup_id_kind
949                )
950            "#,
951        )?
952        .bind("id", quote.id.to_string())
953        .bind("amount", quote.amount.map(|a| a.to_i64()))
954        .bind("unit", quote.unit.to_string())
955        .bind("request", quote.request)
956        .bind("expiry", quote.expiry as i64)
957        .bind(
958            "request_lookup_id",
959            quote.request_lookup_id.to_string(),
960        )
961        .bind("pubkey", quote.pubkey.map(|p| p.to_string()))
962        .bind("created_time", quote.created_time as i64)
963        .bind("payment_method", quote.payment_method.to_string())
964        .bind("request_lookup_id_kind", quote.request_lookup_id.kind())
965        .execute(&self.inner)
966        .await?;
967
968        Ok(())
969    }
970
971    async fn add_melt_quote(&mut self, quote: mint::MeltQuote) -> Result<(), Self::Err> {
972        // Now insert the new quote
973        query(
974            r#"
975            INSERT INTO melt_quote
976            (
977                id, unit, amount, request, fee_reserve, state,
978                expiry, payment_preimage, request_lookup_id,
979                created_time, paid_time, options, request_lookup_id_kind, payment_method
980            )
981            VALUES
982            (
983                :id, :unit, :amount, :request, :fee_reserve, :state,
984                :expiry, :payment_preimage, :request_lookup_id,
985                :created_time, :paid_time, :options, :request_lookup_id_kind, :payment_method
986            )
987        "#,
988        )?
989        .bind("id", quote.id.to_string())
990        .bind("unit", quote.unit.to_string())
991        .bind("amount", quote.amount.to_i64())
992        .bind("request", serde_json::to_string(&quote.request)?)
993        .bind("fee_reserve", quote.fee_reserve.to_i64())
994        .bind("state", quote.state.to_string())
995        .bind("expiry", quote.expiry as i64)
996        .bind("payment_preimage", quote.payment_preimage)
997        .bind(
998            "request_lookup_id",
999            quote.request_lookup_id.as_ref().map(|id| id.to_string()),
1000        )
1001        .bind("created_time", quote.created_time as i64)
1002        .bind("paid_time", quote.paid_time.map(|t| t as i64))
1003        .bind(
1004            "options",
1005            quote.options.map(|o| serde_json::to_string(&o).ok()),
1006        )
1007        .bind(
1008            "request_lookup_id_kind",
1009            quote.request_lookup_id.map(|id| id.kind()),
1010        )
1011        .bind("payment_method", quote.payment_method.to_string())
1012        .execute(&self.inner)
1013        .await?;
1014
1015        Ok(())
1016    }
1017
1018    async fn update_melt_quote_request_lookup_id(
1019        &mut self,
1020        quote_id: &QuoteId,
1021        new_request_lookup_id: &PaymentIdentifier,
1022    ) -> Result<(), Self::Err> {
1023        query(r#"UPDATE melt_quote SET request_lookup_id = :new_req_id, request_lookup_id_kind = :new_kind WHERE id = :id"#)?
1024            .bind("new_req_id", new_request_lookup_id.to_string())
1025            .bind("new_kind",new_request_lookup_id.kind() )
1026            .bind("id", quote_id.to_string())
1027            .execute(&self.inner)
1028            .await?;
1029        Ok(())
1030    }
1031
1032    async fn update_melt_quote_state(
1033        &mut self,
1034        quote_id: &QuoteId,
1035        state: MeltQuoteState,
1036        payment_proof: Option<String>,
1037    ) -> Result<(MeltQuoteState, mint::MeltQuote), Self::Err> {
1038        let mut quote = query(
1039            r#"
1040            SELECT
1041                id,
1042                unit,
1043                amount,
1044                request,
1045                fee_reserve,
1046                expiry,
1047                state,
1048                payment_preimage,
1049                request_lookup_id,
1050                created_time,
1051                paid_time,
1052                payment_method,
1053                options,
1054                request_lookup_id_kind
1055            FROM
1056                melt_quote
1057            WHERE
1058                id=:id
1059                AND state != :state
1060            "#,
1061        )?
1062        .bind("id", quote_id.to_string())
1063        .bind("state", state.to_string())
1064        .fetch_one(&self.inner)
1065        .await?
1066        .map(sql_row_to_melt_quote)
1067        .transpose()?
1068        .ok_or(Error::QuoteNotFound)?;
1069
1070        check_melt_quote_state_transition(quote.state, state)?;
1071
1072        let rec = if state == MeltQuoteState::Paid {
1073            let current_time = unix_time();
1074            query(r#"UPDATE melt_quote SET state = :state, paid_time = :paid_time, payment_preimage = :payment_preimage WHERE id = :id"#)?
1075                .bind("state", state.to_string())
1076                .bind("paid_time", current_time as i64)
1077                .bind("payment_preimage", payment_proof)
1078                .bind("id", quote_id.to_string())
1079                .execute(&self.inner)
1080                .await
1081        } else {
1082            query(r#"UPDATE melt_quote SET state = :state WHERE id = :id"#)?
1083                .bind("state", state.to_string())
1084                .bind("id", quote_id.to_string())
1085                .execute(&self.inner)
1086                .await
1087        };
1088
1089        match rec {
1090            Ok(_) => {}
1091            Err(err) => {
1092                tracing::error!("SQLite Could not update melt quote");
1093                return Err(err);
1094            }
1095        };
1096
1097        let old_state = quote.state;
1098        quote.state = state;
1099
1100        if state == MeltQuoteState::Unpaid || state == MeltQuoteState::Failed {
1101            self.delete_melt_request(quote_id).await?;
1102        }
1103
1104        Ok((old_state, quote))
1105    }
1106
1107    async fn get_mint_quote(&mut self, quote_id: &QuoteId) -> Result<Option<MintQuote>, Self::Err> {
1108        let payments = get_mint_quote_payments(&self.inner, quote_id).await?;
1109        let issuance = get_mint_quote_issuance(&self.inner, quote_id).await?;
1110
1111        Ok(query(
1112            r#"
1113            SELECT
1114                id,
1115                amount,
1116                unit,
1117                request,
1118                expiry,
1119                request_lookup_id,
1120                pubkey,
1121                created_time,
1122                amount_paid,
1123                amount_issued,
1124                payment_method,
1125                request_lookup_id_kind
1126            FROM
1127                mint_quote
1128            WHERE id = :id
1129            FOR UPDATE
1130            "#,
1131        )?
1132        .bind("id", quote_id.to_string())
1133        .fetch_one(&self.inner)
1134        .await?
1135        .map(|row| sql_row_to_mint_quote(row, payments, issuance))
1136        .transpose()?)
1137    }
1138
1139    async fn get_melt_quote(
1140        &mut self,
1141        quote_id: &QuoteId,
1142    ) -> Result<Option<mint::MeltQuote>, Self::Err> {
1143        Ok(query(
1144            r#"
1145            SELECT
1146                id,
1147                unit,
1148                amount,
1149                request,
1150                fee_reserve,
1151                expiry,
1152                state,
1153                payment_preimage,
1154                request_lookup_id,
1155                created_time,
1156                paid_time,
1157                payment_method,
1158                options,
1159                request_lookup_id
1160            FROM
1161                melt_quote
1162            WHERE
1163                id=:id
1164            "#,
1165        )?
1166        .bind("id", quote_id.to_string())
1167        .fetch_one(&self.inner)
1168        .await?
1169        .map(sql_row_to_melt_quote)
1170        .transpose()?)
1171    }
1172
1173    async fn get_mint_quote_by_request(
1174        &mut self,
1175        request: &str,
1176    ) -> Result<Option<MintQuote>, Self::Err> {
1177        let mut mint_quote = query(
1178            r#"
1179            SELECT
1180                id,
1181                amount,
1182                unit,
1183                request,
1184                expiry,
1185                request_lookup_id,
1186                pubkey,
1187                created_time,
1188                amount_paid,
1189                amount_issued,
1190                payment_method,
1191                request_lookup_id_kind
1192            FROM
1193                mint_quote
1194            WHERE request = :request
1195            FOR UPDATE
1196            "#,
1197        )?
1198        .bind("request", request.to_string())
1199        .fetch_one(&self.inner)
1200        .await?
1201        .map(|row| sql_row_to_mint_quote(row, vec![], vec![]))
1202        .transpose()?;
1203
1204        if let Some(quote) = mint_quote.as_mut() {
1205            let payments = get_mint_quote_payments(&self.inner, &quote.id).await?;
1206            let issuance = get_mint_quote_issuance(&self.inner, &quote.id).await?;
1207            quote.issuance = issuance;
1208            quote.payments = payments;
1209        }
1210
1211        Ok(mint_quote)
1212    }
1213
1214    async fn get_mint_quote_by_request_lookup_id(
1215        &mut self,
1216        request_lookup_id: &PaymentIdentifier,
1217    ) -> Result<Option<MintQuote>, Self::Err> {
1218        let mut mint_quote = query(
1219            r#"
1220            SELECT
1221                id,
1222                amount,
1223                unit,
1224                request,
1225                expiry,
1226                request_lookup_id,
1227                pubkey,
1228                created_time,
1229                amount_paid,
1230                amount_issued,
1231                payment_method,
1232                request_lookup_id_kind
1233            FROM
1234                mint_quote
1235            WHERE request_lookup_id = :request_lookup_id
1236            AND request_lookup_id_kind = :request_lookup_id_kind
1237            FOR UPDATE
1238            "#,
1239        )?
1240        .bind("request_lookup_id", request_lookup_id.to_string())
1241        .bind("request_lookup_id_kind", request_lookup_id.kind())
1242        .fetch_one(&self.inner)
1243        .await?
1244        .map(|row| sql_row_to_mint_quote(row, vec![], vec![]))
1245        .transpose()?;
1246
1247        if let Some(quote) = mint_quote.as_mut() {
1248            let payments = get_mint_quote_payments(&self.inner, &quote.id).await?;
1249            let issuance = get_mint_quote_issuance(&self.inner, &quote.id).await?;
1250            quote.issuance = issuance;
1251            quote.payments = payments;
1252        }
1253
1254        Ok(mint_quote)
1255    }
1256}
1257
1258#[async_trait]
1259impl<RM> MintQuotesDatabase for SQLMintDatabase<RM>
1260where
1261    RM: DatabasePool + 'static,
1262{
1263    type Err = Error;
1264
1265    async fn get_mint_quote(&self, quote_id: &QuoteId) -> Result<Option<MintQuote>, Self::Err> {
1266        #[cfg(feature = "prometheus")]
1267        METRICS.inc_in_flight_requests("get_mint_quote");
1268
1269        #[cfg(feature = "prometheus")]
1270        let start_time = std::time::Instant::now();
1271        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1272
1273        let result = async {
1274            let payments = get_mint_quote_payments(&*conn, quote_id).await?;
1275            let issuance = get_mint_quote_issuance(&*conn, quote_id).await?;
1276
1277            query(
1278                r#"
1279                SELECT
1280                    id,
1281                    amount,
1282                    unit,
1283                    request,
1284                    expiry,
1285                    request_lookup_id,
1286                    pubkey,
1287                    created_time,
1288                    amount_paid,
1289                    amount_issued,
1290                    payment_method,
1291                    request_lookup_id_kind
1292                FROM
1293                    mint_quote
1294                WHERE id = :id"#,
1295            )?
1296            .bind("id", quote_id.to_string())
1297            .fetch_one(&*conn)
1298            .await?
1299            .map(|row| sql_row_to_mint_quote(row, payments, issuance))
1300            .transpose()
1301        }
1302        .await;
1303
1304        #[cfg(feature = "prometheus")]
1305        {
1306            let success = result.is_ok();
1307
1308            METRICS.record_mint_operation("get_mint_quote", success);
1309            METRICS.record_mint_operation_histogram(
1310                "get_mint_quote",
1311                success,
1312                start_time.elapsed().as_secs_f64(),
1313            );
1314            METRICS.dec_in_flight_requests("get_mint_quote");
1315        }
1316
1317        result
1318    }
1319
1320    async fn get_mint_quote_by_request(
1321        &self,
1322        request: &str,
1323    ) -> Result<Option<MintQuote>, Self::Err> {
1324        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1325        let mut mint_quote = query(
1326            r#"
1327            SELECT
1328                id,
1329                amount,
1330                unit,
1331                request,
1332                expiry,
1333                request_lookup_id,
1334                pubkey,
1335                created_time,
1336                amount_paid,
1337                amount_issued,
1338                payment_method,
1339                request_lookup_id_kind
1340            FROM
1341                mint_quote
1342            WHERE request = :request"#,
1343        )?
1344        .bind("request", request.to_owned())
1345        .fetch_one(&*conn)
1346        .await?
1347        .map(|row| sql_row_to_mint_quote(row, vec![], vec![]))
1348        .transpose()?;
1349
1350        if let Some(quote) = mint_quote.as_mut() {
1351            let payments = get_mint_quote_payments(&*conn, &quote.id).await?;
1352            let issuance = get_mint_quote_issuance(&*conn, &quote.id).await?;
1353            quote.issuance = issuance;
1354            quote.payments = payments;
1355        }
1356
1357        Ok(mint_quote)
1358    }
1359
1360    async fn get_mint_quote_by_request_lookup_id(
1361        &self,
1362        request_lookup_id: &PaymentIdentifier,
1363    ) -> Result<Option<MintQuote>, Self::Err> {
1364        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1365        let mut mint_quote = query(
1366            r#"
1367            SELECT
1368                id,
1369                amount,
1370                unit,
1371                request,
1372                expiry,
1373                request_lookup_id,
1374                pubkey,
1375                created_time,
1376                amount_paid,
1377                amount_issued,
1378                payment_method,
1379                request_lookup_id_kind
1380            FROM
1381                mint_quote
1382            WHERE request_lookup_id = :request_lookup_id
1383            AND request_lookup_id_kind = :request_lookup_id_kind
1384            "#,
1385        )?
1386        .bind("request_lookup_id", request_lookup_id.to_string())
1387        .bind("request_lookup_id_kind", request_lookup_id.kind())
1388        .fetch_one(&*conn)
1389        .await?
1390        .map(|row| sql_row_to_mint_quote(row, vec![], vec![]))
1391        .transpose()?;
1392
1393        // TODO: these should use an sql join so they can be done in one query
1394        if let Some(quote) = mint_quote.as_mut() {
1395            let payments = get_mint_quote_payments(&*conn, &quote.id).await?;
1396            let issuance = get_mint_quote_issuance(&*conn, &quote.id).await?;
1397            quote.issuance = issuance;
1398            quote.payments = payments;
1399        }
1400
1401        Ok(mint_quote)
1402    }
1403
1404    async fn get_mint_quotes(&self) -> Result<Vec<MintQuote>, Self::Err> {
1405        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1406        let mut mint_quotes = query(
1407            r#"
1408            SELECT
1409                id,
1410                amount,
1411                unit,
1412                request,
1413                expiry,
1414                request_lookup_id,
1415                pubkey,
1416                created_time,
1417                amount_paid,
1418                amount_issued,
1419                payment_method,
1420                request_lookup_id_kind
1421            FROM
1422                mint_quote
1423            "#,
1424        )?
1425        .fetch_all(&*conn)
1426        .await?
1427        .into_iter()
1428        .map(|row| sql_row_to_mint_quote(row, vec![], vec![]))
1429        .collect::<Result<Vec<_>, _>>()?;
1430
1431        for quote in mint_quotes.as_mut_slice() {
1432            let payments = get_mint_quote_payments(&*conn, &quote.id).await?;
1433            let issuance = get_mint_quote_issuance(&*conn, &quote.id).await?;
1434            quote.issuance = issuance;
1435            quote.payments = payments;
1436        }
1437
1438        Ok(mint_quotes)
1439    }
1440
1441    async fn get_melt_quote(
1442        &self,
1443        quote_id: &QuoteId,
1444    ) -> Result<Option<mint::MeltQuote>, Self::Err> {
1445        #[cfg(feature = "prometheus")]
1446        METRICS.inc_in_flight_requests("get_melt_quote");
1447
1448        #[cfg(feature = "prometheus")]
1449        let start_time = std::time::Instant::now();
1450        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1451
1452        let result = async {
1453            query(
1454                r#"
1455                SELECT
1456                    id,
1457                    unit,
1458                    amount,
1459                    request,
1460                    fee_reserve,
1461                    expiry,
1462                    state,
1463                    payment_preimage,
1464                    request_lookup_id,
1465                    created_time,
1466                    paid_time,
1467                    payment_method,
1468                    options,
1469                    request_lookup_id_kind
1470                FROM
1471                    melt_quote
1472                WHERE
1473                    id=:id
1474                "#,
1475            )?
1476            .bind("id", quote_id.to_string())
1477            .fetch_one(&*conn)
1478            .await?
1479            .map(sql_row_to_melt_quote)
1480            .transpose()
1481        }
1482        .await;
1483
1484        #[cfg(feature = "prometheus")]
1485        {
1486            let success = result.is_ok();
1487
1488            METRICS.record_mint_operation("get_melt_quote", success);
1489            METRICS.record_mint_operation_histogram(
1490                "get_melt_quote",
1491                success,
1492                start_time.elapsed().as_secs_f64(),
1493            );
1494            METRICS.dec_in_flight_requests("get_melt_quote");
1495        }
1496
1497        result
1498    }
1499
1500    async fn get_melt_quotes(&self) -> Result<Vec<mint::MeltQuote>, Self::Err> {
1501        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1502        Ok(query(
1503            r#"
1504            SELECT
1505                id,
1506                unit,
1507                amount,
1508                request,
1509                fee_reserve,
1510                expiry,
1511                state,
1512                payment_preimage,
1513                request_lookup_id,
1514                created_time,
1515                paid_time,
1516                payment_method,
1517                options,
1518                request_lookup_id_kind
1519            FROM
1520                melt_quote
1521            "#,
1522        )?
1523        .fetch_all(&*conn)
1524        .await?
1525        .into_iter()
1526        .map(sql_row_to_melt_quote)
1527        .collect::<Result<Vec<_>, _>>()?)
1528    }
1529}
1530
1531#[async_trait]
1532impl<RM> MintProofsDatabase for SQLMintDatabase<RM>
1533where
1534    RM: DatabasePool + 'static,
1535{
1536    type Err = Error;
1537
1538    async fn get_proofs_by_ys(&self, ys: &[PublicKey]) -> Result<Vec<Option<Proof>>, Self::Err> {
1539        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1540        let mut proofs = query(
1541            r#"
1542            SELECT
1543                amount,
1544                keyset_id,
1545                secret,
1546                c,
1547                witness,
1548                y
1549            FROM
1550                proof
1551            WHERE
1552                y IN (:ys)
1553            "#,
1554        )?
1555        .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
1556        .fetch_all(&*conn)
1557        .await?
1558        .into_iter()
1559        .map(|mut row| {
1560            Ok((
1561                column_as_string!(
1562                    row.pop().ok_or(Error::InvalidDbResponse)?,
1563                    PublicKey::from_hex,
1564                    PublicKey::from_slice
1565                ),
1566                sql_row_to_proof(row)?,
1567            ))
1568        })
1569        .collect::<Result<HashMap<_, _>, Error>>()?;
1570
1571        Ok(ys.iter().map(|y| proofs.remove(y)).collect())
1572    }
1573
1574    async fn get_proof_ys_by_quote_id(
1575        &self,
1576        quote_id: &QuoteId,
1577    ) -> Result<Vec<PublicKey>, Self::Err> {
1578        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1579        Ok(query(
1580            r#"
1581            SELECT
1582                amount,
1583                keyset_id,
1584                secret,
1585                c,
1586                witness
1587            FROM
1588                proof
1589            WHERE
1590                quote_id = :quote_id
1591            "#,
1592        )?
1593        .bind("quote_id", quote_id.to_string())
1594        .fetch_all(&*conn)
1595        .await?
1596        .into_iter()
1597        .map(sql_row_to_proof)
1598        .collect::<Result<Vec<Proof>, _>>()?
1599        .ys()?)
1600    }
1601
1602    async fn get_proofs_states(&self, ys: &[PublicKey]) -> Result<Vec<Option<State>>, Self::Err> {
1603        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1604        let mut current_states = get_current_states(&*conn, ys).await?;
1605
1606        Ok(ys.iter().map(|y| current_states.remove(y)).collect())
1607    }
1608
1609    async fn get_proofs_by_keyset_id(
1610        &self,
1611        keyset_id: &Id,
1612    ) -> Result<(Proofs, Vec<Option<State>>), Self::Err> {
1613        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1614        Ok(query(
1615            r#"
1616            SELECT
1617               keyset_id,
1618               amount,
1619               secret,
1620               c,
1621               witness,
1622               state
1623            FROM
1624                proof
1625            WHERE
1626                keyset_id=:keyset_id
1627            "#,
1628        )?
1629        .bind("keyset_id", keyset_id.to_string())
1630        .fetch_all(&*conn)
1631        .await?
1632        .into_iter()
1633        .map(sql_row_to_proof_with_state)
1634        .collect::<Result<Vec<_>, _>>()?
1635        .into_iter()
1636        .unzip())
1637    }
1638
1639    /// Get total proofs redeemed by keyset id
1640    async fn get_total_redeemed(&self) -> Result<HashMap<Id, Amount>, Self::Err> {
1641        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1642        query(
1643            r#"
1644            SELECT
1645                keyset_id,
1646                total_redeemed as amount
1647            FROM
1648                keyset_amounts
1649        "#,
1650        )?
1651        .fetch_all(&*conn)
1652        .await?
1653        .into_iter()
1654        .map(sql_row_to_hashmap_amount)
1655        .collect()
1656    }
1657}
1658
1659#[async_trait]
1660impl<RM> MintSignatureTransaction<'_> for SQLTransaction<RM>
1661where
1662    RM: DatabasePool + 'static,
1663{
1664    type Err = Error;
1665
1666    async fn add_blind_signatures(
1667        &mut self,
1668        blinded_messages: &[PublicKey],
1669        blind_signatures: &[BlindSignature],
1670        quote_id: Option<QuoteId>,
1671    ) -> Result<(), Self::Err> {
1672        let current_time = unix_time();
1673
1674        if blinded_messages.len() != blind_signatures.len() {
1675            return Err(database::Error::Internal(
1676                "Mismatched array lengths for blinded messages and blind signatures".to_string(),
1677            ));
1678        }
1679
1680        // Select all existing rows for the given blinded messages at once
1681        let mut existing_rows = query(
1682            r#"
1683            SELECT blinded_message, c, dleq_e, dleq_s
1684            FROM blind_signature
1685            WHERE blinded_message IN (:blinded_messages)
1686            FOR UPDATE
1687            "#,
1688        )?
1689        .bind_vec(
1690            "blinded_messages",
1691            blinded_messages
1692                .iter()
1693                .map(|message| message.to_bytes().to_vec())
1694                .collect(),
1695        )
1696        .fetch_all(&self.inner)
1697        .await?
1698        .into_iter()
1699        .map(|mut row| {
1700            Ok((
1701                column_as_string!(&row.remove(0), PublicKey::from_hex, PublicKey::from_slice),
1702                (row[0].clone(), row[1].clone(), row[2].clone()),
1703            ))
1704        })
1705        .collect::<Result<HashMap<_, _>, Error>>()?;
1706
1707        // Iterate over the provided blinded messages and signatures
1708        for (message, signature) in blinded_messages.iter().zip(blind_signatures) {
1709            match existing_rows.remove(message) {
1710                None => {
1711                    // Unknown blind message: Insert new row with all columns
1712                    query(
1713                        r#"
1714                        INSERT INTO blind_signature
1715                        (blinded_message, amount, keyset_id, c, quote_id, dleq_e, dleq_s, created_time, signed_time)
1716                        VALUES
1717                        (:blinded_message, :amount, :keyset_id, :c, :quote_id, :dleq_e, :dleq_s, :created_time, :signed_time)
1718                        "#,
1719                    )?
1720                    .bind("blinded_message", message.to_bytes().to_vec())
1721                    .bind("amount", u64::from(signature.amount) as i64)
1722                    .bind("keyset_id", signature.keyset_id.to_string())
1723                    .bind("c", signature.c.to_bytes().to_vec())
1724                    .bind("quote_id", quote_id.as_ref().map(|q| q.to_string()))
1725                    .bind(
1726                        "dleq_e",
1727                        signature.dleq.as_ref().map(|dleq| dleq.e.to_secret_hex()),
1728                    )
1729                    .bind(
1730                        "dleq_s",
1731                        signature.dleq.as_ref().map(|dleq| dleq.s.to_secret_hex()),
1732                    )
1733                    .bind("created_time", current_time as i64)
1734                    .bind("signed_time", current_time as i64)
1735                    .execute(&self.inner)
1736                    .await?;
1737
1738                    query(
1739                        r#"
1740                        INSERT INTO keyset_amounts (keyset_id, total_issued, total_redeemed)
1741                        VALUES (:keyset_id, :amount, 0)
1742                        ON CONFLICT (keyset_id)
1743                        DO UPDATE SET total_issued = keyset_amounts.total_issued + EXCLUDED.total_issued
1744                        "#,
1745                    )?
1746                    .bind("amount", u64::from(signature.amount) as i64)
1747                    .bind("keyset_id", signature.keyset_id.to_string())
1748                    .execute(&self.inner)
1749                    .await?;
1750                }
1751                Some((c, _dleq_e, _dleq_s)) => {
1752                    // Blind message exists: check if c is NULL
1753                    match c {
1754                        Column::Null => {
1755                            // Blind message with no c: Update with missing columns c, dleq_e, dleq_s
1756                            query(
1757                                r#"
1758                                UPDATE blind_signature
1759                                SET c = :c, dleq_e = :dleq_e, dleq_s = :dleq_s, signed_time = :signed_time, amount = :amount
1760                                WHERE blinded_message = :blinded_message
1761                                "#,
1762                            )?
1763                            .bind("c", signature.c.to_bytes().to_vec())
1764                            .bind(
1765                                "dleq_e",
1766                                signature.dleq.as_ref().map(|dleq| dleq.e.to_secret_hex()),
1767                            )
1768                            .bind(
1769                                "dleq_s",
1770                                signature.dleq.as_ref().map(|dleq| dleq.s.to_secret_hex()),
1771                            )
1772                            .bind("blinded_message", message.to_bytes().to_vec())
1773                            .bind("signed_time", current_time as i64)
1774                            .bind("amount", u64::from(signature.amount) as i64)
1775                            .execute(&self.inner)
1776                            .await?;
1777
1778                            query(
1779                                r#"
1780                                INSERT INTO keyset_amounts (keyset_id, total_issued, total_redeemed)
1781                                VALUES (:keyset_id, :amount, 0)
1782                                ON CONFLICT (keyset_id)
1783                                DO UPDATE SET total_issued = keyset_amounts.total_issued + EXCLUDED.total_issued
1784                                "#,
1785                            )?
1786                            .bind("amount", u64::from(signature.amount) as i64)
1787                            .bind("keyset_id", signature.keyset_id.to_string())
1788                            .execute(&self.inner)
1789                            .await?;
1790                        }
1791                        _ => {
1792                            // Blind message already has c: Error
1793                            tracing::error!(
1794                                "Attempting to add signature to message already signed {}",
1795                                message
1796                            );
1797
1798                            return Err(database::Error::Duplicate);
1799                        }
1800                    }
1801                }
1802            }
1803        }
1804
1805        debug_assert!(
1806            existing_rows.is_empty(),
1807            "Unexpected existing rows remain: {:?}",
1808            existing_rows.keys().collect::<Vec<_>>()
1809        );
1810
1811        if !existing_rows.is_empty() {
1812            tracing::error!("Did not check all existing rows");
1813            return Err(Error::Internal(
1814                "Did not check all existing rows".to_string(),
1815            ));
1816        }
1817
1818        Ok(())
1819    }
1820
1821    async fn get_blind_signatures(
1822        &mut self,
1823        blinded_messages: &[PublicKey],
1824    ) -> Result<Vec<Option<BlindSignature>>, Self::Err> {
1825        let mut blinded_signatures = query(
1826            r#"SELECT
1827                keyset_id,
1828                amount,
1829                c,
1830                dleq_e,
1831                dleq_s,
1832                blinded_message
1833            FROM
1834                blind_signature
1835            WHERE blinded_message IN (:b) AND c IS NOT NULL
1836            "#,
1837        )?
1838        .bind_vec(
1839            "b",
1840            blinded_messages
1841                .iter()
1842                .map(|b| b.to_bytes().to_vec())
1843                .collect(),
1844        )
1845        .fetch_all(&self.inner)
1846        .await?
1847        .into_iter()
1848        .map(|mut row| {
1849            Ok((
1850                column_as_string!(
1851                    &row.pop().ok_or(Error::InvalidDbResponse)?,
1852                    PublicKey::from_hex,
1853                    PublicKey::from_slice
1854                ),
1855                sql_row_to_blind_signature(row)?,
1856            ))
1857        })
1858        .collect::<Result<HashMap<_, _>, Error>>()?;
1859        Ok(blinded_messages
1860            .iter()
1861            .map(|y| blinded_signatures.remove(y))
1862            .collect())
1863    }
1864}
1865
1866#[async_trait]
1867impl<RM> MintSignaturesDatabase for SQLMintDatabase<RM>
1868where
1869    RM: DatabasePool + 'static,
1870{
1871    type Err = Error;
1872
1873    async fn get_blind_signatures(
1874        &self,
1875        blinded_messages: &[PublicKey],
1876    ) -> Result<Vec<Option<BlindSignature>>, Self::Err> {
1877        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1878        let mut blinded_signatures = query(
1879            r#"SELECT
1880                keyset_id,
1881                amount,
1882                c,
1883                dleq_e,
1884                dleq_s,
1885                blinded_message
1886            FROM
1887                blind_signature
1888            WHERE blinded_message IN (:b) AND c IS NOT NULL
1889            "#,
1890        )?
1891        .bind_vec(
1892            "b",
1893            blinded_messages
1894                .iter()
1895                .map(|b_| b_.to_bytes().to_vec())
1896                .collect(),
1897        )
1898        .fetch_all(&*conn)
1899        .await?
1900        .into_iter()
1901        .map(|mut row| {
1902            Ok((
1903                column_as_string!(
1904                    &row.pop().ok_or(Error::InvalidDbResponse)?,
1905                    PublicKey::from_hex,
1906                    PublicKey::from_slice
1907                ),
1908                sql_row_to_blind_signature(row)?,
1909            ))
1910        })
1911        .collect::<Result<HashMap<_, _>, Error>>()?;
1912        Ok(blinded_messages
1913            .iter()
1914            .map(|y| blinded_signatures.remove(y))
1915            .collect())
1916    }
1917
1918    async fn get_blind_signatures_for_keyset(
1919        &self,
1920        keyset_id: &Id,
1921    ) -> Result<Vec<BlindSignature>, Self::Err> {
1922        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1923        Ok(query(
1924            r#"
1925            SELECT
1926                keyset_id,
1927                amount,
1928                c,
1929                dleq_e,
1930                dleq_s
1931            FROM
1932                blind_signature
1933            WHERE
1934                keyset_id=:keyset_id AND c IS NOT NULL
1935            "#,
1936        )?
1937        .bind("keyset_id", keyset_id.to_string())
1938        .fetch_all(&*conn)
1939        .await?
1940        .into_iter()
1941        .map(sql_row_to_blind_signature)
1942        .collect::<Result<Vec<BlindSignature>, _>>()?)
1943    }
1944
1945    /// Get [`BlindSignature`]s for quote
1946    async fn get_blind_signatures_for_quote(
1947        &self,
1948        quote_id: &QuoteId,
1949    ) -> Result<Vec<BlindSignature>, Self::Err> {
1950        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1951        Ok(query(
1952            r#"
1953            SELECT
1954                keyset_id,
1955                amount,
1956                c,
1957                dleq_e,
1958                dleq_s
1959            FROM
1960                blind_signature
1961            WHERE
1962                quote_id=:quote_id AND c IS NOT NULL
1963            "#,
1964        )?
1965        .bind("quote_id", quote_id.to_string())
1966        .fetch_all(&*conn)
1967        .await?
1968        .into_iter()
1969        .map(sql_row_to_blind_signature)
1970        .collect::<Result<Vec<BlindSignature>, _>>()?)
1971    }
1972
1973    /// Get total proofs redeemed by keyset id
1974    async fn get_total_issued(&self) -> Result<HashMap<Id, Amount>, Self::Err> {
1975        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1976        query(
1977            r#"
1978            SELECT
1979                keyset_id,
1980                total_issued as amount
1981            FROM
1982                keyset_amounts
1983        "#,
1984        )?
1985        .fetch_all(&*conn)
1986        .await?
1987        .into_iter()
1988        .map(sql_row_to_hashmap_amount)
1989        .collect()
1990    }
1991}
1992
1993#[async_trait]
1994impl<RM> database::MintKVStoreTransaction<'_, Error> for SQLTransaction<RM>
1995where
1996    RM: DatabasePool + 'static,
1997{
1998    async fn kv_read(
1999        &mut self,
2000        primary_namespace: &str,
2001        secondary_namespace: &str,
2002        key: &str,
2003    ) -> Result<Option<Vec<u8>>, Error> {
2004        // Validate parameters according to KV store requirements
2005        validate_kvstore_params(primary_namespace, secondary_namespace, key)?;
2006        Ok(query(
2007            r#"
2008            SELECT value
2009            FROM kv_store
2010            WHERE primary_namespace = :primary_namespace
2011            AND secondary_namespace = :secondary_namespace
2012            AND key = :key
2013            "#,
2014        )?
2015        .bind("primary_namespace", primary_namespace.to_owned())
2016        .bind("secondary_namespace", secondary_namespace.to_owned())
2017        .bind("key", key.to_owned())
2018        .pluck(&self.inner)
2019        .await?
2020        .and_then(|col| match col {
2021            Column::Blob(data) => Some(data),
2022            _ => None,
2023        }))
2024    }
2025
2026    async fn kv_write(
2027        &mut self,
2028        primary_namespace: &str,
2029        secondary_namespace: &str,
2030        key: &str,
2031        value: &[u8],
2032    ) -> Result<(), Error> {
2033        // Validate parameters according to KV store requirements
2034        validate_kvstore_params(primary_namespace, secondary_namespace, key)?;
2035
2036        let current_time = unix_time();
2037
2038        query(
2039            r#"
2040            INSERT INTO kv_store
2041            (primary_namespace, secondary_namespace, key, value, created_time, updated_time)
2042            VALUES (:primary_namespace, :secondary_namespace, :key, :value, :created_time, :updated_time)
2043            ON CONFLICT(primary_namespace, secondary_namespace, key)
2044            DO UPDATE SET
2045                value = excluded.value,
2046                updated_time = excluded.updated_time
2047            "#,
2048        )?
2049        .bind("primary_namespace", primary_namespace.to_owned())
2050        .bind("secondary_namespace", secondary_namespace.to_owned())
2051        .bind("key", key.to_owned())
2052        .bind("value", value.to_vec())
2053        .bind("created_time", current_time as i64)
2054        .bind("updated_time", current_time as i64)
2055        .execute(&self.inner)
2056        .await?;
2057
2058        Ok(())
2059    }
2060
2061    async fn kv_remove(
2062        &mut self,
2063        primary_namespace: &str,
2064        secondary_namespace: &str,
2065        key: &str,
2066    ) -> Result<(), Error> {
2067        // Validate parameters according to KV store requirements
2068        validate_kvstore_params(primary_namespace, secondary_namespace, key)?;
2069        query(
2070            r#"
2071            DELETE FROM kv_store
2072            WHERE primary_namespace = :primary_namespace
2073            AND secondary_namespace = :secondary_namespace
2074            AND key = :key
2075            "#,
2076        )?
2077        .bind("primary_namespace", primary_namespace.to_owned())
2078        .bind("secondary_namespace", secondary_namespace.to_owned())
2079        .bind("key", key.to_owned())
2080        .execute(&self.inner)
2081        .await?;
2082
2083        Ok(())
2084    }
2085
2086    async fn kv_list(
2087        &mut self,
2088        primary_namespace: &str,
2089        secondary_namespace: &str,
2090    ) -> Result<Vec<String>, Error> {
2091        // Validate namespace parameters according to KV store requirements
2092        cdk_common::database::mint::validate_kvstore_string(primary_namespace)?;
2093        cdk_common::database::mint::validate_kvstore_string(secondary_namespace)?;
2094
2095        // Check empty namespace rules
2096        if primary_namespace.is_empty() && !secondary_namespace.is_empty() {
2097            return Err(Error::KVStoreInvalidKey(
2098                "If primary_namespace is empty, secondary_namespace must also be empty".to_string(),
2099            ));
2100        }
2101        Ok(query(
2102            r#"
2103            SELECT key
2104            FROM kv_store
2105            WHERE primary_namespace = :primary_namespace
2106            AND secondary_namespace = :secondary_namespace
2107            ORDER BY key
2108            "#,
2109        )?
2110        .bind("primary_namespace", primary_namespace.to_owned())
2111        .bind("secondary_namespace", secondary_namespace.to_owned())
2112        .fetch_all(&self.inner)
2113        .await?
2114        .into_iter()
2115        .map(|row| Ok(column_as_string!(&row[0])))
2116        .collect::<Result<Vec<_>, Error>>()?)
2117    }
2118}
2119
2120#[async_trait]
2121impl<RM> database::MintKVStoreDatabase for SQLMintDatabase<RM>
2122where
2123    RM: DatabasePool + 'static,
2124{
2125    type Err = Error;
2126
2127    async fn kv_read(
2128        &self,
2129        primary_namespace: &str,
2130        secondary_namespace: &str,
2131        key: &str,
2132    ) -> Result<Option<Vec<u8>>, Error> {
2133        // Validate parameters according to KV store requirements
2134        validate_kvstore_params(primary_namespace, secondary_namespace, key)?;
2135
2136        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
2137        Ok(query(
2138            r#"
2139            SELECT value
2140            FROM kv_store
2141            WHERE primary_namespace = :primary_namespace
2142            AND secondary_namespace = :secondary_namespace
2143            AND key = :key
2144            "#,
2145        )?
2146        .bind("primary_namespace", primary_namespace.to_owned())
2147        .bind("secondary_namespace", secondary_namespace.to_owned())
2148        .bind("key", key.to_owned())
2149        .pluck(&*conn)
2150        .await?
2151        .and_then(|col| match col {
2152            Column::Blob(data) => Some(data),
2153            _ => None,
2154        }))
2155    }
2156
2157    async fn kv_list(
2158        &self,
2159        primary_namespace: &str,
2160        secondary_namespace: &str,
2161    ) -> Result<Vec<String>, Error> {
2162        // Validate namespace parameters according to KV store requirements
2163        cdk_common::database::mint::validate_kvstore_string(primary_namespace)?;
2164        cdk_common::database::mint::validate_kvstore_string(secondary_namespace)?;
2165
2166        // Check empty namespace rules
2167        if primary_namespace.is_empty() && !secondary_namespace.is_empty() {
2168            return Err(Error::KVStoreInvalidKey(
2169                "If primary_namespace is empty, secondary_namespace must also be empty".to_string(),
2170            ));
2171        }
2172
2173        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
2174        Ok(query(
2175            r#"
2176            SELECT key
2177            FROM kv_store
2178            WHERE primary_namespace = :primary_namespace
2179            AND secondary_namespace = :secondary_namespace
2180            ORDER BY key
2181            "#,
2182        )?
2183        .bind("primary_namespace", primary_namespace.to_owned())
2184        .bind("secondary_namespace", secondary_namespace.to_owned())
2185        .fetch_all(&*conn)
2186        .await?
2187        .into_iter()
2188        .map(|row| Ok(column_as_string!(&row[0])))
2189        .collect::<Result<Vec<_>, Error>>()?)
2190    }
2191}
2192
2193#[async_trait]
2194impl<RM> database::MintKVStore for SQLMintDatabase<RM>
2195where
2196    RM: DatabasePool + 'static,
2197{
2198    async fn begin_transaction<'a>(
2199        &'a self,
2200    ) -> Result<Box<dyn database::MintKVStoreTransaction<'a, Self::Err> + Send + Sync + 'a>, Error>
2201    {
2202        Ok(Box::new(SQLTransaction {
2203            inner: ConnectionWithTransaction::new(
2204                self.pool.get().map_err(|e| Error::Database(Box::new(e)))?,
2205            )
2206            .await?,
2207        }))
2208    }
2209}
2210
2211#[async_trait]
2212impl<RM> SagaTransaction<'_> for SQLTransaction<RM>
2213where
2214    RM: DatabasePool + 'static,
2215{
2216    type Err = Error;
2217
2218    async fn get_saga(
2219        &mut self,
2220        operation_id: &uuid::Uuid,
2221    ) -> Result<Option<mint::Saga>, Self::Err> {
2222        Ok(query(
2223            r#"
2224            SELECT
2225                operation_id,
2226                operation_kind,
2227                state,
2228                blinded_secrets,
2229                input_ys,
2230                quote_id,
2231                created_at,
2232                updated_at
2233            FROM
2234                saga_state
2235            WHERE
2236                operation_id = :operation_id
2237            FOR UPDATE
2238            "#,
2239        )?
2240        .bind("operation_id", operation_id.to_string())
2241        .fetch_one(&self.inner)
2242        .await?
2243        .map(sql_row_to_saga)
2244        .transpose()?)
2245    }
2246
2247    async fn add_saga(&mut self, saga: &mint::Saga) -> Result<(), Self::Err> {
2248        let current_time = unix_time();
2249
2250        let blinded_secrets_json = serde_json::to_string(&saga.blinded_secrets)
2251            .map_err(|e| Error::Internal(format!("Failed to serialize blinded_secrets: {e}")))?;
2252
2253        let input_ys_json = serde_json::to_string(&saga.input_ys)
2254            .map_err(|e| Error::Internal(format!("Failed to serialize input_ys: {e}")))?;
2255
2256        query(
2257            r#"
2258            INSERT INTO saga_state
2259            (operation_id, operation_kind, state, blinded_secrets, input_ys, quote_id, created_at, updated_at)
2260            VALUES
2261            (:operation_id, :operation_kind, :state, :blinded_secrets, :input_ys, :quote_id, :created_at, :updated_at)
2262            "#,
2263        )?
2264        .bind("operation_id", saga.operation_id.to_string())
2265        .bind("operation_kind", saga.operation_kind.to_string())
2266        .bind("state", saga.state.state())
2267        .bind("blinded_secrets", blinded_secrets_json)
2268        .bind("input_ys", input_ys_json)
2269        .bind("quote_id", saga.quote_id.as_deref())
2270        .bind("created_at", saga.created_at as i64)
2271        .bind("updated_at", current_time as i64)
2272        .execute(&self.inner)
2273        .await?;
2274
2275        Ok(())
2276    }
2277
2278    async fn update_saga(
2279        &mut self,
2280        operation_id: &uuid::Uuid,
2281        new_state: mint::SagaStateEnum,
2282    ) -> Result<(), Self::Err> {
2283        let current_time = unix_time();
2284
2285        query(
2286            r#"
2287            UPDATE saga_state
2288            SET state = :state, updated_at = :updated_at
2289            WHERE operation_id = :operation_id
2290            "#,
2291        )?
2292        .bind("state", new_state.state())
2293        .bind("updated_at", current_time as i64)
2294        .bind("operation_id", operation_id.to_string())
2295        .execute(&self.inner)
2296        .await?;
2297
2298        Ok(())
2299    }
2300
2301    async fn delete_saga(&mut self, operation_id: &uuid::Uuid) -> Result<(), Self::Err> {
2302        query(
2303            r#"
2304            DELETE FROM saga_state
2305            WHERE operation_id = :operation_id
2306            "#,
2307        )?
2308        .bind("operation_id", operation_id.to_string())
2309        .execute(&self.inner)
2310        .await?;
2311
2312        Ok(())
2313    }
2314}
2315
2316#[async_trait]
2317impl<RM> SagaDatabase for SQLMintDatabase<RM>
2318where
2319    RM: DatabasePool + 'static,
2320{
2321    type Err = Error;
2322
2323    async fn get_incomplete_sagas(
2324        &self,
2325        operation_kind: mint::OperationKind,
2326    ) -> Result<Vec<mint::Saga>, Self::Err> {
2327        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
2328        Ok(query(
2329            r#"
2330            SELECT
2331                operation_id,
2332                operation_kind,
2333                state,
2334                blinded_secrets,
2335                input_ys,
2336                quote_id,
2337                created_at,
2338                updated_at
2339            FROM
2340                saga_state
2341            WHERE
2342                operation_kind = :operation_kind
2343            ORDER BY created_at ASC
2344            "#,
2345        )?
2346        .bind("operation_kind", operation_kind.to_string())
2347        .fetch_all(&*conn)
2348        .await?
2349        .into_iter()
2350        .map(sql_row_to_saga)
2351        .collect::<Result<Vec<_>, _>>()?)
2352    }
2353}
2354
2355#[async_trait]
2356impl<RM> MintDatabase<Error> for SQLMintDatabase<RM>
2357where
2358    RM: DatabasePool + 'static,
2359{
2360    async fn begin_transaction<'a>(
2361        &'a self,
2362    ) -> Result<Box<dyn database::MintTransaction<'a, Error> + Send + Sync + 'a>, Error> {
2363        let tx = SQLTransaction {
2364            inner: ConnectionWithTransaction::new(
2365                self.pool.get().map_err(|e| Error::Database(Box::new(e)))?,
2366            )
2367            .await?,
2368        };
2369
2370        Ok(Box::new(tx))
2371    }
2372}
2373
2374fn sql_row_to_keyset_info(row: Vec<Column>) -> Result<MintKeySetInfo, Error> {
2375    unpack_into!(
2376        let (
2377            id,
2378            unit,
2379            active,
2380            valid_from,
2381            valid_to,
2382            derivation_path,
2383            derivation_path_index,
2384            max_order,
2385            amounts,
2386            row_keyset_ppk
2387        ) = row
2388    );
2389
2390    let max_order: u8 = column_as_number!(max_order);
2391    let amounts = column_as_nullable_string!(amounts)
2392        .and_then(|str| serde_json::from_str(&str).ok())
2393        .unwrap_or_else(|| (0..max_order).map(|m| 2u64.pow(m.into())).collect());
2394
2395    Ok(MintKeySetInfo {
2396        id: column_as_string!(id, Id::from_str, Id::from_bytes),
2397        unit: column_as_string!(unit, CurrencyUnit::from_str),
2398        active: matches!(active, Column::Integer(1)),
2399        valid_from: column_as_number!(valid_from),
2400        derivation_path: column_as_string!(derivation_path, DerivationPath::from_str),
2401        derivation_path_index: column_as_nullable_number!(derivation_path_index),
2402        max_order,
2403        amounts,
2404        input_fee_ppk: column_as_number!(row_keyset_ppk),
2405        final_expiry: column_as_nullable_number!(valid_to),
2406    })
2407}
2408
2409#[instrument(skip_all)]
2410fn sql_row_to_mint_quote(
2411    row: Vec<Column>,
2412    payments: Vec<IncomingPayment>,
2413    issueances: Vec<Issuance>,
2414) -> Result<MintQuote, Error> {
2415    unpack_into!(
2416        let (
2417            id, amount, unit, request, expiry, request_lookup_id,
2418            pubkey, created_time, amount_paid, amount_issued, payment_method, request_lookup_id_kind
2419        ) = row
2420    );
2421
2422    let request_str = column_as_string!(&request);
2423    let request_lookup_id = column_as_nullable_string!(&request_lookup_id).unwrap_or_else(|| {
2424        Bolt11Invoice::from_str(&request_str)
2425            .map(|invoice| invoice.payment_hash().to_string())
2426            .unwrap_or_else(|_| request_str.clone())
2427    });
2428    let request_lookup_id_kind = column_as_string!(request_lookup_id_kind);
2429
2430    let pubkey = column_as_nullable_string!(&pubkey)
2431        .map(|pk| PublicKey::from_hex(&pk))
2432        .transpose()?;
2433
2434    let id = column_as_string!(id);
2435    let amount: Option<u64> = column_as_nullable_number!(amount);
2436    let amount_paid: u64 = column_as_number!(amount_paid);
2437    let amount_issued: u64 = column_as_number!(amount_issued);
2438    let payment_method = column_as_string!(payment_method, PaymentMethod::from_str);
2439
2440    Ok(MintQuote::new(
2441        Some(QuoteId::from_str(&id)?),
2442        request_str,
2443        column_as_string!(unit, CurrencyUnit::from_str),
2444        amount.map(Amount::from),
2445        column_as_number!(expiry),
2446        PaymentIdentifier::new(&request_lookup_id_kind, &request_lookup_id)
2447            .map_err(|_| ConversionError::MissingParameter("Payment id".to_string()))?,
2448        pubkey,
2449        amount_paid.into(),
2450        amount_issued.into(),
2451        payment_method,
2452        column_as_number!(created_time),
2453        payments,
2454        issueances,
2455    ))
2456}
2457
2458fn sql_row_to_melt_quote(row: Vec<Column>) -> Result<mint::MeltQuote, Error> {
2459    unpack_into!(
2460        let (
2461                id,
2462                unit,
2463                amount,
2464                request,
2465                fee_reserve,
2466                expiry,
2467                state,
2468                payment_preimage,
2469                request_lookup_id,
2470                created_time,
2471                paid_time,
2472                payment_method,
2473                options,
2474                request_lookup_id_kind
2475        ) = row
2476    );
2477
2478    let id = column_as_string!(id);
2479    let amount: u64 = column_as_number!(amount);
2480    let fee_reserve: u64 = column_as_number!(fee_reserve);
2481
2482    let expiry = column_as_number!(expiry);
2483    let payment_preimage = column_as_nullable_string!(payment_preimage);
2484    let options = column_as_nullable_string!(options);
2485    let options = options.and_then(|o| serde_json::from_str(&o).ok());
2486    let created_time: i64 = column_as_number!(created_time);
2487    let paid_time = column_as_nullable_number!(paid_time);
2488    let payment_method = PaymentMethod::from_str(&column_as_string!(payment_method))?;
2489
2490    let state =
2491        MeltQuoteState::from_str(&column_as_string!(&state)).map_err(ConversionError::from)?;
2492
2493    let unit = column_as_string!(unit);
2494    let request = column_as_string!(request);
2495
2496    let request_lookup_id_kind = column_as_nullable_string!(request_lookup_id_kind);
2497
2498    let request_lookup_id = column_as_nullable_string!(&request_lookup_id).or_else(|| {
2499        Bolt11Invoice::from_str(&request)
2500            .ok()
2501            .map(|invoice| invoice.payment_hash().to_string())
2502    });
2503
2504    let request_lookup_id = if let (Some(id_kind), Some(request_lookup_id)) =
2505        (request_lookup_id_kind, request_lookup_id)
2506    {
2507        Some(
2508            PaymentIdentifier::new(&id_kind, &request_lookup_id)
2509                .map_err(|_| ConversionError::MissingParameter("Payment id".to_string()))?,
2510        )
2511    } else {
2512        None
2513    };
2514
2515    let request = match serde_json::from_str(&request) {
2516        Ok(req) => req,
2517        Err(err) => {
2518            tracing::debug!(
2519                "Melt quote from pre migrations defaulting to bolt11 {}.",
2520                err
2521            );
2522            let bolt11 = Bolt11Invoice::from_str(&request).unwrap();
2523            MeltPaymentRequest::Bolt11 { bolt11 }
2524        }
2525    };
2526
2527    Ok(MeltQuote {
2528        id: QuoteId::from_str(&id)?,
2529        unit: CurrencyUnit::from_str(&unit)?,
2530        amount: Amount::from(amount),
2531        request,
2532        fee_reserve: Amount::from(fee_reserve),
2533        state,
2534        expiry,
2535        payment_preimage,
2536        request_lookup_id,
2537        options,
2538        created_time: created_time as u64,
2539        paid_time,
2540        payment_method,
2541    })
2542}
2543
2544fn sql_row_to_proof(row: Vec<Column>) -> Result<Proof, Error> {
2545    unpack_into!(
2546        let (
2547            amount,
2548            keyset_id,
2549            secret,
2550            c,
2551            witness
2552        ) = row
2553    );
2554
2555    let amount: u64 = column_as_number!(amount);
2556    Ok(Proof {
2557        amount: Amount::from(amount),
2558        keyset_id: column_as_string!(keyset_id, Id::from_str),
2559        secret: column_as_string!(secret, Secret::from_str),
2560        c: column_as_string!(c, PublicKey::from_hex, PublicKey::from_slice),
2561        witness: column_as_nullable_string!(witness).and_then(|w| serde_json::from_str(&w).ok()),
2562        dleq: None,
2563    })
2564}
2565
2566fn sql_row_to_hashmap_amount(row: Vec<Column>) -> Result<(Id, Amount), Error> {
2567    unpack_into!(
2568        let (
2569            keyset_id, amount
2570        ) = row
2571    );
2572
2573    let amount: u64 = column_as_number!(amount);
2574    Ok((
2575        column_as_string!(keyset_id, Id::from_str, Id::from_bytes),
2576        Amount::from(amount),
2577    ))
2578}
2579
2580fn sql_row_to_proof_with_state(row: Vec<Column>) -> Result<(Proof, Option<State>), Error> {
2581    unpack_into!(
2582        let (
2583            keyset_id, amount, secret, c, witness, state
2584        ) = row
2585    );
2586
2587    let amount: u64 = column_as_number!(amount);
2588    let state = column_as_nullable_string!(state).and_then(|s| State::from_str(&s).ok());
2589
2590    Ok((
2591        Proof {
2592            amount: Amount::from(amount),
2593            keyset_id: column_as_string!(keyset_id, Id::from_str, Id::from_bytes),
2594            secret: column_as_string!(secret, Secret::from_str),
2595            c: column_as_string!(c, PublicKey::from_hex, PublicKey::from_slice),
2596            witness: column_as_nullable_string!(witness)
2597                .and_then(|w| serde_json::from_str(&w).ok()),
2598            dleq: None,
2599        },
2600        state,
2601    ))
2602}
2603
2604fn sql_row_to_blind_signature(row: Vec<Column>) -> Result<BlindSignature, Error> {
2605    unpack_into!(
2606        let (
2607            keyset_id, amount, c, dleq_e, dleq_s
2608        ) = row
2609    );
2610
2611    let dleq = match (
2612        column_as_nullable_string!(dleq_e),
2613        column_as_nullable_string!(dleq_s),
2614    ) {
2615        (Some(e), Some(s)) => Some(BlindSignatureDleq {
2616            e: SecretKey::from_hex(e)?,
2617            s: SecretKey::from_hex(s)?,
2618        }),
2619        _ => None,
2620    };
2621
2622    let amount: u64 = column_as_number!(amount);
2623
2624    Ok(BlindSignature {
2625        amount: Amount::from(amount),
2626        keyset_id: column_as_string!(keyset_id, Id::from_str, Id::from_bytes),
2627        c: column_as_string!(c, PublicKey::from_hex, PublicKey::from_slice),
2628        dleq,
2629    })
2630}
2631
2632fn sql_row_to_saga(row: Vec<Column>) -> Result<mint::Saga, Error> {
2633    unpack_into!(
2634        let (
2635            operation_id,
2636            operation_kind,
2637            state,
2638            blinded_secrets,
2639            input_ys,
2640            quote_id,
2641            created_at,
2642            updated_at
2643        ) = row
2644    );
2645
2646    let operation_id_str = column_as_string!(&operation_id);
2647    let operation_id = uuid::Uuid::parse_str(&operation_id_str)
2648        .map_err(|e| Error::Internal(format!("Invalid operation_id UUID: {e}")))?;
2649
2650    let operation_kind_str = column_as_string!(&operation_kind);
2651    let operation_kind = mint::OperationKind::from_str(&operation_kind_str)
2652        .map_err(|e| Error::Internal(format!("Invalid operation kind: {e}")))?;
2653
2654    let state_str = column_as_string!(&state);
2655    let state = mint::SagaStateEnum::new(operation_kind, &state_str)
2656        .map_err(|e| Error::Internal(format!("Invalid saga state: {e}")))?;
2657
2658    let blinded_secrets_str = column_as_string!(&blinded_secrets);
2659    let blinded_secrets: Vec<PublicKey> = serde_json::from_str(&blinded_secrets_str)
2660        .map_err(|e| Error::Internal(format!("Failed to deserialize blinded_secrets: {e}")))?;
2661
2662    let input_ys_str = column_as_string!(&input_ys);
2663    let input_ys: Vec<PublicKey> = serde_json::from_str(&input_ys_str)
2664        .map_err(|e| Error::Internal(format!("Failed to deserialize input_ys: {e}")))?;
2665
2666    let quote_id = match &quote_id {
2667        Column::Text(s) => {
2668            if s.is_empty() {
2669                None
2670            } else {
2671                Some(s.clone())
2672            }
2673        }
2674        Column::Null => None,
2675        _ => None,
2676    };
2677
2678    let created_at: u64 = column_as_number!(created_at);
2679    let updated_at: u64 = column_as_number!(updated_at);
2680
2681    Ok(mint::Saga {
2682        operation_id,
2683        operation_kind,
2684        state,
2685        blinded_secrets,
2686        input_ys,
2687        quote_id,
2688        created_at,
2689        updated_at,
2690    })
2691}
2692
#[cfg(test)]
mod test {
    use super::*;

    /// Exercises `sql_row_to_keyset_info` with keyset rows whose ninth column is either
    /// `NULL` or a JSON-encoded list of amounts.
    mod max_order_to_amounts_migrations {
        use super::*;

        /// Builds the ten-column keyset row shared by every test in this module.
        ///
        /// Only the ninth column varies between scenarios, so the caller supplies it; the
        /// remaining columns are fixed fixture values.
        // NOTE(review): the eighth column (32) is presumably the legacy `max_order` value;
        // confirm against the column order expected by `sql_row_to_keyset_info`.
        fn keyset_row(amounts: Column) -> Vec<Column> {
            vec![
                Column::Text("0083a60439303340".to_owned()),
                Column::Text("sat".to_owned()),
                Column::Integer(1),
                Column::Integer(1749844864),
                Column::Null,
                Column::Text("0'/0'/0'".to_owned()),
                Column::Integer(0),
                Column::Integer(32),
                amounts,
                Column::Integer(0),
            ]
        }

        /// Returns a text column holding the JSON encoding of the first `count` powers of
        /// two (`2^0`, `2^1`, ...).
        fn power_of_two_amounts(count: u32) -> Column {
            let amounts: Vec<u64> = (0..count).map(|exponent| 2u64.pow(exponent)).collect();
            Column::Text(serde_json::to_string(&amounts).expect("valid json"))
        }

        /// A row with a `NULL` amounts column must still parse.
        #[test]
        fn legacy_payload() {
            let parsed = sql_row_to_keyset_info(keyset_row(Column::Null));
            assert!(parsed.is_ok());
        }

        /// A row carrying 32 explicit power-of-two amounts must parse to the same value as
        /// the row with a `NULL` amounts column.
        #[test]
        fn migrated_payload() {
            let legacy = sql_row_to_keyset_info(keyset_row(Column::Null));
            assert!(legacy.is_ok());

            let migrated = sql_row_to_keyset_info(keyset_row(power_of_two_amounts(32)));
            assert!(migrated.is_ok());

            assert_eq!(legacy.unwrap(), migrated.unwrap());
        }

        /// A row carrying only 16 explicit amounts must parse, differ from the `NULL`
        /// amounts row, and expose exactly those 16 amounts.
        #[test]
        fn amounts_over_max_order() {
            let legacy = sql_row_to_keyset_info(keyset_row(Column::Null));
            assert!(legacy.is_ok());

            let migrated = sql_row_to_keyset_info(keyset_row(power_of_two_amounts(16)));
            assert!(migrated.is_ok());

            let migrated = migrated.unwrap();
            assert_ne!(legacy.unwrap(), migrated);
            assert_eq!(migrated.amounts.len(), 16);
        }
    }
}