Skip to main content

cdk_redb/wallet/
mod.rs

1//! Redb Wallet
2
3use std::cmp::Ordering;
4use std::collections::HashMap;
5use std::path::Path;
6use std::str::FromStr;
7use std::sync::Arc;
8
9use async_trait::async_trait;
10use cdk_common::bitcoin::bip32::DerivationPath;
11use cdk_common::database::{validate_kvstore_params, WalletDatabase};
12use cdk_common::mint_url::MintUrl;
13use cdk_common::nut00::KnownMethod;
14use cdk_common::util::unix_time;
15use cdk_common::wallet::{
16    self, MintQuote, ProofInfo, Transaction, TransactionDirection, TransactionId,
17};
18use cdk_common::{
19    database, Amount, CurrencyUnit, Id, KeySet, KeySetInfo, Keys, MintInfo, PaymentMethod,
20    PublicKey, SpendingConditions, State,
21};
22use redb::{Database, MultimapTableDefinition, ReadableDatabase, ReadableTable, TableDefinition};
23use tracing::instrument;
24
25use crate::error::Error;
26use crate::migrations::migrate_00_to_01;
27use crate::wallet::migrations::{
28    migrate_01_to_02, migrate_02_to_03, migrate_03_to_04, migrate_04_to_05,
29};
30
31mod migrations;
32
// <Mint_url, Info>
// Value is the JSON encoding of an `Option<MintInfo>`.
const MINTS_TABLE: TableDefinition<&str, &str> = TableDefinition::new("mints_table");
// <Mint_Url, Keyset_id>
// Multimap: one mint URL maps to the byte form of every keyset id known for it.
const MINT_KEYSETS_TABLE: MultimapTableDefinition<&str, &[u8]> =
    MultimapTableDefinition::new("mint_keysets");
// <Keyset_id, KeysetInfo>
// Key is the keyset id as bytes, value is the JSON encoded `KeySetInfo`.
const KEYSETS_TABLE: TableDefinition<&[u8], &str> = TableDefinition::new("keysets");
// <Quote_id, quote>
// Value is the JSON encoded mint quote.
const MINT_QUOTES_TABLE: TableDefinition<&str, &str> = TableDefinition::new("mint_quotes");
// <Quote_id, quote>
// Value is the JSON encoded melt quote.
const MELT_QUOTES_TABLE: TableDefinition<&str, &str> = TableDefinition::new("melt_quotes");
// <Keyset_id (string form), Keys>
// Value is the JSON encoded `Keys` of the keyset.
const MINT_KEYS_TABLE: TableDefinition<&str, &str> = TableDefinition::new("mint_keys");
// <Y, Proof Info>
// Key is the proof's `y` public key as bytes, value is the JSON encoded `ProofInfo`.
const PROOFS_TABLE: TableDefinition<&[u8], &str> = TableDefinition::new("proofs");
// <Config key, value>
// Holds the "db_version" entry that records the schema version of the file.
const CONFIG_TABLE: TableDefinition<&str, &str> = TableDefinition::new("config");
// <Keyset_id (string form), counter>
const KEYSET_COUNTER: TableDefinition<&str, u32> = TableDefinition::new("keyset_counter");
// <Transaction_id, Transaction>
// Key is the transaction id as bytes, value is the JSON encoded `Transaction`.
const TRANSACTIONS_TABLE: TableDefinition<&[u8], &str> = TableDefinition::new("transactions");
// <Saga_id, WalletSaga>
const SAGAS_TABLE: TableDefinition<&str, &str> = TableDefinition::new("wallet_sagas");

// <Pubkey, P2PKSigningKey>
const P2PK_SIGNING_KEYS_TABLE: TableDefinition<&[u8], &str> =
    TableDefinition::new("p2pk_signing_keys");

// <u32 form of the keyset id, Keyset_id (string form)>
// Used to detect two different keyset ids that map to the same u32.
const KEYSET_U32_MAPPING: TableDefinition<u32, &str> = TableDefinition::new("keyset_u32_mapping");
// <(primary_namespace, secondary_namespace, key), value>
const KV_STORE_TABLE: TableDefinition<(&str, &str, &str), &[u8]> = TableDefinition::new("kv_store");

// Schema version written to (and expected in) the "db_version" config entry.
const DATABASE_VERSION: u32 = 5;
63
/// Wallet Redb Database
///
/// Thin wrapper around a shared [`Database`] handle. Cloning the wrapper only
/// clones the [`Arc`], so all clones operate on the same open database.
#[derive(Debug, Clone)]
pub struct WalletRedbDatabase {
    /// Shared handle to the open redb database.
    db: Arc<Database>,
}
69
70impl WalletRedbDatabase {
71    /// Create new [`WalletRedbDatabase`]
72    pub fn new(path: &Path) -> Result<Self, Error> {
73        {
74            // Check if parent directory exists before attempting to create database
75            if let Some(parent) = path.parent() {
76                if !parent.exists() {
77                    return Err(Error::Io(std::io::Error::new(
78                        std::io::ErrorKind::NotFound,
79                        format!("Parent directory does not exist: {parent:?}"),
80                    )));
81                }
82            }
83
84            let db = Arc::new(Database::create(path)?);
85
86            let db_version: Option<String>;
87            {
88                // Check database version
89                let read_txn = db.begin_read()?;
90                let table = read_txn.open_table(CONFIG_TABLE);
91
92                db_version = match table {
93                    Ok(table) => table.get("db_version")?.map(|v| v.value().to_string()),
94                    Err(_) => None,
95                };
96            }
97
98            match db_version {
99                Some(db_version) => {
100                    let mut current_file_version = u32::from_str(&db_version)?;
101                    tracing::info!("Current file version {}", current_file_version);
102
103                    match current_file_version.cmp(&DATABASE_VERSION) {
104                        Ordering::Less => {
105                            tracing::info!(
106                                "Database needs to be upgraded at {} current is {}",
107                                current_file_version,
108                                DATABASE_VERSION
109                            );
110                            if current_file_version == 0 {
111                                current_file_version = migrate_00_to_01(Arc::clone(&db))?;
112                            }
113
114                            if current_file_version == 1 {
115                                current_file_version = migrate_01_to_02(Arc::clone(&db))?;
116                            }
117
118                            if current_file_version == 2 {
119                                current_file_version = migrate_02_to_03(Arc::clone(&db))?;
120                            }
121
122                            if current_file_version == 3 {
123                                current_file_version = migrate_03_to_04(Arc::clone(&db))?;
124                            }
125
126                            if current_file_version == 4 {
127                                current_file_version = migrate_04_to_05(Arc::clone(&db))?;
128                            }
129
130                            if current_file_version != DATABASE_VERSION {
131                                tracing::warn!(
132                                    "Database upgrade did not complete at {} current is {}",
133                                    current_file_version,
134                                    DATABASE_VERSION
135                                );
136                                return Err(Error::UnknownDatabaseVersion);
137                            }
138
139                            let write_txn = db.begin_write()?;
140                            {
141                                let mut table = write_txn.open_table(CONFIG_TABLE)?;
142
143                                table
144                                    .insert("db_version", DATABASE_VERSION.to_string().as_str())?;
145                            }
146
147                            write_txn.commit()?;
148                        }
149                        Ordering::Equal => {
150                            tracing::info!("Database is at current version {}", DATABASE_VERSION);
151                        }
152                        Ordering::Greater => {
153                            tracing::warn!(
154                                "Database upgrade did not complete at {} current is {}",
155                                current_file_version,
156                                DATABASE_VERSION
157                            );
158                            return Err(Error::UnknownDatabaseVersion);
159                        }
160                    }
161                }
162                None => {
163                    let write_txn = db.begin_write()?;
164                    {
165                        let mut table = write_txn.open_table(CONFIG_TABLE)?;
166                        // Open all tables to init a new db
167                        let _ = write_txn.open_table(MINTS_TABLE)?;
168                        let _ = write_txn.open_multimap_table(MINT_KEYSETS_TABLE)?;
169                        let _ = write_txn.open_table(KEYSETS_TABLE)?;
170                        let _ = write_txn.open_table(MINT_QUOTES_TABLE)?;
171                        let _ = write_txn.open_table(MELT_QUOTES_TABLE)?;
172                        let _ = write_txn.open_table(MINT_KEYS_TABLE)?;
173                        let _ = write_txn.open_table(PROOFS_TABLE)?;
174                        let _ = write_txn.open_table(KEYSET_COUNTER)?;
175                        let _ = write_txn.open_table(TRANSACTIONS_TABLE)?;
176                        let _ = write_txn.open_table(KEYSET_U32_MAPPING)?;
177                        let _ = write_txn.open_table(KV_STORE_TABLE)?;
178                        let _ = write_txn.open_table(P2PK_SIGNING_KEYS_TABLE)?;
179                        table.insert("db_version", DATABASE_VERSION.to_string().as_str())?;
180                    }
181
182                    write_txn.commit()?;
183                }
184            }
185            drop(db);
186        }
187
188        // Check parent directory again for final database creation
189        if let Some(parent) = path.parent() {
190            if !parent.exists() {
191                return Err(Error::Io(std::io::Error::new(
192                    std::io::ErrorKind::NotFound,
193                    format!("Parent directory does not exist: {parent:?}"),
194                )));
195            }
196        }
197
198        let db = Database::create(path)?;
199
200        Ok(Self { db: Arc::new(db) })
201    }
202}
203
204#[async_trait]
205impl WalletDatabase<database::Error> for WalletRedbDatabase {
206    #[instrument(skip(self))]
207    async fn get_mint(&self, mint_url: MintUrl) -> Result<Option<MintInfo>, database::Error> {
208        let read_txn = self.db.begin_read().map_err(Into::<Error>::into)?;
209        let table = read_txn.open_table(MINTS_TABLE).map_err(Error::from)?;
210
211        if let Some(mint_info) = table
212            .get(mint_url.to_string().as_str())
213            .map_err(Error::from)?
214        {
215            return Ok(serde_json::from_str(mint_info.value()).map_err(Error::from)?);
216        }
217
218        Ok(None)
219    }
220
221    #[instrument(skip(self))]
222    async fn get_mints(&self) -> Result<HashMap<MintUrl, Option<MintInfo>>, database::Error> {
223        let read_txn = self.db.begin_read().map_err(Error::from)?;
224        let table = read_txn.open_table(MINTS_TABLE).map_err(Error::from)?;
225        let mints = table
226            .iter()
227            .map_err(Error::from)?
228            .flatten()
229            .filter_map(|(mint, mint_info)| {
230                MintUrl::from_str(mint.value())
231                    .ok()
232                    .map(|url| (url, serde_json::from_str(mint_info.value()).ok()))
233            })
234            .collect();
235
236        Ok(mints)
237    }
238
239    #[instrument(skip(self))]
240    async fn get_mint_keysets(
241        &self,
242        mint_url: MintUrl,
243    ) -> Result<Option<Vec<KeySetInfo>>, database::Error> {
244        let read_txn = self.db.begin_read().map_err(Into::<Error>::into)?;
245        let table = read_txn
246            .open_multimap_table(MINT_KEYSETS_TABLE)
247            .map_err(Error::from)?;
248
249        let keyset_ids = table
250            .get(mint_url.to_string().as_str())
251            .map_err(Error::from)?
252            .flatten()
253            .map(|k| Id::from_bytes(k.value()))
254            .collect::<Result<Vec<_>, _>>()?;
255
256        let mut keysets = vec![];
257
258        let keysets_t = read_txn.open_table(KEYSETS_TABLE).map_err(Error::from)?;
259
260        for keyset_id in keyset_ids {
261            if let Some(keyset) = keysets_t
262                .get(keyset_id.to_bytes().as_slice())
263                .map_err(Error::from)?
264            {
265                let keyset = serde_json::from_str(keyset.value()).map_err(Error::from)?;
266
267                keysets.push(keyset);
268            }
269        }
270
271        match keysets.is_empty() {
272            true => Ok(None),
273            false => Ok(Some(keysets)),
274        }
275    }
276
277    #[instrument(skip(self), fields(keyset_id = %keyset_id))]
278    async fn get_keyset_by_id(
279        &self,
280        keyset_id: &Id,
281    ) -> Result<Option<KeySetInfo>, database::Error> {
282        let read_txn = self.db.begin_read().map_err(Into::<Error>::into)?;
283        let table = read_txn.open_table(KEYSETS_TABLE).map_err(Error::from)?;
284
285        match table
286            .get(keyset_id.to_bytes().as_slice())
287            .map_err(Error::from)?
288        {
289            Some(keyset) => {
290                let keyset: KeySetInfo =
291                    serde_json::from_str(keyset.value()).map_err(Error::from)?;
292
293                Ok(Some(keyset))
294            }
295            None => Ok(None),
296        }
297    }
298
299    #[instrument(skip_all)]
300    async fn get_mint_quote(&self, quote_id: &str) -> Result<Option<MintQuote>, database::Error> {
301        let read_txn = self.db.begin_read().map_err(Into::<Error>::into)?;
302        let table = read_txn
303            .open_table(MINT_QUOTES_TABLE)
304            .map_err(Error::from)?;
305
306        if let Some(mint_info) = table.get(quote_id).map_err(Error::from)? {
307            return Ok(serde_json::from_str(mint_info.value()).map_err(Error::from)?);
308        }
309
310        Ok(None)
311    }
312
313    #[instrument(skip_all)]
314    async fn get_mint_quotes(&self) -> Result<Vec<MintQuote>, database::Error> {
315        let read_txn = self.db.begin_read().map_err(Into::<Error>::into)?;
316        let table = read_txn
317            .open_table(MINT_QUOTES_TABLE)
318            .map_err(Error::from)?;
319
320        Ok(table
321            .iter()
322            .map_err(Error::from)?
323            .flatten()
324            .flat_map(|(_id, quote)| serde_json::from_str(quote.value()))
325            .collect())
326    }
327
328    async fn get_unissued_mint_quotes(&self) -> Result<Vec<MintQuote>, database::Error> {
329        let read_txn = self.db.begin_read().map_err(Into::<Error>::into)?;
330        let table = read_txn
331            .open_table(MINT_QUOTES_TABLE)
332            .map_err(Error::from)?;
333
334        Ok(table
335            .iter()
336            .map_err(Error::from)?
337            .flatten()
338            .flat_map(|(_id, quote)| serde_json::from_str::<MintQuote>(quote.value()).ok())
339            .filter(|quote| {
340                quote.amount_issued == Amount::ZERO
341                    || quote.payment_method == PaymentMethod::Known(KnownMethod::Bolt12)
342            })
343            .collect())
344    }
345
346    #[instrument(skip_all)]
347    async fn get_melt_quote(
348        &self,
349        quote_id: &str,
350    ) -> Result<Option<wallet::MeltQuote>, database::Error> {
351        let read_txn = self.db.begin_read().map_err(Error::from)?;
352        let table = read_txn
353            .open_table(MELT_QUOTES_TABLE)
354            .map_err(Error::from)?;
355
356        if let Some(mint_info) = table.get(quote_id).map_err(Error::from)? {
357            return Ok(serde_json::from_str(mint_info.value()).map_err(Error::from)?);
358        }
359
360        Ok(None)
361    }
362
363    #[instrument(skip_all)]
364    async fn get_melt_quotes(&self) -> Result<Vec<wallet::MeltQuote>, database::Error> {
365        let read_txn = self.db.begin_read().map_err(Error::from)?;
366        let table = read_txn
367            .open_table(MELT_QUOTES_TABLE)
368            .map_err(Error::from)?;
369
370        Ok(table
371            .iter()
372            .map_err(Error::from)?
373            .flatten()
374            .flat_map(|(_id, quote)| serde_json::from_str(quote.value()))
375            .collect())
376    }
377
378    #[instrument(skip(self), fields(keyset_id = %keyset_id))]
379    async fn get_keys(&self, keyset_id: &Id) -> Result<Option<Keys>, database::Error> {
380        let read_txn = self.db.begin_read().map_err(Error::from)?;
381        let table = read_txn.open_table(MINT_KEYS_TABLE).map_err(Error::from)?;
382
383        if let Some(mint_info) = table
384            .get(keyset_id.to_string().as_str())
385            .map_err(Error::from)?
386        {
387            return Ok(serde_json::from_str(mint_info.value()).map_err(Error::from)?);
388        }
389
390        Ok(None)
391    }
392
393    #[instrument(skip_all)]
394    async fn get_proofs(
395        &self,
396        mint_url: Option<MintUrl>,
397        unit: Option<CurrencyUnit>,
398        state: Option<Vec<State>>,
399        spending_conditions: Option<Vec<SpendingConditions>>,
400    ) -> Result<Vec<ProofInfo>, database::Error> {
401        let read_txn = self.db.begin_read().map_err(Error::from)?;
402
403        let table = read_txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
404
405        let proofs: Vec<ProofInfo> = table
406            .iter()
407            .map_err(Error::from)?
408            .flatten()
409            .filter_map(|(_k, v)| {
410                let mut proof = None;
411
412                if let Ok(proof_info) = serde_json::from_str::<ProofInfo>(v.value()) {
413                    if proof_info.matches_conditions(&mint_url, &unit, &state, &spending_conditions)
414                    {
415                        proof = Some(proof_info)
416                    }
417                }
418
419                proof
420            })
421            .collect();
422
423        Ok(proofs)
424    }
425
426    #[instrument(skip(self, ys))]
427    async fn get_proofs_by_ys(
428        &self,
429        ys: Vec<PublicKey>,
430    ) -> Result<Vec<ProofInfo>, database::Error> {
431        if ys.is_empty() {
432            return Ok(Vec::new());
433        }
434
435        let read_txn = self.db.begin_read().map_err(Error::from)?;
436        let table = read_txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
437
438        let mut proofs = Vec::new();
439
440        for y in ys {
441            if let Some(proof) = table.get(y.to_bytes().as_slice()).map_err(Error::from)? {
442                let proof_info =
443                    serde_json::from_str::<ProofInfo>(proof.value()).map_err(Error::from)?;
444                proofs.push(proof_info);
445            }
446        }
447
448        Ok(proofs)
449    }
450
451    async fn get_balance(
452        &self,
453        mint_url: Option<MintUrl>,
454        unit: Option<CurrencyUnit>,
455        state: Option<Vec<State>>,
456    ) -> Result<u64, database::Error> {
457        // For redb, we still need to fetch all proofs and sum them
458        // since redb doesn't have SQL aggregation
459        let proofs = self.get_proofs(mint_url, unit, state, None).await?;
460        Ok(proofs.iter().map(|p| u64::from(p.proof.amount)).sum())
461    }
462
463    #[instrument(skip(self))]
464    async fn get_transaction(
465        &self,
466        transaction_id: TransactionId,
467    ) -> Result<Option<Transaction>, database::Error> {
468        let read_txn = self.db.begin_read().map_err(Error::from)?;
469        let table = read_txn
470            .open_table(TRANSACTIONS_TABLE)
471            .map_err(Error::from)?;
472
473        if let Some(transaction) = table.get(transaction_id.as_slice()).map_err(Error::from)? {
474            return Ok(serde_json::from_str(transaction.value()).map_err(Error::from)?);
475        }
476
477        Ok(None)
478    }
479
480    #[instrument(skip(self))]
481    async fn list_transactions(
482        &self,
483        mint_url: Option<MintUrl>,
484        direction: Option<TransactionDirection>,
485        unit: Option<CurrencyUnit>,
486    ) -> Result<Vec<Transaction>, database::Error> {
487        let read_txn = self.db.begin_read().map_err(Error::from)?;
488
489        let table = read_txn
490            .open_table(TRANSACTIONS_TABLE)
491            .map_err(Error::from)?;
492
493        let transactions: Vec<Transaction> = table
494            .iter()
495            .map_err(Error::from)?
496            .flatten()
497            .filter_map(|(_k, v)| {
498                let mut transaction = None;
499
500                if let Ok(tx) = serde_json::from_str::<Transaction>(v.value()) {
501                    if tx.matches_conditions(&mint_url, &direction, &unit) {
502                        transaction = Some(tx)
503                    }
504                }
505
506                transaction
507            })
508            .collect();
509
510        Ok(transactions)
511    }
512
513    #[instrument(skip(self, added, removed_ys))]
514    async fn update_proofs(
515        &self,
516        added: Vec<ProofInfo>,
517        removed_ys: Vec<PublicKey>,
518    ) -> Result<(), database::Error> {
519        let write_txn = self.db.begin_write().map_err(Error::from)?;
520        {
521            let mut table = write_txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
522
523            for proof_info in added.iter() {
524                table
525                    .insert(
526                        proof_info.y.to_bytes().as_slice(),
527                        serde_json::to_string(&proof_info)
528                            .map_err(Error::from)?
529                            .as_str(),
530                    )
531                    .map_err(Error::from)?;
532            }
533
534            for y in removed_ys.iter() {
535                table.remove(y.to_bytes().as_slice()).map_err(Error::from)?;
536            }
537        }
538        write_txn.commit().map_err(Error::from)?;
539        Ok(())
540    }
541
542    async fn update_proofs_state(
543        &self,
544        ys: Vec<PublicKey>,
545        state: State,
546    ) -> Result<(), database::Error> {
547        let write_txn = self.db.begin_write().map_err(Error::from)?;
548        {
549            let mut table = write_txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
550
551            for y in ys {
552                let y_slice = y.to_bytes();
553                let proof = table
554                    .get(y_slice.as_slice())
555                    .map_err(Error::from)?
556                    .ok_or(Error::UnknownY)?;
557
558                let mut proof_info =
559                    serde_json::from_str::<ProofInfo>(proof.value()).map_err(Error::from)?;
560                drop(proof);
561
562                proof_info.state = state;
563
564                table
565                    .insert(
566                        y_slice.as_slice(),
567                        serde_json::to_string(&proof_info)
568                            .map_err(Error::from)?
569                            .as_str(),
570                    )
571                    .map_err(Error::from)?;
572            }
573        }
574        write_txn.commit().map_err(Error::from)?;
575        Ok(())
576    }
577
578    #[instrument(skip(self))]
579    async fn add_transaction(&self, transaction: Transaction) -> Result<(), database::Error> {
580        let id = transaction.id();
581        let write_txn = self.db.begin_write().map_err(Error::from)?;
582        {
583            let mut table = write_txn
584                .open_table(TRANSACTIONS_TABLE)
585                .map_err(Error::from)?;
586            table
587                .insert(
588                    id.as_slice(),
589                    serde_json::to_string(&transaction)
590                        .map_err(Error::from)?
591                        .as_str(),
592                )
593                .map_err(Error::from)?;
594        }
595        write_txn.commit().map_err(Error::from)?;
596        Ok(())
597    }
598
599    #[instrument(skip(self))]
600    async fn update_mint_url(
601        &self,
602        old_mint_url: MintUrl,
603        new_mint_url: MintUrl,
604    ) -> Result<(), database::Error> {
605        let write_txn = self.db.begin_write().map_err(Error::from)?;
606
607        // Update proofs table
608        {
609            let read_table = write_txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
610            let proofs: Vec<ProofInfo> = read_table
611                .iter()
612                .map_err(Error::from)?
613                .flatten()
614                .filter_map(|(_k, v)| {
615                    let proof_info = serde_json::from_str::<ProofInfo>(v.value()).ok()?;
616                    if proof_info.mint_url == old_mint_url {
617                        Some(proof_info)
618                    } else {
619                        None
620                    }
621                })
622                .collect();
623            drop(read_table);
624
625            if !proofs.is_empty() {
626                let mut write_table = write_txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
627                for mut proof_info in proofs {
628                    proof_info.mint_url = new_mint_url.clone();
629                    write_table
630                        .insert(
631                            proof_info.y.to_bytes().as_slice(),
632                            serde_json::to_string(&proof_info)
633                                .map_err(Error::from)?
634                                .as_str(),
635                        )
636                        .map_err(Error::from)?;
637                }
638            }
639        }
640
641        // Update mint quotes
642        {
643            let mut table = write_txn
644                .open_table(MINT_QUOTES_TABLE)
645                .map_err(Error::from)?;
646
647            let unix_time = unix_time();
648
649            let quotes: Vec<MintQuote> = table
650                .iter()
651                .map_err(Error::from)?
652                .flatten()
653                .filter_map(|(_, quote)| {
654                    let mut q: MintQuote = serde_json::from_str(quote.value()).ok()?;
655                    if q.mint_url == old_mint_url && q.expiry >= unix_time {
656                        q.mint_url = new_mint_url.clone();
657                        Some(q)
658                    } else {
659                        None
660                    }
661                })
662                .collect();
663
664            for quote in quotes {
665                table
666                    .insert(
667                        quote.id.as_str(),
668                        serde_json::to_string(&quote).map_err(Error::from)?.as_str(),
669                    )
670                    .map_err(Error::from)?;
671            }
672        }
673
674        write_txn.commit().map_err(Error::from)?;
675        Ok(())
676    }
677
678    #[instrument(skip(self), fields(keyset_id = %keyset_id))]
679    async fn increment_keyset_counter(
680        &self,
681        keyset_id: &Id,
682        count: u32,
683    ) -> Result<u32, database::Error> {
684        let write_txn = self.db.begin_write().map_err(Error::from)?;
685        let new_counter = {
686            let mut table = write_txn.open_table(KEYSET_COUNTER).map_err(Error::from)?;
687            let current_counter = table
688                .get(keyset_id.to_string().as_str())
689                .map_err(Error::from)?
690                .map(|x| x.value())
691                .unwrap_or_default();
692
693            let new_counter = current_counter + count;
694
695            table
696                .insert(keyset_id.to_string().as_str(), new_counter)
697                .map_err(Error::from)?;
698
699            new_counter
700        };
701        write_txn.commit().map_err(Error::from)?;
702        Ok(new_counter)
703    }
704
705    #[instrument(skip(self))]
706    async fn add_mint(
707        &self,
708        mint_url: MintUrl,
709        mint_info: Option<MintInfo>,
710    ) -> Result<(), database::Error> {
711        let write_txn = self.db.begin_write().map_err(Error::from)?;
712        {
713            let mut table = write_txn.open_table(MINTS_TABLE).map_err(Error::from)?;
714            table
715                .insert(
716                    mint_url.to_string().as_str(),
717                    serde_json::to_string(&mint_info)
718                        .map_err(Error::from)?
719                        .as_str(),
720                )
721                .map_err(Error::from)?;
722        }
723        write_txn.commit().map_err(Error::from)?;
724        Ok(())
725    }
726
727    #[instrument(skip(self))]
728    async fn remove_mint(&self, mint_url: MintUrl) -> Result<(), database::Error> {
729        let write_txn = self.db.begin_write().map_err(Error::from)?;
730        {
731            let mut table = write_txn.open_table(MINTS_TABLE).map_err(Error::from)?;
732            table
733                .remove(mint_url.to_string().as_str())
734                .map_err(Error::from)?;
735        }
736        write_txn.commit().map_err(Error::from)?;
737        Ok(())
738    }
739
740    #[instrument(skip(self))]
741    async fn add_mint_keysets(
742        &self,
743        mint_url: MintUrl,
744        keysets: Vec<KeySetInfo>,
745    ) -> Result<(), database::Error> {
746        let write_txn = self.db.begin_write().map_err(Error::from)?;
747        {
748            let mut table = write_txn
749                .open_multimap_table(MINT_KEYSETS_TABLE)
750                .map_err(Error::from)?;
751            let mut keysets_table = write_txn.open_table(KEYSETS_TABLE).map_err(Error::from)?;
752            let mut u32_table = write_txn
753                .open_table(KEYSET_U32_MAPPING)
754                .map_err(Error::from)?;
755
756            let mut existing_u32 = false;
757
758            for keyset in keysets {
759                // Check if keyset already exists
760                let existing_keyset = {
761                    let existing_keyset = keysets_table
762                        .get(keyset.id.to_bytes().as_slice())
763                        .map_err(Error::from)?;
764
765                    existing_keyset.map(|r| r.value().to_string())
766                };
767
768                let existing = u32_table
769                    .insert(u32::from(keyset.id), keyset.id.to_string().as_str())
770                    .map_err(Error::from)?;
771
772                match existing {
773                    None => existing_u32 = false,
774                    Some(id) => {
775                        let id = Id::from_str(id.value())?;
776
777                        if id == keyset.id {
778                            existing_u32 = false;
779                        } else {
780                            existing_u32 = true;
781                            break;
782                        }
783                    }
784                }
785
786                let keyset = if let Some(existing_keyset) = existing_keyset {
787                    let mut existing_keyset: KeySetInfo = serde_json::from_str(&existing_keyset)?;
788
789                    existing_keyset.active = keyset.active;
790                    existing_keyset.input_fee_ppk = keyset.input_fee_ppk;
791
792                    existing_keyset
793                } else {
794                    table
795                        .insert(
796                            mint_url.to_string().as_str(),
797                            keyset.id.to_bytes().as_slice(),
798                        )
799                        .map_err(Error::from)?;
800
801                    keyset
802                };
803
804                keysets_table
805                    .insert(
806                        keyset.id.to_bytes().as_slice(),
807                        serde_json::to_string(&keyset)
808                            .map_err(Error::from)?
809                            .as_str(),
810                    )
811                    .map_err(Error::from)?;
812            }
813
814            if existing_u32 {
815                tracing::warn!("Keyset already exists for keyset id");
816                return Err(database::Error::Duplicate);
817            }
818        }
819        write_txn.commit().map_err(Error::from)?;
820        Ok(())
821    }
822
823    #[instrument(skip_all)]
824    async fn add_mint_quote(&self, quote: MintQuote) -> Result<(), database::Error> {
825        let write_txn = self.db.begin_write().map_err(Error::from)?;
826        {
827            let mut table = write_txn
828                .open_table(MINT_QUOTES_TABLE)
829                .map_err(Error::from)?;
830
831            // Check for existing quote and version match
832            let existing_quote_json = table
833                .get(quote.id.as_str())
834                .map_err(Error::from)?
835                .map(|v| v.value().to_string());
836
837            let mut quote_to_save = quote.clone();
838
839            if let Some(json) = existing_quote_json {
840                let existing_quote: MintQuote = serde_json::from_str(&json).map_err(Error::from)?;
841
842                if existing_quote.version != quote.version {
843                    return Err(database::Error::ConcurrentUpdate);
844                }
845
846                // Increment version for update
847                quote_to_save.version = quote.version.wrapping_add(1);
848            }
849
850            table
851                .insert(
852                    quote_to_save.id.as_str(),
853                    serde_json::to_string(&quote_to_save)
854                        .map_err(Error::from)?
855                        .as_str(),
856                )
857                .map_err(Error::from)?;
858        }
859        write_txn.commit().map_err(Error::from)?;
860        Ok(())
861    }
862
863    #[instrument(skip_all)]
864    async fn remove_mint_quote(&self, quote_id: &str) -> Result<(), database::Error> {
865        let write_txn = self.db.begin_write().map_err(Error::from)?;
866        {
867            let mut table = write_txn
868                .open_table(MINT_QUOTES_TABLE)
869                .map_err(Error::from)?;
870            table.remove(quote_id).map_err(Error::from)?;
871        }
872        write_txn.commit().map_err(Error::from)?;
873        Ok(())
874    }
875
876    #[instrument(skip_all)]
877    async fn add_melt_quote(&self, quote: wallet::MeltQuote) -> Result<(), database::Error> {
878        let write_txn = self.db.begin_write().map_err(Error::from)?;
879        {
880            let mut table = write_txn
881                .open_table(MELT_QUOTES_TABLE)
882                .map_err(Error::from)?;
883
884            // Check for existing quote and version match
885            let existing_quote_json = table
886                .get(quote.id.as_str())
887                .map_err(Error::from)?
888                .map(|v| v.value().to_string());
889
890            let mut quote_to_save = quote.clone();
891
892            if let Some(json) = existing_quote_json {
893                let existing_quote: wallet::MeltQuote =
894                    serde_json::from_str(&json).map_err(Error::from)?;
895
896                if existing_quote.version != quote.version {
897                    return Err(database::Error::ConcurrentUpdate);
898                }
899
900                // Increment version for update
901                quote_to_save.version = quote.version.wrapping_add(1);
902            }
903
904            table
905                .insert(
906                    quote_to_save.id.as_str(),
907                    serde_json::to_string(&quote_to_save)
908                        .map_err(Error::from)?
909                        .as_str(),
910                )
911                .map_err(Error::from)?;
912        }
913        write_txn.commit().map_err(Error::from)?;
914        Ok(())
915    }
916
917    #[instrument(skip_all)]
918    async fn remove_melt_quote(&self, quote_id: &str) -> Result<(), database::Error> {
919        let write_txn = self.db.begin_write().map_err(Error::from)?;
920        {
921            let mut table = write_txn
922                .open_table(MELT_QUOTES_TABLE)
923                .map_err(Error::from)?;
924            table.remove(quote_id).map_err(Error::from)?;
925        }
926        write_txn.commit().map_err(Error::from)?;
927        Ok(())
928    }
929
930    #[instrument(skip_all)]
931    async fn add_keys(&self, keyset: KeySet) -> Result<(), database::Error> {
932        let write_txn = self.db.begin_write().map_err(Error::from)?;
933
934        keyset.verify_id()?;
935
936        {
937            let mut table = write_txn.open_table(MINT_KEYS_TABLE).map_err(Error::from)?;
938
939            let existing_keys = table
940                .insert(
941                    keyset.id.to_string().as_str(),
942                    serde_json::to_string(&keyset.keys)
943                        .map_err(Error::from)?
944                        .as_str(),
945                )
946                .map_err(Error::from)?
947                .is_some();
948
949            let mut table = write_txn
950                .open_table(KEYSET_U32_MAPPING)
951                .map_err(Error::from)?;
952
953            let existing = table
954                .insert(u32::from(keyset.id), keyset.id.to_string().as_str())
955                .map_err(Error::from)?;
956
957            let existing_u32 = match existing {
958                None => false,
959                Some(id) => {
960                    let id = Id::from_str(id.value())?;
961                    id != keyset.id
962                }
963            };
964
965            if existing_keys || existing_u32 {
966                tracing::warn!("Keys already exist for keyset id");
967                return Err(database::Error::Duplicate);
968            }
969        }
970        write_txn.commit().map_err(Error::from)?;
971        Ok(())
972    }
973
974    #[instrument(skip(self), fields(keyset_id = %keyset_id))]
975    async fn remove_keys(&self, keyset_id: &Id) -> Result<(), database::Error> {
976        let write_txn = self.db.begin_write().map_err(Error::from)?;
977        {
978            let mut table = write_txn.open_table(MINT_KEYS_TABLE).map_err(Error::from)?;
979
980            table
981                .remove(keyset_id.to_string().as_str())
982                .map_err(Error::from)?;
983        }
984        write_txn.commit().map_err(Error::from)?;
985        Ok(())
986    }
987
988    #[instrument(skip(self))]
989    async fn remove_transaction(
990        &self,
991        transaction_id: TransactionId,
992    ) -> Result<(), database::Error> {
993        let write_txn = self.db.begin_write().map_err(Error::from)?;
994        {
995            let mut table = write_txn
996                .open_table(TRANSACTIONS_TABLE)
997                .map_err(Error::from)?;
998            table
999                .remove(transaction_id.as_slice())
1000                .map_err(Error::from)?;
1001        }
1002        write_txn.commit().map_err(Error::from)?;
1003        Ok(())
1004    }
1005
1006    #[instrument(skip(self))]
1007    async fn add_saga(&self, saga: wallet::WalletSaga) -> Result<(), database::Error> {
1008        let saga_json = serde_json::to_string(&saga).map_err(Error::from)?;
1009        let id_str = saga.id.to_string();
1010
1011        let write_txn = self.db.begin_write().map_err(Error::from)?;
1012        {
1013            let mut table = write_txn.open_table(SAGAS_TABLE).map_err(Error::from)?;
1014            table
1015                .insert(id_str.as_str(), saga_json.as_str())
1016                .map_err(Error::from)?;
1017        }
1018        write_txn.commit().map_err(Error::from)?;
1019        Ok(())
1020    }
1021
1022    #[instrument(skip(self))]
1023    async fn get_saga(
1024        &self,
1025        id: &uuid::Uuid,
1026    ) -> Result<Option<wallet::WalletSaga>, database::Error> {
1027        let read_txn = self.db.begin_read().map_err(Error::from)?;
1028        let table = read_txn.open_table(SAGAS_TABLE).map_err(Error::from)?;
1029        let id_str = id.to_string();
1030
1031        let result = table
1032            .get(id_str.as_str())
1033            .map_err(Error::from)?
1034            .map(|saga| serde_json::from_str(saga.value()).map_err(Error::from))
1035            .transpose()?;
1036
1037        Ok(result)
1038    }
1039
1040    #[instrument(skip(self))]
1041    async fn update_saga(&self, saga: wallet::WalletSaga) -> Result<bool, database::Error> {
1042        let id_str = saga.id.to_string();
1043
1044        // The saga.version has already been incremented by the caller, so we check
1045        // for (saga.version - 1) as the expected version in the database.
1046        let expected_version = saga.version.saturating_sub(1);
1047
1048        let write_txn = self.db.begin_write().map_err(Error::from)?;
1049        let updated = {
1050            let mut table = write_txn.open_table(SAGAS_TABLE).map_err(Error::from)?;
1051
1052            // Read existing saga to check version (optimistic locking)
1053            let existing_saga_json = table
1054                .get(id_str.as_str())
1055                .map_err(Error::from)?
1056                .map(|v| v.value().to_string());
1057
1058            match existing_saga_json {
1059                Some(json) => {
1060                    let existing_saga: wallet::WalletSaga =
1061                        serde_json::from_str(&json).map_err(Error::from)?;
1062
1063                    // Check if version matches expected version
1064                    if existing_saga.version != expected_version {
1065                        // Version mismatch - another instance modified it
1066                        false
1067                    } else {
1068                        // Version matches - safe to update
1069                        let saga_json = serde_json::to_string(&saga).map_err(Error::from)?;
1070                        table
1071                            .insert(id_str.as_str(), saga_json.as_str())
1072                            .map_err(Error::from)?;
1073                        true
1074                    }
1075                }
1076                None => {
1077                    // Saga doesn't exist - can't update
1078                    false
1079                }
1080            }
1081        };
1082        write_txn.commit().map_err(Error::from)?;
1083        Ok(updated)
1084    }
1085
1086    #[instrument(skip(self))]
1087    async fn delete_saga(&self, id: &uuid::Uuid) -> Result<(), database::Error> {
1088        let write_txn = self.db.begin_write().map_err(Error::from)?;
1089        let id_str = id.to_string();
1090        {
1091            let mut table = write_txn.open_table(SAGAS_TABLE).map_err(Error::from)?;
1092            table.remove(id_str.as_str()).map_err(Error::from)?;
1093        }
1094        write_txn.commit().map_err(Error::from)?;
1095        Ok(())
1096    }
1097
1098    #[instrument(skip(self))]
1099    async fn get_incomplete_sagas(&self) -> Result<Vec<wallet::WalletSaga>, database::Error> {
1100        let read_txn = self.db.begin_read().map_err(Error::from)?;
1101        let table = read_txn.open_table(SAGAS_TABLE).map_err(Error::from)?;
1102
1103        let mut sagas: Vec<wallet::WalletSaga> = table
1104            .iter()
1105            .map_err(Error::from)?
1106            .flatten()
1107            .filter_map(|(_, saga_json)| {
1108                serde_json::from_str::<wallet::WalletSaga>(saga_json.value()).ok()
1109            })
1110            .collect();
1111
1112        // Sort by created_at ascending (oldest first)
1113        sagas.sort_by_key(|saga| saga.created_at);
1114
1115        Ok(sagas)
1116    }
1117
1118    #[instrument(skip(self))]
1119    async fn reserve_proofs(
1120        &self,
1121        ys: Vec<PublicKey>,
1122        operation_id: &uuid::Uuid,
1123    ) -> Result<(), database::Error> {
1124        let write_txn = self.db.begin_write().map_err(Error::from)?;
1125
1126        {
1127            let mut table = write_txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
1128
1129            for y in ys {
1130                let y_bytes = y.to_bytes();
1131
1132                // Read the proof and convert to string immediately
1133                let proof_json_str = {
1134                    let proof_json_opt = table.get(y_bytes.as_slice()).map_err(Error::from)?;
1135                    proof_json_opt.map(|proof_json| proof_json.value().to_string())
1136                };
1137
1138                let Some(proof_json_str) = proof_json_str else {
1139                    return Err(database::Error::ProofNotUnspent);
1140                };
1141
1142                let mut proof: ProofInfo =
1143                    serde_json::from_str(&proof_json_str).map_err(Error::from)?;
1144
1145                if proof.state != State::Unspent {
1146                    return Err(database::Error::ProofNotUnspent);
1147                }
1148
1149                proof.state = State::Reserved;
1150                proof.used_by_operation = Some(*operation_id);
1151
1152                let updated_json = serde_json::to_string(&proof).map_err(Error::from)?;
1153                table
1154                    .insert(y_bytes.as_slice(), updated_json.as_str())
1155                    .map_err(Error::from)?;
1156            }
1157        }
1158
1159        write_txn.commit().map_err(Error::from)?;
1160        Ok(())
1161    }
1162
1163    #[instrument(skip(self))]
1164    async fn release_proofs(&self, operation_id: &uuid::Uuid) -> Result<(), database::Error> {
1165        let write_txn = self.db.begin_write().map_err(Error::from)?;
1166
1167        {
1168            let mut table = write_txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
1169
1170            // Collect all proofs first to avoid borrowing issues
1171            let all_proofs: Vec<(Vec<u8>, ProofInfo)> = table
1172                .iter()
1173                .map_err(Error::from)?
1174                .flatten()
1175                .filter_map(|(y, proof_json)| {
1176                    let proof: ProofInfo = serde_json::from_str(proof_json.value()).ok()?;
1177                    Some((y.value().to_vec(), proof))
1178                })
1179                .collect();
1180
1181            // Now update proofs that match the operation_id
1182            for (y_bytes, mut proof) in all_proofs {
1183                if proof.used_by_operation == Some(*operation_id) {
1184                    proof.state = State::Unspent;
1185                    proof.used_by_operation = None;
1186
1187                    let updated_json = serde_json::to_string(&proof).map_err(Error::from)?;
1188                    table
1189                        .insert(y_bytes.as_slice(), updated_json.as_str())
1190                        .map_err(Error::from)?;
1191                }
1192            }
1193        }
1194
1195        write_txn.commit().map_err(Error::from)?;
1196        Ok(())
1197    }
1198
1199    #[instrument(skip(self))]
1200    async fn get_reserved_proofs(
1201        &self,
1202        operation_id: &uuid::Uuid,
1203    ) -> Result<Vec<ProofInfo>, database::Error> {
1204        let read_txn = self.db.begin_read().map_err(Error::from)?;
1205        let table = read_txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
1206
1207        let proofs: Vec<ProofInfo> = table
1208            .iter()
1209            .map_err(Error::from)?
1210            .flatten()
1211            .filter_map(|(_, proof_json)| {
1212                serde_json::from_str::<ProofInfo>(proof_json.value()).ok()
1213            })
1214            .filter(|proof| proof.used_by_operation == Some(*operation_id))
1215            .collect();
1216
1217        Ok(proofs)
1218    }
1219
1220    #[instrument(skip(self))]
1221    async fn reserve_melt_quote(
1222        &self,
1223        quote_id: &str,
1224        operation_id: &uuid::Uuid,
1225    ) -> Result<(), database::Error> {
1226        let write_txn = self.db.begin_write().map_err(Error::from)?;
1227        let operation_id_str = operation_id.to_string();
1228
1229        {
1230            let mut table = write_txn
1231                .open_table(MELT_QUOTES_TABLE)
1232                .map_err(Error::from)?;
1233
1234            // Read existing quote
1235            let quote_json = table
1236                .get(quote_id)
1237                .map_err(Error::from)?
1238                .map(|v| v.value().to_string());
1239
1240            match quote_json {
1241                Some(json) => {
1242                    let mut quote: wallet::MeltQuote =
1243                        serde_json::from_str(&json).map_err(Error::from)?;
1244
1245                    // Check if already reserved by another operation
1246                    if quote.used_by_operation.is_some() {
1247                        return Err(database::Error::QuoteAlreadyInUse);
1248                    }
1249
1250                    // Reserve the quote
1251                    quote.used_by_operation = Some(operation_id_str);
1252                    let updated_json = serde_json::to_string(&quote).map_err(Error::from)?;
1253                    table
1254                        .insert(quote_id, updated_json.as_str())
1255                        .map_err(Error::from)?;
1256                }
1257                None => {
1258                    return Err(database::Error::UnknownQuote);
1259                }
1260            }
1261        }
1262
1263        write_txn.commit().map_err(Error::from)?;
1264        Ok(())
1265    }
1266
1267    #[instrument(skip(self))]
1268    async fn release_melt_quote(&self, operation_id: &uuid::Uuid) -> Result<(), database::Error> {
1269        let write_txn = self.db.begin_write().map_err(Error::from)?;
1270        let operation_id_str = operation_id.to_string();
1271
1272        {
1273            let mut table = write_txn
1274                .open_table(MELT_QUOTES_TABLE)
1275                .map_err(Error::from)?;
1276
1277            // Collect all quotes first to avoid borrowing issues
1278            let all_quotes: Vec<(String, wallet::MeltQuote)> = table
1279                .iter()
1280                .map_err(Error::from)?
1281                .flatten()
1282                .filter_map(|(id, quote_json)| {
1283                    let quote: wallet::MeltQuote = serde_json::from_str(quote_json.value()).ok()?;
1284                    Some((id.value().to_string(), quote))
1285                })
1286                .collect();
1287
1288            // Update quotes that match the operation_id
1289            for (quote_id, mut quote) in all_quotes {
1290                if quote.used_by_operation.as_deref() == Some(&operation_id_str) {
1291                    quote.used_by_operation = None;
1292                    let updated_json = serde_json::to_string(&quote).map_err(Error::from)?;
1293                    table
1294                        .insert(quote_id.as_str(), updated_json.as_str())
1295                        .map_err(Error::from)?;
1296                }
1297            }
1298        }
1299
1300        write_txn.commit().map_err(Error::from)?;
1301        Ok(())
1302    }
1303
1304    #[instrument(skip(self))]
1305    async fn reserve_mint_quote(
1306        &self,
1307        quote_id: &str,
1308        operation_id: &uuid::Uuid,
1309    ) -> Result<(), database::Error> {
1310        let write_txn = self.db.begin_write().map_err(Error::from)?;
1311        let operation_id_str = operation_id.to_string();
1312
1313        {
1314            let mut table = write_txn
1315                .open_table(MINT_QUOTES_TABLE)
1316                .map_err(Error::from)?;
1317
1318            // Read existing quote
1319            let quote_json = table
1320                .get(quote_id)
1321                .map_err(Error::from)?
1322                .map(|v| v.value().to_string());
1323
1324            match quote_json {
1325                Some(json) => {
1326                    let mut quote: MintQuote = serde_json::from_str(&json).map_err(Error::from)?;
1327
1328                    // Check if already reserved by another operation
1329                    if quote.used_by_operation.is_some() {
1330                        return Err(database::Error::QuoteAlreadyInUse);
1331                    }
1332
1333                    // Reserve the quote
1334                    quote.used_by_operation = Some(operation_id_str);
1335                    let updated_json = serde_json::to_string(&quote).map_err(Error::from)?;
1336                    table
1337                        .insert(quote_id, updated_json.as_str())
1338                        .map_err(Error::from)?;
1339                }
1340                None => {
1341                    return Err(database::Error::UnknownQuote);
1342                }
1343            }
1344        }
1345
1346        write_txn.commit().map_err(Error::from)?;
1347        Ok(())
1348    }
1349
1350    #[instrument(skip(self))]
1351    async fn release_mint_quote(&self, operation_id: &uuid::Uuid) -> Result<(), database::Error> {
1352        let write_txn = self.db.begin_write().map_err(Error::from)?;
1353        let operation_id_str = operation_id.to_string();
1354
1355        {
1356            let mut table = write_txn
1357                .open_table(MINT_QUOTES_TABLE)
1358                .map_err(Error::from)?;
1359
1360            // Collect all quotes first to avoid borrowing issues
1361            let all_quotes: Vec<(String, MintQuote)> = table
1362                .iter()
1363                .map_err(Error::from)?
1364                .flatten()
1365                .filter_map(|(id, quote_json)| {
1366                    let quote: MintQuote = serde_json::from_str(quote_json.value()).ok()?;
1367                    Some((id.value().to_string(), quote))
1368                })
1369                .collect();
1370
1371            // Update quotes that match the operation_id
1372            for (quote_id, mut quote) in all_quotes {
1373                if quote.used_by_operation.as_deref() == Some(&operation_id_str) {
1374                    quote.used_by_operation = None;
1375                    let updated_json = serde_json::to_string(&quote).map_err(Error::from)?;
1376                    table
1377                        .insert(quote_id.as_str(), updated_json.as_str())
1378                        .map_err(Error::from)?;
1379                }
1380            }
1381        }
1382
1383        write_txn.commit().map_err(Error::from)?;
1384        Ok(())
1385    }
1386
1387    #[instrument(skip(self, value))]
1388    async fn kv_write(
1389        &self,
1390        primary_namespace: &str,
1391        secondary_namespace: &str,
1392        key: &str,
1393        value: &[u8],
1394    ) -> Result<(), database::Error> {
1395        // Validate parameters according to KV store requirements
1396        validate_kvstore_params(primary_namespace, secondary_namespace, Some(key))?;
1397
1398        let write_txn = self.db.begin_write().map_err(Error::from)?;
1399        {
1400            let mut table = write_txn.open_table(KV_STORE_TABLE).map_err(Error::from)?;
1401            table
1402                .insert((primary_namespace, secondary_namespace, key), value)
1403                .map_err(Error::from)?;
1404        }
1405        write_txn.commit().map_err(Error::from)?;
1406
1407        Ok(())
1408    }
1409
1410    #[instrument(skip(self))]
1411    async fn kv_read(
1412        &self,
1413        primary_namespace: &str,
1414        secondary_namespace: &str,
1415        key: &str,
1416    ) -> Result<Option<Vec<u8>>, database::Error> {
1417        // Validate parameters according to KV store requirements
1418        validate_kvstore_params(primary_namespace, secondary_namespace, Some(key))?;
1419
1420        let read_txn = self.db.begin_read().map_err(Error::from)?;
1421        let table = read_txn.open_table(KV_STORE_TABLE).map_err(Error::from)?;
1422
1423        let result = table
1424            .get((primary_namespace, secondary_namespace, key))
1425            .map_err(Error::from)?
1426            .map(|v| v.value().to_vec());
1427
1428        Ok(result)
1429    }
1430
1431    #[instrument(skip(self))]
1432    async fn kv_list(
1433        &self,
1434        primary_namespace: &str,
1435        secondary_namespace: &str,
1436    ) -> Result<Vec<String>, database::Error> {
1437        // Validate parameters according to KV store requirements
1438        validate_kvstore_params(primary_namespace, secondary_namespace, None)?;
1439
1440        let read_txn = self.db.begin_read().map_err(Error::from)?;
1441        let table = read_txn.open_table(KV_STORE_TABLE).map_err(Error::from)?;
1442
1443        let start = (primary_namespace, secondary_namespace, "");
1444        let iter = table.range(start..).map_err(Error::from)?;
1445
1446        let mut keys = Vec::new();
1447
1448        for item in iter {
1449            let (key, _) = item.map_err(Error::from)?;
1450            let (p, s, k) = key.value();
1451            if p == primary_namespace && s == secondary_namespace {
1452                keys.push(k.to_string());
1453            } else {
1454                break;
1455            }
1456        }
1457
1458        Ok(keys)
1459    }
1460
1461    #[instrument(skip(self))]
1462    async fn kv_remove(
1463        &self,
1464        primary_namespace: &str,
1465        secondary_namespace: &str,
1466        key: &str,
1467    ) -> Result<(), database::Error> {
1468        // Validate parameters according to KV store requirements
1469        validate_kvstore_params(primary_namespace, secondary_namespace, Some(key))?;
1470
1471        let write_txn = self.db.begin_write().map_err(Error::from)?;
1472        {
1473            let mut table = write_txn.open_table(KV_STORE_TABLE).map_err(Error::from)?;
1474            table
1475                .remove((primary_namespace, secondary_namespace, key))
1476                .map_err(Error::from)?;
1477        }
1478        write_txn.commit().map_err(Error::from)?;
1479
1480        Ok(())
1481    }
1482
1483    #[instrument(skip(self))]
1484    async fn add_p2pk_key(
1485        &self,
1486        pubkey: &PublicKey,
1487        derivation_path: DerivationPath,
1488        derivation_index: u32,
1489    ) -> Result<(), database::Error> {
1490        let write_txn = self.db.begin_write().map_err(Error::from)?;
1491        {
1492            let mut table = write_txn
1493                .open_table(P2PK_SIGNING_KEYS_TABLE)
1494                .map_err(Error::from)?;
1495            table
1496                .insert(
1497                    pubkey.to_bytes().as_slice(),
1498                    serde_json::to_string(&wallet::P2PKSigningKey {
1499                        pubkey: *pubkey,
1500                        derivation_path,
1501                        derivation_index,
1502                        created_time: unix_time(),
1503                    })
1504                    .map_err(Error::from)?
1505                    .as_str(),
1506                )
1507                .map_err(Error::from)?;
1508        }
1509        write_txn.commit().map_err(Error::from)?;
1510        Ok(())
1511    }
1512
1513    #[instrument(skip(self))]
1514    async fn get_p2pk_key(
1515        &self,
1516        pubkey: &PublicKey,
1517    ) -> Result<Option<wallet::P2PKSigningKey>, database::Error> {
1518        let read_txn = self.db.begin_read().map_err(Error::from)?;
1519        let table = read_txn
1520            .open_table(P2PK_SIGNING_KEYS_TABLE)
1521            .map_err(Error::from)?;
1522
1523        if let Some(key) = table
1524            .get(pubkey.to_bytes().as_slice())
1525            .map_err(Error::from)?
1526        {
1527            return Ok(Some(
1528                serde_json::from_str(key.value()).map_err(Error::from)?,
1529            ));
1530        }
1531
1532        Ok(None)
1533    }
1534
1535    #[instrument(skip(self))]
1536    async fn list_p2pk_keys(&self) -> Result<Vec<wallet::P2PKSigningKey>, database::Error> {
1537        let read_txn = self.db.begin_read().map_err(Error::from)?;
1538        let table = read_txn
1539            .open_table(P2PK_SIGNING_KEYS_TABLE)
1540            .map_err(Error::from)?;
1541
1542        let keys: Vec<wallet::P2PKSigningKey> = table
1543            .iter()
1544            .map_err(Error::from)?
1545            .flatten()
1546            .filter_map(|(_k, v)| {
1547                if let Ok(key) = serde_json::from_str::<wallet::P2PKSigningKey>(v.value()) {
1548                    return Some(key);
1549                }
1550
1551                None
1552            })
1553            .collect();
1554
1555        Ok(keys)
1556    }
1557
1558    #[instrument(skip(self))]
1559    async fn latest_p2pk(&self) -> Result<Option<wallet::P2PKSigningKey>, database::Error> {
1560        let read_txn = self.db.begin_read().map_err(Error::from)?;
1561        let table = read_txn
1562            .open_table(P2PK_SIGNING_KEYS_TABLE)
1563            .map_err(Error::from)?;
1564
1565        let latest_key = table
1566            .iter()
1567            .map_err(Error::from)?
1568            .flatten()
1569            .filter_map(|(_k, v)| serde_json::from_str::<wallet::P2PKSigningKey>(v.value()).ok())
1570            .max_by_key(|key| key.derivation_index);
1571
1572        Ok(latest_key)
1573    }
1574}
1575
#[cfg(test)]
mod test {
    use std::path::PathBuf;

    use cdk_common::wallet_db_test;

    use super::WalletRedbDatabase;

    /// Opens a wallet database for the test identified by `test_id`.
    ///
    /// The database file is placed in the platform temporary directory
    /// (`std::env::temp_dir`) instead of a hard-coded `/tmp`, so the tests also run
    /// where `/tmp` does not exist and honour a configured temp location.
    ///
    /// # Panics
    ///
    /// Panics when the database cannot be opened at that path.
    async fn provide_db(test_id: String) -> WalletRedbDatabase {
        let path: PathBuf = std::env::temp_dir().join(format!("cdk-test-{}.redb", test_id));
        WalletRedbDatabase::new(&path).expect("database")
    }

    // Generates the wallet database tests, handing them `provide_db` as the factory.
    wallet_db_test!(provide_db);
}