Skip to main content

cdk_sql_common/wallet/
mod.rs

1//! SQLite Wallet Database
2
3use std::collections::HashMap;
4use std::fmt::Debug;
5use std::str::FromStr;
6use std::sync::Arc;
7
8use async_trait::async_trait;
9use bitcoin::bip32::DerivationPath;
10use cdk_common::database::{ConversionError, Error, WalletDatabase};
11use cdk_common::mint_url::MintUrl;
12use cdk_common::nuts::{MeltQuoteState, MintQuoteState};
13use cdk_common::secret::Secret;
14use cdk_common::util::unix_time;
15use cdk_common::wallet::{
16    self, MintQuote, ProofInfo, Transaction, TransactionDirection, TransactionId,
17};
18use cdk_common::{
19    database, Amount, CurrencyUnit, Id, KeySet, KeySetInfo, Keys, MintInfo, PaymentMethod, Proof,
20    ProofDleq, PublicKey, SecretKey, SpendingConditions, State,
21};
22use tracing::instrument;
23use uuid::Uuid;
24
25use crate::common::migrate;
26use crate::database::{ConnectionWithTransaction, DatabaseExecutor};
27use crate::pool::{DatabasePool, Pool, PooledResource};
28use crate::stmt::{query, Column};
29use crate::{
30    column_as_binary, column_as_nullable_binary, column_as_nullable_number,
31    column_as_nullable_string, column_as_number, column_as_string, unpack_into,
32};
33
// Formatting is skipped because the module body comes from an included file.
#[rustfmt::skip]
mod migrations {
    // Pulls in `migrations_wallet.rs` from the build output directory (`OUT_DIR`).
    // The included file is expected to define `MIGRATIONS`, which the wallet
    // database passes to `migrate` when it is created.
    include!(concat!(env!("OUT_DIR"), "/migrations_wallet.rs"));
}
38
/// Wallet SQLite Database
///
/// Generic over the pool manager `RM`; every operation borrows a connection
/// from the pool held by this handle.
#[derive(Debug, Clone)]
pub struct SQLWalletDatabase<RM>
where
    RM: DatabasePool + 'static,
{
    /// Connection pool, reference-counted so clones of this handle share it.
    pool: Arc<Pool<RM>>,
}
47
48impl<RM> SQLWalletDatabase<RM>
49where
50    RM: DatabasePool + 'static,
51{
52    /// Creates a new instance
53    pub async fn new<X>(db: X) -> Result<Self, Error>
54    where
55        X: Into<RM::Config>,
56    {
57        let pool = Pool::new(db.into());
58        Self::migrate(pool.get().await.map_err(|e| Error::Database(Box::new(e)))?).await?;
59
60        Ok(Self { pool })
61    }
62
63    /// Migrate [`WalletSqliteDatabase`]
64    async fn migrate(conn: PooledResource<RM>) -> Result<(), Error> {
65        let tx = ConnectionWithTransaction::new(conn).await?;
66        migrate(&tx, RM::Connection::name(), migrations::MIGRATIONS).await?;
67        // Update any existing keys with missing keyset_u32 values
68        Self::add_keyset_u32(&tx).await?;
69        tx.commit().await?;
70
71        Ok(())
72    }
73
74    async fn add_keyset_u32<T>(conn: &T) -> Result<(), Error>
75    where
76        T: DatabaseExecutor,
77    {
78        // First get the keysets where keyset_u32 on key is null
79        let keys_without_u32: Vec<Vec<Column>> = query(
80            r#"
81            SELECT
82                id
83            FROM key
84            WHERE keyset_u32 IS NULL
85            "#,
86        )?
87        .fetch_all(conn)
88        .await?;
89
90        for row in keys_without_u32 {
91            unpack_into!(let (id) = row);
92            let id = column_as_string!(id);
93
94            if let Ok(id) = Id::from_str(&id) {
95                query(
96                    r#"
97            UPDATE
98                key
99            SET keyset_u32 = :u32_keyset
100            WHERE id = :keyset_id
101            "#,
102                )?
103                .bind("u32_keyset", u32::from(id))
104                .bind("keyset_id", id.to_string())
105                .execute(conn)
106                .await?;
107            }
108        }
109
110        // Also update keysets where keyset_u32 is null
111        let keysets_without_u32: Vec<Vec<Column>> = query(
112            r#"
113            SELECT
114                id
115            FROM keyset
116            WHERE keyset_u32 IS NULL
117            "#,
118        )?
119        .fetch_all(conn)
120        .await?;
121
122        for row in keysets_without_u32 {
123            unpack_into!(let (id) = row);
124            let id = column_as_string!(id);
125
126            if let Ok(id) = Id::from_str(&id) {
127                query(
128                    r#"
129            UPDATE
130                keyset
131            SET keyset_u32 = :u32_keyset
132            WHERE id = :keyset_id
133            "#,
134                )?
135                .bind("u32_keyset", u32::from(id))
136                .bind("keyset_id", id.to_string())
137                .execute(conn)
138                .await?;
139            }
140        }
141
142        Ok(())
143    }
144}
145
146#[async_trait]
147impl<RM> WalletDatabase<database::Error> for SQLWalletDatabase<RM>
148where
149    RM: DatabasePool + 'static,
150{
151    #[instrument(skip(self))]
152    async fn get_melt_quotes(&self) -> Result<Vec<wallet::MeltQuote>, database::Error> {
153        let conn = self
154            .pool
155            .get()
156            .await
157            .map_err(|e| Error::Database(Box::new(e)))?;
158
159        Ok(query(
160            r#"
161              SELECT
162                  id,
163                  unit,
164                  amount,
165                  request,
166                  fee_reserve,
167                  state,
168                  expiry,
169                  payment_proof,
170                  payment_method,
171                  estimated_blocks,
172                  fee_index,
173                  used_by_operation,
174                  version,
175                  mint_url
176              FROM
177                  melt_quote
178              "#,
179        )?
180        .fetch_all(&*conn)
181        .await?
182        .into_iter()
183        .map(sql_row_to_melt_quote)
184        .collect::<Result<_, _>>()?)
185    }
186
187    #[instrument(skip(self))]
188    async fn get_mint(&self, mint_url: MintUrl) -> Result<Option<MintInfo>, database::Error> {
189        let conn = self
190            .pool
191            .get()
192            .await
193            .map_err(|e| Error::Database(Box::new(e)))?;
194        Ok(query(
195            r#"
196            SELECT
197                name,
198                pubkey,
199                version,
200                description,
201                description_long,
202                contact,
203                nuts,
204                icon_url,
205                motd,
206                urls,
207                mint_time,
208                tos_url
209            FROM
210                mint
211            WHERE mint_url = :mint_url
212            "#,
213        )?
214        .bind("mint_url", mint_url.to_string())
215        .fetch_one(&*conn)
216        .await?
217        .map(sql_row_to_mint_info)
218        .transpose()?)
219    }
220
221    #[instrument(skip(self))]
222    async fn get_mints(&self) -> Result<HashMap<MintUrl, Option<MintInfo>>, database::Error> {
223        let conn = self
224            .pool
225            .get()
226            .await
227            .map_err(|e| Error::Database(Box::new(e)))?;
228        Ok(query(
229            r#"
230                SELECT
231                    name,
232                    pubkey,
233                    version,
234                    description,
235                    description_long,
236                    contact,
237                    nuts,
238                    icon_url,
239                    motd,
240                    urls,
241                    mint_time,
242                    tos_url,
243                    mint_url
244                FROM
245                    mint
246                "#,
247        )?
248        .fetch_all(&*conn)
249        .await?
250        .into_iter()
251        .map(|mut row| {
252            let url = column_as_string!(
253                row.pop().ok_or(ConversionError::MissingColumn(0, 1))?,
254                MintUrl::from_str
255            );
256
257            Ok((url, sql_row_to_mint_info(row).ok()))
258        })
259        .collect::<Result<HashMap<_, _>, Error>>()?)
260    }
261
262    #[instrument(skip(self))]
263    async fn get_mint_keysets(
264        &self,
265        mint_url: MintUrl,
266    ) -> Result<Option<Vec<KeySetInfo>>, database::Error> {
267        let conn = self
268            .pool
269            .get()
270            .await
271            .map_err(|e| Error::Database(Box::new(e)))?;
272
273        let keysets = query(
274            r#"
275            SELECT
276                id,
277                unit,
278                active,
279                input_fee_ppk,
280                final_expiry
281            FROM
282                keyset
283            WHERE mint_url = :mint_url
284            "#,
285        )?
286        .bind("mint_url", mint_url.to_string())
287        .fetch_all(&*conn)
288        .await?
289        .into_iter()
290        .map(sql_row_to_keyset)
291        .collect::<Result<Vec<_>, Error>>()?;
292
293        match keysets.is_empty() {
294            false => Ok(Some(keysets)),
295            true => Ok(None),
296        }
297    }
298
299    #[instrument(skip(self), fields(keyset_id = %keyset_id))]
300    async fn get_keyset_by_id(
301        &self,
302        keyset_id: &Id,
303    ) -> Result<Option<KeySetInfo>, database::Error> {
304        let conn = self
305            .pool
306            .get()
307            .await
308            .map_err(|e| Error::Database(Box::new(e)))?;
309        query(
310            r#"
311            SELECT
312                id,
313                unit,
314                active,
315                input_fee_ppk,
316                final_expiry
317            FROM
318                keyset
319            WHERE id = :id
320            "#,
321        )?
322        .bind("id", keyset_id.to_string())
323        .fetch_one(&*conn)
324        .await?
325        .map(sql_row_to_keyset)
326        .transpose()
327    }
328
329    #[instrument(skip(self))]
330    async fn get_mint_quote(&self, quote_id: &str) -> Result<Option<MintQuote>, database::Error> {
331        let conn = self
332            .pool
333            .get()
334            .await
335            .map_err(|e| Error::Database(Box::new(e)))?;
336        query(
337            r#"
338            SELECT
339                id,
340                mint_url,
341                amount,
342                unit,
343                request,
344                state,
345                expiry,
346                secret_key,
347                payment_method,
348                amount_issued,
349                amount_paid,
350                estimated_blocks,
351                used_by_operation,
352                version
353            FROM
354                mint_quote
355            WHERE
356                id = :id
357            "#,
358        )?
359        .bind("id", quote_id.to_string())
360        .fetch_one(&*conn)
361        .await?
362        .map(sql_row_to_mint_quote)
363        .transpose()
364    }
365
366    #[instrument(skip(self))]
367    async fn get_mint_quotes(&self) -> Result<Vec<MintQuote>, database::Error> {
368        let conn = self
369            .pool
370            .get()
371            .await
372            .map_err(|e| Error::Database(Box::new(e)))?;
373        Ok(query(
374            r#"
375            SELECT
376                id,
377                mint_url,
378                amount,
379                unit,
380                request,
381                state,
382                expiry,
383                secret_key,
384                payment_method,
385                amount_issued,
386                amount_paid,
387                estimated_blocks,
388                used_by_operation,
389                version
390            FROM
391                mint_quote
392            "#,
393        )?
394        .fetch_all(&*conn)
395        .await?
396        .into_iter()
397        .map(sql_row_to_mint_quote)
398        .collect::<Result<_, _>>()?)
399    }
400
401    #[instrument(skip(self))]
402    async fn get_unissued_mint_quotes(&self) -> Result<Vec<MintQuote>, database::Error> {
403        let conn = self
404            .pool
405            .get()
406            .await
407            .map_err(|e| Error::Database(Box::new(e)))?;
408        Ok(query(
409            r#"
410            SELECT
411                id,
412                mint_url,
413                amount,
414                unit,
415                request,
416                state,
417                expiry,
418                secret_key,
419                payment_method,
420                amount_issued,
421                amount_paid,
422                estimated_blocks,
423                used_by_operation,
424                version
425            FROM
426                mint_quote
427            WHERE
428                amount_issued = 0
429                OR
430                payment_method = 'bolt12'
431            "#,
432        )?
433        .fetch_all(&*conn)
434        .await?
435        .into_iter()
436        .map(sql_row_to_mint_quote)
437        .collect::<Result<_, _>>()?)
438    }
439
440    #[instrument(skip(self))]
441    async fn get_melt_quote(
442        &self,
443        quote_id: &str,
444    ) -> Result<Option<wallet::MeltQuote>, database::Error> {
445        let conn = self
446            .pool
447            .get()
448            .await
449            .map_err(|e| Error::Database(Box::new(e)))?;
450        query(
451            r#"
452            SELECT
453                id,
454                unit,
455                amount,
456                request,
457                fee_reserve,
458                state,
459                expiry,
460                payment_proof,
461                payment_method,
462                estimated_blocks,
463                fee_index,
464                used_by_operation,
465                version,
466                mint_url
467            FROM
468                melt_quote
469            WHERE
470                id=:id
471            "#,
472        )?
473        .bind("id", quote_id.to_owned())
474        .fetch_one(&*conn)
475        .await?
476        .map(sql_row_to_melt_quote)
477        .transpose()
478    }
479
480    #[instrument(skip(self), fields(keyset_id = %keyset_id))]
481    async fn get_keys(&self, keyset_id: &Id) -> Result<Option<Keys>, database::Error> {
482        let conn = self
483            .pool
484            .get()
485            .await
486            .map_err(|e| Error::Database(Box::new(e)))?;
487        query(
488            r#"
489            SELECT
490                keys
491            FROM key
492            WHERE id = :id
493            "#,
494        )?
495        .bind("id", keyset_id.to_string())
496        .pluck(&*conn)
497        .await?
498        .map(|keys| {
499            let keys = column_as_string!(keys);
500            serde_json::from_str(&keys).map_err(Error::from)
501        })
502        .transpose()
503    }
504
505    #[instrument(skip(self, state, spending_conditions))]
506    async fn get_proofs(
507        &self,
508        mint_url: Option<MintUrl>,
509        unit: Option<CurrencyUnit>,
510        state: Option<Vec<State>>,
511        spending_conditions: Option<Vec<SpendingConditions>>,
512    ) -> Result<Vec<ProofInfo>, database::Error> {
513        let conn = self
514            .pool
515            .get()
516            .await
517            .map_err(|e| Error::Database(Box::new(e)))?;
518        Ok(query(
519            r#"
520            SELECT
521                amount,
522                unit,
523                keyset_id,
524                secret,
525                c,
526                witness,
527                dleq_e,
528                dleq_s,
529                dleq_r,
530                y,
531                mint_url,
532                state,
533                spending_condition,
534                used_by_operation,
535                created_by_operation,
536                p2pk_e
537            FROM proof
538            "#,
539        )?
540        .fetch_all(&*conn)
541        .await?
542        .into_iter()
543        .filter_map(|row| {
544            let row = sql_row_to_proof_info(row).ok()?;
545
546            if row.matches_conditions(&mint_url, &unit, &state, &spending_conditions) {
547                Some(row)
548            } else {
549                None
550            }
551        })
552        .collect::<Vec<_>>())
553    }
554
555    #[instrument(skip(self, ys))]
556    async fn get_proofs_by_ys(
557        &self,
558        ys: Vec<PublicKey>,
559    ) -> Result<Vec<ProofInfo>, database::Error> {
560        let conn = self
561            .pool
562            .get()
563            .await
564            .map_err(|e| Error::Database(Box::new(e)))?;
565        Ok(query(
566            r#"
567            SELECT
568                amount,
569                unit,
570                keyset_id,
571                secret,
572                c,
573                witness,
574                dleq_e,
575                dleq_s,
576                dleq_r,
577                y,
578                mint_url,
579                state,
580                spending_condition,
581                used_by_operation,
582                created_by_operation,
583                p2pk_e
584            FROM proof
585            WHERE y IN (:ys)
586        "#,
587        )?
588        .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())?
589        .fetch_all(&*conn)
590        .await?
591        .into_iter()
592        .filter_map(|row| sql_row_to_proof_info(row).ok())
593        .collect::<Vec<_>>())
594    }
595
596    async fn get_balance(
597        &self,
598        mint_url: Option<MintUrl>,
599        unit: Option<CurrencyUnit>,
600        states: Option<Vec<State>>,
601    ) -> Result<u64, database::Error> {
602        let conn = self
603            .pool
604            .get()
605            .await
606            .map_err(|e| Error::Database(Box::new(e)))?;
607
608        let mut query_str = "SELECT COALESCE(SUM(amount), 0) as total FROM proof".to_string();
609        let mut where_clauses = Vec::new();
610        let states = states
611            .unwrap_or_default()
612            .into_iter()
613            .map(|x| x.to_string())
614            .collect::<Vec<_>>();
615
616        if mint_url.is_some() {
617            where_clauses.push("mint_url = :mint_url");
618        }
619        if unit.is_some() {
620            where_clauses.push("unit = :unit");
621        }
622        if !states.is_empty() {
623            where_clauses.push("state IN (:states)");
624        }
625
626        if !where_clauses.is_empty() {
627            query_str.push_str(" WHERE ");
628            query_str.push_str(&where_clauses.join(" AND "));
629        }
630
631        let mut q = query(&query_str)?;
632
633        if let Some(ref mint_url) = mint_url {
634            q = q.bind("mint_url", mint_url.to_string());
635        }
636        if let Some(ref unit) = unit {
637            q = q.bind("unit", unit.to_string());
638        }
639
640        if !states.is_empty() {
641            q = q.bind_vec("states", states)?;
642        }
643
644        let balance = q
645            .pluck(&*conn)
646            .await?
647            .map(|n| {
648                // SQLite SUM returns INTEGER which we need to convert to u64
649                match n {
650                    crate::stmt::Column::Integer(i) => Ok(i as u64),
651                    crate::stmt::Column::Real(f) => Ok(f as u64),
652                    _ => Err(Error::Database(Box::new(std::io::Error::new(
653                        std::io::ErrorKind::InvalidData,
654                        "Invalid balance type",
655                    )))),
656                }
657            })
658            .transpose()?
659            .unwrap_or(0);
660
661        Ok(balance)
662    }
663
664    #[instrument(skip(self))]
665    async fn get_transaction(
666        &self,
667        transaction_id: TransactionId,
668    ) -> Result<Option<Transaction>, database::Error> {
669        let conn = self
670            .pool
671            .get()
672            .await
673            .map_err(|e| Error::Database(Box::new(e)))?;
674        Ok(query(
675            r#"
676            SELECT
677                mint_url,
678                direction,
679                unit,
680                amount,
681                fee,
682                ys,
683                timestamp,
684                memo,
685                metadata,
686                quote_id,
687                payment_request,
688                payment_proof,
689                payment_method,
690                saga_id
691            FROM
692                transactions
693            WHERE
694                id = :id
695            "#,
696        )?
697        .bind("id", transaction_id.as_slice().to_vec())
698        .fetch_one(&*conn)
699        .await?
700        .map(sql_row_to_transaction)
701        .transpose()?)
702    }
703
704    #[instrument(skip(self))]
705    async fn list_transactions(
706        &self,
707        mint_url: Option<MintUrl>,
708        direction: Option<TransactionDirection>,
709        unit: Option<CurrencyUnit>,
710    ) -> Result<Vec<Transaction>, database::Error> {
711        let conn = self
712            .pool
713            .get()
714            .await
715            .map_err(|e| Error::Database(Box::new(e)))?;
716
717        Ok(query(
718            r#"
719            SELECT
720                mint_url,
721                direction,
722                unit,
723                amount,
724                fee,
725                ys,
726                timestamp,
727                memo,
728                metadata,
729                quote_id,
730                payment_request,
731                payment_proof,
732                payment_method,
733                saga_id
734            FROM
735                transactions
736            "#,
737        )?
738        .fetch_all(&*conn)
739        .await?
740        .into_iter()
741        .filter_map(|row| {
742            // TODO: Avoid a table scan by passing the heavy lifting of checking to the DB engine
743            let transaction = sql_row_to_transaction(row).ok()?;
744            if transaction.matches_conditions(&mint_url, &direction, &unit) {
745                Some(transaction)
746            } else {
747                None
748            }
749        })
750        .collect::<Vec<_>>())
751    }
752
753    async fn update_proofs(
754        &self,
755        added: Vec<ProofInfo>,
756        removed_ys: Vec<PublicKey>,
757    ) -> Result<(), database::Error> {
758        let conn = self
759            .pool
760            .get()
761            .await
762            .map_err(|e| Error::Database(Box::new(e)))?;
763        let tx = ConnectionWithTransaction::new(conn).await?;
764
765        for proof in added {
766            query(
767                r#"
768    INSERT INTO proof
769    (y, mint_url, state, spending_condition, unit, amount, keyset_id, secret, c, witness, dleq_e, dleq_s, dleq_r, used_by_operation, created_by_operation, p2pk_e)
770    VALUES
771    (:y, :mint_url, :state, :spending_condition, :unit, :amount, :keyset_id, :secret, :c, :witness, :dleq_e, :dleq_s, :dleq_r, :used_by_operation, :created_by_operation, :p2pk_e)
772    ON CONFLICT(y) DO UPDATE SET
773        mint_url = excluded.mint_url,
774        state = excluded.state,
775        spending_condition = excluded.spending_condition,
776        unit = excluded.unit,
777        amount = excluded.amount,
778        keyset_id = excluded.keyset_id,
779        secret = excluded.secret,
780        c = excluded.c,
781        witness = excluded.witness,
782        dleq_e = excluded.dleq_e,
783        dleq_s = excluded.dleq_s,
784        dleq_r = excluded.dleq_r,
785        used_by_operation = excluded.used_by_operation,
786        created_by_operation = excluded.created_by_operation,
787        p2pk_e = excluded.p2pk_e
788    ;
789            "#,
790            )?
791            .bind("y", proof.y.to_bytes().to_vec())
792            .bind("mint_url", proof.mint_url.to_string())
793            .bind("state", proof.state.to_string())
794            .bind(
795                "spending_condition",
796                proof
797                    .spending_condition
798                    .map(|s| serde_json::to_string(&s).ok()),
799            )
800            .bind("unit", proof.unit.to_string())
801            .bind("amount", u64::from(proof.proof.amount) as i64)
802            .bind("keyset_id", proof.proof.keyset_id.to_string())
803            .bind("secret", proof.proof.secret.to_string())
804            .bind("c", proof.proof.c.to_bytes().to_vec())
805            .bind(
806                "witness",
807                proof
808                    .proof
809                    .witness
810                    .and_then(|w| serde_json::to_string(&w).ok()),
811            )
812            .bind(
813                "dleq_e",
814                proof.proof.dleq.as_ref().map(|dleq| dleq.e.to_secret_bytes().to_vec()),
815            )
816            .bind(
817                "dleq_s",
818                proof.proof.dleq.as_ref().map(|dleq| dleq.s.to_secret_bytes().to_vec()),
819            )
820            .bind(
821                "dleq_r",
822                proof.proof.dleq.as_ref().map(|dleq| dleq.r.to_secret_bytes().to_vec()),
823            )
824            .bind("used_by_operation", proof.used_by_operation.map(|id| id.to_string()))
825            .bind("created_by_operation", proof.created_by_operation.map(|id| id.to_string()))
826            .bind(
827                "p2pk_e",
828                proof
829                    .proof
830                    .p2pk_e
831                    .as_ref()
832                    .map(|pk| pk.to_bytes().to_vec()),
833            )
834            .execute(&tx)
835            .await?;
836        }
837
838        if !removed_ys.is_empty() {
839            query(r#"DELETE FROM proof WHERE y IN (:ys)"#)?
840                .bind_vec(
841                    "ys",
842                    removed_ys.iter().map(|y| y.to_bytes().to_vec()).collect(),
843                )?
844                .execute(&tx)
845                .await?;
846        }
847
848        tx.commit().await?;
849
850        Ok(())
851    }
852
853    #[instrument(skip(self))]
854    async fn update_proofs_state(
855        &self,
856        ys: Vec<PublicKey>,
857        state: State,
858    ) -> Result<(), database::Error> {
859        let conn = self
860            .pool
861            .get()
862            .await
863            .map_err(|e| Error::Database(Box::new(e)))?;
864
865        query("UPDATE proof SET state = :state WHERE y IN (:ys)")?
866            .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())?
867            .bind("state", state.to_string())
868            .execute(&*conn)
869            .await?;
870
871        Ok(())
872    }
873
874    #[instrument(skip(self))]
875    async fn add_transaction(&self, transaction: Transaction) -> Result<(), database::Error> {
876        let conn = self
877            .pool
878            .get()
879            .await
880            .map_err(|e| Error::Database(Box::new(e)))?;
881
882        let mint_url = transaction.mint_url.to_string();
883        let direction = transaction.direction.to_string();
884        let unit = transaction.unit.to_string();
885        let amount = u64::from(transaction.amount) as i64;
886        let fee = u64::from(transaction.fee) as i64;
887        let ys = transaction
888            .ys
889            .iter()
890            .flat_map(|y| y.to_bytes().to_vec())
891            .collect::<Vec<_>>();
892
893        let id = transaction.id();
894
895        query(
896               r#"
897   INSERT INTO transactions
898   (id, mint_url, direction, unit, amount, fee, ys, timestamp, memo, metadata, quote_id, payment_request, payment_proof, payment_method, saga_id)
899   VALUES
900   (:id, :mint_url, :direction, :unit, :amount, :fee, :ys, :timestamp, :memo, :metadata, :quote_id, :payment_request, :payment_proof, :payment_method, :saga_id)
901   ON CONFLICT(id) DO UPDATE SET
902       mint_url = excluded.mint_url,
903       direction = excluded.direction,
904       unit = excluded.unit,
905       amount = excluded.amount,
906       fee = excluded.fee,
907       timestamp = excluded.timestamp,
908       memo = excluded.memo,
909       metadata = excluded.metadata,
910       quote_id = excluded.quote_id,
911       payment_request = excluded.payment_request,
912       payment_proof = excluded.payment_proof,
913       payment_method = excluded.payment_method,
914       saga_id = excluded.saga_id
915   ;
916           "#,
917           )?
918           .bind("id", id.as_slice().to_vec())
919           .bind("mint_url", mint_url)
920           .bind("direction", direction)
921           .bind("unit", unit)
922           .bind("amount", amount)
923           .bind("fee", fee)
924           .bind("ys", ys)
925           .bind("timestamp", transaction.timestamp as i64)
926           .bind("memo", transaction.memo)
927           .bind(
928               "metadata",
929               serde_json::to_string(&transaction.metadata).map_err(Error::from)?,
930           )
931           .bind("quote_id", transaction.quote_id)
932           .bind("payment_request", transaction.payment_request)
933           .bind("payment_proof", transaction.payment_proof)
934           .bind("payment_method", transaction.payment_method.map(|pm| pm.to_string()))
935           .bind("saga_id", transaction.saga_id.map(|id| id.to_string()))
936           .execute(&*conn)
937           .await?;
938
939        Ok(())
940    }
941
942    #[instrument(skip(self))]
943    async fn update_mint_url(
944        &self,
945        old_mint_url: MintUrl,
946        new_mint_url: MintUrl,
947    ) -> Result<(), database::Error> {
948        let conn = self
949            .pool
950            .get()
951            .await
952            .map_err(|e| Error::Database(Box::new(e)))?;
953        let tx = ConnectionWithTransaction::new(conn).await?;
954        let tables = ["mint_quote", "proof"];
955
956        for table in &tables {
957            query(&format!(
958                r#"
959                UPDATE {table}
960                SET mint_url = :new_mint_url
961                WHERE mint_url = :old_mint_url
962            "#
963            ))?
964            .bind("new_mint_url", new_mint_url.to_string())
965            .bind("old_mint_url", old_mint_url.to_string())
966            .execute(&tx)
967            .await?;
968        }
969
970        tx.commit().await?;
971
972        Ok(())
973    }
974
975    #[instrument(skip(self), fields(keyset_id = %keyset_id))]
976    async fn increment_keyset_counter(
977        &self,
978        keyset_id: &Id,
979        count: u32,
980    ) -> Result<u32, database::Error> {
981        let conn = self
982            .pool
983            .get()
984            .await
985            .map_err(|e| Error::Database(Box::new(e)))?;
986
987        let new_counter = query(
988            r#"
989            INSERT INTO keyset_counter (keyset_id, counter)
990            VALUES (:keyset_id, :count)
991            ON CONFLICT(keyset_id) DO UPDATE SET
992                counter = keyset_counter.counter + :count
993            RETURNING counter
994            "#,
995        )?
996        .bind("keyset_id", keyset_id.to_string())
997        .bind("count", count)
998        .pluck(&*conn)
999        .await?
1000        .map(|n| Ok::<_, Error>(column_as_number!(n)))
1001        .transpose()?
1002        .ok_or_else(|| Error::Internal("Counter update returned no value".to_owned()))?;
1003
1004        Ok(new_counter)
1005    }
1006
1007    #[instrument(skip(self, mint_info))]
1008    async fn add_mint(
1009        &self,
1010        mint_url: MintUrl,
1011        mint_info: Option<MintInfo>,
1012    ) -> Result<(), database::Error> {
1013        let conn = self
1014            .pool
1015            .get()
1016            .await
1017            .map_err(|e| Error::Database(Box::new(e)))?;
1018
1019        let (
1020            name,
1021            pubkey,
1022            version,
1023            description,
1024            description_long,
1025            contact,
1026            nuts,
1027            icon_url,
1028            urls,
1029            motd,
1030            time,
1031            tos_url,
1032        ) = match mint_info {
1033            Some(mint_info) => {
1034                let MintInfo {
1035                    name,
1036                    pubkey,
1037                    version,
1038                    description,
1039                    description_long,
1040                    contact,
1041                    nuts,
1042                    icon_url,
1043                    urls,
1044                    motd,
1045                    time,
1046                    tos_url,
1047                } = mint_info;
1048
1049                (
1050                    name,
1051                    pubkey.map(|p| p.to_bytes().to_vec()),
1052                    version.map(|v| serde_json::to_string(&v).ok()),
1053                    description,
1054                    description_long,
1055                    contact.map(|c| serde_json::to_string(&c).ok()),
1056                    serde_json::to_string(&nuts).ok(),
1057                    icon_url,
1058                    urls.map(|c| serde_json::to_string(&c).ok()),
1059                    motd,
1060                    time,
1061                    tos_url,
1062                )
1063            }
1064            None => (
1065                None, None, None, None, None, None, None, None, None, None, None, None,
1066            ),
1067        };
1068
1069        query(
1070            r#"
1071   INSERT INTO mint
1072   (
1073       mint_url, name, pubkey, version, description, description_long,
1074       contact, nuts, icon_url, urls, motd, mint_time, tos_url
1075   )
1076   VALUES
1077   (
1078       :mint_url, :name, :pubkey, :version, :description, :description_long,
1079       :contact, :nuts, :icon_url, :urls, :motd, :mint_time, :tos_url
1080   )
1081   ON CONFLICT(mint_url) DO UPDATE SET
1082       name = excluded.name,
1083       pubkey = excluded.pubkey,
1084       version = excluded.version,
1085       description = excluded.description,
1086       description_long = excluded.description_long,
1087       contact = excluded.contact,
1088       nuts = excluded.nuts,
1089       icon_url = excluded.icon_url,
1090       urls = excluded.urls,
1091       motd = excluded.motd,
1092       mint_time = excluded.mint_time,
1093       tos_url = excluded.tos_url
1094   ;
1095           "#,
1096        )?
1097        .bind("mint_url", mint_url.to_string())
1098        .bind("name", name)
1099        .bind("pubkey", pubkey)
1100        .bind("version", version)
1101        .bind("description", description)
1102        .bind("description_long", description_long)
1103        .bind("contact", contact)
1104        .bind("nuts", nuts)
1105        .bind("icon_url", icon_url)
1106        .bind("urls", urls)
1107        .bind("motd", motd)
1108        .bind("mint_time", time.map(|v| v as i64))
1109        .bind("tos_url", tos_url)
1110        .execute(&*conn)
1111        .await?;
1112
1113        Ok(())
1114    }
1115
1116    #[instrument(skip(self))]
1117    async fn remove_mint(&self, mint_url: MintUrl) -> Result<(), database::Error> {
1118        let conn = self
1119            .pool
1120            .get()
1121            .await
1122            .map_err(|e| Error::Database(Box::new(e)))?;
1123
1124        query(r#"DELETE FROM mint WHERE mint_url=:mint_url"#)?
1125            .bind("mint_url", mint_url.to_string())
1126            .execute(&*conn)
1127            .await?;
1128
1129        Ok(())
1130    }
1131
1132    #[instrument(skip(self, keysets))]
1133    async fn add_mint_keysets(
1134        &self,
1135        mint_url: MintUrl,
1136        keysets: Vec<KeySetInfo>,
1137    ) -> Result<(), database::Error> {
1138        let conn = self
1139            .pool
1140            .get()
1141            .await
1142            .map_err(|e| Error::Database(Box::new(e)))?;
1143        let tx = ConnectionWithTransaction::new(conn).await?;
1144
1145        for keyset in keysets {
1146            query(
1147                r#"
1148        INSERT INTO keyset
1149        (mint_url, id, unit, active, input_fee_ppk, final_expiry, keyset_u32)
1150        VALUES
1151        (:mint_url, :id, :unit, :active, :input_fee_ppk, :final_expiry, :keyset_u32)
1152        ON CONFLICT(id) DO UPDATE SET
1153            active = excluded.active,
1154            input_fee_ppk = excluded.input_fee_ppk
1155        "#,
1156            )?
1157            .bind("mint_url", mint_url.to_string())
1158            .bind("id", keyset.id.to_string())
1159            .bind("unit", keyset.unit.to_string())
1160            .bind("active", keyset.active)
1161            .bind("input_fee_ppk", keyset.input_fee_ppk as i64)
1162            .bind("final_expiry", keyset.final_expiry.map(|v| v as i64))
1163            .bind("keyset_u32", u32::from(keyset.id))
1164            .execute(&tx)
1165            .await?;
1166        }
1167
1168        tx.commit().await?;
1169
1170        Ok(())
1171    }
1172
1173    #[instrument(skip_all)]
1174    async fn add_mint_quote(&self, quote: MintQuote) -> Result<(), database::Error> {
1175        let conn = self
1176            .pool
1177            .get()
1178            .await
1179            .map_err(|e| Error::Database(Box::new(e)))?;
1180
1181        let expected_version = quote.version;
1182        let new_version = expected_version.wrapping_add(1);
1183
1184        let rows_affected = query(
1185                r#"
1186    INSERT INTO mint_quote
1187    (id, mint_url, amount, unit, request, state, expiry, secret_key, payment_method, amount_issued, amount_paid, estimated_blocks, version, used_by_operation)
1188    VALUES
1189    (:id, :mint_url, :amount, :unit, :request, :state, :expiry, :secret_key, :payment_method, :amount_issued, :amount_paid, :estimated_blocks, :version, :used_by_operation)
1190    ON CONFLICT(id) DO UPDATE SET
1191        mint_url = excluded.mint_url,
1192        amount = excluded.amount,
1193        unit = excluded.unit,
1194        request = excluded.request,
1195        state = excluded.state,
1196        expiry = excluded.expiry,
1197        secret_key = excluded.secret_key,
1198        payment_method = excluded.payment_method,
1199        amount_issued = excluded.amount_issued,
1200        amount_paid = excluded.amount_paid,
1201        estimated_blocks = excluded.estimated_blocks,
1202        version = :new_version,
1203        used_by_operation = excluded.used_by_operation
1204    WHERE mint_quote.version = :expected_version
1205    ;
1206            "#,
1207            )?
1208            .bind("id", quote.id.to_string())
1209            .bind("mint_url", quote.mint_url.to_string())
1210            .bind("amount", quote.amount.map(|a| a.to_i64()))
1211            .bind("unit", quote.unit.to_string())
1212            .bind("request", quote.request)
1213            .bind("state", quote.state.to_string())
1214            .bind("expiry", quote.expiry as i64)
1215            .bind("secret_key", quote.secret_key.map(|p| p.to_string()))
1216            .bind("payment_method", quote.payment_method.to_string())
1217            .bind("amount_issued", quote.amount_issued.to_i64())
1218            .bind("amount_paid", quote.amount_paid.to_i64())
1219            .bind("estimated_blocks", quote.estimated_blocks.map(i64::from))
1220            .bind("version", quote.version as i64)
1221            .bind("new_version", new_version as i64)
1222            .bind("expected_version", expected_version as i64)
1223            .bind("used_by_operation", quote.used_by_operation)
1224            .execute(&*conn).await?;
1225
1226        if rows_affected == 0 {
1227            return Err(database::Error::ConcurrentUpdate);
1228        }
1229
1230        Ok(())
1231    }
1232
1233    #[instrument(skip(self))]
1234    async fn remove_mint_quote(&self, quote_id: &str) -> Result<(), database::Error> {
1235        let conn = self
1236            .pool
1237            .get()
1238            .await
1239            .map_err(|e| Error::Database(Box::new(e)))?;
1240
1241        query(r#"DELETE FROM mint_quote WHERE id=:id"#)?
1242            .bind("id", quote_id.to_string())
1243            .execute(&*conn)
1244            .await?;
1245
1246        Ok(())
1247    }
1248
1249    #[instrument(skip_all)]
1250    async fn add_melt_quote(&self, quote: wallet::MeltQuote) -> Result<(), database::Error> {
1251        let conn = self
1252            .pool
1253            .get()
1254            .await
1255            .map_err(|e| Error::Database(Box::new(e)))?;
1256
1257        let expected_version = quote.version;
1258        let new_version = expected_version.wrapping_add(1);
1259
1260        let rows_affected = query(
1261            r#"
1262 INSERT INTO melt_quote
1263 (id, unit, amount, request, fee_reserve, state, expiry, payment_method, estimated_blocks, fee_index, version, mint_url, used_by_operation)
1264 VALUES
1265 (:id, :unit, :amount, :request, :fee_reserve, :state, :expiry, :payment_method, :estimated_blocks, :fee_index, :version, :mint_url, :used_by_operation)
1266 ON CONFLICT(id) DO UPDATE SET
1267     unit = excluded.unit,
1268     amount = excluded.amount,
1269     request = excluded.request,
1270     fee_reserve = excluded.fee_reserve,
1271     state = excluded.state,
1272     expiry = excluded.expiry,
1273     payment_method = excluded.payment_method,
1274     estimated_blocks = excluded.estimated_blocks,
1275     fee_index = excluded.fee_index,
1276     version = :new_version,
1277     mint_url = excluded.mint_url,
1278     used_by_operation = excluded.used_by_operation
1279 WHERE melt_quote.version = :expected_version
1280 ;
1281         "#,
1282        )?
1283        .bind("id", quote.id.to_string())
1284        .bind("unit", quote.unit.to_string())
1285        .bind("amount", u64::from(quote.amount) as i64)
1286        .bind("request", quote.request)
1287        .bind("fee_reserve", u64::from(quote.fee_reserve) as i64)
1288        .bind("state", quote.state.to_string())
1289        .bind("expiry", quote.expiry as i64)
1290        .bind("payment_method", quote.payment_method.to_string())
1291        .bind("estimated_blocks", quote.estimated_blocks.map(i64::from))
1292        .bind("fee_index", quote.fee_index.map(i64::from))
1293        .bind("version", quote.version as i64)
1294        .bind("new_version", new_version as i64)
1295        .bind("expected_version", expected_version as i64)
1296        .bind("mint_url", quote.mint_url.map(|m| m.to_string()))
1297        .bind("used_by_operation", quote.used_by_operation)
1298        .execute(&*conn)
1299        .await?;
1300
1301        if rows_affected == 0 {
1302            return Err(database::Error::ConcurrentUpdate);
1303        }
1304
1305        Ok(())
1306    }
1307
1308    #[instrument(skip(self))]
1309    async fn remove_melt_quote(&self, quote_id: &str) -> Result<(), database::Error> {
1310        let conn = self
1311            .pool
1312            .get()
1313            .await
1314            .map_err(|e| Error::Database(Box::new(e)))?;
1315
1316        query(r#"DELETE FROM melt_quote WHERE id=:id"#)?
1317            .bind("id", quote_id.to_owned())
1318            .execute(&*conn)
1319            .await?;
1320
1321        Ok(())
1322    }
1323
1324    #[instrument(skip_all)]
1325    async fn add_keys(&self, keyset: KeySet) -> Result<(), database::Error> {
1326        let conn = self
1327            .pool
1328            .get()
1329            .await
1330            .map_err(|e| Error::Database(Box::new(e)))?;
1331
1332        keyset.verify_id()?;
1333
1334        query(
1335            r#"
1336                INSERT INTO key
1337                (id, keys, keyset_u32)
1338                VALUES
1339                (:id, :keys, :keyset_u32)
1340            "#,
1341        )?
1342        .bind("id", keyset.id.to_string())
1343        .bind(
1344            "keys",
1345            serde_json::to_string(&keyset.keys).map_err(Error::from)?,
1346        )
1347        .bind("keyset_u32", u32::from(keyset.id))
1348        .execute(&*conn)
1349        .await?;
1350
1351        Ok(())
1352    }
1353
1354    #[instrument(skip(self))]
1355    async fn remove_keys(&self, id: &Id) -> Result<(), database::Error> {
1356        let conn = self
1357            .pool
1358            .get()
1359            .await
1360            .map_err(|e| Error::Database(Box::new(e)))?;
1361
1362        query(r#"DELETE FROM key WHERE id = :id"#)?
1363            .bind("id", id.to_string())
1364            .execute(&*conn)
1365            .await?;
1366
1367        Ok(())
1368    }
1369
1370    #[instrument(skip(self))]
1371    async fn remove_transaction(
1372        &self,
1373        transaction_id: TransactionId,
1374    ) -> Result<(), database::Error> {
1375        let conn = self
1376            .pool
1377            .get()
1378            .await
1379            .map_err(|e| Error::Database(Box::new(e)))?;
1380
1381        query(r#"DELETE FROM transactions WHERE id=:id"#)?
1382            .bind("id", transaction_id.as_slice().to_vec())
1383            .execute(&*conn)
1384            .await?;
1385
1386        Ok(())
1387    }
1388
1389    #[instrument(skip(self))]
1390    async fn add_saga(&self, saga: wallet::WalletSaga) -> Result<(), database::Error> {
1391        let conn = self
1392            .pool
1393            .get()
1394            .await
1395            .map_err(|e| Error::Database(Box::new(e)))?;
1396
1397        let state_json = serde_json::to_string(&saga.state).map_err(|e| {
1398            Error::Database(Box::new(std::io::Error::new(
1399                std::io::ErrorKind::InvalidData,
1400                format!("Failed to serialize saga state: {}", e),
1401            )))
1402        })?;
1403
1404        let data_json = serde_json::to_string(&saga.data).map_err(|e| {
1405            Error::Database(Box::new(std::io::Error::new(
1406                std::io::ErrorKind::InvalidData,
1407                format!("Failed to serialize saga data: {}", e),
1408            )))
1409        })?;
1410
1411        query(
1412            r#"
1413            INSERT INTO wallet_sagas
1414            (id, kind, state, amount, mint_url, unit, quote_id, created_at, updated_at, data, version)
1415            VALUES
1416            (:id, :kind, :state, :amount, :mint_url, :unit, :quote_id, :created_at, :updated_at, :data, :version)
1417            "#,
1418        )?
1419        .bind("id", saga.id.to_string())
1420        .bind("kind", saga.kind.to_string())
1421        .bind("state", state_json)
1422        .bind("amount", u64::from(saga.amount) as i64)
1423        .bind("mint_url", saga.mint_url.to_string())
1424        .bind("unit", saga.unit.to_string())
1425        .bind("quote_id", saga.quote_id)
1426        .bind("created_at", saga.created_at as i64)
1427        .bind("updated_at", saga.updated_at as i64)
1428        .bind("data", data_json)
1429        .bind("version", saga.version as i64)
1430        .execute(&*conn)
1431        .await?;
1432
1433        Ok(())
1434    }
1435
1436    #[instrument(skip(self))]
1437    async fn get_saga(
1438        &self,
1439        id: &uuid::Uuid,
1440    ) -> Result<Option<wallet::WalletSaga>, database::Error> {
1441        let conn = self
1442            .pool
1443            .get()
1444            .await
1445            .map_err(|e| Error::Database(Box::new(e)))?;
1446
1447        let rows = query(
1448            r#"
1449            SELECT id, kind, state, amount, mint_url, unit, quote_id, created_at, updated_at, data, version
1450            FROM wallet_sagas
1451            WHERE id = :id
1452            "#,
1453        )?
1454        .bind("id", id.to_string())
1455        .fetch_all(&*conn)
1456        .await?;
1457
1458        match rows.into_iter().next() {
1459            Some(row) => Ok(Some(sql_row_to_wallet_saga(row)?)),
1460            None => Ok(None),
1461        }
1462    }
1463
1464    #[instrument(skip(self))]
1465    async fn update_saga(&self, saga: wallet::WalletSaga) -> Result<bool, database::Error> {
1466        let conn = self
1467            .pool
1468            .get()
1469            .await
1470            .map_err(|e| Error::Database(Box::new(e)))?;
1471
1472        let state_json = serde_json::to_string(&saga.state).map_err(|e| {
1473            Error::Database(Box::new(std::io::Error::new(
1474                std::io::ErrorKind::InvalidData,
1475                format!("Failed to serialize saga state: {}", e),
1476            )))
1477        })?;
1478
1479        let data_json = serde_json::to_string(&saga.data).map_err(|e| {
1480            Error::Database(Box::new(std::io::Error::new(
1481                std::io::ErrorKind::InvalidData,
1482                format!("Failed to serialize saga data: {}", e),
1483            )))
1484        })?;
1485
1486        // Optimistic locking: only update if the version matches the expected value.
1487        // The saga.version has already been incremented by the caller, so we check
1488        // for (saga.version - 1) in the WHERE clause.
1489        let expected_version = saga.version.saturating_sub(1);
1490
1491        let rows_affected = query(
1492            r#"
1493            UPDATE wallet_sagas
1494            SET kind = :kind, state = :state, amount = :amount, mint_url = :mint_url,
1495                unit = :unit, quote_id = :quote_id, updated_at = :updated_at, data = :data,
1496                version = :new_version
1497            WHERE id = :id AND version = :expected_version
1498            "#,
1499        )?
1500        .bind("id", saga.id.to_string())
1501        .bind("kind", saga.kind.to_string())
1502        .bind("state", state_json)
1503        .bind("amount", u64::from(saga.amount) as i64)
1504        .bind("mint_url", saga.mint_url.to_string())
1505        .bind("unit", saga.unit.to_string())
1506        .bind("quote_id", saga.quote_id)
1507        .bind("updated_at", saga.updated_at as i64)
1508        .bind("data", data_json)
1509        .bind("new_version", saga.version as i64)
1510        .bind("expected_version", expected_version as i64)
1511        .execute(&*conn)
1512        .await?;
1513
1514        // Return true if the update succeeded (version matched), false if version mismatch
1515        Ok(rows_affected > 0)
1516    }
1517
1518    #[instrument(skip(self))]
1519    async fn delete_saga(&self, id: &uuid::Uuid) -> Result<(), database::Error> {
1520        let conn = self
1521            .pool
1522            .get()
1523            .await
1524            .map_err(|e| Error::Database(Box::new(e)))?;
1525
1526        query(r#"DELETE FROM wallet_sagas WHERE id = :id"#)?
1527            .bind("id", id.to_string())
1528            .execute(&*conn)
1529            .await?;
1530
1531        Ok(())
1532    }
1533
1534    #[instrument(skip(self))]
1535    async fn get_incomplete_sagas(&self) -> Result<Vec<wallet::WalletSaga>, database::Error> {
1536        let conn = self
1537            .pool
1538            .get()
1539            .await
1540            .map_err(|e| Error::Database(Box::new(e)))?;
1541
1542        let rows = query(
1543            r#"
1544            SELECT id, kind, state, amount, mint_url, unit, quote_id, created_at, updated_at, data, version
1545            FROM wallet_sagas
1546            ORDER BY created_at ASC
1547            "#,
1548        )?
1549        .fetch_all(&*conn)
1550        .await?;
1551
1552        rows.into_iter().map(sql_row_to_wallet_saga).collect()
1553    }
1554
1555    #[instrument(skip(self))]
1556    async fn reserve_proofs(
1557        &self,
1558        ys: Vec<PublicKey>,
1559        operation_id: &uuid::Uuid,
1560    ) -> Result<(), database::Error> {
1561        let conn = self
1562            .pool
1563            .get()
1564            .await
1565            .map_err(|e| Error::Database(Box::new(e)))?;
1566
1567        for y in ys {
1568            let rows_affected = query(
1569                r#"
1570                UPDATE proof
1571                SET state = 'RESERVED', used_by_operation = :operation_id
1572                WHERE y = :y AND state = 'UNSPENT'
1573                "#,
1574            )?
1575            .bind("y", y.to_bytes().to_vec())
1576            .bind("operation_id", operation_id.to_string())
1577            .execute(&*conn)
1578            .await?;
1579
1580            if rows_affected == 0 {
1581                return Err(database::Error::ProofNotUnspent);
1582            }
1583        }
1584
1585        Ok(())
1586    }
1587
1588    #[instrument(skip(self))]
1589    async fn release_proofs(&self, operation_id: &uuid::Uuid) -> Result<(), database::Error> {
1590        let conn = self
1591            .pool
1592            .get()
1593            .await
1594            .map_err(|e| Error::Database(Box::new(e)))?;
1595
1596        query(
1597            r#"
1598            UPDATE proof
1599            SET state = 'UNSPENT', used_by_operation = NULL
1600            WHERE used_by_operation = :operation_id
1601            "#,
1602        )?
1603        .bind("operation_id", operation_id.to_string())
1604        .execute(&*conn)
1605        .await?;
1606
1607        Ok(())
1608    }
1609
1610    #[instrument(skip(self))]
1611    async fn get_reserved_proofs(
1612        &self,
1613        operation_id: &uuid::Uuid,
1614    ) -> Result<Vec<ProofInfo>, database::Error> {
1615        let conn = self
1616            .pool
1617            .get()
1618            .await
1619            .map_err(|e| Error::Database(Box::new(e)))?;
1620
1621        let rows = query(
1622            r#"
1623            SELECT
1624                amount,
1625                unit,
1626                keyset_id,
1627                secret,
1628                c,
1629                witness,
1630                dleq_e,
1631                dleq_s,
1632                dleq_r,
1633                y,
1634                mint_url,
1635                state,
1636                spending_condition,
1637                used_by_operation,
1638                created_by_operation,
1639                p2pk_e
1640            FROM proof
1641            WHERE used_by_operation = :operation_id
1642            "#,
1643        )?
1644        .bind("operation_id", operation_id.to_string())
1645        .fetch_all(&*conn)
1646        .await?;
1647
1648        rows.into_iter().map(sql_row_to_proof_info).collect()
1649    }
1650
1651    #[instrument(skip(self))]
1652    async fn reserve_melt_quote(
1653        &self,
1654        quote_id: &str,
1655        operation_id: &uuid::Uuid,
1656    ) -> Result<(), database::Error> {
1657        let conn = self
1658            .pool
1659            .get()
1660            .await
1661            .map_err(|e| Error::Database(Box::new(e)))?;
1662
1663        let rows_affected = query(
1664            r#"
1665            UPDATE melt_quote
1666            SET used_by_operation = :operation_id
1667            WHERE id = :quote_id AND used_by_operation IS NULL
1668            "#,
1669        )?
1670        .bind("operation_id", operation_id.to_string())
1671        .bind("quote_id", quote_id)
1672        .execute(&*conn)
1673        .await?;
1674
1675        if rows_affected == 0 {
1676            // Check if the quote exists
1677            let exists = query(
1678                r#"
1679                SELECT 1 FROM melt_quote WHERE id = :quote_id
1680                "#,
1681            )?
1682            .bind("quote_id", quote_id)
1683            .fetch_one(&*conn)
1684            .await?;
1685
1686            if exists.is_none() {
1687                return Err(database::Error::UnknownQuote);
1688            }
1689            return Err(database::Error::QuoteAlreadyInUse);
1690        }
1691
1692        Ok(())
1693    }
1694
1695    #[instrument(skip(self))]
1696    async fn release_melt_quote(&self, operation_id: &uuid::Uuid) -> Result<(), database::Error> {
1697        let conn = self
1698            .pool
1699            .get()
1700            .await
1701            .map_err(|e| Error::Database(Box::new(e)))?;
1702
1703        query(
1704            r#"
1705            UPDATE melt_quote
1706            SET used_by_operation = NULL
1707            WHERE used_by_operation = :operation_id
1708            "#,
1709        )?
1710        .bind("operation_id", operation_id.to_string())
1711        .execute(&*conn)
1712        .await?;
1713
1714        Ok(())
1715    }
1716
1717    #[instrument(skip(self))]
1718    async fn reserve_mint_quote(
1719        &self,
1720        quote_id: &str,
1721        operation_id: &uuid::Uuid,
1722    ) -> Result<(), database::Error> {
1723        let conn = self
1724            .pool
1725            .get()
1726            .await
1727            .map_err(|e| Error::Database(Box::new(e)))?;
1728
1729        let rows_affected = query(
1730            r#"
1731            UPDATE mint_quote
1732            SET used_by_operation = :operation_id
1733            WHERE id = :quote_id AND used_by_operation IS NULL
1734            "#,
1735        )?
1736        .bind("operation_id", operation_id.to_string())
1737        .bind("quote_id", quote_id)
1738        .execute(&*conn)
1739        .await?;
1740
1741        if rows_affected == 0 {
1742            // Check if the quote exists
1743            let exists = query(
1744                r#"
1745                SELECT 1 FROM mint_quote WHERE id = :quote_id
1746                "#,
1747            )?
1748            .bind("quote_id", quote_id)
1749            .fetch_one(&*conn)
1750            .await?;
1751
1752            if exists.is_none() {
1753                return Err(database::Error::UnknownQuote);
1754            }
1755            return Err(database::Error::QuoteAlreadyInUse);
1756        }
1757
1758        Ok(())
1759    }
1760
1761    #[instrument(skip(self))]
1762    async fn release_mint_quote(&self, operation_id: &uuid::Uuid) -> Result<(), database::Error> {
1763        let conn = self
1764            .pool
1765            .get()
1766            .await
1767            .map_err(|e| Error::Database(Box::new(e)))?;
1768
1769        query(
1770            r#"
1771            UPDATE mint_quote
1772            SET used_by_operation = NULL
1773            WHERE used_by_operation = :operation_id
1774            "#,
1775        )?
1776        .bind("operation_id", operation_id.to_string())
1777        .execute(&*conn)
1778        .await?;
1779
1780        Ok(())
1781    }
1782
1783    async fn kv_read(
1784        &self,
1785        primary_namespace: &str,
1786        secondary_namespace: &str,
1787        key: &str,
1788    ) -> Result<Option<Vec<u8>>, database::Error> {
1789        crate::keyvalue::kv_read(&self.pool, primary_namespace, secondary_namespace, key).await
1790    }
1791
1792    async fn kv_list(
1793        &self,
1794        primary_namespace: &str,
1795        secondary_namespace: &str,
1796    ) -> Result<Vec<String>, database::Error> {
1797        crate::keyvalue::kv_list(&self.pool, primary_namespace, secondary_namespace).await
1798    }
1799
1800    async fn kv_write(
1801        &self,
1802        primary_namespace: &str,
1803        secondary_namespace: &str,
1804        key: &str,
1805        value: &[u8],
1806    ) -> Result<(), database::Error> {
1807        let conn = self
1808            .pool
1809            .get()
1810            .await
1811            .map_err(|e| Error::Database(Box::new(e)))?;
1812        crate::keyvalue::kv_write_standalone(
1813            &*conn,
1814            primary_namespace,
1815            secondary_namespace,
1816            key,
1817            value,
1818        )
1819        .await?;
1820        Ok(())
1821    }
1822
1823    async fn kv_remove(
1824        &self,
1825        primary_namespace: &str,
1826        secondary_namespace: &str,
1827        key: &str,
1828    ) -> Result<(), database::Error> {
1829        let conn = self
1830            .pool
1831            .get()
1832            .await
1833            .map_err(|e| Error::Database(Box::new(e)))?;
1834        crate::keyvalue::kv_remove_standalone(&*conn, primary_namespace, secondary_namespace, key)
1835            .await?;
1836        Ok(())
1837    }
1838
1839    // P2PK methods
1840
1841    #[instrument(skip(self))]
1842    async fn add_p2pk_key(
1843        &self,
1844        pubkey: &PublicKey,
1845        derivation_path: DerivationPath,
1846        derivation_index: u32,
1847    ) -> Result<(), Error> {
1848        let conn = self
1849            .pool
1850            .get()
1851            .await
1852            .map_err(|e| Error::Database(Box::new(e)))?;
1853        let query_str = r#"
1854        INSERT INTO p2pk_signing_key (pubkey, derivation_index, derivation_path, created_time)
1855        VALUES (:pubkey, :derivation_index, :derivation_path, :created_time)
1856        "#
1857        .to_string();
1858
1859        query(&query_str)?
1860            .bind("pubkey", pubkey.to_bytes().to_vec())
1861            .bind("derivation_index", derivation_index)
1862            .bind("derivation_path", derivation_path.to_string())
1863            .bind("created_time", unix_time() as i64)
1864            .execute(&*conn)
1865            .await?;
1866
1867        Ok(())
1868    }
1869
1870    #[instrument(skip(self))]
1871    async fn get_p2pk_key(
1872        &self,
1873        pubkey: &PublicKey,
1874    ) -> Result<Option<wallet::P2PKSigningKey>, Error> {
1875        let conn = self
1876            .pool
1877            .get()
1878            .await
1879            .map_err(|e| Error::Database(Box::new(e)))?;
1880        let query_str = r#"SELECT pubkey, derivation_index, derivation_path, created_time FROM p2pk_signing_key WHERE pubkey = :pubkey"#.to_string();
1881
1882        query(&query_str)?
1883            .bind("pubkey", pubkey.to_bytes().to_vec())
1884            .fetch_one(&*conn)
1885            .await?
1886            .map(sql_row_to_p2pk_signing_key)
1887            .transpose()
1888    }
1889
1890    #[instrument(skip(self))]
1891    async fn list_p2pk_keys(&self) -> Result<Vec<wallet::P2PKSigningKey>, Error> {
1892        let conn = self
1893            .pool
1894            .get()
1895            .await
1896            .map_err(|e| Error::Database(Box::new(e)))?;
1897        let query_str = r#"
1898        SELECT pubkey, derivation_index, derivation_path, created_time FROM p2pk_signing_key ORDER BY derivation_index DESC
1899        "#.to_string();
1900
1901        Ok(query(&query_str)?
1902            .fetch_all(&*conn)
1903            .await?
1904            .into_iter()
1905            .filter_map(|row| {
1906                let row = sql_row_to_p2pk_signing_key(row).ok()?;
1907
1908                Some(row)
1909            })
1910            .collect::<Vec<wallet::P2PKSigningKey>>())
1911    }
1912
1913    #[instrument(skip(self))]
1914    async fn latest_p2pk(&self) -> Result<Option<wallet::P2PKSigningKey>, Error> {
1915        let conn = self
1916            .pool
1917            .get()
1918            .await
1919            .map_err(|e| Error::Database(Box::new(e)))?;
1920        let query_str = r#"
1921        SELECT pubkey, derivation_index, derivation_path, created_time FROM p2pk_signing_key ORDER BY derivation_index DESC LIMIT 1
1922        "#.to_string();
1923
1924        query(&query_str)?
1925            .fetch_one(&*conn)
1926            .await?
1927            .map(sql_row_to_p2pk_signing_key)
1928            .transpose()
1929    }
1930}
1931
1932fn sql_row_to_mint_info(row: Vec<Column>) -> Result<MintInfo, Error> {
1933    unpack_into!(
1934        let (
1935            name,
1936            pubkey,
1937            version,
1938            description,
1939            description_long,
1940            contact,
1941            nuts,
1942            icon_url,
1943            motd,
1944            urls,
1945            mint_time,
1946            tos_url
1947        ) = row
1948    );
1949
1950    Ok(MintInfo {
1951        name: column_as_nullable_string!(&name),
1952        pubkey: column_as_nullable_string!(&pubkey, |v| serde_json::from_str(v).ok(), |v| {
1953            serde_json::from_slice(v).ok()
1954        }),
1955        version: column_as_nullable_string!(&version).and_then(|v| serde_json::from_str(&v).ok()),
1956        description: column_as_nullable_string!(description),
1957        description_long: column_as_nullable_string!(description_long),
1958        contact: column_as_nullable_string!(contact, |v| serde_json::from_str(&v).ok()),
1959        nuts: column_as_nullable_string!(nuts, |v| serde_json::from_str(&v).ok())
1960            .unwrap_or_default(),
1961        urls: column_as_nullable_string!(urls, |v| serde_json::from_str(&v).ok()),
1962        icon_url: column_as_nullable_string!(icon_url),
1963        motd: column_as_nullable_string!(motd),
1964        time: column_as_nullable_number!(mint_time).map(|t| t),
1965        tos_url: column_as_nullable_string!(tos_url),
1966    })
1967}
1968
1969#[instrument(skip_all)]
1970fn sql_row_to_keyset(row: Vec<Column>) -> Result<KeySetInfo, Error> {
1971    unpack_into!(
1972        let (
1973            id,
1974            unit,
1975            active,
1976            input_fee_ppk,
1977            final_expiry
1978        ) = row
1979    );
1980
1981    Ok(KeySetInfo {
1982        id: column_as_string!(id, Id::from_str, Id::from_bytes),
1983        unit: column_as_string!(unit, CurrencyUnit::from_str),
1984        active: matches!(active, Column::Integer(1)),
1985        input_fee_ppk: column_as_nullable_number!(input_fee_ppk).unwrap_or(0),
1986        final_expiry: column_as_nullable_number!(final_expiry),
1987    })
1988}
1989
1990fn sql_row_to_mint_quote(row: Vec<Column>) -> Result<MintQuote, Error> {
1991    unpack_into!(
1992        let (
1993            id,
1994            mint_url,
1995            amount,
1996            unit,
1997            request,
1998            state,
1999            expiry,
2000            secret_key,
2001            row_method,
2002            row_amount_minted,
2003            row_amount_paid,
2004            estimated_blocks,
2005            used_by_operation,
2006            version
2007        ) = row
2008    );
2009
2010    let amount: Option<i64> = column_as_nullable_number!(amount);
2011
2012    let amount_paid: u64 = column_as_number!(row_amount_paid);
2013    let amount_minted: u64 = column_as_number!(row_amount_minted);
2014    let expiry_val: u64 = column_as_number!(expiry);
2015    let version_val: u32 = column_as_number!(version);
2016    let payment_method =
2017        PaymentMethod::from_str(&column_as_string!(row_method)).map_err(Error::from)?;
2018
2019    Ok(MintQuote {
2020        id: column_as_string!(id),
2021        mint_url: column_as_string!(mint_url, MintUrl::from_str),
2022        amount: amount.and_then(Amount::from_i64),
2023        unit: column_as_string!(unit, CurrencyUnit::from_str),
2024        request: column_as_string!(request),
2025        state: column_as_string!(state, MintQuoteState::from_str),
2026        expiry: expiry_val,
2027        secret_key: column_as_nullable_string!(secret_key, |s| SecretKey::from_str(&s).ok()),
2028        payment_method,
2029        amount_issued: Amount::from(amount_minted),
2030        amount_paid: Amount::from(amount_paid),
2031        estimated_blocks: column_as_nullable_number!(estimated_blocks),
2032        used_by_operation: column_as_nullable_string!(used_by_operation),
2033        version: version_val,
2034    })
2035}
2036
2037fn sql_row_to_melt_quote(row: Vec<Column>) -> Result<wallet::MeltQuote, Error> {
2038    unpack_into!(
2039        let (
2040            id,
2041            unit,
2042            amount,
2043            request,
2044            fee_reserve,
2045            state,
2046            expiry,
2047            payment_proof,
2048            row_method,
2049            estimated_blocks,
2050            fee_index,
2051            used_by_operation,
2052            version,
2053            mint_url
2054        ) = row
2055    );
2056
2057    let payment_method =
2058        PaymentMethod::from_str(&column_as_string!(row_method)).map_err(Error::from)?;
2059
2060    let amount_val: u64 = column_as_number!(amount);
2061    let fee_reserve_val: u64 = column_as_number!(fee_reserve);
2062    let expiry_val: u64 = column_as_number!(expiry);
2063    let version_val: u32 = column_as_number!(version);
2064
2065    Ok(wallet::MeltQuote {
2066        id: column_as_string!(id),
2067        mint_url: column_as_nullable_string!(mint_url, |s| MintUrl::from_str(&s).ok()),
2068        unit: column_as_string!(unit, CurrencyUnit::from_str),
2069        amount: Amount::from(amount_val),
2070        request: column_as_string!(request),
2071        fee_reserve: Amount::from(fee_reserve_val),
2072        state: column_as_string!(state, MeltQuoteState::from_str),
2073        expiry: expiry_val,
2074        payment_proof: column_as_nullable_string!(payment_proof),
2075        estimated_blocks: column_as_nullable_number!(estimated_blocks),
2076        fee_index: column_as_nullable_number!(fee_index),
2077        payment_method,
2078        used_by_operation: column_as_nullable_string!(used_by_operation),
2079        version: version_val,
2080    })
2081}
2082
2083fn sql_row_to_proof_info(row: Vec<Column>) -> Result<ProofInfo, Error> {
2084    unpack_into!(
2085        let (
2086            amount,
2087            unit,
2088            keyset_id,
2089            secret,
2090            c,
2091            witness,
2092            dleq_e,
2093            dleq_s,
2094            dleq_r,
2095            y,
2096            mint_url,
2097            state,
2098            spending_condition,
2099            used_by_operation,
2100            created_by_operation,
2101            p2pk_e
2102        ) = row
2103    );
2104
2105    let dleq = match (
2106        column_as_nullable_binary!(dleq_e),
2107        column_as_nullable_binary!(dleq_s),
2108        column_as_nullable_binary!(dleq_r),
2109    ) {
2110        (Some(e), Some(s), Some(r)) => {
2111            let e_key = SecretKey::from_slice(&e)?;
2112            let s_key = SecretKey::from_slice(&s)?;
2113            let r_key = SecretKey::from_slice(&r)?;
2114
2115            Some(ProofDleq::new(e_key, s_key, r_key))
2116        }
2117        _ => None,
2118    };
2119
2120    let amount: u64 = column_as_number!(amount);
2121    let proof = Proof {
2122        amount: Amount::from(amount),
2123        keyset_id: column_as_string!(keyset_id, Id::from_str),
2124        secret: column_as_string!(secret, Secret::from_str),
2125        witness: column_as_nullable_string!(witness, |v| { serde_json::from_str(&v).ok() }, |v| {
2126            serde_json::from_slice(&v).ok()
2127        }),
2128        c: column_as_string!(c, PublicKey::from_str, PublicKey::from_slice),
2129        dleq,
2130        p2pk_e: column_as_nullable_binary!(p2pk_e)
2131            .map(|bytes| PublicKey::from_slice(&bytes))
2132            .transpose()?,
2133    };
2134
2135    let used_by_operation =
2136        column_as_nullable_string!(used_by_operation).and_then(|id| Uuid::from_str(&id).ok());
2137    let created_by_operation =
2138        column_as_nullable_string!(created_by_operation).and_then(|id| Uuid::from_str(&id).ok());
2139
2140    Ok(ProofInfo {
2141        proof,
2142        y: column_as_string!(y, PublicKey::from_str, PublicKey::from_slice),
2143        mint_url: column_as_string!(mint_url, MintUrl::from_str),
2144        state: column_as_string!(state, State::from_str),
2145        spending_condition: column_as_nullable_string!(
2146            spending_condition,
2147            |r| { serde_json::from_str(&r).ok() },
2148            |r| { serde_json::from_slice(&r).ok() }
2149        ),
2150        unit: column_as_string!(unit, CurrencyUnit::from_str),
2151        used_by_operation,
2152        created_by_operation,
2153    })
2154}
2155
2156fn sql_row_to_wallet_saga(row: Vec<Column>) -> Result<wallet::WalletSaga, Error> {
2157    unpack_into!(
2158        let (
2159            id,
2160            kind,
2161            state,
2162            amount,
2163            mint_url,
2164            unit,
2165            quote_id,
2166            created_at,
2167            updated_at,
2168            data,
2169            version
2170        ) = row
2171    );
2172
2173    let id_str: String = column_as_string!(id);
2174    let id = uuid::Uuid::parse_str(&id_str).map_err(|e| {
2175        Error::Database(Box::new(std::io::Error::new(
2176            std::io::ErrorKind::InvalidData,
2177            format!("Invalid UUID: {}", e),
2178        )))
2179    })?;
2180    let kind_str: String = column_as_string!(kind);
2181    let state_json: String = column_as_string!(state);
2182    let amount: u64 = column_as_number!(amount);
2183    let mint_url: MintUrl = column_as_string!(mint_url, MintUrl::from_str);
2184    let unit: CurrencyUnit = column_as_string!(unit, CurrencyUnit::from_str);
2185    let quote_id: Option<String> = column_as_nullable_string!(quote_id);
2186    let created_at: u64 = column_as_number!(created_at);
2187    let updated_at: u64 = column_as_number!(updated_at);
2188    let data_json: String = column_as_string!(data);
2189    let version: u32 = column_as_number!(version);
2190
2191    let kind = wallet::OperationKind::from_str(&kind_str).map_err(|_| {
2192        Error::Database(Box::new(std::io::Error::new(
2193            std::io::ErrorKind::InvalidData,
2194            format!("Invalid operation kind: {}", kind_str),
2195        )))
2196    })?;
2197    let state: wallet::WalletSagaState = serde_json::from_str(&state_json).map_err(|e| {
2198        Error::Database(Box::new(std::io::Error::new(
2199            std::io::ErrorKind::InvalidData,
2200            format!("Failed to deserialize saga state: {}", e),
2201        )))
2202    })?;
2203    let data: wallet::OperationData = serde_json::from_str(&data_json).map_err(|e| {
2204        Error::Database(Box::new(std::io::Error::new(
2205            std::io::ErrorKind::InvalidData,
2206            format!("Failed to deserialize saga data: {}", e),
2207        )))
2208    })?;
2209
2210    Ok(wallet::WalletSaga {
2211        id,
2212        kind,
2213        state,
2214        amount: Amount::from(amount),
2215        mint_url,
2216        unit,
2217        quote_id,
2218        created_at,
2219        updated_at,
2220        data,
2221        version,
2222    })
2223}
2224
2225fn sql_row_to_transaction(row: Vec<Column>) -> Result<Transaction, Error> {
2226    unpack_into!(
2227        let (
2228            mint_url,
2229            direction,
2230            unit,
2231            amount,
2232            fee,
2233            ys,
2234            timestamp,
2235            memo,
2236            metadata,
2237            quote_id,
2238            payment_request,
2239            payment_proof,
2240            payment_method,
2241            saga_id
2242        ) = row
2243    );
2244
2245    let amount: u64 = column_as_number!(amount);
2246    let fee: u64 = column_as_number!(fee);
2247
2248    let saga_id: Option<Uuid> = column_as_nullable_string!(saga_id)
2249        .map(|id| Uuid::from_str(&id).ok())
2250        .flatten();
2251
2252    Ok(Transaction {
2253        mint_url: column_as_string!(mint_url, MintUrl::from_str),
2254        direction: column_as_string!(direction, TransactionDirection::from_str),
2255        unit: column_as_string!(unit, CurrencyUnit::from_str),
2256        amount: Amount::from(amount),
2257        fee: Amount::from(fee),
2258        ys: column_as_binary!(ys)
2259            .chunks(33)
2260            .map(PublicKey::from_slice)
2261            .collect::<Result<Vec<_>, _>>()?,
2262        timestamp: column_as_number!(timestamp),
2263        memo: column_as_nullable_string!(memo),
2264        metadata: column_as_nullable_string!(metadata, |v| serde_json::from_str(&v).ok(), |v| {
2265            serde_json::from_slice(&v).ok()
2266        })
2267        .unwrap_or_default(),
2268        quote_id: column_as_nullable_string!(quote_id),
2269        payment_request: column_as_nullable_string!(payment_request),
2270        payment_proof: column_as_nullable_string!(payment_proof),
2271        payment_method: column_as_nullable_string!(payment_method)
2272            .map(|v| PaymentMethod::from_str(&v))
2273            .transpose()
2274            .map_err(Error::from)?,
2275        saga_id,
2276    })
2277}
2278
2279fn sql_row_to_p2pk_signing_key(row: Vec<Column>) -> Result<wallet::P2PKSigningKey, Error> {
2280    unpack_into!(
2281        let (
2282            pubkey,
2283            derivation_index,
2284            derivation_path,
2285            created_time
2286        ) = row
2287    );
2288
2289    Ok(wallet::P2PKSigningKey {
2290        pubkey: column_as_string!(pubkey, PublicKey::from_str, PublicKey::from_slice),
2291        derivation_index: column_as_number!(derivation_index),
2292        derivation_path: column_as_string!(derivation_path, DerivationPath::from_str),
2293        created_time: column_as_number!(created_time),
2294    })
2295}