1use std::collections::HashMap;
12use std::fmt::Debug;
13use std::str::FromStr;
14use std::sync::Arc;
15
16use async_trait::async_trait;
17use bitcoin::bip32::DerivationPath;
18use cdk_common::database::mint::{validate_kvstore_params, SagaDatabase, SagaTransaction};
19use cdk_common::database::{
20 self, ConversionError, Error, MintDatabase, MintDbWriterFinalizer, MintKeyDatabaseTransaction,
21 MintKeysDatabase, MintProofsDatabase, MintQuotesDatabase, MintQuotesTransaction,
22 MintSignatureTransaction, MintSignaturesDatabase,
23};
24use cdk_common::mint::{
25 self, IncomingPayment, Issuance, MeltPaymentRequest, MeltQuote, MintKeySetInfo, MintQuote,
26 Operation,
27};
28use cdk_common::nut00::ProofsMethods;
29use cdk_common::payment::PaymentIdentifier;
30use cdk_common::quote_id::QuoteId;
31use cdk_common::secret::Secret;
32use cdk_common::state::{check_melt_quote_state_transition, check_state_transition};
33use cdk_common::util::unix_time;
34use cdk_common::{
35 Amount, BlindSignature, BlindSignatureDleq, BlindedMessage, CurrencyUnit, Id, MeltQuoteState,
36 PaymentMethod, Proof, Proofs, PublicKey, SecretKey, State,
37};
38use lightning_invoice::Bolt11Invoice;
39use migrations::MIGRATIONS;
40use tracing::instrument;
41
42use crate::common::migrate;
43use crate::database::{ConnectionWithTransaction, DatabaseExecutor};
44use crate::pool::{DatabasePool, Pool, PooledResource};
45use crate::stmt::{query, Column};
46use crate::{
47 column_as_nullable_number, column_as_nullable_string, column_as_number, column_as_string,
48 unpack_into,
49};
50
51#[cfg(feature = "auth")]
52mod auth;
53
54#[rustfmt::skip]
55mod migrations {
56 include!(concat!(env!("OUT_DIR"), "/migrations_mint.rs"));
57}
58
59#[cfg(feature = "auth")]
60pub use auth::SQLMintAuthDatabase;
61#[cfg(feature = "prometheus")]
62use cdk_prometheus::METRICS;
63
64#[derive(Debug, Clone)]
66pub struct SQLMintDatabase<RM>
67where
68 RM: DatabasePool + 'static,
69{
70 pool: Arc<Pool<RM>>,
71}
72
73pub struct SQLTransaction<RM>
75where
76 RM: DatabasePool + 'static,
77{
78 inner: ConnectionWithTransaction<RM::Connection, PooledResource<RM>>,
79}
80
81#[inline(always)]
82async fn get_current_states<C>(
83 conn: &C,
84 ys: &[PublicKey],
85) -> Result<HashMap<PublicKey, State>, Error>
86where
87 C: DatabaseExecutor + Send + Sync,
88{
89 if ys.is_empty() {
90 return Ok(Default::default());
91 }
92 query(r#"SELECT y, state FROM proof WHERE y IN (:ys)"#)?
93 .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
94 .fetch_all(conn)
95 .await?
96 .into_iter()
97 .map(|row| {
98 Ok((
99 column_as_string!(&row[0], PublicKey::from_hex, PublicKey::from_slice),
100 column_as_string!(&row[1], State::from_str),
101 ))
102 })
103 .collect::<Result<HashMap<_, _>, _>>()
104}
105
106impl<RM> SQLMintDatabase<RM>
107where
108 RM: DatabasePool + 'static,
109{
110 pub async fn new<X>(db: X) -> Result<Self, Error>
112 where
113 X: Into<RM::Config>,
114 {
115 let pool = Pool::new(db.into());
116
117 Self::migrate(pool.get().map_err(|e| Error::Database(Box::new(e)))?).await?;
118
119 Ok(Self { pool })
120 }
121
122 async fn migrate(conn: PooledResource<RM>) -> Result<(), Error> {
124 let tx = ConnectionWithTransaction::new(conn).await?;
125 migrate(&tx, RM::Connection::name(), MIGRATIONS).await?;
126 tx.commit().await?;
127 Ok(())
128 }
129}
130
131#[async_trait]
132impl<RM> database::MintProofsTransaction<'_> for SQLTransaction<RM>
133where
134 RM: DatabasePool + 'static,
135{
136 type Err = Error;
137
138 async fn add_proofs(
139 &mut self,
140 proofs: Proofs,
141 quote_id: Option<QuoteId>,
142 operation: &Operation,
143 ) -> Result<(), Self::Err> {
144 let current_time = unix_time();
145
146 match query(r#"SELECT state FROM proof WHERE y IN (:ys) LIMIT 1 FOR UPDATE"#)?
149 .bind_vec(
150 "ys",
151 proofs
152 .iter()
153 .map(|y| y.y().map(|y| y.to_bytes().to_vec()))
154 .collect::<Result<_, _>>()?,
155 )
156 .pluck(&self.inner)
157 .await?
158 .map(|state| Ok::<_, Error>(column_as_string!(&state, State::from_str)))
159 .transpose()?
160 {
161 Some(State::Spent) => Err(database::Error::AttemptUpdateSpentProof),
162 Some(_) => Err(database::Error::Duplicate),
163 None => Ok(()), }?;
165
166 for proof in proofs {
167 query(
168 r#"
169 INSERT INTO proof
170 (y, amount, keyset_id, secret, c, witness, state, quote_id, created_time, operation_kind, operation_id)
171 VALUES
172 (:y, :amount, :keyset_id, :secret, :c, :witness, :state, :quote_id, :created_time, :operation_kind, :operation_id)
173 "#,
174 )?
175 .bind("y", proof.y()?.to_bytes().to_vec())
176 .bind("amount", proof.amount.to_i64())
177 .bind("keyset_id", proof.keyset_id.to_string())
178 .bind("secret", proof.secret.to_string())
179 .bind("c", proof.c.to_bytes().to_vec())
180 .bind(
181 "witness",
182 proof.witness.map(|w| serde_json::to_string(&w).unwrap()),
183 )
184 .bind("state", "UNSPENT".to_string())
185 .bind("quote_id", quote_id.clone().map(|q| q.to_string()))
186 .bind("created_time", current_time as i64)
187 .bind("operation_kind", operation.kind())
188 .bind("operation_id", operation.id().to_string())
189 .execute(&self.inner)
190 .await?;
191 }
192
193 Ok(())
194 }
195
196 async fn update_proofs_states(
197 &mut self,
198 ys: &[PublicKey],
199 new_state: State,
200 ) -> Result<Vec<Option<State>>, Self::Err> {
201 let mut current_states = get_current_states(&self.inner, ys).await?;
202
203 if current_states.len() != ys.len() {
204 tracing::warn!(
205 "Attempted to update state of non-existent proof {} {}",
206 current_states.len(),
207 ys.len()
208 );
209 return Err(database::Error::ProofNotFound);
210 }
211
212 for state in current_states.values() {
213 check_state_transition(*state, new_state)?;
214 }
215
216 query(r#"UPDATE proof SET state = :new_state WHERE y IN (:ys)"#)?
217 .bind("new_state", new_state.to_string())
218 .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
219 .execute(&self.inner)
220 .await?;
221
222 if new_state == State::Spent {
223 query(
224 r#"
225 INSERT INTO keyset_amounts (keyset_id, total_issued, total_redeemed)
226 SELECT keyset_id, 0, COALESCE(SUM(amount), 0)
227 FROM proof
228 WHERE y IN (:ys)
229 GROUP BY keyset_id
230 ON CONFLICT (keyset_id)
231 DO UPDATE SET total_redeemed = keyset_amounts.total_redeemed + EXCLUDED.total_redeemed
232 "#,
233 )?
234 .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
235 .execute(&self.inner)
236 .await?;
237 }
238
239 Ok(ys.iter().map(|y| current_states.remove(y)).collect())
240 }
241
242 async fn remove_proofs(
243 &mut self,
244 ys: &[PublicKey],
245 _quote_id: Option<QuoteId>,
246 ) -> Result<(), Self::Err> {
247 if ys.is_empty() {
248 return Ok(());
249 }
250 let total_deleted = query(
251 r#"
252 DELETE FROM proof WHERE y IN (:ys) AND state NOT IN (:exclude_state)
253 "#,
254 )?
255 .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
256 .bind_vec("exclude_state", vec![State::Spent.to_string()])
257 .execute(&self.inner)
258 .await?;
259
260 if total_deleted != ys.len() {
261 let current_states = get_current_states(&self.inner, ys).await?;
263
264 let missing_count = ys.len() - current_states.len();
265 let spent_count = current_states
266 .values()
267 .filter(|s| **s == State::Spent)
268 .count();
269
270 if missing_count > 0 {
271 tracing::warn!(
272 "remove_proofs: {} of {} proofs do not exist in database (already removed?)",
273 missing_count,
274 ys.len()
275 );
276 }
277
278 if spent_count > 0 {
279 tracing::warn!(
280 "remove_proofs: {} of {} proofs are in Spent state and cannot be removed",
281 spent_count,
282 ys.len()
283 );
284 }
285
286 tracing::debug!(
287 "remove_proofs details: requested={}, deleted={}, missing={}, spent={}",
288 ys.len(),
289 total_deleted,
290 missing_count,
291 spent_count
292 );
293
294 return Err(Self::Err::AttemptRemoveSpentProof);
295 }
296
297 Ok(())
298 }
299
300 async fn get_proof_ys_by_quote_id(
301 &self,
302 quote_id: &QuoteId,
303 ) -> Result<Vec<PublicKey>, Self::Err> {
304 Ok(query(
305 r#"
306 SELECT
307 amount,
308 keyset_id,
309 secret,
310 c,
311 witness
312 FROM
313 proof
314 WHERE
315 quote_id = :quote_id
316 "#,
317 )?
318 .bind("quote_id", quote_id.to_string())
319 .fetch_all(&self.inner)
320 .await?
321 .into_iter()
322 .map(sql_row_to_proof)
323 .collect::<Result<Vec<Proof>, _>>()?
324 .ys()?)
325 }
326}
327
328#[async_trait]
329impl<RM> database::MintTransaction<'_, Error> for SQLTransaction<RM> where RM: DatabasePool + 'static
330{}
331
332#[async_trait]
333impl<RM> MintDbWriterFinalizer for SQLTransaction<RM>
334where
335 RM: DatabasePool + 'static,
336{
337 type Err = Error;
338
339 async fn commit(self: Box<Self>) -> Result<(), Error> {
340 let result = self.inner.commit().await;
341 #[cfg(feature = "prometheus")]
342 {
343 let success = result.is_ok();
344 METRICS.record_mint_operation("transaction_commit", success);
345 METRICS.record_mint_operation_histogram("transaction_commit", success, 1.0);
346 }
347
348 Ok(result?)
349 }
350
351 async fn rollback(self: Box<Self>) -> Result<(), Error> {
352 let result = self.inner.rollback().await;
353
354 #[cfg(feature = "prometheus")]
355 {
356 let success = result.is_ok();
357 METRICS.record_mint_operation("transaction_rollback", success);
358 METRICS.record_mint_operation_histogram("transaction_rollback", success, 1.0);
359 }
360 Ok(result?)
361 }
362}
363
364#[inline(always)]
365async fn get_mint_quote_payments<C>(
366 conn: &C,
367 quote_id: &QuoteId,
368) -> Result<Vec<IncomingPayment>, Error>
369where
370 C: DatabaseExecutor + Send + Sync,
371{
372 query(
374 r#"
375 SELECT
376 payment_id,
377 timestamp,
378 amount
379 FROM
380 mint_quote_payments
381 WHERE
382 quote_id=:quote_id
383 "#,
384 )?
385 .bind("quote_id", quote_id.to_string())
386 .fetch_all(conn)
387 .await?
388 .into_iter()
389 .map(|row| {
390 let amount: u64 = column_as_number!(row[2].clone());
391 let time: u64 = column_as_number!(row[1].clone());
392 Ok(IncomingPayment::new(
393 amount.into(),
394 column_as_string!(&row[0]),
395 time,
396 ))
397 })
398 .collect()
399}
400
401#[inline(always)]
402async fn get_mint_quote_issuance<C>(conn: &C, quote_id: &QuoteId) -> Result<Vec<Issuance>, Error>
403where
404 C: DatabaseExecutor + Send + Sync,
405{
406 query(
408 r#"
409SELECT amount, timestamp
410FROM mint_quote_issued
411WHERE quote_id=:quote_id
412 "#,
413 )?
414 .bind("quote_id", quote_id.to_string())
415 .fetch_all(conn)
416 .await?
417 .into_iter()
418 .map(|row| {
419 let time: u64 = column_as_number!(row[1].clone());
420 Ok(Issuance::new(
421 Amount::from_i64(column_as_number!(row[0].clone()))
422 .expect("Is amount when put into db"),
423 time,
424 ))
425 })
426 .collect()
427}
428
429#[async_trait]
430impl<RM> MintKeyDatabaseTransaction<'_, Error> for SQLTransaction<RM>
431where
432 RM: DatabasePool + 'static,
433{
434 async fn add_keyset_info(&mut self, keyset: MintKeySetInfo) -> Result<(), Error> {
435 query(
436 r#"
437 INSERT INTO
438 keyset (
439 id, unit, active, valid_from, valid_to, derivation_path,
440 max_order, amounts, input_fee_ppk, derivation_path_index
441 )
442 VALUES (
443 :id, :unit, :active, :valid_from, :valid_to, :derivation_path,
444 :max_order, :amounts, :input_fee_ppk, :derivation_path_index
445 )
446 ON CONFLICT(id) DO UPDATE SET
447 unit = excluded.unit,
448 active = excluded.active,
449 valid_from = excluded.valid_from,
450 valid_to = excluded.valid_to,
451 derivation_path = excluded.derivation_path,
452 max_order = excluded.max_order,
453 amounts = excluded.amounts,
454 input_fee_ppk = excluded.input_fee_ppk,
455 derivation_path_index = excluded.derivation_path_index
456 "#,
457 )?
458 .bind("id", keyset.id.to_string())
459 .bind("unit", keyset.unit.to_string())
460 .bind("active", keyset.active)
461 .bind("valid_from", keyset.valid_from as i64)
462 .bind("valid_to", keyset.final_expiry.map(|v| v as i64))
463 .bind("derivation_path", keyset.derivation_path.to_string())
464 .bind("max_order", keyset.max_order)
465 .bind("amounts", serde_json::to_string(&keyset.amounts).ok())
466 .bind("input_fee_ppk", keyset.input_fee_ppk as i64)
467 .bind("derivation_path_index", keyset.derivation_path_index)
468 .execute(&self.inner)
469 .await?;
470
471 Ok(())
472 }
473
474 async fn set_active_keyset(&mut self, unit: CurrencyUnit, id: Id) -> Result<(), Error> {
475 query(r#"UPDATE keyset SET active=FALSE WHERE unit = :unit"#)?
476 .bind("unit", unit.to_string())
477 .execute(&self.inner)
478 .await?;
479
480 query(r#"UPDATE keyset SET active=TRUE WHERE unit = :unit AND id = :id"#)?
481 .bind("unit", unit.to_string())
482 .bind("id", id.to_string())
483 .execute(&self.inner)
484 .await?;
485
486 Ok(())
487 }
488}
489
490#[async_trait]
491impl<RM> MintKeysDatabase for SQLMintDatabase<RM>
492where
493 RM: DatabasePool + 'static,
494{
495 type Err = Error;
496
497 async fn begin_transaction<'a>(
498 &'a self,
499 ) -> Result<Box<dyn MintKeyDatabaseTransaction<'a, Error> + Send + Sync + 'a>, Error> {
500 let tx = SQLTransaction {
501 inner: ConnectionWithTransaction::new(
502 self.pool.get().map_err(|e| Error::Database(Box::new(e)))?,
503 )
504 .await?,
505 };
506
507 Ok(Box::new(tx))
508 }
509
510 async fn get_active_keyset_id(&self, unit: &CurrencyUnit) -> Result<Option<Id>, Self::Err> {
511 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
512 Ok(
513 query(r#" SELECT id FROM keyset WHERE active = :active AND unit = :unit"#)?
514 .bind("active", true)
515 .bind("unit", unit.to_string())
516 .pluck(&*conn)
517 .await?
518 .map(|id| match id {
519 Column::Text(text) => Ok(Id::from_str(&text)?),
520 Column::Blob(id) => Ok(Id::from_bytes(&id)?),
521 _ => Err(Error::InvalidKeysetId),
522 })
523 .transpose()?,
524 )
525 }
526
527 async fn get_active_keysets(&self) -> Result<HashMap<CurrencyUnit, Id>, Self::Err> {
528 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
529 Ok(
530 query(r#"SELECT id, unit FROM keyset WHERE active = :active"#)?
531 .bind("active", true)
532 .fetch_all(&*conn)
533 .await?
534 .into_iter()
535 .map(|row| {
536 Ok((
537 column_as_string!(&row[1], CurrencyUnit::from_str),
538 column_as_string!(&row[0], Id::from_str, Id::from_bytes),
539 ))
540 })
541 .collect::<Result<HashMap<_, _>, Error>>()?,
542 )
543 }
544
545 async fn get_keyset_info(&self, id: &Id) -> Result<Option<MintKeySetInfo>, Self::Err> {
546 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
547 Ok(query(
548 r#"SELECT
549 id,
550 unit,
551 active,
552 valid_from,
553 valid_to,
554 derivation_path,
555 derivation_path_index,
556 max_order,
557 amounts,
558 input_fee_ppk
559 FROM
560 keyset
561 WHERE id=:id"#,
562 )?
563 .bind("id", id.to_string())
564 .fetch_one(&*conn)
565 .await?
566 .map(sql_row_to_keyset_info)
567 .transpose()?)
568 }
569
570 async fn get_keyset_infos(&self) -> Result<Vec<MintKeySetInfo>, Self::Err> {
571 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
572 Ok(query(
573 r#"SELECT
574 id,
575 unit,
576 active,
577 valid_from,
578 valid_to,
579 derivation_path,
580 derivation_path_index,
581 max_order,
582 amounts,
583 input_fee_ppk
584 FROM
585 keyset
586 "#,
587 )?
588 .fetch_all(&*conn)
589 .await?
590 .into_iter()
591 .map(sql_row_to_keyset_info)
592 .collect::<Result<Vec<_>, _>>()?)
593 }
594}
595
596#[async_trait]
597impl<RM> MintQuotesTransaction<'_> for SQLTransaction<RM>
598where
599 RM: DatabasePool + 'static,
600{
601 type Err = Error;
602
603 async fn add_melt_request(
604 &mut self,
605 quote_id: &QuoteId,
606 inputs_amount: Amount,
607 inputs_fee: Amount,
608 ) -> Result<(), Self::Err> {
609 query(
611 r#"
612 INSERT INTO melt_request
613 (quote_id, inputs_amount, inputs_fee)
614 VALUES
615 (:quote_id, :inputs_amount, :inputs_fee)
616 "#,
617 )?
618 .bind("quote_id", quote_id.to_string())
619 .bind("inputs_amount", inputs_amount.to_i64())
620 .bind("inputs_fee", inputs_fee.to_i64())
621 .execute(&self.inner)
622 .await?;
623
624 Ok(())
625 }
626
627 async fn add_blinded_messages(
628 &mut self,
629 quote_id: Option<&QuoteId>,
630 blinded_messages: &[BlindedMessage],
631 operation: &Operation,
632 ) -> Result<(), Self::Err> {
633 let current_time = unix_time();
634
635 for message in blinded_messages {
638 match query(
639 r#"
640 INSERT INTO blind_signature
641 (blinded_message, amount, keyset_id, c, quote_id, created_time, operation_kind, operation_id)
642 VALUES
643 (:blinded_message, :amount, :keyset_id, NULL, :quote_id, :created_time, :operation_kind, :operation_id)
644 "#,
645 )?
646 .bind(
647 "blinded_message",
648 message.blinded_secret.to_bytes().to_vec(),
649 )
650 .bind("amount", message.amount.to_i64())
651 .bind("keyset_id", message.keyset_id.to_string())
652 .bind("quote_id", quote_id.map(|q| q.to_string()))
653 .bind("created_time", current_time as i64)
654 .bind("operation_kind", operation.kind())
655 .bind("operation_id", operation.id().to_string())
656 .execute(&self.inner)
657 .await
658 {
659 Ok(_) => continue,
660 Err(database::Error::Duplicate) => {
661 return Err(database::Error::Duplicate);
666 }
667 Err(err) => return Err(err),
668 }
669 }
670
671 Ok(())
672 }
673
674 async fn delete_blinded_messages(
675 &mut self,
676 blinded_secrets: &[PublicKey],
677 ) -> Result<(), Self::Err> {
678 if blinded_secrets.is_empty() {
679 return Ok(());
680 }
681
682 query(
685 r#"
686 DELETE FROM blind_signature
687 WHERE blinded_message IN (:blinded_secrets) AND c IS NULL
688 "#,
689 )?
690 .bind_vec(
691 "blinded_secrets",
692 blinded_secrets
693 .iter()
694 .map(|secret| secret.to_bytes().to_vec())
695 .collect(),
696 )
697 .execute(&self.inner)
698 .await?;
699
700 Ok(())
701 }
702
703 async fn get_melt_request_and_blinded_messages(
704 &mut self,
705 quote_id: &QuoteId,
706 ) -> Result<Option<database::mint::MeltRequestInfo>, Self::Err> {
707 let melt_request_row = query(
708 r#"
709 SELECT inputs_amount, inputs_fee
710 FROM melt_request
711 WHERE quote_id = :quote_id
712 FOR UPDATE
713 "#,
714 )?
715 .bind("quote_id", quote_id.to_string())
716 .fetch_one(&self.inner)
717 .await?;
718
719 if let Some(row) = melt_request_row {
720 let inputs_amount: u64 = column_as_number!(row[0].clone());
721 let inputs_fee: u64 = column_as_number!(row[1].clone());
722
723 let blinded_messages_rows = query(
725 r#"
726 SELECT blinded_message, keyset_id, amount
727 FROM blind_signature
728 WHERE quote_id = :quote_id AND c IS NULL
729 "#,
730 )?
731 .bind("quote_id", quote_id.to_string())
732 .fetch_all(&self.inner)
733 .await?;
734
735 let blinded_messages: Result<Vec<BlindedMessage>, Error> = blinded_messages_rows
736 .into_iter()
737 .map(|row| -> Result<BlindedMessage, Error> {
738 let blinded_message_key =
739 column_as_string!(&row[0], PublicKey::from_hex, PublicKey::from_slice);
740 let keyset_id = column_as_string!(&row[1], Id::from_str, Id::from_bytes);
741 let amount: u64 = column_as_number!(row[2].clone());
742
743 Ok(BlindedMessage {
744 blinded_secret: blinded_message_key,
745 keyset_id,
746 amount: Amount::from(amount),
747 witness: None, })
749 })
750 .collect();
751 let blinded_messages = blinded_messages?;
752
753 Ok(Some(database::mint::MeltRequestInfo {
754 inputs_amount: Amount::from(inputs_amount),
755 inputs_fee: Amount::from(inputs_fee),
756 change_outputs: blinded_messages,
757 }))
758 } else {
759 Ok(None)
760 }
761 }
762
763 async fn delete_melt_request(&mut self, quote_id: &QuoteId) -> Result<(), Self::Err> {
764 query(
766 r#"
767 DELETE FROM melt_request
768 WHERE quote_id = :quote_id
769 "#,
770 )?
771 .bind("quote_id", quote_id.to_string())
772 .execute(&self.inner)
773 .await?;
774
775 query(
777 r#"
778 DELETE FROM blind_signature
779 WHERE quote_id = :quote_id AND c IS NULL
780 "#,
781 )?
782 .bind("quote_id", quote_id.to_string())
783 .execute(&self.inner)
784 .await?;
785
786 Ok(())
787 }
788
789 #[instrument(skip(self))]
790 async fn increment_mint_quote_amount_paid(
791 &mut self,
792 quote_id: &QuoteId,
793 amount_paid: Amount,
794 payment_id: String,
795 ) -> Result<Amount, Self::Err> {
796 if amount_paid == Amount::ZERO {
797 tracing::warn!("Amount payments of zero amount should not be recorded.");
798 return Err(Error::Duplicate);
799 }
800
801 let exists = query(
803 r#"
804 SELECT payment_id
805 FROM mint_quote_payments
806 WHERE payment_id = :payment_id
807 FOR UPDATE
808 "#,
809 )?
810 .bind("payment_id", payment_id.clone())
811 .fetch_one(&self.inner)
812 .await?;
813
814 if exists.is_some() {
815 tracing::error!("Payment ID already exists: {}", payment_id);
816 return Err(database::Error::Duplicate);
817 }
818
819 let current_amount = query(
821 r#"
822 SELECT amount_paid
823 FROM mint_quote
824 WHERE id = :quote_id
825 FOR UPDATE
826 "#,
827 )?
828 .bind("quote_id", quote_id.to_string())
829 .fetch_one(&self.inner)
830 .await
831 .inspect_err(|err| {
832 tracing::error!("SQLite could not get mint quote amount_paid: {}", err);
833 })?;
834
835 let current_amount_paid = if let Some(current_amount) = current_amount {
836 let amount: u64 = column_as_number!(current_amount[0].clone());
837 Amount::from(amount)
838 } else {
839 Amount::ZERO
840 };
841
842 let new_amount_paid = current_amount_paid
844 .checked_add(amount_paid)
845 .ok_or_else(|| database::Error::AmountOverflow)?;
846
847 tracing::debug!(
848 "Mint quote {} amount paid was {} is now {}.",
849 quote_id,
850 current_amount_paid,
851 new_amount_paid
852 );
853
854 query(
856 r#"
857 UPDATE mint_quote
858 SET amount_paid = :amount_paid
859 WHERE id = :quote_id
860 "#,
861 )?
862 .bind("amount_paid", new_amount_paid.to_i64())
863 .bind("quote_id", quote_id.to_string())
864 .execute(&self.inner)
865 .await
866 .inspect_err(|err| {
867 tracing::error!("SQLite could not update mint quote amount_paid: {}", err);
868 })?;
869
870 query(
872 r#"
873 INSERT INTO mint_quote_payments
874 (quote_id, payment_id, amount, timestamp)
875 VALUES (:quote_id, :payment_id, :amount, :timestamp)
876 "#,
877 )?
878 .bind("quote_id", quote_id.to_string())
879 .bind("payment_id", payment_id)
880 .bind("amount", amount_paid.to_i64())
881 .bind("timestamp", unix_time() as i64)
882 .execute(&self.inner)
883 .await
884 .map_err(|err| {
885 tracing::error!("SQLite could not insert payment ID: {}", err);
886 err
887 })?;
888
889 Ok(new_amount_paid)
890 }
891
892 #[instrument(skip_all)]
893 async fn increment_mint_quote_amount_issued(
894 &mut self,
895 quote_id: &QuoteId,
896 amount_issued: Amount,
897 ) -> Result<Amount, Self::Err> {
898 let current_amounts = query(
900 r#"
901 SELECT amount_issued, amount_paid
902 FROM mint_quote
903 WHERE id = :quote_id
904 FOR UPDATE
905 "#,
906 )?
907 .bind("quote_id", quote_id.to_string())
908 .fetch_one(&self.inner)
909 .await
910 .inspect_err(|err| {
911 tracing::error!("SQLite could not get mint quote amount_issued: {}", err);
912 })?
913 .ok_or(Error::QuoteNotFound)?;
914
915 let new_amount_issued = {
916 unpack_into!(
918 let (current_amount_issued, current_amount_paid) = current_amounts
919 );
920
921 let current_amount_issued: u64 = column_as_number!(current_amount_issued);
922 let current_amount_paid: u64 = column_as_number!(current_amount_paid);
923
924 let current_amount_issued = Amount::from(current_amount_issued);
925 let current_amount_paid = Amount::from(current_amount_paid);
926
927 let new_amount_issued = current_amount_issued
929 .checked_add(amount_issued)
930 .ok_or_else(|| database::Error::AmountOverflow)?;
931
932 current_amount_paid
933 .checked_sub(new_amount_issued)
934 .ok_or(Error::Internal("Over-issued not allowed".to_owned()))?;
935
936 new_amount_issued
937 };
938
939 query(
941 r#"
942 UPDATE mint_quote
943 SET amount_issued = :amount_issued
944 WHERE id = :quote_id
945 "#,
946 )?
947 .bind("amount_issued", new_amount_issued.to_i64())
948 .bind("quote_id", quote_id.to_string())
949 .execute(&self.inner)
950 .await
951 .inspect_err(|err| {
952 tracing::error!("SQLite could not update mint quote amount_issued: {}", err);
953 })?;
954
955 let current_time = unix_time();
956
957 query(
958 r#"
959INSERT INTO mint_quote_issued
960(quote_id, amount, timestamp)
961VALUES (:quote_id, :amount, :timestamp);
962 "#,
963 )?
964 .bind("quote_id", quote_id.to_string())
965 .bind("amount", amount_issued.to_i64())
966 .bind("timestamp", current_time as i64)
967 .execute(&self.inner)
968 .await?;
969
970 Ok(new_amount_issued)
971 }
972
973 #[instrument(skip_all)]
974 async fn add_mint_quote(&mut self, quote: MintQuote) -> Result<(), Self::Err> {
975 query(
976 r#"
977 INSERT INTO mint_quote (
978 id, amount, unit, request, expiry, request_lookup_id, pubkey, created_time, payment_method, request_lookup_id_kind
979 )
980 VALUES (
981 :id, :amount, :unit, :request, :expiry, :request_lookup_id, :pubkey, :created_time, :payment_method, :request_lookup_id_kind
982 )
983 "#,
984 )?
985 .bind("id", quote.id.to_string())
986 .bind("amount", quote.amount.map(|a| a.to_i64()))
987 .bind("unit", quote.unit.to_string())
988 .bind("request", quote.request)
989 .bind("expiry", quote.expiry as i64)
990 .bind(
991 "request_lookup_id",
992 quote.request_lookup_id.to_string(),
993 )
994 .bind("pubkey", quote.pubkey.map(|p| p.to_string()))
995 .bind("created_time", quote.created_time as i64)
996 .bind("payment_method", quote.payment_method.to_string())
997 .bind("request_lookup_id_kind", quote.request_lookup_id.kind())
998 .execute(&self.inner)
999 .await?;
1000
1001 Ok(())
1002 }
1003
1004 async fn add_melt_quote(&mut self, quote: mint::MeltQuote) -> Result<(), Self::Err> {
1005 query(
1007 r#"
1008 INSERT INTO melt_quote
1009 (
1010 id, unit, amount, request, fee_reserve, state,
1011 expiry, payment_preimage, request_lookup_id,
1012 created_time, paid_time, options, request_lookup_id_kind, payment_method
1013 )
1014 VALUES
1015 (
1016 :id, :unit, :amount, :request, :fee_reserve, :state,
1017 :expiry, :payment_preimage, :request_lookup_id,
1018 :created_time, :paid_time, :options, :request_lookup_id_kind, :payment_method
1019 )
1020 "#,
1021 )?
1022 .bind("id", quote.id.to_string())
1023 .bind("unit", quote.unit.to_string())
1024 .bind("amount", quote.amount.to_i64())
1025 .bind("request", serde_json::to_string("e.request)?)
1026 .bind("fee_reserve", quote.fee_reserve.to_i64())
1027 .bind("state", quote.state.to_string())
1028 .bind("expiry", quote.expiry as i64)
1029 .bind("payment_preimage", quote.payment_preimage)
1030 .bind(
1031 "request_lookup_id",
1032 quote.request_lookup_id.as_ref().map(|id| id.to_string()),
1033 )
1034 .bind("created_time", quote.created_time as i64)
1035 .bind("paid_time", quote.paid_time.map(|t| t as i64))
1036 .bind(
1037 "options",
1038 quote.options.map(|o| serde_json::to_string(&o).ok()),
1039 )
1040 .bind(
1041 "request_lookup_id_kind",
1042 quote.request_lookup_id.map(|id| id.kind()),
1043 )
1044 .bind("payment_method", quote.payment_method.to_string())
1045 .execute(&self.inner)
1046 .await?;
1047
1048 Ok(())
1049 }
1050
1051 async fn update_melt_quote_request_lookup_id(
1052 &mut self,
1053 quote_id: &QuoteId,
1054 new_request_lookup_id: &PaymentIdentifier,
1055 ) -> Result<(), Self::Err> {
1056 query(r#"UPDATE melt_quote SET request_lookup_id = :new_req_id, request_lookup_id_kind = :new_kind WHERE id = :id"#)?
1057 .bind("new_req_id", new_request_lookup_id.to_string())
1058 .bind("new_kind",new_request_lookup_id.kind() )
1059 .bind("id", quote_id.to_string())
1060 .execute(&self.inner)
1061 .await?;
1062 Ok(())
1063 }
1064
1065 async fn update_melt_quote_state(
1066 &mut self,
1067 quote_id: &QuoteId,
1068 state: MeltQuoteState,
1069 payment_proof: Option<String>,
1070 ) -> Result<(MeltQuoteState, mint::MeltQuote), Self::Err> {
1071 let mut quote = query(
1072 r#"
1073 SELECT
1074 id,
1075 unit,
1076 amount,
1077 request,
1078 fee_reserve,
1079 expiry,
1080 state,
1081 payment_preimage,
1082 request_lookup_id,
1083 created_time,
1084 paid_time,
1085 payment_method,
1086 options,
1087 request_lookup_id_kind
1088 FROM
1089 melt_quote
1090 WHERE
1091 id=:id
1092 AND state != :state
1093 "#,
1094 )?
1095 .bind("id", quote_id.to_string())
1096 .bind("state", state.to_string())
1097 .fetch_one(&self.inner)
1098 .await?
1099 .map(sql_row_to_melt_quote)
1100 .transpose()?
1101 .ok_or(Error::QuoteNotFound)?;
1102
1103 check_melt_quote_state_transition(quote.state, state)?;
1104
1105 let rec = if state == MeltQuoteState::Paid {
1106 let current_time = unix_time();
1107 query(r#"UPDATE melt_quote SET state = :state, paid_time = :paid_time, payment_preimage = :payment_preimage WHERE id = :id"#)?
1108 .bind("state", state.to_string())
1109 .bind("paid_time", current_time as i64)
1110 .bind("payment_preimage", payment_proof)
1111 .bind("id", quote_id.to_string())
1112 .execute(&self.inner)
1113 .await
1114 } else {
1115 query(r#"UPDATE melt_quote SET state = :state WHERE id = :id"#)?
1116 .bind("state", state.to_string())
1117 .bind("id", quote_id.to_string())
1118 .execute(&self.inner)
1119 .await
1120 };
1121
1122 match rec {
1123 Ok(_) => {}
1124 Err(err) => {
1125 tracing::error!("SQLite Could not update melt quote");
1126 return Err(err);
1127 }
1128 };
1129
1130 let old_state = quote.state;
1131 quote.state = state;
1132
1133 if state == MeltQuoteState::Unpaid || state == MeltQuoteState::Failed {
1134 self.delete_melt_request(quote_id).await?;
1135 }
1136
1137 Ok((old_state, quote))
1138 }
1139
1140 async fn get_mint_quote(&mut self, quote_id: &QuoteId) -> Result<Option<MintQuote>, Self::Err> {
1141 let payments = get_mint_quote_payments(&self.inner, quote_id).await?;
1142 let issuance = get_mint_quote_issuance(&self.inner, quote_id).await?;
1143
1144 Ok(query(
1145 r#"
1146 SELECT
1147 id,
1148 amount,
1149 unit,
1150 request,
1151 expiry,
1152 request_lookup_id,
1153 pubkey,
1154 created_time,
1155 amount_paid,
1156 amount_issued,
1157 payment_method,
1158 request_lookup_id_kind
1159 FROM
1160 mint_quote
1161 WHERE id = :id
1162 FOR UPDATE
1163 "#,
1164 )?
1165 .bind("id", quote_id.to_string())
1166 .fetch_one(&self.inner)
1167 .await?
1168 .map(|row| sql_row_to_mint_quote(row, payments, issuance))
1169 .transpose()?)
1170 }
1171
1172 async fn get_melt_quote(
1173 &mut self,
1174 quote_id: &QuoteId,
1175 ) -> Result<Option<mint::MeltQuote>, Self::Err> {
1176 Ok(query(
1177 r#"
1178 SELECT
1179 id,
1180 unit,
1181 amount,
1182 request,
1183 fee_reserve,
1184 expiry,
1185 state,
1186 payment_preimage,
1187 request_lookup_id,
1188 created_time,
1189 paid_time,
1190 payment_method,
1191 options,
1192 request_lookup_id
1193 FROM
1194 melt_quote
1195 WHERE
1196 id=:id
1197 "#,
1198 )?
1199 .bind("id", quote_id.to_string())
1200 .fetch_one(&self.inner)
1201 .await?
1202 .map(sql_row_to_melt_quote)
1203 .transpose()?)
1204 }
1205
1206 async fn get_mint_quote_by_request(
1207 &mut self,
1208 request: &str,
1209 ) -> Result<Option<MintQuote>, Self::Err> {
1210 let mut mint_quote = query(
1211 r#"
1212 SELECT
1213 id,
1214 amount,
1215 unit,
1216 request,
1217 expiry,
1218 request_lookup_id,
1219 pubkey,
1220 created_time,
1221 amount_paid,
1222 amount_issued,
1223 payment_method,
1224 request_lookup_id_kind
1225 FROM
1226 mint_quote
1227 WHERE request = :request
1228 FOR UPDATE
1229 "#,
1230 )?
1231 .bind("request", request.to_string())
1232 .fetch_one(&self.inner)
1233 .await?
1234 .map(|row| sql_row_to_mint_quote(row, vec![], vec![]))
1235 .transpose()?;
1236
1237 if let Some(quote) = mint_quote.as_mut() {
1238 let payments = get_mint_quote_payments(&self.inner, "e.id).await?;
1239 let issuance = get_mint_quote_issuance(&self.inner, "e.id).await?;
1240 quote.issuance = issuance;
1241 quote.payments = payments;
1242 }
1243
1244 Ok(mint_quote)
1245 }
1246
1247 async fn get_mint_quote_by_request_lookup_id(
1248 &mut self,
1249 request_lookup_id: &PaymentIdentifier,
1250 ) -> Result<Option<MintQuote>, Self::Err> {
1251 let mut mint_quote = query(
1252 r#"
1253 SELECT
1254 id,
1255 amount,
1256 unit,
1257 request,
1258 expiry,
1259 request_lookup_id,
1260 pubkey,
1261 created_time,
1262 amount_paid,
1263 amount_issued,
1264 payment_method,
1265 request_lookup_id_kind
1266 FROM
1267 mint_quote
1268 WHERE request_lookup_id = :request_lookup_id
1269 AND request_lookup_id_kind = :request_lookup_id_kind
1270 FOR UPDATE
1271 "#,
1272 )?
1273 .bind("request_lookup_id", request_lookup_id.to_string())
1274 .bind("request_lookup_id_kind", request_lookup_id.kind())
1275 .fetch_one(&self.inner)
1276 .await?
1277 .map(|row| sql_row_to_mint_quote(row, vec![], vec![]))
1278 .transpose()?;
1279
1280 if let Some(quote) = mint_quote.as_mut() {
1281 let payments = get_mint_quote_payments(&self.inner, "e.id).await?;
1282 let issuance = get_mint_quote_issuance(&self.inner, "e.id).await?;
1283 quote.issuance = issuance;
1284 quote.payments = payments;
1285 }
1286
1287 Ok(mint_quote)
1288 }
1289}
1290
1291#[async_trait]
1292impl<RM> MintQuotesDatabase for SQLMintDatabase<RM>
1293where
1294 RM: DatabasePool + 'static,
1295{
1296 type Err = Error;
1297
1298 async fn get_mint_quote(&self, quote_id: &QuoteId) -> Result<Option<MintQuote>, Self::Err> {
1299 #[cfg(feature = "prometheus")]
1300 METRICS.inc_in_flight_requests("get_mint_quote");
1301
1302 #[cfg(feature = "prometheus")]
1303 let start_time = std::time::Instant::now();
1304 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1305
1306 let result = async {
1307 let payments = get_mint_quote_payments(&*conn, quote_id).await?;
1308 let issuance = get_mint_quote_issuance(&*conn, quote_id).await?;
1309
1310 query(
1311 r#"
1312 SELECT
1313 id,
1314 amount,
1315 unit,
1316 request,
1317 expiry,
1318 request_lookup_id,
1319 pubkey,
1320 created_time,
1321 amount_paid,
1322 amount_issued,
1323 payment_method,
1324 request_lookup_id_kind
1325 FROM
1326 mint_quote
1327 WHERE id = :id"#,
1328 )?
1329 .bind("id", quote_id.to_string())
1330 .fetch_one(&*conn)
1331 .await?
1332 .map(|row| sql_row_to_mint_quote(row, payments, issuance))
1333 .transpose()
1334 }
1335 .await;
1336
1337 #[cfg(feature = "prometheus")]
1338 {
1339 let success = result.is_ok();
1340
1341 METRICS.record_mint_operation("get_mint_quote", success);
1342 METRICS.record_mint_operation_histogram(
1343 "get_mint_quote",
1344 success,
1345 start_time.elapsed().as_secs_f64(),
1346 );
1347 METRICS.dec_in_flight_requests("get_mint_quote");
1348 }
1349
1350 result
1351 }
1352
1353 async fn get_mint_quote_by_request(
1354 &self,
1355 request: &str,
1356 ) -> Result<Option<MintQuote>, Self::Err> {
1357 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1358 let mut mint_quote = query(
1359 r#"
1360 SELECT
1361 id,
1362 amount,
1363 unit,
1364 request,
1365 expiry,
1366 request_lookup_id,
1367 pubkey,
1368 created_time,
1369 amount_paid,
1370 amount_issued,
1371 payment_method,
1372 request_lookup_id_kind
1373 FROM
1374 mint_quote
1375 WHERE request = :request"#,
1376 )?
1377 .bind("request", request.to_owned())
1378 .fetch_one(&*conn)
1379 .await?
1380 .map(|row| sql_row_to_mint_quote(row, vec![], vec![]))
1381 .transpose()?;
1382
1383 if let Some(quote) = mint_quote.as_mut() {
1384 let payments = get_mint_quote_payments(&*conn, "e.id).await?;
1385 let issuance = get_mint_quote_issuance(&*conn, "e.id).await?;
1386 quote.issuance = issuance;
1387 quote.payments = payments;
1388 }
1389
1390 Ok(mint_quote)
1391 }
1392
1393 async fn get_mint_quote_by_request_lookup_id(
1394 &self,
1395 request_lookup_id: &PaymentIdentifier,
1396 ) -> Result<Option<MintQuote>, Self::Err> {
1397 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1398 let mut mint_quote = query(
1399 r#"
1400 SELECT
1401 id,
1402 amount,
1403 unit,
1404 request,
1405 expiry,
1406 request_lookup_id,
1407 pubkey,
1408 created_time,
1409 amount_paid,
1410 amount_issued,
1411 payment_method,
1412 request_lookup_id_kind
1413 FROM
1414 mint_quote
1415 WHERE request_lookup_id = :request_lookup_id
1416 AND request_lookup_id_kind = :request_lookup_id_kind
1417 "#,
1418 )?
1419 .bind("request_lookup_id", request_lookup_id.to_string())
1420 .bind("request_lookup_id_kind", request_lookup_id.kind())
1421 .fetch_one(&*conn)
1422 .await?
1423 .map(|row| sql_row_to_mint_quote(row, vec![], vec![]))
1424 .transpose()?;
1425
1426 if let Some(quote) = mint_quote.as_mut() {
1428 let payments = get_mint_quote_payments(&*conn, "e.id).await?;
1429 let issuance = get_mint_quote_issuance(&*conn, "e.id).await?;
1430 quote.issuance = issuance;
1431 quote.payments = payments;
1432 }
1433
1434 Ok(mint_quote)
1435 }
1436
1437 async fn get_mint_quotes(&self) -> Result<Vec<MintQuote>, Self::Err> {
1438 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1439 let mut mint_quotes = query(
1440 r#"
1441 SELECT
1442 id,
1443 amount,
1444 unit,
1445 request,
1446 expiry,
1447 request_lookup_id,
1448 pubkey,
1449 created_time,
1450 amount_paid,
1451 amount_issued,
1452 payment_method,
1453 request_lookup_id_kind
1454 FROM
1455 mint_quote
1456 "#,
1457 )?
1458 .fetch_all(&*conn)
1459 .await?
1460 .into_iter()
1461 .map(|row| sql_row_to_mint_quote(row, vec![], vec![]))
1462 .collect::<Result<Vec<_>, _>>()?;
1463
1464 for quote in mint_quotes.as_mut_slice() {
1465 let payments = get_mint_quote_payments(&*conn, "e.id).await?;
1466 let issuance = get_mint_quote_issuance(&*conn, "e.id).await?;
1467 quote.issuance = issuance;
1468 quote.payments = payments;
1469 }
1470
1471 Ok(mint_quotes)
1472 }
1473
1474 async fn get_melt_quote(
1475 &self,
1476 quote_id: &QuoteId,
1477 ) -> Result<Option<mint::MeltQuote>, Self::Err> {
1478 #[cfg(feature = "prometheus")]
1479 METRICS.inc_in_flight_requests("get_melt_quote");
1480
1481 #[cfg(feature = "prometheus")]
1482 let start_time = std::time::Instant::now();
1483 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1484
1485 let result = async {
1486 query(
1487 r#"
1488 SELECT
1489 id,
1490 unit,
1491 amount,
1492 request,
1493 fee_reserve,
1494 expiry,
1495 state,
1496 payment_preimage,
1497 request_lookup_id,
1498 created_time,
1499 paid_time,
1500 payment_method,
1501 options,
1502 request_lookup_id_kind
1503 FROM
1504 melt_quote
1505 WHERE
1506 id=:id
1507 "#,
1508 )?
1509 .bind("id", quote_id.to_string())
1510 .fetch_one(&*conn)
1511 .await?
1512 .map(sql_row_to_melt_quote)
1513 .transpose()
1514 }
1515 .await;
1516
1517 #[cfg(feature = "prometheus")]
1518 {
1519 let success = result.is_ok();
1520
1521 METRICS.record_mint_operation("get_melt_quote", success);
1522 METRICS.record_mint_operation_histogram(
1523 "get_melt_quote",
1524 success,
1525 start_time.elapsed().as_secs_f64(),
1526 );
1527 METRICS.dec_in_flight_requests("get_melt_quote");
1528 }
1529
1530 result
1531 }
1532
1533 async fn get_melt_quotes(&self) -> Result<Vec<mint::MeltQuote>, Self::Err> {
1534 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1535 Ok(query(
1536 r#"
1537 SELECT
1538 id,
1539 unit,
1540 amount,
1541 request,
1542 fee_reserve,
1543 expiry,
1544 state,
1545 payment_preimage,
1546 request_lookup_id,
1547 created_time,
1548 paid_time,
1549 payment_method,
1550 options,
1551 request_lookup_id_kind
1552 FROM
1553 melt_quote
1554 "#,
1555 )?
1556 .fetch_all(&*conn)
1557 .await?
1558 .into_iter()
1559 .map(sql_row_to_melt_quote)
1560 .collect::<Result<Vec<_>, _>>()?)
1561 }
1562}
1563
1564#[async_trait]
1565impl<RM> MintProofsDatabase for SQLMintDatabase<RM>
1566where
1567 RM: DatabasePool + 'static,
1568{
1569 type Err = Error;
1570
1571 async fn get_proofs_by_ys(&self, ys: &[PublicKey]) -> Result<Vec<Option<Proof>>, Self::Err> {
1572 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1573 let mut proofs = query(
1574 r#"
1575 SELECT
1576 amount,
1577 keyset_id,
1578 secret,
1579 c,
1580 witness,
1581 y
1582 FROM
1583 proof
1584 WHERE
1585 y IN (:ys)
1586 "#,
1587 )?
1588 .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
1589 .fetch_all(&*conn)
1590 .await?
1591 .into_iter()
1592 .map(|mut row| {
1593 Ok((
1594 column_as_string!(
1595 row.pop().ok_or(Error::InvalidDbResponse)?,
1596 PublicKey::from_hex,
1597 PublicKey::from_slice
1598 ),
1599 sql_row_to_proof(row)?,
1600 ))
1601 })
1602 .collect::<Result<HashMap<_, _>, Error>>()?;
1603
1604 Ok(ys.iter().map(|y| proofs.remove(y)).collect())
1605 }
1606
1607 async fn get_proof_ys_by_quote_id(
1608 &self,
1609 quote_id: &QuoteId,
1610 ) -> Result<Vec<PublicKey>, Self::Err> {
1611 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1612 Ok(query(
1613 r#"
1614 SELECT
1615 amount,
1616 keyset_id,
1617 secret,
1618 c,
1619 witness
1620 FROM
1621 proof
1622 WHERE
1623 quote_id = :quote_id
1624 "#,
1625 )?
1626 .bind("quote_id", quote_id.to_string())
1627 .fetch_all(&*conn)
1628 .await?
1629 .into_iter()
1630 .map(sql_row_to_proof)
1631 .collect::<Result<Vec<Proof>, _>>()?
1632 .ys()?)
1633 }
1634
1635 async fn get_proofs_states(&self, ys: &[PublicKey]) -> Result<Vec<Option<State>>, Self::Err> {
1636 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1637 let mut current_states = get_current_states(&*conn, ys).await?;
1638
1639 Ok(ys.iter().map(|y| current_states.remove(y)).collect())
1640 }
1641
1642 async fn get_proofs_by_keyset_id(
1643 &self,
1644 keyset_id: &Id,
1645 ) -> Result<(Proofs, Vec<Option<State>>), Self::Err> {
1646 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1647 Ok(query(
1648 r#"
1649 SELECT
1650 keyset_id,
1651 amount,
1652 secret,
1653 c,
1654 witness,
1655 state
1656 FROM
1657 proof
1658 WHERE
1659 keyset_id=:keyset_id
1660 "#,
1661 )?
1662 .bind("keyset_id", keyset_id.to_string())
1663 .fetch_all(&*conn)
1664 .await?
1665 .into_iter()
1666 .map(sql_row_to_proof_with_state)
1667 .collect::<Result<Vec<_>, _>>()?
1668 .into_iter()
1669 .unzip())
1670 }
1671
1672 async fn get_total_redeemed(&self) -> Result<HashMap<Id, Amount>, Self::Err> {
1674 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1675 query(
1676 r#"
1677 SELECT
1678 keyset_id,
1679 total_redeemed as amount
1680 FROM
1681 keyset_amounts
1682 "#,
1683 )?
1684 .fetch_all(&*conn)
1685 .await?
1686 .into_iter()
1687 .map(sql_row_to_hashmap_amount)
1688 .collect()
1689 }
1690}
1691
1692#[async_trait]
1693impl<RM> MintSignatureTransaction<'_> for SQLTransaction<RM>
1694where
1695 RM: DatabasePool + 'static,
1696{
1697 type Err = Error;
1698
    /// Stores blind signatures for the given blinded messages within this
    /// transaction.
    ///
    /// `blinded_messages[i]` is paired with `blind_signatures[i]`. For each
    /// pair:
    /// - no existing `blind_signature` row: a new row is inserted;
    /// - existing row whose `c` column is NULL: the row is updated with the
    ///   signature, DLEQ values, amount and `signed_time`;
    /// - existing row with a non-NULL `c`: the call fails with
    ///   `Error::Duplicate`.
    ///
    /// Whenever a signature is written, `keyset_amounts.total_issued` for the
    /// signature's keyset is increased by the signature amount.
    ///
    /// # Errors
    ///
    /// Returns `Error::Internal` when the two slices differ in length or when
    /// fetched rows remain unmatched at the end, `Error::Duplicate` when a
    /// message already has a signature, and any error from the underlying
    /// queries.
    async fn add_blind_signatures(
        &mut self,
        blinded_messages: &[PublicKey],
        blind_signatures: &[BlindSignature],
        quote_id: Option<QuoteId>,
    ) -> Result<(), Self::Err> {
        let current_time = unix_time();

        if blinded_messages.len() != blind_signatures.len() {
            return Err(database::Error::Internal(
                "Mismatched array lengths for blinded messages and blind signatures".to_string(),
            ));
        }

        // Fetch (and lock, via FOR UPDATE) the rows that already exist for the
        // given blinded messages, keyed by blinded message.
        let mut existing_rows = query(
            r#"
            SELECT blinded_message, c, dleq_e, dleq_s
            FROM blind_signature
            WHERE blinded_message IN (:blinded_messages)
            FOR UPDATE
            "#,
        )?
        .bind_vec(
            "blinded_messages",
            blinded_messages
                .iter()
                .map(|message| message.to_bytes().to_vec())
                .collect(),
        )
        .fetch_all(&self.inner)
        .await?
        .into_iter()
        .map(|mut row| {
            Ok((
                // After removing column 0 (blinded_message), indices 0..=2 of
                // `row` are `c`, `dleq_e` and `dleq_s`.
                column_as_string!(&row.remove(0), PublicKey::from_hex, PublicKey::from_slice),
                (row[0].clone(), row[1].clone(), row[2].clone()),
            ))
        })
        .collect::<Result<HashMap<_, _>, Error>>()?;

        for (message, signature) in blinded_messages.iter().zip(blind_signatures) {
            match existing_rows.remove(message) {
                None => {
                    // No row for this blinded message yet: insert it complete
                    // with its signature.
                    query(
                        r#"
                        INSERT INTO blind_signature
                        (blinded_message, amount, keyset_id, c, quote_id, dleq_e, dleq_s, created_time, signed_time)
                        VALUES
                        (:blinded_message, :amount, :keyset_id, :c, :quote_id, :dleq_e, :dleq_s, :created_time, :signed_time)
                        "#,
                    )?
                    .bind("blinded_message", message.to_bytes().to_vec())
                    .bind("amount", u64::from(signature.amount) as i64)
                    .bind("keyset_id", signature.keyset_id.to_string())
                    .bind("c", signature.c.to_bytes().to_vec())
                    .bind("quote_id", quote_id.as_ref().map(|q| q.to_string()))
                    .bind(
                        "dleq_e",
                        signature.dleq.as_ref().map(|dleq| dleq.e.to_secret_hex()),
                    )
                    .bind(
                        "dleq_s",
                        signature.dleq.as_ref().map(|dleq| dleq.s.to_secret_hex()),
                    )
                    .bind("created_time", current_time as i64)
                    .bind("signed_time", current_time as i64)
                    .execute(&self.inner)
                    .await?;

                    // Add the amount to the keyset's running issued total,
                    // creating the counter row if it does not exist yet.
                    query(
                        r#"
                        INSERT INTO keyset_amounts (keyset_id, total_issued, total_redeemed)
                        VALUES (:keyset_id, :amount, 0)
                        ON CONFLICT (keyset_id)
                        DO UPDATE SET total_issued = keyset_amounts.total_issued + EXCLUDED.total_issued
                        "#,
                    )?
                    .bind("amount", u64::from(signature.amount) as i64)
                    .bind("keyset_id", signature.keyset_id.to_string())
                    .execute(&self.inner)
                    .await?;
                }
                Some((c, _dleq_e, _dleq_s)) => {
                    match c {
                        Column::Null => {
                            // Row exists but carries no signature yet (`c` is
                            // NULL): fill the signature in.
                            // NOTE(review): presumably the row was created
                            // when the blinded message was first registered —
                            // confirm against the code that inserts it.
                            query(
                                r#"
                                UPDATE blind_signature
                                SET c = :c, dleq_e = :dleq_e, dleq_s = :dleq_s, signed_time = :signed_time, amount = :amount
                                WHERE blinded_message = :blinded_message
                                "#,
                            )?
                            .bind("c", signature.c.to_bytes().to_vec())
                            .bind(
                                "dleq_e",
                                signature.dleq.as_ref().map(|dleq| dleq.e.to_secret_hex()),
                            )
                            .bind(
                                "dleq_s",
                                signature.dleq.as_ref().map(|dleq| dleq.s.to_secret_hex()),
                            )
                            .bind("blinded_message", message.to_bytes().to_vec())
                            .bind("signed_time", current_time as i64)
                            .bind("amount", u64::from(signature.amount) as i64)
                            .execute(&self.inner)
                            .await?;

                            // Same issued-total bookkeeping as for a freshly
                            // inserted signature.
                            query(
                                r#"
                                INSERT INTO keyset_amounts (keyset_id, total_issued, total_redeemed)
                                VALUES (:keyset_id, :amount, 0)
                                ON CONFLICT (keyset_id)
                                DO UPDATE SET total_issued = keyset_amounts.total_issued + EXCLUDED.total_issued
                                "#,
                            )?
                            .bind("amount", u64::from(signature.amount) as i64)
                            .bind("keyset_id", signature.keyset_id.to_string())
                            .execute(&self.inner)
                            .await?;
                        }
                        _ => {
                            // `c` is already set: refuse to overwrite an
                            // existing signature.
                            tracing::error!(
                                "Attempting to add signature to message already signed {}",
                                message
                            );

                            return Err(database::Error::Duplicate);
                        }
                    }
                }
            }
        }

        // Every fetched row should have been consumed by the loop above; this
        // panics in debug builds, while the check below returns an error in
        // all builds.
        debug_assert!(
            existing_rows.is_empty(),
            "Unexpected existing rows remain: {:?}",
            existing_rows.keys().collect::<Vec<_>>()
        );

        if !existing_rows.is_empty() {
            tracing::error!("Did not check all existing rows");
            return Err(Error::Internal(
                "Did not check all existing rows".to_string(),
            ));
        }

        Ok(())
    }
1853
1854 async fn get_blind_signatures(
1855 &mut self,
1856 blinded_messages: &[PublicKey],
1857 ) -> Result<Vec<Option<BlindSignature>>, Self::Err> {
1858 let mut blinded_signatures = query(
1859 r#"SELECT
1860 keyset_id,
1861 amount,
1862 c,
1863 dleq_e,
1864 dleq_s,
1865 blinded_message
1866 FROM
1867 blind_signature
1868 WHERE blinded_message IN (:b) AND c IS NOT NULL
1869 "#,
1870 )?
1871 .bind_vec(
1872 "b",
1873 blinded_messages
1874 .iter()
1875 .map(|b| b.to_bytes().to_vec())
1876 .collect(),
1877 )
1878 .fetch_all(&self.inner)
1879 .await?
1880 .into_iter()
1881 .map(|mut row| {
1882 Ok((
1883 column_as_string!(
1884 &row.pop().ok_or(Error::InvalidDbResponse)?,
1885 PublicKey::from_hex,
1886 PublicKey::from_slice
1887 ),
1888 sql_row_to_blind_signature(row)?,
1889 ))
1890 })
1891 .collect::<Result<HashMap<_, _>, Error>>()?;
1892 Ok(blinded_messages
1893 .iter()
1894 .map(|y| blinded_signatures.remove(y))
1895 .collect())
1896 }
1897}
1898
1899#[async_trait]
1900impl<RM> MintSignaturesDatabase for SQLMintDatabase<RM>
1901where
1902 RM: DatabasePool + 'static,
1903{
1904 type Err = Error;
1905
1906 async fn get_blind_signatures(
1907 &self,
1908 blinded_messages: &[PublicKey],
1909 ) -> Result<Vec<Option<BlindSignature>>, Self::Err> {
1910 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1911 let mut blinded_signatures = query(
1912 r#"SELECT
1913 keyset_id,
1914 amount,
1915 c,
1916 dleq_e,
1917 dleq_s,
1918 blinded_message
1919 FROM
1920 blind_signature
1921 WHERE blinded_message IN (:b) AND c IS NOT NULL
1922 "#,
1923 )?
1924 .bind_vec(
1925 "b",
1926 blinded_messages
1927 .iter()
1928 .map(|b_| b_.to_bytes().to_vec())
1929 .collect(),
1930 )
1931 .fetch_all(&*conn)
1932 .await?
1933 .into_iter()
1934 .map(|mut row| {
1935 Ok((
1936 column_as_string!(
1937 &row.pop().ok_or(Error::InvalidDbResponse)?,
1938 PublicKey::from_hex,
1939 PublicKey::from_slice
1940 ),
1941 sql_row_to_blind_signature(row)?,
1942 ))
1943 })
1944 .collect::<Result<HashMap<_, _>, Error>>()?;
1945 Ok(blinded_messages
1946 .iter()
1947 .map(|y| blinded_signatures.remove(y))
1948 .collect())
1949 }
1950
1951 async fn get_blind_signatures_for_keyset(
1952 &self,
1953 keyset_id: &Id,
1954 ) -> Result<Vec<BlindSignature>, Self::Err> {
1955 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1956 Ok(query(
1957 r#"
1958 SELECT
1959 keyset_id,
1960 amount,
1961 c,
1962 dleq_e,
1963 dleq_s
1964 FROM
1965 blind_signature
1966 WHERE
1967 keyset_id=:keyset_id AND c IS NOT NULL
1968 "#,
1969 )?
1970 .bind("keyset_id", keyset_id.to_string())
1971 .fetch_all(&*conn)
1972 .await?
1973 .into_iter()
1974 .map(sql_row_to_blind_signature)
1975 .collect::<Result<Vec<BlindSignature>, _>>()?)
1976 }
1977
1978 async fn get_blind_signatures_for_quote(
1980 &self,
1981 quote_id: &QuoteId,
1982 ) -> Result<Vec<BlindSignature>, Self::Err> {
1983 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
1984 Ok(query(
1985 r#"
1986 SELECT
1987 keyset_id,
1988 amount,
1989 c,
1990 dleq_e,
1991 dleq_s
1992 FROM
1993 blind_signature
1994 WHERE
1995 quote_id=:quote_id AND c IS NOT NULL
1996 "#,
1997 )?
1998 .bind("quote_id", quote_id.to_string())
1999 .fetch_all(&*conn)
2000 .await?
2001 .into_iter()
2002 .map(sql_row_to_blind_signature)
2003 .collect::<Result<Vec<BlindSignature>, _>>()?)
2004 }
2005
2006 async fn get_total_issued(&self) -> Result<HashMap<Id, Amount>, Self::Err> {
2008 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
2009 query(
2010 r#"
2011 SELECT
2012 keyset_id,
2013 total_issued as amount
2014 FROM
2015 keyset_amounts
2016 "#,
2017 )?
2018 .fetch_all(&*conn)
2019 .await?
2020 .into_iter()
2021 .map(sql_row_to_hashmap_amount)
2022 .collect()
2023 }
2024}
2025
2026#[async_trait]
2027impl<RM> database::MintKVStoreTransaction<'_, Error> for SQLTransaction<RM>
2028where
2029 RM: DatabasePool + 'static,
2030{
2031 async fn kv_read(
2032 &mut self,
2033 primary_namespace: &str,
2034 secondary_namespace: &str,
2035 key: &str,
2036 ) -> Result<Option<Vec<u8>>, Error> {
2037 validate_kvstore_params(primary_namespace, secondary_namespace, key)?;
2039 Ok(query(
2040 r#"
2041 SELECT value
2042 FROM kv_store
2043 WHERE primary_namespace = :primary_namespace
2044 AND secondary_namespace = :secondary_namespace
2045 AND key = :key
2046 "#,
2047 )?
2048 .bind("primary_namespace", primary_namespace.to_owned())
2049 .bind("secondary_namespace", secondary_namespace.to_owned())
2050 .bind("key", key.to_owned())
2051 .pluck(&self.inner)
2052 .await?
2053 .and_then(|col| match col {
2054 Column::Blob(data) => Some(data),
2055 _ => None,
2056 }))
2057 }
2058
2059 async fn kv_write(
2060 &mut self,
2061 primary_namespace: &str,
2062 secondary_namespace: &str,
2063 key: &str,
2064 value: &[u8],
2065 ) -> Result<(), Error> {
2066 validate_kvstore_params(primary_namespace, secondary_namespace, key)?;
2068
2069 let current_time = unix_time();
2070
2071 query(
2072 r#"
2073 INSERT INTO kv_store
2074 (primary_namespace, secondary_namespace, key, value, created_time, updated_time)
2075 VALUES (:primary_namespace, :secondary_namespace, :key, :value, :created_time, :updated_time)
2076 ON CONFLICT(primary_namespace, secondary_namespace, key)
2077 DO UPDATE SET
2078 value = excluded.value,
2079 updated_time = excluded.updated_time
2080 "#,
2081 )?
2082 .bind("primary_namespace", primary_namespace.to_owned())
2083 .bind("secondary_namespace", secondary_namespace.to_owned())
2084 .bind("key", key.to_owned())
2085 .bind("value", value.to_vec())
2086 .bind("created_time", current_time as i64)
2087 .bind("updated_time", current_time as i64)
2088 .execute(&self.inner)
2089 .await?;
2090
2091 Ok(())
2092 }
2093
2094 async fn kv_remove(
2095 &mut self,
2096 primary_namespace: &str,
2097 secondary_namespace: &str,
2098 key: &str,
2099 ) -> Result<(), Error> {
2100 validate_kvstore_params(primary_namespace, secondary_namespace, key)?;
2102 query(
2103 r#"
2104 DELETE FROM kv_store
2105 WHERE primary_namespace = :primary_namespace
2106 AND secondary_namespace = :secondary_namespace
2107 AND key = :key
2108 "#,
2109 )?
2110 .bind("primary_namespace", primary_namespace.to_owned())
2111 .bind("secondary_namespace", secondary_namespace.to_owned())
2112 .bind("key", key.to_owned())
2113 .execute(&self.inner)
2114 .await?;
2115
2116 Ok(())
2117 }
2118
2119 async fn kv_list(
2120 &mut self,
2121 primary_namespace: &str,
2122 secondary_namespace: &str,
2123 ) -> Result<Vec<String>, Error> {
2124 cdk_common::database::mint::validate_kvstore_string(primary_namespace)?;
2126 cdk_common::database::mint::validate_kvstore_string(secondary_namespace)?;
2127
2128 if primary_namespace.is_empty() && !secondary_namespace.is_empty() {
2130 return Err(Error::KVStoreInvalidKey(
2131 "If primary_namespace is empty, secondary_namespace must also be empty".to_string(),
2132 ));
2133 }
2134 Ok(query(
2135 r#"
2136 SELECT key
2137 FROM kv_store
2138 WHERE primary_namespace = :primary_namespace
2139 AND secondary_namespace = :secondary_namespace
2140 ORDER BY key
2141 "#,
2142 )?
2143 .bind("primary_namespace", primary_namespace.to_owned())
2144 .bind("secondary_namespace", secondary_namespace.to_owned())
2145 .fetch_all(&self.inner)
2146 .await?
2147 .into_iter()
2148 .map(|row| Ok(column_as_string!(&row[0])))
2149 .collect::<Result<Vec<_>, Error>>()?)
2150 }
2151}
2152
2153#[async_trait]
2154impl<RM> database::MintKVStoreDatabase for SQLMintDatabase<RM>
2155where
2156 RM: DatabasePool + 'static,
2157{
2158 type Err = Error;
2159
2160 async fn kv_read(
2161 &self,
2162 primary_namespace: &str,
2163 secondary_namespace: &str,
2164 key: &str,
2165 ) -> Result<Option<Vec<u8>>, Error> {
2166 validate_kvstore_params(primary_namespace, secondary_namespace, key)?;
2168
2169 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
2170 Ok(query(
2171 r#"
2172 SELECT value
2173 FROM kv_store
2174 WHERE primary_namespace = :primary_namespace
2175 AND secondary_namespace = :secondary_namespace
2176 AND key = :key
2177 "#,
2178 )?
2179 .bind("primary_namespace", primary_namespace.to_owned())
2180 .bind("secondary_namespace", secondary_namespace.to_owned())
2181 .bind("key", key.to_owned())
2182 .pluck(&*conn)
2183 .await?
2184 .and_then(|col| match col {
2185 Column::Blob(data) => Some(data),
2186 _ => None,
2187 }))
2188 }
2189
2190 async fn kv_list(
2191 &self,
2192 primary_namespace: &str,
2193 secondary_namespace: &str,
2194 ) -> Result<Vec<String>, Error> {
2195 cdk_common::database::mint::validate_kvstore_string(primary_namespace)?;
2197 cdk_common::database::mint::validate_kvstore_string(secondary_namespace)?;
2198
2199 if primary_namespace.is_empty() && !secondary_namespace.is_empty() {
2201 return Err(Error::KVStoreInvalidKey(
2202 "If primary_namespace is empty, secondary_namespace must also be empty".to_string(),
2203 ));
2204 }
2205
2206 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
2207 Ok(query(
2208 r#"
2209 SELECT key
2210 FROM kv_store
2211 WHERE primary_namespace = :primary_namespace
2212 AND secondary_namespace = :secondary_namespace
2213 ORDER BY key
2214 "#,
2215 )?
2216 .bind("primary_namespace", primary_namespace.to_owned())
2217 .bind("secondary_namespace", secondary_namespace.to_owned())
2218 .fetch_all(&*conn)
2219 .await?
2220 .into_iter()
2221 .map(|row| Ok(column_as_string!(&row[0])))
2222 .collect::<Result<Vec<_>, Error>>()?)
2223 }
2224}
2225
2226#[async_trait]
2227impl<RM> database::MintKVStore for SQLMintDatabase<RM>
2228where
2229 RM: DatabasePool + 'static,
2230{
2231 async fn begin_transaction<'a>(
2232 &'a self,
2233 ) -> Result<Box<dyn database::MintKVStoreTransaction<'a, Self::Err> + Send + Sync + 'a>, Error>
2234 {
2235 Ok(Box::new(SQLTransaction {
2236 inner: ConnectionWithTransaction::new(
2237 self.pool.get().map_err(|e| Error::Database(Box::new(e)))?,
2238 )
2239 .await?,
2240 }))
2241 }
2242}
2243
2244#[async_trait]
2245impl<RM> SagaTransaction<'_> for SQLTransaction<RM>
2246where
2247 RM: DatabasePool + 'static,
2248{
2249 type Err = Error;
2250
2251 async fn get_saga(
2252 &mut self,
2253 operation_id: &uuid::Uuid,
2254 ) -> Result<Option<mint::Saga>, Self::Err> {
2255 Ok(query(
2256 r#"
2257 SELECT
2258 operation_id,
2259 operation_kind,
2260 state,
2261 blinded_secrets,
2262 input_ys,
2263 quote_id,
2264 created_at,
2265 updated_at
2266 FROM
2267 saga_state
2268 WHERE
2269 operation_id = :operation_id
2270 FOR UPDATE
2271 "#,
2272 )?
2273 .bind("operation_id", operation_id.to_string())
2274 .fetch_one(&self.inner)
2275 .await?
2276 .map(sql_row_to_saga)
2277 .transpose()?)
2278 }
2279
2280 async fn add_saga(&mut self, saga: &mint::Saga) -> Result<(), Self::Err> {
2281 let current_time = unix_time();
2282
2283 let blinded_secrets_json = serde_json::to_string(&saga.blinded_secrets)
2284 .map_err(|e| Error::Internal(format!("Failed to serialize blinded_secrets: {e}")))?;
2285
2286 let input_ys_json = serde_json::to_string(&saga.input_ys)
2287 .map_err(|e| Error::Internal(format!("Failed to serialize input_ys: {e}")))?;
2288
2289 query(
2290 r#"
2291 INSERT INTO saga_state
2292 (operation_id, operation_kind, state, blinded_secrets, input_ys, quote_id, created_at, updated_at)
2293 VALUES
2294 (:operation_id, :operation_kind, :state, :blinded_secrets, :input_ys, :quote_id, :created_at, :updated_at)
2295 "#,
2296 )?
2297 .bind("operation_id", saga.operation_id.to_string())
2298 .bind("operation_kind", saga.operation_kind.to_string())
2299 .bind("state", saga.state.state())
2300 .bind("blinded_secrets", blinded_secrets_json)
2301 .bind("input_ys", input_ys_json)
2302 .bind("quote_id", saga.quote_id.as_deref())
2303 .bind("created_at", saga.created_at as i64)
2304 .bind("updated_at", current_time as i64)
2305 .execute(&self.inner)
2306 .await?;
2307
2308 Ok(())
2309 }
2310
2311 async fn update_saga(
2312 &mut self,
2313 operation_id: &uuid::Uuid,
2314 new_state: mint::SagaStateEnum,
2315 ) -> Result<(), Self::Err> {
2316 let current_time = unix_time();
2317
2318 query(
2319 r#"
2320 UPDATE saga_state
2321 SET state = :state, updated_at = :updated_at
2322 WHERE operation_id = :operation_id
2323 "#,
2324 )?
2325 .bind("state", new_state.state())
2326 .bind("updated_at", current_time as i64)
2327 .bind("operation_id", operation_id.to_string())
2328 .execute(&self.inner)
2329 .await?;
2330
2331 Ok(())
2332 }
2333
2334 async fn delete_saga(&mut self, operation_id: &uuid::Uuid) -> Result<(), Self::Err> {
2335 query(
2336 r#"
2337 DELETE FROM saga_state
2338 WHERE operation_id = :operation_id
2339 "#,
2340 )?
2341 .bind("operation_id", operation_id.to_string())
2342 .execute(&self.inner)
2343 .await?;
2344
2345 Ok(())
2346 }
2347}
2348
2349#[async_trait]
2350impl<RM> SagaDatabase for SQLMintDatabase<RM>
2351where
2352 RM: DatabasePool + 'static,
2353{
2354 type Err = Error;
2355
2356 async fn get_incomplete_sagas(
2357 &self,
2358 operation_kind: mint::OperationKind,
2359 ) -> Result<Vec<mint::Saga>, Self::Err> {
2360 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
2361 Ok(query(
2362 r#"
2363 SELECT
2364 operation_id,
2365 operation_kind,
2366 state,
2367 blinded_secrets,
2368 input_ys,
2369 quote_id,
2370 created_at,
2371 updated_at
2372 FROM
2373 saga_state
2374 WHERE
2375 operation_kind = :operation_kind
2376 ORDER BY created_at ASC
2377 "#,
2378 )?
2379 .bind("operation_kind", operation_kind.to_string())
2380 .fetch_all(&*conn)
2381 .await?
2382 .into_iter()
2383 .map(sql_row_to_saga)
2384 .collect::<Result<Vec<_>, _>>()?)
2385 }
2386}
2387
2388#[async_trait]
2389impl<RM> MintDatabase<Error> for SQLMintDatabase<RM>
2390where
2391 RM: DatabasePool + 'static,
2392{
2393 async fn begin_transaction<'a>(
2394 &'a self,
2395 ) -> Result<Box<dyn database::MintTransaction<'a, Error> + Send + Sync + 'a>, Error> {
2396 let tx = SQLTransaction {
2397 inner: ConnectionWithTransaction::new(
2398 self.pool.get().map_err(|e| Error::Database(Box::new(e)))?,
2399 )
2400 .await?,
2401 };
2402
2403 Ok(Box::new(tx))
2404 }
2405}
2406
2407fn sql_row_to_keyset_info(row: Vec<Column>) -> Result<MintKeySetInfo, Error> {
2408 unpack_into!(
2409 let (
2410 id,
2411 unit,
2412 active,
2413 valid_from,
2414 valid_to,
2415 derivation_path,
2416 derivation_path_index,
2417 max_order,
2418 amounts,
2419 row_keyset_ppk
2420 ) = row
2421 );
2422
2423 let max_order: u8 = column_as_number!(max_order);
2424 let amounts = column_as_nullable_string!(amounts)
2425 .and_then(|str| serde_json::from_str(&str).ok())
2426 .unwrap_or_else(|| (0..max_order).map(|m| 2u64.pow(m.into())).collect());
2427
2428 Ok(MintKeySetInfo {
2429 id: column_as_string!(id, Id::from_str, Id::from_bytes),
2430 unit: column_as_string!(unit, CurrencyUnit::from_str),
2431 active: matches!(active, Column::Integer(1)),
2432 valid_from: column_as_number!(valid_from),
2433 derivation_path: column_as_string!(derivation_path, DerivationPath::from_str),
2434 derivation_path_index: column_as_nullable_number!(derivation_path_index),
2435 max_order,
2436 amounts,
2437 input_fee_ppk: column_as_number!(row_keyset_ppk),
2438 final_expiry: column_as_nullable_number!(valid_to),
2439 })
2440}
2441
2442#[instrument(skip_all)]
2443fn sql_row_to_mint_quote(
2444 row: Vec<Column>,
2445 payments: Vec<IncomingPayment>,
2446 issueances: Vec<Issuance>,
2447) -> Result<MintQuote, Error> {
2448 unpack_into!(
2449 let (
2450 id, amount, unit, request, expiry, request_lookup_id,
2451 pubkey, created_time, amount_paid, amount_issued, payment_method, request_lookup_id_kind
2452 ) = row
2453 );
2454
2455 let request_str = column_as_string!(&request);
2456 let request_lookup_id = column_as_nullable_string!(&request_lookup_id).unwrap_or_else(|| {
2457 Bolt11Invoice::from_str(&request_str)
2458 .map(|invoice| invoice.payment_hash().to_string())
2459 .unwrap_or_else(|_| request_str.clone())
2460 });
2461 let request_lookup_id_kind = column_as_string!(request_lookup_id_kind);
2462
2463 let pubkey = column_as_nullable_string!(&pubkey)
2464 .map(|pk| PublicKey::from_hex(&pk))
2465 .transpose()?;
2466
2467 let id = column_as_string!(id);
2468 let amount: Option<u64> = column_as_nullable_number!(amount);
2469 let amount_paid: u64 = column_as_number!(amount_paid);
2470 let amount_issued: u64 = column_as_number!(amount_issued);
2471 let payment_method = column_as_string!(payment_method, PaymentMethod::from_str);
2472
2473 Ok(MintQuote::new(
2474 Some(QuoteId::from_str(&id)?),
2475 request_str,
2476 column_as_string!(unit, CurrencyUnit::from_str),
2477 amount.map(Amount::from),
2478 column_as_number!(expiry),
2479 PaymentIdentifier::new(&request_lookup_id_kind, &request_lookup_id)
2480 .map_err(|_| ConversionError::MissingParameter("Payment id".to_string()))?,
2481 pubkey,
2482 amount_paid.into(),
2483 amount_issued.into(),
2484 payment_method,
2485 column_as_number!(created_time),
2486 payments,
2487 issueances,
2488 ))
2489}
2490
2491fn sql_row_to_melt_quote(row: Vec<Column>) -> Result<mint::MeltQuote, Error> {
2492 unpack_into!(
2493 let (
2494 id,
2495 unit,
2496 amount,
2497 request,
2498 fee_reserve,
2499 expiry,
2500 state,
2501 payment_preimage,
2502 request_lookup_id,
2503 created_time,
2504 paid_time,
2505 payment_method,
2506 options,
2507 request_lookup_id_kind
2508 ) = row
2509 );
2510
2511 let id = column_as_string!(id);
2512 let amount: u64 = column_as_number!(amount);
2513 let fee_reserve: u64 = column_as_number!(fee_reserve);
2514
2515 let expiry = column_as_number!(expiry);
2516 let payment_preimage = column_as_nullable_string!(payment_preimage);
2517 let options = column_as_nullable_string!(options);
2518 let options = options.and_then(|o| serde_json::from_str(&o).ok());
2519 let created_time: i64 = column_as_number!(created_time);
2520 let paid_time = column_as_nullable_number!(paid_time);
2521 let payment_method = PaymentMethod::from_str(&column_as_string!(payment_method))?;
2522
2523 let state =
2524 MeltQuoteState::from_str(&column_as_string!(&state)).map_err(ConversionError::from)?;
2525
2526 let unit = column_as_string!(unit);
2527 let request = column_as_string!(request);
2528
2529 let request_lookup_id_kind = column_as_nullable_string!(request_lookup_id_kind);
2530
2531 let request_lookup_id = column_as_nullable_string!(&request_lookup_id).or_else(|| {
2532 Bolt11Invoice::from_str(&request)
2533 .ok()
2534 .map(|invoice| invoice.payment_hash().to_string())
2535 });
2536
2537 let request_lookup_id = if let (Some(id_kind), Some(request_lookup_id)) =
2538 (request_lookup_id_kind, request_lookup_id)
2539 {
2540 Some(
2541 PaymentIdentifier::new(&id_kind, &request_lookup_id)
2542 .map_err(|_| ConversionError::MissingParameter("Payment id".to_string()))?,
2543 )
2544 } else {
2545 None
2546 };
2547
2548 let request = match serde_json::from_str(&request) {
2549 Ok(req) => req,
2550 Err(err) => {
2551 tracing::debug!(
2552 "Melt quote from pre migrations defaulting to bolt11 {}.",
2553 err
2554 );
2555 let bolt11 = Bolt11Invoice::from_str(&request).unwrap();
2556 MeltPaymentRequest::Bolt11 { bolt11 }
2557 }
2558 };
2559
2560 Ok(MeltQuote {
2561 id: QuoteId::from_str(&id)?,
2562 unit: CurrencyUnit::from_str(&unit)?,
2563 amount: Amount::from(amount),
2564 request,
2565 fee_reserve: Amount::from(fee_reserve),
2566 state,
2567 expiry,
2568 payment_preimage,
2569 request_lookup_id,
2570 options,
2571 created_time: created_time as u64,
2572 paid_time,
2573 payment_method,
2574 })
2575}
2576
2577fn sql_row_to_proof(row: Vec<Column>) -> Result<Proof, Error> {
2578 unpack_into!(
2579 let (
2580 amount,
2581 keyset_id,
2582 secret,
2583 c,
2584 witness
2585 ) = row
2586 );
2587
2588 let amount: u64 = column_as_number!(amount);
2589 Ok(Proof {
2590 amount: Amount::from(amount),
2591 keyset_id: column_as_string!(keyset_id, Id::from_str),
2592 secret: column_as_string!(secret, Secret::from_str),
2593 c: column_as_string!(c, PublicKey::from_hex, PublicKey::from_slice),
2594 witness: column_as_nullable_string!(witness).and_then(|w| serde_json::from_str(&w).ok()),
2595 dleq: None,
2596 })
2597}
2598
2599fn sql_row_to_hashmap_amount(row: Vec<Column>) -> Result<(Id, Amount), Error> {
2600 unpack_into!(
2601 let (
2602 keyset_id, amount
2603 ) = row
2604 );
2605
2606 let amount: u64 = column_as_number!(amount);
2607 Ok((
2608 column_as_string!(keyset_id, Id::from_str, Id::from_bytes),
2609 Amount::from(amount),
2610 ))
2611}
2612
2613fn sql_row_to_proof_with_state(row: Vec<Column>) -> Result<(Proof, Option<State>), Error> {
2614 unpack_into!(
2615 let (
2616 keyset_id, amount, secret, c, witness, state
2617 ) = row
2618 );
2619
2620 let amount: u64 = column_as_number!(amount);
2621 let state = column_as_nullable_string!(state).and_then(|s| State::from_str(&s).ok());
2622
2623 Ok((
2624 Proof {
2625 amount: Amount::from(amount),
2626 keyset_id: column_as_string!(keyset_id, Id::from_str, Id::from_bytes),
2627 secret: column_as_string!(secret, Secret::from_str),
2628 c: column_as_string!(c, PublicKey::from_hex, PublicKey::from_slice),
2629 witness: column_as_nullable_string!(witness)
2630 .and_then(|w| serde_json::from_str(&w).ok()),
2631 dleq: None,
2632 },
2633 state,
2634 ))
2635}
2636
2637fn sql_row_to_blind_signature(row: Vec<Column>) -> Result<BlindSignature, Error> {
2638 unpack_into!(
2639 let (
2640 keyset_id, amount, c, dleq_e, dleq_s
2641 ) = row
2642 );
2643
2644 let dleq = match (
2645 column_as_nullable_string!(dleq_e),
2646 column_as_nullable_string!(dleq_s),
2647 ) {
2648 (Some(e), Some(s)) => Some(BlindSignatureDleq {
2649 e: SecretKey::from_hex(e)?,
2650 s: SecretKey::from_hex(s)?,
2651 }),
2652 _ => None,
2653 };
2654
2655 let amount: u64 = column_as_number!(amount);
2656
2657 Ok(BlindSignature {
2658 amount: Amount::from(amount),
2659 keyset_id: column_as_string!(keyset_id, Id::from_str, Id::from_bytes),
2660 c: column_as_string!(c, PublicKey::from_hex, PublicKey::from_slice),
2661 dleq,
2662 })
2663}
2664
2665fn sql_row_to_saga(row: Vec<Column>) -> Result<mint::Saga, Error> {
2666 unpack_into!(
2667 let (
2668 operation_id,
2669 operation_kind,
2670 state,
2671 blinded_secrets,
2672 input_ys,
2673 quote_id,
2674 created_at,
2675 updated_at
2676 ) = row
2677 );
2678
2679 let operation_id_str = column_as_string!(&operation_id);
2680 let operation_id = uuid::Uuid::parse_str(&operation_id_str)
2681 .map_err(|e| Error::Internal(format!("Invalid operation_id UUID: {e}")))?;
2682
2683 let operation_kind_str = column_as_string!(&operation_kind);
2684 let operation_kind = mint::OperationKind::from_str(&operation_kind_str)
2685 .map_err(|e| Error::Internal(format!("Invalid operation kind: {e}")))?;
2686
2687 let state_str = column_as_string!(&state);
2688 let state = mint::SagaStateEnum::new(operation_kind, &state_str)
2689 .map_err(|e| Error::Internal(format!("Invalid saga state: {e}")))?;
2690
2691 let blinded_secrets_str = column_as_string!(&blinded_secrets);
2692 let blinded_secrets: Vec<PublicKey> = serde_json::from_str(&blinded_secrets_str)
2693 .map_err(|e| Error::Internal(format!("Failed to deserialize blinded_secrets: {e}")))?;
2694
2695 let input_ys_str = column_as_string!(&input_ys);
2696 let input_ys: Vec<PublicKey> = serde_json::from_str(&input_ys_str)
2697 .map_err(|e| Error::Internal(format!("Failed to deserialize input_ys: {e}")))?;
2698
2699 let quote_id = match "e_id {
2700 Column::Text(s) => {
2701 if s.is_empty() {
2702 None
2703 } else {
2704 Some(s.clone())
2705 }
2706 }
2707 Column::Null => None,
2708 _ => None,
2709 };
2710
2711 let created_at: u64 = column_as_number!(created_at);
2712 let updated_at: u64 = column_as_number!(updated_at);
2713
2714 Ok(mint::Saga {
2715 operation_id,
2716 operation_kind,
2717 state,
2718 blinded_secrets,
2719 input_ys,
2720 quote_id,
2721 created_at,
2722 updated_at,
2723 })
2724}
2725
#[cfg(test)]
mod test {
    use super::*;

    /// Checks that `sql_row_to_keyset_info` accepts rows with and without the second-to-last
    /// column populated (presumably the serialized amounts; confirm against the converter).
    mod max_order_to_amounts_migrations {
        use super::*;

        /// Builds the fixture row shared by every test, varying only the second-to-last column.
        fn keyset_row(amounts: Column) -> Vec<Column> {
            vec![
                Column::Text("0083a60439303340".to_owned()),
                Column::Text("sat".to_owned()),
                Column::Integer(1),
                Column::Integer(1749844864),
                Column::Null,
                Column::Text("0'/0'/0'".to_owned()),
                Column::Integer(0),
                Column::Integer(32),
                amounts,
                Column::Integer(0),
            ]
        }

        /// Returns a text column holding the JSON list of the first `count` powers of two.
        fn powers_of_two(count: u32) -> Column {
            let amounts = (0..count).map(|x| 2u64.pow(x)).collect::<Vec<_>>();
            Column::Text(serde_json::to_string(&amounts).expect("valid json"))
        }

        #[test]
        fn legacy_payload() {
            let result = sql_row_to_keyset_info(keyset_row(Column::Null));
            assert!(result.is_ok());
        }

        #[test]
        fn migrated_payload() {
            let legacy = sql_row_to_keyset_info(keyset_row(Column::Null));
            assert!(legacy.is_ok());

            let migrated = sql_row_to_keyset_info(keyset_row(powers_of_two(32)));
            assert!(migrated.is_ok());
            assert_eq!(legacy.unwrap(), migrated.unwrap());
        }

        #[test]
        fn amounts_over_max_order() {
            let legacy = sql_row_to_keyset_info(keyset_row(Column::Null));
            assert!(legacy.is_ok());

            let migrated = sql_row_to_keyset_info(keyset_row(powers_of_two(16)));
            assert!(migrated.is_ok());
            let migrated = migrated.unwrap();
            assert_ne!(legacy.unwrap(), migrated);
            assert_eq!(migrated.amounts.len(), 16);
        }
    }
}