1use std::cmp::Ordering;
4use std::collections::HashMap;
5use std::path::Path;
6use std::str::FromStr;
7use std::sync::Arc;
8
9use async_trait::async_trait;
10use cdk_common::common::ProofInfo;
11use cdk_common::database::WalletDatabase;
12use cdk_common::mint_url::MintUrl;
13use cdk_common::util::unix_time;
14use cdk_common::wallet::{self, MintQuote, Transaction, TransactionDirection, TransactionId};
15use cdk_common::{
16 database, CurrencyUnit, Id, KeySet, KeySetInfo, Keys, MintInfo, PublicKey, SpendingConditions,
17 State,
18};
19use redb::{Database, MultimapTableDefinition, ReadableTable, TableDefinition};
20use tracing::instrument;
21
22use super::error::Error;
23use crate::migrations::migrate_00_to_01;
24use crate::wallet::migrations::{migrate_01_to_02, migrate_02_to_03, migrate_03_to_04};
25
26mod migrations;
27
28const MINTS_TABLE: TableDefinition<&str, &str> = TableDefinition::new("mints_table");
30const MINT_KEYSETS_TABLE: MultimapTableDefinition<&str, &[u8]> =
32 MultimapTableDefinition::new("mint_keysets");
33const KEYSETS_TABLE: TableDefinition<&[u8], &str> = TableDefinition::new("keysets");
35const MINT_QUOTES_TABLE: TableDefinition<&str, &str> = TableDefinition::new("mint_quotes");
37const MELT_QUOTES_TABLE: TableDefinition<&str, &str> = TableDefinition::new("melt_quotes");
39const MINT_KEYS_TABLE: TableDefinition<&str, &str> = TableDefinition::new("mint_keys");
40const PROOFS_TABLE: TableDefinition<&[u8], &str> = TableDefinition::new("proofs");
42const CONFIG_TABLE: TableDefinition<&str, &str> = TableDefinition::new("config");
43const KEYSET_COUNTER: TableDefinition<&str, u32> = TableDefinition::new("keyset_counter");
44const TRANSACTIONS_TABLE: TableDefinition<&[u8], &str> = TableDefinition::new("transactions");
46
47const KEYSET_U32_MAPPING: TableDefinition<u32, &str> = TableDefinition::new("keyset_u32_mapping");
48
49const DATABASE_VERSION: u32 = 4;
50
51#[derive(Debug, Clone)]
53pub struct WalletRedbDatabase {
54 db: Arc<Database>,
55}
56
57impl WalletRedbDatabase {
58 pub fn new(path: &Path) -> Result<Self, Error> {
60 {
61 let db = Arc::new(Database::create(path)?);
62
63 let db_version: Option<String>;
64 {
65 let read_txn = db.begin_read()?;
67 let table = read_txn.open_table(CONFIG_TABLE);
68
69 db_version = match table {
70 Ok(table) => table.get("db_version")?.map(|v| v.value().to_string()),
71 Err(_) => None,
72 };
73 }
74
75 match db_version {
76 Some(db_version) => {
77 let mut current_file_version = u32::from_str(&db_version)?;
78 tracing::info!("Current file version {}", current_file_version);
79
80 match current_file_version.cmp(&DATABASE_VERSION) {
81 Ordering::Less => {
82 tracing::info!(
83 "Database needs to be upgraded at {} current is {}",
84 current_file_version,
85 DATABASE_VERSION
86 );
87 if current_file_version == 0 {
88 current_file_version = migrate_00_to_01(Arc::clone(&db))?;
89 }
90
91 if current_file_version == 1 {
92 current_file_version = migrate_01_to_02(Arc::clone(&db))?;
93 }
94
95 if current_file_version == 2 {
96 current_file_version = migrate_02_to_03(Arc::clone(&db))?;
97 }
98
99 if current_file_version == 3 {
100 current_file_version = migrate_03_to_04(Arc::clone(&db))?;
101 }
102
103 if current_file_version != DATABASE_VERSION {
104 tracing::warn!(
105 "Database upgrade did not complete at {} current is {}",
106 current_file_version,
107 DATABASE_VERSION
108 );
109 return Err(Error::UnknownDatabaseVersion);
110 }
111
112 let write_txn = db.begin_write()?;
113 {
114 let mut table = write_txn.open_table(CONFIG_TABLE)?;
115
116 table
117 .insert("db_version", DATABASE_VERSION.to_string().as_str())?;
118 }
119
120 write_txn.commit()?;
121 }
122 Ordering::Equal => {
123 tracing::info!("Database is at current version {}", DATABASE_VERSION);
124 }
125 Ordering::Greater => {
126 tracing::warn!(
127 "Database upgrade did not complete at {} current is {}",
128 current_file_version,
129 DATABASE_VERSION
130 );
131 return Err(Error::UnknownDatabaseVersion);
132 }
133 }
134 }
135 None => {
136 let write_txn = db.begin_write()?;
137 {
138 let mut table = write_txn.open_table(CONFIG_TABLE)?;
139 let _ = write_txn.open_table(MINTS_TABLE)?;
141 let _ = write_txn.open_multimap_table(MINT_KEYSETS_TABLE)?;
142 let _ = write_txn.open_table(KEYSETS_TABLE)?;
143 let _ = write_txn.open_table(MINT_QUOTES_TABLE)?;
144 let _ = write_txn.open_table(MELT_QUOTES_TABLE)?;
145 let _ = write_txn.open_table(MINT_KEYS_TABLE)?;
146 let _ = write_txn.open_table(PROOFS_TABLE)?;
147 let _ = write_txn.open_table(KEYSET_COUNTER)?;
148 let _ = write_txn.open_table(TRANSACTIONS_TABLE)?;
149 let _ = write_txn.open_table(KEYSET_U32_MAPPING)?;
150 table.insert("db_version", DATABASE_VERSION.to_string().as_str())?;
151 }
152
153 write_txn.commit()?;
154 }
155 }
156 drop(db);
157 }
158
159 let db = Database::create(path)?;
160
161 Ok(Self { db: Arc::new(db) })
162 }
163}
164
165#[async_trait]
166impl WalletDatabase for WalletRedbDatabase {
167 type Err = database::Error;
168
169 #[instrument(skip(self))]
170 async fn add_mint(
171 &self,
172 mint_url: MintUrl,
173 mint_info: Option<MintInfo>,
174 ) -> Result<(), Self::Err> {
175 let write_txn = self.db.begin_write().map_err(Error::from)?;
176
177 {
178 let mut table = write_txn.open_table(MINTS_TABLE).map_err(Error::from)?;
179 table
180 .insert(
181 mint_url.to_string().as_str(),
182 serde_json::to_string(&mint_info)
183 .map_err(Error::from)?
184 .as_str(),
185 )
186 .map_err(Error::from)?;
187 }
188 write_txn.commit().map_err(Error::from)?;
189
190 Ok(())
191 }
192
193 #[instrument(skip(self))]
194 async fn remove_mint(&self, mint_url: MintUrl) -> Result<(), Self::Err> {
195 let write_txn = self.db.begin_write().map_err(Error::from)?;
196
197 {
198 let mut table = write_txn.open_table(MINTS_TABLE).map_err(Error::from)?;
199 table
200 .remove(mint_url.to_string().as_str())
201 .map_err(Error::from)?;
202 }
203 write_txn.commit().map_err(Error::from)?;
204
205 Ok(())
206 }
207
208 #[instrument(skip(self))]
209 async fn get_mint(&self, mint_url: MintUrl) -> Result<Option<MintInfo>, Self::Err> {
210 let read_txn = self.db.begin_read().map_err(Into::<Error>::into)?;
211 let table = read_txn.open_table(MINTS_TABLE).map_err(Error::from)?;
212
213 if let Some(mint_info) = table
214 .get(mint_url.to_string().as_str())
215 .map_err(Error::from)?
216 {
217 return Ok(serde_json::from_str(mint_info.value()).map_err(Error::from)?);
218 }
219
220 Ok(None)
221 }
222
223 #[instrument(skip(self))]
224 async fn get_mints(&self) -> Result<HashMap<MintUrl, Option<MintInfo>>, Self::Err> {
225 let read_txn = self.db.begin_read().map_err(Error::from)?;
226 let table = read_txn.open_table(MINTS_TABLE).map_err(Error::from)?;
227 let mints = table
228 .iter()
229 .map_err(Error::from)?
230 .flatten()
231 .map(|(mint, mint_info)| {
232 (
233 MintUrl::from_str(mint.value()).unwrap(),
234 serde_json::from_str(mint_info.value()).ok(),
235 )
236 })
237 .collect();
238
239 Ok(mints)
240 }
241
242 #[instrument(skip(self))]
243 async fn update_mint_url(
244 &self,
245 old_mint_url: MintUrl,
246 new_mint_url: MintUrl,
247 ) -> Result<(), Self::Err> {
248 {
250 let proofs = self
251 .get_proofs(Some(old_mint_url.clone()), None, None, None)
252 .await
253 .map_err(Error::from)?;
254
255 let updated_proofs: Vec<ProofInfo> = proofs
257 .clone()
258 .into_iter()
259 .map(|mut p| {
260 p.mint_url = new_mint_url.clone();
261 p
262 })
263 .collect();
264
265 if !updated_proofs.is_empty() {
266 self.update_proofs(updated_proofs, vec![]).await?;
267 }
268 }
269
270 {
272 let quotes = self.get_mint_quotes().await?;
273
274 let unix_time = unix_time();
275
276 let quotes: Vec<MintQuote> = quotes
277 .into_iter()
278 .filter_map(|mut q| {
279 if q.expiry < unix_time {
280 q.mint_url = new_mint_url.clone();
281 Some(q)
282 } else {
283 None
284 }
285 })
286 .collect();
287
288 for quote in quotes {
289 self.add_mint_quote(quote).await?;
290 }
291 }
292
293 Ok(())
294 }
295
296 #[instrument(skip(self))]
297 async fn add_mint_keysets(
298 &self,
299 mint_url: MintUrl,
300 keysets: Vec<KeySetInfo>,
301 ) -> Result<(), Self::Err> {
302 let write_txn = self.db.begin_write().map_err(Error::from)?;
303
304 let mut existing_u32 = false;
305
306 {
307 let mut table = write_txn
308 .open_multimap_table(MINT_KEYSETS_TABLE)
309 .map_err(Error::from)?;
310 let mut keysets_table = write_txn.open_table(KEYSETS_TABLE).map_err(Error::from)?;
311 let mut u32_table = write_txn
312 .open_table(KEYSET_U32_MAPPING)
313 .map_err(Error::from)?;
314
315 for keyset in keysets {
316 let existing_keyset = {
318 let existing_keyset = keysets_table
319 .get(keyset.id.to_bytes().as_slice())
320 .map_err(Error::from)?;
321
322 existing_keyset.map(|r| r.value().to_string())
323 };
324
325 let existing = u32_table
326 .insert(u32::from(keyset.id), keyset.id.to_string().as_str())
327 .map_err(Error::from)?;
328
329 match existing {
330 None => existing_u32 = false,
331 Some(id) => {
332 let id = Id::from_str(id.value())?;
333
334 if id == keyset.id {
335 existing_u32 = false;
336 } else {
337 println!("Breaking here");
338 existing_u32 = true;
339 break;
340 }
341 }
342 }
343
344 let keyset = if let Some(existing_keyset) = existing_keyset {
345 let mut existing_keyset: KeySetInfo = serde_json::from_str(&existing_keyset)?;
346
347 existing_keyset.active = keyset.active;
348 existing_keyset.input_fee_ppk = keyset.input_fee_ppk;
349
350 existing_keyset
351 } else {
352 table
353 .insert(
354 mint_url.to_string().as_str(),
355 keyset.id.to_bytes().as_slice(),
356 )
357 .map_err(Error::from)?;
358
359 keyset
360 };
361
362 keysets_table
363 .insert(
364 keyset.id.to_bytes().as_slice(),
365 serde_json::to_string(&keyset)
366 .map_err(Error::from)?
367 .as_str(),
368 )
369 .map_err(Error::from)?;
370 }
371 }
372
373 if existing_u32 {
374 tracing::warn!("Keyset already exists for keyset id");
375 write_txn.abort().map_err(Error::from)?;
376
377 return Err(database::Error::Duplicate);
378 }
379
380 write_txn.commit().map_err(Error::from)?;
381
382 Ok(())
383 }
384
385 #[instrument(skip(self))]
386 async fn get_mint_keysets(
387 &self,
388 mint_url: MintUrl,
389 ) -> Result<Option<Vec<KeySetInfo>>, Self::Err> {
390 let read_txn = self.db.begin_read().map_err(Into::<Error>::into)?;
391 let table = read_txn
392 .open_multimap_table(MINT_KEYSETS_TABLE)
393 .map_err(Error::from)?;
394
395 let keyset_ids = table
396 .get(mint_url.to_string().as_str())
397 .map_err(Error::from)?
398 .flatten()
399 .map(|k| Id::from_bytes(k.value()))
400 .collect::<Result<Vec<_>, _>>()?;
401
402 let mut keysets = vec![];
403
404 let keysets_t = read_txn.open_table(KEYSETS_TABLE).map_err(Error::from)?;
405
406 for keyset_id in keyset_ids {
407 if let Some(keyset) = keysets_t
408 .get(keyset_id.to_bytes().as_slice())
409 .map_err(Error::from)?
410 {
411 let keyset = serde_json::from_str(keyset.value()).map_err(Error::from)?;
412
413 keysets.push(keyset);
414 }
415 }
416
417 match keysets.is_empty() {
418 true => Ok(None),
419 false => Ok(Some(keysets)),
420 }
421 }
422
423 #[instrument(skip(self), fields(keyset_id = %keyset_id))]
424 async fn get_keyset_by_id(&self, keyset_id: &Id) -> Result<Option<KeySetInfo>, Self::Err> {
425 let read_txn = self.db.begin_read().map_err(Into::<Error>::into)?;
426 let table = read_txn.open_table(KEYSETS_TABLE).map_err(Error::from)?;
427
428 match table
429 .get(keyset_id.to_bytes().as_slice())
430 .map_err(Error::from)?
431 {
432 Some(keyset) => {
433 let keyset: KeySetInfo =
434 serde_json::from_str(keyset.value()).map_err(Error::from)?;
435
436 Ok(Some(keyset))
437 }
438 None => Ok(None),
439 }
440 }
441
442 #[instrument(skip_all)]
443 async fn add_mint_quote(&self, quote: MintQuote) -> Result<(), Self::Err> {
444 let write_txn = self.db.begin_write().map_err(Error::from)?;
445
446 {
447 let mut table = write_txn
448 .open_table(MINT_QUOTES_TABLE)
449 .map_err(Error::from)?;
450 table
451 .insert(
452 quote.id.as_str(),
453 serde_json::to_string("e).map_err(Error::from)?.as_str(),
454 )
455 .map_err(Error::from)?;
456 }
457
458 write_txn.commit().map_err(Error::from)?;
459
460 Ok(())
461 }
462
463 #[instrument(skip_all)]
464 async fn get_mint_quote(&self, quote_id: &str) -> Result<Option<MintQuote>, Self::Err> {
465 let read_txn = self.db.begin_read().map_err(Into::<Error>::into)?;
466 let table = read_txn
467 .open_table(MINT_QUOTES_TABLE)
468 .map_err(Error::from)?;
469
470 if let Some(mint_info) = table.get(quote_id).map_err(Error::from)? {
471 return Ok(serde_json::from_str(mint_info.value()).map_err(Error::from)?);
472 }
473
474 Ok(None)
475 }
476
477 #[instrument(skip_all)]
478 async fn get_mint_quotes(&self) -> Result<Vec<MintQuote>, Self::Err> {
479 let read_txn = self.db.begin_read().map_err(Into::<Error>::into)?;
480 let table = read_txn
481 .open_table(MINT_QUOTES_TABLE)
482 .map_err(Error::from)?;
483
484 Ok(table
485 .iter()
486 .map_err(Error::from)?
487 .flatten()
488 .flat_map(|(_id, quote)| serde_json::from_str(quote.value()))
489 .collect())
490 }
491
492 #[instrument(skip_all)]
493 async fn remove_mint_quote(&self, quote_id: &str) -> Result<(), Self::Err> {
494 let write_txn = self.db.begin_write().map_err(Error::from)?;
495
496 {
497 let mut table = write_txn
498 .open_table(MINT_QUOTES_TABLE)
499 .map_err(Error::from)?;
500 table.remove(quote_id).map_err(Error::from)?;
501 }
502
503 write_txn.commit().map_err(Error::from)?;
504
505 Ok(())
506 }
507
508 #[instrument(skip_all)]
509 async fn add_melt_quote(&self, quote: wallet::MeltQuote) -> Result<(), Self::Err> {
510 let write_txn = self.db.begin_write().map_err(Error::from)?;
511
512 {
513 let mut table = write_txn
514 .open_table(MELT_QUOTES_TABLE)
515 .map_err(Error::from)?;
516 table
517 .insert(
518 quote.id.as_str(),
519 serde_json::to_string("e).map_err(Error::from)?.as_str(),
520 )
521 .map_err(Error::from)?;
522 }
523
524 write_txn.commit().map_err(Error::from)?;
525
526 Ok(())
527 }
528
529 #[instrument(skip_all)]
530 async fn get_melt_quote(&self, quote_id: &str) -> Result<Option<wallet::MeltQuote>, Self::Err> {
531 let read_txn = self.db.begin_read().map_err(Error::from)?;
532 let table = read_txn
533 .open_table(MELT_QUOTES_TABLE)
534 .map_err(Error::from)?;
535
536 if let Some(mint_info) = table.get(quote_id).map_err(Error::from)? {
537 return Ok(serde_json::from_str(mint_info.value()).map_err(Error::from)?);
538 }
539
540 Ok(None)
541 }
542
543 #[instrument(skip_all)]
544 async fn get_melt_quotes(&self) -> Result<Vec<wallet::MeltQuote>, Self::Err> {
545 let read_txn = self.db.begin_read().map_err(Error::from)?;
546 let table = read_txn
547 .open_table(MELT_QUOTES_TABLE)
548 .map_err(Error::from)?;
549
550 Ok(table
551 .iter()
552 .map_err(Error::from)?
553 .flatten()
554 .flat_map(|(_id, quote)| serde_json::from_str(quote.value()))
555 .collect())
556 }
557
558 #[instrument(skip_all)]
559 async fn remove_melt_quote(&self, quote_id: &str) -> Result<(), Self::Err> {
560 let write_txn = self.db.begin_write().map_err(Error::from)?;
561
562 {
563 let mut table = write_txn
564 .open_table(MELT_QUOTES_TABLE)
565 .map_err(Error::from)?;
566 table.remove(quote_id).map_err(Error::from)?;
567 }
568
569 write_txn.commit().map_err(Error::from)?;
570
571 Ok(())
572 }
573
574 #[instrument(skip_all)]
575 async fn add_keys(&self, keyset: KeySet) -> Result<(), Self::Err> {
576 let write_txn = self.db.begin_write().map_err(Error::from)?;
577
578 keyset.verify_id()?;
579
580 let existing_keys;
581 let existing_u32;
582
583 {
584 let mut table = write_txn.open_table(MINT_KEYS_TABLE).map_err(Error::from)?;
585
586 existing_keys = table
587 .insert(
588 keyset.id.to_string().as_str(),
589 serde_json::to_string(&keyset.keys)
590 .map_err(Error::from)?
591 .as_str(),
592 )
593 .map_err(Error::from)?
594 .is_some();
595
596 let mut table = write_txn
597 .open_table(KEYSET_U32_MAPPING)
598 .map_err(Error::from)?;
599
600 let existing = table
601 .insert(u32::from(keyset.id), keyset.id.to_string().as_str())
602 .map_err(Error::from)?;
603
604 match existing {
605 None => existing_u32 = false,
606 Some(id) => {
607 let id = Id::from_str(id.value())?;
608
609 existing_u32 = id != keyset.id;
610 }
611 }
612 }
613
614 if existing_keys || existing_u32 {
615 tracing::warn!("Keys already exist for keyset id");
616 write_txn.abort().map_err(Error::from)?;
617
618 return Err(database::Error::Duplicate);
619 }
620
621 write_txn.commit().map_err(Error::from)?;
622
623 Ok(())
624 }
625
626 #[instrument(skip(self), fields(keyset_id = %keyset_id))]
627 async fn get_keys(&self, keyset_id: &Id) -> Result<Option<Keys>, Self::Err> {
628 let read_txn = self.db.begin_read().map_err(Error::from)?;
629 let table = read_txn.open_table(MINT_KEYS_TABLE).map_err(Error::from)?;
630
631 if let Some(mint_info) = table
632 .get(keyset_id.to_string().as_str())
633 .map_err(Error::from)?
634 {
635 return Ok(serde_json::from_str(mint_info.value()).map_err(Error::from)?);
636 }
637
638 Ok(None)
639 }
640
641 #[instrument(skip(self), fields(keyset_id = %keyset_id))]
642 async fn remove_keys(&self, keyset_id: &Id) -> Result<(), Self::Err> {
643 let write_txn = self.db.begin_write().map_err(Error::from)?;
644
645 {
646 let mut table = write_txn.open_table(MINT_KEYS_TABLE).map_err(Error::from)?;
647
648 table
649 .remove(keyset_id.to_string().as_str())
650 .map_err(Error::from)?;
651 }
652
653 write_txn.commit().map_err(Error::from)?;
654
655 Ok(())
656 }
657
658 #[instrument(skip(self, added, deleted_ys))]
659 async fn update_proofs(
660 &self,
661 added: Vec<ProofInfo>,
662 deleted_ys: Vec<PublicKey>,
663 ) -> Result<(), Self::Err> {
664 let write_txn = self.db.begin_write().map_err(Error::from)?;
665
666 {
667 let mut table = write_txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
668
669 for proof_info in added.iter() {
670 table
671 .insert(
672 proof_info.y.to_bytes().as_slice(),
673 serde_json::to_string(&proof_info)
674 .map_err(Error::from)?
675 .as_str(),
676 )
677 .map_err(Error::from)?;
678 }
679
680 for y in deleted_ys.iter() {
681 table.remove(y.to_bytes().as_slice()).map_err(Error::from)?;
682 }
683 }
684 write_txn.commit().map_err(Error::from)?;
685
686 Ok(())
687 }
688
689 #[instrument(skip_all)]
690 async fn get_proofs(
691 &self,
692 mint_url: Option<MintUrl>,
693 unit: Option<CurrencyUnit>,
694 state: Option<Vec<State>>,
695 spending_conditions: Option<Vec<SpendingConditions>>,
696 ) -> Result<Vec<ProofInfo>, Self::Err> {
697 let read_txn = self.db.begin_read().map_err(Error::from)?;
698
699 let table = read_txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
700
701 let proofs: Vec<ProofInfo> = table
702 .iter()
703 .map_err(Error::from)?
704 .flatten()
705 .filter_map(|(_k, v)| {
706 let mut proof = None;
707
708 if let Ok(proof_info) = serde_json::from_str::<ProofInfo>(v.value()) {
709 if proof_info.matches_conditions(&mint_url, &unit, &state, &spending_conditions)
710 {
711 proof = Some(proof_info)
712 }
713 }
714
715 proof
716 })
717 .collect();
718
719 Ok(proofs)
720 }
721
722 async fn update_proofs_state(
723 &self,
724 ys: Vec<PublicKey>,
725 state: State,
726 ) -> Result<(), database::Error> {
727 let read_txn = self.db.begin_read().map_err(Error::from)?;
728 let table = read_txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
729
730 let write_txn = self.db.begin_write().map_err(Error::from)?;
731
732 for y in ys {
733 let y_slice = y.to_bytes();
734 let proof = table
735 .get(y_slice.as_slice())
736 .map_err(Error::from)?
737 .ok_or(Error::UnknownY)?;
738
739 let mut proof_info =
740 serde_json::from_str::<ProofInfo>(proof.value()).map_err(Error::from)?;
741
742 proof_info.state = state;
743
744 {
745 let mut table = write_txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
746 table
747 .insert(
748 y_slice.as_slice(),
749 serde_json::to_string(&proof_info)
750 .map_err(Error::from)?
751 .as_str(),
752 )
753 .map_err(Error::from)?;
754 }
755 }
756
757 write_txn.commit().map_err(Error::from)?;
758
759 Ok(())
760 }
761
762 #[instrument(skip(self), fields(keyset_id = %keyset_id))]
763 async fn increment_keyset_counter(&self, keyset_id: &Id, count: u32) -> Result<u32, Self::Err> {
764 let write_txn = self.db.begin_write().map_err(Error::from)?;
765
766 let current_counter;
767 let new_counter;
768 {
769 let table = write_txn.open_table(KEYSET_COUNTER).map_err(Error::from)?;
770 let counter = table
771 .get(keyset_id.to_string().as_str())
772 .map_err(Error::from)?;
773
774 current_counter = match counter {
775 Some(c) => c.value(),
776 None => 0,
777 };
778
779 new_counter = current_counter + count;
780 }
781
782 {
783 let mut table = write_txn.open_table(KEYSET_COUNTER).map_err(Error::from)?;
784
785 table
786 .insert(keyset_id.to_string().as_str(), new_counter)
787 .map_err(Error::from)?;
788 }
789 write_txn.commit().map_err(Error::from)?;
790
791 Ok(new_counter)
792 }
793
794 #[instrument(skip(self))]
795 async fn add_transaction(&self, transaction: Transaction) -> Result<(), Self::Err> {
796 let write_txn = self.db.begin_write().map_err(Error::from)?;
797
798 {
799 let mut table = write_txn
800 .open_table(TRANSACTIONS_TABLE)
801 .map_err(Error::from)?;
802 table
803 .insert(
804 transaction.id().as_slice(),
805 serde_json::to_string(&transaction)
806 .map_err(Error::from)?
807 .as_str(),
808 )
809 .map_err(Error::from)?;
810 }
811
812 write_txn.commit().map_err(Error::from)?;
813
814 Ok(())
815 }
816
817 #[instrument(skip(self))]
818 async fn get_transaction(
819 &self,
820 transaction_id: TransactionId,
821 ) -> Result<Option<Transaction>, Self::Err> {
822 let read_txn = self.db.begin_read().map_err(Error::from)?;
823 let table = read_txn
824 .open_table(TRANSACTIONS_TABLE)
825 .map_err(Error::from)?;
826
827 if let Some(transaction) = table.get(transaction_id.as_slice()).map_err(Error::from)? {
828 return Ok(serde_json::from_str(transaction.value()).map_err(Error::from)?);
829 }
830
831 Ok(None)
832 }
833
834 #[instrument(skip(self))]
835 async fn list_transactions(
836 &self,
837 mint_url: Option<MintUrl>,
838 direction: Option<TransactionDirection>,
839 unit: Option<CurrencyUnit>,
840 ) -> Result<Vec<Transaction>, Self::Err> {
841 let read_txn = self.db.begin_read().map_err(Error::from)?;
842
843 let table = read_txn
844 .open_table(TRANSACTIONS_TABLE)
845 .map_err(Error::from)?;
846
847 let transactions: Vec<Transaction> = table
848 .iter()
849 .map_err(Error::from)?
850 .flatten()
851 .filter_map(|(_k, v)| {
852 let mut transaction = None;
853
854 if let Ok(tx) = serde_json::from_str::<Transaction>(v.value()) {
855 if tx.matches_conditions(&mint_url, &direction, &unit) {
856 transaction = Some(tx)
857 }
858 }
859
860 transaction
861 })
862 .collect();
863
864 Ok(transactions)
865 }
866
867 #[instrument(skip(self))]
868 async fn remove_transaction(&self, transaction_id: TransactionId) -> Result<(), Self::Err> {
869 let write_txn = self.db.begin_write().map_err(Error::from)?;
870
871 {
872 let mut table = write_txn
873 .open_table(TRANSACTIONS_TABLE)
874 .map_err(Error::from)?;
875 table
876 .remove(transaction_id.as_slice())
877 .map_err(Error::from)?;
878 }
879
880 write_txn.commit().map_err(Error::from)?;
881
882 Ok(())
883 }
884}