1use std::collections::HashMap;
12use std::fmt::Debug;
13use std::str::FromStr;
14use std::sync::Arc;
15
16use async_trait::async_trait;
17use bitcoin::bip32::DerivationPath;
18use cdk_common::database::mint::{validate_kvstore_params, SagaDatabase, SagaTransaction};
19use cdk_common::database::{
20 self, ConversionError, Error, MintDatabase, MintDbWriterFinalizer, MintKeyDatabaseTransaction,
21 MintKeysDatabase, MintProofsDatabase, MintQuotesDatabase, MintQuotesTransaction,
22 MintSignatureTransaction, MintSignaturesDatabase,
23};
24use cdk_common::mint::{
25 self, IncomingPayment, Issuance, MeltPaymentRequest, MeltQuote, MintKeySetInfo, MintQuote,
26 Operation,
27};
28use cdk_common::nut00::ProofsMethods;
29use cdk_common::payment::PaymentIdentifier;
30use cdk_common::quote_id::QuoteId;
31use cdk_common::secret::Secret;
32use cdk_common::state::{check_melt_quote_state_transition, check_state_transition};
33use cdk_common::util::unix_time;
34use cdk_common::{
35 Amount, BlindSignature, BlindSignatureDleq, BlindedMessage, CurrencyUnit, Id, MeltQuoteState,
36 PaymentMethod, Proof, Proofs, PublicKey, SecretKey, State,
37};
38use lightning_invoice::Bolt11Invoice;
39use migrations::MIGRATIONS;
40use tracing::instrument;
41
42use crate::common::migrate;
43use crate::database::{ConnectionWithTransaction, DatabaseExecutor};
44use crate::pool::{DatabasePool, Pool, PooledResource};
45use crate::stmt::{query, Column};
46use crate::{
47 column_as_nullable_number, column_as_nullable_string, column_as_number, column_as_string,
48 unpack_into,
49};
50
51#[cfg(feature = "auth")]
52mod auth;
53
54#[rustfmt::skip]
55mod migrations {
56 include!(concat!(env!("OUT_DIR"), "/migrations_mint.rs"));
57}
58
59#[cfg(feature = "auth")]
60pub use auth::SQLMintAuthDatabase;
61#[cfg(feature = "prometheus")]
62use cdk_prometheus::METRICS;
63
64#[derive(Debug, Clone)]
66pub struct SQLMintDatabase<RM>
67where
68 RM: DatabasePool + 'static,
69{
70 pool: Arc<Pool<RM>>,
71}
72
73pub struct SQLTransaction<RM>
75where
76 RM: DatabasePool + 'static,
77{
78 inner: ConnectionWithTransaction<RM::Connection, PooledResource<RM>>,
79}
80
81#[inline(always)]
82async fn get_current_states<C>(
83 conn: &C,
84 ys: &[PublicKey],
85) -> Result<HashMap<PublicKey, State>, Error>
86where
87 C: DatabaseExecutor + Send + Sync,
88{
89 if ys.is_empty() {
90 return Ok(Default::default());
91 }
92 query(r#"SELECT y, state FROM proof WHERE y IN (:ys)"#)?
93 .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
94 .fetch_all(conn)
95 .await?
96 .into_iter()
97 .map(|row| {
98 Ok((
99 column_as_string!(&row[0], PublicKey::from_hex, PublicKey::from_slice),
100 column_as_string!(&row[1], State::from_str),
101 ))
102 })
103 .collect::<Result<HashMap<_, _>, _>>()
104}
105
106impl<RM> SQLMintDatabase<RM>
107where
108 RM: DatabasePool + 'static,
109{
110 pub async fn new<X>(db: X) -> Result<Self, Error>
112 where
113 X: Into<RM::Config>,
114 {
115 let pool = Pool::new(db.into());
116
117 Self::migrate(pool.get().map_err(|e| Error::Database(Box::new(e)))?).await?;
118
119 Ok(Self { pool })
120 }
121
122 async fn migrate(conn: PooledResource<RM>) -> Result<(), Error> {
124 let tx = ConnectionWithTransaction::new(conn).await?;
125 migrate(&tx, RM::Connection::name(), MIGRATIONS).await?;
126 tx.commit().await?;
127 Ok(())
128 }
129}
130
131#[async_trait]
132impl<RM> database::MintProofsTransaction<'_> for SQLTransaction<RM>
133where
134 RM: DatabasePool + 'static,
135{
136 type Err = Error;
137
138 async fn add_proofs(
139 &mut self,
140 proofs: Proofs,
141 quote_id: Option<QuoteId>,
142 operation: &Operation,
143 ) -> Result<(), Self::Err> {
144 let current_time = unix_time();
145
146 match query(r#"SELECT state FROM proof WHERE y IN (:ys) LIMIT 1 FOR UPDATE"#)?
149 .bind_vec(
150 "ys",
151 proofs
152 .iter()
153 .map(|y| y.y().map(|y| y.to_bytes().to_vec()))
154 .collect::<Result<_, _>>()?,
155 )
156 .pluck(&self.inner)
157 .await?
158 .map(|state| Ok::<_, Error>(column_as_string!(&state, State::from_str)))
159 .transpose()?
160 {
161 Some(State::Spent) => Err(database::Error::AttemptUpdateSpentProof),
162 Some(_) => Err(database::Error::Duplicate),
163 None => Ok(()), }?;
165
166 for proof in proofs {
167 query(
168 r#"
169 INSERT INTO proof
170 (y, amount, keyset_id, secret, c, witness, state, quote_id, created_time, operation_kind, operation_id)
171 VALUES
172 (:y, :amount, :keyset_id, :secret, :c, :witness, :state, :quote_id, :created_time, :operation_kind, :operation_id)
173 "#,
174 )?
175 .bind("y", proof.y()?.to_bytes().to_vec())
176 .bind("amount", proof.amount.to_i64())
177 .bind("keyset_id", proof.keyset_id.to_string())
178 .bind("secret", proof.secret.to_string())
179 .bind("c", proof.c.to_bytes().to_vec())
180 .bind(
181 "witness",
182 proof.witness.map(|w| serde_json::to_string(&w).unwrap()),
183 )
184 .bind("state", "UNSPENT".to_string())
185 .bind("quote_id", quote_id.clone().map(|q| q.to_string()))
186 .bind("created_time", current_time as i64)
187 .bind("operation_kind", operation.kind())
188 .bind("operation_id", operation.id().to_string())
189 .execute(&self.inner)
190 .await?;
191 }
192
193 Ok(())
194 }
195
196 async fn update_proofs_states(
197 &mut self,
198 ys: &[PublicKey],
199 new_state: State,
200 ) -> Result<Vec<Option<State>>, Self::Err> {
201 let mut current_states = get_current_states(&self.inner, ys).await?;
202
203 if current_states.len() != ys.len() {
204 tracing::warn!(
205 "Attempted to update state of non-existent proof {} {}",
206 current_states.len(),
207 ys.len()
208 );
209 return Err(database::Error::ProofNotFound);
210 }
211
212 for state in current_states.values() {
213 check_state_transition(*state, new_state)?;
214 }
215
216 query(r#"UPDATE proof SET state = :new_state WHERE y IN (:ys)"#)?
217 .bind("new_state", new_state.to_string())
218 .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
219 .execute(&self.inner)
220 .await?;
221
222 if new_state == State::Spent {
223 query(
224 r#"
225 INSERT INTO keyset_amounts (keyset_id, total_issued, total_redeemed)
226 SELECT keyset_id, 0, COALESCE(SUM(amount), 0)
227 FROM proof
228 WHERE y IN (:ys)
229 GROUP BY keyset_id
230 ON CONFLICT (keyset_id)
231 DO UPDATE SET total_redeemed = keyset_amounts.total_redeemed + EXCLUDED.total_redeemed
232 "#,
233 )?
234 .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
235 .execute(&self.inner)
236 .await?;
237 }
238
239 Ok(ys.iter().map(|y| current_states.remove(y)).collect())
240 }
241
242 async fn remove_proofs(
243 &mut self,
244 ys: &[PublicKey],
245 _quote_id: Option<QuoteId>,
246 ) -> Result<(), Self::Err> {
247 if ys.is_empty() {
248 return Ok(());
249 }
250 let total_deleted = query(
251 r#"
252 DELETE FROM proof WHERE y IN (:ys) AND state NOT IN (:exclude_state)
253 "#,
254 )?
255 .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
256 .bind_vec("exclude_state", vec![State::Spent.to_string()])
257 .execute(&self.inner)
258 .await?;
259
260 if total_deleted != ys.len() {
261 return Err(Self::Err::AttemptRemoveSpentProof);
262 }
263
264 Ok(())
265 }
266
267 async fn get_proof_ys_by_quote_id(
268 &self,
269 quote_id: &QuoteId,
270 ) -> Result<Vec<PublicKey>, Self::Err> {
271 Ok(query(
272 r#"
273 SELECT
274 amount,
275 keyset_id,
276 secret,
277 c,
278 witness
279 FROM
280 proof
281 WHERE
282 quote_id = :quote_id
283 "#,
284 )?
285 .bind("quote_id", quote_id.to_string())
286 .fetch_all(&self.inner)
287 .await?
288 .into_iter()
289 .map(sql_row_to_proof)
290 .collect::<Result<Vec<Proof>, _>>()?
291 .ys()?)
292 }
293}
294
295#[async_trait]
296impl<RM> database::MintTransaction<'_, Error> for SQLTransaction<RM> where RM: DatabasePool + 'static
297{}
298
299#[async_trait]
300impl<RM> MintDbWriterFinalizer for SQLTransaction<RM>
301where
302 RM: DatabasePool + 'static,
303{
304 type Err = Error;
305
306 async fn commit(self: Box<Self>) -> Result<(), Error> {
307 let result = self.inner.commit().await;
308 #[cfg(feature = "prometheus")]
309 {
310 let success = result.is_ok();
311 METRICS.record_mint_operation("transaction_commit", success);
312 METRICS.record_mint_operation_histogram("transaction_commit", success, 1.0);
313 }
314
315 Ok(result?)
316 }
317
318 async fn rollback(self: Box<Self>) -> Result<(), Error> {
319 let result = self.inner.rollback().await;
320
321 #[cfg(feature = "prometheus")]
322 {
323 let success = result.is_ok();
324 METRICS.record_mint_operation("transaction_rollback", success);
325 METRICS.record_mint_operation_histogram("transaction_rollback", success, 1.0);
326 }
327 Ok(result?)
328 }
329}
330
331#[inline(always)]
332async fn get_mint_quote_payments<C>(
333 conn: &C,
334 quote_id: &QuoteId,
335) -> Result<Vec<IncomingPayment>, Error>
336where
337 C: DatabaseExecutor + Send + Sync,
338{
339 query(
341 r#"
342 SELECT
343 payment_id,
344 timestamp,
345 amount
346 FROM
347 mint_quote_payments
348 WHERE
349 quote_id=:quote_id
350 "#,
351 )?
352 .bind("quote_id", quote_id.to_string())
353 .fetch_all(conn)
354 .await?
355 .into_iter()
356 .map(|row| {
357 let amount: u64 = column_as_number!(row[2].clone());
358 let time: u64 = column_as_number!(row[1].clone());
359 Ok(IncomingPayment::new(
360 amount.into(),
361 column_as_string!(&row[0]),
362 time,
363 ))
364 })
365 .collect()
366}
367
368#[inline(always)]
369async fn get_mint_quote_issuance<C>(conn: &C, quote_id: &QuoteId) -> Result<Vec<Issuance>, Error>
370where
371 C: DatabaseExecutor + Send + Sync,
372{
373 query(
375 r#"
376SELECT amount, timestamp
377FROM mint_quote_issued
378WHERE quote_id=:quote_id
379 "#,
380 )?
381 .bind("quote_id", quote_id.to_string())
382 .fetch_all(conn)
383 .await?
384 .into_iter()
385 .map(|row| {
386 let time: u64 = column_as_number!(row[1].clone());
387 Ok(Issuance::new(
388 Amount::from_i64(column_as_number!(row[0].clone()))
389 .expect("Is amount when put into db"),
390 time,
391 ))
392 })
393 .collect()
394}
395
396#[async_trait]
397impl<RM> MintKeyDatabaseTransaction<'_, Error> for SQLTransaction<RM>
398where
399 RM: DatabasePool + 'static,
400{
401 async fn add_keyset_info(&mut self, keyset: MintKeySetInfo) -> Result<(), Error> {
402 query(
403 r#"
404 INSERT INTO
405 keyset (
406 id, unit, active, valid_from, valid_to, derivation_path,
407 max_order, amounts, input_fee_ppk, derivation_path_index
408 )
409 VALUES (
410 :id, :unit, :active, :valid_from, :valid_to, :derivation_path,
411 :max_order, :amounts, :input_fee_ppk, :derivation_path_index
412 )
413 ON CONFLICT(id) DO UPDATE SET
414 unit = excluded.unit,
415 active = excluded.active,
416 valid_from = excluded.valid_from,
417 valid_to = excluded.valid_to,
418 derivation_path = excluded.derivation_path,
419 max_order = excluded.max_order,
420 amounts = excluded.amounts,
421 input_fee_ppk = excluded.input_fee_ppk,
422 derivation_path_index = excluded.derivation_path_index
423 "#,
424 )?
425 .bind("id", keyset.id.to_string())
426 .bind("unit", keyset.unit.to_string())
427 .bind("active", keyset.active)
428 .bind("valid_from", keyset.valid_from as i64)
429 .bind("valid_to", keyset.final_expiry.map(|v| v as i64))
430 .bind("derivation_path", keyset.derivation_path.to_string())
431 .bind("max_order", keyset.max_order)
432 .bind("amounts", serde_json::to_string(&keyset.amounts).ok())
433 .bind("input_fee_ppk", keyset.input_fee_ppk as i64)
434 .bind("derivation_path_index", keyset.derivation_path_index)
435 .execute(&self.inner)
436 .await?;
437
438 Ok(())
439 }
440
441 async fn set_active_keyset(&mut self, unit: CurrencyUnit, id: Id) -> Result<(), Error> {
442 query(r#"UPDATE keyset SET active=FALSE WHERE unit = :unit"#)?
443 .bind("unit", unit.to_string())
444 .execute(&self.inner)
445 .await?;
446
447 query(r#"UPDATE keyset SET active=TRUE WHERE unit = :unit AND id = :id"#)?
448 .bind("unit", unit.to_string())
449 .bind("id", id.to_string())
450 .execute(&self.inner)
451 .await?;
452
453 Ok(())
454 }
455}
456
457#[async_trait]
458impl<RM> MintKeysDatabase for SQLMintDatabase<RM>
459where
460 RM: DatabasePool + 'static,
461{
462 type Err = Error;
463
464 async fn begin_transaction<'a>(
465 &'a self,
466 ) -> Result<Box<dyn MintKeyDatabaseTransaction<'a, Error> + Send + Sync + 'a>, Error> {
467 let tx = SQLTransaction {
468 inner: ConnectionWithTransaction::new(
469 self.pool.get().map_err(|e| Error::Database(Box::new(e)))?,
470 )
471 .await?,
472 };
473
474 Ok(Box::new(tx))
475 }
476
477 async fn get_active_keyset_id(&self, unit: &CurrencyUnit) -> Result<Option<Id>, Self::Err> {
478 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
479 Ok(
480 query(r#" SELECT id FROM keyset WHERE active = :active AND unit = :unit"#)?
481 .bind("active", true)
482 .bind("unit", unit.to_string())
483 .pluck(&*conn)
484 .await?
485 .map(|id| match id {
486 Column::Text(text) => Ok(Id::from_str(&text)?),
487 Column::Blob(id) => Ok(Id::from_bytes(&id)?),
488 _ => Err(Error::InvalidKeysetId),
489 })
490 .transpose()?,
491 )
492 }
493
494 async fn get_active_keysets(&self) -> Result<HashMap<CurrencyUnit, Id>, Self::Err> {
495 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
496 Ok(
497 query(r#"SELECT id, unit FROM keyset WHERE active = :active"#)?
498 .bind("active", true)
499 .fetch_all(&*conn)
500 .await?
501 .into_iter()
502 .map(|row| {
503 Ok((
504 column_as_string!(&row[1], CurrencyUnit::from_str),
505 column_as_string!(&row[0], Id::from_str, Id::from_bytes),
506 ))
507 })
508 .collect::<Result<HashMap<_, _>, Error>>()?,
509 )
510 }
511
512 async fn get_keyset_info(&self, id: &Id) -> Result<Option<MintKeySetInfo>, Self::Err> {
513 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
514 Ok(query(
515 r#"SELECT
516 id,
517 unit,
518 active,
519 valid_from,
520 valid_to,
521 derivation_path,
522 derivation_path_index,
523 max_order,
524 amounts,
525 input_fee_ppk
526 FROM
527 keyset
528 WHERE id=:id"#,
529 )?
530 .bind("id", id.to_string())
531 .fetch_one(&*conn)
532 .await?
533 .map(sql_row_to_keyset_info)
534 .transpose()?)
535 }
536
537 async fn get_keyset_infos(&self) -> Result<Vec<MintKeySetInfo>, Self::Err> {
538 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
539 Ok(query(
540 r#"SELECT
541 id,
542 unit,
543 active,
544 valid_from,
545 valid_to,
546 derivation_path,
547 derivation_path_index,
548 max_order,
549 amounts,
550 input_fee_ppk
551 FROM
552 keyset
553 "#,
554 )?
555 .fetch_all(&*conn)
556 .await?
557 .into_iter()
558 .map(sql_row_to_keyset_info)
559 .collect::<Result<Vec<_>, _>>()?)
560 }
561}
562
563#[async_trait]
564impl<RM> MintQuotesTransaction<'_> for SQLTransaction<RM>
565where
566 RM: DatabasePool + 'static,
567{
568 type Err = Error;
569
570 async fn add_melt_request(
571 &mut self,
572 quote_id: &QuoteId,
573 inputs_amount: Amount,
574 inputs_fee: Amount,
575 ) -> Result<(), Self::Err> {
576 query(
578 r#"
579 INSERT INTO melt_request
580 (quote_id, inputs_amount, inputs_fee)
581 VALUES
582 (:quote_id, :inputs_amount, :inputs_fee)
583 "#,
584 )?
585 .bind("quote_id", quote_id.to_string())
586 .bind("inputs_amount", inputs_amount.to_i64())
587 .bind("inputs_fee", inputs_fee.to_i64())
588 .execute(&self.inner)
589 .await?;
590
591 Ok(())
592 }
593
594 async fn add_blinded_messages(
595 &mut self,
596 quote_id: Option<&QuoteId>,
597 blinded_messages: &[BlindedMessage],
598 operation: &Operation,
599 ) -> Result<(), Self::Err> {
600 let current_time = unix_time();
601
602 for message in blinded_messages {
605 match query(
606 r#"
607 INSERT INTO blind_signature
608 (blinded_message, amount, keyset_id, c, quote_id, created_time, operation_kind, operation_id)
609 VALUES
610 (:blinded_message, :amount, :keyset_id, NULL, :quote_id, :created_time, :operation_kind, :operation_id)
611 "#,
612 )?
613 .bind(
614 "blinded_message",
615 message.blinded_secret.to_bytes().to_vec(),
616 )
617 .bind("amount", message.amount.to_i64())
618 .bind("keyset_id", message.keyset_id.to_string())
619 .bind("quote_id", quote_id.map(|q| q.to_string()))
620 .bind("created_time", current_time as i64)
621 .bind("operation_kind", operation.kind())
622 .bind("operation_id", operation.id().to_string())
623 .execute(&self.inner)
624 .await
625 {
626 Ok(_) => continue,
627 Err(database::Error::Duplicate) => {
628 return Err(database::Error::Duplicate);
633 }
634 Err(err) => return Err(err),
635 }
636 }
637
638 Ok(())
639 }
640
641 async fn delete_blinded_messages(
642 &mut self,
643 blinded_secrets: &[PublicKey],
644 ) -> Result<(), Self::Err> {
645 if blinded_secrets.is_empty() {
646 return Ok(());
647 }
648
649 query(
652 r#"
653 DELETE FROM blind_signature
654 WHERE blinded_message IN (:blinded_secrets) AND c IS NULL
655 "#,
656 )?
657 .bind_vec(
658 "blinded_secrets",
659 blinded_secrets
660 .iter()
661 .map(|secret| secret.to_bytes().to_vec())
662 .collect(),
663 )
664 .execute(&self.inner)
665 .await?;
666
667 Ok(())
668 }
669
670 async fn get_melt_request_and_blinded_messages(
671 &mut self,
672 quote_id: &QuoteId,
673 ) -> Result<Option<database::mint::MeltRequestInfo>, Self::Err> {
674 let melt_request_row = query(
675 r#"
676 SELECT inputs_amount, inputs_fee
677 FROM melt_request
678 WHERE quote_id = :quote_id
679 FOR UPDATE
680 "#,
681 )?
682 .bind("quote_id", quote_id.to_string())
683 .fetch_one(&self.inner)
684 .await?;
685
686 if let Some(row) = melt_request_row {
687 let inputs_amount: u64 = column_as_number!(row[0].clone());
688 let inputs_fee: u64 = column_as_number!(row[1].clone());
689
690 let blinded_messages_rows = query(
692 r#"
693 SELECT blinded_message, keyset_id, amount
694 FROM blind_signature
695 WHERE quote_id = :quote_id AND c IS NULL
696 "#,
697 )?
698 .bind("quote_id", quote_id.to_string())
699 .fetch_all(&self.inner)
700 .await?;
701
702 let blinded_messages: Result<Vec<BlindedMessage>, Error> = blinded_messages_rows
703 .into_iter()
704 .map(|row| -> Result<BlindedMessage, Error> {
705 let blinded_message_key =
706 column_as_string!(&row[0], PublicKey::from_hex, PublicKey::from_slice);
707 let keyset_id = column_as_string!(&row[1], Id::from_str, Id::from_bytes);
708 let amount: u64 = column_as_number!(row[2].clone());
709
710 Ok(BlindedMessage {
711 blinded_secret: blinded_message_key,
712 keyset_id,
713 amount: Amount::from(amount),
714 witness: None, })
716 })
717 .collect();
718 let blinded_messages = blinded_messages?;
719
720 Ok(Some(database::mint::MeltRequestInfo {
721 inputs_amount: Amount::from(inputs_amount),
722 inputs_fee: Amount::from(inputs_fee),
723 change_outputs: blinded_messages,
724 }))
725 } else {
726 Ok(None)
727 }
728 }
729
730 async fn delete_melt_request(&mut self, quote_id: &QuoteId) -> Result<(), Self::Err> {
731 query(
733 r#"
734 DELETE FROM melt_request
735 WHERE quote_id = :quote_id
736 "#,
737 )?
738 .bind("quote_id", quote_id.to_string())
739 .execute(&self.inner)
740 .await?;
741
742 query(
744 r#"
745 DELETE FROM blind_signature
746 WHERE quote_id = :quote_id AND c IS NULL
747 "#,
748 )?
749 .bind("quote_id", quote_id.to_string())
750 .execute(&self.inner)
751 .await?;
752
753 Ok(())
754 }
755
756 #[instrument(skip(self))]
757 async fn increment_mint_quote_amount_paid(
758 &mut self,
759 quote_id: &QuoteId,
760 amount_paid: Amount,
761 payment_id: String,
762 ) -> Result<Amount, Self::Err> {
763 if amount_paid == Amount::ZERO {
764 tracing::warn!("Amount payments of zero amount should not be recorded.");
765 return Err(Error::Duplicate);
766 }
767
768 let exists = query(
770 r#"
771 SELECT payment_id
772 FROM mint_quote_payments
773 WHERE payment_id = :payment_id
774 FOR UPDATE
775 "#,
776 )?
777 .bind("payment_id", payment_id.clone())
778 .fetch_one(&self.inner)
779 .await?;
780
781 if exists.is_some() {
782 tracing::error!("Payment ID already exists: {}", payment_id);
783 return Err(database::Error::Duplicate);
784 }
785
786 let current_amount = query(
788 r#"
789 SELECT amount_paid
790 FROM mint_quote
791 WHERE id = :quote_id
792 FOR UPDATE
793 "#,
794 )?
795 .bind("quote_id", quote_id.to_string())
796 .fetch_one(&self.inner)
797 .await
798 .inspect_err(|err| {
799 tracing::error!("SQLite could not get mint quote amount_paid: {}", err);
800 })?;
801
802 let current_amount_paid = if let Some(current_amount) = current_amount {
803 let amount: u64 = column_as_number!(current_amount[0].clone());
804 Amount::from(amount)
805 } else {
806 Amount::ZERO
807 };
808
809 let new_amount_paid = current_amount_paid
811 .checked_add(amount_paid)
812 .ok_or_else(|| database::Error::AmountOverflow)?;
813
814 tracing::debug!(
815 "Mint quote {} amount paid was {} is now {}.",
816 quote_id,
817 current_amount_paid,
818 new_amount_paid
819 );
820
821 query(
823 r#"
824 UPDATE mint_quote
825 SET amount_paid = :amount_paid
826 WHERE id = :quote_id
827 "#,
828 )?
829 .bind("amount_paid", new_amount_paid.to_i64())
830 .bind("quote_id", quote_id.to_string())
831 .execute(&self.inner)
832 .await
833 .inspect_err(|err| {
834 tracing::error!("SQLite could not update mint quote amount_paid: {}", err);
835 })?;
836
837 query(
839 r#"
840 INSERT INTO mint_quote_payments
841 (quote_id, payment_id, amount, timestamp)
842 VALUES (:quote_id, :payment_id, :amount, :timestamp)
843 "#,
844 )?
845 .bind("quote_id", quote_id.to_string())
846 .bind("payment_id", payment_id)
847 .bind("amount", amount_paid.to_i64())
848 .bind("timestamp", unix_time() as i64)
849 .execute(&self.inner)
850 .await
851 .map_err(|err| {
852 tracing::error!("SQLite could not insert payment ID: {}", err);
853 err
854 })?;
855
856 Ok(new_amount_paid)
857 }
858
859 #[instrument(skip_all)]
860 async fn increment_mint_quote_amount_issued(
861 &mut self,
862 quote_id: &QuoteId,
863 amount_issued: Amount,
864 ) -> Result<Amount, Self::Err> {
865 let current_amounts = query(
867 r#"
868 SELECT amount_issued, amount_paid
869 FROM mint_quote
870 WHERE id = :quote_id
871 FOR UPDATE
872 "#,
873 )?
874 .bind("quote_id", quote_id.to_string())
875 .fetch_one(&self.inner)
876 .await
877 .inspect_err(|err| {
878 tracing::error!("SQLite could not get mint quote amount_issued: {}", err);
879 })?
880 .ok_or(Error::QuoteNotFound)?;
881
882 let new_amount_issued = {
883 unpack_into!(
885 let (current_amount_issued, current_amount_paid) = current_amounts
886 );
887
888 let current_amount_issued: u64 = column_as_number!(current_amount_issued);
889 let current_amount_paid: u64 = column_as_number!(current_amount_paid);
890
891 let current_amount_issued = Amount::from(current_amount_issued);
892 let current_amount_paid = Amount::from(current_amount_paid);
893
894 let new_amount_issued = current_amount_issued
896 .checked_add(amount_issued)
897 .ok_or_else(|| database::Error::AmountOverflow)?;
898
899 current_amount_paid
900 .checked_sub(new_amount_issued)
901 .ok_or(Error::Internal("Over-issued not allowed".to_owned()))?;
902
903 new_amount_issued
904 };
905
906 query(
908 r#"
909 UPDATE mint_quote
910 SET amount_issued = :amount_issued
911 WHERE id = :quote_id
912 "#,
913 )?
914 .bind("amount_issued", new_amount_issued.to_i64())
915 .bind("quote_id", quote_id.to_string())
916 .execute(&self.inner)
917 .await
918 .inspect_err(|err| {
919 tracing::error!("SQLite could not update mint quote amount_issued: {}", err);
920 })?;
921
922 let current_time = unix_time();
923
924 query(
925 r#"
926INSERT INTO mint_quote_issued
927(quote_id, amount, timestamp)
928VALUES (:quote_id, :amount, :timestamp);
929 "#,
930 )?
931 .bind("quote_id", quote_id.to_string())
932 .bind("amount", amount_issued.to_i64())
933 .bind("timestamp", current_time as i64)
934 .execute(&self.inner)
935 .await?;
936
937 Ok(new_amount_issued)
938 }
939
940 #[instrument(skip_all)]
941 async fn add_mint_quote(&mut self, quote: MintQuote) -> Result<(), Self::Err> {
942 query(
943 r#"
944 INSERT INTO mint_quote (
945 id, amount, unit, request, expiry, request_lookup_id, pubkey, created_time, payment_method, request_lookup_id_kind
946 )
947 VALUES (
948 :id, :amount, :unit, :request, :expiry, :request_lookup_id, :pubkey, :created_time, :payment_method, :request_lookup_id_kind
949 )
950 "#,
951 )?
952 .bind("id", quote.id.to_string())
953 .bind("amount", quote.amount.map(|a| a.to_i64()))
954 .bind("unit", quote.unit.to_string())
955 .bind("request", quote.request)
956 .bind("expiry", quote.expiry as i64)
957 .bind(
958 "request_lookup_id",
959 quote.request_lookup_id.to_string(),
960 )
961 .bind("pubkey", quote.pubkey.map(|p| p.to_string()))
962 .bind("created_time", quote.created_time as i64)
963 .bind("payment_method", quote.payment_method.to_string())
964 .bind("request_lookup_id_kind", quote.request_lookup_id.kind())
965 .execute(&self.inner)
966 .await?;
967
968 Ok(())
969 }
970
971 async fn add_melt_quote(&mut self, quote: mint::MeltQuote) -> Result<(), Self::Err> {
972 query(
974 r#"
975 INSERT INTO melt_quote
976 (
977 id, unit, amount, request, fee_reserve, state,
978 expiry, payment_preimage, request_lookup_id,
979 created_time, paid_time, options, request_lookup_id_kind, payment_method
980 )
981 VALUES
982 (
983 :id, :unit, :amount, :request, :fee_reserve, :state,
984 :expiry, :payment_preimage, :request_lookup_id,
985 :created_time, :paid_time, :options, :request_lookup_id_kind, :payment_method
986 )
987 "#,
988 )?
989 .bind("id", quote.id.to_string())
990 .bind("unit", quote.unit.to_string())
991 .bind("amount", quote.amount.to_i64())
992 .bind("request", serde_json::to_string("e.request)?)
993 .bind("fee_reserve", quote.fee_reserve.to_i64())
994 .bind("state", quote.state.to_string())
995 .bind("expiry", quote.expiry as i64)
996 .bind("payment_preimage", quote.payment_preimage)
997 .bind(
998 "request_lookup_id",
999 quote.request_lookup_id.as_ref().map(|id| id.to_string()),
1000 )
1001 .bind("created_time", quote.created_time as i64)
1002 .bind("paid_time", quote.paid_time.map(|t| t as i64))
1003 .bind(
1004 "options",
1005 quote.options.map(|o| serde_json::to_string(&o).ok()),
1006 )
1007 .bind(
1008 "request_lookup_id_kind",
1009 quote.request_lookup_id.map(|id| id.kind()),
1010 )
1011 .bind("payment_method", quote.payment_method.to_string())
1012 .execute(&self.inner)
1013 .await?;
1014
1015 Ok(())
1016 }
1017
1018 async fn update_melt_quote_request_lookup_id(
1019 &mut self,
1020 quote_id: &QuoteId,
1021 new_request_lookup_id: &PaymentIdentifier,
1022 ) -> Result<(), Self::Err> {
1023 query(r#"UPDATE melt_quote SET request_lookup_id = :new_req_id, request_lookup_id_kind = :new_kind WHERE id = :id"#)?
1024 .bind("new_req_id", new_request_lookup_id.to_string())
1025 .bind("new_kind",new_request_lookup_id.kind() )
1026 .bind("id", quote_id.to_string())
1027 .execute(&self.inner)
1028 .await?;
1029 Ok(())
1030 }
1031
1032 async fn update_melt_quote_state(
1033 &mut self,
1034 quote_id: &QuoteId,
1035 state: MeltQuoteState,
1036 payment_proof: Option<String>,
1037 ) -> Result<(MeltQuoteState, mint::MeltQuote), Self::Err> {
1038 let mut quote = query(
1039 r#"
1040 SELECT
1041 id,
1042 unit,
1043 amount,
1044 request,
1045 fee_reserve,
1046 expiry,
1047 state,
1048 payment_preimage,
1049 request_lookup_id,
1050 created_time,
1051 paid_time,
1052 payment_method,
1053 options,
1054 request_lookup_id_kind
1055 FROM
1056 melt_quote
1057 WHERE
1058 id=:id
1059 AND state != :state
1060 "#,
1061 )?
1062 .bind("id", quote_id.to_string())
1063 .bind("state", state.to_string())
1064 .fetch_one(&self.inner)
1065 .await?
1066 .map(sql_row_to_melt_quote)
1067 .transpose()?
1068 .ok_or(Error::QuoteNotFound)?;
1069
1070 check_melt_quote_state_transition(quote.state, state)?;
1071
1072 let rec = if state == MeltQuoteState::Paid {
1073 let current_time = unix_time();
1074 query(r#"UPDATE melt_quote SET state = :state, paid_time = :paid_time, payment_preimage = :payment_preimage WHERE id = :id"#)?
1075 .bind("state", state.to_string())
1076 .bind("paid_time", current_time as i64)
1077 .bind("payment_preimage", payment_proof)
1078 .bind("id", quote_id.to_string())
1079 .execute(&self.inner)
1080 .await
1081 } else {
1082 query(r#"UPDATE melt_quote SET state = :state WHERE id = :id"#)?
1083 .bind("state", state.to_string())
1084 .bind("id", quote_id.to_string())
1085 .execute(&self.inner)
1086 .await
1087 };
1088
1089 match rec {
1090 Ok(_) => {}
1091 Err(err) => {
1092 tracing::error!("SQLite Could not update melt quote");
1093 return Err(err);
1094 }
1095 };
1096
1097 let old_state = quote.state;
1098 quote.state = state;
1099
1100 if state == MeltQuoteState::Unpaid || state == MeltQuoteState::Failed {
1101 self.delete_melt_request(quote_id).await?;
1102 }
1103
1104 Ok((old_state, quote))
1105 }
1106
1107 async fn get_mint_quote(&mut self, quote_id: &QuoteId) -> Result<Option<MintQuote>, Self::Err> {
1108 let payments = get_mint_quote_payments(&self.inner, quote_id).await?;
1109 let issuance = get_mint_quote_issuance(&self.inner, quote_id).await?;
1110
1111 Ok(query(
1112 r#"
1113 SELECT
1114 id,
1115 amount,
1116 unit,
1117 request,
1118 expiry,
1119 request_lookup_id,
1120 pubkey,
1121 created_time,
1122 amount_paid,
1123 amount_issued,
1124 payment_method,
1125 request_lookup_id_kind
1126 FROM
1127 mint_quote
1128 WHERE id = :id
1129 FOR UPDATE
1130 "#,
1131 )?
1132 .bind("id", quote_id.to_string())
1133 .fetch_one(&self.inner)
1134 .await?
1135 .map(|row| sql_row_to_mint_quote(row, payments, issuance))
1136 .transpose()?)
1137 }
1138
1139 async fn get_melt_quote(
1140 &mut self,
1141 quote_id: &QuoteId,
1142 ) -> Result<Option<mint::MeltQuote>, Self::Err> {
1143 Ok(query(
1144 r#"
1145 SELECT
1146 id,
1147 unit,
1148 amount,
1149 request,
1150 fee_reserve,
1151 expiry,
1152 state,
1153 payment_preimage,
1154 request_lookup_id,
1155 created_time,
1156 paid_time,
1157 payment_method,
1158 options,
1159 request_lookup_id
1160 FROM
1161 melt_quote
1162 WHERE
1163 id=:id
1164 "#,
1165 )?
1166 .bind("id", quote_id.to_string())
1167 .fetch_one(&self.inner)
1168 .await?
1169 .map(sql_row_to_melt_quote)
1170 .transpose()?)
1171 }
1172
1173 async fn get_mint_quote_by_request(
1174 &mut self,
1175 request: &str,
1176 ) -> Result<Option<MintQuote>, Self::Err> {
1177 let mut mint_quote = query(
1178 r#"
1179 SELECT
1180 id,
1181 amount,
1182 unit,
1183 request,
1184 expiry,
1185 request_lookup_id,
1186 pubkey,
1187 created_time,
1188 amount_paid,
1189 amount_issued,
1190 payment_method,
1191 request_lookup_id_kind
1192 FROM
1193 mint_quote
1194 WHERE request = :request
1195 FOR UPDATE
1196 "#,
1197 )?
1198 .bind("request", request.to_string())
1199 .fetch_one(&self.inner)
1200 .await?
1201 .map(|row| sql_row_to_mint_quote(row, vec![], vec![]))
1202 .transpose()?;
1203
1204 if let Some(quote) = mint_quote.as_mut() {
1205 let payments = get_mint_quote_payments(&self.inner, "e.id).await?;
1206 let issuance = get_mint_quote_issuance(&self.inner, "e.id).await?;
1207 quote.issuance = issuance;
1208 quote.payments = payments;
1209 }
1210
1211 Ok(mint_quote)
1212 }
1213
1214 async fn get_mint_quote_by_request_lookup_id(
1215 &mut self,
1216 request_lookup_id: &PaymentIdentifier,
1217 ) -> Result<Option<MintQuote>, Self::Err> {
1218 let mut mint_quote = query(
1219 r#"
1220 SELECT
1221 id,
1222 amount,
1223 unit,
1224 request,
1225 expiry,
1226 request_lookup_id,
1227 pubkey,
1228 created_time,
1229 amount_paid,
1230 amount_issued,
1231 payment_method,
1232 request_lookup_id_kind
1233 FROM
1234 mint_quote
1235 WHERE request_lookup_id = :request_lookup_id
1236 AND request_lookup_id_kind = :request_lookup_id_kind
1237 FOR UPDATE
1238 "#,
1239 )?
1240 .bind("request_lookup_id", request_lookup_id.to_string())
1241 .bind("request_lookup_id_kind", request_lookup_id.kind())
1242 .fetch_one(&self.inner)
1243 .await?
1244 .map(|row| sql_row_to_mint_quote(row, vec![], vec![]))
1245 .transpose()?;
1246
1247 if let Some(quote) = mint_quote.as_mut() {
1248 let payments = get_mint_quote_payments(&self.inner, "e.id).await?;
1249 let issuance = get_mint_quote_issuance(&self.inner, "e.id).await?;
1250 quote.issuance = issuance;
1251 quote.payments = payments;
1252 }
1253
1254 Ok(mint_quote)
1255 }
1256}
1257
1258#[async_trait]
1259impl<RM> MintQuotesDatabase for SQLMintDatabase<RM>
1260where
1261 RM: DatabasePool + 'static,
1262{
1263 type Err = Error;
1264
1265 async fn get_mint_quote(&self, quote_id: &QuoteId) -> Result<Option<MintQuote>, Self::Err> {
1266 #[cfg(feature = "prometheus")]
1267 METRICS.inc_in_flight_requests("get_mint_quote");
1268
1269 #[cfg(feature = "prometheus")]
1270 let start_time = std::time::Instant::now();
1271 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1272
1273 let result = async {
1274 let payments = get_mint_quote_payments(&*conn, quote_id).await?;
1275 let issuance = get_mint_quote_issuance(&*conn, quote_id).await?;
1276
1277 query(
1278 r#"
1279 SELECT
1280 id,
1281 amount,
1282 unit,
1283 request,
1284 expiry,
1285 request_lookup_id,
1286 pubkey,
1287 created_time,
1288 amount_paid,
1289 amount_issued,
1290 payment_method,
1291 request_lookup_id_kind
1292 FROM
1293 mint_quote
1294 WHERE id = :id"#,
1295 )?
1296 .bind("id", quote_id.to_string())
1297 .fetch_one(&*conn)
1298 .await?
1299 .map(|row| sql_row_to_mint_quote(row, payments, issuance))
1300 .transpose()
1301 }
1302 .await;
1303
1304 #[cfg(feature = "prometheus")]
1305 {
1306 let success = result.is_ok();
1307
1308 METRICS.record_mint_operation("get_mint_quote", success);
1309 METRICS.record_mint_operation_histogram(
1310 "get_mint_quote",
1311 success,
1312 start_time.elapsed().as_secs_f64(),
1313 );
1314 METRICS.dec_in_flight_requests("get_mint_quote");
1315 }
1316
1317 result
1318 }
1319
1320 async fn get_mint_quote_by_request(
1321 &self,
1322 request: &str,
1323 ) -> Result<Option<MintQuote>, Self::Err> {
1324 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1325 let mut mint_quote = query(
1326 r#"
1327 SELECT
1328 id,
1329 amount,
1330 unit,
1331 request,
1332 expiry,
1333 request_lookup_id,
1334 pubkey,
1335 created_time,
1336 amount_paid,
1337 amount_issued,
1338 payment_method,
1339 request_lookup_id_kind
1340 FROM
1341 mint_quote
1342 WHERE request = :request"#,
1343 )?
1344 .bind("request", request.to_owned())
1345 .fetch_one(&*conn)
1346 .await?
1347 .map(|row| sql_row_to_mint_quote(row, vec![], vec![]))
1348 .transpose()?;
1349
1350 if let Some(quote) = mint_quote.as_mut() {
1351 let payments = get_mint_quote_payments(&*conn, "e.id).await?;
1352 let issuance = get_mint_quote_issuance(&*conn, "e.id).await?;
1353 quote.issuance = issuance;
1354 quote.payments = payments;
1355 }
1356
1357 Ok(mint_quote)
1358 }
1359
1360 async fn get_mint_quote_by_request_lookup_id(
1361 &self,
1362 request_lookup_id: &PaymentIdentifier,
1363 ) -> Result<Option<MintQuote>, Self::Err> {
1364 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1365 let mut mint_quote = query(
1366 r#"
1367 SELECT
1368 id,
1369 amount,
1370 unit,
1371 request,
1372 expiry,
1373 request_lookup_id,
1374 pubkey,
1375 created_time,
1376 amount_paid,
1377 amount_issued,
1378 payment_method,
1379 request_lookup_id_kind
1380 FROM
1381 mint_quote
1382 WHERE request_lookup_id = :request_lookup_id
1383 AND request_lookup_id_kind = :request_lookup_id_kind
1384 "#,
1385 )?
1386 .bind("request_lookup_id", request_lookup_id.to_string())
1387 .bind("request_lookup_id_kind", request_lookup_id.kind())
1388 .fetch_one(&*conn)
1389 .await?
1390 .map(|row| sql_row_to_mint_quote(row, vec![], vec![]))
1391 .transpose()?;
1392
1393 if let Some(quote) = mint_quote.as_mut() {
1395 let payments = get_mint_quote_payments(&*conn, "e.id).await?;
1396 let issuance = get_mint_quote_issuance(&*conn, "e.id).await?;
1397 quote.issuance = issuance;
1398 quote.payments = payments;
1399 }
1400
1401 Ok(mint_quote)
1402 }
1403
1404 async fn get_mint_quotes(&self) -> Result<Vec<MintQuote>, Self::Err> {
1405 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1406 let mut mint_quotes = query(
1407 r#"
1408 SELECT
1409 id,
1410 amount,
1411 unit,
1412 request,
1413 expiry,
1414 request_lookup_id,
1415 pubkey,
1416 created_time,
1417 amount_paid,
1418 amount_issued,
1419 payment_method,
1420 request_lookup_id_kind
1421 FROM
1422 mint_quote
1423 "#,
1424 )?
1425 .fetch_all(&*conn)
1426 .await?
1427 .into_iter()
1428 .map(|row| sql_row_to_mint_quote(row, vec![], vec![]))
1429 .collect::<Result<Vec<_>, _>>()?;
1430
1431 for quote in mint_quotes.as_mut_slice() {
1432 let payments = get_mint_quote_payments(&*conn, "e.id).await?;
1433 let issuance = get_mint_quote_issuance(&*conn, "e.id).await?;
1434 quote.issuance = issuance;
1435 quote.payments = payments;
1436 }
1437
1438 Ok(mint_quotes)
1439 }
1440
1441 async fn get_melt_quote(
1442 &self,
1443 quote_id: &QuoteId,
1444 ) -> Result<Option<mint::MeltQuote>, Self::Err> {
1445 #[cfg(feature = "prometheus")]
1446 METRICS.inc_in_flight_requests("get_melt_quote");
1447
1448 #[cfg(feature = "prometheus")]
1449 let start_time = std::time::Instant::now();
1450 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1451
1452 let result = async {
1453 query(
1454 r#"
1455 SELECT
1456 id,
1457 unit,
1458 amount,
1459 request,
1460 fee_reserve,
1461 expiry,
1462 state,
1463 payment_preimage,
1464 request_lookup_id,
1465 created_time,
1466 paid_time,
1467 payment_method,
1468 options,
1469 request_lookup_id_kind
1470 FROM
1471 melt_quote
1472 WHERE
1473 id=:id
1474 "#,
1475 )?
1476 .bind("id", quote_id.to_string())
1477 .fetch_one(&*conn)
1478 .await?
1479 .map(sql_row_to_melt_quote)
1480 .transpose()
1481 }
1482 .await;
1483
1484 #[cfg(feature = "prometheus")]
1485 {
1486 let success = result.is_ok();
1487
1488 METRICS.record_mint_operation("get_melt_quote", success);
1489 METRICS.record_mint_operation_histogram(
1490 "get_melt_quote",
1491 success,
1492 start_time.elapsed().as_secs_f64(),
1493 );
1494 METRICS.dec_in_flight_requests("get_melt_quote");
1495 }
1496
1497 result
1498 }
1499
1500 async fn get_melt_quotes(&self) -> Result<Vec<mint::MeltQuote>, Self::Err> {
1501 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1502 Ok(query(
1503 r#"
1504 SELECT
1505 id,
1506 unit,
1507 amount,
1508 request,
1509 fee_reserve,
1510 expiry,
1511 state,
1512 payment_preimage,
1513 request_lookup_id,
1514 created_time,
1515 paid_time,
1516 payment_method,
1517 options,
1518 request_lookup_id_kind
1519 FROM
1520 melt_quote
1521 "#,
1522 )?
1523 .fetch_all(&*conn)
1524 .await?
1525 .into_iter()
1526 .map(sql_row_to_melt_quote)
1527 .collect::<Result<Vec<_>, _>>()?)
1528 }
1529}
1530
1531#[async_trait]
1532impl<RM> MintProofsDatabase for SQLMintDatabase<RM>
1533where
1534 RM: DatabasePool + 'static,
1535{
1536 type Err = Error;
1537
1538 async fn get_proofs_by_ys(&self, ys: &[PublicKey]) -> Result<Vec<Option<Proof>>, Self::Err> {
1539 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1540 let mut proofs = query(
1541 r#"
1542 SELECT
1543 amount,
1544 keyset_id,
1545 secret,
1546 c,
1547 witness,
1548 y
1549 FROM
1550 proof
1551 WHERE
1552 y IN (:ys)
1553 "#,
1554 )?
1555 .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
1556 .fetch_all(&*conn)
1557 .await?
1558 .into_iter()
1559 .map(|mut row| {
1560 Ok((
1561 column_as_string!(
1562 row.pop().ok_or(Error::InvalidDbResponse)?,
1563 PublicKey::from_hex,
1564 PublicKey::from_slice
1565 ),
1566 sql_row_to_proof(row)?,
1567 ))
1568 })
1569 .collect::<Result<HashMap<_, _>, Error>>()?;
1570
1571 Ok(ys.iter().map(|y| proofs.remove(y)).collect())
1572 }
1573
1574 async fn get_proof_ys_by_quote_id(
1575 &self,
1576 quote_id: &QuoteId,
1577 ) -> Result<Vec<PublicKey>, Self::Err> {
1578 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1579 Ok(query(
1580 r#"
1581 SELECT
1582 amount,
1583 keyset_id,
1584 secret,
1585 c,
1586 witness
1587 FROM
1588 proof
1589 WHERE
1590 quote_id = :quote_id
1591 "#,
1592 )?
1593 .bind("quote_id", quote_id.to_string())
1594 .fetch_all(&*conn)
1595 .await?
1596 .into_iter()
1597 .map(sql_row_to_proof)
1598 .collect::<Result<Vec<Proof>, _>>()?
1599 .ys()?)
1600 }
1601
1602 async fn get_proofs_states(&self, ys: &[PublicKey]) -> Result<Vec<Option<State>>, Self::Err> {
1603 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1604 let mut current_states = get_current_states(&*conn, ys).await?;
1605
1606 Ok(ys.iter().map(|y| current_states.remove(y)).collect())
1607 }
1608
1609 async fn get_proofs_by_keyset_id(
1610 &self,
1611 keyset_id: &Id,
1612 ) -> Result<(Proofs, Vec<Option<State>>), Self::Err> {
1613 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1614 Ok(query(
1615 r#"
1616 SELECT
1617 keyset_id,
1618 amount,
1619 secret,
1620 c,
1621 witness,
1622 state
1623 FROM
1624 proof
1625 WHERE
1626 keyset_id=:keyset_id
1627 "#,
1628 )?
1629 .bind("keyset_id", keyset_id.to_string())
1630 .fetch_all(&*conn)
1631 .await?
1632 .into_iter()
1633 .map(sql_row_to_proof_with_state)
1634 .collect::<Result<Vec<_>, _>>()?
1635 .into_iter()
1636 .unzip())
1637 }
1638
1639 async fn get_total_redeemed(&self) -> Result<HashMap<Id, Amount>, Self::Err> {
1641 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1642 query(
1643 r#"
1644 SELECT
1645 keyset_id,
1646 total_redeemed as amount
1647 FROM
1648 keyset_amounts
1649 "#,
1650 )?
1651 .fetch_all(&*conn)
1652 .await?
1653 .into_iter()
1654 .map(sql_row_to_hashmap_amount)
1655 .collect()
1656 }
1657}
1658
1659#[async_trait]
1660impl<RM> MintSignatureTransaction<'_> for SQLTransaction<RM>
1661where
1662 RM: DatabasePool + 'static,
1663{
1664 type Err = Error;
1665
1666 async fn add_blind_signatures(
1667 &mut self,
1668 blinded_messages: &[PublicKey],
1669 blind_signatures: &[BlindSignature],
1670 quote_id: Option<QuoteId>,
1671 ) -> Result<(), Self::Err> {
1672 let current_time = unix_time();
1673
1674 if blinded_messages.len() != blind_signatures.len() {
1675 return Err(database::Error::Internal(
1676 "Mismatched array lengths for blinded messages and blind signatures".to_string(),
1677 ));
1678 }
1679
1680 let mut existing_rows = query(
1682 r#"
1683 SELECT blinded_message, c, dleq_e, dleq_s
1684 FROM blind_signature
1685 WHERE blinded_message IN (:blinded_messages)
1686 FOR UPDATE
1687 "#,
1688 )?
1689 .bind_vec(
1690 "blinded_messages",
1691 blinded_messages
1692 .iter()
1693 .map(|message| message.to_bytes().to_vec())
1694 .collect(),
1695 )
1696 .fetch_all(&self.inner)
1697 .await?
1698 .into_iter()
1699 .map(|mut row| {
1700 Ok((
1701 column_as_string!(&row.remove(0), PublicKey::from_hex, PublicKey::from_slice),
1702 (row[0].clone(), row[1].clone(), row[2].clone()),
1703 ))
1704 })
1705 .collect::<Result<HashMap<_, _>, Error>>()?;
1706
1707 for (message, signature) in blinded_messages.iter().zip(blind_signatures) {
1709 match existing_rows.remove(message) {
1710 None => {
1711 query(
1713 r#"
1714 INSERT INTO blind_signature
1715 (blinded_message, amount, keyset_id, c, quote_id, dleq_e, dleq_s, created_time, signed_time)
1716 VALUES
1717 (:blinded_message, :amount, :keyset_id, :c, :quote_id, :dleq_e, :dleq_s, :created_time, :signed_time)
1718 "#,
1719 )?
1720 .bind("blinded_message", message.to_bytes().to_vec())
1721 .bind("amount", u64::from(signature.amount) as i64)
1722 .bind("keyset_id", signature.keyset_id.to_string())
1723 .bind("c", signature.c.to_bytes().to_vec())
1724 .bind("quote_id", quote_id.as_ref().map(|q| q.to_string()))
1725 .bind(
1726 "dleq_e",
1727 signature.dleq.as_ref().map(|dleq| dleq.e.to_secret_hex()),
1728 )
1729 .bind(
1730 "dleq_s",
1731 signature.dleq.as_ref().map(|dleq| dleq.s.to_secret_hex()),
1732 )
1733 .bind("created_time", current_time as i64)
1734 .bind("signed_time", current_time as i64)
1735 .execute(&self.inner)
1736 .await?;
1737
1738 query(
1739 r#"
1740 INSERT INTO keyset_amounts (keyset_id, total_issued, total_redeemed)
1741 VALUES (:keyset_id, :amount, 0)
1742 ON CONFLICT (keyset_id)
1743 DO UPDATE SET total_issued = keyset_amounts.total_issued + EXCLUDED.total_issued
1744 "#,
1745 )?
1746 .bind("amount", u64::from(signature.amount) as i64)
1747 .bind("keyset_id", signature.keyset_id.to_string())
1748 .execute(&self.inner)
1749 .await?;
1750 }
1751 Some((c, _dleq_e, _dleq_s)) => {
1752 match c {
1754 Column::Null => {
1755 query(
1757 r#"
1758 UPDATE blind_signature
1759 SET c = :c, dleq_e = :dleq_e, dleq_s = :dleq_s, signed_time = :signed_time, amount = :amount
1760 WHERE blinded_message = :blinded_message
1761 "#,
1762 )?
1763 .bind("c", signature.c.to_bytes().to_vec())
1764 .bind(
1765 "dleq_e",
1766 signature.dleq.as_ref().map(|dleq| dleq.e.to_secret_hex()),
1767 )
1768 .bind(
1769 "dleq_s",
1770 signature.dleq.as_ref().map(|dleq| dleq.s.to_secret_hex()),
1771 )
1772 .bind("blinded_message", message.to_bytes().to_vec())
1773 .bind("signed_time", current_time as i64)
1774 .bind("amount", u64::from(signature.amount) as i64)
1775 .execute(&self.inner)
1776 .await?;
1777
1778 query(
1779 r#"
1780 INSERT INTO keyset_amounts (keyset_id, total_issued, total_redeemed)
1781 VALUES (:keyset_id, :amount, 0)
1782 ON CONFLICT (keyset_id)
1783 DO UPDATE SET total_issued = keyset_amounts.total_issued + EXCLUDED.total_issued
1784 "#,
1785 )?
1786 .bind("amount", u64::from(signature.amount) as i64)
1787 .bind("keyset_id", signature.keyset_id.to_string())
1788 .execute(&self.inner)
1789 .await?;
1790 }
1791 _ => {
1792 tracing::error!(
1794 "Attempting to add signature to message already signed {}",
1795 message
1796 );
1797
1798 return Err(database::Error::Duplicate);
1799 }
1800 }
1801 }
1802 }
1803 }
1804
1805 debug_assert!(
1806 existing_rows.is_empty(),
1807 "Unexpected existing rows remain: {:?}",
1808 existing_rows.keys().collect::<Vec<_>>()
1809 );
1810
1811 if !existing_rows.is_empty() {
1812 tracing::error!("Did not check all existing rows");
1813 return Err(Error::Internal(
1814 "Did not check all existing rows".to_string(),
1815 ));
1816 }
1817
1818 Ok(())
1819 }
1820
1821 async fn get_blind_signatures(
1822 &mut self,
1823 blinded_messages: &[PublicKey],
1824 ) -> Result<Vec<Option<BlindSignature>>, Self::Err> {
1825 let mut blinded_signatures = query(
1826 r#"SELECT
1827 keyset_id,
1828 amount,
1829 c,
1830 dleq_e,
1831 dleq_s,
1832 blinded_message
1833 FROM
1834 blind_signature
1835 WHERE blinded_message IN (:b) AND c IS NOT NULL
1836 "#,
1837 )?
1838 .bind_vec(
1839 "b",
1840 blinded_messages
1841 .iter()
1842 .map(|b| b.to_bytes().to_vec())
1843 .collect(),
1844 )
1845 .fetch_all(&self.inner)
1846 .await?
1847 .into_iter()
1848 .map(|mut row| {
1849 Ok((
1850 column_as_string!(
1851 &row.pop().ok_or(Error::InvalidDbResponse)?,
1852 PublicKey::from_hex,
1853 PublicKey::from_slice
1854 ),
1855 sql_row_to_blind_signature(row)?,
1856 ))
1857 })
1858 .collect::<Result<HashMap<_, _>, Error>>()?;
1859 Ok(blinded_messages
1860 .iter()
1861 .map(|y| blinded_signatures.remove(y))
1862 .collect())
1863 }
1864}
1865
1866#[async_trait]
1867impl<RM> MintSignaturesDatabase for SQLMintDatabase<RM>
1868where
1869 RM: DatabasePool + 'static,
1870{
1871 type Err = Error;
1872
1873 async fn get_blind_signatures(
1874 &self,
1875 blinded_messages: &[PublicKey],
1876 ) -> Result<Vec<Option<BlindSignature>>, Self::Err> {
1877 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1878 let mut blinded_signatures = query(
1879 r#"SELECT
1880 keyset_id,
1881 amount,
1882 c,
1883 dleq_e,
1884 dleq_s,
1885 blinded_message
1886 FROM
1887 blind_signature
1888 WHERE blinded_message IN (:b) AND c IS NOT NULL
1889 "#,
1890 )?
1891 .bind_vec(
1892 "b",
1893 blinded_messages
1894 .iter()
1895 .map(|b_| b_.to_bytes().to_vec())
1896 .collect(),
1897 )
1898 .fetch_all(&*conn)
1899 .await?
1900 .into_iter()
1901 .map(|mut row| {
1902 Ok((
1903 column_as_string!(
1904 &row.pop().ok_or(Error::InvalidDbResponse)?,
1905 PublicKey::from_hex,
1906 PublicKey::from_slice
1907 ),
1908 sql_row_to_blind_signature(row)?,
1909 ))
1910 })
1911 .collect::<Result<HashMap<_, _>, Error>>()?;
1912 Ok(blinded_messages
1913 .iter()
1914 .map(|y| blinded_signatures.remove(y))
1915 .collect())
1916 }
1917
1918 async fn get_blind_signatures_for_keyset(
1919 &self,
1920 keyset_id: &Id,
1921 ) -> Result<Vec<BlindSignature>, Self::Err> {
1922 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1923 Ok(query(
1924 r#"
1925 SELECT
1926 keyset_id,
1927 amount,
1928 c,
1929 dleq_e,
1930 dleq_s
1931 FROM
1932 blind_signature
1933 WHERE
1934 keyset_id=:keyset_id AND c IS NOT NULL
1935 "#,
1936 )?
1937 .bind("keyset_id", keyset_id.to_string())
1938 .fetch_all(&*conn)
1939 .await?
1940 .into_iter()
1941 .map(sql_row_to_blind_signature)
1942 .collect::<Result<Vec<BlindSignature>, _>>()?)
1943 }
1944
1945 async fn get_blind_signatures_for_quote(
1947 &self,
1948 quote_id: &QuoteId,
1949 ) -> Result<Vec<BlindSignature>, Self::Err> {
1950 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1951 Ok(query(
1952 r#"
1953 SELECT
1954 keyset_id,
1955 amount,
1956 c,
1957 dleq_e,
1958 dleq_s
1959 FROM
1960 blind_signature
1961 WHERE
1962 quote_id=:quote_id AND c IS NOT NULL
1963 "#,
1964 )?
1965 .bind("quote_id", quote_id.to_string())
1966 .fetch_all(&*conn)
1967 .await?
1968 .into_iter()
1969 .map(sql_row_to_blind_signature)
1970 .collect::<Result<Vec<BlindSignature>, _>>()?)
1971 }
1972
1973 async fn get_total_issued(&self) -> Result<HashMap<Id, Amount>, Self::Err> {
1975 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1976 query(
1977 r#"
1978 SELECT
1979 keyset_id,
1980 total_issued as amount
1981 FROM
1982 keyset_amounts
1983 "#,
1984 )?
1985 .fetch_all(&*conn)
1986 .await?
1987 .into_iter()
1988 .map(sql_row_to_hashmap_amount)
1989 .collect()
1990 }
1991}
1992
1993#[async_trait]
1994impl<RM> database::MintKVStoreTransaction<'_, Error> for SQLTransaction<RM>
1995where
1996 RM: DatabasePool + 'static,
1997{
1998 async fn kv_read(
1999 &mut self,
2000 primary_namespace: &str,
2001 secondary_namespace: &str,
2002 key: &str,
2003 ) -> Result<Option<Vec<u8>>, Error> {
2004 validate_kvstore_params(primary_namespace, secondary_namespace, key)?;
2006 Ok(query(
2007 r#"
2008 SELECT value
2009 FROM kv_store
2010 WHERE primary_namespace = :primary_namespace
2011 AND secondary_namespace = :secondary_namespace
2012 AND key = :key
2013 "#,
2014 )?
2015 .bind("primary_namespace", primary_namespace.to_owned())
2016 .bind("secondary_namespace", secondary_namespace.to_owned())
2017 .bind("key", key.to_owned())
2018 .pluck(&self.inner)
2019 .await?
2020 .and_then(|col| match col {
2021 Column::Blob(data) => Some(data),
2022 _ => None,
2023 }))
2024 }
2025
2026 async fn kv_write(
2027 &mut self,
2028 primary_namespace: &str,
2029 secondary_namespace: &str,
2030 key: &str,
2031 value: &[u8],
2032 ) -> Result<(), Error> {
2033 validate_kvstore_params(primary_namespace, secondary_namespace, key)?;
2035
2036 let current_time = unix_time();
2037
2038 query(
2039 r#"
2040 INSERT INTO kv_store
2041 (primary_namespace, secondary_namespace, key, value, created_time, updated_time)
2042 VALUES (:primary_namespace, :secondary_namespace, :key, :value, :created_time, :updated_time)
2043 ON CONFLICT(primary_namespace, secondary_namespace, key)
2044 DO UPDATE SET
2045 value = excluded.value,
2046 updated_time = excluded.updated_time
2047 "#,
2048 )?
2049 .bind("primary_namespace", primary_namespace.to_owned())
2050 .bind("secondary_namespace", secondary_namespace.to_owned())
2051 .bind("key", key.to_owned())
2052 .bind("value", value.to_vec())
2053 .bind("created_time", current_time as i64)
2054 .bind("updated_time", current_time as i64)
2055 .execute(&self.inner)
2056 .await?;
2057
2058 Ok(())
2059 }
2060
2061 async fn kv_remove(
2062 &mut self,
2063 primary_namespace: &str,
2064 secondary_namespace: &str,
2065 key: &str,
2066 ) -> Result<(), Error> {
2067 validate_kvstore_params(primary_namespace, secondary_namespace, key)?;
2069 query(
2070 r#"
2071 DELETE FROM kv_store
2072 WHERE primary_namespace = :primary_namespace
2073 AND secondary_namespace = :secondary_namespace
2074 AND key = :key
2075 "#,
2076 )?
2077 .bind("primary_namespace", primary_namespace.to_owned())
2078 .bind("secondary_namespace", secondary_namespace.to_owned())
2079 .bind("key", key.to_owned())
2080 .execute(&self.inner)
2081 .await?;
2082
2083 Ok(())
2084 }
2085
2086 async fn kv_list(
2087 &mut self,
2088 primary_namespace: &str,
2089 secondary_namespace: &str,
2090 ) -> Result<Vec<String>, Error> {
2091 cdk_common::database::mint::validate_kvstore_string(primary_namespace)?;
2093 cdk_common::database::mint::validate_kvstore_string(secondary_namespace)?;
2094
2095 if primary_namespace.is_empty() && !secondary_namespace.is_empty() {
2097 return Err(Error::KVStoreInvalidKey(
2098 "If primary_namespace is empty, secondary_namespace must also be empty".to_string(),
2099 ));
2100 }
2101 Ok(query(
2102 r#"
2103 SELECT key
2104 FROM kv_store
2105 WHERE primary_namespace = :primary_namespace
2106 AND secondary_namespace = :secondary_namespace
2107 ORDER BY key
2108 "#,
2109 )?
2110 .bind("primary_namespace", primary_namespace.to_owned())
2111 .bind("secondary_namespace", secondary_namespace.to_owned())
2112 .fetch_all(&self.inner)
2113 .await?
2114 .into_iter()
2115 .map(|row| Ok(column_as_string!(&row[0])))
2116 .collect::<Result<Vec<_>, Error>>()?)
2117 }
2118}
2119
2120#[async_trait]
2121impl<RM> database::MintKVStoreDatabase for SQLMintDatabase<RM>
2122where
2123 RM: DatabasePool + 'static,
2124{
2125 type Err = Error;
2126
2127 async fn kv_read(
2128 &self,
2129 primary_namespace: &str,
2130 secondary_namespace: &str,
2131 key: &str,
2132 ) -> Result<Option<Vec<u8>>, Error> {
2133 validate_kvstore_params(primary_namespace, secondary_namespace, key)?;
2135
2136 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
2137 Ok(query(
2138 r#"
2139 SELECT value
2140 FROM kv_store
2141 WHERE primary_namespace = :primary_namespace
2142 AND secondary_namespace = :secondary_namespace
2143 AND key = :key
2144 "#,
2145 )?
2146 .bind("primary_namespace", primary_namespace.to_owned())
2147 .bind("secondary_namespace", secondary_namespace.to_owned())
2148 .bind("key", key.to_owned())
2149 .pluck(&*conn)
2150 .await?
2151 .and_then(|col| match col {
2152 Column::Blob(data) => Some(data),
2153 _ => None,
2154 }))
2155 }
2156
2157 async fn kv_list(
2158 &self,
2159 primary_namespace: &str,
2160 secondary_namespace: &str,
2161 ) -> Result<Vec<String>, Error> {
2162 cdk_common::database::mint::validate_kvstore_string(primary_namespace)?;
2164 cdk_common::database::mint::validate_kvstore_string(secondary_namespace)?;
2165
2166 if primary_namespace.is_empty() && !secondary_namespace.is_empty() {
2168 return Err(Error::KVStoreInvalidKey(
2169 "If primary_namespace is empty, secondary_namespace must also be empty".to_string(),
2170 ));
2171 }
2172
2173 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
2174 Ok(query(
2175 r#"
2176 SELECT key
2177 FROM kv_store
2178 WHERE primary_namespace = :primary_namespace
2179 AND secondary_namespace = :secondary_namespace
2180 ORDER BY key
2181 "#,
2182 )?
2183 .bind("primary_namespace", primary_namespace.to_owned())
2184 .bind("secondary_namespace", secondary_namespace.to_owned())
2185 .fetch_all(&*conn)
2186 .await?
2187 .into_iter()
2188 .map(|row| Ok(column_as_string!(&row[0])))
2189 .collect::<Result<Vec<_>, Error>>()?)
2190 }
2191}
2192
2193#[async_trait]
2194impl<RM> database::MintKVStore for SQLMintDatabase<RM>
2195where
2196 RM: DatabasePool + 'static,
2197{
2198 async fn begin_transaction<'a>(
2199 &'a self,
2200 ) -> Result<Box<dyn database::MintKVStoreTransaction<'a, Self::Err> + Send + Sync + 'a>, Error>
2201 {
2202 Ok(Box::new(SQLTransaction {
2203 inner: ConnectionWithTransaction::new(
2204 self.pool.get().map_err(|e| Error::Database(Box::new(e)))?,
2205 )
2206 .await?,
2207 }))
2208 }
2209}
2210
2211#[async_trait]
2212impl<RM> SagaTransaction<'_> for SQLTransaction<RM>
2213where
2214 RM: DatabasePool + 'static,
2215{
2216 type Err = Error;
2217
2218 async fn get_saga(
2219 &mut self,
2220 operation_id: &uuid::Uuid,
2221 ) -> Result<Option<mint::Saga>, Self::Err> {
2222 Ok(query(
2223 r#"
2224 SELECT
2225 operation_id,
2226 operation_kind,
2227 state,
2228 blinded_secrets,
2229 input_ys,
2230 quote_id,
2231 created_at,
2232 updated_at
2233 FROM
2234 saga_state
2235 WHERE
2236 operation_id = :operation_id
2237 FOR UPDATE
2238 "#,
2239 )?
2240 .bind("operation_id", operation_id.to_string())
2241 .fetch_one(&self.inner)
2242 .await?
2243 .map(sql_row_to_saga)
2244 .transpose()?)
2245 }
2246
2247 async fn add_saga(&mut self, saga: &mint::Saga) -> Result<(), Self::Err> {
2248 let current_time = unix_time();
2249
2250 let blinded_secrets_json = serde_json::to_string(&saga.blinded_secrets)
2251 .map_err(|e| Error::Internal(format!("Failed to serialize blinded_secrets: {e}")))?;
2252
2253 let input_ys_json = serde_json::to_string(&saga.input_ys)
2254 .map_err(|e| Error::Internal(format!("Failed to serialize input_ys: {e}")))?;
2255
2256 query(
2257 r#"
2258 INSERT INTO saga_state
2259 (operation_id, operation_kind, state, blinded_secrets, input_ys, quote_id, created_at, updated_at)
2260 VALUES
2261 (:operation_id, :operation_kind, :state, :blinded_secrets, :input_ys, :quote_id, :created_at, :updated_at)
2262 "#,
2263 )?
2264 .bind("operation_id", saga.operation_id.to_string())
2265 .bind("operation_kind", saga.operation_kind.to_string())
2266 .bind("state", saga.state.state())
2267 .bind("blinded_secrets", blinded_secrets_json)
2268 .bind("input_ys", input_ys_json)
2269 .bind("quote_id", saga.quote_id.as_deref())
2270 .bind("created_at", saga.created_at as i64)
2271 .bind("updated_at", current_time as i64)
2272 .execute(&self.inner)
2273 .await?;
2274
2275 Ok(())
2276 }
2277
2278 async fn update_saga(
2279 &mut self,
2280 operation_id: &uuid::Uuid,
2281 new_state: mint::SagaStateEnum,
2282 ) -> Result<(), Self::Err> {
2283 let current_time = unix_time();
2284
2285 query(
2286 r#"
2287 UPDATE saga_state
2288 SET state = :state, updated_at = :updated_at
2289 WHERE operation_id = :operation_id
2290 "#,
2291 )?
2292 .bind("state", new_state.state())
2293 .bind("updated_at", current_time as i64)
2294 .bind("operation_id", operation_id.to_string())
2295 .execute(&self.inner)
2296 .await?;
2297
2298 Ok(())
2299 }
2300
2301 async fn delete_saga(&mut self, operation_id: &uuid::Uuid) -> Result<(), Self::Err> {
2302 query(
2303 r#"
2304 DELETE FROM saga_state
2305 WHERE operation_id = :operation_id
2306 "#,
2307 )?
2308 .bind("operation_id", operation_id.to_string())
2309 .execute(&self.inner)
2310 .await?;
2311
2312 Ok(())
2313 }
2314}
2315
2316#[async_trait]
2317impl<RM> SagaDatabase for SQLMintDatabase<RM>
2318where
2319 RM: DatabasePool + 'static,
2320{
2321 type Err = Error;
2322
2323 async fn get_incomplete_sagas(
2324 &self,
2325 operation_kind: mint::OperationKind,
2326 ) -> Result<Vec<mint::Saga>, Self::Err> {
2327 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
2328 Ok(query(
2329 r#"
2330 SELECT
2331 operation_id,
2332 operation_kind,
2333 state,
2334 blinded_secrets,
2335 input_ys,
2336 quote_id,
2337 created_at,
2338 updated_at
2339 FROM
2340 saga_state
2341 WHERE
2342 operation_kind = :operation_kind
2343 ORDER BY created_at ASC
2344 "#,
2345 )?
2346 .bind("operation_kind", operation_kind.to_string())
2347 .fetch_all(&*conn)
2348 .await?
2349 .into_iter()
2350 .map(sql_row_to_saga)
2351 .collect::<Result<Vec<_>, _>>()?)
2352 }
2353}
2354
2355#[async_trait]
2356impl<RM> MintDatabase<Error> for SQLMintDatabase<RM>
2357where
2358 RM: DatabasePool + 'static,
2359{
2360 async fn begin_transaction<'a>(
2361 &'a self,
2362 ) -> Result<Box<dyn database::MintTransaction<'a, Error> + Send + Sync + 'a>, Error> {
2363 let tx = SQLTransaction {
2364 inner: ConnectionWithTransaction::new(
2365 self.pool.get().map_err(|e| Error::Database(Box::new(e)))?,
2366 )
2367 .await?,
2368 };
2369
2370 Ok(Box::new(tx))
2371 }
2372}
2373
2374fn sql_row_to_keyset_info(row: Vec<Column>) -> Result<MintKeySetInfo, Error> {
2375 unpack_into!(
2376 let (
2377 id,
2378 unit,
2379 active,
2380 valid_from,
2381 valid_to,
2382 derivation_path,
2383 derivation_path_index,
2384 max_order,
2385 amounts,
2386 row_keyset_ppk
2387 ) = row
2388 );
2389
2390 let max_order: u8 = column_as_number!(max_order);
2391 let amounts = column_as_nullable_string!(amounts)
2392 .and_then(|str| serde_json::from_str(&str).ok())
2393 .unwrap_or_else(|| (0..max_order).map(|m| 2u64.pow(m.into())).collect());
2394
2395 Ok(MintKeySetInfo {
2396 id: column_as_string!(id, Id::from_str, Id::from_bytes),
2397 unit: column_as_string!(unit, CurrencyUnit::from_str),
2398 active: matches!(active, Column::Integer(1)),
2399 valid_from: column_as_number!(valid_from),
2400 derivation_path: column_as_string!(derivation_path, DerivationPath::from_str),
2401 derivation_path_index: column_as_nullable_number!(derivation_path_index),
2402 max_order,
2403 amounts,
2404 input_fee_ppk: column_as_number!(row_keyset_ppk),
2405 final_expiry: column_as_nullable_number!(valid_to),
2406 })
2407}
2408
2409#[instrument(skip_all)]
2410fn sql_row_to_mint_quote(
2411 row: Vec<Column>,
2412 payments: Vec<IncomingPayment>,
2413 issueances: Vec<Issuance>,
2414) -> Result<MintQuote, Error> {
2415 unpack_into!(
2416 let (
2417 id, amount, unit, request, expiry, request_lookup_id,
2418 pubkey, created_time, amount_paid, amount_issued, payment_method, request_lookup_id_kind
2419 ) = row
2420 );
2421
2422 let request_str = column_as_string!(&request);
2423 let request_lookup_id = column_as_nullable_string!(&request_lookup_id).unwrap_or_else(|| {
2424 Bolt11Invoice::from_str(&request_str)
2425 .map(|invoice| invoice.payment_hash().to_string())
2426 .unwrap_or_else(|_| request_str.clone())
2427 });
2428 let request_lookup_id_kind = column_as_string!(request_lookup_id_kind);
2429
2430 let pubkey = column_as_nullable_string!(&pubkey)
2431 .map(|pk| PublicKey::from_hex(&pk))
2432 .transpose()?;
2433
2434 let id = column_as_string!(id);
2435 let amount: Option<u64> = column_as_nullable_number!(amount);
2436 let amount_paid: u64 = column_as_number!(amount_paid);
2437 let amount_issued: u64 = column_as_number!(amount_issued);
2438 let payment_method = column_as_string!(payment_method, PaymentMethod::from_str);
2439
2440 Ok(MintQuote::new(
2441 Some(QuoteId::from_str(&id)?),
2442 request_str,
2443 column_as_string!(unit, CurrencyUnit::from_str),
2444 amount.map(Amount::from),
2445 column_as_number!(expiry),
2446 PaymentIdentifier::new(&request_lookup_id_kind, &request_lookup_id)
2447 .map_err(|_| ConversionError::MissingParameter("Payment id".to_string()))?,
2448 pubkey,
2449 amount_paid.into(),
2450 amount_issued.into(),
2451 payment_method,
2452 column_as_number!(created_time),
2453 payments,
2454 issueances,
2455 ))
2456}
2457
2458fn sql_row_to_melt_quote(row: Vec<Column>) -> Result<mint::MeltQuote, Error> {
2459 unpack_into!(
2460 let (
2461 id,
2462 unit,
2463 amount,
2464 request,
2465 fee_reserve,
2466 expiry,
2467 state,
2468 payment_preimage,
2469 request_lookup_id,
2470 created_time,
2471 paid_time,
2472 payment_method,
2473 options,
2474 request_lookup_id_kind
2475 ) = row
2476 );
2477
2478 let id = column_as_string!(id);
2479 let amount: u64 = column_as_number!(amount);
2480 let fee_reserve: u64 = column_as_number!(fee_reserve);
2481
2482 let expiry = column_as_number!(expiry);
2483 let payment_preimage = column_as_nullable_string!(payment_preimage);
2484 let options = column_as_nullable_string!(options);
2485 let options = options.and_then(|o| serde_json::from_str(&o).ok());
2486 let created_time: i64 = column_as_number!(created_time);
2487 let paid_time = column_as_nullable_number!(paid_time);
2488 let payment_method = PaymentMethod::from_str(&column_as_string!(payment_method))?;
2489
2490 let state =
2491 MeltQuoteState::from_str(&column_as_string!(&state)).map_err(ConversionError::from)?;
2492
2493 let unit = column_as_string!(unit);
2494 let request = column_as_string!(request);
2495
2496 let request_lookup_id_kind = column_as_nullable_string!(request_lookup_id_kind);
2497
2498 let request_lookup_id = column_as_nullable_string!(&request_lookup_id).or_else(|| {
2499 Bolt11Invoice::from_str(&request)
2500 .ok()
2501 .map(|invoice| invoice.payment_hash().to_string())
2502 });
2503
2504 let request_lookup_id = if let (Some(id_kind), Some(request_lookup_id)) =
2505 (request_lookup_id_kind, request_lookup_id)
2506 {
2507 Some(
2508 PaymentIdentifier::new(&id_kind, &request_lookup_id)
2509 .map_err(|_| ConversionError::MissingParameter("Payment id".to_string()))?,
2510 )
2511 } else {
2512 None
2513 };
2514
2515 let request = match serde_json::from_str(&request) {
2516 Ok(req) => req,
2517 Err(err) => {
2518 tracing::debug!(
2519 "Melt quote from pre migrations defaulting to bolt11 {}.",
2520 err
2521 );
2522 let bolt11 = Bolt11Invoice::from_str(&request).unwrap();
2523 MeltPaymentRequest::Bolt11 { bolt11 }
2524 }
2525 };
2526
2527 Ok(MeltQuote {
2528 id: QuoteId::from_str(&id)?,
2529 unit: CurrencyUnit::from_str(&unit)?,
2530 amount: Amount::from(amount),
2531 request,
2532 fee_reserve: Amount::from(fee_reserve),
2533 state,
2534 expiry,
2535 payment_preimage,
2536 request_lookup_id,
2537 options,
2538 created_time: created_time as u64,
2539 paid_time,
2540 payment_method,
2541 })
2542}
2543
2544fn sql_row_to_proof(row: Vec<Column>) -> Result<Proof, Error> {
2545 unpack_into!(
2546 let (
2547 amount,
2548 keyset_id,
2549 secret,
2550 c,
2551 witness
2552 ) = row
2553 );
2554
2555 let amount: u64 = column_as_number!(amount);
2556 Ok(Proof {
2557 amount: Amount::from(amount),
2558 keyset_id: column_as_string!(keyset_id, Id::from_str),
2559 secret: column_as_string!(secret, Secret::from_str),
2560 c: column_as_string!(c, PublicKey::from_hex, PublicKey::from_slice),
2561 witness: column_as_nullable_string!(witness).and_then(|w| serde_json::from_str(&w).ok()),
2562 dleq: None,
2563 })
2564}
2565
2566fn sql_row_to_hashmap_amount(row: Vec<Column>) -> Result<(Id, Amount), Error> {
2567 unpack_into!(
2568 let (
2569 keyset_id, amount
2570 ) = row
2571 );
2572
2573 let amount: u64 = column_as_number!(amount);
2574 Ok((
2575 column_as_string!(keyset_id, Id::from_str, Id::from_bytes),
2576 Amount::from(amount),
2577 ))
2578}
2579
2580fn sql_row_to_proof_with_state(row: Vec<Column>) -> Result<(Proof, Option<State>), Error> {
2581 unpack_into!(
2582 let (
2583 keyset_id, amount, secret, c, witness, state
2584 ) = row
2585 );
2586
2587 let amount: u64 = column_as_number!(amount);
2588 let state = column_as_nullable_string!(state).and_then(|s| State::from_str(&s).ok());
2589
2590 Ok((
2591 Proof {
2592 amount: Amount::from(amount),
2593 keyset_id: column_as_string!(keyset_id, Id::from_str, Id::from_bytes),
2594 secret: column_as_string!(secret, Secret::from_str),
2595 c: column_as_string!(c, PublicKey::from_hex, PublicKey::from_slice),
2596 witness: column_as_nullable_string!(witness)
2597 .and_then(|w| serde_json::from_str(&w).ok()),
2598 dleq: None,
2599 },
2600 state,
2601 ))
2602}
2603
2604fn sql_row_to_blind_signature(row: Vec<Column>) -> Result<BlindSignature, Error> {
2605 unpack_into!(
2606 let (
2607 keyset_id, amount, c, dleq_e, dleq_s
2608 ) = row
2609 );
2610
2611 let dleq = match (
2612 column_as_nullable_string!(dleq_e),
2613 column_as_nullable_string!(dleq_s),
2614 ) {
2615 (Some(e), Some(s)) => Some(BlindSignatureDleq {
2616 e: SecretKey::from_hex(e)?,
2617 s: SecretKey::from_hex(s)?,
2618 }),
2619 _ => None,
2620 };
2621
2622 let amount: u64 = column_as_number!(amount);
2623
2624 Ok(BlindSignature {
2625 amount: Amount::from(amount),
2626 keyset_id: column_as_string!(keyset_id, Id::from_str, Id::from_bytes),
2627 c: column_as_string!(c, PublicKey::from_hex, PublicKey::from_slice),
2628 dleq,
2629 })
2630}
2631
2632fn sql_row_to_saga(row: Vec<Column>) -> Result<mint::Saga, Error> {
2633 unpack_into!(
2634 let (
2635 operation_id,
2636 operation_kind,
2637 state,
2638 blinded_secrets,
2639 input_ys,
2640 quote_id,
2641 created_at,
2642 updated_at
2643 ) = row
2644 );
2645
2646 let operation_id_str = column_as_string!(&operation_id);
2647 let operation_id = uuid::Uuid::parse_str(&operation_id_str)
2648 .map_err(|e| Error::Internal(format!("Invalid operation_id UUID: {e}")))?;
2649
2650 let operation_kind_str = column_as_string!(&operation_kind);
2651 let operation_kind = mint::OperationKind::from_str(&operation_kind_str)
2652 .map_err(|e| Error::Internal(format!("Invalid operation kind: {e}")))?;
2653
2654 let state_str = column_as_string!(&state);
2655 let state = mint::SagaStateEnum::new(operation_kind, &state_str)
2656 .map_err(|e| Error::Internal(format!("Invalid saga state: {e}")))?;
2657
2658 let blinded_secrets_str = column_as_string!(&blinded_secrets);
2659 let blinded_secrets: Vec<PublicKey> = serde_json::from_str(&blinded_secrets_str)
2660 .map_err(|e| Error::Internal(format!("Failed to deserialize blinded_secrets: {e}")))?;
2661
2662 let input_ys_str = column_as_string!(&input_ys);
2663 let input_ys: Vec<PublicKey> = serde_json::from_str(&input_ys_str)
2664 .map_err(|e| Error::Internal(format!("Failed to deserialize input_ys: {e}")))?;
2665
2666 let quote_id = match "e_id {
2667 Column::Text(s) => {
2668 if s.is_empty() {
2669 None
2670 } else {
2671 Some(s.clone())
2672 }
2673 }
2674 Column::Null => None,
2675 _ => None,
2676 };
2677
2678 let created_at: u64 = column_as_number!(created_at);
2679 let updated_at: u64 = column_as_number!(updated_at);
2680
2681 Ok(mint::Saga {
2682 operation_id,
2683 operation_kind,
2684 state,
2685 blinded_secrets,
2686 input_ys,
2687 quote_id,
2688 created_at,
2689 updated_at,
2690 })
2691}
2692
2693#[cfg(test)]
2694mod test {
2695 use super::*;
2696
2697 mod max_order_to_amounts_migrations {
2698 use super::*;
2699
2700 #[test]
2701 fn legacy_payload() {
2702 let result = sql_row_to_keyset_info(vec![
2703 Column::Text("0083a60439303340".to_owned()),
2704 Column::Text("sat".to_owned()),
2705 Column::Integer(1),
2706 Column::Integer(1749844864),
2707 Column::Null,
2708 Column::Text("0'/0'/0'".to_owned()),
2709 Column::Integer(0),
2710 Column::Integer(32),
2711 Column::Null,
2712 Column::Integer(0),
2713 ]);
2714 assert!(result.is_ok());
2715 }
2716
2717 #[test]
2718 fn migrated_payload() {
2719 let legacy = sql_row_to_keyset_info(vec![
2720 Column::Text("0083a60439303340".to_owned()),
2721 Column::Text("sat".to_owned()),
2722 Column::Integer(1),
2723 Column::Integer(1749844864),
2724 Column::Null,
2725 Column::Text("0'/0'/0'".to_owned()),
2726 Column::Integer(0),
2727 Column::Integer(32),
2728 Column::Null,
2729 Column::Integer(0),
2730 ]);
2731 assert!(legacy.is_ok());
2732
2733 let amounts = (0..32).map(|x| 2u64.pow(x)).collect::<Vec<_>>();
2734 let migrated = sql_row_to_keyset_info(vec![
2735 Column::Text("0083a60439303340".to_owned()),
2736 Column::Text("sat".to_owned()),
2737 Column::Integer(1),
2738 Column::Integer(1749844864),
2739 Column::Null,
2740 Column::Text("0'/0'/0'".to_owned()),
2741 Column::Integer(0),
2742 Column::Integer(32),
2743 Column::Text(serde_json::to_string(&amounts).expect("valid json")),
2744 Column::Integer(0),
2745 ]);
2746 assert!(migrated.is_ok());
2747 assert_eq!(legacy.unwrap(), migrated.unwrap());
2748 }
2749
2750 #[test]
2751 fn amounts_over_max_order() {
2752 let legacy = sql_row_to_keyset_info(vec![
2753 Column::Text("0083a60439303340".to_owned()),
2754 Column::Text("sat".to_owned()),
2755 Column::Integer(1),
2756 Column::Integer(1749844864),
2757 Column::Null,
2758 Column::Text("0'/0'/0'".to_owned()),
2759 Column::Integer(0),
2760 Column::Integer(32),
2761 Column::Null,
2762 Column::Integer(0),
2763 ]);
2764 assert!(legacy.is_ok());
2765
2766 let amounts = (0..16).map(|x| 2u64.pow(x)).collect::<Vec<_>>();
2767 let migrated = sql_row_to_keyset_info(vec![
2768 Column::Text("0083a60439303340".to_owned()),
2769 Column::Text("sat".to_owned()),
2770 Column::Integer(1),
2771 Column::Integer(1749844864),
2772 Column::Null,
2773 Column::Text("0'/0'/0'".to_owned()),
2774 Column::Integer(0),
2775 Column::Integer(32),
2776 Column::Text(serde_json::to_string(&amounts).expect("valid json")),
2777 Column::Integer(0),
2778 ]);
2779 assert!(migrated.is_ok());
2780 let migrated = migrated.unwrap();
2781 assert_ne!(legacy.unwrap(), migrated);
2782 assert_eq!(migrated.amounts.len(), 16);
2783 }
2784 }
2785}