1use std::cmp::Ordering;
4use std::collections::HashMap;
5use std::path::Path;
6use std::str::FromStr;
7use std::sync::Arc;
8
9use async_trait::async_trait;
10use cdk_common::database::{validate_kvstore_params, WalletDatabase};
11use cdk_common::mint_url::MintUrl;
12use cdk_common::nut00::KnownMethod;
13use cdk_common::util::unix_time;
14use cdk_common::wallet::{
15 self, MintQuote, ProofInfo, Transaction, TransactionDirection, TransactionId,
16};
17use cdk_common::{
18 database, Amount, CurrencyUnit, Id, KeySet, KeySetInfo, Keys, MintInfo, PaymentMethod,
19 PublicKey, SpendingConditions, State,
20};
21use redb::{Database, MultimapTableDefinition, ReadableTable, TableDefinition};
22use tracing::instrument;
23
24use super::error::Error;
25use crate::migrations::migrate_00_to_01;
26use crate::wallet::migrations::{migrate_01_to_02, migrate_02_to_03, migrate_03_to_04};
27
28mod migrations;
29
30const MINTS_TABLE: TableDefinition<&str, &str> = TableDefinition::new("mints_table");
32const MINT_KEYSETS_TABLE: MultimapTableDefinition<&str, &[u8]> =
34 MultimapTableDefinition::new("mint_keysets");
35const KEYSETS_TABLE: TableDefinition<&[u8], &str> = TableDefinition::new("keysets");
37const MINT_QUOTES_TABLE: TableDefinition<&str, &str> = TableDefinition::new("mint_quotes");
39const MELT_QUOTES_TABLE: TableDefinition<&str, &str> = TableDefinition::new("melt_quotes");
41const MINT_KEYS_TABLE: TableDefinition<&str, &str> = TableDefinition::new("mint_keys");
42const PROOFS_TABLE: TableDefinition<&[u8], &str> = TableDefinition::new("proofs");
44const CONFIG_TABLE: TableDefinition<&str, &str> = TableDefinition::new("config");
45const KEYSET_COUNTER: TableDefinition<&str, u32> = TableDefinition::new("keyset_counter");
46const TRANSACTIONS_TABLE: TableDefinition<&[u8], &str> = TableDefinition::new("transactions");
48const SAGAS_TABLE: TableDefinition<&str, &str> = TableDefinition::new("wallet_sagas");
50
51const KEYSET_U32_MAPPING: TableDefinition<u32, &str> = TableDefinition::new("keyset_u32_mapping");
52const KV_STORE_TABLE: TableDefinition<(&str, &str, &str), &[u8]> = TableDefinition::new("kv_store");
54
55const DATABASE_VERSION: u32 = 4;
56
57#[derive(Debug, Clone)]
59pub struct WalletRedbDatabase {
60 db: Arc<Database>,
61}
62
63impl WalletRedbDatabase {
64 pub fn new(path: &Path) -> Result<Self, Error> {
66 {
67 if let Some(parent) = path.parent() {
69 if !parent.exists() {
70 return Err(Error::Io(std::io::Error::new(
71 std::io::ErrorKind::NotFound,
72 format!("Parent directory does not exist: {parent:?}"),
73 )));
74 }
75 }
76
77 let db = Arc::new(Database::create(path)?);
78
79 let db_version: Option<String>;
80 {
81 let read_txn = db.begin_read()?;
83 let table = read_txn.open_table(CONFIG_TABLE);
84
85 db_version = match table {
86 Ok(table) => table.get("db_version")?.map(|v| v.value().to_string()),
87 Err(_) => None,
88 };
89 }
90
91 match db_version {
92 Some(db_version) => {
93 let mut current_file_version = u32::from_str(&db_version)?;
94 tracing::info!("Current file version {}", current_file_version);
95
96 match current_file_version.cmp(&DATABASE_VERSION) {
97 Ordering::Less => {
98 tracing::info!(
99 "Database needs to be upgraded at {} current is {}",
100 current_file_version,
101 DATABASE_VERSION
102 );
103 if current_file_version == 0 {
104 current_file_version = migrate_00_to_01(Arc::clone(&db))?;
105 }
106
107 if current_file_version == 1 {
108 current_file_version = migrate_01_to_02(Arc::clone(&db))?;
109 }
110
111 if current_file_version == 2 {
112 current_file_version = migrate_02_to_03(Arc::clone(&db))?;
113 }
114
115 if current_file_version == 3 {
116 current_file_version = migrate_03_to_04(Arc::clone(&db))?;
117 }
118
119 if current_file_version != DATABASE_VERSION {
120 tracing::warn!(
121 "Database upgrade did not complete at {} current is {}",
122 current_file_version,
123 DATABASE_VERSION
124 );
125 return Err(Error::UnknownDatabaseVersion);
126 }
127
128 let write_txn = db.begin_write()?;
129 {
130 let mut table = write_txn.open_table(CONFIG_TABLE)?;
131
132 table
133 .insert("db_version", DATABASE_VERSION.to_string().as_str())?;
134 }
135
136 write_txn.commit()?;
137 }
138 Ordering::Equal => {
139 tracing::info!("Database is at current version {}", DATABASE_VERSION);
140 }
141 Ordering::Greater => {
142 tracing::warn!(
143 "Database upgrade did not complete at {} current is {}",
144 current_file_version,
145 DATABASE_VERSION
146 );
147 return Err(Error::UnknownDatabaseVersion);
148 }
149 }
150 }
151 None => {
152 let write_txn = db.begin_write()?;
153 {
154 let mut table = write_txn.open_table(CONFIG_TABLE)?;
155 let _ = write_txn.open_table(MINTS_TABLE)?;
157 let _ = write_txn.open_multimap_table(MINT_KEYSETS_TABLE)?;
158 let _ = write_txn.open_table(KEYSETS_TABLE)?;
159 let _ = write_txn.open_table(MINT_QUOTES_TABLE)?;
160 let _ = write_txn.open_table(MELT_QUOTES_TABLE)?;
161 let _ = write_txn.open_table(MINT_KEYS_TABLE)?;
162 let _ = write_txn.open_table(PROOFS_TABLE)?;
163 let _ = write_txn.open_table(KEYSET_COUNTER)?;
164 let _ = write_txn.open_table(TRANSACTIONS_TABLE)?;
165 let _ = write_txn.open_table(KEYSET_U32_MAPPING)?;
166 let _ = write_txn.open_table(KV_STORE_TABLE)?;
167 table.insert("db_version", DATABASE_VERSION.to_string().as_str())?;
168 }
169
170 write_txn.commit()?;
171 }
172 }
173 drop(db);
174 }
175
176 if let Some(parent) = path.parent() {
178 if !parent.exists() {
179 return Err(Error::Io(std::io::Error::new(
180 std::io::ErrorKind::NotFound,
181 format!("Parent directory does not exist: {parent:?}"),
182 )));
183 }
184 }
185
186 let mut db = Database::create(path)?;
187
188 db.upgrade()?;
189
190 Ok(Self { db: Arc::new(db) })
191 }
192}
193
194#[async_trait]
195impl WalletDatabase<database::Error> for WalletRedbDatabase {
196 #[instrument(skip(self))]
197 async fn get_mint(&self, mint_url: MintUrl) -> Result<Option<MintInfo>, database::Error> {
198 let read_txn = self.db.begin_read().map_err(Into::<Error>::into)?;
199 let table = read_txn.open_table(MINTS_TABLE).map_err(Error::from)?;
200
201 if let Some(mint_info) = table
202 .get(mint_url.to_string().as_str())
203 .map_err(Error::from)?
204 {
205 return Ok(serde_json::from_str(mint_info.value()).map_err(Error::from)?);
206 }
207
208 Ok(None)
209 }
210
211 #[instrument(skip(self))]
212 async fn get_mints(&self) -> Result<HashMap<MintUrl, Option<MintInfo>>, database::Error> {
213 let read_txn = self.db.begin_read().map_err(Error::from)?;
214 let table = read_txn.open_table(MINTS_TABLE).map_err(Error::from)?;
215 let mints = table
216 .iter()
217 .map_err(Error::from)?
218 .flatten()
219 .filter_map(|(mint, mint_info)| {
220 MintUrl::from_str(mint.value())
221 .ok()
222 .map(|url| (url, serde_json::from_str(mint_info.value()).ok()))
223 })
224 .collect();
225
226 Ok(mints)
227 }
228
229 #[instrument(skip(self))]
230 async fn get_mint_keysets(
231 &self,
232 mint_url: MintUrl,
233 ) -> Result<Option<Vec<KeySetInfo>>, database::Error> {
234 let read_txn = self.db.begin_read().map_err(Into::<Error>::into)?;
235 let table = read_txn
236 .open_multimap_table(MINT_KEYSETS_TABLE)
237 .map_err(Error::from)?;
238
239 let keyset_ids = table
240 .get(mint_url.to_string().as_str())
241 .map_err(Error::from)?
242 .flatten()
243 .map(|k| Id::from_bytes(k.value()))
244 .collect::<Result<Vec<_>, _>>()?;
245
246 let mut keysets = vec![];
247
248 let keysets_t = read_txn.open_table(KEYSETS_TABLE).map_err(Error::from)?;
249
250 for keyset_id in keyset_ids {
251 if let Some(keyset) = keysets_t
252 .get(keyset_id.to_bytes().as_slice())
253 .map_err(Error::from)?
254 {
255 let keyset = serde_json::from_str(keyset.value()).map_err(Error::from)?;
256
257 keysets.push(keyset);
258 }
259 }
260
261 match keysets.is_empty() {
262 true => Ok(None),
263 false => Ok(Some(keysets)),
264 }
265 }
266
267 #[instrument(skip(self), fields(keyset_id = %keyset_id))]
268 async fn get_keyset_by_id(
269 &self,
270 keyset_id: &Id,
271 ) -> Result<Option<KeySetInfo>, database::Error> {
272 let read_txn = self.db.begin_read().map_err(Into::<Error>::into)?;
273 let table = read_txn.open_table(KEYSETS_TABLE).map_err(Error::from)?;
274
275 match table
276 .get(keyset_id.to_bytes().as_slice())
277 .map_err(Error::from)?
278 {
279 Some(keyset) => {
280 let keyset: KeySetInfo =
281 serde_json::from_str(keyset.value()).map_err(Error::from)?;
282
283 Ok(Some(keyset))
284 }
285 None => Ok(None),
286 }
287 }
288
289 #[instrument(skip_all)]
290 async fn get_mint_quote(&self, quote_id: &str) -> Result<Option<MintQuote>, database::Error> {
291 let read_txn = self.db.begin_read().map_err(Into::<Error>::into)?;
292 let table = read_txn
293 .open_table(MINT_QUOTES_TABLE)
294 .map_err(Error::from)?;
295
296 if let Some(mint_info) = table.get(quote_id).map_err(Error::from)? {
297 return Ok(serde_json::from_str(mint_info.value()).map_err(Error::from)?);
298 }
299
300 Ok(None)
301 }
302
303 #[instrument(skip_all)]
304 async fn get_mint_quotes(&self) -> Result<Vec<MintQuote>, database::Error> {
305 let read_txn = self.db.begin_read().map_err(Into::<Error>::into)?;
306 let table = read_txn
307 .open_table(MINT_QUOTES_TABLE)
308 .map_err(Error::from)?;
309
310 Ok(table
311 .iter()
312 .map_err(Error::from)?
313 .flatten()
314 .flat_map(|(_id, quote)| serde_json::from_str(quote.value()))
315 .collect())
316 }
317
318 async fn get_unissued_mint_quotes(&self) -> Result<Vec<MintQuote>, database::Error> {
319 let read_txn = self.db.begin_read().map_err(Into::<Error>::into)?;
320 let table = read_txn
321 .open_table(MINT_QUOTES_TABLE)
322 .map_err(Error::from)?;
323
324 Ok(table
325 .iter()
326 .map_err(Error::from)?
327 .flatten()
328 .flat_map(|(_id, quote)| serde_json::from_str::<MintQuote>(quote.value()).ok())
329 .filter(|quote| {
330 quote.amount_issued == Amount::ZERO
331 || quote.payment_method == PaymentMethod::Known(KnownMethod::Bolt12)
332 })
333 .collect())
334 }
335
336 #[instrument(skip_all)]
337 async fn get_melt_quote(
338 &self,
339 quote_id: &str,
340 ) -> Result<Option<wallet::MeltQuote>, database::Error> {
341 let read_txn = self.db.begin_read().map_err(Error::from)?;
342 let table = read_txn
343 .open_table(MELT_QUOTES_TABLE)
344 .map_err(Error::from)?;
345
346 if let Some(mint_info) = table.get(quote_id).map_err(Error::from)? {
347 return Ok(serde_json::from_str(mint_info.value()).map_err(Error::from)?);
348 }
349
350 Ok(None)
351 }
352
353 #[instrument(skip_all)]
354 async fn get_melt_quotes(&self) -> Result<Vec<wallet::MeltQuote>, database::Error> {
355 let read_txn = self.db.begin_read().map_err(Error::from)?;
356 let table = read_txn
357 .open_table(MELT_QUOTES_TABLE)
358 .map_err(Error::from)?;
359
360 Ok(table
361 .iter()
362 .map_err(Error::from)?
363 .flatten()
364 .flat_map(|(_id, quote)| serde_json::from_str(quote.value()))
365 .collect())
366 }
367
368 #[instrument(skip(self), fields(keyset_id = %keyset_id))]
369 async fn get_keys(&self, keyset_id: &Id) -> Result<Option<Keys>, database::Error> {
370 let read_txn = self.db.begin_read().map_err(Error::from)?;
371 let table = read_txn.open_table(MINT_KEYS_TABLE).map_err(Error::from)?;
372
373 if let Some(mint_info) = table
374 .get(keyset_id.to_string().as_str())
375 .map_err(Error::from)?
376 {
377 return Ok(serde_json::from_str(mint_info.value()).map_err(Error::from)?);
378 }
379
380 Ok(None)
381 }
382
383 #[instrument(skip_all)]
384 async fn get_proofs(
385 &self,
386 mint_url: Option<MintUrl>,
387 unit: Option<CurrencyUnit>,
388 state: Option<Vec<State>>,
389 spending_conditions: Option<Vec<SpendingConditions>>,
390 ) -> Result<Vec<ProofInfo>, database::Error> {
391 let read_txn = self.db.begin_read().map_err(Error::from)?;
392
393 let table = read_txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
394
395 let proofs: Vec<ProofInfo> = table
396 .iter()
397 .map_err(Error::from)?
398 .flatten()
399 .filter_map(|(_k, v)| {
400 let mut proof = None;
401
402 if let Ok(proof_info) = serde_json::from_str::<ProofInfo>(v.value()) {
403 if proof_info.matches_conditions(&mint_url, &unit, &state, &spending_conditions)
404 {
405 proof = Some(proof_info)
406 }
407 }
408
409 proof
410 })
411 .collect();
412
413 Ok(proofs)
414 }
415
416 #[instrument(skip(self, ys))]
417 async fn get_proofs_by_ys(
418 &self,
419 ys: Vec<PublicKey>,
420 ) -> Result<Vec<ProofInfo>, database::Error> {
421 if ys.is_empty() {
422 return Ok(Vec::new());
423 }
424
425 let read_txn = self.db.begin_read().map_err(Error::from)?;
426 let table = read_txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
427
428 let mut proofs = Vec::new();
429
430 for y in ys {
431 if let Some(proof) = table.get(y.to_bytes().as_slice()).map_err(Error::from)? {
432 let proof_info =
433 serde_json::from_str::<ProofInfo>(proof.value()).map_err(Error::from)?;
434 proofs.push(proof_info);
435 }
436 }
437
438 Ok(proofs)
439 }
440
441 async fn get_balance(
442 &self,
443 mint_url: Option<MintUrl>,
444 unit: Option<CurrencyUnit>,
445 state: Option<Vec<State>>,
446 ) -> Result<u64, database::Error> {
447 let proofs = self.get_proofs(mint_url, unit, state, None).await?;
450 Ok(proofs.iter().map(|p| u64::from(p.proof.amount)).sum())
451 }
452
453 #[instrument(skip(self))]
454 async fn get_transaction(
455 &self,
456 transaction_id: TransactionId,
457 ) -> Result<Option<Transaction>, database::Error> {
458 let read_txn = self.db.begin_read().map_err(Error::from)?;
459 let table = read_txn
460 .open_table(TRANSACTIONS_TABLE)
461 .map_err(Error::from)?;
462
463 if let Some(transaction) = table.get(transaction_id.as_slice()).map_err(Error::from)? {
464 return Ok(serde_json::from_str(transaction.value()).map_err(Error::from)?);
465 }
466
467 Ok(None)
468 }
469
470 #[instrument(skip(self))]
471 async fn list_transactions(
472 &self,
473 mint_url: Option<MintUrl>,
474 direction: Option<TransactionDirection>,
475 unit: Option<CurrencyUnit>,
476 ) -> Result<Vec<Transaction>, database::Error> {
477 let read_txn = self.db.begin_read().map_err(Error::from)?;
478
479 let table = read_txn
480 .open_table(TRANSACTIONS_TABLE)
481 .map_err(Error::from)?;
482
483 let transactions: Vec<Transaction> = table
484 .iter()
485 .map_err(Error::from)?
486 .flatten()
487 .filter_map(|(_k, v)| {
488 let mut transaction = None;
489
490 if let Ok(tx) = serde_json::from_str::<Transaction>(v.value()) {
491 if tx.matches_conditions(&mint_url, &direction, &unit) {
492 transaction = Some(tx)
493 }
494 }
495
496 transaction
497 })
498 .collect();
499
500 Ok(transactions)
501 }
502
503 #[instrument(skip(self, added, removed_ys))]
504 async fn update_proofs(
505 &self,
506 added: Vec<ProofInfo>,
507 removed_ys: Vec<PublicKey>,
508 ) -> Result<(), database::Error> {
509 let write_txn = self.db.begin_write().map_err(Error::from)?;
510 {
511 let mut table = write_txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
512
513 for proof_info in added.iter() {
514 table
515 .insert(
516 proof_info.y.to_bytes().as_slice(),
517 serde_json::to_string(&proof_info)
518 .map_err(Error::from)?
519 .as_str(),
520 )
521 .map_err(Error::from)?;
522 }
523
524 for y in removed_ys.iter() {
525 table.remove(y.to_bytes().as_slice()).map_err(Error::from)?;
526 }
527 }
528 write_txn.commit().map_err(Error::from)?;
529 Ok(())
530 }
531
532 async fn update_proofs_state(
533 &self,
534 ys: Vec<PublicKey>,
535 state: State,
536 ) -> Result<(), database::Error> {
537 let write_txn = self.db.begin_write().map_err(Error::from)?;
538 {
539 let mut table = write_txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
540
541 for y in ys {
542 let y_slice = y.to_bytes();
543 let proof = table
544 .get(y_slice.as_slice())
545 .map_err(Error::from)?
546 .ok_or(Error::UnknownY)?;
547
548 let mut proof_info =
549 serde_json::from_str::<ProofInfo>(proof.value()).map_err(Error::from)?;
550 drop(proof);
551
552 proof_info.state = state;
553
554 table
555 .insert(
556 y_slice.as_slice(),
557 serde_json::to_string(&proof_info)
558 .map_err(Error::from)?
559 .as_str(),
560 )
561 .map_err(Error::from)?;
562 }
563 }
564 write_txn.commit().map_err(Error::from)?;
565 Ok(())
566 }
567
568 #[instrument(skip(self))]
569 async fn add_transaction(&self, transaction: Transaction) -> Result<(), database::Error> {
570 let id = transaction.id();
571 let write_txn = self.db.begin_write().map_err(Error::from)?;
572 {
573 let mut table = write_txn
574 .open_table(TRANSACTIONS_TABLE)
575 .map_err(Error::from)?;
576 table
577 .insert(
578 id.as_slice(),
579 serde_json::to_string(&transaction)
580 .map_err(Error::from)?
581 .as_str(),
582 )
583 .map_err(Error::from)?;
584 }
585 write_txn.commit().map_err(Error::from)?;
586 Ok(())
587 }
588
589 #[instrument(skip(self))]
590 async fn update_mint_url(
591 &self,
592 old_mint_url: MintUrl,
593 new_mint_url: MintUrl,
594 ) -> Result<(), database::Error> {
595 let write_txn = self.db.begin_write().map_err(Error::from)?;
596
597 {
599 let read_table = write_txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
600 let proofs: Vec<ProofInfo> = read_table
601 .iter()
602 .map_err(Error::from)?
603 .flatten()
604 .filter_map(|(_k, v)| {
605 let proof_info = serde_json::from_str::<ProofInfo>(v.value()).ok()?;
606 if proof_info.mint_url == old_mint_url {
607 Some(proof_info)
608 } else {
609 None
610 }
611 })
612 .collect();
613 drop(read_table);
614
615 if !proofs.is_empty() {
616 let mut write_table = write_txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
617 for mut proof_info in proofs {
618 proof_info.mint_url = new_mint_url.clone();
619 write_table
620 .insert(
621 proof_info.y.to_bytes().as_slice(),
622 serde_json::to_string(&proof_info)
623 .map_err(Error::from)?
624 .as_str(),
625 )
626 .map_err(Error::from)?;
627 }
628 }
629 }
630
631 {
633 let mut table = write_txn
634 .open_table(MINT_QUOTES_TABLE)
635 .map_err(Error::from)?;
636
637 let unix_time = unix_time();
638
639 let quotes: Vec<MintQuote> = table
640 .iter()
641 .map_err(Error::from)?
642 .flatten()
643 .filter_map(|(_, quote)| {
644 let mut q: MintQuote = serde_json::from_str(quote.value()).ok()?;
645 if q.mint_url == old_mint_url && q.expiry >= unix_time {
646 q.mint_url = new_mint_url.clone();
647 Some(q)
648 } else {
649 None
650 }
651 })
652 .collect();
653
654 for quote in quotes {
655 table
656 .insert(
657 quote.id.as_str(),
658 serde_json::to_string("e).map_err(Error::from)?.as_str(),
659 )
660 .map_err(Error::from)?;
661 }
662 }
663
664 write_txn.commit().map_err(Error::from)?;
665 Ok(())
666 }
667
668 #[instrument(skip(self), fields(keyset_id = %keyset_id))]
669 async fn increment_keyset_counter(
670 &self,
671 keyset_id: &Id,
672 count: u32,
673 ) -> Result<u32, database::Error> {
674 let write_txn = self.db.begin_write().map_err(Error::from)?;
675 let new_counter = {
676 let mut table = write_txn.open_table(KEYSET_COUNTER).map_err(Error::from)?;
677 let current_counter = table
678 .get(keyset_id.to_string().as_str())
679 .map_err(Error::from)?
680 .map(|x| x.value())
681 .unwrap_or_default();
682
683 let new_counter = current_counter + count;
684
685 table
686 .insert(keyset_id.to_string().as_str(), new_counter)
687 .map_err(Error::from)?;
688
689 new_counter
690 };
691 write_txn.commit().map_err(Error::from)?;
692 Ok(new_counter)
693 }
694
695 #[instrument(skip(self))]
696 async fn add_mint(
697 &self,
698 mint_url: MintUrl,
699 mint_info: Option<MintInfo>,
700 ) -> Result<(), database::Error> {
701 let write_txn = self.db.begin_write().map_err(Error::from)?;
702 {
703 let mut table = write_txn.open_table(MINTS_TABLE).map_err(Error::from)?;
704 table
705 .insert(
706 mint_url.to_string().as_str(),
707 serde_json::to_string(&mint_info)
708 .map_err(Error::from)?
709 .as_str(),
710 )
711 .map_err(Error::from)?;
712 }
713 write_txn.commit().map_err(Error::from)?;
714 Ok(())
715 }
716
717 #[instrument(skip(self))]
718 async fn remove_mint(&self, mint_url: MintUrl) -> Result<(), database::Error> {
719 let write_txn = self.db.begin_write().map_err(Error::from)?;
720 {
721 let mut table = write_txn.open_table(MINTS_TABLE).map_err(Error::from)?;
722 table
723 .remove(mint_url.to_string().as_str())
724 .map_err(Error::from)?;
725 }
726 write_txn.commit().map_err(Error::from)?;
727 Ok(())
728 }
729
730 #[instrument(skip(self))]
731 async fn add_mint_keysets(
732 &self,
733 mint_url: MintUrl,
734 keysets: Vec<KeySetInfo>,
735 ) -> Result<(), database::Error> {
736 let write_txn = self.db.begin_write().map_err(Error::from)?;
737 {
738 let mut table = write_txn
739 .open_multimap_table(MINT_KEYSETS_TABLE)
740 .map_err(Error::from)?;
741 let mut keysets_table = write_txn.open_table(KEYSETS_TABLE).map_err(Error::from)?;
742 let mut u32_table = write_txn
743 .open_table(KEYSET_U32_MAPPING)
744 .map_err(Error::from)?;
745
746 let mut existing_u32 = false;
747
748 for keyset in keysets {
749 let existing_keyset = {
751 let existing_keyset = keysets_table
752 .get(keyset.id.to_bytes().as_slice())
753 .map_err(Error::from)?;
754
755 existing_keyset.map(|r| r.value().to_string())
756 };
757
758 let existing = u32_table
759 .insert(u32::from(keyset.id), keyset.id.to_string().as_str())
760 .map_err(Error::from)?;
761
762 match existing {
763 None => existing_u32 = false,
764 Some(id) => {
765 let id = Id::from_str(id.value())?;
766
767 if id == keyset.id {
768 existing_u32 = false;
769 } else {
770 existing_u32 = true;
771 break;
772 }
773 }
774 }
775
776 let keyset = if let Some(existing_keyset) = existing_keyset {
777 let mut existing_keyset: KeySetInfo = serde_json::from_str(&existing_keyset)?;
778
779 existing_keyset.active = keyset.active;
780 existing_keyset.input_fee_ppk = keyset.input_fee_ppk;
781
782 existing_keyset
783 } else {
784 table
785 .insert(
786 mint_url.to_string().as_str(),
787 keyset.id.to_bytes().as_slice(),
788 )
789 .map_err(Error::from)?;
790
791 keyset
792 };
793
794 keysets_table
795 .insert(
796 keyset.id.to_bytes().as_slice(),
797 serde_json::to_string(&keyset)
798 .map_err(Error::from)?
799 .as_str(),
800 )
801 .map_err(Error::from)?;
802 }
803
804 if existing_u32 {
805 tracing::warn!("Keyset already exists for keyset id");
806 return Err(database::Error::Duplicate);
807 }
808 }
809 write_txn.commit().map_err(Error::from)?;
810 Ok(())
811 }
812
813 #[instrument(skip_all)]
814 async fn add_mint_quote(&self, quote: MintQuote) -> Result<(), database::Error> {
815 let write_txn = self.db.begin_write().map_err(Error::from)?;
816 {
817 let mut table = write_txn
818 .open_table(MINT_QUOTES_TABLE)
819 .map_err(Error::from)?;
820
821 let existing_quote_json = table
823 .get(quote.id.as_str())
824 .map_err(Error::from)?
825 .map(|v| v.value().to_string());
826
827 let mut quote_to_save = quote.clone();
828
829 if let Some(json) = existing_quote_json {
830 let existing_quote: MintQuote = serde_json::from_str(&json).map_err(Error::from)?;
831
832 if existing_quote.version != quote.version {
833 return Err(database::Error::ConcurrentUpdate);
834 }
835
836 quote_to_save.version = quote.version.wrapping_add(1);
838 }
839
840 table
841 .insert(
842 quote_to_save.id.as_str(),
843 serde_json::to_string("e_to_save)
844 .map_err(Error::from)?
845 .as_str(),
846 )
847 .map_err(Error::from)?;
848 }
849 write_txn.commit().map_err(Error::from)?;
850 Ok(())
851 }
852
853 #[instrument(skip_all)]
854 async fn remove_mint_quote(&self, quote_id: &str) -> Result<(), database::Error> {
855 let write_txn = self.db.begin_write().map_err(Error::from)?;
856 {
857 let mut table = write_txn
858 .open_table(MINT_QUOTES_TABLE)
859 .map_err(Error::from)?;
860 table.remove(quote_id).map_err(Error::from)?;
861 }
862 write_txn.commit().map_err(Error::from)?;
863 Ok(())
864 }
865
866 #[instrument(skip_all)]
867 async fn add_melt_quote(&self, quote: wallet::MeltQuote) -> Result<(), database::Error> {
868 let write_txn = self.db.begin_write().map_err(Error::from)?;
869 {
870 let mut table = write_txn
871 .open_table(MELT_QUOTES_TABLE)
872 .map_err(Error::from)?;
873
874 let existing_quote_json = table
876 .get(quote.id.as_str())
877 .map_err(Error::from)?
878 .map(|v| v.value().to_string());
879
880 let mut quote_to_save = quote.clone();
881
882 if let Some(json) = existing_quote_json {
883 let existing_quote: wallet::MeltQuote =
884 serde_json::from_str(&json).map_err(Error::from)?;
885
886 if existing_quote.version != quote.version {
887 return Err(database::Error::ConcurrentUpdate);
888 }
889
890 quote_to_save.version = quote.version.wrapping_add(1);
892 }
893
894 table
895 .insert(
896 quote_to_save.id.as_str(),
897 serde_json::to_string("e_to_save)
898 .map_err(Error::from)?
899 .as_str(),
900 )
901 .map_err(Error::from)?;
902 }
903 write_txn.commit().map_err(Error::from)?;
904 Ok(())
905 }
906
907 #[instrument(skip_all)]
908 async fn remove_melt_quote(&self, quote_id: &str) -> Result<(), database::Error> {
909 let write_txn = self.db.begin_write().map_err(Error::from)?;
910 {
911 let mut table = write_txn
912 .open_table(MELT_QUOTES_TABLE)
913 .map_err(Error::from)?;
914 table.remove(quote_id).map_err(Error::from)?;
915 }
916 write_txn.commit().map_err(Error::from)?;
917 Ok(())
918 }
919
920 #[instrument(skip_all)]
921 async fn add_keys(&self, keyset: KeySet) -> Result<(), database::Error> {
922 let write_txn = self.db.begin_write().map_err(Error::from)?;
923
924 keyset.verify_id()?;
925
926 {
927 let mut table = write_txn.open_table(MINT_KEYS_TABLE).map_err(Error::from)?;
928
929 let existing_keys = table
930 .insert(
931 keyset.id.to_string().as_str(),
932 serde_json::to_string(&keyset.keys)
933 .map_err(Error::from)?
934 .as_str(),
935 )
936 .map_err(Error::from)?
937 .is_some();
938
939 let mut table = write_txn
940 .open_table(KEYSET_U32_MAPPING)
941 .map_err(Error::from)?;
942
943 let existing = table
944 .insert(u32::from(keyset.id), keyset.id.to_string().as_str())
945 .map_err(Error::from)?;
946
947 let existing_u32 = match existing {
948 None => false,
949 Some(id) => {
950 let id = Id::from_str(id.value())?;
951 id != keyset.id
952 }
953 };
954
955 if existing_keys || existing_u32 {
956 tracing::warn!("Keys already exist for keyset id");
957 return Err(database::Error::Duplicate);
958 }
959 }
960 write_txn.commit().map_err(Error::from)?;
961 Ok(())
962 }
963
964 #[instrument(skip(self), fields(keyset_id = %keyset_id))]
965 async fn remove_keys(&self, keyset_id: &Id) -> Result<(), database::Error> {
966 let write_txn = self.db.begin_write().map_err(Error::from)?;
967 {
968 let mut table = write_txn.open_table(MINT_KEYS_TABLE).map_err(Error::from)?;
969
970 table
971 .remove(keyset_id.to_string().as_str())
972 .map_err(Error::from)?;
973 }
974 write_txn.commit().map_err(Error::from)?;
975 Ok(())
976 }
977
978 #[instrument(skip(self))]
979 async fn remove_transaction(
980 &self,
981 transaction_id: TransactionId,
982 ) -> Result<(), database::Error> {
983 let write_txn = self.db.begin_write().map_err(Error::from)?;
984 {
985 let mut table = write_txn
986 .open_table(TRANSACTIONS_TABLE)
987 .map_err(Error::from)?;
988 table
989 .remove(transaction_id.as_slice())
990 .map_err(Error::from)?;
991 }
992 write_txn.commit().map_err(Error::from)?;
993 Ok(())
994 }
995
996 #[instrument(skip(self))]
997 async fn add_saga(&self, saga: wallet::WalletSaga) -> Result<(), database::Error> {
998 let saga_json = serde_json::to_string(&saga).map_err(Error::from)?;
999 let id_str = saga.id.to_string();
1000
1001 let write_txn = self.db.begin_write().map_err(Error::from)?;
1002 {
1003 let mut table = write_txn.open_table(SAGAS_TABLE).map_err(Error::from)?;
1004 table
1005 .insert(id_str.as_str(), saga_json.as_str())
1006 .map_err(Error::from)?;
1007 }
1008 write_txn.commit().map_err(Error::from)?;
1009 Ok(())
1010 }
1011
1012 #[instrument(skip(self))]
1013 async fn get_saga(
1014 &self,
1015 id: &uuid::Uuid,
1016 ) -> Result<Option<wallet::WalletSaga>, database::Error> {
1017 let read_txn = self.db.begin_read().map_err(Error::from)?;
1018 let table = read_txn.open_table(SAGAS_TABLE).map_err(Error::from)?;
1019 let id_str = id.to_string();
1020
1021 let result = table
1022 .get(id_str.as_str())
1023 .map_err(Error::from)?
1024 .map(|saga| serde_json::from_str(saga.value()).map_err(Error::from))
1025 .transpose()?;
1026
1027 Ok(result)
1028 }
1029
1030 #[instrument(skip(self))]
1031 async fn update_saga(&self, saga: wallet::WalletSaga) -> Result<bool, database::Error> {
1032 let id_str = saga.id.to_string();
1033
1034 let expected_version = saga.version.saturating_sub(1);
1037
1038 let write_txn = self.db.begin_write().map_err(Error::from)?;
1039 let updated = {
1040 let mut table = write_txn.open_table(SAGAS_TABLE).map_err(Error::from)?;
1041
1042 let existing_saga_json = table
1044 .get(id_str.as_str())
1045 .map_err(Error::from)?
1046 .map(|v| v.value().to_string());
1047
1048 match existing_saga_json {
1049 Some(json) => {
1050 let existing_saga: wallet::WalletSaga =
1051 serde_json::from_str(&json).map_err(Error::from)?;
1052
1053 if existing_saga.version != expected_version {
1055 false
1057 } else {
1058 let saga_json = serde_json::to_string(&saga).map_err(Error::from)?;
1060 table
1061 .insert(id_str.as_str(), saga_json.as_str())
1062 .map_err(Error::from)?;
1063 true
1064 }
1065 }
1066 None => {
1067 false
1069 }
1070 }
1071 };
1072 write_txn.commit().map_err(Error::from)?;
1073 Ok(updated)
1074 }
1075
1076 #[instrument(skip(self))]
1077 async fn delete_saga(&self, id: &uuid::Uuid) -> Result<(), database::Error> {
1078 let write_txn = self.db.begin_write().map_err(Error::from)?;
1079 let id_str = id.to_string();
1080 {
1081 let mut table = write_txn.open_table(SAGAS_TABLE).map_err(Error::from)?;
1082 table.remove(id_str.as_str()).map_err(Error::from)?;
1083 }
1084 write_txn.commit().map_err(Error::from)?;
1085 Ok(())
1086 }
1087
1088 #[instrument(skip(self))]
1089 async fn get_incomplete_sagas(&self) -> Result<Vec<wallet::WalletSaga>, database::Error> {
1090 let read_txn = self.db.begin_read().map_err(Error::from)?;
1091 let table = read_txn.open_table(SAGAS_TABLE).map_err(Error::from)?;
1092
1093 let mut sagas: Vec<wallet::WalletSaga> = table
1094 .iter()
1095 .map_err(Error::from)?
1096 .flatten()
1097 .filter_map(|(_, saga_json)| {
1098 serde_json::from_str::<wallet::WalletSaga>(saga_json.value()).ok()
1099 })
1100 .collect();
1101
1102 sagas.sort_by_key(|saga| saga.created_at);
1104
1105 Ok(sagas)
1106 }
1107
1108 #[instrument(skip(self))]
1109 async fn reserve_proofs(
1110 &self,
1111 ys: Vec<PublicKey>,
1112 operation_id: &uuid::Uuid,
1113 ) -> Result<(), database::Error> {
1114 let write_txn = self.db.begin_write().map_err(Error::from)?;
1115
1116 {
1117 let mut table = write_txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
1118
1119 for y in ys {
1120 let y_bytes = y.to_bytes();
1121
1122 let proof_json_str = {
1124 let proof_json_opt = table.get(y_bytes.as_slice()).map_err(Error::from)?;
1125 proof_json_opt.map(|proof_json| proof_json.value().to_string())
1126 };
1127
1128 let Some(proof_json_str) = proof_json_str else {
1129 return Err(database::Error::ProofNotUnspent);
1130 };
1131
1132 let mut proof: ProofInfo =
1133 serde_json::from_str(&proof_json_str).map_err(Error::from)?;
1134
1135 if proof.state != State::Unspent {
1136 return Err(database::Error::ProofNotUnspent);
1137 }
1138
1139 proof.state = State::Reserved;
1140 proof.used_by_operation = Some(*operation_id);
1141
1142 let updated_json = serde_json::to_string(&proof).map_err(Error::from)?;
1143 table
1144 .insert(y_bytes.as_slice(), updated_json.as_str())
1145 .map_err(Error::from)?;
1146 }
1147 }
1148
1149 write_txn.commit().map_err(Error::from)?;
1150 Ok(())
1151 }
1152
1153 #[instrument(skip(self))]
1154 async fn release_proofs(&self, operation_id: &uuid::Uuid) -> Result<(), database::Error> {
1155 let write_txn = self.db.begin_write().map_err(Error::from)?;
1156
1157 {
1158 let mut table = write_txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
1159
1160 let all_proofs: Vec<(Vec<u8>, ProofInfo)> = table
1162 .iter()
1163 .map_err(Error::from)?
1164 .flatten()
1165 .filter_map(|(y, proof_json)| {
1166 let proof: ProofInfo = serde_json::from_str(proof_json.value()).ok()?;
1167 Some((y.value().to_vec(), proof))
1168 })
1169 .collect();
1170
1171 for (y_bytes, mut proof) in all_proofs {
1173 if proof.used_by_operation == Some(*operation_id) {
1174 proof.state = State::Unspent;
1175 proof.used_by_operation = None;
1176
1177 let updated_json = serde_json::to_string(&proof).map_err(Error::from)?;
1178 table
1179 .insert(y_bytes.as_slice(), updated_json.as_str())
1180 .map_err(Error::from)?;
1181 }
1182 }
1183 }
1184
1185 write_txn.commit().map_err(Error::from)?;
1186 Ok(())
1187 }
1188
1189 #[instrument(skip(self))]
1190 async fn get_reserved_proofs(
1191 &self,
1192 operation_id: &uuid::Uuid,
1193 ) -> Result<Vec<ProofInfo>, database::Error> {
1194 let read_txn = self.db.begin_read().map_err(Error::from)?;
1195 let table = read_txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
1196
1197 let proofs: Vec<ProofInfo> = table
1198 .iter()
1199 .map_err(Error::from)?
1200 .flatten()
1201 .filter_map(|(_, proof_json)| {
1202 serde_json::from_str::<ProofInfo>(proof_json.value()).ok()
1203 })
1204 .filter(|proof| proof.used_by_operation == Some(*operation_id))
1205 .collect();
1206
1207 Ok(proofs)
1208 }
1209
1210 #[instrument(skip(self))]
1211 async fn reserve_melt_quote(
1212 &self,
1213 quote_id: &str,
1214 operation_id: &uuid::Uuid,
1215 ) -> Result<(), database::Error> {
1216 let write_txn = self.db.begin_write().map_err(Error::from)?;
1217 let operation_id_str = operation_id.to_string();
1218
1219 {
1220 let mut table = write_txn
1221 .open_table(MELT_QUOTES_TABLE)
1222 .map_err(Error::from)?;
1223
1224 let quote_json = table
1226 .get(quote_id)
1227 .map_err(Error::from)?
1228 .map(|v| v.value().to_string());
1229
1230 match quote_json {
1231 Some(json) => {
1232 let mut quote: wallet::MeltQuote =
1233 serde_json::from_str(&json).map_err(Error::from)?;
1234
1235 if quote.used_by_operation.is_some() {
1237 return Err(database::Error::QuoteAlreadyInUse);
1238 }
1239
1240 quote.used_by_operation = Some(operation_id_str);
1242 let updated_json = serde_json::to_string("e).map_err(Error::from)?;
1243 table
1244 .insert(quote_id, updated_json.as_str())
1245 .map_err(Error::from)?;
1246 }
1247 None => {
1248 return Err(database::Error::UnknownQuote);
1249 }
1250 }
1251 }
1252
1253 write_txn.commit().map_err(Error::from)?;
1254 Ok(())
1255 }
1256
1257 #[instrument(skip(self))]
1258 async fn release_melt_quote(&self, operation_id: &uuid::Uuid) -> Result<(), database::Error> {
1259 let write_txn = self.db.begin_write().map_err(Error::from)?;
1260 let operation_id_str = operation_id.to_string();
1261
1262 {
1263 let mut table = write_txn
1264 .open_table(MELT_QUOTES_TABLE)
1265 .map_err(Error::from)?;
1266
1267 let all_quotes: Vec<(String, wallet::MeltQuote)> = table
1269 .iter()
1270 .map_err(Error::from)?
1271 .flatten()
1272 .filter_map(|(id, quote_json)| {
1273 let quote: wallet::MeltQuote = serde_json::from_str(quote_json.value()).ok()?;
1274 Some((id.value().to_string(), quote))
1275 })
1276 .collect();
1277
1278 for (quote_id, mut quote) in all_quotes {
1280 if quote.used_by_operation.as_deref() == Some(&operation_id_str) {
1281 quote.used_by_operation = None;
1282 let updated_json = serde_json::to_string("e).map_err(Error::from)?;
1283 table
1284 .insert(quote_id.as_str(), updated_json.as_str())
1285 .map_err(Error::from)?;
1286 }
1287 }
1288 }
1289
1290 write_txn.commit().map_err(Error::from)?;
1291 Ok(())
1292 }
1293
1294 #[instrument(skip(self))]
1295 async fn reserve_mint_quote(
1296 &self,
1297 quote_id: &str,
1298 operation_id: &uuid::Uuid,
1299 ) -> Result<(), database::Error> {
1300 let write_txn = self.db.begin_write().map_err(Error::from)?;
1301 let operation_id_str = operation_id.to_string();
1302
1303 {
1304 let mut table = write_txn
1305 .open_table(MINT_QUOTES_TABLE)
1306 .map_err(Error::from)?;
1307
1308 let quote_json = table
1310 .get(quote_id)
1311 .map_err(Error::from)?
1312 .map(|v| v.value().to_string());
1313
1314 match quote_json {
1315 Some(json) => {
1316 let mut quote: MintQuote = serde_json::from_str(&json).map_err(Error::from)?;
1317
1318 if quote.used_by_operation.is_some() {
1320 return Err(database::Error::QuoteAlreadyInUse);
1321 }
1322
1323 quote.used_by_operation = Some(operation_id_str);
1325 let updated_json = serde_json::to_string("e).map_err(Error::from)?;
1326 table
1327 .insert(quote_id, updated_json.as_str())
1328 .map_err(Error::from)?;
1329 }
1330 None => {
1331 return Err(database::Error::UnknownQuote);
1332 }
1333 }
1334 }
1335
1336 write_txn.commit().map_err(Error::from)?;
1337 Ok(())
1338 }
1339
1340 #[instrument(skip(self))]
1341 async fn release_mint_quote(&self, operation_id: &uuid::Uuid) -> Result<(), database::Error> {
1342 let write_txn = self.db.begin_write().map_err(Error::from)?;
1343 let operation_id_str = operation_id.to_string();
1344
1345 {
1346 let mut table = write_txn
1347 .open_table(MINT_QUOTES_TABLE)
1348 .map_err(Error::from)?;
1349
1350 let all_quotes: Vec<(String, MintQuote)> = table
1352 .iter()
1353 .map_err(Error::from)?
1354 .flatten()
1355 .filter_map(|(id, quote_json)| {
1356 let quote: MintQuote = serde_json::from_str(quote_json.value()).ok()?;
1357 Some((id.value().to_string(), quote))
1358 })
1359 .collect();
1360
1361 for (quote_id, mut quote) in all_quotes {
1363 if quote.used_by_operation.as_deref() == Some(&operation_id_str) {
1364 quote.used_by_operation = None;
1365 let updated_json = serde_json::to_string("e).map_err(Error::from)?;
1366 table
1367 .insert(quote_id.as_str(), updated_json.as_str())
1368 .map_err(Error::from)?;
1369 }
1370 }
1371 }
1372
1373 write_txn.commit().map_err(Error::from)?;
1374 Ok(())
1375 }
1376
1377 #[instrument(skip(self, value))]
1378 async fn kv_write(
1379 &self,
1380 primary_namespace: &str,
1381 secondary_namespace: &str,
1382 key: &str,
1383 value: &[u8],
1384 ) -> Result<(), database::Error> {
1385 validate_kvstore_params(primary_namespace, secondary_namespace, Some(key))?;
1387
1388 let write_txn = self.db.begin_write().map_err(Error::from)?;
1389 {
1390 let mut table = write_txn.open_table(KV_STORE_TABLE).map_err(Error::from)?;
1391 table
1392 .insert((primary_namespace, secondary_namespace, key), value)
1393 .map_err(Error::from)?;
1394 }
1395 write_txn.commit().map_err(Error::from)?;
1396
1397 Ok(())
1398 }
1399
1400 #[instrument(skip(self))]
1401 async fn kv_read(
1402 &self,
1403 primary_namespace: &str,
1404 secondary_namespace: &str,
1405 key: &str,
1406 ) -> Result<Option<Vec<u8>>, database::Error> {
1407 validate_kvstore_params(primary_namespace, secondary_namespace, Some(key))?;
1409
1410 let read_txn = self.db.begin_read().map_err(Error::from)?;
1411 let table = read_txn.open_table(KV_STORE_TABLE).map_err(Error::from)?;
1412
1413 let result = table
1414 .get((primary_namespace, secondary_namespace, key))
1415 .map_err(Error::from)?
1416 .map(|v| v.value().to_vec());
1417
1418 Ok(result)
1419 }
1420
1421 #[instrument(skip(self))]
1422 async fn kv_list(
1423 &self,
1424 primary_namespace: &str,
1425 secondary_namespace: &str,
1426 ) -> Result<Vec<String>, database::Error> {
1427 validate_kvstore_params(primary_namespace, secondary_namespace, None)?;
1429
1430 let read_txn = self.db.begin_read().map_err(Error::from)?;
1431 let table = read_txn.open_table(KV_STORE_TABLE).map_err(Error::from)?;
1432
1433 let start = (primary_namespace, secondary_namespace, "");
1434 let iter = table.range(start..).map_err(Error::from)?;
1435
1436 let mut keys = Vec::new();
1437
1438 for item in iter {
1439 let (key, _) = item.map_err(Error::from)?;
1440 let (p, s, k) = key.value();
1441 if p == primary_namespace && s == secondary_namespace {
1442 keys.push(k.to_string());
1443 } else {
1444 break;
1445 }
1446 }
1447
1448 Ok(keys)
1449 }
1450
1451 #[instrument(skip(self))]
1452 async fn kv_remove(
1453 &self,
1454 primary_namespace: &str,
1455 secondary_namespace: &str,
1456 key: &str,
1457 ) -> Result<(), database::Error> {
1458 validate_kvstore_params(primary_namespace, secondary_namespace, Some(key))?;
1460
1461 let write_txn = self.db.begin_write().map_err(Error::from)?;
1462 {
1463 let mut table = write_txn.open_table(KV_STORE_TABLE).map_err(Error::from)?;
1464 table
1465 .remove((primary_namespace, secondary_namespace, key))
1466 .map_err(Error::from)?;
1467 }
1468 write_txn.commit().map_err(Error::from)?;
1469
1470 Ok(())
1471 }
1472}
1473
1474#[cfg(test)]
1475mod test {
1476 use std::path::PathBuf;
1477
1478 use cdk_common::wallet_db_test;
1479
1480 use super::WalletRedbDatabase;
1481
1482 async fn provide_db(test_id: String) -> WalletRedbDatabase {
1483 let path = PathBuf::from(format!("/tmp/cdk-test-{}.redb", test_id));
1484 WalletRedbDatabase::new(&path).expect("database")
1485 }
1486
1487 wallet_db_test!(provide_db);
1488}