cdk_redb/wallet/
mod.rs

1//! Redb Wallet
2
3use std::cmp::Ordering;
4use std::collections::HashMap;
5use std::path::Path;
6use std::str::FromStr;
7use std::sync::Arc;
8
9use async_trait::async_trait;
10use cdk_common::common::ProofInfo;
11use cdk_common::database::WalletDatabase;
12use cdk_common::mint_url::MintUrl;
13use cdk_common::util::unix_time;
14use cdk_common::wallet::{self, MintQuote, Transaction, TransactionDirection, TransactionId};
15use cdk_common::{
16    database, CurrencyUnit, Id, KeySet, KeySetInfo, Keys, MintInfo, PublicKey, SpendingConditions,
17    State,
18};
19use redb::{Database, MultimapTableDefinition, ReadableTable, TableDefinition};
20use tracing::instrument;
21
22use super::error::Error;
23use crate::migrations::migrate_00_to_01;
24use crate::wallet::migrations::{migrate_01_to_02, migrate_02_to_03, migrate_03_to_04};
25
26mod migrations;
27
28// <Mint_url, Info>
29const MINTS_TABLE: TableDefinition<&str, &str> = TableDefinition::new("mints_table");
30// <Mint_Url, Keyset_id>
31const MINT_KEYSETS_TABLE: MultimapTableDefinition<&str, &[u8]> =
32    MultimapTableDefinition::new("mint_keysets");
33// <Keyset_id, KeysetInfo>
34const KEYSETS_TABLE: TableDefinition<&[u8], &str> = TableDefinition::new("keysets");
35// <Quote_id, quote>
36const MINT_QUOTES_TABLE: TableDefinition<&str, &str> = TableDefinition::new("mint_quotes");
37// <Quote_id, quote>
38const MELT_QUOTES_TABLE: TableDefinition<&str, &str> = TableDefinition::new("melt_quotes");
39const MINT_KEYS_TABLE: TableDefinition<&str, &str> = TableDefinition::new("mint_keys");
40// <Y, Proof Info>
41const PROOFS_TABLE: TableDefinition<&[u8], &str> = TableDefinition::new("proofs");
42const CONFIG_TABLE: TableDefinition<&str, &str> = TableDefinition::new("config");
43const KEYSET_COUNTER: TableDefinition<&str, u32> = TableDefinition::new("keyset_counter");
44// <Transaction_id, Transaction>
45const TRANSACTIONS_TABLE: TableDefinition<&[u8], &str> = TableDefinition::new("transactions");
46
47const KEYSET_U32_MAPPING: TableDefinition<u32, &str> = TableDefinition::new("keyset_u32_mapping");
48
49const DATABASE_VERSION: u32 = 4;
50
51/// Wallet Redb Database
52#[derive(Debug, Clone)]
53pub struct WalletRedbDatabase {
54    db: Arc<Database>,
55}
56
57impl WalletRedbDatabase {
58    /// Create new [`WalletRedbDatabase`]
59    pub fn new(path: &Path) -> Result<Self, Error> {
60        {
61            // Check if parent directory exists before attempting to create database
62            if let Some(parent) = path.parent() {
63                if !parent.exists() {
64                    return Err(Error::Io(std::io::Error::new(
65                        std::io::ErrorKind::NotFound,
66                        format!("Parent directory does not exist: {parent:?}"),
67                    )));
68                }
69            }
70
71            let db = Arc::new(Database::create(path)?);
72
73            let db_version: Option<String>;
74            {
75                // Check database version
76                let read_txn = db.begin_read()?;
77                let table = read_txn.open_table(CONFIG_TABLE);
78
79                db_version = match table {
80                    Ok(table) => table.get("db_version")?.map(|v| v.value().to_string()),
81                    Err(_) => None,
82                };
83            }
84
85            match db_version {
86                Some(db_version) => {
87                    let mut current_file_version = u32::from_str(&db_version)?;
88                    tracing::info!("Current file version {}", current_file_version);
89
90                    match current_file_version.cmp(&DATABASE_VERSION) {
91                        Ordering::Less => {
92                            tracing::info!(
93                                "Database needs to be upgraded at {} current is {}",
94                                current_file_version,
95                                DATABASE_VERSION
96                            );
97                            if current_file_version == 0 {
98                                current_file_version = migrate_00_to_01(Arc::clone(&db))?;
99                            }
100
101                            if current_file_version == 1 {
102                                current_file_version = migrate_01_to_02(Arc::clone(&db))?;
103                            }
104
105                            if current_file_version == 2 {
106                                current_file_version = migrate_02_to_03(Arc::clone(&db))?;
107                            }
108
109                            if current_file_version == 3 {
110                                current_file_version = migrate_03_to_04(Arc::clone(&db))?;
111                            }
112
113                            if current_file_version != DATABASE_VERSION {
114                                tracing::warn!(
115                                    "Database upgrade did not complete at {} current is {}",
116                                    current_file_version,
117                                    DATABASE_VERSION
118                                );
119                                return Err(Error::UnknownDatabaseVersion);
120                            }
121
122                            let write_txn = db.begin_write()?;
123                            {
124                                let mut table = write_txn.open_table(CONFIG_TABLE)?;
125
126                                table
127                                    .insert("db_version", DATABASE_VERSION.to_string().as_str())?;
128                            }
129
130                            write_txn.commit()?;
131                        }
132                        Ordering::Equal => {
133                            tracing::info!("Database is at current version {}", DATABASE_VERSION);
134                        }
135                        Ordering::Greater => {
136                            tracing::warn!(
137                                "Database upgrade did not complete at {} current is {}",
138                                current_file_version,
139                                DATABASE_VERSION
140                            );
141                            return Err(Error::UnknownDatabaseVersion);
142                        }
143                    }
144                }
145                None => {
146                    let write_txn = db.begin_write()?;
147                    {
148                        let mut table = write_txn.open_table(CONFIG_TABLE)?;
149                        // Open all tables to init a new db
150                        let _ = write_txn.open_table(MINTS_TABLE)?;
151                        let _ = write_txn.open_multimap_table(MINT_KEYSETS_TABLE)?;
152                        let _ = write_txn.open_table(KEYSETS_TABLE)?;
153                        let _ = write_txn.open_table(MINT_QUOTES_TABLE)?;
154                        let _ = write_txn.open_table(MELT_QUOTES_TABLE)?;
155                        let _ = write_txn.open_table(MINT_KEYS_TABLE)?;
156                        let _ = write_txn.open_table(PROOFS_TABLE)?;
157                        let _ = write_txn.open_table(KEYSET_COUNTER)?;
158                        let _ = write_txn.open_table(TRANSACTIONS_TABLE)?;
159                        let _ = write_txn.open_table(KEYSET_U32_MAPPING)?;
160                        table.insert("db_version", DATABASE_VERSION.to_string().as_str())?;
161                    }
162
163                    write_txn.commit()?;
164                }
165            }
166            drop(db);
167        }
168
169        // Check parent directory again for final database creation
170        if let Some(parent) = path.parent() {
171            if !parent.exists() {
172                return Err(Error::Io(std::io::Error::new(
173                    std::io::ErrorKind::NotFound,
174                    format!("Parent directory does not exist: {parent:?}"),
175                )));
176            }
177        }
178
179        let mut db = Database::create(path)?;
180
181        db.upgrade()?;
182
183        Ok(Self { db: Arc::new(db) })
184    }
185}
186
187#[async_trait]
188impl WalletDatabase for WalletRedbDatabase {
189    type Err = database::Error;
190
191    #[instrument(skip(self))]
192    async fn add_mint(
193        &self,
194        mint_url: MintUrl,
195        mint_info: Option<MintInfo>,
196    ) -> Result<(), Self::Err> {
197        let write_txn = self.db.begin_write().map_err(Error::from)?;
198
199        {
200            let mut table = write_txn.open_table(MINTS_TABLE).map_err(Error::from)?;
201            table
202                .insert(
203                    mint_url.to_string().as_str(),
204                    serde_json::to_string(&mint_info)
205                        .map_err(Error::from)?
206                        .as_str(),
207                )
208                .map_err(Error::from)?;
209        }
210        write_txn.commit().map_err(Error::from)?;
211
212        Ok(())
213    }
214
215    #[instrument(skip(self))]
216    async fn remove_mint(&self, mint_url: MintUrl) -> Result<(), Self::Err> {
217        let write_txn = self.db.begin_write().map_err(Error::from)?;
218
219        {
220            let mut table = write_txn.open_table(MINTS_TABLE).map_err(Error::from)?;
221            table
222                .remove(mint_url.to_string().as_str())
223                .map_err(Error::from)?;
224        }
225        write_txn.commit().map_err(Error::from)?;
226
227        Ok(())
228    }
229
230    #[instrument(skip(self))]
231    async fn get_mint(&self, mint_url: MintUrl) -> Result<Option<MintInfo>, Self::Err> {
232        let read_txn = self.db.begin_read().map_err(Into::<Error>::into)?;
233        let table = read_txn.open_table(MINTS_TABLE).map_err(Error::from)?;
234
235        if let Some(mint_info) = table
236            .get(mint_url.to_string().as_str())
237            .map_err(Error::from)?
238        {
239            return Ok(serde_json::from_str(mint_info.value()).map_err(Error::from)?);
240        }
241
242        Ok(None)
243    }
244
245    #[instrument(skip(self))]
246    async fn get_mints(&self) -> Result<HashMap<MintUrl, Option<MintInfo>>, Self::Err> {
247        let read_txn = self.db.begin_read().map_err(Error::from)?;
248        let table = read_txn.open_table(MINTS_TABLE).map_err(Error::from)?;
249        let mints = table
250            .iter()
251            .map_err(Error::from)?
252            .flatten()
253            .map(|(mint, mint_info)| {
254                (
255                    MintUrl::from_str(mint.value()).unwrap(),
256                    serde_json::from_str(mint_info.value()).ok(),
257                )
258            })
259            .collect();
260
261        Ok(mints)
262    }
263
264    #[instrument(skip(self))]
265    async fn update_mint_url(
266        &self,
267        old_mint_url: MintUrl,
268        new_mint_url: MintUrl,
269    ) -> Result<(), Self::Err> {
270        // Update proofs table
271        {
272            let proofs = self
273                .get_proofs(Some(old_mint_url.clone()), None, None, None)
274                .await
275                .map_err(Error::from)?;
276
277            // Proofs with new url
278            let updated_proofs: Vec<ProofInfo> = proofs
279                .clone()
280                .into_iter()
281                .map(|mut p| {
282                    p.mint_url = new_mint_url.clone();
283                    p
284                })
285                .collect();
286
287            if !updated_proofs.is_empty() {
288                self.update_proofs(updated_proofs, vec![]).await?;
289            }
290        }
291
292        // Update mint quotes
293        {
294            let quotes = self.get_mint_quotes().await?;
295
296            let unix_time = unix_time();
297
298            let quotes: Vec<MintQuote> = quotes
299                .into_iter()
300                .filter_map(|mut q| {
301                    if q.expiry < unix_time {
302                        q.mint_url = new_mint_url.clone();
303                        Some(q)
304                    } else {
305                        None
306                    }
307                })
308                .collect();
309
310            for quote in quotes {
311                self.add_mint_quote(quote).await?;
312            }
313        }
314
315        Ok(())
316    }
317
318    #[instrument(skip(self))]
319    async fn add_mint_keysets(
320        &self,
321        mint_url: MintUrl,
322        keysets: Vec<KeySetInfo>,
323    ) -> Result<(), Self::Err> {
324        let write_txn = self.db.begin_write().map_err(Error::from)?;
325
326        let mut existing_u32 = false;
327
328        {
329            let mut table = write_txn
330                .open_multimap_table(MINT_KEYSETS_TABLE)
331                .map_err(Error::from)?;
332            let mut keysets_table = write_txn.open_table(KEYSETS_TABLE).map_err(Error::from)?;
333            let mut u32_table = write_txn
334                .open_table(KEYSET_U32_MAPPING)
335                .map_err(Error::from)?;
336
337            for keyset in keysets {
338                // Check if keyset already exists
339                let existing_keyset = {
340                    let existing_keyset = keysets_table
341                        .get(keyset.id.to_bytes().as_slice())
342                        .map_err(Error::from)?;
343
344                    existing_keyset.map(|r| r.value().to_string())
345                };
346
347                let existing = u32_table
348                    .insert(u32::from(keyset.id), keyset.id.to_string().as_str())
349                    .map_err(Error::from)?;
350
351                match existing {
352                    None => existing_u32 = false,
353                    Some(id) => {
354                        let id = Id::from_str(id.value())?;
355
356                        if id == keyset.id {
357                            existing_u32 = false;
358                        } else {
359                            println!("Breaking here");
360                            existing_u32 = true;
361                            break;
362                        }
363                    }
364                }
365
366                let keyset = if let Some(existing_keyset) = existing_keyset {
367                    let mut existing_keyset: KeySetInfo = serde_json::from_str(&existing_keyset)?;
368
369                    existing_keyset.active = keyset.active;
370                    existing_keyset.input_fee_ppk = keyset.input_fee_ppk;
371
372                    existing_keyset
373                } else {
374                    table
375                        .insert(
376                            mint_url.to_string().as_str(),
377                            keyset.id.to_bytes().as_slice(),
378                        )
379                        .map_err(Error::from)?;
380
381                    keyset
382                };
383
384                keysets_table
385                    .insert(
386                        keyset.id.to_bytes().as_slice(),
387                        serde_json::to_string(&keyset)
388                            .map_err(Error::from)?
389                            .as_str(),
390                    )
391                    .map_err(Error::from)?;
392            }
393        }
394
395        if existing_u32 {
396            tracing::warn!("Keyset already exists for keyset id");
397            write_txn.abort().map_err(Error::from)?;
398
399            return Err(database::Error::Duplicate);
400        }
401
402        write_txn.commit().map_err(Error::from)?;
403
404        Ok(())
405    }
406
407    #[instrument(skip(self))]
408    async fn get_mint_keysets(
409        &self,
410        mint_url: MintUrl,
411    ) -> Result<Option<Vec<KeySetInfo>>, Self::Err> {
412        let read_txn = self.db.begin_read().map_err(Into::<Error>::into)?;
413        let table = read_txn
414            .open_multimap_table(MINT_KEYSETS_TABLE)
415            .map_err(Error::from)?;
416
417        let keyset_ids = table
418            .get(mint_url.to_string().as_str())
419            .map_err(Error::from)?
420            .flatten()
421            .map(|k| Id::from_bytes(k.value()))
422            .collect::<Result<Vec<_>, _>>()?;
423
424        let mut keysets = vec![];
425
426        let keysets_t = read_txn.open_table(KEYSETS_TABLE).map_err(Error::from)?;
427
428        for keyset_id in keyset_ids {
429            if let Some(keyset) = keysets_t
430                .get(keyset_id.to_bytes().as_slice())
431                .map_err(Error::from)?
432            {
433                let keyset = serde_json::from_str(keyset.value()).map_err(Error::from)?;
434
435                keysets.push(keyset);
436            }
437        }
438
439        match keysets.is_empty() {
440            true => Ok(None),
441            false => Ok(Some(keysets)),
442        }
443    }
444
445    #[instrument(skip(self), fields(keyset_id = %keyset_id))]
446    async fn get_keyset_by_id(&self, keyset_id: &Id) -> Result<Option<KeySetInfo>, Self::Err> {
447        let read_txn = self.db.begin_read().map_err(Into::<Error>::into)?;
448        let table = read_txn.open_table(KEYSETS_TABLE).map_err(Error::from)?;
449
450        match table
451            .get(keyset_id.to_bytes().as_slice())
452            .map_err(Error::from)?
453        {
454            Some(keyset) => {
455                let keyset: KeySetInfo =
456                    serde_json::from_str(keyset.value()).map_err(Error::from)?;
457
458                Ok(Some(keyset))
459            }
460            None => Ok(None),
461        }
462    }
463
464    #[instrument(skip_all)]
465    async fn add_mint_quote(&self, quote: MintQuote) -> Result<(), Self::Err> {
466        let write_txn = self.db.begin_write().map_err(Error::from)?;
467
468        {
469            let mut table = write_txn
470                .open_table(MINT_QUOTES_TABLE)
471                .map_err(Error::from)?;
472            table
473                .insert(
474                    quote.id.as_str(),
475                    serde_json::to_string(&quote).map_err(Error::from)?.as_str(),
476                )
477                .map_err(Error::from)?;
478        }
479
480        write_txn.commit().map_err(Error::from)?;
481
482        Ok(())
483    }
484
485    #[instrument(skip_all)]
486    async fn get_mint_quote(&self, quote_id: &str) -> Result<Option<MintQuote>, Self::Err> {
487        let read_txn = self.db.begin_read().map_err(Into::<Error>::into)?;
488        let table = read_txn
489            .open_table(MINT_QUOTES_TABLE)
490            .map_err(Error::from)?;
491
492        if let Some(mint_info) = table.get(quote_id).map_err(Error::from)? {
493            return Ok(serde_json::from_str(mint_info.value()).map_err(Error::from)?);
494        }
495
496        Ok(None)
497    }
498
499    #[instrument(skip_all)]
500    async fn get_mint_quotes(&self) -> Result<Vec<MintQuote>, Self::Err> {
501        let read_txn = self.db.begin_read().map_err(Into::<Error>::into)?;
502        let table = read_txn
503            .open_table(MINT_QUOTES_TABLE)
504            .map_err(Error::from)?;
505
506        Ok(table
507            .iter()
508            .map_err(Error::from)?
509            .flatten()
510            .flat_map(|(_id, quote)| serde_json::from_str(quote.value()))
511            .collect())
512    }
513
514    #[instrument(skip_all)]
515    async fn remove_mint_quote(&self, quote_id: &str) -> Result<(), Self::Err> {
516        let write_txn = self.db.begin_write().map_err(Error::from)?;
517
518        {
519            let mut table = write_txn
520                .open_table(MINT_QUOTES_TABLE)
521                .map_err(Error::from)?;
522            table.remove(quote_id).map_err(Error::from)?;
523        }
524
525        write_txn.commit().map_err(Error::from)?;
526
527        Ok(())
528    }
529
530    #[instrument(skip_all)]
531    async fn add_melt_quote(&self, quote: wallet::MeltQuote) -> Result<(), Self::Err> {
532        let write_txn = self.db.begin_write().map_err(Error::from)?;
533
534        {
535            let mut table = write_txn
536                .open_table(MELT_QUOTES_TABLE)
537                .map_err(Error::from)?;
538            table
539                .insert(
540                    quote.id.as_str(),
541                    serde_json::to_string(&quote).map_err(Error::from)?.as_str(),
542                )
543                .map_err(Error::from)?;
544        }
545
546        write_txn.commit().map_err(Error::from)?;
547
548        Ok(())
549    }
550
551    #[instrument(skip_all)]
552    async fn get_melt_quote(&self, quote_id: &str) -> Result<Option<wallet::MeltQuote>, Self::Err> {
553        let read_txn = self.db.begin_read().map_err(Error::from)?;
554        let table = read_txn
555            .open_table(MELT_QUOTES_TABLE)
556            .map_err(Error::from)?;
557
558        if let Some(mint_info) = table.get(quote_id).map_err(Error::from)? {
559            return Ok(serde_json::from_str(mint_info.value()).map_err(Error::from)?);
560        }
561
562        Ok(None)
563    }
564
565    #[instrument(skip_all)]
566    async fn get_melt_quotes(&self) -> Result<Vec<wallet::MeltQuote>, Self::Err> {
567        let read_txn = self.db.begin_read().map_err(Error::from)?;
568        let table = read_txn
569            .open_table(MELT_QUOTES_TABLE)
570            .map_err(Error::from)?;
571
572        Ok(table
573            .iter()
574            .map_err(Error::from)?
575            .flatten()
576            .flat_map(|(_id, quote)| serde_json::from_str(quote.value()))
577            .collect())
578    }
579
580    #[instrument(skip_all)]
581    async fn remove_melt_quote(&self, quote_id: &str) -> Result<(), Self::Err> {
582        let write_txn = self.db.begin_write().map_err(Error::from)?;
583
584        {
585            let mut table = write_txn
586                .open_table(MELT_QUOTES_TABLE)
587                .map_err(Error::from)?;
588            table.remove(quote_id).map_err(Error::from)?;
589        }
590
591        write_txn.commit().map_err(Error::from)?;
592
593        Ok(())
594    }
595
596    #[instrument(skip_all)]
597    async fn add_keys(&self, keyset: KeySet) -> Result<(), Self::Err> {
598        let write_txn = self.db.begin_write().map_err(Error::from)?;
599
600        keyset.verify_id()?;
601
602        let existing_keys;
603        let existing_u32;
604
605        {
606            let mut table = write_txn.open_table(MINT_KEYS_TABLE).map_err(Error::from)?;
607
608            existing_keys = table
609                .insert(
610                    keyset.id.to_string().as_str(),
611                    serde_json::to_string(&keyset.keys)
612                        .map_err(Error::from)?
613                        .as_str(),
614                )
615                .map_err(Error::from)?
616                .is_some();
617
618            let mut table = write_txn
619                .open_table(KEYSET_U32_MAPPING)
620                .map_err(Error::from)?;
621
622            let existing = table
623                .insert(u32::from(keyset.id), keyset.id.to_string().as_str())
624                .map_err(Error::from)?;
625
626            match existing {
627                None => existing_u32 = false,
628                Some(id) => {
629                    let id = Id::from_str(id.value())?;
630
631                    existing_u32 = id != keyset.id;
632                }
633            }
634        }
635
636        if existing_keys || existing_u32 {
637            tracing::warn!("Keys already exist for keyset id");
638            write_txn.abort().map_err(Error::from)?;
639
640            return Err(database::Error::Duplicate);
641        }
642
643        write_txn.commit().map_err(Error::from)?;
644
645        Ok(())
646    }
647
648    #[instrument(skip(self), fields(keyset_id = %keyset_id))]
649    async fn get_keys(&self, keyset_id: &Id) -> Result<Option<Keys>, Self::Err> {
650        let read_txn = self.db.begin_read().map_err(Error::from)?;
651        let table = read_txn.open_table(MINT_KEYS_TABLE).map_err(Error::from)?;
652
653        if let Some(mint_info) = table
654            .get(keyset_id.to_string().as_str())
655            .map_err(Error::from)?
656        {
657            return Ok(serde_json::from_str(mint_info.value()).map_err(Error::from)?);
658        }
659
660        Ok(None)
661    }
662
663    #[instrument(skip(self), fields(keyset_id = %keyset_id))]
664    async fn remove_keys(&self, keyset_id: &Id) -> Result<(), Self::Err> {
665        let write_txn = self.db.begin_write().map_err(Error::from)?;
666
667        {
668            let mut table = write_txn.open_table(MINT_KEYS_TABLE).map_err(Error::from)?;
669
670            table
671                .remove(keyset_id.to_string().as_str())
672                .map_err(Error::from)?;
673        }
674
675        write_txn.commit().map_err(Error::from)?;
676
677        Ok(())
678    }
679
680    #[instrument(skip(self, added, deleted_ys))]
681    async fn update_proofs(
682        &self,
683        added: Vec<ProofInfo>,
684        deleted_ys: Vec<PublicKey>,
685    ) -> Result<(), Self::Err> {
686        let write_txn = self.db.begin_write().map_err(Error::from)?;
687
688        {
689            let mut table = write_txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
690
691            for proof_info in added.iter() {
692                table
693                    .insert(
694                        proof_info.y.to_bytes().as_slice(),
695                        serde_json::to_string(&proof_info)
696                            .map_err(Error::from)?
697                            .as_str(),
698                    )
699                    .map_err(Error::from)?;
700            }
701
702            for y in deleted_ys.iter() {
703                table.remove(y.to_bytes().as_slice()).map_err(Error::from)?;
704            }
705        }
706        write_txn.commit().map_err(Error::from)?;
707
708        Ok(())
709    }
710
711    #[instrument(skip_all)]
712    async fn get_proofs(
713        &self,
714        mint_url: Option<MintUrl>,
715        unit: Option<CurrencyUnit>,
716        state: Option<Vec<State>>,
717        spending_conditions: Option<Vec<SpendingConditions>>,
718    ) -> Result<Vec<ProofInfo>, Self::Err> {
719        let read_txn = self.db.begin_read().map_err(Error::from)?;
720
721        let table = read_txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
722
723        let proofs: Vec<ProofInfo> = table
724            .iter()
725            .map_err(Error::from)?
726            .flatten()
727            .filter_map(|(_k, v)| {
728                let mut proof = None;
729
730                if let Ok(proof_info) = serde_json::from_str::<ProofInfo>(v.value()) {
731                    if proof_info.matches_conditions(&mint_url, &unit, &state, &spending_conditions)
732                    {
733                        proof = Some(proof_info)
734                    }
735                }
736
737                proof
738            })
739            .collect();
740
741        Ok(proofs)
742    }
743
744    async fn get_balance(
745        &self,
746        mint_url: Option<MintUrl>,
747        unit: Option<CurrencyUnit>,
748        state: Option<Vec<State>>,
749    ) -> Result<u64, database::Error> {
750        // For redb, we still need to fetch all proofs and sum them
751        // since redb doesn't have SQL aggregation
752        let proofs = self.get_proofs(mint_url, unit, state, None).await?;
753        Ok(proofs.iter().map(|p| u64::from(p.proof.amount)).sum())
754    }
755
756    async fn update_proofs_state(
757        &self,
758        ys: Vec<PublicKey>,
759        state: State,
760    ) -> Result<(), database::Error> {
761        let read_txn = self.db.begin_read().map_err(Error::from)?;
762        let table = read_txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
763
764        let write_txn = self.db.begin_write().map_err(Error::from)?;
765
766        for y in ys {
767            let y_slice = y.to_bytes();
768            let proof = table
769                .get(y_slice.as_slice())
770                .map_err(Error::from)?
771                .ok_or(Error::UnknownY)?;
772
773            let mut proof_info =
774                serde_json::from_str::<ProofInfo>(proof.value()).map_err(Error::from)?;
775
776            proof_info.state = state;
777
778            {
779                let mut table = write_txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
780                table
781                    .insert(
782                        y_slice.as_slice(),
783                        serde_json::to_string(&proof_info)
784                            .map_err(Error::from)?
785                            .as_str(),
786                    )
787                    .map_err(Error::from)?;
788            }
789        }
790
791        write_txn.commit().map_err(Error::from)?;
792
793        Ok(())
794    }
795
796    #[instrument(skip(self), fields(keyset_id = %keyset_id))]
797    async fn increment_keyset_counter(&self, keyset_id: &Id, count: u32) -> Result<u32, Self::Err> {
798        let write_txn = self.db.begin_write().map_err(Error::from)?;
799
800        let current_counter;
801        let new_counter;
802        {
803            let table = write_txn.open_table(KEYSET_COUNTER).map_err(Error::from)?;
804            let counter = table
805                .get(keyset_id.to_string().as_str())
806                .map_err(Error::from)?;
807
808            current_counter = match counter {
809                Some(c) => c.value(),
810                None => 0,
811            };
812
813            new_counter = current_counter + count;
814        }
815
816        {
817            let mut table = write_txn.open_table(KEYSET_COUNTER).map_err(Error::from)?;
818
819            table
820                .insert(keyset_id.to_string().as_str(), new_counter)
821                .map_err(Error::from)?;
822        }
823        write_txn.commit().map_err(Error::from)?;
824
825        Ok(new_counter)
826    }
827
828    #[instrument(skip(self))]
829    async fn add_transaction(&self, transaction: Transaction) -> Result<(), Self::Err> {
830        let id = transaction.id();
831
832        let write_txn = self.db.begin_write().map_err(Error::from)?;
833
834        {
835            let mut table = write_txn
836                .open_table(TRANSACTIONS_TABLE)
837                .map_err(Error::from)?;
838            table
839                .insert(
840                    id.as_slice(),
841                    serde_json::to_string(&transaction)
842                        .map_err(Error::from)?
843                        .as_str(),
844                )
845                .map_err(Error::from)?;
846        }
847
848        write_txn.commit().map_err(Error::from)?;
849
850        Ok(())
851    }
852
853    #[instrument(skip(self))]
854    async fn get_transaction(
855        &self,
856        transaction_id: TransactionId,
857    ) -> Result<Option<Transaction>, Self::Err> {
858        let read_txn = self.db.begin_read().map_err(Error::from)?;
859        let table = read_txn
860            .open_table(TRANSACTIONS_TABLE)
861            .map_err(Error::from)?;
862
863        if let Some(transaction) = table.get(transaction_id.as_slice()).map_err(Error::from)? {
864            return Ok(serde_json::from_str(transaction.value()).map_err(Error::from)?);
865        }
866
867        Ok(None)
868    }
869
870    #[instrument(skip(self))]
871    async fn list_transactions(
872        &self,
873        mint_url: Option<MintUrl>,
874        direction: Option<TransactionDirection>,
875        unit: Option<CurrencyUnit>,
876    ) -> Result<Vec<Transaction>, Self::Err> {
877        let read_txn = self.db.begin_read().map_err(Error::from)?;
878
879        let table = read_txn
880            .open_table(TRANSACTIONS_TABLE)
881            .map_err(Error::from)?;
882
883        let transactions: Vec<Transaction> = table
884            .iter()
885            .map_err(Error::from)?
886            .flatten()
887            .filter_map(|(_k, v)| {
888                let mut transaction = None;
889
890                if let Ok(tx) = serde_json::from_str::<Transaction>(v.value()) {
891                    if tx.matches_conditions(&mint_url, &direction, &unit) {
892                        transaction = Some(tx)
893                    }
894                }
895
896                transaction
897            })
898            .collect();
899
900        Ok(transactions)
901    }
902
903    #[instrument(skip(self))]
904    async fn remove_transaction(&self, transaction_id: TransactionId) -> Result<(), Self::Err> {
905        let write_txn = self.db.begin_write().map_err(Error::from)?;
906
907        {
908            let mut table = write_txn
909                .open_table(TRANSACTIONS_TABLE)
910                .map_err(Error::from)?;
911            table
912                .remove(transaction_id.as_slice())
913                .map_err(Error::from)?;
914        }
915
916        write_txn.commit().map_err(Error::from)?;
917
918        Ok(())
919    }
920}