cdk_sql_common/mint/
mod.rs

1//! SQL database implementation of the Mint
2//!
3//! This is a generic SQL implementation for the mint storage layer. Any database can be plugged in
4//! as long as standard ANSI SQL is used, as Postgres and SQLite would understand it.
5//!
6//! This implementation also has a rudimentary but standard migration and versioning system.
7//!
8//! The trait expects an asynchronous interaction, but it also provides tools to spawn blocking
9//! clients in a pool and expose them to an asynchronous environment, making them compatible with
10//! Mint.
11use std::collections::HashMap;
12use std::fmt::Debug;
13use std::str::FromStr;
14use std::sync::Arc;
15
16use async_trait::async_trait;
17use bitcoin::bip32::DerivationPath;
18use cdk_common::database::mint::{validate_kvstore_params, SagaDatabase, SagaTransaction};
19use cdk_common::database::{
20    self, ConversionError, Error, MintDatabase, MintDbWriterFinalizer, MintKeyDatabaseTransaction,
21    MintKeysDatabase, MintProofsDatabase, MintQuotesDatabase, MintQuotesTransaction,
22    MintSignatureTransaction, MintSignaturesDatabase,
23};
24use cdk_common::mint::{
25    self, IncomingPayment, Issuance, MeltPaymentRequest, MeltQuote, MintKeySetInfo, MintQuote,
26    Operation,
27};
28use cdk_common::nut00::ProofsMethods;
29use cdk_common::payment::PaymentIdentifier;
30use cdk_common::quote_id::QuoteId;
31use cdk_common::secret::Secret;
32use cdk_common::state::{check_melt_quote_state_transition, check_state_transition};
33use cdk_common::util::unix_time;
34use cdk_common::{
35    Amount, BlindSignature, BlindSignatureDleq, BlindedMessage, CurrencyUnit, Id, MeltQuoteState,
36    PaymentMethod, Proof, Proofs, PublicKey, SecretKey, State,
37};
38use lightning_invoice::Bolt11Invoice;
39use migrations::MIGRATIONS;
40use tracing::instrument;
41
42use crate::common::migrate;
43use crate::database::{ConnectionWithTransaction, DatabaseExecutor};
44use crate::pool::{DatabasePool, Pool, PooledResource};
45use crate::stmt::{query, Column};
46use crate::{
47    column_as_nullable_number, column_as_nullable_string, column_as_number, column_as_string,
48    unpack_into,
49};
50
51#[cfg(feature = "auth")]
52mod auth;
53
54#[rustfmt::skip]
55mod migrations {
56    include!(concat!(env!("OUT_DIR"), "/migrations_mint.rs"));
57}
58
59#[cfg(feature = "auth")]
60pub use auth::SQLMintAuthDatabase;
61#[cfg(feature = "prometheus")]
62use cdk_prometheus::METRICS;
63
64/// Mint SQL Database
65#[derive(Debug, Clone)]
66pub struct SQLMintDatabase<RM>
67where
68    RM: DatabasePool + 'static,
69{
70    pool: Arc<Pool<RM>>,
71}
72
73/// SQL Transaction Writer
74pub struct SQLTransaction<RM>
75where
76    RM: DatabasePool + 'static,
77{
78    inner: ConnectionWithTransaction<RM::Connection, PooledResource<RM>>,
79}
80
81#[inline(always)]
82async fn get_current_states<C>(
83    conn: &C,
84    ys: &[PublicKey],
85) -> Result<HashMap<PublicKey, State>, Error>
86where
87    C: DatabaseExecutor + Send + Sync,
88{
89    if ys.is_empty() {
90        return Ok(Default::default());
91    }
92    query(r#"SELECT y, state FROM proof WHERE y IN (:ys)"#)?
93        .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
94        .fetch_all(conn)
95        .await?
96        .into_iter()
97        .map(|row| {
98            Ok((
99                column_as_string!(&row[0], PublicKey::from_hex, PublicKey::from_slice),
100                column_as_string!(&row[1], State::from_str),
101            ))
102        })
103        .collect::<Result<HashMap<_, _>, _>>()
104}
105
106impl<RM> SQLMintDatabase<RM>
107where
108    RM: DatabasePool + 'static,
109{
110    /// Creates a new instance
111    pub async fn new<X>(db: X) -> Result<Self, Error>
112    where
113        X: Into<RM::Config>,
114    {
115        let pool = Pool::new(db.into());
116
117        Self::migrate(pool.get().map_err(|e| Error::Database(Box::new(e)))?).await?;
118
119        Ok(Self { pool })
120    }
121
122    /// Migrate
123    async fn migrate(conn: PooledResource<RM>) -> Result<(), Error> {
124        let tx = ConnectionWithTransaction::new(conn).await?;
125        migrate(&tx, RM::Connection::name(), MIGRATIONS).await?;
126        tx.commit().await?;
127        Ok(())
128    }
129}
130
131#[async_trait]
132impl<RM> database::MintProofsTransaction<'_> for SQLTransaction<RM>
133where
134    RM: DatabasePool + 'static,
135{
136    type Err = Error;
137
138    async fn add_proofs(
139        &mut self,
140        proofs: Proofs,
141        quote_id: Option<QuoteId>,
142        operation: &Operation,
143    ) -> Result<(), Self::Err> {
144        let current_time = unix_time();
145
146        // Check any previous proof, this query should return None in order to proceed storing
147        // Any result here would error
148        match query(r#"SELECT state FROM proof WHERE y IN (:ys) LIMIT 1 FOR UPDATE"#)?
149            .bind_vec(
150                "ys",
151                proofs
152                    .iter()
153                    .map(|y| y.y().map(|y| y.to_bytes().to_vec()))
154                    .collect::<Result<_, _>>()?,
155            )
156            .pluck(&self.inner)
157            .await?
158            .map(|state| Ok::<_, Error>(column_as_string!(&state, State::from_str)))
159            .transpose()?
160        {
161            Some(State::Spent) => Err(database::Error::AttemptUpdateSpentProof),
162            Some(_) => Err(database::Error::Duplicate),
163            None => Ok(()), // no previous record
164        }?;
165
166        for proof in proofs {
167            query(
168                r#"
169                  INSERT INTO proof
170                  (y, amount, keyset_id, secret, c, witness, state, quote_id, created_time, operation_kind, operation_id)
171                  VALUES
172                  (:y, :amount, :keyset_id, :secret, :c, :witness, :state, :quote_id, :created_time, :operation_kind, :operation_id)
173                  "#,
174            )?
175            .bind("y", proof.y()?.to_bytes().to_vec())
176            .bind("amount", proof.amount.to_i64())
177            .bind("keyset_id", proof.keyset_id.to_string())
178            .bind("secret", proof.secret.to_string())
179            .bind("c", proof.c.to_bytes().to_vec())
180            .bind(
181                "witness",
182                proof.witness.map(|w| serde_json::to_string(&w).unwrap()),
183            )
184            .bind("state", "UNSPENT".to_string())
185            .bind("quote_id", quote_id.clone().map(|q| q.to_string()))
186            .bind("created_time", current_time as i64)
187            .bind("operation_kind", operation.kind())
188            .bind("operation_id", operation.id().to_string())
189            .execute(&self.inner)
190            .await?;
191        }
192
193        Ok(())
194    }
195
196    async fn update_proofs_states(
197        &mut self,
198        ys: &[PublicKey],
199        new_state: State,
200    ) -> Result<Vec<Option<State>>, Self::Err> {
201        let mut current_states = get_current_states(&self.inner, ys).await?;
202
203        if current_states.len() != ys.len() {
204            tracing::warn!(
205                "Attempted to update state of non-existent proof {} {}",
206                current_states.len(),
207                ys.len()
208            );
209            return Err(database::Error::ProofNotFound);
210        }
211
212        for state in current_states.values() {
213            check_state_transition(*state, new_state)?;
214        }
215
216        query(r#"UPDATE proof SET state = :new_state WHERE y IN (:ys)"#)?
217            .bind("new_state", new_state.to_string())
218            .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
219            .execute(&self.inner)
220            .await?;
221
222        if new_state == State::Spent {
223            query(
224                r#"
225                INSERT INTO keyset_amounts (keyset_id, total_issued, total_redeemed)
226                SELECT keyset_id, 0, COALESCE(SUM(amount), 0)
227                FROM proof
228                WHERE y IN (:ys)
229                GROUP BY keyset_id
230                ON CONFLICT (keyset_id)
231                DO UPDATE SET total_redeemed = keyset_amounts.total_redeemed + EXCLUDED.total_redeemed
232                "#,
233            )?
234            .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
235            .execute(&self.inner)
236            .await?;
237        }
238
239        Ok(ys.iter().map(|y| current_states.remove(y)).collect())
240    }
241
242    async fn remove_proofs(
243        &mut self,
244        ys: &[PublicKey],
245        _quote_id: Option<QuoteId>,
246    ) -> Result<(), Self::Err> {
247        if ys.is_empty() {
248            return Ok(());
249        }
250        let total_deleted = query(
251            r#"
252            DELETE FROM proof WHERE y IN (:ys) AND state NOT IN (:exclude_state)
253            "#,
254        )?
255        .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
256        .bind_vec("exclude_state", vec![State::Spent.to_string()])
257        .execute(&self.inner)
258        .await?;
259
260        if total_deleted != ys.len() {
261            // Query current states to provide detailed logging
262            let current_states = get_current_states(&self.inner, ys).await?;
263
264            let missing_count = ys.len() - current_states.len();
265            let spent_count = current_states
266                .values()
267                .filter(|s| **s == State::Spent)
268                .count();
269
270            if missing_count > 0 {
271                tracing::warn!(
272                    "remove_proofs: {} of {} proofs do not exist in database (already removed?)",
273                    missing_count,
274                    ys.len()
275                );
276            }
277
278            if spent_count > 0 {
279                tracing::warn!(
280                    "remove_proofs: {} of {} proofs are in Spent state and cannot be removed",
281                    spent_count,
282                    ys.len()
283                );
284            }
285
286            tracing::debug!(
287                "remove_proofs details: requested={}, deleted={}, missing={}, spent={}",
288                ys.len(),
289                total_deleted,
290                missing_count,
291                spent_count
292            );
293
294            return Err(Self::Err::AttemptRemoveSpentProof);
295        }
296
297        Ok(())
298    }
299
300    async fn get_proof_ys_by_quote_id(
301        &self,
302        quote_id: &QuoteId,
303    ) -> Result<Vec<PublicKey>, Self::Err> {
304        Ok(query(
305            r#"
306            SELECT
307                amount,
308                keyset_id,
309                secret,
310                c,
311                witness
312            FROM
313                proof
314            WHERE
315                quote_id = :quote_id
316            "#,
317        )?
318        .bind("quote_id", quote_id.to_string())
319        .fetch_all(&self.inner)
320        .await?
321        .into_iter()
322        .map(sql_row_to_proof)
323        .collect::<Result<Vec<Proof>, _>>()?
324        .ys()?)
325    }
326}
327
328#[async_trait]
329impl<RM> database::MintTransaction<'_, Error> for SQLTransaction<RM> where RM: DatabasePool + 'static
330{}
331
332#[async_trait]
333impl<RM> MintDbWriterFinalizer for SQLTransaction<RM>
334where
335    RM: DatabasePool + 'static,
336{
337    type Err = Error;
338
339    async fn commit(self: Box<Self>) -> Result<(), Error> {
340        let result = self.inner.commit().await;
341        #[cfg(feature = "prometheus")]
342        {
343            let success = result.is_ok();
344            METRICS.record_mint_operation("transaction_commit", success);
345            METRICS.record_mint_operation_histogram("transaction_commit", success, 1.0);
346        }
347
348        Ok(result?)
349    }
350
351    async fn rollback(self: Box<Self>) -> Result<(), Error> {
352        let result = self.inner.rollback().await;
353
354        #[cfg(feature = "prometheus")]
355        {
356            let success = result.is_ok();
357            METRICS.record_mint_operation("transaction_rollback", success);
358            METRICS.record_mint_operation_histogram("transaction_rollback", success, 1.0);
359        }
360        Ok(result?)
361    }
362}
363
364#[inline(always)]
365async fn get_mint_quote_payments<C>(
366    conn: &C,
367    quote_id: &QuoteId,
368) -> Result<Vec<IncomingPayment>, Error>
369where
370    C: DatabaseExecutor + Send + Sync,
371{
372    // Get payment IDs and timestamps from the mint_quote_payments table
373    query(
374        r#"
375        SELECT
376            payment_id,
377            timestamp,
378            amount
379        FROM
380            mint_quote_payments
381        WHERE
382            quote_id=:quote_id
383        "#,
384    )?
385    .bind("quote_id", quote_id.to_string())
386    .fetch_all(conn)
387    .await?
388    .into_iter()
389    .map(|row| {
390        let amount: u64 = column_as_number!(row[2].clone());
391        let time: u64 = column_as_number!(row[1].clone());
392        Ok(IncomingPayment::new(
393            amount.into(),
394            column_as_string!(&row[0]),
395            time,
396        ))
397    })
398    .collect()
399}
400
401#[inline(always)]
402async fn get_mint_quote_issuance<C>(conn: &C, quote_id: &QuoteId) -> Result<Vec<Issuance>, Error>
403where
404    C: DatabaseExecutor + Send + Sync,
405{
406    // Get payment IDs and timestamps from the mint_quote_payments table
407    query(
408        r#"
409SELECT amount, timestamp
410FROM mint_quote_issued
411WHERE quote_id=:quote_id
412            "#,
413    )?
414    .bind("quote_id", quote_id.to_string())
415    .fetch_all(conn)
416    .await?
417    .into_iter()
418    .map(|row| {
419        let time: u64 = column_as_number!(row[1].clone());
420        Ok(Issuance::new(
421            Amount::from_i64(column_as_number!(row[0].clone()))
422                .expect("Is amount when put into db"),
423            time,
424        ))
425    })
426    .collect()
427}
428
429#[async_trait]
430impl<RM> MintKeyDatabaseTransaction<'_, Error> for SQLTransaction<RM>
431where
432    RM: DatabasePool + 'static,
433{
434    async fn add_keyset_info(&mut self, keyset: MintKeySetInfo) -> Result<(), Error> {
435        query(
436            r#"
437        INSERT INTO
438            keyset (
439                id, unit, active, valid_from, valid_to, derivation_path,
440                max_order, amounts, input_fee_ppk, derivation_path_index
441            )
442        VALUES (
443            :id, :unit, :active, :valid_from, :valid_to, :derivation_path,
444            :max_order, :amounts, :input_fee_ppk, :derivation_path_index
445        )
446        ON CONFLICT(id) DO UPDATE SET
447            unit = excluded.unit,
448            active = excluded.active,
449            valid_from = excluded.valid_from,
450            valid_to = excluded.valid_to,
451            derivation_path = excluded.derivation_path,
452            max_order = excluded.max_order,
453            amounts = excluded.amounts,
454            input_fee_ppk = excluded.input_fee_ppk,
455            derivation_path_index = excluded.derivation_path_index
456        "#,
457        )?
458        .bind("id", keyset.id.to_string())
459        .bind("unit", keyset.unit.to_string())
460        .bind("active", keyset.active)
461        .bind("valid_from", keyset.valid_from as i64)
462        .bind("valid_to", keyset.final_expiry.map(|v| v as i64))
463        .bind("derivation_path", keyset.derivation_path.to_string())
464        .bind("max_order", keyset.max_order)
465        .bind("amounts", serde_json::to_string(&keyset.amounts).ok())
466        .bind("input_fee_ppk", keyset.input_fee_ppk as i64)
467        .bind("derivation_path_index", keyset.derivation_path_index)
468        .execute(&self.inner)
469        .await?;
470
471        Ok(())
472    }
473
474    async fn set_active_keyset(&mut self, unit: CurrencyUnit, id: Id) -> Result<(), Error> {
475        query(r#"UPDATE keyset SET active=FALSE WHERE unit = :unit"#)?
476            .bind("unit", unit.to_string())
477            .execute(&self.inner)
478            .await?;
479
480        query(r#"UPDATE keyset SET active=TRUE WHERE unit = :unit AND id = :id"#)?
481            .bind("unit", unit.to_string())
482            .bind("id", id.to_string())
483            .execute(&self.inner)
484            .await?;
485
486        Ok(())
487    }
488}
489
490#[async_trait]
491impl<RM> MintKeysDatabase for SQLMintDatabase<RM>
492where
493    RM: DatabasePool + 'static,
494{
495    type Err = Error;
496
497    async fn begin_transaction<'a>(
498        &'a self,
499    ) -> Result<Box<dyn MintKeyDatabaseTransaction<'a, Error> + Send + Sync + 'a>, Error> {
500        let tx = SQLTransaction {
501            inner: ConnectionWithTransaction::new(
502                self.pool.get().map_err(|e| Error::Database(Box::new(e)))?,
503            )
504            .await?,
505        };
506
507        Ok(Box::new(tx))
508    }
509
510    async fn get_active_keyset_id(&self, unit: &CurrencyUnit) -> Result<Option<Id>, Self::Err> {
511        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
512        Ok(
513            query(r#" SELECT id FROM keyset WHERE active = :active AND unit = :unit"#)?
514                .bind("active", true)
515                .bind("unit", unit.to_string())
516                .pluck(&*conn)
517                .await?
518                .map(|id| match id {
519                    Column::Text(text) => Ok(Id::from_str(&text)?),
520                    Column::Blob(id) => Ok(Id::from_bytes(&id)?),
521                    _ => Err(Error::InvalidKeysetId),
522                })
523                .transpose()?,
524        )
525    }
526
527    async fn get_active_keysets(&self) -> Result<HashMap<CurrencyUnit, Id>, Self::Err> {
528        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
529        Ok(
530            query(r#"SELECT id, unit FROM keyset WHERE active = :active"#)?
531                .bind("active", true)
532                .fetch_all(&*conn)
533                .await?
534                .into_iter()
535                .map(|row| {
536                    Ok((
537                        column_as_string!(&row[1], CurrencyUnit::from_str),
538                        column_as_string!(&row[0], Id::from_str, Id::from_bytes),
539                    ))
540                })
541                .collect::<Result<HashMap<_, _>, Error>>()?,
542        )
543    }
544
545    async fn get_keyset_info(&self, id: &Id) -> Result<Option<MintKeySetInfo>, Self::Err> {
546        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
547        Ok(query(
548            r#"SELECT
549                id,
550                unit,
551                active,
552                valid_from,
553                valid_to,
554                derivation_path,
555                derivation_path_index,
556                max_order,
557                amounts,
558                input_fee_ppk
559            FROM
560                keyset
561                WHERE id=:id"#,
562        )?
563        .bind("id", id.to_string())
564        .fetch_one(&*conn)
565        .await?
566        .map(sql_row_to_keyset_info)
567        .transpose()?)
568    }
569
570    async fn get_keyset_infos(&self) -> Result<Vec<MintKeySetInfo>, Self::Err> {
571        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
572        Ok(query(
573            r#"SELECT
574                id,
575                unit,
576                active,
577                valid_from,
578                valid_to,
579                derivation_path,
580                derivation_path_index,
581                max_order,
582                amounts,
583                input_fee_ppk
584            FROM
585                keyset
586            "#,
587        )?
588        .fetch_all(&*conn)
589        .await?
590        .into_iter()
591        .map(sql_row_to_keyset_info)
592        .collect::<Result<Vec<_>, _>>()?)
593    }
594}
595
596#[async_trait]
597impl<RM> MintQuotesTransaction<'_> for SQLTransaction<RM>
598where
599    RM: DatabasePool + 'static,
600{
601    type Err = Error;
602
603    async fn add_melt_request(
604        &mut self,
605        quote_id: &QuoteId,
606        inputs_amount: Amount,
607        inputs_fee: Amount,
608    ) -> Result<(), Self::Err> {
609        // Insert melt_request
610        query(
611            r#"
612            INSERT INTO melt_request
613            (quote_id, inputs_amount, inputs_fee)
614            VALUES
615            (:quote_id, :inputs_amount, :inputs_fee)
616            "#,
617        )?
618        .bind("quote_id", quote_id.to_string())
619        .bind("inputs_amount", inputs_amount.to_i64())
620        .bind("inputs_fee", inputs_fee.to_i64())
621        .execute(&self.inner)
622        .await?;
623
624        Ok(())
625    }
626
627    async fn add_blinded_messages(
628        &mut self,
629        quote_id: Option<&QuoteId>,
630        blinded_messages: &[BlindedMessage],
631        operation: &Operation,
632    ) -> Result<(), Self::Err> {
633        let current_time = unix_time();
634
635        // Insert blinded_messages directly into blind_signature with c = NULL
636        // Let the database constraint handle duplicate detection
637        for message in blinded_messages {
638            match query(
639                r#"
640                INSERT INTO blind_signature
641                (blinded_message, amount, keyset_id, c, quote_id, created_time, operation_kind, operation_id)
642                VALUES
643                (:blinded_message, :amount, :keyset_id, NULL, :quote_id, :created_time, :operation_kind, :operation_id)
644                "#,
645            )?
646            .bind(
647                "blinded_message",
648                message.blinded_secret.to_bytes().to_vec(),
649            )
650            .bind("amount", message.amount.to_i64())
651            .bind("keyset_id", message.keyset_id.to_string())
652            .bind("quote_id", quote_id.map(|q| q.to_string()))
653            .bind("created_time", current_time as i64)
654            .bind("operation_kind", operation.kind())
655            .bind("operation_id", operation.id().to_string())
656            .execute(&self.inner)
657            .await
658            {
659                Ok(_) => continue,
660                Err(database::Error::Duplicate) => {
661                    // Primary key constraint violation - blinded message already exists
662                    // This could be either:
663                    // 1. Already signed (c IS NOT NULL) - definitely an error
664                    // 2. Already pending (c IS NULL) - also an error
665                    return Err(database::Error::Duplicate);
666                }
667                Err(err) => return Err(err),
668            }
669        }
670
671        Ok(())
672    }
673
674    async fn delete_blinded_messages(
675        &mut self,
676        blinded_secrets: &[PublicKey],
677    ) -> Result<(), Self::Err> {
678        if blinded_secrets.is_empty() {
679            return Ok(());
680        }
681
682        // Delete blinded messages from blind_signature table where c IS NULL
683        // (only delete unsigned blinded messages)
684        query(
685            r#"
686            DELETE FROM blind_signature
687            WHERE blinded_message IN (:blinded_secrets) AND c IS NULL
688            "#,
689        )?
690        .bind_vec(
691            "blinded_secrets",
692            blinded_secrets
693                .iter()
694                .map(|secret| secret.to_bytes().to_vec())
695                .collect(),
696        )
697        .execute(&self.inner)
698        .await?;
699
700        Ok(())
701    }
702
703    async fn get_melt_request_and_blinded_messages(
704        &mut self,
705        quote_id: &QuoteId,
706    ) -> Result<Option<database::mint::MeltRequestInfo>, Self::Err> {
707        let melt_request_row = query(
708            r#"
709            SELECT inputs_amount, inputs_fee
710            FROM melt_request
711            WHERE quote_id = :quote_id
712            FOR UPDATE
713            "#,
714        )?
715        .bind("quote_id", quote_id.to_string())
716        .fetch_one(&self.inner)
717        .await?;
718
719        if let Some(row) = melt_request_row {
720            let inputs_amount: u64 = column_as_number!(row[0].clone());
721            let inputs_fee: u64 = column_as_number!(row[1].clone());
722
723            // Get blinded messages from blind_signature table where c IS NULL
724            let blinded_messages_rows = query(
725                r#"
726                SELECT blinded_message, keyset_id, amount
727                FROM blind_signature
728                WHERE quote_id = :quote_id AND c IS NULL
729                "#,
730            )?
731            .bind("quote_id", quote_id.to_string())
732            .fetch_all(&self.inner)
733            .await?;
734
735            let blinded_messages: Result<Vec<BlindedMessage>, Error> = blinded_messages_rows
736                .into_iter()
737                .map(|row| -> Result<BlindedMessage, Error> {
738                    let blinded_message_key =
739                        column_as_string!(&row[0], PublicKey::from_hex, PublicKey::from_slice);
740                    let keyset_id = column_as_string!(&row[1], Id::from_str, Id::from_bytes);
741                    let amount: u64 = column_as_number!(row[2].clone());
742
743                    Ok(BlindedMessage {
744                        blinded_secret: blinded_message_key,
745                        keyset_id,
746                        amount: Amount::from(amount),
747                        witness: None, // Not storing witness in database currently
748                    })
749                })
750                .collect();
751            let blinded_messages = blinded_messages?;
752
753            Ok(Some(database::mint::MeltRequestInfo {
754                inputs_amount: Amount::from(inputs_amount),
755                inputs_fee: Amount::from(inputs_fee),
756                change_outputs: blinded_messages,
757            }))
758        } else {
759            Ok(None)
760        }
761    }
762
763    async fn delete_melt_request(&mut self, quote_id: &QuoteId) -> Result<(), Self::Err> {
764        // Delete from melt_request table
765        query(
766            r#"
767            DELETE FROM melt_request
768            WHERE quote_id = :quote_id
769            "#,
770        )?
771        .bind("quote_id", quote_id.to_string())
772        .execute(&self.inner)
773        .await?;
774
775        // Also delete blinded messages (where c IS NULL) from blind_signature table
776        query(
777            r#"
778            DELETE FROM blind_signature
779            WHERE quote_id = :quote_id AND c IS NULL
780            "#,
781        )?
782        .bind("quote_id", quote_id.to_string())
783        .execute(&self.inner)
784        .await?;
785
786        Ok(())
787    }
788
789    #[instrument(skip(self))]
790    async fn increment_mint_quote_amount_paid(
791        &mut self,
792        quote_id: &QuoteId,
793        amount_paid: Amount,
794        payment_id: String,
795    ) -> Result<Amount, Self::Err> {
796        if amount_paid == Amount::ZERO {
797            tracing::warn!("Amount payments of zero amount should not be recorded.");
798            return Err(Error::Duplicate);
799        }
800
801        // Check if payment_id already exists in mint_quote_payments
802        let exists = query(
803            r#"
804            SELECT payment_id
805            FROM mint_quote_payments
806            WHERE payment_id = :payment_id
807            FOR UPDATE
808            "#,
809        )?
810        .bind("payment_id", payment_id.clone())
811        .fetch_one(&self.inner)
812        .await?;
813
814        if exists.is_some() {
815            tracing::error!("Payment ID already exists: {}", payment_id);
816            return Err(database::Error::Duplicate);
817        }
818
819        // Get current amount_paid from quote
820        let current_amount = query(
821            r#"
822            SELECT amount_paid
823            FROM mint_quote
824            WHERE id = :quote_id
825            FOR UPDATE
826            "#,
827        )?
828        .bind("quote_id", quote_id.to_string())
829        .fetch_one(&self.inner)
830        .await
831        .inspect_err(|err| {
832            tracing::error!("SQLite could not get mint quote amount_paid: {}", err);
833        })?;
834
835        let current_amount_paid = if let Some(current_amount) = current_amount {
836            let amount: u64 = column_as_number!(current_amount[0].clone());
837            Amount::from(amount)
838        } else {
839            Amount::ZERO
840        };
841
842        // Calculate new amount_paid with overflow check
843        let new_amount_paid = current_amount_paid
844            .checked_add(amount_paid)
845            .ok_or_else(|| database::Error::AmountOverflow)?;
846
847        tracing::debug!(
848            "Mint quote {} amount paid was {} is now {}.",
849            quote_id,
850            current_amount_paid,
851            new_amount_paid
852        );
853
854        // Update the amount_paid
855        query(
856            r#"
857            UPDATE mint_quote
858            SET amount_paid = :amount_paid
859            WHERE id = :quote_id
860            "#,
861        )?
862        .bind("amount_paid", new_amount_paid.to_i64())
863        .bind("quote_id", quote_id.to_string())
864        .execute(&self.inner)
865        .await
866        .inspect_err(|err| {
867            tracing::error!("SQLite could not update mint quote amount_paid: {}", err);
868        })?;
869
870        // Add payment_id to mint_quote_payments table
871        query(
872            r#"
873            INSERT INTO mint_quote_payments
874            (quote_id, payment_id, amount, timestamp)
875            VALUES (:quote_id, :payment_id, :amount, :timestamp)
876            "#,
877        )?
878        .bind("quote_id", quote_id.to_string())
879        .bind("payment_id", payment_id)
880        .bind("amount", amount_paid.to_i64())
881        .bind("timestamp", unix_time() as i64)
882        .execute(&self.inner)
883        .await
884        .map_err(|err| {
885            tracing::error!("SQLite could not insert payment ID: {}", err);
886            err
887        })?;
888
889        Ok(new_amount_paid)
890    }
891
892    #[instrument(skip_all)]
893    async fn increment_mint_quote_amount_issued(
894        &mut self,
895        quote_id: &QuoteId,
896        amount_issued: Amount,
897    ) -> Result<Amount, Self::Err> {
898        // Get current amount_issued from quote
899        let current_amounts = query(
900            r#"
901            SELECT amount_issued, amount_paid
902            FROM mint_quote
903            WHERE id = :quote_id
904            FOR UPDATE
905            "#,
906        )?
907        .bind("quote_id", quote_id.to_string())
908        .fetch_one(&self.inner)
909        .await
910        .inspect_err(|err| {
911            tracing::error!("SQLite could not get mint quote amount_issued: {}", err);
912        })?
913        .ok_or(Error::QuoteNotFound)?;
914
915        let new_amount_issued = {
916            // Make sure the db protects issuing not paid quotes
917            unpack_into!(
918                let (current_amount_issued, current_amount_paid) = current_amounts
919            );
920
921            let current_amount_issued: u64 = column_as_number!(current_amount_issued);
922            let current_amount_paid: u64 = column_as_number!(current_amount_paid);
923
924            let current_amount_issued = Amount::from(current_amount_issued);
925            let current_amount_paid = Amount::from(current_amount_paid);
926
927            // Calculate new amount_issued with overflow check
928            let new_amount_issued = current_amount_issued
929                .checked_add(amount_issued)
930                .ok_or_else(|| database::Error::AmountOverflow)?;
931
932            current_amount_paid
933                .checked_sub(new_amount_issued)
934                .ok_or(Error::Internal("Over-issued not allowed".to_owned()))?;
935
936            new_amount_issued
937        };
938
939        // Update the amount_issued
940        query(
941            r#"
942            UPDATE mint_quote
943            SET amount_issued = :amount_issued
944            WHERE id = :quote_id
945            "#,
946        )?
947        .bind("amount_issued", new_amount_issued.to_i64())
948        .bind("quote_id", quote_id.to_string())
949        .execute(&self.inner)
950        .await
951        .inspect_err(|err| {
952            tracing::error!("SQLite could not update mint quote amount_issued: {}", err);
953        })?;
954
955        let current_time = unix_time();
956
957        query(
958            r#"
959INSERT INTO mint_quote_issued
960(quote_id, amount, timestamp)
961VALUES (:quote_id, :amount, :timestamp);
962            "#,
963        )?
964        .bind("quote_id", quote_id.to_string())
965        .bind("amount", amount_issued.to_i64())
966        .bind("timestamp", current_time as i64)
967        .execute(&self.inner)
968        .await?;
969
970        Ok(new_amount_issued)
971    }
972
973    #[instrument(skip_all)]
974    async fn add_mint_quote(&mut self, quote: MintQuote) -> Result<(), Self::Err> {
975        query(
976            r#"
977                INSERT INTO mint_quote (
978                id, amount, unit, request, expiry, request_lookup_id, pubkey, created_time, payment_method, request_lookup_id_kind
979                )
980                VALUES (
981                :id, :amount, :unit, :request, :expiry, :request_lookup_id, :pubkey, :created_time, :payment_method, :request_lookup_id_kind
982                )
983            "#,
984        )?
985        .bind("id", quote.id.to_string())
986        .bind("amount", quote.amount.map(|a| a.to_i64()))
987        .bind("unit", quote.unit.to_string())
988        .bind("request", quote.request)
989        .bind("expiry", quote.expiry as i64)
990        .bind(
991            "request_lookup_id",
992            quote.request_lookup_id.to_string(),
993        )
994        .bind("pubkey", quote.pubkey.map(|p| p.to_string()))
995        .bind("created_time", quote.created_time as i64)
996        .bind("payment_method", quote.payment_method.to_string())
997        .bind("request_lookup_id_kind", quote.request_lookup_id.kind())
998        .execute(&self.inner)
999        .await?;
1000
1001        Ok(())
1002    }
1003
1004    async fn add_melt_quote(&mut self, quote: mint::MeltQuote) -> Result<(), Self::Err> {
1005        // Now insert the new quote
1006        query(
1007            r#"
1008            INSERT INTO melt_quote
1009            (
1010                id, unit, amount, request, fee_reserve, state,
1011                expiry, payment_preimage, request_lookup_id,
1012                created_time, paid_time, options, request_lookup_id_kind, payment_method
1013            )
1014            VALUES
1015            (
1016                :id, :unit, :amount, :request, :fee_reserve, :state,
1017                :expiry, :payment_preimage, :request_lookup_id,
1018                :created_time, :paid_time, :options, :request_lookup_id_kind, :payment_method
1019            )
1020        "#,
1021        )?
1022        .bind("id", quote.id.to_string())
1023        .bind("unit", quote.unit.to_string())
1024        .bind("amount", quote.amount.to_i64())
1025        .bind("request", serde_json::to_string(&quote.request)?)
1026        .bind("fee_reserve", quote.fee_reserve.to_i64())
1027        .bind("state", quote.state.to_string())
1028        .bind("expiry", quote.expiry as i64)
1029        .bind("payment_preimage", quote.payment_preimage)
1030        .bind(
1031            "request_lookup_id",
1032            quote.request_lookup_id.as_ref().map(|id| id.to_string()),
1033        )
1034        .bind("created_time", quote.created_time as i64)
1035        .bind("paid_time", quote.paid_time.map(|t| t as i64))
1036        .bind(
1037            "options",
1038            quote.options.map(|o| serde_json::to_string(&o).ok()),
1039        )
1040        .bind(
1041            "request_lookup_id_kind",
1042            quote.request_lookup_id.map(|id| id.kind()),
1043        )
1044        .bind("payment_method", quote.payment_method.to_string())
1045        .execute(&self.inner)
1046        .await?;
1047
1048        Ok(())
1049    }
1050
1051    async fn update_melt_quote_request_lookup_id(
1052        &mut self,
1053        quote_id: &QuoteId,
1054        new_request_lookup_id: &PaymentIdentifier,
1055    ) -> Result<(), Self::Err> {
1056        query(r#"UPDATE melt_quote SET request_lookup_id = :new_req_id, request_lookup_id_kind = :new_kind WHERE id = :id"#)?
1057            .bind("new_req_id", new_request_lookup_id.to_string())
1058            .bind("new_kind",new_request_lookup_id.kind() )
1059            .bind("id", quote_id.to_string())
1060            .execute(&self.inner)
1061            .await?;
1062        Ok(())
1063    }
1064
1065    async fn update_melt_quote_state(
1066        &mut self,
1067        quote_id: &QuoteId,
1068        state: MeltQuoteState,
1069        payment_proof: Option<String>,
1070    ) -> Result<(MeltQuoteState, mint::MeltQuote), Self::Err> {
1071        let mut quote = query(
1072            r#"
1073            SELECT
1074                id,
1075                unit,
1076                amount,
1077                request,
1078                fee_reserve,
1079                expiry,
1080                state,
1081                payment_preimage,
1082                request_lookup_id,
1083                created_time,
1084                paid_time,
1085                payment_method,
1086                options,
1087                request_lookup_id_kind
1088            FROM
1089                melt_quote
1090            WHERE
1091                id=:id
1092                AND state != :state
1093            "#,
1094        )?
1095        .bind("id", quote_id.to_string())
1096        .bind("state", state.to_string())
1097        .fetch_one(&self.inner)
1098        .await?
1099        .map(sql_row_to_melt_quote)
1100        .transpose()?
1101        .ok_or(Error::QuoteNotFound)?;
1102
1103        check_melt_quote_state_transition(quote.state, state)?;
1104
1105        let rec = if state == MeltQuoteState::Paid {
1106            let current_time = unix_time();
1107            query(r#"UPDATE melt_quote SET state = :state, paid_time = :paid_time, payment_preimage = :payment_preimage WHERE id = :id"#)?
1108                .bind("state", state.to_string())
1109                .bind("paid_time", current_time as i64)
1110                .bind("payment_preimage", payment_proof)
1111                .bind("id", quote_id.to_string())
1112                .execute(&self.inner)
1113                .await
1114        } else {
1115            query(r#"UPDATE melt_quote SET state = :state WHERE id = :id"#)?
1116                .bind("state", state.to_string())
1117                .bind("id", quote_id.to_string())
1118                .execute(&self.inner)
1119                .await
1120        };
1121
1122        match rec {
1123            Ok(_) => {}
1124            Err(err) => {
1125                tracing::error!("SQLite Could not update melt quote");
1126                return Err(err);
1127            }
1128        };
1129
1130        let old_state = quote.state;
1131        quote.state = state;
1132
1133        if state == MeltQuoteState::Unpaid || state == MeltQuoteState::Failed {
1134            self.delete_melt_request(quote_id).await?;
1135        }
1136
1137        Ok((old_state, quote))
1138    }
1139
1140    async fn get_mint_quote(&mut self, quote_id: &QuoteId) -> Result<Option<MintQuote>, Self::Err> {
1141        let payments = get_mint_quote_payments(&self.inner, quote_id).await?;
1142        let issuance = get_mint_quote_issuance(&self.inner, quote_id).await?;
1143
1144        Ok(query(
1145            r#"
1146            SELECT
1147                id,
1148                amount,
1149                unit,
1150                request,
1151                expiry,
1152                request_lookup_id,
1153                pubkey,
1154                created_time,
1155                amount_paid,
1156                amount_issued,
1157                payment_method,
1158                request_lookup_id_kind
1159            FROM
1160                mint_quote
1161            WHERE id = :id
1162            FOR UPDATE
1163            "#,
1164        )?
1165        .bind("id", quote_id.to_string())
1166        .fetch_one(&self.inner)
1167        .await?
1168        .map(|row| sql_row_to_mint_quote(row, payments, issuance))
1169        .transpose()?)
1170    }
1171
1172    async fn get_melt_quote(
1173        &mut self,
1174        quote_id: &QuoteId,
1175    ) -> Result<Option<mint::MeltQuote>, Self::Err> {
1176        Ok(query(
1177            r#"
1178            SELECT
1179                id,
1180                unit,
1181                amount,
1182                request,
1183                fee_reserve,
1184                expiry,
1185                state,
1186                payment_preimage,
1187                request_lookup_id,
1188                created_time,
1189                paid_time,
1190                payment_method,
1191                options,
1192                request_lookup_id
1193            FROM
1194                melt_quote
1195            WHERE
1196                id=:id
1197            "#,
1198        )?
1199        .bind("id", quote_id.to_string())
1200        .fetch_one(&self.inner)
1201        .await?
1202        .map(sql_row_to_melt_quote)
1203        .transpose()?)
1204    }
1205
1206    async fn get_mint_quote_by_request(
1207        &mut self,
1208        request: &str,
1209    ) -> Result<Option<MintQuote>, Self::Err> {
1210        let mut mint_quote = query(
1211            r#"
1212            SELECT
1213                id,
1214                amount,
1215                unit,
1216                request,
1217                expiry,
1218                request_lookup_id,
1219                pubkey,
1220                created_time,
1221                amount_paid,
1222                amount_issued,
1223                payment_method,
1224                request_lookup_id_kind
1225            FROM
1226                mint_quote
1227            WHERE request = :request
1228            FOR UPDATE
1229            "#,
1230        )?
1231        .bind("request", request.to_string())
1232        .fetch_one(&self.inner)
1233        .await?
1234        .map(|row| sql_row_to_mint_quote(row, vec![], vec![]))
1235        .transpose()?;
1236
1237        if let Some(quote) = mint_quote.as_mut() {
1238            let payments = get_mint_quote_payments(&self.inner, &quote.id).await?;
1239            let issuance = get_mint_quote_issuance(&self.inner, &quote.id).await?;
1240            quote.issuance = issuance;
1241            quote.payments = payments;
1242        }
1243
1244        Ok(mint_quote)
1245    }
1246
1247    async fn get_mint_quote_by_request_lookup_id(
1248        &mut self,
1249        request_lookup_id: &PaymentIdentifier,
1250    ) -> Result<Option<MintQuote>, Self::Err> {
1251        let mut mint_quote = query(
1252            r#"
1253            SELECT
1254                id,
1255                amount,
1256                unit,
1257                request,
1258                expiry,
1259                request_lookup_id,
1260                pubkey,
1261                created_time,
1262                amount_paid,
1263                amount_issued,
1264                payment_method,
1265                request_lookup_id_kind
1266            FROM
1267                mint_quote
1268            WHERE request_lookup_id = :request_lookup_id
1269            AND request_lookup_id_kind = :request_lookup_id_kind
1270            FOR UPDATE
1271            "#,
1272        )?
1273        .bind("request_lookup_id", request_lookup_id.to_string())
1274        .bind("request_lookup_id_kind", request_lookup_id.kind())
1275        .fetch_one(&self.inner)
1276        .await?
1277        .map(|row| sql_row_to_mint_quote(row, vec![], vec![]))
1278        .transpose()?;
1279
1280        if let Some(quote) = mint_quote.as_mut() {
1281            let payments = get_mint_quote_payments(&self.inner, &quote.id).await?;
1282            let issuance = get_mint_quote_issuance(&self.inner, &quote.id).await?;
1283            quote.issuance = issuance;
1284            quote.payments = payments;
1285        }
1286
1287        Ok(mint_quote)
1288    }
1289}
1290
1291#[async_trait]
1292impl<RM> MintQuotesDatabase for SQLMintDatabase<RM>
1293where
1294    RM: DatabasePool + 'static,
1295{
1296    type Err = Error;
1297
1298    async fn get_mint_quote(&self, quote_id: &QuoteId) -> Result<Option<MintQuote>, Self::Err> {
1299        #[cfg(feature = "prometheus")]
1300        METRICS.inc_in_flight_requests("get_mint_quote");
1301
1302        #[cfg(feature = "prometheus")]
1303        let start_time = std::time::Instant::now();
1304        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1305
1306        let result = async {
1307            let payments = get_mint_quote_payments(&*conn, quote_id).await?;
1308            let issuance = get_mint_quote_issuance(&*conn, quote_id).await?;
1309
1310            query(
1311                r#"
1312                SELECT
1313                    id,
1314                    amount,
1315                    unit,
1316                    request,
1317                    expiry,
1318                    request_lookup_id,
1319                    pubkey,
1320                    created_time,
1321                    amount_paid,
1322                    amount_issued,
1323                    payment_method,
1324                    request_lookup_id_kind
1325                FROM
1326                    mint_quote
1327                WHERE id = :id"#,
1328            )?
1329            .bind("id", quote_id.to_string())
1330            .fetch_one(&*conn)
1331            .await?
1332            .map(|row| sql_row_to_mint_quote(row, payments, issuance))
1333            .transpose()
1334        }
1335        .await;
1336
1337        #[cfg(feature = "prometheus")]
1338        {
1339            let success = result.is_ok();
1340
1341            METRICS.record_mint_operation("get_mint_quote", success);
1342            METRICS.record_mint_operation_histogram(
1343                "get_mint_quote",
1344                success,
1345                start_time.elapsed().as_secs_f64(),
1346            );
1347            METRICS.dec_in_flight_requests("get_mint_quote");
1348        }
1349
1350        result
1351    }
1352
1353    async fn get_mint_quote_by_request(
1354        &self,
1355        request: &str,
1356    ) -> Result<Option<MintQuote>, Self::Err> {
1357        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1358        let mut mint_quote = query(
1359            r#"
1360            SELECT
1361                id,
1362                amount,
1363                unit,
1364                request,
1365                expiry,
1366                request_lookup_id,
1367                pubkey,
1368                created_time,
1369                amount_paid,
1370                amount_issued,
1371                payment_method,
1372                request_lookup_id_kind
1373            FROM
1374                mint_quote
1375            WHERE request = :request"#,
1376        )?
1377        .bind("request", request.to_owned())
1378        .fetch_one(&*conn)
1379        .await?
1380        .map(|row| sql_row_to_mint_quote(row, vec![], vec![]))
1381        .transpose()?;
1382
1383        if let Some(quote) = mint_quote.as_mut() {
1384            let payments = get_mint_quote_payments(&*conn, &quote.id).await?;
1385            let issuance = get_mint_quote_issuance(&*conn, &quote.id).await?;
1386            quote.issuance = issuance;
1387            quote.payments = payments;
1388        }
1389
1390        Ok(mint_quote)
1391    }
1392
1393    async fn get_mint_quote_by_request_lookup_id(
1394        &self,
1395        request_lookup_id: &PaymentIdentifier,
1396    ) -> Result<Option<MintQuote>, Self::Err> {
1397        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1398        let mut mint_quote = query(
1399            r#"
1400            SELECT
1401                id,
1402                amount,
1403                unit,
1404                request,
1405                expiry,
1406                request_lookup_id,
1407                pubkey,
1408                created_time,
1409                amount_paid,
1410                amount_issued,
1411                payment_method,
1412                request_lookup_id_kind
1413            FROM
1414                mint_quote
1415            WHERE request_lookup_id = :request_lookup_id
1416            AND request_lookup_id_kind = :request_lookup_id_kind
1417            "#,
1418        )?
1419        .bind("request_lookup_id", request_lookup_id.to_string())
1420        .bind("request_lookup_id_kind", request_lookup_id.kind())
1421        .fetch_one(&*conn)
1422        .await?
1423        .map(|row| sql_row_to_mint_quote(row, vec![], vec![]))
1424        .transpose()?;
1425
1426        // TODO: these should use an sql join so they can be done in one query
1427        if let Some(quote) = mint_quote.as_mut() {
1428            let payments = get_mint_quote_payments(&*conn, &quote.id).await?;
1429            let issuance = get_mint_quote_issuance(&*conn, &quote.id).await?;
1430            quote.issuance = issuance;
1431            quote.payments = payments;
1432        }
1433
1434        Ok(mint_quote)
1435    }
1436
1437    async fn get_mint_quotes(&self) -> Result<Vec<MintQuote>, Self::Err> {
1438        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1439        let mut mint_quotes = query(
1440            r#"
1441            SELECT
1442                id,
1443                amount,
1444                unit,
1445                request,
1446                expiry,
1447                request_lookup_id,
1448                pubkey,
1449                created_time,
1450                amount_paid,
1451                amount_issued,
1452                payment_method,
1453                request_lookup_id_kind
1454            FROM
1455                mint_quote
1456            "#,
1457        )?
1458        .fetch_all(&*conn)
1459        .await?
1460        .into_iter()
1461        .map(|row| sql_row_to_mint_quote(row, vec![], vec![]))
1462        .collect::<Result<Vec<_>, _>>()?;
1463
1464        for quote in mint_quotes.as_mut_slice() {
1465            let payments = get_mint_quote_payments(&*conn, &quote.id).await?;
1466            let issuance = get_mint_quote_issuance(&*conn, &quote.id).await?;
1467            quote.issuance = issuance;
1468            quote.payments = payments;
1469        }
1470
1471        Ok(mint_quotes)
1472    }
1473
1474    async fn get_melt_quote(
1475        &self,
1476        quote_id: &QuoteId,
1477    ) -> Result<Option<mint::MeltQuote>, Self::Err> {
1478        #[cfg(feature = "prometheus")]
1479        METRICS.inc_in_flight_requests("get_melt_quote");
1480
1481        #[cfg(feature = "prometheus")]
1482        let start_time = std::time::Instant::now();
1483        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1484
1485        let result = async {
1486            query(
1487                r#"
1488                SELECT
1489                    id,
1490                    unit,
1491                    amount,
1492                    request,
1493                    fee_reserve,
1494                    expiry,
1495                    state,
1496                    payment_preimage,
1497                    request_lookup_id,
1498                    created_time,
1499                    paid_time,
1500                    payment_method,
1501                    options,
1502                    request_lookup_id_kind
1503                FROM
1504                    melt_quote
1505                WHERE
1506                    id=:id
1507                "#,
1508            )?
1509            .bind("id", quote_id.to_string())
1510            .fetch_one(&*conn)
1511            .await?
1512            .map(sql_row_to_melt_quote)
1513            .transpose()
1514        }
1515        .await;
1516
1517        #[cfg(feature = "prometheus")]
1518        {
1519            let success = result.is_ok();
1520
1521            METRICS.record_mint_operation("get_melt_quote", success);
1522            METRICS.record_mint_operation_histogram(
1523                "get_melt_quote",
1524                success,
1525                start_time.elapsed().as_secs_f64(),
1526            );
1527            METRICS.dec_in_flight_requests("get_melt_quote");
1528        }
1529
1530        result
1531    }
1532
1533    async fn get_melt_quotes(&self) -> Result<Vec<mint::MeltQuote>, Self::Err> {
1534        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1535        Ok(query(
1536            r#"
1537            SELECT
1538                id,
1539                unit,
1540                amount,
1541                request,
1542                fee_reserve,
1543                expiry,
1544                state,
1545                payment_preimage,
1546                request_lookup_id,
1547                created_time,
1548                paid_time,
1549                payment_method,
1550                options,
1551                request_lookup_id_kind
1552            FROM
1553                melt_quote
1554            "#,
1555        )?
1556        .fetch_all(&*conn)
1557        .await?
1558        .into_iter()
1559        .map(sql_row_to_melt_quote)
1560        .collect::<Result<Vec<_>, _>>()?)
1561    }
1562}
1563
1564#[async_trait]
1565impl<RM> MintProofsDatabase for SQLMintDatabase<RM>
1566where
1567    RM: DatabasePool + 'static,
1568{
1569    type Err = Error;
1570
1571    async fn get_proofs_by_ys(&self, ys: &[PublicKey]) -> Result<Vec<Option<Proof>>, Self::Err> {
1572        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1573        let mut proofs = query(
1574            r#"
1575            SELECT
1576                amount,
1577                keyset_id,
1578                secret,
1579                c,
1580                witness,
1581                y
1582            FROM
1583                proof
1584            WHERE
1585                y IN (:ys)
1586            "#,
1587        )?
1588        .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
1589        .fetch_all(&*conn)
1590        .await?
1591        .into_iter()
1592        .map(|mut row| {
1593            Ok((
1594                column_as_string!(
1595                    row.pop().ok_or(Error::InvalidDbResponse)?,
1596                    PublicKey::from_hex,
1597                    PublicKey::from_slice
1598                ),
1599                sql_row_to_proof(row)?,
1600            ))
1601        })
1602        .collect::<Result<HashMap<_, _>, Error>>()?;
1603
1604        Ok(ys.iter().map(|y| proofs.remove(y)).collect())
1605    }
1606
1607    async fn get_proof_ys_by_quote_id(
1608        &self,
1609        quote_id: &QuoteId,
1610    ) -> Result<Vec<PublicKey>, Self::Err> {
1611        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1612        Ok(query(
1613            r#"
1614            SELECT
1615                amount,
1616                keyset_id,
1617                secret,
1618                c,
1619                witness
1620            FROM
1621                proof
1622            WHERE
1623                quote_id = :quote_id
1624            "#,
1625        )?
1626        .bind("quote_id", quote_id.to_string())
1627        .fetch_all(&*conn)
1628        .await?
1629        .into_iter()
1630        .map(sql_row_to_proof)
1631        .collect::<Result<Vec<Proof>, _>>()?
1632        .ys()?)
1633    }
1634
1635    async fn get_proofs_states(&self, ys: &[PublicKey]) -> Result<Vec<Option<State>>, Self::Err> {
1636        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1637        let mut current_states = get_current_states(&*conn, ys).await?;
1638
1639        Ok(ys.iter().map(|y| current_states.remove(y)).collect())
1640    }
1641
1642    async fn get_proofs_by_keyset_id(
1643        &self,
1644        keyset_id: &Id,
1645    ) -> Result<(Proofs, Vec<Option<State>>), Self::Err> {
1646        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1647        Ok(query(
1648            r#"
1649            SELECT
1650               keyset_id,
1651               amount,
1652               secret,
1653               c,
1654               witness,
1655               state
1656            FROM
1657                proof
1658            WHERE
1659                keyset_id=:keyset_id
1660            "#,
1661        )?
1662        .bind("keyset_id", keyset_id.to_string())
1663        .fetch_all(&*conn)
1664        .await?
1665        .into_iter()
1666        .map(sql_row_to_proof_with_state)
1667        .collect::<Result<Vec<_>, _>>()?
1668        .into_iter()
1669        .unzip())
1670    }
1671
1672    /// Get total proofs redeemed by keyset id
1673    async fn get_total_redeemed(&self) -> Result<HashMap<Id, Amount>, Self::Err> {
1674        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1675        query(
1676            r#"
1677            SELECT
1678                keyset_id,
1679                total_redeemed as amount
1680            FROM
1681                keyset_amounts
1682        "#,
1683        )?
1684        .fetch_all(&*conn)
1685        .await?
1686        .into_iter()
1687        .map(sql_row_to_hashmap_amount)
1688        .collect()
1689    }
1690}
1691
1692#[async_trait]
1693impl<RM> MintSignatureTransaction<'_> for SQLTransaction<RM>
1694where
1695    RM: DatabasePool + 'static,
1696{
1697    type Err = Error;
1698
1699    async fn add_blind_signatures(
1700        &mut self,
1701        blinded_messages: &[PublicKey],
1702        blind_signatures: &[BlindSignature],
1703        quote_id: Option<QuoteId>,
1704    ) -> Result<(), Self::Err> {
1705        let current_time = unix_time();
1706
1707        if blinded_messages.len() != blind_signatures.len() {
1708            return Err(database::Error::Internal(
1709                "Mismatched array lengths for blinded messages and blind signatures".to_string(),
1710            ));
1711        }
1712
1713        // Select all existing rows for the given blinded messages at once
1714        let mut existing_rows = query(
1715            r#"
1716            SELECT blinded_message, c, dleq_e, dleq_s
1717            FROM blind_signature
1718            WHERE blinded_message IN (:blinded_messages)
1719            FOR UPDATE
1720            "#,
1721        )?
1722        .bind_vec(
1723            "blinded_messages",
1724            blinded_messages
1725                .iter()
1726                .map(|message| message.to_bytes().to_vec())
1727                .collect(),
1728        )
1729        .fetch_all(&self.inner)
1730        .await?
1731        .into_iter()
1732        .map(|mut row| {
1733            Ok((
1734                column_as_string!(&row.remove(0), PublicKey::from_hex, PublicKey::from_slice),
1735                (row[0].clone(), row[1].clone(), row[2].clone()),
1736            ))
1737        })
1738        .collect::<Result<HashMap<_, _>, Error>>()?;
1739
1740        // Iterate over the provided blinded messages and signatures
1741        for (message, signature) in blinded_messages.iter().zip(blind_signatures) {
1742            match existing_rows.remove(message) {
1743                None => {
1744                    // Unknown blind message: Insert new row with all columns
1745                    query(
1746                        r#"
1747                        INSERT INTO blind_signature
1748                        (blinded_message, amount, keyset_id, c, quote_id, dleq_e, dleq_s, created_time, signed_time)
1749                        VALUES
1750                        (:blinded_message, :amount, :keyset_id, :c, :quote_id, :dleq_e, :dleq_s, :created_time, :signed_time)
1751                        "#,
1752                    )?
1753                    .bind("blinded_message", message.to_bytes().to_vec())
1754                    .bind("amount", u64::from(signature.amount) as i64)
1755                    .bind("keyset_id", signature.keyset_id.to_string())
1756                    .bind("c", signature.c.to_bytes().to_vec())
1757                    .bind("quote_id", quote_id.as_ref().map(|q| q.to_string()))
1758                    .bind(
1759                        "dleq_e",
1760                        signature.dleq.as_ref().map(|dleq| dleq.e.to_secret_hex()),
1761                    )
1762                    .bind(
1763                        "dleq_s",
1764                        signature.dleq.as_ref().map(|dleq| dleq.s.to_secret_hex()),
1765                    )
1766                    .bind("created_time", current_time as i64)
1767                    .bind("signed_time", current_time as i64)
1768                    .execute(&self.inner)
1769                    .await?;
1770
1771                    query(
1772                        r#"
1773                        INSERT INTO keyset_amounts (keyset_id, total_issued, total_redeemed)
1774                        VALUES (:keyset_id, :amount, 0)
1775                        ON CONFLICT (keyset_id)
1776                        DO UPDATE SET total_issued = keyset_amounts.total_issued + EXCLUDED.total_issued
1777                        "#,
1778                    )?
1779                    .bind("amount", u64::from(signature.amount) as i64)
1780                    .bind("keyset_id", signature.keyset_id.to_string())
1781                    .execute(&self.inner)
1782                    .await?;
1783                }
1784                Some((c, _dleq_e, _dleq_s)) => {
1785                    // Blind message exists: check if c is NULL
1786                    match c {
1787                        Column::Null => {
1788                            // Blind message with no c: Update with missing columns c, dleq_e, dleq_s
1789                            query(
1790                                r#"
1791                                UPDATE blind_signature
1792                                SET c = :c, dleq_e = :dleq_e, dleq_s = :dleq_s, signed_time = :signed_time, amount = :amount
1793                                WHERE blinded_message = :blinded_message
1794                                "#,
1795                            )?
1796                            .bind("c", signature.c.to_bytes().to_vec())
1797                            .bind(
1798                                "dleq_e",
1799                                signature.dleq.as_ref().map(|dleq| dleq.e.to_secret_hex()),
1800                            )
1801                            .bind(
1802                                "dleq_s",
1803                                signature.dleq.as_ref().map(|dleq| dleq.s.to_secret_hex()),
1804                            )
1805                            .bind("blinded_message", message.to_bytes().to_vec())
1806                            .bind("signed_time", current_time as i64)
1807                            .bind("amount", u64::from(signature.amount) as i64)
1808                            .execute(&self.inner)
1809                            .await?;
1810
1811                            query(
1812                                r#"
1813                                INSERT INTO keyset_amounts (keyset_id, total_issued, total_redeemed)
1814                                VALUES (:keyset_id, :amount, 0)
1815                                ON CONFLICT (keyset_id)
1816                                DO UPDATE SET total_issued = keyset_amounts.total_issued + EXCLUDED.total_issued
1817                                "#,
1818                            )?
1819                            .bind("amount", u64::from(signature.amount) as i64)
1820                            .bind("keyset_id", signature.keyset_id.to_string())
1821                            .execute(&self.inner)
1822                            .await?;
1823                        }
1824                        _ => {
1825                            // Blind message already has c: Error
1826                            tracing::error!(
1827                                "Attempting to add signature to message already signed {}",
1828                                message
1829                            );
1830
1831                            return Err(database::Error::Duplicate);
1832                        }
1833                    }
1834                }
1835            }
1836        }
1837
1838        debug_assert!(
1839            existing_rows.is_empty(),
1840            "Unexpected existing rows remain: {:?}",
1841            existing_rows.keys().collect::<Vec<_>>()
1842        );
1843
1844        if !existing_rows.is_empty() {
1845            tracing::error!("Did not check all existing rows");
1846            return Err(Error::Internal(
1847                "Did not check all existing rows".to_string(),
1848            ));
1849        }
1850
1851        Ok(())
1852    }
1853
1854    async fn get_blind_signatures(
1855        &mut self,
1856        blinded_messages: &[PublicKey],
1857    ) -> Result<Vec<Option<BlindSignature>>, Self::Err> {
1858        let mut blinded_signatures = query(
1859            r#"SELECT
1860                keyset_id,
1861                amount,
1862                c,
1863                dleq_e,
1864                dleq_s,
1865                blinded_message
1866            FROM
1867                blind_signature
1868            WHERE blinded_message IN (:b) AND c IS NOT NULL
1869            "#,
1870        )?
1871        .bind_vec(
1872            "b",
1873            blinded_messages
1874                .iter()
1875                .map(|b| b.to_bytes().to_vec())
1876                .collect(),
1877        )
1878        .fetch_all(&self.inner)
1879        .await?
1880        .into_iter()
1881        .map(|mut row| {
1882            Ok((
1883                column_as_string!(
1884                    &row.pop().ok_or(Error::InvalidDbResponse)?,
1885                    PublicKey::from_hex,
1886                    PublicKey::from_slice
1887                ),
1888                sql_row_to_blind_signature(row)?,
1889            ))
1890        })
1891        .collect::<Result<HashMap<_, _>, Error>>()?;
1892        Ok(blinded_messages
1893            .iter()
1894            .map(|y| blinded_signatures.remove(y))
1895            .collect())
1896    }
1897}
1898
1899#[async_trait]
1900impl<RM> MintSignaturesDatabase for SQLMintDatabase<RM>
1901where
1902    RM: DatabasePool + 'static,
1903{
1904    type Err = Error;
1905
1906    async fn get_blind_signatures(
1907        &self,
1908        blinded_messages: &[PublicKey],
1909    ) -> Result<Vec<Option<BlindSignature>>, Self::Err> {
1910        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1911        let mut blinded_signatures = query(
1912            r#"SELECT
1913                keyset_id,
1914                amount,
1915                c,
1916                dleq_e,
1917                dleq_s,
1918                blinded_message
1919            FROM
1920                blind_signature
1921            WHERE blinded_message IN (:b) AND c IS NOT NULL
1922            "#,
1923        )?
1924        .bind_vec(
1925            "b",
1926            blinded_messages
1927                .iter()
1928                .map(|b_| b_.to_bytes().to_vec())
1929                .collect(),
1930        )
1931        .fetch_all(&*conn)
1932        .await?
1933        .into_iter()
1934        .map(|mut row| {
1935            Ok((
1936                column_as_string!(
1937                    &row.pop().ok_or(Error::InvalidDbResponse)?,
1938                    PublicKey::from_hex,
1939                    PublicKey::from_slice
1940                ),
1941                sql_row_to_blind_signature(row)?,
1942            ))
1943        })
1944        .collect::<Result<HashMap<_, _>, Error>>()?;
1945        Ok(blinded_messages
1946            .iter()
1947            .map(|y| blinded_signatures.remove(y))
1948            .collect())
1949    }
1950
1951    async fn get_blind_signatures_for_keyset(
1952        &self,
1953        keyset_id: &Id,
1954    ) -> Result<Vec<BlindSignature>, Self::Err> {
1955        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1956        Ok(query(
1957            r#"
1958            SELECT
1959                keyset_id,
1960                amount,
1961                c,
1962                dleq_e,
1963                dleq_s
1964            FROM
1965                blind_signature
1966            WHERE
1967                keyset_id=:keyset_id AND c IS NOT NULL
1968            "#,
1969        )?
1970        .bind("keyset_id", keyset_id.to_string())
1971        .fetch_all(&*conn)
1972        .await?
1973        .into_iter()
1974        .map(sql_row_to_blind_signature)
1975        .collect::<Result<Vec<BlindSignature>, _>>()?)
1976    }
1977
1978    /// Get [`BlindSignature`]s for quote
1979    async fn get_blind_signatures_for_quote(
1980        &self,
1981        quote_id: &QuoteId,
1982    ) -> Result<Vec<BlindSignature>, Self::Err> {
1983        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1984        Ok(query(
1985            r#"
1986            SELECT
1987                keyset_id,
1988                amount,
1989                c,
1990                dleq_e,
1991                dleq_s
1992            FROM
1993                blind_signature
1994            WHERE
1995                quote_id=:quote_id AND c IS NOT NULL
1996            "#,
1997        )?
1998        .bind("quote_id", quote_id.to_string())
1999        .fetch_all(&*conn)
2000        .await?
2001        .into_iter()
2002        .map(sql_row_to_blind_signature)
2003        .collect::<Result<Vec<BlindSignature>, _>>()?)
2004    }
2005
2006    /// Get total proofs redeemed by keyset id
2007    async fn get_total_issued(&self) -> Result<HashMap<Id, Amount>, Self::Err> {
2008        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
2009        query(
2010            r#"
2011            SELECT
2012                keyset_id,
2013                total_issued as amount
2014            FROM
2015                keyset_amounts
2016        "#,
2017        )?
2018        .fetch_all(&*conn)
2019        .await?
2020        .into_iter()
2021        .map(sql_row_to_hashmap_amount)
2022        .collect()
2023    }
2024}
2025
2026#[async_trait]
2027impl<RM> database::MintKVStoreTransaction<'_, Error> for SQLTransaction<RM>
2028where
2029    RM: DatabasePool + 'static,
2030{
2031    async fn kv_read(
2032        &mut self,
2033        primary_namespace: &str,
2034        secondary_namespace: &str,
2035        key: &str,
2036    ) -> Result<Option<Vec<u8>>, Error> {
2037        // Validate parameters according to KV store requirements
2038        validate_kvstore_params(primary_namespace, secondary_namespace, key)?;
2039        Ok(query(
2040            r#"
2041            SELECT value
2042            FROM kv_store
2043            WHERE primary_namespace = :primary_namespace
2044            AND secondary_namespace = :secondary_namespace
2045            AND key = :key
2046            "#,
2047        )?
2048        .bind("primary_namespace", primary_namespace.to_owned())
2049        .bind("secondary_namespace", secondary_namespace.to_owned())
2050        .bind("key", key.to_owned())
2051        .pluck(&self.inner)
2052        .await?
2053        .and_then(|col| match col {
2054            Column::Blob(data) => Some(data),
2055            _ => None,
2056        }))
2057    }
2058
2059    async fn kv_write(
2060        &mut self,
2061        primary_namespace: &str,
2062        secondary_namespace: &str,
2063        key: &str,
2064        value: &[u8],
2065    ) -> Result<(), Error> {
2066        // Validate parameters according to KV store requirements
2067        validate_kvstore_params(primary_namespace, secondary_namespace, key)?;
2068
2069        let current_time = unix_time();
2070
2071        query(
2072            r#"
2073            INSERT INTO kv_store
2074            (primary_namespace, secondary_namespace, key, value, created_time, updated_time)
2075            VALUES (:primary_namespace, :secondary_namespace, :key, :value, :created_time, :updated_time)
2076            ON CONFLICT(primary_namespace, secondary_namespace, key)
2077            DO UPDATE SET
2078                value = excluded.value,
2079                updated_time = excluded.updated_time
2080            "#,
2081        )?
2082        .bind("primary_namespace", primary_namespace.to_owned())
2083        .bind("secondary_namespace", secondary_namespace.to_owned())
2084        .bind("key", key.to_owned())
2085        .bind("value", value.to_vec())
2086        .bind("created_time", current_time as i64)
2087        .bind("updated_time", current_time as i64)
2088        .execute(&self.inner)
2089        .await?;
2090
2091        Ok(())
2092    }
2093
2094    async fn kv_remove(
2095        &mut self,
2096        primary_namespace: &str,
2097        secondary_namespace: &str,
2098        key: &str,
2099    ) -> Result<(), Error> {
2100        // Validate parameters according to KV store requirements
2101        validate_kvstore_params(primary_namespace, secondary_namespace, key)?;
2102        query(
2103            r#"
2104            DELETE FROM kv_store
2105            WHERE primary_namespace = :primary_namespace
2106            AND secondary_namespace = :secondary_namespace
2107            AND key = :key
2108            "#,
2109        )?
2110        .bind("primary_namespace", primary_namespace.to_owned())
2111        .bind("secondary_namespace", secondary_namespace.to_owned())
2112        .bind("key", key.to_owned())
2113        .execute(&self.inner)
2114        .await?;
2115
2116        Ok(())
2117    }
2118
2119    async fn kv_list(
2120        &mut self,
2121        primary_namespace: &str,
2122        secondary_namespace: &str,
2123    ) -> Result<Vec<String>, Error> {
2124        // Validate namespace parameters according to KV store requirements
2125        cdk_common::database::mint::validate_kvstore_string(primary_namespace)?;
2126        cdk_common::database::mint::validate_kvstore_string(secondary_namespace)?;
2127
2128        // Check empty namespace rules
2129        if primary_namespace.is_empty() && !secondary_namespace.is_empty() {
2130            return Err(Error::KVStoreInvalidKey(
2131                "If primary_namespace is empty, secondary_namespace must also be empty".to_string(),
2132            ));
2133        }
2134        Ok(query(
2135            r#"
2136            SELECT key
2137            FROM kv_store
2138            WHERE primary_namespace = :primary_namespace
2139            AND secondary_namespace = :secondary_namespace
2140            ORDER BY key
2141            "#,
2142        )?
2143        .bind("primary_namespace", primary_namespace.to_owned())
2144        .bind("secondary_namespace", secondary_namespace.to_owned())
2145        .fetch_all(&self.inner)
2146        .await?
2147        .into_iter()
2148        .map(|row| Ok(column_as_string!(&row[0])))
2149        .collect::<Result<Vec<_>, Error>>()?)
2150    }
2151}
2152
2153#[async_trait]
2154impl<RM> database::MintKVStoreDatabase for SQLMintDatabase<RM>
2155where
2156    RM: DatabasePool + 'static,
2157{
2158    type Err = Error;
2159
2160    async fn kv_read(
2161        &self,
2162        primary_namespace: &str,
2163        secondary_namespace: &str,
2164        key: &str,
2165    ) -> Result<Option<Vec<u8>>, Error> {
2166        // Validate parameters according to KV store requirements
2167        validate_kvstore_params(primary_namespace, secondary_namespace, key)?;
2168
2169        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
2170        Ok(query(
2171            r#"
2172            SELECT value
2173            FROM kv_store
2174            WHERE primary_namespace = :primary_namespace
2175            AND secondary_namespace = :secondary_namespace
2176            AND key = :key
2177            "#,
2178        )?
2179        .bind("primary_namespace", primary_namespace.to_owned())
2180        .bind("secondary_namespace", secondary_namespace.to_owned())
2181        .bind("key", key.to_owned())
2182        .pluck(&*conn)
2183        .await?
2184        .and_then(|col| match col {
2185            Column::Blob(data) => Some(data),
2186            _ => None,
2187        }))
2188    }
2189
2190    async fn kv_list(
2191        &self,
2192        primary_namespace: &str,
2193        secondary_namespace: &str,
2194    ) -> Result<Vec<String>, Error> {
2195        // Validate namespace parameters according to KV store requirements
2196        cdk_common::database::mint::validate_kvstore_string(primary_namespace)?;
2197        cdk_common::database::mint::validate_kvstore_string(secondary_namespace)?;
2198
2199        // Check empty namespace rules
2200        if primary_namespace.is_empty() && !secondary_namespace.is_empty() {
2201            return Err(Error::KVStoreInvalidKey(
2202                "If primary_namespace is empty, secondary_namespace must also be empty".to_string(),
2203            ));
2204        }
2205
2206        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
2207        Ok(query(
2208            r#"
2209            SELECT key
2210            FROM kv_store
2211            WHERE primary_namespace = :primary_namespace
2212            AND secondary_namespace = :secondary_namespace
2213            ORDER BY key
2214            "#,
2215        )?
2216        .bind("primary_namespace", primary_namespace.to_owned())
2217        .bind("secondary_namespace", secondary_namespace.to_owned())
2218        .fetch_all(&*conn)
2219        .await?
2220        .into_iter()
2221        .map(|row| Ok(column_as_string!(&row[0])))
2222        .collect::<Result<Vec<_>, Error>>()?)
2223    }
2224}
2225
2226#[async_trait]
2227impl<RM> database::MintKVStore for SQLMintDatabase<RM>
2228where
2229    RM: DatabasePool + 'static,
2230{
2231    async fn begin_transaction<'a>(
2232        &'a self,
2233    ) -> Result<Box<dyn database::MintKVStoreTransaction<'a, Self::Err> + Send + Sync + 'a>, Error>
2234    {
2235        Ok(Box::new(SQLTransaction {
2236            inner: ConnectionWithTransaction::new(
2237                self.pool.get().map_err(|e| Error::Database(Box::new(e)))?,
2238            )
2239            .await?,
2240        }))
2241    }
2242}
2243
2244#[async_trait]
2245impl<RM> SagaTransaction<'_> for SQLTransaction<RM>
2246where
2247    RM: DatabasePool + 'static,
2248{
2249    type Err = Error;
2250
2251    async fn get_saga(
2252        &mut self,
2253        operation_id: &uuid::Uuid,
2254    ) -> Result<Option<mint::Saga>, Self::Err> {
2255        Ok(query(
2256            r#"
2257            SELECT
2258                operation_id,
2259                operation_kind,
2260                state,
2261                blinded_secrets,
2262                input_ys,
2263                quote_id,
2264                created_at,
2265                updated_at
2266            FROM
2267                saga_state
2268            WHERE
2269                operation_id = :operation_id
2270            FOR UPDATE
2271            "#,
2272        )?
2273        .bind("operation_id", operation_id.to_string())
2274        .fetch_one(&self.inner)
2275        .await?
2276        .map(sql_row_to_saga)
2277        .transpose()?)
2278    }
2279
2280    async fn add_saga(&mut self, saga: &mint::Saga) -> Result<(), Self::Err> {
2281        let current_time = unix_time();
2282
2283        let blinded_secrets_json = serde_json::to_string(&saga.blinded_secrets)
2284            .map_err(|e| Error::Internal(format!("Failed to serialize blinded_secrets: {e}")))?;
2285
2286        let input_ys_json = serde_json::to_string(&saga.input_ys)
2287            .map_err(|e| Error::Internal(format!("Failed to serialize input_ys: {e}")))?;
2288
2289        query(
2290            r#"
2291            INSERT INTO saga_state
2292            (operation_id, operation_kind, state, blinded_secrets, input_ys, quote_id, created_at, updated_at)
2293            VALUES
2294            (:operation_id, :operation_kind, :state, :blinded_secrets, :input_ys, :quote_id, :created_at, :updated_at)
2295            "#,
2296        )?
2297        .bind("operation_id", saga.operation_id.to_string())
2298        .bind("operation_kind", saga.operation_kind.to_string())
2299        .bind("state", saga.state.state())
2300        .bind("blinded_secrets", blinded_secrets_json)
2301        .bind("input_ys", input_ys_json)
2302        .bind("quote_id", saga.quote_id.as_deref())
2303        .bind("created_at", saga.created_at as i64)
2304        .bind("updated_at", current_time as i64)
2305        .execute(&self.inner)
2306        .await?;
2307
2308        Ok(())
2309    }
2310
2311    async fn update_saga(
2312        &mut self,
2313        operation_id: &uuid::Uuid,
2314        new_state: mint::SagaStateEnum,
2315    ) -> Result<(), Self::Err> {
2316        let current_time = unix_time();
2317
2318        query(
2319            r#"
2320            UPDATE saga_state
2321            SET state = :state, updated_at = :updated_at
2322            WHERE operation_id = :operation_id
2323            "#,
2324        )?
2325        .bind("state", new_state.state())
2326        .bind("updated_at", current_time as i64)
2327        .bind("operation_id", operation_id.to_string())
2328        .execute(&self.inner)
2329        .await?;
2330
2331        Ok(())
2332    }
2333
2334    async fn delete_saga(&mut self, operation_id: &uuid::Uuid) -> Result<(), Self::Err> {
2335        query(
2336            r#"
2337            DELETE FROM saga_state
2338            WHERE operation_id = :operation_id
2339            "#,
2340        )?
2341        .bind("operation_id", operation_id.to_string())
2342        .execute(&self.inner)
2343        .await?;
2344
2345        Ok(())
2346    }
2347}
2348
2349#[async_trait]
2350impl<RM> SagaDatabase for SQLMintDatabase<RM>
2351where
2352    RM: DatabasePool + 'static,
2353{
2354    type Err = Error;
2355
2356    async fn get_incomplete_sagas(
2357        &self,
2358        operation_kind: mint::OperationKind,
2359    ) -> Result<Vec<mint::Saga>, Self::Err> {
2360        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
2361        Ok(query(
2362            r#"
2363            SELECT
2364                operation_id,
2365                operation_kind,
2366                state,
2367                blinded_secrets,
2368                input_ys,
2369                quote_id,
2370                created_at,
2371                updated_at
2372            FROM
2373                saga_state
2374            WHERE
2375                operation_kind = :operation_kind
2376            ORDER BY created_at ASC
2377            "#,
2378        )?
2379        .bind("operation_kind", operation_kind.to_string())
2380        .fetch_all(&*conn)
2381        .await?
2382        .into_iter()
2383        .map(sql_row_to_saga)
2384        .collect::<Result<Vec<_>, _>>()?)
2385    }
2386}
2387
2388#[async_trait]
2389impl<RM> MintDatabase<Error> for SQLMintDatabase<RM>
2390where
2391    RM: DatabasePool + 'static,
2392{
2393    async fn begin_transaction<'a>(
2394        &'a self,
2395    ) -> Result<Box<dyn database::MintTransaction<'a, Error> + Send + Sync + 'a>, Error> {
2396        let tx = SQLTransaction {
2397            inner: ConnectionWithTransaction::new(
2398                self.pool.get().map_err(|e| Error::Database(Box::new(e)))?,
2399            )
2400            .await?,
2401        };
2402
2403        Ok(Box::new(tx))
2404    }
2405}
2406
2407fn sql_row_to_keyset_info(row: Vec<Column>) -> Result<MintKeySetInfo, Error> {
2408    unpack_into!(
2409        let (
2410            id,
2411            unit,
2412            active,
2413            valid_from,
2414            valid_to,
2415            derivation_path,
2416            derivation_path_index,
2417            max_order,
2418            amounts,
2419            row_keyset_ppk
2420        ) = row
2421    );
2422
2423    let max_order: u8 = column_as_number!(max_order);
2424    let amounts = column_as_nullable_string!(amounts)
2425        .and_then(|str| serde_json::from_str(&str).ok())
2426        .unwrap_or_else(|| (0..max_order).map(|m| 2u64.pow(m.into())).collect());
2427
2428    Ok(MintKeySetInfo {
2429        id: column_as_string!(id, Id::from_str, Id::from_bytes),
2430        unit: column_as_string!(unit, CurrencyUnit::from_str),
2431        active: matches!(active, Column::Integer(1)),
2432        valid_from: column_as_number!(valid_from),
2433        derivation_path: column_as_string!(derivation_path, DerivationPath::from_str),
2434        derivation_path_index: column_as_nullable_number!(derivation_path_index),
2435        max_order,
2436        amounts,
2437        input_fee_ppk: column_as_number!(row_keyset_ppk),
2438        final_expiry: column_as_nullable_number!(valid_to),
2439    })
2440}
2441
2442#[instrument(skip_all)]
2443fn sql_row_to_mint_quote(
2444    row: Vec<Column>,
2445    payments: Vec<IncomingPayment>,
2446    issueances: Vec<Issuance>,
2447) -> Result<MintQuote, Error> {
2448    unpack_into!(
2449        let (
2450            id, amount, unit, request, expiry, request_lookup_id,
2451            pubkey, created_time, amount_paid, amount_issued, payment_method, request_lookup_id_kind
2452        ) = row
2453    );
2454
2455    let request_str = column_as_string!(&request);
2456    let request_lookup_id = column_as_nullable_string!(&request_lookup_id).unwrap_or_else(|| {
2457        Bolt11Invoice::from_str(&request_str)
2458            .map(|invoice| invoice.payment_hash().to_string())
2459            .unwrap_or_else(|_| request_str.clone())
2460    });
2461    let request_lookup_id_kind = column_as_string!(request_lookup_id_kind);
2462
2463    let pubkey = column_as_nullable_string!(&pubkey)
2464        .map(|pk| PublicKey::from_hex(&pk))
2465        .transpose()?;
2466
2467    let id = column_as_string!(id);
2468    let amount: Option<u64> = column_as_nullable_number!(amount);
2469    let amount_paid: u64 = column_as_number!(amount_paid);
2470    let amount_issued: u64 = column_as_number!(amount_issued);
2471    let payment_method = column_as_string!(payment_method, PaymentMethod::from_str);
2472
2473    Ok(MintQuote::new(
2474        Some(QuoteId::from_str(&id)?),
2475        request_str,
2476        column_as_string!(unit, CurrencyUnit::from_str),
2477        amount.map(Amount::from),
2478        column_as_number!(expiry),
2479        PaymentIdentifier::new(&request_lookup_id_kind, &request_lookup_id)
2480            .map_err(|_| ConversionError::MissingParameter("Payment id".to_string()))?,
2481        pubkey,
2482        amount_paid.into(),
2483        amount_issued.into(),
2484        payment_method,
2485        column_as_number!(created_time),
2486        payments,
2487        issueances,
2488    ))
2489}
2490
2491fn sql_row_to_melt_quote(row: Vec<Column>) -> Result<mint::MeltQuote, Error> {
2492    unpack_into!(
2493        let (
2494                id,
2495                unit,
2496                amount,
2497                request,
2498                fee_reserve,
2499                expiry,
2500                state,
2501                payment_preimage,
2502                request_lookup_id,
2503                created_time,
2504                paid_time,
2505                payment_method,
2506                options,
2507                request_lookup_id_kind
2508        ) = row
2509    );
2510
2511    let id = column_as_string!(id);
2512    let amount: u64 = column_as_number!(amount);
2513    let fee_reserve: u64 = column_as_number!(fee_reserve);
2514
2515    let expiry = column_as_number!(expiry);
2516    let payment_preimage = column_as_nullable_string!(payment_preimage);
2517    let options = column_as_nullable_string!(options);
2518    let options = options.and_then(|o| serde_json::from_str(&o).ok());
2519    let created_time: i64 = column_as_number!(created_time);
2520    let paid_time = column_as_nullable_number!(paid_time);
2521    let payment_method = PaymentMethod::from_str(&column_as_string!(payment_method))?;
2522
2523    let state =
2524        MeltQuoteState::from_str(&column_as_string!(&state)).map_err(ConversionError::from)?;
2525
2526    let unit = column_as_string!(unit);
2527    let request = column_as_string!(request);
2528
2529    let request_lookup_id_kind = column_as_nullable_string!(request_lookup_id_kind);
2530
2531    let request_lookup_id = column_as_nullable_string!(&request_lookup_id).or_else(|| {
2532        Bolt11Invoice::from_str(&request)
2533            .ok()
2534            .map(|invoice| invoice.payment_hash().to_string())
2535    });
2536
2537    let request_lookup_id = if let (Some(id_kind), Some(request_lookup_id)) =
2538        (request_lookup_id_kind, request_lookup_id)
2539    {
2540        Some(
2541            PaymentIdentifier::new(&id_kind, &request_lookup_id)
2542                .map_err(|_| ConversionError::MissingParameter("Payment id".to_string()))?,
2543        )
2544    } else {
2545        None
2546    };
2547
2548    let request = match serde_json::from_str(&request) {
2549        Ok(req) => req,
2550        Err(err) => {
2551            tracing::debug!(
2552                "Melt quote from pre migrations defaulting to bolt11 {}.",
2553                err
2554            );
2555            let bolt11 = Bolt11Invoice::from_str(&request).unwrap();
2556            MeltPaymentRequest::Bolt11 { bolt11 }
2557        }
2558    };
2559
2560    Ok(MeltQuote {
2561        id: QuoteId::from_str(&id)?,
2562        unit: CurrencyUnit::from_str(&unit)?,
2563        amount: Amount::from(amount),
2564        request,
2565        fee_reserve: Amount::from(fee_reserve),
2566        state,
2567        expiry,
2568        payment_preimage,
2569        request_lookup_id,
2570        options,
2571        created_time: created_time as u64,
2572        paid_time,
2573        payment_method,
2574    })
2575}
2576
2577fn sql_row_to_proof(row: Vec<Column>) -> Result<Proof, Error> {
2578    unpack_into!(
2579        let (
2580            amount,
2581            keyset_id,
2582            secret,
2583            c,
2584            witness
2585        ) = row
2586    );
2587
2588    let amount: u64 = column_as_number!(amount);
2589    Ok(Proof {
2590        amount: Amount::from(amount),
2591        keyset_id: column_as_string!(keyset_id, Id::from_str),
2592        secret: column_as_string!(secret, Secret::from_str),
2593        c: column_as_string!(c, PublicKey::from_hex, PublicKey::from_slice),
2594        witness: column_as_nullable_string!(witness).and_then(|w| serde_json::from_str(&w).ok()),
2595        dleq: None,
2596    })
2597}
2598
2599fn sql_row_to_hashmap_amount(row: Vec<Column>) -> Result<(Id, Amount), Error> {
2600    unpack_into!(
2601        let (
2602            keyset_id, amount
2603        ) = row
2604    );
2605
2606    let amount: u64 = column_as_number!(amount);
2607    Ok((
2608        column_as_string!(keyset_id, Id::from_str, Id::from_bytes),
2609        Amount::from(amount),
2610    ))
2611}
2612
2613fn sql_row_to_proof_with_state(row: Vec<Column>) -> Result<(Proof, Option<State>), Error> {
2614    unpack_into!(
2615        let (
2616            keyset_id, amount, secret, c, witness, state
2617        ) = row
2618    );
2619
2620    let amount: u64 = column_as_number!(amount);
2621    let state = column_as_nullable_string!(state).and_then(|s| State::from_str(&s).ok());
2622
2623    Ok((
2624        Proof {
2625            amount: Amount::from(amount),
2626            keyset_id: column_as_string!(keyset_id, Id::from_str, Id::from_bytes),
2627            secret: column_as_string!(secret, Secret::from_str),
2628            c: column_as_string!(c, PublicKey::from_hex, PublicKey::from_slice),
2629            witness: column_as_nullable_string!(witness)
2630                .and_then(|w| serde_json::from_str(&w).ok()),
2631            dleq: None,
2632        },
2633        state,
2634    ))
2635}
2636
2637fn sql_row_to_blind_signature(row: Vec<Column>) -> Result<BlindSignature, Error> {
2638    unpack_into!(
2639        let (
2640            keyset_id, amount, c, dleq_e, dleq_s
2641        ) = row
2642    );
2643
2644    let dleq = match (
2645        column_as_nullable_string!(dleq_e),
2646        column_as_nullable_string!(dleq_s),
2647    ) {
2648        (Some(e), Some(s)) => Some(BlindSignatureDleq {
2649            e: SecretKey::from_hex(e)?,
2650            s: SecretKey::from_hex(s)?,
2651        }),
2652        _ => None,
2653    };
2654
2655    let amount: u64 = column_as_number!(amount);
2656
2657    Ok(BlindSignature {
2658        amount: Amount::from(amount),
2659        keyset_id: column_as_string!(keyset_id, Id::from_str, Id::from_bytes),
2660        c: column_as_string!(c, PublicKey::from_hex, PublicKey::from_slice),
2661        dleq,
2662    })
2663}
2664
2665fn sql_row_to_saga(row: Vec<Column>) -> Result<mint::Saga, Error> {
2666    unpack_into!(
2667        let (
2668            operation_id,
2669            operation_kind,
2670            state,
2671            blinded_secrets,
2672            input_ys,
2673            quote_id,
2674            created_at,
2675            updated_at
2676        ) = row
2677    );
2678
2679    let operation_id_str = column_as_string!(&operation_id);
2680    let operation_id = uuid::Uuid::parse_str(&operation_id_str)
2681        .map_err(|e| Error::Internal(format!("Invalid operation_id UUID: {e}")))?;
2682
2683    let operation_kind_str = column_as_string!(&operation_kind);
2684    let operation_kind = mint::OperationKind::from_str(&operation_kind_str)
2685        .map_err(|e| Error::Internal(format!("Invalid operation kind: {e}")))?;
2686
2687    let state_str = column_as_string!(&state);
2688    let state = mint::SagaStateEnum::new(operation_kind, &state_str)
2689        .map_err(|e| Error::Internal(format!("Invalid saga state: {e}")))?;
2690
2691    let blinded_secrets_str = column_as_string!(&blinded_secrets);
2692    let blinded_secrets: Vec<PublicKey> = serde_json::from_str(&blinded_secrets_str)
2693        .map_err(|e| Error::Internal(format!("Failed to deserialize blinded_secrets: {e}")))?;
2694
2695    let input_ys_str = column_as_string!(&input_ys);
2696    let input_ys: Vec<PublicKey> = serde_json::from_str(&input_ys_str)
2697        .map_err(|e| Error::Internal(format!("Failed to deserialize input_ys: {e}")))?;
2698
2699    let quote_id = match &quote_id {
2700        Column::Text(s) => {
2701            if s.is_empty() {
2702                None
2703            } else {
2704                Some(s.clone())
2705            }
2706        }
2707        Column::Null => None,
2708        _ => None,
2709    };
2710
2711    let created_at: u64 = column_as_number!(created_at);
2712    let updated_at: u64 = column_as_number!(updated_at);
2713
2714    Ok(mint::Saga {
2715        operation_id,
2716        operation_kind,
2717        state,
2718        blinded_secrets,
2719        input_ys,
2720        quote_id,
2721        created_at,
2722        updated_at,
2723    })
2724}
2725
2726#[cfg(test)]
2727mod test {
2728    use super::*;
2729
2730    mod max_order_to_amounts_migrations {
2731        use super::*;
2732
2733        #[test]
2734        fn legacy_payload() {
2735            let result = sql_row_to_keyset_info(vec![
2736                Column::Text("0083a60439303340".to_owned()),
2737                Column::Text("sat".to_owned()),
2738                Column::Integer(1),
2739                Column::Integer(1749844864),
2740                Column::Null,
2741                Column::Text("0'/0'/0'".to_owned()),
2742                Column::Integer(0),
2743                Column::Integer(32),
2744                Column::Null,
2745                Column::Integer(0),
2746            ]);
2747            assert!(result.is_ok());
2748        }
2749
2750        #[test]
2751        fn migrated_payload() {
2752            let legacy = sql_row_to_keyset_info(vec![
2753                Column::Text("0083a60439303340".to_owned()),
2754                Column::Text("sat".to_owned()),
2755                Column::Integer(1),
2756                Column::Integer(1749844864),
2757                Column::Null,
2758                Column::Text("0'/0'/0'".to_owned()),
2759                Column::Integer(0),
2760                Column::Integer(32),
2761                Column::Null,
2762                Column::Integer(0),
2763            ]);
2764            assert!(legacy.is_ok());
2765
2766            let amounts = (0..32).map(|x| 2u64.pow(x)).collect::<Vec<_>>();
2767            let migrated = sql_row_to_keyset_info(vec![
2768                Column::Text("0083a60439303340".to_owned()),
2769                Column::Text("sat".to_owned()),
2770                Column::Integer(1),
2771                Column::Integer(1749844864),
2772                Column::Null,
2773                Column::Text("0'/0'/0'".to_owned()),
2774                Column::Integer(0),
2775                Column::Integer(32),
2776                Column::Text(serde_json::to_string(&amounts).expect("valid json")),
2777                Column::Integer(0),
2778            ]);
2779            assert!(migrated.is_ok());
2780            assert_eq!(legacy.unwrap(), migrated.unwrap());
2781        }
2782
2783        #[test]
2784        fn amounts_over_max_order() {
2785            let legacy = sql_row_to_keyset_info(vec![
2786                Column::Text("0083a60439303340".to_owned()),
2787                Column::Text("sat".to_owned()),
2788                Column::Integer(1),
2789                Column::Integer(1749844864),
2790                Column::Null,
2791                Column::Text("0'/0'/0'".to_owned()),
2792                Column::Integer(0),
2793                Column::Integer(32),
2794                Column::Null,
2795                Column::Integer(0),
2796            ]);
2797            assert!(legacy.is_ok());
2798
2799            let amounts = (0..16).map(|x| 2u64.pow(x)).collect::<Vec<_>>();
2800            let migrated = sql_row_to_keyset_info(vec![
2801                Column::Text("0083a60439303340".to_owned()),
2802                Column::Text("sat".to_owned()),
2803                Column::Integer(1),
2804                Column::Integer(1749844864),
2805                Column::Null,
2806                Column::Text("0'/0'/0'".to_owned()),
2807                Column::Integer(0),
2808                Column::Integer(32),
2809                Column::Text(serde_json::to_string(&amounts).expect("valid json")),
2810                Column::Integer(0),
2811            ]);
2812            assert!(migrated.is_ok());
2813            let migrated = migrated.unwrap();
2814            assert_ne!(legacy.unwrap(), migrated);
2815            assert_eq!(migrated.amounts.len(), 16);
2816        }
2817    }
2818}