1use std::collections::HashMap;
4use std::str::FromStr;
5
6use async_trait::async_trait;
7use cdk_common::database::mint::Acquired;
8use cdk_common::database::{self, Error, MintProofsDatabase};
9use cdk_common::mint::{Operation, ProofsWithState};
10use cdk_common::nut00::ProofsMethods;
11use cdk_common::quote_id::QuoteId;
12use cdk_common::secret::Secret;
13use cdk_common::util::unix_time;
14use cdk_common::{Amount, Id, Proof, Proofs, PublicKey, State};
15
16use super::{SQLMintDatabase, SQLTransaction};
17use crate::database::DatabaseExecutor;
18use crate::pool::DatabasePool;
19use crate::stmt::{query, Column};
20use crate::{column_as_nullable_string, column_as_number, column_as_string, unpack_into};
21
22pub(super) async fn get_current_states<C>(
23 conn: &C,
24 ys: &[PublicKey],
25 for_update: bool,
26) -> Result<HashMap<PublicKey, State>, Error>
27where
28 C: DatabaseExecutor + Send + Sync,
29{
30 let for_update_clause = if for_update { "FOR UPDATE" } else { "" };
31
32 query(&format!(
33 r#"SELECT y, state FROM proof WHERE y IN (:ys) {}"#,
34 for_update_clause
35 ))?
36 .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())?
37 .fetch_all(conn)
38 .await?
39 .into_iter()
40 .map(|row| {
41 Ok((
42 column_as_string!(&row[0], PublicKey::from_hex, PublicKey::from_slice),
43 column_as_string!(&row[1], State::from_str),
44 ))
45 })
46 .collect::<Result<HashMap<_, _>, _>>()
47}
48
49pub(super) fn sql_row_to_proof(row: Vec<Column>) -> Result<Proof, Error> {
50 unpack_into!(
51 let (
52 amount,
53 keyset_id,
54 secret,
55 c,
56 witness
57 ) = row
58 );
59
60 let amount: u64 = column_as_number!(amount);
61 Ok(Proof {
62 amount: Amount::from(amount),
63 keyset_id: column_as_string!(keyset_id, Id::from_str),
64 secret: column_as_string!(secret, Secret::from_str),
65 c: column_as_string!(c, PublicKey::from_hex, PublicKey::from_slice),
66 witness: column_as_nullable_string!(witness).and_then(|w| serde_json::from_str(&w).ok()),
67 dleq: None,
68 p2pk_e: None,
69 })
70}
71
72pub(super) fn sql_row_to_proof_with_state(row: Vec<Column>) -> Result<(Proof, State), Error> {
73 unpack_into!(
74 let (
75 keyset_id, amount, secret, c, witness, state
76 ) = row
77 );
78
79 let amount: u64 = column_as_number!(amount);
80 let state = column_as_nullable_string!(state)
81 .and_then(|s| State::from_str(&s).ok())
82 .unwrap_or(State::Pending);
83
84 Ok((
85 Proof {
86 amount: Amount::from(amount),
87 keyset_id: column_as_string!(keyset_id, Id::from_str, Id::from_bytes),
88 secret: column_as_string!(secret, Secret::from_str),
89 c: column_as_string!(c, PublicKey::from_hex, PublicKey::from_slice),
90 witness: column_as_nullable_string!(witness)
91 .and_then(|w| serde_json::from_str(&w).ok()),
92 dleq: None,
93 p2pk_e: None,
94 },
95 state,
96 ))
97}
98
99pub(super) fn sql_row_to_hashmap_amount(row: Vec<Column>) -> Result<(Id, Amount), Error> {
100 unpack_into!(
101 let (
102 keyset_id, amount
103 ) = row
104 );
105
106 let amount: u64 = column_as_number!(amount);
107 Ok((
108 column_as_string!(keyset_id, Id::from_str, Id::from_bytes),
109 Amount::from(amount),
110 ))
111}
112
113#[async_trait]
114impl<RM> database::MintProofsTransaction for SQLTransaction<RM>
115where
116 RM: DatabasePool + 'static,
117{
118 type Err = Error;
119
120 async fn add_proofs(
130 &mut self,
131 proofs: Proofs,
132 quote_id: Option<QuoteId>,
133 operation: &Operation,
134 ) -> Result<Acquired<ProofsWithState>, Self::Err> {
135 let current_time = unix_time();
136
137 match query(r#"SELECT state FROM proof WHERE y IN (:ys) LIMIT 1 FOR UPDATE"#)?
140 .bind_vec(
141 "ys",
142 proofs
143 .iter()
144 .map(|y| y.y().map(|y| y.to_bytes().to_vec()))
145 .collect::<Result<_, _>>()?,
146 )?
147 .pluck(&self.inner)
148 .await?
149 .map(|state| Ok::<_, Error>(column_as_string!(&state, State::from_str)))
150 .transpose()?
151 {
152 Some(State::Spent) => Err(database::Error::AttemptUpdateSpentProof),
153 Some(_) => Err(database::Error::Duplicate),
154 None => Ok(()), }?;
156
157 for proof in &proofs {
158 let y = proof.y()?;
159
160 query(
161 r#"
162 INSERT INTO proof
163 (y, amount, keyset_id, secret, c, witness, state, quote_id, created_time, operation_kind, operation_id)
164 VALUES
165 (:y, :amount, :keyset_id, :secret, :c, :witness, :state, :quote_id, :created_time, :operation_kind, :operation_id)
166 "#,
167 )?
168 .bind("y", y.to_bytes().to_vec())
169 .bind("amount", proof.amount.to_i64())
170 .bind("keyset_id", proof.keyset_id.to_string())
171 .bind("secret", proof.secret.to_string())
172 .bind("c", proof.c.to_bytes().to_vec())
173 .bind(
174 "witness",
175 proof.witness.clone().and_then(|w| serde_json::to_string(&w).inspect_err(|e| tracing::error!("Failed to serialize witness: {:?}", e)).ok()),
176 )
177 .bind("state", "UNSPENT".to_string())
178 .bind("quote_id", quote_id.clone().map(|q| q.to_string()))
179 .bind("created_time", current_time as i64)
180 .bind("operation_kind", operation.kind().to_string())
181 .bind("operation_id", operation.id().to_string())
182 .execute(&self.inner)
183 .await?;
184 }
185
186 Ok(ProofsWithState::new(proofs, State::Unspent).into())
187 }
188
189 async fn update_proofs_state(
202 &mut self,
203 proofs: &mut Acquired<ProofsWithState>,
204 new_state: State,
205 ) -> Result<(), Self::Err> {
206 let ys = proofs.ys()?;
207
208 query(r#"UPDATE proof SET state = :new_state WHERE y IN (:ys)"#)?
209 .bind("new_state", new_state.to_string())
210 .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())?
211 .execute(&self.inner)
212 .await?;
213
214 if new_state == State::Spent {
215 query(
216 r#"
217 INSERT INTO keyset_amounts (keyset_id, total_issued, total_redeemed)
218 SELECT keyset_id, 0, COALESCE(SUM(amount), 0)
219 FROM proof
220 WHERE y IN (:ys)
221 GROUP BY keyset_id
222 ON CONFLICT (keyset_id)
223 DO UPDATE SET total_redeemed = keyset_amounts.total_redeemed + EXCLUDED.total_redeemed
224 "#,
225 )?
226 .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())?
227 .execute(&self.inner)
228 .await?;
229 }
230
231 proofs.state = new_state;
232
233 Ok(())
234 }
235
236 async fn remove_proofs(
237 &mut self,
238 ys: &[PublicKey],
239 _quote_id: Option<QuoteId>,
240 ) -> Result<(), Self::Err> {
241 let total_deleted = query(
242 r#"
243 DELETE FROM proof WHERE y IN (:ys) AND state NOT IN (:exclude_state)
244 "#,
245 )?
246 .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())?
247 .bind_vec("exclude_state", vec![State::Spent.to_string()])?
248 .execute(&self.inner)
249 .await?;
250
251 if total_deleted != ys.len() {
252 let current_states = get_current_states(&self.inner, ys, true).await?;
254
255 let missing_count = ys.len() - current_states.len();
256 let spent_count = current_states
257 .values()
258 .filter(|s| **s == State::Spent)
259 .count();
260
261 if missing_count > 0 {
262 tracing::warn!(
263 "remove_proofs: {} of {} proofs do not exist in database (already removed?)",
264 missing_count,
265 ys.len()
266 );
267 }
268
269 if spent_count > 0 {
270 tracing::warn!(
271 "remove_proofs: {} of {} proofs are in Spent state and cannot be removed",
272 spent_count,
273 ys.len()
274 );
275 }
276
277 tracing::debug!(
278 "remove_proofs details: requested={}, deleted={}, missing={}, spent={}",
279 ys.len(),
280 total_deleted,
281 missing_count,
282 spent_count
283 );
284
285 return Err(Self::Err::AttemptRemoveSpentProof);
286 }
287
288 Ok(())
289 }
290
291 async fn get_proof_ys_by_quote_id(
292 &mut self,
293 quote_id: &QuoteId,
294 ) -> Result<Vec<PublicKey>, Self::Err> {
295 Ok(query(
296 r#"
297 SELECT
298 amount,
299 keyset_id,
300 secret,
301 c,
302 witness
303 FROM
304 proof
305 WHERE
306 quote_id = :quote_id
307 FOR UPDATE
308 "#,
309 )?
310 .bind("quote_id", quote_id.to_string())
311 .fetch_all(&self.inner)
312 .await?
313 .into_iter()
314 .map(sql_row_to_proof)
315 .collect::<Result<Vec<Proof>, _>>()?
316 .ys()?)
317 }
318
319 async fn get_proof_ys_by_operation_id(
320 &mut self,
321 operation_id: &uuid::Uuid,
322 ) -> Result<Vec<PublicKey>, Self::Err> {
323 Ok(query(
324 r#"
325 SELECT
326 y
327 FROM
328 proof
329 WHERE
330 operation_id = :operation_id
331 "#,
332 )?
333 .bind("operation_id", operation_id.to_string())
334 .fetch_all(&self.inner)
335 .await?
336 .into_iter()
337 .map(|row| -> Result<PublicKey, Error> {
338 Ok(column_as_string!(
339 &row[0],
340 PublicKey::from_hex,
341 PublicKey::from_slice
342 ))
343 })
344 .collect::<Result<Vec<_>, _>>()?)
345 }
346
347 async fn get_proofs(
348 &mut self,
349 ys: &[PublicKey],
350 ) -> Result<Acquired<ProofsWithState>, Self::Err> {
351 let rows = query(
352 r#"
353 SELECT
354 keyset_id,
355 amount,
356 secret,
357 c,
358 witness,
359 state
360 FROM
361 proof
362 WHERE
363 y IN (:ys)
364 FOR UPDATE
365 "#,
366 )?
367 .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())?
368 .fetch_all(&self.inner)
369 .await?;
370
371 if rows.is_empty() || rows.len() != ys.len() {
372 return Err(database::Error::ProofNotFound);
373 }
374
375 let results: Vec<(Proof, State)> = rows
376 .into_iter()
377 .map(sql_row_to_proof_with_state)
378 .collect::<Result<Vec<_>, _>>()?;
379
380 let mut proofs = Vec::with_capacity(results.len());
381 let mut first_state: Option<State> = None;
382
383 for (proof, state) in results {
384 if let Some(first) = first_state {
385 if first != state {
386 return Err(database::Error::Internal(
387 "Proofs have inconsistent states".to_string(),
388 ));
389 }
390 } else {
391 first_state = Some(state);
392 }
393
394 proofs.push(proof);
395 }
396
397 let state = first_state.unwrap_or(State::Unspent);
398 Ok(ProofsWithState::new(proofs, state).into())
399 }
400}
401
402#[async_trait]
403impl<RM> MintProofsDatabase for SQLMintDatabase<RM>
404where
405 RM: DatabasePool + 'static,
406{
407 type Err = Error;
408
409 async fn get_proofs_by_ys(&self, ys: &[PublicKey]) -> Result<Vec<Option<Proof>>, Self::Err> {
410 let conn = self
411 .pool
412 .get()
413 .await
414 .map_err(|e| Error::Database(Box::new(e)))?;
415 let mut proofs = query(
416 r#"
417 SELECT
418 amount,
419 keyset_id,
420 secret,
421 c,
422 witness,
423 y
424 FROM
425 proof
426 WHERE
427 y IN (:ys)
428 "#,
429 )?
430 .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())?
431 .fetch_all(&*conn)
432 .await?
433 .into_iter()
434 .map(|mut row| {
435 Ok((
436 column_as_string!(
437 row.pop().ok_or(Error::InvalidDbResponse)?,
438 PublicKey::from_hex,
439 PublicKey::from_slice
440 ),
441 sql_row_to_proof(row)?,
442 ))
443 })
444 .collect::<Result<HashMap<_, _>, Error>>()?;
445
446 Ok(ys.iter().map(|y| proofs.remove(y)).collect())
447 }
448
449 async fn get_proof_ys_by_quote_id(
450 &self,
451 quote_id: &QuoteId,
452 ) -> Result<Vec<PublicKey>, Self::Err> {
453 let conn = self
454 .pool
455 .get()
456 .await
457 .map_err(|e| Error::Database(Box::new(e)))?;
458 Ok(query(
459 r#"
460 SELECT
461 amount,
462 keyset_id,
463 secret,
464 c,
465 witness
466 FROM
467 proof
468 WHERE
469 quote_id = :quote_id
470 "#,
471 )?
472 .bind("quote_id", quote_id.to_string())
473 .fetch_all(&*conn)
474 .await?
475 .into_iter()
476 .map(sql_row_to_proof)
477 .collect::<Result<Vec<Proof>, _>>()?
478 .ys()?)
479 }
480
481 async fn get_proofs_states(&self, ys: &[PublicKey]) -> Result<Vec<Option<State>>, Self::Err> {
482 let conn = self
483 .pool
484 .get()
485 .await
486 .map_err(|e| Error::Database(Box::new(e)))?;
487 let mut current_states = get_current_states(&*conn, ys, false).await?;
488
489 Ok(ys.iter().map(|y| current_states.remove(y)).collect())
490 }
491
492 async fn get_proofs_by_keyset_id(
493 &self,
494 keyset_id: &Id,
495 ) -> Result<(Proofs, Vec<Option<State>>), Self::Err> {
496 let conn = self
497 .pool
498 .get()
499 .await
500 .map_err(|e| Error::Database(Box::new(e)))?;
501
502 let (proofs, states): (Vec<Proof>, Vec<State>) = query(
503 r#"
504 SELECT
505 keyset_id,
506 amount,
507 secret,
508 c,
509 witness,
510 state
511 FROM
512 proof
513 WHERE
514 keyset_id=:keyset_id
515 "#,
516 )?
517 .bind("keyset_id", keyset_id.to_string())
518 .fetch_all(&*conn)
519 .await?
520 .into_iter()
521 .map(sql_row_to_proof_with_state)
522 .collect::<Result<Vec<_>, _>>()?
523 .into_iter()
524 .unzip();
525
526 Ok((proofs, states.into_iter().map(Some).collect()))
527 }
528
529 async fn get_total_redeemed(&self) -> Result<HashMap<Id, Amount>, Self::Err> {
531 let conn = self
532 .pool
533 .get()
534 .await
535 .map_err(|e| Error::Database(Box::new(e)))?;
536 query(
537 r#"
538 SELECT
539 keyset_id,
540 total_redeemed as amount
541 FROM
542 keyset_amounts
543 "#,
544 )?
545 .fetch_all(&*conn)
546 .await?
547 .into_iter()
548 .map(sql_row_to_hashmap_amount)
549 .collect()
550 }
551
552 async fn get_proof_ys_by_operation_id(
553 &self,
554 operation_id: &uuid::Uuid,
555 ) -> Result<Vec<PublicKey>, Self::Err> {
556 let conn = self
557 .pool
558 .get()
559 .await
560 .map_err(|e| Error::Database(Box::new(e)))?;
561 query(
562 r#"
563 SELECT
564 y
565 FROM
566 proof
567 WHERE
568 operation_id = :operation_id
569 "#,
570 )?
571 .bind("operation_id", operation_id.to_string())
572 .fetch_all(&*conn)
573 .await?
574 .into_iter()
575 .map(|row| -> Result<PublicKey, Error> {
576 Ok(column_as_string!(
577 &row[0],
578 PublicKey::from_hex,
579 PublicKey::from_slice
580 ))
581 })
582 .collect::<Result<Vec<_>, _>>()
583 }
584}