cdk_sql_common/wallet/
mod.rs

1//! SQLite Wallet Database
2
3use std::collections::HashMap;
4use std::fmt::Debug;
5use std::str::FromStr;
6use std::sync::Arc;
7
8use async_trait::async_trait;
9use cdk_common::common::ProofInfo;
10use cdk_common::database::{ConversionError, Error, WalletDatabase};
11use cdk_common::mint_url::MintUrl;
12use cdk_common::nuts::{MeltQuoteState, MintQuoteState};
13use cdk_common::secret::Secret;
14use cdk_common::wallet::{self, MintQuote, Transaction, TransactionDirection, TransactionId};
15use cdk_common::{
16    database, Amount, CurrencyUnit, Id, KeySet, KeySetInfo, Keys, MintInfo, PaymentMethod, Proof,
17    ProofDleq, PublicKey, SecretKey, SpendingConditions, State,
18};
19use tracing::instrument;
20
21use crate::common::migrate;
22use crate::database::{ConnectionWithTransaction, DatabaseExecutor};
23use crate::pool::{DatabasePool, Pool, PooledResource};
24use crate::stmt::{query, Column};
25use crate::{
26    column_as_binary, column_as_nullable_binary, column_as_nullable_number,
27    column_as_nullable_string, column_as_number, column_as_string, unpack_into,
28};
29
// Wallet migration list, textually included from the `migrations_wallet.rs` file found in
// Cargo's `OUT_DIR`. Formatting is skipped for this module (presumably because the included
// file is generated by the build script — confirm against build.rs).
#[rustfmt::skip]
mod migrations {
    include!(concat!(env!("OUT_DIR"), "/migrations_wallet.rs"));
}
34
/// Wallet database backed by a SQL connection pool.
///
/// Generic over `RM`, the [`DatabasePool`] implementation that supplies the connections.
/// Clones share the same underlying pool through the [`Arc`].
#[derive(Debug, Clone)]
pub struct SQLWalletDatabase<RM>
where
    RM: DatabasePool + 'static,
{
    /// Shared pool every database operation takes its connection from.
    pool: Arc<Pool<RM>>,
}
43
44impl<RM> SQLWalletDatabase<RM>
45where
46    RM: DatabasePool + 'static,
47{
48    /// Creates a new instance
49    pub async fn new<X>(db: X) -> Result<Self, Error>
50    where
51        X: Into<RM::Config>,
52    {
53        let pool = Pool::new(db.into());
54        Self::migrate(pool.get().map_err(|e| Error::Database(Box::new(e)))?).await?;
55
56        Ok(Self { pool })
57    }
58
59    /// Migrate [`WalletSqliteDatabase`]
60    async fn migrate(conn: PooledResource<RM>) -> Result<(), Error> {
61        let tx = ConnectionWithTransaction::new(conn).await?;
62        migrate(&tx, RM::Connection::name(), migrations::MIGRATIONS).await?;
63        // Update any existing keys with missing keyset_u32 values
64        Self::add_keyset_u32(&tx).await?;
65        tx.commit().await?;
66
67        Ok(())
68    }
69
70    async fn add_keyset_u32<T>(conn: &T) -> Result<(), Error>
71    where
72        T: DatabaseExecutor,
73    {
74        // First get the keysets where keyset_u32 on key is null
75        let keys_without_u32: Vec<Vec<Column>> = query(
76            r#"
77            SELECT
78                id
79            FROM key
80            WHERE keyset_u32 IS NULL
81            "#,
82        )?
83        .fetch_all(conn)
84        .await?;
85
86        for id in keys_without_u32 {
87            let id = column_as_string!(id.first().unwrap());
88
89            if let Ok(id) = Id::from_str(&id) {
90                query(
91                    r#"
92            UPDATE
93                key
94            SET keyset_u32 = :u32_keyset
95            WHERE id = :keyset_id
96            "#,
97                )?
98                .bind("u32_keyset", u32::from(id))
99                .bind("keyset_id", id.to_string())
100                .execute(conn)
101                .await?;
102            }
103        }
104
105        // Also update keysets where keyset_u32 is null
106        let keysets_without_u32: Vec<Vec<Column>> = query(
107            r#"
108            SELECT
109                id
110            FROM keyset
111            WHERE keyset_u32 IS NULL
112            "#,
113        )?
114        .fetch_all(conn)
115        .await?;
116
117        for id in keysets_without_u32 {
118            let id = column_as_string!(id.first().unwrap());
119
120            if let Ok(id) = Id::from_str(&id) {
121                query(
122                    r#"
123            UPDATE
124                keyset
125            SET keyset_u32 = :u32_keyset
126            WHERE id = :keyset_id
127            "#,
128                )?
129                .bind("u32_keyset", u32::from(id))
130                .bind("keyset_id", id.to_string())
131                .execute(conn)
132                .await?;
133            }
134        }
135
136        Ok(())
137    }
138}
139
140#[async_trait]
141impl<RM> WalletDatabase for SQLWalletDatabase<RM>
142where
143    RM: DatabasePool + 'static,
144{
145    type Err = database::Error;
146
147    #[instrument(skip(self))]
148    async fn get_melt_quotes(&self) -> Result<Vec<wallet::MeltQuote>, Self::Err> {
149        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
150
151        Ok(query(
152            r#"
153              SELECT
154                  id,
155                  unit,
156                  amount,
157                  request,
158                  fee_reserve,
159                  state,
160                  expiry,
161                  payment_preimage,
162                  payment_method
163              FROM
164                  melt_quote
165              "#,
166        )?
167        .fetch_all(&*conn)
168        .await?
169        .into_iter()
170        .map(sql_row_to_melt_quote)
171        .collect::<Result<_, _>>()?)
172    }
173
174    #[instrument(skip(self, mint_info))]
175    async fn add_mint(
176        &self,
177        mint_url: MintUrl,
178        mint_info: Option<MintInfo>,
179    ) -> Result<(), Self::Err> {
180        let (
181            name,
182            pubkey,
183            version,
184            description,
185            description_long,
186            contact,
187            nuts,
188            icon_url,
189            urls,
190            motd,
191            time,
192            tos_url,
193        ) = match mint_info {
194            Some(mint_info) => {
195                let MintInfo {
196                    name,
197                    pubkey,
198                    version,
199                    description,
200                    description_long,
201                    contact,
202                    nuts,
203                    icon_url,
204                    urls,
205                    motd,
206                    time,
207                    tos_url,
208                } = mint_info;
209
210                (
211                    name,
212                    pubkey.map(|p| p.to_bytes().to_vec()),
213                    version.map(|v| serde_json::to_string(&v).ok()),
214                    description,
215                    description_long,
216                    contact.map(|c| serde_json::to_string(&c).ok()),
217                    serde_json::to_string(&nuts).ok(),
218                    icon_url,
219                    urls.map(|c| serde_json::to_string(&c).ok()),
220                    motd,
221                    time,
222                    tos_url,
223                )
224            }
225            None => (
226                None, None, None, None, None, None, None, None, None, None, None, None,
227            ),
228        };
229
230        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
231
232        query(
233            r#"
234INSERT INTO mint
235(
236    mint_url, name, pubkey, version, description, description_long,
237    contact, nuts, icon_url, urls, motd, mint_time, tos_url
238)
239VALUES
240(
241    :mint_url, :name, :pubkey, :version, :description, :description_long,
242    :contact, :nuts, :icon_url, :urls, :motd, :mint_time, :tos_url
243)
244ON CONFLICT(mint_url) DO UPDATE SET
245    name = excluded.name,
246    pubkey = excluded.pubkey,
247    version = excluded.version,
248    description = excluded.description,
249    description_long = excluded.description_long,
250    contact = excluded.contact,
251    nuts = excluded.nuts,
252    icon_url = excluded.icon_url,
253    urls = excluded.urls,
254    motd = excluded.motd,
255    mint_time = excluded.mint_time,
256    tos_url = excluded.tos_url
257;
258        "#,
259        )?
260        .bind("mint_url", mint_url.to_string())
261        .bind("name", name)
262        .bind("pubkey", pubkey)
263        .bind("version", version)
264        .bind("description", description)
265        .bind("description_long", description_long)
266        .bind("contact", contact)
267        .bind("nuts", nuts)
268        .bind("icon_url", icon_url)
269        .bind("urls", urls)
270        .bind("motd", motd)
271        .bind("mint_time", time.map(|v| v as i64))
272        .bind("tos_url", tos_url)
273        .execute(&*conn)
274        .await?;
275
276        Ok(())
277    }
278
279    #[instrument(skip(self))]
280    async fn remove_mint(&self, mint_url: MintUrl) -> Result<(), Self::Err> {
281        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
282
283        query(r#"DELETE FROM mint WHERE mint_url=:mint_url"#)?
284            .bind("mint_url", mint_url.to_string())
285            .execute(&*conn)
286            .await?;
287
288        Ok(())
289    }
290
291    #[instrument(skip(self))]
292    async fn get_mint(&self, mint_url: MintUrl) -> Result<Option<MintInfo>, Self::Err> {
293        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
294        Ok(query(
295            r#"
296            SELECT
297                name,
298                pubkey,
299                version,
300                description,
301                description_long,
302                contact,
303                nuts,
304                icon_url,
305                motd,
306                urls,
307                mint_time,
308                tos_url
309            FROM
310                mint
311            WHERE mint_url = :mint_url
312            "#,
313        )?
314        .bind("mint_url", mint_url.to_string())
315        .fetch_one(&*conn)
316        .await?
317        .map(sql_row_to_mint_info)
318        .transpose()?)
319    }
320
321    #[instrument(skip(self))]
322    async fn get_mints(&self) -> Result<HashMap<MintUrl, Option<MintInfo>>, Self::Err> {
323        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
324        Ok(query(
325            r#"
326                SELECT
327                    name,
328                    pubkey,
329                    version,
330                    description,
331                    description_long,
332                    contact,
333                    nuts,
334                    icon_url,
335                    motd,
336                    urls,
337                    mint_time,
338                    tos_url,
339                    mint_url
340                FROM
341                    mint
342                "#,
343        )?
344        .fetch_all(&*conn)
345        .await?
346        .into_iter()
347        .map(|mut row| {
348            let url = column_as_string!(
349                row.pop().ok_or(ConversionError::MissingColumn(0, 1))?,
350                MintUrl::from_str
351            );
352
353            Ok((url, sql_row_to_mint_info(row).ok()))
354        })
355        .collect::<Result<HashMap<_, _>, Error>>()?)
356    }
357
358    #[instrument(skip(self))]
359    async fn update_mint_url(
360        &self,
361        old_mint_url: MintUrl,
362        new_mint_url: MintUrl,
363    ) -> Result<(), Self::Err> {
364        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
365        let tables = ["mint_quote", "proof"];
366
367        for table in &tables {
368            query(&format!(
369                r#"
370                UPDATE {table}
371                SET mint_url = :new_mint_url
372                WHERE mint_url = :old_mint_url
373            "#
374            ))?
375            .bind("new_mint_url", new_mint_url.to_string())
376            .bind("old_mint_url", old_mint_url.to_string())
377            .execute(&*conn)
378            .await?;
379        }
380
381        Ok(())
382    }
383
384    #[instrument(skip(self, keysets))]
385    async fn add_mint_keysets(
386        &self,
387        mint_url: MintUrl,
388        keysets: Vec<KeySetInfo>,
389    ) -> Result<(), Self::Err> {
390        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
391
392        for keyset in keysets {
393            query(
394                r#"
395    INSERT INTO keyset
396    (mint_url, id, unit, active, input_fee_ppk, final_expiry, keyset_u32)
397    VALUES
398    (:mint_url, :id, :unit, :active, :input_fee_ppk, :final_expiry, :keyset_u32)
399    ON CONFLICT(id) DO UPDATE SET
400        active = excluded.active,
401        input_fee_ppk = excluded.input_fee_ppk
402    "#,
403            )?
404            .bind("mint_url", mint_url.to_string())
405            .bind("id", keyset.id.to_string())
406            .bind("unit", keyset.unit.to_string())
407            .bind("active", keyset.active)
408            .bind("input_fee_ppk", keyset.input_fee_ppk as i64)
409            .bind("final_expiry", keyset.final_expiry.map(|v| v as i64))
410            .bind("keyset_u32", u32::from(keyset.id))
411            .execute(&*conn)
412            .await?;
413        }
414
415        Ok(())
416    }
417
418    #[instrument(skip(self))]
419    async fn get_mint_keysets(
420        &self,
421        mint_url: MintUrl,
422    ) -> Result<Option<Vec<KeySetInfo>>, Self::Err> {
423        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
424
425        let keysets = query(
426            r#"
427            SELECT
428                id,
429                unit,
430                active,
431                input_fee_ppk,
432                final_expiry
433            FROM
434                keyset
435            WHERE mint_url = :mint_url
436            "#,
437        )?
438        .bind("mint_url", mint_url.to_string())
439        .fetch_all(&*conn)
440        .await?
441        .into_iter()
442        .map(sql_row_to_keyset)
443        .collect::<Result<Vec<_>, Error>>()?;
444
445        match keysets.is_empty() {
446            false => Ok(Some(keysets)),
447            true => Ok(None),
448        }
449    }
450
451    #[instrument(skip(self), fields(keyset_id = %keyset_id))]
452    async fn get_keyset_by_id(&self, keyset_id: &Id) -> Result<Option<KeySetInfo>, Self::Err> {
453        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
454        Ok(query(
455            r#"
456            SELECT
457                id,
458                unit,
459                active,
460                input_fee_ppk,
461                final_expiry
462            FROM
463                keyset
464            WHERE id = :id
465            "#,
466        )?
467        .bind("id", keyset_id.to_string())
468        .fetch_one(&*conn)
469        .await?
470        .map(sql_row_to_keyset)
471        .transpose()?)
472    }
473
474    #[instrument(skip_all)]
475    async fn add_mint_quote(&self, quote: MintQuote) -> Result<(), Self::Err> {
476        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
477        query(
478            r#"
479INSERT INTO mint_quote
480(id, mint_url, amount, unit, request, state, expiry, secret_key, payment_method, amount_issued, amount_paid)
481VALUES
482(:id, :mint_url, :amount, :unit, :request, :state, :expiry, :secret_key, :payment_method, :amount_issued, :amount_paid)
483ON CONFLICT(id) DO UPDATE SET
484    mint_url = excluded.mint_url,
485    amount = excluded.amount,
486    unit = excluded.unit,
487    request = excluded.request,
488    state = excluded.state,
489    expiry = excluded.expiry,
490    secret_key = excluded.secret_key,
491    payment_method = excluded.payment_method,
492    amount_issued = excluded.amount_issued,
493    amount_paid = excluded.amount_paid
494;
495        "#,
496        )?
497        .bind("id", quote.id.to_string())
498        .bind("mint_url", quote.mint_url.to_string())
499        .bind("amount", quote.amount.map(|a| a.to_i64()))
500        .bind("unit", quote.unit.to_string())
501        .bind("request", quote.request)
502        .bind("state", quote.state.to_string())
503        .bind("expiry", quote.expiry as i64)
504        .bind("secret_key", quote.secret_key.map(|p| p.to_string()))
505        .bind("payment_method", quote.payment_method.to_string())
506        .bind("amount_issued", quote.amount_issued.to_i64())
507        .bind("amount_paid", quote.amount_paid.to_i64())
508        .execute(&*conn).await?;
509
510        Ok(())
511    }
512
513    #[instrument(skip(self))]
514    async fn get_mint_quote(&self, quote_id: &str) -> Result<Option<MintQuote>, Self::Err> {
515        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
516        Ok(query(
517            r#"
518            SELECT
519                id,
520                mint_url,
521                amount,
522                unit,
523                request,
524                state,
525                expiry,
526                secret_key,
527                payment_method,
528                amount_issued,
529                amount_paid
530            FROM
531                mint_quote
532            WHERE
533                id = :id
534            "#,
535        )?
536        .bind("id", quote_id.to_string())
537        .fetch_one(&*conn)
538        .await?
539        .map(sql_row_to_mint_quote)
540        .transpose()?)
541    }
542
543    #[instrument(skip(self))]
544    async fn get_mint_quotes(&self) -> Result<Vec<MintQuote>, Self::Err> {
545        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
546        Ok(query(
547            r#"
548            SELECT
549                id,
550                mint_url,
551                amount,
552                unit,
553                request,
554                state,
555                expiry,
556                secret_key,
557                payment_method,
558                amount_issued,
559                amount_paid
560            FROM
561                mint_quote
562            "#,
563        )?
564        .fetch_all(&*conn)
565        .await?
566        .into_iter()
567        .map(sql_row_to_mint_quote)
568        .collect::<Result<_, _>>()?)
569    }
570
571    #[instrument(skip(self))]
572    async fn remove_mint_quote(&self, quote_id: &str) -> Result<(), Self::Err> {
573        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
574        query(r#"DELETE FROM mint_quote WHERE id=:id"#)?
575            .bind("id", quote_id.to_string())
576            .execute(&*conn)
577            .await?;
578
579        Ok(())
580    }
581
582    #[instrument(skip_all)]
583    async fn add_melt_quote(&self, quote: wallet::MeltQuote) -> Result<(), Self::Err> {
584        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
585        query(
586            r#"
587INSERT INTO melt_quote
588(id, unit, amount, request, fee_reserve, state, expiry, payment_method)
589VALUES
590(:id, :unit, :amount, :request, :fee_reserve, :state, :expiry, :payment_method)
591ON CONFLICT(id) DO UPDATE SET
592    unit = excluded.unit,
593    amount = excluded.amount,
594    request = excluded.request,
595    fee_reserve = excluded.fee_reserve,
596    state = excluded.state,
597    expiry = excluded.expiry,
598    payment_method = excluded.payment_method
599;
600        "#,
601        )?
602        .bind("id", quote.id.to_string())
603        .bind("unit", quote.unit.to_string())
604        .bind("amount", u64::from(quote.amount) as i64)
605        .bind("request", quote.request)
606        .bind("fee_reserve", u64::from(quote.fee_reserve) as i64)
607        .bind("state", quote.state.to_string())
608        .bind("expiry", quote.expiry as i64)
609        .bind("payment_method", quote.payment_method.to_string())
610        .execute(&*conn)
611        .await?;
612
613        Ok(())
614    }
615
616    #[instrument(skip(self))]
617    async fn get_melt_quote(&self, quote_id: &str) -> Result<Option<wallet::MeltQuote>, Self::Err> {
618        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
619        Ok(query(
620            r#"
621            SELECT
622                id,
623                unit,
624                amount,
625                request,
626                fee_reserve,
627                state,
628                expiry,
629                payment_preimage,
630                payment_method
631            FROM
632                melt_quote
633            WHERE
634                id=:id
635            "#,
636        )?
637        .bind("id", quote_id.to_owned())
638        .fetch_one(&*conn)
639        .await?
640        .map(sql_row_to_melt_quote)
641        .transpose()?)
642    }
643
644    #[instrument(skip(self))]
645    async fn remove_melt_quote(&self, quote_id: &str) -> Result<(), Self::Err> {
646        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
647        query(r#"DELETE FROM melt_quote WHERE id=:id"#)?
648            .bind("id", quote_id.to_owned())
649            .execute(&*conn)
650            .await?;
651
652        Ok(())
653    }
654
655    #[instrument(skip_all)]
656    async fn add_keys(&self, keyset: KeySet) -> Result<(), Self::Err> {
657        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
658
659        // Recompute ID for verification
660        keyset.verify_id()?;
661
662        query(
663            r#"
664            INSERT INTO key
665            (id, keys, keyset_u32)
666            VALUES
667            (:id, :keys, :keyset_u32)
668        "#,
669        )?
670        .bind("id", keyset.id.to_string())
671        .bind(
672            "keys",
673            serde_json::to_string(&keyset.keys).map_err(Error::from)?,
674        )
675        .bind("keyset_u32", u32::from(keyset.id))
676        .execute(&*conn)
677        .await?;
678
679        Ok(())
680    }
681
682    #[instrument(skip(self), fields(keyset_id = %keyset_id))]
683    async fn get_keys(&self, keyset_id: &Id) -> Result<Option<Keys>, Self::Err> {
684        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
685        Ok(query(
686            r#"
687            SELECT
688                keys
689            FROM key
690            WHERE id = :id
691            "#,
692        )?
693        .bind("id", keyset_id.to_string())
694        .pluck(&*conn)
695        .await?
696        .map(|keys| {
697            let keys = column_as_string!(keys);
698            serde_json::from_str(&keys).map_err(Error::from)
699        })
700        .transpose()?)
701    }
702
703    #[instrument(skip(self))]
704    async fn remove_keys(&self, id: &Id) -> Result<(), Self::Err> {
705        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
706        query(r#"DELETE FROM key WHERE id = :id"#)?
707            .bind("id", id.to_string())
708            .pluck(&*conn)
709            .await?;
710
711        Ok(())
712    }
713
714    async fn update_proofs(
715        &self,
716        added: Vec<ProofInfo>,
717        removed_ys: Vec<PublicKey>,
718    ) -> Result<(), Self::Err> {
719        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
720
721        let tx = ConnectionWithTransaction::new(conn).await?;
722
723        // TODO: Use a transaction for all these operations
724        for proof in added {
725            query(
726                r#"
727    INSERT INTO proof
728    (y, mint_url, state, spending_condition, unit, amount, keyset_id, secret, c, witness, dleq_e, dleq_s, dleq_r)
729    VALUES
730    (:y, :mint_url, :state, :spending_condition, :unit, :amount, :keyset_id, :secret, :c, :witness, :dleq_e, :dleq_s, :dleq_r)
731    ON CONFLICT(y) DO UPDATE SET
732        mint_url = excluded.mint_url,
733        state = excluded.state,
734        spending_condition = excluded.spending_condition,
735        unit = excluded.unit,
736        amount = excluded.amount,
737        keyset_id = excluded.keyset_id,
738        secret = excluded.secret,
739        c = excluded.c,
740        witness = excluded.witness,
741        dleq_e = excluded.dleq_e,
742        dleq_s = excluded.dleq_s,
743        dleq_r = excluded.dleq_r
744    ;
745            "#,
746            )?
747            .bind("y", proof.y.to_bytes().to_vec())
748            .bind("mint_url", proof.mint_url.to_string())
749            .bind("state",proof.state.to_string())
750            .bind(
751                "spending_condition",
752                proof
753                    .spending_condition
754                    .map(|s| serde_json::to_string(&s).ok()),
755            )
756            .bind("unit", proof.unit.to_string())
757            .bind("amount", u64::from(proof.proof.amount) as i64)
758            .bind("keyset_id", proof.proof.keyset_id.to_string())
759            .bind("secret", proof.proof.secret.to_string())
760            .bind("c", proof.proof.c.to_bytes().to_vec())
761            .bind(
762                "witness",
763                proof
764                    .proof
765                    .witness
766                    .map(|w| serde_json::to_string(&w).unwrap()),
767            )
768            .bind(
769                "dleq_e",
770                proof.proof.dleq.as_ref().map(|dleq| dleq.e.to_secret_bytes().to_vec()),
771            )
772            .bind(
773                "dleq_s",
774                proof.proof.dleq.as_ref().map(|dleq| dleq.s.to_secret_bytes().to_vec()),
775            )
776            .bind(
777                "dleq_r",
778                proof.proof.dleq.as_ref().map(|dleq| dleq.r.to_secret_bytes().to_vec()),
779            )
780            .execute(&tx).await?;
781        }
782        if !removed_ys.is_empty() {
783            query(r#"DELETE FROM proof WHERE y IN (:ys)"#)?
784                .bind_vec(
785                    "ys",
786                    removed_ys.iter().map(|y| y.to_bytes().to_vec()).collect(),
787                )
788                .execute(&tx)
789                .await?;
790        }
791
792        tx.commit().await?;
793
794        Ok(())
795    }
796
797    #[instrument(skip(self, state, spending_conditions))]
798    async fn get_proofs(
799        &self,
800        mint_url: Option<MintUrl>,
801        unit: Option<CurrencyUnit>,
802        state: Option<Vec<State>>,
803        spending_conditions: Option<Vec<SpendingConditions>>,
804    ) -> Result<Vec<ProofInfo>, Self::Err> {
805        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
806        Ok(query(
807            r#"
808            SELECT
809                amount,
810                unit,
811                keyset_id,
812                secret,
813                c,
814                witness,
815                dleq_e,
816                dleq_s,
817                dleq_r,
818                y,
819                mint_url,
820                state,
821                spending_condition
822            FROM proof
823        "#,
824        )?
825        .fetch_all(&*conn)
826        .await?
827        .into_iter()
828        .filter_map(|row| {
829            let row = sql_row_to_proof_info(row).ok()?;
830
831            if row.matches_conditions(&mint_url, &unit, &state, &spending_conditions) {
832                Some(row)
833            } else {
834                None
835            }
836        })
837        .collect::<Vec<_>>())
838    }
839
840    async fn get_balance(
841        &self,
842        mint_url: Option<MintUrl>,
843        unit: Option<CurrencyUnit>,
844        states: Option<Vec<State>>,
845    ) -> Result<u64, Self::Err> {
846        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
847
848        let mut query_str = "SELECT COALESCE(SUM(amount), 0) as total FROM proof".to_string();
849        let mut where_clauses = Vec::new();
850        let states = states
851            .unwrap_or_default()
852            .into_iter()
853            .map(|x| x.to_string())
854            .collect::<Vec<_>>();
855
856        if mint_url.is_some() {
857            where_clauses.push("mint_url = :mint_url");
858        }
859        if unit.is_some() {
860            where_clauses.push("unit = :unit");
861        }
862        if !states.is_empty() {
863            where_clauses.push("state IN (:states)");
864        }
865
866        if !where_clauses.is_empty() {
867            query_str.push_str(" WHERE ");
868            query_str.push_str(&where_clauses.join(" AND "));
869        }
870
871        let mut q = query(&query_str)?;
872
873        if let Some(ref mint_url) = mint_url {
874            q = q.bind("mint_url", mint_url.to_string());
875        }
876        if let Some(ref unit) = unit {
877            q = q.bind("unit", unit.to_string());
878        }
879
880        if !states.is_empty() {
881            q = q.bind_vec("states", states);
882        }
883
884        let balance = q
885            .pluck(&*conn)
886            .await?
887            .map(|n| {
888                // SQLite SUM returns INTEGER which we need to convert to u64
889                match n {
890                    crate::stmt::Column::Integer(i) => Ok(i as u64),
891                    crate::stmt::Column::Real(f) => Ok(f as u64),
892                    _ => Err(Error::Database(Box::new(std::io::Error::new(
893                        std::io::ErrorKind::InvalidData,
894                        "Invalid balance type",
895                    )))),
896                }
897            })
898            .transpose()?
899            .unwrap_or(0);
900
901        Ok(balance)
902    }
903
904    async fn update_proofs_state(&self, ys: Vec<PublicKey>, state: State) -> Result<(), Self::Err> {
905        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
906        query("UPDATE proof SET state = :state WHERE y IN (:ys)")?
907            .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
908            .bind("state", state.to_string())
909            .execute(&*conn)
910            .await?;
911
912        Ok(())
913    }
914
915    #[instrument(skip(self), fields(keyset_id = %keyset_id))]
916    async fn increment_keyset_counter(&self, keyset_id: &Id, count: u32) -> Result<u32, Self::Err> {
917        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
918        let tx = ConnectionWithTransaction::new(conn).await?;
919
920        // Lock the row and get current counter from keyset_counter table
921        let current_counter = query(
922            r#"
923            SELECT counter
924            FROM keyset_counter
925            WHERE keyset_id=:keyset_id
926            FOR UPDATE
927            "#,
928        )?
929        .bind("keyset_id", keyset_id.to_string())
930        .pluck(&tx)
931        .await?
932        .map(|n| Ok::<_, Error>(column_as_number!(n)))
933        .transpose()?
934        .unwrap_or(0);
935
936        let new_counter = current_counter + count;
937
938        // Upsert the new counter value
939        query(
940            r#"
941            INSERT INTO keyset_counter (keyset_id, counter)
942            VALUES (:keyset_id, :new_counter)
943            ON CONFLICT(keyset_id) DO UPDATE SET
944                counter = excluded.counter
945            "#,
946        )?
947        .bind("keyset_id", keyset_id.to_string())
948        .bind("new_counter", new_counter)
949        .execute(&tx)
950        .await?;
951
952        tx.commit().await?;
953
954        Ok(new_counter)
955    }
956
957    #[instrument(skip(self))]
958    async fn add_transaction(&self, transaction: Transaction) -> Result<(), Self::Err> {
959        let mint_url = transaction.mint_url.to_string();
960        let direction = transaction.direction.to_string();
961        let unit = transaction.unit.to_string();
962        let amount = u64::from(transaction.amount) as i64;
963        let fee = u64::from(transaction.fee) as i64;
964        let ys = transaction
965            .ys
966            .iter()
967            .flat_map(|y| y.to_bytes().to_vec())
968            .collect::<Vec<_>>();
969
970        let id = transaction.id();
971
972        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
973
974        query(
975            r#"
976INSERT INTO transactions
977(id, mint_url, direction, unit, amount, fee, ys, timestamp, memo, metadata, quote_id, payment_request, payment_proof)
978VALUES
979(:id, :mint_url, :direction, :unit, :amount, :fee, :ys, :timestamp, :memo, :metadata, :quote_id, :payment_request, :payment_proof)
980ON CONFLICT(id) DO UPDATE SET
981    mint_url = excluded.mint_url,
982    direction = excluded.direction,
983    unit = excluded.unit,
984    amount = excluded.amount,
985    fee = excluded.fee,
986    timestamp = excluded.timestamp,
987    memo = excluded.memo,
988    metadata = excluded.metadata,
989    quote_id = excluded.quote_id,
990    payment_request = excluded.payment_request,
991    payment_proof = excluded.payment_proof
992;
993        "#,
994        )?
995        .bind("id", id.as_slice().to_vec())
996        .bind("mint_url", mint_url)
997        .bind("direction", direction)
998        .bind("unit", unit)
999        .bind("amount", amount)
1000        .bind("fee", fee)
1001        .bind("ys", ys)
1002        .bind("timestamp", transaction.timestamp as i64)
1003        .bind("memo", transaction.memo)
1004        .bind(
1005            "metadata",
1006            serde_json::to_string(&transaction.metadata).map_err(Error::from)?,
1007        )
1008        .bind("quote_id", transaction.quote_id)
1009        .bind("payment_request", transaction.payment_request)
1010        .bind("payment_proof", transaction.payment_proof)
1011        .execute(&*conn)
1012        .await?;
1013
1014        Ok(())
1015    }
1016
1017    #[instrument(skip(self))]
1018    async fn get_transaction(
1019        &self,
1020        transaction_id: TransactionId,
1021    ) -> Result<Option<Transaction>, Self::Err> {
1022        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1023        Ok(query(
1024            r#"
1025            SELECT
1026                mint_url,
1027                direction,
1028                unit,
1029                amount,
1030                fee,
1031                ys,
1032                timestamp,
1033                memo,
1034                metadata,
1035                quote_id,
1036                payment_request,
1037                payment_proof
1038            FROM
1039                transactions
1040            WHERE
1041                id = :id
1042            "#,
1043        )?
1044        .bind("id", transaction_id.as_slice().to_vec())
1045        .fetch_one(&*conn)
1046        .await?
1047        .map(sql_row_to_transaction)
1048        .transpose()?)
1049    }
1050
1051    #[instrument(skip(self))]
1052    async fn list_transactions(
1053        &self,
1054        mint_url: Option<MintUrl>,
1055        direction: Option<TransactionDirection>,
1056        unit: Option<CurrencyUnit>,
1057    ) -> Result<Vec<Transaction>, Self::Err> {
1058        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1059
1060        Ok(query(
1061            r#"
1062            SELECT
1063                mint_url,
1064                direction,
1065                unit,
1066                amount,
1067                fee,
1068                ys,
1069                timestamp,
1070                memo,
1071                metadata,
1072                quote_id,
1073                payment_request,
1074                payment_proof
1075            FROM
1076                transactions
1077            "#,
1078        )?
1079        .fetch_all(&*conn)
1080        .await?
1081        .into_iter()
1082        .filter_map(|row| {
1083            // TODO: Avoid a table scan by passing the heavy lifting of checking to the DB engine
1084            let transaction = sql_row_to_transaction(row).ok()?;
1085            if transaction.matches_conditions(&mint_url, &direction, &unit) {
1086                Some(transaction)
1087            } else {
1088                None
1089            }
1090        })
1091        .collect::<Vec<_>>())
1092    }
1093
1094    #[instrument(skip(self))]
1095    async fn remove_transaction(&self, transaction_id: TransactionId) -> Result<(), Self::Err> {
1096        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1097
1098        query(r#"DELETE FROM transactions WHERE id=:id"#)?
1099            .bind("id", transaction_id.as_slice().to_vec())
1100            .execute(&*conn)
1101            .await?;
1102
1103        Ok(())
1104    }
1105}
1106
1107fn sql_row_to_mint_info(row: Vec<Column>) -> Result<MintInfo, Error> {
1108    unpack_into!(
1109        let (
1110            name,
1111            pubkey,
1112            version,
1113            description,
1114            description_long,
1115            contact,
1116            nuts,
1117            icon_url,
1118            motd,
1119            urls,
1120            mint_time,
1121            tos_url
1122        ) = row
1123    );
1124
1125    Ok(MintInfo {
1126        name: column_as_nullable_string!(&name),
1127        pubkey: column_as_nullable_string!(&pubkey, |v| serde_json::from_str(v).ok(), |v| {
1128            serde_json::from_slice(v).ok()
1129        }),
1130        version: column_as_nullable_string!(&version).and_then(|v| serde_json::from_str(&v).ok()),
1131        description: column_as_nullable_string!(description),
1132        description_long: column_as_nullable_string!(description_long),
1133        contact: column_as_nullable_string!(contact, |v| serde_json::from_str(&v).ok()),
1134        nuts: column_as_nullable_string!(nuts, |v| serde_json::from_str(&v).ok())
1135            .unwrap_or_default(),
1136        urls: column_as_nullable_string!(urls, |v| serde_json::from_str(&v).ok()),
1137        icon_url: column_as_nullable_string!(icon_url),
1138        motd: column_as_nullable_string!(motd),
1139        time: column_as_nullable_number!(mint_time).map(|t| t),
1140        tos_url: column_as_nullable_string!(tos_url),
1141    })
1142}
1143
1144#[instrument(skip_all)]
1145fn sql_row_to_keyset(row: Vec<Column>) -> Result<KeySetInfo, Error> {
1146    unpack_into!(
1147        let (
1148            id,
1149            unit,
1150            active,
1151            input_fee_ppk,
1152            final_expiry
1153        ) = row
1154    );
1155
1156    Ok(KeySetInfo {
1157        id: column_as_string!(id, Id::from_str, Id::from_bytes),
1158        unit: column_as_string!(unit, CurrencyUnit::from_str),
1159        active: matches!(active, Column::Integer(1)),
1160        input_fee_ppk: column_as_nullable_number!(input_fee_ppk).unwrap_or_default(),
1161        final_expiry: column_as_nullable_number!(final_expiry),
1162    })
1163}
1164
1165fn sql_row_to_mint_quote(row: Vec<Column>) -> Result<MintQuote, Error> {
1166    unpack_into!(
1167        let (
1168            id,
1169            mint_url,
1170            amount,
1171            unit,
1172            request,
1173            state,
1174            expiry,
1175            secret_key,
1176            row_method,
1177            row_amount_minted,
1178            row_amount_paid
1179        ) = row
1180    );
1181
1182    let amount: Option<i64> = column_as_nullable_number!(amount);
1183
1184    let amount_paid: u64 = column_as_number!(row_amount_paid);
1185    let amount_minted: u64 = column_as_number!(row_amount_minted);
1186    let payment_method =
1187        PaymentMethod::from_str(&column_as_string!(row_method)).map_err(Error::from)?;
1188
1189    Ok(MintQuote {
1190        id: column_as_string!(id),
1191        mint_url: column_as_string!(mint_url, MintUrl::from_str),
1192        amount: amount.and_then(Amount::from_i64),
1193        unit: column_as_string!(unit, CurrencyUnit::from_str),
1194        request: column_as_string!(request),
1195        state: column_as_string!(state, MintQuoteState::from_str),
1196        expiry: column_as_number!(expiry),
1197        secret_key: column_as_nullable_string!(secret_key)
1198            .map(|v| SecretKey::from_str(&v))
1199            .transpose()?,
1200        payment_method,
1201        amount_issued: amount_minted.into(),
1202        amount_paid: amount_paid.into(),
1203    })
1204}
1205
1206fn sql_row_to_melt_quote(row: Vec<Column>) -> Result<wallet::MeltQuote, Error> {
1207    unpack_into!(
1208        let (
1209            id,
1210            unit,
1211            amount,
1212            request,
1213            fee_reserve,
1214            state,
1215            expiry,
1216            payment_preimage,
1217            row_method
1218        ) = row
1219    );
1220
1221    let amount: u64 = column_as_number!(amount);
1222    let fee_reserve: u64 = column_as_number!(fee_reserve);
1223
1224    let payment_method =
1225        PaymentMethod::from_str(&column_as_string!(row_method)).map_err(Error::from)?;
1226
1227    Ok(wallet::MeltQuote {
1228        id: column_as_string!(id),
1229        amount: Amount::from(amount),
1230        unit: column_as_string!(unit, CurrencyUnit::from_str),
1231        request: column_as_string!(request),
1232        fee_reserve: Amount::from(fee_reserve),
1233        state: column_as_string!(state, MeltQuoteState::from_str),
1234        expiry: column_as_number!(expiry),
1235        payment_preimage: column_as_nullable_string!(payment_preimage),
1236        payment_method,
1237    })
1238}
1239
1240fn sql_row_to_proof_info(row: Vec<Column>) -> Result<ProofInfo, Error> {
1241    unpack_into!(
1242        let (
1243            amount,
1244            unit,
1245            keyset_id,
1246            secret,
1247            c,
1248            witness,
1249            dleq_e,
1250            dleq_s,
1251            dleq_r,
1252            y,
1253            mint_url,
1254            state,
1255            spending_condition
1256        ) = row
1257    );
1258
1259    let dleq = match (
1260        column_as_nullable_binary!(dleq_e),
1261        column_as_nullable_binary!(dleq_s),
1262        column_as_nullable_binary!(dleq_r),
1263    ) {
1264        (Some(e), Some(s), Some(r)) => {
1265            let e_key = SecretKey::from_slice(&e)?;
1266            let s_key = SecretKey::from_slice(&s)?;
1267            let r_key = SecretKey::from_slice(&r)?;
1268
1269            Some(ProofDleq::new(e_key, s_key, r_key))
1270        }
1271        _ => None,
1272    };
1273
1274    let amount: u64 = column_as_number!(amount);
1275    let proof = Proof {
1276        amount: Amount::from(amount),
1277        keyset_id: column_as_string!(keyset_id, Id::from_str),
1278        secret: column_as_string!(secret, Secret::from_str),
1279        witness: column_as_nullable_string!(witness, |v| { serde_json::from_str(&v).ok() }, |v| {
1280            serde_json::from_slice(&v).ok()
1281        }),
1282        c: column_as_string!(c, PublicKey::from_str, PublicKey::from_slice),
1283        dleq,
1284    };
1285
1286    Ok(ProofInfo {
1287        proof,
1288        y: column_as_string!(y, PublicKey::from_str, PublicKey::from_slice),
1289        mint_url: column_as_string!(mint_url, MintUrl::from_str),
1290        state: column_as_string!(state, State::from_str),
1291        spending_condition: column_as_nullable_string!(
1292            spending_condition,
1293            |r| { serde_json::from_str(&r).ok() },
1294            |r| { serde_json::from_slice(&r).ok() }
1295        ),
1296        unit: column_as_string!(unit, CurrencyUnit::from_str),
1297    })
1298}
1299
1300fn sql_row_to_transaction(row: Vec<Column>) -> Result<Transaction, Error> {
1301    unpack_into!(
1302        let (
1303            mint_url,
1304            direction,
1305            unit,
1306            amount,
1307            fee,
1308            ys,
1309            timestamp,
1310            memo,
1311            metadata,
1312            quote_id,
1313            payment_request,
1314            payment_proof
1315        ) = row
1316    );
1317
1318    let amount: u64 = column_as_number!(amount);
1319    let fee: u64 = column_as_number!(fee);
1320
1321    Ok(Transaction {
1322        mint_url: column_as_string!(mint_url, MintUrl::from_str),
1323        direction: column_as_string!(direction, TransactionDirection::from_str),
1324        unit: column_as_string!(unit, CurrencyUnit::from_str),
1325        amount: Amount::from(amount),
1326        fee: Amount::from(fee),
1327        ys: column_as_binary!(ys)
1328            .chunks(33)
1329            .map(PublicKey::from_slice)
1330            .collect::<Result<Vec<_>, _>>()?,
1331        timestamp: column_as_number!(timestamp),
1332        memo: column_as_nullable_string!(memo),
1333        metadata: column_as_nullable_string!(metadata, |v| serde_json::from_str(&v).ok(), |v| {
1334            serde_json::from_slice(&v).ok()
1335        })
1336        .unwrap_or_default(),
1337        quote_id: column_as_nullable_string!(quote_id),
1338        payment_request: column_as_nullable_string!(payment_request),
1339        payment_proof: column_as_nullable_string!(payment_proof),
1340    })
1341}