nym_credential_proxy_lib/storage/
mod.rs

1// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
2// SPDX-License-Identifier: GPL-3.0-only
3
4use crate::deposits_buffer::helpers::{BufferedDeposit, PerformedDeposits};
5use crate::error::CredentialProxyError;
6use crate::storage::manager::SqliteStorageManager;
7use crate::storage::models::{BlindedShares, MinimalWalletShare};
8use nym_compact_ecash::PublicKeyUser;
9use nym_credentials::ecash::bandwidth::serialiser::VersionedSerialise;
10use nym_credentials::{
11    AggregatedCoinIndicesSignatures, AggregatedExpirationDateSignatures, EpochVerificationKey,
12};
13use nym_validator_client::ecash::BlindedSignatureResponse;
14use nym_validator_client::nym_api::EpochId;
15use nym_validator_client::nyxd::contract_traits::ecash_query_client::DepositId;
16use sqlx::ConnectOptions;
17use sqlx::sqlite::{SqliteAutoVacuum, SqliteSynchronous};
18use std::fmt::Debug;
19use std::path::Path;
20use std::time::Duration;
21use time::{Date, OffsetDateTime};
22use tracing::log::LevelFilter;
23use tracing::{debug, error, info, instrument};
24use uuid::Uuid;
25
26mod manager;
27pub mod models;
28pub(crate) mod pruner;
29pub mod traits;
30
// TODO: proper import
/// Local alias used for the `node_id` argument of
/// `CredentialProxyStorage::insert_partial_wallet_share`; declared here until the type is
/// imported from its proper home (see the TODO above).
type NodeId = u64;
33
/// Cloneable handle to the credential proxy's SQLite-backed storage.
///
/// Instances are created with `CredentialProxyStorage::init`; the methods on this type
/// forward to the wrapped `SqliteStorageManager` and convert its results into
/// `CredentialProxyError`-based results.
#[derive(Clone)]
pub struct CredentialProxyStorage {
    /// Manager holding the connection pool against which all queries are issued.
    pub(crate) storage_manager: SqliteStorageManager,
}
38
39impl CredentialProxyStorage {
40    #[instrument]
41    pub async fn init<P: AsRef<Path> + Debug>(
42        database_path: P,
43    ) -> Result<Self, CredentialProxyError> {
44        debug!("Attempting to connect to database");
45
46        let opts = sqlx::sqlite::SqliteConnectOptions::new()
47            .journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
48            .synchronous(SqliteSynchronous::Normal)
49            .auto_vacuum(SqliteAutoVacuum::Incremental)
50            .filename(database_path)
51            .create_if_missing(true)
52            .log_statements(LevelFilter::Trace)
53            .log_slow_statements(LevelFilter::Warn, Duration::from_millis(250));
54
55        let pool_opts = sqlx::sqlite::SqlitePoolOptions::new()
56            .min_connections(5)
57            .max_connections(25)
58            .acquire_timeout(Duration::from_secs(60));
59
60        let connection_pool = match pool_opts.connect_with(opts).await {
61            Ok(db) => db,
62            Err(err) => {
63                error!("Failed to connect to SQLx database: {err}");
64                return Err(err.into());
65            }
66        };
67
68        if let Err(err) = sqlx::migrate!("./migrations").run(&connection_pool).await {
69            error!("Failed to initialize SQLx database: {err}");
70            return Err(err.into());
71        }
72
73        info!("Database migration finished!");
74
75        Ok(CredentialProxyStorage {
76            storage_manager: SqliteStorageManager { connection_pool },
77        })
78    }
79
80    #[allow(dead_code)]
81    pub async fn load_blinded_shares_status_by_shares_id(
82        &self,
83        id: i64,
84    ) -> Result<Option<BlindedShares>, CredentialProxyError> {
85        Ok(self
86            .storage_manager
87            .load_blinded_shares_status_by_shares_id(id)
88            .await?)
89    }
90
91    pub async fn load_wallet_shares_by_shares_id(
92        &self,
93        id: i64,
94    ) -> Result<Vec<MinimalWalletShare>, CredentialProxyError> {
95        Ok(self
96            .storage_manager
97            .load_wallet_shares_by_shares_id(id)
98            .await?)
99    }
100
101    pub async fn load_shares_error_by_shares_id(
102        &self,
103        id: i64,
104    ) -> Result<Option<String>, CredentialProxyError> {
105        Ok(self
106            .storage_manager
107            .load_shares_error_by_device_by_shares_id(id)
108            .await?)
109    }
110
111    #[allow(dead_code)]
112    pub async fn load_blinded_shares_status_by_device_and_credential_id(
113        &self,
114        device_id: &str,
115        credential_id: &str,
116    ) -> Result<Option<BlindedShares>, CredentialProxyError> {
117        Ok(self
118            .storage_manager
119            .load_blinded_shares_status_by_device_and_credential_id(device_id, credential_id)
120            .await?)
121    }
122
123    pub async fn load_wallet_shares_by_device_and_credential_id(
124        &self,
125        device_id: &str,
126        credential_id: &str,
127    ) -> Result<Vec<MinimalWalletShare>, CredentialProxyError> {
128        Ok(self
129            .storage_manager
130            .load_wallet_shares_by_device_and_credential_id(device_id, credential_id)
131            .await?)
132    }
133
134    pub async fn load_shares_error_by_device_and_credential_id(
135        &self,
136        device_id: &str,
137        credential_id: &str,
138    ) -> Result<Option<String>, CredentialProxyError> {
139        Ok(self
140            .storage_manager
141            .load_shares_error_by_device_and_credential_id(device_id, credential_id)
142            .await?)
143    }
144
145    pub async fn insert_new_pending_async_shares_request(
146        &self,
147        request: Uuid,
148        device_id: &str,
149        credential_id: &str,
150    ) -> Result<BlindedShares, CredentialProxyError> {
151        Ok(self
152            .storage_manager
153            .insert_new_pending_async_shares_request(request.to_string(), device_id, credential_id)
154            .await?)
155    }
156
157    pub async fn update_pending_async_blinded_shares_issued(
158        &self,
159        available_shares: usize,
160        device_id: &str,
161        credential_id: &str,
162    ) -> Result<BlindedShares, CredentialProxyError> {
163        Ok(self
164            .storage_manager
165            .update_pending_async_blinded_shares_issued(
166                available_shares as i64,
167                device_id,
168                credential_id,
169            )
170            .await?)
171    }
172
173    pub async fn update_pending_async_blinded_shares_error(
174        &self,
175        available_shares: usize,
176        device_id: &str,
177        credential_id: &str,
178        error: &str,
179    ) -> Result<BlindedShares, CredentialProxyError> {
180        Ok(self
181            .storage_manager
182            .update_pending_async_blinded_shares_error(
183                available_shares as i64,
184                device_id,
185                credential_id,
186                error,
187            )
188            .await?)
189    }
190
191    pub async fn prune_old_blinded_shares(&self) -> Result<(), CredentialProxyError> {
192        let max_age = OffsetDateTime::now_utc() - time::Duration::days(31);
193
194        self.storage_manager
195            .prune_old_partial_blinded_wallets(max_age)
196            .await?;
197        self.storage_manager
198            .prune_old_partial_blinded_wallet_failures(max_age)
199            .await?;
200        self.storage_manager
201            .prune_old_blinded_shares(max_age)
202            .await?;
203        Ok(())
204    }
205
206    pub async fn insert_new_deposits(
207        &self,
208        deposits: &PerformedDeposits,
209    ) -> Result<(), CredentialProxyError> {
210        debug!("inserting {} deposits data", deposits.deposits_data.len());
211
212        self.storage_manager
213            .insert_new_deposits(deposits.to_storable())
214            .await?;
215        Ok(())
216    }
217
218    pub async fn load_unused_deposits(&self) -> Result<Vec<BufferedDeposit>, CredentialProxyError> {
219        self.storage_manager
220            .load_unused_deposits()
221            .await?
222            .into_iter()
223            .map(|deposit| deposit.try_into())
224            .collect()
225    }
226
227    pub async fn insert_deposit_usage(
228        &self,
229        deposit_id: DepositId,
230        requested_on: OffsetDateTime,
231        client_pubkey: PublicKeyUser,
232        request_uuid: Uuid,
233    ) -> Result<(), CredentialProxyError> {
234        self.storage_manager
235            .insert_deposit_usage(
236                deposit_id,
237                requested_on,
238                client_pubkey.to_bytes(),
239                request_uuid.to_string(),
240            )
241            .await?;
242        Ok(())
243    }
244
245    pub async fn insert_deposit_usage_error(
246        &self,
247        deposit_id: DepositId,
248        error: String,
249    ) -> Result<(), CredentialProxyError> {
250        self.storage_manager
251            .insert_deposit_usage_error(deposit_id, error)
252            .await?;
253        Ok(())
254    }
255
256    pub async fn insert_partial_wallet_share(
257        &self,
258        deposit_id: DepositId,
259        epoch_id: EpochId,
260        expiration_date: Date,
261        node_id: NodeId,
262        res: &Result<BlindedSignatureResponse, CredentialProxyError>,
263    ) -> Result<(), CredentialProxyError> {
264        debug!("inserting partial wallet share");
265        let now = OffsetDateTime::now_utc();
266
267        match res {
268            Ok(share) => {
269                self.storage_manager
270                    .insert_partial_wallet_share(
271                        deposit_id,
272                        epoch_id as i64,
273                        expiration_date,
274                        node_id as i64,
275                        now,
276                        &share.blinded_signature.to_bytes(),
277                    )
278                    .await?;
279            }
280            Err(err) => {
281                self.storage_manager
282                    .insert_partial_wallet_issuance_failure(
283                        deposit_id,
284                        epoch_id as i64,
285                        expiration_date,
286                        node_id as i64,
287                        now,
288                        err.to_string(),
289                    )
290                    .await?
291            }
292        }
293        Ok(())
294    }
295
296    pub async fn get_master_verification_key(
297        &self,
298        epoch_id: EpochId,
299    ) -> Result<Option<EpochVerificationKey>, CredentialProxyError> {
300        let Some(raw) = self
301            .storage_manager
302            .get_master_verification_key(epoch_id as i64)
303            .await?
304        else {
305            return Ok(None);
306        };
307
308        let deserialised =
309            EpochVerificationKey::try_unpack(&raw.serialised_key, raw.serialization_revision)
310                .map_err(|err| CredentialProxyError::database_inconsistency(err.to_string()))?;
311        Ok(Some(deserialised))
312    }
313
314    pub async fn insert_master_verification_key(
315        &self,
316        key: &EpochVerificationKey,
317    ) -> Result<(), CredentialProxyError> {
318        let packed = key.pack();
319        Ok(self
320            .storage_manager
321            .insert_master_verification_key(packed.revision, key.epoch_id as i64, &packed.data)
322            .await?)
323    }
324
325    pub async fn get_master_coin_index_signatures(
326        &self,
327        epoch_id: EpochId,
328    ) -> Result<Option<AggregatedCoinIndicesSignatures>, CredentialProxyError> {
329        let Some(raw) = self
330            .storage_manager
331            .get_master_coin_index_signatures(epoch_id as i64)
332            .await?
333        else {
334            return Ok(None);
335        };
336
337        let deserialised = AggregatedCoinIndicesSignatures::try_unpack(
338            &raw.serialised_signatures,
339            raw.serialization_revision,
340        )
341        .map_err(|err| CredentialProxyError::database_inconsistency(err.to_string()))?;
342        Ok(Some(deserialised))
343    }
344
345    pub async fn insert_master_coin_index_signatures(
346        &self,
347        signatures: &AggregatedCoinIndicesSignatures,
348    ) -> Result<(), CredentialProxyError> {
349        let packed = signatures.pack();
350        self.storage_manager
351            .insert_master_coin_index_signatures(
352                packed.revision,
353                signatures.epoch_id as i64,
354                &packed.data,
355            )
356            .await?;
357        Ok(())
358    }
359
360    pub async fn get_master_expiration_date_signatures(
361        &self,
362        expiration_date: Date,
363        epoch_id: EpochId,
364    ) -> Result<Option<AggregatedExpirationDateSignatures>, CredentialProxyError> {
365        let Some(raw) = self
366            .storage_manager
367            .get_master_expiration_date_signatures(expiration_date, epoch_id as i64)
368            .await?
369        else {
370            return Ok(None);
371        };
372
373        let deserialised = AggregatedExpirationDateSignatures::try_unpack(
374            &raw.serialised_signatures,
375            raw.serialization_revision,
376        )
377        .map_err(|err| CredentialProxyError::database_inconsistency(err.to_string()))?;
378        Ok(Some(deserialised))
379    }
380
381    pub async fn insert_master_expiration_date_signatures(
382        &self,
383        signatures: &AggregatedExpirationDateSignatures,
384    ) -> Result<(), CredentialProxyError> {
385        let packed = signatures.pack();
386        self.storage_manager
387            .insert_master_expiration_date_signatures(
388                packed.revision,
389                signatures.epoch_id as i64,
390                signatures.expiration_date,
391                &packed.data,
392            )
393            .await?;
394        Ok(())
395    }
396}
397
398#[allow(clippy::expect_used)]
399#[allow(clippy::unwrap_used)]
400#[cfg(test)]
401mod tests {
402    use super::*;
403    use crate::helpers::random_uuid;
404    use crate::storage::models::BlindedSharesStatus;
405    use nym_compact_ecash::scheme::keygen::KeyPairUser;
406    use nym_crypto::asymmetric::ed25519;
407    use nym_validator_client::nyxd::{Coin, Hash};
408    use rand::RngCore;
409    use rand::rngs::OsRng;
410    use std::ops::Deref;
411    use tempfile::{NamedTempFile, TempPath};
412
    // Test-only wrapper that pairs the storage with the temporary path of its database file,
    // so the underlying file gets deleted when it's no longer needed.
    struct StorageTestWrapper {
        // the storage under test; exposed to the tests through the `Deref` impl
        inner: CredentialProxyStorage,
        // never read; held only so that the temporary path lives as long as the wrapper
        // (presumably `TempPath` removes the file when dropped - see the `tempfile` docs)
        _path: TempPath,
    }
418
419    impl StorageTestWrapper {
420        async fn new() -> anyhow::Result<Self> {
421            let file = NamedTempFile::new()?;
422            let path = file.into_temp_path();
423
424            println!("Creating database at {path:?}...");
425
426            Ok(StorageTestWrapper {
427                inner: CredentialProxyStorage::init(&path).await?,
428                _path: path,
429            })
430        }
431
432        async fn insert_dummy_used_deposit(&self, uuid: Uuid) -> anyhow::Result<DepositId> {
433            let mut rng = OsRng;
434            let deposit_id = rng.next_u32();
435            let tx_hash = Hash::Sha256(Default::default());
436            let requested_on = OffsetDateTime::now_utc();
437            let deposit_amount = Coin::new(1, "ufoomp");
438            let client_keypair = KeyPairUser::new();
439            let client_ecash_pubkey = &client_keypair.public_key();
440
441            let deposit_key = ed25519::PrivateKey::new(&mut rng);
442
443            self.inner
444                .insert_new_deposits(&PerformedDeposits {
445                    deposits_data: vec![BufferedDeposit {
446                        deposit_id,
447                        ed25519_private_key: deposit_key,
448                    }],
449                    tx_hash,
450                    requested_on,
451                    deposit_amount,
452                })
453                .await?;
454            self.inner
455                .insert_deposit_usage(deposit_id, requested_on, *client_ecash_pubkey, uuid)
456                .await?;
457
458            Ok(deposit_id)
459        }
460    }
461
462    impl Deref for StorageTestWrapper {
463        type Target = CredentialProxyStorage;
464        fn deref(&self) -> &Self::Target {
465            &self.inner
466        }
467    }
468
469    async fn get_storage() -> anyhow::Result<StorageTestWrapper> {
470        StorageTestWrapper::new().await
471    }
472
473    #[tokio::test]
474    async fn test_creation() -> anyhow::Result<()> {
475        let storage = get_storage().await;
476        assert!(storage.is_ok());
477
478        Ok(())
479    }
480
481    #[tokio::test]
482    async fn test_add() -> anyhow::Result<()> {
483        let storage = get_storage().await?;
484
485        let dummy_uuid = random_uuid();
486        println!("🚀 insert_pending_blinded_share...");
487
488        storage.insert_dummy_used_deposit(dummy_uuid).await?;
489        let res = storage
490            .insert_new_pending_async_shares_request(dummy_uuid, "1234", "1234")
491            .await;
492        if let Err(e) = &res {
493            println!("❌ {e}");
494        }
495        assert!(res.is_ok());
496        let res = res?;
497        println!("res = {res:?}");
498        assert_eq!(res.status, BlindedSharesStatus::Pending);
499
500        println!("🚀 update_pending_blinded_share_error...");
501        let res = storage
502            .update_pending_async_blinded_shares_error(0, "1234", "1234", "this is an error")
503            .await;
504        if let Err(e) = &res {
505            println!("❌ {e}");
506        }
507        assert!(res.is_ok());
508        let res = res?;
509        println!("res = {res:?}");
510        assert!(res.error_message.is_some());
511        assert_eq!(res.status, BlindedSharesStatus::Error);
512
513        println!("🚀 update_pending_blinded_share_data...");
514        let res = storage
515            .update_pending_async_blinded_shares_issued(42, "1234", "1234")
516            .await;
517        if let Err(e) = &res {
518            println!("❌ {e}");
519        }
520        assert!(res.is_ok());
521        let res = res?;
522        println!("res = {res:?}");
523        assert_eq!(res.status, BlindedSharesStatus::Issued);
524        assert!(res.error_message.is_none());
525
526        Ok(())
527    }
528}