1use std::collections::HashMap;
4use std::fmt::Debug;
5use std::str::FromStr;
6use std::sync::Arc;
7
8use async_trait::async_trait;
9use cdk_common::database::{ConversionError, Error, WalletDatabase};
10use cdk_common::mint_url::MintUrl;
11use cdk_common::nuts::{MeltQuoteState, MintQuoteState};
12use cdk_common::secret::Secret;
13use cdk_common::wallet::{
14 self, MintQuote, ProofInfo, Transaction, TransactionDirection, TransactionId,
15};
16use cdk_common::{
17 database, Amount, CurrencyUnit, Id, KeySet, KeySetInfo, Keys, MintInfo, PaymentMethod, Proof,
18 ProofDleq, PublicKey, SecretKey, SpendingConditions, State,
19};
20use tracing::instrument;
21use uuid::Uuid;
22
23use crate::common::migrate;
24use crate::database::{ConnectionWithTransaction, DatabaseExecutor};
25use crate::pool::{DatabasePool, Pool, PooledResource};
26use crate::stmt::{query, Column};
27use crate::{
28 column_as_binary, column_as_nullable_binary, column_as_nullable_number,
29 column_as_nullable_string, column_as_number, column_as_string, unpack_into,
30};
31
32#[rustfmt::skip]
33mod migrations {
34 include!(concat!(env!("OUT_DIR"), "/migrations_wallet.rs"));
35}
36
37#[derive(Debug, Clone)]
39pub struct SQLWalletDatabase<RM>
40where
41 RM: DatabasePool + 'static,
42{
43 pool: Arc<Pool<RM>>,
44}
45
46impl<RM> SQLWalletDatabase<RM>
47where
48 RM: DatabasePool + 'static,
49{
50 pub async fn new<X>(db: X) -> Result<Self, Error>
52 where
53 X: Into<RM::Config>,
54 {
55 let pool = Pool::new(db.into());
56 Self::migrate(pool.get().map_err(|e| Error::Database(Box::new(e)))?).await?;
57
58 Ok(Self { pool })
59 }
60
61 async fn migrate(conn: PooledResource<RM>) -> Result<(), Error> {
63 let tx = ConnectionWithTransaction::new(conn).await?;
64 migrate(&tx, RM::Connection::name(), migrations::MIGRATIONS).await?;
65 Self::add_keyset_u32(&tx).await?;
67 tx.commit().await?;
68
69 Ok(())
70 }
71
72 async fn add_keyset_u32<T>(conn: &T) -> Result<(), Error>
73 where
74 T: DatabaseExecutor,
75 {
76 let keys_without_u32: Vec<Vec<Column>> = query(
78 r#"
79 SELECT
80 id
81 FROM key
82 WHERE keyset_u32 IS NULL
83 "#,
84 )?
85 .fetch_all(conn)
86 .await?;
87
88 for row in keys_without_u32 {
89 unpack_into!(let (id) = row);
90 let id = column_as_string!(id);
91
92 if let Ok(id) = Id::from_str(&id) {
93 query(
94 r#"
95 UPDATE
96 key
97 SET keyset_u32 = :u32_keyset
98 WHERE id = :keyset_id
99 "#,
100 )?
101 .bind("u32_keyset", u32::from(id))
102 .bind("keyset_id", id.to_string())
103 .execute(conn)
104 .await?;
105 }
106 }
107
108 let keysets_without_u32: Vec<Vec<Column>> = query(
110 r#"
111 SELECT
112 id
113 FROM keyset
114 WHERE keyset_u32 IS NULL
115 "#,
116 )?
117 .fetch_all(conn)
118 .await?;
119
120 for row in keysets_without_u32 {
121 unpack_into!(let (id) = row);
122 let id = column_as_string!(id);
123
124 if let Ok(id) = Id::from_str(&id) {
125 query(
126 r#"
127 UPDATE
128 keyset
129 SET keyset_u32 = :u32_keyset
130 WHERE id = :keyset_id
131 "#,
132 )?
133 .bind("u32_keyset", u32::from(id))
134 .bind("keyset_id", id.to_string())
135 .execute(conn)
136 .await?;
137 }
138 }
139
140 Ok(())
141 }
142}
143
144#[async_trait]
145impl<RM> WalletDatabase<database::Error> for SQLWalletDatabase<RM>
146where
147 RM: DatabasePool + 'static,
148{
149 #[instrument(skip(self))]
150 async fn get_melt_quotes(&self) -> Result<Vec<wallet::MeltQuote>, database::Error> {
151 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
152
153 Ok(query(
154 r#"
155 SELECT
156 id,
157 unit,
158 amount,
159 request,
160 fee_reserve,
161 state,
162 expiry,
163 payment_preimage,
164 payment_method,
165 used_by_operation,
166 version
167 FROM
168 melt_quote
169 "#,
170 )?
171 .fetch_all(&*conn)
172 .await?
173 .into_iter()
174 .map(sql_row_to_melt_quote)
175 .collect::<Result<_, _>>()?)
176 }
177
178 #[instrument(skip(self))]
179 async fn get_mint(&self, mint_url: MintUrl) -> Result<Option<MintInfo>, database::Error> {
180 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
181 Ok(query(
182 r#"
183 SELECT
184 name,
185 pubkey,
186 version,
187 description,
188 description_long,
189 contact,
190 nuts,
191 icon_url,
192 motd,
193 urls,
194 mint_time,
195 tos_url
196 FROM
197 mint
198 WHERE mint_url = :mint_url
199 "#,
200 )?
201 .bind("mint_url", mint_url.to_string())
202 .fetch_one(&*conn)
203 .await?
204 .map(sql_row_to_mint_info)
205 .transpose()?)
206 }
207
208 #[instrument(skip(self))]
209 async fn get_mints(&self) -> Result<HashMap<MintUrl, Option<MintInfo>>, database::Error> {
210 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
211 Ok(query(
212 r#"
213 SELECT
214 name,
215 pubkey,
216 version,
217 description,
218 description_long,
219 contact,
220 nuts,
221 icon_url,
222 motd,
223 urls,
224 mint_time,
225 tos_url,
226 mint_url
227 FROM
228 mint
229 "#,
230 )?
231 .fetch_all(&*conn)
232 .await?
233 .into_iter()
234 .map(|mut row| {
235 let url = column_as_string!(
236 row.pop().ok_or(ConversionError::MissingColumn(0, 1))?,
237 MintUrl::from_str
238 );
239
240 Ok((url, sql_row_to_mint_info(row).ok()))
241 })
242 .collect::<Result<HashMap<_, _>, Error>>()?)
243 }
244
245 #[instrument(skip(self))]
246 async fn get_mint_keysets(
247 &self,
248 mint_url: MintUrl,
249 ) -> Result<Option<Vec<KeySetInfo>>, database::Error> {
250 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
251
252 let keysets = query(
253 r#"
254 SELECT
255 id,
256 unit,
257 active,
258 input_fee_ppk,
259 final_expiry
260 FROM
261 keyset
262 WHERE mint_url = :mint_url
263 "#,
264 )?
265 .bind("mint_url", mint_url.to_string())
266 .fetch_all(&*conn)
267 .await?
268 .into_iter()
269 .map(sql_row_to_keyset)
270 .collect::<Result<Vec<_>, Error>>()?;
271
272 match keysets.is_empty() {
273 false => Ok(Some(keysets)),
274 true => Ok(None),
275 }
276 }
277
278 #[instrument(skip(self), fields(keyset_id = %keyset_id))]
279 async fn get_keyset_by_id(
280 &self,
281 keyset_id: &Id,
282 ) -> Result<Option<KeySetInfo>, database::Error> {
283 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
284 query(
285 r#"
286 SELECT
287 id,
288 unit,
289 active,
290 input_fee_ppk,
291 final_expiry
292 FROM
293 keyset
294 WHERE id = :id
295 "#,
296 )?
297 .bind("id", keyset_id.to_string())
298 .fetch_one(&*conn)
299 .await?
300 .map(sql_row_to_keyset)
301 .transpose()
302 }
303
304 #[instrument(skip(self))]
305 async fn get_mint_quote(&self, quote_id: &str) -> Result<Option<MintQuote>, database::Error> {
306 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
307 query(
308 r#"
309 SELECT
310 id,
311 mint_url,
312 amount,
313 unit,
314 request,
315 state,
316 expiry,
317 secret_key,
318 payment_method,
319 amount_issued,
320 amount_paid,
321 used_by_operation,
322 version
323 FROM
324 mint_quote
325 WHERE
326 id = :id
327 "#,
328 )?
329 .bind("id", quote_id.to_string())
330 .fetch_one(&*conn)
331 .await?
332 .map(sql_row_to_mint_quote)
333 .transpose()
334 }
335
336 #[instrument(skip(self))]
337 async fn get_mint_quotes(&self) -> Result<Vec<MintQuote>, database::Error> {
338 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
339 Ok(query(
340 r#"
341 SELECT
342 id,
343 mint_url,
344 amount,
345 unit,
346 request,
347 state,
348 expiry,
349 secret_key,
350 payment_method,
351 amount_issued,
352 amount_paid,
353 used_by_operation,
354 version
355 FROM
356 mint_quote
357 "#,
358 )?
359 .fetch_all(&*conn)
360 .await?
361 .into_iter()
362 .map(sql_row_to_mint_quote)
363 .collect::<Result<_, _>>()?)
364 }
365
366 #[instrument(skip(self))]
367 async fn get_unissued_mint_quotes(&self) -> Result<Vec<MintQuote>, database::Error> {
368 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
369 Ok(query(
370 r#"
371 SELECT
372 id,
373 mint_url,
374 amount,
375 unit,
376 request,
377 state,
378 expiry,
379 secret_key,
380 payment_method,
381 amount_issued,
382 amount_paid,
383 used_by_operation,
384 version
385 FROM
386 mint_quote
387 WHERE
388 amount_issued = 0
389 OR
390 payment_method = 'bolt12'
391 "#,
392 )?
393 .fetch_all(&*conn)
394 .await?
395 .into_iter()
396 .map(sql_row_to_mint_quote)
397 .collect::<Result<_, _>>()?)
398 }
399
400 #[instrument(skip(self))]
401 async fn get_melt_quote(
402 &self,
403 quote_id: &str,
404 ) -> Result<Option<wallet::MeltQuote>, database::Error> {
405 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
406 query(
407 r#"
408 SELECT
409 id,
410 unit,
411 amount,
412 request,
413 fee_reserve,
414 state,
415 expiry,
416 payment_preimage,
417 payment_method,
418 used_by_operation,
419 version
420 FROM
421 melt_quote
422 WHERE
423 id=:id
424 "#,
425 )?
426 .bind("id", quote_id.to_owned())
427 .fetch_one(&*conn)
428 .await?
429 .map(sql_row_to_melt_quote)
430 .transpose()
431 }
432
433 #[instrument(skip(self), fields(keyset_id = %keyset_id))]
434 async fn get_keys(&self, keyset_id: &Id) -> Result<Option<Keys>, database::Error> {
435 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
436 query(
437 r#"
438 SELECT
439 keys
440 FROM key
441 WHERE id = :id
442 "#,
443 )?
444 .bind("id", keyset_id.to_string())
445 .pluck(&*conn)
446 .await?
447 .map(|keys| {
448 let keys = column_as_string!(keys);
449 serde_json::from_str(&keys).map_err(Error::from)
450 })
451 .transpose()
452 }
453
454 #[instrument(skip(self, state, spending_conditions))]
455 async fn get_proofs(
456 &self,
457 mint_url: Option<MintUrl>,
458 unit: Option<CurrencyUnit>,
459 state: Option<Vec<State>>,
460 spending_conditions: Option<Vec<SpendingConditions>>,
461 ) -> Result<Vec<ProofInfo>, database::Error> {
462 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
463 Ok(query(
464 r#"
465 SELECT
466 amount,
467 unit,
468 keyset_id,
469 secret,
470 c,
471 witness,
472 dleq_e,
473 dleq_s,
474 dleq_r,
475 y,
476 mint_url,
477 state,
478 spending_condition,
479 used_by_operation,
480 created_by_operation
481 FROM proof
482 "#,
483 )?
484 .fetch_all(&*conn)
485 .await?
486 .into_iter()
487 .filter_map(|row| {
488 let row = sql_row_to_proof_info(row).ok()?;
489
490 if row.matches_conditions(&mint_url, &unit, &state, &spending_conditions) {
491 Some(row)
492 } else {
493 None
494 }
495 })
496 .collect::<Vec<_>>())
497 }
498
499 #[instrument(skip(self, ys))]
500 async fn get_proofs_by_ys(
501 &self,
502 ys: Vec<PublicKey>,
503 ) -> Result<Vec<ProofInfo>, database::Error> {
504 if ys.is_empty() {
505 return Ok(Vec::new());
506 }
507
508 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
509 Ok(query(
510 r#"
511 SELECT
512 amount,
513 unit,
514 keyset_id,
515 secret,
516 c,
517 witness,
518 dleq_e,
519 dleq_s,
520 dleq_r,
521 y,
522 mint_url,
523 state,
524 spending_condition,
525 used_by_operation,
526 created_by_operation
527 FROM proof
528 WHERE y IN (:ys)
529 "#,
530 )?
531 .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
532 .fetch_all(&*conn)
533 .await?
534 .into_iter()
535 .filter_map(|row| sql_row_to_proof_info(row).ok())
536 .collect::<Vec<_>>())
537 }
538
539 async fn get_balance(
540 &self,
541 mint_url: Option<MintUrl>,
542 unit: Option<CurrencyUnit>,
543 states: Option<Vec<State>>,
544 ) -> Result<u64, database::Error> {
545 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
546
547 let mut query_str = "SELECT COALESCE(SUM(amount), 0) as total FROM proof".to_string();
548 let mut where_clauses = Vec::new();
549 let states = states
550 .unwrap_or_default()
551 .into_iter()
552 .map(|x| x.to_string())
553 .collect::<Vec<_>>();
554
555 if mint_url.is_some() {
556 where_clauses.push("mint_url = :mint_url");
557 }
558 if unit.is_some() {
559 where_clauses.push("unit = :unit");
560 }
561 if !states.is_empty() {
562 where_clauses.push("state IN (:states)");
563 }
564
565 if !where_clauses.is_empty() {
566 query_str.push_str(" WHERE ");
567 query_str.push_str(&where_clauses.join(" AND "));
568 }
569
570 let mut q = query(&query_str)?;
571
572 if let Some(ref mint_url) = mint_url {
573 q = q.bind("mint_url", mint_url.to_string());
574 }
575 if let Some(ref unit) = unit {
576 q = q.bind("unit", unit.to_string());
577 }
578
579 if !states.is_empty() {
580 q = q.bind_vec("states", states);
581 }
582
583 let balance = q
584 .pluck(&*conn)
585 .await?
586 .map(|n| {
587 match n {
589 crate::stmt::Column::Integer(i) => Ok(i as u64),
590 crate::stmt::Column::Real(f) => Ok(f as u64),
591 _ => Err(Error::Database(Box::new(std::io::Error::new(
592 std::io::ErrorKind::InvalidData,
593 "Invalid balance type",
594 )))),
595 }
596 })
597 .transpose()?
598 .unwrap_or(0);
599
600 Ok(balance)
601 }
602
603 #[instrument(skip(self))]
604 async fn get_transaction(
605 &self,
606 transaction_id: TransactionId,
607 ) -> Result<Option<Transaction>, database::Error> {
608 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
609 Ok(query(
610 r#"
611 SELECT
612 mint_url,
613 direction,
614 unit,
615 amount,
616 fee,
617 ys,
618 timestamp,
619 memo,
620 metadata,
621 quote_id,
622 payment_request,
623 payment_proof,
624 payment_method,
625 saga_id
626 FROM
627 transactions
628 WHERE
629 id = :id
630 "#,
631 )?
632 .bind("id", transaction_id.as_slice().to_vec())
633 .fetch_one(&*conn)
634 .await?
635 .map(sql_row_to_transaction)
636 .transpose()?)
637 }
638
639 #[instrument(skip(self))]
640 async fn list_transactions(
641 &self,
642 mint_url: Option<MintUrl>,
643 direction: Option<TransactionDirection>,
644 unit: Option<CurrencyUnit>,
645 ) -> Result<Vec<Transaction>, database::Error> {
646 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
647
648 Ok(query(
649 r#"
650 SELECT
651 mint_url,
652 direction,
653 unit,
654 amount,
655 fee,
656 ys,
657 timestamp,
658 memo,
659 metadata,
660 quote_id,
661 payment_request,
662 payment_proof,
663 payment_method,
664 saga_id
665 FROM
666 transactions
667 "#,
668 )?
669 .fetch_all(&*conn)
670 .await?
671 .into_iter()
672 .filter_map(|row| {
673 let transaction = sql_row_to_transaction(row).ok()?;
675 if transaction.matches_conditions(&mint_url, &direction, &unit) {
676 Some(transaction)
677 } else {
678 None
679 }
680 })
681 .collect::<Vec<_>>())
682 }
683
684 #[instrument(skip(self))]
685 async fn update_proofs(
686 &self,
687 added: Vec<ProofInfo>,
688 removed_ys: Vec<PublicKey>,
689 ) -> Result<(), database::Error> {
690 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
691 let tx = ConnectionWithTransaction::new(conn).await?;
692
693 for proof in added {
694 query(
695 r#"
696 INSERT INTO proof
697 (y, mint_url, state, spending_condition, unit, amount, keyset_id, secret, c, witness, dleq_e, dleq_s, dleq_r, used_by_operation, created_by_operation)
698 VALUES
699 (:y, :mint_url, :state, :spending_condition, :unit, :amount, :keyset_id, :secret, :c, :witness, :dleq_e, :dleq_s, :dleq_r, :used_by_operation, :created_by_operation)
700 ON CONFLICT(y) DO UPDATE SET
701 mint_url = excluded.mint_url,
702 state = excluded.state,
703 spending_condition = excluded.spending_condition,
704 unit = excluded.unit,
705 amount = excluded.amount,
706 keyset_id = excluded.keyset_id,
707 secret = excluded.secret,
708 c = excluded.c,
709 witness = excluded.witness,
710 dleq_e = excluded.dleq_e,
711 dleq_s = excluded.dleq_s,
712 dleq_r = excluded.dleq_r,
713 used_by_operation = excluded.used_by_operation,
714 created_by_operation = excluded.created_by_operation
715 ;
716 "#,
717 )?
718 .bind("y", proof.y.to_bytes().to_vec())
719 .bind("mint_url", proof.mint_url.to_string())
720 .bind("state", proof.state.to_string())
721 .bind(
722 "spending_condition",
723 proof
724 .spending_condition
725 .map(|s| serde_json::to_string(&s).ok()),
726 )
727 .bind("unit", proof.unit.to_string())
728 .bind("amount", u64::from(proof.proof.amount) as i64)
729 .bind("keyset_id", proof.proof.keyset_id.to_string())
730 .bind("secret", proof.proof.secret.to_string())
731 .bind("c", proof.proof.c.to_bytes().to_vec())
732 .bind(
733 "witness",
734 proof
735 .proof
736 .witness
737 .and_then(|w| serde_json::to_string(&w).ok()),
738 )
739 .bind(
740 "dleq_e",
741 proof.proof.dleq.as_ref().map(|dleq| dleq.e.to_secret_bytes().to_vec()),
742 )
743 .bind(
744 "dleq_s",
745 proof.proof.dleq.as_ref().map(|dleq| dleq.s.to_secret_bytes().to_vec()),
746 )
747 .bind(
748 "dleq_r",
749 proof.proof.dleq.as_ref().map(|dleq| dleq.r.to_secret_bytes().to_vec()),
750 )
751 .bind("used_by_operation", proof.used_by_operation.map(|id| id.to_string()))
752 .bind("created_by_operation", proof.created_by_operation.map(|id| id.to_string()))
753 .execute(&tx)
754 .await?;
755 }
756
757 if !removed_ys.is_empty() {
758 query(r#"DELETE FROM proof WHERE y IN (:ys)"#)?
759 .bind_vec(
760 "ys",
761 removed_ys.iter().map(|y| y.to_bytes().to_vec()).collect(),
762 )
763 .execute(&tx)
764 .await?;
765 }
766
767 tx.commit().await?;
768
769 Ok(())
770 }
771
772 #[instrument(skip(self))]
773 async fn update_proofs_state(
774 &self,
775 ys: Vec<PublicKey>,
776 state: State,
777 ) -> Result<(), database::Error> {
778 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
779
780 query("UPDATE proof SET state = :state WHERE y IN (:ys)")?
781 .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
782 .bind("state", state.to_string())
783 .execute(&*conn)
784 .await?;
785
786 Ok(())
787 }
788
789 #[instrument(skip(self))]
790 async fn add_transaction(&self, transaction: Transaction) -> Result<(), database::Error> {
791 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
792
793 let mint_url = transaction.mint_url.to_string();
794 let direction = transaction.direction.to_string();
795 let unit = transaction.unit.to_string();
796 let amount = u64::from(transaction.amount) as i64;
797 let fee = u64::from(transaction.fee) as i64;
798 let ys = transaction
799 .ys
800 .iter()
801 .flat_map(|y| y.to_bytes().to_vec())
802 .collect::<Vec<_>>();
803
804 let id = transaction.id();
805
806 query(
807 r#"
808 INSERT INTO transactions
809 (id, mint_url, direction, unit, amount, fee, ys, timestamp, memo, metadata, quote_id, payment_request, payment_proof, payment_method, saga_id)
810 VALUES
811 (:id, :mint_url, :direction, :unit, :amount, :fee, :ys, :timestamp, :memo, :metadata, :quote_id, :payment_request, :payment_proof, :payment_method, :saga_id)
812 ON CONFLICT(id) DO UPDATE SET
813 mint_url = excluded.mint_url,
814 direction = excluded.direction,
815 unit = excluded.unit,
816 amount = excluded.amount,
817 fee = excluded.fee,
818 timestamp = excluded.timestamp,
819 memo = excluded.memo,
820 metadata = excluded.metadata,
821 quote_id = excluded.quote_id,
822 payment_request = excluded.payment_request,
823 payment_proof = excluded.payment_proof,
824 payment_method = excluded.payment_method,
825 saga_id = excluded.saga_id
826 ;
827 "#,
828 )?
829 .bind("id", id.as_slice().to_vec())
830 .bind("mint_url", mint_url)
831 .bind("direction", direction)
832 .bind("unit", unit)
833 .bind("amount", amount)
834 .bind("fee", fee)
835 .bind("ys", ys)
836 .bind("timestamp", transaction.timestamp as i64)
837 .bind("memo", transaction.memo)
838 .bind(
839 "metadata",
840 serde_json::to_string(&transaction.metadata).map_err(Error::from)?,
841 )
842 .bind("quote_id", transaction.quote_id)
843 .bind("payment_request", transaction.payment_request)
844 .bind("payment_proof", transaction.payment_proof)
845 .bind("payment_method", transaction.payment_method.map(|pm| pm.to_string()))
846 .bind("saga_id", transaction.saga_id.map(|id| id.to_string()))
847 .execute(&*conn)
848 .await?;
849
850 Ok(())
851 }
852
853 #[instrument(skip(self))]
854 async fn update_mint_url(
855 &self,
856 old_mint_url: MintUrl,
857 new_mint_url: MintUrl,
858 ) -> Result<(), database::Error> {
859 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
860 let tx = ConnectionWithTransaction::new(conn).await?;
861 let tables = ["mint_quote", "proof"];
862
863 for table in &tables {
864 query(&format!(
865 r#"
866 UPDATE {table}
867 SET mint_url = :new_mint_url
868 WHERE mint_url = :old_mint_url
869 "#
870 ))?
871 .bind("new_mint_url", new_mint_url.to_string())
872 .bind("old_mint_url", old_mint_url.to_string())
873 .execute(&tx)
874 .await?;
875 }
876
877 tx.commit().await?;
878
879 Ok(())
880 }
881
882 #[instrument(skip(self), fields(keyset_id = %keyset_id))]
883 async fn increment_keyset_counter(
884 &self,
885 keyset_id: &Id,
886 count: u32,
887 ) -> Result<u32, database::Error> {
888 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
889
890 let new_counter = query(
891 r#"
892 INSERT INTO keyset_counter (keyset_id, counter)
893 VALUES (:keyset_id, :count)
894 ON CONFLICT(keyset_id) DO UPDATE SET
895 counter = keyset_counter.counter + :count
896 RETURNING counter
897 "#,
898 )?
899 .bind("keyset_id", keyset_id.to_string())
900 .bind("count", count)
901 .pluck(&*conn)
902 .await?
903 .map(|n| Ok::<_, Error>(column_as_number!(n)))
904 .transpose()?
905 .ok_or_else(|| Error::Internal("Counter update returned no value".to_owned()))?;
906
907 Ok(new_counter)
908 }
909
910 #[instrument(skip(self, mint_info))]
911 async fn add_mint(
912 &self,
913 mint_url: MintUrl,
914 mint_info: Option<MintInfo>,
915 ) -> Result<(), database::Error> {
916 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
917
918 let (
919 name,
920 pubkey,
921 version,
922 description,
923 description_long,
924 contact,
925 nuts,
926 icon_url,
927 urls,
928 motd,
929 time,
930 tos_url,
931 ) = match mint_info {
932 Some(mint_info) => {
933 let MintInfo {
934 name,
935 pubkey,
936 version,
937 description,
938 description_long,
939 contact,
940 nuts,
941 icon_url,
942 urls,
943 motd,
944 time,
945 tos_url,
946 } = mint_info;
947
948 (
949 name,
950 pubkey.map(|p| p.to_bytes().to_vec()),
951 version.map(|v| serde_json::to_string(&v).ok()),
952 description,
953 description_long,
954 contact.map(|c| serde_json::to_string(&c).ok()),
955 serde_json::to_string(&nuts).ok(),
956 icon_url,
957 urls.map(|c| serde_json::to_string(&c).ok()),
958 motd,
959 time,
960 tos_url,
961 )
962 }
963 None => (
964 None, None, None, None, None, None, None, None, None, None, None, None,
965 ),
966 };
967
968 query(
969 r#"
970 INSERT INTO mint
971 (
972 mint_url, name, pubkey, version, description, description_long,
973 contact, nuts, icon_url, urls, motd, mint_time, tos_url
974 )
975 VALUES
976 (
977 :mint_url, :name, :pubkey, :version, :description, :description_long,
978 :contact, :nuts, :icon_url, :urls, :motd, :mint_time, :tos_url
979 )
980 ON CONFLICT(mint_url) DO UPDATE SET
981 name = excluded.name,
982 pubkey = excluded.pubkey,
983 version = excluded.version,
984 description = excluded.description,
985 description_long = excluded.description_long,
986 contact = excluded.contact,
987 nuts = excluded.nuts,
988 icon_url = excluded.icon_url,
989 urls = excluded.urls,
990 motd = excluded.motd,
991 mint_time = excluded.mint_time,
992 tos_url = excluded.tos_url
993 ;
994 "#,
995 )?
996 .bind("mint_url", mint_url.to_string())
997 .bind("name", name)
998 .bind("pubkey", pubkey)
999 .bind("version", version)
1000 .bind("description", description)
1001 .bind("description_long", description_long)
1002 .bind("contact", contact)
1003 .bind("nuts", nuts)
1004 .bind("icon_url", icon_url)
1005 .bind("urls", urls)
1006 .bind("motd", motd)
1007 .bind("mint_time", time.map(|v| v as i64))
1008 .bind("tos_url", tos_url)
1009 .execute(&*conn)
1010 .await?;
1011
1012 Ok(())
1013 }
1014
1015 #[instrument(skip(self))]
1016 async fn remove_mint(&self, mint_url: MintUrl) -> Result<(), database::Error> {
1017 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1018
1019 query(r#"DELETE FROM mint WHERE mint_url=:mint_url"#)?
1020 .bind("mint_url", mint_url.to_string())
1021 .execute(&*conn)
1022 .await?;
1023
1024 Ok(())
1025 }
1026
1027 #[instrument(skip(self, keysets))]
1028 async fn add_mint_keysets(
1029 &self,
1030 mint_url: MintUrl,
1031 keysets: Vec<KeySetInfo>,
1032 ) -> Result<(), database::Error> {
1033 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1034 let tx = ConnectionWithTransaction::new(conn).await?;
1035
1036 for keyset in keysets {
1037 query(
1038 r#"
1039 INSERT INTO keyset
1040 (mint_url, id, unit, active, input_fee_ppk, final_expiry, keyset_u32)
1041 VALUES
1042 (:mint_url, :id, :unit, :active, :input_fee_ppk, :final_expiry, :keyset_u32)
1043 ON CONFLICT(id) DO UPDATE SET
1044 active = excluded.active,
1045 input_fee_ppk = excluded.input_fee_ppk
1046 "#,
1047 )?
1048 .bind("mint_url", mint_url.to_string())
1049 .bind("id", keyset.id.to_string())
1050 .bind("unit", keyset.unit.to_string())
1051 .bind("active", keyset.active)
1052 .bind("input_fee_ppk", keyset.input_fee_ppk as i64)
1053 .bind("final_expiry", keyset.final_expiry.map(|v| v as i64))
1054 .bind("keyset_u32", u32::from(keyset.id))
1055 .execute(&tx)
1056 .await?;
1057 }
1058
1059 tx.commit().await?;
1060
1061 Ok(())
1062 }
1063
1064 #[instrument(skip_all)]
1065 async fn add_mint_quote(&self, quote: MintQuote) -> Result<(), database::Error> {
1066 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1067
1068 let expected_version = quote.version;
1069 let new_version = expected_version.wrapping_add(1);
1070
1071 let rows_affected = query(
1072 r#"
1073 INSERT INTO mint_quote
1074 (id, mint_url, amount, unit, request, state, expiry, secret_key, payment_method, amount_issued, amount_paid, version)
1075 VALUES
1076 (:id, :mint_url, :amount, :unit, :request, :state, :expiry, :secret_key, :payment_method, :amount_issued, :amount_paid, :version)
1077 ON CONFLICT(id) DO UPDATE SET
1078 mint_url = excluded.mint_url,
1079 amount = excluded.amount,
1080 unit = excluded.unit,
1081 request = excluded.request,
1082 state = excluded.state,
1083 expiry = excluded.expiry,
1084 secret_key = excluded.secret_key,
1085 payment_method = excluded.payment_method,
1086 amount_issued = excluded.amount_issued,
1087 amount_paid = excluded.amount_paid,
1088 version = :new_version
1089 WHERE mint_quote.version = :expected_version
1090 ;
1091 "#,
1092 )?
1093 .bind("id", quote.id.to_string())
1094 .bind("mint_url", quote.mint_url.to_string())
1095 .bind("amount", quote.amount.map(|a| a.to_i64()))
1096 .bind("unit", quote.unit.to_string())
1097 .bind("request", quote.request)
1098 .bind("state", quote.state.to_string())
1099 .bind("expiry", quote.expiry as i64)
1100 .bind("secret_key", quote.secret_key.map(|p| p.to_string()))
1101 .bind("payment_method", quote.payment_method.to_string())
1102 .bind("amount_issued", quote.amount_issued.to_i64())
1103 .bind("amount_paid", quote.amount_paid.to_i64())
1104 .bind("version", quote.version as i64)
1105 .bind("new_version", new_version as i64)
1106 .bind("expected_version", expected_version as i64)
1107 .execute(&*conn).await?;
1108
1109 if rows_affected == 0 {
1110 return Err(database::Error::ConcurrentUpdate);
1111 }
1112
1113 Ok(())
1114 }
1115
1116 #[instrument(skip(self))]
1117 async fn remove_mint_quote(&self, quote_id: &str) -> Result<(), database::Error> {
1118 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1119
1120 query(r#"DELETE FROM mint_quote WHERE id=:id"#)?
1121 .bind("id", quote_id.to_string())
1122 .execute(&*conn)
1123 .await?;
1124
1125 Ok(())
1126 }
1127
1128 #[instrument(skip_all)]
1129 async fn add_melt_quote(&self, quote: wallet::MeltQuote) -> Result<(), database::Error> {
1130 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1131
1132 let expected_version = quote.version;
1133 let new_version = expected_version.wrapping_add(1);
1134
1135 let rows_affected = query(
1136 r#"
1137 INSERT INTO melt_quote
1138 (id, unit, amount, request, fee_reserve, state, expiry, payment_method, version)
1139 VALUES
1140 (:id, :unit, :amount, :request, :fee_reserve, :state, :expiry, :payment_method, :version)
1141 ON CONFLICT(id) DO UPDATE SET
1142 unit = excluded.unit,
1143 amount = excluded.amount,
1144 request = excluded.request,
1145 fee_reserve = excluded.fee_reserve,
1146 state = excluded.state,
1147 expiry = excluded.expiry,
1148 payment_method = excluded.payment_method,
1149 version = :new_version
1150 WHERE melt_quote.version = :expected_version
1151 ;
1152 "#,
1153 )?
1154 .bind("id", quote.id.to_string())
1155 .bind("unit", quote.unit.to_string())
1156 .bind("amount", u64::from(quote.amount) as i64)
1157 .bind("request", quote.request)
1158 .bind("fee_reserve", u64::from(quote.fee_reserve) as i64)
1159 .bind("state", quote.state.to_string())
1160 .bind("expiry", quote.expiry as i64)
1161 .bind("payment_method", quote.payment_method.to_string())
1162 .bind("version", quote.version as i64)
1163 .bind("new_version", new_version as i64)
1164 .bind("expected_version", expected_version as i64)
1165 .execute(&*conn)
1166 .await?;
1167
1168 if rows_affected == 0 {
1169 return Err(database::Error::ConcurrentUpdate);
1170 }
1171
1172 Ok(())
1173 }
1174
1175 #[instrument(skip(self))]
1176 async fn remove_melt_quote(&self, quote_id: &str) -> Result<(), database::Error> {
1177 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1178
1179 query(r#"DELETE FROM melt_quote WHERE id=:id"#)?
1180 .bind("id", quote_id.to_owned())
1181 .execute(&*conn)
1182 .await?;
1183
1184 Ok(())
1185 }
1186
1187 #[instrument(skip_all)]
1188 async fn add_keys(&self, keyset: KeySet) -> Result<(), database::Error> {
1189 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1190
1191 keyset.verify_id()?;
1192
1193 query(
1194 r#"
1195 INSERT INTO key
1196 (id, keys, keyset_u32)
1197 VALUES
1198 (:id, :keys, :keyset_u32)
1199 "#,
1200 )?
1201 .bind("id", keyset.id.to_string())
1202 .bind(
1203 "keys",
1204 serde_json::to_string(&keyset.keys).map_err(Error::from)?,
1205 )
1206 .bind("keyset_u32", u32::from(keyset.id))
1207 .execute(&*conn)
1208 .await?;
1209
1210 Ok(())
1211 }
1212
1213 #[instrument(skip(self))]
1214 async fn remove_keys(&self, id: &Id) -> Result<(), database::Error> {
1215 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1216
1217 query(r#"DELETE FROM key WHERE id = :id"#)?
1218 .bind("id", id.to_string())
1219 .execute(&*conn)
1220 .await?;
1221
1222 Ok(())
1223 }
1224
1225 #[instrument(skip(self))]
1226 async fn remove_transaction(
1227 &self,
1228 transaction_id: TransactionId,
1229 ) -> Result<(), database::Error> {
1230 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1231
1232 query(r#"DELETE FROM transactions WHERE id=:id"#)?
1233 .bind("id", transaction_id.as_slice().to_vec())
1234 .execute(&*conn)
1235 .await?;
1236
1237 Ok(())
1238 }
1239
1240 #[instrument(skip(self))]
1241 async fn add_saga(&self, saga: wallet::WalletSaga) -> Result<(), database::Error> {
1242 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1243
1244 let state_json = serde_json::to_string(&saga.state).map_err(|e| {
1245 Error::Database(Box::new(std::io::Error::new(
1246 std::io::ErrorKind::InvalidData,
1247 format!("Failed to serialize saga state: {}", e),
1248 )))
1249 })?;
1250
1251 let data_json = serde_json::to_string(&saga.data).map_err(|e| {
1252 Error::Database(Box::new(std::io::Error::new(
1253 std::io::ErrorKind::InvalidData,
1254 format!("Failed to serialize saga data: {}", e),
1255 )))
1256 })?;
1257
1258 query(
1259 r#"
1260 INSERT INTO wallet_sagas
1261 (id, kind, state, amount, mint_url, unit, quote_id, created_at, updated_at, data, version)
1262 VALUES
1263 (:id, :kind, :state, :amount, :mint_url, :unit, :quote_id, :created_at, :updated_at, :data, :version)
1264 "#,
1265 )?
1266 .bind("id", saga.id.to_string())
1267 .bind("kind", saga.kind.to_string())
1268 .bind("state", state_json)
1269 .bind("amount", u64::from(saga.amount) as i64)
1270 .bind("mint_url", saga.mint_url.to_string())
1271 .bind("unit", saga.unit.to_string())
1272 .bind("quote_id", saga.quote_id)
1273 .bind("created_at", saga.created_at as i64)
1274 .bind("updated_at", saga.updated_at as i64)
1275 .bind("data", data_json)
1276 .bind("version", saga.version as i64)
1277 .execute(&*conn)
1278 .await?;
1279
1280 Ok(())
1281 }
1282
1283 #[instrument(skip(self))]
1284 async fn get_saga(
1285 &self,
1286 id: &uuid::Uuid,
1287 ) -> Result<Option<wallet::WalletSaga>, database::Error> {
1288 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1289
1290 let rows = query(
1291 r#"
1292 SELECT id, kind, state, amount, mint_url, unit, quote_id, created_at, updated_at, data, version
1293 FROM wallet_sagas
1294 WHERE id = :id
1295 "#,
1296 )?
1297 .bind("id", id.to_string())
1298 .fetch_all(&*conn)
1299 .await?;
1300
1301 match rows.into_iter().next() {
1302 Some(row) => Ok(Some(sql_row_to_wallet_saga(row)?)),
1303 None => Ok(None),
1304 }
1305 }
1306
1307 #[instrument(skip(self))]
1308 async fn update_saga(&self, saga: wallet::WalletSaga) -> Result<bool, database::Error> {
1309 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1310
1311 let state_json = serde_json::to_string(&saga.state).map_err(|e| {
1312 Error::Database(Box::new(std::io::Error::new(
1313 std::io::ErrorKind::InvalidData,
1314 format!("Failed to serialize saga state: {}", e),
1315 )))
1316 })?;
1317
1318 let data_json = serde_json::to_string(&saga.data).map_err(|e| {
1319 Error::Database(Box::new(std::io::Error::new(
1320 std::io::ErrorKind::InvalidData,
1321 format!("Failed to serialize saga data: {}", e),
1322 )))
1323 })?;
1324
1325 let expected_version = saga.version.saturating_sub(1);
1329
1330 let rows_affected = query(
1331 r#"
1332 UPDATE wallet_sagas
1333 SET kind = :kind, state = :state, amount = :amount, mint_url = :mint_url,
1334 unit = :unit, quote_id = :quote_id, updated_at = :updated_at, data = :data,
1335 version = :new_version
1336 WHERE id = :id AND version = :expected_version
1337 "#,
1338 )?
1339 .bind("id", saga.id.to_string())
1340 .bind("kind", saga.kind.to_string())
1341 .bind("state", state_json)
1342 .bind("amount", u64::from(saga.amount) as i64)
1343 .bind("mint_url", saga.mint_url.to_string())
1344 .bind("unit", saga.unit.to_string())
1345 .bind("quote_id", saga.quote_id)
1346 .bind("updated_at", saga.updated_at as i64)
1347 .bind("data", data_json)
1348 .bind("new_version", saga.version as i64)
1349 .bind("expected_version", expected_version as i64)
1350 .execute(&*conn)
1351 .await?;
1352
1353 Ok(rows_affected > 0)
1355 }
1356
1357 #[instrument(skip(self))]
1358 async fn delete_saga(&self, id: &uuid::Uuid) -> Result<(), database::Error> {
1359 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1360
1361 query(r#"DELETE FROM wallet_sagas WHERE id = :id"#)?
1362 .bind("id", id.to_string())
1363 .execute(&*conn)
1364 .await?;
1365
1366 Ok(())
1367 }
1368
1369 #[instrument(skip(self))]
1370 async fn get_incomplete_sagas(&self) -> Result<Vec<wallet::WalletSaga>, database::Error> {
1371 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1372
1373 let rows = query(
1374 r#"
1375 SELECT id, kind, state, amount, mint_url, unit, quote_id, created_at, updated_at, data, version
1376 FROM wallet_sagas
1377 ORDER BY created_at ASC
1378 "#,
1379 )?
1380 .fetch_all(&*conn)
1381 .await?;
1382
1383 rows.into_iter().map(sql_row_to_wallet_saga).collect()
1384 }
1385
1386 #[instrument(skip(self))]
1387 async fn reserve_proofs(
1388 &self,
1389 ys: Vec<PublicKey>,
1390 operation_id: &uuid::Uuid,
1391 ) -> Result<(), database::Error> {
1392 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1393
1394 for y in ys {
1395 let rows_affected = query(
1396 r#"
1397 UPDATE proof
1398 SET state = 'RESERVED', used_by_operation = :operation_id
1399 WHERE y = :y AND state = 'UNSPENT'
1400 "#,
1401 )?
1402 .bind("y", y.to_bytes().to_vec())
1403 .bind("operation_id", operation_id.to_string())
1404 .execute(&*conn)
1405 .await?;
1406
1407 if rows_affected == 0 {
1408 return Err(database::Error::ProofNotUnspent);
1409 }
1410 }
1411
1412 Ok(())
1413 }
1414
1415 #[instrument(skip(self))]
1416 async fn release_proofs(&self, operation_id: &uuid::Uuid) -> Result<(), database::Error> {
1417 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1418
1419 query(
1420 r#"
1421 UPDATE proof
1422 SET state = 'UNSPENT', used_by_operation = NULL
1423 WHERE used_by_operation = :operation_id
1424 "#,
1425 )?
1426 .bind("operation_id", operation_id.to_string())
1427 .execute(&*conn)
1428 .await?;
1429
1430 Ok(())
1431 }
1432
1433 #[instrument(skip(self))]
1434 async fn get_reserved_proofs(
1435 &self,
1436 operation_id: &uuid::Uuid,
1437 ) -> Result<Vec<ProofInfo>, database::Error> {
1438 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1439
1440 let rows = query(
1441 r#"
1442 SELECT
1443 amount,
1444 unit,
1445 keyset_id,
1446 secret,
1447 c,
1448 witness,
1449 dleq_e,
1450 dleq_s,
1451 dleq_r,
1452 y,
1453 mint_url,
1454 state,
1455 spending_condition,
1456 used_by_operation,
1457 created_by_operation
1458 FROM proof
1459 WHERE used_by_operation = :operation_id
1460 "#,
1461 )?
1462 .bind("operation_id", operation_id.to_string())
1463 .fetch_all(&*conn)
1464 .await?;
1465
1466 rows.into_iter().map(sql_row_to_proof_info).collect()
1467 }
1468
1469 #[instrument(skip(self))]
1470 async fn reserve_melt_quote(
1471 &self,
1472 quote_id: &str,
1473 operation_id: &uuid::Uuid,
1474 ) -> Result<(), database::Error> {
1475 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1476
1477 let rows_affected = query(
1478 r#"
1479 UPDATE melt_quote
1480 SET used_by_operation = :operation_id
1481 WHERE id = :quote_id AND used_by_operation IS NULL
1482 "#,
1483 )?
1484 .bind("operation_id", operation_id.to_string())
1485 .bind("quote_id", quote_id)
1486 .execute(&*conn)
1487 .await?;
1488
1489 if rows_affected == 0 {
1490 let exists = query(
1492 r#"
1493 SELECT 1 FROM melt_quote WHERE id = :quote_id
1494 "#,
1495 )?
1496 .bind("quote_id", quote_id)
1497 .fetch_one(&*conn)
1498 .await?;
1499
1500 if exists.is_none() {
1501 return Err(database::Error::UnknownQuote);
1502 }
1503 return Err(database::Error::QuoteAlreadyInUse);
1504 }
1505
1506 Ok(())
1507 }
1508
1509 #[instrument(skip(self))]
1510 async fn release_melt_quote(&self, operation_id: &uuid::Uuid) -> Result<(), database::Error> {
1511 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1512
1513 query(
1514 r#"
1515 UPDATE melt_quote
1516 SET used_by_operation = NULL
1517 WHERE used_by_operation = :operation_id
1518 "#,
1519 )?
1520 .bind("operation_id", operation_id.to_string())
1521 .execute(&*conn)
1522 .await?;
1523
1524 Ok(())
1525 }
1526
1527 #[instrument(skip(self))]
1528 async fn reserve_mint_quote(
1529 &self,
1530 quote_id: &str,
1531 operation_id: &uuid::Uuid,
1532 ) -> Result<(), database::Error> {
1533 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1534
1535 let rows_affected = query(
1536 r#"
1537 UPDATE mint_quote
1538 SET used_by_operation = :operation_id
1539 WHERE id = :quote_id AND used_by_operation IS NULL
1540 "#,
1541 )?
1542 .bind("operation_id", operation_id.to_string())
1543 .bind("quote_id", quote_id)
1544 .execute(&*conn)
1545 .await?;
1546
1547 if rows_affected == 0 {
1548 let exists = query(
1550 r#"
1551 SELECT 1 FROM mint_quote WHERE id = :quote_id
1552 "#,
1553 )?
1554 .bind("quote_id", quote_id)
1555 .fetch_one(&*conn)
1556 .await?;
1557
1558 if exists.is_none() {
1559 return Err(database::Error::UnknownQuote);
1560 }
1561 return Err(database::Error::QuoteAlreadyInUse);
1562 }
1563
1564 Ok(())
1565 }
1566
1567 #[instrument(skip(self))]
1568 async fn release_mint_quote(&self, operation_id: &uuid::Uuid) -> Result<(), database::Error> {
1569 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1570
1571 query(
1572 r#"
1573 UPDATE mint_quote
1574 SET used_by_operation = NULL
1575 WHERE used_by_operation = :operation_id
1576 "#,
1577 )?
1578 .bind("operation_id", operation_id.to_string())
1579 .execute(&*conn)
1580 .await?;
1581
1582 Ok(())
1583 }
1584
1585 async fn kv_read(
1586 &self,
1587 primary_namespace: &str,
1588 secondary_namespace: &str,
1589 key: &str,
1590 ) -> Result<Option<Vec<u8>>, database::Error> {
1591 crate::keyvalue::kv_read(&self.pool, primary_namespace, secondary_namespace, key).await
1592 }
1593
1594 async fn kv_list(
1595 &self,
1596 primary_namespace: &str,
1597 secondary_namespace: &str,
1598 ) -> Result<Vec<String>, database::Error> {
1599 crate::keyvalue::kv_list(&self.pool, primary_namespace, secondary_namespace).await
1600 }
1601
1602 async fn kv_write(
1603 &self,
1604 primary_namespace: &str,
1605 secondary_namespace: &str,
1606 key: &str,
1607 value: &[u8],
1608 ) -> Result<(), database::Error> {
1609 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1610 crate::keyvalue::kv_write_standalone(
1611 &*conn,
1612 primary_namespace,
1613 secondary_namespace,
1614 key,
1615 value,
1616 )
1617 .await?;
1618 Ok(())
1619 }
1620
1621 async fn kv_remove(
1622 &self,
1623 primary_namespace: &str,
1624 secondary_namespace: &str,
1625 key: &str,
1626 ) -> Result<(), database::Error> {
1627 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1628 crate::keyvalue::kv_remove_standalone(&*conn, primary_namespace, secondary_namespace, key)
1629 .await?;
1630 Ok(())
1631 }
1632}
1633
1634fn sql_row_to_mint_info(row: Vec<Column>) -> Result<MintInfo, Error> {
1635 unpack_into!(
1636 let (
1637 name,
1638 pubkey,
1639 version,
1640 description,
1641 description_long,
1642 contact,
1643 nuts,
1644 icon_url,
1645 motd,
1646 urls,
1647 mint_time,
1648 tos_url
1649 ) = row
1650 );
1651
1652 Ok(MintInfo {
1653 name: column_as_nullable_string!(&name),
1654 pubkey: column_as_nullable_string!(&pubkey, |v| serde_json::from_str(v).ok(), |v| {
1655 serde_json::from_slice(v).ok()
1656 }),
1657 version: column_as_nullable_string!(&version).and_then(|v| serde_json::from_str(&v).ok()),
1658 description: column_as_nullable_string!(description),
1659 description_long: column_as_nullable_string!(description_long),
1660 contact: column_as_nullable_string!(contact, |v| serde_json::from_str(&v).ok()),
1661 nuts: column_as_nullable_string!(nuts, |v| serde_json::from_str(&v).ok())
1662 .unwrap_or_default(),
1663 urls: column_as_nullable_string!(urls, |v| serde_json::from_str(&v).ok()),
1664 icon_url: column_as_nullable_string!(icon_url),
1665 motd: column_as_nullable_string!(motd),
1666 time: column_as_nullable_number!(mint_time).map(|t| t),
1667 tos_url: column_as_nullable_string!(tos_url),
1668 })
1669}
1670
1671#[instrument(skip_all)]
1672fn sql_row_to_keyset(row: Vec<Column>) -> Result<KeySetInfo, Error> {
1673 unpack_into!(
1674 let (
1675 id,
1676 unit,
1677 active,
1678 input_fee_ppk,
1679 final_expiry
1680 ) = row
1681 );
1682
1683 Ok(KeySetInfo {
1684 id: column_as_string!(id, Id::from_str, Id::from_bytes),
1685 unit: column_as_string!(unit, CurrencyUnit::from_str),
1686 active: matches!(active, Column::Integer(1)),
1687 input_fee_ppk: column_as_nullable_number!(input_fee_ppk).unwrap_or(0),
1688 final_expiry: column_as_nullable_number!(final_expiry),
1689 })
1690}
1691
1692fn sql_row_to_mint_quote(row: Vec<Column>) -> Result<MintQuote, Error> {
1693 unpack_into!(
1694 let (
1695 id,
1696 mint_url,
1697 amount,
1698 unit,
1699 request,
1700 state,
1701 expiry,
1702 secret_key,
1703 row_method,
1704 row_amount_minted,
1705 row_amount_paid,
1706 used_by_operation,
1707 version
1708 ) = row
1709 );
1710
1711 let amount: Option<i64> = column_as_nullable_number!(amount);
1712
1713 let amount_paid: u64 = column_as_number!(row_amount_paid);
1714 let amount_minted: u64 = column_as_number!(row_amount_minted);
1715 let expiry_val: u64 = column_as_number!(expiry);
1716 let version_val: u32 = column_as_number!(version);
1717 let payment_method =
1718 PaymentMethod::from_str(&column_as_string!(row_method)).map_err(Error::from)?;
1719
1720 Ok(MintQuote {
1721 id: column_as_string!(id),
1722 mint_url: column_as_string!(mint_url, MintUrl::from_str),
1723 amount: amount.and_then(Amount::from_i64),
1724 unit: column_as_string!(unit, CurrencyUnit::from_str),
1725 request: column_as_string!(request),
1726 state: column_as_string!(state, MintQuoteState::from_str),
1727 expiry: expiry_val,
1728 secret_key: column_as_nullable_string!(secret_key, |s| SecretKey::from_str(&s).ok()),
1729 payment_method,
1730 amount_issued: Amount::from(amount_minted),
1731 amount_paid: Amount::from(amount_paid),
1732 used_by_operation: column_as_nullable_string!(used_by_operation),
1733 version: version_val,
1734 })
1735}
1736
1737fn sql_row_to_melt_quote(row: Vec<Column>) -> Result<wallet::MeltQuote, Error> {
1738 unpack_into!(
1739 let (
1740 id,
1741 unit,
1742 amount,
1743 request,
1744 fee_reserve,
1745 state,
1746 expiry,
1747 payment_preimage,
1748 row_method,
1749 used_by_operation,
1750 version
1751 ) = row
1752 );
1753
1754 let payment_method =
1755 PaymentMethod::from_str(&column_as_string!(row_method)).map_err(Error::from)?;
1756
1757 let amount_val: u64 = column_as_number!(amount);
1758 let fee_reserve_val: u64 = column_as_number!(fee_reserve);
1759 let expiry_val: u64 = column_as_number!(expiry);
1760 let version_val: u32 = column_as_number!(version);
1761
1762 Ok(wallet::MeltQuote {
1763 id: column_as_string!(id),
1764 unit: column_as_string!(unit, CurrencyUnit::from_str),
1765 amount: Amount::from(amount_val),
1766 request: column_as_string!(request),
1767 fee_reserve: Amount::from(fee_reserve_val),
1768 state: column_as_string!(state, MeltQuoteState::from_str),
1769 expiry: expiry_val,
1770 payment_preimage: column_as_nullable_string!(payment_preimage),
1771 payment_method,
1772 used_by_operation: column_as_nullable_string!(used_by_operation),
1773 version: version_val,
1774 })
1775}
1776
1777fn sql_row_to_proof_info(row: Vec<Column>) -> Result<ProofInfo, Error> {
1778 unpack_into!(
1779 let (
1780 amount,
1781 unit,
1782 keyset_id,
1783 secret,
1784 c,
1785 witness,
1786 dleq_e,
1787 dleq_s,
1788 dleq_r,
1789 y,
1790 mint_url,
1791 state,
1792 spending_condition,
1793 used_by_operation,
1794 created_by_operation
1795 ) = row
1796 );
1797
1798 let dleq = match (
1799 column_as_nullable_binary!(dleq_e),
1800 column_as_nullable_binary!(dleq_s),
1801 column_as_nullable_binary!(dleq_r),
1802 ) {
1803 (Some(e), Some(s), Some(r)) => {
1804 let e_key = SecretKey::from_slice(&e)?;
1805 let s_key = SecretKey::from_slice(&s)?;
1806 let r_key = SecretKey::from_slice(&r)?;
1807
1808 Some(ProofDleq::new(e_key, s_key, r_key))
1809 }
1810 _ => None,
1811 };
1812
1813 let amount: u64 = column_as_number!(amount);
1814 let proof = Proof {
1815 amount: Amount::from(amount),
1816 keyset_id: column_as_string!(keyset_id, Id::from_str),
1817 secret: column_as_string!(secret, Secret::from_str),
1818 witness: column_as_nullable_string!(witness, |v| { serde_json::from_str(&v).ok() }, |v| {
1819 serde_json::from_slice(&v).ok()
1820 }),
1821 c: column_as_string!(c, PublicKey::from_str, PublicKey::from_slice),
1822 dleq,
1823 };
1824
1825 let used_by_operation =
1826 column_as_nullable_string!(used_by_operation).and_then(|id| Uuid::from_str(&id).ok());
1827 let created_by_operation =
1828 column_as_nullable_string!(created_by_operation).and_then(|id| Uuid::from_str(&id).ok());
1829
1830 Ok(ProofInfo {
1831 proof,
1832 y: column_as_string!(y, PublicKey::from_str, PublicKey::from_slice),
1833 mint_url: column_as_string!(mint_url, MintUrl::from_str),
1834 state: column_as_string!(state, State::from_str),
1835 spending_condition: column_as_nullable_string!(
1836 spending_condition,
1837 |r| { serde_json::from_str(&r).ok() },
1838 |r| { serde_json::from_slice(&r).ok() }
1839 ),
1840 unit: column_as_string!(unit, CurrencyUnit::from_str),
1841 used_by_operation,
1842 created_by_operation,
1843 })
1844}
1845
1846fn sql_row_to_wallet_saga(row: Vec<Column>) -> Result<wallet::WalletSaga, Error> {
1847 unpack_into!(
1848 let (
1849 id,
1850 kind,
1851 state,
1852 amount,
1853 mint_url,
1854 unit,
1855 quote_id,
1856 created_at,
1857 updated_at,
1858 data,
1859 version
1860 ) = row
1861 );
1862
1863 let id_str: String = column_as_string!(id);
1864 let id = uuid::Uuid::parse_str(&id_str).map_err(|e| {
1865 Error::Database(Box::new(std::io::Error::new(
1866 std::io::ErrorKind::InvalidData,
1867 format!("Invalid UUID: {}", e),
1868 )))
1869 })?;
1870 let kind_str: String = column_as_string!(kind);
1871 let state_json: String = column_as_string!(state);
1872 let amount: u64 = column_as_number!(amount);
1873 let mint_url: MintUrl = column_as_string!(mint_url, MintUrl::from_str);
1874 let unit: CurrencyUnit = column_as_string!(unit, CurrencyUnit::from_str);
1875 let quote_id: Option<String> = column_as_nullable_string!(quote_id);
1876 let created_at: u64 = column_as_number!(created_at);
1877 let updated_at: u64 = column_as_number!(updated_at);
1878 let data_json: String = column_as_string!(data);
1879 let version: u32 = column_as_number!(version);
1880
1881 let kind = wallet::OperationKind::from_str(&kind_str).map_err(|_| {
1882 Error::Database(Box::new(std::io::Error::new(
1883 std::io::ErrorKind::InvalidData,
1884 format!("Invalid operation kind: {}", kind_str),
1885 )))
1886 })?;
1887 let state: wallet::WalletSagaState = serde_json::from_str(&state_json).map_err(|e| {
1888 Error::Database(Box::new(std::io::Error::new(
1889 std::io::ErrorKind::InvalidData,
1890 format!("Failed to deserialize saga state: {}", e),
1891 )))
1892 })?;
1893 let data: wallet::OperationData = serde_json::from_str(&data_json).map_err(|e| {
1894 Error::Database(Box::new(std::io::Error::new(
1895 std::io::ErrorKind::InvalidData,
1896 format!("Failed to deserialize saga data: {}", e),
1897 )))
1898 })?;
1899
1900 Ok(wallet::WalletSaga {
1901 id,
1902 kind,
1903 state,
1904 amount: Amount::from(amount),
1905 mint_url,
1906 unit,
1907 quote_id,
1908 created_at,
1909 updated_at,
1910 data,
1911 version,
1912 })
1913}
1914
1915fn sql_row_to_transaction(row: Vec<Column>) -> Result<Transaction, Error> {
1916 unpack_into!(
1917 let (
1918 mint_url,
1919 direction,
1920 unit,
1921 amount,
1922 fee,
1923 ys,
1924 timestamp,
1925 memo,
1926 metadata,
1927 quote_id,
1928 payment_request,
1929 payment_proof,
1930 payment_method,
1931 saga_id
1932 ) = row
1933 );
1934
1935 let amount: u64 = column_as_number!(amount);
1936 let fee: u64 = column_as_number!(fee);
1937
1938 let saga_id: Option<Uuid> = column_as_nullable_string!(saga_id)
1939 .map(|id| Uuid::from_str(&id).ok())
1940 .flatten();
1941
1942 Ok(Transaction {
1943 mint_url: column_as_string!(mint_url, MintUrl::from_str),
1944 direction: column_as_string!(direction, TransactionDirection::from_str),
1945 unit: column_as_string!(unit, CurrencyUnit::from_str),
1946 amount: Amount::from(amount),
1947 fee: Amount::from(fee),
1948 ys: column_as_binary!(ys)
1949 .chunks(33)
1950 .map(PublicKey::from_slice)
1951 .collect::<Result<Vec<_>, _>>()?,
1952 timestamp: column_as_number!(timestamp),
1953 memo: column_as_nullable_string!(memo),
1954 metadata: column_as_nullable_string!(metadata, |v| serde_json::from_str(&v).ok(), |v| {
1955 serde_json::from_slice(&v).ok()
1956 })
1957 .unwrap_or_default(),
1958 quote_id: column_as_nullable_string!(quote_id),
1959 payment_request: column_as_nullable_string!(payment_request),
1960 payment_proof: column_as_nullable_string!(payment_proof),
1961 payment_method: column_as_nullable_string!(payment_method)
1962 .map(|v| PaymentMethod::from_str(&v))
1963 .transpose()
1964 .map_err(Error::from)?,
1965 saga_id,
1966 })
1967}