Skip to main content

cdk_sql_common/wallet/
mod.rs

1//! SQLite Wallet Database
2
3use std::collections::HashMap;
4use std::fmt::Debug;
5use std::str::FromStr;
6use std::sync::Arc;
7
8use async_trait::async_trait;
9use cdk_common::database::{ConversionError, Error, WalletDatabase};
10use cdk_common::mint_url::MintUrl;
11use cdk_common::nuts::{MeltQuoteState, MintQuoteState};
12use cdk_common::secret::Secret;
13use cdk_common::wallet::{
14    self, MintQuote, ProofInfo, Transaction, TransactionDirection, TransactionId,
15};
16use cdk_common::{
17    database, Amount, CurrencyUnit, Id, KeySet, KeySetInfo, Keys, MintInfo, PaymentMethod, Proof,
18    ProofDleq, PublicKey, SecretKey, SpendingConditions, State,
19};
20use tracing::instrument;
21use uuid::Uuid;
22
23use crate::common::migrate;
24use crate::database::{ConnectionWithTransaction, DatabaseExecutor};
25use crate::pool::{DatabasePool, Pool, PooledResource};
26use crate::stmt::{query, Column};
27use crate::{
28    column_as_binary, column_as_nullable_binary, column_as_nullable_number,
29    column_as_nullable_string, column_as_number, column_as_string, unpack_into,
30};
31
32#[rustfmt::skip]
33mod migrations {
34    include!(concat!(env!("OUT_DIR"), "/migrations_wallet.rs"));
35}
36
37/// Wallet SQLite Database
38#[derive(Debug, Clone)]
39pub struct SQLWalletDatabase<RM>
40where
41    RM: DatabasePool + 'static,
42{
43    pool: Arc<Pool<RM>>,
44}
45
46impl<RM> SQLWalletDatabase<RM>
47where
48    RM: DatabasePool + 'static,
49{
50    /// Creates a new instance
51    pub async fn new<X>(db: X) -> Result<Self, Error>
52    where
53        X: Into<RM::Config>,
54    {
55        let pool = Pool::new(db.into());
56        Self::migrate(pool.get().map_err(|e| Error::Database(Box::new(e)))?).await?;
57
58        Ok(Self { pool })
59    }
60
61    /// Migrate [`WalletSqliteDatabase`]
62    async fn migrate(conn: PooledResource<RM>) -> Result<(), Error> {
63        let tx = ConnectionWithTransaction::new(conn).await?;
64        migrate(&tx, RM::Connection::name(), migrations::MIGRATIONS).await?;
65        // Update any existing keys with missing keyset_u32 values
66        Self::add_keyset_u32(&tx).await?;
67        tx.commit().await?;
68
69        Ok(())
70    }
71
72    async fn add_keyset_u32<T>(conn: &T) -> Result<(), Error>
73    where
74        T: DatabaseExecutor,
75    {
76        // First get the keysets where keyset_u32 on key is null
77        let keys_without_u32: Vec<Vec<Column>> = query(
78            r#"
79            SELECT
80                id
81            FROM key
82            WHERE keyset_u32 IS NULL
83            "#,
84        )?
85        .fetch_all(conn)
86        .await?;
87
88        for row in keys_without_u32 {
89            unpack_into!(let (id) = row);
90            let id = column_as_string!(id);
91
92            if let Ok(id) = Id::from_str(&id) {
93                query(
94                    r#"
95            UPDATE
96                key
97            SET keyset_u32 = :u32_keyset
98            WHERE id = :keyset_id
99            "#,
100                )?
101                .bind("u32_keyset", u32::from(id))
102                .bind("keyset_id", id.to_string())
103                .execute(conn)
104                .await?;
105            }
106        }
107
108        // Also update keysets where keyset_u32 is null
109        let keysets_without_u32: Vec<Vec<Column>> = query(
110            r#"
111            SELECT
112                id
113            FROM keyset
114            WHERE keyset_u32 IS NULL
115            "#,
116        )?
117        .fetch_all(conn)
118        .await?;
119
120        for row in keysets_without_u32 {
121            unpack_into!(let (id) = row);
122            let id = column_as_string!(id);
123
124            if let Ok(id) = Id::from_str(&id) {
125                query(
126                    r#"
127            UPDATE
128                keyset
129            SET keyset_u32 = :u32_keyset
130            WHERE id = :keyset_id
131            "#,
132                )?
133                .bind("u32_keyset", u32::from(id))
134                .bind("keyset_id", id.to_string())
135                .execute(conn)
136                .await?;
137            }
138        }
139
140        Ok(())
141    }
142}
143
144#[async_trait]
145impl<RM> WalletDatabase<database::Error> for SQLWalletDatabase<RM>
146where
147    RM: DatabasePool + 'static,
148{
149    #[instrument(skip(self))]
150    async fn get_melt_quotes(&self) -> Result<Vec<wallet::MeltQuote>, database::Error> {
151        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
152
153        Ok(query(
154            r#"
155              SELECT
156                  id,
157                  unit,
158                  amount,
159                  request,
160                  fee_reserve,
161                  state,
162                  expiry,
163                  payment_preimage,
164                  payment_method,
165                  used_by_operation,
166                  version
167              FROM
168                  melt_quote
169              "#,
170        )?
171        .fetch_all(&*conn)
172        .await?
173        .into_iter()
174        .map(sql_row_to_melt_quote)
175        .collect::<Result<_, _>>()?)
176    }
177
178    #[instrument(skip(self))]
179    async fn get_mint(&self, mint_url: MintUrl) -> Result<Option<MintInfo>, database::Error> {
180        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
181        Ok(query(
182            r#"
183            SELECT
184                name,
185                pubkey,
186                version,
187                description,
188                description_long,
189                contact,
190                nuts,
191                icon_url,
192                motd,
193                urls,
194                mint_time,
195                tos_url
196            FROM
197                mint
198            WHERE mint_url = :mint_url
199            "#,
200        )?
201        .bind("mint_url", mint_url.to_string())
202        .fetch_one(&*conn)
203        .await?
204        .map(sql_row_to_mint_info)
205        .transpose()?)
206    }
207
208    #[instrument(skip(self))]
209    async fn get_mints(&self) -> Result<HashMap<MintUrl, Option<MintInfo>>, database::Error> {
210        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
211        Ok(query(
212            r#"
213                SELECT
214                    name,
215                    pubkey,
216                    version,
217                    description,
218                    description_long,
219                    contact,
220                    nuts,
221                    icon_url,
222                    motd,
223                    urls,
224                    mint_time,
225                    tos_url,
226                    mint_url
227                FROM
228                    mint
229                "#,
230        )?
231        .fetch_all(&*conn)
232        .await?
233        .into_iter()
234        .map(|mut row| {
235            let url = column_as_string!(
236                row.pop().ok_or(ConversionError::MissingColumn(0, 1))?,
237                MintUrl::from_str
238            );
239
240            Ok((url, sql_row_to_mint_info(row).ok()))
241        })
242        .collect::<Result<HashMap<_, _>, Error>>()?)
243    }
244
245    #[instrument(skip(self))]
246    async fn get_mint_keysets(
247        &self,
248        mint_url: MintUrl,
249    ) -> Result<Option<Vec<KeySetInfo>>, database::Error> {
250        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
251
252        let keysets = query(
253            r#"
254            SELECT
255                id,
256                unit,
257                active,
258                input_fee_ppk,
259                final_expiry
260            FROM
261                keyset
262            WHERE mint_url = :mint_url
263            "#,
264        )?
265        .bind("mint_url", mint_url.to_string())
266        .fetch_all(&*conn)
267        .await?
268        .into_iter()
269        .map(sql_row_to_keyset)
270        .collect::<Result<Vec<_>, Error>>()?;
271
272        match keysets.is_empty() {
273            false => Ok(Some(keysets)),
274            true => Ok(None),
275        }
276    }
277
278    #[instrument(skip(self), fields(keyset_id = %keyset_id))]
279    async fn get_keyset_by_id(
280        &self,
281        keyset_id: &Id,
282    ) -> Result<Option<KeySetInfo>, database::Error> {
283        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
284        query(
285            r#"
286            SELECT
287                id,
288                unit,
289                active,
290                input_fee_ppk,
291                final_expiry
292            FROM
293                keyset
294            WHERE id = :id
295            "#,
296        )?
297        .bind("id", keyset_id.to_string())
298        .fetch_one(&*conn)
299        .await?
300        .map(sql_row_to_keyset)
301        .transpose()
302    }
303
304    #[instrument(skip(self))]
305    async fn get_mint_quote(&self, quote_id: &str) -> Result<Option<MintQuote>, database::Error> {
306        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
307        query(
308            r#"
309            SELECT
310                id,
311                mint_url,
312                amount,
313                unit,
314                request,
315                state,
316                expiry,
317                secret_key,
318                payment_method,
319                amount_issued,
320                amount_paid,
321                used_by_operation,
322                version
323            FROM
324                mint_quote
325            WHERE
326                id = :id
327            "#,
328        )?
329        .bind("id", quote_id.to_string())
330        .fetch_one(&*conn)
331        .await?
332        .map(sql_row_to_mint_quote)
333        .transpose()
334    }
335
336    #[instrument(skip(self))]
337    async fn get_mint_quotes(&self) -> Result<Vec<MintQuote>, database::Error> {
338        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
339        Ok(query(
340            r#"
341            SELECT
342                id,
343                mint_url,
344                amount,
345                unit,
346                request,
347                state,
348                expiry,
349                secret_key,
350                payment_method,
351                amount_issued,
352                amount_paid,
353                used_by_operation,
354                version
355            FROM
356                mint_quote
357            "#,
358        )?
359        .fetch_all(&*conn)
360        .await?
361        .into_iter()
362        .map(sql_row_to_mint_quote)
363        .collect::<Result<_, _>>()?)
364    }
365
366    #[instrument(skip(self))]
367    async fn get_unissued_mint_quotes(&self) -> Result<Vec<MintQuote>, database::Error> {
368        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
369        Ok(query(
370            r#"
371            SELECT
372                id,
373                mint_url,
374                amount,
375                unit,
376                request,
377                state,
378                expiry,
379                secret_key,
380                payment_method,
381                amount_issued,
382                amount_paid,
383                used_by_operation,
384                version
385            FROM
386                mint_quote
387            WHERE
388                amount_issued = 0
389                OR
390                payment_method = 'bolt12'
391            "#,
392        )?
393        .fetch_all(&*conn)
394        .await?
395        .into_iter()
396        .map(sql_row_to_mint_quote)
397        .collect::<Result<_, _>>()?)
398    }
399
400    #[instrument(skip(self))]
401    async fn get_melt_quote(
402        &self,
403        quote_id: &str,
404    ) -> Result<Option<wallet::MeltQuote>, database::Error> {
405        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
406        query(
407            r#"
408            SELECT
409                id,
410                unit,
411                amount,
412                request,
413                fee_reserve,
414                state,
415                expiry,
416                payment_preimage,
417                payment_method,
418                used_by_operation,
419                version
420            FROM
421                melt_quote
422            WHERE
423                id=:id
424            "#,
425        )?
426        .bind("id", quote_id.to_owned())
427        .fetch_one(&*conn)
428        .await?
429        .map(sql_row_to_melt_quote)
430        .transpose()
431    }
432
433    #[instrument(skip(self), fields(keyset_id = %keyset_id))]
434    async fn get_keys(&self, keyset_id: &Id) -> Result<Option<Keys>, database::Error> {
435        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
436        query(
437            r#"
438            SELECT
439                keys
440            FROM key
441            WHERE id = :id
442            "#,
443        )?
444        .bind("id", keyset_id.to_string())
445        .pluck(&*conn)
446        .await?
447        .map(|keys| {
448            let keys = column_as_string!(keys);
449            serde_json::from_str(&keys).map_err(Error::from)
450        })
451        .transpose()
452    }
453
454    #[instrument(skip(self, state, spending_conditions))]
455    async fn get_proofs(
456        &self,
457        mint_url: Option<MintUrl>,
458        unit: Option<CurrencyUnit>,
459        state: Option<Vec<State>>,
460        spending_conditions: Option<Vec<SpendingConditions>>,
461    ) -> Result<Vec<ProofInfo>, database::Error> {
462        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
463        Ok(query(
464            r#"
465            SELECT
466                amount,
467                unit,
468                keyset_id,
469                secret,
470                c,
471                witness,
472                dleq_e,
473                dleq_s,
474                dleq_r,
475                y,
476                mint_url,
477                state,
478                spending_condition,
479                used_by_operation,
480                created_by_operation
481            FROM proof
482            "#,
483        )?
484        .fetch_all(&*conn)
485        .await?
486        .into_iter()
487        .filter_map(|row| {
488            let row = sql_row_to_proof_info(row).ok()?;
489
490            if row.matches_conditions(&mint_url, &unit, &state, &spending_conditions) {
491                Some(row)
492            } else {
493                None
494            }
495        })
496        .collect::<Vec<_>>())
497    }
498
499    #[instrument(skip(self, ys))]
500    async fn get_proofs_by_ys(
501        &self,
502        ys: Vec<PublicKey>,
503    ) -> Result<Vec<ProofInfo>, database::Error> {
504        if ys.is_empty() {
505            return Ok(Vec::new());
506        }
507
508        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
509        Ok(query(
510            r#"
511            SELECT
512                amount,
513                unit,
514                keyset_id,
515                secret,
516                c,
517                witness,
518                dleq_e,
519                dleq_s,
520                dleq_r,
521                y,
522                mint_url,
523                state,
524                spending_condition,
525                used_by_operation,
526                created_by_operation
527            FROM proof
528            WHERE y IN (:ys)
529        "#,
530        )?
531        .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
532        .fetch_all(&*conn)
533        .await?
534        .into_iter()
535        .filter_map(|row| sql_row_to_proof_info(row).ok())
536        .collect::<Vec<_>>())
537    }
538
539    async fn get_balance(
540        &self,
541        mint_url: Option<MintUrl>,
542        unit: Option<CurrencyUnit>,
543        states: Option<Vec<State>>,
544    ) -> Result<u64, database::Error> {
545        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
546
547        let mut query_str = "SELECT COALESCE(SUM(amount), 0) as total FROM proof".to_string();
548        let mut where_clauses = Vec::new();
549        let states = states
550            .unwrap_or_default()
551            .into_iter()
552            .map(|x| x.to_string())
553            .collect::<Vec<_>>();
554
555        if mint_url.is_some() {
556            where_clauses.push("mint_url = :mint_url");
557        }
558        if unit.is_some() {
559            where_clauses.push("unit = :unit");
560        }
561        if !states.is_empty() {
562            where_clauses.push("state IN (:states)");
563        }
564
565        if !where_clauses.is_empty() {
566            query_str.push_str(" WHERE ");
567            query_str.push_str(&where_clauses.join(" AND "));
568        }
569
570        let mut q = query(&query_str)?;
571
572        if let Some(ref mint_url) = mint_url {
573            q = q.bind("mint_url", mint_url.to_string());
574        }
575        if let Some(ref unit) = unit {
576            q = q.bind("unit", unit.to_string());
577        }
578
579        if !states.is_empty() {
580            q = q.bind_vec("states", states);
581        }
582
583        let balance = q
584            .pluck(&*conn)
585            .await?
586            .map(|n| {
587                // SQLite SUM returns INTEGER which we need to convert to u64
588                match n {
589                    crate::stmt::Column::Integer(i) => Ok(i as u64),
590                    crate::stmt::Column::Real(f) => Ok(f as u64),
591                    _ => Err(Error::Database(Box::new(std::io::Error::new(
592                        std::io::ErrorKind::InvalidData,
593                        "Invalid balance type",
594                    )))),
595                }
596            })
597            .transpose()?
598            .unwrap_or(0);
599
600        Ok(balance)
601    }
602
603    #[instrument(skip(self))]
604    async fn get_transaction(
605        &self,
606        transaction_id: TransactionId,
607    ) -> Result<Option<Transaction>, database::Error> {
608        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
609        Ok(query(
610            r#"
611            SELECT
612                mint_url,
613                direction,
614                unit,
615                amount,
616                fee,
617                ys,
618                timestamp,
619                memo,
620                metadata,
621                quote_id,
622                payment_request,
623                payment_proof,
624                payment_method,
625                saga_id
626            FROM
627                transactions
628            WHERE
629                id = :id
630            "#,
631        )?
632        .bind("id", transaction_id.as_slice().to_vec())
633        .fetch_one(&*conn)
634        .await?
635        .map(sql_row_to_transaction)
636        .transpose()?)
637    }
638
639    #[instrument(skip(self))]
640    async fn list_transactions(
641        &self,
642        mint_url: Option<MintUrl>,
643        direction: Option<TransactionDirection>,
644        unit: Option<CurrencyUnit>,
645    ) -> Result<Vec<Transaction>, database::Error> {
646        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
647
648        Ok(query(
649            r#"
650            SELECT
651                mint_url,
652                direction,
653                unit,
654                amount,
655                fee,
656                ys,
657                timestamp,
658                memo,
659                metadata,
660                quote_id,
661                payment_request,
662                payment_proof,
663                payment_method,
664                saga_id
665            FROM
666                transactions
667            "#,
668        )?
669        .fetch_all(&*conn)
670        .await?
671        .into_iter()
672        .filter_map(|row| {
673            // TODO: Avoid a table scan by passing the heavy lifting of checking to the DB engine
674            let transaction = sql_row_to_transaction(row).ok()?;
675            if transaction.matches_conditions(&mint_url, &direction, &unit) {
676                Some(transaction)
677            } else {
678                None
679            }
680        })
681        .collect::<Vec<_>>())
682    }
683
684    #[instrument(skip(self))]
685    async fn update_proofs(
686        &self,
687        added: Vec<ProofInfo>,
688        removed_ys: Vec<PublicKey>,
689    ) -> Result<(), database::Error> {
690        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
691        let tx = ConnectionWithTransaction::new(conn).await?;
692
693        for proof in added {
694            query(
695                r#"
696    INSERT INTO proof
697    (y, mint_url, state, spending_condition, unit, amount, keyset_id, secret, c, witness, dleq_e, dleq_s, dleq_r, used_by_operation, created_by_operation)
698    VALUES
699    (:y, :mint_url, :state, :spending_condition, :unit, :amount, :keyset_id, :secret, :c, :witness, :dleq_e, :dleq_s, :dleq_r, :used_by_operation, :created_by_operation)
700    ON CONFLICT(y) DO UPDATE SET
701        mint_url = excluded.mint_url,
702        state = excluded.state,
703        spending_condition = excluded.spending_condition,
704        unit = excluded.unit,
705        amount = excluded.amount,
706        keyset_id = excluded.keyset_id,
707        secret = excluded.secret,
708        c = excluded.c,
709        witness = excluded.witness,
710        dleq_e = excluded.dleq_e,
711        dleq_s = excluded.dleq_s,
712        dleq_r = excluded.dleq_r,
713        used_by_operation = excluded.used_by_operation,
714        created_by_operation = excluded.created_by_operation
715    ;
716            "#,
717            )?
718            .bind("y", proof.y.to_bytes().to_vec())
719            .bind("mint_url", proof.mint_url.to_string())
720            .bind("state", proof.state.to_string())
721            .bind(
722                "spending_condition",
723                proof
724                    .spending_condition
725                    .map(|s| serde_json::to_string(&s).ok()),
726            )
727            .bind("unit", proof.unit.to_string())
728            .bind("amount", u64::from(proof.proof.amount) as i64)
729            .bind("keyset_id", proof.proof.keyset_id.to_string())
730            .bind("secret", proof.proof.secret.to_string())
731            .bind("c", proof.proof.c.to_bytes().to_vec())
732            .bind(
733                "witness",
734                proof
735                    .proof
736                    .witness
737                    .and_then(|w| serde_json::to_string(&w).ok()),
738            )
739            .bind(
740                "dleq_e",
741                proof.proof.dleq.as_ref().map(|dleq| dleq.e.to_secret_bytes().to_vec()),
742            )
743            .bind(
744                "dleq_s",
745                proof.proof.dleq.as_ref().map(|dleq| dleq.s.to_secret_bytes().to_vec()),
746            )
747            .bind(
748                "dleq_r",
749                proof.proof.dleq.as_ref().map(|dleq| dleq.r.to_secret_bytes().to_vec()),
750            )
751            .bind("used_by_operation", proof.used_by_operation.map(|id| id.to_string()))
752            .bind("created_by_operation", proof.created_by_operation.map(|id| id.to_string()))
753            .execute(&tx)
754            .await?;
755        }
756
757        if !removed_ys.is_empty() {
758            query(r#"DELETE FROM proof WHERE y IN (:ys)"#)?
759                .bind_vec(
760                    "ys",
761                    removed_ys.iter().map(|y| y.to_bytes().to_vec()).collect(),
762                )
763                .execute(&tx)
764                .await?;
765        }
766
767        tx.commit().await?;
768
769        Ok(())
770    }
771
772    #[instrument(skip(self))]
773    async fn update_proofs_state(
774        &self,
775        ys: Vec<PublicKey>,
776        state: State,
777    ) -> Result<(), database::Error> {
778        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
779
780        query("UPDATE proof SET state = :state WHERE y IN (:ys)")?
781            .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
782            .bind("state", state.to_string())
783            .execute(&*conn)
784            .await?;
785
786        Ok(())
787    }
788
789    #[instrument(skip(self))]
790    async fn add_transaction(&self, transaction: Transaction) -> Result<(), database::Error> {
791        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
792
793        let mint_url = transaction.mint_url.to_string();
794        let direction = transaction.direction.to_string();
795        let unit = transaction.unit.to_string();
796        let amount = u64::from(transaction.amount) as i64;
797        let fee = u64::from(transaction.fee) as i64;
798        let ys = transaction
799            .ys
800            .iter()
801            .flat_map(|y| y.to_bytes().to_vec())
802            .collect::<Vec<_>>();
803
804        let id = transaction.id();
805
806        query(
807               r#"
808   INSERT INTO transactions
809   (id, mint_url, direction, unit, amount, fee, ys, timestamp, memo, metadata, quote_id, payment_request, payment_proof, payment_method, saga_id)
810   VALUES
811   (:id, :mint_url, :direction, :unit, :amount, :fee, :ys, :timestamp, :memo, :metadata, :quote_id, :payment_request, :payment_proof, :payment_method, :saga_id)
812   ON CONFLICT(id) DO UPDATE SET
813       mint_url = excluded.mint_url,
814       direction = excluded.direction,
815       unit = excluded.unit,
816       amount = excluded.amount,
817       fee = excluded.fee,
818       timestamp = excluded.timestamp,
819       memo = excluded.memo,
820       metadata = excluded.metadata,
821       quote_id = excluded.quote_id,
822       payment_request = excluded.payment_request,
823       payment_proof = excluded.payment_proof,
824       payment_method = excluded.payment_method,
825       saga_id = excluded.saga_id
826   ;
827           "#,
828           )?
829           .bind("id", id.as_slice().to_vec())
830           .bind("mint_url", mint_url)
831           .bind("direction", direction)
832           .bind("unit", unit)
833           .bind("amount", amount)
834           .bind("fee", fee)
835           .bind("ys", ys)
836           .bind("timestamp", transaction.timestamp as i64)
837           .bind("memo", transaction.memo)
838           .bind(
839               "metadata",
840               serde_json::to_string(&transaction.metadata).map_err(Error::from)?,
841           )
842           .bind("quote_id", transaction.quote_id)
843           .bind("payment_request", transaction.payment_request)
844           .bind("payment_proof", transaction.payment_proof)
845           .bind("payment_method", transaction.payment_method.map(|pm| pm.to_string()))
846           .bind("saga_id", transaction.saga_id.map(|id| id.to_string()))
847           .execute(&*conn)
848           .await?;
849
850        Ok(())
851    }
852
853    #[instrument(skip(self))]
854    async fn update_mint_url(
855        &self,
856        old_mint_url: MintUrl,
857        new_mint_url: MintUrl,
858    ) -> Result<(), database::Error> {
859        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
860        let tx = ConnectionWithTransaction::new(conn).await?;
861        let tables = ["mint_quote", "proof"];
862
863        for table in &tables {
864            query(&format!(
865                r#"
866                UPDATE {table}
867                SET mint_url = :new_mint_url
868                WHERE mint_url = :old_mint_url
869            "#
870            ))?
871            .bind("new_mint_url", new_mint_url.to_string())
872            .bind("old_mint_url", old_mint_url.to_string())
873            .execute(&tx)
874            .await?;
875        }
876
877        tx.commit().await?;
878
879        Ok(())
880    }
881
882    #[instrument(skip(self), fields(keyset_id = %keyset_id))]
883    async fn increment_keyset_counter(
884        &self,
885        keyset_id: &Id,
886        count: u32,
887    ) -> Result<u32, database::Error> {
888        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
889
890        let new_counter = query(
891            r#"
892            INSERT INTO keyset_counter (keyset_id, counter)
893            VALUES (:keyset_id, :count)
894            ON CONFLICT(keyset_id) DO UPDATE SET
895                counter = keyset_counter.counter + :count
896            RETURNING counter
897            "#,
898        )?
899        .bind("keyset_id", keyset_id.to_string())
900        .bind("count", count)
901        .pluck(&*conn)
902        .await?
903        .map(|n| Ok::<_, Error>(column_as_number!(n)))
904        .transpose()?
905        .ok_or_else(|| Error::Internal("Counter update returned no value".to_owned()))?;
906
907        Ok(new_counter)
908    }
909
910    #[instrument(skip(self, mint_info))]
911    async fn add_mint(
912        &self,
913        mint_url: MintUrl,
914        mint_info: Option<MintInfo>,
915    ) -> Result<(), database::Error> {
916        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
917
918        let (
919            name,
920            pubkey,
921            version,
922            description,
923            description_long,
924            contact,
925            nuts,
926            icon_url,
927            urls,
928            motd,
929            time,
930            tos_url,
931        ) = match mint_info {
932            Some(mint_info) => {
933                let MintInfo {
934                    name,
935                    pubkey,
936                    version,
937                    description,
938                    description_long,
939                    contact,
940                    nuts,
941                    icon_url,
942                    urls,
943                    motd,
944                    time,
945                    tos_url,
946                } = mint_info;
947
948                (
949                    name,
950                    pubkey.map(|p| p.to_bytes().to_vec()),
951                    version.map(|v| serde_json::to_string(&v).ok()),
952                    description,
953                    description_long,
954                    contact.map(|c| serde_json::to_string(&c).ok()),
955                    serde_json::to_string(&nuts).ok(),
956                    icon_url,
957                    urls.map(|c| serde_json::to_string(&c).ok()),
958                    motd,
959                    time,
960                    tos_url,
961                )
962            }
963            None => (
964                None, None, None, None, None, None, None, None, None, None, None, None,
965            ),
966        };
967
968        query(
969            r#"
970   INSERT INTO mint
971   (
972       mint_url, name, pubkey, version, description, description_long,
973       contact, nuts, icon_url, urls, motd, mint_time, tos_url
974   )
975   VALUES
976   (
977       :mint_url, :name, :pubkey, :version, :description, :description_long,
978       :contact, :nuts, :icon_url, :urls, :motd, :mint_time, :tos_url
979   )
980   ON CONFLICT(mint_url) DO UPDATE SET
981       name = excluded.name,
982       pubkey = excluded.pubkey,
983       version = excluded.version,
984       description = excluded.description,
985       description_long = excluded.description_long,
986       contact = excluded.contact,
987       nuts = excluded.nuts,
988       icon_url = excluded.icon_url,
989       urls = excluded.urls,
990       motd = excluded.motd,
991       mint_time = excluded.mint_time,
992       tos_url = excluded.tos_url
993   ;
994           "#,
995        )?
996        .bind("mint_url", mint_url.to_string())
997        .bind("name", name)
998        .bind("pubkey", pubkey)
999        .bind("version", version)
1000        .bind("description", description)
1001        .bind("description_long", description_long)
1002        .bind("contact", contact)
1003        .bind("nuts", nuts)
1004        .bind("icon_url", icon_url)
1005        .bind("urls", urls)
1006        .bind("motd", motd)
1007        .bind("mint_time", time.map(|v| v as i64))
1008        .bind("tos_url", tos_url)
1009        .execute(&*conn)
1010        .await?;
1011
1012        Ok(())
1013    }
1014
1015    #[instrument(skip(self))]
1016    async fn remove_mint(&self, mint_url: MintUrl) -> Result<(), database::Error> {
1017        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1018
1019        query(r#"DELETE FROM mint WHERE mint_url=:mint_url"#)?
1020            .bind("mint_url", mint_url.to_string())
1021            .execute(&*conn)
1022            .await?;
1023
1024        Ok(())
1025    }
1026
1027    #[instrument(skip(self, keysets))]
1028    async fn add_mint_keysets(
1029        &self,
1030        mint_url: MintUrl,
1031        keysets: Vec<KeySetInfo>,
1032    ) -> Result<(), database::Error> {
1033        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1034        let tx = ConnectionWithTransaction::new(conn).await?;
1035
1036        for keyset in keysets {
1037            query(
1038                r#"
1039        INSERT INTO keyset
1040        (mint_url, id, unit, active, input_fee_ppk, final_expiry, keyset_u32)
1041        VALUES
1042        (:mint_url, :id, :unit, :active, :input_fee_ppk, :final_expiry, :keyset_u32)
1043        ON CONFLICT(id) DO UPDATE SET
1044            active = excluded.active,
1045            input_fee_ppk = excluded.input_fee_ppk
1046        "#,
1047            )?
1048            .bind("mint_url", mint_url.to_string())
1049            .bind("id", keyset.id.to_string())
1050            .bind("unit", keyset.unit.to_string())
1051            .bind("active", keyset.active)
1052            .bind("input_fee_ppk", keyset.input_fee_ppk as i64)
1053            .bind("final_expiry", keyset.final_expiry.map(|v| v as i64))
1054            .bind("keyset_u32", u32::from(keyset.id))
1055            .execute(&tx)
1056            .await?;
1057        }
1058
1059        tx.commit().await?;
1060
1061        Ok(())
1062    }
1063
1064    #[instrument(skip_all)]
1065    async fn add_mint_quote(&self, quote: MintQuote) -> Result<(), database::Error> {
1066        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1067
1068        let expected_version = quote.version;
1069        let new_version = expected_version.wrapping_add(1);
1070
1071        let rows_affected = query(
1072                r#"
1073    INSERT INTO mint_quote
1074    (id, mint_url, amount, unit, request, state, expiry, secret_key, payment_method, amount_issued, amount_paid, version)
1075    VALUES
1076    (:id, :mint_url, :amount, :unit, :request, :state, :expiry, :secret_key, :payment_method, :amount_issued, :amount_paid, :version)
1077    ON CONFLICT(id) DO UPDATE SET
1078        mint_url = excluded.mint_url,
1079        amount = excluded.amount,
1080        unit = excluded.unit,
1081        request = excluded.request,
1082        state = excluded.state,
1083        expiry = excluded.expiry,
1084        secret_key = excluded.secret_key,
1085        payment_method = excluded.payment_method,
1086        amount_issued = excluded.amount_issued,
1087        amount_paid = excluded.amount_paid,
1088        version = :new_version
1089    WHERE mint_quote.version = :expected_version
1090    ;
1091            "#,
1092            )?
1093            .bind("id", quote.id.to_string())
1094            .bind("mint_url", quote.mint_url.to_string())
1095            .bind("amount", quote.amount.map(|a| a.to_i64()))
1096            .bind("unit", quote.unit.to_string())
1097            .bind("request", quote.request)
1098            .bind("state", quote.state.to_string())
1099            .bind("expiry", quote.expiry as i64)
1100            .bind("secret_key", quote.secret_key.map(|p| p.to_string()))
1101            .bind("payment_method", quote.payment_method.to_string())
1102            .bind("amount_issued", quote.amount_issued.to_i64())
1103            .bind("amount_paid", quote.amount_paid.to_i64())
1104            .bind("version", quote.version as i64)
1105            .bind("new_version", new_version as i64)
1106            .bind("expected_version", expected_version as i64)
1107            .execute(&*conn).await?;
1108
1109        if rows_affected == 0 {
1110            return Err(database::Error::ConcurrentUpdate);
1111        }
1112
1113        Ok(())
1114    }
1115
1116    #[instrument(skip(self))]
1117    async fn remove_mint_quote(&self, quote_id: &str) -> Result<(), database::Error> {
1118        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1119
1120        query(r#"DELETE FROM mint_quote WHERE id=:id"#)?
1121            .bind("id", quote_id.to_string())
1122            .execute(&*conn)
1123            .await?;
1124
1125        Ok(())
1126    }
1127
1128    #[instrument(skip_all)]
1129    async fn add_melt_quote(&self, quote: wallet::MeltQuote) -> Result<(), database::Error> {
1130        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1131
1132        let expected_version = quote.version;
1133        let new_version = expected_version.wrapping_add(1);
1134
1135        let rows_affected = query(
1136            r#"
1137 INSERT INTO melt_quote
1138 (id, unit, amount, request, fee_reserve, state, expiry, payment_method, version)
1139 VALUES
1140 (:id, :unit, :amount, :request, :fee_reserve, :state, :expiry, :payment_method, :version)
1141 ON CONFLICT(id) DO UPDATE SET
1142     unit = excluded.unit,
1143     amount = excluded.amount,
1144     request = excluded.request,
1145     fee_reserve = excluded.fee_reserve,
1146     state = excluded.state,
1147     expiry = excluded.expiry,
1148     payment_method = excluded.payment_method,
1149     version = :new_version
1150 WHERE melt_quote.version = :expected_version
1151 ;
1152         "#,
1153        )?
1154        .bind("id", quote.id.to_string())
1155        .bind("unit", quote.unit.to_string())
1156        .bind("amount", u64::from(quote.amount) as i64)
1157        .bind("request", quote.request)
1158        .bind("fee_reserve", u64::from(quote.fee_reserve) as i64)
1159        .bind("state", quote.state.to_string())
1160        .bind("expiry", quote.expiry as i64)
1161        .bind("payment_method", quote.payment_method.to_string())
1162        .bind("version", quote.version as i64)
1163        .bind("new_version", new_version as i64)
1164        .bind("expected_version", expected_version as i64)
1165        .execute(&*conn)
1166        .await?;
1167
1168        if rows_affected == 0 {
1169            return Err(database::Error::ConcurrentUpdate);
1170        }
1171
1172        Ok(())
1173    }
1174
1175    #[instrument(skip(self))]
1176    async fn remove_melt_quote(&self, quote_id: &str) -> Result<(), database::Error> {
1177        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1178
1179        query(r#"DELETE FROM melt_quote WHERE id=:id"#)?
1180            .bind("id", quote_id.to_owned())
1181            .execute(&*conn)
1182            .await?;
1183
1184        Ok(())
1185    }
1186
1187    #[instrument(skip_all)]
1188    async fn add_keys(&self, keyset: KeySet) -> Result<(), database::Error> {
1189        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1190
1191        keyset.verify_id()?;
1192
1193        query(
1194            r#"
1195                INSERT INTO key
1196                (id, keys, keyset_u32)
1197                VALUES
1198                (:id, :keys, :keyset_u32)
1199            "#,
1200        )?
1201        .bind("id", keyset.id.to_string())
1202        .bind(
1203            "keys",
1204            serde_json::to_string(&keyset.keys).map_err(Error::from)?,
1205        )
1206        .bind("keyset_u32", u32::from(keyset.id))
1207        .execute(&*conn)
1208        .await?;
1209
1210        Ok(())
1211    }
1212
1213    #[instrument(skip(self))]
1214    async fn remove_keys(&self, id: &Id) -> Result<(), database::Error> {
1215        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1216
1217        query(r#"DELETE FROM key WHERE id = :id"#)?
1218            .bind("id", id.to_string())
1219            .execute(&*conn)
1220            .await?;
1221
1222        Ok(())
1223    }
1224
1225    #[instrument(skip(self))]
1226    async fn remove_transaction(
1227        &self,
1228        transaction_id: TransactionId,
1229    ) -> Result<(), database::Error> {
1230        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1231
1232        query(r#"DELETE FROM transactions WHERE id=:id"#)?
1233            .bind("id", transaction_id.as_slice().to_vec())
1234            .execute(&*conn)
1235            .await?;
1236
1237        Ok(())
1238    }
1239
1240    #[instrument(skip(self))]
1241    async fn add_saga(&self, saga: wallet::WalletSaga) -> Result<(), database::Error> {
1242        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1243
1244        let state_json = serde_json::to_string(&saga.state).map_err(|e| {
1245            Error::Database(Box::new(std::io::Error::new(
1246                std::io::ErrorKind::InvalidData,
1247                format!("Failed to serialize saga state: {}", e),
1248            )))
1249        })?;
1250
1251        let data_json = serde_json::to_string(&saga.data).map_err(|e| {
1252            Error::Database(Box::new(std::io::Error::new(
1253                std::io::ErrorKind::InvalidData,
1254                format!("Failed to serialize saga data: {}", e),
1255            )))
1256        })?;
1257
1258        query(
1259            r#"
1260            INSERT INTO wallet_sagas
1261            (id, kind, state, amount, mint_url, unit, quote_id, created_at, updated_at, data, version)
1262            VALUES
1263            (:id, :kind, :state, :amount, :mint_url, :unit, :quote_id, :created_at, :updated_at, :data, :version)
1264            "#,
1265        )?
1266        .bind("id", saga.id.to_string())
1267        .bind("kind", saga.kind.to_string())
1268        .bind("state", state_json)
1269        .bind("amount", u64::from(saga.amount) as i64)
1270        .bind("mint_url", saga.mint_url.to_string())
1271        .bind("unit", saga.unit.to_string())
1272        .bind("quote_id", saga.quote_id)
1273        .bind("created_at", saga.created_at as i64)
1274        .bind("updated_at", saga.updated_at as i64)
1275        .bind("data", data_json)
1276        .bind("version", saga.version as i64)
1277        .execute(&*conn)
1278        .await?;
1279
1280        Ok(())
1281    }
1282
1283    #[instrument(skip(self))]
1284    async fn get_saga(
1285        &self,
1286        id: &uuid::Uuid,
1287    ) -> Result<Option<wallet::WalletSaga>, database::Error> {
1288        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1289
1290        let rows = query(
1291            r#"
1292            SELECT id, kind, state, amount, mint_url, unit, quote_id, created_at, updated_at, data, version
1293            FROM wallet_sagas
1294            WHERE id = :id
1295            "#,
1296        )?
1297        .bind("id", id.to_string())
1298        .fetch_all(&*conn)
1299        .await?;
1300
1301        match rows.into_iter().next() {
1302            Some(row) => Ok(Some(sql_row_to_wallet_saga(row)?)),
1303            None => Ok(None),
1304        }
1305    }
1306
1307    #[instrument(skip(self))]
1308    async fn update_saga(&self, saga: wallet::WalletSaga) -> Result<bool, database::Error> {
1309        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1310
1311        let state_json = serde_json::to_string(&saga.state).map_err(|e| {
1312            Error::Database(Box::new(std::io::Error::new(
1313                std::io::ErrorKind::InvalidData,
1314                format!("Failed to serialize saga state: {}", e),
1315            )))
1316        })?;
1317
1318        let data_json = serde_json::to_string(&saga.data).map_err(|e| {
1319            Error::Database(Box::new(std::io::Error::new(
1320                std::io::ErrorKind::InvalidData,
1321                format!("Failed to serialize saga data: {}", e),
1322            )))
1323        })?;
1324
1325        // Optimistic locking: only update if the version matches the expected value.
1326        // The saga.version has already been incremented by the caller, so we check
1327        // for (saga.version - 1) in the WHERE clause.
1328        let expected_version = saga.version.saturating_sub(1);
1329
1330        let rows_affected = query(
1331            r#"
1332            UPDATE wallet_sagas
1333            SET kind = :kind, state = :state, amount = :amount, mint_url = :mint_url,
1334                unit = :unit, quote_id = :quote_id, updated_at = :updated_at, data = :data,
1335                version = :new_version
1336            WHERE id = :id AND version = :expected_version
1337            "#,
1338        )?
1339        .bind("id", saga.id.to_string())
1340        .bind("kind", saga.kind.to_string())
1341        .bind("state", state_json)
1342        .bind("amount", u64::from(saga.amount) as i64)
1343        .bind("mint_url", saga.mint_url.to_string())
1344        .bind("unit", saga.unit.to_string())
1345        .bind("quote_id", saga.quote_id)
1346        .bind("updated_at", saga.updated_at as i64)
1347        .bind("data", data_json)
1348        .bind("new_version", saga.version as i64)
1349        .bind("expected_version", expected_version as i64)
1350        .execute(&*conn)
1351        .await?;
1352
1353        // Return true if the update succeeded (version matched), false if version mismatch
1354        Ok(rows_affected > 0)
1355    }
1356
1357    #[instrument(skip(self))]
1358    async fn delete_saga(&self, id: &uuid::Uuid) -> Result<(), database::Error> {
1359        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1360
1361        query(r#"DELETE FROM wallet_sagas WHERE id = :id"#)?
1362            .bind("id", id.to_string())
1363            .execute(&*conn)
1364            .await?;
1365
1366        Ok(())
1367    }
1368
1369    #[instrument(skip(self))]
1370    async fn get_incomplete_sagas(&self) -> Result<Vec<wallet::WalletSaga>, database::Error> {
1371        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1372
1373        let rows = query(
1374            r#"
1375            SELECT id, kind, state, amount, mint_url, unit, quote_id, created_at, updated_at, data, version
1376            FROM wallet_sagas
1377            ORDER BY created_at ASC
1378            "#,
1379        )?
1380        .fetch_all(&*conn)
1381        .await?;
1382
1383        rows.into_iter().map(sql_row_to_wallet_saga).collect()
1384    }
1385
1386    #[instrument(skip(self))]
1387    async fn reserve_proofs(
1388        &self,
1389        ys: Vec<PublicKey>,
1390        operation_id: &uuid::Uuid,
1391    ) -> Result<(), database::Error> {
1392        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1393
1394        for y in ys {
1395            let rows_affected = query(
1396                r#"
1397                UPDATE proof
1398                SET state = 'RESERVED', used_by_operation = :operation_id
1399                WHERE y = :y AND state = 'UNSPENT'
1400                "#,
1401            )?
1402            .bind("y", y.to_bytes().to_vec())
1403            .bind("operation_id", operation_id.to_string())
1404            .execute(&*conn)
1405            .await?;
1406
1407            if rows_affected == 0 {
1408                return Err(database::Error::ProofNotUnspent);
1409            }
1410        }
1411
1412        Ok(())
1413    }
1414
1415    #[instrument(skip(self))]
1416    async fn release_proofs(&self, operation_id: &uuid::Uuid) -> Result<(), database::Error> {
1417        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1418
1419        query(
1420            r#"
1421            UPDATE proof
1422            SET state = 'UNSPENT', used_by_operation = NULL
1423            WHERE used_by_operation = :operation_id
1424            "#,
1425        )?
1426        .bind("operation_id", operation_id.to_string())
1427        .execute(&*conn)
1428        .await?;
1429
1430        Ok(())
1431    }
1432
1433    #[instrument(skip(self))]
1434    async fn get_reserved_proofs(
1435        &self,
1436        operation_id: &uuid::Uuid,
1437    ) -> Result<Vec<ProofInfo>, database::Error> {
1438        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1439
1440        let rows = query(
1441            r#"
1442            SELECT
1443                amount,
1444                unit,
1445                keyset_id,
1446                secret,
1447                c,
1448                witness,
1449                dleq_e,
1450                dleq_s,
1451                dleq_r,
1452                y,
1453                mint_url,
1454                state,
1455                spending_condition,
1456                used_by_operation,
1457                created_by_operation
1458            FROM proof
1459            WHERE used_by_operation = :operation_id
1460            "#,
1461        )?
1462        .bind("operation_id", operation_id.to_string())
1463        .fetch_all(&*conn)
1464        .await?;
1465
1466        rows.into_iter().map(sql_row_to_proof_info).collect()
1467    }
1468
1469    #[instrument(skip(self))]
1470    async fn reserve_melt_quote(
1471        &self,
1472        quote_id: &str,
1473        operation_id: &uuid::Uuid,
1474    ) -> Result<(), database::Error> {
1475        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1476
1477        let rows_affected = query(
1478            r#"
1479            UPDATE melt_quote
1480            SET used_by_operation = :operation_id
1481            WHERE id = :quote_id AND used_by_operation IS NULL
1482            "#,
1483        )?
1484        .bind("operation_id", operation_id.to_string())
1485        .bind("quote_id", quote_id)
1486        .execute(&*conn)
1487        .await?;
1488
1489        if rows_affected == 0 {
1490            // Check if the quote exists
1491            let exists = query(
1492                r#"
1493                SELECT 1 FROM melt_quote WHERE id = :quote_id
1494                "#,
1495            )?
1496            .bind("quote_id", quote_id)
1497            .fetch_one(&*conn)
1498            .await?;
1499
1500            if exists.is_none() {
1501                return Err(database::Error::UnknownQuote);
1502            }
1503            return Err(database::Error::QuoteAlreadyInUse);
1504        }
1505
1506        Ok(())
1507    }
1508
1509    #[instrument(skip(self))]
1510    async fn release_melt_quote(&self, operation_id: &uuid::Uuid) -> Result<(), database::Error> {
1511        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1512
1513        query(
1514            r#"
1515            UPDATE melt_quote
1516            SET used_by_operation = NULL
1517            WHERE used_by_operation = :operation_id
1518            "#,
1519        )?
1520        .bind("operation_id", operation_id.to_string())
1521        .execute(&*conn)
1522        .await?;
1523
1524        Ok(())
1525    }
1526
1527    #[instrument(skip(self))]
1528    async fn reserve_mint_quote(
1529        &self,
1530        quote_id: &str,
1531        operation_id: &uuid::Uuid,
1532    ) -> Result<(), database::Error> {
1533        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1534
1535        let rows_affected = query(
1536            r#"
1537            UPDATE mint_quote
1538            SET used_by_operation = :operation_id
1539            WHERE id = :quote_id AND used_by_operation IS NULL
1540            "#,
1541        )?
1542        .bind("operation_id", operation_id.to_string())
1543        .bind("quote_id", quote_id)
1544        .execute(&*conn)
1545        .await?;
1546
1547        if rows_affected == 0 {
1548            // Check if the quote exists
1549            let exists = query(
1550                r#"
1551                SELECT 1 FROM mint_quote WHERE id = :quote_id
1552                "#,
1553            )?
1554            .bind("quote_id", quote_id)
1555            .fetch_one(&*conn)
1556            .await?;
1557
1558            if exists.is_none() {
1559                return Err(database::Error::UnknownQuote);
1560            }
1561            return Err(database::Error::QuoteAlreadyInUse);
1562        }
1563
1564        Ok(())
1565    }
1566
1567    #[instrument(skip(self))]
1568    async fn release_mint_quote(&self, operation_id: &uuid::Uuid) -> Result<(), database::Error> {
1569        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1570
1571        query(
1572            r#"
1573            UPDATE mint_quote
1574            SET used_by_operation = NULL
1575            WHERE used_by_operation = :operation_id
1576            "#,
1577        )?
1578        .bind("operation_id", operation_id.to_string())
1579        .execute(&*conn)
1580        .await?;
1581
1582        Ok(())
1583    }
1584
1585    async fn kv_read(
1586        &self,
1587        primary_namespace: &str,
1588        secondary_namespace: &str,
1589        key: &str,
1590    ) -> Result<Option<Vec<u8>>, database::Error> {
1591        crate::keyvalue::kv_read(&self.pool, primary_namespace, secondary_namespace, key).await
1592    }
1593
1594    async fn kv_list(
1595        &self,
1596        primary_namespace: &str,
1597        secondary_namespace: &str,
1598    ) -> Result<Vec<String>, database::Error> {
1599        crate::keyvalue::kv_list(&self.pool, primary_namespace, secondary_namespace).await
1600    }
1601
1602    async fn kv_write(
1603        &self,
1604        primary_namespace: &str,
1605        secondary_namespace: &str,
1606        key: &str,
1607        value: &[u8],
1608    ) -> Result<(), database::Error> {
1609        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1610        crate::keyvalue::kv_write_standalone(
1611            &*conn,
1612            primary_namespace,
1613            secondary_namespace,
1614            key,
1615            value,
1616        )
1617        .await?;
1618        Ok(())
1619    }
1620
1621    async fn kv_remove(
1622        &self,
1623        primary_namespace: &str,
1624        secondary_namespace: &str,
1625        key: &str,
1626    ) -> Result<(), database::Error> {
1627        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1628        crate::keyvalue::kv_remove_standalone(&*conn, primary_namespace, secondary_namespace, key)
1629            .await?;
1630        Ok(())
1631    }
1632}
1633
1634fn sql_row_to_mint_info(row: Vec<Column>) -> Result<MintInfo, Error> {
1635    unpack_into!(
1636        let (
1637            name,
1638            pubkey,
1639            version,
1640            description,
1641            description_long,
1642            contact,
1643            nuts,
1644            icon_url,
1645            motd,
1646            urls,
1647            mint_time,
1648            tos_url
1649        ) = row
1650    );
1651
1652    Ok(MintInfo {
1653        name: column_as_nullable_string!(&name),
1654        pubkey: column_as_nullable_string!(&pubkey, |v| serde_json::from_str(v).ok(), |v| {
1655            serde_json::from_slice(v).ok()
1656        }),
1657        version: column_as_nullable_string!(&version).and_then(|v| serde_json::from_str(&v).ok()),
1658        description: column_as_nullable_string!(description),
1659        description_long: column_as_nullable_string!(description_long),
1660        contact: column_as_nullable_string!(contact, |v| serde_json::from_str(&v).ok()),
1661        nuts: column_as_nullable_string!(nuts, |v| serde_json::from_str(&v).ok())
1662            .unwrap_or_default(),
1663        urls: column_as_nullable_string!(urls, |v| serde_json::from_str(&v).ok()),
1664        icon_url: column_as_nullable_string!(icon_url),
1665        motd: column_as_nullable_string!(motd),
1666        time: column_as_nullable_number!(mint_time).map(|t| t),
1667        tos_url: column_as_nullable_string!(tos_url),
1668    })
1669}
1670
1671#[instrument(skip_all)]
1672fn sql_row_to_keyset(row: Vec<Column>) -> Result<KeySetInfo, Error> {
1673    unpack_into!(
1674        let (
1675            id,
1676            unit,
1677            active,
1678            input_fee_ppk,
1679            final_expiry
1680        ) = row
1681    );
1682
1683    Ok(KeySetInfo {
1684        id: column_as_string!(id, Id::from_str, Id::from_bytes),
1685        unit: column_as_string!(unit, CurrencyUnit::from_str),
1686        active: matches!(active, Column::Integer(1)),
1687        input_fee_ppk: column_as_nullable_number!(input_fee_ppk).unwrap_or(0),
1688        final_expiry: column_as_nullable_number!(final_expiry),
1689    })
1690}
1691
1692fn sql_row_to_mint_quote(row: Vec<Column>) -> Result<MintQuote, Error> {
1693    unpack_into!(
1694        let (
1695            id,
1696            mint_url,
1697            amount,
1698            unit,
1699            request,
1700            state,
1701            expiry,
1702            secret_key,
1703            row_method,
1704            row_amount_minted,
1705            row_amount_paid,
1706            used_by_operation,
1707            version
1708        ) = row
1709    );
1710
1711    let amount: Option<i64> = column_as_nullable_number!(amount);
1712
1713    let amount_paid: u64 = column_as_number!(row_amount_paid);
1714    let amount_minted: u64 = column_as_number!(row_amount_minted);
1715    let expiry_val: u64 = column_as_number!(expiry);
1716    let version_val: u32 = column_as_number!(version);
1717    let payment_method =
1718        PaymentMethod::from_str(&column_as_string!(row_method)).map_err(Error::from)?;
1719
1720    Ok(MintQuote {
1721        id: column_as_string!(id),
1722        mint_url: column_as_string!(mint_url, MintUrl::from_str),
1723        amount: amount.and_then(Amount::from_i64),
1724        unit: column_as_string!(unit, CurrencyUnit::from_str),
1725        request: column_as_string!(request),
1726        state: column_as_string!(state, MintQuoteState::from_str),
1727        expiry: expiry_val,
1728        secret_key: column_as_nullable_string!(secret_key, |s| SecretKey::from_str(&s).ok()),
1729        payment_method,
1730        amount_issued: Amount::from(amount_minted),
1731        amount_paid: Amount::from(amount_paid),
1732        used_by_operation: column_as_nullable_string!(used_by_operation),
1733        version: version_val,
1734    })
1735}
1736
1737fn sql_row_to_melt_quote(row: Vec<Column>) -> Result<wallet::MeltQuote, Error> {
1738    unpack_into!(
1739        let (
1740            id,
1741            unit,
1742            amount,
1743            request,
1744            fee_reserve,
1745            state,
1746            expiry,
1747            payment_preimage,
1748            row_method,
1749            used_by_operation,
1750            version
1751        ) = row
1752    );
1753
1754    let payment_method =
1755        PaymentMethod::from_str(&column_as_string!(row_method)).map_err(Error::from)?;
1756
1757    let amount_val: u64 = column_as_number!(amount);
1758    let fee_reserve_val: u64 = column_as_number!(fee_reserve);
1759    let expiry_val: u64 = column_as_number!(expiry);
1760    let version_val: u32 = column_as_number!(version);
1761
1762    Ok(wallet::MeltQuote {
1763        id: column_as_string!(id),
1764        unit: column_as_string!(unit, CurrencyUnit::from_str),
1765        amount: Amount::from(amount_val),
1766        request: column_as_string!(request),
1767        fee_reserve: Amount::from(fee_reserve_val),
1768        state: column_as_string!(state, MeltQuoteState::from_str),
1769        expiry: expiry_val,
1770        payment_preimage: column_as_nullable_string!(payment_preimage),
1771        payment_method,
1772        used_by_operation: column_as_nullable_string!(used_by_operation),
1773        version: version_val,
1774    })
1775}
1776
1777fn sql_row_to_proof_info(row: Vec<Column>) -> Result<ProofInfo, Error> {
1778    unpack_into!(
1779        let (
1780            amount,
1781            unit,
1782            keyset_id,
1783            secret,
1784            c,
1785            witness,
1786            dleq_e,
1787            dleq_s,
1788            dleq_r,
1789            y,
1790            mint_url,
1791            state,
1792            spending_condition,
1793            used_by_operation,
1794            created_by_operation
1795        ) = row
1796    );
1797
1798    let dleq = match (
1799        column_as_nullable_binary!(dleq_e),
1800        column_as_nullable_binary!(dleq_s),
1801        column_as_nullable_binary!(dleq_r),
1802    ) {
1803        (Some(e), Some(s), Some(r)) => {
1804            let e_key = SecretKey::from_slice(&e)?;
1805            let s_key = SecretKey::from_slice(&s)?;
1806            let r_key = SecretKey::from_slice(&r)?;
1807
1808            Some(ProofDleq::new(e_key, s_key, r_key))
1809        }
1810        _ => None,
1811    };
1812
1813    let amount: u64 = column_as_number!(amount);
1814    let proof = Proof {
1815        amount: Amount::from(amount),
1816        keyset_id: column_as_string!(keyset_id, Id::from_str),
1817        secret: column_as_string!(secret, Secret::from_str),
1818        witness: column_as_nullable_string!(witness, |v| { serde_json::from_str(&v).ok() }, |v| {
1819            serde_json::from_slice(&v).ok()
1820        }),
1821        c: column_as_string!(c, PublicKey::from_str, PublicKey::from_slice),
1822        dleq,
1823    };
1824
1825    let used_by_operation =
1826        column_as_nullable_string!(used_by_operation).and_then(|id| Uuid::from_str(&id).ok());
1827    let created_by_operation =
1828        column_as_nullable_string!(created_by_operation).and_then(|id| Uuid::from_str(&id).ok());
1829
1830    Ok(ProofInfo {
1831        proof,
1832        y: column_as_string!(y, PublicKey::from_str, PublicKey::from_slice),
1833        mint_url: column_as_string!(mint_url, MintUrl::from_str),
1834        state: column_as_string!(state, State::from_str),
1835        spending_condition: column_as_nullable_string!(
1836            spending_condition,
1837            |r| { serde_json::from_str(&r).ok() },
1838            |r| { serde_json::from_slice(&r).ok() }
1839        ),
1840        unit: column_as_string!(unit, CurrencyUnit::from_str),
1841        used_by_operation,
1842        created_by_operation,
1843    })
1844}
1845
1846fn sql_row_to_wallet_saga(row: Vec<Column>) -> Result<wallet::WalletSaga, Error> {
1847    unpack_into!(
1848        let (
1849            id,
1850            kind,
1851            state,
1852            amount,
1853            mint_url,
1854            unit,
1855            quote_id,
1856            created_at,
1857            updated_at,
1858            data,
1859            version
1860        ) = row
1861    );
1862
1863    let id_str: String = column_as_string!(id);
1864    let id = uuid::Uuid::parse_str(&id_str).map_err(|e| {
1865        Error::Database(Box::new(std::io::Error::new(
1866            std::io::ErrorKind::InvalidData,
1867            format!("Invalid UUID: {}", e),
1868        )))
1869    })?;
1870    let kind_str: String = column_as_string!(kind);
1871    let state_json: String = column_as_string!(state);
1872    let amount: u64 = column_as_number!(amount);
1873    let mint_url: MintUrl = column_as_string!(mint_url, MintUrl::from_str);
1874    let unit: CurrencyUnit = column_as_string!(unit, CurrencyUnit::from_str);
1875    let quote_id: Option<String> = column_as_nullable_string!(quote_id);
1876    let created_at: u64 = column_as_number!(created_at);
1877    let updated_at: u64 = column_as_number!(updated_at);
1878    let data_json: String = column_as_string!(data);
1879    let version: u32 = column_as_number!(version);
1880
1881    let kind = wallet::OperationKind::from_str(&kind_str).map_err(|_| {
1882        Error::Database(Box::new(std::io::Error::new(
1883            std::io::ErrorKind::InvalidData,
1884            format!("Invalid operation kind: {}", kind_str),
1885        )))
1886    })?;
1887    let state: wallet::WalletSagaState = serde_json::from_str(&state_json).map_err(|e| {
1888        Error::Database(Box::new(std::io::Error::new(
1889            std::io::ErrorKind::InvalidData,
1890            format!("Failed to deserialize saga state: {}", e),
1891        )))
1892    })?;
1893    let data: wallet::OperationData = serde_json::from_str(&data_json).map_err(|e| {
1894        Error::Database(Box::new(std::io::Error::new(
1895            std::io::ErrorKind::InvalidData,
1896            format!("Failed to deserialize saga data: {}", e),
1897        )))
1898    })?;
1899
1900    Ok(wallet::WalletSaga {
1901        id,
1902        kind,
1903        state,
1904        amount: Amount::from(amount),
1905        mint_url,
1906        unit,
1907        quote_id,
1908        created_at,
1909        updated_at,
1910        data,
1911        version,
1912    })
1913}
1914
1915fn sql_row_to_transaction(row: Vec<Column>) -> Result<Transaction, Error> {
1916    unpack_into!(
1917        let (
1918            mint_url,
1919            direction,
1920            unit,
1921            amount,
1922            fee,
1923            ys,
1924            timestamp,
1925            memo,
1926            metadata,
1927            quote_id,
1928            payment_request,
1929            payment_proof,
1930            payment_method,
1931            saga_id
1932        ) = row
1933    );
1934
1935    let amount: u64 = column_as_number!(amount);
1936    let fee: u64 = column_as_number!(fee);
1937
1938    let saga_id: Option<Uuid> = column_as_nullable_string!(saga_id)
1939        .map(|id| Uuid::from_str(&id).ok())
1940        .flatten();
1941
1942    Ok(Transaction {
1943        mint_url: column_as_string!(mint_url, MintUrl::from_str),
1944        direction: column_as_string!(direction, TransactionDirection::from_str),
1945        unit: column_as_string!(unit, CurrencyUnit::from_str),
1946        amount: Amount::from(amount),
1947        fee: Amount::from(fee),
1948        ys: column_as_binary!(ys)
1949            .chunks(33)
1950            .map(PublicKey::from_slice)
1951            .collect::<Result<Vec<_>, _>>()?,
1952        timestamp: column_as_number!(timestamp),
1953        memo: column_as_nullable_string!(memo),
1954        metadata: column_as_nullable_string!(metadata, |v| serde_json::from_str(&v).ok(), |v| {
1955            serde_json::from_slice(&v).ok()
1956        })
1957        .unwrap_or_default(),
1958        quote_id: column_as_nullable_string!(quote_id),
1959        payment_request: column_as_nullable_string!(payment_request),
1960        payment_proof: column_as_nullable_string!(payment_proof),
1961        payment_method: column_as_nullable_string!(payment_method)
1962            .map(|v| PaymentMethod::from_str(&v))
1963            .transpose()
1964            .map_err(Error::from)?,
1965        saga_id,
1966    })
1967}