Skip to main content

cdk_sql_common/wallet/
mod.rs

1//! SQLite Wallet Database
2
3use std::collections::HashMap;
4use std::fmt::Debug;
5use std::str::FromStr;
6use std::sync::Arc;
7
8use async_trait::async_trait;
9use bitcoin::bip32::DerivationPath;
10use cdk_common::database::{ConversionError, Error, WalletDatabase};
11use cdk_common::mint_url::MintUrl;
12use cdk_common::nuts::{MeltQuoteState, MintQuoteState};
13use cdk_common::secret::Secret;
14use cdk_common::util::unix_time;
15use cdk_common::wallet::{
16    self, MintQuote, ProofInfo, Transaction, TransactionDirection, TransactionId,
17};
18use cdk_common::{
19    database, Amount, CurrencyUnit, Id, KeySet, KeySetInfo, Keys, MintInfo, PaymentMethod, Proof,
20    ProofDleq, PublicKey, SecretKey, SpendingConditions, State,
21};
22use tracing::instrument;
23use uuid::Uuid;
24
25use crate::common::migrate;
26use crate::database::{ConnectionWithTransaction, DatabaseExecutor};
27use crate::pool::{DatabasePool, Pool, PooledResource};
28use crate::stmt::{query, Column};
29use crate::{
30    column_as_binary, column_as_nullable_binary, column_as_nullable_number,
31    column_as_nullable_string, column_as_number, column_as_string, unpack_into,
32};
33
34#[rustfmt::skip]
35mod migrations {
36    include!(concat!(env!("OUT_DIR"), "/migrations_wallet.rs"));
37}
38
39/// Wallet SQLite Database
40#[derive(Debug, Clone)]
41pub struct SQLWalletDatabase<RM>
42where
43    RM: DatabasePool + 'static,
44{
45    pool: Arc<Pool<RM>>,
46}
47
48impl<RM> SQLWalletDatabase<RM>
49where
50    RM: DatabasePool + 'static,
51{
52    /// Creates a new instance
53    pub async fn new<X>(db: X) -> Result<Self, Error>
54    where
55        X: Into<RM::Config>,
56    {
57        let pool = Pool::new(db.into());
58        Self::migrate(pool.get().map_err(|e| Error::Database(Box::new(e)))?).await?;
59
60        Ok(Self { pool })
61    }
62
63    /// Migrate [`WalletSqliteDatabase`]
64    async fn migrate(conn: PooledResource<RM>) -> Result<(), Error> {
65        let tx = ConnectionWithTransaction::new(conn).await?;
66        migrate(&tx, RM::Connection::name(), migrations::MIGRATIONS).await?;
67        // Update any existing keys with missing keyset_u32 values
68        Self::add_keyset_u32(&tx).await?;
69        tx.commit().await?;
70
71        Ok(())
72    }
73
74    async fn add_keyset_u32<T>(conn: &T) -> Result<(), Error>
75    where
76        T: DatabaseExecutor,
77    {
78        // First get the keysets where keyset_u32 on key is null
79        let keys_without_u32: Vec<Vec<Column>> = query(
80            r#"
81            SELECT
82                id
83            FROM key
84            WHERE keyset_u32 IS NULL
85            "#,
86        )?
87        .fetch_all(conn)
88        .await?;
89
90        for row in keys_without_u32 {
91            unpack_into!(let (id) = row);
92            let id = column_as_string!(id);
93
94            if let Ok(id) = Id::from_str(&id) {
95                query(
96                    r#"
97            UPDATE
98                key
99            SET keyset_u32 = :u32_keyset
100            WHERE id = :keyset_id
101            "#,
102                )?
103                .bind("u32_keyset", u32::from(id))
104                .bind("keyset_id", id.to_string())
105                .execute(conn)
106                .await?;
107            }
108        }
109
110        // Also update keysets where keyset_u32 is null
111        let keysets_without_u32: Vec<Vec<Column>> = query(
112            r#"
113            SELECT
114                id
115            FROM keyset
116            WHERE keyset_u32 IS NULL
117            "#,
118        )?
119        .fetch_all(conn)
120        .await?;
121
122        for row in keysets_without_u32 {
123            unpack_into!(let (id) = row);
124            let id = column_as_string!(id);
125
126            if let Ok(id) = Id::from_str(&id) {
127                query(
128                    r#"
129            UPDATE
130                keyset
131            SET keyset_u32 = :u32_keyset
132            WHERE id = :keyset_id
133            "#,
134                )?
135                .bind("u32_keyset", u32::from(id))
136                .bind("keyset_id", id.to_string())
137                .execute(conn)
138                .await?;
139            }
140        }
141
142        Ok(())
143    }
144}
145
146#[async_trait]
147impl<RM> WalletDatabase<database::Error> for SQLWalletDatabase<RM>
148where
149    RM: DatabasePool + 'static,
150{
151    #[instrument(skip(self))]
152    async fn get_melt_quotes(&self) -> Result<Vec<wallet::MeltQuote>, database::Error> {
153        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
154
155        Ok(query(
156            r#"
157              SELECT
158                  id,
159                  unit,
160                  amount,
161                  request,
162                  fee_reserve,
163                  state,
164                  expiry,
165                  payment_preimage,
166                  payment_method,
167                  used_by_operation,
168                  version,
169                  mint_url
170              FROM
171                  melt_quote
172              "#,
173        )?
174        .fetch_all(&*conn)
175        .await?
176        .into_iter()
177        .map(sql_row_to_melt_quote)
178        .collect::<Result<_, _>>()?)
179    }
180
181    #[instrument(skip(self))]
182    async fn get_mint(&self, mint_url: MintUrl) -> Result<Option<MintInfo>, database::Error> {
183        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
184        Ok(query(
185            r#"
186            SELECT
187                name,
188                pubkey,
189                version,
190                description,
191                description_long,
192                contact,
193                nuts,
194                icon_url,
195                motd,
196                urls,
197                mint_time,
198                tos_url
199            FROM
200                mint
201            WHERE mint_url = :mint_url
202            "#,
203        )?
204        .bind("mint_url", mint_url.to_string())
205        .fetch_one(&*conn)
206        .await?
207        .map(sql_row_to_mint_info)
208        .transpose()?)
209    }
210
211    #[instrument(skip(self))]
212    async fn get_mints(&self) -> Result<HashMap<MintUrl, Option<MintInfo>>, database::Error> {
213        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
214        Ok(query(
215            r#"
216                SELECT
217                    name,
218                    pubkey,
219                    version,
220                    description,
221                    description_long,
222                    contact,
223                    nuts,
224                    icon_url,
225                    motd,
226                    urls,
227                    mint_time,
228                    tos_url,
229                    mint_url
230                FROM
231                    mint
232                "#,
233        )?
234        .fetch_all(&*conn)
235        .await?
236        .into_iter()
237        .map(|mut row| {
238            let url = column_as_string!(
239                row.pop().ok_or(ConversionError::MissingColumn(0, 1))?,
240                MintUrl::from_str
241            );
242
243            Ok((url, sql_row_to_mint_info(row).ok()))
244        })
245        .collect::<Result<HashMap<_, _>, Error>>()?)
246    }
247
248    #[instrument(skip(self))]
249    async fn get_mint_keysets(
250        &self,
251        mint_url: MintUrl,
252    ) -> Result<Option<Vec<KeySetInfo>>, database::Error> {
253        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
254
255        let keysets = query(
256            r#"
257            SELECT
258                id,
259                unit,
260                active,
261                input_fee_ppk,
262                final_expiry
263            FROM
264                keyset
265            WHERE mint_url = :mint_url
266            "#,
267        )?
268        .bind("mint_url", mint_url.to_string())
269        .fetch_all(&*conn)
270        .await?
271        .into_iter()
272        .map(sql_row_to_keyset)
273        .collect::<Result<Vec<_>, Error>>()?;
274
275        match keysets.is_empty() {
276            false => Ok(Some(keysets)),
277            true => Ok(None),
278        }
279    }
280
281    #[instrument(skip(self), fields(keyset_id = %keyset_id))]
282    async fn get_keyset_by_id(
283        &self,
284        keyset_id: &Id,
285    ) -> Result<Option<KeySetInfo>, database::Error> {
286        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
287        query(
288            r#"
289            SELECT
290                id,
291                unit,
292                active,
293                input_fee_ppk,
294                final_expiry
295            FROM
296                keyset
297            WHERE id = :id
298            "#,
299        )?
300        .bind("id", keyset_id.to_string())
301        .fetch_one(&*conn)
302        .await?
303        .map(sql_row_to_keyset)
304        .transpose()
305    }
306
307    #[instrument(skip(self))]
308    async fn get_mint_quote(&self, quote_id: &str) -> Result<Option<MintQuote>, database::Error> {
309        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
310        query(
311            r#"
312            SELECT
313                id,
314                mint_url,
315                amount,
316                unit,
317                request,
318                state,
319                expiry,
320                secret_key,
321                payment_method,
322                amount_issued,
323                amount_paid,
324                used_by_operation,
325                version
326            FROM
327                mint_quote
328            WHERE
329                id = :id
330            "#,
331        )?
332        .bind("id", quote_id.to_string())
333        .fetch_one(&*conn)
334        .await?
335        .map(sql_row_to_mint_quote)
336        .transpose()
337    }
338
339    #[instrument(skip(self))]
340    async fn get_mint_quotes(&self) -> Result<Vec<MintQuote>, database::Error> {
341        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
342        Ok(query(
343            r#"
344            SELECT
345                id,
346                mint_url,
347                amount,
348                unit,
349                request,
350                state,
351                expiry,
352                secret_key,
353                payment_method,
354                amount_issued,
355                amount_paid,
356                used_by_operation,
357                version
358            FROM
359                mint_quote
360            "#,
361        )?
362        .fetch_all(&*conn)
363        .await?
364        .into_iter()
365        .map(sql_row_to_mint_quote)
366        .collect::<Result<_, _>>()?)
367    }
368
369    #[instrument(skip(self))]
370    async fn get_unissued_mint_quotes(&self) -> Result<Vec<MintQuote>, database::Error> {
371        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
372        Ok(query(
373            r#"
374            SELECT
375                id,
376                mint_url,
377                amount,
378                unit,
379                request,
380                state,
381                expiry,
382                secret_key,
383                payment_method,
384                amount_issued,
385                amount_paid,
386                used_by_operation,
387                version
388            FROM
389                mint_quote
390            WHERE
391                amount_issued = 0
392                OR
393                payment_method = 'bolt12'
394            "#,
395        )?
396        .fetch_all(&*conn)
397        .await?
398        .into_iter()
399        .map(sql_row_to_mint_quote)
400        .collect::<Result<_, _>>()?)
401    }
402
403    #[instrument(skip(self))]
404    async fn get_melt_quote(
405        &self,
406        quote_id: &str,
407    ) -> Result<Option<wallet::MeltQuote>, database::Error> {
408        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
409        query(
410            r#"
411            SELECT
412                id,
413                unit,
414                amount,
415                request,
416                fee_reserve,
417                state,
418                expiry,
419                payment_preimage,
420                payment_method,
421                used_by_operation,
422                version,
423                mint_url
424            FROM
425                melt_quote
426            WHERE
427                id=:id
428            "#,
429        )?
430        .bind("id", quote_id.to_owned())
431        .fetch_one(&*conn)
432        .await?
433        .map(sql_row_to_melt_quote)
434        .transpose()
435    }
436
437    #[instrument(skip(self), fields(keyset_id = %keyset_id))]
438    async fn get_keys(&self, keyset_id: &Id) -> Result<Option<Keys>, database::Error> {
439        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
440        query(
441            r#"
442            SELECT
443                keys
444            FROM key
445            WHERE id = :id
446            "#,
447        )?
448        .bind("id", keyset_id.to_string())
449        .pluck(&*conn)
450        .await?
451        .map(|keys| {
452            let keys = column_as_string!(keys);
453            serde_json::from_str(&keys).map_err(Error::from)
454        })
455        .transpose()
456    }
457
458    #[instrument(skip(self, state, spending_conditions))]
459    async fn get_proofs(
460        &self,
461        mint_url: Option<MintUrl>,
462        unit: Option<CurrencyUnit>,
463        state: Option<Vec<State>>,
464        spending_conditions: Option<Vec<SpendingConditions>>,
465    ) -> Result<Vec<ProofInfo>, database::Error> {
466        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
467        Ok(query(
468            r#"
469            SELECT
470                amount,
471                unit,
472                keyset_id,
473                secret,
474                c,
475                witness,
476                dleq_e,
477                dleq_s,
478                dleq_r,
479                y,
480                mint_url,
481                state,
482                spending_condition,
483                used_by_operation,
484                created_by_operation,
485                p2pk_e
486            FROM proof
487            "#,
488        )?
489        .fetch_all(&*conn)
490        .await?
491        .into_iter()
492        .filter_map(|row| {
493            let row = sql_row_to_proof_info(row).ok()?;
494
495            if row.matches_conditions(&mint_url, &unit, &state, &spending_conditions) {
496                Some(row)
497            } else {
498                None
499            }
500        })
501        .collect::<Vec<_>>())
502    }
503
504    #[instrument(skip(self, ys))]
505    async fn get_proofs_by_ys(
506        &self,
507        ys: Vec<PublicKey>,
508    ) -> Result<Vec<ProofInfo>, database::Error> {
509        if ys.is_empty() {
510            return Ok(Vec::new());
511        }
512
513        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
514        Ok(query(
515            r#"
516            SELECT
517                amount,
518                unit,
519                keyset_id,
520                secret,
521                c,
522                witness,
523                dleq_e,
524                dleq_s,
525                dleq_r,
526                y,
527                mint_url,
528                state,
529                spending_condition,
530                used_by_operation,
531                created_by_operation,
532                p2pk_e
533            FROM proof
534            WHERE y IN (:ys)
535        "#,
536        )?
537        .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
538        .fetch_all(&*conn)
539        .await?
540        .into_iter()
541        .filter_map(|row| sql_row_to_proof_info(row).ok())
542        .collect::<Vec<_>>())
543    }
544
545    async fn get_balance(
546        &self,
547        mint_url: Option<MintUrl>,
548        unit: Option<CurrencyUnit>,
549        states: Option<Vec<State>>,
550    ) -> Result<u64, database::Error> {
551        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
552
553        let mut query_str = "SELECT COALESCE(SUM(amount), 0) as total FROM proof".to_string();
554        let mut where_clauses = Vec::new();
555        let states = states
556            .unwrap_or_default()
557            .into_iter()
558            .map(|x| x.to_string())
559            .collect::<Vec<_>>();
560
561        if mint_url.is_some() {
562            where_clauses.push("mint_url = :mint_url");
563        }
564        if unit.is_some() {
565            where_clauses.push("unit = :unit");
566        }
567        if !states.is_empty() {
568            where_clauses.push("state IN (:states)");
569        }
570
571        if !where_clauses.is_empty() {
572            query_str.push_str(" WHERE ");
573            query_str.push_str(&where_clauses.join(" AND "));
574        }
575
576        let mut q = query(&query_str)?;
577
578        if let Some(ref mint_url) = mint_url {
579            q = q.bind("mint_url", mint_url.to_string());
580        }
581        if let Some(ref unit) = unit {
582            q = q.bind("unit", unit.to_string());
583        }
584
585        if !states.is_empty() {
586            q = q.bind_vec("states", states);
587        }
588
589        let balance = q
590            .pluck(&*conn)
591            .await?
592            .map(|n| {
593                // SQLite SUM returns INTEGER which we need to convert to u64
594                match n {
595                    crate::stmt::Column::Integer(i) => Ok(i as u64),
596                    crate::stmt::Column::Real(f) => Ok(f as u64),
597                    _ => Err(Error::Database(Box::new(std::io::Error::new(
598                        std::io::ErrorKind::InvalidData,
599                        "Invalid balance type",
600                    )))),
601                }
602            })
603            .transpose()?
604            .unwrap_or(0);
605
606        Ok(balance)
607    }
608
609    #[instrument(skip(self))]
610    async fn get_transaction(
611        &self,
612        transaction_id: TransactionId,
613    ) -> Result<Option<Transaction>, database::Error> {
614        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
615        Ok(query(
616            r#"
617            SELECT
618                mint_url,
619                direction,
620                unit,
621                amount,
622                fee,
623                ys,
624                timestamp,
625                memo,
626                metadata,
627                quote_id,
628                payment_request,
629                payment_proof,
630                payment_method,
631                saga_id
632            FROM
633                transactions
634            WHERE
635                id = :id
636            "#,
637        )?
638        .bind("id", transaction_id.as_slice().to_vec())
639        .fetch_one(&*conn)
640        .await?
641        .map(sql_row_to_transaction)
642        .transpose()?)
643    }
644
645    #[instrument(skip(self))]
646    async fn list_transactions(
647        &self,
648        mint_url: Option<MintUrl>,
649        direction: Option<TransactionDirection>,
650        unit: Option<CurrencyUnit>,
651    ) -> Result<Vec<Transaction>, database::Error> {
652        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
653
654        Ok(query(
655            r#"
656            SELECT
657                mint_url,
658                direction,
659                unit,
660                amount,
661                fee,
662                ys,
663                timestamp,
664                memo,
665                metadata,
666                quote_id,
667                payment_request,
668                payment_proof,
669                payment_method,
670                saga_id
671            FROM
672                transactions
673            "#,
674        )?
675        .fetch_all(&*conn)
676        .await?
677        .into_iter()
678        .filter_map(|row| {
679            // TODO: Avoid a table scan by passing the heavy lifting of checking to the DB engine
680            let transaction = sql_row_to_transaction(row).ok()?;
681            if transaction.matches_conditions(&mint_url, &direction, &unit) {
682                Some(transaction)
683            } else {
684                None
685            }
686        })
687        .collect::<Vec<_>>())
688    }
689
690    async fn update_proofs(
691        &self,
692        added: Vec<ProofInfo>,
693        removed_ys: Vec<PublicKey>,
694    ) -> Result<(), database::Error> {
695        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
696        let tx = ConnectionWithTransaction::new(conn).await?;
697
698        for proof in added {
699            query(
700                r#"
701    INSERT INTO proof
702    (y, mint_url, state, spending_condition, unit, amount, keyset_id, secret, c, witness, dleq_e, dleq_s, dleq_r, used_by_operation, created_by_operation, p2pk_e)
703    VALUES
704    (:y, :mint_url, :state, :spending_condition, :unit, :amount, :keyset_id, :secret, :c, :witness, :dleq_e, :dleq_s, :dleq_r, :used_by_operation, :created_by_operation, :p2pk_e)
705    ON CONFLICT(y) DO UPDATE SET
706        mint_url = excluded.mint_url,
707        state = excluded.state,
708        spending_condition = excluded.spending_condition,
709        unit = excluded.unit,
710        amount = excluded.amount,
711        keyset_id = excluded.keyset_id,
712        secret = excluded.secret,
713        c = excluded.c,
714        witness = excluded.witness,
715        dleq_e = excluded.dleq_e,
716        dleq_s = excluded.dleq_s,
717        dleq_r = excluded.dleq_r,
718        used_by_operation = excluded.used_by_operation,
719        created_by_operation = excluded.created_by_operation,
720        p2pk_e = excluded.p2pk_e
721    ;
722            "#,
723            )?
724            .bind("y", proof.y.to_bytes().to_vec())
725            .bind("mint_url", proof.mint_url.to_string())
726            .bind("state", proof.state.to_string())
727            .bind(
728                "spending_condition",
729                proof
730                    .spending_condition
731                    .map(|s| serde_json::to_string(&s).ok()),
732            )
733            .bind("unit", proof.unit.to_string())
734            .bind("amount", u64::from(proof.proof.amount) as i64)
735            .bind("keyset_id", proof.proof.keyset_id.to_string())
736            .bind("secret", proof.proof.secret.to_string())
737            .bind("c", proof.proof.c.to_bytes().to_vec())
738            .bind(
739                "witness",
740                proof
741                    .proof
742                    .witness
743                    .and_then(|w| serde_json::to_string(&w).ok()),
744            )
745            .bind(
746                "dleq_e",
747                proof.proof.dleq.as_ref().map(|dleq| dleq.e.to_secret_bytes().to_vec()),
748            )
749            .bind(
750                "dleq_s",
751                proof.proof.dleq.as_ref().map(|dleq| dleq.s.to_secret_bytes().to_vec()),
752            )
753            .bind(
754                "dleq_r",
755                proof.proof.dleq.as_ref().map(|dleq| dleq.r.to_secret_bytes().to_vec()),
756            )
757            .bind("used_by_operation", proof.used_by_operation.map(|id| id.to_string()))
758            .bind("created_by_operation", proof.created_by_operation.map(|id| id.to_string()))
759            .bind(
760                "p2pk_e",
761                proof
762                    .proof
763                    .p2pk_e
764                    .as_ref()
765                    .map(|pk| pk.to_bytes().to_vec()),
766            )
767            .execute(&tx)
768            .await?;
769        }
770
771        if !removed_ys.is_empty() {
772            query(r#"DELETE FROM proof WHERE y IN (:ys)"#)?
773                .bind_vec(
774                    "ys",
775                    removed_ys.iter().map(|y| y.to_bytes().to_vec()).collect(),
776                )
777                .execute(&tx)
778                .await?;
779        }
780
781        tx.commit().await?;
782
783        Ok(())
784    }
785
786    #[instrument(skip(self))]
787    async fn update_proofs_state(
788        &self,
789        ys: Vec<PublicKey>,
790        state: State,
791    ) -> Result<(), database::Error> {
792        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
793
794        query("UPDATE proof SET state = :state WHERE y IN (:ys)")?
795            .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
796            .bind("state", state.to_string())
797            .execute(&*conn)
798            .await?;
799
800        Ok(())
801    }
802
803    #[instrument(skip(self))]
804    async fn add_transaction(&self, transaction: Transaction) -> Result<(), database::Error> {
805        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
806
807        let mint_url = transaction.mint_url.to_string();
808        let direction = transaction.direction.to_string();
809        let unit = transaction.unit.to_string();
810        let amount = u64::from(transaction.amount) as i64;
811        let fee = u64::from(transaction.fee) as i64;
812        let ys = transaction
813            .ys
814            .iter()
815            .flat_map(|y| y.to_bytes().to_vec())
816            .collect::<Vec<_>>();
817
818        let id = transaction.id();
819
820        query(
821               r#"
822   INSERT INTO transactions
823   (id, mint_url, direction, unit, amount, fee, ys, timestamp, memo, metadata, quote_id, payment_request, payment_proof, payment_method, saga_id)
824   VALUES
825   (:id, :mint_url, :direction, :unit, :amount, :fee, :ys, :timestamp, :memo, :metadata, :quote_id, :payment_request, :payment_proof, :payment_method, :saga_id)
826   ON CONFLICT(id) DO UPDATE SET
827       mint_url = excluded.mint_url,
828       direction = excluded.direction,
829       unit = excluded.unit,
830       amount = excluded.amount,
831       fee = excluded.fee,
832       timestamp = excluded.timestamp,
833       memo = excluded.memo,
834       metadata = excluded.metadata,
835       quote_id = excluded.quote_id,
836       payment_request = excluded.payment_request,
837       payment_proof = excluded.payment_proof,
838       payment_method = excluded.payment_method,
839       saga_id = excluded.saga_id
840   ;
841           "#,
842           )?
843           .bind("id", id.as_slice().to_vec())
844           .bind("mint_url", mint_url)
845           .bind("direction", direction)
846           .bind("unit", unit)
847           .bind("amount", amount)
848           .bind("fee", fee)
849           .bind("ys", ys)
850           .bind("timestamp", transaction.timestamp as i64)
851           .bind("memo", transaction.memo)
852           .bind(
853               "metadata",
854               serde_json::to_string(&transaction.metadata).map_err(Error::from)?,
855           )
856           .bind("quote_id", transaction.quote_id)
857           .bind("payment_request", transaction.payment_request)
858           .bind("payment_proof", transaction.payment_proof)
859           .bind("payment_method", transaction.payment_method.map(|pm| pm.to_string()))
860           .bind("saga_id", transaction.saga_id.map(|id| id.to_string()))
861           .execute(&*conn)
862           .await?;
863
864        Ok(())
865    }
866
867    #[instrument(skip(self))]
868    async fn update_mint_url(
869        &self,
870        old_mint_url: MintUrl,
871        new_mint_url: MintUrl,
872    ) -> Result<(), database::Error> {
873        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
874        let tx = ConnectionWithTransaction::new(conn).await?;
875        let tables = ["mint_quote", "proof"];
876
877        for table in &tables {
878            query(&format!(
879                r#"
880                UPDATE {table}
881                SET mint_url = :new_mint_url
882                WHERE mint_url = :old_mint_url
883            "#
884            ))?
885            .bind("new_mint_url", new_mint_url.to_string())
886            .bind("old_mint_url", old_mint_url.to_string())
887            .execute(&tx)
888            .await?;
889        }
890
891        tx.commit().await?;
892
893        Ok(())
894    }
895
896    #[instrument(skip(self), fields(keyset_id = %keyset_id))]
897    async fn increment_keyset_counter(
898        &self,
899        keyset_id: &Id,
900        count: u32,
901    ) -> Result<u32, database::Error> {
902        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
903
904        let new_counter = query(
905            r#"
906            INSERT INTO keyset_counter (keyset_id, counter)
907            VALUES (:keyset_id, :count)
908            ON CONFLICT(keyset_id) DO UPDATE SET
909                counter = keyset_counter.counter + :count
910            RETURNING counter
911            "#,
912        )?
913        .bind("keyset_id", keyset_id.to_string())
914        .bind("count", count)
915        .pluck(&*conn)
916        .await?
917        .map(|n| Ok::<_, Error>(column_as_number!(n)))
918        .transpose()?
919        .ok_or_else(|| Error::Internal("Counter update returned no value".to_owned()))?;
920
921        Ok(new_counter)
922    }
923
924    #[instrument(skip(self, mint_info))]
925    async fn add_mint(
926        &self,
927        mint_url: MintUrl,
928        mint_info: Option<MintInfo>,
929    ) -> Result<(), database::Error> {
930        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
931
932        let (
933            name,
934            pubkey,
935            version,
936            description,
937            description_long,
938            contact,
939            nuts,
940            icon_url,
941            urls,
942            motd,
943            time,
944            tos_url,
945        ) = match mint_info {
946            Some(mint_info) => {
947                let MintInfo {
948                    name,
949                    pubkey,
950                    version,
951                    description,
952                    description_long,
953                    contact,
954                    nuts,
955                    icon_url,
956                    urls,
957                    motd,
958                    time,
959                    tos_url,
960                } = mint_info;
961
962                (
963                    name,
964                    pubkey.map(|p| p.to_bytes().to_vec()),
965                    version.map(|v| serde_json::to_string(&v).ok()),
966                    description,
967                    description_long,
968                    contact.map(|c| serde_json::to_string(&c).ok()),
969                    serde_json::to_string(&nuts).ok(),
970                    icon_url,
971                    urls.map(|c| serde_json::to_string(&c).ok()),
972                    motd,
973                    time,
974                    tos_url,
975                )
976            }
977            None => (
978                None, None, None, None, None, None, None, None, None, None, None, None,
979            ),
980        };
981
982        query(
983            r#"
984   INSERT INTO mint
985   (
986       mint_url, name, pubkey, version, description, description_long,
987       contact, nuts, icon_url, urls, motd, mint_time, tos_url
988   )
989   VALUES
990   (
991       :mint_url, :name, :pubkey, :version, :description, :description_long,
992       :contact, :nuts, :icon_url, :urls, :motd, :mint_time, :tos_url
993   )
994   ON CONFLICT(mint_url) DO UPDATE SET
995       name = excluded.name,
996       pubkey = excluded.pubkey,
997       version = excluded.version,
998       description = excluded.description,
999       description_long = excluded.description_long,
1000       contact = excluded.contact,
1001       nuts = excluded.nuts,
1002       icon_url = excluded.icon_url,
1003       urls = excluded.urls,
1004       motd = excluded.motd,
1005       mint_time = excluded.mint_time,
1006       tos_url = excluded.tos_url
1007   ;
1008           "#,
1009        )?
1010        .bind("mint_url", mint_url.to_string())
1011        .bind("name", name)
1012        .bind("pubkey", pubkey)
1013        .bind("version", version)
1014        .bind("description", description)
1015        .bind("description_long", description_long)
1016        .bind("contact", contact)
1017        .bind("nuts", nuts)
1018        .bind("icon_url", icon_url)
1019        .bind("urls", urls)
1020        .bind("motd", motd)
1021        .bind("mint_time", time.map(|v| v as i64))
1022        .bind("tos_url", tos_url)
1023        .execute(&*conn)
1024        .await?;
1025
1026        Ok(())
1027    }
1028
1029    #[instrument(skip(self))]
1030    async fn remove_mint(&self, mint_url: MintUrl) -> Result<(), database::Error> {
1031        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1032
1033        query(r#"DELETE FROM mint WHERE mint_url=:mint_url"#)?
1034            .bind("mint_url", mint_url.to_string())
1035            .execute(&*conn)
1036            .await?;
1037
1038        Ok(())
1039    }
1040
1041    #[instrument(skip(self, keysets))]
1042    async fn add_mint_keysets(
1043        &self,
1044        mint_url: MintUrl,
1045        keysets: Vec<KeySetInfo>,
1046    ) -> Result<(), database::Error> {
1047        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1048        let tx = ConnectionWithTransaction::new(conn).await?;
1049
1050        for keyset in keysets {
1051            query(
1052                r#"
1053        INSERT INTO keyset
1054        (mint_url, id, unit, active, input_fee_ppk, final_expiry, keyset_u32)
1055        VALUES
1056        (:mint_url, :id, :unit, :active, :input_fee_ppk, :final_expiry, :keyset_u32)
1057        ON CONFLICT(id) DO UPDATE SET
1058            active = excluded.active,
1059            input_fee_ppk = excluded.input_fee_ppk
1060        "#,
1061            )?
1062            .bind("mint_url", mint_url.to_string())
1063            .bind("id", keyset.id.to_string())
1064            .bind("unit", keyset.unit.to_string())
1065            .bind("active", keyset.active)
1066            .bind("input_fee_ppk", keyset.input_fee_ppk as i64)
1067            .bind("final_expiry", keyset.final_expiry.map(|v| v as i64))
1068            .bind("keyset_u32", u32::from(keyset.id))
1069            .execute(&tx)
1070            .await?;
1071        }
1072
1073        tx.commit().await?;
1074
1075        Ok(())
1076    }
1077
1078    #[instrument(skip_all)]
1079    async fn add_mint_quote(&self, quote: MintQuote) -> Result<(), database::Error> {
1080        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1081
1082        let expected_version = quote.version;
1083        let new_version = expected_version.wrapping_add(1);
1084
1085        let rows_affected = query(
1086                r#"
1087    INSERT INTO mint_quote
1088    (id, mint_url, amount, unit, request, state, expiry, secret_key, payment_method, amount_issued, amount_paid, version, used_by_operation)
1089    VALUES
1090    (:id, :mint_url, :amount, :unit, :request, :state, :expiry, :secret_key, :payment_method, :amount_issued, :amount_paid, :version, :used_by_operation)
1091    ON CONFLICT(id) DO UPDATE SET
1092        mint_url = excluded.mint_url,
1093        amount = excluded.amount,
1094        unit = excluded.unit,
1095        request = excluded.request,
1096        state = excluded.state,
1097        expiry = excluded.expiry,
1098        secret_key = excluded.secret_key,
1099        payment_method = excluded.payment_method,
1100        amount_issued = excluded.amount_issued,
1101        amount_paid = excluded.amount_paid,
1102        version = :new_version,
1103        used_by_operation = excluded.used_by_operation
1104    WHERE mint_quote.version = :expected_version
1105    ;
1106            "#,
1107            )?
1108            .bind("id", quote.id.to_string())
1109            .bind("mint_url", quote.mint_url.to_string())
1110            .bind("amount", quote.amount.map(|a| a.to_i64()))
1111            .bind("unit", quote.unit.to_string())
1112            .bind("request", quote.request)
1113            .bind("state", quote.state.to_string())
1114            .bind("expiry", quote.expiry as i64)
1115            .bind("secret_key", quote.secret_key.map(|p| p.to_string()))
1116            .bind("payment_method", quote.payment_method.to_string())
1117            .bind("amount_issued", quote.amount_issued.to_i64())
1118            .bind("amount_paid", quote.amount_paid.to_i64())
1119            .bind("version", quote.version as i64)
1120            .bind("new_version", new_version as i64)
1121            .bind("expected_version", expected_version as i64)
1122            .bind("used_by_operation", quote.used_by_operation)
1123            .execute(&*conn).await?;
1124
1125        if rows_affected == 0 {
1126            return Err(database::Error::ConcurrentUpdate);
1127        }
1128
1129        Ok(())
1130    }
1131
1132    #[instrument(skip(self))]
1133    async fn remove_mint_quote(&self, quote_id: &str) -> Result<(), database::Error> {
1134        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1135
1136        query(r#"DELETE FROM mint_quote WHERE id=:id"#)?
1137            .bind("id", quote_id.to_string())
1138            .execute(&*conn)
1139            .await?;
1140
1141        Ok(())
1142    }
1143
1144    #[instrument(skip_all)]
1145    async fn add_melt_quote(&self, quote: wallet::MeltQuote) -> Result<(), database::Error> {
1146        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1147
1148        let expected_version = quote.version;
1149        let new_version = expected_version.wrapping_add(1);
1150
1151        let rows_affected = query(
1152            r#"
1153 INSERT INTO melt_quote
1154 (id, unit, amount, request, fee_reserve, state, expiry, payment_method, version, mint_url, used_by_operation)
1155 VALUES
1156 (:id, :unit, :amount, :request, :fee_reserve, :state, :expiry, :payment_method, :version, :mint_url, :used_by_operation)
1157 ON CONFLICT(id) DO UPDATE SET
1158     unit = excluded.unit,
1159     amount = excluded.amount,
1160     request = excluded.request,
1161     fee_reserve = excluded.fee_reserve,
1162     state = excluded.state,
1163     expiry = excluded.expiry,
1164     payment_method = excluded.payment_method,
1165     version = :new_version,
1166     mint_url = excluded.mint_url,
1167     used_by_operation = excluded.used_by_operation
1168 WHERE melt_quote.version = :expected_version
1169 ;
1170         "#,
1171        )?
1172        .bind("id", quote.id.to_string())
1173        .bind("unit", quote.unit.to_string())
1174        .bind("amount", u64::from(quote.amount) as i64)
1175        .bind("request", quote.request)
1176        .bind("fee_reserve", u64::from(quote.fee_reserve) as i64)
1177        .bind("state", quote.state.to_string())
1178        .bind("expiry", quote.expiry as i64)
1179        .bind("payment_method", quote.payment_method.to_string())
1180        .bind("version", quote.version as i64)
1181        .bind("new_version", new_version as i64)
1182        .bind("expected_version", expected_version as i64)
1183        .bind("mint_url", quote.mint_url.map(|m| m.to_string()))
1184        .bind("used_by_operation", quote.used_by_operation)
1185        .execute(&*conn)
1186        .await?;
1187
1188        if rows_affected == 0 {
1189            return Err(database::Error::ConcurrentUpdate);
1190        }
1191
1192        Ok(())
1193    }
1194
1195    #[instrument(skip(self))]
1196    async fn remove_melt_quote(&self, quote_id: &str) -> Result<(), database::Error> {
1197        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1198
1199        query(r#"DELETE FROM melt_quote WHERE id=:id"#)?
1200            .bind("id", quote_id.to_owned())
1201            .execute(&*conn)
1202            .await?;
1203
1204        Ok(())
1205    }
1206
1207    #[instrument(skip_all)]
1208    async fn add_keys(&self, keyset: KeySet) -> Result<(), database::Error> {
1209        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1210
1211        keyset.verify_id()?;
1212
1213        query(
1214            r#"
1215                INSERT INTO key
1216                (id, keys, keyset_u32)
1217                VALUES
1218                (:id, :keys, :keyset_u32)
1219            "#,
1220        )?
1221        .bind("id", keyset.id.to_string())
1222        .bind(
1223            "keys",
1224            serde_json::to_string(&keyset.keys).map_err(Error::from)?,
1225        )
1226        .bind("keyset_u32", u32::from(keyset.id))
1227        .execute(&*conn)
1228        .await?;
1229
1230        Ok(())
1231    }
1232
1233    #[instrument(skip(self))]
1234    async fn remove_keys(&self, id: &Id) -> Result<(), database::Error> {
1235        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1236
1237        query(r#"DELETE FROM key WHERE id = :id"#)?
1238            .bind("id", id.to_string())
1239            .execute(&*conn)
1240            .await?;
1241
1242        Ok(())
1243    }
1244
1245    #[instrument(skip(self))]
1246    async fn remove_transaction(
1247        &self,
1248        transaction_id: TransactionId,
1249    ) -> Result<(), database::Error> {
1250        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1251
1252        query(r#"DELETE FROM transactions WHERE id=:id"#)?
1253            .bind("id", transaction_id.as_slice().to_vec())
1254            .execute(&*conn)
1255            .await?;
1256
1257        Ok(())
1258    }
1259
1260    #[instrument(skip(self))]
1261    async fn add_saga(&self, saga: wallet::WalletSaga) -> Result<(), database::Error> {
1262        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1263
1264        let state_json = serde_json::to_string(&saga.state).map_err(|e| {
1265            Error::Database(Box::new(std::io::Error::new(
1266                std::io::ErrorKind::InvalidData,
1267                format!("Failed to serialize saga state: {}", e),
1268            )))
1269        })?;
1270
1271        let data_json = serde_json::to_string(&saga.data).map_err(|e| {
1272            Error::Database(Box::new(std::io::Error::new(
1273                std::io::ErrorKind::InvalidData,
1274                format!("Failed to serialize saga data: {}", e),
1275            )))
1276        })?;
1277
1278        query(
1279            r#"
1280            INSERT INTO wallet_sagas
1281            (id, kind, state, amount, mint_url, unit, quote_id, created_at, updated_at, data, version)
1282            VALUES
1283            (:id, :kind, :state, :amount, :mint_url, :unit, :quote_id, :created_at, :updated_at, :data, :version)
1284            "#,
1285        )?
1286        .bind("id", saga.id.to_string())
1287        .bind("kind", saga.kind.to_string())
1288        .bind("state", state_json)
1289        .bind("amount", u64::from(saga.amount) as i64)
1290        .bind("mint_url", saga.mint_url.to_string())
1291        .bind("unit", saga.unit.to_string())
1292        .bind("quote_id", saga.quote_id)
1293        .bind("created_at", saga.created_at as i64)
1294        .bind("updated_at", saga.updated_at as i64)
1295        .bind("data", data_json)
1296        .bind("version", saga.version as i64)
1297        .execute(&*conn)
1298        .await?;
1299
1300        Ok(())
1301    }
1302
1303    #[instrument(skip(self))]
1304    async fn get_saga(
1305        &self,
1306        id: &uuid::Uuid,
1307    ) -> Result<Option<wallet::WalletSaga>, database::Error> {
1308        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1309
1310        let rows = query(
1311            r#"
1312            SELECT id, kind, state, amount, mint_url, unit, quote_id, created_at, updated_at, data, version
1313            FROM wallet_sagas
1314            WHERE id = :id
1315            "#,
1316        )?
1317        .bind("id", id.to_string())
1318        .fetch_all(&*conn)
1319        .await?;
1320
1321        match rows.into_iter().next() {
1322            Some(row) => Ok(Some(sql_row_to_wallet_saga(row)?)),
1323            None => Ok(None),
1324        }
1325    }
1326
1327    #[instrument(skip(self))]
1328    async fn update_saga(&self, saga: wallet::WalletSaga) -> Result<bool, database::Error> {
1329        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1330
1331        let state_json = serde_json::to_string(&saga.state).map_err(|e| {
1332            Error::Database(Box::new(std::io::Error::new(
1333                std::io::ErrorKind::InvalidData,
1334                format!("Failed to serialize saga state: {}", e),
1335            )))
1336        })?;
1337
1338        let data_json = serde_json::to_string(&saga.data).map_err(|e| {
1339            Error::Database(Box::new(std::io::Error::new(
1340                std::io::ErrorKind::InvalidData,
1341                format!("Failed to serialize saga data: {}", e),
1342            )))
1343        })?;
1344
1345        // Optimistic locking: only update if the version matches the expected value.
1346        // The saga.version has already been incremented by the caller, so we check
1347        // for (saga.version - 1) in the WHERE clause.
1348        let expected_version = saga.version.saturating_sub(1);
1349
1350        let rows_affected = query(
1351            r#"
1352            UPDATE wallet_sagas
1353            SET kind = :kind, state = :state, amount = :amount, mint_url = :mint_url,
1354                unit = :unit, quote_id = :quote_id, updated_at = :updated_at, data = :data,
1355                version = :new_version
1356            WHERE id = :id AND version = :expected_version
1357            "#,
1358        )?
1359        .bind("id", saga.id.to_string())
1360        .bind("kind", saga.kind.to_string())
1361        .bind("state", state_json)
1362        .bind("amount", u64::from(saga.amount) as i64)
1363        .bind("mint_url", saga.mint_url.to_string())
1364        .bind("unit", saga.unit.to_string())
1365        .bind("quote_id", saga.quote_id)
1366        .bind("updated_at", saga.updated_at as i64)
1367        .bind("data", data_json)
1368        .bind("new_version", saga.version as i64)
1369        .bind("expected_version", expected_version as i64)
1370        .execute(&*conn)
1371        .await?;
1372
1373        // Return true if the update succeeded (version matched), false if version mismatch
1374        Ok(rows_affected > 0)
1375    }
1376
1377    #[instrument(skip(self))]
1378    async fn delete_saga(&self, id: &uuid::Uuid) -> Result<(), database::Error> {
1379        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1380
1381        query(r#"DELETE FROM wallet_sagas WHERE id = :id"#)?
1382            .bind("id", id.to_string())
1383            .execute(&*conn)
1384            .await?;
1385
1386        Ok(())
1387    }
1388
1389    #[instrument(skip(self))]
1390    async fn get_incomplete_sagas(&self) -> Result<Vec<wallet::WalletSaga>, database::Error> {
1391        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1392
1393        let rows = query(
1394            r#"
1395            SELECT id, kind, state, amount, mint_url, unit, quote_id, created_at, updated_at, data, version
1396            FROM wallet_sagas
1397            ORDER BY created_at ASC
1398            "#,
1399        )?
1400        .fetch_all(&*conn)
1401        .await?;
1402
1403        rows.into_iter().map(sql_row_to_wallet_saga).collect()
1404    }
1405
1406    #[instrument(skip(self))]
1407    async fn reserve_proofs(
1408        &self,
1409        ys: Vec<PublicKey>,
1410        operation_id: &uuid::Uuid,
1411    ) -> Result<(), database::Error> {
1412        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1413
1414        for y in ys {
1415            let rows_affected = query(
1416                r#"
1417                UPDATE proof
1418                SET state = 'RESERVED', used_by_operation = :operation_id
1419                WHERE y = :y AND state = 'UNSPENT'
1420                "#,
1421            )?
1422            .bind("y", y.to_bytes().to_vec())
1423            .bind("operation_id", operation_id.to_string())
1424            .execute(&*conn)
1425            .await?;
1426
1427            if rows_affected == 0 {
1428                return Err(database::Error::ProofNotUnspent);
1429            }
1430        }
1431
1432        Ok(())
1433    }
1434
1435    #[instrument(skip(self))]
1436    async fn release_proofs(&self, operation_id: &uuid::Uuid) -> Result<(), database::Error> {
1437        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1438
1439        query(
1440            r#"
1441            UPDATE proof
1442            SET state = 'UNSPENT', used_by_operation = NULL
1443            WHERE used_by_operation = :operation_id
1444            "#,
1445        )?
1446        .bind("operation_id", operation_id.to_string())
1447        .execute(&*conn)
1448        .await?;
1449
1450        Ok(())
1451    }
1452
1453    #[instrument(skip(self))]
1454    async fn get_reserved_proofs(
1455        &self,
1456        operation_id: &uuid::Uuid,
1457    ) -> Result<Vec<ProofInfo>, database::Error> {
1458        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1459
1460        let rows = query(
1461            r#"
1462            SELECT
1463                amount,
1464                unit,
1465                keyset_id,
1466                secret,
1467                c,
1468                witness,
1469                dleq_e,
1470                dleq_s,
1471                dleq_r,
1472                y,
1473                mint_url,
1474                state,
1475                spending_condition,
1476                used_by_operation,
1477                created_by_operation,
1478                p2pk_e
1479            FROM proof
1480            WHERE used_by_operation = :operation_id
1481            "#,
1482        )?
1483        .bind("operation_id", operation_id.to_string())
1484        .fetch_all(&*conn)
1485        .await?;
1486
1487        rows.into_iter().map(sql_row_to_proof_info).collect()
1488    }
1489
1490    #[instrument(skip(self))]
1491    async fn reserve_melt_quote(
1492        &self,
1493        quote_id: &str,
1494        operation_id: &uuid::Uuid,
1495    ) -> Result<(), database::Error> {
1496        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1497
1498        let rows_affected = query(
1499            r#"
1500            UPDATE melt_quote
1501            SET used_by_operation = :operation_id
1502            WHERE id = :quote_id AND used_by_operation IS NULL
1503            "#,
1504        )?
1505        .bind("operation_id", operation_id.to_string())
1506        .bind("quote_id", quote_id)
1507        .execute(&*conn)
1508        .await?;
1509
1510        if rows_affected == 0 {
1511            // Check if the quote exists
1512            let exists = query(
1513                r#"
1514                SELECT 1 FROM melt_quote WHERE id = :quote_id
1515                "#,
1516            )?
1517            .bind("quote_id", quote_id)
1518            .fetch_one(&*conn)
1519            .await?;
1520
1521            if exists.is_none() {
1522                return Err(database::Error::UnknownQuote);
1523            }
1524            return Err(database::Error::QuoteAlreadyInUse);
1525        }
1526
1527        Ok(())
1528    }
1529
1530    #[instrument(skip(self))]
1531    async fn release_melt_quote(&self, operation_id: &uuid::Uuid) -> Result<(), database::Error> {
1532        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1533
1534        query(
1535            r#"
1536            UPDATE melt_quote
1537            SET used_by_operation = NULL
1538            WHERE used_by_operation = :operation_id
1539            "#,
1540        )?
1541        .bind("operation_id", operation_id.to_string())
1542        .execute(&*conn)
1543        .await?;
1544
1545        Ok(())
1546    }
1547
1548    #[instrument(skip(self))]
1549    async fn reserve_mint_quote(
1550        &self,
1551        quote_id: &str,
1552        operation_id: &uuid::Uuid,
1553    ) -> Result<(), database::Error> {
1554        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1555
1556        let rows_affected = query(
1557            r#"
1558            UPDATE mint_quote
1559            SET used_by_operation = :operation_id
1560            WHERE id = :quote_id AND used_by_operation IS NULL
1561            "#,
1562        )?
1563        .bind("operation_id", operation_id.to_string())
1564        .bind("quote_id", quote_id)
1565        .execute(&*conn)
1566        .await?;
1567
1568        if rows_affected == 0 {
1569            // Check if the quote exists
1570            let exists = query(
1571                r#"
1572                SELECT 1 FROM mint_quote WHERE id = :quote_id
1573                "#,
1574            )?
1575            .bind("quote_id", quote_id)
1576            .fetch_one(&*conn)
1577            .await?;
1578
1579            if exists.is_none() {
1580                return Err(database::Error::UnknownQuote);
1581            }
1582            return Err(database::Error::QuoteAlreadyInUse);
1583        }
1584
1585        Ok(())
1586    }
1587
1588    #[instrument(skip(self))]
1589    async fn release_mint_quote(&self, operation_id: &uuid::Uuid) -> Result<(), database::Error> {
1590        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1591
1592        query(
1593            r#"
1594            UPDATE mint_quote
1595            SET used_by_operation = NULL
1596            WHERE used_by_operation = :operation_id
1597            "#,
1598        )?
1599        .bind("operation_id", operation_id.to_string())
1600        .execute(&*conn)
1601        .await?;
1602
1603        Ok(())
1604    }
1605
1606    async fn kv_read(
1607        &self,
1608        primary_namespace: &str,
1609        secondary_namespace: &str,
1610        key: &str,
1611    ) -> Result<Option<Vec<u8>>, database::Error> {
1612        crate::keyvalue::kv_read(&self.pool, primary_namespace, secondary_namespace, key).await
1613    }
1614
1615    async fn kv_list(
1616        &self,
1617        primary_namespace: &str,
1618        secondary_namespace: &str,
1619    ) -> Result<Vec<String>, database::Error> {
1620        crate::keyvalue::kv_list(&self.pool, primary_namespace, secondary_namespace).await
1621    }
1622
1623    async fn kv_write(
1624        &self,
1625        primary_namespace: &str,
1626        secondary_namespace: &str,
1627        key: &str,
1628        value: &[u8],
1629    ) -> Result<(), database::Error> {
1630        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1631        crate::keyvalue::kv_write_standalone(
1632            &*conn,
1633            primary_namespace,
1634            secondary_namespace,
1635            key,
1636            value,
1637        )
1638        .await?;
1639        Ok(())
1640    }
1641
1642    async fn kv_remove(
1643        &self,
1644        primary_namespace: &str,
1645        secondary_namespace: &str,
1646        key: &str,
1647    ) -> Result<(), database::Error> {
1648        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1649        crate::keyvalue::kv_remove_standalone(&*conn, primary_namespace, secondary_namespace, key)
1650            .await?;
1651        Ok(())
1652    }
1653
1654    // P2PK methods
1655
1656    #[instrument(skip(self))]
1657    async fn add_p2pk_key(
1658        &self,
1659        pubkey: &PublicKey,
1660        derivation_path: DerivationPath,
1661        derivation_index: u32,
1662    ) -> Result<(), Error> {
1663        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1664        let query_str = r#"
1665        INSERT INTO p2pk_signing_key (pubkey, derivation_index, derivation_path, created_time)
1666        VALUES (:pubkey, :derivation_index, :derivation_path, :created_time)
1667        "#
1668        .to_string();
1669
1670        query(&query_str)?
1671            .bind("pubkey", pubkey.to_bytes().to_vec())
1672            .bind("derivation_index", derivation_index)
1673            .bind("derivation_path", derivation_path.to_string())
1674            .bind("created_time", unix_time() as i64)
1675            .execute(&*conn)
1676            .await?;
1677
1678        Ok(())
1679    }
1680
1681    #[instrument(skip(self))]
1682    async fn get_p2pk_key(
1683        &self,
1684        pubkey: &PublicKey,
1685    ) -> Result<Option<wallet::P2PKSigningKey>, Error> {
1686        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1687        let query_str = r#"SELECT pubkey, derivation_index, derivation_path, created_time FROM p2pk_signing_key WHERE pubkey = :pubkey"#.to_string();
1688
1689        query(&query_str)?
1690            .bind("pubkey", pubkey.to_bytes().to_vec())
1691            .fetch_one(&*conn)
1692            .await?
1693            .map(sql_row_to_p2pk_signing_key)
1694            .transpose()
1695    }
1696
1697    #[instrument(skip(self))]
1698    async fn list_p2pk_keys(&self) -> Result<Vec<wallet::P2PKSigningKey>, Error> {
1699        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1700        let query_str = r#"
1701        SELECT pubkey, derivation_index, derivation_path, created_time FROM p2pk_signing_key ORDER BY derivation_index DESC
1702        "#.to_string();
1703
1704        Ok(query(&query_str)?
1705            .fetch_all(&*conn)
1706            .await?
1707            .into_iter()
1708            .filter_map(|row| {
1709                let row = sql_row_to_p2pk_signing_key(row).ok()?;
1710
1711                Some(row)
1712            })
1713            .collect::<Vec<wallet::P2PKSigningKey>>())
1714    }
1715
1716    #[instrument(skip(self))]
1717    async fn latest_p2pk(&self) -> Result<Option<wallet::P2PKSigningKey>, Error> {
1718        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1719        let query_str = r#"
1720        SELECT pubkey, derivation_index, derivation_path, created_time FROM p2pk_signing_key ORDER BY derivation_index DESC LIMIT 1
1721        "#.to_string();
1722
1723        query(&query_str)?
1724            .fetch_one(&*conn)
1725            .await?
1726            .map(sql_row_to_p2pk_signing_key)
1727            .transpose()
1728    }
1729}
1730
1731fn sql_row_to_mint_info(row: Vec<Column>) -> Result<MintInfo, Error> {
1732    unpack_into!(
1733        let (
1734            name,
1735            pubkey,
1736            version,
1737            description,
1738            description_long,
1739            contact,
1740            nuts,
1741            icon_url,
1742            motd,
1743            urls,
1744            mint_time,
1745            tos_url
1746        ) = row
1747    );
1748
1749    Ok(MintInfo {
1750        name: column_as_nullable_string!(&name),
1751        pubkey: column_as_nullable_string!(&pubkey, |v| serde_json::from_str(v).ok(), |v| {
1752            serde_json::from_slice(v).ok()
1753        }),
1754        version: column_as_nullable_string!(&version).and_then(|v| serde_json::from_str(&v).ok()),
1755        description: column_as_nullable_string!(description),
1756        description_long: column_as_nullable_string!(description_long),
1757        contact: column_as_nullable_string!(contact, |v| serde_json::from_str(&v).ok()),
1758        nuts: column_as_nullable_string!(nuts, |v| serde_json::from_str(&v).ok())
1759            .unwrap_or_default(),
1760        urls: column_as_nullable_string!(urls, |v| serde_json::from_str(&v).ok()),
1761        icon_url: column_as_nullable_string!(icon_url),
1762        motd: column_as_nullable_string!(motd),
1763        time: column_as_nullable_number!(mint_time).map(|t| t),
1764        tos_url: column_as_nullable_string!(tos_url),
1765    })
1766}
1767
1768#[instrument(skip_all)]
1769fn sql_row_to_keyset(row: Vec<Column>) -> Result<KeySetInfo, Error> {
1770    unpack_into!(
1771        let (
1772            id,
1773            unit,
1774            active,
1775            input_fee_ppk,
1776            final_expiry
1777        ) = row
1778    );
1779
1780    Ok(KeySetInfo {
1781        id: column_as_string!(id, Id::from_str, Id::from_bytes),
1782        unit: column_as_string!(unit, CurrencyUnit::from_str),
1783        active: matches!(active, Column::Integer(1)),
1784        input_fee_ppk: column_as_nullable_number!(input_fee_ppk).unwrap_or(0),
1785        final_expiry: column_as_nullable_number!(final_expiry),
1786    })
1787}
1788
1789fn sql_row_to_mint_quote(row: Vec<Column>) -> Result<MintQuote, Error> {
1790    unpack_into!(
1791        let (
1792            id,
1793            mint_url,
1794            amount,
1795            unit,
1796            request,
1797            state,
1798            expiry,
1799            secret_key,
1800            row_method,
1801            row_amount_minted,
1802            row_amount_paid,
1803            used_by_operation,
1804            version
1805        ) = row
1806    );
1807
1808    let amount: Option<i64> = column_as_nullable_number!(amount);
1809
1810    let amount_paid: u64 = column_as_number!(row_amount_paid);
1811    let amount_minted: u64 = column_as_number!(row_amount_minted);
1812    let expiry_val: u64 = column_as_number!(expiry);
1813    let version_val: u32 = column_as_number!(version);
1814    let payment_method =
1815        PaymentMethod::from_str(&column_as_string!(row_method)).map_err(Error::from)?;
1816
1817    Ok(MintQuote {
1818        id: column_as_string!(id),
1819        mint_url: column_as_string!(mint_url, MintUrl::from_str),
1820        amount: amount.and_then(Amount::from_i64),
1821        unit: column_as_string!(unit, CurrencyUnit::from_str),
1822        request: column_as_string!(request),
1823        state: column_as_string!(state, MintQuoteState::from_str),
1824        expiry: expiry_val,
1825        secret_key: column_as_nullable_string!(secret_key, |s| SecretKey::from_str(&s).ok()),
1826        payment_method,
1827        amount_issued: Amount::from(amount_minted),
1828        amount_paid: Amount::from(amount_paid),
1829        used_by_operation: column_as_nullable_string!(used_by_operation),
1830        version: version_val,
1831    })
1832}
1833
1834fn sql_row_to_melt_quote(row: Vec<Column>) -> Result<wallet::MeltQuote, Error> {
1835    unpack_into!(
1836        let (
1837            id,
1838            unit,
1839            amount,
1840            request,
1841            fee_reserve,
1842            state,
1843            expiry,
1844            payment_preimage,
1845            row_method,
1846            used_by_operation,
1847            version,
1848            mint_url
1849        ) = row
1850    );
1851
1852    let payment_method =
1853        PaymentMethod::from_str(&column_as_string!(row_method)).map_err(Error::from)?;
1854
1855    let amount_val: u64 = column_as_number!(amount);
1856    let fee_reserve_val: u64 = column_as_number!(fee_reserve);
1857    let expiry_val: u64 = column_as_number!(expiry);
1858    let version_val: u32 = column_as_number!(version);
1859
1860    Ok(wallet::MeltQuote {
1861        id: column_as_string!(id),
1862        mint_url: column_as_nullable_string!(mint_url, |s| MintUrl::from_str(&s).ok()),
1863        unit: column_as_string!(unit, CurrencyUnit::from_str),
1864        amount: Amount::from(amount_val),
1865        request: column_as_string!(request),
1866        fee_reserve: Amount::from(fee_reserve_val),
1867        state: column_as_string!(state, MeltQuoteState::from_str),
1868        expiry: expiry_val,
1869        payment_preimage: column_as_nullable_string!(payment_preimage),
1870        payment_method,
1871        used_by_operation: column_as_nullable_string!(used_by_operation),
1872        version: version_val,
1873    })
1874}
1875
1876fn sql_row_to_proof_info(row: Vec<Column>) -> Result<ProofInfo, Error> {
1877    unpack_into!(
1878        let (
1879            amount,
1880            unit,
1881            keyset_id,
1882            secret,
1883            c,
1884            witness,
1885            dleq_e,
1886            dleq_s,
1887            dleq_r,
1888            y,
1889            mint_url,
1890            state,
1891            spending_condition,
1892            used_by_operation,
1893            created_by_operation,
1894            p2pk_e
1895        ) = row
1896    );
1897
1898    let dleq = match (
1899        column_as_nullable_binary!(dleq_e),
1900        column_as_nullable_binary!(dleq_s),
1901        column_as_nullable_binary!(dleq_r),
1902    ) {
1903        (Some(e), Some(s), Some(r)) => {
1904            let e_key = SecretKey::from_slice(&e)?;
1905            let s_key = SecretKey::from_slice(&s)?;
1906            let r_key = SecretKey::from_slice(&r)?;
1907
1908            Some(ProofDleq::new(e_key, s_key, r_key))
1909        }
1910        _ => None,
1911    };
1912
1913    let amount: u64 = column_as_number!(amount);
1914    let proof = Proof {
1915        amount: Amount::from(amount),
1916        keyset_id: column_as_string!(keyset_id, Id::from_str),
1917        secret: column_as_string!(secret, Secret::from_str),
1918        witness: column_as_nullable_string!(witness, |v| { serde_json::from_str(&v).ok() }, |v| {
1919            serde_json::from_slice(&v).ok()
1920        }),
1921        c: column_as_string!(c, PublicKey::from_str, PublicKey::from_slice),
1922        dleq,
1923        p2pk_e: column_as_nullable_binary!(p2pk_e)
1924            .map(|bytes| PublicKey::from_slice(&bytes))
1925            .transpose()?,
1926    };
1927
1928    let used_by_operation =
1929        column_as_nullable_string!(used_by_operation).and_then(|id| Uuid::from_str(&id).ok());
1930    let created_by_operation =
1931        column_as_nullable_string!(created_by_operation).and_then(|id| Uuid::from_str(&id).ok());
1932
1933    Ok(ProofInfo {
1934        proof,
1935        y: column_as_string!(y, PublicKey::from_str, PublicKey::from_slice),
1936        mint_url: column_as_string!(mint_url, MintUrl::from_str),
1937        state: column_as_string!(state, State::from_str),
1938        spending_condition: column_as_nullable_string!(
1939            spending_condition,
1940            |r| { serde_json::from_str(&r).ok() },
1941            |r| { serde_json::from_slice(&r).ok() }
1942        ),
1943        unit: column_as_string!(unit, CurrencyUnit::from_str),
1944        used_by_operation,
1945        created_by_operation,
1946    })
1947}
1948
1949fn sql_row_to_wallet_saga(row: Vec<Column>) -> Result<wallet::WalletSaga, Error> {
1950    unpack_into!(
1951        let (
1952            id,
1953            kind,
1954            state,
1955            amount,
1956            mint_url,
1957            unit,
1958            quote_id,
1959            created_at,
1960            updated_at,
1961            data,
1962            version
1963        ) = row
1964    );
1965
1966    let id_str: String = column_as_string!(id);
1967    let id = uuid::Uuid::parse_str(&id_str).map_err(|e| {
1968        Error::Database(Box::new(std::io::Error::new(
1969            std::io::ErrorKind::InvalidData,
1970            format!("Invalid UUID: {}", e),
1971        )))
1972    })?;
1973    let kind_str: String = column_as_string!(kind);
1974    let state_json: String = column_as_string!(state);
1975    let amount: u64 = column_as_number!(amount);
1976    let mint_url: MintUrl = column_as_string!(mint_url, MintUrl::from_str);
1977    let unit: CurrencyUnit = column_as_string!(unit, CurrencyUnit::from_str);
1978    let quote_id: Option<String> = column_as_nullable_string!(quote_id);
1979    let created_at: u64 = column_as_number!(created_at);
1980    let updated_at: u64 = column_as_number!(updated_at);
1981    let data_json: String = column_as_string!(data);
1982    let version: u32 = column_as_number!(version);
1983
1984    let kind = wallet::OperationKind::from_str(&kind_str).map_err(|_| {
1985        Error::Database(Box::new(std::io::Error::new(
1986            std::io::ErrorKind::InvalidData,
1987            format!("Invalid operation kind: {}", kind_str),
1988        )))
1989    })?;
1990    let state: wallet::WalletSagaState = serde_json::from_str(&state_json).map_err(|e| {
1991        Error::Database(Box::new(std::io::Error::new(
1992            std::io::ErrorKind::InvalidData,
1993            format!("Failed to deserialize saga state: {}", e),
1994        )))
1995    })?;
1996    let data: wallet::OperationData = serde_json::from_str(&data_json).map_err(|e| {
1997        Error::Database(Box::new(std::io::Error::new(
1998            std::io::ErrorKind::InvalidData,
1999            format!("Failed to deserialize saga data: {}", e),
2000        )))
2001    })?;
2002
2003    Ok(wallet::WalletSaga {
2004        id,
2005        kind,
2006        state,
2007        amount: Amount::from(amount),
2008        mint_url,
2009        unit,
2010        quote_id,
2011        created_at,
2012        updated_at,
2013        data,
2014        version,
2015    })
2016}
2017
2018fn sql_row_to_transaction(row: Vec<Column>) -> Result<Transaction, Error> {
2019    unpack_into!(
2020        let (
2021            mint_url,
2022            direction,
2023            unit,
2024            amount,
2025            fee,
2026            ys,
2027            timestamp,
2028            memo,
2029            metadata,
2030            quote_id,
2031            payment_request,
2032            payment_proof,
2033            payment_method,
2034            saga_id
2035        ) = row
2036    );
2037
2038    let amount: u64 = column_as_number!(amount);
2039    let fee: u64 = column_as_number!(fee);
2040
2041    let saga_id: Option<Uuid> = column_as_nullable_string!(saga_id)
2042        .map(|id| Uuid::from_str(&id).ok())
2043        .flatten();
2044
2045    Ok(Transaction {
2046        mint_url: column_as_string!(mint_url, MintUrl::from_str),
2047        direction: column_as_string!(direction, TransactionDirection::from_str),
2048        unit: column_as_string!(unit, CurrencyUnit::from_str),
2049        amount: Amount::from(amount),
2050        fee: Amount::from(fee),
2051        ys: column_as_binary!(ys)
2052            .chunks(33)
2053            .map(PublicKey::from_slice)
2054            .collect::<Result<Vec<_>, _>>()?,
2055        timestamp: column_as_number!(timestamp),
2056        memo: column_as_nullable_string!(memo),
2057        metadata: column_as_nullable_string!(metadata, |v| serde_json::from_str(&v).ok(), |v| {
2058            serde_json::from_slice(&v).ok()
2059        })
2060        .unwrap_or_default(),
2061        quote_id: column_as_nullable_string!(quote_id),
2062        payment_request: column_as_nullable_string!(payment_request),
2063        payment_proof: column_as_nullable_string!(payment_proof),
2064        payment_method: column_as_nullable_string!(payment_method)
2065            .map(|v| PaymentMethod::from_str(&v))
2066            .transpose()
2067            .map_err(Error::from)?,
2068        saga_id,
2069    })
2070}
2071
2072fn sql_row_to_p2pk_signing_key(row: Vec<Column>) -> Result<wallet::P2PKSigningKey, Error> {
2073    unpack_into!(
2074        let (
2075            pubkey,
2076            derivation_index,
2077            derivation_path,
2078            created_time
2079        ) = row
2080    );
2081
2082    Ok(wallet::P2PKSigningKey {
2083        pubkey: column_as_string!(pubkey, PublicKey::from_str, PublicKey::from_slice),
2084        derivation_index: column_as_number!(derivation_index),
2085        derivation_path: column_as_string!(derivation_path, DerivationPath::from_str),
2086        created_time: column_as_number!(created_time),
2087    })
2088}