1use std::collections::HashMap;
4use std::fmt::Debug;
5use std::str::FromStr;
6use std::sync::Arc;
7
8use async_trait::async_trait;
9use bitcoin::bip32::DerivationPath;
10use cdk_common::database::{ConversionError, Error, WalletDatabase};
11use cdk_common::mint_url::MintUrl;
12use cdk_common::nuts::{MeltQuoteState, MintQuoteState};
13use cdk_common::secret::Secret;
14use cdk_common::util::unix_time;
15use cdk_common::wallet::{
16 self, MintQuote, ProofInfo, Transaction, TransactionDirection, TransactionId,
17};
18use cdk_common::{
19 database, Amount, CurrencyUnit, Id, KeySet, KeySetInfo, Keys, MintInfo, PaymentMethod, Proof,
20 ProofDleq, PublicKey, SecretKey, SpendingConditions, State,
21};
22use tracing::instrument;
23use uuid::Uuid;
24
25use crate::common::migrate;
26use crate::database::{ConnectionWithTransaction, DatabaseExecutor};
27use crate::pool::{DatabasePool, Pool, PooledResource};
28use crate::stmt::{query, Column};
29use crate::{
30 column_as_binary, column_as_nullable_binary, column_as_nullable_number,
31 column_as_nullable_string, column_as_number, column_as_string, unpack_into,
32};
33
34#[rustfmt::skip]
35mod migrations {
36 include!(concat!(env!("OUT_DIR"), "/migrations_wallet.rs"));
37}
38
39#[derive(Debug, Clone)]
41pub struct SQLWalletDatabase<RM>
42where
43 RM: DatabasePool + 'static,
44{
45 pool: Arc<Pool<RM>>,
46}
47
48impl<RM> SQLWalletDatabase<RM>
49where
50 RM: DatabasePool + 'static,
51{
52 pub async fn new<X>(db: X) -> Result<Self, Error>
54 where
55 X: Into<RM::Config>,
56 {
57 let pool = Pool::new(db.into());
58 Self::migrate(pool.get().await.map_err(|e| Error::Database(Box::new(e)))?).await?;
59
60 Ok(Self { pool })
61 }
62
63 async fn migrate(conn: PooledResource<RM>) -> Result<(), Error> {
65 let tx = ConnectionWithTransaction::new(conn).await?;
66 migrate(&tx, RM::Connection::name(), migrations::MIGRATIONS).await?;
67 Self::add_keyset_u32(&tx).await?;
69 tx.commit().await?;
70
71 Ok(())
72 }
73
74 async fn add_keyset_u32<T>(conn: &T) -> Result<(), Error>
75 where
76 T: DatabaseExecutor,
77 {
78 let keys_without_u32: Vec<Vec<Column>> = query(
80 r#"
81 SELECT
82 id
83 FROM key
84 WHERE keyset_u32 IS NULL
85 "#,
86 )?
87 .fetch_all(conn)
88 .await?;
89
90 for row in keys_without_u32 {
91 unpack_into!(let (id) = row);
92 let id = column_as_string!(id);
93
94 if let Ok(id) = Id::from_str(&id) {
95 query(
96 r#"
97 UPDATE
98 key
99 SET keyset_u32 = :u32_keyset
100 WHERE id = :keyset_id
101 "#,
102 )?
103 .bind("u32_keyset", u32::from(id))
104 .bind("keyset_id", id.to_string())
105 .execute(conn)
106 .await?;
107 }
108 }
109
110 let keysets_without_u32: Vec<Vec<Column>> = query(
112 r#"
113 SELECT
114 id
115 FROM keyset
116 WHERE keyset_u32 IS NULL
117 "#,
118 )?
119 .fetch_all(conn)
120 .await?;
121
122 for row in keysets_without_u32 {
123 unpack_into!(let (id) = row);
124 let id = column_as_string!(id);
125
126 if let Ok(id) = Id::from_str(&id) {
127 query(
128 r#"
129 UPDATE
130 keyset
131 SET keyset_u32 = :u32_keyset
132 WHERE id = :keyset_id
133 "#,
134 )?
135 .bind("u32_keyset", u32::from(id))
136 .bind("keyset_id", id.to_string())
137 .execute(conn)
138 .await?;
139 }
140 }
141
142 Ok(())
143 }
144}
145
146#[async_trait]
147impl<RM> WalletDatabase<database::Error> for SQLWalletDatabase<RM>
148where
149 RM: DatabasePool + 'static,
150{
151 #[instrument(skip(self))]
152 async fn get_melt_quotes(&self) -> Result<Vec<wallet::MeltQuote>, database::Error> {
153 let conn = self
154 .pool
155 .get()
156 .await
157 .map_err(|e| Error::Database(Box::new(e)))?;
158
159 Ok(query(
160 r#"
161 SELECT
162 id,
163 unit,
164 amount,
165 request,
166 fee_reserve,
167 state,
168 expiry,
169 payment_proof,
170 payment_method,
171 estimated_blocks,
172 fee_index,
173 used_by_operation,
174 version,
175 mint_url
176 FROM
177 melt_quote
178 "#,
179 )?
180 .fetch_all(&*conn)
181 .await?
182 .into_iter()
183 .map(sql_row_to_melt_quote)
184 .collect::<Result<_, _>>()?)
185 }
186
187 #[instrument(skip(self))]
188 async fn get_mint(&self, mint_url: MintUrl) -> Result<Option<MintInfo>, database::Error> {
189 let conn = self
190 .pool
191 .get()
192 .await
193 .map_err(|e| Error::Database(Box::new(e)))?;
194 Ok(query(
195 r#"
196 SELECT
197 name,
198 pubkey,
199 version,
200 description,
201 description_long,
202 contact,
203 nuts,
204 icon_url,
205 motd,
206 urls,
207 mint_time,
208 tos_url
209 FROM
210 mint
211 WHERE mint_url = :mint_url
212 "#,
213 )?
214 .bind("mint_url", mint_url.to_string())
215 .fetch_one(&*conn)
216 .await?
217 .map(sql_row_to_mint_info)
218 .transpose()?)
219 }
220
221 #[instrument(skip(self))]
222 async fn get_mints(&self) -> Result<HashMap<MintUrl, Option<MintInfo>>, database::Error> {
223 let conn = self
224 .pool
225 .get()
226 .await
227 .map_err(|e| Error::Database(Box::new(e)))?;
228 Ok(query(
229 r#"
230 SELECT
231 name,
232 pubkey,
233 version,
234 description,
235 description_long,
236 contact,
237 nuts,
238 icon_url,
239 motd,
240 urls,
241 mint_time,
242 tos_url,
243 mint_url
244 FROM
245 mint
246 "#,
247 )?
248 .fetch_all(&*conn)
249 .await?
250 .into_iter()
251 .map(|mut row| {
252 let url = column_as_string!(
253 row.pop().ok_or(ConversionError::MissingColumn(0, 1))?,
254 MintUrl::from_str
255 );
256
257 Ok((url, sql_row_to_mint_info(row).ok()))
258 })
259 .collect::<Result<HashMap<_, _>, Error>>()?)
260 }
261
262 #[instrument(skip(self))]
263 async fn get_mint_keysets(
264 &self,
265 mint_url: MintUrl,
266 ) -> Result<Option<Vec<KeySetInfo>>, database::Error> {
267 let conn = self
268 .pool
269 .get()
270 .await
271 .map_err(|e| Error::Database(Box::new(e)))?;
272
273 let keysets = query(
274 r#"
275 SELECT
276 id,
277 unit,
278 active,
279 input_fee_ppk,
280 final_expiry
281 FROM
282 keyset
283 WHERE mint_url = :mint_url
284 "#,
285 )?
286 .bind("mint_url", mint_url.to_string())
287 .fetch_all(&*conn)
288 .await?
289 .into_iter()
290 .map(sql_row_to_keyset)
291 .collect::<Result<Vec<_>, Error>>()?;
292
293 match keysets.is_empty() {
294 false => Ok(Some(keysets)),
295 true => Ok(None),
296 }
297 }
298
299 #[instrument(skip(self), fields(keyset_id = %keyset_id))]
300 async fn get_keyset_by_id(
301 &self,
302 keyset_id: &Id,
303 ) -> Result<Option<KeySetInfo>, database::Error> {
304 let conn = self
305 .pool
306 .get()
307 .await
308 .map_err(|e| Error::Database(Box::new(e)))?;
309 query(
310 r#"
311 SELECT
312 id,
313 unit,
314 active,
315 input_fee_ppk,
316 final_expiry
317 FROM
318 keyset
319 WHERE id = :id
320 "#,
321 )?
322 .bind("id", keyset_id.to_string())
323 .fetch_one(&*conn)
324 .await?
325 .map(sql_row_to_keyset)
326 .transpose()
327 }
328
329 #[instrument(skip(self))]
330 async fn get_mint_quote(&self, quote_id: &str) -> Result<Option<MintQuote>, database::Error> {
331 let conn = self
332 .pool
333 .get()
334 .await
335 .map_err(|e| Error::Database(Box::new(e)))?;
336 query(
337 r#"
338 SELECT
339 id,
340 mint_url,
341 amount,
342 unit,
343 request,
344 state,
345 expiry,
346 secret_key,
347 payment_method,
348 amount_issued,
349 amount_paid,
350 estimated_blocks,
351 used_by_operation,
352 version
353 FROM
354 mint_quote
355 WHERE
356 id = :id
357 "#,
358 )?
359 .bind("id", quote_id.to_string())
360 .fetch_one(&*conn)
361 .await?
362 .map(sql_row_to_mint_quote)
363 .transpose()
364 }
365
366 #[instrument(skip(self))]
367 async fn get_mint_quotes(&self) -> Result<Vec<MintQuote>, database::Error> {
368 let conn = self
369 .pool
370 .get()
371 .await
372 .map_err(|e| Error::Database(Box::new(e)))?;
373 Ok(query(
374 r#"
375 SELECT
376 id,
377 mint_url,
378 amount,
379 unit,
380 request,
381 state,
382 expiry,
383 secret_key,
384 payment_method,
385 amount_issued,
386 amount_paid,
387 estimated_blocks,
388 used_by_operation,
389 version
390 FROM
391 mint_quote
392 "#,
393 )?
394 .fetch_all(&*conn)
395 .await?
396 .into_iter()
397 .map(sql_row_to_mint_quote)
398 .collect::<Result<_, _>>()?)
399 }
400
401 #[instrument(skip(self))]
402 async fn get_unissued_mint_quotes(&self) -> Result<Vec<MintQuote>, database::Error> {
403 let conn = self
404 .pool
405 .get()
406 .await
407 .map_err(|e| Error::Database(Box::new(e)))?;
408 Ok(query(
409 r#"
410 SELECT
411 id,
412 mint_url,
413 amount,
414 unit,
415 request,
416 state,
417 expiry,
418 secret_key,
419 payment_method,
420 amount_issued,
421 amount_paid,
422 estimated_blocks,
423 used_by_operation,
424 version
425 FROM
426 mint_quote
427 WHERE
428 amount_issued = 0
429 OR
430 payment_method = 'bolt12'
431 "#,
432 )?
433 .fetch_all(&*conn)
434 .await?
435 .into_iter()
436 .map(sql_row_to_mint_quote)
437 .collect::<Result<_, _>>()?)
438 }
439
440 #[instrument(skip(self))]
441 async fn get_melt_quote(
442 &self,
443 quote_id: &str,
444 ) -> Result<Option<wallet::MeltQuote>, database::Error> {
445 let conn = self
446 .pool
447 .get()
448 .await
449 .map_err(|e| Error::Database(Box::new(e)))?;
450 query(
451 r#"
452 SELECT
453 id,
454 unit,
455 amount,
456 request,
457 fee_reserve,
458 state,
459 expiry,
460 payment_proof,
461 payment_method,
462 estimated_blocks,
463 fee_index,
464 used_by_operation,
465 version,
466 mint_url
467 FROM
468 melt_quote
469 WHERE
470 id=:id
471 "#,
472 )?
473 .bind("id", quote_id.to_owned())
474 .fetch_one(&*conn)
475 .await?
476 .map(sql_row_to_melt_quote)
477 .transpose()
478 }
479
480 #[instrument(skip(self), fields(keyset_id = %keyset_id))]
481 async fn get_keys(&self, keyset_id: &Id) -> Result<Option<Keys>, database::Error> {
482 let conn = self
483 .pool
484 .get()
485 .await
486 .map_err(|e| Error::Database(Box::new(e)))?;
487 query(
488 r#"
489 SELECT
490 keys
491 FROM key
492 WHERE id = :id
493 "#,
494 )?
495 .bind("id", keyset_id.to_string())
496 .pluck(&*conn)
497 .await?
498 .map(|keys| {
499 let keys = column_as_string!(keys);
500 serde_json::from_str(&keys).map_err(Error::from)
501 })
502 .transpose()
503 }
504
505 #[instrument(skip(self, state, spending_conditions))]
506 async fn get_proofs(
507 &self,
508 mint_url: Option<MintUrl>,
509 unit: Option<CurrencyUnit>,
510 state: Option<Vec<State>>,
511 spending_conditions: Option<Vec<SpendingConditions>>,
512 ) -> Result<Vec<ProofInfo>, database::Error> {
513 let conn = self
514 .pool
515 .get()
516 .await
517 .map_err(|e| Error::Database(Box::new(e)))?;
518 Ok(query(
519 r#"
520 SELECT
521 amount,
522 unit,
523 keyset_id,
524 secret,
525 c,
526 witness,
527 dleq_e,
528 dleq_s,
529 dleq_r,
530 y,
531 mint_url,
532 state,
533 spending_condition,
534 used_by_operation,
535 created_by_operation,
536 p2pk_e
537 FROM proof
538 "#,
539 )?
540 .fetch_all(&*conn)
541 .await?
542 .into_iter()
543 .filter_map(|row| {
544 let row = sql_row_to_proof_info(row).ok()?;
545
546 if row.matches_conditions(&mint_url, &unit, &state, &spending_conditions) {
547 Some(row)
548 } else {
549 None
550 }
551 })
552 .collect::<Vec<_>>())
553 }
554
555 #[instrument(skip(self, ys))]
556 async fn get_proofs_by_ys(
557 &self,
558 ys: Vec<PublicKey>,
559 ) -> Result<Vec<ProofInfo>, database::Error> {
560 let conn = self
561 .pool
562 .get()
563 .await
564 .map_err(|e| Error::Database(Box::new(e)))?;
565 Ok(query(
566 r#"
567 SELECT
568 amount,
569 unit,
570 keyset_id,
571 secret,
572 c,
573 witness,
574 dleq_e,
575 dleq_s,
576 dleq_r,
577 y,
578 mint_url,
579 state,
580 spending_condition,
581 used_by_operation,
582 created_by_operation,
583 p2pk_e
584 FROM proof
585 WHERE y IN (:ys)
586 "#,
587 )?
588 .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())?
589 .fetch_all(&*conn)
590 .await?
591 .into_iter()
592 .filter_map(|row| sql_row_to_proof_info(row).ok())
593 .collect::<Vec<_>>())
594 }
595
596 async fn get_balance(
597 &self,
598 mint_url: Option<MintUrl>,
599 unit: Option<CurrencyUnit>,
600 states: Option<Vec<State>>,
601 ) -> Result<u64, database::Error> {
602 let conn = self
603 .pool
604 .get()
605 .await
606 .map_err(|e| Error::Database(Box::new(e)))?;
607
608 let mut query_str = "SELECT COALESCE(SUM(amount), 0) as total FROM proof".to_string();
609 let mut where_clauses = Vec::new();
610 let states = states
611 .unwrap_or_default()
612 .into_iter()
613 .map(|x| x.to_string())
614 .collect::<Vec<_>>();
615
616 if mint_url.is_some() {
617 where_clauses.push("mint_url = :mint_url");
618 }
619 if unit.is_some() {
620 where_clauses.push("unit = :unit");
621 }
622 if !states.is_empty() {
623 where_clauses.push("state IN (:states)");
624 }
625
626 if !where_clauses.is_empty() {
627 query_str.push_str(" WHERE ");
628 query_str.push_str(&where_clauses.join(" AND "));
629 }
630
631 let mut q = query(&query_str)?;
632
633 if let Some(ref mint_url) = mint_url {
634 q = q.bind("mint_url", mint_url.to_string());
635 }
636 if let Some(ref unit) = unit {
637 q = q.bind("unit", unit.to_string());
638 }
639
640 if !states.is_empty() {
641 q = q.bind_vec("states", states)?;
642 }
643
644 let balance = q
645 .pluck(&*conn)
646 .await?
647 .map(|n| {
648 match n {
650 crate::stmt::Column::Integer(i) => Ok(i as u64),
651 crate::stmt::Column::Real(f) => Ok(f as u64),
652 _ => Err(Error::Database(Box::new(std::io::Error::new(
653 std::io::ErrorKind::InvalidData,
654 "Invalid balance type",
655 )))),
656 }
657 })
658 .transpose()?
659 .unwrap_or(0);
660
661 Ok(balance)
662 }
663
664 #[instrument(skip(self))]
665 async fn get_transaction(
666 &self,
667 transaction_id: TransactionId,
668 ) -> Result<Option<Transaction>, database::Error> {
669 let conn = self
670 .pool
671 .get()
672 .await
673 .map_err(|e| Error::Database(Box::new(e)))?;
674 Ok(query(
675 r#"
676 SELECT
677 mint_url,
678 direction,
679 unit,
680 amount,
681 fee,
682 ys,
683 timestamp,
684 memo,
685 metadata,
686 quote_id,
687 payment_request,
688 payment_proof,
689 payment_method,
690 saga_id
691 FROM
692 transactions
693 WHERE
694 id = :id
695 "#,
696 )?
697 .bind("id", transaction_id.as_slice().to_vec())
698 .fetch_one(&*conn)
699 .await?
700 .map(sql_row_to_transaction)
701 .transpose()?)
702 }
703
704 #[instrument(skip(self))]
705 async fn list_transactions(
706 &self,
707 mint_url: Option<MintUrl>,
708 direction: Option<TransactionDirection>,
709 unit: Option<CurrencyUnit>,
710 ) -> Result<Vec<Transaction>, database::Error> {
711 let conn = self
712 .pool
713 .get()
714 .await
715 .map_err(|e| Error::Database(Box::new(e)))?;
716
717 Ok(query(
718 r#"
719 SELECT
720 mint_url,
721 direction,
722 unit,
723 amount,
724 fee,
725 ys,
726 timestamp,
727 memo,
728 metadata,
729 quote_id,
730 payment_request,
731 payment_proof,
732 payment_method,
733 saga_id
734 FROM
735 transactions
736 "#,
737 )?
738 .fetch_all(&*conn)
739 .await?
740 .into_iter()
741 .filter_map(|row| {
742 let transaction = sql_row_to_transaction(row).ok()?;
744 if transaction.matches_conditions(&mint_url, &direction, &unit) {
745 Some(transaction)
746 } else {
747 None
748 }
749 })
750 .collect::<Vec<_>>())
751 }
752
753 async fn update_proofs(
754 &self,
755 added: Vec<ProofInfo>,
756 removed_ys: Vec<PublicKey>,
757 ) -> Result<(), database::Error> {
758 let conn = self
759 .pool
760 .get()
761 .await
762 .map_err(|e| Error::Database(Box::new(e)))?;
763 let tx = ConnectionWithTransaction::new(conn).await?;
764
765 for proof in added {
766 query(
767 r#"
768 INSERT INTO proof
769 (y, mint_url, state, spending_condition, unit, amount, keyset_id, secret, c, witness, dleq_e, dleq_s, dleq_r, used_by_operation, created_by_operation, p2pk_e)
770 VALUES
771 (:y, :mint_url, :state, :spending_condition, :unit, :amount, :keyset_id, :secret, :c, :witness, :dleq_e, :dleq_s, :dleq_r, :used_by_operation, :created_by_operation, :p2pk_e)
772 ON CONFLICT(y) DO UPDATE SET
773 mint_url = excluded.mint_url,
774 state = excluded.state,
775 spending_condition = excluded.spending_condition,
776 unit = excluded.unit,
777 amount = excluded.amount,
778 keyset_id = excluded.keyset_id,
779 secret = excluded.secret,
780 c = excluded.c,
781 witness = excluded.witness,
782 dleq_e = excluded.dleq_e,
783 dleq_s = excluded.dleq_s,
784 dleq_r = excluded.dleq_r,
785 used_by_operation = excluded.used_by_operation,
786 created_by_operation = excluded.created_by_operation,
787 p2pk_e = excluded.p2pk_e
788 ;
789 "#,
790 )?
791 .bind("y", proof.y.to_bytes().to_vec())
792 .bind("mint_url", proof.mint_url.to_string())
793 .bind("state", proof.state.to_string())
794 .bind(
795 "spending_condition",
796 proof
797 .spending_condition
798 .map(|s| serde_json::to_string(&s).ok()),
799 )
800 .bind("unit", proof.unit.to_string())
801 .bind("amount", u64::from(proof.proof.amount) as i64)
802 .bind("keyset_id", proof.proof.keyset_id.to_string())
803 .bind("secret", proof.proof.secret.to_string())
804 .bind("c", proof.proof.c.to_bytes().to_vec())
805 .bind(
806 "witness",
807 proof
808 .proof
809 .witness
810 .and_then(|w| serde_json::to_string(&w).ok()),
811 )
812 .bind(
813 "dleq_e",
814 proof.proof.dleq.as_ref().map(|dleq| dleq.e.to_secret_bytes().to_vec()),
815 )
816 .bind(
817 "dleq_s",
818 proof.proof.dleq.as_ref().map(|dleq| dleq.s.to_secret_bytes().to_vec()),
819 )
820 .bind(
821 "dleq_r",
822 proof.proof.dleq.as_ref().map(|dleq| dleq.r.to_secret_bytes().to_vec()),
823 )
824 .bind("used_by_operation", proof.used_by_operation.map(|id| id.to_string()))
825 .bind("created_by_operation", proof.created_by_operation.map(|id| id.to_string()))
826 .bind(
827 "p2pk_e",
828 proof
829 .proof
830 .p2pk_e
831 .as_ref()
832 .map(|pk| pk.to_bytes().to_vec()),
833 )
834 .execute(&tx)
835 .await?;
836 }
837
838 if !removed_ys.is_empty() {
839 query(r#"DELETE FROM proof WHERE y IN (:ys)"#)?
840 .bind_vec(
841 "ys",
842 removed_ys.iter().map(|y| y.to_bytes().to_vec()).collect(),
843 )?
844 .execute(&tx)
845 .await?;
846 }
847
848 tx.commit().await?;
849
850 Ok(())
851 }
852
853 #[instrument(skip(self))]
854 async fn update_proofs_state(
855 &self,
856 ys: Vec<PublicKey>,
857 state: State,
858 ) -> Result<(), database::Error> {
859 let conn = self
860 .pool
861 .get()
862 .await
863 .map_err(|e| Error::Database(Box::new(e)))?;
864
865 query("UPDATE proof SET state = :state WHERE y IN (:ys)")?
866 .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())?
867 .bind("state", state.to_string())
868 .execute(&*conn)
869 .await?;
870
871 Ok(())
872 }
873
874 #[instrument(skip(self))]
875 async fn add_transaction(&self, transaction: Transaction) -> Result<(), database::Error> {
876 let conn = self
877 .pool
878 .get()
879 .await
880 .map_err(|e| Error::Database(Box::new(e)))?;
881
882 let mint_url = transaction.mint_url.to_string();
883 let direction = transaction.direction.to_string();
884 let unit = transaction.unit.to_string();
885 let amount = u64::from(transaction.amount) as i64;
886 let fee = u64::from(transaction.fee) as i64;
887 let ys = transaction
888 .ys
889 .iter()
890 .flat_map(|y| y.to_bytes().to_vec())
891 .collect::<Vec<_>>();
892
893 let id = transaction.id();
894
895 query(
896 r#"
897 INSERT INTO transactions
898 (id, mint_url, direction, unit, amount, fee, ys, timestamp, memo, metadata, quote_id, payment_request, payment_proof, payment_method, saga_id)
899 VALUES
900 (:id, :mint_url, :direction, :unit, :amount, :fee, :ys, :timestamp, :memo, :metadata, :quote_id, :payment_request, :payment_proof, :payment_method, :saga_id)
901 ON CONFLICT(id) DO UPDATE SET
902 mint_url = excluded.mint_url,
903 direction = excluded.direction,
904 unit = excluded.unit,
905 amount = excluded.amount,
906 fee = excluded.fee,
907 timestamp = excluded.timestamp,
908 memo = excluded.memo,
909 metadata = excluded.metadata,
910 quote_id = excluded.quote_id,
911 payment_request = excluded.payment_request,
912 payment_proof = excluded.payment_proof,
913 payment_method = excluded.payment_method,
914 saga_id = excluded.saga_id
915 ;
916 "#,
917 )?
918 .bind("id", id.as_slice().to_vec())
919 .bind("mint_url", mint_url)
920 .bind("direction", direction)
921 .bind("unit", unit)
922 .bind("amount", amount)
923 .bind("fee", fee)
924 .bind("ys", ys)
925 .bind("timestamp", transaction.timestamp as i64)
926 .bind("memo", transaction.memo)
927 .bind(
928 "metadata",
929 serde_json::to_string(&transaction.metadata).map_err(Error::from)?,
930 )
931 .bind("quote_id", transaction.quote_id)
932 .bind("payment_request", transaction.payment_request)
933 .bind("payment_proof", transaction.payment_proof)
934 .bind("payment_method", transaction.payment_method.map(|pm| pm.to_string()))
935 .bind("saga_id", transaction.saga_id.map(|id| id.to_string()))
936 .execute(&*conn)
937 .await?;
938
939 Ok(())
940 }
941
942 #[instrument(skip(self))]
943 async fn update_mint_url(
944 &self,
945 old_mint_url: MintUrl,
946 new_mint_url: MintUrl,
947 ) -> Result<(), database::Error> {
948 let conn = self
949 .pool
950 .get()
951 .await
952 .map_err(|e| Error::Database(Box::new(e)))?;
953 let tx = ConnectionWithTransaction::new(conn).await?;
954 let tables = ["mint_quote", "proof"];
955
956 for table in &tables {
957 query(&format!(
958 r#"
959 UPDATE {table}
960 SET mint_url = :new_mint_url
961 WHERE mint_url = :old_mint_url
962 "#
963 ))?
964 .bind("new_mint_url", new_mint_url.to_string())
965 .bind("old_mint_url", old_mint_url.to_string())
966 .execute(&tx)
967 .await?;
968 }
969
970 tx.commit().await?;
971
972 Ok(())
973 }
974
975 #[instrument(skip(self), fields(keyset_id = %keyset_id))]
976 async fn increment_keyset_counter(
977 &self,
978 keyset_id: &Id,
979 count: u32,
980 ) -> Result<u32, database::Error> {
981 let conn = self
982 .pool
983 .get()
984 .await
985 .map_err(|e| Error::Database(Box::new(e)))?;
986
987 let new_counter = query(
988 r#"
989 INSERT INTO keyset_counter (keyset_id, counter)
990 VALUES (:keyset_id, :count)
991 ON CONFLICT(keyset_id) DO UPDATE SET
992 counter = keyset_counter.counter + :count
993 RETURNING counter
994 "#,
995 )?
996 .bind("keyset_id", keyset_id.to_string())
997 .bind("count", count)
998 .pluck(&*conn)
999 .await?
1000 .map(|n| Ok::<_, Error>(column_as_number!(n)))
1001 .transpose()?
1002 .ok_or_else(|| Error::Internal("Counter update returned no value".to_owned()))?;
1003
1004 Ok(new_counter)
1005 }
1006
1007 #[instrument(skip(self, mint_info))]
1008 async fn add_mint(
1009 &self,
1010 mint_url: MintUrl,
1011 mint_info: Option<MintInfo>,
1012 ) -> Result<(), database::Error> {
1013 let conn = self
1014 .pool
1015 .get()
1016 .await
1017 .map_err(|e| Error::Database(Box::new(e)))?;
1018
1019 let (
1020 name,
1021 pubkey,
1022 version,
1023 description,
1024 description_long,
1025 contact,
1026 nuts,
1027 icon_url,
1028 urls,
1029 motd,
1030 time,
1031 tos_url,
1032 ) = match mint_info {
1033 Some(mint_info) => {
1034 let MintInfo {
1035 name,
1036 pubkey,
1037 version,
1038 description,
1039 description_long,
1040 contact,
1041 nuts,
1042 icon_url,
1043 urls,
1044 motd,
1045 time,
1046 tos_url,
1047 } = mint_info;
1048
1049 (
1050 name,
1051 pubkey.map(|p| p.to_bytes().to_vec()),
1052 version.map(|v| serde_json::to_string(&v).ok()),
1053 description,
1054 description_long,
1055 contact.map(|c| serde_json::to_string(&c).ok()),
1056 serde_json::to_string(&nuts).ok(),
1057 icon_url,
1058 urls.map(|c| serde_json::to_string(&c).ok()),
1059 motd,
1060 time,
1061 tos_url,
1062 )
1063 }
1064 None => (
1065 None, None, None, None, None, None, None, None, None, None, None, None,
1066 ),
1067 };
1068
1069 query(
1070 r#"
1071 INSERT INTO mint
1072 (
1073 mint_url, name, pubkey, version, description, description_long,
1074 contact, nuts, icon_url, urls, motd, mint_time, tos_url
1075 )
1076 VALUES
1077 (
1078 :mint_url, :name, :pubkey, :version, :description, :description_long,
1079 :contact, :nuts, :icon_url, :urls, :motd, :mint_time, :tos_url
1080 )
1081 ON CONFLICT(mint_url) DO UPDATE SET
1082 name = excluded.name,
1083 pubkey = excluded.pubkey,
1084 version = excluded.version,
1085 description = excluded.description,
1086 description_long = excluded.description_long,
1087 contact = excluded.contact,
1088 nuts = excluded.nuts,
1089 icon_url = excluded.icon_url,
1090 urls = excluded.urls,
1091 motd = excluded.motd,
1092 mint_time = excluded.mint_time,
1093 tos_url = excluded.tos_url
1094 ;
1095 "#,
1096 )?
1097 .bind("mint_url", mint_url.to_string())
1098 .bind("name", name)
1099 .bind("pubkey", pubkey)
1100 .bind("version", version)
1101 .bind("description", description)
1102 .bind("description_long", description_long)
1103 .bind("contact", contact)
1104 .bind("nuts", nuts)
1105 .bind("icon_url", icon_url)
1106 .bind("urls", urls)
1107 .bind("motd", motd)
1108 .bind("mint_time", time.map(|v| v as i64))
1109 .bind("tos_url", tos_url)
1110 .execute(&*conn)
1111 .await?;
1112
1113 Ok(())
1114 }
1115
1116 #[instrument(skip(self))]
1117 async fn remove_mint(&self, mint_url: MintUrl) -> Result<(), database::Error> {
1118 let conn = self
1119 .pool
1120 .get()
1121 .await
1122 .map_err(|e| Error::Database(Box::new(e)))?;
1123
1124 query(r#"DELETE FROM mint WHERE mint_url=:mint_url"#)?
1125 .bind("mint_url", mint_url.to_string())
1126 .execute(&*conn)
1127 .await?;
1128
1129 Ok(())
1130 }
1131
1132 #[instrument(skip(self, keysets))]
1133 async fn add_mint_keysets(
1134 &self,
1135 mint_url: MintUrl,
1136 keysets: Vec<KeySetInfo>,
1137 ) -> Result<(), database::Error> {
1138 let conn = self
1139 .pool
1140 .get()
1141 .await
1142 .map_err(|e| Error::Database(Box::new(e)))?;
1143 let tx = ConnectionWithTransaction::new(conn).await?;
1144
1145 for keyset in keysets {
1146 query(
1147 r#"
1148 INSERT INTO keyset
1149 (mint_url, id, unit, active, input_fee_ppk, final_expiry, keyset_u32)
1150 VALUES
1151 (:mint_url, :id, :unit, :active, :input_fee_ppk, :final_expiry, :keyset_u32)
1152 ON CONFLICT(id) DO UPDATE SET
1153 active = excluded.active,
1154 input_fee_ppk = excluded.input_fee_ppk
1155 "#,
1156 )?
1157 .bind("mint_url", mint_url.to_string())
1158 .bind("id", keyset.id.to_string())
1159 .bind("unit", keyset.unit.to_string())
1160 .bind("active", keyset.active)
1161 .bind("input_fee_ppk", keyset.input_fee_ppk as i64)
1162 .bind("final_expiry", keyset.final_expiry.map(|v| v as i64))
1163 .bind("keyset_u32", u32::from(keyset.id))
1164 .execute(&tx)
1165 .await?;
1166 }
1167
1168 tx.commit().await?;
1169
1170 Ok(())
1171 }
1172
1173 #[instrument(skip_all)]
1174 async fn add_mint_quote(&self, quote: MintQuote) -> Result<(), database::Error> {
1175 let conn = self
1176 .pool
1177 .get()
1178 .await
1179 .map_err(|e| Error::Database(Box::new(e)))?;
1180
1181 let expected_version = quote.version;
1182 let new_version = expected_version.wrapping_add(1);
1183
1184 let rows_affected = query(
1185 r#"
1186 INSERT INTO mint_quote
1187 (id, mint_url, amount, unit, request, state, expiry, secret_key, payment_method, amount_issued, amount_paid, estimated_blocks, version, used_by_operation)
1188 VALUES
1189 (:id, :mint_url, :amount, :unit, :request, :state, :expiry, :secret_key, :payment_method, :amount_issued, :amount_paid, :estimated_blocks, :version, :used_by_operation)
1190 ON CONFLICT(id) DO UPDATE SET
1191 mint_url = excluded.mint_url,
1192 amount = excluded.amount,
1193 unit = excluded.unit,
1194 request = excluded.request,
1195 state = excluded.state,
1196 expiry = excluded.expiry,
1197 secret_key = excluded.secret_key,
1198 payment_method = excluded.payment_method,
1199 amount_issued = excluded.amount_issued,
1200 amount_paid = excluded.amount_paid,
1201 estimated_blocks = excluded.estimated_blocks,
1202 version = :new_version,
1203 used_by_operation = excluded.used_by_operation
1204 WHERE mint_quote.version = :expected_version
1205 ;
1206 "#,
1207 )?
1208 .bind("id", quote.id.to_string())
1209 .bind("mint_url", quote.mint_url.to_string())
1210 .bind("amount", quote.amount.map(|a| a.to_i64()))
1211 .bind("unit", quote.unit.to_string())
1212 .bind("request", quote.request)
1213 .bind("state", quote.state.to_string())
1214 .bind("expiry", quote.expiry as i64)
1215 .bind("secret_key", quote.secret_key.map(|p| p.to_string()))
1216 .bind("payment_method", quote.payment_method.to_string())
1217 .bind("amount_issued", quote.amount_issued.to_i64())
1218 .bind("amount_paid", quote.amount_paid.to_i64())
1219 .bind("estimated_blocks", quote.estimated_blocks.map(i64::from))
1220 .bind("version", quote.version as i64)
1221 .bind("new_version", new_version as i64)
1222 .bind("expected_version", expected_version as i64)
1223 .bind("used_by_operation", quote.used_by_operation)
1224 .execute(&*conn).await?;
1225
1226 if rows_affected == 0 {
1227 return Err(database::Error::ConcurrentUpdate);
1228 }
1229
1230 Ok(())
1231 }
1232
1233 #[instrument(skip(self))]
1234 async fn remove_mint_quote(&self, quote_id: &str) -> Result<(), database::Error> {
1235 let conn = self
1236 .pool
1237 .get()
1238 .await
1239 .map_err(|e| Error::Database(Box::new(e)))?;
1240
1241 query(r#"DELETE FROM mint_quote WHERE id=:id"#)?
1242 .bind("id", quote_id.to_string())
1243 .execute(&*conn)
1244 .await?;
1245
1246 Ok(())
1247 }
1248
1249 #[instrument(skip_all)]
1250 async fn add_melt_quote(&self, quote: wallet::MeltQuote) -> Result<(), database::Error> {
1251 let conn = self
1252 .pool
1253 .get()
1254 .await
1255 .map_err(|e| Error::Database(Box::new(e)))?;
1256
1257 let expected_version = quote.version;
1258 let new_version = expected_version.wrapping_add(1);
1259
1260 let rows_affected = query(
1261 r#"
1262 INSERT INTO melt_quote
1263 (id, unit, amount, request, fee_reserve, state, expiry, payment_method, estimated_blocks, fee_index, version, mint_url, used_by_operation)
1264 VALUES
1265 (:id, :unit, :amount, :request, :fee_reserve, :state, :expiry, :payment_method, :estimated_blocks, :fee_index, :version, :mint_url, :used_by_operation)
1266 ON CONFLICT(id) DO UPDATE SET
1267 unit = excluded.unit,
1268 amount = excluded.amount,
1269 request = excluded.request,
1270 fee_reserve = excluded.fee_reserve,
1271 state = excluded.state,
1272 expiry = excluded.expiry,
1273 payment_method = excluded.payment_method,
1274 estimated_blocks = excluded.estimated_blocks,
1275 fee_index = excluded.fee_index,
1276 version = :new_version,
1277 mint_url = excluded.mint_url,
1278 used_by_operation = excluded.used_by_operation
1279 WHERE melt_quote.version = :expected_version
1280 ;
1281 "#,
1282 )?
1283 .bind("id", quote.id.to_string())
1284 .bind("unit", quote.unit.to_string())
1285 .bind("amount", u64::from(quote.amount) as i64)
1286 .bind("request", quote.request)
1287 .bind("fee_reserve", u64::from(quote.fee_reserve) as i64)
1288 .bind("state", quote.state.to_string())
1289 .bind("expiry", quote.expiry as i64)
1290 .bind("payment_method", quote.payment_method.to_string())
1291 .bind("estimated_blocks", quote.estimated_blocks.map(i64::from))
1292 .bind("fee_index", quote.fee_index.map(i64::from))
1293 .bind("version", quote.version as i64)
1294 .bind("new_version", new_version as i64)
1295 .bind("expected_version", expected_version as i64)
1296 .bind("mint_url", quote.mint_url.map(|m| m.to_string()))
1297 .bind("used_by_operation", quote.used_by_operation)
1298 .execute(&*conn)
1299 .await?;
1300
1301 if rows_affected == 0 {
1302 return Err(database::Error::ConcurrentUpdate);
1303 }
1304
1305 Ok(())
1306 }
1307
1308 #[instrument(skip(self))]
1309 async fn remove_melt_quote(&self, quote_id: &str) -> Result<(), database::Error> {
1310 let conn = self
1311 .pool
1312 .get()
1313 .await
1314 .map_err(|e| Error::Database(Box::new(e)))?;
1315
1316 query(r#"DELETE FROM melt_quote WHERE id=:id"#)?
1317 .bind("id", quote_id.to_owned())
1318 .execute(&*conn)
1319 .await?;
1320
1321 Ok(())
1322 }
1323
1324 #[instrument(skip_all)]
1325 async fn add_keys(&self, keyset: KeySet) -> Result<(), database::Error> {
1326 let conn = self
1327 .pool
1328 .get()
1329 .await
1330 .map_err(|e| Error::Database(Box::new(e)))?;
1331
1332 keyset.verify_id()?;
1333
1334 query(
1335 r#"
1336 INSERT INTO key
1337 (id, keys, keyset_u32)
1338 VALUES
1339 (:id, :keys, :keyset_u32)
1340 "#,
1341 )?
1342 .bind("id", keyset.id.to_string())
1343 .bind(
1344 "keys",
1345 serde_json::to_string(&keyset.keys).map_err(Error::from)?,
1346 )
1347 .bind("keyset_u32", u32::from(keyset.id))
1348 .execute(&*conn)
1349 .await?;
1350
1351 Ok(())
1352 }
1353
1354 #[instrument(skip(self))]
1355 async fn remove_keys(&self, id: &Id) -> Result<(), database::Error> {
1356 let conn = self
1357 .pool
1358 .get()
1359 .await
1360 .map_err(|e| Error::Database(Box::new(e)))?;
1361
1362 query(r#"DELETE FROM key WHERE id = :id"#)?
1363 .bind("id", id.to_string())
1364 .execute(&*conn)
1365 .await?;
1366
1367 Ok(())
1368 }
1369
1370 #[instrument(skip(self))]
1371 async fn remove_transaction(
1372 &self,
1373 transaction_id: TransactionId,
1374 ) -> Result<(), database::Error> {
1375 let conn = self
1376 .pool
1377 .get()
1378 .await
1379 .map_err(|e| Error::Database(Box::new(e)))?;
1380
1381 query(r#"DELETE FROM transactions WHERE id=:id"#)?
1382 .bind("id", transaction_id.as_slice().to_vec())
1383 .execute(&*conn)
1384 .await?;
1385
1386 Ok(())
1387 }
1388
1389 #[instrument(skip(self))]
1390 async fn add_saga(&self, saga: wallet::WalletSaga) -> Result<(), database::Error> {
1391 let conn = self
1392 .pool
1393 .get()
1394 .await
1395 .map_err(|e| Error::Database(Box::new(e)))?;
1396
1397 let state_json = serde_json::to_string(&saga.state).map_err(|e| {
1398 Error::Database(Box::new(std::io::Error::new(
1399 std::io::ErrorKind::InvalidData,
1400 format!("Failed to serialize saga state: {}", e),
1401 )))
1402 })?;
1403
1404 let data_json = serde_json::to_string(&saga.data).map_err(|e| {
1405 Error::Database(Box::new(std::io::Error::new(
1406 std::io::ErrorKind::InvalidData,
1407 format!("Failed to serialize saga data: {}", e),
1408 )))
1409 })?;
1410
1411 query(
1412 r#"
1413 INSERT INTO wallet_sagas
1414 (id, kind, state, amount, mint_url, unit, quote_id, created_at, updated_at, data, version)
1415 VALUES
1416 (:id, :kind, :state, :amount, :mint_url, :unit, :quote_id, :created_at, :updated_at, :data, :version)
1417 "#,
1418 )?
1419 .bind("id", saga.id.to_string())
1420 .bind("kind", saga.kind.to_string())
1421 .bind("state", state_json)
1422 .bind("amount", u64::from(saga.amount) as i64)
1423 .bind("mint_url", saga.mint_url.to_string())
1424 .bind("unit", saga.unit.to_string())
1425 .bind("quote_id", saga.quote_id)
1426 .bind("created_at", saga.created_at as i64)
1427 .bind("updated_at", saga.updated_at as i64)
1428 .bind("data", data_json)
1429 .bind("version", saga.version as i64)
1430 .execute(&*conn)
1431 .await?;
1432
1433 Ok(())
1434 }
1435
1436 #[instrument(skip(self))]
1437 async fn get_saga(
1438 &self,
1439 id: &uuid::Uuid,
1440 ) -> Result<Option<wallet::WalletSaga>, database::Error> {
1441 let conn = self
1442 .pool
1443 .get()
1444 .await
1445 .map_err(|e| Error::Database(Box::new(e)))?;
1446
1447 let rows = query(
1448 r#"
1449 SELECT id, kind, state, amount, mint_url, unit, quote_id, created_at, updated_at, data, version
1450 FROM wallet_sagas
1451 WHERE id = :id
1452 "#,
1453 )?
1454 .bind("id", id.to_string())
1455 .fetch_all(&*conn)
1456 .await?;
1457
1458 match rows.into_iter().next() {
1459 Some(row) => Ok(Some(sql_row_to_wallet_saga(row)?)),
1460 None => Ok(None),
1461 }
1462 }
1463
1464 #[instrument(skip(self))]
1465 async fn update_saga(&self, saga: wallet::WalletSaga) -> Result<bool, database::Error> {
1466 let conn = self
1467 .pool
1468 .get()
1469 .await
1470 .map_err(|e| Error::Database(Box::new(e)))?;
1471
1472 let state_json = serde_json::to_string(&saga.state).map_err(|e| {
1473 Error::Database(Box::new(std::io::Error::new(
1474 std::io::ErrorKind::InvalidData,
1475 format!("Failed to serialize saga state: {}", e),
1476 )))
1477 })?;
1478
1479 let data_json = serde_json::to_string(&saga.data).map_err(|e| {
1480 Error::Database(Box::new(std::io::Error::new(
1481 std::io::ErrorKind::InvalidData,
1482 format!("Failed to serialize saga data: {}", e),
1483 )))
1484 })?;
1485
1486 let expected_version = saga.version.saturating_sub(1);
1490
1491 let rows_affected = query(
1492 r#"
1493 UPDATE wallet_sagas
1494 SET kind = :kind, state = :state, amount = :amount, mint_url = :mint_url,
1495 unit = :unit, quote_id = :quote_id, updated_at = :updated_at, data = :data,
1496 version = :new_version
1497 WHERE id = :id AND version = :expected_version
1498 "#,
1499 )?
1500 .bind("id", saga.id.to_string())
1501 .bind("kind", saga.kind.to_string())
1502 .bind("state", state_json)
1503 .bind("amount", u64::from(saga.amount) as i64)
1504 .bind("mint_url", saga.mint_url.to_string())
1505 .bind("unit", saga.unit.to_string())
1506 .bind("quote_id", saga.quote_id)
1507 .bind("updated_at", saga.updated_at as i64)
1508 .bind("data", data_json)
1509 .bind("new_version", saga.version as i64)
1510 .bind("expected_version", expected_version as i64)
1511 .execute(&*conn)
1512 .await?;
1513
1514 Ok(rows_affected > 0)
1516 }
1517
1518 #[instrument(skip(self))]
1519 async fn delete_saga(&self, id: &uuid::Uuid) -> Result<(), database::Error> {
1520 let conn = self
1521 .pool
1522 .get()
1523 .await
1524 .map_err(|e| Error::Database(Box::new(e)))?;
1525
1526 query(r#"DELETE FROM wallet_sagas WHERE id = :id"#)?
1527 .bind("id", id.to_string())
1528 .execute(&*conn)
1529 .await?;
1530
1531 Ok(())
1532 }
1533
1534 #[instrument(skip(self))]
1535 async fn get_incomplete_sagas(&self) -> Result<Vec<wallet::WalletSaga>, database::Error> {
1536 let conn = self
1537 .pool
1538 .get()
1539 .await
1540 .map_err(|e| Error::Database(Box::new(e)))?;
1541
1542 let rows = query(
1543 r#"
1544 SELECT id, kind, state, amount, mint_url, unit, quote_id, created_at, updated_at, data, version
1545 FROM wallet_sagas
1546 ORDER BY created_at ASC
1547 "#,
1548 )?
1549 .fetch_all(&*conn)
1550 .await?;
1551
1552 rows.into_iter().map(sql_row_to_wallet_saga).collect()
1553 }
1554
1555 #[instrument(skip(self))]
1556 async fn reserve_proofs(
1557 &self,
1558 ys: Vec<PublicKey>,
1559 operation_id: &uuid::Uuid,
1560 ) -> Result<(), database::Error> {
1561 let conn = self
1562 .pool
1563 .get()
1564 .await
1565 .map_err(|e| Error::Database(Box::new(e)))?;
1566
1567 for y in ys {
1568 let rows_affected = query(
1569 r#"
1570 UPDATE proof
1571 SET state = 'RESERVED', used_by_operation = :operation_id
1572 WHERE y = :y AND state = 'UNSPENT'
1573 "#,
1574 )?
1575 .bind("y", y.to_bytes().to_vec())
1576 .bind("operation_id", operation_id.to_string())
1577 .execute(&*conn)
1578 .await?;
1579
1580 if rows_affected == 0 {
1581 return Err(database::Error::ProofNotUnspent);
1582 }
1583 }
1584
1585 Ok(())
1586 }
1587
1588 #[instrument(skip(self))]
1589 async fn release_proofs(&self, operation_id: &uuid::Uuid) -> Result<(), database::Error> {
1590 let conn = self
1591 .pool
1592 .get()
1593 .await
1594 .map_err(|e| Error::Database(Box::new(e)))?;
1595
1596 query(
1597 r#"
1598 UPDATE proof
1599 SET state = 'UNSPENT', used_by_operation = NULL
1600 WHERE used_by_operation = :operation_id
1601 "#,
1602 )?
1603 .bind("operation_id", operation_id.to_string())
1604 .execute(&*conn)
1605 .await?;
1606
1607 Ok(())
1608 }
1609
1610 #[instrument(skip(self))]
1611 async fn get_reserved_proofs(
1612 &self,
1613 operation_id: &uuid::Uuid,
1614 ) -> Result<Vec<ProofInfo>, database::Error> {
1615 let conn = self
1616 .pool
1617 .get()
1618 .await
1619 .map_err(|e| Error::Database(Box::new(e)))?;
1620
1621 let rows = query(
1622 r#"
1623 SELECT
1624 amount,
1625 unit,
1626 keyset_id,
1627 secret,
1628 c,
1629 witness,
1630 dleq_e,
1631 dleq_s,
1632 dleq_r,
1633 y,
1634 mint_url,
1635 state,
1636 spending_condition,
1637 used_by_operation,
1638 created_by_operation,
1639 p2pk_e
1640 FROM proof
1641 WHERE used_by_operation = :operation_id
1642 "#,
1643 )?
1644 .bind("operation_id", operation_id.to_string())
1645 .fetch_all(&*conn)
1646 .await?;
1647
1648 rows.into_iter().map(sql_row_to_proof_info).collect()
1649 }
1650
1651 #[instrument(skip(self))]
1652 async fn reserve_melt_quote(
1653 &self,
1654 quote_id: &str,
1655 operation_id: &uuid::Uuid,
1656 ) -> Result<(), database::Error> {
1657 let conn = self
1658 .pool
1659 .get()
1660 .await
1661 .map_err(|e| Error::Database(Box::new(e)))?;
1662
1663 let rows_affected = query(
1664 r#"
1665 UPDATE melt_quote
1666 SET used_by_operation = :operation_id
1667 WHERE id = :quote_id AND used_by_operation IS NULL
1668 "#,
1669 )?
1670 .bind("operation_id", operation_id.to_string())
1671 .bind("quote_id", quote_id)
1672 .execute(&*conn)
1673 .await?;
1674
1675 if rows_affected == 0 {
1676 let exists = query(
1678 r#"
1679 SELECT 1 FROM melt_quote WHERE id = :quote_id
1680 "#,
1681 )?
1682 .bind("quote_id", quote_id)
1683 .fetch_one(&*conn)
1684 .await?;
1685
1686 if exists.is_none() {
1687 return Err(database::Error::UnknownQuote);
1688 }
1689 return Err(database::Error::QuoteAlreadyInUse);
1690 }
1691
1692 Ok(())
1693 }
1694
1695 #[instrument(skip(self))]
1696 async fn release_melt_quote(&self, operation_id: &uuid::Uuid) -> Result<(), database::Error> {
1697 let conn = self
1698 .pool
1699 .get()
1700 .await
1701 .map_err(|e| Error::Database(Box::new(e)))?;
1702
1703 query(
1704 r#"
1705 UPDATE melt_quote
1706 SET used_by_operation = NULL
1707 WHERE used_by_operation = :operation_id
1708 "#,
1709 )?
1710 .bind("operation_id", operation_id.to_string())
1711 .execute(&*conn)
1712 .await?;
1713
1714 Ok(())
1715 }
1716
1717 #[instrument(skip(self))]
1718 async fn reserve_mint_quote(
1719 &self,
1720 quote_id: &str,
1721 operation_id: &uuid::Uuid,
1722 ) -> Result<(), database::Error> {
1723 let conn = self
1724 .pool
1725 .get()
1726 .await
1727 .map_err(|e| Error::Database(Box::new(e)))?;
1728
1729 let rows_affected = query(
1730 r#"
1731 UPDATE mint_quote
1732 SET used_by_operation = :operation_id
1733 WHERE id = :quote_id AND used_by_operation IS NULL
1734 "#,
1735 )?
1736 .bind("operation_id", operation_id.to_string())
1737 .bind("quote_id", quote_id)
1738 .execute(&*conn)
1739 .await?;
1740
1741 if rows_affected == 0 {
1742 let exists = query(
1744 r#"
1745 SELECT 1 FROM mint_quote WHERE id = :quote_id
1746 "#,
1747 )?
1748 .bind("quote_id", quote_id)
1749 .fetch_one(&*conn)
1750 .await?;
1751
1752 if exists.is_none() {
1753 return Err(database::Error::UnknownQuote);
1754 }
1755 return Err(database::Error::QuoteAlreadyInUse);
1756 }
1757
1758 Ok(())
1759 }
1760
1761 #[instrument(skip(self))]
1762 async fn release_mint_quote(&self, operation_id: &uuid::Uuid) -> Result<(), database::Error> {
1763 let conn = self
1764 .pool
1765 .get()
1766 .await
1767 .map_err(|e| Error::Database(Box::new(e)))?;
1768
1769 query(
1770 r#"
1771 UPDATE mint_quote
1772 SET used_by_operation = NULL
1773 WHERE used_by_operation = :operation_id
1774 "#,
1775 )?
1776 .bind("operation_id", operation_id.to_string())
1777 .execute(&*conn)
1778 .await?;
1779
1780 Ok(())
1781 }
1782
1783 async fn kv_read(
1784 &self,
1785 primary_namespace: &str,
1786 secondary_namespace: &str,
1787 key: &str,
1788 ) -> Result<Option<Vec<u8>>, database::Error> {
1789 crate::keyvalue::kv_read(&self.pool, primary_namespace, secondary_namespace, key).await
1790 }
1791
1792 async fn kv_list(
1793 &self,
1794 primary_namespace: &str,
1795 secondary_namespace: &str,
1796 ) -> Result<Vec<String>, database::Error> {
1797 crate::keyvalue::kv_list(&self.pool, primary_namespace, secondary_namespace).await
1798 }
1799
1800 async fn kv_write(
1801 &self,
1802 primary_namespace: &str,
1803 secondary_namespace: &str,
1804 key: &str,
1805 value: &[u8],
1806 ) -> Result<(), database::Error> {
1807 let conn = self
1808 .pool
1809 .get()
1810 .await
1811 .map_err(|e| Error::Database(Box::new(e)))?;
1812 crate::keyvalue::kv_write_standalone(
1813 &*conn,
1814 primary_namespace,
1815 secondary_namespace,
1816 key,
1817 value,
1818 )
1819 .await?;
1820 Ok(())
1821 }
1822
1823 async fn kv_remove(
1824 &self,
1825 primary_namespace: &str,
1826 secondary_namespace: &str,
1827 key: &str,
1828 ) -> Result<(), database::Error> {
1829 let conn = self
1830 .pool
1831 .get()
1832 .await
1833 .map_err(|e| Error::Database(Box::new(e)))?;
1834 crate::keyvalue::kv_remove_standalone(&*conn, primary_namespace, secondary_namespace, key)
1835 .await?;
1836 Ok(())
1837 }
1838
1839 #[instrument(skip(self))]
1842 async fn add_p2pk_key(
1843 &self,
1844 pubkey: &PublicKey,
1845 derivation_path: DerivationPath,
1846 derivation_index: u32,
1847 ) -> Result<(), Error> {
1848 let conn = self
1849 .pool
1850 .get()
1851 .await
1852 .map_err(|e| Error::Database(Box::new(e)))?;
1853 let query_str = r#"
1854 INSERT INTO p2pk_signing_key (pubkey, derivation_index, derivation_path, created_time)
1855 VALUES (:pubkey, :derivation_index, :derivation_path, :created_time)
1856 "#
1857 .to_string();
1858
1859 query(&query_str)?
1860 .bind("pubkey", pubkey.to_bytes().to_vec())
1861 .bind("derivation_index", derivation_index)
1862 .bind("derivation_path", derivation_path.to_string())
1863 .bind("created_time", unix_time() as i64)
1864 .execute(&*conn)
1865 .await?;
1866
1867 Ok(())
1868 }
1869
1870 #[instrument(skip(self))]
1871 async fn get_p2pk_key(
1872 &self,
1873 pubkey: &PublicKey,
1874 ) -> Result<Option<wallet::P2PKSigningKey>, Error> {
1875 let conn = self
1876 .pool
1877 .get()
1878 .await
1879 .map_err(|e| Error::Database(Box::new(e)))?;
1880 let query_str = r#"SELECT pubkey, derivation_index, derivation_path, created_time FROM p2pk_signing_key WHERE pubkey = :pubkey"#.to_string();
1881
1882 query(&query_str)?
1883 .bind("pubkey", pubkey.to_bytes().to_vec())
1884 .fetch_one(&*conn)
1885 .await?
1886 .map(sql_row_to_p2pk_signing_key)
1887 .transpose()
1888 }
1889
1890 #[instrument(skip(self))]
1891 async fn list_p2pk_keys(&self) -> Result<Vec<wallet::P2PKSigningKey>, Error> {
1892 let conn = self
1893 .pool
1894 .get()
1895 .await
1896 .map_err(|e| Error::Database(Box::new(e)))?;
1897 let query_str = r#"
1898 SELECT pubkey, derivation_index, derivation_path, created_time FROM p2pk_signing_key ORDER BY derivation_index DESC
1899 "#.to_string();
1900
1901 Ok(query(&query_str)?
1902 .fetch_all(&*conn)
1903 .await?
1904 .into_iter()
1905 .filter_map(|row| {
1906 let row = sql_row_to_p2pk_signing_key(row).ok()?;
1907
1908 Some(row)
1909 })
1910 .collect::<Vec<wallet::P2PKSigningKey>>())
1911 }
1912
1913 #[instrument(skip(self))]
1914 async fn latest_p2pk(&self) -> Result<Option<wallet::P2PKSigningKey>, Error> {
1915 let conn = self
1916 .pool
1917 .get()
1918 .await
1919 .map_err(|e| Error::Database(Box::new(e)))?;
1920 let query_str = r#"
1921 SELECT pubkey, derivation_index, derivation_path, created_time FROM p2pk_signing_key ORDER BY derivation_index DESC LIMIT 1
1922 "#.to_string();
1923
1924 query(&query_str)?
1925 .fetch_one(&*conn)
1926 .await?
1927 .map(sql_row_to_p2pk_signing_key)
1928 .transpose()
1929 }
1930}
1931
1932fn sql_row_to_mint_info(row: Vec<Column>) -> Result<MintInfo, Error> {
1933 unpack_into!(
1934 let (
1935 name,
1936 pubkey,
1937 version,
1938 description,
1939 description_long,
1940 contact,
1941 nuts,
1942 icon_url,
1943 motd,
1944 urls,
1945 mint_time,
1946 tos_url
1947 ) = row
1948 );
1949
1950 Ok(MintInfo {
1951 name: column_as_nullable_string!(&name),
1952 pubkey: column_as_nullable_string!(&pubkey, |v| serde_json::from_str(v).ok(), |v| {
1953 serde_json::from_slice(v).ok()
1954 }),
1955 version: column_as_nullable_string!(&version).and_then(|v| serde_json::from_str(&v).ok()),
1956 description: column_as_nullable_string!(description),
1957 description_long: column_as_nullable_string!(description_long),
1958 contact: column_as_nullable_string!(contact, |v| serde_json::from_str(&v).ok()),
1959 nuts: column_as_nullable_string!(nuts, |v| serde_json::from_str(&v).ok())
1960 .unwrap_or_default(),
1961 urls: column_as_nullable_string!(urls, |v| serde_json::from_str(&v).ok()),
1962 icon_url: column_as_nullable_string!(icon_url),
1963 motd: column_as_nullable_string!(motd),
1964 time: column_as_nullable_number!(mint_time).map(|t| t),
1965 tos_url: column_as_nullable_string!(tos_url),
1966 })
1967}
1968
1969#[instrument(skip_all)]
1970fn sql_row_to_keyset(row: Vec<Column>) -> Result<KeySetInfo, Error> {
1971 unpack_into!(
1972 let (
1973 id,
1974 unit,
1975 active,
1976 input_fee_ppk,
1977 final_expiry
1978 ) = row
1979 );
1980
1981 Ok(KeySetInfo {
1982 id: column_as_string!(id, Id::from_str, Id::from_bytes),
1983 unit: column_as_string!(unit, CurrencyUnit::from_str),
1984 active: matches!(active, Column::Integer(1)),
1985 input_fee_ppk: column_as_nullable_number!(input_fee_ppk).unwrap_or(0),
1986 final_expiry: column_as_nullable_number!(final_expiry),
1987 })
1988}
1989
1990fn sql_row_to_mint_quote(row: Vec<Column>) -> Result<MintQuote, Error> {
1991 unpack_into!(
1992 let (
1993 id,
1994 mint_url,
1995 amount,
1996 unit,
1997 request,
1998 state,
1999 expiry,
2000 secret_key,
2001 row_method,
2002 row_amount_minted,
2003 row_amount_paid,
2004 estimated_blocks,
2005 used_by_operation,
2006 version
2007 ) = row
2008 );
2009
2010 let amount: Option<i64> = column_as_nullable_number!(amount);
2011
2012 let amount_paid: u64 = column_as_number!(row_amount_paid);
2013 let amount_minted: u64 = column_as_number!(row_amount_minted);
2014 let expiry_val: u64 = column_as_number!(expiry);
2015 let version_val: u32 = column_as_number!(version);
2016 let payment_method =
2017 PaymentMethod::from_str(&column_as_string!(row_method)).map_err(Error::from)?;
2018
2019 Ok(MintQuote {
2020 id: column_as_string!(id),
2021 mint_url: column_as_string!(mint_url, MintUrl::from_str),
2022 amount: amount.and_then(Amount::from_i64),
2023 unit: column_as_string!(unit, CurrencyUnit::from_str),
2024 request: column_as_string!(request),
2025 state: column_as_string!(state, MintQuoteState::from_str),
2026 expiry: expiry_val,
2027 secret_key: column_as_nullable_string!(secret_key, |s| SecretKey::from_str(&s).ok()),
2028 payment_method,
2029 amount_issued: Amount::from(amount_minted),
2030 amount_paid: Amount::from(amount_paid),
2031 estimated_blocks: column_as_nullable_number!(estimated_blocks),
2032 used_by_operation: column_as_nullable_string!(used_by_operation),
2033 version: version_val,
2034 })
2035}
2036
2037fn sql_row_to_melt_quote(row: Vec<Column>) -> Result<wallet::MeltQuote, Error> {
2038 unpack_into!(
2039 let (
2040 id,
2041 unit,
2042 amount,
2043 request,
2044 fee_reserve,
2045 state,
2046 expiry,
2047 payment_proof,
2048 row_method,
2049 estimated_blocks,
2050 fee_index,
2051 used_by_operation,
2052 version,
2053 mint_url
2054 ) = row
2055 );
2056
2057 let payment_method =
2058 PaymentMethod::from_str(&column_as_string!(row_method)).map_err(Error::from)?;
2059
2060 let amount_val: u64 = column_as_number!(amount);
2061 let fee_reserve_val: u64 = column_as_number!(fee_reserve);
2062 let expiry_val: u64 = column_as_number!(expiry);
2063 let version_val: u32 = column_as_number!(version);
2064
2065 Ok(wallet::MeltQuote {
2066 id: column_as_string!(id),
2067 mint_url: column_as_nullable_string!(mint_url, |s| MintUrl::from_str(&s).ok()),
2068 unit: column_as_string!(unit, CurrencyUnit::from_str),
2069 amount: Amount::from(amount_val),
2070 request: column_as_string!(request),
2071 fee_reserve: Amount::from(fee_reserve_val),
2072 state: column_as_string!(state, MeltQuoteState::from_str),
2073 expiry: expiry_val,
2074 payment_proof: column_as_nullable_string!(payment_proof),
2075 estimated_blocks: column_as_nullable_number!(estimated_blocks),
2076 fee_index: column_as_nullable_number!(fee_index),
2077 payment_method,
2078 used_by_operation: column_as_nullable_string!(used_by_operation),
2079 version: version_val,
2080 })
2081}
2082
2083fn sql_row_to_proof_info(row: Vec<Column>) -> Result<ProofInfo, Error> {
2084 unpack_into!(
2085 let (
2086 amount,
2087 unit,
2088 keyset_id,
2089 secret,
2090 c,
2091 witness,
2092 dleq_e,
2093 dleq_s,
2094 dleq_r,
2095 y,
2096 mint_url,
2097 state,
2098 spending_condition,
2099 used_by_operation,
2100 created_by_operation,
2101 p2pk_e
2102 ) = row
2103 );
2104
2105 let dleq = match (
2106 column_as_nullable_binary!(dleq_e),
2107 column_as_nullable_binary!(dleq_s),
2108 column_as_nullable_binary!(dleq_r),
2109 ) {
2110 (Some(e), Some(s), Some(r)) => {
2111 let e_key = SecretKey::from_slice(&e)?;
2112 let s_key = SecretKey::from_slice(&s)?;
2113 let r_key = SecretKey::from_slice(&r)?;
2114
2115 Some(ProofDleq::new(e_key, s_key, r_key))
2116 }
2117 _ => None,
2118 };
2119
2120 let amount: u64 = column_as_number!(amount);
2121 let proof = Proof {
2122 amount: Amount::from(amount),
2123 keyset_id: column_as_string!(keyset_id, Id::from_str),
2124 secret: column_as_string!(secret, Secret::from_str),
2125 witness: column_as_nullable_string!(witness, |v| { serde_json::from_str(&v).ok() }, |v| {
2126 serde_json::from_slice(&v).ok()
2127 }),
2128 c: column_as_string!(c, PublicKey::from_str, PublicKey::from_slice),
2129 dleq,
2130 p2pk_e: column_as_nullable_binary!(p2pk_e)
2131 .map(|bytes| PublicKey::from_slice(&bytes))
2132 .transpose()?,
2133 };
2134
2135 let used_by_operation =
2136 column_as_nullable_string!(used_by_operation).and_then(|id| Uuid::from_str(&id).ok());
2137 let created_by_operation =
2138 column_as_nullable_string!(created_by_operation).and_then(|id| Uuid::from_str(&id).ok());
2139
2140 Ok(ProofInfo {
2141 proof,
2142 y: column_as_string!(y, PublicKey::from_str, PublicKey::from_slice),
2143 mint_url: column_as_string!(mint_url, MintUrl::from_str),
2144 state: column_as_string!(state, State::from_str),
2145 spending_condition: column_as_nullable_string!(
2146 spending_condition,
2147 |r| { serde_json::from_str(&r).ok() },
2148 |r| { serde_json::from_slice(&r).ok() }
2149 ),
2150 unit: column_as_string!(unit, CurrencyUnit::from_str),
2151 used_by_operation,
2152 created_by_operation,
2153 })
2154}
2155
2156fn sql_row_to_wallet_saga(row: Vec<Column>) -> Result<wallet::WalletSaga, Error> {
2157 unpack_into!(
2158 let (
2159 id,
2160 kind,
2161 state,
2162 amount,
2163 mint_url,
2164 unit,
2165 quote_id,
2166 created_at,
2167 updated_at,
2168 data,
2169 version
2170 ) = row
2171 );
2172
2173 let id_str: String = column_as_string!(id);
2174 let id = uuid::Uuid::parse_str(&id_str).map_err(|e| {
2175 Error::Database(Box::new(std::io::Error::new(
2176 std::io::ErrorKind::InvalidData,
2177 format!("Invalid UUID: {}", e),
2178 )))
2179 })?;
2180 let kind_str: String = column_as_string!(kind);
2181 let state_json: String = column_as_string!(state);
2182 let amount: u64 = column_as_number!(amount);
2183 let mint_url: MintUrl = column_as_string!(mint_url, MintUrl::from_str);
2184 let unit: CurrencyUnit = column_as_string!(unit, CurrencyUnit::from_str);
2185 let quote_id: Option<String> = column_as_nullable_string!(quote_id);
2186 let created_at: u64 = column_as_number!(created_at);
2187 let updated_at: u64 = column_as_number!(updated_at);
2188 let data_json: String = column_as_string!(data);
2189 let version: u32 = column_as_number!(version);
2190
2191 let kind = wallet::OperationKind::from_str(&kind_str).map_err(|_| {
2192 Error::Database(Box::new(std::io::Error::new(
2193 std::io::ErrorKind::InvalidData,
2194 format!("Invalid operation kind: {}", kind_str),
2195 )))
2196 })?;
2197 let state: wallet::WalletSagaState = serde_json::from_str(&state_json).map_err(|e| {
2198 Error::Database(Box::new(std::io::Error::new(
2199 std::io::ErrorKind::InvalidData,
2200 format!("Failed to deserialize saga state: {}", e),
2201 )))
2202 })?;
2203 let data: wallet::OperationData = serde_json::from_str(&data_json).map_err(|e| {
2204 Error::Database(Box::new(std::io::Error::new(
2205 std::io::ErrorKind::InvalidData,
2206 format!("Failed to deserialize saga data: {}", e),
2207 )))
2208 })?;
2209
2210 Ok(wallet::WalletSaga {
2211 id,
2212 kind,
2213 state,
2214 amount: Amount::from(amount),
2215 mint_url,
2216 unit,
2217 quote_id,
2218 created_at,
2219 updated_at,
2220 data,
2221 version,
2222 })
2223}
2224
2225fn sql_row_to_transaction(row: Vec<Column>) -> Result<Transaction, Error> {
2226 unpack_into!(
2227 let (
2228 mint_url,
2229 direction,
2230 unit,
2231 amount,
2232 fee,
2233 ys,
2234 timestamp,
2235 memo,
2236 metadata,
2237 quote_id,
2238 payment_request,
2239 payment_proof,
2240 payment_method,
2241 saga_id
2242 ) = row
2243 );
2244
2245 let amount: u64 = column_as_number!(amount);
2246 let fee: u64 = column_as_number!(fee);
2247
2248 let saga_id: Option<Uuid> = column_as_nullable_string!(saga_id)
2249 .map(|id| Uuid::from_str(&id).ok())
2250 .flatten();
2251
2252 Ok(Transaction {
2253 mint_url: column_as_string!(mint_url, MintUrl::from_str),
2254 direction: column_as_string!(direction, TransactionDirection::from_str),
2255 unit: column_as_string!(unit, CurrencyUnit::from_str),
2256 amount: Amount::from(amount),
2257 fee: Amount::from(fee),
2258 ys: column_as_binary!(ys)
2259 .chunks(33)
2260 .map(PublicKey::from_slice)
2261 .collect::<Result<Vec<_>, _>>()?,
2262 timestamp: column_as_number!(timestamp),
2263 memo: column_as_nullable_string!(memo),
2264 metadata: column_as_nullable_string!(metadata, |v| serde_json::from_str(&v).ok(), |v| {
2265 serde_json::from_slice(&v).ok()
2266 })
2267 .unwrap_or_default(),
2268 quote_id: column_as_nullable_string!(quote_id),
2269 payment_request: column_as_nullable_string!(payment_request),
2270 payment_proof: column_as_nullable_string!(payment_proof),
2271 payment_method: column_as_nullable_string!(payment_method)
2272 .map(|v| PaymentMethod::from_str(&v))
2273 .transpose()
2274 .map_err(Error::from)?,
2275 saga_id,
2276 })
2277}
2278
2279fn sql_row_to_p2pk_signing_key(row: Vec<Column>) -> Result<wallet::P2PKSigningKey, Error> {
2280 unpack_into!(
2281 let (
2282 pubkey,
2283 derivation_index,
2284 derivation_path,
2285 created_time
2286 ) = row
2287 );
2288
2289 Ok(wallet::P2PKSigningKey {
2290 pubkey: column_as_string!(pubkey, PublicKey::from_str, PublicKey::from_slice),
2291 derivation_index: column_as_number!(derivation_index),
2292 derivation_path: column_as_string!(derivation_path, DerivationPath::from_str),
2293 created_time: column_as_number!(created_time),
2294 })
2295}