1use std::collections::HashMap;
4use std::str::FromStr;
5
6use async_trait::async_trait;
7use cdk_common::database::mint::Acquired;
8use cdk_common::database::{self, Error, MintProofsDatabase};
9use cdk_common::mint::{Operation, ProofsWithState};
10use cdk_common::nut00::ProofsMethods;
11use cdk_common::quote_id::QuoteId;
12use cdk_common::secret::Secret;
13use cdk_common::util::unix_time;
14use cdk_common::{Amount, Id, Proof, Proofs, PublicKey, State};
15
16use super::{SQLMintDatabase, SQLTransaction};
17use crate::database::DatabaseExecutor;
18use crate::pool::DatabasePool;
19use crate::stmt::{query, Column};
20use crate::{column_as_nullable_string, column_as_number, column_as_string, unpack_into};
21
22pub(super) async fn get_current_states<C>(
23 conn: &C,
24 ys: &[PublicKey],
25 for_update: bool,
26) -> Result<HashMap<PublicKey, State>, Error>
27where
28 C: DatabaseExecutor + Send + Sync,
29{
30 if ys.is_empty() {
31 return Ok(Default::default());
32 }
33 let for_update_clause = if for_update { "FOR UPDATE" } else { "" };
34
35 query(&format!(
36 r#"SELECT y, state FROM proof WHERE y IN (:ys) {}"#,
37 for_update_clause
38 ))?
39 .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
40 .fetch_all(conn)
41 .await?
42 .into_iter()
43 .map(|row| {
44 Ok((
45 column_as_string!(&row[0], PublicKey::from_hex, PublicKey::from_slice),
46 column_as_string!(&row[1], State::from_str),
47 ))
48 })
49 .collect::<Result<HashMap<_, _>, _>>()
50}
51
52pub(super) fn sql_row_to_proof(row: Vec<Column>) -> Result<Proof, Error> {
53 unpack_into!(
54 let (
55 amount,
56 keyset_id,
57 secret,
58 c,
59 witness
60 ) = row
61 );
62
63 let amount: u64 = column_as_number!(amount);
64 Ok(Proof {
65 amount: Amount::from(amount),
66 keyset_id: column_as_string!(keyset_id, Id::from_str),
67 secret: column_as_string!(secret, Secret::from_str),
68 c: column_as_string!(c, PublicKey::from_hex, PublicKey::from_slice),
69 witness: column_as_nullable_string!(witness).and_then(|w| serde_json::from_str(&w).ok()),
70 dleq: None,
71 p2pk_e: None,
72 })
73}
74
75pub(super) fn sql_row_to_proof_with_state(row: Vec<Column>) -> Result<(Proof, State), Error> {
76 unpack_into!(
77 let (
78 keyset_id, amount, secret, c, witness, state
79 ) = row
80 );
81
82 let amount: u64 = column_as_number!(amount);
83 let state = column_as_nullable_string!(state)
84 .and_then(|s| State::from_str(&s).ok())
85 .unwrap_or(State::Pending);
86
87 Ok((
88 Proof {
89 amount: Amount::from(amount),
90 keyset_id: column_as_string!(keyset_id, Id::from_str, Id::from_bytes),
91 secret: column_as_string!(secret, Secret::from_str),
92 c: column_as_string!(c, PublicKey::from_hex, PublicKey::from_slice),
93 witness: column_as_nullable_string!(witness)
94 .and_then(|w| serde_json::from_str(&w).ok()),
95 dleq: None,
96 p2pk_e: None,
97 },
98 state,
99 ))
100}
101
102pub(super) fn sql_row_to_hashmap_amount(row: Vec<Column>) -> Result<(Id, Amount), Error> {
103 unpack_into!(
104 let (
105 keyset_id, amount
106 ) = row
107 );
108
109 let amount: u64 = column_as_number!(amount);
110 Ok((
111 column_as_string!(keyset_id, Id::from_str, Id::from_bytes),
112 Amount::from(amount),
113 ))
114}
115
116#[async_trait]
117impl<RM> database::MintProofsTransaction for SQLTransaction<RM>
118where
119 RM: DatabasePool + 'static,
120{
121 type Err = Error;
122
123 async fn add_proofs(
133 &mut self,
134 proofs: Proofs,
135 quote_id: Option<QuoteId>,
136 operation: &Operation,
137 ) -> Result<Acquired<ProofsWithState>, Self::Err> {
138 let current_time = unix_time();
139
140 match query(r#"SELECT state FROM proof WHERE y IN (:ys) LIMIT 1 FOR UPDATE"#)?
143 .bind_vec(
144 "ys",
145 proofs
146 .iter()
147 .map(|y| y.y().map(|y| y.to_bytes().to_vec()))
148 .collect::<Result<_, _>>()?,
149 )
150 .pluck(&self.inner)
151 .await?
152 .map(|state| Ok::<_, Error>(column_as_string!(&state, State::from_str)))
153 .transpose()?
154 {
155 Some(State::Spent) => Err(database::Error::AttemptUpdateSpentProof),
156 Some(_) => Err(database::Error::Duplicate),
157 None => Ok(()), }?;
159
160 for proof in &proofs {
161 let y = proof.y()?;
162
163 query(
164 r#"
165 INSERT INTO proof
166 (y, amount, keyset_id, secret, c, witness, state, quote_id, created_time, operation_kind, operation_id)
167 VALUES
168 (:y, :amount, :keyset_id, :secret, :c, :witness, :state, :quote_id, :created_time, :operation_kind, :operation_id)
169 "#,
170 )?
171 .bind("y", y.to_bytes().to_vec())
172 .bind("amount", proof.amount.to_i64())
173 .bind("keyset_id", proof.keyset_id.to_string())
174 .bind("secret", proof.secret.to_string())
175 .bind("c", proof.c.to_bytes().to_vec())
176 .bind(
177 "witness",
178 proof.witness.clone().and_then(|w| serde_json::to_string(&w).inspect_err(|e| tracing::error!("Failed to serialize witness: {:?}", e)).ok()),
179 )
180 .bind("state", "UNSPENT".to_string())
181 .bind("quote_id", quote_id.clone().map(|q| q.to_string()))
182 .bind("created_time", current_time as i64)
183 .bind("operation_kind", operation.kind().to_string())
184 .bind("operation_id", operation.id().to_string())
185 .execute(&self.inner)
186 .await?;
187 }
188
189 Ok(ProofsWithState::new(proofs, State::Unspent).into())
190 }
191
192 async fn update_proofs_state(
205 &mut self,
206 proofs: &mut Acquired<ProofsWithState>,
207 new_state: State,
208 ) -> Result<(), Self::Err> {
209 let ys = proofs.ys()?;
210
211 query(r#"UPDATE proof SET state = :new_state WHERE y IN (:ys)"#)?
212 .bind("new_state", new_state.to_string())
213 .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
214 .execute(&self.inner)
215 .await?;
216
217 if new_state == State::Spent {
218 query(
219 r#"
220 INSERT INTO keyset_amounts (keyset_id, total_issued, total_redeemed)
221 SELECT keyset_id, 0, COALESCE(SUM(amount), 0)
222 FROM proof
223 WHERE y IN (:ys)
224 GROUP BY keyset_id
225 ON CONFLICT (keyset_id)
226 DO UPDATE SET total_redeemed = keyset_amounts.total_redeemed + EXCLUDED.total_redeemed
227 "#,
228 )?
229 .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
230 .execute(&self.inner)
231 .await?;
232 }
233
234 proofs.state = new_state;
235
236 Ok(())
237 }
238
239 async fn remove_proofs(
240 &mut self,
241 ys: &[PublicKey],
242 _quote_id: Option<QuoteId>,
243 ) -> Result<(), Self::Err> {
244 if ys.is_empty() {
245 return Ok(());
246 }
247 let total_deleted = query(
248 r#"
249 DELETE FROM proof WHERE y IN (:ys) AND state NOT IN (:exclude_state)
250 "#,
251 )?
252 .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
253 .bind_vec("exclude_state", vec![State::Spent.to_string()])
254 .execute(&self.inner)
255 .await?;
256
257 if total_deleted != ys.len() {
258 let current_states = get_current_states(&self.inner, ys, true).await?;
260
261 let missing_count = ys.len() - current_states.len();
262 let spent_count = current_states
263 .values()
264 .filter(|s| **s == State::Spent)
265 .count();
266
267 if missing_count > 0 {
268 tracing::warn!(
269 "remove_proofs: {} of {} proofs do not exist in database (already removed?)",
270 missing_count,
271 ys.len()
272 );
273 }
274
275 if spent_count > 0 {
276 tracing::warn!(
277 "remove_proofs: {} of {} proofs are in Spent state and cannot be removed",
278 spent_count,
279 ys.len()
280 );
281 }
282
283 tracing::debug!(
284 "remove_proofs details: requested={}, deleted={}, missing={}, spent={}",
285 ys.len(),
286 total_deleted,
287 missing_count,
288 spent_count
289 );
290
291 return Err(Self::Err::AttemptRemoveSpentProof);
292 }
293
294 Ok(())
295 }
296
297 async fn get_proof_ys_by_quote_id(
298 &mut self,
299 quote_id: &QuoteId,
300 ) -> Result<Vec<PublicKey>, Self::Err> {
301 Ok(query(
302 r#"
303 SELECT
304 amount,
305 keyset_id,
306 secret,
307 c,
308 witness
309 FROM
310 proof
311 WHERE
312 quote_id = :quote_id
313 FOR UPDATE
314 "#,
315 )?
316 .bind("quote_id", quote_id.to_string())
317 .fetch_all(&self.inner)
318 .await?
319 .into_iter()
320 .map(sql_row_to_proof)
321 .collect::<Result<Vec<Proof>, _>>()?
322 .ys()?)
323 }
324
325 async fn get_proof_ys_by_operation_id(
326 &mut self,
327 operation_id: &uuid::Uuid,
328 ) -> Result<Vec<PublicKey>, Self::Err> {
329 Ok(query(
330 r#"
331 SELECT
332 y
333 FROM
334 proof
335 WHERE
336 operation_id = :operation_id
337 "#,
338 )?
339 .bind("operation_id", operation_id.to_string())
340 .fetch_all(&self.inner)
341 .await?
342 .into_iter()
343 .map(|row| -> Result<PublicKey, Error> {
344 Ok(column_as_string!(
345 &row[0],
346 PublicKey::from_hex,
347 PublicKey::from_slice
348 ))
349 })
350 .collect::<Result<Vec<_>, _>>()?)
351 }
352
353 async fn get_proofs(
354 &mut self,
355 ys: &[PublicKey],
356 ) -> Result<Acquired<ProofsWithState>, Self::Err> {
357 if ys.is_empty() {
358 return Err(database::Error::ProofNotFound);
359 }
360
361 let rows = query(
362 r#"
363 SELECT
364 keyset_id,
365 amount,
366 secret,
367 c,
368 witness,
369 state
370 FROM
371 proof
372 WHERE
373 y IN (:ys)
374 FOR UPDATE
375 "#,
376 )?
377 .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
378 .fetch_all(&self.inner)
379 .await?;
380
381 if rows.is_empty() || rows.len() != ys.len() {
382 return Err(database::Error::ProofNotFound);
383 }
384
385 let results: Vec<(Proof, State)> = rows
386 .into_iter()
387 .map(sql_row_to_proof_with_state)
388 .collect::<Result<Vec<_>, _>>()?;
389
390 let mut proofs = Vec::with_capacity(results.len());
391 let mut first_state: Option<State> = None;
392
393 for (proof, state) in results {
394 if let Some(first) = first_state {
395 if first != state {
396 return Err(database::Error::Internal(
397 "Proofs have inconsistent states".to_string(),
398 ));
399 }
400 } else {
401 first_state = Some(state);
402 }
403
404 proofs.push(proof);
405 }
406
407 let state = first_state.unwrap_or(State::Unspent);
408 Ok(ProofsWithState::new(proofs, state).into())
409 }
410}
411
412#[async_trait]
413impl<RM> MintProofsDatabase for SQLMintDatabase<RM>
414where
415 RM: DatabasePool + 'static,
416{
417 type Err = Error;
418
419 async fn get_proofs_by_ys(&self, ys: &[PublicKey]) -> Result<Vec<Option<Proof>>, Self::Err> {
420 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
421 let mut proofs = query(
422 r#"
423 SELECT
424 amount,
425 keyset_id,
426 secret,
427 c,
428 witness,
429 y
430 FROM
431 proof
432 WHERE
433 y IN (:ys)
434 "#,
435 )?
436 .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
437 .fetch_all(&*conn)
438 .await?
439 .into_iter()
440 .map(|mut row| {
441 Ok((
442 column_as_string!(
443 row.pop().ok_or(Error::InvalidDbResponse)?,
444 PublicKey::from_hex,
445 PublicKey::from_slice
446 ),
447 sql_row_to_proof(row)?,
448 ))
449 })
450 .collect::<Result<HashMap<_, _>, Error>>()?;
451
452 Ok(ys.iter().map(|y| proofs.remove(y)).collect())
453 }
454
455 async fn get_proof_ys_by_quote_id(
456 &self,
457 quote_id: &QuoteId,
458 ) -> Result<Vec<PublicKey>, Self::Err> {
459 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
460 Ok(query(
461 r#"
462 SELECT
463 amount,
464 keyset_id,
465 secret,
466 c,
467 witness
468 FROM
469 proof
470 WHERE
471 quote_id = :quote_id
472 "#,
473 )?
474 .bind("quote_id", quote_id.to_string())
475 .fetch_all(&*conn)
476 .await?
477 .into_iter()
478 .map(sql_row_to_proof)
479 .collect::<Result<Vec<Proof>, _>>()?
480 .ys()?)
481 }
482
483 async fn get_proofs_states(&self, ys: &[PublicKey]) -> Result<Vec<Option<State>>, Self::Err> {
484 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
485 let mut current_states = get_current_states(&*conn, ys, false).await?;
486
487 Ok(ys.iter().map(|y| current_states.remove(y)).collect())
488 }
489
490 async fn get_proofs_by_keyset_id(
491 &self,
492 keyset_id: &Id,
493 ) -> Result<(Proofs, Vec<Option<State>>), Self::Err> {
494 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
495
496 let (proofs, states): (Vec<Proof>, Vec<State>) = query(
497 r#"
498 SELECT
499 keyset_id,
500 amount,
501 secret,
502 c,
503 witness,
504 state
505 FROM
506 proof
507 WHERE
508 keyset_id=:keyset_id
509 "#,
510 )?
511 .bind("keyset_id", keyset_id.to_string())
512 .fetch_all(&*conn)
513 .await?
514 .into_iter()
515 .map(sql_row_to_proof_with_state)
516 .collect::<Result<Vec<_>, _>>()?
517 .into_iter()
518 .unzip();
519
520 Ok((proofs, states.into_iter().map(Some).collect()))
521 }
522
523 async fn get_total_redeemed(&self) -> Result<HashMap<Id, Amount>, Self::Err> {
525 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
526 query(
527 r#"
528 SELECT
529 keyset_id,
530 total_redeemed as amount
531 FROM
532 keyset_amounts
533 "#,
534 )?
535 .fetch_all(&*conn)
536 .await?
537 .into_iter()
538 .map(sql_row_to_hashmap_amount)
539 .collect()
540 }
541
542 async fn get_proof_ys_by_operation_id(
543 &self,
544 operation_id: &uuid::Uuid,
545 ) -> Result<Vec<PublicKey>, Self::Err> {
546 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
547 query(
548 r#"
549 SELECT
550 y
551 FROM
552 proof
553 WHERE
554 operation_id = :operation_id
555 "#,
556 )?
557 .bind("operation_id", operation_id.to_string())
558 .fetch_all(&*conn)
559 .await?
560 .into_iter()
561 .map(|row| -> Result<PublicKey, Error> {
562 Ok(column_as_string!(
563 &row[0],
564 PublicKey::from_hex,
565 PublicKey::from_slice
566 ))
567 })
568 .collect::<Result<Vec<_>, _>>()
569 }
570}