1use std::cmp::Ordering;
4use std::collections::HashMap;
5use std::path::Path;
6use std::str::FromStr;
7use std::sync::Arc;
8
9use async_trait::async_trait;
10use cdk_common::bitcoin::bip32::DerivationPath;
11use cdk_common::database::{validate_kvstore_params, WalletDatabase};
12use cdk_common::mint_url::MintUrl;
13use cdk_common::nut00::KnownMethod;
14use cdk_common::util::unix_time;
15use cdk_common::wallet::{
16 self, MintQuote, ProofInfo, Transaction, TransactionDirection, TransactionId,
17};
18use cdk_common::{
19 database, Amount, CurrencyUnit, Id, KeySet, KeySetInfo, Keys, MintInfo, PaymentMethod,
20 PublicKey, SpendingConditions, State,
21};
22use redb::{Database, MultimapTableDefinition, ReadableDatabase, ReadableTable, TableDefinition};
23use tracing::instrument;
24
25use crate::error::Error;
26use crate::migrations::migrate_00_to_01;
27use crate::wallet::migrations::{
28 migrate_01_to_02, migrate_02_to_03, migrate_03_to_04, migrate_04_to_05,
29};
30
31mod migrations;
32
33const MINTS_TABLE: TableDefinition<&str, &str> = TableDefinition::new("mints_table");
35const MINT_KEYSETS_TABLE: MultimapTableDefinition<&str, &[u8]> =
37 MultimapTableDefinition::new("mint_keysets");
38const KEYSETS_TABLE: TableDefinition<&[u8], &str> = TableDefinition::new("keysets");
40const MINT_QUOTES_TABLE: TableDefinition<&str, &str> = TableDefinition::new("mint_quotes");
42const MELT_QUOTES_TABLE: TableDefinition<&str, &str> = TableDefinition::new("melt_quotes");
44const MINT_KEYS_TABLE: TableDefinition<&str, &str> = TableDefinition::new("mint_keys");
45const PROOFS_TABLE: TableDefinition<&[u8], &str> = TableDefinition::new("proofs");
47const CONFIG_TABLE: TableDefinition<&str, &str> = TableDefinition::new("config");
48const KEYSET_COUNTER: TableDefinition<&str, u32> = TableDefinition::new("keyset_counter");
49const TRANSACTIONS_TABLE: TableDefinition<&[u8], &str> = TableDefinition::new("transactions");
51const SAGAS_TABLE: TableDefinition<&str, &str> = TableDefinition::new("wallet_sagas");
53
54const P2PK_SIGNING_KEYS_TABLE: TableDefinition<&[u8], &str> =
56 TableDefinition::new("p2pk_signing_keys");
57
58const KEYSET_U32_MAPPING: TableDefinition<u32, &str> = TableDefinition::new("keyset_u32_mapping");
59const KV_STORE_TABLE: TableDefinition<(&str, &str, &str), &[u8]> = TableDefinition::new("kv_store");
61
62const DATABASE_VERSION: u32 = 5;
63
64#[derive(Debug, Clone)]
66pub struct WalletRedbDatabase {
67 db: Arc<Database>,
68}
69
70impl WalletRedbDatabase {
71 pub fn new(path: &Path) -> Result<Self, Error> {
73 {
74 if let Some(parent) = path.parent() {
76 if !parent.exists() {
77 return Err(Error::Io(std::io::Error::new(
78 std::io::ErrorKind::NotFound,
79 format!("Parent directory does not exist: {parent:?}"),
80 )));
81 }
82 }
83
84 let db = Arc::new(Database::create(path)?);
85
86 let db_version: Option<String>;
87 {
88 let read_txn = db.begin_read()?;
90 let table = read_txn.open_table(CONFIG_TABLE);
91
92 db_version = match table {
93 Ok(table) => table.get("db_version")?.map(|v| v.value().to_string()),
94 Err(_) => None,
95 };
96 }
97
98 match db_version {
99 Some(db_version) => {
100 let mut current_file_version = u32::from_str(&db_version)?;
101 tracing::info!("Current file version {}", current_file_version);
102
103 match current_file_version.cmp(&DATABASE_VERSION) {
104 Ordering::Less => {
105 tracing::info!(
106 "Database needs to be upgraded at {} current is {}",
107 current_file_version,
108 DATABASE_VERSION
109 );
110 if current_file_version == 0 {
111 current_file_version = migrate_00_to_01(Arc::clone(&db))?;
112 }
113
114 if current_file_version == 1 {
115 current_file_version = migrate_01_to_02(Arc::clone(&db))?;
116 }
117
118 if current_file_version == 2 {
119 current_file_version = migrate_02_to_03(Arc::clone(&db))?;
120 }
121
122 if current_file_version == 3 {
123 current_file_version = migrate_03_to_04(Arc::clone(&db))?;
124 }
125
126 if current_file_version == 4 {
127 current_file_version = migrate_04_to_05(Arc::clone(&db))?;
128 }
129
130 if current_file_version != DATABASE_VERSION {
131 tracing::warn!(
132 "Database upgrade did not complete at {} current is {}",
133 current_file_version,
134 DATABASE_VERSION
135 );
136 return Err(Error::UnknownDatabaseVersion);
137 }
138
139 let write_txn = db.begin_write()?;
140 {
141 let mut table = write_txn.open_table(CONFIG_TABLE)?;
142
143 table
144 .insert("db_version", DATABASE_VERSION.to_string().as_str())?;
145 }
146
147 write_txn.commit()?;
148 }
149 Ordering::Equal => {
150 tracing::info!("Database is at current version {}", DATABASE_VERSION);
151 }
152 Ordering::Greater => {
153 tracing::warn!(
154 "Database upgrade did not complete at {} current is {}",
155 current_file_version,
156 DATABASE_VERSION
157 );
158 return Err(Error::UnknownDatabaseVersion);
159 }
160 }
161 }
162 None => {
163 let write_txn = db.begin_write()?;
164 {
165 let mut table = write_txn.open_table(CONFIG_TABLE)?;
166 let _ = write_txn.open_table(MINTS_TABLE)?;
168 let _ = write_txn.open_multimap_table(MINT_KEYSETS_TABLE)?;
169 let _ = write_txn.open_table(KEYSETS_TABLE)?;
170 let _ = write_txn.open_table(MINT_QUOTES_TABLE)?;
171 let _ = write_txn.open_table(MELT_QUOTES_TABLE)?;
172 let _ = write_txn.open_table(MINT_KEYS_TABLE)?;
173 let _ = write_txn.open_table(PROOFS_TABLE)?;
174 let _ = write_txn.open_table(KEYSET_COUNTER)?;
175 let _ = write_txn.open_table(TRANSACTIONS_TABLE)?;
176 let _ = write_txn.open_table(KEYSET_U32_MAPPING)?;
177 let _ = write_txn.open_table(KV_STORE_TABLE)?;
178 let _ = write_txn.open_table(P2PK_SIGNING_KEYS_TABLE)?;
179 table.insert("db_version", DATABASE_VERSION.to_string().as_str())?;
180 }
181
182 write_txn.commit()?;
183 }
184 }
185 drop(db);
186 }
187
188 if let Some(parent) = path.parent() {
190 if !parent.exists() {
191 return Err(Error::Io(std::io::Error::new(
192 std::io::ErrorKind::NotFound,
193 format!("Parent directory does not exist: {parent:?}"),
194 )));
195 }
196 }
197
198 let db = Database::create(path)?;
199
200 Ok(Self { db: Arc::new(db) })
201 }
202}
203
204#[async_trait]
205impl WalletDatabase<database::Error> for WalletRedbDatabase {
206 #[instrument(skip(self))]
207 async fn get_mint(&self, mint_url: MintUrl) -> Result<Option<MintInfo>, database::Error> {
208 let read_txn = self.db.begin_read().map_err(Into::<Error>::into)?;
209 let table = read_txn.open_table(MINTS_TABLE).map_err(Error::from)?;
210
211 if let Some(mint_info) = table
212 .get(mint_url.to_string().as_str())
213 .map_err(Error::from)?
214 {
215 return Ok(serde_json::from_str(mint_info.value()).map_err(Error::from)?);
216 }
217
218 Ok(None)
219 }
220
221 #[instrument(skip(self))]
222 async fn get_mints(&self) -> Result<HashMap<MintUrl, Option<MintInfo>>, database::Error> {
223 let read_txn = self.db.begin_read().map_err(Error::from)?;
224 let table = read_txn.open_table(MINTS_TABLE).map_err(Error::from)?;
225 let mints = table
226 .iter()
227 .map_err(Error::from)?
228 .flatten()
229 .filter_map(|(mint, mint_info)| {
230 MintUrl::from_str(mint.value())
231 .ok()
232 .map(|url| (url, serde_json::from_str(mint_info.value()).ok()))
233 })
234 .collect();
235
236 Ok(mints)
237 }
238
239 #[instrument(skip(self))]
240 async fn get_mint_keysets(
241 &self,
242 mint_url: MintUrl,
243 ) -> Result<Option<Vec<KeySetInfo>>, database::Error> {
244 let read_txn = self.db.begin_read().map_err(Into::<Error>::into)?;
245 let table = read_txn
246 .open_multimap_table(MINT_KEYSETS_TABLE)
247 .map_err(Error::from)?;
248
249 let keyset_ids = table
250 .get(mint_url.to_string().as_str())
251 .map_err(Error::from)?
252 .flatten()
253 .map(|k| Id::from_bytes(k.value()))
254 .collect::<Result<Vec<_>, _>>()?;
255
256 let mut keysets = vec![];
257
258 let keysets_t = read_txn.open_table(KEYSETS_TABLE).map_err(Error::from)?;
259
260 for keyset_id in keyset_ids {
261 if let Some(keyset) = keysets_t
262 .get(keyset_id.to_bytes().as_slice())
263 .map_err(Error::from)?
264 {
265 let keyset = serde_json::from_str(keyset.value()).map_err(Error::from)?;
266
267 keysets.push(keyset);
268 }
269 }
270
271 match keysets.is_empty() {
272 true => Ok(None),
273 false => Ok(Some(keysets)),
274 }
275 }
276
277 #[instrument(skip(self), fields(keyset_id = %keyset_id))]
278 async fn get_keyset_by_id(
279 &self,
280 keyset_id: &Id,
281 ) -> Result<Option<KeySetInfo>, database::Error> {
282 let read_txn = self.db.begin_read().map_err(Into::<Error>::into)?;
283 let table = read_txn.open_table(KEYSETS_TABLE).map_err(Error::from)?;
284
285 match table
286 .get(keyset_id.to_bytes().as_slice())
287 .map_err(Error::from)?
288 {
289 Some(keyset) => {
290 let keyset: KeySetInfo =
291 serde_json::from_str(keyset.value()).map_err(Error::from)?;
292
293 Ok(Some(keyset))
294 }
295 None => Ok(None),
296 }
297 }
298
299 #[instrument(skip_all)]
300 async fn get_mint_quote(&self, quote_id: &str) -> Result<Option<MintQuote>, database::Error> {
301 let read_txn = self.db.begin_read().map_err(Into::<Error>::into)?;
302 let table = read_txn
303 .open_table(MINT_QUOTES_TABLE)
304 .map_err(Error::from)?;
305
306 if let Some(mint_info) = table.get(quote_id).map_err(Error::from)? {
307 return Ok(serde_json::from_str(mint_info.value()).map_err(Error::from)?);
308 }
309
310 Ok(None)
311 }
312
313 #[instrument(skip_all)]
314 async fn get_mint_quotes(&self) -> Result<Vec<MintQuote>, database::Error> {
315 let read_txn = self.db.begin_read().map_err(Into::<Error>::into)?;
316 let table = read_txn
317 .open_table(MINT_QUOTES_TABLE)
318 .map_err(Error::from)?;
319
320 Ok(table
321 .iter()
322 .map_err(Error::from)?
323 .flatten()
324 .flat_map(|(_id, quote)| serde_json::from_str(quote.value()))
325 .collect())
326 }
327
328 async fn get_unissued_mint_quotes(&self) -> Result<Vec<MintQuote>, database::Error> {
329 let read_txn = self.db.begin_read().map_err(Into::<Error>::into)?;
330 let table = read_txn
331 .open_table(MINT_QUOTES_TABLE)
332 .map_err(Error::from)?;
333
334 Ok(table
335 .iter()
336 .map_err(Error::from)?
337 .flatten()
338 .flat_map(|(_id, quote)| serde_json::from_str::<MintQuote>(quote.value()).ok())
339 .filter(|quote| {
340 quote.amount_issued == Amount::ZERO
341 || quote.payment_method == PaymentMethod::Known(KnownMethod::Bolt12)
342 })
343 .collect())
344 }
345
346 #[instrument(skip_all)]
347 async fn get_melt_quote(
348 &self,
349 quote_id: &str,
350 ) -> Result<Option<wallet::MeltQuote>, database::Error> {
351 let read_txn = self.db.begin_read().map_err(Error::from)?;
352 let table = read_txn
353 .open_table(MELT_QUOTES_TABLE)
354 .map_err(Error::from)?;
355
356 if let Some(mint_info) = table.get(quote_id).map_err(Error::from)? {
357 return Ok(serde_json::from_str(mint_info.value()).map_err(Error::from)?);
358 }
359
360 Ok(None)
361 }
362
363 #[instrument(skip_all)]
364 async fn get_melt_quotes(&self) -> Result<Vec<wallet::MeltQuote>, database::Error> {
365 let read_txn = self.db.begin_read().map_err(Error::from)?;
366 let table = read_txn
367 .open_table(MELT_QUOTES_TABLE)
368 .map_err(Error::from)?;
369
370 Ok(table
371 .iter()
372 .map_err(Error::from)?
373 .flatten()
374 .flat_map(|(_id, quote)| serde_json::from_str(quote.value()))
375 .collect())
376 }
377
378 #[instrument(skip(self), fields(keyset_id = %keyset_id))]
379 async fn get_keys(&self, keyset_id: &Id) -> Result<Option<Keys>, database::Error> {
380 let read_txn = self.db.begin_read().map_err(Error::from)?;
381 let table = read_txn.open_table(MINT_KEYS_TABLE).map_err(Error::from)?;
382
383 if let Some(mint_info) = table
384 .get(keyset_id.to_string().as_str())
385 .map_err(Error::from)?
386 {
387 return Ok(serde_json::from_str(mint_info.value()).map_err(Error::from)?);
388 }
389
390 Ok(None)
391 }
392
393 #[instrument(skip_all)]
394 async fn get_proofs(
395 &self,
396 mint_url: Option<MintUrl>,
397 unit: Option<CurrencyUnit>,
398 state: Option<Vec<State>>,
399 spending_conditions: Option<Vec<SpendingConditions>>,
400 ) -> Result<Vec<ProofInfo>, database::Error> {
401 let read_txn = self.db.begin_read().map_err(Error::from)?;
402
403 let table = read_txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
404
405 let proofs: Vec<ProofInfo> = table
406 .iter()
407 .map_err(Error::from)?
408 .flatten()
409 .filter_map(|(_k, v)| {
410 let mut proof = None;
411
412 if let Ok(proof_info) = serde_json::from_str::<ProofInfo>(v.value()) {
413 if proof_info.matches_conditions(&mint_url, &unit, &state, &spending_conditions)
414 {
415 proof = Some(proof_info)
416 }
417 }
418
419 proof
420 })
421 .collect();
422
423 Ok(proofs)
424 }
425
426 #[instrument(skip(self, ys))]
427 async fn get_proofs_by_ys(
428 &self,
429 ys: Vec<PublicKey>,
430 ) -> Result<Vec<ProofInfo>, database::Error> {
431 if ys.is_empty() {
432 return Ok(Vec::new());
433 }
434
435 let read_txn = self.db.begin_read().map_err(Error::from)?;
436 let table = read_txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
437
438 let mut proofs = Vec::new();
439
440 for y in ys {
441 if let Some(proof) = table.get(y.to_bytes().as_slice()).map_err(Error::from)? {
442 let proof_info =
443 serde_json::from_str::<ProofInfo>(proof.value()).map_err(Error::from)?;
444 proofs.push(proof_info);
445 }
446 }
447
448 Ok(proofs)
449 }
450
451 async fn get_balance(
452 &self,
453 mint_url: Option<MintUrl>,
454 unit: Option<CurrencyUnit>,
455 state: Option<Vec<State>>,
456 ) -> Result<u64, database::Error> {
457 let proofs = self.get_proofs(mint_url, unit, state, None).await?;
460 Ok(proofs.iter().map(|p| u64::from(p.proof.amount)).sum())
461 }
462
463 #[instrument(skip(self))]
464 async fn get_transaction(
465 &self,
466 transaction_id: TransactionId,
467 ) -> Result<Option<Transaction>, database::Error> {
468 let read_txn = self.db.begin_read().map_err(Error::from)?;
469 let table = read_txn
470 .open_table(TRANSACTIONS_TABLE)
471 .map_err(Error::from)?;
472
473 if let Some(transaction) = table.get(transaction_id.as_slice()).map_err(Error::from)? {
474 return Ok(serde_json::from_str(transaction.value()).map_err(Error::from)?);
475 }
476
477 Ok(None)
478 }
479
480 #[instrument(skip(self))]
481 async fn list_transactions(
482 &self,
483 mint_url: Option<MintUrl>,
484 direction: Option<TransactionDirection>,
485 unit: Option<CurrencyUnit>,
486 ) -> Result<Vec<Transaction>, database::Error> {
487 let read_txn = self.db.begin_read().map_err(Error::from)?;
488
489 let table = read_txn
490 .open_table(TRANSACTIONS_TABLE)
491 .map_err(Error::from)?;
492
493 let transactions: Vec<Transaction> = table
494 .iter()
495 .map_err(Error::from)?
496 .flatten()
497 .filter_map(|(_k, v)| {
498 let mut transaction = None;
499
500 if let Ok(tx) = serde_json::from_str::<Transaction>(v.value()) {
501 if tx.matches_conditions(&mint_url, &direction, &unit) {
502 transaction = Some(tx)
503 }
504 }
505
506 transaction
507 })
508 .collect();
509
510 Ok(transactions)
511 }
512
513 #[instrument(skip(self, added, removed_ys))]
514 async fn update_proofs(
515 &self,
516 added: Vec<ProofInfo>,
517 removed_ys: Vec<PublicKey>,
518 ) -> Result<(), database::Error> {
519 let write_txn = self.db.begin_write().map_err(Error::from)?;
520 {
521 let mut table = write_txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
522
523 for proof_info in added.iter() {
524 table
525 .insert(
526 proof_info.y.to_bytes().as_slice(),
527 serde_json::to_string(&proof_info)
528 .map_err(Error::from)?
529 .as_str(),
530 )
531 .map_err(Error::from)?;
532 }
533
534 for y in removed_ys.iter() {
535 table.remove(y.to_bytes().as_slice()).map_err(Error::from)?;
536 }
537 }
538 write_txn.commit().map_err(Error::from)?;
539 Ok(())
540 }
541
542 async fn update_proofs_state(
543 &self,
544 ys: Vec<PublicKey>,
545 state: State,
546 ) -> Result<(), database::Error> {
547 let write_txn = self.db.begin_write().map_err(Error::from)?;
548 {
549 let mut table = write_txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
550
551 for y in ys {
552 let y_slice = y.to_bytes();
553 let proof = table
554 .get(y_slice.as_slice())
555 .map_err(Error::from)?
556 .ok_or(Error::UnknownY)?;
557
558 let mut proof_info =
559 serde_json::from_str::<ProofInfo>(proof.value()).map_err(Error::from)?;
560 drop(proof);
561
562 proof_info.state = state;
563
564 table
565 .insert(
566 y_slice.as_slice(),
567 serde_json::to_string(&proof_info)
568 .map_err(Error::from)?
569 .as_str(),
570 )
571 .map_err(Error::from)?;
572 }
573 }
574 write_txn.commit().map_err(Error::from)?;
575 Ok(())
576 }
577
578 #[instrument(skip(self))]
579 async fn add_transaction(&self, transaction: Transaction) -> Result<(), database::Error> {
580 let id = transaction.id();
581 let write_txn = self.db.begin_write().map_err(Error::from)?;
582 {
583 let mut table = write_txn
584 .open_table(TRANSACTIONS_TABLE)
585 .map_err(Error::from)?;
586 table
587 .insert(
588 id.as_slice(),
589 serde_json::to_string(&transaction)
590 .map_err(Error::from)?
591 .as_str(),
592 )
593 .map_err(Error::from)?;
594 }
595 write_txn.commit().map_err(Error::from)?;
596 Ok(())
597 }
598
599 #[instrument(skip(self))]
600 async fn update_mint_url(
601 &self,
602 old_mint_url: MintUrl,
603 new_mint_url: MintUrl,
604 ) -> Result<(), database::Error> {
605 let write_txn = self.db.begin_write().map_err(Error::from)?;
606
607 {
609 let read_table = write_txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
610 let proofs: Vec<ProofInfo> = read_table
611 .iter()
612 .map_err(Error::from)?
613 .flatten()
614 .filter_map(|(_k, v)| {
615 let proof_info = serde_json::from_str::<ProofInfo>(v.value()).ok()?;
616 if proof_info.mint_url == old_mint_url {
617 Some(proof_info)
618 } else {
619 None
620 }
621 })
622 .collect();
623 drop(read_table);
624
625 if !proofs.is_empty() {
626 let mut write_table = write_txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
627 for mut proof_info in proofs {
628 proof_info.mint_url = new_mint_url.clone();
629 write_table
630 .insert(
631 proof_info.y.to_bytes().as_slice(),
632 serde_json::to_string(&proof_info)
633 .map_err(Error::from)?
634 .as_str(),
635 )
636 .map_err(Error::from)?;
637 }
638 }
639 }
640
641 {
643 let mut table = write_txn
644 .open_table(MINT_QUOTES_TABLE)
645 .map_err(Error::from)?;
646
647 let unix_time = unix_time();
648
649 let quotes: Vec<MintQuote> = table
650 .iter()
651 .map_err(Error::from)?
652 .flatten()
653 .filter_map(|(_, quote)| {
654 let mut q: MintQuote = serde_json::from_str(quote.value()).ok()?;
655 if q.mint_url == old_mint_url && q.expiry >= unix_time {
656 q.mint_url = new_mint_url.clone();
657 Some(q)
658 } else {
659 None
660 }
661 })
662 .collect();
663
664 for quote in quotes {
665 table
666 .insert(
667 quote.id.as_str(),
668 serde_json::to_string("e).map_err(Error::from)?.as_str(),
669 )
670 .map_err(Error::from)?;
671 }
672 }
673
674 write_txn.commit().map_err(Error::from)?;
675 Ok(())
676 }
677
678 #[instrument(skip(self), fields(keyset_id = %keyset_id))]
679 async fn increment_keyset_counter(
680 &self,
681 keyset_id: &Id,
682 count: u32,
683 ) -> Result<u32, database::Error> {
684 let write_txn = self.db.begin_write().map_err(Error::from)?;
685 let new_counter = {
686 let mut table = write_txn.open_table(KEYSET_COUNTER).map_err(Error::from)?;
687 let current_counter = table
688 .get(keyset_id.to_string().as_str())
689 .map_err(Error::from)?
690 .map(|x| x.value())
691 .unwrap_or_default();
692
693 let new_counter = current_counter
694 .checked_add(count)
695 .ok_or(database::Error::AmountOverflow)?;
696
697 table
698 .insert(keyset_id.to_string().as_str(), new_counter)
699 .map_err(Error::from)?;
700
701 new_counter
702 };
703 write_txn.commit().map_err(Error::from)?;
704 Ok(new_counter)
705 }
706
707 #[instrument(skip(self))]
708 async fn add_mint(
709 &self,
710 mint_url: MintUrl,
711 mint_info: Option<MintInfo>,
712 ) -> Result<(), database::Error> {
713 let write_txn = self.db.begin_write().map_err(Error::from)?;
714 {
715 let mut table = write_txn.open_table(MINTS_TABLE).map_err(Error::from)?;
716 table
717 .insert(
718 mint_url.to_string().as_str(),
719 serde_json::to_string(&mint_info)
720 .map_err(Error::from)?
721 .as_str(),
722 )
723 .map_err(Error::from)?;
724 }
725 write_txn.commit().map_err(Error::from)?;
726 Ok(())
727 }
728
729 #[instrument(skip(self))]
730 async fn remove_mint(&self, mint_url: MintUrl) -> Result<(), database::Error> {
731 let write_txn = self.db.begin_write().map_err(Error::from)?;
732 {
733 let mut table = write_txn.open_table(MINTS_TABLE).map_err(Error::from)?;
734 table
735 .remove(mint_url.to_string().as_str())
736 .map_err(Error::from)?;
737 }
738 write_txn.commit().map_err(Error::from)?;
739 Ok(())
740 }
741
742 #[instrument(skip(self))]
743 async fn add_mint_keysets(
744 &self,
745 mint_url: MintUrl,
746 keysets: Vec<KeySetInfo>,
747 ) -> Result<(), database::Error> {
748 let write_txn = self.db.begin_write().map_err(Error::from)?;
749 {
750 let mut table = write_txn
751 .open_multimap_table(MINT_KEYSETS_TABLE)
752 .map_err(Error::from)?;
753 let mut keysets_table = write_txn.open_table(KEYSETS_TABLE).map_err(Error::from)?;
754 let mut u32_table = write_txn
755 .open_table(KEYSET_U32_MAPPING)
756 .map_err(Error::from)?;
757
758 let mut existing_u32 = false;
759
760 for keyset in keysets {
761 let existing_keyset = {
763 let existing_keyset = keysets_table
764 .get(keyset.id.to_bytes().as_slice())
765 .map_err(Error::from)?;
766
767 existing_keyset.map(|r| r.value().to_string())
768 };
769
770 let existing = u32_table
771 .insert(u32::from(keyset.id), keyset.id.to_string().as_str())
772 .map_err(Error::from)?;
773
774 match existing {
775 None => existing_u32 = false,
776 Some(id) => {
777 let id = Id::from_str(id.value())?;
778
779 if id == keyset.id {
780 existing_u32 = false;
781 } else {
782 existing_u32 = true;
783 break;
784 }
785 }
786 }
787
788 let keyset = if let Some(existing_keyset) = existing_keyset {
789 let mut existing_keyset: KeySetInfo = serde_json::from_str(&existing_keyset)?;
790
791 existing_keyset.active = keyset.active;
792 existing_keyset.input_fee_ppk = keyset.input_fee_ppk;
793
794 existing_keyset
795 } else {
796 table
797 .insert(
798 mint_url.to_string().as_str(),
799 keyset.id.to_bytes().as_slice(),
800 )
801 .map_err(Error::from)?;
802
803 keyset
804 };
805
806 keysets_table
807 .insert(
808 keyset.id.to_bytes().as_slice(),
809 serde_json::to_string(&keyset)
810 .map_err(Error::from)?
811 .as_str(),
812 )
813 .map_err(Error::from)?;
814 }
815
816 if existing_u32 {
817 tracing::warn!("Keyset already exists for keyset id");
818 return Err(database::Error::Duplicate);
819 }
820 }
821 write_txn.commit().map_err(Error::from)?;
822 Ok(())
823 }
824
825 #[instrument(skip_all)]
826 async fn add_mint_quote(&self, quote: MintQuote) -> Result<(), database::Error> {
827 let write_txn = self.db.begin_write().map_err(Error::from)?;
828 {
829 let mut table = write_txn
830 .open_table(MINT_QUOTES_TABLE)
831 .map_err(Error::from)?;
832
833 let existing_quote_json = table
835 .get(quote.id.as_str())
836 .map_err(Error::from)?
837 .map(|v| v.value().to_string());
838
839 let mut quote_to_save = quote.clone();
840
841 if let Some(json) = existing_quote_json {
842 let existing_quote: MintQuote = serde_json::from_str(&json).map_err(Error::from)?;
843
844 if existing_quote.version != quote.version {
845 return Err(database::Error::ConcurrentUpdate);
846 }
847
848 quote_to_save.version = quote.version.wrapping_add(1);
850 }
851
852 table
853 .insert(
854 quote_to_save.id.as_str(),
855 serde_json::to_string("e_to_save)
856 .map_err(Error::from)?
857 .as_str(),
858 )
859 .map_err(Error::from)?;
860 }
861 write_txn.commit().map_err(Error::from)?;
862 Ok(())
863 }
864
865 #[instrument(skip_all)]
866 async fn remove_mint_quote(&self, quote_id: &str) -> Result<(), database::Error> {
867 let write_txn = self.db.begin_write().map_err(Error::from)?;
868 {
869 let mut table = write_txn
870 .open_table(MINT_QUOTES_TABLE)
871 .map_err(Error::from)?;
872 table.remove(quote_id).map_err(Error::from)?;
873 }
874 write_txn.commit().map_err(Error::from)?;
875 Ok(())
876 }
877
878 #[instrument(skip_all)]
879 async fn add_melt_quote(&self, quote: wallet::MeltQuote) -> Result<(), database::Error> {
880 let write_txn = self.db.begin_write().map_err(Error::from)?;
881 {
882 let mut table = write_txn
883 .open_table(MELT_QUOTES_TABLE)
884 .map_err(Error::from)?;
885
886 let existing_quote_json = table
888 .get(quote.id.as_str())
889 .map_err(Error::from)?
890 .map(|v| v.value().to_string());
891
892 let mut quote_to_save = quote.clone();
893
894 if let Some(json) = existing_quote_json {
895 let existing_quote: wallet::MeltQuote =
896 serde_json::from_str(&json).map_err(Error::from)?;
897
898 if existing_quote.version != quote.version {
899 return Err(database::Error::ConcurrentUpdate);
900 }
901
902 quote_to_save.version = quote.version.wrapping_add(1);
904 }
905
906 table
907 .insert(
908 quote_to_save.id.as_str(),
909 serde_json::to_string("e_to_save)
910 .map_err(Error::from)?
911 .as_str(),
912 )
913 .map_err(Error::from)?;
914 }
915 write_txn.commit().map_err(Error::from)?;
916 Ok(())
917 }
918
919 #[instrument(skip_all)]
920 async fn remove_melt_quote(&self, quote_id: &str) -> Result<(), database::Error> {
921 let write_txn = self.db.begin_write().map_err(Error::from)?;
922 {
923 let mut table = write_txn
924 .open_table(MELT_QUOTES_TABLE)
925 .map_err(Error::from)?;
926 table.remove(quote_id).map_err(Error::from)?;
927 }
928 write_txn.commit().map_err(Error::from)?;
929 Ok(())
930 }
931
932 #[instrument(skip_all)]
933 async fn add_keys(&self, keyset: KeySet) -> Result<(), database::Error> {
934 let write_txn = self.db.begin_write().map_err(Error::from)?;
935
936 keyset.verify_id()?;
937
938 {
939 let mut table = write_txn.open_table(MINT_KEYS_TABLE).map_err(Error::from)?;
940
941 let existing_keys = table
942 .insert(
943 keyset.id.to_string().as_str(),
944 serde_json::to_string(&keyset.keys)
945 .map_err(Error::from)?
946 .as_str(),
947 )
948 .map_err(Error::from)?
949 .is_some();
950
951 let mut table = write_txn
952 .open_table(KEYSET_U32_MAPPING)
953 .map_err(Error::from)?;
954
955 let existing = table
956 .insert(u32::from(keyset.id), keyset.id.to_string().as_str())
957 .map_err(Error::from)?;
958
959 let existing_u32 = match existing {
960 None => false,
961 Some(id) => {
962 let id = Id::from_str(id.value())?;
963 id != keyset.id
964 }
965 };
966
967 if existing_keys || existing_u32 {
968 tracing::warn!("Keys already exist for keyset id");
969 return Err(database::Error::Duplicate);
970 }
971 }
972 write_txn.commit().map_err(Error::from)?;
973 Ok(())
974 }
975
976 #[instrument(skip(self), fields(keyset_id = %keyset_id))]
977 async fn remove_keys(&self, keyset_id: &Id) -> Result<(), database::Error> {
978 let write_txn = self.db.begin_write().map_err(Error::from)?;
979 {
980 let mut table = write_txn.open_table(MINT_KEYS_TABLE).map_err(Error::from)?;
981
982 table
983 .remove(keyset_id.to_string().as_str())
984 .map_err(Error::from)?;
985 }
986 write_txn.commit().map_err(Error::from)?;
987 Ok(())
988 }
989
990 #[instrument(skip(self))]
991 async fn remove_transaction(
992 &self,
993 transaction_id: TransactionId,
994 ) -> Result<(), database::Error> {
995 let write_txn = self.db.begin_write().map_err(Error::from)?;
996 {
997 let mut table = write_txn
998 .open_table(TRANSACTIONS_TABLE)
999 .map_err(Error::from)?;
1000 table
1001 .remove(transaction_id.as_slice())
1002 .map_err(Error::from)?;
1003 }
1004 write_txn.commit().map_err(Error::from)?;
1005 Ok(())
1006 }
1007
1008 #[instrument(skip(self))]
1009 async fn add_saga(&self, saga: wallet::WalletSaga) -> Result<(), database::Error> {
1010 let saga_json = serde_json::to_string(&saga).map_err(Error::from)?;
1011 let id_str = saga.id.to_string();
1012
1013 let write_txn = self.db.begin_write().map_err(Error::from)?;
1014 {
1015 let mut table = write_txn.open_table(SAGAS_TABLE).map_err(Error::from)?;
1016 table
1017 .insert(id_str.as_str(), saga_json.as_str())
1018 .map_err(Error::from)?;
1019 }
1020 write_txn.commit().map_err(Error::from)?;
1021 Ok(())
1022 }
1023
1024 #[instrument(skip(self))]
1025 async fn get_saga(
1026 &self,
1027 id: &uuid::Uuid,
1028 ) -> Result<Option<wallet::WalletSaga>, database::Error> {
1029 let read_txn = self.db.begin_read().map_err(Error::from)?;
1030 let table = read_txn.open_table(SAGAS_TABLE).map_err(Error::from)?;
1031 let id_str = id.to_string();
1032
1033 let result = table
1034 .get(id_str.as_str())
1035 .map_err(Error::from)?
1036 .map(|saga| serde_json::from_str(saga.value()).map_err(Error::from))
1037 .transpose()?;
1038
1039 Ok(result)
1040 }
1041
1042 #[instrument(skip(self))]
1043 async fn update_saga(&self, saga: wallet::WalletSaga) -> Result<bool, database::Error> {
1044 let id_str = saga.id.to_string();
1045
1046 let expected_version = saga.version.saturating_sub(1);
1049
1050 let write_txn = self.db.begin_write().map_err(Error::from)?;
1051 let updated = {
1052 let mut table = write_txn.open_table(SAGAS_TABLE).map_err(Error::from)?;
1053
1054 let existing_saga_json = table
1056 .get(id_str.as_str())
1057 .map_err(Error::from)?
1058 .map(|v| v.value().to_string());
1059
1060 match existing_saga_json {
1061 Some(json) => {
1062 let existing_saga: wallet::WalletSaga =
1063 serde_json::from_str(&json).map_err(Error::from)?;
1064
1065 if existing_saga.version != expected_version {
1067 false
1069 } else {
1070 let saga_json = serde_json::to_string(&saga).map_err(Error::from)?;
1072 table
1073 .insert(id_str.as_str(), saga_json.as_str())
1074 .map_err(Error::from)?;
1075 true
1076 }
1077 }
1078 None => {
1079 false
1081 }
1082 }
1083 };
1084 write_txn.commit().map_err(Error::from)?;
1085 Ok(updated)
1086 }
1087
1088 #[instrument(skip(self))]
1089 async fn delete_saga(&self, id: &uuid::Uuid) -> Result<(), database::Error> {
1090 let write_txn = self.db.begin_write().map_err(Error::from)?;
1091 let id_str = id.to_string();
1092 {
1093 let mut table = write_txn.open_table(SAGAS_TABLE).map_err(Error::from)?;
1094 table.remove(id_str.as_str()).map_err(Error::from)?;
1095 }
1096 write_txn.commit().map_err(Error::from)?;
1097 Ok(())
1098 }
1099
1100 #[instrument(skip(self))]
1101 async fn get_incomplete_sagas(&self) -> Result<Vec<wallet::WalletSaga>, database::Error> {
1102 let read_txn = self.db.begin_read().map_err(Error::from)?;
1103 let table = read_txn.open_table(SAGAS_TABLE).map_err(Error::from)?;
1104
1105 let mut sagas: Vec<wallet::WalletSaga> = table
1106 .iter()
1107 .map_err(Error::from)?
1108 .flatten()
1109 .filter_map(|(_, saga_json)| {
1110 serde_json::from_str::<wallet::WalletSaga>(saga_json.value()).ok()
1111 })
1112 .collect();
1113
1114 sagas.sort_by_key(|saga| saga.created_at);
1116
1117 Ok(sagas)
1118 }
1119
1120 #[instrument(skip(self))]
1121 async fn reserve_proofs(
1122 &self,
1123 ys: Vec<PublicKey>,
1124 operation_id: &uuid::Uuid,
1125 ) -> Result<(), database::Error> {
1126 let write_txn = self.db.begin_write().map_err(Error::from)?;
1127
1128 {
1129 let mut table = write_txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
1130
1131 for y in ys {
1132 let y_bytes = y.to_bytes();
1133
1134 let proof_json_str = {
1136 let proof_json_opt = table.get(y_bytes.as_slice()).map_err(Error::from)?;
1137 proof_json_opt.map(|proof_json| proof_json.value().to_string())
1138 };
1139
1140 let Some(proof_json_str) = proof_json_str else {
1141 return Err(database::Error::ProofNotUnspent);
1142 };
1143
1144 let mut proof: ProofInfo =
1145 serde_json::from_str(&proof_json_str).map_err(Error::from)?;
1146
1147 if proof.state != State::Unspent {
1148 return Err(database::Error::ProofNotUnspent);
1149 }
1150
1151 proof.state = State::Reserved;
1152 proof.used_by_operation = Some(*operation_id);
1153
1154 let updated_json = serde_json::to_string(&proof).map_err(Error::from)?;
1155 table
1156 .insert(y_bytes.as_slice(), updated_json.as_str())
1157 .map_err(Error::from)?;
1158 }
1159 }
1160
1161 write_txn.commit().map_err(Error::from)?;
1162 Ok(())
1163 }
1164
1165 #[instrument(skip(self))]
1166 async fn release_proofs(&self, operation_id: &uuid::Uuid) -> Result<(), database::Error> {
1167 let write_txn = self.db.begin_write().map_err(Error::from)?;
1168
1169 {
1170 let mut table = write_txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
1171
1172 let all_proofs: Vec<(Vec<u8>, ProofInfo)> = table
1174 .iter()
1175 .map_err(Error::from)?
1176 .flatten()
1177 .filter_map(|(y, proof_json)| {
1178 let proof: ProofInfo = serde_json::from_str(proof_json.value()).ok()?;
1179 Some((y.value().to_vec(), proof))
1180 })
1181 .collect();
1182
1183 for (y_bytes, mut proof) in all_proofs {
1185 if proof.used_by_operation == Some(*operation_id) {
1186 proof.state = State::Unspent;
1187 proof.used_by_operation = None;
1188
1189 let updated_json = serde_json::to_string(&proof).map_err(Error::from)?;
1190 table
1191 .insert(y_bytes.as_slice(), updated_json.as_str())
1192 .map_err(Error::from)?;
1193 }
1194 }
1195 }
1196
1197 write_txn.commit().map_err(Error::from)?;
1198 Ok(())
1199 }
1200
1201 #[instrument(skip(self))]
1202 async fn get_reserved_proofs(
1203 &self,
1204 operation_id: &uuid::Uuid,
1205 ) -> Result<Vec<ProofInfo>, database::Error> {
1206 let read_txn = self.db.begin_read().map_err(Error::from)?;
1207 let table = read_txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
1208
1209 let proofs: Vec<ProofInfo> = table
1210 .iter()
1211 .map_err(Error::from)?
1212 .flatten()
1213 .filter_map(|(_, proof_json)| {
1214 serde_json::from_str::<ProofInfo>(proof_json.value()).ok()
1215 })
1216 .filter(|proof| proof.used_by_operation == Some(*operation_id))
1217 .collect();
1218
1219 Ok(proofs)
1220 }
1221
1222 #[instrument(skip(self))]
1223 async fn reserve_melt_quote(
1224 &self,
1225 quote_id: &str,
1226 operation_id: &uuid::Uuid,
1227 ) -> Result<(), database::Error> {
1228 let write_txn = self.db.begin_write().map_err(Error::from)?;
1229 let operation_id_str = operation_id.to_string();
1230
1231 {
1232 let mut table = write_txn
1233 .open_table(MELT_QUOTES_TABLE)
1234 .map_err(Error::from)?;
1235
1236 let quote_json = table
1238 .get(quote_id)
1239 .map_err(Error::from)?
1240 .map(|v| v.value().to_string());
1241
1242 match quote_json {
1243 Some(json) => {
1244 let mut quote: wallet::MeltQuote =
1245 serde_json::from_str(&json).map_err(Error::from)?;
1246
1247 if quote.used_by_operation.is_some() {
1249 return Err(database::Error::QuoteAlreadyInUse);
1250 }
1251
1252 quote.used_by_operation = Some(operation_id_str);
1254 let updated_json = serde_json::to_string("e).map_err(Error::from)?;
1255 table
1256 .insert(quote_id, updated_json.as_str())
1257 .map_err(Error::from)?;
1258 }
1259 None => {
1260 return Err(database::Error::UnknownQuote);
1261 }
1262 }
1263 }
1264
1265 write_txn.commit().map_err(Error::from)?;
1266 Ok(())
1267 }
1268
1269 #[instrument(skip(self))]
1270 async fn release_melt_quote(&self, operation_id: &uuid::Uuid) -> Result<(), database::Error> {
1271 let write_txn = self.db.begin_write().map_err(Error::from)?;
1272 let operation_id_str = operation_id.to_string();
1273
1274 {
1275 let mut table = write_txn
1276 .open_table(MELT_QUOTES_TABLE)
1277 .map_err(Error::from)?;
1278
1279 let all_quotes: Vec<(String, wallet::MeltQuote)> = table
1281 .iter()
1282 .map_err(Error::from)?
1283 .flatten()
1284 .filter_map(|(id, quote_json)| {
1285 let quote: wallet::MeltQuote = serde_json::from_str(quote_json.value()).ok()?;
1286 Some((id.value().to_string(), quote))
1287 })
1288 .collect();
1289
1290 for (quote_id, mut quote) in all_quotes {
1292 if quote.used_by_operation.as_deref() == Some(&operation_id_str) {
1293 quote.used_by_operation = None;
1294 let updated_json = serde_json::to_string("e).map_err(Error::from)?;
1295 table
1296 .insert(quote_id.as_str(), updated_json.as_str())
1297 .map_err(Error::from)?;
1298 }
1299 }
1300 }
1301
1302 write_txn.commit().map_err(Error::from)?;
1303 Ok(())
1304 }
1305
1306 #[instrument(skip(self))]
1307 async fn reserve_mint_quote(
1308 &self,
1309 quote_id: &str,
1310 operation_id: &uuid::Uuid,
1311 ) -> Result<(), database::Error> {
1312 let write_txn = self.db.begin_write().map_err(Error::from)?;
1313 let operation_id_str = operation_id.to_string();
1314
1315 {
1316 let mut table = write_txn
1317 .open_table(MINT_QUOTES_TABLE)
1318 .map_err(Error::from)?;
1319
1320 let quote_json = table
1322 .get(quote_id)
1323 .map_err(Error::from)?
1324 .map(|v| v.value().to_string());
1325
1326 match quote_json {
1327 Some(json) => {
1328 let mut quote: MintQuote = serde_json::from_str(&json).map_err(Error::from)?;
1329
1330 if quote.used_by_operation.is_some() {
1332 return Err(database::Error::QuoteAlreadyInUse);
1333 }
1334
1335 quote.used_by_operation = Some(operation_id_str);
1337 let updated_json = serde_json::to_string("e).map_err(Error::from)?;
1338 table
1339 .insert(quote_id, updated_json.as_str())
1340 .map_err(Error::from)?;
1341 }
1342 None => {
1343 return Err(database::Error::UnknownQuote);
1344 }
1345 }
1346 }
1347
1348 write_txn.commit().map_err(Error::from)?;
1349 Ok(())
1350 }
1351
1352 #[instrument(skip(self))]
1353 async fn release_mint_quote(&self, operation_id: &uuid::Uuid) -> Result<(), database::Error> {
1354 let write_txn = self.db.begin_write().map_err(Error::from)?;
1355 let operation_id_str = operation_id.to_string();
1356
1357 {
1358 let mut table = write_txn
1359 .open_table(MINT_QUOTES_TABLE)
1360 .map_err(Error::from)?;
1361
1362 let all_quotes: Vec<(String, MintQuote)> = table
1364 .iter()
1365 .map_err(Error::from)?
1366 .flatten()
1367 .filter_map(|(id, quote_json)| {
1368 let quote: MintQuote = serde_json::from_str(quote_json.value()).ok()?;
1369 Some((id.value().to_string(), quote))
1370 })
1371 .collect();
1372
1373 for (quote_id, mut quote) in all_quotes {
1375 if quote.used_by_operation.as_deref() == Some(&operation_id_str) {
1376 quote.used_by_operation = None;
1377 let updated_json = serde_json::to_string("e).map_err(Error::from)?;
1378 table
1379 .insert(quote_id.as_str(), updated_json.as_str())
1380 .map_err(Error::from)?;
1381 }
1382 }
1383 }
1384
1385 write_txn.commit().map_err(Error::from)?;
1386 Ok(())
1387 }
1388
1389 #[instrument(skip(self, value))]
1390 async fn kv_write(
1391 &self,
1392 primary_namespace: &str,
1393 secondary_namespace: &str,
1394 key: &str,
1395 value: &[u8],
1396 ) -> Result<(), database::Error> {
1397 validate_kvstore_params(primary_namespace, secondary_namespace, Some(key))?;
1399
1400 let write_txn = self.db.begin_write().map_err(Error::from)?;
1401 {
1402 let mut table = write_txn.open_table(KV_STORE_TABLE).map_err(Error::from)?;
1403 table
1404 .insert((primary_namespace, secondary_namespace, key), value)
1405 .map_err(Error::from)?;
1406 }
1407 write_txn.commit().map_err(Error::from)?;
1408
1409 Ok(())
1410 }
1411
1412 #[instrument(skip(self))]
1413 async fn kv_read(
1414 &self,
1415 primary_namespace: &str,
1416 secondary_namespace: &str,
1417 key: &str,
1418 ) -> Result<Option<Vec<u8>>, database::Error> {
1419 validate_kvstore_params(primary_namespace, secondary_namespace, Some(key))?;
1421
1422 let read_txn = self.db.begin_read().map_err(Error::from)?;
1423 let table = read_txn.open_table(KV_STORE_TABLE).map_err(Error::from)?;
1424
1425 let result = table
1426 .get((primary_namespace, secondary_namespace, key))
1427 .map_err(Error::from)?
1428 .map(|v| v.value().to_vec());
1429
1430 Ok(result)
1431 }
1432
1433 #[instrument(skip(self))]
1434 async fn kv_list(
1435 &self,
1436 primary_namespace: &str,
1437 secondary_namespace: &str,
1438 ) -> Result<Vec<String>, database::Error> {
1439 validate_kvstore_params(primary_namespace, secondary_namespace, None)?;
1441
1442 let read_txn = self.db.begin_read().map_err(Error::from)?;
1443 let table = read_txn.open_table(KV_STORE_TABLE).map_err(Error::from)?;
1444
1445 let start = (primary_namespace, secondary_namespace, "");
1446 let iter = table.range(start..).map_err(Error::from)?;
1447
1448 let mut keys = Vec::new();
1449
1450 for item in iter {
1451 let (key, _) = item.map_err(Error::from)?;
1452 let (p, s, k) = key.value();
1453 if p == primary_namespace && s == secondary_namespace {
1454 keys.push(k.to_string());
1455 } else {
1456 break;
1457 }
1458 }
1459
1460 Ok(keys)
1461 }
1462
1463 #[instrument(skip(self))]
1464 async fn kv_remove(
1465 &self,
1466 primary_namespace: &str,
1467 secondary_namespace: &str,
1468 key: &str,
1469 ) -> Result<(), database::Error> {
1470 validate_kvstore_params(primary_namespace, secondary_namespace, Some(key))?;
1472
1473 let write_txn = self.db.begin_write().map_err(Error::from)?;
1474 {
1475 let mut table = write_txn.open_table(KV_STORE_TABLE).map_err(Error::from)?;
1476 table
1477 .remove((primary_namespace, secondary_namespace, key))
1478 .map_err(Error::from)?;
1479 }
1480 write_txn.commit().map_err(Error::from)?;
1481
1482 Ok(())
1483 }
1484
1485 #[instrument(skip(self))]
1486 async fn add_p2pk_key(
1487 &self,
1488 pubkey: &PublicKey,
1489 derivation_path: DerivationPath,
1490 derivation_index: u32,
1491 ) -> Result<(), database::Error> {
1492 let write_txn = self.db.begin_write().map_err(Error::from)?;
1493 {
1494 let mut table = write_txn
1495 .open_table(P2PK_SIGNING_KEYS_TABLE)
1496 .map_err(Error::from)?;
1497 table
1498 .insert(
1499 pubkey.to_bytes().as_slice(),
1500 serde_json::to_string(&wallet::P2PKSigningKey {
1501 pubkey: *pubkey,
1502 derivation_path,
1503 derivation_index,
1504 created_time: unix_time(),
1505 })
1506 .map_err(Error::from)?
1507 .as_str(),
1508 )
1509 .map_err(Error::from)?;
1510 }
1511 write_txn.commit().map_err(Error::from)?;
1512 Ok(())
1513 }
1514
1515 #[instrument(skip(self))]
1516 async fn get_p2pk_key(
1517 &self,
1518 pubkey: &PublicKey,
1519 ) -> Result<Option<wallet::P2PKSigningKey>, database::Error> {
1520 let read_txn = self.db.begin_read().map_err(Error::from)?;
1521 let table = read_txn
1522 .open_table(P2PK_SIGNING_KEYS_TABLE)
1523 .map_err(Error::from)?;
1524
1525 if let Some(key) = table
1526 .get(pubkey.to_bytes().as_slice())
1527 .map_err(Error::from)?
1528 {
1529 return Ok(Some(
1530 serde_json::from_str(key.value()).map_err(Error::from)?,
1531 ));
1532 }
1533
1534 Ok(None)
1535 }
1536
1537 #[instrument(skip(self))]
1538 async fn list_p2pk_keys(&self) -> Result<Vec<wallet::P2PKSigningKey>, database::Error> {
1539 let read_txn = self.db.begin_read().map_err(Error::from)?;
1540 let table = read_txn
1541 .open_table(P2PK_SIGNING_KEYS_TABLE)
1542 .map_err(Error::from)?;
1543
1544 let keys: Vec<wallet::P2PKSigningKey> = table
1545 .iter()
1546 .map_err(Error::from)?
1547 .flatten()
1548 .filter_map(|(_k, v)| {
1549 if let Ok(key) = serde_json::from_str::<wallet::P2PKSigningKey>(v.value()) {
1550 return Some(key);
1551 }
1552
1553 None
1554 })
1555 .collect();
1556
1557 Ok(keys)
1558 }
1559
1560 #[instrument(skip(self))]
1561 async fn latest_p2pk(&self) -> Result<Option<wallet::P2PKSigningKey>, database::Error> {
1562 let read_txn = self.db.begin_read().map_err(Error::from)?;
1563 let table = read_txn
1564 .open_table(P2PK_SIGNING_KEYS_TABLE)
1565 .map_err(Error::from)?;
1566
1567 let latest_key = table
1568 .iter()
1569 .map_err(Error::from)?
1570 .flatten()
1571 .filter_map(|(_k, v)| serde_json::from_str::<wallet::P2PKSigningKey>(v.value()).ok())
1572 .max_by_key(|key| key.derivation_index);
1573
1574 Ok(latest_key)
1575 }
1576}
1577
1578#[cfg(test)]
1579mod test {
1580 use std::path::PathBuf;
1581 use std::str::FromStr;
1582
1583 use cdk_common::database::{self, WalletDatabase};
1584 use cdk_common::{wallet_db_test, Id};
1585
1586 use super::WalletRedbDatabase;
1587
1588 async fn provide_db(test_id: String) -> WalletRedbDatabase {
1589 let path = PathBuf::from(format!("/tmp/cdk-test-{}.redb", test_id));
1590 WalletRedbDatabase::new(&path).expect("database")
1591 }
1592
1593 wallet_db_test!(provide_db);
1594
1595 #[tokio::test]
1596 async fn increment_keyset_counter_returns_error_on_overflow() {
1597 let db = provide_db(format!("counter-overflow-{}", uuid::Uuid::new_v4())).await;
1598 let keyset_id = Id::from_str("00916bbf7ef91a36").expect("valid keyset id");
1599
1600 let first = db
1601 .increment_keyset_counter(&keyset_id, u32::MAX)
1602 .await
1603 .expect("first increment should fit");
1604 assert_eq!(first, u32::MAX);
1605
1606 match db.increment_keyset_counter(&keyset_id, 1).await {
1607 Err(database::Error::AmountOverflow) => {}
1608 Ok(counter) => panic!("counter should not wrap, got {counter}"),
1609 Err(err) => panic!("expected amount overflow, got {err}"),
1610 }
1611 }
1612}