Skip to main content

cdk_redb/wallet/
mod.rs

1//! Redb Wallet
2
3use std::cmp::Ordering;
4use std::collections::HashMap;
5use std::path::Path;
6use std::str::FromStr;
7use std::sync::Arc;
8
9use async_trait::async_trait;
10use cdk_common::database::{validate_kvstore_params, WalletDatabase};
11use cdk_common::mint_url::MintUrl;
12use cdk_common::nut00::KnownMethod;
13use cdk_common::util::unix_time;
14use cdk_common::wallet::{
15    self, MintQuote, ProofInfo, Transaction, TransactionDirection, TransactionId,
16};
17use cdk_common::{
18    database, Amount, CurrencyUnit, Id, KeySet, KeySetInfo, Keys, MintInfo, PaymentMethod,
19    PublicKey, SpendingConditions, State,
20};
21use redb::{Database, MultimapTableDefinition, ReadableTable, TableDefinition};
22use tracing::instrument;
23
24use super::error::Error;
25use crate::migrations::migrate_00_to_01;
26use crate::wallet::migrations::{migrate_01_to_02, migrate_02_to_03, migrate_03_to_04};
27
28mod migrations;
29
30// <Mint_url, Info>
31const MINTS_TABLE: TableDefinition<&str, &str> = TableDefinition::new("mints_table");
32// <Mint_Url, Keyset_id>
33const MINT_KEYSETS_TABLE: MultimapTableDefinition<&str, &[u8]> =
34    MultimapTableDefinition::new("mint_keysets");
35// <Keyset_id, KeysetInfo>
36const KEYSETS_TABLE: TableDefinition<&[u8], &str> = TableDefinition::new("keysets");
37// <Quote_id, quote>
38const MINT_QUOTES_TABLE: TableDefinition<&str, &str> = TableDefinition::new("mint_quotes");
39// <Quote_id, quote>
40const MELT_QUOTES_TABLE: TableDefinition<&str, &str> = TableDefinition::new("melt_quotes");
41const MINT_KEYS_TABLE: TableDefinition<&str, &str> = TableDefinition::new("mint_keys");
42// <Y, Proof Info>
43const PROOFS_TABLE: TableDefinition<&[u8], &str> = TableDefinition::new("proofs");
44const CONFIG_TABLE: TableDefinition<&str, &str> = TableDefinition::new("config");
45const KEYSET_COUNTER: TableDefinition<&str, u32> = TableDefinition::new("keyset_counter");
46// <Transaction_id, Transaction>
47const TRANSACTIONS_TABLE: TableDefinition<&[u8], &str> = TableDefinition::new("transactions");
48// <Saga_id, WalletSaga>
49const SAGAS_TABLE: TableDefinition<&str, &str> = TableDefinition::new("wallet_sagas");
50
51const KEYSET_U32_MAPPING: TableDefinition<u32, &str> = TableDefinition::new("keyset_u32_mapping");
52// <(primary_namespace, secondary_namespace, key), value>
53const KV_STORE_TABLE: TableDefinition<(&str, &str, &str), &[u8]> = TableDefinition::new("kv_store");
54
55const DATABASE_VERSION: u32 = 4;
56
57/// Wallet Redb Database
58#[derive(Debug, Clone)]
59pub struct WalletRedbDatabase {
60    db: Arc<Database>,
61}
62
63impl WalletRedbDatabase {
64    /// Create new [`WalletRedbDatabase`]
65    pub fn new(path: &Path) -> Result<Self, Error> {
66        {
67            // Check if parent directory exists before attempting to create database
68            if let Some(parent) = path.parent() {
69                if !parent.exists() {
70                    return Err(Error::Io(std::io::Error::new(
71                        std::io::ErrorKind::NotFound,
72                        format!("Parent directory does not exist: {parent:?}"),
73                    )));
74                }
75            }
76
77            let db = Arc::new(Database::create(path)?);
78
79            let db_version: Option<String>;
80            {
81                // Check database version
82                let read_txn = db.begin_read()?;
83                let table = read_txn.open_table(CONFIG_TABLE);
84
85                db_version = match table {
86                    Ok(table) => table.get("db_version")?.map(|v| v.value().to_string()),
87                    Err(_) => None,
88                };
89            }
90
91            match db_version {
92                Some(db_version) => {
93                    let mut current_file_version = u32::from_str(&db_version)?;
94                    tracing::info!("Current file version {}", current_file_version);
95
96                    match current_file_version.cmp(&DATABASE_VERSION) {
97                        Ordering::Less => {
98                            tracing::info!(
99                                "Database needs to be upgraded at {} current is {}",
100                                current_file_version,
101                                DATABASE_VERSION
102                            );
103                            if current_file_version == 0 {
104                                current_file_version = migrate_00_to_01(Arc::clone(&db))?;
105                            }
106
107                            if current_file_version == 1 {
108                                current_file_version = migrate_01_to_02(Arc::clone(&db))?;
109                            }
110
111                            if current_file_version == 2 {
112                                current_file_version = migrate_02_to_03(Arc::clone(&db))?;
113                            }
114
115                            if current_file_version == 3 {
116                                current_file_version = migrate_03_to_04(Arc::clone(&db))?;
117                            }
118
119                            if current_file_version != DATABASE_VERSION {
120                                tracing::warn!(
121                                    "Database upgrade did not complete at {} current is {}",
122                                    current_file_version,
123                                    DATABASE_VERSION
124                                );
125                                return Err(Error::UnknownDatabaseVersion);
126                            }
127
128                            let write_txn = db.begin_write()?;
129                            {
130                                let mut table = write_txn.open_table(CONFIG_TABLE)?;
131
132                                table
133                                    .insert("db_version", DATABASE_VERSION.to_string().as_str())?;
134                            }
135
136                            write_txn.commit()?;
137                        }
138                        Ordering::Equal => {
139                            tracing::info!("Database is at current version {}", DATABASE_VERSION);
140                        }
141                        Ordering::Greater => {
142                            tracing::warn!(
143                                "Database upgrade did not complete at {} current is {}",
144                                current_file_version,
145                                DATABASE_VERSION
146                            );
147                            return Err(Error::UnknownDatabaseVersion);
148                        }
149                    }
150                }
151                None => {
152                    let write_txn = db.begin_write()?;
153                    {
154                        let mut table = write_txn.open_table(CONFIG_TABLE)?;
155                        // Open all tables to init a new db
156                        let _ = write_txn.open_table(MINTS_TABLE)?;
157                        let _ = write_txn.open_multimap_table(MINT_KEYSETS_TABLE)?;
158                        let _ = write_txn.open_table(KEYSETS_TABLE)?;
159                        let _ = write_txn.open_table(MINT_QUOTES_TABLE)?;
160                        let _ = write_txn.open_table(MELT_QUOTES_TABLE)?;
161                        let _ = write_txn.open_table(MINT_KEYS_TABLE)?;
162                        let _ = write_txn.open_table(PROOFS_TABLE)?;
163                        let _ = write_txn.open_table(KEYSET_COUNTER)?;
164                        let _ = write_txn.open_table(TRANSACTIONS_TABLE)?;
165                        let _ = write_txn.open_table(KEYSET_U32_MAPPING)?;
166                        let _ = write_txn.open_table(KV_STORE_TABLE)?;
167                        table.insert("db_version", DATABASE_VERSION.to_string().as_str())?;
168                    }
169
170                    write_txn.commit()?;
171                }
172            }
173            drop(db);
174        }
175
176        // Check parent directory again for final database creation
177        if let Some(parent) = path.parent() {
178            if !parent.exists() {
179                return Err(Error::Io(std::io::Error::new(
180                    std::io::ErrorKind::NotFound,
181                    format!("Parent directory does not exist: {parent:?}"),
182                )));
183            }
184        }
185
186        let mut db = Database::create(path)?;
187
188        db.upgrade()?;
189
190        Ok(Self { db: Arc::new(db) })
191    }
192}
193
194#[async_trait]
195impl WalletDatabase<database::Error> for WalletRedbDatabase {
196    #[instrument(skip(self))]
197    async fn get_mint(&self, mint_url: MintUrl) -> Result<Option<MintInfo>, database::Error> {
198        let read_txn = self.db.begin_read().map_err(Into::<Error>::into)?;
199        let table = read_txn.open_table(MINTS_TABLE).map_err(Error::from)?;
200
201        if let Some(mint_info) = table
202            .get(mint_url.to_string().as_str())
203            .map_err(Error::from)?
204        {
205            return Ok(serde_json::from_str(mint_info.value()).map_err(Error::from)?);
206        }
207
208        Ok(None)
209    }
210
211    #[instrument(skip(self))]
212    async fn get_mints(&self) -> Result<HashMap<MintUrl, Option<MintInfo>>, database::Error> {
213        let read_txn = self.db.begin_read().map_err(Error::from)?;
214        let table = read_txn.open_table(MINTS_TABLE).map_err(Error::from)?;
215        let mints = table
216            .iter()
217            .map_err(Error::from)?
218            .flatten()
219            .filter_map(|(mint, mint_info)| {
220                MintUrl::from_str(mint.value())
221                    .ok()
222                    .map(|url| (url, serde_json::from_str(mint_info.value()).ok()))
223            })
224            .collect();
225
226        Ok(mints)
227    }
228
229    #[instrument(skip(self))]
230    async fn get_mint_keysets(
231        &self,
232        mint_url: MintUrl,
233    ) -> Result<Option<Vec<KeySetInfo>>, database::Error> {
234        let read_txn = self.db.begin_read().map_err(Into::<Error>::into)?;
235        let table = read_txn
236            .open_multimap_table(MINT_KEYSETS_TABLE)
237            .map_err(Error::from)?;
238
239        let keyset_ids = table
240            .get(mint_url.to_string().as_str())
241            .map_err(Error::from)?
242            .flatten()
243            .map(|k| Id::from_bytes(k.value()))
244            .collect::<Result<Vec<_>, _>>()?;
245
246        let mut keysets = vec![];
247
248        let keysets_t = read_txn.open_table(KEYSETS_TABLE).map_err(Error::from)?;
249
250        for keyset_id in keyset_ids {
251            if let Some(keyset) = keysets_t
252                .get(keyset_id.to_bytes().as_slice())
253                .map_err(Error::from)?
254            {
255                let keyset = serde_json::from_str(keyset.value()).map_err(Error::from)?;
256
257                keysets.push(keyset);
258            }
259        }
260
261        match keysets.is_empty() {
262            true => Ok(None),
263            false => Ok(Some(keysets)),
264        }
265    }
266
267    #[instrument(skip(self), fields(keyset_id = %keyset_id))]
268    async fn get_keyset_by_id(
269        &self,
270        keyset_id: &Id,
271    ) -> Result<Option<KeySetInfo>, database::Error> {
272        let read_txn = self.db.begin_read().map_err(Into::<Error>::into)?;
273        let table = read_txn.open_table(KEYSETS_TABLE).map_err(Error::from)?;
274
275        match table
276            .get(keyset_id.to_bytes().as_slice())
277            .map_err(Error::from)?
278        {
279            Some(keyset) => {
280                let keyset: KeySetInfo =
281                    serde_json::from_str(keyset.value()).map_err(Error::from)?;
282
283                Ok(Some(keyset))
284            }
285            None => Ok(None),
286        }
287    }
288
289    #[instrument(skip_all)]
290    async fn get_mint_quote(&self, quote_id: &str) -> Result<Option<MintQuote>, database::Error> {
291        let read_txn = self.db.begin_read().map_err(Into::<Error>::into)?;
292        let table = read_txn
293            .open_table(MINT_QUOTES_TABLE)
294            .map_err(Error::from)?;
295
296        if let Some(mint_info) = table.get(quote_id).map_err(Error::from)? {
297            return Ok(serde_json::from_str(mint_info.value()).map_err(Error::from)?);
298        }
299
300        Ok(None)
301    }
302
303    #[instrument(skip_all)]
304    async fn get_mint_quotes(&self) -> Result<Vec<MintQuote>, database::Error> {
305        let read_txn = self.db.begin_read().map_err(Into::<Error>::into)?;
306        let table = read_txn
307            .open_table(MINT_QUOTES_TABLE)
308            .map_err(Error::from)?;
309
310        Ok(table
311            .iter()
312            .map_err(Error::from)?
313            .flatten()
314            .flat_map(|(_id, quote)| serde_json::from_str(quote.value()))
315            .collect())
316    }
317
318    async fn get_unissued_mint_quotes(&self) -> Result<Vec<MintQuote>, database::Error> {
319        let read_txn = self.db.begin_read().map_err(Into::<Error>::into)?;
320        let table = read_txn
321            .open_table(MINT_QUOTES_TABLE)
322            .map_err(Error::from)?;
323
324        Ok(table
325            .iter()
326            .map_err(Error::from)?
327            .flatten()
328            .flat_map(|(_id, quote)| serde_json::from_str::<MintQuote>(quote.value()).ok())
329            .filter(|quote| {
330                quote.amount_issued == Amount::ZERO
331                    || quote.payment_method == PaymentMethod::Known(KnownMethod::Bolt12)
332            })
333            .collect())
334    }
335
336    #[instrument(skip_all)]
337    async fn get_melt_quote(
338        &self,
339        quote_id: &str,
340    ) -> Result<Option<wallet::MeltQuote>, database::Error> {
341        let read_txn = self.db.begin_read().map_err(Error::from)?;
342        let table = read_txn
343            .open_table(MELT_QUOTES_TABLE)
344            .map_err(Error::from)?;
345
346        if let Some(mint_info) = table.get(quote_id).map_err(Error::from)? {
347            return Ok(serde_json::from_str(mint_info.value()).map_err(Error::from)?);
348        }
349
350        Ok(None)
351    }
352
353    #[instrument(skip_all)]
354    async fn get_melt_quotes(&self) -> Result<Vec<wallet::MeltQuote>, database::Error> {
355        let read_txn = self.db.begin_read().map_err(Error::from)?;
356        let table = read_txn
357            .open_table(MELT_QUOTES_TABLE)
358            .map_err(Error::from)?;
359
360        Ok(table
361            .iter()
362            .map_err(Error::from)?
363            .flatten()
364            .flat_map(|(_id, quote)| serde_json::from_str(quote.value()))
365            .collect())
366    }
367
368    #[instrument(skip(self), fields(keyset_id = %keyset_id))]
369    async fn get_keys(&self, keyset_id: &Id) -> Result<Option<Keys>, database::Error> {
370        let read_txn = self.db.begin_read().map_err(Error::from)?;
371        let table = read_txn.open_table(MINT_KEYS_TABLE).map_err(Error::from)?;
372
373        if let Some(mint_info) = table
374            .get(keyset_id.to_string().as_str())
375            .map_err(Error::from)?
376        {
377            return Ok(serde_json::from_str(mint_info.value()).map_err(Error::from)?);
378        }
379
380        Ok(None)
381    }
382
383    #[instrument(skip_all)]
384    async fn get_proofs(
385        &self,
386        mint_url: Option<MintUrl>,
387        unit: Option<CurrencyUnit>,
388        state: Option<Vec<State>>,
389        spending_conditions: Option<Vec<SpendingConditions>>,
390    ) -> Result<Vec<ProofInfo>, database::Error> {
391        let read_txn = self.db.begin_read().map_err(Error::from)?;
392
393        let table = read_txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
394
395        let proofs: Vec<ProofInfo> = table
396            .iter()
397            .map_err(Error::from)?
398            .flatten()
399            .filter_map(|(_k, v)| {
400                let mut proof = None;
401
402                if let Ok(proof_info) = serde_json::from_str::<ProofInfo>(v.value()) {
403                    if proof_info.matches_conditions(&mint_url, &unit, &state, &spending_conditions)
404                    {
405                        proof = Some(proof_info)
406                    }
407                }
408
409                proof
410            })
411            .collect();
412
413        Ok(proofs)
414    }
415
416    #[instrument(skip(self, ys))]
417    async fn get_proofs_by_ys(
418        &self,
419        ys: Vec<PublicKey>,
420    ) -> Result<Vec<ProofInfo>, database::Error> {
421        if ys.is_empty() {
422            return Ok(Vec::new());
423        }
424
425        let read_txn = self.db.begin_read().map_err(Error::from)?;
426        let table = read_txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
427
428        let mut proofs = Vec::new();
429
430        for y in ys {
431            if let Some(proof) = table.get(y.to_bytes().as_slice()).map_err(Error::from)? {
432                let proof_info =
433                    serde_json::from_str::<ProofInfo>(proof.value()).map_err(Error::from)?;
434                proofs.push(proof_info);
435            }
436        }
437
438        Ok(proofs)
439    }
440
441    async fn get_balance(
442        &self,
443        mint_url: Option<MintUrl>,
444        unit: Option<CurrencyUnit>,
445        state: Option<Vec<State>>,
446    ) -> Result<u64, database::Error> {
447        // For redb, we still need to fetch all proofs and sum them
448        // since redb doesn't have SQL aggregation
449        let proofs = self.get_proofs(mint_url, unit, state, None).await?;
450        Ok(proofs.iter().map(|p| u64::from(p.proof.amount)).sum())
451    }
452
453    #[instrument(skip(self))]
454    async fn get_transaction(
455        &self,
456        transaction_id: TransactionId,
457    ) -> Result<Option<Transaction>, database::Error> {
458        let read_txn = self.db.begin_read().map_err(Error::from)?;
459        let table = read_txn
460            .open_table(TRANSACTIONS_TABLE)
461            .map_err(Error::from)?;
462
463        if let Some(transaction) = table.get(transaction_id.as_slice()).map_err(Error::from)? {
464            return Ok(serde_json::from_str(transaction.value()).map_err(Error::from)?);
465        }
466
467        Ok(None)
468    }
469
470    #[instrument(skip(self))]
471    async fn list_transactions(
472        &self,
473        mint_url: Option<MintUrl>,
474        direction: Option<TransactionDirection>,
475        unit: Option<CurrencyUnit>,
476    ) -> Result<Vec<Transaction>, database::Error> {
477        let read_txn = self.db.begin_read().map_err(Error::from)?;
478
479        let table = read_txn
480            .open_table(TRANSACTIONS_TABLE)
481            .map_err(Error::from)?;
482
483        let transactions: Vec<Transaction> = table
484            .iter()
485            .map_err(Error::from)?
486            .flatten()
487            .filter_map(|(_k, v)| {
488                let mut transaction = None;
489
490                if let Ok(tx) = serde_json::from_str::<Transaction>(v.value()) {
491                    if tx.matches_conditions(&mint_url, &direction, &unit) {
492                        transaction = Some(tx)
493                    }
494                }
495
496                transaction
497            })
498            .collect();
499
500        Ok(transactions)
501    }
502
503    #[instrument(skip(self, added, removed_ys))]
504    async fn update_proofs(
505        &self,
506        added: Vec<ProofInfo>,
507        removed_ys: Vec<PublicKey>,
508    ) -> Result<(), database::Error> {
509        let write_txn = self.db.begin_write().map_err(Error::from)?;
510        {
511            let mut table = write_txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
512
513            for proof_info in added.iter() {
514                table
515                    .insert(
516                        proof_info.y.to_bytes().as_slice(),
517                        serde_json::to_string(&proof_info)
518                            .map_err(Error::from)?
519                            .as_str(),
520                    )
521                    .map_err(Error::from)?;
522            }
523
524            for y in removed_ys.iter() {
525                table.remove(y.to_bytes().as_slice()).map_err(Error::from)?;
526            }
527        }
528        write_txn.commit().map_err(Error::from)?;
529        Ok(())
530    }
531
532    async fn update_proofs_state(
533        &self,
534        ys: Vec<PublicKey>,
535        state: State,
536    ) -> Result<(), database::Error> {
537        let write_txn = self.db.begin_write().map_err(Error::from)?;
538        {
539            let mut table = write_txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
540
541            for y in ys {
542                let y_slice = y.to_bytes();
543                let proof = table
544                    .get(y_slice.as_slice())
545                    .map_err(Error::from)?
546                    .ok_or(Error::UnknownY)?;
547
548                let mut proof_info =
549                    serde_json::from_str::<ProofInfo>(proof.value()).map_err(Error::from)?;
550                drop(proof);
551
552                proof_info.state = state;
553
554                table
555                    .insert(
556                        y_slice.as_slice(),
557                        serde_json::to_string(&proof_info)
558                            .map_err(Error::from)?
559                            .as_str(),
560                    )
561                    .map_err(Error::from)?;
562            }
563        }
564        write_txn.commit().map_err(Error::from)?;
565        Ok(())
566    }
567
568    #[instrument(skip(self))]
569    async fn add_transaction(&self, transaction: Transaction) -> Result<(), database::Error> {
570        let id = transaction.id();
571        let write_txn = self.db.begin_write().map_err(Error::from)?;
572        {
573            let mut table = write_txn
574                .open_table(TRANSACTIONS_TABLE)
575                .map_err(Error::from)?;
576            table
577                .insert(
578                    id.as_slice(),
579                    serde_json::to_string(&transaction)
580                        .map_err(Error::from)?
581                        .as_str(),
582                )
583                .map_err(Error::from)?;
584        }
585        write_txn.commit().map_err(Error::from)?;
586        Ok(())
587    }
588
589    #[instrument(skip(self))]
590    async fn update_mint_url(
591        &self,
592        old_mint_url: MintUrl,
593        new_mint_url: MintUrl,
594    ) -> Result<(), database::Error> {
595        let write_txn = self.db.begin_write().map_err(Error::from)?;
596
597        // Update proofs table
598        {
599            let read_table = write_txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
600            let proofs: Vec<ProofInfo> = read_table
601                .iter()
602                .map_err(Error::from)?
603                .flatten()
604                .filter_map(|(_k, v)| {
605                    let proof_info = serde_json::from_str::<ProofInfo>(v.value()).ok()?;
606                    if proof_info.mint_url == old_mint_url {
607                        Some(proof_info)
608                    } else {
609                        None
610                    }
611                })
612                .collect();
613            drop(read_table);
614
615            if !proofs.is_empty() {
616                let mut write_table = write_txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
617                for mut proof_info in proofs {
618                    proof_info.mint_url = new_mint_url.clone();
619                    write_table
620                        .insert(
621                            proof_info.y.to_bytes().as_slice(),
622                            serde_json::to_string(&proof_info)
623                                .map_err(Error::from)?
624                                .as_str(),
625                        )
626                        .map_err(Error::from)?;
627                }
628            }
629        }
630
631        // Update mint quotes
632        {
633            let mut table = write_txn
634                .open_table(MINT_QUOTES_TABLE)
635                .map_err(Error::from)?;
636
637            let unix_time = unix_time();
638
639            let quotes: Vec<MintQuote> = table
640                .iter()
641                .map_err(Error::from)?
642                .flatten()
643                .filter_map(|(_, quote)| {
644                    let mut q: MintQuote = serde_json::from_str(quote.value()).ok()?;
645                    if q.mint_url == old_mint_url && q.expiry >= unix_time {
646                        q.mint_url = new_mint_url.clone();
647                        Some(q)
648                    } else {
649                        None
650                    }
651                })
652                .collect();
653
654            for quote in quotes {
655                table
656                    .insert(
657                        quote.id.as_str(),
658                        serde_json::to_string(&quote).map_err(Error::from)?.as_str(),
659                    )
660                    .map_err(Error::from)?;
661            }
662        }
663
664        write_txn.commit().map_err(Error::from)?;
665        Ok(())
666    }
667
668    #[instrument(skip(self), fields(keyset_id = %keyset_id))]
669    async fn increment_keyset_counter(
670        &self,
671        keyset_id: &Id,
672        count: u32,
673    ) -> Result<u32, database::Error> {
674        let write_txn = self.db.begin_write().map_err(Error::from)?;
675        let new_counter = {
676            let mut table = write_txn.open_table(KEYSET_COUNTER).map_err(Error::from)?;
677            let current_counter = table
678                .get(keyset_id.to_string().as_str())
679                .map_err(Error::from)?
680                .map(|x| x.value())
681                .unwrap_or_default();
682
683            let new_counter = current_counter + count;
684
685            table
686                .insert(keyset_id.to_string().as_str(), new_counter)
687                .map_err(Error::from)?;
688
689            new_counter
690        };
691        write_txn.commit().map_err(Error::from)?;
692        Ok(new_counter)
693    }
694
695    #[instrument(skip(self))]
696    async fn add_mint(
697        &self,
698        mint_url: MintUrl,
699        mint_info: Option<MintInfo>,
700    ) -> Result<(), database::Error> {
701        let write_txn = self.db.begin_write().map_err(Error::from)?;
702        {
703            let mut table = write_txn.open_table(MINTS_TABLE).map_err(Error::from)?;
704            table
705                .insert(
706                    mint_url.to_string().as_str(),
707                    serde_json::to_string(&mint_info)
708                        .map_err(Error::from)?
709                        .as_str(),
710                )
711                .map_err(Error::from)?;
712        }
713        write_txn.commit().map_err(Error::from)?;
714        Ok(())
715    }
716
717    #[instrument(skip(self))]
718    async fn remove_mint(&self, mint_url: MintUrl) -> Result<(), database::Error> {
719        let write_txn = self.db.begin_write().map_err(Error::from)?;
720        {
721            let mut table = write_txn.open_table(MINTS_TABLE).map_err(Error::from)?;
722            table
723                .remove(mint_url.to_string().as_str())
724                .map_err(Error::from)?;
725        }
726        write_txn.commit().map_err(Error::from)?;
727        Ok(())
728    }
729
730    #[instrument(skip(self))]
731    async fn add_mint_keysets(
732        &self,
733        mint_url: MintUrl,
734        keysets: Vec<KeySetInfo>,
735    ) -> Result<(), database::Error> {
736        let write_txn = self.db.begin_write().map_err(Error::from)?;
737        {
738            let mut table = write_txn
739                .open_multimap_table(MINT_KEYSETS_TABLE)
740                .map_err(Error::from)?;
741            let mut keysets_table = write_txn.open_table(KEYSETS_TABLE).map_err(Error::from)?;
742            let mut u32_table = write_txn
743                .open_table(KEYSET_U32_MAPPING)
744                .map_err(Error::from)?;
745
746            let mut existing_u32 = false;
747
748            for keyset in keysets {
749                // Check if keyset already exists
750                let existing_keyset = {
751                    let existing_keyset = keysets_table
752                        .get(keyset.id.to_bytes().as_slice())
753                        .map_err(Error::from)?;
754
755                    existing_keyset.map(|r| r.value().to_string())
756                };
757
758                let existing = u32_table
759                    .insert(u32::from(keyset.id), keyset.id.to_string().as_str())
760                    .map_err(Error::from)?;
761
762                match existing {
763                    None => existing_u32 = false,
764                    Some(id) => {
765                        let id = Id::from_str(id.value())?;
766
767                        if id == keyset.id {
768                            existing_u32 = false;
769                        } else {
770                            existing_u32 = true;
771                            break;
772                        }
773                    }
774                }
775
776                let keyset = if let Some(existing_keyset) = existing_keyset {
777                    let mut existing_keyset: KeySetInfo = serde_json::from_str(&existing_keyset)?;
778
779                    existing_keyset.active = keyset.active;
780                    existing_keyset.input_fee_ppk = keyset.input_fee_ppk;
781
782                    existing_keyset
783                } else {
784                    table
785                        .insert(
786                            mint_url.to_string().as_str(),
787                            keyset.id.to_bytes().as_slice(),
788                        )
789                        .map_err(Error::from)?;
790
791                    keyset
792                };
793
794                keysets_table
795                    .insert(
796                        keyset.id.to_bytes().as_slice(),
797                        serde_json::to_string(&keyset)
798                            .map_err(Error::from)?
799                            .as_str(),
800                    )
801                    .map_err(Error::from)?;
802            }
803
804            if existing_u32 {
805                tracing::warn!("Keyset already exists for keyset id");
806                return Err(database::Error::Duplicate);
807            }
808        }
809        write_txn.commit().map_err(Error::from)?;
810        Ok(())
811    }
812
813    #[instrument(skip_all)]
814    async fn add_mint_quote(&self, quote: MintQuote) -> Result<(), database::Error> {
815        let write_txn = self.db.begin_write().map_err(Error::from)?;
816        {
817            let mut table = write_txn
818                .open_table(MINT_QUOTES_TABLE)
819                .map_err(Error::from)?;
820
821            // Check for existing quote and version match
822            let existing_quote_json = table
823                .get(quote.id.as_str())
824                .map_err(Error::from)?
825                .map(|v| v.value().to_string());
826
827            let mut quote_to_save = quote.clone();
828
829            if let Some(json) = existing_quote_json {
830                let existing_quote: MintQuote = serde_json::from_str(&json).map_err(Error::from)?;
831
832                if existing_quote.version != quote.version {
833                    return Err(database::Error::ConcurrentUpdate);
834                }
835
836                // Increment version for update
837                quote_to_save.version = quote.version.wrapping_add(1);
838            }
839
840            table
841                .insert(
842                    quote_to_save.id.as_str(),
843                    serde_json::to_string(&quote_to_save)
844                        .map_err(Error::from)?
845                        .as_str(),
846                )
847                .map_err(Error::from)?;
848        }
849        write_txn.commit().map_err(Error::from)?;
850        Ok(())
851    }
852
853    #[instrument(skip_all)]
854    async fn remove_mint_quote(&self, quote_id: &str) -> Result<(), database::Error> {
855        let write_txn = self.db.begin_write().map_err(Error::from)?;
856        {
857            let mut table = write_txn
858                .open_table(MINT_QUOTES_TABLE)
859                .map_err(Error::from)?;
860            table.remove(quote_id).map_err(Error::from)?;
861        }
862        write_txn.commit().map_err(Error::from)?;
863        Ok(())
864    }
865
866    #[instrument(skip_all)]
867    async fn add_melt_quote(&self, quote: wallet::MeltQuote) -> Result<(), database::Error> {
868        let write_txn = self.db.begin_write().map_err(Error::from)?;
869        {
870            let mut table = write_txn
871                .open_table(MELT_QUOTES_TABLE)
872                .map_err(Error::from)?;
873
874            // Check for existing quote and version match
875            let existing_quote_json = table
876                .get(quote.id.as_str())
877                .map_err(Error::from)?
878                .map(|v| v.value().to_string());
879
880            let mut quote_to_save = quote.clone();
881
882            if let Some(json) = existing_quote_json {
883                let existing_quote: wallet::MeltQuote =
884                    serde_json::from_str(&json).map_err(Error::from)?;
885
886                if existing_quote.version != quote.version {
887                    return Err(database::Error::ConcurrentUpdate);
888                }
889
890                // Increment version for update
891                quote_to_save.version = quote.version.wrapping_add(1);
892            }
893
894            table
895                .insert(
896                    quote_to_save.id.as_str(),
897                    serde_json::to_string(&quote_to_save)
898                        .map_err(Error::from)?
899                        .as_str(),
900                )
901                .map_err(Error::from)?;
902        }
903        write_txn.commit().map_err(Error::from)?;
904        Ok(())
905    }
906
907    #[instrument(skip_all)]
908    async fn remove_melt_quote(&self, quote_id: &str) -> Result<(), database::Error> {
909        let write_txn = self.db.begin_write().map_err(Error::from)?;
910        {
911            let mut table = write_txn
912                .open_table(MELT_QUOTES_TABLE)
913                .map_err(Error::from)?;
914            table.remove(quote_id).map_err(Error::from)?;
915        }
916        write_txn.commit().map_err(Error::from)?;
917        Ok(())
918    }
919
920    #[instrument(skip_all)]
921    async fn add_keys(&self, keyset: KeySet) -> Result<(), database::Error> {
922        let write_txn = self.db.begin_write().map_err(Error::from)?;
923
924        keyset.verify_id()?;
925
926        {
927            let mut table = write_txn.open_table(MINT_KEYS_TABLE).map_err(Error::from)?;
928
929            let existing_keys = table
930                .insert(
931                    keyset.id.to_string().as_str(),
932                    serde_json::to_string(&keyset.keys)
933                        .map_err(Error::from)?
934                        .as_str(),
935                )
936                .map_err(Error::from)?
937                .is_some();
938
939            let mut table = write_txn
940                .open_table(KEYSET_U32_MAPPING)
941                .map_err(Error::from)?;
942
943            let existing = table
944                .insert(u32::from(keyset.id), keyset.id.to_string().as_str())
945                .map_err(Error::from)?;
946
947            let existing_u32 = match existing {
948                None => false,
949                Some(id) => {
950                    let id = Id::from_str(id.value())?;
951                    id != keyset.id
952                }
953            };
954
955            if existing_keys || existing_u32 {
956                tracing::warn!("Keys already exist for keyset id");
957                return Err(database::Error::Duplicate);
958            }
959        }
960        write_txn.commit().map_err(Error::from)?;
961        Ok(())
962    }
963
964    #[instrument(skip(self), fields(keyset_id = %keyset_id))]
965    async fn remove_keys(&self, keyset_id: &Id) -> Result<(), database::Error> {
966        let write_txn = self.db.begin_write().map_err(Error::from)?;
967        {
968            let mut table = write_txn.open_table(MINT_KEYS_TABLE).map_err(Error::from)?;
969
970            table
971                .remove(keyset_id.to_string().as_str())
972                .map_err(Error::from)?;
973        }
974        write_txn.commit().map_err(Error::from)?;
975        Ok(())
976    }
977
978    #[instrument(skip(self))]
979    async fn remove_transaction(
980        &self,
981        transaction_id: TransactionId,
982    ) -> Result<(), database::Error> {
983        let write_txn = self.db.begin_write().map_err(Error::from)?;
984        {
985            let mut table = write_txn
986                .open_table(TRANSACTIONS_TABLE)
987                .map_err(Error::from)?;
988            table
989                .remove(transaction_id.as_slice())
990                .map_err(Error::from)?;
991        }
992        write_txn.commit().map_err(Error::from)?;
993        Ok(())
994    }
995
996    #[instrument(skip(self))]
997    async fn add_saga(&self, saga: wallet::WalletSaga) -> Result<(), database::Error> {
998        let saga_json = serde_json::to_string(&saga).map_err(Error::from)?;
999        let id_str = saga.id.to_string();
1000
1001        let write_txn = self.db.begin_write().map_err(Error::from)?;
1002        {
1003            let mut table = write_txn.open_table(SAGAS_TABLE).map_err(Error::from)?;
1004            table
1005                .insert(id_str.as_str(), saga_json.as_str())
1006                .map_err(Error::from)?;
1007        }
1008        write_txn.commit().map_err(Error::from)?;
1009        Ok(())
1010    }
1011
1012    #[instrument(skip(self))]
1013    async fn get_saga(
1014        &self,
1015        id: &uuid::Uuid,
1016    ) -> Result<Option<wallet::WalletSaga>, database::Error> {
1017        let read_txn = self.db.begin_read().map_err(Error::from)?;
1018        let table = read_txn.open_table(SAGAS_TABLE).map_err(Error::from)?;
1019        let id_str = id.to_string();
1020
1021        let result = table
1022            .get(id_str.as_str())
1023            .map_err(Error::from)?
1024            .map(|saga| serde_json::from_str(saga.value()).map_err(Error::from))
1025            .transpose()?;
1026
1027        Ok(result)
1028    }
1029
1030    #[instrument(skip(self))]
1031    async fn update_saga(&self, saga: wallet::WalletSaga) -> Result<bool, database::Error> {
1032        let id_str = saga.id.to_string();
1033
1034        // The saga.version has already been incremented by the caller, so we check
1035        // for (saga.version - 1) as the expected version in the database.
1036        let expected_version = saga.version.saturating_sub(1);
1037
1038        let write_txn = self.db.begin_write().map_err(Error::from)?;
1039        let updated = {
1040            let mut table = write_txn.open_table(SAGAS_TABLE).map_err(Error::from)?;
1041
1042            // Read existing saga to check version (optimistic locking)
1043            let existing_saga_json = table
1044                .get(id_str.as_str())
1045                .map_err(Error::from)?
1046                .map(|v| v.value().to_string());
1047
1048            match existing_saga_json {
1049                Some(json) => {
1050                    let existing_saga: wallet::WalletSaga =
1051                        serde_json::from_str(&json).map_err(Error::from)?;
1052
1053                    // Check if version matches expected version
1054                    if existing_saga.version != expected_version {
1055                        // Version mismatch - another instance modified it
1056                        false
1057                    } else {
1058                        // Version matches - safe to update
1059                        let saga_json = serde_json::to_string(&saga).map_err(Error::from)?;
1060                        table
1061                            .insert(id_str.as_str(), saga_json.as_str())
1062                            .map_err(Error::from)?;
1063                        true
1064                    }
1065                }
1066                None => {
1067                    // Saga doesn't exist - can't update
1068                    false
1069                }
1070            }
1071        };
1072        write_txn.commit().map_err(Error::from)?;
1073        Ok(updated)
1074    }
1075
1076    #[instrument(skip(self))]
1077    async fn delete_saga(&self, id: &uuid::Uuid) -> Result<(), database::Error> {
1078        let write_txn = self.db.begin_write().map_err(Error::from)?;
1079        let id_str = id.to_string();
1080        {
1081            let mut table = write_txn.open_table(SAGAS_TABLE).map_err(Error::from)?;
1082            table.remove(id_str.as_str()).map_err(Error::from)?;
1083        }
1084        write_txn.commit().map_err(Error::from)?;
1085        Ok(())
1086    }
1087
1088    #[instrument(skip(self))]
1089    async fn get_incomplete_sagas(&self) -> Result<Vec<wallet::WalletSaga>, database::Error> {
1090        let read_txn = self.db.begin_read().map_err(Error::from)?;
1091        let table = read_txn.open_table(SAGAS_TABLE).map_err(Error::from)?;
1092
1093        let mut sagas: Vec<wallet::WalletSaga> = table
1094            .iter()
1095            .map_err(Error::from)?
1096            .flatten()
1097            .filter_map(|(_, saga_json)| {
1098                serde_json::from_str::<wallet::WalletSaga>(saga_json.value()).ok()
1099            })
1100            .collect();
1101
1102        // Sort by created_at ascending (oldest first)
1103        sagas.sort_by_key(|saga| saga.created_at);
1104
1105        Ok(sagas)
1106    }
1107
1108    #[instrument(skip(self))]
1109    async fn reserve_proofs(
1110        &self,
1111        ys: Vec<PublicKey>,
1112        operation_id: &uuid::Uuid,
1113    ) -> Result<(), database::Error> {
1114        let write_txn = self.db.begin_write().map_err(Error::from)?;
1115
1116        {
1117            let mut table = write_txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
1118
1119            for y in ys {
1120                let y_bytes = y.to_bytes();
1121
1122                // Read the proof and convert to string immediately
1123                let proof_json_str = {
1124                    let proof_json_opt = table.get(y_bytes.as_slice()).map_err(Error::from)?;
1125                    proof_json_opt.map(|proof_json| proof_json.value().to_string())
1126                };
1127
1128                let Some(proof_json_str) = proof_json_str else {
1129                    return Err(database::Error::ProofNotUnspent);
1130                };
1131
1132                let mut proof: ProofInfo =
1133                    serde_json::from_str(&proof_json_str).map_err(Error::from)?;
1134
1135                if proof.state != State::Unspent {
1136                    return Err(database::Error::ProofNotUnspent);
1137                }
1138
1139                proof.state = State::Reserved;
1140                proof.used_by_operation = Some(*operation_id);
1141
1142                let updated_json = serde_json::to_string(&proof).map_err(Error::from)?;
1143                table
1144                    .insert(y_bytes.as_slice(), updated_json.as_str())
1145                    .map_err(Error::from)?;
1146            }
1147        }
1148
1149        write_txn.commit().map_err(Error::from)?;
1150        Ok(())
1151    }
1152
1153    #[instrument(skip(self))]
1154    async fn release_proofs(&self, operation_id: &uuid::Uuid) -> Result<(), database::Error> {
1155        let write_txn = self.db.begin_write().map_err(Error::from)?;
1156
1157        {
1158            let mut table = write_txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
1159
1160            // Collect all proofs first to avoid borrowing issues
1161            let all_proofs: Vec<(Vec<u8>, ProofInfo)> = table
1162                .iter()
1163                .map_err(Error::from)?
1164                .flatten()
1165                .filter_map(|(y, proof_json)| {
1166                    let proof: ProofInfo = serde_json::from_str(proof_json.value()).ok()?;
1167                    Some((y.value().to_vec(), proof))
1168                })
1169                .collect();
1170
1171            // Now update proofs that match the operation_id
1172            for (y_bytes, mut proof) in all_proofs {
1173                if proof.used_by_operation == Some(*operation_id) {
1174                    proof.state = State::Unspent;
1175                    proof.used_by_operation = None;
1176
1177                    let updated_json = serde_json::to_string(&proof).map_err(Error::from)?;
1178                    table
1179                        .insert(y_bytes.as_slice(), updated_json.as_str())
1180                        .map_err(Error::from)?;
1181                }
1182            }
1183        }
1184
1185        write_txn.commit().map_err(Error::from)?;
1186        Ok(())
1187    }
1188
1189    #[instrument(skip(self))]
1190    async fn get_reserved_proofs(
1191        &self,
1192        operation_id: &uuid::Uuid,
1193    ) -> Result<Vec<ProofInfo>, database::Error> {
1194        let read_txn = self.db.begin_read().map_err(Error::from)?;
1195        let table = read_txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
1196
1197        let proofs: Vec<ProofInfo> = table
1198            .iter()
1199            .map_err(Error::from)?
1200            .flatten()
1201            .filter_map(|(_, proof_json)| {
1202                serde_json::from_str::<ProofInfo>(proof_json.value()).ok()
1203            })
1204            .filter(|proof| proof.used_by_operation == Some(*operation_id))
1205            .collect();
1206
1207        Ok(proofs)
1208    }
1209
1210    #[instrument(skip(self))]
1211    async fn reserve_melt_quote(
1212        &self,
1213        quote_id: &str,
1214        operation_id: &uuid::Uuid,
1215    ) -> Result<(), database::Error> {
1216        let write_txn = self.db.begin_write().map_err(Error::from)?;
1217        let operation_id_str = operation_id.to_string();
1218
1219        {
1220            let mut table = write_txn
1221                .open_table(MELT_QUOTES_TABLE)
1222                .map_err(Error::from)?;
1223
1224            // Read existing quote
1225            let quote_json = table
1226                .get(quote_id)
1227                .map_err(Error::from)?
1228                .map(|v| v.value().to_string());
1229
1230            match quote_json {
1231                Some(json) => {
1232                    let mut quote: wallet::MeltQuote =
1233                        serde_json::from_str(&json).map_err(Error::from)?;
1234
1235                    // Check if already reserved by another operation
1236                    if quote.used_by_operation.is_some() {
1237                        return Err(database::Error::QuoteAlreadyInUse);
1238                    }
1239
1240                    // Reserve the quote
1241                    quote.used_by_operation = Some(operation_id_str);
1242                    let updated_json = serde_json::to_string(&quote).map_err(Error::from)?;
1243                    table
1244                        .insert(quote_id, updated_json.as_str())
1245                        .map_err(Error::from)?;
1246                }
1247                None => {
1248                    return Err(database::Error::UnknownQuote);
1249                }
1250            }
1251        }
1252
1253        write_txn.commit().map_err(Error::from)?;
1254        Ok(())
1255    }
1256
1257    #[instrument(skip(self))]
1258    async fn release_melt_quote(&self, operation_id: &uuid::Uuid) -> Result<(), database::Error> {
1259        let write_txn = self.db.begin_write().map_err(Error::from)?;
1260        let operation_id_str = operation_id.to_string();
1261
1262        {
1263            let mut table = write_txn
1264                .open_table(MELT_QUOTES_TABLE)
1265                .map_err(Error::from)?;
1266
1267            // Collect all quotes first to avoid borrowing issues
1268            let all_quotes: Vec<(String, wallet::MeltQuote)> = table
1269                .iter()
1270                .map_err(Error::from)?
1271                .flatten()
1272                .filter_map(|(id, quote_json)| {
1273                    let quote: wallet::MeltQuote = serde_json::from_str(quote_json.value()).ok()?;
1274                    Some((id.value().to_string(), quote))
1275                })
1276                .collect();
1277
1278            // Update quotes that match the operation_id
1279            for (quote_id, mut quote) in all_quotes {
1280                if quote.used_by_operation.as_deref() == Some(&operation_id_str) {
1281                    quote.used_by_operation = None;
1282                    let updated_json = serde_json::to_string(&quote).map_err(Error::from)?;
1283                    table
1284                        .insert(quote_id.as_str(), updated_json.as_str())
1285                        .map_err(Error::from)?;
1286                }
1287            }
1288        }
1289
1290        write_txn.commit().map_err(Error::from)?;
1291        Ok(())
1292    }
1293
1294    #[instrument(skip(self))]
1295    async fn reserve_mint_quote(
1296        &self,
1297        quote_id: &str,
1298        operation_id: &uuid::Uuid,
1299    ) -> Result<(), database::Error> {
1300        let write_txn = self.db.begin_write().map_err(Error::from)?;
1301        let operation_id_str = operation_id.to_string();
1302
1303        {
1304            let mut table = write_txn
1305                .open_table(MINT_QUOTES_TABLE)
1306                .map_err(Error::from)?;
1307
1308            // Read existing quote
1309            let quote_json = table
1310                .get(quote_id)
1311                .map_err(Error::from)?
1312                .map(|v| v.value().to_string());
1313
1314            match quote_json {
1315                Some(json) => {
1316                    let mut quote: MintQuote = serde_json::from_str(&json).map_err(Error::from)?;
1317
1318                    // Check if already reserved by another operation
1319                    if quote.used_by_operation.is_some() {
1320                        return Err(database::Error::QuoteAlreadyInUse);
1321                    }
1322
1323                    // Reserve the quote
1324                    quote.used_by_operation = Some(operation_id_str);
1325                    let updated_json = serde_json::to_string(&quote).map_err(Error::from)?;
1326                    table
1327                        .insert(quote_id, updated_json.as_str())
1328                        .map_err(Error::from)?;
1329                }
1330                None => {
1331                    return Err(database::Error::UnknownQuote);
1332                }
1333            }
1334        }
1335
1336        write_txn.commit().map_err(Error::from)?;
1337        Ok(())
1338    }
1339
1340    #[instrument(skip(self))]
1341    async fn release_mint_quote(&self, operation_id: &uuid::Uuid) -> Result<(), database::Error> {
1342        let write_txn = self.db.begin_write().map_err(Error::from)?;
1343        let operation_id_str = operation_id.to_string();
1344
1345        {
1346            let mut table = write_txn
1347                .open_table(MINT_QUOTES_TABLE)
1348                .map_err(Error::from)?;
1349
1350            // Collect all quotes first to avoid borrowing issues
1351            let all_quotes: Vec<(String, MintQuote)> = table
1352                .iter()
1353                .map_err(Error::from)?
1354                .flatten()
1355                .filter_map(|(id, quote_json)| {
1356                    let quote: MintQuote = serde_json::from_str(quote_json.value()).ok()?;
1357                    Some((id.value().to_string(), quote))
1358                })
1359                .collect();
1360
1361            // Update quotes that match the operation_id
1362            for (quote_id, mut quote) in all_quotes {
1363                if quote.used_by_operation.as_deref() == Some(&operation_id_str) {
1364                    quote.used_by_operation = None;
1365                    let updated_json = serde_json::to_string(&quote).map_err(Error::from)?;
1366                    table
1367                        .insert(quote_id.as_str(), updated_json.as_str())
1368                        .map_err(Error::from)?;
1369                }
1370            }
1371        }
1372
1373        write_txn.commit().map_err(Error::from)?;
1374        Ok(())
1375    }
1376
1377    #[instrument(skip(self, value))]
1378    async fn kv_write(
1379        &self,
1380        primary_namespace: &str,
1381        secondary_namespace: &str,
1382        key: &str,
1383        value: &[u8],
1384    ) -> Result<(), database::Error> {
1385        // Validate parameters according to KV store requirements
1386        validate_kvstore_params(primary_namespace, secondary_namespace, Some(key))?;
1387
1388        let write_txn = self.db.begin_write().map_err(Error::from)?;
1389        {
1390            let mut table = write_txn.open_table(KV_STORE_TABLE).map_err(Error::from)?;
1391            table
1392                .insert((primary_namespace, secondary_namespace, key), value)
1393                .map_err(Error::from)?;
1394        }
1395        write_txn.commit().map_err(Error::from)?;
1396
1397        Ok(())
1398    }
1399
1400    #[instrument(skip(self))]
1401    async fn kv_read(
1402        &self,
1403        primary_namespace: &str,
1404        secondary_namespace: &str,
1405        key: &str,
1406    ) -> Result<Option<Vec<u8>>, database::Error> {
1407        // Validate parameters according to KV store requirements
1408        validate_kvstore_params(primary_namespace, secondary_namespace, Some(key))?;
1409
1410        let read_txn = self.db.begin_read().map_err(Error::from)?;
1411        let table = read_txn.open_table(KV_STORE_TABLE).map_err(Error::from)?;
1412
1413        let result = table
1414            .get((primary_namespace, secondary_namespace, key))
1415            .map_err(Error::from)?
1416            .map(|v| v.value().to_vec());
1417
1418        Ok(result)
1419    }
1420
1421    #[instrument(skip(self))]
1422    async fn kv_list(
1423        &self,
1424        primary_namespace: &str,
1425        secondary_namespace: &str,
1426    ) -> Result<Vec<String>, database::Error> {
1427        // Validate parameters according to KV store requirements
1428        validate_kvstore_params(primary_namespace, secondary_namespace, None)?;
1429
1430        let read_txn = self.db.begin_read().map_err(Error::from)?;
1431        let table = read_txn.open_table(KV_STORE_TABLE).map_err(Error::from)?;
1432
1433        let start = (primary_namespace, secondary_namespace, "");
1434        let iter = table.range(start..).map_err(Error::from)?;
1435
1436        let mut keys = Vec::new();
1437
1438        for item in iter {
1439            let (key, _) = item.map_err(Error::from)?;
1440            let (p, s, k) = key.value();
1441            if p == primary_namespace && s == secondary_namespace {
1442                keys.push(k.to_string());
1443            } else {
1444                break;
1445            }
1446        }
1447
1448        Ok(keys)
1449    }
1450
1451    #[instrument(skip(self))]
1452    async fn kv_remove(
1453        &self,
1454        primary_namespace: &str,
1455        secondary_namespace: &str,
1456        key: &str,
1457    ) -> Result<(), database::Error> {
1458        // Validate parameters according to KV store requirements
1459        validate_kvstore_params(primary_namespace, secondary_namespace, Some(key))?;
1460
1461        let write_txn = self.db.begin_write().map_err(Error::from)?;
1462        {
1463            let mut table = write_txn.open_table(KV_STORE_TABLE).map_err(Error::from)?;
1464            table
1465                .remove((primary_namespace, secondary_namespace, key))
1466                .map_err(Error::from)?;
1467        }
1468        write_txn.commit().map_err(Error::from)?;
1469
1470        Ok(())
1471    }
1472}
1473
#[cfg(test)]
mod test {
    use std::path::PathBuf;

    use cdk_common::wallet_db_test;

    use super::WalletRedbDatabase;

    /// Opens a wallet database for the test identified by `test_id`.
    ///
    /// The file is placed in the platform's temporary directory instead of a
    /// hard-coded `/tmp`, so the tests also work where `/tmp` does not exist
    /// or where the temporary directory has been redirected.
    async fn provide_db(test_id: String) -> WalletRedbDatabase {
        let path: PathBuf = std::env::temp_dir().join(format!("cdk-test-{}.redb", test_id));
        WalletRedbDatabase::new(&path).expect("database")
    }

    // Runs the wallet database test suite against the database opened above.
    wallet_db_test!(provide_db);
}
1488}