Skip to main content

cdk_redb/wallet/
mod.rs

1//! Redb Wallet
2
3use std::cmp::Ordering;
4use std::collections::HashMap;
5use std::path::Path;
6use std::str::FromStr;
7use std::sync::Arc;
8
9use async_trait::async_trait;
10use cdk_common::bitcoin::bip32::DerivationPath;
11use cdk_common::database::{validate_kvstore_params, WalletDatabase};
12use cdk_common::mint_url::MintUrl;
13use cdk_common::nut00::KnownMethod;
14use cdk_common::util::unix_time;
15use cdk_common::wallet::{
16    self, MintQuote, ProofInfo, Transaction, TransactionDirection, TransactionId,
17};
18use cdk_common::{
19    database, Amount, CurrencyUnit, Id, KeySet, KeySetInfo, Keys, MintInfo, PaymentMethod,
20    PublicKey, SpendingConditions, State,
21};
22use redb::{Database, MultimapTableDefinition, ReadableDatabase, ReadableTable, TableDefinition};
23use tracing::instrument;
24
25use crate::error::Error;
26use crate::migrations::migrate_00_to_01;
27use crate::wallet::migrations::{
28    migrate_01_to_02, migrate_02_to_03, migrate_03_to_04, migrate_04_to_05,
29};
30
31mod migrations;
32
// Table definitions. Each comment gives the <key, value> layout of the table below it.

// <Mint_url, Info>
const MINTS_TABLE: TableDefinition<&str, &str> = TableDefinition::new("mints_table");
// <Mint_Url, Keyset_id>
const MINT_KEYSETS_TABLE: MultimapTableDefinition<&str, &[u8]> =
    MultimapTableDefinition::new("mint_keysets");
// <Keyset_id, KeysetInfo>
const KEYSETS_TABLE: TableDefinition<&[u8], &str> = TableDefinition::new("keysets");
// <Quote_id, quote>
const MINT_QUOTES_TABLE: TableDefinition<&str, &str> = TableDefinition::new("mint_quotes");
// <Quote_id, quote>
const MELT_QUOTES_TABLE: TableDefinition<&str, &str> = TableDefinition::new("melt_quotes");
// <Keyset_id as string, Keys (JSON)>
const MINT_KEYS_TABLE: TableDefinition<&str, &str> = TableDefinition::new("mint_keys");
// <Y, Proof Info>
const PROOFS_TABLE: TableDefinition<&[u8], &str> = TableDefinition::new("proofs");
// <Config key, value>; holds the `db_version` entry read and written by `new`
const CONFIG_TABLE: TableDefinition<&str, &str> = TableDefinition::new("config");
// <Keyset_id as string, counter>
const KEYSET_COUNTER: TableDefinition<&str, u32> = TableDefinition::new("keyset_counter");
// <Transaction_id, Transaction>
const TRANSACTIONS_TABLE: TableDefinition<&[u8], &str> = TableDefinition::new("transactions");
// <Saga_id, WalletSaga>
const SAGAS_TABLE: TableDefinition<&str, &str> = TableDefinition::new("wallet_sagas");

// <Pubkey, P2PKSigningKey>
const P2PK_SIGNING_KEYS_TABLE: TableDefinition<&[u8], &str> =
    TableDefinition::new("p2pk_signing_keys");

// <u32 form of the keyset id, Keyset_id as string>
const KEYSET_U32_MAPPING: TableDefinition<u32, &str> = TableDefinition::new("keyset_u32_mapping");
// <(primary_namespace, secondary_namespace, key), value>
const KV_STORE_TABLE: TableDefinition<(&str, &str, &str), &[u8]> = TableDefinition::new("kv_store");

// Schema version this code expects; stored as a string under `db_version` in the config table
const DATABASE_VERSION: u32 = 5;
63
64/// Wallet Redb Database
65#[derive(Debug, Clone)]
66pub struct WalletRedbDatabase {
67    db: Arc<Database>,
68}
69
70impl WalletRedbDatabase {
71    /// Create new [`WalletRedbDatabase`]
72    pub fn new(path: &Path) -> Result<Self, Error> {
73        {
74            // Check if parent directory exists before attempting to create database
75            if let Some(parent) = path.parent() {
76                if !parent.exists() {
77                    return Err(Error::Io(std::io::Error::new(
78                        std::io::ErrorKind::NotFound,
79                        format!("Parent directory does not exist: {parent:?}"),
80                    )));
81                }
82            }
83
84            let db = Arc::new(Database::create(path)?);
85
86            let db_version: Option<String>;
87            {
88                // Check database version
89                let read_txn = db.begin_read()?;
90                let table = read_txn.open_table(CONFIG_TABLE);
91
92                db_version = match table {
93                    Ok(table) => table.get("db_version")?.map(|v| v.value().to_string()),
94                    Err(_) => None,
95                };
96            }
97
98            match db_version {
99                Some(db_version) => {
100                    let mut current_file_version = u32::from_str(&db_version)?;
101                    tracing::info!("Current file version {}", current_file_version);
102
103                    match current_file_version.cmp(&DATABASE_VERSION) {
104                        Ordering::Less => {
105                            tracing::info!(
106                                "Database needs to be upgraded at {} current is {}",
107                                current_file_version,
108                                DATABASE_VERSION
109                            );
110                            if current_file_version == 0 {
111                                current_file_version = migrate_00_to_01(Arc::clone(&db))?;
112                            }
113
114                            if current_file_version == 1 {
115                                current_file_version = migrate_01_to_02(Arc::clone(&db))?;
116                            }
117
118                            if current_file_version == 2 {
119                                current_file_version = migrate_02_to_03(Arc::clone(&db))?;
120                            }
121
122                            if current_file_version == 3 {
123                                current_file_version = migrate_03_to_04(Arc::clone(&db))?;
124                            }
125
126                            if current_file_version == 4 {
127                                current_file_version = migrate_04_to_05(Arc::clone(&db))?;
128                            }
129
130                            if current_file_version != DATABASE_VERSION {
131                                tracing::warn!(
132                                    "Database upgrade did not complete at {} current is {}",
133                                    current_file_version,
134                                    DATABASE_VERSION
135                                );
136                                return Err(Error::UnknownDatabaseVersion);
137                            }
138
139                            let write_txn = db.begin_write()?;
140                            {
141                                let mut table = write_txn.open_table(CONFIG_TABLE)?;
142
143                                table
144                                    .insert("db_version", DATABASE_VERSION.to_string().as_str())?;
145                            }
146
147                            write_txn.commit()?;
148                        }
149                        Ordering::Equal => {
150                            tracing::info!("Database is at current version {}", DATABASE_VERSION);
151                        }
152                        Ordering::Greater => {
153                            tracing::warn!(
154                                "Database upgrade did not complete at {} current is {}",
155                                current_file_version,
156                                DATABASE_VERSION
157                            );
158                            return Err(Error::UnknownDatabaseVersion);
159                        }
160                    }
161                }
162                None => {
163                    let write_txn = db.begin_write()?;
164                    {
165                        let mut table = write_txn.open_table(CONFIG_TABLE)?;
166                        // Open all tables to init a new db
167                        let _ = write_txn.open_table(MINTS_TABLE)?;
168                        let _ = write_txn.open_multimap_table(MINT_KEYSETS_TABLE)?;
169                        let _ = write_txn.open_table(KEYSETS_TABLE)?;
170                        let _ = write_txn.open_table(MINT_QUOTES_TABLE)?;
171                        let _ = write_txn.open_table(MELT_QUOTES_TABLE)?;
172                        let _ = write_txn.open_table(MINT_KEYS_TABLE)?;
173                        let _ = write_txn.open_table(PROOFS_TABLE)?;
174                        let _ = write_txn.open_table(KEYSET_COUNTER)?;
175                        let _ = write_txn.open_table(TRANSACTIONS_TABLE)?;
176                        let _ = write_txn.open_table(KEYSET_U32_MAPPING)?;
177                        let _ = write_txn.open_table(KV_STORE_TABLE)?;
178                        let _ = write_txn.open_table(P2PK_SIGNING_KEYS_TABLE)?;
179                        table.insert("db_version", DATABASE_VERSION.to_string().as_str())?;
180                    }
181
182                    write_txn.commit()?;
183                }
184            }
185            drop(db);
186        }
187
188        // Check parent directory again for final database creation
189        if let Some(parent) = path.parent() {
190            if !parent.exists() {
191                return Err(Error::Io(std::io::Error::new(
192                    std::io::ErrorKind::NotFound,
193                    format!("Parent directory does not exist: {parent:?}"),
194                )));
195            }
196        }
197
198        let db = Database::create(path)?;
199
200        Ok(Self { db: Arc::new(db) })
201    }
202}
203
204#[async_trait]
205impl WalletDatabase<database::Error> for WalletRedbDatabase {
206    #[instrument(skip(self))]
207    async fn get_mint(&self, mint_url: MintUrl) -> Result<Option<MintInfo>, database::Error> {
208        let read_txn = self.db.begin_read().map_err(Into::<Error>::into)?;
209        let table = read_txn.open_table(MINTS_TABLE).map_err(Error::from)?;
210
211        if let Some(mint_info) = table
212            .get(mint_url.to_string().as_str())
213            .map_err(Error::from)?
214        {
215            return Ok(serde_json::from_str(mint_info.value()).map_err(Error::from)?);
216        }
217
218        Ok(None)
219    }
220
221    #[instrument(skip(self))]
222    async fn get_mints(&self) -> Result<HashMap<MintUrl, Option<MintInfo>>, database::Error> {
223        let read_txn = self.db.begin_read().map_err(Error::from)?;
224        let table = read_txn.open_table(MINTS_TABLE).map_err(Error::from)?;
225        let mints = table
226            .iter()
227            .map_err(Error::from)?
228            .flatten()
229            .filter_map(|(mint, mint_info)| {
230                MintUrl::from_str(mint.value())
231                    .ok()
232                    .map(|url| (url, serde_json::from_str(mint_info.value()).ok()))
233            })
234            .collect();
235
236        Ok(mints)
237    }
238
239    #[instrument(skip(self))]
240    async fn get_mint_keysets(
241        &self,
242        mint_url: MintUrl,
243    ) -> Result<Option<Vec<KeySetInfo>>, database::Error> {
244        let read_txn = self.db.begin_read().map_err(Into::<Error>::into)?;
245        let table = read_txn
246            .open_multimap_table(MINT_KEYSETS_TABLE)
247            .map_err(Error::from)?;
248
249        let keyset_ids = table
250            .get(mint_url.to_string().as_str())
251            .map_err(Error::from)?
252            .flatten()
253            .map(|k| Id::from_bytes(k.value()))
254            .collect::<Result<Vec<_>, _>>()?;
255
256        let mut keysets = vec![];
257
258        let keysets_t = read_txn.open_table(KEYSETS_TABLE).map_err(Error::from)?;
259
260        for keyset_id in keyset_ids {
261            if let Some(keyset) = keysets_t
262                .get(keyset_id.to_bytes().as_slice())
263                .map_err(Error::from)?
264            {
265                let keyset = serde_json::from_str(keyset.value()).map_err(Error::from)?;
266
267                keysets.push(keyset);
268            }
269        }
270
271        match keysets.is_empty() {
272            true => Ok(None),
273            false => Ok(Some(keysets)),
274        }
275    }
276
277    #[instrument(skip(self), fields(keyset_id = %keyset_id))]
278    async fn get_keyset_by_id(
279        &self,
280        keyset_id: &Id,
281    ) -> Result<Option<KeySetInfo>, database::Error> {
282        let read_txn = self.db.begin_read().map_err(Into::<Error>::into)?;
283        let table = read_txn.open_table(KEYSETS_TABLE).map_err(Error::from)?;
284
285        match table
286            .get(keyset_id.to_bytes().as_slice())
287            .map_err(Error::from)?
288        {
289            Some(keyset) => {
290                let keyset: KeySetInfo =
291                    serde_json::from_str(keyset.value()).map_err(Error::from)?;
292
293                Ok(Some(keyset))
294            }
295            None => Ok(None),
296        }
297    }
298
299    #[instrument(skip_all)]
300    async fn get_mint_quote(&self, quote_id: &str) -> Result<Option<MintQuote>, database::Error> {
301        let read_txn = self.db.begin_read().map_err(Into::<Error>::into)?;
302        let table = read_txn
303            .open_table(MINT_QUOTES_TABLE)
304            .map_err(Error::from)?;
305
306        if let Some(mint_info) = table.get(quote_id).map_err(Error::from)? {
307            return Ok(serde_json::from_str(mint_info.value()).map_err(Error::from)?);
308        }
309
310        Ok(None)
311    }
312
313    #[instrument(skip_all)]
314    async fn get_mint_quotes(&self) -> Result<Vec<MintQuote>, database::Error> {
315        let read_txn = self.db.begin_read().map_err(Into::<Error>::into)?;
316        let table = read_txn
317            .open_table(MINT_QUOTES_TABLE)
318            .map_err(Error::from)?;
319
320        Ok(table
321            .iter()
322            .map_err(Error::from)?
323            .flatten()
324            .flat_map(|(_id, quote)| serde_json::from_str(quote.value()))
325            .collect())
326    }
327
328    async fn get_unissued_mint_quotes(&self) -> Result<Vec<MintQuote>, database::Error> {
329        let read_txn = self.db.begin_read().map_err(Into::<Error>::into)?;
330        let table = read_txn
331            .open_table(MINT_QUOTES_TABLE)
332            .map_err(Error::from)?;
333
334        Ok(table
335            .iter()
336            .map_err(Error::from)?
337            .flatten()
338            .flat_map(|(_id, quote)| serde_json::from_str::<MintQuote>(quote.value()).ok())
339            .filter(|quote| {
340                quote.amount_issued == Amount::ZERO
341                    || quote.payment_method == PaymentMethod::Known(KnownMethod::Bolt12)
342            })
343            .collect())
344    }
345
346    #[instrument(skip_all)]
347    async fn get_melt_quote(
348        &self,
349        quote_id: &str,
350    ) -> Result<Option<wallet::MeltQuote>, database::Error> {
351        let read_txn = self.db.begin_read().map_err(Error::from)?;
352        let table = read_txn
353            .open_table(MELT_QUOTES_TABLE)
354            .map_err(Error::from)?;
355
356        if let Some(mint_info) = table.get(quote_id).map_err(Error::from)? {
357            return Ok(serde_json::from_str(mint_info.value()).map_err(Error::from)?);
358        }
359
360        Ok(None)
361    }
362
363    #[instrument(skip_all)]
364    async fn get_melt_quotes(&self) -> Result<Vec<wallet::MeltQuote>, database::Error> {
365        let read_txn = self.db.begin_read().map_err(Error::from)?;
366        let table = read_txn
367            .open_table(MELT_QUOTES_TABLE)
368            .map_err(Error::from)?;
369
370        Ok(table
371            .iter()
372            .map_err(Error::from)?
373            .flatten()
374            .flat_map(|(_id, quote)| serde_json::from_str(quote.value()))
375            .collect())
376    }
377
378    #[instrument(skip(self), fields(keyset_id = %keyset_id))]
379    async fn get_keys(&self, keyset_id: &Id) -> Result<Option<Keys>, database::Error> {
380        let read_txn = self.db.begin_read().map_err(Error::from)?;
381        let table = read_txn.open_table(MINT_KEYS_TABLE).map_err(Error::from)?;
382
383        if let Some(mint_info) = table
384            .get(keyset_id.to_string().as_str())
385            .map_err(Error::from)?
386        {
387            return Ok(serde_json::from_str(mint_info.value()).map_err(Error::from)?);
388        }
389
390        Ok(None)
391    }
392
393    #[instrument(skip_all)]
394    async fn get_proofs(
395        &self,
396        mint_url: Option<MintUrl>,
397        unit: Option<CurrencyUnit>,
398        state: Option<Vec<State>>,
399        spending_conditions: Option<Vec<SpendingConditions>>,
400    ) -> Result<Vec<ProofInfo>, database::Error> {
401        let read_txn = self.db.begin_read().map_err(Error::from)?;
402
403        let table = read_txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
404
405        let proofs: Vec<ProofInfo> = table
406            .iter()
407            .map_err(Error::from)?
408            .flatten()
409            .filter_map(|(_k, v)| {
410                let mut proof = None;
411
412                if let Ok(proof_info) = serde_json::from_str::<ProofInfo>(v.value()) {
413                    if proof_info.matches_conditions(&mint_url, &unit, &state, &spending_conditions)
414                    {
415                        proof = Some(proof_info)
416                    }
417                }
418
419                proof
420            })
421            .collect();
422
423        Ok(proofs)
424    }
425
426    #[instrument(skip(self, ys))]
427    async fn get_proofs_by_ys(
428        &self,
429        ys: Vec<PublicKey>,
430    ) -> Result<Vec<ProofInfo>, database::Error> {
431        if ys.is_empty() {
432            return Ok(Vec::new());
433        }
434
435        let read_txn = self.db.begin_read().map_err(Error::from)?;
436        let table = read_txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
437
438        let mut proofs = Vec::new();
439
440        for y in ys {
441            if let Some(proof) = table.get(y.to_bytes().as_slice()).map_err(Error::from)? {
442                let proof_info =
443                    serde_json::from_str::<ProofInfo>(proof.value()).map_err(Error::from)?;
444                proofs.push(proof_info);
445            }
446        }
447
448        Ok(proofs)
449    }
450
451    async fn get_balance(
452        &self,
453        mint_url: Option<MintUrl>,
454        unit: Option<CurrencyUnit>,
455        state: Option<Vec<State>>,
456    ) -> Result<u64, database::Error> {
457        // For redb, we still need to fetch all proofs and sum them
458        // since redb doesn't have SQL aggregation
459        let proofs = self.get_proofs(mint_url, unit, state, None).await?;
460        Ok(proofs.iter().map(|p| u64::from(p.proof.amount)).sum())
461    }
462
463    #[instrument(skip(self))]
464    async fn get_transaction(
465        &self,
466        transaction_id: TransactionId,
467    ) -> Result<Option<Transaction>, database::Error> {
468        let read_txn = self.db.begin_read().map_err(Error::from)?;
469        let table = read_txn
470            .open_table(TRANSACTIONS_TABLE)
471            .map_err(Error::from)?;
472
473        if let Some(transaction) = table.get(transaction_id.as_slice()).map_err(Error::from)? {
474            return Ok(serde_json::from_str(transaction.value()).map_err(Error::from)?);
475        }
476
477        Ok(None)
478    }
479
480    #[instrument(skip(self))]
481    async fn list_transactions(
482        &self,
483        mint_url: Option<MintUrl>,
484        direction: Option<TransactionDirection>,
485        unit: Option<CurrencyUnit>,
486    ) -> Result<Vec<Transaction>, database::Error> {
487        let read_txn = self.db.begin_read().map_err(Error::from)?;
488
489        let table = read_txn
490            .open_table(TRANSACTIONS_TABLE)
491            .map_err(Error::from)?;
492
493        let transactions: Vec<Transaction> = table
494            .iter()
495            .map_err(Error::from)?
496            .flatten()
497            .filter_map(|(_k, v)| {
498                let mut transaction = None;
499
500                if let Ok(tx) = serde_json::from_str::<Transaction>(v.value()) {
501                    if tx.matches_conditions(&mint_url, &direction, &unit) {
502                        transaction = Some(tx)
503                    }
504                }
505
506                transaction
507            })
508            .collect();
509
510        Ok(transactions)
511    }
512
513    #[instrument(skip(self, added, removed_ys))]
514    async fn update_proofs(
515        &self,
516        added: Vec<ProofInfo>,
517        removed_ys: Vec<PublicKey>,
518    ) -> Result<(), database::Error> {
519        let write_txn = self.db.begin_write().map_err(Error::from)?;
520        {
521            let mut table = write_txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
522
523            for proof_info in added.iter() {
524                table
525                    .insert(
526                        proof_info.y.to_bytes().as_slice(),
527                        serde_json::to_string(&proof_info)
528                            .map_err(Error::from)?
529                            .as_str(),
530                    )
531                    .map_err(Error::from)?;
532            }
533
534            for y in removed_ys.iter() {
535                table.remove(y.to_bytes().as_slice()).map_err(Error::from)?;
536            }
537        }
538        write_txn.commit().map_err(Error::from)?;
539        Ok(())
540    }
541
542    async fn update_proofs_state(
543        &self,
544        ys: Vec<PublicKey>,
545        state: State,
546    ) -> Result<(), database::Error> {
547        let write_txn = self.db.begin_write().map_err(Error::from)?;
548        {
549            let mut table = write_txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
550
551            for y in ys {
552                let y_slice = y.to_bytes();
553                let proof = table
554                    .get(y_slice.as_slice())
555                    .map_err(Error::from)?
556                    .ok_or(Error::UnknownY)?;
557
558                let mut proof_info =
559                    serde_json::from_str::<ProofInfo>(proof.value()).map_err(Error::from)?;
560                drop(proof);
561
562                proof_info.state = state;
563
564                table
565                    .insert(
566                        y_slice.as_slice(),
567                        serde_json::to_string(&proof_info)
568                            .map_err(Error::from)?
569                            .as_str(),
570                    )
571                    .map_err(Error::from)?;
572            }
573        }
574        write_txn.commit().map_err(Error::from)?;
575        Ok(())
576    }
577
578    #[instrument(skip(self))]
579    async fn add_transaction(&self, transaction: Transaction) -> Result<(), database::Error> {
580        let id = transaction.id();
581        let write_txn = self.db.begin_write().map_err(Error::from)?;
582        {
583            let mut table = write_txn
584                .open_table(TRANSACTIONS_TABLE)
585                .map_err(Error::from)?;
586            table
587                .insert(
588                    id.as_slice(),
589                    serde_json::to_string(&transaction)
590                        .map_err(Error::from)?
591                        .as_str(),
592                )
593                .map_err(Error::from)?;
594        }
595        write_txn.commit().map_err(Error::from)?;
596        Ok(())
597    }
598
599    #[instrument(skip(self))]
600    async fn update_mint_url(
601        &self,
602        old_mint_url: MintUrl,
603        new_mint_url: MintUrl,
604    ) -> Result<(), database::Error> {
605        let write_txn = self.db.begin_write().map_err(Error::from)?;
606
607        // Update proofs table
608        {
609            let read_table = write_txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
610            let proofs: Vec<ProofInfo> = read_table
611                .iter()
612                .map_err(Error::from)?
613                .flatten()
614                .filter_map(|(_k, v)| {
615                    let proof_info = serde_json::from_str::<ProofInfo>(v.value()).ok()?;
616                    if proof_info.mint_url == old_mint_url {
617                        Some(proof_info)
618                    } else {
619                        None
620                    }
621                })
622                .collect();
623            drop(read_table);
624
625            if !proofs.is_empty() {
626                let mut write_table = write_txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
627                for mut proof_info in proofs {
628                    proof_info.mint_url = new_mint_url.clone();
629                    write_table
630                        .insert(
631                            proof_info.y.to_bytes().as_slice(),
632                            serde_json::to_string(&proof_info)
633                                .map_err(Error::from)?
634                                .as_str(),
635                        )
636                        .map_err(Error::from)?;
637                }
638            }
639        }
640
641        // Update mint quotes
642        {
643            let mut table = write_txn
644                .open_table(MINT_QUOTES_TABLE)
645                .map_err(Error::from)?;
646
647            let unix_time = unix_time();
648
649            let quotes: Vec<MintQuote> = table
650                .iter()
651                .map_err(Error::from)?
652                .flatten()
653                .filter_map(|(_, quote)| {
654                    let mut q: MintQuote = serde_json::from_str(quote.value()).ok()?;
655                    if q.mint_url == old_mint_url && q.expiry >= unix_time {
656                        q.mint_url = new_mint_url.clone();
657                        Some(q)
658                    } else {
659                        None
660                    }
661                })
662                .collect();
663
664            for quote in quotes {
665                table
666                    .insert(
667                        quote.id.as_str(),
668                        serde_json::to_string(&quote).map_err(Error::from)?.as_str(),
669                    )
670                    .map_err(Error::from)?;
671            }
672        }
673
674        write_txn.commit().map_err(Error::from)?;
675        Ok(())
676    }
677
678    #[instrument(skip(self), fields(keyset_id = %keyset_id))]
679    async fn increment_keyset_counter(
680        &self,
681        keyset_id: &Id,
682        count: u32,
683    ) -> Result<u32, database::Error> {
684        let write_txn = self.db.begin_write().map_err(Error::from)?;
685        let new_counter = {
686            let mut table = write_txn.open_table(KEYSET_COUNTER).map_err(Error::from)?;
687            let current_counter = table
688                .get(keyset_id.to_string().as_str())
689                .map_err(Error::from)?
690                .map(|x| x.value())
691                .unwrap_or_default();
692
693            let new_counter = current_counter
694                .checked_add(count)
695                .ok_or(database::Error::AmountOverflow)?;
696
697            table
698                .insert(keyset_id.to_string().as_str(), new_counter)
699                .map_err(Error::from)?;
700
701            new_counter
702        };
703        write_txn.commit().map_err(Error::from)?;
704        Ok(new_counter)
705    }
706
707    #[instrument(skip(self))]
708    async fn add_mint(
709        &self,
710        mint_url: MintUrl,
711        mint_info: Option<MintInfo>,
712    ) -> Result<(), database::Error> {
713        let write_txn = self.db.begin_write().map_err(Error::from)?;
714        {
715            let mut table = write_txn.open_table(MINTS_TABLE).map_err(Error::from)?;
716            table
717                .insert(
718                    mint_url.to_string().as_str(),
719                    serde_json::to_string(&mint_info)
720                        .map_err(Error::from)?
721                        .as_str(),
722                )
723                .map_err(Error::from)?;
724        }
725        write_txn.commit().map_err(Error::from)?;
726        Ok(())
727    }
728
729    #[instrument(skip(self))]
730    async fn remove_mint(&self, mint_url: MintUrl) -> Result<(), database::Error> {
731        let write_txn = self.db.begin_write().map_err(Error::from)?;
732        {
733            let mut table = write_txn.open_table(MINTS_TABLE).map_err(Error::from)?;
734            table
735                .remove(mint_url.to_string().as_str())
736                .map_err(Error::from)?;
737        }
738        write_txn.commit().map_err(Error::from)?;
739        Ok(())
740    }
741
742    #[instrument(skip(self))]
743    async fn add_mint_keysets(
744        &self,
745        mint_url: MintUrl,
746        keysets: Vec<KeySetInfo>,
747    ) -> Result<(), database::Error> {
748        let write_txn = self.db.begin_write().map_err(Error::from)?;
749        {
750            let mut table = write_txn
751                .open_multimap_table(MINT_KEYSETS_TABLE)
752                .map_err(Error::from)?;
753            let mut keysets_table = write_txn.open_table(KEYSETS_TABLE).map_err(Error::from)?;
754            let mut u32_table = write_txn
755                .open_table(KEYSET_U32_MAPPING)
756                .map_err(Error::from)?;
757
758            let mut existing_u32 = false;
759
760            for keyset in keysets {
761                // Check if keyset already exists
762                let existing_keyset = {
763                    let existing_keyset = keysets_table
764                        .get(keyset.id.to_bytes().as_slice())
765                        .map_err(Error::from)?;
766
767                    existing_keyset.map(|r| r.value().to_string())
768                };
769
770                let existing = u32_table
771                    .insert(u32::from(keyset.id), keyset.id.to_string().as_str())
772                    .map_err(Error::from)?;
773
774                match existing {
775                    None => existing_u32 = false,
776                    Some(id) => {
777                        let id = Id::from_str(id.value())?;
778
779                        if id == keyset.id {
780                            existing_u32 = false;
781                        } else {
782                            existing_u32 = true;
783                            break;
784                        }
785                    }
786                }
787
788                let keyset = if let Some(existing_keyset) = existing_keyset {
789                    let mut existing_keyset: KeySetInfo = serde_json::from_str(&existing_keyset)?;
790
791                    existing_keyset.active = keyset.active;
792                    existing_keyset.input_fee_ppk = keyset.input_fee_ppk;
793
794                    existing_keyset
795                } else {
796                    table
797                        .insert(
798                            mint_url.to_string().as_str(),
799                            keyset.id.to_bytes().as_slice(),
800                        )
801                        .map_err(Error::from)?;
802
803                    keyset
804                };
805
806                keysets_table
807                    .insert(
808                        keyset.id.to_bytes().as_slice(),
809                        serde_json::to_string(&keyset)
810                            .map_err(Error::from)?
811                            .as_str(),
812                    )
813                    .map_err(Error::from)?;
814            }
815
816            if existing_u32 {
817                tracing::warn!("Keyset already exists for keyset id");
818                return Err(database::Error::Duplicate);
819            }
820        }
821        write_txn.commit().map_err(Error::from)?;
822        Ok(())
823    }
824
825    #[instrument(skip_all)]
826    async fn add_mint_quote(&self, quote: MintQuote) -> Result<(), database::Error> {
827        let write_txn = self.db.begin_write().map_err(Error::from)?;
828        {
829            let mut table = write_txn
830                .open_table(MINT_QUOTES_TABLE)
831                .map_err(Error::from)?;
832
833            // Check for existing quote and version match
834            let existing_quote_json = table
835                .get(quote.id.as_str())
836                .map_err(Error::from)?
837                .map(|v| v.value().to_string());
838
839            let mut quote_to_save = quote.clone();
840
841            if let Some(json) = existing_quote_json {
842                let existing_quote: MintQuote = serde_json::from_str(&json).map_err(Error::from)?;
843
844                if existing_quote.version != quote.version {
845                    return Err(database::Error::ConcurrentUpdate);
846                }
847
848                // Increment version for update
849                quote_to_save.version = quote.version.wrapping_add(1);
850            }
851
852            table
853                .insert(
854                    quote_to_save.id.as_str(),
855                    serde_json::to_string(&quote_to_save)
856                        .map_err(Error::from)?
857                        .as_str(),
858                )
859                .map_err(Error::from)?;
860        }
861        write_txn.commit().map_err(Error::from)?;
862        Ok(())
863    }
864
865    #[instrument(skip_all)]
866    async fn remove_mint_quote(&self, quote_id: &str) -> Result<(), database::Error> {
867        let write_txn = self.db.begin_write().map_err(Error::from)?;
868        {
869            let mut table = write_txn
870                .open_table(MINT_QUOTES_TABLE)
871                .map_err(Error::from)?;
872            table.remove(quote_id).map_err(Error::from)?;
873        }
874        write_txn.commit().map_err(Error::from)?;
875        Ok(())
876    }
877
878    #[instrument(skip_all)]
879    async fn add_melt_quote(&self, quote: wallet::MeltQuote) -> Result<(), database::Error> {
880        let write_txn = self.db.begin_write().map_err(Error::from)?;
881        {
882            let mut table = write_txn
883                .open_table(MELT_QUOTES_TABLE)
884                .map_err(Error::from)?;
885
886            // Check for existing quote and version match
887            let existing_quote_json = table
888                .get(quote.id.as_str())
889                .map_err(Error::from)?
890                .map(|v| v.value().to_string());
891
892            let mut quote_to_save = quote.clone();
893
894            if let Some(json) = existing_quote_json {
895                let existing_quote: wallet::MeltQuote =
896                    serde_json::from_str(&json).map_err(Error::from)?;
897
898                if existing_quote.version != quote.version {
899                    return Err(database::Error::ConcurrentUpdate);
900                }
901
902                // Increment version for update
903                quote_to_save.version = quote.version.wrapping_add(1);
904            }
905
906            table
907                .insert(
908                    quote_to_save.id.as_str(),
909                    serde_json::to_string(&quote_to_save)
910                        .map_err(Error::from)?
911                        .as_str(),
912                )
913                .map_err(Error::from)?;
914        }
915        write_txn.commit().map_err(Error::from)?;
916        Ok(())
917    }
918
919    #[instrument(skip_all)]
920    async fn remove_melt_quote(&self, quote_id: &str) -> Result<(), database::Error> {
921        let write_txn = self.db.begin_write().map_err(Error::from)?;
922        {
923            let mut table = write_txn
924                .open_table(MELT_QUOTES_TABLE)
925                .map_err(Error::from)?;
926            table.remove(quote_id).map_err(Error::from)?;
927        }
928        write_txn.commit().map_err(Error::from)?;
929        Ok(())
930    }
931
932    #[instrument(skip_all)]
933    async fn add_keys(&self, keyset: KeySet) -> Result<(), database::Error> {
934        let write_txn = self.db.begin_write().map_err(Error::from)?;
935
936        keyset.verify_id()?;
937
938        {
939            let mut table = write_txn.open_table(MINT_KEYS_TABLE).map_err(Error::from)?;
940
941            let existing_keys = table
942                .insert(
943                    keyset.id.to_string().as_str(),
944                    serde_json::to_string(&keyset.keys)
945                        .map_err(Error::from)?
946                        .as_str(),
947                )
948                .map_err(Error::from)?
949                .is_some();
950
951            let mut table = write_txn
952                .open_table(KEYSET_U32_MAPPING)
953                .map_err(Error::from)?;
954
955            let existing = table
956                .insert(u32::from(keyset.id), keyset.id.to_string().as_str())
957                .map_err(Error::from)?;
958
959            let existing_u32 = match existing {
960                None => false,
961                Some(id) => {
962                    let id = Id::from_str(id.value())?;
963                    id != keyset.id
964                }
965            };
966
967            if existing_keys || existing_u32 {
968                tracing::warn!("Keys already exist for keyset id");
969                return Err(database::Error::Duplicate);
970            }
971        }
972        write_txn.commit().map_err(Error::from)?;
973        Ok(())
974    }
975
976    #[instrument(skip(self), fields(keyset_id = %keyset_id))]
977    async fn remove_keys(&self, keyset_id: &Id) -> Result<(), database::Error> {
978        let write_txn = self.db.begin_write().map_err(Error::from)?;
979        {
980            let mut table = write_txn.open_table(MINT_KEYS_TABLE).map_err(Error::from)?;
981
982            table
983                .remove(keyset_id.to_string().as_str())
984                .map_err(Error::from)?;
985        }
986        write_txn.commit().map_err(Error::from)?;
987        Ok(())
988    }
989
990    #[instrument(skip(self))]
991    async fn remove_transaction(
992        &self,
993        transaction_id: TransactionId,
994    ) -> Result<(), database::Error> {
995        let write_txn = self.db.begin_write().map_err(Error::from)?;
996        {
997            let mut table = write_txn
998                .open_table(TRANSACTIONS_TABLE)
999                .map_err(Error::from)?;
1000            table
1001                .remove(transaction_id.as_slice())
1002                .map_err(Error::from)?;
1003        }
1004        write_txn.commit().map_err(Error::from)?;
1005        Ok(())
1006    }
1007
1008    #[instrument(skip(self))]
1009    async fn add_saga(&self, saga: wallet::WalletSaga) -> Result<(), database::Error> {
1010        let saga_json = serde_json::to_string(&saga).map_err(Error::from)?;
1011        let id_str = saga.id.to_string();
1012
1013        let write_txn = self.db.begin_write().map_err(Error::from)?;
1014        {
1015            let mut table = write_txn.open_table(SAGAS_TABLE).map_err(Error::from)?;
1016            table
1017                .insert(id_str.as_str(), saga_json.as_str())
1018                .map_err(Error::from)?;
1019        }
1020        write_txn.commit().map_err(Error::from)?;
1021        Ok(())
1022    }
1023
1024    #[instrument(skip(self))]
1025    async fn get_saga(
1026        &self,
1027        id: &uuid::Uuid,
1028    ) -> Result<Option<wallet::WalletSaga>, database::Error> {
1029        let read_txn = self.db.begin_read().map_err(Error::from)?;
1030        let table = read_txn.open_table(SAGAS_TABLE).map_err(Error::from)?;
1031        let id_str = id.to_string();
1032
1033        let result = table
1034            .get(id_str.as_str())
1035            .map_err(Error::from)?
1036            .map(|saga| serde_json::from_str(saga.value()).map_err(Error::from))
1037            .transpose()?;
1038
1039        Ok(result)
1040    }
1041
1042    #[instrument(skip(self))]
1043    async fn update_saga(&self, saga: wallet::WalletSaga) -> Result<bool, database::Error> {
1044        let id_str = saga.id.to_string();
1045
1046        // The saga.version has already been incremented by the caller, so we check
1047        // for (saga.version - 1) as the expected version in the database.
1048        let expected_version = saga.version.saturating_sub(1);
1049
1050        let write_txn = self.db.begin_write().map_err(Error::from)?;
1051        let updated = {
1052            let mut table = write_txn.open_table(SAGAS_TABLE).map_err(Error::from)?;
1053
1054            // Read existing saga to check version (optimistic locking)
1055            let existing_saga_json = table
1056                .get(id_str.as_str())
1057                .map_err(Error::from)?
1058                .map(|v| v.value().to_string());
1059
1060            match existing_saga_json {
1061                Some(json) => {
1062                    let existing_saga: wallet::WalletSaga =
1063                        serde_json::from_str(&json).map_err(Error::from)?;
1064
1065                    // Check if version matches expected version
1066                    if existing_saga.version != expected_version {
1067                        // Version mismatch - another instance modified it
1068                        false
1069                    } else {
1070                        // Version matches - safe to update
1071                        let saga_json = serde_json::to_string(&saga).map_err(Error::from)?;
1072                        table
1073                            .insert(id_str.as_str(), saga_json.as_str())
1074                            .map_err(Error::from)?;
1075                        true
1076                    }
1077                }
1078                None => {
1079                    // Saga doesn't exist - can't update
1080                    false
1081                }
1082            }
1083        };
1084        write_txn.commit().map_err(Error::from)?;
1085        Ok(updated)
1086    }
1087
1088    #[instrument(skip(self))]
1089    async fn delete_saga(&self, id: &uuid::Uuid) -> Result<(), database::Error> {
1090        let write_txn = self.db.begin_write().map_err(Error::from)?;
1091        let id_str = id.to_string();
1092        {
1093            let mut table = write_txn.open_table(SAGAS_TABLE).map_err(Error::from)?;
1094            table.remove(id_str.as_str()).map_err(Error::from)?;
1095        }
1096        write_txn.commit().map_err(Error::from)?;
1097        Ok(())
1098    }
1099
1100    #[instrument(skip(self))]
1101    async fn get_incomplete_sagas(&self) -> Result<Vec<wallet::WalletSaga>, database::Error> {
1102        let read_txn = self.db.begin_read().map_err(Error::from)?;
1103        let table = read_txn.open_table(SAGAS_TABLE).map_err(Error::from)?;
1104
1105        let mut sagas: Vec<wallet::WalletSaga> = table
1106            .iter()
1107            .map_err(Error::from)?
1108            .flatten()
1109            .filter_map(|(_, saga_json)| {
1110                serde_json::from_str::<wallet::WalletSaga>(saga_json.value()).ok()
1111            })
1112            .collect();
1113
1114        // Sort by created_at ascending (oldest first)
1115        sagas.sort_by_key(|saga| saga.created_at);
1116
1117        Ok(sagas)
1118    }
1119
1120    #[instrument(skip(self))]
1121    async fn reserve_proofs(
1122        &self,
1123        ys: Vec<PublicKey>,
1124        operation_id: &uuid::Uuid,
1125    ) -> Result<(), database::Error> {
1126        let write_txn = self.db.begin_write().map_err(Error::from)?;
1127
1128        {
1129            let mut table = write_txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
1130
1131            for y in ys {
1132                let y_bytes = y.to_bytes();
1133
1134                // Read the proof and convert to string immediately
1135                let proof_json_str = {
1136                    let proof_json_opt = table.get(y_bytes.as_slice()).map_err(Error::from)?;
1137                    proof_json_opt.map(|proof_json| proof_json.value().to_string())
1138                };
1139
1140                let Some(proof_json_str) = proof_json_str else {
1141                    return Err(database::Error::ProofNotUnspent);
1142                };
1143
1144                let mut proof: ProofInfo =
1145                    serde_json::from_str(&proof_json_str).map_err(Error::from)?;
1146
1147                if proof.state != State::Unspent {
1148                    return Err(database::Error::ProofNotUnspent);
1149                }
1150
1151                proof.state = State::Reserved;
1152                proof.used_by_operation = Some(*operation_id);
1153
1154                let updated_json = serde_json::to_string(&proof).map_err(Error::from)?;
1155                table
1156                    .insert(y_bytes.as_slice(), updated_json.as_str())
1157                    .map_err(Error::from)?;
1158            }
1159        }
1160
1161        write_txn.commit().map_err(Error::from)?;
1162        Ok(())
1163    }
1164
1165    #[instrument(skip(self))]
1166    async fn release_proofs(&self, operation_id: &uuid::Uuid) -> Result<(), database::Error> {
1167        let write_txn = self.db.begin_write().map_err(Error::from)?;
1168
1169        {
1170            let mut table = write_txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
1171
1172            // Collect all proofs first to avoid borrowing issues
1173            let all_proofs: Vec<(Vec<u8>, ProofInfo)> = table
1174                .iter()
1175                .map_err(Error::from)?
1176                .flatten()
1177                .filter_map(|(y, proof_json)| {
1178                    let proof: ProofInfo = serde_json::from_str(proof_json.value()).ok()?;
1179                    Some((y.value().to_vec(), proof))
1180                })
1181                .collect();
1182
1183            // Now update proofs that match the operation_id
1184            for (y_bytes, mut proof) in all_proofs {
1185                if proof.used_by_operation == Some(*operation_id) {
1186                    proof.state = State::Unspent;
1187                    proof.used_by_operation = None;
1188
1189                    let updated_json = serde_json::to_string(&proof).map_err(Error::from)?;
1190                    table
1191                        .insert(y_bytes.as_slice(), updated_json.as_str())
1192                        .map_err(Error::from)?;
1193                }
1194            }
1195        }
1196
1197        write_txn.commit().map_err(Error::from)?;
1198        Ok(())
1199    }
1200
1201    #[instrument(skip(self))]
1202    async fn get_reserved_proofs(
1203        &self,
1204        operation_id: &uuid::Uuid,
1205    ) -> Result<Vec<ProofInfo>, database::Error> {
1206        let read_txn = self.db.begin_read().map_err(Error::from)?;
1207        let table = read_txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
1208
1209        let proofs: Vec<ProofInfo> = table
1210            .iter()
1211            .map_err(Error::from)?
1212            .flatten()
1213            .filter_map(|(_, proof_json)| {
1214                serde_json::from_str::<ProofInfo>(proof_json.value()).ok()
1215            })
1216            .filter(|proof| proof.used_by_operation == Some(*operation_id))
1217            .collect();
1218
1219        Ok(proofs)
1220    }
1221
1222    #[instrument(skip(self))]
1223    async fn reserve_melt_quote(
1224        &self,
1225        quote_id: &str,
1226        operation_id: &uuid::Uuid,
1227    ) -> Result<(), database::Error> {
1228        let write_txn = self.db.begin_write().map_err(Error::from)?;
1229        let operation_id_str = operation_id.to_string();
1230
1231        {
1232            let mut table = write_txn
1233                .open_table(MELT_QUOTES_TABLE)
1234                .map_err(Error::from)?;
1235
1236            // Read existing quote
1237            let quote_json = table
1238                .get(quote_id)
1239                .map_err(Error::from)?
1240                .map(|v| v.value().to_string());
1241
1242            match quote_json {
1243                Some(json) => {
1244                    let mut quote: wallet::MeltQuote =
1245                        serde_json::from_str(&json).map_err(Error::from)?;
1246
1247                    // Check if already reserved by another operation
1248                    if quote.used_by_operation.is_some() {
1249                        return Err(database::Error::QuoteAlreadyInUse);
1250                    }
1251
1252                    // Reserve the quote
1253                    quote.used_by_operation = Some(operation_id_str);
1254                    let updated_json = serde_json::to_string(&quote).map_err(Error::from)?;
1255                    table
1256                        .insert(quote_id, updated_json.as_str())
1257                        .map_err(Error::from)?;
1258                }
1259                None => {
1260                    return Err(database::Error::UnknownQuote);
1261                }
1262            }
1263        }
1264
1265        write_txn.commit().map_err(Error::from)?;
1266        Ok(())
1267    }
1268
1269    #[instrument(skip(self))]
1270    async fn release_melt_quote(&self, operation_id: &uuid::Uuid) -> Result<(), database::Error> {
1271        let write_txn = self.db.begin_write().map_err(Error::from)?;
1272        let operation_id_str = operation_id.to_string();
1273
1274        {
1275            let mut table = write_txn
1276                .open_table(MELT_QUOTES_TABLE)
1277                .map_err(Error::from)?;
1278
1279            // Collect all quotes first to avoid borrowing issues
1280            let all_quotes: Vec<(String, wallet::MeltQuote)> = table
1281                .iter()
1282                .map_err(Error::from)?
1283                .flatten()
1284                .filter_map(|(id, quote_json)| {
1285                    let quote: wallet::MeltQuote = serde_json::from_str(quote_json.value()).ok()?;
1286                    Some((id.value().to_string(), quote))
1287                })
1288                .collect();
1289
1290            // Update quotes that match the operation_id
1291            for (quote_id, mut quote) in all_quotes {
1292                if quote.used_by_operation.as_deref() == Some(&operation_id_str) {
1293                    quote.used_by_operation = None;
1294                    let updated_json = serde_json::to_string(&quote).map_err(Error::from)?;
1295                    table
1296                        .insert(quote_id.as_str(), updated_json.as_str())
1297                        .map_err(Error::from)?;
1298                }
1299            }
1300        }
1301
1302        write_txn.commit().map_err(Error::from)?;
1303        Ok(())
1304    }
1305
1306    #[instrument(skip(self))]
1307    async fn reserve_mint_quote(
1308        &self,
1309        quote_id: &str,
1310        operation_id: &uuid::Uuid,
1311    ) -> Result<(), database::Error> {
1312        let write_txn = self.db.begin_write().map_err(Error::from)?;
1313        let operation_id_str = operation_id.to_string();
1314
1315        {
1316            let mut table = write_txn
1317                .open_table(MINT_QUOTES_TABLE)
1318                .map_err(Error::from)?;
1319
1320            // Read existing quote
1321            let quote_json = table
1322                .get(quote_id)
1323                .map_err(Error::from)?
1324                .map(|v| v.value().to_string());
1325
1326            match quote_json {
1327                Some(json) => {
1328                    let mut quote: MintQuote = serde_json::from_str(&json).map_err(Error::from)?;
1329
1330                    // Check if already reserved by another operation
1331                    if quote.used_by_operation.is_some() {
1332                        return Err(database::Error::QuoteAlreadyInUse);
1333                    }
1334
1335                    // Reserve the quote
1336                    quote.used_by_operation = Some(operation_id_str);
1337                    let updated_json = serde_json::to_string(&quote).map_err(Error::from)?;
1338                    table
1339                        .insert(quote_id, updated_json.as_str())
1340                        .map_err(Error::from)?;
1341                }
1342                None => {
1343                    return Err(database::Error::UnknownQuote);
1344                }
1345            }
1346        }
1347
1348        write_txn.commit().map_err(Error::from)?;
1349        Ok(())
1350    }
1351
1352    #[instrument(skip(self))]
1353    async fn release_mint_quote(&self, operation_id: &uuid::Uuid) -> Result<(), database::Error> {
1354        let write_txn = self.db.begin_write().map_err(Error::from)?;
1355        let operation_id_str = operation_id.to_string();
1356
1357        {
1358            let mut table = write_txn
1359                .open_table(MINT_QUOTES_TABLE)
1360                .map_err(Error::from)?;
1361
1362            // Collect all quotes first to avoid borrowing issues
1363            let all_quotes: Vec<(String, MintQuote)> = table
1364                .iter()
1365                .map_err(Error::from)?
1366                .flatten()
1367                .filter_map(|(id, quote_json)| {
1368                    let quote: MintQuote = serde_json::from_str(quote_json.value()).ok()?;
1369                    Some((id.value().to_string(), quote))
1370                })
1371                .collect();
1372
1373            // Update quotes that match the operation_id
1374            for (quote_id, mut quote) in all_quotes {
1375                if quote.used_by_operation.as_deref() == Some(&operation_id_str) {
1376                    quote.used_by_operation = None;
1377                    let updated_json = serde_json::to_string(&quote).map_err(Error::from)?;
1378                    table
1379                        .insert(quote_id.as_str(), updated_json.as_str())
1380                        .map_err(Error::from)?;
1381                }
1382            }
1383        }
1384
1385        write_txn.commit().map_err(Error::from)?;
1386        Ok(())
1387    }
1388
1389    #[instrument(skip(self, value))]
1390    async fn kv_write(
1391        &self,
1392        primary_namespace: &str,
1393        secondary_namespace: &str,
1394        key: &str,
1395        value: &[u8],
1396    ) -> Result<(), database::Error> {
1397        // Validate parameters according to KV store requirements
1398        validate_kvstore_params(primary_namespace, secondary_namespace, Some(key))?;
1399
1400        let write_txn = self.db.begin_write().map_err(Error::from)?;
1401        {
1402            let mut table = write_txn.open_table(KV_STORE_TABLE).map_err(Error::from)?;
1403            table
1404                .insert((primary_namespace, secondary_namespace, key), value)
1405                .map_err(Error::from)?;
1406        }
1407        write_txn.commit().map_err(Error::from)?;
1408
1409        Ok(())
1410    }
1411
1412    #[instrument(skip(self))]
1413    async fn kv_read(
1414        &self,
1415        primary_namespace: &str,
1416        secondary_namespace: &str,
1417        key: &str,
1418    ) -> Result<Option<Vec<u8>>, database::Error> {
1419        // Validate parameters according to KV store requirements
1420        validate_kvstore_params(primary_namespace, secondary_namespace, Some(key))?;
1421
1422        let read_txn = self.db.begin_read().map_err(Error::from)?;
1423        let table = read_txn.open_table(KV_STORE_TABLE).map_err(Error::from)?;
1424
1425        let result = table
1426            .get((primary_namespace, secondary_namespace, key))
1427            .map_err(Error::from)?
1428            .map(|v| v.value().to_vec());
1429
1430        Ok(result)
1431    }
1432
1433    #[instrument(skip(self))]
1434    async fn kv_list(
1435        &self,
1436        primary_namespace: &str,
1437        secondary_namespace: &str,
1438    ) -> Result<Vec<String>, database::Error> {
1439        // Validate parameters according to KV store requirements
1440        validate_kvstore_params(primary_namespace, secondary_namespace, None)?;
1441
1442        let read_txn = self.db.begin_read().map_err(Error::from)?;
1443        let table = read_txn.open_table(KV_STORE_TABLE).map_err(Error::from)?;
1444
1445        let start = (primary_namespace, secondary_namespace, "");
1446        let iter = table.range(start..).map_err(Error::from)?;
1447
1448        let mut keys = Vec::new();
1449
1450        for item in iter {
1451            let (key, _) = item.map_err(Error::from)?;
1452            let (p, s, k) = key.value();
1453            if p == primary_namespace && s == secondary_namespace {
1454                keys.push(k.to_string());
1455            } else {
1456                break;
1457            }
1458        }
1459
1460        Ok(keys)
1461    }
1462
1463    #[instrument(skip(self))]
1464    async fn kv_remove(
1465        &self,
1466        primary_namespace: &str,
1467        secondary_namespace: &str,
1468        key: &str,
1469    ) -> Result<(), database::Error> {
1470        // Validate parameters according to KV store requirements
1471        validate_kvstore_params(primary_namespace, secondary_namespace, Some(key))?;
1472
1473        let write_txn = self.db.begin_write().map_err(Error::from)?;
1474        {
1475            let mut table = write_txn.open_table(KV_STORE_TABLE).map_err(Error::from)?;
1476            table
1477                .remove((primary_namespace, secondary_namespace, key))
1478                .map_err(Error::from)?;
1479        }
1480        write_txn.commit().map_err(Error::from)?;
1481
1482        Ok(())
1483    }
1484
1485    #[instrument(skip(self))]
1486    async fn add_p2pk_key(
1487        &self,
1488        pubkey: &PublicKey,
1489        derivation_path: DerivationPath,
1490        derivation_index: u32,
1491    ) -> Result<(), database::Error> {
1492        let write_txn = self.db.begin_write().map_err(Error::from)?;
1493        {
1494            let mut table = write_txn
1495                .open_table(P2PK_SIGNING_KEYS_TABLE)
1496                .map_err(Error::from)?;
1497            table
1498                .insert(
1499                    pubkey.to_bytes().as_slice(),
1500                    serde_json::to_string(&wallet::P2PKSigningKey {
1501                        pubkey: *pubkey,
1502                        derivation_path,
1503                        derivation_index,
1504                        created_time: unix_time(),
1505                    })
1506                    .map_err(Error::from)?
1507                    .as_str(),
1508                )
1509                .map_err(Error::from)?;
1510        }
1511        write_txn.commit().map_err(Error::from)?;
1512        Ok(())
1513    }
1514
1515    #[instrument(skip(self))]
1516    async fn get_p2pk_key(
1517        &self,
1518        pubkey: &PublicKey,
1519    ) -> Result<Option<wallet::P2PKSigningKey>, database::Error> {
1520        let read_txn = self.db.begin_read().map_err(Error::from)?;
1521        let table = read_txn
1522            .open_table(P2PK_SIGNING_KEYS_TABLE)
1523            .map_err(Error::from)?;
1524
1525        if let Some(key) = table
1526            .get(pubkey.to_bytes().as_slice())
1527            .map_err(Error::from)?
1528        {
1529            return Ok(Some(
1530                serde_json::from_str(key.value()).map_err(Error::from)?,
1531            ));
1532        }
1533
1534        Ok(None)
1535    }
1536
1537    #[instrument(skip(self))]
1538    async fn list_p2pk_keys(&self) -> Result<Vec<wallet::P2PKSigningKey>, database::Error> {
1539        let read_txn = self.db.begin_read().map_err(Error::from)?;
1540        let table = read_txn
1541            .open_table(P2PK_SIGNING_KEYS_TABLE)
1542            .map_err(Error::from)?;
1543
1544        let keys: Vec<wallet::P2PKSigningKey> = table
1545            .iter()
1546            .map_err(Error::from)?
1547            .flatten()
1548            .filter_map(|(_k, v)| {
1549                if let Ok(key) = serde_json::from_str::<wallet::P2PKSigningKey>(v.value()) {
1550                    return Some(key);
1551                }
1552
1553                None
1554            })
1555            .collect();
1556
1557        Ok(keys)
1558    }
1559
1560    #[instrument(skip(self))]
1561    async fn latest_p2pk(&self) -> Result<Option<wallet::P2PKSigningKey>, database::Error> {
1562        let read_txn = self.db.begin_read().map_err(Error::from)?;
1563        let table = read_txn
1564            .open_table(P2PK_SIGNING_KEYS_TABLE)
1565            .map_err(Error::from)?;
1566
1567        let latest_key = table
1568            .iter()
1569            .map_err(Error::from)?
1570            .flatten()
1571            .filter_map(|(_k, v)| serde_json::from_str::<wallet::P2PKSigningKey>(v.value()).ok())
1572            .max_by_key(|key| key.derivation_index);
1573
1574        Ok(latest_key)
1575    }
1576}
1577
#[cfg(test)]
mod test {
    use std::path::PathBuf;
    use std::str::FromStr;

    use cdk_common::database::{self, WalletDatabase};
    use cdk_common::{wallet_db_test, Id};

    use super::WalletRedbDatabase;

    /// Opens the wallet database file `cdk-test-<test_id>.redb` for a test.
    ///
    /// The file lives in the platform's temporary directory
    /// (`std::env::temp_dir()`) rather than a hard-coded `/tmp`, so the tests
    /// also run where `/tmp` does not exist or `TMPDIR` points elsewhere.
    async fn provide_db(test_id: String) -> WalletRedbDatabase {
        let path: PathBuf = std::env::temp_dir().join(format!("cdk-test-{}.redb", test_id));
        WalletRedbDatabase::new(&path).expect("database")
    }

    // Shared wallet database test suite, run against the redb backend.
    wallet_db_test!(provide_db);

    /// Incrementing a keyset counter past `u32::MAX` must fail with
    /// `database::Error::AmountOverflow` instead of wrapping around.
    #[tokio::test]
    async fn increment_keyset_counter_returns_error_on_overflow() {
        // A random suffix gives every run its own database file.
        let db = provide_db(format!("counter-overflow-{}", uuid::Uuid::new_v4())).await;
        let keyset_id = Id::from_str("00916bbf7ef91a36").expect("valid keyset id");

        // Push the counter to its maximum in one step.
        let first = db
            .increment_keyset_counter(&keyset_id, u32::MAX)
            .await
            .expect("first increment should fit");
        assert_eq!(first, u32::MAX);

        // One more increment no longer fits in a u32.
        match db.increment_keyset_counter(&keyset_id, 1).await {
            Err(database::Error::AmountOverflow) => {}
            Ok(counter) => panic!("counter should not wrap, got {counter}"),
            Err(err) => panic!("expected amount overflow, got {err}"),
        }
    }
}