1use std::collections::HashMap;
4use std::fmt::Debug;
5use std::str::FromStr;
6use std::sync::Arc;
7
8use async_trait::async_trait;
9use bitcoin::bip32::DerivationPath;
10use cdk_common::database::{ConversionError, Error, WalletDatabase};
11use cdk_common::mint_url::MintUrl;
12use cdk_common::nuts::{MeltQuoteState, MintQuoteState};
13use cdk_common::secret::Secret;
14use cdk_common::util::unix_time;
15use cdk_common::wallet::{
16 self, MintQuote, ProofInfo, Transaction, TransactionDirection, TransactionId,
17};
18use cdk_common::{
19 database, Amount, CurrencyUnit, Id, KeySet, KeySetInfo, Keys, MintInfo, PaymentMethod, Proof,
20 ProofDleq, PublicKey, SecretKey, SpendingConditions, State,
21};
22use tracing::instrument;
23use uuid::Uuid;
24
25use crate::common::migrate;
26use crate::database::{ConnectionWithTransaction, DatabaseExecutor};
27use crate::pool::{DatabasePool, Pool, PooledResource};
28use crate::stmt::{query, Column};
29use crate::{
30 column_as_binary, column_as_nullable_binary, column_as_nullable_number,
31 column_as_nullable_string, column_as_number, column_as_string, unpack_into,
32};
33
34#[rustfmt::skip]
35mod migrations {
36 include!(concat!(env!("OUT_DIR"), "/migrations_wallet.rs"));
37}
38
39#[derive(Debug, Clone)]
41pub struct SQLWalletDatabase<RM>
42where
43 RM: DatabasePool + 'static,
44{
45 pool: Arc<Pool<RM>>,
46}
47
48impl<RM> SQLWalletDatabase<RM>
49where
50 RM: DatabasePool + 'static,
51{
52 pub async fn new<X>(db: X) -> Result<Self, Error>
54 where
55 X: Into<RM::Config>,
56 {
57 let pool = Pool::new(db.into());
58 Self::migrate(pool.get().map_err(|e| Error::Database(Box::new(e)))?).await?;
59
60 Ok(Self { pool })
61 }
62
63 async fn migrate(conn: PooledResource<RM>) -> Result<(), Error> {
65 let tx = ConnectionWithTransaction::new(conn).await?;
66 migrate(&tx, RM::Connection::name(), migrations::MIGRATIONS).await?;
67 Self::add_keyset_u32(&tx).await?;
69 tx.commit().await?;
70
71 Ok(())
72 }
73
74 async fn add_keyset_u32<T>(conn: &T) -> Result<(), Error>
75 where
76 T: DatabaseExecutor,
77 {
78 let keys_without_u32: Vec<Vec<Column>> = query(
80 r#"
81 SELECT
82 id
83 FROM key
84 WHERE keyset_u32 IS NULL
85 "#,
86 )?
87 .fetch_all(conn)
88 .await?;
89
90 for row in keys_without_u32 {
91 unpack_into!(let (id) = row);
92 let id = column_as_string!(id);
93
94 if let Ok(id) = Id::from_str(&id) {
95 query(
96 r#"
97 UPDATE
98 key
99 SET keyset_u32 = :u32_keyset
100 WHERE id = :keyset_id
101 "#,
102 )?
103 .bind("u32_keyset", u32::from(id))
104 .bind("keyset_id", id.to_string())
105 .execute(conn)
106 .await?;
107 }
108 }
109
110 let keysets_without_u32: Vec<Vec<Column>> = query(
112 r#"
113 SELECT
114 id
115 FROM keyset
116 WHERE keyset_u32 IS NULL
117 "#,
118 )?
119 .fetch_all(conn)
120 .await?;
121
122 for row in keysets_without_u32 {
123 unpack_into!(let (id) = row);
124 let id = column_as_string!(id);
125
126 if let Ok(id) = Id::from_str(&id) {
127 query(
128 r#"
129 UPDATE
130 keyset
131 SET keyset_u32 = :u32_keyset
132 WHERE id = :keyset_id
133 "#,
134 )?
135 .bind("u32_keyset", u32::from(id))
136 .bind("keyset_id", id.to_string())
137 .execute(conn)
138 .await?;
139 }
140 }
141
142 Ok(())
143 }
144}
145
146#[async_trait]
147impl<RM> WalletDatabase<database::Error> for SQLWalletDatabase<RM>
148where
149 RM: DatabasePool + 'static,
150{
151 #[instrument(skip(self))]
152 async fn get_melt_quotes(&self) -> Result<Vec<wallet::MeltQuote>, database::Error> {
153 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
154
155 Ok(query(
156 r#"
157 SELECT
158 id,
159 unit,
160 amount,
161 request,
162 fee_reserve,
163 state,
164 expiry,
165 payment_preimage,
166 payment_method,
167 used_by_operation,
168 version,
169 mint_url
170 FROM
171 melt_quote
172 "#,
173 )?
174 .fetch_all(&*conn)
175 .await?
176 .into_iter()
177 .map(sql_row_to_melt_quote)
178 .collect::<Result<_, _>>()?)
179 }
180
181 #[instrument(skip(self))]
182 async fn get_mint(&self, mint_url: MintUrl) -> Result<Option<MintInfo>, database::Error> {
183 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
184 Ok(query(
185 r#"
186 SELECT
187 name,
188 pubkey,
189 version,
190 description,
191 description_long,
192 contact,
193 nuts,
194 icon_url,
195 motd,
196 urls,
197 mint_time,
198 tos_url
199 FROM
200 mint
201 WHERE mint_url = :mint_url
202 "#,
203 )?
204 .bind("mint_url", mint_url.to_string())
205 .fetch_one(&*conn)
206 .await?
207 .map(sql_row_to_mint_info)
208 .transpose()?)
209 }
210
211 #[instrument(skip(self))]
212 async fn get_mints(&self) -> Result<HashMap<MintUrl, Option<MintInfo>>, database::Error> {
213 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
214 Ok(query(
215 r#"
216 SELECT
217 name,
218 pubkey,
219 version,
220 description,
221 description_long,
222 contact,
223 nuts,
224 icon_url,
225 motd,
226 urls,
227 mint_time,
228 tos_url,
229 mint_url
230 FROM
231 mint
232 "#,
233 )?
234 .fetch_all(&*conn)
235 .await?
236 .into_iter()
237 .map(|mut row| {
238 let url = column_as_string!(
239 row.pop().ok_or(ConversionError::MissingColumn(0, 1))?,
240 MintUrl::from_str
241 );
242
243 Ok((url, sql_row_to_mint_info(row).ok()))
244 })
245 .collect::<Result<HashMap<_, _>, Error>>()?)
246 }
247
248 #[instrument(skip(self))]
249 async fn get_mint_keysets(
250 &self,
251 mint_url: MintUrl,
252 ) -> Result<Option<Vec<KeySetInfo>>, database::Error> {
253 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
254
255 let keysets = query(
256 r#"
257 SELECT
258 id,
259 unit,
260 active,
261 input_fee_ppk,
262 final_expiry
263 FROM
264 keyset
265 WHERE mint_url = :mint_url
266 "#,
267 )?
268 .bind("mint_url", mint_url.to_string())
269 .fetch_all(&*conn)
270 .await?
271 .into_iter()
272 .map(sql_row_to_keyset)
273 .collect::<Result<Vec<_>, Error>>()?;
274
275 match keysets.is_empty() {
276 false => Ok(Some(keysets)),
277 true => Ok(None),
278 }
279 }
280
281 #[instrument(skip(self), fields(keyset_id = %keyset_id))]
282 async fn get_keyset_by_id(
283 &self,
284 keyset_id: &Id,
285 ) -> Result<Option<KeySetInfo>, database::Error> {
286 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
287 query(
288 r#"
289 SELECT
290 id,
291 unit,
292 active,
293 input_fee_ppk,
294 final_expiry
295 FROM
296 keyset
297 WHERE id = :id
298 "#,
299 )?
300 .bind("id", keyset_id.to_string())
301 .fetch_one(&*conn)
302 .await?
303 .map(sql_row_to_keyset)
304 .transpose()
305 }
306
307 #[instrument(skip(self))]
308 async fn get_mint_quote(&self, quote_id: &str) -> Result<Option<MintQuote>, database::Error> {
309 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
310 query(
311 r#"
312 SELECT
313 id,
314 mint_url,
315 amount,
316 unit,
317 request,
318 state,
319 expiry,
320 secret_key,
321 payment_method,
322 amount_issued,
323 amount_paid,
324 used_by_operation,
325 version
326 FROM
327 mint_quote
328 WHERE
329 id = :id
330 "#,
331 )?
332 .bind("id", quote_id.to_string())
333 .fetch_one(&*conn)
334 .await?
335 .map(sql_row_to_mint_quote)
336 .transpose()
337 }
338
339 #[instrument(skip(self))]
340 async fn get_mint_quotes(&self) -> Result<Vec<MintQuote>, database::Error> {
341 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
342 Ok(query(
343 r#"
344 SELECT
345 id,
346 mint_url,
347 amount,
348 unit,
349 request,
350 state,
351 expiry,
352 secret_key,
353 payment_method,
354 amount_issued,
355 amount_paid,
356 used_by_operation,
357 version
358 FROM
359 mint_quote
360 "#,
361 )?
362 .fetch_all(&*conn)
363 .await?
364 .into_iter()
365 .map(sql_row_to_mint_quote)
366 .collect::<Result<_, _>>()?)
367 }
368
369 #[instrument(skip(self))]
370 async fn get_unissued_mint_quotes(&self) -> Result<Vec<MintQuote>, database::Error> {
371 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
372 Ok(query(
373 r#"
374 SELECT
375 id,
376 mint_url,
377 amount,
378 unit,
379 request,
380 state,
381 expiry,
382 secret_key,
383 payment_method,
384 amount_issued,
385 amount_paid,
386 used_by_operation,
387 version
388 FROM
389 mint_quote
390 WHERE
391 amount_issued = 0
392 OR
393 payment_method = 'bolt12'
394 "#,
395 )?
396 .fetch_all(&*conn)
397 .await?
398 .into_iter()
399 .map(sql_row_to_mint_quote)
400 .collect::<Result<_, _>>()?)
401 }
402
403 #[instrument(skip(self))]
404 async fn get_melt_quote(
405 &self,
406 quote_id: &str,
407 ) -> Result<Option<wallet::MeltQuote>, database::Error> {
408 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
409 query(
410 r#"
411 SELECT
412 id,
413 unit,
414 amount,
415 request,
416 fee_reserve,
417 state,
418 expiry,
419 payment_preimage,
420 payment_method,
421 used_by_operation,
422 version,
423 mint_url
424 FROM
425 melt_quote
426 WHERE
427 id=:id
428 "#,
429 )?
430 .bind("id", quote_id.to_owned())
431 .fetch_one(&*conn)
432 .await?
433 .map(sql_row_to_melt_quote)
434 .transpose()
435 }
436
437 #[instrument(skip(self), fields(keyset_id = %keyset_id))]
438 async fn get_keys(&self, keyset_id: &Id) -> Result<Option<Keys>, database::Error> {
439 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
440 query(
441 r#"
442 SELECT
443 keys
444 FROM key
445 WHERE id = :id
446 "#,
447 )?
448 .bind("id", keyset_id.to_string())
449 .pluck(&*conn)
450 .await?
451 .map(|keys| {
452 let keys = column_as_string!(keys);
453 serde_json::from_str(&keys).map_err(Error::from)
454 })
455 .transpose()
456 }
457
458 #[instrument(skip(self, state, spending_conditions))]
459 async fn get_proofs(
460 &self,
461 mint_url: Option<MintUrl>,
462 unit: Option<CurrencyUnit>,
463 state: Option<Vec<State>>,
464 spending_conditions: Option<Vec<SpendingConditions>>,
465 ) -> Result<Vec<ProofInfo>, database::Error> {
466 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
467 Ok(query(
468 r#"
469 SELECT
470 amount,
471 unit,
472 keyset_id,
473 secret,
474 c,
475 witness,
476 dleq_e,
477 dleq_s,
478 dleq_r,
479 y,
480 mint_url,
481 state,
482 spending_condition,
483 used_by_operation,
484 created_by_operation,
485 p2pk_e
486 FROM proof
487 "#,
488 )?
489 .fetch_all(&*conn)
490 .await?
491 .into_iter()
492 .filter_map(|row| {
493 let row = sql_row_to_proof_info(row).ok()?;
494
495 if row.matches_conditions(&mint_url, &unit, &state, &spending_conditions) {
496 Some(row)
497 } else {
498 None
499 }
500 })
501 .collect::<Vec<_>>())
502 }
503
504 #[instrument(skip(self, ys))]
505 async fn get_proofs_by_ys(
506 &self,
507 ys: Vec<PublicKey>,
508 ) -> Result<Vec<ProofInfo>, database::Error> {
509 if ys.is_empty() {
510 return Ok(Vec::new());
511 }
512
513 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
514 Ok(query(
515 r#"
516 SELECT
517 amount,
518 unit,
519 keyset_id,
520 secret,
521 c,
522 witness,
523 dleq_e,
524 dleq_s,
525 dleq_r,
526 y,
527 mint_url,
528 state,
529 spending_condition,
530 used_by_operation,
531 created_by_operation,
532 p2pk_e
533 FROM proof
534 WHERE y IN (:ys)
535 "#,
536 )?
537 .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
538 .fetch_all(&*conn)
539 .await?
540 .into_iter()
541 .filter_map(|row| sql_row_to_proof_info(row).ok())
542 .collect::<Vec<_>>())
543 }
544
545 async fn get_balance(
546 &self,
547 mint_url: Option<MintUrl>,
548 unit: Option<CurrencyUnit>,
549 states: Option<Vec<State>>,
550 ) -> Result<u64, database::Error> {
551 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
552
553 let mut query_str = "SELECT COALESCE(SUM(amount), 0) as total FROM proof".to_string();
554 let mut where_clauses = Vec::new();
555 let states = states
556 .unwrap_or_default()
557 .into_iter()
558 .map(|x| x.to_string())
559 .collect::<Vec<_>>();
560
561 if mint_url.is_some() {
562 where_clauses.push("mint_url = :mint_url");
563 }
564 if unit.is_some() {
565 where_clauses.push("unit = :unit");
566 }
567 if !states.is_empty() {
568 where_clauses.push("state IN (:states)");
569 }
570
571 if !where_clauses.is_empty() {
572 query_str.push_str(" WHERE ");
573 query_str.push_str(&where_clauses.join(" AND "));
574 }
575
576 let mut q = query(&query_str)?;
577
578 if let Some(ref mint_url) = mint_url {
579 q = q.bind("mint_url", mint_url.to_string());
580 }
581 if let Some(ref unit) = unit {
582 q = q.bind("unit", unit.to_string());
583 }
584
585 if !states.is_empty() {
586 q = q.bind_vec("states", states);
587 }
588
589 let balance = q
590 .pluck(&*conn)
591 .await?
592 .map(|n| {
593 match n {
595 crate::stmt::Column::Integer(i) => Ok(i as u64),
596 crate::stmt::Column::Real(f) => Ok(f as u64),
597 _ => Err(Error::Database(Box::new(std::io::Error::new(
598 std::io::ErrorKind::InvalidData,
599 "Invalid balance type",
600 )))),
601 }
602 })
603 .transpose()?
604 .unwrap_or(0);
605
606 Ok(balance)
607 }
608
609 #[instrument(skip(self))]
610 async fn get_transaction(
611 &self,
612 transaction_id: TransactionId,
613 ) -> Result<Option<Transaction>, database::Error> {
614 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
615 Ok(query(
616 r#"
617 SELECT
618 mint_url,
619 direction,
620 unit,
621 amount,
622 fee,
623 ys,
624 timestamp,
625 memo,
626 metadata,
627 quote_id,
628 payment_request,
629 payment_proof,
630 payment_method,
631 saga_id
632 FROM
633 transactions
634 WHERE
635 id = :id
636 "#,
637 )?
638 .bind("id", transaction_id.as_slice().to_vec())
639 .fetch_one(&*conn)
640 .await?
641 .map(sql_row_to_transaction)
642 .transpose()?)
643 }
644
645 #[instrument(skip(self))]
646 async fn list_transactions(
647 &self,
648 mint_url: Option<MintUrl>,
649 direction: Option<TransactionDirection>,
650 unit: Option<CurrencyUnit>,
651 ) -> Result<Vec<Transaction>, database::Error> {
652 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
653
654 Ok(query(
655 r#"
656 SELECT
657 mint_url,
658 direction,
659 unit,
660 amount,
661 fee,
662 ys,
663 timestamp,
664 memo,
665 metadata,
666 quote_id,
667 payment_request,
668 payment_proof,
669 payment_method,
670 saga_id
671 FROM
672 transactions
673 "#,
674 )?
675 .fetch_all(&*conn)
676 .await?
677 .into_iter()
678 .filter_map(|row| {
679 let transaction = sql_row_to_transaction(row).ok()?;
681 if transaction.matches_conditions(&mint_url, &direction, &unit) {
682 Some(transaction)
683 } else {
684 None
685 }
686 })
687 .collect::<Vec<_>>())
688 }
689
690 async fn update_proofs(
691 &self,
692 added: Vec<ProofInfo>,
693 removed_ys: Vec<PublicKey>,
694 ) -> Result<(), database::Error> {
695 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
696 let tx = ConnectionWithTransaction::new(conn).await?;
697
698 for proof in added {
699 query(
700 r#"
701 INSERT INTO proof
702 (y, mint_url, state, spending_condition, unit, amount, keyset_id, secret, c, witness, dleq_e, dleq_s, dleq_r, used_by_operation, created_by_operation, p2pk_e)
703 VALUES
704 (:y, :mint_url, :state, :spending_condition, :unit, :amount, :keyset_id, :secret, :c, :witness, :dleq_e, :dleq_s, :dleq_r, :used_by_operation, :created_by_operation, :p2pk_e)
705 ON CONFLICT(y) DO UPDATE SET
706 mint_url = excluded.mint_url,
707 state = excluded.state,
708 spending_condition = excluded.spending_condition,
709 unit = excluded.unit,
710 amount = excluded.amount,
711 keyset_id = excluded.keyset_id,
712 secret = excluded.secret,
713 c = excluded.c,
714 witness = excluded.witness,
715 dleq_e = excluded.dleq_e,
716 dleq_s = excluded.dleq_s,
717 dleq_r = excluded.dleq_r,
718 used_by_operation = excluded.used_by_operation,
719 created_by_operation = excluded.created_by_operation,
720 p2pk_e = excluded.p2pk_e
721 ;
722 "#,
723 )?
724 .bind("y", proof.y.to_bytes().to_vec())
725 .bind("mint_url", proof.mint_url.to_string())
726 .bind("state", proof.state.to_string())
727 .bind(
728 "spending_condition",
729 proof
730 .spending_condition
731 .map(|s| serde_json::to_string(&s).ok()),
732 )
733 .bind("unit", proof.unit.to_string())
734 .bind("amount", u64::from(proof.proof.amount) as i64)
735 .bind("keyset_id", proof.proof.keyset_id.to_string())
736 .bind("secret", proof.proof.secret.to_string())
737 .bind("c", proof.proof.c.to_bytes().to_vec())
738 .bind(
739 "witness",
740 proof
741 .proof
742 .witness
743 .and_then(|w| serde_json::to_string(&w).ok()),
744 )
745 .bind(
746 "dleq_e",
747 proof.proof.dleq.as_ref().map(|dleq| dleq.e.to_secret_bytes().to_vec()),
748 )
749 .bind(
750 "dleq_s",
751 proof.proof.dleq.as_ref().map(|dleq| dleq.s.to_secret_bytes().to_vec()),
752 )
753 .bind(
754 "dleq_r",
755 proof.proof.dleq.as_ref().map(|dleq| dleq.r.to_secret_bytes().to_vec()),
756 )
757 .bind("used_by_operation", proof.used_by_operation.map(|id| id.to_string()))
758 .bind("created_by_operation", proof.created_by_operation.map(|id| id.to_string()))
759 .bind(
760 "p2pk_e",
761 proof
762 .proof
763 .p2pk_e
764 .as_ref()
765 .map(|pk| pk.to_bytes().to_vec()),
766 )
767 .execute(&tx)
768 .await?;
769 }
770
771 if !removed_ys.is_empty() {
772 query(r#"DELETE FROM proof WHERE y IN (:ys)"#)?
773 .bind_vec(
774 "ys",
775 removed_ys.iter().map(|y| y.to_bytes().to_vec()).collect(),
776 )
777 .execute(&tx)
778 .await?;
779 }
780
781 tx.commit().await?;
782
783 Ok(())
784 }
785
786 #[instrument(skip(self))]
787 async fn update_proofs_state(
788 &self,
789 ys: Vec<PublicKey>,
790 state: State,
791 ) -> Result<(), database::Error> {
792 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
793
794 query("UPDATE proof SET state = :state WHERE y IN (:ys)")?
795 .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
796 .bind("state", state.to_string())
797 .execute(&*conn)
798 .await?;
799
800 Ok(())
801 }
802
803 #[instrument(skip(self))]
804 async fn add_transaction(&self, transaction: Transaction) -> Result<(), database::Error> {
805 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
806
807 let mint_url = transaction.mint_url.to_string();
808 let direction = transaction.direction.to_string();
809 let unit = transaction.unit.to_string();
810 let amount = u64::from(transaction.amount) as i64;
811 let fee = u64::from(transaction.fee) as i64;
812 let ys = transaction
813 .ys
814 .iter()
815 .flat_map(|y| y.to_bytes().to_vec())
816 .collect::<Vec<_>>();
817
818 let id = transaction.id();
819
820 query(
821 r#"
822 INSERT INTO transactions
823 (id, mint_url, direction, unit, amount, fee, ys, timestamp, memo, metadata, quote_id, payment_request, payment_proof, payment_method, saga_id)
824 VALUES
825 (:id, :mint_url, :direction, :unit, :amount, :fee, :ys, :timestamp, :memo, :metadata, :quote_id, :payment_request, :payment_proof, :payment_method, :saga_id)
826 ON CONFLICT(id) DO UPDATE SET
827 mint_url = excluded.mint_url,
828 direction = excluded.direction,
829 unit = excluded.unit,
830 amount = excluded.amount,
831 fee = excluded.fee,
832 timestamp = excluded.timestamp,
833 memo = excluded.memo,
834 metadata = excluded.metadata,
835 quote_id = excluded.quote_id,
836 payment_request = excluded.payment_request,
837 payment_proof = excluded.payment_proof,
838 payment_method = excluded.payment_method,
839 saga_id = excluded.saga_id
840 ;
841 "#,
842 )?
843 .bind("id", id.as_slice().to_vec())
844 .bind("mint_url", mint_url)
845 .bind("direction", direction)
846 .bind("unit", unit)
847 .bind("amount", amount)
848 .bind("fee", fee)
849 .bind("ys", ys)
850 .bind("timestamp", transaction.timestamp as i64)
851 .bind("memo", transaction.memo)
852 .bind(
853 "metadata",
854 serde_json::to_string(&transaction.metadata).map_err(Error::from)?,
855 )
856 .bind("quote_id", transaction.quote_id)
857 .bind("payment_request", transaction.payment_request)
858 .bind("payment_proof", transaction.payment_proof)
859 .bind("payment_method", transaction.payment_method.map(|pm| pm.to_string()))
860 .bind("saga_id", transaction.saga_id.map(|id| id.to_string()))
861 .execute(&*conn)
862 .await?;
863
864 Ok(())
865 }
866
867 #[instrument(skip(self))]
868 async fn update_mint_url(
869 &self,
870 old_mint_url: MintUrl,
871 new_mint_url: MintUrl,
872 ) -> Result<(), database::Error> {
873 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
874 let tx = ConnectionWithTransaction::new(conn).await?;
875 let tables = ["mint_quote", "proof"];
876
877 for table in &tables {
878 query(&format!(
879 r#"
880 UPDATE {table}
881 SET mint_url = :new_mint_url
882 WHERE mint_url = :old_mint_url
883 "#
884 ))?
885 .bind("new_mint_url", new_mint_url.to_string())
886 .bind("old_mint_url", old_mint_url.to_string())
887 .execute(&tx)
888 .await?;
889 }
890
891 tx.commit().await?;
892
893 Ok(())
894 }
895
896 #[instrument(skip(self), fields(keyset_id = %keyset_id))]
897 async fn increment_keyset_counter(
898 &self,
899 keyset_id: &Id,
900 count: u32,
901 ) -> Result<u32, database::Error> {
902 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
903
904 let new_counter = query(
905 r#"
906 INSERT INTO keyset_counter (keyset_id, counter)
907 VALUES (:keyset_id, :count)
908 ON CONFLICT(keyset_id) DO UPDATE SET
909 counter = keyset_counter.counter + :count
910 RETURNING counter
911 "#,
912 )?
913 .bind("keyset_id", keyset_id.to_string())
914 .bind("count", count)
915 .pluck(&*conn)
916 .await?
917 .map(|n| Ok::<_, Error>(column_as_number!(n)))
918 .transpose()?
919 .ok_or_else(|| Error::Internal("Counter update returned no value".to_owned()))?;
920
921 Ok(new_counter)
922 }
923
924 #[instrument(skip(self, mint_info))]
925 async fn add_mint(
926 &self,
927 mint_url: MintUrl,
928 mint_info: Option<MintInfo>,
929 ) -> Result<(), database::Error> {
930 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
931
932 let (
933 name,
934 pubkey,
935 version,
936 description,
937 description_long,
938 contact,
939 nuts,
940 icon_url,
941 urls,
942 motd,
943 time,
944 tos_url,
945 ) = match mint_info {
946 Some(mint_info) => {
947 let MintInfo {
948 name,
949 pubkey,
950 version,
951 description,
952 description_long,
953 contact,
954 nuts,
955 icon_url,
956 urls,
957 motd,
958 time,
959 tos_url,
960 } = mint_info;
961
962 (
963 name,
964 pubkey.map(|p| p.to_bytes().to_vec()),
965 version.map(|v| serde_json::to_string(&v).ok()),
966 description,
967 description_long,
968 contact.map(|c| serde_json::to_string(&c).ok()),
969 serde_json::to_string(&nuts).ok(),
970 icon_url,
971 urls.map(|c| serde_json::to_string(&c).ok()),
972 motd,
973 time,
974 tos_url,
975 )
976 }
977 None => (
978 None, None, None, None, None, None, None, None, None, None, None, None,
979 ),
980 };
981
982 query(
983 r#"
984 INSERT INTO mint
985 (
986 mint_url, name, pubkey, version, description, description_long,
987 contact, nuts, icon_url, urls, motd, mint_time, tos_url
988 )
989 VALUES
990 (
991 :mint_url, :name, :pubkey, :version, :description, :description_long,
992 :contact, :nuts, :icon_url, :urls, :motd, :mint_time, :tos_url
993 )
994 ON CONFLICT(mint_url) DO UPDATE SET
995 name = excluded.name,
996 pubkey = excluded.pubkey,
997 version = excluded.version,
998 description = excluded.description,
999 description_long = excluded.description_long,
1000 contact = excluded.contact,
1001 nuts = excluded.nuts,
1002 icon_url = excluded.icon_url,
1003 urls = excluded.urls,
1004 motd = excluded.motd,
1005 mint_time = excluded.mint_time,
1006 tos_url = excluded.tos_url
1007 ;
1008 "#,
1009 )?
1010 .bind("mint_url", mint_url.to_string())
1011 .bind("name", name)
1012 .bind("pubkey", pubkey)
1013 .bind("version", version)
1014 .bind("description", description)
1015 .bind("description_long", description_long)
1016 .bind("contact", contact)
1017 .bind("nuts", nuts)
1018 .bind("icon_url", icon_url)
1019 .bind("urls", urls)
1020 .bind("motd", motd)
1021 .bind("mint_time", time.map(|v| v as i64))
1022 .bind("tos_url", tos_url)
1023 .execute(&*conn)
1024 .await?;
1025
1026 Ok(())
1027 }
1028
1029 #[instrument(skip(self))]
1030 async fn remove_mint(&self, mint_url: MintUrl) -> Result<(), database::Error> {
1031 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1032
1033 query(r#"DELETE FROM mint WHERE mint_url=:mint_url"#)?
1034 .bind("mint_url", mint_url.to_string())
1035 .execute(&*conn)
1036 .await?;
1037
1038 Ok(())
1039 }
1040
1041 #[instrument(skip(self, keysets))]
1042 async fn add_mint_keysets(
1043 &self,
1044 mint_url: MintUrl,
1045 keysets: Vec<KeySetInfo>,
1046 ) -> Result<(), database::Error> {
1047 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1048 let tx = ConnectionWithTransaction::new(conn).await?;
1049
1050 for keyset in keysets {
1051 query(
1052 r#"
1053 INSERT INTO keyset
1054 (mint_url, id, unit, active, input_fee_ppk, final_expiry, keyset_u32)
1055 VALUES
1056 (:mint_url, :id, :unit, :active, :input_fee_ppk, :final_expiry, :keyset_u32)
1057 ON CONFLICT(id) DO UPDATE SET
1058 active = excluded.active,
1059 input_fee_ppk = excluded.input_fee_ppk
1060 "#,
1061 )?
1062 .bind("mint_url", mint_url.to_string())
1063 .bind("id", keyset.id.to_string())
1064 .bind("unit", keyset.unit.to_string())
1065 .bind("active", keyset.active)
1066 .bind("input_fee_ppk", keyset.input_fee_ppk as i64)
1067 .bind("final_expiry", keyset.final_expiry.map(|v| v as i64))
1068 .bind("keyset_u32", u32::from(keyset.id))
1069 .execute(&tx)
1070 .await?;
1071 }
1072
1073 tx.commit().await?;
1074
1075 Ok(())
1076 }
1077
1078 #[instrument(skip_all)]
1079 async fn add_mint_quote(&self, quote: MintQuote) -> Result<(), database::Error> {
1080 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1081
1082 let expected_version = quote.version;
1083 let new_version = expected_version.wrapping_add(1);
1084
1085 let rows_affected = query(
1086 r#"
1087 INSERT INTO mint_quote
1088 (id, mint_url, amount, unit, request, state, expiry, secret_key, payment_method, amount_issued, amount_paid, version, used_by_operation)
1089 VALUES
1090 (:id, :mint_url, :amount, :unit, :request, :state, :expiry, :secret_key, :payment_method, :amount_issued, :amount_paid, :version, :used_by_operation)
1091 ON CONFLICT(id) DO UPDATE SET
1092 mint_url = excluded.mint_url,
1093 amount = excluded.amount,
1094 unit = excluded.unit,
1095 request = excluded.request,
1096 state = excluded.state,
1097 expiry = excluded.expiry,
1098 secret_key = excluded.secret_key,
1099 payment_method = excluded.payment_method,
1100 amount_issued = excluded.amount_issued,
1101 amount_paid = excluded.amount_paid,
1102 version = :new_version,
1103 used_by_operation = excluded.used_by_operation
1104 WHERE mint_quote.version = :expected_version
1105 ;
1106 "#,
1107 )?
1108 .bind("id", quote.id.to_string())
1109 .bind("mint_url", quote.mint_url.to_string())
1110 .bind("amount", quote.amount.map(|a| a.to_i64()))
1111 .bind("unit", quote.unit.to_string())
1112 .bind("request", quote.request)
1113 .bind("state", quote.state.to_string())
1114 .bind("expiry", quote.expiry as i64)
1115 .bind("secret_key", quote.secret_key.map(|p| p.to_string()))
1116 .bind("payment_method", quote.payment_method.to_string())
1117 .bind("amount_issued", quote.amount_issued.to_i64())
1118 .bind("amount_paid", quote.amount_paid.to_i64())
1119 .bind("version", quote.version as i64)
1120 .bind("new_version", new_version as i64)
1121 .bind("expected_version", expected_version as i64)
1122 .bind("used_by_operation", quote.used_by_operation)
1123 .execute(&*conn).await?;
1124
1125 if rows_affected == 0 {
1126 return Err(database::Error::ConcurrentUpdate);
1127 }
1128
1129 Ok(())
1130 }
1131
1132 #[instrument(skip(self))]
1133 async fn remove_mint_quote(&self, quote_id: &str) -> Result<(), database::Error> {
1134 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1135
1136 query(r#"DELETE FROM mint_quote WHERE id=:id"#)?
1137 .bind("id", quote_id.to_string())
1138 .execute(&*conn)
1139 .await?;
1140
1141 Ok(())
1142 }
1143
1144 #[instrument(skip_all)]
1145 async fn add_melt_quote(&self, quote: wallet::MeltQuote) -> Result<(), database::Error> {
1146 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1147
1148 let expected_version = quote.version;
1149 let new_version = expected_version.wrapping_add(1);
1150
1151 let rows_affected = query(
1152 r#"
1153 INSERT INTO melt_quote
1154 (id, unit, amount, request, fee_reserve, state, expiry, payment_method, version, mint_url, used_by_operation)
1155 VALUES
1156 (:id, :unit, :amount, :request, :fee_reserve, :state, :expiry, :payment_method, :version, :mint_url, :used_by_operation)
1157 ON CONFLICT(id) DO UPDATE SET
1158 unit = excluded.unit,
1159 amount = excluded.amount,
1160 request = excluded.request,
1161 fee_reserve = excluded.fee_reserve,
1162 state = excluded.state,
1163 expiry = excluded.expiry,
1164 payment_method = excluded.payment_method,
1165 version = :new_version,
1166 mint_url = excluded.mint_url,
1167 used_by_operation = excluded.used_by_operation
1168 WHERE melt_quote.version = :expected_version
1169 ;
1170 "#,
1171 )?
1172 .bind("id", quote.id.to_string())
1173 .bind("unit", quote.unit.to_string())
1174 .bind("amount", u64::from(quote.amount) as i64)
1175 .bind("request", quote.request)
1176 .bind("fee_reserve", u64::from(quote.fee_reserve) as i64)
1177 .bind("state", quote.state.to_string())
1178 .bind("expiry", quote.expiry as i64)
1179 .bind("payment_method", quote.payment_method.to_string())
1180 .bind("version", quote.version as i64)
1181 .bind("new_version", new_version as i64)
1182 .bind("expected_version", expected_version as i64)
1183 .bind("mint_url", quote.mint_url.map(|m| m.to_string()))
1184 .bind("used_by_operation", quote.used_by_operation)
1185 .execute(&*conn)
1186 .await?;
1187
1188 if rows_affected == 0 {
1189 return Err(database::Error::ConcurrentUpdate);
1190 }
1191
1192 Ok(())
1193 }
1194
1195 #[instrument(skip(self))]
1196 async fn remove_melt_quote(&self, quote_id: &str) -> Result<(), database::Error> {
1197 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1198
1199 query(r#"DELETE FROM melt_quote WHERE id=:id"#)?
1200 .bind("id", quote_id.to_owned())
1201 .execute(&*conn)
1202 .await?;
1203
1204 Ok(())
1205 }
1206
1207 #[instrument(skip_all)]
1208 async fn add_keys(&self, keyset: KeySet) -> Result<(), database::Error> {
1209 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1210
1211 keyset.verify_id()?;
1212
1213 query(
1214 r#"
1215 INSERT INTO key
1216 (id, keys, keyset_u32)
1217 VALUES
1218 (:id, :keys, :keyset_u32)
1219 "#,
1220 )?
1221 .bind("id", keyset.id.to_string())
1222 .bind(
1223 "keys",
1224 serde_json::to_string(&keyset.keys).map_err(Error::from)?,
1225 )
1226 .bind("keyset_u32", u32::from(keyset.id))
1227 .execute(&*conn)
1228 .await?;
1229
1230 Ok(())
1231 }
1232
1233 #[instrument(skip(self))]
1234 async fn remove_keys(&self, id: &Id) -> Result<(), database::Error> {
1235 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1236
1237 query(r#"DELETE FROM key WHERE id = :id"#)?
1238 .bind("id", id.to_string())
1239 .execute(&*conn)
1240 .await?;
1241
1242 Ok(())
1243 }
1244
1245 #[instrument(skip(self))]
1246 async fn remove_transaction(
1247 &self,
1248 transaction_id: TransactionId,
1249 ) -> Result<(), database::Error> {
1250 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1251
1252 query(r#"DELETE FROM transactions WHERE id=:id"#)?
1253 .bind("id", transaction_id.as_slice().to_vec())
1254 .execute(&*conn)
1255 .await?;
1256
1257 Ok(())
1258 }
1259
1260 #[instrument(skip(self))]
1261 async fn add_saga(&self, saga: wallet::WalletSaga) -> Result<(), database::Error> {
1262 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1263
1264 let state_json = serde_json::to_string(&saga.state).map_err(|e| {
1265 Error::Database(Box::new(std::io::Error::new(
1266 std::io::ErrorKind::InvalidData,
1267 format!("Failed to serialize saga state: {}", e),
1268 )))
1269 })?;
1270
1271 let data_json = serde_json::to_string(&saga.data).map_err(|e| {
1272 Error::Database(Box::new(std::io::Error::new(
1273 std::io::ErrorKind::InvalidData,
1274 format!("Failed to serialize saga data: {}", e),
1275 )))
1276 })?;
1277
1278 query(
1279 r#"
1280 INSERT INTO wallet_sagas
1281 (id, kind, state, amount, mint_url, unit, quote_id, created_at, updated_at, data, version)
1282 VALUES
1283 (:id, :kind, :state, :amount, :mint_url, :unit, :quote_id, :created_at, :updated_at, :data, :version)
1284 "#,
1285 )?
1286 .bind("id", saga.id.to_string())
1287 .bind("kind", saga.kind.to_string())
1288 .bind("state", state_json)
1289 .bind("amount", u64::from(saga.amount) as i64)
1290 .bind("mint_url", saga.mint_url.to_string())
1291 .bind("unit", saga.unit.to_string())
1292 .bind("quote_id", saga.quote_id)
1293 .bind("created_at", saga.created_at as i64)
1294 .bind("updated_at", saga.updated_at as i64)
1295 .bind("data", data_json)
1296 .bind("version", saga.version as i64)
1297 .execute(&*conn)
1298 .await?;
1299
1300 Ok(())
1301 }
1302
1303 #[instrument(skip(self))]
1304 async fn get_saga(
1305 &self,
1306 id: &uuid::Uuid,
1307 ) -> Result<Option<wallet::WalletSaga>, database::Error> {
1308 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1309
1310 let rows = query(
1311 r#"
1312 SELECT id, kind, state, amount, mint_url, unit, quote_id, created_at, updated_at, data, version
1313 FROM wallet_sagas
1314 WHERE id = :id
1315 "#,
1316 )?
1317 .bind("id", id.to_string())
1318 .fetch_all(&*conn)
1319 .await?;
1320
1321 match rows.into_iter().next() {
1322 Some(row) => Ok(Some(sql_row_to_wallet_saga(row)?)),
1323 None => Ok(None),
1324 }
1325 }
1326
1327 #[instrument(skip(self))]
1328 async fn update_saga(&self, saga: wallet::WalletSaga) -> Result<bool, database::Error> {
1329 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1330
1331 let state_json = serde_json::to_string(&saga.state).map_err(|e| {
1332 Error::Database(Box::new(std::io::Error::new(
1333 std::io::ErrorKind::InvalidData,
1334 format!("Failed to serialize saga state: {}", e),
1335 )))
1336 })?;
1337
1338 let data_json = serde_json::to_string(&saga.data).map_err(|e| {
1339 Error::Database(Box::new(std::io::Error::new(
1340 std::io::ErrorKind::InvalidData,
1341 format!("Failed to serialize saga data: {}", e),
1342 )))
1343 })?;
1344
1345 let expected_version = saga.version.saturating_sub(1);
1349
1350 let rows_affected = query(
1351 r#"
1352 UPDATE wallet_sagas
1353 SET kind = :kind, state = :state, amount = :amount, mint_url = :mint_url,
1354 unit = :unit, quote_id = :quote_id, updated_at = :updated_at, data = :data,
1355 version = :new_version
1356 WHERE id = :id AND version = :expected_version
1357 "#,
1358 )?
1359 .bind("id", saga.id.to_string())
1360 .bind("kind", saga.kind.to_string())
1361 .bind("state", state_json)
1362 .bind("amount", u64::from(saga.amount) as i64)
1363 .bind("mint_url", saga.mint_url.to_string())
1364 .bind("unit", saga.unit.to_string())
1365 .bind("quote_id", saga.quote_id)
1366 .bind("updated_at", saga.updated_at as i64)
1367 .bind("data", data_json)
1368 .bind("new_version", saga.version as i64)
1369 .bind("expected_version", expected_version as i64)
1370 .execute(&*conn)
1371 .await?;
1372
1373 Ok(rows_affected > 0)
1375 }
1376
1377 #[instrument(skip(self))]
1378 async fn delete_saga(&self, id: &uuid::Uuid) -> Result<(), database::Error> {
1379 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1380
1381 query(r#"DELETE FROM wallet_sagas WHERE id = :id"#)?
1382 .bind("id", id.to_string())
1383 .execute(&*conn)
1384 .await?;
1385
1386 Ok(())
1387 }
1388
1389 #[instrument(skip(self))]
1390 async fn get_incomplete_sagas(&self) -> Result<Vec<wallet::WalletSaga>, database::Error> {
1391 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1392
1393 let rows = query(
1394 r#"
1395 SELECT id, kind, state, amount, mint_url, unit, quote_id, created_at, updated_at, data, version
1396 FROM wallet_sagas
1397 ORDER BY created_at ASC
1398 "#,
1399 )?
1400 .fetch_all(&*conn)
1401 .await?;
1402
1403 rows.into_iter().map(sql_row_to_wallet_saga).collect()
1404 }
1405
1406 #[instrument(skip(self))]
1407 async fn reserve_proofs(
1408 &self,
1409 ys: Vec<PublicKey>,
1410 operation_id: &uuid::Uuid,
1411 ) -> Result<(), database::Error> {
1412 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1413
1414 for y in ys {
1415 let rows_affected = query(
1416 r#"
1417 UPDATE proof
1418 SET state = 'RESERVED', used_by_operation = :operation_id
1419 WHERE y = :y AND state = 'UNSPENT'
1420 "#,
1421 )?
1422 .bind("y", y.to_bytes().to_vec())
1423 .bind("operation_id", operation_id.to_string())
1424 .execute(&*conn)
1425 .await?;
1426
1427 if rows_affected == 0 {
1428 return Err(database::Error::ProofNotUnspent);
1429 }
1430 }
1431
1432 Ok(())
1433 }
1434
1435 #[instrument(skip(self))]
1436 async fn release_proofs(&self, operation_id: &uuid::Uuid) -> Result<(), database::Error> {
1437 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1438
1439 query(
1440 r#"
1441 UPDATE proof
1442 SET state = 'UNSPENT', used_by_operation = NULL
1443 WHERE used_by_operation = :operation_id
1444 "#,
1445 )?
1446 .bind("operation_id", operation_id.to_string())
1447 .execute(&*conn)
1448 .await?;
1449
1450 Ok(())
1451 }
1452
1453 #[instrument(skip(self))]
1454 async fn get_reserved_proofs(
1455 &self,
1456 operation_id: &uuid::Uuid,
1457 ) -> Result<Vec<ProofInfo>, database::Error> {
1458 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1459
1460 let rows = query(
1461 r#"
1462 SELECT
1463 amount,
1464 unit,
1465 keyset_id,
1466 secret,
1467 c,
1468 witness,
1469 dleq_e,
1470 dleq_s,
1471 dleq_r,
1472 y,
1473 mint_url,
1474 state,
1475 spending_condition,
1476 used_by_operation,
1477 created_by_operation,
1478 p2pk_e
1479 FROM proof
1480 WHERE used_by_operation = :operation_id
1481 "#,
1482 )?
1483 .bind("operation_id", operation_id.to_string())
1484 .fetch_all(&*conn)
1485 .await?;
1486
1487 rows.into_iter().map(sql_row_to_proof_info).collect()
1488 }
1489
1490 #[instrument(skip(self))]
1491 async fn reserve_melt_quote(
1492 &self,
1493 quote_id: &str,
1494 operation_id: &uuid::Uuid,
1495 ) -> Result<(), database::Error> {
1496 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1497
1498 let rows_affected = query(
1499 r#"
1500 UPDATE melt_quote
1501 SET used_by_operation = :operation_id
1502 WHERE id = :quote_id AND used_by_operation IS NULL
1503 "#,
1504 )?
1505 .bind("operation_id", operation_id.to_string())
1506 .bind("quote_id", quote_id)
1507 .execute(&*conn)
1508 .await?;
1509
1510 if rows_affected == 0 {
1511 let exists = query(
1513 r#"
1514 SELECT 1 FROM melt_quote WHERE id = :quote_id
1515 "#,
1516 )?
1517 .bind("quote_id", quote_id)
1518 .fetch_one(&*conn)
1519 .await?;
1520
1521 if exists.is_none() {
1522 return Err(database::Error::UnknownQuote);
1523 }
1524 return Err(database::Error::QuoteAlreadyInUse);
1525 }
1526
1527 Ok(())
1528 }
1529
1530 #[instrument(skip(self))]
1531 async fn release_melt_quote(&self, operation_id: &uuid::Uuid) -> Result<(), database::Error> {
1532 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1533
1534 query(
1535 r#"
1536 UPDATE melt_quote
1537 SET used_by_operation = NULL
1538 WHERE used_by_operation = :operation_id
1539 "#,
1540 )?
1541 .bind("operation_id", operation_id.to_string())
1542 .execute(&*conn)
1543 .await?;
1544
1545 Ok(())
1546 }
1547
1548 #[instrument(skip(self))]
1549 async fn reserve_mint_quote(
1550 &self,
1551 quote_id: &str,
1552 operation_id: &uuid::Uuid,
1553 ) -> Result<(), database::Error> {
1554 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1555
1556 let rows_affected = query(
1557 r#"
1558 UPDATE mint_quote
1559 SET used_by_operation = :operation_id
1560 WHERE id = :quote_id AND used_by_operation IS NULL
1561 "#,
1562 )?
1563 .bind("operation_id", operation_id.to_string())
1564 .bind("quote_id", quote_id)
1565 .execute(&*conn)
1566 .await?;
1567
1568 if rows_affected == 0 {
1569 let exists = query(
1571 r#"
1572 SELECT 1 FROM mint_quote WHERE id = :quote_id
1573 "#,
1574 )?
1575 .bind("quote_id", quote_id)
1576 .fetch_one(&*conn)
1577 .await?;
1578
1579 if exists.is_none() {
1580 return Err(database::Error::UnknownQuote);
1581 }
1582 return Err(database::Error::QuoteAlreadyInUse);
1583 }
1584
1585 Ok(())
1586 }
1587
1588 #[instrument(skip(self))]
1589 async fn release_mint_quote(&self, operation_id: &uuid::Uuid) -> Result<(), database::Error> {
1590 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1591
1592 query(
1593 r#"
1594 UPDATE mint_quote
1595 SET used_by_operation = NULL
1596 WHERE used_by_operation = :operation_id
1597 "#,
1598 )?
1599 .bind("operation_id", operation_id.to_string())
1600 .execute(&*conn)
1601 .await?;
1602
1603 Ok(())
1604 }
1605
1606 async fn kv_read(
1607 &self,
1608 primary_namespace: &str,
1609 secondary_namespace: &str,
1610 key: &str,
1611 ) -> Result<Option<Vec<u8>>, database::Error> {
1612 crate::keyvalue::kv_read(&self.pool, primary_namespace, secondary_namespace, key).await
1613 }
1614
1615 async fn kv_list(
1616 &self,
1617 primary_namespace: &str,
1618 secondary_namespace: &str,
1619 ) -> Result<Vec<String>, database::Error> {
1620 crate::keyvalue::kv_list(&self.pool, primary_namespace, secondary_namespace).await
1621 }
1622
1623 async fn kv_write(
1624 &self,
1625 primary_namespace: &str,
1626 secondary_namespace: &str,
1627 key: &str,
1628 value: &[u8],
1629 ) -> Result<(), database::Error> {
1630 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1631 crate::keyvalue::kv_write_standalone(
1632 &*conn,
1633 primary_namespace,
1634 secondary_namespace,
1635 key,
1636 value,
1637 )
1638 .await?;
1639 Ok(())
1640 }
1641
1642 async fn kv_remove(
1643 &self,
1644 primary_namespace: &str,
1645 secondary_namespace: &str,
1646 key: &str,
1647 ) -> Result<(), database::Error> {
1648 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1649 crate::keyvalue::kv_remove_standalone(&*conn, primary_namespace, secondary_namespace, key)
1650 .await?;
1651 Ok(())
1652 }
1653
1654 #[instrument(skip(self))]
1657 async fn add_p2pk_key(
1658 &self,
1659 pubkey: &PublicKey,
1660 derivation_path: DerivationPath,
1661 derivation_index: u32,
1662 ) -> Result<(), Error> {
1663 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1664 let query_str = r#"
1665 INSERT INTO p2pk_signing_key (pubkey, derivation_index, derivation_path, created_time)
1666 VALUES (:pubkey, :derivation_index, :derivation_path, :created_time)
1667 "#
1668 .to_string();
1669
1670 query(&query_str)?
1671 .bind("pubkey", pubkey.to_bytes().to_vec())
1672 .bind("derivation_index", derivation_index)
1673 .bind("derivation_path", derivation_path.to_string())
1674 .bind("created_time", unix_time() as i64)
1675 .execute(&*conn)
1676 .await?;
1677
1678 Ok(())
1679 }
1680
1681 #[instrument(skip(self))]
1682 async fn get_p2pk_key(
1683 &self,
1684 pubkey: &PublicKey,
1685 ) -> Result<Option<wallet::P2PKSigningKey>, Error> {
1686 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1687 let query_str = r#"SELECT pubkey, derivation_index, derivation_path, created_time FROM p2pk_signing_key WHERE pubkey = :pubkey"#.to_string();
1688
1689 query(&query_str)?
1690 .bind("pubkey", pubkey.to_bytes().to_vec())
1691 .fetch_one(&*conn)
1692 .await?
1693 .map(sql_row_to_p2pk_signing_key)
1694 .transpose()
1695 }
1696
1697 #[instrument(skip(self))]
1698 async fn list_p2pk_keys(&self) -> Result<Vec<wallet::P2PKSigningKey>, Error> {
1699 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1700 let query_str = r#"
1701 SELECT pubkey, derivation_index, derivation_path, created_time FROM p2pk_signing_key ORDER BY derivation_index DESC
1702 "#.to_string();
1703
1704 Ok(query(&query_str)?
1705 .fetch_all(&*conn)
1706 .await?
1707 .into_iter()
1708 .filter_map(|row| {
1709 let row = sql_row_to_p2pk_signing_key(row).ok()?;
1710
1711 Some(row)
1712 })
1713 .collect::<Vec<wallet::P2PKSigningKey>>())
1714 }
1715
1716 #[instrument(skip(self))]
1717 async fn latest_p2pk(&self) -> Result<Option<wallet::P2PKSigningKey>, Error> {
1718 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1719 let query_str = r#"
1720 SELECT pubkey, derivation_index, derivation_path, created_time FROM p2pk_signing_key ORDER BY derivation_index DESC LIMIT 1
1721 "#.to_string();
1722
1723 query(&query_str)?
1724 .fetch_one(&*conn)
1725 .await?
1726 .map(sql_row_to_p2pk_signing_key)
1727 .transpose()
1728 }
1729}
1730
1731fn sql_row_to_mint_info(row: Vec<Column>) -> Result<MintInfo, Error> {
1732 unpack_into!(
1733 let (
1734 name,
1735 pubkey,
1736 version,
1737 description,
1738 description_long,
1739 contact,
1740 nuts,
1741 icon_url,
1742 motd,
1743 urls,
1744 mint_time,
1745 tos_url
1746 ) = row
1747 );
1748
1749 Ok(MintInfo {
1750 name: column_as_nullable_string!(&name),
1751 pubkey: column_as_nullable_string!(&pubkey, |v| serde_json::from_str(v).ok(), |v| {
1752 serde_json::from_slice(v).ok()
1753 }),
1754 version: column_as_nullable_string!(&version).and_then(|v| serde_json::from_str(&v).ok()),
1755 description: column_as_nullable_string!(description),
1756 description_long: column_as_nullable_string!(description_long),
1757 contact: column_as_nullable_string!(contact, |v| serde_json::from_str(&v).ok()),
1758 nuts: column_as_nullable_string!(nuts, |v| serde_json::from_str(&v).ok())
1759 .unwrap_or_default(),
1760 urls: column_as_nullable_string!(urls, |v| serde_json::from_str(&v).ok()),
1761 icon_url: column_as_nullable_string!(icon_url),
1762 motd: column_as_nullable_string!(motd),
1763 time: column_as_nullable_number!(mint_time).map(|t| t),
1764 tos_url: column_as_nullable_string!(tos_url),
1765 })
1766}
1767
1768#[instrument(skip_all)]
1769fn sql_row_to_keyset(row: Vec<Column>) -> Result<KeySetInfo, Error> {
1770 unpack_into!(
1771 let (
1772 id,
1773 unit,
1774 active,
1775 input_fee_ppk,
1776 final_expiry
1777 ) = row
1778 );
1779
1780 Ok(KeySetInfo {
1781 id: column_as_string!(id, Id::from_str, Id::from_bytes),
1782 unit: column_as_string!(unit, CurrencyUnit::from_str),
1783 active: matches!(active, Column::Integer(1)),
1784 input_fee_ppk: column_as_nullable_number!(input_fee_ppk).unwrap_or(0),
1785 final_expiry: column_as_nullable_number!(final_expiry),
1786 })
1787}
1788
1789fn sql_row_to_mint_quote(row: Vec<Column>) -> Result<MintQuote, Error> {
1790 unpack_into!(
1791 let (
1792 id,
1793 mint_url,
1794 amount,
1795 unit,
1796 request,
1797 state,
1798 expiry,
1799 secret_key,
1800 row_method,
1801 row_amount_minted,
1802 row_amount_paid,
1803 used_by_operation,
1804 version
1805 ) = row
1806 );
1807
1808 let amount: Option<i64> = column_as_nullable_number!(amount);
1809
1810 let amount_paid: u64 = column_as_number!(row_amount_paid);
1811 let amount_minted: u64 = column_as_number!(row_amount_minted);
1812 let expiry_val: u64 = column_as_number!(expiry);
1813 let version_val: u32 = column_as_number!(version);
1814 let payment_method =
1815 PaymentMethod::from_str(&column_as_string!(row_method)).map_err(Error::from)?;
1816
1817 Ok(MintQuote {
1818 id: column_as_string!(id),
1819 mint_url: column_as_string!(mint_url, MintUrl::from_str),
1820 amount: amount.and_then(Amount::from_i64),
1821 unit: column_as_string!(unit, CurrencyUnit::from_str),
1822 request: column_as_string!(request),
1823 state: column_as_string!(state, MintQuoteState::from_str),
1824 expiry: expiry_val,
1825 secret_key: column_as_nullable_string!(secret_key, |s| SecretKey::from_str(&s).ok()),
1826 payment_method,
1827 amount_issued: Amount::from(amount_minted),
1828 amount_paid: Amount::from(amount_paid),
1829 used_by_operation: column_as_nullable_string!(used_by_operation),
1830 version: version_val,
1831 })
1832}
1833
1834fn sql_row_to_melt_quote(row: Vec<Column>) -> Result<wallet::MeltQuote, Error> {
1835 unpack_into!(
1836 let (
1837 id,
1838 unit,
1839 amount,
1840 request,
1841 fee_reserve,
1842 state,
1843 expiry,
1844 payment_preimage,
1845 row_method,
1846 used_by_operation,
1847 version,
1848 mint_url
1849 ) = row
1850 );
1851
1852 let payment_method =
1853 PaymentMethod::from_str(&column_as_string!(row_method)).map_err(Error::from)?;
1854
1855 let amount_val: u64 = column_as_number!(amount);
1856 let fee_reserve_val: u64 = column_as_number!(fee_reserve);
1857 let expiry_val: u64 = column_as_number!(expiry);
1858 let version_val: u32 = column_as_number!(version);
1859
1860 Ok(wallet::MeltQuote {
1861 id: column_as_string!(id),
1862 mint_url: column_as_nullable_string!(mint_url, |s| MintUrl::from_str(&s).ok()),
1863 unit: column_as_string!(unit, CurrencyUnit::from_str),
1864 amount: Amount::from(amount_val),
1865 request: column_as_string!(request),
1866 fee_reserve: Amount::from(fee_reserve_val),
1867 state: column_as_string!(state, MeltQuoteState::from_str),
1868 expiry: expiry_val,
1869 payment_preimage: column_as_nullable_string!(payment_preimage),
1870 payment_method,
1871 used_by_operation: column_as_nullable_string!(used_by_operation),
1872 version: version_val,
1873 })
1874}
1875
1876fn sql_row_to_proof_info(row: Vec<Column>) -> Result<ProofInfo, Error> {
1877 unpack_into!(
1878 let (
1879 amount,
1880 unit,
1881 keyset_id,
1882 secret,
1883 c,
1884 witness,
1885 dleq_e,
1886 dleq_s,
1887 dleq_r,
1888 y,
1889 mint_url,
1890 state,
1891 spending_condition,
1892 used_by_operation,
1893 created_by_operation,
1894 p2pk_e
1895 ) = row
1896 );
1897
1898 let dleq = match (
1899 column_as_nullable_binary!(dleq_e),
1900 column_as_nullable_binary!(dleq_s),
1901 column_as_nullable_binary!(dleq_r),
1902 ) {
1903 (Some(e), Some(s), Some(r)) => {
1904 let e_key = SecretKey::from_slice(&e)?;
1905 let s_key = SecretKey::from_slice(&s)?;
1906 let r_key = SecretKey::from_slice(&r)?;
1907
1908 Some(ProofDleq::new(e_key, s_key, r_key))
1909 }
1910 _ => None,
1911 };
1912
1913 let amount: u64 = column_as_number!(amount);
1914 let proof = Proof {
1915 amount: Amount::from(amount),
1916 keyset_id: column_as_string!(keyset_id, Id::from_str),
1917 secret: column_as_string!(secret, Secret::from_str),
1918 witness: column_as_nullable_string!(witness, |v| { serde_json::from_str(&v).ok() }, |v| {
1919 serde_json::from_slice(&v).ok()
1920 }),
1921 c: column_as_string!(c, PublicKey::from_str, PublicKey::from_slice),
1922 dleq,
1923 p2pk_e: column_as_nullable_binary!(p2pk_e)
1924 .map(|bytes| PublicKey::from_slice(&bytes))
1925 .transpose()?,
1926 };
1927
1928 let used_by_operation =
1929 column_as_nullable_string!(used_by_operation).and_then(|id| Uuid::from_str(&id).ok());
1930 let created_by_operation =
1931 column_as_nullable_string!(created_by_operation).and_then(|id| Uuid::from_str(&id).ok());
1932
1933 Ok(ProofInfo {
1934 proof,
1935 y: column_as_string!(y, PublicKey::from_str, PublicKey::from_slice),
1936 mint_url: column_as_string!(mint_url, MintUrl::from_str),
1937 state: column_as_string!(state, State::from_str),
1938 spending_condition: column_as_nullable_string!(
1939 spending_condition,
1940 |r| { serde_json::from_str(&r).ok() },
1941 |r| { serde_json::from_slice(&r).ok() }
1942 ),
1943 unit: column_as_string!(unit, CurrencyUnit::from_str),
1944 used_by_operation,
1945 created_by_operation,
1946 })
1947}
1948
1949fn sql_row_to_wallet_saga(row: Vec<Column>) -> Result<wallet::WalletSaga, Error> {
1950 unpack_into!(
1951 let (
1952 id,
1953 kind,
1954 state,
1955 amount,
1956 mint_url,
1957 unit,
1958 quote_id,
1959 created_at,
1960 updated_at,
1961 data,
1962 version
1963 ) = row
1964 );
1965
1966 let id_str: String = column_as_string!(id);
1967 let id = uuid::Uuid::parse_str(&id_str).map_err(|e| {
1968 Error::Database(Box::new(std::io::Error::new(
1969 std::io::ErrorKind::InvalidData,
1970 format!("Invalid UUID: {}", e),
1971 )))
1972 })?;
1973 let kind_str: String = column_as_string!(kind);
1974 let state_json: String = column_as_string!(state);
1975 let amount: u64 = column_as_number!(amount);
1976 let mint_url: MintUrl = column_as_string!(mint_url, MintUrl::from_str);
1977 let unit: CurrencyUnit = column_as_string!(unit, CurrencyUnit::from_str);
1978 let quote_id: Option<String> = column_as_nullable_string!(quote_id);
1979 let created_at: u64 = column_as_number!(created_at);
1980 let updated_at: u64 = column_as_number!(updated_at);
1981 let data_json: String = column_as_string!(data);
1982 let version: u32 = column_as_number!(version);
1983
1984 let kind = wallet::OperationKind::from_str(&kind_str).map_err(|_| {
1985 Error::Database(Box::new(std::io::Error::new(
1986 std::io::ErrorKind::InvalidData,
1987 format!("Invalid operation kind: {}", kind_str),
1988 )))
1989 })?;
1990 let state: wallet::WalletSagaState = serde_json::from_str(&state_json).map_err(|e| {
1991 Error::Database(Box::new(std::io::Error::new(
1992 std::io::ErrorKind::InvalidData,
1993 format!("Failed to deserialize saga state: {}", e),
1994 )))
1995 })?;
1996 let data: wallet::OperationData = serde_json::from_str(&data_json).map_err(|e| {
1997 Error::Database(Box::new(std::io::Error::new(
1998 std::io::ErrorKind::InvalidData,
1999 format!("Failed to deserialize saga data: {}", e),
2000 )))
2001 })?;
2002
2003 Ok(wallet::WalletSaga {
2004 id,
2005 kind,
2006 state,
2007 amount: Amount::from(amount),
2008 mint_url,
2009 unit,
2010 quote_id,
2011 created_at,
2012 updated_at,
2013 data,
2014 version,
2015 })
2016}
2017
2018fn sql_row_to_transaction(row: Vec<Column>) -> Result<Transaction, Error> {
2019 unpack_into!(
2020 let (
2021 mint_url,
2022 direction,
2023 unit,
2024 amount,
2025 fee,
2026 ys,
2027 timestamp,
2028 memo,
2029 metadata,
2030 quote_id,
2031 payment_request,
2032 payment_proof,
2033 payment_method,
2034 saga_id
2035 ) = row
2036 );
2037
2038 let amount: u64 = column_as_number!(amount);
2039 let fee: u64 = column_as_number!(fee);
2040
2041 let saga_id: Option<Uuid> = column_as_nullable_string!(saga_id)
2042 .map(|id| Uuid::from_str(&id).ok())
2043 .flatten();
2044
2045 Ok(Transaction {
2046 mint_url: column_as_string!(mint_url, MintUrl::from_str),
2047 direction: column_as_string!(direction, TransactionDirection::from_str),
2048 unit: column_as_string!(unit, CurrencyUnit::from_str),
2049 amount: Amount::from(amount),
2050 fee: Amount::from(fee),
2051 ys: column_as_binary!(ys)
2052 .chunks(33)
2053 .map(PublicKey::from_slice)
2054 .collect::<Result<Vec<_>, _>>()?,
2055 timestamp: column_as_number!(timestamp),
2056 memo: column_as_nullable_string!(memo),
2057 metadata: column_as_nullable_string!(metadata, |v| serde_json::from_str(&v).ok(), |v| {
2058 serde_json::from_slice(&v).ok()
2059 })
2060 .unwrap_or_default(),
2061 quote_id: column_as_nullable_string!(quote_id),
2062 payment_request: column_as_nullable_string!(payment_request),
2063 payment_proof: column_as_nullable_string!(payment_proof),
2064 payment_method: column_as_nullable_string!(payment_method)
2065 .map(|v| PaymentMethod::from_str(&v))
2066 .transpose()
2067 .map_err(Error::from)?,
2068 saga_id,
2069 })
2070}
2071
2072fn sql_row_to_p2pk_signing_key(row: Vec<Column>) -> Result<wallet::P2PKSigningKey, Error> {
2073 unpack_into!(
2074 let (
2075 pubkey,
2076 derivation_index,
2077 derivation_path,
2078 created_time
2079 ) = row
2080 );
2081
2082 Ok(wallet::P2PKSigningKey {
2083 pubkey: column_as_string!(pubkey, PublicKey::from_str, PublicKey::from_slice),
2084 derivation_index: column_as_number!(derivation_index),
2085 derivation_path: column_as_string!(derivation_path, DerivationPath::from_str),
2086 created_time: column_as_number!(created_time),
2087 })
2088}