1use std::collections::HashMap;
4use std::str::FromStr;
5
6use async_trait::async_trait;
7use cdk_common::database::mint::Acquired;
8use cdk_common::database::{self, Error, MintProofsDatabase};
9use cdk_common::mint::{Operation, ProofsWithState};
10use cdk_common::nut00::ProofsMethods;
11use cdk_common::quote_id::QuoteId;
12use cdk_common::secret::Secret;
13use cdk_common::util::unix_time;
14use cdk_common::{Amount, Id, Proof, Proofs, PublicKey, State};
15
16use super::{SQLMintDatabase, SQLTransaction};
17use crate::database::DatabaseExecutor;
18use crate::pool::DatabasePool;
19use crate::stmt::{query, Column};
20use crate::{column_as_nullable_string, column_as_number, column_as_string, unpack_into};
21
22pub(super) async fn get_current_states<C>(
23 conn: &C,
24 ys: &[PublicKey],
25 for_update: bool,
26) -> Result<HashMap<PublicKey, State>, Error>
27where
28 C: DatabaseExecutor + Send + Sync,
29{
30 if ys.is_empty() {
31 return Ok(Default::default());
32 }
33 let for_update_clause = if for_update { "FOR UPDATE" } else { "" };
34
35 query(&format!(
36 r#"SELECT y, state FROM proof WHERE y IN (:ys) {}"#,
37 for_update_clause
38 ))?
39 .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
40 .fetch_all(conn)
41 .await?
42 .into_iter()
43 .map(|row| {
44 Ok((
45 column_as_string!(&row[0], PublicKey::from_hex, PublicKey::from_slice),
46 column_as_string!(&row[1], State::from_str),
47 ))
48 })
49 .collect::<Result<HashMap<_, _>, _>>()
50}
51
52pub(super) fn sql_row_to_proof(row: Vec<Column>) -> Result<Proof, Error> {
53 unpack_into!(
54 let (
55 amount,
56 keyset_id,
57 secret,
58 c,
59 witness
60 ) = row
61 );
62
63 let amount: u64 = column_as_number!(amount);
64 Ok(Proof {
65 amount: Amount::from(amount),
66 keyset_id: column_as_string!(keyset_id, Id::from_str),
67 secret: column_as_string!(secret, Secret::from_str),
68 c: column_as_string!(c, PublicKey::from_hex, PublicKey::from_slice),
69 witness: column_as_nullable_string!(witness).and_then(|w| serde_json::from_str(&w).ok()),
70 dleq: None,
71 })
72}
73
74pub(super) fn sql_row_to_proof_with_state(row: Vec<Column>) -> Result<(Proof, State), Error> {
75 unpack_into!(
76 let (
77 keyset_id, amount, secret, c, witness, state
78 ) = row
79 );
80
81 let amount: u64 = column_as_number!(amount);
82 let state = column_as_nullable_string!(state)
83 .and_then(|s| State::from_str(&s).ok())
84 .unwrap_or(State::Pending);
85
86 Ok((
87 Proof {
88 amount: Amount::from(amount),
89 keyset_id: column_as_string!(keyset_id, Id::from_str, Id::from_bytes),
90 secret: column_as_string!(secret, Secret::from_str),
91 c: column_as_string!(c, PublicKey::from_hex, PublicKey::from_slice),
92 witness: column_as_nullable_string!(witness)
93 .and_then(|w| serde_json::from_str(&w).ok()),
94 dleq: None,
95 },
96 state,
97 ))
98}
99
100pub(super) fn sql_row_to_hashmap_amount(row: Vec<Column>) -> Result<(Id, Amount), Error> {
101 unpack_into!(
102 let (
103 keyset_id, amount
104 ) = row
105 );
106
107 let amount: u64 = column_as_number!(amount);
108 Ok((
109 column_as_string!(keyset_id, Id::from_str, Id::from_bytes),
110 Amount::from(amount),
111 ))
112}
113
114#[async_trait]
115impl<RM> database::MintProofsTransaction for SQLTransaction<RM>
116where
117 RM: DatabasePool + 'static,
118{
119 type Err = Error;
120
121 async fn add_proofs(
131 &mut self,
132 proofs: Proofs,
133 quote_id: Option<QuoteId>,
134 operation: &Operation,
135 ) -> Result<Acquired<ProofsWithState>, Self::Err> {
136 let current_time = unix_time();
137
138 match query(r#"SELECT state FROM proof WHERE y IN (:ys) LIMIT 1 FOR UPDATE"#)?
141 .bind_vec(
142 "ys",
143 proofs
144 .iter()
145 .map(|y| y.y().map(|y| y.to_bytes().to_vec()))
146 .collect::<Result<_, _>>()?,
147 )
148 .pluck(&self.inner)
149 .await?
150 .map(|state| Ok::<_, Error>(column_as_string!(&state, State::from_str)))
151 .transpose()?
152 {
153 Some(State::Spent) => Err(database::Error::AttemptUpdateSpentProof),
154 Some(_) => Err(database::Error::Duplicate),
155 None => Ok(()), }?;
157
158 for proof in &proofs {
159 let y = proof.y()?;
160
161 query(
162 r#"
163 INSERT INTO proof
164 (y, amount, keyset_id, secret, c, witness, state, quote_id, created_time, operation_kind, operation_id)
165 VALUES
166 (:y, :amount, :keyset_id, :secret, :c, :witness, :state, :quote_id, :created_time, :operation_kind, :operation_id)
167 "#,
168 )?
169 .bind("y", y.to_bytes().to_vec())
170 .bind("amount", proof.amount.to_i64())
171 .bind("keyset_id", proof.keyset_id.to_string())
172 .bind("secret", proof.secret.to_string())
173 .bind("c", proof.c.to_bytes().to_vec())
174 .bind(
175 "witness",
176 proof.witness.clone().and_then(|w| serde_json::to_string(&w).inspect_err(|e| tracing::error!("Failed to serialize witness: {:?}", e)).ok()),
177 )
178 .bind("state", "UNSPENT".to_string())
179 .bind("quote_id", quote_id.clone().map(|q| q.to_string()))
180 .bind("created_time", current_time as i64)
181 .bind("operation_kind", operation.kind().to_string())
182 .bind("operation_id", operation.id().to_string())
183 .execute(&self.inner)
184 .await?;
185 }
186
187 Ok(ProofsWithState::new(proofs, State::Unspent).into())
188 }
189
190 async fn update_proofs_state(
203 &mut self,
204 proofs: &mut Acquired<ProofsWithState>,
205 new_state: State,
206 ) -> Result<(), Self::Err> {
207 let ys = proofs.ys()?;
208
209 query(r#"UPDATE proof SET state = :new_state WHERE y IN (:ys)"#)?
210 .bind("new_state", new_state.to_string())
211 .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
212 .execute(&self.inner)
213 .await?;
214
215 if new_state == State::Spent {
216 query(
217 r#"
218 INSERT INTO keyset_amounts (keyset_id, total_issued, total_redeemed)
219 SELECT keyset_id, 0, COALESCE(SUM(amount), 0)
220 FROM proof
221 WHERE y IN (:ys)
222 GROUP BY keyset_id
223 ON CONFLICT (keyset_id)
224 DO UPDATE SET total_redeemed = keyset_amounts.total_redeemed + EXCLUDED.total_redeemed
225 "#,
226 )?
227 .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
228 .execute(&self.inner)
229 .await?;
230 }
231
232 proofs.state = new_state;
233
234 Ok(())
235 }
236
237 async fn remove_proofs(
238 &mut self,
239 ys: &[PublicKey],
240 _quote_id: Option<QuoteId>,
241 ) -> Result<(), Self::Err> {
242 if ys.is_empty() {
243 return Ok(());
244 }
245 let total_deleted = query(
246 r#"
247 DELETE FROM proof WHERE y IN (:ys) AND state NOT IN (:exclude_state)
248 "#,
249 )?
250 .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
251 .bind_vec("exclude_state", vec![State::Spent.to_string()])
252 .execute(&self.inner)
253 .await?;
254
255 if total_deleted != ys.len() {
256 let current_states = get_current_states(&self.inner, ys, true).await?;
258
259 let missing_count = ys.len() - current_states.len();
260 let spent_count = current_states
261 .values()
262 .filter(|s| **s == State::Spent)
263 .count();
264
265 if missing_count > 0 {
266 tracing::warn!(
267 "remove_proofs: {} of {} proofs do not exist in database (already removed?)",
268 missing_count,
269 ys.len()
270 );
271 }
272
273 if spent_count > 0 {
274 tracing::warn!(
275 "remove_proofs: {} of {} proofs are in Spent state and cannot be removed",
276 spent_count,
277 ys.len()
278 );
279 }
280
281 tracing::debug!(
282 "remove_proofs details: requested={}, deleted={}, missing={}, spent={}",
283 ys.len(),
284 total_deleted,
285 missing_count,
286 spent_count
287 );
288
289 return Err(Self::Err::AttemptRemoveSpentProof);
290 }
291
292 Ok(())
293 }
294
295 async fn get_proof_ys_by_quote_id(
296 &mut self,
297 quote_id: &QuoteId,
298 ) -> Result<Vec<PublicKey>, Self::Err> {
299 Ok(query(
300 r#"
301 SELECT
302 amount,
303 keyset_id,
304 secret,
305 c,
306 witness
307 FROM
308 proof
309 WHERE
310 quote_id = :quote_id
311 FOR UPDATE
312 "#,
313 )?
314 .bind("quote_id", quote_id.to_string())
315 .fetch_all(&self.inner)
316 .await?
317 .into_iter()
318 .map(sql_row_to_proof)
319 .collect::<Result<Vec<Proof>, _>>()?
320 .ys()?)
321 }
322
323 async fn get_proof_ys_by_operation_id(
324 &mut self,
325 operation_id: &uuid::Uuid,
326 ) -> Result<Vec<PublicKey>, Self::Err> {
327 Ok(query(
328 r#"
329 SELECT
330 y
331 FROM
332 proof
333 WHERE
334 operation_id = :operation_id
335 "#,
336 )?
337 .bind("operation_id", operation_id.to_string())
338 .fetch_all(&self.inner)
339 .await?
340 .into_iter()
341 .map(|row| -> Result<PublicKey, Error> {
342 Ok(column_as_string!(
343 &row[0],
344 PublicKey::from_hex,
345 PublicKey::from_slice
346 ))
347 })
348 .collect::<Result<Vec<_>, _>>()?)
349 }
350
351 async fn get_proofs(
352 &mut self,
353 ys: &[PublicKey],
354 ) -> Result<Acquired<ProofsWithState>, Self::Err> {
355 if ys.is_empty() {
356 return Err(database::Error::ProofNotFound);
357 }
358
359 let rows = query(
360 r#"
361 SELECT
362 keyset_id,
363 amount,
364 secret,
365 c,
366 witness,
367 state
368 FROM
369 proof
370 WHERE
371 y IN (:ys)
372 FOR UPDATE
373 "#,
374 )?
375 .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
376 .fetch_all(&self.inner)
377 .await?;
378
379 if rows.is_empty() || rows.len() != ys.len() {
380 return Err(database::Error::ProofNotFound);
381 }
382
383 let results: Vec<(Proof, State)> = rows
384 .into_iter()
385 .map(sql_row_to_proof_with_state)
386 .collect::<Result<Vec<_>, _>>()?;
387
388 let mut proofs = Vec::with_capacity(results.len());
389 let mut first_state: Option<State> = None;
390
391 for (proof, state) in results {
392 if let Some(first) = first_state {
393 if first != state {
394 return Err(database::Error::Internal(
395 "Proofs have inconsistent states".to_string(),
396 ));
397 }
398 } else {
399 first_state = Some(state);
400 }
401
402 proofs.push(proof);
403 }
404
405 let state = first_state.unwrap_or(State::Unspent);
406 Ok(ProofsWithState::new(proofs, state).into())
407 }
408}
409
410#[async_trait]
411impl<RM> MintProofsDatabase for SQLMintDatabase<RM>
412where
413 RM: DatabasePool + 'static,
414{
415 type Err = Error;
416
417 async fn get_proofs_by_ys(&self, ys: &[PublicKey]) -> Result<Vec<Option<Proof>>, Self::Err> {
418 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
419 let mut proofs = query(
420 r#"
421 SELECT
422 amount,
423 keyset_id,
424 secret,
425 c,
426 witness,
427 y
428 FROM
429 proof
430 WHERE
431 y IN (:ys)
432 "#,
433 )?
434 .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
435 .fetch_all(&*conn)
436 .await?
437 .into_iter()
438 .map(|mut row| {
439 Ok((
440 column_as_string!(
441 row.pop().ok_or(Error::InvalidDbResponse)?,
442 PublicKey::from_hex,
443 PublicKey::from_slice
444 ),
445 sql_row_to_proof(row)?,
446 ))
447 })
448 .collect::<Result<HashMap<_, _>, Error>>()?;
449
450 Ok(ys.iter().map(|y| proofs.remove(y)).collect())
451 }
452
453 async fn get_proof_ys_by_quote_id(
454 &self,
455 quote_id: &QuoteId,
456 ) -> Result<Vec<PublicKey>, Self::Err> {
457 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
458 Ok(query(
459 r#"
460 SELECT
461 amount,
462 keyset_id,
463 secret,
464 c,
465 witness
466 FROM
467 proof
468 WHERE
469 quote_id = :quote_id
470 "#,
471 )?
472 .bind("quote_id", quote_id.to_string())
473 .fetch_all(&*conn)
474 .await?
475 .into_iter()
476 .map(sql_row_to_proof)
477 .collect::<Result<Vec<Proof>, _>>()?
478 .ys()?)
479 }
480
481 async fn get_proofs_states(&self, ys: &[PublicKey]) -> Result<Vec<Option<State>>, Self::Err> {
482 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
483 let mut current_states = get_current_states(&*conn, ys, false).await?;
484
485 Ok(ys.iter().map(|y| current_states.remove(y)).collect())
486 }
487
488 async fn get_proofs_by_keyset_id(
489 &self,
490 keyset_id: &Id,
491 ) -> Result<(Proofs, Vec<Option<State>>), Self::Err> {
492 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
493
494 let (proofs, states): (Vec<Proof>, Vec<State>) = query(
495 r#"
496 SELECT
497 keyset_id,
498 amount,
499 secret,
500 c,
501 witness,
502 state
503 FROM
504 proof
505 WHERE
506 keyset_id=:keyset_id
507 "#,
508 )?
509 .bind("keyset_id", keyset_id.to_string())
510 .fetch_all(&*conn)
511 .await?
512 .into_iter()
513 .map(sql_row_to_proof_with_state)
514 .collect::<Result<Vec<_>, _>>()?
515 .into_iter()
516 .unzip();
517
518 Ok((proofs, states.into_iter().map(Some).collect()))
519 }
520
521 async fn get_total_redeemed(&self) -> Result<HashMap<Id, Amount>, Self::Err> {
523 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
524 query(
525 r#"
526 SELECT
527 keyset_id,
528 total_redeemed as amount
529 FROM
530 keyset_amounts
531 "#,
532 )?
533 .fetch_all(&*conn)
534 .await?
535 .into_iter()
536 .map(sql_row_to_hashmap_amount)
537 .collect()
538 }
539
540 async fn get_proof_ys_by_operation_id(
541 &self,
542 operation_id: &uuid::Uuid,
543 ) -> Result<Vec<PublicKey>, Self::Err> {
544 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
545 query(
546 r#"
547 SELECT
548 y
549 FROM
550 proof
551 WHERE
552 operation_id = :operation_id
553 "#,
554 )?
555 .bind("operation_id", operation_id.to_string())
556 .fetch_all(&*conn)
557 .await?
558 .into_iter()
559 .map(|row| -> Result<PublicKey, Error> {
560 Ok(column_as_string!(
561 &row[0],
562 PublicKey::from_hex,
563 PublicKey::from_slice
564 ))
565 })
566 .collect::<Result<Vec<_>, _>>()
567 }
568}